-
Notifications
You must be signed in to change notification settings - Fork 168
/
Copy pathauart_hd.py
119 lines (100 loc) · 3.71 KB
/
auart_hd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# auart_hd.py
# Author: Peter Hinch
# Copyright Peter Hinch 2018-2020 Released under the MIT license
# Demo of running a half-duplex protocol to a device. The device never sends
# unsolicited messages. An example is a communications device which responds
# to AT commands.
# The master sends a message to the device, which may respond with one or more
# lines of data. The master assumes that the device has sent all its data when
# a timeout has elapsed.
# In this test a physical device is emulated by the Device class.
# To test, link X1-X4 and X2-X3.
from pyb import UART
import asyncio
from primitives.delay_ms import Delay_ms
# Dummy device waits for any incoming line and responds with 4 lines at 300ms
# intervals.
class Device:
    """Emulated device for the half-duplex demo.

    Listens on a UART and answers every received line with a fixed set of
    four reply lines, pausing 300ms after each one. The constructor schedules
    the reply task with ``asyncio.create_task()``.

    Args:
        uart_no: Number of the UART to open at 9600 baud (default 4).
    """

    def __init__(self, uart_no=4):
        self.uart = UART(uart_no, 9600)
        self.swriter = asyncio.StreamWriter(self.uart, {})
        self.sreader = asyncio.StreamReader(self.uart)
        asyncio.create_task(self._run())

    async def _run(self):
        """Wait for a line, send the canned reply line by line, repeat forever."""
        responses = ("Line 1", "Line 2", "Line 3", "Goodbye")
        while True:
            # Any incoming line triggers the reply; its content is not used,
            # so the value returned by readline() is deliberately discarded.
            await self.sreader.readline()
            for response in responses:
                # write() + drain() is the documented stream API; it replaces
                # the legacy awrite() compatibility method.
                self.swriter.write("{}\r\n".format(response))
                await self.swriter.drain()
                # Demo the fact that the master tolerates slow response.
                await asyncio.sleep_ms(300)
# The master's send_command() method sends a command and waits for a number of
# lines from the device. The end of the process is signified by a timeout, when
# a list of lines is returned. This allows line-by-line processing.
# A special test mode demonstrates the behaviour with a non-responding device. If
# None is passed, no command is sent. The master waits for a response which never
# arrives and returns an empty list.
class Master:
    """Master side of the half-duplex link.

    A background task (scheduled by the constructor) appends every line
    received on the UART to ``self.response`` and retriggers a ``Delay_ms``
    timer. ``send_command()`` treats the expiry of that timer as the end of
    the device's reply.

    Args:
        uart_no: Number of the UART to open at 9600 baud (default 2).
        timeout: Quiet period in milliseconds after which the reply is
            considered complete (default 4000).
    """

    def __init__(self, uart_no=2, timeout=4000):
        self.uart = UART(uart_no, 9600)
        self.timeout = timeout
        self.swriter = asyncio.StreamWriter(self.uart, {})
        self.sreader = asyncio.StreamReader(self.uart)
        self.delay = Delay_ms()
        self.response = []
        asyncio.create_task(self._recv())

    async def _recv(self):
        """Collect incoming lines forever, restarting the timer on each one."""
        while True:
            res = await self.sreader.readline()
            self.response.append(res)  # Append to list of lines
            self.delay.trigger(self.timeout)  # Got something, retrigger timer

    async def send_command(self, command):
        """Send ``command`` and return the lines received before the timeout.

        Args:
            command: Text to send; "\\r\\n" is appended. If None, nothing is
                sent (timeout test) and the method just waits for the timer.

        Returns:
            The list of lines (as returned by ``readline()``) received since
            the call started; empty if the device did not respond.
        """
        # Rebind rather than clear: a list returned by an earlier call is
        # left untouched while pending messages are discarded.
        self.response = []
        if command is None:
            print("Timeout test.")
        else:
            # write() + drain() is the documented stream API; it replaces
            # the legacy awrite() compatibility method.
            self.swriter.write("{}\r\n".format(command))
            await self.swriter.drain()
            print("Command sent:", command)
        self.delay.trigger(self.timeout)  # Re-initialise timer
        # Poll once a second until `timeout` ms have passed with no new line.
        while self.delay.running():
            await asyncio.sleep(1)
        return self.response
async def main():
    """Run the demo: one real command, then a no-command timeout test."""
    print("This test takes 10s to complete.")
    master = Master()
    device = Device()  # Emulated device; starts its own reply task
    for cmd in ("Run", None):
        print()
        lines = await master.send_command(cmd)
        # can use b''.join(lines) if a single string is required.
        if not lines:
            print("Timed out waiting for result.")
            continue
        print("Result is:")
        for raw in lines:
            # Lines still carry their own line ending, so suppress print's.
            print(raw.decode("UTF8"), end="")
def printexp():
    """Print, in green, the output a successful run is expected to produce.

    The blank lines in the text mirror the empty ``print()`` that main()
    emits before each command, so the banner matches the real output.
    """
    st = """Expected output:
This test takes 10s to complete.

Command sent: Run
Result is:
Line 1
Line 2
Line 3
Goodbye

Timeout test.
Timed out waiting for result.
"""
    print("\x1b[32m")  # ANSI escape: green foreground
    print(st)
    print("\x1b[39m")  # ANSI escape: back to default foreground
def test():
    """Show the expected output, then run the demo until done or interrupted."""
    rerun_hint = "as_demos.auart_hd.test() to run again."
    printexp()
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Interrupted")
    finally:
        # Create a new event loop so that test() can be run again.
        asyncio.new_event_loop()
        print(rerun_hint)
# Run the demo as soon as the module is executed or imported.
test()