Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.7.0 #17

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ node_modules
lib/
package-lock.json
# yarn.lock
_mything.ts
_my*
1 change: 1 addition & 0 deletions examples/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"format": "prettier --write \"src/**/*.ts\"",
"dev:my": "nodemon -x ts-node src/_mything.ts | pino-pretty",
"dev:run-all": "nodemon -x ts-node src/run-all.ts | pino-pretty",
"dev:channel:1": "nodemon -x ts-node src/channel-example.ts | pino-pretty",
"dev:readme": "nodemon -x ts-node src/common-readme-example.ts | pino-pretty",
"dev:map:1": "nodemon -x ts-node src/map-insert-get-example.ts | pino-pretty",
"dev:map:2": "nodemon -x ts-node src/map-update-if-example.ts | pino-pretty",
Expand Down
70 changes: 70 additions & 0 deletions examples/src/channel-example.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { Hamok, HamokChannelEventMap, setHamokLogLevel } from 'hamok';
import * as pino from 'pino';
import { HamokMessageHub } from './utils/HamokMessageHub';

const logger = pino.pino({
name: 'app-channel-example',
level: 'debug',
});

interface ChannelEvents extends HamokChannelEventMap {
'foo': (value: unknown) => void;
'bar': (value: unknown) => Promise<number>;
}

export async function run() {
const server_1 = new Hamok();
const server_2 = new Hamok();
const server_3 = new Hamok();
const messageHub = new HamokMessageHub();

messageHub.add(server_1, server_2, server_3);

await Promise.all([
server_1.join(),
server_2.join(),
server_3.join(),
]);

const channel_1 = server_1.createChannel<ChannelEvents>({
channelId: 'my-channel',
});
const channel_2 = server_2.createChannel<ChannelEvents>({
channelId: 'my-channel',
});
const channel_3 = server_3.createChannel<ChannelEvents>({
channelId: 'my-channel',
});

await channel_1.subscribe('foo', (value) => {
logger.info('Server_1 received foo event with value %o', value);
});
await channel_1.subscribe('bar', async (value) => {
logger.info('Server_1 received bar event with value %o', value);
return 42;
});
await channel_1.subscribe('bar', async (value) => {
logger.info('Server_1 received bar event with value %o', value);
return 43;
});
await channel_3.subscribe('foo', (value) => {
logger.info('Server_3 received foo event with value %o', value);
});


const responses = await channel_2.request('bar', 'sadasd');
logger.info('Server_2 received responses %o', responses);

channel_1.notify('foo', 'bar');

server_1.close();
server_2.close();
server_3.close();
}

if (require.main === module) {
logger.info('Running from module file');
setHamokLogLevel('info');
run();
}

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "hamok",
"version": "2.6.0",
"version": "2.7.0",
"description": "Lightweight Distributed Object Storage on RAFT consensus algorithm",
"main": "lib/index.js",
"types": "lib/index.d.ts",
Expand Down
47 changes: 46 additions & 1 deletion src/Hamok.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { EndpointStatesNotification } from './messages/messagetypes/EndpointNoti
import { JoinNotification } from './messages/messagetypes/JoinNotification';
import { RemoteMap } from './collections/RemoteMap';
import { HamokRemoteMap } from './collections/HamokRemoteMap';
import { HamokChannel, HamokChannelEventMap } from './collections/HamokChannel';

const logger = createLogger('Hamok');

Expand Down Expand Up @@ -234,6 +235,14 @@ export type HamokEmitterBuilderConfig<T extends HamokEmitterEventMap> = Partial<

}

export type HamokChannelConfig = Partial<HamokConnectionBuilderBaseConfig> & {

/**
* The unique identifier for the channel.
*/
channelId: string,
}

export type HamokFetchRemotePeersResponse = {
remotePeers: string[],
minNumberOfLogs?: number,
Expand Down Expand Up @@ -279,7 +288,7 @@ export class Hamok<AppData extends Record<string, unknown> = Record<string, unkn
public readonly config: HamokObjectConfig<AppData>;
public readonly raft: RaftEngine;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
public readonly storages = new Map<string, HamokRecord<any> | HamokMap<any, any> | HamokQueue<any> | HamokRemoteMap<any, any> | HamokEmitter<any>>();
public readonly storages = new Map<string, HamokRecord<any> | HamokMap<any, any> | HamokQueue<any> | HamokRemoteMap<any, any> | HamokEmitter<any> | HamokChannel<any>>();

private _closed = false;
private _run = false;
Expand Down Expand Up @@ -579,6 +588,42 @@ export class Hamok<AppData extends Record<string, unknown> = Record<string, unkn
});
}

public createChannel<T extends HamokChannelEventMap = HamokChannelEventMap>(options: HamokChannelConfig): HamokChannel<T> {
if (this._closed) throw new Error('Cannot create channel on a closed Hamok instance');
if (this.storages.has(options.channelId)) throw new Error(`Storage with id ${options.channelId} already exists`);

const connection = this._createStorageConnection(
{
...options,
keyCodec: createStrToUint8ArrayCodec(),
valueCodec: createStrToUint8ArrayCodec(),
storageId: options.channelId,
},
);

const storage = new HamokChannel<T>(
connection,
);

storage.connection.once('close', () => {
this.storages.delete(storage.id);
});

this.storages.set(storage.id, storage);

return storage;
}

public getOrCreateChannel<T extends HamokChannelEventMap = HamokChannelEventMap>(options: HamokChannelConfig, callback?: (alreadyExisted: boolean) => void): HamokChannel<T> {
const storage = this.storages.get(options.channelId) as HamokChannel<T>;

if (!storage) return this.createChannel(options);

callback?.(true);

return storage;
}

public createMap<K, V>(options: HamokMapBuilderConfig<K, V>): HamokMap<K, V> {
if (this._closed) throw new Error('Cannot create map on a closed Hamok instance');
if (this.storages.has(options.mapId)) throw new Error(`Map with id ${options.mapId} already exists`);
Expand Down
10 changes: 10 additions & 0 deletions src/HamokSnapshot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,13 @@ export type HamokEmitterSnapshot = {
}[];
}[],
}

export type HamokChannelSnapshot = {
emitterId: string;
subscriptions: {
event: string;
subscribers: {
peerId: string, metaData: Record<string, unknown> | null
}[];
}[],
}
Loading
Loading