Skip to content

Commit

Permalink
Merge pull request #48 from ainblockchain/feature/csh/subscribe
Browse files Browse the repository at this point in the history
Feature/csh/subscribe
  • Loading branch information
cshcomcom authored Dec 14, 2021
2 parents e5fa0dd + 1bdde6c commit 3dbf112
Show file tree
Hide file tree
Showing 12 changed files with 2,070 additions and 2,940 deletions.
27 changes: 27 additions & 0 deletions __tests__/event_manager.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import Ain from '../src/ain';

const { test_event_handler_node } = require('./test_data');
const delayMs = (time) => new Promise(resolve => setTimeout(resolve, time));

jest.setTimeout(60000);

describe('Event Handler', function() {
let ain = new Ain(test_event_handler_node);

beforeAll(async () => {
await ain.em.connect();
});

afterAll(() => {
ain.em.disconnect();
});

it('Subscribe to BLOCK_FINALIZED', async () => {
const callback = jest.fn();
ain.em.subscribe('BLOCK_FINALIZED', {}, (data) => {
callback(data);
});
await delayMs(10000);
expect(callback).toBeCalled();
});
});
3 changes: 2 additions & 1 deletion __tests__/test_data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ export const test_seed = '';
export const test_sk = '';

export const test_node_1 = '';
export const test_node_2 = '';
export const test_node_2 = '';
export const test_event_handler_node = '';
4,619 changes: 1,687 additions & 2,932 deletions package-lock.json

Large diffs are not rendered by default.

16 changes: 9 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
{
"name": "@ainblockchain/ain-js",
"version": "1.1.9",
"version": "1.2.0",
"description": "",
"main": "lib/ain.js",
"scripts": {
"build": "rm -rf ./lib && tsc",
"prepublishOnly": "npm run build",
"snapshot": "jest --updateSnapshot",
"test": "jest",
"test_he": "jest he.test.ts"
"test_he": "jest he.test.ts",
"test_em": "jest event_manager.test.ts"
},
"engines": {
"node": ">=12"
Expand All @@ -33,17 +34,17 @@
"lib/**/*"
],
"devDependencies": {
"@types/jest": "^24.9.1",
"jest": "^24.9.0",
"ts-jest": "^24.3.0",
"@types/jest": "^27.0.2",
"jest": "^27.3.1",
"ts-jest": "^27.0.7",
"typescript": "^3.9.10"
},
"dependencies": {
"@ainblockchain/ain-util": "^1.1.6",
"@types/node": "^12.7.3",
"@types/randombytes": "^2.0.0",
"@types/semver": "^7.3.4",
"axios": "^0.21.1",
"axios": "^0.21.4",
"bip39": "^3.0.2",
"browserify-cipher": "^1.0.1",
"eventemitter3": "^4.0.0",
Expand All @@ -55,6 +56,7 @@
"scryptsy": "^2.1.0",
"semver": "^6.3.0",
"url-parse": "^1.4.7",
"uuid": "^3.3.3"
"uuid": "^3.3.3",
"ws": "^8.2.3"
}
}
3 changes: 3 additions & 0 deletions src/ain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import Database from './ain-db/db';
import Reference from './ain-db/ref';
import Wallet from './wallet';
import Network from './net';
import EventManager from './event-manager';
import HomomorphicEncryption from './he';

export default class Ain {
Expand All @@ -19,6 +20,7 @@ export default class Ain {
public net: Network;
public wallet: Wallet;
public he: HomomorphicEncryption;
public em: EventManager;

/**
* @param {string} providerUrl
Expand All @@ -31,6 +33,7 @@ export default class Ain {
this.wallet = new Wallet(this, this.chainId);
this.db = new Database(this, this.provider);
this.he = new HomomorphicEncryption();
this.em = new EventManager(this);
}

/**
Expand Down
71 changes: 71 additions & 0 deletions src/event-manager/event-callback-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import EventFilter from './event-filter';
import Subscription from './subscription';
import { BlockchainEventTypes, EventConfigType } from '../types';
import { PushId } from '../ain-db/push-id';

export default class EventCallbackManager {
private readonly _filters: Map<string, EventFilter>;
private readonly _filterIdToSubscription: Map<string, Subscription>;

constructor() {
this._filters = new Map<string, EventFilter>();
this._filterIdToSubscription = new Map<string, Subscription>();
}

buildFilterId() {
return PushId.generate();
}

buildSubscriptionId() {
return PushId.generate();
}

emitEvent(filterId: string, payload: any) {
const subscription = this._filterIdToSubscription.get(filterId);
if (!subscription) {
throw Error(`Can't find subscription by filter id (${filterId})`);
}
subscription.emit('data', payload);
}

emitError(filterId: string, errorMessage: string) {
const subscription = this._filterIdToSubscription.get(filterId);
if (!subscription) {
throw Error(`Can't find subscription by filter id (${filterId})`);
}
subscription.emit('error', errorMessage);
}

createFilter(eventTypeStr: string, config: EventConfigType): EventFilter {
const eventType = eventTypeStr as BlockchainEventTypes;
switch (eventType) {
case BlockchainEventTypes.BLOCK_FINALIZED:
const filterId = this.buildFilterId();
if (this._filters.get(filterId)) { // TODO(cshcomcom): Retry logic
throw Error(`Already existing filter id in filters (${filterId})`);
}
const filter = new EventFilter(filterId, eventType, config);
this._filters.set(filterId, filter);
return filter;
case BlockchainEventTypes.VALUE_CHANGED: // TODO(cshcomcom): Implement
throw Error(`Not implemented`);
case BlockchainEventTypes.TX_STATE_CHANGED: // TODO(cshcomcom): Implement
throw Error(`Not implemented`);
default:
throw Error(`Invalid event type (${eventType})`);
}
}

createSubscription(filter: EventFilter, dataCallback?: (data: any) => void,
errorCallback?: (error: any) => void) {
const subscription = new Subscription(filter);
if (dataCallback) {
subscription.on('data', dataCallback);
}
if (errorCallback) {
subscription.on('error', errorCallback);
}
this._filterIdToSubscription.set(filter.id, subscription);
return subscription;
}
}
129 changes: 129 additions & 0 deletions src/event-manager/event-channel-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import Ain from '../ain';
import { WebSocket } from 'ws';
import {
EventChannelMessageTypes,
EventChannelMessage,
BlockchainEventTypes,
EventChannelConnectionOption,
} from '../types';
import EventFilter from './event-filter';
import EventCallbackManager from './event-callback-manager';

export default class EventChannelClient {
private readonly _ain: Ain;
private readonly _eventCallbackManager: EventCallbackManager;
private _wsClient?: WebSocket;
private _endpointUrl?: string;
private _isConnected: boolean;

constructor(ain: Ain, eventCallbackManager: EventCallbackManager) {
this._ain = ain;
this._eventCallbackManager = eventCallbackManager;
this._wsClient = undefined;
this._endpointUrl = undefined;
this._isConnected = false;
}

get isConnected(): boolean {
return this._isConnected;
}

connect(connectionOption: EventChannelConnectionOption) {
return new Promise(async (resolve, reject) => {
const eventHandlerNetworkInfo = await this._ain.net.getEventHandlerNetworkInfo();
const url = eventHandlerNetworkInfo.url;
if (!url) {
reject(new Error(`Can't get url from eventHandlerNetworkInfo ` +
`(${JSON.stringify(eventHandlerNetworkInfo, null, 2)}`));
}
this._endpointUrl = url;
this._wsClient = new WebSocket(url, [], { handshakeTimeout: connectionOption.handshakeTimeout || 30000 });
this._wsClient.on('message', (message) => {
this.handleMessage(message);
});
this._wsClient.on('error', (err) => {
reject(err);
});
this._wsClient.on('open', () => {
this._isConnected = true;
resolve();
});
// TODO(cshcomcom): Handle close connection (w/ ping-pong)
})
}

disconnect() {
this._isConnected = false;
this._wsClient.close();
}

handleEmitEventMessage(messageData) {
const filterId = messageData.filter_id;
if (!filterId) {
throw Error(`Can't find filter ID from message data (${JSON.stringify(messageData, null, 2)})`);
}
const eventTypeStr = messageData.type;
const eventType = eventTypeStr as BlockchainEventTypes;
if (!Object.values(BlockchainEventTypes).includes(eventType)) {
throw Error(`Invalid event type (${eventTypeStr})`);
}
const payload = messageData.payload;
if (!payload) {
throw Error(`Can't find payload from message data (${JSON.stringify(messageData, null, 2)})`);
}
this._eventCallbackManager.emitEvent(filterId, payload);
}

handleEmitErrorMessage(messageData) {
const filterId = messageData.filter_id;
if (!filterId) {
throw Error(`Can't find filter ID from message data (${JSON.stringify(messageData, null, 2)})`);
}
// TODO(cshcomcom): error codes
const errorMessage = messageData.error_message;
if (!errorMessage) {
throw Error(`Can't find error message from message data (${JSON.stringify(messageData, null, 2)})`);
}
this._eventCallbackManager.emitError(filterId, errorMessage);
}

handleMessage(message: string) {
try {
const parsedMessage = JSON.parse(message);
const messageType = parsedMessage.type;
if (!messageType) {
throw Error(`Can't find type from message (${message})`);
}
const messageData = parsedMessage.data;
if (!messageData) {
throw Error(`Can't find data from message (${message})`);
}
switch (messageType) {
case EventChannelMessageTypes.EMIT_EVENT:
this.handleEmitEventMessage(messageData);
break;
case EventChannelMessageTypes.EMIT_ERROR:
this.handleEmitErrorMessage(messageData);
break;
default:
break;
}
} catch (err) {
console.error(err);
// TODO(cshcomcom): Error handling
}
}

buildMessage(messageType: EventChannelMessageTypes, data: any): EventChannelMessage {
return {
type: messageType,
data: data,
};
}

registerFilter(filter: EventFilter) {
const filterObj = filter.toObject();
const registerMessage = this.buildMessage(EventChannelMessageTypes.REGISTER_FILTER, filterObj);
this._wsClient.send(JSON.stringify(registerMessage));
}
}
21 changes: 21 additions & 0 deletions src/event-manager/event-filter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { BlockchainEventTypes, EventConfigType } from '../types';

export default class EventFilter {
public readonly id: string;
public readonly type: BlockchainEventTypes;
public readonly config: EventConfigType;

constructor(id: string, type: BlockchainEventTypes, config: EventConfigType) {
this.id = id;
this.type = type;
this.config = config;
}

toObject() {
return {
id: this.id,
type: this.type,
config: this.config,
};
}
}
58 changes: 58 additions & 0 deletions src/event-manager/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import Ain from '../ain';
import {
BlockFinalizedEventConfig,
ErrorFirstCallback,
EventChannelConnectionOption,
EventConfigType,
TxStateChangedEventConfig,
ValueChangedEventConfig,
} from '../types';
import EventChannelClient from './event-channel-client';
import EventCallbackManager from './event-callback-manager';
import Subscription from './subscription';

export default class EventManager {
private _ain: Ain;
private readonly _eventCallbackManager: EventCallbackManager;
private readonly _eventChannelClient: EventChannelClient;

constructor(ain: Ain) {
this._ain = ain;
this._eventCallbackManager = new EventCallbackManager();
this._eventChannelClient = new EventChannelClient(ain, this._eventCallbackManager);
}

async connect(connectionOption?: EventChannelConnectionOption) {
await this._eventChannelClient.connect(connectionOption || {});
}

disconnect() {
this._eventChannelClient.disconnect();
}

subscribe(
eventType: 'BLOCK_FINALIZED', config: BlockFinalizedEventConfig,
dataCallback?: (data: any) => void, errorCallback?: (error: any) => void): string;
subscribe(
eventType: 'VALUE_CHANGED', config: ValueChangedEventConfig,
dataCallback?: (data: any) => void, errorCallback?: (error: any) => void): string;
subscribe(
eventType: 'TX_STATE_CHANGED', config: TxStateChangedEventConfig,
dataCallback?: (data: any) => void, errorCallback?: (error: any) => void): string;
subscribe(
eventTypeStr: string, config: EventConfigType,
dataCallback?: (data: any) => void, errorCallback?: (error: any) => void): string {
if (!this._eventChannelClient.isConnected) {
throw Error(`Event channel is not connected! You must call ain.eh.connect() before using subscribe()`);
}
const filter = this._eventCallbackManager.createFilter(eventTypeStr, config);
this._eventChannelClient.registerFilter(filter);
this._eventCallbackManager.createSubscription(filter, dataCallback, errorCallback);
return filter.id;
}

unsubscribe(filterId: string, callback: ErrorFirstCallback<boolean>) {
// TODO(cshcomcom): Implement logic
callback(new Error(`Not implemented!`));
}
}
Loading

0 comments on commit 3dbf112

Please sign in to comment.