Skip to content

Commit

Permalink
Improve test helper classes (#511)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek authored Feb 22, 2025
1 parent d10e616 commit ea52f61
Show file tree
Hide file tree
Showing 9 changed files with 252 additions and 228 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,18 @@ public class IcebergChangeConsumerUpsertDeleteDeletesTest extends BaseSparkTest

@Inject
IcebergChangeConsumer consumer;
@Inject
TestChangeEventFactory eventFactory;
final static Long TEST_EPOCH_MS = 1577840461000L;

@Test
public void testSimpleUpsert() throws Exception {

String dest = "testc.inventory.customers_upsert";
List<io.debezium.engine.ChangeEvent<Object, Object>> records = new ArrayList<>();
records.add(TestChangeEvent.of(dest, 1, "c"));
records.add(TestChangeEvent.of(dest, 2, "c"));
records.add(TestChangeEvent.of(dest, 3, "c"));
records.add(eventFactory.of(dest, 1, "c"));
records.add(eventFactory.of(dest, 2, "c"));
records.add(eventFactory.of(dest, 3, "c"));
consumer.handleBatch(records, TestUtil.getCommitter());

Dataset<Row> ds = getTableData("testc.inventory.customers_upsert");
Expand All @@ -57,10 +59,10 @@ public void testSimpleUpsert() throws Exception {

// the 3 existing records should be updated; the 4th one should be inserted
records.clear();
records.add(TestChangeEvent.of(dest, 1, "r"));
records.add(TestChangeEvent.of(dest, 2, "d"));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV1"));
records.add(TestChangeEvent.of(dest, 4, "c"));
records.add(eventFactory.of(dest, 1, "r"));
records.add(eventFactory.of(dest, 2, "d"));
records.add(eventFactory.of(dest, 3, "u", "UpdatednameV1"));
records.add(eventFactory.of(dest, 4, "c"));
consumer.handleBatch(records, TestUtil.getCommitter());

ds = getTableData("testc.inventory.customers_upsert");
Expand All @@ -74,17 +76,17 @@ public void testSimpleUpsert() throws Exception {

records.clear();
// in case of duplicate records, it should keep only the latest one by epoch ts
records.add(TestChangeEvent.of(dest, 3, "r", "UpdatednameV2", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV3", TEST_EPOCH_MS + 2L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV4", TEST_EPOCH_MS + 3L));
records.add(TestChangeEvent.of(dest, 4, "u", "Updatedname-4-V1", TEST_EPOCH_MS + 4L));
records.add(TestChangeEvent.of(dest, 4, "u", "Updatedname-4-V2", TEST_EPOCH_MS + 5L));
records.add(TestChangeEvent.of(dest, 4, "d", "Updatedname-4-V3", TEST_EPOCH_MS + 6L));
records.add(TestChangeEvent.of(dest, 5, "d", TEST_EPOCH_MS + 7L));
records.add(TestChangeEvent.of(dest, 6, "r", TEST_EPOCH_MS + 8L));
records.add(TestChangeEvent.of(dest, 6, "r", TEST_EPOCH_MS + 9L));
records.add(TestChangeEvent.of(dest, 6, "u", TEST_EPOCH_MS + 10L));
records.add(TestChangeEvent.of(dest, 6, "u", "Updatedname-6-V1", TEST_EPOCH_MS + 11L));
records.add(eventFactory.of(dest, 3, "r", "UpdatednameV2", TEST_EPOCH_MS + 1L));
records.add(eventFactory.of(dest, 3, "u", "UpdatednameV3", TEST_EPOCH_MS + 2L));
records.add(eventFactory.of(dest, 3, "u", "UpdatednameV4", TEST_EPOCH_MS + 3L));
records.add(eventFactory.of(dest, 4, "u", "Updatedname-4-V1", TEST_EPOCH_MS + 4L));
records.add(eventFactory.of(dest, 4, "u", "Updatedname-4-V2", TEST_EPOCH_MS + 5L));
records.add(eventFactory.of(dest, 4, "d", "Updatedname-4-V3", TEST_EPOCH_MS + 6L));
records.add(eventFactory.of(dest, 5, "d", TEST_EPOCH_MS + 7L));
records.add(eventFactory.of(dest, 6, "r", TEST_EPOCH_MS + 8L));
records.add(eventFactory.of(dest, 6, "r", TEST_EPOCH_MS + 9L));
records.add(eventFactory.of(dest, 6, "u", TEST_EPOCH_MS + 10L));
records.add(eventFactory.of(dest, 6, "u", "Updatedname-6-V1", TEST_EPOCH_MS + 11L));
consumer.handleBatch(records, TestUtil.getCommitter());
ds = getTableData("testc.inventory.customers_upsert");
ds.show();
Expand All @@ -97,10 +99,10 @@ public void testSimpleUpsert() throws Exception {
// in case of duplicate records with the same epoch ts, it should keep the latest one based on operation priority
// ("c", 1, "r", 2, "u", 3, "d", 4);
records.clear();
records.add(TestChangeEvent.of(dest, 3, "d", "UpdatednameV5", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV6", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.of(dest, 6, "c", "Updatedname-6-V2", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.of(dest, 6, "r", "Updatedname-6-V3", TEST_EPOCH_MS + 1L));
records.add(eventFactory.of(dest, 3, "d", "UpdatednameV5", TEST_EPOCH_MS + 1L));
records.add(eventFactory.of(dest, 3, "u", "UpdatednameV6", TEST_EPOCH_MS + 1L));
records.add(eventFactory.of(dest, 6, "c", "Updatedname-6-V2", TEST_EPOCH_MS + 1L));
records.add(eventFactory.of(dest, 6, "r", "Updatedname-6-V3", TEST_EPOCH_MS + 1L));
consumer.handleBatch(records, TestUtil.getCommitter());
ds = getTableData("testc.inventory.customers_upsert");
ds.show();
Expand All @@ -109,10 +111,10 @@ public void testSimpleUpsert() throws Exception {

// if it's not a standard insert followed by an update, it should still keep the latest one
records.clear();
records.add(TestChangeEvent.of(dest, 7, "u", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.of(dest, 7, "d", TEST_EPOCH_MS + 2L));
records.add(TestChangeEvent.of(dest, 7, "r", TEST_EPOCH_MS + 3L));
records.add(TestChangeEvent.of(dest, 7, "u", "Updatedname-7-V1", TEST_EPOCH_MS + 4L));
records.add(eventFactory.of(dest, 7, "u", TEST_EPOCH_MS + 1L));
records.add(eventFactory.of(dest, 7, "d", TEST_EPOCH_MS + 2L));
records.add(eventFactory.of(dest, 7, "r", TEST_EPOCH_MS + 3L));
records.add(eventFactory.of(dest, 7, "u", "Updatedname-7-V1", TEST_EPOCH_MS + 4L));
consumer.handleBatch(records, TestUtil.getCommitter());
ds = getTableData("testc.inventory.customers_upsert");
ds.show();
Expand All @@ -125,10 +127,10 @@ public void testSimpleUpsertCompositeKey() throws Exception {
String dest = "testc.inventory.customers_upsert_compositekey";
// test simple inserts
List<io.debezium.engine.ChangeEvent<Object, Object>> records = new ArrayList<>();
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "c", "user1", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "c", "user2", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "u", "user1", TEST_EPOCH_MS + 2L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "r", "user1", TEST_EPOCH_MS + 3L));
records.add(eventFactory.ofCompositeKey(dest, 1, "c", "user1", TEST_EPOCH_MS + 1L));
records.add(eventFactory.ofCompositeKey(dest, 1, "c", "user2", TEST_EPOCH_MS + 1L));
records.add(eventFactory.ofCompositeKey(dest, 1, "u", "user1", TEST_EPOCH_MS + 2L));
records.add(eventFactory.ofCompositeKey(dest, 1, "r", "user1", TEST_EPOCH_MS + 3L));
consumer.handleBatch(records, TestUtil.getCommitter());

Dataset<Row> ds = getTableData("testc.inventory.customers_upsert_compositekey");
Expand All @@ -137,10 +139,10 @@ public void testSimpleUpsertCompositeKey() throws Exception {
Assertions.assertEquals(ds.where("id = 1").count(), 2);

records.clear();
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "u", "user1", TEST_EPOCH_MS + 2L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "r", "user1", TEST_EPOCH_MS + 3L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "d", "user1", TEST_EPOCH_MS + 3L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "d", "user2", TEST_EPOCH_MS + 1L));
records.add(eventFactory.ofCompositeKey(dest, 1, "u", "user1", TEST_EPOCH_MS + 2L));
records.add(eventFactory.ofCompositeKey(dest, 1, "r", "user1", TEST_EPOCH_MS + 3L));
records.add(eventFactory.ofCompositeKey(dest, 1, "d", "user1", TEST_EPOCH_MS + 3L));
records.add(eventFactory.ofCompositeKey(dest, 1, "d", "user2", TEST_EPOCH_MS + 1L));
consumer.handleBatch(records, TestUtil.getCommitter());
ds = getTableData("testc.inventory.customers_upsert_compositekey");
ds.show();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,18 @@ public class IcebergChangeConsumerUpsertTest extends BaseSparkTest {

@Inject
IcebergChangeConsumer consumer;
@Inject
TestChangeEventFactory eventFactory;
final static Long TEST_EPOCH_MS = 1577840461000L;

@Test
public void testSimpleUpsert() throws Exception {
String dest = "testc.inventory.customers_upsert";
// test simple inserts
List<io.debezium.engine.ChangeEvent<Object, Object>> records = new ArrayList<>();
records.add(TestChangeEvent.of(dest, 1, "c"));
records.add(TestChangeEvent.of(dest, 2, "c"));
records.add(TestChangeEvent.of(dest, 3, "c"));
records.add(eventFactory.of(dest, 1, "c"));
records.add(eventFactory.of(dest, 2, "c"));
records.add(eventFactory.of(dest, 3, "c"));
consumer.handleBatch(records, TestUtil.getCommitter());

Dataset<Row> ds = getTableData("testc.inventory.customers_upsert");
Expand All @@ -62,10 +64,10 @@ public void testSimpleUpsert() throws Exception {

// the 3 existing records should be updated; the 4th one should be inserted
records.clear();
records.add(TestChangeEvent.of(dest, 1, "r"));
records.add(TestChangeEvent.of(dest, 2, "d"));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV1"));
records.add(TestChangeEvent.of(dest, 4, "c"));
records.add(eventFactory.of(dest, 1, "r"));
records.add(eventFactory.of(dest, 2, "d"));
records.add(eventFactory.of(dest, 3, "u", "UpdatednameV1"));
records.add(eventFactory.of(dest, 4, "c"));
consumer.handleBatch(records, TestUtil.getCommitter());

ds = getTableData("testc.inventory.customers_upsert");
Expand All @@ -79,17 +81,17 @@ public void testSimpleUpsert() throws Exception {

records.clear();
// in case of duplicate records, it should keep only the latest one by epoch ts
records.add(TestChangeEvent.of(dest, 3, "r", "UpdatednameV2", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV3", TEST_EPOCH_MS + 2L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV4", TEST_EPOCH_MS + 3L));
records.add(TestChangeEvent.of(dest, 4, "u", "Updatedname-4-V1", TEST_EPOCH_MS + 4L));
records.add(TestChangeEvent.of(dest, 4, "u", "Updatedname-4-V2", TEST_EPOCH_MS + 5L));
records.add(TestChangeEvent.of(dest, 4, "r", "Updatedname-4-V3", TEST_EPOCH_MS + 6L));
records.add(TestChangeEvent.of(dest, 5, "r", TEST_EPOCH_MS + 7L));
records.add(TestChangeEvent.of(dest, 6, "r", TEST_EPOCH_MS + 8L));
records.add(TestChangeEvent.of(dest, 6, "r", TEST_EPOCH_MS + 9L));
records.add(TestChangeEvent.of(dest, 6, "u", TEST_EPOCH_MS + 10L));
records.add(TestChangeEvent.of(dest, 6, "u", "Updatedname-6-V1", TEST_EPOCH_MS + 11L));
records.add(eventFactory.of(dest, 3, "r", "UpdatednameV2", TEST_EPOCH_MS + 1L));
records.add(eventFactory.of(dest, 3, "u", "UpdatednameV3", TEST_EPOCH_MS + 2L));
records.add(eventFactory.of(dest, 3, "u", "UpdatednameV4", TEST_EPOCH_MS + 3L));
records.add(eventFactory.of(dest, 4, "u", "Updatedname-4-V1", TEST_EPOCH_MS + 4L));
records.add(eventFactory.of(dest, 4, "u", "Updatedname-4-V2", TEST_EPOCH_MS + 5L));
records.add(eventFactory.of(dest, 4, "r", "Updatedname-4-V3", TEST_EPOCH_MS + 6L));
records.add(eventFactory.of(dest, 5, "r", TEST_EPOCH_MS + 7L));
records.add(eventFactory.of(dest, 6, "r", TEST_EPOCH_MS + 8L));
records.add(eventFactory.of(dest, 6, "r", TEST_EPOCH_MS + 9L));
records.add(eventFactory.of(dest, 6, "u", TEST_EPOCH_MS + 10L));
records.add(eventFactory.of(dest, 6, "u", "Updatedname-6-V1", TEST_EPOCH_MS + 11L));
consumer.handleBatch(records, TestUtil.getCommitter());
ds = getTableData("testc.inventory.customers_upsert");
ds.sort("id").show(false);
Expand All @@ -102,10 +104,10 @@ public void testSimpleUpsert() throws Exception {
// in case of duplicate records with the same epoch ts, it should keep the latest one based on operation priority
// ("c", 1, "r", 2, "u", 3, "d", 4);
records.clear();
records.add(TestChangeEvent.of(dest, 3, "d", "UpdatednameV5", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.of(dest, 3, "u", "UpdatednameV6", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.of(dest, 6, "c", "Updatedname-6-V2", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.of(dest, 6, "r", "Updatedname-6-V3", TEST_EPOCH_MS + 1L));
records.add(eventFactory.of(dest, 3, "d", "UpdatednameV5", TEST_EPOCH_MS + 1L));
records.add(eventFactory.of(dest, 3, "u", "UpdatednameV6", TEST_EPOCH_MS + 1L));
records.add(eventFactory.of(dest, 6, "c", "Updatedname-6-V2", TEST_EPOCH_MS + 1L));
records.add(eventFactory.of(dest, 6, "r", "Updatedname-6-V3", TEST_EPOCH_MS + 1L));
consumer.handleBatch(records, TestUtil.getCommitter());
ds = getTableData("testc.inventory.customers_upsert");
ds.show();
Expand All @@ -114,10 +116,10 @@ public void testSimpleUpsert() throws Exception {

// if it's not a standard insert followed by an update, it should still keep the latest one
records.clear();
records.add(TestChangeEvent.of(dest, 7, "u", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.of(dest, 7, "u", TEST_EPOCH_MS + 2L));
records.add(TestChangeEvent.of(dest, 7, "r", TEST_EPOCH_MS + 3L));
records.add(TestChangeEvent.of(dest, 7, "u", "Updatedname-7-V1", TEST_EPOCH_MS + 4L));
records.add(eventFactory.of(dest, 7, "u", TEST_EPOCH_MS + 1L));
records.add(eventFactory.of(dest, 7, "u", TEST_EPOCH_MS + 2L));
records.add(eventFactory.of(dest, 7, "r", TEST_EPOCH_MS + 3L));
records.add(eventFactory.of(dest, 7, "u", "Updatedname-7-V1", TEST_EPOCH_MS + 4L));
consumer.handleBatch(records, TestUtil.getCommitter());
ds = getTableData("testc.inventory.customers_upsert");
ds.show();
Expand All @@ -130,10 +132,10 @@ public void testSimpleUpsertCompositeKey() throws Exception {
String dest = "testc.inventory.customers_upsert_compositekey";
// test simple inserts
List<io.debezium.engine.ChangeEvent<Object, Object>> records = new ArrayList<>();
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "c", "user1", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "c", "user2", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "u", "user1", TEST_EPOCH_MS + 2L));
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "r", "user1", TEST_EPOCH_MS + 3L));
records.add(eventFactory.ofCompositeKey(dest, 1, "c", "user1", TEST_EPOCH_MS + 1L));
records.add(eventFactory.ofCompositeKey(dest, 1, "c", "user2", TEST_EPOCH_MS + 1L));
records.add(eventFactory.ofCompositeKey(dest, 1, "u", "user1", TEST_EPOCH_MS + 2L));
records.add(eventFactory.ofCompositeKey(dest, 1, "r", "user1", TEST_EPOCH_MS + 3L));
consumer.handleBatch(records, TestUtil.getCommitter());

Dataset<Row> ds = getTableData("testc.inventory.customers_upsert_compositekey");
Expand All @@ -142,7 +144,7 @@ public void testSimpleUpsertCompositeKey() throws Exception {
Assertions.assertEquals(ds.where("id = 1").count(), 2);

records.clear();
records.add(TestChangeEvent.ofCompositeKey(dest, 1, "u", "user2", TEST_EPOCH_MS + 1L));
records.add(eventFactory.ofCompositeKey(dest, 1, "u", "user2", TEST_EPOCH_MS + 1L));
consumer.handleBatch(records, TestUtil.getCommitter());
ds = getTableData("testc.inventory.customers_upsert_compositekey");
ds.show();
Expand All @@ -155,19 +157,19 @@ public void testSimpleUpsertNoKey() throws Exception {
String dest = "testc.inventory.customers_upsert_nokey";
// when there is no PK it should fall back to append mode
List<io.debezium.engine.ChangeEvent<Object, Object>> records = new ArrayList<>();
records.add(TestChangeEvent.ofNoKey(dest, 1, "c", "user1", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.ofNoKey(dest, 1, "c", "user2", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.ofNoKey(dest, 1, "u", "user1", TEST_EPOCH_MS + 2L));
records.add(eventFactory.ofNoKey(dest, 1, "c", "user1", TEST_EPOCH_MS + 1L));
records.add(eventFactory.ofNoKey(dest, 1, "c", "user2", TEST_EPOCH_MS + 1L));
records.add(eventFactory.ofNoKey(dest, 1, "u", "user1", TEST_EPOCH_MS + 2L));
consumer.handleBatch(records, TestUtil.getCommitter());
Dataset<Row> ds = getTableData("testc.inventory.customers_upsert_nokey");
ds.show();
Assertions.assertEquals(ds.count(), 3);
Assertions.assertEquals(ds.where("id = 1").count(), 3);

records.clear();
records.add(TestChangeEvent.ofNoKey(dest, 1, "c", "user2", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.ofNoKey(dest, 1, "u", "user2", TEST_EPOCH_MS + 1L));
records.add(TestChangeEvent.ofNoKey(dest, 1, "r", "user1", TEST_EPOCH_MS + 3L));
records.add(eventFactory.ofNoKey(dest, 1, "c", "user2", TEST_EPOCH_MS + 1L));
records.add(eventFactory.ofNoKey(dest, 1, "u", "user2", TEST_EPOCH_MS + 1L));
records.add(eventFactory.ofNoKey(dest, 1, "r", "user1", TEST_EPOCH_MS + 3L));
consumer.handleBatch(records, TestUtil.getCommitter());
ds = getTableData("testc.inventory.customers_upsert_nokey");
ds.show();
Expand Down
Loading

0 comments on commit ea52f61

Please sign in to comment.