forked from NSLS-II/sirepo-bluesky-legacy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsirepo_bluesky.py
153 lines (133 loc) · 5.82 KB
/
sirepo_bluesky.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import requests
import time
import random
import numconv
import hashlib
import base64
class SirepoBluesky(object):
    """
    Invoke a remote sirepo simulation with custom arguments.

    Parameters
    ----------
    server: str
        Sirepo server to call, ex. 'http://localhost:8000'
    secret: str, optional
        Shared secret used to sign the authentication request.
        Defaults to 'bluesky'.

    Examples
    --------
    sim_id = '1tNWph0M'
    sb = SirepoBluesky('http://localhost:8000')
    data, schema = sb.auth('srw', sim_id)
    # update the model values and choose the report
    data['models']['undulator']['verticalAmplitude'] = 0.95
    data['report'] = 'trajectoryReport'
    sb.run_simulation()
    f = sb.get_datafile()

    # assumes there is an aperture named A1 and a watchpoint named W1 in the beamline
    aperture = sb.find_element(data['models']['beamline'], 'title', 'A1')
    aperture['horizontalSize'] = 0.1
    aperture['verticalSize'] = 0.1
    watch = sb.find_element(data['models']['beamline'], 'title', 'W1')
    data['report'] = 'watchpointReport{}'.format(watch['id'])
    sb.run_simulation()
    f2 = sb.get_datafile()

    Start Sirepo Server
    -------------------
    $ SIREPO_BLUESKY_AUTH_SECRET=bluesky sirepo service http
    - 'bluesky' is the secret key in this case
    """

    def __init__(self, server, secret='bluesky'):
        self.server = server
        self.secret = secret
        # Only copy_sim() flips this to True; defining it here lets
        # delete_copy() fail its assertion instead of raising AttributeError.
        self.is_copy = False
        # NOTE: `cookies` is deliberately NOT initialised here. Its presence
        # is what get_datafile() / run_simulation() use to detect that auth()
        # has been called.

    def auth(self, sim_type, sim_id):
        """ Connect to the server and returns the data for the simulation identified by sim_id.

        Parameters
        ----------
        sim_type: str
            Simulation type sent as 'simulationType', ex. 'srw'.
        sim_id: str
            Simulation identifier sent as 'simulationId'.

        Returns
        -------
        tuple
            (data, schema) taken from the server response; both are also
            stored on the instance.

        Raises
        ------
        AssertionError
            If the HTTP request fails or the response state is not 'ok'.
        """
        req = dict(simulationType=sim_type, simulationId=sim_id)
        rng = random.SystemRandom()
        # Nonce: current unix time, a dash, then 32 random base62 characters.
        suffix = ''.join(rng.choice(numconv.BASE62) for _ in range(32))
        req['authNonce'] = str(int(time.time())) + '-' + suffix
        # Hash covers nonce, type, id and the shared secret, colon-separated.
        digest = hashlib.sha256()
        digest.update(':'.join([
            req['authNonce'],
            req['simulationType'],
            req['simulationId'],
            self.secret,
        ]).encode())
        req['authHash'] = 'v1:' + base64.urlsafe_b64encode(digest.digest()).decode()

        # Drop any previous session so _post_json() stores the new cookies.
        self.cookies = None
        res = self._post_json('bluesky-auth', req)
        assert 'state' in res and res['state'] == 'ok', 'bluesky_auth failed: {}'.format(res)
        self.sim_type = sim_type
        self.sim_id = sim_id
        self.schema = res['schema']
        self.data = res['data']
        return self.data, self.schema

    def copy_sim(self, sim_name):
        """ Create a copy of the current simulation. Returns a new instance of SirepoBluesky.

        Parameters
        ----------
        sim_name: str
            Name given to the copied simulation.

        Returns
        -------
        SirepoBluesky
            New instance sharing this instance's server, secret, cookies and
            schema, whose data is the 'copy-simulation' response.
        """
        assert self.sim_id
        # simulationId, simulationType, name, folder
        res = self._post_json('copy-simulation', {
            'simulationId': self.sim_id,
            'simulationType': self.sim_type,
            'folder': self.data['models']['simulation']['folder'],
            'name': sim_name,
        })
        copy = SirepoBluesky(self.server, self.secret)
        copy.cookies = self.cookies
        copy.sim_type = self.sim_type
        copy.sim_id = res['models']['simulation']['simulationId']
        copy.schema = self.schema
        copy.data = res
        copy.is_copy = True
        return copy

    def delete_copy(self):
        """ Delete a simulation which was created using copy_sim().

        Raises
        ------
        AssertionError
            If this instance was not created by copy_sim(), or the server
            response state is not 'ok'.
        """
        assert self.is_copy
        res = self._post_json('delete-simulation', {
            'simulationId': self.sim_id,
            'simulationType': self.sim_type,
        })
        assert res['state'] == 'ok'
        self.sim_id = None

    @staticmethod
    def find_element(elements, field, value):
        """ Helper method to lookup an element in an array by field value.

        Returns the first element ``e`` with ``e[field] == value``.

        Raises
        ------
        AssertionError
            If no element matches.
        """
        for e in elements:
            if e[field] == value:
                return e
        # Raised explicitly (not `assert False`) so the failure is not
        # stripped when Python runs with -O.
        raise AssertionError('element not found, {}={}'.format(field, value))

    def find_optic_id_by_name(self, optic_name):
        """ Return the index in data['models']['beamline'] of the element titled optic_name.

        Raises
        ------
        ValueError
            If no beamline element has that title.
        """
        for optic_id, optic in enumerate(self.data['models']['beamline']):
            if optic['title'] == optic_name:
                return optic_id
        raise ValueError(f'Not valid optic {optic_name}')

    def get_datafile(self):
        """ Requests the raw datafile of simulation results from the server.

        Call auth() and run_simulation() before this.

        Returns
        -------
        bytes
            The response body of the download request.
        """
        assert hasattr(self, 'cookies'), 'call auth() before get_datafile()'
        url = 'download-data-file/{}/{}/{}/-1'.format(self.sim_type, self.sim_id, self.data['report'])
        response = requests.get('{}/{}'.format(self.server, url), cookies=self.cookies)
        self._assert_success(response, url)
        return response.content

    def run_simulation(self, max_status_calls=1000):
        """ Run the sirepo simulation and returns the formatted plot data.

        Parameters
        ----------
        max_status_calls: int, optional
            Maximum calls to check a running simulation's status. Roughly in seconds.
            Defaults is 1000.

        Returns
        -------
        dict
            The last response received from the server.

        Raises
        ------
        AssertionError
            If auth() was not called, data['report'] is not set, or the
            simulation's last known state is not 'completed'.
        """
        assert hasattr(self, 'cookies'), 'call auth() before run_simulation()'
        assert 'report' in self.data, 'client needs to set data[\'report\']'
        self.data['simulationId'] = self.sim_id
        res = self._post_json('run-simulation', self.data)
        for _ in range(max_status_calls):
            if res['state'] in ('completed', 'error'):
                break
            time.sleep(res['nextRequestSeconds'])
            res = self._post_json('run-status', res['nextRequest'])
        # Read the state from the most recent response, so the result of the
        # final poll is honoured and max_status_calls=0 is well-defined.
        state = res['state']
        assert state == 'completed', 'simulation failed to completed: {}'.format(state)
        return res

    @staticmethod
    def _assert_success(response, url):
        """ Assert that the HTTP response has status 200 (requests.codes.ok). """
        assert response.status_code == requests.codes.ok, '{} request failed, status: {}'.format(url, response.status_code)

    def _post_json(self, url, payload):
        """ POST payload as JSON to server/url and return the decoded JSON response.

        Cookies from the response are kept on the instance when none are
        currently stored.
        """
        response = requests.post('{}/{}'.format(self.server, url), json=payload, cookies=self.cookies)
        self._assert_success(response, url)
        if not self.cookies:
            self.cookies = response.cookies
        return response.json()