Skip to content

Commit af8a6af

Browse files
committed
Merge remote-tracking branch 'origin/main' into sync-progress
2 parents b338fae + e3996e7 commit af8a6af

File tree

5 files changed

+59
-7
lines changed

5 files changed

+59
-7
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
## unreleased
44

5+
* Fixed `CrudBatch` `hasMore` always returning false.
6+
* Added `triggerImmediately` to `onChange` method.
57
* Report real-time progress information about downloads through `SyncStatus.downloadProgress`.
68
* Compose: Add `composeState()` extension method on `SyncStatus`.
79

core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt

+43-2
Original file line numberDiff line numberDiff line change
@@ -187,14 +187,22 @@ class DatabaseTest {
187187
fun testTableChangesUpdates() =
188188
databaseTest {
189189
turbineScope {
190-
val query = database.onChange(tables = setOf("users")).testIn(this)
190+
val query =
191+
database
192+
.onChange(
193+
tables = setOf("users"),
194+
).testIn(this)
191195

192196
database.execute(
193197
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
194198
listOf("Test", "test@example.org"),
195199
)
196200

197-
val changeSet = query.awaitItem()
201+
var changeSet = query.awaitItem()
202+
// The initial result
203+
changeSet.count() shouldBe 0
204+
205+
changeSet = query.awaitItem()
198206
changeSet.count() shouldBe 1
199207
changeSet.contains("users") shouldBe true
200208

@@ -418,4 +426,37 @@ class DatabaseTest {
418426

419427
database.getNextCrudTransaction() shouldBe null
420428
}
429+
430+
@Test
431+
fun testCrudBatch() =
432+
databaseTest {
433+
database.execute(
434+
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
435+
listOf("a", "a@example.org"),
436+
)
437+
438+
database.writeTransaction {
439+
it.execute(
440+
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
441+
listOf("b", "b@example.org"),
442+
)
443+
it.execute(
444+
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
445+
listOf("c", "c@example.org"),
446+
)
447+
}
448+
449+
// Purposely limit to less than the number of available ops
450+
var batch = database.getCrudBatch(2) ?: error("Batch should not be null")
451+
batch.hasMore shouldBe true
452+
batch.crud shouldHaveSize 2
453+
batch.complete(null)
454+
455+
batch = database.getCrudBatch(1000) ?: error("Batch should not be null")
456+
batch.crud shouldHaveSize 1
457+
batch.hasMore shouldBe false
458+
batch.complete(null)
459+
460+
database.getCrudBatch() shouldBe null
461+
}
421462
}

core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt

+5-4
Original file line numberDiff line numberDiff line change
@@ -245,10 +245,10 @@ internal class PowerSyncDatabaseImpl(
245245
return null
246246
}
247247

248-
val entries =
248+
var entries =
249249
internalDb.getAll(
250250
"SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT ?",
251-
listOf(limit.toLong()),
251+
listOf(limit.toLong() + 1),
252252
) {
253253
CrudEntry.fromRow(
254254
CrudRow(
@@ -265,7 +265,7 @@ internal class PowerSyncDatabaseImpl(
265265

266266
val hasMore = entries.size > limit
267267
if (hasMore) {
268-
entries.dropLast(entries.size - limit)
268+
entries = entries.dropLast(1)
269269
}
270270

271271
return CrudBatch(entries, hasMore, complete = { writeCheckpoint ->
@@ -338,11 +338,12 @@ internal class PowerSyncDatabaseImpl(
338338
override fun onChange(
339339
tables: Set<String>,
340340
throttleMs: Long,
341+
triggerImmediately: Boolean,
341342
): Flow<Set<String>> =
342343
flow {
343344
waitReady()
344345
emitAll(
345-
internalDb.onChange(tables, throttleMs),
346+
internalDb.onChange(tables, throttleMs, triggerImmediately),
346347
)
347348
}
348349

core/src/commonMain/kotlin/com/powersync/db/Queries.kt

+2
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ public interface Queries {
9797
*
9898
* @param tables The set of tables to monitor for changes.
9999
* @param throttleMs The minimum interval, in milliseconds, between emissions. Defaults to [DEFAULT_THROTTLE]. Table changes are accumulated while throttling is active. The accumulated set of tables will be emitted on the trailing edge of the throttle.
100+
* @param triggerImmediately If true (default), the flow will immediately emit an empty set of tables when the flow is first collected. This can be useful for ensuring that the flow emits at least once, even if no changes occur to the monitored tables.
100101
* @return A [Flow] emitting the set of modified tables.
101102
* @throws PowerSyncException If a database error occurs.
102103
* @throws CancellationException If the operation is cancelled.
@@ -105,6 +106,7 @@ public interface Queries {
105106
public fun onChange(
106107
tables: Set<String>,
107108
throttleMs: Long = DEFAULT_THROTTLE.inWholeMilliseconds,
109+
triggerImmediately: Boolean = true,
108110
): Flow<Set<String>>
109111

110112
/**

core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt

+7-1
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ internal class InternalDatabaseImpl(
106106
override fun onChange(
107107
tables: Set<String>,
108108
throttleMs: Long,
109+
triggerImmediately: Boolean,
109110
): Flow<Set<String>> =
110111
channelFlow {
111112
// Match all possible internal table combinations
@@ -116,7 +117,12 @@ internal class InternalDatabaseImpl(
116117
val batchedUpdates = AtomicMutableSet<String>()
117118

118119
updatesOnTables()
119-
.transform { updates ->
120+
.onSubscription {
121+
if (triggerImmediately) {
122+
// Emit an initial event (if requested). No changes would be detected at this point
123+
send(setOf())
124+
}
125+
}.transform { updates ->
120126
val intersection = updates.intersect(watchedTables)
121127
if (intersection.isNotEmpty()) {
122128
// Transform table names using friendlyTableName

0 commit comments

Comments
 (0)