generated from snivilised/astrolib
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpool-defs-internal.go
132 lines (119 loc) · 3.51 KB
/
pool-defs-internal.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package pants
import (
"time"
)
// DefaultChSize is the default channel size.
//
// TODO: This is just temporary, channel size definition still needs to be
// fine tuned
const DefaultChSize = 100
// workerID is a string-based identifier for a worker.
type workerID string

// workerFinishedResult pairs a worker's id with an error value. Going by
// its name, it is the record emitted when a worker finishes; err is
// presumably nil on a clean finish (confirm against the sender).
type workerFinishedResult struct {
	id  workerID
	err error
}

// Aliases for channels that carry *workerFinishedResult values, in
// bidirectional, receive-only (R) and send-only (W) forms.
type (
	finishedStream  = chan *workerFinishedResult
	finishedStreamR = <-chan *workerFinishedResult
	finishedStreamW = chan<- *workerFinishedResult
)

// injectable is implemented by anything that can accept an input of type I
// via inject, reporting failure through the returned error.
type injectable[I any] interface {
	inject(input I) error
}

// closable is implemented by anything exposing a terminate method.
type closable interface {
	terminate()
}
// injector is a function adapter: it lets a plain func(I) error be used
// wherever a value with an inject method (see injectable) is required.
type injector[I any] func(input I) error

// inject forwards input to the wrapped function and returns whatever
// error that function produces.
func (fn injector[I]) inject(input I) error {
	return fn(input)
}
// terminator is a function adapter: it lets a plain func() be used
// wherever a value with a terminate method (see closable) is required.
type terminator func()

// terminate invokes the wrapped function.
func (fn terminator) terminate() {
	fn()
}
// outputInfo groups the Duplex instances used for job output and for
// cancellation signalling, parameterised by the job output type O.
//
// NOTE(review): Duplex is declared elsewhere; presumably it wraps a channel
// and exposes its read/write ends - confirm against its definition.
type outputInfo[O any] struct {
	// outputDupCh is the Duplex over JobOutput[O] values.
	outputDupCh *Duplex[JobOutput[O]]
	// cancelDupCh is the Duplex over CancelWorkSignal values.
	cancelDupCh *Duplex[CancelWorkSignal]
}
// outputInfoW is the write-side counterpart of outputInfo: it holds the
// W-suffixed (write) stream types for job output and cancellation, plus a
// send timeout.
type outputInfoW[O any] struct {
	// outputCh is the stream to which job outputs of type O are written.
	outputCh JobOutputStreamW[O]
	// cancelCh is the stream to which cancellation signals are written.
	cancelCh CancelStreamW
	// timeoutOnSend is presumably the maximum time to wait when sending on
	// the streams above - TODO confirm against the code that performs the send.
	timeoutOnSend time.Duration
}
// Worker pool types:
//
// 🍺 ManifoldFuncPool (to be used by traverse):
// description: this is the most comprehensive pool type with return
// semantics. It is functional meaning that the pool is defined by a
// predefined executive function.
// ants: PoolWithFunc
// post(ants): Invoke
// job(Param): Job(I)
// job-return: JobOutput(O), error
// job-input-stream(client-side): JobStreamW[I]
// job-input-stream(pool-side): JobStreamR[I]
// returns err: true
// observable: JobOutputStreamR(O)
// start: returns observable stream, completion stream
// pool-result: tbd (this is the result that represents the overall pool result.
// If pool shuts down as a result of premature error or ctrl-c abort, then this
// will be reflected in the pool's result).
//
// 🍺 ManifoldTaskPool:
// description: like ManifoldFuncPool but accepts task based jobs meaning each
// job can be any function as opposed to being a pre-defined function registered
// with the pool. Each job accepts an input I and emits an output O with an error.
// ants: Pool
// post(ants): Submit
// job(Param): Job(func(I) JobOutput(O), error)
// job-return: JobOutput(O), error
// job-input-stream(client-side): JobStreamW[I]
// job-input-stream(pool-side): JobStreamR[I]
// returns err: true
// observable: JobOutputStreamR(O)
// start: returns observable stream, completion stream
// pool-result: yes
//
// 🍺 FuncPoolE
// description: A simple functional pool with fire and return semantics. Client
// submits jobs with only an error return value.
// ants: PoolWithFunc
// post(ants): Invoke
// job(Param): Job(I)
// job-return: none; error only
// job-input-stream(client-side): JobStreamW[I]
// job-input-stream(pool-side): JobStreamR[I]
// returns err: yes
// observable: none
// start: returns completion stream
// pool-result: yes
//
// 🍺 FuncPool
// description: A simple functional pool with fire and forget semantics. Client
// submits jobs with no return value
// ants: PoolWithFunc
// post(ants): Invoke
// job(Param): Job(I)
// job-return: none
// job-input-stream(client-side): JobStreamW[I]
// job-input-stream(pool-side): JobStreamR[I]
// returns err: no
// observable: none
// start: returns completion stream
// pool-result: yes
//
// 🍺 TaskPoolE
// description: accepts task based jobs. Each job accepts an input I and
// emits only an error return value.
// ants: Pool
// post(ants): Submit
// job(Param): Job(func(I) error)
// job-return: error
// job-input-stream(client-side): JobStreamW[I]
// job-input-stream(pool-side): JobStreamR[I]
// returns err: true
// observable: JobOutputStreamR(O)
// start: returns observable stream, completion stream
// pool-result: yes
//