Skip to content

Commit

Permalink
Merge pull request #464 from Altinity/463-the-sink-connector-lightwei…
Browse files Browse the repository at this point in the history
…ght-should-retry-on-failures

Integration tests
  • Loading branch information
subkanthi authored Feb 27, 2024
2 parents 3318fa3 + 1eea474 commit 1b1d81a
Show file tree
Hide file tree
Showing 25 changed files with 669 additions and 172 deletions.
2 changes: 0 additions & 2 deletions sink-connector-lightweight/dependency-reduced-pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@
<include>**/*Test.java</include>
<include>**/*IT.java</include>
</includes>
<parallel>all</parallel>
<threadCount>10</threadCount>
<properties>
<property>
<name>listener</name>
Expand Down
3 changes: 3 additions & 0 deletions sink-connector-lightweight/docker/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,6 @@ restart.event.loop.timeout.period.secs: "3000"

# ClickHouse JDBC configuration parameters, as a list of key-value pairs separated by commas.
#clickhouse.jdbc.params: "max_buffer_size=1000000,socket_timeout=10000"

# Maximum number of threads in the thread pool for processing CDC records.
#thread.pool.size: 10
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class DebeziumChangeEventCapture {
private ClickHouseBatchRunnable runnable;

// Records grouped by Topic Name
private ConcurrentLinkedQueue<List<ClickHouseStruct>> records;
private ConcurrentLinkedQueue<List<ClickHouseStruct>> records = new ConcurrentLinkedQueue<>();


private BaseDbWriter writer = null;
Expand Down Expand Up @@ -197,11 +197,11 @@ private ClickHouseStruct processEveryChangeRecord(Properties props, ChangeEvent<


performDDLOperation(DDL, props, sr, config);
ThreadFactory namedThreadFactory =
new ThreadFactoryBuilder().setNameFormat("Sink Connector thread-pool-%d").build();
this.executor = new ClickHouseBatchExecutor(config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString()), namedThreadFactory);

this.executor.scheduleAtFixedRate(this.runnable, 0, config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_FLUSH_TIME.toString()), TimeUnit.MILLISECONDS);
setupProcessingThread(config, new MySQLDDLParserService(config));
// ThreadFactory namedThreadFactory =
// new ThreadFactoryBuilder().setNameFormat("Sink Connector thread-pool-%d").build();
// this.executor = new ClickHouseBatchExecutor(config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString()), namedThreadFactory);
// this.executor.scheduleAtFixedRate(this.runnable, 0, config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_FLUSH_TIME.toString()), TimeUnit.MILLISECONDS);
}

} else {
Expand Down Expand Up @@ -673,12 +673,15 @@ DBCredentials parseDBConfiguration(ClickHouseSinkConnectorConfig config) {
private void setupProcessingThread(ClickHouseSinkConnectorConfig config, DDLParserService ddlParserService) {

// Setup separate thread to read messages from shared buffer.
this.records = new ConcurrentLinkedQueue<>();
this.runnable = new ClickHouseBatchRunnable(this.records, config, new HashMap());
// this.records = new ConcurrentLinkedQueue<>();
//this.runnable = new ClickHouseBatchRunnable(this.records, config, new HashMap());
ThreadFactory namedThreadFactory =
new ThreadFactoryBuilder().setNameFormat("Sink Connector thread-pool-%d").build();
this.executor = new ClickHouseBatchExecutor(config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString()), namedThreadFactory);
this.executor.scheduleAtFixedRate(this.runnable, 0, config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_FLUSH_TIME.toString()), TimeUnit.MILLISECONDS);
for(int i = 0; i < config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString()); i++) {
this.executor.scheduleAtFixedRate(new ClickHouseBatchRunnable(this.records, config, new HashMap()), 0, config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_FLUSH_TIME.toString()), TimeUnit.MILLISECONDS);
}
//this.executor.scheduleAtFixedRate(this.runnable, 0, config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_FLUSH_TIME.toString()), TimeUnit.MILLISECONDS);
}

private void appendToRecords(List<ClickHouseStruct> convertedRecords) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
import com.altinity.clickhouse.debezium.embedded.parser.DataTypeConverter;
import static com.altinity.clickhouse.sink.connector.db.ClickHouseDbConstants.*;
import static org.apache.commons.lang3.StringUtils.containsIgnoreCase;

import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables;
Expand Down Expand Up @@ -279,7 +280,7 @@ private String getClickHouseDataType(String parsedDataType, ParseTree colDefTree
}
} // datetime(6)
else if(parsedDataType.contains("(") && parsedDataType.contains(")") &&
(parsedDataType.contains("datetime") || parsedDataType.contains("timestamp"))){
(containsIgnoreCase(parsedDataType, "datetime") || containsIgnoreCase(parsedDataType, "timestamp"))){
try {
precision = Integer.parseInt(parsedDataType.substring(parsedDataType.indexOf("(") + 1, parsedDataType.indexOf(")")));
} catch(Exception e) {
Expand Down Expand Up @@ -471,6 +472,18 @@ public void enterAlterTable(MySqlParser.AlterTableContext alterTableContext) {
} else if (tree instanceof MySqlParser.AlterByModifyColumnContext) {
parseAlterTable(tree);
} else if (tree instanceof MySqlParser.AlterByDropColumnContext) {
// Drop Column.
this.query.append(" ");
for (ParseTree dropColumnTree : ((MySqlParser.AlterByDropColumnContext) (tree)).children) {
if (dropColumnTree instanceof MySqlParser.UidContext) {
for(ParseTree dropColumnChild: ((MySqlParser.UidContext) dropColumnTree).children) {
if(dropColumnChild instanceof MySqlParser.SimpleIdContext) {
this.query.append(String.format(Constants.DROP_COLUMN, dropColumnChild.getText()));
}
}
// this.query.append(String.format(Constants.DROP_COLUMN, ((MySqlParser.AlterByDropColumnContext) tree).uid()));
}
}
} else if (tree instanceof MySqlParser.AlterByRenameColumnContext) {
parseRenameColumn(tree);
} else if (tree instanceof MySqlParser.AlterByAddPrimaryKeyContext) {
Expand Down Expand Up @@ -524,15 +537,15 @@ private void parseTreeHelper(ParseTree child) {
}
}

@Override
public void enterAlterByDropColumn(MySqlParser.AlterByDropColumnContext alterByDropColumnContext) {
this.query.append(" ");
for (ParseTree tree : alterByDropColumnContext.children) {
if (tree instanceof MySqlParser.UidContext) {
this.query.append(String.format(Constants.DROP_COLUMN, tree.getText()));
}
}
}
// @Override
// public void enterAlterByDropColumn(MySqlParser.AlterByDropColumnContext alterByDropColumnContext) {
// this.query.append(" ");
// for (ParseTree tree : alterByDropColumnContext.children) {
// if (tree instanceof MySqlParser.UidContext) {
// this.query.append(String.format(Constants.DROP_COLUMN, tree.getText()));
// }
// }
// }

@Override
public void enterDropTable(MySqlParser.DropTableContext dropTableContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
import com.clickhouse.jdbc.ClickHouseConnection;
import org.junit.Assert;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.testcontainers.Testcontainers;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package com.altinity.clickhouse.debezium.embedded.cdc;

import com.altinity.clickhouse.debezium.embedded.AppInjector;
import com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication;
import com.altinity.clickhouse.debezium.embedded.ITCommon;
import com.altinity.clickhouse.debezium.embedded.api.DebeziumEmbeddedRestApi;
import com.altinity.clickhouse.debezium.embedded.common.PropertiesHelper;
import com.altinity.clickhouse.debezium.embedded.config.ConfigLoader;
import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLParserService;
import com.altinity.clickhouse.debezium.embedded.parser.DebeziumRecordParserService;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
import com.clickhouse.jdbc.ClickHouseConnection;
import com.github.dockerjava.zerodep.shaded.org.apache.hc.client5.http.classic.methods.HttpGet;
import com.github.dockerjava.zerodep.shaded.org.apache.hc.client5.http.classic.methods.HttpUriRequest;
import com.github.dockerjava.zerodep.shaded.org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import com.github.dockerjava.zerodep.shaded.org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
import com.github.dockerjava.zerodep.shaded.org.apache.hc.core5.http.HttpEntity;
import com.github.dockerjava.zerodep.shaded.org.apache.hc.core5.http.io.entity.EntityUtils;
import com.google.inject.Guice;
import com.google.inject.Injector;
import org.apache.log4j.BasicConfigurator;
import org.junit.Assert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import java.sql.Connection;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static com.altinity.clickhouse.debezium.embedded.ITCommon.getDebeziumProperties;

@Testcontainers
@DisplayName("Test that validates if the sink connector retries batches on ClickHouse failure")
public class BatchRetryOnFailureIT {
protected MySQLContainer mySqlContainer;

@Container
public static ClickHouseContainer clickHouseContainer = new ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:latest")
.asCompatibleSubstituteFor("clickhouse"))
.withInitScript("init_clickhouse_schema_only_column_timezone.sql")
// .withCopyFileToContainer(MountableFile.forClasspathResource("config.xml"), "/etc/clickhouse-server/config.d/config.xml")
.withUsername("ch_user")
.withPassword("password")
.withExposedPorts(8123);
@BeforeEach
public void startContainers() throws InterruptedException {
mySqlContainer = new MySQLContainer<>(DockerImageName.parse("docker.io/bitnami/mysql:latest")
.asCompatibleSubstituteFor("mysql"))
.withDatabaseName("employees").withUsername("root").withPassword("adminpass")
.withInitScript("datetime.sql")
.withExtraHost("mysql-server", "0.0.0.0")
.waitingFor(new HttpWaitStrategy().forPort(3306));

BasicConfigurator.configure();
mySqlContainer.start();
clickHouseContainer.start();
Thread.sleep(15000);
}

@Test
public void testBatchRetryOnCHFailure() throws Exception {

Injector injector = Guice.createInjector(new AppInjector());

Properties props = getDebeziumProperties(mySqlContainer, clickHouseContainer);
// Override clickhouse server timezone.
ClickHouseDebeziumEmbeddedApplication clickHouseDebeziumEmbeddedApplication = new ClickHouseDebeziumEmbeddedApplication();


ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {
clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
injector.getInstance(DDLParserService.class), props, false);
DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture()
, new Properties());
} catch (Exception e) {
throw new RuntimeException(e);
}

});
Connection conn = ITCommon.connectToMySQL(mySqlContainer);
conn.prepareStatement("INSERT INTO `temporal_types_DATETIME` VALUES ('DATETIME-INSERT','1000-01-01 00:00:00','2022-09-29 01:47:46','9999-12-31 23:59:59','9999-12-31 23:59:59');\n").execute();

Thread.sleep(40000);//
Thread.sleep(10000);


// Pause clickhouse container to simulate a batch failure.
clickHouseContainer.getDockerClient().pauseContainerCmd(clickHouseContainer.getContainerId()).exec();
conn.prepareStatement("INSERT INTO `temporal_types_DATETIME` VALUES ('DATETIME-INSERT55','1000-01-01 00:00:00','2022-09-29 01:47:46','9999-12-31 23:59:59','9999-12-31 23:59:59');\n").execute();

Thread.sleep(50000);

clickHouseContainer.getDockerClient().unpauseContainerCmd(clickHouseContainer.getContainerId()).exec();
Thread.sleep(10000);


// Check if Batch was inserted.
String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"employees");
ClickHouseConnection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1",
clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>()));

BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn);

ResultSet dateTimeResult = writer.executeQueryWithResultSet("select * from temporal_types_DATETIME where Type = 'DATETIME-INSERT55'");
boolean insertCheck = false;
while(dateTimeResult.next()) {
insertCheck = true;
Assert.assertTrue(dateTimeResult.getString("Type").equalsIgnoreCase("DATETIME-INSERT55"));
}
Assert.assertTrue(insertCheck);


// Close connection.
clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture().stop();
}
}
Loading

0 comments on commit 1b1d81a

Please sign in to comment.