diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java index 106827cb69..76753aeac9 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java @@ -29,7 +29,6 @@ import org.opensearch.dataprepper.plugins.source.rds.schema.ConnectionManager; import org.opensearch.dataprepper.plugins.source.rds.schema.ConnectionManagerFactory; import org.opensearch.dataprepper.plugins.source.rds.schema.MySqlConnectionManager; -import org.opensearch.dataprepper.plugins.source.rds.schema.MySqlSchemaManager; import org.opensearch.dataprepper.plugins.source.rds.schema.QueryManager; import org.opensearch.dataprepper.plugins.source.rds.schema.SchemaManager; import org.opensearch.dataprepper.plugins.source.rds.schema.SchemaManagerFactory; @@ -42,7 +41,6 @@ import software.amazon.awssdk.services.s3.S3Client; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -109,14 +107,7 @@ public void start(Buffer> buffer) { final String s3PathPrefix = getS3PathPrefix(); final SchemaManager schemaManager = getSchemaManager(sourceConfig, dbMetadata); - DbTableMetadata dbTableMetadata; - if (sourceConfig.getEngine() == EngineType.MYSQL) { - final Map> tableColumnDataTypeMap = getColumnDataTypeMap( - (MySqlSchemaManager) schemaManager); - dbTableMetadata = new DbTableMetadata(dbMetadata, tableColumnDataTypeMap); - } else { - dbTableMetadata = new DbTableMetadata(dbMetadata, Collections.emptyMap()); - } + DbTableMetadata dbTableMetadata = getDbTableMetadata(dbMetadata, schemaManager); leaderScheduler = new LeaderScheduler( sourceCoordinator, sourceConfig, s3PathPrefix, schemaManager, dbTableMetadata); @@ -213,11 +204,17 @@ private String getS3PathPrefix() { return s3PathPrefix; } - private Map> getColumnDataTypeMap(final MySqlSchemaManager schemaManager) { + private DbTableMetadata getDbTableMetadata(final DbMetadata dbMetadata, final SchemaManager schemaManager) { + final Map> tableColumnDataTypeMap = getColumnDataTypeMap(schemaManager); + return new DbTableMetadata(dbMetadata, tableColumnDataTypeMap); + } + + private Map> getColumnDataTypeMap(final SchemaManager schemaManager) { return sourceConfig.getTableNames().stream() .collect(Collectors.toMap( fullTableName -> fullTableName, - fullTableName -> schemaManager.getColumnDataTypes(fullTableName.split("\\.")[0], fullTableName.split("\\.")[1]) + fullTableName -> schemaManager.getColumnDataTypes(fullTableName) )); } + } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/ColumnType.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/ColumnType.java index c60658a577..d5dd9deeb0 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/ColumnType.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/ColumnType.java @@ -14,12 +14,12 @@ import java.util.Map; public enum ColumnType { - BOOLEAN(16, "boolean"), - SMALLINT(21, "smallint"), - INTEGER(23, "integer"), - BIGINT(20, "bigint"), - REAL(700, "real"), - DOUBLE_PRECISION(701, "double precision"), + BOOLEAN(16, "bool"), + SMALLINT(21, "int2"), + INTEGER(23, "int4"), + BIGINT(20, "int8"), + REAL(700, "float4"), + DOUBLE_PRECISION(701, "float8"), NUMERIC(1700, "numeric"), TEXT(25, "text"), BPCHAR(1042, "bpchar"), diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/PostgresDataType.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/PostgresDataType.java index 7148696c76..bccdb745a1 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/PostgresDataType.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/PostgresDataType.java @@ -5,14 +5,14 @@ public enum PostgresDataType { // Numeric types - SMALLINT("smallint", DataCategory.NUMERIC), - INTEGER("integer", DataCategory.NUMERIC), - BIGINT("bigint", DataCategory.NUMERIC), - SMALLSERIAL("small", DataCategory.NUMERIC), - SERIAL("mediumint unsigned", DataCategory.NUMERIC), - BIGSERIAL("int", DataCategory.NUMERIC), - REAL("real", DataCategory.NUMERIC), - DOUBLE_PRECISION("double precision", DataCategory.NUMERIC), + SMALLINT("int2", DataCategory.NUMERIC), + INTEGER("int4", DataCategory.NUMERIC), + BIGINT("int8", DataCategory.NUMERIC), + SMALLSERIAL("smallserial", DataCategory.NUMERIC), + SERIAL("serial", DataCategory.NUMERIC), + BIGSERIAL("bigserial", DataCategory.NUMERIC), + REAL("float4", DataCategory.NUMERIC), + DOUBLE_PRECISION("float8", DataCategory.NUMERIC), NUMERIC("numeric", DataCategory.NUMERIC), MONEY("money", DataCategory.NUMERIC), @@ -30,7 +30,7 @@ public enum PostgresDataType { JSONB("jsonb",DataCategory.JSON), //Boolean data type - BOOLEAN("boolean", DataCategory.BOOLEAN), + BOOLEAN("bool", DataCategory.BOOLEAN), //Date-time data types DATE("date", DataCategory.TEMPORAL), diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/PostgresDataTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/PostgresDataTypeHandler.java index edc8ec8b25..bf840b4d87 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/PostgresDataTypeHandler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/PostgresDataTypeHandler.java @@ -6,6 +6,8 @@ * to appropriate object representations based on their data types. */ public interface PostgresDataTypeHandler { + String BYTES_KEY = "bytes"; + default Object process(final PostgresDataType columnType, final String columnName, final Object value) { if(value == null) return null; diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BinaryTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BinaryTypeHandler.java index ba9cc23721..8dd903388e 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BinaryTypeHandler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BinaryTypeHandler.java @@ -3,6 +3,11 @@ import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType; import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataTypeHandler; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +import static org.opensearch.dataprepper.plugins.source.rds.utils.BytesHexConverter.bytesToHex; + public class BinaryTypeHandler implements PostgresDataTypeHandler { @Override @@ -10,6 +15,12 @@ public Object handle(PostgresDataType columnType, String columnName, Object valu if (!columnType.isBinary()) { throw new IllegalArgumentException("ColumnType is not Binary : " + columnType); } + if (value instanceof Map) { + Object data = ((Map)value).get(BYTES_KEY); + byte[] bytes = ((String) data).getBytes(StandardCharsets.ISO_8859_1); + return "\\x" + bytesToHex(bytes); + } return value.toString(); } + } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BooleanTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BooleanTypeHandler.java index 1d7d1a5e2a..def171a18a 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BooleanTypeHandler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BooleanTypeHandler.java @@ -3,14 +3,13 @@ import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType; import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataTypeHandler; -import java.util.Objects; - public class BooleanTypeHandler implements PostgresDataTypeHandler { @Override public Object handle(PostgresDataType columnType, String columnName, Object value) { if (!columnType.isBoolean()) { throw new IllegalArgumentException("ColumnType is not Boolean: " + columnType); } - return (Objects.equals(value.toString(), "t")) ? Boolean.TRUE: Boolean.FALSE; + final String booleanValue = value.toString(); + return booleanValue.equals("t") || booleanValue.equals("true") ? Boolean.TRUE : Boolean.FALSE; } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java index 9b025a768e..b6437e0657 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java @@ -22,6 +22,8 @@ import org.opensearch.dataprepper.plugins.source.rds.coordination.state.DataFileProgressState; import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataType; import org.opensearch.dataprepper.plugins.source.rds.datatype.mysql.MySQLDataTypeHelper; +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType; +import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataTypeHelper; import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -175,7 +177,6 @@ public void run() { } private void transformEvent(final Event event, final String fullTableName, final EngineType engineType) { - // TODO: support data type mapping in Postgres if (engineType == EngineType.MYSQL) { Map columnDataTypeMap = dbTableMetadata.getTableColumnDataTypeMap().get(fullTableName); for (Map.Entry entry : event.toMap().entrySet()) { @@ -184,5 +185,13 @@ private void transformEvent(final Event event, final String fullTableName, final event.put(entry.getKey(), data); } } + if (engineType == EngineType.POSTGRES) { + Map columnDataTypeMap = dbTableMetadata.getTableColumnDataTypeMap().get(fullTableName); + for (Map.Entry entry : event.toMap().entrySet()) { + final Object data = PostgresDataTypeHelper.getDataByColumnType(PostgresDataType.byDataType(columnDataTypeMap.get(entry.getKey())), entry.getKey(), + entry.getValue()); + event.put(entry.getKey(), data); + } + } } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/schema/MySqlSchemaManager.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/schema/MySqlSchemaManager.java index 1ca9182b40..ffc52b542a 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/schema/MySqlSchemaManager.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/schema/MySqlSchemaManager.java @@ -74,7 +74,9 @@ public List getPrimaryKeys(final String fullTableName) { return List.of(); } - public Map getColumnDataTypes(final String database, final String tableName) { + public Map getColumnDataTypes(final String fullTableName) { + final String database = fullTableName.split("\\.")[0]; + final String tableName = fullTableName.split("\\.")[1]; final Map columnsToDataType = new HashMap<>(); for (int retry = 0; retry <= NUM_OF_RETRIES; retry++) { try (Connection connection = connectionManager.getConnection()) { diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/schema/PostgresSchemaManager.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/schema/PostgresSchemaManager.java index a3939061fc..00d35af7d2 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/schema/PostgresSchemaManager.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/schema/PostgresSchemaManager.java @@ -16,12 +16,16 @@ import org.slf4j.LoggerFactory; import java.sql.Connection; +import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.NoSuchElementException; + public class PostgresSchemaManager implements SchemaManager { private static final Logger LOG = LoggerFactory.getLogger(PostgresSchemaManager.class); private final ConnectionManager connectionManager; @@ -29,6 +33,7 @@ public class PostgresSchemaManager implements SchemaManager { static final int NUM_OF_RETRIES = 3; static final int BACKOFF_IN_MILLIS = 500; static final String COLUMN_NAME = "COLUMN_NAME"; + static final String TYPE_NAME = "TYPE_NAME"; static final String PGOUTPUT = "pgoutput"; public PostgresSchemaManager(ConnectionManager connectionManager) { @@ -117,6 +122,38 @@ public List getPrimaryKeys(final String fullTableName) { throw new RuntimeException("Failed to get primary keys for table " + fullTableName); } + + public Map getColumnDataTypes(final String fullTableName) { + final String[] splits = fullTableName.split("\\."); + final String database = splits[0]; + final String schema = splits[1]; + final String table = splits[2]; + final Map columnsToDataType = new HashMap<>(); + for (int retry = 0; retry <= NUM_OF_RETRIES; retry++) { + try (Connection connection = connectionManager.getConnection()) { + final DatabaseMetaData metaData = connection.getMetaData(); + // Retrieve column metadata + try (ResultSet columns = metaData.getColumns(database, schema, table, null)) { + while (columns.next()) { + columnsToDataType.put( + columns.getString(COLUMN_NAME), + columns.getString(TYPE_NAME) + ); + } + } + return columnsToDataType; + } catch (final Exception e) { + LOG.error("Failed to get dataTypes for database {} schema {} table {}, retrying", database, schema, table, e); + if (retry == NUM_OF_RETRIES) { + throw new RuntimeException(String.format("Failed to get dataTypes for database %s schema %s table %s after " + + "%d retries", database, schema, table, retry), e); + } + } + applyBackoff(); + } + return columnsToDataType; + } + private void applyBackoff() { try { Thread.sleep(BACKOFF_IN_MILLIS); diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/schema/SchemaManager.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/schema/SchemaManager.java index 000a1eea05..d4c34ebd94 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/schema/SchemaManager.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/schema/SchemaManager.java @@ -1,6 +1,7 @@ package org.opensearch.dataprepper.plugins.source.rds.schema; import java.util.List; +import java.util.Map; /** * Interface for manager classes that are used to get metadata of a database, such as table schemas @@ -12,4 +13,10 @@ public interface SchemaManager { * @return List of primary keys */ List getPrimaryKeys(final String fullTableName); + /** + * Get the mapping of columns to data types for a table + * @param fullTableName The full table name + * @return Map of column names to data types + */ + Map getColumnDataTypes(final String fullTableName); } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/utils/BytesHexConverter.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/utils/BytesHexConverter.java new file mode 100644 index 0000000000..db23dd8e41 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/utils/BytesHexConverter.java @@ -0,0 +1,13 @@ +package org.opensearch.dataprepper.plugins.source.rds.utils; + +public class BytesHexConverter { + public static String bytesToHex(byte[] bytes) { + StringBuilder hexString = new StringBuilder(); + for (byte b : bytes) { + String hex = Integer.toHexString(0xff & b); + if (hex.length() == 1) hexString.append('0'); + hexString.append(hex); + } + return hexString.toString(); + } +} diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BinaryTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BinaryTypeHandlerTest.java index 7ff8589203..2d09d97481 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BinaryTypeHandlerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BinaryTypeHandlerTest.java @@ -4,6 +4,9 @@ import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType; +import java.util.HashMap; +import java.util.Map; + import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -25,4 +28,19 @@ public void test_handle_binary_string() { assertThat(result, is(instanceOf(String.class))); assertThat(result, is(value)); } + + @Test + public void test_handle_map_value_byte() { + final PostgresDataType columnType = PostgresDataType.BYTEA; + final String columnName = "test_column"; + final Map value = new HashMap<>(); + final String testData = "Text with a single quote: O'Reilly"; + final String expected = "\\x54657874207769746820612073696e676c652071756f74653a204f275265696c6c79"; + value.put("bytes", testData); + + final Object result = handler.handle(columnType, columnName, value); + + assertThat(result, is(instanceOf(String.class))); + assertThat(result, is(expected)); + } } diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BooleanTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BooleanTypeHandlerTest.java index 0bd969d6b7..4c0808a47b 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BooleanTypeHandlerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/postgres/handler/BooleanTypeHandlerTest.java @@ -3,8 +3,13 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType; +import java.util.stream.Stream; + import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -18,12 +23,12 @@ void setUp() { handler = new BooleanTypeHandler(); } - @Test - void test_handle_true_values() { - String value = "t"; + @ParameterizedTest + @MethodSource("provideTrueData") + void test_handle_true_values(String value, Boolean expected) { Object result = handler.process(PostgresDataType.BOOLEAN, "testColumn", value); assertThat(result, is(instanceOf(Boolean.class))); - assertThat(result, is(Boolean.TRUE)); + assertThat(result, is(expected)); } @Test @@ -40,4 +45,11 @@ void test_handle_non_boolean_type() { ); } + private static Stream provideTrueData() { + return Stream.of( + Arguments.of("t", Boolean.TRUE), + Arguments.of("true", Boolean.TRUE) + ); + } + } diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/schema/MySqlSchemaManagerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/schema/MySqlSchemaManagerTest.java index 3856cb3f00..1a433d67a4 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/schema/MySqlSchemaManagerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/schema/MySqlSchemaManagerTest.java @@ -134,7 +134,7 @@ public void getColumnDataTypes_whenFailedToRetrieveColumns_shouldThrowException( when(connection.getMetaData()).thenReturn(databaseMetaData); when(databaseMetaData.getColumns(database, null, tableName, null)).thenThrow(new SQLException("Test exception")); - assertThrows(RuntimeException.class, () -> schemaManager.getColumnDataTypes(database, tableName)); + assertThrows(RuntimeException.class, () -> schemaManager.getColumnDataTypes(database + "." + tableName)); } @Test @@ -144,7 +144,7 @@ public void getColumnDataTypes_whenFailedToGetConnection_shouldThrowException() when(connectionManager.getConnection()).thenThrow(new SQLException("Connection failed")); - assertThrows(RuntimeException.class, () -> schemaManager.getColumnDataTypes(database, tableName)); + assertThrows(RuntimeException.class, () -> schemaManager.getColumnDataTypes(database + "." + tableName)); } @Test @@ -171,7 +171,7 @@ void getColumnDataTypes_whenColumnsExist_shouldReturnValidMapping() throws SQLEx when(resultSet.getString(TYPE_NAME)) .thenReturn("INTEGER", "VARCHAR", "TIMESTAMP"); - Map result = schemaManager.getColumnDataTypes(database, tableName); + Map result = schemaManager.getColumnDataTypes(database + "." + tableName); assertThat(result, notNullValue()); assertThat(result.size(), is(expectedColumnTypes.size())); diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/schema/PostgresSchemaManagerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/schema/PostgresSchemaManagerTest.java index 9eebee0348..c23ddfd8a2 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/schema/PostgresSchemaManagerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/schema/PostgresSchemaManagerTest.java @@ -23,12 +23,16 @@ import org.postgresql.replication.fluent.logical.ChainedLogicalCreateSlotBuilder; import java.sql.Connection; +import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.List; +import java.util.Map; import java.util.UUID; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -37,6 +41,8 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.source.rds.schema.MySqlSchemaManager.COLUMN_NAME; +import static org.opensearch.dataprepper.plugins.source.rds.schema.MySqlSchemaManager.TYPE_NAME; @ExtendWith(MockitoExtension.class) class PostgresSchemaManagerTest { @@ -47,6 +53,12 @@ class PostgresSchemaManagerTest { @Mock(answer = Answers.RETURNS_DEEP_STUBS) private Connection connection; + @Mock + private DatabaseMetaData databaseMetaData; + + @Mock + private ResultSet resultSet; + private PostgresSchemaManager schemaManager; @BeforeEach @@ -160,6 +172,63 @@ void test_getPrimaryKeys_throws_exception_if_failed() throws SQLException { assertThrows(RuntimeException.class, () -> schemaManager.getPrimaryKeys(fullTableName)); } + @Test + public void getColumnDataTypes_whenFailedToRetrieveColumns_shouldThrowException() throws SQLException { + final String database = "my_db"; + final String schema = "public"; + final String tableName = "test"; + final String fullTableName = database + "." + schema + "." + tableName; + when(connectionManager.getConnection()).thenReturn(connection); + when(connection.getMetaData()).thenReturn(databaseMetaData); + when(databaseMetaData.getColumns(database, schema, tableName, null)).thenThrow(new SQLException("Test exception")); + + assertThrows(RuntimeException.class, () -> schemaManager.getColumnDataTypes(fullTableName)); + } + + @Test + public void getColumnDataTypes_whenFailedToGetConnection_shouldThrowException() throws SQLException { + final String database = "my_db"; + final String schema = "public"; + final String tableName = "test"; + final String fullTableName = database + "." + schema + "." + tableName; + when(connectionManager.getConnection()).thenThrow(new SQLException("Connection failed")); + + assertThrows(RuntimeException.class, () -> schemaManager.getColumnDataTypes(fullTableName)); + } + + @Test + void getColumnDataTypes_whenColumnsExist_shouldReturnValidMapping() throws SQLException { + final String database = "my_db"; + final String schema = "public"; + final String tableName = "test"; + final String fullTableName = database + "." + schema + "." + tableName; + final Map expectedColumnTypes = Map.of( + "id", "serial", + "name", "char", + "created_at", "timestamp" + ); + + // Setup the mocks + when(connectionManager.getConnection()).thenReturn(connection); + when(connection.getMetaData()).thenReturn(databaseMetaData); + when(databaseMetaData.getColumns(database, schema, tableName, null)) + .thenReturn(resultSet); + + // Setup ResultSet to return our expected columns + when(resultSet.next()) + .thenReturn(true, true, true, false); // Three columns, then done + when(resultSet.getString(COLUMN_NAME)) + .thenReturn("id", "name", "created_at"); + when(resultSet.getString(TYPE_NAME)) + .thenReturn("serial", "char", "timestamp"); + + Map result = schemaManager.getColumnDataTypes(fullTableName); + + assertThat(result, notNullValue()); + assertThat(result.size(), is(expectedColumnTypes.size())); + assertThat(result, equalTo(expectedColumnTypes)); + } + private PostgresSchemaManager createObjectUnderTest() { return new PostgresSchemaManager(connectionManager); } diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/utils/BytesHexConverterTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/utils/BytesHexConverterTest.java new file mode 100644 index 0000000000..0e708861c9 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/utils/BytesHexConverterTest.java @@ -0,0 +1,20 @@ +package org.opensearch.dataprepper.plugins.source.rds.utils; + +import org.junit.jupiter.api.Test; + + +import java.nio.charset.StandardCharsets; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +public class BytesHexConverterTest { + @Test + void test_bytes_to_hex_string() { + final String testData = "Text with a single quote: O'Reilly"; + byte[] bytes = testData.getBytes(StandardCharsets.ISO_8859_1); + final String expected = "54657874207769746820612073696e676c652071756f74653a204f275265696c6c79"; + final String result = BytesHexConverter.bytesToHex(bytes); + assertThat(result, equalTo(expected)); + } +}