Skip to content

Commit

Permalink
Merge pull request #179 from earth-mover/matt/virtual-refs
Browse files Browse the repository at this point in the history
More virtual refs tests
  • Loading branch information
paraseba authored Oct 10, 2024
2 parents 04ef42e + 7d75f75 commit 44a9031
Show file tree
Hide file tree
Showing 9 changed files with 256 additions and 76 deletions.
20 changes: 16 additions & 4 deletions icechunk-python/python/icechunk/_icechunk_python.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ class StorageConfig:
with the given bucket and prefix
This assumes that the necessary credentials are available in the environment:
AWS_REGION
AWS_ACCESS_KEY_ID,
AWS_SECRET_ACCESS_KEY,
AWS_SESSION_TOKEN (optional)
AWS_REGION (optional)
AWS_ENDPOINT_URL (optional)
AWS_ALLOW_HTTP (optional)
"""
Expand Down Expand Up @@ -173,15 +173,15 @@ class VirtualRefConfig:
region: str | None

@classmethod
def s3_from_env(cls) -> StorageConfig:
def s3_from_env(cls) -> VirtualRefConfig:
"""Create a VirtualReferenceConfig object for an S3 Object Storage compatible storage backend
with the given bucket and prefix
This assumes that the necessary credentials are available in the environment:
AWS_REGION or AWS_DEFAULT_REGION
AWS_ACCESS_KEY_ID,
AWS_SECRET_ACCESS_KEY,
AWS_SESSION_TOKEN (optional)
AWS_REGION (optional)
AWS_ENDPOINT_URL (optional)
AWS_ALLOW_HTTP (optional)
"""
Expand All @@ -194,7 +194,7 @@ class VirtualRefConfig:
endpoint_url: str | None,
allow_http: bool | None = None,
region: str | None = None,
) -> StorageConfig:
) -> VirtualRefConfig:
"""Create a VirtualReferenceConfig object for an S3 Object Storage compatible storage
backend with the given bucket, prefix, and configuration
Expand All @@ -203,6 +203,18 @@ class VirtualRefConfig:
"""
...

@classmethod
def s3_anonymous(
cls,
endpoint_url: str | None,
allow_http: bool | None = None,
region: str | None = None,
) -> VirtualRefConfig:
"""Create a `VirtualRefConfig` for an S3 Object Storage compatible backend
using anonymous (unsigned) access, e.g. for reading public buckets.

No credentials are read from the environment when this config is used.
"""
...

class StoreConfig:
# The number of concurrent requests to make when fetching partial values
get_partial_values_concurrency: int | None
Expand Down
104 changes: 85 additions & 19 deletions icechunk-python/src/storage.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
#![allow(clippy::too_many_arguments)]
// TODO: we only need that allow for PyStorageConfig, but i don't know how to set it

use std::path::PathBuf;

use icechunk::{
storage::{
s3::{S3Config, S3Credentials},
s3::{S3Config, S3Credentials, StaticS3Credentials},
virtual_ref::ObjectStoreVirtualChunkResolverConfig,
},
zarr::StorageConfig,
Expand All @@ -20,9 +23,9 @@ pub struct PyS3Credentials {
session_token: Option<String>,
}

impl From<&PyS3Credentials> for S3Credentials {
impl From<&PyS3Credentials> for StaticS3Credentials {
fn from(credentials: &PyS3Credentials) -> Self {
S3Credentials {
StaticS3Credentials {
access_key_id: credentials.access_key_id.clone(),
secret_access_key: credentials.secret_access_key.clone(),
session_token: credentials.session_token.clone(),
Expand Down Expand Up @@ -53,6 +56,7 @@ pub enum PyStorageConfig {
S3 {
bucket: String,
prefix: String,
anon: bool,
credentials: Option<PyS3Credentials>,
endpoint_url: Option<String>,
allow_http: Option<bool>,
Expand Down Expand Up @@ -84,6 +88,7 @@ impl PyStorageConfig {
PyStorageConfig::S3 {
bucket,
prefix,
anon: false,
credentials: None,
endpoint_url,
allow_http,
Expand All @@ -104,12 +109,44 @@ impl PyStorageConfig {
PyStorageConfig::S3 {
bucket,
prefix,
anon: false,
credentials: Some(credentials),
endpoint_url,
allow_http,
region,
}
}

/// Create an S3 storage config that performs anonymous (unsigned) requests.
///
/// Intended for public buckets: `anon` is forced to `true` and no
/// credentials are attached (`credentials: None`), so nothing is read
/// from the environment for signing.
#[classmethod]
fn s3_anonymous(
_cls: &Bound<'_, PyType>,
bucket: String,
prefix: String,
endpoint_url: Option<String>,
allow_http: Option<bool>,
region: Option<String>,
) -> Self {
PyStorageConfig::S3 {
bucket,
prefix,
anon: true,
credentials: None,
endpoint_url,
allow_http,
region,
}
}
}

/// Translate the Python-side credential options into an `S3Credentials`.
///
/// `anon` wins over any explicit credentials; otherwise static credentials
/// are used when present, falling back to the environment.
fn mk_credentials(config: Option<&PyS3Credentials>, anon: bool) -> S3Credentials {
    match (anon, config) {
        (true, _) => S3Credentials::Anonymous,
        (false, Some(credentials)) => S3Credentials::Static(credentials.into()),
        (false, None) => S3Credentials::FromEnv,
    }
}

impl From<&PyStorageConfig> for StorageConfig {
Expand All @@ -124,20 +161,25 @@ impl From<&PyStorageConfig> for StorageConfig {
PyStorageConfig::S3 {
bucket,
prefix,
anon,
credentials,
endpoint_url,
allow_http,
region,
} => StorageConfig::S3ObjectStore {
bucket: bucket.clone(),
prefix: prefix.clone(),
config: Some(S3Config {
} => {
let s3_config = S3Config {
region: region.clone(),
credentials: credentials.as_ref().map(S3Credentials::from),
credentials: mk_credentials(credentials.as_ref(), *anon),
endpoint: endpoint_url.clone(),
allow_http: *allow_http,
}),
},
allow_http: allow_http.unwrap_or(false),
};

StorageConfig::S3ObjectStore {
bucket: bucket.clone(),
prefix: prefix.clone(),
config: Some(s3_config),
}
}
}
}
}
Expand All @@ -150,6 +192,7 @@ pub enum PyVirtualRefConfig {
endpoint_url: Option<String>,
allow_http: Option<bool>,
region: Option<String>,
anon: bool,
},
}

Expand All @@ -162,6 +205,7 @@ impl PyVirtualRefConfig {
endpoint_url: None,
allow_http: None,
region: None,
anon: false,
}
}

Expand All @@ -172,27 +216,49 @@ impl PyVirtualRefConfig {
endpoint_url: Option<String>,
allow_http: Option<bool>,
region: Option<String>,
anon: Option<bool>,
) -> Self {
PyVirtualRefConfig::S3 {
credentials: Some(credentials),
endpoint_url,
allow_http,
region,
anon: anon.unwrap_or(false),
}
}

/// Create a virtual-chunk-reference config that resolves S3 URLs with
/// anonymous (unsigned) requests — for virtual refs pointing at public
/// buckets. No credentials are attached (`anon: true`, `credentials: None`).
#[classmethod]
fn s3_anonymous(
_cls: &Bound<'_, PyType>,
endpoint_url: Option<String>,
allow_http: Option<bool>,
region: Option<String>,
) -> Self {
PyVirtualRefConfig::S3 {
credentials: None,
endpoint_url,
allow_http,
region,
anon: true,
}
}
}

impl From<&PyVirtualRefConfig> for ObjectStoreVirtualChunkResolverConfig {
fn from(config: &PyVirtualRefConfig) -> Self {
match config {
PyVirtualRefConfig::S3 { credentials, endpoint_url, allow_http, region } => {
ObjectStoreVirtualChunkResolverConfig::S3(S3Config {
region: region.clone(),
endpoint: endpoint_url.clone(),
credentials: credentials.as_ref().map(S3Credentials::from),
allow_http: *allow_http,
})
}
PyVirtualRefConfig::S3 {
credentials,
endpoint_url,
allow_http,
region,
anon,
} => ObjectStoreVirtualChunkResolverConfig::S3(S3Config {
region: region.clone(),
endpoint: endpoint_url.clone(),
credentials: mk_credentials(credentials.as_ref(), *anon),
allow_http: allow_http.unwrap_or(false),
}),
}
}
}
53 changes: 41 additions & 12 deletions icechunk-python/tests/test_virtual_ref.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import numpy as np
import zarr
import zarr.core
import zarr.core.buffer
Expand Down Expand Up @@ -38,19 +39,9 @@ async def test_write_minino_virtual_refs():
]
)

# Open the store, the S3 credentials must be set in environment vars for this to work for now
# Open the store
store = await IcechunkStore.open(
storage=StorageConfig.s3_from_config(
bucket="testbucket",
prefix="python-virtual-ref",
credentials=S3Credentials(
access_key_id="minio123",
secret_access_key="minio123",
),
endpoint_url="http://localhost:9000",
allow_http=True,
region="us-east-1",
),
storage=StorageConfig.memory("virtual"),
mode="r+",
config=StoreConfig(
virtual_ref_config=VirtualRefConfig.s3_from_config(
Expand Down Expand Up @@ -86,3 +77,41 @@ async def test_write_minino_virtual_refs():

assert array[0, 0, 0] == 1936877926
assert array[0, 0, 1] == 1852793701

_snapshot_id = await store.commit("Add virtual refs")


async def test_from_s3_public_virtual_refs(tmpdir):
# Reads one chunk of a public NOAA netCDF object through an
# anonymous-access virtual ref config; no AWS credentials are needed.
# Open the store backed by the local filesystem — only the virtual
# chunk data lives in S3.
store = await IcechunkStore.open(
storage=StorageConfig.filesystem(f'{tmpdir}/virtual'),
mode="w",
config=StoreConfig(
virtual_ref_config=VirtualRefConfig.s3_anonymous(region="us-east-1", allow_http=False)
),
)
root = zarr.Group.from_store(store=store, zarr_format=3)
depth = root.require_array(
name="depth", shape=((22, )), chunk_shape=((22,)), dtype="float64"
)

# Point the single "depth" chunk at a byte range inside the public file.
await store.set_virtual_ref(
"depth/c/0",
"s3://noaa-nos-ofs-pds/dbofs/netcdf/202410/dbofs.t00z.20241009.regulargrid.f030.nc",
offset=42499,
length=176
)

nodes = [n async for n in store.list()]
assert "depth/c/0" in nodes

# Reading the array resolves the virtual chunk from S3 anonymously.
depth_values = depth[:]
assert len(depth_values) == 22
actual_values = np.array([
0., 1., 2., 4., 6., 8., 10., 12., 15., 20., 25.,
30., 35., 40., 45., 50., 60., 70., 80., 90., 100., 125.
])
assert np.allclose(depth_values, actual_values)



43 changes: 30 additions & 13 deletions icechunk/src/storage/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,30 @@ pub struct S3Storage {
}

#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
pub struct S3Credentials {
pub struct StaticS3Credentials {
pub access_key_id: String,
pub secret_access_key: String,
pub session_token: Option<String>,
}

/// How S3 requests should be authenticated.
///
/// Serialized with an internal `"type"` tag, so e.g. the anonymous variant
/// round-trips as `{"type": "anonymous"}`.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq, Default)]
#[serde(tag = "type")]
pub enum S3Credentials {
/// Resolve credentials from the process environment / default provider chain.
#[default]
#[serde(rename = "from_env")]
FromEnv,
/// Make unsigned, anonymous requests (public buckets).
#[serde(rename = "anonymous")]
Anonymous,
/// Use the given static access key / secret / session token.
#[serde(rename = "static")]
Static(StaticS3Credentials),
}

#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
pub struct S3Config {
pub region: Option<String>,
pub endpoint: Option<String>,
pub credentials: Option<S3Credentials>,
pub allow_http: Option<bool>,
pub credentials: S3Credentials,
pub allow_http: bool,
}

pub async fn mk_client(config: Option<&S3Config>) -> Client {
Expand All @@ -56,8 +68,9 @@ pub async fn mk_client(config: Option<&S3Config>) -> Client {
.unwrap_or_else(RegionProviderChain::default_provider);

let endpoint = config.and_then(|c| c.endpoint.clone());
let allow_http = config.and_then(|c| c.allow_http).unwrap_or(false);
let credentials = config.and_then(|c| c.credentials.clone());
let allow_http = config.map(|c| c.allow_http).unwrap_or(false);
let credentials =
config.map(|c| c.credentials.clone()).unwrap_or(S3Credentials::FromEnv);
#[allow(clippy::unwrap_used)]
let app_name = AppName::new("icechunk").unwrap();
let mut aws_config = aws_config::defaults(BehaviorVersion::v2024_03_28())
Expand All @@ -68,14 +81,18 @@ pub async fn mk_client(config: Option<&S3Config>) -> Client {
aws_config = aws_config.endpoint_url(endpoint)
}

if let Some(credentials) = credentials {
aws_config = aws_config.credentials_provider(Credentials::new(
credentials.access_key_id,
credentials.secret_access_key,
credentials.session_token,
None,
"user",
));
match credentials {
S3Credentials::FromEnv => {}
S3Credentials::Anonymous => aws_config = aws_config.no_credentials(),
S3Credentials::Static(credentials) => {
aws_config = aws_config.credentials_provider(Credentials::new(
credentials.access_key_id,
credentials.secret_access_key,
credentials.session_token,
None,
"user",
));
}
}

let mut s3_builder = Builder::from(&aws_config.load().await);
Expand Down
Loading

0 comments on commit 44a9031

Please sign in to comment.