Skip to content

Commit

Permalink
Review
Browse files Browse the repository at this point in the history
  • Loading branch information
Ilyas Gasanov committed Jan 29, 2025
1 parent 8435484 commit 421fe46
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 28 deletions.
2 changes: 1 addition & 1 deletion .env.docker
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ TEST_SFTP_PORT_FOR_CONFTEST=2222
TEST_SFTP_HOST_FOR_WORKER=test-sftp
TEST_SFTP_PORT_FOR_WORKER=2222
TEST_SFTP_USER=syncmaster
TEST_SFTP_PASSWORD=AesujeifohgoaCu0Boosiet5aimeitho
TEST_SFTP_PASSWORD=test_only

SPARK_CONF_DIR=/app/tests/spark/hive/conf/
HADOOP_CONF_DIR=/app/tests/spark/hadoop/
Expand Down
2 changes: 1 addition & 1 deletion .env.local
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ export TEST_SFTP_PORT_FOR_CONFTEST=2222
export TEST_SFTP_HOST_FOR_WORKER=test-sftp
export TEST_SFTP_PORT_FOR_WORKER=2222
export TEST_SFTP_USER=syncmaster
export TEST_SFTP_PASSWORD=AesujeifohgoaCu0Boosiet5aimeitho
export TEST_SFTP_PASSWORD=test_only

export SPARK_CONF_DIR=./tests/spark/hive/conf/
export HADOOP_CONF_DIR=./tests/spark/hadoop/
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ services:
USER_NAME: syncmaster
PASSWORD_ACCESS: true
SUDO_ACCESS: true
USER_PASSWORD: AesujeifohgoaCu0Boosiet5aimeitho
USER_PASSWORD: test_only
profiles: [sftp, all]

volumes:
Expand Down
30 changes: 20 additions & 10 deletions syncmaster/worker/controller.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
import logging
from tempfile import TemporaryDirectory
from typing import Any

from syncmaster.db.models import Connection, Run
Expand Down Expand Up @@ -103,40 +104,48 @@ def __init__(
target_connection: Connection,
target_auth_data: dict,
):
self.temp_dir = TemporaryDirectory(prefix="syncmaster_")

self.run = run
self.source_handler = self.get_handler(
connection_data=source_connection.data,
transfer_params=run.transfer.source_params,
transformations=run.transfer.transformations,
connection_auth_data=source_auth_data,
temp_dir=TemporaryDirectory(dir=self.temp_dir.name, prefix="downloaded_"),
)
self.target_handler = self.get_handler(
connection_data=target_connection.data,
transfer_params=run.transfer.target_params,
transformations=run.transfer.transformations,
connection_auth_data=target_auth_data,
temp_dir=TemporaryDirectory(dir=self.temp_dir.name, prefix="written_"),
)

def perform_transfer(self, settings: WorkerAppSettings) -> None:
spark = settings.worker.CREATE_SPARK_SESSION_FUNCTION(
run=self.run,
source=self.source_handler.connection_dto,
target=self.target_handler.connection_dto,
)
try:
spark = settings.worker.CREATE_SPARK_SESSION_FUNCTION(
run=self.run,
source=self.source_handler.connection_dto,
target=self.target_handler.connection_dto,
)

with spark:
self.source_handler.connect(spark)
self.target_handler.connect(spark)
with spark:
self.source_handler.connect(spark)
self.target_handler.connect(spark)

df = self.source_handler.read()
self.target_handler.write(df)
df = self.source_handler.read()
self.target_handler.write(df)
finally:
self.temp_dir.cleanup()

def get_handler(
self,
connection_data: dict[str, Any],
connection_auth_data: dict,
transfer_params: dict[str, Any],
transformations: list[dict],
temp_dir: TemporaryDirectory,
) -> Handler:
connection_data.update(connection_auth_data)
connection_data.pop("type")
Expand All @@ -150,4 +159,5 @@ def get_handler(
return handler(
connection_dto=connection_dto(**connection_data),
transfer_dto=transfer_dto(**transfer_params, transformations=transformations),
temp_dir=temp_dir,
)
3 changes: 3 additions & 0 deletions syncmaster/worker/handlers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING

from syncmaster.dto.connections import ConnectionDTO
Expand All @@ -19,9 +20,11 @@ def __init__(
self,
connection_dto: ConnectionDTO,
transfer_dto: TransferDTO,
temp_dir: TemporaryDirectory,
):
self.connection_dto = connection_dto
self.transfer_dto = transfer_dto
self.temp_dir = temp_dir

@abstractmethod
def connect(self, spark: SparkSession) -> None: ...
Expand Down
14 changes: 5 additions & 9 deletions syncmaster/worker/handlers/file/sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def connect(self, spark: SparkSession) -> None:
port=self.connection_dto.port,
user=self.connection_dto.user,
password=self.connection_dto.password,
compress=False,
compress=False,  # avoid combining file-format compression with SSH transport-level compression
).check()
self.local_connection = SparkLocalFS(
spark=spark,
Expand All @@ -36,16 +36,14 @@ def read(self) -> DataFrame:
downloader = FileDownloader(
connection=self.connection,
source_path=self.transfer_dto.directory_path,
temp_path="/tmp/syncmaster",
local_path="/tmp/syncmaster/sftp",
options={"if_exists": "replace_entire_directory"},
local_path=self.temp_dir.name,
)
downloader.run()

reader = FileDFReader(
connection=self.local_connection,
format=self.transfer_dto.file_format,
source_path="/tmp/syncmaster/sftp",
source_path=self.temp_dir.name,
df_schema=StructType.fromJson(self.transfer_dto.df_schema) if self.transfer_dto.df_schema else None,
options=self.transfer_dto.options,
)
Expand All @@ -65,16 +63,14 @@ def write(self, df: DataFrame) -> None:
writer = FileDFWriter(
connection=self.local_connection,
format=self.transfer_dto.file_format,
target_path="/tmp/syncmaster/sftp",
target_path=self.temp_dir.name,
options=self.transfer_dto.options,
)
writer.run(df=df)

uploader = FileUploader(
connection=self.connection,
local_path="/tmp/syncmaster/sftp",
temp_path="/config/target", # SFTP host
local_path=self.temp_dir.name,
target_path=self.transfer_dto.directory_path,
options={"if_exists": "replace_entire_directory"},
)
uploader.run()
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def sftp_file_connection(sftp_for_conftest):
port=sftp_for_conftest.port,
user=sftp_for_conftest.user,
password=sftp_for_conftest.password,
compress=False,
compress=False,  # avoid combining file-format compression with SSH transport-level compression
)


Expand Down
9 changes: 4 additions & 5 deletions tests/test_integration/test_run_transfer/test_sftp.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import secrets
from pathlib import Path

import pytest
import pytest_asyncio
Expand Down Expand Up @@ -246,6 +247,7 @@ async def test_run_transfer_postgres_to_sftp(
postgres_to_sftp: Transfer,
target_file_format,
file_format_flavor: str,
tmp_path: Path,
):
format_name, format = target_file_format

Expand Down Expand Up @@ -279,18 +281,15 @@ async def test_run_transfer_postgres_to_sftp(
downloader = FileDownloader(
connection=sftp_file_connection,
source_path=f"/config/target/{format_name}/{file_format_flavor}",
temp_path="/tmp/syncmaster",
local_path="/tmp/syncmaster/sftp",
options={"if_exists": "replace_entire_directory"},
local_path=tmp_path,
)
downloader.run()

reader = FileDFReader(
connection=sftp_file_df_connection,
format=format,
source_path="/tmp/syncmaster/sftp",
source_path=tmp_path,
df_schema=init_df.schema,
options={},
)
df = reader.run()

Expand Down

0 comments on commit 421fe46

Please sign in to comment.