Advanced topics¶
Collection support¶
Collections or arrays of references are supported, if any element in the collection fires a change notification the trigger method will be called. The trigger method is invoked only once per event cycle whatever the number of parent's updating.
See sample - CollectionSupport.java
Parent change identity can be tracked using the @OnParentUpdate
annotation.
public class CollectionSupport {
public static class MyNode {
@FilterId
private final String filter;
private final String name;
public MyNode(String filter, String name) {
this.filter = filter;
this.name = name;
}
@OnEventHandler
public boolean handleIntSignal(Signal.IntSignal intSignal) {
System.out.printf("MyNode-%s::handleIntSignal - %s%n", filter, intSignal.getValue());
return true;
}
}
public static class Child {
private final MyNode[] nodes;
private int updateCount;
public Child(MyNode... nodes) {
this.nodes = nodes;
}
@OnParentUpdate
public void parentUpdated(MyNode updatedNode) {
updateCount++;
System.out.printf("parentUpdated '%s'%n", updatedNode.name);
}
@OnTrigger
public boolean triggered() {
System.out.printf("Child::triggered updateCount:%d%n%n", updateCount);
updateCount = 0;
return true;
}
}
public static void main(String[] args) {
var processor = DataFlowBuilder.subscribeToNode(new Child(
new MyNode("A", "a_1"),
new MyNode("A", "a_2"),
new MyNode("B", "b_1")))
.build();
processor.publishIntSignal("A", 10);
processor.publishIntSignal("B", 25);
processor.publishIntSignal("A", 12);
processor.publishIntSignal("C", 200);
}
}
Output
MyNode-A::handleIntSignal - 10
parentUpdated 'a_1'
MyNode-A::handleIntSignal - 10
parentUpdated 'a_2'
Child::triggered updateCount:2
MyNode-B::handleIntSignal - 25
parentUpdated 'b_1'
Child::triggered updateCount:1
MyNode-A::handleIntSignal - 12
parentUpdated 'a_1'
MyNode-A::handleIntSignal - 12
parentUpdated 'a_2'
Child::triggered updateCount:2
Forking concurrent trigger methods¶
Forking trigger methods is supported. If multiple trigger methods are fired from a single parent they can be forked to run in parallel using the fork join pool. Only when all the forked trigger methods have completed will an event notification be propagated to their children.
See sample - ForkJoinSupport.java
To for a trigger callback use @OnTrigger(parallelExecution = true)
annotation on the callback method.
public class ForkJoinSupport {
public static class MyNode {
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.printf("%s MyNode::handleStringEvent %n", Thread.currentThread().getName());
return true;
}
}
public static class ForkedChild {
private final MyNode myNode;
private final int id;
public ForkedChild(MyNode myNode, int id) {
this.myNode = myNode;
this.id = id;
}
@OnTrigger(parallelExecution = true)
public boolean triggered() {
int millisSleep = new Random(id).nextInt(25, 200);
String threadName = Thread.currentThread().getName();
System.out.printf("%s ForkedChild[%d]::triggered - sleep:%d %n", threadName, id, millisSleep);
try {
Thread.sleep(millisSleep);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.printf("%s ForkedChild[%d]::complete %n", threadName, id);
return true;
}
}
public static class ResultJoiner {
private final ForkedChild[] forkedTasks;
public ResultJoiner(ForkedChild[] forkedTasks) {
this.forkedTasks = forkedTasks;
}
public ResultJoiner(int forkTaskNumber){
MyNode myNode = new MyNode();
forkedTasks = new ForkedChild[forkTaskNumber];
for (int i = 0; i < forkTaskNumber; i++) {
forkedTasks[i] = new ForkedChild(myNode, i);
}
}
@OnTrigger
public boolean complete(){
System.out.printf("%s ResultJoiner:complete %n%n", Thread.currentThread().getName());
return true;
}
}
public static void main(String[] args) {
var processor = DataFlowBuilder
.subscribeToNode(new ResultJoiner(5))
.build();
Instant start = Instant.now();
processor.onEvent("test");
System.out.printf("duration: %d milliseconds%n", Duration.between(start, Instant.now()).toMillis());
}
}
Output
main MyNode::handleStringEvent
ForkJoinPool.commonPool-worker-1 ForkedChild[0]::triggered - sleep:135
ForkJoinPool.commonPool-worker-2 ForkedChild[1]::triggered - sleep:85
ForkJoinPool.commonPool-worker-3 ForkedChild[2]::triggered - sleep:58
ForkJoinPool.commonPool-worker-4 ForkedChild[3]::triggered - sleep:184
ForkJoinPool.commonPool-worker-5 ForkedChild[4]::triggered - sleep:112
ForkJoinPool.commonPool-worker-3 ForkedChild[2]::complete
ForkJoinPool.commonPool-worker-2 ForkedChild[1]::complete
ForkJoinPool.commonPool-worker-5 ForkedChild[4]::complete
ForkJoinPool.commonPool-worker-1 ForkedChild[0]::complete
ForkJoinPool.commonPool-worker-4 ForkedChild[3]::complete
main ResultJoiner:complete
duration: 184 milliseconds
Batch support¶
Batch callbacks are supported through the BatchHandler interface that the generated EventHandler implements. Any methods
that are annotated with, @OnBatchPause
or @OnBatchEnd
will receive calls from the matching BatchHandler method.
See sample - BatchSupport.java
public class BatchSupport {
public static class MyNode {
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println("MyNode event received:" + stringToProcess);
return true;
}
@OnBatchPause
public void batchPause(){
System.out.println("MyNode::batchPause");
}
@OnBatchEnd
public void batchEnd(){
System.out.println("MyNode::batchEnd");
}
}
public static void main(String[] args) {
var processor = DataFlowBuilder
.subscribeToNode(new MyNode())
.build();
processor.onEvent("test");
//use BatchHandler service
BatchHandler batchHandler = (BatchHandler)processor;
batchHandler.batchPause();
batchHandler.batchEnd();
}
}
Output
MyNode event received:test
MyNode::batchPause
MyNode::batchEnd