Skip to content

Commit

Permalink
Implement worker fork (#1209)
Browse files Browse the repository at this point in the history
  • Loading branch information
afsalthaj authored Jan 10, 2025
1 parent 1af8f6a commit be5e969
Show file tree
Hide file tree
Showing 27 changed files with 1,726 additions and 76 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions golem-api-grpc/proto/golem/worker/v1/worker_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,13 @@ service WorkerService {

rpc ActivatePlugin(ActivatePluginRequest) returns (ActivatePluginResponse);
rpc DeactivatePlugin(DeactivatePluginRequest) returns (DeactivatePluginResponse);

rpc ForkWorker(ForkWorkerRequest) returns (ForkWorkerResponse);

}



message LaunchNewWorkerRequest {
golem.component.ComponentId componentId = 1;
string name = 2;
Expand All @@ -65,11 +70,20 @@ message LaunchNewWorkerResponse {
}
}



message LaunchNewWorkerSuccessResponse {
golem.worker.WorkerId workerId = 1;
uint64 component_version = 2;
}

message ForkWorkerRequest {
golem.worker.WorkerId source_worker_id = 2;
golem.worker.WorkerId target_worker_id = 3;
uint64 oplog_index_cutoff = 4;
}


message CompletePromiseRequest {
golem.worker.WorkerId workerId = 1;
golem.worker.CompleteParameters completeParameters = 2;
Expand All @@ -82,6 +96,14 @@ message CompletePromiseResponse {
}
}

message ForkWorkerResponse {
oneof result {
golem.common.Empty success = 1;
golem.worker.v1.WorkerError error = 2;
}
}


message DeleteWorkerRequest {
golem.worker.WorkerId workerId = 1;
}
Expand Down Expand Up @@ -178,6 +200,7 @@ message InvokeJsonRequest {

message ResumeWorkerRequest {
golem.worker.WorkerId workerId = 1;
optional bool force = 2;
}

message ResumeWorkerResponse {
Expand Down
16 changes: 16 additions & 0 deletions golem-api-grpc/proto/golem/workerexecutor/v1/worker_executor.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ service WorkerExecutor {
rpc UpdateWorker(UpdateWorkerRequest) returns (UpdateWorkerResponse);
rpc GetOplog(GetOplogRequest) returns (GetOplogResponse);
rpc SearchOplog(SearchOplogRequest) returns (SearchOplogResponse);
rpc ForkWorker(ForkWorkerRequest) returns (ForkWorkerResponse);

rpc ListDirectory(ListDirectoryRequest) returns (ListDirectoryResponse);
rpc GetFileContents(GetFileContentsRequest) returns (stream GetFileContentsResponse);
Expand All @@ -53,6 +54,20 @@ service WorkerExecutor {
rpc DeactivatePlugin(DeactivatePluginRequest) returns (DeactivatePluginResponse);
}

message ForkWorkerRequest {
golem.common.AccountId account_id = 1;
golem.worker.WorkerId source_worker_id = 2;
golem.worker.WorkerId target_worker_id = 3;
uint64 oplog_index_cutoff = 4;
}

message ForkWorkerResponse {
oneof result {
golem.common.Empty success = 1;
golem.worker.v1.WorkerExecutionError failure = 2;
}
}

message InvokeWorkerResponse {
oneof result {
golem.common.Empty success = 1;
Expand Down Expand Up @@ -206,6 +221,7 @@ message GetWorkerMetadataResponse {
message ResumeWorkerRequest {
golem.worker.WorkerId worker_id = 1;
golem.common.AccountId account_id = 2;
optional bool force = 3;
}

message ResumeWorkerResponse {
Expand Down
78 changes: 78 additions & 0 deletions golem-common/src/model/oplog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,34 @@ impl OplogIndex {
}
}

pub struct OplogIndexRange {
current: u64,
end: u64,
}

impl Iterator for OplogIndexRange {
type Item = OplogIndex;

fn next(&mut self) -> Option<Self::Item> {
if self.current <= self.end {
let current = self.current;
self.current += 1; // Move forward
Some(OplogIndex(current))
} else {
None
}
}
}

impl OplogIndexRange {
pub fn new(start: OplogIndex, end: OplogIndex) -> OplogIndexRange {
OplogIndexRange {
current: start.0,
end: end.0,
}
}
}

impl Display for OplogIndex {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
Expand Down Expand Up @@ -735,6 +763,56 @@ impl OplogEntry {
_ => None,
}
}

pub fn update_worker_id(&self, worker_id: &WorkerId) -> Option<OplogEntry> {
match self {
OplogEntry::CreateV1 {
timestamp,
component_version,
args,
env,
account_id,
parent,
component_size,
initial_total_linear_memory_size,
worker_id: _,
} => Some(OplogEntry::CreateV1 {
timestamp: *timestamp,
worker_id: worker_id.clone(),
component_version: *component_version,
args: args.clone(),
env: env.clone(),
account_id: account_id.clone(),
parent: parent.clone(),
component_size: *component_size,
initial_total_linear_memory_size: *initial_total_linear_memory_size,
}),
OplogEntry::Create {
timestamp,
component_version,
args,
env,
account_id,
parent,
component_size,
initial_total_linear_memory_size,
initial_active_plugins,
worker_id: _,
} => Some(OplogEntry::Create {
timestamp: *timestamp,
worker_id: worker_id.clone(),
component_version: *component_version,
args: args.clone(),
env: env.clone(),
account_id: account_id.clone(),
parent: parent.clone(),
component_size: *component_size,
initial_total_linear_memory_size: *initial_total_linear_memory_size,
initial_active_plugins: initial_active_plugins.clone(),
}),
_ => None,
}
}
}

/// Describes a pending update
Expand Down
73 changes: 64 additions & 9 deletions golem-test-framework/src/components/worker_service/forwarding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@ use async_trait::async_trait;
use golem_api_grpc::proto::golem::common::{Empty, ResourceLimits};
use golem_api_grpc::proto::golem::worker::v1::worker_service_client::WorkerServiceClient;
use golem_api_grpc::proto::golem::worker::v1::{
ConnectWorkerRequest, DeleteWorkerRequest, DeleteWorkerResponse, GetFileContentsRequest,
GetOplogRequest, GetOplogResponse, GetOplogSuccessResponse, GetWorkerMetadataRequest,
GetWorkerMetadataResponse, InterruptWorkerRequest, InterruptWorkerResponse,
InvokeAndAwaitJsonRequest, InvokeAndAwaitJsonResponse, InvokeAndAwaitRequest,
InvokeAndAwaitResponse, InvokeJsonRequest, InvokeRequest, InvokeResponse,
LaunchNewWorkerRequest, LaunchNewWorkerResponse, LaunchNewWorkerSuccessResponse,
ListDirectoryRequest, ListDirectoryResponse, ListDirectorySuccessResponse, ResumeWorkerRequest,
ResumeWorkerResponse, SearchOplogRequest, SearchOplogResponse, SearchOplogSuccessResponse,
UpdateWorkerRequest, UpdateWorkerResponse, WorkerError,
ConnectWorkerRequest, DeleteWorkerRequest, DeleteWorkerResponse, ForkWorkerRequest,
ForkWorkerResponse, GetFileContentsRequest, GetOplogRequest, GetOplogResponse,
GetOplogSuccessResponse, GetWorkerMetadataRequest, GetWorkerMetadataResponse,
InterruptWorkerRequest, InterruptWorkerResponse, InvokeAndAwaitJsonRequest,
InvokeAndAwaitJsonResponse, InvokeAndAwaitRequest, InvokeAndAwaitResponse, InvokeJsonRequest,
InvokeRequest, InvokeResponse, LaunchNewWorkerRequest, LaunchNewWorkerResponse,
LaunchNewWorkerSuccessResponse, ListDirectoryRequest, ListDirectoryResponse,
ListDirectorySuccessResponse, ResumeWorkerRequest, ResumeWorkerResponse, SearchOplogRequest,
SearchOplogResponse, SearchOplogSuccessResponse, UpdateWorkerRequest, UpdateWorkerResponse,
WorkerError,
};
use golem_api_grpc::proto::golem::worker::{InvokeResult, LogEvent, WorkerId};
use golem_api_grpc::proto::golem::workerexecutor::v1::CreateWorkerRequest;
Expand Down Expand Up @@ -457,6 +458,7 @@ impl WorkerService for ForwardingWorkerService {
}
.into(),
),
force: request.force,
})
.await;

Expand Down Expand Up @@ -809,6 +811,59 @@ impl WorkerService for ForwardingWorkerService {
Ok(Bytes::from(bytes))
}

async fn fork_worker(
&self,
fork_worker_request: ForkWorkerRequest,
) -> crate::Result<ForkWorkerResponse> {
let mut retry_count = Self::RETRY_COUNT;
let result = loop {
let result = self
.worker_executor
.client()
.await?
.fork_worker(workerexecutor::v1::ForkWorkerRequest {
source_worker_id: fork_worker_request.source_worker_id.clone(),
target_worker_id: fork_worker_request.target_worker_id.clone(),
account_id: Some(
AccountId {
value: "test-account".to_string(),
}
.into(),
),
oplog_index_cutoff: fork_worker_request.oplog_index_cutoff,
})
.await;

if Self::should_retry(&mut retry_count, &result) {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
continue;
} else {
break result;
}
};
let result = result?.into_inner();

match result.result {
None => Err(anyhow!(
"No response from golem-worker-executor fork-worker call"
)),
Some(workerexecutor::v1::fork_worker_response::Result::Success(_)) => {
Ok(ForkWorkerResponse {
result: Some(worker::v1::fork_worker_response::Result::Success(Empty {})),
})
}
Some(workerexecutor::v1::fork_worker_response::Result::Failure(error)) => {
Ok(ForkWorkerResponse {
result: Some(worker::v1::fork_worker_response::Result::Error(
WorkerError {
error: Some(worker::v1::worker_error::Error::InternalError(error)),
},
)),
})
}
}
}

fn private_host(&self) -> String {
panic!("No real golem-worker-service, forwarding requests to worker-executor");
}
Expand Down
28 changes: 21 additions & 7 deletions golem-test-framework/src/components/worker_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ use anyhow::anyhow;
use golem_api_grpc::proto::golem::worker::v1::worker_service_client::WorkerServiceClient;
use golem_api_grpc::proto::golem::worker::v1::{
get_file_contents_response, ConnectWorkerRequest, DeleteWorkerRequest, DeleteWorkerResponse,
GetFileContentsRequest, GetOplogRequest, GetOplogResponse, GetWorkerMetadataRequest,
GetWorkerMetadataResponse, GetWorkersMetadataRequest, GetWorkersMetadataResponse,
InterruptWorkerRequest, InterruptWorkerResponse, InvokeAndAwaitJsonRequest,
InvokeAndAwaitJsonResponse, InvokeAndAwaitRequest, InvokeAndAwaitResponse, InvokeJsonRequest,
InvokeRequest, InvokeResponse, LaunchNewWorkerRequest, LaunchNewWorkerResponse,
ListDirectoryRequest, ListDirectoryResponse, ResumeWorkerRequest, ResumeWorkerResponse,
SearchOplogRequest, SearchOplogResponse, UpdateWorkerRequest, UpdateWorkerResponse,
ForkWorkerRequest, ForkWorkerResponse, GetFileContentsRequest, GetOplogRequest,
GetOplogResponse, GetWorkerMetadataRequest, GetWorkerMetadataResponse,
GetWorkersMetadataRequest, GetWorkersMetadataResponse, InterruptWorkerRequest,
InterruptWorkerResponse, InvokeAndAwaitJsonRequest, InvokeAndAwaitJsonResponse,
InvokeAndAwaitRequest, InvokeAndAwaitResponse, InvokeJsonRequest, InvokeRequest,
InvokeResponse, LaunchNewWorkerRequest, LaunchNewWorkerResponse, ListDirectoryRequest,
ListDirectoryResponse, ResumeWorkerRequest, ResumeWorkerResponse, SearchOplogRequest,
SearchOplogResponse, UpdateWorkerRequest, UpdateWorkerResponse,
};
use golem_api_grpc::proto::golem::worker::LogEvent;

Expand Down Expand Up @@ -241,6 +242,19 @@ pub trait WorkerService {
Ok(Bytes::from(bytes))
}

async fn fork_worker(
&self,
fork_worker_request: ForkWorkerRequest,
) -> crate::Result<ForkWorkerResponse> {
let response = self
.client()
.await?
.fork_worker(fork_worker_request)
.await?;

Ok(response.into_inner())
}

fn private_host(&self) -> String;
fn private_http_port(&self) -> u16;
fn private_grpc_port(&self) -> u16;
Expand Down
Loading

0 comments on commit be5e969

Please sign in to comment.