-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Migrate listener observed block from redis to db #1494
Conversation
WalkthroughWalkthroughThe update introduces new functionality for handling observed blocks within the listener API. It adds SQL queries and endpoints for inserting, updating, and retrieving observed blocks. Additionally, it refactors the listener's event processing logic by removing various queue name parameters and consolidating event handling within specific functions. Error codes and types are updated to support these changes. Changes
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configration File (
|
91a6148
to
830b489
Compare
83c5149
to
c9c8384
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
Review Details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files ignored due to path filters (2)
core/package.json
is excluded by!**/*.json
yarn.lock
is excluded by!**/*.lock
,!**/*.lock
Files selected for processing (18)
- api/listener/controller.go (2 hunks)
- api/listener/queries.go (1 hunks)
- api/listener/route.go (1 hunks)
- api/migrations/000002_add_observed_block.down.sql (1 hunks)
- api/migrations/000002_add_observed_block.up.sql (1 hunks)
- core/src/errors.ts (1 hunks)
- core/src/listener/api.ts (2 hunks)
- core/src/listener/data-feed-L2.ts (3 hunks)
- core/src/listener/data-feed.ts (1 hunks)
- core/src/listener/listener.ts (12 hunks)
- core/src/listener/request-response-L2-fulfill.ts (3 hunks)
- core/src/listener/request-response-L2-request.ts (3 hunks)
- core/src/listener/request-response.ts (3 hunks)
- core/src/listener/state.ts (7 hunks)
- core/src/listener/types.ts (1 hunks)
- core/src/listener/vrf-L2-fulfill.ts (3 hunks)
- core/src/listener/vrf-L2-request.ts (3 hunks)
- core/src/listener/vrf.ts (3 hunks)
Files skipped from review due to trivial changes (5)
- api/migrations/000002_add_observed_block.down.sql
- api/migrations/000002_add_observed_block.up.sql
- core/src/listener/request-response-L2-request.ts
- core/src/listener/request-response.ts
- core/src/listener/vrf.ts
Additional comments not posted (10)
api/listener/route.go (1)
12-13
: New routes for observed blocks added correctly.These routes align with the PR's objective to migrate observed blocks handling from Redis to a database.
core/src/errors.ts (1)
77-78
: New error codes for observed block operations added correctly.These error codes are essential for robust error handling in the new database operations for observed blocks.
core/src/listener/types.ts (1)
81-84
: InterfaceIObservedBlock
defined correctly.This interface is crucial for type safety in handling observed blocks within the new database-centric approach.
api/listener/queries.go (1)
46-58
: New SQL queries for observed blocks added correctly.These queries are essential for the new database operations. Ensure parameterized queries are used to prevent SQL injection.
core/src/listener/vrf-L2-request.ts (1)
Line range hint
1-1
: The removal ofL2_LISTENER_VRF_REQUEST_PROCESS_EVENT_QUEUE_NAME
and related logic is consistent with the PR's objective to refactor listener-related functionalities.core/src/listener/request-response-L2-fulfill.ts (1)
Line range hint
1-1
: The removal ofL2_LISTENER_REQUEST_RESPONSE_FULFILL_PROCESS_EVENT_QUEUE_NAME
and related logic is consistent with the PR's objective to refactor listener-related functionalities.core/src/listener/api.ts (1)
7-7
: The addition ofIObservedBlock
interface and the functionsgetListenerObservedBlock
andupsertListenerObservedBlock
are consistent with the PR's objective to migrate the listener observed block from Redis to a database.Also applies to: 63-104
core/src/listener/data-feed.ts (1)
Line range hint
1-1
: The removal ofLISTENER_DATA_FEED_PROCESS_EVENT_QUEUE_NAME
and related logic is consistent with the PR's objective to refactor listener-related functionalities.api/listener/controller.go (1)
37-40
: The addition ofListenerObservedBlockModel
struct and the functionsupsertObservedBlock
andgetObservedBlock
are consistent with the PR's objective to migrate the listener observed block from Redis to a database.Also applies to: 160-188
core/src/listener/state.ts (1)
13-13
: The integration ofgetListenerObservedBlock
andupsertListenerObservedBlock
functions into theState
class is consistent with the PR's objective to handle observed blocks differently.Also applies to: 178-196
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you open a short session (5~10 min) explaining merge of two bullmq queues? Think it'd more understandable after your explanation
c9c8384
to
9b1bd39
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
Outside diff range and nitpick comments (2)
core/src/listener/listener.ts (2)
150-153
: Consider reordering parameters for consistency.It might improve readability and maintainability if the parameters
redisClient
andworkerQueue
are consistently ordered across similar functions.Also applies to: 158-160, 277-280, 284-286
158-158
: Consider grouping queue definitions together for better readability.As suggested by a previous comment, it might be beneficial to define all queues together to enhance code clarity.
Review Details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (18)
- api/listener/controller.go (2 hunks)
- api/listener/queries.go (1 hunks)
- api/listener/route.go (1 hunks)
- api/migrations/000002_add_observed_block.down.sql (1 hunks)
- api/migrations/000002_add_observed_block.up.sql (1 hunks)
- core/src/errors.ts (1 hunks)
- core/src/listener/api.ts (2 hunks)
- core/src/listener/data-feed-L2.ts (3 hunks)
- core/src/listener/data-feed.ts (1 hunks)
- core/src/listener/listener.ts (12 hunks)
- core/src/listener/request-response-L2-fulfill.ts (3 hunks)
- core/src/listener/request-response-L2-request.ts (3 hunks)
- core/src/listener/request-response.ts (3 hunks)
- core/src/listener/state.ts (7 hunks)
- core/src/listener/types.ts (1 hunks)
- core/src/listener/vrf-L2-fulfill.ts (3 hunks)
- core/src/listener/vrf-L2-request.ts (3 hunks)
- core/src/listener/vrf.ts (3 hunks)
Files skipped from review as they are similar to previous changes (17)
- api/listener/controller.go
- api/listener/queries.go
- api/listener/route.go
- api/migrations/000002_add_observed_block.down.sql
- api/migrations/000002_add_observed_block.up.sql
- core/src/errors.ts
- core/src/listener/api.ts
- core/src/listener/data-feed-L2.ts
- core/src/listener/data-feed.ts
- core/src/listener/request-response-L2-fulfill.ts
- core/src/listener/request-response-L2-request.ts
- core/src/listener/request-response.ts
- core/src/listener/state.ts
- core/src/listener/types.ts
- core/src/listener/vrf-L2-fulfill.ts
- core/src/listener/vrf-L2-request.ts
- core/src/listener/vrf.ts
Additional comments not posted (3)
core/src/listener/listener.ts (3)
87-89
: Refactor to remove Redis usage in favor of database.Also applies to: 100-100, 119-119
185-185
: Refactor to remove Redis usage in favor of database.Also applies to: 294-294
7-7
: EnsureupsertObservedBlock
is imported from the correct module.Verification successful
The
upsertObservedBlock
function is correctly imported from the./api
module, as it is defined incore/src/listener/api.ts
.
core/src/listener/api.ts: export async function upsertObservedBlock({
Scripts Executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the source of `upsertObservedBlock` function. # Test: Search for the function definition in the project. rg --type typescript --files-with-matches 'export function upsertObservedBlock'Length of output: 116
Script:
#!/bin/bash # Description: Verify the source of `upsertObservedBlock` function. # Test: Search for the function definition in the project. rg 'export function upsertObservedBlock' --glob '*.ts'Length of output: 56
Script:
#!/bin/bash # Description: Verify the source or usage of `upsertObservedBlock` function. # Test: Search for the function name in the project. rg 'upsertObservedBlock' --glob '*.ts'Length of output: 587
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Review Details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (1)
- core/src/listener/state.ts (6 hunks)
Files skipped from review as they are similar to previous changes (1)
- core/src/listener/state.ts
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
There is quite impactful consequence of this new proposal that hasn't been mentioned, which is the processing of multiple blocks and multiple events within a block will be always sequential. Previously, we could tune the number of workers in processEventQueue
but now it can be applied only to historyQueue
. The impact might be seen when listener gets behind the latest block and when the number of events increase within a block.
After this PR, @Intizar-T will be running benchmarks to see how well it can perform and if it does not work well, we can optimize it.
const events = await state.queryEvent(contractAddress, blockNumber, blockNumber) | ||
for (const [_, event] of events.entries()) { | ||
const jobMetadata = await processFn(event) | ||
if (jobMetadata) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there is no jobMetadata
we should at least print error to log. Same can be found at historyJob
function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about something like this:
if (jobMetadata) {
...
} else {
logger.warn(`jobMetadata is undefined for event ${event}`)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it should be an error and not just debug log. We expect it to not be undefined or nil, therefore if it is there is something strange going on.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, to be precise, we should say that the jobMetadata was not defined, rather than undefined since undefined
is not the only case which leads to else branch.
> if (!null) { console.log("hello") }
hello
undefined
> if (!undefined) { console.log("hello") }
hello
undefined
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Noted! then I'm throwing the following error:
throw new Error(`jobMetadata is not defined for an event in block ${blockNumber}`)
core/src/listener/state.ts
Outdated
between observedBlock and latestBlock) | ||
* it does not exist -> upsert latestBlock | ||
*/ | ||
if (observedBlockKey === '') { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How would we handle situation where we deploy this new listener, but while we are deploying we do not listen on new blocks and some block are going to be missed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this scenario, if (observedBlockKey === '')
happens only one time when we migrate to this new logic, where the observedBlockKey
key wont exist in the db and it will be upserted with latest block number. Consequent service restarts will always have observedBlock number and all unprocessed blocks will be passed to the latestQ
There is a reason why I decided to pass unprocessed blocks to latestQ instead of historyQ but now that I think about it again, it should be totally fine. I'll reimplement passing unprocessed blocks to historyQ.
Regarding one time migration, it'd be ideal to figure out the observedBlock number from currently running listener service redis instance and initialize the observedBlock key value pair in the db before the new listener service is started. I'll look into it before we deploy.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Review Details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (2)
- core/src/listener/listener.ts (12 hunks)
- core/src/listener/state.ts (6 hunks)
Files skipped from review as they are similar to previous changes (1)
- core/src/listener/state.ts
Additional comments not posted (5)
core/src/listener/listener.ts (5)
87-89
: Ensure Redis usage is replaced with database interactions as per migration strategy.This comment is still valid as per the previous review comments on lines 185-185 and 296-296. The Redis operations should be replaced with database interactions.
Also applies to: 100-100
119-119
: Ensure proper cleanup of Redis connections.This comment is still valid as per the previous review comments on lines 320-320. Ensure that Redis operations are removed or properly managed during the shutdown process.
185-185
: Replace Redis operations with database interactions.This comment is still valid as per the previous review comments on lines 296-296. The Redis operations should be replaced with database interactions.
224-230
: Replace Redis operations with database interactions.This comment is still valid as per the previous review comments on lines 296-296. The Redis operations should be replaced with database interactions.
296-320
: Replace Redis operations with database interactions.This comment is still valid as per the previous review comments on lines 296-296. The Redis operations should be replaced with database interactions.
Merging processEventQ into latestQ and historyQ turned out not an optimal solution. |
Description
Requires extra attention: Refactor listener to merge processEventQ with latestQ and historyQ (i.e. add event jobs to workerQ in latestWorker and historyWorker)
Add listener observed block to both redis and db. Read from db only when listener is initialized and read from redis in other cases:
Implement nodemon for having hot reload running services (listener, worker, reporter):
Tests Performed after implementation. All tests have been performed end to end, consumer to service, for both services, VRF and RR:
Fixes # (issue)
Type of change
Please delete options that are not relevant.
Checklist before requesting a review
Deployment