September 19, 2025
Building a Production-Ready Message Queue with Redis Streams and .NET
In modern distributed systems, reliable message processing is crucial. This tutorial demonstrates how to build a robust messaging system using Redis Streams, featuring consumer groups, exponential backoff retry logic, and dead letter queue handling.
Table of Contents
- Why Redis Streams?
- Architecture Overview
- Setting Up Redis
- Understanding Redis Streams Commands
- C# Implementation
- Running the Demo
- Best Practices
Why Redis Streams?
Redis Streams provide a powerful, lightweight alternative to traditional message queues like RabbitMQ or Kafka. Key advantages:
- Low latency: In-memory data structure with microsecond response times
- Consumer Groups: Built-in load balancing across multiple consumers
- Persistence: Optional AOF/RDB persistence for durability
- PEL (Pending Entries List): Automatic tracking of unacknowledged messages
- Simple deployment: Single Redis instance, no complex cluster setup needed
When to Use Redis Streams
✅ Good fit:
- High-throughput, low-latency messaging
- Event-driven microservices
- Task queues with retry logic
- Real-time data pipelines
❌ Not ideal for:
- Messages requiring strict ordering guarantees across partitions
- Very large message payloads (> 1MB)
- Messages that need to be retained for months/years
Architecture Overview
Our messaging system follows the Producer-Consumer pattern with these components:
┌─────────────┐ ┌──────────────┐
│ Publisher │────────>│ Redis Stream │
└─────────────┘ └──────┬───────┘
│
┌───────────┴──────────┐
│ Consumer Group │
└───────────┬──────────┘
│
┌────────────────┼────────────────┐
│ │ │
┌───────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐
│ Consumer #1 │ │ Consumer #2 │ │ Consumer #3 │
└───────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ │ │
┌───────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐
│ Handler │ │ Handler │ │ Handler │
└──────────────┘ └─────────────┘ └─────────────┘
│
▼
[Complete] or [Retry]
│
▼
┌──────────────┐
│ Retry Queue │ (Exponential Backoff)
└──────────────┘
Key Concepts
- Stream: Append-only log of messages (like Kafka topic)
- Consumer Group: Distributes messages across multiple consumers
- PEL: Tracks messages that were delivered but not acknowledged
- XCLAIM: Reclaims messages from failed consumers
- Retry Queue: In-memory priority queue for exponential backoff
Setting Up Redis
1. Docker Compose Setup
Create a docker-compose.yml file:
version: '3.8'
services:
  redis:
    image: redis/redis-stack:latest
    container_name: redis-messaging-poc
    ports:
      - "6379:6379"   # Redis
      - "8001:8001"   # RedisInsight UI
    environment:
      - REDIS_ARGS=--appendonly yes
    volumes:
      - redis-data:/data
    networks:
      - messaging-net
  redis-commander:
    image: rediscommander/redis-commander:latest
    container_name: redis-commander
    environment:
      - REDIS_HOSTS=local:redis:6379
    ports:
      - "8081:8081"
    depends_on:
      - redis
    networks:
      - messaging-net
volumes:
  redis-data:
networks:
  messaging-net:
    driver: bridge
2. Start Redis
# PowerShell
cd docker
docker-compose up -d
# Verify it's running
docker ps
3. Access Redis Tools
- RedisInsight: http://localhost:8001 (Visual stream browser)
- Redis Commander: http://localhost:8081 (Key-value browser)
- CLI:
docker exec -it redis-messaging-poc redis-cli
Understanding Redis Streams Commands
Before diving into C# code, let's understand the Redis commands through hands-on examples.
Basic Message Flow
1. Publishing Messages (XADD)
# Add message to stream
XADD evt:product:order-service:{OrderCreated} * \
meta-data '{"CreatedOn":"2024-02-13T10:00:00Z","Type":"OrderCreated"}' \
data '{"OrderId":"ORD-001","Amount":100.00,"CustomerId":"CUST-123"}'
# Returns: "1707820800000-0" (timestamp-sequence)
Stream Key Format: evt:product:{serviceName}:{MessageType}
- evt: Event type
- product: Product/domain
- order-service: Service name
- {OrderCreated}: Message type
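In C# terms, the key format and the two-field payload from the XADD example above can be sketched like this (a standalone sketch; it uses System.Text.Json to stay dependency-free, whereas the implementation later in the article uses Newtonsoft.Json):

```csharp
using System;
using System.Text.Json;

// Builds the stream key convention used throughout this article.
// The braces around the message type form a Redis hash tag, so related
// keys would land on the same slot if you ever move to Redis Cluster.
static string GetStreamKey(string serviceName, string messageType)
    => $"evt:product:{serviceName}:{{{messageType}}}";

// The XADD payload is two fields: serialized metadata and serialized body.
var metaData = JsonSerializer.Serialize(new
{
    CreatedOn = "2024-02-13T10:00:00Z",
    Type = "OrderCreated"
});
var data = JsonSerializer.Serialize(new
{
    OrderId = "ORD-001",
    Amount = 100.00m,
    CustomerId = "CUST-123"
});

Console.WriteLine(GetStreamKey("order-service", "OrderCreated"));
// → evt:product:order-service:{OrderCreated}
```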
2. Creating Consumer Groups (XGROUP CREATE)
# Create consumer group (MKSTREAM creates stream if doesn't exist)
XGROUP CREATE evt:product:order-service:{OrderCreated} \
order-processors $ MKSTREAM
# $ means "start from new messages"
# 0 means "start from beginning of stream"
3. Reading Messages (XREADGROUP)
# Consumer reads messages
XREADGROUP GROUP order-processors consumer-1 \
COUNT 10 BLOCK 1000 STREAMS \
evt:product:order-service:{OrderCreated} >
# > means "only new messages"
# Returns array of [stream, [[messageId, [field, value, ...]]]]
Response Example:
1) 1) "evt:product:order-service:{OrderCreated}"
2) 1) 1) "1707820800000-0"
2) 1) "meta-data"
2) "{\"CreatedOn\":\"2024-02-13T10:00:00Z\",\"Type\":\"OrderCreated\"}"
3) "data"
4) "{\"OrderId\":\"ORD-001\",\"Amount\":100.00}"
4. Acknowledging Messages (XACK)
# Acknowledge successful processing
XACK evt:product:order-service:{OrderCreated} \
order-processors 1707820800000-0
# Returns: (integer) 1 (number of messages acknowledged)
Retry Flow
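The three commands below drive the retry flow: XPENDING reports how long a message has been idle and how many times it was delivered, and a consumer claims it once the idle time exceeds the backoff window. The decision itself is pure and easy to sketch (same 2^min(n, 11) formula the DefaultRetryMessagePolicy uses later in the C# implementation):

```csharp
using System;

// A pending message becomes eligible for a retry claim once it has been
// idle longer than its exponential backoff: 2^min(deliveryCount, 11) seconds.
static bool IsDueForClaim(long idleTimeMs, long deliveryCount)
{
    var backoffSeconds = 1L << (int)Math.Min(deliveryCount, 11);
    return idleTimeMs > backoffSeconds * 1000;
}

// Using the XPENDING example below: idle 5000 ms, delivered once → 2 s backoff.
Console.WriteLine(IsDueForClaim(5000, 1));   // → True
Console.WriteLine(IsDueForClaim(1500, 1));   // → False (still inside the 2 s window)
```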
5. Inspecting Pending Messages (XPENDING)
# View pending messages summary
XPENDING evt:product:order-service:{OrderCreated} order-processors
# View detailed pending messages
XPENDING evt:product:order-service:{OrderCreated} order-processors \
- + 10 consumer-1
# Returns: [messageId, consumer, idleTime, deliveryCount]
Example Response:
1) 1) "1707820800000-0"
2) "consumer-1"
3) (integer) 5000 # 5 seconds idle
4) (integer) 1 # Delivered once
6. Claiming Messages for Retry (XCLAIM)
# Claim message from another consumer or for retry
XCLAIM evt:product:order-service:{OrderCreated} \
order-processors consumer-1 0 1707820800000-0
# 0 = minimum idle time (immediate claim)
# Returns the claimed message
7. Automatic Retry with XAUTOCLAIM (Redis 6.2+)
# Automatically claim idle messages
XAUTOCLAIM evt:product:order-service:{OrderCreated} \
order-processors consumer-1 30000 0-0 COUNT 10
# 30000 = messages idle for 30+ seconds
# 0-0 = start from beginning
# Returns: [nextId, [messages], [deletedIds]]
Load Balancing vs Pub/Sub
Load Balancing (1 Consumer Group, Multiple Consumers):
# Each message goes to ONE consumer
XGROUP CREATE mystream workers $ MKSTREAM
XREADGROUP GROUP workers consumer-1 STREAMS mystream > # Gets msg1
XREADGROUP GROUP workers consumer-2 STREAMS mystream > # Gets msg2
XREADGROUP GROUP workers consumer-3 STREAMS mystream > # Gets msg3
Pub/Sub (Multiple Consumer Groups):
# Each message goes to ALL groups
XGROUP CREATE mystream group-A $ MKSTREAM
XGROUP CREATE mystream group-B $ MKSTREAM
# Same message delivered to both groups
XREADGROUP GROUP group-A consumer-1 STREAMS mystream > # Gets msg1
XREADGROUP GROUP group-B consumer-1 STREAMS mystream > # Gets msg1 (copy)
C# Implementation
Now let's implement this in C#. Our POC consists of these layers:
SimplifiedMessaging/
├── Core/ # Core abstractions
│ ├── HandlerResult.cs
│ ├── MessageContext.cs
│ ├── MessageMetadata.cs
│ ├── IMessageHandler.cs
│ ├── IRetryMessagePolicy.cs
│ └── DefaultRetryMessagePolicy.cs
├── Messages/ # Message definitions
│ ├── IMessage.cs
│ ├── OrderCreated.cs
│ └── PaymentProcessed.cs
├── Messaging/ # Infrastructure
│ ├── Publisher.cs
│ ├── Consumer.cs
│ ├── Subscription.cs
│ ├── StreamProcessor.cs
│ └── RetryQueue.cs
└── Handlers/ # Example handlers
├── OrderCreatedHandler.cs
└── PaymentProcessedHandler.cs
1. Core Abstractions
HandlerResult.cs - Handler Return Values
namespace SimplifiedMessaging.Core;
public enum HandlerResult
{
Complete = 0, // Message processed successfully (XACK)
Retry = 1 // Message failed, retry later (no XACK)
}
MessageContext.cs - Message Processing Context
namespace SimplifiedMessaging.Core;
public class MessageContext<T> where T : IMessage
{
public MessageContext(string messageId, T data,
MessageMetadata metadata, int retryAttempt = 0)
{
MessageId = messageId;
Data = data;
Metadata = metadata;
RetryAttempt = retryAttempt;
}
public string MessageId { get; } // Redis Stream ID
public T Data { get; } // Actual message
public MessageMetadata Metadata { get; } // Type, correlation, etc.
public int RetryAttempt { get; } // Delivery count (0-based)
public HandlerResult HandlerResult { get; set; }
public DateTimeOffset? ScheduleRetryAfter { get; set; }
/// <summary>
/// Schedule a retry for specific time (e.g., fraud check after 1 hour)
/// </summary>
public HandlerResult ScheduleRetry(DateTimeOffset retryAfter)
{
if (retryAfter <= DateTimeOffset.UtcNow.AddSeconds(1))
{
throw new ArgumentException(
"Retry time must be at least 1 second in the future");
}
ScheduleRetryAfter = retryAfter;
return HandlerResult.Retry;
}
}
DefaultRetryMessagePolicy.cs - Exponential Backoff
namespace SimplifiedMessaging.Core;
public class DefaultRetryMessagePolicy : IRetryMessagePolicy
{
private readonly int _backoffExponent;
private readonly DateTimeOffset _lastDelivered;
public int DeliveryCount { get; }
private DefaultRetryMessagePolicy(long idleTimeMs, long deliveryCount)
{
DeliveryCount = (int)deliveryCount;
_lastDelivered = DateTimeOffset.UtcNow.AddMilliseconds(-idleTimeMs);
// Exponential backoff: 2^deliveryCount seconds
// Cap at 2^11 = 2048 seconds (~34 minutes)
_backoffExponent = (int)Math.Min(deliveryCount, 11);
}
public bool IsDueToBeProcessed(DateTimeOffset now)
{
var backoffInSeconds = 1 << _backoffExponent; // 2^exponent
return now - _lastDelivered > TimeSpan.FromSeconds(backoffInSeconds);
}
public static IRetryMessagePolicy Create(long idleTimeMs, long deliveryCount)
{
return new DefaultRetryMessagePolicy(idleTimeMs, deliveryCount);
}
}
Backoff Schedule:
- Attempt 1: 2^1 = 2 seconds
- Attempt 2: 2^2 = 4 seconds
- Attempt 3: 2^3 = 8 seconds
- Attempt 4: 2^4 = 16 seconds
- ...
- Attempt 11+: 2^11 = 2048 seconds (~34 minutes)
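The schedule is just the bit shift from IsDueToBeProcessed applied to successive delivery counts. A quick standalone check of the numbers above, plus the total time a message can spend retrying (helper names here are illustrative, not part of the POC):

```csharp
using System;
using System.Linq;

// Delay before the Nth retry = 2^min(N, 11) seconds (matches the policy above).
static int BackoffSeconds(int deliveryCount) =>
    1 << Math.Min(deliveryCount, 11);

// Cumulative wait across the first maxRetries attempts.
static int TotalBackoffSeconds(int maxRetries) =>
    Enumerable.Range(1, maxRetries).Sum(BackoffSeconds);

Console.WriteLine(BackoffSeconds(3));       // → 8
Console.WriteLine(BackoffSeconds(12));      // → 2048 (capped)
Console.WriteLine(TotalBackoffSeconds(5));  // → 62 (2+4+8+16+32 seconds)
```

Worth noting: with a 5-retry cap a message is fully retried within about a minute, which is a useful number when sizing the dead letter policy discussed under Best Practices.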
2. Message Definitions
IMessage.cs - Base Interface
namespace SimplifiedMessaging.Messages;
public interface IMessage
{
string MessageType { get; }
}
OrderCreated.cs - Example Event
namespace SimplifiedMessaging.Messages;
public class OrderCreated : IMessage
{
public string MessageType => nameof(OrderCreated);
public string OrderId { get; set; } = string.Empty;
public decimal Amount { get; set; }
public string CustomerId { get; set; } = string.Empty;
}
3. Publisher - Publishing Messages
namespace SimplifiedMessaging.Messaging;
public class Publisher
{
private readonly IConnectionMultiplexer _redis;
public Publisher(IConnectionMultiplexer redis)
{
_redis = redis;
}
public async Task<string> PublishEventAsync<T>(
string serviceName, T message) where T : IMessage
{
var streamKey = GetStreamKey(serviceName, message.MessageType);
var metadata = new MessageMetadata
{
CreatedOn = DateTime.UtcNow,
Type = typeof(T).FullName ?? message.MessageType
};
var db = _redis.GetDatabase();
var streamEntries = new NameValueEntry[]
{
new("meta-data", JsonConvert.SerializeObject(metadata)),
new("data", JsonConvert.SerializeObject(message))
};
// XADD command
var messageId = await db.StreamAddAsync(streamKey, streamEntries);
return messageId.ToString();
}
// Batch publish for efficiency
public async Task<string[]> PublishEventBatchAsync<T>(
string serviceName, IEnumerable<T> messages) where T : IMessage
{
var messagesList = messages.ToList();
if (!messagesList.Any())
return Array.Empty<string>();
var streamKey = GetStreamKey(serviceName, messagesList.First().MessageType);
var db = _redis.GetDatabase();
var batch = db.CreateBatch();
var tasks = new List<Task<RedisValue>>();
foreach (var message in messagesList)
{
var metadata = new MessageMetadata
{
CreatedOn = DateTime.UtcNow,
Type = typeof(T).FullName ?? message.MessageType
};
var streamEntries = new NameValueEntry[]
{
new("meta-data", JsonConvert.SerializeObject(metadata)),
new("data", JsonConvert.SerializeObject(message))
};
tasks.Add(batch.StreamAddAsync(streamKey, streamEntries));
}
batch.Execute();
var messageIds = await Task.WhenAll(tasks);
return messageIds.Select(id => id.ToString()).ToArray();
}
private static string GetStreamKey(string serviceName, string messageType)
{
return $"evt:product:{serviceName}:{{{messageType}}}";
}
}
4. RetryQueue - Priority Queue for Retries
namespace SimplifiedMessaging.Messaging;
public class RetryQueue
{
private readonly SortedSet<RetryEntry> _queue =
new(new RetryEntryComparer());
private readonly object _lock = new();
public void Enqueue(StreamPendingMessageInfo messageInfo,
IRetryMessagePolicy policy)
{
lock (_lock)
{
var entry = new RetryEntry(messageInfo, policy);
_queue.Add(entry);
}
}
public StreamPendingMessageInfo? TryDequeue(DateTimeOffset now)
{
lock (_lock)
{
var entry = _queue.FirstOrDefault();
if (entry == null)
return null;
// Only dequeue if retry time has passed
if (!entry.Policy.IsDueToBeProcessed(now))
return null;
_queue.Remove(entry);
return entry.MessageInfo;
}
}
private record RetryEntry(
StreamPendingMessageInfo MessageInfo,
IRetryMessagePolicy Policy);
// Sort by delivery count (lower first), then by message ID
private class RetryEntryComparer : IComparer<RetryEntry>
{
public int Compare(RetryEntry? x, RetryEntry? y)
{
if (x == null && y == null) return 0;
if (x == null) return -1;
if (y == null) return 1;
var deliveryCompare = x.Policy.DeliveryCount
.CompareTo(y.Policy.DeliveryCount);
if (deliveryCompare != 0)
return deliveryCompare;
return string.Compare(
x.MessageInfo.MessageId.ToString(),
y.MessageInfo.MessageId.ToString(),
StringComparison.Ordinal);
}
}
}
5. StreamProcessor - The Core Engine
The StreamProcessor implements the Producer-Consumer pattern:
namespace SimplifiedMessaging.Messaging;
public class StreamProcessor
{
private readonly IConnectionMultiplexer _redis;
private readonly List<object> _subscriptions;
private readonly Channel<MessageTask> _messageChannel;
private const int MaxConcurrentMessages = 10;
private readonly CancellationTokenSource _cts = new();
public void Start()
{
// Start one producer per subscription
foreach (var subscription in _subscriptions)
{
Task.Run(() => ProduceAsync(subscription, _cts.Token));
}
// Start single consumer
Task.Run(() => ConsumeAsync(_cts.Token));
}
private async Task ProduceAsync(
object subscription, CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
// 1. Check scheduled retry queue
await CheckScheduledRetryQueueAsync(...);
// 2. Check standard retry queue
await CheckRetryQueueAsync(...);
// 3. Poll Redis with XREADGROUP
var results = await db.StreamReadGroupAsync(
streamKey, consumerGroupName, consumerName,
lastMessageId: ">", count: 10
);
// 4. Push messages to channel
foreach (var streamEntry in results)
{
await _messageChannel.Writer.WriteAsync(
new MessageTask(subscription, streamEntry, 0));
}
await Task.Delay(100); // Prevent tight loop
}
}
private async Task ConsumeAsync(CancellationToken cancellationToken)
{
var semaphore = new SemaphoreSlim(MaxConcurrentMessages);
await foreach (var messageTask in
_messageChannel.Reader.ReadAllAsync(cancellationToken))
{
await semaphore.WaitAsync();
_ = Task.Run(async () =>
{
try
{
await ProcessMessageAsync(messageTask);
}
finally
{
semaphore.Release();
}
});
}
}
}
6. Example Handler - Processing Logic
namespace SimplifiedMessaging.Handlers;
public class OrderCreatedHandler : IMessageHandler<OrderCreated>
{
public Task<HandlerResult> HandleAsync(
MessageContext<OrderCreated> context,
CancellationToken cancellationToken)
{
var order = context.Data;
// Invalid order - retry with exponential backoff
if (order.Amount < 0)
{
Console.WriteLine("Invalid amount, will retry");
return Task.FromResult(HandlerResult.Retry);
}
// High-value order - schedule retry for manual review
if (order.Amount > 1000 && context.RetryAttempt == 0)
{
Console.WriteLine("High-value order, scheduling review in 1 minute");
return Task.FromResult(
context.ScheduleRetry(DateTimeOffset.UtcNow.AddMinutes(1))
);
}
// Success
Console.WriteLine($"Order {order.OrderId} processed successfully!");
return Task.FromResult(HandlerResult.Complete);
}
}
7. Consumer - Putting It All Together
namespace SimplifiedMessaging.Messaging;
public class Consumer
{
private readonly IConnectionMultiplexer _redis;
private readonly IServiceProvider _serviceProvider;
private readonly List<object> _subscriptions = new();
private StreamProcessor? _processor;
public Consumer Subscribe<TMessage, THandler>(
string serviceName,
string consumerGroupName,
int handlerTimeoutMs = 30000)
where TMessage : IMessage
where THandler : IMessageHandler<TMessage>
{
var subscription = new Subscription<TMessage>(
typeof(THandler),
GetStreamKey(serviceName, typeof(TMessage).Name),
consumerGroupName,
handlerTimeoutMs,
_serviceProvider
);
_subscriptions.Add(subscription);
return this;
}
public async Task StartAsync()
{
await EnsureConsumerGroupsExistAsync();
_processor = new StreamProcessor(_redis, _subscriptions);
_processor.Start();
}
private async Task EnsureConsumerGroupsExistAsync()
{
var db = _redis.GetDatabase();
foreach (var subscription in _subscriptions)
{
try
{
    await db.StreamCreateConsumerGroupAsync(
        streamKey, consumerGroupName,
        StreamPosition.NewMessages, createStream: true
    );
}
catch (RedisServerException ex) when (ex.Message.Contains("BUSYGROUP"))
{
    // Group already exists - safe to ignore
}
}
}
}
Running the Demo
1. Start Redis
cd POC-Learning/docker
docker-compose up -d
2. Restore Packages and Build
cd ../SimplifiedMessaging
dotnet restore
dotnet build
3. Run the Application
dotnet run --project SimplifiedMessaging
4. Expected Output
╔════════════════════════════════════════════════════════╗
║ SimplifiedMessaging POC - Redis Streams Demo ║
╚════════════════════════════════════════════════════════╝
🔌 Connecting to Redis...
✅ Connected to Redis
🔧 Setting up Dependency Injection...
✅ DI configured
📋 Setting up Consumer subscriptions...
📝 Subscribed to evt:product:order-service:{OrderCreated}
📝 Subscribed to evt:product:payment-service:{PaymentProcessed}
🔧 Ensuring consumer groups exist...
✅ Created consumer group 'order-processors'
✅ Created consumer group 'payment-processors'
🚀 Starting StreamProcessor...
✅ Started 2 producers and 1 consumer
╔════════════════════════════════════════════════════════╗
║ Publishing Test Messages ║
╚════════════════════════════════════════════════════════╝
📤 Publishing: Normal order ($50)
📤 Publishing: Medium order ($750 - will retry)
📤 Publishing: High-value order ($1500 - scheduled retry)
📦 OrderCreatedHandler Processing:
Order ID: ORD-001
Amount: $50
✅ Order processed successfully!
📦 OrderCreatedHandler Processing:
Order ID: ORD-002
Amount: $750
🔄 Medium order needs review, will retry
📦 OrderCreatedHandler Processing:
Order ID: ORD-003
Amount: $1500
⏰ High-value order, scheduling review in 1 minute
✅ Message 1707820800000-0 completed and acknowledged
🔄 Message 1707820800001-0 queued for retry (attempt 1)
🔄 Message 1707820800002-0 queued for retry (attempt 1)
5. Explore with Redis Tools
RedisInsight (http://localhost:8001):
- View streams visually
- Inspect consumer groups
- See pending messages in PEL
Redis CLI:
docker exec -it redis-messaging-poc redis-cli
# View stream length
XLEN evt:product:order-service:{OrderCreated}
# View consumer groups
XINFO GROUPS evt:product:order-service:{OrderCreated}
# View pending messages
XPENDING evt:product:order-service:{OrderCreated} order-processors
Best Practices
1. Message Retention
Redis Streams can grow indefinitely. Implement trimming:
// Trim to an approximate max length (XTRIM ~ MAXLEN)
await db.StreamTrimAsync(streamKey,
    maxLength: 10000,
    useApproximateMaxLength: true);
// Trim by minimum ID (XTRIM MINID, Redis 6.2+). StackExchange.Redis has no
// dedicated MINID helper, so issue the raw command; oldestIdToKeep is a
// hypothetical variable holding the lowest stream ID you want to retain.
await db.ExecuteAsync("XTRIM", streamKey, "MINID", "~", oldestIdToKeep);
2. Dead Letter Queue
After N retries, move to DLQ:
if (context.RetryAttempt >= 5)
{
await _publisher.PublishEventAsync("dlq-service", new DeadLetter
{
OriginalMessage = context.Data,
Reason = "Max retries exceeded",
RetryCount = context.RetryAttempt
});
return HandlerResult.Complete; // Remove from PEL
}
3. Idempotency
Always design handlers to be idempotent:
public async Task<HandlerResult> HandleAsync(
MessageContext<OrderCreated> context, CancellationToken ct)
{
var order = context.Data;
// Check if already processed
if (await _db.OrderExistsAsync(order.OrderId))
{
Console.WriteLine("Order already processed, skipping");
return HandlerResult.Complete;
}
// Process...
await _db.CreateOrderAsync(order);
return HandlerResult.Complete;
}
4. Monitoring
Key metrics to track:
- PEL size: XPENDING count per consumer group
- Lag: Difference between latest message ID and last consumed
- Retry rate: Messages returning to retry queue
- Processing time: Handler execution duration
// Example metrics (summary form of XPENDING)
var pending = await db.StreamPendingAsync(streamKey, consumerGroupName);
Console.WriteLine($"PEL size: {pending.PendingMessageCount}");
Console.WriteLine($"Oldest pending: {pending.LowestPendingMessageId}");
5. Graceful Shutdown
Ensure clean shutdown to avoid message loss:
// NOTE: an async void event handler is fine for a sketch, but a real app
// should keep the process alive until cleanup completes (e.g. IHostedService).
Console.CancelKeyPress += async (sender, e) =>
{
e.Cancel = true;
// 1. Stop accepting new messages
cts.Cancel();
// 2. Wait for in-flight messages
await consumer.StopAsync();
// 3. Optionally claim to "Abandoned" consumer
await ClaimToAbandonedConsumerAsync();
// 4. Close Redis connection
await redis.CloseAsync();
};
6. Error Handling Strategies
Transient Errors (network timeout, DB deadlock):
if (ex is TimeoutException || ex is DbDeadlockException)
{
return HandlerResult.Retry; // Exponential backoff
}
Permanent Errors (validation, business rule):
if (ex is ValidationException)
{
await LogErrorAsync(context, ex);
return HandlerResult.Complete; // Don't retry
}
Scheduled Retry (external service unavailable):
if (ex is HttpRequestException)
{
// Retry in 5 minutes
return context.ScheduleRetry(DateTimeOffset.UtcNow.AddMinutes(5));
}
Conclusion
You've now built a production-ready messaging system with:
✅ Reliable delivery: Consumer groups + PEL
✅ Retry logic: Exponential backoff + scheduled retries
✅ Load balancing: Multiple consumers per group
✅ Observability: RedisInsight + CLI tools
✅ Scalability: Horizontal scaling via consumer groups
Next Steps
- Add persistence: Enable AOF/RDB in Redis
- Implement DLQ: Move failed messages after max retries
- Add telemetry: Integrate with OpenTelemetry
- Scale horizontally: Run multiple consumer instances
- Add circuit breaker: Prevent cascade failures
Peace... 🍀