diff --git a/sink-connector-lightweight/dependency-reduced-pom.xml b/sink-connector-lightweight/dependency-reduced-pom.xml
index 1fad20fce..946084de2 100644
--- a/sink-connector-lightweight/dependency-reduced-pom.xml
+++ b/sink-connector-lightweight/dependency-reduced-pom.xml
@@ -63,8 +63,6 @@
<exclude>**/*Test.java</exclude>
<exclude>**/*IT.java</exclude>
- <parallel>all</parallel>
- <threadCount>10</threadCount>
<name>listener</name>
diff --git a/sink-connector-lightweight/docker/config.yml b/sink-connector-lightweight/docker/config.yml
index 3448cbd28..decf85ecf 100644
--- a/sink-connector-lightweight/docker/config.yml
+++ b/sink-connector-lightweight/docker/config.yml
@@ -146,3 +146,6 @@ restart.event.loop.timeout.period.secs: "3000"
# ClickHouse JDBC configuration parameters, as a list of key-value pairs separated by commas.
#clickhouse.jdbc.params: "max_buffer_size=1000000,socket_timeout=10000"
+
+# Maximum number of threads in the thread pool for processing CDC records.
+#thread.pool.size: 10
\ No newline at end of file
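The new `thread.pool.size` knob maps directly onto the executor fan-out introduced in `DebeziumChangeEventCapture` below. A minimal sketch of that wiring, reusing names from this patch (`ClickHouseBatchExecutor`, `ClickHouseBatchRunnable`, `THREAD_POOL_SIZE`, `BUFFER_FLUSH_TIME`); the surrounding `config` and `records` objects are assumed to be in scope:

```java
// Sketch: schedule thread.pool.size workers, all draining one shared queue.
int poolSize = config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString());
ThreadFactory namedThreadFactory =
        new ThreadFactoryBuilder().setNameFormat("Sink Connector thread-pool-%d").build();
ClickHouseBatchExecutor executor = new ClickHouseBatchExecutor(poolSize, namedThreadFactory);
for (int i = 0; i < poolSize; i++) {
    // One runnable per thread; the ConcurrentLinkedQueue of batches is shared.
    executor.scheduleAtFixedRate(new ClickHouseBatchRunnable(records, config, new HashMap<>()),
            0, config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_FLUSH_TIME.toString()),
            TimeUnit.MILLISECONDS);
}
```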
diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java
index 0adf3f7fd..706c96ab1 100644
--- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java
+++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java
@@ -55,7 +55,7 @@ public class DebeziumChangeEventCapture {
private ClickHouseBatchRunnable runnable;
// Records grouped by Topic Name
- private ConcurrentLinkedQueue<List<ClickHouseStruct>> records;
+ private ConcurrentLinkedQueue<List<ClickHouseStruct>> records = new ConcurrentLinkedQueue<>();
private BaseDbWriter writer = null;
@@ -197,11 +197,11 @@ private ClickHouseStruct processEveryChangeRecord(Properties props, ChangeEvent<
performDDLOperation(DDL, props, sr, config);
- ThreadFactory namedThreadFactory =
- new ThreadFactoryBuilder().setNameFormat("Sink Connector thread-pool-%d").build();
- this.executor = new ClickHouseBatchExecutor(config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString()), namedThreadFactory);
-
- this.executor.scheduleAtFixedRate(this.runnable, 0, config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_FLUSH_TIME.toString()), TimeUnit.MILLISECONDS);
+ setupProcessingThread(config, new MySQLDDLParserService(config));
+// ThreadFactory namedThreadFactory =
+// new ThreadFactoryBuilder().setNameFormat("Sink Connector thread-pool-%d").build();
+// this.executor = new ClickHouseBatchExecutor(config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString()), namedThreadFactory);
+// this.executor.scheduleAtFixedRate(this.runnable, 0, config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_FLUSH_TIME.toString()), TimeUnit.MILLISECONDS);
}
} else {
@@ -673,12 +673,15 @@ DBCredentials parseDBConfiguration(ClickHouseSinkConnectorConfig config) {
private void setupProcessingThread(ClickHouseSinkConnectorConfig config, DDLParserService ddlParserService) {
// Setup separate thread to read messages from shared buffer.
- this.records = new ConcurrentLinkedQueue<>();
- this.runnable = new ClickHouseBatchRunnable(this.records, config, new HashMap());
+ // this.records = new ConcurrentLinkedQueue<>();
+ //this.runnable = new ClickHouseBatchRunnable(this.records, config, new HashMap());
ThreadFactory namedThreadFactory =
new ThreadFactoryBuilder().setNameFormat("Sink Connector thread-pool-%d").build();
this.executor = new ClickHouseBatchExecutor(config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString()), namedThreadFactory);
- this.executor.scheduleAtFixedRate(this.runnable, 0, config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_FLUSH_TIME.toString()), TimeUnit.MILLISECONDS);
+ for(int i = 0; i < config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString()); i++) {
+ this.executor.scheduleAtFixedRate(new ClickHouseBatchRunnable(this.records, config, new HashMap()), 0, config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_FLUSH_TIME.toString()), TimeUnit.MILLISECONDS);
+ }
+ //this.executor.scheduleAtFixedRate(this.runnable, 0, config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_FLUSH_TIME.toString()), TimeUnit.MILLISECONDS);
}
private void appendToRecords(List<ClickHouseStruct> convertedRecords) {
diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java
index de661179a..f12f75a19 100644
--- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java
+++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java
@@ -3,6 +3,7 @@
import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
import com.altinity.clickhouse.debezium.embedded.parser.DataTypeConverter;
import static com.altinity.clickhouse.sink.connector.db.ClickHouseDbConstants.*;
+import static org.apache.commons.lang3.StringUtils.containsIgnoreCase;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables;
@@ -279,7 +280,7 @@ private String getClickHouseDataType(String parsedDataType, ParseTree colDefTree
}
} // datetime(6)
else if(parsedDataType.contains("(") && parsedDataType.contains(")") &&
- (parsedDataType.contains("datetime") || parsedDataType.contains("timestamp"))){
+ (containsIgnoreCase(parsedDataType, "datetime") || containsIgnoreCase(parsedDataType, "timestamp"))){
try {
precision = Integer.parseInt(parsedDataType.substring(parsedDataType.indexOf("(") + 1, parsedDataType.indexOf(")")));
} catch(Exception e) {
@@ -471,6 +472,18 @@ public void enterAlterTable(MySqlParser.AlterTableContext alterTableContext) {
} else if (tree instanceof MySqlParser.AlterByModifyColumnContext) {
parseAlterTable(tree);
} else if (tree instanceof MySqlParser.AlterByDropColumnContext) {
+ // Drop Column.
+ this.query.append(" ");
+ for (ParseTree dropColumnTree : ((MySqlParser.AlterByDropColumnContext) (tree)).children) {
+ if (dropColumnTree instanceof MySqlParser.UidContext) {
+ for(ParseTree dropColumnChild: ((MySqlParser.UidContext) dropColumnTree).children) {
+ if(dropColumnChild instanceof MySqlParser.SimpleIdContext) {
+ this.query.append(String.format(Constants.DROP_COLUMN, dropColumnChild.getText()));
+ }
+ }
+ // this.query.append(String.format(Constants.DROP_COLUMN, ((MySqlParser.AlterByDropColumnContext) tree).uid()));
+ }
+ }
} else if (tree instanceof MySqlParser.AlterByRenameColumnContext) {
parseRenameColumn(tree);
} else if (tree instanceof MySqlParser.AlterByAddPrimaryKeyContext) {
@@ -524,15 +537,15 @@ private void parseTreeHelper(ParseTree child) {
}
}
- @Override
- public void enterAlterByDropColumn(MySqlParser.AlterByDropColumnContext alterByDropColumnContext) {
- this.query.append(" ");
- for (ParseTree tree : alterByDropColumnContext.children) {
- if (tree instanceof MySqlParser.UidContext) {
- this.query.append(String.format(Constants.DROP_COLUMN, tree.getText()));
- }
- }
- }
+// @Override
+// public void enterAlterByDropColumn(MySqlParser.AlterByDropColumnContext alterByDropColumnContext) {
+// this.query.append(" ");
+// for (ParseTree tree : alterByDropColumnContext.children) {
+// if (tree instanceof MySqlParser.UidContext) {
+// this.query.append(String.format(Constants.DROP_COLUMN, tree.getText()));
+// }
+// }
+// }
@Override
public void enterDropTable(MySqlParser.DropTableContext dropTableContext) {
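Handling `AlterByDropColumnContext` inside `enterAlterTable` (instead of the dedicated `enterAlterByDropColumn` listener commented out above) lets a single statement with several `DROP COLUMN` clauses emit one ClickHouse clause per column. A test-style sketch of the intended translation, mirroring the unit test added later in this patch; since MySQL and ClickHouse share this `ALTER TABLE ... DROP COLUMN` syntax, output equals input:

```java
// Sketch: multiple DROP COLUMN clauses should pass through one-to-one.
String sql = "ALTER TABLE t1 DROP COLUMN new_col1, DROP COLUMN new_col2";
StringBuffer clickHouseQuery = new StringBuffer();
mySQLDDLParserService.parseSql(sql, "", clickHouseQuery);
Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase(sql));
```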
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedPostgresPgoutputDockerIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedPostgresPgoutputDockerIT.java
index 9a4e00917..ccb6406e8 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedPostgresPgoutputDockerIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ClickHouseDebeziumEmbeddedPostgresPgoutputDockerIT.java
@@ -8,7 +8,6 @@
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
import com.clickhouse.jdbc.ClickHouseConnection;
import org.junit.Assert;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.testcontainers.Testcontainers;
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/BatchRetryOnFailureIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/BatchRetryOnFailureIT.java
new file mode 100644
index 000000000..3bddb86c1
--- /dev/null
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/BatchRetryOnFailureIT.java
@@ -0,0 +1,132 @@
+package com.altinity.clickhouse.debezium.embedded.cdc;
+
+import com.altinity.clickhouse.debezium.embedded.AppInjector;
+import com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication;
+import com.altinity.clickhouse.debezium.embedded.ITCommon;
+import com.altinity.clickhouse.debezium.embedded.api.DebeziumEmbeddedRestApi;
+import com.altinity.clickhouse.debezium.embedded.common.PropertiesHelper;
+import com.altinity.clickhouse.debezium.embedded.config.ConfigLoader;
+import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLParserService;
+import com.altinity.clickhouse.debezium.embedded.parser.DebeziumRecordParserService;
+import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
+import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
+import com.clickhouse.jdbc.ClickHouseConnection;
+import com.github.dockerjava.zerodep.shaded.org.apache.hc.client5.http.classic.methods.HttpGet;
+import com.github.dockerjava.zerodep.shaded.org.apache.hc.client5.http.classic.methods.HttpUriRequest;
+import com.github.dockerjava.zerodep.shaded.org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
+import com.github.dockerjava.zerodep.shaded.org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
+import com.github.dockerjava.zerodep.shaded.org.apache.hc.core5.http.HttpEntity;
+import com.github.dockerjava.zerodep.shaded.org.apache.hc.core5.http.io.entity.EntityUtils;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.log4j.BasicConfigurator;
+import org.junit.Assert;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.clickhouse.ClickHouseContainer;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.util.HashMap;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static com.altinity.clickhouse.debezium.embedded.ITCommon.getDebeziumProperties;
+
+@Testcontainers
+@DisplayName("Test that validates if the sink connector retries batches on ClickHouse failure")
+public class BatchRetryOnFailureIT {
+ protected MySQLContainer mySqlContainer;
+
+ @Container
+ public static ClickHouseContainer clickHouseContainer = new ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:latest")
+ .asCompatibleSubstituteFor("clickhouse"))
+ .withInitScript("init_clickhouse_schema_only_column_timezone.sql")
+ // .withCopyFileToContainer(MountableFile.forClasspathResource("config.xml"), "/etc/clickhouse-server/config.d/config.xml")
+ .withUsername("ch_user")
+ .withPassword("password")
+ .withExposedPorts(8123);
+ @BeforeEach
+ public void startContainers() throws InterruptedException {
+ mySqlContainer = new MySQLContainer<>(DockerImageName.parse("docker.io/bitnami/mysql:latest")
+ .asCompatibleSubstituteFor("mysql"))
+ .withDatabaseName("employees").withUsername("root").withPassword("adminpass")
+ .withInitScript("datetime.sql")
+ .withExtraHost("mysql-server", "0.0.0.0")
+ .waitingFor(new HttpWaitStrategy().forPort(3306));
+
+ BasicConfigurator.configure();
+ mySqlContainer.start();
+ clickHouseContainer.start();
+ Thread.sleep(15000);
+ }
+
+ @Test
+ public void testBatchRetryOnCHFailure() throws Exception {
+
+ Injector injector = Guice.createInjector(new AppInjector());
+
+ Properties props = getDebeziumProperties(mySqlContainer, clickHouseContainer);
+ // Override clickhouse server timezone.
+ ClickHouseDebeziumEmbeddedApplication clickHouseDebeziumEmbeddedApplication = new ClickHouseDebeziumEmbeddedApplication();
+
+
+ ExecutorService executorService = Executors.newFixedThreadPool(1);
+ executorService.execute(() -> {
+ try {
+ clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
+ injector.getInstance(DDLParserService.class), props, false);
+ DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture()
+ , new Properties());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ });
+ Connection conn = ITCommon.connectToMySQL(mySqlContainer);
+ conn.prepareStatement("INSERT INTO `temporal_types_DATETIME` VALUES ('DATETIME-INSERT','1000-01-01 00:00:00','2022-09-29 01:47:46','9999-12-31 23:59:59','9999-12-31 23:59:59');\n").execute();
+
+ Thread.sleep(40000);//
+ Thread.sleep(10000);
+
+
+ // Pause clickhouse container to simulate a batch failure.
+ clickHouseContainer.getDockerClient().pauseContainerCmd(clickHouseContainer.getContainerId()).exec();
+ conn.prepareStatement("INSERT INTO `temporal_types_DATETIME` VALUES ('DATETIME-INSERT55','1000-01-01 00:00:00','2022-09-29 01:47:46','9999-12-31 23:59:59','9999-12-31 23:59:59');\n").execute();
+
+ Thread.sleep(50000);
+
+ clickHouseContainer.getDockerClient().unpauseContainerCmd(clickHouseContainer.getContainerId()).exec();
+ Thread.sleep(10000);
+
+
+ // Check if Batch was inserted.
+ String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
+ "employees");
+ ClickHouseConnection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1",
+ clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>()));
+
+ BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
+ "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn);
+
+ ResultSet dateTimeResult = writer.executeQueryWithResultSet("select * from temporal_types_DATETIME where Type = 'DATETIME-INSERT55'");
+ boolean insertCheck = false;
+ while(dateTimeResult.next()) {
+ insertCheck = true;
+ Assert.assertTrue(dateTimeResult.getString("Type").equalsIgnoreCase("DATETIME-INSERT55"));
+ }
+ Assert.assertTrue(insertCheck);
+
+
+ // Close connection.
+ clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture().stop();
+ }
+}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/ClickHouseDelayedStartIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/ClickHouseDelayedStartIT.java
new file mode 100644
index 000000000..58d1e02f6
--- /dev/null
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/ClickHouseDelayedStartIT.java
@@ -0,0 +1,127 @@
+package com.altinity.clickhouse.debezium.embedded.cdc;
+
+import com.altinity.clickhouse.debezium.embedded.AppInjector;
+import com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication;
+import com.altinity.clickhouse.debezium.embedded.ITCommon;
+import com.altinity.clickhouse.debezium.embedded.api.DebeziumEmbeddedRestApi;
+import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLParserService;
+import com.altinity.clickhouse.debezium.embedded.parser.DebeziumRecordParserService;
+import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
+import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
+import com.clickhouse.jdbc.ClickHouseConnection;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.log4j.BasicConfigurator;
+import org.junit.Assert;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.clickhouse.ClickHouseContainer;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.util.HashMap;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static com.altinity.clickhouse.debezium.embedded.ITCommon.getDebeziumProperties;
+import static com.altinity.clickhouse.debezium.embedded.ITCommon.getDebeziumPropertiesForSchemaOnly;
+
+@Testcontainers
+@Disabled
+@DisplayName("Test that validates if the sink connector does not have issues connecting if ClickHouse is slow to start")
+public class ClickHouseDelayedStartIT {
+ protected MySQLContainer mySqlContainer;
+
+ @Container
+ public static ClickHouseContainer clickHouseContainer = new ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:latest")
+ .asCompatibleSubstituteFor("clickhouse"))
+ .withInitScript("init_clickhouse_schema_only_column_timezone.sql")
+ // .withCopyFileToContainer(MountableFile.forClasspathResource("config.xml"), "/etc/clickhouse-server/config.d/config.xml")
+ .withUsername("ch_user")
+ .withPassword("password")
+ .withExposedPorts(8123);
+ @BeforeEach
+ public void startContainers() throws InterruptedException {
+ mySqlContainer = new MySQLContainer<>(DockerImageName.parse("docker.io/bitnami/mysql:latest")
+ .asCompatibleSubstituteFor("mysql"))
+ .withDatabaseName("employees").withUsername("root").withPassword("adminpass")
+ .withInitScript("datetime.sql")
+ .withExtraHost("mysql-server", "0.0.0.0")
+ .waitingFor(new HttpWaitStrategy().forPort(3306));
+
+ BasicConfigurator.configure();
+ mySqlContainer.start();
+ clickHouseContainer.start();
+ Thread.sleep(15000);
+ }
+
+ @Test
+ public void testClickHouseDelayedStart() throws Exception {
+ clickHouseContainer.getDockerClient().pauseContainerCmd(clickHouseContainer.getContainerId()).exec();
+
+ Injector injector = Guice.createInjector(new AppInjector());
+
+ Properties props = getDebeziumPropertiesForSchemaOnly(mySqlContainer, clickHouseContainer);
+ // Override clickhouse server timezone.
+ ClickHouseDebeziumEmbeddedApplication clickHouseDebeziumEmbeddedApplication = new ClickHouseDebeziumEmbeddedApplication();
+
+ ExecutorService executorService = Executors.newFixedThreadPool(1);
+ executorService.execute(() -> {
+ try {
+ clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
+ injector.getInstance(DDLParserService.class), props, false);
+ DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture()
+ , new Properties());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ });
+ Connection conn = ITCommon.connectToMySQL(mySqlContainer);
+ conn.prepareStatement("INSERT INTO `temporal_types_DATETIME` VALUES ('DATETIME-INSERT','1000-01-01 00:00:00','2022-09-29 01:47:46','9999-12-31 23:59:59','9999-12-31 23:59:59');\n").execute();
+ conn.prepareStatement("INSERT INTO `temporal_types_DATETIME` VALUES ('DATETIME-INSERT55','1000-01-01 00:00:00','2022-09-29 01:47:46','9999-12-31 23:59:59','9999-12-31 23:59:59');\n").execute();
+ Thread.sleep(35000);
+ // Give time for Sink connector to catch up.
+ Thread.sleep(15000);
+ clickHouseContainer.getDockerClient().unpauseContainerCmd(clickHouseContainer.getContainerId()).exec();
+ boolean insertCheck = false;
+
+ while (true) {
+ // Check if Batch was inserted.
+ String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
+ "employees");
+ ClickHouseConnection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1",
+ clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>()));
+
+ BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
+ "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn);
+ try {
+ ResultSet dateTimeResult = writer.executeQueryWithResultSet("select * from temporal_types_DATETIME where Type = 'DATETIME-INSERT55'");
+ while (dateTimeResult.next()) {
+ insertCheck = true;
+ Assert.assertTrue(dateTimeResult.getString("Type").equalsIgnoreCase("DATETIME-INSERT55"));
+ break;
+ }
+ } catch (Exception e) {
+ Thread.sleep(10000);
+ }
+ if (insertCheck) {
+ break;
+ }
+ }
+ Assert.assertTrue(clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture().numRetries > 0);
+ Assert.assertTrue(insertCheck);
+
+ // Close connection.
+ clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture().stop();
+ }
+
+}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/client/SinkConnectorClientRestAPITest.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/client/SinkConnectorClientRestAPITest.java
index 2ccb89b58..6cddf159d 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/client/SinkConnectorClientRestAPITest.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/client/SinkConnectorClientRestAPITest.java
@@ -2,14 +2,10 @@
import com.altinity.clickhouse.debezium.embedded.AppInjector;
import com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication;
+import com.altinity.clickhouse.debezium.embedded.ITCommon;
import com.altinity.clickhouse.debezium.embedded.api.DebeziumEmbeddedRestApi;
-import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
-import com.altinity.clickhouse.debezium.embedded.common.PropertiesHelper;
-import com.altinity.clickhouse.debezium.embedded.config.ConfigLoader;
import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLParserService;
-import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService;
import com.altinity.clickhouse.debezium.embedded.parser.DebeziumRecordParserService;
-import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
import com.clickhouse.jdbc.ClickHouseConnection;
@@ -22,8 +18,6 @@
import com.google.inject.Guice;
import com.google.inject.Injector;
import org.apache.log4j.BasicConfigurator;
-import org.json.simple.JSONArray;
-import org.json.simple.parser.JSONParser;
import org.junit.Assert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
@@ -36,16 +30,12 @@
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
-import java.net.http.HttpResponse;
import java.sql.Connection;
-import java.sql.DriverManager;
import java.sql.ResultSet;
-import java.sql.SQLException;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicReference;
@Testcontainers
@DisplayName("Test that validates the REST API calls used by sink connector client")
@@ -76,67 +66,21 @@ public void startContainers() throws InterruptedException {
Thread.sleep(15000);
}
-
- protected Properties getDebeziumProperties() throws Exception {
-
- // Start the debezium embedded application.
-
- Properties defaultProps = new Properties();
- Properties defaultProperties = PropertiesHelper.getProperties("config.properties");
-
- defaultProps.putAll(defaultProperties);
- Properties fileProps = new ConfigLoader().load("config.yml");
- defaultProps.putAll(fileProps);
-
- // **** OVERRIDE set to schema only
- defaultProps.setProperty("snapshot.mode", "schema_only");
- defaultProps.setProperty("disable.drop.truncate", "true");
-
- defaultProps.setProperty("database.hostname", mySqlContainer.getHost());
- defaultProps.setProperty("database.port", String.valueOf(mySqlContainer.getFirstMappedPort()));
- defaultProps.setProperty("database.user", "root");
- defaultProps.setProperty("database.password", "adminpass");
-
- defaultProps.setProperty("clickhouse.server.url", clickHouseContainer.getHost());
- defaultProps.setProperty("clickhouse.server.port", String.valueOf(clickHouseContainer.getFirstMappedPort()));
- defaultProps.setProperty("clickhouse.server.user", clickHouseContainer.getUsername());
- defaultProps.setProperty("clickhouse.server.password", clickHouseContainer.getPassword());
- defaultProps.setProperty("clickhouse.server.database", "employees");
-
- defaultProps.setProperty("offset.storage.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
- clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));
-
- defaultProps.setProperty("schema.history.internal.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
- clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));
-
- defaultProps.setProperty("offset.storage.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
- clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));
-
- defaultProps.setProperty("schema.history.internal.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
- clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));
-
-
- return defaultProps;
-
- }
@Test
@Disabled
public void testRestClient() throws Exception {
- AtomicReference<DebeziumChangeEventCapture> engine = new AtomicReference<>();
-
Injector injector = Guice.createInjector(new AppInjector());
- Properties props = getDebeziumProperties();
+ Properties props = ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer);
props.setProperty("database.include.list", "datatypes");
props.setProperty("clickhouse.server.database", "datatypes");
// Override clickhouse server timezone.
+ ClickHouseDebeziumEmbeddedApplication clickHouseDebeziumEmbeddedApplication = new ClickHouseDebeziumEmbeddedApplication();
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
- ClickHouseDebeziumEmbeddedApplication clickHouseDebeziumEmbeddedApplication = new ClickHouseDebeziumEmbeddedApplication();
-
try {
clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
injector.getInstance(DDLParserService.class), props, false);
@@ -150,11 +94,7 @@ public void testRestClient() throws Exception {
Thread.sleep(40000);//
- Connection conn = connectToMySQL();
- // alter table ship_class change column class_name class_name_new int;
- // alter table ship_class change column tonange tonange_new decimal(10,10);
-
- //conn.prepareStatement("insert into dt values('2008-01-01 00:00:01', 'this is a test', 11, 1)").execute();
+ Connection conn = ITCommon.connectToMySQL(mySqlContainer);
conn.prepareStatement("INSERT INTO `temporal_types_DATETIME` VALUES ('DATETIME-INSERT','1000-01-01 00:00:00','2022-09-29 01:47:46','9999-12-31 23:59:59','9999-12-31 23:59:59');\n").execute();
@@ -207,29 +147,7 @@ public void testRestClient() throws Exception {
Assert.fail("There should be a respond body.");
}
- // Validate the stop call.
- if(engine.get() != null) {
- engine.get().stop();
- }
- // Files.deleteIfExists(tmpFilePath);
- executorService.shutdown();
-
-
- }
- Connection connectToMySQL() {
- Connection conn = null;
- try {
-
- String connectionUrl = String.format("jdbc:mysql://%s:%s/%s?user=%s&password=%s", mySqlContainer.getHost(), mySqlContainer.getFirstMappedPort(),
- mySqlContainer.getDatabaseName(), mySqlContainer.getUsername(), mySqlContainer.getPassword());
- conn = DriverManager.getConnection(connectionUrl);
-
-
- } catch (SQLException ex) {
- // handle any errors
-
- }
+ clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture().stop();
- return conn;
}
}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/db/batch/BatchMultipleThreadsProcessingTest.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/db/batch/BatchMultipleThreadsProcessingTest.java
new file mode 100644
index 000000000..22f943402
--- /dev/null
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/db/batch/BatchMultipleThreadsProcessingTest.java
@@ -0,0 +1,50 @@
+package com.altinity.clickhouse.debezium.embedded.db.batch;
+
+import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
+import com.altinity.clickhouse.sink.connector.converters.ClickHouseConverter;
+import com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable;
+import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.junit.Assert;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+
+public class BatchMultipleThreadsProcessingTest {
+
+
+ @Test
+ @DisplayName("Integration test to test the batch multiple threads processing.")
+ public void testBatchMultipleThreadsProcessing() {
+ // Create the concurrentLinkedQueue of ClickHouseStruct
+ ClickHouseStruct ch1 = new ClickHouseStruct(10, "topic_1", getKafkaStruct(), 2, System.currentTimeMillis(), null, getKafkaStruct(), null,
+ ClickHouseConverter.CDC_OPERATION.CREATE);
+ // Add the ClickHouseStruct to the concurrentLinkedQueue
+
+ // Create the ClickHouseBatchRunnable object
+ //ClickHouseBatchRunnable clickHouseBatchRunnable = new ClickHouseBatchRunnable();
+ // Set the concurrentLinkedQueue to the ClickHouseBatchRunnable object
+
+ }
+
+ public static Struct getKafkaStruct() {
+ Schema kafkaConnectSchema = SchemaBuilder
+ .struct()
+ .field("first_name", Schema.STRING_SCHEMA)
+ .field("last_name", Schema.STRING_SCHEMA)
+ .field("quantity", Schema.INT32_SCHEMA)
+ .field("amount", Schema.FLOAT64_SCHEMA)
+ .field("employed", Schema.BOOLEAN_SCHEMA)
+ .build();
+
+ Struct kafkaConnectStruct = new Struct(kafkaConnectSchema);
+ kafkaConnectStruct.put("first_name", "John");
+ kafkaConnectStruct.put("last_name", "Doe");
+ kafkaConnectStruct.put("quantity", 100);
+ kafkaConnectStruct.put("amount", 23.223);
+ kafkaConnectStruct.put("employed", true);
+
+
+ return kafkaConnectStruct;
+ }
+}
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneSchemaOnlyIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneSchemaOnlyIT.java
index 4ed62f0d0..17875ce43 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneSchemaOnlyIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DateTimeWithTimeZoneSchemaOnlyIT.java
@@ -31,7 +31,7 @@
import java.util.concurrent.atomic.AtomicReference;
@Testcontainers
-@DisplayName("Integration Test that validates replication of DateTime columns when the clickhouse schema has columns with timezone.")
+@DisplayName("Integration Test that validates replication of DateTime columns when the clickhouse schema has columns with timezone. Test to cover when MySQL has 2 digit year")
public class DateTimeWithTimeZoneSchemaOnlyIT {
protected MySQLContainer mySqlContainer;
@@ -92,7 +92,9 @@ public void testCreateTable() throws Exception {
conn.prepareStatement("INSERT INTO `temporal_types_DATETIME4` VALUES ('DATETIME(4)-INSERT','1000-01-01 00:00:00.0000','2022-09-29 01:50:12.1234','9999-12-31 23:59:59.9999',NULL)").execute();
conn.prepareStatement("INSERT INTO `temporal_types_DATETIME5` VALUES ('DATETIME(5)-INSERT','1000-01-01 00:00:00.00000','2022-09-29 01:50:28.12345','9999-12-31 23:59:59.99999',NULL)").execute();
conn.prepareStatement("INSERT INTO `temporal_types_DATETIME6` VALUES ('DATETIME(6)-INSERT','1000-01-01 00:00:00.000000','2022-09-29 01:50:56.123456','9999-12-31 23:59:59.999999',NULL)").execute();
- //conn.prepareStatement("INSERT INTO `temporal_types_DATETIME` VALUES ('DATETIME-INSERT','1000-01-01 00:00:00','2022-09-29 01:47:46','9999-12-31 23:59:59',NULL);\n").execute();
+ conn.prepareStatement("INSERT INTO `temporal_types_DATETIME6` VALUES ('DATETIME(7)-INSERT','0099-01-01 00:00:00.000000','2022-09-29 01:50:56.123456','9999-12-31 23:59:59.999999',NULL)").execute();
+
+ //conn.prepareStatement("INSERT INTO `temporal_types_DATETIME` VALUES ('DATETIME-INSERT','1000-01-01 00:00:00','2022-09-29 01:47:46','9999-12-31 23:59:59',NULL);\n").execute();
String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "employees");
ClickHouseConnection conn1 = BaseDbWriter.createConnection(jdbcUrl, "client_1", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>()));
@@ -209,7 +211,7 @@ public void testCreateTable() throws Exception {
}
// DATETIME6
- ResultSet dateTimeResult6 = writer.executeQueryWithResultSet("select * from employees.temporal_types_DATETIME6");
+ ResultSet dateTimeResult6 = writer.executeQueryWithResultSet("select * from employees.temporal_types_DATETIME6 where Type = 'DATETIME(6)-INSERT'");
while(dateTimeResult6.next()) {
System.out.println("DATE TIME 6");
@@ -220,7 +222,22 @@ public void testCreateTable() throws Exception {
Assert.assertTrue(dateTimeResult6.getTimestamp("Mid_Value").toString().equalsIgnoreCase("2022-09-28 20:50:56.123456"));
Assert.assertTrue(dateTimeResult6.getTimestamp("Maximum_Value").toString().equalsIgnoreCase("2299-12-31 17:59:59.999999"));
Assert.assertTrue(dateTimeResult6.getTimestamp("Minimum_Value").toString().equalsIgnoreCase("1900-01-01 18:00:00.0"));
+ break;
+ }
+
+ // DATETIME6 with 2 digit year
+ ResultSet dateTimeResult7 = writer.executeQueryWithResultSet("select * from employees.temporal_types_DATETIME6 where Type = 'DATETIME(7)-INSERT'");
+ while(dateTimeResult7.next()) {
+ System.out.println("DATE TIME 7");
+
+ System.out.println(dateTimeResult7.getTimestamp("Mid_Value").toString());
+ System.out.println(dateTimeResult7.getTimestamp("Maximum_Value").toString());
+ System.out.println(dateTimeResult7.getTimestamp("Minimum_Value").toString());
+ Assert.assertTrue(dateTimeResult7.getTimestamp("Mid_Value").toString().equalsIgnoreCase("2022-09-28 20:50:56.123456"));
+ Assert.assertTrue(dateTimeResult7.getTimestamp("Maximum_Value").toString().equalsIgnoreCase("2299-12-31 17:59:59.999999"));
+ Assert.assertTrue(dateTimeResult7.getTimestamp("Minimum_Value").toString().equalsIgnoreCase("1900-01-01 18:00:00.0"));
+ break;
}
if(engine.get() != null) {
@@ -272,6 +289,7 @@ protected Properties getDebeziumProperties() throws Exception {
defaultProps.setProperty("schema.history.internal.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));
+ defaultProps.setProperty("enable.time.adjuster", "false");
return defaultProps;
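Debezium's MySQL connector normally expands 2-digit years into the 1970–2069 window (`enable.time.adjuster` defaults to `true`), which would silently rewrite the `0099-01-01` value exercised above. Disabling the adjuster is what lets that year reach ClickHouse unmodified; a minimal sketch of the override, in the same property style the test uses:

```java
// Sketch: keep low/2-digit years as stored in MySQL instead of letting
// Debezium remap them into the 1970-2069 window.
Properties props = new Properties();
props.setProperty("enable.time.adjuster", "false");
```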
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java
index a103d27fb..e8cd71e93 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java
@@ -122,7 +122,6 @@ public void testDateTimeColumns() {
StringBuffer clickHouseQuery = new StringBuffer();
mySQLDDLParserService.parseSql(createQuery6, "Persons", clickHouseQuery);
Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase("CREATE TABLE `temporal_types_DATETIME4`(`Type` String NOT NULL ,`Minimum_Value` DateTime64(6, 0) NOT NULL ,`Mid_Value` DateTime64(6, 0) NOT NULL ,`Maximum_Value` DateTime64(6, 0) NOT NULL ,`Null_Value` Nullable(DateTime64(6, 0)),`_version` UInt64,`is_deleted` UInt8) Engine=ReplacingMergeTree(_version,is_deleted) ORDER BY (`Type`)"));
- log.info("Create table " + clickHouseQuery);
String createQuery1 = "CREATE TABLE `temporal_types_DATETIME4` (\n" +
" `Type` varchar(50) NOT NULL,\n" +
@@ -134,8 +133,7 @@ public void testDateTimeColumns() {
") ENGINE=InnoDB DEFAULT CHARSET=latin1;";
StringBuffer clickHouseQuery1 = new StringBuffer();
mySQLDDLParserService.parseSql(createQuery1, "Persons", clickHouseQuery1);
- //Assert.assertTrue(clickHouseQuery1.toString().equalsIgnoreCase("CREATE TABLE `temporal_types_DATETIME4`(`Type` String NOT NULL ,`Minimum_Value` DateTime64(1, 0) NOT NULL ,`Mid_Value` DateTime64(1, 0) NOT NULL ,`Maximum_Value` DateTime64(1, 0) NOT NULL ,`Null_Value` Nullable(DateTime64(1, 0)),`_version` UInt64,`is_deleted` UInt8) Engine=ReplacingMergeTree(_version,is_deleted) ORDER BY (`Type`)"));
- log.info("Create table " + clickHouseQuery1);
+ Assert.assertTrue(clickHouseQuery1.toString().equalsIgnoreCase("CREATE TABLE `temporal_types_DATETIME4`(`Type` String NOT NULL ,`Minimum_Value` DateTime64(1, 0) NOT NULL ,`Mid_Value` DateTime64(1, 0) NOT NULL ,`Maximum_Value` DateTime64(1, 0) NOT NULL ,`Null_Value` Nullable(DateTime64(1, 0)),`_version` UInt64,`is_deleted` UInt8) Engine=ReplacingMergeTree(_version,is_deleted) ORDER BY (`Type`)"));
String createQuery2 = "CREATE TABLE `temporal_types_DATETIME4` (\n" +
" `Type` varchar(50) NOT NULL,\n" +
@@ -147,8 +145,26 @@ public void testDateTimeColumns() {
") ENGINE=InnoDB DEFAULT CHARSET=latin1;";
StringBuffer clickHouseQuery2 = new StringBuffer();
mySQLDDLParserService.parseSql(createQuery2, "Persons", clickHouseQuery2);
- //Assert.assertTrue(clickHouseQuery1.toString().equalsIgnoreCase("CREATE TABLE `temporal_types_DATETIME4`(`Type` String NOT NULL ,`Minimum_Value` DateTime64(2, 0) NOT NULL ,`Mid_Value` DateTime64(2, 0) NOT NULL ,`Maximum_Value` DateTime64(2, 0) NOT NULL ,`Null_Value` Nullable(DateTime64(2, 0)),`_version` UInt64,`is_deleted` UInt8) Engine=ReplacingMergeTree(_version,is_deleted) ORDER BY (`Type`)"));
- log.info("Create table " + clickHouseQuery2);
+ Assert.assertTrue(clickHouseQuery2.toString().equalsIgnoreCase("CREATE TABLE `temporal_types_DATETIME4`(`Type` String NOT NULL ,`Minimum_Value` DateTime64(2, 0) NOT NULL ,`Mid_Value` DateTime64(2, 0) NOT NULL ,`Maximum_Value` DateTime64(2, 0) NOT NULL ,`Null_Value` Nullable(DateTime64(2, 0)),`_version` UInt64,`is_deleted` UInt8) Engine=ReplacingMergeTree(_version,is_deleted) ORDER BY (`Type`)"));
+
+
+ }
+
+ @Test
+ @DisplayName("Test DateTime precision/scale conversion for tables with Primary Key")
+ public void testDateTimeColumnsWithPrimaryKey() {
+
+ // DateTime(3) with Primary Key.
+ String createQuery3 = "CREATE TABLE table_1 (id INT NOT NULL PRIMARY KEY, data DATETIME(3))";
+ StringBuffer clickHouseQuery3 = new StringBuffer();
+ mySQLDDLParserService.parseSql(createQuery3, "Persons", clickHouseQuery3);
+ Assert.assertTrue(clickHouseQuery3.toString().equalsIgnoreCase("CREATE TABLE table_1(id Int32 NOT NULL ,data Nullable(DateTime64(3, 0)),`_version` UInt64,`is_deleted` UInt8) Engine=ReplacingMergeTree(_version,is_deleted) ORDER BY id"));
+
+ // DateTime(4) with Primary Key
+ String createQuery4 = "CREATE TABLE table_1 (id INT NOT NULL PRIMARY KEY, data DATETIME(4))";
+ StringBuffer clickHouseQuery4 = new StringBuffer();
+ mySQLDDLParserService.parseSql(createQuery4, "Persons", clickHouseQuery4);
+ Assert.assertTrue(clickHouseQuery4.toString().equalsIgnoreCase("CREATE TABLE table_1(id Int32 NOT NULL ,data Nullable(DateTime64(4, 0)),`_version` UInt64,`is_deleted` UInt8) Engine=ReplacingMergeTree(_version,is_deleted) ORDER BY id"));
}
@Test
@@ -172,6 +188,27 @@ public void testAutoCreateTableWithCHTimezone() {
log.info("Create table " + clickHouseQuery);
}
+ @Test
+ @DisplayName("Auto create table with user provided clickhouse timezone and uppercase datetime columns")
+ public void testAutoCreateTableWithCHTimezoneUpperCaseDateTime() {
+ String createQuery6 = "CREATE TABLE `temporal_types_DATETIME4` (\n" +
+ " `Type` varchar(50) NOT NULL,\n" +
+ " `Minimum_Value` DATETIME(1) NOT NULL,\n" +
+ " `Mid_Value` DATETIME(2) NOT NULL,\n" +
+ " `Maximum_Value` DATETIME(3) NOT NULL,\n" +
+ " `Null_Value` DATETIME(4) DEFAULT NULL,\n" +
+ " PRIMARY KEY (`Type`)\n" +
+ ") ENGINE=InnoDB DEFAULT CHARSET=latin1;";
+ StringBuffer clickHouseQuery = new StringBuffer();
+ HashMap props = new HashMap<>();
+ props.put(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_DATETIME_TIMEZONE.toString(), "UTC");
+
+ MySQLDDLParserService mySQLDDLParserService1 = new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(props));
+ mySQLDDLParserService1.parseSql(createQuery6, "Persons", clickHouseQuery);
+ Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase("CREATE TABLE `temporal_types_DATETIME4`(`Type` String NOT NULL ,`Minimum_Value` DateTime64(1,'UTC') NOT NULL ,`Mid_Value` DateTime64(2,'UTC') NOT NULL ,`Maximum_Value` DateTime64(3,'UTC') NOT NULL ,`Null_Value` Nullable(DateTime64(4,'UTC')),`_version` UInt64,`is_deleted` UInt8) Engine=ReplacingMergeTree(_version,is_deleted) ORDER BY (`Type`)"));
+ log.info("Create table " + clickHouseQuery);
+ }
+
@Test
public void testCreateTableAutoIncrement() {
StringBuffer clickHouseQuery = new StringBuffer();
@@ -567,6 +604,14 @@ public void testDropColumn() {
mySQLDDLParserService.parseSql(sql, "", clickHouseQuery);
Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase(sql));
+
+ String multipleDropColumnsSql = "ALTER TABLE fffe3e80f_d197_11ee_836a_19710b02e0b5 DROP COLUMN new_col1, DROP COLUMN new_col2, DROP COLUMN new_col3";
+
+ StringBuffer multipleDropColumnCHQuery = new StringBuffer();
+ mySQLDDLParserService.parseSql(multipleDropColumnsSql, "", multipleDropColumnCHQuery);
+
+ Assert.assertTrue(multipleDropColumnCHQuery.toString().equalsIgnoreCase(multipleDropColumnsSql));
+
}
@Test
@@ -647,6 +692,24 @@ public void checkIfTimestampDataTypePrecisionIsMaintained(String sql, String exp
}
+ @ParameterizedTest
+ @CsvSource(
+ value = {"CREATE TABLE temporal_types_TIMESTAMP1(`Mid_Value` TIMESTAMP(1) NOT NULL) ENGINE=InnoDB;: CREATE TABLE temporal_types_TIMESTAMP1(`Mid_Value` DateTime64(1, 0) NOT NULL ,`_version` UInt64,`is_deleted` UInt8) Engine=ReplacingMergeTree(_version,is_deleted) ORDER BY tuple()",
+ "CREATE TABLE temporal_types_TIMESTAMP2(`Mid_Value` TIMESTAMP(2) NOT NULL) ENGINE=InnoDB;: CREATE TABLE temporal_types_TIMESTAMP2(`Mid_Value` DateTime64(2, 0) NOT NULL ,`_version` UInt64,`is_deleted` UInt8) Engine=ReplacingMergeTree(_version,is_deleted) ORDER BY tuple()",
+ "CREATE TABLE temporal_types_TIMESTAMP3(`Mid_Value` TIMESTAMP(3) NOT NULL) ENGINE=InnoDB;: CREATE TABLE temporal_types_TIMESTAMP3(`Mid_Value` DateTime64(3, 0) NOT NULL ,`_version` UInt64,`is_deleted` UInt8) Engine=ReplacingMergeTree(_version,is_deleted) ORDER BY tuple()",
+ "CREATE TABLE temporal_types_TIMESTAMP4(`Mid_Value` TIMESTAMP(4) NOT NULL) ENGINE=InnoDB;: CREATE TABLE temporal_types_TIMESTAMP4(`Mid_Value` DateTime64(4, 0) NOT NULL ,`_version` UInt64,`is_deleted` UInt8) Engine=ReplacingMergeTree(_version,is_deleted) ORDER BY tuple()",
+ "CREATE TABLE temporal_types_TIMESTAMP5(`Mid_Value` TIMESTAMP(5) NOT NULL) ENGINE=InnoDB;: CREATE TABLE temporal_types_TIMESTAMP5(`Mid_Value` DateTime64(5, 0) NOT NULL ,`_version` UInt64,`is_deleted` UInt8) Engine=ReplacingMergeTree(_version,is_deleted) ORDER BY tuple()",
+ "CREATE TABLE temporal_types_TIMESTAMP6(`Mid_Value` TIMESTAMP(6) NOT NULL) ENGINE=InnoDB;: CREATE TABLE temporal_types_TIMESTAMP6(`Mid_Value` DateTime64(6, 0) NOT NULL ,`_version` UInt64,`is_deleted` UInt8) Engine=ReplacingMergeTree(_version,is_deleted) ORDER BY tuple()"}
+ ,delimiter = ':')
+ @DisplayName("Test to validate if the timestamp data type precision(uppercase timestamp is maintained from MySQL to ClickHouse")
+ public void checkIfTimestampDataTypeUpperCasePrecisionIsMaintained(String sql, String expectedResult) {
+ StringBuffer clickHouseQuery = new StringBuffer();
+
+ AtomicBoolean isDropOrTruncate = new AtomicBoolean();
+ mySQLDDLParserService.parseSql(sql, "", clickHouseQuery, isDropOrTruncate);
+ Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase(expectedResult));
+
+ }
@Test
public void testAlterDatabaseAddColumnEnum() {
String clickhouseExpectedQuery = "ALTER TABLE employees ADD COLUMN gender String";
diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/TableOperationsIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/TableOperationsIT.java
index df6579f5a..23174ba3a 100644
--- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/TableOperationsIT.java
+++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/TableOperationsIT.java
@@ -80,7 +80,7 @@ public void testTableOperations(String clickHouseServerVersion) throws Exception
});
- Thread.sleep(10000);
+ Thread.sleep(20000);
Connection conn = ITCommon.connectToMySQL(mySqlContainer);
conn.prepareStatement("RENAME TABLE ship_class to ship_class_new, add_test to add_test_new").execute();
diff --git a/sink-connector/python/Dockerfile b/sink-connector/python/Dockerfile
deleted file mode 100644
index 05f936f72..000000000
--- a/sink-connector/python/Dockerfile
+++ /dev/null
@@ -1,4 +0,0 @@
-FROM clickhouse/clickhouse-server
-
-COPY db_load/* /
-RUN tar xvfz mysql-shell-8.0.32-linux-glibc2.12-x86-64bit.tar.gz
diff --git a/sink-connector/python/Dockerfile_clickhouse_checksum b/sink-connector/python/Dockerfile_clickhouse_checksum
new file mode 100644
index 000000000..060d19c35
--- /dev/null
+++ b/sink-connector/python/Dockerfile_clickhouse_checksum
@@ -0,0 +1,22 @@
+# Dockerfile that packages the db and db_compare folders for the ClickHouse checksum tool.
+# docker run -e CLICKHOUSE_HOST=clickhouse_host_value -e CLICKHOUSE_DATABASE=clickhouse_database_value -e CLICKHOUSE_USER=clickhouse_user_value -e CLICKHOUSE_PASSWORD=clickhouse_password_value -it --rm db_compare:latest
+FROM python:3.10
+WORKDIR /app
+
+ENV CLICKHOUSE_HOST clickhouse_host_value
+ENV CLICKHOUSE_DATABASE clickhouse_database_value
+ENV CLICKHOUSE_USER clickhouse_user_value
+ENV CLICKHOUSE_PASSWORD clickhouse_password_value
+
+COPY db db
+COPY db_compare db_compare
+
+COPY requirements.txt requirements.txt
+RUN pip install -r requirements.txt
+
+# Make the db helper package importable (a RUN with cd/export does not persist across layers).
+ENV PYTHONPATH "${PYTHONPATH}:/app/db"
+# Shell form so the environment variables are expanded at run time; exec-form
+# ENTRYPOINT would pass the literal strings "$CLICKHOUSE_HOST" etc. to the script.
+ENTRYPOINT python3.10 /app/db_compare/clickhouse_table_checksum.py --clickhouse_host "$CLICKHOUSE_HOST" --clickhouse_database "$CLICKHOUSE_DATABASE" --clickhouse_user "$CLICKHOUSE_USER" --clickhouse_password "$CLICKHOUSE_PASSWORD" --tables_regex "."
+
+
diff --git a/sink-connector/python/Dockerfile_db_load b/sink-connector/python/Dockerfile_db_load
new file mode 100644
index 000000000..e5a340957
--- /dev/null
+++ b/sink-connector/python/Dockerfile_db_load
@@ -0,0 +1,19 @@
+# Use the official MySQL image (Debian variant, so apt-get is available) as the base image
+FROM mysql:8.0-debian
+
+# Install the ClickHouse client from the current package repository
+# (the old repo.clickhouse.tech repository is no longer served)
+RUN apt-get update && \
+    apt-get install -y --no-install-recommends apt-transport-https ca-certificates gnupg && \
+    gpg --no-default-keyring --keyring /usr/share/keyrings/clickhouse-keyring.gpg \
+        --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys 8919F6BD2B48D754 && \
+    echo "deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg] https://packages.clickhouse.com/deb stable main" > /etc/apt/sources.list.d/clickhouse.list && \
+    apt-get update && \
+    apt-get install -y clickhouse-client && \
+    rm -rf /var/lib/apt/lists/*
+
+# Install MySQL shell
+RUN apt-get update && \
+ apt-get install -y --no-install-recommends mysql-shell && \
+ rm -rf /var/lib/apt/lists/*
+
+# Set the entry point to start MySQL shell by default
+ENTRYPOINT ["mysqlsh"]
\ No newline at end of file
diff --git a/sink-connector/python/Dockerfile_mysql_checksum b/sink-connector/python/Dockerfile_mysql_checksum
new file mode 100644
index 000000000..da3f52438
--- /dev/null
+++ b/sink-connector/python/Dockerfile_mysql_checksum
@@ -0,0 +1,22 @@
+# Dockerfile that packages the db and db_compare folders for the MySQL checksum tool.
+# docker run -e MYSQL_HOST=mysql_host_value -e MYSQL_DATABASE=mysql_database_value -e MYSQL_USER=mysql_user_value -e MYSQL_PASSWORD=mysql_password_value -it --rm db_compare:latest
+FROM python:3.10
+WORKDIR /app
+
+ENV MYSQL_HOST mysql_host_value
+ENV MYSQL_DATABASE mysql_database_value
+ENV MYSQL_USER mysql_user_value
+ENV MYSQL_PASSWORD mysql_password_value
+
+COPY db db
+COPY db_compare db_compare
+
+COPY requirements.txt requirements.txt
+RUN pip install -r requirements.txt
+
+# Make the db helper package importable (a RUN with cd/export does not persist across layers).
+ENV PYTHONPATH "${PYTHONPATH}:/app/db"
+# Shell form so the environment variables are expanded at run time.
+ENTRYPOINT python3.10 /app/db_compare/mysql_table_checksum.py --mysql_host "$MYSQL_HOST" --mysql_database "$MYSQL_DATABASE" --mysql_user "$MYSQL_USER" --mysql_password "$MYSQL_PASSWORD" --tables_regex "."
+
+
diff --git a/sink-connector/python/README.md b/sink-connector/python/README.md
index 4ad2a02f1..2bdc2d5ff 100644
--- a/sink-connector/python/README.md
+++ b/sink-connector/python/README.md
@@ -42,8 +42,8 @@ python db_load/clickhouse_loader.py --clickhouse_host localhost --clickhouse_da
If you loaded the same data in MySQL, you can then run checksums (see test_db.sh)
## Table checksums
-
-Credits : https://www.sisense.com/blog/hashing-tables-to-ensure-consistency-in-postgres-redshift-and-mysql/
+**Note**: Only `Python 3.10` is supported\
+**Credits** : https://www.sisense.com/blog/hashing-tables-to-ensure-consistency-in-postgres-redshift-and-mysql/
Compute the checksum of one table
@@ -55,6 +55,12 @@ python db_compare/mysql_table_checksum.py --mysql_host localhost --mysql_user ro
2022-09-11 19:39:49,148 - INFO - ThreadPoolExecutor-0_0 - Checksum for table menagerie.pet = 3d19b8b13cf29b5192068278123c5059 count 9
```
+## Compute table checksums on ReplacingMergeTree tables with is_deleted columns
+```
+python db_compare/clickhouse_table_checksum.py --clickhouse_host localhost --clickhouse_user root --clickhouse_password root --clickhouse_database test --tables_regex . --threads 4
+--exclude_columns=is_deleted,_version --sign_column ""
+```
+
## Connecting to secure ClickHouse
```
python3 clickhouse_table_checksum.py --sign_column=sign --secure=True --clickhouse_port 9440 --clickhouse_host secure-host --clickhouse_user user --clickhouse_password password --clickhouse_database das --tables_regex '^products' --exclude_columns=[sign,ver]
diff --git a/sink-connector/python/__init__.py b/sink-connector/python/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/sink-connector/python/db/__init__.py b/sink-connector/python/db/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java
index 9b59d6f33..5e62d523a 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java
@@ -116,15 +116,16 @@ public DbWriter(
}
act.createNewTable(record.getPrimaryKey(), tableName, fields, this.conn);
- this.columnNameToDataTypeMap = new DBMetadata().getColumnsDataTypesForTable(tableName, this.conn, database);
- response = metadata.getTableEngine(this.conn, database, tableName);
- this.engine = response.getLeft();
} catch (Exception e) {
log.error("**** Error creating table ***" + tableName, e);
}
} else {
log.error("********* AUTO CREATE DISABLED, Table does not exist, please enable it by setting auto.create.tables=true");
}
+
+ this.columnNameToDataTypeMap = new DBMetadata().getColumnsDataTypesForTable(tableName, this.conn, database);
+ response = metadata.getTableEngine(this.conn, database, tableName);
+ this.engine = response.getLeft();
}
if (this.engine != null && this.engine.getEngine().equalsIgnoreCase(DBMetadata.TABLE_ENGINE.REPLACING_MERGE_TREE.getEngine())) {
@@ -148,6 +149,12 @@ public DbWriter(
}
}
+ public void updateColumnNameToDataTypeMap() throws SQLException {
+ this.columnNameToDataTypeMap = new DBMetadata().getColumnsDataTypesForTable(tableName, this.conn, database);
+ MutablePair<DBMetadata.TABLE_ENGINE, String> response = new DBMetadata().getTableEngine(this.conn, database, tableName);
+ this.engine = response.getLeft();
+ }
+
public boolean wasTableMetaDataRetrieved() {
boolean result = true;
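Moving the metadata lookup out of the auto-create branch, plus the new `updateColumnNameToDataTypeMap()`, gives callers a way to heal a `DbWriter` that was constructed before its table existed. A sketch of the retry path as used by `ClickHouseBatchRunnable` later in this patch (`getDbWriterForTable` is the runnable's existing factory helper):

```java
// Sketch: refresh a writer whose metadata lookup failed at construction time.
if (writer == null) {
    writer = getDbWriterForTable(topicName, tableName, records.get(0), this.conn);
}
if (writer.wasTableMetaDataRetrieved() == false) {
    writer.updateColumnNameToDataTypeMap(); // re-read column types and the table engine
}
if (writer.wasTableMetaDataRetrieved() == false) {
    return false; // still no metadata; keep the batch queued and retry on the next tick
}
```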
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/batch/PreparedStatementExecutor.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/batch/PreparedStatementExecutor.java
index 48b6cb0da..c4a31bb7d 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/batch/PreparedStatementExecutor.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/batch/PreparedStatementExecutor.java
@@ -74,7 +74,7 @@ public boolean addToPreparedStatementBatch(String topicName, Map<String, String> columnToDataTypeMap,
- DBMetadata.TABLE_ENGINE engine) throws Exception {
+ DBMetadata.TABLE_ENGINE engine) throws RuntimeException {
boolean result = false;
Iterator<Map.Entry<MutablePair<String, Map<String, Integer>>, List<ClickHouseStruct>>> iter = queryToRecordsMap.entrySet().iterator();
@@ -107,7 +107,7 @@ private boolean executePreparedStatement(String insertQuery, String topicName,
Map.Entry<MutablePair<String, Map<String, Integer>>, List<ClickHouseStruct>> entry,
BlockMetaData bmd, ClickHouseSinkConnectorConfig config,
ClickHouseConnection conn, String tableName, Map<String, String> columnToDataTypeMap,
- DBMetadata.TABLE_ENGINE engine) {
+ DBMetadata.TABLE_ENGINE engine) throws RuntimeException {
AtomicBoolean result = new AtomicBoolean(false);
long maxRecordsInBatch = config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_MAX_RECORDS.toString());
@@ -166,6 +166,7 @@ private boolean executePreparedStatement(String insertQuery, String topicName,
Metrics.updateErrorCounters(topicName, entry.getValue().size());
log.error("******* ERROR inserting Batch *****************", e);
failedRecords.addAll(batch);
+ throw new RuntimeException(e);
}
if (!truncatedRecords.isEmpty()) {
PreparedStatement ps = null;
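Rethrowing the batch-insert failure as a `RuntimeException`, instead of only logging it, is what propagates the error up to `ClickHouseBatchRunnable` so the same batch is retried rather than dropped. A condensed sketch of the pattern (`ps` stands for the JDBC `PreparedStatement` holding the current batch):

```java
// Sketch: record the failure metrics, then propagate so the caller retries the batch.
try {
    ps.executeBatch();
} catch (Exception e) {
    Metrics.updateErrorCounters(topicName, entry.getValue().size());
    log.error("******* ERROR inserting Batch *****************", e);
    failedRecords.addAll(batch);
    throw new RuntimeException(e); // surfaces to the runnable's retry loop
}
```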
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java
index 76fb51d42..ed8b61832 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java
@@ -55,6 +55,8 @@ public class ClickHouseBatchRunnable implements Runnable {
private DBCredentials dbCredentials;
+ private List<ClickHouseStruct> currentBatch = null;
+
public ClickHouseBatchRunnable(ConcurrentLinkedQueue<List<ClickHouseStruct>> records,
ClickHouseSinkConnectorConfig config,
Map<String, String> topic2TableMap) {
@@ -104,27 +106,31 @@ public void run() {
Long taskId = config.getLong(ClickHouseSinkConnectorConfigVariables.TASK_ID.toString());
try {
- int numRecords = records.size();
- if (numRecords <= 0) {
- //log.debug(String.format("No records to process ThreadId(%s), TaskId(%s)", Thread.currentThread().getId(), taskId));
- return;
- } else {
- log.debug("**** Processing Batch of Records ****" + numRecords);
- }
+// int numRecords = records.size();
+// if (numRecords <= 0) {
+// //log.debug(String.format("No records to process ThreadId(%s), TaskId(%s)", Thread.currentThread().getId(), taskId));
+// return;
+// } else {
+// log.debug("**** Processing Batch of Records ****" + numRecords);
+// }
// Poll from Queue until its empty.
- while(records.size() > 0) {
- List<ClickHouseStruct> batch = records.poll();
+ while(records.size() > 0 || currentBatch != null) {
+ if(currentBatch == null) {
+ currentBatch = records.poll();
+ } else {
+ log.debug("***** RETRYING the same batch again");
+ }
///// ***** START PROCESSING BATCH **************************
// Step 1: Add to Inflight batches.
- DebeziumOffsetManagement.addToBatchTimestamps(batch);
+ DebeziumOffsetManagement.addToBatchTimestamps(currentBatch);
- log.info("****** Thread: " + Thread.currentThread().getName() + " Batch Size: " + batch.size() + " ******");
+ log.info("****** Thread: " + Thread.currentThread().getName() + " Batch Size: " + currentBatch.size() + " ******");
// Group records by topic name.
// Create a new map of topic name to list of records.
Map<String, List<ClickHouseStruct>> topicToRecordsMap = new ConcurrentHashMap<>();
- batch.forEach(record -> {
+ currentBatch.forEach(record -> {
String topicName = record.getTopic();
// If the topic name is not present, create a new list and add the record.
if (topicToRecordsMap.containsKey(topicName) == false) {
@@ -150,7 +156,9 @@ public void run() {
if(result) {
// Step 2: Check if the batch can be committed.
- DebeziumOffsetManagement.checkIfBatchCanBeCommitted(batch);
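+ // Clear currentBatch only once the batch has actually been committed; otherwise it is retried.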
+ if(DebeziumOffsetManagement.checkIfBatchCanBeCommitted(currentBatch)) {
+ currentBatch = null;
+ }
}
//acknowledgeRecords(batch);
///// ***** END PROCESSING BATCH **************************
@@ -159,6 +167,11 @@ public void run() {
} catch(Exception e) {
log.error(String.format("ClickHouseBatchRunnable exception - Task(%s)", taskId), e);
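+ // Back off briefly so a persistently failing batch is not retried in a tight loop.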
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
}
}
@@ -245,8 +258,17 @@ private boolean processRecordsByTopic(String topicName, List<ClickHouseStruct> r
if(writer == null || writer.wasTableMetaDataRetrieved() == false) {
- log.error("*** TABLE METADATA not retrieved, retry next time");
- return false;
+ log.error("*** TABLE METADATA not retrieved, retrying");
+ if(writer == null) {
+ writer = getDbWriterForTable(topicName, tableName, records.get(0), this.conn);
+ }
+ if(writer != null && writer.wasTableMetaDataRetrieved() == false) {
+ writer.updateColumnNameToDataTypeMap();
+ }
+
+ if(writer == null || writer.wasTableMetaDataRetrieved() == false) {
+ log.error("*** TABLE METADATA not retrieved, retrying on next attempt");
+ return false;
+ }
}
// Step 1: The Batch Insert with preparedStatement in JDBC
// works by forming the Query and then adding records to the Batch.
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/DebeziumOffsetManagement.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/DebeziumOffsetManagement.java
index 202fd4e8c..657c2acf7 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/DebeziumOffsetManagement.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/DebeziumOffsetManagement.java
@@ -8,6 +8,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
* This class is used to manage the state of the offsets from
@@ -19,17 +20,14 @@ public class DebeziumOffsetManagement {
private static final Logger log = LoggerFactory.getLogger(DebeziumOffsetManagement.class);
// A map of (minimum, maximum) timestamp pairs to the batches in flight
- static Map<Pair<Long, Long>, List<ClickHouseStruct>> inFlightBatches = new HashMap<>();
+ static ConcurrentHashMap<Pair<Long, Long>, List<ClickHouseStruct>> inFlightBatches = new ConcurrentHashMap<>();
- static Map<Pair<Long, Long>, List<ClickHouseStruct>> completedBatches = new HashMap<>();
+ static ConcurrentHashMap<Pair<Long, Long>, List<ClickHouseStruct>> completedBatches = new ConcurrentHashMap<>();
- public DebeziumOffsetManagement(Map<Pair<Long, Long>, List<ClickHouseStruct>> inFlightBatches) {
+ public DebeziumOffsetManagement(ConcurrentHashMap<Pair<Long, Long>, List<ClickHouseStruct>> inFlightBatches) {
this.inFlightBatches = inFlightBatches;
}
- public void addToBatchTimestamps(Pair<Long, Long> pair, List<ClickHouseStruct> clickHouseStructs) {
- inFlightBatches.put(pair, clickHouseStructs);
- }
public static void addToBatchTimestamps(List<ClickHouseStruct> batch) {
Pair<Long, Long> pair = calculateMinMaxTimestampFromBatch(batch);
@@ -75,9 +73,9 @@ public static Pair<Long, Long> calculateMinMaxTimestampFromBatch(List<ClickHouseStruct> batch) {
+ static boolean checkIfThereAreInflightRequests(List<ClickHouseStruct> currentBatch) {
boolean result = false;
- Pair<Long, Long> pair = calculateMinMaxTimestampFromBatch(batch);
+ Pair<Long, Long> currentBatchPair = calculateMinMaxTimestampFromBatch(currentBatch);
//Iterate through inFlightBatches and check if there is any batch
// which is lower than the current batch.
@@ -85,13 +83,13 @@ static boolean checkIfThereAreInflightRequests(List<ClickHouseStruct> batch) {
Pair<Long, Long> key = entry.getKey();
// Ignore the same batch
- if (pair.getLeft().longValue() == key.getLeft().longValue() && pair.getRight().longValue() == key.getRight().longValue()) {
+ if (currentBatchPair.getLeft().longValue() == key.getLeft().longValue() &&
+ currentBatchPair.getRight().longValue() == key.getRight().longValue()) {
continue;
}
- // If the min timestamp of the batch is lower than the current batch
- if (pair.getLeft() < key.getRight()) {
- log.error("*********** Batch is within the range of the in flight batch ***********");
+ // Check if max of current batch is greater than min of inflight batch
+ if(currentBatchPair.getRight().longValue() > key.getLeft().longValue()) {
result = true;
break;
}
@@ -99,7 +97,9 @@ static boolean checkIfThereAreInflightRequests(List<ClickHouseStruct> batch) {
return result;
}
- static public void checkIfBatchCanBeCommitted(List<ClickHouseStruct> batch) throws InterruptedException {
+ static synchronized public boolean checkIfBatchCanBeCommitted(List<ClickHouseStruct> batch) throws InterruptedException {
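+ // Returns true only when this batch's offsets were acknowledged; otherwise the caller keeps the batch for retry.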
+ boolean result = false;
+
if(true == checkIfThereAreInflightRequests(batch)) {
// Remove the record from inflightBatches
// and move it to completedBatches.
@@ -109,7 +109,7 @@ static public void checkIfBatchCanBeCommitted(List<ClickHouseStruct> batch) thro
} else {
// Acknowledge current batch
acknowledgeRecords(batch);
-
+ result = true;
// Check if completed batch can also be acknowledged.
completedBatches.forEach((k, v) -> {
if(false == checkIfThereAreInflightRequests(v)) {
@@ -122,9 +122,11 @@ static public void checkIfBatchCanBeCommitted(List<ClickHouseStruct> batch) thro
}
});
}
+
+ return result;
}
- static void acknowledgeRecords(List<ClickHouseStruct> batch) throws InterruptedException {
+ static synchronized void acknowledgeRecords(List<ClickHouseStruct> batch) throws InterruptedException {
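+ // synchronized: acknowledgements may arrive from multiple sink threads concurrently.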
// Acknowledge the records.
// acknowledge records
diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/executor/DebeziumOffsetManagementTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/executor/DebeziumOffsetManagementTest.java
index cf1578bbc..8bd0b7eea 100644
--- a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/executor/DebeziumOffsetManagementTest.java
+++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/executor/DebeziumOffsetManagementTest.java
@@ -7,6 +7,7 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
+import org.junit.Assert;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
@@ -19,29 +20,104 @@ public class DebeziumOffsetManagementTest {
// Test function to validate the isWithinRange function
@Test
public void testIsWithinRange() {
+
+ // Min and Max values for this batch - 3 and 433
+ List<ClickHouseStruct> clickHouseStructs = new ArrayList<>();
+ ClickHouseStruct ch1 = new ClickHouseStruct(10, "SERVER5432.test.customers", getKafkaStruct(), 2, 21L, null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
+ ch1.setDebezium_ts_ms(21L);
+
+ ClickHouseStruct ch4 = new ClickHouseStruct(1000, "SERVER5432.test.customers", getKafkaStruct(), 2, 433L, null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
+ ch4.setDebezium_ts_ms(433L);
+
+ ClickHouseStruct ch2 = new ClickHouseStruct(8, "SERVER5432.test.customers", getKafkaStruct(), 2, 22L ,null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
+ ch2.setDebezium_ts_ms(22L);
+
+ ClickHouseStruct ch6 = new ClickHouseStruct(1000, "SERVER5432.test.customers", getKafkaStruct(), 2, 3L, null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
+ ch6.setDebezium_ts_ms(3L);
+
+ ClickHouseStruct ch3 = new ClickHouseStruct(1000, "SERVER5432.test.customers", getKafkaStruct(), 2, 33L, null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
+ ch3.setDebezium_ts_ms(33L);
+ clickHouseStructs.add(ch1);
+ clickHouseStructs.add(ch2);
+ clickHouseStructs.add(ch3);
+
+ clickHouseStructs.add(ch4);
+ clickHouseStructs.add(ch6);
+
+ // Batch 2 - Min and Max values for this batch - 1001 and 2001
+ List<ClickHouseStruct> clickHouseStructs1 = new ArrayList<>();
+ ClickHouseStruct ch5 = new ClickHouseStruct(10, "SERVER5432.test.customers", getKafkaStruct(), 2, 21L, null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
+ ch5.setDebezium_ts_ms(1001L);
+
+ ClickHouseStruct ch7 = new ClickHouseStruct(10, "SERVER5432.test.customers", getKafkaStruct(), 2, 21L, null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
+ ch7.setDebezium_ts_ms(2001L);
+ clickHouseStructs1.add(ch5);
+ clickHouseStructs1.add(ch7);
+
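+ // Register both batches as in-flight before probing for overlapping ranges.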
+ DebeziumOffsetManagement.addToBatchTimestamps(clickHouseStructs);
+ DebeziumOffsetManagement.addToBatchTimestamps(clickHouseStructs1);
+
+ // Batch 3 - Min and Max values for this batch - 501 and 1000
+ List<ClickHouseStruct> clickHouseStructs2 = new ArrayList<>();
+ ClickHouseStruct ch8 = new ClickHouseStruct(10, "SERVER5432.test.customers", getKafkaStruct(), 2, 21L, null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
+ ch8.setDebezium_ts_ms(501L);
+
+ ClickHouseStruct ch9 = new ClickHouseStruct(10, "SERVER5432.test.customers", getKafkaStruct(), 2, 21L, null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
+ ch9.setDebezium_ts_ms(1000L);
+ clickHouseStructs2.add(ch8);
+ clickHouseStructs2.add(ch9);
+
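+ // Batch 3 (501-1000) overlaps the in-flight batches, so in-flight requests must be reported.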
+ boolean result = DebeziumOffsetManagement.checkIfThereAreInflightRequests(clickHouseStructs2);
+ Assert.assertTrue(result);
+
+ // Batch 4 - Min and Max values for this batch - 1 and 2
+ List<ClickHouseStruct> clickHouseStructs3 = new ArrayList<>();
+ ClickHouseStruct ch10 = new ClickHouseStruct(10, "SERVER5432.test.customers", getKafkaStruct(), 2, 21L, null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
+ ch10.setDebezium_ts_ms(1L);
+
+ ClickHouseStruct ch11 = new ClickHouseStruct(10, "SERVER5432.test.customers", getKafkaStruct(), 2, 21L, null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
+ ch11.setDebezium_ts_ms(2L);
+ clickHouseStructs3.add(ch10);
+ clickHouseStructs3.add(ch11);
+
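+ // Batch 4 (1-2) ends before every in-flight batch begins, so nothing blocks it.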
+ boolean result1 = DebeziumOffsetManagement.checkIfThereAreInflightRequests(clickHouseStructs3);
+ Assert.assertFalse(result1);
+
+
+ }
+
+ @Test
+ public void testCalculateMinMaxTimestampFromBatch() {
+ // Validates DebeziumOffsetManagement.calculateMinMaxTimestampFromBatch().
// Create batch timestamps map.
Map<Pair<Long, Long>, List<ClickHouseStruct>> batchTimestamps = new HashMap();
List<ClickHouseStruct> clickHouseStructs = new ArrayList<>();
- ClickHouseStruct ch1 = new ClickHouseStruct(10, "SERVER5432.test.customers", getKafkaStruct(), 2, 1L, null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
+ ClickHouseStruct ch1 = new ClickHouseStruct(10, "SERVER5432.test.customers", getKafkaStruct(), 2, 21L, null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
+ ch1.setDebezium_ts_ms(21L);
+
+ ClickHouseStruct ch4 = new ClickHouseStruct(1000, "SERVER5432.test.customers", getKafkaStruct(), 2, 433L, null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
+ ch4.setDebezium_ts_ms(433L);
+
ClickHouseStruct ch2 = new ClickHouseStruct(8, "SERVER5432.test.customers", getKafkaStruct(), 2, 22L ,null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
+ ch2.setDebezium_ts_ms(22L);
+
+ ClickHouseStruct ch6 = new ClickHouseStruct(1000, "SERVER5432.test.customers", getKafkaStruct(), 2, 3L, null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
+ ch6.setDebezium_ts_ms(3L);
+
ClickHouseStruct ch3 = new ClickHouseStruct(1000, "SERVER5432.test.customers", getKafkaStruct(), 2, 33L, null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
+ ch3.setDebezium_ts_ms(33L);
+
clickHouseStructs.add(ch1);
clickHouseStructs.add(ch2);
clickHouseStructs.add(ch3);
- batchTimestamps.put(Pair.of(1L, 2L), clickHouseStructs);
-
+ clickHouseStructs.add(ch4);
+ clickHouseStructs.add(ch6);
- // Test case 1
- DebeziumOffsetManagement dom = new DebeziumOffsetManagement(batchTimestamps);
-// assert(dom.isWithinRange(Pair.of(10L, 12L)) == false);
-//
-// // Test case 2
-// dom = new DebeziumOffsetManagement(batchTimestamps);
-//
-// dom.isWithinRange(Pair.of(1L, 2L));
-// assert(dom.isWithinRange(Pair.of(1L, 2L)) == true);
+ Pair<Long, Long> result = DebeziumOffsetManagement.calculateMinMaxTimestampFromBatch(clickHouseStructs);
+ Assert.assertTrue(result.getLeft() == 3L);
+ Assert.assertTrue(result.getRight() == 433L);
}
public static Struct getKafkaStruct() {
diff --git a/sink-connector/tests/sysbench/run_sysbench_tests.sh b/sink-connector/tests/sysbench/run_sysbench_tests.sh
index cb22993a1..90d21748c 100755
--- a/sink-connector/tests/sysbench/run_sysbench_tests.sh
+++ b/sink-connector/tests/sysbench/run_sysbench_tests.sh
@@ -24,7 +24,7 @@ supported_test_names+=('oltp_update_non_index')
supported_test_names+=('oltp_insert_truncate')
### Sysbench configuration
-num_threads=500
+num_threads=1000
time=500 # IN Seconds
mysql_host=127.0.0.1