Add basic retry functionality #1119
base: master
Conversation
74d689c to 1c98b31
I have started the review and will keep adding comments today and over the next few days if possible (I will try to look up a couple of things over the weekend as well). No need to wait for me if someone is faster. My objective is to have some observations confirmed and some questions answered. Because I documented the whole flow recently, I just want to take advantage of this more substantial change to improve the docs and perhaps act as a sounding board...
let peerCtx = self.peers.get(peer)

if peerCtx != nil:
  await b.payForBlocks(peerCtx, blocksDelivery)
  await self.payForBlocks(peerCtx, blocksDelivery)
  ## shouldn't we remove them from the want-list instead of this:
  peerCtx.cleanPresence(blocksDelivery.mapIt(it.address))
Just above the highlighted lines we are calling resolveBlocks. resolveBlocks will call cancelBlocks, which will take care of removing the relevant presence entries, right? If that is the case, perhaps we could move the payment there as well?
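For illustration only, a rough sketch of what that suggestion could look like; the changed resolveBlocks signature (taking the peer context) is an assumption, not the current API:

proc resolveBlocks(self: BlockExcEngine, peerCtx: BlockExcPeerCtx,
    blocksDelivery: seq[BlockDelivery]) {.async.} =
  self.pendingBlocks.resolve(blocksDelivery)                  # complete pending handles
  await self.cancelBlocks(blocksDelivery.mapIt(it.address))   # also cleans up presence entries
  # hypothetical: pay in the same place, so blocksDeliveryHandler no longer
  # needs its own payForBlocks + cleanPresence step
  await self.payForBlocks(peerCtx, blocksDelivery)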
Great first pass. I can see the improvements in the benchmarks.
$(NPH) *.nim
$(NPH) codex/
$(NPH) tests/
ehehe good easter egg... 😃
@@ -143,8 +143,6 @@ proc start*(b: DiscoveryEngine) {.async.} =
  asyncSpawn fut

  b.discoveryLoop = b.discoveryQueueLoop()
  b.trackedFutures.track(b.discoveryLoop)
  asyncSpawn b.discoveryLoop
Hm... why are you excluding this? This will cause the discovery loop not to stop when someone calls .stop() on the discovery service. Or rather, it will stop because Codex will exit anyway, but it won't receive a cancel signal.
No, not really - this doesn't affect stopping, since we cancel the tracked futures on stop. The asyncSpawn call tries to do some logging and re-throw any future error as a Defect, but that was really a workaround because we lacked checked exceptions on async code. I do intend to make tracked futures only accept Future.Raises([]) for that matter.
Wait, but you're not adding the discoveryLoop future to the trackedFutures, so that one won't stop.
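For reference, a minimal sketch of keeping the loop cancellable, based on the lines removed in the hunk above (it assumes stop() awaits trackedFutures.cancelTracked(), as described in the reply above):

proc start*(b: DiscoveryEngine) {.async.} =
  # keep a reference and register the loop with trackedFutures so that
  # stop() -> cancelTracked() can deliver a cancel signal to it;
  # asyncSpawn would then only be a logging/re-raise wrapper, not required
  b.discoveryLoop = b.discoveryQueueLoop()
  b.trackedFutures.track(b.discoveryLoop)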
self.blockexcRunning = true
for i in 0 ..< self.concurrentTasks:
  let fut = self.blockexcTaskRunner()
  self.trackedFutures.track(fut)
OK, I guess you don't need asyncSpawn here because we're already keeping a ref to the future in trackedFutures?
warn "Stopping blockexc without starting it" | ||
return | ||
|
||
b.blockexcRunning = false | ||
await b.trackedFutures.cancelTracked() | ||
self.blockexcRunning = false |
eheh good catch...
except CatchableError as exc:
  warn "Error block handle, disconnecting peer", address, exc = exc.msg, peerId

proc randomPeer(peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx =
  Rng.instance.sample(peers)
Yep that's much better. The previous approach was not random at all, you'd keep pounding the same peer if you're trying to fetch the same block.
yeah, that was quite annoying when I realized what was going on.
logScope:
  topics = "codex blockexcnetwork"

const
  Codec* = "/codex/blockexc/1.0.0"
  MaxInflight* = 100
  DefaultMaxInflight* = 100
For reference, Deluge allows 1500 pending block requests per peer in its most aggressive profiles.
Good to know, we can definitely adjust.
## Perform protocol initialization
##

proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} =
  if event.kind == PeerEventKind.Joined:
    b.setupPeer(peerId)
    self.setupPeer(peerId)
  else:
Hm... I see there are three PeerEventKinds:

PeerEventKind* {.pure.} = enum
  Left
  Joined
  Identified

As it is now, if we get Identified, we'll drop the peer. I'm assuming you're aware of this but pointing it out anyhow.
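To make the three cases explicit, the handler could be written roughly like this (a sketch only; dropPeer is assumed to be the existing teardown used by the else branch):

proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} =
  case event.kind
  of PeerEventKind.Joined:
    self.setupPeer(peerId)
  of PeerEventKind.Left:
    self.dropPeer(peerId)   # assumed existing teardown path
  of PeerEventKind.Identified:
    discard                 # don't treat identification as a disconnect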
Oh, not really, that's a new event 🤔. Good catch!
@@ -251,15 +251,15 @@ type
      desc:
        "Time interval in seconds - determines frequency of block " &
        "maintenance cycle: how often blocks are checked " & "for expiration and cleanup",
      defaultValue: DefaultBlockMaintenanceInterval,
      defaultValueDesc: $DefaultBlockMaintenanceInterval,
      defaultValue: DefaultBlockInterval,
Not sure this rename improves how well the name describes what this constant contains...
The SuperLongSelfDescriptingValueIdentifierSuperConstant is quite annoying tho 😁. Matter of taste, but I can live with it if we find it useful.
if err =? (await erasureJob()).errorOption:
  return failure(err)

self.trackedFutures.track(prefetch())
OK, I see why you're doing this, but now streamEntireDataset will effectively continue downloading even if the client stops streaming. Anyhow, I guess we're gonna drop streaming soon enough anyway.
Not sure why you think that streaming is going to be dropped? Otherwise, good observation.
  raise newException(RngSampleError, "Limit cannot be larger than sample!")

for _ in 0 ..< min(sample.len, limit):
  result.add(rng.sample(sample, result))
These sample procedures aren't great, I'm going to do a PR on rng.nim.
@@ -143,8 +143,6 @@ proc start*(b: DiscoveryEngine) {.async.} =
  asyncSpawn fut

  b.discoveryLoop = b.discoveryQueueLoop()
  b.trackedFutures.track(b.discoveryLoop)
  asyncSpawn b.discoveryLoop

proc stop*(b: DiscoveryEngine) {.async.} =
Since you are changing b => self all over the place, maybe you want to do it here as well?
  trace "NetworkStore stopped"

proc sendWantHave(
  b: BlockExcEngine, addresses: seq[BlockAddress], peers: seq[BlockExcPeerCtx]
  self: BlockExcEngine, addresses: seq[BlockAddress], peers: seq[BlockExcPeerCtx]
): Future[void] {.async.} =
  for p in peers:
    let toAsk = addresses.filterIt(it notin p.peerHave)
I know you did not touch that line, but it would be cleaner to have sendWantHave not do any filtering (I mean, this should happen elsewhere, where it is really relevant). Especially since sendWantBlock is nice and clean without side effects (the caller expects the request to be just sent)... Maybe a small separate PR...
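For illustration, the split could look roughly like this (a sketch only; the sendWantList helper and the call-site shape are assumptions, not the current code):

proc sendWantHave(
    self: BlockExcEngine, addresses: seq[BlockAddress], peers: seq[BlockExcPeerCtx]
): Future[void] {.async.} =
  # just send the want-have for exactly what was asked; no filtering side effects
  for p in peers:
    await self.sendWantList(p, addresses)   # hypothetical send helper

# at the call site (inside the async proc that drives the request),
# decide per peer what is still worth asking for:
for p in peers:
  let toAsk = addresses.filterIt(it notin p.peerHave)
  if toAsk.len > 0:
    await self.sendWantHave(toAsk, @[p])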
    self: BlockExcEngine, address: BlockAddress
): Future[?!Block] {.async: (raises: [CancelledError]).} =
  if address notin self.pendingBlocks:
    self.trackedFutures.track(self.downloadInternal(address))
So, what are we really tracking here in self.trackedFutures.track(self.downloadInternal(address))? Which future will downloadInternal return? If I read it correctly, it will be the result of await (handle or sleepAsync(self.pendingBlocks.retryInterval)), which returns a new future that completes once either handle or sleepAsync completes - and that could well be sleepAsync. I think we always want to track handle.
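A sketch of the alternative being suggested, i.e. tracking the block handle itself instead of the combined future (the getWantHandle accessor is an assumption for illustration):

let handle = self.pendingBlocks.getWantHandle(address)   # assumed accessor for the pending handle
self.trackedFutures.track(handle)                        # cancelTracked() would then cancel the handle itself
await (handle or sleepAsync(self.pendingBlocks.retryInterval))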
if address in peerHave and not self.pendingBlocks.retriesExhausted(address) and
    not self.pendingBlocks.isInFlight(address):
  self.pendingBlocks.setInFlight(address, true)
  self.pendingBlocks.decRetries(address)
It is convenient to do it here, but I never like having side effects in a filter... it also feels a bit out of context here when you read the code. We could add a setInFlight that takes a list of addresses; the same could be done for self.pendingBlocks.decRetries. We could also consider whether self.pendingBlocks.setInFlight(address, true) should be part of sendWantBlock (and then self.pendingBlocks.setInFlight(address, false) inside sendWantHave), but, as I said in an earlier comment, I like those two to be as focused as possible (still, as long as they do not filter out blocks I am kind of fine with it).
Of course it is a bit faster to do it here, but that could be a bit of a premature optimization...
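For illustration, the batch helpers could be as simple as the following sketch (the PendingBlocksManager type name is assumed from the calls above):

proc setInFlight*(self: PendingBlocksManager, addresses: seq[BlockAddress], inFlight = true) =
  # mark a whole batch of addresses as (not) in flight
  for address in addresses:
    self.setInFlight(address, inFlight)

proc decRetries*(self: PendingBlocksManager, addresses: seq[BlockAddress]) =
  # decrement the retry counter for a whole batch of addresses
  for address in addresses:
    self.decRetries(address)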
)
)
).filterIt(it.failed)
peerCtx.cleanPresence(addrs)
And not peerCtx.cleanPresence(blocks)?
price = @(self.pricing.get(Pricing(price: 0.u256)).price.toBytesBE)

if e.cancel:
  trace "Received cancelation for untracked block, skipping", address = e.address
trace "Received cancelation for untracked block, skipping", address = e.address | |
trace "Received cancellation for untracked block, skipping", address = e.address |
# it doesn't mean that we're never talking to this peer again.
# TODO: we need a lot more work around peer selection and
# prioritization
proc downloadInternal(
Just a thought: I am wondering whether downloadInternal wouldn't better belong in pendingblocks. It would perhaps drag other things with it that should be there too. For example, RetriesExhaustedError is defined in pendingblocks, yet it is never used there, only here.