TL;DR
amqplib is the RabbitMQ client for Node.js — AMQP 0-9-1 protocol, exchanges, routing keys, dead letter queues, the classic message broker. KafkaJS is the Apache Kafka client — distributed event streaming, consumer groups, topic partitions, ordered logs, high throughput. Redis Streams (via ioredis) is the lightweight stream — append-only log, consumer groups, built into Redis, no separate infrastructure. In 2026: amqplib for RabbitMQ message routing, KafkaJS for distributed event streaming at scale, Redis Streams for lightweight streaming with existing Redis.
Key Takeaways
- amqplib: ~1.5M weekly downloads — RabbitMQ, AMQP, exchanges, routing, dead letters
- KafkaJS: ~1M weekly downloads — Kafka, event streaming, partitions, consumer groups
- Redis Streams: via ioredis (~2M weekly downloads) — append-only log, XADD/XREAD, lightweight
- Different architectures: message broker (RabbitMQ), event log (Kafka), stream (Redis)
- RabbitMQ excels at routing and message patterns
- Kafka excels at high-throughput ordered event streaming
amqplib (RabbitMQ)
amqplib — AMQP client for RabbitMQ:
Basic publish/consume
import amqp from "amqplib"

// Publisher:
async function publishMessage() {
  const conn = await amqp.connect("amqp://localhost")
  const channel = await conn.createChannel()
  const queue = "tasks"

  await channel.assertQueue(queue, { durable: true })

  channel.sendToQueue(queue, Buffer.from(JSON.stringify({
    type: "process-package",
    name: "react",
  })), {
    persistent: true, // Survive broker restart
  })

  console.log("Message sent")
  await channel.close()
  await conn.close()
}

// Consumer:
async function consumeMessages() {
  const conn = await amqp.connect("amqp://localhost")
  const channel = await conn.createChannel()
  const queue = "tasks"

  await channel.assertQueue(queue, { durable: true })
  await channel.prefetch(1) // Process one at a time

  channel.consume(queue, async (msg) => {
    if (!msg) return
    const data = JSON.parse(msg.content.toString())
    console.log("Processing:", data)
    try {
      await processTask(data)
      channel.ack(msg) // Acknowledge success
    } catch (err) {
      // Requeue on failure — pair with a dead letter exchange in production
      // so a poison message can't requeue forever
      channel.nack(msg, false, true)
    }
  })
}
Exchanges and routing
import amqp from "amqplib"

async function setupExchanges() {
  const conn = await amqp.connect("amqp://localhost")
  const channel = await conn.createChannel()

  // Direct exchange — route by exact key:
  await channel.assertExchange("packages", "direct", { durable: true })
  await channel.assertQueue("npm-packages", { durable: true })
  await channel.bindQueue("npm-packages", "packages", "npm")

  channel.publish("packages", "npm", Buffer.from(JSON.stringify({
    name: "react", version: "19.0.0",
  })))

  // Topic exchange — route by pattern:
  await channel.assertExchange("events", "topic", { durable: true })
  await channel.assertQueue("all-downloads", { durable: true })
  await channel.bindQueue("all-downloads", "events", "package.*.download")

  channel.publish("events", "package.react.download", Buffer.from("{}"))
  channel.publish("events", "package.vue.download", Buffer.from("{}"))

  // Fanout exchange — broadcast to all bound queues:
  await channel.assertExchange("notifications", "fanout", { durable: true })
  await channel.assertQueue("email-notifications", { durable: true })
  await channel.assertQueue("slack-notifications", { durable: true })
  await channel.bindQueue("email-notifications", "notifications", "")
  await channel.bindQueue("slack-notifications", "notifications", "")

  channel.publish("notifications", "", Buffer.from(JSON.stringify({
    message: "New package version released!",
  })))
}
Dead letter queue
import amqp from "amqplib"

async function setupDeadLetterQueue() {
  const conn = await amqp.connect("amqp://localhost")
  const channel = await conn.createChannel()

  // Dead letter exchange:
  await channel.assertExchange("dlx", "direct", { durable: true })
  await channel.assertQueue("dead-letters", { durable: true })
  await channel.bindQueue("dead-letters", "dlx", "failed")

  // Main queue with DLX:
  await channel.assertQueue("tasks", {
    durable: true,
    arguments: {
      "x-dead-letter-exchange": "dlx",
      "x-dead-letter-routing-key": "failed",
      "x-message-ttl": 60000, // 60s TTL
    },
  })

  // Rejected (nack without requeue) and expired messages land in dead-letters automatically
  channel.consume("dead-letters", (msg) => {
    if (!msg) return
    console.error("Dead letter:", msg.content.toString())
    channel.ack(msg)
  })
}
KafkaJS
KafkaJS — Apache Kafka client:
Basic producer/consumer
import { Kafka } from "kafkajs"

const kafka = new Kafka({
  clientId: "pkgpulse",
  brokers: ["localhost:9092"],
})

// Producer:
const producer = kafka.producer()
await producer.connect()

await producer.send({
  topic: "package-events",
  messages: [
    {
      key: "react",
      value: JSON.stringify({
        type: "download",
        package: "react",
        timestamp: Date.now(),
      }),
    },
  ],
})

await producer.disconnect()

// Consumer:
const consumer = kafka.consumer({ groupId: "analytics-service" })
await consumer.connect()
await consumer.subscribe({ topic: "package-events", fromBeginning: true })

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    console.log({
      topic,
      partition,
      key: message.key?.toString(),
      value: message.value?.toString(),
      offset: message.offset,
    })
  },
})
Consumer groups and partitions
import { Kafka } from "kafkajs"

const kafka = new Kafka({
  clientId: "pkgpulse",
  brokers: ["broker1:9092", "broker2:9092", "broker3:9092"],
})

// Create topic with partitions:
const admin = kafka.admin()
await admin.connect()
await admin.createTopics({
  topics: [{
    topic: "downloads",
    numPartitions: 6,
    replicationFactor: 3,
  }],
})
await admin.disconnect()

// Consumer group — Kafka assigns partitions automatically:
const consumer = kafka.consumer({
  groupId: "download-tracker",
  sessionTimeout: 30000,
  heartbeatInterval: 3000,
})
await consumer.connect()
await consumer.subscribe({ topic: "downloads" })

await consumer.run({
  partitionsConsumedConcurrently: 3, // Process 3 partitions concurrently
  eachMessage: async ({ topic, partition, message }) => {
    const data = JSON.parse(message.value!.toString())
    await trackDownload(data)
  },
})
Batch processing
import { Kafka } from "kafkajs"

const kafka = new Kafka({ clientId: "pkgpulse", brokers: ["localhost:9092"] })

// Batch producer (packages: an array of package records fetched elsewhere):
const producer = kafka.producer({
  allowAutoTopicCreation: false,
  transactionTimeout: 30000,
})
await producer.connect()

await producer.sendBatch({
  topicMessages: [
    {
      topic: "package-downloads",
      messages: packages.map((pkg) => ({
        key: pkg.name,
        value: JSON.stringify(pkg),
        headers: { source: "npm-registry" },
      })),
    },
  ],
})

// Batch consumer:
const consumer = kafka.consumer({ groupId: "batch-processor" })
await consumer.connect()
await consumer.subscribe({ topic: "package-downloads" })

await consumer.run({
  eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
    for (const message of batch.messages) {
      const data = JSON.parse(message.value!.toString())
      await processPackage(data)
      resolveOffset(message.offset)
      await heartbeat() // Keep session alive during long batches
    }
  },
})
Error handling and retries
import { Kafka } from "kafkajs"

const kafka = new Kafka({
  clientId: "pkgpulse",
  brokers: ["localhost:9092"],
  retry: {
    initialRetryTime: 300,
    retries: 8,
    maxRetryTime: 30000,
    factor: 2,
  },
})

// Producer for the dead letter topic:
const producer = kafka.producer()
await producer.connect()

const consumer = kafka.consumer({ groupId: "resilient-service" })
await consumer.connect()
await consumer.subscribe({ topic: "events" })

await consumer.run({
  eachMessage: async ({ message, topic, partition }) => {
    try {
      await processEvent(JSON.parse(message.value!.toString()))
    } catch (err) {
      // Send to dead letter topic:
      await producer.send({
        topic: `${topic}.dlq`,
        messages: [{
          key: message.key,
          value: message.value,
          headers: {
            ...message.headers,
            "error": Buffer.from((err as Error).message),
            "original-topic": Buffer.from(topic),
            "original-partition": Buffer.from(String(partition)),
          },
        }],
      })
    }
  },
})
Redis Streams
ioredis — Redis Streams:
Basic stream
import Redis from "ioredis"

const redis = new Redis()

// Add to stream (XADD):
await redis.xadd("package-events", "*",
  "type", "download",
  "package", "react",
  "version", "19.0.0",
  "timestamp", String(Date.now()),
)

// Read from stream (XREAD):
const messages = await redis.xread(
  "COUNT", 10,
  "BLOCK", 5000, // Block up to 5 seconds if no entries
  "STREAMS", "package-events",
  "0", // Read from beginning ("$" for new messages only)
)

if (messages) {
  for (const [stream, entries] of messages) {
    for (const [id, fields] of entries) {
      // Fields arrive as a flat [key, value, key, value, ...] array:
      const data: Record<string, string> = {}
      for (let i = 0; i < fields.length; i += 2) {
        data[fields[i]] = fields[i + 1]
      }
      console.log(`Stream ${stream}, ID ${id}:`, data)
    }
  }
}
Consumer groups
import Redis from "ioredis"

const redis = new Redis()

// Create consumer group:
try {
  await redis.xgroup("CREATE", "events", "analytics-group", "0", "MKSTREAM")
} catch (e) {
  // Group already exists
}

// Consumer reads from group (XREADGROUP):
async function consumeFromGroup(consumerName: string) {
  while (true) {
    const messages = await redis.xreadgroup(
      "GROUP", "analytics-group", consumerName,
      "COUNT", 10,
      "BLOCK", 5000,
      "STREAMS", "events",
      ">", // Only new messages
    )
    if (!messages) continue

    for (const [stream, entries] of messages) {
      for (const [id, fields] of entries) {
        const data: Record<string, string> = {}
        for (let i = 0; i < fields.length; i += 2) {
          data[fields[i]] = fields[i + 1]
        }
        await processEvent(data)
        // Acknowledge:
        await redis.xack("events", "analytics-group", id)
      }
    }
  }
}

// Run multiple consumers:
consumeFromGroup("consumer-1")
consumeFromGroup("consumer-2")
Pending and claiming
import Redis from "ioredis"

const redis = new Redis()

// Check pending messages (unacknowledged):
const pending = await redis.xpending(
  "events", "analytics-group",
  "-", "+", 10, // Range and count
)
console.log("Pending messages:", pending)

// Each pending entry is [id, consumer, idleMs, deliveryCount]:
const pendingIds = (pending as [string, string, number, number][]).map(([id]) => id)

// Claim stale messages (idle > 60s):
const claimed = await redis.xclaim(
  "events", "analytics-group", "consumer-recovery",
  60000, // Min idle time (ms)
  ...pendingIds,
)

// Auto-claim (Redis 6.2+):
const autoClaimed = await redis.xautoclaim(
  "events", "analytics-group", "consumer-recovery",
  60000, // Min idle time
  "0", // Start ID
  "COUNT", 10,
)

// Trim stream (cap size):
await redis.xtrim("events", "MAXLEN", "~", 10000) // ~10K entries
Pub/Sub pattern with Streams
import Redis from "ioredis"

const redis = new Redis()

// Publisher:
async function publish(channel: string, data: Record<string, string>) {
  await redis.xadd(channel, "MAXLEN", "~", "1000", "*",
    ...Object.entries(data).flat(),
  )
}

// Subscriber (real-time):
async function subscribe(channel: string, handler: (data: any) => void) {
  let lastId = "$" // Only new messages
  while (true) {
    const messages = await redis.xread(
      "COUNT", 10,
      "BLOCK", 0, // Block indefinitely
      "STREAMS", channel,
      lastId,
    )
    if (!messages) continue

    for (const [, entries] of messages) {
      for (const [id, fields] of entries) {
        const data: Record<string, string> = {}
        for (let i = 0; i < fields.length; i += 2) {
          data[fields[i]] = fields[i + 1]
        }
        handler(data)
        lastId = id
      }
    }
  }
}

// Usage:
await publish("notifications", { type: "release", package: "react" })
subscribe("notifications", (data) => console.log("Received:", data))
Feature Comparison
| Feature | amqplib (RabbitMQ) | KafkaJS | Redis Streams |
|---|---|---|---|
| Protocol | AMQP 0-9-1 | Kafka protocol | Redis protocol |
| Architecture | Message broker | Event log | Append-only stream |
| Message routing | ✅ (exchanges) | ✅ (topics/partitions) | ❌ (manual) |
| Consumer groups | ❌ (competing consumers) | ✅ | ✅ |
| Message replay | ❌ (consumed once) | ✅ (offset-based) | ✅ (ID-based) |
| Ordering | Per-queue | Per-partition | Per-stream |
| Dead letter queue | ✅ (built-in) | Manual (DLQ topic) | Manual |
| Message TTL | ✅ | ✅ (retention) | ✅ (MAXLEN/MINID) |
| Transactions | ❌ | ✅ | ✅ (MULTI) |
| Throughput | High | Very high | High |
| Persistence | Disk | Disk (replicated) | RDB/AOF |
| Extra infra | RabbitMQ server | Kafka cluster (KRaft/ZooKeeper) | Redis (existing) |
| TypeScript | ✅ (@types) | ✅ | ✅ (ioredis) |
| Weekly downloads | ~1.5M | ~1M | ~2M (ioredis) |
When to Use Each
Use amqplib (RabbitMQ) if:
- Need complex message routing (exchanges, routing keys, topics)
- Building request/reply or RPC patterns
- Want dead letter queues and message TTL out of the box
- Building microservices with diverse messaging patterns
Use KafkaJS if:
- Need high-throughput event streaming
- Want message replay (consumers can re-read history)
- Building event-driven architecture at scale
- Need ordered processing with partitioned topics
Use Redis Streams if:
- Already running Redis and want to avoid extra infrastructure
- Need lightweight streaming with consumer groups
- Building real-time features (notifications, activity feeds)
- Want the simplest setup for basic streaming needs
Production Deployment and Infrastructure Considerations
Each messaging system carries different operational weight in production. RabbitMQ is a mature, single-binary service that runs easily on a single node for moderate workloads, with clustering available for high availability. A typical production RabbitMQ deployment uses quorum queues (introduced in RabbitMQ 3.8) rather than classic mirrored queues for better durability guarantees. Kafka requires more infrastructure investment: a ZooKeeper ensemble or KRaft cluster (Kafka's newer self-managed consensus mode), multiple broker nodes for replication, and careful partition planning based on expected throughput. For teams without dedicated operations capacity, managed Kafka services like Confluent Cloud or AWS MSK significantly reduce this burden. Redis Streams piggybacks on your existing Redis deployment — if you're already running Redis for caching, you get streaming capabilities without any additional infrastructure, which makes it the most operationally lightweight option for teams that don't need Kafka-scale throughput.
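As a concrete sketch, opting into quorum queues from amqplib is just a queue argument. The quorumQueueOptions helper below is hypothetical (you would pass its result to channel.assertQueue), and the x-delivery-limit value is an assumed example:

```typescript
// Hypothetical helper producing assertQueue options for a quorum queue
// (RabbitMQ 3.8+). Usage sketch: await channel.assertQueue("tasks", quorumQueueOptions())
function quorumQueueOptions() {
  return {
    durable: true, // quorum queues are always durable
    arguments: {
      "x-queue-type": "quorum", // Raft-based quorum queue instead of a classic queue
      "x-delivery-limit": 5, // dead-letter after 5 failed deliveries (example value)
    },
  }
}
```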
TypeScript Integration and Message Typing
Strongly typing message payloads is important for maintaining large message-driven systems. With amqplib, messages arrive as raw Buffer objects, so you deserialize and validate manually — a good practice is to define a Zod or io-ts schema for each message type and parse at the consumer boundary. KafkaJS similarly delivers raw Buffer values for message keys and payloads, and community libraries like kafkajs-avro integrate Avro schema registry support for schema-enforced serialization. Redis Streams use string key-value pairs as the native format (via Redis protocol), so JSON serialization is the conventional approach, with manual type assertions or Zod parsing at consumption time. None of these libraries provide built-in TypeScript generics for message payloads, but wrapping them in typed producer/consumer factory functions is a standard pattern that prevents type drift between publishers and subscribers across a large codebase.
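One way to sketch that factory pattern — the PackageEvent type and the encode/decode helpers below are illustrative, not APIs of any of the three libraries:

```typescript
// A message type shared by publisher and consumer (hypothetical example).
interface PackageEvent {
  type: "download" | "publish"
  name: string
}

// Generic serialize/deserialize pair: both sides go through the same
// functions, so the payload type cannot silently drift.
function encode<T>(msg: T): Buffer {
  return Buffer.from(JSON.stringify(msg))
}

function decode<T>(buf: Buffer): T {
  return JSON.parse(buf.toString()) as T
}

// The Buffer is what you'd hand to sendToQueue (amqplib) or use as a KafkaJS message value.
const wire = encode<PackageEvent>({ type: "download", name: "react" })
const evt = decode<PackageEvent>(wire)
```

In a real codebase, decode is also where a Zod schema's parse call would live, so invalid payloads fail at the consumer boundary rather than deep in business logic.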
Connection Management and Error Recovery
Production messaging clients must handle connection failures gracefully. amqplib does not automatically reconnect when the AMQP connection drops — the connection.close event fires and you must implement reconnection logic manually, typically with exponential backoff. Community wrappers like amqp-connection-manager add automatic reconnection and channel re-creation on top of amqplib, making it production-ready without writing reconnection code from scratch. KafkaJS has built-in retry logic for producer and consumer operations, configured through the retry option in the Kafka client constructor. The consumer also handles rebalancing events automatically when consumer group members join or leave, pausing consumption during the rebalance and resuming afterward without message loss. ioredis includes automatic reconnection and command buffering by default — when Redis is temporarily unavailable, commands queue in memory and execute when the connection is restored, which handles transient network issues transparently. For Redis Streams specifically, consumer group consumption survives Redis restarts if you use AOF persistence or RDB snapshots, because the stream data and consumer group state persist to disk.
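A minimal sketch of the backoff schedule such a reconnect loop could use; backoffMs and the commented connectWithRetry outline are assumptions for illustration, not amqplib API:

```typescript
// Exponential backoff delay, capped: 500ms, 1s, 2s, 4s, ... up to 30s.
function backoffMs(attempt: number, base = 500, max = 30_000): number {
  return Math.min(base * 2 ** attempt, max)
}

// Reconnect loop outline (assumes amqplib's connect() and the connection
// "close" event; left as comments since it needs a live broker):
//
// async function connectWithRetry(url: string) {
//   for (let attempt = 0; ; attempt++) {
//     try {
//       const conn = await amqp.connect(url)
//       conn.on("close", () => void connectWithRetry(url)) // reconnect on drop
//       return conn
//     } catch {
//       await new Promise((r) => setTimeout(r, backoffMs(attempt)))
//     }
//   }
// }
```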
Message Durability and Delivery Guarantees
Understanding the delivery semantics of each system is essential for choosing the right tool. RabbitMQ with persistent messages and durable queues provides at-least-once delivery — if a consumer crashes before acknowledging, the message is requeued and delivered again. This means consumers must be idempotent for correctness. KafkaJS's offset-based consumption also provides at-least-once delivery by default, and exactly-once semantics require enabling Kafka transactions (supported in KafkaJS v2.x), which adds complexity but eliminates duplicate processing for financial or inventory systems. Redis Streams with consumer groups guarantee at-least-once delivery through the pending entries list, and the XAUTOCLAIM command (Redis 6.2+) automates claiming stalled messages from crashed consumers without manual monitoring. For systems where message loss is unacceptable but duplicates are tolerable, all three support durable at-least-once delivery; for exactly-once, only Kafka provides native transactional support.
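Because at-least-once delivery means the same message can arrive twice, consumers must be idempotent. A toy in-memory sketch of the idea — in production the seen-set would be a database table keyed by message ID, not process memory:

```typescript
// Track processed message IDs so a redelivered message becomes a no-op.
const seen = new Set<string>()

function processOnce(id: string, handler: () => void): boolean {
  if (seen.has(id)) return false // duplicate delivery — skip
  seen.add(id)
  handler()
  return true
}

let count = 0
processOnce("msg-1", () => count++) // first delivery: processed
processOnce("msg-1", () => count++) // redelivery: ignored, count stays 1
```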
Security and Network Considerations
Production messaging systems should enforce authentication and encryption. RabbitMQ supports AMQP over TLS (amqps://), username/password authentication, and LDAP integration via the rabbitmq_auth_backend_ldap plugin. The amqplib connection URL should always use amqps:// in production rather than plaintext amqp://. KafkaJS supports SASL authentication (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512) and TLS for both broker-to-broker and client-to-broker communication, configured through the ssl and sasl options in the Kafka client constructor. Redis Streams security depends on your Redis configuration — Redis 6+ supports ACL-based user permissions that can restrict specific clients to specific streams, combined with TLS support via tls options in ioredis. For cloud deployments, placing your message broker in a private subnet with VPC-only access is a baseline security requirement that eliminates an entire class of exposure.
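A configuration sketch for an authenticated KafkaJS client; the broker host and environment variable names are placeholders:

```typescript
import { Kafka } from "kafkajs"

// TLS + SCRAM authentication. Broker host, port, and env var names are
// assumed examples — substitute your own.
const kafka = new Kafka({
  clientId: "pkgpulse",
  brokers: ["broker1.internal:9093"], // TLS listener port (assumed)
  ssl: true, // encrypt client-to-broker traffic
  sasl: {
    mechanism: "scram-sha-256",
    username: process.env.KAFKA_USER!,
    password: process.env.KAFKA_PASS!,
  },
})
```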
Observability and Dead Letter Handling
Production messaging systems require robust observability to detect backlogs, consumer lag, and processing failures before they become outages. RabbitMQ exposes detailed metrics through its management HTTP API and integrates natively with Prometheus via the rabbitmq_prometheus plugin — queue depth, message rates, and consumer counts are all available. KafkaJS works with Kafka's built-in metrics exposed via JMX, and consumer group lag monitoring (the difference between the latest offset and the committed consumer offset) is the key metric for detecting processing slowdowns. The kafkajs-prometheus-exporter community package makes this accessible for Prometheus/Grafana stacks. Redis Streams metrics are available through standard Redis monitoring tools and the INFO command, with stream length (XLEN) as the primary backlog indicator. For dead letter handling, RabbitMQ's built-in dead letter exchange pattern is the most ergonomic, while Kafka conventionally routes failed messages to a separate dead-letter topic using the same producer/consumer infrastructure.
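Lag itself is simple arithmetic once you have the offsets; the PartitionOffsets shape below is a simplified stand-in for data you would assemble from KafkaJS's admin.fetchTopicOffsets (high-water marks) and admin.fetchOffsets (committed group offsets), not their exact response format:

```typescript
// Simplified per-partition offset pair (hypothetical shape).
interface PartitionOffsets {
  partition: number
  high: string // latest offset in the partition (high-water mark)
  committed: string // consumer group's committed offset
}

// Total lag = sum over partitions of (latest offset - committed offset).
function totalLag(partitions: PartitionOffsets[]): number {
  return partitions.reduce(
    (sum, p) => sum + Math.max(0, Number(p.high) - Number(p.committed)),
    0,
  )
}

const lag = totalLag([
  { partition: 0, high: "1500", committed: "1480" },
  { partition: 1, high: "900", committed: "900" },
])
// lag is 20 — alert when this grows faster than consumers drain it
```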
Methodology
Download data from npm registry (weekly average, February 2026). Feature comparison based on amqplib v0.10.x, KafkaJS v2.x, and ioredis v5.x (Redis Streams).
Compare messaging and backend libraries on PkgPulse →
See also: AVA vs Jest, Payload CMS vs Strapi vs Directus, and BullMQ vs Bee-Queue vs pg-boss.