Skip to content

Commit

Permalink
Updated dbesm sim and sim test with getcfg and dbeatt functions (#342)
Browse files Browse the repository at this point in the history
* Updated dbesm sim and sim test with getcfg and dbeatt functions

* updated dbesm simulator

* updated dbesm sim tests

* updated dbesm tests

---------

Co-authored-by: Giuseppe Carboni <giuseppecarboni89@live.com>
  • Loading branch information
mfioren and giuseppe-carboni authored Jan 9, 2024
1 parent 8099685 commit 055e0f2
Show file tree
Hide file tree
Showing 2 changed files with 494 additions and 137 deletions.
202 changes: 168 additions & 34 deletions simulators/dbesm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,20 @@ class System(ListeningSystem):
'DBE SETALLMODE': '_set_allmode',
'DBE MODE': '_set_mode',
'DBE STOREALLMODE': '_store_allmode',
'DBE CLRMODE': '_clr_mode',
'DBE DELETEFILE': '_delete_file',
'DBE GETSTATUS': '_get_status',
'DBE SETATT': '_set_att',
'DBE SETAMP': '_set_amp',
'DBE SETEQ': '_set_eq',
'DBE SETBPF': '_set_bpf',
'DBE ALLDIAG': '_all_diag',
'DBE DIAG': '_diag',
'DBE ReadALLDIAG': '_all_diag',
'DBE ReadDIAG': '_diag',
'DBE SETSTATUS': '_set_status',
'DBE GETCOMP': '_getcomp',
'DBE GETCOMP': '_get_comp',
'DBE GETCFG': '_get_cfg',
'DBE SETDBEATT': '_set_dbeatt',
'DBE GETDBEATT': '_get_dbeatt',
'DBE FIRM': '_get_firm'
}

errors = {
Expand All @@ -48,6 +52,7 @@ class System(ListeningSystem):
1012: 'AMP X not existing',
1013: 'EQ X not existing',
1014: 'BPF X not existing',
1015: 'Output not existing',
}

obs_mode = [
Expand All @@ -59,58 +64,73 @@ class System(ListeningSystem):
'MFS_7',
]

out_boards = ['0', '0', '0', '1', '1', '2', '2', '3', '3']
out_dbe = ['1_DBBC2', 'prova', 'SARDA_01', 'prova',
'prova2', 'Space_Debris', 'prova', 'prova2', 'SARDA_14']
outputs = list(zip(out_boards, out_dbe))
out_att = [11, 3, 2, 1, 8, 1, 5, 16, 7]
atts_in_boards = list(zip(out_boards, out_att))

def __init__(self):
self.msg = ''

# Status -1 -> board not available
self.boards = [
{
'Address': '12',
'Address': '3',
"Status": 0,
"Configuration": "default",
"REG": self._init_reg(),
"ATT": self._init_att(),
"AMP": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
"EQ": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
"BPF": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
"5V": 0.00,
"3V3": 0.00,
"T0": 00.00
"AMP": [1, 1, 1, 1, 0, 0, 0, 0, 0, 1],
"EQ": [1, 1, 1, 1, 0, 0, 0, 0, 0, 1],
"BPF": [1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 1],
"5V": 1.11,
"3V3": 0.11,
"T0": 05.10,
"FIRM": "0.116_NEW_win"
},
{
'Address': '13',
'Address': '2',
"Status": 0,
"Configuration": "default",
"REG": self._init_reg(),
"ATT": self._init_att(),
"AMP": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
"EQ": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
"BPF": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
"AMP": [1, 0, 0, 0, 0, 0, 1, 1, 1, 1],
"EQ": [1, 0, 0, 0, 0, 0, 1, 1, 1, 1],
"BPF": [1, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1],
"5V": 0.00,
"3V3": 0.00,
"T0": 00.00
"3V3": 0.01,
"T0": 50.51,
"FIRM": "0.116_NEW_win"
},
{
'Address': '14',
'Address': '1',
"Status": 0,
"Configuration": "default",
"REG": self._init_reg(),
"ATT": self._init_att(),
"AMP": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
"EQ": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
"BPF": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
"5V": 0.00,
"3V3": 0.00,
"T0": 00.00
"AMP": [1, 0, 0, 0, 1, 0, 0, 0, 0, 1],
"EQ": [1, 0, 0, 0, 1, 0, 0, 0, 0, 1],
"BPF": [1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1],
"5V": 1.00,
"3V3": 0.10,
"T0": 15.50,
"FIRM": "0.116_NEW_win"
},
{
'Address': '15',
'Address': '0',
"Status": 0,
"Configuration": "default",
"REG": self._init_reg(),
"ATT": self._init_att(),
"AMP": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
"EQ": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
"BPF": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
"5V": 0.00,
"3V3": 0.00,
"T0": 00.00
"AMP": [0, 0, 0, 1, 1, 1, 1, 0, 0, 0],
"EQ": [0, 0, 0, 1, 1, 1, 1, 0, 0, 0],
"BPF": [0, 0, 0, 1, 1, 1, 1, 1, 0, 0, 0],
"5V": 0.01,
"3V3": 1.00,
"T0": 05.05,
"FIRM": "0.116_NEW_win"
},
]

Expand All @@ -124,6 +144,7 @@ def _init_att(self):
return att

def parse(self, byte):
print(byte)
if byte == self.tail:
msg = self.msg[:-1]
self.msg = ''
Expand All @@ -138,6 +159,7 @@ def _execute(self, msg):
:param msg: the received command, comprehensive of its header and tail.
"""
print(msg)
args = [x.strip() for x in msg.split(' ')]
try:
device_code = list(self.devices.keys())[
Expand Down Expand Up @@ -173,6 +195,7 @@ def _set_allmode(self, params):
if key['Status'] != 0:
retval += f'BOARD {key["Address"]} ERR DBE BOARD unreachable\n'
else:
key["Configuration"] = params[1]
retval += f'BOARD {key["Address"]} ACK\n'
return retval[:-1] + '\x0D\x0A'

Expand All @@ -192,6 +215,7 @@ def _set_mode(self, params):
elif selected_board["Status"] != 0:
return self._error(params[0], 1005, params[2])
else:
selected_board["Configuration"] = params[3]
return self.ack + '\x0D\x0A'

def _store_allmode(self, params):
Expand All @@ -212,7 +236,7 @@ def _store_allmode(self, params):
self.obs_mode.append(params[1])
return self.ack + '\x0D\x0A'

def _clr_mode(self, params):
def _delete_file(self, params):
if len(params) != 2:
return self._error(params[0], 1001)
elif params[1] not in self.obs_mode:
Expand Down Expand Up @@ -250,7 +274,7 @@ def _set_att(self, params):
return self._error(params[0], 1007, params[3])
elif int(params[1]) not in list(range(0, 17)):
return self._error(params[0], 1010, params[1])
elif float(params[5]) not in list(numpy.arange(0, 31.5, 0.5)):
elif float(params[5]) not in list(numpy.arange(0, 32, 0.5)):
return self._error(params[0], 1011, selected_board["Address"])
elif selected_board["Status"] != 0:
return self._error(params[0], 1005, params[3])
Expand Down Expand Up @@ -391,7 +415,7 @@ def _set_status(self, params):
except ValueError:
return self._error(params[0], 1001)

def _getcomp(self, params):
def _get_comp(self, params):
if len(params) != 3 or params[1] != 'BOARD':
return self._error(params[0], 1001)
try:
Expand All @@ -412,6 +436,116 @@ def _getcomp(self, params):
retval += f'BPF=[ {" ".join(map(str,selected_board["BPF"])) } ]'
return retval + '\x0D\x0A'

def _get_cfg(self, params):
retval = ''
if len(params) != 1:
return self._error(params[0], 1001)
else:
retval += self.ack + '\n'
for board in self.boards:
retval += f'BOARD {board["Address"]} '
if board['Status'] == 0:
retval += f'{board["Configuration"]}\n\n'
else:
retval += 'ERR DBE BOARD unreachable\n\n'
return retval[:-2] + '\x0D\x0A'

def _set_dbeatt(self, params):
retval = ''
boardx = {}
selected_boards = []
out_idx = []
if len(params) != 3:
return self._error(params[0], 1001)
b, d = zip(*self.outputs)
for i, v in enumerate(d):
if v == params[1]:
out_idx.append(i)
boardx = next((sub for sub in self.boards
if int(sub['Address']) == int(b[i])), None)
selected_boards.append(boardx)
if len(selected_boards) == 0:
return self._error(params[0], 1015)
else:
brd, a = zip(*self.atts_in_boards)
for board in selected_boards:
b_idx = selected_boards.index(board)
print(brd)
print(board["ATT"])
print(board["ATT"][a[out_idx[b_idx]]])
if ((params[2] == '+3' or params[2] == '-3') and
not (0 <= (float(board["ATT"][a[out_idx[b_idx]]]) +
float(params[2])) <= 31.5)):
retval += (f'ERR DBE {params[1]} BOARD '
f'{board["Address"]} value out of range\n')
elif (params[2] != '+3' and params[2] != '-3'
and float(params[2]) not in
list(numpy.arange(0, 32, 0.5))):
retval += (f'ERR DBE {params[1]} BOARD '
f'{board["Address"]} value out of range\n')
elif board["Status"] != 0:
retval += (f'ERR DBE {params[1]} BOARD '
f'{board["Address"]} unreachable\n')
else:
if params[2] == '+3' or params[2] == '-3':
board["ATT"][a[out_idx[b_idx]]] += float(params[2])
else:
board["ATT"][a[out_idx[b_idx]]] = float(params[2])
retval += f'DBE {params[1]} BOARD {board["Address"]} ACK\n'
return retval[:-1] + '\x0D\x0A'

def _get_dbeatt(self, params):
retval = ''
boardx = {}
selected_boards = []
out_idx = []
if len(params) != 2:
return self._error(params[0], 1001)
b, d = zip(*self.outputs)
for i, v in enumerate(d):
if v == params[1]:
out_idx.append(i)
boardx = next((sub for sub in self.boards
if int(sub['Address']) == int(b[i])), None)
selected_boards.append(boardx)
if len(selected_boards) == 0:
return self._error(params[0], 1015)
else:
brd, a = zip(*self.atts_in_boards)
for board in selected_boards:
b_idx = selected_boards.index(board)
print(brd)
print(board["ATT"])
print(board["ATT"][a[out_idx[b_idx]]])

if board["Status"] != 0:
retval += (f'ERR DBE {params[1]} BOARD '
f'{board["Address"]} unreachable\n')
else:
retval += (f'ACK {params[1]} BOARD {board["Address"]} '
f'ATT {a[out_idx[b_idx]]} VALUE '
f'{board["ATT"][a[out_idx[b_idx]]]}\n')
return retval[:-1] + '\x0D\x0A'

def _get_firm(self, params):
if len(params) != 3 or params[1] != 'BOARD':
return self._error(params[0], 1001)
try:
selected_board = next((sub for sub in self.boards
if int(sub['Address']) == int(params[2])), None)
except ValueError:
return self._error(params[0], 1001)

if selected_board is None:
return self._error(params[0], 1007, params[2])
elif selected_board["Status"] != 0:
return self._error(params[0], 1005, params[2])
else:
retval = self.ack + '\n'
retval += f'BOARD {selected_board["Address"]} '
retval += f'Prog=DBESM, Rev=rev {selected_board["FIRM"]}'
return retval + '\x0D\x0A'

def _error(self, device_code, error_code, board_address=None):
error_string = self.errors.get(error_code)
if error_code == 1001:
Expand Down
Loading

0 comments on commit 055e0f2

Please sign in to comment.