From b01ed6e5e478a34db5baeb380cd30b792a47f7d2 Mon Sep 17 00:00:00 2001 From: pedohorse <13556996+pedohorse@users.noreply.github.com> Date: Thu, 25 Jul 2024 00:38:47 +0200 Subject: [PATCH] adjust tests --- src/lifeblood_testing_common/common.py | 4 +- .../integration_common.py | 14 +- src/lifeblood_testing_common/nodes_common.py | 29 ++- ...eduler_config_provider_default_override.py | 7 +- tests/test_invocationjob.py | 230 ++++++++++++++++++ tests/test_resources_integration.py | 44 ++++ tests/test_scheduler_worker_comm.py | 57 +++-- tests/test_ui_protocol_data.py | 18 +- 8 files changed, 362 insertions(+), 41 deletions(-) create mode 100644 tests/test_invocationjob.py create mode 100644 tests/test_resources_integration.py diff --git a/src/lifeblood_testing_common/common.py b/src/lifeblood_testing_common/common.py index d70ab8a6..27d38294 100644 --- a/src/lifeblood_testing_common/common.py +++ b/src/lifeblood_testing_common/common.py @@ -1,7 +1,7 @@ from pathlib import Path import lifeblood from lifeblood.scheduler.scheduler import Scheduler -from lifeblood.worker_resource_definition import WorkerResourceDefinition +from lifeblood.worker_resource_definition import WorkerResourceDefinition, WorkerDeviceTypeDefinition from lifeblood.basenode_serializer_v2 import NodeSerializerV2 from lifeblood.pluginloader import PluginNodeDataProvider # TODO: this must be replaced by a testing mocker from lifeblood_testing_common.scheduler_config_provider_default_override import SchedulerConfigProviderOverrides @@ -25,6 +25,7 @@ def create_default_scheduler( node_per_node_config: Optional[Dict[str, Dict[str, Any]]] = None, node_global_config: Optional[Dict[str, Dict[str, Any]]] = None, resource_definitions: Optional[Tuple[WorkerResourceDefinition, ...]] = None, + device_type_definitions: Optional[Tuple[WorkerDeviceTypeDefinition, ...]] = None, ) -> Scheduler: legacy_addr = None message_addr = None @@ -42,6 +43,7 @@ def create_default_scheduler( node_per_node_config=node_per_node_config, node_global_config=node_global_config, resource_definitions=resource_definitions, + device_type_definitions=device_type_definitions, ) return Scheduler( scheduler_config_provider=config, diff --git a/src/lifeblood_testing_common/integration_common.py b/src/lifeblood_testing_common/integration_common.py index 57d052c0..4f72e30e 100644 --- a/src/lifeblood_testing_common/integration_common.py +++ b/src/lifeblood_testing_common/integration_common.py @@ -11,6 +11,7 @@ from lifeblood.simple_worker_pool import WorkerPool from lifeblood.net_messages.address import AddressChain from lifeblood.taskspawn import NewTask +from lifeblood.worker_resource_definition import WorkerResourceDefinition, WorkerDeviceTypeDefinition from lifeblood.enums import SpawnStatus from typing import Dict, Iterable, Optional, Tuple, Union @@ -57,6 +58,8 @@ async def asyncSetUp(self): server_ui_addr=(get_default_addr(), test_server_port3), node_per_node_config=c1, node_global_config=c2, + resource_definitions=self._resource_definitions(), + device_type_definitions=self._device_type_definitions(), ) self.worker_pool = WorkerPool( scheduler_address=AddressChain(f'{get_default_addr()}:{test_server_port2}'), @@ -96,7 +99,7 @@ async def test_main(self): print(f'expecting {expected_states}') # so expected_states is dict of task id to (state, is paused, node_id), not node name - required_succ_time = 0 + required_succ_time = self._required_expected_state_keep_time() # wait for processing timeout = self._timeout() @@ -180,6 +183,9 @@ def _expected_task_attributes(self) -> Dict[int, dict]: async def _additional_checks_on_finish(self): return + def _required_expected_state_keep_time(self) -> float: + return 0.0 + def _timeout(self) -> float: return 15.0 @@ -191,3 +197,9 @@ def _minimal_total_to_ensure(self) -> int: def _maximum_total(self) -> int: return 16 + + def _resource_definitions(self) -> Optional[Tuple[WorkerResourceDefinition, ...]]: + return None + + def _device_type_definitions(self) -> Optional[Tuple[WorkerDeviceTypeDefinition, ...]]: + return None diff --git a/src/lifeblood_testing_common/nodes_common.py b/src/lifeblood_testing_common/nodes_common.py index e5f91150..f19d2334 100644 --- a/src/lifeblood_testing_common/nodes_common.py +++ b/src/lifeblood_testing_common/nodes_common.py @@ -17,7 +17,7 @@ from lifeblood.scheduler import Scheduler from lifeblood_testing_common.common import create_default_scheduler from lifeblood.worker import Worker -from lifeblood.invocationjob import InvocationJob, Environment +from lifeblood.invocationjob import Invocation, InvocationJob, InvocationResources, Environment from lifeblood.scheduler.pinger import Pinger from lifeblood.pluginloader import PluginNodeDataProvider from lifeblood.processingcontext import ProcessingContext @@ -47,7 +47,7 @@ def __init__(self, path_to_bin: str): super().__init__() self.__bin_path = Path(path_to_bin) - async def get_environment(self, arguments: Mapping) -> "invocationjob.Environment": + async def get_environment(self, arguments: Mapping) -> Environment: return Environment({**os.environ, 'PATH': os.pathsep.join((str(self.__bin_path), os.environ.get('PATH', ''))), 'PYTHONUNBUFFERED': '1'}) @@ -254,7 +254,7 @@ async def _helper_test_worker_node(self, tasks_to_complete = tasks_to_complete or worker_count side_effect_was_good = True with mock.patch('lifeblood.scheduler.scheduler.Scheduler.task_done_reported') as td_patch: - def _side_effect(task: InvocationJob, stdout: str, stderr: str): + def _side_effect(task: Invocation, stdout: str, stderr: str): nonlocal tasks_to_complete, side_effect_was_good tasks_to_complete -= 1 print(f'finished {task.task_id()} out: {stdout}') @@ -351,12 +351,16 @@ async def _logic(scheduler, workers, tmp_script_path, done_waiter): ij.args().insert(0, str(Path(__file__).parent / Path(bin_rel_path) / command)) ij.args().insert(0, 'python') - ij._set_task_id(2345) - ij._set_invocation_id(1234) + invoc = Invocation( + ij, + invocation_id=1234, + task_id=2345, + resources_to_use=InvocationResources({}, {}), + ) the_worker = workers[0] await workers[0].run_task( - ij, + invoc, scheduler.server_message_addresses()[0] ) @@ -384,7 +388,7 @@ async def _logic(scheduler, workers, tmp_script_path, done_waiter): return _logic - def _task_done_logic(task: InvocationJob): + def _task_done_logic(task: Invocation): self.assertEqual(100.0, the_worker.task_status()) for skip_exist, pre_exist in ((False, False), (True, False), (True, True)): @@ -456,12 +460,17 @@ async def _logic(scheduler, workers, script_path, done_waiter): ij.args().insert(0, str(Path(__file__).parent / Path(add_relative_to_PATH) / command)) ij.args().insert(0, 'python') - ij._set_task_id(2345) - ij._set_invocation_id(1234) + invoc = Invocation( + ij, + invocation_id=1234, + task_id=2345, + resources_to_use=InvocationResources({}, {}), + ) + the_worker = workers[0] await workers[0].run_task( - ij, + invoc, scheduler.server_message_addresses()[0] ) diff --git a/src/lifeblood_testing_common/scheduler_config_provider_default_override.py b/src/lifeblood_testing_common/scheduler_config_provider_default_override.py index 24b918b3..bf8baa87 100644 --- a/src/lifeblood_testing_common/scheduler_config_provider_default_override.py +++ b/src/lifeblood_testing_common/scheduler_config_provider_default_override.py @@ -3,7 +3,7 @@ from typing import Any, Dict, List, Optional, Tuple -from lifeblood.worker_resource_definition import WorkerResourceDefinition +from lifeblood.worker_resource_definition import WorkerResourceDefinition, WorkerDeviceTypeDefinition class SchedulerConfigProviderOverrides(SchedulerConfigProviderDefaults): @@ -20,6 +20,7 @@ def __init__( node_per_node_config: Optional[Dict[str, Dict[str, Any]]] = None, node_global_config: Optional[Dict[str, Dict[str, Any]]] = None, resource_definitions: Optional[Tuple[WorkerResourceDefinition, ...]] = None, + device_type_definitions: Optional[Tuple[WorkerDeviceTypeDefinition, ...]] = None, ): super().__init__() self.__main_db_location_override = main_db_location @@ -33,6 +34,7 @@ def __init__( self.__node_per_node_config = node_per_node_config or {} self.__node_global_config = node_global_config or {} self.__resource_definitions = resource_definitions + self.__device_type_definitions = device_type_definitions def main_database_location(self) -> str: return self.__main_db_location_override or super().main_database_location() @@ -82,3 +84,6 @@ def node_configuration(self, node_type_id: str) -> dict: def hardware_resource_definitions(self) -> Tuple[WorkerResourceDefinition, ...]: return self.__resource_definitions or super().hardware_resource_definitions() + + def hardware_device_type_definitions(self) -> Tuple[WorkerDeviceTypeDefinition, ...]: + return self.__device_type_definitions or super().hardware_device_type_definitions() diff --git a/tests/test_invocationjob.py b/tests/test_invocationjob.py new file mode 100644 index 00000000..a42550aa --- /dev/null +++ b/tests/test_invocationjob.py @@ -0,0 +1,230 @@ +from unittest import TestCase +import random +from string import ascii_letters +from lifeblood.invocationjob import (Requirements, ResourceRequirements, ResourceRequirement, DeviceRequirements, + DeviceRequirement, Invocation, InvocationJob, InvocationResources) + + +class TestRequirements(TestCase): + def test_serde(self): + rng = random.Random(12432365) + + for _ in range(66): + resreq = {} + devreq = {} + for i in range(rng.randint(0, 10)): + name = ''.join(rng.choice(ascii_letters) for _ in range(rng.randint(1, 33))) + resreq[name] = ResourceRequirement(rng.uniform(0, 100), rng.uniform(0, 100)) + for i in range(rng.randint(0, 10)): + name = ''.join(rng.choice(ascii_letters) for _ in range(rng.randint(1, 33))) + dresreq = {} + for _ in range(rng.randint(0, 10)): + rname = ''.join(rng.choice(ascii_letters) for _ in range(rng.randint(1, 33))) + dresreq[rname] = ResourceRequirement(rng.uniform(0, 100), rng.uniform(0, 100)) + devreq[name] = DeviceRequirement( + resources=ResourceRequirements(dresreq), + ) + + expected_req = Requirements( + resources=ResourceRequirements(resreq), + devices=DeviceRequirements(devreq), + ) + + self.assertEqual(expected_req, Requirements.deserialize_from_string(expected_req.serialize_to_string())) + + +class TestInvocation(TestCase): + def test_serde(self): + exp_inv1 = Invocation( + InvocationJob( + ['beep', 'boop'], + ), + 1234, + 4321, + InvocationResources( + { + 'mem': 123456, + 'cats': 4.4445 + }, + { + 'wheel': ['w1', 'w2', 'q4'], + 'dog': ['bark', 'woof'] + } + ) + ) + + self.assertEqual(exp_inv1, Invocation.deserialize_from_data(exp_inv1.serialize_to_data())) + + def test_eq1(self): + exp_inv1 = Invocation( + InvocationJob( + ['beep', 'boop'], + ), + 1234, + 4321, + InvocationResources( + { + 'mem': 123456, + 'cats': 4.4445 + }, + { + 'wheel': ['w1', 'w2', 'q4'], + 'dog': ['bark', 'woof'] + } + ) + ) + exp_inv1a = Invocation( + InvocationJob( + ['beep', 'boop'], + ), + 1234, + 4321, + InvocationResources( + { + 'mem': 123456, + 'cats': 4.4445 + }, + { + 'wheel': ['w1', 'w2', 'q4'], + 'dog': ['bark', 'woof'] + } + ) + ) + exp_inv2 = Invocation( + InvocationJob( + ['boop'], + ), + 1234, + 4321, + InvocationResources( + { + 'mem': 123456, + 'cats': 4.4445 + }, + { + 'wheel': ['w1', 'w2', 'q4'], + 'dog': ['bark', 'woof'] + } + ) + ) + exp_inv3 = Invocation( + InvocationJob( + ['beep', 'boop'], + ), + 5555, + 4321, + InvocationResources( + { + 'mem': 123456, + 'cats': 4.4445 + }, + { + 'wheel': ['w1', 'w2', 'q4'], + 'dog': ['bark', 'woof'] + } + ) + ) + exp_inv4 = Invocation( + InvocationJob( + ['beep', 'boop'], + ), + 1234, + 5555, + InvocationResources( + { + 'mem': 123456, + 'cats': 4.4445 + }, + { + 'wheel': ['w1', 'w2', 'q4'], + 'dog': ['bark', 'woof'] + } + ) + ) + exp_inv5 = Invocation( + InvocationJob( + ['beep', 'boop'], + ), + 1234, + 4321, + InvocationResources( + {}, + {} + ) + ) + exp_inv6 = Invocation( + InvocationJob( + ['beep', 'boop'], + ), + 1234, + 4321, + InvocationResources( + { + 'mam': 123456, + 'cats': 4.4445 + }, + { + 'wheel': ['w1', 'w2', 'q4'], + 'dog': ['bark', 'woof'] + } + ) + ) + exp_inv7 = Invocation( + InvocationJob( + ['beep', 'boop'], + ), + 1234, + 4321, + InvocationResources( + { + 'mem': 123457, + 'cats': 4.4445 + }, + { + 'wheel': ['w1', 'w2', 'q4'], + 'dog': ['bark', 'woof'] + } + ) + ) + exp_inv8 = Invocation( + InvocationJob( + ['beep', 'boop'], + ), + 1234, + 4321, + InvocationResources( + { + 'mem': 123456, + 'cats': 4.4445 + }, + { + 'wheel': ['w1', 'w2', 'q4'], + 'dog': ['hurk', 'woof'] + } + ) + ) + + self.assertEqual(exp_inv1, exp_inv1) + self.assertEqual(exp_inv1, exp_inv1a) + self.assertEqual(exp_inv1a, exp_inv1a) + self.assertEqual(exp_inv2, exp_inv2) + self.assertNotEqual(exp_inv1, exp_inv2) + self.assertNotEqual(exp_inv2, exp_inv1) + self.assertEqual(exp_inv3, exp_inv3) + self.assertNotEqual(exp_inv1, exp_inv3) + self.assertNotEqual(exp_inv3, exp_inv1) + self.assertEqual(exp_inv4, exp_inv4) + self.assertNotEqual(exp_inv1, exp_inv4) + self.assertNotEqual(exp_inv4, exp_inv1) + self.assertEqual(exp_inv5, exp_inv5) + self.assertNotEqual(exp_inv1, exp_inv5) + self.assertNotEqual(exp_inv5, exp_inv1) + self.assertEqual(exp_inv6, exp_inv6) + self.assertNotEqual(exp_inv1, exp_inv6) + self.assertNotEqual(exp_inv6, exp_inv1) + self.assertEqual(exp_inv7, exp_inv7) + self.assertNotEqual(exp_inv1, exp_inv7) + self.assertNotEqual(exp_inv7, exp_inv1) + self.assertEqual(exp_inv8, exp_inv8) + self.assertNotEqual(exp_inv1, exp_inv8) + self.assertNotEqual(exp_inv8, exp_inv1) diff --git a/tests/test_resources_integration.py b/tests/test_resources_integration.py new file mode 100644 index 00000000..ad88e98d --- /dev/null +++ b/tests/test_resources_integration.py @@ -0,0 +1,44 @@ +from lifeblood.worker_resource_definition import WorkerDeviceTypeDefinition, WorkerResourceDefinition, WorkerResourceDataType +from lifeblood_testing_common.integration_common import FullIntegrationTestCase + +from typing import Iterable, Optional, Tuple + +# TODO: to cover: +# - no device +# - 1 min device req, 0 pref +# - 1 min device req, >1 pref +# - >1 min device req + + +class ResourceBaseTestCase(FullIntegrationTestCase): + __test__ = True + + @classmethod + def _initial_db_file(cls) -> str: + return 'data/test_resources.db' + + def _device_type_definitions(self) -> Optional[Tuple[WorkerDeviceTypeDefinition, ...]]: + return ( + WorkerDeviceTypeDefinition('gapauu', ( + WorkerResourceDefinition('megaresfoo', WorkerResourceDataType.GENERIC_FLOAT, 'foo is foo', 'Foo', 0), + WorkerResourceDefinition('megaresbar', WorkerResourceDataType.GENERIC_INT, 'bar is bar', 'Bar', 0), + )), + ) + + def _timeout(self) -> float: + return 60.0 + + +class TestDeviceRequirement(ResourceBaseTestCase): + async def _create_test_tasks(self) -> Iterable[int]: + tasks = [ + *await self._create_task(node_name='TEST DEV IN', attributes={ + **attrs, + }), + ] + return tasks + + def _expected_task_attributes(self): + return { + 0: exp_attrs, + } \ No newline at end of file diff --git a/tests/test_scheduler_worker_comm.py b/tests/test_scheduler_worker_comm.py index 94e4b943..cf790544 100644 --- a/tests/test_scheduler_worker_comm.py +++ b/tests/test_scheduler_worker_comm.py @@ -8,7 +8,7 @@ import logging from lifeblood.worker import Worker -from lifeblood.invocationjob import InvocationJob +from lifeblood.invocationjob import Invocation, InvocationJob, InvocationResources from lifeblood.taskspawn import NewTask from lifeblood.enums import WorkerType, WorkerState, SpawnStatus, InvocationState, InvocationMessageResult from lifeblood.db_misc import sql_init_script @@ -64,11 +64,14 @@ async def _logic(scheduler, workers, tmp_script_path, done_waiter): spawn_patch.side_effect = lambda *args, **kwargs: print(f'spawn_tasks_called with {args}, {kwargs}') \ or (SpawnStatus.SUCCEEDED, 2346) - ij = InvocationJob( + ij = Invocation( + InvocationJob( ['python', tmp_script_path], - invocation_id=1234, - ) - ij._set_task_id(2345) + ), + invocation_id=1234, + task_id=2345, + resources_to_use=InvocationResources({}, {}), + ) await workers[0].run_task( ij, scheduler.server_message_addresses()[0] @@ -90,11 +93,14 @@ async def _logic(scheduler, workers, tmp_script_path, done_waiter): with mock.patch('lifeblood.scheduler.scheduler.Scheduler.update_task_attributes') as attr_patch: attr_patch.side_effect = lambda *args, **kwargs: print(f'update attrs with {args}, {kwargs}') - ij = InvocationJob( + ij = Invocation( + InvocationJob( ['python', tmp_script_path], - invocation_id=1234, - ) - ij._set_task_id(2345) + ), + invocation_id=1234, + task_id=2345, + resources_to_use=InvocationResources({}, {}) + ) await workers[0].run_task( ij, scheduler.server_message_addresses()[0] @@ -241,9 +247,9 @@ async def test_worker_invocation_comm_api_worker_recv_timeout(self): async def _helper_test_worker_invocation_comm_api(self, *, i1_script: str, i2_script: str): async def _logic(scheduler, workers: List[Worker], tmp_script_path, done_waiter): - with mock.patch('lifeblood.scheduler.scheduler.Scheduler.update_task_attributes') as attr_patch, \ + with (mock.patch('lifeblood.scheduler.scheduler.Scheduler.update_task_attributes') as attr_patch, \ mock.patch('lifeblood.scheduler.scheduler.Scheduler.get_invocation_state') as get_invoc_patch, \ - mock.patch('lifeblood.scheduler.scheduler.Scheduler.get_invocation_worker') as get_invoc_worker_patch: + mock.patch('lifeblood.scheduler.scheduler.Scheduler.get_invocation_worker') as get_invoc_worker_patch): attr_patch.side_effect = lambda *args, **kwargs: print(f'update attrs with {args}, {kwargs}') get_invoc_patch.return_value = InvocationState.IN_PROGRESS get_invoc_worker_patch.side_effect = lambda inv_id: \ @@ -253,21 +259,26 @@ async def _logic(scheduler, workers: List[Worker], tmp_script_path, done_waiter) 80085: AddressChain('127.2.3.4:567'), # BAD address }.get(inv_id) - ij1 = InvocationJob( + ij1 = Invocation( + InvocationJob( ['python', '-c', - i1_script + i1_script ], - invocation_id=11234, - ) - ij1._set_task_id(3456) - - ij2 = InvocationJob( - ['python', '-c', - i2_script - ], + ), + invocation_id=11234, + task_id=3456, + resources_to_use=InvocationResources({}, {}), + ) + ij2 = Invocation( + InvocationJob( + ['python', '-c', + i2_script + ] + ), invocation_id=11235, + task_id=3457, + resources_to_use=InvocationResources({}, {}), ) - ij2._set_task_id(3457) await workers[0].run_task( ij1, @@ -310,7 +321,7 @@ async def _helper_test_worker_invocation_api(self, runcode: str, logic: Callable tasks_to_complete = tasks_to_complete or worker_count side_effect_was_good = True with mock.patch('lifeblood.scheduler.scheduler.Scheduler.task_done_reported') as td_patch: - def _side_effect(task: InvocationJob, stdout: str, stderr: str): + def _side_effect(task: Invocation, stdout: str, stderr: str): nonlocal tasks_to_complete, side_effect_was_good tasks_to_complete -= 1 print(f'finished {task.task_id()} out: {stdout}') diff --git a/tests/test_ui_protocol_data.py b/tests/test_ui_protocol_data.py index 9ec75cc8..77217009 100644 --- a/tests/test_ui_protocol_data.py +++ b/tests/test_ui_protocol_data.py @@ -6,7 +6,7 @@ from lifeblood.worker_metadata import WorkerMetadata from lifeblood.ui_protocol_data import UiData, NodeGraphStructureData, \ NodeData, NodeConnectionData, TaskBatchData, TaskData, WorkerData, WorkerResources, WorkerResource, WorkerResourceType, \ - WorkerBatchData, TaskGroupData, TaskGroupStatisticsData, TaskGroupBatchData, TaskDelta + WorkerBatchData, TaskGroupData, TaskGroupStatisticsData, TaskGroupBatchData, TaskDelta, WorkerDevice, WorkerDeviceResource from lifeblood.ui_events import TasksChanged, TasksUpdated, TasksRemoved, TaskFullState, TaskEvent @@ -25,17 +25,25 @@ def test_trivial1(self): 11: TaskData(11, None, 10, 5, TaskState.ERROR, "oh noooo", True, 1531, "main", "out", 'a task', 2, 11, 52.1, 8484, 158, 816, {'groo', 'froo'}) }), WorkerBatchData(0, { - 121: WorkerData(121, WorkerResources( + 121: WorkerData(121, + 1928374, '127.1.2.333:blo', 1234567, WorkerState.BUSY, WorkerType.STANDARD, + 1534, 8273, 7573, 55.5, {"borker", "gorker"}, + WorkerMetadata("fooo")) + }, { + 1928374: WorkerResources( [ WorkerResource(12.3, 23.4, WorkerResourceType.FLOAT, 'cpu_count'), WorkerResource(929283838, 939384848, WorkerResourceType.INT, 'cpu_mem'), WorkerResource(56.7, 67.8, WorkerResourceType.FLOAT, 'gpu_count'), WorkerResource(84847575, 85857676, WorkerResourceType.INT, 'gpu_mem'), + ], + [ + WorkerDevice('footype', 'dename', True, [ + WorkerDeviceResource(12.21, WorkerResourceType.FLOAT, 'lompas'), + WorkerDeviceResource(55, WorkerResourceType.INT, 'wompas') + ]) ] ), - '1928374', '127.1.2.333:blo', 1234567, WorkerState.BUSY, WorkerType.STANDARD, - 1534, 8273, 7573, 55.5, {"borker", "gorker"}, - WorkerMetadata("fooo")) }), TaskGroupBatchData(0, { 'grooup foo': TaskGroupData('grooup foo', 2345678, TaskGroupArchivedState.ARCHIVED, 22.3, TaskGroupStatisticsData(45, 56, 67, 78))