diff --git a/.env.docker b/.env.docker index bab5bb6f..94b48e23 100644 --- a/.env.docker +++ b/.env.docker @@ -113,7 +113,7 @@ TEST_SFTP_PORT_FOR_CONFTEST=2222 TEST_SFTP_HOST_FOR_WORKER=test-sftp TEST_SFTP_PORT_FOR_WORKER=2222 TEST_SFTP_USER=syncmaster -TEST_SFTP_PASSWORD=AesujeifohgoaCu0Boosiet5aimeitho +TEST_SFTP_PASSWORD=test_only SPARK_CONF_DIR=/app/tests/spark/hive/conf/ HADOOP_CONF_DIR=/app/tests/spark/hadoop/ diff --git a/.env.local b/.env.local index 6587925a..87958d3e 100644 --- a/.env.local +++ b/.env.local @@ -100,7 +100,7 @@ export TEST_SFTP_PORT_FOR_CONFTEST=2222 export TEST_SFTP_HOST_FOR_WORKER=test-sftp export TEST_SFTP_PORT_FOR_WORKER=2222 export TEST_SFTP_USER=syncmaster -export TEST_SFTP_PASSWORD=AesujeifohgoaCu0Boosiet5aimeitho +export TEST_SFTP_PASSWORD=test_only export SPARK_CONF_DIR=./tests/spark/hive/conf/ export HADOOP_CONF_DIR=./tests/spark/hadoop/ diff --git a/docker-compose.test.yml b/docker-compose.test.yml index 1bbe5c1a..28657df9 100644 --- a/docker-compose.test.yml +++ b/docker-compose.test.yml @@ -277,7 +277,7 @@ services: USER_NAME: syncmaster PASSWORD_ACCESS: true SUDO_ACCESS: true - USER_PASSWORD: AesujeifohgoaCu0Boosiet5aimeitho + USER_PASSWORD: test_only profiles: [sftp, all] volumes: diff --git a/syncmaster/dto/transfers.py b/syncmaster/dto/transfers.py index 838e268e..5fc7e08b 100644 --- a/syncmaster/dto/transfers.py +++ b/syncmaster/dto/transfers.py @@ -2,6 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 import json from dataclasses import dataclass +from pathlib import Path from typing import ClassVar from onetl.file.format import CSV, JSON, ORC, XML, Excel, JSONLine, Parquet @@ -21,6 +22,7 @@ class DBTransferDTO(TransferDTO): @dataclass class FileTransferDTO(TransferDTO): directory_path: str + temp_worker_directory_path: Path file_format: CSV | JSONLine | JSON | Excel | XML | ORC | Parquet options: dict df_schema: dict | None = None diff --git a/syncmaster/worker/controller.py b/syncmaster/worker/controller.py index 
fbbec2fe..8b7efceb 100644 --- a/syncmaster/worker/controller.py +++ b/syncmaster/worker/controller.py @@ -1,6 +1,7 @@ # SPDX-FileCopyrightText: 2023-2024 MTS PJSC # SPDX-License-Identifier: Apache-2.0 import logging +import tempfile from typing import Any from syncmaster.db.models import Connection, Run @@ -147,6 +148,17 @@ def get_handler( handler, connection_dto, transfer_dto = connection_handler_proxy[handler_type] + if handler_type in ("s3", "hdfs", "sftp"): + temp_dir = tempfile.mkdtemp(prefix=f"syncmaster_{handler_type}_")  # NOTE: must outlive this call; TemporaryDirectory as a context manager would delete the dir on return, before the handler uses it + return handler( + connection_dto=connection_dto(**connection_data), + transfer_dto=transfer_dto( + **transfer_params, + transformations=transformations, + temp_worker_directory_path=temp_dir, + ), + ) + return handler( connection_dto=connection_dto(**connection_data), transfer_dto=transfer_dto(**transfer_params, transformations=transformations), diff --git a/syncmaster/worker/handlers/file/sftp.py b/syncmaster/worker/handlers/file/sftp.py index a952f03b..a54a9f85 100644 --- a/syncmaster/worker/handlers/file/sftp.py +++ b/syncmaster/worker/handlers/file/sftp.py @@ -24,7 +24,7 @@ def connect(self, spark: SparkSession) -> None: port=self.connection_dto.port, user=self.connection_dto.user, password=self.connection_dto.password, - compress=False, + compress=False, # to avoid errors from combining file and SCP-level compression ).check() self.local_connection = SparkLocalFS( spark=spark, @@ -36,16 +36,14 @@ def read(self) -> DataFrame: downloader = FileDownloader( connection=self.connection, source_path=self.transfer_dto.directory_path, - temp_path="/tmp/syncmaster", - local_path="/tmp/syncmaster/sftp", - options={"if_exists": "replace_entire_directory"}, + local_path=self.transfer_dto.temp_worker_directory_path, ) downloader.run() reader = FileDFReader( connection=self.local_connection, format=self.transfer_dto.file_format, - source_path="/tmp/syncmaster/sftp", + source_path=self.transfer_dto.temp_worker_directory_path, 
df_schema=StructType.fromJson(self.transfer_dto.df_schema) if self.transfer_dto.df_schema else None, options=self.transfer_dto.options, ) @@ -65,16 +63,14 @@ def write(self, df: DataFrame) -> None: writer = FileDFWriter( connection=self.local_connection, format=self.transfer_dto.file_format, - target_path="/tmp/syncmaster/sftp", + target_path=self.transfer_dto.temp_worker_directory_path, options=self.transfer_dto.options, ) writer.run(df=df) uploader = FileUploader( connection=self.connection, - local_path="/tmp/syncmaster/sftp", - temp_path="/config/target", # SFTP host + local_path=self.transfer_dto.temp_worker_directory_path, target_path=self.transfer_dto.directory_path, - options={"if_exists": "replace_entire_directory"}, ) uploader.run() diff --git a/tests/test_integration/test_run_transfer/connection_fixtures/sftp_fixtures.py b/tests/test_integration/test_run_transfer/connection_fixtures/sftp_fixtures.py index d2fb8b4b..751e14d5 100644 --- a/tests/test_integration/test_run_transfer/connection_fixtures/sftp_fixtures.py +++ b/tests/test_integration/test_run_transfer/connection_fixtures/sftp_fixtures.py @@ -84,7 +84,7 @@ def sftp_file_connection(sftp_for_conftest): port=sftp_for_conftest.port, user=sftp_for_conftest.user, password=sftp_for_conftest.password, - compress=False, + compress=False, # to avoid errors from combining file and SCP-level compression ) diff --git a/tests/test_integration/test_run_transfer/test_sftp.py b/tests/test_integration/test_run_transfer/test_sftp.py index 8d4bb649..536fb007 100644 --- a/tests/test_integration/test_run_transfer/test_sftp.py +++ b/tests/test_integration/test_run_transfer/test_sftp.py @@ -1,5 +1,6 @@ import os import secrets +from pathlib import Path import pytest import pytest_asyncio @@ -246,6 +247,7 @@ async def test_run_transfer_postgres_to_sftp( postgres_to_sftp: Transfer, target_file_format, file_format_flavor: str, + tmp_path: Path, ): format_name, format = target_file_format @@ -279,18 +281,15 @@ async def 
test_run_transfer_postgres_to_sftp( downloader = FileDownloader( connection=sftp_file_connection, source_path=f"/config/target/{format_name}/{file_format_flavor}", - temp_path="/tmp/syncmaster", - local_path="/tmp/syncmaster/sftp", - options={"if_exists": "replace_entire_directory"}, + local_path=tmp_path, ) downloader.run() reader = FileDFReader( connection=sftp_file_df_connection, format=format, - source_path="/tmp/syncmaster/sftp", + source_path=tmp_path, df_schema=init_df.schema, - options={}, ) df = reader.run()