Skip to content

Commit

Permalink
Implement AMQP with built-in balancer
Browse files Browse the repository at this point in the history
  • Loading branch information
casskir committed Sep 30, 2019
1 parent c9a016b commit bf198b6
Show file tree
Hide file tree
Showing 12 changed files with 577 additions and 3 deletions.
52 changes: 52 additions & 0 deletions examples/amqp_transporter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package main

import (
"fmt"
"github.com/moleculer-go/moleculer"
"github.com/moleculer-go/moleculer/broker"
"github.com/moleculer-go/moleculer/payload"
"github.com/moleculer-go/moleculer/transit/amqp"
"github.com/sirupsen/logrus"
"sync"
"time"
)

var mathService = moleculer.ServiceSchema{
Name: "math",
Actions: []moleculer.Action{
{
Name: "add",
Handler: func(ctx moleculer.Context, params moleculer.Payload) interface{} {
return params.Get("a").Int() + params.Get("b").Int()
},
},
},
}

func main() {
amqpConfig := amqp.AmqpOptions{
Url: []string{"amqp://guest:guest@localhost:5672"},
AutoDeleteQueues: 20 * time.Second,
Logger: logrus.WithField("transport", "amqp"),
}

config := moleculer.Config{
LogLevel: "debug",
TransporterFactory: func() interface{} {
return amqp.CreateAmqpTransporter(amqpConfig)
},
}

var bkr = broker.New(&config)
bkr.Publish(mathService)
bkr.Start()
result := <-bkr.Call("math.add", payload.New(map[string]int{
"a": 10,
"b": 130,
}))
fmt.Println("result: ", result.Int()) //$ result: 140

wg := sync.WaitGroup{}
wg.Add(1)
wg.Wait()
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ require (
github.com/nats-io/nuid v1.0.1 // indirect
github.com/onsi/ginkgo v1.8.0
github.com/onsi/gomega v1.5.0
github.com/pkg/errors v0.8.1
github.com/prometheus/procfs v0.0.0-20190503130316-740c07785007 // indirect
github.com/sirupsen/logrus v1.4.1
github.com/spf13/cobra v0.0.3
github.com/spf13/viper v1.3.2
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271
github.com/tidwall/gjson v1.2.1
github.com/tidwall/match v1.0.1 // indirect
github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0Mw
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand All @@ -104,6 +105,8 @@ github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/viper v1.3.2 h1:VUFqw5KcqRf7i70GOzW7N+Q7+gxVBkSSqiXB12+JQ4M=
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271 h1:WhxRHzgeVGETMlmVfqhRn8RIeeNoPr2Czh33I4Zdccw=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
Expand Down
6 changes: 3 additions & 3 deletions registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,11 +516,11 @@ func (registry *ServiceRegistry) AddLocalService(service *service.Service) {
}
registry.localNode.Publish(service.AsMap())
registry.logger.Debug("Registry published local service: ", service.FullName(), " # actions: ", len(actions), " # events: ", len(events), " nodeID: ", service.NodeID())
registry.notifyServiceAded(service.Summary())
registry.notifyServiceAdded(service.Summary())
}

// notifyServiceAded notify when a service is added to the registry.
func (registry *ServiceRegistry) notifyServiceAded(svc map[string]string) {
// notifyServiceAdded notify when a service is added to the registry.
func (registry *ServiceRegistry) notifyServiceAdded(svc map[string]string) {
if registry.broker.IsStarted() {
registry.broker.Bus().EmitAsync(
"$registry.service.added",
Expand Down
Loading

0 comments on commit bf198b6

Please sign in to comment.