Skip to content

Commit

Permalink
Review
Browse files Browse the repository at this point in the history
  • Loading branch information
Ilyas Gasanov committed Jan 28, 2025
1 parent 8435484 commit cda4e5d
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 43 deletions.
2 changes: 1 addition & 1 deletion .env.docker
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ TEST_SFTP_PORT_FOR_CONFTEST=2222
TEST_SFTP_HOST_FOR_WORKER=test-sftp
TEST_SFTP_PORT_FOR_WORKER=2222
TEST_SFTP_USER=syncmaster
TEST_SFTP_PASSWORD=AesujeifohgoaCu0Boosiet5aimeitho
TEST_SFTP_PASSWORD=test_only

SPARK_CONF_DIR=/app/tests/spark/hive/conf/
HADOOP_CONF_DIR=/app/tests/spark/hadoop/
Expand Down
2 changes: 1 addition & 1 deletion .env.local
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ export TEST_SFTP_PORT_FOR_CONFTEST=2222
export TEST_SFTP_HOST_FOR_WORKER=test-sftp
export TEST_SFTP_PORT_FOR_WORKER=2222
export TEST_SFTP_USER=syncmaster
export TEST_SFTP_PASSWORD=AesujeifohgoaCu0Boosiet5aimeitho
export TEST_SFTP_PASSWORD=test_only

export SPARK_CONF_DIR=./tests/spark/hive/conf/
export HADOOP_CONF_DIR=./tests/spark/hadoop/
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ services:
USER_NAME: syncmaster
PASSWORD_ACCESS: true
SUDO_ACCESS: true
USER_PASSWORD: AesujeifohgoaCu0Boosiet5aimeitho
USER_PASSWORD: test_only
profiles: [sftp, all]

volumes:
Expand Down
68 changes: 34 additions & 34 deletions syncmaster/worker/handlers/file/sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from __future__ import annotations

import tempfile
from typing import TYPE_CHECKING

from onetl.connection import SFTP, SparkLocalFS
Expand All @@ -24,7 +25,7 @@ def connect(self, spark: SparkSession) -> None:
port=self.connection_dto.port,
user=self.connection_dto.user,
password=self.connection_dto.password,
compress=False,
compress=False, # to avoid errors from combining file and SCP-level compression
).check()
self.local_connection = SparkLocalFS(
spark=spark,
Expand All @@ -33,23 +34,23 @@ def connect(self, spark: SparkSession) -> None:
def read(self) -> DataFrame:
from pyspark.sql.types import StructType

downloader = FileDownloader(
connection=self.connection,
source_path=self.transfer_dto.directory_path,
temp_path="/tmp/syncmaster",
local_path="/tmp/syncmaster/sftp",
options={"if_exists": "replace_entire_directory"},
)
downloader.run()

reader = FileDFReader(
connection=self.local_connection,
format=self.transfer_dto.file_format,
source_path="/tmp/syncmaster/sftp",
df_schema=StructType.fromJson(self.transfer_dto.df_schema) if self.transfer_dto.df_schema else None,
options=self.transfer_dto.options,
)
df = reader.run()
with tempfile.TemporaryDirectory(prefix="syncmaster_sftp_") as temp_dir:
downloader = FileDownloader(
connection=self.connection,
source_path=self.transfer_dto.directory_path,
local_path=temp_dir,
)
downloader.run()

reader = FileDFReader(
connection=self.local_connection,
format=self.transfer_dto.file_format,
source_path=temp_dir,
df_schema=StructType.fromJson(self.transfer_dto.df_schema) if self.transfer_dto.df_schema else None,
options=self.transfer_dto.options,
)
df = reader.run()
df.cache().count()

rows_filter_expression = self._get_rows_filter_expression()
if rows_filter_expression:
Expand All @@ -62,19 +63,18 @@ def read(self) -> DataFrame:
return df

def write(self, df: DataFrame) -> None:
    """Upload *df* to the remote SFTP directory.

    The dataframe is first serialized to files in a local temporary
    directory through the Spark local-FS connection, then the resulting
    files are uploaded to the SFTP server at
    ``transfer_dto.directory_path``.

    :param df: dataframe to serialize and upload.
    :raises: whatever ``FileDFWriter.run`` / ``FileUploader.run`` raise
        on write or upload failure; the temp directory is still removed.
    """
    # A unique, auto-cleaned temp dir (instead of a hardcoded shared
    # path such as /tmp/syncmaster/sftp) prevents concurrent transfers
    # from clobbering each other's files and leaves no stale data behind.
    with tempfile.TemporaryDirectory(prefix="syncmaster_sftp_") as temp_dir:
        writer = FileDFWriter(
            connection=self.local_connection,
            format=self.transfer_dto.file_format,
            target_path=temp_dir,
            options=self.transfer_dto.options,
        )
        writer.run(df=df)

        uploader = FileUploader(
            connection=self.connection,
            local_path=temp_dir,
            target_path=self.transfer_dto.directory_path,
        )
        uploader.run()
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def sftp_file_connection(sftp_for_conftest):
port=sftp_for_conftest.port,
user=sftp_for_conftest.user,
password=sftp_for_conftest.password,
compress=False,
compress=False, # to avoid errors from combining file and SCP-level compression
)


Expand Down
9 changes: 4 additions & 5 deletions tests/test_integration/test_run_transfer/test_sftp.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import secrets
from pathlib import Path

import pytest
import pytest_asyncio
Expand Down Expand Up @@ -246,6 +247,7 @@ async def test_run_transfer_postgres_to_sftp(
postgres_to_sftp: Transfer,
target_file_format,
file_format_flavor: str,
tmp_path: Path,
):
format_name, format = target_file_format

Expand Down Expand Up @@ -279,18 +281,15 @@ async def test_run_transfer_postgres_to_sftp(
downloader = FileDownloader(
connection=sftp_file_connection,
source_path=f"/config/target/{format_name}/{file_format_flavor}",
temp_path="/tmp/syncmaster",
local_path="/tmp/syncmaster/sftp",
options={"if_exists": "replace_entire_directory"},
local_path=tmp_path,
)
downloader.run()

reader = FileDFReader(
connection=sftp_file_df_connection,
format=format,
source_path="/tmp/syncmaster/sftp",
source_path=tmp_path,
df_schema=init_df.schema,
options={},
)
df = reader.run()

Expand Down

0 comments on commit cda4e5d

Please sign in to comment.