Resume replay for debugging service (#1242)
* Make oplog constructor public

* Update golem examples

* Make OpenOplogs public for debugging service

* Add resume replay to external operations

* Add resume replay to worker

* Fix store passed to resume

* Fix compilation errors

* Fix count

* Make return type of resume sensible

* Add comments in resume_replay

* Handle errors when resuming the replay

* Throw when unable to send resume command
afsalthaj authored Jan 21, 2025
1 parent de0f293 commit 3b373c7
Showing 6 changed files with 245 additions and 176 deletions.
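
Taken together, these commits expose an explicit resume-replay path that an external debugging service can drive. Below is a minimal caller-side sketch of how such a service might use the new Worker::resume_replay entry point; WorkerHandle, ResumeError and the main function are stand-ins invented for illustration and are not part of this commit (tokio is assumed as the async runtime).

// Hypothetical caller-side sketch: `WorkerHandle` and `ResumeError` are
// stand-ins invented for illustration, not golem types; tokio is assumed.

#[derive(Debug)]
struct ResumeError(String);

/// Stand-in for a worker as seen by an external debugging service.
struct WorkerHandle {
    running: bool,
}

impl WorkerHandle {
    /// Mirrors the shape of the new `Worker::resume_replay`: only a running
    /// (already loaded) worker can be asked to resume its replay.
    async fn resume_replay(&self) -> Result<(), ResumeError> {
        if self.running {
            // In the real executor this sends `WorkerCommand::ResumeReplay`
            // over the worker's command channel.
            Ok(())
        } else {
            Err(ResumeError(
                "Explicit resume is not supported for uninitialized workers".into(),
            ))
        }
    }
}

#[tokio::main]
async fn main() {
    let worker = WorkerHandle { running: true };
    match worker.resume_replay().await {
        Ok(()) => println!("replay resumed"),
        Err(err) => eprintln!("resume failed: {err:?}"),
    }
}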
348 changes: 177 additions & 171 deletions golem-worker-executor-base/src/durable_host/mod.rs

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions golem-worker-executor-base/src/services/oplog/mod.rs
@@ -367,12 +367,12 @@ impl OpenOplogEntry {
 }
 
 #[derive(Clone)]
-struct OpenOplogs {
+pub struct OpenOplogs {
     oplogs: Cache<WorkerId, (), OpenOplogEntry, ()>,
 }
 
 impl OpenOplogs {
-    fn new(name: &'static str) -> Self {
+    pub fn new(name: &'static str) -> Self {
        Self {
            oplogs: Cache::new(
                None,
@@ -383,7 +383,7 @@ impl OpenOplogs {
        }
    }
 
-    async fn get_or_open(
+    pub async fn get_or_open(
        &self,
        worker_id: &WorkerId,
        constructor: impl OplogConstructor + Send + 'static,
@@ -446,7 +446,7 @@ impl Debug for OpenOplogs {
 }
 
 #[async_trait]
-trait OplogConstructor: Clone {
+pub trait OplogConstructor: Clone {
    async fn create_oplog(
        self,
        close: Box<dyn FnOnce() + Send + Sync>,
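
The visibility changes above matter because OpenOplogs::get_or_open lets a caller open a worker's oplog once and share the handle, constructing it lazily via an OplogConstructor. The following is a simplified, self-contained sketch of that open-or-reuse pattern under assumed stub types (WorkerId, Oplog); it is not the golem API, which uses an async evicting Cache and a close callback.

// Not the golem API: a simplified stand-in showing the "open or reuse" pattern
// that `OpenOplogs::get_or_open` implements, and why a caller-supplied
// constructor (cf. `OplogConstructor`) is enough for an external debugging
// service now that these items are public. `WorkerId` and `Oplog` are stubs.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type WorkerId = String;

#[derive(Debug)]
struct Oplog {
    worker_id: WorkerId,
}

#[derive(Default)]
struct OpenOplogs {
    // The real type uses an async, evicting cache; a mutexed map keeps the
    // sketch self-contained.
    oplogs: Mutex<HashMap<WorkerId, Arc<Oplog>>>,
}

impl OpenOplogs {
    /// Returns the already-open oplog for `worker_id`, or runs `constructor`
    /// to open it and caches the result for later callers.
    fn get_or_open(
        &self,
        worker_id: &WorkerId,
        constructor: impl FnOnce(&WorkerId) -> Oplog,
    ) -> Arc<Oplog> {
        let mut oplogs = self.oplogs.lock().unwrap();
        oplogs
            .entry(worker_id.clone())
            .or_insert_with(|| Arc::new(constructor(worker_id)))
            .clone()
    }
}

fn main() {
    let open = OpenOplogs::default();
    let a = open.get_or_open(&"worker-1".to_string(), |id| Oplog {
        worker_id: id.clone(),
    });
    let b = open.get_or_open(&"worker-1".to_string(), |_| unreachable!("reused"));
    assert!(Arc::ptr_eq(&a, &b));
    println!("opened oplog for {}", a.worker_id);
}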
41 changes: 41 additions & 0 deletions golem-worker-executor-base/src/worker.rs
@@ -452,6 +452,24 @@ impl<Ctx: WorkerCtx> Worker<Ctx> {
        }
    }
 
+    pub async fn resume_replay(&self) -> Result<(), GolemError> {
+        match &*self.instance.lock().await {
+            WorkerInstance::Running(running) => {
+                running
+                    .sender
+                    .send(WorkerCommand::ResumeReplay)
+                    .expect("Failed to send resume command");
+
+                Ok(())
+            }
+            WorkerInstance::Unloaded | WorkerInstance::WaitingForPermit(_) => {
+                Err(GolemError::invalid_request(
+                    "Explicit resume is not supported for uninitialized workers",
+                ))
+            }
+        }
+    }
+
    pub async fn invoke(
        &self,
        idempotency_key: IdempotencyKey,
@@ -1485,6 +1503,28 @@ impl RunningWorker {
        while let Some(cmd) = receiver.recv().await {
            waiting_for_command.store(false, Ordering::Release);
            match cmd {
+                WorkerCommand::ResumeReplay => {
+                    let mut store = store.lock().await;
+
+                    let resume_replay_result =
+                        Ctx::resume_replay(&mut *store, &instance).await;
+
+                    match resume_replay_result {
+                        Ok(decision) => {
+                            final_decision = decision;
+                        }
+
+                        Err(err) => {
+                            warn!("Failed to resume replay: {err}");
+                            if let Err(err2) = store.data_mut().set_suspended().await {
+                                warn!("Additional error during resume of replay of worker: {err2}");
+                            }
+
+                            parent.stop_internal(true, Some(err)).await;
+                            break;
+                        }
+                    }
+                }
                WorkerCommand::Invocation => {
                    let message = active
                        .write()
@@ -1989,6 +2029,7 @@ pub enum RetryDecision {
 #[derive(Debug)]
 enum WorkerCommand {
     Invocation,
+    ResumeReplay,
     Interrupt(InterruptKind),
 }
 
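
The worker-side changes route the new ResumeReplay command through the same channel as invocations, and Worker::resume_replay fails loudly if the command cannot be delivered. The sketch below models that command-loop design with assumed stand-in types and tokio as the runtime; it is not the real RunningWorker loop, which also locks the wasmtime store and calls Ctx::resume_replay.

// Simplified model of the command loop that `RunningWorker` runs; the real
// executor also locks the wasmtime store and calls `Ctx::resume_replay`.
// `WorkerCommand` here is a stand-in mirroring the enum extended by this
// commit; tokio is assumed as the async runtime.

use tokio::sync::mpsc;

#[derive(Debug)]
enum WorkerCommand {
    Invocation,
    ResumeReplay,
}

async fn event_loop(mut receiver: mpsc::UnboundedReceiver<WorkerCommand>) {
    while let Some(cmd) = receiver.recv().await {
        match cmd {
            // New in this commit: resume replaying the oplog before taking
            // further invocations; a failure would suspend and stop the worker.
            WorkerCommand::ResumeReplay => println!("resuming replay"),
            WorkerCommand::Invocation => println!("running next invocation"),
        }
    }
}

#[tokio::main]
async fn main() {
    let (sender, receiver) = mpsc::unbounded_channel();
    let loop_handle = tokio::spawn(event_loop(receiver));

    // `Worker::resume_replay` reduces to exactly this kind of send; if the
    // loop has already exited, `send` fails and the caller surfaces an error
    // (cf. the "Throw when unable to send resume command" commit).
    sender
        .send(WorkerCommand::ResumeReplay)
        .expect("worker event loop is gone");
    sender.send(WorkerCommand::Invocation).unwrap();

    drop(sender); // closing the channel lets the loop finish
    loop_handle.await.unwrap();
}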
10 changes: 9 additions & 1 deletion golem-worker-executor-base/src/workerctx.rs
@@ -50,7 +50,7 @@ use golem_common::model::{
 use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue;
 use golem_wasm_rpc::wasmtime::ResourceStore;
 use golem_wasm_rpc::Value;
-use wasmtime::component::{Component, Linker};
+use wasmtime::component::{Component, Instance, Linker};
 use wasmtime::{AsContextMut, Engine, ResourceLimiterAsync};
 use wasmtime_wasi::WasiView;
 use wasmtime_wasi_http::WasiHttpView;
@@ -366,6 +366,14 @@ pub trait ExternalOperations<Ctx: WorkerCtx> {
        metadata: &Option<WorkerMetadata>,
    ) -> Result<WorkerStatusRecord, GolemError>;
 
+    /// Resume the replay of a worker instance. Note that if the previous replay
+    /// hasn't reached the end of the replay (which is usually last index in oplog)
+    /// resume_replay will ensure to start replay from the last replayed index.
+    async fn resume_replay(
+        store: &mut (impl AsContextMut<Data = Ctx> + Send),
+        instance: &Instance,
+    ) -> Result<RetryDecision, GolemError>;
+
    /// Prepares a wasmtime instance after it has been created, but before it can be invoked.
    /// This can be used to restore the previous state of the worker but by general it can be no-op.
    ///
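
The doc comment above is the key behavioural contract: resuming does not restart the replay from the beginning of the oplog but continues from the last replayed index. A stand-alone conceptual sketch of that behaviour follows, using invented OplogEntry and Replay stubs rather than golem's actual replay machinery.

// A conceptual, stand-alone illustration of the documented behaviour of
// `resume_replay`: replay continues from the last replayed index instead of
// starting over. `OplogEntry` and `Replay` are invented stubs, not golem types.

#[derive(Debug)]
enum OplogEntry {
    Invocation(&'static str),
    Completed,
}

struct Replay {
    entries: Vec<OplogEntry>,
    last_replayed_index: usize, // next entry to replay
}

impl Replay {
    /// Replays at most `budget` entries, then stops (e.g. the first replay
    /// was interrupted before reaching the end of the oplog).
    fn replay(&mut self, budget: usize) {
        let end = (self.last_replayed_index + budget).min(self.entries.len());
        for entry in &self.entries[self.last_replayed_index..end] {
            println!("replaying {entry:?}");
        }
        self.last_replayed_index = end;
    }

    /// Resuming picks up where the previous replay stopped.
    fn resume_replay(&mut self) {
        let remaining = self.entries.len() - self.last_replayed_index;
        self.replay(remaining);
    }
}

fn main() {
    let mut replay = Replay {
        entries: vec![
            OplogEntry::Invocation("a"),
            OplogEntry::Invocation("b"),
            OplogEntry::Completed,
        ],
        last_replayed_index: 0,
    };
    replay.replay(1); // first replay stops early
    replay.resume_replay(); // continues at index 1, not index 0
}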
7 changes: 7 additions & 0 deletions golem-worker-executor-base/tests/common/mod.rs
@@ -461,6 +461,13 @@ impl ExternalOperations<TestWorkerCtx> for TestWorkerCtx {
            .await
    }
 
+    async fn resume_replay(
+        store: &mut (impl AsContextMut<Data = TestWorkerCtx> + Send),
+        instance: &Instance,
+    ) -> Result<RetryDecision, GolemError> {
+        DurableWorkerCtx::<TestWorkerCtx>::resume_replay(store, instance).await
+    }
+
    async fn prepare_instance(
        worker_id: &WorkerId,
        instance: &Instance,
7 changes: 7 additions & 0 deletions golem-worker-executor/src/context.rs
@@ -116,6 +116,13 @@ impl ExternalOperations<Context> for Context {
        DurableWorkerCtx::<Context>::compute_latest_worker_status(this, worker_id, metadata).await
    }
 
+    async fn resume_replay(
+        store: &mut (impl AsContextMut<Data = Context> + Send),
+        instance: &Instance,
+    ) -> Result<RetryDecision, GolemError> {
+        DurableWorkerCtx::<Context>::resume_replay(store, instance).await
+    }
+
    async fn prepare_instance(
        worker_id: &WorkerId,
        instance: &Instance,
