How-to: Fan out one sink write to many targets with FanOutSink¶
FanOutSink is a policy layer over N downstream sinks. A processor
writes once to the fan-out; the fan-out then forwards that write to
every configured target, governed by per-target failure / retry /
circuit-breaker policy.
It lives in mongoose-plugins/library/lib-fanout (artifact
com.telamin:lib-fanout).
When to use it¶
- Compliance: every business write must also land in an audit sink. Operator-controlled, not application-controlled.
- Cross-tier deployment: send to a fast in-memory sink AND a durable file sink for replay-on-restart.
- Migration: send to old + new sinks in parallel, compare, cut over when confidence is high.
- Best-effort fan-out: one sink is critical, the rest are observability. The CONTINUE policy and the circuit-breaker together isolate the noisy targets from the critical path.
Quick start¶
// Three downstream sinks named in server config — could be file,
// kafka, multicast, anything that registers a Service<MessageSink>.
FanOutSink fan = new FanOutSink();
fan.setTargetSinkNames(List.of("audit-file", "kafka-out", "metrics-sink"));
fan.setFailurePolicy(FanOutSink.FailurePolicy.CONTINUE);
fan.setCircuitOpenThreshold(5);
fan.setCircuitOpenMillis(30_000);
EventSinkConfig<?> fanCfg = EventSinkConfig.builder()
        .instance(fan)
        .name("trade-fanout") // processors write to this name
        .build();
MongooseServerConfig.builder()
        .addEventSink(fanCfg)
        // … target sinks declared separately, by their own names …
        .build();
The processor consumes the fan-out sink like any other sink — no awareness that it's a fan-out:
@ServiceRegistered
public void wire(MessageSink<Trade> sink, String name) {
    if ("trade-fanout".equals(name)) this.sink = sink;
}

@OnEventHandler
public boolean onTrade(Trade t) {
    sink.accept(t); // FanOutSink delivers this to audit-file +
                    // kafka-out + metrics-sink behind the scenes
    return true;
}
Failure policies¶
FanOutSink.FailurePolicy controls how the fan-out reacts when one
target's accept throws:
| Policy | Behaviour |
|---|---|
| CONTINUE (default) | Log + continue with remaining targets. The fan-out itself never throws. Right for "one of these targets is critical, the rest are observability." |
| FAIL_FAST | Propagate the first exception. Right for "every target must succeed for this write to count." |
| RETRY_THEN_DROP | Retry the failing target N times (configurable via retryAttempts), then drop + log. Other targets receive normally. |
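The three policies can be pictured as branches of one dispatch loop. The following is a minimal sketch under stated assumptions, not the lib-fanout source: `FailurePolicySketch`, its `Policy` enum, the `log` list, and the use of `Consumer<Object>` in place of MessageSink are all hypothetical stand-ins. It also assumes that FAIL_FAST stops at the failing target, which the table does not spell out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical stand-in for the fan-out dispatch loop; not the library's code. */
class FailurePolicySketch {

    enum Policy { CONTINUE, FAIL_FAST, RETRY_THEN_DROP }

    private final Map<String, Consumer<Object>> targets = new LinkedHashMap<>();
    private final Policy policy;
    private final int retryAttempts;
    final List<String> log = new ArrayList<>(); // stands in for a real logger

    FailurePolicySketch(Policy policy, int retryAttempts) {
        this.policy = policy;
        this.retryAttempts = retryAttempts;
    }

    void addTarget(String name, Consumer<Object> sink) {
        targets.put(name, sink);
    }

    /** One write in, one forward per target, in registration order. */
    void accept(Object value) {
        for (Map.Entry<String, Consumer<Object>> e : targets.entrySet()) {
            try {
                e.getValue().accept(value);
            } catch (RuntimeException first) {
                switch (policy) {
                    case FAIL_FAST:
                        // Assumption: propagation ends the loop, so later targets are not tried.
                        throw first;
                    case CONTINUE:
                        log.add("failed: " + e.getKey());
                        break;
                    case RETRY_THEN_DROP:
                        if (!retry(e.getValue(), value)) {
                            log.add("dropped: " + e.getKey());
                        }
                        break;
                }
            }
        }
    }

    /** Re-attempts one target up to retryAttempts times; true on the first success. */
    private boolean retry(Consumer<Object> sink, Object value) {
        for (int i = 0; i < retryAttempts; i++) {
            try {
                sink.accept(value);
                return true;
            } catch (RuntimeException ignored) {
                // fall through to the next attempt
            }
        }
        return false;
    }
}
```

In this sketch only FAIL_FAST lets an exception reach the caller of `accept`; the other two policies absorb the failure and move on to the next target.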
Circuit-breaker¶
After circuitOpenThreshold consecutive failures on a target, the
fan-out stops attempting that target for circuitOpenMillis. Once
the window elapses, the next write attempts it again — success
resets the counter, failure refreshes the open window. Set
circuitOpenThreshold = 0 to disable (target always tried).
This keeps one degraded target from slowing the fan-out hot path.
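The open/retry cycle described above can be sketched as a small per-target state holder. This is an illustration only: `TargetCircuitSketch` and its method names are hypothetical, and the injectable clock is an assumption made so the window can be exercised without sleeping.

```java
import java.util.function.LongSupplier;

/** Hypothetical per-target breaker state; mirrors the prose above, not the library's code. */
class TargetCircuitSketch {
    private final int openThreshold;  // 0 disables the breaker
    private final long openMillis;    // how long the target is skipped once open
    private final LongSupplier clock; // e.g. System::currentTimeMillis
    private int consecutiveFailures;
    private long openUntil;           // 0 means closed

    TargetCircuitSketch(int openThreshold, long openMillis, LongSupplier clock) {
        this.openThreshold = openThreshold;
        this.openMillis = openMillis;
        this.clock = clock;
    }

    /** True when the target should be attempted for this write. */
    boolean shouldAttempt() {
        return openThreshold == 0 || clock.getAsLong() >= openUntil;
    }

    /** A successful delivery resets the counter and closes the circuit. */
    void onSuccess() {
        consecutiveFailures = 0;
        openUntil = 0;
    }

    /** A failure at or past the threshold opens the circuit, or refreshes the window. */
    void onFailure() {
        consecutiveFailures++;
        if (openThreshold > 0 && consecutiveFailures >= openThreshold) {
            openUntil = clock.getAsLong() + openMillis;
        }
    }
}
```

A dispatch loop would call `shouldAttempt()` before each forward and then report the outcome through `onSuccess()` or `onFailure()`, so a skipped target costs one comparison rather than a blocked call.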
Discovery + binding¶
Target sinks are discovered by name via
@ServiceRegistered MessageSink onTargetSink(MessageSink, String name) —
the standard Mongoose injection pattern. Order doesn't matter; bind
state is per-target. A target registered after boot (via the 1.0.18
runtime-add broadcast) binds automatically on first accept.
Unbound targets (configured in targetSinkNames but with no service yet
registered under that name) are skipped silently; the fan-out
doesn't throw. An admin can call FanOutSink.targetHealthSnapshot()
to inspect each target's bind state, delivery count, and circuit
state.
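Name-based binding with silent skipping can be illustrated in a few lines. This is a sketch under assumptions, not the real FanOutSink: `TargetBindingSketch`, `isBound`, and the delivery count returned by `accept` are hypothetical, and `Consumer<Object>` stands in for MessageSink.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical name-based binder; Consumer<Object> stands in for MessageSink. */
class TargetBindingSketch {
    private final List<String> configured;
    private final Map<String, Consumer<Object>> bound = new HashMap<>();

    TargetBindingSketch(List<String> targetSinkNames) {
        this.configured = List.copyOf(targetSinkNames);
    }

    /** Called once per registered sink service, in any order, at boot or later. */
    void onTargetSink(Consumer<Object> sink, String name) {
        if (configured.contains(name)) { // names outside targetSinkNames are ignored
            bound.put(name, sink);
        }
    }

    /** Forwards to every bound target; returns how many received the value. */
    int accept(Object value) {
        int delivered = 0;
        for (String name : configured) {
            Consumer<Object> sink = bound.get(name);
            if (sink == null) {
                continue; // unbound target: skipped silently, no exception
            }
            sink.accept(value);
            delivered++;
        }
        return delivered;
    }

    boolean isBound(String name) {
        return bound.containsKey(name);
    }
}
```

Because the lookup happens on every `accept`, a target that registers late simply starts receiving from the next write onward.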
YAML equivalent¶
eventSinks:
  - name: trade-fanout
    instance: !!com.telamin.mongoose.plugin.lib.fanout.FanOutSink
      targetSinkNames: [ audit-file, kafka-out, metrics-sink ]
      failurePolicy: CONTINUE
      circuitOpenThreshold: 5
      circuitOpenMillis: 30000
  - name: audit-file
    instance: !!com.telamin.mongoose.connector.file.FileMessageSink
      filename: trades-audit.log
  - name: kafka-out
    instance: !!com.example.KafkaTradeSink { brokers: ... }
  - name: metrics-sink
    instance: !!com.example.MetricsSink {}
What FanOutSink isn't¶
- Not a router — it sends every value to every target. For conditional routing (this trade to A, that trade to B) use a processor with multiple sink injections.
- Not a load-balancer — every target receives every value. Use Aeron MDC or a transport-level balancer for that.
- Not a multiplexer onto one wire — that's a sink implementation decision (e.g. multicast does it natively).
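For contrast with the first bullet, conditional routing in a processor might look like the sketch below. Everything here is hypothetical: the class, the sink names "equities-out" and "fx-out", the "FX:" prefix rule, and the use of `Consumer<String>` in place of MessageSink<Trade>. In Mongoose the `wire` method would carry @ServiceRegistered as in the quick start.

```java
import java.util.function.Consumer;

/** Hypothetical processor holding two sinks and choosing one per trade. */
class ConditionalRoutingSketch {
    private Consumer<String> equitiesSink;
    private Consumer<String> fxSink;

    // Stand-in for an @ServiceRegistered callback: one call per registered sink.
    void wire(Consumer<String> sink, String name) {
        if ("equities-out".equals(name)) equitiesSink = sink;
        if ("fx-out".equals(name)) fxSink = sink;
    }

    // Each trade goes to exactly one sink, chosen by a rule the processor owns.
    boolean onTrade(String trade) {
        Consumer<String> target = trade.startsWith("FX:") ? fxSink : equitiesSink;
        if (target != null) {
            target.accept(trade);
        }
        return true;
    }
}
```

The routing rule lives in application code here, whereas a fan-out applies no rule at all and forwards every value to every target.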
Tests + reference¶
- Unit tests: mongoose-plugins/library/lib-fanout/src/test/java/.../FanOutSinkTest.java (9 cases: round-trip, CONTINUE policy, FAIL_FAST, circuit-breaker open + skip, retry-then-drop, late-arriving target, health snapshot, unconfigured-name filter)