@@ -34,7 +34,7 @@ def test_port_map_and_compiled_programs(self):
34
34
for i in range (9 , 27 ):
35
35
self .assertEqual (node_data .inPortByteMap [i ].name , 'RecordSignal' )
36
36
self .assertEqual (node_data .inPortByteMap [i ].id , 2 )
37
-
37
+
38
38
self .assertEqual (len (node_data .outPortDataMap ), 4 )
39
39
elem = node_data .outPortDataMap [0 ]
40
40
self .assertEqual (elem .data_offset , 0 )
@@ -57,11 +57,11 @@ def test_port_map_and_compiled_programs(self):
57
57
apx .OPCODE_PACK_U16 ])
58
58
self .assertEqual (node_data .outPortPrograms [0 ], expected )
59
59
expected = bytes ([apx .OPCODE_PACK_PROG , apx .UINT8_LEN ,0 ,0 ,0 ,
60
- apx .OPCODE_PACK_U8 ])
60
+ apx .OPCODE_PACK_U8 ])
61
61
self .assertEqual (node_data .outPortPrograms [1 ], expected )
62
62
expected = bytes ([apx .OPCODE_PACK_PROG , apx .UINT32_LEN ,0 ,0 ,0 ,
63
- apx .OPCODE_PACK_U32 ])
64
- self .assertEqual (node_data .outPortPrograms [2 ], expected )
63
+ apx .OPCODE_PACK_U32 ])
64
+ self .assertEqual (node_data .outPortPrograms [2 ], expected )
65
65
expected = bytes ([apx .OPCODE_PACK_PROG , (3 * apx .UINT16_LEN + apx .UINT32_LEN ),0 ,0 ,0 ,
66
66
apx .OPCODE_RECORD_ENTER ,
67
67
apx .OPCODE_RECORD_SELECT ])+ "SensorData\0 " .encode ('ascii' )+ bytes ([
@@ -75,10 +75,10 @@ def test_port_map_and_compiled_programs(self):
75
75
apx .OPCODE_RECORD_LEAVE ,
76
76
apx .OPCODE_RECORD_SELECT ])+ "TimeStamp\0 " .encode ('ascii' )+ bytes ([
77
77
apx .OPCODE_PACK_U32 ,
78
- apx .OPCODE_RECORD_LEAVE
78
+ apx .OPCODE_RECORD_LEAVE
79
79
])
80
- self .assertEqual (node_data .outPortPrograms [3 ], expected )
81
-
80
+ self .assertEqual (node_data .outPortPrograms [3 ], expected )
81
+
82
82
expected = bytes ([apx .OPCODE_UNPACK_PROG , apx .UINT8_LEN ,0 ,0 ,0 ,
83
83
apx .OPCODE_UNPACK_U8 ])
84
84
self .assertEqual (node_data .inPortPrograms [0 ], expected )
@@ -100,7 +100,7 @@ def test_port_map_and_compiled_programs(self):
100
100
101
101
102
102
class TestNodeDataRead (unittest .TestCase ):
103
-
103
+
104
104
def test_read_port_RheostatLevelRqst (self ):
105
105
node = create_node_and_data ()
106
106
port_RheostatLevelRqst = node .find ('RheostatLevelRqst' )
@@ -153,7 +153,7 @@ def test_read_RecordSignal(self):
153
153
self .assertEqual (node_data .read_require_port (port_RecordSignal ), {'Name' : "abcdefgh" , 'Id' :0x12345678 , 'Data' : [0 ,0 ,1 ]})
154
154
input_file .write (data_offset , struct .pack ("<HHH" ,18000 ,2 ,10 ))
155
155
self .assertEqual (node_data .read_require_port (port_RecordSignal ), {'Name' : "abcdefgh" , 'Id' :0x12345678 , 'Data' : [18000 ,2 ,10 ]})
156
-
156
+
157
157
def test_byte_to_port_all (self ):
158
158
node = create_node_and_data ()
159
159
node_data = apx .NodeData (node )
@@ -179,11 +179,11 @@ def test_byte_to_port_all(self):
179
179
self .assertEqual (result [0 ][2 ], RheostatLevelRqst_data_len )
180
180
self .assertEqual (result [1 ][2 ], StrSignal_data_len )
181
181
self .assertEqual (result [2 ][2 ], RecordSignal_data_len )
182
-
182
+
183
183
def test_byte_to_port_RheostatLevelRqst (self ):
184
184
node = create_node_and_data ()
185
185
node_data = apx .NodeData (node )
186
-
186
+
187
187
RheostatLevelRqst_data_offset = 0
188
188
RheostatLevelRqst_data_len = 1
189
189
result = list (node_data .byte_to_port (RheostatLevelRqst_data_offset , RheostatLevelRqst_data_len ))
@@ -196,7 +196,7 @@ def test_byte_to_port_RheostatLevelRqst(self):
196
196
def test_byte_to_port_StrSignal (self ):
197
197
node = create_node_and_data ()
198
198
node_data = apx .NodeData (node )
199
-
199
+
200
200
StrSignal_data_offset = 1
201
201
StrSignal_data_len = 8
202
202
for offset in range (StrSignal_data_offset , StrSignal_data_offset + StrSignal_data_len ):
@@ -210,7 +210,7 @@ def test_byte_to_port_StrSignal(self):
210
210
def test_byte_to_port_RecordSignal (self ):
211
211
node = create_node_and_data ()
212
212
node_data = apx .NodeData (node )
213
-
213
+
214
214
RecordSignal_data_offset = 9
215
215
RecordSignal_data_len = 18
216
216
for offset in range (RecordSignal_data_offset , RecordSignal_data_offset + RecordSignal_data_len ):
@@ -220,20 +220,20 @@ def test_byte_to_port_RecordSignal(self):
220
220
self .assertIs (port , node .find ('RecordSignal' ))
221
221
self .assertEqual (offset , RecordSignal_data_offset )
222
222
self .assertEqual (length , RecordSignal_data_len )
223
-
223
+
224
224
def test_byte_to_port_invalid_args (self ):
225
225
node = create_node_and_data ()
226
226
node_data = apx .NodeData (node )
227
227
self .assertEqual (len (node_data .inPortByteMap ), 27 )
228
-
228
+
229
229
with self .assertRaises (ValueError ) as context :
230
230
result = list (node_data .byte_to_port (28 ,1 ))
231
231
self .assertEqual (str (context .exception ), "start_offset (28) is beyond length of file (27)" )
232
-
232
+
233
233
with self .assertRaises (ValueError ) as context :
234
234
result = list (node_data .byte_to_port (25 ,5 ))
235
235
self .assertEqual (str (context .exception ), "end_offset (30) is beyond length of file (27)" )
236
-
236
+
237
237
RecordSignal_data_offset = 9
238
238
RecordSignal_data_len = 18
239
239
result = list (node_data .byte_to_port (25 ,2 ))
@@ -243,14 +243,14 @@ def test_byte_to_port_invalid_args(self):
243
243
self .assertEqual (length , RecordSignal_data_len )
244
244
245
245
def test_callback (self ):
246
-
246
+
247
247
call_history = []
248
-
248
+
249
249
@apx .NodeDataClient .register
250
250
class Listener :
251
251
def on_require_port_data (self , port , value ):
252
252
call_history .append ((port , value ))
253
-
253
+
254
254
listener_obj = Listener ()
255
255
node = create_node_and_data ()
256
256
node_data = apx .NodeData (node )
@@ -273,7 +273,7 @@ def on_require_port_data(self, port, value):
273
273
self .assertEqual (len (call_history ), 2 )
274
274
self .assertEqual (call_history [- 1 ][0 ], node .find ('RheostatLevelRqst' ))
275
275
self .assertEqual (call_history [- 1 ][1 ], 255 )
276
-
276
+
277
277
#test write RecordSignal
278
278
input_file .write (RecordSignal_data_offset , "Test" .encode ('utf-8' ))
279
279
self .assertEqual (len (call_history ), 3 )
@@ -285,7 +285,7 @@ def on_require_port_data(self, port, value):
285
285
self .assertEqual (call_history [- 1 ][1 ], {'Name' : "Abc" , 'Id' : 918 , 'Data' :[1000 ,2000 ,4000 ]})
286
286
287
287
class TestNodeDataWrite (unittest .TestCase ):
288
-
288
+
289
289
def test_write_VehicleSpeed (self ):
290
290
node = create_node_and_data ()
291
291
node_data = apx .NodeData (node )
@@ -295,9 +295,9 @@ def test_write_VehicleSpeed(self):
295
295
output_file = node_data .outPortDataFile
296
296
#verify init value
297
297
self .assertEqual (output_file .read (VehicleSpeed_offset , VehicleSpeed_length ), bytes ([0xFF , 0xFF ]))
298
- node_data .write_provide_port (VehicleSpeed_port , 0x1234 )
298
+ node_data .write_provide_port (VehicleSpeed_port , 0x1234 )
299
299
self .assertEqual (output_file .read (VehicleSpeed_offset , VehicleSpeed_length ), bytes ([0x34 , 0x12 ]))
300
-
300
+
301
301
def test_write_MainBeam (self ):
302
302
node = create_node_and_data ()
303
303
node_data = apx .NodeData (node )
@@ -311,7 +311,7 @@ def test_write_MainBeam(self):
311
311
self .assertEqual (output_file .read (MainBeam_offset , MainBeam_length ), bytes ([0 ]))
312
312
node_data .write_provide_port (MainBeam_port , 3 )
313
313
self .assertEqual (output_file .read (MainBeam_offset , MainBeam_length ), bytes ([3 ]))
314
-
314
+
315
315
def test_write_TotalDistance (self ):
316
316
node = create_node_and_data ()
317
317
node_data = apx .NodeData (node )
@@ -338,7 +338,7 @@ def test_write_ComplexRecordSignal(self):
338
338
#write some values
339
339
node_data .write_provide_port (ComplexRecordSignal_port , {"SensorData" : dict (x = 1 , y = 2 , z = 3 ), 'TimeStamp' :0 })
340
340
self .assertEqual (output_file .read (ComplexRecordSignal_offset , ComplexRecordSignal_length ), struct .pack ("<HHHL" , 1 , 2 , 3 , 0 ))
341
-
341
+
342
342
def test_write_string (self ):
343
343
node = apx .Node ('TestNode' )
344
344
port = node .append (apx .ProvidePort ('StrSignal' , 'a[6]' , '=""' ))
@@ -374,7 +374,7 @@ def test_write_s8(self):
374
374
self .assertEqual (output_file .read (signal_offset , signal_length ), struct .pack ('<b' , 127 ))
375
375
node_data .write_provide_port (port , 0 )
376
376
self .assertEqual (output_file .read (signal_offset , signal_length ), struct .pack ('<b' , 0 ))
377
-
377
+
378
378
def test_write_s16 (self ):
379
379
node = apx .Node ('TestNode' )
380
380
port = node .append (apx .ProvidePort ('S16Signal' , 's' , '=0' ))
@@ -411,7 +411,7 @@ def test_write_s32(self):
411
411
node_data .write_provide_port (port , 2147483647 )
412
412
self .assertEqual (output_file .read (signal_offset , signal_length ), struct .pack ('<i' ,2147483647 ))
413
413
node_data .write_provide_port (port , 0 )
414
- self .assertEqual (output_file .read (signal_offset , signal_length ), struct .pack ('<i' , 0 ))
415
-
414
+ self .assertEqual (output_file .read (signal_offset , signal_length ), struct .pack ('<i' , 0 ))
415
+
416
416
if __name__ == '__main__' :
417
- unittest .main ()
417
+ unittest .main ()
0 commit comments