Skip to content

Commit

Permalink
Support cross account DynamoDB tables for streams. (opensearch-projec…
Browse files Browse the repository at this point in the history
…t#4776)

Update requests to DynamoDB to provide the table ARN instead of the table name. This allows Data Prepper to use the new cross-account and resource policy changes available in DynamoDB.

Resolves opensearch-project#4424

Signed-off-by: David Venable <dlv@amazon.com>
  • Loading branch information
dlvenable authored Jan 14, 2025
1 parent 81be7e3 commit 9b07d40
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableMetadata;
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.TableUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
Expand Down Expand Up @@ -234,8 +233,8 @@ private void compareAndCreateChildrenPartitions(List<EnhancedSourcePartition> so
* Conduct Metadata info for table and also perform validation on configuration.
* Once created, the info should not be changed.
*/
private TableInfo getTableInfo(TableConfig tableConfig) {
String tableName = TableUtil.getTableNameFromArn(tableConfig.getTableArn());
private TableInfo getTableInfo(final TableConfig tableConfig) {
final String tableName = tableConfig.getTableArn();
DescribeTableResponse describeTableResult;
try {
// Need to call describe table to get the Key schema for table
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,7 @@
package org.opensearch.dataprepper.plugins.source.dynamodb.utils;

import software.amazon.awssdk.arns.Arn;

public class TableUtil {

public static String getTableNameFromArn(String tableArn) {
Arn arn = Arn.fromString(tableArn);
// resourceAsString is table/xxx
return arn.resourceAsString().substring("table/".length());
}

public static String getTableArnFromStreamArn(String streamArn) {
// e.g. Given a stream arn: arn:aws:dynamodb:us-west-2:xxx:table/test-table/stream/2023-07-31T04:59:58.190
// Returns arn:aws:dynamodb:us-west-2:xxx:table/test-table
Expand All @@ -21,6 +13,4 @@ public static String getTableArnFromExportArn(String exportArn) {
// returns: arn:aws:dynamodb:us-west-2:123456789012:table/Thread
return exportArn.substring(0, exportArn.lastIndexOf("/export/"));
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
Expand Down Expand Up @@ -223,7 +224,10 @@ void test_should_init() throws InterruptedException {


// Should call describe table to get basic table info
verify(dynamoDbClient).describeTable(any(DescribeTableRequest.class));
ArgumentCaptor<DescribeTableRequest> describeTableRequestArgumentCaptor = ArgumentCaptor.forClass(DescribeTableRequest.class);
verify(dynamoDbClient).describeTable(describeTableRequestArgumentCaptor.capture());
DescribeTableRequest actualDescribeTableRequest = describeTableRequestArgumentCaptor.getValue();
assertThat(actualDescribeTableRequest.tableName(), equalTo(tableArn));
// Should check PITR enabled or not
verify(dynamoDbClient).describeContinuousBackups(any(DescribeContinuousBackupsRequest.class));
// Acquire the init partition
Expand Down Expand Up @@ -252,7 +256,11 @@ void test_PITR_not_enabled_init_should_failed() throws InterruptedException {
executorService.shutdownNow();

// Should call describe table to get basic table info
verify(dynamoDbClient).describeTable(any(DescribeTableRequest.class));
ArgumentCaptor<DescribeTableRequest> describeTableRequestArgumentCaptor = ArgumentCaptor.forClass(DescribeTableRequest.class);
verify(dynamoDbClient).describeTable(describeTableRequestArgumentCaptor.capture());
DescribeTableRequest actualDescribeTableRequest = describeTableRequestArgumentCaptor.getValue();
assertThat(actualDescribeTableRequest.tableName(), equalTo(tableArn));

// Should check PITR enabled or not
verify(dynamoDbClient).describeContinuousBackups(any(DescribeContinuousBackupsRequest.class));

Expand All @@ -277,7 +285,11 @@ void test_streaming_not_enabled_init_should_failed() throws InterruptedException
executorService.shutdownNow();

// Should call describe table to get basic table info
verify(dynamoDbClient).describeTable(any(DescribeTableRequest.class));
ArgumentCaptor<DescribeTableRequest> describeTableRequestArgumentCaptor = ArgumentCaptor.forClass(DescribeTableRequest.class);
verify(dynamoDbClient).describeTable(describeTableRequestArgumentCaptor.capture());
DescribeTableRequest actualDescribeTableRequest = describeTableRequestArgumentCaptor.getValue();
assertThat(actualDescribeTableRequest.tableName(), equalTo(tableArn));

// Should check PITR enabled or not
verify(dynamoDbClient).describeContinuousBackups(any(DescribeContinuousBackupsRequest.class));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,6 @@ class TableUtilTest {
private final String exportArn = tableArn + "/export/01693291918297-bfeccbea";
private final String streamArn = tableArn + "/stream/2023-09-14T05:46:45.367";

@Test
void test_getTableNameFromArn_should_return_tableName() {
String result = TableUtil.getTableNameFromArn(tableArn);
assertThat(result, equalTo(tableName));
}

@Test
void test_getTableArnFromStreamArn_should_return_tableArn() {
String result = TableUtil.getTableArnFromStreamArn(streamArn);
Expand Down

0 comments on commit 9b07d40

Please sign in to comment.