Skip to content

Commit

Permalink
durability impl - wip
Browse files Browse the repository at this point in the history
  • Loading branch information
justcoon committed Feb 2, 2025
1 parent 9dd255f commit 957f35b
Showing 1 changed file with 130 additions and 43 deletions.
173 changes: 130 additions & 43 deletions golem-worker-executor-base/src/durable_host/rdbms/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::durable_host::{DurabilityHost, DurableWorkerCtx};
use crate::durable_host::serialized::SerializableError;
use crate::durable_host::{Durability, DurabilityHost, DurableWorkerCtx};
use crate::preview2::wasi::rdbms::postgres::{
Composite, CompositeType, Datebound, Daterange, DbColumn, DbColumnType, DbResult, DbRow,
DbValue, Domain, DomainType, Enumeration, EnumerationType, Error, Host, HostDbConnection,
Expand All @@ -23,11 +24,13 @@ use crate::preview2::wasi::rdbms::postgres::{
use crate::preview2::wasi::rdbms::types::Timetz;
use crate::services::rdbms::postgres::types as postgres_types;
use crate::services::rdbms::postgres::PostgresType;
use crate::services::rdbms::Error as RdbmsError;
use crate::services::rdbms::RdbmsPoolKey;
use crate::workerctx::WorkerCtx;
use async_trait::async_trait;
use bigdecimal::BigDecimal;
use bit_vec::BitVec;
use golem_common::model::oplog::DurableFunctionType;
use std::ops::{Bound, Deref};
use std::str::FromStr;
use std::sync::Arc;
Expand Down Expand Up @@ -118,33 +121,78 @@ impl<Ctx: WorkerCtx> HostDbConnection for DurableWorkerCtx<Ctx> {
) -> anyhow::Result<Result<DbResult, Error>> {
self.observe_function_call("rdbms::postgres::db-connection", "query");
let worker_id = self.state.owned_worker_id.worker_id.clone();
let pool_key = self
.as_wasi_view()
.table()
.get::<PostgresDbConnection>(&self_)?
.pool_key
.clone();

match to_db_values(params, self.as_wasi_view().table()) {
Ok(params) => {
let result = self
.state
.rdbms_service
.postgres()
.query(&pool_key, &worker_id, &statement, params)
.await;
let durability =
Durability::<crate::services::rdbms::DbResult<PostgresType>, SerializableError>::new(
self,
"golem rdbms::postgres::db-connection",
"query",
DurableFunctionType::ReadRemote,
)
.await?;
let result = if durability.is_live() {
let pool_key = self
.as_wasi_view()
.table()
.get::<PostgresDbConnection>(&self_)?
.pool_key
.clone();

match result {
Ok(result) => {
let result = from_db_result(result, self.as_wasi_view().table())
.map_err(Error::QueryResponseFailure);
Ok(result)
}
Err(e) => Ok(Err(e.into())),
let params = to_db_values(params, self.as_wasi_view().table())
.map_err(RdbmsError::QueryParameterFailure);

let (params, result) = match params {
Ok(params) => {
let result = self
.state
.rdbms_service
.postgres()
.query(&pool_key, &worker_id, &statement, params.clone())
.await;
(params, result)
}
Err(error) => (vec![], Err(error)),
};
let input = (pool_key.masked_address(), statement.clone(), params);
durability.persist(self, input, result).await
} else {
durability.replay(self).await
};
match result {
Ok(result) => {
let result = from_db_result(result, self.as_wasi_view().table())
.map_err(Error::QueryResponseFailure);
Ok(result)
}
Err(error) => Ok(Err(Error::QueryParameterFailure(error))),
Err(e) => Ok(Err(e.into())),
}

// let pool_key = self
// .as_wasi_view()
// .table()
// .get::<PostgresDbConnection>(&self_)?
// .pool_key
// .clone();
//
// match to_db_values(params, self.as_wasi_view().table()) {
// Ok(params) => {
// let result = self
// .state
// .rdbms_service
// .postgres()
// .query(&pool_key, &worker_id, &statement, params)
// .await;
//
// match result {
// Ok(result) => {
// let result = from_db_result(result, self.as_wasi_view().table())
// .map_err(Error::QueryResponseFailure);
// Ok(result)
// }
// Err(e) => Ok(Err(e.into())),
// }
// }
// Err(error) => Ok(Err(Error::QueryParameterFailure(error))),
// }
}

async fn execute(
Expand All @@ -155,27 +203,66 @@ impl<Ctx: WorkerCtx> HostDbConnection for DurableWorkerCtx<Ctx> {
) -> anyhow::Result<Result<u64, Error>> {
self.observe_function_call("rdbms::postgres::db-connection", "execute");
let worker_id = self.state.owned_worker_id.worker_id.clone();
let pool_key = self
.as_wasi_view()
.table()
.get::<PostgresDbConnection>(&self_)?
.pool_key
.clone();

match to_db_values(params, self.as_wasi_view().table()) {
Ok(params) => {
let result = self
.state
.rdbms_service
.postgres()
.execute(&pool_key, &worker_id, &statement, params)
.await
.map_err(|e| e.into());
let durability = Durability::<u64, SerializableError>::new(
self,
"golem rdbms::postgres::db-connection",
"execute",
DurableFunctionType::WriteRemote,
)
.await?;
let result = if durability.is_live() {
let pool_key = self
.as_wasi_view()
.table()
.get::<PostgresDbConnection>(&self_)?
.pool_key
.clone();

Ok(result)
}
Err(error) => Ok(Err(Error::QueryParameterFailure(error))),
}
let params = to_db_values(params, self.as_wasi_view().table())
.map_err(RdbmsError::QueryParameterFailure);

let (params, result) = match params {
Ok(params) => {
let result = self
.state
.rdbms_service
.postgres()
.execute(&pool_key, &worker_id, &statement, params.clone())
.await;
(params, result)
}
Err(error) => (vec![], Err(error)),
};
let input = (pool_key.masked_address(), statement.clone(), params);
durability.persist(self, input, result).await
} else {
durability.replay(self).await
};

Ok(result.map_err(Error::from))

// let pool_key = self
// .as_wasi_view()
// .table()
// .get::<PostgresDbConnection>(&self_)?
// .pool_key
// .clone();
//
// match to_db_values(params, self.as_wasi_view().table()) {
// Ok(params) => {
// let result = self
// .state
// .rdbms_service
// .postgres()
// .execute(&pool_key, &worker_id, &statement, params)
// .await
// .map_err(|e| e.into());
//
// Ok(result)
// }
// Err(error) => Ok(Err(Error::QueryParameterFailure(error))),
// }
}

async fn begin_transaction(
Expand Down

0 comments on commit 957f35b

Please sign in to comment.