From 37678c27715436de038d19c70f6e583e32083dd2 Mon Sep 17 00:00:00 2001 From: Maxim Schuwalow Date: Wed, 22 Jan 2025 01:18:18 +0100 Subject: [PATCH 1/3] Add http-handler binding type --- Cargo.lock | 1 + Cargo.toml | 2 + .../golem/apidefinition/api_definition.proto | 1 + golem-common/src/model/mod.rs | 1 + golem-common/src/model/protobuf.rs | 6 + .../virtual_exports/http_incoming_handler.rs | 206 ++++++++++++---- golem-worker-executor-base/Cargo.toml | 4 +- golem-worker-service-base/Cargo.toml | 1 + .../src/api/custom_http_request_api.rs | 3 + .../src/api/register_api_definition_api.rs | 78 +++++- .../http/http_api_definition.rs | 24 +- .../http/http_oas_api_definition.rs | 33 ++- .../gateway_binding_compiled.rs | 85 ++++++- .../gateway_binding/http_handler_binding.rs | 77 ++++++ .../src/gateway_binding/mod.rs | 106 +++++++-- .../src/gateway_binding/worker_binding.rs | 86 ++++++- .../worker_binding_compiled.rs | 134 ----------- .../file_server_binding_handler.rs | 184 +++++++------- .../gateway_binding_resolver.rs | 154 ++++++++++-- .../gateway_http_input_executor.rs | 32 +++ .../http_handler_binding_handler.rs | 225 ++++++++++++++++++ .../src/gateway_execution/mod.rs | 1 + .../rib_input_value_resolver.rs | 4 +- .../src/gateway_execution/to_response.rs | 25 ++ .../src/gateway_request/request_details.rs | 42 +--- .../src/service/gateway/api_definition.rs | 5 +- .../gateway/http_api_definition_validator.rs | 2 +- .../tests/api_gateway_end_to_end_tests.rs | 23 ++ golem-worker-service/src/api/mod.rs | 1 + golem-worker-service/src/service/mod.rs | 12 + 30 files changed, 1187 insertions(+), 371 deletions(-) create mode 100644 golem-worker-service-base/src/gateway_binding/http_handler_binding.rs delete mode 100644 golem-worker-service-base/src/gateway_binding/worker_binding_compiled.rs create mode 100644 golem-worker-service-base/src/gateway_execution/http_handler_binding_handler.rs diff --git a/Cargo.lock b/Cargo.lock index 02f5a3c779..88214a2566 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4386,6 +4386,7 @@ dependencies = [ "golem-wasm-ast", "golem-wasm-rpc", "http 1.2.0", + "http-body-util", "humantime-serde", "hyper 1.5.2", "lazy_static 1.5.0", diff --git a/Cargo.toml b/Cargo.toml index 618edab19b..be254b5ca0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -115,6 +115,8 @@ git-version = "0.3.9" golem-wit = { version = "=1.1.2" } hex = "0.4.3" http = "1.2.0" # keep in sync with wasmtime +http-body = "1.0.1" # keep in sync with wasmtime +http-body-util = "0.1.0" # keep in sync with wasmtime humansize = "2.1.3" humantime-serde = "1.1.1" hyper = { version = "1.5.1", features = ["full"] } # keep in sync with wasmtime diff --git a/golem-api-grpc/proto/golem/apidefinition/api_definition.proto b/golem-api-grpc/proto/golem/apidefinition/api_definition.proto index edb5a6b0d0..36a4ba7e35 100644 --- a/golem-api-grpc/proto/golem/apidefinition/api_definition.proto +++ b/golem-api-grpc/proto/golem/apidefinition/api_definition.proto @@ -119,6 +119,7 @@ enum GatewayBindingType { FILE_SERVER = 1; CORS_PREFLIGHT = 2; AUTH_CALL_BACK = 3; + HTTP_HANDLER = 4; } // Used in api definition repo and needs to be backward compatible diff --git a/golem-common/src/model/mod.rs b/golem-common/src/model/mod.rs index 33ade7dfd5..e9916891ba 100644 --- a/golem-common/src/model/mod.rs +++ b/golem-common/src/model/mod.rs @@ -2186,6 +2186,7 @@ pub enum GatewayBindingType { #[default] Default, FileServer, + HttpHandler, CorsPreflight, } diff --git a/golem-common/src/model/protobuf.rs b/golem-common/src/model/protobuf.rs index 8b57eeb398..13ca6ca05f 100644 --- a/golem-common/src/model/protobuf.rs +++ b/golem-common/src/model/protobuf.rs @@ -736,6 +736,9 @@ impl From for G golem_api_grpc::proto::golem::apidefinition::GatewayBindingType::FileServer => { GatewayBindingType::FileServer } + golem_api_grpc::proto::golem::apidefinition::GatewayBindingType::HttpHandler => { + GatewayBindingType::HttpHandler + } golem_api_grpc::proto::golem::apidefinition::GatewayBindingType::CorsPreflight => { GatewayBindingType::CorsPreflight } @@ -755,6 +758,9 @@ impl From for golem_api_grpc::proto::golem::apidefinition::G GatewayBindingType::FileServer => { golem_api_grpc::proto::golem::apidefinition::GatewayBindingType::FileServer } + GatewayBindingType::HttpHandler => { + golem_api_grpc::proto::golem::apidefinition::GatewayBindingType::HttpHandler + } GatewayBindingType::CorsPreflight => { golem_api_grpc::proto::golem::apidefinition::GatewayBindingType::CorsPreflight } diff --git a/golem-common/src/virtual_exports/http_incoming_handler.rs b/golem-common/src/virtual_exports/http_incoming_handler.rs index ef3b6333b2..ccab519a1b 100644 --- a/golem-common/src/virtual_exports/http_incoming_handler.rs +++ b/golem-common/src/virtual_exports/http_incoming_handler.rs @@ -16,8 +16,8 @@ use bytes::Bytes; use golem_wasm_ast::analysis::AnalysedType; use golem_wasm_ast::analysis::{AnalysedExport, AnalysedFunction, AnalysedInstance}; use golem_wasm_rpc::Value; -use lazy_static::lazy_static; use semver::Version; +use std::sync::LazyLock; // The following wit is modelled here: // @@ -45,92 +45,110 @@ use semver::Version; // handle: func(request: request) -> response; // -lazy_static! { - pub static ref REQUIRED_FUNCTIONS: Vec = vec![ +pub static REQUIRED_FUNCTIONS: LazyLock> = LazyLock::new(|| { + vec![ rib::ParsedFunctionName { site: rib::ParsedFunctionSite::PackagedInterface { namespace: "wasi".to_string(), package: "http".to_string(), interface: "incoming-handler".to_string(), - version: Some(rib::SemVer(Version::new(0, 2, 0))) + version: Some(rib::SemVer(Version::new(0, 2, 0))), }, function: rib::ParsedFunctionReference::Function { - function: "handle".to_string() - } + function: "handle".to_string(), + }, }, rib::ParsedFunctionName { site: rib::ParsedFunctionSite::PackagedInterface { namespace: "wasi".to_string(), package: "http".to_string(), interface: "incoming-handler".to_string(), - version: Some(rib::SemVer(Version::new(0, 2, 1))) + version: Some(rib::SemVer(Version::new(0, 2, 1))), }, function: rib::ParsedFunctionReference::Function { - function: "handle".to_string() - } + function: "handle".to_string(), + }, }, rib::ParsedFunctionName { site: rib::ParsedFunctionSite::PackagedInterface { namespace: "wasi".to_string(), package: "http".to_string(), interface: "incoming-handler".to_string(), - version: Some(rib::SemVer(Version::new(0, 2, 2))) + version: Some(rib::SemVer(Version::new(0, 2, 2))), }, function: rib::ParsedFunctionReference::Function { - function: "handle".to_string() - } + function: "handle".to_string(), + }, }, rib::ParsedFunctionName { site: rib::ParsedFunctionSite::PackagedInterface { namespace: "wasi".to_string(), package: "http".to_string(), interface: "incoming-handler".to_string(), - version: Some(rib::SemVer(Version::new(0, 2, 3))) + version: Some(rib::SemVer(Version::new(0, 2, 3))), }, function: rib::ParsedFunctionReference::Function { - function: "handle".to_string() - } - } - ]; - pub static ref PARSED_FUNCTION_NAME: rib::ParsedFunctionName = rib::ParsedFunctionName { + function: "handle".to_string(), + }, + }, + ] +}); + +pub static PARSED_FUNCTION_NAME: LazyLock = + LazyLock::new(|| rib::ParsedFunctionName { site: rib::ParsedFunctionSite::PackagedInterface { namespace: "golem".to_string(), package: "http".to_string(), interface: "incoming-handler".to_string(), - version: None + version: None, }, function: rib::ParsedFunctionReference::Function { - function: "handle".to_string() - } - }; - pub static ref ANALYZED_FUNCTION_PARAMETERS: Vec = { - use golem_wasm_ast::analysis::*; + function: "handle".to_string(), + }, + }); + +pub static ANALYZED_FUNCTION_PARAMETERS: LazyLock< + Vec, +> = { + use golem_wasm_ast::analysis::*; + LazyLock::new(|| { vec![AnalysedFunctionParameter { name: "request".to_string(), typ: IncomingHttpRequest::analysed_type(), }] - }; - pub static ref ANALYZED_FUNCTION_RESULTS: Vec = { - use golem_wasm_ast::analysis::*; + }) +}; + +pub static ANALYZED_FUNCTION_RESULTS: LazyLock< + Vec, +> = { + use golem_wasm_ast::analysis::*; + LazyLock::new(|| { vec![AnalysedFunctionResult { name: None, typ: HttpResponse::analysed_type(), }] - }; - pub static ref ANALYZED_FUNCTION: AnalysedFunction = { - use golem_wasm_ast::analysis::*; + }) +}; - AnalysedFunction { - name: "handle".to_string(), - parameters: ANALYZED_FUNCTION_PARAMETERS.clone(), - results: ANALYZED_FUNCTION_RESULTS.clone(), - } - }; - pub static ref ANALYZED_EXPORT: AnalysedExport = AnalysedExport::Instance(AnalysedInstance { - name: "golem:http/incoming-handler".to_string(), - functions: vec![ANALYZED_FUNCTION.clone()] - }); -} +pub static ANALYZED_FUNCTION: LazyLock = { + use golem_wasm_ast::analysis::*; + + LazyLock::new(|| AnalysedFunction { + name: "handle".to_string(), + parameters: ANALYZED_FUNCTION_PARAMETERS.clone(), + results: ANALYZED_FUNCTION_RESULTS.clone(), + }) +}; + +pub const FUNCTION_NAME: &str = "golem:http/incoming-handler"; + +pub static ANALYZED_EXPORT: LazyLock = LazyLock::new(|| { + AnalysedExport::Instance(AnalysedInstance { + name: FUNCTION_NAME.to_string(), + functions: vec![ANALYZED_FUNCTION.clone()], + }) +}); pub fn implements_required_interfaces(exports: &[AnalysedExport]) -> bool { let compatible_interfaces = [ @@ -249,6 +267,68 @@ impl HttpMethod { _ => Err("unknown case")?, } } + + pub fn to_value(self) -> Value { + match self { + HttpMethod::GET => Value::Variant { + case_idx: 0, + case_value: None, + }, + HttpMethod::HEAD => Value::Variant { + case_idx: 1, + case_value: None, + }, + HttpMethod::POST => Value::Variant { + case_idx: 2, + case_value: None, + }, + HttpMethod::PUT => Value::Variant { + case_idx: 3, + case_value: None, + }, + HttpMethod::DELETE => Value::Variant { + case_idx: 4, + case_value: None, + }, + HttpMethod::CONNECT => Value::Variant { + case_idx: 5, + case_value: None, + }, + HttpMethod::OPTIONS => Value::Variant { + case_idx: 6, + case_value: None, + }, + HttpMethod::TRACE => Value::Variant { + case_idx: 7, + case_value: None, + }, + HttpMethod::PATCH => Value::Variant { + case_idx: 8, + case_value: None, + }, + HttpMethod::Custom(custom_method) => Value::Variant { + case_idx: 9, + case_value: Some(Box::new(Value::String(custom_method))), + }, + } + } + + pub fn from_http_method(value: http::Method) -> Self { + use http::Method as M; + + match value { + M::GET => HttpMethod::GET, + M::CONNECT => HttpMethod::CONNECT, + M::DELETE => HttpMethod::DELETE, + M::HEAD => HttpMethod::HEAD, + M::OPTIONS => HttpMethod::OPTIONS, + M::PATCH => HttpMethod::PATCH, + M::POST => HttpMethod::POST, + M::PUT => HttpMethod::PUT, + M::TRACE => HttpMethod::TRACE, + other => HttpMethod::Custom(other.to_string()), + } + } } impl TryInto for HttpMethod { @@ -487,6 +567,15 @@ impl IncomingHttpRequest { body, }) } + + pub fn to_value(self) -> Value { + Value::Record(vec![ + Value::String(self.uri), + self.method.to_value(), + self.headers.to_value(), + Value::Option(self.body.map(|b| Box::new(b.to_value()))), + ]) + } } pub struct HttpResponse { @@ -519,6 +608,41 @@ impl HttpResponse { }) } + pub fn from_value(value: Value) -> Result { + let record_values = extract!(value, Value::Record(inner), inner, "not a record")?; + + if record_values.len() != 3 { + Err("wrong length of record data")?; + }; + + let status = extract!( + record_values[0].clone(), + Value::U16(inner), + inner, + "not a u16" + )?; + + let headers = HttpFields::from_value(&record_values[1])?; + + let body = extract!( + &record_values[2], + Value::Option(inner), + inner.as_ref(), + "not an option" + )?; + let body = if let Some(b) = body { + Some(HttpBodyAndTrailers::from_value(b)?) + } else { + None + }; + + Ok(HttpResponse { + status, + headers, + body, + }) + } + pub fn to_value(self) -> Value { let converted_status: Value = Value::U16(self.status); let converted_headers: Value = self.headers.to_value(); diff --git a/golem-worker-executor-base/Cargo.toml b/golem-worker-executor-base/Cargo.toml index bfecadd834..5015829edc 100644 --- a/golem-worker-executor-base/Cargo.toml +++ b/golem-worker-executor-base/Cargo.toml @@ -52,8 +52,8 @@ futures-util = { workspace = true } gethostname = "0.5.0" hex = { workspace = true } http = { workspace = true } -http-body = "1.0.1" # keep in sync with wasmtime -http-body-util = "0.1.0" # keep in sync with wasmtime +http-body = { workspace = true } +http-body-util = { workspace = true } humansize = { workspace = true } humantime-serde = { workspace = true } hyper = { workspace = true } diff --git a/golem-worker-service-base/Cargo.toml b/golem-worker-service-base/Cargo.toml index a3b43d7163..50a28dd4dc 100644 --- a/golem-worker-service-base/Cargo.toml +++ b/golem-worker-service-base/Cargo.toml @@ -39,6 +39,7 @@ figment = { workspace = true } futures = { workspace = true } futures-util = { workspace = true } http = { workspace = true } +http-body-util = { workspace = true } humantime-serde = { workspace = true } hyper = { workspace = true } lazy_static = { workspace = true } diff --git a/golem-worker-service-base/src/api/custom_http_request_api.rs b/golem-worker-service-base/src/api/custom_http_request_api.rs index 365f05130f..d444182ed2 100644 --- a/golem-worker-service-base/src/api/custom_http_request_api.rs +++ b/golem-worker-service-base/src/api/custom_http_request_api.rs @@ -23,6 +23,7 @@ use crate::gateway_execution::gateway_http_input_executor::{ DefaultGatewayInputExecutor, GatewayHttpInputExecutor, }; use crate::gateway_execution::gateway_session::GatewaySession; +use crate::gateway_execution::http_handler_binding_handler::HttpHandlerBindingHandler; use crate::gateway_execution::GatewayWorkerRequestExecutor; use crate::gateway_request::http_request::InputHttpRequest; use crate::gateway_rib_interpreter::DefaultRibInterpreter; @@ -47,6 +48,7 @@ impl CustomHttpRequestApi { + Send, >, file_server_binding_handler: Arc + Sync + Send>, + http_handler_binding_handler: Arc + Sync + Send>, gateway_session_store: Arc, ) -> Self { let evaluator = Arc::new(DefaultRibInterpreter::from_worker_request_executor( @@ -59,6 +61,7 @@ impl CustomHttpRequestApi { evaluator, file_server_binding_handler, auth_call_back_binding_handler, + http_handler_binding_handler, api_definition_lookup_service, gateway_session_store, identity_provider: Arc::new(DefaultIdentityProvider), diff --git a/golem-worker-service-base/src/api/register_api_definition_api.rs b/golem-worker-service-base/src/api/register_api_definition_api.rs index 0e8038bb7b..09e4538588 100644 --- a/golem-worker-service-base/src/api/register_api_definition_api.rs +++ b/golem-worker-service-base/src/api/register_api_definition_api.rs @@ -18,7 +18,8 @@ use crate::gateway_api_definition::http::{ use crate::gateway_api_definition::{ApiDefinitionId, ApiVersion}; use crate::gateway_api_deployment::ApiSite; use crate::gateway_binding::{ - GatewayBinding, GatewayBindingCompiled, StaticBinding, WorkerBinding, WorkerBindingCompiled, + GatewayBinding, GatewayBindingCompiled, HttpHandlerBinding, HttpHandlerBindingCompiled, + StaticBinding, WorkerBinding, WorkerBindingCompiled, }; use crate::gateway_middleware::{CorsPreflightExpr, HttpCors, HttpMiddleware, HttpMiddlewares}; use crate::gateway_security::{ @@ -373,6 +374,36 @@ impl GatewayBindingData { allow_credentials: None, }) } + + pub fn from_http_handler_binding( + http_handler_binding: HttpHandlerBinding, + binding_type: GatewayBindingType, + ) -> Result { + let worker_id = http_handler_binding + .worker_name + .map(|expr| rib::to_string(&expr).map_err(|e| e.to_string())) + .transpose()?; + + let idempotency_key = if let Some(key) = &http_handler_binding.idempotency_key { + Some(rib::to_string(key).map_err(|e| e.to_string())?) + } else { + None + }; + + Ok(Self { + binding_type: Some(binding_type), + component_id: Some(http_handler_binding.component_id), + worker_name: worker_id, + idempotency_key, + response: None, + allow_origin: None, + allow_methods: None, + allow_headers: None, + expose_headers: None, + max_age: None, + allow_credentials: None, + }) + } } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Object)] @@ -493,6 +524,33 @@ impl GatewayBindingResponseData { response_mapping_output: worker_binding.response_compiled.rib_output, } } + + pub fn from_http_handler_binding_compiled( + http_handler_binding: HttpHandlerBindingCompiled, + binding_type: GatewayBindingType, + ) -> Self { + GatewayBindingResponseData { + component_id: Some(http_handler_binding.component_id), + worker_name: http_handler_binding + .worker_name_compiled + .clone() + .map(|compiled| compiled.worker_name.to_string()), + idempotency_key: http_handler_binding.idempotency_key_compiled.clone().map( + |idempotency_key_compiled| idempotency_key_compiled.idempotency_key.to_string(), + ), + response: None, + binding_type: Some(binding_type), + response_mapping_input: None, + worker_name_input: http_handler_binding + .worker_name_compiled + .map(|compiled| compiled.rib_input_type_info), + idempotency_key_input: http_handler_binding + .idempotency_key_compiled + .map(|idempotency_key_compiled| idempotency_key_compiled.rib_input), + cors_preflight: None, + response_mapping_output: None, + } + } } impl TryFrom for GatewayBindingResponseData { @@ -514,6 +572,12 @@ impl TryFrom for GatewayBindingResponseData { GatewayBindingType::Default, )) } + GatewayBindingCompiled::HttpHandler(http_handler_binding) => Ok( + GatewayBindingResponseData::from_http_handler_binding_compiled( + http_handler_binding, + GatewayBindingType::HttpHandler, + ), + ), GatewayBindingCompiled::Static(static_binding) => { let binding_type = match static_binding.deref() { StaticBinding::HttpCorsPreflight(_) => GatewayBindingType::CorsPreflight, @@ -631,6 +695,13 @@ impl TryFrom for GatewayBindingData { GatewayBindingType::FileServer, ), + GatewayBinding::HttpHandler(http_handler_binding) => { + GatewayBindingData::from_http_handler_binding( + http_handler_binding, + GatewayBindingType::HttpHandler, + ) + } + GatewayBinding::Static(static_binding) => match static_binding.deref() { StaticBinding::HttpCorsPreflight(cors) => Ok(GatewayBindingData { binding_type: Some(GatewayBindingType::CorsPreflight), @@ -661,7 +732,10 @@ impl TryFrom for GatewayBinding { let v = gateway_binding_data.clone().binding_type; match v { - Some(GatewayBindingType::Default) | Some(GatewayBindingType::FileServer) | None => { + Some(GatewayBindingType::Default) + | Some(GatewayBindingType::FileServer) + | Some(GatewayBindingType::HttpHandler) + | None => { let response = gateway_binding_data .response .ok_or("Missing response field in binding")?; diff --git a/golem-worker-service-base/src/gateway_api_definition/http/http_api_definition.rs b/golem-worker-service-base/src/gateway_api_definition/http/http_api_definition.rs index bce8fb41d1..be563f1f83 100644 --- a/golem-worker-service-base/src/gateway_api_definition/http/http_api_definition.rs +++ b/golem-worker-service-base/src/gateway_api_definition/http/http_api_definition.rs @@ -16,8 +16,8 @@ use crate::gateway_api_definition::http::path_pattern_parser::parse_path_pattern use crate::gateway_api_definition::http::{HttpApiDefinitionRequest, RouteRequest}; use crate::gateway_api_definition::{ApiDefinitionId, ApiVersion, HasGolemBindings}; use crate::gateway_api_definition_transformer::transform_http_api_definition; -use crate::gateway_binding::WorkerBindingCompiled; use crate::gateway_binding::{GatewayBinding, GatewayBindingCompiled}; +use crate::gateway_binding::{HttpHandlerBindingCompiled, WorkerBindingCompiled}; use crate::gateway_middleware::{ HttpAuthenticationMiddleware, HttpCors, HttpMiddleware, HttpMiddlewares, }; @@ -624,6 +624,28 @@ impl CompiledRoute { }) } + GatewayBinding::HttpHandler(http_handler_binding) => { + let metadata = metadata_dictionary + .metadata + .get(&http_handler_binding.component_id) + .ok_or(RouteCompilationErrors::MetadataNotFoundError( + http_handler_binding.component_id.clone(), + ))?; + + let binding = HttpHandlerBindingCompiled::from_raw_http_handler_binding( + http_handler_binding, + metadata, + ) + .map_err(RouteCompilationErrors::RibCompilationError)?; + + Ok(CompiledRoute { + method: route.method.clone(), + path: route.path.clone(), + binding: GatewayBindingCompiled::HttpHandler(binding), + middlewares: route.middlewares.clone(), + }) + } + GatewayBinding::Static(static_binding) => Ok(CompiledRoute { method: route.method.clone(), path: route.path.clone(), diff --git a/golem-worker-service-base/src/gateway_api_definition/http/http_oas_api_definition.rs b/golem-worker-service-base/src/gateway_api_definition/http/http_oas_api_definition.rs index 8dc3e2c8dc..379a7f1db0 100644 --- a/golem-worker-service-base/src/gateway_api_definition/http/http_oas_api_definition.rs +++ b/golem-worker-service-base/src/gateway_api_definition/http/http_oas_api_definition.rs @@ -123,7 +123,9 @@ mod internal { use rib::Expr; use serde_json::Value; - use crate::gateway_binding::{GatewayBinding, ResponseMapping, StaticBinding, WorkerBinding}; + use crate::gateway_binding::{ + GatewayBinding, HttpHandlerBinding, ResponseMapping, StaticBinding, WorkerBinding, + }; use crate::gateway_middleware::{CorsPreflightExpr, HttpCors}; use crate::gateway_security::{SecuritySchemeIdentifier, SecuritySchemeReference}; use golem_service_base::model::VersionedComponentId; @@ -256,7 +258,7 @@ mod internal { } (GatewayBindingType::Default, _) => { - let binding = get_gateway_binding(worker_gateway_info)?; + let binding = get_worker_binding(worker_gateway_info)?; Ok(RouteRequest { path: path_pattern.clone(), @@ -267,7 +269,7 @@ mod internal { }) } (GatewayBindingType::FileServer, _) => { - let binding = get_gateway_binding(worker_gateway_info)?; + let binding = get_worker_binding(worker_gateway_info)?; Ok(RouteRequest { path: path_pattern.clone(), @@ -277,6 +279,17 @@ mod internal { cors: None }) } + (GatewayBindingType::HttpHandler, _) => { + let binding = get_http_handler_binding(worker_gateway_info)?; + + Ok(RouteRequest { + path: path_pattern.clone(), + method, + binding: GatewayBinding::HttpHandler(binding), + security, + cors: None + }) + } (GatewayBindingType::CorsPreflight, method) => { Err(format!("cors-preflight binding type is supported only for 'options' method, but found method '{}'", method)) } @@ -304,7 +317,7 @@ mod internal { } } - pub(crate) fn get_gateway_binding( + pub(crate) fn get_worker_binding( gateway_binding_value: &Value, ) -> Result { let binding = WorkerBinding { @@ -317,6 +330,18 @@ mod internal { Ok(binding) } + pub(crate) fn get_http_handler_binding( + gateway_binding_value: &Value, + ) -> Result { + let binding = HttpHandlerBinding { + worker_name: get_worker_id_expr(gateway_binding_value)?, + component_id: get_component_id(gateway_binding_value)?, + idempotency_key: get_idempotency_key(gateway_binding_value)?, + }; + + Ok(binding) + } + pub(crate) fn get_cors_static_binding( worker_gateway_info: &Value, ) -> Result { diff --git a/golem-worker-service-base/src/gateway_binding/gateway_binding_compiled.rs b/golem-worker-service-base/src/gateway_binding/gateway_binding_compiled.rs index d84518cad9..3c8f7cbf2b 100644 --- a/golem-worker-service-base/src/gateway_binding/gateway_binding_compiled.rs +++ b/golem-worker-service-base/src/gateway_binding/gateway_binding_compiled.rs @@ -22,6 +22,9 @@ use golem_common::model::GatewayBindingType; use rib::RibOutputTypeInfo; use std::ops::Deref; +use super::http_handler_binding::HttpHandlerBindingCompiled; +use super::HttpHandlerBinding; + // A compiled binding is a binding with all existence of Rib Expr // get replaced with their compiled form - RibByteCode. #[derive(Debug, Clone, PartialEq)] @@ -29,6 +32,7 @@ pub enum GatewayBindingCompiled { Worker(WorkerBindingCompiled), Static(Box), FileServer(WorkerBindingCompiled), + HttpHandler(HttpHandlerBindingCompiled), } impl GatewayBindingCompiled { @@ -36,6 +40,7 @@ impl GatewayBindingCompiled { match self { GatewayBindingCompiled::Worker(_) => false, GatewayBindingCompiled::FileServer(_) => false, + GatewayBindingCompiled::HttpHandler(_) => false, GatewayBindingCompiled::Static(static_binding) => match static_binding.deref() { StaticBinding::HttpCorsPreflight(_) => false, StaticBinding::HttpAuthCallBack(_) => true, @@ -64,6 +69,13 @@ impl From for GatewayBinding { GatewayBinding::FileServer(worker_binding) } + GatewayBindingCompiled::HttpHandler(value) => { + let http_handler_binding = value.clone(); + + let worker_binding = HttpHandlerBinding::from(http_handler_binding); + + GatewayBinding::HttpHandler(worker_binding) + } } } } @@ -75,19 +87,26 @@ impl TryFrom fn try_from(value: GatewayBindingCompiled) -> Result { match value { GatewayBindingCompiled::Worker(worker_binding) => { - Ok(internal::to_gateway_binding_compiled_proto( + Ok(internal::worker_binding_to_gateway_binding_compiled_proto( worker_binding, GatewayBindingType::Default, )?) } GatewayBindingCompiled::FileServer(worker_binding) => { - Ok(internal::to_gateway_binding_compiled_proto( + Ok(internal::worker_binding_to_gateway_binding_compiled_proto( worker_binding, GatewayBindingType::FileServer, )?) } + GatewayBindingCompiled::HttpHandler(http_handler_binding) => { + Ok(internal::http_handler_to_gateway_binding_compiled_proto( + http_handler_binding, + GatewayBindingType::HttpHandler, + )?) + } + GatewayBindingCompiled::Static(static_binding) => { let binding_type = match static_binding.deref() { StaticBinding::HttpCorsPreflight(_) => golem_api_grpc::proto::golem::apidefinition::GatewayBindingType::CorsPreflight, @@ -135,7 +154,9 @@ impl TryFrom { + ProtoGatewayBindingType::FileServer + | ProtoGatewayBindingType::Default + | ProtoGatewayBindingType::HttpHandler => { // Convert fields for the Worker variant let component_id = value .component @@ -232,11 +253,11 @@ impl TryFrom Result { @@ -290,6 +311,7 @@ mod internal { GatewayBindingType::Default => 0, GatewayBindingType::FileServer => 1, GatewayBindingType::CorsPreflight => 2, + GatewayBindingType::HttpHandler => 3, }; Ok( @@ -311,4 +333,57 @@ mod internal { }, ) } + + pub(crate) fn http_handler_to_gateway_binding_compiled_proto( + http_handler_binding: HttpHandlerBindingCompiled, + binding_type: GatewayBindingType, + ) -> Result { + let component = Some(http_handler_binding.component_id.into()); + let worker_name = http_handler_binding + .worker_name_compiled + .clone() + .map(|w| w.worker_name.into()); + let compiled_worker_name_expr = http_handler_binding + .worker_name_compiled + .clone() + .map(|w| w.compiled_worker_name.try_into()) + .transpose()?; + let worker_name_rib_input = http_handler_binding + .worker_name_compiled + .map(|w| w.rib_input_type_info.into()); + let (idempotency_key, compiled_idempotency_key_expr, idempotency_key_rib_input) = + match http_handler_binding.idempotency_key_compiled { + Some(x) => ( + Some(x.idempotency_key.into()), + Some(x.compiled_idempotency_key.try_into()?), + Some(x.rib_input.into()), + ), + None => (None, None, None), + }; + let binding_type = match binding_type { + GatewayBindingType::Default => 0, + GatewayBindingType::FileServer => 1, + GatewayBindingType::CorsPreflight => 2, + GatewayBindingType::HttpHandler => 3, + }; + + Ok( + golem_api_grpc::proto::golem::apidefinition::CompiledGatewayBinding { + component, + worker_name, + compiled_worker_name_expr, + worker_name_rib_input, + idempotency_key, + compiled_idempotency_key_expr, + idempotency_key_rib_input, + response: None, + compiled_response_expr: None, + response_rib_input: None, + worker_functions_in_response: None, + binding_type: Some(binding_type), + static_binding: None, + response_rib_output: None, + }, + ) + } } diff --git a/golem-worker-service-base/src/gateway_binding/http_handler_binding.rs b/golem-worker-service-base/src/gateway_binding/http_handler_binding.rs new file mode 100644 index 0000000000..c280e4df54 --- /dev/null +++ b/golem-worker-service-base/src/gateway_binding/http_handler_binding.rs @@ -0,0 +1,77 @@ +// Copyright 2024-2025 Golem Cloud +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::{IdempotencyKeyCompiled, WorkerNameCompiled}; +use golem_service_base::model::VersionedComponentId; +use golem_wasm_ast::analysis::AnalysedExport; +use rib::Expr; + +#[derive(Debug, Clone, PartialEq)] +pub struct HttpHandlerBinding { + pub component_id: VersionedComponentId, + pub worker_name: Option, + pub idempotency_key: Option, +} + +#[derive(Debug, Clone, PartialEq)] +pub struct HttpHandlerBindingCompiled { + pub component_id: VersionedComponentId, + pub worker_name_compiled: Option, + pub idempotency_key_compiled: Option, +} + +impl HttpHandlerBindingCompiled { + pub fn from_raw_http_handler_binding( + http_handler_binding: &HttpHandlerBinding, + export_metadata: &[AnalysedExport], + ) -> Result { + let worker_name_compiled: Option = http_handler_binding + .worker_name + .clone() + .map(|worker_name_expr| { + WorkerNameCompiled::from_worker_name(&worker_name_expr, export_metadata) + }) + .transpose()?; + + let idempotency_key_compiled = match &http_handler_binding.idempotency_key { + Some(idempotency_key) => Some(IdempotencyKeyCompiled::from_idempotency_key( + idempotency_key, + export_metadata, + )?), + None => None, + }; + + Ok(HttpHandlerBindingCompiled { + component_id: http_handler_binding.component_id.clone(), + worker_name_compiled, + idempotency_key_compiled, + }) + } +} + +impl From for HttpHandlerBinding { + fn from(value: HttpHandlerBindingCompiled) -> Self { + let worker_binding = value.clone(); + + HttpHandlerBinding { + component_id: worker_binding.component_id, + worker_name: worker_binding + .worker_name_compiled + .map(|compiled| compiled.worker_name), + idempotency_key: worker_binding + .idempotency_key_compiled + .map(|compiled| compiled.idempotency_key), + } + } +} diff --git a/golem-worker-service-base/src/gateway_binding/mod.rs b/golem-worker-service-base/src/gateway_binding/mod.rs index 9fea9b52d9..9c7d917045 100644 --- a/golem-worker-service-base/src/gateway_binding/mod.rs +++ b/golem-worker-service-base/src/gateway_binding/mod.rs @@ -12,22 +12,26 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub(crate) use self::http_handler_binding::*; +pub(crate) use self::worker_binding::*; pub(crate) use crate::gateway_execution::gateway_binding_resolver::*; pub(crate) use crate::gateway_execution::rib_input_value_resolver::*; pub(crate) use crate::gateway_request::request_details::*; +use crate::gateway_rib_compiler::DefaultWorkerServiceRibCompiler; +use crate::gateway_rib_compiler::WorkerServiceRibCompiler; pub(crate) use gateway_binding_compiled::*; use golem_api_grpc::proto::golem::apidefinition::GatewayBindingType; use golem_service_base::model::VersionedComponentId; -use rib::Expr; +use golem_wasm_ast::analysis::AnalysedExport; +use rib::{Expr, RibByteCode, RibInputTypeInfo}; pub use static_binding::*; use std::ops::Deref; -pub(crate) use worker_binding::*; -pub(crate) use worker_binding_compiled::*; mod gateway_binding_compiled; +mod http_handler_binding; mod static_binding; mod worker_binding; -mod worker_binding_compiled; + // A gateway binding is integration to the backend. This is similar to AWS's x-amazon-gateway-integration // where it holds the details of where to re-route. @@ -39,6 +43,7 @@ pub enum GatewayBinding { Default(WorkerBinding), FileServer(WorkerBinding), Static(Box), + HttpHandler(HttpHandlerBinding), } impl GatewayBinding { @@ -46,6 +51,7 @@ impl GatewayBinding { match self { Self::Default(_) => false, Self::FileServer(_) => false, + Self::HttpHandler(_) => false, Self::Static(s) => match s.deref() { StaticBinding::HttpCorsPreflight(_) => true, StaticBinding::HttpAuthCallBack(_) => false, @@ -57,6 +63,7 @@ impl GatewayBinding { match self { Self::Default(_) => false, Self::FileServer(_) => false, + Self::HttpHandler(_) => false, Self::Static(s) => match s.deref() { StaticBinding::HttpCorsPreflight(_) => false, StaticBinding::HttpAuthCallBack(_) => true, @@ -68,18 +75,13 @@ impl GatewayBinding { GatewayBinding::Static(Box::new(value)) } - pub fn get_worker_binding(&self) -> Option { - match self { - Self::Default(worker_binding) => Some(worker_binding.clone()), - Self::FileServer(worker_binding) => Some(worker_binding.clone()), - Self::Static(_) => None, - } - } - - pub fn get_worker_binding_mut(&mut self) -> Option<&mut WorkerBinding> { + pub fn get_component_id(&self) -> Option { match self { - Self::Default(worker_binding) => Some(worker_binding), - Self::FileServer(worker_binding) => Some(worker_binding), + Self::Default(worker_binding) => Some(worker_binding.component_id.clone()), + Self::FileServer(worker_binding) => Some(worker_binding.component_id.clone()), + Self::HttpHandler(http_handler_binding) => { + Some(http_handler_binding.component_id.clone()) + } Self::Static(_) => None, } } @@ -91,7 +93,7 @@ impl TryFrom for golem_api_grpc::proto::golem::apidefinition::Ga match value { GatewayBinding::Default(worker_binding) => Ok( golem_api_grpc::proto::golem::apidefinition::GatewayBinding { - binding_type: Some(0), + binding_type: Some(GatewayBindingType::Default.into()), component: Some(worker_binding.component_id.into()), worker_name: worker_binding.worker_name.map(|x| x.into()), response: Some(worker_binding.response_mapping.0.into()), @@ -101,7 +103,7 @@ impl TryFrom for golem_api_grpc::proto::golem::apidefinition::Ga ), GatewayBinding::FileServer(worker_binding) => Ok( golem_api_grpc::proto::golem::apidefinition::GatewayBinding { - binding_type: Some(1), + binding_type: Some(GatewayBindingType::FileServer.into()), component: Some(worker_binding.component_id.into()), worker_name: worker_binding.worker_name.map(|x| x.into()), response: Some(worker_binding.response_mapping.0.into()), @@ -127,7 +129,7 @@ impl TryFrom for golem_api_grpc::proto::golem::apidefinition::Ga Ok( golem_api_grpc::proto::golem::apidefinition::GatewayBinding { - binding_type: Some(gateway_binding_type as i32), + binding_type: Some(gateway_binding_type.into()), component: None, worker_name: None, response: None, @@ -136,6 +138,16 @@ impl TryFrom for golem_api_grpc::proto::golem::apidefinition::Ga }, ) } + GatewayBinding::HttpHandler(worker_binding) => Ok( + golem_api_grpc::proto::golem::apidefinition::GatewayBinding { + binding_type: Some(GatewayBindingType::HttpHandler.into()), + component: Some(worker_binding.component_id.into()), + worker_name: worker_binding.worker_name.map(|x| x.into()), + response: None, + idempotency_key: worker_binding.idempotency_key.map(|x| x.into()), + static_binding: None, + }, + ), } } } @@ -185,6 +197,19 @@ impl TryFrom for Ga response_mapping: ResponseMapping(response), })) } + golem_api_grpc::proto::golem::apidefinition::GatewayBindingType::HttpHandler => { + let component_id = VersionedComponentId::try_from( + value.component.ok_or("Missing component id".to_string())?, + )?; + let worker_name = value.worker_name.map(Expr::try_from).transpose()?; + let idempotency_key = value.idempotency_key.map(Expr::try_from).transpose()?; + + Ok(GatewayBinding::HttpHandler(HttpHandlerBinding { + component_id, + worker_name, + idempotency_key, + })) + } golem_api_grpc::proto::golem::apidefinition::GatewayBindingType::CorsPreflight => { let static_binding = value.static_binding.ok_or("Missing static binding")?; @@ -203,3 +228,48 @@ impl TryFrom for Ga } } } + +#[derive(Debug, Clone, PartialEq)] +pub struct WorkerNameCompiled { + pub worker_name: Expr, + pub compiled_worker_name: RibByteCode, + pub rib_input_type_info: RibInputTypeInfo, +} + +impl WorkerNameCompiled { + pub fn from_worker_name( + worker_name: &Expr, + exports: &[AnalysedExport], + ) -> Result { + let compiled_worker_name = DefaultWorkerServiceRibCompiler::compile(worker_name, exports)?; + + Ok(WorkerNameCompiled { + worker_name: worker_name.clone(), + compiled_worker_name: compiled_worker_name.byte_code, + rib_input_type_info: compiled_worker_name.rib_input_type_info, + }) + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct IdempotencyKeyCompiled { + pub idempotency_key: Expr, + pub compiled_idempotency_key: RibByteCode, + pub rib_input: RibInputTypeInfo, +} + +impl IdempotencyKeyCompiled { + pub fn from_idempotency_key( + idempotency_key: &Expr, + exports: &[AnalysedExport], + ) -> Result { + let idempotency_key_compiled = + DefaultWorkerServiceRibCompiler::compile(idempotency_key, exports)?; + + Ok(IdempotencyKeyCompiled { + idempotency_key: idempotency_key.clone(), + compiled_idempotency_key: idempotency_key_compiled.byte_code, + rib_input: idempotency_key_compiled.rib_input_type_info, + }) + } +} diff --git a/golem-worker-service-base/src/gateway_binding/worker_binding.rs b/golem-worker-service-base/src/gateway_binding/worker_binding.rs index 3d5295e38a..336923335f 100644 --- a/golem-worker-service-base/src/gateway_binding/worker_binding.rs +++ b/golem-worker-service-base/src/gateway_binding/worker_binding.rs @@ -12,11 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use serde::{Deserialize, Serialize}; - -use crate::gateway_binding::WorkerBindingCompiled; +use super::{IdempotencyKeyCompiled, WorkerNameCompiled}; +use crate::gateway_rib_compiler::DefaultWorkerServiceRibCompiler; +use crate::gateway_rib_compiler::WorkerServiceRibCompiler; use golem_service_base::model::VersionedComponentId; -use rib::Expr; +use golem_wasm_ast::analysis::AnalysedExport; +use rib::{Expr, RibByteCode, RibInputTypeInfo, RibOutputTypeInfo, WorkerFunctionsInRib}; +use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, PartialEq)] pub struct WorkerBinding { @@ -26,9 +28,47 @@ pub struct WorkerBinding { pub response_mapping: ResponseMapping, } -// ResponseMapping will consist of actual logic such as invoking worker functions -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct ResponseMapping(pub Expr); +#[derive(Debug, Clone, PartialEq)] +pub struct WorkerBindingCompiled { + pub component_id: VersionedComponentId, + pub worker_name_compiled: Option, + pub idempotency_key_compiled: Option, + pub response_compiled: ResponseMappingCompiled, +} + +impl WorkerBindingCompiled { + pub fn from_raw_worker_binding( + gateway_worker_binding: &WorkerBinding, + export_metadata: &[AnalysedExport], + ) -> Result { + let worker_name_compiled: Option = gateway_worker_binding + .worker_name + .clone() + .map(|worker_name_expr| { + WorkerNameCompiled::from_worker_name(&worker_name_expr, export_metadata) + }) + .transpose()?; + + let idempotency_key_compiled = match &gateway_worker_binding.idempotency_key { + Some(idempotency_key) => Some(IdempotencyKeyCompiled::from_idempotency_key( + idempotency_key, + export_metadata, + )?), + None => None, + }; + let response_compiled = ResponseMappingCompiled::from_response_mapping( + &gateway_worker_binding.response_mapping, + export_metadata, + )?; + + Ok(WorkerBindingCompiled { + component_id: gateway_worker_binding.component_id.clone(), + worker_name_compiled, + idempotency_key_compiled, + response_compiled, + }) + } +} impl From for WorkerBinding { fn from(value: WorkerBindingCompiled) -> Self { @@ -48,3 +88,35 @@ impl From for WorkerBinding { } } } + +// ResponseMapping will consist of actual logic such as invoking worker functions +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct ResponseMapping(pub Expr); + +#[derive(Debug, Clone, PartialEq)] +pub struct ResponseMappingCompiled { + pub response_mapping_expr: Expr, + pub response_mapping_compiled: RibByteCode, + pub rib_input: RibInputTypeInfo, + pub worker_calls: Option, + // Optional to keep backward compatibility + pub rib_output: Option, +} + +impl ResponseMappingCompiled { + pub fn from_response_mapping( + response_mapping: &ResponseMapping, + exports: &[AnalysedExport], + ) -> Result { + let response_compiled = + DefaultWorkerServiceRibCompiler::compile(&response_mapping.0, exports)?; + + Ok(ResponseMappingCompiled { + response_mapping_expr: response_mapping.0.clone(), + response_mapping_compiled: response_compiled.byte_code, + rib_input: response_compiled.rib_input_type_info, + worker_calls: response_compiled.worker_invoke_calls, + rib_output: response_compiled.rib_output_type_info, + }) + } +} diff --git a/golem-worker-service-base/src/gateway_binding/worker_binding_compiled.rs b/golem-worker-service-base/src/gateway_binding/worker_binding_compiled.rs deleted file mode 100644 index 842afa611e..0000000000 --- a/golem-worker-service-base/src/gateway_binding/worker_binding_compiled.rs +++ /dev/null @@ -1,134 +0,0 @@ -// Copyright 2024-2025 Golem Cloud -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use crate::gateway_binding::{ResponseMapping, WorkerBinding}; -use crate::gateway_rib_compiler::{DefaultWorkerServiceRibCompiler, WorkerServiceRibCompiler}; -use golem_service_base::model::VersionedComponentId; -use golem_wasm_ast::analysis::AnalysedExport; -use rib::{Expr, RibByteCode, RibInputTypeInfo, RibOutputTypeInfo, WorkerFunctionsInRib}; - -#[derive(Debug, Clone, PartialEq)] -pub struct WorkerBindingCompiled { - pub component_id: VersionedComponentId, - pub worker_name_compiled: Option, - pub idempotency_key_compiled: Option, - pub response_compiled: ResponseMappingCompiled, -} - -impl WorkerBindingCompiled { - pub fn from_raw_worker_binding( - gateway_worker_binding: &WorkerBinding, - export_metadata: &[AnalysedExport], - ) -> Result { - let worker_name_compiled: Option = gateway_worker_binding - .worker_name - .clone() - .map(|worker_name_expr| { - WorkerNameCompiled::from_worker_name(&worker_name_expr, export_metadata) - }) - .transpose()?; - - let idempotency_key_compiled = match &gateway_worker_binding.idempotency_key { - Some(idempotency_key) => Some(IdempotencyKeyCompiled::from_idempotency_key( - idempotency_key, - export_metadata, - )?), - None => None, - }; - let response_compiled = ResponseMappingCompiled::from_response_mapping( - &gateway_worker_binding.response_mapping, - export_metadata, - )?; - - Ok(WorkerBindingCompiled { - component_id: gateway_worker_binding.component_id.clone(), - worker_name_compiled, - idempotency_key_compiled, - response_compiled, - }) - } -} - -#[derive(Debug, Clone, PartialEq)] -pub struct WorkerNameCompiled { - pub worker_name: Expr, - pub compiled_worker_name: RibByteCode, - pub rib_input_type_info: RibInputTypeInfo, -} - -impl WorkerNameCompiled { - pub fn from_worker_name( - worker_name: &Expr, - exports: &[AnalysedExport], - ) -> Result { - let compiled_worker_name = DefaultWorkerServiceRibCompiler::compile(worker_name, exports)?; - - Ok(WorkerNameCompiled { - worker_name: worker_name.clone(), - compiled_worker_name: compiled_worker_name.byte_code, - rib_input_type_info: compiled_worker_name.rib_input_type_info, - }) - } -} - -#[derive(Debug, Clone, PartialEq)] -pub struct IdempotencyKeyCompiled { - pub idempotency_key: Expr, - pub compiled_idempotency_key: RibByteCode, - pub rib_input: RibInputTypeInfo, -} - -impl IdempotencyKeyCompiled { - pub fn from_idempotency_key( - idempotency_key: &Expr, - exports: &[AnalysedExport], - ) -> Result { - let idempotency_key_compiled = - DefaultWorkerServiceRibCompiler::compile(idempotency_key, exports)?; - - Ok(IdempotencyKeyCompiled { - idempotency_key: idempotency_key.clone(), - compiled_idempotency_key: idempotency_key_compiled.byte_code, - rib_input: idempotency_key_compiled.rib_input_type_info, - }) - } -} - -#[derive(Debug, Clone, PartialEq)] -pub struct ResponseMappingCompiled { - pub response_mapping_expr: Expr, - pub response_mapping_compiled: RibByteCode, - pub rib_input: RibInputTypeInfo, - pub worker_calls: Option, - // Optional to keep backward compatibility - pub rib_output: Option, -} - -impl ResponseMappingCompiled { - pub fn from_response_mapping( - response_mapping: &ResponseMapping, - exports: &[AnalysedExport], - ) -> Result { - let response_compiled = - DefaultWorkerServiceRibCompiler::compile(&response_mapping.0, exports)?; - - Ok(ResponseMappingCompiled { - response_mapping_expr: response_mapping.0.clone(), - response_mapping_compiled: response_compiled.byte_code, - rib_input: response_compiled.rib_input_type_info, - worker_calls: response_compiled.worker_invoke_calls, - rib_output: response_compiled.rib_output_type_info, - }) - } -} diff --git a/golem-worker-service-base/src/gateway_execution/file_server_binding_handler.rs b/golem-worker-service-base/src/gateway_execution/file_server_binding_handler.rs index af0ecb26c9..0a5507ac5e 100644 --- a/golem-worker-service-base/src/gateway_execution/file_server_binding_handler.rs +++ b/golem-worker-service-base/src/gateway_execution/file_server_binding_handler.rs @@ -65,6 +65,97 @@ pub struct FileServerBindingDetails { pub file_path: ComponentFilePath, } +impl FileServerBindingDetails { + pub fn from_rib_result(result: RibResult) -> Result { + // Three supported formats: + // 1. A string path. Mime type is guessed from the path. Status code is 200. + // 2. A record with a 'file-path' field. Mime type and status are optionally taken from the record, otherwise guessed. + // 3. A result of either of the above, with the same rules applied. + match result { + RibResult::Val(value) => match value { + ValueAndType { + value: Value::Result(value), + typ: AnalysedType::Result(typ), + } => match value { + Ok(ok) => { + let ok = ValueAndType::new( + *ok.ok_or("ok unset".to_string())?, + (*typ.ok.ok_or("Missing 'ok' type")?).clone(), + ); + Self::from_rib_happy(ok) + } + Err(err) => { + let value = err.ok_or("err unset".to_string())?; + Err(format!("Error result: {value:?}")) + } + }, + other => Self::from_rib_happy(other), + }, + RibResult::Unit => Err("Expected a value".to_string()), + } + } + + /// Like the above, just without the result case. + fn from_rib_happy(value: ValueAndType) -> Result { + match &value { + ValueAndType { + value: Value::String(raw_path), + .. + } => Self::make_from(raw_path.clone(), None, None), + ValueAndType { + value: Value::Record(field_values), + typ: AnalysedType::Record(record), + } => { + let path_position = record + .fields + .iter() + .position(|pair| &pair.name == "file-path") + .ok_or("Record must contain 'file-path' field")?; + + let path = if let Value::String(path) = &field_values[path_position] { + path + } else { + return Err("file-path must be a string".to_string()); + }; + + let status = get_status_code(field_values, record)?; + let headers = get_response_headers_or_default(&value)?; + let content_type = headers.get_content_type(); + + Self::make_from(path.to_string(), content_type, status) + } + _ => Err("Response value expected".to_string()), + } + } + + fn make_from( + path: String, + content_type: Option, + status_code: Option, + ) -> Result { + let file_path = ComponentFilePath::from_either_str(&path)?; + + let content_type = match content_type { + Some(content_type) => content_type, + None => { + let mime_type = mime_guess::from_path(&path) + .first() + .ok_or("Could not determine mime type")?; + ContentType::from_str(mime_type.as_ref()) + .map_err(|e| format!("Invalid mime type: {}", e))? + } + }; + + let status_code = status_code.unwrap_or(StatusCode::OK); + + Ok(FileServerBindingDetails { + status_code, + content_type, + file_path, + }) + } +} + pub struct DefaultFileServerBindingHandler { component_service: Arc + Sync + Send>, initial_component_files_service: Arc, @@ -77,7 +168,7 @@ impl DefaultFileServerBindingHandler { initial_component_files_service: Arc, worker_service: Arc, ) -> Self { - DefaultFileServerBindingHandler { + Self { component_service, initial_component_files_service, worker_service, @@ -177,94 +268,3 @@ impl FileServerBindingHandler Result { - // Three supported formats: - // 1. A string path. Mime type is guessed from the path. Status code is 200. - // 2. A record with a 'file-path' field. Mime type and status are optionally taken from the record, otherwise guessed. - // 3. A result of either of the above, with the same rules applied. - match result { - RibResult::Val(value) => match value { - ValueAndType { - value: Value::Result(value), - typ: AnalysedType::Result(typ), - } => match value { - Ok(ok) => { - let ok = ValueAndType::new( - *ok.ok_or("ok unset".to_string())?, - (*typ.ok.ok_or("Missing 'ok' type")?).clone(), - ); - Self::from_rib_happy(ok) - } - Err(err) => { - let value = err.ok_or("err unset".to_string())?; - Err(format!("Error result: {value:?}")) - } - }, - other => Self::from_rib_happy(other), - }, - RibResult::Unit => Err("Expected a value".to_string()), - } - } - - /// Like the above, just without the result case. - fn from_rib_happy(value: ValueAndType) -> Result { - match &value { - ValueAndType { - value: Value::String(raw_path), - .. - } => Self::make_from(raw_path.clone(), None, None), - ValueAndType { - value: Value::Record(field_values), - typ: AnalysedType::Record(record), - } => { - let path_position = record - .fields - .iter() - .position(|pair| &pair.name == "file-path") - .ok_or("Record must contain 'file-path' field")?; - - let path = if let Value::String(path) = &field_values[path_position] { - path - } else { - return Err("file-path must be a string".to_string()); - }; - - let status = get_status_code(field_values, record)?; - let headers = get_response_headers_or_default(&value)?; - let content_type = headers.get_content_type(); - - Self::make_from(path.to_string(), content_type, status) - } - _ => Err("Response value expected".to_string()), - } - } - - fn make_from( - path: String, - content_type: Option, - status_code: Option, - ) -> Result { - let file_path = ComponentFilePath::from_either_str(&path)?; - - let content_type = match content_type { - Some(content_type) => content_type, - None => { - let mime_type = mime_guess::from_path(&path) - .first() - .ok_or("Could not determine mime type")?; - ContentType::from_str(mime_type.as_ref()) - .map_err(|e| format!("Invalid mime type: {}", e))? - } - }; - - let status_code = status_code.unwrap_or(StatusCode::OK); - - Ok(FileServerBindingDetails { - status_code, - content_type, - file_path, - }) - } -} diff --git a/golem-worker-service-base/src/gateway_execution/gateway_binding_resolver.rs b/golem-worker-service-base/src/gateway_execution/gateway_binding_resolver.rs index e3ba9a318f..6d978a1dc9 100644 --- a/golem-worker-service-base/src/gateway_execution/gateway_binding_resolver.rs +++ b/golem-worker-service-base/src/gateway_execution/gateway_binding_resolver.rs @@ -118,6 +118,7 @@ pub enum ResolvedBinding { Static(StaticBinding), Worker(ResolvedWorkerBinding), FileServer(ResolvedWorkerBinding), + HttpHandler(ResolvedHttpHandlerBinding), } #[derive(Clone, Debug)] @@ -198,6 +199,12 @@ impl ResolvedGatewayBinding { } } +#[derive(Debug, Clone)] +pub struct ResolvedHttpHandlerBinding { + pub worker_detail: WorkerDetail, + pub namespace: Namespace, +} + pub struct DefaultGatewayBindingResolver { input: InputHttpRequest, gateway_session_store: GatewaySessionStore, @@ -273,6 +280,7 @@ impl let mut http_request_details = HttpRequestDetails::from_input_http_request( &self.input.scheme, &self.input.host, + self.input.req_method.clone(), &self.input.api_input_path, &zipped_path_params, &request_query_variables, @@ -306,28 +314,45 @@ impl } match binding { - GatewayBindingCompiled::FileServer(worker_binding) => internal::get_resolved_binding( - worker_binding, - &http_request_details, - namespace, - headers, - ) - .await - .map(|resolved_binding| ResolvedGatewayBinding { - request_details: GatewayRequestDetails::Http(http_request_details), - resolved_binding: ResolvedBinding::FileServer(resolved_binding), - }), - GatewayBindingCompiled::Worker(worker_binding) => internal::get_resolved_binding( - worker_binding, - &http_request_details, - namespace, - headers, - ) - .await - .map(|resolved_binding| ResolvedGatewayBinding { - request_details: GatewayRequestDetails::Http(http_request_details), - resolved_binding: ResolvedBinding::Worker(resolved_binding), - }), + GatewayBindingCompiled::FileServer(worker_binding) => { + internal::get_resolved_worker_binding( + worker_binding, + &http_request_details, + namespace, + headers, + ) + .await + .map(|resolved_binding| ResolvedGatewayBinding { + request_details: GatewayRequestDetails::Http(http_request_details), + resolved_binding: ResolvedBinding::FileServer(resolved_binding), + }) + } + GatewayBindingCompiled::Worker(worker_binding) => { + internal::get_resolved_worker_binding( + worker_binding, + &http_request_details, + namespace, + headers, + ) + .await + .map(|resolved_binding| ResolvedGatewayBinding { + request_details: GatewayRequestDetails::Http(http_request_details), + resolved_binding: ResolvedBinding::Worker(resolved_binding), + }) + } + GatewayBindingCompiled::HttpHandler(http_handler_binding) => { + internal::get_resolved_http_handler_binding( + http_handler_binding, + &http_request_details, + namespace, + headers, + ) + .await + .map(|resolved_binding| ResolvedGatewayBinding { + request_details: GatewayRequestDetails::Http(http_request_details), + resolved_binding: ResolvedBinding::HttpHandler(resolved_binding), + }) + } GatewayBindingCompiled::Static(static_binding) => { Ok(ResolvedGatewayBinding::from_static_binding( &GatewayRequestDetails::Http(http_request_details), @@ -340,8 +365,8 @@ impl mod internal { use crate::gateway_binding::{ - ErrorOrRedirect, HttpRequestDetails, ResolvedWorkerBinding, RibInputValueResolver, - WorkerBindingCompiled, WorkerDetail, + ErrorOrRedirect, HttpHandlerBindingCompiled, HttpRequestDetails, ResolvedWorkerBinding, + RibInputValueResolver, WorkerBindingCompiled, WorkerDetail, }; use crate::gateway_execution::gateway_session::GatewaySessionStore; use crate::gateway_middleware::{HttpMiddlewares, MiddlewareError, MiddlewareSuccess}; @@ -350,6 +375,8 @@ mod internal { use http::HeaderMap; use std::sync::Arc; + use super::ResolvedHttpHandlerBinding; + pub async fn redirect_or_continue( input: &mut HttpRequestDetails, middlewares: &HttpMiddlewares, @@ -380,7 +407,7 @@ mod internal { } } - pub async fn get_resolved_binding( + pub async fn get_resolved_worker_binding( binding: &WorkerBindingCompiled, http_request_details: &HttpRequestDetails, namespace: &Namespace, @@ -457,4 +484,81 @@ mod internal { Ok(resolved_binding) } + + pub async fn get_resolved_http_handler_binding( + binding: &HttpHandlerBindingCompiled, + http_request_details: &HttpRequestDetails, + namespace: &Namespace, + headers: &HeaderMap, + ) -> Result, ErrorOrRedirect> { + let worker_name_opt = if let Some(worker_name_compiled) = &binding.worker_name_compiled { + let resolve_rib_input = http_request_details + .resolve_rib_input_value(&worker_name_compiled.rib_input_type_info) + .map_err(ErrorOrRedirect::rib_input_type_mismatch)?; + + let worker_name = rib::interpret_pure( + &worker_name_compiled.compiled_worker_name, + &resolve_rib_input, + ) + .await + .map_err(|err| { + ErrorOrRedirect::internal(format!( + "Failed to evaluate worker name rib expression. {}", + err + )) + })? + .get_literal() + .ok_or(ErrorOrRedirect::internal( + "Worker name is not a Rib expression that resolves to String".to_string(), + ))? + .as_string(); + + Some(worker_name) + } else { + None + }; + + let component_id = &binding.component_id; + + let idempotency_key = + if let Some(idempotency_key_compiled) = &binding.idempotency_key_compiled { + let resolve_rib_input = http_request_details + .resolve_rib_input_value(&idempotency_key_compiled.rib_input) + .map_err(ErrorOrRedirect::rib_input_type_mismatch)?; + + let idempotency_key_value = rib::interpret_pure( + &idempotency_key_compiled.compiled_idempotency_key, + &resolve_rib_input, + ) + .await + .map_err(|err| ErrorOrRedirect::internal(err.to_string()))?; + + let idempotency_key = idempotency_key_value + .get_literal() + .ok_or(ErrorOrRedirect::internal( + "Idempotency Key is not a string".to_string(), + ))? + .as_string(); + + Some(IdempotencyKey::new(idempotency_key)) + } else { + headers + .get("idempotency-key") + .and_then(|h| h.to_str().ok()) + .map(|value| IdempotencyKey::new(value.to_string())) + }; + + let worker_detail = WorkerDetail { + component_id: component_id.clone(), + worker_name: worker_name_opt, + idempotency_key, + }; + + let resolved_binding = ResolvedHttpHandlerBinding { + worker_detail, + namespace: namespace.clone(), + }; + + Ok(resolved_binding) + } } diff --git a/golem-worker-service-base/src/gateway_execution/gateway_http_input_executor.rs b/golem-worker-service-base/src/gateway_execution/gateway_http_input_executor.rs index 0a8bf8c870..24acaa985d 100644 --- a/golem-worker-service-base/src/gateway_execution/gateway_http_input_executor.rs +++ b/golem-worker-service-base/src/gateway_execution/gateway_http_input_executor.rs @@ -37,6 +37,8 @@ use rib::{RibInput, RibResult}; use std::sync::Arc; use tracing::error; +use super::http_handler_binding_handler::HttpHandlerBindingHandler; + #[async_trait] pub trait GatewayHttpInputExecutor { async fn execute_http_request(&self, input: poem::Request) -> poem::Response; @@ -46,6 +48,7 @@ pub struct DefaultGatewayInputExecutor { pub evaluator: Arc + Sync + Send>, pub file_server_binding_handler: Arc + Sync + Send>, pub auth_call_back_binding_handler: Arc, + pub http_handler_binding_handler: Arc + Sync + Send>, pub api_definition_lookup_service: Arc< dyn ApiDefinitionsLookup< InputHttpRequest, @@ -62,6 +65,7 @@ impl DefaultGatewayInputExecutor { evaluator: Arc + Sync + Send>, file_server_binding_handler: Arc + Sync + Send>, auth_call_back_binding_handler: Arc, + http_handler_binding_handler: Arc + Sync + Send>, api_definition_lookup_service: Arc< dyn ApiDefinitionsLookup< InputHttpRequest, @@ -76,6 +80,7 @@ impl DefaultGatewayInputExecutor { evaluator, file_server_binding_handler, auth_call_back_binding_handler, + http_handler_binding_handler, api_definition_lookup_service, gateway_session_store, identity_provider, @@ -128,6 +133,33 @@ impl DefaultGatewayInputExecutor { } } + ResolvedBinding::HttpHandler(http_handler_binding) => { + let result = self + .http_handler_binding_handler + .handle_http_handler_binding( + &http_handler_binding.namespace, + &http_handler_binding.worker_detail, + &request_details, + ) + .await; + + let mut response = result + .to_response(&request_details, &self.gateway_session_store) + .await; + + if let Some(middleware) = middleware_opt { + let result = middleware.process_middleware_out(&mut response).await; + match result { + Ok(_) => response, + Err(err) => { + err.to_response_from_safe_display(|_| StatusCode::INTERNAL_SERVER_ERROR) + } + } + } else { + response + } + } + ResolvedBinding::FileServer(resolved_file_server_binding) => { self.handle_file_server_binding( &self.gateway_session_store, diff --git a/golem-worker-service-base/src/gateway_execution/http_handler_binding_handler.rs b/golem-worker-service-base/src/gateway_execution/http_handler_binding_handler.rs new file mode 100644 index 0000000000..65d0c5b4ba --- /dev/null +++ b/golem-worker-service-base/src/gateway_execution/http_handler_binding_handler.rs @@ -0,0 +1,225 @@ +// Copyright 2024-2025 Golem Cloud +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::gateway_binding::{HttpRequestDetails, WorkerDetail}; +use crate::gateway_execution::GatewayResolvedWorkerRequest; +use crate::service::worker::WorkerService; +use async_trait::async_trait; +use bytes::Bytes; +use futures_util::TryStreamExt; +use golem_common::model::HasAccountId; +use golem_common::virtual_exports; +use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; +use golem_wasm_rpc::{TypeAnnotatedValueConstructors, Value}; +use http::StatusCode; +use http_body_util::combinators::BoxBody; +use http_body_util::BodyExt; +use std::convert::Infallible; +use std::str::FromStr; +use std::sync::Arc; + +use super::{GatewayWorkerRequestExecutor, WorkerRequestExecutorError}; + +#[async_trait] +pub trait HttpHandlerBindingHandler { + async fn handle_http_handler_binding( + &self, + namespace: &Namespace, + worker_detail: &WorkerDetail, + request_details: &HttpRequestDetails, + ) -> HttpHandlerBindingResult; +} + +pub type HttpHandlerBindingResult = Result; + +pub struct HttpHandlerBindingSuccess { + pub response: poem::Response, +} + +pub enum HttpHandlerBindingError { + InternalError(String), + WorkerRequestExecutorError(WorkerRequestExecutorError), +} + +pub struct DefaultHttpHandlerBindingHandler { + worker_request_executor: Arc + Sync + Send>, +} + +impl DefaultHttpHandlerBindingHandler { + pub fn new( + worker_request_executor: Arc + Sync + Send>, + ) -> Self { + Self { + worker_request_executor, + } + } +} + +#[async_trait] +impl HttpHandlerBindingHandler + for DefaultHttpHandlerBindingHandler +{ + async fn handle_http_handler_binding( + &self, + namespace: &Namespace, + worker_detail: &WorkerDetail, + request_details: &HttpRequestDetails, + ) -> HttpHandlerBindingResult { + let component_id = worker_detail.component_id.component_id.clone(); + + let http_request_function_param = { + use golem_common::virtual_exports::http_incoming_handler as hic; + + let headers = { + let mut acc = Vec::new(); + for header in request_details.request_headers.0.fields.iter() { + acc.push(( + header.name.clone(), + Bytes::from(header.value.to_string().into_bytes()), + )); + } + hic::HttpFields(acc) + }; + + let body = hic::HttpBodyAndTrailers { + content: hic::HttpBodyContent(Bytes::from( + request_details + .request_body_value + .0 + .to_string() + .into_bytes(), + )), + trailers: None, + }; + + hic::IncomingHttpRequest { + uri: request_details.get_api_input_path(), + method: hic::HttpMethod::from_http_method(request_details.request_method.clone()), + headers, + body: Some(body), + } + }; + + let typ: golem_wasm_ast::analysis::protobuf::Type = (&golem_common::virtual_exports::http_incoming_handler::IncomingHttpRequest::analysed_type()).into(); + + let type_annotated_param = + TypeAnnotatedValue::create(&http_request_function_param.to_value(), typ).map_err( + |e| { + HttpHandlerBindingError::InternalError(format!( + "Failed converting request into wasm rpc: {:?}", + e + )) + }, + )?; + + let resolved_request: GatewayResolvedWorkerRequest = + GatewayResolvedWorkerRequest { + component_id, + worker_name: worker_detail.worker_name.clone(), + function_name: virtual_exports::http_incoming_handler::FUNCTION_NAME.to_string(), + function_params: vec![type_annotated_param], + idempotency_key: worker_detail.idempotency_key.clone(), + namespace: namespace.clone(), + }; + + let response = self + .worker_request_executor + .execute(resolved_request) + .await + .map_err(HttpHandlerBindingError::WorkerRequestExecutorError)?; + + let poem_response = { + use golem_common::virtual_exports::http_incoming_handler as hic; + + let response_value: Value = response.result.try_into().map_err(|e| { + HttpHandlerBindingError::InternalError(format!( + "Failed to parse response as wasm rpc value: {}", + e + )) + })?; + + let parsed_response = hic::HttpResponse::from_value(response_value).map_err(|e| { + HttpHandlerBindingError::InternalError(format!("Failed parsing response: {}", e)) + })?; + + let converted_status_code = + StatusCode::from_u16(parsed_response.status).map_err(|e| { + HttpHandlerBindingError::InternalError(format!( + "Failed to parse response status: {}", + e + )) + })?; + + let mut builder = poem::Response::builder().status(converted_status_code); + + for (header_name, header_value) in parsed_response.headers.0 { + let converted_header_value = + http::HeaderValue::from_bytes(&header_value).map_err(|e| { + HttpHandlerBindingError::InternalError(format!( + "Failed to parse response header: {}", + e + )) + })?; + builder = builder.header(header_name, converted_header_value); + } + + if let Some(body) = parsed_response.body { + let converted_body = http_body_util::Full::new(body.content.0); + + let trailers = if let Some(trailers) = body.trailers { + let mut acc = http::HeaderMap::new(); + for (header_name, header_value) in trailers.0.into_iter() { + let converted_header_name = http::HeaderName::from_str(&header_name) + .map_err(|e| { + HttpHandlerBindingError::InternalError(format!( + "Failed to parse response trailer name: {}", + e + )) + })?; + let converted_header_value = http::HeaderValue::from_bytes(&header_value) + .map_err(|e| { + HttpHandlerBindingError::InternalError(format!( + "Failed to parse response trailer value: {}", + e + )) + })?; + + acc.insert(converted_header_name, converted_header_value); + } + Some(Ok(acc)) + } else { + None + }; + + let body_with_trailers = converted_body.with_trailers(async { trailers }); + + let boxed: BoxBody = BoxBody::new( + body_with_trailers.map_err(error_from_infallible::), + ); + + builder.body(boxed) + } else { + builder.body(poem::Body::empty()) + } + }; + + Ok(HttpHandlerBindingSuccess { + response: poem_response, + }) + } +} + +fn error_from_infallible(_infallible: Infallible) -> E { + unreachable!() +} diff --git a/golem-worker-service-base/src/gateway_execution/mod.rs b/golem-worker-service-base/src/gateway_execution/mod.rs index e34ddbdd09..8e84cd2dfb 100644 --- a/golem-worker-service-base/src/gateway_execution/mod.rs +++ b/golem-worker-service-base/src/gateway_execution/mod.rs @@ -23,6 +23,7 @@ pub mod gateway_http_input_executor; pub mod gateway_session; mod gateway_worker_request_executor; mod http_content_type_mapper; +pub mod http_handler_binding_handler; pub mod rib_input_value_resolver; pub mod router; pub mod to_response; diff --git a/golem-worker-service-base/src/gateway_execution/rib_input_value_resolver.rs b/golem-worker-service-base/src/gateway_execution/rib_input_value_resolver.rs index 32ea8a442e..beff51e1c6 100644 --- a/golem-worker-service-base/src/gateway_execution/rib_input_value_resolver.rs +++ b/golem-worker-service-base/src/gateway_execution/rib_input_value_resolver.rs @@ -19,7 +19,7 @@ use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; use rib::{RibInput, RibInputTypeInfo}; use std::collections::HashMap; use std::fmt::Display; -use tracing::warn; +use tracing::debug; // `RibInputValueResolver` is responsible // for extracting `RibInputValue` from any input, given the requirements as `RibInputTypeInfo`. @@ -58,7 +58,7 @@ impl RibInputValueResolver for HttpRequestDetails { match request_type_info { Some(request_type) => { - warn!("received: {:?}", rib_input_with_request_content); + debug!("received: {:?}", rib_input_with_request_content); let input = TypeAnnotatedValue::parse_with_type(rib_input_with_request_content, request_type) .map_err(|err| RibInputTypeMismatch(format!("Input request details don't match the requirements for rib expression to execute: {}. Requirements. {:?}", err.join(", "), request_type)))?; let input = input.try_into().map_err(|err| { diff --git a/golem-worker-service-base/src/gateway_execution/to_response.rs b/golem-worker-service-base/src/gateway_execution/to_response.rs index 37b7cfec64..8b160853d2 100644 --- a/golem-worker-service-base/src/gateway_execution/to_response.rs +++ b/golem-worker-service-base/src/gateway_execution/to_response.rs @@ -28,6 +28,8 @@ use poem::Body; use poem::IntoResponse; use rib::RibResult; +use super::http_handler_binding_handler::{HttpHandlerBindingError, HttpHandlerBindingResult}; + #[async_trait] pub trait ToHttpResponse { async fn to_response( @@ -67,6 +69,29 @@ impl ToHttpResponse for FileServerBindingResult { } } +#[async_trait] +impl ToHttpResponse for HttpHandlerBindingResult { + async fn to_response( + self, + _request_details: &HttpRequestDetails, + _session_store: &GatewaySessionStore, + ) -> poem::Response { + match self { + Ok(inner) => inner.response, + Err(HttpHandlerBindingError::InternalError(e)) => poem::Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from_string(format!("Error {}", e).to_string())), + Err(HttpHandlerBindingError::WorkerRequestExecutorError(e)) => { + poem::Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from_string( + format!("Error calling worker executor {}", e).to_string(), + )) + } + } + } +} + // Preflight (OPTIONS) response that will consist of all configured CORS headers #[async_trait] impl ToHttpResponse for CorsPreflight { diff --git a/golem-worker-service-base/src/gateway_request/request_details.rs b/golem-worker-service-base/src/gateway_request/request_details.rs index e92db4e8c5..e8adc0b260 100644 --- a/golem-worker-service-base/src/gateway_request/request_details.rs +++ b/golem-worker-service-base/src/gateway_request/request_details.rs @@ -20,7 +20,7 @@ use crate::gateway_middleware::HttpMiddlewares; use crate::gateway_request::http_request::ApiInputPath; use golem_common::SafeDisplay; use http::uri::Scheme; -use http::HeaderMap; +use http::{HeaderMap, Method}; use serde_json::Value; use std::collections::HashMap; use url::Url; @@ -30,37 +30,6 @@ use url::Url; pub enum GatewayRequestDetails { Http(HttpRequestDetails), } -impl GatewayRequestDetails { - // Form the HttpRequestDetails based on what's required by - // ApiDefinition. If there are query or path parameters that are not required - // by API definition, they will be discarded here. - // If there is a need to fetch any query values or path values that are required - // in the workflow but not through API definition, use poem::Request directly - // as it will be better performing in the hot path - pub fn from( - scheme: &Option, - host: &ApiSiteString, - api_input_path: &ApiInputPath, - path_params: &HashMap, - query_variable_values: &HashMap, - query_variable_names: &[QueryInfo], - request_body: &Value, - headers: HeaderMap, - middlewares: &Option, - ) -> Result> { - Ok(Self::Http(HttpRequestDetails::from_input_http_request( - scheme, - host, - api_input_path, - path_params, - query_variable_values, - query_variable_names, - request_body, - headers, - middlewares, - )?)) - } -} // A structure that holds the incoming request details // along with parameters that are required by the route in API definition @@ -71,6 +40,7 @@ impl GatewayRequestDetails { pub struct HttpRequestDetails { pub scheme: Option, pub host: ApiSiteString, + pub request_method: Method, pub api_input_path: ApiInputPath, pub request_path_params: RequestPathValues, pub request_body_value: RequestBody, @@ -158,6 +128,7 @@ impl HttpRequestDetails { HttpRequestDetails { scheme: Some(Scheme::HTTP), host: ApiSiteString("".to_string()), + request_method: Method::GET, api_input_path: ApiInputPath { base_path: "".to_string(), query_path: None, @@ -251,6 +222,7 @@ impl HttpRequestDetails { pub fn from_input_http_request( scheme: &Option, host: &ApiSiteString, + method: Method, api_input_path: &ApiInputPath, path_params: &HashMap, query_variable_values: &HashMap, @@ -267,6 +239,7 @@ impl HttpRequestDetails { Ok(Self { scheme: scheme.clone(), host: host.clone(), + request_method: method, api_input_path: api_input_path.clone(), request_path_params: path_params, request_body_value: request_body, @@ -335,7 +308,8 @@ impl RequestQueryValues { } #[derive(Debug, Clone)] -pub struct RequestHeaderValues(JsonKeyValues); +pub struct RequestHeaderValues(pub JsonKeyValues); + impl RequestHeaderValues { fn from(headers: &HeaderMap) -> Result> { let mut headers_map: JsonKeyValues = JsonKeyValues::default(); @@ -353,7 +327,7 @@ impl RequestHeaderValues { } #[derive(Debug, Clone)] -pub struct RequestBody(Value); +pub struct RequestBody(pub Value); impl RequestBody { fn from(request_body: &Value) -> Result> { diff --git a/golem-worker-service-base/src/service/gateway/api_definition.rs b/golem-worker-service-base/src/service/gateway/api_definition.rs index 9dab6a4ec3..27aee1ff9e 100644 --- a/golem-worker-service-base/src/service/gateway/api_definition.rs +++ b/golem-worker-service-base/src/service/gateway/api_definition.rs @@ -201,9 +201,8 @@ impl ApiDefinitionServiceDefault { .get_bindings() .iter() .cloned() - .filter_map(|binding| binding.get_worker_binding()) - .map(|binding| async move { - let id = &binding.component_id; + .filter_map(|binding| binding.get_component_id()) + .map(|id| async move { self.component_service .get_by_version(&id.component_id, id.version, auth_ctx) .await diff --git a/golem-worker-service-base/src/service/gateway/http_api_definition_validator.rs b/golem-worker-service-base/src/service/gateway/http_api_definition_validator.rs index c57a66efa4..dc60e3dc9a 100644 --- a/golem-worker-service-base/src/service/gateway/http_api_definition_validator.rs +++ b/golem-worker-service-base/src/service/gateway/http_api_definition_validator.rs @@ -101,7 +101,7 @@ fn unique_routes(routes: &[Route]) -> Vec { errors.push(RouteValidationError { method: route.method.clone(), path: route.path.to_string(), - component: route.binding.get_worker_binding().map(|w| w.component_id), + component: route.binding.get_component_id(), detail, }); } diff --git a/golem-worker-service-base/tests/api_gateway_end_to_end_tests.rs b/golem-worker-service-base/tests/api_gateway_end_to_end_tests.rs index 4abe1c49d1..5d1fc6805f 100644 --- a/golem-worker-service-base/tests/api_gateway_end_to_end_tests.rs +++ b/golem-worker-service-base/tests/api_gateway_end_to_end_tests.rs @@ -74,6 +74,7 @@ async fn execute( internal::get_test_rib_interpreter(), internal::get_test_file_server_binding_handler(), Arc::new(DefaultAuthCallBack), + internal::get_test_http_handler_binding_handler(), Arc::new(internal::TestApiDefinitionLookup::new(compiled)), Arc::clone(session_store), Arc::new(test_identity_provider.clone()), @@ -1594,12 +1595,16 @@ mod internal { FileServerBindingHandler, FileServerBindingResult, }; use golem_worker_service_base::gateway_execution::gateway_binding_resolver::WorkerDetail; + use golem_worker_service_base::gateway_execution::http_handler_binding_handler::{ + HttpHandlerBindingHandler, HttpHandlerBindingResult, + }; use golem_worker_service_base::gateway_execution::{ GatewayResolvedWorkerRequest, GatewayWorkerRequestExecutor, WorkerRequestExecutorError, WorkerResponse, }; use golem_worker_service_base::gateway_middleware::HttpCors; + use golem_worker_service_base::gateway_request::request_details::HttpRequestDetails; use golem_worker_service_base::gateway_rib_interpreter::{ DefaultRibInterpreter, EvaluationError, WorkerServiceRibInterpreter, }; @@ -1675,6 +1680,19 @@ mod internal { } } + struct TestHttpHandlerBindingHandler {} + #[async_trait] + impl HttpHandlerBindingHandler for TestHttpHandlerBindingHandler { + async fn handle_http_handler_binding( + &self, + _namespace: &Namespace, + _worker_detail: &WorkerDetail, + _request_details: &HttpRequestDetails, + ) -> HttpHandlerBindingResult { + unimplemented!() + } + } + #[derive(Debug, Clone)] pub struct DefaultResult { pub worker_name: String, @@ -1828,6 +1846,11 @@ mod internal { Arc::new(TestFileServerBindingHandler {}) } + pub fn get_test_http_handler_binding_handler( + ) -> Arc + Sync + Send> { + Arc::new(TestHttpHandlerBindingHandler {}) + } + pub fn get_preflight_from_response(response: Response) -> HttpCors { let headers = response.headers(); diff --git a/golem-worker-service/src/api/mod.rs b/golem-worker-service/src/api/mod.rs index 3755c95080..92562dfe5b 100644 --- a/golem-worker-service/src/api/mod.rs +++ b/golem-worker-service/src/api/mod.rs @@ -46,6 +46,7 @@ pub fn custom_request_route(services: &Services) -> Route { services.worker_to_http_service.clone(), services.http_definition_lookup_service.clone(), services.fileserver_binding_handler.clone(), + services.http_handler_binding_handler.clone(), services.gateway_session_store.clone(), ); diff --git a/golem-worker-service/src/service/mod.rs b/golem-worker-service/src/service/mod.rs index cedbdebf6b..e0eb4f7890 100644 --- a/golem-worker-service/src/service/mod.rs +++ b/golem-worker-service/src/service/mod.rs @@ -22,6 +22,9 @@ use golem_service_base::storage::blob::BlobStorage; use golem_service_base::storage::sqlite::SqlitePool; use golem_worker_service_base::gateway_execution::file_server_binding_handler::DefaultFileServerBindingHandler; use golem_worker_service_base::gateway_execution::file_server_binding_handler::FileServerBindingHandler; +use golem_worker_service_base::gateway_execution::http_handler_binding_handler::{ + DefaultHttpHandlerBindingHandler, HttpHandlerBindingHandler, +}; use worker_request_executor::UnauthorisedWorkerRequestExecutor; use golem_worker_service_base::gateway_api_definition::http::{ @@ -92,6 +95,8 @@ pub struct Services { Arc + Sync + Send>, pub fileserver_binding_handler: Arc + Sync + Send>, + pub http_handler_binding_handler: + Arc + Sync + Send>, } impl Services { @@ -254,6 +259,12 @@ impl Services { worker_service.clone(), )); + let http_handler_binding_handler: Arc< + dyn HttpHandlerBindingHandler + Sync + Send, + > = Arc::new(DefaultHttpHandlerBindingHandler::new( + worker_to_http_service.clone(), + )); + let api_definition_validator_service = Arc::new(HttpApiDefinitionValidator {}); let identity_provider = Arc::new(DefaultIdentityProvider); @@ -295,6 +306,7 @@ impl Services { api_definition_validator_service, fileserver_binding_handler, gateway_session_store, + http_handler_binding_handler, }) } } From a08e0929519d28abc43e9becbd70fb2790059d59 Mon Sep 17 00:00:00 2001 From: Maxim Schuwalow Date: Wed, 22 Jan 2025 21:57:50 +0100 Subject: [PATCH 2/3] add tests and various cleanups --- .../tests/api_deployment_http_handler.rs | 132 +++++++++ golem-cli/tests/main.rs | 1 + .../virtual_exports/http_incoming_handler.rs | 276 ++++++++++++------ golem-worker-executor-base/src/invocation.rs | 20 +- .../http_incoming_handler.rs | 44 ++- golem-worker-executor-base/tests/wasi.rs | 28 +- .../src/api/register_api_definition_api.rs | 30 +- .../gateway_binding_compiled.rs | 57 +++- .../gateway_http_input_executor.rs | 5 + .../http_handler_binding_handler.rs | 40 +-- .../src/gateway_request/http_request.rs | 6 +- .../src/gateway_request/request_details.rs | 22 +- openapi/golem-service.yaml | 1 + .../golem.yaml | 7 + 14 files changed, 500 insertions(+), 169 deletions(-) create mode 100644 golem-cli/tests/api_deployment_http_handler.rs create mode 100644 test-components/wasi-http-incoming-request-handler-echo/golem.yaml diff --git a/golem-cli/tests/api_deployment_http_handler.rs b/golem-cli/tests/api_deployment_http_handler.rs new file mode 100644 index 0000000000..01fad328b6 --- /dev/null +++ b/golem-cli/tests/api_deployment_http_handler.rs @@ -0,0 +1,132 @@ +// Copyright 2024-2025 Golem Cloud +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::cli::{Cli, CliLive}; +use crate::Tracing; +use golem_cli::model::component::ComponentView; +use golem_client::model::{ApiDeployment, HttpApiDefinitionRequest, HttpApiDefinitionResponseData}; +use golem_common::model::ComponentId; +use golem_common::uri::oss::urn::WorkerUrn; +use golem_test_framework::config::{EnvBasedTestDependencies, TestDependencies}; +use std::io::Write; +use std::sync::Arc; +use test_r::{inherit_test_dep, test, test_dep}; + +inherit_test_dep!(EnvBasedTestDependencies); +inherit_test_dep!(Tracing); + +#[test_dep] +fn cli(deps: &EnvBasedTestDependencies) -> CliLive { + CliLive::make("api_deployment", Arc::new(deps.clone())).unwrap() +} + +#[test] +async fn api_deployment_http_handler( + deps: &EnvBasedTestDependencies, + cli: &CliLive, +) -> anyhow::Result<()> { + let host = format!( + "localhost:{}", + deps.worker_service().private_custom_request_port() + ); + let component_name = "wasi-http-incoming-request-handler-echo".to_string(); + let golem_yaml = deps + .component_directory() + .join("wasi-http-incoming-request-handler-echo/golem.yaml"); + let cfg = &cli.config; + let component: ComponentView = cli.run_trimmed(&[ + "component", + "add", + &cfg.arg('a', "app"), + golem_yaml.to_str().unwrap(), + &cfg.arg('c', "component-name"), + &component_name, + ])?; + let component_urn = component.component_urn; + let component_id = component_urn.id.clone(); + + let worker_name = format!("{component_name}-worker"); + + let _: WorkerUrn = cli.run(&[ + "worker", + "add", + &cfg.arg('w', "worker-name"), + &worker_name, + &cfg.arg('C', "component"), + &component_urn.to_string(), + ])?; + + let temp_dir = tempfile::tempdir()?; + let api_definition = make_http_handler_api_definition(&component_id, &worker_name)?; + let api_path = temp_dir.path().join("api-definition.json"); + { + let mut api_file = std::fs::File::create_new(&api_path)?; + let mut writer = std::io::BufWriter::new(&mut api_file); + serde_json::to_writer(&mut writer, &api_definition)?; + writer.flush()?; + } + + let definition: HttpApiDefinitionResponseData = + cli.run(&["api-definition", "add", api_path.to_str().unwrap()])?; + + let _: ApiDeployment = cli.run(&[ + "api-deployment", + "deploy", + &cfg.arg('d', "definition"), + &format!("{}/{}", definition.id, definition.version), + &cfg.arg('H', "host"), + &host, + ])?; + + let res = { + let client = reqwest::Client::new(); + + client + .post(format!("http://{host}/test?foo=baz")) + .header("test-header", "test-header-value") + .body("\"test-body\"") + .send() + .await + }?; + assert_eq!(res.status().as_u16(), 200); + assert_eq!( + res.headers().get("echo-test-header").unwrap(), + "\"test-header-value\"" + ); + assert_eq!(res.text().await?, "\"test-body\""); + + Ok(()) +} + +fn make_http_handler_api_definition( + component_id: &ComponentId, + worker_name: &str, +) -> anyhow::Result { + Ok(serde_yaml::from_str(&format!( + r#" + id: "{component_id}" + version: "0.1.0" + draft: true + routes: + - method: Post + path: "/test" + binding: + bindingType: http-handler + componentId: + componentId: '{component_id}' + version: 0 + workerName: 'let name: string = "{worker_name}"; name' + "#, + ))?) +} diff --git a/golem-cli/tests/main.rs b/golem-cli/tests/main.rs index 21c705b50b..226938e4d7 100644 --- a/golem-cli/tests/main.rs +++ b/golem-cli/tests/main.rs @@ -27,6 +27,7 @@ pub mod cli; mod api_definition; mod api_deployment; mod api_deployment_fileserver; +mod api_deployment_http_handler; mod component; mod get; mod profile; diff --git a/golem-common/src/virtual_exports/http_incoming_handler.rs b/golem-common/src/virtual_exports/http_incoming_handler.rs index ccab519a1b..349a5d2558 100644 --- a/golem-common/src/virtual_exports/http_incoming_handler.rs +++ b/golem-common/src/virtual_exports/http_incoming_handler.rs @@ -15,8 +15,8 @@ use bytes::Bytes; use golem_wasm_ast::analysis::AnalysedType; use golem_wasm_ast::analysis::{AnalysedExport, AnalysedFunction, AnalysedInstance}; +use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; use golem_wasm_rpc::Value; -use semver::Version; use std::sync::LazyLock; // The following wit is modelled here: @@ -24,14 +24,35 @@ use std::sync::LazyLock; // type fields = list>>; // type body = list; // +// variant method { +// get, +// head, +// post, +// put, +// delete, +// connect, +// options, +// trace, +// patch, +// custom(string) +// } +// +// variant scheme { +// HTTP, +// HTTPS, +// custom(string) +// } +// // record body-and-trailers { // body: body, // trailers: option // } // // record request { -// uri: string // method: method, +// scheme: scheme, +// authority: string, +// path-with-query: string, // headers: fields, // body-and-trailers: option // } @@ -45,55 +66,6 @@ use std::sync::LazyLock; // handle: func(request: request) -> response; // -pub static REQUIRED_FUNCTIONS: LazyLock> = LazyLock::new(|| { - vec![ - rib::ParsedFunctionName { - site: rib::ParsedFunctionSite::PackagedInterface { - namespace: "wasi".to_string(), - package: "http".to_string(), - interface: "incoming-handler".to_string(), - version: Some(rib::SemVer(Version::new(0, 2, 0))), - }, - function: rib::ParsedFunctionReference::Function { - function: "handle".to_string(), - }, - }, - rib::ParsedFunctionName { - site: rib::ParsedFunctionSite::PackagedInterface { - namespace: "wasi".to_string(), - package: "http".to_string(), - interface: "incoming-handler".to_string(), - version: Some(rib::SemVer(Version::new(0, 2, 1))), - }, - function: rib::ParsedFunctionReference::Function { - function: "handle".to_string(), - }, - }, - rib::ParsedFunctionName { - site: rib::ParsedFunctionSite::PackagedInterface { - namespace: "wasi".to_string(), - package: "http".to_string(), - interface: "incoming-handler".to_string(), - version: Some(rib::SemVer(Version::new(0, 2, 2))), - }, - function: rib::ParsedFunctionReference::Function { - function: "handle".to_string(), - }, - }, - rib::ParsedFunctionName { - site: rib::ParsedFunctionSite::PackagedInterface { - namespace: "wasi".to_string(), - package: "http".to_string(), - interface: "incoming-handler".to_string(), - version: Some(rib::SemVer(Version::new(0, 2, 3))), - }, - function: rib::ParsedFunctionReference::Function { - function: "handle".to_string(), - }, - }, - ] -}); - pub static PARSED_FUNCTION_NAME: LazyLock = LazyLock::new(|| rib::ParsedFunctionName { site: rib::ParsedFunctionSite::PackagedInterface { @@ -141,11 +113,11 @@ pub static ANALYZED_FUNCTION: LazyLock = { }) }; -pub const FUNCTION_NAME: &str = "golem:http/incoming-handler"; +pub const FUNCTION_NAME: &str = "golem:http/incoming-handler.{handle}"; pub static ANALYZED_EXPORT: LazyLock = LazyLock::new(|| { AnalysedExport::Instance(AnalysedInstance { - name: FUNCTION_NAME.to_string(), + name: "golem:http/incoming-handler".to_string(), functions: vec![ANALYZED_FUNCTION.clone()], }) }); @@ -175,6 +147,85 @@ macro_rules! extract { }; } +pub enum HttpScheme { + HTTP, + HTTPS, + Custom(String), +} + +impl HttpScheme { + pub fn analyzed_type() -> AnalysedType { + use golem_wasm_ast::analysis::*; + AnalysedType::Variant(TypeVariant { + cases: vec![ + NameOptionTypePair { + name: "HTTP".to_string(), + typ: None, + }, + NameOptionTypePair { + name: "HTTPS".to_string(), + typ: None, + }, + NameOptionTypePair { + name: "custom".to_string(), + typ: Some(AnalysedType::Str(TypeStr)), + }, + ], + }) + } + + pub fn from_value(value: &Value) -> Result { + let (case_idx, case_value) = extract!( + value, + Value::Variant { + case_idx, + case_value + }, + (case_idx, case_value), + "not a variant" + )?; + + match case_idx { + 0 => Ok(Self::HTTP), + 1 => Ok(Self::HTTPS), + 2 => { + let value = case_value.as_ref().ok_or("no case_value provided")?; + let custom_method = + extract!(*value.clone(), Value::String(inner), inner, "not a string")?; + Ok(Self::Custom(custom_method)) + } + _ => Err("unknown case")?, + } + } + + pub fn to_value(self) -> Value { + match self { + Self::HTTP => Value::Variant { + case_idx: 0, + case_value: None, + }, + Self::HTTPS => Value::Variant { + case_idx: 1, + case_value: None, + }, + Self::Custom(custom_method) => Value::Variant { + case_idx: 2, + case_value: Some(Box::new(Value::String(custom_method))), + }, + } + } +} + +impl From for HttpScheme { + fn from(value: http::uri::Scheme) -> Self { + match value { + well_known if well_known == http::uri::Scheme::HTTP => Self::HTTP, + well_known if well_known == http::uri::Scheme::HTTPS => Self::HTTPS, + other => Self::Custom(other.to_string()), + } + } +} + pub enum HttpMethod { GET, HEAD, @@ -230,7 +281,7 @@ impl HttpMethod { typ: None, }, NameOptionTypePair { - name: "Custom".to_string(), + name: "custom".to_string(), typ: Some(AnalysedType::Str(TypeStr)), }, ], @@ -249,20 +300,20 @@ impl HttpMethod { )?; match case_idx { - 0 => Ok(HttpMethod::GET), - 1 => Ok(HttpMethod::HEAD), - 2 => Ok(HttpMethod::POST), - 3 => Ok(HttpMethod::PUT), - 4 => Ok(HttpMethod::DELETE), - 5 => Ok(HttpMethod::CONNECT), - 6 => Ok(HttpMethod::OPTIONS), - 7 => Ok(HttpMethod::TRACE), - 8 => Ok(HttpMethod::PATCH), + 0 => Ok(Self::GET), + 1 => Ok(Self::HEAD), + 2 => Ok(Self::POST), + 3 => Ok(Self::PUT), + 4 => Ok(Self::DELETE), + 5 => Ok(Self::CONNECT), + 6 => Ok(Self::OPTIONS), + 7 => Ok(Self::TRACE), + 8 => Ok(Self::PATCH), 9 => { let value = case_value.as_ref().ok_or("no case_value provided")?; let custom_method = extract!(*value.clone(), Value::String(inner), inner, "not a string")?; - Ok(HttpMethod::Custom(custom_method)) + Ok(Self::Custom(custom_method)) } _ => Err("unknown case")?, } @@ -270,43 +321,43 @@ impl HttpMethod { pub fn to_value(self) -> Value { match self { - HttpMethod::GET => Value::Variant { + Self::GET => Value::Variant { case_idx: 0, case_value: None, }, - HttpMethod::HEAD => Value::Variant { + Self::HEAD => Value::Variant { case_idx: 1, case_value: None, }, - HttpMethod::POST => Value::Variant { + Self::POST => Value::Variant { case_idx: 2, case_value: None, }, - HttpMethod::PUT => Value::Variant { + Self::PUT => Value::Variant { case_idx: 3, case_value: None, }, - HttpMethod::DELETE => Value::Variant { + Self::DELETE => Value::Variant { case_idx: 4, case_value: None, }, - HttpMethod::CONNECT => Value::Variant { + Self::CONNECT => Value::Variant { case_idx: 5, case_value: None, }, - HttpMethod::OPTIONS => Value::Variant { + Self::OPTIONS => Value::Variant { case_idx: 6, case_value: None, }, - HttpMethod::TRACE => Value::Variant { + Self::TRACE => Value::Variant { case_idx: 7, case_value: None, }, - HttpMethod::PATCH => Value::Variant { + Self::PATCH => Value::Variant { case_idx: 8, case_value: None, }, - HttpMethod::Custom(custom_method) => Value::Variant { + Self::Custom(custom_method) => Value::Variant { case_idx: 9, case_value: Some(Box::new(Value::String(custom_method))), }, @@ -317,16 +368,16 @@ impl HttpMethod { use http::Method as M; match value { - M::GET => HttpMethod::GET, - M::CONNECT => HttpMethod::CONNECT, - M::DELETE => HttpMethod::DELETE, - M::HEAD => HttpMethod::HEAD, - M::OPTIONS => HttpMethod::OPTIONS, - M::PATCH => HttpMethod::PATCH, - M::POST => HttpMethod::POST, - M::PUT => HttpMethod::PUT, - M::TRACE => HttpMethod::TRACE, - other => HttpMethod::Custom(other.to_string()), + M::GET => Self::GET, + M::CONNECT => Self::CONNECT, + M::DELETE => Self::DELETE, + M::HEAD => Self::HEAD, + M::OPTIONS => Self::OPTIONS, + M::PATCH => Self::PATCH, + M::POST => Self::POST, + M::PUT => Self::PUT, + M::TRACE => Self::TRACE, + other => Self::Custom(other.to_string()), } } } @@ -493,8 +544,10 @@ impl HttpBodyAndTrailers { } pub struct IncomingHttpRequest { - pub uri: String, pub method: HttpMethod, + pub scheme: HttpScheme, + pub authority: String, + pub path_with_query: String, pub headers: HttpFields, pub body: Option, } @@ -506,12 +559,20 @@ impl IncomingHttpRequest { AnalysedType::Record(TypeRecord { fields: vec![ NameTypePair { - name: "uri".to_string(), + name: "method".to_string(), + typ: HttpMethod::analyzed_type(), + }, + NameTypePair { + name: "scheme".to_string(), + typ: HttpScheme::analyzed_type(), + }, + NameTypePair { + name: "authority".to_string(), typ: AnalysedType::Str(TypeStr), }, NameTypePair { - name: "method".to_string(), - typ: HttpMethod::analyzed_type(), + name: "path-with-query".to_string(), + typ: AnalysedType::Str(TypeStr), }, NameTypePair { name: "headers".to_string(), @@ -538,20 +599,27 @@ impl IncomingHttpRequest { fn from_value(value: &Value) -> Result { let record_values = extract!(value, Value::Record(inner), inner, "not a record")?; - if record_values.len() != 4 { + if record_values.len() != 6 { Err("wrong length of record data")?; }; - let uri = extract!( - record_values[0].clone(), + let method = HttpMethod::from_value(&record_values[0])?; + let scheme = HttpScheme::from_value(&record_values[1])?; + let authority = extract!( + record_values[2].clone(), + Value::String(inner), + inner, + "not a string" + )?; + let path_with_query = extract!( + record_values[3].clone(), Value::String(inner), inner, "not a string" )?; - let method = HttpMethod::from_value(&record_values[1])?; - let headers = HttpFields::from_value(&record_values[2])?; + let headers = HttpFields::from_value(&record_values[4])?; let body = extract!( - &record_values[3], + &record_values[5], Value::Option(inner), match inner { Some(v) => Some(HttpBodyAndTrailers::from_value(v)?), @@ -561,8 +629,10 @@ impl IncomingHttpRequest { )?; Ok(IncomingHttpRequest { - uri, method, + scheme, + authority, + path_with_query, headers, body, }) @@ -570,8 +640,10 @@ impl IncomingHttpRequest { pub fn to_value(self) -> Value { Value::Record(vec![ - Value::String(self.uri), self.method.to_value(), + self.scheme.to_value(), + Value::String(self.authority), + Value::String(self.path_with_query), self.headers.to_value(), Value::Option(self.body.map(|b| Box::new(b.to_value()))), ]) @@ -643,6 +715,18 @@ impl HttpResponse { }) } + pub fn from_function_output(output: TypeAnnotatedValue) -> Result { + let value: Value = output.try_into()?; + + let mut tuple_values = extract!(value, Value::Tuple(inner), inner, "not a tuple")?; + + if tuple_values.len() != 1 { + Err("unexpected number of outputs")? + }; + + Self::from_value(tuple_values.remove(0)) + } + pub fn to_value(self) -> Value { let converted_status: Value = Value::U16(self.status); let converted_headers: Value = self.headers.to_value(); diff --git a/golem-worker-executor-base/src/invocation.rs b/golem-worker-executor-base/src/invocation.rs index d411765e51..f5fcbc54e8 100644 --- a/golem-worker-executor-base/src/invocation.rs +++ b/golem-worker-executor-base/src/invocation.rs @@ -454,9 +454,8 @@ async fn invoke_http_handler( tracing::debug!("Invoking wasi:http/incoming-http-handler handle"); let (_, mut task_exits) = { - let hyper_request = + let (scheme, hyper_request) = virtual_export_compat::http_incoming_handler::input_to_hyper_request(function_input)?; - let scheme = wasi_http_scheme_from_request(&hyper_request)?; let incoming = store_context .data_mut() .as_wasi_http_view() @@ -747,20 +746,3 @@ enum FindFunctionResult { ResourceDrop, IncomingHttpHandlerBridge, } - -fn wasi_http_scheme_from_request( - req: &hyper::Request, -) -> Result { - use http::uri::*; - use wasmtime_wasi_http::bindings::http::types::Scheme as WasiScheme; - - let raw_scheme = req.uri().scheme().ok_or(GolemError::invalid_request( - "Could not extract scheme from uri".to_string(), - ))?; - - match raw_scheme { - scheme if *scheme == Scheme::HTTP => Ok(WasiScheme::Http), - scheme if *scheme == Scheme::HTTPS => Ok(WasiScheme::Https), - scheme => Ok(WasiScheme::Other(scheme.to_string())), - } -} diff --git a/golem-worker-executor-base/src/virtual_export_compat/http_incoming_handler.rs b/golem-worker-executor-base/src/virtual_export_compat/http_incoming_handler.rs index 197f3f31d4..11a48afb48 100644 --- a/golem-worker-executor-base/src/virtual_export_compat/http_incoming_handler.rs +++ b/golem-worker-executor-base/src/virtual_export_compat/http_incoming_handler.rs @@ -23,16 +23,42 @@ use http_body_util::combinators::BoxBody; use http_body_util::BodyExt; use wasmtime_wasi_http::bindings::http::types::ErrorCode; -pub fn input_to_hyper_request( - inputs: &[Value], -) -> Result>, GolemError> { +pub type SchemeAndRequest = ( + wasmtime_wasi_http::bindings::wasi::http::types::Scheme, + hyper::Request>, +); + +pub fn input_to_hyper_request(inputs: &[Value]) -> Result { let request = IncomingHttpRequest::from_function_input(inputs).map_err(|e| { GolemError::invalid_request(format!("Failed contructing incoming request: {e}")) })?; - let mut builder = hyper::Request::builder() - .uri(request.uri) - .method(request.method); + let wasmtime_scheme = match request.scheme { + HttpScheme::HTTP => wasmtime_wasi_http::bindings::wasi::http::types::Scheme::Http, + HttpScheme::HTTPS => wasmtime_wasi_http::bindings::wasi::http::types::Scheme::Https, + HttpScheme::Custom(ref custom) => { + wasmtime_wasi_http::bindings::wasi::http::types::Scheme::Other(custom.clone()) + } + }; + + let converted_scheme = match request.scheme { + HttpScheme::HTTP => http::uri::Scheme::HTTP, + HttpScheme::HTTPS => http::uri::Scheme::HTTPS, + HttpScheme::Custom(custom) => custom.as_str().try_into().map_err(|e| { + GolemError::invalid_request(format!("Not a valid scheme: {custom} ({e})")) + })?, + }; + + let uri = http::Uri::builder() + .scheme(converted_scheme) + .authority(request.authority) + .path_and_query(request.path_with_query) + .build() + .map_err(|e| { + GolemError::invalid_request(format!("Failed to construct a valid url: {e}")) + })?; + + let mut builder = hyper::Request::builder().uri(uri).method(request.method); for (name, value) in request.headers.0 { let converted = http::HeaderValue::from_bytes(&value) @@ -69,9 +95,11 @@ pub fn input_to_hyper_request( BoxBody::new(http_body_util::Empty::new().map_err(hyper_error_from_infallible)) }; - builder + let hyper_request = builder .body(body) - .map_err(|e| GolemError::invalid_request(format!("Failed to attach body {e}"))) + .map_err(|e| GolemError::invalid_request(format!("Failed to attach body {e}")))?; + + Ok((wasmtime_scheme, hyper_request)) } pub async fn http_response_to_output( diff --git a/golem-worker-executor-base/tests/wasi.rs b/golem-worker-executor-base/tests/wasi.rs index 976e169dfb..edf113df1e 100644 --- a/golem-worker-executor-base/tests/wasi.rs +++ b/golem-worker-executor-base/tests/wasi.rs @@ -1868,11 +1868,16 @@ async fn wasi_incoming_request_handler( .await; let args: Value = Value::Record(vec![ - Value::String("http://localhost:8000".to_string()), Value::Variant { case_idx: 0, case_value: None, }, + Value::Variant { + case_idx: 0, + case_value: None, + }, + Value::String("localhost:8000".to_string()), + Value::String("/".to_string()), Value::List(vec![]), Value::Option(None), ]); @@ -1918,11 +1923,16 @@ async fn wasi_incoming_request_handler_echo( .await; let args: Value = Value::Record(vec![ - Value::String("http://localhost:8000/foo?bar=baz".to_string()), Value::Variant { case_idx: 2, case_value: None, }, + Value::Variant { + case_idx: 0, + case_value: None, + }, + Value::String("localhost:8000".to_string()), + Value::String("/foo?bar=baz".to_string()), Value::List(vec![Value::Tuple(vec![ Value::String("test-header".to_string()), Value::List( @@ -2052,11 +2062,16 @@ async fn wasi_incoming_request_handler_state( .await; let args_put: Value = Value::Record(vec![ - Value::String("http://localhost:8000".to_string()), Value::Variant { case_idx: 3, case_value: None, }, + Value::Variant { + case_idx: 0, + case_value: None, + }, + Value::String("localhost:8000".to_string()), + Value::String("/".to_string()), Value::List(vec![]), Value::Option(Some(Box::new(Value::Record(vec![ Value::List( @@ -2071,11 +2086,16 @@ async fn wasi_incoming_request_handler_state( ]); let args_get: Value = Value::Record(vec![ - Value::String("http://localhost:8000".to_string()), Value::Variant { case_idx: 0, case_value: None, }, + Value::Variant { + case_idx: 0, + case_value: None, + }, + Value::String("localhost:8000".to_string()), + Value::String("/".to_string()), Value::List(vec![]), Value::Option(None), ]); diff --git a/golem-worker-service-base/src/api/register_api_definition_api.rs b/golem-worker-service-base/src/api/register_api_definition_api.rs index 09e4538588..b2ace03476 100644 --- a/golem-worker-service-base/src/api/register_api_definition_api.rs +++ b/golem-worker-service-base/src/api/register_api_definition_api.rs @@ -732,10 +732,7 @@ impl TryFrom for GatewayBinding { let v = gateway_binding_data.clone().binding_type; match v { - Some(GatewayBindingType::Default) - | Some(GatewayBindingType::FileServer) - | Some(GatewayBindingType::HttpHandler) - | None => { + Some(GatewayBindingType::Default) | Some(GatewayBindingType::FileServer) | None => { let response = gateway_binding_data .response .ok_or("Missing response field in binding")?; @@ -773,6 +770,31 @@ impl TryFrom for GatewayBinding { } } + Some(GatewayBindingType::HttpHandler) => { + let component_id = gateway_binding_data + .component_id + .ok_or("Missing componentId field in binding")?; + + let worker_name = gateway_binding_data + .worker_name + .map(|name| rib::from_string(name.as_str()).map_err(|e| e.to_string())) + .transpose()?; + + let idempotency_key = if let Some(key) = &gateway_binding_data.idempotency_key { + Some(rib::from_string(key).map_err(|e| e.to_string())?) + } else { + None + }; + + let binding = HttpHandlerBinding { + component_id, + worker_name, + idempotency_key, + }; + + Ok(GatewayBinding::HttpHandler(binding)) + } + Some(GatewayBindingType::CorsPreflight) => { let response_mapping = gateway_binding_data.response; diff --git a/golem-worker-service-base/src/gateway_binding/gateway_binding_compiled.rs b/golem-worker-service-base/src/gateway_binding/gateway_binding_compiled.rs index 3c8f7cbf2b..b75e7174ec 100644 --- a/golem-worker-service-base/src/gateway_binding/gateway_binding_compiled.rs +++ b/golem-worker-service-base/src/gateway_binding/gateway_binding_compiled.rs @@ -154,9 +154,7 @@ impl TryFrom { + ProtoGatewayBindingType::FileServer | ProtoGatewayBindingType::Default => { // Convert fields for the Worker variant let component_id = value .component @@ -239,6 +237,55 @@ impl TryFrom { + // Convert fields for the Worker variant + let component_id = value + .component + .ok_or("Missing component_id for Worker")? + .try_into()?; + + let worker_name_compiled = match ( + value.worker_name, + value.compiled_worker_name_expr, + value.worker_name_rib_input, + ) { + (Some(worker_name), Some(compiled_worker_name), Some(rib_input_type_info)) => { + Some(WorkerNameCompiled { + worker_name: rib::Expr::try_from(worker_name)?, + compiled_worker_name: rib::RibByteCode::try_from(compiled_worker_name)?, + rib_input_type_info: rib::RibInputTypeInfo::try_from( + rib_input_type_info, + )?, + }) + } + _ => None, + }; + + let idempotency_key_compiled = match ( + value.idempotency_key, + value.compiled_idempotency_key_expr, + value.idempotency_key_rib_input, + ) { + (Some(idempotency_key), Some(compiled_idempotency_key), Some(rib_input)) => { + Some(IdempotencyKeyCompiled { + idempotency_key: rib::Expr::try_from(idempotency_key)?, + compiled_idempotency_key: rib::RibByteCode::try_from( + compiled_idempotency_key, + )?, + rib_input: rib::RibInputTypeInfo::try_from(rib_input)?, + }) + } + _ => None, + }; + + Ok(GatewayBindingCompiled::HttpHandler( + HttpHandlerBindingCompiled { + component_id, + worker_name_compiled, + idempotency_key_compiled, + }, + )) + } ProtoGatewayBindingType::CorsPreflight | ProtoGatewayBindingType::AuthCallBack => { let static_binding = value .static_binding @@ -311,7 +358,7 @@ mod internal { GatewayBindingType::Default => 0, GatewayBindingType::FileServer => 1, GatewayBindingType::CorsPreflight => 2, - GatewayBindingType::HttpHandler => 3, + GatewayBindingType::HttpHandler => 4, }; Ok( @@ -364,7 +411,7 @@ mod internal { GatewayBindingType::Default => 0, GatewayBindingType::FileServer => 1, GatewayBindingType::CorsPreflight => 2, - GatewayBindingType::HttpHandler => 3, + GatewayBindingType::HttpHandler => 4, }; Ok( diff --git a/golem-worker-service-base/src/gateway_execution/gateway_http_input_executor.rs b/golem-worker-service-base/src/gateway_execution/gateway_http_input_executor.rs index 24acaa985d..df7804c83a 100644 --- a/golem-worker-service-base/src/gateway_execution/gateway_http_input_executor.rs +++ b/golem-worker-service-base/src/gateway_execution/gateway_http_input_executor.rs @@ -143,6 +143,11 @@ impl DefaultGatewayInputExecutor { ) .await; + match result { + Ok(_) => tracing::debug!("http handler binding successful"), + Err(ref e) => tracing::warn!("http handler binding failed: {e:?}"), + } + let mut response = result .to_response(&request_details, &self.gateway_session_store) .await; diff --git a/golem-worker-service-base/src/gateway_execution/http_handler_binding_handler.rs b/golem-worker-service-base/src/gateway_execution/http_handler_binding_handler.rs index 65d0c5b4ba..3c11dacfe4 100644 --- a/golem-worker-service-base/src/gateway_execution/http_handler_binding_handler.rs +++ b/golem-worker-service-base/src/gateway_execution/http_handler_binding_handler.rs @@ -21,7 +21,7 @@ use futures_util::TryStreamExt; use golem_common::model::HasAccountId; use golem_common::virtual_exports; use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; -use golem_wasm_rpc::{TypeAnnotatedValueConstructors, Value}; +use golem_wasm_rpc::TypeAnnotatedValueConstructors; use http::StatusCode; use http_body_util::combinators::BoxBody; use http_body_util::BodyExt; @@ -47,6 +47,7 @@ pub struct HttpHandlerBindingSuccess { pub response: poem::Response, } +#[derive(Debug)] pub enum HttpHandlerBindingError { InternalError(String), WorkerRequestExecutorError(WorkerRequestExecutorError), @@ -104,7 +105,9 @@ impl HttpHandlerBinding }; hic::IncomingHttpRequest { - uri: request_details.get_api_input_path(), + scheme: request_details.scheme.clone().into(), + authority: request_details.host.to_string(), + path_with_query: request_details.get_api_input_path(), method: hic::HttpMethod::from_http_method(request_details.request_method.clone()), headers, body: Some(body), @@ -133,25 +136,28 @@ impl HttpHandlerBinding namespace: namespace.clone(), }; - let response = self - .worker_request_executor - .execute(resolved_request) - .await - .map_err(HttpHandlerBindingError::WorkerRequestExecutorError)?; + let response = self.worker_request_executor.execute(resolved_request).await; + + // log outcome + match response { + Ok(_) => { + tracing::debug!("http_handler received successful response from worker invocation") + } + Err(ref e) => tracing::warn!("worker invocation of http_handler failed: {}", e), + } + + let response = response.map_err(HttpHandlerBindingError::WorkerRequestExecutorError)?; let poem_response = { use golem_common::virtual_exports::http_incoming_handler as hic; - let response_value: Value = response.result.try_into().map_err(|e| { - HttpHandlerBindingError::InternalError(format!( - "Failed to parse response as wasm rpc value: {}", - e - )) - })?; - - let parsed_response = hic::HttpResponse::from_value(response_value).map_err(|e| { - HttpHandlerBindingError::InternalError(format!("Failed parsing response: {}", e)) - })?; + let parsed_response = hic::HttpResponse::from_function_output(response.result) + .map_err(|e| { + HttpHandlerBindingError::InternalError(format!( + "Failed parsing response: {}", + e + )) + })?; let converted_status_code = StatusCode::from_u16(parsed_response.status).map_err(|e| { diff --git a/golem-worker-service-base/src/gateway_request/http_request.rs b/golem-worker-service-base/src/gateway_request/http_request.rs index 03643754c9..3cae96c429 100644 --- a/golem-worker-service-base/src/gateway_request/http_request.rs +++ b/golem-worker-service-base/src/gateway_request/http_request.rs @@ -25,7 +25,7 @@ use tracing::error; #[derive(Clone, Debug)] pub struct InputHttpRequest { - pub scheme: Option, + pub scheme: Scheme, pub host: ApiSiteString, pub api_input_path: ApiInputPath, pub headers: HeaderMap, @@ -44,10 +44,10 @@ impl From for Response { impl InputHttpRequest { pub async fn from_request(request: poem::Request) -> Result { + let scheme = request.scheme().clone(); let (req_parts, body) = request.into_parts(); let headers = req_parts.headers; let uri = req_parts.uri; - let scheme = uri.scheme(); let host = match headers.get(HOST).and_then(|h| h.to_str().ok()) { Some(host) => ApiSiteString(host.to_string()), @@ -77,7 +77,7 @@ impl InputHttpRequest { }; Ok(InputHttpRequest { - scheme: scheme.cloned(), + scheme: scheme.clone(), host, api_input_path: ApiInputPath { base_path: uri.path().to_string(), diff --git a/golem-worker-service-base/src/gateway_request/request_details.rs b/golem-worker-service-base/src/gateway_request/request_details.rs index e8adc0b260..882158e7fc 100644 --- a/golem-worker-service-base/src/gateway_request/request_details.rs +++ b/golem-worker-service-base/src/gateway_request/request_details.rs @@ -38,7 +38,7 @@ pub enum GatewayRequestDetails { // api_input_path is still available. #[derive(Debug, Clone)] pub struct HttpRequestDetails { - pub scheme: Option, + pub scheme: Scheme, pub host: ApiSiteString, pub request_method: Method, pub api_input_path: ApiInputPath, @@ -110,23 +110,19 @@ impl HttpRequestDetails { } pub fn url(&self) -> Result { - let url_str = if let Some(scheme) = &self.scheme { - format!( - "{}://{}{}", - scheme, - &self.host, - &self.api_input_path.to_string() - ) - } else { - format!("{}{}", &self.host, &self.api_input_path.to_string()) - }; + let url_str = format!( + "{}://{}{}", + &self.scheme, + &self.host, + &self.api_input_path.to_string() + ); Url::parse(&url_str).map_err(|err| err.to_string()) } pub fn empty() -> HttpRequestDetails { HttpRequestDetails { - scheme: Some(Scheme::HTTP), + scheme: Scheme::HTTP, host: ApiSiteString("".to_string()), request_method: Method::GET, api_input_path: ApiInputPath { @@ -220,7 +216,7 @@ impl HttpRequestDetails { } pub fn from_input_http_request( - scheme: &Option, + scheme: &Scheme, host: &ApiSiteString, method: Method, api_input_path: &ApiInputPath, diff --git a/openapi/golem-service.yaml b/openapi/golem-service.yaml index dd0db86075..ef24e400fc 100644 --- a/openapi/golem-service.yaml +++ b/openapi/golem-service.yaml @@ -4062,6 +4062,7 @@ components: enum: - default - file-server + - http-handler - cors-preflight GetFilesResponse: type: object diff --git a/test-components/wasi-http-incoming-request-handler-echo/golem.yaml b/test-components/wasi-http-incoming-request-handler-echo/golem.yaml new file mode 100644 index 0000000000..ea0035e9d9 --- /dev/null +++ b/test-components/wasi-http-incoming-request-handler-echo/golem.yaml @@ -0,0 +1,7 @@ +components: + wasi-http-incoming-request-handler-echo: + sourceWit: wit + generatedWit: generated-wit + componentWasm: ../wasi-http-incoming-request-handler-echo.wasm + linkedWasm: ../wasi-http-incoming-request-handler-echo.wasm + componentType: durable From da0a6c3df1689b25d26ab7efcb9125f14c0ade48 Mon Sep 17 00:00:00 2001 From: Maxim Schuwalow Date: Thu, 23 Jan 2025 13:06:32 +0100 Subject: [PATCH 3/3] remove unused --- .../src/gateway_execution/http_handler_binding_handler.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/golem-worker-service-base/src/gateway_execution/http_handler_binding_handler.rs b/golem-worker-service-base/src/gateway_execution/http_handler_binding_handler.rs index 3c11dacfe4..f1347c9f7e 100644 --- a/golem-worker-service-base/src/gateway_execution/http_handler_binding_handler.rs +++ b/golem-worker-service-base/src/gateway_execution/http_handler_binding_handler.rs @@ -14,10 +14,8 @@ use crate::gateway_binding::{HttpRequestDetails, WorkerDetail}; use crate::gateway_execution::GatewayResolvedWorkerRequest; -use crate::service::worker::WorkerService; use async_trait::async_trait; use bytes::Bytes; -use futures_util::TryStreamExt; use golem_common::model::HasAccountId; use golem_common::virtual_exports; use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue;