Worker revert feature and executor refactorings #1291

Merged
merged 13 commits into from
Feb 10, 2025

Conversation

vigoo
Contributor

@vigoo vigoo commented Feb 4, 2025

Resolves #1180
Resolves #1181

Primary change

Implements the revert operation on workers, and adds it to the following layers:

  • Exposed via gRPC in worker executor
  • Exposed via gRPC and REST API in worker service
  • Exposed in the CLI as a new worker command
  • Available in the test DSL (for both gRPC and REST APIs)

The implementation relies on the previously existing "deleted oplog region" support in the worker executor. The idea is that we figure out an oplog range: either directly from the request, or, if the request said "last N invocations", by reading back the oplog to find the beginning of the Nth invocation counting from the end. We mark the region from this point to the current end of the oplog as "deleted" and append a new oplog entry (Revert) to persist this information.

During replay and status calculations the deleted region is skipped.
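
A minimal, self-contained sketch of this region calculation (the types and function names below are simplified stand-ins for illustration, not the actual executor API):

```rust
// Simplified stand-ins for the executor's oplog types, for illustration only.
#[derive(Clone, Copy, Debug)]
struct OplogIndex(u64);

#[derive(Clone, Debug)]
struct OplogRegion {
    start: OplogIndex, // inclusive
    end: OplogIndex,   // inclusive
}

#[derive(Debug)]
enum OplogEntry {
    ExportedFunctionInvoked { function_name: String },
    Revert { dropped_region: OplogRegion },
    // ... the many other entry kinds are elided
}

enum RevertTarget {
    ToIndex(OplogIndex),    // revert everything after this oplog index
    LastInvocations(usize), // revert the last N invocations
}

/// Computes the oplog region that a revert request would mark as deleted:
/// from the resolved starting point to the current end of the oplog.
fn region_to_revert(oplog: &[OplogEntry], target: RevertTarget) -> Option<OplogRegion> {
    if oplog.is_empty() {
        return None;
    }
    let start = match target {
        RevertTarget::ToIndex(idx) => OplogIndex(idx.0 + 1),
        RevertTarget::LastInvocations(n) if n > 0 => {
            // Read the oplog backwards to find the beginning of the Nth invocation
            // counting from the end.
            let pos = oplog
                .iter()
                .enumerate()
                .rev()
                .filter(|(_, e)| matches!(e, OplogEntry::ExportedFunctionInvoked { .. }))
                .map(|(idx, _)| idx)
                .nth(n - 1)?;
            OplogIndex(pos as u64)
        }
        RevertTarget::LastInvocations(_) => return None,
    };
    let end = OplogIndex(oplog.len() as u64 - 1);
    // The real implementation then marks this region as deleted and appends a
    // Revert entry to the oplog to persist the decision.
    (start.0 <= end.0).then(|| OplogRegion { start, end })
}
```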

The revert feature required a slightly different kind of deleted region than the one we used before. Previously we used deleted regions for two things:

  • the jump operation, which can be initiated from a worker to "travel back in time"; it can only jump within an invocation
  • the manual update operation, which skips the replay with the old version and immediately loads the snapshot with the new one

Both of these use cases require some of the oplog entries in the deleted regions to still be considered existing - for example pending invocations, invocation results, pending updates and update results. Whether we jumped or updated the worker, this information is valid history and we want it to be preserved.

With revert the situation is different: if we revert, we want to forget everything in that part of the oplog (we revert invocations and updates, so we want to completely ignore their oplog entries).

So I introduced a new DeletedRegions field in WorkerStatusRecord, called "deleted regions", and renamed the old one to "skipped regions".
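
To illustrate the distinction (the field names follow the PR wording, but the struct layout is a simplified sketch, not the real WorkerStatusRecord):

```rust
// Simplified stand-ins for illustration only.
struct OplogRegion {
    start: u64, // oplog index range, inclusive
    end: u64,
}

struct DeletedRegions {
    regions: Vec<OplogRegion>,
}

struct WorkerStatusRecord {
    // Created by jumps and automatic update skips: replay jumps over these regions,
    // but pending invocations, invocation results, pending updates and update
    // results found inside them still count as part of the worker's history.
    skipped_regions: DeletedRegions,
    // Created by revert: every entry inside these regions is ignored completely,
    // as if it had never been written.
    deleted_regions: DeletedRegions,
    // ... the rest of the status fields are elided
}
```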

Additional changes

Worker Executor gRPC layer cleanup

Some code duplication has been eliminated from the worker executor's grpc.rs file - many of the operations were querying the latest status of the worker before deciding what to do. This not only reduced the redundancy but also optimized the lookup: if the worker is in memory, the information is taken directly from the in-memory worker instance.
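
A hedged sketch of that shared lookup (the helper name and the surrounding types are hypothetical): if the worker is currently loaded, its cached status record is used directly, otherwise the status is computed from persisted data.

```rust
use std::collections::HashMap;

// Hypothetical, simplified stand-ins for the executor's types.
#[derive(Clone, Default)]
struct WorkerStatusRecord;

struct InMemoryWorker {
    cached_status: WorkerStatusRecord,
}

struct GrpcApi {
    active_workers: HashMap<String, InMemoryWorker>,
}

impl GrpcApi {
    /// Shared helper used by the gRPC operations instead of duplicating the lookup:
    /// prefer the in-memory worker's cached status, fall back to storage otherwise.
    fn latest_worker_status(&self, worker_id: &str) -> WorkerStatusRecord {
        match self.active_workers.get(worker_id) {
            Some(worker) => worker.cached_status.clone(), // no storage round-trip
            None => self.read_status_from_storage(worker_id),
        }
    }

    fn read_status_from_storage(&self, _worker_id: &str) -> WorkerStatusRecord {
        // In the real executor this reads persisted metadata and the tail of the
        // oplog; stubbed out here.
        WorkerStatusRecord::default()
    }
}
```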

Status calculation refactoring and fixes

The worker executor can reconstruct the worker's status (WorkerStatusRecord) purely by reading the oplog, but it is optimized by caching this record and only reading the new entries to get the up-to-date version of the status. This code became more and more complex as the worker status evolved, and the latest change - splitting the concept of a "deleted region" from a "skipped region" - made it even more complex. While working on it I found several bugs, fixed them, and wrote several tests to verify the algorithm.
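
A simplified illustration of the incremental calculation (the entry kinds and status fields are stand-ins, and the real algorithm also has to honour skipped and deleted regions):

```rust
#[derive(Clone, Default)]
struct Status {
    invocation_count: u64,
    component_version: u64,
}

enum Entry {
    ExportedFunctionInvoked,
    SuccessfulUpdate { target_version: u64 },
    // ... many more entry kinds in the real oplog
}

/// Instead of re-reading the whole oplog, fold the cached record forward over only
/// the entries written after the index the cache was computed at.
fn refresh_status(cached: &Status, cached_up_to: usize, oplog: &[Entry]) -> Status {
    let mut status = cached.clone();
    for entry in &oplog[cached_up_to..] {
        match entry {
            Entry::ExportedFunctionInvoked => status.invocation_count += 1,
            Entry::SuccessfulUpdate { target_version } => {
                status.component_version = *target_version
            }
        }
    }
    status
}
```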

This is just a first step; eventually it can lead to actually sharing the code that manipulates the worker status between the place where it gets applied and written to the oplog, and the above-mentioned code that performs the calculation based on the oplog. That step is not done in this PR.

Physically, worker.rs has been promoted to a module directory, and the status management has been split out into a separate submodule.

Known issues

There is a new oplog entry (Revert), and we have a WIT-defined oplog-entry variant for the host-exposed oplog query and search feature.

I did not add the new entry to this variant because it would break the API, and there will be more new oplog entries for the rest of the remaining Golem 1.2 features. So in the end we will probably have to create a new version of the oplog API, but I don't think it makes sense to do that after each new entry between the milestones. For now the revert entry is represented by a fake log entry in host-function-initiated oplog queries.
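
A hedged sketch of that mapping (the PublicOplogEntry type and its cases are illustrative stand-ins for the WIT-defined variant, not the actual generated bindings):

```rust
struct OplogRegion {
    start: u64,
    end: u64,
}

enum OplogEntry {
    Log { message: String },
    Revert { dropped_region: OplogRegion },
    // ... other entry kinds elided
}

// Stand-in for the host-exposed, WIT-defined oplog entry variant, which has no
// dedicated revert case yet.
enum PublicOplogEntry {
    Log { message: String },
    // ... cases mirroring the current WIT variant
}

fn to_public(entry: &OplogEntry) -> PublicOplogEntry {
    match entry {
        OplogEntry::Log { message } => PublicOplogEntry::Log {
            message: message.clone(),
        },
        // Until a new version of the oplog API adds a dedicated case, a Revert entry
        // is surfaced to host-function oplog queries as a fake log entry.
        OplogEntry::Revert { dropped_region } => PublicOplogEntry::Log {
            message: format!(
                "Reverted oplog region {}..{}",
                dropped_region.start, dropped_region.end
            ),
        },
    }
}
```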

@vigoo vigoo changed the title Worker revert feature and executor refactorings, WIP Worker revert feature and executor refactorings Feb 7, 2025
@vigoo vigoo marked this pull request as ready for review February 7, 2025 15:13
Err(err)
}
})?,
extensions: WorkerStatusRecord::handle_decoded_extensions(Decode::decode(decoder))?,
Contributor

Can we expose some new type here instead of WorkerStatusRecordExtensions (while still reading the data using WorkerStatusRecordExtensions)? This way we can handle all the conversions between different versions at read time (as we do now), but get some typesafety back when working with the decoded data. That should also make these unreachable!()s unnecessary https://github.com/golemcloud/golem/pull/1291/files#diff-198f85d728dbf2f361783b09a7b67c20b1ee595b8fdc18ef6277d22d1f2a44f4R644

Contributor Author

It seemed like a good idea and I tried it, but then I reverted it because I could not find a nice way to do it. If the public type owns the extension fields, they have to be cloned in the encode implementation, which is not very nice (this struct is serialized a lot). If the serializable enum is just a view with references to the public type, then you cannot easily implement the decode side. So for now let's leave it as is, and we can try again to make it nicer next time we touch it.
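
To illustrate the trade-off described here (hypothetical types, assuming a bincode-2-style Encode trait like the one used around this code): if the public type owns the extension data, the encode side has to clone it into the versioned, serializable enum on every write.

```rust
use bincode::enc::Encoder;
use bincode::error::EncodeError;
use bincode::Encode;

// Hypothetical public, typesafe view of the extension fields.
pub struct PublicExtensions {
    pub active_plugins: Vec<String>,
}

// Hypothetical versioned, serializable representation.
#[derive(Encode)]
enum SerializedExtensions {
    V1 { active_plugins: Vec<String> },
}

impl Encode for PublicExtensions {
    fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
        // The owned data has to be cloned into the versioned enum on every encode,
        // which is costly for a record that is serialized as often as the status.
        let view = SerializedExtensions::V1 {
            active_plugins: self.active_plugins.clone(),
        };
        view.encode(encoder)
    }
}
```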

pub async fn update_worker_status(&self, f: impl FnOnce(&mut WorkerStatusRecord)) {
let mut status = self.get_worker_status_record();

let mut skipped_regions = self.state.replay_state.skipped_regions().await;
let (pending_updates, extra_deleted_regions) = self.public_state.worker().pending_updates();
Contributor

Suggested change
let (pending_updates, extra_deleted_regions) = self.public_state.worker().pending_updates();
let (pending_updates, extra_skipped_regions) = self.public_state.worker().pending_updates();

If I understand the new naming correctly

@@ -1096,7 +1109,10 @@ impl<Ctx: WorkerCtx> UpdateManagement for DurableWorkerCtx<Ctx> {
timestamp,
target_version,
details: details.clone(),
})
});
if status.skipped_regions.is_overridden() {
Contributor
@mschuwalow mschuwalow Feb 7, 2025

Can we have a comment explaining what this does? It's not very clear to me.

@@ -41,7 +41,7 @@ pub struct ReplayState {

#[derive(Clone)]
struct InternalReplayState {
pub deleted_regions: DeletedRegions,
pub skipped_regions: DeletedRegions,
pub next_deleted_region: Option<OplogRegion>,
Contributor

Suggested change
pub next_deleted_region: Option<OplogRegion>,
pub next_skipped_region: Option<OplogRegion>,

let internal = self.internal.read().await;
internal.deleted_regions.clone()
internal.skipped_regions.clone()
}

pub async fn add_deleted_region(&mut self, region: OplogRegion) {
Contributor

Suggested change
pub async fn add_deleted_region(&mut self, region: OplogRegion) {
pub async fn add_skipped_region(&mut self, region: OplogRegion) {

}

pub async fn add_deleted_region(&mut self, region: OplogRegion) {
let mut internal = self.internal.write().await;
internal.deleted_regions.add(region);
internal.skipped_regions.add(region);
}

pub async fn is_in_deleted_region(&self, oplog_index: OplogIndex) -> bool {
Contributor

Suggested change
pub async fn is_in_deleted_region(&self, oplog_index: OplogIndex) -> bool {
pub async fn is_in_skipped_region(&self, oplog_index: OplogIndex) -> bool {

}

/// Starting from the end of the oplog, find the Nth ExportedFunctionInvoked entry's index.
async fn find_nth_invocation_from_end(&self, n: usize) -> Option<OplogIndex> {
Contributor
@mschuwalow mschuwalow Feb 7, 2025

How will / should this behave if we have updates / memory increases in the oplog region that is getting deleted?
We might want to delete regions belonging to invocations, but keep other regions that belong to actions that users performed.

Contributor Author

This feature reverts everything to a given point in the oplog. Including worker updates, pending invocations, etc. I think this is what most users would expect. We could also have some more advanced "history editing" feature where you just remove random invocations and leave the rest of the history as is. I think such a feature could go into the debugging service later.

.get(&owned_worker_id)
.await
.ok_or(GolemError::worker_not_found(worker_id.clone()))?;
let metadata = Worker::<Ctx>::get_latest_metadata(&self.services, &owned_worker_id)
Contributor

wohoo, nice cleanup 🎉

@vigoo vigoo enabled auto-merge (squash) February 10, 2025 12:30
@vigoo vigoo merged commit eb3055b into main Feb 10, 2025
19 checks passed
@vigoo vigoo deleted the vigoo/undo branch February 10, 2025 12:36

Successfully merging this pull request may close these issues.

  • Expose the worker undo feature via REST and CLI
  • Implement the undo API in worker executor
3 participants