Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python Wrapper: Implement Import Manager #7084

Merged
merged 4 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions clients/python-wrapper/docs/lakefs.import_manager.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
lakefs.import\_manager module
=============================

.. automodule:: lakefs.import_manager
:members:
:undoc-members:
:show-inheritance:
7 changes: 7 additions & 0 deletions clients/python-wrapper/docs/lakefs.models.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
lakefs.models module
====================

.. automodule:: lakefs.models
:members:
:undoc-members:
:show-inheritance:
2 changes: 2 additions & 0 deletions clients/python-wrapper/lakefs/.pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ max-line-length=120
max-locals=25
# Maximum number of arguments for function / method
max-args=10
# Maximum number of class attributes
max-attributes=10

[MESSAGES CONTROL]
disable=too-few-public-methods, fixme
Expand Down
5 changes: 3 additions & 2 deletions clients/python-wrapper/lakefs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
Allow importing of models from package root
"""

from lakefs.repository import Repository, RepositoryProperties
from lakefs.reference import Reference, Commit, Change
from lakefs.repository import Repository
from lakefs.reference import Reference
from lakefs.models import Commit, Change, RepositoryProperties
from lakefs.tag import Tag
from lakefs.branch import Branch
from lakefs.object import StoredObject, WriteableObject, ObjectReader
Expand Down
3 changes: 2 additions & 1 deletion clients/python-wrapper/lakefs/branch.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
from lakefs.object import WriteableObject
from lakefs.object import StoredObject
from lakefs.import_manager import ImportManager
from lakefs.reference import Reference, Change, generate_listing
from lakefs.reference import Reference, generate_listing
from lakefs.models import Change
from lakefs.exceptions import api_exception_handler, ConflictException, LakeFSException


Expand Down
16 changes: 1 addition & 15 deletions clients/python-wrapper/lakefs/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,12 @@

from lakefs.config import ClientConfig
from lakefs.exceptions import NoAuthenticationFound, NotAuthorizedException, ServerException
from lakefs.namedtuple import LenientNamedTuple
from lakefs.models import ServerStorageConfiguration

# global default client
DEFAULT_CLIENT: Optional[Client] = None


class ServerStorageConfiguration(LenientNamedTuple):
"""
Represent a lakeFS server's storage configuration
"""
blockstore_type: str
pre_sign_support: bool
import_support: bool
blockstore_namespace_example: str
blockstore_namespace_validity_regex: str
pre_sign_support_ui: bool
import_validity_regex: str
default_namespace_prefix: Optional[str] = None


class ServerConfiguration:
"""
Represent a lakeFS server's configuration
Expand Down
2 changes: 2 additions & 0 deletions clients/python-wrapper/lakefs/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
Client configuration module
"""

from __future__ import annotations

import os
from pathlib import Path

Expand Down
34 changes: 20 additions & 14 deletions clients/python-wrapper/lakefs/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ class LakeFSException(Exception):
"""
Base exception for all SDK exceptions
"""


class ServerException(LakeFSException):
"""
Generic exception when no other exception is applicable
"""
status_code: int
reason: str

Expand All @@ -21,7 +27,7 @@ def __init__(self, status=None, reason=None):
self.message = reason


class NotFoundException(LakeFSException):
class NotFoundException(ServerException):
"""
Resource could not be found on lakeFS server
"""
Expand All @@ -32,37 +38,31 @@ def __init__(self, status=None, reason=None):
super().__init__(status, reason)


class ForbiddenException(LakeFSException):
class ForbiddenException(ServerException):
"""
Operation not permitted
"""


class NoAuthenticationFound(LakeFSException):
class NoAuthenticationFound(ServerException):
"""
Raised when no authentication method could be found on Client instantiation
"""


class NotAuthorizedException(LakeFSException):
class NotAuthorizedException(ServerException):
"""
User not authorized to perform operation
"""


class ServerException(LakeFSException):
"""
Generic exception when no other exception is applicable
"""


class UnsupportedOperationException(LakeFSException):
class UnsupportedOperationException(ServerException):
"""
Operation not supported by lakeFS server or SDK
"""


class ConflictException(LakeFSException):
class ConflictException(ServerException):
"""
Resource / request conflict
"""
Expand All @@ -74,7 +74,7 @@ class ObjectNotFoundException(NotFoundException, FileNotFoundError):
"""


class ObjectExistsException(LakeFSException, FileExistsError):
class ObjectExistsException(ServerException, FileExistsError):
"""
Raised when Object('...').create(mode='x') and object exists
"""
Expand All @@ -86,12 +86,18 @@ class PermissionException(NotAuthorizedException, PermissionError):
"""


class InvalidRangeException(LakeFSException, OSError):
class InvalidRangeException(ServerException, OSError):
"""
Raised when the reference could not be found in the lakeFS server
"""


class ImportManagerException(LakeFSException):
    """
    Raised for import manager errors that do not originate from the SDK, such as calling an operation
    in the wrong state (e.g. "No import in progress") or an import that reported an error
    """


_STATUS_CODE_TO_EXCEPTION = {
http.HTTPStatus.UNAUTHORIZED.value: NotAuthorizedException,
http.HTTPStatus.FORBIDDEN.value: ForbiddenException,
Expand Down
183 changes: 173 additions & 10 deletions clients/python-wrapper/lakefs/import_manager.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,186 @@
"""
Module implementing import logic
Import module provides a simpler interface to the lakeFS SDK import functionality
"""
from typing import Optional

from __future__ import annotations

import asyncio
from datetime import timedelta
from typing import Optional, Dict, List

import lakefs_sdk

from lakefs.models import ImportStatus
from lakefs.client import Client, DEFAULT_CLIENT
from lakefs.exceptions import ImportManagerException, api_exception_handler

_PREFIX = "common_prefix"
_OBJECT = "object"


class ImportManager:
"""
Manage an import operation on a given repository
ImportManager provides an easy-to-use interface to perform imports with multiple sources.
It provides both synchronous and asynchronous functionality allowing the user to start an import process,
continue executing logic and poll for the import completion.
"""
_client: Client
_repo_id: str
_branch_id: str
_in_progress: bool = False
_import_id: str = None
commit_message: str
commit_metadata: Optional[Dict]
sources: List[lakefs_sdk.ImportLocation]

def __init__(self, repository_id: str, reference_id: str, commit_message: Optional[str] = None,
metadata: dict = None,
client: Client = DEFAULT_CLIENT):
def __init__(self, repository_id: str, branch_id: str, commit_message: Optional[str] = "",
commit_metadata: Optional[Dict] = None, client: Optional[Client] = DEFAULT_CLIENT) -> None:
self._client = client
self._repo_id = repository_id
self._ref_id = reference_id
self._commit_message = commit_message
self._metadata = metadata
self._branch_id = branch_id
self.commit_message = commit_message
self.commit_metadata = commit_metadata
self.sources = []

@property
def import_id(self) -> Optional[str]:
    """
    Returns the id of the current import process

    :return: The id stored by the last successful start(), or None if start() has not succeeded yet
    """
    return self._import_id

def prefix(self, object_store_uri: str, destination: str) -> ImportManager:
    """
    Register a new "common_prefix" import source on this manager

    :param object_store_uri: The object store URI to import objects from
    :param destination: The prefix, relative to the branch, under which the objects are imported
    :return: This ImportManager (self), so that calls can be chained
    """
    location = lakefs_sdk.ImportLocation(type=_PREFIX, path=object_store_uri, destination=destination)
    self.sources.append(location)
    return self

def object(self, object_store_uri: str, destination: str) -> ImportManager:
    """
    Register a new "object" import source on this manager

    :param object_store_uri: The object store URI of the single object to import
    :param destination: The path, relative to the branch, at which the object is imported
    :return: This ImportManager (self), so that calls can be chained
    """
    location = lakefs_sdk.ImportLocation(type=_OBJECT, path=object_store_uri, destination=destination)
    self.sources.append(location)
    return self

def start(self) -> str:
"""
Start import, reporting back (and storing) a process id

:return: The import process identifier in lakeFS
:raises:
ImportManagerException if an import process is already in progress
NotFoundException if branch or repository do not exist
NotAuthorizedException if user is not authorized to perform this operation
ValidationError if path_type is not one of the allowed values
ServerException for any other errors
"""
if self._in_progress:
raise ImportManagerException("Import in progress")

Comment on lines +86 to +88
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is more of a product question: are we allowed to run an import twice (i.e., call start() and wait() twice)? Currently, every time wait() finishes, the fields are reset and we're ready to do another import with this manager, using the same commit_message and metadata.
Also, the import_id isn't erased, so status() will still return the status of the previous run. Something feels a bit inconsistent.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The import id isn't erased by design, so that you can continue to query the status of the import process even after it has completed.
I was considering making the manager a single-use object, but eventually decided this would just make the user experience more burdensome. My assumption is that if we wanted it to be single-use, it would be better to just replace it with a function call rather than an object.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can always modify this behavior in the future as this is not a guarantee we have to commit to

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can always modify this behavior in the future as this is not a guarantee we have to commit to

creation = lakefs_sdk.ImportCreation(paths=self.sources,
commit=lakefs_sdk.CommitCreation(message=self.commit_message,
metadata=self.commit_metadata))
with api_exception_handler():
res = self._client.sdk_client.import_api.import_start(repository=self._repo_id,
branch=self._branch_id,
import_creation=creation)
self._import_id = res.id
self._in_progress = True

return self._import_id

async def _wait_for_completion(self, poll_interval: timedelta) -> lakefs_sdk.ImportStatus:
    """
    Poll the import status of the stored import id until the import completes or reports an error

    :param poll_interval: Time to sleep between two consecutive status requests
    :return: The first status response whose "completed" flag is set
    :raises:
        ImportManagerException if a status response that is not completed carries an error
    """
    finished = None
    while finished is None:
        with api_exception_handler():
            current = self._client.sdk_client.import_api.import_status(repository=self._repo_id,
                                                                       branch=self._branch_id,
                                                                       id=self._import_id)
            if current.completed:
                # Completion is checked before the error field, so a completed response is always returned
                finished = current
            elif current.error is not None:
                raise ImportManagerException(f"Import Error: {current.error.message}")

        if finished is None:
            await asyncio.sleep(poll_interval.total_seconds())

    return finished

def wait(self, poll_interval: Optional[timedelta] = timedelta(seconds=2)) -> ImportStatus:
    """
    Poll a started import task ID, blocking until completion

    :param poll_interval: The interval for polling the import status. None falls back to the default of 2 seconds.
    :return: Import status as returned by the lakeFS server
    :raises:
        ImportManagerException if no import is in progress, or if the import reported an error
        NotFoundException if branch, repository or import id do not exist
        NotAuthorizedException if user is not authorized to perform this operation
        ServerException for any other errors
    """
    if not self._in_progress:
        raise ImportManagerException("No import in progress")

    # The signature allows None, but the polling coroutine needs a real timedelta to sleep on
    if poll_interval is None:
        poll_interval = timedelta(seconds=2)

    # asyncio.run() refuses to run in a thread that already has a running event loop (e.g. notebooks)
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        loop_running = False
    else:
        loop_running = True

    try:
        if loop_running:
            # Drive the polling coroutine on a fresh loop in a worker thread; this call still blocks
            from concurrent.futures import ThreadPoolExecutor  # pylint: disable=import-outside-toplevel
            with ThreadPoolExecutor(max_workers=1) as pool:
                res = pool.submit(asyncio.run, self._wait_for_completion(poll_interval)).result()
        else:
            res = asyncio.run(self._wait_for_completion(poll_interval))
    except ImportManagerException:
        # The import itself reported an error, so it is no longer running. Clear the flag so that start()
        # can be called again; the sources are kept so the same import can be retried.
        self._in_progress = False
        raise

    self._in_progress = False
    self.sources = []
    return ImportStatus(**res.dict())

def run(self, poll_interval: Optional[timedelta] = None) -> ImportStatus:
    """
    Start the import and block until it finishes; equivalent to start() followed by wait()

    :param poll_interval: The interval for polling the import status. When None, wait() uses its own default.
    :return: Import status as returned by the lakeFS server
    :raises:
        See start(), wait()
    """
    self.start()
    if poll_interval is None:
        # Leave the argument out so that wait() applies its default interval
        return self.wait()
    return self.wait(poll_interval=poll_interval)

def cancel(self) -> None:
    """
    Cancel the import identified by the stored import id and reset the manager's local state

    :raises:
        ImportManagerException if this manager holds no import id (no import was started)
        NotFoundException if branch, repository or import id do not exist
        NotAuthorizedException if user is not authorized to perform this operation
        ConflictException if the import was already completed
        ServerException for any other errors
    """
    import_id = self._import_id
    if import_id is None:  # Without an id there is nothing to ask the server to cancel
        raise ImportManagerException("No import in progress")

    with api_exception_handler():
        import_api = self._client.sdk_client.import_api
        import_api.import_cancel(repository=self._repo_id, branch=self._branch_id, id=import_id)
        # Reached only when the cancel request did not raise
        self._in_progress = False
        self.sources = []

def status(self) -> ImportStatus:
    """
    Fetch the status of the import identified by the stored import id

    :return: Import status as returned by the lakeFS server
    :raises:
        ImportManagerException if this manager holds no import id (no import was started)
        NotFoundException if branch, repository or import id do not exist
        NotAuthorizedException if user is not authorized to perform this operation
        ServerException for any other errors
    """
    if self._import_id is None:
        raise ImportManagerException("No import in progress")

    with api_exception_handler():
        import_api = self._client.sdk_client.import_api
        raw_status = import_api.import_status(repository=self._repo_id,
                                              branch=self._branch_id,
                                              id=self._import_id)
        return ImportStatus(**raw_status.dict())
Loading