
1.0.3 #14

Merged
merged 2 commits into from Nov 25, 2024
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -17,7 +17,7 @@ napi-derive = "2.16"

 anyhow = { version = "1.0", features = ["backtrace"] }

-rdkafka = { version = "0.36", features = [
+rdkafka = { version = "0.37", features = [
   "libz-static",
   "cmake-build",
   "tokio",
119 changes: 113 additions & 6 deletions README.md
@@ -2,7 +2,7 @@

 # 🦀 Kafka Crab JS 🦀

-A lightweight and flexible Kafka client for JavaScript/TypeScript, built with Rust-inspired reliability.
+A lightweight, flexible, and reliable Kafka client for JavaScript/TypeScript, built on Rust and librdkafka for high performance.

[![npm version](https://img.shields.io/npm/v/kafka-crab-js.svg)](https://www.npmjs.com/package/kafka-crab-js)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
@@ -205,12 +205,124 @@ async function produceWithMetadata() {
}
```

### Reconnecting Kafka Consumer

```javascript
import { KafkaClient } from 'kafka-crab-js'

const kafkaClient = new KafkaClient({
  brokers: 'localhost:29092',
  clientId: 'reconnect-test',
  logLevel: 'debug',
  brokerAddressFamily: 'v4',
  configuration: {
    'auto.offset.reset': 'earliest',
  },
})

/**
 * Creates and configures a new Kafka stream consumer
 */
async function createConsumer() {
  const kafkaStream = kafkaClient.createStreamConsumer({
    groupId: 'reconnect-test',
    enableAutoCommit: true,
  })
  await kafkaStream.subscribe([
    { topic: 'foo' },
    { topic: 'bar' },
  ])
  return kafkaStream
}

/**
 * Starts a Kafka consumer with auto-restart capability
 */
async function startConsumer() {
  let counter = 0
  let retryCount = 0
  const MAX_RETRIES = 5
  const RETRY_DELAY = 5000 // 5 seconds

  async function handleRetry() {
    if (retryCount < MAX_RETRIES) {
      retryCount++
      console.log(
        `Attempting to restart consumer (attempt ${retryCount}/${MAX_RETRIES}) in ${RETRY_DELAY / 1000} seconds...`,
      )
      setTimeout(setupConsumerWithRetry, RETRY_DELAY)
    } else {
      console.error(`Maximum retry attempts (${MAX_RETRIES}) reached. Stopping consumer.`)
      process.exit(1)
    }
  }

  async function setupConsumerWithRetry() {
    try {
      const kafkaStream = await createConsumer()
      retryCount = 0 // Reset retry count on successful connection

      console.log('Starting consumer')

      kafkaStream.on('data', (message) => {
        counter++
        console.log('>>> Message received:', {
          counter,
          payload: message.payload.toString(),
          offset: message.offset,
          partition: message.partition,
          topic: message.topic,
        })
      })

      kafkaStream.on('error', async (error) => {
        console.error('Stream error:', error)
        handleRetry()
      })

      kafkaStream.on('close', () => {
        console.log('Stream ended')
        try {
          kafkaStream.unsubscribe()
        } catch (unsubError) {
          console.error('Error unsubscribing:', unsubError)
        }
      })
    } catch (error) {
      console.error('Error setting up consumer:', error)
      handleRetry()
    }
  }

  await setupConsumerWithRetry()
}

await startConsumer()
```
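The fixed `RETRY_DELAY` above restarts the consumer at a constant interval; a common variant is exponential backoff, which backs off further on each consecutive failure. A minimal sketch of the delay calculation (the `backoffDelay` helper is ours for illustration, not part of kafka-crab-js):

```javascript
// Hypothetical helper, not part of kafka-crab-js: compute the delay for the
// nth retry attempt, doubling each time and capped at maxMs.
function backoffDelay(attempt, baseMs = 1000, maxMs = 30000) {
  // 1s, 2s, 4s, 8s, ... capped at maxMs
  return Math.min(baseMs * 2 ** (attempt - 1), maxMs)
}

// In handleRetry() this would replace the constant delay:
//   setTimeout(setupConsumerWithRetry, backoffDelay(retryCount))
```

Resetting `retryCount` to 0 after a successful connection, as the example already does, is what makes the backoff start over once the consumer recovers.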

### Examples

You can find more examples in the [example](https://github.com/mthssdrbrg/kafka-crab-js/tree/main/example) folder of this project.

## Configuration

### KafkaConfiguration

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| `brokers` | `string` | | List of brokers to connect to |
| `clientId` | `string` | | Client id to use for the connection |
| `securityProtocol` | `SecurityProtocol` | | Security protocol to use (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL) |
| `logLevel` | `string` | `info` | Log level for the client (e.g. `debug`, `info`, `error`) |
| `brokerAddressFamily` | `string` | `"v4"` | Address family to use for the connection (v4, v6) |
| `configuration` | `Map<string, string>` | `{}` | Additional configuration options for the client. See [librdkafka](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) |

You can see the available options here: [librdkafka](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).
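As a concrete sketch, a client using SASL over TLS might be configured like this. The `SecurityProtocol.SaslSsl` member name is an assumption (only `Plaintext` appears in the examples in this repo), the broker address and credentials are placeholders, and the `sasl.*` keys are standard librdkafka properties forwarded via `configuration`:

```javascript
import { KafkaClient, SecurityProtocol } from 'kafka-crab-js'

const client = new KafkaClient({
  brokers: 'broker-1.example.com:9093', // placeholder address
  clientId: 'my-secure-client',
  securityProtocol: SecurityProtocol.SaslSsl, // assumed enum member name
  configuration: {
    // Forwarded to librdkafka as-is
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': 'user',   // placeholder
    'sasl.password': 'secret', // placeholder
  },
})
```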



## Best Practices

### Error Handling
@@ -223,11 +335,6 @@
- Configure appropriate batch sizes and compression
- Monitor and tune consumer group performance
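With this client, batch sizes and compression would be tuned through librdkafka producer properties passed via `configuration`; a sketch with illustrative values (the property names come from librdkafka, the numbers are starting points, not recommendations):

```javascript
import { KafkaClient } from 'kafka-crab-js'

const kafkaClient = new KafkaClient({ brokers: 'localhost:29092', clientId: 'tuning-example' })

// Producer-side batching and compression are librdkafka properties,
// forwarded through the `configuration` map.
const producer = kafkaClient.createProducer({
  topic: 'my-topic',
  configuration: {
    'linger.ms': '50',             // wait up to 50 ms so batches can fill
    'batch.num.messages': '10000', // cap on messages per batch
    'compression.codec': 'lz4',    // also: gzip, snappy, zstd
  },
})
```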

-### Resource Management
-- Implement proper shutdown procedures
-- Clean up resources (disconnect producers/consumers)
-- Handle process signals (SIGTERM, SIGINT)

### Message Processing
- Validate message formats before processing
- Implement proper serialization/deserialization
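A minimal sketch of validating and deserializing a payload before processing it (`parseMessage` is ours for illustration, not part of the library, and the required fields mirror the shape produced in the examples):

```javascript
// Hypothetical validation helper; kafka-crab-js delivers payloads as Buffers.
function parseMessage(payload) {
  let data
  try {
    data = JSON.parse(payload.toString('utf8'))
  } catch {
    return { ok: false, error: 'invalid JSON' }
  }
  // Field checks are illustrative; adjust them to your own schema.
  if (typeof data._id !== 'string' || typeof data.name !== 'string') {
    return { ok: false, error: 'missing required fields' }
  }
  return { ok: true, data }
}
```

Rejecting bad messages up front keeps deserialization errors out of the business-logic path, where they are harder to attribute to a specific message.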
@@ -2,41 +2,58 @@ import { KafkaClient } from '../index.js'

const kafkaClient = new KafkaClient({
brokers: 'localhost:29092',
-clientId: 'my-js-group-11',
+clientId: 'reconnect-test',
logLevel: 'debug',
brokerAddressFamily: 'v4',
configuration: {
'auto.offset.reset': 'earliest',
},
})

/**
* Creates and configures a new Kafka stream consumer
*/
async function createConsumer() {
const kafkaStream = kafkaClient.createStreamConsumer({
-groupId: `crab-test`,
+groupId: `reconnect-test-3`,
enableAutoCommit: true,
configuration: {
'auto.offset.reset': 'earliest',
},
})

await kafkaStream.subscribe([
{ topic: 'foo' },
{ topic: 'bar' },
])

return kafkaStream
}

/**
* Starts a Kafka consumer with auto-restart capability
*/
async function startConsumer() {
let counter = 0
let retryCount = 0
const MAX_RETRIES = 5
const RETRY_DELAY = 5000 // 5 seconds

async function handleRetry() {
if (retryCount < MAX_RETRIES) {
retryCount++
console.log(
`Attempting to restart consumer (attempt ${retryCount}/${MAX_RETRIES}) in ${RETRY_DELAY / 1000} seconds...`,
)
setTimeout(setupConsumerWithRetry, RETRY_DELAY)
} else {
console.error(`Maximum retry attempts (${MAX_RETRIES}) reached. Stopping consumer.`)
process.exit(1)
}
}

async function setupConsumerWithRetry() {
try {
const kafkaStream = await createConsumer()
retryCount = 0 // Reset retry count on successful connection
let counter = 0

console.log('Starting consumer')

@@ -53,41 +70,20 @@ async function startConsumer() {

 kafkaStream.on('error', async (error) => {
   console.error('Stream error:', error)
+  handleRetry()
+})
+
+kafkaStream.on('close', () => {
+  console.log('Stream ended')
+  try {
+    kafkaStream.unsubscribe()
+  } catch (unsubError) {
+    console.error('Error unsubscribing:', unsubError)
+  }
-
-  if (retryCount < MAX_RETRIES) {
-    retryCount++
-    console.log(
-      `Attempting to restart consumer (attempt ${retryCount}/${MAX_RETRIES}) in ${RETRY_DELAY / 1000} seconds...`,
-    )
-    setTimeout(setupConsumerWithRetry, RETRY_DELAY)
-  } else {
-    console.error(`Maximum retry attempts (${MAX_RETRIES}) reached. Stopping consumer.`)
-    process.exit(1)
-  }
 })

-kafkaStream.on('close', () => {
-  console.log('Stream ended')
-  kafkaStream.unsubscribe()
-})
 } catch (error) {
   console.error('Error setting up consumer:', error)
-  if (retryCount < MAX_RETRIES) {
-    retryCount++
-    console.log(
-      `Attempting to restart consumer (attempt ${retryCount}/${MAX_RETRIES}) in ${RETRY_DELAY / 1000} seconds...`,
-    )
-    setTimeout(setupConsumerWithRetry, RETRY_DELAY)
-  } else {
-    console.error(`Maximum retry attempts (${MAX_RETRIES}) reached. Stopping consumer.`)
-    process.exit(1)
-  }
+  handleRetry()
 }
}

69 changes: 0 additions & 69 deletions example/sample.mjs

This file was deleted.

53 changes: 53 additions & 0 deletions example/simple.mjs
@@ -0,0 +1,53 @@
import { nanoid } from 'nanoid'
import { Buffer } from 'node:buffer'
import { CommitMode, KafkaClient, SecurityProtocol } from '../index.js'

const kafkaClient = new KafkaClient({
  brokers: 'localhost:29092',
  clientId: 'my-js-group',
  securityProtocol: SecurityProtocol.Plaintext,
  logLevel: 'debug',
  brokerAddressFamily: 'v4',
})
const topic = `topic-${nanoid()}`

async function produce() {
  const producer = kafkaClient.createProducer({ topic, configuration: { 'message.timeout.ms': '5000' } })
  for (let i = 0; i < 10; i++) {
    try {
      const result = await producer.send({
        topic,
        messages: [{
          key: Buffer.from(nanoid()),
          headers: { 'correlation-id': Buffer.from(nanoid()) },
          payload: Buffer.from(`{"_id":"${i}","name":"Elizeu Drummond","phone":"1234567890"}`),
        }],
      })
      console.log('Js message sent. Offset:', result)
    } catch (error) {
      console.error('Js Error on send', error)
    }
  }
}

async function startConsumer() {
  const consumer = kafkaClient.createConsumer({
    topic,
    groupId: 'my-js-group2',
    configuration: {
      'auto.offset.reset': 'earliest',
    },
  })
  await consumer.subscribe(topic)
  while (true) {
    const message = await consumer.recv()
    const { partition, offset, headers, payload } = message
    console.log('Message received! Partition:', partition, 'Offset:', offset, 'headers:',
      Object.entries(headers).map(([k, v]) => ({ [k]: v.toString() })), 'Message => ', payload.toString())
  }
}

await produce()
await startConsumer()