Tutorial Part‑4 — Package and embed in a microservice¶
In this tutorial you will:
- Embed a DataFlow in a tiny microservice.
- Add basic logging, a health check endpoint, and simple metrics hooks.
- Package and run it as a plain Java app.
Source reference in examples repository: TutorialPart4
Prerequisites¶
- JDK 21+
- Maven (wrapper provided) or JBang for a quick run
What we’ll build¶
- A simple service that receives synthetic events on a scheduler, computes a rolling metric, logs outputs, and exposes:
- GET /health — reports ready
- GET /metrics — returns a few counters in text
Option A — Run with JBang (single file demo)¶
- Create a file TutorialPart4.java with the code below.
vi TutorialPart4.java
jbang TutorialPart4.java
//DEPS com.telamin.fluxtion:fluxtion-builder:0.9.12
//DEPS org.slf4j:slf4j-simple:2.0.16
//JAVA 25
import com.sun.net.httpserver.HttpServer;
import com.telamin.fluxtion.builder.DataFlowBuilder;
import com.telamin.fluxtion.builder.flowfunction.FlowBuilder;
import com.telamin.fluxtion.runtime.DataFlow;
import com.telamin.fluxtion.runtime.flowfunction.aggregate.function.primitive.IntAverageFlowFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class TutorialPart4 {
private static final Logger LOG = LoggerFactory.getLogger(TutorialPart4.class);
record Request(int latencyMs) {
}
public static void main(String[] args) throws Exception {
LOG.info("Starting microservice with embedded DataFlow");
// Metrics
AtomicLong eventsIn = new AtomicLong();
AtomicLong alertsOut = new AtomicLong();
AtomicLong avgLatency = new AtomicLong();
// Build a DataFlow computing rolling average latency
// 5s window, 1s buckets
DataFlow flow = DataFlowBuilder
.subscribe(Request.class)
.map(Request::latencyMs)
.slidingAggregate(IntAverageFlowFunction::new, 1000, 5)
.sink("avgLatency")
.map(avg -> avg > 250 ? "ALERT: high avg latency " + avg + "ms" : "data:" + avg + "ms")
.sink("alerts")
.build();
// Wire sinks to logging/metrics
flow.addSink("avgLatency", (Number avg) -> {
avgLatency.set(avg.longValue());
LOG.info("avgLatency={}ms", avg);
});
flow.addSink("alerts", (String msg) -> {
alertsOut.incrementAndGet();
LOG.warn("{}", msg);
});
// Start a tiny HTTP server (health + metrics)
HttpServer server = httpServer(8080, eventsIn, alertsOut, avgLatency);
server.start();
LOG.info("HTTP server started on http://localhost:8080");
// Drive synthetic requests
var exec = Executors.newSingleThreadScheduledExecutor();
var rnd = new Random();
exec.scheduleAtFixedRate(() -> {
int latency = 100 + rnd.nextInt(300); // 100..399ms
eventsIn.incrementAndGet();
flow.onEvent(new Request(latency));
}, 100, 200, TimeUnit.MILLISECONDS);
LOG.info("Service running. Try: curl -s localhost:8080/health | jq, curl -s localhost:8080/metrics");
}
private static HttpServer httpServer(int port, AtomicLong in, AtomicLong alerts, AtomicLong avg) throws IOException {
HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
server.createContext("/health", exchange -> {
String body = "{\"status\":\"UP\",\"time\":\"" + Instant.now() + "\"}";
byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json");
exchange.sendResponseHeaders(200, bytes.length);
try (OutputStream os = exchange.getResponseBody()) {
os.write(bytes);
}
});
server.createContext("/metrics", exchange -> {
String body = "events_in " + in.get() + "\n" +
"alerts_out " + alerts.get() + "\n" +
"avg_latency_ms " + avg.get() + "\n";
byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
exchange.getResponseHeaders().add("Content-Type", "text/plain; version=0.0.4");
exchange.sendResponseHeaders(200, bytes.length);
try (OutputStream os = exchange.getResponseBody()) {
os.write(bytes);
}
});
return server;
}
}
Option B — Maven project¶
- Add dependencies and a main class similar to the above. Recommended POM fragments:
<dependency>
<groupId>com.telamin.fluxtion</groupId>
<artifactId>fluxtion-builder</artifactId>
<version>0.9.12</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.16</version>
<scope>runtime</scope>
</dependency>
- To create a single runnable JAR, you can use Maven shade or your preferred packager. Example (optional):
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.5.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.example.TutorialPart4</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
What you should see¶
- Print metrics and health check endpoints.
- Console logs for avg latency and any high‑latency alerts
fluxtion-exmples % jbang TutorialPart4.java
[jbang] Resolving dependencies...
[jbang] com.telamin.fluxtion:fluxtion-builder:0.9.12
[jbang] org.slf4j:slf4j-simple:2.0.16
[jbang] Dependencies resolved
[jbang] Building jar for TutorialPart4.java...
[main] INFO TutorialPart4 - Starting microservice with embedded DataFlow
[main] INFO TutorialPart4 - HTTP server started on http://localhost:8080
[main] INFO TutorialPart4 - Service running. Try: curl -s localhost:8080/health | jq, curl -s localhost:8080/metrics
[pool-1-thread-1] INFO TutorialPart4 - avgLatency=270ms
[pool-1-thread-1] WARN TutorialPart4 - ALERT: high avg latency 270ms
How to verify¶
Start a new terminal and try to query the service REST endpoints with curl:
Health check¶
REST query:
curl -s localhost:8080/health
REST response:
{"status":"UP","time":"2025-09-27T08:46:25.798373Z"}
Metrics:¶
REST query:
curl -s localhost:8080/metrics
REST response:
events_in 541
alerts_out 104
avg_latency_ms 276
Key ideas reinforced¶
- Fluxtion is an embeddable library: no external server required.
- Sinks are a natural way to hook into logging and metrics.
- Simple HTTP endpoints can be added with JDK HttpServer; use your framework of choice in real services.
Where to next¶
- Explore interpreted vs compiled graphs in Concepts and architecture.
- Add your own sinks for Prometheus, OpenTelemetry, or your logging framework.
- Repackage the flow as an AOT compiled graph for lower latency on critical paths.