diff --git a/api/listener/controller.go b/api/listener/controller.go index 14d3d0725..3f6102730 100644 --- a/api/listener/controller.go +++ b/api/listener/controller.go @@ -34,6 +34,11 @@ type ListenerInsertModel struct { Chain string `db:"chain_name" json:"chain" validate:"required"` } +type ObservedBlockModel struct { + BlockKey string `db:"block_key" json:"blockKey" validate:"required"` + BlockNumber int64 `db:"block_number" json:"blockNumber" validate:"blockNumberValidator"` +} + func insert(c *fiber.Ctx) error { payload := new(ListenerInsertModel) if err := c.BodyParser(payload); err != nil { @@ -151,3 +156,33 @@ func deleteById(c *fiber.Ctx) error { return c.JSON(result) } + +func upsertObservedBlock(c *fiber.Ctx) error { + payload := new(ObservedBlockModel) + if err := c.BodyParser(payload); err != nil { + return err + } + + validate := validator.New() + validate.RegisterValidation("blockNumberValidator", func(fl validator.FieldLevel) bool { + return fl.Field().Int() >= 0 + }) + if err := validate.Struct(payload); err != nil { + return err + } + + result, err := utils.QueryRow[ObservedBlockModel](c, UpsertObservedBlock, map[string]any{"block_key": payload.BlockKey, "block_number": payload.BlockNumber}) + if err != nil { + return err + } + return c.JSON(result) +} + +func getObservedBlock(c *fiber.Ctx) error { + blockKey := c.Query("blockKey") + result, err := utils.QueryRow[ObservedBlockModel](c, GetObservedBlock, map[string]any{"block_key": blockKey}) + if err != nil { + return err + } + return c.JSON(result) +} \ No newline at end of file diff --git a/api/listener/queries.go b/api/listener/queries.go index 6bd3f01c7..52db86df0 100644 --- a/api/listener/queries.go +++ b/api/listener/queries.go @@ -42,6 +42,20 @@ const ( (SELECT name from chains WHERE chain_id = listeners.chain_id) AS chain_name, (SELECT name from services WHERE service_id = listeners.service_id) AS service_name; ` + + GetObservedBlock = ` + SELECT * FROM observed_blocks + WHERE block_key = @block_key + LIMIT 1; + ` + + UpsertObservedBlock = ` + INSERT INTO observed_blocks (block_key, block_number) + VALUES (@block_key, @block_number) + ON CONFLICT (block_key) + DO UPDATE SET block_number = @block_number + RETURNING *; + ` ) func GenerateGetListenerQuery(params GetListenerQueryParams) string { diff --git a/api/listener/route.go b/api/listener/route.go index 9318ab98a..82c153f22 100644 --- a/api/listener/route.go +++ b/api/listener/route.go @@ -9,6 +9,8 @@ func Routes(router fiber.Router) { listener.Post("", insert) listener.Get("", get) + listener.Post("/observed-block", upsertObservedBlock) + listener.Get("/observed-block", getObservedBlock) listener.Get("/:id", getById) listener.Patch("/:id", updateById) listener.Delete("/:id", deleteById) diff --git a/api/migrations/000002_add_observed_block.down.sql b/api/migrations/000002_add_observed_block.down.sql new file mode 100644 index 000000000..9a1ef910e --- /dev/null +++ b/api/migrations/000002_add_observed_block.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS "observed_blocks"; \ No newline at end of file diff --git a/api/migrations/000002_add_observed_block.up.sql b/api/migrations/000002_add_observed_block.up.sql new file mode 100644 index 000000000..be0a2b492 --- /dev/null +++ b/api/migrations/000002_add_observed_block.up.sql @@ -0,0 +1,5 @@ +CREATE TABLE IF NOT EXISTS "observed_blocks" ( + block_key TEXT NOT NULL, + block_number BIGINT NOT NULL, + CONSTRAINT "observed_blocks_key" UNIQUE ("block_key") +) \ No newline at end of file diff --git a/core/src/errors.ts 
b/core/src/errors.ts index b43ecce6e..77920bf35 100644 --- a/core/src/errors.ts +++ b/core/src/errors.ts @@ -73,5 +73,7 @@ export enum OraklErrorCode { AxiosCanceledByUser, AxiosNotSupported, AxiosInvalidUrl, - FailedToConnectAPI + FailedToConnectAPI, + UpsertObservedBlockFailed, + GetObservedBlockFailed } diff --git a/core/src/listener/api.ts b/core/src/listener/api.ts index af42b598d..a8b091409 100644 --- a/core/src/listener/api.ts +++ b/core/src/listener/api.ts @@ -4,6 +4,7 @@ import { OraklError, OraklErrorCode } from '../errors' import { ORAKL_NETWORK_API_URL } from '../settings' import { IListenerRawConfig } from '../types' import { buildUrl } from '../utils' +import { IObservedBlock } from './types' const FILE_NAME = import.meta.url @@ -58,3 +59,54 @@ export async function getListener({ throw new OraklError(OraklErrorCode.GetListenerRequestFailed) } } + +/** + * Get the observed block number from the Orakl Network API for a given block key + * + * @param {string} blockKey + * @param {pino.Logger} logger + * @return {Promise<IObservedBlock>} + * @exception {OraklErrorCode.GetObservedBlockFailed} + */ +export async function getObservedBlock({ + blockKey, + logger +}: { + blockKey: string + logger?: Logger +}): Promise<IObservedBlock> { + try { + const endpoint = buildUrl(ORAKL_NETWORK_API_URL, `listener/observed-block?blockKey=${blockKey}`) + return (await axios.get(endpoint))?.data + } catch (e) { + logger?.error({ name: 'getObservedBlock', file: FILE_NAME, ...e }, 'error') + throw new OraklError(OraklErrorCode.GetObservedBlockFailed) + } +} + +/** + * Upsert the listener's observed block number to the Orakl Network API for a given block key + * + * @param {string} blockKey + * @param {number} blockNumber + * @param {pino.Logger} logger + * @return {Promise<IObservedBlock>} + * @exception {OraklErrorCode.UpsertObservedBlockFailed} + */ +export async function upsertObservedBlock({ + blockKey, + blockNumber, + logger +}: { + blockKey: string + blockNumber: number + logger?: Logger +}): Promise<IObservedBlock> { + try { + const endpoint = buildUrl(ORAKL_NETWORK_API_URL, 'listener/observed-block') + return (await axios.post(endpoint, { blockKey, blockNumber }))?.data + } catch (e) { + logger?.error({ name: 'upsertObservedBlock', file: FILE_NAME, ...e }, 'error') + throw new OraklError(OraklErrorCode.UpsertObservedBlockFailed) + } +} diff --git a/core/src/listener/data-feed-L2.ts b/core/src/listener/data-feed-L2.ts index 2d2022ab8..92d2543aa 100644 --- a/core/src/listener/data-feed-L2.ts +++ b/core/src/listener/data-feed-L2.ts @@ -10,7 +10,6 @@ import { L2_DATA_FEED_SERVICE_NAME, L2_LISTENER_DATA_FEED_HISTORY_QUEUE_NAME, L2_LISTENER_DATA_FEED_LATEST_QUEUE_NAME, - L2_LISTENER_DATA_FEED_PROCESS_EVENT_QUEUE_NAME, L2_WORKER_AGGREGATOR_QUEUE_NAME } from '../settings' import { IAnswerUpdated, IDataFeedListenerWorkerL2, IListenerConfig } from '../types' @@ -30,7 +29,6 @@ export async function buildListener( const eventName = 'AnswerUpdated' const latestQueueName = L2_LISTENER_DATA_FEED_LATEST_QUEUE_NAME const historyQueueName = L2_LISTENER_DATA_FEED_HISTORY_QUEUE_NAME - const processEventQueueName = L2_LISTENER_DATA_FEED_PROCESS_EVENT_QUEUE_NAME const workerQueueName = L2_WORKER_AGGREGATOR_QUEUE_NAME const abi = Aggregator__factory.abi const iface = new ethers.utils.Interface(abi) @@ -44,11 +42,9 @@ export async function buildListener( eventName, latestQueueName, historyQueueName, - processEventQueueName, workerQueueName, processFn: await processEvent({ iface, logger }), redisClient, - listenerInitType: 'latest', logger }) } diff --git
a/core/src/listener/data-feed.ts b/core/src/listener/data-feed.ts index ae20f103a..7026acb75 100644 --- a/core/src/listener/data-feed.ts +++ b/core/src/listener/data-feed.ts @@ -57,11 +57,9 @@ export async function buildListener( eventName, latestQueueName, historyQueueName, - processEventQueueName, workerQueueName, processFn: await processEvent({ iface, logger }), redisClient, - listenerInitType: 'latest', logger }) } diff --git a/core/src/listener/listener.ts b/core/src/listener/listener.ts index 05e3cdf98..46a7fd823 100644 --- a/core/src/listener/listener.ts +++ b/core/src/listener/listener.ts @@ -4,18 +4,11 @@ import { Logger } from 'pino' import type { RedisClientType } from 'redis' import { BULLMQ_CONNECTION, getObservedBlockRedisKey, LISTENER_JOB_SETTINGS } from '../settings' import { IListenerConfig } from '../types' +import { upsertObservedBlock } from './api' import { State } from './state' -import { - IHistoryListenerJob, - ILatestListenerJob, - IProcessEventListenerJob, - ListenerInitType, - ProcessEventOutputType -} from './types' +import { IHistoryListenerJob, ILatestListenerJob, ProcessEventOutputType } from './types' import { watchman } from './watchman' -const FILE_NAME = import.meta.url - /** * The listener service is used for tracking events emmitted by smart * contracts. Tracked events are subsequently send to BullMQ queue to @@ -40,7 +33,6 @@ const FILE_NAME = import.meta.url * @param {string} name of [worker] queue * @param {(log: ethers.Event) => Promise} event processing function * @param {RedisClientType} redis client - * @params {ListenerInitType} listener initialization type * @param {Logger} pino logger */ export async function listenerService({ @@ -52,11 +44,9 @@ export async function listenerService({ eventName, latestQueueName, historyQueueName, - processEventQueueName, workerQueueName, processFn, redisClient, - listenerInitType, logger }: { config: IListenerConfig[] @@ -67,16 +57,13 @@ export async function listenerService({ eventName: string latestQueueName: string historyQueueName: string - processEventQueueName: string workerQueueName: string processFn: (log: ethers.Event) => Promise redisClient: RedisClientType - listenerInitType: ListenerInitType logger: Logger }) { const latestListenerQueue = new Queue(latestQueueName, BULLMQ_CONNECTION) const historyListenerQueue = new Queue(historyQueueName, BULLMQ_CONNECTION) - const processEventQueue = new Queue(processEventQueueName, BULLMQ_CONNECTION) const workerQueue = new Queue(workerQueueName, BULLMQ_CONNECTION) const state = new State({ @@ -88,7 +75,6 @@ export async function listenerService({ chain, eventName, abi, - listenerInitType, logger }) await state.clear() @@ -98,8 +84,9 @@ export async function listenerService({ latestJob({ state, historyListenerQueue, - processEventQueue, + workerQueue, redisClient, + processFn, logger }), BULLMQ_CONNECTION @@ -110,22 +97,13 @@ export async function listenerService({ const historyWorker = new Worker( historyQueueName, - historyJob({ state, processEventQueue, logger }), + historyJob({ state, logger, processFn, redisClient, workerQueue }), BULLMQ_CONNECTION ) historyWorker.on('error', (e) => { logger.error(e) }) - const processEventWorker = new Worker( - processEventQueueName, - processEventJob({ workerQueue, processFn, logger }), - BULLMQ_CONNECTION - ) - processEventWorker.on('error', (e) => { - logger.error(e) - }) - for (const listener of config) { await state.add(listener.id) } @@ -137,9 +115,8 @@ export async function listenerService({ await 
latestWorker.close() await historyWorker.close() - await processEventWorker.close() await state.clear() - await watchmanServer.close() + watchmanServer.close() await redisClient.quit() } process.on('SIGINT', handleExit) @@ -170,15 +147,17 @@ */ function latestJob({ state, - processEventQueue, + workerQueue, historyListenerQueue, redisClient, + processFn, logger }: { state: State - processEventQueue: Queue historyListenerQueue: Queue + workerQueue: Queue redisClient: RedisClientType + processFn: (log: ethers.Event) => Promise<ProcessEventOutputType | undefined> logger: Logger }) { async function wrapper(job: Job) { @@ -203,7 +182,7 @@ try { // We assume that redis cache has been initialized within - // `State.add` method call. + // `State.add` method call and observedBlock has already been processed observedBlock = Number(await redisClient.get(observedBlockRedisKey)) } catch (e) { // Similarly to the failure during fetching the latest block @@ -220,36 +199,52 @@ observedBlock = Math.max(0, latestBlock - 1) } - const logPrefix = generateListenerLogPrefix(contractAddress, observedBlock, latestBlock) try { if (latestBlock > observedBlock) { - await redisClient.set(observedBlockRedisKey, latestBlock) - // The `observedBlock` block number is already processed, // therefore we do not need to re-query the same event in such // block again. - const events = await state.queryEvent(contractAddress, observedBlock + 1, latestBlock) - for (const [index, event] of events.entries()) { - const outData: IProcessEventListenerJob = { - contractAddress, - event + const lockObservedBlock = observedBlock + 1 + for (let blockNumber = lockObservedBlock; blockNumber <= latestBlock; ++blockNumber) { + const events = await state.queryEvent(contractAddress, blockNumber, blockNumber) + for (const [_, event] of events.entries()) { + const jobMetadata = await processFn(event) + if (jobMetadata) { + const { jobId, jobName, jobData, jobQueueSettings } = jobMetadata + const queueSettings = jobQueueSettings ? jobQueueSettings : LISTENER_JOB_SETTINGS + await workerQueue.add(jobName, jobData, { + jobId, + ...queueSettings + }) + logger.debug(`Listener submitted job [${jobId}] for [${jobName}]`) + } else { + throw new Error(`jobMetadata is not defined for an event in block ${blockNumber}`) + } } - const jobId = getUniqueEventIdentifier(event, index) - await processEventQueue.add('latest', outData, { - jobId, - ...LISTENER_JOB_SETTINGS + await upsertObservedBlock({ + blockKey: observedBlockRedisKey, + blockNumber, + logger }) + await redisClient.set(observedBlockRedisKey, blockNumber) + observedBlock += 1 // in case of failure, don't add already processed blocks to the history queue } - logger.debug(logPrefix) + logger.debug( + `${generateListenerLogPrefix(contractAddress, lockObservedBlock, observedBlock)} success` + ) } else { - logger.debug(`${logPrefix} noop`) + logger.debug( + `${generateListenerLogPrefix(contractAddress, observedBlock, latestBlock)} noop` + ) } } catch (e) { // Querying the latest events or passing data to [process] queue // failed. Repeateable [latest] job will continue listening for // new blocks, and the blocks which failed to be scanned for // events will be retried through [history] job. 
- logger.warn(`${logPrefix} fail`) + logger.warn( + `${generateListenerLogPrefix(contractAddress, observedBlock + 1, latestBlock)} fail` + ) for (let blockNumber = observedBlock + 1; blockNumber <= latestBlock; ++blockNumber) { const outData: IHistoryListenerJob = { contractAddress, blockNumber } @@ -281,80 +276,51 @@ */ function historyJob({ state, - processEventQueue, - logger + logger, + processFn, + redisClient, + workerQueue }: { state: State - processEventQueue: Queue logger: Logger + redisClient: RedisClientType + workerQueue: Queue + processFn: (log: ethers.Event) => Promise<ProcessEventOutputType | undefined> }) { async function wrapper(job: Job) { const inData: IHistoryListenerJob = job.data const { contractAddress, blockNumber } = inData const logPrefix = generateListenerLogPrefix(contractAddress, blockNumber, blockNumber) - let events: ethers.Event[] = [] try { - events = await state.queryEvent(contractAddress, blockNumber, blockNumber) - } catch (e) { - logger.error(`${logPrefix} hist fail`) - throw e - } - - logger.debug(`${logPrefix} hist`) - - for (const [index, event] of events.entries()) { - const outData: IProcessEventListenerJob = { - contractAddress, - event + const observedBlockRedisKey = getObservedBlockRedisKey(contractAddress) + const observedBlock = Number(await redisClient.get(observedBlockRedisKey)) + const events = await state.queryEvent(contractAddress, blockNumber, blockNumber) + for (const [_, event] of events.entries()) { + const jobMetadata = await processFn(event) + if (jobMetadata) { + const { jobId, jobName, jobData, jobQueueSettings } = jobMetadata + const queueSettings = jobQueueSettings ? jobQueueSettings : LISTENER_JOB_SETTINGS + await workerQueue.add(jobName, jobData, { + jobId, + ...queueSettings + }) + logger.debug(`Listener submitted job [${jobId}] for [${jobName}]`) + } else { + throw new Error(`jobMetadata is not defined for an event in block ${blockNumber}`) + } } - const jobId = getUniqueEventIdentifier(event, index) - await processEventQueue.add('history', outData, { - jobId, - ...LISTENER_JOB_SETTINGS - }) - } - } - - return wrapper -} - -/** - * The [processEvent] listener worker accepts jobs from [processEvent] - * queue. The jobs are submitted either by the [latest] or [history] - * listener worker. - * - * @param {(log: ethers.Event) => Promise<ProcessEventOutputType | undefined>} function that processes event caught by listener - */ -function processEventJob({ - workerQueue, - processFn, - logger -}: { - workerQueue: Queue - processFn: (log: ethers.Event) => Promise<ProcessEventOutputType | undefined> - logger: Logger -}) { - const _logger = logger.child({ name: 'processEventJob', file: FILE_NAME }) - async function wrapper(job: Job) { - const inData: IProcessEventListenerJob = job.data - const { event } = inData - _logger.debug(event, 'event') - - try { - const jobMetadata = await processFn(event) - if (jobMetadata) { - const { jobId, jobName, jobData, jobQueueSettings } = jobMetadata - const queueSettings = jobQueueSettings ? 
jobQueueSettings : LISTENER_JOB_SETTINGS - await workerQueue.add(jobName, jobData, { - jobId, - ...queueSettings + if (blockNumber > observedBlock) { + await upsertObservedBlock({ + blockKey: observedBlockRedisKey, + blockNumber, + logger }) - _logger.debug(`Listener submitted job [${jobId}] for [${jobName}]`) + await redisClient.set(observedBlockRedisKey, blockNumber) } } catch (e) { - _logger.error(e, 'Error in user defined listener processing function') + logger.error(`${logPrefix} hist fail`) throw e } } @@ -362,17 +328,6 @@ function processEventJob({ return wrapper } -/** - * Auxiliary function to create a unique identifier for a give `event` - * and `index` of the even within the transaction. - * - * @param {ethers.Event} event - * @param {number} index of event within a transaction - */ -function getUniqueEventIdentifier(event: ethers.Event, index: number) { - return `${event.blockNumber}-${event.transactionHash}-${index}` -} - /** * Auxiliary function that generate a consisten log prefix, that is * used both by the [latest] and [history] listener worker. diff --git a/core/src/listener/request-response-L2-fulfill.ts b/core/src/listener/request-response-L2-fulfill.ts index c52fbea21..ea998e2c0 100644 --- a/core/src/listener/request-response-L2-fulfill.ts +++ b/core/src/listener/request-response-L2-fulfill.ts @@ -7,7 +7,6 @@ import { L1_ENDPOINT, L2_LISTENER_REQUEST_RESPONSE_FULFILL_HISTORY_QUEUE_NAME, L2_LISTENER_REQUEST_RESPONSE_FULFILL_LATEST_QUEUE_NAME, - L2_LISTENER_REQUEST_RESPONSE_FULFILL_PROCESS_EVENT_QUEUE_NAME, L2_REQUEST_RESPONSE_FULFILL_LISTENER_STATE_NAME, L2_REQUEST_RESPONSE_FULFILL_SERVICE_NAME, L2_WORKER_REQUEST_RESPONSE_FULFILL_QUEUE_NAME @@ -34,7 +33,6 @@ export async function buildListener( const eventName = 'DataRequestFulfilled' const latestQueueName = L2_LISTENER_REQUEST_RESPONSE_FULFILL_LATEST_QUEUE_NAME const historyQueueName = L2_LISTENER_REQUEST_RESPONSE_FULFILL_HISTORY_QUEUE_NAME - const processEventQueueName = L2_LISTENER_REQUEST_RESPONSE_FULFILL_PROCESS_EVENT_QUEUE_NAME const workerQueueName = L2_WORKER_REQUEST_RESPONSE_FULFILL_QUEUE_NAME const abi = L1Endpoint__factory.abi const iface = new ethers.utils.Interface(abi) @@ -48,11 +46,9 @@ export async function buildListener( eventName, latestQueueName, historyQueueName, - processEventQueueName, workerQueueName, processFn: await processEvent({ iface, logger }), redisClient, - listenerInitType: 'latest', logger }) } diff --git a/core/src/listener/request-response-L2-request.ts b/core/src/listener/request-response-L2-request.ts index 05a7c36f6..2aa47b0df 100644 --- a/core/src/listener/request-response-L2-request.ts +++ b/core/src/listener/request-response-L2-request.ts @@ -7,7 +7,6 @@ import { L1_ENDPOINT, L2_LISTENER_REQUEST_RESPONSE_REQUEST_HISTORY_QUEUE_NAME, L2_LISTENER_REQUEST_RESPONSE_REQUEST_LATEST_QUEUE_NAME, - L2_LISTENER_REQUEST_RESPONSE_REQUEST_PROCESS_EVENT_QUEUE_NAME, L2_REQUEST_RESPONSE_REQUEST_LISTENER_STATE_NAME, L2_REQUEST_RESPONSE_REQUEST_SERVICE_NAME, L2_WORKER_REQUEST_RESPONSE_REQUEST_QUEUE_NAME @@ -29,7 +28,6 @@ export async function buildListener( const eventName = 'DataRequested' const latestQueueName = L2_LISTENER_REQUEST_RESPONSE_REQUEST_LATEST_QUEUE_NAME const historyQueueName = L2_LISTENER_REQUEST_RESPONSE_REQUEST_HISTORY_QUEUE_NAME - const processEventQueueName = L2_LISTENER_REQUEST_RESPONSE_REQUEST_PROCESS_EVENT_QUEUE_NAME const workerQueueName = L2_WORKER_REQUEST_RESPONSE_REQUEST_QUEUE_NAME const abi = L2Endpoint__factory.abi const iface = new ethers.utils.Interface(abi) @@ -43,11 
+41,9 @@ export async function buildListener( eventName, latestQueueName, historyQueueName, - processEventQueueName, workerQueueName, processFn: await processEvent({ iface, logger }), redisClient, - listenerInitType: 'latest', logger }) } diff --git a/core/src/listener/request-response.ts b/core/src/listener/request-response.ts index f5f1156ee..fbb12811d 100644 --- a/core/src/listener/request-response.ts +++ b/core/src/listener/request-response.ts @@ -6,7 +6,6 @@ import { CHAIN, LISTENER_REQUEST_RESPONSE_HISTORY_QUEUE_NAME, LISTENER_REQUEST_RESPONSE_LATEST_QUEUE_NAME, - LISTENER_REQUEST_RESPONSE_PROCESS_EVENT_QUEUE_NAME, REQUEST_RESPONSE_LISTENER_STATE_NAME, REQUEST_RESPONSE_SERVICE_NAME, WORKER_REQUEST_RESPONSE_QUEUE_NAME @@ -28,7 +27,6 @@ export async function buildListener( const eventName = 'DataRequested' const latestQueueName = LISTENER_REQUEST_RESPONSE_LATEST_QUEUE_NAME const historyQueueName = LISTENER_REQUEST_RESPONSE_HISTORY_QUEUE_NAME - const processEventQueueName = LISTENER_REQUEST_RESPONSE_PROCESS_EVENT_QUEUE_NAME const workerQueueName = WORKER_REQUEST_RESPONSE_QUEUE_NAME const abi = RequestResponseCoordinator__factory.abi const iface = new ethers.utils.Interface(abi) @@ -42,11 +40,9 @@ export async function buildListener( eventName, latestQueueName, historyQueueName, - processEventQueueName, workerQueueName, processFn: await processEvent({ iface, logger }), redisClient, - listenerInitType: 'latest', logger }) } diff --git a/core/src/listener/state.ts b/core/src/listener/state.ts index df8933e0d..d28adac81 100644 --- a/core/src/listener/state.ts +++ b/core/src/listener/state.ts @@ -10,8 +10,8 @@ import { PROVIDER_URL } from '../settings' import { IListenerConfig, IListenerRawConfig } from '../types' -import { getListeners } from './api' -import { IContracts, IHistoryListenerJob, ILatestListenerJob, ListenerInitType } from './types' +import { getListeners, getObservedBlock, upsertObservedBlock } from './api' +import { IContracts, IHistoryListenerJob, ILatestListenerJob } from './types' import { postprocessListeners } from './utils' const FILE_NAME = import.meta.url @@ -48,7 +48,6 @@ export class State { logger: Logger provider: ethers.providers.JsonRpcProvider contracts: IContracts - listenerInitType: ListenerInitType abi: ethers.ContractInterface constructor({ @@ -60,7 +59,6 @@ export class State { chain, eventName, abi, - listenerInitType, logger }: { redisClient: RedisClientType @@ -71,7 +69,6 @@ export class State { chain: string eventName: string abi: ethers.ContractInterface - listenerInitType: ListenerInitType logger: Logger }) { this.redisClient = redisClient @@ -82,7 +79,6 @@ export class State { this.abi = abi this.chain = chain this.eventName = eventName - this.listenerInitType = listenerInitType this.logger = logger this.provider = new ethers.providers.JsonRpcProvider(PROVIDER_URL) this.contracts = {} @@ -179,33 +175,33 @@ export class State { const contractAddress = toAddListener.address const observedBlockRedisKey = getObservedBlockRedisKey(contractAddress) const latestBlock = await this.latestBlockNumber() - - switch (this.listenerInitType) { - case 'clear': - // Clear metadata about previously observed blocks for a specific - // `contractAddress`. 
- await this.redisClient.set(observedBlockRedisKey, latestBlock - 1) - break - - case 'latest': - await this.setObservedBlockNumberIfNotDefined(observedBlockRedisKey, latestBlock - 1) - break - - default: - // [block number] initialization - await this.setObservedBlockNumberIfNotDefined(observedBlockRedisKey, latestBlock - 1) - for (let blockNumber = this.listenerInitType; blockNumber < latestBlock; ++blockNumber) { - const historyOutData: IHistoryListenerJob = { - contractAddress, - blockNumber - } - await this.historyListenerQueue.add('history', historyOutData, { - ...LISTENER_JOB_SETTINGS - }) - } - break + const { blockKey: observedBlockKey, blockNumber } = await getObservedBlock({ + blockKey: observedBlockRedisKey, + logger: this.logger + }) + const observedBlockNumber = observedBlockKey === '' ? latestBlock : blockNumber + + // Insert history jobs if there are any unprocessed blocks + // observedBlockNumber is assumed to be already processed + // since we update observedBlock only after processing all events from that block + for (let blockNumber = observedBlockNumber + 1; blockNumber < latestBlock; ++blockNumber) { + const historyOutData: IHistoryListenerJob = { + contractAddress, + blockNumber + } + await this.historyListenerQueue.add('history', historyOutData, { + ...LISTENER_JOB_SETTINGS + }) } + // Update observed block in db and redis + await upsertObservedBlock({ + blockKey: observedBlockRedisKey, + blockNumber: Math.max(latestBlock - 1, 0), + logger: this.logger + }) + await this.redisClient.set(observedBlockRedisKey, Math.max(latestBlock - 1, 0)) + // Insert listener jobs const outData: ILatestListenerJob = { contractAddress diff --git a/core/src/listener/types.ts b/core/src/listener/types.ts index ae6375e27..8475c49b0 100644 --- a/core/src/listener/types.ts +++ b/core/src/listener/types.ts @@ -77,3 +77,8 @@ export interface IProcessEventListenerJob { export interface IContracts { [key: string]: ethers.Contract } + +export interface IObservedBlock { + blockNumber: number + blockKey: string +} diff --git a/core/src/listener/vrf-L2-fulfill.ts b/core/src/listener/vrf-L2-fulfill.ts index 6cf6b95de..af5fb9af5 100644 --- a/core/src/listener/vrf-L2-fulfill.ts +++ b/core/src/listener/vrf-L2-fulfill.ts @@ -7,7 +7,6 @@ import { L2_ENDPOINT, L2_LISTENER_VRF_FULFILL_HISTORY_QUEUE_NAME, L2_LISTENER_VRF_FULFILL_LATEST_QUEUE_NAME, - L2_LISTENER_VRF_FULFILL_PROCESS_EVENT_QUEUE_NAME, L2_VRF_FULFILL_LISTENER_STATE_NAME, L2_VRF_FULFILL_SERVICE_NAME, L2_WORKER_VRF_FULFILL_QUEUE_NAME @@ -29,7 +28,6 @@ export async function buildListener( const eventName = 'RandomWordFulfilled' const latestQueueName = L2_LISTENER_VRF_FULFILL_LATEST_QUEUE_NAME const historyQueueName = L2_LISTENER_VRF_FULFILL_HISTORY_QUEUE_NAME - const processEventQueueName = L2_LISTENER_VRF_FULFILL_PROCESS_EVENT_QUEUE_NAME const workerQueueName = L2_WORKER_VRF_FULFILL_QUEUE_NAME const abi = L1Endpoint__factory.abi const iface = new ethers.utils.Interface(abi) @@ -43,11 +41,9 @@ export async function buildListener( eventName, latestQueueName, historyQueueName, - processEventQueueName, workerQueueName, processFn: await processEvent({ iface, logger }), redisClient, - listenerInitType: 'latest', logger }) } diff --git a/core/src/listener/vrf-L2-request.ts b/core/src/listener/vrf-L2-request.ts index 1e7bba4cc..d020efcb7 100644 --- a/core/src/listener/vrf-L2-request.ts +++ b/core/src/listener/vrf-L2-request.ts @@ -8,7 +8,6 @@ import { L1_ENDPOINT, L2_LISTENER_VRF_REQUEST_HISTORY_QUEUE_NAME, L2_LISTENER_VRF_REQUEST_LATEST_QUEUE_NAME, - 
L2_LISTENER_VRF_REQUEST_PROCESS_EVENT_QUEUE_NAME, L2_VRF_REQUEST_LISTENER_STATE_NAME, L2_VRF_REQUEST_SERVICE_NAME, L2_WORKER_VRF_REQUEST_QUEUE_NAME @@ -30,7 +29,6 @@ export async function buildListener( const eventName = 'RandomWordsRequested' const latestQueueName = L2_LISTENER_VRF_REQUEST_LATEST_QUEUE_NAME const historyQueueName = L2_LISTENER_VRF_REQUEST_HISTORY_QUEUE_NAME - const processEventQueueName = L2_LISTENER_VRF_REQUEST_PROCESS_EVENT_QUEUE_NAME const workerQueueName = L2_WORKER_VRF_REQUEST_QUEUE_NAME const abi = L2Endpoint__factory.abi const iface = new ethers.utils.Interface(abi) @@ -44,11 +42,9 @@ export async function buildListener( eventName, latestQueueName, historyQueueName, - processEventQueueName, workerQueueName, processFn: await processEvent({ iface, logger }), redisClient, - listenerInitType: 'latest', logger }) } diff --git a/core/src/listener/vrf.ts b/core/src/listener/vrf.ts index e16c35a57..4cd3c1e37 100644 --- a/core/src/listener/vrf.ts +++ b/core/src/listener/vrf.ts @@ -7,7 +7,6 @@ import { CHAIN, LISTENER_VRF_HISTORY_QUEUE_NAME, LISTENER_VRF_LATEST_QUEUE_NAME, - LISTENER_VRF_PROCESS_EVENT_QUEUE_NAME, VRF_LISTENER_STATE_NAME, VRF_SERVICE_NAME, WORKER_VRF_QUEUE_NAME @@ -29,7 +28,6 @@ export async function buildListener( const eventName = 'RandomWordsRequested' const latestQueueName = LISTENER_VRF_LATEST_QUEUE_NAME const historyQueueName = LISTENER_VRF_HISTORY_QUEUE_NAME - const processEventQueueName = LISTENER_VRF_PROCESS_EVENT_QUEUE_NAME const workerQueueName = WORKER_VRF_QUEUE_NAME const abi = VRFCoordinator__factory.abi const iface = new ethers.utils.Interface(abi) @@ -43,11 +41,9 @@ export async function buildListener( eventName, latestQueueName, historyQueueName, - processEventQueueName, workerQueueName, processFn: await processEvent({ iface, logger }), redisClient, - listenerInitType: 'latest', logger }) }
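Usage sketch (illustrative only, not part of the diff): how code living next to core/src/listener/listener.ts could combine the new getObservedBlock and upsertObservedBlock helpers to resume from the last processed block after a restart. The helper name resumeObservedBlock and the fallback to latestBlock - 1 when no row exists are assumptions for illustration; the production recovery path is the one implemented in State.add and latestJob above.

import { Logger } from 'pino'
import { getObservedBlockRedisKey } from '../settings'
import { getObservedBlock, upsertObservedBlock } from './api'

// Hypothetical helper: restore the last processed block for a contract and
// persist the starting point so a later restart resumes from the same place.
export async function resumeObservedBlock(
  contractAddress: string,
  latestBlock: number,
  logger?: Logger
): Promise<number> {
  const blockKey = getObservedBlockRedisKey(contractAddress)
  // GET listener/observed-block?blockKey=...; an empty blockKey in the response
  // is assumed to mean "no row yet", mirroring the check in State.add.
  const observed = await getObservedBlock({ blockKey, logger })
  const blockNumber =
    observed && observed.blockKey !== '' ? observed.blockNumber : Math.max(latestBlock - 1, 0)
  // POST listener/observed-block upserts the observed_blocks row keyed by block_key.
  await upsertObservedBlock({ blockKey, blockNumber, logger })
  return blockNumber
}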