diff --git a/Cargo.lock b/Cargo.lock index b09d78b..eae0128 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -788,6 +788,7 @@ dependencies = [ "hyper", "hyper-proxy", "log", + "num-bigint", "prost", "rand", "reqwest", @@ -797,6 +798,7 @@ dependencies = [ "thiserror", "tokio", "uuid", + "v8_valueserializer", ] [[package]] @@ -857,6 +859,7 @@ dependencies = [ "tokio", "tokio-stream", "uuid", + "v8_valueserializer", ] [[package]] @@ -2474,6 +2477,21 @@ dependencies = [ "serde", ] +[[package]] +name = "v8_valueserializer" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97599c400fc79925922b58303e98fcb8fa88f573379a08ddb652e72cbd2e70f6" +dependencies = [ + "bitflags 2.4.1", + "encoding_rs", + "indexmap 2.1.0", + "num-bigint", + "serde", + "thiserror", + "wtf8", +] + [[package]] name = "vcpkg" version = "0.2.15" @@ -2725,6 +2743,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "wtf8" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c01ae8492c38f52376efd3a17d0994b6bcf3df1e39c0226d458b7d81670b2a06" + [[package]] name = "xmlparser" version = "0.13.6" diff --git a/Cargo.toml b/Cargo.toml index c16fdb8..c9ed9dc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,4 +47,5 @@ tokio = { version = "1.33.0", features = ["full"] } tokio-stream = "0.1" tokio-util = "0.7" url = "2" -uuid = { version = "1.4.1", features = ["v4", "serde"] } \ No newline at end of file +uuid = { version = "1.4.1", features = ["v4", "serde"] } +v8_valueserializer = "0.1.1" diff --git a/denokv/Cargo.toml b/denokv/Cargo.toml index be542c2..9fe4e3e 100644 --- a/denokv/Cargo.toml +++ b/denokv/Cargo.toml @@ -45,5 +45,7 @@ uuid.workspace = true [dev-dependencies] denokv_remote.workspace = true +num-bigint.workspace = true tempfile.workspace = true -reqwest.workspace = true \ No newline at end of file +reqwest.workspace = true +v8_valueserializer.workspace = true diff --git a/denokv/main.rs b/denokv/main.rs index e6123ec..fbe3a53 100644 --- a/denokv/main.rs +++ b/denokv/main.rs @@ -629,6 +629,9 @@ impl From for ApiError { ApiError::UnknownValueEncoding(encoding) } SqliteBackendError::TypeMismatch(msg) => ApiError::TypeMismatch(msg), + x @ SqliteBackendError::SumOutOfRange => { + ApiError::TypeMismatch(x.to_string()) + } } } } diff --git a/denokv/tests/integration.rs b/denokv/tests/integration.rs index 5c70c56..f86f1cf 100644 --- a/denokv/tests/integration.rs +++ b/denokv/tests/integration.rs @@ -5,9 +5,16 @@ use std::process::Stdio; use denokv_proto::AtomicWrite; use denokv_proto::Database; +use denokv_proto::KvValue; use denokv_proto::ReadRange; +use denokv_remote::RemotePermissions; use tokio::io::AsyncBufReadExt; use tokio::io::BufReader; +use v8_valueserializer::value_eq; +use v8_valueserializer::Heap; +use v8_valueserializer::Value; +use v8_valueserializer::ValueDeserializer; +use v8_valueserializer::ValueSerializer; const ACCESS_TOKEN: &str = "1234abcd5678efgh"; @@ -229,7 +236,12 @@ async fn sum_type_mismatch() { checks: vec![], mutations: vec![denokv_proto::Mutation { key: vec![1], - kind: denokv_proto::MutationKind::Sum(denokv_proto::KvValue::U64(1)), + kind: denokv_proto::MutationKind::Sum { + value: denokv_proto::KvValue::U64(1), + min_v8: vec![], + max_v8: vec![], + clamp: false, + }, expire_at: None, }], enqueues: vec![], @@ -240,4 +252,379 @@ async fn sum_type_mismatch() { assert!(err.to_string().contains( "Failed to perform 'sum' mutation on a non-U64 value in the database" )); + + let res = remote + .atomic_write(AtomicWrite { + checks: vec![], + mutations: vec![denokv_proto::Mutation { + key: vec![1], + kind: denokv_proto::MutationKind::Sum { + value: denokv_proto::KvValue::V8( + ValueSerializer::default() + .finish(&Heap::default(), &Value::BigInt(1.into())) + .unwrap(), + ), + min_v8: vec![], + max_v8: vec![], + clamp: false, + }, + expire_at: None, + }], + enqueues: vec![], + }) + .await; + + let err = res.unwrap_err(); + assert!(err.to_string().contains("unsupported value type")); +} + +#[tokio::test] +async fn sum_values() { + let (_child, addr) = start_server().await; + let client = reqwest::Client::new(); + let url = format!("http://localhost:{}", addr.port()).parse().unwrap(); + + let metadata_endpoint = denokv_remote::MetadataEndpoint { + url, + access_token: ACCESS_TOKEN.to_string(), + }; + + let remote = + denokv_remote::Remote::new(client, DummyPermissions, metadata_endpoint); + + // Sum(nothing, u64) -> u64 + let commit_result = remote + .atomic_write(AtomicWrite { + checks: vec![], + mutations: vec![denokv_proto::Mutation { + key: vec![1], + kind: denokv_proto::MutationKind::Sum { + value: denokv_proto::KvValue::U64(42), + min_v8: vec![], + max_v8: vec![], + clamp: false, + }, + expire_at: None, + }], + enqueues: vec![], + }) + .await + .unwrap() + .expect("commit success"); + assert_ne!(commit_result.versionstamp, [0; 10]); + + // Old sum semantics: u64 + u64 -> u64 + remote + .atomic_write(AtomicWrite { + checks: vec![], + mutations: vec![denokv_proto::Mutation { + key: vec![1], + kind: denokv_proto::MutationKind::Sum { + value: denokv_proto::KvValue::U64(1), + min_v8: vec![], + max_v8: vec![], + clamp: false, + }, + expire_at: None, + }], + enqueues: vec![], + }) + .await + .unwrap() + .expect("commit success"); + + let entry = read_key_1(&remote).await; + assert!(matches!(entry.value, denokv_proto::KvValue::U64(43))); + + // Backward compat: u64 + v8(bigint) -> u64 + remote + .atomic_write(AtomicWrite { + checks: vec![], + mutations: vec![denokv_proto::Mutation { + key: vec![1], + kind: denokv_proto::MutationKind::Sum { + value: denokv_proto::KvValue::V8( + ValueSerializer::default() + .finish(&Heap::default(), &Value::BigInt(1.into())) + .unwrap(), + ), + min_v8: vec![], + max_v8: vec![], + clamp: false, + }, + expire_at: None, + }], + enqueues: vec![], + }) + .await + .unwrap() + .expect("commit success"); + + let entry = read_key_1(&remote).await; + assert!(matches!(entry.value, denokv_proto::KvValue::U64(44))); + + // Reset to v8(bigint) + remote + .atomic_write(AtomicWrite { + checks: vec![], + mutations: vec![denokv_proto::Mutation { + key: vec![1], + kind: denokv_proto::MutationKind::Set(denokv_proto::KvValue::V8( + ValueSerializer::default() + .finish(&Heap::default(), &Value::BigInt(42.into())) + .unwrap(), + )), + expire_at: None, + }], + enqueues: vec![], + }) + .await + .unwrap() + .expect("commit success"); + let entry = read_key_1(&remote).await; + let KvValue::V8(value) = &entry.value else { + panic!("expected v8 value"); + }; + let (value, heap) = ValueDeserializer::default().read(value).unwrap(); + assert!(value_eq( + (&value, &heap), + (&Value::BigInt(42.into()), &heap) + )); + + // v8(bigint) + v8(bigint) -> v8(bigint) + remote + .atomic_write(AtomicWrite { + checks: vec![], + mutations: vec![denokv_proto::Mutation { + key: vec![1], + kind: denokv_proto::MutationKind::Sum { + value: denokv_proto::KvValue::V8( + ValueSerializer::default() + .finish(&Heap::default(), &Value::BigInt(1.into())) + .unwrap(), + ), + min_v8: vec![], + max_v8: vec![], + clamp: false, + }, + expire_at: None, + }], + enqueues: vec![], + }) + .await + .unwrap() + .expect("commit success"); + let entry = read_key_1(&remote).await; + let KvValue::V8(value) = &entry.value else { + panic!("expected v8 value"); + }; + let (value, heap) = ValueDeserializer::default().read(value).unwrap(); + assert!(value_eq( + (&value, &heap), + (&Value::BigInt(43.into()), &heap) + )); + + // v8(bigint) + v8(number) -> error + let res = remote + .atomic_write(AtomicWrite { + checks: vec![], + mutations: vec![denokv_proto::Mutation { + key: vec![1], + kind: denokv_proto::MutationKind::Sum { + value: denokv_proto::KvValue::V8( + ValueSerializer::default() + .finish(&Heap::default(), &Value::I32(1)) + .unwrap(), + ), + min_v8: vec![], + max_v8: vec![], + clamp: false, + }, + expire_at: None, + }], + enqueues: vec![], + }) + .await; + + let err = res.unwrap_err(); + assert!(err.to_string().contains("Cannot sum BigInt with Number")); + + // clamp=false error + let res = remote + .atomic_write(AtomicWrite { + checks: vec![], + mutations: vec![denokv_proto::Mutation { + key: vec![1], + kind: denokv_proto::MutationKind::Sum { + value: denokv_proto::KvValue::V8( + ValueSerializer::default() + .finish(&Heap::default(), &Value::BigInt(2.into())) + .unwrap(), + ), + min_v8: vec![], + max_v8: ValueSerializer::default() + .finish(&Heap::default(), &Value::BigInt(44.into())) + .unwrap(), + clamp: false, + }, + expire_at: None, + }], + enqueues: vec![], + }) + .await; + + let err = res.unwrap_err(); + assert!( + err + .to_string() + .contains("The result of a Sum operation would exceed its range limit"), + "{}", + err + ); + + // clamp=false ok + remote + .atomic_write(AtomicWrite { + checks: vec![], + mutations: vec![denokv_proto::Mutation { + key: vec![1], + kind: denokv_proto::MutationKind::Sum { + value: denokv_proto::KvValue::V8( + ValueSerializer::default() + .finish(&Heap::default(), &Value::BigInt(1.into())) + .unwrap(), + ), + min_v8: vec![], + max_v8: ValueSerializer::default() + .finish(&Heap::default(), &Value::BigInt(44.into())) + .unwrap(), + clamp: false, + }, + expire_at: None, + }], + enqueues: vec![], + }) + .await + .unwrap() + .expect("commit success"); + let entry = read_key_1(&remote).await; + let KvValue::V8(value) = &entry.value else { + panic!("expected v8 value"); + }; + let (value, heap) = ValueDeserializer::default().read(value).unwrap(); + assert!(value_eq( + (&value, &heap), + (&Value::BigInt(44.into()), &heap) + )); + + // clamp=true + remote + .atomic_write(AtomicWrite { + checks: vec![], + mutations: vec![denokv_proto::Mutation { + key: vec![1], + kind: denokv_proto::MutationKind::Sum { + value: denokv_proto::KvValue::V8( + ValueSerializer::default() + .finish(&Heap::default(), &Value::BigInt(10.into())) + .unwrap(), + ), + min_v8: vec![], + max_v8: ValueSerializer::default() + .finish(&Heap::default(), &Value::BigInt(50.into())) + .unwrap(), + clamp: true, + }, + expire_at: None, + }], + enqueues: vec![], + }) + .await + .unwrap() + .expect("commit success"); + let entry = read_key_1(&remote).await; + let KvValue::V8(value) = &entry.value else { + panic!("expected v8 value"); + }; + let (value, heap) = ValueDeserializer::default().read(value).unwrap(); + assert!(value_eq( + (&value, &heap), + (&Value::BigInt(50.into()), &heap) + )); + + // Sum(nothing, v8(bigint)) -> v8(bigint) + let commit_result = remote + .atomic_write(AtomicWrite { + checks: vec![], + mutations: vec![denokv_proto::Mutation { + key: vec![2], + kind: denokv_proto::MutationKind::Sum { + value: denokv_proto::KvValue::V8( + ValueSerializer::default() + .finish(&Heap::default(), &Value::BigInt(10.into())) + .unwrap(), + ), + min_v8: vec![], + max_v8: vec![], + clamp: false, + }, + expire_at: None, + }], + enqueues: vec![], + }) + .await + .unwrap() + .expect("commit success"); + assert_ne!(commit_result.versionstamp, [0; 10]); + + let ranges = remote + .snapshot_read( + vec![ReadRange { + start: vec![2], + end: vec![3], + limit: NonZeroU32::try_from(1).unwrap(), + reverse: false, + }], + denokv_proto::SnapshotReadOptions { + consistency: denokv_proto::Consistency::Strong, + }, + ) + .await + .unwrap(); + assert_eq!(ranges.len(), 1); + let range = ranges.into_iter().next().unwrap(); + assert_eq!(range.entries.len(), 1); + assert_eq!(range.entries[0].key, vec![2]); + let KvValue::V8(value) = &range.entries[0].value else { + panic!("expected v8 value"); + }; + let (value, heap) = ValueDeserializer::default().read(value).unwrap(); + assert!(value_eq( + (&value, &heap), + (&Value::BigInt(10.into()), &heap) + )); +} + +async fn read_key_1( + remote: &denokv_remote::Remote

, +) -> denokv_proto::KvEntry { + let ranges = remote + .snapshot_read( + vec![ReadRange { + start: vec![1], + end: vec![2], + limit: NonZeroU32::try_from(1).unwrap(), + reverse: false, + }], + denokv_proto::SnapshotReadOptions { + consistency: denokv_proto::Consistency::Strong, + }, + ) + .await + .unwrap(); + assert_eq!(ranges.len(), 1); + let range = ranges.into_iter().next().unwrap(); + assert_eq!(range.entries.len(), 1); + assert_eq!(range.entries[0].key, vec![1]); + range.entries.into_iter().next().unwrap() } diff --git a/proto/convert.rs b/proto/convert.rs index 62a3c95..0d86453 100644 --- a/proto/convert.rs +++ b/proto/convert.rs @@ -136,7 +136,12 @@ impl TryFrom for AtomicWrite { (pb::MutationType::MSum, Some(value)) => { let value = decode_value(value.data, value.encoding as i64) .ok_or(ConvertError::DecodeError)?; - MutationKind::Sum(value) + MutationKind::Sum { + value, + min_v8: mutation.sum_min, + max_v8: mutation.sum_max, + clamp: mutation.sum_clamp, + } } (pb::MutationType::MMin, Some(value)) => { let value = decode_value(value.data, value.encoding as i64) diff --git a/proto/interface.rs b/proto/interface.rs index 9ad98a3..f3cce85 100644 --- a/proto/interface.rs +++ b/proto/interface.rs @@ -320,7 +320,12 @@ pub struct Enqueue { pub enum MutationKind { Set(KvValue), Delete, - Sum(KvValue), + Sum { + value: KvValue, + min_v8: Vec, + max_v8: Vec, + clamp: bool, + }, Min(KvValue), Max(KvValue), SetSuffixVersionstampedKey(KvValue), @@ -330,7 +335,7 @@ impl MutationKind { pub fn value(&self) -> Option<&KvValue> { match self { MutationKind::Set(value) => Some(value), - MutationKind::Sum(value) => Some(value), + MutationKind::Sum { value, .. } => Some(value), MutationKind::Min(value) => Some(value), MutationKind::Max(value) => Some(value), MutationKind::SetSuffixVersionstampedKey(value) => Some(value), diff --git a/proto/schema/datapath.proto b/proto/schema/datapath.proto index 68819da..e077bc3 100644 --- a/proto/schema/datapath.proto +++ b/proto/schema/datapath.proto @@ -102,6 +102,21 @@ message Mutation { // An expiry time for the value, in milliseconds since the Unix epoch (UTC). // If this is set to 0, the value will never expire. int64 expire_at_ms = 4; + + // A V8-encoded value that represents the minimum allowed value of the result + // of a `sum` operation. Only valid if `mutation_type` is `M_SUM` and + // `value.encoding` is `VE_V8`. + bytes sum_min = 5; + + // A V8-encoded value that represents the maximum allowed value of the result + // of a `sum` operation. Only valid if `mutation_type` is `M_SUM` and + // `value.encoding` is `VE_V8`. + bytes sum_max = 6; + + // If true, a result outside sum_min..=sum_max will be clamped. Otherwise, an + // error is returned and the atomic operation is not applied. Only valid if + // `mutation_type` is `M_SUM` and `value.encoding` is `VE_V8`. + bool sum_clamp = 7; } message KvValue { diff --git a/remote/lib.rs b/remote/lib.rs index 6522273..f68ebb7 100644 --- a/remote/lib.rs +++ b/remote/lib.rs @@ -580,6 +580,7 @@ impl Database for Remote

{ value: Some(encode_value_to_pb(value)), mutation_type: pb::MutationType::MSet as _, expire_at_ms, + ..Default::default() }); } denokv_proto::MutationKind::Delete => { @@ -588,14 +589,23 @@ impl Database for Remote

{ value: Some(encode_value_to_pb(KvValue::Bytes(vec![]))), mutation_type: pb::MutationType::MDelete as _, expire_at_ms, + ..Default::default() }); } - denokv_proto::MutationKind::Sum(value) => { + denokv_proto::MutationKind::Sum { + value, + min_v8, + max_v8, + clamp, + } => { mutations.push(pb::Mutation { key: mutation.key, value: Some(encode_value_to_pb(value)), mutation_type: pb::MutationType::MSum as _, expire_at_ms, + sum_min: min_v8, + sum_max: max_v8, + sum_clamp: clamp, }); } denokv_proto::MutationKind::Max(value) => { @@ -604,6 +614,7 @@ impl Database for Remote

{ value: Some(encode_value_to_pb(value)), mutation_type: pb::MutationType::MMax as _, expire_at_ms, + ..Default::default() }); } denokv_proto::MutationKind::Min(value) => { @@ -612,6 +623,7 @@ impl Database for Remote

{ value: Some(encode_value_to_pb(value)), mutation_type: pb::MutationType::MMin as _, expire_at_ms, + ..Default::default() }); } denokv_proto::MutationKind::SetSuffixVersionstampedKey(value) => { @@ -620,6 +632,7 @@ impl Database for Remote

{ value: Some(encode_value_to_pb(value)), mutation_type: pb::MutationType::MSetSuffixVersionstampedKey as _, expire_at_ms, + ..Default::default() }); } } diff --git a/sqlite/Cargo.toml b/sqlite/Cargo.toml index 347f207..d6e41ff 100644 --- a/sqlite/Cargo.toml +++ b/sqlite/Cargo.toml @@ -25,5 +25,6 @@ rusqlite.workspace = true serde_json.workspace = true thiserror.workspace = true tokio.workspace = true -tokio-stream.workspace = true uuid.workspace = true +v8_valueserializer.workspace = true +tokio-stream.workspace = true diff --git a/sqlite/backend.rs b/sqlite/backend.rs index da4221d..39450ff 100644 --- a/sqlite/backend.rs +++ b/sqlite/backend.rs @@ -1,6 +1,7 @@ // Copyright 2023 the Deno authors. All rights reserved. MIT license. use std::collections::HashSet; +use std::mem::discriminant; use std::time::Duration; use chrono::DateTime; @@ -18,6 +19,7 @@ use denokv_proto::ReadRangeOutput; use denokv_proto::SnapshotReadOptions; use denokv_proto::Versionstamp; use denokv_proto::VALUE_ENCODING_V8; +use num_bigint::BigInt; use rand::Rng; use rand::RngCore; use rusqlite::params; @@ -27,6 +29,7 @@ use rusqlite::Transaction; use thiserror::Error; use uuid::Uuid; +use crate::sum_operand::SumOperand; use crate::time::utc_now; use crate::SqliteNotifier; @@ -141,6 +144,9 @@ pub enum SqliteBackendError { #[error("{0}")] TypeMismatch(String), + + #[error("The result of a Sum operation would exceed its range limit")] + SumOutOfRange, } pub struct SqliteBackend { @@ -374,10 +380,27 @@ impl SqliteBackend { .execute(params![mutation.key])?; assert!(changed == 0 || changed == 1) } - MutationKind::Sum(operand) => { - mutate_le64(tx, &mutation.key, "sum", operand, version, |a, b| { - a.wrapping_add(b) - })?; + MutationKind::Sum { + value: operand, + min_v8, + max_v8, + clamp, + } => { + if matches!(operand, KvValue::U64(_)) { + mutate_le64(tx, &mutation.key, "sum", operand, version, |a, b| { + a.wrapping_add(b) + })?; + } else { + sum_v8( + tx, + &mutation.key, + operand, + min_v8.clone(), + max_v8.clone(), + *clamp, + version, + )?; + } } MutationKind::Min(operand) => { mutate_le64(tx, &mutation.key, "min", operand, version, |a, b| { @@ -807,6 +830,141 @@ fn mutate_le64( Ok(()) } +fn sum_v8( + tx: &Transaction, + key: &[u8], + operand: &KvValue, + min_v8: Vec, + max_v8: Vec, + clamp: bool, + new_version: i64, +) -> Result<(), SqliteBackendError> { + let (Ok(operand), Ok(result_min), Ok(result_max)) = ( + SumOperand::parse(operand), + SumOperand::parse_optional(&KvValue::V8(min_v8)), + SumOperand::parse_optional(&KvValue::V8(max_v8)), + ) else { + return Err(SqliteBackendError::TypeMismatch( + "Some of the parameters are not valid V8 values".into(), + )); + }; + + // min/max parameters, if any, must match the type of `operand` + if [&result_min, &result_max].into_iter().any(|x| { + x.as_ref() + .map(|x| discriminant(x) != discriminant(&operand)) + .unwrap_or_default() + }) { + return Err(SqliteBackendError::TypeMismatch( + "Min/max parameters have different types than the operand".into(), + )); + } + + let old_value = tx + .prepare_cached(STATEMENT_KV_POINT_GET_VALUE_ONLY)? + .query_row([key], |row| { + let value: Vec = row.get(0)?; + let encoding: i64 = row.get(1)?; + Ok((value, encoding)) + }) + .optional()?; + + let old_value = match old_value { + Some((value, encoding)) => { + SumOperand::parse(&decode_value(value, encoding).ok_or_else(|| { + SqliteBackendError::TypeMismatch("Invalid sum operand".into()) + })?) + .map_err(|e| SqliteBackendError::TypeMismatch(e.to_string()))? + } + + None => { + let (new_value, encoding) = encode_value_owned(operand.encode()); + let changed = tx + .prepare_cached(STATEMENT_KV_POINT_SET)? + .execute(params![key, &new_value[..], encoding, new_version, -1i64,])?; + assert_eq!(changed, 1); + return Ok(()); + } + }; + + // Backward compat: sum(KvU64, bigint) -> KvU64 + let operand = match (&old_value, operand, &result_min, &result_max, &clamp) { + (SumOperand::KvU64(_), SumOperand::BigInt(x), None, None, false) + if x >= BigInt::from(0u64) && x <= BigInt::from(std::u64::MAX) => + { + SumOperand::KvU64(x.try_into().unwrap()) + } + (_, x, _, _, _) => x, + }; + + let output = (|| match (&old_value, &operand) { + (SumOperand::BigInt(current), SumOperand::BigInt(operand)) => { + let mut current = current + operand; + if let Some(SumOperand::BigInt(result_min)) = &result_min { + if current < *result_min { + if !clamp { + return Err(SqliteBackendError::SumOutOfRange); + } + current = result_min.clone(); + } + } + if let Some(SumOperand::BigInt(result_max)) = &result_max { + if current > *result_max { + if !clamp { + return Err(SqliteBackendError::SumOutOfRange); + } + current = result_max.clone(); + } + } + Ok(SumOperand::BigInt(current)) + } + (SumOperand::Number(current), SumOperand::Number(operand)) => { + let mut current = current + operand; + if let Some(SumOperand::Number(result_min)) = &result_min { + if current < *result_min { + if !clamp { + return Err(SqliteBackendError::SumOutOfRange); + } + current = *result_min; + } + } + if let Some(SumOperand::Number(result_max)) = &result_max { + if current > *result_max { + if !clamp { + return Err(SqliteBackendError::SumOutOfRange); + } + current = *result_max; + } + } + Ok(SumOperand::Number(current)) + } + (SumOperand::KvU64(current), SumOperand::KvU64(operand)) => { + if result_min.is_some() || result_max.is_some() { + return Err(SqliteBackendError::TypeMismatch( + "Cannot use min/max parameters with KvU64 operands".into(), + )); + } + Ok(SumOperand::KvU64(current.wrapping_add(*operand))) + } + _ => Err(SqliteBackendError::TypeMismatch(format!( + "Cannot sum {} with {}", + old_value.variant_name(), + operand.variant_name(), + ))), + })()?; + + let (new_value, encoding) = encode_value_owned(output.encode()); + let changed = tx.prepare_cached(STATEMENT_KV_POINT_SET)?.execute(params![ + key, + &new_value[..], + encoding as i32, + new_version, + -1i64, + ])?; + assert_eq!(changed, 1); + Ok(()) +} + fn version_to_versionstamp(version: i64) -> Versionstamp { let mut versionstamp = [0; 10]; versionstamp[..8].copy_from_slice(&version.to_be_bytes()); diff --git a/sqlite/lib.rs b/sqlite/lib.rs index 5782bc8..dadc18a 100644 --- a/sqlite/lib.rs +++ b/sqlite/lib.rs @@ -1,6 +1,7 @@ // Copyright 2023 the Deno authors. All rights reserved. MIT license. mod backend; +mod sum_operand; mod time; use std::collections::hash_map::Entry; diff --git a/sqlite/sum_operand.rs b/sqlite/sum_operand.rs new file mode 100644 index 0000000..019c83d --- /dev/null +++ b/sqlite/sum_operand.rs @@ -0,0 +1,82 @@ +// Copyright 2023 the Deno authors. All rights reserved. MIT license. + +use denokv_proto::KvValue; +use num_bigint::BigInt; +use thiserror::Error; +use v8_valueserializer::Heap; +use v8_valueserializer::ParseError; +use v8_valueserializer::Value; +use v8_valueserializer::ValueDeserializer; +use v8_valueserializer::ValueSerializer; + +#[derive(Clone, Debug)] +pub enum SumOperand { + BigInt(BigInt), + Number(f64), + KvU64(u64), +} + +#[derive(Error, Debug)] +pub enum InvalidSumOperandError { + #[error("invalid v8 value")] + InvalidV8Value(#[from] ParseError), + #[error("unsupported value type")] + UnsupportedValueType, + #[error("operand cannot be empty")] + OperandCannotBeEmpty, +} + +impl SumOperand { + pub fn variant_name(&self) -> &'static str { + match self { + Self::BigInt(_) => "BigInt", + Self::Number(_) => "Number", + Self::KvU64(_) => "KvU64", + } + } + + pub fn parse_optional( + value: &KvValue, + ) -> Result, InvalidSumOperandError> { + match value { + KvValue::V8(value) => { + if value.is_empty() { + return Ok(None); + } + let value = ValueDeserializer::default().read(value)?.0; + Ok(Some(match value { + Value::BigInt(x) => Self::BigInt(x), + Value::Double(x) => Self::Number(x), + Value::I32(x) => Self::Number(x as f64), + Value::U32(x) => Self::Number(x as f64), + _ => { + return Err(InvalidSumOperandError::UnsupportedValueType); + } + })) + } + KvValue::U64(x) => Ok(Some(Self::KvU64(*x))), + _ => Err(InvalidSumOperandError::UnsupportedValueType), + } + } + + pub fn parse(value: &KvValue) -> Result { + Self::parse_optional(value)? + .ok_or(InvalidSumOperandError::OperandCannotBeEmpty) + } + + pub fn encode(self) -> KvValue { + match self { + Self::BigInt(x) => KvValue::V8( + ValueSerializer::default() + .finish(&Heap::default(), &Value::BigInt(x)) + .unwrap(), + ), + Self::Number(x) => KvValue::V8( + ValueSerializer::default() + .finish(&Heap::default(), &Value::Double(x)) + .unwrap(), + ), + Self::KvU64(x) => KvValue::U64(x), + } + } +}