Skip to content

Commit

Permalink
network: Improve PublishPersistentMessage method
Browse files Browse the repository at this point in the history
In order to send reply messages with the associated correlation ID in
the message properties, this patch improves the publish method to accept
an option structure. This structure replaces the headers parameter.
  • Loading branch information
esdrasjnr authored and netoax committed Jun 11, 2020
1 parent 7af2225 commit b4a9de8
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 79 deletions.
38 changes: 30 additions & 8 deletions pkg/network/amqp.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package network

import (
"fmt"

"github.com/cenkalti/backoff/v4"

"github.com/CESARBR/knot-babeltower/pkg/logging"
Expand All @@ -21,11 +23,17 @@ type InMsg struct {
Exchange string
RoutingKey string
ReplyTo string
CorrelationId string
CorrelationID string
Headers map[string]interface{}
Body []byte
}

// MessageOptions represents the message publishing options
type MessageOptions struct {
Authorization string
CorrelationID string
}

// NewAmqp constructs the AMQP connection handler
func NewAmqp(url string, logger logging.Logger) *Amqp {
return &Amqp{url, logger, nil, nil, nil}
Expand Down Expand Up @@ -58,11 +66,25 @@ func (a *Amqp) Stop() {
}

// PublishPersistentMessage sends a persistent message to RabbitMQ
func (a *Amqp) PublishPersistentMessage(exchange, exchangeType, key string, body []byte, headers amqp.Table) error {
err := a.declareExchange(exchange, exchangeType)
func (a *Amqp) PublishPersistentMessage(exchange, exchangeType, key string, msg MessageSerializer, options *MessageOptions) error {
var headers map[string]interface{}
var corrID string

if options != nil {
headers = map[string]interface{}{
"Authorization": options.Authorization,
}
corrID = options.CorrelationID
}

body, err := msg.Serialize()
if err != nil {
a.logger.Error(err)
return err
return fmt.Errorf("error serializing message: %w", err)
}

err = a.declareExchange(exchange, exchangeType)
if err != nil {
return fmt.Errorf("error declaring exchange: %w", err)
}

err = a.channel.Publish(
Expand All @@ -74,14 +96,14 @@ func (a *Amqp) PublishPersistentMessage(exchange, exchangeType, key string, body
Headers: headers,
ContentType: "text/plain",
ContentEncoding: "",
Body: body,
DeliveryMode: amqp.Persistent,
Priority: 0,
CorrelationId: corrID,
Body: body,
},
)
if err != nil {
a.logger.Error(err)
return err
return fmt.Errorf("error publishing message in channel: %w", err)
}

return nil
Expand Down
31 changes: 30 additions & 1 deletion pkg/network/msg.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,21 @@
package network

import "github.com/CESARBR/knot-babeltower/pkg/thing/entities"
import (
"encoding/json"
"fmt"

"github.com/CESARBR/knot-babeltower/pkg/thing/entities"
)

// MessageSerializer represents a interface for KNoT messages
type MessageSerializer interface {
Serialize() ([]byte, error)
}

// message represents a KNoT message
type message struct {
Payload interface{}
}

// DeviceRegisterRequest represents the incoming register device request message
type DeviceRegisterRequest struct {
Expand Down Expand Up @@ -75,3 +90,17 @@ type DataSent struct {
ID string `json:"id"`
Data []entities.Data `json:"data"`
}

// NewMessage creates a message
func NewMessage(msg interface{}) MessageSerializer {
return message{Payload: msg}
}

// Serialize serializes the message in a byte stream
func (m message) Serialize() ([]byte, error) {
data, err := json.Marshal(&m.Payload)
if err != nil {
return nil, fmt.Errorf("error econding JSON message: %w", err)
}
return data, nil
}
4 changes: 2 additions & 2 deletions pkg/server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,9 @@ func (mc *MsgHandler) handleClientMessages(msg network.InMsg, token string) erro
func (mc *MsgHandler) handleRequestReplyCommands(msg network.InMsg, token string) error {
switch msg.RoutingKey {
case bindingKeyAuthDevice:
return mc.thingController.AuthDevice(msg.Body, token, msg.ReplyTo, msg.CorrelationId)
return mc.thingController.AuthDevice(msg.Body, token, msg.ReplyTo, msg.CorrelationID)
case bindingKeyListDevices:
return mc.thingController.ListDevices(token, msg.ReplyTo, msg.CorrelationId)
return mc.thingController.ListDevices(token, msg.ReplyTo, msg.CorrelationID)
}

return nil
Expand Down
99 changes: 31 additions & 68 deletions pkg/thing/delivery/amqp/client.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
package amqp

import (
"encoding/json"
"fmt"

"github.com/CESARBR/knot-babeltower/pkg/logging"
"github.com/CESARBR/knot-babeltower/pkg/network"
"github.com/CESARBR/knot-babeltower/pkg/thing/entities"
)

const (
exchangeDevices = "device"
exchangeDevicesType = "direct"
exchangeDevice = "device"
exchangeDeviceType = "direct"
exchangeDataPublished = "data.published"
exchangeDataPublishedType = "fanout"
registerOutKey = "device.registered"
Expand Down Expand Up @@ -61,110 +58,76 @@ func NewCommandSender(logger logging.Logger, amqp *network.Amqp) Sender {

// PublishRegisteredDevice publishes the registered device's credentials to the device registration queue
func (mp *msgClientPublisher) PublishRegisteredDevice(thingID, name, token string, err error) error {
mp.logger.Debug("sending registered message")
mp.logger.Debug("sending registered response")
errMsg := getErrMsg(err)
resp := &network.DeviceRegisteredResponse{ID: thingID, Name: name, Token: token, Error: errMsg}
msg, err := json.Marshal(resp)
if err != nil {
mp.logger.Error(err)
return err
}
msg := network.NewMessage(network.DeviceRegisteredResponse{ID: thingID, Name: name, Token: token, Error: errMsg})

return mp.amqp.PublishPersistentMessage(exchangeDevices, exchangeDevicesType, registerOutKey, msg, nil)
return mp.amqp.PublishPersistentMessage(exchangeDevice, exchangeDeviceType, registerOutKey, msg, nil)
}

// PublishUnregisteredDevice publishes the unregistered device's id and error message to the device unregistered queue
func (mp *msgClientPublisher) PublishUnregisteredDevice(thingID string, err error) error {
mp.logger.Debug("sending unregistered message")
mp.logger.Debug("sending unregistered response")
errMsg := getErrMsg(err)
resp := &network.DeviceUnregisteredResponse{ID: thingID, Error: errMsg}
msg, err := json.Marshal(resp)
if err != nil {
mp.logger.Error(err)
return err
}
msg := network.NewMessage(network.DeviceUnregisteredResponse{ID: thingID, Error: errMsg})

return mp.amqp.PublishPersistentMessage(exchangeDevices, exchangeDevicesType, unregisterOutKey, msg, nil)
return mp.amqp.PublishPersistentMessage(exchangeDevice, exchangeDeviceType, unregisterOutKey, msg, nil)
}

// PublishUpdatedSchema sends the updated schema response
func (mp *msgClientPublisher) PublishUpdatedSchema(thingID string, schema []entities.Schema, err error) error {
mp.logger.Debug("sending update schema response")
errMsg := getErrMsg(err)
resp := &network.SchemaUpdatedResponse{ID: thingID, Schema: schema, Error: errMsg}
msg, err := json.Marshal(resp)
if err != nil {
return err
}
msg := network.NewMessage(network.SchemaUpdatedResponse{ID: thingID, Schema: schema, Error: errMsg})

return mp.amqp.PublishPersistentMessage(exchangeDevices, exchangeDevicesType, schemaOutKey, msg, nil)
return mp.amqp.PublishPersistentMessage(exchangeDevice, exchangeDeviceType, schemaOutKey, msg, nil)
}

// PublishRequestData sends request data command
func (mp *msgClientPublisher) PublishRequestData(thingID string, sensorIds []int) error {
resp := &network.DataRequest{ID: thingID, SensorIds: sensorIds}
msg, err := json.Marshal(resp)
if err != nil {
return err
}

mp.logger.Debug("sending request data request")
msg := network.NewMessage(network.DataRequest{ID: thingID, SensorIds: sensorIds})
routingKey := "device." + thingID + "." + requestDataKey
return mp.amqp.PublishPersistentMessage(exchangeDevices, exchangeDevicesType, routingKey, msg, nil)

return mp.amqp.PublishPersistentMessage(exchangeDevice, exchangeDeviceType, routingKey, msg, nil)
}

// PublishUpdateData send update data command
func (mp *msgClientPublisher) PublishUpdateData(thingID string, data []entities.Data) error {
resp := &network.DataUpdate{ID: thingID, Data: data}
msg, err := json.Marshal(resp)
if err != nil {
return fmt.Errorf("message parsing error: %w", err)
}

mp.logger.Debug("sending update data request")
msg := network.NewMessage(network.DataUpdate{ID: thingID, Data: data})
routingKey := "device." + thingID + "." + updateDataKey
return mp.amqp.PublishPersistentMessage(exchangeDevices, exchangeDevicesType, routingKey, msg, nil)

return mp.amqp.PublishPersistentMessage(exchangeDevice, exchangeDeviceType, routingKey, msg, nil)
}

// SendAuthResponse sends the auth thing status response
func (cs *commandSender) SendAuthResponse(thingID string, replyTo, corrID string, err error) error {
cs.logger.Debug("sending auth device response")
errMsg := getErrMsg(err)
resp := &network.DeviceAuthResponse{ID: thingID, Error: errMsg}
headers := map[string]interface{}{
"correlation_id": corrID,
}
msg, err := json.Marshal(resp)
if err != nil {
return err
}
msg := network.NewMessage(network.DeviceAuthResponse{ID: thingID, Error: errMsg})
options := &network.MessageOptions{CorrelationID: corrID}

return cs.amqp.PublishPersistentMessage(exchangeDevices, exchangeDevicesType, replyTo, msg, headers)
return cs.amqp.PublishPersistentMessage(exchangeDevice, exchangeDeviceType, replyTo, msg, options)
}

// SendListResponse sends the list devices command response
func (cs *commandSender) SendListResponse(things []*entities.Thing, replyTo, corrID string, err error) error {
cs.logger.Debug("sending list devices response")
errMsg := getErrMsg(err)
resp := &network.DeviceListResponse{Things: things, Error: errMsg}
headers := map[string]interface{}{
"correlation_id": corrID,
}
msg, err := json.Marshal(resp)
if err != nil {
return err
}
msg := network.NewMessage(network.DeviceListResponse{Things: things, Error: errMsg})
options := &network.MessageOptions{CorrelationID: corrID}

return cs.amqp.PublishPersistentMessage(exchangeDevices, exchangeDevicesType, replyTo, msg, headers)
return cs.amqp.PublishPersistentMessage(exchangeDevice, exchangeDeviceType, replyTo, msg, options)
}

// PublishPublishedData send update data command
func (mp *msgClientPublisher) PublishPublishedData(thingID, token string, data []entities.Data) error {
resp := &network.DataSent{ID: thingID, Data: data}
headers := map[string]interface{}{
"Authorization": token,
}
msg, err := json.Marshal(resp)
if err != nil {
return fmt.Errorf("message parsing error: %w", err)
}
mp.logger.Debug("sending publish data request")
msg := network.NewMessage(network.DataSent{ID: thingID, Data: data})
options := &network.MessageOptions{Authorization: token}

return mp.amqp.PublishPersistentMessage(exchangeDataPublished, exchangeDataPublishedType, "", msg, headers)
return mp.amqp.PublishPersistentMessage(exchangeDataPublished, exchangeDataPublishedType, "", msg, options)
}

func getErrMsg(err error) *string {
Expand Down

0 comments on commit b4a9de8

Please sign in to comment.