diff --git a/.env.docker b/.env.docker
index bab5bb6f..94b48e23 100644
--- a/.env.docker
+++ b/.env.docker
@@ -113,7 +113,7 @@ TEST_SFTP_PORT_FOR_CONFTEST=2222
 TEST_SFTP_HOST_FOR_WORKER=test-sftp
 TEST_SFTP_PORT_FOR_WORKER=2222
 TEST_SFTP_USER=syncmaster
-TEST_SFTP_PASSWORD=AesujeifohgoaCu0Boosiet5aimeitho
+TEST_SFTP_PASSWORD=test_only
 
 SPARK_CONF_DIR=/app/tests/spark/hive/conf/
 HADOOP_CONF_DIR=/app/tests/spark/hadoop/
diff --git a/.env.local b/.env.local
index 6587925a..87958d3e 100644
--- a/.env.local
+++ b/.env.local
@@ -100,7 +100,7 @@ export TEST_SFTP_PORT_FOR_CONFTEST=2222
 export TEST_SFTP_HOST_FOR_WORKER=test-sftp
 export TEST_SFTP_PORT_FOR_WORKER=2222
 export TEST_SFTP_USER=syncmaster
-export TEST_SFTP_PASSWORD=AesujeifohgoaCu0Boosiet5aimeitho
+export TEST_SFTP_PASSWORD=test_only
 
 export SPARK_CONF_DIR=./tests/spark/hive/conf/
 export HADOOP_CONF_DIR=./tests/spark/hadoop/
diff --git a/docker-compose.test.yml b/docker-compose.test.yml
index 1bbe5c1a..28657df9 100644
--- a/docker-compose.test.yml
+++ b/docker-compose.test.yml
@@ -277,7 +277,7 @@ services:
       USER_NAME: syncmaster
       PASSWORD_ACCESS: true
       SUDO_ACCESS: true
-      USER_PASSWORD: AesujeifohgoaCu0Boosiet5aimeitho
+      USER_PASSWORD: test_only
     profiles: [sftp, all]
 
     volumes:
diff --git a/syncmaster/worker/handlers/file/sftp.py b/syncmaster/worker/handlers/file/sftp.py
index a952f03b..7ad1a59b 100644
--- a/syncmaster/worker/handlers/file/sftp.py
+++ b/syncmaster/worker/handlers/file/sftp.py
@@ -3,6 +3,7 @@
 
 from __future__ import annotations
 
+import tempfile
 from typing import TYPE_CHECKING
 
 from onetl.connection import SFTP, SparkLocalFS
@@ -24,7 +25,7 @@ def connect(self, spark: SparkSession) -> None:
             port=self.connection_dto.port,
             user=self.connection_dto.user,
             password=self.connection_dto.password,
-            compress=False,
+            compress=False,  # to avoid errors from combining file and SSH-level compression
         ).check()
         self.local_connection = SparkLocalFS(
             spark=spark,
@@ -33,23 +34,23 @@ def read(self) -> DataFrame:
         from pyspark.sql.types import StructType
 
-        downloader = FileDownloader(
-            connection=self.connection,
-            source_path=self.transfer_dto.directory_path,
-            temp_path="/tmp/syncmaster",
-            local_path="/tmp/syncmaster/sftp",
-            options={"if_exists": "replace_entire_directory"},
-        )
-        downloader.run()
-
-        reader = FileDFReader(
-            connection=self.local_connection,
-            format=self.transfer_dto.file_format,
-            source_path="/tmp/syncmaster/sftp",
-            df_schema=StructType.fromJson(self.transfer_dto.df_schema) if self.transfer_dto.df_schema else None,
-            options=self.transfer_dto.options,
-        )
-        df = reader.run()
+        with tempfile.TemporaryDirectory(prefix="syncmaster_sftp_") as temp_dir:
+            downloader = FileDownloader(
+                connection=self.connection,
+                source_path=self.transfer_dto.directory_path,
+                local_path=temp_dir,
+            )
+            downloader.run()
+
+            reader = FileDFReader(
+                connection=self.local_connection,
+                format=self.transfer_dto.file_format,
+                source_path=temp_dir,
+                df_schema=StructType.fromJson(self.transfer_dto.df_schema) if self.transfer_dto.df_schema else None,
+                options=self.transfer_dto.options,
+            )
+            df = reader.run()
+            df.cache().count()  # materialize the data while temp_dir still exists
 
         rows_filter_expression = self._get_rows_filter_expression()
         if rows_filter_expression:
@@ -62,19 +63,18 @@ def read(self) -> DataFrame:
         return df
 
     def write(self, df: DataFrame) -> None:
-        writer = FileDFWriter(
-            connection=self.local_connection,
-            format=self.transfer_dto.file_format,
-            target_path="/tmp/syncmaster/sftp",
-            options=self.transfer_dto.options,
-        )
-        writer.run(df=df)
-
-        uploader = FileUploader(
-            connection=self.connection,
-            local_path="/tmp/syncmaster/sftp",
-            temp_path="/config/target",  # SFTP host
-            target_path=self.transfer_dto.directory_path,
-            options={"if_exists": "replace_entire_directory"},
-        )
-        uploader.run()
+        with tempfile.TemporaryDirectory(prefix="syncmaster_sftp_") as temp_dir:
+            writer = FileDFWriter(
+                connection=self.local_connection,
+                format=self.transfer_dto.file_format,
+                target_path=temp_dir,
+                options=self.transfer_dto.options,
+            )
+            writer.run(df=df)
+
+            uploader = FileUploader(
+                connection=self.connection,
+                local_path=temp_dir,
+                target_path=self.transfer_dto.directory_path,
+            )
+            uploader.run()
diff --git a/tests/test_integration/test_run_transfer/connection_fixtures/sftp_fixtures.py b/tests/test_integration/test_run_transfer/connection_fixtures/sftp_fixtures.py
index d2fb8b4b..751e14d5 100644
--- a/tests/test_integration/test_run_transfer/connection_fixtures/sftp_fixtures.py
+++ b/tests/test_integration/test_run_transfer/connection_fixtures/sftp_fixtures.py
@@ -84,7 +84,7 @@ def sftp_file_connection(sftp_for_conftest):
         port=sftp_for_conftest.port,
         user=sftp_for_conftest.user,
         password=sftp_for_conftest.password,
-        compress=False,
+        compress=False,  # to avoid errors from combining file and SSH-level compression
     )
diff --git a/tests/test_integration/test_run_transfer/test_sftp.py b/tests/test_integration/test_run_transfer/test_sftp.py
index 8d4bb649..536fb007 100644
--- a/tests/test_integration/test_run_transfer/test_sftp.py
+++ b/tests/test_integration/test_run_transfer/test_sftp.py
@@ -1,5 +1,6 @@
 import os
 import secrets
+from pathlib import Path
 
 import pytest
 import pytest_asyncio
@@ -246,6 +247,7 @@ async def test_run_transfer_postgres_to_sftp(
     postgres_to_sftp: Transfer,
     target_file_format,
     file_format_flavor: str,
+    tmp_path: Path,
 ):
 
     format_name, format = target_file_format
@@ -279,18 +281,15 @@
     downloader = FileDownloader(
         connection=sftp_file_connection,
         source_path=f"/config/target/{format_name}/{file_format_flavor}",
-        temp_path="/tmp/syncmaster",
-        local_path="/tmp/syncmaster/sftp",
-        options={"if_exists": "replace_entire_directory"},
+        local_path=tmp_path,
     )
     downloader.run()
 
     reader = FileDFReader(
         connection=sftp_file_df_connection,
         format=format,
-        source_path="/tmp/syncmaster/sftp",
+        source_path=tmp_path,
         df_schema=init_df.schema,
-        options={},
     )
     df = reader.run()
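
Note on the read() rewrite above: FileDFReader returns a lazy DataFrame, so the handler calls df.cache().count() inside the `with` block to materialize the data before tempfile.TemporaryDirectory deletes the downloaded files on exit. A minimal sketch of that lifecycle in plain Python (no Spark or onetl involved; the file name is illustrative):

    import tempfile
    from pathlib import Path

    with tempfile.TemporaryDirectory(prefix="syncmaster_sftp_") as temp_dir:
        sample = Path(temp_dir) / "part-0000.csv"  # hypothetical file name
        sample.write_text("id,name\n1,alice\n")
        rows = sample.read_text().splitlines()  # read eagerly while the dir still exists

    print(rows)                     # ['id,name', '1,alice'] -- data outlives the temp dir
    print(Path(temp_dir).exists())  # False -- cleanup happened when the block exited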