Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resume replay for debugging service #1242

Merged
merged 13 commits into from
Jan 21, 2025
348 changes: 177 additions & 171 deletions golem-worker-executor-base/src/durable_host/mod.rs

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions golem-worker-executor-base/src/services/oplog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,12 +367,12 @@ impl OpenOplogEntry {
}

#[derive(Clone)]
struct OpenOplogs {
pub struct OpenOplogs {
oplogs: Cache<WorkerId, (), OpenOplogEntry, ()>,
}

impl OpenOplogs {
fn new(name: &'static str) -> Self {
pub fn new(name: &'static str) -> Self {
Self {
oplogs: Cache::new(
None,
Expand All @@ -383,7 +383,7 @@ impl OpenOplogs {
}
}

async fn get_or_open(
pub async fn get_or_open(
&self,
worker_id: &WorkerId,
constructor: impl OplogConstructor + Send + 'static,
Expand Down Expand Up @@ -446,7 +446,7 @@ impl Debug for OpenOplogs {
}

#[async_trait]
trait OplogConstructor: Clone {
pub trait OplogConstructor: Clone {
async fn create_oplog(
self,
close: Box<dyn FnOnce() + Send + Sync>,
Expand Down
41 changes: 41 additions & 0 deletions golem-worker-executor-base/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,24 @@ impl<Ctx: WorkerCtx> Worker<Ctx> {
}
}

/// Asks a running worker to resume replay.
///
/// When the worker instance is `Running`, a `WorkerCommand::ResumeReplay` is sent
/// through the running instance's `sender`. The method only enqueues the command;
/// it does not wait for the replay itself.
///
/// # Errors
///
/// Returns `GolemError::invalid_request` when:
/// - the worker is `Unloaded` or `WaitingForPermit` (explicit resume is only
///   supported for running workers), or
/// - the command cannot be sent (presumably because the receiving side of the
///   channel has already gone away). This used to panic via `expect`; it is now
///   reported to the caller instead of taking down the calling task.
pub async fn resume_replay(&self) -> Result<(), GolemError> {
    // The lock guard is a temporary of the scrutinee expression, so it is held
    // for the whole `match` and the instance state cannot change underneath us.
    match &*self.instance.lock().await {
        WorkerInstance::Running(running) => {
            running
                .sender
                .send(WorkerCommand::ResumeReplay)
                .map_err(|_| {
                    GolemError::invalid_request(
                        "Failed to send resume command: the worker is no longer accepting commands",
                    )
                })?;

            Ok(())
        }
        WorkerInstance::Unloaded | WorkerInstance::WaitingForPermit(_) => {
            Err(GolemError::invalid_request(
                "Explicit resume is not supported for uninitialized workers",
            ))
        }
    }
}

pub async fn invoke(
&self,
idempotency_key: IdempotencyKey,
Expand Down Expand Up @@ -1485,6 +1503,28 @@ impl RunningWorker {
while let Some(cmd) = receiver.recv().await {
waiting_for_command.store(false, Ordering::Release);
match cmd {
WorkerCommand::ResumeReplay => {
let mut store = store.lock().await;

let resume_replay_result =
Ctx::resume_replay(&mut *store, &instance).await;

match resume_replay_result {
Ok(decision) => {
final_decision = decision;
}

Err(err) => {
warn!("Failed to resume replay: {err}");
if let Err(err2) = store.data_mut().set_suspended().await {
warn!("Additional error during resume of replay of worker: {err2}");
}

parent.stop_internal(true, Some(err)).await;
break;
}
}
}
WorkerCommand::Invocation => {
let message = active
.write()
Expand Down Expand Up @@ -1989,6 +2029,7 @@ pub enum RetryDecision {
/// Commands sent to a running worker through its `sender`; the worker's command
/// loop receives them via `receiver.recv()` and matches on the variant.
#[derive(Debug)]
enum WorkerCommand {
/// Handled by the command loop's `WorkerCommand::Invocation` arm.
/// NOTE(review): presumably signals that a queued invocation is ready to be
/// processed — confirm against the loop body.
Invocation,
/// Sent by `Worker::resume_replay`; the command loop reacts by calling
/// `Ctx::resume_replay` on the worker's store and instance.
ResumeReplay,
/// Carries an `InterruptKind`.
/// NOTE(review): presumably requests interruption of the running worker —
/// the handling code is not visible here; confirm.
Interrupt(InterruptKind),
}

Expand Down
10 changes: 9 additions & 1 deletion golem-worker-executor-base/src/workerctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use golem_common::model::{
use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue;
use golem_wasm_rpc::wasmtime::ResourceStore;
use golem_wasm_rpc::Value;
use wasmtime::component::{Component, Linker};
use wasmtime::component::{Component, Instance, Linker};
use wasmtime::{AsContextMut, Engine, ResourceLimiterAsync};
use wasmtime_wasi::WasiView;
use wasmtime_wasi_http::WasiHttpView;
Expand Down Expand Up @@ -366,6 +366,14 @@ pub trait ExternalOperations<Ctx: WorkerCtx> {
metadata: &Option<WorkerMetadata>,
) -> Result<WorkerStatusRecord, GolemError>;

/// Resumes the replay of a worker instance. If the previous replay has not yet
/// reached the end of the replay (usually the last index in the oplog),
/// `resume_replay` continues the replay from the last replayed index.
async fn resume_replay(
afsalthaj marked this conversation as resolved.
Show resolved Hide resolved
store: &mut (impl AsContextMut<Data = Ctx> + Send),
instance: &Instance,
) -> Result<RetryDecision, GolemError>;

/// Prepares a wasmtime instance after it has been created, but before it can be invoked.
/// This can be used to restore the previous state of the worker but by general it can be no-op.
///
Expand Down
7 changes: 7 additions & 0 deletions golem-worker-executor-base/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,13 @@ impl ExternalOperations<TestWorkerCtx> for TestWorkerCtx {
.await
}

/// Resumes replay for a worker whose store data is `TestWorkerCtx`.
///
/// This is a thin wrapper: it forwards `store` and `instance` unchanged to
/// `DurableWorkerCtx::<TestWorkerCtx>::resume_replay` and returns its result
/// (a `RetryDecision` on success, a `GolemError` on failure) as-is.
async fn resume_replay(
store: &mut (impl AsContextMut<Data = TestWorkerCtx> + Send),
instance: &Instance,
) -> Result<RetryDecision, GolemError> {
// No test-specific behavior: delegate to the durable worker context.
DurableWorkerCtx::<TestWorkerCtx>::resume_replay(store, instance).await
}

async fn prepare_instance(
worker_id: &WorkerId,
instance: &Instance,
Expand Down
7 changes: 7 additions & 0 deletions golem-worker-executor/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ impl ExternalOperations<Context> for Context {
DurableWorkerCtx::<Context>::compute_latest_worker_status(this, worker_id, metadata).await
}

/// Resumes replay for a worker whose store data is `Context`.
///
/// This is a thin wrapper: it forwards `store` and `instance` unchanged to
/// `DurableWorkerCtx::<Context>::resume_replay` and returns its result
/// (a `RetryDecision` on success, a `GolemError` on failure) as-is.
async fn resume_replay(
store: &mut (impl AsContextMut<Data = Context> + Send),
instance: &Instance,
) -> Result<RetryDecision, GolemError> {
// No executor-specific behavior: delegate to the durable worker context.
DurableWorkerCtx::<Context>::resume_replay(store, instance).await
}

async fn prepare_instance(
worker_id: &WorkerId,
instance: &Instance,
Expand Down
Loading