From 205ea72aa36010ec9ea4aa61d83f9fff7278012b Mon Sep 17 00:00:00 2001
From: Inaiat Moraes
Date: Wed, 8 Jan 2025 14:46:10 -0300
Subject: [PATCH 1/6] feat: subscriber consumer events

---
 index.d.ts                            |   2 +-
 js-binding.d.ts                       |  18 +++-
 js-src/index.ts                       |   3 +
 js-src/js-binding.d.ts                |  18 +++-
 package.json                          |   4 +-
 pnpm-lock.yaml                        |  10 +--
 src/kafka/consumer/consumer_helper.rs |  61 +++++++++++--
 src/kafka/consumer/kafka_consumer.rs  | 121 +++++++++++++-------------
 src/kafka/consumer/mod.rs             |   1 +
 src/kafka/consumer/model.rs           |  43 +++-------
 src/kafka/kafka_util.rs               |  15 +++-
 11 files changed, 185 insertions(+), 111 deletions(-)

diff --git a/index.d.ts b/index.d.ts
index ed2414f..37f42e3 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -1,4 +1,4 @@
-export type { ConsumerConfiguration, KafkaConfiguration, KafkaCrabError, Message, MessageProducer, OffsetModel, PartitionOffset, ProducerConfiguration, ProducerRecord, RecordMetadata, RetryStrategy, TopicPartitionConfig, } from './js-binding';
+export type { ConsumerConfiguration, KafkaConfiguration, KafkaCrabError, KafkaEvent, KafkaEventPayload, Message, MessageProducer, OffsetModel, PartitionOffset, ProducerConfiguration, ProducerRecord, RecordMetadata, RetryStrategy, TopicPartition, TopicPartitionConfig, } from './js-binding';
 export { CommitMode, KafkaClientConfig, KafkaConsumer, KafkaProducer, PartitionPosition, SecurityProtocol, } from './js-binding';
 export { KafkaClient } from './kafka-client';
 export { KafkaStreamReadable } from './kafka-stream-readable';
diff --git a/js-binding.d.ts b/js-binding.d.ts
index b1e8926..947bd2e 100644
--- a/js-binding.d.ts
+++ b/js-binding.d.ts
@@ -3,6 +3,15 @@
 
 /* auto-generated by NAPI-RS */
 
+export interface KafkaEventPayload {
+  action?: string
+  tpl: Array<TopicPartition>
+  error?: string
+}
+export interface KafkaEvent {
+  name: string
+  payload: KafkaEventPayload
+}
 export interface RetryStrategy {
   retries: number
   retryTopic?: string
@@ -25,7 +34,8 @@ export interface ConsumerConfiguration {
 export enum PartitionPosition {
   Beginning = 'Beginning',
   End = 'End',
-  Stored = 'Stored'
+  Stored = 'Stored',
+  Invalid = 'Invalid'
 }
 export interface OffsetModel {
   offset?: number
@@ -40,6 +50,10 @@ export interface TopicPartitionConfig {
   allOffsets?: OffsetModel
   partitionOffset?: Array<PartitionOffset>
 }
+export interface TopicPartition {
+  topic: string
+  partitionOffset: Array<PartitionOffset>
+}
 export enum SecurityProtocol {
   Plaintext = 'Plaintext',
   Ssl = 'Ssl',
@@ -88,12 +102,14 @@ export interface ProducerConfiguration {
   configuration?: Record<string, string>
 }
 export declare class KafkaConsumer {
+  subscribeToConsumerEvents(callback: (error: Error | undefined, event: KafkaEvent) => void): void
   subscribe(topicConfigs: string | Array<TopicPartitionConfig>): Promise<void>
   pause(): void
   resume(): void
   unsubscribe(): void
   shutdownConsumer(): Promise<void>
   seek(topic: string, partition: number, offsetModel: OffsetModel, timeout?: number | undefined | null): void
+  assignment(): Array<TopicPartition>
   recv(): Promise<Message | null>
   commit(topic: string, partition: number, offset: number, commit: CommitMode): void
 }
diff --git a/js-src/index.ts b/js-src/index.ts
index 4ecfaac..98ed45b 100644
--- a/js-src/index.ts
+++ b/js-src/index.ts
@@ -2,6 +2,8 @@ export type {
   ConsumerConfiguration,
   KafkaConfiguration,
   KafkaCrabError,
+  KafkaEvent,
+  KafkaEventPayload,
   Message,
   MessageProducer,
   OffsetModel,
@@ -10,6 +12,7 @@ export type {
   ProducerRecord,
   RecordMetadata,
   RetryStrategy,
+  TopicPartition,
   TopicPartitionConfig,
 } from './js-binding'
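For reference, the new `KafkaEvent`/`KafkaEventPayload` types flatten each affected partition into `payload.tpl`. A minimal sketch of reading that shape from the JS side, assuming the published package name `kafka-crab-js` (the `describeEvent` helper itself is illustrative, not part of the API):

```ts
import type { KafkaEvent, TopicPartition } from 'kafka-crab-js'

// Summarize which topic/partitions an event touches.
function describeEvent(event: KafkaEvent): string {
  const partitions = event.payload.tpl
    .flatMap((tp: TopicPartition) =>
      tp.partitionOffset.map(po => `${tp.topic}[${po.partition}]`)
    )
    .join(', ')
  // `action` is only set for rebalance events, hence the fallback.
  return `${event.name} (${event.payload.action ?? 'n/a'}): ${partitions}`
}
```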
diff --git a/js-src/js-binding.d.ts b/js-src/js-binding.d.ts
index b1e8926..947bd2e 100644
--- a/js-src/js-binding.d.ts
+++ b/js-src/js-binding.d.ts
@@ -3,6 +3,15 @@
 
 /* auto-generated by NAPI-RS */
 
+export interface KafkaEventPayload {
+  action?: string
+  tpl: Array<TopicPartition>
+  error?: string
+}
+export interface KafkaEvent {
+  name: string
+  payload: KafkaEventPayload
+}
 export interface RetryStrategy {
   retries: number
   retryTopic?: string
@@ -25,7 +34,8 @@ export interface ConsumerConfiguration {
 export enum PartitionPosition {
   Beginning = 'Beginning',
   End = 'End',
-  Stored = 'Stored'
+  Stored = 'Stored',
+  Invalid = 'Invalid'
 }
 export interface OffsetModel {
   offset?: number
@@ -40,6 +50,10 @@ export interface TopicPartitionConfig {
   allOffsets?: OffsetModel
   partitionOffset?: Array<PartitionOffset>
 }
+export interface TopicPartition {
+  topic: string
+  partitionOffset: Array<PartitionOffset>
+}
 export enum SecurityProtocol {
   Plaintext = 'Plaintext',
   Ssl = 'Ssl',
@@ -88,12 +102,14 @@ export interface ProducerConfiguration {
   configuration?: Record<string, string>
 }
 export declare class KafkaConsumer {
+  subscribeToConsumerEvents(callback: (error: Error | undefined, event: KafkaEvent) => void): void
   subscribe(topicConfigs: string | Array<TopicPartitionConfig>): Promise<void>
   pause(): void
   resume(): void
   unsubscribe(): void
   shutdownConsumer(): Promise<void>
   seek(topic: string, partition: number, offsetModel: OffsetModel, timeout?: number | undefined | null): void
+  assignment(): Array<TopicPartition>
   recv(): Promise<Message | null>
   commit(topic: string, partition: number, offset: number, commit: CommitMode): void
 }
diff --git a/package.json b/package.json
index a818d23..a4ece8b 100644
--- a/package.json
+++ b/package.json
@@ -42,7 +42,7 @@
   "devDependencies": {
     "@faker-js/faker": "9.3.0",
     "@napi-rs/cli": "^2.18.4",
-    "@types/node": "22.10.1",
+    "@types/node": "22.10.5",
     "ava": "6.2.0",
     "dprint": "^0.48.0",
     "nanoid": "5.0.9",
@@ -65,5 +65,5 @@
     "version": "napi version",
     "fmt": "dprint fmt"
   },
-  "packageManager": "pnpm@9.15.2"
+  "packageManager": "pnpm@9.15.3"
 }
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index f7b9bb0..31ea971 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -18,8 +18,8 @@ importers:
       specifier: ^2.4.4
       version: 2.4.4
     '@types/node':
-      specifier: 22.10.1
-      version: 22.10.1
+      specifier: 22.10.5
+      version: 22.10.5
     ava:
       specifier: 6.2.0
       version: 6.2.0
@@ -126,8 +126,8 @@ packages:
   '@types/copyfiles@2.4.4':
     resolution: {integrity: sha512-2PhDCltCORlPrpM3gxZ0XuYaDcCOU46SQ7E89uzgbJ4jhpzpya/ssIY0jBA/gHtvh3SE4dxW8aMitQk0/ewlIw==}
 
-  '@types/node@22.10.1':
-    resolution: {integrity: sha512-qKgsUwfHZV2WCWLAnVP1JqnpE6Im6h3Y0+fYgMTasNQ7V++CBX5OT1as0g0f+OyubbFqhf6XVNIsmN4IIhEgGQ==}
+  '@types/node@22.10.5':
+    resolution: {integrity: sha512-F8Q+SeGimwOo86fiovQh8qiXfFEh2/ocYv7tU5pJ3EXMSSxk1Joj5wefpFK2fHTf/N6HKGSxIDBT9f3gCxXPkQ==}
 
   '@vercel/nft@0.27.6':
     resolution: {integrity: sha512-mwuyUxskdcV8dd7N7JnxBgvFEz1D9UOePI/WyLLzktv6HSCwgPNQGit/UJ2IykAWGlypKw4pBQjOKWvIbXITSg==}
@@ -1004,7 +1004,7 @@ snapshots:
 
   '@types/copyfiles@2.4.4': {}
 
-  '@types/node@22.10.1':
+  '@types/node@22.10.5':
     dependencies:
       undici-types: 6.20.0
diff --git a/src/kafka/consumer/consumer_helper.rs b/src/kafka/consumer/consumer_helper.rs
index f366be0..6c10087 100644
--- a/src/kafka/consumer/consumer_helper.rs
+++ b/src/kafka/consumer/consumer_helper.rs
@@ -7,10 +7,11 @@ use rdkafka::{
 };
 use tracing::{debug, error, info, warn};
 
-use crate::kafka::{consumer::model::LoggingConsumer, kafka_admin::KafkaAdmin};
+use crate::kafka::{consumer::context::LoggingConsumer, kafka_admin::KafkaAdmin};
 
-use super::model::{
-  ConsumerConfiguration, CustomContext, OffsetModel, PartitionOffset, PartitionPosition,
+use super::{
+  context::KafkaCrabContext,
+  model::{ConsumerConfiguration, OffsetModel, PartitionOffset, PartitionPosition, TopicPartition},
 };
 
 pub fn convert_to_rdkafka_offset(offset_model: &OffsetModel) -> Offset {
@@ -18,6 +19,7 @@ pub fn convert_to_rdkafka_offset(offset_model: &OffsetModel) -> Offset {
     Some(PartitionPosition::Beginning) => Offset::Beginning,
     Some(PartitionPosition::End) => Offset::End,
     Some(PartitionPosition::Stored) => Offset::Stored,
+    Some(PartitionPosition::Invalid) => Offset::Invalid,
     None => match offset_model.offset {
       Some(value) => Offset::Offset(value),
       None => Offset::Stored, // Default to stored
@@ -25,12 +27,41 @@ pub fn convert_to_rdkafka_offset(offset_model: &OffsetModel) -> Offset {
   }
 }
 
+pub fn convert_to_offset_model(offset: &Offset) -> OffsetModel {
+  match offset {
+    Offset::Beginning => OffsetModel {
+      position: Some(PartitionPosition::Beginning),
+      offset: None,
+    },
+    Offset::End => OffsetModel {
+      position: Some(PartitionPosition::End),
+      offset: None,
+    },
+    Offset::Stored => OffsetModel {
+      position: Some(PartitionPosition::Stored),
+      offset: None,
+    },
+    Offset::Invalid => OffsetModel {
+      position: Some(PartitionPosition::Invalid),
+      offset: None,
+    },
+    Offset::Offset(value) => OffsetModel {
+      position: None,
+      offset: Some(*value),
+    },
+    Offset::OffsetTail(value) => OffsetModel {
+      position: None,
+      offset: Some(*value),
+    },
+  }
+}
+
 pub fn create_stream_consumer(
   client_config: &ClientConfig,
   consumer_configuration: &ConsumerConfiguration,
   configuration: Option<HashMap<String, String>>,
-) -> anyhow::Result<StreamConsumer<CustomContext>> {
-  let context = CustomContext;
+) -> anyhow::Result<StreamConsumer<KafkaCrabContext>> {
+  let context = KafkaCrabContext::new();
 
   let ConsumerConfiguration {
     group_id,
@@ -92,7 +123,7 @@ pub async fn try_create_topic(
 
 pub fn set_offset_of_all_partitions(
   offset_model: &OffsetModel,
-  consumer: &StreamConsumer<CustomContext>,
+  consumer: &StreamConsumer<KafkaCrabContext>,
   topic: &str,
   timeout: Duration,
 ) -> anyhow::Result<()> {
@@ -131,7 +162,7 @@ pub fn assign_offset_or_use_metadata(
   topic: &str,
   partition_offset: Option<Vec<PartitionOffset>>,
   offset_model: Option<&OffsetModel>,
-  consumer: &StreamConsumer<CustomContext>,
+  consumer: &StreamConsumer<KafkaCrabContext>,
   timeout: Duration,
 ) -> anyhow::Result<()> {
   let mut tpl = TopicPartitionList::new();
@@ -165,3 +196,19 @@ pub fn assign_offset_or_use_metadata(
   consumer.assign(&tpl)?;
   Ok(())
 }
+
+pub fn convert_tpl_to_array_of_topic_partition(tpl: &TopicPartitionList) -> Vec<TopicPartition> {
+  tpl
+    .elements()
+    .iter()
+    .map(|tp| {
+      return TopicPartition {
+        topic: tp.topic().to_owned(),
+        partition_offset: vec![PartitionOffset {
+          partition: tp.partition(),
+          offset: convert_to_offset_model(&tp.offset()),
+        }],
+      };
+    })
+    .collect()
+}
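The `OffsetModel` round-trip above means a JS caller can express an offset either by position or by absolute value, with `Offset::Stored` as the fallback when neither is set. A small sketch of the common cases (the topic name is illustrative):

```ts
import { PartitionPosition } from 'kafka-crab-js'
import type { OffsetModel, TopicPartitionConfig } from 'kafka-crab-js'

// Position-based: start from the earliest available record.
const fromBeginning: OffsetModel = { position: PartitionPosition.Beginning }

// Absolute: when `position` is omitted, the numeric `offset` is used;
// with neither set, the Rust side defaults to Offset::Stored.
const exact: OffsetModel = { offset: 42 }

// Replay a whole topic from the start when subscribing:
const config: TopicPartitionConfig = { topic: 'my-topic', allOffsets: fromBeginning }
```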
diff --git a/src/kafka/consumer/kafka_consumer.rs b/src/kafka/consumer/kafka_consumer.rs
index 3509aa8..5c9b960 100644
--- a/src/kafka/consumer/kafka_consumer.rs
+++ b/src/kafka/consumer/kafka_consumer.rs
@@ -1,7 +1,10 @@
-use std::time::Duration;
+use std::{time::Duration, vec};
 use tokio::sync::watch::{self};
 
-use napi::{Either, Result};
+use napi::{
+  threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode},
+  Either, Result,
+};
 
 use rdkafka::{
   consumer::{stream_consumer::StreamConsumer, CommitMode as RdKfafkaCommitMode, Consumer},
@@ -9,22 +12,25 @@ use rdkafka::{
   ClientConfig, Message as RdMessage, Offset,
 };
 
-use tracing::{debug, error, info};
+use tracing::{debug, info};
 
 use crate::kafka::{
   consumer::consumer_helper::{
     assign_offset_or_use_metadata, convert_to_rdkafka_offset, try_create_topic, try_subscribe,
   },
   kafka_client_config::KafkaClientConfig,
-  kafka_util::{create_message, AnyhowToNapiError},
+  kafka_util::{create_message, AnyhowToNapiError, IntoNapiError},
   producer::model::Message,
 };
 
 use super::{
-  consumer_helper::{create_stream_consumer, set_offset_of_all_partitions},
+  consumer_helper::{
+    convert_tpl_to_array_of_topic_partition, create_stream_consumer, set_offset_of_all_partitions,
+  },
+  context::{KafkaCrabContext, KafkaEvent},
   model::{
-    CommitMode, ConsumerConfiguration, CustomContext, OffsetModel, TopicPartitionConfig,
-    DEFAULT_FECTH_METADATA_TIMEOUT,
+    CommitMode, ConsumerConfiguration, OffsetModel, ShutdownSignal, TopicPartition,
+    TopicPartitionConfig, DEFAULT_FECTH_METADATA_TIMEOUT,
   },
 };
 
@@ -32,12 +38,10 @@ use tokio::select;
 
 pub const DEFAULT_SEEK_TIMEOUT: i64 = 1500;
 
-type ShutdownSignal = (watch::Sender<()>, watch::Receiver<()>);
-
 #[napi]
 pub struct KafkaConsumer {
   client_config: ClientConfig,
-  stream_consumer: StreamConsumer<CustomContext>,
+  stream_consumer: StreamConsumer<KafkaCrabContext>,
   fecth_metadata_timeout: Duration,
   shutdown_signal: ShutdownSignal,
 }
@@ -67,6 +71,32 @@ impl KafkaConsumer {
     })
   }
 
+  #[napi(ts_args_type = "callback: (error: Error | undefined, event: KafkaEvent) => void")]
+  pub fn subscribe_to_consumer_events(
+    &self,
+    callback: ThreadsafeFunction<KafkaEvent>,
+  ) -> Result<()> {
+    let mut rx = self.stream_consumer.context().tx_rx_signal.1.clone();
+    let mut shutdown_signal = self.shutdown_signal.1.clone();
+
+    tokio::spawn(async move {
+      loop {
+        select! {
+          _ = rx.changed() => {
+            if let Some(event) = rx.borrow().clone() {
+              callback.call(Ok(event), ThreadsafeFunctionCallMode::NonBlocking);
+            }
+          }
+          _ = shutdown_signal.changed() => {
+            info!("Shutdown signal received and this will stop the consumer from receiving messages");
+            break;
+          }
+        }
+      }
+    });
+    Ok(())
+  }
+
   #[napi]
   pub async fn subscribe(
     &self,
@@ -138,13 +168,10 @@ impl KafkaConsumer {
   }
 
   fn get_partitions(&self) -> Result<RdTopicPartitionList> {
-    let partitions = self.stream_consumer.assignment().map_err(|e| {
-      napi::Error::new(
-        napi::Status::GenericFailure,
-        format!("Error while getting partitions: {:?}", e),
-      )
-    })?;
-
+    let partitions = self
+      .stream_consumer
+      .assignment()
+      .map_err(|e| e.into_napi_error("getting partitions"))?;
     Ok(partitions)
   }
 
@@ -153,12 +180,7 @@ impl KafkaConsumer {
     self
      .stream_consumer
      .pause(&self.get_partitions()?)
-      .map_err(|e| {
-        napi::Error::new(
-          napi::Status::GenericFailure,
-          format!("Error while pausing: {:?}", e),
-        )
-      })?;
+      .map_err(|e| e.into_napi_error("error while pausing"))?;
     Ok(())
   }
 
@@ -167,12 +189,7 @@ impl KafkaConsumer {
     self
      .stream_consumer
      .resume(&self.get_partitions()?)
-      .map_err(|e| {
-        napi::Error::new(
-          napi::Status::GenericFailure,
-          format!("Error while resuming: {:?}", e),
-        )
-      })?;
+      .map_err(|e| e.into_napi_error("error while resuming"))?;
     Ok(())
   }
 
@@ -192,12 +209,8 @@ impl KafkaConsumer {
 
     // Then send shutdown signal
     let tx = self.shutdown_signal.0.clone();
-    tx.send(()).map_err(|e| {
-      napi::Error::new(
-        napi::Status::GenericFailure,
-        format!("Error sending shutdown signal: {:?}", e),
-      )
-    })?;
+    tx.send(())
+      .map_err(|e| e.into_napi_error("Error sending shutdown signal"))?;
     Ok(())
   }
 
@@ -223,28 +236,26 @@ impl KafkaConsumer {
       offset,
       Duration::from_millis(timeout.unwrap_or(DEFAULT_SEEK_TIMEOUT) as u64),
     )
-    .map_err(|e| {
-      error!("Error while seeking: {:?}", e);
-      napi::Error::new(
-        napi::Status::GenericFailure,
-        format!("Error while seeking: {:?}", e),
-      )
-    })?;
+    .map_err(|e| e.into_napi_error("Error while seeking"))?;
     Ok(())
   }
 
+  #[napi]
+  pub fn assignment(&self) -> Result<Vec<TopicPartition>> {
+    let assignment = self
+      .stream_consumer
+      .assignment()
+      .map_err(|e| e.into_napi_error("error while getting assignment"))?;
+    Ok(convert_tpl_to_array_of_topic_partition(&assignment))
+  }
+
   #[napi]
   pub async fn recv(&self) -> Result<Option<Message>> {
     let mut rx = self.shutdown_signal.1.clone();
     select! {
       message = self.stream_consumer.recv() => {
         message
-          .map_err(|e| {
-            napi::Error::new(
-              napi::Status::GenericFailure,
-              format!("Error while receiving from stream consumer: {:?}", e),
-            )
-          })
+          .map_err(|e| e.into_napi_error("Error while receiving from stream consumer"))
          .map(|message| Some(create_message(&message, message.payload().unwrap_or(&[]))))
       }
       _ = rx.changed() => {
@@ -265,12 +276,7 @@ impl KafkaConsumer {
     let mut tpl = RdTopicPartitionList::new();
     tpl
       .add_partition_offset(&topic, partition, Offset::Offset(offset))
-      .map_err(|e| {
-        napi::Error::new(
-          napi::Status::GenericFailure,
-          format!("Error while adding partition offset: {:?}", e),
-        )
-      })?;
+      .map_err(|e| e.into_napi_error("Error while adding partition offset"))?;
     let commit_mode = match commit {
       CommitMode::Sync => RdKfafkaCommitMode::Sync,
       CommitMode::Async => RdKfafkaCommitMode::Async,
     };
     self
      .stream_consumer
      .commit(&tpl, commit_mode)
-      .map_err(|e| {
-        napi::Error::new(
-          napi::Status::GenericFailure,
-          format!("Error while committing: {:?}", e),
-        )
-      })?;
+      .map_err(|e| e.into_napi_error("Error while committing"))?;
     debug!("Committing done. Tpl: {:?}", &tpl);
     Ok(())
   }
 }
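The new `assignment()` accessor exposes rdkafka's current assignment as `Array<TopicPartition>`. A minimal sketch of inspecting it from JS, assuming the published package name (consumer construction elided):

```ts
import type { KafkaConsumer } from 'kafka-crab-js'

// Print the consumer's current partition assignment. `offset.offset`
// is only set when rdkafka reports a concrete numeric offset; otherwise
// `offset.position` carries Beginning/End/Stored/Invalid.
function logAssignment(consumer: KafkaConsumer): void {
  for (const tp of consumer.assignment()) {
    for (const po of tp.partitionOffset) {
      console.log(`${tp.topic}[${po.partition}] ->`, po.offset.position ?? po.offset.offset)
    }
  }
}
```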
diff --git a/src/kafka/consumer/mod.rs b/src/kafka/consumer/mod.rs
index 08c9f47..7c379df 100644
--- a/src/kafka/consumer/mod.rs
+++ b/src/kafka/consumer/mod.rs
@@ -1,3 +1,4 @@
 pub mod consumer_helper;
+pub mod context;
 pub mod kafka_consumer;
 pub mod model;
diff --git a/src/kafka/consumer/model.rs b/src/kafka/consumer/model.rs
index 43c8aa8..bd6768c 100644
--- a/src/kafka/consumer/model.rs
+++ b/src/kafka/consumer/model.rs
@@ -1,41 +1,10 @@
 use std::{collections::HashMap, time::Duration};
 
-use rdkafka::{
-  consumer::{BaseConsumer, ConsumerContext, Rebalance, StreamConsumer},
-  error::KafkaResult,
-  ClientContext, TopicPartitionList,
-};
-use tracing::info;
+use tokio::sync::watch;
 
 pub const DEFAULT_FECTH_METADATA_TIMEOUT: Duration = Duration::from_millis(2000);
 
-pub type LoggingConsumer = StreamConsumer<CustomContext>;
-
-pub struct CustomContext;
-
-impl ClientContext for CustomContext {}
-
-impl ConsumerContext for CustomContext {
-  fn pre_rebalance(&self, consumer: &BaseConsumer<Self>, rebalance: &Rebalance) {
-    info!(
-      "Pre rebalance {:?}, consumer closed: {}",
-      rebalance,
-      consumer.closed()
-    );
-  }
-
-  fn post_rebalance(&self, consumer: &BaseConsumer<Self>, rebalance: &Rebalance) {
-    info!(
-      "Post rebalance {:?}, consumer closed: {} ",
-      rebalance,
-      consumer.closed()
-    );
-  }
-
-  fn commit_callback(&self, result: KafkaResult<()>, _offsets: &TopicPartitionList) {
-    info!("Committing offsets: {:?}. Offset: {:?}", result, _offsets);
-  }
-}
+pub type ShutdownSignal = (watch::Sender<()>, watch::Receiver<()>);
 
 #[napi(object)]
 #[derive(Clone, Debug)]
@@ -71,6 +40,7 @@ pub enum PartitionPosition {
   Beginning,
   End,
   Stored,
+  Invalid,
 }
 #[napi(object)]
 #[derive(Clone, Debug)]
@@ -93,3 +63,10 @@ pub struct TopicPartitionConfig {
   pub all_offsets: Option<OffsetModel>,
   pub partition_offset: Option<Vec<PartitionOffset>>,
 }
+
+#[napi(object)]
+#[derive(Clone, Debug)]
+pub struct TopicPartition {
+  pub topic: String,
+  pub partition_offset: Vec<PartitionOffset>,
+}
diff --git a/src/kafka/kafka_util.rs b/src/kafka/kafka_util.rs
index 2a9a9bb..3bfea52 100644
--- a/src/kafka/kafka_util.rs
+++ b/src/kafka/kafka_util.rs
@@ -1,6 +1,6 @@
 use std::collections::HashMap;
 
-use napi::{bindgen_prelude::Buffer, Status};
+use napi::{bindgen_prelude::Buffer, Error, Status};
 use rdkafka::{
   message::{BorrowedHeaders, BorrowedMessage, Header, Headers, OwnedHeaders},
   Message as RdMessage,
@@ -18,6 +18,19 @@ impl AnyhowToNapiError for anyhow::Error {
   }
 }
 
+pub trait IntoNapiError {
+  fn into_napi_error(self, context: &str) -> Error;
+}
+
+impl<E: std::fmt::Debug> IntoNapiError for E {
+  fn into_napi_error(self, context: &str) -> Error {
+    Error::new(
+      Status::GenericFailure,
+      format!("Error while {}: {:?}", context, self),
+    )
+  }
+}
+
 pub fn hashmap_to_kafka_headers(map: &HashMap<String, Buffer>) -> OwnedHeaders {
   map.iter().fold(OwnedHeaders::new(), |acc, (key, value)| {
     let value: &[u8] = value.as_ref();
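With the `IntoNapiError` helper above, each failure path reaches JavaScript as a regular `Error` whose message carries the context string. A hedged sketch of what a caller might observe (topic, partition, and offset are illustrative):

```ts
import { CommitMode, KafkaConsumer } from 'kafka-crab-js'

function commitOrLog(consumer: KafkaConsumer): void {
  try {
    consumer.commit('my-topic', 0, 42, CommitMode.Sync)
  } catch (err) {
    // Message shape assumed from the Rust side, e.g.
    // "Error while committing: KafkaError (...)"
    console.error((err as Error).message)
  }
}
```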
From e4991ba16228226aaceb5cae064b28ae0b047fde Mon Sep 17 00:00:00 2001
From: Inaiat Moraes
Date: Wed, 8 Jan 2025 14:46:15 -0300
Subject: [PATCH 2/6] feat: subscriber consumer events

---
 src/kafka/consumer/context.rs | 121 ++++++++++++++++++++++++++++++++++
 1 file changed, 121 insertions(+)
 create mode 100644 src/kafka/consumer/context.rs

diff --git a/src/kafka/consumer/context.rs b/src/kafka/consumer/context.rs
new file mode 100644
index 0000000..589ada9
--- /dev/null
+++ b/src/kafka/consumer/context.rs
@@ -0,0 +1,121 @@
+use rdkafka::{
+  consumer::{BaseConsumer, ConsumerContext, Rebalance, StreamConsumer},
+  error::KafkaResult,
+  ClientContext, TopicPartitionList,
+};
+use tokio::sync::watch;
+use tracing::{debug, error};
+
+use crate::kafka::consumer::consumer_helper::convert_tpl_to_array_of_topic_partition;
+
+use super::model::TopicPartition;
+
+pub type TxRxContext = (
+  watch::Sender<Option<KafkaEvent>>,
+  watch::Receiver<Option<KafkaEvent>>,
+);
+
+pub type LoggingConsumer = StreamConsumer<KafkaCrabContext>;
+
+#[napi(object)]
+#[derive(Clone, Debug)]
+pub struct KafkaEventPayload {
+  pub action: Option<String>,
+  pub tpl: Vec<TopicPartition>,
+  pub error: Option<String>,
+}
+
+#[napi(object)]
+#[derive(Clone, Debug)]
+pub struct KafkaEvent {
+  pub name: String,
+  pub payload: KafkaEventPayload,
+}
+
+pub struct KafkaCrabContext {
+  pub tx_rx_signal: TxRxContext,
+}
+
+impl KafkaCrabContext {
+  pub fn new() -> Self {
+    let (tx, rx) = watch::channel(None);
+    KafkaCrabContext {
+      tx_rx_signal: (tx, rx),
+    }
+  }
+
+  fn send_event(&self, event: KafkaEvent) {
+    self.tx_rx_signal.0.send(Some(event)).unwrap_or_else(|err| {
+      error!("Error while sending event: {:?}", err);
+    });
+  }
+}
+
+impl ClientContext for KafkaCrabContext {}
+
+impl ConsumerContext for KafkaCrabContext {
+  fn pre_rebalance(&self, consumer: &BaseConsumer<Self>, rebalance: &Rebalance) {
+    let event = KafkaEvent {
+      name: "rebalance".to_string(),
+      payload: convert_rebalance_to_kafka_payload(rebalance),
+    };
+
+    debug!(
+      "Pre rebalance {:?}, consumer closed: {} ",
+      rebalance,
+      consumer.closed()
+    );
+
+    self.send_event(event);
+  }
+
+  fn post_rebalance(&self, consumer: &BaseConsumer<Self>, rebalance: &Rebalance) {
+    let event = KafkaEvent {
+      name: "post_rebalance".to_string(),
+      payload: convert_rebalance_to_kafka_payload(rebalance),
+    };
+
+    debug!(
+      "Post rebalance {:?}, consumer closed: {} ",
+      rebalance,
+      consumer.closed()
+    );
+
+    self.send_event(event);
+  }
+
+  fn commit_callback(&self, result: KafkaResult<()>, offsets: &TopicPartitionList) {
+    let event = KafkaEvent {
+      name: "commit_callback".to_string(),
+      payload: KafkaEventPayload {
+        action: None,
+        tpl: convert_tpl_to_array_of_topic_partition(offsets),
+        error: None,
+      },
+    };
+
+    debug!("Committing offsets: {:?}. Offset: {:?}", result, offsets);
+
+    self.send_event(event);
+  }
+}
+
+fn convert_rebalance_to_kafka_payload(rebalance: &Rebalance) -> KafkaEventPayload {
+  match rebalance {
+    Rebalance::Assign(partitions) => KafkaEventPayload {
+      action: Some("assign".to_string()),
+      tpl: convert_tpl_to_array_of_topic_partition(partitions),
+      error: None,
+    },
+    Rebalance::Revoke(partitions) => KafkaEventPayload {
+      action: Some("revoke".to_string()),
+      tpl: convert_tpl_to_array_of_topic_partition(partitions),
+      error: None,
+    },
+    Rebalance::Error(err) => KafkaEventPayload {
+      action: Some("error".to_string()),
+      tpl: vec![],
+      error: Some(err.to_string()),
+    },
+  }
+}
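The context above publishes three event names (`rebalance` from `pre_rebalance`, `post_rebalance`, and `commit_callback`), with `payload.action` set to `assign`, `revoke`, or `error` for rebalances. A sketch of a JS-side dispatcher (handler bodies are placeholders):

```ts
import type { KafkaEvent } from 'kafka-crab-js'

function handleEvent(event: KafkaEvent): void {
  switch (event.name) {
    case 'rebalance':       // emitted from pre_rebalance
    case 'post_rebalance':
      if (event.payload.action === 'error') {
        console.error('Rebalance failed:', event.payload.error)
      } else {
        console.log(`${event.payload.action} of`, event.payload.tpl)
      }
      break
    case 'commit_callback': // offsets that were just committed
      console.log('Committed:', event.payload.tpl)
      break
  }
}
```

Note that the bridge is a tokio `watch` channel, which retains only the most recent value; bursts of callbacks may be coalesced, so handlers should treat each delivery as a snapshot rather than a complete log.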
From 91e67b8ec6b5728f454dcf611267a42d7bf3c27a Mon Sep 17 00:00:00 2001
From: Inaiat Moraes
Date: Wed, 8 Jan 2025 14:58:19 -0300
Subject: [PATCH 3/6] chore: remove some logs

---
 src/kafka/consumer/consumer_helper.rs | 10 +++++-----
 src/kafka/consumer/kafka_consumer.rs  | 10 +++++-----
 src/kafka/kafka_admin.rs              |  8 ++++----
 3 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/src/kafka/consumer/consumer_helper.rs b/src/kafka/consumer/consumer_helper.rs
index 6c10087..9576374 100644
--- a/src/kafka/consumer/consumer_helper.rs
+++ b/src/kafka/consumer/consumer_helper.rs
@@ -5,7 +5,7 @@ use rdkafka::{
   consumer::{Consumer, StreamConsumer},
   ClientConfig, Offset, TopicPartitionList,
 };
-use tracing::{debug, error, info, warn};
+use tracing::{debug, error, warn};
 
 use crate::kafka::{consumer::context::LoggingConsumer, kafka_admin::KafkaAdmin};
 
@@ -90,7 +90,7 @@ pub fn create_stream_consumer(
     .set_log_level(RDKafkaLogLevel::Debug)
     .create_with_context(context)?;
 
-  info!("Consumer created. Group id: {:?}", group_id);
+  debug!("Consumer created. Group id: {:?}", group_id);
   Ok(consumer)
 }
 
@@ -117,7 +117,7 @@ pub async fn try_create_topic(
     warn!("Fail to create topic {:?}", e);
     return Err(anyhow::Error::msg(format!("Fail to create topic: {:?}", e)));
   }
-  info!("Topic(s) created: {:?}", topics);
+  debug!("Topic(s) created: {:?}", topics);
   Ok(())
 }
 
@@ -134,7 +134,7 @@ pub fn set_offset_of_all_partitions(
   metadata.topics().iter().for_each(|meta_topic| {
     let mut tpl = TopicPartitionList::new();
     meta_topic.partitions().iter().for_each(|meta_partition| {
-      info!("Adding partition: {:?}", meta_partition.id());
+      debug!("Adding partition: {:?}", meta_partition.id());
      tpl.add_partition(topic, meta_partition.id());
    });
    match tpl.set_all_offsets(offset) {
@@ -181,7 +181,7 @@ pub fn assign_offset_or_use_metadata(
   let metadata = consumer.fetch_metadata(Some(topic), timeout)?;
   for meta_topic in metadata.topics() {
     for meta_partition in meta_topic.partitions() {
-      info!(
+      debug!(
         "Adding partition: {:?} with offset: {:?} for topic: {:?}",
         meta_partition.id(),
         offset,
diff --git a/src/kafka/consumer/kafka_consumer.rs b/src/kafka/consumer/kafka_consumer.rs
index 5c9b960..cfb183c 100644
--- a/src/kafka/consumer/kafka_consumer.rs
+++ b/src/kafka/consumer/kafka_consumer.rs
@@ -88,7 +88,7 @@ impl KafkaConsumer {
           }
         }
         _ = shutdown_signal.changed() => {
-          info!("Shutdown signal received and this will stop the consumer from receiving messages");
+          debug!("Subscription to consumer events is stopped");
           break;
         }
       }
@@ -135,7 +135,7 @@ impl KafkaConsumer {
     topics.iter().for_each(|item| {
       if let Some(all_offsets) = item.all_offsets.clone() {
-        info!(
+        debug!(
           "Subscribing to topic: {}. Setting all partitions to offset: {:?}",
           &item.topic, &all_offsets
         );
@@ -148,7 +148,7 @@ impl KafkaConsumer {
         .map_err(|e| e.convert_to_napi())
         .unwrap();
       } else if let Some(partition_offset) = item.partition_offset.clone() {
-        info!(
+        debug!(
           "Subscribing to topic: {} with partition offsets: {:?}",
           &item.topic, &partition_offset
         );
@@ -224,7 +224,7 @@ impl KafkaConsumer {
     timeout: Option<i64>,
   ) -> Result<()> {
     let offset = convert_to_rdkafka_offset(&offset_model);
-    info!(
+    debug!(
       "Seeking to topic: {}, partition: {}, offset: {:?}",
       topic, partition, offset
     );
@@ -259,7 +259,7 @@ impl KafkaConsumer {
          .map(|message| Some(create_message(&message, message.payload().unwrap_or(&[]))))
       }
       _ = rx.changed() => {
-        info!("Shutdown signal received and this will stop the consumer from receiving messages");
+        debug!("Shutdown signal received and this will stop the consumer from receiving messages");
         Ok(None)
       }
     }
diff --git a/src/kafka/kafka_admin.rs b/src/kafka/kafka_admin.rs
index be349b0..84f05e9 100644
--- a/src/kafka/kafka_admin.rs
+++ b/src/kafka/kafka_admin.rs
@@ -8,7 +8,7 @@ use rdkafka::{
   error::KafkaError,
   types::RDKafkaErrorCode,
 };
-use tracing::{debug, info, trace};
+use tracing::{debug, trace};
 
 use std::{collections::HashMap, str::FromStr, time::Duration};
 
@@ -42,8 +42,8 @@ impl<'a> KafkaAdmin<'a> {
     let metadata = consumer.fetch_metadata(None, self.fetch_metadata_timeout)?;
 
     for broker in metadata.brokers() {
-      info!(
-        "Id: {} Host: {}:{} ",
+      debug!(
+        "Metadata broker id: {} Host: {}:{}",
         broker.id(),
         broker.host(),
         broker.port()
@@ -87,7 +87,7 @@ impl<'a> KafkaAdmin<'a> {
       .await
       .map_err(anyhow::Error::new)?;
 
-    info!("Topic(s) {:?} was created successfully", topics);
+    debug!("Topic(s) {:?} created successfully", topics);
     Ok(())
   }
 }
From 7d82f039e27597a81641b2c2d5bf15ceb4fc40c0 Mon Sep 17 00:00:00 2001
From: Inaiat Moraes
Date: Wed, 8 Jan 2025 15:23:44 -0300
Subject: [PATCH 4/6] chore: rename subscribe event to on_events

---
 js-binding.d.ts                      | 2 +-
 js-src/js-binding.d.ts               | 2 +-
 src/kafka/consumer/kafka_consumer.rs | 5 +----
 3 files changed, 3 insertions(+), 6 deletions(-)

diff --git a/js-binding.d.ts b/js-binding.d.ts
index 947bd2e..a8981c6 100644
--- a/js-binding.d.ts
+++ b/js-binding.d.ts
@@ -102,7 +102,7 @@ export interface ProducerConfiguration {
   configuration?: Record<string, string>
 }
 export declare class KafkaConsumer {
-  subscribeToConsumerEvents(callback: (error: Error | undefined, event: KafkaEvent) => void): void
+  onEvents(callback: (error: Error | undefined, event: KafkaEvent) => void): void
   subscribe(topicConfigs: string | Array<TopicPartitionConfig>): Promise<void>
   pause(): void
   resume(): void
diff --git a/js-src/js-binding.d.ts b/js-src/js-binding.d.ts
index 947bd2e..a8981c6 100644
--- a/js-src/js-binding.d.ts
+++ b/js-src/js-binding.d.ts
@@ -102,7 +102,7 @@ export interface ProducerConfiguration {
   configuration?: Record<string, string>
 }
 export declare class KafkaConsumer {
-  subscribeToConsumerEvents(callback: (error: Error | undefined, event: KafkaEvent) => void): void
+  onEvents(callback: (error: Error | undefined, event: KafkaEvent) => void): void
   subscribe(topicConfigs: string | Array<TopicPartitionConfig>): Promise<void>
   pause(): void
   resume(): void
diff --git a/src/kafka/consumer/kafka_consumer.rs b/src/kafka/consumer/kafka_consumer.rs
index cfb183c..5e696c5 100644
--- a/src/kafka/consumer/kafka_consumer.rs
+++ b/src/kafka/consumer/kafka_consumer.rs
@@ -72,10 +72,7 @@ impl KafkaConsumer {
   }
 
   #[napi(ts_args_type = "callback: (error: Error | undefined, event: KafkaEvent) => void")]
-  pub fn subscribe_to_consumer_events(
-    &self,
-    callback: ThreadsafeFunction<KafkaEvent>,
-  ) -> Result<()> {
+  pub fn on_events(&self, callback: ThreadsafeFunction<KafkaEvent>) -> Result<()> {
     let mut rx = self.stream_consumer.context().tx_rx_signal.1.clone();
     let mut shutdown_signal = self.shutdown_signal.1.clone();
 
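After the rename, the error-first callback is the only surface change. A minimal usage sketch, assuming the published package name and illustrative broker/topic/group values (the full example follows in the next patch):

```ts
import { KafkaClient } from 'kafka-crab-js'

const client = new KafkaClient({ brokers: 'localhost:29092', clientId: 'events-demo' })
const consumer = client.createConsumer({ topic: 'demo-topic', groupId: 'events-demo-group' })

// Error-first callback: `err` is set if the native side failed to
// deliver the event; otherwise `event` carries name + payload.
consumer.onEvents((err, event) => {
  if (err) {
    console.error('Event delivery failed:', err)
    return
  }
  console.log('Kafka event:', event.name, event.payload.action ?? '')
})
```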
From 65deecdee0a7a84aa3fe1131a921f483f9510b9d Mon Sep 17 00:00:00 2001
From: Inaiat Moraes
Date: Wed, 8 Jan 2025 15:55:30 -0300
Subject: [PATCH 5/6] chore: Add example/events.mjs

---
 example/events.mjs | 52 ++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 52 insertions(+)
 create mode 100644 example/events.mjs

diff --git a/example/events.mjs b/example/events.mjs
new file mode 100644
index 0000000..0bfbe6a
--- /dev/null
+++ b/example/events.mjs
@@ -0,0 +1,52 @@
+import { KafkaClient } from '../index.js'
+
+const kafkaClient = new KafkaClient({
+  brokers: 'localhost:29092',
+  clientId: 'abc',
+  logLevel: 'info',
+  brokerAddressFamily: 'v4',
+})
+
+const consumer = kafkaClient.createConsumer({
+  topic: 'foo',
+  groupId: 'xyz',
+})
+
+// If you want to consume events, you need to call shutdownConsumer() to stop the consumer and release resources
+consumer.onEvents((err, event) => {
+  console.log(
+    'Event:',
+    event.name,
+    event
+      .payload
+      .tpl
+      .map(it =>
+        `Topic: ${it.topic},
+        ${
+          it.partitionOffset.map(po => `partition: ${po.partition}, offset: ${po.offset.offset}`)
+            .join(',')
+        }`
+      ),
+  )
+})
+
+consumer.subscribe('foo')
+
+const printMessage = async () => {
+  let shutdown = false
+  while (!shutdown) {
+    const msg = await consumer.recv()
+    if (msg) {
+      console.log('Message received', msg.payload.toString())
+    } else {
+      console.log('The consumer has been shutdown')
+      shutdown = true
+    }
+  }
+}
+
+process.on('SIGINT', () => {
+  consumer.shutdownConsumer()
+})
+
+await printMessage()

From 9554ba95ac9858be29403cbedddc343247985e60 Mon Sep 17 00:00:00 2001
From: Inaiat Moraes
Date: Wed, 8 Jan 2025 16:08:22 -0300
Subject: [PATCH 6/6] 1.2.0

---
 README.md          | 2 +-
 example/events.mjs | 4 +---
 package.json       | 2 +-
 3 files changed, 3 insertions(+), 5 deletions(-)

diff --git a/README.md b/README.md
index b9ef795..d82f994 100644
--- a/README.md
+++ b/README.md
@@ -24,7 +24,7 @@ A lightweight, flexible, and reliable Kafka client for JavaScript/TypeScript. It
 
 1. [Installation](#installation)
 2. [Quick Start](#quick-start)
-3. [Consumer Examples](#consumer-examples)
+3. [Consumer Examples](#basic-consumer-setup)
 4. [Producer Examples](#producer-examples)
 5. [Stream Processing](#stream-processing)
 6. [Configuration](#configuration)
diff --git a/example/events.mjs b/example/events.mjs
index 0bfbe6a..3ec2f98 100644
--- a/example/events.mjs
+++ b/example/events.mjs
@@ -17,9 +17,7 @@ consumer.onEvents((err, event) => {
   console.log(
     'Event:',
     event.name,
-    event
-      .payload
-      .tpl
+    event.payload.tpl
       .map(it =>
         `Topic: ${it.topic},
         ${
diff --git a/package.json b/package.json
index a4ece8b..0685322 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
 {
   "name": "kafka-crab-js",
-  "version": "1.1.2",
+  "version": "1.2.0",
   "main": "index.js",
   "types": "index.d.ts",
   "module": "commonjs",