diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java index 85f7df05d2..d5e019df64 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java @@ -5,9 +5,6 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.converter; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.DistributionSummary; import org.opensearch.dataprepper.buffer.common.BufferAccumulator; @@ -18,16 +15,18 @@ import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import software.amazon.awssdk.enhanced.dynamodb.document.EnhancedDocument; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.OperationType; import software.amazon.awssdk.services.dynamodb.model.Record; import software.amazon.awssdk.services.dynamodb.model.StreamViewType; +import java.math.BigDecimal; import java.time.Instant; +import java.util.Base64; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; public class StreamRecordConverter extends RecordConverter { private static final Logger LOG = LoggerFactory.getLogger(StreamRecordConverter.class); @@ -38,10 +37,7 @@ public class StreamRecordConverter extends RecordConverter { static final String BYTES_RECEIVED = "bytesReceived"; static final String BYTES_PROCESSED = "bytesProcessed"; - private static final ObjectMapper MAPPER = new ObjectMapper(); - - private static final TypeReference> MAP_TYPE_REFERENCE = new TypeReference>() { - }; + private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder(); private final StreamConfig streamConfig; @@ -117,13 +113,56 @@ public void writeToBuffer(final AcknowledgementSet acknowledgementSet, List BASE64_ENCODER.encodeToString(buffer.asByteArray())) + .collect(Collectors.toSet()); + case SS: // SS for String Set + return attributeValue.ss(); + case L: // L for List + return convertListData(attributeValue.l()); + case M: // M for Map + return convertData(attributeValue.m()); + case NUL: // NUL for Null + return null; + default: + throw new IllegalArgumentException("Unsupported attribute type: " + attributeValue.type()); + } + } /** * Convert the DynamoDB attribute map to a normal map for data */ - private Map convertData(Map data) throws JsonProcessingException { - String jsonData = EnhancedDocument.fromAttributeValueMap(data).toJson(); - return MAPPER.readValue(jsonData, MAP_TYPE_REFERENCE); + private Map convertData(Map data) { + return data.entrySet().stream() + .collect(HashMap::new, + (map, entry) -> map.put(entry.getKey(), processAttributeValue(entry.getValue())), + HashMap::putAll); + } + + /** + * Convert the DynamoDB attribute List to a normal list for data + */ + private List convertListData(List data) { + return data.stream() + .map(this::processAttributeValue) + .collect(Collectors.toList()); } /** diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java index 43ef07f7dc..86655050c6 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java @@ -11,6 +11,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; import org.mockito.Mock; @@ -24,25 +25,32 @@ import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.StreamConfig; import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo; import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableMetadata; +import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.Identity; import software.amazon.awssdk.services.dynamodb.model.OperationType; import software.amazon.awssdk.services.dynamodb.model.StreamRecord; import software.amazon.awssdk.services.dynamodb.model.StreamViewType; -import software.amazon.awssdk.services.dynamodb.model.Identity; - +import java.math.BigDecimal; import java.time.Instant; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Base64; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; import java.util.UUID; +import java.util.stream.Stream; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyDouble; import static org.mockito.BDDMockito.given; @@ -52,14 +60,14 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.DDB_STREAM_EVENT_IS_TTL_DELETE; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE; -import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_VERSION_FROM_TIMESTAMP; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_TIMESTAMP_METADATA_ATTRIBUTE; +import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_VERSION_FROM_TIMESTAMP; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PARTITION_KEY_METADATA_ATTRIBUTE; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.SORT_KEY_METADATA_ATTRIBUTE; -import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.DDB_STREAM_EVENT_IS_TTL_DELETE; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter.BYTES_PROCESSED; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter.BYTES_RECEIVED; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter.CHANGE_EVENTS_PROCESSED_COUNT; @@ -68,6 +76,7 @@ @ExtendWith(MockitoExtension.class) class StreamRecordConverterTest { private static final Random RANDOM = new Random(); + private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder(); @Mock private PluginMetrics pluginMetrics; @@ -188,7 +197,9 @@ void test_writeSingleRecordToBuffer() throws Exception { "and/or", "c:\\Home", "I take\nup multiple\nlines", - "String with some \"backquotes\"." + "String with some \"backquotes\".", + "\u0003 \u0002 \u0001 \u0000 control characters", + "\b control characters" }) void test_writeSingleRecordToBuffer_with_other_data(final String additionalString) throws Exception { @@ -227,6 +238,35 @@ void test_writeSingleRecordToBuffer_with_other_data(final String additionalStrin verify(bytesProcessedSummary).record(record.dynamodb().sizeBytes()); } + @ParameterizedTest + @MethodSource("provideAttributeValues") + void test_dynamodb_datatypes_are_handled_correctly(final AttributeTestCase attributeTestCase) throws Exception{ + AttributeValue attributeValue = attributeTestCase.getAttributeValue(); + Object expectedOutput = attributeTestCase.getExpectedOutput(); + + final Map data = Map.of("Data", attributeValue); + List records = buildRecords(1, Instant.now(), data); + final ArgumentCaptor recordArgumentCaptor = ArgumentCaptor.forClass(Record.class); + final StreamRecordConverter objectUnderTest = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics, streamConfig); + doNothing().when(bufferAccumulator).add(recordArgumentCaptor.capture()); + + objectUnderTest.writeToBuffer(null, records); + + verify(bufferAccumulator, times(1)).add(any(Record.class)); + + Record capturedRecord = recordArgumentCaptor.getValue(); + JacksonEvent event = (JacksonEvent) capturedRecord.getData(); + Object actualData = event.get("Data", Object.class); + if (actualData instanceof Collection && expectedOutput instanceof Collection) { + Collection actual = (Collection) actualData; + Collection expected = (Collection) expectedOutput; + assertEquals(actual.size(), expected.size()); + assertTrue(actual.containsAll(expected) && expected.containsAll(actual)); + } else { + assertEquals(expectedOutput, actualData); + } + } + @Test void test_writeSingleRecordToBuffer_with_bad_input_does_not_write() throws Exception { @@ -552,6 +592,39 @@ private List buildRecords return buildRecords(count, creationTime, Collections.emptyMap()); } + private static Stream provideAttributeValues() { + return Stream.of( + new AttributeTestCase(AttributeValue.builder().s("testString").build(), "testString"), + new AttributeTestCase(AttributeValue.builder().n("123").build(), new BigDecimal("123")), + new AttributeTestCase(AttributeValue.builder().bool(true).build(), true), + new AttributeTestCase(AttributeValue.builder().nul(true).build(), null), + new AttributeTestCase(AttributeValue.builder().b(SdkBytes.fromUtf8String("0000")).build(), + BASE64_ENCODER.encodeToString(SdkBytes.fromUtf8String("0000").asByteArray())), + new AttributeTestCase(AttributeValue.builder().bs(Arrays.asList( + SdkBytes.fromUtf8String("0000"), + SdkBytes.fromUtf8String("0101") + )).build(), + new ArrayList<>(Arrays.asList( + BASE64_ENCODER.encodeToString(SdkBytes.fromUtf8String("0000").asByteArray()), + BASE64_ENCODER.encodeToString(SdkBytes.fromUtf8String("0101").asByteArray()) + ))), + new AttributeTestCase(AttributeValue.builder().ss(Arrays.asList("string1", "string2")).build(), + new ArrayList<>(Arrays.asList("string1", "string2"))), // Using ArrayList instead of Set + new AttributeTestCase(AttributeValue.builder().ns(Arrays.asList("1", "2")).build(), + new ArrayList<>(Arrays.asList(new BigDecimal("1"), new BigDecimal("2")))), // Using ArrayList + new AttributeTestCase(AttributeValue.builder().l(Arrays.asList( + AttributeValue.builder().s("listItem1").build(), + AttributeValue.builder().n("2").build() + )).build(), Arrays.asList("listItem1", new BigDecimal("2"))), + new AttributeTestCase( + AttributeValue.builder().m( + Map.of("key1", AttributeValue.builder().s("value1").build(), + "key2", AttributeValue.builder().n("2").build())).build(), + Map.of("key1", "value1", "key2", new BigDecimal("2")) + ) + ); + } + private List buildRecords( int count, final Instant creationTime, @@ -617,4 +690,21 @@ private software.amazon.awssdk.services.dynamodb.model.Record buildRecord(final return record; } + private static class AttributeTestCase { + private final AttributeValue attributeValue; + private final Object expectedOutput; + + private AttributeTestCase(AttributeValue attributeValue, Object expectedOutput) { + this.attributeValue = attributeValue; + this.expectedOutput = expectedOutput; + } + + public AttributeValue getAttributeValue() { + return attributeValue; + } + + public Object getExpectedOutput() { + return expectedOutput; + } + } } \ No newline at end of file