[DOP-19896] Add Clickhouse API schema #124

Merged 5 commits on Nov 12, 2024
1 change: 1 addition & 0 deletions README.rst
@@ -33,6 +33,7 @@ Data.SyncMaster is a low-code ETL tool for transferring data between databases a
List of currently supported connections:

* Apache Hive
* Clickhouse
* Postgres
* Oracle
* HDFS
1 change: 1 addition & 0 deletions docs/changelog/next_release/124.feature.rst
@@ -0,0 +1 @@
Add Clickhouse API schema
3 changes: 2 additions & 1 deletion syncmaster/backend/api/v1/connections.py
@@ -18,6 +18,7 @@
from syncmaster.exceptions.credentials import AuthDataNotFoundError
from syncmaster.exceptions.group import GroupNotFoundError
from syncmaster.schemas.v1.connection_types import (
CLICKHOUSE_TYPE,
HDFS_TYPE,
HIVE_TYPE,
ORACLE_TYPE,
@@ -37,7 +38,7 @@

router = APIRouter(tags=["Connections"], responses=get_error_responses())

CONNECTION_TYPES = ORACLE_TYPE, POSTGRES_TYPE, HIVE_TYPE, S3_TYPE, HDFS_TYPE
CONNECTION_TYPES = ORACLE_TYPE, POSTGRES_TYPE, CLICKHOUSE_TYPE, HIVE_TYPE, S3_TYPE, HDFS_TYPE


@router.get("/connections")
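For context, CONNECTION_TYPES is a tuple of Literal aliases. A hedged sketch (not the router's actual code) of how such a tuple can be collapsed into a single union type, e.g. for annotating a `type` query filter; the name `AnyConnectionType` is illustrative:

```python
from typing import Union

from syncmaster.schemas.v1.connection_types import (
    CLICKHOUSE_TYPE,
    HDFS_TYPE,
    HIVE_TYPE,
    ORACLE_TYPE,
    POSTGRES_TYPE,
    S3_TYPE,
)

CONNECTION_TYPES = ORACLE_TYPE, POSTGRES_TYPE, CLICKHOUSE_TYPE, HIVE_TYPE, S3_TYPE, HDFS_TYPE

# Subscripting Union with a tuple unpacks it:
# Literal['oracle'] | Literal['postgres'] | Literal['clickhouse'] | Literal['hive'] | ...
AnyConnectionType = Union[CONNECTION_TYPES]
```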
2 changes: 2 additions & 0 deletions syncmaster/schemas/v1/connection_types.py
@@ -6,6 +6,7 @@
HIVE_TYPE = Literal["hive"]
ORACLE_TYPE = Literal["oracle"]
POSTGRES_TYPE = Literal["postgres"]
CLICKHOUSE_TYPE = Literal["clickhouse"]
S3_TYPE = Literal["s3"]
HDFS_TYPE = Literal["hdfs"]

@@ -14,5 +15,6 @@ class ConnectionType(str, Enum):
POSTGRES = "postgres"
HIVE = "hive"
ORACLE = "oracle"
CLICKHOUSE = "clickhouse"
S3 = "s3"
HDFS = "hdfs"
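Each alias above is a single-value Literal, so a Pydantic field annotated with it only accepts that exact string. A minimal sketch with a hypothetical model (not part of the PR):

```python
from typing import Literal

from pydantic import BaseModel, ValidationError

CLICKHOUSE_TYPE = Literal["clickhouse"]


class ExampleClickhouseSchema(BaseModel):  # hypothetical, for illustration only
    type: CLICKHOUSE_TYPE


ExampleClickhouseSchema(type="clickhouse")  # validates: matches the literal value

try:
    ExampleClickhouseSchema(type="postgres")  # rejected: not the literal "clickhouse"
except ValidationError as exc:
    print(exc)
```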
47 changes: 47 additions & 0 deletions syncmaster/schemas/v1/connections/clickhouse.py
@@ -0,0 +1,47 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from pydantic import BaseModel, Field, SecretStr

from syncmaster.schemas.v1.connection_types import CLICKHOUSE_TYPE


class ClickhouseBaseSchema(BaseModel):
type: CLICKHOUSE_TYPE

class Config:
from_attributes = True


class ReadClickhouseConnectionSchema(ClickhouseBaseSchema):
host: str
port: int
database: str | None = None
additional_params: dict = Field(default_factory=dict)


class ReadClickhouseAuthSchema(ClickhouseBaseSchema):
user: str


class UpdateClickhouseConnectionSchema(ClickhouseBaseSchema):
host: str | None = None
port: int | None = None
database: str | None = None
additional_params: dict | None = Field(default_factory=dict)


class UpdateClickhouseAuthSchema(ClickhouseBaseSchema):
user: str | None = None # noqa: F722
password: SecretStr | None = None


class CreateClickhouseConnectionSchema(ClickhouseBaseSchema):
host: str
port: int
database: str | None = None
additional_params: dict = Field(default_factory=dict)


class CreateClickhouseAuthSchema(ClickhouseBaseSchema):
user: str
password: SecretStr
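A brief usage sketch for the new schemas, assuming Pydantic v2 (`model_dump`); the connection values are placeholders:

```python
from syncmaster.schemas.v1.connections.clickhouse import (
    CreateClickhouseAuthSchema,
    CreateClickhouseConnectionSchema,
)

connection = CreateClickhouseConnectionSchema(
    type="clickhouse",
    host="127.0.0.1",
    port=8123,
    database="database_name",
)
auth = CreateClickhouseAuthSchema(
    type="clickhouse",
    user="user",
    password="secret",  # coerced to SecretStr
)

print(connection.model_dump())  # additional_params defaults to an empty dict
print(auth.password)            # SecretStr prints masked: **********
```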
32 changes: 29 additions & 3 deletions syncmaster/schemas/v1/connections/connection.py
@@ -3,6 +3,14 @@

from pydantic import BaseModel, Field, model_validator

from syncmaster.schemas.v1.connections.clickhouse import (
CreateClickhouseAuthSchema,
CreateClickhouseConnectionSchema,
ReadClickhouseAuthSchema,
ReadClickhouseConnectionSchema,
UpdateClickhouseAuthSchema,
UpdateClickhouseConnectionSchema,
)
from syncmaster.schemas.v1.connections.hdfs import (
HDFSCreateAuthSchema,
HDFSCreateConnectionSchema,
@@ -51,12 +59,14 @@
| HDFSReadConnectionSchema
| ReadOracleConnectionSchema
| ReadPostgresConnectionSchema
| ReadClickhouseConnectionSchema
| S3ReadConnectionSchema
)
CreateConnectionDataSchema = (
CreateHiveConnectionSchema
| CreateOracleConnectionSchema
| CreatePostgresConnectionSchema
| CreateClickhouseConnectionSchema
| HDFSCreateConnectionSchema
| S3CreateConnectionSchema
)
@@ -66,15 +76,31 @@
| S3UpdateConnectionSchema
| UpdateOracleConnectionSchema
| UpdatePostgresConnectionSchema
| UpdateClickhouseConnectionSchema
)
ReadConnectionAuthDataSchema = (
ReadHiveAuthSchema | ReadOracleAuthSchema | ReadPostgresAuthSchema | S3ReadAuthSchema | HDFSReadAuthSchema
ReadHiveAuthSchema
| ReadOracleAuthSchema
| ReadPostgresAuthSchema
| ReadClickhouseAuthSchema
| S3ReadAuthSchema
| HDFSReadAuthSchema
)
CreateConnectionAuthDataSchema = (
CreateHiveAuthSchema | CreateOracleAuthSchema | CreatePostgresAuthSchema | S3CreateAuthSchema | HDFSCreateAuthSchema
CreateHiveAuthSchema
| CreateOracleAuthSchema
| CreatePostgresAuthSchema
| CreateClickhouseAuthSchema
| S3CreateAuthSchema
| HDFSCreateAuthSchema
)
UpdateConnectionAuthDataSchema = (
UpdateHiveAuthSchema | UpdateOracleAuthSchema | UpdatePostgresAuthSchema | S3UpdateAuthSchema | HDFSUpdateAuthSchema
UpdateHiveAuthSchema
| UpdateOracleAuthSchema
| UpdatePostgresAuthSchema
| UpdateClickhouseAuthSchema
| S3UpdateAuthSchema
| HDFSUpdateAuthSchema
)


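These unions appear to be dispatched on the `type` field (the test diffs below report `union_tag_invalid` errors with `discriminator: 'type'`). A hedged sketch of that mechanism using a hypothetical wrapper model and a stand-in second union member; it is not this module's actual code:

```python
from typing import Literal

from pydantic import BaseModel, Field

from syncmaster.schemas.v1.connections.clickhouse import CreateClickhouseConnectionSchema


class OtherConnectionSchema(BaseModel):  # hypothetical stand-in for another member
    type: Literal["other"]
    host: str


class ExampleCreateBody(BaseModel):  # hypothetical wrapper, for illustration only
    connection_data: CreateClickhouseConnectionSchema | OtherConnectionSchema = Field(
        discriminator="type",
    )


body = ExampleCreateBody(
    connection_data={"type": "clickhouse", "host": "127.0.0.1", "port": 8123},
)
assert isinstance(body.connection_data, CreateClickhouseConnectionSchema)
```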
7 changes: 7 additions & 0 deletions syncmaster/schemas/v1/transfers/__init__.py
@@ -7,6 +7,7 @@
from syncmaster.schemas.v1.connections.connection import ReadConnectionSchema
from syncmaster.schemas.v1.page import PageSchema
from syncmaster.schemas.v1.transfers.db import (
ClickhouseReadTransferSourceAndTarget,
HiveReadTransferSourceAndTarget,
OracleReadTransferSourceAndTarget,
PostgresReadTransferSourceAndTarget,
@@ -31,6 +32,7 @@
| HDFSReadTransferSource
| HiveReadTransferSourceAndTarget
| OracleReadTransferSourceAndTarget
| ClickhouseReadTransferSourceAndTarget
| S3ReadTransferSource
)

@@ -39,6 +41,7 @@
| HDFSReadTransferTarget
| HiveReadTransferSourceAndTarget
| OracleReadTransferSourceAndTarget
| ClickhouseReadTransferSourceAndTarget
| S3ReadTransferTarget
)

@@ -47,6 +50,7 @@
| HDFSCreateTransferSource
| HiveReadTransferSourceAndTarget
| OracleReadTransferSourceAndTarget
| ClickhouseReadTransferSourceAndTarget
| S3CreateTransferSource
)

@@ -55,6 +59,7 @@
| HDFSCreateTransferTarget
| HiveReadTransferSourceAndTarget
| OracleReadTransferSourceAndTarget
| ClickhouseReadTransferSourceAndTarget
| S3CreateTransferTarget
)

@@ -63,6 +68,7 @@
| HDFSReadTransferSource
| HiveReadTransferSourceAndTarget
| OracleReadTransferSourceAndTarget
| ClickhouseReadTransferSourceAndTarget
| S3CreateTransferSource
| None
)
@@ -72,6 +78,7 @@
| HDFSReadTransferSource
| HiveReadTransferSourceAndTarget
| OracleReadTransferSourceAndTarget
| ClickhouseReadTransferSourceAndTarget
| S3CreateTransferTarget
| None
)
11 changes: 10 additions & 1 deletion syncmaster/schemas/v1/transfers/db.py
@@ -4,7 +4,12 @@

from pydantic import BaseModel

from syncmaster.schemas.v1.connection_types import HIVE_TYPE, ORACLE_TYPE, POSTGRES_TYPE
from syncmaster.schemas.v1.connection_types import (
CLICKHOUSE_TYPE,
HIVE_TYPE,
ORACLE_TYPE,
POSTGRES_TYPE,
)


class ReadDBTransfer(BaseModel):
@@ -21,3 +26,7 @@ class OracleReadTransferSourceAndTarget(ReadDBTransfer):

class PostgresReadTransferSourceAndTarget(ReadDBTransfer):
type: POSTGRES_TYPE


class ClickhouseReadTransferSourceAndTarget(ReadDBTransfer):
type: CLICKHOUSE_TYPE
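A small sketch of the new transfer-params schema; it assumes ReadDBTransfer exposes a `table_name` field, which this diff does not show but the transfer fixture change at the end of the PR suggests:

```python
from syncmaster.schemas.v1.transfers.db import ClickhouseReadTransferSourceAndTarget

# Assumes ReadDBTransfer defines table_name; its fields are not shown in this diff.
params = ClickhouseReadTransferSourceAndTarget(
    type="clickhouse",
    table_name="source_table",
)
print(params.model_dump())
```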
@@ -0,0 +1,80 @@
import pytest
from httpx import AsyncClient
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from syncmaster.db.models import AuthData, Connection
from syncmaster.db.repositories.utils import decrypt_auth_data
from syncmaster.settings import Settings
from tests.mocks import MockGroup, UserTestRoles

pytestmark = [pytest.mark.asyncio, pytest.mark.backend, pytest.mark.clickhouse]


async def test_developer_plus_can_create_clickhouse_connection(
client: AsyncClient,
group: MockGroup,
session: AsyncSession,
settings: Settings,
role_developer_plus: UserTestRoles,
):
# Arrange
user = group.get_member_of_role(role_developer_plus)

# Act
result = await client.post(
"v1/connections",
headers={"Authorization": f"Bearer {user.token}"},
json={
"group_id": group.id,
"name": "New connection",
"description": "",
"connection_data": {
"type": "clickhouse",
"host": "127.0.0.1",
"port": 8123,
"database": "database_name",
},
"auth_data": {
"type": "clickhouse",
"user": "user",
"password": "secret",
},
},
)
connection = (
await session.scalars(
select(Connection).filter_by(
name="New connection",
),
)
).first()

creds = (
await session.scalars(
select(AuthData).filter_by(
connection_id=connection.id,
),
)
).one()

# Assert
decrypted = decrypt_auth_data(creds.value, settings=settings)
assert result.status_code == 200
assert result.json() == {
"id": connection.id,
"name": connection.name,
"description": connection.description,
"group_id": connection.group_id,
"connection_data": {
"type": connection.data["type"],
"host": connection.data["host"],
"port": connection.data["port"],
"database": connection.data["database"],
"additional_params": connection.data["additional_params"],
},
"auth_data": {
"type": decrypted["type"],
"user": decrypted["user"],
},
}
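Since the test is gated behind the `clickhouse` and `backend` markers, one way to run only these tests programmatically (an assumption about the project's pytest setup; equivalent to `pytest -m "clickhouse and backend"` from the repository root):

```python
import pytest

# Select only tests carrying both the "clickhouse" and "backend" markers.
pytest.main(["-m", "clickhouse and backend"])
```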
4 changes: 2 additions & 2 deletions tests/test_unit/test_connections/test_create_connection.py
@@ -282,7 +282,7 @@ async def test_check_fields_validation_on_create_connection(
"context": {
"discriminator": "'type'",
"tag": "POSTGRESQL",
"expected_tags": "'hive', 'oracle', 'postgres', 'hdfs', 's3'",
"expected_tags": "'hive', 'oracle', 'postgres', 'clickhouse', 'hdfs', 's3'",
},
"input": {
"type": "POSTGRESQL",
@@ -292,7 +292,7 @@
"database_name": "postgres",
},
"location": ["body", "connection_data"],
"message": "Input tag 'POSTGRESQL' found using 'type' does not match any of the expected tags: 'hive', 'oracle', 'postgres', 'hdfs', 's3'",
"message": "Input tag 'POSTGRESQL' found using 'type' does not match any of the expected tags: 'hive', 'oracle', 'postgres', 'clickhouse', 'hdfs', 's3'",
"code": "union_tag_invalid",
},
],
4 changes: 2 additions & 2 deletions tests/test_unit/test_connections/test_read_connections.py
@@ -307,10 +307,10 @@ async def test_search_connections_with_nonexistent_query(
@pytest.mark.parametrize(
"filter_params, expected_total",
[
({}, 5), # No filters applied, expecting all connections
({}, 6), # No filters applied, expecting all connections
({"type": ["oracle"]}, 1),
({"type": ["postgres", "hive"]}, 2),
({"type": ["postgres", "hive", "oracle", "hdfs", "s3"]}, 5),
({"type": ["postgres", "hive", "oracle", "clickhouse", "hdfs", "s3"]}, 6),
],
ids=[
"no_filters",
4 changes: 2 additions & 2 deletions tests/test_unit/test_transfers/test_create_transfer.py
@@ -370,12 +370,12 @@ async def test_superuser_can_create_transfer(
"location": ["body", "source_params"],
"message": (
"Input tag 'new some connection type' found using 'type' "
"does not match any of the expected tags: 'postgres', 'hdfs', 'hive', 'oracle', 's3'"
"does not match any of the expected tags: 'postgres', 'hdfs', 'hive', 'oracle', 'clickhouse', 's3'"
),
"code": "union_tag_invalid",
"context": {
"discriminator": "'type'",
"expected_tags": "'postgres', 'hdfs', 'hive', 'oracle', 's3'",
"expected_tags": "'postgres', 'hdfs', 'hive', 'oracle', 'clickhouse', 's3'",
"tag": "new some connection type",
},
"input": {
@@ -76,7 +76,12 @@ async def group_transfers(
source_params["directory_path"] = "/path/to/source"
target_params.update(common_params)
target_params["directory_path"] = "/path/to/target"
elif connection_type in [ConnectionType.HIVE, ConnectionType.POSTGRES, ConnectionType.ORACLE]:
elif connection_type in [
ConnectionType.HIVE,
ConnectionType.POSTGRES,
ConnectionType.ORACLE,
ConnectionType.CLICKHOUSE,
]:
source_params["table_name"] = "source_table"
target_params["table_name"] = "target_table"
