connector-kafka¶
Apache Kafka producer (KafkaMessagePublisher) and consumer (KafkaMessageConsumer) for cross-process / cross-host messaging.
<dependency>
<groupId>com.telamin</groupId>
<artifactId>connector-kafka</artifactId>
<version>1.0.35</version>
</dependency>
When to use¶
- Producer/consumer pattern that crosses process or host boundaries.
- You already operate a Kafka cluster.
- You need broker-side fan-out, retention, or consumer-group semantics.
For single-host pipelines, prefer connector-file or connector-chronicle — they have no broker dependency.
Producer¶
eventSinks:
- name: enriched-out
instance: !!com.telamin.mongoose.plugin.connector.kafka.KafkaMessagePublisher
topic: enriched-trades
flushEveryMessage: true
registerShutdownHook: true
closeTimeoutMs: 5000
properties:
bootstrap.servers: kafka-1:9092,kafka-2:9092
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: org.apache.kafka.common.serialization.StringSerializer
acks: all
Async send callback tracks success / failure:
getSendCount()— successful sendsgetSendErrors()— async failures (logged at WARN)
Why a shutdown hook?¶
registerShutdownHook=true (default) installs a JVM shutdown hook that flushes + closes the producer on abrupt VM exit. Combined with closeTimeoutMs (default 5 s), buffered records have a bounded window to drain before the JVM dies. Disable for tests.
Consumer¶
eventFeeds:
- name: orders-feed
instance: !!com.telamin.mongoose.plugin.connector.kafka.KafkaMessageConsumer
topics: ["orders"]
pollTimeoutMs: 100
wakeupOnTearDown: true
properties:
bootstrap.servers: kafka-1:9092
group.id: mongoose-orders
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto.offset.reset: latest
tearDown() calls consumer.wakeup() to unblock an in-flight poll() before close() — the agent thread exits within pollTimeoutMs.
Configuration reference¶
KafkaMessagePublisher¶
| Property | Default | Notes |
|---|---|---|
topic |
required | Throws IllegalStateException if unset. |
properties |
empty | Standard Kafka producer config. |
flushEveryMessage |
true |
Synchronous flush after every send. Durable but slow. |
registerShutdownHook |
true |
JVM hook flushes buffered records on abrupt exit. |
closeTimeoutMs |
5000 |
Bound on producer.close() during teardown. |
KafkaMessageConsumer¶
| Property | Default | Notes |
|---|---|---|
properties |
required | Standard Kafka consumer config. |
topics |
required | One or more topic names. |
pollTimeoutMs |
100 |
Per-doWork poll timeout. |
wakeupOnTearDown |
true |
Wake an in-flight poll to unblock teardown. |
Operational notes¶
- Bring your own Kafka client config —
bootstrap.servers, serializers, and ACL credentials live in thepropertiesmap. - The publisher uses an async send callback; if your
value.serializerblows up the failure is logged at WARN and counted ingetSendErrors(). SetflushEveryMessage=trueand checkgetSendErrors()after a critical batch. - For at-least-once on the consumer side, configure
enable.auto.commit=falseinpropertiesand commit manually from your processor.
Examples¶
- plugins/event-source-example — agent-hosted source template; mirrors how
KafkaMessageConsumerintegrates. - plugins/message-sink-example — sink template; same shape as
KafkaMessagePublisher.
A docker-compose-based round-trip example is on the Examples roadmap.