From 551e0b800aa03090adf5c88eea0259971e480319 Mon Sep 17 00:00:00 2001 From: pedohorse <13556996+pedohorse@users.noreply.github.com> Date: Sun, 15 Sep 2024 16:20:01 +0200 Subject: [PATCH 1/6] add event list collapse tests --- tests/test_ui_events_tools.py | 118 ++++++++++++++++++++++++++++++++++ 1 file changed, 118 insertions(+) create mode 100644 tests/test_ui_events_tools.py diff --git a/tests/test_ui_events_tools.py b/tests/test_ui_events_tools.py new file mode 100644 index 00000000..b5320813 --- /dev/null +++ b/tests/test_ui_events_tools.py @@ -0,0 +1,118 @@ +from unittest import TestCase +from lifeblood.ui_events_tools import collapse_task_event_list +from lifeblood.ui_events import TaskFullState, TasksChanged, TasksRemoved, TasksUpdated +from lifeblood.ui_protocol_data import TaskData, TaskBatchData, TaskDelta +from lifeblood.enums import TaskState + + +class TestUIEventsTools(TestCase): + def test_collapse_task_event_list_trivial(self): + self.assertIsNone(collapse_task_event_list([])) + + data = TaskBatchData( + 12345, + { + 2: TaskData( + 2, None, 4, 3, TaskState.WAITING, 'bla blaa', False, 55, 'maino', 'bleion', 'footask', 5, 6, 0.567, 7, 8, 9, {'qwe', 'asd', 'zxc'}, + ) + } + ) + fullstate = TaskFullState( + 12345, + data, + ) + + self.assertEqual(data, collapse_task_event_list([fullstate])) + + def test_collapse_task_event_list_errors(self): + self.assertRaises(RuntimeError, collapse_task_event_list, [ + TaskFullState( + 12345, TaskBatchData(12345, {}) + ), + TasksChanged( + 23456, [] + ), + ]) + + def test_collapse_task_event_list_common1(self): + fullstate_init = TaskFullState( + 12345, + TaskBatchData( + 12345, + { + 2: TaskData( + 2, None, 4, 3, TaskState.WAITING, 'bla blaa', False, 55, 'maino', 'bleion', 'footask', 5, 6, 0.567, 7, 8, 9, {'qwe', 'asd', 'zxc'}, + ) + } + ), + ) + fulldata_final = TaskBatchData( + 12345, + { + 2: TaskData( + 2, None, 4, 3, TaskState.WAITING, 'bla blaa', False, 55, 'maino', 'bleion', 'footask', 5, 6, 0.567, 7, 8, 9, {'qwe', 'asd', 'zxc'}, + ), + 22: TaskData( + 22, None, 44, 33, TaskState.POST_WAITING, 'beeba', True, 555, 'maino1', 'bleion1', 'bartask', 55, 66, 0.5678, 77, 88, 99, {'fgh'}, + ), + } + ) + event_list = [ + fullstate_init, + TasksUpdated( + 12345, + TaskBatchData( + 12345, + { + 22: TaskData( + 22, None, 0, 33, TaskState.DONE, None, True, 555, 'maino1', '', 'bartask', 55, 66, 0, 77, 88, 99, set(), + ), + } + ) + ), + TasksChanged( + 12345, + [ + TaskDelta( + 22, + children_count=44, + state=TaskState.POST_WAITING, + state_details='beeba', + node_output_name='badbad', + groups={'hhh'}, + ) + ] + ), + TasksUpdated( + 12345, + TaskBatchData( + 12345, + { + 33: TaskData( + 33, None, 0, 33, TaskState.DONE, None, True, 555, 'agageh', '', 'jgfjft', 55, 66, 0, 77, 88, 99, set(), + ), + } + ) + ), + TasksChanged( + 12345, + [ + TaskDelta( + 22, + node_output_name='bleion1', + progress=0.5678, + groups={'fgh'}, + ) + ] + ), + TasksRemoved( + 12345, + (33,), + ) + ] + + self.assertEqual( + fulldata_final, + collapse_task_event_list(event_list) + ) + From 320ff121be23eee8b2ee2ed332fbb93dcf1b3df9 Mon Sep 17 00:00:00 2001 From: pedohorse <13556996+pedohorse@users.noreply.github.com> Date: Sun, 15 Sep 2024 16:20:19 +0200 Subject: [PATCH 2/6] implement collapse_task_event_list --- src/lifeblood/ui_events_tools.py | 50 ++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 src/lifeblood/ui_events_tools.py diff --git a/src/lifeblood/ui_events_tools.py b/src/lifeblood/ui_events_tools.py new file mode 100644 index 00000000..599bf446 --- /dev/null +++ b/src/lifeblood/ui_events_tools.py @@ -0,0 +1,50 @@ +from .ui_events import TaskFullState, TasksChanged, TasksRemoved, TasksUpdated, TaskEvent +from .ui_protocol_data import TaskBatchData, DataNotSet +from .logging import get_logger + +from typing import List, Optional + + +def collapse_task_event_list(event_list: List[TaskEvent]) -> Optional[TaskBatchData]: + if len(event_list) == 0: + return None + collapsed_data = TaskBatchData + db_id = None + event_id = None + timestamp = None + for event in event_list: + if db_id is None: + db_id = event.database_uid + event_id = event.event_id + timestamp = event.timestamp + elif db_id != event.database_uid: + raise RuntimeError('provided event list has events from different databases') + event_id = max(event_id, event.event_id) + timestamp = max(timestamp, event.timestamp) + + if isinstance(event, TaskFullState): + collapsed_data = event.task_data + elif isinstance(event, TasksRemoved): + for task_id in event.task_ids: + if task_id not in collapsed_data.tasks: + get_logger('lifeblood.utility').warning(f'event list inconsistency: task id {task_id} is not in tasks, cannot remove') + continue + collapsed_data.tasks.pop(task_id) + elif isinstance(event, TasksUpdated): + for task_id, task_data in event.task_data.tasks.items(): + collapsed_data.tasks[task_id] = task_data + elif isinstance(event, TasksChanged): + for task_delta in event.task_deltas: + task_id = task_delta.id + if task_id not in collapsed_data.tasks: + get_logger('lifeblood.utility').warning(f'event list inconsistency: task id {task_id} is not in tasks, cannot apply delta') + continue + for field in ('parent_id', 'children_count', 'active_children_count', 'state', 'state_details', 'paused', 'node_id', 'node_input_name', + 'node_output_name', 'name', 'split_level', 'work_data_invocation_attempt', 'progress', 'split_origin_task_id', 'split_id', + 'invocation_id', 'groups'): + if (val := getattr(task_delta, field)) is not DataNotSet: + setattr(collapsed_data.tasks[task_id], field, val) + else: + raise NotImplementedError(f'handling of event type "{type(event)}" is not implemented') + + return collapsed_data From 90d7e0241a8bc9a9b5367489fbd7659d2dd40819 Mon Sep 17 00:00:00 2001 From: pedohorse <13556996+pedohorse@users.noreply.github.com> Date: Sun, 15 Sep 2024 16:20:48 +0200 Subject: [PATCH 3/6] collapse task event list on first subscription --- src/lifeblood_viewer/connection_worker.py | 32 +++++++++++++++-------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/src/lifeblood_viewer/connection_worker.py b/src/lifeblood_viewer/connection_worker.py index c6a81141..1faea244 100644 --- a/src/lifeblood_viewer/connection_worker.py +++ b/src/lifeblood_viewer/connection_worker.py @@ -1,34 +1,30 @@ import asyncio import socket -import struct import json import time -import pickle -from io import BytesIO from lifeblood.uidata import NodeUi -from lifeblood.ui_protocol_data import UiData from lifeblood.invocationjob import InvocationJob -from lifeblood.nethelpers import recv_exactly, address_to_ip_port, get_default_addr +from lifeblood.nethelpers import address_to_ip_port, get_default_addr from lifeblood import logging -from lifeblood.enums import NodeParameterType, TaskState, TaskGroupArchivedState +from lifeblood.enums import TaskState, TaskGroupArchivedState from lifeblood.broadcasting import await_broadcast from lifeblood.config import get_config from lifeblood.exceptions import UiClientOperationFailed from lifeblood.uidata import Parameter -from lifeblood.node_type_metadata import NodeTypeMetadata from lifeblood.taskspawn import NewTask -from lifeblood.snippets import NodeSnippetData, NodeSnippetDataPlaceholder +from lifeblood.snippets import NodeSnippetData from lifeblood.defaults import ui_port from lifeblood.environment_resolver import EnvironmentResolverArguments from lifeblood.scheduler_ui_protocol import UIProtocolSocketClient from lifeblood.ui_protocol_data import TaskBatchData +from lifeblood.ui_events import TaskFullState +from lifeblood.ui_events_tools import collapse_task_event_list import PySide2 from PySide2.QtCore import Signal, Slot, QPointF, QThread -#from PySide2.QtGui import QPoin -from typing import Callable, Optional, Set, List, Union, Dict, Iterable +from typing import Callable, Optional, Set, List, Union, Iterable logger = logging.get_logger('viewer') @@ -398,8 +394,22 @@ def _check_tasks(self): assert len(task_events) > 0 # on subscription there MUST be at least a single event if len(task_events) > 0: - first_time_getting_events = self.__last_known_event_id < 0 + first_time_receiving_events_for_this_filter = self.__last_known_event_id < 0 self.__last_known_event_id = task_events[-1].event_id + if first_time_receiving_events_for_this_filter: + try: + collapsed_data = collapse_task_event_list(task_events) + except RuntimeError: + logger.warning("failed to collapse event list, event list malformed!") + else: + subst_event = TaskFullState( + collapsed_data.db_uid, + collapsed_data + ) + subst_event.timestamp = task_events[-1].timestamp + subst_event.event_id = task_events[-1].event_id + task_events = [subst_event] + self.tasks_events_arrived.emit(task_events) else: tasks_state = self.__client.get_ui_tasks_state(self.__task_group_filter or [], not self.__skip_dead) From 55ae6df92b9ab91e8d5a574818a4904083533b7b Mon Sep 17 00:00:00 2001 From: pedohorse <13556996+pedohorse@users.noreply.github.com> Date: Sun, 15 Sep 2024 22:40:10 +0200 Subject: [PATCH 4/6] change error behaviour --- src/lifeblood/ui_events_tools.py | 32 +++++++++++++++++--------------- tests/test_ui_events_tools.py | 17 +++++++++++++++++ 2 files changed, 34 insertions(+), 15 deletions(-) diff --git a/src/lifeblood/ui_events_tools.py b/src/lifeblood/ui_events_tools.py index 599bf446..901cfb96 100644 --- a/src/lifeblood/ui_events_tools.py +++ b/src/lifeblood/ui_events_tools.py @@ -1,14 +1,13 @@ from .ui_events import TaskFullState, TasksChanged, TasksRemoved, TasksUpdated, TaskEvent -from .ui_protocol_data import TaskBatchData, DataNotSet -from .logging import get_logger +from .ui_protocol_data import TaskBatchData, DataNotSet, TaskData -from typing import List, Optional +from typing import Dict, List, Optional def collapse_task_event_list(event_list: List[TaskEvent]) -> Optional[TaskBatchData]: if len(event_list) == 0: return None - collapsed_data = TaskBatchData + collapsed_tasks: Dict[int, TaskData] = {} db_id = None event_id = None timestamp = None @@ -23,28 +22,31 @@ def collapse_task_event_list(event_list: List[TaskEvent]) -> Optional[TaskBatchD timestamp = max(timestamp, event.timestamp) if isinstance(event, TaskFullState): - collapsed_data = event.task_data + collapsed_tasks = dict(event.task_data.tasks) elif isinstance(event, TasksRemoved): for task_id in event.task_ids: - if task_id not in collapsed_data.tasks: - get_logger('lifeblood.utility').warning(f'event list inconsistency: task id {task_id} is not in tasks, cannot remove') - continue - collapsed_data.tasks.pop(task_id) + if task_id not in collapsed_tasks: + raise RuntimeError(f'event list inconsistency: task id {task_id} is not in tasks, cannot remove') + collapsed_tasks.pop(task_id) elif isinstance(event, TasksUpdated): for task_id, task_data in event.task_data.tasks.items(): - collapsed_data.tasks[task_id] = task_data + collapsed_tasks[task_id] = task_data elif isinstance(event, TasksChanged): for task_delta in event.task_deltas: task_id = task_delta.id - if task_id not in collapsed_data.tasks: - get_logger('lifeblood.utility').warning(f'event list inconsistency: task id {task_id} is not in tasks, cannot apply delta') - continue + if task_id not in collapsed_tasks: + print(collapsed_tasks) + raise RuntimeError(f'event list inconsistency: task id {task_id} is not in tasks, cannot apply delta') for field in ('parent_id', 'children_count', 'active_children_count', 'state', 'state_details', 'paused', 'node_id', 'node_input_name', 'node_output_name', 'name', 'split_level', 'work_data_invocation_attempt', 'progress', 'split_origin_task_id', 'split_id', 'invocation_id', 'groups'): if (val := getattr(task_delta, field)) is not DataNotSet: - setattr(collapsed_data.tasks[task_id], field, val) + setattr(collapsed_tasks[task_id], field, val) else: raise NotImplementedError(f'handling of event type "{type(event)}" is not implemented') - return collapsed_data + return TaskBatchData( + db_id, + collapsed_tasks + ) + diff --git a/tests/test_ui_events_tools.py b/tests/test_ui_events_tools.py index b5320813..777e4f1b 100644 --- a/tests/test_ui_events_tools.py +++ b/tests/test_ui_events_tools.py @@ -34,6 +34,23 @@ def test_collapse_task_event_list_errors(self): ), ]) + self.assertRaises(RuntimeError, collapse_task_event_list, [ + TasksRemoved( + 12345, (1,) + ) + ]) + + self.assertRaises(RuntimeError, collapse_task_event_list, [ + TasksChanged( + 12345, [ + TaskDelta( + 2, + children_count=123, + ), + ], + ) + ]) + def test_collapse_task_event_list_common1(self): fullstate_init = TaskFullState( 12345, From ef4abf3d7f2ca0887283e4206a92a3ac01833c0b Mon Sep 17 00:00:00 2001 From: pedohorse <13556996+pedohorse@users.noreply.github.com> Date: Wed, 18 Sep 2024 10:24:10 +0200 Subject: [PATCH 5/6] add event collapse None check --- src/lifeblood_viewer/connection_worker.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/lifeblood_viewer/connection_worker.py b/src/lifeblood_viewer/connection_worker.py index 1faea244..44e073ef 100644 --- a/src/lifeblood_viewer/connection_worker.py +++ b/src/lifeblood_viewer/connection_worker.py @@ -397,11 +397,12 @@ def _check_tasks(self): first_time_receiving_events_for_this_filter = self.__last_known_event_id < 0 self.__last_known_event_id = task_events[-1].event_id if first_time_receiving_events_for_this_filter: + collapsed_data: Optional[TaskBatchData] = None try: collapsed_data = collapse_task_event_list(task_events) except RuntimeError: logger.warning("failed to collapse event list, event list malformed!") - else: + if collapsed_data is not None: subst_event = TaskFullState( collapsed_data.db_uid, collapsed_data From 636afc592dd73b64471761f3f8e2b5f9b4c802a5 Mon Sep 17 00:00:00 2001 From: pedohorse <13556996+pedohorse@users.noreply.github.com> Date: Wed, 18 Sep 2024 13:52:13 +0200 Subject: [PATCH 6/6] ensure input event list is unmodified --- src/lifeblood/ui_events_tools.py | 14 ++--- tests/test_ui_events_tools.py | 88 ++++++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+), 6 deletions(-) diff --git a/src/lifeblood/ui_events_tools.py b/src/lifeblood/ui_events_tools.py index 901cfb96..5f0ae9f9 100644 --- a/src/lifeblood/ui_events_tools.py +++ b/src/lifeblood/ui_events_tools.py @@ -1,5 +1,7 @@ +import copy + from .ui_events import TaskFullState, TasksChanged, TasksRemoved, TasksUpdated, TaskEvent -from .ui_protocol_data import TaskBatchData, DataNotSet, TaskData +from .ui_protocol_data import TaskBatchData, DataNotSet, TaskData, TaskDelta from typing import Dict, List, Optional @@ -22,7 +24,7 @@ def collapse_task_event_list(event_list: List[TaskEvent]) -> Optional[TaskBatchD timestamp = max(timestamp, event.timestamp) if isinstance(event, TaskFullState): - collapsed_tasks = dict(event.task_data.tasks) + collapsed_tasks = {k: copy.copy(v) for k, v in event.task_data.tasks.items()} elif isinstance(event, TasksRemoved): for task_id in event.task_ids: if task_id not in collapsed_tasks: @@ -30,17 +32,17 @@ def collapse_task_event_list(event_list: List[TaskEvent]) -> Optional[TaskBatchD collapsed_tasks.pop(task_id) elif isinstance(event, TasksUpdated): for task_id, task_data in event.task_data.tasks.items(): - collapsed_tasks[task_id] = task_data + collapsed_tasks[task_id] = copy.copy(task_data) elif isinstance(event, TasksChanged): for task_delta in event.task_deltas: task_id = task_delta.id if task_id not in collapsed_tasks: print(collapsed_tasks) raise RuntimeError(f'event list inconsistency: task id {task_id} is not in tasks, cannot apply delta') - for field in ('parent_id', 'children_count', 'active_children_count', 'state', 'state_details', 'paused', 'node_id', 'node_input_name', - 'node_output_name', 'name', 'split_level', 'work_data_invocation_attempt', 'progress', 'split_origin_task_id', 'split_id', - 'invocation_id', 'groups'): + for field in TaskDelta.__annotations__.keys(): if (val := getattr(task_delta, field)) is not DataNotSet: + if field == 'id': + assert collapsed_tasks[task_id].id == val setattr(collapsed_tasks[task_id], field, val) else: raise NotImplementedError(f'handling of event type "{type(event)}" is not implemented') diff --git a/tests/test_ui_events_tools.py b/tests/test_ui_events_tools.py index 777e4f1b..2d3afff4 100644 --- a/tests/test_ui_events_tools.py +++ b/tests/test_ui_events_tools.py @@ -1,3 +1,5 @@ +import copy +import random from unittest import TestCase from lifeblood.ui_events_tools import collapse_task_event_list from lifeblood.ui_events import TaskFullState, TasksChanged, TasksRemoved, TasksUpdated @@ -133,3 +135,89 @@ def test_collapse_task_event_list_common1(self): collapse_task_event_list(event_list) ) + def test_ensure_source_unmodified(self): + update_event = TasksUpdated( + 12345, + TaskBatchData( + 12345, + { + 123: TaskData(123, 234, 444, 333, TaskState.GENERATING, 'nope', True, 345, 'floo', 'flee', 'nonde', 456, 567, 0.51423, 678, 789, 890, {'karrr'},), + } + ) + ) + full_event = TaskFullState( + 12345, + TaskBatchData( + 12345, + { + 123: TaskData(123, 234, 444, 333, TaskState.GENERATING, 'nope', True, 345, 'floo', 'flee', 'nonde', 456, 567, 0.51423, 678, 789, 890, {'karrr'}, ), + } + ) + ) + delta_event = TasksChanged( + 12345, + [ + TaskDelta(123, children_count=999, split_origin_task_id=888, name='foooooooo') + ] + ) + + update_event_control = copy.deepcopy(update_event) + full_event_control = copy.deepcopy(full_event) + + collapsed_data = collapse_task_event_list([full_event, delta_event]) + self.assertIsNotNone(collapsed_data) + collapsed_data = collapse_task_event_list([update_event, delta_event]) + self.assertIsNotNone(collapsed_data) + + self.assertEqual(update_event_control, update_event) + self.assertEqual(full_event_control, full_event) + + def test_random_change(self): + rng = random.Random(1827361) + for _ in range(999): + fields = list(TaskDelta.__annotations__.keys()) + rng.shuffle(fields) + delta = TaskDelta(123) + attrs_set = {} + for field in fields[:rng.randint(0, len(fields))]: + if field == 'id': + continue + # NOTE: we ignore typing, which may cause test fails on correct implementations + val = random.randint(0, 99999) + setattr(delta, field, val) + attrs_set[field] = val + + task_data_control = TaskData(123, 234, 444, 333, TaskState.GENERATING, 'nope', True, 345, 'floo', 'flee', 'nonde', 456, 567, 0.51423, 678, 789, 890, {'karrr'},) + task_data = TaskData(123, 234, 444, 333, TaskState.GENERATING, 'nope', True, 345, 'floo', 'flee', 'nonde', 456, 567, 0.51423, 678, 789, 890, {'karrr'},) + event_list = [ + TasksUpdated( + 12345, + TaskBatchData( + 12345, + { + 123: task_data, + } + ) + ), + TasksChanged( + 12345, + [ + delta + ] + ) + ] + + collapsed_data = collapse_task_event_list(event_list) + self.assertIsNotNone(collapsed_data) + + # ensure that original event was not changed + self.assertEqual(task_data_control, task_data) + + self.assertSetEqual({123}, set(collapsed_data.tasks.keys())) + for field in TaskDelta.__annotations__.keys(): + if field in attrs_set: + expected_val = attrs_set[field] + else: + expected_val = getattr(task_data_control, field) + + self.assertEqual(expected_val, getattr(collapsed_data.tasks[123], field), f'fail in "{field}" field')