Skip to content

Commit

Permalink
Timeout retry stuck sdk client (#219)
Browse files Browse the repository at this point in the history
## Description of change
This PR adds timeout and retry logic to S3SdkObjectClient in case
S3AsyncClient does not respond to a request within a given time.

#### Relevant issues
AWS Java SDK V2 has an issue of getting stuck from time to time:
aws/aws-sdk-java-v2#5755

#### Does this contribution introduce any breaking changes to the
existing APIs or behaviors?
No

#### Does this contribution introduce any new public APIs or behaviors?
No

#### How was the contribution tested?
- Executed integration tests
- Executed unit tests

#### Does this contribution need a changelog entry?
No

#### Next Steps
- In a follow-up PR, we need to make timeout configurable

---

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license and I agree to the terms of
the [Developer Certificate of Origin
(DCO)](https://developercertificate.org/).

---------

Co-authored-by: Erdogan Ozkoca <ozkoca@amazon.com>
  • Loading branch information
ozkoca and Erdogan Ozkoca authored Feb 11, 2025
1 parent 451720f commit c616bf3
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import java.io.UncheckedIOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.function.Supplier;
import lombok.*;
Expand Down Expand Up @@ -305,13 +307,14 @@ public <T> T measureJoin(
*/
private <T> T handleCompletableFutureJoin(CompletableFuture<T> future) throws IOException {
  try {
    // Bounded wait so that a future which never completes cannot block the caller forever.
    return future.get(120_000, TimeUnit.MILLISECONDS);
  } catch (InterruptedException e) {
    // Restore the interrupt flag for code further up the stack, and report the interruption as
    // a dedicated IOException subtype so it is distinguishable from a timeout or a failed request.
    Thread.currentThread().interrupt();
    java.io.InterruptedIOException interrupted =
        new java.io.InterruptedIOException("Interrupted while getting data");
    interrupted.initCause(e);
    throw interrupted;
  } catch (ExecutionException | TimeoutException e) {
    // A TimeoutException raised by get() has no cause, so the instanceof check below is simply
    // false for it; only an ExecutionException can carry an UncheckedIOException here.
    Throwable cause = e.getCause();
    if (cause instanceof UncheckedIOException) {
      // Unwrap to the original checked IOException.
      throw ((UncheckedIOException) cause).getCause();
    }

    throw new IOException("Error while getting data", e);
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,11 @@

import java.io.IOException;
import java.util.stream.Stream;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/** Tests read stream behaviour with untrusted S3ClientKinds on multiple sizes and read patterns */
@Disabled("Disabled as AAL is not resilient to Faulty S3 Clients yet.")
public class GrayFailureTest extends IntegrationTestBase {
@ParameterizedTest
@MethodSource("sequentialReads")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import java.util.concurrent.CompletableFuture;
import lombok.Getter;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
import software.amazon.s3.analyticsaccelerator.common.Preconditions;
import software.amazon.s3.analyticsaccelerator.common.telemetry.Operation;
import software.amazon.s3.analyticsaccelerator.common.telemetry.Telemetry;
Expand All @@ -44,6 +47,10 @@ public class Block implements Closeable {
private final ObjectKey objectKey;
private final Range range;
private final Telemetry telemetry;
private final ObjectClient objectClient;
private final StreamContext streamContext;
private final ReadMode readMode;
private final Referrer referrer;

@Getter private final long start;
@Getter private final long end;
Expand All @@ -52,6 +59,10 @@ public class Block implements Closeable {
private static final String OPERATION_BLOCK_GET_ASYNC = "block.get.async";
private static final String OPERATION_BLOCK_GET_JOIN = "block.get.join";

private static final int MAX_RETRIES = 20;

private static final Logger LOG = LoggerFactory.getLogger(Block.class);

/**
* Constructs a Block data.
*
Expand Down Expand Up @@ -110,16 +121,24 @@ public Block(
this.telemetry = telemetry;
this.objectKey = objectKey;
this.range = new Range(start, end);
this.objectClient = objectClient;
this.streamContext = streamContext;
this.readMode = readMode;
this.referrer = new Referrer(range.toHttpString(), readMode);

generateSourceAndData();
}

/** Method to help construct source and data */
private void generateSourceAndData() {
GetRequest.GetRequestBuilder getRequestBuilder =
GetRequest.builder()
.s3Uri(this.objectKey.getS3URI())
.range(this.range)
.etag(this.objectKey.getEtag())
.referrer(new Referrer(range.toHttpString(), readMode));
.referrer(referrer);

GetRequest getRequest = getRequestBuilder.build();

this.source =
this.telemetry.measureCritical(
() ->
Expand All @@ -145,7 +164,7 @@ public Block(
public int read(long pos) throws IOException {
// Reject negative positions before the block's bytes are read.
Preconditions.checkArgument(0 <= pos, "`pos` must not be negative");

// Diff view of this commit: the first assignment is the pre-change line and the second is its
// replacement, which obtains the bytes through the retrying accessor.
byte[] content = this.getData();
byte[] content = this.getDataWithRetries();
// Map the position to an index into the block's bytes and return that byte as an unsigned int.
return Byte.toUnsignedInt(content[posToOffset(pos)]);
}

Expand All @@ -165,7 +184,7 @@ public int read(byte @NonNull [] buf, int off, int len, long pos) throws IOExcep
Preconditions.checkArgument(0 <= len, "`len` must not be negative");
Preconditions.checkArgument(off < buf.length, "`off` must be less than size of buffer");

byte[] content = this.getData();
byte[] content = this.getDataWithRetries();
int contentOffset = posToOffset(pos);
int available = content.length - contentOffset;
int bytesToCopy = Math.min(len, available);
Expand Down Expand Up @@ -199,6 +218,34 @@ private int posToOffset(long pos) {
return (int) (pos - start);
}

/**
 * Fetches the block's bytes, re-issuing the request when the fetch fails with a plain {@link
 * IOException}. Subclasses of {@link IOException} are rethrown immediately without a retry.
 *
 * <p>At most {@code MAX_RETRIES} attempts are made; before each new attempt the source and data
 * are regenerated via {@code generateSourceAndData()}.
 *
 * @return the bytes fetched by the issued {@link GetRequest}.
 * @throws IOException if the last permitted attempt fails, or if a non-retryable I/O error occurs
 */
private byte[] getDataWithRetries() throws IOException {
  int attempt = 0;
  while (attempt < MAX_RETRIES) {
    try {
      return this.getData();
    } catch (IOException failure) {
      // Only an exact IOException is considered retryable; anything more specific propagates.
      if (failure.getClass() != IOException.class) {
        throw failure;
      }

      boolean lastAttempt = attempt >= MAX_RETRIES - 1;
      if (lastAttempt) {
        LOG.error("Cannot read block file. Retry reached the limit");
        throw new IOException("Cannot read block file", failure.getCause());
      }

      LOG.info("Get data failed. Retrying. Retry Count {}", attempt);
      generateSourceAndData();
    }
    attempt++;
  }
  // Only reachable if the loop body never runs; also satisfies the compiler's return analysis.
  throw new IOException("Cannot read block file", new IOException("Error while getting block"));
}

/**
* Returns the bytes fetched by the issued {@link GetRequest}. This method will block until the
* data is fully available.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ public IOPlanExecution execute(IOPlan ioPlan) {
private void handleOperationExceptions(Exception e) {
if (e.getCause() != null
&& e.getCause().getMessage() != null
&& e.getCause().getMessage().contains("Status Code: 412")) {
&& (e.getCause().getMessage().contains("Status Code: 412")
|| e.getCause().getMessage().contains("Error while getting block"))) {
try {
metadataStore.evictKey(this.objectKey.getS3URI());
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import software.amazon.s3.analyticsaccelerator.request.ObjectClient;
import software.amazon.s3.analyticsaccelerator.request.ReadMode;
import software.amazon.s3.analyticsaccelerator.util.FakeObjectClient;
import software.amazon.s3.analyticsaccelerator.util.FakeStuckObjectClient;
import software.amazon.s3.analyticsaccelerator.util.ObjectKey;
import software.amazon.s3.analyticsaccelerator.util.S3URI;

Expand Down Expand Up @@ -235,6 +236,25 @@ void testContainsBoundaries() {
assertThrows(IllegalArgumentException.class, () -> block.contains(-1));
}

@Test
void testReadTimeoutAndRetry() throws IOException {
  // The fake client completes every getObject future exceptionally with a TimeoutException,
  // so no read attempt can succeed.
  final String payload = "test-data";
  ObjectClient alwaysFailingClient = new FakeStuckObjectClient(payload);
  S3URI stuckUri = S3URI.of("stuck-client", "bar");
  ObjectKey stuckKey = ObjectKey.builder().s3URI(stuckUri).etag(ETAG).build();

  Block block =
      new Block(
          stuckKey,
          alwaysFailingClient,
          TestTelemetry.DEFAULT,
          0,
          payload.length(),
          0,
          ReadMode.SYNC);

  // The read is expected to surface the failure as an IOException.
  assertThrows(IOException.class, () -> block.read(4));
}

@Test
void testClose() {
final String TEST_DATA = "test-data";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,4 +217,32 @@ public void test_FailureEvictsObjectsAsExpected() throws IOException {
assertEquals(0, blobStore.blobCount());
assertThrows(Exception.class, () -> metadataStore.get(s3URI));
}

@SuppressWarnings("unchecked")
@Test
public void test_FailureEvictsObjectsAsExpected_WhenSDKClientGetsStuck() throws IOException {
  // Arrange: a mocked SDK client whose getObject future has already failed with a nested
  // IOException carrying the "Error while getting block" message.
  CompletableFuture<ResponseInputStream<GetObjectResponse>> brokenResponse =
      new CompletableFuture<>();
  brokenResponse.completeExceptionally(
      new IOException(new IOException("Error while getting block")));

  S3AsyncClient stuckSdkClient = mock(S3AsyncClient.class);
  when(stuckSdkClient.getObject(
          any(GetObjectRequest.class), any(AsyncResponseTransformer.class)))
      .thenReturn(brokenResponse);
  S3SdkObjectClient objectClient = new S3SdkObjectClient(stuckSdkClient);

  // Pre-populate the metadata for the object under test.
  MetadataStore metadataStore =
      new MetadataStore(objectClient, TestTelemetry.DEFAULT, PhysicalIOConfiguration.DEFAULT);
  metadataStore.storeObjectMetadata(
      s3URI, ObjectMetadata.builder().contentLength(100).etag(etag).build());

  BlobStore blobStore =
      new BlobStore(objectClient, TestTelemetry.DEFAULT, PhysicalIOConfiguration.DEFAULT);
  PhysicalIOImpl physicalIO =
      new PhysicalIOImpl(s3URI, metadataStore, blobStore, TestTelemetry.DEFAULT);

  // Act + assert: the read fails, no blob remains, and the metadata lookup fails afterwards.
  assertThrows(IOException.class, () -> physicalIO.read(0));
  assertEquals(0, blobStore.blobCount());
  assertThrows(Exception.class, () -> metadataStore.get(s3URI));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.s3.analyticsaccelerator.util;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import software.amazon.s3.analyticsaccelerator.request.GetRequest;
import software.amazon.s3.analyticsaccelerator.request.ObjectContent;
import software.amazon.s3.analyticsaccelerator.request.StreamContext;

/**
 * A {@link FakeObjectClient} variant whose {@code getObject} never yields content: every call
 * returns a future that has already completed exceptionally with a {@link TimeoutException}.
 */
public class FakeStuckObjectClient extends FakeObjectClient {

  /**
   * Creates the fake client; the data is handed to the {@link FakeObjectClient} constructor.
   *
   * @param data the data making up the object
   */
  public FakeStuckObjectClient(String data) {
    super(data);
  }

  /**
   * Always fails the request.
   *
   * @param getRequest ignored
   * @param streamContext ignored
   * @return a future completed exceptionally with a {@link TimeoutException}
   */
  @Override
  public CompletableFuture<ObjectContent> getObject(
      GetRequest getRequest, StreamContext streamContext) {
    final TimeoutException timeout = new TimeoutException("Request timed out");
    final CompletableFuture<ObjectContent> alreadyFailed = new CompletableFuture<>();
    alreadyFailed.completeExceptionally(timeout);
    return alreadyFailed;
  }
}

0 comments on commit c616bf3

Please sign in to comment.