diff --git a/README.md b/README.md index 86ee23c..147096d 100644 --- a/README.md +++ b/README.md @@ -10,21 +10,19 @@ and the handler is having the following signature: `func OnMyEventOccurred(event InterestingEvent)` -where `InterestingEvent` has to be a struct which has to implement two methods: - -`type InterestingEvent struct{}` +The event producer will simply do: -`func (e InterestingEvent) EventID() string{ return "MyUniqueName" }` +`bus.Pub(InterestingEvent{})` -where the string which represents the name of the event has to be unique across the event system. +Optional, to allow the bus to spin a goroutine for dispatching events, implement the following interface: `func (e InterestingEvent) Async() bool{ return true }` -where we signal that the event will be passed to the listeners by spinning up a goroutine. +or -The event producer will simply do: +`func (e *InterestingEvent) Async() bool{ return true }` -`bus.Pub(InterestingEvent{})` +By default, the bus is using sync events : waits for listeners to complete their jobs before calling the next listener. Usage : `go get github.com/badu/bus` @@ -100,10 +98,6 @@ Inside the `test_scenarios` folder, you can find the following scenarios: I am sure that you will find this technique interesting and having a large number of applications. - An important note is about not forgetting to implement the `EventID() string` correctly, as incorrect naming triggers - panic (expecting one type of event, but receiving another). To exemplify this, just alter the return of - this [function](https://github.com/badu/bus/blob/main/test_scenarios/request-reply-callback/events/main.go#L24). - 4. Request Reply with Cancellation Last but, not least, this is an example about providing `context.Context` along the publisher subscriber chain. @@ -120,7 +114,9 @@ Inside the `test_scenarios` folder, you can find the following scenarios: because changing properties that represents the `reply` would not be reflected. Also, when using `sync.WaitGroup` inside your event struct, always use method receivers and pass the event as pointer, otherwise you will be passing a lock by value (which is `sync.Locker`). -2. be careful if you don't want to use pointers for events, but you still need to pass values from the listener to the +3. be careful if you don't want to use pointers for events, but you still need to pass values from the listener to the dispatcher. You should still have at least one property of that event that is a pointer (see events in `request reply with cancellation` for example). Same technique can be applied when you need `sync.Waitgroup` to be passed around with an event that is being sent by value, not by pointer. +4. you can override the event name (which is by default, built using `fmt.Sprintf("%T", yourEvent)`) you need to + implement `EventID() string` interface. diff --git a/bench_test.go b/bench_test.go index 26bc2fa..0fc2fbd 100644 --- a/bench_test.go +++ b/bench_test.go @@ -11,22 +11,10 @@ type Uint32SyncEvent struct { u uint32 } -func (u Uint32SyncEvent) EventID() string { - return "Uint32SyncEvent" -} - -func (u Uint32SyncEvent) Async() bool { - return false -} - type Uint32AsyncEvent struct { u uint32 } -func (u Uint32AsyncEvent) EventID() string { - return "Uint32AsyncEvent" -} - func (u Uint32AsyncEvent) Async() bool { return true } diff --git a/main.go b/main.go index 4cbe4d7..c3786a1 100644 --- a/main.go +++ b/main.go @@ -1,33 +1,38 @@ package bus import ( + "fmt" "sync" "sync/atomic" ) -var mapper sync.Map // holds key (event id, typed string) versus topic values +var mapper sync.Map // holds key (event name - string) versus topic values -// internal interface that all the events must implement -type iEvent interface { +// we allow developers to override event names. They should be careful about name collisions +type iEventName interface { EventID() string // - Async() bool // if returns true, this event will be triggered by spinning a goroutine +} + +// if developers implement this interface, we're spinning a goroutine if the event says it is async +type iAsync interface { + Async() bool } // Listener is being returned when you subscribe to a topic, so you can unsubscribe or access the parent topic -type Listener[T iEvent] struct { +type Listener[T any] struct { parent *Topic[T] // so we can call unsubscribe from parent callback func(event T) // the function that we're going to call } // Topic keeps the subscribers of one topic -type Topic[T iEvent] struct { +type Topic[T any] struct { subs []*Listener[T] // list of listeners rwMu sync.RWMutex // guards subs lisnsPool sync.Pool // a pool of listeners } // NewTopic creates a new topic for a specie of events -func NewTopic[T iEvent]() *Topic[T] { +func NewTopic[T any]() *Topic[T] { result := &Topic[T]{} result.lisnsPool.New = func() any { return &Listener[T]{ @@ -90,19 +95,26 @@ func (s *Listener[T]) Topic() *Topic[T] { // Pub allows you to publish an event in that topic func (b *Topic[T]) Pub(event T) { b.rwMu.RLock() - for topic := range b.subs { - if event.Async() { - go b.subs[topic].callback(event) + + isAsync := false + switch m := any(event).(type) { + case iAsync: + isAsync = m.Async() + } + + for sub := range b.subs { + if isAsync { + go b.subs[sub].callback(event) continue } - b.subs[topic].callback(event) + b.subs[sub].callback(event) } b.rwMu.RUnlock() } // Bus is being returned when you subscribe, so you can manually Unsub -type Bus[T iEvent] struct { +type Bus[T any] struct { listener *Listener[T] stop atomic.Uint32 // flag for unsubscribing after receiving one event } @@ -115,11 +127,20 @@ func (o *Bus[T]) Unsub() { } // SubUnsub can be used if you need to unsubscribe immediately after receiving an event, by making your function return true -func SubUnsub[T iEvent](callback func(event T) bool) *Bus[T] { +func SubUnsub[T any](callback func(event T) bool) *Bus[T] { var event T - topic, ok := mapper.Load(event.EventID()) + + key := "" + switch m := any(event).(type) { + case iEventName: + key = m.EventID() + default: + key = fmt.Sprintf("%T", event) + } + + topic, ok := mapper.Load(key) if !ok || topic == nil { - topic, _ = mapper.LoadOrStore(event.EventID(), NewTopic[T]()) + topic, _ = mapper.LoadOrStore(key, NewTopic[T]()) } var result Bus[T] @@ -140,11 +161,20 @@ func SubUnsub[T iEvent](callback func(event T) bool) *Bus[T] { } // Sub subscribes a callback function to listen for a specie of events -func Sub[T iEvent](callback func(event T)) *Bus[T] { +func Sub[T any](callback func(event T)) *Bus[T] { var event T - topic, ok := mapper.Load(event.EventID()) + + key := "" + switch m := any(event).(type) { + case iEventName: + key = m.EventID() + default: + key = fmt.Sprintf("%T", event) + } + + topic, ok := mapper.Load(key) if !ok || topic == nil { - topic, _ = mapper.LoadOrStore(event.EventID(), NewTopic[T]()) + topic, _ = mapper.LoadOrStore(key, NewTopic[T]()) } var result Bus[T] @@ -160,10 +190,19 @@ func Sub[T iEvent](callback func(event T)) *Bus[T] { } // Pub publishes an event which will be dispatched to all listeners -func Pub[T iEvent](event T) { - topic, ok := mapper.Load(event.EventID()) +func Pub[T any](event T) { + key := "" + switch m := any(event).(type) { + case iEventName: + key = m.EventID() + default: + key = fmt.Sprintf("%T", event) + } + + topic, ok := mapper.Load(key) if !ok || topic == nil { // create new topic, even if there are no listeners (otherwise we will have to panic) - topic, _ = mapper.LoadOrStore(event.EventID(), NewTopic[T]()) + topic, _ = mapper.LoadOrStore(key, NewTopic[T]()) } + topic.(*Topic[T]).Pub(event) } diff --git a/test_scenarios/factory-request-reply/events/main.go b/test_scenarios/factory-request-reply/events/main.go index a7f4a8d..17a9c0a 100644 --- a/test_scenarios/factory-request-reply/events/main.go +++ b/test_scenarios/factory-request-reply/events/main.go @@ -7,11 +7,6 @@ import ( "github.com/badu/bus/test_scenarios/factory-request-reply/prices" ) -const ( - InventoryGRPCClientRequestEventType = "InventoryGRPCClientRequestEvent" - PricesGRPCClientRequestEventType = "PricesGRPCClientRequestEvent" -) - type InventoryGRPCClientRequestEvent struct { wg sync.WaitGroup Conn Closer // should be *grpc.ClientConn, but we're avoiding the import @@ -24,10 +19,6 @@ func NewInventoryGRPCClientRequestEvent() *InventoryGRPCClientRequestEvent { return &result } -func (i *InventoryGRPCClientRequestEvent) EventID() string { - return InventoryGRPCClientRequestEventType -} - func (i *InventoryGRPCClientRequestEvent) Async() bool { return true // this one is async } @@ -52,14 +43,6 @@ func NewPricesGRPCClientRequestEvent() *PricesGRPCClientRequestEvent { return &result } -func (p *PricesGRPCClientRequestEvent) EventID() string { - return PricesGRPCClientRequestEventType -} - -func (p *PricesGRPCClientRequestEvent) Async() bool { - return false // this one is sync -} - func (p *PricesGRPCClientRequestEvent) WaitReply() { p.wg.Wait() } diff --git a/test_scenarios/fire-and-forget/events/main.go b/test_scenarios/fire-and-forget/events/main.go index 0aae0d0..1c1c0fe 100644 --- a/test_scenarios/fire-and-forget/events/main.go +++ b/test_scenarios/fire-and-forget/events/main.go @@ -1,21 +1,10 @@ package events -const ( - UserRegisteredEventType string = "UserRegisteredEvent" - SMSRequestEventType string = "SMSRequestEvent" - SMSSentEventType string = "SmsSentEvent" - DummyEventType string = "DummyEvent" -) - type UserRegisteredEvent struct { UserName string Phone string } -func (e UserRegisteredEvent) EventID() string { - return UserRegisteredEventType -} - func (e UserRegisteredEvent) Async() bool { return true } @@ -25,10 +14,6 @@ type SMSRequestEvent struct { Message string } -func (e SMSRequestEvent) EventID() string { - return SMSRequestEventType -} - func (e SMSRequestEvent) Async() bool { return true } @@ -38,21 +23,14 @@ type SMSSentEvent struct { Status string } -func (e SMSSentEvent) EventID() string { - return SMSSentEventType -} - func (e SMSSentEvent) Async() bool { return true } type DummyEvent struct { -} - -func (e *DummyEvent) EventID() string { - return DummyEventType + AlteredAsync bool } func (e *DummyEvent) Async() bool { - return true + return e.AlteredAsync } diff --git a/test_scenarios/fire-and-forget/main_test.go b/test_scenarios/fire-and-forget/main_test.go index 26032cf..d54c72a 100644 --- a/test_scenarios/fire-and-forget/main_test.go +++ b/test_scenarios/fire-and-forget/main_test.go @@ -2,15 +2,22 @@ package fire_and_forget import ( "context" + "fmt" "strings" "testing" "time" + "github.com/badu/bus" "github.com/badu/bus/test_scenarios/fire-and-forget/audit" + "github.com/badu/bus/test_scenarios/fire-and-forget/events" "github.com/badu/bus/test_scenarios/fire-and-forget/notifications" "github.com/badu/bus/test_scenarios/fire-and-forget/users" ) +func OnDummyEvent(event *events.DummyEvent) { + fmt.Println("dummy event async ?", event.Async()) +} + func TestUserRegistration(t *testing.T) { var sb strings.Builder @@ -19,6 +26,8 @@ func TestUserRegistration(t *testing.T) { notifications.NewEmailService(&sb) audit.NewAuditService(&sb) + bus.Sub(OnDummyEvent) + userSvc.RegisterUser(context.Background(), "Badu", "+40742222222") <-time.After(500 * time.Millisecond) diff --git a/test_scenarios/fire-and-forget/users/service.go b/test_scenarios/fire-and-forget/users/service.go index ba5718b..3a1be56 100644 --- a/test_scenarios/fire-and-forget/users/service.go +++ b/test_scenarios/fire-and-forget/users/service.go @@ -10,6 +10,7 @@ import ( type ServiceImpl struct { sb *strings.Builder + c int } func NewService(sb *strings.Builder) ServiceImpl { @@ -18,6 +19,7 @@ func NewService(sb *strings.Builder) ServiceImpl { } func (s *ServiceImpl) RegisterUser(ctx context.Context, name, phone string) { + s.c++ bus.Pub(events.UserRegisteredEvent{UserName: name, Phone: phone}) - bus.Pub(&events.DummyEvent{}) // nobody listens on this one + bus.Pub(&events.DummyEvent{AlteredAsync: s.c%2 == 0}) // nobody listens on this one } diff --git a/test_scenarios/request-reply-callback/events/main.go b/test_scenarios/request-reply-callback/events/main.go index 85b8764..b382178 100644 --- a/test_scenarios/request-reply-callback/events/main.go +++ b/test_scenarios/request-reply-callback/events/main.go @@ -1,11 +1,5 @@ package events -import ( - "fmt" -) - -const RequestEventType = "RequestEvent" - type RequestEvent[T any] struct { Payload T Callback func() (*T, error) @@ -19,11 +13,6 @@ func NewRequestEvent[T any](payload T) *RequestEvent[T] { } } -func (i *RequestEvent[T]) EventID() string { - var t T - return fmt.Sprintf("%s%T", RequestEventType, t) -} - func (i *RequestEvent[T]) Async() bool { return true // this one is async } diff --git a/test_scenarios/request-reply-callback/orders/service.go b/test_scenarios/request-reply-callback/orders/service.go index 1c7697d..5287303 100644 --- a/test_scenarios/request-reply-callback/orders/service.go +++ b/test_scenarios/request-reply-callback/orders/service.go @@ -20,7 +20,7 @@ func NewService(sb *strings.Builder) ServiceImpl { func (s *ServiceImpl) RegisterOrder(ctx context.Context, productIDs []int) (*Order, error) { event := events.NewRequestEvent[Order](Order{ProductIDs: productIDs}) - s.sb.WriteString(fmt.Sprintf("dispatching event typed %s\n", event.EventID())) + s.sb.WriteString(fmt.Sprintf("dispatching event typed %T\n", event)) bus.Pub(event) <-event.Done // wait for "reply" return event.Callback() // return the callback, which is containing the actual result @@ -28,7 +28,7 @@ func (s *ServiceImpl) RegisterOrder(ctx context.Context, productIDs []int) (*Ord func (s *ServiceImpl) GetOrderStatus(ctx context.Context, orderID int) (*OrderStatus, error) { event := events.NewRequestEvent[OrderStatus](OrderStatus{OrderID: orderID}) - s.sb.WriteString(fmt.Sprintf("dispatching event typed %s\n", event.EventID())) + s.sb.WriteString(fmt.Sprintf("dispatching event typed %T\n", event)) bus.Pub(event) <-event.Done // wait for "reply" return event.Callback() // return the callback, which is containing the actual result diff --git a/test_scenarios/request-reply-with-cancellation/events/main.go b/test_scenarios/request-reply-with-cancellation/events/main.go index 7380518..94f4fa2 100644 --- a/test_scenarios/request-reply-with-cancellation/events/main.go +++ b/test_scenarios/request-reply-with-cancellation/events/main.go @@ -31,11 +31,3 @@ type CreateOrderEvent struct { ProductIDs []int State *EventState } - -func (c CreateOrderEvent) EventID() string { - return "CreateOrderEventType" -} - -func (c CreateOrderEvent) Async() bool { - return false -}