Skip to content

Commit

Permalink
[NOD-380] Implement MQTT client in api-server (#468)
Browse files Browse the repository at this point in the history
* [NOD-380] Add MQTT to the project.

* [NOD-380] Add MQTT params to config.

* [NOD-380] Implement connecting to an mqtt broker.

* [NOD-380] Fix a comment.

* [NOD-380] Removed unnecessary option.

* [NOD-380] Added comments to MQTT functions.

* [NOD-380] Fix copy+paste error.

* [NOD-380] Make it so that all the mqtt flags must be passed together.

* [NOD-380] Use activeConfig instead of passing it everywhere.
  • Loading branch information
stasatdaglabs authored and Dan Aharoni committed Nov 14, 2019
1 parent 7d7df10 commit 1ce7f21
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 22 deletions.
32 changes: 20 additions & 12 deletions apiserver/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,21 @@ func ActiveConfig() *Config {

// Config defines the configuration options for the API server.
type Config struct {
LogDir string `long:"logdir" description:"Directory to log output."`
RPCUser string `short:"u" long:"rpcuser" description:"RPC username"`
RPCPassword string `short:"P" long:"rpcpass" default-mask:"-" description:"RPC password"`
RPCServer string `short:"s" long:"rpcserver" description:"RPC server to connect to"`
RPCCert string `short:"c" long:"rpccert" description:"RPC server certificate chain for validation"`
DisableTLS bool `long:"notls" description:"Disable TLS"`
DBAddress string `long:"dbaddress" description:"Database address"`
DBUser string `long:"dbuser" description:"Database user" required:"true"`
DBPassword string `long:"dbpass" description:"Database password" required:"true"`
DBName string `long:"dbname" description:"Database name" required:"true"`
HTTPListen string `long:"listen" description:"HTTP address to listen on (default: 0.0.0.0:8080)"`
Migrate bool `long:"migrate" description:"Migrate the database to the latest version. The server will not start when using this flag."`
LogDir string `long:"logdir" description:"Directory to log output."`
RPCUser string `short:"u" long:"rpcuser" description:"RPC username"`
RPCPassword string `short:"P" long:"rpcpass" default-mask:"-" description:"RPC password"`
RPCServer string `short:"s" long:"rpcserver" description:"RPC server to connect to"`
RPCCert string `short:"c" long:"rpccert" description:"RPC server certificate chain for validation"`
DisableTLS bool `long:"notls" description:"Disable TLS"`
DBAddress string `long:"dbaddress" description:"Database address"`
DBUser string `long:"dbuser" description:"Database user" required:"true"`
DBPassword string `long:"dbpass" description:"Database password" required:"true"`
DBName string `long:"dbname" description:"Database name" required:"true"`
HTTPListen string `long:"listen" description:"HTTP address to listen on (default: 0.0.0.0:8080)"`
Migrate bool `long:"migrate" description:"Migrate the database to the latest version. The server will not start when using this flag."`
MQTTBrokerAddress string `long:"mqttaddress" description:"MQTT broker address" required:"false"`
MQTTUser string `long:"mqttuser" description:"MQTT server user" required:"false"`
MQTTPassword string `long:"mqttpass" description:"MQTT server password" required:"false"`
config.NetworkFlags
}

Expand Down Expand Up @@ -77,6 +80,11 @@ func Parse() (*Config, error) {
return nil, errors.New("--cert should be omitted if --notls is used")
}

if (activeConfig.MQTTBrokerAddress != "" || activeConfig.MQTTUser != "" || activeConfig.MQTTPassword != "") &&
(activeConfig.MQTTBrokerAddress == "" || activeConfig.MQTTUser == "" || activeConfig.MQTTPassword == "") {
return nil, errors.New("--mqttaddress, --mqttuser, and --mqttpass must be passed all together")
}

err = activeConfig.ResolveNetwork(parser)
if err != nil {
return nil, err
Expand Down
11 changes: 6 additions & 5 deletions apiserver/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func (l gormLogger) Print(v ...interface{}) {

// Connect connects to the database mentioned in
// config variable.
func Connect(cfg *config.Config) error {
connectionString := buildConnectionString(cfg)
func Connect() error {
connectionString := buildConnectionString()
migrator, driver, err := openMigrator(connectionString)
if err != nil {
return err
Expand Down Expand Up @@ -67,7 +67,8 @@ func Close() error {
return err
}

func buildConnectionString(cfg *config.Config) string {
func buildConnectionString() string {
cfg := config.ActiveConfig()
return fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8&parseTime=True",
cfg.DBUser, cfg.DBPassword, cfg.DBAddress, cfg.DBName)
}
Expand Down Expand Up @@ -111,8 +112,8 @@ func openMigrator(connectionString string) (*migrate.Migrate, source.Driver, err
}

// Migrate database to the latest version.
func Migrate(cfg *config.Config) error {
connectionString := buildConnectionString(cfg)
func Migrate() error {
connectionString := buildConnectionString()
migrator, driver, err := openMigrator(connectionString)
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion apiserver/jsonrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ func Close() {
}

// Connect initiates a connection to the JSON-RPC API Server
func Connect(cfg *config.Config) error {
func Connect() error {
cfg := config.ActiveConfig()
var cert []byte
if !cfg.DisableTLS {
var err error
Expand Down
15 changes: 11 additions & 4 deletions apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"fmt"
"github.com/daglabs/btcd/apiserver/mqtt"
"github.com/pkg/errors"
"os"

Expand Down Expand Up @@ -31,14 +32,14 @@ func main() {
}

if cfg.Migrate {
err := database.Migrate(cfg)
err := database.Migrate()
if err != nil {
panic(errors.Errorf("Error migrating database: %s", err))
}
return
}

err = database.Connect(cfg)
err = database.Connect()
if err != nil {
panic(errors.Errorf("Error connecting to database: %s", err))
}
Expand All @@ -49,13 +50,19 @@ func main() {
}
}()

err = jsonrpc.Connect(cfg)
err = mqtt.Connect()
if err != nil {
panic(errors.Errorf("Error connecting to MQTT: %s", err))
}
defer mqtt.Close()

err = jsonrpc.Connect()
if err != nil {
panic(errors.Errorf("Error connecting to servers: %s", err))
}
defer jsonrpc.Close()

shutdownServer := server.Start(cfg.HTTPListen)
shutdownServer := server.Start(config.ActiveConfig().HTTPListen)
defer shutdownServer()

doneChan := make(chan struct{}, 1)
Expand Down
9 changes: 9 additions & 0 deletions apiserver/mqtt/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package mqtt

import "github.com/daglabs/btcd/util/panics"
import "github.com/daglabs/btcd/apiserver/logger"

var (
log = logger.BackendLog.Logger("MQTT")
spawn = panics.GoroutineWrapperFunc(log, logger.BackendLog)
)
50 changes: 50 additions & 0 deletions apiserver/mqtt/mqtt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package mqtt

import (
"errors"
"github.com/daglabs/btcd/apiserver/config"
mqtt "github.com/eclipse/paho.mqtt.golang"
)

// client is an instance of the MQTT client, in case we have an active connection
var client mqtt.Client

// GetClient returns an instance of the MQTT client, in case we have an active connection
func GetClient() (mqtt.Client, error) {
if client == nil {
return nil, errors.New("MQTT is not connected")
}
return client, nil
}

// Connect initiates a connection to the MQTT server, if defined
func Connect() error {
cfg := config.ActiveConfig()
if cfg.MQTTBrokerAddress == "" {
// MQTT broker not defined -- nothing to do
return nil
}

options := mqtt.NewClientOptions()
options.AddBroker(cfg.MQTTBrokerAddress)
options.SetUsername(cfg.MQTTUser)
options.SetPassword(cfg.MQTTPassword)
options.SetAutoReconnect(true)

newClient := mqtt.NewClient(options)
if token := newClient.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
}
client = newClient

return nil
}

// Close closes the connection to the MQTT server, if previously connected
func Close() {
if client == nil {
return
}
client.Disconnect(250)
client = nil
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792
github.com/btcsuite/winsvc v1.0.0
github.com/davecgh/go-spew v1.1.1
github.com/eclipse/paho.mqtt.golang v1.2.0
github.com/golang-migrate/migrate/v4 v4.6.1
github.com/golang/groupcache v0.0.0-20191002201903-404acd9df4cc
github.com/gorilla/handlers v1.4.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDD
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/eclipse/paho.mqtt.golang v1.2.0 h1:1F8mhG9+aO5/xpdtFkW4SxOJB67ukuDC3t2y2qayIX0=
github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts=
github.com/edsrzf/mmap-go v0.0.0-20170320065105-0bce6a688712/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5 h1:Yzb9+7DPaBjB8zlTR87/ElzFsnQfuHnVUVqpZZIcV5Y=
github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5/go.mod h1:a2zkGnVExMxdzMo3M0Hi/3sEU+cWnZpSni0O6/Yb/P0=
Expand Down

0 comments on commit 1ce7f21

Please sign in to comment.