How-to: Use HandlerPipe for in-VM communication between handlers¶
HandlerPipe is a lightweight, in-VM pipe for sending messages from one handler (or service) to other handlers via Mongoose's event flow, without external IO.
It couples: - a publish-side MessageSink (sink()) that you call to send data, and - a receive-side InMemoryEventSource (getSource()) that integrates with the event flow, allowing processors to subscribe.
When to use¶
- You want handlers to talk to each other inside the same JVM without setting up external transports.
- You want lifecycle-aware dispatch: cache events before startComplete and replay them once the system is ready.
- You want to reuse Mongoose’s subscription, wrapping, and data-mapping features.
Quick start¶
// Create a pipe for a logical feed name
HandlerPipe<String> pipe = HandlerPipe.<String>of("ordersFeed").cacheEventLog(true);
// Wire the receive-side into your server configuration (pseudo-code):
MongooseServerConfig cfg = new MongooseServerConfig();
cfg.addService("ordersFeedService", pipe.getSource());
// In your processor(s), subscribe to the source by service name
pipe.getSource().subscribe(); // typically invoked during composition/registration
// Publish from anywhere within the JVM
pipe.sink().accept("order-123-created");
Notes: - cacheEventLog(true) will cache any events published before startComplete and replay them automatically when the source calls startComplete(). - You can customize data mapping and wrapping like other EventSource services.
Sample code¶
- Processor source: HandlerPipeServerBootExample.java
- Test node: HandlerPipeTest.java
- Complete how-to example: HandlerPipe Example
Lifecycle semantics¶
HandlerPipe delegates lifecycle to InMemoryEventSource: - start(): If cacheEventLog is true, publishes are cached (not dispatched). - startComplete(): Cached events are replayed to subscribers, subsequent publishes dispatch immediately.
You can still push items before start() using pipe.sink().accept(...); they will be cached if caching is enabled and replayed later.
Controlling wrapping and mapping¶
-
Wrapping: choose how events are wrapped for subscribers.
HandlerPipe<String> pipe = HandlerPipe.of("myFeed", EventSource.EventWrapStrategy.SUBSCRIPTION_NOWRAP);
-
Data mapping: transform outgoing items before dispatch.
pipe.dataMapper((String s) -> s.toUpperCase());
Testing and local observation¶
To observe dispatches without a full server, attach a queue to the publisher used by the source. In tests, we use EventToQueuePublisher and a OneToOneConcurrentArrayQueue:
HandlerPipe<String> pipe = HandlerPipe.<String>of("handlerPipeFeed").cacheEventLog(true);
EventToQueuePublisher<String> eventToQueue = new EventToQueuePublisher<>("handlerPipeFeed");
OneToOneConcurrentArrayQueue<Object> targetQueue = new OneToOneConcurrentArrayQueue<>(128);
eventToQueue.addTargetQueue(targetQueue, "outputQueue");
pipe.getSource().setOutput(eventToQueue); // test hook on source for injection
pipe.getSource().start();
pipe.sink().accept("a");
pipe.sink().accept("b");
// No items dispatched until startComplete when caching
targetQueue.drainTo(new ArrayList<>(), 100); // empty
pipe.getSource().startComplete();
// Now queue drains ["a", "b"]
For simplistic in-memory collection of published values, consider InMemoryMessageSink. You can replace the default sink by wrapping or delegating to pipe.sink().
Full server boot example¶
A complete, runnable example is available here: - Path: src/test/java/com/telamin/mongoose/example/handlerpipe/HandlerPipeServerBootExample.java
Key parts of the example:
-
Create the pipe and a processor that subscribes to its feed name, plus an in-memory sink:
HandlerPipe<String> pipe = HandlerPipe.<String>of("examplePipe").cacheEventLog(true); NamedFeedsFilterHandler handler = new NamedFeedsFilterHandler(java.util.Set.of(pipe.getSource().getName())); InMemoryMessageSink sink = new InMemoryMessageSink();
-
Wire the pipe’s source as an EventFeed and boot the server:
EventProcessorGroupConfig processors = EventProcessorGroupConfig.builder() .agentName("processor-agent") .put("pipe-listener", new EventProcessorConfig(handler)) .build(); EventFeedConfig<?> pipeFeed = EventFeedConfig.builder() .instance(pipe.getSource()) .name(pipe.getSource().getName()) .broadcast(true) .agent("pipe-agent", new BusySpinIdleStrategy()) .build(); EventSinkConfig<MessageSink<?>> sinkCfg = EventSinkConfig.<MessageSink<?>>builder() .instance(sink) .name("memSink") .build(); MongooseServerConfig mongooseServerConfig = MongooseServerConfig.builder() .addProcessorGroup(processors) .addEventFeed(pipeFeed) .addEventSink(sinkCfg) .build(); MongooseServer server = MongooseServer.bootServer(mongooseServerConfig, rec -> {});
-
Publish via the pipe and observe results in the sink:
pipe.sink().accept("hello"); pipe.sink().accept("world"); List<Object> out = waitForMessages(sink, 2, 5, TimeUnit.SECONDS);
See the full file for the waitForMessages helper and assertions.
Tips¶
- Use a descriptive feed name; processors subscribe by service name.
- Prefer small, concise data mappers on the source side when transforming events.
- For backpressure/slow-consumer concerns, see EventToQueuePublisher settings (wrapping, logging). The pipe uses the same underlying publisher infrastructure via the source.