Skip to content

Commit

Permalink
Interface implementations not required anymore
Browse files Browse the repository at this point in the history
  • Loading branch information
badu committed Mar 17, 2023
1 parent cbaf5ac commit ecf5448
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 109 deletions.
22 changes: 9 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,19 @@ and the handler is having the following signature:

`func OnMyEventOccurred(event InterestingEvent)`

where `InterestingEvent` has to be a struct which has to implement two methods:

`type InterestingEvent struct{}`
The event producer will simply do:

`func (e InterestingEvent) EventID() string{ return "MyUniqueName" }`
`bus.Pub(InterestingEvent{})`

where the string which represents the name of the event has to be unique across the event system.
Optional, to allow the bus to spin a goroutine for dispatching events, implement the following interface:

`func (e InterestingEvent) Async() bool{ return true }`

where we signal that the event will be passed to the listeners by spinning up a goroutine.
or

The event producer will simply do:
`func (e *InterestingEvent) Async() bool{ return true }`

`bus.Pub(InterestingEvent{})`
By default, the bus is using sync events : waits for listeners to complete their jobs before calling the next listener.

Usage : `go get github.com/badu/bus`

Expand Down Expand Up @@ -100,10 +98,6 @@ Inside the `test_scenarios` folder, you can find the following scenarios:

I am sure that you will find this technique interesting and having a large number of applications.

An important note is about not forgetting to implement the `EventID() string` correctly, as incorrect naming triggers
panic (expecting one type of event, but receiving another). To exemplify this, just alter the return of
this [function](https://github.com/badu/bus/blob/main/test_scenarios/request-reply-callback/events/main.go#L24).

4. Request Reply with Cancellation

Last but, not least, this is an example about providing `context.Context` along the publisher subscriber chain.
Expand All @@ -120,7 +114,9 @@ Inside the `test_scenarios` folder, you can find the following scenarios:
because changing properties that represents the `reply` would not be reflected. Also, when using `sync.WaitGroup`
inside your event struct, always use method receivers and pass the event as pointer, otherwise you will be passing a
lock by value (which is `sync.Locker`).
2. be careful if you don't want to use pointers for events, but you still need to pass values from the listener to the
3. be careful if you don't want to use pointers for events, but you still need to pass values from the listener to the
dispatcher. You should still have at least one property of that event that is a pointer (see events
in `request reply with cancellation` for example). Same technique can be applied when you need `sync.Waitgroup` to be
passed around with an event that is being sent by value, not by pointer.
4. you can override the event name (which is by default, built using `fmt.Sprintf("%T", yourEvent)`) you need to
implement `EventID() string` interface.
12 changes: 0 additions & 12 deletions bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,10 @@ type Uint32SyncEvent struct {
u uint32
}

func (u Uint32SyncEvent) EventID() string {
return "Uint32SyncEvent"
}

func (u Uint32SyncEvent) Async() bool {
return false
}

type Uint32AsyncEvent struct {
u uint32
}

func (u Uint32AsyncEvent) EventID() string {
return "Uint32AsyncEvent"
}

func (u Uint32AsyncEvent) Async() bool {
return true
}
Expand Down
81 changes: 60 additions & 21 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,38 @@
package bus

import (
"fmt"
"sync"
"sync/atomic"
)

var mapper sync.Map // holds key (event id, typed string) versus topic values
var mapper sync.Map // holds key (event name - string) versus topic values

// internal interface that all the events must implement
type iEvent interface {
// we allow developers to override event names. They should be careful about name collisions
type iEventName interface {
EventID() string //
Async() bool // if returns true, this event will be triggered by spinning a goroutine
}

// if developers implement this interface, we're spinning a goroutine if the event says it is async
type iAsync interface {
Async() bool
}

// Listener is being returned when you subscribe to a topic, so you can unsubscribe or access the parent topic
type Listener[T iEvent] struct {
type Listener[T any] struct {
parent *Topic[T] // so we can call unsubscribe from parent
callback func(event T) // the function that we're going to call
}

// Topic keeps the subscribers of one topic
type Topic[T iEvent] struct {
type Topic[T any] struct {
subs []*Listener[T] // list of listeners
rwMu sync.RWMutex // guards subs
lisnsPool sync.Pool // a pool of listeners
}

// NewTopic creates a new topic for a specie of events
func NewTopic[T iEvent]() *Topic[T] {
func NewTopic[T any]() *Topic[T] {
result := &Topic[T]{}
result.lisnsPool.New = func() any {
return &Listener[T]{
Expand Down Expand Up @@ -90,19 +95,26 @@ func (s *Listener[T]) Topic() *Topic[T] {
// Pub allows you to publish an event in that topic
func (b *Topic[T]) Pub(event T) {
b.rwMu.RLock()
for topic := range b.subs {
if event.Async() {
go b.subs[topic].callback(event)

isAsync := false
switch m := any(event).(type) {
case iAsync:
isAsync = m.Async()
}

for sub := range b.subs {
if isAsync {
go b.subs[sub].callback(event)
continue
}

b.subs[topic].callback(event)
b.subs[sub].callback(event)
}
b.rwMu.RUnlock()
}

// Bus is being returned when you subscribe, so you can manually Unsub
type Bus[T iEvent] struct {
type Bus[T any] struct {
listener *Listener[T]
stop atomic.Uint32 // flag for unsubscribing after receiving one event
}
Expand All @@ -115,11 +127,20 @@ func (o *Bus[T]) Unsub() {
}

// SubUnsub can be used if you need to unsubscribe immediately after receiving an event, by making your function return true
func SubUnsub[T iEvent](callback func(event T) bool) *Bus[T] {
func SubUnsub[T any](callback func(event T) bool) *Bus[T] {
var event T
topic, ok := mapper.Load(event.EventID())

key := ""
switch m := any(event).(type) {
case iEventName:
key = m.EventID()
default:
key = fmt.Sprintf("%T", event)
}

topic, ok := mapper.Load(key)
if !ok || topic == nil {
topic, _ = mapper.LoadOrStore(event.EventID(), NewTopic[T]())
topic, _ = mapper.LoadOrStore(key, NewTopic[T]())
}

var result Bus[T]
Expand All @@ -140,11 +161,20 @@ func SubUnsub[T iEvent](callback func(event T) bool) *Bus[T] {
}

// Sub subscribes a callback function to listen for a specie of events
func Sub[T iEvent](callback func(event T)) *Bus[T] {
func Sub[T any](callback func(event T)) *Bus[T] {
var event T
topic, ok := mapper.Load(event.EventID())

key := ""
switch m := any(event).(type) {
case iEventName:
key = m.EventID()
default:
key = fmt.Sprintf("%T", event)
}

topic, ok := mapper.Load(key)
if !ok || topic == nil {
topic, _ = mapper.LoadOrStore(event.EventID(), NewTopic[T]())
topic, _ = mapper.LoadOrStore(key, NewTopic[T]())
}

var result Bus[T]
Expand All @@ -160,10 +190,19 @@ func Sub[T iEvent](callback func(event T)) *Bus[T] {
}

// Pub publishes an event which will be dispatched to all listeners
func Pub[T iEvent](event T) {
topic, ok := mapper.Load(event.EventID())
func Pub[T any](event T) {
key := ""
switch m := any(event).(type) {
case iEventName:
key = m.EventID()
default:
key = fmt.Sprintf("%T", event)
}

topic, ok := mapper.Load(key)
if !ok || topic == nil { // create new topic, even if there are no listeners (otherwise we will have to panic)
topic, _ = mapper.LoadOrStore(event.EventID(), NewTopic[T]())
topic, _ = mapper.LoadOrStore(key, NewTopic[T]())
}

topic.(*Topic[T]).Pub(event)
}
17 changes: 0 additions & 17 deletions test_scenarios/factory-request-reply/events/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@ import (
"github.com/badu/bus/test_scenarios/factory-request-reply/prices"
)

const (
InventoryGRPCClientRequestEventType = "InventoryGRPCClientRequestEvent"
PricesGRPCClientRequestEventType = "PricesGRPCClientRequestEvent"
)

type InventoryGRPCClientRequestEvent struct {
wg sync.WaitGroup
Conn Closer // should be *grpc.ClientConn, but we're avoiding the import
Expand All @@ -24,10 +19,6 @@ func NewInventoryGRPCClientRequestEvent() *InventoryGRPCClientRequestEvent {
return &result
}

func (i *InventoryGRPCClientRequestEvent) EventID() string {
return InventoryGRPCClientRequestEventType
}

func (i *InventoryGRPCClientRequestEvent) Async() bool {
return true // this one is async
}
Expand All @@ -52,14 +43,6 @@ func NewPricesGRPCClientRequestEvent() *PricesGRPCClientRequestEvent {
return &result
}

func (p *PricesGRPCClientRequestEvent) EventID() string {
return PricesGRPCClientRequestEventType
}

func (p *PricesGRPCClientRequestEvent) Async() bool {
return false // this one is sync
}

func (p *PricesGRPCClientRequestEvent) WaitReply() {
p.wg.Wait()
}
Expand Down
26 changes: 2 additions & 24 deletions test_scenarios/fire-and-forget/events/main.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,10 @@
package events

const (
UserRegisteredEventType string = "UserRegisteredEvent"
SMSRequestEventType string = "SMSRequestEvent"
SMSSentEventType string = "SmsSentEvent"
DummyEventType string = "DummyEvent"
)

type UserRegisteredEvent struct {
UserName string
Phone string
}

func (e UserRegisteredEvent) EventID() string {
return UserRegisteredEventType
}

func (e UserRegisteredEvent) Async() bool {
return true
}
Expand All @@ -25,10 +14,6 @@ type SMSRequestEvent struct {
Message string
}

func (e SMSRequestEvent) EventID() string {
return SMSRequestEventType
}

func (e SMSRequestEvent) Async() bool {
return true
}
Expand All @@ -38,21 +23,14 @@ type SMSSentEvent struct {
Status string
}

func (e SMSSentEvent) EventID() string {
return SMSSentEventType
}

func (e SMSSentEvent) Async() bool {
return true
}

type DummyEvent struct {
}

func (e *DummyEvent) EventID() string {
return DummyEventType
AlteredAsync bool
}

func (e *DummyEvent) Async() bool {
return true
return e.AlteredAsync
}
9 changes: 9 additions & 0 deletions test_scenarios/fire-and-forget/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,22 @@ package fire_and_forget

import (
"context"
"fmt"
"strings"
"testing"
"time"

"github.com/badu/bus"
"github.com/badu/bus/test_scenarios/fire-and-forget/audit"
"github.com/badu/bus/test_scenarios/fire-and-forget/events"
"github.com/badu/bus/test_scenarios/fire-and-forget/notifications"
"github.com/badu/bus/test_scenarios/fire-and-forget/users"
)

func OnDummyEvent(event *events.DummyEvent) {
fmt.Println("dummy event async ?", event.Async())
}

func TestUserRegistration(t *testing.T) {
var sb strings.Builder

Expand All @@ -19,6 +26,8 @@ func TestUserRegistration(t *testing.T) {
notifications.NewEmailService(&sb)
audit.NewAuditService(&sb)

bus.Sub(OnDummyEvent)

userSvc.RegisterUser(context.Background(), "Badu", "+40742222222")

<-time.After(500 * time.Millisecond)
Expand Down
4 changes: 3 additions & 1 deletion test_scenarios/fire-and-forget/users/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

type ServiceImpl struct {
sb *strings.Builder
c int
}

func NewService(sb *strings.Builder) ServiceImpl {
Expand All @@ -18,6 +19,7 @@ func NewService(sb *strings.Builder) ServiceImpl {
}

func (s *ServiceImpl) RegisterUser(ctx context.Context, name, phone string) {
s.c++
bus.Pub(events.UserRegisteredEvent{UserName: name, Phone: phone})
bus.Pub(&events.DummyEvent{}) // nobody listens on this one
bus.Pub(&events.DummyEvent{AlteredAsync: s.c%2 == 0}) // nobody listens on this one
}
11 changes: 0 additions & 11 deletions test_scenarios/request-reply-callback/events/main.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
package events

import (
"fmt"
)

const RequestEventType = "RequestEvent"

type RequestEvent[T any] struct {
Payload T
Callback func() (*T, error)
Expand All @@ -19,11 +13,6 @@ func NewRequestEvent[T any](payload T) *RequestEvent[T] {
}
}

func (i *RequestEvent[T]) EventID() string {
var t T
return fmt.Sprintf("%s%T", RequestEventType, t)
}

func (i *RequestEvent[T]) Async() bool {
return true // this one is async
}
Loading

0 comments on commit ecf5448

Please sign in to comment.