diff --git a/crates/catalog-unity/Cargo.toml b/crates/catalog-unity/Cargo.toml index 99f4426d51..1403822e00 100644 --- a/crates/catalog-unity/Cargo.toml +++ b/crates/catalog-unity/Cargo.toml @@ -17,26 +17,40 @@ tokio.workspace = true serde.workspace = true serde_json.workspace = true thiserror.workspace = true +futures.workspace = true +chrono.workspace = true +tracing.workspace = true deltalake-core = { version = "0.24.0", path = "../core", features = [ "datafusion", -]} +] } +deltalake-aws = { version = "0.7.0", path = "../aws", optional = true } +deltalake-azure = { version = "0.7.0", path = "../azure", optional = true } +deltalake-gcp = { version = "0.8.0", path = "../gcp", optional = true } reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "json", "http2"] } reqwest-retry = "0.7" -reqwest-middleware = "0.4.0" +reqwest-middleware = { version = "0.4.0", features = ["json"] } rand = "0.8" -futures = { workspace = true } -chrono = { workspace = true } dashmap = "6" -tracing = { workspace = true } datafusion = { workspace = true, optional = true } datafusion-common = { workspace = true, optional = true } +moka = { version = "0.12", optional = true, features = ["future"] } [dev-dependencies] tokio = { version = "1", features = ["macros", "rt-multi-thread"] } tempfile = "3" httpmock = { version = "0.8.0-alpha.1" } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } [features] -default = [] -datafusion = ["dep:datafusion", "datafusion-common"] +default = ["datafusion", "aws"] +aws = ["deltalake-aws"] +azure = ["deltalake-azure"] +gcp = ["deltalake-gcp"] +r2 = ["deltalake-aws"] +datafusion = ["dep:datafusion", "datafusion-common", "deltalake-core/datafusion", "moka"] + +[[example]] +name = "uc_example" +path = "examples/uc_example.rs" +required-features = ["datafusion", "aws"] diff --git a/crates/catalog-unity/examples/uc_example.rs b/crates/catalog-unity/examples/uc_example.rs new file mode 100644 index 0000000000..bd25b2dd0d --- /dev/null +++ b/crates/catalog-unity/examples/uc_example.rs @@ -0,0 +1,46 @@ +use datafusion::prelude::*; +use deltalake_catalog_unity::prelude::*; +use std::error::Error; +use std::sync::Arc; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let filter = tracing_subscriber::EnvFilter::builder().parse("deltalake_catalog_unity=info")?; + let subscriber = tracing_subscriber::fmt() + .pretty() + .with_env_filter(filter) + .finish(); + tracing::subscriber::set_global_default(subscriber)?; + + let uc = UnityCatalogBuilder::from_env().build()?; + + deltalake_aws::register_handlers(None); + + let catalog = UnityCatalogProvider::try_new(Arc::new(uc), "scarman_sandbox").await?; + let ctx = SessionContext::new(); + ctx.register_catalog("scarman_sandbox", Arc::new(catalog)); + + ctx.sql( + "select hdci.city_name, hdci.country_code, hdci.latitude, hdci.longitude from \ + scarman_sandbox.external_data.historical_hourly_imperial hhi \ + join scarman_sandbox.external_data.historical_daily_calendar_imperial hdci on hdci.country_code = hhi.country_code \ + order by city_name \ + limit 50;" + ) + .await? + .show() + .await?; + + ctx.table("scarman_sandbox.external_data.historical_hourly_imperial") + .await? + .select(vec![ + col("city_name"), + col("country_code"), + col("latitude"), + col("longitude"), + ])? + .show_limit(50) + .await?; + + Ok(()) +} diff --git a/crates/catalog-unity/src/client/mock_server.rs b/crates/catalog-unity/src/client/mock_server.rs deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/crates/catalog-unity/src/datafusion.rs b/crates/catalog-unity/src/datafusion.rs index 23339b0b16..c8f9aa1e89 100644 --- a/crates/catalog-unity/src/datafusion.rs +++ b/crates/catalog-unity/src/datafusion.rs @@ -1,21 +1,24 @@ //! Datafusion integration for UnityCatalog -use std::any::Any; -use std::collections::HashMap; -use std::sync::Arc; - +use chrono::prelude::*; use dashmap::DashMap; use datafusion::catalog::SchemaProvider; use datafusion::catalog::{CatalogProvider, CatalogProviderList}; use datafusion::datasource::TableProvider; use datafusion_common::DataFusionError; +use futures::FutureExt; +use moka::future::Cache; +use moka::Expiry; +use std::any::Any; +use std::sync::Arc; +use std::time::{Duration, Instant}; use tracing::error; use super::models::{ GetTableResponse, ListCatalogsResponse, ListSchemasResponse, ListTableSummariesResponse, + TableTempCredentialsResponse, TemporaryTableCredentials, }; -use super::{DataCatalogResult, UnityCatalog}; - +use super::{DataCatalogResult, UnityCatalog, UnityCatalogError}; use deltalake_core::DeltaTableBuilder; /// In-memory list of catalogs populated by unity catalog @@ -27,20 +30,13 @@ pub struct UnityCatalogList { impl UnityCatalogList { /// Create a new instance of [`UnityCatalogList`] - pub async fn try_new( - client: Arc, - storage_options: impl IntoIterator, impl Into)> + Clone, - ) -> DataCatalogResult { + pub async fn try_new(client: Arc) -> DataCatalogResult { let catalogs = match client.list_catalogs().await? { - ListCatalogsResponse::Success { catalogs } => { + ListCatalogsResponse::Success { catalogs, .. } => { let mut providers = Vec::new(); for catalog in catalogs { - let provider = UnityCatalogProvider::try_new( - client.clone(), - &catalog.name, - storage_options.clone(), - ) - .await?; + let provider = + UnityCatalogProvider::try_new(client.clone(), &catalog.name).await?; providers.push((catalog.name, Arc::new(provider) as Arc)); } providers @@ -87,20 +83,15 @@ impl UnityCatalogProvider { pub async fn try_new( client: Arc, catalog_name: impl Into, - storage_options: impl IntoIterator, impl Into)> + Clone, ) -> DataCatalogResult { let catalog_name = catalog_name.into(); let schemas = match client.list_schemas(&catalog_name).await? { ListSchemasResponse::Success { schemas } => { let mut providers = Vec::new(); for schema in schemas { - let provider = UnitySchemaProvider::try_new( - client.clone(), - &catalog_name, - &schema.name, - storage_options.clone(), - ) - .await?; + let provider = + UnitySchemaProvider::try_new(client.clone(), &catalog_name, &schema.name) + .await?; providers.push((schema.name, Arc::new(provider) as Arc)); } providers @@ -127,20 +118,33 @@ impl CatalogProvider for UnityCatalogProvider { } } +struct TokenExpiry; + +impl Expiry for TokenExpiry { + fn expire_after_read( + &self, + _key: &String, + value: &TemporaryTableCredentials, + _read_at: Instant, + _duration_until_expiry: Option, + _last_modified_at: Instant, + ) -> Option { + let time_to_expire = value.expiration_time - Utc::now(); + tracing::info!("Token {} expires in {}", _key, time_to_expire); + time_to_expire.to_std().ok() + } +} + /// A datafusion [`SchemaProvider`] backed by Databricks UnityCatalog #[derive(Debug)] pub struct UnitySchemaProvider { - /// UnityCatalog Api client client: Arc, - catalog_name: String, - schema_name: String, /// Parent catalog for schemas of interest. table_names: Vec, - - storage_options: HashMap, + token_cache: Cache, } impl UnitySchemaProvider { @@ -149,7 +153,6 @@ impl UnitySchemaProvider { client: Arc, catalog_name: impl Into, schema_name: impl Into, - storage_options: impl IntoIterator, impl Into)>, ) -> DataCatalogResult { let catalog_name = catalog_name.into(); let schema_name = schema_name.into(); @@ -163,17 +166,37 @@ impl UnitySchemaProvider { .collect(), ListTableSummariesResponse::Error(_) => vec![], }; + let token_cache = Cache::builder().expire_after(TokenExpiry).build(); Ok(Self { client, table_names, catalog_name, schema_name, - storage_options: storage_options - .into_iter() - .map(|(key, value)| (key.into(), value.into())) - .collect(), + token_cache, }) } + + async fn get_creds( + &self, + catalog: &str, + schema: &str, + table: &str, + ) -> Result { + tracing::debug!( + "Fetching new credential for: {}.{}.{}", + catalog, + schema, + table + ); + self.client + .get_temp_table_credentials(catalog, schema, table) + .map(|resp| match resp { + Ok(TableTempCredentialsResponse::Success(temp_creds)) => Ok(temp_creds), + Ok(TableTempCredentialsResponse::Error(err)) => Err(err.into()), + Err(err) => Err(err.into()), + }) + .await + } } #[async_trait::async_trait] @@ -195,8 +218,20 @@ impl SchemaProvider for UnitySchemaProvider { match maybe_table { GetTableResponse::Success(table) => { + let temp_creds = self + .token_cache + .try_get_with( + table.table_id, + self.get_creds(&self.catalog_name, &self.schema_name, name), + ) + .await + .map_err(|err| DataFusionError::External(err.into()))?; + + let new_storage_opts = temp_creds.get_credentials().ok_or_else(|| { + DataFusionError::External(UnityCatalogError::MissingCredential.into()) + })?; let table = DeltaTableBuilder::from_uri(table.storage_location) - .with_storage_options(self.storage_options.clone()) + .with_storage_options(new_storage_opts) .load() .await?; Ok(Some(Arc::new(table))) diff --git a/crates/catalog-unity/src/error.rs b/crates/catalog-unity/src/error.rs deleted file mode 100644 index a13c3dc401..0000000000 --- a/crates/catalog-unity/src/error.rs +++ /dev/null @@ -1,38 +0,0 @@ -#[derive(thiserror::Error, Debug)] -pub enum UnityCatalogError { - /// A generic error qualified in the message - #[error("Error in {catalog} catalog: {source}")] - Generic { - /// Name of the catalog - catalog: &'static str, - /// Error message - source: Box, - }, - - /// A generic error qualified in the message - #[error("{source}")] - Retry { - /// Error message - #[from] - source: crate::client::retry::RetryError, - }, - - #[error("Request error: {source}")] - /// Error from reqwest library - RequestError { - /// The underlying reqwest_middleware::Error - #[from] - source: reqwest::Error, - }, - - /// Error caused by missing environment variable for Unity Catalog. - #[error("Missing Unity Catalog environment variable: {var_name}")] - MissingEnvVar { - /// Variable name - var_name: String, - }, - - /// Error caused by invalid access token value - #[error("Invalid Databricks personal access token")] - InvalidAccessToken, -} diff --git a/crates/catalog-unity/src/lib.rs b/crates/catalog-unity/src/lib.rs index ff2ed2eaef..740195a6c7 100644 --- a/crates/catalog-unity/src/lib.rs +++ b/crates/catalog-unity/src/lib.rs @@ -1,12 +1,19 @@ +#![warn(clippy::all)] +#![warn(rust_2018_idioms)] //! Databricks Unity Catalog. -use std::str::FromStr; +#[cfg(not(any(feature = "aws", feature = "azure", feature = "gcp", feature = "r2")))] +compile_error!( + "At least one of the following crate features `aws`, `azure`, `gcp`, or `r2` must be enabled \ + for this crate to function properly." +); use reqwest::header::{HeaderValue, InvalidHeaderValue, AUTHORIZATION}; +use std::str::FromStr; use crate::credential::{AzureCliCredential, ClientSecretOAuthProvider, CredentialProvider}; use crate::models::{ - GetSchemaResponse, GetTableResponse, ListCatalogsResponse, ListSchemasResponse, - ListTableSummariesResponse, + ErrorResponse, GetSchemaResponse, GetTableResponse, ListCatalogsResponse, ListSchemasResponse, + ListTableSummariesResponse, TableTempCredentialsResponse, TemporaryTableCredentialsRequest, }; use deltalake_core::data_catalog::DataCatalogResult; @@ -19,8 +26,8 @@ pub mod client; pub mod credential; #[cfg(feature = "datafusion")] pub mod datafusion; -pub mod error; pub mod models; +pub mod prelude; /// Possible errors from the unity-catalog/tables API call #[derive(thiserror::Error, Debug)] @@ -71,6 +78,19 @@ pub enum UnityCatalogError { #[error("Missing or corrupted federated token file for WorkloadIdentity.")] FederatedTokenFile, + + #[cfg(feature = "datafusion")] + #[error("Datafusion error: {0}")] + DatafusionError(#[from] datafusion_common::DataFusionError), +} + +impl From for UnityCatalogError { + fn from(value: ErrorResponse) -> Self { + UnityCatalogError::InvalidTable { + error_code: value.error_code, + message: value.message, + } + } } impl From for DataCatalogError { @@ -187,9 +207,10 @@ impl FromStr for UnityCatalogConfigKey { #[allow(deprecated)] fn from_str(s: &str) -> Result { match s { - "access_token" | "unity_access_token" | "databricks_access_token" => { - Ok(UnityCatalogConfigKey::AccessToken) - } + "access_token" + | "unity_access_token" + | "databricks_access_token" + | "databricks_token" => Ok(UnityCatalogConfigKey::AccessToken), "authority_host" | "unity_authority_host" | "databricks_authority_host" => { Ok(UnityCatalogConfigKey::AuthorityHost) } @@ -346,6 +367,7 @@ impl UnityCatalogBuilder { for (os_key, os_value) in std::env::vars_os() { if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) { if key.starts_with("UNITY_") || key.starts_with("DATABRICKS_") { + tracing::debug!("Found relevant env: {}", key); if let Ok(config_key) = UnityCatalogConfigKey::from_str(&key.to_ascii_lowercase()) { @@ -618,6 +640,39 @@ impl UnityCatalog { Ok(resp.json().await?) } + + pub async fn get_temp_table_credentials( + &self, + catalog_id: impl AsRef, + database_name: impl AsRef, + table_name: impl AsRef, + ) -> Result { + let token = self.get_credential().await?; + let table_info = self + .get_table(catalog_id, database_name, table_name) + .await?; + let response = match table_info { + GetTableResponse::Success(table) => { + let request = TemporaryTableCredentialsRequest::new(&table.table_id, "READ"); + Ok(self + .client + .post(format!( + "{}/temporary-table-credentials", + self.catalog_url() + )) + .header(AUTHORIZATION, token) + .json(&request) + .send() + .await?) + } + GetTableResponse::Error(err) => Err(UnityCatalogError::InvalidTable { + error_code: err.error_code, + message: err.message, + }), + }?; + + Ok(response.json().await?) + } } #[async_trait::async_trait] @@ -711,8 +766,10 @@ mod tests { let get_table_response = client .get_table("catalog_name", "schema_name", "table_name") - .await - .unwrap(); - assert!(matches!(get_table_response, GetTableResponse::Success(_))); + .await; + assert!(matches!( + get_table_response.unwrap(), + GetTableResponse::Success(_) + )); } } diff --git a/crates/catalog-unity/src/models.rs b/crates/catalog-unity/src/models.rs index 2066a4ee86..98be7481e7 100644 --- a/crates/catalog-unity/src/models.rs +++ b/crates/catalog-unity/src/models.rs @@ -1,18 +1,34 @@ //! Api models for databricks unity catalog APIs - -use core::fmt; +use chrono::serde::*; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; - -use serde::Deserialize; +use std::fmt; +use std::sync::Once; /// Error response from unity API -#[derive(Debug, Deserialize)] +#[derive(Deserialize, Debug)] pub struct ErrorResponse { /// The error code pub error_code: String, /// The error message pub message: String, + #[serde(default)] + pub details: Vec, +} + +#[derive(Deserialize, Default, Debug)] +#[serde(default)] +pub struct ErrorDetails { + #[serde(rename = "@type")] + tpe: String, + reason: String, + domain: String, + metadata: HashMap, + request_id: String, + serving_data: String, } + impl fmt::Display for ErrorResponse { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { writeln!(f, "[{}] {}", self.error_code, self.message) @@ -21,20 +37,21 @@ impl fmt::Display for ErrorResponse { impl std::error::Error for ErrorResponse {} /// List catalogs response -#[derive(Deserialize)] +#[derive(Deserialize, Debug)] #[serde(untagged)] pub enum ListCatalogsResponse { /// Successful response Success { /// The schemas within the parent catalog catalogs: Vec, + next_page_token: Option, }, /// Error response Error(ErrorResponse), } /// List schemas response -#[derive(Deserialize)] +#[derive(Deserialize, Debug)] #[serde(untagged)] pub enum ListSchemasResponse { /// Successful response @@ -47,7 +64,7 @@ pub enum ListSchemasResponse { } /// Get table response -#[derive(Deserialize)] +#[derive(Deserialize, Debug)] #[serde(untagged)] pub enum GetTableResponse { /// Successful response @@ -57,7 +74,7 @@ pub enum GetTableResponse { } /// List schemas response -#[derive(Deserialize)] +#[derive(Deserialize, Debug)] #[serde(untagged)] pub enum GetSchemaResponse { /// Successful response @@ -67,7 +84,7 @@ pub enum GetSchemaResponse { } /// List table summaries response -#[derive(Deserialize)] +#[derive(Deserialize, Debug)] #[serde(untagged)] pub enum ListTableSummariesResponse { /// Successful response @@ -82,18 +99,25 @@ pub enum ListTableSummariesResponse { Error(ErrorResponse), } -#[derive(Deserialize, Default)] +#[derive(Deserialize, Debug)] +#[serde(untagged)] +pub enum TableTempCredentialsResponse { + Success(TemporaryTableCredentials), + Error(ErrorResponse), +} + +#[derive(Deserialize, Default, Debug)] #[serde(rename_all = "SCREAMING_SNAKE_CASE")] #[allow(missing_docs)] /// Whether the current securable is accessible from all workspaces or a specific set of workspaces. -pub enum IsomationMode { +pub enum IsolationMode { #[default] Undefined, Open, Isolated, } -#[derive(Deserialize, Default)] +#[derive(Deserialize, Default, Debug)] #[serde(rename_all = "SCREAMING_SNAKE_CASE")] #[allow(missing_docs)] /// The type of the catalog. @@ -106,67 +130,63 @@ pub enum CatalogType { } /// A catalog within a metastore -#[derive(Deserialize, Default)] +#[derive(Deserialize, Default, Debug)] +#[serde(default)] pub struct Catalog { - /// Username of schema creator. - #[serde(default)] pub created_by: String, - - /// Name of schema, relative to parent catalog. pub name: String, - - /// Username of user who last modified schema. - #[serde(default)] pub updated_by: String, - - #[serde(default)] - /// Whether the current securable is accessible from all workspaces or a specific set of workspaces. - pub isolation_mode: IsomationMode, - - #[serde(default)] - /// The type of the catalog. + pub isolation_mode: IsolationMode, pub catalog_type: CatalogType, - - /// Storage root URL for managed tables within catalog. pub storage_root: String, - - /// The name of delta sharing provider. - /// - /// A Delta Sharing catalog is a catalog that is based on a Delta share on a remote sharing server. - pub provider_name: Option, - - /// Storage Location URL (full path) for managed tables within catalog. + pub provider_name: String, pub storage_location: String, - - /// A map of key-value properties attached to the securable. - #[serde(default)] pub properties: HashMap, - - /// The name of the share under the share provider. - pub share_name: Option, - - /// User-provided free-form text description. - #[serde(default)] + pub share_name: String, pub comment: String, - - /// Time at which this schema was created, in epoch milliseconds. - #[serde(default)] pub created_at: i64, - - /// Username of current owner of schema. - #[serde(default)] pub owner: String, - - /// Time at which this schema was created, in epoch milliseconds. - #[serde(default)] pub updated_at: i64, - - /// Unique identifier of parent metastore. pub metastore_id: String, + pub enabled_predictive_optimization: String, + pub effective_predictive_optimization_flag: EffectivePredictiveOptimizationFlag, + pub connection_name: String, + pub full_name: String, + pub options: HashMap, + pub securable_type: String, + pub provisioning_info: ProvisioningInfo, + pub browse_only: Option, + pub accessible_in_current_workspace: bool, + pub id: String, + pub securable_kind: String, + pub delta_sharing_valid_through_timestamp: u64, +} + +#[allow(unused)] +#[derive(Deserialize, Default, Debug)] +pub struct ProvisioningInfo { + state: ProvisioningState, +} + +#[derive(Deserialize, Debug, Default)] +pub enum ProvisioningState { + #[default] + Provisioning, + Active, + Failed, + Deleting, + Updating, +} + +#[derive(Deserialize, Default, Debug)] +pub struct EffectivePredictiveOptimizationFlag { + pub value: String, + pub inherited_from_type: String, + pub inherited_from_name: String, } /// A schema within a catalog -#[derive(Deserialize, Default)] +#[derive(Deserialize, Default, Debug)] pub struct Schema { /// Username of schema creator. #[serde(default)] @@ -220,10 +240,11 @@ pub struct Schema { pub metastore_id: String, } -#[derive(Deserialize, Default)] +#[derive(Deserialize, Default, Debug)] #[serde(rename_all = "SCREAMING_SNAKE_CASE")] #[allow(missing_docs)] /// Possible data source formats for unity tables +#[derive(Clone, PartialEq)] pub enum DataSourceFormat { #[default] Undefined, @@ -236,12 +257,27 @@ pub enum DataSourceFormat { Text, UnityCatalog, Deltasharing, + DatabricksFormat, + MySQLFormat, + PostgreSQLFormat, + RedshiftFormat, + SnowflakeFormat, + SQLDWFormat, + SQLServerFormat, + SalesForceFormat, + BigQueryFormat, + NetSuiteFormat, + WorkdayRAASFormat, + HiveSerde, + HiveCustom, + VectorIndexFormat, } -#[derive(Deserialize, Default)] +#[derive(Deserialize, Default, Debug)] #[serde(rename_all = "SCREAMING_SNAKE_CASE")] #[allow(missing_docs)] /// Possible data source formats for unity tables +#[derive(PartialEq, Clone)] pub enum TableType { #[default] Undefined, @@ -252,7 +288,7 @@ pub enum TableType { StreamingTable, } -#[derive(Deserialize)] +#[derive(Deserialize, Debug)] /// Summary of the table pub struct TableSummary { /// The full name of the table. @@ -262,37 +298,273 @@ pub struct TableSummary { } /// A table within a schema -#[derive(Deserialize, Default)] +#[derive(Clone, Debug, PartialEq, Default, Deserialize)] pub struct Table { - /// Username of table creator. - #[serde(default)] - pub created_by: String, + pub name: String, + /// Name of parent catalog. + pub catalog_name: String, + /// Name of parent schema relative to its parent catalog. + pub schema_name: String, + pub table_type: TableType, + pub data_source_format: DataSourceFormat, + /// The array of __ColumnInfo__ definitions of the table's columns. + pub columns: Vec, + /// Storage root URL for table (for **MANAGED**, **EXTERNAL** tables) + pub storage_location: String, + /// User-provided free-form text description. + #[serde(skip_serializing_if = "Option::is_none")] + pub comment: Option, + /// A map of key-value properties attached to the securable. + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub properties: HashMap, + /// Time at which this table was created, in epoch milliseconds. + #[serde(with = "ts_milliseconds")] + pub created_at: DateTime, + /// Time at which this table was last modified, in epoch milliseconds. + #[serde(with = "ts_milliseconds")] + pub updated_at: DateTime, + /// Unique identifier for the table. + pub table_id: String, +} - /// Name of table, relative to parent schema. +#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)] +pub struct ColumnInfo { + /// Name of Column. pub name: String, + /// Full data type specification as SQL/catalogString text. + #[serde(skip_serializing_if = "Option::is_none")] + pub type_text: Option, + /// Full data type specification, JSON-serialized. + #[serde(skip_serializing_if = "Option::is_none")] + pub type_json: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub type_name: Option, + /// Digits of precision; required for DecimalTypes. + #[serde(skip_serializing_if = "Option::is_none")] + pub type_precision: Option, + /// Digits to right of decimal; Required for DecimalTypes. + #[serde(skip_serializing_if = "Option::is_none")] + pub type_scale: Option, + /// Format of IntervalType. + #[serde(skip_serializing_if = "Option::is_none")] + pub type_interval_type: Option, + /// Ordinal position of column (starting at position 0). + pub position: u32, + /// User-provided free-form text description. + #[serde(skip_serializing_if = "Option::is_none")] + pub comment: Option, + /// Whether field may be Null. + pub nullable: bool, + /// Partition index for column. + #[serde(skip_serializing_if = "Option::is_none")] + pub partition_index: Option, +} - /// Username of user who last modified the table. - #[serde(default)] - pub updated_by: String, +#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)] +#[serde(rename_all = "SCREAMING_SNAKE_CASE")] +pub enum ColumnTypeName { + Boolean, + Byte, + Short, + Int, + Long, + Float, + Double, + Date, + Timestamp, + TimestampNtz, + String, + Binary, + Decimal, + Interval, + Array, + Struct, + Map, + Char, + Null, + UserDefinedType, + TableType, +} - /// List of schemes whose objects can be referenced without qualification. - #[serde(default)] - pub sql_path: String, +#[derive(Deserialize, Default, Debug)] +#[serde(default)] +pub struct DeltaRuntimeProperties { + pub delta_runtime_properties: HashMap, +} - /// Data source format - pub data_source_format: DataSourceFormat, +#[derive(Deserialize, Debug, Clone)] +pub struct TemporaryTableCredentials { + pub aws_temp_credentials: Option, + pub azure_user_delegation_sas: Option, + pub gcp_oauth_token: Option, + pub r2_temp_credentials: Option, + #[serde(with = "chrono::serde::ts_milliseconds")] + pub expiration_time: DateTime, + pub url: String, +} - /// Full name of table, in form of catalog_name.schema_name.table_name - pub full_name: String, +#[cfg(any(feature = "aws", feature = "r2"))] +static INIT_AWS: Once = Once::new(); +#[cfg(feature = "azure")] +static INIT_AZURE: Once = Once::new(); +#[cfg(feature = "gcp")] +static INIT_GCP: Once = Once::new(); + +impl TemporaryTableCredentials { + #[cfg(feature = "aws")] + pub fn get_aws_credentials(&self) -> Option> { + INIT_AWS.call_once(|| deltalake_aws::register_handlers(None)); + self.aws_temp_credentials.clone().map(Into::into) + } - /// Name of parent schema relative to its parent catalog. - pub schema_name: String, + #[cfg(not(feature = "aws"))] + pub fn get_aws_credentials(&self) -> Option> { + tracing::warn!("AWS Credentials found, but the feature is not enabled."); + None + } - /// Storage root URL for table (for MANAGED, EXTERNAL tables) - pub storage_location: String, + #[cfg(feature = "azure")] + pub fn get_azure_credentials(&self) -> Option> { + INIT_AZURE.call_once(|| deltalake_azure::register_handlers(None)); + self.azure_user_delegation_sas.clone().map(Into::into) + } - /// Unique identifier of parent metastore. - pub metastore_id: String, + #[cfg(not(feature = "azure"))] + pub fn get_azure_credentials(&self) -> Option> { + tracing::warn!("Azure credentials found, but the feature is not enabled."); + None + } + + #[cfg(feature = "gcp")] + pub fn get_gcp_credentials(&self) -> Option> { + INIT_GCP.call_once(|| deltalake_gcp::register_handlers(None)); + self.gcp_oauth_token.clone().map(Into::into) + } + + #[cfg(not(feature = "gcp"))] + pub fn get_gcp_credentials(&self) -> Option> { + tracing::warn!("GCP credentials found, but the feature is not enabled."); + None + } + + #[cfg(feature = "r2")] + pub fn get_r2_credentials(&self) -> Option> { + INIT_AWS.call_once(|| deltalake_aws::register_handlers(None)); + self.r2_temp_credentials.clone().map(Into::into) + } + + #[cfg(not(feature = "r2"))] + pub fn get_r2_credentials(&self) -> Option> { + tracing::warn!("r2 credentials found, but feature is not enabled."); + None + } + + pub fn get_credentials(self) -> Option> { + self.get_aws_credentials() + .or(self.get_azure_credentials()) + .or(self.get_gcp_credentials()) + .or(self.get_r2_credentials()) + } +} + +#[derive(Deserialize, Debug, Clone)] +pub struct AwsTempCredentials { + pub access_key_id: String, + pub secret_access_key: String, + pub session_token: Option, + pub access_point: Option, +} + +#[cfg(feature = "aws")] +impl From for HashMap { + fn from(value: AwsTempCredentials) -> Self { + let mut result = HashMap::from_iter([ + ( + deltalake_aws::constants::AWS_ACCESS_KEY_ID.to_string(), + value.access_key_id, + ), + ( + deltalake_aws::constants::AWS_SECRET_ACCESS_KEY.to_string(), + value.secret_access_key, + ), + ]); + if let Some(st) = value.session_token { + result.insert(deltalake_aws::constants::AWS_SESSION_TOKEN.to_string(), st); + } + if let Some(ap) = value.access_point { + result.insert(deltalake_aws::constants::AWS_ENDPOINT_URL.to_string(), ap); + } + result + } +} + +#[cfg(feature = "azure")] +impl From for HashMap { + fn from(value: AzureUserDelegationSas) -> Self { + HashMap::from_iter([("azure_storage_sas_key".to_string(), value.sas_token)]) + } +} + +#[cfg(feature = "gcp")] +impl From for HashMap { + fn from(value: GcpOauthToken) -> Self { + HashMap::from_iter([( + "google_application_credentials".to_string(), + value.oauth_token, + )]) + } +} + +#[cfg(feature = "r2")] +impl From for HashMap { + fn from(value: R2TempCredentials) -> Self { + HashMap::from_iter([ + ( + deltalake_aws::constants::AWS_ACCESS_KEY_ID.to_string(), + value.access_key_id, + ), + ( + deltalake_aws::constants::AWS_SECRET_ACCESS_KEY.to_string(), + value.secret_access_key, + ), + ( + deltalake_aws::constants::AWS_SESSION_TOKEN.to_string(), + value.session_token, + ), + ]) + } +} + +#[derive(Deserialize, Debug, Clone)] +pub struct AzureUserDelegationSas { + pub sas_token: String, +} + +#[derive(Deserialize, Debug, Clone)] +pub struct GcpOauthToken { + pub oauth_token: String, +} + +#[derive(Deserialize, Debug, Clone)] +pub struct R2TempCredentials { + pub access_key_id: String, + pub secret_access_key: String, + pub session_token: String, +} + +#[derive(Serialize, Debug, Clone)] +pub struct TemporaryTableCredentialsRequest { + pub table_id: String, + pub operation: String, +} + +impl TemporaryTableCredentialsRequest { + pub fn new(table_id: &str, operation: &str) -> Self { + Self { + table_id: table_id.to_string(), + operation: operation.to_string(), + } + } } #[cfg(test)] @@ -302,7 +574,8 @@ pub(crate) mod tests { pub(crate) const ERROR_RESPONSE: &str = r#" { "error_code": "404", - "message": "error message" + "message": "error message", + "details": [] } "#; @@ -316,6 +589,7 @@ pub(crate) mod tests { "full_name": "string", "catalog_type": "string", "catalog_name": "string", + "schema_name": "string", "storage_root": "string", "storage_location": "string", "properties": { @@ -326,7 +600,8 @@ pub(crate) mod tests { "created_at": 0, "owner": "string", "updated_at": 0, - "metastore_id": "string" + "metastore_id": "string", + "table_id": "string" } ] }"#; @@ -353,21 +628,55 @@ pub(crate) mod tests { }"#; pub(crate) const GET_TABLE_RESPONSE: &str = r#" - { - "created_by": "string", - "name": "table_name", - "updated_by": "string", - "sql_path": "string", - "data_source_format": "DELTA", - "full_name": "string", - "delta_runtime_properties_kvpairs": { - "delta_runtime_properties": { + { + "name": "string", + "catalog_name": "string", + "schema_name": "string", + "table_type": "MANAGED", + "data_source_format": "DELTA", + "columns": [ + { + "name": "string", + "type_text": "string", + "type_name": "BOOLEAN", + "position": 0, + "type_precision": 0, + "type_scale": 0, + "type_interval_type": "string", + "type_json": "string", + "comment": "string", + "nullable": true, + "partition_index": 0, + "mask": { + "function_name": "string", + "using_column_names": [ + "string" + ] + } + } + ], + "storage_location": "string", + "view_definition": "string", + "view_dependencies": { + "dependencies": [ + { + "table": { + "table_full_name": "string" + }, + "function": { + "function_full_name": "string" + } + } + ] + }, + "sql_path": "string", + "owner": "string", + "comment": "string", + "properties": { "property1": "string", "property2": "string" - } - }, - "catalog_name": "string", - "table_constraints": { + }, + "storage_credential_name": "string", "table_constraints": [ { "primary_key_constraint": { @@ -390,63 +699,38 @@ pub(crate) mod tests { "name": "string" } } - ] - }, - "schema_name": "string", - "storage_location": "string", - "properties": { - "property1": "string", - "property2": "string" - }, - "columns": [ - { - "nullable": "true", - "name": "string", - "type_interval_type": "string", - "mask": { - "function_name": "string", - "using_column_names": [ - "string" - ] - }, - "type_scale": 0, - "type_text": "string", - "comment": "string", - "partition_index": 0, - "type_json": "string", - "position": 0, - "type_name": "BOOLEAN", - "type_precision": 0 - } - ], - "comment": "string", - "table_id": "string", - "table_type": "MANAGED", - "created_at": 0, - "row_filter": { - "name": "string", - "input_column_names": [ - "string" - ] - }, - "owner": "string", - "storage_credential_name": "string", - "updated_at": 0, - "view_definition": "string", - "view_dependencies": [ - { - "table": { - "table_full_name": "string" - }, - "function": { - "function_full_name": "string" + ], + "row_filter": { + "function_name": "string", + "input_column_names": [ + "string" + ] + }, + "enable_predictive_optimization": "DISABLE", + "metastore_id": "string", + "full_name": "string", + "data_access_configuration_id": "string", + "created_at": 0, + "created_by": "string", + "updated_at": 0, + "updated_by": "string", + "deleted_at": 0, + "table_id": "string", + "delta_runtime_properties_kvpairs": { + "delta_runtime_properties": { + "property1": "string", + "property2": "string" } - } - ], - "data_access_configuration_id": "string", - "deleted_at": 0, - "metastore_id": "string" - } + }, + "effective_predictive_optimization_flag": { + "value": "DISABLE", + "inherited_from_type": "CATALOG", + "inherited_from_name": "string" + }, + "access_point": "string", + "pipeline_id": "string", + "browse_only": true + } "#; pub(crate) const LIST_TABLES: &str = r#" @@ -507,6 +791,7 @@ pub(crate) mod tests { let get_table: Result = serde_json::from_str(ERROR_RESPONSE); assert!(get_table.is_ok()); + dbg!(&get_table); assert!(matches!(get_table.unwrap(), GetTableResponse::Error(_))) } } diff --git a/crates/catalog-unity/src/prelude.rs b/crates/catalog-unity/src/prelude.rs new file mode 100644 index 0000000000..48767b2331 --- /dev/null +++ b/crates/catalog-unity/src/prelude.rs @@ -0,0 +1,4 @@ +pub use crate::{UnityCatalog, UnityCatalogBuilder, UnityCatalogConfigKey, UnityCatalogError}; + +#[cfg(feature = "datafusion")] +pub use crate::datafusion::{UnityCatalogList, UnityCatalogProvider, UnitySchemaProvider}; diff --git a/crates/core/src/operations/add_column.rs b/crates/core/src/operations/add_column.rs index d312668f6c..e6646edb9c 100644 --- a/crates/core/src/operations/add_column.rs +++ b/crates/core/src/operations/add_column.rs @@ -28,7 +28,7 @@ pub struct AddColumnBuilder { custom_execute_handler: Option>, } -impl super::Operation<()> for AddColumnBuilder { +impl Operation<()> for AddColumnBuilder { fn log_store(&self) -> &LogStoreRef { &self.log_store } diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs index 5631079269..77a9fae20b 100644 --- a/crates/core/src/table/builder.rs +++ b/crates/core/src/table/builder.rs @@ -404,7 +404,7 @@ pub fn ensure_table_uri(table_uri: impl AsRef) -> DeltaResult { })?; Url::from_directory_path(path).map_err(|_| { let msg = format!( - "Could not construct a URL from canonicalized path: {}.\n\ + "Could not construct a URL from the canonical path: {}.\n\ Something must be very wrong with the table path.", table_uri ); diff --git a/crates/gcp/src/config.rs b/crates/gcp/src/config.rs index b75e84e624..4ab4c482fa 100644 --- a/crates/gcp/src/config.rs +++ b/crates/gcp/src/config.rs @@ -1,7 +1,7 @@ -//! Auxiliary module for generating a valig Google cloud configuration. +//! Auxiliary module for generating a valid Google cloud configuration. //! //! Google offers few ways to authenticate against storage accounts and -//! provide credentials for a service principal. Some of this configutaion may +//! provide credentials for a service principal. Some of this configuration may //! partially be specified in the environment. This module establishes a structured //! way how we discover valid credentials and some heuristics on how they are prioritized. use std::collections::{hash_map::Entry, HashMap}; @@ -42,7 +42,7 @@ impl GcpCredential { /// Helper struct to create full configuration from passed options and environment /// -/// Main concern is to pick the desired credential for connecting to starage backend +/// Main concern is to pick the desired credential for connecting to storage backend /// based on a provided configuration and configuration set in the environment. pub(crate) struct GcpConfigHelper { config: HashMap,