diff --git a/.env.docker b/.env.docker index d0f2d570..b1c92fc9 100644 --- a/.env.docker +++ b/.env.docker @@ -81,7 +81,7 @@ TEST_CLICKHOUSE_PORT_FOR_CONFTEST=8123 TEST_CLICKHOUSE_HOST_FOR_WORKER=test-clickhouse TEST_CLICKHOUSE_PORT_FOR_WORKER=8123 TEST_CLICKHOUSE_USER=default -TEST_CLICKHOUSE_PASSWORD= +TEST_CLICKHOUSE_PASSWORD=test_only TEST_CLICKHOUSE_DB=default TEST_MSSQL_HOST_FOR_CONFTEST=test-mssql diff --git a/.env.local b/.env.local index abdf1205..1c84df54 100644 --- a/.env.local +++ b/.env.local @@ -68,7 +68,7 @@ export TEST_CLICKHOUSE_PORT_FOR_CONFTEST=8123 export TEST_CLICKHOUSE_HOST_FOR_WORKER=test-clickhouse export TEST_CLICKHOUSE_PORT_FOR_WORKER=8123 export TEST_CLICKHOUSE_USER=default -export TEST_CLICKHOUSE_PASSWORD= +export TEST_CLICKHOUSE_PASSWORD=test_only export TEST_CLICKHOUSE_DB=default export TEST_MSSQL_HOST_FOR_CONFTEST=localhost diff --git a/docker-compose.test.yml b/docker-compose.test.yml index e9144c1b..d02b8c9d 100644 --- a/docker-compose.test.yml +++ b/docker-compose.test.yml @@ -173,6 +173,10 @@ services: test-clickhouse: image: clickhouse/clickhouse-server restart: unless-stopped + environment: + CLICKHOUSE_USER: default + CLICKHOUSE_PASSWORD: test_only + CLICKHOUSE_DB: default ports: - 8123:8123 - 9001:9000 diff --git a/docs/changelog/next_release/196.feature.rst b/docs/changelog/next_release/196.feature.rst new file mode 100644 index 00000000..a411445c --- /dev/null +++ b/docs/changelog/next_release/196.feature.rst @@ -0,0 +1 @@ +Add `file_name_template` field to `target_params` \ No newline at end of file diff --git a/syncmaster/schemas/v1/transfers/file/base.py b/syncmaster/schemas/v1/transfers/file/base.py index a3e80b49..9255b37f 100644 --- a/syncmaster/schemas/v1/transfers/file/base.py +++ b/syncmaster/schemas/v1/transfers/file/base.py @@ -33,6 +33,7 @@ class ReadFileTransferTarget(BaseModel): ..., discriminator="type", ) + file_name_template: str options: dict[str, Any] @@ -61,6 +62,10 @@ class CreateFileTransferTarget(BaseModel): ..., discriminator="type", ) + file_name_template: str = Field( + default="{run_created_at}_{index}.{extension}", + description="Template for file naming with required placeholders 'index' and 'extension'", + ) options: dict[str, Any] = Field(default_factory=dict) class Config: @@ -72,3 +77,20 @@ def _directory_path_is_valid_path(cls, value): if not PurePosixPath(value).is_absolute(): raise ValueError("Directory path must be absolute") return value + + @field_validator("file_name_template") + @classmethod + def validate_file_name_template(cls, value): + required_keys = {"index", "extension"} + placeholders = {key for key in required_keys if f"{{{key}}}" in value} + + missing_keys = sorted(required_keys - placeholders) + if missing_keys: + raise ValueError(f"Missing required placeholders: {', '.join(missing_keys)}") + + try: + value.format(index="", extension="", run_created_at="", run_id="") + except KeyError as e: + raise ValueError(f"Invalid placeholder: {e}") + + return value diff --git a/tests/test_unit/test_transfers/test_file_transfers/test_create_transfer.py b/tests/test_unit/test_transfers/test_file_transfers/test_create_transfer.py index 2f16cf2b..950fcb28 100644 --- a/tests/test_unit/test_transfers/test_file_transfers/test_create_transfer.py +++ b/tests/test_unit/test_transfers/test_file_transfers/test_create_transfer.py @@ -9,8 +9,9 @@ pytestmark = [pytest.mark.asyncio, pytest.mark.server] +# TODO: refactor test fixtures to decrease amount of annotations @pytest.mark.parametrize( - "connection_type,create_connection_data", + "connection_type, create_connection_data", [ ( "s3", @@ -23,71 +24,92 @@ indirect=["create_connection_data"], ) @pytest.mark.parametrize( - "target_source_params", + "target_source_params, target_params", [ - { - "type": "s3", - "directory_path": "/some/pure/path", - "file_format": { - "type": "csv", - "delimiter": ",", - "encoding": "utf-8", - "quote": '"', - "escape": "\\", - "include_header": False, - "line_sep": "\n", - "compression": "gzip", - }, - "options": { - "some": "option", - }, - }, - { - "type": "s3", - "directory_path": "/some/excel/path", - "file_format": { - "type": "excel", - "include_header": True, - "start_cell": "A1", + ( + { + "type": "s3", + "directory_path": "/some/pure/path", + "file_format": { + "type": "csv", + "delimiter": ",", + "encoding": "utf-8", + "quote": '"', + "escape": "\\", + "include_header": False, + "line_sep": "\n", + "compression": "gzip", + }, + "options": { + "some": "option", + }, }, - "options": { - "some": "option", + { + "file_name_template": "{run_created_at}_{index}.{extension}", }, - }, - { - "type": "s3", - "directory_path": "/some/xml/path", - "file_format": { - "type": "xml", - "root_tag": "data", - "row_tag": "record", - "compression": "lz4", + ), + ( + { + "type": "s3", + "directory_path": "/some/excel/path", + "file_format": { + "type": "excel", + "include_header": True, + "start_cell": "A1", + }, + "options": { + "some": "option", + }, }, - "options": { - "some": "option", + { + "file_name_template": "{index}.{extension}", }, - }, - { - "type": "s3", - "directory_path": "/some/orc/path", - "file_format": { - "type": "orc", + ), + ( + { + "type": "s3", + "directory_path": "/some/xml/path", + "file_format": { + "type": "xml", + "root_tag": "data", + "row_tag": "record", + "compression": "lz4", + }, + "options": { + "some": "option", + }, }, - "options": { - "some": "option", + { + "file_name_template": "{run_created_at}-{index}.{extension}", }, - }, - { - "type": "s3", - "directory_path": "/some/parquet/path", - "file_format": { - "type": "parquet", - "compression": "gzip", + ), + ( + { + "type": "s3", + "directory_path": "/some/orc/path", + "file_format": { + "type": "orc", + }, + "options": { + "some": "option", + }, }, - "options": { - "some": "option", + {}, + ), + ( + { + "type": "s3", + "directory_path": "/some/parquet/path", + "file_format": { + "type": "parquet", + "compression": "gzip", + }, + "options": { + "some": "option", + }, }, - }, + {}, + ), ], ) async def test_developer_plus_can_create_s3_transfer( @@ -98,6 +120,7 @@ async def test_developer_plus_can_create_s3_transfer( group_queue: Queue, mock_group: MockGroup, target_source_params: dict, + target_params: dict, create_connection_data: dict, ): # Arrange @@ -117,7 +140,7 @@ async def test_developer_plus_can_create_s3_transfer( "source_connection_id": first_connection.id, "target_connection_id": second_connection.id, "source_params": target_source_params, - "target_params": target_source_params, + "target_params": {**target_source_params, **target_params}, "strategy_params": {"type": "full"}, "queue_id": group_queue.id, }, @@ -202,56 +225,78 @@ async def test_developer_plus_can_create_s3_transfer( indirect=["create_connection_data"], ) @pytest.mark.parametrize( - "target_source_params", + "target_source_params, target_params", [ - { - "type": "hdfs", - "directory_path": "/some/pure/path", - "file_format": { - "type": "csv", - "compression": "gzip", + ( + { + "type": "hdfs", + "directory_path": "/some/pure/path", + "file_format": { + "type": "csv", + "compression": "gzip", + }, }, - }, - { - "type": "hdfs", - "directory_path": "/some/excel/path", - "file_format": { - "type": "excel", - "include_header": True, - "start_cell": "A1", + { + "file_name_template": "{run_created_at}_{index}.{extension}", }, - }, - { - "type": "hdfs", - "directory_path": "/some/xml/path", - "file_format": { - "type": "xml", - "root_tag": "data", - "row_tag": "record", - "compression": "bzip2", + ), + ( + { + "type": "hdfs", + "directory_path": "/some/excel/path", + "file_format": { + "type": "excel", + "include_header": True, + "start_cell": "A1", + }, }, - }, - { - "type": "hdfs", - "directory_path": "/some/orc/path", - "file_format": { - "type": "orc", + { + "file_name_template": "{index}.{extension}", }, - }, - { - "type": "hdfs", - "directory_path": "/some/parquet/path", - "file_format": { - "type": "parquet", - "compression": "snappy", + ), + ( + { + "type": "hdfs", + "directory_path": "/some/xml/path", + "file_format": { + "type": "xml", + "root_tag": "data", + "row_tag": "record", + "compression": "bzip2", + }, }, - }, + { + "file_name_template": "{run_created_at}-{index}.{extension}", + }, + ), + ( + { + "type": "hdfs", + "directory_path": "/some/orc/path", + "file_format": { + "type": "orc", + }, + }, + {}, + ), + ( + { + "type": "hdfs", + "directory_path": "/some/parquet/path", + "file_format": { + "type": "parquet", + "compression": "snappy", + }, + }, + {}, + ), ], ) async def test_developer_plus_can_create_hdfs_transfer( create_connection_data: dict, two_group_connections: tuple[MockConnection, MockConnection], target_source_params: dict, + target_params: dict, role_developer_plus: UserTestRoles, group_queue: Queue, mock_group: MockGroup, @@ -275,7 +320,7 @@ async def test_developer_plus_can_create_hdfs_transfer( "source_connection_id": first_connection.id, "target_connection_id": second_connection.id, "source_params": target_source_params, - "target_params": target_source_params, + "target_params": {**target_source_params, **target_params}, "strategy_params": {"type": "full"}, "queue_id": group_queue.id, }, @@ -457,3 +502,107 @@ async def test_cannot_create_file_transfer_with_relative_path( ], }, } + + +@pytest.mark.parametrize( + "create_connection_data", + [ + { + "type": "s3", + "host": "localhost", + "port": 443, + }, + ], + indirect=True, +) +@pytest.mark.parametrize( + "target_source_params, target_params, expected_errors", + [ + pytest.param( + { + "type": "s3", + "directory_path": "/some/path", + "file_format": { + "type": "excel", + "include_header": True, + }, + }, + { + "file_name_template": "{run_created_at}", + }, + [ + { + "context": {}, + "input": "{run_created_at}", + "location": ["body", "target_params", "s3", "file_name_template"], + "message": "Value error, Missing required placeholders: extension, index", + "code": "value_error", + }, + ], + id="missing_required_placeholders", + ), + pytest.param( + { + "type": "s3", + "directory_path": "/some/path", + "file_format": { + "type": "xml", + "root_tag": "data", + "row_tag": "record", + }, + }, + { + "file_name_template": "{run_created_at}_{index}.{extension}{some_unknown_tag}", + }, + [ + { + "context": {}, + "input": "{run_created_at}_{index}.{extension}{some_unknown_tag}", + "location": ["body", "target_params", "s3", "file_name_template"], + "message": "Value error, Invalid placeholder: 'some_unknown_tag'", + "code": "value_error", + }, + ], + id="unknown_tag", + ), + ], +) +async def test_file_name_template_validation( + client: AsyncClient, + two_group_connections: tuple[MockConnection, MockConnection], + group_queue: Queue, + mock_group: MockGroup, + target_source_params: dict, + target_params: dict, + expected_errors: list, + create_connection_data: dict, +): + first_connection, second_connection = two_group_connections + user = mock_group.get_member_of_role(UserTestRoles.Developer) + + result = await client.post( + "v1/transfers", + headers={"Authorization": f"Bearer {user.token}"}, + json={ + "group_id": mock_group.group.id, + "name": "new test transfer", + "description": "", + "is_scheduled": False, + "schedule": "", + "source_connection_id": first_connection.id, + "target_connection_id": second_connection.id, + "source_params": target_source_params, + "target_params": {**target_source_params, **target_params}, + "strategy_params": {"type": "full"}, + "queue_id": group_queue.id, + }, + ) + + assert result.status_code == 422 + assert result.json() == { + "error": { + "code": "invalid_request", + "message": "Invalid request", + "details": expected_errors, + }, + } diff --git a/tests/test_unit/test_transfers/test_file_transfers/test_read_transfer.py b/tests/test_unit/test_transfers/test_file_transfers/test_read_transfer.py index 53d2f544..f30527cc 100644 --- a/tests/test_unit/test_transfers/test_file_transfers/test_read_transfer.py +++ b/tests/test_unit/test_transfers/test_file_transfers/test_read_transfer.py @@ -25,6 +25,9 @@ }, "options": {}, }, + "target_params": { + "file_name_template": "{run_created_at}_{index}.{extension}", + }, "transformations": [ { "type": "dataframe_rows_filter", @@ -54,6 +57,9 @@ }, "options": {}, }, + "target_params": { + "file_name_template": "{index}.{extension}", + }, }, { "source_and_target_params": { @@ -67,6 +73,9 @@ }, "options": {}, }, + "target_params": { + "file_name_template": "{run_created_at}-{index}.{extension}", + }, }, { "source_and_target_params": { @@ -78,6 +87,9 @@ }, "options": {}, }, + "target_params": { + "file_name_template": "{run_created_at}_{index}.{extension}", + }, }, { "source_and_target_params": { @@ -89,6 +101,9 @@ }, "options": {}, }, + "target_params": { + "file_name_template": "{run_created_at}_{index}.{extension}", + }, }, ], ) diff --git a/tests/test_unit/test_transfers/test_file_transfers/test_update_transfer.py b/tests/test_unit/test_transfers/test_file_transfers/test_update_transfer.py index da8cd82c..285d76d7 100644 --- a/tests/test_unit/test_transfers/test_file_transfers/test_update_transfer.py +++ b/tests/test_unit/test_transfers/test_file_transfers/test_update_transfer.py @@ -25,6 +25,9 @@ }, "options": {}, }, + "target_params": { + "file_name_template": "{run_created_at}_{index}.{extension}", + }, }, { "source_and_target_params": { @@ -37,6 +40,9 @@ }, "options": {}, }, + "target_params": { + "file_name_template": "{index}.{extension}", + }, }, { "source_and_target_params": { @@ -50,6 +56,9 @@ }, "options": {}, }, + "target_params": { + "file_name_template": "{run_created_at}-{index}.{extension}", + }, }, { "source_and_target_params": { @@ -146,6 +155,13 @@ async def test_developer_plus_can_update_s3_transfer( "file_format": create_transfer_data["source_and_target_params"]["file_format"], "options": {"some": "option"}, }, + "target_params": { + "type": "s3", + "directory_path": "/some/new/test/directory", + "file_format": create_transfer_data["source_and_target_params"]["file_format"], + "file_name_template": "{index}.{extension}", + "options": {"some": "option"}, + }, "transformations": transformations, }, ) @@ -159,6 +175,10 @@ async def test_developer_plus_can_update_s3_transfer( "options": {"some": "option"}, }, ) + target_params = { + **source_params, + "file_name_template": "{index}.{extension}", + } # Assert assert result.status_code == 200 @@ -172,7 +192,7 @@ async def test_developer_plus_can_update_s3_transfer( "source_connection_id": group_transfer.source_connection_id, "target_connection_id": group_transfer.target_connection_id, "source_params": source_params, - "target_params": group_transfer.target_params, + "target_params": target_params, "strategy_params": group_transfer.strategy_params, "transformations": transformations, "queue_id": group_transfer.transfer.queue_id, diff --git a/tests/test_unit/test_transfers/transfer_fixtures/transfer_fixture.py b/tests/test_unit/test_transfers/transfer_fixtures/transfer_fixture.py index 4ac28bb0..037d80d4 100644 --- a/tests/test_unit/test_transfers/transfer_fixtures/transfer_fixture.py +++ b/tests/test_unit/test_transfers/transfer_fixtures/transfer_fixture.py @@ -93,6 +93,7 @@ async def group_transfer( target_connection = await create_connection( session=session, name="group_transfer_target_connection", + type=connection_type or "postgres", group_id=group.id, data=create_connection_data, ) @@ -102,6 +103,8 @@ async def group_transfer( connection_id=target_connection.id, ) + source_and_target_params = create_transfer_data.get("source_and_target_params", {}) if create_transfer_data else {} + target_params = create_transfer_data.get("target_params", {}) if create_transfer_data else {} transfer = await create_transfer( session=session, name="group_transfer", @@ -109,8 +112,8 @@ async def group_transfer( source_connection_id=source_connection.id, target_connection_id=target_connection.id, queue_id=queue.id, - source_params=create_transfer_data.get("source_and_target_params") if create_transfer_data else None, - target_params=create_transfer_data.get("source_and_target_params") if create_transfer_data else None, + source_params=source_and_target_params, + target_params={**source_and_target_params, **target_params}, transformations=create_transfer_data.get("transformations") if create_transfer_data else None, ) diff --git a/tests/test_unit/test_transfers/transfer_fixtures/transfers_fixture.py b/tests/test_unit/test_transfers/transfer_fixtures/transfers_fixture.py index 1033836d..05fd0456 100644 --- a/tests/test_unit/test_transfers/transfer_fixtures/transfers_fixture.py +++ b/tests/test_unit/test_transfers/transfer_fixtures/transfers_fixture.py @@ -71,12 +71,14 @@ async def group_transfers( source_params["directory_path"] = "/path/to/source" target_params.update(common_params) target_params["directory_path"] = "/path/to/target" + target_params["file_name_template"] = "{run_created_at}_{index}.{extension}" elif connection_type == ConnectionType.HDFS: common_params = {"options": {}} source_params.update(common_params) source_params["directory_path"] = "/path/to/source" target_params.update(common_params) target_params["directory_path"] = "/path/to/target" + target_params["file_name_template"] = "{run_created_at}_{index}.{extension}" elif connection_type in [ ConnectionType.HIVE, ConnectionType.POSTGRES,