Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate listener observed block from redis to db #1494

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions api/listener/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ type ListenerInsertModel struct {
Chain string `db:"chain_name" json:"chain" validate:"required"`
}

type ObservedBlockModel struct {
BlockKey string `db:"block_key" json:"blockKey" validate:"required"`
BlockNumber int64 `db:"block_number" json:"blockNumber" validate:"blockNumberValidator"`
}

func insert(c *fiber.Ctx) error {
payload := new(ListenerInsertModel)
if err := c.BodyParser(payload); err != nil {
Expand Down Expand Up @@ -151,3 +156,33 @@ func deleteById(c *fiber.Ctx) error {

return c.JSON(result)
}

func upsertObservedBlock(c *fiber.Ctx) error {
payload := new(ObservedBlockModel)
if err := c.BodyParser(payload); err != nil {
return err
}

validate := validator.New()
validate.RegisterValidation("blockNumberValidator", func(fl validator.FieldLevel) bool {
return fl.Field().Int() >= 0
Intizar-T marked this conversation as resolved.
Show resolved Hide resolved
})
if err := validate.Struct(payload); err != nil {
return err
}

result, err := utils.QueryRow[ObservedBlockModel](c, UpsertObservedBlock, map[string]any{"block_key": payload.BlockKey, "block_number": payload.BlockNumber})
if err != nil {
return err
}
return c.JSON(result)
}

func getObservedBlock(c *fiber.Ctx) error {
blockKey := c.Query("blockKey")
result, err := utils.QueryRow[ObservedBlockModel](c, GetObservedBlock, map[string]any{"block_key": blockKey})
if err != nil {
return err
}
return c.JSON(result)
}
14 changes: 14 additions & 0 deletions api/listener/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,20 @@ const (
(SELECT name from chains WHERE chain_id = listeners.chain_id) AS chain_name,
(SELECT name from services WHERE service_id = listeners.service_id) AS service_name;
`

GetObservedBlock = `
SELECT * FROM observed_blocks
WHERE block_key = @block_key
LIMIT 1;
`

UpsertObservedBlock = `
INSERT INTO observed_blocks (block_key, block_number)
VALUES (@block_key, @block_number)
ON CONFLICT (block_key)
DO UPDATE SET block_number = @block_number
RETURNING *;
`
)

func GenerateGetListenerQuery(params GetListenerQueryParams) string {
Expand Down
2 changes: 2 additions & 0 deletions api/listener/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ func Routes(router fiber.Router) {

listener.Post("", insert)
listener.Get("", get)
listener.Post("/observed-block", upsertObservedBlock)
listener.Get("/observed-block", getObservedBlock)
listener.Get("/:id", getById)
listener.Patch("/:id", updateById)
listener.Delete("/:id", deleteById)
Expand Down
1 change: 1 addition & 0 deletions api/migrations/000002_add_observed_block.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS "observed_blocks";
5 changes: 5 additions & 0 deletions api/migrations/000002_add_observed_block.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE IF NOT EXISTS "observed_blocks" (
block_key TEXT NOT NULL,
block_number BIGINT NOT NULL,
CONSTRAINT "observed_blocks_key" UNIQUE ("block_key")
)
4 changes: 3 additions & 1 deletion core/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,7 @@ export enum OraklErrorCode {
AxiosCanceledByUser,
AxiosNotSupported,
AxiosInvalidUrl,
FailedToConnectAPI
FailedToConnectAPI,
UpsertObservedBlockFailed,
GetObservedBlockFailed
}
52 changes: 52 additions & 0 deletions core/src/listener/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { OraklError, OraklErrorCode } from '../errors'
import { ORAKL_NETWORK_API_URL } from '../settings'
import { IListenerRawConfig } from '../types'
import { buildUrl } from '../utils'
import { IObservedBlock } from './types'

const FILE_NAME = import.meta.url

Expand Down Expand Up @@ -58,3 +59,54 @@ export async function getListener({
throw new OraklError(OraklErrorCode.GetListenerRequestFailed)
}
}

/**
* Get observed block number from the Orakl Network API for a given contract address
*
* @param {string} blockKey
* @param {pino.Logger} logger
* @return {Promise<IObservedBlock>}
* @exception {OraklErrorCode.GetObservedBlockFailed}
*/
export async function getObservedBlock({
blockKey,
logger
}: {
blockKey: string
logger?: Logger
}): Promise<IObservedBlock> {
try {
const endpoint = buildUrl(ORAKL_NETWORK_API_URL, `listener/observed-block?blockKey=${blockKey}`)
return (await axios.get(endpoint))?.data
} catch (e) {
logger?.error({ name: 'getObservedBlock', file: FILE_NAME, ...e }, 'error')
throw new OraklError(OraklErrorCode.GetObservedBlockFailed)
}
}

/**
* Upsert listener observed block number to the Orakl Network API for a given contract address
*
* @param {string} blockKey
* @param {number} blockNumber
* @param {pino.Logger} logger
* @return {Promise<IObservedBlock>}
* @exception {OraklErrorCode.UpsertObservedBlockFailed}
*/
export async function upsertObservedBlock({
blockKey,
blockNumber,
logger
}: {
blockKey: string
blockNumber: number
logger?: Logger
}): Promise<IObservedBlock> {
try {
const endpoint = buildUrl(ORAKL_NETWORK_API_URL, 'listener/observed-block')
return (await axios.post(endpoint, { blockKey, blockNumber }))?.data
} catch (e) {
logger?.error({ name: 'upsertObservedBlock', file: FILE_NAME, ...e }, 'error')
throw new OraklError(OraklErrorCode.UpsertObservedBlockFailed)
}
}
4 changes: 0 additions & 4 deletions core/src/listener/data-feed-L2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import {
L2_DATA_FEED_SERVICE_NAME,
L2_LISTENER_DATA_FEED_HISTORY_QUEUE_NAME,
L2_LISTENER_DATA_FEED_LATEST_QUEUE_NAME,
L2_LISTENER_DATA_FEED_PROCESS_EVENT_QUEUE_NAME,
L2_WORKER_AGGREGATOR_QUEUE_NAME
} from '../settings'
import { IAnswerUpdated, IDataFeedListenerWorkerL2, IListenerConfig } from '../types'
Expand All @@ -30,7 +29,6 @@ export async function buildListener(
const eventName = 'AnswerUpdated'
const latestQueueName = L2_LISTENER_DATA_FEED_LATEST_QUEUE_NAME
const historyQueueName = L2_LISTENER_DATA_FEED_HISTORY_QUEUE_NAME
const processEventQueueName = L2_LISTENER_DATA_FEED_PROCESS_EVENT_QUEUE_NAME
const workerQueueName = L2_WORKER_AGGREGATOR_QUEUE_NAME
const abi = Aggregator__factory.abi
const iface = new ethers.utils.Interface(abi)
Expand All @@ -44,11 +42,9 @@ export async function buildListener(
eventName,
latestQueueName,
historyQueueName,
processEventQueueName,
workerQueueName,
processFn: await processEvent({ iface, logger }),
redisClient,
listenerInitType: 'latest',
logger
})
}
Expand Down
2 changes: 0 additions & 2 deletions core/src/listener/data-feed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,9 @@ export async function buildListener(
eventName,
latestQueueName,
historyQueueName,
processEventQueueName,
workerQueueName,
processFn: await processEvent({ iface, logger }),
redisClient,
listenerInitType: 'latest',
logger
})
}
Expand Down
Loading
Loading