Commit 8386181

Ilyas Gasanov committed Jan 28, 2025
1 parent 8435484 commit 8386181
Showing 8 changed files with 27 additions and 18 deletions.

2 changes: 1 addition & 1 deletion .env.docker

@@ -113,7 +113,7 @@ TEST_SFTP_PORT_FOR_CONFTEST=2222
 TEST_SFTP_HOST_FOR_WORKER=test-sftp
 TEST_SFTP_PORT_FOR_WORKER=2222
 TEST_SFTP_USER=syncmaster
-TEST_SFTP_PASSWORD=AesujeifohgoaCu0Boosiet5aimeitho
+TEST_SFTP_PASSWORD=test_only

 SPARK_CONF_DIR=/app/tests/spark/hive/conf/
 HADOOP_CONF_DIR=/app/tests/spark/hadoop/

2 changes: 1 addition & 1 deletion .env.local

@@ -100,7 +100,7 @@ export TEST_SFTP_PORT_FOR_CONFTEST=2222
 export TEST_SFTP_HOST_FOR_WORKER=test-sftp
 export TEST_SFTP_PORT_FOR_WORKER=2222
 export TEST_SFTP_USER=syncmaster
-export TEST_SFTP_PASSWORD=AesujeifohgoaCu0Boosiet5aimeitho
+export TEST_SFTP_PASSWORD=test_only

 export SPARK_CONF_DIR=./tests/spark/hive/conf/
 export HADOOP_CONF_DIR=./tests/spark/hadoop/

2 changes: 1 addition & 1 deletion docker-compose.test.yml

@@ -277,7 +277,7 @@ services:
       USER_NAME: syncmaster
       PASSWORD_ACCESS: true
       SUDO_ACCESS: true
-      USER_PASSWORD: AesujeifohgoaCu0Boosiet5aimeitho
+      USER_PASSWORD: test_only
     profiles: [sftp, all]

 volumes:

2 changes: 2 additions & 0 deletions syncmaster/dto/transfers.py

@@ -2,6 +2,7 @@
 # SPDX-License-Identifier: Apache-2.0
 import json
 from dataclasses import dataclass
+from pathlib import Path
 from typing import ClassVar

 from onetl.file.format import CSV, JSON, ORC, XML, Excel, JSONLine, Parquet
@@ -21,6 +22,7 @@ class DBTransferDTO(TransferDTO):
 @dataclass
 class FileTransferDTO(TransferDTO):
     directory_path: str
+    temp_worker_directory_path: Path
     file_format: CSV | JSONLine | JSON | Excel | XML | ORC | Parquet
     options: dict
     df_schema: dict | None = None
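
A note on the new temp_worker_directory_path: Path field: Python dataclasses do not coerce values to the annotated type, so a plain string (such as the one tempfile.TemporaryDirectory yields in controller.py below) is stored as-is unless the class converts it. A minimal standalone sketch of that pattern; ExampleDTO and the paths are made up for illustration and are not SyncMaster's classes:

    from dataclasses import dataclass
    from pathlib import Path


    @dataclass
    class ExampleDTO:
        directory_path: str
        temp_worker_directory_path: Path

        def __post_init__(self):
            # Dataclass annotations are not enforced at runtime, so normalize
            # whatever was passed (often a plain str) into a real Path.
            self.temp_worker_directory_path = Path(self.temp_worker_directory_path)


    dto = ExampleDTO(directory_path="/remote/source", temp_worker_directory_path="/tmp/example")
    assert isinstance(dto.temp_worker_directory_path, Path)
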

12 changes: 12 additions & 0 deletions syncmaster/worker/controller.py

@@ -1,6 +1,7 @@
 # SPDX-FileCopyrightText: 2023-2024 MTS PJSC
 # SPDX-License-Identifier: Apache-2.0
 import logging
+import tempfile
 from typing import Any

 from syncmaster.db.models import Connection, Run
@@ -147,6 +148,17 @@ def get_handler(

     handler, connection_dto, transfer_dto = connection_handler_proxy[handler_type]

+    if handler_type in ("s3", "hdfs", "sftp"):
+        with tempfile.TemporaryDirectory(prefix=f"syncmaster_{handler_type}_") as temp_dir:
+            return handler(
+                connection_dto=connection_dto(**connection_data),
+                transfer_dto=transfer_dto(
+                    **transfer_params,
+                    transformations=transformations,
+                    temp_worker_directory_path=temp_dir,
+                ),
+            )
+
     return handler(
         connection_dto=connection_dto(**connection_data),
         transfer_dto=transfer_dto(**transfer_params, transformations=transformations),
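
For context on tempfile.TemporaryDirectory used above: it creates a uniquely named directory and removes it, together with its contents, as soon as the with block is exited. A minimal standalone sketch of that lifetime (the prefix mirrors the one above; the file name is made up):

    import os
    import tempfile
    from pathlib import Path

    with tempfile.TemporaryDirectory(prefix="syncmaster_sftp_") as temp_dir:
        # Inside the block the directory exists and can serve as a staging area.
        staging = Path(temp_dir)
        (staging / "part-0000.csv").write_text("id,value\n1,a\n")
        print(sorted(p.name for p in staging.iterdir()))  # ['part-0000.csv']

    # Once the block exits, the directory and everything in it are removed.
    print(os.path.exists(temp_dir))  # False
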

14 changes: 5 additions & 9 deletions syncmaster/worker/handlers/file/sftp.py

@@ -24,7 +24,7 @@ def connect(self, spark: SparkSession) -> None:
             port=self.connection_dto.port,
             user=self.connection_dto.user,
             password=self.connection_dto.password,
-            compress=False,
+            compress=False,  # to avoid errors from combining file and SCP-level compression
         ).check()
         self.local_connection = SparkLocalFS(
             spark=spark,
@@ -36,16 +36,14 @@ def read(self) -> DataFrame:
         downloader = FileDownloader(
             connection=self.connection,
             source_path=self.transfer_dto.directory_path,
-            temp_path="/tmp/syncmaster",
-            local_path="/tmp/syncmaster/sftp",
-            options={"if_exists": "replace_entire_directory"},
+            local_path=self.transfer_dto.temp_worker_directory_path,
         )
         downloader.run()

         reader = FileDFReader(
             connection=self.local_connection,
             format=self.transfer_dto.file_format,
-            source_path="/tmp/syncmaster/sftp",
+            source_path=self.transfer_dto.temp_worker_directory_path,
             df_schema=StructType.fromJson(self.transfer_dto.df_schema) if self.transfer_dto.df_schema else None,
             options=self.transfer_dto.options,
         )
@@ -65,16 +63,14 @@ def write(self, df: DataFrame) -> None:
         writer = FileDFWriter(
             connection=self.local_connection,
             format=self.transfer_dto.file_format,
-            target_path="/tmp/syncmaster/sftp",
+            target_path=self.transfer_dto.temp_worker_directory_path,
             options=self.transfer_dto.options,
         )
         writer.run(df=df)

         uploader = FileUploader(
             connection=self.connection,
-            local_path="/tmp/syncmaster/sftp",
-            temp_path="/config/target",  # SFTP host
+            local_path=self.transfer_dto.temp_worker_directory_path,
             target_path=self.transfer_dto.directory_path,
             options={"if_exists": "replace_entire_directory"},
         )
         uploader.run()
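
The handler above now stages files in the per-transfer directory injected via temp_worker_directory_path rather than the fixed /tmp/syncmaster/sftp path. One relevant property of tempfile.TemporaryDirectory is that each call yields a distinct directory even when the prefix is the same, so separate transfers get separate staging areas. A tiny standalone illustration, using only the standard library:

    import tempfile

    # Two "transfers" asking for a staging area with the same prefix still get
    # distinct, uniquely named directories, unlike a fixed shared path.
    first = tempfile.TemporaryDirectory(prefix="syncmaster_sftp_")
    second = tempfile.TemporaryDirectory(prefix="syncmaster_sftp_")
    print(first.name != second.name)  # True
    first.cleanup()
    second.cleanup()
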

2 changes: 1 addition & 1 deletion (file name not shown in this view)

@@ -84,7 +84,7 @@ def sftp_file_connection(sftp_for_conftest):
         port=sftp_for_conftest.port,
         user=sftp_for_conftest.user,
         password=sftp_for_conftest.password,
-        compress=False,
+        compress=False,  # to avoid errors from combining file and SCP-level compression
     )

9 changes: 4 additions & 5 deletions tests/test_integration/test_run_transfer/test_sftp.py

@@ -1,5 +1,6 @@
 import os
 import secrets
+from pathlib import Path

 import pytest
 import pytest_asyncio
@@ -246,6 +247,7 @@ async def test_run_transfer_postgres_to_sftp(
     postgres_to_sftp: Transfer,
     target_file_format,
     file_format_flavor: str,
+    tmp_path: Path,
 ):
     format_name, format = target_file_format

@@ -279,18 +281,15 @@
     downloader = FileDownloader(
         connection=sftp_file_connection,
         source_path=f"/config/target/{format_name}/{file_format_flavor}",
-        temp_path="/tmp/syncmaster",
-        local_path="/tmp/syncmaster/sftp",
-        options={"if_exists": "replace_entire_directory"},
+        local_path=tmp_path,
     )
     downloader.run()

     reader = FileDFReader(
         connection=sftp_file_df_connection,
         format=format,
-        source_path="/tmp/syncmaster/sftp",
+        source_path=tmp_path,
         df_schema=init_df.schema,
         options={},
     )
     df = reader.run()
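
The tmp_path parameter added to the test is pytest's built-in fixture: a unique pathlib.Path directory created for each test invocation and cleaned up by pytest afterwards. A minimal standalone example of the fixture (not part of this test suite):

    from pathlib import Path

    def test_writes_into_isolated_directory(tmp_path: Path):
        # tmp_path is a fresh per-test directory provided by pytest.
        target = tmp_path / "data.csv"
        target.write_text("id,value\n1,a\n")
        assert target.read_text().startswith("id,value")
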
