Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added basis for kafka consumer and producer with proto buffers #51

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
45 changes: 42 additions & 3 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions packages/nodejs-kafka-client-lib/generate-proto.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
const { execSync } = require('child_process');
const path = require('path');

const protoDir = path.resolve(__dirname, 'src/protobuff');
const protoFile = path.resolve(protoDir, 'messages.proto');
const outDir = path.resolve(protoDir);

const isWin = process.platform === 'win32';
const protocGenTsPath = path.resolve(__dirname, 'node_modules/.bin/protoc-gen-ts');

const command = `npx protoc --ts_out=service=true:${outDir} --proto_path=${protoDir} ${protoFile}`;

console.log('Running command:', command);

try {
execSync(command, { stdio: 'inherit' });
Fixed Show fixed Hide fixed
} catch (error) {
console.error('Error executing command:', error);
process.exit(1);
}
12 changes: 8 additions & 4 deletions packages/nodejs-kafka-client-lib/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"scripts": {
"start": "npm run service",
"service": "tsc && node dist/index.js --config ../../.env",
"build": "tsc",
"build": "npm run generate:proto && tsc",
"watch": "tsc -w",
"clean": "npm run clean:npm; npm run clean:dist",
"clean:dist": "rm -rf dist",
Expand All @@ -34,20 +34,24 @@
"test:coverage-check": "jest --coverage --testMatch **/test/unit/**/*.test.ts",
"test:integration": "jest --passWithNoTests --testMatch **/test/integration/**/*.test.ts",
"dep:check": "ncu -e 2",
"dep:update": "ncu -u"
"dep:update": "ncu -u",
"generate:proto": "node generate-proto.ts"
},
"dependencies": {
"@mojaloop/logging-bc-public-types-lib": "~0.5.4",
"@mojaloop/platform-shared-lib-messaging-types-lib": "~0.6.2",
"google-protobuf": "^3.21.4",
"node-rdkafka": "~3.0.1"
},
"devDependencies": {
"@types/google-protobuf": "^3.15.12",
"eslint": "^8.57.0",
"jest": "^29.7.0",
"npm-check-updates": "^16.14.18",
"protoc-gen-ts": "^0.8.7",
"ts-jest": "^29.1.2",
"typescript": "^4.6.4",
"tslib": "^2.6.2"
"tslib": "^2.6.2",
"typescript": "^4.6.4"
},
"engines": {
"node": ">=20.10.0"
Expand Down
79 changes: 41 additions & 38 deletions packages/nodejs-kafka-client-lib/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,41 @@
/*****
License
--------------
Copyright © 2017 Bill & Melinda Gates Foundation
The Mojaloop files are made available by the Bill & Melinda Gates Foundation under the Apache License, Version 2.0 (the "License") and you may not use these files except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, the Mojaloop files are distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Contributors
--------------
This is the official list (alphabetical ordering) of the Mojaloop project contributors for this file.
Names of the original copyright holders (individuals or organizations)
should be listed with a '*' in the first column. People who have
contributed from an organization can be listed under the organization
that actually holds the copyright for their contributions (see the
Gates Foundation organization for an example). Those individuals should have
their names indented and be marked with a '-'. Email address can be added
optionally within square brackets <email>.

* Gates Foundation
- Name Surname <name.surname@gatesfoundation.com>

* Crosslake
- Pedro Sousa Barreto <pedrob@crosslaketech.com>

--------------
******/

"use strict";

export * from "./rdkafka_json_consumer";
export * from "./rdkafka_json_producer";

export * from "./raw/raw_types";
export * from "./raw/rdkafka_raw_consumer";
export * from "./raw/rdkafka_raw_producer";
/*****
License
--------------
Copyright © 2017 Bill & Melinda Gates Foundation
The Mojaloop files are made available by the Bill & Melinda Gates Foundation under the Apache License, Version 2.0 (the "License") and you may not use these files except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, the Mojaloop files are distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Contributors
--------------
This is the official list (alphabetical ordering) of the Mojaloop project contributors for this file.
Names of the original copyright holders (individuals or organizations)
should be listed with a '*' in the first column. People who have
contributed from an organization can be listed under the organization
that actually holds the copyright for their contributions (see the
Gates Foundation organization for an example). Those individuals should have
their names indented and be marked with a '-'. Email address can be added
optionally within square brackets <email>.

* Gates Foundation
- Name Surname <name.surname@gatesfoundation.com>

* Crosslake
- Pedro Sousa Barreto <pedrob@crosslaketech.com>

--------------
******/

"use strict";

export * from "./json/rdkafka_json_consumer";
export * from "./json/rdkafka_json_producer";

export * from "./protobuff/rdkafka_proto_buff_consumer";
export * from "./protobuff/rdkafka_proto_buff_producer";

export * from "./raw/raw_types";
export * from "./raw/rdkafka_raw_consumer";
export * from "./raw/rdkafka_raw_producer";
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ import {
MLKafkaRawConsumer,
MLKafkaRawConsumerOutputType,
MLKafkaRawConsumerOptions
} from "./raw/rdkafka_raw_consumer";
import {IRawAuthenticationOptions, IRawMessage} from "./raw/raw_types";
} from "../raw/rdkafka_raw_consumer";
import {IRawAuthenticationOptions, IRawMessage} from "../raw/raw_types";


type MLKafkaJsonConsumerEvents = "rebalance";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@

import { ILogger } from "@mojaloop/logging-bc-public-types-lib";
import { IMessage, IMessageProducer } from "@mojaloop/platform-shared-lib-messaging-types-lib";
import {MLKafkaRawProducer, MLKafkaRawProducerOptions} from "./raw/rdkafka_raw_producer";
import {IRawMessage} from "./raw/raw_types";
import {MLKafkaRawProducer, MLKafkaRawProducerOptions} from "../raw/rdkafka_raw_producer";
import {IRawMessage} from "../raw/raw_types";

export type MLKafkaJsonProducerOptions = MLKafkaRawProducerOptions;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
syntax = "proto3";

message Envelope {
string type = 1;
bytes event = 2;
}
96 changes: 96 additions & 0 deletions packages/nodejs-kafka-client-lib/src/protobuff/messages.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
* Generated by the protoc-gen-ts. DO NOT EDIT!
* compiler version: 3.20.3
* source: messages.proto
* git: https://github.com/thesayyn/protoc-gen-ts */
import * as pb_1 from "google-protobuf";
export class Envelope extends pb_1.Message {
#one_of_decls: number[][] = [];
constructor(data?: any[] | {
type?: string;
event?: Uint8Array;
}) {
super();
pb_1.Message.initialize(this, Array.isArray(data) ? data : [], 0, -1, [], this.#one_of_decls);
if (!Array.isArray(data) && typeof data == "object") {
if ("type" in data && data.type != undefined) {
this.type = data.type;
}
if ("event" in data && data.event != undefined) {
this.event = data.event;
}
}
}
get type() {
return pb_1.Message.getFieldWithDefault(this, 1, "") as string;
}
set type(value: string) {
pb_1.Message.setField(this, 1, value);
}
get event() {
return pb_1.Message.getFieldWithDefault(this, 2, new Uint8Array(0)) as Uint8Array;
}
set event(value: Uint8Array) {
pb_1.Message.setField(this, 2, value);
}
static fromObject(data: {
type?: string;
event?: Uint8Array;
}): Envelope {
const message = new Envelope({});
if (data.type != null) {
message.type = data.type;
}
if (data.event != null) {
message.event = data.event;
}
return message;
}
toObject() {
const data: {
type?: string;
event?: Uint8Array;
} = {};
if (this.type != null) {
data.type = this.type;
}
if (this.event != null) {
data.event = this.event;
}
return data;
}
serialize(): Uint8Array;
serialize(w: pb_1.BinaryWriter): void;
serialize(w?: pb_1.BinaryWriter): Uint8Array | void {
const writer = w || new pb_1.BinaryWriter();
if (this.type.length)
writer.writeString(1, this.type);
if (this.event.length)
writer.writeBytes(2, this.event);
if (!w)
return writer.getResultBuffer();
}
static deserialize(bytes: Uint8Array | pb_1.BinaryReader): Envelope {
const reader = bytes instanceof pb_1.BinaryReader ? bytes : new pb_1.BinaryReader(bytes), message = new Envelope();
while (reader.nextField()) {
if (reader.isEndGroup())
break;
switch (reader.getFieldNumber()) {
case 1:
message.type = reader.readString();
break;
case 2:
message.event = reader.readBytes();
break;
default: reader.skipField();
}
}
return message;
}
serializeBinary(): Uint8Array {
return this.serialize();
}
static deserializeBinary(bytes: Uint8Array): Envelope {
return Envelope.deserialize(bytes);
}
}
Loading