Skip to content

Commit

Permalink
updated dbesm sim
Browse files Browse the repository at this point in the history
  • Loading branch information
mfioren committed Feb 5, 2025
1 parent bce8a1a commit afb7864
Show file tree
Hide file tree
Showing 2 changed files with 642 additions and 6 deletions.
211 changes: 210 additions & 1 deletion simulators/dbesm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,13 @@ class System(ListeningSystem):
'DBE GETCFG': '_get_cfg',
'DBE SETDBEATT': '_set_dbeatt',
'DBE GETDBEATT': '_get_dbeatt',
'DBE FIRM': '_get_firm'
'DBE GETFIRM': '_get_firm',
'DBE SETDBEAMP': '_set_dbeamp',
'DBE GETDBEAMP': '_get_dbeamp',
'DBE SETDBEEQ': '_set_dbeeq',
'DBE GETDBEEQ': '_get_dbeeq',
'DBE SETDBEBPF': '_set_dbebpf',
'DBE GETDBEBPF': '_get_dbebpf'
}

errors = {
Expand Down Expand Up @@ -546,6 +552,209 @@ def _get_firm(self, params):
retval += f'Prog=DBESM, Rev=rev {selected_board["FIRM"]}'
return retval + '\x0D\x0A'

def _set_dbeamp(self, params):
retval = ''
boardx = {}
selected_boards = []
out_idx = []
if len(params) != 3:
return self._error(params[0], 1001)
b, d = zip(*self.outputs)
for i, v in enumerate(d):
if v == params[1]:
out_idx.append(i)
boardx = next((sub for sub in self.boards
if int(sub['Address']) == int(b[i])), None)
selected_boards.append(boardx)
if len(selected_boards) == 0:
return self._error(params[0], 1015)
else:
brd, a = zip(*self.amps_in_boards)
for board in selected_boards:
b_idx = selected_boards.index(board)
print(brd)
print(board["AMP"])
print(board["AMP"][a[out_idx[b_idx]]])
if (float(params[2]) not in [0,1]):
retval += (f'ERR DBE {params[1]} BOARD '
f'{board["Address"]} value out of range\n')
elif board["Status"] != 0:
retval += (f'ERR DBE {params[1]} BOARD '
f'{board["Address"]} unreachable\n')
else:
board["AMP"][a[out_idx[b_idx]]] = float(params[2])
retval += f'DBE {params[1]} BOARD {board["Address"]} ACK\n'
return retval[:-1] + '\x0D\x0A'

def _get_dbeamp(self, params):
retval = ''
boardx = {}
selected_boards = []
out_idx = []
if len(params) != 2:
return self._error(params[0], 1001)
b, d = zip(*self.outputs)
for i, v in enumerate(d):
if v == params[1]:
out_idx.append(i)
boardx = next((sub for sub in self.boards
if int(sub['Address']) == int(b[i])), None)
selected_boards.append(boardx)
if len(selected_boards) == 0:
return self._error(params[0], 1015)
else:
brd, a = zip(*self.amps_in_boards)
for board in selected_boards:
b_idx = selected_boards.index(board)
print(brd)
print(board["AMP"])
print(board["AMP"][a[out_idx[b_idx]]])

if board["Status"] != 0:
retval += (f'ERR DBE {params[1]} BOARD '
f'{board["Address"]} unreachable\n')
else:
retval += (f'ACK {params[1]} BOARD {board["Address"]} '
f'AMP {a[out_idx[b_idx]]} VALUE '
f'{board["AMP"][a[out_idx[b_idx]]]}\n')
return retval[:-1] + '\x0D\x0A'


def _set_dbeeq(self, params):
retval = ''
boardx = {}
selected_boards = []
out_idx = []
if len(params) != 3:
return self._error(params[0], 1001)
b, d = zip(*self.outputs)
for i, v in enumerate(d):
if v == params[1]:
out_idx.append(i)
boardx = next((sub for sub in self.boards
if int(sub['Address']) == int(b[i])), None)
selected_boards.append(boardx)
if len(selected_boards) == 0:
return self._error(params[0], 1015)
else:
brd, a = zip(*self.eqs_in_boards)
for board in selected_boards:
b_idx = selected_boards.index(board)
print(brd)
print(board["EQ"])
print(board["EQ"][a[out_idx[b_idx]]])
if (float(params[2]) not in [0,1]):
retval += (f'ERR DBE {params[1]} BOARD '
f'{board["Address"]} value out of range\n')
elif board["Status"] != 0:
retval += (f'ERR DBE {params[1]} BOARD '
f'{board["Address"]} unreachable\n')
else:
board["EQ"][a[out_idx[b_idx]]] = float(params[2])
retval += f'DBE {params[1]} BOARD {board["Address"]} ACK\n'
return retval[:-1] + '\x0D\x0A'

def _get_dbeeq(self, params):
retval = ''
boardx = {}
selected_boards = []
out_idx = []
if len(params) != 2:
return self._error(params[0], 1001)
b, d = zip(*self.outputs)
for i, v in enumerate(d):
if v == params[1]:
out_idx.append(i)
boardx = next((sub for sub in self.boards
if int(sub['Address']) == int(b[i])), None)
selected_boards.append(boardx)
if len(selected_boards) == 0:
return self._error(params[0], 1015)
else:
brd, a = zip(*self.eqs_in_boards)
for board in selected_boards:
b_idx = selected_boards.index(board)
print(brd)
print(board["EQ"])
print(board["EQ"][a[out_idx[b_idx]]])

if board["Status"] != 0:
retval += (f'ERR DBE {params[1]} BOARD '
f'{board["Address"]} unreachable\n')
else:
retval += (f'ACK {params[1]} BOARD {board["Address"]} '
f'EQ {a[out_idx[b_idx]]} VALUE '
f'{board["EQ"][a[out_idx[b_idx]]]}\n')
return retval[:-1] + '\x0D\x0A'


def _set_dbebpf(self, params):
retval = ''
boardx = {}
selected_boards = []
out_idx = []
if len(params) != 3:
return self._error(params[0], 1001)
b, d = zip(*self.outputs)
for i, v in enumerate(d):
if v == params[1]:
out_idx.append(i)
boardx = next((sub for sub in self.boards
if int(sub['Address']) == int(b[i])), None)
selected_boards.append(boardx)
if len(selected_boards) == 0:
return self._error(params[0], 1015)
else:
brd, a = zip(*self.bpfs_in_boards)
for board in selected_boards:
b_idx = selected_boards.index(board)
print(brd)
print(board["BPF"])
print(board["BPF"][a[out_idx[b_idx]]])
if (float(params[2]) not in [0,1]):
retval += (f'ERR DBE {params[1]} BOARD '
f'{board["Address"]} value out of range\n')
elif board["Status"] != 0:
retval += (f'ERR DBE {params[1]} BOARD '
f'{board["Address"]} unreachable\n')
else:
board["BPF"][a[out_idx[b_idx]]] = float(params[2])
retval += f'DBE {params[1]} BOARD {board["Address"]} ACK\n'
return retval[:-1] + '\x0D\x0A'

def _get_dbebpf(self, params):
retval = ''
boardx = {}
selected_boards = []
out_idx = []
if len(params) != 2:
return self._error(params[0], 1001)
b, d = zip(*self.outputs)
for i, v in enumerate(d):
if v == params[1]:
out_idx.append(i)
boardx = next((sub for sub in self.boards
if int(sub['Address']) == int(b[i])), None)
selected_boards.append(boardx)
if len(selected_boards) == 0:
return self._error(params[0], 1015)
else:
brd, a = zip(*self.bpfs_in_boards)
for board in selected_boards:
b_idx = selected_boards.index(board)
print(brd)
print(board["BPF"])
print(board["BPF"][a[out_idx[b_idx]]])

if board["Status"] != 0:
retval += (f'ERR DBE {params[1]} BOARD '
f'{board["Address"]} unreachable\n')
else:
retval += (f'ACK {params[1]} BOARD {board["Address"]} '
f'BPF {a[out_idx[b_idx]]} VALUE '
f'{board["BPF"][a[out_idx[b_idx]]]}\n')
return retval[:-1] + '\x0D\x0A'

def _error(self, device_code, error_code, board_address=None):
error_string = self.errors.get(error_code)
if error_code == 1001:
Expand Down
Loading

0 comments on commit afb7864

Please sign in to comment.