-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathasync.lua
196 lines (156 loc) · 4.11 KB
/
async.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
local guard = require("meido.guard")
local meta = require("meido.meta")
local coroutine_create = coroutine.create
local coroutine_running = coroutine.running
local coroutine_yield = coroutine.yield
local coroutine_resume = coroutine.resume
local remove = table.remove
local async = {}
-- cancellation token
-- Prototype table for cancellation tokens. Token instances use it as their
-- metatable, and __index points back at it so the methods defined on it
-- (notify, unnotify, link_to, cancel) are found through the instances.
local cancellation_token = {}
cancellation_token.__index = cancellation_token
--- Register a callback on this token.
-- The callback is appended to the token's `on_cancelled` list, which
-- `cancel` walks in registration order.
-- NOTE(review): `cancel` returns early once a token is cancelled, so a
-- callback registered afterwards is stored but not invoked by `cancel`.
-- @param callback callable value (validated by guard.callable)
function cancellation_token:notify(callback)
  guard.callable("callback", callback)
  local listeners = self.on_cancelled
  local next_slot = #listeners + 1
  listeners[next_slot] = callback
end
--- Remove a previously registered callback from this token.
-- Only the first entry equal to `callback` is removed; later entries shift
-- down by one position.
-- @param callback callable value (validated by guard.callable)
-- @return true when an entry was removed, false when none matched
function cancellation_token:unnotify(callback)
  guard.callable("callback", callback)
  local listeners = self.on_cancelled

  -- Locate the first matching entry, scanning front to back.
  local found
  for index = 1, #listeners do
    if listeners[index] == callback then
      found = index
      break
    end
  end

  if not found then
    return false
  end
  remove(listeners, found)
  return true
end
--- Make this token follow another token's cancellation.
-- Registers a callback on `other_token` that cancels this token.
-- @param other_token a value whose metatable is the cancellation token
--   prototype; anything else fails the assertion
function cancellation_token:link_to(other_token)
  local is_token = getmetatable(other_token) == cancellation_token
  assert(is_token, "other_token must be cancellation token")

  local function propagate()
    self:cancel()
  end
  other_token:notify(propagate)
end
-- Shared `is_cancelled` implementation for tokens that are not cancelled.
-- Always returns false.
local is_cancelled_false = function()
  return false
end
-- Shared `is_cancelled` implementation for cancelled tokens. `cancel` also
-- compares a token's `is_cancelled` field against this function by identity
-- to detect that the token is already cancelled. Always returns true.
local is_cancelled_true = function()
  return true
end
--- Cancel this token and run its registered callbacks.
-- Does nothing if the token is already cancelled. Otherwise the token is
-- marked cancelled first, then each callback is called in registration
-- order and its slot is cleared right after it returns.
function cancellation_token:cancel()
  if self.is_cancelled ~= is_cancelled_true then
    -- Flip the state before running callbacks so a re-entrant cancel()
    -- triggered from inside a callback hits the guard above.
    self.is_cancelled = is_cancelled_true

    local listeners = self.on_cancelled
    -- The length is read once; entries appended while callbacks run are
    -- outside this range.
    local total = #listeners
    for index = 1, total do
      listeners[index]()
      listeners[index] = nil
    end
  end
end
-- Single shared token that is cancelled from the start; returned by
-- async.cancellation_token(true).
-- NOTE(review): meta.ungrowable presumably rejects new entries in the
-- callback list -- confirm against meido.meta.
local cancelled_state = {
  is_cancelled = is_cancelled_true,
  on_cancelled = meta.ungrowable({}),
}
local cancelled_token = setmetatable(cancelled_state, cancellation_token)
--- Create a cancellation token.
-- @param cancelled when truthy, the shared already-cancelled token is
--   returned instead of a new one
-- @return a token: either the shared cancelled token or a fresh token with
--   an empty callback list and `is_cancelled` reporting false
async.cancellation_token = function(cancelled)
  if not cancelled then
    return setmetatable({
      on_cancelled = {},
      is_cancelled = is_cancelled_false,
    }, cancellation_token)
  end
  return cancelled_token
end
-- methods
--- Return the coroutine that is currently running.
-- Raises "should be run in coroutine" when called from the main thread.
-- On Lua 5.1 `coroutine.running()` returns nil on the main thread; on
-- Lua 5.2+ it returns the main thread object plus `true`, so both forms
-- are checked here.
-- @return the running (non-main) coroutine
local function current_coroutine()
  local co, is_main = coroutine_running()
  if not co or is_main then
    -- Level 0 keeps the message free of position info, matching what a
    -- failed assert(...) with this message would raise.
    error("should be run in coroutine", 0)
  end
  return co
end
--- Resume a coroutine and re-raise any error it produces.
-- Values yielded or returned by the coroutine are discarded.
-- @param co coroutine to resume
-- @param ... values passed through to coroutine.resume
local function safe_resume(co, ...)
  local ok, failure = coroutine_resume(co, ...)
  if ok then
    return
  end
  -- Level 0: re-raise the original error value without adding position info.
  error(failure, 0)
end
-- Expose the coroutine helpers on the module table.
async.current_coroutine, async.safe_resume = current_coroutine, safe_resume
--- Wrap `f` in a function that starts a new coroutine on every call.
-- @param f function to run inside each new coroutine
-- @return a function that creates a coroutine from `f`, resumes it once
--   with the call's arguments (errors are re-raised) and returns it
async.spawner = function(f)
  local function spawn(...)
    local thread = coroutine_create(f)
    safe_resume(thread, ...)
    return thread
  end
  return spawn
end
--- Start `f` in a new coroutine immediately.
-- @param f function to run inside the coroutine
-- @param ... arguments passed to `f` on the first resume
-- @return the created coroutine, after its first resume has returned
--   (errors raised during that resume are re-raised to the caller)
function async.run(f, ...)
  local thread = coroutine_create(f)
  safe_resume(thread, ...)
  return thread
end
--- Start a coroutine that calls `f` over and over.
-- The loop only gives control back when `f` yields or raises; an `f` that
-- never yields keeps the caller blocked inside this function.
-- @param f function called on every iteration
-- @param ... arguments forwarded to `f` on every iteration (previously they
--   were resumed into a wrapper that declared no parameters, so `f` always
--   ran without them)
-- @return the created coroutine, after its first resume has returned
async.forever = function(f, ...)
  -- The wrapper declares its own varargs: a nested function cannot see the
  -- enclosing function's `...`.
  local co = coroutine_create(function(...)
    while true do
      f(...)
    end
  end)
  safe_resume(co, ...)
  return co
end
--- Run every function in `fs` and wait until all of them have finished.
-- Must be called from inside a coroutine. Each function is started in its
-- own coroutine, in list order, and is called without arguments.
--
-- A function succeeds by returning a truthy first value and fails by
-- returning a falsy first value (optionally followed by an error value).
--
-- @param fs sequence of callables (validated through guard)
-- @return on success, a table holding each function's first return value
--   in COMPLETION order (not list order); an empty table for an empty `fs`
-- @return on the first failure, nil followed by that function's second
--   return value; completions arriving afterwards are ignored
async.all = function(fs)
  guard.table("fs", fs)
  local fs_count = #fs
  for i = 1, fs_count do
    guard.callable("element in fs", fs[i])
  end

  local co = current_coroutine()
  local results = {}
  if fs_count == 0 then
    -- Nothing will ever resume us, so yielding would suspend forever.
    return results
  end

  local resumed = false -- an outcome has been decided
  local waiting = false -- `co` is suspended in the yield below
  local early_res, early_err -- outcome decided before `co` yielded

  -- Deliver the final outcome exactly once. The flag is set BEFORE `co` is
  -- resumed so completions that happen while `co` is running are ignored
  -- rather than trying to resume it a second time.
  local function settle(res, err)
    resumed = true
    if not waiting then
      -- `co` is still running the start-up loop and cannot be resumed;
      -- keep the outcome so it can be returned directly.
      early_res, early_err = res, err
    elseif res then
      safe_resume(co, res)
    else
      safe_resume(co, nil, err)
    end
  end

  for i = 1, fs_count do
    local f = fs[i]
    local f_co = coroutine_create(function()
      local res, err = f()
      if resumed then
        return -- ignored
      end
      if not res then
        settle(nil, err)
        return
      end
      local count = #results + 1
      results[count] = res
      if count == fs_count then
        settle(results)
      end
    end)
    safe_resume(f_co)
  end

  if resumed then
    -- Every deciding completion happened synchronously.
    if early_res then
      return early_res
    end
    return nil, early_err
  end
  waiting = true
  return coroutine_yield()
end
--- Run every function in `fs` and wait for the first one to finish.
-- Must be called from inside a coroutine. Each function is started in its
-- own coroutine, in list order, and is called without arguments. All
-- functions are started even if an earlier one finishes immediately.
--
-- NOTE: with an empty `fs` nothing ever completes, so the call stays
-- suspended unless the coroutine is resumed from elsewhere.
--
-- @param fs sequence of callables (validated through guard)
-- @return the first two return values of whichever function finishes
--   first; later completions are ignored
async.any = function(fs)
  guard.table("fs", fs)
  local fs_count = #fs
  for i = 1, fs_count do
    guard.callable("element in fs", fs[i])
  end

  local co = current_coroutine()
  local resumed = false -- a winner has been decided
  local waiting = false -- `co` is suspended in the yield below
  local early_res, early_err -- winner decided before `co` yielded

  for i = 1, fs_count do
    local f = fs[i]
    local f_co = coroutine_create(function()
      local res, err = f()
      if resumed then
        return -- ignored
      end
      -- Set the flag BEFORE resuming `co` (and spell it correctly: the
      -- previous `resuemd = true` wrote a global and never set the flag),
      -- so later completions cannot resume `co` a second time.
      resumed = true
      if waiting then
        safe_resume(co, res, err)
      else
        -- `co` is still running the start-up loop and cannot be resumed;
        -- keep the outcome so it can be returned directly.
        early_res, early_err = res, err
      end
    end)
    safe_resume(f_co)
  end

  if resumed then
    -- A function finished synchronously while the others were starting.
    return early_res, early_err
  end
  waiting = true
  return coroutine_yield()
end
return async