Skip to content

Commit 1a2f2c8

Browse files
committed
Retrieve exec_hosts from bjobs
1 parent 484381b commit 1a2f2c8

File tree

3 files changed

+36
-7
lines changed

3 files changed

+36
-7
lines changed

src/ert/scheduler/event.py

+2
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@
77
@dataclass
class StartedEvent:
    """Event emitted by a driver when a realization has begun running."""

    # Index of the realization this event refers to.
    iens: int
    # Host(s) the job was dispatched to, as reported by the queue system.
    # Defaults to an empty string when the driver supplies no host information.
    exec_hosts: str = ""
1011

1112

1213
@dataclass
class FinishedEvent:
    """Event emitted by a driver when a realization has finished."""

    # Index of the realization this event refers to.
    iens: int
    # Exit code of the finished job.
    returncode: int
    # Host(s) the job ran on, as reported by the queue system.
    # Defaults to an empty string when the driver supplies no host information.
    exec_hosts: str = ""
1618

1719

1820
Event = Union[StartedEvent, FinishedEvent]

src/ert/scheduler/lsf_driver.py

+28-6
Original file line numberDiff line numberDiff line change
@@ -108,14 +108,15 @@ class JobData:
108108
iens: int
109109
job_state: AnyJob
110110
submitted_timestamp: float
111+
exec_hosts: str
111112

112113

113114
def parse_bjobs(bjobs_output: str) -> Dict[str, JobState]:
114115
data: Dict[str, JobState] = {}
115116
for line in bjobs_output.splitlines():
116117
tokens = line.split(sep="^")
117-
if len(tokens) == 2:
118-
job_id, job_state = tokens
118+
if len(tokens) == 3:
119+
job_id, job_state, _ = tokens
119120
if job_state not in get_args(JobState):
120121
logger.error(
121122
f"Unknown state {job_state} obtained from "
@@ -126,6 +127,17 @@ def parse_bjobs(bjobs_output: str) -> Dict[str, JobState]:
126127
return data
127128

128129

130+
def parse_bjobs_exec_hosts(bjobs_output: str) -> Dict[str, str]:
    """Extract the execution host(s) per job id from ``bjobs`` output.

    Each line is expected to have exactly three ``^``-separated fields:
    job id, job state and exec_host. Lines with any other number of fields
    are ignored.

    Args:
        bjobs_output: Raw stdout from ``bjobs``, one job per line.

    Returns:
        Mapping from job id to its exec_host string. Jobs whose host field
        is the placeholder ``-`` or is empty are left out.
    """
    hosts_by_job_id: Dict[str, str] = {}
    for line in bjobs_output.splitlines():
        tokens = line.split(sep="^")
        if len(tokens) != 3:
            continue
        # Strip padding so that e.g. " - " is still recognised as the
        # "no host assigned" placeholder and ids match the tracked job ids.
        job_id = tokens[0].strip()
        exec_hosts = tokens[2].strip()
        if exec_hosts and exec_hosts != "-":
            hosts_by_job_id[job_id] = exec_hosts
    return hosts_by_job_id
139+
140+
129141
def build_resource_requirement_string(
130142
exclude_hosts: Sequence[str],
131143
realization_memory: int,
@@ -360,6 +372,7 @@ async def submit(
360372
iens=iens,
361373
job_state=QueuedJob(job_state="PEND"),
362374
submitted_timestamp=time.time(),
375+
exec_hosts="-",
363376
)
364377
self._iens2jobid[iens] = job_id
365378

@@ -421,7 +434,7 @@ async def poll(self) -> None:
421434
str(self._bjobs_cmd),
422435
"-noheader",
423436
"-o",
424-
"jobid stat delimiter='^'",
437+
"jobid stat exec_host delimiter='^'",
425438
*current_jobids,
426439
stdout=asyncio.subprocess.PIPE,
427440
stderr=asyncio.subprocess.PIPE,
@@ -438,6 +451,12 @@ async def poll(self) -> None:
438451
f"bjobs gave returncode {process.returncode} and error {stderr.decode()}"
439452
)
440453
bjobs_states = _parse_jobs_dict(parse_bjobs(stdout.decode(errors="ignore")))
454+
bjobs_exec_hosts = parse_bjobs_exec_hosts(stdout.decode(errors="ignore"))
455+
456+
for jobid, exec_hosts in bjobs_exec_hosts.items():
457+
if self._jobs[jobid].exec_hosts == "-":
458+
logger.warning(f"bjobs exec_host: {jobid} {exec_hosts}")
459+
self._jobs[jobid].exec_hosts = exec_hosts
441460

442461
job_ids_found_in_bjobs_output = set(bjobs_states.keys())
443462
if (
@@ -484,12 +503,15 @@ async def _process_job_update(self, job_id: str, new_state: AnyJob) -> None:
484503
event: Optional[Event] = None
485504
if isinstance(new_state, RunningJob):
486505
logger.debug(f"Realization {iens} is running")
487-
event = StartedEvent(iens=iens)
506+
event = StartedEvent(iens=iens, exec_hosts=self._jobs[job_id].exec_hosts)
488507
elif isinstance(new_state, FinishedJobFailure):
489508
logger.info(f"Realization {iens} (LSF-id: {self._iens2jobid[iens]}) failed")
490509
exit_code = await self._get_exit_code(job_id)
491-
event = FinishedEvent(iens=iens, returncode=exit_code)
492-
510+
event = FinishedEvent(
511+
iens=iens,
512+
returncode=exit_code,
513+
exec_hosts=self._jobs[job_id].exec_hosts,
514+
)
493515
elif isinstance(new_state, FinishedJobSuccess):
494516
logger.info(
495517
f"Realization {iens} (LSF-id: {self._iens2jobid[iens]}) succeeded"

src/ert/scheduler/scheduler.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
from ert.constant_filenames import CERT_FILE
2929

3030
from .driver import Driver
31-
from .event import FinishedEvent
31+
from .event import FinishedEvent, StartedEvent
3232
from .job import Job, JobState
3333

3434
if TYPE_CHECKING:
@@ -308,6 +308,11 @@ async def _process_event_queue(self) -> None:
308308
# Any event implies the job has at least started
309309
job.started.set()
310310

311+
if isinstance(event, (StartedEvent, FinishedEvent)) and event.exec_hosts:
312+
logger.info(
313+
f"Realization {event.iens} was executed on host: {event.exec_hosts}"
314+
)
315+
311316
if (
312317
isinstance(event, FinishedEvent)
313318
and not self._cancelled

0 commit comments

Comments
 (0)