Implement match_by_column_name and include_metadata for PIPE; Improve grant name parsing for various edge cases
littleK0i committed Jul 8, 2024
1 parent 549fcc8 commit 06d2d4a
Showing 21 changed files with 158 additions and 25 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,11 @@
 # Changelog

+## [0.29.1] - 2024-07-08
+
+- Implemented parameters `match_by_column_name` and `include_metadata` for the `PIPE` object type.
+- Adjusted grant name parsing logic to extract arguments only from object types which support overloading.
+- Included currently unknown data types in the graceful warning logic for non-conforming identifiers. This should prevent SnowDDL from terminating with an exception when it encounters a manually created `FUNCTION` or `PROCEDURE` with a data type such as `VECTOR` or `MAP`.
+
 ## [0.29.0] - 2024-06-12

 - Implemented `AGGREGATION_POLICY`, `PROJECTION_POLICY` object types.
4 changes: 2 additions & 2 deletions snowddl/blueprint/__init__.py
@@ -80,8 +80,8 @@
 from .ident_builder import (
     build_schema_object_ident,
     build_role_ident,
-    build_grant_name_ident_snowflake,
-    build_future_grant_name_ident_snowflake,
+    build_grant_name_ident,
+    build_future_grant_name_ident,
     build_default_namespace_ident,
 )

2 changes: 2 additions & 0 deletions snowddl/blueprint/blueprint.py
@@ -244,6 +244,8 @@ class PipeBlueprint(SchemaObjectBlueprint):
     copy_pattern: Optional[str] = None
     copy_transform: Optional[Dict[str, str]] = None
     copy_file_format: Optional[SchemaObjectIdent] = None
+    copy_match_by_column_name: Optional[str] = None
+    copy_include_metadata: Optional[Dict[Ident, Ident]] = None
     copy_options: Optional[Dict[str, Union[bool, float, int, str, list]]] = None
     aws_sns_topic: Optional[str] = None
     integration: Optional[Ident] = None
17 changes: 9 additions & 8 deletions snowddl/blueprint/ident_builder.py
@@ -46,24 +46,25 @@ def build_role_ident(env_prefix, *args: Union[AccountObjectIdent, str]) -> AccountObjectIdent:
     )


-def build_grant_name_ident_snowflake(object_type: ObjectType, grant_name: str):
+def build_grant_name_ident(object_type: ObjectType, grant_name: str):
     env_prefix = ""

     parts = [p.strip('"') for p in grant_name.split(".")]
     last_part = parts[-1]

     if len(parts) == 3:
-        # Extract data types for arguments of functions and procedures
-        if "(" in last_part:
+        # Extract data types from arguments of FUNCTION or PROCEDURE
+        if object_type.is_overloading_supported:
             start_dtypes_idx = last_part.index("(")
             finish_dtypes_idx = last_part.index(")")

             parts[-1] = last_part[0:start_dtypes_idx]
+            arguments_str = last_part[start_dtypes_idx + 1 : finish_dtypes_idx]
+            data_types = []

-            data_types_str = last_part[start_dtypes_idx + 1 : finish_dtypes_idx]
-            data_types = (
-                [BaseDataType[arg.strip(" ").split(" ")[1]] for arg in data_types_str.split(",")] if data_types_str else []
-            )
+            if arguments_str:
+                for arg in arguments_str.split(","):
+                    data_types.append(BaseDataType[arg.strip(" ").split(" ")[-1]])

             return SchemaObjectIdentWithArgs(env_prefix, parts[0], parts[1], parts[2], data_types=data_types)

@@ -81,7 +82,7 @@ def build_grant_name_ident_snowflake(object_type: ObjectType, grant_name: str):
     raise ValueError(f"Unexpected grant name format [{grant_name}] in Snowflake for object type [{object_type}]")


-def build_future_grant_name_ident_snowflake(object_type: ObjectType, grant_name: str):
+def build_future_grant_name_ident(object_type: ObjectType, grant_name: str):
     env_prefix = ""

     parts = [p.strip('"') for p in grant_name.split(".")]
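For illustration, a minimal standalone sketch of the argument-extraction logic added to build_grant_name_ident above. The grant name is hypothetical, and plain strings stand in for the BaseDataType enum lookup so the snippet runs on its own:

    # Hypothetical grant name for an overloaded FUNCTION: Snowflake appends
    # argument data types in parentheses after the object name.
    grant_name = 'DB1.SC1."MY_FUNC(ARG1 NUMBER, ARG2 VARCHAR)"'

    parts = [p.strip('"') for p in grant_name.split(".")]
    last_part = parts[-1]

    start_dtypes_idx = last_part.index("(")
    finish_dtypes_idx = last_part.index(")")

    arguments_str = last_part[start_dtypes_idx + 1 : finish_dtypes_idx]
    data_types = []

    if arguments_str:
        for arg in arguments_str.split(","):
            # Taking the last token handles both "NUMBER" and "ARG1 NUMBER",
            # which appears to be one of the edge cases this commit addresses
            data_types.append(arg.strip(" ").split(" ")[-1])

    print(data_types)  # ['NUMBER', 'VARCHAR']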
6 changes: 6 additions & 0 deletions snowddl/blueprint/object_type.py
@@ -89,6 +89,7 @@ class ObjectType(Enum):
         "singular": "FUNCTION",
         "plural": "FUNCTIONS",
         "is_future_grant_supported": True,
+        "is_overloading_supported": True,
         "blueprint_cls": "FunctionBlueprint",
     }

@@ -145,6 +146,7 @@ class ObjectType(Enum):
         "singular": "PROCEDURE",
         "plural": "PROCEDURES",
         "is_future_grant_supported": True,
+        "is_overloading_supported": True,
         "blueprint_cls": "ProcedureBlueprint",
     }

@@ -301,5 +303,9 @@ def blueprint_cls(self):
     def is_future_grant_supported(self) -> bool:
         return self.value.get("is_future_grant_supported", False)

+    @property
+    def is_overloading_supported(self) -> bool:
+        return self.value.get("is_overloading_supported", False)
+
     def __repr__(self):
         return f"<{self.__class__.__name__}.{super().name}>"
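A quick usage sketch of the new property; `TABLE` is used here only to illustrate the `False` default for object types whose definition omits the flag:

    # ObjectType import path assumed; resolvers in this repo get it via
    # snowddl.resolver.abc_resolver
    assert ObjectType.FUNCTION.is_overloading_supported is True
    assert ObjectType.PROCEDURE.is_overloading_supported is True
    assert ObjectType.TABLE.is_overloading_supported is False  # flag absent, default applies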
19 changes: 17 additions & 2 deletions snowddl/parser/pipe.py
@@ -24,6 +24,15 @@
         "file_format": {
             "type": "string"
         },
+        "match_by_column_name": {
+            "type": "string"
+        },
+        "include_metadata": {
+            "type": "object",
+            "additionalProperties": {
+                "type": "string"
+            }
+        },
         "transform": {
             "type": "object",
             "additionalProperties": {
@@ -67,12 +76,16 @@ def load_blueprints(self):
         self.parse_schema_object_files("pipe", pipe_json_schema, self.process_pipe)

     def process_pipe(self, f: ParsedFile):
+        file_format = None
+        include_metadata = None
+
         copy = f.params["copy"]

         if copy.get("file_format"):
             file_format = build_schema_object_ident(self.env_prefix, copy.get("file_format"), f.database, f.schema)
-        else:
-            file_format = None
+
+        if copy.get("include_metadata"):
+            include_metadata = {Ident(k): Ident(v) for k, v in copy.get("include_metadata").items()}

         bp = PipeBlueprint(
             full_name=SchemaObjectIdent(self.env_prefix, f.database, f.schema, f.name),
@@ -83,6 +96,8 @@ def process_pipe(self, f: ParsedFile):
             copy_pattern=copy.get("pattern"),
             copy_transform=self.normalise_params_dict(copy.get("transform")),
             copy_file_format=file_format,
+            copy_match_by_column_name=copy.get("match_by_column_name"),
+            copy_include_metadata=include_metadata,
             copy_options=self.normalise_params_dict(copy.get("options")),
             aws_sns_topic=f.params.get("aws_sns_topic"),
             integration=Ident(f.params["integration"]) if f.params.get("integration") else None,
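To show what the new schema fragment accepts, here is a hedged sketch using the standalone jsonschema package (the diff does not show which validator SnowDDL uses internally, so treat this as an approximation):

    from jsonschema import ValidationError, validate

    # The "include_metadata" fragment from the schema above: an object whose
    # property values must all be strings
    include_metadata_schema = {
        "type": "object",
        "additionalProperties": {"type": "string"},
    }

    # Passes: string-to-string mapping
    validate({"metadata_file_name": "METADATA$FILENAME"}, include_metadata_schema)

    # Rejected: non-string value
    try:
        validate({"metadata_file_name": 123}, include_metadata_schema)
    except ValidationError as e:
        print(e.message)  # 123 is not of type 'string'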
10 changes: 5 additions & 5 deletions snowddl/resolver/abc_role_resolver.py
@@ -7,8 +7,8 @@
     FutureGrant,
     SchemaIdent,
     SchemaObjectIdent,
-    build_grant_name_ident_snowflake,
-    build_future_grant_name_ident_snowflake,
+    build_grant_name_ident,
+    build_future_grant_name_ident,
 )
 from snowddl.resolver.abc_resolver import AbstractResolver, ResolveResult, ObjectType
@@ -78,8 +78,8 @@ def get_existing_role_grants(self, role_name):
                 account_grants.append(AccountGrant(privilege=r["privilege"]))
             else:
                 try:
-                    grant_name = build_grant_name_ident_snowflake(object_type, r["name"])
-                except ValueError:
+                    grant_name = build_grant_name_ident(object_type, r["name"])
+                except (KeyError, ValueError) as e:
                     self.engine.intention_cache.add_invalid_name_warning(object_type, r["name"])
                     continue

@@ -107,7 +107,7 @@ def get_existing_role_grants(self, role_name):
                 continue

             try:
-                grant_name = build_future_grant_name_ident_snowflake(object_type, r["name"])
+                grant_name = build_future_grant_name_ident(object_type, r["name"])
             except ValueError:
                 self.engine.intention_cache.add_invalid_name_warning(object_type, r["name"])
                 continue
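The broadened `except (KeyError, ValueError)` ties into the changelog note about unknown data types: indexing an Enum by a missing name raises KeyError, so a grant on a FUNCTION with an argument type like VECTOR could previously escape the warning logic. A self-contained sketch (the two-member enum is illustrative, not SnowDDL's full BaseDataType):

    from enum import Enum

    class BaseDataType(Enum):
        NUMBER = "NUMBER"
        VARCHAR = "VARCHAR"

    try:
        BaseDataType["VECTOR"]  # unknown type from a manually created FUNCTION
    except (KeyError, ValueError):
        print("add non-conforming identifier warning and continue")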
4 changes: 2 additions & 2 deletions snowddl/resolver/inbound_share_role.py
@@ -1,4 +1,4 @@
-from snowddl.blueprint import RoleBlueprint, DatabaseShareBlueprint, Grant, build_role_ident, build_grant_name_ident_snowflake
+from snowddl.blueprint import RoleBlueprint, DatabaseShareBlueprint, Grant, build_role_ident, build_grant_name_ident
 from snowddl.resolver.abc_role_resolver import AbstractRoleResolver, ObjectType


@@ -29,7 +29,7 @@ def get_existing_role_grants(self, role_name):
                 Grant(
                     privilege="IMPORTED PRIVILEGES",
                     on=ObjectType.DATABASE,
-                    name=build_grant_name_ident_snowflake(ObjectType.DATABASE, r["name"]),
+                    name=build_grant_name_ident(ObjectType.DATABASE, r["name"]),
                 )
             )

4 changes: 2 additions & 2 deletions snowddl/resolver/outbound_share.py
@@ -1,4 +1,4 @@
-from snowddl.blueprint import OutboundShareBlueprint, Grant, build_grant_name_ident_snowflake
+from snowddl.blueprint import OutboundShareBlueprint, Grant, build_grant_name_ident
 from snowddl.resolver.abc_resolver import AbstractResolver, ResolveResult, ObjectType


@@ -164,7 +164,7 @@ def get_existing_share_grants(self, share_name):
                 Grant(
                     privilege=r["privilege"],
                     on=ObjectType[r["granted_on"]],
-                    name=build_grant_name_ident_snowflake(ObjectType[r["granted_on"]], r["name"]),
+                    name=build_grant_name_ident(ObjectType[r["granted_on"]], r["name"]),
                 )
             )

23 changes: 23 additions & 0 deletions snowddl/resolver/pipe.py
@@ -215,6 +215,29 @@ def _build_copy_into(self, bp: PipeBlueprint):
             },
         )

+        if bp.copy_match_by_column_name:
+            query.append_nl(
+                "MATCH_BY_COLUMN_NAME = {match_by_column_name}",
+                {
+                    "match_by_column_name": bp.copy_match_by_column_name.upper(),
+                },
+            )
+
+        if bp.copy_include_metadata:
+            query.append_nl("INCLUDE_METADATA = (")
+
+            for idx, (table_col_name, metadata_col_name) in enumerate(bp.copy_include_metadata.items()):
+                query.append_nl(
+                    " {comma:r}{table_col_name:i} = {metadata_col_name:i}",
+                    {
+                        "comma": " " if idx == 0 else ", ",
+                        "table_col_name": table_col_name,
+                        "metadata_col_name": metadata_col_name,
+                    },
+                )
+
+            query.append_nl(")")
+
         if bp.copy_options:
             for k, v in bp.copy_options.items():
                 query.append_nl(
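A small sketch of the leading-comma layout the INCLUDE_METADATA loop produces, with plain string formatting standing in for SnowDDL's query builder and hypothetical column names:

    include_metadata = {
        "METADATA_FILE_NAME": "METADATA$FILENAME",
        "METADATA_FILE_ROW_NUMBER": "METADATA$FILE_ROW_NUMBER",
    }

    lines = ["INCLUDE_METADATA = ("]

    for idx, (table_col, meta_col) in enumerate(include_metadata.items()):
        # First entry gets padding, later entries get a leading comma,
        # mirroring the {comma:r} placeholder above
        comma = " " if idx == 0 else ", "
        lines.append(f" {comma}{table_col} = {meta_col}")

    lines.append(")")
    print("\n".join(lines))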
6 changes: 3 additions & 3 deletions snowddl/resolver/user_role.py
@@ -1,4 +1,4 @@
-from snowddl.blueprint import RoleBlueprint, UserBlueprint, Grant, build_role_ident, build_grant_name_ident_snowflake
+from snowddl.blueprint import RoleBlueprint, UserBlueprint, Grant, build_role_ident, build_grant_name_ident
 from snowddl.resolver.abc_role_resolver import AbstractRoleResolver, ObjectType


@@ -26,8 +26,8 @@ def get_existing_role_grants(self, role_name):
             object_type = ObjectType.ROLE

             try:
-                grant_name = build_grant_name_ident_snowflake(object_type, r["name"])
-            except ValueError:
+                grant_name = build_grant_name_ident(object_type, r["name"])
+            except (KeyError, ValueError):
                 self.engine.intention_cache.add_invalid_name_warning(object_type, r["name"])
                 continue
2 changes: 1 addition & 1 deletion snowddl/version.py
@@ -1 +1 @@
-__version__ = "0.29.0"
+__version__ = "0.29.1"
1 change: 1 addition & 0 deletions test/_config/step1/db1/sc1/file_format/pi003_ff1.yaml
@@ -0,0 +1 @@
+type: PARQUET
15 changes: 15 additions & 0 deletions test/_config/step1/db1/sc1/pipe/pi003_pi1.yaml
@@ -0,0 +1,15 @@
+copy:
+  table: pi003_tb1
+  stage: pi003_st1
+  file_format: pi003_ff1
+
+  match_by_column_name: case_sensitive
+
+  include_metadata:
+    metadata_file_name: METADATA$FILENAME
+    metadata_file_row_number: METADATA$FILE_ROW_NUMBER
+    metadata_file_content_key: METADATA$FILE_CONTENT_KEY
+    metadata_file_last_modified: METADATA$FILE_LAST_MODIFIED
+    metadata_file_start_scan_time: METADATA$START_SCAN_TIME
+
+auto_ingest: false
2 changes: 2 additions & 0 deletions test/_config/step1/db1/sc1/stage/pi003_st1.yaml
@@ -0,0 +1,2 @@
+url: gcs://test-bucket1
+storage_integration: test_storage_integration
8 changes: 8 additions & 0 deletions test/_config/step1/db1/sc1/table/pi003_tb1.yaml
@@ -0,0 +1,8 @@
+columns:
+  id: NUMBER(38,0)
+  name: VARCHAR(255)
+  metadata_file_name: VARCHAR(100000)
+  metadata_file_row_number: NUMBER(38,0)
+  metadata_file_content_key: VARCHAR(100000)
+  metadata_file_last_modified: TIMESTAMP_NTZ(9)
+  metadata_file_start_scan_time: TIMESTAMP_LTZ(9)
1 change: 1 addition & 0 deletions test/_config/step2/db1/sc1/file_format/pi003_ff1.yaml
@@ -0,0 +1 @@
+type: PARQUET
15 changes: 15 additions & 0 deletions test/_config/step2/db1/sc1/pipe/pi003_pi1.yaml
@@ -0,0 +1,15 @@
+copy:
+  table: pi003_tb1
+  stage: pi003_st1
+  file_format: pi003_ff1
+
+  match_by_column_name: case_insensitive
+
+  include_metadata:
+    metadata_file_name: METADATA$FILENAME
+    metadata_file_row_number: METADATA$FILE_ROW_NUMBER
+    metadata_file_content_key: METADATA$FILE_CONTENT_KEY
+    metadata_file_last_modified: METADATA$FILE_LAST_MODIFIED
+    metadata_file_start_scan_time: METADATA$START_SCAN_TIME
+
+auto_ingest: false
2 changes: 2 additions & 0 deletions test/_config/step2/db1/sc1/stage/pi003_st1.yaml
@@ -0,0 +1,2 @@
+url: gcs://test-bucket1
+storage_integration: test_storage_integration
8 changes: 8 additions & 0 deletions test/_config/step2/db1/sc1/table/pi003_tb1.yaml
@@ -0,0 +1,8 @@
+columns:
+  id: NUMBER(38,0)
+  name: VARCHAR(255)
+  metadata_file_name: VARCHAR(100000)
+  metadata_file_row_number: NUMBER(38,0)
+  metadata_file_content_key: VARCHAR(100000)
+  metadata_file_last_modified: TIMESTAMP_NTZ(9)
+  metadata_file_start_scan_time: TIMESTAMP_LTZ(9)
28 changes: 28 additions & 0 deletions test/pipe/pi003.py
@@ -0,0 +1,28 @@
+def test_step1(helper):
+    pipe_show = helper.show_pipe("db1", "sc1", "pi003_pi1")
+
+    assert "PI003_TB1" in pipe_show["definition"]
+    assert "PI003_ST1" in pipe_show["definition"]
+    assert "PI003_FF1" in pipe_show["definition"]
+
+    assert "MATCH_BY_COLUMN_NAME = 'CASE_SENSITIVE'" in pipe_show["definition"]
+    assert "INCLUDE_METADATA" in pipe_show["definition"]
+    assert "METADATA$FILENAME" in pipe_show["definition"]
+
+
+def test_step2(helper):
+    pipe_show = helper.show_pipe("db1", "sc1", "pi003_pi1")
+
+    assert "PI003_TB1" in pipe_show["definition"]
+    assert "PI003_ST1" in pipe_show["definition"]
+    assert "PI003_FF1" in pipe_show["definition"]
+
+    assert "MATCH_BY_COLUMN_NAME = 'CASE_INSENSITIVE'" in pipe_show["definition"]
+    assert "INCLUDE_METADATA" in pipe_show["definition"]
+    assert "METADATA$FILENAME" in pipe_show["definition"]
+
+
+def test_step3(helper):
+    pipe_show = helper.show_pipe("db1", "sc1", "pi003_pi1")
+
+    assert pipe_show is None