Skip to content

Commit

Permalink
durability impl - wip
Browse files Browse the repository at this point in the history
  • Loading branch information
justcoon committed Jan 29, 2025
1 parent b3a0519 commit af19d43
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 52 deletions.
81 changes: 64 additions & 17 deletions golem-worker-executor-base/src/durable_host/rdbms/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::durable_host::DurableWorkerCtx;
use crate::metrics::wasm::record_host_function_call;
use crate::durable_host::{DurabilityHost, DurableWorkerCtx};
use crate::preview2::wasi::rdbms::mysql::{
DbColumn, DbColumnType, DbResult, DbRow, DbValue, Error, Host, HostDbConnection,
HostDbResultStream, HostDbTransaction,
};
use crate::services::rdbms::mysql::types as mysql_types;
use crate::services::rdbms::mysql::MysqlType;
// use crate::services::rdbms::{Error as RdbmsError};
use crate::services::rdbms::RdbmsPoolKey;
use crate::workerctx::WorkerCtx;
use async_trait::async_trait;
Expand All @@ -29,6 +29,8 @@ use std::str::FromStr;
use std::sync::Arc;
use wasmtime::component::Resource;
use wasmtime_wasi::WasiView;
// use golem_common::model::oplog::DurableFunctionType;
// use crate::durable_host::serialized::SerializableError;

#[async_trait]
impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {}
Expand All @@ -49,7 +51,7 @@ impl<Ctx: WorkerCtx> HostDbConnection for DurableWorkerCtx<Ctx> {
&mut self,
address: String,
) -> anyhow::Result<Result<Resource<MysqlDbConnection>, Error>> {
record_host_function_call("rdbms::mysql::db-connection", "open");
self.observe_function_call("rdbms::mysql::db-connection", "open");

let worker_id = self.state.owned_worker_id.worker_id.clone();
let result = self
Expand All @@ -75,7 +77,7 @@ impl<Ctx: WorkerCtx> HostDbConnection for DurableWorkerCtx<Ctx> {
statement: String,
params: Vec<DbValue>,
) -> anyhow::Result<Result<Resource<DbResultStreamEntry>, Error>> {
record_host_function_call("rdbms::mysql::db-connection", "query-stream");
self.observe_function_call("rdbms::mysql::db-connection", "query-stream");
let worker_id = self.state.owned_worker_id.worker_id.clone();
let pool_key = self
.as_wasi_view()
Expand Down Expand Up @@ -115,7 +117,7 @@ impl<Ctx: WorkerCtx> HostDbConnection for DurableWorkerCtx<Ctx> {
statement: String,
params: Vec<DbValue>,
) -> anyhow::Result<Result<DbResult, Error>> {
record_host_function_call("rdbms::mysql::db-connection", "query");
self.observe_function_call("rdbms::mysql::db-connection", "query");
let worker_id = self.state.owned_worker_id.worker_id.clone();
let pool_key = self
.as_wasi_view()
Expand Down Expand Up @@ -149,8 +151,53 @@ impl<Ctx: WorkerCtx> HostDbConnection for DurableWorkerCtx<Ctx> {
statement: String,
params: Vec<DbValue>,
) -> anyhow::Result<Result<u64, Error>> {
record_host_function_call("rdbms::mysql::db-connection", "execute");
self.observe_function_call("rdbms::mysql::db-connection", "execute");
let worker_id = self.state.owned_worker_id.worker_id.clone();

// let durability = Durability::<Result<u64, RdbmsError>, SerializableError>::new(
// self,
// "golem rdbms::mysql::db-connection",
// "execute",
// DurableFunctionType::WriteRemote,
// )
// .await?;
// let result = if durability.is_live() {
// let pool_key = self
// .as_wasi_view()
// .table()
// .get::<MysqlDbConnection>(&self_)?
// .pool_key
// .clone();
//
// let params = params
// .into_iter()
// .map(|v| v.try_into())
// .collect::<Result<Vec<mysql_types::DbValue>, String>>().map_err(RdbmsError::QueryParameterFailure);
//
// let result = match params
// {
// Ok(params) => {
// self
// .state
// .rdbms_service
// .mysql()
// .execute(&pool_key, &worker_id, &statement, params)
// .await
// }
// Err(error) => Err(error),
// };
// let input = statement.clone();
// durability.persist(self, input, Ok(result)).await
// } else {
// durability.replay(self).await
// };
// match result {
// Ok(result) => Ok(result.map_err(Error::from)),
// Err(error) => {
// Ok(Err(error))
// }
// }

let pool_key = self
.as_wasi_view()
.table()
Expand Down Expand Up @@ -182,7 +229,7 @@ impl<Ctx: WorkerCtx> HostDbConnection for DurableWorkerCtx<Ctx> {
&mut self,
self_: Resource<MysqlDbConnection>,
) -> anyhow::Result<Result<Resource<DbTransactionEntry>, Error>> {
record_host_function_call("rdbms::mysql::db-connection", "begin-transaction");
self.observe_function_call("rdbms::mysql::db-connection", "begin-transaction");
let worker_id = self.state.owned_worker_id.worker_id.clone();
let pool_key = self
.as_wasi_view()
Expand All @@ -209,7 +256,7 @@ impl<Ctx: WorkerCtx> HostDbConnection for DurableWorkerCtx<Ctx> {
}

async fn drop(&mut self, rep: Resource<MysqlDbConnection>) -> anyhow::Result<()> {
record_host_function_call("rdbms::mysql::db-connection", "drop");
self.observe_function_call("rdbms::mysql::db-connection", "drop");

let worker_id = self.state.owned_worker_id.worker_id.clone();
let pool_key = self
Expand Down Expand Up @@ -250,7 +297,7 @@ impl<Ctx: WorkerCtx> HostDbResultStream for DurableWorkerCtx<Ctx> {
&mut self,
self_: Resource<DbResultStreamEntry>,
) -> anyhow::Result<Vec<DbColumn>> {
record_host_function_call("rdbms::mysql::db-result-stream", "get-columns");
self.observe_function_call("rdbms::mysql::db-result-stream", "get-columns");

let internal = self
.as_wasi_view()
Expand All @@ -270,7 +317,7 @@ impl<Ctx: WorkerCtx> HostDbResultStream for DurableWorkerCtx<Ctx> {
&mut self,
self_: Resource<DbResultStreamEntry>,
) -> anyhow::Result<Option<Vec<DbRow>>> {
record_host_function_call("rdbms::mysql::db-result-stream", "get-next");
self.observe_function_call("rdbms::mysql::db-result-stream", "get-next");
let internal = self
.as_wasi_view()
.table()
Expand All @@ -286,7 +333,7 @@ impl<Ctx: WorkerCtx> HostDbResultStream for DurableWorkerCtx<Ctx> {
}

async fn drop(&mut self, rep: Resource<DbResultStreamEntry>) -> anyhow::Result<()> {
record_host_function_call("rdbms::mysql::db-result-stream", "drop");
self.observe_function_call("rdbms::mysql::db-result-stream", "drop");
self.as_wasi_view()
.table()
.delete::<DbResultStreamEntry>(rep)?;
Expand Down Expand Up @@ -314,7 +361,7 @@ impl<Ctx: WorkerCtx> HostDbTransaction for DurableWorkerCtx<Ctx> {
statement: String,
params: Vec<DbValue>,
) -> anyhow::Result<Result<DbResult, Error>> {
record_host_function_call("rdbms::mysql::db-transaction", "query");
self.observe_function_call("rdbms::mysql::db-transaction", "query");
match params
.into_iter()
.map(|v| v.try_into())
Expand Down Expand Up @@ -344,7 +391,7 @@ impl<Ctx: WorkerCtx> HostDbTransaction for DurableWorkerCtx<Ctx> {
statement: String,
params: Vec<DbValue>,
) -> anyhow::Result<Result<Resource<DbResultStreamEntry>, Error>> {
record_host_function_call("rdbms::mysql::db-transaction", "query-stream");
self.observe_function_call("rdbms::mysql::db-transaction", "query-stream");
match params
.into_iter()
.map(|v| v.try_into())
Expand Down Expand Up @@ -377,7 +424,7 @@ impl<Ctx: WorkerCtx> HostDbTransaction for DurableWorkerCtx<Ctx> {
statement: String,
params: Vec<DbValue>,
) -> anyhow::Result<Result<u64, Error>> {
record_host_function_call("rdbms::mysql::db-transaction", "execute");
self.observe_function_call("rdbms::mysql::db-transaction", "execute");
match params
.into_iter()
.map(|v| v.try_into())
Expand All @@ -404,7 +451,7 @@ impl<Ctx: WorkerCtx> HostDbTransaction for DurableWorkerCtx<Ctx> {
&mut self,
self_: Resource<DbTransactionEntry>,
) -> anyhow::Result<Result<(), Error>> {
record_host_function_call("rdbms::mysql::db-transaction", "commit");
self.observe_function_call("rdbms::mysql::db-transaction", "commit");
let internal = self
.as_wasi_view()
.table()
Expand All @@ -419,7 +466,7 @@ impl<Ctx: WorkerCtx> HostDbTransaction for DurableWorkerCtx<Ctx> {
&mut self,
self_: Resource<DbTransactionEntry>,
) -> anyhow::Result<Result<(), Error>> {
record_host_function_call("rdbms::mysql::db-transaction", "query");
self.observe_function_call("rdbms::mysql::db-transaction", "query");
let internal = self
.as_wasi_view()
.table()
Expand All @@ -431,7 +478,7 @@ impl<Ctx: WorkerCtx> HostDbTransaction for DurableWorkerCtx<Ctx> {
}

async fn drop(&mut self, rep: Resource<DbTransactionEntry>) -> anyhow::Result<()> {
record_host_function_call("rdbms::mysql::db-result-stream", "drop");
self.observe_function_call("rdbms::mysql::db-result-stream", "drop");
let entry = self
.as_wasi_view()
.table()
Expand Down
Loading

0 comments on commit af19d43

Please sign in to comment.