refactor: downloaders rework #903

Merged
merged 13 commits on Nov 26, 2024
541 changes: 256 additions & 285 deletions poetry.lock

Large diffs are not rendered by default.

92 changes: 91 additions & 1 deletion src/program/managers/event_manager.py
@@ -1,5 +1,7 @@
import os
import sys

🛠️ Refactor suggestion

Remove unused import sys

The sys module imported on line 2 is not used in the current codebase. Since it's only referenced in the commented-out code, please remove this unused import to clean up the code.

Apply this diff to remove the unused import:

-import sys
🧰 Tools
🪛 Ruff (0.8.0)

2-2: sys imported but unused

Remove unused import: sys

(F401)

import threading
import time

🛠️ Refactor suggestion

Remove unused import time

The time module imported on line 4 is not used in the current codebase. It was only used in the commented-out code. Please remove this unused import.

Apply this diff to remove the unused import:

-import time
🧰 Tools
🪛 Ruff (0.8.0)

4-4: time imported but unused

Remove unused import: time

(F401)

import traceback
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime
@@ -170,7 +172,6 @@ def submit_job(self, service, program, event=None):
item (Event, optional): The event item to process. Defaults to None.
"""
log_message = f"Submitting service {service.__name__} to be executed"
item_id = None
# Content services dont provide an event.
if event:
log_message += f" with {event.log_message}"
@@ -186,6 +187,95 @@
sse_manager.publish_event("event_update", self.get_event_updates())
future.add_done_callback(lambda f:self._process_future(f, service))

# For debugging purposes we can monitor the execution time of the service. (comment out above and uncomment below)
# def submit_job(self, service, program, event=None):
# """
# Submits a job to be executed by the service.

# Args:
# service (type): The service class to execute.
# program (Program): The program containing the service.
# item (Event, optional): The event item to process. Defaults to None.
# """
# log_message = f"Submitting service {service.__name__} to be executed"
# if event:
# log_message += f" with {event.log_message}"
# logger.debug(log_message)

# cancellation_event = threading.Event()
# executor = self._find_or_create_executor(service)

# # Add start time to track execution duration
# start_time = datetime.now()

# def _monitor_execution(future):
# """Monitor execution time and log if taking too long"""
# while not future.done():
# execution_time = (datetime.now() - start_time).total_seconds()
# if execution_time > 180: # 3 minutes
# current_thread = None
# for thread in threading.enumerate():
# if thread.name.startswith(service.__name__) and not thread.name.endswith('_monitor'):
# current_thread = thread
# break

# if current_thread:
# # Get stack frames for the worker thread
# frames = sys._current_frames()
# thread_frame = None
# for thread_id, frame in frames.items():
# if thread_id == current_thread.ident:
# thread_frame = frame
# break

# if thread_frame:
# stack_trace = ''.join(traceback.format_stack(thread_frame))
# else:
# stack_trace = "Could not get stack trace for worker thread"
# else:
# stack_trace = "Could not find worker thread"

# logger.warning(
# f"Service {service.__name__} execution taking longer than 3 minutes!\n"
# f"Event: {event.log_message if event else 'No event'}\n"
# f"Execution time: {execution_time:.1f} seconds\n"
# f"Thread name: {current_thread.name if current_thread else 'Unknown'}\n"
# f"Thread alive: {current_thread.is_alive() if current_thread else 'Unknown'}\n"
# f"Stack trace:\n{stack_trace}"
# )

# # Cancel the future and kill the thread
# future.cancellation_event.set()
# future.cancel()
# if current_thread:
# logger.warning(f"Killing thread {current_thread.name} due to timeout")
# self._futures.remove(future)
# if event:
# self.remove_event_from_running(event)
# return # Exit the monitoring thread

# time.sleep(60) # Check every minute

# future = executor.submit(db_functions.run_thread_with_db_item,
# program.all_services[service].run,
# service, program, event, cancellation_event)

# # Start monitoring thread
# monitor_thread = threading.Thread(
# target=_monitor_execution,
# args=(future,),
# name=f"{service.__name__}_monitor",
# daemon=True
# )
# monitor_thread.start()

# future.cancellation_event = cancellation_event
# if event:
# future.event = event
# self._futures.append(future)
# sse_manager.publish_event("event_update", self.get_event_updates())
# future.add_done_callback(lambda f: self._process_future(f, service))

Comment on lines +190 to +278

🛠️ Refactor suggestion

Remove large block of commented-out code

Lines 190-278 contain a large block of code that is entirely commented out. Keeping such large blocks of commented-out code can clutter the codebase and reduce readability. If this code is no longer needed, please remove it. If it's intended for future use, consider using version control branches or feature toggles.

Apply this diff to remove the commented-out code:

-    # For debugging purposes we can monitor the execution time of the service. (comment out above and uncomment below)
-    # def submit_job(self, service, program, event=None):
-    #     """
-    #     Submits a job to be executed by the service.
-    #     ...
-    #     future.add_done_callback(lambda f: self._process_future(f, service))
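If the timing diagnostics are worth keeping, a lighter-weight option than a commented-out copy of submit_job is to gate a small watchdog behind a toggle, as the comment above suggests. A minimal sketch of that idea in Python; the RIVEN_MONITOR_JOBS environment variable, the start_job_monitor name, and the thresholds are illustrative assumptions, not anything defined in this PR:

import logging
import os
import sys
import threading
import time
import traceback
from datetime import datetime

logger = logging.getLogger(__name__)

# Hypothetical toggle, not part of the PR: enable with RIVEN_MONITOR_JOBS=1.
MONITOR_JOBS = os.environ.get("RIVEN_MONITOR_JOBS") == "1"

def start_job_monitor(future, service_name, timeout=180, poll=60):
    """Watch a submitted future and log worker stack traces if it runs too long."""
    if not MONITOR_JOBS:
        return None
    started = datetime.now()

    def _watch():
        while not future.done():
            elapsed = (datetime.now() - started).total_seconds()
            if elapsed > timeout:
                # Dump the stack of any thread that looks like this service's worker.
                frames = sys._current_frames()
                for thread in threading.enumerate():
                    frame = frames.get(thread.ident)
                    if frame is not None and thread.name.startswith(service_name):
                        trace = "".join(traceback.format_stack(frame))
                        logger.warning("%s running for %.0fs on %s:\n%s",
                                       service_name, elapsed, thread.name, trace)
                return
            time.sleep(poll)

    monitor = threading.Thread(target=_watch, name=f"{service_name}_monitor", daemon=True)
    monitor.start()
    return monitor

submit_job could then call start_job_monitor(future, service.__name__) unconditionally: with the flag unset the helper is a no-op, so the production path stays unchanged while the debug logic stays in version control instead of in a commented-out block.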

def cancel_job(self, item_id: str, suppress_logs=False):
"""
Cancels a job associated with the given item.
2 changes: 1 addition & 1 deletion src/program/media/__init__.py
@@ -1,2 +1,2 @@
from .item import Episode, MediaItem, Movie, Season, Show, ShowMediaType, MovieMediaType, MediaType # noqa
from .item import Episode, MediaItem, Movie, Season, Show # noqa
from .state import States # noqa
24 changes: 4 additions & 20 deletions src/program/media/item.py
@@ -18,22 +18,6 @@
from ..db.db_functions import blacklist_stream, reset_streams
from .stream import Stream

class ShowMediaType(Enum):
"""Show media types"""
Show = "show"
Season = "season"
Episode = "episode"

class MovieMediaType(Enum):
"""Media types"""
Movie = "movie"

class MediaType(Enum):
"""Combined media types"""
Show = ShowMediaType.Show.value
Season = ShowMediaType.Season.value
Episode = ShowMediaType.Episode.value
Movie = MovieMediaType.Movie.value

class MediaItem(db.Model):
"""MediaItem class"""
@@ -426,7 +410,7 @@ def copy(self, other):
return self

def __init__(self, item):
self.type = MovieMediaType.Movie.value
self.type = "movie"
self.file = item.get("file", None)
super().__init__(item)

@@ -448,7 +432,7 @@ class Show(MediaItem):
}

def __init__(self, item):
self.type = ShowMediaType.Show.value
self.type = "show"
self.locations = item.get("locations", [])
self.seasons: list[Season] = item.get("seasons", [])
self.propagate_attributes_to_childs()
Comment on lines +435 to 438

⚠️ Potential issue

Fix initialization order in Show constructor.

The propagation of attributes to child elements occurs before the parent class initialization, which could lead to undefined behavior if the parent class initialization modifies any attributes used in the propagation.

  def __init__(self, item):
-     self.type = "show"
-     self.locations = item.get("locations", [])
-     self.seasons: list[Season] = item.get("seasons", [])
-     self.propagate_attributes_to_childs()
-     super().__init__(item)
+     super().__init__(item)
+     self.type = MEDIA_TYPE_SHOW
+     validate_media_type(self.type)
+     self.locations = item.get("locations", [])
+     self.seasons: list[Season] = item.get("seasons", [])
+     self.propagate_attributes_to_childs()

Committable suggestion skipped: line range outside the PR's diff.
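The ordering concern is easy to reproduce outside the project. A standalone sketch (generic Parent/Child classes, not Riven's MediaItem hierarchy) showing how propagating to children before calling the parent initializer can leave them holding stale values:

class Parent:
    def __init__(self, item: dict):
        # The parent normalizes a field that children also care about.
        self.quality = item.get("quality", "unknown").lower()

class Child(Parent):
    def __init__(self, item: dict, children: list):
        self.children = children
        self.quality = item.get("quality")   # raw value, not yet normalized
        self._propagate()                    # children receive the raw value
        super().__init__(item)               # the parent then overwrites self.quality

    def _propagate(self):
        for child in self.children:
            child.quality = self.quality

leaf = Parent({"quality": "HD"})
root = Child({"quality": "HD"}, [leaf])
print(root.quality)  # "hd"  (normalized by Parent.__init__)
print(leaf.quality)  # "HD"  (stale value propagated before normalization)

Calling super().__init__(item) first, as the suggested diff does, means propagation only runs after the base class has finished setting up the item.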

@@ -563,7 +547,7 @@ def store_state(self, given_state: States = None) -> None:
super().store_state(given_state)

def __init__(self, item):
self.type = ShowMediaType.Season.value
self.type = "season"
self.number = item.get("number", None)
self.episodes: list[Episode] = item.get("episodes", [])
super().__init__(item)
@@ -662,7 +646,7 @@ class Episode(MediaItem):
}

def __init__(self, item):
self.type = ShowMediaType.Episode.value
self.type = "episode"
self.number = item.get("number", None)
self.file = item.get("file", None)
super().__init__(item)
9 changes: 5 additions & 4 deletions src/program/program.py
@@ -254,10 +254,11 @@ def _schedule_functions(self) -> None:
}

if settings_manager.settings.symlink.repair_symlinks:
scheduled_functions[fix_broken_symlinks] = {
"interval": 60 * 60 * settings_manager.settings.symlink.repair_interval,
"args": [settings_manager.settings.symlink.library_path, settings_manager.settings.symlink.rclone_path]
}
# scheduled_functions[fix_broken_symlinks] = {
# "interval": 60 * 60 * settings_manager.settings.symlink.repair_interval,
# "args": [settings_manager.settings.symlink.library_path, settings_manager.settings.symlink.rclone_path]
# }
logger.warning("Symlink repair is disabled, this will be re-enabled in the future.")

for func, config in scheduled_functions.items():
self.scheduler.add_job(
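The hunk is truncated just after self.scheduler.add_job(, but the loop appears to hand each scheduled_functions entry to APScheduler with an interval trigger. A minimal sketch of that wiring, assuming self.scheduler is an APScheduler scheduler and that the keyword names follow the library defaults (the id, replace_existing, and max_instances arguments are illustrative, not taken from the PR):

from apscheduler.schedulers.background import BackgroundScheduler

def schedule_functions(scheduler: BackgroundScheduler, scheduled_functions: dict) -> None:
    """Register each function with an interval trigger built from its config dict."""
    for func, config in scheduled_functions.items():
        scheduler.add_job(
            func,
            "interval",
            seconds=config["interval"],   # e.g. 60 * 60 * repair_interval
            args=config.get("args"),      # optional positional arguments
            id=func.__name__,             # stable id so re-registration replaces the job
            replace_existing=True,
            max_instances=1,              # avoid overlapping runs of the same task
        )

scheduler = BackgroundScheduler()
schedule_functions(scheduler, {})  # real entries are built from settings in _schedule_functions
scheduler.start()

Re-enabling symlink repair later would then only require restoring the dictionary entry this PR comments out; the scheduling loop itself would not need to change.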