
amqplib vs KafkaJS vs Redis Streams: Message Queue Clients (2026)

PkgPulse Team

TL;DR

amqplib is the standard RabbitMQ client for Node.js — AMQP 0-9-1 protocol, exchanges, routing keys, dead letter queues, the classic message broker. KafkaJS is a pure-JavaScript Apache Kafka client — distributed event streaming, consumer groups, topic partitions, ordered logs, high throughput. Redis Streams (via ioredis) is the lightweight option — an append-only log with consumer groups, built into Redis, no separate infrastructure. In 2026: amqplib for RabbitMQ message routing, KafkaJS for distributed event streaming at scale, Redis Streams for lightweight streaming on top of Redis you already run.

Key Takeaways

  • amqplib: ~1.5M weekly downloads — RabbitMQ, AMQP, exchanges, routing, dead letters
  • KafkaJS: ~1M weekly downloads — Kafka, event streaming, partitions, consumer groups
  • Redis Streams: via ioredis ~2M downloads — append-only log, XADD/XREAD, lightweight
  • Different architectures: message broker (RabbitMQ), event log (Kafka), stream (Redis)
  • RabbitMQ excels at routing and message patterns
  • Kafka excels at high-throughput ordered event streaming

amqplib (RabbitMQ)

amqplib — AMQP client for RabbitMQ:

Basic publish/consume

import amqp from "amqplib"

// Publisher:
async function publishMessage() {
  const conn = await amqp.connect("amqp://localhost")
  const channel = await conn.createChannel()

  const queue = "tasks"
  await channel.assertQueue(queue, { durable: true })

  channel.sendToQueue(queue, Buffer.from(JSON.stringify({
    type: "process-package",
    name: "react",
  })), {
    persistent: true,  // Survive broker restart
  })

  console.log("Message sent")
  await channel.close()
  await conn.close()
}

// Consumer:
async function consumeMessages() {
  const conn = await amqp.connect("amqp://localhost")
  const channel = await conn.createChannel()

  const queue = "tasks"
  await channel.assertQueue(queue, { durable: true })
  await channel.prefetch(1)  // Process one at a time

  channel.consume(queue, async (msg) => {
    if (!msg) return

    const data = JSON.parse(msg.content.toString())
    console.log("Processing:", data)

    try {
      await processTask(data)
      channel.ack(msg)       // Acknowledge success
    } catch (err) {
      channel.nack(msg, false, true)  // Requeue on failure (beware: a poison message will requeue forever)
    }
  })
}

Exchanges and routing

import amqp from "amqplib"

async function setupExchanges() {
  const conn = await amqp.connect("amqp://localhost")
  const channel = await conn.createChannel()

  // Direct exchange — route by exact key:
  await channel.assertExchange("packages", "direct", { durable: true })
  await channel.assertQueue("npm-packages", { durable: true })
  await channel.bindQueue("npm-packages", "packages", "npm")

  channel.publish("packages", "npm", Buffer.from(JSON.stringify({
    name: "react", version: "19.0.0",
  })))

  // Topic exchange — route by pattern:
  await channel.assertExchange("events", "topic", { durable: true })
  await channel.assertQueue("all-downloads", { durable: true })
  await channel.bindQueue("all-downloads", "events", "package.*.download")

  channel.publish("events", "package.react.download", Buffer.from("{}"))
  channel.publish("events", "package.vue.download", Buffer.from("{}"))

  // Fanout exchange — broadcast to all queues:
  await channel.assertExchange("notifications", "fanout", { durable: true })
  await channel.assertQueue("email-notifications", { durable: true })
  await channel.assertQueue("slack-notifications", { durable: true })
  await channel.bindQueue("email-notifications", "notifications", "")
  await channel.bindQueue("slack-notifications", "notifications", "")

  channel.publish("notifications", "", Buffer.from(JSON.stringify({
    message: "New package version released!",
  })))
}

Dead letter queue

import amqp from "amqplib"

async function setupDeadLetterQueue() {
  const conn = await amqp.connect("amqp://localhost")
  const channel = await conn.createChannel()

  // Dead letter exchange:
  await channel.assertExchange("dlx", "direct", { durable: true })
  await channel.assertQueue("dead-letters", { durable: true })
  await channel.bindQueue("dead-letters", "dlx", "failed")

  // Main queue with DLX:
  await channel.assertQueue("tasks", {
    durable: true,
    arguments: {
      "x-dead-letter-exchange": "dlx",
      "x-dead-letter-routing-key": "failed",
      "x-message-ttl": 60000,  // 60s TTL
    },
  })

  // Failed messages go to dead-letters queue automatically
  channel.consume("dead-letters", (msg) => {
    if (!msg) return
    console.error("Dead letter:", msg.content.toString())
    channel.ack(msg)
  })
}

KafkaJS

KafkaJS — Apache Kafka client:

Basic producer/consumer

import { Kafka } from "kafkajs"

const kafka = new Kafka({
  clientId: "pkgpulse",
  brokers: ["localhost:9092"],
})

// Producer:
const producer = kafka.producer()
await producer.connect()

await producer.send({
  topic: "package-events",
  messages: [
    {
      key: "react",
      value: JSON.stringify({
        type: "download",
        package: "react",
        timestamp: Date.now(),
      }),
    },
  ],
})

await producer.disconnect()

// Consumer:
const consumer = kafka.consumer({ groupId: "analytics-service" })
await consumer.connect()
await consumer.subscribe({ topic: "package-events", fromBeginning: true })

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    console.log({
      topic,
      partition,
      key: message.key?.toString(),
      value: message.value?.toString(),
      offset: message.offset,
    })
  },
})
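One production detail worth adding to the consumer above: disconnect cleanly on process signals, so the group rebalances right away instead of waiting out the session timeout. A minimal sketch (the signal list and exit behavior are deployment assumptions, not KafkaJS requirements):

```typescript
// Signals that should trigger a clean shutdown:
export const SIGNALS = ["SIGINT", "SIGTERM"] as const

// Disconnect the consumer before exiting, so its partitions are
// reassigned immediately rather than after sessionTimeout expires:
export function shutdownOn(consumer: { disconnect: () => Promise<void> }) {
  for (const signal of SIGNALS) {
    process.once(signal, async () => {
      try {
        await consumer.disconnect()  // Leaves the group cleanly
      } finally {
        process.exit(0)
      }
    })
  }
}
```

Usage: `shutdownOn(consumer)` right after `consumer.run(...)`.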

Consumer groups and partitions

import { Kafka } from "kafkajs"

const kafka = new Kafka({
  clientId: "pkgpulse",
  brokers: ["broker1:9092", "broker2:9092", "broker3:9092"],
})

// Create topic with partitions:
const admin = kafka.admin()
await admin.connect()
await admin.createTopics({
  topics: [{
    topic: "downloads",
    numPartitions: 6,
    replicationFactor: 3,
  }],
})
await admin.disconnect()

// Consumer group — Kafka assigns partitions automatically:
const consumer = kafka.consumer({
  groupId: "download-tracker",
  sessionTimeout: 30000,
  heartbeatInterval: 3000,
})

await consumer.connect()
await consumer.subscribe({ topic: "downloads" })

await consumer.run({
  partitionsConsumedConcurrently: 3,  // Process 3 partitions concurrently
  eachMessage: async ({ topic, partition, message }) => {
    const data = JSON.parse(message.value!.toString())
    await trackDownload(data)
  },
})

Batch processing

import { Kafka } from "kafkajs"

const kafka = new Kafka({ clientId: "pkgpulse", brokers: ["localhost:9092"] })

// Batch producer:
const producer = kafka.producer({
  allowAutoTopicCreation: false,
  transactionTimeout: 30000,
})
await producer.connect()

// Assumes `packages` is an array of package records defined elsewhere:
await producer.sendBatch({
  topicMessages: [
    {
      topic: "package-downloads",
      messages: packages.map((pkg) => ({
        key: pkg.name,
        value: JSON.stringify(pkg),
        headers: { source: "npm-registry" },
      })),
    },
  ],
})

// Batch consumer:
const consumer = kafka.consumer({ groupId: "batch-processor" })
await consumer.connect()
await consumer.subscribe({ topic: "package-downloads" })

await consumer.run({
  eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
    for (const message of batch.messages) {
      const data = JSON.parse(message.value!.toString())
      await processPackage(data)
      resolveOffset(message.offset)
      await heartbeat()  // Keep session alive during long batches
    }
  },
})

Error handling and retries

import { Kafka } from "kafkajs"

const kafka = new Kafka({
  clientId: "pkgpulse",
  brokers: ["localhost:9092"],
  retry: {
    initialRetryTime: 300,
    retries: 8,
    maxRetryTime: 30000,
    factor: 2,
  },
})

// The DLQ handler below needs a producer too:
const producer = kafka.producer()
await producer.connect()

const consumer = kafka.consumer({ groupId: "resilient-service" })
await consumer.connect()
await consumer.subscribe({ topic: "events" })

await consumer.run({
  eachMessage: async ({ message, topic, partition }) => {
    try {
      await processEvent(JSON.parse(message.value!.toString()))
    } catch (err) {
      // Send to dead letter topic:
      await producer.send({
        topic: `${topic}.dlq`,
        messages: [{
          key: message.key,
          value: message.value,
          headers: {
            ...message.headers,
            "error": Buffer.from(err instanceof Error ? err.message : String(err)),
            "original-topic": Buffer.from(topic),
            "original-partition": Buffer.from(String(partition)),
          },
        }],
      })
    }
  },
})

Redis Streams

ioredis — Redis Streams:

Basic stream

import Redis from "ioredis"

const redis = new Redis()

// Add to stream (XADD):
await redis.xadd("package-events", "*",
  "type", "download",
  "package", "react",
  "version", "19.0.0",
  "timestamp", String(Date.now()),
)

// Read from stream (XREAD):
const messages = await redis.xread(
  "COUNT", 10,
  "BLOCK", 5000,  // Block for 5 seconds
  "STREAMS", "package-events",
  "0"  // Read from beginning ("$" for new messages only)
)

if (messages) {
  for (const [stream, entries] of messages) {
    for (const [id, fields] of entries) {
      const data: Record<string, string> = {}
      for (let i = 0; i < fields.length; i += 2) {
        data[fields[i]] = fields[i + 1]
      }
      console.log(`Stream ${stream}, ID ${id}:`, data)
    }
  }
}
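XREAD returns each entry's fields as a flat [key, value, key, value, …] array, which the loop above decodes by hand. Since every consumer repeats that decoding, a small helper (our own, not part of ioredis) keeps it tidy:

```typescript
// Convert Redis' flat field array into a plain object:
export function fieldsToObject(fields: string[]): Record<string, string> {
  const data: Record<string, string> = {}
  for (let i = 0; i < fields.length; i += 2) {
    data[fields[i]] = fields[i + 1]
  }
  return data
}
```

The consumer-group and pub/sub loops later in this post repeat the same inline conversion and could call this helper instead.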

Consumer groups

import Redis from "ioredis"

const redis = new Redis()

// Create consumer group:
try {
  await redis.xgroup("CREATE", "events", "analytics-group", "0", "MKSTREAM")
} catch (err) {
  // Ignore BUSYGROUP (group already exists); rethrow anything else:
  if (!String(err).includes("BUSYGROUP")) throw err
}

// Consumer reads from group (XREADGROUP):
async function consumeFromGroup(consumerName: string) {
  while (true) {
    const messages = await redis.xreadgroup(
      "GROUP", "analytics-group", consumerName,
      "COUNT", 10,
      "BLOCK", 5000,
      "STREAMS", "events",
      ">"  // Only new messages
    )

    if (!messages) continue

    for (const [stream, entries] of messages) {
      for (const [id, fields] of entries) {
        const data: Record<string, string> = {}
        for (let i = 0; i < fields.length; i += 2) {
          data[fields[i]] = fields[i + 1]
        }

        await processEvent(data)

        // Acknowledge:
        await redis.xack("events", "analytics-group", id)
      }
    }
  }
}

// Run multiple consumers:
consumeFromGroup("consumer-1").catch(console.error)
consumeFromGroup("consumer-2").catch(console.error)

Pending and claiming

import Redis from "ioredis"

const redis = new Redis()

// Check pending messages (unacknowledged):
const pending = await redis.xpending(
  "events", "analytics-group",
  "-", "+", 10  // Range and count
)

console.log("Pending messages:", pending)

// Claim stale messages (idle > 60s). XPENDING's extended form returns
// [id, consumer, idleMs, deliveryCount] tuples; pull out the IDs:
const pendingIds = (pending as [string, string, number, number][]).map(([id]) => id)
const claimed = await redis.xclaim(
  "events", "analytics-group", "consumer-recovery",
  60000,  // Min idle time (ms)
  ...pendingIds
)

// Auto-claim (Redis 6.2+):
const autoClaimed = await redis.xautoclaim(
  "events", "analytics-group", "consumer-recovery",
  60000,  // Min idle time
  "0",    // Start ID
  "COUNT", 10
)

// Trim stream (cap size):
await redis.xtrim("events", "MAXLEN", "~", 10000)  // ~10K entries

Pub/Sub pattern with Streams

import Redis from "ioredis"

const redis = new Redis()

// Publisher:
async function publish(channel: string, data: Record<string, string>) {
  await redis.xadd(channel, "MAXLEN", "~", "1000", "*",
    ...Object.entries(data).flat()
  )
}

// Subscriber (real-time):
async function subscribe(channel: string, handler: (data: any) => void) {
  let lastId = "$"  // Only new messages

  while (true) {
    const messages = await redis.xread(
      "COUNT", 10,
      "BLOCK", 0,  // Block indefinitely
      "STREAMS", channel,
      lastId
    )

    if (!messages) continue

    for (const [, entries] of messages) {
      for (const [id, fields] of entries) {
        const data: Record<string, string> = {}
        for (let i = 0; i < fields.length; i += 2) {
          data[fields[i]] = fields[i + 1]
        }
        handler(data)
        lastId = id
      }
    }
  }
}

// Usage:
await publish("notifications", { type: "release", package: "react" })
subscribe("notifications", (data) => console.log("Received:", data)).catch(console.error)

Feature Comparison

| Feature | amqplib (RabbitMQ) | KafkaJS | Redis Streams |
|---|---|---|---|
| Protocol | AMQP 0-9-1 | Kafka protocol | Redis protocol |
| Architecture | Message broker | Event log | Append-only stream |
| Message routing | ✅ (exchanges) | ✅ (topics/partitions) | ❌ (manual) |
| Consumer groups | ❌ (competing consumers) | ✅ | ✅ |
| Message replay | ❌ (consumed once) | ✅ (offset-based) | ✅ (ID-based) |
| Ordering | Per-queue | Per-partition | Per-stream |
| Dead letter queue | ✅ (built-in) | Manual (DLQ topic) | Manual |
| Message TTL | ✅ (per-queue/message) | ✅ (retention) | ✅ (MAXLEN/MINID) |
| Transactions | ✅ (AMQP tx) | ✅ (transactional producer) | ✅ (MULTI) |
| Throughput | High | Very high | High |
| Persistence | Disk | Disk (replicated) | RDB/AOF |
| Extra infra | RabbitMQ server | Kafka cluster (KRaft; ZooKeeper on older setups) | Redis (existing) |
| TypeScript | ✅ (@types/amqplib) | ✅ (bundled) | ✅ (bundled with ioredis) |
| Weekly downloads | ~1.5M | ~1M | ~2M (ioredis) |

When to Use Each

Use amqplib (RabbitMQ) if:

  • Need complex message routing (exchanges, routing keys, topics)
  • Building request/reply or RPC patterns
  • Want dead letter queues and message TTL out of the box
  • Building microservices with diverse messaging patterns

Use KafkaJS if:

  • Need high-throughput event streaming
  • Want message replay (consumers can re-read history)
  • Building event-driven architecture at scale
  • Need ordered processing with partitioned topics

Use Redis Streams if:

  • Already running Redis and want to avoid extra infrastructure
  • Need lightweight streaming with consumer groups
  • Building real-time features (notifications, activity feeds)
  • Want the simplest setup for basic streaming needs

Methodology

Download data from npm registry (weekly average, February 2026). Feature comparison based on amqplib v0.10.x, KafkaJS v2.x, and ioredis v5.x (Redis Streams).

Compare messaging and backend libraries on PkgPulse →
