Skip to content

Commit caa7c05

Browse files
committed
Handle keepalive messages
1 parent 2b229c9 commit caa7c05

File tree

4 files changed

+40
-13
lines changed

4 files changed

+40
-13
lines changed

crates/core/src/sync/interface.rs

+8-2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ pub enum SyncControlRequest<'a> {
2727
pub enum SyncEvent<'a> {
2828
Initialize,
2929
TearDown,
30+
DidRefreshToken,
3031
TextLine { data: &'a str },
3132
BinaryLine { data: &'a [u8] },
3233
}
@@ -44,10 +45,14 @@ pub enum Instruction {
4445
EstablishSyncStream {
4546
request: StreamingSyncRequest,
4647
},
47-
// These two are defined like this because deserializers in Kotlin can't support either an
48+
FetchCredentials {
49+
did_expire: bool,
50+
},
51+
// These are defined like this because deserializers in Kotlin can't support either an
4852
// object or a literal value
49-
FlushFileSystem {},
5053
CloseSyncStream {},
54+
FlushFileSystem {},
55+
DidCompleteSync {},
5156
}
5257

5358
#[derive(Serialize)]
@@ -129,6 +134,7 @@ pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
129134
));
130135
},
131136
}),
137+
"refreshed_token" => SyncControlRequest::SyncEvent(SyncEvent::DidRefreshToken),
132138
_ => {
133139
return Err(SQLiteError(
134140
ResultCode::MISUSE,

crates/core/src/sync/line.rs

+12-1
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,20 @@ pub enum OpType {
118118
REMOVE,
119119
}
120120

121-
#[derive(Deserialize, Debug)]
121+
#[repr(transparent)]
122+
#[derive(Deserialize, Debug, Clone, Copy)]
122123
pub struct TokenExpiresIn(pub i32);
123124

125+
impl TokenExpiresIn {
126+
pub fn is_expired(self) -> bool {
127+
self.0 <= 0
128+
}
129+
130+
pub fn should_prefetch(self) -> bool {
131+
!self.is_expired() && self.0 <= 30
132+
}
133+
}
134+
124135
impl<'a, 'de: 'a> Deserialize<'de> for OplogData<'a> {
125136
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
126137
where

crates/core/src/sync/streaming_sync.rs

+19-2
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,10 @@ impl StreamingSyncIteration {
223223
SyncEvent::TearDown => break,
224224
SyncEvent::TextLine { data } => serde_json::from_str(data)?,
225225
SyncEvent::BinaryLine { data } => bson::from_bytes(data)?,
226+
SyncEvent::DidRefreshToken => {
227+
// Break so that the client SDK starts another iteration.
228+
break;
229+
}
226230
};
227231

228232
match line {
@@ -290,8 +294,9 @@ impl StreamingSyncIteration {
290294
SyncLocalResult::ChangesApplied => {
291295
event.instructions.push(Instruction::LogLine {
292296
severity: LogSeverity::DEBUG,
293-
line: format!("Validated checkpoint"),
297+
line: format!("Validated and applied checkpoint"),
294298
});
299+
event.instructions.push(Instruction::DidCompleteSync {});
295300

296301
let now = self.adapter.now()?;
297302
self.status.update(
@@ -347,7 +352,19 @@ impl StreamingSyncIteration {
347352
.update(|s| s.track_line(&data_line), &mut event.instructions);
348353
insert_bucket_operations(&self.adapter, &data_line)?;
349354
}
350-
SyncLine::KeepAlive(token_expires_in) => todo!(),
355+
SyncLine::KeepAlive(token) => {
356+
if token.is_expired() {
357+
// Token expired already - stop the connection immediately.
358+
event
359+
.instructions
360+
.push(Instruction::FetchCredentials { did_expire: true });
361+
break;
362+
} else if token.should_prefetch() {
363+
event
364+
.instructions
365+
.push(Instruction::FetchCredentials { did_expire: false });
366+
}
367+
}
351368
}
352369
}
353370

crates/core/src/sync/sync_status.rs

+1-8
Original file line numberDiff line numberDiff line change
@@ -73,15 +73,8 @@ impl DownloadSyncStatus {
7373
self.downloading = None;
7474
self.priority_status.clear();
7575

76-
let lowest_priority = applied
77-
.buckets
78-
.values()
79-
.map(|bkt| bkt.priority)
80-
.max()
81-
.unwrap_or(BucketPriority::SENTINEL);
82-
8376
self.priority_status.push(SyncPriorityStatus {
84-
priority: lowest_priority,
77+
priority: BucketPriority::SENTINEL,
8578
last_synced_at: Some(now),
8679
has_synced: Some(true),
8780
});

0 commit comments

Comments
 (0)