Skip to content

DataFlow


A DataFlow is a live structure where new events trigger a set of dispatch operations. We create a DataFlow with:

DataFlowBuilder.[subscribe operation].build()

Subscribe to event

To create a flow for String events, call DataFlowBuilder.subscribe(String.class), any call to processor.onEvent("myString") will be routed to this flow.

See sample - SubscribeToEventSample.java

DataFlow.subscribe(String.class);

Once a flow has been created map, filter, groupBy, etc. functions can be applied as chained calls.

public static void main(String[] args) {
    DataFlow processor = DataFlowBuilder
            .subscribe(String.class)
            .console("string in {}")
            .build();

    processor.onEvent("AAA");
    processor.onEvent("BBB");
}

Running the example code above logs to console

string in AAA
string in BBB

Map

A map operation takes the output from a parent node and then applies a function to it. If the return of the function is null then the event notification no longer propagates down that path.

See sample - MapSample.java

DataFlow.subscribe(String.class)
        .map(String::toLowerCase);

Map supports

  • Stateless functions
  • Stateful functions
  • Primitive specialisation
  • Method references
public static void main(String[] args) {
    DataFlow processor = DataFlowBuilder
            .subscribe(String.class)
            .map(String::toLowerCase)
            .console("string in {}")
            .build();

    processor.onEvent("AAA");
    processor.onEvent("BBB");
}

Running the example code above logs to console

string mapped aaa
string mapped bbb

BiMap

Two data flows can be mapped with a bi map function. Both flows must have triggered at least once for the bimap function to be invoked

See sample - BiMapSample.java

public static void main(String[] args) {
    var strings = DataFlowBuilder.subscribe(String.class);

    var ints = DataFlowBuilder.subscribe(Integer.class);

    DataFlow processor = DataFlowBuilder
            .mapBiFunction((a, b) -> Integer.parseInt(a) + b, strings, ints)
            .console("biMap ans: {}")
            .build();

    processor.onEvent("500");
    processor.onEvent(55);
}

Running the example code above logs to console

biMap ans: 555

Default value

A default value can be assigned to any flow. This can be useful when calculating a bi map function and one data flow argument is optional

See sample - DefaultValueSample.java

public static void main(String[] args) {
    var strings = DataFlowBuilder
            .subscribe(String.class)
            .defaultValue("200");

    var ints = DataFlowBuilder.subscribe(Integer.class);

    DataFlow processor = DataFlowBuilder
            .mapBiFunction((a, b) -> Integer.parseInt(a) + b, strings, ints)
            .console("biMap with default value ans: {}")
            .build();
    processor.onEvent(55);
}

Running the example code above logs to console

biMap with default value ans: 255

Filter

A filter predicate can be applied to a node to control event propagation, true continues the propagation and false swallows the notification. If the predicate returns true then the input to the predicate is passed to the next operation in the event processor.

See sample - FilterSample.java

DataFlow
    .subscribe(Integer.class)
    .filter(i -> i > 10);

Filter supports

  • Stateless functions
  • Stateful functions
  • Primitive specialisation
  • Method references
  • Inline lambdas - interpreted mode only support, AOT mode will not serialise the inline lambda
public static void main(String[] args) {
    DataFlow processor = DataFlowBuilder
            .subscribe(Integer.class)
            .filter(i -> i > 10)
            .console("int {} > 10 ")
            .build();

    processor.onEvent(1);
    processor.onEvent(17);
    processor.onEvent(4);
}

Running the example code above logs to console

int 17 > 10

Reduce

There is no reduce function required in Fluxtion, stateful map functions perform the role of reduce. In a classic batch environment the reduce operation combines a collection of items into a single value. In a streaming environment the set of values is never complete, we can view the current value of a stateful map operation which is equivalent to the reduce operation. The question is rather, when is the value of the stateful map published and reset.

FlatMap

A Flatmap operation flattens a collection in a data flow. Any operations applied after the flatmap operation are performed on each element in the collection.

See sample - FlatMapSample.java

public static void main(String[] args) {
    DataFlow processor = DataFlowBuilder
            .subscribe(String.class)
            .console("\ncsv in [{}]")
            .flatMap(s -> Arrays.asList(s.split(",")))
            .console("flattened item [{}]")
            .build();

    processor.onEvent("A,B,C");
    processor.onEvent("2,3,5,7,11");
}

Arrays can be flattened with:

[data flow].flatMapFromArray(Function<T, R[]> iterableFunction)

Running the example code above logs to console

csv in [A,B,C]
flattened item [A]
flattened item [B]
flattened item [C]

csv in [2,3,5,7,11]
flattened item [2]
flattened item [3]
flattened item [5]
flattened item [7]
flattened item [11]

Merge flows

Flows can be merged to output a single flow that can be operated on

See sample - MergeSample.java

public static void main(String[] args) {
    DataFlow processor = DataFlowBuilder.merge(
                    DataFlowBuilder.subscribe(Long.class).console("long : {}"),
                    DataFlowBuilder.subscribe(String.class).console("string : {}").map(Mappers::parseLong),
                    DataFlowBuilder.subscribe(Integer.class).console("int : {}").map(Integer::longValue))
            .console("MERGED FLOW -> {}")
            .build();

    processor.onEvent(1234567890835L);
    processor.onEvent("9994567890835");
    processor.onEvent(123);
}

Running the example code above logs to console

long : 1234567890835
MERGED FLOW -> 1234567890835
string : 9994567890835
MERGED FLOW -> 9994567890835
int : 123
MERGED FLOW -> 123

Merge and map flows

See sample - MergeAndMapSample.java

Merge multiple streams of different types into a single output, applying a mapping operation to combine the different types. Only when at least one element from each required flow is received will the data flow publish. The upstream flows are merged into a user class that is published as the output of the merge flow. The target class is specified with:

MergeAndMapFlowBuilder.of(Supplier<T> mergeTargetSupplier)

Upstream flows are set on the merge target class with a Consumer operation on the target class, T:

[merge and map builder]<T>.required(DataFlow<F> updstreamFlow, BiConsumer<T, F>)

Merge inputs are supported that do not have to trigger to publish the flow downstream. The value in the merge target could be null if the upstream has not triggered and all the required flows have.

[merge and map builder]<T>.requiredNoTrigger(DataFlow<F> updstreamFlow, BiConsumer<T, F>)

public static void main(String[] args) {
    DataFlow processor = MergeAndMapFlowBuilder.of(MyData::new)
            .required(DataFlowBuilder.subscribe(String.class), MyData::setCustomer)
            .required(DataFlowBuilder.subscribe(Date.class), MyData::setDate)
            .required(DataFlowBuilder.subscribe(Integer.class), MyData::setId)
            .dataFlow()
            .console("new customer : {}")
            .build();

    processor.onEvent(new Date());
    processor.onEvent("John Doe");
    //only publishes when the last required flow is received
    processor.onEvent(123);
}

@Data
public static class MyData {
    private String customer;
    private Date date;
    private int id;
}

Running the example code above logs to console

new customer : MergeAndMapSample.MyData(customer=John Doe, date=Sat May 11 19:17:11 BST 2024, id=123)

Re-entrant events

See sample - ReEntrantEventSample.java

Events can be added for processing from inside the graph for processing in the next available cycle. Internal events are added to LIFO queue for processing in the correct order. The EventProcessor instance maintains the LIFO queue, any new input events are queued if there is processing currently acting. Support for internal event publishing is built into the streaming api.

Maps an int signal to a String and republishes to the graph

public static void main(String[] args) {
    DataFlowBuilder
            .subscribeToIntSignal("myIntSignal")
            .mapToObj(d -> "intValue:" + d)
            .console("republish re-entrant [{}]")
            .processAsNewGraphEvent();

    var processor = DataFlowBuilder
            .subscribe(String.class)
            .console("received [{}]")
            .build();

    processor.publishSignal("myIntSignal", 256);
}

Output

republish re-entrant [intValue:256]
received [intValue:256]

Sink

See sample - SinkExample.java

An application can register for output from the EventProcessor by supplying a consumer to addSink and removed with a call to removeSink. Bound classes can publish to sinks during an event process cycle, any registered sinks will see the update as soon as the data is published, not at the end of the cycle.

  • Adding sink - processor.addSink("mySink", (Consumer<T> t) ->{})
  • Removing sink - processor.removeSink("mySink")
public static void main(String[] args) {
    DataFlow processor = DataFlowBuilder
            .subscribeToIntSignal("myIntSignal")
            .mapToObj(d -> "intValue:" + d)
            .sink("mySink")//CREATE A SINK IN THE PROCESSOR
            .build();

    //ADDING SINK CONSUMER
    processor.addSink("mySink", (Consumer<String>) System.out::println);

    processor.publishSignal("myIntSignal", 10);
    processor.publishSignal("myIntSignal", 256);

    //REMOVING SINK CONSUMER
    processor.removeSink("mySink");
    processor.publishSignal("myIntSignal", 512);
}

Running the example code above logs to console

intValue:10
intValue:256

DataFlow node lookup by id

See sample - GetFlowNodeByIdExample.java

DataFlow nodes are available for lookup from an event processor instance using their name. In this case the lookup returns a reference to the wrapped value and not the wrapping node. The application can then use the reference to pull data from the node without requiring an event process cycle to push data to an output.

When building the graph with DSL a call to id makes that element addressable for lookup.

public static void main(String[] args) throws NoSuchFieldException {
    DataFlow processor = DataFlowBuilder
            .subscribe(String.class)
            .filter(s -> s.equalsIgnoreCase("monday"))
            //ID START - this makes the wrapped value accessible via the id
            .mapToInt(Mappers.count()).id("MondayChecker")
            //ID END
            .console("Monday is triggered")
            .build();

    processor.onEvent("Monday");
    processor.onEvent("Tuesday");
    processor.onEvent("Wednesday");

    //ACCESS THE WRAPPED VALUE BY ITS ID
    Integer mondayCheckerCount = processor.getStreamed("MondayChecker");
    System.out.println("Monday count:" + mondayCheckerCount + "\n");

    //ACCESS THE WRAPPED VALUE BY ITS ID
    processor.onEvent("Monday");
    mondayCheckerCount = processor.getStreamed("MondayChecker");
    System.out.println("Monday count:" + mondayCheckerCount);
}

Running the example code above logs to console

Monday is triggered
Monday count:1

Monday is triggered
Monday count:2

Graph of functions

Fluxtion automatically wraps the function in a node, actually a monad, and binds both into the event processor. The wrapping node handles all the event notifications, invoking the user function when it is triggered. Each wrapping node can be the head of multiple child flows forming complex graph structures that obey the dispatch rules. This is in contrast to classic java streams that have a terminal operation and a pipeline structure.

This example creates a simple graph structure, multiple stateful/stateless functions are bound to a single parent DataFlow.

We are using the DataFlow.console operation to print intermediate results to the screen for illustrative purposese. The console operation is a specialisation of DataFlow.peek.

public static void main(String[] args) {
    //STATEFUL FUNCTIONS
    MyFunctions myFunctions = new MyFunctions();

    var stringFlow = DataFlowBuilder
            .subscribe(String.class)
            .console("input: '{}'");

    var charCount = stringFlow
            .map(myFunctions::totalCharCount)
            .console("charCountAggregate: {}");

    var upperCharCount = stringFlow
            .map(myFunctions::totalUpperCaseCharCount)
            .console("upperCharCountAggregate: {}");

    DataFlowBuilder.mapBiFunction(new SimpleMath()::updatePercentage, upperCharCount, charCount)
            .console("percentage chars upperCase all words:{}");

    //STATELESS FUNCTION
    DataFlow processor = DataFlowBuilder
            .mapBiFunction(MyFunctions::wordUpperCasePercentage,
                    stringFlow.map(MyFunctions::upperCaseCharCount).console("charCourWord:{}"),
                    stringFlow.map(MyFunctions::charCount).console("upperCharCountWord:{}"))
            .console("percentage chars upperCase this word:{}\n")
            .build();

    processor.onEvent("test ME");
    processor.onEvent("and AGAIN");
    processor.onEvent("ALL CAPS");
}

Running the above with a strings 'test ME', 'and AGAIN' outputs

input: 'test ME'
charCountAggregate: 6
upperCharCountAggregate: 2
percentage chars upperCase all words:0.3333333333333333
charCourWord:2
upperCharCountWord:6
percentage chars upperCase this word:0.3333333333333333

input: 'and AGAIN'
charCountAggregate: 14
upperCharCountAggregate: 7
percentage chars upperCase all words:0.45
charCourWord:5
upperCharCountWord:8
percentage chars upperCase this word:0.625

input: 'ALL CAPS'
charCountAggregate: 21
upperCharCountAggregate: 14
percentage chars upperCase all words:0.5609756097560976
charCourWord:7
upperCharCountWord:7
percentage chars upperCase this word:1.0

Processing graph

Fluxtion DSL only requires the developer to write functions, any wrapping nodes are automatically added to the event processor. The compiler automatically selects stateful or stateless map functions, binding user instances if a stateful map function is specified.

flowchart TB

    EventA><b>InputEvent</b>::String]:::eventHandler 

    HandlerA[<b>Subscriber</b>::String\nid - stringFlow]:::graphNode

    MapData1[<b>Map - stateful</b>\nid - charCount\nmyFunctions::totalCharCount]:::graphNode 
    MapData2[<b>Map - stateful</b>\nid - upperCharCount\nmyFunctions::totalUpperCaseCharCount]:::graphNode

    BiMapSum[<b>BiMap - stateful</b>\nsimpleMath::updatePercentage]:::graphNode 
    BiMapSum2[<b>BiMap - stateless</b>\nMyFunctions::wordUpperCasePercentage]:::graphNode

    EventA --> HandlerA


    subgraph EventProcessor
      myFunctions[<b>User object::MyFunctions</b>\nid - myFunctions]  --- MapData1 & MapData2
      simpleMath[<b>User object::SimpleMath</b>\nid - simpleMath] ----- BiMapSum
      HandlerA --> MapData1 & MapData2 ---> BiMapSum
      MapData1 & MapData2 ---> BiMapSum2
    end

MyFunctions class is a normal java class bound into the event processor.

@Getter
public class MyFunctions {

    private long totalCharCount;
    private long upperCaseCharCount;

    public static long charCount(String s) {
        return s.length();
    }

    public static long upperCaseCharCount(String s) {
        return s.chars().filter(Character::isUpperCase).count();
    }

    public long totalCharCount(String s) {
        totalCharCount += charCount(s);
        return totalCharCount;
    }

    public long totalUpperCaseCharCount(String s) {
        upperCaseCharCount += upperCaseCharCount(s);
        return upperCaseCharCount;
    }

    public static double wordUpperCasePercentage(long longA, long longB) {
        return (double) longA /longB;
    }

    @Getter
    public static class SimpleMath {
        private double a;
        private double b;
        private double percentage;

        public double updatePercentage(long longA, long longB) {
            a += longA;
            b += longB;
            percentage = a / b;
            return percentage;
        }
    }
}