Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support invoking wasi:http/incoming-handler #1249

Merged
merged 9 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ exclude = [
"test-components/update-test-v4",
"test-components/variant-service",
"test-components/wasi-http-incoming-request-handler",
"test-components/wasi-http-incoming-request-handler-echo",
"test-components/wasi-http-incoming-request-handler-state",
"test-components/write-stderr",
"test-components/write-stdout",
]
Expand Down
41 changes: 23 additions & 18 deletions golem-common/src/virtual_exports/http_incoming_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,19 @@ use semver::Version;
//
// record body-and-trailers {
// body: body,
// trailers: option<simple-fields>
// trailers: option<fields>
// }
//
// record request {
// uri: string
// method: method,
// path-with-query: string,
// scheme: scheme,
// authority: string,
// headers: simple-fields,
// headers: fields,
// body-and-trailers: option<body-and-trailers>
// }
//
// record response {
// status: status-code,
// headers: fields,
// body: option<body-and-trailers>
// }
//
Expand Down Expand Up @@ -360,12 +359,12 @@ impl HttpBodyContent {
}
}

pub struct HttpBody {
pub struct HttpBodyAndTrailers {
pub content: HttpBodyContent,
pub trailers: Option<HttpFields>,
}

impl HttpBody {
impl HttpBodyAndTrailers {
pub fn analysed_type() -> AnalysedType {
use golem_wasm_ast::analysis::*;

Expand Down Expand Up @@ -403,7 +402,7 @@ impl HttpBody {
"not an option"
)?;

Ok(HttpBody { content, trailers })
Ok(HttpBodyAndTrailers { content, trailers })
}
pub fn to_value(self) -> Value {
let converted_content = self.content.to_value();
Expand All @@ -417,7 +416,7 @@ pub struct IncomingHttpRequest {
pub uri: String,
pub method: HttpMethod,
pub headers: HttpFields,
pub body: Option<HttpBody>,
pub body: Option<HttpBodyAndTrailers>,
}

impl IncomingHttpRequest {
Expand All @@ -439,9 +438,9 @@ impl IncomingHttpRequest {
typ: HttpFields::analyzed_type(),
},
NameTypePair {
name: "body".to_string(),
name: "body-and-trailers".to_string(),
typ: AnalysedType::Option(TypeOption {
inner: Box::new(HttpBody::analysed_type()),
inner: Box::new(HttpBodyAndTrailers::analysed_type()),
}),
},
],
Expand All @@ -453,7 +452,7 @@ impl IncomingHttpRequest {
Err("invalid number of inputs")?;
};
Self::from_value(&inputs[0])
.map_err(|e| format!("Failed parsing input as http request: ${e}"))
.map_err(|e| format!("Failed parsing input as http request: {e}"))
}

fn from_value(value: &Value) -> Result<Self, String> {
Expand All @@ -475,7 +474,7 @@ impl IncomingHttpRequest {
&record_values[3],
Value::Option(inner),
match inner {
Some(v) => Some(HttpBody::from_value(v)?),
Some(v) => Some(HttpBodyAndTrailers::from_value(v)?),
None => None,
},
"not an option"
Expand All @@ -492,7 +491,8 @@ impl IncomingHttpRequest {

pub struct HttpResponse {
pub status: u16,
pub body: Option<HttpBody>,
pub headers: HttpFields,
pub body: Option<HttpBodyAndTrailers>,
}

impl HttpResponse {
Expand All @@ -506,9 +506,13 @@ impl HttpResponse {
typ: AnalysedType::U16(TypeU16),
},
NameTypePair {
name: "body".to_string(),
name: "headers".to_string(),
typ: HttpFields::analyzed_type(),
},
NameTypePair {
name: "body-and-trailers".to_string(),
typ: AnalysedType::Option(TypeOption {
inner: Box::new(HttpBody::analysed_type()),
inner: Box::new(HttpBodyAndTrailers::analysed_type()),
}),
},
],
Expand All @@ -517,8 +521,9 @@ impl HttpResponse {

pub fn to_value(self) -> Value {
let converted_status: Value = Value::U16(self.status);
let converted_body = Value::Option(self.body.map(|b| Box::new(b.to_value())));
let converted_headers: Value = self.headers.to_value();
let converted_body: Value = Value::Option(self.body.map(|b| Box::new(b.to_value())));

Value::Record(vec![converted_status, converted_body])
Value::Record(vec![converted_status, converted_headers, converted_body])
}
}
137 changes: 70 additions & 67 deletions golem-worker-executor-base/src/durable_host/http/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,78 +441,81 @@ impl<Ctx: WorkerCtx> HostFutureTrailers for DurableWorkerCtx<Ctx> {
&mut self,
self_: Resource<FutureTrailers>,
) -> anyhow::Result<Option<Result<Result<Option<Resource<Trailers>>, ErrorCode>, ()>>> {
self.observe_function_call("http::types::future_trailers", "get");
// Trailers might be associated with an incoming http request or an http response.
// Only in the second case do we need to add durability. We can distinguish these
// two cases by checking for presence of an associated open http request.
if let Some(request_state) = self.state.open_http_requests.get(&self_.rep()) {
let begin_idx = self
.state
.open_function_table
.get(&request_state.root_handle)
.ok_or_else(|| {
anyhow!(
"No matching BeginRemoteWrite index was found for the open HTTP request"
)
})?;

let request_state = self
.state
.open_http_requests
.get(&self_.rep())
.ok_or_else(|| {
anyhow!("No matching HTTP request is associated with resource handle")
})?;
let begin_idx = self
.state
.open_function_table
.get(&request_state.root_handle)
.ok_or_else(|| {
anyhow!("No matching BeginRemoteWrite index was found for the open HTTP request")
})?;
let request = request_state.request.clone();

let durability = Durability::<
Option<Result<Result<Option<HashMap<String, Vec<u8>>>, SerializableErrorCode>, ()>>,
SerializableError,
>::new(
self,
"golem http::types::future_trailers",
"get",
DurableFunctionType::WriteRemoteBatched(Some(*begin_idx)),
)
.await?;

if durability.is_live() {
let result = HostFutureTrailers::get(&mut self.as_wasi_http_view(), self_).await;
let to_serialize = match &result {
Ok(Some(Ok(Ok(None)))) => Ok(Some(Ok(Ok(None)))),
Ok(Some(Ok(Ok(Some(trailers))))) => {
let mut serialized_trailers = HashMap::new();
let host_fields: &Resource<wasmtime_wasi_http::types::HostFields> =
unsafe { std::mem::transmute(trailers) };

for (key, value) in get_fields(self.table(), host_fields)? {
serialized_trailers
.insert(key.as_str().to_string(), value.as_bytes().to_vec());
let request = request_state.request.clone();

let durability = Durability::<
Option<Result<Result<Option<HashMap<String, Vec<u8>>>, SerializableErrorCode>, ()>>,
SerializableError,
>::new(
self,
"golem http::types::future_trailers",
"get",
DurableFunctionType::WriteRemoteBatched(Some(*begin_idx)),
)
.await?;

if durability.is_live() {
let result = HostFutureTrailers::get(&mut self.as_wasi_http_view(), self_).await;
let to_serialize = match &result {
Ok(Some(Ok(Ok(None)))) => Ok(Some(Ok(Ok(None)))),
Ok(Some(Ok(Ok(Some(trailers))))) => {
let mut serialized_trailers = HashMap::new();
let host_fields: &Resource<wasmtime_wasi_http::types::HostFields> =
unsafe { std::mem::transmute(trailers) };

for (key, value) in get_fields(self.table(), host_fields)? {
serialized_trailers
.insert(key.as_str().to_string(), value.as_bytes().to_vec());
}
Ok(Some(Ok(Ok(Some(serialized_trailers)))))
}
Ok(Some(Ok(Ok(Some(serialized_trailers)))))
}
Ok(Some(Ok(Err(error_code)))) => Ok(Some(Ok(Err(error_code.into())))),
Ok(Some(Err(_))) => Ok(Some(Err(()))),
Ok(None) => Ok(None),
Err(err) => Err(SerializableError::from(err)),
};
let _ = durability
.persist_serializable(self, request, to_serialize)
.await;
result
} else {
let serialized = durability.replay(self).await;
match serialized {
Ok(Some(Ok(Ok(None)))) => Ok(Some(Ok(Ok(None)))),
Ok(Some(Ok(Ok(Some(serialized_trailers))))) => {
let mut fields = FieldMap::new();
for (key, value) in serialized_trailers {
fields.insert(HeaderName::from_str(&key)?, HeaderValue::try_from(value)?);
Ok(Some(Ok(Err(error_code)))) => Ok(Some(Ok(Err(error_code.into())))),
Ok(Some(Err(_))) => Ok(Some(Err(()))),
Ok(None) => Ok(None),
Err(err) => Err(SerializableError::from(err)),
};
let _ = durability
.persist_serializable(self, request, to_serialize)
.await;
result
} else {
let serialized = durability.replay(self).await;
match serialized {
Ok(Some(Ok(Ok(None)))) => Ok(Some(Ok(Ok(None)))),
Ok(Some(Ok(Ok(Some(serialized_trailers))))) => {
let mut fields = FieldMap::new();
for (key, value) in serialized_trailers {
fields
.insert(HeaderName::from_str(&key)?, HeaderValue::try_from(value)?);
}
let hdrs = self
.table()
.push(wasmtime_wasi_http::types::HostFields::Owned { fields })?;
Ok(Some(Ok(Ok(Some(hdrs)))))
}
let hdrs = self
.table()
.push(wasmtime_wasi_http::types::HostFields::Owned { fields })?;
Ok(Some(Ok(Ok(Some(hdrs)))))
Ok(Some(Ok(Err(error_code)))) => Ok(Some(Ok(Err(error_code.into())))),
Ok(Some(Err(_))) => Ok(Some(Err(()))),
Ok(None) => Ok(None),
Err(error) => Err(error),
}
Ok(Some(Ok(Err(error_code)))) => Ok(Some(Ok(Err(error_code.into())))),
Ok(Some(Err(_))) => Ok(Some(Err(()))),
Ok(None) => Ok(None),
Err(error) => Err(error),
}
} else {
self.observe_function_call("http::types::future_trailers", "get");
HostFutureTrailers::get(&mut self.as_wasi_http_view(), self_).await
}
}

Expand Down
27 changes: 18 additions & 9 deletions golem-worker-executor-base/src/durable_host/io/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use anyhow::anyhow;
use async_trait::async_trait;
use wasmtime::component::Resource;
use wasmtime_wasi::{ResourceTable, StreamError};
use wasmtime_wasi::StreamError;

use crate::durable_host::http::end_http_request;
use crate::durable_host::http::serialized::SerializableHttpRequest;
Expand All @@ -38,8 +38,8 @@ impl<Ctx: WorkerCtx> HostInputStream for DurableWorkerCtx<Ctx> {
self_: Resource<InputStream>,
len: u64,
) -> Result<Vec<u8>, StreamError> {
if is_incoming_http_body_stream(self.table(), &self_) {
let handle = self_.rep();
let handle = self_.rep();
if is_incoming_http_body_stream(self, &self_) {
let begin_idx = get_http_request_begin_idx(self, handle)?;

let durability = Durability::<Vec<u8>, SerializableStreamError>::new(
Expand Down Expand Up @@ -71,7 +71,7 @@ impl<Ctx: WorkerCtx> HostInputStream for DurableWorkerCtx<Ctx> {
self_: Resource<InputStream>,
len: u64,
) -> Result<Vec<u8>, StreamError> {
if is_incoming_http_body_stream(self.table(), &self_) {
if is_incoming_http_body_stream(self, &self_) {
let handle = self_.rep();
let begin_idx = get_http_request_begin_idx(self, handle)?;

Expand Down Expand Up @@ -100,7 +100,7 @@ impl<Ctx: WorkerCtx> HostInputStream for DurableWorkerCtx<Ctx> {
}

async fn skip(&mut self, self_: Resource<InputStream>, len: u64) -> Result<u64, StreamError> {
if is_incoming_http_body_stream(self.table(), &self_) {
if is_incoming_http_body_stream(self, &self_) {
let handle = self_.rep();
let begin_idx = get_http_request_begin_idx(self, handle)?;

Expand Down Expand Up @@ -132,7 +132,7 @@ impl<Ctx: WorkerCtx> HostInputStream for DurableWorkerCtx<Ctx> {
self_: Resource<InputStream>,
len: u64,
) -> Result<u64, StreamError> {
if is_incoming_http_body_stream(self.table(), &self_) {
if is_incoming_http_body_stream(self, &self_) {
let handle = self_.rep();
let begin_idx = get_http_request_begin_idx(self, handle)?;

Expand Down Expand Up @@ -168,7 +168,7 @@ impl<Ctx: WorkerCtx> HostInputStream for DurableWorkerCtx<Ctx> {
async fn drop(&mut self, rep: Resource<InputStream>) -> anyhow::Result<()> {
self.observe_function_call("io::streams::input_stream", "drop");

if is_incoming_http_body_stream(self.table(), &rep) {
if is_incoming_http_body_stream(self, &rep) {
let handle = rep.rep();
if let Some(state) = self.state.open_http_requests.get(&handle) {
if state.close_owner == HttpRequestCloseOwner::InputStreamClosed {
Expand Down Expand Up @@ -297,8 +297,17 @@ impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
}
}

fn is_incoming_http_body_stream(table: &ResourceTable, stream: &Resource<InputStream>) -> bool {
let stream = table.get::<InputStream>(stream).unwrap();
fn is_incoming_http_body_stream<Ctx: WorkerCtx>(
ctx: &mut DurableWorkerCtx<Ctx>,
stream: &Resource<InputStream>,
) -> bool {
// incoming-body is used for both incoming http bodies (which don't need durability),
// and response bodies. Only in the second case will there be an associated open http request.
if !ctx.state.open_http_requests.contains_key(&stream.rep()) {
return false;
};

let stream = ctx.table().get::<InputStream>(stream).unwrap();
stream
.as_any()
.downcast_ref::<HostIncomingBodyStream>()
Expand Down
Loading
Loading