Commit 531f92f

Data type mapping for postgres export

Signed-off-by: Divyansh Bokadia <dbokadia@amazon.com>

divbok committed Feb 19, 2025
1 parent 85127bc commit 531f92f
Showing 16 changed files with 235 additions and 39 deletions.
@@ -29,7 +29,6 @@
import org.opensearch.dataprepper.plugins.source.rds.schema.ConnectionManager;
import org.opensearch.dataprepper.plugins.source.rds.schema.ConnectionManagerFactory;
import org.opensearch.dataprepper.plugins.source.rds.schema.MySqlConnectionManager;
import org.opensearch.dataprepper.plugins.source.rds.schema.MySqlSchemaManager;
import org.opensearch.dataprepper.plugins.source.rds.schema.QueryManager;
import org.opensearch.dataprepper.plugins.source.rds.schema.SchemaManager;
import org.opensearch.dataprepper.plugins.source.rds.schema.SchemaManagerFactory;
@@ -42,7 +41,6 @@
import software.amazon.awssdk.services.s3.S3Client;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -109,14 +107,7 @@ public void start(Buffer<Record<Event>> buffer) {
final String s3PathPrefix = getS3PathPrefix();

final SchemaManager schemaManager = getSchemaManager(sourceConfig, dbMetadata);
DbTableMetadata dbTableMetadata;
if (sourceConfig.getEngine() == EngineType.MYSQL) {
final Map<String, Map<String, String>> tableColumnDataTypeMap = getColumnDataTypeMap(
(MySqlSchemaManager) schemaManager);
dbTableMetadata = new DbTableMetadata(dbMetadata, tableColumnDataTypeMap);
} else {
dbTableMetadata = new DbTableMetadata(dbMetadata, Collections.emptyMap());
}
DbTableMetadata dbTableMetadata = getDbTableMetadata(dbMetadata, schemaManager);

leaderScheduler = new LeaderScheduler(
sourceCoordinator, sourceConfig, s3PathPrefix, schemaManager, dbTableMetadata);
@@ -213,11 +204,17 @@ private String getS3PathPrefix() {
return s3PathPrefix;
}

private Map<String, Map<String, String>> getColumnDataTypeMap(final MySqlSchemaManager schemaManager) {
private DbTableMetadata getDbTableMetadata(final DbMetadata dbMetadata, final SchemaManager schemaManager) {
final Map<String, Map<String, String>> tableColumnDataTypeMap = getColumnDataTypeMap(schemaManager);
return new DbTableMetadata(dbMetadata, tableColumnDataTypeMap);
}

private Map<String, Map<String, String>> getColumnDataTypeMap(final SchemaManager schemaManager) {
return sourceConfig.getTableNames().stream()
.collect(Collectors.toMap(
fullTableName -> fullTableName,
fullTableName -> schemaManager.getColumnDataTypes(fullTableName.split("\\.")[0], fullTableName.split("\\.")[1])
fullTableName -> schemaManager.getColumnDataTypes(fullTableName)
));
}

}
@@ -14,12 +14,12 @@
import java.util.Map;

public enum ColumnType {
BOOLEAN(16, "boolean"),
SMALLINT(21, "smallint"),
INTEGER(23, "integer"),
BIGINT(20, "bigint"),
REAL(700, "real"),
DOUBLE_PRECISION(701, "double precision"),
BOOLEAN(16, "bool"),
SMALLINT(21, "int2"),
INTEGER(23, "int4"),
BIGINT(20, "int8"),
REAL(700, "float4"),
DOUBLE_PRECISION(701, "float8"),
NUMERIC(1700, "numeric"),
TEXT(25, "text"),
BPCHAR(1042, "bpchar"),
@@ -5,14 +5,14 @@

public enum PostgresDataType {
// Numeric types
SMALLINT("smallint", DataCategory.NUMERIC),
INTEGER("integer", DataCategory.NUMERIC),
BIGINT("bigint", DataCategory.NUMERIC),
SMALLSERIAL("small", DataCategory.NUMERIC),
SERIAL("mediumint unsigned", DataCategory.NUMERIC),
BIGSERIAL("int", DataCategory.NUMERIC),
REAL("real", DataCategory.NUMERIC),
DOUBLE_PRECISION("double precision", DataCategory.NUMERIC),
SMALLINT("int2", DataCategory.NUMERIC),
INTEGER("int4", DataCategory.NUMERIC),
BIGINT("int8", DataCategory.NUMERIC),
SMALLSERIAL("smallserial", DataCategory.NUMERIC),
SERIAL("serial", DataCategory.NUMERIC),
BIGSERIAL("bigserial", DataCategory.NUMERIC),
REAL("float4", DataCategory.NUMERIC),
DOUBLE_PRECISION("float8", DataCategory.NUMERIC),
NUMERIC("numeric", DataCategory.NUMERIC),
MONEY("money", DataCategory.NUMERIC),

@@ -30,7 +30,7 @@ public enum PostgresDataType {
JSONB("jsonb",DataCategory.JSON),

//Boolean data type
BOOLEAN("boolean", DataCategory.BOOLEAN),
BOOLEAN("bool", DataCategory.BOOLEAN),

//Date-time data types
DATE("date", DataCategory.TEMPORAL),
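Both ColumnType and PostgresDataType are re-keyed here on PostgreSQL's internal type names ("int2", "int4", "float8", "bool", ...) rather than the SQL spellings, which matches what the JDBC metadata lookup further down reports as the TYPE_NAME. A minimal sketch of the resulting lookup, assuming only the PostgresDataType.byDataType(String) accessor that DataFileLoader uses below; the printed enum names are what one would expect from the default toString:

import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType;

class PostgresTypeNameLookupSketch {
    public static void main(String[] args) {
        // The JDBC metadata calls below return internal names such as "int4" and "bool",
        // so the enum strings were switched to match them.
        System.out.println(PostgresDataType.byDataType("int4"));   // expected: INTEGER
        System.out.println(PostgresDataType.byDataType("bool"));   // expected: BOOLEAN
        System.out.println(PostgresDataType.byDataType("float8")); // expected: DOUBLE_PRECISION
    }
}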
@@ -6,6 +6,8 @@
* to appropriate object representations based on their data types.
*/
public interface PostgresDataTypeHandler {
String BYTES_KEY = "bytes";

default Object process(final PostgresDataType columnType, final String columnName, final Object value) {
if(value == null)
return null;
@@ -3,13 +3,24 @@
import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType;
import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataTypeHandler;

import java.nio.charset.StandardCharsets;
import java.util.Map;

import static org.opensearch.dataprepper.plugins.source.rds.utils.BytesHexConverter.bytesToHex;


public class BinaryTypeHandler implements PostgresDataTypeHandler {
@Override
public Object handle(PostgresDataType columnType, String columnName, Object value) {
if (!columnType.isBinary()) {
throw new IllegalArgumentException("ColumnType is not Binary : " + columnType);
}
if (value instanceof Map) {
Object data = ((Map<?, ?>)value).get(BYTES_KEY);
byte[] bytes = ((String) data).getBytes(StandardCharsets.ISO_8859_1);
return "\\x" + bytesToHex(bytes);
}
return value.toString();
}

}
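BinaryTypeHandler now recognizes exported BYTEA values that arrive wrapped in a map under the "bytes" key and renders them in PostgreSQL's hex bytea literal form. A quick sketch of the conversion, assuming this example class sits in the same package as BinaryTypeHandler and that the map shape shown in the unit test further down reflects the export format:

import java.util.HashMap;
import java.util.Map;

import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType;

class BinaryConversionSketch {
    public static void main(String[] args) {
        final BinaryTypeHandler handler = new BinaryTypeHandler();

        // The exported value carries the raw bytes as an ISO-8859-1 encoded string under "bytes".
        final Map<String, Object> value = new HashMap<>();
        value.put("bytes", "O'Reilly");

        // Prints \x4f275265696c6c79 -- the hex form PostgreSQL itself uses for bytea output.
        System.out.println(handler.handle(PostgresDataType.BYTEA, "payload", value));
    }
}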
@@ -3,14 +3,13 @@
import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType;
import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataTypeHandler;

import java.util.Objects;

public class BooleanTypeHandler implements PostgresDataTypeHandler {
@Override
public Object handle(PostgresDataType columnType, String columnName, Object value) {
if (!columnType.isBoolean()) {
throw new IllegalArgumentException("ColumnType is not Boolean: " + columnType);
}
return (Objects.equals(value.toString(), "t")) ? Boolean.TRUE: Boolean.FALSE;
final String booleanValue = value.toString();
return booleanValue.equals("t") || booleanValue.equals("true") ? Boolean.TRUE : Boolean.FALSE;
}
}
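PostgreSQL's text output format encodes booleans as "t"/"f", while values that have already been materialized as a Java boolean stringify to "true"/"false"; the handler now accepts both spellings for true. A small sketch, assuming it lives alongside BooleanTypeHandler in the same package:

import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType;

class BooleanConversionSketch {
    public static void main(String[] args) {
        final BooleanTypeHandler handler = new BooleanTypeHandler();

        // "t" is PostgreSQL's text-format value; "true" covers already-stringified Java booleans.
        System.out.println(handler.process(PostgresDataType.BOOLEAN, "active", "t"));     // true
        System.out.println(handler.process(PostgresDataType.BOOLEAN, "active", "true"));  // true
        System.out.println(handler.process(PostgresDataType.BOOLEAN, "active", "f"));     // false
    }
}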
@@ -22,6 +22,8 @@
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.DataFileProgressState;
import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataType;
import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataTypeHelper;
import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType;
import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataTypeHelper;
import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -175,7 +177,6 @@ public void run() {
}

private void transformEvent(final Event event, final String fullTableName, final EngineType engineType) {
// TODO: support data type mapping in Postgres
if (engineType == EngineType.MYSQL) {
Map<String, String> columnDataTypeMap = dbTableMetadata.getTableColumnDataTypeMap().get(fullTableName);
for (Map.Entry<String, Object> entry : event.toMap().entrySet()) {
@@ -184,5 +185,13 @@ private void transformEvent(final Event event, final String fullTableName, final
event.put(entry.getKey(), data);
}
}
if (engineType == EngineType.POSTGRES) {
Map<String, String> columnDataTypeMap = dbTableMetadata.getTableColumnDataTypeMap().get(fullTableName);
for (Map.Entry<String, Object> entry : event.toMap().entrySet()) {
final Object data = PostgresDataTypeHelper.getDataByColumnType(PostgresDataType.byDataType(columnDataTypeMap.get(entry.getKey())), entry.getKey(),
entry.getValue());
event.put(entry.getKey(), data);
}
}
}
}
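The Postgres branch mirrors the MySQL path: look up each column's reported type name and let PostgresDataTypeHelper coerce the raw value. A minimal sketch of that per-column step in isolation, with a hypothetical column map standing in for dbTableMetadata:

import java.util.Map;

import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType;
import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataTypeHelper;

class PostgresColumnTransformSketch {
    static Object convert(final Map<String, String> columnDataTypeMap, final String column, final Object rawValue) {
        // columnDataTypeMap holds the JDBC TYPE_NAME strings, e.g. {"id" -> "int4", "active" -> "bool"}.
        final PostgresDataType type = PostgresDataType.byDataType(columnDataTypeMap.get(column));
        return PostgresDataTypeHelper.getDataByColumnType(type, column, rawValue);
    }
}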
@@ -74,7 +74,9 @@ public List<String> getPrimaryKeys(final String fullTableName) {
return List.of();
}

public Map<String, String> getColumnDataTypes(final String database, final String tableName) {
public Map<String, String> getColumnDataTypes(final String fullTableName) {
final String database = fullTableName.split("\\.")[0];
final String tableName = fullTableName.split("\\.")[1];
final Map<String, String> columnsToDataType = new HashMap<>();
for (int retry = 0; retry <= NUM_OF_RETRIES; retry++) {
try (Connection connection = connectionManager.getConnection()) {
@@ -16,19 +16,24 @@
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;


public class PostgresSchemaManager implements SchemaManager {
private static final Logger LOG = LoggerFactory.getLogger(PostgresSchemaManager.class);
private final ConnectionManager connectionManager;

static final int NUM_OF_RETRIES = 3;
static final int BACKOFF_IN_MILLIS = 500;
static final String COLUMN_NAME = "COLUMN_NAME";
static final String TYPE_NAME = "TYPE_NAME";
static final String PGOUTPUT = "pgoutput";

public PostgresSchemaManager(ConnectionManager connectionManager) {
@@ -117,6 +122,38 @@ public List<String> getPrimaryKeys(final String fullTableName) {
throw new RuntimeException("Failed to get primary keys for table " + fullTableName);
}


public Map<String, String> getColumnDataTypes(final String fullTableName) {
final String[] splits = fullTableName.split("\\.");
final String database = splits[0];
final String schema = splits[1];
final String table = splits[2];
final Map<String, String> columnsToDataType = new HashMap<>();
for (int retry = 0; retry <= NUM_OF_RETRIES; retry++) {
try (Connection connection = connectionManager.getConnection()) {
final DatabaseMetaData metaData = connection.getMetaData();
// Retrieve column metadata
try (ResultSet columns = metaData.getColumns(database, schema, table, null)) {
while (columns.next()) {
columnsToDataType.put(
columns.getString(COLUMN_NAME),
columns.getString(TYPE_NAME)
);
}
}
return columnsToDataType;
} catch (final Exception e) {
LOG.error("Failed to get dataTypes for database {} schema {} table {}, retrying", database, schema, table, e);
if (retry == NUM_OF_RETRIES) {
throw new RuntimeException(String.format("Failed to get dataTypes for database %s schema %s table %s after " +
"%d retries", database, schema, table, retry), e);
}
}
applyBackoff();
}
return columnsToDataType;
}

private void applyBackoff() {
try {
Thread.sleep(BACKOFF_IN_MILLIS);
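Note the difference in expected table identifiers: MySqlSchemaManager above splits a two-part "database.table" name, while this Postgres implementation splits a three-part "database.schema.table" name and passes all three parts to DatabaseMetaData.getColumns. A usage sketch against the shared SchemaManager interface, with table names that are purely illustrative:

import java.util.Map;

import org.opensearch.dataprepper.plugins.source.rds.schema.SchemaManager;

class ColumnDataTypeLookupSketch {
    static Map<String, String> lookup(final SchemaManager schemaManager, final boolean isPostgres) {
        // MySQL expects "database.table"; Postgres expects "database.schema.table".
        final String fullTableName = isPostgres ? "my_database.public.my_table" : "my_database.my_table";
        // For Postgres the map values are internal type names such as "int4", "bool" or "bytea".
        return schemaManager.getColumnDataTypes(fullTableName);
    }
}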
@@ -1,6 +1,7 @@
package org.opensearch.dataprepper.plugins.source.rds.schema;

import java.util.List;
import java.util.Map;

/**
* Interface for manager classes that are used to get metadata of a database, such as table schemas
@@ -12,4 +13,10 @@ public interface SchemaManager {
* @return List of primary keys
*/
List<String> getPrimaryKeys(final String fullTableName);
/**
* Get the mapping of columns to data types for a table
* @param fullTableName The full table name
* @return Map of column names to data types
*/
Map<String,String> getColumnDataTypes(final String fullTableName);
}
@@ -0,0 +1,13 @@
package org.opensearch.dataprepper.plugins.source.rds.utils;

public class BytesHexConverter {
public static String bytesToHex(byte[] bytes) {
StringBuilder hexString = new StringBuilder();
for (byte b : bytes) {
String hex = Integer.toHexString(0xff & b);
if (hex.length() == 1) hexString.append('0');
hexString.append(hex);
}
return hexString.toString();
}
}
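BytesHexConverter simply lower-case hex-encodes a byte array, zero-padding single-digit values. A quick usage sketch:

import java.nio.charset.StandardCharsets;

import static org.opensearch.dataprepper.plugins.source.rds.utils.BytesHexConverter.bytesToHex;

class BytesHexConverterSketch {
    public static void main(String[] args) {
        final byte[] bytes = "O'Reilly".getBytes(StandardCharsets.ISO_8859_1);
        // Prints 4f275265696c6c79
        System.out.println(bytesToHex(bytes));
    }
}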
@@ -4,6 +4,9 @@
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType;

import java.util.HashMap;
import java.util.Map;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -25,4 +28,19 @@ public void test_handle_binary_string() {
assertThat(result, is(instanceOf(String.class)));
assertThat(result, is(value));
}

@Test
public void test_handle_map_value_byte() {
final PostgresDataType columnType = PostgresDataType.BYTEA;
final String columnName = "test_column";
final Map<String, Object> value = new HashMap<>();
final String testData = "Text with a single quote: O'Reilly";
final String expected = "\\x54657874207769746820612073696e676c652071756f74653a204f275265696c6c79";
value.put("bytes", testData);

final Object result = handler.handle(columnType, columnName, value);

assertThat(result, is(instanceOf(String.class)));
assertThat(result, is(expected));
}
}
@@ -3,8 +3,13 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType;

import java.util.stream.Stream;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -18,12 +23,12 @@ void setUp() {
handler = new BooleanTypeHandler();
}

@Test
void test_handle_true_values() {
String value = "t";
@ParameterizedTest
@MethodSource("provideTrueData")
void test_handle_true_values(String value, Boolean expected) {
Object result = handler.process(PostgresDataType.BOOLEAN, "testColumn", value);
assertThat(result, is(instanceOf(Boolean.class)));
assertThat(result, is(Boolean.TRUE));
assertThat(result, is(expected));
}

@Test
@@ -40,4 +45,11 @@ void test_handle_non_boolean_type() {
);
}

private static Stream<Arguments> provideTrueData() {
return Stream.of(
Arguments.of("t", Boolean.TRUE),
Arguments.of("true", Boolean.TRUE)
);
}

}
@@ -134,7 +134,7 @@ public void getColumnDataTypes_whenFailedToRetrieveColumns_shouldThrowException(
when(connection.getMetaData()).thenReturn(databaseMetaData);
when(databaseMetaData.getColumns(database, null, tableName, null)).thenThrow(new SQLException("Test exception"));

assertThrows(RuntimeException.class, () -> schemaManager.getColumnDataTypes(database, tableName));
assertThrows(RuntimeException.class, () -> schemaManager.getColumnDataTypes(database + "." + tableName));
}

@Test
Expand All @@ -144,7 +144,7 @@ public void getColumnDataTypes_whenFailedToGetConnection_shouldThrowException()

when(connectionManager.getConnection()).thenThrow(new SQLException("Connection failed"));

assertThrows(RuntimeException.class, () -> schemaManager.getColumnDataTypes(database, tableName));
assertThrows(RuntimeException.class, () -> schemaManager.getColumnDataTypes(database + "." + tableName));
}

@Test
Expand All @@ -171,7 +171,7 @@ void getColumnDataTypes_whenColumnsExist_shouldReturnValidMapping() throws SQLEx
when(resultSet.getString(TYPE_NAME))
.thenReturn("INTEGER", "VARCHAR", "TIMESTAMP");

Map<String, String> result = schemaManager.getColumnDataTypes(database, tableName);
Map<String, String> result = schemaManager.getColumnDataTypes(database + "." + tableName);

assertThat(result, notNullValue());
assertThat(result.size(), is(expectedColumnTypes.size()));