Skip to content

Commit

Permalink
durability impl - bincode encode, decode for Db types
Browse files Browse the repository at this point in the history
  • Loading branch information
justcoon committed Feb 1, 2025
1 parent 779e6e2 commit ba41cce
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 52 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ async_zip = "0.0.17"
aws-config = "1.5.10"
aws-sdk-s3 = "1.65.0"
axum = { version = "0.7.9", features = ["multipart"] }
bigdecimal = "0.4.7"
bigdecimal = { version = "0.4.7", features = ["serde"] }
bincode = { version = "2.0.0-rc.3", features = ["serde"] }
bytes = "1.9.0"
cap-std = "3.4.2" # keep in sync with wasmtime
Expand Down
4 changes: 2 additions & 2 deletions golem-worker-executor-base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ aws-sdk-s3 = { workspace = true }
bincode = { workspace = true }
bitflags = "2.6.0"
bigdecimal = { workspace = true }
bit-vec = "0.6"
bit-vec = { version = "0.6", features = ["serde"] }
bytes = { workspace = true }
cap-fs-ext = "3.4.2" # keep in sync with wasmtime
cap-std = { workspace = true }
Expand Down Expand Up @@ -65,7 +65,7 @@ iso8601-timestamp = { workspace = true }
itertools = { workspace = true }
lazy_static = { workspace = true }
log = "0.4.22"
mac_address = "1.1.7"
mac_address = { version = "1.1.7", features = ["serde"] }
md5 = "0.7.0"
metrohash = "1.0.7"
nonempty-collections = "0.2.9"
Expand Down
10 changes: 6 additions & 4 deletions golem-worker-executor-base/src/durable_host/rdbms/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,14 +578,17 @@ impl TryFrom<Timetz> for postgres_types::TimeTz {
let time = value.time.try_into()?;
let offset = chrono::offset::FixedOffset::west_opt(value.offset)
.ok_or("Offset value is not valid")?;
Ok(Self { time, offset })
Ok(Self {
time,
offset: offset.utc_minus_local(),
})
}
}

impl From<postgres_types::TimeTz> for Timetz {
fn from(v: postgres_types::TimeTz) -> Self {
let time = v.time.into();
let offset = v.offset.local_minus_utc();
let offset = v.offset;
Timetz { time, offset }
}
}
Expand Down Expand Up @@ -1922,7 +1925,6 @@ pub mod tests {
use assert2::check;
use bigdecimal::BigDecimal;
use bit_vec::BitVec;
use chrono::Offset;
use mac_address::MacAddress;
use serde_json::json;
use std::collections::Bound;
Expand Down Expand Up @@ -2036,7 +2038,7 @@ pub mod tests {
postgres_types::DbValue::Array(vec![postgres_types::DbValue::Timetz(
postgres_types::TimeTz::new(
chrono::NaiveTime::from_hms_opt(10, 20, 30).unwrap(),
chrono::Utc.fix(),
chrono::FixedOffset::east_opt(5 * 60 * 60).unwrap(),
),
)]),
postgres_types::DbValue::Array(vec![postgres_types::DbValue::Interval(
Expand Down
4 changes: 2 additions & 2 deletions golem-worker-executor-base/src/services/rdbms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use std::sync::Arc;
use url::Url;

pub trait RdbmsType: Debug + Display + Default + Send {
type DbColumn: Clone + Send + Sync + PartialEq + Debug + bincode::Decode + bincode::Encode;
type DbValue: Clone + Send + Sync + PartialEq + Debug;
type DbColumn: Clone + Send + Sync + PartialEq + Debug + Decode + Encode;
type DbValue: Clone + Send + Sync + PartialEq + Debug + Decode + Encode;
}

#[derive(Clone)]
Expand Down
16 changes: 8 additions & 8 deletions golem-worker-executor-base/src/services/rdbms/mysql/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl Display for DbColumnType {
}
}

#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Encode, Decode)]
pub enum DbValue {
Boolean(bool),
Tinyint(i8),
Expand All @@ -115,11 +115,11 @@ pub enum DbValue {
BigintUnsigned(u64),
Float(f32),
Double(f64),
Decimal(BigDecimal),
Date(chrono::NaiveDate),
Datetime(chrono::DateTime<chrono::Utc>),
Timestamp(chrono::DateTime<chrono::Utc>),
Time(chrono::NaiveTime),
Decimal(#[bincode(with_serde)] BigDecimal),
Date(#[bincode(with_serde)] chrono::NaiveDate),
Datetime(#[bincode(with_serde)] chrono::DateTime<chrono::Utc>),
Timestamp(#[bincode(with_serde)] chrono::DateTime<chrono::Utc>),
Time(#[bincode(with_serde)] chrono::NaiveTime),
Year(u16),
Fixchar(String),
Varchar(String),
Expand All @@ -135,8 +135,8 @@ pub enum DbValue {
Longblob(Vec<u8>),
Enumeration(String),
Set(String),
Bit(BitVec),
Json(serde_json::Value),
Bit(#[bincode(with_serde)] BitVec),
Json(#[bincode(with_serde)] serde_json::Value),
Null,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ fn set_value_helper<'a, S: PgValueSetter<'a>>(
}),
DbColumnType::Timetz => setter.try_set_db_value(value, value_category, |v| {
if let DbValue::Timetz(v) = v {
Some(PgTimeTz::from(v))
PgTimeTz::try_from(v).ok()
} else {
None
}
Expand Down Expand Up @@ -754,17 +754,20 @@ impl From<PgTimeTz> for TimeTz {
fn from(value: PgTimeTz) -> Self {
Self {
time: value.time,
offset: value.offset,
offset: value.offset.utc_minus_local(),
}
}
}

impl From<TimeTz> for PgTimeTz {
fn from(value: TimeTz) -> Self {
Self {
impl TryFrom<TimeTz> for PgTimeTz {
type Error = String;
fn try_from(value: TimeTz) -> Result<Self, Self::Error> {
let offset = chrono::offset::FixedOffset::west_opt(value.offset)
.ok_or("Offset value is not valid")?;
Ok(Self {
time: value.time,
offset: value.offset,
}
offset,
})
}
}

Expand Down
55 changes: 30 additions & 25 deletions golem-worker-executor-base/src/services/rdbms/postgres/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use bincode::{Decode, Encode};
use bit_vec::BitVec;
use itertools::Itertools;
use mac_address::MacAddress;
use serde::{Deserialize, Serialize};
use std::fmt::{Debug, Display};
use std::net::IpAddr;
use std::ops::Bound;
Expand Down Expand Up @@ -135,7 +136,7 @@ impl NamedType for RangeType {
}
}

#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Encode, Decode)]
pub struct ValuesRange<T> {
pub start: Bound<T>,
pub end: Bound<T>,
Expand Down Expand Up @@ -192,15 +193,19 @@ impl Display for Interval {
}
}

#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Encode, Decode)]
pub struct TimeTz {
#[bincode(with_serde)]
pub time: chrono::NaiveTime,
pub offset: chrono::FixedOffset,
pub offset: i32,
}

impl TimeTz {
pub fn new(time: chrono::NaiveTime, offset: chrono::FixedOffset) -> Self {
TimeTz { time, offset }
TimeTz {
time,
offset: offset.utc_minus_local(),
}
}
}

Expand Down Expand Up @@ -234,7 +239,7 @@ impl Display for Enum {
}
}

#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Encode, Decode)]
pub struct Composite {
pub name: String,
pub values: Vec<DbValue>,
Expand Down Expand Up @@ -263,7 +268,7 @@ impl NamedType for Composite {
}
}

#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Encode, Decode)]
pub struct Domain {
pub name: String,
pub value: Box<DbValue>,
Expand All @@ -290,7 +295,7 @@ impl Display for Domain {
}
}

#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Encode, Decode)]
pub struct Range {
pub name: String,
pub value: Box<ValuesRange<DbValue>>,
Expand Down Expand Up @@ -440,42 +445,42 @@ impl Display for DbColumnType {
}
}

#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Encode, Decode)]
pub enum DbValue {
Character(i8),
Int2(i16),
Int4(i32),
Int8(i64),
Float4(f32),
Float8(f64),
Numeric(BigDecimal),
Numeric(#[bincode(with_serde)] BigDecimal),
Boolean(bool),
Timestamp(chrono::NaiveDateTime),
Timestamptz(chrono::DateTime<chrono::Utc>),
Date(chrono::NaiveDate),
Time(chrono::NaiveTime),
Timestamp(#[bincode(with_serde)] chrono::NaiveDateTime),
Timestamptz(#[bincode(with_serde)] chrono::DateTime<chrono::Utc>),
Date(#[bincode(with_serde)] chrono::NaiveDate),
Time(#[bincode(with_serde)] chrono::NaiveTime),
Timetz(TimeTz),
Interval(Interval),
Text(String),
Varchar(String),
Bpchar(String),
Bytea(Vec<u8>),
Json(serde_json::Value),
Jsonb(serde_json::Value),
Json(#[bincode(with_serde)] serde_json::Value),
Jsonb(#[bincode(with_serde)] serde_json::Value),
Jsonpath(String),
Xml(String),
Uuid(Uuid),
Inet(IpAddr),
Cidr(IpAddr),
Macaddr(MacAddress),
Bit(BitVec),
Varbit(BitVec),
Uuid(#[bincode(with_serde)] Uuid),
Inet(#[bincode(with_serde)] IpAddr),
Cidr(#[bincode(with_serde)] IpAddr),
Macaddr(#[bincode(with_serde)] MacAddress),
Bit(#[bincode(with_serde)] BitVec),
Varbit(#[bincode(with_serde)] BitVec),
Int4range(ValuesRange<i32>),
Int8range(ValuesRange<i64>),
Numrange(ValuesRange<BigDecimal>),
Tsrange(ValuesRange<chrono::NaiveDateTime>),
Tstzrange(ValuesRange<chrono::DateTime<chrono::Utc>>),
Daterange(ValuesRange<chrono::NaiveDate>),
Numrange(#[bincode(with_serde)] ValuesRange<BigDecimal>),
Tsrange(#[bincode(with_serde)] ValuesRange<chrono::NaiveDateTime>),
Tstzrange(#[bincode(with_serde)] ValuesRange<chrono::DateTime<chrono::Utc>>),
Daterange(#[bincode(with_serde)] ValuesRange<chrono::NaiveDate>),
Money(i64),
Oid(u32),
Enum(Enum),
Expand Down
5 changes: 2 additions & 3 deletions golem-worker-executor-base/src/services/rdbms/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use crate::services::rdbms::{RdbmsPoolKey, RdbmsService};
use assert2::check;
use bigdecimal::BigDecimal;
use bit_vec::BitVec;
use chrono::Offset;
use golem_common::model::{ComponentId, WorkerId};
use golem_test_framework::components::rdb::docker_mysql::DockerMysqlRdbs;
use golem_test_framework::components::rdb::docker_postgres::DockerPostgresRdbs;
Expand Down Expand Up @@ -544,7 +543,7 @@ async fn postgres_create_insert_select_test(
postgres_types::DbValue::Time(chrono::NaiveTime::from_hms_opt(10, 20, 30).unwrap()),
postgres_types::DbValue::Timetz(postgres_types::TimeTz::new(
chrono::NaiveTime::from_hms_opt(10, 20, 30).unwrap(),
chrono::Utc.fix(),
chrono::FixedOffset::west_opt(3 * 60 * 60).unwrap(),
)),
postgres_types::DbValue::Interval(postgres_types::Interval::new(10, 20, 30)),
postgres_types::DbValue::Bytea("bytea".as_bytes().to_vec()),
Expand Down Expand Up @@ -1290,7 +1289,7 @@ async fn postgres_create_insert_select_array_test(
postgres_types::DbValue::Array(vec![postgres_types::DbValue::Timetz(
postgres_types::TimeTz::new(
chrono::NaiveTime::from_hms_opt(10, 20, 30).unwrap(),
chrono::Utc.fix(),
chrono::FixedOffset::east_opt(5 * 60 * 60).unwrap(),
),
)]),
postgres_types::DbValue::Array(vec![postgres_types::DbValue::Interval(
Expand Down

0 comments on commit ba41cce

Please sign in to comment.