-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsubject.go
144 lines (123 loc) · 3.85 KB
/
subject.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package reactive
import (
"errors"
"reflect"
"sync"
)
// subject is the basic implementation of a subjectable
type subject struct {
Subscriptions sync.Map
}
// AsChannel returns a channel which will receive all
// further updates of this observable
func (subject *subject) AsChannel() chan []interface{} {
channel := make(chan []interface{})
go subject.Subscribe(func(args ...interface{}) {
go func(channel chan []interface{}) {
channel <- args
}(channel)
})
return channel
}
// Close will remove all subscribers and render the subjectable useless
func (subject *subject) Close() {
subject.Subscriptions = sync.Map{}
}
// Next takes an undefined amount of parameters which will be passed to
// subscribed functions
func (subject *subject) Next(values ...interface{}) {
subject.Subscriptions.Range(func(subscription, value interface{}) bool {
subject.notifySubscriber(subscription, values)
return true
})
}
func (subject subject) notifySubscriber(subscription interface{}, values []interface{}) {
if fn, ok := subject.Subscriptions.Load(subscription); ok {
refFn := reflect.TypeOf(fn)
fnArgs := make([]reflect.Value, 0, refFn.NumIn())
for argIndex := 0; argIndex < refFn.NumIn(); argIndex++ {
if len(values) == argIndex {
return
}
providedVal := values[argIndex]
// Variadic arguments need special treatment
if refFn.IsVariadic() && refFn.In(argIndex).Kind() == reflect.Slice && argIndex == refFn.NumIn()-1 {
sliceType := refFn.In(argIndex).Elem()
for _, innerVal := range values[argIndex:len(values)] {
if innerVal == nil {
fnArgs = append(fnArgs, reflect.New(sliceType).Elem())
continue
}
if !reflect.TypeOf(innerVal).AssignableTo(sliceType) {
// Slice does not match received data, skipping this subscriber
return
}
fnArgs = append(fnArgs, reflect.ValueOf(innerVal))
}
// Finish loop as we have filled in all data to the slice
break
} else {
argType := refFn.In(argIndex)
if providedVal == nil {
values[argIndex] = reflect.New(argType).Elem()
providedVal = values[argIndex]
}
if !reflect.TypeOf(providedVal).AssignableTo(argType) {
// Method signature not compatible with this input. Skipping subscriber
return
}
fnArgs = append(fnArgs, reflect.ValueOf(values[argIndex]))
if argIndex == refFn.NumIn()-1 {
if refFn.NumIn() != len(fnArgs) {
// Skipping non-slice overflow
return
}
}
}
}
if fn != nil {
reflect.ValueOf(fn).Call(fnArgs)
}
}
}
// Pipe decorates an observable with one or multiple middlewares
// and returns a new observable with the decoration applied
func (su *subject) Pipe(fns ...func(Observable, Subjectable)) Observable {
parent := su
for _, fn := range fns {
if fn == nil {
continue
}
sub := NewSubject().(*subject)
fn(parent, sub)
parent = sub
}
return parent
}
// Subscribe registers a function for further updates of
// this observable and returns a subscription token which can
// be used to unsubscribe from it at any time
func (subject *subject) Subscribe(fn interface{}) (Subscription, error) {
if fn != nil && reflect.TypeOf(fn).Kind() == reflect.Func {
subscription := NewSubscription()
subject.Subscriptions.Store(subscription, fn)
return subscription, nil
}
return EmptySubscription(), errors.New("fn is not a function")
}
// Unsubscribe unregisters a previously registered function for all
// further updates of this observable or until re-registering.
func (subject *subject) Unsubscribe(subscription Subscription) error {
if _, ok := subject.Subscriptions.Load(subscription); !ok {
return errors.New("Subscription not found in subject")
}
subject.Subscriptions.Delete(subscription)
return nil
}
// NewSubject returns a pointer
// to an empty instance of subject
func NewSubject() Subjectable {
return &subject{
Subscriptions: sync.Map{},
}
}