Skip to content

Commit

Permalink
Block deletion with ref count & repostore refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
tbekas committed Jun 18, 2024
1 parent 6e9bdf1 commit 5d4b971
Show file tree
Hide file tree
Showing 21 changed files with 952 additions and 866 deletions.
2 changes: 1 addition & 1 deletion codex/codex.nim
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ proc new*(
repoDs = repoData,
metaDs = LevelDbDatastore.new(config.dataDir / CodexMetaNamespace)
.expect("Should create metadata store!"),
quotaMaxBytes = config.storageQuota.uint,
quotaMaxBytes = config.storageQuota,
blockTtl = config.blockTtl)

maintenance = BlockMaintainer.new(
Expand Down
17 changes: 17 additions & 0 deletions codex/merkletree/codex/coders.nim
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ push: {.upraises: [].}
import pkg/libp2p
import pkg/questionable
import pkg/questionable/results
import pkg/stew/byteutils
import pkg/serde/json

import ../../units
import ../../errors
Expand Down Expand Up @@ -100,3 +102,18 @@ proc decode*(_: type CodexProof, data: seq[byte]): ?!CodexProof =
nodes.add node

CodexProof.init(mcodec, index.int, nleaves.int, nodes)

proc fromJson*(
_: type CodexProof,
json: JsonNode
): ?!CodexProof =
expectJsonKind(Cid, JString, json)
var bytes: seq[byte]
try:
bytes = hexToSeqByte(json.str)
except ValueError as err:
return failure(err)

Check warning on line 116 in codex/merkletree/codex/coders.nim

View check run for this annotation

Codecov / codecov/patch

codex/merkletree/codex/coders.nim#L114-L116

Added lines #L114 - L116 were not covered by tests
CodexProof.decode(bytes)

func `%`*(proof: CodexProof): JsonNode = % byteutils.toHex(proof.encode())
9 changes: 5 additions & 4 deletions codex/rest/json.nim
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import ../sales
import ../purchasing
import ../utils/json
import ../manifest
import ../units

export json

Expand Down Expand Up @@ -65,10 +66,10 @@ type
id*: NodeId

RestRepoStore* = object
totalBlocks* {.serialize.}: uint
quotaMaxBytes* {.serialize.}: uint
quotaUsedBytes* {.serialize.}: uint
quotaReservedBytes* {.serialize.}: uint
totalBlocks* {.serialize.}: Natural
quotaMaxBytes* {.serialize.}: NBytes
quotaUsedBytes* {.serialize.}: NBytes
quotaReservedBytes* {.serialize.}: NBytes

proc init*(_: type RestContentList, content: seq[RestContent]): RestContentList =
RestContentList(
Expand Down
34 changes: 17 additions & 17 deletions codex/sales/reservations.nim
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import ../stores
import ../market
import ../contracts/requests
import ../utils/json
import ../units

export requests
export logutils
Expand Down Expand Up @@ -166,16 +167,16 @@ func key*(availability: Availability): ?!Key =
func key*(reservation: Reservation): ?!Key =
return key(reservation.id, reservation.availabilityId)

func available*(self: Reservations): uint = self.repo.available
func available*(self: Reservations): uint = self.repo.available.uint

func hasAvailable*(self: Reservations, bytes: uint): bool =
self.repo.available(bytes)
self.repo.available(bytes.NBytes)

proc exists*(
self: Reservations,
key: Key): Future[bool] {.async.} =

let exists = await self.repo.metaDs.contains(key)
let exists = await self.repo.metaDs.ds.contains(key)
return exists

proc getImpl(
Expand All @@ -186,7 +187,7 @@ proc getImpl(
let err = newException(NotExistsError, "object with key " & $key & " does not exist")
return failure(err)

without serialized =? await self.repo.metaDs.get(key), error:
without serialized =? await self.repo.metaDs.ds.get(key), error:
return failure(error.toErr(GetFailedError))

return success serialized
Expand All @@ -213,7 +214,7 @@ proc updateImpl(
without key =? obj.key, error:
return failure(error)

if err =? (await self.repo.metaDs.put(
if err =? (await self.repo.metaDs.ds.put(
key,
@(obj.toJson.toBytes)
)).errorOption:
Expand Down Expand Up @@ -257,11 +258,11 @@ proc update*(
# Sizing of the availability changed, we need to adjust the repo reservation accordingly
if oldAvailability.totalSize != obj.totalSize:
if oldAvailability.totalSize < obj.totalSize: # storage added
if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint))).errorOption:
if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint).NBytes)).errorOption:
return failure(reserveErr.toErr(ReserveFailedError))

elif oldAvailability.totalSize > obj.totalSize: # storage removed
if reserveErr =? (await self.repo.release((oldAvailability.totalSize - obj.totalSize).truncate(uint))).errorOption:
if reserveErr =? (await self.repo.release((oldAvailability.totalSize - obj.totalSize).truncate(uint).NBytes)).errorOption:
return failure(reserveErr.toErr(ReleaseFailedError))

let res = await self.updateImpl(obj)
Expand Down Expand Up @@ -294,7 +295,7 @@ proc delete(
if not await self.exists(key):
return success()

if err =? (await self.repo.metaDs.delete(key)).errorOption:
if err =? (await self.repo.metaDs.ds.delete(key)).errorOption:

Check warning on line 298 in codex/sales/reservations.nim

View check run for this annotation

Codecov / codecov/patch

codex/sales/reservations.nim#L298

Added line #L298 was not covered by tests
return failure(err.toErr(DeleteFailedError))

return success()
Expand Down Expand Up @@ -333,7 +334,7 @@ proc deleteReservation*(
if updateErr =? (await self.update(availability)).errorOption:
return failure(updateErr)

if err =? (await self.repo.metaDs.delete(key)).errorOption:
if err =? (await self.repo.metaDs.ds.delete(key)).errorOption:
return failure(err.toErr(DeleteFailedError))

return success()
Expand All @@ -355,14 +356,14 @@ proc createAvailability*(
)
let bytes = availability.freeSize.truncate(uint)

if reserveErr =? (await self.repo.reserve(bytes)).errorOption:
if reserveErr =? (await self.repo.reserve(bytes.NBytes)).errorOption:
return failure(reserveErr.toErr(ReserveFailedError))

if updateErr =? (await self.update(availability)).errorOption:

# rollback the reserve
trace "rolling back reserve"
if rollbackErr =? (await self.repo.release(bytes)).errorOption:
if rollbackErr =? (await self.repo.release(bytes.NBytes)).errorOption:
rollbackErr.parent = updateErr
return failure(rollbackErr)

Expand Down Expand Up @@ -448,7 +449,7 @@ proc returnBytesToAvailability*(

# First lets see if we can re-reserve the bytes, if the Repo's quota
# is depleted then we will fail-fast as there is nothing to be done atm.
if reserveErr =? (await self.repo.reserve(bytesToBeReturned.truncate(uint))).errorOption:
if reserveErr =? (await self.repo.reserve(bytesToBeReturned.truncate(uint).NBytes)).errorOption:
return failure(reserveErr.toErr(ReserveFailedError))

without availabilityKey =? availabilityId.key, error:
Expand All @@ -463,7 +464,7 @@ proc returnBytesToAvailability*(
if updateErr =? (await self.update(availability)).errorOption:

trace "Rolling back returning bytes"
if rollbackErr =? (await self.repo.release(bytesToBeReturned.truncate(uint))).errorOption:
if rollbackErr =? (await self.repo.release(bytesToBeReturned.truncate(uint).NBytes)).errorOption:

Check warning on line 467 in codex/sales/reservations.nim

View check run for this annotation

Codecov / codecov/patch

codex/sales/reservations.nim#L467

Added line #L467 was not covered by tests
rollbackErr.parent = updateErr
return failure(rollbackErr)

Expand Down Expand Up @@ -497,7 +498,7 @@ proc release*(
"trying to release an amount of bytes that is greater than the total size of the Reservation")
return failure(error)

if releaseErr =? (await self.repo.release(bytes)).errorOption:
if releaseErr =? (await self.repo.release(bytes.NBytes)).errorOption:
return failure(releaseErr.toErr(ReleaseFailedError))

reservation.size -= bytes.u256
Expand All @@ -507,7 +508,7 @@ proc release*(

# rollback release if an update error encountered
trace "rolling back release"
if rollbackErr =? (await self.repo.reserve(bytes)).errorOption:
if rollbackErr =? (await self.repo.reserve(bytes.NBytes)).errorOption:
rollbackErr.parent = err
return failure(rollbackErr)
return failure(err)
Expand Down Expand Up @@ -537,7 +538,7 @@ proc storables(
else:
raiseAssert "unknown type"

without results =? await self.repo.metaDs.query(query), error:
without results =? await self.repo.metaDs.ds.query(query), error:
return failure(error)

# /sales/reservations
Expand Down Expand Up @@ -639,4 +640,3 @@ proc findAvailability*(
duration, availDuration = availability.duration,
minPrice, availMinPrice = availability.minPrice,
collateral, availMaxCollateral = availability.maxCollateral

12 changes: 6 additions & 6 deletions codex/stores/maintenance.nim
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ proc deleteExpiredBlock(self: BlockMaintainer, cid: Cid): Future[void] {.async.}
trace "Unable to delete block from repoStore"

proc processBlockExpiration(self: BlockMaintainer, be: BlockExpiration): Future[void] {.async} =
if be.expiration < self.clock.now:
if be.expiry < self.clock.now:
await self.deleteExpiredBlock(be.cid)
else:
inc self.offset
Expand All @@ -75,11 +75,11 @@ proc runBlockCheck(self: BlockMaintainer): Future[void] {.async.} =
return

var numberReceived = 0
for maybeBeFuture in iter:
if be =? await maybeBeFuture:
inc numberReceived
await self.processBlockExpiration(be)
await sleepAsync(50.millis)
for beFut in iter:
let be = await beFut
inc numberReceived
await self.processBlockExpiration(be)
await sleepAsync(1.millis) # cooperative scheduling

# If we received fewer blockExpirations from the iterator than we asked for,
# We're at the end of the dataset and should start from 0 next time.
Expand Down
2 changes: 1 addition & 1 deletion codex/stores/queryiterhelper.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import pkg/datastore/typedds

import ../utils/asynciter

type KeyVal[T] = tuple[key: Key, value: T]
type KeyVal*[T] = tuple[key: Key, value: T]

proc toAsyncIter*[T](
queryIter: QueryIter[T],
Expand Down
Loading

0 comments on commit 5d4b971

Please sign in to comment.