Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed Up Publishing Times #1120

Open
wants to merge 15 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 88 additions & 31 deletions client/ayon_core/lib/file_transaction.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import concurrent.futures
import os
import logging
import sys
import errno
from concurrent.futures import ThreadPoolExecutor, Future
from typing import List, Optional

from ayon_core.lib import create_hard_link

Expand Down Expand Up @@ -109,41 +112,52 @@ def add(self, src, dst, mode=MODE_COPY):
self._transfers[dst] = (src, opts)

def process(self):
# Backup any existing files
for dst, (src, _) in self._transfers.items():
self.log.debug("Checking file ... {} -> {}".format(src, dst))
path_same = self._same_paths(src, dst)
if path_same or not os.path.exists(dst):
continue

# Backup original file
# todo: add timestamp or uuid to ensure unique
backup = dst + ".bak"
self._backup_to_original[backup] = dst
with ThreadPoolExecutor(max_workers=8) as executor:
# Submit backup tasks
backup_futures = [
executor.submit(self._backup_file, dst, src)
for dst, (src, _) in self._transfers.items()
]
as_completed_stop_and_raise_on_error(
executor, backup_futures, logger=self.log)

# Submit transfer tasks
transfer_futures = [
executor.submit(self._transfer_file, dst, src, opts)
for dst, (src, opts) in self._transfers.items()
]
as_completed_stop_and_raise_on_error(
executor, transfer_futures, logger=self.log)

def _backup_file(self, dst, src):
    """Move an existing destination file aside before it is overwritten.

    Does nothing when source and destination resolve to the same file or
    when the destination does not exist yet. Otherwise the destination is
    renamed to a backup path and the mapping is recorded in
    ``self._backup_to_original`` so it can be restored (or purged) later.
    """
    self.log.debug(f"Checking file ... {src} -> {dst}")
    path_same = self._same_paths(src, dst)
    if path_same or not os.path.exists(dst):
        return

    # Backup original file.
    # Probe for a backup path that does not exist yet: a stale ".bak"
    # from a previous run would make `os.rename` fail on Windows and be
    # silently overwritten on POSIX.
    backup = dst + ".bak"
    attempt = 1
    while os.path.exists(backup):
        backup = f"{dst}.bak{attempt}"
        attempt += 1
    self._backup_to_original[backup] = dst
    self.log.debug(f"Backup existing file: {dst} -> {backup}")
    os.rename(dst, backup)

def _transfer_file(self, dst, src, opts):
path_same = self._same_paths(src, dst)
if path_same:
self.log.debug(
"Backup existing file: {} -> {}".format(dst, backup))
os.rename(dst, backup)
f"Source and destination are same files {src} -> {dst}")
return

# Copy the files to transfer
for dst, (src, opts) in self._transfers.items():
path_same = self._same_paths(src, dst)
if path_same:
self.log.debug(
"Source and destination are same files {} -> {}".format(
src, dst))
continue
self._create_folder_for_file(dst)

self._create_folder_for_file(dst)
if opts["mode"] == self.MODE_COPY:
self.log.debug(f"Copying file ... {src} -> {dst}")
copyfile(src, dst)
elif opts["mode"] == self.MODE_HARDLINK:
self.log.debug(f"Hardlinking file ... {src} -> {dst}")
create_hard_link(src, dst)

if opts["mode"] == self.MODE_COPY:
self.log.debug("Copying file ... {} -> {}".format(src, dst))
copyfile(src, dst)
elif opts["mode"] == self.MODE_HARDLINK:
self.log.debug("Hardlinking file ... {} -> {}".format(
src, dst))
create_hard_link(src, dst)

self._transferred.append(dst)
self._transferred.append(dst)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically this runs in a thread now - and hence it should not do this on an object outside of the thread. However, in Python implementations this should still be safe due to the GIL and how the interpreter locks, etc.

Nonetheless, in a potential non-GIL or free-threaded Python 3 world this would be unsafe, and we should instead rely on `future.result()`.


def finalize(self):
# Delete any backed up files
Expand Down Expand Up @@ -212,3 +226,46 @@ def _same_paths(self, src, dst):
return os.stat(src) == os.stat(dst)

return src == dst


def as_completed_stop_and_raise_on_error(
        executor: ThreadPoolExecutor,
        futures: List[Future],
        logger: Optional[logging.Logger] = None):
    """Wait on futures, aborting remaining work as soon as one fails.

    The ``ThreadPoolExecutor`` on its own only cancels *pending* futures on
    exception; futures already running keep going and may fail themselves.
    So on the first observed failure we shut the executor down (cancelling
    what has not started), then collect the exceptions of every future that
    did finish with an error. All of them are logged, but only the last one
    is re-raised.
    """
    log = logger if logger is not None else logging.getLogger(__name__)

    failure_seen = False
    for completed in concurrent.futures.as_completed(futures):
        if completed.exception() is not None:
            # First failure: stop scheduling pending futures. Workers that
            # are already running will still run to completion, and their
            # errors (if any) are gathered below.
            executor.shutdown(wait=True, cancel_futures=True)
            failure_seen = True
            break

    if not failure_seen:
        # Everything completed without raising.
        return

    # Collect the exception of every future that finished and failed up to
    # this point (cancelled futures have no result to inspect).
    errors = [
        fut.exception()
        for fut in futures
        if fut.done() and not fut.cancelled() and fut.exception() is not None
    ]

    # Log every worker failure so none of them is lost ...
    for err in errors:
        log.error("Error occurred in worker", exc_info=err)

    # ... but re-raise only the last one.
    raise errors[-1]
24 changes: 18 additions & 6 deletions client/ayon_core/plugins/publish/integrate_hero_version.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
import os
import copy
import errno
import itertools
import shutil
import sys
from concurrent.futures import ThreadPoolExecutor
# this is needed until speedcopy for linux is fixed
if sys.platform == "win32":
from speedcopy import copyfile
else:
from shutil import copyfile
Comment on lines +8 to +12
Copy link
Collaborator

@BigRoy BigRoy Feb 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is copied from here.

However, we're also using speedcopy in the codebase without that fallback, e.g. here:

Which may hint that either the fallback is redundant nowadays - or we have other areas in the codebase that are potentially buggy on Linux? @iLLiCiTiT @antirotor do you know?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Speedcopy is in dependencies of AYON (

speedcopy = "^2.1"
) and it has its own fallback if it cannot use server-side copy. What is the issue with spedcopy on linux?


import clique
import pyblish.api
Expand All @@ -13,6 +21,7 @@
from ayon_api.utils import create_entity_id

from ayon_core.lib import create_hard_link, source_hash
from ayon_core.lib.file_transaction import as_completed_stop_and_raise_on_error
from ayon_core.pipeline.publish import (
get_publish_template_name,
OptionalPyblishPluginMixin,
Expand Down Expand Up @@ -415,11 +424,14 @@ def integrate_instance(
# Copy(hardlink) paths of source and destination files
# TODO should we *only* create hardlinks?
# TODO should we keep files for deletion until this is successful?
for src_path, dst_path in src_to_dst_file_paths:
self.copy_file(src_path, dst_path)

for src_path, dst_path in other_file_paths_mapping:
self.copy_file(src_path, dst_path)
with ThreadPoolExecutor(max_workers=8) as executor:
futures = [
executor.submit(self.copy_file, src_path, dst_path)
for src_path, dst_path
in itertools.chain(src_to_dst_file_paths,
other_file_paths_mapping)
]
as_completed_stop_and_raise_on_error(executor, futures)

# Update prepared representation entity data with files
# and integrate it to server.
Expand Down Expand Up @@ -648,7 +660,7 @@ def copy_file(self, src_path, dst_path):
src_path, dst_path
))

shutil.copy(src_path, dst_path)
copyfile(src_path, dst_path)

def version_from_representations(self, project_name, repres):
for repre in repres:
Expand Down