
Connecting DataFlow and nodes


An event processor supports bi-directional linking between data flows and normal java classes, known as nodes, within the event processor.

Bridging functional and imperative models

Connecting DataFlow and nodes is a powerful mechanism for joining functional and imperative programming in a streaming environment.

Supported bindings:

  • Node to data flow. The node is the start of a data flow
  • Data flow to node. The node has runtime access to pull the current value of a data flow
  • Data flow push to node. Data is pushed from the data flow to the node
  • Data flow to event processor. The data flow pushes re-entrant events to the parent event processor, triggering a new calculation cycle

Node to DataFlow

A DataFlow can be created by subscribing to a node that has been imperatively added to the event processor. When the node triggers in a calculation cycle the DataFlow is also triggered. Create a DataFlow from a node with:

See sample - SubscribeToNodeSample.java

DataFlowBuilder.subscribeToNode(new MyComplexNode())

If the node referred to in the DataFlowBuilder.subscribeToNode method call is not already in the event processor it will be bound automatically.

The example below creates an instance of MyComplexNode as the head of a DataFlow. When a String event is received the DataFlow path is executed. In this case we aggregate into a list that holds the four most recent elements.

public class SubscribeToNodeSample {
    public static void main(String[] args) {
        DataFlow processor = DataFlowBuilder
                .subscribeToNode(new MyComplexNode())
                .console("node triggered -> {}")
                .map(MyComplexNode::getIn)
                .aggregate(Collectors.listFactory(4))
                .console("last 4 elements:{}\n")
                .build();

        processor.onEvent("A");
        processor.onEvent("B");
        processor.onEvent("C");
        processor.onEvent("D");
        processor.onEvent("E");
        processor.onEvent("F");
    }

    @Getter
    @ToString
    public static class MyComplexNode {
        private String in;

        @OnEventHandler
        public boolean stringUpdate(String in) {
            this.in = in;
            return true;
        }
    }
}

Running the example code above logs to console

node triggered -> SubscribeToNodeSample.MyComplexNode(in=A)
last 4 elements:[A]

node triggered -> SubscribeToNodeSample.MyComplexNode(in=B)
last 4 elements:[A, B]

node triggered -> SubscribeToNodeSample.MyComplexNode(in=C)
last 4 elements:[A, B, C]

node triggered -> SubscribeToNodeSample.MyComplexNode(in=D)
last 4 elements:[A, B, C, D]

node triggered -> SubscribeToNodeSample.MyComplexNode(in=E)
last 4 elements:[B, C, D, E]

node triggered -> SubscribeToNodeSample.MyComplexNode(in=F)
last 4 elements:[C, D, E, F]

DataFlow to node

A data flow can be consumed by a normal java class within the event processor through the data flow runtime class FlowSupplier.

See sample - FlowSupplierAsMemberVariableSample.java

FlowSupplier is a normal java Supplier; the current value can be accessed by calling get(). When the data flow triggers, the OnTrigger callback method in the child class is called.

When building the processor, the FlowSupplier is accessed with: [DataFlow].flowSupplier()

This example binds a data flow of Strings to a java record that has an onTrigger method annotated with @OnTrigger.

public static void main(String[] args) {
    FlowSupplier<String> stringFlow = DataFlowBuilder
            .subscribe(String.class)
            .flowSupplier();

    DataFlow processor = DataFlowBuilder
            .subscribeToNode(new MyFlowHolder(stringFlow))
            .build();

    processor.onEvent("test");
}

public record MyFlowHolder(FlowSupplier<String> flowSupplier) {
    @OnTrigger
    public boolean onTrigger() {
        //FlowSupplier is used at runtime to access the current value of the data flow
        System.out.println("triggered by data flow -> " + flowSupplier.get().toUpperCase());
        return true;
    }
}

Running the example code above logs to console

triggered by data flow -> TEST

Push to node

A data flow can push a value to any normal java class.

See sample - PushSample.java

public static void main(String[] args) {
    DataFlow processor = DataFlowBuilder
            .subscribe(String.class)
            .push(new MyPushTarget()::updated)
            .build();

    processor.onEvent("AAA");
    processor.onEvent("BBB");
}

public static class MyPushTarget {
    public void updated(String in) {
        System.out.println("received push: " + in);
    }
}

Running the example code above logs to console

received push: AAA
received push: BBB

Map from node property

A data flow can retrieve a value from any normal java class and map it into the data flow.

See sample - MapFromNodePropertySample.java

public class MapFromNodePropertySample {
    public static void main(String[] args) {
        MyPushTarget myPushTarget = new MyPushTarget();
        DataFlow processor = DataFlowBuilder
                .subscribe(String.class)
                .push(myPushTarget::updated)
                .mapFromSupplier(myPushTarget::received)
                .console("Received - [{}]")
                .build();

        processor.onEvent("AAA");
        processor.onEvent("BBB");
    }

    public static class MyPushTarget {
        private String store = " ";
        public void updated(String in) {
            store += "'" + in + "' ";
        }

        public String received() {
            return store;
        }
    }
}

Running the example code above logs to console

Received - [ 'AAA' ]
Received - [ 'AAA' 'BBB' ]

Wrapping user functions

A data flow can wrap a user defined function and use it in the data flow. The function can be stateful or stateless; both binary and unary functions are supported.

See sample - WrapFunctionsSample.java

public class WrapFunctionsSample {

    public static void main(String[] args) {
        //STATEFUL FUNCTIONS
        MyFunctions myFunctions = new MyFunctions();

        var stringFlow = DataFlowBuilder
                .subscribe(String.class)
                .console("input: '{}'");

        var charCount = stringFlow
                .map(myFunctions::totalCharCount)
                .console("charCountAggregate: {}");

        var upperCharCount = stringFlow
                .map(myFunctions::totalUpperCaseCharCount)
                .console("upperCharCountAggregate: {}");

        DataFlowBuilder
                .mapBiFunction(
                        new MyFunctions.SimpleMath()::updatePercentage, 
                        upperCharCount, 
                        charCount)
                .console("percentage chars upperCase all words:{}");

        //STATELESS FUNCTION
        DataFlow processor = DataFlowBuilder
                .mapBiFunction(MyFunctions::wordUpperCasePercentage,
                        stringFlow.map(MyFunctions::upperCaseCharCount).console("upperCharCountWord:{}"),
                        stringFlow.map(MyFunctions::charCount).console("charCountWord:{}"))
                .console("percentage chars upperCase this word:{}\n")
                .build();

        processor.onEvent("test ME");
        processor.onEvent("and AGAIN");
        processor.onEvent("ALL CAPS");
    }
}

Running the example code above logs to console

input: 'test ME'
charCountAggregate: 6
upperCharCountAggregate: 2
percentage chars upperCase all words:0.3333333333333333
upperCharCountWord:2
charCountWord:6
percentage chars upperCase this word:0.3333333333333333

input: 'and AGAIN'
charCountAggregate: 14
upperCharCountAggregate: 7
percentage chars upperCase all words:0.45
upperCharCountWord:5
charCountWord:8
percentage chars upperCase this word:0.625

input: 'ALL CAPS'
charCountAggregate: 21
upperCharCountAggregate: 14
percentage chars upperCase all words:0.5609756097560976
upperCharCountWord:7
charCountWord:7
percentage chars upperCase this word:1.0
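
The MyFunctions class referenced above is not listed on this page; it ships alongside WrapFunctionsSample.java. The sketch below is an illustrative reconstruction only, inferred from the method references in the sample and the logged values; it is not the library's actual implementation and the internal field names are assumptions.

public class MyFunctions {
    //stateful: running totals across all events seen so far (behaviour inferred from the output above)
    private int totalChars;
    private int totalUpperChars;

    public int totalCharCount(String in) {
        totalChars += charCount(in);
        return totalChars;
    }

    public int totalUpperCaseCharCount(String in) {
        totalUpperChars += upperCaseCharCount(in);
        return totalUpperChars;
    }

    //stateless helpers operating on a single event
    public static int charCount(String in) {
        return (int) in.chars().filter(c -> !Character.isWhitespace(c)).count();
    }

    public static int upperCaseCharCount(String in) {
        return (int) in.chars().filter(Character::isUpperCase).count();
    }

    public static double wordUpperCasePercentage(int upperCount, int charCount) {
        return upperCount / (double) charCount;
    }

    //stateful binary function: ratio of upper case to total characters, accumulated
    //over every invocation (behaviour inferred from the logged percentages)
    public static class SimpleMath {
        private double upperSum;
        private double charSum;

        public double updatePercentage(int upperCount, int charCount) {
            upperSum += upperCount;
            charSum += charCount;
            return upperSum / charSum;
        }
    }
}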