Skip to content

Commit

Permalink
WIP, generic coders & sample access functions
Browse files Browse the repository at this point in the history
  • Loading branch information
tbekas committed Nov 22, 2023
1 parent 70efd13 commit 3833072
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 80 deletions.
228 changes: 149 additions & 79 deletions codex/stores/repostore.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import pkg/upraises

push: {.upraises: [].}

import std/sugar

import pkg/chronos
import pkg/chronos/futures
import pkg/chronicles
Expand All @@ -29,6 +31,7 @@ import ../clock
import ../systemclock
import ../merkletree
import ../utils
import ../utils/genericcoders

export blocktype, cid

Expand All @@ -50,7 +53,7 @@ type
RepoStore* = ref object of BlockStore
postFixLen*: int
repoDs*: Datastore
metaDs*: Datastore
metaDs*: ConcurrentDatastore
clock: Clock
totalBlocks*: uint # number of blocks in the store
quotaMaxBytes*: uint # maximum available bytes
Expand All @@ -63,6 +66,19 @@ type
cid*: Cid
expiration*: SecondsSince1970

QuotaUsageMetadata* = object
quotaUsedBytes = int64
quotaReservedBytes = int64

BlockMetadata* = object
expiry: SecondsSince1970
size: int64 # TODO use NBytes
refCount: int64

LeafMetadata* =
blkCid: Cid
proof: MerkleProof

proc updateMetrics(self: RepoStore) =
codex_repostore_blocks.set(self.totalBlocks.int64)
codex_repostore_bytes_used.set(self.quotaUsedBytes.int64)
Expand All @@ -77,34 +93,20 @@ func available*(self: RepoStore): uint =
func available*(self: RepoStore, bytes: uint): bool =
return bytes < self.available()

proc encode(cidAndProof: (Cid, MerkleProof)): seq[byte] =
## Encodes a tuple of cid and merkle proof in a following format:
## | 8-bytes | n-bytes | remaining bytes |
## | n | cid | proof |
##
## where n is a size of cid
##
let
(cid, proof) = cidAndProof
cidBytes = cid.data.buffer
proofBytes = proof.encode
n = cidBytes.len
nBytes = n.uint64.toBytesBE

@nBytes & cidBytes & proofBytes

proc decode(_: type (Cid, MerkleProof), data: seq[byte]): ?!(Cid, MerkleProof) =
let
n = uint64.fromBytesBE(data[0..<sizeof(uint64)]).int
cid = ? Cid.init(data[sizeof(uint64)..<sizeof(uint64) + n]).mapFailure
proof = ? MerkleProof.decode(data[sizeof(uint64) + n..^1])
success((cid, proof))

proc decodeCid(_: type (Cid, MerkleProof), data: seq[byte]): ?!Cid =
let
n = uint64.fromBytesBE(data[0..<sizeof(uint64)]).int
cid = ? Cid.init(data[sizeof(uint64)..<sizeof(uint64) + n]).mapFailure
success(cid)
proc encode(cid: Cid): seq[byte] =
cid.data.buffer

proc decode(_: type Cid, bytes: seq[byte]): !?Cid =
Cid.init(bytes).mapFailure

proc encode(i: int64): seq[byte] =
@(cast[uint64](a).toBytesBE)

proc decode(_: type int64, bytes: seq[byte]): !?int64 =
if bytes.len >= sizeof(uint64):
success(cast[int64](uint64.fromBytesBE(bytes)))
else:
failure("Not enough bytes to decode `int64`")

method putBlockCidAndProof*(
self: RepoStore,
Expand All @@ -123,6 +125,7 @@ method putBlockCidAndProof*(

await self.metaDs.put(key, value)


proc getCidAndProof(
self: RepoStore,
treeCid: Cid,
Expand Down Expand Up @@ -153,7 +156,7 @@ proc getCid(
else:
return failure(err)

return (Cid, MerkleProof).decodeCid(value)
return (Cid, MerkleProof).decodeFirst(value)

method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} =
## Get a block from the blockstore
Expand Down Expand Up @@ -238,6 +241,8 @@ method ensureExpiry*(
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##

## TODO do a safe modify here

logScope:
cid = cid

Expand Down Expand Up @@ -273,13 +278,119 @@ proc persistTotalBlocksCount(self: RepoStore): Future[?!void] {.async.} =
return failure(err)
return success()

type
CountOp {.pure.} = enum
Inc, Dec, NoOp

func toDelta(op: CountOp): int64 =
if op == Inc:
1.int64
elif op == Dec:
-1.int64
else:
0.int64


proc updateTotalBlocksCount(self: RepoStore, op: CountOp): Future[?!void] {.async.} =
let res = await self.metaDs.modifyGet(CodexTotalBlocksKey, (maybeBytes: ?seq[byte]) =>
if bytes =? maybeBytes:
without count = int64.decode(bytes), err:
raise err

some(max(count + op.toDelta, 0).encode())
else:
some(max(op.toDelta, 0).encode())
)

# TODO update metrics

# TODO use `mapErr` instead of this stuff below?
if err =? res.errorOption:
return failure("Error updating quota, nested err: " & err.msg)

proc updateQuotaUsage(self: RepoStore, deltaUsed: int64 = 0, deltaReserved: int64 = 0): Future[?!void] {.async.} =
let res = await self.metaDs.modify(QuotaUsedKey, (maybeBytes: ?seq[byte]) =>
var
newUsed: int64
newReserved: int64

if bytes =? maybeBytes:
without usedAndReserved = (int64, int64).decode(bytes), err:
raise err

let (used, reserved) = usedAndReserved

newUsed = max(used + deltaUsed, 0)
newReserved = max(reserved + deltaReserved, 0)
else:
newUsed = max(deltaUsed, 0)
newReserved = max(deltaReserved, 0)

if newUsed + newReserved > self.quotaMaxBytes:
raise newException(QuotaUsedError, "Cannot store block, quota used!"))
else:
some((newUsed, newReserved).encode())
)

# TODO update metrics

# TODO use `mapErr` instead of this stuff below?
if err =? res.errorOption:
return failure("Error updating quota, nested err: " & err.msg)

proc updateBlockMetadata(self: RepoStore, cid: Cid, minExpiry: SecondsSince1970 = 0, op: CountOp = NoOp): Future[?!void] {.async.} =
without key =? createBlockExpirationMetadataKey(cid), err:
return failure(err)

let res = await self.metaDs.modify(key, (maybeBytes: ?seq[byte]) =>
if bytes =? maybeBytes:
without expAndRefCount = (SecondsSince1970, RefCount).decode(bytes), err:
raise err

let (expiry, refCount) = expAndRefCount

some((
max(expiry, minExpiry),
max(refCount + op.toDelta, 0)
).encode())
else:
some((minExpiry, 0).encode())
)

if err =? res.errorOption:
return failure("Error updating block metadata, nested err: " & err.msg)

proc tryDeleteBlockMetadata(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
without key =? createBlockExpirationMetadataKey(cid), err:
return failure(err)

let res = await self.metaDs.modify(key, (maybeBytes: ?seq[byte]) =>
if bytes =? maybeBytes:
without expAndRefCount = (SecondsSince1970, RefCount).decode(bytes), err:
raise err

let (_, refCount) = expAndRefCount

if refCount == 0:
(SecondsSince1970, RefCount).encode().none
else:
bytes.some
else:
(SecondsSince1970, RefCount).encode().none
)

if err =? res.errorOption:
return failure("Error deleting block metadata, nested err: " & err.msg)

method putBlock*(
self: RepoStore,
blk: Block,
ttl = Duration.none): Future[?!void] {.async.} =
## Put a block to the blockstore
##

## TODO use default ref count = 0

logScope:
cid = blk.cid

Expand All @@ -295,69 +406,26 @@ method putBlock*(
trace "Block already in store", cid = blk.cid
return success()

if (self.totalUsed + blk.data.len.uint) > self.quotaMaxBytes:
error "Cannot store block, quota used!", used = self.totalUsed
return failure(
newException(QuotaUsedError, "Cannot store block, quota used!"))

trace "Storing block with key", key

var
batch: seq[BatchEntry]

let
used = self.quotaUsedBytes + blk.data.len.uint

if err =? (await self.repoDs.put(key, blk.data)).errorOption:
trace "Error storing block", err = err.msg
if err =? (await self.updateQuotaUsage(deltaUsed = blk.data.len )).errorOption:
return failure(err)

trace "Updating quota", used
batch.add((QuotaUsedKey, @(used.uint64.toBytesBE)))

without blockExpEntry =? self.getBlockExpirationEntry(blk.cid, ttl), err:
trace "Unable to create block expiration metadata key", err = err.msg
if err =? (await self.repoDs.put(key, blk.data)).errorOption:
return failure(err)
batch.add(blockExpEntry)

if err =? (await self.metaDs.put(batch)).errorOption:
trace "Error updating quota bytes", err = err.msg

if err =? (await self.repoDs.delete(key)).errorOption:
trace "Error deleting block after failed quota update", err = err.msg
return failure(err)

if err =? (await self.updateTotalBlocksCount(Inc)).errorOption:
return failure(err)

self.quotaUsedBytes = used
inc self.totalBlocks
if isErr (await self.persistTotalBlocksCount()):
trace "Unable to update block total metadata"
return failure("Unable to update block total metadata")
# TODO handle rolling back changes made by other steps

self.updateMetrics()
return success()

proc updateQuotaBytesUsed(self: RepoStore, blk: Block): Future[?!void] {.async.} =
let used = self.quotaUsedBytes - blk.data.len.uint
if err =? (await self.metaDs.put(
QuotaUsedKey,
@(used.uint64.toBytesBE))).errorOption:
trace "Error updating quota key!", err = err.msg
return failure(err)
self.quotaUsedBytes = used
self.updateMetrics()
return success()

proc removeBlockExpirationEntry(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
without key =? createBlockExpirationMetadataKey(cid), err:
return failure(err)
return await self.metaDs.delete(key)

method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
## Delete a block from the blockstore
##

## TODO log if deleting a block with ref count != 0

logScope:
cid = cid

Expand Down Expand Up @@ -404,6 +472,8 @@ method delBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!void]
without cid =? (Cid, MerkleProof).decodeCid(value), err:
return failure(err)

# TODO decrement ref-count and delete block from disk only if ref-count is 0

if err =? (await self.delBlock(cid)).errorOption:
return failure(err)

Expand Down
29 changes: 29 additions & 0 deletions codex/utils/genericcoders.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import pkg/questionable
import pkg/questionable/results
import pkg/stew/endians2
import pkg/stew/byteutils

proc autoencode*[T: tuple | object](tup: T): seq[byte] =
var buffer = newSeq[byte]()
for f in fields(tup):
when (compiles do:
let _: seq[byte] = encode(f)):
let fBytes = encode(f)
buffer.add(cast[uint64](fBytes.len).toBytesBE) # TODO user varint
buffer.add(fBytes)
else:
{.error: "please provide `proc encode(a: " & $typeof(a) & "): seq[byte]`".}

return buffer

proc autodecode*(T: typedesc[tuple | object], bytes: seq[byte]): ?!T =
var
res = default(T)
offset = 0
for f in fields(res):
let fLen = cast[int](uint64.fromBytesBE(bytes[offset..<offset + sizeof(uint64)])) # TODO user varint
offset.inc(sizeof(uint64))
f = ? decode(typeof(f), bytes[offset..<offset + fLen])
offset.inc(fLen)

return success(res)
2 changes: 1 addition & 1 deletion vendor/nim-datastore

0 comments on commit 3833072

Please sign in to comment.