Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed Up Publishing Times #1120

Open
wants to merge 15 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 51 additions & 31 deletions client/ayon_core/lib/file_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,42 +108,62 @@ def add(self, src, dst, mode=MODE_COPY):

self._transfers[dst] = (src, opts)


def _process_futures(self, futures):
"""Wait for futures and raise exceptions if any task fails."""
try:
for future in concurrent.futures.as_completed(futures):
future.result() # If an exception occurs, it will be raised here
except Exception as e:
print(f"File Transaction task failed with error: {e}", file=sys.stderr)
raise

def process(self):
# Backup any existing files
for dst, (src, _) in self._transfers.items():
self.log.debug("Checking file ... {} -> {}".format(src, dst))
path_same = self._same_paths(src, dst)
if path_same or not os.path.exists(dst):
continue

# Backup original file
# todo: add timestamp or uuid to ensure unique
backup = dst + ".bak"
self._backup_to_original[backup] = dst
try:
with concurrent.futures.ThreadPoolExecutor(max_workers=min(8, len(self._transfers))) as executor:
# Submit backup tasks
backup_futures = [executor.submit(self._backup_file, dst, src) for dst, (src, _) in
self._transfers.items()]
self._process_futures(backup_futures)

# Submit transfer tasks
transfer_futures = [executor.submit(self._transfer_file, dst, src, opts) for dst, (src, opts) in
self._transfers.items()]
self._process_futures(transfer_futures)

except Exception as e:
print(f"File Transaction Failed: {e}", file=sys.stderr)
sys.exit(1)

def _backup_file(self, dst, src):
self.log.debug(f"Checking file ... {src} -> {dst}")
path_same = self._same_paths(src, dst)
if path_same or not os.path.exists(dst):
return

# Backup original file
backup = dst + ".bak"
self._backup_to_original[backup] = dst
self.log.debug(f"Backup existing file: {dst} -> {backup}")
os.rename(dst, backup)

def _transfer_file(self, dst, src, opts):
path_same = self._same_paths(src, dst)
if path_same:
self.log.debug(
"Backup existing file: {} -> {}".format(dst, backup))
os.rename(dst, backup)

# Copy the files to transfer
for dst, (src, opts) in self._transfers.items():
path_same = self._same_paths(src, dst)
if path_same:
self.log.debug(
"Source and destination are same files {} -> {}".format(
src, dst))
continue
f"Source and destination are same files {src} -> {dst}")
return

self._create_folder_for_file(dst)
self._create_folder_for_file(dst)

if opts["mode"] == self.MODE_COPY:
self.log.debug("Copying file ... {} -> {}".format(src, dst))
copyfile(src, dst)
elif opts["mode"] == self.MODE_HARDLINK:
self.log.debug("Hardlinking file ... {} -> {}".format(
src, dst))
create_hard_link(src, dst)
if opts["mode"] == self.MODE_COPY:
self.log.debug(f"Copying file ... {src} -> {dst}")
copyfile(src, dst)
elif opts["mode"] == self.MODE_HARDLINK:
self.log.debug(f"Hardlinking file ... {src} -> {dst}")
create_hard_link(src, dst)

self._transferred.append(dst)
self._transferred.append(dst)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically this runs in a thread now - and hence it should not do this on an object outside of the thread. However, in Python implementations this should still be safe due to the GIL and how the interpreter locks, etc.

Nonetheless, in a potential non-GIL or free threaded Python 3 world this would be unsafe and instead we should be instead relying on the future.result() instead.


def finalize(self):
# Delete any backed up files
Expand Down
20 changes: 14 additions & 6 deletions client/ayon_core/plugins/publish/integrate_hero_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@
import copy
import errno
import shutil
import sys
# this is needed until speedcopy for linux is fixed
if sys.platform == "win32":
from speedcopy import copyfile
else:
from shutil import copyfile
Comment on lines +8 to +12
Copy link
Collaborator

@BigRoy BigRoy Feb 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is copied from here.

However, we're also using speedcopy in the codebase without that fallback, e.g. here:

Which may hint that either the fallback is redundant nowadays - or we have other areas in the codebase that are potentially buggy on Linux? @iLLiCiTiT @antirotor do you know?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Speedcopy is in dependencies of AYON (

speedcopy = "^2.1"
) and it has its own fallback if it cannot use server-side copy. What is the issue with spedcopy on linux?


import concurrent.futures

import clique
import pyblish.api
Expand Down Expand Up @@ -415,11 +423,11 @@ def integrate_instance(
# Copy(hardlink) paths of source and destination files
# TODO should we *only* create hardlinks?
# TODO should we keep files for deletion until this is successful?
for src_path, dst_path in src_to_dst_file_paths:
self.copy_file(src_path, dst_path)

for src_path, dst_path in other_file_paths_mapping:
self.copy_file(src_path, dst_path)
with concurrent.futures.ThreadPoolExecutor() as executor:
file_futures = []
for src_path, dst_path in src_to_dst_file_paths + other_file_paths_mapping:
file_futures.append(executor.submit(self.copy_file, src_path, dst_path))
concurrent.futures.wait(file_futures)
BigRoy marked this conversation as resolved.
Show resolved Hide resolved

# Update prepared representation etity data with files
# and integrate it to server.
Expand Down Expand Up @@ -648,7 +656,7 @@ def copy_file(self, src_path, dst_path):
src_path, dst_path
))

shutil.copy(src_path, dst_path)
copyfile(src_path, dst_path)

def version_from_representations(self, project_name, repres):
for repre in repres:
Expand Down
Loading