diff --git a/.gitmodules b/.gitmodules deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/Cargo.lock b/Cargo.lock index 3c4c2c6e24..02f5a3c779 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3747,6 +3747,7 @@ dependencies = [ "rand", "range-set-blaze", "regex", + "semver", "serde", "serde_json", "serde_yaml", @@ -4224,6 +4225,7 @@ dependencies = [ "async-fs", "async-lock", "async-mutex", + "async-scoped", "async-stream", "async-trait", "aws-config", @@ -4260,6 +4262,7 @@ dependencies = [ "hex", "http 1.2.0", "http-body 1.0.1", + "http-body-util", "humansize", "humantime-serde", "hyper 1.5.2", diff --git a/Cargo.toml b/Cargo.toml index 1578ab232a..0e9775237c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,6 +64,7 @@ exclude = [ "test-components/update-test-v3-11", "test-components/update-test-v4", "test-components/variant-service", + "test-components/wasi-http-incoming-request-handler", "test-components/write-stderr", "test-components/write-stdout", ] diff --git a/golem-common/Cargo.toml b/golem-common/Cargo.toml index 0c0be48346..eb5cba0020 100644 --- a/golem-common/Cargo.toml +++ b/golem-common/Cargo.toml @@ -52,6 +52,7 @@ prost-types = { workspace = true, optional = true } rand = { workspace = true } range-set-blaze = "0.1.16" regex = { workspace = true } +semver = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } serde_yaml = { workspace = true } diff --git a/golem-common/src/lib.rs b/golem-common/src/lib.rs index 5d084defe4..d7581adf7b 100644 --- a/golem-common/src/lib.rs +++ b/golem-common/src/lib.rs @@ -56,6 +56,8 @@ pub mod tracing; pub mod uri; +pub mod virtual_exports; + #[cfg(test)] test_r::enable!(); diff --git a/golem-common/src/model/component_metadata.rs b/golem-common/src/model/component_metadata.rs index 138321e593..980e19ade2 100644 --- a/golem-common/src/model/component_metadata.rs +++ b/golem-common/src/model/component_metadata.rs @@ -16,7 +16,7 @@ use bincode::{Decode, Encode}; use std::collections::HashMap; use std::fmt::{self, Display, Formatter}; -use crate::SafeDisplay; +use crate::{virtual_exports, SafeDisplay}; use golem_wasm_ast::analysis::AnalysedFunctionParameter; use golem_wasm_ast::core::Mem; use golem_wasm_ast::metadata::Producers as WasmAstProducers; @@ -182,6 +182,7 @@ impl RawComponentMetadata { .map_err(ComponentProcessingError::Analysis)?; add_resource_drops(&mut exports); + add_virtual_exports(&mut exports); let exports = exports.into_iter().collect::>(); @@ -327,6 +328,17 @@ fn drop_from_constructor(constructor: &AnalysedFunction) -> AnalysedFunction { } } +fn add_virtual_exports(exports: &mut Vec) { + // Some interfaces like the golem/http:incoming-handler do not exist on the component, + // but are dynamically created by the worker executor based on other existing interfaces. + + if virtual_exports::http_incoming_handler::implements_required_interfaces(exports) { + exports.extend(vec![ + virtual_exports::http_incoming_handler::ANALYZED_EXPORT.clone(), + ]); + }; +} + #[cfg(feature = "protobuf")] mod protobuf { use crate::model::component_metadata::{ diff --git a/golem-common/src/model/exports.rs b/golem-common/src/model/exports.rs index 0675da80cf..adc7c6d2c2 100644 --- a/golem-common/src/model/exports.rs +++ b/golem-common/src/model/exports.rs @@ -16,24 +16,7 @@ use golem_wasm_ast::analysis::{AnalysedExport, AnalysedFunction, AnalysedInstanc use rib::{ParsedFunctionName, ParsedFunctionReference, ParsedFunctionSite}; -pub trait AnalysedExportExtensions { - fn function_names(&self) -> Vec; -} - -impl AnalysedExportExtensions for AnalysedExport { - fn function_names(&self) -> Vec { - match self { - AnalysedExport::Instance(instance) => instance - .functions - .iter() - .map(|function| format!("{}.{{{}}}", instance.name, function.name)) - .collect(), - AnalysedExport::Function(function) => vec![function.name.clone()], - } - } -} - -pub fn instances(exports: &Vec) -> Vec { +fn instances(exports: &Vec) -> Vec { let mut instances = vec![]; for export in exports { if let AnalysedExport::Instance(instance) = export { @@ -43,7 +26,7 @@ pub fn instances(exports: &Vec) -> Vec { instances } -pub fn functions(exports: &Vec) -> Vec { +fn functions(exports: &Vec) -> Vec { let mut functions = vec![]; for export in exports { if let AnalysedExport::Function(function) = export { diff --git a/golem-common/src/virtual_exports/http_incoming_handler.rs b/golem-common/src/virtual_exports/http_incoming_handler.rs new file mode 100644 index 0000000000..45215c8602 --- /dev/null +++ b/golem-common/src/virtual_exports/http_incoming_handler.rs @@ -0,0 +1,524 @@ +// Copyright 2024-2025 Golem Cloud +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use bytes::Bytes; +use golem_wasm_ast::analysis::AnalysedType; +use golem_wasm_ast::analysis::{AnalysedExport, AnalysedFunction, AnalysedInstance}; +use golem_wasm_rpc::Value; +use lazy_static::lazy_static; +use semver::Version; + +// The following wit is modelled here: +// +// type fields = list>>; +// type body = list; +// +// record body-and-trailers { +// body: body, +// trailers: option +// } +// +// record request { +// method: method, +// path-with-query: string, +// scheme: scheme, +// authority: string, +// headers: simple-fields, +// body-and-trailers: option +// } +// +// record response { +// status: status-code, +// body: option +// } +// +// handle: func(request: request) -> response; +// + +lazy_static! { + pub static ref REQUIRED_FUNCTIONS: Vec = vec![ + rib::ParsedFunctionName { + site: rib::ParsedFunctionSite::PackagedInterface { + namespace: "wasi".to_string(), + package: "http".to_string(), + interface: "incoming-handler".to_string(), + version: Some(rib::SemVer(Version::new(0, 2, 0))) + }, + function: rib::ParsedFunctionReference::Function { + function: "handle".to_string() + } + }, + rib::ParsedFunctionName { + site: rib::ParsedFunctionSite::PackagedInterface { + namespace: "wasi".to_string(), + package: "http".to_string(), + interface: "incoming-handler".to_string(), + version: Some(rib::SemVer(Version::new(0, 2, 1))) + }, + function: rib::ParsedFunctionReference::Function { + function: "handle".to_string() + } + }, + rib::ParsedFunctionName { + site: rib::ParsedFunctionSite::PackagedInterface { + namespace: "wasi".to_string(), + package: "http".to_string(), + interface: "incoming-handler".to_string(), + version: Some(rib::SemVer(Version::new(0, 2, 2))) + }, + function: rib::ParsedFunctionReference::Function { + function: "handle".to_string() + } + }, + rib::ParsedFunctionName { + site: rib::ParsedFunctionSite::PackagedInterface { + namespace: "wasi".to_string(), + package: "http".to_string(), + interface: "incoming-handler".to_string(), + version: Some(rib::SemVer(Version::new(0, 2, 3))) + }, + function: rib::ParsedFunctionReference::Function { + function: "handle".to_string() + } + } + ]; + pub static ref PARSED_FUNCTION_NAME: rib::ParsedFunctionName = rib::ParsedFunctionName { + site: rib::ParsedFunctionSite::PackagedInterface { + namespace: "golem".to_string(), + package: "http".to_string(), + interface: "incoming-handler".to_string(), + version: None + }, + function: rib::ParsedFunctionReference::Function { + function: "handle".to_string() + } + }; + pub static ref ANALYZED_FUNCTION_PARAMETERS: Vec = { + use golem_wasm_ast::analysis::*; + vec![AnalysedFunctionParameter { + name: "request".to_string(), + typ: IncomingHttpRequest::analysed_type(), + }] + }; + pub static ref ANALYZED_FUNCTION_RESULTS: Vec = { + use golem_wasm_ast::analysis::*; + vec![AnalysedFunctionResult { + name: None, + typ: HttpResponse::analysed_type(), + }] + }; + pub static ref ANALYZED_FUNCTION: AnalysedFunction = { + use golem_wasm_ast::analysis::*; + + AnalysedFunction { + name: "handle".to_string(), + parameters: ANALYZED_FUNCTION_PARAMETERS.clone(), + results: ANALYZED_FUNCTION_RESULTS.clone(), + } + }; + pub static ref ANALYZED_EXPORT: AnalysedExport = AnalysedExport::Instance(AnalysedInstance { + name: "golem:http/incoming-handler".to_string(), + functions: vec![ANALYZED_FUNCTION.clone()] + }); +} + +pub fn implements_required_interfaces(exports: &[AnalysedExport]) -> bool { + let compatible_interfaces = [ + "wasi:http/incoming-handler@0.2.0".to_string(), + "wasi:http/incoming-handler@0.2.1".to_string(), + "wasi:http/incoming-handler@0.2.2".to_string(), + "wasi:http/incoming-handler@0.2.3".to_string(), + ]; + + exports.iter().any(|ae| match ae { + AnalysedExport::Instance(AnalysedInstance { name, .. }) => { + compatible_interfaces.contains(name) + } + _ => false, + }) +} + +macro_rules! extract { + ($expression:expr, $pattern:pat $(if $guard:expr)? $(,)?, $ret:expr, $err:expr) => { + match $expression { + $pattern $(if $guard)? => Ok($ret), + _ => Err($err) + } + }; +} + +pub enum HttpMethod { + GET, + HEAD, + POST, + PUT, + DELETE, + CONNECT, + OPTIONS, + TRACE, + PATCH, + Custom(String), +} + +impl HttpMethod { + pub fn analyzed_type() -> AnalysedType { + use golem_wasm_ast::analysis::*; + AnalysedType::Variant(TypeVariant { + cases: vec![ + NameOptionTypePair { + name: "GET".to_string(), + typ: None, + }, + NameOptionTypePair { + name: "HEAD".to_string(), + typ: None, + }, + NameOptionTypePair { + name: "POST".to_string(), + typ: None, + }, + NameOptionTypePair { + name: "PUT".to_string(), + typ: None, + }, + NameOptionTypePair { + name: "DELETE".to_string(), + typ: None, + }, + NameOptionTypePair { + name: "CONNECT".to_string(), + typ: None, + }, + NameOptionTypePair { + name: "OPTIONS".to_string(), + typ: None, + }, + NameOptionTypePair { + name: "TRACE".to_string(), + typ: None, + }, + NameOptionTypePair { + name: "PATCH".to_string(), + typ: None, + }, + NameOptionTypePair { + name: "Custom".to_string(), + typ: Some(AnalysedType::Str(TypeStr)), + }, + ], + }) + } + + pub fn from_value(value: &Value) -> Result { + let (case_idx, case_value) = extract!( + value, + Value::Variant { + case_idx, + case_value + }, + (case_idx, case_value), + "not a variant" + )?; + + match case_idx { + 0 => Ok(HttpMethod::GET), + 1 => Ok(HttpMethod::HEAD), + 2 => Ok(HttpMethod::POST), + 3 => Ok(HttpMethod::PUT), + 4 => Ok(HttpMethod::DELETE), + 5 => Ok(HttpMethod::CONNECT), + 6 => Ok(HttpMethod::OPTIONS), + 7 => Ok(HttpMethod::TRACE), + 8 => Ok(HttpMethod::PATCH), + 9 => { + let value = case_value.as_ref().ok_or("no case_value provided")?; + let custom_method = + extract!(*value.clone(), Value::String(inner), inner, "not a string")?; + Ok(HttpMethod::Custom(custom_method)) + } + _ => Err("unknown case")?, + } + } +} + +impl TryInto for HttpMethod { + type Error = http::method::InvalidMethod; + + fn try_into(self) -> Result { + match self { + Self::GET => Ok(http::Method::GET), + Self::HEAD => Ok(http::Method::HEAD), + Self::POST => Ok(http::Method::POST), + Self::PUT => Ok(http::Method::PUT), + Self::DELETE => Ok(http::Method::DELETE), + Self::CONNECT => Ok(http::Method::CONNECT), + Self::OPTIONS => Ok(http::Method::OPTIONS), + Self::TRACE => Ok(http::Method::TRACE), + Self::PATCH => Ok(http::Method::PATCH), + Self::Custom(method) => http::Method::from_bytes(method.as_bytes()), + } + } +} + +pub struct HttpFields(pub Vec<(String, Bytes)>); + +impl HttpFields { + pub fn analyzed_type() -> AnalysedType { + use golem_wasm_ast::analysis::*; + AnalysedType::List(TypeList { + inner: Box::new(AnalysedType::Tuple(TypeTuple { + items: vec![ + AnalysedType::Str(TypeStr), + AnalysedType::List(TypeList { + inner: Box::new(AnalysedType::U8(TypeU8)), + }), + ], + })), + }) + } + + pub fn from_value(value: &Value) -> Result { + let mut result = Vec::new(); + + let list_values = extract!(value, Value::List(inner), inner, "not a list")?; + + for lv in list_values { + let tuple_value = extract!(lv, Value::Tuple(inner), inner, "not a tuple")?; + + let (name, values) = extract!( + tuple_value.as_slice(), + [Value::String(name), Value::List(values)], + (name.clone(), values), + "incompatible types" + )?; + + let mut result_value = Vec::new(); + + for v in values { + let v = extract!(v, Value::U8(inner), *inner, "not a byte")?; + result_value.push(v); + } + + result.push((name, Bytes::from(result_value))); + } + + Ok(HttpFields(result)) + } + + pub fn to_value(self) -> Value { + let mut list_values = Vec::new(); + + for (name, value) in self.0 { + let converted_bytes: Vec = value.into_iter().map(Value::U8).collect::>(); + + list_values.push(Value::Tuple(vec![ + Value::String(name), + Value::List(converted_bytes), + ])); + } + Value::List(list_values) + } +} + +pub struct HttpBodyContent(pub Bytes); + +impl HttpBodyContent { + pub fn analyzed_type() -> AnalysedType { + use golem_wasm_ast::analysis::*; + AnalysedType::List(TypeList { + inner: Box::new(AnalysedType::U8(TypeU8)), + }) + } + + pub fn from_value(value: &Value) -> Result { + let mut result = Vec::new(); + + let list_values = extract!(value, Value::List(inner), inner, "not a list")?; + + for lv in list_values { + let byte_value = extract!(lv, Value::U8(inner), *inner, "not a byte")?; + result.push(byte_value); + } + + Ok(HttpBodyContent(Bytes::from(result))) + } + + pub fn to_value(self) -> Value { + let converted = self.0.into_iter().map(Value::U8).collect::>(); + Value::List(converted) + } +} + +pub struct HttpBody { + pub content: HttpBodyContent, + pub trailers: Option, +} + +impl HttpBody { + pub fn analysed_type() -> AnalysedType { + use golem_wasm_ast::analysis::*; + + AnalysedType::Record(TypeRecord { + fields: vec![ + NameTypePair { + name: "content".to_string(), + typ: HttpBodyContent::analyzed_type(), + }, + NameTypePair { + name: "trailers".to_string(), + typ: AnalysedType::Option(TypeOption { + inner: Box::new(HttpFields::analyzed_type()), + }), + }, + ], + }) + } + + pub fn from_value(value: &Value) -> Result { + let record_values = extract!(value, Value::Record(inner), inner, "not a record")?; + + if record_values.len() != 2 { + Err("wrong length of record data")?; + }; + + let content = HttpBodyContent::from_value(&record_values[0])?; + let trailers = extract!( + &record_values[1], + Value::Option(inner), + match inner { + Some(inner) => Some(HttpFields::from_value(inner)?), + None => None, + }, + "not an option" + )?; + + Ok(HttpBody { content, trailers }) + } + pub fn to_value(self) -> Value { + let converted_content = self.content.to_value(); + let converted_trailers = Value::Option(self.trailers.map(|t| Box::new(t.to_value()))); + + Value::Record(vec![converted_content, converted_trailers]) + } +} + +pub struct IncomingHttpRequest { + pub uri: String, + pub method: HttpMethod, + pub headers: HttpFields, + pub body: Option, +} + +impl IncomingHttpRequest { + pub fn analysed_type() -> AnalysedType { + use golem_wasm_ast::analysis::*; + + AnalysedType::Record(TypeRecord { + fields: vec![ + NameTypePair { + name: "uri".to_string(), + typ: AnalysedType::Str(TypeStr), + }, + NameTypePair { + name: "method".to_string(), + typ: HttpMethod::analyzed_type(), + }, + NameTypePair { + name: "headers".to_string(), + typ: HttpFields::analyzed_type(), + }, + NameTypePair { + name: "body".to_string(), + typ: AnalysedType::Option(TypeOption { + inner: Box::new(HttpBody::analysed_type()), + }), + }, + ], + }) + } + + pub fn from_function_input(inputs: &[Value]) -> Result { + if inputs.len() != 1 { + Err("invalid number of inputs")?; + }; + Self::from_value(&inputs[0]) + .map_err(|e| format!("Failed parsing input as http request: ${e}")) + } + + fn from_value(value: &Value) -> Result { + let record_values = extract!(value, Value::Record(inner), inner, "not a record")?; + + if record_values.len() != 4 { + Err("wrong length of record data")?; + }; + + let uri = extract!( + record_values[0].clone(), + Value::String(inner), + inner, + "not a string" + )?; + let method = HttpMethod::from_value(&record_values[1])?; + let headers = HttpFields::from_value(&record_values[2])?; + let body = extract!( + &record_values[3], + Value::Option(inner), + match inner { + Some(v) => Some(HttpBody::from_value(v)?), + None => None, + }, + "not an option" + )?; + + Ok(IncomingHttpRequest { + uri, + method, + headers, + body, + }) + } +} + +pub struct HttpResponse { + pub status: u16, + pub body: Option, +} + +impl HttpResponse { + pub fn analysed_type() -> AnalysedType { + use golem_wasm_ast::analysis::*; + + AnalysedType::Record(TypeRecord { + fields: vec![ + NameTypePair { + name: "status".to_string(), + typ: AnalysedType::U16(TypeU16), + }, + NameTypePair { + name: "body".to_string(), + typ: AnalysedType::Option(TypeOption { + inner: Box::new(HttpBody::analysed_type()), + }), + }, + ], + }) + } + + pub fn to_value(self) -> Value { + let converted_status: Value = Value::U16(self.status); + let converted_body = Value::Option(self.body.map(|b| Box::new(b.to_value()))); + + Value::Record(vec![converted_status, converted_body]) + } +} diff --git a/golem-common/src/virtual_exports/mod.rs b/golem-common/src/virtual_exports/mod.rs new file mode 100644 index 0000000000..a3d76a162c --- /dev/null +++ b/golem-common/src/virtual_exports/mod.rs @@ -0,0 +1,15 @@ +// Copyright 2024-2025 Golem Cloud +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod http_incoming_handler; diff --git a/golem-worker-executor-base/Cargo.toml b/golem-worker-executor-base/Cargo.toml index 56ed5b0352..bfecadd834 100644 --- a/golem-worker-executor-base/Cargo.toml +++ b/golem-worker-executor-base/Cargo.toml @@ -25,6 +25,7 @@ golem-wasm-ast = { path = "../wasm-ast", version = "0.0.0" } golem-wasm-rpc = { path = "../wasm-rpc", version = "0.0.0", default-features = false, features = ["host"] } anyhow = { workspace = true } +async-scoped = "0.9.0" async-fs = { workspace = true } async-lock = "3.4.0" async-mutex = "1.4.0" @@ -52,6 +53,7 @@ gethostname = "0.5.0" hex = { workspace = true } http = { workspace = true } http-body = "1.0.1" # keep in sync with wasmtime +http-body-util = "0.1.0" # keep in sync with wasmtime humansize = { workspace = true } humantime-serde = { workspace = true } hyper = { workspace = true } diff --git a/golem-worker-executor-base/src/invocation.rs b/golem-worker-executor-base/src/invocation.rs index 5f50de7f79..3ad09ca4a3 100644 --- a/golem-worker-executor-base/src/invocation.rs +++ b/golem-worker-executor-base/src/invocation.rs @@ -12,19 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. +use crate::error::GolemError; +use crate::metrics::wasm::{record_invocation, record_invocation_consumption}; +use crate::model::{InterruptKind, TrapType}; +use crate::virtual_export_compat; +use crate::workerctx::{PublicWorkerIo, WorkerCtx}; +use anyhow::anyhow; use golem_common::model::oplog::{WorkerError, WorkerResourceId}; use golem_common::model::WorkerStatus; +use golem_common::virtual_exports; use golem_wasm_rpc::wasmtime::{decode_param, encode_output, type_to_analysed_type}; use golem_wasm_rpc::Value; use rib::{ParsedFunctionName, ParsedFunctionReference}; use tracing::{debug, error}; use wasmtime::component::{Func, Val}; use wasmtime::{AsContextMut, StoreContextMut}; - -use crate::error::GolemError; -use crate::metrics::wasm::{record_invocation, record_invocation_consumption}; -use crate::model::{InterruptKind, TrapType}; -use crate::workerctx::{PublicWorkerIo, WorkerCtx}; +use wasmtime_wasi_http::bindings::Proxy; +use wasmtime_wasi_http::WasiHttpView; /// Invokes a function on a worker. /// @@ -108,10 +112,9 @@ pub fn find_first_available_function( let mut store = store.as_context_mut(); for name in names { let parsed = ParsedFunctionName::parse(&name).ok()?; - if find_function(&mut store, instance, &parsed) - .ok() - .flatten() - .is_some() + + if let Ok(FindFunctionResult::ExportedFunction(_)) = + find_function(&mut store, instance, &parsed) { return Some(name); } @@ -123,7 +126,21 @@ fn find_function<'a, Ctx: WorkerCtx>( mut store: &mut StoreContextMut<'a, Ctx>, instance: &'a wasmtime::component::Instance, parsed_function_name: &ParsedFunctionName, -) -> Result, GolemError> { +) -> Result { + if *parsed_function_name == *virtual_exports::http_incoming_handler::PARSED_FUNCTION_NAME { + return Ok(FindFunctionResult::IncomingHttpHandlerBridge); + }; + + let parsed_function_ref = parsed_function_name.function(); + + if matches!( + parsed_function_ref, + ParsedFunctionReference::RawResourceDrop { .. } + | ParsedFunctionReference::IndexedResourceDrop { .. } + ) { + return Ok(FindFunctionResult::ResourceDrop); + } + match &parsed_function_name.site().interface_name() { Some(interface_name) => { let exported_instance_idx = instance @@ -132,6 +149,7 @@ fn find_function<'a, Ctx: WorkerCtx>( "could not load exports for interface {}", interface_name )))?; + let func = instance .get_export( &mut store, @@ -141,38 +159,28 @@ fn find_function<'a, Ctx: WorkerCtx>( .and_then(|idx| instance.get_func(&mut store, idx)); match func { - Some(func) => Ok(Some(func)), - None => { - if matches!( - parsed_function_name.function(), - ParsedFunctionReference::RawResourceDrop { .. } - | ParsedFunctionReference::IndexedResourceDrop { .. } - ) { - Ok(None) - } else { - match parsed_function_name.method_as_static() { - None => Err(GolemError::invalid_request(format!( - "could not load function {} for interface {}", - &parsed_function_name.function().function_name(), - interface_name - ))), - Some(parsed_static) => instance - .get_export( - &mut store, - Some(&exported_instance_idx), - &parsed_static.function().function_name(), - ) - .and_then(|idx| instance.get_func(&mut store, idx)) - .ok_or(GolemError::invalid_request(format!( - "could not load function {} or {} for interface {}", - &parsed_function_name.function().function_name(), - &parsed_static.function().function_name(), - interface_name - ))) - .map(Some), - } - } - } + Some(func) => Ok(FindFunctionResult::ExportedFunction(func)), + None => match parsed_function_name.method_as_static() { + None => Err(GolemError::invalid_request(format!( + "could not load function {} for interface {}", + &parsed_function_name.function().function_name(), + interface_name + ))), + Some(parsed_static) => instance + .get_export( + &mut store, + Some(&exported_instance_idx), + &parsed_static.function().function_name(), + ) + .and_then(|idx| instance.get_func(&mut store, idx)) + .ok_or(GolemError::invalid_request(format!( + "could not load function {} or {} for interface {}", + &parsed_function_name.function().function_name(), + &parsed_static.function().function_name(), + interface_name + ))) + .map(FindFunctionResult::ExportedFunction), + }, } } None => instance @@ -181,7 +189,7 @@ fn find_function<'a, Ctx: WorkerCtx>( "could not load function {}", &parsed_function_name.function().function_name() ))) - .map(Some), + .map(FindFunctionResult::ExportedFunction), } } @@ -235,11 +243,16 @@ async fn invoke_or_fail( } let mut call_result = match function { - Some(function) => invoke(&mut store, function, &function_input, &full_function_name).await, - None => { + FindFunctionResult::ExportedFunction(function) => { + invoke(&mut store, function, &function_input, &full_function_name).await + } + FindFunctionResult::ResourceDrop => { // Special function: drop drop_resource(&mut store, &parsed, &function_input, &full_function_name).await } + FindFunctionResult::IncomingHttpHandlerBridge => { + invoke_http_handler(&mut store, instance, &function_input, &full_function_name).await + } }; if let Ok(r) = call_result.as_mut() { r.add_fuel(extra_fuel); @@ -270,12 +283,17 @@ async fn get_or_create_indexed_resource<'a, Ctx: WorkerCtx>( resource: resource_name.clone(), }, ); - let resource_constructor = find_function(store, instance, &resource_constructor_name)?.ok_or( - GolemError::invalid_request(format!( + + let resource_constructor = if let FindFunctionResult::ExportedFunction(func) = + find_function(store, instance, &resource_constructor_name)? + { + func + } else { + Err(GolemError::invalid_request(format!( "could not find resource constructor for resource {}", resource_name - )), - )?; + )))? + }; let constructor_param_types = resource_constructor.params(store as &StoreContextMut<'a, Ctx>).iter().map(type_to_analysed_type).collect::, _>>() .map_err(|err| GolemError::invalid_request(format!("Indexed resource invocation cannot be used with owned or borrowed resource handles in constructor parameter position! ({err})")))?; @@ -407,6 +425,118 @@ async fn invoke( } } +async fn invoke_http_handler( + store: &mut impl AsContextMut, + instance: &wasmtime::component::Instance, + function_input: &[Value], + raw_function_name: &str, +) -> Result { + let (sender, receiver) = tokio::sync::oneshot::channel(); + + let proxy = Proxy::new(&mut *store, instance).unwrap(); + let mut store_context = store.as_context_mut(); + + store_context.data_mut().borrow_fuel().await?; + + let idempotency_key = store_context.data().get_current_idempotency_key().await; + if let Some(idempotency_key) = &idempotency_key { + store_context + .data() + .get_public_state() + .event_service() + .emit_invocation_start( + raw_function_name, + idempotency_key, + store_context.data().is_live(), + ); + } + + let (_, mut task_exits) = { + let hyper_request = + virtual_export_compat::http_incoming_handler::input_to_hyper_request(function_input)?; + let scheme = wasi_http_scheme_from_request(&hyper_request)?; + let incoming = store_context + .data_mut() + .durable_ctx_mut() + .as_wasi_http_view() + .new_incoming_request(scheme, hyper_request) + .unwrap(); + let outgoing = store_context + .data_mut() + .durable_ctx_mut() + .as_wasi_http_view() + .new_response_outparam(sender) + .unwrap(); + + unsafe { + async_scoped::TokioScope::scope_and_collect(|s| { + s.spawn(proxy.wasi_http_incoming_handler().call_handle( + store_context, + incoming, + outgoing, + )); + }) + .await + } + }; + + let res_or_error = match receiver.await { + Ok(Ok(resp)) => { + Ok(virtual_export_compat::http_incoming_handler::http_response_to_output(resp).await?) + } + Ok(Err(e)) => Err(anyhow::Error::from(e)), + Err(_) => { + // An error in the receiver (`RecvError`) only indicates that the + // task exited before a response was sent (i.e., the sender was + // dropped); it does not describe the underlying cause of failure. + // Instead we retrieve and propagate the error from inside the task + // which should more clearly tell the user what went wrong. Note + // that we assume the task has already exited at this point so the + // `await` should resolve immediately. + let task_exit = task_exits.remove(0); + let e = match task_exit { + Ok(r) => r.expect_err("if the receiver has an error, the task must have failed"), + Err(_e) => anyhow!("failed joining wasm task"), + }; + Err(e)? + } + }; + + let mut store_context = store.as_context_mut(); + + let current_fuel_level = store_context.get_fuel().unwrap_or(0); + let consumed_fuel = store_context + .data_mut() + .return_fuel(current_fuel_level as i64) + .await?; + + if consumed_fuel > 0 { + debug!( + "Fuel consumed for call {raw_function_name}: {}", + consumed_fuel + ); + } + + if let Some(idempotency_key) = idempotency_key { + store_context + .data() + .get_public_state() + .event_service() + .emit_invocation_finished( + raw_function_name, + &idempotency_key, + store_context.data().is_live(), + ); + } + + record_invocation_consumption(consumed_fuel); + + match res_or_error { + Ok(resp) => Ok(InvokeResult::from_success(consumed_fuel, vec![resp])), + Err(e) => Ok(InvokeResult::from_error::(consumed_fuel, &e)), + } +} + async fn drop_resource( store: &mut impl AsContextMut, parsed_function_name: &ParsedFunctionName, @@ -613,3 +743,26 @@ impl InvokeResult { } } } + +enum FindFunctionResult { + ExportedFunction(Func), + ResourceDrop, + IncomingHttpHandlerBridge, +} + +fn wasi_http_scheme_from_request( + req: &hyper::Request, +) -> Result { + use http::uri::*; + use wasmtime_wasi_http::bindings::http::types::Scheme as WasiScheme; + + let raw_scheme = req.uri().scheme().ok_or(GolemError::invalid_request( + "Could not extract scheme from uri".to_string(), + ))?; + + match raw_scheme { + scheme if *scheme == Scheme::HTTP => Ok(WasiScheme::Http), + scheme if *scheme == Scheme::HTTPS => Ok(WasiScheme::Https), + scheme => Ok(WasiScheme::Other(scheme.to_string())), + } +} diff --git a/golem-worker-executor-base/src/lib.rs b/golem-worker-executor-base/src/lib.rs index c81bcca2a1..d1d7683415 100644 --- a/golem-worker-executor-base/src/lib.rs +++ b/golem-worker-executor-base/src/lib.rs @@ -22,6 +22,7 @@ pub mod model; pub mod preview2; pub mod services; pub mod storage; +pub mod virtual_export_compat; pub mod wasi_host; pub mod worker; pub mod workerctx; diff --git a/golem-worker-executor-base/src/virtual_export_compat/http_incoming_handler.rs b/golem-worker-executor-base/src/virtual_export_compat/http_incoming_handler.rs new file mode 100644 index 0000000000..a8cb20765e --- /dev/null +++ b/golem-worker-executor-base/src/virtual_export_compat/http_incoming_handler.rs @@ -0,0 +1,125 @@ +// Copyright 2024-2025 Golem Cloud +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::convert::Infallible; + +use crate::error::GolemError; +use bytes::Bytes; +use golem_common::virtual_exports::http_incoming_handler::*; +use golem_wasm_rpc::Value; +use http::{HeaderName, HeaderValue}; +use http_body_util::combinators::BoxBody; +use http_body_util::BodyExt; +use wasmtime_wasi_http::bindings::http::types::ErrorCode; + +pub fn input_to_hyper_request( + inputs: &[Value], +) -> Result>, GolemError> { + let request = IncomingHttpRequest::from_function_input(inputs).map_err(|e| { + GolemError::invalid_request(format!("Failed contructing incoming request: ${e}")) + })?; + + let mut builder = hyper::Request::builder() + .uri(request.uri) + .method(request.method); + + for (name, value) in request.headers.0 { + let converted = http::HeaderValue::from_bytes(&value) + .map_err(|e| GolemError::invalid_request(format!("Invalid header value: ${e}")))?; + + builder = builder.header(name, converted); + } + + let body = if let Some(b) = request.body { + let body = http_body_util::Full::new(b.content.0); + + let converted_trailers = if let Some(trailers) = b.trailers { + let mut converted_trailers = http::HeaderMap::new(); + for (name, value) in trailers.0.into_iter() { + let header_name = HeaderName::from_bytes(name.as_bytes()).map_err(|e| { + GolemError::invalid_request(format!("Failed to convert header name ${e}")) + })?; + let header_value = HeaderValue::from_bytes(&value).map_err(|e| { + GolemError::invalid_request(format!("Failed to convert header value ${e}")) + })?; + + converted_trailers.insert(header_name, header_value); + } + Some(Ok(converted_trailers)) + } else { + None + }; + + let with_trailers = body.with_trailers(async { converted_trailers }); + BoxBody::new(with_trailers.map_err(hyper_error_from_infallible)) + } else { + BoxBody::new(http_body_util::Empty::new().map_err(hyper_error_from_infallible)) + }; + + builder + .body(body) + .map_err(|e| GolemError::invalid_request(format!("Failed to attach body ${e}"))) +} + +pub async fn http_response_to_output( + response: http::Response>, +) -> Result { + use http_body_util::BodyExt; + + let status = response.status().as_u16(); + let collected = response.into_body().collect().await.map_err(|e| { + GolemError::runtime(format!("Failed collection body of http response: ${e}")) + })?; + + let trailers = collected.trailers().cloned(); + let bytes = collected.to_bytes(); + + let body = if !bytes.is_empty() { + let trailers = if let Some(hm) = trailers { + let mut result = Vec::new(); + let mut previous = None; + for (name, value) in hm.into_iter() { + let current = match name { + None => previous.clone().unwrap(), + Some(next) => { + previous = Some(next.clone()); + next + } + }; + result.push(( + current.to_string(), + Bytes::copy_from_slice(value.as_bytes()), + )) + } + Some(HttpFields(result)) + } else { + None + }; + + Some(HttpBody { + content: HttpBodyContent(bytes), + trailers, + }) + } else { + None + }; + + let response = HttpResponse { status, body }; + + Ok(response.to_value()) +} + +fn hyper_error_from_infallible(_infallible: Infallible) -> hyper::Error { + unreachable!() +} diff --git a/golem-worker-executor-base/src/virtual_export_compat/mod.rs b/golem-worker-executor-base/src/virtual_export_compat/mod.rs new file mode 100644 index 0000000000..a3d76a162c --- /dev/null +++ b/golem-worker-executor-base/src/virtual_export_compat/mod.rs @@ -0,0 +1,15 @@ +// Copyright 2024-2025 Golem Cloud +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod http_incoming_handler; diff --git a/golem-worker-executor-base/src/workerctx.rs b/golem-worker-executor-base/src/workerctx.rs index 765e478bbb..029c2cdef6 100644 --- a/golem-worker-executor-base/src/workerctx.rs +++ b/golem-worker-executor-base/src/workerctx.rs @@ -15,6 +15,7 @@ use std::collections::HashSet; use std::sync::{Arc, RwLock, Weak}; +use crate::durable_host::DurableWorkerCtxView; use crate::error::GolemError; use crate::model::{ CurrentResourceLimits, ExecutionStatus, InterruptKind, LastError, ListDirectoryResult, @@ -73,6 +74,7 @@ pub trait WorkerCtx: + Send + Sync + Sized + + DurableWorkerCtxView + 'static { /// PublicState is a subset of the worker context which is accessible outside the worker diff --git a/golem-worker-executor-base/tests/wasi.rs b/golem-worker-executor-base/tests/wasi.rs index c7de8dc350..d19085eebe 100644 --- a/golem-worker-executor-base/tests/wasi.rs +++ b/golem-worker-executor-base/tests/wasi.rs @@ -1849,3 +1849,47 @@ async fn ip_address_resolve( check!(result1.len() > 0); check!(result2.len() > 0); } + +#[test] +#[tracing::instrument] +async fn wasi_incoming_request_handler_compat( + last_unique_id: &LastUniqueId, + deps: &WorkerExecutorTestDependencies, + _tracing: &Tracing, +) { + let context = TestContext::new(last_unique_id); + let executor = start(deps, &context).await.unwrap(); + + let component_id = executor + .store_component("wasi-http-incoming-request-handler") + .await; + let worker_id = executor + .start_worker(&component_id, "wasi-http-incoming-request-handler-1") + .await; + + let args: Value = Value::Record(vec![ + Value::String("http://localhost:8000".to_string()), + Value::Variant { + case_idx: 0, + case_value: None, + }, + Value::List(vec![]), + Value::Option(None), + ]); + + let result = executor + .invoke_and_await( + &worker_id, + "golem:http/incoming-handler.{handle}", + vec![args], + ) + .await + .unwrap(); + + drop(executor); + + println!("foobar"); + + check!(result.len() == 1); + // check!(result[0] == Value::Record(vec![Value::U16(200), Value::Option(None)])); +} diff --git a/test-components/wasi-http-incoming-request-handler b/test-components/wasi-http-incoming-request-handler new file mode 160000 index 0000000000..ec24fdab5b --- /dev/null +++ b/test-components/wasi-http-incoming-request-handler @@ -0,0 +1 @@ +Subproject commit ec24fdab5bd993117caa85eb00d9f11f602f34fa diff --git a/test-components/wasi-http-incoming-request-handler.wasm b/test-components/wasi-http-incoming-request-handler.wasm new file mode 100644 index 0000000000..cfd8b2acac Binary files /dev/null and b/test-components/wasi-http-incoming-request-handler.wasm differ