From 12b15fa75453f404b2fe8c4352053ec13c5bfc1c Mon Sep 17 00:00:00 2001 From: Inaiat Moraes Date: Sun, 5 Jan 2025 11:52:01 -0300 Subject: [PATCH 1/5] feat: add shutdown signal --- .github/workflows/CI.yml | 8 ++--- Cargo.toml | 4 +-- js-binding.d.ts | 3 +- js-src/js-binding.d.ts | 3 +- js-src/kafka-client.ts | 2 +- js-src/kafka-stream-readable.ts | 2 +- kafka-client.js | 2 +- kafka-stream-readable.d.ts | 2 +- package.json | 4 +-- src/kafka/consumer/kafka_consumer.rs | 52 +++++++++++++++++++++------- 10 files changed, 56 insertions(+), 26 deletions(-) diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 7b68137..081fee9 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -76,7 +76,7 @@ jobs: - uses: goto-bus-stop/setup-zig@v2 if: ${{ matrix.settings.target == 'armv7-unknown-linux-gnueabihf' }} with: - version: 0.10.1 + version: 0.13.0 - name: Setup toolchain run: ${{ matrix.settings.setup }} if: ${{ matrix.settings.setup }} @@ -106,7 +106,7 @@ jobs: if: ${{ !matrix.settings.docker }} shell: bash - name: Upload artifact - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: bindings-${{ matrix.settings.target }} path: ${{ env.APP_NAME }}.*.node @@ -169,7 +169,7 @@ jobs: with: run_install: true - name: Download artifacts - uses: actions/download-artifact@v3 + uses: actions/download-artifact@v4 with: name: bindings-x86_64-unknown-linux-gnu path: . @@ -196,7 +196,7 @@ jobs: with: run_install: true - name: Download all artifacts - uses: actions/download-artifact@v3 + uses: actions/download-artifact@v4 with: path: artifacts - name: Move artifacts diff --git a/Cargo.toml b/Cargo.toml index da59380..642da18 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ napi = { version = "2.16", default-features = false, features = [ ] } napi-derive = "2.16" -anyhow = { version = "1.0", features = ["backtrace"] } +anyhow = { version = "1.0.95", features = ["backtrace"] } rdkafka = { version = "0.37", features = [ "libz-static", @@ -30,7 +30,7 @@ tracing-subscriber = { version = "0.3", features = ["json"] } nanoid = "0.4.0" [build-dependencies] -napi-build = "2.1" +napi-build = "2.1.4" [profile.release] lto = true diff --git a/js-binding.d.ts b/js-binding.d.ts index a03be84..b1e8926 100644 --- a/js-binding.d.ts +++ b/js-binding.d.ts @@ -92,8 +92,9 @@ export declare class KafkaConsumer { pause(): void resume(): void unsubscribe(): void + shutdownConsumer(): Promise seek(topic: string, partition: number, offsetModel: OffsetModel, timeout?: number | undefined | null): void - recv(): Promise + recv(): Promise commit(topic: string, partition: number, offset: number, commit: CommitMode): void } export declare class KafkaClientConfig { diff --git a/js-src/js-binding.d.ts b/js-src/js-binding.d.ts index a03be84..b1e8926 100644 --- a/js-src/js-binding.d.ts +++ b/js-src/js-binding.d.ts @@ -92,8 +92,9 @@ export declare class KafkaConsumer { pause(): void resume(): void unsubscribe(): void + shutdownConsumer(): Promise seek(topic: string, partition: number, offsetModel: OffsetModel, timeout?: number | undefined | null): void - recv(): Promise + recv(): Promise commit(topic: string, partition: number, offset: number, commit: CommitMode): void } export declare class KafkaClientConfig { diff --git a/js-src/kafka-client.ts b/js-src/kafka-client.ts index 3f0fb98..313243d 100644 --- a/js-src/kafka-client.ts +++ b/js-src/kafka-client.ts @@ -12,7 +12,7 @@ export class KafkaClient { * @throws {Error} If the configuration is invalid */ constructor(private readonly kafkaConfiguration: KafkaConfiguration) { - this.kafkaClientConfig = new KafkaClientConfig(kafkaConfiguration) + this.kafkaClientConfig = new KafkaClientConfig(this.kafkaConfiguration) } /** diff --git a/js-src/kafka-stream-readable.ts b/js-src/kafka-stream-readable.ts index 6fe8cc4..2966c2f 100644 --- a/js-src/kafka-stream-readable.ts +++ b/js-src/kafka-stream-readable.ts @@ -1,6 +1,6 @@ import { Readable } from 'stream' -import { CommitMode } from '../js-binding' +import { CommitMode } from './js-binding' import { KafkaConsumer, OffsetModel, TopicPartitionConfig } from './js-binding' /** diff --git a/kafka-client.js b/kafka-client.js index 30d8f5a..70bbc25 100644 --- a/kafka-client.js +++ b/kafka-client.js @@ -13,7 +13,7 @@ class KafkaClient { */ constructor(kafkaConfiguration) { this.kafkaConfiguration = kafkaConfiguration; - this.kafkaClientConfig = new js_binding_js_1.KafkaClientConfig(kafkaConfiguration); + this.kafkaClientConfig = new js_binding_js_1.KafkaClientConfig(this.kafkaConfiguration); } /** * Creates a KafkaProducer instance diff --git a/kafka-stream-readable.d.ts b/kafka-stream-readable.d.ts index c54b9e3..d595964 100644 --- a/kafka-stream-readable.d.ts +++ b/kafka-stream-readable.d.ts @@ -1,5 +1,5 @@ import { Readable } from 'stream'; -import { CommitMode } from '../js-binding'; +import { CommitMode } from './js-binding'; import { KafkaConsumer, OffsetModel, TopicPartitionConfig } from './js-binding'; /** * KafkaStreamReadable class diff --git a/package.json b/package.json index 403fc31..a77b341 100644 --- a/package.json +++ b/package.json @@ -44,7 +44,7 @@ "@napi-rs/cli": "^2.18.4", "@types/node": "22.10.1", "ava": "6.2.0", - "dprint": "^0.47.6", + "dprint": "^0.48.0", "nanoid": "5.0.9", "typescript": "5.7.2", "copyfiles": "^2.4.1", @@ -65,5 +65,5 @@ "version": "napi version", "fmt": "dprint fmt" }, - "packageManager": "pnpm@9.14.2" + "packageManager": "pnpm@9.15.1" } diff --git a/src/kafka/consumer/kafka_consumer.rs b/src/kafka/consumer/kafka_consumer.rs index ef98569..0d8ccb1 100644 --- a/src/kafka/consumer/kafka_consumer.rs +++ b/src/kafka/consumer/kafka_consumer.rs @@ -1,4 +1,5 @@ use std::time::Duration; +use tokio::sync::watch::{self}; use napi::{Either, Result}; @@ -27,13 +28,18 @@ use super::{ }, }; +use tokio::select; + pub const DEFAULT_SEEK_TIMEOUT: i64 = 1500; +type ShutdownSignal = (watch::Sender<()>, watch::Receiver<()>); + #[napi] pub struct KafkaConsumer { client_config: ClientConfig, stream_consumer: StreamConsumer, fecth_metadata_timeout: Duration, + shutdown_signal: ShutdownSignal, } #[napi] @@ -57,6 +63,7 @@ impl KafkaConsumer { .fecth_metadata_timeout .unwrap_or(DEFAULT_FECTH_METADATA_TIMEOUT.as_millis() as i64) as u64, ), + shutdown_signal: watch::channel(()), }) } @@ -171,10 +178,25 @@ impl KafkaConsumer { #[napi] pub fn unsubscribe(&self) -> Result<()> { + info!("Unsubscribing from topics"); self.stream_consumer.unsubscribe(); Ok(()) } + #[napi] + pub async fn shutdown_consumer(&self) -> Result<()> { + info!("Shutting down consumer - this will stop the consumer from receiving messages"); + let tx = self.shutdown_signal.0.clone(); + tx.send(()).map_err(|e| { + napi::Error::new( + napi::Status::GenericFailure, + format!("Error sending shutdown signal: {:?}", e), + ) + })?; + + Ok(()) + } + #[napi] pub fn seek( &self, @@ -207,18 +229,24 @@ impl KafkaConsumer { } #[napi] - pub async fn recv(&self) -> Result { - self - .stream_consumer - .recv() - .await - .map_err(|e| { - napi::Error::new( - napi::Status::GenericFailure, - format!("Error while receiving from stream consumer: {:?}", e), - ) - }) - .map(|message| create_message(&message, message.payload().unwrap_or(&[]))) + pub async fn recv(&self) -> Result> { + let mut rx = self.shutdown_signal.1.clone(); + select! { + message = self.stream_consumer.recv() => { + message + .map_err(|e| { + napi::Error::new( + napi::Status::GenericFailure, + format!("Error while receiving from stream consumer: {:?}", e), + ) + }) + .map(|message| Some(create_message(&message, message.payload().unwrap_or(&[])))) + } + _ = rx.changed() => { + info!("Shutdown signal received and this will stop the consumer from receiving messages"); + Ok(None) + } + } } #[napi] From 80378d6c34a8ba58e36c2750735440455eb5354c Mon Sep 17 00:00:00 2001 From: Inaiat Moraes Date: Sun, 5 Jan 2025 12:01:44 -0300 Subject: [PATCH 2/5] chore: automatically unsubscribe on shutdown consumer --- src/kafka/consumer/kafka_consumer.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/kafka/consumer/kafka_consumer.rs b/src/kafka/consumer/kafka_consumer.rs index 0d8ccb1..fe4d7ec 100644 --- a/src/kafka/consumer/kafka_consumer.rs +++ b/src/kafka/consumer/kafka_consumer.rs @@ -185,7 +185,12 @@ impl KafkaConsumer { #[napi] pub async fn shutdown_consumer(&self) -> Result<()> { - info!("Shutting down consumer - this will stop the consumer from receiving messages"); + info!("Shutting down consumer - this will unsubscribe and stop the consumer from receiving messages"); + + // First unsubscribe from topics + self.stream_consumer.unsubscribe(); + + // Then send shutdown signal let tx = self.shutdown_signal.0.clone(); tx.send(()).map_err(|e| { napi::Error::new( From 0c0faca4ea898930285372cd74efc7f4da732165 Mon Sep 17 00:00:00 2001 From: Inaiat Moraes Date: Sun, 5 Jan 2025 12:03:32 -0300 Subject: [PATCH 3/5] chore: fix fmt --- src/kafka/consumer/kafka_consumer.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/kafka/consumer/kafka_consumer.rs b/src/kafka/consumer/kafka_consumer.rs index fe4d7ec..3509aa8 100644 --- a/src/kafka/consumer/kafka_consumer.rs +++ b/src/kafka/consumer/kafka_consumer.rs @@ -186,10 +186,10 @@ impl KafkaConsumer { #[napi] pub async fn shutdown_consumer(&self) -> Result<()> { info!("Shutting down consumer - this will unsubscribe and stop the consumer from receiving messages"); - + // First unsubscribe from topics self.stream_consumer.unsubscribe(); - + // Then send shutdown signal let tx = self.shutdown_signal.0.clone(); tx.send(()).map_err(|e| { From 1db72946a67a6e2dda4c8bf9f904951209cdc25c Mon Sep 17 00:00:00 2001 From: Inaiat Moraes Date: Sun, 5 Jan 2025 12:13:14 -0300 Subject: [PATCH 4/5] chore: update lock file --- package.json | 2 +- pnpm-lock.yaml | 82 +++++++++++++++++++++++++------------------------- 2 files changed, 42 insertions(+), 42 deletions(-) diff --git a/package.json b/package.json index a77b341..43e8ca8 100644 --- a/package.json +++ b/package.json @@ -65,5 +65,5 @@ "version": "napi version", "fmt": "dprint fmt" }, - "packageManager": "pnpm@9.15.1" + "packageManager": "pnpm@9.15.2" } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index ebc44f4..f7b9bb0 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -27,8 +27,8 @@ importers: specifier: ^2.4.1 version: 2.4.1 dprint: - specifier: ^0.47.6 - version: 0.47.6 + specifier: ^0.48.0 + version: 0.48.0 nanoid: specifier: 5.0.9 version: 5.0.9 @@ -41,48 +41,48 @@ importers: packages: - '@dprint/darwin-arm64@0.47.6': - resolution: {integrity: sha512-DrtKVOH7Ue6QYsqsUfUwBlTkSZNF2j35xqyI6KimUT1ulgUPocLG53JC/Aej9KuSCPmt4M3J40xxPKRgIM4jPA==} + '@dprint/darwin-arm64@0.48.0': + resolution: {integrity: sha512-LJ+02WB1PDIUqobfwxBVMz0cUByXsZ6izFTC9tHR+BDt+qWfoZpCn5r/zpAVSkVlA5LzGHKLVNJrwKwaTnAiVA==} cpu: [arm64] os: [darwin] - '@dprint/darwin-x64@0.47.6': - resolution: {integrity: sha512-p16a4lMbAo4RngbNTAmtREnIRI/cOcZFy5wHPIzbCDnnHI+4UyHiAypTrpF8U8EYx1tw3hgih2MyAuupa9Gfag==} + '@dprint/darwin-x64@0.48.0': + resolution: {integrity: sha512-OxfLbitoNvFMVucauJ2DvEaJpnxyyhXWC2M56f2AX8lkZSsHrdMHtklqxHz3cBGVPpcCXjLPRC139ynwmqtjIA==} cpu: [x64] os: [darwin] - '@dprint/linux-arm64-glibc@0.47.6': - resolution: {integrity: sha512-WHphnk0oXpNzwJ9cjbddOL+hEZkXTvHxcA2pM1h1kWCBa5m+4qh0fg8YCktckMfHx1qdQuZYWRoT4l7yQbzWYA==} + '@dprint/linux-arm64-glibc@0.48.0': + resolution: {integrity: sha512-VMeyorjMXE9NrksmyOJ0zJRGxT7r7kDBBxshCxW+U1xgW+FqR8oE25RZaeDZZPDzUHapAly4ILZqjExLzAWVpw==} cpu: [arm64] os: [linux] - '@dprint/linux-arm64-musl@0.47.6': - resolution: {integrity: sha512-/2cSPudajg8J0U69ldNZkJx5QiKZNh+ohNVM9leWZ8v6GXN6sJDHX3T6hzS3ohIb03YOCiLOrJZDy9j3+fSgdQ==} + '@dprint/linux-arm64-musl@0.48.0': + resolution: {integrity: sha512-1BUHQKEngrZv8F6wq2SVxdokyeUoHFXjz0xbYGwctlFPzXAVNLpDy9FROXsfIKmxZ0NsRqEpatETLmubtvWtcA==} cpu: [arm64] os: [linux] - '@dprint/linux-riscv64-glibc@0.47.6': - resolution: {integrity: sha512-RMHJ3Zuzpls426upHlAveVwlGMi8oBLzhiCauyC/yWQl3CkGTAYdyhEpGnux0+RxacysfIL2bd8ourx4K0Sx3w==} + '@dprint/linux-riscv64-glibc@0.48.0': + resolution: {integrity: sha512-c8LktisPGbygyFf9wUg0trbAQDApawU017iPQXkZnGcV4QoCeGkFjnY8vltIKyy5DeP5gIp12KjlaT/wogMPkw==} cpu: [riscv64] os: [linux] - '@dprint/linux-x64-glibc@0.47.6': - resolution: {integrity: sha512-4zbVsx/a8lHkRyAjfW0PNlN5IMwOJfFapgXNYJowWNO7X3j3m1jYJWovjmdZls+d6pDeOHoanMWtq95wd7zTeQ==} + '@dprint/linux-x64-glibc@0.48.0': + resolution: {integrity: sha512-Am8rp4FqmkO4aFdmwxk+82g2csxPLTPIlNq0Fa9AZReU15yta3Pq0Pg4AXFq+KSso5L4WGmt09ciCitK5gmdOg==} cpu: [x64] os: [linux] - '@dprint/linux-x64-musl@0.47.6': - resolution: {integrity: sha512-0C13t4OVzomgQjvUyD5IrRyjLDhGuOtqMo00uJlwn3QHucfgOBqkjyQ5fq7T6+grBse0m14/EWblvSbYkZpyDw==} + '@dprint/linux-x64-musl@0.48.0': + resolution: {integrity: sha512-0nzrZXOvblM/H4GVffNJ7YZn/Y4F/i+DNDZRT1hQmKuTQurB7a2MBJ91OpooLIWJoSn4q40crwM1Po4xnNXrdg==} cpu: [x64] os: [linux] - '@dprint/win32-arm64@0.47.6': - resolution: {integrity: sha512-UOkeFMBdGIuGNwfkrJdVM9eNiRMdbZRRGVy0Cdo2AXn/FCDVqZ74KJkvYVcoUE27GCytHi4Sp1s4at7659WCOw==} + '@dprint/win32-arm64@0.48.0': + resolution: {integrity: sha512-bRcGLbhKEXmP7iXDir/vU6DqkA3XaMqBM6P2ACCJMHd+XKWsz3VJzZMn5hFWZ+oZpxUsS3Mg2RcgH5xVjaawgA==} cpu: [arm64] os: [win32] - '@dprint/win32-x64@0.47.6': - resolution: {integrity: sha512-i9xwXR8V8Jk/wU1gsYKx15eb0ypBRbRZFkHsnHfC0ZBimcfEOibGnGcfv+UCUcumXtnV46TnBqaJW7H70J1J+A==} + '@dprint/win32-x64@0.48.0': + resolution: {integrity: sha512-9JOKWWngo5vPBFxJgFogAS4rfFC2GaB9Yew6JZbRBUik7j5Num2muuw5p1tMYnl2NUBdS2W4EgsSLM3uUDyhBQ==} cpu: [x64] os: [win32] @@ -330,8 +330,8 @@ packages: resolution: {integrity: sha512-bwy0MGW55bG41VqxxypOsdSdGqLwXPI/focwgTYCFMbdUiBAxLg9CFzG08sz2aqzknwiX7Hkl0bQENjg8iLByw==} engines: {node: '>=8'} - dprint@0.47.6: - resolution: {integrity: sha512-vCQC+IMHVZbISA5pxEj+yshQbozmQoVFA4lzcLlqJ8rzIAH8U+1DKvesN/2Uv3Bqz6rMW6W4WY7pYJQljmiZ8w==} + dprint@0.48.0: + resolution: {integrity: sha512-dmCrYTiubcsQklTLUimlO+p8wWgMtZBjpPVsOGiw4kPX7Dn41vwyE1R4qA8Px4xHgQtcX7WP9mJujF4C8vISIw==} hasBin: true eastasianwidth@0.2.0: @@ -928,31 +928,31 @@ packages: snapshots: - '@dprint/darwin-arm64@0.47.6': + '@dprint/darwin-arm64@0.48.0': optional: true - '@dprint/darwin-x64@0.47.6': + '@dprint/darwin-x64@0.48.0': optional: true - '@dprint/linux-arm64-glibc@0.47.6': + '@dprint/linux-arm64-glibc@0.48.0': optional: true - '@dprint/linux-arm64-musl@0.47.6': + '@dprint/linux-arm64-musl@0.48.0': optional: true - '@dprint/linux-riscv64-glibc@0.47.6': + '@dprint/linux-riscv64-glibc@0.48.0': optional: true - '@dprint/linux-x64-glibc@0.47.6': + '@dprint/linux-x64-glibc@0.48.0': optional: true - '@dprint/linux-x64-musl@0.47.6': + '@dprint/linux-x64-musl@0.48.0': optional: true - '@dprint/win32-arm64@0.47.6': + '@dprint/win32-arm64@0.48.0': optional: true - '@dprint/win32-x64@0.47.6': + '@dprint/win32-x64@0.48.0': optional: true '@faker-js/faker@9.3.0': {} @@ -1238,17 +1238,17 @@ snapshots: detect-libc@2.0.3: {} - dprint@0.47.6: + dprint@0.48.0: optionalDependencies: - '@dprint/darwin-arm64': 0.47.6 - '@dprint/darwin-x64': 0.47.6 - '@dprint/linux-arm64-glibc': 0.47.6 - '@dprint/linux-arm64-musl': 0.47.6 - '@dprint/linux-riscv64-glibc': 0.47.6 - '@dprint/linux-x64-glibc': 0.47.6 - '@dprint/linux-x64-musl': 0.47.6 - '@dprint/win32-arm64': 0.47.6 - '@dprint/win32-x64': 0.47.6 + '@dprint/darwin-arm64': 0.48.0 + '@dprint/darwin-x64': 0.48.0 + '@dprint/linux-arm64-glibc': 0.48.0 + '@dprint/linux-arm64-musl': 0.48.0 + '@dprint/linux-riscv64-glibc': 0.48.0 + '@dprint/linux-x64-glibc': 0.48.0 + '@dprint/linux-x64-musl': 0.48.0 + '@dprint/win32-arm64': 0.48.0 + '@dprint/win32-x64': 0.48.0 eastasianwidth@0.2.0: {} From 914c7055ca6bb0c2697048ab163d99f239192f20 Mon Sep 17 00:00:00 2001 From: Inaiat Moraes Date: Sun, 5 Jan 2025 12:34:02 -0300 Subject: [PATCH 5/5] fix: CI --- .github/workflows/CI.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 081fee9..1065afd 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -137,7 +137,7 @@ jobs: with: run_install: true - name: Download artifacts - uses: actions/download-artifact@v3 + uses: actions/download-artifact@v4 with: name: bindings-${{ matrix.settings.target }} path: .