redis
Consume data from Redis using list operations (BLPOP/BRPOP), Pub/Sub subscriptions, or streams with consumer groups (XREADGROUP). This input supports queue-like consumption from lists, real-time message streaming via Pub/Sub, and at-least-once delivery with acknowledgment via Redis Streams.
input:
redis:
url: "redis://localhost:6379"
mode: "list"
keys:
- "input_queue"
input:
redis:
url: "redis://localhost:6379"
mode: "pubsub"
channels:
- "events"
- "notifications"
input:
redis:
url: "redis://localhost:6379"
mode: "stream"
streams:
- "my_stream"
consumer_group: "my_group"
consumer_name: "worker-1"
input:
redis:
url: "redis://localhost:6379"
mode: "stream"
streams:
- "my_stream"
consumer_group: "my_group"
stream_read_count: 100
block_ms: 5000
auto_claim:
enabled: true
idle_ms: 30000
interval_ms: 10000
batch_size: 100
create_group: true
input:
redis:
url: "redis://localhost:6379"
mode: "pubsub"
channels:
- "events.*"
- "logs.*"
use_patterns: true
input:
retry:
max_retries: 3
initial_wait: "1s"
backoff: "exponential"
redis:
url: "redis://localhost:6379"
mode: "list"
keys:
- "input_queue"
Fields
url
Redis connection URL.
Type: string
Required: true
Format: redis://[username:password@]host[:port][/db]
mode
Operation mode for consuming data.
Type: string
Required: false
Default: "list"
| Value | Description |
|---|---|
list |
Pop messages from Redis lists using BLPOP/BRPOP |
pubsub |
Subscribe to channels for real-time messages |
stream |
Read from streams using consumer groups (XREADGROUP) with at-least-once delivery |
keys
List key names to pop from (list mode only).
Type: array[string]
Required: Required for list mode
Multiple keys are supported; the first key with data is popped.
channels
Channel names or patterns to subscribe to (pubsub mode only).
Type: array[string]
Required: Required for pubsub mode
list_command
The blocking list operation to use.
Type: string
Required: false
Default: "brpop"
| Value | Description |
|---|---|
brpop |
Pop from the tail (FIFO queue behavior) |
blpop |
Pop from the head (LIFO stack behavior) |
timeout
Blocking timeout in seconds for list operations.
Type: integer
Required: false
Default: 1
streams
Stream names to read from (stream mode only).
Type: array[string]
Required: Required for stream mode
consumer_group
Consumer group name for stream mode.
Type: string
Required: Required for stream mode
consumer_name
Consumer name within the group.
Type: string
Required: false
Default: hostname:pid
Defaults to hostname:pid if not specified. Supports Handlebars templating (e.g. "{{HOSTNAME}}:{{PID}}").
stream_read_count
Maximum entries per XREADGROUP call.
Type: integer
Required: false
Default: 100
block_ms
How long XREADGROUP blocks waiting for new messages, in milliseconds.
Type: integer
Required: false
Default: 5000
auto_claim
Controls periodic reclamation of idle pending messages from dead consumers using XAUTOCLAIM (requires Redis 6.2+).
Type: object
Required: false
| Field | Type | Default | Description |
|---|---|---|---|
enabled |
boolean | true |
Enable auto-claim background task |
idle_ms |
integer | 30000 |
Minimum idle time (ms) before a message is eligible for reclamation |
interval_ms |
integer | 10000 |
How often the auto-claim task runs (ms) |
batch_size |
integer | 100 |
Maximum messages to reclaim per cycle |
Set idle_ms high enough to avoid stealing in-flight messages from healthy consumers.
create_group
Automatically create the consumer group (and stream via MKSTREAM) on startup.
Type: boolean
Required: false
Default: true
If false and the group or stream doesn't exist, startup fails with a validation error.
use_patterns
Enable pattern matching for Pub/Sub subscriptions.
Type: boolean
Required: false
Default: false
When enabled, channel names are treated as patterns (uses PSUBSCRIBE).
retry
Retry policy for failed reads. When present, the runtime retries failed reads with backoff.
Type: object
Required: false
| Field | Type | Default | Description |
|---|---|---|---|
max_retries |
integer | 3 | Maximum retry attempts |
initial_wait |
string | "1s" | Wait before first retry |
max_wait |
string | "30s" | Maximum wait cap |
backoff |
string | "exponential" | Strategy: constant, linear, or exponential |
How It Works
List Mode
- Connects to Redis using the provided URL
- Executes BRPOP/BLPOP on the specified keys with timeout
- Returns the first available message from any key
- Repeats indefinitely until shutdown
Stream Mode
- Connects to Redis and creates the consumer group if
create_group: true - Reads any pending messages for this consumer first (crash recovery)
- Enters the main read loop using XREADGROUP with BLOCK
- When a message is processed by the output, XACK is sent to acknowledge it
- Unacknowledged messages remain in the pending entry list (PEL) and are reprocessed on restart
- If auto-claim is enabled, a background task periodically runs XAUTOCLAIM to reclaim idle messages from dead consumers
Pub/Sub Mode
- Connects to Redis and creates a Pub/Sub connection
- Subscribes to all specified channels (or patterns if
use_patterns: true) - Messages are received asynchronously as they're published
- Automatic reconnection on connection failures
Examples
Simple Queue Processing
input:
redis:
url: "redis://localhost:6379"
mode: "list"
keys:
- "job_queue"
processors:
- fiddlerscript:
code: |
let job = parse_json(this);
// Process job...
this = bytes(str(job));
output:
stdout: {}
Priority Queue with Multiple Keys
input:
redis:
url: "redis://localhost:6379"
mode: "list"
keys:
- "high_priority"
- "normal_priority"
- "low_priority"
list_command: "brpop"
timeout: 5
output:
http:
url: "https://api.example.com/process"
Keys are checked in order; first non-empty key wins.
Real-Time Event Streaming
input:
redis:
url: "redis://localhost:6379"
mode: "pubsub"
channels:
- "events"
- "notifications"
output:
kafka:
brokers: ["kafka:9092"]
topic: "redis-events"
Pattern-Based Subscriptions
input:
redis:
url: "redis://localhost:6379"
mode: "pubsub"
channels:
- "sensor:*:temperature"
- "sensor:*:humidity"
use_patterns: true
processors:
- transform:
mappings:
- source: "value"
target: "reading"
- source: "timestamp"
target: "ts"
output:
clickhouse:
url: "http://localhost:8123"
table: "sensor_data"
Stream Consumer Group
input:
redis:
url: "redis://localhost:6379"
mode: "stream"
streams:
- "order_events"
consumer_group: "order_processors"
block_ms: 5000
auto_claim:
enabled: true
idle_ms: 30000
create_group: true
processors:
- fiddlerscript:
code: |
let order = parse_json(this);
// Process order...
this = bytes(str(order));
output:
http:
url: "https://api.example.com/orders"
Messages are acknowledged after successful output processing. If a consumer crashes, pending messages are automatically reclaimed by other consumers via XAUTOCLAIM.
Stack Processing (LIFO)
input:
redis:
url: "redis://localhost:6379"
mode: "list"
keys:
- "task_stack"
list_command: "blpop" # Pop from head = LIFO
output:
stdout: {}
With Authentication
input:
redis:
url: "redis://default:mypassword@redis.example.com:6379/0"
mode: "list"
keys:
- "secure_queue"
output:
stdout: {}
List vs Pub/Sub vs Stream
| Feature | List Mode | Pub/Sub Mode | Stream Mode |
|---|---|---|---|
| Persistence | Until consumed | Ephemeral | Persistent with trimming |
| Multiple consumers | Load distributed | All receive all | Consumer groups distribute load |
| Ordering | FIFO/LIFO guaranteed | Not guaranteed | Guaranteed within stream |
| Backpressure | Natural (list grows) | Messages dropped if slow | Natural (stream grows) |
| Replay | Not possible (consumed) | Not possible | Possible (stream persists) |
| Acknowledgment | Implicit (pop removes) | None | Explicit (XACK) |
| Dead consumer recovery | N/A | N/A | Auto-claim from PEL |
When to Use List Mode
- Simple job queues and task processing
- Load distribution across workers
- No need for message replay
When to Use Pub/Sub
- Real-time notifications
- Event broadcasting
- Fan-out patterns
When to Use Stream Mode
- At-least-once delivery guarantees
- Consumer group coordination across instances
- Dead consumer recovery without message loss
- Message replay and audit trails
Pattern Syntax (Pub/Sub)
When use_patterns: true:
| Pattern | Matches |
|---|---|
sensor:* |
sensor:1, sensor:room1, etc. |
logs:*:error |
logs:app:error, logs:db:error |
event:? |
event:1, event:a (single char) |
data:[abc] |
data:a, data:b, data:c |
Error Handling
- Connection failures: Automatic reconnection with backoff
- List timeout: Returns to blocking wait
- Pub/Sub disconnection: Stream ends, reconnection attempted
Redis URL Format
redis://localhost # Localhost, default port
redis://host:6380 # Custom port
redis://user:pass@host:6379 # With authentication
redis://host:6379/1 # Database 1
redis://:password@host:6379/0 # Password only (no username)
See Also
- redis output - Push data to Redis lists, publish to channels, or write to streams