mqtt
Subscribe to MQTT topics and receive messages. This input connects to an MQTT broker and subscribes to one or more topics using the MQTT 3.1.1 protocol.
input:
mqtt:
broker: "tcp://localhost:1883"
topics:
- "sensors/+"
input:
mqtt:
broker: "tcp://broker.example.com:1883"
topics:
- "events/#"
- "logs/+"
username: "user"
password: "secret"
qos: 2
input:
mqtt:
broker: "tcp://localhost:1883"
client_id: "fiddler-subscriber"
topics:
- "home/+/temperature"
- "home/+/humidity"
- "alerts/#"
qos: 1
input:
retry:
max_retries: 3
initial_wait: "1s"
backoff: "exponential"
mqtt:
broker: "tcp://localhost:1883"
topics:
- "sensors/+"
Fields
broker
The MQTT broker URL.
Type: string
Required: true
Format: tcp://host[:port] or mqtt://host[:port]
Default port is 1883 if not specified.
topics
List of MQTT topic patterns to subscribe to.
Type: array[string]
Required: true
Supports MQTT wildcards:
- + matches a single topic level
- # matches any number of levels (must be last character)
client_id
Client identifier sent to the broker.
Type: string
Required: false
Default: Auto-generated UUID (fiddler_<uuid>)
Must be unique across all clients connected to the broker.
qos
Quality of Service level for subscriptions.
Type: integer
Required: false
Default: 1
| Value | Description |
|---|---|
0 |
At most once - fire and forget |
1 |
At least once - acknowledged delivery |
2 |
Exactly once - assured delivery |
username
Username for broker authentication.
Type: string
Required: false
password
Password for broker authentication.
Type: string
Required: false
keep_alive_secs
Keep-alive interval in seconds.
Type: integer
Required: false
Default: 60
The client sends ping requests to maintain the connection when idle.
retry
Retry policy for failed reads. When present, the runtime retries failed reads with backoff.
Type: object
Required: false
| Field | Type | Default | Description |
|---|---|---|---|
max_retries |
integer | 3 | Maximum retry attempts |
initial_wait |
string | "1s" | Wait before first retry |
max_wait |
string | "30s" | Maximum wait cap |
backoff |
string | "exponential" | Strategy: constant, linear, or exponential |
How It Works
- The input connects to the MQTT broker
- It subscribes to all configured topics with the specified QoS
- Messages are received asynchronously and fed into the pipeline
- Automatic reconnection occurs on connection failures
MQTT Wildcards
Single-Level Wildcard (+)
Matches exactly one topic level:
topics:
- "sensors/+/temperature" # Matches sensors/room1/temperature, sensors/room2/temperature
- "home/+/+/status" # Matches home/floor1/room1/status
Multi-Level Wildcard (#)
Matches any number of levels (must be at the end):
topics:
- "sensors/#" # Matches sensors/temp, sensors/room/temp, etc.
- "home/floor1/#" # Matches all topics under home/floor1/
Examples
IoT Sensor Data
input:
mqtt:
broker: "tcp://localhost:1883"
topics:
- "sensors/+/temperature"
- "sensors/+/humidity"
qos: 1
processors:
- fiddlerscript:
code: |
let data = parse_json(this);
data = set(data, "received_at", now());
this = bytes(str(data));
output:
clickhouse:
url: "http://localhost:8123"
table: "sensor_readings"
Secure Connection with Auth
input:
mqtt:
broker: "tcp://mqtt.example.com:1883"
client_id: "fiddler-production"
topics:
- "events/#"
qos: 2
username: "fiddler"
password: "${MQTT_PASSWORD}"
keep_alive_secs: 30
output:
kafka:
brokers: ["kafka:9092"]
topic: "mqtt-events"
Home Automation
input:
mqtt:
broker: "tcp://homeassistant.local:1883"
topics:
- "homeassistant/sensor/+/state"
- "homeassistant/binary_sensor/+/state"
- "homeassistant/switch/+/state"
processors:
- filter:
condition: "state != 'unavailable'"
output:
stdout: {}
All Messages (Development)
input:
mqtt:
broker: "tcp://localhost:1883"
topics:
- "#" # Subscribe to everything
qos: 0 # Fire and forget for debugging
output:
stdout: {}
Quality of Service Levels
| QoS | Delivery | Duplicates | Use Case |
|---|---|---|---|
| 0 | At most once | No | Telemetry, non-critical data |
| 1 | At least once | Possible | Most applications |
| 2 | Exactly once | No | Critical data, transactions |
Higher QoS levels add latency due to additional handshakes.
Error Handling
- Connection failures: Automatic reconnection with 1-second backoff
- Subscribe failures: Logged per topic
- Broker disconnection: Automatic reconnection
Broker URL Format
tcp://host:port # Standard MQTT (port 1883)
mqtt://host:port # Alias for tcp://
host:port # Protocol prefix optional
host # Uses default port 1883
See Also
- mqtt output - Publish messages to MQTT topics