Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DOP-22350] Add transformations for Transfers with dataframe column filtering #186

Merged
merged 1 commit into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/changelog/next_release/186.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add transformations for **Transfers** with dataframe column filtering
5 changes: 4 additions & 1 deletion syncmaster/schemas/v1/transfers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
S3ReadTransferTarget,
)
from syncmaster.schemas.v1.transfers.strategy import FullStrategy, IncrementalStrategy
from syncmaster.schemas.v1.transfers.transformations.dataframe_columns_filter import (
DataframeColumnsFilter,
)
from syncmaster.schemas.v1.transfers.transformations.dataframe_rows_filter import (
DataframeRowsFilter,
)
Expand Down Expand Up @@ -102,7 +105,7 @@
| None
)

TransformationSchema = DataframeRowsFilter
TransformationSchema = DataframeRowsFilter | DataframeColumnsFilter


class CopyTransferSchema(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from typing import Annotated, Literal

from pydantic import BaseModel, Field

from syncmaster.schemas.v1.transformation_types import DATAFRAME_COLUMNS_FILTER


class BaseColumnsFilter(BaseModel):
    """Common base for a single column-level filter entry."""

    # Name of the dataframe column this filter entry refers to.
    field: str


class IncludeFilter(BaseColumnsFilter):
    """Column filter entry tagged ``"include"``: selects the column unchanged."""

    # Discriminator value identifying this filter kind.
    type: Literal["include"]


class RenameFilter(BaseColumnsFilter):
    """Column filter entry tagged ``"rename"``: selects the column under a new name."""

    # Discriminator value identifying this filter kind.
    type: Literal["rename"]
    # New name the column is exposed as.
    to: str


class CastFilter(BaseColumnsFilter):
    """Column filter entry tagged ``"cast"``: selects the column converted to another type."""

    # Discriminator value identifying this filter kind.
    type: Literal["cast"]
    # Target type name; handlers place it verbatim inside ``CAST(... AS <as_type>)``.
    as_type: str


ColumnsFilter = IncludeFilter | RenameFilter | CastFilter


class DataframeColumnsFilter(BaseModel):
    """Transformation carrying a list of column filters.

    Each element of ``filters`` is resolved to one of the ``ColumnsFilter``
    members by the value of its ``type`` key.
    """

    # Transformation tag, constrained by DATAFRAME_COLUMNS_FILTER.
    type: DATAFRAME_COLUMNS_FILTER
    # Column filter entries; an empty list when omitted.
    filters: list[
        Annotated[ColumnsFilter, Field(..., discriminator="type")]
    ] = Field(default_factory=list)
Original file line number Diff line number Diff line change
Expand Up @@ -7,74 +7,74 @@
from syncmaster.schemas.v1.transformation_types import DATAFRAME_ROWS_FILTER


class BaseRowFilter(BaseModel):
class BaseRowsFilter(BaseModel):
field: str


class IsNullFilter(BaseRowFilter):
class IsNullFilter(BaseRowsFilter):
type: Literal["is_null"]


class IsNotNullFilter(BaseRowFilter):
class IsNotNullFilter(BaseRowsFilter):
type: Literal["is_not_null"]


class EqualFilter(BaseRowFilter):
class EqualFilter(BaseRowsFilter):
type: Literal["equal"]
value: str


class NotEqualFilter(BaseRowFilter):
class NotEqualFilter(BaseRowsFilter):
type: Literal["not_equal"]
value: str


class GreaterThanFilter(BaseRowFilter):
class GreaterThanFilter(BaseRowsFilter):
type: Literal["greater_than"]
value: str


class GreaterOrEqualFilter(BaseRowFilter):
class GreaterOrEqualFilter(BaseRowsFilter):
type: Literal["greater_or_equal"]
value: str


class LessThanFilter(BaseRowFilter):
class LessThanFilter(BaseRowsFilter):
type: Literal["less_than"]
value: str


class LessOrEqualFilter(BaseRowFilter):
class LessOrEqualFilter(BaseRowsFilter):
type: Literal["less_or_equal"]
value: str


class LikeFilter(BaseRowFilter):
class LikeFilter(BaseRowsFilter):
type: Literal["like"]
value: str


class ILikeFilter(BaseRowFilter):
class ILikeFilter(BaseRowsFilter):
type: Literal["ilike"]
value: str


class NotLikeFilter(BaseRowFilter):
class NotLikeFilter(BaseRowsFilter):
type: Literal["not_like"]
value: str


class NotILikeFilter(BaseRowFilter):
class NotILikeFilter(BaseRowsFilter):
type: Literal["not_ilike"]
value: str


class RegexpFilter(BaseRowFilter):
class RegexpFilter(BaseRowsFilter):
type: Literal["regexp"]
value: str


RowFilter = (
RowsFilter = (
IsNullFilter
| IsNotNullFilter
| EqualFilter
Expand All @@ -93,4 +93,4 @@ class RegexpFilter(BaseRowFilter):

class DataframeRowsFilter(BaseModel):
type: DATAFRAME_ROWS_FILTER
filters: list[Annotated[RowFilter, Field(..., discriminator="type")]] = Field(default_factory=list)
filters: list[Annotated[RowsFilter, Field(..., discriminator="type")]] = Field(default_factory=list)
1 change: 1 addition & 0 deletions syncmaster/schemas/v1/transformation_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
from typing import Literal

DATAFRAME_ROWS_FILTER = Literal["dataframe_rows_filter"]
DATAFRAME_COLUMNS_FILTER = Literal["dataframe_columns_filter"]
49 changes: 42 additions & 7 deletions syncmaster/worker/handlers/db/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ def read(self) -> DataFrame:
reader = DBReader(
connection=self.connection,
table=self.transfer_dto.table_name,
where=self._get_filter_expression(),
where=self._get_rows_filter_expression(),
columns=self._get_columns_filter_expressions(),
)
return reader.run()

Expand All @@ -53,13 +54,47 @@ def write(self, df: DataFrame) -> None:
def _normalize_column_names(self, df: DataFrame) -> DataFrame: ...

@abstractmethod
def _make_filter_expression(self, filters: list[dict]) -> str | None: ...
def _make_rows_filter_expression(self, filters: list[dict]) -> str | None: ...

def _get_filter_expression(self) -> str | None:
filters = []
def _make_columns_filter_expressions(self, filters: list[dict]) -> list[str] | None:
expressions = []
for filter in filters:
filter_type = filter["type"]
field = self._quote_field(filter["field"])

if filter_type == "include":
expressions.append(field)
elif filter_type == "rename":
new_name = self._quote_field(filter["to"])
expressions.append(f"{field} AS {new_name}")
elif filter_type == "cast":
cast_type = filter["as_type"]
expressions.append(f"CAST({field} AS {cast_type}) AS {field}")

return expressions or None

def _get_rows_filter_expression(self) -> str | None:
expressions = []
for transformation in self.transfer_dto.transformations:
if transformation["type"] == "dataframe_rows_filter":
filters.extend(transformation["filters"])
if filters:
return self._make_filter_expression(filters)
expressions.extend(transformation["filters"])

if expressions:
return self._make_rows_filter_expression(expressions)

return None

def _get_columns_filter_expressions(self) -> list[str] | None:
expressions = []
for transformation in self.transfer_dto.transformations:
if transformation["type"] == "dataframe_columns_filter":
expressions.extend(transformation["filters"])

if expressions:
return self._make_columns_filter_expressions(expressions)

return None

@staticmethod
def _quote_field(field: str) -> str:
    """Wrap a column name in double quotes for use as a SQL identifier.

    Any double quote inside the name is doubled, so the name cannot close
    the quoted identifier early and leak into the surrounding SQL.
    """
    escaped = field.replace('"', '""')
    return f'"{escaped}"'
4 changes: 2 additions & 2 deletions syncmaster/worker/handlers/db/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ def _normalize_column_names(self, df: DataFrame) -> DataFrame:
df = df.withColumnRenamed(column_name, column_name.lower())
return df

def _make_filter_expression(self, filters: list[dict]) -> str | None:
def _make_rows_filter_expression(self, filters: list[dict]) -> str | None:
expressions = []
for filter in filters:
field = f'"{filter["field"]}"'
field = self._quote_field(filter["field"])
op = self._operators[filter["type"]]
value = filter.get("value")

Expand Down
8 changes: 6 additions & 2 deletions syncmaster/worker/handlers/db/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ def _normalize_column_names(self, df: DataFrame) -> DataFrame:
df = df.withColumnRenamed(column_name, column_name.lower())
return df

def _make_filter_expression(self, filters: list[dict]) -> str | None:
def _make_rows_filter_expression(self, filters: list[dict]) -> str | None:
expressions = []
for filter in filters:
op = self._operators[filter["type"]]
field = f"`{filter["field"]}`"
field = self._quote_field(filter["field"])
value = filter.get("value")

if value is None:
Expand All @@ -59,3 +59,7 @@ def _make_filter_expression(self, filters: list[dict]) -> str | None:
expressions.append(f"{field} {op} '{value}'")

return " AND ".join(expressions) or None

@staticmethod
def _quote_field(field: str) -> str:
    """Wrap a column name in backticks for use as an identifier.

    Any backtick inside the name is doubled, so the name cannot close the
    quoted identifier early and leak into the surrounding SQL.
    """
    escaped = field.replace("`", "``")
    return f"`{escaped}`"
4 changes: 2 additions & 2 deletions syncmaster/worker/handlers/db/mssql.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ def _normalize_column_names(self, df: DataFrame) -> DataFrame:
df = df.withColumnRenamed(column_name, column_name.lower())
return df

def _make_filter_expression(self, filters: list[dict]) -> str | None:
def _make_rows_filter_expression(self, filters: list[dict]) -> str | None:
expressions = []
for filter in filters:
op = self._operators[filter["type"]]
field = f'"{filter["field"]}"'
field = self._quote_field(filter["field"])
value = filter.get("value")

if value is None:
Expand Down
8 changes: 6 additions & 2 deletions syncmaster/worker/handlers/db/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ def _normalize_column_names(self, df: DataFrame) -> DataFrame:
df = df.withColumnRenamed(column_name, column_name.lower())
return df

def _make_filter_expression(self, filters: list[dict]) -> str | None:
def _make_rows_filter_expression(self, filters: list[dict]) -> str | None:
expressions = []
for filter in filters:
op = self._operators[filter["type"]]
field = f"`{filter["field"]}`"
field = self._quote_field(filter["field"])
value = filter.get("value")

if value is None:
Expand All @@ -59,3 +59,7 @@ def _make_filter_expression(self, filters: list[dict]) -> str | None:
expressions.append(f"{field} {op} '{value}'")

return " AND ".join(expressions) or None

@staticmethod
def _quote_field(field: str) -> str:
    """Wrap a column name in backticks for use as an identifier.

    Any backtick inside the name is doubled, so the name cannot close the
    quoted identifier early and leak into the surrounding SQL.
    """
    escaped = field.replace("`", "``")
    return f"`{escaped}`"
4 changes: 2 additions & 2 deletions syncmaster/worker/handlers/db/oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ def _normalize_column_names(self, df: DataFrame) -> DataFrame:
df = df.withColumnRenamed(column_name, column_name.upper())
return df

def _make_filter_expression(self, filters: list[dict]) -> str | None:
def _make_rows_filter_expression(self, filters: list[dict]) -> str | None:
expressions = []
for filter in filters:
field = f'"{filter["field"]}"'
field = self._quote_field(filter["field"])
op = self._operators[filter["type"]]
value = filter.get("value")

Expand Down
4 changes: 2 additions & 2 deletions syncmaster/worker/handlers/db/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ def _normalize_column_names(self, df: DataFrame) -> DataFrame:
df = df.withColumnRenamed(column_name, column_name.lower())
return df

def _make_filter_expression(self, filters: list[dict]) -> str | None:
def _make_rows_filter_expression(self, filters: list[dict]) -> str | None:
expressions = []
for filter in filters:
field = f'"{filter["field"]}"'
field = self._quote_field(filter["field"])
op = self._operators[filter["type"]]
value = filter.get("value")

Expand Down
61 changes: 48 additions & 13 deletions syncmaster/worker/handlers/file/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,13 @@ def read(self) -> DataFrame:
)
df = reader.run()

filter_expression = self._get_filter_expression()
if filter_expression:
df = df.where(filter_expression)
rows_filter_expression = self._get_rows_filter_expression()
if rows_filter_expression:
df = df.where(rows_filter_expression)

columns_filter_expressions = self._get_columns_filter_expressions()
if columns_filter_expressions:
df = df.selectExpr(*columns_filter_expressions)

return df

Expand All @@ -64,16 +68,7 @@ def write(self, df: DataFrame):

return writer.run(df=df)

def _get_filter_expression(self) -> str | None:
filters = []
for transformation in self.transfer_dto.transformations:
if transformation["type"] == "dataframe_rows_filter":
filters.extend(transformation["filters"])
if filters:
return self._make_filter_expression(filters)
return None

def _make_filter_expression(self, filters: list[dict]) -> str:
def _make_rows_filter_expression(self, filters: list[dict]) -> str:
expressions = []
for filter in filters:
field = filter["field"]
Expand All @@ -83,3 +78,43 @@ def _make_filter_expression(self, filters: list[dict]) -> str:
expressions.append(f"{field} {op} '{value}'" if value is not None else f"{field} {op}")

return " AND ".join(expressions)

def _make_columns_filter_expressions(self, filters: list[dict]) -> list[str] | None:
# TODO: another approach is to use df.select(col("col1"), col("col2").alias("new_col2"), ...)
expressions = []
for filter in filters:
filter_type = filter["type"]
field = filter["field"]

if filter_type == "include":
expressions.append(field)
elif filter_type == "rename":
new_name = filter["to"]
expressions.append(f"{field} AS {new_name}")
IlyasDevelopment marked this conversation as resolved.
Show resolved Hide resolved
elif filter_type == "cast":
cast_type = filter["as_type"]
expressions.append(f"CAST({field} AS {cast_type}) AS {field}")

return expressions or None

def _get_rows_filter_expression(self) -> str | None:
expressions = []
for transformation in self.transfer_dto.transformations:
if transformation["type"] == "dataframe_rows_filter":
expressions.extend(transformation["filters"])

if expressions:
return self._make_rows_filter_expression(expressions)

return None

def _get_columns_filter_expressions(self) -> list[str] | None:
expressions = []
for transformation in self.transfer_dto.transformations:
if transformation["type"] == "dataframe_columns_filter":
expressions.extend(transformation["filters"])

if expressions:
return self._make_columns_filter_expressions(expressions)

return None
Loading
Loading