From d0ee1bbe88cbb7d50e755be2f892087cba9bdd37 Mon Sep 17 00:00:00 2001
From: Ilyas Gasanov
Date: Mon, 3 Feb 2025 13:36:54 +0300
Subject: [PATCH] [DOP-22336] Add logic for handling WebDAV transfers

---
 .env.docker                                 |   7 +
 .env.local                                  |   7 +
 .github/workflows/tests.yml                 |   6 +-
 .github/workflows/webdav-tests.yml          |  79 ++++++
 Makefile                                    |   4 +
 README.rst                                  |   1 +
 docker-compose.test.yml                     |  24 +-
 docker/webdav/on_post_init.sh               |   5 +
 docs/changelog/next_release/194.feature.rst |   1 +
 syncmaster/dto/connections.py               |  10 +
 syncmaster/dto/transfers.py                 |   5 +
 syncmaster/worker/controller.py             |   8 +
 syncmaster/worker/handlers/file/webdav.py   |  31 +++
 syncmaster/worker/spark.py                  |   2 +-
 tests/settings.py                           |   8 +
 .../connection_fixtures/__init__.py         |  10 +
 .../connection_fixtures/spark_fixtures.py   |   2 +-
 .../connection_fixtures/webdav_fixtures.py  | 137 ++++++++++
 .../test_run_transfer/test_webdav.py        | 237 ++++++++++++++++++
 19 files changed, 576 insertions(+), 8 deletions(-)
 create mode 100644 .github/workflows/webdav-tests.yml
 create mode 100644 docker/webdav/on_post_init.sh
 create mode 100644 docs/changelog/next_release/194.feature.rst
 create mode 100644 syncmaster/worker/handlers/file/webdav.py
 create mode 100644 tests/test_integration/test_run_transfer/connection_fixtures/webdav_fixtures.py
 create mode 100644 tests/test_integration/test_run_transfer/test_webdav.py

diff --git a/.env.docker b/.env.docker
index 05495101..d0f2d570 100644
--- a/.env.docker
+++ b/.env.docker
@@ -142,6 +142,13 @@ TEST_SAMBA_USER=syncmaster
 TEST_SAMBA_PASSWORD=test_only
 TEST_SAMBA_AUTH_TYPE=NTLMv2
 
+TEST_WEBDAV_HOST_FOR_CONFTEST=webdav
+TEST_WEBDAV_PORT_FOR_CONFTEST=80
+TEST_WEBDAV_HOST_FOR_WORKER=webdav
+TEST_WEBDAV_PORT_FOR_WORKER=80
+TEST_WEBDAV_USER=syncmaster
+TEST_WEBDAV_PASSWORD=test_only
+
 SPARK_CONF_DIR=/app/tests/spark/hive/conf/
 HADOOP_CONF_DIR=/app/tests/spark/hadoop/
 HIVE_CONF_DIR=/app/tests/spark/hive/conf/
diff --git a/.env.local b/.env.local
index 9d38a5b7..abdf1205 100644
--- a/.env.local
+++ b/.env.local
@@ -129,6 +129,13 @@ export TEST_SAMBA_USER=syncmaster
 export TEST_SAMBA_PASSWORD=test_only
 export TEST_SAMBA_AUTH_TYPE=NTLMv2
 
+export TEST_WEBDAV_HOST_FOR_CONFTEST=localhost
+export TEST_WEBDAV_PORT_FOR_CONFTEST=8010
+export TEST_WEBDAV_HOST_FOR_WORKER=webdav
+export TEST_WEBDAV_PORT_FOR_WORKER=80
+export TEST_WEBDAV_USER=syncmaster
+export TEST_WEBDAV_PASSWORD=test_only
+
 export SPARK_CONF_DIR=./tests/spark/hive/conf/
 export HADOOP_CONF_DIR=./tests/spark/hadoop/
 export HIVE_CONF_DIR=./tests/spark/hive/conf/
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index c74dce4e..723a2341 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -60,6 +60,10 @@ jobs:
     name: Samba tests
     uses: ./.github/workflows/samba-tests.yml
 
+  webdav_tests:
+    name: WebDAV tests
+    uses: ./.github/workflows/webdav-tests.yml
+
   scheduler_tests:
     name: Scheduler tests
     uses: ./.github/workflows/scheduler-tests.yml
@@ -72,7 +76,7 @@ name: Tests done
     runs-on: ubuntu-latest
 
-    needs: [unit_tests, scheduler_tests, oracle_tests, clickhouse_tests, mssql_tests, mysql_tests, hive_tests, hdfs_tests, s3_tests, sftp_tests, ftp_tests, ftps_tests, samba_tests]
+    needs: [unit_tests, scheduler_tests, oracle_tests, clickhouse_tests, mssql_tests, mysql_tests, hive_tests, hdfs_tests, s3_tests, sftp_tests, ftp_tests, ftps_tests, samba_tests, webdav_tests]
 
     steps:
       - name: Checkout code
         uses: actions/checkout@v4
diff --git a/.github/workflows/webdav-tests.yml b/.github/workflows/webdav-tests.yml
new file mode 100644
index 00000000..c83bf52e
--- /dev/null
+++ b/.github/workflows/webdav-tests.yml
@@ -0,0 +1,79 @@
+name: WebDAV tests
+on:
+  workflow_call:
+
+env:
+  DEFAULT_PYTHON: '3.12'
+
+jobs:
+  test:
+    name: Run WebDAV tests
+    runs-on: ubuntu-latest
+
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v4
+
+      - name: Set up QEMU
+        uses: docker/setup-qemu-action@v3
+
+      - name: Set up Docker Buildx
+        uses: docker/setup-buildx-action@v3
+
+      - name: Cache jars
+        uses: actions/cache@v4
+        with:
+          path: ./cached_jars
+          key: ${{ runner.os }}-python-${{ env.DEFAULT_PYTHON }}-test-webdav
+          restore-keys: |
+            ${{ runner.os }}-python-${{ env.DEFAULT_PYTHON }}-test-webdav
+            ${{ runner.os }}-python-
+
+      - name: Build Worker Image
+        uses: docker/build-push-action@v6
+        with:
+          context: .
+          tags: mtsrus/syncmaster-worker:${{ github.sha }}
+          target: test
+          file: docker/Dockerfile.worker
+          load: true
+          cache-from: mtsrus/syncmaster-worker:develop
+
+      - name: Docker compose up
+        run: |
+          docker compose -f docker-compose.test.yml --profile all down -v --remove-orphans
+          docker compose -f docker-compose.test.yml --profile webdav up -d --wait --wait-timeout 200
+        env:
+          WORKER_IMAGE_TAG: ${{ github.sha }}
+
+      - name: Run WebDAV Tests
+        run: |
+          docker compose -f ./docker-compose.test.yml --profile webdav exec -T worker coverage run -m pytest -vvv -s -m "worker and webdav"
+
+      - name: Dump worker logs on failure
+        if: failure()
+        uses: jwalton/gh-docker-logs@v2
+        with:
+          images: mtsrus/syncmaster-worker
+          dest: ./logs
+
+      # This is important, as coverage is exported after receiving SIGTERM
+      - name: Shutdown
+        if: always()
+        run: |
+          docker compose -f docker-compose.test.yml --profile all down -v --remove-orphans
+
+      - name: Upload worker logs
+        uses: actions/upload-artifact@v4
+        if: failure()
+        with:
+          name: worker-logs-webdav
+          path: logs/*
+
+      - name: Upload coverage results
+        uses: actions/upload-artifact@v4
+        with:
+          name: coverage-webdav
+          path: reports/*
+          # https://github.com/actions/upload-artifact/issues/602
+          include-hidden-files: true
diff --git a/Makefile b/Makefile
index 8063bb8b..efa98707 100644
--- a/Makefile
+++ b/Makefile
@@ -125,6 +125,10 @@ test-integration-samba: test-db ##@Test Run integration tests for Samb
 	docker compose -f docker-compose.test.yml --profile samba up -d --wait $(DOCKER_COMPOSE_ARGS)
 	${POETRY} run pytest ./tests/test_integration -m samba $(PYTEST_ARGS)
 
+test-integration-webdav: test-db ##@Test Run integration tests for WebDAV
+	docker compose -f docker-compose.test.yml --profile webdav up -d --wait $(DOCKER_COMPOSE_ARGS)
+	${POETRY} run pytest ./tests/test_integration -m webdav $(PYTEST_ARGS)
+
 test-integration: test-db ##@Test Run all integration tests
 	docker compose -f docker-compose.test.yml --profile all up -d --wait $(DOCKER_COMPOSE_ARGS)
 	${POETRY} run pytest ./tests/test_integration $(PYTEST_ARGS)
diff --git a/README.rst b/README.rst
index ff956b7f..f7b46393 100644
--- a/README.rst
+++ b/README.rst
@@ -44,6 +44,7 @@ List of currently supported connections:
 * FTPS
 * SFTP
 * Samba
+* WebDAV
 
 Current Data.SyncMaster implementation provides following components:
 
diff --git a/docker-compose.test.yml b/docker-compose.test.yml
index 2f2f1138..e9144c1b 100644
--- a/docker-compose.test.yml
+++ b/docker-compose.test.yml
@@ -125,7 +125,7 @@ services:
       condition: service_completed_successfully
     rabbitmq:
       condition: service_healthy
-    profiles: [worker, scheduler, s3, oracle, hdfs, hive, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, all]
+    profiles: [worker, scheduler, s3, oracle, hdfs, hive, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all]
   test-postgres:
     image: postgres
     restart: unless-stopped
@@ -139,7 +139,7 @@ services:
       interval: 30s
       timeout: 5s
       retries: 3
-    profiles: [s3, oracle, clickhouse, mysql, mssql, hdfs, hive, sftp, ftp, ftps, samba, all]
+    profiles: [s3, oracle, clickhouse, mysql, mssql, hdfs, hive, sftp, ftp, ftps, samba, webdav, all]
 
   test-s3:
     image: bitnami/minio:latest
@@ -209,7 +209,6 @@ services:
     platform: linux/amd64
     profiles: [mysql, all]
 
-
   metastore-hive:
     image: postgres
     restart: unless-stopped
@@ -225,7 +224,7 @@ services:
       interval: 30s
      timeout: 5s
       retries: 3
-    profiles: [hive, hdfs, s3, sftp, ftp, ftps, samba, all]
+    profiles: [hive, hdfs, s3, sftp, ftp, ftps, samba, webdav, all]
 
   keycloak:
     image: quay.io/keycloak/keycloak:latest
@@ -264,7 +263,7 @@ services:
       HIVE_METASTORE_DB_USER: test_hive
       HIVE_METASTORE_DB_PASSWORD: test_hive
     # writing spark dataframe to s3, sftp, ftp, ftps xml file fails without running hive metastore server
-    profiles: [hive, hdfs, s3, sftp, ftp, ftps, samba, all]
+    profiles: [hive, hdfs, s3, sftp, ftp, ftps, samba, webdav, all]
 
   test-sftp:
     image: ${SFTP_IMAGE:-linuxserver/openssh-server}
@@ -327,6 +326,21 @@ services:
     entrypoint: [/custom_entrypoint.sh]
     profiles: [samba, all]
 
+  webdav:
+    image: ${WEBDAV_IMAGE:-chonjay21/webdav:latest}
+    restart: unless-stopped
+    environment:
+      - APP_USER_NAME=syncmaster
+      - APP_USER_PASSWD=test_only
+      - APP_UID=1000
+      - APP_GID=1000
+    ports:
+      - 8010:80
+    volumes:
+      # Remove after https://github.com/chonjay21/docker-webdav/pull/3
+      - ./docker/webdav/on_post_init.sh:/sources/webdav/eventscripts/on_post_init.sh
+    profiles: [webdav, all]
+
 volumes:
   postgres_test_data:
   rabbitmq_test_data:
diff --git a/docker/webdav/on_post_init.sh b/docker/webdav/on_post_init.sh
new file mode 100644
index 00000000..f4a22027
--- /dev/null
+++ b/docker/webdav/on_post_init.sh
@@ -0,0 +1,5 @@
+#!/usr/bin/env bash
+set -e
+
+# allow creating files and directories
+chown -R www-data:www-data /var/webdav
diff --git a/docs/changelog/next_release/194.feature.rst b/docs/changelog/next_release/194.feature.rst
new file mode 100644
index 00000000..d0ea2ba7
--- /dev/null
+++ b/docs/changelog/next_release/194.feature.rst
@@ -0,0 +1 @@
+Add logic for handling WebDAV transfers
\ No newline at end of file
diff --git a/syncmaster/dto/connections.py b/syncmaster/dto/connections.py
index 688f02c2..28cd115c 100644
--- a/syncmaster/dto/connections.py
+++ b/syncmaster/dto/connections.py
@@ -132,3 +132,13 @@ class SambaConnectionDTO(ConnectionDTO):
     domain: str = ""
     port: int | None = None
     type: ClassVar[str] = "samba"
+
+
+@dataclass
+class WebDAVConnectionDTO(ConnectionDTO):
+    host: str
+    port: int
+    user: str
+    password: str
+    protocol: Literal["http", "https"] = "https"
+    type: ClassVar[str] = "webdav"
diff --git a/syncmaster/dto/transfers.py b/syncmaster/dto/transfers.py
index b545ec54..b2810fa5 100644
--- a/syncmaster/dto/transfers.py
+++ b/syncmaster/dto/transfers.py
@@ -115,3 +115,8 @@ class FTPSTransferDTO(FileTransferDTO):
 @dataclass
 class SambaTransferDTO(FileTransferDTO):
     type: ClassVar[str] = "samba"
+
+
+@dataclass
+class WebDAVTransferDTO(FileTransferDTO):
+    type: ClassVar[str] = "webdav"
diff --git a/syncmaster/worker/controller.py b/syncmaster/worker/controller.py
index 8ca202c3..6586176c 100644
--- a/syncmaster/worker/controller.py
+++ b/syncmaster/worker/controller.py
@@ -18,6 +18,7 @@
     S3ConnectionDTO,
     SambaConnectionDTO,
     SFTPConnectionDTO,
+    WebDAVConnectionDTO,
 )
 from syncmaster.dto.transfers import (
     ClickhouseTransferDTO,
@@ -32,6 +33,7 @@
     S3TransferDTO,
     SambaTransferDTO,
     SFTPTransferDTO,
+    WebDAVTransferDTO,
 )
 from syncmaster.exceptions.connection import ConnectionTypeNotRecognizedError
 from syncmaster.worker.handlers.base import Handler
@@ -47,6 +49,7 @@
 from syncmaster.worker.handlers.file.s3 import S3Handler
 from syncmaster.worker.handlers.file.samba import SambaHandler
 from syncmaster.worker.handlers.file.sftp import SFTPHandler
+from syncmaster.worker.handlers.file.webdav import WebDAVHandler
 from syncmaster.worker.settings import WorkerAppSettings
 
 logger = logging.getLogger(__name__)
@@ -113,6 +116,11 @@
         SambaConnectionDTO,
         SambaTransferDTO,
     ),
+    "webdav": (
+        WebDAVHandler,
+        WebDAVConnectionDTO,
+        WebDAVTransferDTO,
+    ),
 }
 
 
diff --git a/syncmaster/worker/handlers/file/webdav.py b/syncmaster/worker/handlers/file/webdav.py
new file mode 100644
index 00000000..7df99d89
--- /dev/null
+++ b/syncmaster/worker/handlers/file/webdav.py
@@ -0,0 +1,31 @@
+# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from onetl.connection import SparkLocalFS, WebDAV
+
+from syncmaster.dto.connections import WebDAVConnectionDTO
+from syncmaster.worker.handlers.file.protocol import FileProtocolHandler
+
+if TYPE_CHECKING:
+    from pyspark.sql import SparkSession
+
+
+class WebDAVHandler(FileProtocolHandler):
+    connection_dto: WebDAVConnectionDTO
+
+    def connect(self, spark: SparkSession) -> None:
+        self.connection = WebDAV(
+            host=self.connection_dto.host,
+            port=self.connection_dto.port,
+            protocol=self.connection_dto.protocol,
+            user=self.connection_dto.user,
+            password=self.connection_dto.password,
+            ssl_verify=False,
+        ).check()
+        self.local_connection = SparkLocalFS(
+            spark=spark,
+        ).check()
diff --git a/syncmaster/worker/spark.py b/syncmaster/worker/spark.py
index 684c3274..4098c14d 100644
--- a/syncmaster/worker/spark.py
+++ b/syncmaster/worker/spark.py
@@ -64,7 +64,7 @@ def get_packages(connection_type: str) -> list[str]:
         spark_version = pyspark.__version__
         return SparkS3.get_packages(spark_version=spark_version) + file_formats_spark_packages
 
-    if connection_type in ("hdfs", "sftp", "ftp", "ftps", "samba"):
+    if connection_type in ("hdfs", "sftp", "ftp", "ftps", "samba", "webdav"):
         return file_formats_spark_packages
 
     # If the database type does not require downloading .jar packages
diff --git a/tests/settings.py b/tests/settings.py
index 0b1a9da7..bd5813a9 100644
--- a/tests/settings.py
+++ b/tests/settings.py
@@ -98,6 +98,14 @@ class TestSettings(BaseSettings):
     TEST_SAMBA_PASSWORD: str
     TEST_SAMBA_AUTH_TYPE: str = "NTLMv2"
 
+    TEST_WEBDAV_HOST_FOR_CONFTEST: str
+    TEST_WEBDAV_PORT_FOR_CONFTEST: int
+    TEST_WEBDAV_HOST_FOR_WORKER: str
+    TEST_WEBDAV_PORT_FOR_WORKER: int
+    TEST_WEBDAV_PROTOCOL: str = "http"
+    TEST_WEBDAV_USER: str
+    TEST_WEBDAV_PASSWORD: str
+
     @model_validator(mode="before")
     def check_sid_and_service_name(cls, values):
         sid = values.get("TEST_ORACLE_SID")
diff --git a/tests/test_integration/test_run_transfer/connection_fixtures/__init__.py b/tests/test_integration/test_run_transfer/connection_fixtures/__init__.py
index e63d41f7..02e53064 100644
--- a/tests/test_integration/test_run_transfer/connection_fixtures/__init__.py
+++ b/tests/test_integration/test_run_transfer/connection_fixtures/__init__.py
@@ -117,3 +117,13 @@
 from tests.test_integration.test_run_transfer.connection_fixtures.spark_fixtures import (
     spark,
 )
+from tests.test_integration.test_run_transfer.connection_fixtures.webdav_fixtures import (
+    prepare_webdav,
+    webdav_connection,
+    webdav_file_connection,
+    webdav_file_connection_with_path,
+    webdav_file_df_connection,
+    webdav_file_df_connection_with_path,
+    webdav_for_conftest,
+    webdav_for_worker,
+)
diff --git a/tests/test_integration/test_run_transfer/connection_fixtures/spark_fixtures.py b/tests/test_integration/test_run_transfer/connection_fixtures/spark_fixtures.py
index 30c5f53c..ae71307c 100644
--- a/tests/test_integration/test_run_transfer/connection_fixtures/spark_fixtures.py
+++ b/tests/test_integration/test_run_transfer/connection_fixtures/spark_fixtures.py
@@ -75,7 +75,7 @@ def spark(settings: Settings, request: FixtureRequest) -> SparkSession:
         )
     )
 
-    if set(markers).intersection({"hdfs", "s3", "sftp", "ftp", "ftps", "samba"}):
+    if set(markers).intersection({"hdfs", "s3", "sftp", "ftp", "ftps", "samba", "webdav"}):
         # excel version is hardcoded due to https://github.com/nightscape/spark-excel/issues/902
         file_formats_spark_packages: list[str] = XML.get_packages(
             spark_version=pyspark.__version__,
diff --git a/tests/test_integration/test_run_transfer/connection_fixtures/webdav_fixtures.py b/tests/test_integration/test_run_transfer/connection_fixtures/webdav_fixtures.py
new file mode 100644
index 00000000..947fa00b
--- /dev/null
+++ b/tests/test_integration/test_run_transfer/connection_fixtures/webdav_fixtures.py
@@ -0,0 +1,137 @@
+import logging
+import secrets
+from pathlib import PurePosixPath
+
+import pytest
+import pytest_asyncio
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from syncmaster.db.models import Group
+from syncmaster.dto.connections import WebDAVConnectionDTO
+from syncmaster.server.settings import ServerAppSettings as Settings
+from tests.settings import TestSettings
+from tests.test_unit.utils import create_connection, create_credentials, upload_files
+
+logger = logging.getLogger(__name__)
+
+
+@pytest_asyncio.fixture
+async def webdav_connection(
+    webdav_for_worker: WebDAVConnectionDTO,
+    settings: Settings,
+    session: AsyncSession,
+    group: Group,
+):
+    result = await create_connection(
+        session=session,
+        name=secrets.token_hex(5),
+        type=webdav_for_worker.type,
+        data=dict(
+            host=webdav_for_worker.host,
+            port=webdav_for_worker.port,
+            protocol=webdav_for_worker.protocol,
+        ),
+        group_id=group.id,
+    )
+
+    await create_credentials(
+        session=session,
+        settings=settings,
+        connection_id=result.id,
+        auth_data=dict(
+            type="basic",
+            user=webdav_for_worker.user,
+            password=webdav_for_worker.password,
+        ),
+    )
+
+    yield result
+    await session.delete(result)
+    await session.commit()
+
+
+@pytest.fixture(
+    scope="session",
+    params=[pytest.param("webdav", marks=[pytest.mark.webdav])],
+)
+def webdav_for_conftest(test_settings: TestSettings) -> WebDAVConnectionDTO:
+    return WebDAVConnectionDTO(
+        host=test_settings.TEST_WEBDAV_HOST_FOR_CONFTEST,
+        port=test_settings.TEST_WEBDAV_PORT_FOR_CONFTEST,
+        protocol=test_settings.TEST_WEBDAV_PROTOCOL,
+        user=test_settings.TEST_WEBDAV_USER,
+        password=test_settings.TEST_WEBDAV_PASSWORD,
+    )
+
+
+@pytest.fixture(
+    scope="session",
+    params=[pytest.param("webdav", marks=[pytest.mark.webdav])],
+)
+def webdav_for_worker(test_settings: TestSettings) -> WebDAVConnectionDTO:
+    return WebDAVConnectionDTO(
+        host=test_settings.TEST_WEBDAV_HOST_FOR_WORKER,
+        port=test_settings.TEST_WEBDAV_PORT_FOR_WORKER,
+        protocol=test_settings.TEST_WEBDAV_PROTOCOL,
+        user=test_settings.TEST_WEBDAV_USER,
+        password=test_settings.TEST_WEBDAV_PASSWORD,
+    )
+
+
+@pytest.fixture(scope="session")
+def webdav_file_connection(webdav_for_conftest):
+    from onetl.connection import WebDAV
+
+    return WebDAV(
+        host=webdav_for_conftest.host,
+        port=webdav_for_conftest.port,
+        protocol=webdav_for_conftest.protocol,
+        user=webdav_for_conftest.user,
+        password=webdav_for_conftest.password,
+        ssl_verify=False,
+    )
+
+
+@pytest.fixture()
+def webdav_file_connection_with_path(request, webdav_file_connection):
+    connection = webdav_file_connection
+    source = PurePosixPath("/data")
+    target = PurePosixPath("/target")
+
+    def finalizer():
+        connection.remove_dir(source, recursive=True)
+        connection.remove_dir(target, recursive=True)
+
+    request.addfinalizer(finalizer)
+
+    connection.remove_dir(source, recursive=True)
+    connection.remove_dir(target, recursive=True)
+    connection.create_dir(source)
+
+    return connection, source
+
+
+@pytest.fixture(scope="session")
+def webdav_file_df_connection(spark):
+    from onetl.connection import SparkLocalFS
+
+    return SparkLocalFS(spark=spark)
+
+
+@pytest.fixture()
+def webdav_file_df_connection_with_path(webdav_file_connection_with_path, webdav_file_df_connection):
+    _, source = webdav_file_connection_with_path
+    return webdav_file_df_connection, source
+
+
+@pytest.fixture()
+def prepare_webdav(
+    webdav_file_df_connection_with_path,
+    webdav_file_connection,
+    resource_path,
+):
+    logger.info("START PREPARE WEBDAV")
+    connection, upload_to = webdav_file_df_connection_with_path
+    files = upload_files(resource_path, upload_to, webdav_file_connection)
+    logger.info("END PREPARE WEBDAV")
+    return connection, upload_to, files
diff --git a/tests/test_integration/test_run_transfer/test_webdav.py b/tests/test_integration/test_run_transfer/test_webdav.py
new file mode 100644
index 00000000..699af636
--- /dev/null
+++ b/tests/test_integration/test_run_transfer/test_webdav.py
@@ -0,0 +1,237 @@
+import os
+import secrets
+from pathlib import Path
+
+import pytest
+import pytest_asyncio
+from httpx import AsyncClient
+from onetl.connection import SparkLocalFS, WebDAV
+from onetl.db import DBReader
+from onetl.file import FileDFReader, FileDownloader
+from pyspark.sql import DataFrame
+from pytest import FixtureRequest
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from syncmaster.db.models import Connection, Group, Queue, Status
+from syncmaster.db.models.transfer import Transfer
+from tests.mocks import MockUser
+from tests.test_unit.utils import create_transfer
+from tests.utils import get_run_on_end
+
+pytestmark = [pytest.mark.asyncio, pytest.mark.worker]
+
+
+@pytest.fixture(params=[""])
+def file_format_flavor(request: FixtureRequest):
+    return request.param
+
+
+@pytest_asyncio.fixture
+async def webdav_to_postgres(
+    session: AsyncSession,
+    group: Group,
+    queue: Queue,
+    init_df: DataFrame,
+    webdav_connection: Connection,
+    postgres_connection: Connection,
+    prepare_webdav,
+    source_file_format,
+    file_format_flavor: str,
+):
+    format_name, file_format = source_file_format
+    format_name_in_path = "xlsx" if format_name == "excel" else format_name
+    _, source_path, _ = prepare_webdav
+
+    result = await create_transfer(
+        session=session,
+        group_id=group.id,
+        name=f"webdav2postgres_{secrets.token_hex(5)}",
+        source_connection_id=webdav_connection.id,
+        target_connection_id=postgres_connection.id,
+        source_params={
+            "type": "webdav",
+            "directory_path": os.fspath(source_path / "file_df_connection" / format_name_in_path / file_format_flavor),
+            "file_format": {
+                "type": format_name,
+                **file_format.dict(),
+            },
+            "df_schema": init_df.schema.json(),
+            "options": {},
+        },
+        target_params={
+            "type": "postgres",
+            "table_name": "public.target_table",
+        },
+        queue_id=queue.id,
+    )
+    yield result
+    await session.delete(result)
+    await session.commit()
+
+
+@pytest_asyncio.fixture(params=[""])
+async def postgres_to_webdav(
+    session: AsyncSession,
+    group: Group,
+    queue: Queue,
+    webdav_connection: Connection,
+    postgres_connection: Connection,
+    target_file_format,
+    file_format_flavor: str,
+):
+    format_name, file_format = target_file_format
+    result = await create_transfer(
+        session=session,
+        group_id=group.id,
+        name=f"postgres2webdav_{secrets.token_hex(5)}",
+        source_connection_id=postgres_connection.id,
+        target_connection_id=webdav_connection.id,
+        source_params={
+            "type": "postgres",
+            "table_name": "public.source_table",
+        },
+        target_params={
+            "type": "webdav",
+            "directory_path": f"/target/{format_name}/{file_format_flavor}",
+            "file_format": {
+                "type": format_name,
+                **file_format.dict(),
+            },
+            "options": {},
+        },
+        queue_id=queue.id,
+    )
+    yield result
+    await session.delete(result)
+    await session.commit()
+
+
+@pytest.mark.parametrize(
+    "source_file_format, file_format_flavor",
+    [
+        pytest.param(
+            ("csv", {}),
+            "with_header",
+            id="csv",
+        ),
+    ],
+    indirect=["source_file_format", "file_format_flavor"],
+)
+async def test_run_transfer_webdav_to_postgres(
+    prepare_postgres,
+    group_owner: MockUser,
+    init_df: DataFrame,
+    client: AsyncClient,
+    webdav_to_postgres: Transfer,
+    source_file_format,
+    file_format_flavor,
+):
+    # Arrange
+    postgres, _ = prepare_postgres
+
+    # Act
+    result = await client.post(
+        "v1/runs",
+        headers={"Authorization": f"Bearer {group_owner.token}"},
+        json={"transfer_id": webdav_to_postgres.id},
+    )
+    # Assert
+    assert result.status_code == 200
+
+    run_data = await get_run_on_end(
+        client=client,
+        run_id=result.json()["id"],
+        token=group_owner.token,
+    )
+    source_auth_data = run_data["transfer_dump"]["source_connection"]["auth_data"]
+    target_auth_data = run_data["transfer_dump"]["target_connection"]["auth_data"]
+
+    assert run_data["status"] == Status.FINISHED.value
+    assert source_auth_data["user"]
+    assert "password" not in source_auth_data
+    assert target_auth_data["user"]
+    assert "password" not in target_auth_data
+
+    reader = DBReader(
+        connection=postgres,
+        table="public.target_table",
+    )
+    df = reader.run()
+
+    for field in init_df.schema:
+        df = df.withColumn(field.name, df[field.name].cast(field.dataType))
+
+    assert df.sort("id").collect() == init_df.sort("id").collect()
+
+
+@pytest.mark.parametrize(
+    "target_file_format, file_format_flavor",
+    [
+        pytest.param(
+            ("csv", {"compression": "lz4"}),
+            "with_compression",
+            id="csv",
+        ),
+    ],
+    indirect=["target_file_format", "file_format_flavor"],
+)
+async def test_run_transfer_postgres_to_webdav(
+    group_owner: MockUser,
+    init_df: DataFrame,
+    client: AsyncClient,
+    prepare_postgres,
+    webdav_file_connection: WebDAV,
+    webdav_file_df_connection: SparkLocalFS,
+    postgres_to_webdav: Transfer,
+    target_file_format,
+    file_format_flavor: str,
+    tmp_path: Path,
+):
+    format_name, format = target_file_format
+
+    # Arrange
+    _, fill_with_data = prepare_postgres
+    fill_with_data(init_df)
+
+    # Act
+    result = await client.post(
+        "v1/runs",
+        headers={"Authorization": f"Bearer {group_owner.token}"},
+        json={"transfer_id": postgres_to_webdav.id},
+    )
+    # Assert
+    assert result.status_code == 200
+
+    run_data = await get_run_on_end(
+        client=client,
+        run_id=result.json()["id"],
+        token=group_owner.token,
+    )
+    source_auth_data = run_data["transfer_dump"]["source_connection"]["auth_data"]
+    target_auth_data = run_data["transfer_dump"]["target_connection"]["auth_data"]
+
+    assert run_data["status"] == Status.FINISHED.value
+    assert source_auth_data["user"]
+    assert "password" not in source_auth_data
+    assert target_auth_data["user"]
+    assert "password" not in target_auth_data
+
+    downloader = FileDownloader(
+        connection=webdav_file_connection,
+        source_path=f"/target/{format_name}/{file_format_flavor}",
+        local_path=tmp_path,
+    )
+    downloader.run()
+
+    reader = FileDFReader(
+        connection=webdav_file_df_connection,
+        format=format,
+        source_path=tmp_path,
+        df_schema=init_df.schema,
+    )
+    df = reader.run()
+
+    for field in init_df.schema:
+        df = df.withColumn(field.name, df[field.name].cast(field.dataType))
+
+    assert df.sort("id").collect() == init_df.sort("id").collect()
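
---

A quick way to sanity-check the wiring above without running the whole test suite is to replicate what WebDAVHandler.connect() does against the compose service this patch adds. The following is a minimal sketch, not part of the patch: it assumes the `webdav` profile from docker-compose.test.yml is up locally with the unchanged .env.local credentials, and it only uses onetl calls that already appear in the patch (check, create_dir, remove_dir).

    # Minimal connectivity sketch, assuming the docker-compose `webdav` service
    # is running on localhost:8010 (plain HTTP) with the .env.local credentials.
    # Mirrors the onetl calls made by WebDAVHandler.connect() and the fixtures.
    from onetl.connection import WebDAV

    webdav = WebDAV(
        host="localhost",      # TEST_WEBDAV_HOST_FOR_CONFTEST
        port=8010,             # TEST_WEBDAV_PORT_FOR_CONFTEST
        protocol="http",       # the test container serves plain HTTP
        user="syncmaster",     # APP_USER_NAME in docker-compose.test.yml
        password="test_only",  # APP_USER_PASSWD in docker-compose.test.yml
        ssl_verify=False,      # same flag the handler passes
    ).check()  # raises if the server is unreachable or credentials are rejected

    # Round-trip a scratch directory with the same calls the fixtures rely on;
    # this fails unless on_post_init.sh has made /var/webdav writable.
    webdav.create_dir("/smoke_test")
    webdav.remove_dir("/smoke_test", recursive=True)

Running `make test-integration-webdav` exercises the same path end to end via the new Makefile target.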
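For readers creating transfers through the API rather than driving the worker directly, the parameter shape the new handler consumes is visible in the test fixtures above. Below is an illustrative sketch of the params for a WebDAV-to-Postgres transfer; the directory path, table name and format options are placeholders patterned on the webdav_to_postgres fixture, not prescribed values.

    # Illustrative source_params/target_params for a "webdav" -> "postgres"
    # transfer, shaped after the webdav_to_postgres fixture. The tests also
    # pass "df_schema" (a Spark schema JSON string) next to these fields.
    source_params = {
        "type": "webdav",
        "directory_path": "/data/file_df_connection/csv/with_header",  # placeholder
        "file_format": {
            "type": "csv",  # other onetl-supported formats follow the same pattern
        },
        "options": {},
    }
    target_params = {
        "type": "postgres",
        "table_name": "public.target_table",  # placeholder table
    }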