From 2b450ba40b3ee8c1f3f828bef0c7c73262bf1b6e Mon Sep 17 00:00:00 2001 From: Colin Date: Wed, 15 Jan 2025 12:19:30 -0800 Subject: [PATCH] get SF managed working --- dbt/adapters/snowflake/catalog.py | 54 ++++++++++++++----- dbt/adapters/snowflake/impl.py | 1 + dbt/adapters/snowflake/relation.py | 43 +++++++++------ .../relation_configs/dynamic_table.py | 11 ++-- .../macros/relations/dynamic_table/create.sql | 10 ++-- .../relations/dynamic_table/replace.sql | 2 +- .../macros/relations/table/create.sql | 14 +++-- hatch.toml | 9 ++++ 8 files changed, 100 insertions(+), 44 deletions(-) diff --git a/dbt/adapters/snowflake/catalog.py b/dbt/adapters/snowflake/catalog.py index b342d5bf0..adb67b797 100644 --- a/dbt/adapters/snowflake/catalog.py +++ b/dbt/adapters/snowflake/catalog.py @@ -1,26 +1,28 @@ -from typing import Dict, Optional +from typing import Dict, Optional, Any import textwrap from dbt.adapters.base import BaseRelation from dbt.adapters.contracts.catalog import CatalogIntegration, CatalogIntegrationType from dbt.adapters.contracts.relation import RelationConfig +from dbt.adapters.relation_configs import RelationResults class SnowflakeManagedIcebergCatalogIntegration(CatalogIntegration): catalog_type = CatalogIntegrationType.managed - def render_ddl_predicates(self, relation: RelationConfig) -> str: + def render_ddl_predicates(self, relation: BaseRelation, config: RelationConfig) -> str: """ {{ optional('external_volume', dynamic_table.catalog.external_volume) }} {{ optional('catalog', dynamic_table.catalog.name) }} base_location = '{{ dynamic_table.catalog.base_location }}' + :param config: :param relation: :return: """ base_location: str = f"_dbt/{relation.schema}/{relation.name}" - if sub_path := relation.config.get("base_location_subpath"): + if sub_path := config.get("base_location_subpath"): base_location += f"/{sub_path}" iceberg_ddl_predicates: str = f""" @@ -30,11 +32,39 @@ def render_ddl_predicates(self, relation: RelationConfig) -> str: """ return 
textwrap.indent(textwrap.dedent(iceberg_ddl_predicates), " " * 10) + @classmethod + def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]: + import agate + + # this try block can be removed once enable_iceberg_materializations is retired + try: + catalog_results: "agate.Table" = relation_results["catalog"] + except KeyError: + # this happens when `enable_iceberg_materializations` is turned off + return {} + + if len(catalog_results) == 0: + # this happens when the dynamic table is a standard dynamic table (e.g. not iceberg) + return {} + + # for now, if we get catalog results, it's because this is an iceberg table + # this is because we only run `show iceberg tables` to get catalog metadata + # this will need to be updated once this is in `show objects` + catalog: "agate.Row" = catalog_results.rows[0] + config_dict = { + "table_format": "iceberg", + "name": catalog.get("catalog_name"), + "external_volume": catalog.get("external_volume_name"), + "base_location": catalog.get("base_location"), + } + + return config_dict + class SnowflakeGlueCatalogIntegration(CatalogIntegration): catalog_type = CatalogIntegrationType.glue - auto_refresh: str = "FALSE" - replace_invalid_characters: str = "FALSE" + auto_refresh: Optional[str] = None # "TRUE" | "FALSE" + replace_invalid_characters: Optional[str] = None # "TRUE" | "FALSE" def _handle_adapter_configs(self, adapter_configs: Optional[Dict]) -> None: if adapter_configs: @@ -43,15 +73,15 @@ def _handle_adapter_configs(self, adapter_configs: Optional[Dict]) -> None: if "replace_invalid_characters" in adapter_configs: self.replace_invalid_characters = adapter_configs["replace_invalid_characters"] - def render_ddl_predicates(self, relation: BaseRelation) -> str: - ddl_predicate = f"""create or replace iceberg table {relation.render()} - external_volume = '{self.external_volume} - catalog = '{self.name}' + def render_ddl_predicates(self, relation: BaseRelation, config: RelationConfig) -> str: + 
ddl_predicate = f""" + external_volume = '{self.external_volume}' + catalog = '{self.integration_name}' """ if self.namespace: - ddl_predicate += "CATALOG_NAMESPACE = '{self.namespace}'" + ddl_predicate += f"CATALOG_NAMESPACE = '{self.namespace}'\n" if self.auto_refresh: - ddl_predicate += f"REPLACE_INVALID_CHARACTERS = {self.auto_refresh}" + ddl_predicate += f"auto_refresh = {self.auto_refresh}\n" if self.replace_invalid_characters: - ddl_predicate += f"AUTO_REFRESH = {self.replace_invalid_characters}" + ddl_predicate += f"replace_invalid_characters = {self.replace_invalid_characters}\n" return ddl_predicate diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py index b4e058cd9..b8c4dad6a 100644 --- a/dbt/adapters/snowflake/impl.py +++ b/dbt/adapters/snowflake/impl.py @@ -60,6 +60,7 @@ class SnowflakeConfig(AdapterConfig): table_format: Optional[str] = None external_volume: Optional[str] = None base_location_subpath: Optional[str] = None + catalog_name: Optional[str] = None class SnowflakeAdapter(SQLAdapter): diff --git a/dbt/adapters/snowflake/relation.py b/dbt/adapters/snowflake/relation.py index 54db21924..909d655bb 100644 --- a/dbt/adapters/snowflake/relation.py +++ b/dbt/adapters/snowflake/relation.py @@ -1,10 +1,9 @@ -import textwrap - from dataclasses import dataclass, field from typing import FrozenSet, Optional, Type, Iterator, Tuple - +from dbt.adapters.clients import catalogs as catalogs_client from dbt.adapters.base.relation import BaseRelation +from dbt.adapters.contracts.catalog import CatalogIntegrationConfig, CatalogIntegrationType from dbt.adapters.contracts.relation import ComponentName, RelationConfig from dbt.adapters.events.types import AdapterEventWarning, AdapterEventDebug from dbt.adapters.relation_configs import ( @@ -12,6 +11,7 @@ RelationConfigChangeAction, RelationResults, ) +from dbt.adapters.snowflake.catalog import SnowflakeManagedIcebergCatalogIntegration from dbt.adapters.utils import classproperty from 
dbt_common.exceptions import DbtRuntimeError from dbt_common.events.functions import fire_event, warn_or_error @@ -64,6 +64,10 @@ def is_dynamic_table(self) -> bool: @property def is_iceberg_format(self) -> bool: + if self.catalog_name: + return ( + catalogs_client.get_catalog(self.catalog_name).table_format == TableFormat.ICEBERG + ) return self.table_format == TableFormat.ICEBERG @classproperty @@ -167,7 +171,11 @@ def get_ddl_prefix_for_create(self, config: RelationConfig, temporary: bool) -> """ transient_explicitly_set_true: bool = config.get("transient", False) - + catalog_name = config.get("catalog_name", None) + if catalog_name: + catalog = catalogs_client.get_catalog(catalog_name) + if catalog.table_format == TableFormat.ICEBERG: + return "iceberg" # Temporary tables are a Snowflake feature that do not exist in the # Iceberg framework. We ignore the Iceberg status of the model. if temporary: @@ -203,18 +211,21 @@ def get_ddl_prefix_for_alter(self) -> str: else: return "" - def get_iceberg_ddl_options(self, config: RelationConfig) -> str: - base_location: str = f"_dbt/{self.schema}/{self.name}" - - if subpath := config.get("base_location_subpath"): - base_location += f"/{subpath}" - - iceberg_ddl_predicates: str = f""" - external_volume = '{config.get('external_volume')}' - catalog = 'snowflake' - base_location = '{base_location}' - """ - return textwrap.indent(textwrap.dedent(iceberg_ddl_predicates), " " * 10) + def add_managed_catalog_integration(self, config: RelationConfig) -> str: + catalog_name = "snowflake_managed" + external_volume = config.get("external_volume") + integration_config = CatalogIntegrationConfig( + catalog_name=catalog_name, + integration_name=catalog_name, + table_format=self.table_format, + catalog_type=CatalogIntegrationType.managed.value, + external_volume=external_volume, + ) + catalogs_client.add_catalog( + SnowflakeManagedIcebergCatalogIntegration(integration_config), + catalog_name=catalog_name, + ) + return catalog_name def 
__drop_conditions(self, old_relation: "SnowflakeRelation") -> Iterator[Tuple[bool, str]]: drop_view_message: str = ( diff --git a/dbt/adapters/snowflake/relation_configs/dynamic_table.py b/dbt/adapters/snowflake/relation_configs/dynamic_table.py index a54e20ef7..470464b15 100644 --- a/dbt/adapters/snowflake/relation_configs/dynamic_table.py +++ b/dbt/adapters/snowflake/relation_configs/dynamic_table.py @@ -40,10 +40,11 @@ def default(cls) -> Self: return cls("ON_CREATE") -def _setup_catalog_integration(catalog_info: Union[Dict, RelationConfig]) -> Optional[str]: - breakpoint() +def _setup_catalog_integration(catalog_info: Union[Dict, RelationConfig]) -> str: if not catalog_info: - return None + return "SNOWFLAKE" + elif isinstance(catalog_info, str): + return catalog_info elif isinstance(catalog_info, dict): catalog_config = SnowflakeCatalogConfig.from_dict(catalog_info) else: @@ -64,7 +65,7 @@ def _setup_catalog_integration(catalog_info: Union[Dict, RelationConfig]) -> Opt ) return catalog_name else: - return None + return TableFormat.default().value @dataclass(frozen=True, eq=True, unsafe_hash=True) @@ -90,7 +91,7 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase): query: str target_lag: str snowflake_warehouse: str - catalog: Optional[str] = None + catalog: str = "SNOWFLAKE" refresh_mode: Optional[RefreshMode] = RefreshMode.default() initialize: Optional[Initialize] = Initialize.default() diff --git a/dbt/include/snowflake/macros/relations/dynamic_table/create.sql b/dbt/include/snowflake/macros/relations/dynamic_table/create.sql index ac77bf556..d846e7ed2 100644 --- a/dbt/include/snowflake/macros/relations/dynamic_table/create.sql +++ b/dbt/include/snowflake/macros/relations/dynamic_table/create.sql @@ -15,7 +15,7 @@ {%- set dynamic_table = relation.from_config(config.model) -%} - {%- if dynamic_table.catalog is not none -%} + {%- if dynamic_table.catalog != 'SNOWFLAKE' -%} {{ _get_create_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) 
}} {%- else -%} {{ _get_create_dynamic_standard_table_as_sql(dynamic_table, relation, sql) }} @@ -70,16 +70,12 @@ -- A valid DDL statement which will result in a new dynamic iceberg table. -#} - {% set catalog_integration = adapter.get_catalog_integration(dynamic_table.catalog) -%} - - {% if not catalog_integration -%} - {{ raise('Catalog integration is required for iceberg tables') }} - {%- endif -%} + {% set catalog_integration = adapter.get_catalog_integration(relation.catalog) -%} create dynamic iceberg table {{ relation }} target_lag = '{{ dynamic_table.target_lag }}' warehouse = {{ dynamic_table.snowflake_warehouse }} - {{ catalog_integration.render_ddl_predicates(relation) }} + {{ catalog_integration.render_ddl_predicates(relation, config.model.config) }} {{ optional('refresh_mode', dynamic_table.refresh_mode) }} {{ optional('initialize', dynamic_table.initialize) }} as ( diff --git a/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql b/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql index 4c9b966e7..0a09b32de 100644 --- a/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql +++ b/dbt/include/snowflake/macros/relations/dynamic_table/replace.sql @@ -15,7 +15,7 @@ {%- set dynamic_table = relation.from_config(config.model) -%} - {%- if dynamic_table.catalog is not none -%} + {%- if dynamic_table.catalog != 'SNOWFLAKE' -%} {{ _get_replace_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) }} {%- else -%} {{ _get_replace_dynamic_standard_table_as_sql(dynamic_table, relation, sql) }} diff --git a/dbt/include/snowflake/macros/relations/table/create.sql b/dbt/include/snowflake/macros/relations/table/create.sql index 50bedd78f..b8cf3b4c0 100644 --- a/dbt/include/snowflake/macros/relations/table/create.sql +++ b/dbt/include/snowflake/macros/relations/table/create.sql @@ -12,6 +12,7 @@ {%- set cluster_by_keys = config.get('cluster_by', default=none) -%} {%- set enable_automatic_clustering = 
config.get('automatic_clustering', default=false) -%} {%- set copy_grants = config.get('copy_grants', default=false) -%} + {%- set catalog_name = config.get('catalog_name', default=none) -%} {%- if cluster_by_keys is not none and cluster_by_keys is string -%} {%- set cluster_by_keys = [cluster_by_keys] -%} @@ -21,18 +22,25 @@ {% else %} {%- set cluster_by_string = none -%} {%- endif -%} + {%- if catalog_name is not none %} + {%- set catalog_integration = adapter.get_catalog_integration(catalog_name) -%} + {%- endif -%} {%- set sql_header = config.get('sql_header', none) -%} {{ sql_header if sql_header is not none }} create or replace {{ materialization_prefix }} table {{ relation }} - {%- if relation.is_iceberg_format %} + {%- if catalog_integration is defined and catalog_integration is not none %} {# Valid DDL in CTAS statements. Plain create statements have a different order. https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table #} - {{ relation.get_iceberg_ddl_options(config.model.config) }} - {%- endif -%} + {{ catalog_integration.render_ddl_predicates(relation=relation, config=config.model.config) }} + {%- elif relation.is_iceberg_format %} + {%- set catalog_name = relation.add_managed_catalog_integration(config.model.config) -%} + {%- set catalog_integration = adapter.get_catalog_integration(catalog_name) -%} + {{ catalog_integration.render_ddl_predicates(relation, config.model.config) }} + {%- endif %} {%- set contract_config = config.get('contract') -%} {%- if contract_config.enforced -%} diff --git a/hatch.toml b/hatch.toml index b22b571f0..9ce99fe5d 100644 --- a/hatch.toml +++ b/hatch.toml @@ -36,6 +36,15 @@ docker-dev = [ "docker run --rm -it --name dbt-snowflake-dev -v $(pwd):/opt/code dbt-snowflake-dev", ] +[envs.local] +# TODO: if/when hatch gets support for defining editable dependencies, the +# pre-install commands here and post-install commands in the matrix can be moved +# to the dependencies section +pre-install-commands = [ + "pip install -e ../dbt-adapter", + "pip 
install -e ../dbt-core/core", + ] + [envs.build] detached = true dependencies = [