Skip to content

Commit

Permalink
feat: add multiconnection
Browse files Browse the repository at this point in the history
  • Loading branch information
temarusanov committed Sep 28, 2023
1 parent 0b74307 commit 019ee30
Show file tree
Hide file tree
Showing 10 changed files with 365 additions and 163 deletions.
19 changes: 12 additions & 7 deletions libs/external/nats/src/lib/nats-configs/nats-module.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,25 @@ import { ConfigurableModuleBuilder } from '@nestjs/common'
import { merge } from 'lodash'
import { ConnectionOptions } from 'nats'

export interface NatsConfig extends ConnectionOptions {
export interface NatsConnectionConfig extends ConnectionOptions {
/**
* Connection name, provide it if you want to support different NATS connection (NOT clusters)
*/
connectionName?: string

/**
* @default: true
*/
enableJetstream?: boolean
}

export interface NatsConfig {
connections: NatsConnectionConfig[]

/**
* When set to `true` the client will print protocol messages that it receives
* or sends to the server using NestJS logger. If you want to debug logs from `nats.io` directly
* use `debug: true`
* use `debug: true` in `NatsConnectionConfig`
*/
debugLog?: {
enable:
Expand All @@ -26,11 +35,7 @@ export interface NatsConfig extends ConnectionOptions {
}
}

export const DEFAULT_NATS_CONFIG: Pick<
NatsConfig,
'enableJetstream' | 'debugLog'
> = {
enableJetstream: true,
export const DEFAULT_NATS_CONFIG: Pick<NatsConfig, 'debugLog'> = {
debugLog: {
enable: false,
},
Expand Down
2 changes: 2 additions & 0 deletions libs/external/nats/src/lib/nats-constants/nats.constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ export const CONSUME_ARGS_METADATA = 'nats.consume.args.metadata'
export const CONSUME_PAYLOAD_TYPE = 3
export const CONSUME_HEADER_TYPE = 4
export const CONSUME_ACK_TYPE = 5

export const DEFAULT_CONNECTION_NAME = 'nats-default-connection'
10 changes: 8 additions & 2 deletions libs/external/nats/src/lib/nats-decorators/reply.decorator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ import { applyDecorators, SetMetadata } from '@nestjs/common'
import { REPLY_METADATA } from '../nats-constants/nats.constants'
import { SubscriptionOptions } from '../nats-interfaces/nats.interfaces'

export function Reply(subject: string, options?: SubscriptionOptions) {
return applyDecorators(SetMetadata(REPLY_METADATA, { subject, options }))
export function Reply(
subject: string,
options?: SubscriptionOptions,
connectionName?: string,
) {
return applyDecorators(
SetMetadata(REPLY_METADATA, { subject, options, connectionName }),
)
}
2 changes: 2 additions & 0 deletions libs/external/nats/src/lib/nats-errors/nats-errors.enum.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export enum NatsErrorsEnum {
NoConnection = 'NATS-002',
JetStreamNotEnabled = 'NATS-003',
JetStreamNotEnabledConfig = 'NATS-004',
UnknownConnectionName = 'NATS-005',
}

export const NATS_ERROR_TITLES = {
Expand All @@ -12,4 +13,5 @@ export const NATS_ERROR_TITLES = {
[NatsErrorsEnum.NoConnection]: `No connection established to NATS`,
[NatsErrorsEnum.JetStreamNotEnabled]: `JetStream not enabled. Please provide '--jetstream' option to your nats server`,
[NatsErrorsEnum.JetStreamNotEnabledConfig]: `JetStream not enabled. Please provide 'enableJetStream: true' option to your nats module config`,
[NatsErrorsEnum.UnknownConnectionName]: `Provide connection name if you use multiple NATS connections`,
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,38 @@ export class NatsConnectionHealthIndicator {
@HealthIndicator('nats')
async isHealthy(): Promise<HealthIndicatorResult> {
try {
const status = this.natsConnection.status()
const allConnections = this.natsConnection.getAllConnections()
let allConnected = true
const details: Record<string, any> = []

if (status.connected) {
for (const connection of allConnections) {
const status = this.natsConnection.status(connection.name)

if (!status.connected) {
allConnected = false
details.push({
connectionName: connection.name,
connected: false,
error: status.error,
})
} else {
details.push({
connectionName: connection.name,
connected: true,
})
}
}

if (allConnected) {
return {
status: 'up',
details,
}
}

return {
status: 'down',
error: status.error,
details,
}
} catch (error) {
return {
Expand Down
16 changes: 16 additions & 0 deletions libs/external/nats/src/lib/nats-interfaces/nats.interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import {
ConsumeOptions as NatsConsumeOptions,
ConsumerConfig,
JetStreamOptions,
JetStreamPublishOptions,
MsgHdrs,
NatsConnection,
RequestOptions as NatsRequestOptions,
StreamConfig,
SubscriptionOptions as NatsSubscriptionOptions,
} from 'nats'

import { NatsConnectionConfig } from '../nats-configs/nats-module.config'

export interface NatsResponse<T> {
subject: string
data: T
Expand All @@ -23,6 +27,17 @@ export interface RequestOptions
headers: Record<string, string>
}

export interface NatsConnectionObject {
name: string
options: NatsConnectionConfig
connection: NatsConnection
}

export interface GetJetStreamManagerOptions {
connectionName?: string
options?: JetStreamOptions
}

export interface PublishOptions
extends Omit<Partial<JetStreamPublishOptions>, 'headers' | 'timeout'> {
timeout?: number
Expand Down Expand Up @@ -84,6 +99,7 @@ export interface ConsumerAcks {
}

export interface ConsumeOptions {
connectionName?: string
stream: string
consumer: Partial<ConsumerConfig>
options?: Omit<NatsConsumeOptions, 'callback'>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ export class NatsClientService {
subject: string,
payload?: T,
options?: RequestOptions,
connectionName?: string,
): Promise<NatsResponse<K>> {
const nc = this.natsConnection.getNatsConnection()
const nc =
this.natsConnection.getNatsConnection(connectionName).connection

const encodedPayload: Payload = encodeMessage(payload)

Expand Down Expand Up @@ -103,8 +105,10 @@ export class NatsClientService {
async reply(
subject: string,
options?: SubscriptionOptions,
connectionName?: string,
): Promise<Subscription> {
const nc = this.natsConnection.getNatsConnection()
const nc =
this.natsConnection.getNatsConnection(connectionName).connection

const subscription = nc.subscribe(subject, options)
this.logger.log(`Mapped {${subject}, NATS} route`)
Expand Down
Loading

0 comments on commit 019ee30

Please sign in to comment.