Skip to content

Commit

Permalink
Implementing the new durability WIT host interface
Browse files Browse the repository at this point in the history
  • Loading branch information
vigoo committed Jan 13, 2025
1 parent 376c1c7 commit c6cb594
Show file tree
Hide file tree
Showing 14 changed files with 2,976 additions and 1,192 deletions.
869 changes: 435 additions & 434 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ futures = "0.3"
futures-core = "0.3.31"
futures-util = "0.3.31"
git-version = "0.3.9"
# golem-wit = { version = "=1.1.1" }
golem-wit = { git = "https://github.com/golemcloud/golem-wit.git", branch = "vigoo/durability-host-functions" }
hex = "0.4.3"
http = "1.2.0" # keep in sync with wasmtime
humansize = "2.1.3"
Expand Down
2 changes: 1 addition & 1 deletion golem-worker-executor-base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ fs-set-times = "0.20.2"
futures = { workspace = true }
futures-util = { workspace = true }
gethostname = "0.5.0"
golem-wit = { version = "=1.1.1" }
hex = { workspace = true }
http = { workspace = true }
http-body = "1.0.1" # keep in sync with wasmtime
Expand Down Expand Up @@ -110,6 +109,7 @@ tracing-subscriber = { workspace = true }

[build-dependencies]
cargo_metadata = "0.19.1"
golem-wit = { workspace = true }

[[test]]
name = "integration"
Expand Down
3 changes: 2 additions & 1 deletion golem-worker-executor-base/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ fn preview2_mod_gen(golem_wit_path: &str) -> String {
import golem:api/host@0.2.0;
import golem:api/host@1.1.0;
import golem:api/oplog@1.1.0;
import golem:api/durability@1.2.0;
import wasi:blobstore/blobstore;
import wasi:blobstore/container;
Expand Down Expand Up @@ -97,7 +98,7 @@ fn preview2_mod_gen(golem_wit_path: &str) -> String {
"wasi:keyvalue/types/outgoing-value": super::durable_host::keyvalue::types::OutgoingValueEntry,
"golem:api/host/get-workers": super::durable_host::golem::GetWorkersEntry,
"golem:api/oplog/get-oplog": super::durable_host::golem::v11::GetOplogEntry,
"golem:api/oplog/search-oplog": super::durable_host::golem::v11::SearchOplogEntry
"golem:api/oplog/search-oplog": super::durable_host::golem::v11::SearchOplogEntry,
}},
}});
"#
Expand Down
181 changes: 175 additions & 6 deletions golem-worker-executor-base/src/durable_host/durability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use crate::durable_host::DurableWorkerCtx;
use crate::error::GolemError;
use crate::metrics::wasm::record_host_function_call;
use crate::model::PersistenceLevel;
use crate::preview2::golem;
use crate::preview2::golem::api1_2_0;
use crate::services::oplog::{CommitLevel, OplogOps};
use crate::workerctx::WorkerCtx;
use async_trait::async_trait;
Expand All @@ -27,6 +29,7 @@ use golem_common::serialization::{serialize, try_deserialize};
use golem_wasm_rpc::{IntoValue, IntoValueAndType, ValueAndType};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem::transmute;
use tracing::error;

#[derive(Debug)]
Expand All @@ -48,7 +51,7 @@ pub struct PersistedDurableFunctionInvocation {
#[async_trait]
pub trait DurabilityHost {
/// Observes a function call (produces logs and metrics)
fn observe_function_call(&self, interface: &'static str, function: &'static str);
fn observe_function_call(&self, interface: &str, function: &str);

/// Marks the beginning of a durable function.
///
Expand Down Expand Up @@ -102,9 +105,175 @@ pub trait DurabilityHost {
) -> Result<PersistedDurableFunctionInvocation, GolemError>;
}

impl From<api1_2_0::durability::DurableFunctionType> for DurableFunctionType {
fn from(value: api1_2_0::durability::DurableFunctionType) -> Self {
match value {
api1_2_0::durability::DurableFunctionType::WriteRemote => {
DurableFunctionType::WriteRemote
}
api1_2_0::durability::DurableFunctionType::WriteLocal => {
DurableFunctionType::WriteLocal
}
api1_2_0::durability::DurableFunctionType::WriteRemoteBatched(oplog_index) => {
DurableFunctionType::WriteRemoteBatched(oplog_index.map(OplogIndex::from_u64))
}
api1_2_0::durability::DurableFunctionType::ReadRemote => {
DurableFunctionType::ReadRemote
}
api1_2_0::durability::DurableFunctionType::ReadLocal => DurableFunctionType::ReadLocal,
}
}
}

impl From<DurableFunctionType> for api1_2_0::durability::DurableFunctionType {
fn from(value: DurableFunctionType) -> Self {
match value {
DurableFunctionType::WriteRemote => {
api1_2_0::durability::DurableFunctionType::WriteRemote
}
DurableFunctionType::WriteLocal => {
api1_2_0::durability::DurableFunctionType::WriteLocal
}
DurableFunctionType::WriteRemoteBatched(oplog_index) => {
api1_2_0::durability::DurableFunctionType::WriteRemoteBatched(
oplog_index.map(|idx| idx.into()),
)
}
DurableFunctionType::ReadRemote => {
api1_2_0::durability::DurableFunctionType::ReadRemote
}
DurableFunctionType::ReadLocal => api1_2_0::durability::DurableFunctionType::ReadLocal,
}
}
}

impl From<OplogEntryVersion> for api1_2_0::durability::OplogEntryVersion {
fn from(value: OplogEntryVersion) -> Self {
match value {
OplogEntryVersion::V1 => api1_2_0::durability::OplogEntryVersion::V1,
OplogEntryVersion::V2 => api1_2_0::durability::OplogEntryVersion::V2,
}
}
}

impl From<PersistedDurableFunctionInvocation>
for api1_2_0::durability::PersistedDurableFunctionInvocation
{
fn from(value: PersistedDurableFunctionInvocation) -> Self {
api1_2_0::durability::PersistedDurableFunctionInvocation {
timestamp: value.timestamp.into(),
function_name: value.function_name,
response: value.response,
function_type: value.function_type.into(),
entry_version: value.oplog_entry_version.into(),
}
}
}

#[async_trait]
impl<Ctx: WorkerCtx> api1_2_0::durability::Host for DurableWorkerCtx<Ctx> {
async fn observe_function_call(
&mut self,
iface: String,
function: String,
) -> anyhow::Result<()> {
DurabilityHost::observe_function_call(self, &iface, &function);
Ok(())
}

async fn begin_durable_function(
&mut self,
function_type: api1_2_0::durability::DurableFunctionType,
) -> anyhow::Result<api1_2_0::durability::OplogIndex> {
let oplog_idx = DurabilityHost::begin_durable_function(self, &function_type.into()).await?;
Ok(oplog_idx.into())
}

async fn end_durable_function(
&mut self,
function_type: api1_2_0::durability::DurableFunctionType,
begin_index: api1_2_0::durability::OplogIndex,
) -> anyhow::Result<()> {
DurabilityHost::end_durable_function(
self,
&function_type.into(),
OplogIndex::from_u64(begin_index),
)
.await?;
Ok(())
}

async fn current_durable_execution_state(
&mut self,
) -> anyhow::Result<api1_2_0::durability::DurableExecutionState> {
let state = DurabilityHost::durable_execution_state(self);
let persistence_level: golem::api0_2_0::host::PersistenceLevel =
state.persistence_level.into();
Ok(api1_2_0::durability::DurableExecutionState {
is_live: state.is_live,
persistence_level: persistence_level.into(),
})
}

async fn persist_durable_function_invocation(
&mut self,
function_name: String,
request: Vec<u8>,
response: Vec<u8>,
function_type: api1_2_0::durability::DurableFunctionType,
) -> anyhow::Result<()> {
DurabilityHost::persist_durable_function_invocation(
self,
function_name,
&request,
&response,
function_type.into(),
)
.await;
Ok(())
}

async fn persist_typed_durable_function_invocation(
&mut self,
function_name: String,
request: api1_2_0::durability::ValueAndType,
response: api1_2_0::durability::ValueAndType,
function_type: api1_2_0::durability::DurableFunctionType,
) -> anyhow::Result<()> {
let request = unsafe {
transmute::<
api1_2_0::durability::ValueAndType,
golem_wasm_rpc::golem::rpc::types::ValueAndType,
>(request)
};
let response = unsafe {
transmute::<
api1_2_0::durability::ValueAndType,
golem_wasm_rpc::golem::rpc::types::ValueAndType,
>(response)
};
DurabilityHost::persist_typed_durable_function_invocation(
self,
function_name,
request.into(),
response.into(),
function_type.into(),
)
.await;
Ok(())
}

async fn read_persisted_durable_function_invocation(
&mut self,
) -> anyhow::Result<api1_2_0::durability::PersistedDurableFunctionInvocation> {
let invocation = DurabilityHost::read_persisted_durable_function_invocation(self).await?;
Ok(invocation.into())
}
}

#[async_trait]
impl<Ctx: WorkerCtx> DurabilityHost for DurableWorkerCtx<Ctx> {
fn observe_function_call(&self, interface: &'static str, function: &'static str) {
fn observe_function_call(&self, interface: &str, function: &str) {
record_host_function_call(interface, function);
}

Expand Down Expand Up @@ -160,11 +329,11 @@ impl<Ctx: WorkerCtx> DurabilityHost for DurableWorkerCtx<Ctx> {
function_type: DurableFunctionType,
) {
let request = serialize(&request).unwrap_or_else(|err| {
panic!("failed to serialize request ({request:?}) for persisting durable function invocation: {err}")
}).to_vec();
panic!("failed to serialize request ({request:?}) for persisting durable function invocation: {err}")
}).to_vec();
let response = serialize(&response).unwrap_or_else(|err| {
panic!("failed to serialize response ({response:?}) for persisting durable function invocation: {err}")
}).to_vec();
panic!("failed to serialize response ({response:?}) for persisting durable function invocation: {err}")
}).to_vec();

self.state
.oplog
Expand Down
21 changes: 19 additions & 2 deletions golem-worker-executor-base/src/durable_host/wasm_rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ use golem_wasm_rpc::golem::rpc::types::{
};
use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue;
use golem_wasm_rpc::{
FutureInvokeResultEntry, HostWasmRpc, SubscribeAny, Value, ValueAndType, WasmRpcEntry, WitValue,
FutureInvokeResultEntry, HostWasmRpc, SubscribeAny, Value, ValueAndType, WasmRpcEntry, WitType,
WitValue,
};
use std::any::Any;
use std::fmt::{Debug, Formatter};
Expand Down Expand Up @@ -774,7 +775,23 @@ impl<Ctx: WorkerCtx> HostFutureInvokeResult for DurableWorkerCtx<Ctx> {
}

#[async_trait]
impl<Ctx: WorkerCtx> golem_wasm_rpc::Host for DurableWorkerCtx<Ctx> {}
impl<Ctx: WorkerCtx> golem_wasm_rpc::Host for DurableWorkerCtx<Ctx> {
// NOTE: these extract functions are only added as a workaround for the fact that the binding
// generator does not include types that are not used in any exported _functions_
async fn extract_value(
&mut self,
vnt: golem_wasm_rpc::golem::rpc::types::ValueAndType,
) -> anyhow::Result<WitValue> {
Ok(vnt.value)
}

async fn extract_type(
&mut self,
vnt: golem_wasm_rpc::golem::rpc::types::ValueAndType,
) -> anyhow::Result<WitType> {
Ok(vnt.typ)
}
}

/// Tries to get a `ValueAndType` representation for the given `WitValue` parameters by querying the latest component metadata for the
/// target component.
Expand Down
2 changes: 1 addition & 1 deletion golem-worker-executor-base/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ pub mod wasm {
.unwrap();
}

pub fn record_host_function_call(iface: &'static str, name: &'static str) {
pub fn record_host_function_call(iface: &str, name: &str) {
debug!("golem {iface}::{name} called");
HOST_FUNCTION_CALL_TOTAL
.with_label_values(&[iface, name])
Expand Down
4 changes: 3 additions & 1 deletion golem-worker-executor-base/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ use golem_test_framework::dsl::to_worker_metadata;
use golem_wasm_rpc::golem::rpc::types::{FutureInvokeResult, WasmRpc};
use golem_wasm_rpc::golem::rpc::types::{HostFutureInvokeResult, Pollable};
use golem_worker_executor_base::preview2::golem;
use golem_worker_executor_base::preview2::golem::api1_1_0;
use golem_worker_executor_base::preview2::golem::{api1_1_0, api1_2_0};
use golem_worker_executor_base::services::events::Events;
use golem_worker_executor_base::services::oplog::plugin::OplogProcessorPlugin;
use golem_worker_executor_base::services::plugins::{Plugins, PluginsObservations};
Expand Down Expand Up @@ -1013,6 +1013,8 @@ impl Bootstrap<TestWorkerCtx> for ServerBootstrap {
let mut linker = create_linker(engine, get_durable_ctx)?;
api0_2_0::host::add_to_linker_get_host(&mut linker, get_durable_ctx)?;
api1_1_0::host::add_to_linker_get_host(&mut linker, get_durable_ctx)?;
api1_1_0::oplog::add_to_linker_get_host(&mut linker, get_durable_ctx)?;
api1_2_0::durability::add_to_linker_get_host(&mut linker, get_durable_ctx)?;
golem_wasm_rpc::golem::rpc::types::add_to_linker_get_host(&mut linker, get_durable_ctx)?;
Ok(linker)
}
Expand Down
4 changes: 3 additions & 1 deletion golem-worker-executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use async_trait::async_trait;
use golem_common::model::component::ComponentOwner;
use golem_common::model::plugin::{DefaultPluginOwner, DefaultPluginScope};
use golem_worker_executor_base::durable_host::DurableWorkerCtx;
use golem_worker_executor_base::preview2::golem::{api0_2_0, api1_1_0};
use golem_worker_executor_base::preview2::golem::{api0_2_0, api1_1_0, api1_2_0};
use golem_worker_executor_base::services::active_workers::ActiveWorkers;
use golem_worker_executor_base::services::blob_store::BlobStoreService;
use golem_worker_executor_base::services::component::ComponentService;
Expand Down Expand Up @@ -204,6 +204,8 @@ impl Bootstrap<Context> for ServerBootstrap {
let mut linker = create_linker(engine, get_durable_ctx)?;
api0_2_0::host::add_to_linker_get_host(&mut linker, get_durable_ctx)?;
api1_1_0::host::add_to_linker_get_host(&mut linker, get_durable_ctx)?;
api1_1_0::oplog::add_to_linker_get_host(&mut linker, get_durable_ctx)?;
api1_2_0::durability::add_to_linker_get_host(&mut linker, get_durable_ctx)?;
golem_wasm_rpc::golem::rpc::types::add_to_linker_get_host(&mut linker, get_durable_ctx)?;
Ok(linker)
}
Expand Down
2 changes: 1 addition & 1 deletion wasm-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ crate-type = ["cdylib", "rlib"]
harness = false

[dependencies]
wit-bindgen-rt = { version = "=0.26.0", features = ["bitflags"] }
wit-bindgen-rt = { version = "=0.36.0", features = ["bitflags"] }

arbitrary = { version = "1.4.1", features = ["derive"], optional = true }
async-recursion = { workspace = true, optional = true }
Expand Down
Loading

0 comments on commit c6cb594

Please sign in to comment.