Add basic retry functionality #1119

Open · wants to merge 30 commits into master from feat/add-retry-functionality
Conversation

@dryajov (Contributor) commented Feb 19, 2025

No description provided.

@dryajov dryajov force-pushed the feat/add-retry-functionality branch from 74d689c to 1c98b31 Compare February 19, 2025 17:22
@dryajov dryajov marked this pull request as ready for review February 20, 2025 15:12
@marcinczenko (Contributor) left a comment

I have started the review and will keep adding comments today and over the next few days if possible (I will try to look up a couple of things over the weekend as well). No need to wait for me if someone is faster. My objective is to have some observations confirmed and some questions answered. Since I documented the whole flow recently, I want to take advantage of this more substantial change to improve the docs, and perhaps act as a sounding board...

Comment on lines +404 to 409
let peerCtx = self.peers.get(peer)

if peerCtx != nil:
await b.payForBlocks(peerCtx, blocksDelivery)
await self.payForBlocks(peerCtx, blocksDelivery)
## shouldn't we remove them from the want-list instead of this:
peerCtx.cleanPresence(blocksDelivery.mapIt(it.address))
@marcinczenko (Contributor) commented Feb 21, 2025

Just above the highlighted lines we are calling resolveBlocks. resolveBlocks will call cancelBlocks, which will take care of removing the relevant presence entries, right?

If this is the case, perhaps, we could move the payment there as well?

@gmega (Member) left a comment

Great first pass. I can see the improvements in the benchmarks.

$(NPH) *.nim
$(NPH) codex/
$(NPH) tests/

Member commented:

ehehe good easter egg... 😃

@@ -143,8 +143,6 @@ proc start*(b: DiscoveryEngine) {.async.} =
asyncSpawn fut

b.discoveryLoop = b.discoveryQueueLoop()
b.trackedFutures.track(b.discoveryLoop)
asyncSpawn b.discoveryLoop
Member commented:

Hm... why are you excluding this? This will cause the discovery loop not to stop when someone calls .stop() on the discovery service. Or rather, it will stop because Codex will exit anyway, but it won't receive a cancel signal.

Contributor (Author) commented:

No, not really - this doesn't affect stopping, since we cancel the tracked futures on stop. The asyncSpawn call tries to do some logging and re-throw any future error as a Defect, but that was really a workaround because we lacked checked exceptions in async code. I do intend to make tracked futures only accept Future.Raises([]) for that matter.

@gmega (Member) commented Feb 21, 2025

Wait, but you're not adding the discoveryLoop future to the trackedFutures, so that one won't stop.
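The distinction the reviewers are circling around can be sketched outside of Nim. Below is a hypothetical Python/asyncio version (not the Codex code; all names are illustrative) of the "tracked futures" pattern: every long-running task is recorded so that stop() can deliver a cancel signal, instead of being spawned fire-and-forget.

```python
# Illustrative asyncio sketch of the trackedFutures pattern under discussion.
import asyncio

class TrackedFutures:
    def __init__(self):
        self._tasks: set[asyncio.Task] = set()

    def track(self, coro) -> asyncio.Task:
        # Keep a reference so stop() can cancel; drop it once the task ends.
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return task

    async def cancel_tracked(self):
        tasks = list(self._tasks)
        for task in tasks:
            task.cancel()
        # Wait for the cancellations to propagate.
        await asyncio.gather(*tasks, return_exceptions=True)

async def discovery_loop(stopped: list):
    try:
        while True:
            await asyncio.sleep(3600)  # stand-in for real discovery work
    except asyncio.CancelledError:
        stopped.append(True)           # the loop observed the cancel signal
        raise

async def main():
    tracked = TrackedFutures()
    stopped: list = []
    tracked.track(discovery_loop(stopped))
    await asyncio.sleep(0)             # let the loop start
    await tracked.cancel_tracked()     # stop(): loop receives CancelledError
    return stopped

print(asyncio.run(main()))  # → [True]
```

An untracked, fire-and-forget spawn would leave `stopped` empty at shutdown, which is the behavior gmega is worried about for the discovery loop.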

self.blockexcRunning = true
for i in 0 ..< self.concurrentTasks:
let fut = self.blockexcTaskRunner()
self.trackedFutures.track(fut)
Member commented:

OK, I guess you don't need asyncSpawn here because we're already keeping a ref to the future in trackedFutures?

warn "Stopping blockexc without starting it"
return

b.blockexcRunning = false
await b.trackedFutures.cancelTracked()
self.blockexcRunning = false
Member commented:

eheh good catch...

except CatchableError as exc:
warn "Error block handle, disconnecting peer", address, exc = exc.msg, peerId
proc randomPeer(peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx =
Rng.instance.sample(peers)
Member commented:

Yep, that's much better. The previous approach was not random at all: you'd keep pounding the same peer if you were trying to fetch the same block.

Contributor (Author) commented:

yeah, that was quite annoying when I realized what was going on.
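The difference between the old and new selection can be sketched in Python (names and counts are illustrative, not the Nim Rng API): a deterministic "take the first candidate" sends every retry for the same block to the same peer, while uniform sampling spreads the load.

```python
# Sketch: deterministic vs. uniform peer selection for block requests.
import random

peers = ["peer-a", "peer-b", "peer-c"]

def first_peer(peers):
    return peers[0]            # deterministic: same block, same peer, every time

def random_peer(peers, rng=random):
    return rng.choice(peers)   # uniform over all candidate peers

rng = random.Random(1)
counts = {p: 0 for p in peers}
for _ in range(3000):
    counts[random_peer(peers, rng)] += 1
# first_peer would have sent all 3000 requests to "peer-a";
# random_peer gives every candidate a share of the traffic.
```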


logScope:
topics = "codex blockexcnetwork"

const
Codec* = "/codex/blockexc/1.0.0"
MaxInflight* = 100
DefaultMaxInflight* = 100
Member commented:

For reference, Deluge allows 1500 pending block requests per peer in its most aggressive profiles.

Contributor (Author) commented:

Good to know, we can definitely adjust.

## Perform protocol initialization
##

proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} =
if event.kind == PeerEventKind.Joined:
b.setupPeer(peerId)
self.setupPeer(peerId)
else:
Member commented:

Hm... I see there are three PeerEventKinds:

  PeerEventKind* {.pure.} = enum
    Left
    Joined
    Identified

As it is now, if we get Identified, we'll drop the peer. I'm assuming you're aware of this, but pointing it out anyhow.
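The pitfall generalizes to any two-way branch over a three-value enum. A small Python sketch (enum and handler names are assumptions mirroring the Nim code, not its API):

```python
# Sketch: a two-way if/else on Joined silently treats Identified as a leave.
from enum import Enum, auto

class PeerEventKind(Enum):
    Left = auto()
    Joined = auto()
    Identified = auto()

def handle_two_way(kind):
    # Mirrors the reviewed code: anything that is not Joined falls into the else.
    return "setup" if kind is PeerEventKind.Joined else "drop"

def handle_exhaustive(kind):
    if kind is PeerEventKind.Joined:
        return "setup"
    elif kind is PeerEventKind.Left:
        return "drop"
    else:
        # Identified is neither a join nor a leave; ignore (or handle) it explicitly.
        return "ignore"
```

With the two-way branch, an Identified event tears down a perfectly healthy peer; the exhaustive handler makes the third case visible.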

Contributor (Author) commented:

Oh, not really, that's a new event 🤔. Good catch!

@@ -251,15 +251,15 @@ type
desc:
"Time interval in seconds - determines frequency of block " &
"maintenance cycle: how often blocks are checked " & "for expiration and cleanup",
defaultValue: DefaultBlockMaintenanceInterval,
defaultValueDesc: $DefaultBlockMaintenanceInterval,
defaultValue: DefaultBlockInterval,
Member commented:

Not sure this rename is an improvement: the shorter name no longer says what this constant contains...

Contributor (Author) commented:

The SuperLongSelfDescribingValueIdentifierSuperConstant is quite annoying though 😁. Matter of taste, but I can live with it if we find it useful.


if err =? (await erasureJob()).errorOption:
return failure(err)
self.trackedFutures.track(prefetch())
Member commented:

OK I see why you're doing this, but now streamEntireDataset will effectively continue downloading even if the client stops streaming. Anyhow, I guess we're gonna drop streaming soon enough anyway.

Contributor (Author) commented:

Not sure why you think that streaming is going to be dropped? Otherwise, good observation.

raise newException(RngSampleError, "Limit cannot be larger than sample!")

for _ in 0 ..< min(sample.len, limit):
result.add(rng.sample(sample, result))
Member commented:

These sample procedures aren't great, I'm going to do a PR on rng.nim.
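The loop being reviewed draws up to a limit of items without replacement, raising when the limit exceeds the population. A Python sketch of the same semantics (names are illustrative; the real Nim `rng.sample` differs in signature):

```python
# Sketch: bounded sampling without replacement, rejecting impossible limits.
import random

def sample_limited(rng, population, limit):
    if limit > len(population):
        # Mirrors the RngSampleError raise in the reviewed code.
        raise ValueError("Limit cannot be larger than sample!")
    candidates = list(population)
    picked = []
    for _ in range(min(len(candidates), limit)):
        choice = rng.choice(candidates)
        candidates.remove(choice)   # without replacement: never pick twice
        picked.append(choice)
    return picked

rng = random.Random(7)
print(sample_limited(rng, ["a", "b", "c", "d"], 2))
```

Rejecting `limit > len(population)` up front is what keeps the loop from spinning forever looking for an item it has already consumed.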

@@ -143,8 +143,6 @@ proc start*(b: DiscoveryEngine) {.async.} =
asyncSpawn fut

b.discoveryLoop = b.discoveryQueueLoop()
b.trackedFutures.track(b.discoveryLoop)
asyncSpawn b.discoveryLoop

proc stop*(b: DiscoveryEngine) {.async.} =
Contributor commented:

Since you are changing b => self all over the places, maybe you want to do it here as well?


trace "NetworkStore stopped"

proc sendWantHave(
b: BlockExcEngine, addresses: seq[BlockAddress], peers: seq[BlockExcPeerCtx]
self: BlockExcEngine, addresses: seq[BlockAddress], peers: seq[BlockExcPeerCtx]
): Future[void] {.async.} =
for p in peers:
let toAsk = addresses.filterIt(it notin p.peerHave)
Contributor commented:

I know you did not touch that line, but it would be cleaner for sendWantHave not to do any filtering (this should happen elsewhere, where it is really relevant), especially since sendWantBlock is nice and clean, without side effects (the caller expects the request to just be sent)... Maybe a small separate PR...

self: BlockExcEngine, address: BlockAddress
): Future[?!Block] {.async: (raises: [CancelledError]).} =
if address notin self.pendingBlocks:
self.trackedFutures.track(self.downloadInternal(address))
Contributor commented:

So, what are we really tracking here in:

self.trackedFutures.track(self.downloadInternal(address))

Which future will downloadInternal return? If I read it correctly, it will be the result of await (handle or sleepAsync(self.pendingBlocks.retryInterval)), which returns a new future that completes once either handle or sleepAsync completes, and that could well be sleepAsync. I think we always want to track handle.
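The retry pattern under discussion can be sketched with asyncio (an assumed structure, not the Chronos code): each loop iteration races the long-lived block handle against a retry timer, and the future worth tracking or cancelling is the handle, not the short-lived race created per iteration.

```python
# Sketch: retry loop racing a block handle against a per-iteration timer.
import asyncio

async def download_internal(handle: asyncio.Future, retry_interval: float, retries: int):
    for _ in range(retries):
        # Race the handle against the retry timer. asyncio.wait does not
        # cancel the handle on timeout, so it survives into the next round.
        done, _ = await asyncio.wait({handle}, timeout=retry_interval)
        if done:
            return handle.result()
        # Timer fired first: this is where a want would be re-sent.
    raise TimeoutError("retries exhausted")

async def main():
    handle = asyncio.get_running_loop().create_future()
    task = asyncio.create_task(download_internal(handle, 0.01, 5))
    await asyncio.sleep(0.02)        # first retry interval elapses without data
    handle.set_result("block-bytes") # the block arrives on a later attempt
    return await task

print(asyncio.run(main()))  # → block-bytes
```

Tracking the per-iteration race instead of `handle` would mean the tracked future routinely completes as a mere timeout while the real download keeps running untracked, which matches the concern in the comment.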

if address in peerHave and not self.pendingBlocks.retriesExhausted(address) and
not self.pendingBlocks.isInFlight(address):
self.pendingBlocks.setInFlight(address, true)
self.pendingBlocks.decRetries(address)
Contributor commented:

It is convenient to do it here, but I never like having side effects in a filter; it also feels a bit out of context when you read the code. We could add a setInFlight that takes a list of addresses, and the same could be done for self.pendingBlocks.decRetries. We could also consider whether self.pendingBlocks.setInFlight(address, true) should be part of sendWantBlock (and self.pendingBlocks.setInFlight(address, false) part of sendWantHave), though as I said in an earlier comment, I like those two to be as focused as possible (still, as long as they do not filter out blocks, I am kind of fine with it).

Of course it is a bit faster to do it here, but that could be a bit of premature optimization...

)
)
).filterIt(it.failed)
peerCtx.cleanPresence(addrs)
Contributor commented:

And not peerCtx.cleanPresence(blocks)?

price = @(self.pricing.get(Pricing(price: 0.u256)).price.toBytesBE)

if e.cancel:
trace "Received cancelation for untracked block, skipping", address = e.address
Contributor commented:

Suggested change
trace "Received cancelation for untracked block, skipping", address = e.address
trace "Received cancellation for untracked block, skipping", address = e.address

# it doesn't mean that we're never talking to this peer again.
# TODO: we need a lot more work around peer selection and
# prioritization
proc downloadInternal(
@marcinczenko (Contributor) commented Feb 22, 2025

Just a thought: I am wondering whether downloadInternal wouldn't belong better in pendingblocks. It would perhaps drag with it other things that should be there; for example, RetriesExhaustedError is defined in pendingblocks, yet it is never used there, only here.

3 participants