Skip to content

Commit

Permalink
Add update operation
Browse files Browse the repository at this point in the history
Make upsert return the updated/inserted record
  • Loading branch information
dacr committed Jan 21, 2024
1 parent 0313769 commit 7cccbf5
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 11 deletions.
10 changes: 10 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# ZIO-LMDB RELEASE NOTES

## 1.8.0 - 2024-01-21

- dependency updates
- add update operation
- `def update(key: RecordKey, modifier: T => T): IO[UpdateErrors, Option[T]]`
- will return None if no record was found
- change upsert method signature to return the updated/inserted record (instead of Unit previously)
- `def upsert(key: RecordKey, modifier: Option[T] => T): IO[UpsertErrors, T]`
- now the updated or inserted record is returned

## 1.7.1 - 2024-01-01

- add limit parameter to collection collect operation
Expand Down
8 changes: 4 additions & 4 deletions src/main/scala-3/zio/lmdb/LMDBIssues.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
package zio.lmdb

enum StorageUserError {
case CollectionAlreadExists(name: CollectionName) extends StorageUserError
case CollectionNotFound(name: CollectionName) extends StorageUserError
case JsonFailure(issue: String) extends StorageUserError
case OverSizedKey(id: String, expandedSize: Int, limit: Int) extends StorageUserError
case CollectionAlreadExists(name: CollectionName)
case CollectionNotFound(name: CollectionName)
case JsonFailure(issue: String)
case OverSizedKey(id: String, expandedSize: Int, limit: Int)
}

enum StorageSystemError {
Expand Down
1 change: 1 addition & 0 deletions src/main/scala-3/zio/lmdb/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ package object lmdb {
type CreateErrors = CollectionAlreadExists | StorageSystemError
type FetchErrors = OverSizedKey | CollectionNotFound | JsonFailure | StorageSystemError
type ContainsErrors = OverSizedKey | CollectionNotFound | StorageSystemError
type UpdateErrors = OverSizedKey | CollectionNotFound | JsonFailure | StorageSystemError
type UpsertErrors = OverSizedKey | CollectionNotFound | JsonFailure | StorageSystemError
type DeleteErrors = OverSizedKey | CollectionNotFound | JsonFailure | StorageSystemError
type CollectErrors = OverSizedKey | CollectionNotFound | JsonFailure | StorageSystemError
Expand Down
27 changes: 25 additions & 2 deletions src/main/scala/zio/lmdb/LMDB.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ trait LMDB {

def contains(collectionName: CollectionName, key: RecordKey): IO[ContainsErrors, Boolean]

def upsert[T](collectionName: CollectionName, key: RecordKey, modifier: Option[T] => T)(implicit je: JsonEncoder[T], jd: JsonDecoder[T]): IO[UpsertErrors, Unit]
def update[T](collectionName: CollectionName, key: RecordKey, modifier: T => T)(implicit je: JsonEncoder[T], jd: JsonDecoder[T]): IO[UpdateErrors, Option[T]]

def upsert[T](collectionName: CollectionName, key: RecordKey, modifier: Option[T] => T)(implicit je: JsonEncoder[T], jd: JsonDecoder[T]): IO[UpsertErrors, T]

def upsertOverwrite[T](collectionName: CollectionName, key: RecordKey, document: T)(implicit je: JsonEncoder[T], jd: JsonDecoder[T]): IO[UpsertErrors, Unit]

Expand Down Expand Up @@ -282,6 +284,24 @@ object LMDB {
*/
def contains(collectionName: CollectionName, key: RecordKey): ZIO[LMDB, ContainsErrors, Boolean] = ZIO.serviceWithZIO(_.contains(collectionName, key))

/** update atomically a record in a collection.
*
* @param collectionName
* the collection name
* @param key
* the key for the record upsert
* @param modifier
* the lambda used to update the record content
* @tparam T
* the data type of the record which must be Json serializable
* @returns
* the updated record if a record exists for the given key
*/

def update[T](collectionName: CollectionName, key: RecordKey, modifier: T => T)(implicit je: JsonEncoder[T], jd: JsonDecoder[T]): ZIO[LMDB, UpdateErrors, Option[T]] = {
ZIO.serviceWithZIO(_.update[T](collectionName, key, modifier))
}

/** update or insert atomically a record in a collection.
*
* @param collectionName
Expand All @@ -292,9 +312,12 @@ object LMDB {
* the lambda used to update the record content
* @tparam T
* the data type of the record which must be Json serializable
* @returns
* the updated or inserted record
*/
def upsert[T](collectionName: CollectionName, key: RecordKey, modifier: Option[T] => T)(implicit je: JsonEncoder[T], jd: JsonDecoder[T]): ZIO[LMDB, UpsertErrors, Unit] =
def upsert[T](collectionName: CollectionName, key: RecordKey, modifier: Option[T] => T)(implicit je: JsonEncoder[T], jd: JsonDecoder[T]): ZIO[LMDB, UpsertErrors, T] = {
ZIO.serviceWithZIO(_.upsert[T](collectionName, key, modifier))
}

/** Overwrite or insert a record in a collection. If the key is already being used for a record then the previous record will be overwritten by the new one.
*
Expand Down
15 changes: 14 additions & 1 deletion src/main/scala/zio/lmdb/LMDBCollection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,27 @@ case class LMDBCollection[T](name: String, lmdb: LMDB)(implicit je: JsonEncoder[
*/
def contains(key: RecordKey): IO[ContainsErrors, Boolean] = lmdb.contains(name, key)

/** update atomically a record in a collection.
*
* @param key
* the key for the record upsert
* @param modifier
* the lambda used to update the record content
* @returns
* the updated record if a record exists for the given key
*/
def update(key: RecordKey, modifier: T => T): IO[UpdateErrors, Option[T]] = lmdb.update(name, key, modifier)

/** update or insert atomically a record in a collection.
*
* @param key
* the key for the record upsert
* @param modifier
* the lambda used to update the record content
* @returns
* the updated or inserted record
*/
def upsert(key: RecordKey, modifier: Option[T] => T): IO[UpsertErrors, Unit] =
def upsert(key: RecordKey, modifier: Option[T] => T): IO[UpsertErrors, T] =
lmdb.upsert[T](name, key, modifier)

/** Overwrite or insert a record in a collection. If the key is already being used for a record then the previous record will be overwritten by the new one.
Expand Down
38 changes: 35 additions & 3 deletions src/main/scala/zio/lmdb/LMDBLive.scala
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,38 @@ class LMDBLive(
} yield result
}

override def update[T](collectionName: CollectionName, key: RecordKey, modifier: T => T)(implicit je: JsonEncoder[T], jd: JsonDecoder[T]): IO[UpdateErrors, Option[T]] = {
def updateLogic(collectionDbi: Dbi[ByteBuffer]): IO[UpdateErrors, Option[T]] = {
reentrantLock.withWriteLock(
withWriteTransaction(collectionName) { txn =>
for {
key <- makeKeyByteBuffer(key)
found <- ZIO.attemptBlocking(Option(collectionDbi.get(txn, key))).mapError(err => InternalError(s"Couldn't fetch $key for upsert on $collectionName", Some(err)))
mayBeRawValue <- ZIO.foreach(found)(_ => ZIO.succeed(txn.`val`()))
mayBeDocBefore <- ZIO.foreach(mayBeRawValue) { rawValue =>
ZIO.fromEither(charset.decode(rawValue).fromJson[T]).mapError[UpdateErrors](msg => JsonFailure(msg))
}
mayBeDocAfter = mayBeDocBefore.map(modifier)
_ <- ZIO.foreachDiscard(mayBeDocAfter) { docAfter =>
val jsonDocBytes = docAfter.toJson.getBytes(charset)
for {
valueBuffer <- ZIO.attemptBlocking(ByteBuffer.allocateDirect(jsonDocBytes.size)).mapError(err => InternalError("Couldn't allocate byte buffer for json value", Some(err)))
_ <- ZIO.attemptBlocking(valueBuffer.put(jsonDocBytes).flip).mapError(err => InternalError("Couldn't copy value bytes to buffer", Some(err)))
_ <- ZIO.attemptBlocking(collectionDbi.put(txn, key, valueBuffer)).mapError(err => InternalError(s"Couldn't upsert $key into $collectionName", Some(err)))
_ <- ZIO.attemptBlocking(txn.commit()).mapError(err => InternalError(s"Couldn't commit upsert $key into $collectionName", Some(err)))
} yield ()
}
} yield mayBeDocAfter
}
)
}

for {
collectionDbi <- getCollectionDbi(collectionName)
result <- updateLogic(collectionDbi)
} yield result
}

override def upsertOverwrite[T](colName: CollectionName, key: RecordKey, document: T)(implicit je: JsonEncoder[T], jd: JsonDecoder[T]): IO[UpsertErrors, Unit] = {
def upsertLogic(collectionDbi: Dbi[ByteBuffer]): IO[UpsertErrors, Unit] = {
reentrantLock.withWriteLock(
Expand All @@ -367,8 +399,8 @@ class LMDBLive(
} yield result
}

override def upsert[T](colName: CollectionName, key: RecordKey, modifier: Option[T] => T)(implicit je: JsonEncoder[T], jd: JsonDecoder[T]): IO[UpsertErrors, Unit] = {
def upsertLogic(collectionDbi: Dbi[ByteBuffer]): IO[UpsertErrors, Unit] = {
override def upsert[T](colName: CollectionName, key: RecordKey, modifier: Option[T] => T)(implicit je: JsonEncoder[T], jd: JsonDecoder[T]): IO[UpsertErrors, T] = {
def upsertLogic(collectionDbi: Dbi[ByteBuffer]): IO[UpsertErrors, T] = {
reentrantLock.withWriteLock(
withWriteTransaction(colName) { txn =>
for {
Expand All @@ -384,7 +416,7 @@ class LMDBLive(
_ <- ZIO.attemptBlocking(valueBuffer.put(jsonDocBytes).flip).mapError(err => InternalError("Couldn't copy value bytes to buffer", Some(err)))
_ <- ZIO.attemptBlocking(collectionDbi.put(txn, key, valueBuffer)).mapError(err => InternalError(s"Couldn't upsert $key into $colName", Some(err)))
_ <- ZIO.attemptBlocking(txn.commit()).mapError(err => InternalError(s"Couldn't commit upsert $key into $colName", Some(err)))
} yield ()
} yield docAfter
}
)
}
Expand Down
17 changes: 17 additions & 0 deletions src/test/scala/zio/lmdb/LMDBFeaturesSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,23 @@ object LMDBFeaturesSpec extends ZIOSpecDefault with Commons {
} @@ tag("slow"),
// -----------------------------------------------------------------------------
test("safe update in place") {
def modifier(from: Num): Num = Num(from.value.intValue() + 1)

for {
id <- randomUUID
count = limit
colName <- randomCollectionName
col <- LMDB.collectionCreate[Num](colName)
shouldBeEmpty <- col.update(id, modifier)
_ <- col.upsertOverwrite(id, Num(0))
_ <- ZIO.foreachDiscard(1.to(count))(i => col.update(id, modifier))
num <- col.fetch(id)
} yield assertTrue(
shouldBeEmpty.isEmpty,
num.map(_.value.intValue()).contains(count)
)
}, // -----------------------------------------------------------------------------
test("safe upsert in place") {
def modifier(from: Option[Num]): Num = from match {
case None => Num(1)
case Some(num) => Num(num.value.intValue() + 1)
Expand Down
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
ThisBuild / version := "1.7.2-SNAPSHOT"
ThisBuild / version := "1.8.0-SNAPSHOT"

0 comments on commit 7cccbf5

Please sign in to comment.