Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate listener observed block from redis to db #1494

Closed
wants to merge 14 commits into from

Conversation

Intizar-T
Copy link
Contributor

@Intizar-T Intizar-T commented May 17, 2024

Description

Requires extra attention: Refactor listener to merge processEventQ with latestQ and historyQ (i.e. add event jobs to workerQ in latestWorker and historyWorker)

Add listener observed block to both redis and db. Read from db only when listener is initialized and read from redis in other cases:

  • write migrate up & down sql files for a new table to keep track of observed block
  • write query, controller function, and add route in api/listener
  • write wrapper api function in core/listener
  • integrate wrapper api into listener code
  • test

Implement nodemon for having hot reload running services (listener, worker, reporter):

  • Note: hot reload in this case means recompile and restart the service

Tests Performed after implementation. All tests have been performed end to end, consumer to service, for both services, VRF and RR:

  • Single request and read operation ✅
  • Multiple request and read operations ✅
  • Kill listener, make multiple requests, start listener ✅
  • Kill listener, make multiple requests, start listener, throw manual error in latestWorker after processing half of unprocessed blocks (to test historyQ and worker) ✅

Fixes # (issue)

Type of change

Please delete options that are not relevant.

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • This change requires a documentation update

Checklist before requesting a review

  • I have performed a self-review of my code.
  • If it is a core feature, I have added thorough tests.

Deployment

  • Should publish npm package
  • Should publish Docker image

Copy link
Contributor

coderabbitai bot commented May 17, 2024

Walkthrough

Walkthrough

The update introduces new functionality for handling observed blocks within the listener API. It adds SQL queries and endpoints for inserting, updating, and retrieving observed blocks. Additionally, it refactors the listener's event processing logic by removing various queue name parameters and consolidating event handling within specific functions. Error codes and types are updated to support these changes.

Changes

Files/Directories Change Summary
api/listener/controller.go Added ListenerObservedBlockModel, upsertObservedBlock, and getObservedBlock functions.
api/listener/queries.go Added SQL queries GetObservedBlock and UpsertObservedBlock.
api/listener/route.go Added routes for upsertObservedBlock (POST) and getObservedBlock (GET).
api/migrations/000002_add_observed_block.up.sql Created table "observed_blocks" with unique block key and block number columns.
api/migrations/000002_add_observed_block.down.sql Dropped the "observed_blocks" table.
core/src/errors.ts Added new error codes: UpsertObservedBlockFailed, GetObservedBlockFailed.
core/src/listener/api.ts Added functions getObservedBlock and upsertObservedBlock.
core/src/listener/data-feed-L2.ts, core/src/listener/data-feed.ts, core/src/listener/listener.ts, core/src/listener/request-response-L2-fulfill.ts, core/src/listener/request-response-L2-request.ts, core/src/listener/request-response.ts, core/src/listener/vrf-L2-fulfill.ts, core/src/listener/vrf-L2-request.ts, core/src/listener/vrf.ts Removed various processEventQueueName constants and related logic.
core/src/listener/state.ts Updated handling of observed blocks, removed ListenerInitType.
core/src/listener/types.ts Added IObservedBlock interface.

🐇
In code's vast fields, we hop and play,
Observing blocks, both night and day.
With queries swift and routes so keen,
We build a world, precise and clean.
Through errors new, we find our way,
In rabbit's realm, where codes convey.
🐰


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

Share
Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai generate interesting stats about this repository and render them as a table.
    • @coderabbitai show all the console.log statements in this repository.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (invoked as PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger a review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai help to get help.

Additionally, you can add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.

CodeRabbit Configration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@Intizar-T Intizar-T force-pushed the feat/migrate-observed-block-to-db branch from 91a6148 to 830b489 Compare May 21, 2024 02:55
@Intizar-T Intizar-T self-assigned this May 21, 2024
@Intizar-T Intizar-T force-pushed the feat/migrate-observed-block-to-db branch from 83c5149 to c9c8384 Compare May 22, 2024 06:16
@Intizar-T Intizar-T marked this pull request as ready for review May 22, 2024 06:18
@Intizar-T Intizar-T requested a review from a team as a code owner May 22, 2024 06:18
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Review Details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits Files that changed from the base of the PR and between 7b4f67b and c9c8384.
Files ignored due to path filters (2)
  • core/package.json is excluded by !**/*.json
  • yarn.lock is excluded by !**/*.lock, !**/*.lock
Files selected for processing (18)
  • api/listener/controller.go (2 hunks)
  • api/listener/queries.go (1 hunks)
  • api/listener/route.go (1 hunks)
  • api/migrations/000002_add_observed_block.down.sql (1 hunks)
  • api/migrations/000002_add_observed_block.up.sql (1 hunks)
  • core/src/errors.ts (1 hunks)
  • core/src/listener/api.ts (2 hunks)
  • core/src/listener/data-feed-L2.ts (3 hunks)
  • core/src/listener/data-feed.ts (1 hunks)
  • core/src/listener/listener.ts (12 hunks)
  • core/src/listener/request-response-L2-fulfill.ts (3 hunks)
  • core/src/listener/request-response-L2-request.ts (3 hunks)
  • core/src/listener/request-response.ts (3 hunks)
  • core/src/listener/state.ts (7 hunks)
  • core/src/listener/types.ts (1 hunks)
  • core/src/listener/vrf-L2-fulfill.ts (3 hunks)
  • core/src/listener/vrf-L2-request.ts (3 hunks)
  • core/src/listener/vrf.ts (3 hunks)
Files skipped from review due to trivial changes (5)
  • api/migrations/000002_add_observed_block.down.sql
  • api/migrations/000002_add_observed_block.up.sql
  • core/src/listener/request-response-L2-request.ts
  • core/src/listener/request-response.ts
  • core/src/listener/vrf.ts
Additional comments not posted (10)
api/listener/route.go (1)

12-13: New routes for observed blocks added correctly.

These routes align with the PR's objective to migrate observed blocks handling from Redis to a database.

core/src/errors.ts (1)

77-78: New error codes for observed block operations added correctly.

These error codes are essential for robust error handling in the new database operations for observed blocks.

core/src/listener/types.ts (1)

81-84: Interface IObservedBlock defined correctly.

This interface is crucial for type safety in handling observed blocks within the new database-centric approach.

api/listener/queries.go (1)

46-58: New SQL queries for observed blocks added correctly.

These queries are essential for the new database operations. Ensure parameterized queries are used to prevent SQL injection.

core/src/listener/vrf-L2-request.ts (1)

Line range hint 1-1: The removal of L2_LISTENER_VRF_REQUEST_PROCESS_EVENT_QUEUE_NAME and related logic is consistent with the PR's objective to refactor listener-related functionalities.

core/src/listener/request-response-L2-fulfill.ts (1)

Line range hint 1-1: The removal of L2_LISTENER_REQUEST_RESPONSE_FULFILL_PROCESS_EVENT_QUEUE_NAME and related logic is consistent with the PR's objective to refactor listener-related functionalities.

core/src/listener/api.ts (1)

7-7: The addition of IObservedBlock interface and the functions getListenerObservedBlock and upsertListenerObservedBlock are consistent with the PR's objective to migrate the listener observed block from Redis to a database.

Also applies to: 63-104

core/src/listener/data-feed.ts (1)

Line range hint 1-1: The removal of LISTENER_DATA_FEED_PROCESS_EVENT_QUEUE_NAME and related logic is consistent with the PR's objective to refactor listener-related functionalities.

api/listener/controller.go (1)

37-40: The addition of ListenerObservedBlockModel struct and the functions upsertObservedBlock and getObservedBlock are consistent with the PR's objective to migrate the listener observed block from Redis to a database.

Also applies to: 160-188

core/src/listener/state.ts (1)

13-13: The integration of getListenerObservedBlock and upsertListenerObservedBlock functions into the State class is consistent with the PR's objective to handle observed blocks differently.

Also applies to: 178-196

core/src/listener/listener.ts Show resolved Hide resolved
core/src/listener/listener.ts Show resolved Hide resolved
core/src/listener/listener.ts Outdated Show resolved Hide resolved
Copy link
Collaborator

@nick-bisonai nick-bisonai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you open a short session (5~10 min) explaining merge of two bullmq queues? Think it'd more understandable after your explanation

api/listener/controller.go Outdated Show resolved Hide resolved
api/listener/controller.go Show resolved Hide resolved
api/listener/controller.go Outdated Show resolved Hide resolved
api/migrations/000002_add_observed_block.up.sql Outdated Show resolved Hide resolved
core/package.json Outdated Show resolved Hide resolved
core/src/errors.ts Outdated Show resolved Hide resolved
core/src/listener/listener.ts Show resolved Hide resolved
core/src/listener/listener.ts Outdated Show resolved Hide resolved
@Intizar-T Intizar-T force-pushed the feat/migrate-observed-block-to-db branch from c9c8384 to 9b1bd39 Compare May 22, 2024 09:32
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Outside diff range and nitpick comments (2)
core/src/listener/listener.ts (2)

150-153: Consider reordering parameters for consistency.

It might improve readability and maintainability if the parameters redisClient and workerQueue are consistently ordered across similar functions.

Also applies to: 158-160, 277-280, 284-286


158-158: Consider grouping queue definitions together for better readability.

As suggested by a previous comment, it might be beneficial to define all queues together to enhance code clarity.

Review Details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits Files that changed from the base of the PR and between c9c8384 and 9b1bd39.
Files selected for processing (18)
  • api/listener/controller.go (2 hunks)
  • api/listener/queries.go (1 hunks)
  • api/listener/route.go (1 hunks)
  • api/migrations/000002_add_observed_block.down.sql (1 hunks)
  • api/migrations/000002_add_observed_block.up.sql (1 hunks)
  • core/src/errors.ts (1 hunks)
  • core/src/listener/api.ts (2 hunks)
  • core/src/listener/data-feed-L2.ts (3 hunks)
  • core/src/listener/data-feed.ts (1 hunks)
  • core/src/listener/listener.ts (12 hunks)
  • core/src/listener/request-response-L2-fulfill.ts (3 hunks)
  • core/src/listener/request-response-L2-request.ts (3 hunks)
  • core/src/listener/request-response.ts (3 hunks)
  • core/src/listener/state.ts (7 hunks)
  • core/src/listener/types.ts (1 hunks)
  • core/src/listener/vrf-L2-fulfill.ts (3 hunks)
  • core/src/listener/vrf-L2-request.ts (3 hunks)
  • core/src/listener/vrf.ts (3 hunks)
Files skipped from review as they are similar to previous changes (17)
  • api/listener/controller.go
  • api/listener/queries.go
  • api/listener/route.go
  • api/migrations/000002_add_observed_block.down.sql
  • api/migrations/000002_add_observed_block.up.sql
  • core/src/errors.ts
  • core/src/listener/api.ts
  • core/src/listener/data-feed-L2.ts
  • core/src/listener/data-feed.ts
  • core/src/listener/request-response-L2-fulfill.ts
  • core/src/listener/request-response-L2-request.ts
  • core/src/listener/request-response.ts
  • core/src/listener/state.ts
  • core/src/listener/types.ts
  • core/src/listener/vrf-L2-fulfill.ts
  • core/src/listener/vrf-L2-request.ts
  • core/src/listener/vrf.ts
Additional comments not posted (3)
core/src/listener/listener.ts (3)

87-89: Refactor to remove Redis usage in favor of database.

Also applies to: 100-100, 119-119


185-185: Refactor to remove Redis usage in favor of database.

Also applies to: 294-294


7-7: Ensure upsertObservedBlock is imported from the correct module.

Verification successful

The upsertObservedBlock function is correctly imported from the ./api module, as it is defined in core/src/listener/api.ts.

  • core/src/listener/api.ts: export async function upsertObservedBlock({
Scripts Executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify the source of `upsertObservedBlock` function.

# Test: Search for the function definition in the project.
rg --type typescript --files-with-matches 'export function upsertObservedBlock'

Length of output: 116



Script:

#!/bin/bash
# Description: Verify the source of `upsertObservedBlock` function.

# Test: Search for the function definition in the project.
rg 'export function upsertObservedBlock' --glob '*.ts'

Length of output: 56



Script:

#!/bin/bash
# Description: Verify the source or usage of `upsertObservedBlock` function.

# Test: Search for the function name in the project.
rg 'upsertObservedBlock' --glob '*.ts'

Length of output: 587

core/src/listener/listener.ts Show resolved Hide resolved
core/src/listener/listener.ts Show resolved Hide resolved
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Review Details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits Files that changed from the base of the PR and between 9b1bd39 and 33ecf17.
Files selected for processing (1)
  • core/src/listener/state.ts (6 hunks)
Files skipped from review as they are similar to previous changes (1)
  • core/src/listener/state.ts

Copy link
Member

@martinkersner martinkersner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

There is quite impactful consequence of this new proposal that hasn't been mentioned, which is the processing of multiple blocks and multiple events within a block will be always sequential. Previously, we could tune the number of workers in processEventQueue but now it can be applied only to historyQueue. The impact might be seen when listener gets behind the latest block and when the number of events increase within a block.

After this PR, @Intizar-T will be running benchmarks to see how well it can perform and if it does not work well, we can optimize it.

const events = await state.queryEvent(contractAddress, blockNumber, blockNumber)
for (const [_, event] of events.entries()) {
const jobMetadata = await processFn(event)
if (jobMetadata) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is no jobMetadata we should at least print error to log. Same can be found at historyJob function.

Copy link
Contributor Author

@Intizar-T Intizar-T May 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about something like this:

if (jobMetadata) {
  ...
} else {
  logger.warn(`jobMetadata is undefined for event ${event}`)
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be an error and not just debug log. We expect it to not be undefined or nil, therefore if it is there is something strange going on.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, to be precise, we should say that the jobMetadata was not defined, rather than undefined since undefined is not the only case which leads to else branch.

> if (!null) { console.log("hello") }
hello
undefined
> if (!undefined) { console.log("hello") }
hello
undefined

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted! then I'm throwing the following error:

throw new Error(`jobMetadata is not defined for an event in block ${blockNumber}`)

between observedBlock and latestBlock)
* it does not exist -> upsert latestBlock
*/
if (observedBlockKey === '') {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would we handle situation where we deploy this new listener, but while we are deploying we do not listen on new blocks and some block are going to be missed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this scenario, if (observedBlockKey === '') happens only one time when we migrate to this new logic, where the observedBlockKey key wont exist in the db and it will be upserted with latest block number. Consequent service restarts will always have observedBlock number and all unprocessed blocks will be passed to the latestQ

There is a reason why I decided to pass unprocessed blocks to latestQ instead of historyQ but now that I think about it again, it should be totally fine. I'll reimplement passing unprocessed blocks to historyQ.

Regarding one time migration, it'd be ideal to figure out the observedBlock number from currently running listener service redis instance and initialize the observedBlock key value pair in the db before the new listener service is started. I'll look into it before we deploy.

core/src/listener/state.ts Outdated Show resolved Hide resolved
Copy link
Collaborator

@nick-bisonai nick-bisonai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm!

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Review Details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits Files that changed from the base of the PR and between 33ecf17 and 140161e.
Files selected for processing (2)
  • core/src/listener/listener.ts (12 hunks)
  • core/src/listener/state.ts (6 hunks)
Files skipped from review as they are similar to previous changes (1)
  • core/src/listener/state.ts
Additional comments not posted (5)
core/src/listener/listener.ts (5)

87-89: Ensure Redis usage is replaced with database interactions as per migration strategy.

This comment is still valid as per the previous review comments on lines 185-185 and 296-296. The Redis operations should be replaced with database interactions.

Also applies to: 100-100


119-119: Ensure proper cleanup of Redis connections.

This comment is still valid as per the previous review comments on lines 320-320. Ensure that Redis operations are removed or properly managed during the shutdown process.


185-185: Replace Redis operations with database interactions.

This comment is still valid as per the previous review comments on lines 296-296. The Redis operations should be replaced with database interactions.


224-230: Replace Redis operations with database interactions.

This comment is still valid as per the previous review comments on lines 296-296. The Redis operations should be replaced with database interactions.


296-320: Replace Redis operations with database interactions.

This comment is still valid as per the previous review comments on lines 296-296. The Redis operations should be replaced with database interactions.

@Intizar-T Intizar-T closed this May 25, 2024
@Intizar-T
Copy link
Contributor Author

Merging processEventQ into latestQ and historyQ turned out not an optimal solution.

@Intizar-T Intizar-T deleted the feat/migrate-observed-block-to-db branch May 27, 2024 04:58
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants