Skip to content

Example: Wiring File and In‑Memory Event Sources to a File Sink using MongooseServerConfig (Fluent API)

This guide shows how to:

  • Extend ObjectEventHandlerNode to build a simple processor that receives all events.
  • Configure two event sources: FileEventSource and InMemoryEventSource.
  • Configure a FileMessageSink as an output.
  • Bind everything together with the fluent MongooseServerConfig builder.

The processor receives events from both sources and writes them to a file sink.

1) Create a custom handler

Extend ObjectEventHandlerNode and inject the FileMessageSink using @ServiceRegistered, see BuilderApiExampleHandler. In handleEvent, forward the incoming event to the sink:

public class BuilderApiExampleHandler extends ObjectEventHandlerNode {

    private MessageSink fileSink;

    @ServiceRegistered
    public void wire(MessageSink fileSink, String name) {
        this.fileSink = fileSink;
    }

    @Override
    protected boolean handleEvent(Object event) {
        if (fileSink != null && event != null) {
            fileSink.accept(event.toString());
        }
        return true;
    }
}

Notes:

  • FileMessageSink extends AbstractMessageSink and provides accept(value) for publishing.
  • The sink is injected automatically by the server when registered as a service.

2) Configure sources, processor, and sink via MongooseServerConfig

Use the builder APIs for EventFeedConfig, EventProcessorGroupConfig, and register the sink as a service.

Path inputFile = Paths.get("/path/to/input/events.txt");
Path outputFile = Paths.get("/path/to/output/out.log");

FileMessageSink fileSink = new FileMessageSink();
fileSink.setFilename(outputFile.toString());

FileEventSource fileSource = new FileEventSource();
fileSource.setFilename(inputFile.toString());
fileSource.setCacheEventLog(true);

InMemoryEventSource<String> inMemSource = new InMemoryEventSource<>();
inMemSource.setCacheEventLog(true);

EventProcessorGroupConfig processorGroup = EventProcessorGroupConfig.builder()
        .agentName("processor-agent")
        .put("example-processor", new EventProcessorConfig(new BuilderApiExampleHandler()))
        .build();

EventFeedConfig<?> fileFeedCfg = EventFeedConfig.builder()
        .instance(fileSource)
        .name("fileFeed")
        .broadcast(true)
        .agent("file-source-agent", new BusySpinIdleStrategy())
        .build();

EventFeedConfig<?> memFeedCfg = EventFeedConfig.builder()
        .instance(inMemSource)
        .name("inMemFeed")
        .broadcast(true)
        .agent("memory-source-agent", new BusySpinIdleStrategy())
        .build();

EventSinkConfig<FileMessageSink> sinkCfg = EventSinkConfig.<FileMessageSink>builder()
        .instance(fileSink)
        .name("fileSink")
        .build();

MongooseServerConfig mongooseServerConfig = MongooseServerConfig.builder()
        .addProcessorGroup(processorGroup)
        .addEventFeed(fileFeedCfg)
        .addEventFeed(memFeedCfg)
        .addEventSink(sinkCfg)
        .build();

3) Boot the server and send events

MongooseServer server = MongooseServer.bootServer(mongooseServerConfig, rec -> {});

try {
    // Stimulate sources: write to input file and offer memory events
    Files.writeString(inputFile, "file-1\nfile-2\n", StandardCharsets.UTF_8);

    // Access the in-memory source via registered services
    Map<Service<?>> services = server.registeredServices();
    InMemoryEventSource<String> registeredMem = services.get("inMemFeed").instance();
    registeredMem.offer("mem-1");
    registeredMem.offer("mem-2");

    // Allow agents to process. Spin-wait up to a few seconds for output lines.
    List<String> lines = waitForLines(outputFile, 4, 5, TimeUnit.SECONDS);
    Assertions.assertTrue(
            lines.containsAll(List.of("file-1", "file-2", "mem-1", "mem-2")),
            () -> "Missing expected lines in sink: " + lines);
} finally {
    server.stop();
}

The example test BuilderApiFluentExampleTest demonstrates the complete flow and asserts that the sink contains these events from both sources:

  • file-1, file-2 (from FileEventSource)
  • mem-1, mem-2 (from InMemoryEventSource)

Tips

  • FileEventSource supports caching and replay across startComplete. Using setCacheEventLog(true) helps capture pre-start data.
  • InMemoryEventSource supports offer(item) and respects caching similarly.
  • You can register sinks either via EventSinkConfig (when your sink type matches its generic bound) or simply as a Service using ServiceConfig.