Trigger control


Fluxtion offers a way to override the triggering of a flow node in the event processor. There are four trigger controls available for client code to customise:

  • Flow.publishTrigger - Notifies child nodes when triggered, in addition to the normal publish notification
  • Flow.publishTriggerOverride - Notifies child nodes when triggered and suppresses all other publish notifications
  • Flow.updateTrigger - Overrides when the flow node runs its functional operation
  • Flow.resetTrigger - If the functional operation is stateful, calls its reset function

The trigger examples use DataFlowBuilder.subscribeToSignal and processor.publishSignal to drive the trigger controls on the flow node.
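As a reading aid, the four controls can be mimicked with a small plain-Java toy model. This is only a sketch of the behaviour described on this page, not Fluxtion code: the class ToyFlowNode, its Mode enum and its method names are all hypothetical, and it assumes an update trigger that fires before any parent value has arrived does nothing.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model only: it mimics the behaviour described on this page and is NOT Fluxtion code.
final class ToyFlowNode {
    // Hypothetical modes standing in for "no override", publishTriggerOverride and updateTrigger.
    enum Mode { DEFAULT, PUBLISH_OVERRIDE, UPDATE_TRIGGER }

    private final Mode mode;
    private final int capacity;                                // window size, 4 in the samples
    private final Deque<String> window = new ArrayDeque<>();   // state of the aggregate function
    private final List<String> published = new ArrayList<>();  // what a child node would see
    private String latestInput;                                // last value seen from the parent

    ToyFlowNode(Mode mode, int capacity) {
        this.mode = mode;
        this.capacity = capacity;
    }

    // Parent notification: the normal path runs the function and then publishes.
    void onParentUpdate(String value) {
        latestInput = value;
        if (mode == Mode.UPDATE_TRIGGER) {
            return;                     // the function runs only when the update trigger fires
        }
        aggregate(value);
        if (mode != Mode.PUBLISH_OVERRIDE) {
            publish();                  // override mode swallows this notification
        }
    }

    // publishTrigger / publishTriggerOverride: notify the child, state is untouched.
    void firePublishTrigger() {
        publish();
    }

    // updateTrigger: run the function once with the latest parent value (assumption: no-op if none).
    void fireUpdateTrigger() {
        if (latestInput != null) {
            aggregate(latestInput);
            publish();
        }
    }

    // resetTrigger: clear the stateful function, then notify the child.
    void fireResetTrigger() {
        window.clear();
        publish();
    }

    List<String> published() {
        return published;
    }

    private void aggregate(String value) {
        if (window.size() == capacity) {
            window.removeFirst();       // keep only the last `capacity` values
        }
        window.addLast(value);
    }

    private void publish() {
        published.add(window.toString());
    }

    public static void main(String[] args) {
        ToyFlowNode node = new ToyFlowNode(Mode.UPDATE_TRIGGER, 4);
        for (String s : List.of("A", "B", "C")) node.onParentUpdate(s);
        node.fireUpdateTrigger();
        for (String s : List.of("D", "E", "F")) node.onParentUpdate(s);
        node.fireUpdateTrigger();
        System.out.println(node.published());   // prints [[C], [C, F]]
    }
}
```

The main method replays the event sequence of the update trigger sample. Switching the mode and calling firePublishTrigger or fireResetTrigger replays the other samples in the same way.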

PublishTrigger

In this example the publishTrigger control enables multiple publish calls for the flow node. Child notifications are in addition to the normal triggering operation of the flow node. The values in the parent node are unchanged when publishing.

publishTrigger(DataFlowBuilder.subscribeToSignal("publishMe"))

Child DataFlow nodes are notified when publishTrigger fires or the map function executes in a calculation cycle.

See sample - TriggerPublishSample.java

public class TriggerPublishSample {
    public static void main(String[] args) {
        DataFlow processor = DataFlowBuilder
                .subscribeToNode(new SubscribeToNodeSample.MyComplexNode())
                .console("node triggered -> {}")
                .map(SubscribeToNodeSample.MyComplexNode::getIn)
                .aggregate(Collectors.listFactory(4))
                .publishTrigger(DataFlowBuilder.subscribeToSignal("publishMe"))
                .console("last 4 elements:{}")
                .build();

        processor.onEvent("A");
        processor.onEvent("B");
        processor.onEvent("C");
        processor.onEvent("D");
        processor.onEvent("E");
        processor.onEvent("F");

        processor.publishSignal("publishMe");
        processor.publishSignal("publishMe");
        processor.publishSignal("publishMe");
    }
}

Running the example code above logs to console

node triggered -> SubscribeToNodeSample.MyComplexNode(in=A)
last 4 elements:[A]
node triggered -> SubscribeToNodeSample.MyComplexNode(in=B)
last 4 elements:[A, B]
node triggered -> SubscribeToNodeSample.MyComplexNode(in=C)
last 4 elements:[A, B, C]
node triggered -> SubscribeToNodeSample.MyComplexNode(in=D)
last 4 elements:[A, B, C, D]
node triggered -> SubscribeToNodeSample.MyComplexNode(in=E)
last 4 elements:[B, C, D, E]
node triggered -> SubscribeToNodeSample.MyComplexNode(in=F)
last 4 elements:[C, D, E, F]
last 4 elements:[C, D, E, F]
last 4 elements:[C, D, E, F]
last 4 elements:[C, D, E, F]

PublishTriggerOverride

In this example the publishTriggerOverride control overrides the normal triggering operation of the flow node. The child is notified only when publishTriggerOverride fires; changes due to recalculation are swallowed and not published downstream. The values in the parent node are unchanged when publishing.

publishTriggerOverride(DataFlowBuilder.subscribeToSignal("publishMe"))

Child DataFlow nodes are notified only when publishTriggerOverride fires.

See sample - TriggerPublishOverrideSample.java

public class TriggerPublishOverrideSample {
    public static void main(String[] args) {
        DataFlow processor = DataFlowBuilder
                .subscribeToNode(new SubscribeToNodeSample.MyComplexNode())
                .console("node triggered -> {}")
                .map(SubscribeToNodeSample.MyComplexNode::getIn)
                .aggregate(Collectors.listFactory(4))
                .publishTriggerOverride(DataFlowBuilder.subscribeToSignal("publishMe"))
                .console("last 4 elements:{}\n")
                .build();

        processor.onEvent("A");
        processor.onEvent("B");
        processor.onEvent("C");
        processor.onEvent("D");

        processor.publishSignal("publishMe");
        processor.onEvent("E");
        processor.onEvent("F");
        processor.onEvent("G");
        processor.onEvent("H");

        processor.publishSignal("publishMe");
    }
}

Running the example code above logs to console

node triggered -> SubscribeToNodeSample.MyComplexNode(in=A)
node triggered -> SubscribeToNodeSample.MyComplexNode(in=B)
node triggered -> SubscribeToNodeSample.MyComplexNode(in=C)
node triggered -> SubscribeToNodeSample.MyComplexNode(in=D)
last 4 elements:[A, B, C, D]

node triggered -> SubscribeToNodeSample.MyComplexNode(in=E)
node triggered -> SubscribeToNodeSample.MyComplexNode(in=F)
node triggered -> SubscribeToNodeSample.MyComplexNode(in=G)
node triggered -> SubscribeToNodeSample.MyComplexNode(in=H)
last 4 elements:[E, F, G, H]

UpdateTrigger

In this example the updateTrigger controls when the functional mapping operation of the flow node is invoked. Values are aggregated only when the update trigger fires; notifications from the parent node are ignored and do not trigger a mapping operation. In the output below, each firing aggregates only the most recent parent value, so A, B, D and E never reach the list.

updateTrigger(DataFlowBuilder.subscribeToSignal("updateMe"))

A map operation only occurs when the update trigger fires.

See sample - TriggerUpdateSample.java

public class TriggerUpdateSample {
    public static void main(String[] args) {
        var processor = DataFlowBuilder
                .subscribeToNode(new SubscribeToNodeSample.MyComplexNode())
                .console("node triggered -> {}")
                .map(SubscribeToNodeSample.MyComplexNode::getIn)
                .aggregate(Collectors.listFactory(4))
                .updateTrigger(DataFlowBuilder.subscribeToSignal("updateMe"))
                .console("last 4 elements:{}\n")
                .build();

        processor.onEvent("A");
        processor.onEvent("B");
        processor.onEvent("C");
        processor.publishSignal("updateMe");

        processor.onEvent("D");
        processor.onEvent("E");
        processor.onEvent("F");
        processor.publishSignal("updateMe");
    }
}

Running the example code above logs to console

node triggered -> SubscribeToNodeSample.MyComplexNode(in=A)
node triggered -> SubscribeToNodeSample.MyComplexNode(in=B)
node triggered -> SubscribeToNodeSample.MyComplexNode(in=C)
last 4 elements:[C]

node triggered -> SubscribeToNodeSample.MyComplexNode(in=D)
node triggered -> SubscribeToNodeSample.MyComplexNode(in=E)
node triggered -> SubscribeToNodeSample.MyComplexNode(in=F)
last 4 elements:[C, F]

ResetTrigger

In this example the resetTrigger controls when the functional mapping operation of the flow node is reset. The aggregate operation is stateful, so all the values in the list are removed when the reset trigger fires. The reset operation also triggers a notification to the children of the flow node.

resetTrigger(DataFlowBuilder.subscribeToSignal("resetMe"))

The reset trigger notifies the stateful function to clear its state.

See sample - TriggerResetSample.java

public class TriggerResetSample {
    public static void main(String[] args) {
        DataFlow processor = DataFlowBuilder
                .subscribeToNode(new SubscribeToNodeSample.MyComplexNode())
                .console("node triggered -> {}")
                .map(SubscribeToNodeSample.MyComplexNode::getIn)
                .aggregate(Collectors.listFactory(4))
                .resetTrigger(DataFlowBuilder.subscribeToSignal("resetMe").console("\n--- resetTrigger ---"))
                .console("last 4 elements:{}")
                .build();

        processor.onEvent("A");
        processor.onEvent("B");
        processor.onEvent("C");
        processor.onEvent("D");

        processor.publishSignal("resetMe");
        processor.onEvent("E");
        processor.onEvent("F");
    }
}

Running the example code above logs to console

node triggered -> SubscribeToNodeSample.MyComplexNode(in=A)
last 4 elements:[A]
node triggered -> SubscribeToNodeSample.MyComplexNode(in=B)
last 4 elements:[A, B]
node triggered -> SubscribeToNodeSample.MyComplexNode(in=C)
last 4 elements:[A, B, C]
node triggered -> SubscribeToNodeSample.MyComplexNode(in=D)
last 4 elements:[A, B, C, D]

--- resetTrigger ---
last 4 elements:[]
node triggered -> SubscribeToNodeSample.MyComplexNode(in=E)
last 4 elements:[E]
node triggered -> SubscribeToNodeSample.MyComplexNode(in=F)
last 4 elements:[E, F]

Stateful function reset

Stateful functions can be reset by implementing the Stateful interface with a reset method. Configuring the resetTrigger will automatically route calls to the reset method of the stateful function.

See sample - ResetFunctionSample.java

public class ResetFunctionSample {
    public static void main(String[] args) {
        DataFlow processor = DataFlowBuilder
                .subscribe(String.class)
                .map(new MyResetSum()::increment)
                .resetTrigger(DataFlowBuilder.subscribeToSignal("resetMe"))
                .console("count:{}")
                .build();

        processor.onEvent("A");
        processor.onEvent("B");
        processor.onEvent("C");
        processor.onEvent("D");

        processor.publishSignal("resetMe");
        processor.onEvent("E");
        processor.onEvent("F");
    }

    public static class MyResetSum implements Stateful<Integer> {
        public int count = 0;

        public int increment(Object o) {
            return ++count;
        }

        @Override
        public Integer reset() {
            System.out.println("--- RESET CALLED ---");
            count = 0;
            return count;
        }
    }
}

Running the example code above logs to console

count:1
count:2
count:3
count:4
--- RESET CALLED ---
count:0
count:1
count:2