Skip to content

Commit

Permalink
revamp state.ts
Browse files Browse the repository at this point in the history
  • Loading branch information
Intizar-T committed May 27, 2024
1 parent 4f9836f commit c7496e3
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 37 deletions.
2 changes: 1 addition & 1 deletion api/blocks/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func getObservedBlock(c *fiber.Ctx) error {
if service == "" {
return fiber.NewError(fiber.StatusBadRequest, "service is required")
}
result, err := utils.QueryRows[BlockModel](c, GetObservedBlock, map[string]any{
result, err := utils.QueryRow[BlockModel](c, GetObservedBlock, map[string]any{
"service": service,
})
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions core/src/listener/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ export async function getListener({

/**
* @param {string} service
* @return {Promise<number>}
* @return {Promise<IBlock>}
* @exception {FailedToGetObservedBlock}
*/
export async function getObservedBlock({ service }: { service: string }): Promise<number> {
export async function getObservedBlock({ service }: { service: string }): Promise<IBlock> {
try {
const endpoint = buildUrl(ORAKL_NETWORK_API_URL, `blocks/observed?service=${service}`)
return (await axios.get(endpoint))?.data?.blockNumber
return (await axios.get(endpoint))?.data
} catch (e) {
throw new OraklError(OraklErrorCode.FailedToGetObservedBlock)
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/listener/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ function latestJob({
try {
// We assume that redis cache has been initialized within
// `State.add` method call.
observedBlock = await getObservedBlock({ service })
observedBlock = (await getObservedBlock({ service })).blockNumber
} catch (e) {
// Similarly to the failure during fetching the latest block
// number, this error doesn't require job resubmission. The next
Expand Down
82 changes: 50 additions & 32 deletions core/src/listener/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,9 @@ import ethers from 'ethers'
import { Logger } from 'pino'
import type { RedisClientType } from 'redis'
import { OraklError, OraklErrorCode } from '../errors'
import {
getObservedBlockRedisKey,
LISTENER_DELAY,
LISTENER_JOB_SETTINGS,
PROVIDER_URL
} from '../settings'
import { LISTENER_DELAY, LISTENER_JOB_SETTINGS, PROVIDER_URL } from '../settings'
import { IListenerConfig, IListenerRawConfig } from '../types'
import { getListeners } from './api'
import { getListeners, getObservedBlock, getUnprocessedBlocks, upsertObservedBlock } from './api'
import { IContracts, IHistoryListenerJob, ILatestListenerJob, ListenerInitType } from './types'
import { postprocessListeners } from './utils'

Expand Down Expand Up @@ -131,6 +126,16 @@ export class State {
return state ? JSON.parse(state) : []
}

async addBlockToHistoryQueue(contractAddress: string, blockNumber: number) {
const historyOutData: IHistoryListenerJob = {
contractAddress,
blockNumber
}
await this.historyListenerQueue.add('history', historyOutData, {
...LISTENER_JOB_SETTINGS
})
}

/**
* Add listener identified by `id` parameter. `id` is a global
* listener identifier stored at permanent listener state. Listener
Expand Down Expand Up @@ -177,33 +182,46 @@ export class State {
await this.redisClient.set(this.stateName, JSON.stringify(updatedActiveListeners))

const contractAddress = toAddListener.address
const observedBlockRedisKey = getObservedBlockRedisKey(contractAddress)
const latestBlock = await this.latestBlockNumber()

switch (this.listenerInitType) {
case 'clear':
// Clear metadata about previously observed blocks for a specific
// `contractAddress`.
await this.redisClient.set(observedBlockRedisKey, latestBlock - 1)
break

case 'latest':
await this.setObservedBlockNumberIfNotDefined(observedBlockRedisKey, latestBlock - 1)
break

default:
// [block number] initialization
await this.setObservedBlockNumberIfNotDefined(observedBlockRedisKey, latestBlock - 1)
for (let blockNumber = this.listenerInitType; blockNumber < latestBlock; ++blockNumber) {
const historyOutData: IHistoryListenerJob = {
contractAddress,
blockNumber
const observedBlock = await getObservedBlock({ service: this.service })

// if there is no observedBlock record in the db,
// use the listenerInitType to determine how to initialize the listener
if (observedBlock.service === '') {
switch (this.listenerInitType) {
case 'clear':
// Clear metadata about previously observed blocks for a specific
// `contractAddress`.
await upsertObservedBlock({ service: this.service, blockNumber: latestBlock - 1 })
break

case 'latest':
await upsertObservedBlock({ service: this.service, blockNumber: latestBlock - 1 })
break

default:
// [block number] initialization
for (let blockNumber = this.listenerInitType; blockNumber < latestBlock; ++blockNumber) {
await this.addBlockToHistoryQueue(contractAddress, blockNumber)
}
await this.historyListenerQueue.add('history', historyOutData, {
...LISTENER_JOB_SETTINGS
})
}
break
await upsertObservedBlock({ service: this.service, blockNumber: latestBlock - 1 })
break
}
} else {
for (
let blockNumber = observedBlock.blockNumber + 1;
blockNumber < latestBlock;
++blockNumber
) {
await this.addBlockToHistoryQueue(contractAddress, blockNumber)
}
await upsertObservedBlock({ service: this.service, blockNumber: latestBlock - 1 })
}

// fetch all unprocessed blocks and pass to the history queue
const unprocessedBlocks = await getUnprocessedBlocks({ service: this.service })
for (const block of unprocessedBlocks) {
await this.addBlockToHistoryQueue(contractAddress, block.blockNumber)
}

// Insert listener jobs
Expand Down

0 comments on commit c7496e3

Please sign in to comment.