@@ -108,14 +108,15 @@ class JobData:
108
108
iens : int
109
109
job_state : AnyJob
110
110
submitted_timestamp : float
111
+ exec_hosts : str
111
112
112
113
113
114
def parse_bjobs (bjobs_output : str ) -> Dict [str , JobState ]:
114
115
data : Dict [str , JobState ] = {}
115
116
for line in bjobs_output .splitlines ():
116
117
tokens = line .split (sep = "^" )
117
- if len (tokens ) == 2 :
118
- job_id , job_state = tokens
118
+ if len (tokens ) == 3 :
119
+ job_id , job_state , _ = tokens
119
120
if job_state not in get_args (JobState ):
120
121
logger .error (
121
122
f"Unknown state { job_state } obtained from "
@@ -126,6 +127,17 @@ def parse_bjobs(bjobs_output: str) -> Dict[str, JobState]:
126
127
return data
127
128
128
129
130
+ def parse_bjobs_exec_hosts (bjobs_output : str ) -> Dict [str , str ]:
131
+ data : Dict [str , str ] = {}
132
+ for line in bjobs_output .splitlines ():
133
+ tokens = line .split (sep = "^" )
134
+ if len (tokens ) == 3 :
135
+ job_id , _ , exec_hosts = tokens
136
+ if exec_hosts != "-" :
137
+ data [job_id ] = exec_hosts
138
+ return data
139
+
140
+
129
141
def build_resource_requirement_string (
130
142
exclude_hosts : Sequence [str ],
131
143
realization_memory : int ,
@@ -360,6 +372,7 @@ async def submit(
360
372
iens = iens ,
361
373
job_state = QueuedJob (job_state = "PEND" ),
362
374
submitted_timestamp = time .time (),
375
+ exec_hosts = "-" ,
363
376
)
364
377
self ._iens2jobid [iens ] = job_id
365
378
@@ -421,7 +434,7 @@ async def poll(self) -> None:
421
434
str (self ._bjobs_cmd ),
422
435
"-noheader" ,
423
436
"-o" ,
424
- "jobid stat delimiter='^'" ,
437
+ "jobid stat exec_host delimiter='^'" ,
425
438
* current_jobids ,
426
439
stdout = asyncio .subprocess .PIPE ,
427
440
stderr = asyncio .subprocess .PIPE ,
@@ -438,6 +451,14 @@ async def poll(self) -> None:
438
451
f"bjobs gave returncode { process .returncode } and error { stderr .decode ()} "
439
452
)
440
453
bjobs_states = _parse_jobs_dict (parse_bjobs (stdout .decode (errors = "ignore" )))
454
+ bjobs_exec_hosts = parse_bjobs_exec_hosts (stdout .decode (errors = "ignore" ))
455
+
456
+ for jobid , exec_hosts in bjobs_exec_hosts .items ():
457
+ if self ._jobs [jobid ].exec_hosts == "-" :
458
+ logger .warning (
459
+ f"Realization { self ._jobs [jobid ].iens } was executed on host: { exec_hosts } "
460
+ )
461
+ self ._jobs [jobid ].exec_hosts = exec_hosts
441
462
442
463
job_ids_found_in_bjobs_output = set (bjobs_states .keys ())
443
464
if (
@@ -489,7 +510,6 @@ async def _process_job_update(self, job_id: str, new_state: AnyJob) -> None:
489
510
logger .info (f"Realization { iens } (LSF-id: { self ._iens2jobid [iens ]} ) failed" )
490
511
exit_code = await self ._get_exit_code (job_id )
491
512
event = FinishedEvent (iens = iens , returncode = exit_code )
492
-
493
513
elif isinstance (new_state , FinishedJobSuccess ):
494
514
logger .info (
495
515
f"Realization { iens } (LSF-id: { self ._iens2jobid [iens ]} ) succeeded"
0 commit comments