
Commit

Merge pull request #135 from earth-mover/seba/object_store_out
Use AWS's S3 client instead of object_store
paraseba authored Oct 4, 2024
2 parents e3a119c + dc8cc12 commit b93a6b9
Showing 20 changed files with 1,622 additions and 619 deletions.
1,177 changes: 800 additions & 377 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Justfile
@@ -3,7 +3,7 @@ alias pre := pre-commit

# run all tests
test *args='':
-    AWS_ALLOW_HTTP=1 AWS_ENDPOINT_URL=http://localhost:9000 AWS_ACCESS_KEY_ID=minio123 AWS_SECRET_ACCESS_KEY=minio123 cargo test {{args}}
+    cargo test --all {{args}}

# compile but don't run all tests
compile-tests *args='':
2 changes: 1 addition & 1 deletion README.md
@@ -139,7 +139,7 @@ just test
This is just an alias for

```
-AWS_ALLOW_HTTP=1 AWS_ENDPOINT_URL=http://localhost:9000 AWS_ACCESS_KEY_ID=minio123 AWS_SECRET_ACCESS_KEY=minio123 cargo test
+cargo test --all
```

> [!TIP]
2 changes: 1 addition & 1 deletion compose.yaml
@@ -4,7 +4,7 @@ volumes:
services:
minio:
container_name: icechunk_minio
-    image: quay.io/minio/minio
+    image: minio/minio
entrypoint: |
/bin/sh -c '
for bucket in testbucket externalbucket arraylake-repo-bucket
6 changes: 4 additions & 2 deletions icechunk-python/src/lib.rs
@@ -112,8 +112,10 @@ type KeyRanges = Vec<(String, (Option<ChunkOffset>, Option<ChunkOffset>))>;

impl PyIcechunkStore {
async fn store_exists(storage: StorageConfig) -> PyIcechunkStoreResult<bool> {
-        let storage =
-            storage.make_cached_storage().map_err(PyIcechunkStoreError::UnkownError)?;
+        let storage = storage
+            .make_cached_storage()
+            .await
+            .map_err(PyIcechunkStoreError::UnkownError)?;
let exists = Repository::exists(storage.as_ref()).await?;
Ok(exists)
}
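The new `.await` is the visible consequence of storage construction going async: the AWS SDK resolves credentials and region through asynchronous providers, so any constructor that builds a client must itself be `async`. A minimal sketch of the underlying pattern (the `mk_client` helper is hypothetical, not part of this PR):

```rust
use aws_config::BehaviorVersion;
use aws_sdk_s3::Client;

// Hypothetical helper: loading the default configuration (env vars,
// profiles, IMDS, ...) is an async operation in aws-config, which is
// why make_cached_storage above gained an `.await`.
async fn mk_client() -> Client {
    let config = aws_config::defaults(BehaviorVersion::latest()).load().await;
    Client::new(&config)
}
```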
2 changes: 1 addition & 1 deletion icechunk-python/src/storage.rs
@@ -2,7 +2,7 @@ use std::path::PathBuf;

use icechunk::{
storage::{
-        object_store::{S3Config, S3Credentials},
+        s3::{S3Config, S3Credentials},
virtual_ref::ObjectStoreVirtualChunkResolverConfig,
},
zarr::StorageConfig,
1 change: 1 addition & 0 deletions icechunk-python/tests/test_distributed_writers.py
@@ -81,6 +81,7 @@ async def test_distributed_writers():
"bucket": "testbucket",
"prefix": "python-distributed-writers-test__" + str(time.time()),
"endpoint_url": "http://localhost:9000",
"region": "us-east-1",
"allow_http": True,
}
store_config = {"inline_chunk_threshold_bytes": 5}
2 changes: 2 additions & 0 deletions icechunk-python/tests/test_virtual_ref.py
@@ -49,6 +49,7 @@ async def test_write_minino_virtual_refs():
),
endpoint_url="http://localhost:9000",
allow_http=True,
region="us-east-1",
),
mode="r+",
config=StoreConfig(
@@ -59,6 +60,7 @@ async def test_write_minino_virtual_refs():
),
endpoint_url="http://localhost:9000",
allow_http=True,
region="us-east-1",
)
),
)
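The newly required `region` in both tests is presumably a consequence of the SDK switch: aws-sdk-s3 expects an explicitly resolvable region where object_store tolerated its absence; see the client-construction sketch after the Cargo.toml diff below.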
5 changes: 4 additions & 1 deletion icechunk/Cargo.toml
@@ -11,7 +11,7 @@ bytes = { version = "1.7.2", features = ["serde"] }
base64 = "0.22.1"
futures = "0.3.30"
itertools = "0.13.0"
-object_store = { version = "0.11.0", features = ["aws"] }
+object_store = { version = "0.11.0" }
rand = "0.8.5"
thiserror = "1.0.64"
serde_json = "1.0.128"
@@ -28,6 +28,9 @@ rmp-serde = "1.3.0"
url = "2.5.2"
async-stream = "0.3.5"
rmpv = { version = "1.3.0", features = ["serde", "with-serde"] }
aws-sdk-s3 = "1.53.0"
aws-config = "1.5.7"
aws-credential-types = "1.2.1"

[dev-dependencies]
pretty_assertions = "1.4.1"
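Together these three crates replace object_store's `AmazonS3Builder` path. A sketch of how they combine to reproduce what the deleted `S3Config::to_builder` did for the MinIO test setup, assuming static credentials and a custom endpoint (illustrative only; the PR's actual `s3` module is not rendered in this view):

```rust
use aws_config::{BehaviorVersion, Region};
use aws_credential_types::Credentials;
use aws_sdk_s3::{config::Builder, Client};

// Illustrative: an aws-sdk-s3 client pointed at the MinIO instance
// from compose.yaml, mirroring the old object_store builder chain.
async fn minio_client() -> Client {
    let base = aws_config::defaults(BehaviorVersion::latest())
        .region(Region::new("us-east-1")) // the SDK wants an explicit region
        .credentials_provider(Credentials::new(
            "minio123", "minio123", // access key id and secret (test values)
            None,                   // no session token
            None,                   // no expiry
            "static",
        ))
        .load()
        .await;
    let config = Builder::from(&base)
        .endpoint_url("http://localhost:9000")
        .force_path_style(true) // MinIO addresses buckets by path
        .build();
    Client::from_conf(config)
}
```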
4 changes: 2 additions & 2 deletions icechunk/src/refs.rs
@@ -46,7 +46,7 @@ pub enum RefError

pub type RefResult<A> = Result<A, RefError>;

-#[derive(Debug, Clone, Eq, PartialEq)]
+#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub enum Ref {
Tag(String),
Branch(String),
@@ -196,7 +196,7 @@ async fn branch_history<'a, 'b>(
branch: &'b str,
) -> RefResult<impl Stream<Item = RefResult<BranchVersion>> + 'a> {
let key = branch_root(branch)?;
-    let all = storage.ref_versions(key.as_str()).await;
+    let all = storage.ref_versions(key.as_str()).await?;
Ok(all.map_err(|e| e.into()).and_then(move |version_id| async move {
let version = version_id
.strip_suffix(".json")
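`ref_versions` now returns `StorageResult<BoxStream<StorageResult<String>>>`, so a failure to start listing is reported once, up front, separately from failures on individual items; that is what the added `?` at the call site absorbs. A sketch of the consuming pattern (hypothetical caller, assuming `RefError: From<StorageError>`, which the `e.into()` above implies):

```rust
use futures::TryStreamExt;

// Hypothetical, assuming crate context (Storage, RefResult in scope):
// the first `?` covers the listing request itself, while try_next
// surfaces per-version errors as the stream is drained.
async fn last_ref_version(
    storage: &dyn Storage,
    key: &str,
) -> RefResult<Option<String>> {
    let mut versions = storage.ref_versions(key).await?;
    let mut last = None;
    while let Some(version) = versions.try_next().await? {
        last = Some(version);
    }
    Ok(last)
}
```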
5 changes: 4 additions & 1 deletion icechunk/src/storage/caching.rs
@@ -152,7 +152,10 @@ impl Storage for MemCachingStorage {
self.backend.write_ref(ref_key, overwrite_refs, bytes).await
}

-    async fn ref_versions(&self, ref_name: &str) -> BoxStream<StorageResult<String>> {
+    async fn ref_versions(
+        &self,
+        ref_name: &str,
+    ) -> StorageResult<BoxStream<StorageResult<String>>> {
self.backend.ref_versions(ref_name).await
}
}
5 changes: 4 additions & 1 deletion icechunk/src/storage/logging.rs
@@ -121,7 +121,10 @@ impl Storage for LoggingStorage {
self.backend.write_ref(ref_key, overwrite_refs, bytes).await
}

-    async fn ref_versions(&self, ref_name: &str) -> BoxStream<StorageResult<String>> {
+    async fn ref_versions(
+        &self,
+        ref_name: &str,
+    ) -> StorageResult<BoxStream<StorageResult<String>>> {
self.backend.ref_versions(ref_name).await
}
}
35 changes: 27 additions & 8 deletions icechunk/src/storage/mod.rs
@@ -1,6 +1,15 @@
+use aws_sdk_s3::{
+    config::http::HttpResponse,
+    error::SdkError,
+    operation::{
+        get_object::GetObjectError, list_objects_v2::ListObjectsV2Error,
+        put_object::PutObjectError,
+    },
+    primitives::ByteStreamError,
+};
use core::fmt;
use futures::stream::BoxStream;
-use std::sync::Arc;
+use std::{ffi::OsString, sync::Arc};

use async_trait::async_trait;
use bytes::Bytes;
@@ -12,37 +21,44 @@ pub mod caching;
pub mod logging;

pub mod object_store;
+pub mod s3;
pub mod virtual_ref;

pub use caching::MemCachingStorage;
pub use object_store::ObjectStorage;

use crate::format::{
attributes::AttributesTable, manifest::Manifest, snapshot::Snapshot, AttributesId,
-    ByteRange, ChunkId, ManifestId, Path, SnapshotId,
+    ByteRange, ChunkId, ManifestId, SnapshotId,
};

#[derive(Debug, Error)]
pub enum StorageError {
#[error("error contacting object store {0}")]
ObjectStore(#[from] ::object_store::Error),
#[error("bad object store prefix {0:?}")]
BadPrefix(OsString),
#[error("error getting object from object store {0}")]
S3GetObjectError(#[from] SdkError<GetObjectError, HttpResponse>),
#[error("error writing object to object store {0}")]
S3PutObjectError(#[from] SdkError<PutObjectError, HttpResponse>),
#[error("error listing objects in object store {0}")]
S3ListObjectError(#[from] SdkError<ListObjectsV2Error, HttpResponse>),
#[error("error streaming bytes from object store {0}")]
S3StreamError(#[from] ByteStreamError),
#[error("messagepack decode error: {0}")]
MsgPackDecodeError(#[from] rmp_serde::decode::Error),
#[error("messagepack encode error: {0}")]
MsgPackEncodeError(#[from] rmp_serde::encode::Error),
#[error("error parsing RecordBatch from parquet file {0}.")]
BadRecordBatchRead(Path),
#[error("cannot overwrite ref: {0}")]
RefAlreadyExists(String),
#[error("ref not found: {0}")]
RefNotFound(String),
#[error("generic storage error: {0}")]
OtherError(#[from] Arc<dyn std::error::Error + Sync + Send>),
#[error("unknown storage error: {0}")]
Other(String),
}

-type StorageResult<A> = Result<A, StorageError>;
+pub type StorageResult<A> = Result<A, StorageError>;

/// Fetch and write the parquet files that represent the repository in object store
///
@@ -77,7 +93,10 @@ pub trait Storage: fmt::Debug {

async fn get_ref(&self, ref_key: &str) -> StorageResult<Bytes>;
async fn ref_names(&self) -> StorageResult<Vec<String>>;
-    async fn ref_versions(&self, ref_name: &str) -> BoxStream<StorageResult<String>>;
+    async fn ref_versions(
+        &self,
+        ref_name: &str,
+    ) -> StorageResult<BoxStream<StorageResult<String>>>;
async fn write_ref(
&self,
ref_key: &str,
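Each `#[from]` S3 variant added to `StorageError` exists so that implementations can bubble SDK failures up with a bare `?`. A compressed sketch of the pattern (the helper is illustrative; the real implementation lives in the unrendered `s3` module):

```rust
use aws_sdk_s3::Client;
use bytes::Bytes;

// Illustrative, assuming crate context (StorageResult in scope): both
// failure modes convert into StorageError via the #[from] variants
// above (S3GetObjectError for the request, S3StreamError for the body).
async fn fetch_bytes(client: &Client, bucket: &str, key: &str) -> StorageResult<Bytes> {
    let output = client.get_object().bucket(bucket).key(key).send().await?;
    let aggregated = output.body.collect().await?;
    Ok(aggregated.into_bytes())
}
```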
96 changes: 10 additions & 86 deletions icechunk/src/storage/object_store.rs
@@ -5,17 +5,12 @@ use crate::format::{
};
use async_trait::async_trait;
use bytes::Bytes;
-use core::fmt;
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use object_store::{
-    aws::{AmazonS3Builder, S3ConditionalPut},
-    local::LocalFileSystem,
-    memory::InMemory,
-    path::Path as ObjectPath,
-    Attribute, AttributeValue, Attributes, GetOptions, GetRange, ObjectStore, PutMode,
-    PutOptions, PutPayload,
+    local::LocalFileSystem, memory::InMemory, path::Path as ObjectPath, Attribute,
+    AttributeValue, Attributes, GetOptions, GetRange, ObjectStore, PutMode, PutOptions,
+    PutPayload,
};
-use serde::{Deserialize, Serialize};
use std::{
fs::create_dir_all, future::ready, ops::Bound, path::Path as StdPath, sync::Arc,
};
@@ -61,58 +56,7 @@ const MANIFEST_PREFIX: &str = "manifests/";
const CHUNK_PREFIX: &str = "chunks/";
const REF_PREFIX: &str = "refs";

-#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
-pub struct S3Credentials {
-    pub access_key_id: String,
-    pub secret_access_key: String,
-    pub session_token: Option<String>,
-}
-
-#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
-pub struct S3Config {
-    pub region: Option<String>,
-    pub endpoint: Option<String>,
-    pub credentials: Option<S3Credentials>,
-    pub allow_http: Option<bool>,
-}
-
-// TODO: Hide this behind a feature flag?
-impl S3Config {
-    pub fn to_builder(&self) -> AmazonS3Builder {
-        let builder = if let Some(credentials) = &self.credentials {
-            let builder = AmazonS3Builder::new()
-                .with_access_key_id(credentials.access_key_id.clone())
-                .with_secret_access_key(credentials.secret_access_key.clone());
-
-            if let Some(token) = &credentials.session_token {
-                builder.with_token(token.clone())
-            } else {
-                builder
-            }
-        } else {
-            AmazonS3Builder::from_env()
-        };
-
-        let builder = if let Some(region) = &self.region {
-            builder.with_region(region.clone())
-        } else {
-            builder
-        };
-
-        let builder = if let Some(endpoint) = &self.endpoint {
-            builder.with_endpoint(endpoint.clone())
-        } else {
-            builder
-        };
-
-        if let Some(allow_http) = self.allow_http {
-            builder.with_allow_http(allow_http)
-        } else {
-            builder
-        }
-    }
-}

+#[derive(Debug)]
pub struct ObjectStorage {
store: Arc<dyn ObjectStore>,
prefix: String,
@@ -157,24 +101,6 @@ impl ObjectStorage {
})
}

-    pub fn new_s3_store(
-        bucket_name: impl Into<String>,
-        prefix: impl Into<String>,
-        config: Option<S3Config>,
-    ) -> Result<ObjectStorage, StorageError> {
-        let config = config.unwrap_or_default();
-        let builder = config.to_builder();
-        let builder = builder.with_conditional_put(S3ConditionalPut::ETagMatch);
-        let store = builder.with_bucket_name(bucket_name.into()).build()?;
-        Ok(ObjectStorage {
-            store: Arc::new(store),
-            prefix: prefix.into(),
-            artificially_sort_refs_in_mem: false,
-            supports_create_if_not_exists: true,
-            supports_metadata: true,
-        })
-    }

fn get_path<const SIZE: usize, T: FileTypeTag>(
&self,
file_prefix: &str,
@@ -225,11 +151,6 @@
}
}

-impl fmt::Debug for ObjectStorage {
-    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
-        write!(f, "ObjectStorage, prefix={}, store={}", self.prefix, self.store)
-    }
-}
#[async_trait]
impl Storage for ObjectStorage {
async fn fetch_snapshot(
@@ -391,7 +312,10 @@ impl Storage for ObjectStorage {
.collect())
}

-    async fn ref_versions(&self, ref_name: &str) -> BoxStream<StorageResult<String>> {
+    async fn ref_versions(
+        &self,
+        ref_name: &str,
+    ) -> StorageResult<BoxStream<StorageResult<String>>> {
let res = self.do_ref_versions(ref_name).await;
if self.artificially_sort_refs_in_mem {
#[allow(clippy::expect_used)]
@@ -401,9 +325,9 @@
let mut all =
res.try_collect::<Vec<_>>().await.expect("Error fetching ref versions");
all.sort();
-            futures::stream::iter(all.into_iter().map(Ok)).boxed()
+            Ok(futures::stream::iter(all.into_iter().map(Ok)).boxed())
} else {
-            res
+            Ok(res)
}
}

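The S3 types deleted above are not gone: the updated import in icechunk-python/src/storage.rs shows they now live in the new `storage::s3` module, whose diff is in the unrendered remainder of this page. Downstream code should, presumably, only need its import path adjusted:

```rust
// Before this PR:
// use icechunk::storage::object_store::{S3Config, S3Credentials};

// After, per the icechunk-python diff above:
use icechunk::storage::s3::{S3Config, S3Credentials};
```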
(Remaining file diffs not rendered.)
