Use AWS's S3 client instead of object_store #135

Merged · 3 commits · Oct 4, 2024

Changes from all commits:
1,177 changes: 800 additions & 377 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Justfile
```diff
@@ -3,7 +3,7 @@ alias pre := pre-commit
 
 # run all tests
 test *args='':
-    AWS_ALLOW_HTTP=1 AWS_ENDPOINT_URL=http://localhost:9000 AWS_ACCESS_KEY_ID=minio123 AWS_SECRET_ACCESS_KEY=minio123 cargo test {{args}}
+    cargo test --all {{args}}
 
 # compile but don't run all tests
 compile-tests *args='':
```
2 changes: 1 addition & 1 deletion README.md
````diff
@@ -139,7 +139,7 @@ just test
 This is just an alias for
 
 ```
-AWS_ALLOW_HTTP=1 AWS_ENDPOINT_URL=http://localhost:9000 AWS_ACCESS_KEY_ID=minio123 AWS_SECRET_ACCESS_KEY=minio123 cargo test
+cargo test --all
 ```
 
 > [!TIP]
````
2 changes: 1 addition & 1 deletion compose.yaml
```diff
@@ -4,7 +4,7 @@ volumes:
 services:
   minio:
     container_name: icechunk_minio
-    image: quay.io/minio/minio
+    image: minio/minio
     entrypoint: |
       /bin/sh -c '
       for bucket in testbucket externalbucket arraylake-repo-bucket
```
6 changes: 4 additions & 2 deletions icechunk-python/src/lib.rs
```diff
@@ -112,8 +112,10 @@ type KeyRanges = Vec<(String, (Option<ChunkOffset>, Option<ChunkOffset>))>;
 
 impl PyIcechunkStore {
     async fn store_exists(storage: StorageConfig) -> PyIcechunkStoreResult<bool> {
-        let storage =
-            storage.make_cached_storage().map_err(PyIcechunkStoreError::UnkownError)?;
+        let storage = storage
+            .make_cached_storage()
+            .await
+            .map_err(PyIcechunkStoreError::UnkownError)?;
         let exists = Repository::exists(storage.as_ref()).await?;
         Ok(exists)
     }
```
2 changes: 1 addition & 1 deletion icechunk-python/src/storage.rs
```diff
@@ -2,7 +2,7 @@ use std::path::PathBuf;
 
 use icechunk::{
     storage::{
-        object_store::{S3Config, S3Credentials},
+        s3::{S3Config, S3Credentials},
        virtual_ref::ObjectStoreVirtualChunkResolverConfig,
     },
     zarr::StorageConfig,
```
1 change: 1 addition & 0 deletions icechunk-python/tests/test_distributed_writers.py
```diff
@@ -81,6 +81,7 @@ async def test_distributed_writers():
         "bucket": "testbucket",
         "prefix": "python-distributed-writers-test__" + str(time.time()),
         "endpoint_url": "http://localhost:9000",
+        "region": "us-east-1",
         "allow_http": True,
     }
     store_config = {"inline_chunk_threshold_bytes": 5}
```
2 changes: 2 additions & 0 deletions icechunk-python/tests/test_virtual_ref.py
```diff
@@ -49,6 +49,7 @@ async def test_write_minino_virtual_refs():
         ),
         endpoint_url="http://localhost:9000",
         allow_http=True,
+        region="us-east-1",
     ),
     mode="r+",
     config=StoreConfig(
@@ -59,6 +60,7 @@ async def test_write_minino_virtual_refs():
         ),
         endpoint_url="http://localhost:9000",
         allow_http=True,
+        region="us-east-1",
     )
 ),
```
5 changes: 4 additions & 1 deletion icechunk/Cargo.toml
```diff
@@ -11,7 +11,7 @@ bytes = { version = "1.7.2", features = ["serde"] }
 base64 = "0.22.1"
 futures = "0.3.30"
 itertools = "0.13.0"
-object_store = { version = "0.11.0", features = ["aws"] }
+object_store = { version = "0.11.0" }
 rand = "0.8.5"
 thiserror = "1.0.64"
 serde_json = "1.0.128"
@@ -28,6 +28,9 @@ rmp-serde = "1.3.0"
 url = "2.5.2"
 async-stream = "0.3.5"
 rmpv = { version = "1.3.0", features = ["serde", "with-serde"] }
+aws-sdk-s3 = "1.53.0"
+aws-config = "1.5.7"
+aws-credential-types = "1.2.1"
 
 [dev-dependencies]
 pretty_assertions = "1.4.1"
```
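These three crates replace `object_store`'s `aws` feature with the official AWS SDK. As a rough sketch of how they fit together (a hypothetical helper, not code from this PR; the endpoint, credentials, and region mirror the MinIO test setup above), a client for the local MinIO could be built like this:

```rust
use aws_config::BehaviorVersion;
use aws_credential_types::Credentials;
use aws_sdk_s3::{config::Region, Client};

// Hypothetical helper: builds an S3 client against the MinIO endpoint and
// credentials used by the test suite.
async fn minio_client() -> Client {
    let base = aws_config::defaults(BehaviorVersion::latest())
        .region(Region::new("us-east-1"))
        .endpoint_url("http://localhost:9000")
        .credentials_provider(Credentials::new(
            "minio123", "minio123", None, None, "static",
        ))
        .load()
        .await;
    // MinIO is typically addressed bucket-in-path rather than as a subdomain.
    let config = aws_sdk_s3::config::Builder::from(&base).force_path_style(true).build();
    Client::from_conf(config)
}
```

Unlike the `object_store` builder, the SDK wants an explicit region for request signing, which is presumably why the Python tests above now pass `region="us-east-1"`, and why the `AWS_*` variables could be dropped from the Justfile and README: endpoint and credentials are wired through the storage config in code rather than read from `object_store`'s environment hooks.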
4 changes: 2 additions & 2 deletions icechunk/src/refs.rs
```diff
@@ -46,7 +46,7 @@ pub enum RefError {
 
 pub type RefResult<A> = Result<A, RefError>;
 
-#[derive(Debug, Clone, Eq, PartialEq)]
+#[derive(Debug, Clone, Eq, PartialEq, Hash)]
 pub enum Ref {
     Tag(String),
     Branch(String),
@@ -196,7 +196,7 @@ async fn branch_history<'a, 'b>(
     branch: &'b str,
 ) -> RefResult<impl Stream<Item = RefResult<BranchVersion>> + 'a> {
     let key = branch_root(branch)?;
-    let all = storage.ref_versions(key.as_str()).await;
+    let all = storage.ref_versions(key.as_str()).await?;
     Ok(all.map_err(|e| e.into()).and_then(move |version_id| async move {
         let version = version_id
             .strip_suffix(".json")
```
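The added `?` reflects the new `Storage::ref_versions` signature (see `storage/mod.rs` below): obtaining the stream is now fallible on its own, separately from per-item failures while draining it. A minimal sketch of the resulting calling pattern, assuming only the new signature (the helper itself is hypothetical):

```rust
use futures::TryStreamExt;

// Hypothetical caller: the first `?` surfaces errors from setting up the
// listing; the `?` inside the loop surfaces errors yielded by the stream.
async fn collect_versions(
    storage: &dyn Storage,
    ref_name: &str,
) -> StorageResult<Vec<String>> {
    let mut versions = storage.ref_versions(ref_name).await?;
    let mut out = Vec::new();
    while let Some(version) = versions.try_next().await? {
        out.push(version);
    }
    Ok(out)
}
```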
5 changes: 4 additions & 1 deletion icechunk/src/storage/caching.rs
```diff
@@ -152,7 +152,10 @@ impl Storage for MemCachingStorage {
         self.backend.write_ref(ref_key, overwrite_refs, bytes).await
     }
 
-    async fn ref_versions(&self, ref_name: &str) -> BoxStream<StorageResult<String>> {
+    async fn ref_versions(
+        &self,
+        ref_name: &str,
+    ) -> StorageResult<BoxStream<StorageResult<String>>> {
         self.backend.ref_versions(ref_name).await
     }
 }
```
5 changes: 4 additions & 1 deletion icechunk/src/storage/logging.rs
```diff
@@ -121,7 +121,10 @@ impl Storage for LoggingStorage {
         self.backend.write_ref(ref_key, overwrite_refs, bytes).await
     }
 
-    async fn ref_versions(&self, ref_name: &str) -> BoxStream<StorageResult<String>> {
+    async fn ref_versions(
+        &self,
+        ref_name: &str,
+    ) -> StorageResult<BoxStream<StorageResult<String>>> {
         self.backend.ref_versions(ref_name).await
     }
 }
```
35 changes: 27 additions & 8 deletions icechunk/src/storage/mod.rs
```diff
@@ -1,6 +1,15 @@
+use aws_sdk_s3::{
+    config::http::HttpResponse,
+    error::SdkError,
+    operation::{
+        get_object::GetObjectError, list_objects_v2::ListObjectsV2Error,
+        put_object::PutObjectError,
+    },
+    primitives::ByteStreamError,
+};
 use core::fmt;
 use futures::stream::BoxStream;
-use std::sync::Arc;
+use std::{ffi::OsString, sync::Arc};
 
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -12,37 +21,44 @@ pub mod caching;
 pub mod logging;
 
 pub mod object_store;
+pub mod s3;
 pub mod virtual_ref;
 
 pub use caching::MemCachingStorage;
 pub use object_store::ObjectStorage;
 
 use crate::format::{
     attributes::AttributesTable, manifest::Manifest, snapshot::Snapshot, AttributesId,
-    ByteRange, ChunkId, ManifestId, Path, SnapshotId,
+    ByteRange, ChunkId, ManifestId, SnapshotId,
 };
 
 #[derive(Debug, Error)]
 pub enum StorageError {
     #[error("error contacting object store {0}")]
     ObjectStore(#[from] ::object_store::Error),
+    #[error("bad object store prefix {0:?}")]
+    BadPrefix(OsString),
+    #[error("error getting object from object store {0}")]
+    S3GetObjectError(#[from] SdkError<GetObjectError, HttpResponse>),
+    #[error("error writing object to object store {0}")]
+    S3PutObjectError(#[from] SdkError<PutObjectError, HttpResponse>),
+    #[error("error listing objects in object store {0}")]
+    S3ListObjectError(#[from] SdkError<ListObjectsV2Error, HttpResponse>),
+    #[error("error streaming bytes from object store {0}")]
+    S3StreamError(#[from] ByteStreamError),
     #[error("messagepack decode error: {0}")]
     MsgPackDecodeError(#[from] rmp_serde::decode::Error),
     #[error("messagepack encode error: {0}")]
     MsgPackEncodeError(#[from] rmp_serde::encode::Error),
-    #[error("error parsing RecordBatch from parquet file {0}.")]
-    BadRecordBatchRead(Path),
     #[error("cannot overwrite ref: {0}")]
     RefAlreadyExists(String),
     #[error("ref not found: {0}")]
     RefNotFound(String),
     #[error("generic storage error: {0}")]
     OtherError(#[from] Arc<dyn std::error::Error + Sync + Send>),
+    #[error("unknown storage error: {0}")]
+    Other(String),
 }
 
-type StorageResult<A> = Result<A, StorageError>;
+pub type StorageResult<A> = Result<A, StorageError>;
 
 /// Fetch and write the parquet files that represent the repository in object store
 ///
@@ -77,7 +93,10 @@ pub trait Storage: fmt::Debug {
 
     async fn get_ref(&self, ref_key: &str) -> StorageResult<Bytes>;
     async fn ref_names(&self) -> StorageResult<Vec<String>>;
-    async fn ref_versions(&self, ref_name: &str) -> BoxStream<StorageResult<String>>;
+    async fn ref_versions(
+        &self,
+        ref_name: &str,
+    ) -> StorageResult<BoxStream<StorageResult<String>>>;
     async fn write_ref(
         &self,
         ref_key: &str,
```
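Each new S3 variant carries a `#[from]` conversion, so SDK and byte-stream failures can bubble up through `?` without manual mapping. A sketch of the pattern under that assumption (the helper is hypothetical, not from this PR):

```rust
use aws_sdk_s3::Client;
use bytes::Bytes;

// Hypothetical helper: each `?` leans on a `#[from]` conversion declared on
// StorageError above (SdkError<GetObjectError, _> and ByteStreamError).
async fn fetch_object(client: &Client, bucket: &str, key: &str) -> StorageResult<Bytes> {
    let output = client.get_object().bucket(bucket).key(key).send().await?;
    let body = output.body.collect().await?; // ByteStreamError -> S3StreamError
    Ok(body.into_bytes())
}
```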
96 changes: 10 additions & 86 deletions icechunk/src/storage/object_store.rs
```diff
@@ -5,17 +5,12 @@ use crate::format::{
 };
 use async_trait::async_trait;
 use bytes::Bytes;
-use core::fmt;
 use futures::{stream::BoxStream, StreamExt, TryStreamExt};
 use object_store::{
-    aws::{AmazonS3Builder, S3ConditionalPut},
-    local::LocalFileSystem,
-    memory::InMemory,
-    path::Path as ObjectPath,
-    Attribute, AttributeValue, Attributes, GetOptions, GetRange, ObjectStore, PutMode,
-    PutOptions, PutPayload,
+    local::LocalFileSystem, memory::InMemory, path::Path as ObjectPath, Attribute,
+    AttributeValue, Attributes, GetOptions, GetRange, ObjectStore, PutMode, PutOptions,
+    PutPayload,
 };
-use serde::{Deserialize, Serialize};
 use std::{
     fs::create_dir_all, future::ready, ops::Bound, path::Path as StdPath, sync::Arc,
 };
@@ -61,58 +56,7 @@ const MANIFEST_PREFIX: &str = "manifests/";
 const CHUNK_PREFIX: &str = "chunks/";
 const REF_PREFIX: &str = "refs";
 
-#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
-pub struct S3Credentials {
-    pub access_key_id: String,
-    pub secret_access_key: String,
-    pub session_token: Option<String>,
-}
-
-#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
-pub struct S3Config {
-    pub region: Option<String>,
-    pub endpoint: Option<String>,
-    pub credentials: Option<S3Credentials>,
-    pub allow_http: Option<bool>,
-}
-
-// TODO: Hide this behind a feature flag?
-impl S3Config {
-    pub fn to_builder(&self) -> AmazonS3Builder {
-        let builder = if let Some(credentials) = &self.credentials {
-            let builder = AmazonS3Builder::new()
-                .with_access_key_id(credentials.access_key_id.clone())
-                .with_secret_access_key(credentials.secret_access_key.clone());
-
-            if let Some(token) = &credentials.session_token {
-                builder.with_token(token.clone())
-            } else {
-                builder
-            }
-        } else {
-            AmazonS3Builder::from_env()
-        };
-
-        let builder = if let Some(region) = &self.region {
-            builder.with_region(region.clone())
-        } else {
-            builder
-        };
-
-        let builder = if let Some(endpoint) = &self.endpoint {
-            builder.with_endpoint(endpoint.clone())
-        } else {
-            builder
-        };
-
-        if let Some(allow_http) = self.allow_http {
-            builder.with_allow_http(allow_http)
-        } else {
-            builder
-        }
-    }
-}
-
+#[derive(Debug)]
 pub struct ObjectStorage {
     store: Arc<dyn ObjectStore>,
     prefix: String,
@@ -157,24 +101,6 @@ impl ObjectStorage {
         })
     }
 
-    pub fn new_s3_store(
-        bucket_name: impl Into<String>,
-        prefix: impl Into<String>,
-        config: Option<S3Config>,
-    ) -> Result<ObjectStorage, StorageError> {
-        let config = config.unwrap_or_default();
-        let builder = config.to_builder();
-        let builder = builder.with_conditional_put(S3ConditionalPut::ETagMatch);
-        let store = builder.with_bucket_name(bucket_name.into()).build()?;
-        Ok(ObjectStorage {
-            store: Arc::new(store),
-            prefix: prefix.into(),
-            artificially_sort_refs_in_mem: false,
-            supports_create_if_not_exists: true,
-            supports_metadata: true,
-        })
-    }
-
     fn get_path<const SIZE: usize, T: FileTypeTag>(
         &self,
         file_prefix: &str,
@@ -225,11 +151,6 @@ impl ObjectStorage {
     }
 }
 
-impl fmt::Debug for ObjectStorage {
-    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
-        write!(f, "ObjectStorage, prefix={}, store={}", self.prefix, self.store)
-    }
-}
 #[async_trait]
 impl Storage for ObjectStorage {
     async fn fetch_snapshot(
@@ -391,7 +312,10 @@ impl Storage for ObjectStorage {
             .collect())
     }
 
-    async fn ref_versions(&self, ref_name: &str) -> BoxStream<StorageResult<String>> {
+    async fn ref_versions(
+        &self,
+        ref_name: &str,
+    ) -> StorageResult<BoxStream<StorageResult<String>>> {
         let res = self.do_ref_versions(ref_name).await;
         if self.artificially_sort_refs_in_mem {
             #[allow(clippy::expect_used)]
@@ -401,9 +325,9 @@ impl Storage for ObjectStorage {
             let mut all =
                 res.try_collect::<Vec<_>>().await.expect("Error fetching ref versions");
             all.sort();
-            futures::stream::iter(all.into_iter().map(Ok)).boxed()
+            Ok(futures::stream::iter(all.into_iter().map(Ok)).boxed())
         } else {
-            res
+            Ok(res)
         }
     }
 
```
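With this, the S3 path no longer goes through `object_store` at all; `S3Config` and `S3Credentials` now come from the new `storage::s3` module (see the import change in `icechunk-python/src/storage.rs` above). Assuming the structs kept the field layout shown in the deleted code, a MinIO-flavored config might look like this sketch:

```rust
use icechunk::storage::s3::{S3Config, S3Credentials};

// Sketch only: field names are taken from the struct definitions removed
// above, on the assumption that they moved to `storage::s3` unchanged.
fn minio_config() -> S3Config {
    S3Config {
        region: Some("us-east-1".to_string()),
        endpoint: Some("http://localhost:9000".to_string()),
        credentials: Some(S3Credentials {
            access_key_id: "minio123".to_string(),
            secret_access_key: "minio123".to_string(),
            session_token: None,
        }),
        allow_http: Some(true),
    }
}
```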