Skip to content

Commit

Permalink
Unify test injections (#528)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek authored Feb 25, 2025
1 parent dae06db commit e4fe838
Show file tree
Hide file tree
Showing 24 changed files with 96 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
*
*/

package io.debezium.server.iceberg.testresources;
package io.debezium.server.iceberg;

import io.debezium.server.iceberg.IcebergUtil;
import io.debezium.server.iceberg.TestConfigSource;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.SourcePostgresqlDB;
import io.debezium.server.iceberg.testresources.TestUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Expand All @@ -18,11 +19,9 @@
import org.apache.spark.sql.catalog.Database;
import org.apache.spark.sql.catalog.Table;
import org.apache.spark.sql.types.StructField;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.ConfigProvider;
import org.junit.jupiter.api.BeforeAll;

import java.time.Duration;
import java.util.Map;

import static io.debezium.server.iceberg.TestConfigSource.ICEBERG_CATALOG_TABLE_NAMESPACE;
Expand All @@ -41,9 +40,8 @@ public class BaseSparkTest extends BaseTest {
protected static SparkSession spark;

@BeforeAll
static void setup() {
Awaitility.setDefaultTimeout(Duration.ofMinutes(3));
Awaitility.setDefaultPollInterval(Duration.ofSeconds(10));
public static void setupSpark() {
LOGGER.debug("Setup Spark Test");
sparkconf
.set("spark.ui.enabled", "false")
.set("spark.eventLog.enabled", "false")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,67 @@
*
*/

package io.debezium.server.iceberg.testresources;

import io.debezium.server.iceberg.IcebergChangeConsumer;
package io.debezium.server.iceberg;

import io.debezium.server.iceberg.tableoperator.IcebergTableOperator;
import jakarta.inject.Inject;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;

import static io.debezium.server.iceberg.TestConfigSource.ICEBERG_CATALOG_TABLE_NAMESPACE;
import static org.mockito.Mockito.when;

/**
* Integration test that uses spark to consumer data is consumed.
*
* @author Ismail Simsek
*/
public class BaseTest {
protected static final Logger LOGGER = LoggerFactory.getLogger(BaseTest.class);

@Inject
public IcebergChangeConsumer consumer;
@Inject
public IcebergConfig icebergConfig;
@Inject
public DebeziumConfig debeziumConfig;
@Inject
public GlobalConfig config;
@Inject
public IcebergChangeEventBuilder eventBuilder;
@Inject
public TestChangeEventFactory eventFactory;
@Inject
public IcebergTableOperator icebergTableOperator;
@ConfigProperty(name = "debezium.sink.type")
public String sinkType;
@ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
public String namespace;

@BeforeAll
static void setup() {
static void setupBase() {
LOGGER.debug("Setup Base Test");
Awaitility.setDefaultTimeout(Duration.ofMinutes(3));
Awaitility.setDefaultPollInterval(Duration.ofSeconds(6));
}

@BeforeEach
void setupBaseBeforeEach() {
when(consumer.config.debezium()).thenReturn(debeziumConfig);
when(consumer.config.iceberg()).thenReturn(icebergConfig);
}

public CloseableIterable<Record> getTableDataV2(String table) throws InterruptedException {
return getTableDataV2(ICEBERG_CATALOG_TABLE_NAMESPACE, table);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.debezium.server.iceberg;

import io.smallrye.config.SmallRyeConfig;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.Config;
import org.mockito.Mockito;

/**
* This class provides a mocked instance of DebeziumConfig for testing purposes,
* allowing selective overriding of configuration values while preserving the original
* configuration.
*/
public class DebeziumConfigProducer {
@Inject
Config config;

@Produces
@ApplicationScoped
@io.quarkus.test.Mock
DebeziumConfig appConfig() {
DebeziumConfig appConfig = config.unwrap(SmallRyeConfig.class).getConfigMapping(DebeziumConfig.class);
DebeziumConfig appConfigSpy = Mockito.spy(appConfig);
return appConfigSpy;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.QuarkusTestProfile;
import io.quarkus.test.junit.TestProfile;
import jakarta.inject.Inject;
import org.jboss.logging.Logger;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand All @@ -16,24 +15,21 @@

@QuarkusTest
@TestProfile(GlobalConfigTest.TestProfile.class)
public class GlobalConfigTest {

@Inject
GlobalConfig conf;
public class GlobalConfigTest extends BaseTest {

@Test
void configLoadsCorrectly() {
Assertions.assertEquals(ICEBERG_CATALOG_NAME, conf.iceberg().catalogName());
Assertions.assertEquals(ICEBERG_CATALOG_NAME, config.iceberg().catalogName());
// tests are running with false
Assertions.assertEquals(false, conf.iceberg().upsert());
Assertions.assertEquals(ICEBERG_WAREHOUSE_S3A, conf.iceberg().warehouseLocation());

Assertions.assertTrue(conf.iceberg().icebergConfigs().containsKey("warehouse"));
Assertions.assertTrue(conf.iceberg().icebergConfigs().containsValue(ICEBERG_WAREHOUSE_S3A));
Assertions.assertTrue(conf.iceberg().icebergConfigs().containsKey("table-namespace"));
Assertions.assertTrue(conf.iceberg().icebergConfigs().containsKey("catalog-name"));
Assertions.assertTrue(conf.iceberg().icebergConfigs().containsValue(ICEBERG_CATALOG_NAME));
Assertions.assertEquals(Logger.Level.ERROR, conf.quarkusLogLevel());
Assertions.assertEquals(false, config.iceberg().upsert());
Assertions.assertEquals(ICEBERG_WAREHOUSE_S3A, config.iceberg().warehouseLocation());

Assertions.assertTrue(config.iceberg().icebergConfigs().containsKey("warehouse"));
Assertions.assertTrue(config.iceberg().icebergConfigs().containsValue(ICEBERG_WAREHOUSE_S3A));
Assertions.assertTrue(config.iceberg().icebergConfigs().containsKey("table-namespace"));
Assertions.assertTrue(config.iceberg().icebergConfigs().containsKey("catalog-name"));
Assertions.assertTrue(config.iceberg().icebergConfigs().containsValue(ICEBERG_CATALOG_NAME));
Assertions.assertEquals(Logger.Level.ERROR, config.quarkusLogLevel());
}

public static class TestProfile implements QuarkusTestProfile {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package io.debezium.server.iceberg;

import com.google.common.collect.Lists;
import io.debezium.server.iceberg.testresources.BaseTest;
import io.debezium.server.iceberg.testresources.CatalogJdbc;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.SourcePostgresqlDB;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package io.debezium.server.iceberg;

import io.debezium.server.iceberg.testresources.BaseSparkTest;
import io.debezium.server.iceberg.testresources.CatalogJdbc;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.SourceMongoDB;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@

import com.google.common.collect.Lists;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.server.iceberg.testresources.BaseTest;
import io.debezium.server.iceberg.testresources.CatalogJdbc;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.SourceMysqlDB;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.QuarkusTestProfile;
import io.quarkus.test.junit.TestProfile;
import jakarta.inject.Inject;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;
import org.awaitility.Awaitility;
Expand All @@ -40,9 +38,6 @@
@TestProfile(IcebergChangeConsumerMysqlTest.TestProfile.class)
public class IcebergChangeConsumerMysqlTest extends BaseTest {

@Inject
GlobalConfig config;

@Test
public void testSimpleUpload() throws Exception {
assertEquals(config.debezium().temporalPrecisionMode(), TemporalPrecisionMode.CONNECT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package io.debezium.server.iceberg;

import com.google.common.collect.Lists;
import io.debezium.server.iceberg.testresources.BaseTest;
import io.debezium.server.iceberg.testresources.CatalogJdbc;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.SourceMysqlDB;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package io.debezium.server.iceberg;
import com.google.common.collect.Lists;
import io.debezium.server.iceberg.testresources.BaseTest;
import io.debezium.server.iceberg.testresources.CatalogRest;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.SourcePostgresqlDB;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package io.debezium.server.iceberg;

import io.debezium.server.iceberg.testresources.BaseSparkTest;
import io.debezium.server.iceberg.testresources.CatalogJdbc;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.SourcePostgresqlDB;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@

import com.google.common.collect.Lists;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.server.iceberg.testresources.BaseSparkTest;
import io.debezium.server.iceberg.testresources.CatalogJdbc;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.SourcePostgresqlDB;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.QuarkusTestProfile;
import io.quarkus.test.junit.TestProfile;
import jakarta.inject.Inject;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
Expand All @@ -27,11 +25,8 @@
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.HashMap;
Expand All @@ -54,18 +49,6 @@
@TestProfile(IcebergChangeConsumerTest.TestProfile.class)
public class IcebergChangeConsumerTest extends BaseSparkTest {

protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeConsumerTest.class);
@ConfigProperty(name = "debezium.sink.type")
String sinkType;

@Inject
IcebergChangeConsumer icebergConsumer;
@ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
String namespace;

@Inject
IcebergConfig icebergConfig;

@Test
public void testConsumingVariousDataTypes() throws Exception {
assertEquals(sinkType, "iceberg");
Expand Down Expand Up @@ -352,20 +335,20 @@ public void testPartitionedTable() {

@Test
public void testMapDestination() {
assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table"), icebergConsumer.mapDestination("table1"));
assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table"), icebergConsumer.mapDestination("table2"));
assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table"), consumer.mapDestination("table1"));
assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table"), consumer.mapDestination("table2"));
// test
when(consumer.config.iceberg()).thenReturn(icebergConfig);
when(icebergConfig.destinationUppercaseTableNames()).thenReturn(true);
when(icebergConfig.destinationLowercaseTableNames()).thenReturn(false);
assertEquals(TableIdentifier.of(Namespace.of(namespace), "DEBEZIUMCDC_TABLE_NAME"), icebergConsumer.mapDestination("table_name"));
assertEquals(TableIdentifier.of(Namespace.of(namespace), "DEBEZIUMCDC_TABLE_NAME"), icebergConsumer.mapDestination("Table_Name"));
assertEquals(TableIdentifier.of(Namespace.of(namespace), "DEBEZIUMCDC_TABLE_NAME"), icebergConsumer.mapDestination("TABLE_NAME"));
assertEquals(TableIdentifier.of(Namespace.of(namespace), "DEBEZIUMCDC_TABLE_NAME"), consumer.mapDestination("table_name"));
assertEquals(TableIdentifier.of(Namespace.of(namespace), "DEBEZIUMCDC_TABLE_NAME"), consumer.mapDestination("Table_Name"));
assertEquals(TableIdentifier.of(Namespace.of(namespace), "DEBEZIUMCDC_TABLE_NAME"), consumer.mapDestination("TABLE_NAME"));
when(consumer.config.iceberg()).thenReturn(icebergConfig);
when(icebergConfig.destinationUppercaseTableNames()).thenReturn(false);
when(icebergConfig.destinationLowercaseTableNames()).thenReturn(true);
assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table_name"), icebergConsumer.mapDestination("Table_Name"));
assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table_name"), icebergConsumer.mapDestination("TABLE_NAME"));
assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table_name"), consumer.mapDestination("Table_Name"));
assertEquals(TableIdentifier.of(Namespace.of(namespace), "debeziumcdc_table_name"), consumer.mapDestination("TABLE_NAME"));
}

public static class TestProfile implements QuarkusTestProfile {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package io.debezium.server.iceberg;

import com.google.common.collect.Lists;
import io.debezium.server.iceberg.testresources.BaseSparkTest;
import io.debezium.server.iceberg.testresources.CatalogJdbc;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.SourcePostgresqlDB;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,13 @@

package io.debezium.server.iceberg;

import io.debezium.server.iceberg.testresources.BaseSparkTest;
import io.debezium.server.iceberg.testresources.CatalogJdbc;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.TestUtil;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.QuarkusTestProfile;
import io.quarkus.test.junit.TestProfile;
import jakarta.inject.Inject;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.Assertions;
Expand All @@ -37,10 +35,6 @@
@TestProfile(IcebergChangeConsumerUpsertDeleteDeletesTest.TestProfile.class)
public class IcebergChangeConsumerUpsertDeleteDeletesTest extends BaseSparkTest {

@Inject
IcebergChangeConsumer consumer;
@Inject
TestChangeEventFactory eventFactory;
final static Long TEST_EPOCH_MS = 1577840461000L;

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package io.debezium.server.iceberg;

import io.debezium.server.iceberg.testresources.BaseSparkTest;
import io.debezium.server.iceberg.testresources.CatalogJdbc;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.SourcePostgresqlDB;
Expand All @@ -17,7 +16,6 @@
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.QuarkusTestProfile;
import io.quarkus.test.junit.TestProfile;
import jakarta.inject.Inject;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.awaitility.Awaitility;
Expand All @@ -42,10 +40,6 @@
@TestProfile(IcebergChangeConsumerUpsertTest.TestProfile.class)
public class IcebergChangeConsumerUpsertTest extends BaseSparkTest {

@Inject
IcebergChangeConsumer consumer;
@Inject
TestChangeEventFactory eventFactory;
final static Long TEST_EPOCH_MS = 1577840461000L;

@Test
Expand Down
Loading

0 comments on commit e4fe838

Please sign in to comment.