Skip to content

Commit

Permalink
MF-1034 - Wrapping MQTT client (#1318)
Browse files Browse the repository at this point in the history
* use subscribe interface

Signed-off-by: Ivan Milosevic <iva@blokovi.com>

* delete old implementation

Signed-off-by: Ivan Milosevic <iva@blokovi.com>

* add subscribeToLoRaBroker method
change declaration

Signed-off-by: Ivan Milosevic <iva@blokovi.com>

* remove alias for package

Signed-off-by: Ivan Milosevic <iva@blokovi.com>
  • Loading branch information
blokovi authored Jan 15, 2021
1 parent a8c652f commit 0516fe2
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 93 deletions.
72 changes: 38 additions & 34 deletions cmd/lora/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,24 @@
package main

import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
"time"

mqttPaho "github.com/eclipse/paho.mqtt.golang"
r "github.com/go-redis/redis"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/lora"
"github.com/mainflux/mainflux/lora/api"
"github.com/mainflux/mainflux/lora/mqtt"
"github.com/mainflux/mainflux/pkg/messaging"
"github.com/mainflux/mainflux/pkg/messaging/mqtt"
"github.com/mainflux/mainflux/pkg/messaging/nats"

kitprometheus "github.com/go-kit/kit/metrics/prometheus"
Expand All @@ -30,6 +33,7 @@ const (
defLogLevel = "error"
defHTTPPort = "8180"
defLoraMsgURL = "tcp://localhost:1883"
defSubTimeout = "30s" // 30 seconds
defNatsURL = "nats://localhost:4222"
defESURL = "localhost:6379"
defESPass = ""
Expand All @@ -41,6 +45,7 @@ const (

envHTTPPort = "MF_LORA_ADAPTER_HTTP_PORT"
envLoraMsgURL = "MF_LORA_ADAPTER_MESSAGES_URL"
envSubTimeout = "MF_LORA_ADAPTER_SUBSCRIBER_TIMEOUT"
envNatsURL = "MF_NATS_URL"
envLogLevel = "MF_LORA_ADAPTER_LOG_LEVEL"
envESURL = "MF_THINGS_ES_URL"
Expand All @@ -61,6 +66,7 @@ type config struct {
httpPort string
loraMsgURL string
natsURL string
subTimeout time.Duration
logLevel string
esURL string
esPass string
Expand Down Expand Up @@ -95,8 +101,6 @@ func main() {
thingRM := newRouteMapRepositoy(rmConn, thingsRMPrefix, logger)
chanRM := newRouteMapRepositoy(rmConn, channelsRMPrefix, logger)

mqttConn := connectToMQTTBroker(cfg.loraMsgURL, logger)

svc := lora.New(pub, thingRM, chanRM)
svc = api.LoggingMiddleware(svc, logger)
svc = api.MetricsMiddleware(
Expand All @@ -115,7 +119,14 @@ func main() {
}, []string{"method"}),
)

go subscribeToLoRaBroker(svc, mqttConn, logger)
msub, err := mqtt.NewSubscriber(cfg.loraMsgURL, cfg.subTimeout, logger)
if err != nil {
logger.Error(fmt.Sprintf("Failed to create MQTT subscriber: %s", err))
os.Exit(1)
}

go subscribeToLoRaBroker(svc, msub, logger)

go subscribeToThingsES(svc, esConn, cfg.esConsumerName, logger)

errs := make(chan error, 2)
Expand All @@ -133,9 +144,14 @@ func main() {
}

func loadConfig() config {
mqttTimeout, err := time.ParseDuration(mainflux.Env(envSubTimeout, defSubTimeout))
if err != nil {
log.Fatalf("Invalid %s value: %s", envSubTimeout, err.Error())
}
return config{
httpPort: mainflux.Env(envHTTPPort, defHTTPPort),
loraMsgURL: mainflux.Env(envLoraMsgURL, defLoraMsgURL),
subTimeout: mqttTimeout,
natsURL: mainflux.Env(envNatsURL, defNatsURL),
logLevel: mainflux.Env(envLogLevel, defLogLevel),
esURL: mainflux.Env(envESURL, defESURL),
Expand All @@ -148,28 +164,6 @@ func loadConfig() config {
}
}

func connectToMQTTBroker(loraURL string, logger logger.Logger) mqttPaho.Client {
opts := mqttPaho.NewClientOptions()
opts.AddBroker(loraURL)
opts.SetUsername("")
opts.SetPassword("")
opts.SetOnConnectHandler(func(c mqttPaho.Client) {
logger.Info("Connected to Lora MQTT broker")
})
opts.SetConnectionLostHandler(func(c mqttPaho.Client, err error) {
logger.Error(fmt.Sprintf("MQTT connection lost: %s", err.Error()))
os.Exit(1)
})

client := mqttPaho.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
logger.Error(fmt.Sprintf("Failed to connect to Lora MQTT broker: %s", token.Error()))
os.Exit(1)
}

return client
}

func connectToRedis(redisURL, redisPass, redisDB string, logger logger.Logger) *r.Client {
db, err := strconv.Atoi(redisDB)
if err != nil {
Expand All @@ -184,20 +178,30 @@ func connectToRedis(redisURL, redisPass, redisDB string, logger logger.Logger) *
})
}

func subscribeToLoRaBroker(svc lora.Service, mc mqttPaho.Client, logger logger.Logger) {
mqtt := mqtt.NewBroker(svc, mc, logger)
logger.Info("Subscribed to Lora MQTT broker")
if err := mqtt.Subscribe(loraServerTopic); err != nil {
logger.Error(fmt.Sprintf("Failed to subscribe to Lora MQTT broker: %s", err))
func subscribeToLoRaBroker(svc lora.Service, msub messaging.Subscriber, logger logger.Logger) {
err := msub.Subscribe(loraServerTopic, func(msg messaging.Message) error {
var m lora.Message
if err := json.Unmarshal(msg.Payload, &m); err != nil {
logger.Warn(fmt.Sprintf("Failed to Unmarshal message: %s", err.Error()))
return err
}
if err := svc.Publish(context.Background(), "", m); err != nil {
return err
}
return nil
})
if err != nil {
logger.Error(fmt.Sprintf("Failed to subscribe to LoRa MQTT broker: %s", err))
os.Exit(1)
}
logger.Info("Subscribed to LoRa MQTT broker")
}

func subscribeToThingsES(svc lora.Service, client *r.Client, consumer string, logger logger.Logger) {
eventStore := redis.NewEventStore(svc, client, consumer, logger)
logger.Info("Subscribed to Redis Event Store")
if err := eventStore.Subscribe("mainflux.things"); err != nil {
logger.Warn(fmt.Sprintf("Lora-adapter service failed to subscribe to Redis event source: %s", err))
logger.Warn(fmt.Sprintf("LoRa-adapter service failed to subscribe to Redis event source: %s", err))
}
}

Expand All @@ -208,6 +212,6 @@ func newRouteMapRepositoy(client *r.Client, prefix string, logger logger.Logger)

func startHTTPServer(cfg config, logger logger.Logger, errs chan error) {
p := fmt.Sprintf(":%s", cfg.httpPort)
logger.Info(fmt.Sprintf("lora-adapter service started, exposed port %s", cfg.httpPort))
logger.Info(fmt.Sprintf("LoRa-adapter service started, exposed port %s", cfg.httpPort))
errs <- http.ListenAndServe(p, api.MakeHandler())
}
6 changes: 3 additions & 3 deletions cmd/mqtt/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,14 @@ func main() {
}
defer nps.Close()

mp, err := mqttpub.NewPublisher(fmt.Sprintf("%s:%s", cfg.mqttTargetHost, cfg.mqttTargetPort), cfg.mqttForwarderTimeout)
mpub, err := mqttpub.NewPublisher(fmt.Sprintf("%s:%s", cfg.mqttTargetHost, cfg.mqttTargetPort), cfg.mqttForwarderTimeout)
if err != nil {
logger.Error(fmt.Sprintf("Failed to create MQTT publisher: %s", err))
os.Exit(1)
}

fwd := mqtt.NewForwarder(nats.SubjectAllChannels, logger)
if err := fwd.Forward(nps, mp); err != nil {
if err := fwd.Forward(nps, mpub); err != nil {
logger.Error(fmt.Sprintf("Failed to forward NATS messages: %s", err))
os.Exit(1)
}
Expand Down Expand Up @@ -216,7 +216,7 @@ func loadConfig() config {

mqttTimeout, err := time.ParseDuration(mainflux.Env(envMQTTForwarderTimeout, defMQTTForwarderTimeout))
if err != nil {
log.Fatalf("Invalid %s value: %s", envThingsAuthTimeout, err.Error())
log.Fatalf("Invalid %s value: %s", envMQTTForwarderTimeout, err.Error())
}

return config{
Expand Down
56 changes: 0 additions & 56 deletions lora/mqtt/sub.go

This file was deleted.

0 comments on commit 0516fe2

Please sign in to comment.