Skip to content

Commit

Permalink
adjust tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pedohorse committed Jul 24, 2024
1 parent 071744a commit b01ed6e
Show file tree
Hide file tree
Showing 8 changed files with 362 additions and 41 deletions.
4 changes: 3 additions & 1 deletion src/lifeblood_testing_common/common.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from pathlib import Path
import lifeblood
from lifeblood.scheduler.scheduler import Scheduler
from lifeblood.worker_resource_definition import WorkerResourceDefinition
from lifeblood.worker_resource_definition import WorkerResourceDefinition, WorkerDeviceTypeDefinition
from lifeblood.basenode_serializer_v2 import NodeSerializerV2
from lifeblood.pluginloader import PluginNodeDataProvider # TODO: this must be replaced by a testing mocker
from lifeblood_testing_common.scheduler_config_provider_default_override import SchedulerConfigProviderOverrides
Expand All @@ -25,6 +25,7 @@ def create_default_scheduler(
node_per_node_config: Optional[Dict[str, Dict[str, Any]]] = None,
node_global_config: Optional[Dict[str, Dict[str, Any]]] = None,
resource_definitions: Optional[Tuple[WorkerResourceDefinition, ...]] = None,
device_type_definitions: Optional[Tuple[WorkerDeviceTypeDefinition, ...]] = None,
) -> Scheduler:
legacy_addr = None
message_addr = None
Expand All @@ -42,6 +43,7 @@ def create_default_scheduler(
node_per_node_config=node_per_node_config,
node_global_config=node_global_config,
resource_definitions=resource_definitions,
device_type_definitions=device_type_definitions,
)
return Scheduler(
scheduler_config_provider=config,
Expand Down
14 changes: 13 additions & 1 deletion src/lifeblood_testing_common/integration_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from lifeblood.simple_worker_pool import WorkerPool
from lifeblood.net_messages.address import AddressChain
from lifeblood.taskspawn import NewTask
from lifeblood.worker_resource_definition import WorkerResourceDefinition, WorkerDeviceTypeDefinition
from lifeblood.enums import SpawnStatus

from typing import Dict, Iterable, Optional, Tuple, Union
Expand Down Expand Up @@ -57,6 +58,8 @@ async def asyncSetUp(self):
server_ui_addr=(get_default_addr(), test_server_port3),
node_per_node_config=c1,
node_global_config=c2,
resource_definitions=self._resource_definitions(),
device_type_definitions=self._device_type_definitions(),
)
self.worker_pool = WorkerPool(
scheduler_address=AddressChain(f'{get_default_addr()}:{test_server_port2}'),
Expand Down Expand Up @@ -96,7 +99,7 @@ async def test_main(self):
print(f'expecting {expected_states}')
# so expected_states is dict of task id to (state, is paused, node_id), not node name

required_succ_time = 0
required_succ_time = self._required_expected_state_keep_time()

# wait for processing
timeout = self._timeout()
Expand Down Expand Up @@ -180,6 +183,9 @@ def _expected_task_attributes(self) -> Dict[int, dict]:
async def _additional_checks_on_finish(self):
return

def _required_expected_state_keep_time(self) -> float:
return 0.0

def _timeout(self) -> float:
return 15.0

Expand All @@ -191,3 +197,9 @@ def _minimal_total_to_ensure(self) -> int:

def _maximum_total(self) -> int:
return 16

def _resource_definitions(self) -> Optional[Tuple[WorkerResourceDefinition, ...]]:
return None

def _device_type_definitions(self) -> Optional[Tuple[WorkerDeviceTypeDefinition, ...]]:
return None
29 changes: 19 additions & 10 deletions src/lifeblood_testing_common/nodes_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from lifeblood.scheduler import Scheduler
from lifeblood_testing_common.common import create_default_scheduler
from lifeblood.worker import Worker
from lifeblood.invocationjob import InvocationJob, Environment
from lifeblood.invocationjob import Invocation, InvocationJob, InvocationResources, Environment
from lifeblood.scheduler.pinger import Pinger
from lifeblood.pluginloader import PluginNodeDataProvider
from lifeblood.processingcontext import ProcessingContext
Expand Down Expand Up @@ -47,7 +47,7 @@ def __init__(self, path_to_bin: str):
super().__init__()
self.__bin_path = Path(path_to_bin)

async def get_environment(self, arguments: Mapping) -> "invocationjob.Environment":
async def get_environment(self, arguments: Mapping) -> Environment:
return Environment({**os.environ,
'PATH': os.pathsep.join((str(self.__bin_path), os.environ.get('PATH', ''))),
'PYTHONUNBUFFERED': '1'})
Expand Down Expand Up @@ -254,7 +254,7 @@ async def _helper_test_worker_node(self,
tasks_to_complete = tasks_to_complete or worker_count
side_effect_was_good = True
with mock.patch('lifeblood.scheduler.scheduler.Scheduler.task_done_reported') as td_patch:
def _side_effect(task: InvocationJob, stdout: str, stderr: str):
def _side_effect(task: Invocation, stdout: str, stderr: str):
nonlocal tasks_to_complete, side_effect_was_good
tasks_to_complete -= 1
print(f'finished {task.task_id()} out: {stdout}')
Expand Down Expand Up @@ -351,12 +351,16 @@ async def _logic(scheduler, workers, tmp_script_path, done_waiter):
ij.args().insert(0, str(Path(__file__).parent / Path(bin_rel_path) / command))
ij.args().insert(0, 'python')

ij._set_task_id(2345)
ij._set_invocation_id(1234)
invoc = Invocation(
ij,
invocation_id=1234,
task_id=2345,
resources_to_use=InvocationResources({}, {}),
)
the_worker = workers[0]

await workers[0].run_task(
ij,
invoc,
scheduler.server_message_addresses()[0]
)

Expand Down Expand Up @@ -384,7 +388,7 @@ async def _logic(scheduler, workers, tmp_script_path, done_waiter):

return _logic

def _task_done_logic(task: InvocationJob):
def _task_done_logic(task: Invocation):
self.assertEqual(100.0, the_worker.task_status())

for skip_exist, pre_exist in ((False, False), (True, False), (True, True)):
Expand Down Expand Up @@ -456,12 +460,17 @@ async def _logic(scheduler, workers, script_path, done_waiter):
ij.args().insert(0, str(Path(__file__).parent / Path(add_relative_to_PATH) / command))
ij.args().insert(0, 'python')

ij._set_task_id(2345)
ij._set_invocation_id(1234)
invoc = Invocation(
ij,
invocation_id=1234,
task_id=2345,
resources_to_use=InvocationResources({}, {}),
)

the_worker = workers[0]

await workers[0].run_task(
ij,
invoc,
scheduler.server_message_addresses()[0]
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from typing import Any, Dict, List, Optional, Tuple

from lifeblood.worker_resource_definition import WorkerResourceDefinition
from lifeblood.worker_resource_definition import WorkerResourceDefinition, WorkerDeviceTypeDefinition


class SchedulerConfigProviderOverrides(SchedulerConfigProviderDefaults):
Expand All @@ -20,6 +20,7 @@ def __init__(
node_per_node_config: Optional[Dict[str, Dict[str, Any]]] = None,
node_global_config: Optional[Dict[str, Dict[str, Any]]] = None,
resource_definitions: Optional[Tuple[WorkerResourceDefinition, ...]] = None,
device_type_definitions: Optional[Tuple[WorkerDeviceTypeDefinition, ...]] = None,
):
super().__init__()
self.__main_db_location_override = main_db_location
Expand All @@ -33,6 +34,7 @@ def __init__(
self.__node_per_node_config = node_per_node_config or {}
self.__node_global_config = node_global_config or {}
self.__resource_definitions = resource_definitions
self.__device_type_definitions = device_type_definitions

def main_database_location(self) -> str:
return self.__main_db_location_override or super().main_database_location()
Expand Down Expand Up @@ -82,3 +84,6 @@ def node_configuration(self, node_type_id: str) -> dict:

def hardware_resource_definitions(self) -> Tuple[WorkerResourceDefinition, ...]:
return self.__resource_definitions or super().hardware_resource_definitions()

def hardware_device_type_definitions(self) -> Tuple[WorkerDeviceTypeDefinition, ...]:
return self.__device_type_definitions or super().hardware_device_type_definitions()
Loading

0 comments on commit b01ed6e

Please sign in to comment.