From 957f35b7eabf5320cdaa16cd8068a02666b39a94 Mon Sep 17 00:00:00 2001 From: Peter Kotula Date: Sun, 2 Feb 2025 20:12:12 +0100 Subject: [PATCH] durability impl - wip --- .../src/durable_host/rdbms/postgres.rs | 173 +++++++++++++----- 1 file changed, 130 insertions(+), 43 deletions(-) diff --git a/golem-worker-executor-base/src/durable_host/rdbms/postgres.rs b/golem-worker-executor-base/src/durable_host/rdbms/postgres.rs index 5d406e2cc..2aa3900e3 100644 --- a/golem-worker-executor-base/src/durable_host/rdbms/postgres.rs +++ b/golem-worker-executor-base/src/durable_host/rdbms/postgres.rs @@ -12,7 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::durable_host::{DurabilityHost, DurableWorkerCtx}; +use crate::durable_host::serialized::SerializableError; +use crate::durable_host::{Durability, DurabilityHost, DurableWorkerCtx}; use crate::preview2::wasi::rdbms::postgres::{ Composite, CompositeType, Datebound, Daterange, DbColumn, DbColumnType, DbResult, DbRow, DbValue, Domain, DomainType, Enumeration, EnumerationType, Error, Host, HostDbConnection, @@ -23,11 +24,13 @@ use crate::preview2::wasi::rdbms::postgres::{ use crate::preview2::wasi::rdbms::types::Timetz; use crate::services::rdbms::postgres::types as postgres_types; use crate::services::rdbms::postgres::PostgresType; +use crate::services::rdbms::Error as RdbmsError; use crate::services::rdbms::RdbmsPoolKey; use crate::workerctx::WorkerCtx; use async_trait::async_trait; use bigdecimal::BigDecimal; use bit_vec::BitVec; +use golem_common::model::oplog::DurableFunctionType; use std::ops::{Bound, Deref}; use std::str::FromStr; use std::sync::Arc; @@ -118,33 +121,78 @@ impl HostDbConnection for DurableWorkerCtx { ) -> anyhow::Result> { self.observe_function_call("rdbms::postgres::db-connection", "query"); let worker_id = self.state.owned_worker_id.worker_id.clone(); - let pool_key = self - .as_wasi_view() - .table() - .get::(&self_)? - .pool_key - .clone(); - - match to_db_values(params, self.as_wasi_view().table()) { - Ok(params) => { - let result = self - .state - .rdbms_service - .postgres() - .query(&pool_key, &worker_id, &statement, params) - .await; + let durability = + Durability::, SerializableError>::new( + self, + "golem rdbms::postgres::db-connection", + "query", + DurableFunctionType::ReadRemote, + ) + .await?; + let result = if durability.is_live() { + let pool_key = self + .as_wasi_view() + .table() + .get::(&self_)? + .pool_key + .clone(); - match result { - Ok(result) => { - let result = from_db_result(result, self.as_wasi_view().table()) - .map_err(Error::QueryResponseFailure); - Ok(result) - } - Err(e) => Ok(Err(e.into())), + let params = to_db_values(params, self.as_wasi_view().table()) + .map_err(RdbmsError::QueryParameterFailure); + + let (params, result) = match params { + Ok(params) => { + let result = self + .state + .rdbms_service + .postgres() + .query(&pool_key, &worker_id, &statement, params.clone()) + .await; + (params, result) } + Err(error) => (vec![], Err(error)), + }; + let input = (pool_key.masked_address(), statement.clone(), params); + durability.persist(self, input, result).await + } else { + durability.replay(self).await + }; + match result { + Ok(result) => { + let result = from_db_result(result, self.as_wasi_view().table()) + .map_err(Error::QueryResponseFailure); + Ok(result) } - Err(error) => Ok(Err(Error::QueryParameterFailure(error))), + Err(e) => Ok(Err(e.into())), } + + // let pool_key = self + // .as_wasi_view() + // .table() + // .get::(&self_)? + // .pool_key + // .clone(); + // + // match to_db_values(params, self.as_wasi_view().table()) { + // Ok(params) => { + // let result = self + // .state + // .rdbms_service + // .postgres() + // .query(&pool_key, &worker_id, &statement, params) + // .await; + // + // match result { + // Ok(result) => { + // let result = from_db_result(result, self.as_wasi_view().table()) + // .map_err(Error::QueryResponseFailure); + // Ok(result) + // } + // Err(e) => Ok(Err(e.into())), + // } + // } + // Err(error) => Ok(Err(Error::QueryParameterFailure(error))), + // } } async fn execute( @@ -155,27 +203,66 @@ impl HostDbConnection for DurableWorkerCtx { ) -> anyhow::Result> { self.observe_function_call("rdbms::postgres::db-connection", "execute"); let worker_id = self.state.owned_worker_id.worker_id.clone(); - let pool_key = self - .as_wasi_view() - .table() - .get::(&self_)? - .pool_key - .clone(); - match to_db_values(params, self.as_wasi_view().table()) { - Ok(params) => { - let result = self - .state - .rdbms_service - .postgres() - .execute(&pool_key, &worker_id, &statement, params) - .await - .map_err(|e| e.into()); + let durability = Durability::::new( + self, + "golem rdbms::postgres::db-connection", + "execute", + DurableFunctionType::WriteRemote, + ) + .await?; + let result = if durability.is_live() { + let pool_key = self + .as_wasi_view() + .table() + .get::(&self_)? + .pool_key + .clone(); - Ok(result) - } - Err(error) => Ok(Err(Error::QueryParameterFailure(error))), - } + let params = to_db_values(params, self.as_wasi_view().table()) + .map_err(RdbmsError::QueryParameterFailure); + + let (params, result) = match params { + Ok(params) => { + let result = self + .state + .rdbms_service + .postgres() + .execute(&pool_key, &worker_id, &statement, params.clone()) + .await; + (params, result) + } + Err(error) => (vec![], Err(error)), + }; + let input = (pool_key.masked_address(), statement.clone(), params); + durability.persist(self, input, result).await + } else { + durability.replay(self).await + }; + + Ok(result.map_err(Error::from)) + + // let pool_key = self + // .as_wasi_view() + // .table() + // .get::(&self_)? + // .pool_key + // .clone(); + // + // match to_db_values(params, self.as_wasi_view().table()) { + // Ok(params) => { + // let result = self + // .state + // .rdbms_service + // .postgres() + // .execute(&pool_key, &worker_id, &statement, params) + // .await + // .map_err(|e| e.into()); + // + // Ok(result) + // } + // Err(error) => Ok(Err(Error::QueryParameterFailure(error))), + // } } async fn begin_transaction(