Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NOISSUE - Use RabbitMQ as MQTT broker #2695

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/check-license.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
CHECK=""
for file in $(grep -rl --exclude-dir={.git,build} \
--exclude=\*.{crt,key,pem,zed,hcl,md,json,csv,mod,sum,tmpl,args} \
--exclude={CODEOWNERS,LICENSE,MAINTAINERS} \
--exclude={CODEOWNERS,LICENSE,MAINTAINERS,enabled_plugins,rabbitmq.conf} \
.); do

if ! head -n 5 "$file" | grep -q "Copyright (c) Abstract Machines"; then
Expand Down
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

SMQ_DOCKER_IMAGE_NAME_PREFIX ?= supermq
BUILD_DIR ?= build
SERVICES = auth users clients groups channels domains http coap ws cli mqtt certs journal
SERVICES = auth users clients groups channels domains http coap ws cli mqtt certs journal rabbitmq-auth
TEST_API_SERVICES = journal auth certs http clients users channels groups domains
TEST_API = $(addprefix test_api_,$(TEST_API_SERVICES))
DOCKERS = $(addprefix docker_,$(SERVICES))
Expand All @@ -29,13 +29,13 @@ INTERNAL_PROTO_FILES := $(shell find $(INTERNAL_PROTO_DIR) -name "*.proto" | sed
ifneq ($(SMQ_MESSAGE_BROKER_TYPE),)
SMQ_MESSAGE_BROKER_TYPE := $(SMQ_MESSAGE_BROKER_TYPE)
else
SMQ_MESSAGE_BROKER_TYPE=nats
SMQ_MESSAGE_BROKER_TYPE=rabbitmq
endif

ifneq ($(SMQ_ES_TYPE),)
SMQ_ES_TYPE := $(SMQ_ES_TYPE)
else
SMQ_ES_TYPE=nats
SMQ_ES_TYPE=rabbitmq
endif

define compile_service
Expand Down
2 changes: 1 addition & 1 deletion cmd/auth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type config struct {
SpicedbSchemaFile string `env:"SMQ_SPICEDB_SCHEMA_FILE" envDefault:"./docker/spicedb/schema.zed"`
SpicedbPreSharedKey string `env:"SMQ_SPICEDB_PRE_SHARED_KEY" envDefault:"12345678"`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"`
ESURL string `env:"SMQ_ES_URL" envDefault:"amqp://guest:guest@rabbitmq:5672/"`
}

func main() {
Expand Down
2 changes: 1 addition & 1 deletion cmd/channels/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ type config struct {
InstanceID string `env:"SMQ_CHANNELS_INSTANCE_ID" envDefault:""`
JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"`
ESURL string `env:"SMQ_ES_URL" envDefault:"amqp://guest:guest@rabbitmq:5672/"`
ESConsumerName string `env:"SMQ_CHANNELS_EVENT_CONSUMER" envDefault:"channels"`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
SpicedbHost string `env:"SMQ_SPICEDB_HOST" envDefault:"localhost"`
Expand Down
2 changes: 1 addition & 1 deletion cmd/clients/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type config struct {
CacheKeyDuration time.Duration `env:"SMQ_CLIENTS_CACHE_KEY_DURATION" envDefault:"10m"`
JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"`
ESURL string `env:"SMQ_ES_URL" envDefault:"amqp://guest:guest@rabbitmq:5672/"`
ESConsumerName string `env:"SMQ_CLIENTS_EVENT_CONSUMER" envDefault:"clients"`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
SpicedbHost string `env:"SMQ_SPICEDB_HOST" envDefault:"localhost"`
Expand Down
4 changes: 2 additions & 2 deletions cmd/coap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ const (

type config struct {
LogLevel string `env:"SMQ_COAP_ADAPTER_LOG_LEVEL" envDefault:"info"`
BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"amqp://guest:guest@rabbitmq:5672/"`
JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"SMQ_COAP_ADAPTER_INSTANCE_ID" envDefault:""`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"`
ESURL string `env:"SMQ_ES_URL" envDefault:"amqp://guest:guest@rabbitmq:5672/"`
}

func main() {
Expand Down
2 changes: 1 addition & 1 deletion cmd/domains/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ type config struct {
SpicedbSchemaFile string `env:"SMQ_SPICEDB_SCHEMA_FILE" envDefault:"schema.zed"`
SpicedbPreSharedKey string `env:"SMQ_SPICEDB_PRE_SHARED_KEY" envDefault:"12345678"`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"`
ESURL string `env:"SMQ_ES_URL" envDefault:"amqp://guest:guest@rabbitmq:5672/"`
}

func main() {
Expand Down
2 changes: 1 addition & 1 deletion cmd/groups/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type config struct {
InstanceID string `env:"SMQ_GROUPS_INSTANCE_ID" envDefault:""`
JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"`
ESURL string `env:"SMQ_ES_URL" envDefault:"amqp://guest:guest@rabbitmq:5672/"`
ESConsumerName string `env:"SMQ_GROUPS_EVENT_CONSUMER" envDefault:"groups"`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
SpicedbHost string `env:"SMQ_SPICEDB_HOST" envDefault:"localhost"`
Expand Down
4 changes: 2 additions & 2 deletions cmd/http/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ const (

type config struct {
LogLevel string `env:"SMQ_HTTP_ADAPTER_LOG_LEVEL" envDefault:"info"`
BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"amqp://guest:guest@rabbitmq:5672/"`
JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"SMQ_HTTP_ADAPTER_INSTANCE_ID" envDefault:""`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"`
ESURL string `env:"SMQ_ES_URL" envDefault:"amqp://guest:guest@rabbitmq:5672/"`
}

func main() {
Expand Down
2 changes: 1 addition & 1 deletion cmd/journal/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const (

type config struct {
LogLevel string `env:"SMQ_JOURNAL_LOG_LEVEL" envDefault:"info"`
ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"`
ESURL string `env:"SMQ_ES_URL" envDefault:"amqp://guest:guest@rabbitmq:5672/"`
JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"SMQ_JOURNAL_INSTANCE_ID" envDefault:""`
Expand Down
97 changes: 81 additions & 16 deletions cmd/mqtt/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,20 @@ import (
smqlog "github.com/absmach/supermq/logger"
"github.com/absmach/supermq/mqtt"
"github.com/absmach/supermq/mqtt/events"
mqtttracing "github.com/absmach/supermq/mqtt/tracing"
"github.com/absmach/supermq/pkg/errors"
"github.com/absmach/supermq/pkg/grpcclient"
jaegerclient "github.com/absmach/supermq/pkg/jaeger"
"github.com/absmach/supermq/pkg/messaging/brokers"
brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing"
msgevents "github.com/absmach/supermq/pkg/messaging/events"
"github.com/absmach/supermq/pkg/messaging/handler"
mqttpub "github.com/absmach/supermq/pkg/messaging/mqtt"
"github.com/absmach/supermq/pkg/server"
"github.com/absmach/supermq/pkg/uuid"
"github.com/caarlos0/env/v11"
"github.com/cenkalti/backoff/v4"
"github.com/eclipse/paho.mqtt.golang/packets"
"golang.org/x/sync/errgroup"
)

Expand All @@ -48,21 +51,26 @@ const (
)

type config struct {
LogLevel string `env:"SMQ_MQTT_ADAPTER_LOG_LEVEL" envDefault:"info"`
MQTTPort string `env:"SMQ_MQTT_ADAPTER_MQTT_PORT" envDefault:"1883"`
MQTTTargetHost string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST" envDefault:"localhost"`
MQTTTargetPort string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT" envDefault:"1883"`
MQTTTargetHealthCheck string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK" envDefault:""`
HTTPPort string `env:"SMQ_MQTT_ADAPTER_WS_PORT" envDefault:"8080"`
HTTPTargetHost string `env:"SMQ_MQTT_ADAPTER_WS_TARGET_HOST" envDefault:"localhost"`
HTTPTargetPort string `env:"SMQ_MQTT_ADAPTER_WS_TARGET_PORT" envDefault:"8080"`
Instance string `env:"SMQ_MQTT_ADAPTER_INSTANCE" envDefault:""`
JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"SMQ_MQTT_ADAPTER_INSTANCE_ID" envDefault:""`
ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
LogLevel string `env:"SMQ_MQTT_ADAPTER_LOG_LEVEL" envDefault:"info"`
MQTTPort string `env:"SMQ_MQTT_ADAPTER_MQTT_PORT" envDefault:"1883"`
MQTTTargetHost string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST" envDefault:"localhost"`
MQTTTargetPort string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT" envDefault:"1883"`
MQTTTargetUsername string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_USERNAME" envDefault:""`
MQTTTargetPassword string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_PASSWORD" envDefault:""`
MQTTForwarderTimeout time.Duration `env:"SMQ_MQTT_ADAPTER_FORWARDER_TIMEOUT" envDefault:"30s"`
MQTTTargetHealthCheck string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK" envDefault:""`
MQTTQoS uint8 `env:"SMQ_MQTT_ADAPTER_MQTT_QOS" envDefault:"1"`
HTTPPort string `env:"SMQ_MQTT_ADAPTER_WS_PORT" envDefault:"8080"`
HTTPTargetHost string `env:"SMQ_MQTT_ADAPTER_WS_TARGET_HOST" envDefault:"localhost"`
HTTPTargetPort string `env:"SMQ_MQTT_ADAPTER_WS_TARGET_PORT" envDefault:"8080"`
HTTPTargetPath string `env:"SMQ_MQTT_ADAPTER_WS_TARGET_PATH" envDefault:"/mqtt"`
Instance string `env:"SMQ_MQTT_ADAPTER_INSTANCE" envDefault:""`
JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"amqp://guest:guest@rabbitmq:5672/"`
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"SMQ_MQTT_ADAPTER_INSTANCE_ID" envDefault:""`
ESURL string `env:"SMQ_ES_URL" envDefault:"amqp://guest:guest@rabbitmq:5672/"`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
}

func main() {
Expand Down Expand Up @@ -121,6 +129,38 @@ func main() {
}()
tracer := tp.Tracer(svcName)

bsub, err := brokers.NewPubSub(ctx, cfg.BrokerURL, logger)
if err != nil {
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
exitCode = 1
return
}
defer bsub.Close()
bsub = brokerstracing.NewPubSub(serverConfig, tracer, bsub)

mpub, err := mqttpub.NewPublisher(fmt.Sprintf("mqtt://%s:%s", cfg.MQTTTargetHost, cfg.MQTTTargetPort), cfg.MQTTTargetUsername, cfg.MQTTTargetPassword, cfg.MQTTQoS, cfg.MQTTForwarderTimeout)
if err != nil {
logger.Error(fmt.Sprintf("failed to create MQTT publisher: %s", err))
exitCode = 1
return
}
defer mpub.Close()

mpub, err = msgevents.NewPublisherMiddleware(ctx, mpub, cfg.ESURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err))
exitCode = 1
return
}

fwd := mqtt.NewForwarder(brokers.SubjectAllChannels, logger)
fwd = mqtttracing.New(serverConfig, tracer, fwd, brokers.SubjectAllChannels)
if err := fwd.Forward(ctx, svcName, bsub, mpub); err != nil {
logger.Error(fmt.Sprintf("failed to forward message broker messages: %s", err))
exitCode = 1
return
}

np, err := brokers.NewPublisher(ctx, cfg.BrokerURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
Expand Down Expand Up @@ -185,7 +225,10 @@ func main() {
go chc.CallHome(ctx)
}

var interceptor session.Interceptor
interceptor := interceptor{
username: cfg.MQTTTargetUsername,
password: cfg.MQTTTargetPassword,
}
Comment on lines +228 to +231
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this interceptor to set username and password?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RabbitMQ runs in two modes, with and without authentication. For our case devices, i.e, clients always have authentication (thingID and thingKey). So we either add the RabbitMQ username and password when sending the request or remove the username and password when forwarding the request to RabbitMQ

logger.Info(fmt.Sprintf("Starting MQTT proxy on port %s", cfg.MQTTPort))
g.Go(func() error {
return proxyMQTT(ctx, cfg, logger, h, interceptor)
Expand Down Expand Up @@ -281,3 +324,25 @@ func stopSignalHandler(ctx context.Context, cancel context.CancelFunc, logger *s
return nil
}
}

type interceptor struct {
username string
password string
}

func (ic interceptor) Intercept(ctx context.Context, pkt packets.ControlPacket, dir session.Direction) (packets.ControlPacket, error) {
if connectPkt, ok := pkt.(*packets.ConnectPacket); ok {
if ic.username != "" {
connectPkt.Username = ic.username
connectPkt.UsernameFlag = true
}
if ic.password != "" {
connectPkt.Password = []byte(ic.password)
connectPkt.PasswordFlag = true
}

return connectPkt, nil
}

return pkt, nil
}
2 changes: 1 addition & 1 deletion cmd/users/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type config struct {
JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"SMQ_USERS_INSTANCE_ID" envDefault:""`
ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"`
ESURL string `env:"SMQ_ES_URL" envDefault:"amqp://guest:guest@rabbitmq:5672/"`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
SelfRegister bool `env:"SMQ_USERS_ALLOW_SELF_REGISTER" envDefault:"false"`
OAuthUIRedirectURL string `env:"SMQ_OAUTH_UI_REDIRECT_URL" envDefault:"http://localhost:9095/domains"`
Expand Down
4 changes: 2 additions & 2 deletions cmd/ws/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ const (

type config struct {
LogLevel string `env:"SMQ_WS_ADAPTER_LOG_LEVEL" envDefault:"info"`
BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"amqp://guest:guest@rabbitmq:5672/"`
JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"SMQ_WS_ADAPTER_INSTANCE_ID" envDefault:""`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"`
ESURL string `env:"SMQ_ES_URL" envDefault:"amqp://guest:guest@rabbitmq:5672/"`
}

func main() {
Expand Down
Loading
Loading