From 1ce7f210267df676984041a39a390a482184ef5a Mon Sep 17 00:00:00 2001 From: stasatdaglabs <39559713+stasatdaglabs@users.noreply.github.com> Date: Thu, 14 Nov 2019 10:44:45 +0200 Subject: [PATCH] [NOD-380] Implement MQTT client in api-server (#468) * [NOD-380] Add MQTT to the project. * [NOD-380] Add MQTT params to config. * [NOD-380] Implement connecting to an mqtt broker. * [NOD-380] Fix a comment. * [NOD-380] Removed unnecessary option. * [NOD-380] Added comments to MQTT functions. * [NOD-380] Fix copy+paste error. * [NOD-380] Make it so that all the mqtt flags must be passed together. * [NOD-380] Use activeConfig instead of passing it everywhere. --- apiserver/config/config.go | 32 ++++++++++++++-------- apiserver/database/database.go | 11 ++++---- apiserver/jsonrpc/client.go | 3 +- apiserver/main.go | 15 +++++++--- apiserver/mqtt/log.go | 9 ++++++ apiserver/mqtt/mqtt.go | 50 ++++++++++++++++++++++++++++++++++ go.mod | 1 + go.sum | 2 ++ 8 files changed, 101 insertions(+), 22 deletions(-) create mode 100644 apiserver/mqtt/log.go create mode 100644 apiserver/mqtt/mqtt.go diff --git a/apiserver/config/config.go b/apiserver/config/config.go index 4ea74b1e..aacf7dd4 100644 --- a/apiserver/config/config.go +++ b/apiserver/config/config.go @@ -29,18 +29,21 @@ func ActiveConfig() *Config { // Config defines the configuration options for the API server. type Config struct { - LogDir string `long:"logdir" description:"Directory to log output."` - RPCUser string `short:"u" long:"rpcuser" description:"RPC username"` - RPCPassword string `short:"P" long:"rpcpass" default-mask:"-" description:"RPC password"` - RPCServer string `short:"s" long:"rpcserver" description:"RPC server to connect to"` - RPCCert string `short:"c" long:"rpccert" description:"RPC server certificate chain for validation"` - DisableTLS bool `long:"notls" description:"Disable TLS"` - DBAddress string `long:"dbaddress" description:"Database address"` - DBUser string `long:"dbuser" description:"Database user" required:"true"` - DBPassword string `long:"dbpass" description:"Database password" required:"true"` - DBName string `long:"dbname" description:"Database name" required:"true"` - HTTPListen string `long:"listen" description:"HTTP address to listen on (default: 0.0.0.0:8080)"` - Migrate bool `long:"migrate" description:"Migrate the database to the latest version. The server will not start when using this flag."` + LogDir string `long:"logdir" description:"Directory to log output."` + RPCUser string `short:"u" long:"rpcuser" description:"RPC username"` + RPCPassword string `short:"P" long:"rpcpass" default-mask:"-" description:"RPC password"` + RPCServer string `short:"s" long:"rpcserver" description:"RPC server to connect to"` + RPCCert string `short:"c" long:"rpccert" description:"RPC server certificate chain for validation"` + DisableTLS bool `long:"notls" description:"Disable TLS"` + DBAddress string `long:"dbaddress" description:"Database address"` + DBUser string `long:"dbuser" description:"Database user" required:"true"` + DBPassword string `long:"dbpass" description:"Database password" required:"true"` + DBName string `long:"dbname" description:"Database name" required:"true"` + HTTPListen string `long:"listen" description:"HTTP address to listen on (default: 0.0.0.0:8080)"` + Migrate bool `long:"migrate" description:"Migrate the database to the latest version. The server will not start when using this flag."` + MQTTBrokerAddress string `long:"mqttaddress" description:"MQTT broker address" required:"false"` + MQTTUser string `long:"mqttuser" description:"MQTT server user" required:"false"` + MQTTPassword string `long:"mqttpass" description:"MQTT server password" required:"false"` config.NetworkFlags } @@ -77,6 +80,11 @@ func Parse() (*Config, error) { return nil, errors.New("--cert should be omitted if --notls is used") } + if (activeConfig.MQTTBrokerAddress != "" || activeConfig.MQTTUser != "" || activeConfig.MQTTPassword != "") && + (activeConfig.MQTTBrokerAddress == "" || activeConfig.MQTTUser == "" || activeConfig.MQTTPassword == "") { + return nil, errors.New("--mqttaddress, --mqttuser, and --mqttpass must be passed all together") + } + err = activeConfig.ResolveNetwork(parser) if err != nil { return nil, err diff --git a/apiserver/database/database.go b/apiserver/database/database.go index 279bdc96..8d4b223a 100644 --- a/apiserver/database/database.go +++ b/apiserver/database/database.go @@ -33,8 +33,8 @@ func (l gormLogger) Print(v ...interface{}) { // Connect connects to the database mentioned in // config variable. -func Connect(cfg *config.Config) error { - connectionString := buildConnectionString(cfg) +func Connect() error { + connectionString := buildConnectionString() migrator, driver, err := openMigrator(connectionString) if err != nil { return err @@ -67,7 +67,8 @@ func Close() error { return err } -func buildConnectionString(cfg *config.Config) string { +func buildConnectionString() string { + cfg := config.ActiveConfig() return fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8&parseTime=True", cfg.DBUser, cfg.DBPassword, cfg.DBAddress, cfg.DBName) } @@ -111,8 +112,8 @@ func openMigrator(connectionString string) (*migrate.Migrate, source.Driver, err } // Migrate database to the latest version. -func Migrate(cfg *config.Config) error { - connectionString := buildConnectionString(cfg) +func Migrate() error { + connectionString := buildConnectionString() migrator, driver, err := openMigrator(connectionString) if err != nil { return err diff --git a/apiserver/jsonrpc/client.go b/apiserver/jsonrpc/client.go index fd8326f6..2abca400 100644 --- a/apiserver/jsonrpc/client.go +++ b/apiserver/jsonrpc/client.go @@ -54,7 +54,8 @@ func Close() { } // Connect initiates a connection to the JSON-RPC API Server -func Connect(cfg *config.Config) error { +func Connect() error { + cfg := config.ActiveConfig() var cert []byte if !cfg.DisableTLS { var err error diff --git a/apiserver/main.go b/apiserver/main.go index 1c51c43a..888c0900 100644 --- a/apiserver/main.go +++ b/apiserver/main.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "github.com/daglabs/btcd/apiserver/mqtt" "github.com/pkg/errors" "os" @@ -31,14 +32,14 @@ func main() { } if cfg.Migrate { - err := database.Migrate(cfg) + err := database.Migrate() if err != nil { panic(errors.Errorf("Error migrating database: %s", err)) } return } - err = database.Connect(cfg) + err = database.Connect() if err != nil { panic(errors.Errorf("Error connecting to database: %s", err)) } @@ -49,13 +50,19 @@ func main() { } }() - err = jsonrpc.Connect(cfg) + err = mqtt.Connect() + if err != nil { + panic(errors.Errorf("Error connecting to MQTT: %s", err)) + } + defer mqtt.Close() + + err = jsonrpc.Connect() if err != nil { panic(errors.Errorf("Error connecting to servers: %s", err)) } defer jsonrpc.Close() - shutdownServer := server.Start(cfg.HTTPListen) + shutdownServer := server.Start(config.ActiveConfig().HTTPListen) defer shutdownServer() doneChan := make(chan struct{}, 1) diff --git a/apiserver/mqtt/log.go b/apiserver/mqtt/log.go new file mode 100644 index 00000000..2fb33c1e --- /dev/null +++ b/apiserver/mqtt/log.go @@ -0,0 +1,9 @@ +package mqtt + +import "github.com/daglabs/btcd/util/panics" +import "github.com/daglabs/btcd/apiserver/logger" + +var ( + log = logger.BackendLog.Logger("MQTT") + spawn = panics.GoroutineWrapperFunc(log, logger.BackendLog) +) diff --git a/apiserver/mqtt/mqtt.go b/apiserver/mqtt/mqtt.go new file mode 100644 index 00000000..80ec4a64 --- /dev/null +++ b/apiserver/mqtt/mqtt.go @@ -0,0 +1,50 @@ +package mqtt + +import ( + "errors" + "github.com/daglabs/btcd/apiserver/config" + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +// client is an instance of the MQTT client, in case we have an active connection +var client mqtt.Client + +// GetClient returns an instance of the MQTT client, in case we have an active connection +func GetClient() (mqtt.Client, error) { + if client == nil { + return nil, errors.New("MQTT is not connected") + } + return client, nil +} + +// Connect initiates a connection to the MQTT server, if defined +func Connect() error { + cfg := config.ActiveConfig() + if cfg.MQTTBrokerAddress == "" { + // MQTT broker not defined -- nothing to do + return nil + } + + options := mqtt.NewClientOptions() + options.AddBroker(cfg.MQTTBrokerAddress) + options.SetUsername(cfg.MQTTUser) + options.SetPassword(cfg.MQTTPassword) + options.SetAutoReconnect(true) + + newClient := mqtt.NewClient(options) + if token := newClient.Connect(); token.Wait() && token.Error() != nil { + return token.Error() + } + client = newClient + + return nil +} + +// Close closes the connection to the MQTT server, if previously connected +func Close() { + if client == nil { + return + } + client.Disconnect(250) + client = nil +} diff --git a/go.mod b/go.mod index 1a8f8388..d491efa0 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 github.com/btcsuite/winsvc v1.0.0 github.com/davecgh/go-spew v1.1.1 + github.com/eclipse/paho.mqtt.golang v1.2.0 github.com/golang-migrate/migrate/v4 v4.6.1 github.com/golang/groupcache v0.0.0-20191002201903-404acd9df4cc github.com/gorilla/handlers v1.4.2 diff --git a/go.sum b/go.sum index ca81ae42..7b070022 100644 --- a/go.sum +++ b/go.sum @@ -65,6 +65,8 @@ github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDD github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/eclipse/paho.mqtt.golang v1.2.0 h1:1F8mhG9+aO5/xpdtFkW4SxOJB67ukuDC3t2y2qayIX0= +github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= github.com/edsrzf/mmap-go v0.0.0-20170320065105-0bce6a688712/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5 h1:Yzb9+7DPaBjB8zlTR87/ElzFsnQfuHnVUVqpZZIcV5Y= github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5/go.mod h1:a2zkGnVExMxdzMo3M0Hi/3sEU+cWnZpSni0O6/Yb/P0=