From 3f030e3dc1148e43d65c17af4eb63754de1cf1b4 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 17 Feb 2025 18:36:08 -0600 Subject: [PATCH 01/29] adding basic retry functionality --- codex/blockexchange/engine/engine.nim | 110 +++++++++++++++----------- 1 file changed, 66 insertions(+), 44 deletions(-) diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index d30f88d9e..03478f591 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -158,59 +158,81 @@ proc sendWantBlock( ) # we want this remote to send us a block codex_block_exchange_want_block_lists_sent.inc() -proc monitorBlockHandle( - b: BlockExcEngine, handle: Future[Block], address: BlockAddress, peerId: PeerId -) {.async.} = - try: - discard await handle - except CancelledError as exc: - trace "Block handle cancelled", address, peerId - except CatchableError as exc: - warn "Error block handle, disconnecting peer", address, exc = exc.msg, peerId +proc randomPeer(peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx = + Rng.instance.sample(peers) - # TODO: really, this is just a quick and dirty way of - # preventing hitting the same "bad" peer every time, however, - # we might as well discover this on or next iteration, so - # it doesn't mean that we're never talking to this peer again. - # TODO: we need a lot more work around peer selection and - # prioritization +proc downloadInternal( + self: BlockExcEngine, address: BlockAddress +) {.async: (raises: []).} = + logScope: + address = address - # drop unresponsive peer - await b.network.switch.disconnect(peerId) - b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid]) - -proc pickPseudoRandom( - address: BlockAddress, peers: seq[BlockExcPeerCtx] -): BlockExcPeerCtx = - return peers[hash(address) mod peers.len] - -proc requestBlock*( - b: BlockExcEngine, address: BlockAddress -): Future[?!Block] {.async.} = - let blockFuture = b.pendingBlocks.getWantHandle(address, b.blockFetchTimeout) + let handle = self.pendingBlocks.getWantHandle(address) + trace "Downloading block" + try: + while address in self.pendingBlocks: + logScope: + retries = self.pendingBlocks.retries(address) + interval = self.pendingBlocks.retryInterval + + if self.pendingBlocks.retriesExhausted(address): + trace "Error retries exhausted" + handle.fail(newException(RetriesExhaustedError, "Error retries exhausted")) + break + + trace "Running retry handle" + let peers = self.peers.getPeersForBlock(address) + logScope: + peersWith = peers.with.len + peersWithout = peers.without.len + + trace "Peers for block" + if peers.with.len > 0: + self.pendingBlocks.setInFlight(address, true) + await self.sendWantBlock(@[address], peers.with.randomPeer) + else: + self.pendingBlocks.setInFlight(address, false) + if peers.without.len > 0: + await self.sendWantHave(@[address], peers.without) + self.discovery.queueFindBlocksReq(@[address.cidOrTreeCid]) - if not b.pendingBlocks.isInFlight(address): - let peers = b.peers.getPeersForBlock(address) + await (handle or sleepAsync(self.pendingBlocks.retryInterval)) + self.pendingBlocks.decRetries(address) - if peers.with.len == 0: - b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid]) - else: - let selected = pickPseudoRandom(address, peers.with) - asyncSpawn b.monitorBlockHandle(blockFuture, address, selected.id) - b.pendingBlocks.setInFlight(address) - await b.sendWantBlock(@[address], selected) + if handle.finished: + trace "Handle for block finished", failed = handle.failed + break + except CancelledError 
as exc:
+    trace "Block download cancelled"
+    if not handle.finished:
+      await handle.cancelAndWait()
+  except CatchableError as exc:
+    warn "Error downloading block", exc = exc.msg
+    if not handle.finished:
+      handle.fail(exc)
+  finally:
+    self.pendingBlocks.setInFlight(address, false)
 
-  await b.sendWantHave(@[address], peers.without)
+proc requestBlock*(
+    self: BlockExcEngine, address: BlockAddress
+): Future[?!Block] {.async: (raises: [CancelledError]).} =
+  if address notin self.pendingBlocks:
+    self.trackedFutures.track(self.downloadInternal(address))
 
-  # Don't let timeouts bubble up. We can't be too broad here or we break
-  # cancellations.
   try:
-    success await blockFuture
-  except AsyncTimeoutError as err:
+    let handle = self.pendingBlocks.getWantHandle(address)
+    success await handle
+  except CancelledError as err:
+    warn "Block request cancelled", address
+    raise err
+  except CatchableError as err:
+    error "Block request failed", address, err = err.msg
     failure err
 
-proc requestBlock*(b: BlockExcEngine, cid: Cid): Future[?!Block] =
-  b.requestBlock(BlockAddress.init(cid))
+proc requestBlock*(
+    self: BlockExcEngine, cid: Cid
+): Future[?!Block] {.async: (raw: true, raises: [CancelledError]).} =
+  self.requestBlock(BlockAddress.init(cid))
 
 proc blockPresenceHandler*(
   b: BlockExcEngine, peer: PeerId, blocks: seq[BlockPresence]

From 25c0c164b36ca8de248a29c7f87c56937ac49b8d Mon Sep 17 00:00:00 2001
From: Dmitriy Ryajov
Date: Mon, 17 Feb 2025 18:38:50 -0600
Subject: [PATCH 02/29] avoid duplicate requests and batch them

---
 codex/blockexchange/engine/engine.nim | 31 ++++++++++++++-------------
 1 file changed, 16 insertions(+), 15 deletions(-)

diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim
index 03478f591..af6fc1e2f 100644
--- a/codex/blockexchange/engine/engine.nim
+++ b/codex/blockexchange/engine/engine.nim
@@ -235,11 +235,12 @@ proc requestBlock*(
   self.requestBlock(BlockAddress.init(cid))
 
 proc blockPresenceHandler*(
-  b: BlockExcEngine, peer: PeerId, blocks: seq[BlockPresence]
+  self: BlockExcEngine, peer: PeerId, blocks: seq[BlockPresence]
 ) {.async.} =
+  trace "Received block presence from peer", peer, blocks = blocks.mapIt($it)
   let
-    peerCtx = b.peers.get(peer)
-    wantList = toSeq(b.pendingBlocks.wantList)
+    peerCtx = self.peers.get(peer)
+    ourWantList = toSeq(self.pendingBlocks.wantList)
 
   if peerCtx.isNil:
     return
@@ -250,23 +251,23 @@ proc blockPresenceHandler*(
 
   let
     peerHave = peerCtx.peerHave
-    dontWantCids = peerHave.filterIt(it notin wantList)
+    dontWantCids = peerHave.filterIt(it notin ourWantList)
 
   if dontWantCids.len > 0:
     peerCtx.cleanPresence(dontWantCids)
 
-  let wantCids = wantList.filterIt(it in peerHave)
-
-  if wantCids.len > 0:
-    trace "Peer has blocks in our wantList", peer, wants = wantCids
-    await b.sendWantBlock(wantCids, peerCtx)
+  let ourWantCids = ourWantList.filter do(address: BlockAddress) -> bool:
+    if address in peerHave and not self.pendingBlocks.retriesExhausted(address) and
+        not self.pendingBlocks.isInFlight(address):
+      self.pendingBlocks.setInFlight(address, true)
+      self.pendingBlocks.decRetries(address)
+      true
+    else:
+      false
 
-  # if none of the connected peers report our wants in their have list,
-  # fire up discovery
-  b.discovery.queueFindBlocksReq(
-    toSeq(b.pendingBlocks.wantListCids).filter do(cid: Cid) -> bool:
-      not b.peers.anyIt(cid in it.peerHaveCids)
-  )
+  if ourWantCids.len > 0:
+    trace "Peer has blocks in our wantList", peer, wants = ourWantCids
+    await self.sendWantBlock(ourWantCids, peerCtx)
 
proc 
scheduleTasks(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} =
   let cids = blocksDelivery.mapIt(it.blk.cid)
 

From 62dc92ca80e8d12a7f2f5edc4a6786a31f3d7fe6 Mon Sep 17 00:00:00 2001
From: Dmitriy Ryajov
Date: Mon, 17 Feb 2025 18:39:25 -0600
Subject: [PATCH 03/29] fix cancelling blocks

---
 codex/blockexchange/engine/engine.nim | 35 ++++++++++++++++++---------
 1 file changed, 23 insertions(+), 12 deletions(-)

diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim
index af6fc1e2f..ff708ce74 100644
--- a/codex/blockexchange/engine/engine.nim
+++ b/codex/blockexchange/engine/engine.nim
@@ -286,26 +286,37 @@ proc scheduleTasks(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.asyn
       break # do next peer
 
-proc cancelBlocks(b: BlockExcEngine, addrs: seq[BlockAddress]) {.async.} =
+proc cancelBlocks(self: BlockExcEngine, addrs: seq[BlockAddress]) {.async.} =
   ## Tells neighboring peers that we're no longer interested in a block.
+  ##
+
+  if self.peers.len == 0:
+    return
+
   trace "Sending block request cancellations to peers",
-    addrs, peers = b.peers.mapIt($it.id)
+    addrs, peers = self.peers.peerIds
+
+  proc mapPeers(peerCtx: BlockExcPeerCtx): Future[BlockExcPeerCtx] {.async.} =
+    let blocks = addrs.filter do(a: BlockAddress) -> bool:
+      a in peerCtx.blocks
 
-  let failed = (
-    await allFinished(
-      b.peers.mapIt(
-        b.network.request.sendWantCancellations(peer = it.id, addresses = addrs)
+    if blocks.len > 0:
+      trace "Sending block request cancellations to peer", peer = peerCtx.id, blocks
+      await self.network.request.sendWantCancellations(
+        peer = peerCtx.id, addresses = blocks
       )
-    )
-  ).filterIt(it.failed)
+      peerCtx.cleanPresence(addrs)
+    peerCtx
+
+  let failed = (await allFinished(map(toSeq(self.peers.peers.values), mapPeers))).filterIt(
+    it.failed
+  )
 
   if failed.len > 0:
     warn "Failed to send block request cancellations to peers", peers = failed.len
+  else:
+    trace "Block request cancellations sent to peers", peers = self.peers.len
 
-proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} =
-  b.pendingBlocks.resolve(blocksDelivery)
-  await b.scheduleTasks(blocksDelivery)
-  await b.cancelBlocks(blocksDelivery.mapIt(it.address))
 
 proc resolveBlocks*(b: BlockExcEngine, blocks: seq[Block]) {.async.} =
   await b.resolveBlocks(

From 94926adab4c96b7c2f241cf74626b93a63d0c608 Mon Sep 17 00:00:00 2001
From: Dmitriy Ryajov
Date: Mon, 17 Feb 2025 18:40:11 -0600
Subject: [PATCH 04/29] properly resolve blocks

---
 codex/blockexchange/engine/engine.nim | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim
index ff708ce74..b0b263791 100644
--- a/codex/blockexchange/engine/engine.nim
+++ b/codex/blockexchange/engine/engine.nim
@@ -317,9 +317,15 @@ proc cancelBlocks(self: BlockExcEngine, addrs: seq[BlockAddress]) {.async.} =
   else:
     trace "Block request cancellations sent to peers", peers = self.peers.len
 
+proc resolveBlocks*(
+    self: BlockExcEngine, blocksDelivery: seq[BlockDelivery]
+) {.async.} =
+  self.pendingBlocks.resolve(blocksDelivery)
+  await self.scheduleTasks(blocksDelivery)
+  await self.cancelBlocks(blocksDelivery.mapIt(it.address))
 
-proc resolveBlocks*(b: BlockExcEngine, blocks: seq[Block]) {.async.} =
-  await b.resolveBlocks(
+proc resolveBlocks*(self: BlockExcEngine, blocks: seq[Block]) {.async.} =
+  await self.resolveBlocks(
     blocks.mapIt(
       BlockDelivery(blk: it, address: BlockAddress(leaf: false, cid: it.cid))
     )
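
Taken together, patches 01 through 04 turn a block download into a bounded retry loop: a want handle raced against a retry timer, with outstanding wants cancelled once the block resolves. A minimal standalone sketch of that pattern, using only chronos; here `handle`, `fetchOnce`, `maxRetries` and `interval` are illustrative stand-ins, not the engine's actual API:

  import pkg/chronos

  proc downloadWithRetry(
      handle: Future[string],       # completes when the block arrives
      fetchOnce: proc() {.gcsafe.}, # fires a single want-block request
      maxRetries = 3,
      interval = 500.milliseconds,
  ): Future[string] {.async.} =
    # Bounded retry loop: issue a request, then race the handle
    # against a timer; give up once the retry budget is spent.
    var retries = maxRetries
    while not handle.finished:
      if retries <= 0:
        raise newException(CatchableError, "retries exhausted")
      fetchOnce()
      await (handle or sleepAsync(interval))
      dec retries
    return await handle

The real loop in downloadInternal additionally falls back to want-have broadcasts and discovery when no connected peer has the block, but the race between completion and timeout is the same.

From 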
d399654496ee47368918779a5adc895c4a0d5540 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 17 Feb 2025 18:40:40 -0600 Subject: [PATCH 05/29] minor cleanup - use `self` --- codex/blockexchange/engine/engine.nim | 190 +++++++++++++------------- 1 file changed, 95 insertions(+), 95 deletions(-) diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index b0b263791..dafdd5200 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -19,6 +19,7 @@ import pkg/metrics import pkg/stint import pkg/questionable +import ../../rng import ../../stores/blockstore import ../../blocktype import ../../utils @@ -67,12 +68,6 @@ const DefaultMaxPeersPerRequest* = 10 DefaultTaskQueueSize = 100 DefaultConcurrentTasks = 10 - # DefaultMaxRetries = 3 - # DefaultConcurrentDiscRequests = 10 - # DefaultConcurrentAdvertRequests = 10 - # DefaultDiscoveryTimeout = 1.minutes - # DefaultMaxQueriedBlocksCache = 1000 - # DefaultMinPeersPerBlock = 3 type TaskHandler* = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.} @@ -88,10 +83,8 @@ type trackedFutures: TrackedFutures # Tracks futures of blockexc tasks blockexcRunning: bool # Indicates if the blockexc task is running pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved - peersPerRequest: int # Max number of peers to request from wallet*: WalletRef # Nitro wallet for micropayments pricing*: ?Pricing # Optional bandwidth pricing - blockFetchTimeout*: Duration # Timeout for fetching blocks over the network discovery*: DiscoveryEngine advertiser*: Advertiser @@ -100,60 +93,60 @@ type price*: UInt256 # attach task scheduler to engine -proc scheduleTask(b: BlockExcEngine, task: BlockExcPeerCtx): bool {.gcsafe.} = - b.taskQueue.pushOrUpdateNoWait(task).isOk() +proc scheduleTask(self: BlockExcEngine, task: BlockExcPeerCtx): bool {.gcsafe.} = + self.taskQueue.pushOrUpdateNoWait(task).isOk() -proc blockexcTaskRunner(b: BlockExcEngine) {.async: (raises: []).} +proc blockexcTaskRunner(self: BlockExcEngine) {.async: (raises: []).} -proc start*(b: BlockExcEngine) {.async.} = +proc start*(self: BlockExcEngine) {.async.} = ## Start the blockexc task ## - await b.discovery.start() - await b.advertiser.start() + await self.discovery.start() + await self.advertiser.start() - trace "Blockexc starting with concurrent tasks", tasks = b.concurrentTasks - if b.blockexcRunning: + trace "Blockexc starting with concurrent tasks", tasks = self.concurrentTasks + if self.blockexcRunning: warn "Starting blockexc twice" return - b.blockexcRunning = true - for i in 0 ..< b.concurrentTasks: - let fut = b.blockexcTaskRunner() - b.trackedFutures.track(fut) - asyncSpawn fut + self.blockexcRunning = true + for i in 0 ..< self.concurrentTasks: + let fut = self.blockexcTaskRunner() + self.trackedFutures.track(fut) -proc stop*(b: BlockExcEngine) {.async.} = +proc stop*(self: BlockExcEngine) {.async.} = ## Stop the blockexc blockexc ## - await b.discovery.stop() - await b.advertiser.stop() + await self.trackedFutures.cancelTracked() + await self.network.stop() + await self.discovery.stop() + await self.advertiser.stop() trace "NetworkStore stop" - if not b.blockexcRunning: + if not self.blockexcRunning: warn "Stopping blockexc without starting it" return - b.blockexcRunning = false - await b.trackedFutures.cancelTracked() + self.blockexcRunning = false trace "NetworkStore stopped" proc sendWantHave( - b: BlockExcEngine, addresses: seq[BlockAddress], peers: seq[BlockExcPeerCtx] + self: BlockExcEngine, 
addresses: seq[BlockAddress], peers: seq[BlockExcPeerCtx] ): Future[void] {.async.} = for p in peers: let toAsk = addresses.filterIt(it notin p.peerHave) trace "Sending wantHave request", toAsk, peer = p.id - await b.network.request.sendWantList(p.id, toAsk, wantType = WantType.WantHave) + await self.network.request.sendWantList(p.id, toAsk, wantType = WantType.WantHave) codex_block_exchange_want_have_lists_sent.inc() proc sendWantBlock( - b: BlockExcEngine, addresses: seq[BlockAddress], blockPeer: BlockExcPeerCtx + self: BlockExcEngine, addresses: seq[BlockAddress], blockPeer: BlockExcPeerCtx ): Future[void] {.async.} = trace "Sending wantBlock request to", addresses, peer = blockPeer.id - await b.network.request.sendWantList( + await self.network.request.sendWantList( blockPeer.id, addresses, wantType = WantType.WantBlock ) # we want this remote to send us a block codex_block_exchange_want_block_lists_sent.inc() @@ -269,17 +262,17 @@ proc blockPresenceHandler*( trace "Peer has blocks in our wantList", peer, wants = ourWantCids await self.sendWantBlock(ourWantCids, peerCtx) -proc scheduleTasks(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} = +proc scheduleTasks(self: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} = let cids = blocksDelivery.mapIt(it.blk.cid) # schedule any new peers to provide blocks to - for p in b.peers: + for p in self.peers: for c in cids: # for each cid # schedule a peer if it wants at least one cid # and we have it in our local store if c in p.peerWantsCids: - if await (c in b.localStore): - if b.scheduleTask(p): + if await (c in self.localStore): + if self.scheduleTask(p): trace "Task scheduled for peer", peer = p.id else: warn "Unable to schedule task for peer", peer = p.id @@ -332,18 +325,18 @@ proc resolveBlocks*(self: BlockExcEngine, blocks: seq[Block]) {.async.} = ) proc payForBlocks( - engine: BlockExcEngine, peer: BlockExcPeerCtx, blocksDelivery: seq[BlockDelivery] + self: BlockExcEngine, peer: BlockExcPeerCtx, blocksDelivery: seq[BlockDelivery] ) {.async.} = let - sendPayment = engine.network.request.sendPayment + sendPayment = self.network.request.sendPayment price = peer.price(blocksDelivery.mapIt(it.address)) - if payment =? engine.wallet.pay(peer, price): + if payment =? self.wallet.pay(peer, price): trace "Sending payment for blocks", price, len = blocksDelivery.len await sendPayment(peer.id, payment) -proc validateBlockDelivery(b: BlockExcEngine, bd: BlockDelivery): ?!void = - if bd.address notin b.pendingBlocks: +proc validateBlockDelivery(self: BlockExcEngine, bd: BlockDelivery): ?!void = + if bd.address notin self.pendingBlocks: return failure("Received block is not currently a pending block") if bd.address.leaf: @@ -373,7 +366,7 @@ proc validateBlockDelivery(b: BlockExcEngine, bd: BlockDelivery): ?!void = return success() proc blocksDeliveryHandler*( - b: BlockExcEngine, peer: PeerId, blocksDelivery: seq[BlockDelivery] + self: BlockExcEngine, peer: PeerId, blocksDelivery: seq[BlockDelivery] ) {.async.} = trace "Received blocks from peer", peer, blocks = (blocksDelivery.mapIt(it.address)) @@ -383,11 +376,11 @@ proc blocksDeliveryHandler*( peer = peer address = bd.address - if err =? b.validateBlockDelivery(bd).errorOption: + if err =? self.validateBlockDelivery(bd).errorOption: warn "Block validation failed", msg = err.msg continue - if err =? (await b.localStore.putBlock(bd.blk)).errorOption: + if err =? 
(await self.localStore.putBlock(bd.blk)).errorOption: error "Unable to store block", err = err.msg continue @@ -396,7 +389,7 @@ proc blocksDeliveryHandler*( error "Proof expected for a leaf block delivery" continue if err =? ( - await b.localStore.putCidAndProof( + await self.localStore.putCidAndProof( bd.address.treeCid, bd.address.index, bd.blk.cid, proof ) ).errorOption: @@ -405,18 +398,22 @@ proc blocksDeliveryHandler*( validatedBlocksDelivery.add(bd) - await b.resolveBlocks(validatedBlocksDelivery) + await self.resolveBlocks(validatedBlocksDelivery) codex_block_exchange_blocks_received.inc(validatedBlocksDelivery.len.int64) - let peerCtx = b.peers.get(peer) + let peerCtx = self.peers.get(peer) if peerCtx != nil: - await b.payForBlocks(peerCtx, blocksDelivery) + await self.payForBlocks(peerCtx, blocksDelivery) ## shouldn't we remove them from the want-list instead of this: peerCtx.cleanPresence(blocksDelivery.mapIt(it.address)) -proc wantListHandler*(b: BlockExcEngine, peer: PeerId, wantList: WantList) {.async.} = - let peerCtx = b.peers.get(peer) +proc wantListHandler*( + self: BlockExcEngine, peer: PeerId, wantList: WantList +) {.async.} = + trace "Received want list from peer", peer, wantList = wantList.entries.len + + let peerCtx = self.peers.get(peer) if peerCtx.isNil: return @@ -435,9 +432,14 @@ proc wantListHandler*(b: BlockExcEngine, peer: PeerId, wantList: WantList) {.asy if idx < 0: # Adding new entry to peer wants let - have = await e.address in b.localStore - price = @(b.pricing.get(Pricing(price: 0.u256)).price.toBytesBE) + have = await e.address in self.localStore + price = @(self.pricing.get(Pricing(price: 0.u256)).price.toBytesBE) + + if e.cancel: + trace "Received cancelation for untracked block, skipping", address = e.address + continue + trace "Processing want list entry", wantList = $e case e.wantType of WantType.WantHave: if have: @@ -453,7 +455,6 @@ proc wantListHandler*(b: BlockExcEngine, peer: PeerId, wantList: WantList) {.asy address: e.address, `type`: BlockPresenceType.DontHave, price: price ) ) - peerCtx.peerWants.add(e) codex_block_exchange_want_have_lists_received.inc() of WantType.WantBlock: @@ -465,73 +466,76 @@ proc wantListHandler*(b: BlockExcEngine, peer: PeerId, wantList: WantList) {.asy if e.cancel: trace "Canceling want for block", address = e.address peerCtx.peerWants.del(idx) + trace "Canceled block request", address = e.address, len = peerCtx.peerWants.len else: + if e.wantType == WantType.WantBlock: + schedulePeer = true # peer might want to ask for the same cid with # different want params trace "Updating want for block", address = e.address peerCtx.peerWants[idx] = e # update entry + trace "Updated block request", address = e.address, len = peerCtx.peerWants.len if presence.len > 0: trace "Sending presence to remote", items = presence.mapIt($it).join(",") - await b.network.request.sendPresence(peer, presence) + await self.network.request.sendPresence(peer, presence) - if schedulePeer: - if not b.scheduleTask(peerCtx): - warn "Unable to schedule task for peer", peer + if schedulePeer and not self.scheduleTask(peerCtx): + warn "Unable to schedule task for peer", peer -proc accountHandler*(engine: BlockExcEngine, peer: PeerId, account: Account) {.async.} = - let context = engine.peers.get(peer) +proc accountHandler*(self: BlockExcEngine, peer: PeerId, account: Account) {.async.} = + let context = self.peers.get(peer) if context.isNil: return context.account = account.some proc paymentHandler*( - engine: BlockExcEngine, peer: PeerId, payment: 
SignedState + self: BlockExcEngine, peer: PeerId, payment: SignedState ) {.async.} = trace "Handling payments", peer - without context =? engine.peers.get(peer).option and account =? context.account: + without context =? self.peers.get(peer).option and account =? context.account: trace "No context or account for peer", peer return if channel =? context.paymentChannel: let sender = account.address - discard engine.wallet.acceptPayment(channel, Asset, sender, payment) + discard self.wallet.acceptPayment(channel, Asset, sender, payment) else: - context.paymentChannel = engine.wallet.acceptChannel(payment).option + context.paymentChannel = self.wallet.acceptChannel(payment).option -proc setupPeer*(b: BlockExcEngine, peer: PeerId) {.async.} = +proc setupPeer*(self: BlockExcEngine, peer: PeerId) {.async.} = ## Perform initial setup, such as want ## list exchange ## trace "Setting up peer", peer - if peer notin b.peers: + if peer notin self.peers: trace "Setting up new peer", peer - b.peers.add(BlockExcPeerCtx(id: peer)) - trace "Added peer", peers = b.peers.len + self.peers.add(BlockExcPeerCtx(id: peer)) + trace "Added peer", peers = self.peers.len # broadcast our want list, the other peer will do the same - if b.pendingBlocks.wantListLen > 0: + if self.pendingBlocks.wantListLen > 0: trace "Sending our want list to a peer", peer - let cids = toSeq(b.pendingBlocks.wantList) - await b.network.request.sendWantList(peer, cids, full = true) + let cids = toSeq(self.pendingBlocks.wantList) + await self.network.request.sendWantList(peer, cids, full = true) - if address =? b.pricing .? address: - await b.network.request.sendAccount(peer, Account(address: address)) + if address =? self.pricing .? address: + await self.network.request.sendAccount(peer, Account(address: address)) -proc dropPeer*(b: BlockExcEngine, peer: PeerId) = +proc dropPeer*(self: BlockExcEngine, peer: PeerId) = ## Cleanup disconnected peer ## trace "Dropping peer", peer # drop the peer from the peers table - b.peers.remove(peer) + self.peers.remove(peer) -proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = +proc taskHandler*(self: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = # Send to the peer blocks he wants to get, # if they present in our local store @@ -554,14 +558,14 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = proc localLookup(e: WantListEntry): Future[?!BlockDelivery] {.async.} = if e.address.leaf: - (await b.localStore.getBlockAndProof(e.address.treeCid, e.address.index)).map( + (await self.localStore.getBlockAndProof(e.address.treeCid, e.address.index)).map( (blkAndProof: (Block, CodexProof)) => BlockDelivery( address: e.address, blk: blkAndProof[0], proof: blkAndProof[1].some ) ) else: - (await b.localStore.getBlock(e.address)).map( + (await self.localStore.getBlock(e.address)).map( (blk: Block) => BlockDelivery(address: e.address, blk: blk, proof: CodexProof.none) ) @@ -580,22 +584,22 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = if blocksDelivery.len > 0: trace "Sending blocks to peer", peer = task.id, blocks = (blocksDelivery.mapIt(it.address)) - await b.network.request.sendBlocksDelivery(task.id, blocksDelivery) + await self.network.request.sendBlocksDelivery(task.id, blocksDelivery) codex_block_exchange_blocks_sent.inc(blocksDelivery.len.int64) task.peerWants.keepItIf(it.address notin successAddresses) -proc blockexcTaskRunner(b: BlockExcEngine) {.async: (raises: []).} = +proc blockexcTaskRunner(self: 
BlockExcEngine) {.async: (raises: []).} =
   ## process tasks
   ##
 
   trace "Starting blockexc task runner"
   while self.blockexcRunning:
     try:
       let peerCtx = await self.taskQueue.pop()
 
       await self.taskHandler(peerCtx)
     except CancelledError:
       break # do not propagate as blockexcTaskRunner was asyncSpawned
     except CatchableError as e:
@@ -613,55 +617,51 @@ proc new*(
     peerStore: PeerCtxStore,
     pendingBlocks: PendingBlocksManager,
     concurrentTasks = DefaultConcurrentTasks,
-    peersPerRequest = DefaultMaxPeersPerRequest,
-    blockFetchTimeout = DefaultBlockTimeout,
 ): BlockExcEngine =
   ## Create new block exchange engine instance
   ##
 
-  let engine = BlockExcEngine(
+  let self = BlockExcEngine(
     localStore: localStore,
     peers: peerStore,
     pendingBlocks: pendingBlocks,
-    peersPerRequest: peersPerRequest,
     network: network,
     wallet: wallet,
     concurrentTasks: concurrentTasks,
-    trackedFutures: TrackedFutures.new(),
+    trackedFutures: TrackedFutures(),
     taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize),
     discovery: discovery,
     advertiser: advertiser,
-    blockFetchTimeout: blockFetchTimeout,
   )
 
   proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} =
     if event.kind == PeerEventKind.Joined:
-      await engine.setupPeer(peerId)
+      await self.setupPeer(peerId)
     else:
-      engine.dropPeer(peerId)
+      self.dropPeer(peerId)
 
   if not isNil(network.switch):
     network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined)
     network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left)
 
   proc blockWantListHandler(peer: PeerId, wantList: WantList): Future[void] {.gcsafe.} =
-    engine.wantListHandler(peer, wantList)
+    self.wantListHandler(peer, wantList)
 
   proc blockPresenceHandler(
       peer: PeerId, presence: seq[BlockPresence]
   ): Future[void] {.gcsafe.} =
-    engine.blockPresenceHandler(peer, presence)
+    self.blockPresenceHandler(peer, presence)
 
   proc blocksDeliveryHandler(
       peer: PeerId, blocksDelivery: seq[BlockDelivery]
   ): Future[void] {.gcsafe.} =
-    engine.blocksDeliveryHandler(peer, blocksDelivery)
+    self.blocksDeliveryHandler(peer, blocksDelivery)
 
   proc accountHandler(peer: PeerId, account: Account): Future[void] {.gcsafe.} =
-    engine.accountHandler(peer, account)
+    self.accountHandler(peer, account)
 
   proc paymentHandler(peer: PeerId, payment: SignedState): Future[void] {.gcsafe.} =
-    engine.paymentHandler(peer, payment)
+    self.paymentHandler(peer, payment)
 
   network.handlers = BlockExcHandlers(
     onWantList: blockWantListHandler,
@@ -671,4 +671,4 @@ proc new*(
     onPayment: paymentHandler,
   )
 
-  return engine
+  return self

From cce0b17787053d2140081f2ca5f774f999c47399 Mon Sep 17 00:00:00 2001
From: Dmitriy Ryajov
Date: Mon, 17 Feb 2025 18:40:56 -0600
Subject: [PATCH 06/29] avoid useless asyncSpawn

---
 codex/blockexchange/engine/discovery.nim | 2 --
 1 file changed, 2 deletions(-)

diff --git a/codex/blockexchange/engine/discovery.nim b/codex/blockexchange/engine/discovery.nim
index ba773ac56..59b51f02a 100644
--- a/codex/blockexchange/engine/discovery.nim
+++ b/codex/blockexchange/engine/discovery.nim
@@ -143,8 +143,6 @@ proc start*(b: DiscoveryEngine) {.async.} =
     asyncSpawn fut
 
   b.discoveryLoop = b.discoveryQueueLoop()
-  b.trackedFutures.track(b.discoveryLoop)
-  asyncSpawn b.discoveryLoop
 
 proc stop*(b: DiscoveryEngine) {.async.} =
   ## Stop the discovery engine
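
A recurring change in patches 05 and 06 is replacing fire-and-forget asyncSpawn with trackedFutures.track(...), so that stop can cancel whatever is still running. Below is a rough sketch of what such a tracker has to do, written against chronos; it illustrates the pattern only and is not the actual TrackedFutures implementation:

  import std/sequtils
  import pkg/chronos

  type Tracker = ref object
    futures: seq[FutureBase] # futures that have not yet settled

  proc track(t: Tracker, fut: FutureBase) =
    # Remember a spawned future and forget it once it settles, so
    # shutdown only has to cancel work that is still pending.
    t.futures.add(fut)
    fut.addCallback(
      proc(arg: pointer) {.gcsafe, raises: [].} =
        t.futures.keepItIf(it != fut)
    )

  proc cancelTracked(t: Tracker) {.async.} =
    # Cancel anything still in flight; called once from `stop`.
    let pending = t.futures # copy: completion callbacks mutate the seq
    for fut in pending:
      await fut.cancelAndWait()
    t.futures.setLen(0)

This is also why the engine's stop proc above cancels tracked work first and only then shuts down the network, discovery and advertiser.

From 5507e88b32d3743e9680db32356dfea76e1e0f3c Mon Sep 17 00:00:00 2001
From: Dmitriy Ryajov
Date: Mon, 17 Feb 2025 18:41:21 -0600
Subject: [PATCH 07/29] track retries

--- 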
codex/blockexchange/engine/pendingblocks.nim | 147 +++++++++++-------- 1 file changed, 86 insertions(+), 61 deletions(-) diff --git a/codex/blockexchange/engine/pendingblocks.nim b/codex/blockexchange/engine/pendingblocks.nim index 3b69e2d2a..090811425 100644 --- a/codex/blockexchange/engine/pendingblocks.nim +++ b/codex/blockexchange/engine/pendingblocks.nim @@ -7,13 +7,11 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +{.push raises: [].} + import std/tables import std/monotimes - -import pkg/upraises - -push: - {.upraises: [].} +import std/strutils import pkg/chronos import pkg/libp2p @@ -34,66 +32,75 @@ declareGauge( codex_block_exchange_retrieval_time_us, "codex blockexchange block retrieval time us" ) -const DefaultBlockTimeout* = 10.minutes +const + DefaultBlockRetries* = 3000 + DefaultRetryInterval* = 500.millis type + RetriesExhaustedError* = object of CatchableError + BlockReq* = object - handle*: Future[Block] + handle*: Future[Block].Raising([CancelledError, RetriesExhaustedError]) inFlight*: bool + blockRetries*: int startTime*: int64 PendingBlocksManager* = ref object of RootObj + blockRetries*: int = DefaultBlockRetries + retryInterval*: Duration = DefaultRetryInterval blocks*: Table[BlockAddress, BlockReq] # pending Block requests proc updatePendingBlockGauge(p: PendingBlocksManager) = codex_block_exchange_pending_block_requests.set(p.blocks.len.int64) proc getWantHandle*( - p: PendingBlocksManager, - address: BlockAddress, - timeout = DefaultBlockTimeout, - inFlight = false, -): Future[Block] {.async.} = + self: PendingBlocksManager, address: BlockAddress, inFlight = false +): Future[Block] {.async: (raw: true, raises: [CancelledError, RetriesExhaustedError]).} = ## Add an event for a block ## - try: - if address notin p.blocks: - p.blocks[address] = BlockReq( - handle: newFuture[Block]("pendingBlocks.getWantHandle"), - inFlight: inFlight, - startTime: getMonoTime().ticks, - ) - - p.updatePendingBlockGauge() - return await p.blocks[address].handle.wait(timeout) - except CancelledError as exc: - trace "Blocks cancelled", exc = exc.msg, address - raise exc - except CatchableError as exc: - error "Pending WANT failed or expired", exc = exc.msg - # no need to cancel, it is already cancelled by wait() - raise exc - finally: - p.blocks.del(address) - p.updatePendingBlockGauge() + self.blocks.withValue(address, blk): + return blk[].handle + do: + let blk = BlockReq( + handle: newFuture[Block]("pendingBlocks.getWantHandle"), + inFlight: inFlight, + blockRetries: self.blockRetries, + startTime: getMonoTime().ticks, + ) + self.blocks[address] = blk + let handle = blk.handle + + proc cleanUpBlock(data: pointer) {.raises: [].} = + self.blocks.del(address) + self.updatePendingBlockGauge() + + handle.addCallback(cleanUpBlock) + handle.cancelCallback = proc(data: pointer) {.raises: [].} = + if not handle.finished: + handle.removeCallback(cleanUpBlock) + cleanUpBlock(nil) + + self.updatePendingBlockGauge() + return handle proc getWantHandle*( - p: PendingBlocksManager, cid: Cid, timeout = DefaultBlockTimeout, inFlight = false -): Future[Block] = - p.getWantHandle(BlockAddress.init(cid), timeout, inFlight) + self: PendingBlocksManager, cid: Cid, inFlight = false +): Future[Block] {.async: (raw: true, raises: [CancelledError, RetriesExhaustedError]).} = + self.getWantHandle(BlockAddress.init(cid), inFlight) proc resolve*( - p: PendingBlocksManager, blocksDelivery: seq[BlockDelivery] + self: PendingBlocksManager, blocksDelivery: 
seq[BlockDelivery]
 ) {.gcsafe, raises: [].} =
   ## Resolve pending blocks
   ##
   for bd in blocksDelivery:
-    p.blocks.withValue(bd.address, blockReq):
-      if not blockReq.handle.finished:
+    self.blocks.withValue(bd.address, blockReq):
+      if not blockReq[].handle.finished:
+        trace "Resolving pending block", address = bd.address
         let
-          startTime = blockReq.startTime
+          startTime = blockReq[].startTime
           stopTime = getMonoTime().ticks
           retrievalDurationUs = (stopTime - startTime) div 1000
@@ -106,52 +113,70 @@ proc resolve*(
       else:
         trace "Block handle already finished", address = bd.address
 
-proc setInFlight*(p: PendingBlocksManager, address: BlockAddress, inFlight = true) =
+func retries*(self: PendingBlocksManager, address: BlockAddress): int =
+  self.blocks.withValue(address, pending):
+    result = pending[].blockRetries
+  do:
+    result = 0
+
+func decRetries*(self: PendingBlocksManager, address: BlockAddress) =
+  self.blocks.withValue(address, pending):
+    pending[].blockRetries -= 1
+
+func retriesExhausted*(self: PendingBlocksManager, address: BlockAddress): bool =
+  self.blocks.withValue(address, pending):
+    result = pending[].blockRetries <= 0
+
+func setInFlight*(self: PendingBlocksManager, address: BlockAddress, inFlight = true) =
   ## Set inflight status for a block
   ##
-  p.blocks.withValue(address, pending):
+  self.blocks.withValue(address, pending):
     pending[].inFlight = inFlight
 
-proc isInFlight*(p: PendingBlocksManager, address: BlockAddress): bool =
+func isInFlight*(self: PendingBlocksManager, address: BlockAddress): bool =
  ## Check if a block is in flight
  ##
-  p.blocks.withValue(address, pending):
+  self.blocks.withValue(address, pending):
     result = pending[].inFlight
 
-proc contains*(p: PendingBlocksManager, cid: Cid): bool =
-  BlockAddress.init(cid) in p.blocks
+func contains*(self: PendingBlocksManager, cid: Cid): bool =
+  BlockAddress.init(cid) in self.blocks
 
-proc contains*(p: PendingBlocksManager, address: BlockAddress): bool =
-  address in p.blocks
+func contains*(self: PendingBlocksManager, address: BlockAddress): bool =
+  address in self.blocks
 
-iterator wantList*(p: PendingBlocksManager): BlockAddress =
-  for a in p.blocks.keys:
+iterator wantList*(self: PendingBlocksManager): BlockAddress =
+  for a in self.blocks.keys:
     yield a
 
-iterator wantListBlockCids*(p: PendingBlocksManager): Cid =
-  for a in p.blocks.keys:
+iterator wantListBlockCids*(self: PendingBlocksManager): Cid =
+  for a in self.blocks.keys:
     if not a.leaf:
       yield a.cid
 
-iterator wantListCids*(p: PendingBlocksManager): Cid =
+iterator wantListCids*(self: PendingBlocksManager): Cid =
   var yieldedCids = initHashSet[Cid]()
-  for a in p.blocks.keys:
+  for a in self.blocks.keys:
     let cid = a.cidOrTreeCid
     if cid notin yieldedCids:
       yieldedCids.incl(cid)
       yield cid
 
-iterator wantHandles*(p: PendingBlocksManager): Future[Block] =
-  for v in p.blocks.values:
+iterator wantHandles*(self: PendingBlocksManager): Future[Block] =
+  for v in self.blocks.values:
     yield v.handle
 
-proc wantListLen*(p: PendingBlocksManager): int =
-  p.blocks.len
+proc wantListLen*(self: PendingBlocksManager): int =
+  self.blocks.len
 
-func len*(p: PendingBlocksManager): int =
-  p.blocks.len
+func len*(self: PendingBlocksManager): int =
+  self.blocks.len
 
-func new*(T: type PendingBlocksManager): PendingBlocksManager =
-  PendingBlocksManager()
+func new*(
+    T: type PendingBlocksManager,
+    retries = DefaultBlockRetries,
+    interval = DefaultRetryInterval,
+): PendingBlocksManager =
+  PendingBlocksManager(blockRetries: retries, retryInterval: interval)
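
Patch 07 keeps the retry budget inside the table entry, right next to the handle. Below is a toy version of that bookkeeping over std/tables, keyed by a plain string for brevity; it is simplified relative to the real manager, which keys on BlockAddress and also stores the future and the in-flight flag:

  import std/tables

  type
    Entry = object
      retriesLeft: int

    Manager = object
      retryBudget: int
      entries: Table[string, Entry]

  proc want(m: var Manager, address: string) =
    # Register a pending address with a full retry budget.
    m.entries[address] = Entry(retriesLeft: m.retryBudget)

  proc decRetries(m: var Manager, address: string) =
    m.entries.withValue(address, e):
      e[].retriesLeft -= 1

  proc retriesExhausted(m: Manager, address: string): bool =
    # Unknown addresses are never exhausted, mirroring the patch,
    # which only fails requests it is actually tracking.
    m.entries.getOrDefault(address, Entry(retriesLeft: 1)).retriesLeft <= 0

  when isMainModule:
    var m = Manager(retryBudget: 3)
    m.want("block-1")
    for _ in 0 ..< 3:
      m.decRetries("block-1") # one decrement per retry cycle
    assert m.retriesExhausted("block-1")

The cleanup callback attached in getWantHandle is what guarantees the entry, and with it the retry counter, disappears as soon as the future completes or is cancelled.

From 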
6b4e587ad1bbfd2a9ffc678b5aac020d9d403ec4 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 17 Feb 2025 18:42:43 -0600 Subject: [PATCH 08/29] limit max inflight and set libp2p maxIncomingStreams --- codex/blockexchange/network/network.nim | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/codex/blockexchange/network/network.nim b/codex/blockexchange/network/network.nim index ecb728901..ceec0d313 100644 --- a/codex/blockexchange/network/network.nim +++ b/codex/blockexchange/network/network.nim @@ -315,15 +315,20 @@ proc new*( T: type BlockExcNetwork, switch: Switch, connProvider: ConnProvider = nil, - maxInflight = MaxInflight, + maxInflight = DefaultMaxInflight, ): BlockExcNetwork = ## Create a new BlockExcNetwork instance ## let self = BlockExcNetwork( - switch: switch, getConn: connProvider, inflightSema: newAsyncSemaphore(maxInflight) + switch: switch, + getConn: connProvider, + inflightSema: newAsyncSemaphore(maxInflight), + maxInflight: maxInflight, ) + self.maxIncomingStreams = self.maxInflight + proc sendWantList( id: PeerId, cids: seq[BlockAddress], From 2a44d5d5d2d557f989f18d41ca1046fd89e5b3a6 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 17 Feb 2025 18:43:05 -0600 Subject: [PATCH 09/29] cleanup --- codex/blockexchange/network/network.nim | 54 ++++++++++++++++--------- 1 file changed, 35 insertions(+), 19 deletions(-) diff --git a/codex/blockexchange/network/network.nim b/codex/blockexchange/network/network.nim index ceec0d313..daf358de3 100644 --- a/codex/blockexchange/network/network.nim +++ b/codex/blockexchange/network/network.nim @@ -21,17 +21,18 @@ import ../../blocktype as bt import ../../logutils import ../protobuf/blockexc as pb import ../protobuf/payments +import ../../utils/trackedfutures import ./networkpeer -export network, payments +export networkpeer, payments logScope: topics = "codex blockexcnetwork" const Codec* = "/codex/blockexc/1.0.0" - MaxInflight* = 100 + DefaultMaxInflight* = 100 type WantListHandler* = proc(peer: PeerId, wantList: WantList): Future[void] {.gcsafe.} @@ -82,6 +83,8 @@ type request*: BlockExcRequest getConn: ConnProvider inflightSema: AsyncSemaphore + maxInflight: int = DefaultMaxInflight + trackedFutures*: TrackedFutures = TrackedFutures() proc peerId*(b: BlockExcNetwork): PeerId = ## Return peer id @@ -220,23 +223,25 @@ proc handlePayment( if not network.handlers.onPayment.isNil: await network.handlers.onPayment(peer.id, payment) -proc rpcHandler(b: BlockExcNetwork, peer: NetworkPeer, msg: Message) {.raises: [].} = +proc rpcHandler( + b: BlockExcNetwork, peer: NetworkPeer, msg: Message +) {.async: (raises: [CatchableError]).} = ## handle rpc messages ## if msg.wantList.entries.len > 0: - asyncSpawn b.handleWantList(peer, msg.wantList) + b.trackedFutures.track(b.handleWantList(peer, msg.wantList)) if msg.payload.len > 0: - asyncSpawn b.handleBlocksDelivery(peer, msg.payload) + b.trackedFutures.track(b.handleBlocksDelivery(peer, msg.payload)) if msg.blockPresences.len > 0: - asyncSpawn b.handleBlockPresence(peer, msg.blockPresences) + b.trackedFutures.track(b.handleBlockPresence(peer, msg.blockPresences)) if account =? Account.init(msg.account): - asyncSpawn b.handleAccount(peer, account) + b.trackedFutures.track(b.handleAccount(peer, account)) if payment =? 
SignedState.init(msg.payment): - asyncSpawn b.handlePayment(peer, payment) + b.trackedFutures.track(b.handlePayment(peer, payment)) proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerId): NetworkPeer = ## Creates or retrieves a BlockExcNetwork Peer @@ -247,6 +252,7 @@ proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerId): NetworkPeer = var getConn: ConnProvider = proc(): Future[Connection] {.async, gcsafe, closure.} = try: + trace "Getting new connection stream", peer return await b.switch.dial(peer, Codec) except CancelledError as error: raise error @@ -256,8 +262,10 @@ proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerId): NetworkPeer = if not isNil(b.getConn): getConn = b.getConn - let rpcHandler = proc(p: NetworkPeer, msg: Message) {.async.} = - b.rpcHandler(p, msg) + let rpcHandler = proc( + p: NetworkPeer, msg: Message + ) {.async: (raises: [CatchableError]).} = + await b.rpcHandler(p, msg) # create new pubsub peer let blockExcPeer = NetworkPeer.new(peer, getConn, rpcHandler) @@ -282,34 +290,42 @@ proc dialPeer*(b: BlockExcNetwork, peer: PeerRecord) {.async.} = trace "Skipping dialing self", peer = peer.peerId return + if peer.peerId in b.peers: + trace "Already connected to peer", peer = peer.peerId + return + await b.switch.connect(peer.peerId, peer.addresses.mapIt(it.address)) proc dropPeer*(b: BlockExcNetwork, peer: PeerId) = ## Cleanup disconnected peer ## + trace "Dropping peer", peer b.peers.del(peer) -method init*(b: BlockExcNetwork) = +method init*(self: BlockExcNetwork) = ## Perform protocol initialization ## proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} = if event.kind == PeerEventKind.Joined: - b.setupPeer(peerId) + self.setupPeer(peerId) else: - b.dropPeer(peerId) + self.dropPeer(peerId) - b.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined) - b.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left) + self.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined) + self.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left) - proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = + proc handler(conn: Connection, proto: string) {.async.} = let peerId = conn.peerId - let blockexcPeer = b.getOrCreatePeer(peerId) + let blockexcPeer = self.getOrCreatePeer(peerId) await blockexcPeer.readLoop(conn) # attach read loop - b.handler = handle - b.codec = Codec + self.handler = handler + self.codec = Codec + +proc stop*(self: BlockExcNetwork) {.async: (raises: []).} = + await self.trackedFutures.cancelTracked() proc new*( T: type BlockExcNetwork, From 255edf87faa8a65f93b09ae0de4798b453d7f5c3 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 17 Feb 2025 18:43:22 -0600 Subject: [PATCH 10/29] add basic yield in readLoop --- codex/blockexchange/network/networkpeer.nim | 29 ++++++++++++++------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/codex/blockexchange/network/networkpeer.nim b/codex/blockexchange/network/networkpeer.nim index 90c538ea0..4a100340a 100644 --- a/codex/blockexchange/network/networkpeer.nim +++ b/codex/blockexchange/network/networkpeer.nim @@ -22,39 +22,56 @@ import ../../logutils logScope: topics = "codex blockexcnetworkpeer" +const DefaultYieldInterval = 50.millis + type ConnProvider* = proc(): Future[Connection] {.gcsafe, closure.} - RPCHandler* = proc(peer: NetworkPeer, msg: Message): Future[void] {.gcsafe.} + RPCHandler* = proc( + peer: NetworkPeer, msg: Message + ): Future[void].Raising(CatchableError) {.gcsafe.} NetworkPeer* = ref object of RootObj id*: 
PeerId handler*: RPCHandler sendConn: Connection getConn: ConnProvider + yieldInterval*: Duration = DefaultYieldInterval proc connected*(b: NetworkPeer): bool = not (isNil(b.sendConn)) and not (b.sendConn.closed or b.sendConn.atEof) proc readLoop*(b: NetworkPeer, conn: Connection) {.async.} = if isNil(conn): + trace "No connection to read from", peer = b.id return + trace "Attaching read loop", peer = b.id, connId = conn.oid try: + var nextYield = Moment.now() + b.yieldInterval while not conn.atEof or not conn.closed: + if Moment.now() > nextYield: + nextYield = Moment.now() + b.yieldInterval + trace "Yielding in read loop", + peer = b.id, nextYield = nextYield, interval = b.yieldInterval + await sleepAsync(10.millis) + let data = await conn.readLp(MaxMessageSize.int) msg = Message.protobufDecode(data).mapFailure().tryGet() + trace "Received message", peer = b.id, connId = conn.oid await b.handler(b, msg) except CancelledError: trace "Read loop cancelled" except CatchableError as err: warn "Exception in blockexc read loop", msg = err.msg finally: + trace "Detaching read loop", peer = b.id, connId = conn.oid await conn.close() proc connect*(b: NetworkPeer): Future[Connection] {.async.} = if b.connected: + trace "Already connected", peer = b.id, connId = b.sendConn.oid return b.sendConn b.sendConn = await b.getConn() @@ -68,17 +85,9 @@ proc send*(b: NetworkPeer, msg: Message) {.async.} = warn "Unable to get send connection for peer message not sent", peer = b.id return + trace "Sending message", peer = b.id, connId = conn.oid await conn.writeLp(protobufEncode(msg)) -proc broadcast*(b: NetworkPeer, msg: Message) = - proc sendAwaiter() {.async.} = - try: - await b.send(msg) - except CatchableError as exc: - warn "Exception broadcasting message to peer", peer = b.id, exc = exc.msg - - asyncSpawn sendAwaiter() - func new*( T: type NetworkPeer, peer: PeerId, From 4cf466e8bcdce3543ad55cf0ad44ec3462a5aa79 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 17 Feb 2025 18:43:38 -0600 Subject: [PATCH 11/29] use tuple instead of object --- codex/blockexchange/peers/peerctxstore.nim | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/codex/blockexchange/peers/peerctxstore.nim b/codex/blockexchange/peers/peerctxstore.nim index 7cf167b4e..739d92b5b 100644 --- a/codex/blockexchange/peers/peerctxstore.nim +++ b/codex/blockexchange/peers/peerctxstore.nim @@ -10,6 +10,7 @@ import std/sequtils import std/tables import std/algorithm +import std/sequtils import pkg/upraises @@ -33,9 +34,7 @@ type PeerCtxStore* = ref object of RootObj peers*: OrderedTable[PeerId, BlockExcPeerCtx] - PeersForBlock* = object of RootObj - with*: seq[BlockExcPeerCtx] - without*: seq[BlockExcPeerCtx] + PeersForBlock* = tuple[with: seq[BlockExcPeerCtx], without: seq[BlockExcPeerCtx]] iterator items*(self: PeerCtxStore): BlockExcPeerCtx = for p in self.peers.values: @@ -47,6 +46,9 @@ proc contains*(a: openArray[BlockExcPeerCtx], b: PeerId): bool = a.anyIt(it.id == b) +func peerIds*(self: PeerCtxStore): seq[PeerId] = + toSeq(self.peers.keys) + func contains*(self: PeerCtxStore, peerId: PeerId): bool = peerId in self.peers @@ -75,7 +77,7 @@ func peersWant*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] = toSeq(self.peers.values).filterIt(it.peerWants.anyIt(it.address.cidOrTreeCid == cid)) proc getPeersForBlock*(self: PeerCtxStore, address: BlockAddress): PeersForBlock = - var res = PeersForBlock() + var res: PeersForBlock = (@[], @[]) for peer in self: if peer.peerHave.anyIt(it == address): 
res.with.add(peer) From 5611cf45964617911c82c6b88baf2e52b1bb2b12 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 17 Feb 2025 18:46:15 -0600 Subject: [PATCH 12/29] cleanup imports and logs --- .gitignore | 2 ++ codex/rest/api.nim | 6 +++--- codex/slots/builder/builder.nim | 2 +- codex/utils/asyncspawn.nim | 10 ---------- codex/utils/natutils.nim | 3 ++- vendor/codex-contracts-eth | 2 +- vendor/nim-ethers | 2 +- vendor/nim-serde | 2 +- 8 files changed, 11 insertions(+), 18 deletions(-) delete mode 100644 codex/utils/asyncspawn.nim diff --git a/.gitignore b/.gitignore index 0e1f27dbe..f6292ddac 100644 --- a/.gitignore +++ b/.gitignore @@ -45,3 +45,5 @@ docker/prometheus-data .DS_Store nim.cfg tests/integration/logs + +data/ diff --git a/codex/rest/api.nim b/codex/rest/api.nim index 8ba1abaea..77807c7e1 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -13,8 +13,8 @@ push: {.upraises: [].} import std/sequtils -import mimetypes -import os +import std/mimetypes +import std/os import pkg/questionable import pkg/questionable/results @@ -120,7 +120,7 @@ proc retrieveCid( await resp.finish() codex_api_downloads.inc() except CatchableError as exc: - warn "Excepting streaming blocks", exc = exc.msg + warn "Error streaming blocks", exc = exc.msg resp.status = Http500 return await resp.sendBody("") finally: diff --git a/codex/slots/builder/builder.nim b/codex/slots/builder/builder.nim index 74597ff1b..30332f1c0 100644 --- a/codex/slots/builder/builder.nim +++ b/codex/slots/builder/builder.nim @@ -189,7 +189,7 @@ proc getCellHashes*[T, H]( blkIdx = blkIdx pos = i - trace "Getting block CID for tree at index", index = blkIdx + trace "Getting block CID for tree at index" without (_, tree) =? (await self.buildBlockTree(blkIdx, i)) and digest =? tree.root, err: error "Failed to get block CID for tree at index", err = err.msg diff --git a/codex/utils/asyncspawn.nim b/codex/utils/asyncspawn.nim deleted file mode 100644 index 95a9f0144..000000000 --- a/codex/utils/asyncspawn.nim +++ /dev/null @@ -1,10 +0,0 @@ -import pkg/chronos - -proc asyncSpawn*(future: Future[void], ignore: type CatchableError) = - proc ignoringError() {.async.} = - try: - await future - except ignore: - discard - - asyncSpawn ignoringError() diff --git a/codex/utils/natutils.nim b/codex/utils/natutils.nim index 43909588b..996d8dd01 100644 --- a/codex/utils/natutils.nim +++ b/codex/utils/natutils.nim @@ -1,6 +1,7 @@ {.push raises: [].} -import std/[tables, hashes], pkg/results, stew/shims/net as stewNet, chronos, chronicles +import + std/[tables, hashes], pkg/results, pkg/stew/shims/net as stewNet, chronos, chronicles import pkg/libp2p diff --git a/vendor/codex-contracts-eth b/vendor/codex-contracts-eth index ff82c26b3..e74d3397a 160000 --- a/vendor/codex-contracts-eth +++ b/vendor/codex-contracts-eth @@ -1 +1 @@ -Subproject commit ff82c26b3669b52a09280c634141dace7f04659a +Subproject commit e74d3397a133eaf1eb95d9ce59f56747a7c8c30b diff --git a/vendor/nim-ethers b/vendor/nim-ethers index d2b11a865..1cfccb969 160000 --- a/vendor/nim-ethers +++ b/vendor/nim-ethers @@ -1 +1 @@ -Subproject commit d2b11a865796a55296027f8ffba68398035ad435 +Subproject commit 1cfccb9695fa47860bf7ef3d75da9019096a3933 diff --git a/vendor/nim-serde b/vendor/nim-serde index 69a7a0111..c82e85c62 160000 --- a/vendor/nim-serde +++ b/vendor/nim-serde @@ -1 +1 @@ -Subproject commit 69a7a0111addaa4aad885dd4bd7b5ee4684a06de +Subproject commit c82e85c62436218592fbe876df5ac389ef8b964b From ad8340a5f4aac0b6518bfd0d2cdf5af1f67b91ce Mon Sep 17 00:00:00 2001 From: 
Dmitriy Ryajov Date: Mon, 17 Feb 2025 18:46:27 -0600 Subject: [PATCH 13/29] increase defaults --- codex/stores/maintenance.nim | 4 ++-- codex/stores/repostore/types.nim | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/codex/stores/maintenance.nim b/codex/stores/maintenance.nim index e7ce1bdfa..cced5da90 100644 --- a/codex/stores/maintenance.nim +++ b/codex/stores/maintenance.nim @@ -22,8 +22,8 @@ import ../logutils import ../systemclock const - DefaultBlockMaintenanceInterval* = 10.minutes - DefaultNumberOfBlocksToMaintainPerInterval* = 1000 + DefaultBlockInterval* = 10.minutes + DefaultNumBlocksPerInterval* = 1000 type BlockMaintainer* = ref object of RootObj repoStore: RepoStore diff --git a/codex/stores/repostore/types.nim b/codex/stores/repostore/types.nim index 3d455d12c..42f528e94 100644 --- a/codex/stores/repostore/types.nim +++ b/codex/stores/repostore/types.nim @@ -21,8 +21,8 @@ import ../../systemclock import ../../units const - DefaultBlockTtl* = 24.hours - DefaultQuotaBytes* = 8.GiBs + DefaultBlockTtl* = 30.days + DefaultQuotaBytes* = 20.GiBs type QuotaNotEnoughError* = object of CodexError From 267158758d2b83799ef494ef92559a892423c006 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 17 Feb 2025 18:46:40 -0600 Subject: [PATCH 14/29] wip --- codex/conf.nim | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/codex/conf.nim b/codex/conf.nim index 2a859efb8..986a53d6f 100644 --- a/codex/conf.nim +++ b/codex/conf.nim @@ -51,8 +51,8 @@ export units, net, codextypes, logutils, completeCmdArg, parseCmdArg, NatConfig export ValidationGroups, MaxSlots export - DefaultQuotaBytes, DefaultBlockTtl, DefaultBlockMaintenanceInterval, - DefaultNumberOfBlocksToMaintainPerInterval, DefaultRequestCacheSize + DefaultQuotaBytes, DefaultBlockTtl, DefaultBlockInterval, DefaultNumBlocksPerInterval, + DefaultRequestCacheSize type ThreadCount* = distinct Natural @@ -251,15 +251,15 @@ type desc: "Time interval in seconds - determines frequency of block " & "maintenance cycle: how often blocks are checked " & "for expiration and cleanup", - defaultValue: DefaultBlockMaintenanceInterval, - defaultValueDesc: $DefaultBlockMaintenanceInterval, + defaultValue: DefaultBlockInterval, + defaultValueDesc: $DefaultBlockInterval, name: "block-mi" .}: Duration blockMaintenanceNumberOfBlocks* {. 
desc: "Number of blocks to check every maintenance cycle", - defaultValue: DefaultNumberOfBlocksToMaintainPerInterval, - defaultValueDesc: $DefaultNumberOfBlocksToMaintainPerInterval, + defaultValue: DefaultNumBlocksPerInterval, + defaultValueDesc: $DefaultNumBlocksPerInterval, name: "block-mn" .}: int From f455022fefad4f60c1525ec95fbcbc4e4cce139a Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 17 Feb 2025 18:48:04 -0600 Subject: [PATCH 15/29] fix prefetch batching --- codex/node.nim | 60 +++++++++++++++++++++++------------ codex/stores/networkstore.nim | 8 +++++ 2 files changed, 47 insertions(+), 21 deletions(-) diff --git a/codex/node.nim b/codex/node.nim index 062ec2ced..805d6d14a 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -45,13 +45,14 @@ import ./utils import ./errors import ./logutils import ./utils/asynciter +import ./utils/trackedfutures export logutils logScope: topics = "codex node" -const FetchBatch = 200 +const DefaultFetchBatch = 10 type Contracts* = @@ -72,6 +73,7 @@ type clock*: Clock storage*: Contracts taskpool: Taskpool + trackedFutures: TrackedFutures CodexNodeRef* = ref CodexNode @@ -163,7 +165,7 @@ proc fetchBatched*( self: CodexNodeRef, cid: Cid, iter: Iter[int], - batchSize = FetchBatch, + batchSize = DefaultFetchBatch, onBatch: BatchProc = nil, ): Future[?!void] {.async, gcsafe.} = ## Fetch blocks in batches of `batchSize` @@ -179,7 +181,9 @@ proc fetchBatched*( let blocks = collect: for i in 0 ..< batchSize: if not iter.finished: - self.networkStore.getBlock(BlockAddress.init(cid, iter.next())) + let address = BlockAddress.init(cid, iter.next()) + if not await address in self.networkStore: + self.networkStore.getBlock(BlockAddress.init(cid, iter.next())) if blocksErr =? (await allFutureResult(blocks)).errorOption: return failure(blocksErr) @@ -188,18 +192,21 @@ proc fetchBatched*( batchErr =? (await onBatch(blocks.mapIt(it.read.get))).errorOption: return failure(batchErr) + await sleepAsync(1.millis) + success() proc fetchBatched*( self: CodexNodeRef, manifest: Manifest, - batchSize = FetchBatch, + batchSize = DefaultFetchBatch, onBatch: BatchProc = nil, ): Future[?!void] = ## Fetch manifest in batches of `batchSize` ## - trace "Fetching blocks in batches of", size = batchSize + trace "Fetching blocks in batches of", + size = batchSize, blocksCount = manifest.blocksCount let iter = Iter[int].new(0 ..< manifest.blocksCount) self.fetchBatched(manifest.treeCid, iter, batchSize, onBatch) @@ -223,7 +230,7 @@ proc streamSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!LPStream] {.async finally: await stream.pushEof() - asyncSpawn streamOneBlock() + self.trackedFutures.track(streamOneBlock()) LPStream(stream).success proc streamEntireDataset( @@ -235,19 +242,27 @@ proc streamEntireDataset( if manifest.protected: # Retrieve, decode and save to the local store all EС groups - proc erasureJob(): Future[?!void] {.async.} = - # Spawn an erasure decoding job - let erasure = Erasure.new( - self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool - ) - without _ =? (await erasure.decode(manifest)), error: - error "Unable to erasure decode manifest", manifestCid, exc = error.msg - return failure(error) - - return success() + proc erasureJob(): Future[void] {.async.} = + try: + # Spawn an erasure decoding job + let erasure = Erasure.new( + self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool + ) + without _ =? 
(await erasure.decode(manifest)), error: + error "Unable to erasure decode manifest", manifestCid, exc = error.msg + except CatchableError as exc: + trace "Error erasure decoding manifest", manifestCid, exc = exc.msg + + self.trackedFutures.track(erasureJob()) + + proc prefetch() {.async: (raises: []).} = + try: + if err =? (await self.fetchBatched(manifest, DefaultFetchBatch)).errorOption: + error "Unable to fetch blocks", err = err.msg + except CatchableError as exc: + error "Error fetching blocks", exc = exc.msg - if err =? (await erasureJob()).errorOption: - return failure(err) + self.trackedFutures.track(prefetch()) # Retrieve all blocks of the dataset sequentially from the local store or network trace "Creating store stream for manifest", manifestCid @@ -755,6 +770,11 @@ proc start*(self: CodexNodeRef) {.async.} = proc stop*(self: CodexNodeRef) {.async.} = trace "Stopping node" + if not self.taskpool.isNil: + self.taskpool.shutdown() + + await self.trackedFutures.cancelTracked() + if not self.engine.isNil: await self.engine.stop() @@ -776,9 +796,6 @@ proc stop*(self: CodexNodeRef) {.async.} = if not self.networkStore.isNil: await self.networkStore.close - if not self.taskpool.isNil: - self.taskpool.shutdown() - proc new*( T: type CodexNodeRef, switch: Switch, @@ -800,4 +817,5 @@ proc new*( discovery: discovery, taskPool: taskpool, contracts: contracts, + trackedFutures: TrackedFutures(), ) diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim index faee36e1c..f94bca330 100644 --- a/codex/stores/networkstore.nim +++ b/codex/stores/networkstore.nim @@ -137,6 +137,14 @@ method hasBlock*(self: NetworkStore, cid: Cid): Future[?!bool] {.async.} = trace "Checking network store for block existence", cid return await self.localStore.hasBlock(cid) +method hasBlock*( + self: NetworkStore, tree: Cid, index: Natural +): Future[?!bool] {.async.} = + ## Check if the block exists in the blockstore + ## + trace "Checking network store for block existence", tree, index + return await self.localStore.hasBlock(tree, index) + method close*(self: NetworkStore): Future[void] {.async.} = ## Close the underlying local blockstore ## From b090162ab2f779b15adca30d8bbeec5c948df0a8 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 17 Feb 2025 18:48:24 -0600 Subject: [PATCH 16/29] cleanup --- codex/codex.nim | 2 +- codex/logutils.nim | 2 +- codex/rng.nim | 9 +++++++++ 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/codex/codex.nim b/codex/codex.nim index dc5773738..b89052059 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -311,7 +311,7 @@ proc new*( bufferSize = (1024 * 64), maxRequestBodySize = int.high, ) - .expect("Should start rest server!") + .expect("Should create rest server!") switch.mount(network) diff --git a/codex/logutils.nim b/codex/logutils.nim index b37f69526..e9604aba3 100644 --- a/codex/logutils.nim +++ b/codex/logutils.nim @@ -152,7 +152,7 @@ proc formatTextLineSeq*(val: seq[string]): string = template formatIt*(format: LogFormat, T: typedesc, body: untyped) = # Provides formatters for logging with Chronicles for the given type and # `LogFormat`. - # NOTE: `seq[T]`, `Option[T]`, and `seq[Option[T]]` are overriddden + # NOTE: `seq[T]`, `Option[T]`, and `seq[Option[T]]` are overridden # since the base `setProperty` is generic using `auto` and conflicts with # providing a generic `seq` and `Option` override. 
when format == LogFormat.json:

diff --git a/codex/rng.nim b/codex/rng.nim
index 9d82156ea..866d65f80 100644
--- a/codex/rng.nim
+++ b/codex/rng.nim
@@ -55,6 +55,15 @@ proc sample*[T](
       break

+proc sample*[T](
+    rng: Rng, sample: openArray[T], limit: int
+): seq[T] {.raises: [Defect, RngSampleError].} =
+  if limit > sample.len:
+    raise newException(RngSampleError, "Limit cannot be larger than sample!")
+
+  for _ in 0 ..< min(sample.len, limit):
+    result.add(rng.sample(sample, result))
+
 proc shuffle*[T](rng: Rng, a: var openArray[T]) =
   for i in countdown(a.high, 1):
     let j = rng.rand(i)

From f7dc789b25697b9ac1bf0fe84d9fbd67b2455a9a Mon Sep 17 00:00:00 2001
From: Dmitriy Ryajov
Date: Mon, 17 Feb 2025 18:48:37 -0600
Subject: [PATCH 17/29] decrease timeouts to speed up tests

---
 tests/codex/blockexchange/discovery/testdiscoveryengine.nim | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/codex/blockexchange/discovery/testdiscoveryengine.nim b/tests/codex/blockexchange/discovery/testdiscoveryengine.nim
index 904703a0f..937047264 100644
--- a/tests/codex/blockexchange/discovery/testdiscoveryengine.nim
+++ b/tests/codex/blockexchange/discovery/testdiscoveryengine.nim
@@ -76,7 +76,7 @@ asyncchecksuite "Test Discovery Engine":
     )

     await discoveryEngine.start()
-    await allFuturesThrowing(allFinished(wants)).wait(1.seconds)
+    await allFuturesThrowing(allFinished(wants)).wait(100.millis)
     await discoveryEngine.stop()

   test "Should queue discovery request":
@@ -101,7 +101,7 @@
     await discoveryEngine.start()
     discoveryEngine.queueFindBlocksReq(@[blocks[0].cid])

-    await want.wait(1.seconds)
+    await want.wait(100.millis)
     await discoveryEngine.stop()

   test "Should not request more than minPeersPerBlock":

From a806be0975ed56965dd9ac310307126a07f19aa5 Mon Sep 17 00:00:00 2001
From: Dmitriy Ryajov
Date: Mon, 17 Feb 2025 18:48:57 -0600
Subject: [PATCH 18/29] remove outdated test

---
 .../blockexchange/engine/testblockexc.nim | 44 ++-----------------
 1 file changed, 4 insertions(+), 40 deletions(-)

diff --git a/tests/codex/blockexchange/engine/testblockexc.nim b/tests/codex/blockexchange/engine/testblockexc.nim
index aa15f795c..f87affa5f 100644
--- a/tests/codex/blockexchange/engine/testblockexc.nim
+++ b/tests/codex/blockexchange/engine/testblockexc.nim
@@ -1,5 +1,6 @@
 import std/sequtils
 import std/algorithm
+import std/importutils

 import pkg/chronos
 import pkg/stew/byteutils
@@ -56,7 +57,7 @@ asyncchecksuite "NetworkStore engine - 2 nodes":
       nodeCmps2.switch.peerInfo.peerId, nodeCmps2.switch.peerInfo.addrs
     )

-    await sleepAsync(1.seconds) # give some time to exchange lists
+    await sleepAsync(100.millis) # give some time to exchange lists

     peerCtx2 = nodeCmps1.peerStore.get(nodeCmps2.switch.peerInfo.peerId)
     peerCtx1 = nodeCmps2.peerStore.get(nodeCmps1.switch.peerInfo.peerId)
@@ -75,7 +76,6 @@
   test "Should exchange blocks on connect":
     await allFuturesThrowing(allFinished(pendingBlocks1)).wait(10.seconds)
-
     await allFuturesThrowing(allFinished(pendingBlocks2)).wait(10.seconds)

     check:
@@ -178,7 +178,7 @@ asyncchecksuite "NetworkStore - multiple nodes":
      (await nodes[i div 
4].networkStore.engine.localStore.putBlock(blocks[i])).tryGet() await connectNodes(nodes) - await sleepAsync(1.seconds) + await sleepAsync(100.millis) await allFuturesThrowing(allFinished(pendingBlocks1), allFinished(pendingBlocks2)) check pendingBlocks1.mapIt(it.read) == blocks[0 .. 3] check pendingBlocks2.mapIt(it.read) == blocks[12 .. 15] - - test "Should actively cancel want-haves if block received from elsewhere": - let - # Peer wanting to download blocks - downloader = nodes[4] - # Bystander peer - gets block request but can't satisfy them - bystander = nodes[3] - # Holder of actual blocks - blockHolder = nodes[1] - - let aBlock = blocks[0] - (await blockHolder.engine.localStore.putBlock(aBlock)).tryGet() - - await connectNodes(@[downloader, bystander]) - # Downloader asks for block... - let blockRequest = downloader.engine.requestBlock(aBlock.cid) - - # ... and bystander learns that downloader wants it, but can't provide it. - check eventually( - bystander.engine.peers - .get(downloader.switch.peerInfo.peerId).peerWants - .filterIt(it.address == aBlock.address).len == 1 - ) - - # As soon as we connect the downloader to the blockHolder, the block should - # propagate to the downloader... - await connectNodes(@[downloader, blockHolder]) - check (await blockRequest).tryGet().cid == aBlock.cid - check (await downloader.engine.localStore.hasBlock(aBlock.cid)).tryGet() - - # ... and the bystander should have cancelled the want-have - check eventually( - bystander.engine.peers - .get(downloader.switch.peerInfo.peerId).peerWants - .filterIt(it.address == aBlock.address).len == 0 - ) From 616672d7553645fb7437600da6b9b3a5b489b606 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 17 Feb 2025 18:49:08 -0600 Subject: [PATCH 19/29] add retry tests --- .../codex/blockexchange/engine/testengine.nim | 173 +++++++++++++++++- 1 file changed, 171 insertions(+), 2 deletions(-) diff --git a/tests/codex/blockexchange/engine/testengine.nim b/tests/codex/blockexchange/engine/testengine.nim index f7cc82941..553beb935 100644 --- a/tests/codex/blockexchange/engine/testengine.nim +++ b/tests/codex/blockexchange/engine/testengine.nim @@ -20,6 +20,11 @@ import ../../../asynctest import ../../helpers import ../../examples +const NopSendWantCancellationsProc = proc( + id: PeerId, addresses: seq[BlockAddress] +) {.gcsafe, async.} = + discard + asyncchecksuite "NetworkStore engine basic": var rng: Rng @@ -292,7 +297,8 @@ asyncchecksuite "NetworkStore engine handlers": await done.wait(100.millis) test "Should handle block presence": - var handles: Table[Cid, Future[Block]] + var handles: + Table[Cid, Future[Block].Raising([CancelledError, RetriesExhaustedError])] proc sendWantList( id: PeerId, @@ -333,6 +339,10 @@ asyncchecksuite "NetworkStore engine handlers": blocksDelivery = blocks.mapIt(BlockDelivery(blk: it, address: it.address)) cancellations = newTable(blocks.mapIt((it.address, newFuture[void]())).toSeq) + peerCtx.blocks = blocks.mapIt( + (it.address, Presence(address: it.address, have: true, price: UInt256.example)) + ).toTable + proc sendWantCancellations( id: PeerId, addresses: seq[BlockAddress] ) {.gcsafe, async.} = @@ -344,9 +354,168 @@ asyncchecksuite "NetworkStore engine handlers": ) await engine.blocksDeliveryHandler(peerId, blocksDelivery) - discard await allFinished(pending) + discard await allFinished(pending).wait(100.millis) await allFuturesThrowing(cancellations.values().toSeq) +asyncchecksuite "Block Download": + var + rng: Rng + seckey: PrivateKey + peerId: PeerId + chunker: Chunker + 
wallet: WalletRef
+    blockDiscovery: Discovery
+    peerStore: PeerCtxStore
+    pendingBlocks: PendingBlocksManager
+    network: BlockExcNetwork
+    engine: BlockExcEngine
+    discovery: DiscoveryEngine
+    advertiser: Advertiser
+    peerCtx: BlockExcPeerCtx
+    localStore: BlockStore
+    blocks: seq[Block]
+
+  setup:
+    rng = Rng.instance()
+    chunker = RandomChunker.new(rng, size = 1024'nb, chunkSize = 256'nb)
+
+    while true:
+      let chunk = await chunker.getBytes()
+      if chunk.len <= 0:
+        break
+
+      blocks.add(Block.new(chunk).tryGet())
+
+    seckey = PrivateKey.random(rng[]).tryGet()
+    peerId = PeerId.init(seckey.getPublicKey().tryGet()).tryGet()
+    wallet = WalletRef.example
+    blockDiscovery = Discovery.new()
+    peerStore = PeerCtxStore.new()
+    pendingBlocks = PendingBlocksManager.new()
+
+    localStore = CacheStore.new()
+    network = BlockExcNetwork()
+
+    discovery =
+      DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks)
+
+    advertiser = Advertiser.new(localStore, blockDiscovery)
+
+    engine = BlockExcEngine.new(
+      localStore, wallet, network, discovery, advertiser, peerStore, pendingBlocks
+    )
+
+    peerCtx = BlockExcPeerCtx(id: peerId)
+    engine.peers.add(peerCtx)
+
+  test "Should exhaust retries":
+    var
+      retries = 2
+      address = BlockAddress.init(blocks[0].cid)
+
+    proc sendWantList(
+        id: PeerId,
+        addresses: seq[BlockAddress],
+        priority: int32 = 0,
+        cancel: bool = false,
+        wantType: WantType = WantType.WantHave,
+        full: bool = false,
+        sendDontHave: bool = false,
+    ) {.gcsafe, async.} =
+      check wantType == WantHave
+      check not engine.pendingBlocks.isInFlight(address)
+      check engine.pendingBlocks.retries(address) == retries
+      retries -= 1
+
+    engine.pendingBlocks.blockRetries = 2
+    engine.pendingBlocks.retryInterval = 10.millis
+    engine.network =
+      BlockExcNetwork(request: BlockExcRequest(sendWantList: sendWantList))
+
+    let pending = engine.requestBlock(address)
+
+    expect RetriesExhaustedError:
+      discard (await pending).tryGet()
+
+  test "Should retry block request":
+    var
+      address = BlockAddress.init(blocks[0].cid)
+      steps = newAsyncEvent()
+
+    proc sendWantList(
+        id: PeerId,
+        addresses: seq[BlockAddress],
+        priority: int32 = 0,
+        cancel: bool = false,
+        wantType: WantType = WantType.WantHave,
+        full: bool = false,
+        sendDontHave: bool = false,
+    ) {.gcsafe, async.} =
+      case wantType
+      of WantHave:
+        check engine.pendingBlocks.isInFlight(address) == false
+        check engine.pendingBlocks.retriesExhausted(address) == false
+        steps.fire()
+      of WantBlock:
+        check engine.pendingBlocks.isInFlight(address) == true
+        check engine.pendingBlocks.retriesExhausted(address) == false
+        steps.fire()
+
+    engine.pendingBlocks.blockRetries = 10
+    engine.pendingBlocks.retryInterval = 10.millis
+    engine.network = BlockExcNetwork(
+      request: BlockExcRequest(
+        sendWantList: sendWantList, sendWantCancellations: NopSendWantCancellationsProc
+      )
+    )
+
+    let pending = engine.requestBlock(address)
+    await steps.wait()
+
+    # add block presence for the requested blocks
+    peerCtx.blocks = blocks.mapIt(
+      (it.address, Presence(address: it.address, have: true, price: UInt256.example))
+    ).toTable
+
+    steps.clear()
+    await steps.wait()
+
+    await engine.blocksDeliveryHandler(
+      peerId, @[BlockDelivery(blk: blocks[0], address: address)]
+    )
+    check (await pending).tryGet() == blocks[0]
+
+  test "Should cancel block request":
+    var
+      address = BlockAddress.init(blocks[0].cid)
+      done = newFuture[void]()
+
+    proc sendWantList(
+        id: PeerId,
+        addresses: seq[BlockAddress],
+        priority: int32 = 0,
+        cancel: bool = false,
+        wantType: WantType = 
WantType.WantHave, + full: bool = false, + sendDontHave: bool = false, + ) {.gcsafe, async.} = + done.complete() + + engine.pendingBlocks.blockRetries = 10 + engine.pendingBlocks.retryInterval = 1.seconds + engine.network = BlockExcNetwork( + request: BlockExcRequest( + sendWantList: sendWantList, sendWantCancellations: NopSendWantCancellationsProc + ) + ) + + let pending = engine.requestBlock(address) + await done.wait(100.millis) + + pending.cancel() + expect CancelledError: + discard (await pending).tryGet() + asyncchecksuite "Task Handler": var rng: Rng From ac272924728c554f048d2bc38cc14c6a56573306 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 17 Feb 2025 18:49:30 -0600 Subject: [PATCH 20/29] should track retries --- .../codex/blockexchange/testpendingblocks.nim | 38 +++++++++++-------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/tests/codex/blockexchange/testpendingblocks.nim b/tests/codex/blockexchange/testpendingblocks.nim index 45b065c0e..259eb657a 100644 --- a/tests/codex/blockexchange/testpendingblocks.nim +++ b/tests/codex/blockexchange/testpendingblocks.nim @@ -28,7 +28,10 @@ checksuite "Pending Blocks": check blk.cid in pendingBlocks pendingBlocks.resolve(@[blk].mapIt(BlockDelivery(blk: it, address: it.address))) - check (await handle) == blk + await sleepAsync(0.millis) + # trigger the event loop, otherwise the block finishes before poll runs + let resolved = await handle + check resolved == blk check blk.cid notin pendingBlocks test "Should cancel want handle": @@ -41,20 +44,6 @@ checksuite "Pending Blocks": await handle.cancelAndWait() check blk.cid notin pendingBlocks - test "Should expire want handle": - let - pendingBlocks = PendingBlocksManager.new() - blk = bt.Block.new("Hello".toBytes).tryGet - handle = pendingBlocks.getWantHandle(blk.cid, 1.millis) - - check blk.cid in pendingBlocks - - await sleepAsync(10.millis) - expect AsyncTimeoutError: - discard await handle - - check blk.cid notin pendingBlocks - test "Should get wants list": let pendingBlocks = PendingBlocksManager.new() @@ -79,3 +68,22 @@ checksuite "Pending Blocks": check: (await allFinished(wantHandles)).mapIt($it.read.cid).sorted(cmp[string]) == (await allFinished(handles)).mapIt($it.read.cid).sorted(cmp[string]) + + test "Should handle retry counters": + let + pendingBlocks = PendingBlocksManager.new(3) + blk = bt.Block.new("Hello".toBytes).tryGet + address = BlockAddress.init(blk.cid) + handle = pendingBlocks.getWantHandle(blk.cid) + + check pendingBlocks.retries(address) == 3 + pendingBlocks.decRetries(address) + check pendingBlocks.retries(address) == 2 + pendingBlocks.decRetries(address) + check pendingBlocks.retries(address) == 1 + pendingBlocks.decRetries(address) + check pendingBlocks.retries(address) == 0 + check pendingBlocks.retriesExhausted(address) + + pendingBlocks.resolve(@[blk].mapIt(BlockDelivery(blk: it, address: it.address))) + check (await handle).cid == blk.cid From 7701e7570b2254b0332ddb6684cf0173968ee6f2 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 17 Feb 2025 18:56:00 -0600 Subject: [PATCH 21/29] remove useless test --- tests/codex/node/testnode.nim | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/tests/codex/node/testnode.nim b/tests/codex/node/testnode.nim index 379602322..6e638c212 100644 --- a/tests/codex/node/testnode.nim +++ b/tests/codex/node/testnode.nim @@ -64,21 +64,6 @@ asyncchecksuite "Test Node - Basic": check: fetched == manifest - test "Should not lookup non-existing blocks twice": - # 
https://github.com/codex-storage/nim-codex/issues/699 - let - cstore = CountingStore.new(engine, localStore) - node = CodexNodeRef.new(switch, cstore, engine, blockDiscovery, Taskpool.new()) - missingCid = - Cid.init("zDvZRwzmCvtiyubW9AecnxgLnXK8GrBvpQJBDzToxmzDN6Nrc2CZ").get() - - engine.blockFetchTimeout = timer.milliseconds(100) - - discard await node.retrieve(missingCid, local = false) - - let lookupCount = cstore.lookups.getOrDefault(missingCid) - check lookupCount == 1 - test "Block Batching": let manifest = await storeDataGetManifest(localStore, chunker) From 732de6c404e125adb52b3e0a03bb59453af94d38 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Tue, 18 Feb 2025 18:16:27 -0600 Subject: [PATCH 22/29] use correct block address (index was off by 1) --- codex/node.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/codex/node.nim b/codex/node.nim index 805d6d14a..5e440c78a 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -183,7 +183,7 @@ proc fetchBatched*( if not iter.finished: let address = BlockAddress.init(cid, iter.next()) if not await address in self.networkStore: - self.networkStore.getBlock(BlockAddress.init(cid, iter.next())) + self.networkStore.getBlock(address) if blocksErr =? (await allFutureResult(blocks)).errorOption: return failure(blocksErr) From 45af78a547acb06128bb9f693f2d1553c49568d4 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Tue, 18 Feb 2025 18:16:42 -0600 Subject: [PATCH 23/29] remove duplicate noop proc --- tests/codex/blockexchange/engine/testengine.nim | 5 ----- 1 file changed, 5 deletions(-) diff --git a/tests/codex/blockexchange/engine/testengine.nim b/tests/codex/blockexchange/engine/testengine.nim index 553beb935..78416f0fe 100644 --- a/tests/codex/blockexchange/engine/testengine.nim +++ b/tests/codex/blockexchange/engine/testengine.nim @@ -134,11 +134,6 @@ asyncchecksuite "NetworkStore engine handlers": localStore: BlockStore blocks: seq[Block] - const NopSendWantCancellationsProc = proc( - id: PeerId, addresses: seq[BlockAddress] - ) {.gcsafe, async.} = - discard - setup: rng = Rng.instance() chunker = RandomChunker.new(rng, size = 1024'nb, chunkSize = 256'nb) From aeb3ac6700d28bc2be6e7168f7d03d12c510b98b Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Wed, 19 Feb 2025 10:35:21 -0600 Subject: [PATCH 24/29] add BlockHandle type --- codex/blockexchange/engine/pendingblocks.nim | 3 ++- vendor/nim-serde | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/codex/blockexchange/engine/pendingblocks.nim b/codex/blockexchange/engine/pendingblocks.nim index 090811425..ce6381c0d 100644 --- a/codex/blockexchange/engine/pendingblocks.nim +++ b/codex/blockexchange/engine/pendingblocks.nim @@ -38,9 +38,10 @@ const type RetriesExhaustedError* = object of CatchableError + BlockHandle* = Future[Block].Raising([CancelledError, RetriesExhaustedError]) BlockReq* = object - handle*: Future[Block].Raising([CancelledError, RetriesExhaustedError]) + handle*: BlockHandle inFlight*: bool blockRetries*: int startTime*: int64 diff --git a/vendor/nim-serde b/vendor/nim-serde index c82e85c62..69a7a0111 160000 --- a/vendor/nim-serde +++ b/vendor/nim-serde @@ -1 +1 @@ -Subproject commit c82e85c62436218592fbe876df5ac389ef8b964b +Subproject commit 69a7a0111addaa4aad885dd4bd7b5ee4684a06de From 83bd438b8ab62892b6a079ceb6c1555606acb51e Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Wed, 19 Feb 2025 10:36:04 -0600 Subject: [PATCH 25/29] Use BlockHandle type --- tests/codex/blockexchange/engine/testblockexc.nim | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/tests/codex/blockexchange/engine/testblockexc.nim b/tests/codex/blockexchange/engine/testblockexc.nim index f87affa5f..0c250231a 100644 --- a/tests/codex/blockexchange/engine/testblockexc.nim +++ b/tests/codex/blockexchange/engine/testblockexc.nim @@ -21,7 +21,7 @@ asyncchecksuite "NetworkStore engine - 2 nodes": peerCtx1, peerCtx2: BlockExcPeerCtx pricing1, pricing2: Pricing blocks1, blocks2: seq[bt.Block] - pendingBlocks1, pendingBlocks2: seq[Future[bt.Block]] + pendingBlocks1, pendingBlocks2: seq[BlockHandle] setup: blocks1 = await makeRandomBlocks(datasetSize = 2048, blockSize = 256'nb) From b619482f2b23dbfdc9b23af3560c41fef50bee75 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Wed, 19 Feb 2025 10:42:54 -0600 Subject: [PATCH 26/29] add fetchLocal to control batching from local store --- codex/node.nim | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/codex/node.nim b/codex/node.nim index 5e440c78a..247643bef 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -167,6 +167,7 @@ proc fetchBatched*( iter: Iter[int], batchSize = DefaultFetchBatch, onBatch: BatchProc = nil, + fetchLocal = true, ): Future[?!void] {.async, gcsafe.} = ## Fetch blocks in batches of `batchSize` ## @@ -182,7 +183,7 @@ proc fetchBatched*( for i in 0 ..< batchSize: if not iter.finished: let address = BlockAddress.init(cid, iter.next()) - if not await address in self.networkStore: + if not (await address in self.networkStore) or fetchLocal: self.networkStore.getBlock(address) if blocksErr =? (await allFutureResult(blocks)).errorOption: @@ -201,6 +202,7 @@ proc fetchBatched*( manifest: Manifest, batchSize = DefaultFetchBatch, onBatch: BatchProc = nil, + fetchLocal = true, ): Future[?!void] = ## Fetch manifest in batches of `batchSize` ## @@ -209,7 +211,7 @@ proc fetchBatched*( size = batchSize, blocksCount = manifest.blocksCount let iter = Iter[int].new(0 ..< manifest.blocksCount) - self.fetchBatched(manifest.treeCid, iter, batchSize, onBatch) + self.fetchBatched(manifest.treeCid, iter, batchSize, onBatch, fetchLocal) proc streamSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!LPStream] {.async.} = ## Streams the contents of a single block. @@ -257,7 +259,9 @@ proc streamEntireDataset( proc prefetch() {.async: (raises: []).} = try: - if err =? (await self.fetchBatched(manifest, DefaultFetchBatch)).errorOption: + if err =? 
( + await self.fetchBatched(manifest, DefaultFetchBatch, fetchLocal = false) + ).errorOption: error "Unable to fetch blocks", err = err.msg except CatchableError as exc: error "Error fetching blocks", exc = exc.msg From 1c98b31dbb53dc7ad64a4fec2696cd9554a0775a Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Wed, 19 Feb 2025 10:43:22 -0600 Subject: [PATCH 27/29] add format target --- Makefile | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Makefile b/Makefile index 3dfe8e7ef..29d6c11de 100644 --- a/Makefile +++ b/Makefile @@ -229,6 +229,11 @@ nph/%: build-nph echo -e $(FORMAT_MSG) "nph/$*" && \ $(NPH) $* +format: + $(NPH) *.nim + $(NPH) codex/ + $(NPH) tests/ + clean-nph: rm -f $(NPH) From 999ed6ed8f5bd7b6d61ce1c2929b9420ede73968 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Wed, 19 Feb 2025 11:54:07 -0600 Subject: [PATCH 28/29] revert deps --- vendor/codex-contracts-eth | 2 +- vendor/nim-ethers | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/vendor/codex-contracts-eth b/vendor/codex-contracts-eth index e74d3397a..ff82c26b3 160000 --- a/vendor/codex-contracts-eth +++ b/vendor/codex-contracts-eth @@ -1 +1 @@ -Subproject commit e74d3397a133eaf1eb95d9ce59f56747a7c8c30b +Subproject commit ff82c26b3669b52a09280c634141dace7f04659a diff --git a/vendor/nim-ethers b/vendor/nim-ethers index 1cfccb969..d2b11a865 160000 --- a/vendor/nim-ethers +++ b/vendor/nim-ethers @@ -1 +1 @@ -Subproject commit 1cfccb9695fa47860bf7ef3d75da9019096a3933 +Subproject commit d2b11a865796a55296027f8ffba68398035ad435 From 2e97fe8df524561af7ffd33feacfedea126c7e95 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Wed, 19 Feb 2025 12:54:09 -0600 Subject: [PATCH 29/29] adjust quotaMaxBytes --- tests/integration/testrestapi.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/testrestapi.nim b/tests/integration/testrestapi.nim index 3918791e3..08be9c4ff 100644 --- a/tests/integration/testrestapi.nim +++ b/tests/integration/testrestapi.nim @@ -37,7 +37,7 @@ twonodessuite "REST API": let space = client1.space().tryGet() check: space.totalBlocks == 2 - space.quotaMaxBytes == 8589934592.NBytes + space.quotaMaxBytes == 21474836480.NBytes space.quotaUsedBytes == 65592.NBytes space.quotaReservedBytes == 12.NBytes
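A closing note on the retry surface these patches converge on:
`PendingBlocksManager.new` now takes the per-block retry budget, the
`retries`/`decRetries`/`retriesExhausted` procs expose it, and exhaustion
fails the typed `BlockHandle` (patch 24) with `RetriesExhaustedError`. A
condensed sketch of that lifecycle, mirroring the `testpendingblocks` case
above — the import paths are assumptions for illustration:

    import pkg/questionable/results # for tryGet; assumed helper import
    import pkg/stew/byteutils # for toBytes
    import pkg/codex/blocktype as bt
    import pkg/codex/blockexchange/engine/pendingblocks # assumed module path

    let
      pendingBlocks = PendingBlocksManager.new(3) # 3 retries per block, as in the test
      blk = bt.Block.new("Hello".toBytes).tryGet
      address = BlockAddress.init(blk.cid)
      handle: BlockHandle = pendingBlocks.getWantHandle(blk.cid) # typed handle from patch 24

    # each failed retry cycle decrements the per-address counter
    pendingBlocks.decRetries(address)
    doAssert pendingBlocks.retries(address) == 2

    # once the counter hits zero the engine stops retrying and fails the
    # handle with RetriesExhaustedError instead
    pendingBlocks.decRetries(address)
    pendingBlocks.decRetries(address)
    doAssert pendingBlocks.retriesExhausted(address)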
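Relatedly, the `fetchLocal` flag from patch 26 is what keeps a background
prefetch from re-touching data the node already holds: with
`fetchLocal = false` a batch only schedules `getBlock` for addresses missing
from the local store, while the default `true` keeps the original
fetch-everything behaviour. A caller-side sketch, assuming a running
`CodexNodeRef` and an already-fetched `Manifest` (both hypothetical here,
with logging and the `=?` helper in scope as they are in codex/node.nim):

    # Sketch of a prefetch job that only pulls blocks missing locally.
    proc prefetchMissing(node: CodexNodeRef, manifest: Manifest) {.async: (raises: []).} =
      try:
        if err =? (
          await node.fetchBatched(manifest, DefaultFetchBatch, fetchLocal = false)
        ).errorOption:
          error "Unable to prefetch blocks", err = err.msg
      except CatchableError as exc:
        error "Error prefetching blocks", exc = exc.msg

Such a job would be handed to `trackedFutures.track` like the prefetch in
patch 15, so `stop()` can cancel it via `cancelTracked` instead of leaking it.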