Skip to content

Commit

Permalink
Merge branch 'main' into app-manifest-examples-and-related-fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
noise64 committed Jan 13, 2025
2 parents b2dd826 + 376c1c7 commit 7b4bdf5
Show file tree
Hide file tree
Showing 80 changed files with 2,603 additions and 787 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions golem-api-grpc/proto/golem/worker/v1/worker_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,13 @@ service WorkerService {

rpc ActivatePlugin(ActivatePluginRequest) returns (ActivatePluginResponse);
rpc DeactivatePlugin(DeactivatePluginRequest) returns (DeactivatePluginResponse);

rpc ForkWorker(ForkWorkerRequest) returns (ForkWorkerResponse);

}



message LaunchNewWorkerRequest {
golem.component.ComponentId componentId = 1;
string name = 2;
Expand All @@ -65,11 +70,20 @@ message LaunchNewWorkerResponse {
}
}



message LaunchNewWorkerSuccessResponse {
golem.worker.WorkerId workerId = 1;
uint64 component_version = 2;
}

message ForkWorkerRequest {
golem.worker.WorkerId source_worker_id = 2;
golem.worker.WorkerId target_worker_id = 3;
uint64 oplog_index_cutoff = 4;
}


message CompletePromiseRequest {
golem.worker.WorkerId workerId = 1;
golem.worker.CompleteParameters completeParameters = 2;
Expand All @@ -82,6 +96,14 @@ message CompletePromiseResponse {
}
}

message ForkWorkerResponse {
oneof result {
golem.common.Empty success = 1;
golem.worker.v1.WorkerError error = 2;
}
}


message DeleteWorkerRequest {
golem.worker.WorkerId workerId = 1;
}
Expand Down Expand Up @@ -178,6 +200,7 @@ message InvokeJsonRequest {

message ResumeWorkerRequest {
golem.worker.WorkerId workerId = 1;
optional bool force = 2;
}

message ResumeWorkerResponse {
Expand Down
16 changes: 16 additions & 0 deletions golem-api-grpc/proto/golem/workerexecutor/v1/worker_executor.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ service WorkerExecutor {
rpc UpdateWorker(UpdateWorkerRequest) returns (UpdateWorkerResponse);
rpc GetOplog(GetOplogRequest) returns (GetOplogResponse);
rpc SearchOplog(SearchOplogRequest) returns (SearchOplogResponse);
rpc ForkWorker(ForkWorkerRequest) returns (ForkWorkerResponse);

rpc ListDirectory(ListDirectoryRequest) returns (ListDirectoryResponse);
rpc GetFileContents(GetFileContentsRequest) returns (stream GetFileContentsResponse);
Expand All @@ -53,6 +54,20 @@ service WorkerExecutor {
rpc DeactivatePlugin(DeactivatePluginRequest) returns (DeactivatePluginResponse);
}

message ForkWorkerRequest {
golem.common.AccountId account_id = 1;
golem.worker.WorkerId source_worker_id = 2;
golem.worker.WorkerId target_worker_id = 3;
uint64 oplog_index_cutoff = 4;
}

message ForkWorkerResponse {
oneof result {
golem.common.Empty success = 1;
golem.worker.v1.WorkerExecutionError failure = 2;
}
}

message InvokeWorkerResponse {
oneof result {
golem.common.Empty success = 1;
Expand Down Expand Up @@ -206,6 +221,7 @@ message GetWorkerMetadataResponse {
message ResumeWorkerRequest {
golem.worker.WorkerId worker_id = 1;
golem.common.AccountId account_id = 2;
optional bool force = 3;
}

message ResumeWorkerResponse {
Expand Down
92 changes: 85 additions & 7 deletions golem-common/src/model/oplog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,34 @@ impl OplogIndex {
}
}

pub struct OplogIndexRange {
current: u64,
end: u64,
}

impl Iterator for OplogIndexRange {
type Item = OplogIndex;

fn next(&mut self) -> Option<Self::Item> {
if self.current <= self.end {
let current = self.current;
self.current += 1; // Move forward
Some(OplogIndex(current))
} else {
None
}
}
}

impl OplogIndexRange {
pub fn new(start: OplogIndex, end: OplogIndex) -> OplogIndexRange {
OplogIndexRange {
current: start.0,
end: end.0,
}
}
}

impl Display for OplogIndex {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
Expand Down Expand Up @@ -288,7 +316,7 @@ pub enum OplogEntry {
timestamp: Timestamp,
function_name: String,
response: OplogPayload,
wrapped_function_type: WrappedFunctionType,
wrapped_function_type: DurableFunctionType, // TODO: rename in Golem 2.0
},
/// The worker has been invoked
ExportedFunctionInvoked {
Expand Down Expand Up @@ -405,7 +433,7 @@ pub enum OplogEntry {
function_name: String,
request: OplogPayload,
response: OplogPayload,
wrapped_function_type: WrappedFunctionType,
wrapped_function_type: DurableFunctionType, // TODO: rename in Golem 2.0
},
/// The current version of the Create entry (previous is CreateV1)
Create {
Expand Down Expand Up @@ -648,14 +676,14 @@ impl OplogEntry {
wrapped_function_type,
..
} => match wrapped_function_type {
WrappedFunctionType::WriteRemoteBatched(Some(begin_index))
DurableFunctionType::WriteRemoteBatched(Some(begin_index))
if *begin_index == idx =>
{
true
}
WrappedFunctionType::ReadLocal => true,
WrappedFunctionType::WriteLocal => true,
WrappedFunctionType::ReadRemote => true,
DurableFunctionType::ReadLocal => true,
DurableFunctionType::WriteLocal => true,
DurableFunctionType::ReadRemote => true,
_ => false,
},
OplogEntry::ExportedFunctionCompleted { .. } => false,
Expand Down Expand Up @@ -735,6 +763,56 @@ impl OplogEntry {
_ => None,
}
}

pub fn update_worker_id(&self, worker_id: &WorkerId) -> Option<OplogEntry> {
match self {
OplogEntry::CreateV1 {
timestamp,
component_version,
args,
env,
account_id,
parent,
component_size,
initial_total_linear_memory_size,
worker_id: _,
} => Some(OplogEntry::CreateV1 {
timestamp: *timestamp,
worker_id: worker_id.clone(),
component_version: *component_version,
args: args.clone(),
env: env.clone(),
account_id: account_id.clone(),
parent: parent.clone(),
component_size: *component_size,
initial_total_linear_memory_size: *initial_total_linear_memory_size,
}),
OplogEntry::Create {
timestamp,
component_version,
args,
env,
account_id,
parent,
component_size,
initial_total_linear_memory_size,
initial_active_plugins,
worker_id: _,
} => Some(OplogEntry::Create {
timestamp: *timestamp,
worker_id: worker_id.clone(),
component_version: *component_version,
args: args.clone(),
env: env.clone(),
account_id: account_id.clone(),
parent: parent.clone(),
component_size: *component_size,
initial_total_linear_memory_size: *initial_total_linear_memory_size,
initial_active_plugins: initial_active_plugins.clone(),
}),
_ => None,
}
}
}

/// Describes a pending update
Expand Down Expand Up @@ -779,7 +857,7 @@ pub enum OplogPayload {
}

#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)]
pub enum WrappedFunctionType {
pub enum DurableFunctionType {
/// The side-effect reads from the worker's local state (for example local file system,
/// random generator, etc.)
ReadLocal,
Expand Down
Loading

0 comments on commit 7b4bdf5

Please sign in to comment.