Skip to content

How-to: Event-driven ML inference with Fluxtion (features, calibration, MLR)

This guide shows how to build a simple, low-latency, event-driven inference pipeline using Fluxtion’s ML primitives. You will learn how to:

  • Define Features from events or flows (including lambdas)
  • Build a PredictiveLinearRegressionModel (supports multiple features)
  • Add an intercept (bias) using ConstantFeature
  • Calibrate features at runtime (by identifier or class)
  • Reset calibrations to one/zero and see immediate effects
  • Handle out-of-order calibration updates via versioning

The examples match the code in the fluxtion-runtime ML package and the generator-core tests.

Concepts and APIs

  • Feature: a model input with an identifier, friendly name, and current contribution value.
  • Subclass AbstractFeature for event-driven features.
  • Use FlowSuppliedFeature or PropertyToFeature to bind features to a FlowSupplier.
  • PredictiveLinearRegressionModel: sums Feature.value() across all features to produce predictedValue().
  • Calibration: a runtime message that updates a feature’s coefficient and weight.
  • Target by featureIdentifier (string) or by featureClass (defaults to class simple name).
  • Includes featureVersion: higher or equal versions are applied; lower versions are ignored.
  • CalibrationProcessor: service interface used to setCalibration, resetToOne, and resetToZero.
  • ConstantFeature: a built-in constant-1 feature, acting as model intercept.

Mermaid overview of the data path and calibration control plane:


flowchart LR
    subgraph DataFlow - inference path
      E[Event stream] --> F1[Feature: Area]
      E --> F2[Feature: Distance]
      CF[ConstantFeature - bias] --> M[PredictiveLinearRegressionModel]
      F1 --> M
      F2 --> M
      M --> O[predictedValue]
    end

    subgraph Control - calibration
      C[CalibrationProcessor.setCalibration - list of Calibration] --> F1
      C --> F2
      C --> CF
      C --> M
      R1[resetToOne or resetToZero] --> F1
      R1 --> F2
      R1 --> CF
      R1 --> M
    end

Notes:

  • Features recompute their contribution immediately when they receive a calibration or reset (no new data needed).
  • The model recomputes its sum when calibrations/resets are applied, so predictedValue() updates right away.

Minimal model with one feature

import com.telamin.fluxtion.Fluxtion;
import com.telamin.fluxtion.runtime.DataFlow;
import com.telamin.fluxtion.runtime.ml.PredictiveLinearRegressionModel;
import com.telamin.fluxtion.runtime.ml.AbstractFeature;
import com.telamin.fluxtion.runtime.annotations.OnEventHandler;

// Build the DataFlow using Fluxtion.compile(...)
DataFlow sep = Fluxtion.compile(c -> 
        c.addNode(new PredictiveLinearRegressionModel(new AreaFeature()), "predictiveModel"));

// Event-driven feature using AbstractFeature
public static class AreaFeature extends AbstractFeature {
    @OnEventHandler
    public boolean onHouse(HouseDetails house) {
        // Set the raw signal; AbstractFeature maintains contribution as raw * coefficient * weight
        return updateFromRaw(house.getArea());
    }
}

Calibrate by class (uses default identifier AreaFeature):

import com.telamin.fluxtion.runtime.ml.CalibrationProcessor;
import com.telamin.fluxtion.runtime.ml.Calibration;
import java.util.List;

sep.getExportedService(CalibrationProcessor.class).setCalibration(List.of(
    Calibration.builder()
        .featureClass(AreaFeature.class)
        .co_efficient(1.5)
        .weight(2)
        .featureVersion(1)
        .build()
));

Features from a FlowSupplier (lambda extractors)

import com.telamin.fluxtion.Fluxtion;
import com.telamin.fluxtion.runtime.DataFlow;
import com.telamin.fluxtion.builder.DataFlowBuilder;
import com.telamin.fluxtion.runtime.flowfunction.FlowSupplier;
import com.telamin.fluxtion.runtime.ml.*;

DataFlow sep = Fluxtion.compile(c -> {
    FlowSupplier<HouseDetails> flow = DataFlowBuilder.subscribe(HouseDetails.class).flowSupplier();
    c.addNode(new PredictiveLinearRegressionModel(
            PropertyToFeature.build("area", flow, HouseDetails::getArea),
            PropertyToFeature.build("distance", flow, HouseDetails::getDistance)
    ), "predictiveModel");
});

Calibrate by explicit identifier:

sep.getExportedService(CalibrationProcessor.class).setCalibration(List.of(
    Calibration.builder().featureIdentifier("area").co_efficient(2).weight(1).featureVersion(3).build(),
    Calibration.builder().featureIdentifier("distance").co_efficient(1).weight(1).featureVersion(3).build()
));

Adding an intercept (bias)

Add a ConstantFeature to the model and set its coefficient to the desired bias value:

import com.telamin.fluxtion.Fluxtion;
import com.telamin.fluxtion.runtime.DataFlow;
import com.telamin.fluxtion.builder.DataFlowBuilder;
import com.telamin.fluxtion.runtime.flowfunction.FlowSupplier;
import com.telamin.fluxtion.runtime.ml.*;

DataFlow sep = Fluxtion.compile(c -> {
    FlowSupplier<HouseDetails> flow = DataFlowBuilder.subscribe(HouseDetails.class).flowSupplier();
    c.addNode(new PredictiveLinearRegressionModel(
            new ConstantFeature(),
            PropertyToFeature.build("area", flow, HouseDetails::getArea)
    ), "predictiveModel");
});

sep.getExportedService(CalibrationProcessor.class).setCalibration(List.of(
    Calibration.builder().featureClass(ConstantFeature.class).co_efficient(3.0).weight(1.0).featureVersion(1).build()
));
// predictedValue() will be 3.0 until data arrives; then bias + sum(features)

Resets and immediate effects

Use resetToZero to disable all features (including bias), and resetToOne to restore neutral scaling:

CalibrationProcessor svc = sep.getExportedService(CalibrationProcessor.class);
svc.resetToZero();   // predictedValue() updates immediately
svc.resetToOne();    // predictedValue() updates immediately using last observed raw values

Implementation details:

  • AbstractFeature stores the last raw signal, recalculating contribution on calibration/reset.
  • If a feature hasn’t seen data yet, resetToZero sets contribution to 0; later resets will recompute when raw becomes available.

Versioning and out-of-order updates

Features ignore calibrations that have a lower featureVersion than the last applied value. This guards against out-of-order delivery. Equality is accepted (idempotent reapply).

// Apply version 10
svc.setCalibration(List.of(Calibration.builder().featureIdentifier("area").co_efficient(2).weight(1).featureVersion(10).build()));
// Out-of-order older update -> ignored
svc.setCalibration(List.of(Calibration.builder().featureIdentifier("area").co_efficient(100).weight(100).featureVersion(9).build()));

Full example

import com.telamin.fluxtion.Fluxtion;
import com.telamin.fluxtion.runtime.DataFlow;
import com.telamin.fluxtion.builder.DataFlowBuilder;
import com.telamin.fluxtion.runtime.flowfunction.FlowSupplier;
import com.telamin.fluxtion.runtime.ml.*;

DataFlow sep = Fluxtion.compile(c -> {
    FlowSupplier<HouseDetails> flow = DataFlowBuilder.subscribe(HouseDetails.class).flowSupplier();
    c.addNode(new PredictiveLinearRegressionModel(
            new ConstantFeature(),
            PropertyToFeature.build("area", flow, HouseDetails::getArea),
            PropertyToFeature.build("distance", flow, HouseDetails::getDistance)
    ), "predictiveModel");
});

PredictiveModel predictiveModel = sep.getNodeById("predictiveModel");
CalibrationProcessor svc = sep.getExportedService(CalibrationProcessor.class);

svc.setCalibration(List.of(
    Calibration.builder().featureClass(ConstantFeature.class).co_efficient(5).weight(1).featureVersion(1).build(),
    Calibration.builder().featureIdentifier("area").co_efficient(1.5).weight(2).featureVersion(1).build(),
    Calibration.builder().featureIdentifier("distance").co_efficient(1).weight(1).featureVersion(1).build()
));

// Before data -> bias only
assert predictiveModel.predictedValue() == 5;

sep.onEvent(new HouseDetails(10, 2));
// Prediction: 5 + (10*1.5*2) + (2*1*1) = 5 + 30 + 2 = 37
assert predictiveModel.predictedValue() == 37;

svc.resetToZero(); // -> 0
svc.resetToOne();  // -> uses last raw values: 1*10 + 1*2 + 1*1 = 13

Tips

  • Choose clear feature identifiers when using lambda-based features to simplify calibration.
  • Use ConstantFeature for an intercept; you can also define multiple constants for step-changes or regime flags.
  • Keep coefficient as the trained parameter and use weight for operational calibration if you want separation of concerns.

Acting on inference: wiring application actions

Often you want to take action based on the prediction (e.g., emit an alert, write to a sink, or drive a control loop). You can add an application node that depends on the model and triggers when the model updates.

Mermaid sketch:

flowchart LR
    subgraph DataFlow - inference path
      E[Event stream] --> F1[Feature area]
      E --> F2[Feature distance]
      CF[ConstantFeature bias] --> M[PredictiveLinearRegressionModel]
      F1 --> M
      F2 --> M
      M --> A[Action node]
      A --> S[Alert sink]
    end

Example node taking action above a threshold:

import com.telamin.fluxtion.runtime.annotations.OnTrigger;
import com.telamin.fluxtion.runtime.annotations.builder.AssignToField;
import com.telamin.fluxtion.runtime.ml.PredictiveModel;
import java.util.function.DoubleConsumer;

public final class AlertOnThreshold {

    private final PredictiveModel model;
    private final double threshold;
    private final DoubleConsumer onAlert;

    public AlertOnThreshold(
            @AssignToField("model") PredictiveModel model,
            @AssignToField("threshold") double threshold,
            @AssignToField("onAlert") DoubleConsumer onAlert) {
        this.model = model;
        this.threshold = threshold;
        this.onAlert = onAlert;
    }

    // Triggers whenever upstream dependencies (the model) update
    @OnTrigger
    public boolean onPredictionUpdated() {
        double y = model.predictedValue();
        if (!Double.isNaN(y) && y >= threshold) {
            onAlert.accept(y);
        }
        // No need to propagate further by default
        return false;
    }
}

Wiring it into your graph:

import com.telamin.fluxtion.Fluxtion;
import com.telamin.fluxtion.runtime.DataFlow;
import com.telamin.fluxtion.builder.DataFlowBuilder;
import com.telamin.fluxtion.runtime.flowfunction.FlowSupplier;
import com.telamin.fluxtion.runtime.ml.*;

DataFlow sep = Fluxtion.compile(c -> {
    FlowSupplier<HouseDetails> flow = DataFlowBuilder.subscribe(HouseDetails.class).flowSupplier();
    PredictiveLinearRegressionModel m = new PredictiveLinearRegressionModel(
            new ConstantFeature(),
            PropertyToFeature.build("area", flow, HouseDetails::getArea)
    );
    c.addNode(m, "model");
    c.addNode(new AlertOnThreshold(m, 30.0, v -> System.out.println("ALERT: prediction=" + v)), "alert");
});

Notes:

  • The action node declares a dependency on the model by holding a reference; Fluxtion wires the dependency so that the action node's @OnTrigger method is called after the model updates.
  • Replace the DoubleConsumer with your own sink (Kafka producer, database writer, actuator control, etc.).