Worker revert feature and executor refactorings #1291
Conversation
        Err(err)
    }
})?,
extensions: WorkerStatusRecord::handle_decoded_extensions(Decode::decode(decoder))?,
Can we expose some new type here instead of WorkerStatusRecordExtensions (while still reading the data using WorkerStatusRecordExtensions)? This way we can handle all the conversions between different versions at read time (as we do now), but get some type safety back when working with the decoded data. That should also make these unreachable!()s unnecessary: https://github.com/golemcloud/golem/pull/1291/files#diff-198f85d728dbf2f361783b09a7b67c20b1ee595b8fdc18ef6277d22d1f2a44f4R644
It seemed like a good idea and I tried, but then reverted it because I could not find a nice way to do it. If the public type owns the extension fields, they have to be cloned in the encode implementation, which is not very nice (this struct is serialized a lot). If the serializable enum is just a view with references into the public type, then you cannot easily implement the decode side. So for now let's leave it as is, and we can try again to make it nicer next time we touch it.
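To illustrate the two dead ends with hypothetical types (not the actual golem code):

```rust
// Option A: the public type owns the extension fields.
#[derive(Clone)]
struct PublicStatus {
    extra: Vec<String>,
}

enum WireStatus {
    V1 { extra: Vec<String> },
}

impl PublicStatus {
    fn encode(&self) -> WireStatus {
        // Every encode clones the owned fields into the wire enum, which is
        // costly for a struct serialized as often as the status record.
        WireStatus::V1 {
            extra: self.extra.clone(),
        }
    }
}

// Option B: the wire type is a borrowing view, so encode is cheap...
enum WireStatusView<'a> {
    V1 { extra: &'a [String] },
}

impl PublicStatus {
    fn encode_view(&self) -> WireStatusView<'_> {
        WireStatusView::V1 { extra: &self.extra }
    }
    // ...but a decode returning WireStatusView is not possible, because a
    // freshly decoded value has no PublicStatus to borrow from yet.
}
```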
pub async fn update_worker_status(&self, f: impl FnOnce(&mut WorkerStatusRecord)) {
    let mut status = self.get_worker_status_record();

    let mut skipped_regions = self.state.replay_state.skipped_regions().await;
    let (pending_updates, extra_deleted_regions) = self.public_state.worker().pending_updates();
Suggested change:
-    let (pending_updates, extra_deleted_regions) = self.public_state.worker().pending_updates();
+    let (pending_updates, extra_skipped_regions) = self.public_state.worker().pending_updates();
If I understand the new naming correctly
@@ -1096,7 +1109,10 @@ impl<Ctx: WorkerCtx> UpdateManagement for DurableWorkerCtx<Ctx> {
         timestamp,
         target_version,
         details: details.clone(),
-    })
+    });
+    if status.skipped_regions.is_overridden() {
Can we have a comment explaining what this does? It's not at all clear to me.
@@ -41,7 +41,7 @@ pub struct ReplayState {

 #[derive(Clone)]
 struct InternalReplayState {
-    pub deleted_regions: DeletedRegions,
+    pub skipped_regions: DeletedRegions,
     pub next_deleted_region: Option<OplogRegion>,
Suggested change:
-    pub next_deleted_region: Option<OplogRegion>,
+    pub next_skipped_region: Option<OplogRegion>,
     let internal = self.internal.read().await;
-    internal.deleted_regions.clone()
+    internal.skipped_regions.clone()
 }

 pub async fn add_deleted_region(&mut self, region: OplogRegion) {
Suggested change:
-    pub async fn add_deleted_region(&mut self, region: OplogRegion) {
+    pub async fn add_skipped_region(&mut self, region: OplogRegion) {
 }

 pub async fn add_deleted_region(&mut self, region: OplogRegion) {
     let mut internal = self.internal.write().await;
-    internal.deleted_regions.add(region);
+    internal.skipped_regions.add(region);
 }

 pub async fn is_in_deleted_region(&self, oplog_index: OplogIndex) -> bool {
Suggested change:
-    pub async fn is_in_deleted_region(&self, oplog_index: OplogIndex) -> bool {
+    pub async fn is_in_skipped_region(&self, oplog_index: OplogIndex) -> bool {
}

/// Starting from the end of the oplog, find the Nth ExportedFunctionInvoked entry's index.
async fn find_nth_invocation_from_end(&self, n: usize) -> Option<OplogIndex> {
How will / should this behave if we have updates / memory increases in the oplog region that is getting deleted? We might want to delete regions belonging to invocations, but keep other regions that belong to actions that users performed.
This feature reverts everything to a given point in the oplog, including worker updates, pending invocations, etc. I think this is what most users would expect. We could also have a more advanced "history editing" feature where you remove arbitrary invocations and leave the rest of the history as is; I think such a feature could go into the debugging service later.
    .get(&owned_worker_id)
    .await
    .ok_or(GolemError::worker_not_found(worker_id.clone()))?;
let metadata = Worker::<Ctx>::get_latest_metadata(&self.services, &owned_worker_id)
wohoo, nice cleanup 🎉
Resolves #1180
Resolves #1181
Primary change
Implements the revert operation on workers, and adds it to the following layers:
The implementation relies on the previously existing "deleted oplog region" support in the worker executor. The idea is that we figure out an oplog range (either directly from the request, or, if the request said "last N invocations", by reading back the oplog to find the beginning of the Nth invocation starting from the end). We mark the region from this index to the current end of the oplog as "deleted" and add a new oplog entry (Revert) to persist this information. During replay and status calculation the deleted region is skipped.
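In pseudocode, the flow looks roughly like this (RevertTarget and the helper names are simplified stand-ins, not the real APIs):

```rust
enum RevertTarget {
    ToOplogIndex(OplogIndex),
    LastInvocations(usize),
}

async fn revert(worker: &RunningWorker, target: RevertTarget) -> Result<(), GolemError> {
    // 1. Find where the reverted region starts: either an explicit index,
    //    or the start of the Nth invocation counted from the end.
    let start = match target {
        RevertTarget::ToOplogIndex(idx) => idx,
        RevertTarget::LastInvocations(n) => worker
            .find_nth_invocation_from_end(n)
            .await
            .ok_or_else(|| GolemError::invalid_request("not enough invocations"))?,
    };

    // 2. The region from `start` to the current end of the oplog is dropped.
    let end = worker.current_oplog_index().await;
    let dropped_region = OplogRegion { start, end };

    // 3. Persist a Revert entry, so that replay and status calculation skip
    //    the dropped region from now on.
    worker.add_oplog_entry(OplogEntry::revert(dropped_region)).await;
    Ok(())
}
```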
The revert feature required a slightly different kind of deleted region than the one we used before. Previously we used it for two things:

- the `jump` operation, which can be initiated from a worker to "travel back in time"; it can only jump within an invocation
- the `manual update` operation, which skips the replay with the old version and immediately loads the snapshot created with the new one

Both of these use cases require some of the oplog entries in the deleted regions to still be considered existing - for example pending invocations, invocation results, pending updates and update results. Whether we jumped or updated the worker, this information is valid history and we want it to be preserved.
With `revert` the situation is different: if we revert, we want to forget everything in that part of the oplog (we revert invocations and updates, so we want to completely ignore their oplog entries). So I introduced a new `DeletedRegions` field in `WorkerStatusRecord`, called deleted regions, and renamed the old one to "skipped regions".
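The distinction, as a simplified sketch (the predicate, the region lookup method and `records_valid_past` are illustrative, not the real code):

```rust
// Decides whether an oplog entry still "counts" during status calculation.
fn entry_counts(status: &WorkerStatusRecord, idx: OplogIndex, entry: &OplogEntry) -> bool {
    if status.deleted_regions.is_in_deleted_region(idx) {
        // Reverted history: nothing in a deleted region counts anymore.
        return false;
    }
    if status.skipped_regions.is_in_deleted_region(idx) {
        // Jumped/updated-over history: skipped during replay, but entries
        // that record valid past (pending invocations, invocation results,
        // pending updates, update results) are still taken into account.
        return entry.records_valid_past();
    }
    true
}
```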
Additional changes

Worker Executor gRPC layer cleanup
Some code duplication has been eliminated from the worker executor's `grpc.rs` file: many of the operations were querying the latest status of the worker before deciding what to do. This not only reduced the redundancy, it also allowed an optimization: if the worker is in memory, the information is taken directly from the in-memory worker instance.
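The shared lookup follows roughly this pattern (the helper name, `active_workers` and the exact return types are assumptions, not the actual `grpc.rs` code):

```rust
async fn latest_status<Ctx: WorkerCtx>(
    executor: &WorkerExecutor<Ctx>,
    owned_worker_id: &OwnedWorkerId,
) -> Result<WorkerStatusRecord, GolemError> {
    // Prefer the live worker: its status record is already up to date and
    // needs no oplog reads.
    if let Some(worker) = executor.active_workers.get(owned_worker_id).await {
        return Ok(worker.get_worker_status_record());
    }
    // Otherwise fall back to the persisted metadata + oplog calculation.
    let metadata = Worker::<Ctx>::get_latest_metadata(&executor.services, owned_worker_id)
        .await?
        .ok_or_else(|| GolemError::worker_not_found(owned_worker_id.worker_id.clone()))?;
    Ok(metadata.last_known_status)
}
```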
Status calculation refactoring and fixes

The worker executor can reconstruct the worker's status (`WorkerStatusRecord`) purely by reading the oplog, but this is optimized by caching the record and reading only the new entries to get the up-to-date version of the status. This code became more and more complex as the worker status evolved, and the latest change (the split between the concepts of a "deleted region" and a "skipped region") made it even more complex. While working on it I found several bugs, fixed them, and wrote several tests to verify the algorithm.

This is just a first step; eventually it can lead to actually sharing the code that manipulates the worker status between the place where it gets applied and written to the oplog and the above-mentioned code that performs the calculation based on the oplog. That step is not done in this PR.
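The cached calculation works roughly like this (a sketch with simplified field and method names such as `oplog_idx` and `apply_entry`, not the exact APIs):

```rust
async fn up_to_date_status(oplog: &Oplog, cached: &WorkerStatusRecord) -> WorkerStatusRecord {
    let mut status = cached.clone();
    let last = oplog.current_oplog_index().await;

    // Only the entries written after the cached record need to be processed;
    // a fresh worker starts from an initial record and reads everything.
    let mut idx = status.oplog_idx.next();
    while idx <= last {
        let entry = oplog.read(idx).await;
        // Folds the entry into the record: invocation results, updates,
        // skipped/deleted regions, etc.
        apply_entry(&mut status, idx, &entry);
        idx = idx.next();
    }
    status.oplog_idx = last;
    status
}
```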
Physically, `worker.rs` has been promoted to a module directory, and the status management has been split out of it into a separate submodule.
had been promoted to a module directory and the status management was split from it into a separate submodule.Known issues
There is a new oplog entry (`Revert`), and we have a WIT-defined `oplog-entry` variant for the host-exposed oplog query and search feature. I did not add the new entry to this variant, because doing so breaks the API, and there will be more new oplog entries for the rest of the remaining Golem 1.2 features. So in the end we will probably have to create a new version of the oplog API, but I don't think it makes sense to do that after each new entry in between the milestones. For now the revert entry is represented by a fake `log` entry in the host-function initiated oplog queries.
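The stop-gap mapping looks conceptually like this (the public entry types and the Revert payload shown here are assumptions, not the actual WIT bindings):

```rust
fn to_public_entry(entry: &OplogEntry) -> PublicOplogEntry {
    match entry {
        // There is no `revert` case in the WIT `oplog-entry` variant yet, so
        // the entry is surfaced as a log record instead of breaking the API.
        OplogEntry::Revert { timestamp, dropped_region } => PublicOplogEntry::Log {
            timestamp: *timestamp,
            level: LogLevel::Info,
            context: "revert".to_string(),
            message: format!("Reverted worker; dropped oplog region {dropped_region:?}"),
        },
        // ...every other entry maps to its dedicated public variant.
        _ => todo!(),
    }
}
```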