Skip to content

Commit

Permalink
Merge pull request #13 from one-piece-team1/feat-sockets
Browse files Browse the repository at this point in the history
Feat sockets
  • Loading branch information
libterty authored Mar 19, 2021
2 parents 41f2653 + c1045d1 commit 7b1f701
Show file tree
Hide file tree
Showing 17 changed files with 336 additions and 9 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ COPY ./package.json ./
RUN npm install
COPY . .
RUN npm run build
EXPOSE 7070 8080
EXPOSE 84 7070 8080
12 changes: 11 additions & 1 deletion config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const configs = {
// Server Setting
HOST: process.env.APPHOST || 'localhost',
PORT: process.env.APPPORT || 8080,
WSPORT: process.env.WSPORT || 84,

EVENT_STORE_SETTINGS: {
protocol: process.env.EVENTSTOREPROTOCOL || 'http',
Expand All @@ -70,8 +71,17 @@ const configs = {
},
poolOptions: {
min: process.env.EVENTSTOREPOOLOPTIONSMIN || 1,
max: process.env.EVENTSTOREPOOLOPTIONSMAX || 10,
max: process.env.EVENTSTOREPOOLOPTIONSMAX || 100,
},
bootstrapServers: process.env.KAFKA_BOOTSTRAP_SERVERS || 'localhost:9094',
secureProtocol: process.env.KAFKA_SECURITY_PROTOCOL || 'SASL_SSL',
saslMechanisms: process.env.KAFKA_SASL_MECHANISMS || 'PLAIN',
chat: {
groupId: process.env.KAFKA_CHAT_CONSUMER_GROUP || 'onepiece-topic-chat-groups',
},
topics: {
chatTopic: process.env.KAFKA_CHAT_TOPIC || 'onepiece-topic-chat',
}
},

MS_SETTINGS: [
Expand Down
38 changes: 34 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,15 @@
"grpc": "^1.24.4",
"ioredis": "^4.19.2",
"lodash": "^4.17.20",
"node-rdkafka": "^2.10.1",
"pg": "^8.5.1",
"reflect-metadata": "^0.1.13",
"request-promise": "^4.2.6",
"rimraf": "^3.0.2",
"rxjs": "^6.5.4",
"swagger-ui-express": "^4.1.4",
"typeorm": "^0.2.29"
"typeorm": "^0.2.29",
"ws": "^7.4.4"
},
"devDependencies": {
"@nestjs/cli": "^7.0.0",
Expand All @@ -78,6 +80,7 @@
"@types/proxy-addr": "^2.0.0",
"@types/request-promise": "^4.1.46",
"@types/supertest": "^2.0.8",
"@types/ws": "^7.4.0",
"@typescript-eslint/eslint-plugin": "3.9.1",
"@typescript-eslint/parser": "3.9.1",
"eslint": "7.7.0",
Expand Down
10 changes: 10 additions & 0 deletions src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,19 @@ import {
} from '@nestjs/common';
import { GatewayModule } from './gateways/gateway.module';
import { RateMiddleware } from 'middlewares/rate-limit';
import { ChatSocketGateway } from './sockets/chat.gateway';
import { ChatSocketService } from './sockets/chat.service';
import { ChatConsumerService } from './consumers/chat.consumer';
import { ChatMessageRoutingService } from './handlers/chat.handler';

@Module({
imports: [GatewayModule],
providers: [
ChatSocketGateway,
ChatSocketService,
ChatConsumerService,
ChatMessageRoutingService,
],
})
export class AppModule implements NestModule {
configure(consumer: MiddlewareConsumer) {
Expand Down
54 changes: 54 additions & 0 deletions src/consumers/chat.consumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { Injectable, Logger } from '@nestjs/common';
import Kafka from 'node-rdkafka';
import { ChatMessageRoutingService } from '../handlers/chat.handler';
import { config } from '../../config';

@Injectable()
export class ChatConsumerService {
private readonly logger: Logger = new Logger('ChatConsumerService');
private readonly consumer = new Kafka.KafkaConsumer(
{
'bootstrap.servers': config.EVENT_STORE_SETTINGS.bootstrapServers,
'group.id': config.EVENT_STORE_SETTINGS.chat.groupId,
'enable.auto.commit': true,
},
{
'auto.offset.reset': 'earliest',
},
);

constructor(
private readonly chatMessageRoutingService: ChatMessageRoutingService,
) {
this.init();
}

/**
* @description Init func
*/
init() {
this.consumer
.on('ready', () => {
this.consumer.subscribe([config.EVENT_STORE_SETTINGS.topics.chatTopic]);
setInterval(() => {
this.consumer.consume(config.EVENT_STORE_SETTINGS.poolOptions.max);
}, 1000);
})
.on('data', data => {
this.chatMessageRoutingService.register(data);
this.consumer.commit();
})
.on('event.error', err => {
this.logger.error(err.message, '', 'Event_Error');
})
.on('rebalance.error', err => {
this.logger.error(err.message, '', 'Reblanace_Error');
});

this.consumer.connect({}, (err, data) => {
if (err) {
this.logger.error(err.message, '', 'ConsumerConnectError');
}
});
}
}
2 changes: 1 addition & 1 deletion src/gateways/gateway.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { MiddlewareConsumer, Module, NestModule } from '@nestjs/common';
import { GatewayController } from './gateway.controller';
import { GatewayService } from './gateway.service';
import { MulterModule } from '@nestjs/platform-express';
// import { AuthService } from '../middlewares/auth.service';

@Module({
imports: [MulterModule.register()],
controllers: [GatewayController],
Expand Down
2 changes: 1 addition & 1 deletion src/gateways/gateway.service.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { HttpException, HttpStatus, Injectable } from '@nestjs/common';
import { Request } from 'express';
import { config } from '../../config';
import { APIRequestFactory } from '../libs/request-factory';
import { ExceptionHandler } from '../libs/utils';
import * as IGateway from './interfaces';
import { config } from '../../config';

@Injectable()
export class GatewayService {
Expand Down
41 changes: 41 additions & 0 deletions src/handlers/chat.handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import {
Injectable,
InternalServerErrorException,
Logger,
} from '@nestjs/common';
import Kafka from 'node-rdkafka';
import { ChatSocketService } from '../sockets/chat.service';
import * as EChatRoom from '../sockets/enums';
import * as IChatRoom from '../sockets/interfaces';

@Injectable()
export class ChatMessageRoutingService {
private readonly logger: Logger = new Logger('ChatMessageRoutingService');

constructor(private readonly chatSocketService: ChatSocketService) {}

public register(kafkaMessage: Kafka.Message) {
if (!kafkaMessage)
throw new InternalServerErrorException('Non message is being proecssed');
const event: IChatRoom.IAggregateResponse<
EChatRoom.EChatRoomSocketEvent,
IChatRoom.IEventData
> = JSON.parse(kafkaMessage.value.toString());
return this.handler(event);
}

protected handler(
event: IChatRoom.IAggregateResponse<
EChatRoom.EChatRoomSocketEvent,
IChatRoom.IEventData
>,
) {
switch (event.type) {
case EChatRoom.EChatRoomSocketEvent.CREATECHATROOM:
return this.chatSocketService.sendNewChatRoom(
event.type,
event.data as IChatRoom.IChatRoomEntity,
);
}
}
}
30 changes: 30 additions & 0 deletions src/sockets/chat.gateway.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { Injectable, Logger } from '@nestjs/common';
import * as Express from 'express';
import WebSocket from 'ws';
import * as url from 'url';
import { config } from '../../config';

@Injectable()
export class ChatSocketGateway {
public wss: WebSocket.Server;
private readonly logger: Logger = new Logger('ChatSocketGateway');

constructor() {
this.init();
}

/**
* @description Init func
*/
public init() {
this.wss = new WebSocket.Server({ port: config.WSPORT, path: '/chats' });
this.wss.on('connection', (ws: WebSocket, req: Express.Request) => {
ws.on('message', (message: string) => {
this.logger.log('Messaging is on');
});
this.logger.log('Connecting ws success');
const qs: url.UrlWithParsedQuery = url.parse(req.url, true);
ws['uid'] = qs.query.userIds;
});
}
}
56 changes: 56 additions & 0 deletions src/sockets/chat.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { Injectable, Logger } from '@nestjs/common';
import { ChatSocketGateway } from './chat.gateway';
import * as EChatRoom from './enums';
import * as IChatRoom from './interfaces';

@Injectable()
export class ChatSocketService {
private readonly logger: Logger = new Logger('ChatSocketService');

constructor(private readonly chatSocketGateway: ChatSocketGateway) {}

/**
* @description Verify Identity
* @public
* @param {IChatRoom.ISocketWithIdentity} client
* @param {IChatRoom.IChatRoomEntity} chatRoom
* @returns {boolean}
*/
protected isRightClient(
client: IChatRoom.ISocketWithIdentity,
chatRoom: IChatRoom.IChatRoomEntity,
): boolean {
let isClient = false;
chatRoom.participateId.userIds.forEach(user => {
if (user.id === client.uid) {
isClient = true;
}
});
return isClient;
}

/**
* @description send new chat room
* @public
* @param {EChatRoom.EChatRoomSocketEvent} type
* @param {IChatRoom.IChatRoomEntity} chatRoomEvent
* @returns {void}
*/
public sendNewChatRoom(
type: EChatRoom.EChatRoomSocketEvent,
chatRoomEvent: IChatRoom.IChatRoomEntity,
): void {
this.chatSocketGateway.wss.clients.forEach(
(client: IChatRoom.ISocketWithIdentity) => {
const isClient = this.isRightClient(client, chatRoomEvent);
if (isClient) {
client.send(JSON.stringify({ type, data: chatRoomEvent }), err => {
if (err) {
this.logger.error(err.message, '', 'SendNewChatRoom');
}
});
}
},
);
}
}
11 changes: 11 additions & 0 deletions src/sockets/enums/chat-room.enum.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
export enum EChatRoomType {
'PUBLIC' = 'public',
'PRIVATE' = 'private',
'GROUP' = 'group',
}

export enum EChatRoomSocketEvent {
'CREATECHATROOM' = 'createchatroom',
'UPDATECHATROOM' = 'updatechatroom',
'DELETECHATROOM' = 'deletechatroom',
}
2 changes: 2 additions & 0 deletions src/sockets/enums/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './chat-room.enum';
export * from './user.enum';
12 changes: 12 additions & 0 deletions src/sockets/enums/user.enum.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
export enum EUserRole {
'TRIAL' = 'trial',
'USER' = 'user',
'VIP1' = 'vip1',
'VIP2' = 'vip2',
'ADMIN' = 'admin',
}

export enum EUserGender {
'MALE' = 'male',
'FEMALE' = 'female',
}
Loading

0 comments on commit 7b1f701

Please sign in to comment.