Commit ddf4ca8

[DOP-22337] Add logic for handling Samba transfers

Ilyas Gasanov committed Jan 31, 2025
1 parent cc69298 commit ddf4ca8

Showing 27 changed files with 689 additions and 370 deletions.
13 changes: 13 additions & 0 deletions .env.docker
@@ -129,6 +129,19 @@ TEST_FTPS_PORT_FOR_WORKER=21
TEST_FTPS_USER=syncmaster
TEST_FTPS_PASSWORD=test_only

TEST_SAMBA_HOST_FOR_CONFTEST=test-samba
TEST_SAMBA_PORT_FOR_CONFTEST=445
TEST_SAMBA_HOST_FOR_WORKER=test-samba
TEST_SAMBA_PORT_FOR_WORKER=445
TEST_SAMBA_DOMAIN=domain
TEST_SAMBA_PROTOCOL=SMB
TEST_SAMBA_UID=1000
TEST_SAMBA_GID=1000
TEST_SAMBA_SHARE=SmbShare
TEST_SAMBA_USER=syncmaster
TEST_SAMBA_PASSWORD=test_only
TEST_SAMBA_AUTH_TYPE=NTLMv2

SPARK_CONF_DIR=/app/tests/spark/hive/conf/
HADOOP_CONF_DIR=/app/tests/spark/hadoop/
HIVE_CONF_DIR=/app/tests/spark/hive/conf/
13 changes: 13 additions & 0 deletions .env.local
@@ -116,6 +116,19 @@ export TEST_FTPS_PORT_FOR_WORKER=21
export TEST_FTPS_USER=syncmaster
export TEST_FTPS_PASSWORD=test_only

export TEST_SAMBA_HOST_FOR_CONFTEST=localhost
export TEST_SAMBA_PORT_FOR_CONFTEST=1445
export TEST_SAMBA_HOST_FOR_WORKER=test-samba
export TEST_SAMBA_PORT_FOR_WORKER=445
export TEST_SAMBA_DOMAIN=domain
export TEST_SAMBA_PROTOCOL=SMB
export TEST_SAMBA_UID=1000
export TEST_SAMBA_GID=1000
export TEST_SAMBA_SHARE=SmbShare
export TEST_SAMBA_USER=syncmaster
export TEST_SAMBA_PASSWORD=test_only
export TEST_SAMBA_AUTH_TYPE=NTLMv2

export SPARK_CONF_DIR=./tests/spark/hive/conf/
export HADOOP_CONF_DIR=./tests/spark/hadoop/
export HIVE_CONF_DIR=./tests/spark/hive/conf/
82 changes: 82 additions & 0 deletions .github/workflows/samba-tests.yml
@@ -0,0 +1,82 @@
name: Samba tests
on:
  workflow_call:

env:
  DEFAULT_PYTHON: '3.12'

jobs:
  test:
    name: Run Samba tests
    runs-on: ubuntu-latest

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up QEMU
        uses: docker/setup-qemu-action@v3

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

      - name: Cache jars
        uses: actions/cache@v4
        with:
          path: ./cached_jars
          key: ${{ runner.os }}-python-${{ env.DEFAULT_PYTHON }}-test-samba
          restore-keys: |
            ${{ runner.os }}-python-${{ env.DEFAULT_PYTHON }}-test-samba
            ${{ runner.os }}-python-
      - name: Build Worker Image
        uses: docker/build-push-action@v6
        with:
          context: .
          tags: mtsrus/syncmaster-worker:${{ github.sha }}
          target: test
          file: docker/Dockerfile.worker
          load: true
          cache-from: mtsrus/syncmaster-worker:develop

      - name: Make custom entrypoint script executable
        run: chmod +x ./docker/samba/custom_entrypoint.sh

      - name: Docker compose up
        run: |
          docker compose -f docker-compose.test.yml --profile all down -v --remove-orphans
          docker compose -f docker-compose.test.yml --profile samba up -d --wait --wait-timeout 200
        env:
          WORKER_IMAGE_TAG: ${{ github.sha }}

      - name: Run Samba Tests
        run: |
          docker compose -f ./docker-compose.test.yml --profile samba exec -T worker coverage run -m pytest -vvv -s -m "worker and samba"
      - name: Dump worker logs on failure
        if: failure()
        uses: jwalton/gh-docker-logs@v2
        with:
          images: mtsrus/syncmaster-worker
          dest: ./logs

      # This is important, as coverage is exported after receiving SIGTERM
      - name: Shutdown
        if: always()
        run: |
          docker compose -f docker-compose.test.yml --profile all down -v --remove-orphans
      - name: Upload worker logs
        uses: actions/upload-artifact@v4
        if: failure()
        with:
          name: worker-logs-samba
          path: logs/*

      - name: Upload coverage results
        uses: actions/upload-artifact@v4
        with:
          name: coverage-samba
          path: reports/*
          # https://github.com/actions/upload-artifact/issues/602
          include-hidden-files: true
6 changes: 5 additions & 1 deletion .github/workflows/tests.yml
@@ -56,6 +56,10 @@ jobs:
    name: FTPS tests
    uses: ./.github/workflows/ftps-tests.yml

  samba_tests:
    name: Samba tests
    uses: ./.github/workflows/samba-tests.yml

  scheduler_tests:
    name: Scheduler tests
    uses: ./.github/workflows/scheduler-tests.yml
@@ -68,7 +72,7 @@
    name: Tests done
    runs-on: ubuntu-latest

-    needs: [unit_tests, scheduler_tests, oracle_tests, clickhouse_tests, mssql_tests, mysql_tests, hive_tests, hdfs_tests, s3_tests, sftp_tests, ftp_tests, ftps_tests]
+    needs: [unit_tests, scheduler_tests, oracle_tests, clickhouse_tests, mssql_tests, mysql_tests, hive_tests, hdfs_tests, s3_tests, sftp_tests, ftp_tests, ftps_tests, samba_tests]
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
4 changes: 4 additions & 0 deletions Makefile
@@ -121,6 +121,10 @@ test-integration-ftps: test-db ##@Test Run integration tests for FTPS
	docker compose -f docker-compose.test.yml --profile ftps up -d --wait $(DOCKER_COMPOSE_ARGS)
	${POETRY} run pytest ./tests/test_integration -m ftps $(PYTEST_ARGS)

test-integration-samba: test-db ##@Test Run integration tests for Samba
	docker compose -f docker-compose.test.yml --profile samba up -d --wait $(DOCKER_COMPOSE_ARGS)
	${POETRY} run pytest ./tests/test_integration -m samba $(PYTEST_ARGS)

test-integration: test-db ##@Test Run all integration tests
	docker compose -f docker-compose.test.yml --profile all up -d --wait $(DOCKER_COMPOSE_ARGS)
	${POETRY} run pytest ./tests/test_integration $(PYTEST_ARGS)
1 change: 1 addition & 0 deletions README.rst
@@ -43,6 +43,7 @@ List of currently supported connections:
* FTP
* FTPS
* SFTP
* Samba

Current Data.SyncMaster implementation provides following components:

19 changes: 15 additions & 4 deletions docker-compose.test.yml
@@ -125,7 +125,7 @@ services:
        condition: service_completed_successfully
      rabbitmq:
        condition: service_healthy
-    profiles: [worker, scheduler, s3, oracle, hdfs, hive, clickhouse, mysql, mssql, sftp, ftp, ftps, all]
+    profiles: [worker, scheduler, s3, oracle, hdfs, hive, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, all]

  test-postgres:
    image: postgres
@@ -139,7 +139,7 @@
      interval: 30s
      timeout: 5s
      retries: 3
-    profiles: [s3, oracle, clickhouse, mysql, mssql, hdfs, hive, sftp, ftp, ftps, all]
+    profiles: [s3, oracle, clickhouse, mysql, mssql, hdfs, hive, sftp, ftp, ftps, samba, all]

  test-s3:
    image: bitnami/minio:latest
@@ -225,7 +225,7 @@
      interval: 30s
      timeout: 5s
      retries: 3
-    profiles: [hive, hdfs, s3, sftp, ftp, ftps, all]
+    profiles: [hive, hdfs, s3, sftp, ftp, ftps, samba, all]

  keycloak:
    image: quay.io/keycloak/keycloak:latest
@@ -264,7 +264,7 @@
      HIVE_METASTORE_DB_USER: test_hive
      HIVE_METASTORE_DB_PASSWORD: test_hive
    # writing spark dataframe to s3, sftp, ftp, ftps xml file fails without running hive metastore server
-    profiles: [hive, hdfs, s3, sftp, ftp, ftps, all]
+    profiles: [hive, hdfs, s3, sftp, ftp, ftps, samba, all]

  test-sftp:
    image: ${SFTP_IMAGE:-linuxserver/openssh-server}
@@ -316,6 +316,17 @@
      - ./docker/ftp/on_post_init.sh:/sources/ftps/eventscripts/on_post_init.sh
    profiles: [ftps, all]

  test-samba:
    image: ${SAMBA_IMAGE:-elswork/samba}
    restart: unless-stopped
    ports:
      - 139:139
      - 1445:445
    volumes:
      - ./docker/samba/custom_entrypoint.sh:/custom_entrypoint.sh
    entrypoint: [/custom_entrypoint.sh]
    profiles: [samba, all]

volumes:
  postgres_test_data:
  rabbitmq_test_data:
6 changes: 6 additions & 0 deletions docker/samba/custom_entrypoint.sh
@@ -0,0 +1,6 @@
#!/usr/bin/env bash

# allow creating files and directories inside the Samba share
mkdir -p /share/folder
chmod 0777 /share/folder
/entrypoint.sh -u "1000:1000:syncmaster:syncmaster:test_only" -s "SmbShare:/share/folder:rw:syncmaster"
1 change: 1 addition & 0 deletions docs/changelog/next_release/192.feature.rst
@@ -0,0 +1 @@
Add logic for handling Samba transfers
15 changes: 14 additions & 1 deletion syncmaster/dto/connections.py
@@ -1,7 +1,7 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from dataclasses import dataclass
-from typing import ClassVar
+from typing import ClassVar, Literal


@dataclass
@@ -119,3 +119,16 @@ class FTPSConnectionDTO(ConnectionDTO):
    user: str
    password: str
    type: ClassVar[str] = "ftps"


@dataclass
class SambaConnectionDTO(ConnectionDTO):
    host: str
    share: str
    protocol: Literal["SMB", "NetBIOS"]
    user: str
    password: str
    auth_type: Literal["NTLMv1", "NTLMv2"] = "NTLMv2"
    domain: str = ""
    port: int | None = None
    type: ClassVar[str] = "samba"
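
Note: the SambaConnectionDTO fields map one-to-one onto the TEST_SAMBA_* variables above. As an illustration only (in the real flow the controller presumably builds the DTO from the stored connection and its auth data, and ConnectionDTO is assumed to add no further required fields), a DTO filled with the worker-side test values would look like this; tests running on the host use localhost:1445 instead, matching the 1445:445 port mapping in docker-compose.test.yml above:

# illustrative sketch, not part of the commit
from syncmaster.dto.connections import SambaConnectionDTO

samba_connection = SambaConnectionDTO(
    host="test-samba",
    port=445,
    share="SmbShare",
    protocol="SMB",
    domain="domain",
    auth_type="NTLMv2",
    user="syncmaster",
    password="test_only",
)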
5 changes: 5 additions & 0 deletions syncmaster/dto/transfers.py
@@ -110,3 +110,8 @@ class FTPTransferDTO(FileTransferDTO):
@dataclass
class FTPSTransferDTO(FileTransferDTO):
    type: ClassVar[str] = "ftps"


@dataclass
class SambaTransferDTO(FileTransferDTO):
    type: ClassVar[str] = "samba"
3 changes: 2 additions & 1 deletion syncmaster/schemas/v1/connections/connection_base.py
@@ -3,9 +3,10 @@
from pydantic import BaseModel, Field

from syncmaster.schemas.v1.auth import ReadBasicAuthSchema, ReadS3AuthSchema
+from syncmaster.schemas.v1.auth.samba import ReadSambaAuthSchema
from syncmaster.schemas.v1.types import NameConstr

-ReadConnectionAuthDataSchema = ReadBasicAuthSchema | ReadS3AuthSchema
+ReadConnectionAuthDataSchema = ReadBasicAuthSchema | ReadS3AuthSchema | ReadSambaAuthSchema


class CreateConnectionBaseSchema(BaseModel):
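
Note: ReadSambaAuthSchema lives in syncmaster/schemas/v1/auth/samba.py, which is one of the 27 changed files but is not shown in this excerpt. A minimal sketch of what such a read-side schema could look like, with field names guessed from SambaConnectionDTO (an assumption, not the actual file):

# hypothetical sketch of syncmaster/schemas/v1/auth/samba.py; field names are assumptions
from typing import Literal

from pydantic import BaseModel


class ReadSambaAuthSchema(BaseModel):
    type: Literal["samba"] = "samba"
    user: str
    auth_type: Literal["NTLMv1", "NTLMv2"] = "NTLMv2"
    domain: str = ""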
8 changes: 8 additions & 0 deletions syncmaster/worker/controller.py
@@ -16,6 +16,7 @@
    OracleConnectionDTO,
    PostgresConnectionDTO,
    S3ConnectionDTO,
    SambaConnectionDTO,
    SFTPConnectionDTO,
)
from syncmaster.dto.transfers import (
@@ -29,6 +30,7 @@
    OracleTransferDTO,
    PostgresTransferDTO,
    S3TransferDTO,
    SambaTransferDTO,
    SFTPTransferDTO,
)
from syncmaster.exceptions.connection import ConnectionTypeNotRecognizedError
@@ -43,6 +45,7 @@
from syncmaster.worker.handlers.file.ftps import FTPSHandler
from syncmaster.worker.handlers.file.hdfs import HDFSHandler
from syncmaster.worker.handlers.file.s3 import S3Handler
from syncmaster.worker.handlers.file.samba import SambaHandler
from syncmaster.worker.handlers.file.sftp import SFTPHandler
from syncmaster.worker.settings import WorkerAppSettings

@@ -105,6 +108,11 @@
        FTPSConnectionDTO,
        FTPSTransferDTO,
    ),
    "samba": (
        SambaHandler,
        SambaConnectionDTO,
        SambaTransferDTO,
    ),
}


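
Note: syncmaster/worker/handlers/file/samba.py, registered in the mapping above, is part of this commit but not shown in this excerpt. Based on the refactored FTPHandler below and on onetl's Samba file connection, a minimal sketch of what it likely contains (names and signatures are assumptions, not the actual implementation):

# sketch only: mirrors the FTPHandler structure below and assumes onetl's Samba connection
from __future__ import annotations

from typing import TYPE_CHECKING

from onetl.connection import Samba, SparkLocalFS

from syncmaster.dto.connections import SambaConnectionDTO
from syncmaster.worker.handlers.file.protocol import FileProtocolHandler

if TYPE_CHECKING:
    from pyspark.sql import SparkSession


class SambaHandler(FileProtocolHandler):
    connection_dto: SambaConnectionDTO

    def connect(self, spark: SparkSession) -> None:
        # the connection parameters map one-to-one onto the SambaConnectionDTO fields above
        self.connection = Samba(
            host=self.connection_dto.host,
            port=self.connection_dto.port,
            share=self.connection_dto.share,
            protocol=self.connection_dto.protocol,
            domain=self.connection_dto.domain,
            auth_type=self.connection_dto.auth_type,
            user=self.connection_dto.user,
            password=self.connection_dto.password,
        ).check()
        self.local_connection = SparkLocalFS(
            spark=spark,
        ).check()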
57 changes: 3 additions & 54 deletions syncmaster/worker/handlers/file/ftp.py
@@ -3,20 +3,18 @@

from __future__ import annotations

-import os
from typing import TYPE_CHECKING

from onetl.connection import FTP, SparkLocalFS
-from onetl.file import FileDFReader, FileDFWriter, FileDownloader, FileUploader

from syncmaster.dto.connections import FTPConnectionDTO
-from syncmaster.worker.handlers.file.base import FileHandler
+from syncmaster.worker.handlers.file.protocol import FileProtocolHandler

if TYPE_CHECKING:
-    from pyspark.sql import DataFrame, SparkSession
+    from pyspark.sql import SparkSession


-class FTPHandler(FileHandler):
+class FTPHandler(FileProtocolHandler):
    connection_dto: FTPConnectionDTO

    def connect(self, spark: SparkSession) -> None:
@@ -29,52 +27,3 @@ def connect(self, spark: SparkSession) -> None:
        self.local_connection = SparkLocalFS(
            spark=spark,
        ).check()
-
-    def read(self) -> DataFrame:
-        from pyspark.sql.types import StructType
-
-        downloader = FileDownloader(
-            connection=self.connection,
-            source_path=self.transfer_dto.directory_path,
-            local_path=self.temp_dir.name,
-        )
-        downloader.run()
-
-        reader = FileDFReader(
-            connection=self.local_connection,
-            format=self.transfer_dto.file_format,
-            source_path=self.temp_dir.name,
-            df_schema=StructType.fromJson(self.transfer_dto.df_schema) if self.transfer_dto.df_schema else None,
-        )
-        df = reader.run()
-
-        rows_filter_expression = self._get_rows_filter_expression()
-        if rows_filter_expression:
-            df = df.where(rows_filter_expression)
-
-        columns_filter_expressions = self._get_columns_filter_expressions()
-        if columns_filter_expressions:
-            df = df.selectExpr(*columns_filter_expressions)
-
-        return df
-
-    def write(self, df: DataFrame) -> None:
-        writer = FileDFWriter(
-            connection=self.local_connection,
-            format=self.transfer_dto.file_format,
-            target_path=self.temp_dir.name,
-            options={"if_exists": "replace_entire_directory"},
-        )
-        writer.run(df=df)
-
-        crc_files = [f for f in os.listdir(self.temp_dir.name) if f.endswith(".crc")]
-        for file in crc_files:
-            os.remove(os.path.join(self.temp_dir.name, file))
-
-        uploader = FileUploader(
-            connection=self.connection,
-            local_path=self.temp_dir.name,
-            target_path=self.transfer_dto.directory_path,
-            options=self.transfer_dto.options,
-        )
-        uploader.run()
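
Note: the read() and write() bodies removed here are protocol-agnostic (download to a temp dir, read/write with Spark, upload back), so they were presumably hoisted into the new FileProtocolHandler base imported above; syncmaster/worker/handlers/file/protocol.py itself is not shown in this excerpt. A condensed sketch of its presumed shape (an assumption; the real method bodies would be the code deleted above):

# presumed shape of the shared base class; the actual protocol.py is not part of this excerpt
from __future__ import annotations

from typing import TYPE_CHECKING

from syncmaster.worker.handlers.file.base import FileHandler

if TYPE_CHECKING:
    from pyspark.sql import DataFrame


class FileProtocolHandler(FileHandler):
    def read(self) -> DataFrame:
        # download files from self.connection into a local temp dir, read them with
        # FileDFReader, and apply the row/column filter expressions -- i.e. the read()
        # body deleted from FTPHandler above
        raise NotImplementedError("sketch placeholder")

    def write(self, df: DataFrame) -> None:
        # write df locally with FileDFWriter, strip Spark .crc files, then upload the
        # directory with FileUploader -- i.e. the write() body deleted from FTPHandler above
        raise NotImplementedError("sketch placeholder")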