Skip to content

Commit

Permalink
Support audit headers in request (#204)
Browse files Browse the repository at this point in the history
## Description of change
Support audit headers in request

#### Relevant issues
<!-- Please add issue numbers. -->
<!-- Please also link them to this PR. -->

#### Does this contribution introduce any breaking changes to the
existing APIs or behaviors?
<!-- Please explain why this was necessary. -->

#### Does this contribution introduce any new public APIs or behaviors?
<!-- Please describe them and explain what scenarios they target.  -->

#### How was the contribution tested?
<!-- Please describe how this contribution was tested. -->

#### Does this contribution need a changelog entry?
- [ ] I have updated the CHANGELOG or README if appropriate

---

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license and I agree to the terms of
the [Developer Certificate of Origin
(DCO)](https://developercertificate.org/).
  • Loading branch information
rajdchak authored Jan 9, 2025
1 parent 5b0c145 commit df46146
Show file tree
Hide file tree
Showing 16 changed files with 414 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,13 @@ public interface ObjectClient extends Closeable {
* @return a future that completes with the {@code ObjectContent} returned for the request
*/
CompletableFuture<ObjectContent> getObject(GetRequest getRequest);

/**
* Make a getObject request to the object store, passing a stream context along with it.
*
* @param getRequest The GET request to be sent
* @param streamContext audit headers to be attached in the request header; may be {@code null}
*     when the caller has no stream context
* @return a future that completes with the {@link ObjectContent} returned for the request
*/
CompletableFuture<ObjectContent> getObject(GetRequest getRequest, StreamContext streamContext);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.s3.analyticsaccelerator.request;
/**
 * The StreamContext interface provides a method for modifying and building the referrer header
 * which will then be attached to subsequent HTTP requests.
 */
public interface StreamContext {

  /**
   * Modifies and builds the referrer header string for a given request context.
   *
   * <p>Implementation Note: To ensure thread safety, implementations should create and modify a
   * copy of the internal state rather than modifying the original object directly. This is crucial
   * as multiple threads may be accessing the same StreamContext instance concurrently.
   *
   * <p>Example implementation:
   *
   * <pre>
   * public class S3AStreamContext implements StreamContext {
   *   private final HttpReferrerAuditHeader referrer;
   *
   *   public S3AStreamContext(HttpReferrerAuditHeader referrer) {
   *     this.referrer = referrer;
   *   }
   *
   *   &#64;Override
   *   public String modifyAndBuildReferrerHeader(GetRequest getRequestContext) {
   *     // Create a copy to ensure thread safety
   *     HttpReferrerAuditHeader copyReferrer = new HttpReferrerAuditHeader(this.referrer);
   *     copyReferrer.set(AuditConstants.PARAM_RANGE, getRequestContext.getRange().toHttpString());
   *     return copyReferrer.buildHttpReferrer();
   *   }
   * }
   * </pre>
   *
   * @param getRequestContext the request context for building the referrer header
   * @return the modified and built referrer header as a String
   */
  // Interface methods are implicitly public and abstract, so no modifier is spelled out here.
  String modifyAndBuildReferrerHeader(GetRequest getRequestContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import software.amazon.s3.analyticsaccelerator.io.physical.impl.PhysicalIOImpl;
import software.amazon.s3.analyticsaccelerator.request.ObjectClient;
import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
import software.amazon.s3.analyticsaccelerator.request.StreamContext;
import software.amazon.s3.analyticsaccelerator.util.ObjectFormatSelector;
import software.amazon.s3.analyticsaccelerator.util.S3URI;

Expand Down Expand Up @@ -106,20 +107,37 @@ public S3SeekableInputStream createStream(@NonNull S3URI s3URI, long contentLeng
return new S3SeekableInputStream(s3URI, createLogicalIO(s3URI), telemetry);
}

/**
 * Builds an {@link S3SeekableInputStream} for the given object, handing the supplied stream
 * context to {@code createLogicalIO} when the logical IO layer is created.
 *
 * @param s3URI the object's S3 URI
 * @param streamContext contains audit headers to be attached in request header
 * @return An instance of the input stream.
 */
public S3SeekableInputStream createStream(@NonNull S3URI s3URI, StreamContext streamContext) {
  final LogicalIO logicalIO = createLogicalIO(s3URI, streamContext);
  return new S3SeekableInputStream(s3URI, logicalIO, telemetry);
}

/**
* Creates the LogicalIO for the given object without a stream context, by delegating to
* {@link #createLogicalIO(S3URI, StreamContext)} with a {@code null} context.
*
* @param s3URI the object's S3 URI
* @return the LogicalIO produced by the two-argument overload
*/
LogicalIO createLogicalIO(S3URI s3URI) {
return createLogicalIO(s3URI, null);
}

LogicalIO createLogicalIO(S3URI s3URI, StreamContext streamContext) {
switch (objectFormatSelector.getObjectFormat(s3URI)) {
case PARQUET:
return new ParquetLogicalIOImpl(
s3URI,
new PhysicalIOImpl(s3URI, objectMetadataStore, objectBlobStore, telemetry),
new PhysicalIOImpl(
s3URI, objectMetadataStore, objectBlobStore, telemetry, streamContext),
telemetry,
configuration.getLogicalIOConfiguration(),
parquetColumnPrefetchStore);

default:
return new DefaultLogicalIOImpl(
s3URI,
new PhysicalIOImpl(s3URI, objectMetadataStore, objectBlobStore, telemetry),
new PhysicalIOImpl(
s3URI, objectMetadataStore, objectBlobStore, telemetry, streamContext),
telemetry);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import software.amazon.s3.analyticsaccelerator.common.telemetry.Telemetry;
import software.amazon.s3.analyticsaccelerator.io.physical.PhysicalIOConfiguration;
import software.amazon.s3.analyticsaccelerator.request.ObjectClient;
import software.amazon.s3.analyticsaccelerator.request.StreamContext;
import software.amazon.s3.analyticsaccelerator.util.S3URI;

/** A BlobStore is a container for Blobs and functions as a data cache. */
Expand Down Expand Up @@ -69,16 +70,18 @@ protected boolean removeEldestEntry(final Map.Entry<S3URI, Blob> eldest) {
* Opens a new blob if one does not exist or returns the handle to one that exists already.
*
* @param s3URI the S3 URI of the object
* @param streamContext contains audit headers to be attached in the request header
* @return the blob representing the object from the BlobStore
*/
public Blob get(S3URI s3URI) {
public Blob get(S3URI s3URI, StreamContext streamContext) {
return blobMap.computeIfAbsent(
s3URI,
uri ->
new Blob(
uri,
metadataStore,
new BlockManager(uri, objectClient, metadataStore, telemetry, configuration),
new BlockManager(
uri, objectClient, metadataStore, telemetry, configuration, streamContext),
telemetry));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import software.amazon.s3.analyticsaccelerator.request.Range;
import software.amazon.s3.analyticsaccelerator.request.ReadMode;
import software.amazon.s3.analyticsaccelerator.request.Referrer;
import software.amazon.s3.analyticsaccelerator.request.StreamContext;
import software.amazon.s3.analyticsaccelerator.util.S3URI;
import software.amazon.s3.analyticsaccelerator.util.StreamAttributes;
import software.amazon.s3.analyticsaccelerator.util.StreamUtils;
Expand All @@ -51,7 +52,7 @@ public class Block implements Closeable {
private static final String OPERATION_BLOCK_GET_JOIN = "block.get.join";

/**
* Constructs a Block. data.
* Constructs a Block.
*
* @param s3URI the S3 URI of the object
* @param objectClient the object client to use to interact with the object store
Expand All @@ -69,6 +70,32 @@ public Block(
long end,
long generation,
@NonNull ReadMode readMode) {

this(s3URI, objectClient, telemetry, start, end, generation, readMode, null);
}

/**
* Constructs a Block.
*
* @param s3URI the S3 URI of the object
* @param objectClient the object client to use to interact with the object store
* @param telemetry an instance of {@link Telemetry} to use
* @param start start of the block
* @param end end of the block
* @param generation generation of the block in a sequential read pattern (should be 0 by default)
* @param readMode read mode describing whether this is a sync or async fetch
* @param streamContext contains audit headers to be attached in the request header
*/
public Block(
@NonNull S3URI s3URI,
@NonNull ObjectClient objectClient,
@NonNull Telemetry telemetry,
long start,
long end,
long generation,
@NonNull ReadMode readMode,
StreamContext streamContext) {

Preconditions.checkArgument(
0 <= generation, "`generation` must be non-negative; was: %s", generation);
Preconditions.checkArgument(0 <= start, "`start` must be non-negative; was: %s", start);
Expand Down Expand Up @@ -97,7 +124,8 @@ public Block(
.s3Uri(this.s3URI)
.range(this.range)
.referrer(new Referrer(range.toHttpString(), readMode))
.build()));
.build(),
streamContext));
this.data = this.source.thenApply(StreamUtils::toByteArray);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import software.amazon.s3.analyticsaccelerator.request.ObjectClient;
import software.amazon.s3.analyticsaccelerator.request.Range;
import software.amazon.s3.analyticsaccelerator.request.ReadMode;
import software.amazon.s3.analyticsaccelerator.request.StreamContext;
import software.amazon.s3.analyticsaccelerator.util.S3URI;
import software.amazon.s3.analyticsaccelerator.util.StreamAttributes;

Expand All @@ -44,6 +45,7 @@ public class BlockManager implements Closeable {
private final IOPlanner ioPlanner;
private final PhysicalIOConfiguration configuration;
private final RangeOptimiser rangeOptimiser;
/**
 * Stream context passed to the Blocks created in makeRangeAvailable; {@code null} when the
 * manager was constructed without one. Assigned once in the constructor, hence final like the
 * other fields of this class.
 */
private final StreamContext streamContext;

private static final String OPERATION_MAKE_RANGE_AVAILABLE = "block.manager.make.range.available";

Expand All @@ -62,6 +64,26 @@ public BlockManager(
@NonNull MetadataStore metadataStore,
@NonNull Telemetry telemetry,
@NonNull PhysicalIOConfiguration configuration) {
this(s3URI, objectClient, metadataStore, telemetry, configuration, null);
}

/**
* Constructs a new BlockManager.
*
* @param s3URI the S3 URI of the object
* @param objectClient object client capable of interacting with the underlying object store
* @param telemetry an instance of {@link Telemetry} to use
* @param metadataStore the metadata cache
* @param configuration the physicalIO configuration
* @param streamContext contains audit headers to be attached in the request header
*/
public BlockManager(
@NonNull S3URI s3URI,
@NonNull ObjectClient objectClient,
@NonNull MetadataStore metadataStore,
@NonNull Telemetry telemetry,
@NonNull PhysicalIOConfiguration configuration,
StreamContext streamContext) {
this.s3URI = s3URI;
this.objectClient = objectClient;
this.metadataStore = metadataStore;
Expand All @@ -72,6 +94,7 @@ public BlockManager(
this.sequentialReadProgression = new SequentialReadProgression(configuration);
this.ioPlanner = new IOPlanner(blockStore);
this.rangeOptimiser = new RangeOptimiser(configuration);
this.streamContext = streamContext;
}

/**
Expand Down Expand Up @@ -178,7 +201,8 @@ public synchronized void makeRangeAvailable(long pos, long len, ReadMode readMod
r.getStart(),
r.getEnd(),
generation,
readMode);
readMode,
streamContext);
blockStore.add(block);
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import software.amazon.s3.analyticsaccelerator.io.physical.plan.IOPlan;
import software.amazon.s3.analyticsaccelerator.io.physical.plan.IOPlanExecution;
import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
import software.amazon.s3.analyticsaccelerator.request.StreamContext;
import software.amazon.s3.analyticsaccelerator.util.S3URI;
import software.amazon.s3.analyticsaccelerator.util.StreamAttributes;

Expand All @@ -35,6 +36,7 @@ public class PhysicalIOImpl implements PhysicalIO {
private final MetadataStore metadataStore;
private final BlobStore blobStore;
private final Telemetry telemetry;
private final StreamContext streamContext;

private final long physicalIOBirth = System.nanoTime();

Expand All @@ -56,10 +58,29 @@ public PhysicalIOImpl(
@NonNull MetadataStore metadataStore,
@NonNull BlobStore blobStore,
@NonNull Telemetry telemetry) {
this(s3URI, metadataStore, blobStore, telemetry, null);
}

/**
* Construct a new instance of PhysicalIOImpl.
*
* @param s3URI the S3 URI of the object
* @param metadataStore a metadata cache
* @param blobStore a data cache
* @param telemetry The {@link Telemetry} to use to report measurements.
* @param streamContext contains audit headers to be attached in the request header; may be
*     {@code null} (the constructor without this parameter passes {@code null})
*/
public PhysicalIOImpl(
@NonNull S3URI s3URI,
@NonNull MetadataStore metadataStore,
@NonNull BlobStore blobStore,
@NonNull Telemetry telemetry,
StreamContext streamContext) {
this.s3URI = s3URI;
this.metadataStore = metadataStore;
this.blobStore = blobStore;
this.telemetry = telemetry;
this.streamContext = streamContext;
}

/**
Expand Down Expand Up @@ -94,7 +115,7 @@ public int read(long pos) throws IOException {
StreamAttributes.physicalIORelativeTimestamp(
System.nanoTime() - physicalIOBirth))
.build(),
() -> blobStore.get(s3URI).read(pos));
() -> blobStore.get(s3URI, streamContext).read(pos));
}

/**
Expand Down Expand Up @@ -124,7 +145,7 @@ public int read(byte[] buf, int off, int len, long pos) throws IOException {
StreamAttributes.physicalIORelativeTimestamp(
System.nanoTime() - physicalIOBirth))
.build(),
() -> blobStore.get(s3URI).read(buf, off, len, pos));
() -> blobStore.get(s3URI, streamContext).read(buf, off, len, pos));
}

/**
Expand All @@ -151,7 +172,7 @@ public int readTail(byte[] buf, int off, int len) throws IOException {
StreamAttributes.physicalIORelativeTimestamp(
System.nanoTime() - physicalIOBirth))
.build(),
() -> blobStore.get(s3URI).read(buf, off, len, contentLength - len));
() -> blobStore.get(s3URI, streamContext).read(buf, off, len, contentLength - len));
}

/**
Expand All @@ -172,7 +193,7 @@ public IOPlanExecution execute(IOPlan ioPlan) {
StreamAttributes.physicalIORelativeTimestamp(
System.nanoTime() - physicalIOBirth))
.build(),
() -> blobStore.get(s3URI).execute(ioPlan));
() -> blobStore.get(s3URI, streamContext).execute(ioPlan));
}

private long contentLength() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,7 @@
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
import software.amazon.s3.analyticsaccelerator.SeekableInputStream;
import software.amazon.s3.analyticsaccelerator.request.GetRequest;
import software.amazon.s3.analyticsaccelerator.request.HeadRequest;
import software.amazon.s3.analyticsaccelerator.request.ObjectClient;
import software.amazon.s3.analyticsaccelerator.request.ObjectContent;
import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
import software.amazon.s3.analyticsaccelerator.request.*;
import software.amazon.s3.analyticsaccelerator.util.S3URI;

public class InMemoryS3SeekableInputStream extends SeekableInputStream {
Expand Down Expand Up @@ -73,6 +69,12 @@ public CompletableFuture<ObjectMetadata> headObject(HeadRequest headRequest) {

/** Delegates to the two-argument overload, passing {@code null} as the stream context. */
@Override
public CompletableFuture<ObjectContent> getObject(GetRequest getRequest) {
return getObject(getRequest, null);
}

@Override
public CompletableFuture<ObjectContent> getObject(
GetRequest getRequest, StreamContext streamContext) {
int start = 0;
int end = size - 1;

Expand Down
Loading

0 comments on commit df46146

Please sign in to comment.