Skip to content

Commit

Permalink
[DOP-22130] Add support for FileModifiedTimeHWM
Browse files Browse the repository at this point in the history
  • Loading branch information
dolfinus committed Jan 28, 2025
1 parent bfdda89 commit 12f8442
Show file tree
Hide file tree
Showing 9 changed files with 336 additions and 142 deletions.
1 change: 1 addition & 0 deletions docs/changelog/next_release/331.dependency.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Minimal ``etl-entities`` version is now `2.5.0 <https://github.com/MobileTeleSystems/etl-entities/releases/tag/2.5.0>`_.
19 changes: 9 additions & 10 deletions onetl/connection/file_connection/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io
import os
import textwrap
from contextlib import suppress
from logging import getLogger
from typing import Optional

Expand Down Expand Up @@ -203,16 +204,14 @@ def _download_file(self, remote_file_path: RemotePath, local_file_path: LocalPat
def _get_stat(self, path: RemotePath) -> RemotePathStat:
path_str = self._delete_absolute_path_slash(path)

try:
stat = self.client.stat_object(self.bucket, path_str)
return RemotePathStat(
st_size=stat.size or 0,
st_mtime=stat.last_modified.timestamp() if stat.last_modified else None,
st_uid=stat.owner_name or stat.owner_id,
)

except Exception: # noqa: B001,E722
return RemotePathStat()
with suppress(Exception):
# for some reason, client.stat_object returns less precise st_mtime than client.list_objects
objects = self.client.list_objects(self.bucket, prefix=path_str)
for obj in objects:
if obj.object_name == path_str:
return self._extract_stat_from_entry(path.parent, obj)

return RemotePathStat()

def _remove_file(self, remote_file_path: RemotePath) -> None:
path_str = self._delete_absolute_path_slash(remote_file_path)
Expand Down
35 changes: 30 additions & 5 deletions onetl/file/file_downloader/file_downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ class FileDownloader(FrozenModel):
# and stop before downloading 101 file
downloader.run()
Incremental download:
Incremental download (by tracking list of file paths):
.. code:: python
Expand All @@ -222,12 +222,37 @@ class FileDownloader(FrozenModel):
connection=sftp,
source_path="/path/to/remote/source",
local_path="/path/to/local",
hwm=FileListHWM(
name="my_unique_hwm_name", directory="/path/to/remote/source"
), # mandatory for IncrementalStrategy
hwm=FileListHWM( # mandatory for IncrementalStrategy
name="my_unique_hwm_name",
),
)
# download files to "/path/to/local", but only new ones
# download files to "/path/to/local", but only added since previous run
with IncrementalStrategy():
downloader.run()
Incremental download (by tracking file modification time):
.. code:: python
from onetl.connection import SFTP
from onetl.file import FileDownloader
from onetl.strategy import IncrementalStrategy
from etl_entities.hwm import FileModifiedTimeHWM
sftp = SFTP(...)
# create downloader
downloader = FileDownloader(
connection=sftp,
source_path="/path/to/remote/source",
local_path="/path/to/local",
hwm=FileModifiedTimeHWM( # mandatory for IncrementalStrategy
name="my_unique_hwm_name",
),
)
# download files to "/path/to/local", but only modified/created since previous run
with IncrementalStrategy():
downloader.run()
Expand Down
108 changes: 100 additions & 8 deletions onetl/strategy/incremental_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class IncrementalStrategy(HWMStrategy):
},
)
Then the downloaded files list is saved as ``FileListHWM`` object into :ref:`HWM Store <hwm>`:
Then the list of original file paths is saved as ``FileListHWM`` object into :ref:`HWM Store <hwm>`:
.. code:: python
Expand All @@ -84,7 +84,7 @@ class IncrementalStrategy(HWMStrategy):
],
)
Next incremental run will download only new files from the source:
Next incremental run will download only new files which were added to the source since previous run:
.. code:: bash
Expand All @@ -96,28 +96,89 @@ class IncrementalStrategy(HWMStrategy):
.. code:: python
# only files which are not in FileListHWM
# only files which are not covered by FileListHWM
DownloadResult(
...,
successful={
LocalFile("/downloaded/file3"),
},
)
New files will be added to the ``FileListHWM`` and saved to :ref:`HWM Store <hwm>`:
Value of ``FileListHWM`` will be updated and saved to :ref:`HWM Store <hwm>`:
.. code:: python
FileListHWM(
...,
entity="/path",
directory="/path",
value=[
"/path/my/file1",
"/path/my/file2",
"/path/my/file3",
],
)
``hwm=FileModifiedTimeHWM(...)``:
First incremental run is just the same as :obj:`SnapshotStrategy <onetl.strategy.snapshot_strategy.SnapshotStrategy>` -
all files are downloaded:
.. code:: bash
$ hdfs dfs -ls /path
/path/my/file1
/path/my/file2
.. code:: python
DownloadResult(
...,
successful={
LocalFile("/downloaded/file1"),
LocalFile("/downloaded/file2"),
},
)
Then the maximum modified time of original files is saved as ``FileModifiedTimeHWM`` object into :ref:`HWM Store <hwm>`:
.. code:: python
FileModifiedTimeHWM(
...,
directory="/path",
value=datetime.datetime(2025, 1, 1, 11, 22, 33, 456789, tzinfo=timezone.utc),
)
Next incremental run will download only files from the source which were modified or created since previous run:
.. code:: bash
$ hdfs dfs -ls /path
/path/my/file1
/path/my/file2
/path/my/file3
.. code:: python
# only files which are not covered by FileModifiedTimeHWM
DownloadResult(
...,
successful={
LocalFile("/downloaded/file3"),
},
)
Value of ``FileModifiedTimeHWM`` will be updated and and saved to :ref:`HWM Store <hwm>`:
.. code:: python
FileModifiedTimeHWM(
...,
directory="/path",
value=datetime.datetime(2025, 1, 1, 22, 33, 44, 567890, tzinfo=timezone.utc),
)
.. warning::
FileDownload updates HWM in HWM Store at the end of ``.run()`` call,
Expand Down Expand Up @@ -177,7 +238,7 @@ class IncrementalStrategy(HWMStrategy):
.. warning::
Cannot be used with :ref:`file-downloader` and ``hwm=FileListHWM(...)``
Cannot be used with :ref:`file-downloader`
.. note::
Expand Down Expand Up @@ -332,13 +393,44 @@ class IncrementalStrategy(HWMStrategy):
connection=sftp,
source_path="/remote",
local_path="/local",
hwm=FileListHWM(name="some_hwm_name"),
hwm=FileListHWM( # mandatory for IncrementalStrategy
name="my_unique_hwm_name",
),
)
with IncrementalStrategy():
df = downloader.run()
# current run will download only files which were added since previous run
Incremental run with :ref:`file-downloader` and ``hwm=FileModifiedTimeHWM(...)``:
.. code:: python
from onetl.connection import SFTP
from onetl.file import FileDownloader
from onetl.strategy import SnapshotStrategy
from etl_entities.hwm import FileModifiedTimeHWM
sftp = SFTP(
host="sftp.domain.com",
user="user",
password="*****",
)
downloader = FileDownloader(
connection=sftp,
source_path="/remote",
local_path="/local",
hwm=FileModifiedTimeHWM( # mandatory for IncrementalStrategy
name="my_unique_hwm_name",
),
)
with IncrementalStrategy():
df = downloader.run()
# current run will download only files which were not downloaded in previous runs
# current run will download only files which were modified/created since previous run
"""

hwm: Optional[HWM] = None
Expand Down
2 changes: 1 addition & 1 deletion requirements/core.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
etl-entities>=2.2,<2.5
etl-entities>=2.5,<2.6
evacuator>=1.0,<1.1
frozendict
humanize
Expand Down
Loading

0 comments on commit 12f8442

Please sign in to comment.