[DOP-22336] Add logic for handling WebDAV transfers (#194)
IlyasDevelopment authored Feb 3, 2025
1 parent f3abd77 commit 49273f2
Showing 19 changed files with 576 additions and 8 deletions.
7 changes: 7 additions & 0 deletions .env.docker
@@ -142,6 +142,13 @@ TEST_SAMBA_USER=syncmaster
TEST_SAMBA_PASSWORD=test_only
TEST_SAMBA_AUTH_TYPE=NTLMv2

TEST_WEBDAV_HOST_FOR_CONFTEST=webdav
TEST_WEBDAV_PORT_FOR_CONFTEST=80
TEST_WEBDAV_HOST_FOR_WORKER=webdav
TEST_WEBDAV_PORT_FOR_WORKER=80
TEST_WEBDAV_USER=syncmaster
TEST_WEBDAV_PASSWORD=test_only

SPARK_CONF_DIR=/app/tests/spark/hive/conf/
HADOOP_CONF_DIR=/app/tests/spark/hadoop/
HIVE_CONF_DIR=/app/tests/spark/hive/conf/
7 changes: 7 additions & 0 deletions .env.local
@@ -129,6 +129,13 @@ export TEST_SAMBA_USER=syncmaster
export TEST_SAMBA_PASSWORD=test_only
export TEST_SAMBA_AUTH_TYPE=NTLMv2

export TEST_WEBDAV_HOST_FOR_CONFTEST=localhost
export TEST_WEBDAV_PORT_FOR_CONFTEST=8010
export TEST_WEBDAV_HOST_FOR_WORKER=webdav
export TEST_WEBDAV_PORT_FOR_WORKER=80
export TEST_WEBDAV_USER=syncmaster
export TEST_WEBDAV_PASSWORD=test_only

export SPARK_CONF_DIR=./tests/spark/hive/conf/
export HADOOP_CONF_DIR=./tests/spark/hadoop/
export HIVE_CONF_DIR=./tests/spark/hive/conf/
6 changes: 5 additions & 1 deletion .github/workflows/tests.yml
@@ -60,6 +60,10 @@ jobs:
    name: Samba tests
    uses: ./.github/workflows/samba-tests.yml

  webdav_tests:
    name: WebDAV tests
    uses: ./.github/workflows/webdav-tests.yml

  scheduler_tests:
    name: Scheduler tests
    uses: ./.github/workflows/scheduler-tests.yml
@@ -72,7 +76,7 @@ jobs:
    name: Tests done
    runs-on: ubuntu-latest

    needs: [unit_tests, scheduler_tests, oracle_tests, clickhouse_tests, mssql_tests, mysql_tests, hive_tests, hdfs_tests, s3_tests, sftp_tests, ftp_tests, ftps_tests, samba_tests]
    needs: [unit_tests, scheduler_tests, oracle_tests, clickhouse_tests, mssql_tests, mysql_tests, hive_tests, hdfs_tests, s3_tests, sftp_tests, ftp_tests, ftps_tests, samba_tests, webdav_tests]
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
79 changes: 79 additions & 0 deletions .github/workflows/webdav-tests.yml
@@ -0,0 +1,79 @@
name: WebDAV tests
on:
  workflow_call:

env:
  DEFAULT_PYTHON: '3.12'

jobs:
  test:
    name: Run WebDAV tests
    runs-on: ubuntu-latest

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up QEMU
        uses: docker/setup-qemu-action@v3

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

      - name: Cache jars
        uses: actions/cache@v4
        with:
          path: ./cached_jars
          key: ${{ runner.os }}-python-${{ env.DEFAULT_PYTHON }}-test-webdav
          restore-keys: |
            ${{ runner.os }}-python-${{ env.DEFAULT_PYTHON }}-test-webdav
            ${{ runner.os }}-python-

      - name: Build Worker Image
        uses: docker/build-push-action@v6
        with:
          context: .
          tags: mtsrus/syncmaster-worker:${{ github.sha }}
          target: test
          file: docker/Dockerfile.worker
          load: true
          cache-from: mtsrus/syncmaster-worker:develop

      - name: Docker compose up
        run: |
          docker compose -f docker-compose.test.yml --profile all down -v --remove-orphans
          docker compose -f docker-compose.test.yml --profile webdav up -d --wait --wait-timeout 200
        env:
          WORKER_IMAGE_TAG: ${{ github.sha }}

      - name: Run WebDAV Tests
        run: |
          docker compose -f ./docker-compose.test.yml --profile webdav exec -T worker coverage run -m pytest -vvv -s -m "worker and webdav"

      - name: Dump worker logs on failure
        if: failure()
        uses: jwalton/gh-docker-logs@v2
        with:
          images: mtsrus/syncmaster-worker
          dest: ./logs

      # This is important, as coverage is exported after receiving SIGTERM
      - name: Shutdown
        if: always()
        run: |
          docker compose -f docker-compose.test.yml --profile all down -v --remove-orphans

      - name: Upload worker logs
        uses: actions/upload-artifact@v4
        if: failure()
        with:
          name: worker-logs-webdav
          path: logs/*

      - name: Upload coverage results
        uses: actions/upload-artifact@v4
        with:
          name: coverage-webdav
          path: reports/*
          # https://github.com/actions/upload-artifact/issues/602
          include-hidden-files: true
4 changes: 4 additions & 0 deletions Makefile
@@ -125,6 +125,10 @@ test-integration-samba: test-db ##@Test Run integration tests for Samba
docker compose -f docker-compose.test.yml --profile samba up -d --wait $(DOCKER_COMPOSE_ARGS)
${POETRY} run pytest ./tests/test_integration -m samba $(PYTEST_ARGS)

test-integration-webdav: test-db ##@Test Run integration tests for WebDAV
docker compose -f docker-compose.test.yml --profile webdav up -d --wait $(DOCKER_COMPOSE_ARGS)
${POETRY} run pytest ./tests/test_integration -m webdav $(PYTEST_ARGS)

test-integration: test-db ##@Test Run all integration tests
docker compose -f docker-compose.test.yml --profile all up -d --wait $(DOCKER_COMPOSE_ARGS)
${POETRY} run pytest ./tests/test_integration $(PYTEST_ARGS)
1 change: 1 addition & 0 deletions README.rst
@@ -44,6 +44,7 @@ List of currently supported connections:
* FTPS
* SFTP
* Samba
* WebDAV

Current Data.SyncMaster implementation provides following components:

24 changes: 19 additions & 5 deletions docker-compose.test.yml
@@ -125,7 +125,7 @@ services:
        condition: service_completed_successfully
      rabbitmq:
        condition: service_healthy
    profiles: [worker, scheduler, s3, oracle, hdfs, hive, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, all]
    profiles: [worker, scheduler, s3, oracle, hdfs, hive, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all]

  test-postgres:
    image: postgres
@@ -139,7 +139,7 @@
      interval: 30s
      timeout: 5s
      retries: 3
    profiles: [s3, oracle, clickhouse, mysql, mssql, hdfs, hive, sftp, ftp, ftps, samba, all]
    profiles: [s3, oracle, clickhouse, mysql, mssql, hdfs, hive, sftp, ftp, ftps, samba, webdav, all]

  test-s3:
    image: bitnami/minio:latest
@@ -209,7 +209,6 @@
    platform: linux/amd64
    profiles: [mysql, all]

  metastore-hive:
    image: postgres
    restart: unless-stopped
@@ -225,7 +224,7 @@
      interval: 30s
      timeout: 5s
      retries: 3
    profiles: [hive, hdfs, s3, sftp, ftp, ftps, samba, all]
    profiles: [hive, hdfs, s3, sftp, ftp, ftps, samba, webdav, all]

  keycloak:
    image: quay.io/keycloak/keycloak:latest
@@ -264,7 +263,7 @@
      HIVE_METASTORE_DB_USER: test_hive
      HIVE_METASTORE_DB_PASSWORD: test_hive
    # writing spark dataframe to s3, sftp, ftp, ftps xml file fails without running hive metastore server
    profiles: [hive, hdfs, s3, sftp, ftp, ftps, samba, all]
    profiles: [hive, hdfs, s3, sftp, ftp, ftps, samba, webdav, all]

  test-sftp:
    image: ${SFTP_IMAGE:-linuxserver/openssh-server}
@@ -327,6 +326,21 @@
    entrypoint: [/custom_entrypoint.sh]
    profiles: [samba, all]

  webdav:
    image: ${WEBDAV_IMAGE:-chonjay21/webdav:latest}
    restart: unless-stopped
    environment:
      - APP_USER_NAME=syncmaster
      - APP_USER_PASSWD=test_only
      - APP_UID=1000
      - APP_GID=1000
    ports:
      - 8010:80
    volumes:
      # Remove after https://github.com/chonjay21/docker-webdav/pull/3
      - ./docker/webdav/on_post_init.sh:/sources/webdav/eventscripts/on_post_init.sh
    profiles: [webdav, all]

volumes:
  postgres_test_data:
  rabbitmq_test_data:
5 changes: 5 additions & 0 deletions docker/webdav/on_post_init.sh
@@ -0,0 +1,5 @@
#!/usr/bin/env bash
set -e

# allow creating files and directories
chown -R www-data:www-data /var/webdav
1 change: 1 addition & 0 deletions docs/changelog/next_release/194.feature.rst
@@ -0,0 +1 @@
Add logic for handling WebDAV transfers
10 changes: 10 additions & 0 deletions syncmaster/dto/connections.py
@@ -132,3 +132,13 @@ class SambaConnectionDTO(ConnectionDTO):
    domain: str = ""
    port: int | None = None
    type: ClassVar[str] = "samba"


@dataclass
class WebDAVConnectionDTO(ConnectionDTO):
    host: str
    port: int
    user: str
    password: str
    protocol: Literal["http", "https"] = "https"
    type: ClassVar[str] = "webdav"
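
For orientation, a minimal sketch of how this DTO might be instantiated, assuming the ConnectionDTO base class adds no required fields of its own (the values are illustrative, not taken from the commit):

from syncmaster.dto.connections import WebDAVConnectionDTO

# Hypothetical values; "protocol" falls back to "https" when omitted.
connection_dto = WebDAVConnectionDTO(
    host="webdav.example.internal",
    port=443,
    user="syncmaster",
    password="***",
)
assert connection_dto.protocol == "https"
assert WebDAVConnectionDTO.type == "webdav"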
5 changes: 5 additions & 0 deletions syncmaster/dto/transfers.py
@@ -115,3 +115,8 @@ class FTPSTransferDTO(FileTransferDTO):
@dataclass
class SambaTransferDTO(FileTransferDTO):
    type: ClassVar[str] = "samba"


@dataclass
class WebDAVTransferDTO(FileTransferDTO):
    type: ClassVar[str] = "webdav"
8 changes: 8 additions & 0 deletions syncmaster/worker/controller.py
@@ -18,6 +18,7 @@
    S3ConnectionDTO,
    SambaConnectionDTO,
    SFTPConnectionDTO,
    WebDAVConnectionDTO,
)
from syncmaster.dto.transfers import (
    ClickhouseTransferDTO,
@@ -32,6 +33,7 @@
    S3TransferDTO,
    SambaTransferDTO,
    SFTPTransferDTO,
    WebDAVTransferDTO,
)
from syncmaster.exceptions.connection import ConnectionTypeNotRecognizedError
from syncmaster.worker.handlers.base import Handler
@@ -47,6 +49,7 @@
from syncmaster.worker.handlers.file.s3 import S3Handler
from syncmaster.worker.handlers.file.samba import SambaHandler
from syncmaster.worker.handlers.file.sftp import SFTPHandler
from syncmaster.worker.handlers.file.webdav import WebDAVHandler
from syncmaster.worker.settings import WorkerAppSettings

logger = logging.getLogger(__name__)
@@ -113,6 +116,11 @@
        SambaConnectionDTO,
        SambaTransferDTO,
    ),
    "webdav": (
        WebDAVHandler,
        WebDAVConnectionDTO,
        WebDAVTransferDTO,
    ),
}
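
The mapping above ties each connection type string to a (handler, connection DTO, transfer DTO) triple. The lookup code itself is outside this hunk; the sketch below only illustrates the idea, and every name other than the classes imported above is an assumption:

# Hypothetical resolution helper; the real controller logic is not shown in this diff.
def resolve_handler(connection_type: str, connection_data: dict, transfer_data: dict) -> Handler:
    try:
        handler_class, connection_dto_class, transfer_dto_class = handler_map[connection_type]
    except KeyError:
        # the exception class is imported above; its argument list is assumed here
        raise ConnectionTypeNotRecognizedError

    # Constructor signatures are assumed for illustration only.
    return handler_class(
        connection_dto=connection_dto_class(**connection_data),
        transfer_dto=transfer_dto_class(**transfer_data),
    )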


31 changes: 31 additions & 0 deletions syncmaster/worker/handlers/file/webdav.py
@@ -0,0 +1,31 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

from typing import TYPE_CHECKING

from onetl.connection import SparkLocalFS, WebDAV

from syncmaster.dto.connections import WebDAVConnectionDTO
from syncmaster.worker.handlers.file.protocol import FileProtocolHandler

if TYPE_CHECKING:
    from pyspark.sql import SparkSession


class WebDAVHandler(FileProtocolHandler):
    connection_dto: WebDAVConnectionDTO

    def connect(self, spark: SparkSession) -> None:
        self.connection = WebDAV(
            host=self.connection_dto.host,
            port=self.connection_dto.port,
            protocol=self.connection_dto.protocol,
            user=self.connection_dto.user,
            password=self.connection_dto.password,
            ssl_verify=False,
        ).check()
        self.local_connection = SparkLocalFS(
            spark=spark,
        ).check()
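
connect() mirrors onETL's own API: it builds a WebDAV file connection from the DTO, disables TLS verification, and verifies both the remote server and the local Spark filesystem with .check(). A standalone sketch of the same calls, useful for a quick manual check outside the worker (host and credentials are placeholders, not taken from the commit):

from onetl.connection import SparkLocalFS, WebDAV
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("webdav-smoke-test").getOrCreate()

# Same arguments the handler passes; values below are placeholders.
webdav = WebDAV(
    host="webdav.example.internal",
    port=443,
    protocol="https",
    user="syncmaster",
    password="***",
    ssl_verify=False,  # certificate verification is disabled, exactly as in the handler
).check()

local_fs = SparkLocalFS(spark=spark).check()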
2 changes: 1 addition & 1 deletion syncmaster/worker/spark.py
@@ -64,7 +64,7 @@ def get_packages(connection_type: str) -> list[str]:
        spark_version = pyspark.__version__
        return SparkS3.get_packages(spark_version=spark_version) + file_formats_spark_packages

    if connection_type in ("hdfs", "sftp", "ftp", "ftps", "samba"):
    if connection_type in ("hdfs", "sftp", "ftp", "ftps", "samba", "webdav"):
        return file_formats_spark_packages

    # If the database type does not require downloading .jar packages
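
WebDAV joins the other file protocols here because it needs no protocol-specific Spark jars: files are fetched via onETL and read from the local filesystem, so only the shared file-format packages (XML, Excel, and the like) are required. A hedged sketch of how this could be wired into a Spark session (the session-building code is not part of this hunk, and the app name is illustrative):

from pyspark.sql import SparkSession

from syncmaster.worker.spark import get_packages

# For "webdav" this returns only the shared file-format packages.
packages = get_packages(connection_type="webdav")

spark = (
    SparkSession.builder
    .appName("syncmaster-webdav-transfer")
    .config("spark.jars.packages", ",".join(packages))
    .getOrCreate()
)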
8 changes: 8 additions & 0 deletions tests/settings.py
@@ -98,6 +98,14 @@ class TestSettings(BaseSettings):
    TEST_SAMBA_PASSWORD: str
    TEST_SAMBA_AUTH_TYPE: str = "NTLMv2"

    TEST_WEBDAV_HOST_FOR_CONFTEST: str
    TEST_WEBDAV_PORT_FOR_CONFTEST: int
    TEST_WEBDAV_HOST_FOR_WORKER: str
    TEST_WEBDAV_PORT_FOR_WORKER: int
    TEST_WEBDAV_PROTOCOL: str = "http"
    TEST_WEBDAV_USER: str
    TEST_WEBDAV_PASSWORD: str

    @model_validator(mode="before")
    def check_sid_and_service_name(cls, values):
        sid = values.get("TEST_ORACLE_SID")
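
In .env.local the _FOR_CONFTEST values point at the port published on the host (8010 in docker-compose.test.yml), while the _FOR_WORKER values address the webdav container from inside the compose network (port 80). A rough sketch of how a fixture might turn these settings into a worker-side connection DTO; the real fixtures live in webdav_fixtures.py (imported in conftest below) and are not shown in this diff, so the fixture name and the test_settings dependency are assumptions:

import pytest

from syncmaster.dto.connections import WebDAVConnectionDTO
from tests.settings import TestSettings


@pytest.fixture
def webdav_worker_connection_dto(test_settings: TestSettings) -> WebDAVConnectionDTO:
    # Hypothetical fixture; the actual webdav_for_worker fixture may differ.
    return WebDAVConnectionDTO(
        host=test_settings.TEST_WEBDAV_HOST_FOR_WORKER,
        port=test_settings.TEST_WEBDAV_PORT_FOR_WORKER,
        protocol=test_settings.TEST_WEBDAV_PROTOCOL,
        user=test_settings.TEST_WEBDAV_USER,
        password=test_settings.TEST_WEBDAV_PASSWORD,
    )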
@@ -117,3 +117,13 @@
from tests.test_integration.test_run_transfer.connection_fixtures.spark_fixtures import (
    spark,
)
from tests.test_integration.test_run_transfer.connection_fixtures.webdav_fixtures import (
    prepare_webdav,
    webdav_connection,
    webdav_file_connection,
    webdav_file_connection_with_path,
    webdav_file_df_connection,
    webdav_file_df_connection_with_path,
    webdav_for_conftest,
    webdav_for_worker,
)
@@ -75,7 +75,7 @@ def spark(settings: Settings, request: FixtureRequest) -> SparkSession:
        )
    )

    if set(markers).intersection({"hdfs", "s3", "sftp", "ftp", "ftps", "samba"}):
    if set(markers).intersection({"hdfs", "s3", "sftp", "ftp", "ftps", "samba", "webdav"}):
        # excel version is hardcoded due to https://github.com/nightscape/spark-excel/issues/902
        file_formats_spark_packages: list[str] = XML.get_packages(
            spark_version=pyspark.__version__,