-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathworkflow.py
63 lines (47 loc) · 1.58 KB
/
workflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import asyncio
import importlib
import os
import traceback
from .comfy_script.runtime import Workflow, load, queue
WORKFLOW_MAP = {}
BASE_DIR = os.path.dirname(__file__)
class WorkFlowObject:
"""Generic parent."""
def __init__(self) -> None:
self.workflow: Workflow = None
self.results = None
def pre_execute(self, kwargs):
"""Processing parameters"""
return kwargs
def execute(self, **kwargs):
"""WorkFlow."""
pass
def post_execute(self, results):
"""Process the results."""
pass
def init_comfy_script(comfyui: str = None):
load(comfyui)
queue.watch_display(False)
workflow_dir = os.path.join(BASE_DIR, "custom_workflows")
for file_name in os.listdir(workflow_dir):
try:
base_name, ext = os.path.splitext(file_name)
if ext == ".py":
module = importlib.import_module(f".custom_workflows.{base_name}", package=__package__)
if base_name in WORKFLOW_MAP:
importlib.reload(module)
WORKFLOW_MAP[base_name] = module.WorkFlow
except Exception:
traceback.print_exc()
def run_workflow(workflow: WorkFlowObject, *arg, **kwargs):
wf = Workflow()
with wf:
result = workflow.execute(*arg, **kwargs)
workflow.workflow = wf
workflow.results = result
return wf, result
def wait_for_workflow(workflow: WorkFlowObject, timeout=0.0001):
try:
asyncio.run(asyncio.wait_for(asyncio.shield(workflow.workflow.task), timeout))
except TimeoutError:
pass