Getting started: Stream programming with Fluxtion DataFlow¶
A runnable example of this tutorial here: Stream programming tutorial.
For all examples, see: mongoose-examples repository.
This guide shows how to build dataflow pipelines that connect feeds (sources) to functional logic and sinks (outputs) with minimal infrastructure code. The programming model mirrors Java Streams: you compose transformations, filters, and aggregates, then connect to inbound feeds and outbound sinks. Native support for Fluxtion DataFlow in Mongoose makes integration straightforward.
What you’ll do:
- Define a small functional pipeline using DataFlow operators (map, filter, window/aggregate)
- Connect the pipeline to named feeds and sinks provided by Mongoose
- Run the flow end‑to‑end and observe results
Focus: declarative, stream‑style logic; operational wiring (agents, threads, IO) is configured for you.
What we’ll build¶
- One in‑memory source producing strings on a feed named "prices"
- Stream logic: subscribe to the feed, map to upper-case, and print to console
- Output: console logging
Components and event flow¶
flowchart TD
subgraph "Inbound feed agent"
prices[prices feed]
end
subgraph "Processor agent"
DF[DataFlow pipeline<br>subscribe 'prices'<br> -> map toUpperCase<br> -> console]
end
prices --> DF
style DF text-align:left
End‑to‑end runnable code (see mongoose-examples repository):
- Main application: StreamProgrammingTutorial.java
1) Write the stream pipeline (functional logic)¶
The example uses Fluxtion DataFlow’s builder to subscribe to a named feed and compose operators inline.
Snippet from the example main:
import com.fluxtion.compiler.builder.dataflow.DataFlow;
import com.fluxtion.runtime.EventProcessor;
// build data stream processor subscribing to prices feed,
// with a simple map function and console output
EventProcessor<?> processor = (EventProcessor) DataFlow
.subscribeToFeed("prices", String.class)
.map(String::toUpperCase)
.console("Hello, {}")
.build();
Key points:
- DataFlow.subscribeToFeed("prices", String.class) subscribes to the named feed.
- map(String::toUpperCase) transforms the payload.
- console("Hello, {}") prints formatted output.
2) Provide a feed and boot the server¶
The example source:
public static void main(String[] args) {
// build data stream processor subscribing to prices feed,
// with a simple map function and console output
var processor = (EventProcessor) DataFlow
.subscribeToFeed("prices", String.class)
.map(String::toUpperCase)
.console("Hello, {}")
.build();
var eventProcessorConfig = EventProcessorConfig.builder()
.name("filter-processor")
.handler(processor)
.build();
// Build EventFeed configs with name: 'prices'
var prices = new InMemoryEventSource<>();
var pricesFeed = EventFeedConfig.builder()
.instance(prices)
.name("prices")
.wrapWithNamedEvent(true)
.agent("prices-agent", new BusySpinIdleStrategy())
.build();
// build server config
var mongooseServerConfig = MongooseServerConfig.builder()
.addProcessor("processor-agent", eventProcessorConfig)
.addEventFeed(pricesFeed)
.build();
//boot server
MongooseServer.bootServer(mongooseServerConfig);
//send some data
prices.offer("World!!");
}
Notes:
- wrapWithNamedEvent(true) aligns the feed’s naming with DataFlow.subscribeToFeed("prices", ...).
- The example builds the EventProcessor via DataFlow, then passes it as the handler to EventProcessorConfig.
3) Run and verify¶
Publish an event into the feed and observe console output.
// After booting the server and having a reference to prices feed:
prices.offer("World!!");
// Console output will include:
// Hello, WORLD!!
Why stream programming with Mongoose?¶
- Minimal plumbing: focus on map/filter/window/join logic; Mongoose wires feeds, sinks, and scheduling.
- Java‑stream‑like fluency: readable functional pipelines, but with event‑driven semantics.
- Native Fluxtion DataFlow: the pipeline compiles into an efficient event processor — no reflective overhead on hot path.
- Composable: add more nodes (timers, stateful ops, joins) as your flow grows.
- Portable: swap in Kafka/file/HTTP sources and sinks without touching the processing logic.
Next steps¶
- Explore the 5‑minute tutorials for event handlers and YAML configuration:
- five-minute-event-handler-tutorial.md
- five-minute-yaml-configuration-tutorial.md
- Browse more DataFlow operators in Fluxtion Runtime documentation.
- Try replacing the in‑memory sources/sinks with file or Kafka adapters in configuration.