-
Notifications
You must be signed in to change notification settings - Fork 168
/
Copy pathgather.py
108 lines (85 loc) · 2.24 KB
/
gather.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# gather.py Demo of Gatherable coroutines. Includes 3 cases:
# 1. A normal coro
# 2. A coro with a timeout
# 3. A cancellable coro
import asyncio
async def barking(n):
print("Start normal coro barking()")
for _ in range(6):
await asyncio.sleep(1)
print("Done barking.")
return 2 * n
async def foo(n):
print("Start timeout coro foo()")
try:
while True:
await asyncio.sleep(1)
n += 1
except asyncio.CancelledError:
print("Trapped foo timeout.")
raise
return n
async def bar(n):
print("Start cancellable bar()")
try:
while True:
await asyncio.sleep(1)
n += 1
except asyncio.CancelledError: # Demo of trapping
print("Trapped bar cancellation.")
raise
return n
async def do_cancel(task):
await asyncio.sleep(5)
print("About to cancel bar")
task.cancel()
async def main(rex):
bar_task = asyncio.create_task(bar(70)) # Note args here
tasks = []
tasks.append(barking(21))
tasks.append(asyncio.wait_for(foo(10), 7))
asyncio.create_task(do_cancel(bar_task))
try:
res = await asyncio.gather(*tasks, return_exceptions=rex)
except asyncio.TimeoutError:
print("foo timed out.")
res = "No result"
print("Result: ", res)
exp_false = """Test runs for 10s. Expected output:
Start cancellable bar()
Start normal coro barking()
Start timeout coro foo()
About to cancel bar
Trapped bar cancellation.
Done barking.
Trapped foo timeout.
foo timed out.
Result: No result
"""
exp_true = """Test runs for 10s. Expected output:
Start cancellable bar()
Start normal coro barking()
Start timeout coro foo()
About to cancel bar
Trapped bar cancellation.
Done barking.
Trapped foo timeout.
Result: [42, TimeoutError()]
"""
def printexp(st):
print("\x1b[32m")
print(st)
print("\x1b[39m")
def test(rex):
st = exp_true if rex else exp_false
printexp(st)
try:
asyncio.run(main(rex))
except KeyboardInterrupt:
print("Interrupted")
finally:
asyncio.new_event_loop()
print()
print("as_demos.gather.test() to run again.")
print("as_demos.gather.test(True) to see effect of return_exceptions.")
test(rex=False)