diff --git a/clients/python-wrapper/docs/lakefs.import_manager.rst b/clients/python-wrapper/docs/lakefs.import_manager.rst new file mode 100644 index 00000000000..7c594d8371d --- /dev/null +++ b/clients/python-wrapper/docs/lakefs.import_manager.rst @@ -0,0 +1,7 @@ +lakefs.import\_manager module +============================= + +.. automodule:: lakefs.import_manager + :members: + :undoc-members: + :show-inheritance: diff --git a/clients/python-wrapper/docs/lakefs.models.rst b/clients/python-wrapper/docs/lakefs.models.rst new file mode 100644 index 00000000000..3d60f18c7b8 --- /dev/null +++ b/clients/python-wrapper/docs/lakefs.models.rst @@ -0,0 +1,7 @@ +lakefs.models module +==================== + +.. automodule:: lakefs.models + :members: + :undoc-members: + :show-inheritance: diff --git a/clients/python-wrapper/lakefs/.pylintrc b/clients/python-wrapper/lakefs/.pylintrc index b49a678e99d..a2952efb592 100644 --- a/clients/python-wrapper/lakefs/.pylintrc +++ b/clients/python-wrapper/lakefs/.pylintrc @@ -8,6 +8,8 @@ max-line-length=120 max-locals=25 # Maximum number of arguments for function / method max-args=10 +# Maximum number of class attributes +max-attributes=10 [MESSAGES CONTROL] disable=too-few-public-methods, fixme diff --git a/clients/python-wrapper/lakefs/__init__.py b/clients/python-wrapper/lakefs/__init__.py index 9564c9522a4..7515110a78e 100644 --- a/clients/python-wrapper/lakefs/__init__.py +++ b/clients/python-wrapper/lakefs/__init__.py @@ -2,8 +2,9 @@ Allow importing of models from package root """ -from lakefs.repository import Repository, RepositoryProperties -from lakefs.reference import Reference, Commit, Change +from lakefs.repository import Repository +from lakefs.reference import Reference +from lakefs.models import Commit, Change, RepositoryProperties from lakefs.tag import Tag from lakefs.branch import Branch from lakefs.object import StoredObject, WriteableObject, ObjectReader diff --git a/clients/python-wrapper/lakefs/branch.py b/clients/python-wrapper/lakefs/branch.py index fca19b6fc02..ff43b1abff5 100644 --- a/clients/python-wrapper/lakefs/branch.py +++ b/clients/python-wrapper/lakefs/branch.py @@ -10,7 +10,8 @@ from lakefs.object import WriteableObject from lakefs.object import StoredObject from lakefs.import_manager import ImportManager -from lakefs.reference import Reference, Change, generate_listing +from lakefs.reference import Reference, generate_listing +from lakefs.models import Change from lakefs.exceptions import api_exception_handler, ConflictException, LakeFSException diff --git a/clients/python-wrapper/lakefs/client.py b/clients/python-wrapper/lakefs/client.py index 0acc38c2e10..fd0ccf62794 100644 --- a/clients/python-wrapper/lakefs/client.py +++ b/clients/python-wrapper/lakefs/client.py @@ -18,26 +18,12 @@ from lakefs.config import ClientConfig from lakefs.exceptions import NoAuthenticationFound, NotAuthorizedException, ServerException -from lakefs.namedtuple import LenientNamedTuple +from lakefs.models import ServerStorageConfiguration # global default client DEFAULT_CLIENT: Optional[Client] = None -class ServerStorageConfiguration(LenientNamedTuple): - """ - Represent a lakeFS server's storage configuration - """ - blockstore_type: str - pre_sign_support: bool - import_support: bool - blockstore_namespace_example: str - blockstore_namespace_validity_regex: str - pre_sign_support_ui: bool - import_validity_regex: str - default_namespace_prefix: Optional[str] = None - - class ServerConfiguration: """ Represent a lakeFS server's configuration diff --git a/clients/python-wrapper/lakefs/config.py b/clients/python-wrapper/lakefs/config.py index 1d4aa52b970..74fc4223e74 100644 --- a/clients/python-wrapper/lakefs/config.py +++ b/clients/python-wrapper/lakefs/config.py @@ -2,6 +2,8 @@ Client configuration module """ +from __future__ import annotations + import os from pathlib import Path diff --git a/clients/python-wrapper/lakefs/exceptions.py b/clients/python-wrapper/lakefs/exceptions.py index 3a6e35fbf37..a06ebdd43f0 100644 --- a/clients/python-wrapper/lakefs/exceptions.py +++ b/clients/python-wrapper/lakefs/exceptions.py @@ -13,6 +13,12 @@ class LakeFSException(Exception): """ Base exception for all SDK exceptions """ + + +class ServerException(LakeFSException): + """ + Generic exception when no other exception is applicable + """ status_code: int reason: str @@ -21,7 +27,7 @@ def __init__(self, status=None, reason=None): self.message = reason -class NotFoundException(LakeFSException): +class NotFoundException(ServerException): """ Resource could not be found on lakeFS server """ @@ -32,37 +38,31 @@ def __init__(self, status=None, reason=None): super().__init__(status, reason) -class ForbiddenException(LakeFSException): +class ForbiddenException(ServerException): """ Operation not permitted """ -class NoAuthenticationFound(LakeFSException): +class NoAuthenticationFound(ServerException): """ Raised when no authentication method could be found on Client instantiation """ -class NotAuthorizedException(LakeFSException): +class NotAuthorizedException(ServerException): """ User not authorized to perform operation """ -class ServerException(LakeFSException): - """ - Generic exception when no other exception is applicable - """ - - -class UnsupportedOperationException(LakeFSException): +class UnsupportedOperationException(ServerException): """ Operation not supported by lakeFS server or SDK """ -class ConflictException(LakeFSException): +class ConflictException(ServerException): """ Resource / request conflict """ @@ -74,7 +74,7 @@ class ObjectNotFoundException(NotFoundException, FileNotFoundError): """ -class ObjectExistsException(LakeFSException, FileExistsError): +class ObjectExistsException(ServerException, FileExistsError): """ Raised when Object('...').create(mode='x') and object exists """ @@ -86,12 +86,18 @@ class PermissionException(NotAuthorizedException, PermissionError): """ -class InvalidRangeException(LakeFSException, OSError): +class InvalidRangeException(ServerException, OSError): """ Raised when the reference could not be found in the lakeFS server """ +class ImportManagerException(LakeFSException): + """ + Import manager exceptions that are not originated from the SDK + """ + + _STATUS_CODE_TO_EXCEPTION = { http.HTTPStatus.UNAUTHORIZED.value: NotAuthorizedException, http.HTTPStatus.FORBIDDEN.value: ForbiddenException, diff --git a/clients/python-wrapper/lakefs/import_manager.py b/clients/python-wrapper/lakefs/import_manager.py index 2b9684b9b7d..0ff4490864b 100644 --- a/clients/python-wrapper/lakefs/import_manager.py +++ b/clients/python-wrapper/lakefs/import_manager.py @@ -1,23 +1,186 @@ """ -Module implementing import logic +Import module provides a simpler interface to the lakeFS SDK import functionality """ -from typing import Optional +from __future__ import annotations + +import asyncio +from datetime import timedelta +from typing import Optional, Dict, List + +import lakefs_sdk + +from lakefs.models import ImportStatus from lakefs.client import Client, DEFAULT_CLIENT +from lakefs.exceptions import ImportManagerException, api_exception_handler + +_PREFIX = "common_prefix" +_OBJECT = "object" class ImportManager: """ - Manage an import operation on a given repository + ImportManager provides an easy-to-use interface to perform imports with multiple sources. + It provides both synchronous and asynchronous functionality allowing the user to start an import process, + continue executing logic and poll for the import completion. """ + _client: Client + _repo_id: str + _branch_id: str + _in_progress: bool = False + _import_id: str = None + commit_message: str + commit_metadata: Optional[Dict] + sources: List[lakefs_sdk.ImportLocation] - def __init__(self, repository_id: str, reference_id: str, commit_message: Optional[str] = None, - metadata: dict = None, - client: Client = DEFAULT_CLIENT): + def __init__(self, repository_id: str, branch_id: str, commit_message: Optional[str] = "", + commit_metadata: Optional[Dict] = None, client: Optional[Client] = DEFAULT_CLIENT) -> None: self._client = client self._repo_id = repository_id - self._ref_id = reference_id - self._commit_message = commit_message - self._metadata = metadata + self._branch_id = branch_id + self.commit_message = commit_message + self.commit_metadata = commit_metadata + self.sources = [] + + @property + def import_id(self) -> str: + """ + Returns the id of the current import process + """ + return self._import_id + + def prefix(self, object_store_uri: str, destination: str) -> ImportManager: + """ + Creates a new import source of type "common_prefix" and adds it to the list of sources + + :param object_store_uri: The URI from which to import the objects + :param destination: The destination prefix relative to the branch + :return: The ImportManager instance (self) after update, to allow operations chaining + """ + self.sources.append(lakefs_sdk.ImportLocation(type=_PREFIX, path=object_store_uri, destination=destination)) + return self + + def object(self, object_store_uri: str, destination: str) -> ImportManager: + """ + Creates a new import source of type "object" and adds it to the list of sources + + :param object_store_uri: The URI from which to import the object + :param destination: The destination path for the object relative to the branch + :return: The ImportManager instance (self) after update, to allow operations chaining + """ + self.sources.append(lakefs_sdk.ImportLocation(type=_OBJECT, path=object_store_uri, destination=destination)) + return self + + def start(self) -> str: + """ + Start import, reporting back (and storing) a process id + + :return: The import process identifier in lakeFS + :raises: + ImportManagerException if an import process is already in progress + NotFoundException if branch or repository do not exist + NotAuthorizedException if user is not authorized to perform this operation + ValidationError if path_type is not one of the allowed values + ServerException for any other errors + """ + if self._in_progress: + raise ImportManagerException("Import in progress") + + creation = lakefs_sdk.ImportCreation(paths=self.sources, + commit=lakefs_sdk.CommitCreation(message=self.commit_message, + metadata=self.commit_metadata)) + with api_exception_handler(): + res = self._client.sdk_client.import_api.import_start(repository=self._repo_id, + branch=self._branch_id, + import_creation=creation) + self._import_id = res.id + self._in_progress = True + + return self._import_id + + async def _wait_for_completion(self, poll_interval: timedelta) -> lakefs_sdk.ImportStatus: + while True: + with api_exception_handler(): + resp = self._client.sdk_client.import_api.import_status(repository=self._repo_id, + branch=self._branch_id, + id=self._import_id) + if resp.completed: + return resp + if resp.error is not None: + raise ImportManagerException(f"Import Error: {resp.error.message}") + + await asyncio.sleep(poll_interval.total_seconds()) + + def wait(self, poll_interval: Optional[timedelta] = timedelta(seconds=2)) -> ImportStatus: + """ + Poll a started import task ID, blocking until completion + + :param poll_interval: The interval for polling the import status. + :return: Import status as returned by the lakeFS server + :raises: + ImportManagerException if no import is in progress + NotFoundException if branch, repository or import id do not exist + NotAuthorizedException if user is not authorized to perform this operation + ServerException for any other errors + """ + if not self._in_progress: + raise ImportManagerException("No import in progress") + + res = asyncio.run(self._wait_for_completion(poll_interval)) + self._in_progress = False + self.sources = [] + return ImportStatus(**res.dict()) + + def run(self, poll_interval: Optional[timedelta] = None) -> ImportStatus: + """ + Same as calling start() and then wait() + + :param poll_interval: The interval for polling the import status. + :return: Import status as returned by the lakeFS server + :raises: + See start(), wait() + """ + self.start() + wait_kwargs = {} if poll_interval is None else {"poll_interval": poll_interval} + return self.wait(**wait_kwargs) + + def cancel(self) -> None: + """ + Cancel an ongoing import process + + :raises: + NotFoundException if branch, repository or import id do not exist + NotAuthorizedException if user is not authorized to perform this operation + ConflictException if the import was already completed + ServerException for any other errors + """ + if self._import_id is None: # Can't cancel on no id + raise ImportManagerException("No import in progress") + + with api_exception_handler(): + self._client.sdk_client.import_api.import_cancel(repository=self._repo_id, + branch=self._branch_id, + id=self._import_id) + self._in_progress = False + self.sources = [] + + def status(self) -> ImportStatus: + """ + Get the current import status + + :return: Import status as returned by the lakeFS server + :raises: + ImportManagerException if no import is in progress + NotFoundException if branch, repository or import id do not exist + NotAuthorizedException if user is not authorized to perform this operation + ServerException for any other errors + """ + + if self._import_id is None: + raise ImportManagerException("No import in progress") - # TODO: Implement + with api_exception_handler(): + res = self._client.sdk_client.import_api.import_status(repository=self._repo_id, + branch=self._branch_id, + id=self._import_id) + return ImportStatus(**res.dict()) diff --git a/clients/python-wrapper/lakefs/models.py b/clients/python-wrapper/lakefs/models.py new file mode 100644 index 00000000000..11e93836415 --- /dev/null +++ b/clients/python-wrapper/lakefs/models.py @@ -0,0 +1,99 @@ +""" +Module containing all of lakeFS data models +""" + +from __future__ import annotations + +from datetime import datetime +from typing import List, Optional, Literal + +from lakefs.namedtuple import LenientNamedTuple + + +class Commit(LenientNamedTuple): + """ + NamedTuple representing a lakeFS commit's properties + """ + id: str + parents: List[str] + committer: str + message: str + creation_date: int + meta_range_id: str + metadata: Optional[dict[str, str]] = None + + +class Change(LenientNamedTuple): + """ + NamedTuple representing a diff change between two refs in lakeFS + """ + type: Literal["added", "removed", "changed", "conflict", "prefix_changed"] + path: str + path_type: Literal["common_prefix", "object"] + size_bytes: Optional[int] + + +class ImportStatus(LenientNamedTuple): + """ + NamedTuple representing an ongoing import's status in lakeFS + """ + + class _Error(LenientNamedTuple): + message: str + + completed: bool + update_time: datetime + ingested_objects: Optional[int] + metarange_id: Optional[str] + commit: Optional[Commit] + error: Optional[_Error] + + def __init__(self, **kwargs): + commit = kwargs.get("commit") + if commit is not None: + kwargs["commit"] = Commit(**commit) + + error = kwargs.get("error") + if error is not None: + kwargs["error"] = ImportStatus._Error(**error) + + super().__init__(**kwargs) + + +class ServerStorageConfiguration(LenientNamedTuple): + """ + Represent a lakeFS server's storage configuration + """ + blockstore_type: str + pre_sign_support: bool + import_support: bool + blockstore_namespace_example: str + blockstore_namespace_validity_regex: str + pre_sign_support_ui: bool + import_validity_regex: str + default_namespace_prefix: Optional[str] = None + + +class ObjectStats(LenientNamedTuple): + """ + Represent a lakeFS object's stats + """ + path: str + path_type: str + physical_address: str + checksum: str + mtime: int + physical_address_expiry: Optional[int] = None + size_bytes: Optional[int] = None + metadata: Optional[dict[str, str]] = None + content_type: Optional[str] = None + + +class RepositoryProperties(LenientNamedTuple): + """ + Represent a lakeFS repository's properties + """ + id: str + creation_date: int + default_branch: str + storage_namespace: str diff --git a/clients/python-wrapper/lakefs/namedtuple.py b/clients/python-wrapper/lakefs/namedtuple.py index 1171b5efe85..79631425ca4 100644 --- a/clients/python-wrapper/lakefs/namedtuple.py +++ b/clients/python-wrapper/lakefs/namedtuple.py @@ -23,6 +23,7 @@ def __init__(self, **kwargs): if len(fields) > 0: raise TypeError(f"missing {len(fields)} required arguments: {fields}") + self.__initialized = True super().__init__() diff --git a/clients/python-wrapper/lakefs/object.py b/clients/python-wrapper/lakefs/object.py index 2d560b09551..adb44e9e43e 100644 --- a/clients/python-wrapper/lakefs/object.py +++ b/clients/python-wrapper/lakefs/object.py @@ -30,7 +30,7 @@ PermissionException, ObjectExistsException, ) -from lakefs.namedtuple import LenientNamedTuple +from lakefs.models import ObjectStats _LAKEFS_METADATA_PREFIX = "x-lakefs-meta-" # _BUFFER_SIZE - Writer buffer size. While buffer size not exceed, data will be maintained in memory and file will @@ -41,21 +41,6 @@ WriteModes = Literal['x', 'xb', 'w', 'wb'] -class ObjectStats(LenientNamedTuple): - """ - Represent a lakeFS object's stats - """ - path: str - path_type: str - physical_address: str - checksum: str - mtime: int - physical_address_expiry: Optional[int] = None - size_bytes: Optional[int] = None - metadata: Optional[dict[str, str]] = None - content_type: Optional[str] = None - - class LakeFSIOBase(IO): """ Base class for the lakeFS Reader and Writer classes diff --git a/clients/python-wrapper/lakefs/reference.py b/clients/python-wrapper/lakefs/reference.py index 19aead90839..36c77fd9ed7 100644 --- a/clients/python-wrapper/lakefs/reference.py +++ b/clients/python-wrapper/lakefs/reference.py @@ -4,40 +4,17 @@ from __future__ import annotations -from typing import Optional, Generator, Literal, List +from typing import Optional, Generator import lakefs_sdk +from lakefs.models import Commit, Change from lakefs.client import Client, DEFAULT_CLIENT from lakefs.exceptions import api_exception_handler from lakefs.object import StoredObject -from lakefs.namedtuple import LenientNamedTuple from lakefs.object_manager import ObjectManager -class Commit(LenientNamedTuple): - """ - NamedTuple representing a lakeFS commit's properties - """ - id: str - parents: List[str] - committer: str - message: str - creation_date: int - meta_range_id: str - metadata: Optional[dict[str, str]] = None - - -class Change(LenientNamedTuple): - """ - NamedTuple representing a diff change between two refs in lakeFS - """ - type: Literal["added", "removed", "changed", "conflict", "prefix_changed"] - path: str - path_type: Literal["common_prefix", "object"] - size_bytes: Optional[int] - - class Reference: """ Class representing a reference in lakeFS. diff --git a/clients/python-wrapper/lakefs/repository.py b/clients/python-wrapper/lakefs/repository.py index 61f74ba12e5..9bf1feabf29 100644 --- a/clients/python-wrapper/lakefs/repository.py +++ b/clients/python-wrapper/lakefs/repository.py @@ -5,26 +5,17 @@ from __future__ import annotations from typing import Optional, Generator + import lakefs_sdk +from lakefs.models import RepositoryProperties from lakefs.tag import Tag from lakefs.branch import Branch from lakefs.client import Client, DEFAULT_CLIENT from lakefs.exceptions import api_exception_handler, ConflictException, LakeFSException -from lakefs.namedtuple import LenientNamedTuple from lakefs.reference import Reference, generate_listing -class RepositoryProperties(LenientNamedTuple): - """ - Represent a lakeFS repository's properties - """ - id: str - creation_date: int - default_branch: str - storage_namespace: str - - class Repository: """ Class representing a Repository in lakeFS. diff --git a/clients/python-wrapper/tests/integration/test_import.py b/clients/python-wrapper/tests/integration/test_import.py new file mode 100644 index 00000000000..caa22e515fb --- /dev/null +++ b/clients/python-wrapper/tests/integration/test_import.py @@ -0,0 +1,76 @@ +from time import sleep + +from lakefs.exceptions import ImportManagerException, ConflictException +from tests.utests.common import expect_exception_context + +_IMPORT_PATH = "s3://esti-system-testing-data/import-test-data/" + +_FILES_TO_CHECK = ["nested/prefix-1/file002005", + "nested/prefix-2/file001894", + "nested/prefix-3/file000005", + "nested/prefix-4/file000645", + "nested/prefix-5/file001566", + "nested/prefix-6/file002011", + "nested/prefix-7/file000101", ] + + +def test_import_manager(setup_repo): + _, repo = setup_repo + branch = repo.branch("import-branch").create("main") + mgr = branch.import_data(commit_message="my imported data", metadata={"foo": "bar"}) + + # No import running + with expect_exception_context(ImportManagerException): + mgr.cancel() + + # empty import + res = mgr.run() + assert res.error is None + assert res.completed + assert res.commit.id == branch.commit_id() + assert res.commit.message == "my imported data" + assert res.commit.metadata.get("foo") == "bar" + assert res.ingested_objects == 0 + + # Import with objects and prefixes + dest_prefix = "imported/new-prefix/" + mgr.prefix(_IMPORT_PATH + "prefix-1/", + dest_prefix + "prefix-1/").prefix(_IMPORT_PATH + "prefix-2/", + dest_prefix + "prefix-2/") + for o in _FILES_TO_CHECK: + mgr.object(_IMPORT_PATH + o, dest_prefix + o) + + mgr.commit_message = "new commit" + mgr.commit_metadata = None + res = mgr.run() + assert res.error is None + assert res.completed + assert res.commit.id == branch.commit_id() + assert res.commit.message == mgr.commit_message + assert res.commit.metadata.get("foo") is None + assert res.ingested_objects == 4207 + + # Conflict since import completed + with expect_exception_context(ConflictException): + mgr.cancel() + + +def test_import_manager_cancel(setup_repo): + _, repo = setup_repo + branch = repo.branch("import-branch").create("main") + expected_commit_id = branch.commit_id() + expected_commit_message = branch.commit_message() + + mgr = branch.import_data("my imported data", metadata={"foo": "bar"}) + mgr.prefix(_IMPORT_PATH, "import/") + + mgr.start() + sleep(1) + mgr.cancel() + + status = mgr.status() + assert branch.commit_id() == expected_commit_id + assert branch.commit_message() == expected_commit_message + assert not status.completed + assert "Canceled" in status.error.message + assert len(mgr.sources) == 0 diff --git a/clients/python-wrapper/tests/utests/test_import.py b/clients/python-wrapper/tests/utests/test_import.py new file mode 100644 index 00000000000..efbedc5824d --- /dev/null +++ b/clients/python-wrapper/tests/utests/test_import.py @@ -0,0 +1,54 @@ +import time +from datetime import timedelta + +import lakefs_sdk + +from lakefs.exceptions import ImportManagerException +from lakefs.import_manager import ImportManager +from tests.utests.common import get_test_client, expect_exception_context + + +class TestImportManager: + def test_import_start_wait(self, monkeypatch): + clt = get_test_client() + mgr = ImportManager("test_repo", "test_branch", client=clt) + + # Wait before start + with expect_exception_context(ImportManagerException): + mgr.wait() + + with monkeypatch.context(): + def monkey_import_start(*_, **__): + return lakefs_sdk.ImportCreationResponse(id="import_id") + + monkeypatch.setattr(lakefs_sdk.ImportApi, "import_start", monkey_import_start) + res = mgr.start() + assert res == "import_id" + + # try again and expect import in progress + with expect_exception_context(ImportManagerException): + mgr.start() + + requests = 5 + status = lakefs_sdk.ImportStatus(completed=False, + update_time=time.time(), + ingested_objects=500, + metarange_id=None, + commit=None, + error=None) + + def monkey_import_status(*_, **__): + nonlocal requests, status + requests -= 1 + if requests == 0: + status.completed = True + return status + + monkeypatch.setattr(lakefs_sdk.ImportApi, "import_status", monkey_import_status) + res = mgr.wait(poll_interval=timedelta()) + assert res.completed + assert requests == 0 + + # try again and expect error + with expect_exception_context(ImportManagerException): + mgr.wait() diff --git a/clients/python-wrapper/tests/utests/test_repository.py b/clients/python-wrapper/tests/utests/test_repository.py index 60401c4c523..50b4c8bc72e 100644 --- a/clients/python-wrapper/tests/utests/test_repository.py +++ b/clients/python-wrapper/tests/utests/test_repository.py @@ -6,7 +6,7 @@ from tests.utests.common import get_test_repo, TEST_REPO_ARGS, expect_exception_context from lakefs.exceptions import ServerException, NotAuthorizedException, NotFoundException, ConflictException -from lakefs.repository import RepositoryProperties +from lakefs import RepositoryProperties def test_repository_creation(monkeypatch):