Skip to content

Commit eb2e8c6

Browse files
committed
Fix deserializing checkpoints
1 parent 7f6f892 commit eb2e8c6

File tree

10 files changed

+228
-48
lines changed

10 files changed

+228
-48
lines changed

crates/core/src/bson/de.rs

+1-22
Original file line numberDiff line numberDiff line change
@@ -196,29 +196,8 @@ impl<'de, 'a> de::Deserializer<'de> for &'a mut Deserializer<'de> {
196196
}
197197
}
198198

199-
fn deserialize_i32<V>(self, visitor: V) -> Result<V::Value, Self::Error>
200-
where
201-
V: Visitor<'de>,
202-
{
203-
// Since the sync service is written in JavaScript, we'll get numbers as doubles...
204-
let element_type = self.prepare_to_read_value()?;
205-
match element_type {
206-
ElementType::Int32 => visitor.visit_i32(self.parser.read_int32()?),
207-
ElementType::Double => {
208-
let value = self.parser.read_double()?;
209-
let converted: i32 = num_traits::cast(value).ok_or_else(|| {
210-
self.parser
211-
.error(ErrorKind::IllegalFloatToIntConversion(value))
212-
})?;
213-
214-
visitor.visit_i32(converted)
215-
}
216-
_ => self.deserialize_any(visitor),
217-
}
218-
}
219-
220199
forward_to_deserialize_any! {
221-
bool i8 i16 i64 i128 u8 u16 u32 u64 u128 f32 f64 char str string
200+
bool i8 i16 i32 i64 i128 u8 u16 u32 u64 u128 f32 f64 char str string
222201
bytes byte_buf unit unit_struct newtype_struct seq tuple
223202
tuple_struct map struct ignored_any identifier
224203
}

crates/core/src/bson/error.rs

-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ pub enum ErrorKind {
3636
InvalidStateExpectedValue,
3737
ExpectedEnum { actual: ElementType },
3838
ExpectedString,
39-
IllegalFloatToIntConversion(f64),
4039
UnexpectedEndOfDocumentForEnumVariant,
4140
}
4241

crates/core/src/bson/mod.rs

+1-4
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,9 @@ pub fn from_bytes<'de, T: Deserialize<'de>>(bytes: &'de [u8]) -> Result<T, BsonE
1515

1616
#[cfg(test)]
1717
mod test {
18-
use core::assert_matches::assert_matches;
19-
20-
use crate::sync::line::{Checkpoint, SyncLine};
18+
use crate::sync::line::SyncLine;
2119

2220
use super::*;
23-
use serde::de::DeserializeOwned;
2421

2522
#[test]
2623
fn test_hello_world() {

crates/core/src/sync/checksum.rs

+169
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
use core::{
2+
num::Wrapping,
3+
ops::{Add, AddAssign},
4+
};
5+
6+
use num_traits::float::FloatCore;
7+
use num_traits::Zero;
8+
use serde::{de::Visitor, Deserialize, Serialize};
9+
10+
/// A checksum as received from the sync service.
11+
///
12+
/// Conceptually, we use unsigned 32 bit integers to represent checksums, and adding checksums
13+
/// should be a wrapping add.
14+
#[repr(transparent)]
15+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)]
16+
pub struct Checksum(Wrapping<u32>);
17+
18+
impl Checksum {
19+
pub const fn value(self) -> u32 {
20+
self.0 .0
21+
}
22+
23+
pub const fn from_value(value: u32) -> Self {
24+
Self(Wrapping(value))
25+
}
26+
27+
pub const fn from_i32(value: i32) -> Self {
28+
Self::from_value(value as u32)
29+
}
30+
31+
pub const fn bitcast_i32(self) -> i32 {
32+
self.value() as i32
33+
}
34+
}
35+
36+
impl Zero for Checksum {
37+
fn zero() -> Self {
38+
const { Self::from_value(0) }
39+
}
40+
41+
fn is_zero(&self) -> bool {
42+
self.value() == 0
43+
}
44+
}
45+
46+
impl Add for Checksum {
47+
type Output = Self;
48+
49+
#[inline]
50+
fn add(self, rhs: Self) -> Self::Output {
51+
Self(self.0 + rhs.0)
52+
}
53+
}
54+
55+
impl AddAssign for Checksum {
56+
#[inline]
57+
fn add_assign(&mut self, rhs: Self) {
58+
self.0 += rhs.0
59+
}
60+
}
61+
62+
impl From<u32> for Checksum {
63+
fn from(value: u32) -> Self {
64+
Self::from_value(value)
65+
}
66+
}
67+
68+
impl<'de> Deserialize<'de> for Checksum {
69+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
70+
where
71+
D: serde::Deserializer<'de>,
72+
{
73+
struct MyVisitor;
74+
75+
impl<'de> Visitor<'de> for MyVisitor {
76+
type Value = Checksum;
77+
78+
fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result {
79+
write!(formatter, "a number to interpret as a checksum")
80+
}
81+
82+
fn visit_u32<E>(self, v: u32) -> Result<Self::Value, E>
83+
where
84+
E: serde::de::Error,
85+
{
86+
Ok(v.into())
87+
}
88+
89+
fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
90+
where
91+
E: serde::de::Error,
92+
{
93+
let as_u32: u32 = v.try_into().map_err(|_| {
94+
E::invalid_value(serde::de::Unexpected::Unsigned(v), &"a 32-bit int")
95+
})?;
96+
Ok(as_u32.into())
97+
}
98+
99+
fn visit_i32<E>(self, v: i32) -> Result<Self::Value, E>
100+
where
101+
E: serde::de::Error,
102+
{
103+
Ok(Checksum::from_i32(v))
104+
}
105+
106+
fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
107+
where
108+
E: serde::de::Error,
109+
{
110+
// This is supposed to be an u32, but it could also be a i32 that we need to
111+
// normalize.
112+
let min: i64 = u32::MIN.into();
113+
let max: i64 = u32::MAX.into();
114+
115+
if v >= min && v <= max {
116+
return Ok(Checksum::from(v as u32));
117+
}
118+
119+
let as_i32: i32 = v.try_into().map_err(|_| {
120+
E::invalid_value(serde::de::Unexpected::Signed(v), &"a 32-bit int")
121+
})?;
122+
Ok(Checksum::from_i32(as_i32))
123+
}
124+
125+
fn visit_f64<E>(self, v: f64) -> Result<Self::Value, E>
126+
where
127+
E: serde::de::Error,
128+
{
129+
if !v.is_finite() || v.trunc() != v {
130+
return Err(E::invalid_value(
131+
serde::de::Unexpected::Float(v),
132+
&"a whole number",
133+
));
134+
}
135+
136+
self.visit_i64(v as i64)
137+
}
138+
}
139+
140+
deserializer.deserialize_u32(MyVisitor)
141+
}
142+
}
143+
144+
#[cfg(test)]
145+
mod test {
146+
use super::Checksum;
147+
148+
#[test]
149+
pub fn test_binary_representation() {
150+
assert_eq!(Checksum::from_i32(-1).value(), u32::MAX);
151+
assert_eq!(Checksum::from(u32::MAX).value(), u32::MAX);
152+
assert_eq!(Checksum::from(u32::MAX).bitcast_i32(), -1);
153+
}
154+
155+
fn deserialize(from: &str) -> Checksum {
156+
serde_json::from_str(from).expect("should deserialize")
157+
}
158+
159+
#[test]
160+
pub fn test_deserialize() {
161+
assert_eq!(deserialize("0").value(), 0);
162+
assert_eq!(deserialize("-1").value(), u32::MAX);
163+
assert_eq!(deserialize("-1.0").value(), u32::MAX);
164+
165+
assert_eq!(deserialize("3573495687").value(), 3573495687);
166+
assert_eq!(deserialize("3573495687.0").value(), 3573495687);
167+
assert_eq!(deserialize("-721471609.0").value(), 3573495687);
168+
}
169+
}

crates/core/src/sync/line.rs

+8-5
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use crate::json_writer::JsonWriter;
1313
use crate::util::{deserialize_optional_string_to_i64, deserialize_string_to_i64};
1414

1515
use super::bucket_priority::BucketPriority;
16+
use super::Checksum;
1617

1718
#[derive(Deserialize, Debug)]
1819

@@ -73,7 +74,7 @@ pub struct CheckpointPartiallyComplete {
7374
#[derive(Deserialize, Debug)]
7475
pub struct BucketChecksum<'a> {
7576
pub bucket: &'a str,
76-
pub checksum: i32,
77+
pub checksum: Checksum,
7778
pub priority: Option<BucketPriority>,
7879
pub count: Option<i64>,
7980
#[serde(default)]
@@ -95,7 +96,7 @@ pub struct DataLine<'a> {
9596

9697
#[derive(Deserialize, Debug)]
9798
pub struct OplogEntry<'a> {
98-
pub checksum: i32,
99+
pub checksum: Checksum,
99100
#[serde(deserialize_with = "deserialize_string_to_i64")]
100101
pub op_id: i64,
101102
pub op: OpType,
@@ -324,7 +325,7 @@ mod tests {
324325
assert_eq!(checkpoint.buckets.len(), 1);
325326
let bucket = &checkpoint.buckets[0];
326327
assert_eq!(bucket.bucket, "a");
327-
assert_eq!(bucket.checksum, 10);
328+
assert_eq!(bucket.checksum, 10u32.into());
328329
assert_eq!(bucket.priority, None);
329330

330331
let SyncLine::Checkpoint(checkpoint) = deserialize(
@@ -336,7 +337,7 @@ mod tests {
336337
assert_eq!(checkpoint.buckets.len(), 1);
337338
let bucket = &checkpoint.buckets[0];
338339
assert_eq!(bucket.bucket, "a");
339-
assert_eq!(bucket.checksum, 10);
340+
assert_eq!(bucket.checksum, 10u32.into());
340341
assert_eq!(bucket.priority, Some(BucketPriority { number: 1 }));
341342

342343
assert_matches!(
@@ -400,10 +401,12 @@ mod tests {
400401
assert_eq!(data.next_after, None);
401402

402403
assert_eq!(data.data.len(), 1);
404+
let entry = &data.data[0];
405+
assert_eq!(entry.checksum, 10u32.into());
403406
assert_matches!(
404407
&data.data[0],
405408
OplogEntry {
406-
checksum: 10,
409+
checksum: _,
407410
op_id: 1,
408411
object_id: Some(_),
409412
object_type: Some(_),

crates/core/src/sync/mod.rs

+3
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
use sqlite_nostd::{self as sqlite, ResultCode};
22

33
pub mod bucket_priority;
4+
mod checksum;
45
mod interface;
56
pub mod line;
67
pub mod operations;
78
pub mod storage_adapter;
89
mod streaming_sync;
910
mod sync_status;
1011

12+
pub use checksum::Checksum;
13+
1114
pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
1215
interface::register(db)
1316
}

crates/core/src/sync/operations.rs

+16-14
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use alloc::{borrow::Cow, format};
2+
use num_traits::Zero;
23
use sqlite_nostd::Connection;
34
use sqlite_nostd::{self as sqlite, ResultCode};
45

@@ -8,6 +9,7 @@ use crate::{
89
};
910

1011
use super::line::OplogData;
12+
use super::Checksum;
1113
use super::{
1214
line::{DataLine, OpType},
1315
storage_adapter::{BucketInfo, StorageAdapter},
@@ -51,8 +53,8 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
5153
)?;
5254

5355
let mut last_op: Option<i64> = None;
54-
let mut add_checksum: i32 = 0;
55-
let mut op_checksum: i32 = 0;
56+
let mut add_checksum = Checksum::zero();
57+
let mut op_checksum = Checksum::zero();
5658
let mut added_ops: i32 = 0;
5759

5860
for data in &line.data {
@@ -77,9 +79,9 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
7779

7880
while supersede_statement.step()? == ResultCode::ROW {
7981
// Superseded (deleted) a previous operation, add the checksum
80-
let supersede_checksum = supersede_statement.column_int(1);
81-
add_checksum = add_checksum.wrapping_add(supersede_checksum);
82-
op_checksum = op_checksum.wrapping_sub(supersede_checksum);
82+
let supersede_checksum = Checksum::from_i32(supersede_statement.column_int(1));
83+
add_checksum += supersede_checksum;
84+
op_checksum += supersede_checksum;
8385

8486
// Superseded an operation, only skip if the bucket was empty
8587
// Previously this checked "superseded_op <= last_applied_op".
@@ -95,7 +97,7 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
9597
if data.op == OpType::REMOVE {
9698
let should_skip_remove = !superseded;
9799

98-
add_checksum = add_checksum.wrapping_add(checksum);
100+
add_checksum += checksum;
99101

100102
if !should_skip_remove {
101103
if let (Some(object_type), Some(object_id)) =
@@ -144,13 +146,13 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
144146
None => insert_statement.bind_null(6)?,
145147
};
146148

147-
insert_statement.bind_int(7, checksum)?;
149+
insert_statement.bind_int(7, checksum.bitcast_i32())?;
148150
insert_statement.exec()?;
149151

150-
op_checksum = op_checksum.wrapping_add(checksum);
152+
op_checksum += checksum;
151153
}
152154
OpType::MOVE => {
153-
add_checksum = add_checksum.wrapping_add(checksum);
155+
add_checksum += checksum;
154156
}
155157
OpType::CLEAR => {
156158
// Any remaining PUT operations should get an implicit REMOVE
@@ -179,12 +181,12 @@ WHERE bucket = ?1",
179181
"UPDATE ps_buckets SET last_applied_op = 0, add_checksum = ?1, op_checksum = 0 WHERE id = ?2",
180182
)?;
181183
clear_statement2.bind_int64(2, bucket_id)?;
182-
clear_statement2.bind_int(1, checksum)?;
184+
clear_statement2.bind_int(1, checksum.bitcast_i32())?;
183185
clear_statement2.exec()?;
184186

185-
add_checksum = 0;
187+
add_checksum = Checksum::zero();
186188
is_empty = true;
187-
op_checksum = 0;
189+
op_checksum = Checksum::zero();
188190
}
189191
}
190192
}
@@ -201,8 +203,8 @@ WHERE bucket = ?1",
201203
)?;
202204
statement.bind_int64(1, bucket_id)?;
203205
statement.bind_int64(2, last_op)?;
204-
statement.bind_int(3, add_checksum)?;
205-
statement.bind_int(4, op_checksum)?;
206+
statement.bind_int(3, add_checksum.bitcast_i32())?;
207+
statement.bind_int(4, op_checksum.bitcast_i32())?;
206208
statement.bind_int(5, added_ops)?;
207209

208210
statement.exec()?;

0 commit comments

Comments
 (0)