# aws_kinesis

Put records to an AWS Kinesis Data Stream. This output batches messages and publishes them efficiently using the PutRecords API.
```yaml
output:
  aws_kinesis:
    stream_name: "my-stream"
```

```yaml
output:
  aws_kinesis:
    stream_name: "events"
    partition_key: "customer_id"
    region: "us-west-2"
```

```yaml
output:
  aws_kinesis:
    stream_name: "my-stream"
    region: "us-east-1"
    credentials:
      access_key_id: "${AWS_ACCESS_KEY_ID}"
      secret_access_key: "${AWS_SECRET_ACCESS_KEY}"
```

```yaml
output:
  aws_kinesis:
    stream_name: "high-volume"
    batch:
      size: 500
      duration: "5s"
```

```yaml
output:
  retry:
    max_retries: 5
    initial_wait: "2s"
    backoff: "exponential"
  aws_kinesis:
    stream_name: "my-stream"
```
## Fields

### stream_name

The name of the Kinesis stream to write to.

Type: `string`
Required: true

### partition_key

The partition key for shard distribution.

Type: `string`
Required: false
Default: a random UUID per record

The partition key determines which shard receives each record. Using a consistent partition key ensures related records go to the same shard and maintain their ordering.
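Kinesis routes a record by taking the MD5 hash of its partition key as a 128-bit integer and selecting the shard whose hash key range contains that value. The sketch below illustrates the mechanism; the shard IDs and ranges are made up for the example (real ranges come from the stream's shard metadata):

```python
import hashlib

def hash_key(partition_key: str) -> int:
    """Kinesis maps each partition key to a 128-bit integer via MD5."""
    return int.from_bytes(hashlib.md5(partition_key.encode("utf-8")).digest(), "big")

# Two illustrative shards splitting the 128-bit hash key space evenly.
SHARDS = [
    ("shardId-000000000000", 0, 2**127 - 1),
    ("shardId-000000000001", 2**127, 2**128 - 1),
]

def shard_for(partition_key: str) -> str:
    """Return the shard whose hash key range contains this key's hash."""
    h = hash_key(partition_key)
    for shard_id, start, end in SHARDS:
        if start <= h <= end:
            return shard_id
    raise AssertionError("unreachable: ranges cover the full key space")
```

Because the hash is deterministic, every record with the same key lands on the same shard, which is exactly what preserves per-key ordering.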
### region

The AWS region where the stream is located.

Type: `string`
Required: false

Uses the AWS SDK's default region resolution if not specified.

### endpoint_url

A custom endpoint URL for the Kinesis API.

Type: `string`
Required: false

Useful for local development with LocalStack.

### credentials

Explicit AWS credentials. If not provided, the standard AWS credential chain is used.

Type: `object`
Required: false
| Field | Type | Required | Description |
|---|---|---|---|
| `access_key_id` | string | Yes | AWS access key ID |
| `secret_access_key` | string | Yes | AWS secret access key |
| `session_token` | string | No | AWS session token (for temporary credentials) |
### batch

Batching policy for grouping records before sending.

Type: `object`
Required: false

| Field | Type | Description |
|---|---|---|
| `size` | integer | Maximum records per batch (max: 500) |
| `duration` | string | Maximum time before flush (default: "5s") |
| `max_batch_bytes` | integer | Maximum cumulative byte size per batch (default: 5 MB) |

Note: the Kinesis PutRecords API has a hard limit of 500 records per call, so the batch size is automatically capped at 500. The `max_batch_bytes` default is 5 MB to match the Kinesis PutRecords payload limit.
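The three flush conditions combine as "whichever triggers first". This is an illustrative sketch of the policy, not the output's actual implementation (a real batcher would also flush on a background timer rather than only when a record arrives):

```python
import time

MAX_RECORDS = 500            # hard PutRecords limit per call
MAX_BYTES = 5 * 1024 * 1024  # PutRecords payload limit

class Batcher:
    """Sketch of the size / max_batch_bytes / duration flush rules."""

    def __init__(self, size=500, max_batch_bytes=MAX_BYTES, duration_s=5.0):
        self.size = min(size, MAX_RECORDS)  # size is capped at 500
        self.max_bytes = max_batch_bytes
        self.duration_s = duration_s
        self.records, self.byte_count = [], 0
        self.started = time.monotonic()

    def add(self, record: bytes) -> bool:
        """Buffer a record; return True when the batch should flush."""
        self.records.append(record)
        self.byte_count += len(record)
        return (
            len(self.records) >= self.size
            or self.byte_count >= self.max_bytes
            or time.monotonic() - self.started >= self.duration_s
        )
```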
### retry

Retry policy for failed writes. When present, the runtime retries failed writes with backoff.

Type: `object`
Required: false

| Field | Type | Default | Description |
|---|---|---|---|
| `max_retries` | integer | 3 | Maximum retry attempts |
| `initial_wait` | string | "1s" | Wait before first retry |
| `max_wait` | string | "30s" | Maximum wait cap |
| `backoff` | string | "exponential" | Strategy: constant, linear, or exponential |
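Taking the three strategies at their usual meaning (the runtime's exact formula is not specified here), the wait before the n-th retry would look roughly like this:

```python
def wait_before_retry(attempt: int, initial_wait=1.0, max_wait=30.0,
                      backoff="exponential") -> float:
    """Seconds to wait before retry `attempt` (1-based), capped at max_wait.

    Illustrative interpretation of the constant/linear/exponential strategies;
    defaults mirror the table above.
    """
    if backoff == "constant":
        wait = initial_wait
    elif backoff == "linear":
        wait = initial_wait * attempt
    else:  # exponential: doubles each attempt
        wait = initial_wait * (2 ** (attempt - 1))
    return min(wait, max_wait)
```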
## How It Works

- Messages are buffered according to the batch policy
- When a batch is ready, its records are published using PutRecords
- Each record uses the configured partition key (or a random UUID)
- Partial failures are logged but not retried automatically
- The output reconnects automatically on connection failures
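The publish step above corresponds to a single PutRecords call. A sketch of that step using a boto3-style Kinesis client (the helper name and wiring are illustrative, not the output's internals):

```python
import uuid

def publish_batch(client, stream_name: str, records, partition_key_for=None) -> int:
    """Publish one buffered batch via the Kinesis PutRecords API.

    `client` is assumed to be a boto3 Kinesis client, e.g.
    boto3.client("kinesis", region_name="us-east-1").
    Returns the number of records that failed (partial failures).
    """
    entries = [
        {
            "Data": data,
            # Configured partition key per record, else a random UUID (the default).
            "PartitionKey": partition_key_for(data) if partition_key_for else str(uuid.uuid4()),
        }
        for data in records
    ]
    resp = client.put_records(StreamName=stream_name, Records=entries)
    return resp["FailedRecordCount"]
```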
## AWS Authentication

The Kinesis output supports two authentication methods.

### Explicit Credentials

```yaml
output:
  aws_kinesis:
    stream_name: "events"
    credentials:
      access_key_id: "${AWS_ACCESS_KEY_ID}"
      secret_access_key: "${AWS_SECRET_ACCESS_KEY}"
```
### AWS SDK Credential Chain (default)

When no credentials are specified, the standard AWS credential chain is used:

- Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`)
- Shared credentials file (`~/.aws/credentials`)
- IAM role for EC2/ECS/Lambda
- Web identity token (EKS)
## Required IAM Permissions

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:*:*:stream/my-stream"
    }
  ]
}
```
## Examples

### Basic Stream Publishing

```yaml
input:
  http_server:
    port: 8080
processors:
  - fiddlerscript:
      code: |
        let data = parse_json(this);
        data = set(data, "ingested_at", now());
        this = bytes(str(data));
output:
  aws_kinesis:
    stream_name: "events"
```
### Ordered Processing with Partition Key

Use a consistent partition key to maintain ordering for related records:

```yaml
input:
  kafka:
    brokers: ["kafka:9092"]
    topics: ["orders"]
output:
  aws_kinesis:
    stream_name: "order-events"
    partition_key: "order_id" # All events for the same order go to the same shard
```
### High-Throughput Configuration

```yaml
input:
  amqp:
    url: "amqp://localhost"
    queue: "high_volume"
    prefetch_count: 1000
output:
  aws_kinesis:
    stream_name: "analytics"
    batch:
      size: 500 # Maximum allowed
      duration: "1s"
```
### Local Development with LocalStack

```yaml
input:
  stdin: {}
output:
  aws_kinesis:
    stream_name: "test-stream"
    region: "us-east-1"
    endpoint_url: "http://localhost:4566"
```
### Cross-Account Access

```yaml
output:
  aws_kinesis:
    stream_name: "events"
    region: "eu-west-1"
    credentials:
      access_key_id: "${CROSS_ACCOUNT_ACCESS_KEY}"
      secret_access_key: "${CROSS_ACCOUNT_SECRET_KEY}"
```
## Partition Key Strategies
| Strategy | Use Case |
|---|---|
| Random UUID (default) | Even distribution, no ordering guarantee |
| Static value | All records to same shard |
| Entity ID | Ordering within entity (user, order, etc.) |
| Timestamp | Time-based distribution |
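Each strategy in the table amounts to a different key function applied per record. These helpers are hypothetical illustrations (the field names are made up for the example):

```python
import time
import uuid

def random_key(_record) -> str:
    """Even distribution across shards; no ordering guarantee."""
    return str(uuid.uuid4())

def static_key(_record) -> str:
    """All records hash to the same shard."""
    return "single-shard"

def entity_key(record: dict) -> str:
    """Per-entity ordering: all events for one order share a shard."""
    return str(record["order_id"])

def timestamp_key(_record) -> str:
    """Time-based distribution: keys change as time advances."""
    return str(int(time.time()))
```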
## Kinesis Limits
| Limit | Value |
|---|---|
| Records per PutRecords | 500 |
| Record size | 1 MB |
| PutRecords payload | 5 MB |
| Write throughput per shard | 1 MB/s or 1,000 records/s |
## Error Handling

- Partial failures: logged with a count of the failed records
- Connection failures: retried automatically
- Throttling: the SDK handles backoff automatically
- Oversized records: rejected with an error
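Because partial failures are logged but not retried by the output, downstream tooling that calls PutRecords itself can resend just the failed entries; results line up with requests by index, and failed results carry an `ErrorCode` instead of a `SequenceNumber`. A sketch of that standard pattern, assuming a boto3-style client (the helper name is illustrative):

```python
def retry_failed_records(client, stream_name: str, entries, max_attempts=3):
    """Resend only the entries that failed in each PutRecords response.

    Returns the entries still failing after max_attempts (empty on success).
    """
    for _ in range(max_attempts):
        resp = client.put_records(StreamName=stream_name, Records=entries)
        if resp["FailedRecordCount"] == 0:
            return []
        # Result entries match request entries by index; keep only failures.
        entries = [
            entry
            for entry, result in zip(entries, resp["Records"])
            if "ErrorCode" in result
        ]
    return entries
```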
## Performance Considerations

- Batch size: use 500 for maximum throughput
- Partition key: distribute keys evenly to avoid hot shards
- Record size: keep records small for better batching
- Flush interval: shorter intervals reduce latency at the cost of more API calls
## See Also
- aws_kinesis input - Consume records from Kinesis streams
- aws_sqs - AWS SQS output
- AWS Kinesis Documentation