Skip to content

Fluxtion


Welcome to Fluxtion!

Fluxtion is a Java library and builder for real-time, in‑memory dataflow processing. It helps you express business logic as a directed graph of small, composable functions and stateful components that update incrementally when new events arrive.

Think of it like a spreadsheet for streams: when an input changes, only the dependent cells recompute. Fluxtion brings this incremental, dependency-driven update model to event streams, enabling predictable, low‑latency processing with clear dependencies and excellent performance.

Highlights

  • DataFlow programming model: declare how values depend on each other, not when to recompute.
  • Deterministic execution: events propagate through a topologically sorted graph.
  • Low overhead: avoids generic reactive machinery; directly invokes methods in dependency order.
  • Scales down and up: great for microservices, trading systems, IoT gateways, or embedded analytics.
  • Familiar Java: plain objects and methods; no special runtime server required.
  • Infrastructure‑agnostic: not tied to Kafka Streams, Flink, or any specific platform—you choose the messaging/compute stack it runs on.

Performance at a glance

  • 50 million events per second (thrpt)
  • ~20 ns average latency per event including app logic
  • Low‑nanosecond processing overhead, zero GC, single‑threaded

See detailed benchmarks and methodology: Performance results.

Quickstart

Install Jbang and try a Hello world (Java 25):

//DEPS com.telamin.fluxtion:fluxtion-builder:0.9.12
//JAVA 25

import com.telamin.fluxtion.builder.DataFlowBuilder;
import com.telamin.fluxtion.runtime.DataFlow;

void main() {
    DataFlow dataFlow = DataFlowBuilder
            .subscribe(String.class)           // accept String events
            .map(String::toUpperCase)          // transform
            .console("msg:{}")                 // print to console
            .build();                          // build the DataFlow

    dataFlow.onEvent("hello");  // prints: msg:HELLO
    dataFlow.onEvent("world");  // prints: msg:WORLD
}

copy the code into a hello.java and run it with jbang hello.java

> jbang hello.java 
[jbang] Building jar for hello.java...
msg:HELLO
msg:WORLD

Where Fluxtion fits

  • Real‑time analytics and monitoring
  • Complex event processing and enrichment
  • Per‑key aggregations (counts, moving windows)
  • Signal generation and alerting
  • Incremental computation pipelines

Choosing Fluxtion (compare to Kafka Streams, Reactor, Flink)

Not sure if Fluxtion is the right fit? Read the decision guide: Choosing Fluxtion.

Explore examples

Browse runnable samples in one click: Examples catalog.

Core building blocks

  • Events: any Java object submitted into the DataFlow.
  • Nodes: functions or stateful components wired together by dependencies.
  • Graph: a DAG computed by the builder that determines dispatch order.
  • Handlers and triggers: annotated methods that receive events or fire when dependencies update.

Code samples

See example WindowExample.java

//DEPS com.telamin.fluxtion:fluxtion-builder:0.9.12
//JAVA 25

import com.telamin.fluxtion.builder.DataFlowBuilder;
import com.telamin.fluxtion.runtime.DataFlow;
import com.telamin.fluxtion.runtime.flowfunction.helpers.Aggregates;

import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

record CarTracker(String id, double speed) { }

void main() {
    //calculate average speed, sliding window 5 buckets of 200 millis
    DataFlow averageCarSpeed = DataFlowBuilder
        .subscribe(CarTracker::speed)
        .slidingAggregate(Aggregates.doubleAverageFactory(), 200, 5)
        .map(v -> "average speed: " + v.intValue() + " km/h")
        .sink("average car speed")
        .build();

    //register an output sink
    averageCarSpeed.addSink("average car speed", System.out::println);

    //send data from an unbounded real-time feed
    Executors.newSingleThreadScheduledExecutor()
                .scheduleAtFixedRate(
                    () -> averageCarSpeed.onEvent(
                        new CarTracker("car-reg", 
                        new Random().nextDouble(100))),
            100, 100, TimeUnit.MILLISECONDS);
}

copy the code into a WindowExample.java and run it with jbang WindowExample.java

vi WindowExample.java

console output:

jbang WindowExample.java
[jbang] Building jar for WindowExample.java...
average speed: 59 km/h
average speed: 60 km/h
average speed: 68 km/h

See example TriggerExample.java

//DEPS com.telamin.fluxtion:fluxtion-builder:0.9.12
//JAVA 25

import com.telamin.fluxtion.builder.DataFlowBuilder;
import com.telamin.fluxtion.runtime.DataFlow;
import com.telamin.fluxtion.runtime.flowfunction.helpers.Aggregates;

void main() {
    var resetSignal = DataFlowBuilder.subscribeToSignal("resetTrigger");
    var publishSignal = DataFlowBuilder.subscribeToSignal("publishSumTrigger");

    DataFlow sumDataFlow = DataFlowBuilder
            .subscribe(Integer.class)
            .aggregate(Aggregates.intSumFactory())
            .resetTrigger(resetSignal)
            .filter(i -> i != 0)
            .publishTriggerOverride(publishSignal)
            .console("Current sum:{}")
            .build();

    sumDataFlow.onEvent(10);
    sumDataFlow.onEvent(50);
    sumDataFlow.onEvent(32);
    //publish
    sumDataFlow.publishSignal("publishSumTrigger");

    //reset sum
    sumDataFlow.publishSignal("resetTrigger");

    //new sum
    sumDataFlow.onEvent(8);
    sumDataFlow.onEvent(17);
    //publish
    sumDataFlow.publishSignal("publishSumTrigger");
}

copy the code into a Triggering.java and run it with jbang Triggering.java

vi Triggering.java

console output:

jbang Triggering.java
[jbang] Building jar for Triggering.java...
Current sum:92
Current sum:25

See example SubscribeToNodeSample.java

//DEPS com.telamin.fluxtion:fluxtion-builder:0.9.12
//JAVA 25

import com.telamin.fluxtion.builder.DataFlowBuilder;
import com.telamin.fluxtion.runtime.DataFlow;
import com.telamin.fluxtion.runtime.annotations.OnEventHandler;
import com.telamin.fluxtion.runtime.flowfunction.helpers.Collectors;

void main() {
    DataFlow processor = DataFlowBuilder
            .subscribeToNode(new MyComplexNode())
            .console("node triggered -> {}")
            .map(MyComplexNode::getIn)
            .aggregate(Collectors.listFactory(4))
            .console("last 4 elements:{}\n")
            .build();

    processor.onEvent("A");
    processor.onEvent("B");
    processor.onEvent("C");
    processor.onEvent("D");
    processor.onEvent("E");
    processor.onEvent("F");
}

public static class MyComplexNode {
    private String in;

    @OnEventHandler
    public boolean stringUpdate(String in) {
        this.in = in;
        return true;
    }

    public String getIn() {
        return in;
    }
}

copy the code into a StatefulFunction.java and run it with jbang StatefulFunction.java

vi StatefulFunction.java

console output:

jbang StatefulFunction.java
[jbang] Building jar for StatefulFunction.java...
node triggered -> scratch$MyComplexNode@4923ab24
last 4 elements:[A]

node triggered -> scratch$MyComplexNode@4923ab24
last 4 elements:[A, B]

node triggered -> scratch$MyComplexNode@4923ab24
last 4 elements:[A, B, C]

node triggered -> scratch$MyComplexNode@4923ab24
last 4 elements:[A, B, C, D]

node triggered -> scratch$MyComplexNode@4923ab24
last 4 elements:[B, C, D, E]

node triggered -> scratch$MyComplexNode@4923ab24
last 4 elements:[C, D, E, F]

See example PollFeedExample

//DEPS com.telamin.fluxtion:fluxtion-builder:0.9.12
//JAVA 25
//JAVA_OPTIONS --add-opens java.base/jdk.internal.misc=ALL-UNNAMED

import com.telamin.fluxtion.builder.DataFlowBuilder;
import com.telamin.fluxtion.runtime.DataFlow;
import com.telamin.fluxtion.runtime.connector.DataFlowConnector;
import com.telamin.fluxtion.runtime.connector.FileEventFeed;
import com.telamin.fluxtion.runtime.connector.FileMessageSink;
import com.telamin.fluxtion.runtime.eventfeed.ReadStrategy;

public class TutorialPart5 {
    public static void main(String[] args) throws Exception {
        // Feed: publish each line from input file
        // as a String event into feed "myFeed"
        FileEventFeed myFileFeed = new FileEventFeed(
                "./tutorial4-input.txt",    // input file to tail
                "myFeed",                   // logical feed name
                ReadStrategy.EARLIEST       // tail from start of file
        );

        // DataFlow: subscribe to the named feed, 
        // log, uppercase, log, then emit to a named sink
        DataFlow dataFlow = DataFlowBuilder
                .subscribeToFeed("myFeed", String.class)
                .console("read file in:{}")
                .map(String::toUpperCase)
                .console("write file out:{}\n")
                .sink("output")              // name the sink "output"
                .build();

        // Sink: bind sink name "output" to an output file
        FileMessageSink outputFile = new FileMessageSink("./tutorial4-output.txt");

        // Connector: owns threads for the processor and the feed. 
        // Then wires everything together
        DataFlowConnector runner = new DataFlowConnector();
        runner.addDataFlow(dataFlow);
        runner.addFeed(myFileFeed);
        runner.addSink("output", outputFile);

        // Start: spins up threads and begins processing
        runner.start();
    }
}

copy the code into a StatefulFunction.java and run it with jbang StatefulFunction.java

echo "mon\ntue\nwed" > tutorial4-input.txt
vi StatefulFunction.java

console output:

jbang TutorialPart5.java
[jbang] Resolving dependencies...
[jbang]    com.telamin.fluxtion:fluxtion-builder:0.9.9
[jbang] Dependencies resolved
[jbang] Building jar for TutorialPart5.java...
Oct 02, 2025 12:10:23 PM com.telamin.fluxtion.runtime.connector.DataFlowConnector start
INFO: Starting DataFlowRunner
Oct 02, 2025 12:10:23 PM com.telamin.fluxtion.runtime.connector.EventFeedToDataFlowAgent lambda$checkForRegistrationUpdates$5
INFO: adding sink DefaultEvent(filterId=2147483647, filterString=output, eventTime=1759403423721) to dataflow com.telamin.fluxtion.builder.generation.target.InMemoryEventProcessor@77abba44
Oct 02, 2025 12:10:23 PM com.telamin.fluxtion.runtime.connector.FileMessageSink start
INFO: Starting FileMessageSink outputFile: ./tutorial4-output.txt
Oct 02, 2025 12:10:23 PM com.telamin.fluxtion.runtime.connector.EventFeedToDataFlowAgent checkForRegistrationUpdates
INFO: add feed FileEventFeed{name='myFeed'filename='./tutorial4-input.txt'}
Oct 02, 2025 12:10:23 PM com.telamin.fluxtion.runtime.connector.EventFeedToDataFlowAgent checkForRegistrationUpdates
INFO: adding eventFeed FileEventFeed{name='myFeed'filename='./tutorial4-input.txt'} to dataflow com.telamin.fluxtion.builder.generation.target.InMemoryEventProcessor@77abba44
Oct 02, 2025 12:10:23 PM com.telamin.fluxtion.runtime.eventfeed.EventFeedToDataFlowPublisher addDataFlowReceiver
INFO: myFeed adding dataflow: com.telamin.fluxtion.builder.generation.target.InMemoryEventProcessor@77abba44
Oct 02, 2025 12:10:23 PM com.telamin.fluxtion.runtime.eventfeed.EventFeedToDataFlowPublisher addDataFlowReceiver
INFO: myFeed remaining dataflows: 1
Oct 02, 2025 12:10:23 PM com.telamin.fluxtion.runtime.connector.FileEventFeed onStart
INFO: start FileEventSource myFeed file:./tutorial4-input.txt tail:true once:false, commitRead:false latestRead:false readStrategy:EARLIEST
Oct 02, 2025 12:10:23 PM com.telamin.fluxtion.runtime.connector.FileEventFeed connectReader
INFO: Opened ./tutorial4-input.txt for reading offset 0
read file in:mon
write file out:MON

read file in:tue
write file out:TUE

read file in:wed
write file out:WED

See example MultiFeedJoinExample

import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MultiFeedJoinExample {
    public static void main(String[] args) {
        //stream of realtime machine temperatures grouped by machineId
        DataFlow currentMachineTemp = DataFlowBuilder
                .groupBy(MachineReadingEvent::id, MachineReadingEvent::temp);

        //create a stream of averaged machine sliding temps,
        //with a 4-second window and 1 second buckets grouped by machine id
        DataFlow avgMachineTemp = DataFlowBuilder
                .subscribe(MachineReadingEvent.class)
                .groupBySliding(
                    MachineReadingEvent::id,
                    MachineReadingEvent::temp,
                    DoubleAverageFlowFunction::new,
                    1000,4);

        //join machine profiles with contacts and then with readings.
        //Publish alarms with stateful user function
        DataFlow tempMonitor = DataFlowBuilder
                .groupBy(MachineProfileEvent::id)
                .mapValues(MachineState::new)
                .mapBi(DataFlowBuilder.groupBy(SupportContactEvent::locationCode), Helpers::addContact)
                .innerJoin(currentMachineTemp, MachineState::setCurrentTemperature)
                .innerJoin(avgMachineTemp, MachineState::setAvgTemperature)
                .publishTriggerOverride(FixedRateTrigger.atMillis(1_000))
                .filterValues(MachineState::outsideOperatingTemp)
                .map(GroupBy::toMap)
                .map(new AlarmDeltaFilter()::updateActiveAlarms)
                .filter(AlarmDeltaFilter::isChanged)
                .sink("alarmPublisher")
                .build();

        runSimulation(tempMonitor);
    }

    private static void runSimulation(DataFlow tempMonitor) {
        //any java.util.Consumer can be used as sink
        tempMonitor.addSink("alarmPublisher", Helpers::prettyPrintAlarms);

        //set up machine locations
        tempMonitor.onEvent(
            new MachineProfileEvent("server_GOOG", LocationCode.USA_EAST_1, 70, 48));
        tempMonitor.onEvent(
            new MachineProfileEvent("server_AMZN", LocationCode.USA_EAST_1, 99.999, 65));
        tempMonitor.onEvent(
            new MachineProfileEvent("server_MSFT", LocationCode.USA_EAST_2,92, 49.99));
        tempMonitor.onEvent(
            new MachineProfileEvent("server_TKM", LocationCode.USA_EAST_2,102, 50.0001));

        //set up support contacts
        tempMonitor.onEvent(
            new SupportContactEvent("Jean", LocationCode.USA_EAST_1, "jean@fluxtion.com"));
        tempMonitor.onEvent(
            new SupportContactEvent("Tandy", LocationCode.USA_EAST_2, "tandy@fluxtion.com"));

        //Send random MachineReadingEvent using `DataFlow.onEvent` 
        Random random = new Random();
        final String[] MACHINE_IDS = new String[]{
            "server_GOOG", "server_AMZN", "server_MSFT", "server_TKM"};

        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
                    String machineId = MACHINE_IDS[
                        random.nextInt(MACHINE_IDS.length)];

                    double temperatureReading = random.nextDouble() * 100;

                    tempMonitor.onEvent(
                        new MachineReadingEvent(machineId, temperatureReading));
                },
                10_000, 1, TimeUnit.MICROSECONDS);

        System.out.println("Simulation started - wait four seconds for first machine readings\n");
    }
}

console output:

Application started - wait four seconds for first machine readings

ALARM UPDATE 09:45:20.685
New alarms: ['server_GOOG@USA_EAST_1',  temp:'97.57', avgTemp:'50.07' SupportContactEvent[name=Jean, locationCode=USA_EAST_1, contactDetails=jean@fluxtion.com], 'server_TKM@USA_EAST_2',  temp:'5.15', avgTemp:'50.02' SupportContactEvent[name=Tandy, locationCode=USA_EAST_2, contactDetails=tandy@fluxtion.com], 'server_MSFT@USA_EAST_2',  temp:'46.72', avgTemp:'50.00' SupportContactEvent[name=Tandy, locationCode=USA_EAST_2, contactDetails=tandy@fluxtion.com]]
Alarms to clear[]
Current alarms[server_GOOG, server_TKM, server_MSFT]
------------------------------------

ALARM UPDATE 09:45:21.680
New alarms: []
Alarms to clear[server_TKM, server_MSFT]
Current alarms[server_GOOG]
------------------------------------

ALARM UPDATE 09:45:26.680
New alarms: ['server_MSFT@USA_EAST_2',  temp:'97.16', avgTemp:'49.91' SupportContactEvent[name=Tandy, locationCode=USA_EAST_2, contactDetails=tandy@fluxtion.com]]
Alarms to clear[]
Current alarms[server_GOOG, server_MSFT]
------------------------------------

Library dependencies

Maven (pom.xml):

<dependency>
  <groupId>com.telamin.fluxtion</groupId>
  <artifactId>fluxtion-builder</artifactId>
  <version>0.9.12</version>
</dependency>

Gradle (Kotlin DSL):

implementation("com.telamin.fluxtion:fluxtion-builder:0.9.12")

Start here

Reference