From 3b373c7b6b0ba4556f27f36b4331a0a4da7efe26 Mon Sep 17 00:00:00 2001 From: Afsal Thaj Date: Tue, 21 Jan 2025 20:51:29 +1100 Subject: [PATCH] Resume replay for debugging service (#1242) * Make oplog constructor public * update golem examples * Make openoplogs public for debugging service * Add resume replay to external operations * Add resume replay to worker * Fix store passed to resume * Fix compilation errors * Fix count * Make return type of resume sensible * Add comments in resume_replay * Handle resuming the replay of error * Throw when unable to send resume command --- .../src/durable_host/mod.rs | 348 +++++++++--------- .../src/services/oplog/mod.rs | 8 +- golem-worker-executor-base/src/worker.rs | 41 +++ golem-worker-executor-base/src/workerctx.rs | 10 +- .../tests/common/mod.rs | 7 + golem-worker-executor/src/context.rs | 7 + 6 files changed, 245 insertions(+), 176 deletions(-) diff --git a/golem-worker-executor-base/src/durable_host/mod.rs b/golem-worker-executor-base/src/durable_host/mod.rs index 14589458d8..66d6272e56 100644 --- a/golem-worker-executor-base/src/durable_host/mod.rs +++ b/golem-worker-executor-base/src/durable_host/mod.rs @@ -1206,6 +1206,182 @@ impl> ExternalOperations for Dur calculate_last_known_status(this, owned_worker_id, metadata).await } + async fn resume_replay( + store: &mut (impl AsContextMut + Send), + instance: &Instance, + ) -> Result { + let mut number_of_replayed_functions = 0; + + let resume_result = loop { + let cont = store.as_context().data().durable_ctx().state.is_replay(); + + if cont { + let oplog_entry = store + .as_context_mut() + .data_mut() + .durable_ctx_mut() + .state + .replay_state + .get_oplog_entry_exported_function_invoked() + .await; + match oplog_entry { + Err(error) => break Err(error), + Ok(None) => break Ok(RetryDecision::None), + Ok(Some((function_name, function_input, idempotency_key))) => { + debug!("Replaying function {function_name}"); + let span = span!(Level::INFO, "replaying", function = function_name); + store + .as_context_mut() + .data_mut() + .set_current_idempotency_key(idempotency_key) + .await; + + let full_function_name = function_name.to_string(); + let invoke_result = invoke_worker( + full_function_name.clone(), + function_input.clone(), + store, + instance, + ) + .instrument(span) + .await; + + match invoke_result { + Ok(InvokeResult::Succeeded { + output, + consumed_fuel, + }) => { + let component_metadata = + store.as_context().data().component_metadata(); + + match exports::function_by_name( + &component_metadata.exports, + &full_function_name, + ) { + Ok(value) => { + if let Some(value) = value { + let result = + interpret_function_results(output, value.results) + .map_err(|e| GolemError::ValueMismatch { + details: e.join(", "), + })?; + if let Err(err) = store + .as_context_mut() + .data_mut() + .on_invocation_success( + &full_function_name, + &function_input, + consumed_fuel, + result, + ) + .await + { + break Err(err); + } + } else { + let trap_type = TrapType::Error( + WorkerError::InvalidRequest(format!( + "Function {full_function_name} not found" + )), + ); + + let _ = store + .as_context_mut() + .data_mut() + .on_invocation_failure(&trap_type) + .await; + + break Err(GolemError::invalid_request(format!( + "Function {full_function_name} not found" + ))); + } + } + Err(err) => { + let trap_type = + TrapType::Error(WorkerError::InvalidRequest(format!( + "Function {full_function_name} not found: {err}" + ))); + + let _ = store + .as_context_mut() + .data_mut() + .on_invocation_failure(&trap_type) + .await; + + break Err(GolemError::invalid_request(format!( + "Function {full_function_name} not found: {err}" + ))); + } + } + number_of_replayed_functions += 1; + continue; + } + _ => { + let trap_type = match invoke_result { + Ok(invoke_result) => invoke_result.as_trap_type::(), + Err(error) => { + Some(TrapType::from_error::(&anyhow!(error))) + } + }; + let decision = match trap_type { + Some(trap_type) => { + let decision = store + .as_context_mut() + .data_mut() + .on_invocation_failure(&trap_type) + .await; + + if decision == RetryDecision::None { + // Cannot retry so we need to fail + match trap_type { + TrapType::Interrupt(interrupt_kind) => { + if interrupt_kind == InterruptKind::Interrupt { + break Err(GolemError::runtime( + "Interrupted via the Golem API", + )); + } else { + break Err(GolemError::runtime("The worker could not finish replaying a function {function_name}")); + } + } + TrapType::Exit => { + break Err(GolemError::runtime( + "Process exited", + )) + } + TrapType::Error(error) => { + let stderr = store + .as_context() + .data() + .get_public_state() + .event_service() + .get_last_invocation_errors(); + break Err(GolemError::runtime( + error.to_string(&stderr), + )); + } + } + } + + decision + } + None => RetryDecision::None, + }; + + break Ok(decision); + } + } + } + } + } else { + break Ok(RetryDecision::None); + } + }; + + record_number_of_replayed_functions(number_of_replayed_functions); + + resume_result + } + async fn prepare_instance( worker_id: &WorkerId, instance: &Instance, @@ -1213,8 +1389,6 @@ impl> ExternalOperations for Dur ) -> Result { debug!("Starting prepare_instance"); let start = Instant::now(); - let mut count = 0; - store.as_context_mut().data_mut().set_running(); if store @@ -1257,177 +1431,9 @@ impl> ExternalOperations for Dur .get_out_of_deleted_region() .await; - let result = loop { - let cont = store.as_context().data().durable_ctx().state.is_replay(); - - if cont { - let oplog_entry = store - .as_context_mut() - .data_mut() - .durable_ctx_mut() - .state - .replay_state - .get_oplog_entry_exported_function_invoked() - .await; - match oplog_entry { - Err(error) => break Err(error), - Ok(None) => break Ok(RetryDecision::None), - Ok(Some((function_name, function_input, idempotency_key))) => { - debug!("Replaying function {function_name}"); - let span = span!(Level::INFO, "replaying", function = function_name); - store - .as_context_mut() - .data_mut() - .set_current_idempotency_key(idempotency_key) - .await; - - let full_function_name = function_name.to_string(); - let invoke_result = invoke_worker( - full_function_name.clone(), - function_input.clone(), - store, - instance, - ) - .instrument(span) - .await; - - match invoke_result { - Ok(InvokeResult::Succeeded { - output, - consumed_fuel, - }) => { - let component_metadata = - store.as_context().data().component_metadata(); - - match exports::function_by_name( - &component_metadata.exports, - &full_function_name, - ) { - Ok(value) => { - if let Some(value) = value { - let result = interpret_function_results( - output, - value.results, - ) - .map_err(|e| GolemError::ValueMismatch { - details: e.join(", "), - })?; - if let Err(err) = store - .as_context_mut() - .data_mut() - .on_invocation_success( - &full_function_name, - &function_input, - consumed_fuel, - result, - ) - .await - { - break Err(err); - } - } else { - let trap_type = TrapType::Error( - WorkerError::InvalidRequest(format!( - "Function {full_function_name} not found" - )), - ); - - let _ = store - .as_context_mut() - .data_mut() - .on_invocation_failure(&trap_type) - .await; - - break Err(GolemError::invalid_request(format!( - "Function {full_function_name} not found" - ))); - } - } - Err(err) => { - let trap_type = TrapType::Error( - WorkerError::InvalidRequest(format!( - "Function {full_function_name} not found: {err}" - )), - ); - - let _ = store - .as_context_mut() - .data_mut() - .on_invocation_failure(&trap_type) - .await; - - break Err(GolemError::invalid_request(format!( - "Function {full_function_name} not found: {err}" - ))); - } - } - count += 1; - continue; - } - _ => { - let trap_type = match invoke_result { - Ok(invoke_result) => invoke_result.as_trap_type::(), - Err(error) => { - Some(TrapType::from_error::(&anyhow!(error))) - } - }; - let decision = match trap_type { - Some(trap_type) => { - let decision = store - .as_context_mut() - .data_mut() - .on_invocation_failure(&trap_type) - .await; - - if decision == RetryDecision::None { - // Cannot retry so we need to fail - match trap_type { - TrapType::Interrupt(interrupt_kind) => { - if interrupt_kind - == InterruptKind::Interrupt - { - break Err(GolemError::runtime( - "Interrupted via the Golem API", - )); - } else { - break Err(GolemError::runtime("The worker could not finish replaying a function {function_name}")); - } - } - TrapType::Exit => { - break Err(GolemError::runtime( - "Process exited", - )) - } - TrapType::Error(error) => { - let stderr = store - .as_context() - .data() - .get_public_state() - .event_service() - .get_last_invocation_errors(); - break Err(GolemError::runtime( - error.to_string(&stderr), - )); - } - } - } - - decision - } - None => RetryDecision::None, - }; + let result = Self::resume_replay(store, instance).await; - break Ok(decision); - } - } - } - } - } else { - break Ok(RetryDecision::None); - } - }; record_resume_worker(start.elapsed()); - record_number_of_replayed_functions(count); let final_decision = Self::finalize_pending_update(&result, instance, store).await; diff --git a/golem-worker-executor-base/src/services/oplog/mod.rs b/golem-worker-executor-base/src/services/oplog/mod.rs index d7d3dc0000..a9f8e647b0 100644 --- a/golem-worker-executor-base/src/services/oplog/mod.rs +++ b/golem-worker-executor-base/src/services/oplog/mod.rs @@ -367,12 +367,12 @@ impl OpenOplogEntry { } #[derive(Clone)] -struct OpenOplogs { +pub struct OpenOplogs { oplogs: Cache, } impl OpenOplogs { - fn new(name: &'static str) -> Self { + pub fn new(name: &'static str) -> Self { Self { oplogs: Cache::new( None, @@ -383,7 +383,7 @@ impl OpenOplogs { } } - async fn get_or_open( + pub async fn get_or_open( &self, worker_id: &WorkerId, constructor: impl OplogConstructor + Send + 'static, @@ -446,7 +446,7 @@ impl Debug for OpenOplogs { } #[async_trait] -trait OplogConstructor: Clone { +pub trait OplogConstructor: Clone { async fn create_oplog( self, close: Box, diff --git a/golem-worker-executor-base/src/worker.rs b/golem-worker-executor-base/src/worker.rs index 37f6e618d6..9622f47d0c 100644 --- a/golem-worker-executor-base/src/worker.rs +++ b/golem-worker-executor-base/src/worker.rs @@ -452,6 +452,24 @@ impl Worker { } } + pub async fn resume_replay(&self) -> Result<(), GolemError> { + match &*self.instance.lock().await { + WorkerInstance::Running(running) => { + running + .sender + .send(WorkerCommand::ResumeReplay) + .expect("Failed to send resume command"); + + Ok(()) + } + WorkerInstance::Unloaded | WorkerInstance::WaitingForPermit(_) => { + Err(GolemError::invalid_request( + "Explicit resume is not supported for uninitialized workers", + )) + } + } + } + pub async fn invoke( &self, idempotency_key: IdempotencyKey, @@ -1485,6 +1503,28 @@ impl RunningWorker { while let Some(cmd) = receiver.recv().await { waiting_for_command.store(false, Ordering::Release); match cmd { + WorkerCommand::ResumeReplay => { + let mut store = store.lock().await; + + let resume_replay_result = + Ctx::resume_replay(&mut *store, &instance).await; + + match resume_replay_result { + Ok(decision) => { + final_decision = decision; + } + + Err(err) => { + warn!("Failed to resume replay: {err}"); + if let Err(err2) = store.data_mut().set_suspended().await { + warn!("Additional error during resume of replay of worker: {err2}"); + } + + parent.stop_internal(true, Some(err)).await; + break; + } + } + } WorkerCommand::Invocation => { let message = active .write() @@ -1989,6 +2029,7 @@ pub enum RetryDecision { #[derive(Debug)] enum WorkerCommand { Invocation, + ResumeReplay, Interrupt(InterruptKind), } diff --git a/golem-worker-executor-base/src/workerctx.rs b/golem-worker-executor-base/src/workerctx.rs index 765e478bbb..72f4b48e30 100644 --- a/golem-worker-executor-base/src/workerctx.rs +++ b/golem-worker-executor-base/src/workerctx.rs @@ -50,7 +50,7 @@ use golem_common::model::{ use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; use golem_wasm_rpc::wasmtime::ResourceStore; use golem_wasm_rpc::Value; -use wasmtime::component::{Component, Linker}; +use wasmtime::component::{Component, Instance, Linker}; use wasmtime::{AsContextMut, Engine, ResourceLimiterAsync}; use wasmtime_wasi::WasiView; use wasmtime_wasi_http::WasiHttpView; @@ -366,6 +366,14 @@ pub trait ExternalOperations { metadata: &Option, ) -> Result; + /// Resume the replay of a worker instance. Note that if the previous replay + /// hasn't reached the end of the replay (which is usually last index in oplog) + /// resume_replay will ensure to start replay from the last replayed index. + async fn resume_replay( + store: &mut (impl AsContextMut + Send), + instance: &Instance, + ) -> Result; + /// Prepares a wasmtime instance after it has been created, but before it can be invoked. /// This can be used to restore the previous state of the worker but by general it can be no-op. /// diff --git a/golem-worker-executor-base/tests/common/mod.rs b/golem-worker-executor-base/tests/common/mod.rs index acbe36a5f1..8655c1ae6e 100644 --- a/golem-worker-executor-base/tests/common/mod.rs +++ b/golem-worker-executor-base/tests/common/mod.rs @@ -461,6 +461,13 @@ impl ExternalOperations for TestWorkerCtx { .await } + async fn resume_replay( + store: &mut (impl AsContextMut + Send), + instance: &Instance, + ) -> Result { + DurableWorkerCtx::::resume_replay(store, instance).await + } + async fn prepare_instance( worker_id: &WorkerId, instance: &Instance, diff --git a/golem-worker-executor/src/context.rs b/golem-worker-executor/src/context.rs index 28585f6bb3..3752aabee8 100644 --- a/golem-worker-executor/src/context.rs +++ b/golem-worker-executor/src/context.rs @@ -116,6 +116,13 @@ impl ExternalOperations for Context { DurableWorkerCtx::::compute_latest_worker_status(this, worker_id, metadata).await } + async fn resume_replay( + store: &mut (impl AsContextMut + Send), + instance: &Instance, + ) -> Result { + DurableWorkerCtx::::resume_replay(store, instance).await + } + async fn prepare_instance( worker_id: &WorkerId, instance: &Instance,