zeromq
Receive messages using ZeroMQ sockets. This input supports PULL and SUB socket types for building distributed messaging patterns.
input:
zeromq:
socket_type: "pull"
bind: "tcp://*:5555"
input:
zeromq:
socket_type: "sub"
connect:
- "tcp://publisher:5555"
subscribe:
- "events"
- "logs"
input:
zeromq:
socket_type: "pull"
connect:
- "tcp://worker1:5555"
- "tcp://worker2:5555"
- "tcp://worker3:5555"
input:
retry:
max_retries: 3
initial_wait: "1s"
backoff: "exponential"
zeromq:
socket_type: "pull"
bind: "tcp://*:5555"
Fields
socket_type
The type of ZeroMQ socket to create.
Type: string
Required: true
| Value | Description |
|---|---|
pull |
Receive messages from PUSH sockets (pipeline pattern) |
sub |
Subscribe to topics from PUB sockets (pub/sub pattern) |
bind
Address to bind the socket to.
Type: string
Required: false (but either bind or connect must be specified)
Format: <transport>://<address>
connect
Addresses to connect to.
Type: array[string]
Required: false (but either bind or connect must be specified)
Multiple addresses can be specified to connect to several endpoints.
subscribe
Topic filters for SUB sockets.
Type: array[string]
Required: Required for sub socket type
Use "" (empty string) to receive all messages.
retry
Retry policy for failed reads. When present, the runtime retries failed reads with backoff.
Type: object
Required: false
| Field | Type | Default | Description |
|---|---|---|---|
max_retries |
integer | 3 | Maximum retry attempts |
initial_wait |
string | "1s" | Wait before first retry |
max_wait |
string | "30s" | Maximum wait cap |
backoff |
string | "exponential" | Strategy: constant, linear, or exponential |
How It Works
PULL Socket
- Creates a PULL socket and binds/connects to the specified addresses
- Messages are received from connected PUSH sockets
- Messages are load-balanced across multiple PUSH senders
- Fair-queuing ensures even distribution
SUB Socket
- Creates a SUB socket and binds/connects to the specified addresses
- Subscribes to the specified topic filters
- Messages matching any filter are received
- Topic matching is prefix-based
Socket Patterns
Pipeline (PUSH/PULL)
Distributes work across multiple workers:
┌──► [PULL Worker 1]
[PUSH] ──┼──► [PULL Worker 2]
└──► [PULL Worker 3]
# Worker configuration
input:
zeromq:
socket_type: "pull"
connect:
- "tcp://dispatcher:5555"
Pub/Sub
Broadcasts messages to all subscribers:
┌──► [SUB Client 1]
[PUB] ───────┼──► [SUB Client 2]
└──► [SUB Client 3]
# Subscriber configuration
input:
zeromq:
socket_type: "sub"
connect:
- "tcp://publisher:5555"
subscribe:
- "events"
Examples
Work Distribution Pipeline
# Collector receiving from multiple producers
input:
zeromq:
socket_type: "pull"
bind: "tcp://*:5555"
processors:
- fiddlerscript:
code: |
let data = parse_json(this);
data = set(data, "collector_ts", now());
this = bytes(str(data));
output:
clickhouse:
url: "http://localhost:8123"
table: "events"
Subscribe to All Messages
input:
zeromq:
socket_type: "sub"
connect:
- "tcp://publisher:5555"
subscribe:
- "" # Empty string = all messages
output:
stdout: {}
Multi-Topic Subscription
input:
zeromq:
socket_type: "sub"
connect:
- "tcp://events.example.com:5555"
subscribe:
- "order:"
- "payment:"
- "shipment:"
processors:
- filter:
condition: "type != null"
output:
kafka:
brokers: ["kafka:9092"]
topic: "events"
Connect to Multiple Publishers
input:
zeromq:
socket_type: "sub"
connect:
- "tcp://server1:5555"
- "tcp://server2:5555"
- "tcp://server3:5555"
subscribe:
- "metrics"
output:
stdout: {}
Bind vs Connect
Both SUB and PULL sockets can either bind or connect:
# Publisher binds, subscriber connects (common)
input:
zeromq:
socket_type: "sub"
connect:
- "tcp://publisher:5555"
subscribe:
- ""
# Subscriber binds, publisher connects (reverse)
input:
zeromq:
socket_type: "sub"
bind: "tcp://*:5556"
subscribe:
- ""
Transport Types
| Transport | Format | Description |
|---|---|---|
| TCP | tcp://host:port |
Network communication |
| IPC | ipc:///path |
Inter-process (Unix sockets) |
| inproc | inproc://name |
In-process threads |
TCP Examples
bind: "tcp://*:5555" # Bind to all interfaces
bind: "tcp://127.0.0.1:5555" # Localhost only
connect:
- "tcp://remote:5555" # Connect to remote host
IPC Examples
bind: "ipc:///tmp/fiddler.sock"
connect:
- "ipc:///tmp/producer.sock"
Topic Matching (SUB)
Topic filtering uses prefix matching:
| Subscribe | Matches |
|---|---|
"" |
All messages |
"order" |
"order", "order:new", "order:update" |
"order:" |
"order:new", "order:update" (not "order") |
"error" |
"error", "error.critical", "errors" |
Error Handling
- Bind failures: Error returned during startup
- Connect failures: Logged, connection retried automatically
- Receive errors: Logged, continues listening
Performance Considerations
- High-water mark: ZeroMQ buffers messages; backpressure is handled internally
- Multiple connections: Fair-queuing across all connected sockets
- Message framing: ZeroMQ handles message boundaries automatically
See Also
- zeromq output - Send messages using ZeroMQ sockets
- ZeroMQ Guide - Comprehensive ZeroMQ patterns