Skip to content

Commit

Permalink
Fix freezing on other chain rollback
Browse files Browse the repository at this point in the history
  • Loading branch information
DZakh committed Jan 29, 2025
1 parent 10233a3 commit 1a2d659
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,18 @@ open Belt
type t = {
logger: Pino.t,
maxPartitionConcurrency: int,
mutable isWaitingForNewBlock: bool,
mutable waitingForNewBlockStateId: option<int>,
// Should take into consideration partitions fetching for previous states (before rollback)
mutable fetchingPartitionsCount: int,
}

let make = (~maxPartitionConcurrency, ~logger) => {
logger,
maxPartitionConcurrency,
isWaitingForNewBlock: false,
waitingForNewBlockStateId: None,
fetchingPartitionsCount: 0,
}

exception FromBlockIsHigherThanToBlock({fromBlock: int, toBlock: int})

let fetchNext = async (
sourceManager: t,
~fetchState: FetchState.t,
Expand All @@ -45,11 +43,20 @@ let fetchNext = async (
| ReachedMaxConcurrency
| NothingToQuery => ()
| WaitingForNewBlock =>
if !sourceManager.isWaitingForNewBlock {
sourceManager.isWaitingForNewBlock = true
switch sourceManager.waitingForNewBlockStateId {
| Some(waitingStateId) if waitingStateId >= stateId => ()
| Some(_) // Case for the prev state before a rollback
| None =>
sourceManager.waitingForNewBlockStateId = Some(stateId)
let currentBlockHeight = await waitForNewBlock(~currentBlockHeight, ~logger)
sourceManager.isWaitingForNewBlock = false
onNewBlock(~currentBlockHeight)
switch sourceManager.waitingForNewBlockStateId {
| Some(waitingStateId) when waitingStateId === stateId => {
sourceManager.waitingForNewBlockStateId = None
onNewBlock(~currentBlockHeight)
}
| Some(_) // Don't reset it if we are waiting for another state
| None => ()
}
}
| Ready(queries) => {
fetchState->FetchState.startFetchingQueries(~queries, ~stateId)
Expand Down
68 changes: 68 additions & 0 deletions scenarios/test_codegen/test/lib_tests/SourceManager_test.res
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type waitForNewBlockMock = {
fn: (~currentBlockHeight: int, ~logger: Pino.t) => Promise.t<int>,
calls: array<int>,
resolveAll: int => unit,
resolveFns: array<int => unit>,
}

let waitForNewBlockMock = () => {
Expand All @@ -52,6 +53,7 @@ let waitForNewBlockMock = () => {
})
},
calls,
resolveFns,
}
}

Expand Down Expand Up @@ -407,6 +409,72 @@ describe("SourceManager fetchNext", () => {
Assert.deepEqual(onNewBlockMock.calls->Js.Array2.length, 1)
})

Async.it("Restarts waiting for new block after a rollback", async () => {
let sourceManager = SourceManager.make(~maxPartitionConcurrency=10, ~logger=Logging.logger)

let p0 = mockFullPartition(~partitionIndex=0, ~latestFetchedBlockNumber=5)

let waitForNewBlockMock = waitForNewBlockMock()
let onNewBlockMock = onNewBlockMock()

let fetchNextPromise =
sourceManager->SourceManager.fetchNext(
~fetchState=mockFetchState([p0]),
~maxPerChainQueueSize=1000,
~currentBlockHeight=5,
~executeQuery=neverExecutePartitionQuery,
~waitForNewBlock=waitForNewBlockMock.fn,
~onNewBlock=neverOnNewBlock,
~stateId=0,
)

Assert.deepEqual(waitForNewBlockMock.calls, [5], ~message=`Should wait for new block`)

// Should do nothing on the second call with the same data
await sourceManager->SourceManager.fetchNext(
~fetchState=mockFetchState([p0]),
~maxPerChainQueueSize=1000,
~currentBlockHeight=5,
~executeQuery=neverExecutePartitionQuery,
~waitForNewBlock=neverWaitForNewBlock,
~onNewBlock=neverOnNewBlock,
~stateId=0,
)
Assert.deepEqual(
waitForNewBlockMock.calls,
[5],
~message=`New call is not added with the same stateId`,
)

let fetchNextPromise2 =
sourceManager->SourceManager.fetchNext(
~fetchState=mockFetchState([p0]),
~maxPerChainQueueSize=1000,
~currentBlockHeight=5,
~executeQuery=neverExecutePartitionQuery,
~waitForNewBlock=waitForNewBlockMock.fn,
~onNewBlock=onNewBlockMock.fn,
~stateId=1,
)
Assert.deepEqual(
waitForNewBlockMock.calls,
[5, 5],
~message=`Should add a new call after a rollback`,
)

(waitForNewBlockMock.resolveFns->Js.Array2.unsafe_get(0))(7)
(waitForNewBlockMock.resolveFns->Js.Array2.unsafe_get(1))(6)

await fetchNextPromise
await fetchNextPromise2

Assert.deepEqual(
onNewBlockMock.calls,
[6],
~message=`Should invalidate the waitForNewBlock result with block height 7, which responded after the reorg rollback`,
)
})

Async.it("Can add new partitions until the concurrency limit reached", async () => {
let sourceManager = SourceManager.make(~maxPartitionConcurrency=3, ~logger=Logging.logger)

Expand Down

0 comments on commit 1a2d659

Please sign in to comment.