Skip to content

Commit

Permalink
Deal with Control Characters in DynamoDB (opensearch-project#5437)
Browse files Browse the repository at this point in the history
* Deal with Control Characters in DynamoDB

Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com>
  • Loading branch information
Galactus22625 authored and divbok committed Feb 24, 2025
1 parent 531f92f commit e72e9c3
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@

package org.opensearch.dataprepper.plugins.source.dynamodb.converter;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
Expand All @@ -18,16 +15,18 @@
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.enhanced.dynamodb.document.EnhancedDocument;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.OperationType;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.awssdk.services.dynamodb.model.StreamViewType;

import java.math.BigDecimal;
import java.time.Instant;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class StreamRecordConverter extends RecordConverter {
private static final Logger LOG = LoggerFactory.getLogger(StreamRecordConverter.class);
Expand All @@ -38,10 +37,7 @@ public class StreamRecordConverter extends RecordConverter {
static final String BYTES_RECEIVED = "bytesReceived";
static final String BYTES_PROCESSED = "bytesProcessed";

private static final ObjectMapper MAPPER = new ObjectMapper();

private static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<Map<String, Object>>() {
};
private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder();

private final StreamConfig streamConfig;

Expand Down Expand Up @@ -117,13 +113,56 @@ public void writeToBuffer(final AcknowledgementSet acknowledgementSet, List<Reco
}
}

/**
 * Translates a single DynamoDB {@link AttributeValue} into a plain Java value, chosen by the
 * attribute's type:
 * <ul>
 *   <li>S - the string itself; BOOL - the Boolean itself; NUL - {@code null}</li>
 *   <li>N - a {@link BigDecimal} built from the number's string form</li>
 *   <li>B - the bytes encoded as a Base64 string</li>
 *   <li>SS - the string collection exactly as the attribute returns it</li>
 *   <li>NS - a set of {@link BigDecimal}; BS - a set of Base64 strings</li>
 *   <li>L and M - converted recursively via {@code convertListData} and {@code convertData}</li>
 * </ul>
 *
 * @param attributeValue the DynamoDB attribute to translate
 * @return the converted value, or {@code null} for a NUL attribute
 * @throws IllegalArgumentException if the attribute reports any type not listed above
 */
private Object processAttributeValue(AttributeValue attributeValue){
    switch (attributeValue.type()){
        // ---- scalar types ----
        case S:
            return attributeValue.s();
        case N:
            return new BigDecimal(attributeValue.n());
        case B: {
            final byte[] rawBytes = attributeValue.b().asByteArray();
            return BASE64_ENCODER.encodeToString(rawBytes);
        }
        case BOOL:
            return attributeValue.bool();
        case NUL:
            return null;

        // ---- set types ----
        case SS:
            return attributeValue.ss();
        case NS:
            return attributeValue.ns().stream()
                    .map(number -> new BigDecimal(number))
                    .collect(Collectors.toSet());
        case BS:
            return attributeValue.bs().stream()
                    .map(binary -> binary.asByteArray())
                    .map(rawBytes -> BASE64_ENCODER.encodeToString(rawBytes))
                    .collect(Collectors.toSet());

        // ---- document types (recursive) ----
        case L:
            return convertListData(attributeValue.l());
        case M:
            return convertData(attributeValue.m());

        default:
            throw new IllegalArgumentException("Unsupported attribute type: " + attributeValue.type());
    }
}

/**
* Convert a DynamoDB attribute map into a plain Java map, converting each attribute value to its Java equivalent
*/
private Map<String, Object> convertData(Map<String, AttributeValue> data) throws JsonProcessingException {
String jsonData = EnhancedDocument.fromAttributeValueMap(data).toJson();
return MAPPER.readValue(jsonData, MAP_TYPE_REFERENCE);
private Map<String, Object> convertData(Map<String, AttributeValue> data) {
    // A HashMap accepts the null that processAttributeValue returns for NUL attributes.
    final Map<String, Object> converted = new HashMap<>();
    for (final Map.Entry<String, AttributeValue> attribute : data.entrySet()) {
        converted.put(attribute.getKey(), processAttributeValue(attribute.getValue()));
    }
    return converted;
}

/**
 * Converts the elements of a DynamoDB list attribute into plain Java values.
 *
 * @param data the elements of the DynamoDB list
 * @return a list holding the result of {@code processAttributeValue} for every element,
 *         in the order the elements were given
 */
private List<Object> convertListData(List<AttributeValue> data) {
    return data.stream().map(element -> processAttributeValue(element)).collect(Collectors.toList());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
Expand All @@ -24,25 +25,32 @@
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.StreamConfig;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableMetadata;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.Identity;
import software.amazon.awssdk.services.dynamodb.model.OperationType;
import software.amazon.awssdk.services.dynamodb.model.StreamRecord;
import software.amazon.awssdk.services.dynamodb.model.StreamViewType;
import software.amazon.awssdk.services.dynamodb.model.Identity;


import java.math.BigDecimal;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Stream;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.BDDMockito.given;
Expand All @@ -52,14 +60,14 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.DDB_STREAM_EVENT_IS_TTL_DELETE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_VERSION_FROM_TIMESTAMP;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_TIMESTAMP_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_VERSION_FROM_TIMESTAMP;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PARTITION_KEY_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.SORT_KEY_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.DDB_STREAM_EVENT_IS_TTL_DELETE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter.BYTES_PROCESSED;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter.BYTES_RECEIVED;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter.CHANGE_EVENTS_PROCESSED_COUNT;
Expand All @@ -68,6 +76,7 @@
@ExtendWith(MockitoExtension.class)
class StreamRecordConverterTest {
private static final Random RANDOM = new Random();
private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder();

@Mock
private PluginMetrics pluginMetrics;
Expand Down Expand Up @@ -188,7 +197,9 @@ void test_writeSingleRecordToBuffer() throws Exception {
"and/or",
"c:\\Home",
"I take\nup multiple\nlines",
"String with some \"backquotes\"."
"String with some \"backquotes\".",
"\u0003 \u0002 \u0001 \u0000 control characters",
"\b control characters"
})
void test_writeSingleRecordToBuffer_with_other_data(final String additionalString) throws Exception {

Expand Down Expand Up @@ -227,6 +238,35 @@ void test_writeSingleRecordToBuffer_with_other_data(final String additionalStrin
verify(bytesProcessedSummary).record(record.dynamodb().sizeBytes());
}

/**
 * Writes a single stream record whose only attribute, {@code "Data"}, holds the supplied
 * DynamoDB value, then checks that the event added to the buffer exposes the expected
 * converted value under the same key.
 *
 * Collection results are compared by size and by containment in both directions, so the
 * check does not depend on element order or on the concrete collection type.
 *
 * @param attributeTestCase the DynamoDB attribute and the value it is expected to convert to
 */
@ParameterizedTest
@MethodSource("provideAttributeValues")
void test_dynamodb_datatypes_are_handled_correctly(final AttributeTestCase attributeTestCase) throws Exception{
    final AttributeValue attributeValue = attributeTestCase.getAttributeValue();
    final Object expectedOutput = attributeTestCase.getExpectedOutput();

    final Map<String, AttributeValue> data = Map.of("Data", attributeValue);
    final List<software.amazon.awssdk.services.dynamodb.model.Record> records = buildRecords(1, Instant.now(), data);
    final ArgumentCaptor<Record> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class);
    final StreamRecordConverter objectUnderTest = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics, streamConfig);
    doNothing().when(bufferAccumulator).add(recordArgumentCaptor.capture());

    objectUnderTest.writeToBuffer(null, records);

    verify(bufferAccumulator, times(1)).add(any(Record.class));

    final Record capturedRecord = recordArgumentCaptor.getValue();
    final JacksonEvent event = (JacksonEvent) capturedRecord.getData();
    final Object actualData = event.get("Data", Object.class);
    if (actualData instanceof Collection && expectedOutput instanceof Collection) {
        final Collection<?> actual = (Collection<?>) actualData;
        final Collection<?> expected = (Collection<?>) expectedOutput;
        // JUnit's contract is assertEquals(expected, actual); the reverse order mislabels failures.
        assertEquals(expected.size(), actual.size());
        // Separate assertions so a failure shows which direction of containment broke.
        assertTrue(actual.containsAll(expected), "converted data is missing expected elements");
        assertTrue(expected.containsAll(actual), "converted data contains unexpected elements");
    } else {
        assertEquals(expectedOutput, actualData);
    }
}

@Test
void test_writeSingleRecordToBuffer_with_bad_input_does_not_write() throws Exception {

Expand Down Expand Up @@ -552,6 +592,39 @@ private List<software.amazon.awssdk.services.dynamodb.model.Record> buildRecords
return buildRecords(count, creationTime, Collections.emptyMap());
}

/**
 * Supplies the cases for {@code test_dynamodb_datatypes_are_handled_correctly}: one DynamoDB
 * attribute per type (S, N, BOOL, NUL, B, BS, SS, NS, L, M) paired with its expected
 * converted value.
 *
 * Set-typed attributes (BS, SS, NS) carry their expectation as an {@link ArrayList}; the
 * consuming test compares collections without regard to order or concrete type.
 */
private static Stream<AttributeTestCase> provideAttributeValues() {
    // Binary fixtures shared by the B and BS cases, with their Base64 expectations.
    final SdkBytes firstBinary = SdkBytes.fromUtf8String("0000");
    final SdkBytes secondBinary = SdkBytes.fromUtf8String("0101");
    final String firstBinaryEncoded = BASE64_ENCODER.encodeToString(firstBinary.asByteArray());
    final String secondBinaryEncoded = BASE64_ENCODER.encodeToString(secondBinary.asByteArray());

    return Stream.of(
            // Scalars
            new AttributeTestCase(AttributeValue.builder().s("testString").build(), "testString"),
            new AttributeTestCase(AttributeValue.builder().n("123").build(), new BigDecimal("123")),
            new AttributeTestCase(AttributeValue.builder().bool(true).build(), true),
            new AttributeTestCase(AttributeValue.builder().nul(true).build(), null),
            new AttributeTestCase(AttributeValue.builder().b(firstBinary).build(), firstBinaryEncoded),

            // Sets
            new AttributeTestCase(
                    AttributeValue.builder().bs(Arrays.asList(firstBinary, secondBinary)).build(),
                    new ArrayList<>(Arrays.asList(firstBinaryEncoded, secondBinaryEncoded))),
            new AttributeTestCase(
                    AttributeValue.builder().ss(Arrays.asList("string1", "string2")).build(),
                    new ArrayList<>(Arrays.asList("string1", "string2"))),
            new AttributeTestCase(
                    AttributeValue.builder().ns(Arrays.asList("1", "2")).build(),
                    new ArrayList<>(Arrays.asList(new BigDecimal("1"), new BigDecimal("2")))),

            // Documents
            new AttributeTestCase(
                    AttributeValue.builder().l(Arrays.asList(
                            AttributeValue.builder().s("listItem1").build(),
                            AttributeValue.builder().n("2").build())).build(),
                    Arrays.asList("listItem1", new BigDecimal("2"))),
            new AttributeTestCase(
                    AttributeValue.builder().m(Map.of(
                            "key1", AttributeValue.builder().s("value1").build(),
                            "key2", AttributeValue.builder().n("2").build())).build(),
                    Map.of("key1", "value1", "key2", new BigDecimal("2")))
    );
}

private List<software.amazon.awssdk.services.dynamodb.model.Record> buildRecords(
int count,
final Instant creationTime,
Expand Down Expand Up @@ -617,4 +690,21 @@ private software.amazon.awssdk.services.dynamodb.model.Record buildRecord(final
return record;
}

/**
 * Immutable pairing of a DynamoDB attribute with the value the converter is expected to
 * produce for it; used as the argument type of the parameterized data-type test.
 */
private static class AttributeTestCase {
    private final AttributeValue attributeValue;
    private final Object expectedOutput;

    private AttributeTestCase(final AttributeValue attributeValue, final Object expectedOutput) {
        this.expectedOutput = expectedOutput;
        this.attributeValue = attributeValue;
    }

    /** @return the expected converted value; may be {@code null} */
    public Object getExpectedOutput() {
        return this.expectedOutput;
    }

    /** @return the DynamoDB attribute fed to the converter */
    public AttributeValue getAttributeValue() {
        return this.attributeValue;
    }
}
}

0 comments on commit e72e9c3

Please sign in to comment.