Skip to content

Commit

Permalink
Review
Browse files Browse the repository at this point in the history
  • Loading branch information
Ilyas Gasanov committed Jan 28, 2025
1 parent 8435484 commit 14414b6
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 20 deletions.
2 changes: 1 addition & 1 deletion .env.docker
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ TEST_SFTP_PORT_FOR_CONFTEST=2222
TEST_SFTP_HOST_FOR_WORKER=test-sftp
TEST_SFTP_PORT_FOR_WORKER=2222
TEST_SFTP_USER=syncmaster
TEST_SFTP_PASSWORD=AesujeifohgoaCu0Boosiet5aimeitho
TEST_SFTP_PASSWORD=test_only

SPARK_CONF_DIR=/app/tests/spark/hive/conf/
HADOOP_CONF_DIR=/app/tests/spark/hadoop/
Expand Down
2 changes: 1 addition & 1 deletion .env.local
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ export TEST_SFTP_PORT_FOR_CONFTEST=2222
export TEST_SFTP_HOST_FOR_WORKER=test-sftp
export TEST_SFTP_PORT_FOR_WORKER=2222
export TEST_SFTP_USER=syncmaster
export TEST_SFTP_PASSWORD=AesujeifohgoaCu0Boosiet5aimeitho
export TEST_SFTP_PASSWORD=test_only

export SPARK_CONF_DIR=./tests/spark/hive/conf/
export HADOOP_CONF_DIR=./tests/spark/hadoop/
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ services:
USER_NAME: syncmaster
PASSWORD_ACCESS: true
SUDO_ACCESS: true
USER_PASSWORD: AesujeifohgoaCu0Boosiet5aimeitho
USER_PASSWORD: test_only
profiles: [sftp, all]

volumes:
Expand Down
2 changes: 2 additions & 0 deletions syncmaster/dto/transfers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# SPDX-License-Identifier: Apache-2.0
import json
from dataclasses import dataclass
from pathlib import Path
from typing import ClassVar

from onetl.file.format import CSV, JSON, ORC, XML, Excel, JSONLine, Parquet
Expand Down Expand Up @@ -98,3 +99,4 @@ class HDFSTransferDTO(FileTransferDTO):
@dataclass
class SFTPTransferDTO(FileTransferDTO):
type: ClassVar[str] = "sftp"
temp_worker_directory_path: Path | None = None
18 changes: 16 additions & 2 deletions syncmaster/worker/controller.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
import logging
import tempfile
from typing import Any

from syncmaster.db.models import Connection, Run
Expand Down Expand Up @@ -128,8 +129,21 @@ def perform_transfer(self, settings: WorkerAppSettings) -> None:
self.source_handler.connect(spark)
self.target_handler.connect(spark)

df = self.source_handler.read()
self.target_handler.write(df)
source_needs_temp_dir = self.source_handler.transfer_dto.type == "sftp"
target_needs_temp_dir = self.target_handler.transfer_dto.type == "sftp"

if source_needs_temp_dir or target_needs_temp_dir:
with tempfile.TemporaryDirectory(prefix="syncmaster_") as temp_dir:
if source_needs_temp_dir:
self.source_handler.transfer_dto.temp_worker_directory_path = temp_dir
if target_needs_temp_dir:
self.target_handler.transfer_dto.temp_worker_directory_path = temp_dir

df = self.source_handler.read()
self.target_handler.write(df)
else:
df = self.source_handler.read()
self.target_handler.write(df)

def get_handler(
self,
Expand Down
14 changes: 5 additions & 9 deletions syncmaster/worker/handlers/file/sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def connect(self, spark: SparkSession) -> None:
port=self.connection_dto.port,
user=self.connection_dto.user,
password=self.connection_dto.password,
compress=False,
compress=False, # to avoid errors from combining file and SCP-level compression
).check()
self.local_connection = SparkLocalFS(
spark=spark,
Expand All @@ -36,16 +36,14 @@ def read(self) -> DataFrame:
downloader = FileDownloader(
connection=self.connection,
source_path=self.transfer_dto.directory_path,
temp_path="/tmp/syncmaster",
local_path="/tmp/syncmaster/sftp",
options={"if_exists": "replace_entire_directory"},
local_path=self.transfer_dto.temp_worker_directory_path,
)
downloader.run()

reader = FileDFReader(
connection=self.local_connection,
format=self.transfer_dto.file_format,
source_path="/tmp/syncmaster/sftp",
source_path=self.transfer_dto.temp_worker_directory_path,
df_schema=StructType.fromJson(self.transfer_dto.df_schema) if self.transfer_dto.df_schema else None,
options=self.transfer_dto.options,
)
Expand All @@ -65,16 +63,14 @@ def write(self, df: DataFrame) -> None:
writer = FileDFWriter(
connection=self.local_connection,
format=self.transfer_dto.file_format,
target_path="/tmp/syncmaster/sftp",
target_path=self.transfer_dto.temp_worker_directory_path,
options=self.transfer_dto.options,
)
writer.run(df=df)

uploader = FileUploader(
connection=self.connection,
local_path="/tmp/syncmaster/sftp",
temp_path="/config/target", # SFTP host
local_path=self.transfer_dto.temp_worker_directory_path,
target_path=self.transfer_dto.directory_path,
options={"if_exists": "replace_entire_directory"},
)
uploader.run()
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def sftp_file_connection(sftp_for_conftest):
port=sftp_for_conftest.port,
user=sftp_for_conftest.user,
password=sftp_for_conftest.password,
compress=False,
compress=False, # to avoid errors from combining file and SCP-level compression
)


Expand Down
9 changes: 4 additions & 5 deletions tests/test_integration/test_run_transfer/test_sftp.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import secrets
from pathlib import Path

import pytest
import pytest_asyncio
Expand Down Expand Up @@ -246,6 +247,7 @@ async def test_run_transfer_postgres_to_sftp(
postgres_to_sftp: Transfer,
target_file_format,
file_format_flavor: str,
tmp_path: Path,
):
format_name, format = target_file_format

Expand Down Expand Up @@ -279,18 +281,15 @@ async def test_run_transfer_postgres_to_sftp(
downloader = FileDownloader(
connection=sftp_file_connection,
source_path=f"/config/target/{format_name}/{file_format_flavor}",
temp_path="/tmp/syncmaster",
local_path="/tmp/syncmaster/sftp",
options={"if_exists": "replace_entire_directory"},
local_path=tmp_path,
)
downloader.run()

reader = FileDFReader(
connection=sftp_file_df_connection,
format=format,
source_path="/tmp/syncmaster/sftp",
source_path=tmp_path,
df_schema=init_df.schema,
options={},
)
df = reader.run()

Expand Down

0 comments on commit 14414b6

Please sign in to comment.