Skip to content

Commit 2ba3209

Browse files
committed
Retry on finished uploads
1 parent caa7c05 commit 2ba3209

File tree

2 files changed

+69
-20
lines changed

2 files changed

+69
-20
lines changed

crates/core/src/sync/interface.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use core::cell::RefCell;
22
use core::ffi::{c_int, c_void};
33

4+
use alloc::borrow::Cow;
45
use alloc::boxed::Box;
56
use alloc::rc::Rc;
67
use alloc::string::ToString;
@@ -28,6 +29,7 @@ pub enum SyncEvent<'a> {
2829
Initialize,
2930
TearDown,
3031
DidRefreshToken,
32+
UploadFinished,
3133
TextLine { data: &'a str },
3234
BinaryLine { data: &'a [u8] },
3335
}
@@ -37,7 +39,7 @@ pub enum SyncEvent<'a> {
3739
pub enum Instruction {
3840
LogLine {
3941
severity: LogSeverity,
40-
line: String,
42+
line: Cow<'static, str>,
4143
},
4244
UpdateSyncStatus {
4345
status: Rc<RefCell<DownloadSyncStatus>>,
@@ -135,6 +137,7 @@ pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
135137
},
136138
}),
137139
"refreshed_token" => SyncControlRequest::SyncEvent(SyncEvent::DidRefreshToken),
140+
"completed_upload" => SyncControlRequest::SyncEvent(SyncEvent::UploadFinished),
138141
_ => {
139142
return Err(SQLiteError(
140143
ResultCode::MISUSE,

crates/core/src/sync/streaming_sync.rs

+65-19
Original file line numberDiff line numberDiff line change
@@ -208,11 +208,13 @@ impl StreamingSyncIteration {
208208
}
209209

210210
async fn run(mut self) -> Result<(), SQLiteError> {
211-
let mut validated = None::<OwnedCheckpoint>;
212-
let mut applied = None::<OwnedCheckpoint>;
213-
214211
let mut target = SyncTarget::BeforeCheckpoint(self.prepare_request().await?);
215212

213+
// A checkpoint that has been fully received and validated, but couldn't be applied due to
214+
// pending local data. We will retry applying this checkpoint when the client SDK informs us
215+
// that it has finished uploading changes.
216+
let mut validated_but_not_applied = None::<OwnedCheckpoint>;
217+
216218
loop {
217219
let event = Self::receive_event().await;
218220

@@ -223,14 +225,44 @@ impl StreamingSyncIteration {
223225
SyncEvent::TearDown => break,
224226
SyncEvent::TextLine { data } => serde_json::from_str(data)?,
225227
SyncEvent::BinaryLine { data } => bson::from_bytes(data)?,
228+
SyncEvent::UploadFinished => {
229+
if let Some(checkpoint) = validated_but_not_applied.take() {
230+
let result = self.adapter.sync_local(&checkpoint, None)?;
231+
232+
match result {
233+
SyncLocalResult::ChangesApplied => {
234+
event.instructions.push(Instruction::LogLine {
235+
severity: LogSeverity::DEBUG,
236+
line: "Applied pending checkpoint after completed upload"
237+
.into(),
238+
});
239+
240+
self.handle_checkpoint_applied(&checkpoint, event)?;
241+
}
242+
_ => {
243+
event.instructions.push(Instruction::LogLine {
244+
severity: LogSeverity::WARNING,
245+
line: "Could not apply pending checkpoint even after completed upload"
246+
.into(),
247+
});
248+
}
249+
}
250+
}
251+
252+
continue;
253+
}
226254
SyncEvent::DidRefreshToken => {
227255
// Break so that the client SDK starts another iteration.
228256
break;
229257
}
230258
};
231259

260+
self.status
261+
.update(|s| s.mark_connected(), &mut event.instructions);
262+
232263
match line {
233264
SyncLine::Checkpoint(checkpoint) => {
265+
validated_but_not_applied = None;
234266
let to_delete = target.track_checkpoint(&checkpoint);
235267

236268
self.adapter
@@ -252,6 +284,7 @@ impl StreamingSyncIteration {
252284
};
253285

254286
target.apply_diff(&diff);
287+
validated_but_not_applied = None;
255288
self.adapter
256289
.delete_buckets(diff.removed_buckets.iter().copied())?;
257290

@@ -280,29 +313,25 @@ impl StreamingSyncIteration {
280313
// await new Promise((resolve) => setTimeout(resolve, 50));
281314
event.instructions.push(Instruction::LogLine {
282315
severity: LogSeverity::WARNING,
283-
line: format!("Could not apply checkpoint, {checkpoint_result}"),
316+
line: format!("Could not apply checkpoint, {checkpoint_result}")
317+
.into(),
284318
});
285319
break;
286320
}
287321
SyncLocalResult::PendingLocalChanges => {
288322
event.instructions.push(Instruction::LogLine {
289-
severity: LogSeverity::WARNING,
290-
line: format!("TODO: Await pending uploads and try again"),
291-
});
292-
break;
323+
severity: LogSeverity::INFO,
324+
line: "Could not apply checkpoint due to local data. Will retry at completed upload or next checkpoint.".into(),
325+
});
326+
327+
validated_but_not_applied = Some(target.clone());
293328
}
294329
SyncLocalResult::ChangesApplied => {
295330
event.instructions.push(Instruction::LogLine {
296331
severity: LogSeverity::DEBUG,
297-
line: format!("Validated and applied checkpoint"),
332+
line: "Validated and applied checkpoint".into(),
298333
});
299-
event.instructions.push(Instruction::DidCompleteSync {});
300-
301-
let now = self.adapter.now()?;
302-
self.status.update(
303-
|status| status.applied_checkpoint(target, now),
304-
&mut event.instructions,
305-
);
334+
self.handle_checkpoint_applied(target, event)?;
306335
}
307336
}
308337
}
@@ -328,7 +357,8 @@ impl StreamingSyncIteration {
328357
severity: LogSeverity::WARNING,
329358
line: format!(
330359
"Could not apply partial checkpoint, {checkpoint_result}"
331-
),
360+
)
361+
.into(),
332362
});
333363
break;
334364
}
@@ -407,6 +437,22 @@ impl StreamingSyncIteration {
407437
.push(Instruction::EstablishSyncStream { request });
408438
Ok(local_bucket_names)
409439
}
440+
441+
fn handle_checkpoint_applied(
442+
&mut self,
443+
target: &OwnedCheckpoint,
444+
event: &mut ActiveEvent,
445+
) -> Result<(), ResultCode> {
446+
event.instructions.push(Instruction::DidCompleteSync {});
447+
448+
let now = self.adapter.now()?;
449+
self.status.update(
450+
|status| status.applied_checkpoint(target, now),
451+
&mut event.instructions,
452+
);
453+
454+
Ok(())
455+
}
410456
}
411457

412458
#[derive(Debug)]
@@ -451,7 +497,7 @@ impl SyncTarget {
451497
}
452498
}
453499

454-
#[derive(Debug)]
500+
#[derive(Debug, Clone)]
455501
pub struct OwnedCheckpoint {
456502
pub last_op_id: i64,
457503
pub write_checkpoint: Option<i64>,
@@ -485,7 +531,7 @@ impl OwnedCheckpoint {
485531
}
486532
}
487533

488-
#[derive(Debug)]
534+
#[derive(Debug, Clone)]
489535
pub struct OwnedBucketChecksum {
490536
pub bucket: String,
491537
pub checksum: i32,

0 commit comments

Comments
 (0)