Skip to content

Commit

Permalink
chore: add seek and commit method to KafkaStreamReadable
Browse files Browse the repository at this point in the history
  • Loading branch information
inaiat committed Dec 9, 2024
1 parent 551a6ee commit c9b68df
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 72 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
edition = "2021"
name = "kafka-crab-js"
version = "0.0.0"
version = "1.0.0"

[lib]
crate-type = ["cdylib"]
Expand Down
4 changes: 2 additions & 2 deletions example/stream-sample.mjs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { fakerPT_BR } from '@faker-js/faker'
import { KafkaClient } from '../index.js'
import { KafkaClient, PartitionPosition } from '../index.js'

const kafkaClient = new KafkaClient({
brokers: 'localhost:29092',
Expand Down Expand Up @@ -39,7 +39,7 @@ async function startConsumer(topic) {
enableAutoCommit: true,
})

await kafkaStream.subscribe(topic)
await kafkaStream.subscribe([{ topic: 'foo', allOffsets: { position: PartitionPosition.Beginning } }])

let counter = 0
console.log('Starting consumer')
Expand Down
11 changes: 10 additions & 1 deletion js-src/kafka-stream-readable.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Readable } from 'stream'

import { KafkaConsumer, TopicPartitionConfig } from './js-binding'
import { CommitMode } from '../js-binding'
import { KafkaConsumer, OffsetModel, TopicPartitionConfig } from './js-binding'

/**
* KafkaStreamReadable class
Expand Down Expand Up @@ -28,6 +29,14 @@ export class KafkaStreamReadable extends Readable {
await this.kafkaConsumer.subscribe(topics)
}

seek(topic: string, partition: number, offsetModel: OffsetModel, timeout?: number | undefined) {
this.kafkaConsumer.seek(topic, partition, offsetModel, timeout)
}

commit(topic: string, partition: number, offset: number, commit: CommitMode) {
this.kafkaConsumer.commit(topic, partition, offset, commit)
}

/**
* Unsubscribe from topics
*/
Expand Down
5 changes: 4 additions & 1 deletion kafka-stream-readable.d.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Readable } from 'stream';
import { KafkaConsumer, TopicPartitionConfig } from './js-binding';
import { CommitMode } from '../js-binding';
import { KafkaConsumer, OffsetModel, TopicPartitionConfig } from './js-binding';
/**
* KafkaStreamReadable class
* @extends Readable
Expand All @@ -14,6 +15,8 @@ export declare class KafkaStreamReadable extends Readable {
* Subscribes to topics
*/
subscribe(topics: string | Array<TopicPartitionConfig>): Promise<void>;
seek(topic: string, partition: number, offsetModel: OffsetModel, timeout?: number | undefined): void;
commit(topic: string, partition: number, offset: number, commit: CommitMode): void;
/**
* Unsubscribe from topics
*/
Expand Down
6 changes: 6 additions & 0 deletions kafka-stream-readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ class KafkaStreamReadable extends stream_1.Readable {
}
await this.kafkaConsumer.subscribe(topics);
}
seek(topic, partition, offsetModel, timeout) {
this.kafkaConsumer.seek(topic, partition, offsetModel, timeout);
}
commit(topic, partition, offset, commit) {
this.kafkaConsumer.commit(topic, partition, offset, commit);
}
/**
* Unsubscribe from topics
*/
Expand Down
12 changes: 6 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "kafka-crab-js",
"version": "1.0.3",
"version": "1.0.4",
"main": "index.js",
"types": "index.d.ts",
"module": "commonjs",
Expand Down Expand Up @@ -40,13 +40,13 @@
"url": "https://github.com/inaiat/kafka-crab-js.git"
},
"devDependencies": {
"@faker-js/faker": "9.2.0",
"@faker-js/faker": "9.3.0",
"@napi-rs/cli": "^2.18.4",
"@types/node": "22.9.0",
"@types/node": "22.10.1",
"ava": "6.2.0",
"dprint": "^0.47.5",
"nanoid": "5.0.8",
"typescript": "5.6.3",
"dprint": "^0.47.6",
"nanoid": "5.0.9",
"typescript": "5.7.2",
"copyfiles": "^2.4.1",
"@types/copyfiles": "^2.4.4",
"rimraf": "^6.0.1"
Expand Down
131 changes: 70 additions & 61 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit c9b68df

Please sign in to comment.