Merge pull request #22 from inaiat/feat/consumer-events

Feat/consumer events

inaiat authored Jan 8, 2025
2 parents 066c4d6 + 9554ba9 commit 3306997
Showing 15 changed files with 368 additions and 126 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -24,7 +24,7 @@ A lightweight, flexible, and reliable Kafka client for JavaScript/TypeScript. It

1. [Installation](#installation)
2. [Quick Start](#quick-start)
3. [Consumer Examples](#consumer-examples)
3. [Consumer Examples](#basic-consumer-setup)
4. [Producer Examples](#producer-examples)
5. [Stream Processing](#stream-processing)
6. [Configuration](#configuration)
50 changes: 50 additions & 0 deletions example/events.mjs
@@ -0,0 +1,50 @@
import { KafkaClient } from '../index.js'

const kafkaClient = new KafkaClient({
  brokers: 'localhost:29092',
  clientId: 'abc',
  logLevel: 'info',
  brokerAddressFamily: 'v4',
})

const topic = 'foo'

const consumer = kafkaClient.createConsumer({
  topic,
  groupId: 'xyz',
})

// When you are done consuming, call shutdownConsumer() to stop the consumer and release its resources
consumer.onEvents((err, event) => {
  if (err) {
    console.error('Event error:', err)
    return
  }
  console.log(
    'Event:',
    event.name,
    event.payload.tpl.map(it =>
      `Topic: ${it.topic}, ${
        it.partitionOffset
          .map(po => `partition: ${po.partition}, offset: ${po.offset.offset}`)
          .join(', ')
      }`
    ),
  )
})

await consumer.subscribe(topic)

const printMessage = async () => {
  let shutdown = false
  while (!shutdown) {
    const msg = await consumer.recv()
    if (msg) {
      console.log('Message received:', msg.payload.toString())
    } else {
      console.log('The consumer has been shut down')
      shutdown = true
    }
  }
}

process.on('SIGINT', () => {
  consumer.shutdownConsumer()
})

await printMessage()
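
Note the shutdown pattern above: recv() resolves with null once shutdownConsumer() has run, which is what lets the printMessage loop exit cleanly on SIGINT.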
2 changes: 1 addition & 1 deletion index.d.ts
@@ -1,4 +1,4 @@
export type { ConsumerConfiguration, KafkaConfiguration, KafkaCrabError, Message, MessageProducer, OffsetModel, PartitionOffset, ProducerConfiguration, ProducerRecord, RecordMetadata, RetryStrategy, TopicPartitionConfig, } from './js-binding';
export type { ConsumerConfiguration, KafkaConfiguration, KafkaCrabError, KafkaEvent, KafkaEventPayload, Message, MessageProducer, OffsetModel, PartitionOffset, ProducerConfiguration, ProducerRecord, RecordMetadata, RetryStrategy, TopicPartition, TopicPartitionConfig, } from './js-binding';
export { CommitMode, KafkaClientConfig, KafkaConsumer, KafkaProducer, PartitionPosition, SecurityProtocol, } from './js-binding';
export { KafkaClient } from './kafka-client';
export { KafkaStreamReadable } from './kafka-stream-readable';
18 changes: 17 additions & 1 deletion js-binding.d.ts
@@ -3,6 +3,15 @@

/* auto-generated by NAPI-RS */

export interface KafkaEventPayload {
action?: string
tpl: Array<TopicPartition>
error?: string
}
export interface KafkaEvent {
name: string
payload: KafkaEventPayload
}
export interface RetryStrategy {
retries: number
retryTopic?: string
@@ -25,7 +34,8 @@ export interface ConsumerConfiguration {
export enum PartitionPosition {
Beginning = 'Beginning',
End = 'End',
Stored = 'Stored'
Stored = 'Stored',
Invalid = 'Invalid'
}
export interface OffsetModel {
offset?: number
@@ -40,6 +50,10 @@ export interface TopicPartitionConfig {
allOffsets?: OffsetModel
partitionOffset?: Array<PartitionOffset>
}
export interface TopicPartition {
topic: string
partitionOffset: Array<PartitionOffset>
}
export enum SecurityProtocol {
Plaintext = 'Plaintext',
Ssl = 'Ssl',
@@ -88,12 +102,14 @@ export interface ProducerConfiguration {
configuration?: Record<string, string>
}
export declare class KafkaConsumer {
onEvents(callback: (error: Error | undefined, event: KafkaEvent) => void): void
subscribe(topicConfigs: string | Array<TopicPartitionConfig>): Promise<void>
pause(): void
resume(): void
unsubscribe(): void
shutdownConsumer(): Promise<void>
seek(topic: string, partition: number, offsetModel: OffsetModel, timeout?: number | undefined | null): void
assignment(): Array<TopicPartition>
recv(): Promise<Message | null>
commit(topic: string, partition: number, offset: number, commit: CommitMode): void
}
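
For orientation, here is a minimal sketch of the new onEvents hook against this declaration. It is an illustration only: the broker address, client id, group id, and topic are placeholders, and the import assumes the package's published name, kafka-crab-js.

```ts
import { KafkaClient, type KafkaEvent } from 'kafka-crab-js'

const client = new KafkaClient({
  brokers: 'localhost:29092', // placeholder broker
  clientId: 'events-sketch',
})

const consumer = client.createConsumer({ topic: 'foo', groupId: 'sketch-group' })

// The callback receives either an error or a KafkaEvent whose payload
// carries the affected topic/partition list (tpl).
consumer.onEvents((err, event: KafkaEvent) => {
  if (err) {
    console.error('Event error:', err)
    return
  }
  console.log(`Event: ${event.name}`, event.payload.tpl)
})

await consumer.subscribe('foo')
```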
3 changes: 3 additions & 0 deletions js-src/index.ts
@@ -2,6 +2,8 @@ export type {
ConsumerConfiguration,
KafkaConfiguration,
KafkaCrabError,
KafkaEvent,
KafkaEventPayload,
Message,
MessageProducer,
OffsetModel,
@@ -10,6 +12,7 @@ export type {
ProducerRecord,
RecordMetadata,
RetryStrategy,
TopicPartition,
TopicPartitionConfig,
} from './js-binding'

18 changes: 17 additions & 1 deletion js-src/js-binding.d.ts
@@ -3,6 +3,15 @@

/* auto-generated by NAPI-RS */

export interface KafkaEventPayload {
action?: string
tpl: Array<TopicPartition>
error?: string
}
export interface KafkaEvent {
name: string
payload: KafkaEventPayload
}
export interface RetryStrategy {
retries: number
retryTopic?: string
@@ -25,7 +34,8 @@ export interface ConsumerConfiguration {
export enum PartitionPosition {
Beginning = 'Beginning',
End = 'End',
Stored = 'Stored'
Stored = 'Stored',
Invalid = 'Invalid'
}
export interface OffsetModel {
offset?: number
@@ -40,6 +50,10 @@ export interface TopicPartitionConfig {
allOffsets?: OffsetModel
partitionOffset?: Array<PartitionOffset>
}
export interface TopicPartition {
topic: string
partitionOffset: Array<PartitionOffset>
}
export enum SecurityProtocol {
Plaintext = 'Plaintext',
Ssl = 'Ssl',
@@ -88,12 +102,14 @@ export interface ProducerConfiguration {
configuration?: Record<string, string>
}
export declare class KafkaConsumer {
onEvents(callback: (error: Error | undefined, event: KafkaEvent) => void): void
subscribe(topicConfigs: string | Array<TopicPartitionConfig>): Promise<void>
pause(): void
resume(): void
unsubscribe(): void
shutdownConsumer(): Promise<void>
seek(topic: string, partition: number, offsetModel: OffsetModel, timeout?: number | undefined | null): void
assignment(): Array<TopicPartition>
recv(): Promise<Message | null>
commit(topic: string, partition: number, offset: number, commit: CommitMode): void
}
6 changes: 3 additions & 3 deletions package.json
@@ -1,6 +1,6 @@
{
"name": "kafka-crab-js",
"version": "1.1.2",
"version": "1.2.0",
"main": "index.js",
"types": "index.d.ts",
"module": "commonjs",
@@ -42,7 +42,7 @@
"devDependencies": {
"@faker-js/faker": "9.3.0",
"@napi-rs/cli": "^2.18.4",
"@types/node": "22.10.1",
"@types/node": "22.10.5",
"ava": "6.2.0",
"dprint": "^0.48.0",
"nanoid": "5.0.9",
@@ -65,5 +65,5 @@
"version": "napi version",
"fmt": "dprint fmt"
},
"packageManager": "pnpm@9.15.2"
"packageManager": "pnpm@9.15.3"
}
10 changes: 5 additions & 5 deletions pnpm-lock.yaml

Some generated files are not rendered by default.

71 changes: 59 additions & 12 deletions src/kafka/consumer/consumer_helper.rs
@@ -5,32 +5,63 @@ use rdkafka::{
consumer::{Consumer, StreamConsumer},
ClientConfig, Offset, TopicPartitionList,
};
use tracing::{debug, error, info, warn};
use tracing::{debug, error, warn};

use crate::kafka::{consumer::model::LoggingConsumer, kafka_admin::KafkaAdmin};
use crate::kafka::{consumer::context::LoggingConsumer, kafka_admin::KafkaAdmin};

use super::model::{
ConsumerConfiguration, CustomContext, OffsetModel, PartitionOffset, PartitionPosition,
use super::{
context::KafkaCrabContext,
model::{ConsumerConfiguration, OffsetModel, PartitionOffset, PartitionPosition, TopicPartition},
};

pub fn convert_to_rdkafka_offset(offset_model: &OffsetModel) -> Offset {
match offset_model.position {
Some(PartitionPosition::Beginning) => Offset::Beginning,
Some(PartitionPosition::End) => Offset::End,
Some(PartitionPosition::Stored) => Offset::Stored,
Some(PartitionPosition::Invalid) => Offset::Invalid,
None => match offset_model.offset {
Some(value) => Offset::Offset(value),
None => Offset::Stored, // Default to stored
},
}
}

pub fn convert_to_offset_model(offset: &Offset) -> OffsetModel {
match offset {
Offset::Beginning => OffsetModel {
position: Some(PartitionPosition::Beginning),
offset: None,
},
Offset::End => OffsetModel {
position: Some(PartitionPosition::End),
offset: None,
},
Offset::Stored => OffsetModel {
position: Some(PartitionPosition::Stored),
offset: None,
},
Offset::Invalid => OffsetModel {
position: Some(PartitionPosition::Invalid),
offset: None,
},
Offset::Offset(value) => OffsetModel {
position: None,
offset: Some(*value),
},
Offset::OffsetTail(value) => OffsetModel {
position: None,
offset: Some(*value),
},
}
}

pub fn create_stream_consumer(
client_config: &ClientConfig,
consumer_configuration: &ConsumerConfiguration,
configuration: Option<HashMap<String, String>>,
) -> anyhow::Result<StreamConsumer<CustomContext>> {
let context = CustomContext;
) -> anyhow::Result<StreamConsumer<KafkaCrabContext>> {
let context = KafkaCrabContext::new();

let ConsumerConfiguration {
group_id,
@@ -59,7 +90,7 @@ pub fn create_stream_consumer(
.set_log_level(RDKafkaLogLevel::Debug)
.create_with_context(context)?;

info!("Consumer created. Group id: {:?}", group_id);
debug!("Consumer created. Group id: {:?}", group_id);
Ok(consumer)
}

@@ -86,13 +117,13 @@
warn!("Fail to create topic {:?}", e);
return Err(anyhow::Error::msg(format!("Fail to create topic: {:?}", e)));
}
info!("Topic(s) created: {:?}", topics);
debug!("Topic(s) created: {:?}", topics);
Ok(())
}

pub fn set_offset_of_all_partitions(
offset_model: &OffsetModel,
consumer: &StreamConsumer<CustomContext>,
consumer: &StreamConsumer<KafkaCrabContext>,
topic: &str,
timeout: Duration,
) -> anyhow::Result<()> {
@@ -103,7 +134,7 @@ pub fn set_offset_of_all_partitions(
metadata.topics().iter().for_each(|meta_topic| {
let mut tpl = TopicPartitionList::new();
meta_topic.partitions().iter().for_each(|meta_partition| {
info!("Adding partition: {:?}", meta_partition.id());
debug!("Adding partition: {:?}", meta_partition.id());
tpl.add_partition(topic, meta_partition.id());
});
match tpl.set_all_offsets(offset) {
@@ -131,7 +162,7 @@ pub fn assign_offset_or_use_metadata(
topic: &str,
partition_offset: Option<Vec<PartitionOffset>>,
offset_model: Option<&OffsetModel>,
consumer: &StreamConsumer<CustomContext>,
consumer: &StreamConsumer<KafkaCrabContext>,
timeout: Duration,
) -> anyhow::Result<()> {
let mut tpl = TopicPartitionList::new();
@@ -150,7 +181,7 @@
let metadata = consumer.fetch_metadata(Some(topic), timeout)?;
for meta_topic in metadata.topics() {
for meta_partition in meta_topic.partitions() {
info!(
debug!(
"Adding partition: {:?} with offset: {:?} for topic: {:?}",
meta_partition.id(),
offset,
@@ -165,3 +196,19 @@
consumer.assign(&tpl)?;
Ok(())
}

pub fn convert_tpl_to_array_of_topic_partition(tpl: &TopicPartitionList) -> Vec<TopicPartition> {
  tpl
    .elements()
    .iter()
    .map(|tp| TopicPartition {
      topic: tp.topic().to_owned(),
      partition_offset: vec![PartitionOffset {
        partition: tp.partition(),
        offset: convert_to_offset_model(&tp.offset()),
      }],
    })
    .collect()
}
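
This helper is what backs the new assignment() binding, so each PartitionOffset surfaced to JS wraps an OffsetModel holding either a numeric offset or a symbolic position. Below is a minimal sketch of reading it, with the same placeholder broker, topic, and ids as above; note the assignment may be empty until the first rebalance completes.

```ts
import { KafkaClient } from 'kafka-crab-js'

const client = new KafkaClient({ brokers: 'localhost:29092', clientId: 'assignment-sketch' })
const consumer = client.createConsumer({ topic: 'foo', groupId: 'sketch-group' })
await consumer.subscribe('foo')

// assignment() returns TopicPartition[]: one entry per assigned partition,
// each carrying an OffsetModel built by convert_to_offset_model.
for (const tp of consumer.assignment()) {
  for (const po of tp.partitionOffset) {
    // Prefer the numeric offset; otherwise fall back to the symbolic
    // position ('Beginning' | 'End' | 'Stored' | 'Invalid').
    const label = po.offset.offset ?? po.offset.position ?? 'unknown'
    console.log(`${tp.topic}[${po.partition}] -> ${label}`)
  }
}
```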