diff --git a/.github/workflows/check-license.yaml b/.github/workflows/check-license.yaml index 2977dd9b1e..3d2eb3380d 100644 --- a/.github/workflows/check-license.yaml +++ b/.github/workflows/check-license.yaml @@ -23,7 +23,7 @@ jobs: CHECK="" for file in $(grep -rl --exclude-dir={.git,build} \ --exclude=\*.{crt,key,pem,zed,hcl,md,json,csv,mod,sum,tmpl,args} \ - --exclude={CODEOWNERS,LICENSE,MAINTAINERS} \ + --exclude={CODEOWNERS,LICENSE,MAINTAINERS,enabled_plugins,rabbitmq.conf} \ .); do if ! head -n 5 "$file" | grep -q "Copyright (c) Abstract Machines"; then diff --git a/Makefile b/Makefile index 5318a3c3ba..31fe525fad 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ SMQ_DOCKER_IMAGE_NAME_PREFIX ?= supermq BUILD_DIR ?= build -SERVICES = auth users clients groups channels domains http coap ws cli mqtt certs journal +SERVICES = auth users clients groups channels domains http coap ws cli mqtt certs journal rabbitmq-auth TEST_API_SERVICES = journal auth certs http clients users channels groups domains TEST_API = $(addprefix test_api_,$(TEST_API_SERVICES)) DOCKERS = $(addprefix docker_,$(SERVICES)) @@ -29,13 +29,13 @@ INTERNAL_PROTO_FILES := $(shell find $(INTERNAL_PROTO_DIR) -name "*.proto" | sed ifneq ($(SMQ_MESSAGE_BROKER_TYPE),) SMQ_MESSAGE_BROKER_TYPE := $(SMQ_MESSAGE_BROKER_TYPE) else - SMQ_MESSAGE_BROKER_TYPE=nats + SMQ_MESSAGE_BROKER_TYPE=rabbitmq endif ifneq ($(SMQ_ES_TYPE),) SMQ_ES_TYPE := $(SMQ_ES_TYPE) else - SMQ_ES_TYPE=nats + SMQ_ES_TYPE=rabbitmq endif define compile_service diff --git a/cmd/auth/main.go b/cmd/auth/main.go index c5ef6d2b5e..1e12744e2a 100644 --- a/cmd/auth/main.go +++ b/cmd/auth/main.go @@ -74,7 +74,7 @@ type config struct { SpicedbSchemaFile string `env:"SMQ_SPICEDB_SCHEMA_FILE" envDefault:"./docker/spicedb/schema.zed"` SpicedbPreSharedKey string `env:"SMQ_SPICEDB_PRE_SHARED_KEY" envDefault:"12345678"` TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"` - ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"` + ESURL string `env:"SMQ_ES_URL" envDefault:"amqp://guest:guest@rabbitmq:5672/"` } func main() { diff --git a/cmd/channels/main.go b/cmd/channels/main.go index 8b9c56c08d..d41ca17c72 100644 --- a/cmd/channels/main.go +++ b/cmd/channels/main.go @@ -79,7 +79,7 @@ type config struct { InstanceID string `env:"SMQ_CHANNELS_INSTANCE_ID" envDefault:""` JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"` SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"` - ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"` + ESURL string `env:"SMQ_ES_URL" envDefault:"amqp://guest:guest@rabbitmq:5672/"` ESConsumerName string `env:"SMQ_CHANNELS_EVENT_CONSUMER" envDefault:"channels"` TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"` SpicedbHost string `env:"SMQ_SPICEDB_HOST" envDefault:"localhost"` diff --git a/cmd/clients/main.go b/cmd/clients/main.go index b3030976cf..02453d37d0 100644 --- a/cmd/clients/main.go +++ b/cmd/clients/main.go @@ -87,7 +87,7 @@ type config struct { CacheKeyDuration time.Duration `env:"SMQ_CLIENTS_CACHE_KEY_DURATION" envDefault:"10m"` JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"` SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"` - ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"` + ESURL string `env:"SMQ_ES_URL" envDefault:"amqp://guest:guest@rabbitmq:5672/"` ESConsumerName string `env:"SMQ_CLIENTS_EVENT_CONSUMER" envDefault:"clients"` TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"` SpicedbHost string `env:"SMQ_SPICEDB_HOST" envDefault:"localhost"` diff --git a/cmd/coap/main.go b/cmd/coap/main.go index 8dae202479..b3837b27d2 100644 --- a/cmd/coap/main.go +++ b/cmd/coap/main.go @@ -43,12 +43,12 @@ const ( type config struct { LogLevel string `env:"SMQ_COAP_ADAPTER_LOG_LEVEL" envDefault:"info"` - BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"` + BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"amqp://guest:guest@rabbitmq:5672/"` JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"` SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"` InstanceID string `env:"SMQ_COAP_ADAPTER_INSTANCE_ID" envDefault:""` TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"` - ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"` + ESURL string `env:"SMQ_ES_URL" envDefault:"amqp://guest:guest@rabbitmq:5672/"` } func main() { diff --git a/cmd/domains/main.go b/cmd/domains/main.go index 0a4195380e..36014fc3aa 100644 --- a/cmd/domains/main.go +++ b/cmd/domains/main.go @@ -79,7 +79,7 @@ type config struct { SpicedbSchemaFile string `env:"SMQ_SPICEDB_SCHEMA_FILE" envDefault:"schema.zed"` SpicedbPreSharedKey string `env:"SMQ_SPICEDB_PRE_SHARED_KEY" envDefault:"12345678"` TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"` - ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"` + ESURL string `env:"SMQ_ES_URL" envDefault:"amqp://guest:guest@rabbitmq:5672/"` } func main() { diff --git a/cmd/groups/main.go b/cmd/groups/main.go index a4fa60d32c..85ab9975da 100644 --- a/cmd/groups/main.go +++ b/cmd/groups/main.go @@ -78,7 +78,7 @@ type config struct { InstanceID string `env:"SMQ_GROUPS_INSTANCE_ID" envDefault:""` JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"` SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"` - ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"` + ESURL string `env:"SMQ_ES_URL" envDefault:"amqp://guest:guest@rabbitmq:5672/"` ESConsumerName string `env:"SMQ_GROUPS_EVENT_CONSUMER" envDefault:"groups"` TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"` SpicedbHost string `env:"SMQ_SPICEDB_HOST" envDefault:"localhost"` diff --git a/cmd/http/main.go b/cmd/http/main.go index 4b03eb2426..4ca040c685 100644 --- a/cmd/http/main.go +++ b/cmd/http/main.go @@ -55,12 +55,12 @@ const ( type config struct { LogLevel string `env:"SMQ_HTTP_ADAPTER_LOG_LEVEL" envDefault:"info"` - BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"` + BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"amqp://guest:guest@rabbitmq:5672/"` JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"` SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"` InstanceID string `env:"SMQ_HTTP_ADAPTER_INSTANCE_ID" envDefault:""` TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"` - ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"` + ESURL string `env:"SMQ_ES_URL" envDefault:"amqp://guest:guest@rabbitmq:5672/"` } func main() { diff --git a/cmd/journal/main.go b/cmd/journal/main.go index 4401644c1c..65fd1dfea4 100644 --- a/cmd/journal/main.go +++ b/cmd/journal/main.go @@ -51,7 +51,7 @@ const ( type config struct { LogLevel string `env:"SMQ_JOURNAL_LOG_LEVEL" envDefault:"info"` - ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"` + ESURL string `env:"SMQ_ES_URL" envDefault:"amqp://guest:guest@rabbitmq:5672/"` JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"` SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"` InstanceID string `env:"SMQ_JOURNAL_INSTANCE_ID" envDefault:""` diff --git a/cmd/mqtt/main.go b/cmd/mqtt/main.go index 0e713513b4..e2829dc786 100644 --- a/cmd/mqtt/main.go +++ b/cmd/mqtt/main.go @@ -26,6 +26,7 @@ import ( smqlog "github.com/absmach/supermq/logger" "github.com/absmach/supermq/mqtt" "github.com/absmach/supermq/mqtt/events" + mqtttracing "github.com/absmach/supermq/mqtt/tracing" "github.com/absmach/supermq/pkg/errors" "github.com/absmach/supermq/pkg/grpcclient" jaegerclient "github.com/absmach/supermq/pkg/jaeger" @@ -33,10 +34,12 @@ import ( brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing" msgevents "github.com/absmach/supermq/pkg/messaging/events" "github.com/absmach/supermq/pkg/messaging/handler" + mqttpub "github.com/absmach/supermq/pkg/messaging/mqtt" "github.com/absmach/supermq/pkg/server" "github.com/absmach/supermq/pkg/uuid" "github.com/caarlos0/env/v11" "github.com/cenkalti/backoff/v4" + "github.com/eclipse/paho.mqtt.golang/packets" "golang.org/x/sync/errgroup" ) @@ -48,21 +51,26 @@ const ( ) type config struct { - LogLevel string `env:"SMQ_MQTT_ADAPTER_LOG_LEVEL" envDefault:"info"` - MQTTPort string `env:"SMQ_MQTT_ADAPTER_MQTT_PORT" envDefault:"1883"` - MQTTTargetHost string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST" envDefault:"localhost"` - MQTTTargetPort string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT" envDefault:"1883"` - MQTTTargetHealthCheck string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK" envDefault:""` - HTTPPort string `env:"SMQ_MQTT_ADAPTER_WS_PORT" envDefault:"8080"` - HTTPTargetHost string `env:"SMQ_MQTT_ADAPTER_WS_TARGET_HOST" envDefault:"localhost"` - HTTPTargetPort string `env:"SMQ_MQTT_ADAPTER_WS_TARGET_PORT" envDefault:"8080"` - Instance string `env:"SMQ_MQTT_ADAPTER_INSTANCE" envDefault:""` - JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"` - BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"` - SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"` - InstanceID string `env:"SMQ_MQTT_ADAPTER_INSTANCE_ID" envDefault:""` - ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"` - TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"` + LogLevel string `env:"SMQ_MQTT_ADAPTER_LOG_LEVEL" envDefault:"info"` + MQTTPort string `env:"SMQ_MQTT_ADAPTER_MQTT_PORT" envDefault:"1883"` + MQTTTargetHost string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST" envDefault:"localhost"` + MQTTTargetPort string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT" envDefault:"1883"` + MQTTTargetUsername string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_USERNAME" envDefault:""` + MQTTTargetPassword string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_PASSWORD" envDefault:""` + MQTTForwarderTimeout time.Duration `env:"SMQ_MQTT_ADAPTER_FORWARDER_TIMEOUT" envDefault:"30s"` + MQTTTargetHealthCheck string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK" envDefault:""` + MQTTQoS uint8 `env:"SMQ_MQTT_ADAPTER_MQTT_QOS" envDefault:"1"` + HTTPPort string `env:"SMQ_MQTT_ADAPTER_WS_PORT" envDefault:"8080"` + HTTPTargetHost string `env:"SMQ_MQTT_ADAPTER_WS_TARGET_HOST" envDefault:"localhost"` + HTTPTargetPort string `env:"SMQ_MQTT_ADAPTER_WS_TARGET_PORT" envDefault:"8080"` + HTTPTargetPath string `env:"SMQ_MQTT_ADAPTER_WS_TARGET_PATH" envDefault:"/mqtt"` + Instance string `env:"SMQ_MQTT_ADAPTER_INSTANCE" envDefault:""` + JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"` + BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"amqp://guest:guest@rabbitmq:5672/"` + SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"` + InstanceID string `env:"SMQ_MQTT_ADAPTER_INSTANCE_ID" envDefault:""` + ESURL string `env:"SMQ_ES_URL" envDefault:"amqp://guest:guest@rabbitmq:5672/"` + TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"` } func main() { @@ -121,6 +129,38 @@ func main() { }() tracer := tp.Tracer(svcName) + bsub, err := brokers.NewPubSub(ctx, cfg.BrokerURL, logger) + if err != nil { + logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err)) + exitCode = 1 + return + } + defer bsub.Close() + bsub = brokerstracing.NewPubSub(serverConfig, tracer, bsub) + + mpub, err := mqttpub.NewPublisher(fmt.Sprintf("mqtt://%s:%s", cfg.MQTTTargetHost, cfg.MQTTTargetPort), cfg.MQTTTargetUsername, cfg.MQTTTargetPassword, cfg.MQTTQoS, cfg.MQTTForwarderTimeout) + if err != nil { + logger.Error(fmt.Sprintf("failed to create MQTT publisher: %s", err)) + exitCode = 1 + return + } + defer mpub.Close() + + mpub, err = msgevents.NewPublisherMiddleware(ctx, mpub, cfg.ESURL) + if err != nil { + logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err)) + exitCode = 1 + return + } + + fwd := mqtt.NewForwarder(brokers.SubjectAllChannels, logger) + fwd = mqtttracing.New(serverConfig, tracer, fwd, brokers.SubjectAllChannels) + if err := fwd.Forward(ctx, svcName, bsub, mpub); err != nil { + logger.Error(fmt.Sprintf("failed to forward message broker messages: %s", err)) + exitCode = 1 + return + } + np, err := brokers.NewPublisher(ctx, cfg.BrokerURL) if err != nil { logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err)) @@ -185,7 +225,10 @@ func main() { go chc.CallHome(ctx) } - var interceptor session.Interceptor + interceptor := interceptor{ + username: cfg.MQTTTargetUsername, + password: cfg.MQTTTargetPassword, + } logger.Info(fmt.Sprintf("Starting MQTT proxy on port %s", cfg.MQTTPort)) g.Go(func() error { return proxyMQTT(ctx, cfg, logger, h, interceptor) @@ -281,3 +324,25 @@ func stopSignalHandler(ctx context.Context, cancel context.CancelFunc, logger *s return nil } } + +type interceptor struct { + username string + password string +} + +func (ic interceptor) Intercept(ctx context.Context, pkt packets.ControlPacket, dir session.Direction) (packets.ControlPacket, error) { + if connectPkt, ok := pkt.(*packets.ConnectPacket); ok { + if ic.username != "" { + connectPkt.Username = ic.username + connectPkt.UsernameFlag = true + } + if ic.password != "" { + connectPkt.Password = []byte(ic.password) + connectPkt.PasswordFlag = true + } + + return connectPkt, nil + } + + return pkt, nil +} diff --git a/cmd/users/main.go b/cmd/users/main.go index 275a477358..2514529a29 100644 --- a/cmd/users/main.go +++ b/cmd/users/main.go @@ -78,7 +78,7 @@ type config struct { JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"` SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"` InstanceID string `env:"SMQ_USERS_INSTANCE_ID" envDefault:""` - ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"` + ESURL string `env:"SMQ_ES_URL" envDefault:"amqp://guest:guest@rabbitmq:5672/"` TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"` SelfRegister bool `env:"SMQ_USERS_ALLOW_SELF_REGISTER" envDefault:"false"` OAuthUIRedirectURL string `env:"SMQ_OAUTH_UI_REDIRECT_URL" envDefault:"http://localhost:9095/domains"` diff --git a/cmd/ws/main.go b/cmd/ws/main.go index 8dc5223f92..f83a5ff2b8 100644 --- a/cmd/ws/main.go +++ b/cmd/ws/main.go @@ -51,12 +51,12 @@ const ( type config struct { LogLevel string `env:"SMQ_WS_ADAPTER_LOG_LEVEL" envDefault:"info"` - BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"` + BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"amqp://guest:guest@rabbitmq:5672/"` JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"` SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"` InstanceID string `env:"SMQ_WS_ADAPTER_INSTANCE_ID" envDefault:""` TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"` - ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"` + ESURL string `env:"SMQ_ES_URL" envDefault:"amqp://guest:guest@rabbitmq:5672/"` } func main() { diff --git a/coap/README.md b/coap/README.md index 24810dbf5b..eaa3631bf7 100644 --- a/coap/README.md +++ b/coap/README.md @@ -6,27 +6,27 @@ SuperMQ CoAP adapter provides an [CoAP](http://coap.technology/) API for sending The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values. -| Variable | Description | Default | -| ---------------------------------- | ----------------------------------------------------------------------------------- | --------------------------------- | -| SMQ_COAP_ADAPTER_LOG_LEVEL | Log level for the CoAP Adapter (debug, info, warn, error) | info | -| SMQ_COAP_ADAPTER_HOST | CoAP service listening host | "" | -| SMQ_COAP_ADAPTER_PORT | CoAP service listening port | 5683 | -| SMQ_COAP_ADAPTER_SERVER_CERT | CoAP service server certificate | "" | -| SMQ_COAP_ADAPTER_SERVER_KEY | CoAP service server key | "" | -| SMQ_COAP_ADAPTER_HTTP_HOST | Service HTTP listening host | "" | -| SMQ_COAP_ADAPTER_HTTP_PORT | Service listening port | 5683 | -| SMQ_COAP_ADAPTER_HTTP_SERVER_CERT | Service server certificate | "" | -| SMQ_COAP_ADAPTER_HTTP_SERVER_KEY | Service server key | "" | -| SMQ_CLIENTS_AUTH_GRPC_URL | Clients service Auth gRPC URL | | -| SMQ_CLIENTS_AUTH_GRPC_TIMEOUT | Clients service Auth gRPC request timeout in seconds | 1s | -| SMQ_CLIENTS_AUTH_GRPC_CLIENT_CERT | Path to the PEM encoded clients service Auth gRPC client certificate file | "" | -| SMQ_CLIENTS_AUTH_GRPC_CLIENT_KEY | Path to the PEM encoded clients service Auth gRPC client key file | "" | -| SMQ_CLIENTS_AUTH_GRPC_SERVER_CERTS | Path to the PEM encoded clients server Auth gRPC server trusted CA certificate file | "" | -| SMQ_MESSAGE_BROKER_URL | Message broker instance URL | | -| SMQ_JAEGER_URL | Jaeger server URL | | -| SMQ_JAEGER_TRACE_RATIO | Jaeger sampling ratio | 1.0 | -| SMQ_SEND_TELEMETRY | Send telemetry to magistrala call home server | true | -| SMQ_COAP_ADAPTER_INSTANCE_ID | CoAP adapter instance ID | "" | +| Variable | Description | Default | +| ---------------------------------- | ----------------------------------------------------------------------------------- | ----------------------------------- | +| SMQ_COAP_ADAPTER_LOG_LEVEL | Log level for the CoAP Adapter (debug, info, warn, error) | info | +| SMQ_COAP_ADAPTER_HOST | CoAP service listening host | "" | +| SMQ_COAP_ADAPTER_PORT | CoAP service listening port | 5683 | +| SMQ_COAP_ADAPTER_SERVER_CERT | CoAP service server certificate | "" | +| SMQ_COAP_ADAPTER_SERVER_KEY | CoAP service server key | "" | +| SMQ_COAP_ADAPTER_HTTP_HOST | Service HTTP listening host | "" | +| SMQ_COAP_ADAPTER_HTTP_PORT | Service listening port | 5683 | +| SMQ_COAP_ADAPTER_HTTP_SERVER_CERT | Service server certificate | "" | +| SMQ_COAP_ADAPTER_HTTP_SERVER_KEY | Service server key | "" | +| SMQ_CLIENTS_AUTH_GRPC_URL | Clients service Auth gRPC URL | | +| SMQ_CLIENTS_AUTH_GRPC_TIMEOUT | Clients service Auth gRPC request timeout in seconds | 1s | +| SMQ_CLIENTS_AUTH_GRPC_CLIENT_CERT | Path to the PEM encoded clients service Auth gRPC client certificate file | "" | +| SMQ_CLIENTS_AUTH_GRPC_CLIENT_KEY | Path to the PEM encoded clients service Auth gRPC client key file | "" | +| SMQ_CLIENTS_AUTH_GRPC_SERVER_CERTS | Path to the PEM encoded clients server Auth gRPC server trusted CA certificate file | "" | +| SMQ_MESSAGE_BROKER_URL | Message broker instance URL | | +| SMQ_JAEGER_URL | Jaeger server URL | | +| SMQ_JAEGER_TRACE_RATIO | Jaeger sampling ratio | 1.0 | +| SMQ_SEND_TELEMETRY | Send telemetry to magistrala call home server | true | +| SMQ_COAP_ADAPTER_INSTANCE_ID | CoAP adapter instance ID | "" | ## Deployment @@ -62,7 +62,7 @@ SMQ_CLIENTS_AUTH_GRPC_TIMEOUT=1s \ SMQ_CLIENTS_AUTH_GRPC_CLIENT_CERT="" \ SMQ_CLIENTS_AUTH_GRPC_CLIENT_KEY="" \ SMQ_CLIENTS_AUTH_GRPC_SERVER_CERTS="" \ -SMQ_MESSAGE_BROKER_URL=nats://localhost:4222 \ +SMQ_MESSAGE_BROKER_URL=amqp://guest:guest@rabbitmq:5672/ \ SMQ_JAEGER_URL=http://localhost:14268/api/traces \ SMQ_JAEGER_TRACE_RATIO=1.0 \ SMQ_SEND_TELEMETRY=true \ diff --git a/docker/.env b/docker/.env index 7e006a36e3..58ba6a3e0a 100644 --- a/docker/.env +++ b/docker/.env @@ -20,6 +20,7 @@ SMQ_NATS_URL=nats://nats:${SMQ_NATS_PORT} # Configs for nats as MQTT broker SMQ_NATS_HEALTH_CHECK=http://nats:${SMQ_NATS_HTTP_PORT}/healthz SMQ_NATS_WS_TARGET_PATH= +SMQ_NATS_MQTT_QOS=1 ## RabbitMQ SMQ_RABBITMQ_PORT=5672 @@ -29,19 +30,25 @@ SMQ_RABBITMQ_PASS=supermq SMQ_RABBITMQ_COOKIE=supermq SMQ_RABBITMQ_VHOST=/ SMQ_RABBITMQ_URL=amqp://${SMQ_RABBITMQ_USER}:${SMQ_RABBITMQ_PASS}@rabbitmq:${SMQ_RABBITMQ_PORT}${SMQ_RABBITMQ_VHOST} +SMQ_RABBITMQ_MQTT_QOS=2 +SMQ_RABBITMQ_WS_TARGET_PATH=/ ## Message Broker -SMQ_MESSAGE_BROKER_TYPE=nats -SMQ_MESSAGE_BROKER_URL=${SMQ_NATS_URL} +SMQ_MESSAGE_BROKER_TYPE=rabbitmq +SMQ_MESSAGE_BROKER_URL=${SMQ_RABBITMQ_URL} ## MQTT Broker -SMQ_MQTT_BROKER_TYPE=nats -SMQ_MQTT_BROKER_HEALTH_CHECK=${SMQ_NATS_HEALTH_CHECK} +SMQ_MQTT_BROKER_TYPE=rabbitmq +SMQ_MQTT_BROKER_HEALTH_CHECK= +SMQ_MQTT_ADAPTER_MQTT_QOS=${SMQ_RABBITMQ_MQTT_QOS} SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST=${SMQ_MQTT_BROKER_TYPE} SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT=1883 +SMQ_MQTT_ADAPTER_MQTT_TARGET_USERNAME=${SMQ_RABBITMQ_USER} +SMQ_MQTT_ADAPTER_MQTT_TARGET_PASSWORD=${SMQ_RABBITMQ_PASS} SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK=${SMQ_MQTT_BROKER_HEALTH_CHECK} SMQ_MQTT_ADAPTER_WS_TARGET_HOST=${SMQ_MQTT_BROKER_TYPE} SMQ_MQTT_ADAPTER_WS_TARGET_PORT=8080 +SMQ_MQTT_ADAPTER_WS_TARGET_PATH=${SMQ_RABBITMQ_WS_TARGET_PATH} ## Redis SMQ_REDIS_TCP_PORT=6379 @@ -60,7 +67,7 @@ SMQ_JAEGER_TRACE_RATIO=1.0 SMQ_JAEGER_MEMORY_MAX_TRACES=5000 ## Call home -SMQ_SEND_TELEMETRY=false +SMQ_SEND_TELEMETRY=true ## Postgres SMQ_POSTGRES_MAX_CONNECTIONS=100 @@ -327,6 +334,7 @@ SMQ_HTTP_ADAPTER_INSTANCE_ID= ### MQTT SMQ_MQTT_ADAPTER_LOG_LEVEL=debug SMQ_MQTT_ADAPTER_MQTT_PORT=1883 +SMQ_MQTT_ADAPTER_FORWARDER_TIMEOUT=30s SMQ_MQTT_ADAPTER_WS_PORT=8080 SMQ_MQTT_ADAPTER_INSTANCE= SMQ_MQTT_ADAPTER_INSTANCE_ID= diff --git a/docker/README.md b/docker/README.md index 34d8148c5e..3f9292a6f6 100644 --- a/docker/README.md +++ b/docker/README.md @@ -26,46 +26,60 @@ To pull docker images from a specific release you need to change the value of `S SuperMQ supports configurable MQTT broker and Message broker, which also acts as an events store. SuperMQ uses two types of brokers: -1. MQTT_BROKER: Handles MQTT communication between MQTT adapters and message broker. This is NATS. +1. MQTT_BROKER: Handles MQTT communication between MQTT adapters and message broker. This is RabbitMQ 2. MESSAGE_BROKER: Manages message exchange between SuperMQ core, optional, and external services. This can either be 'NATS' or 'RabbitMQ'. This is used to store messages for distributed processing. Events store: This is used by SuperMQ services to store events for distributed processing. SuperMQ uses a single service to be the message broker and events store. This can either be 'NATS' or 'RabbitMQ'. Redis can also be used as an events store, but it requires a message broker to be deployed along with it for message exchange. -This is the same as MESSAGE_BROKER. This can either be 'NATS' or 'RabbitMQ' or 'Redis'. If Redis is used as an events store, then RabbitMQ or NATS is used as a message broker. +This is the same as MESSAGE_BROKER. This can either be 'NATS' or 'RabbitMQ' or 'Redis'. If Redis is used as an events store, then RabbitMQ or NATS is used as a message broker. -The current deployment strategy for SuperMQ in `docker/docker-compose.yml` is to use NATS as a MQTT_BROKER, MESSAGE_BROKER and EVENTS_STORE. +The current deployment strategy for SuperMQ in `docker/docker-compose.yml` is to use RabbitMQ as a MQTT_BROKER and RabbitMQ as a MESSAGE_BROKER and EVENTS_STORE. Therefore, the following combinations are possible: -- MESSAGE_BROKER: RabbitMQ, EVENTS_STORE: RabbitMQ -- MESSAGE_BROKER: RabbitMQ, EVENTS_STORE: Redis -- MESSAGE_BROKER: NATS, EVENTS_STORE: NATS -- MESSAGE_BROKER: NATS, EVENTS_STORE: Redis +- MQTT_BROKER: RabbitMQ, MESSAGE_BROKER: NATS, EVENTS_STORE: NATS +- MQTT_BROKER: RabbitMQ, MESSAGE_BROKER: NATS, EVENTS_STORE: Redis +- MQTT_BROKER: RabbitMQ, MESSAGE_BROKER: RabbitMQ, EVENTS_STORE: RabbitMQ +- MQTT_BROKER: RabbitMQ, MESSAGE_BROKER: RabbitMQ, EVENTS_STORE: Redis -For Message brokers other than NATS, you would need to build the docker images with RabbitMQ as the build tag and change the `docker/.env`. For example, to use RabbitMQ as a message broker: +For Message brokers other than RabbitMQ, you would need to build the docker images with RabbitMQ as the build tag and change the `docker/.env`. For example, to use RabbitMQ as a message broker: ```bash -SMQ_MESSAGE_BROKER_TYPE=rabbitmq make dockers +SMQ_MESSAGE_BROKER_TYPE=nats make dockers ``` ```env -SMQ_MESSAGE_BROKER_TYPE=rabbitmq -SMQ_MESSAGE_BROKER_URL=${SMQ_RABBITMQ_URL} +SMQ_MESSAGE_BROKER_TYPE=nats +SMQ_MESSAGE_BROKER_URL=${SMQ_NATS_URL} ``` For Redis as an events store, you would need to run RabbitMQ or NATS as a message broker. For example, to use Redis as an events store with rabbitmq as a message broker: ```bash -SMQ_ES_TYPE=redis SMQ_MESSAGE_BROKER_TYPE=rabbitmq make dockers +SMQ_ES_TYPE=redis SMQ_MESSAGE_BROKER_TYPE=nats make dockers ``` ```env -SMQ_MESSAGE_BROKER_TYPE=rabbitmq -SMQ_MESSAGE_BROKER_URL=${SMQ_RABBITMQ_URL} +SMQ_MESSAGE_BROKER_TYPE=nats +SMQ_MESSAGE_BROKER_URL=${SMQ_NATS_URL} SMQ_ES_TYPE=redis SMQ_ES_URL=${SMQ_REDIS_URL} ``` +For MQTT broker other than RabbitMQ, you would need to change the `docker/.env`. For example, to use NATS as a MQTT broker: + +```env +SMQ_MQTT_BROKER_TYPE=nats +SMQ_MQTT_BROKER_HEALTH_CHECK=${SMQ_NATS_HEALTH_CHECK} +SMQ_MQTT_ADAPTER_MQTT_QOS=${SMQ_NATS_MQTT_QOS} +SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST=${SMQ_MQTT_BROKER_TYPE} +SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT=1883 +SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK=${SMQ_MQTT_BROKER_HEALTH_CHECK} +SMQ_MQTT_ADAPTER_WS_TARGET_HOST=${SMQ_MQTT_BROKER_TYPE} +SMQ_MQTT_ADAPTER_WS_TARGET_PORT=8080 +SMQ_MQTT_ADAPTER_WS_TARGET_PATH=${SMQ_NATS_WS_TARGET_PATH} +``` + ### RabbitMQ configuration ```yaml @@ -107,9 +121,9 @@ By using environment variables file at `docker/.env` you can modify the below gi `SMQ_NGINX_SERVER_NAME` environmental variable is used to configure nginx directive `server_name`. If environmental variable `SMQ_NGINX_SERVER_NAME` is empty then default value `localhost` will set to `server_name`. -`SMQ_NGINX_SERVER_CERT` environmental variable is used to configure nginx directive `ssl_certificate`. If environmental variable `SMQ_NGINX_SERVER_CERT` is empty then by default server certificate in the path `docker/ssl/certs/supermq-server.crt` will be assigned. +`SMQ_NGINX_SERVER_CERT` environmental variable is used to configure nginx directive `ssl_certificate`. If environmental variable `SMQ_NGINX_SERVER_CERT` is empty then by default server certificate in the path `docker/ssl/certs/supermq-server.crt` will be assigned. -`SMQ_NGINX_SERVER_KEY` environmental variable is used to configure nginx directive `ssl_certificate_key`. If environmental variable `SMQ_NGINX_SERVER_KEY` is empty then by default server certificate key in the path `docker/ssl/certs/supermq-server.key` will be assigned. +`SMQ_NGINX_SERVER_KEY` environmental variable is used to configure nginx directive `ssl_certificate_key`. If environmental variable `SMQ_NGINX_SERVER_KEY` is empty then by default server certificate key in the path `docker/ssl/certs/supermq-server.key` will be assigned. `SMQ_NGINX_SERVER_CLIENT_CA` environmental variable is used to configure nginx directive `ssl_client_certificate`. If environmental variable `SMQ_NGINX_SERVER_CLIENT_CA` is empty then by default certificate in the path `docker/ssl/certs/ca.crt` will be assigned. diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 476cef3bc4..b510acdf9a 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -14,6 +14,7 @@ volumes: supermq-channels-db-volume: supermq-clients-redis-volume: supermq-broker-volume: + supermq-mqtt-broker-volume: supermq-spicedb-db-volume: supermq-auth-db-volume: supermq-pat-db-volume: @@ -364,7 +365,7 @@ services: - clients-db - users - auth - - nats + - rabbitmq restart: on-failure environment: SMQ_CLIENTS_LOG_LEVEL: ${SMQ_CLIENTS_LOG_LEVEL} @@ -523,7 +524,7 @@ services: - channels-db - users - auth - - nats + - rabbitmq restart: on-failure environment: SMQ_CHANNELS_LOG_LEVEL: ${SMQ_CHANNELS_LOG_LEVEL} @@ -683,7 +684,7 @@ services: depends_on: - users-db - auth - - nats + - rabbitmq restart: on-failure environment: SMQ_USERS_LOG_LEVEL: ${SMQ_USERS_LOG_LEVEL} @@ -789,7 +790,7 @@ services: depends_on: - groups-db - auth - - nats + - rabbitmq restart: on-failure environment: SMQ_GROUPS_LOG_LEVEL: ${SMQ_GROUPS_LOG_LEVEL} @@ -869,7 +870,7 @@ services: create_host_path: true jaeger: - image: jaegertracing/all-in-one:1.60 + image: jaegertracing/all-in-one:1.66.0 container_name: supermq-jaeger environment: COLLECTOR_OTLP_ENABLED: ${SMQ_JAEGER_COLLECTOR_OTLP_ENABLED} @@ -885,18 +886,23 @@ services: container_name: supermq-mqtt depends_on: - clients - - nats + - rabbitmq restart: on-failure environment: SMQ_MQTT_ADAPTER_LOG_LEVEL: ${SMQ_MQTT_ADAPTER_LOG_LEVEL} SMQ_MQTT_ADAPTER_MQTT_PORT: ${SMQ_MQTT_ADAPTER_MQTT_PORT} SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST: ${SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST} SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT: ${SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT} + SMQ_MQTT_ADAPTER_MQTT_TARGET_USERNAME: ${SMQ_MQTT_ADAPTER_MQTT_TARGET_USERNAME} + SMQ_MQTT_ADAPTER_MQTT_TARGET_PASSWORD: ${SMQ_MQTT_ADAPTER_MQTT_TARGET_PASSWORD} + SMQ_MQTT_ADAPTER_FORWARDER_TIMEOUT: ${SMQ_MQTT_ADAPTER_FORWARDER_TIMEOUT} SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK: ${SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK} + SMQ_MQTT_ADAPTER_MQTT_QOS: ${SMQ_MQTT_ADAPTER_MQTT_QOS} SMQ_MQTT_ADAPTER_WS_PORT: ${SMQ_MQTT_ADAPTER_WS_PORT} SMQ_MQTT_ADAPTER_INSTANCE_ID: ${SMQ_MQTT_ADAPTER_INSTANCE_ID} SMQ_MQTT_ADAPTER_WS_TARGET_HOST: ${SMQ_MQTT_ADAPTER_WS_TARGET_HOST} SMQ_MQTT_ADAPTER_WS_TARGET_PORT: ${SMQ_MQTT_ADAPTER_WS_TARGET_PORT} + SMQ_MQTT_ADAPTER_WS_TARGET_PATH: ${SMQ_MQTT_ADAPTER_WS_TARGET_PATH} SMQ_MQTT_ADAPTER_INSTANCE: ${SMQ_MQTT_ADAPTER_INSTANCE} SMQ_ES_URL: ${SMQ_ES_URL} SMQ_CLIENTS_AUTH_GRPC_URL: ${SMQ_CLIENTS_AUTH_GRPC_URL} @@ -954,7 +960,7 @@ services: container_name: supermq-http depends_on: - clients - - nats + - rabbitmq restart: on-failure environment: SMQ_HTTP_ADAPTER_LOG_LEVEL: ${SMQ_HTTP_ADAPTER_LOG_LEVEL} @@ -1042,7 +1048,7 @@ services: container_name: supermq-coap depends_on: - clients - - nats + - rabbitmq restart: on-failure environment: SMQ_COAP_ADAPTER_LOG_LEVEL: ${SMQ_COAP_ADAPTER_LOG_LEVEL} @@ -1119,7 +1125,7 @@ services: container_name: supermq-ws depends_on: - clients - - nats + - rabbitmq restart: on-failure environment: SMQ_WS_ADAPTER_LOG_LEVEL: ${SMQ_WS_ADAPTER_LOG_LEVEL} @@ -1202,20 +1208,22 @@ services: bind: create_host_path: true - nats: - image: nats:2.10.9-alpine - container_name: supermq-nats + rabbitmq: + image: rabbitmq:4.0.5-alpine + container_name: supermq-rabbitmq restart: on-failure - command: "--config=/etc/nats/nats.conf" environment: - - SMQ_NATS_PORT=${SMQ_NATS_PORT} - - SMQ_NATS_HTTP_PORT=${SMQ_NATS_HTTP_PORT} - - SMQ_NATS_JETSTREAM_KEY=${SMQ_NATS_JETSTREAM_KEY} + RABBITMQ_ERLANG_COOKIE: ${SMQ_RABBITMQ_COOKIE} + RABBITMQ_DEFAULT_USER: ${SMQ_RABBITMQ_USER} + RABBITMQ_DEFAULT_PASS: ${SMQ_RABBITMQ_PASS} + RABBITMQ_DEFAULT_VHOST: ${SMQ_RABBITMQ_VHOST} + RABBITMQ_CONFIG_FILES: /etc/rabbitmq/conf.d/ ports: - - ${SMQ_NATS_PORT}:${SMQ_NATS_PORT} - - ${SMQ_NATS_HTTP_PORT}:${SMQ_NATS_HTTP_PORT} + - ${SMQ_RABBITMQ_PORT}:${SMQ_RABBITMQ_PORT} + - ${SMQ_RABBITMQ_HTTP_PORT}:${SMQ_RABBITMQ_HTTP_PORT} volumes: - - supermq-broker-volume:/data - - ./nats:/etc/nats + - ./rabbitmq/enabled_plugins:/etc/rabbitmq/enabled_plugins + - ./rabbitmq/rabbitmq.conf:/etc/rabbitmq/conf.d/10-defaults.conf + - supermq-mqtt-broker-volume:/var/lib/rabbitmq networks: - supermq-base-net diff --git a/docker/rabbitmq/enabled_plugins b/docker/rabbitmq/enabled_plugins new file mode 100644 index 0000000000..5358cb0171 --- /dev/null +++ b/docker/rabbitmq/enabled_plugins @@ -0,0 +1 @@ +[rabbitmq_management,rabbitmq_mqtt]. diff --git a/docker/rabbitmq/rabbitmq.conf b/docker/rabbitmq/rabbitmq.conf new file mode 100644 index 0000000000..31e326c6b1 --- /dev/null +++ b/docker/rabbitmq/rabbitmq.conf @@ -0,0 +1,15 @@ +## DEFAULT SETTINGS ARE NOT MEANT TO BE TAKEN STRAIGHT INTO PRODUCTION +## see https://www.rabbitmq.com/configure.html for further information +## on configuring RabbitMQ + +## allow access to the guest user from anywhere on the network +## https://www.rabbitmq.com/access-control.html#loopback-users +## https://www.rabbitmq.com/production-checklist.html#users +loopback_users.guest = false + +## Send all logs to stdout/TTY. Necessary to see logs when running via +## a container +log.console = true + +## Enable anonymous connection +mqtt.allow_anonymous = true diff --git a/go.mod b/go.mod index c9f688f67c..82487afbc8 100644 --- a/go.mod +++ b/go.mod @@ -13,13 +13,14 @@ require ( github.com/authzed/spicedb v1.40.0 github.com/caarlos0/env/v11 v11.3.1 github.com/cenkalti/backoff/v4 v4.3.0 + github.com/eclipse/paho.mqtt.golang v1.5.0 github.com/fatih/color v1.18.0 github.com/go-chi/chi/v5 v5.2.1 github.com/go-kit/kit v0.13.0 - github.com/gofrs/uuid/v5 v5.3.0 + github.com/gofrs/uuid/v5 v5.3.1 github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.3 - github.com/hashicorp/vault/api v1.15.0 + github.com/hashicorp/vault/api v1.16.0 github.com/hashicorp/vault/api/auth/approle v0.8.0 github.com/hokaccha/go-prettyjson v0.0.0-20211117102719-0474bc63780f github.com/jackc/pgtype v1.14.4 @@ -51,7 +52,7 @@ require ( golang.org/x/crypto v0.33.0 golang.org/x/oauth2 v0.26.0 golang.org/x/sync v0.11.0 - google.golang.org/genproto/googleapis/rpc v0.0.0-20250127172529-29210b9bc287 + google.golang.org/genproto/googleapis/rpc v0.0.0-20250207221924-e9438ea467c6 google.golang.org/grpc v1.70.0 google.golang.org/protobuf v1.36.5 gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df @@ -79,7 +80,6 @@ require ( github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/dsnet/golib/memfile v1.0.0 // indirect - github.com/eclipse/paho.mqtt.golang v1.5.0 // indirect github.com/emirpasic/gods v1.18.1 // indirect github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect @@ -96,7 +96,7 @@ require ( github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect github.com/gopherjs/gopherjs v1.17.2 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect - github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect @@ -110,7 +110,6 @@ require ( github.com/jackc/pgio v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect - github.com/jackc/pgx/v4 v4.18.3 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/jtolds/gls v4.20.0+incompatible // indirect github.com/jzelinskie/stringz v0.0.3 // indirect @@ -126,7 +125,7 @@ require ( github.com/moby/docker-image-spec v1.3.1 // indirect github.com/moby/term v0.5.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/nats-io/nkeys v0.4.9 // indirect + github.com/nats-io/nkeys v0.4.10 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0 // indirect @@ -145,7 +144,7 @@ require ( github.com/samber/lo v1.49.1 // indirect github.com/segmentio/asm v1.2.0 // indirect github.com/sirupsen/logrus v1.9.3 // indirect - github.com/smarty/assertions v1.15.0 // indirect + github.com/smarty/assertions v1.16.0 // indirect github.com/spf13/pflag v1.0.6 // indirect github.com/stoewer/go-strcase v1.3.0 // indirect github.com/stretchr/objx v0.5.2 // indirect @@ -157,12 +156,12 @@ require ( go.opentelemetry.io/otel/metric v1.34.0 // indirect go.opentelemetry.io/proto/otlp v1.5.0 // indirect go.uber.org/atomic v1.11.0 // indirect - golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c // indirect - golang.org/x/net v0.34.0 // indirect + golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac // indirect + golang.org/x/net v0.35.0 // indirect golang.org/x/sys v0.30.0 // indirect golang.org/x/text v0.22.0 // indirect - golang.org/x/time v0.9.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20250127172529-29210b9bc287 // indirect + golang.org/x/time v0.10.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250207221924-e9438ea467c6 // indirect gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index cd8c8ddca6..ff8df1547c 100644 --- a/go.sum +++ b/go.sum @@ -129,8 +129,8 @@ github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= -github.com/gofrs/uuid/v5 v5.3.0 h1:m0mUMr+oVYUdxpMLgSYCZiXe7PuVPnI94+OMeVBNedk= -github.com/gofrs/uuid/v5 v5.3.0/go.mod h1:CDOjlDMVAtN56jqyRUZh58JT31Tiw7/oQyEXZV+9bD8= +github.com/gofrs/uuid/v5 v5.3.1 h1:aPx49MwJbekCzOyhZDjJVb0hx3A0KLjlbLx6p2gY0p0= +github.com/gofrs/uuid/v5 v5.3.1/go.mod h1:CDOjlDMVAtN56jqyRUZh58JT31Tiw7/oQyEXZV+9bD8= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= @@ -156,8 +156,8 @@ github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aN github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI= github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod h1:g5qyo/la0ALbONm6Vbp88Yd8NsDy6rZz+RcrMPxvld8= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.0 h1:VD1gqscl4nYs1YxVuSdemTrSgTKrwOWDK0FVFMqm+Cg= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.0/go.mod h1:4EgsQoS4TOhJizV+JTFg40qx1Ofh3XmXEQNBpgvNT40= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1 h1:e9Rjr40Z98/clHv5Yg79Is0NtosR5LXRvdr7o/6NwbA= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1/go.mod h1:tIxuGz/9mpox++sgp9fJjHO0+q1X9/UOWd798aAm22M= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -179,8 +179,8 @@ github.com/hashicorp/go-sockaddr v1.0.7 h1:G+pTkSO01HpR5qCxg7lxfsFEZaG+C0VssTy/9 github.com/hashicorp/go-sockaddr v1.0.7/go.mod h1:FZQbEYa1pxkQ7WLpyXJ6cbjpT8q0YgQaK/JakXqGyWw= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= -github.com/hashicorp/vault/api v1.15.0 h1:O24FYQCWwhwKnF7CuSqP30S51rTV7vz1iACXE/pj5DA= -github.com/hashicorp/vault/api v1.15.0/go.mod h1:+5YTO09JGn0u+b6ySD/LLVf8WkJCPLAL2Vkmrn2+CM8= +github.com/hashicorp/vault/api v1.16.0 h1:nbEYGJiAPGzT9U4oWgaaB0g+Rj8E59QuHKyA5LhwQN4= +github.com/hashicorp/vault/api v1.16.0/go.mod h1:KhuUhzOD8lDSk29AtzNjgAu2kxRA9jL9NAbkFlqvkBA= github.com/hashicorp/vault/api/auth/approle v0.8.0 h1:FuVtWZ0xD6+wz1x0l5s0b4852RmVXQNEiKhVXt6lfQY= github.com/hashicorp/vault/api/auth/approle v0.8.0/go.mod h1:NV7O9r5JUtNdVnqVZeMHva81AIdpG0WoIQohNt1VCPM= github.com/hokaccha/go-prettyjson v0.0.0-20211117102719-0474bc63780f h1:7LYC+Yfkj3CTRcShK0KOL/w6iTiKyqqBA9a41Wnggw8= @@ -232,9 +232,8 @@ github.com/jackc/pgx/v4 v4.0.0-20190420224344-cc3461e65d96/go.mod h1:mdxmSJJuR08 github.com/jackc/pgx/v4 v4.0.0-20190421002000-1b8f0016e912/go.mod h1:no/Y67Jkk/9WuGR0JG/JseM9irFbnEPbuWV2EELPNuM= github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQnOEnf1dqHGpw7JmHqHc1NxDoalibchSk9/RWuDc= github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgSXP7iUjYm9C1NxKhny7lq6ee99u/z+IHFcgs= +github.com/jackc/pgx/v4 v4.18.2 h1:xVpYkNR5pk5bMCZGfClbO962UIqVABcAGt7ha1s/FeU= github.com/jackc/pgx/v4 v4.18.2/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw= -github.com/jackc/pgx/v4 v4.18.3 h1:dE2/TrEsGX3RBprb3qryqSV9Y60iZN1C6i8IrmW9/BA= -github.com/jackc/pgx/v4 v4.18.3/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw= github.com/jackc/pgx/v5 v5.7.2 h1:mLoDLV6sonKlvjIEsV56SkWNCnuNv531l94GaIzO+XI= github.com/jackc/pgx/v5 v5.7.2/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ= github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= @@ -309,8 +308,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/nats-io/nats.go v1.39.0 h1:2/yg2JQjiYYKLwDuBzV0FbB2sIV+eFNkEevlRi4n9lI= github.com/nats-io/nats.go v1.39.0/go.mod h1:MgRb8oOdigA6cYpEPhXJuRVH6UE/V4jblJ2jQ27IXYM= -github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0= -github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE= +github.com/nats-io/nkeys v0.4.10 h1:glmRrpCmYLHByYcePvnTBEAwawwapjCPMjy2huw20wc= +github.com/nats-io/nkeys v0.4.10/go.mod h1:OjRrnIKnWBFl+s4YK5ChQfvHP2fxqZexrKJoVVyWB3U= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU= @@ -383,8 +382,8 @@ github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMB github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= -github.com/smarty/assertions v1.15.0 h1:cR//PqUBUiQRakZWqBiFFQ9wb8emQGDb0HeGdqGByCY= -github.com/smarty/assertions v1.15.0/go.mod h1:yABtdzeQs6l1brC900WlRNwj6ZR55d7B+E8C6HtKdec= +github.com/smarty/assertions v1.16.0 h1:EvHNkdRA4QHMrn75NZSoUQ/mAUXAYWfatfB01yTCzfY= +github.com/smarty/assertions v1.16.0/go.mod h1:duaaFdCS0K9dnoM50iyek/eYINOZ64gbh1Xlf6LG7AI= github.com/smartystreets/goconvey v1.8.1 h1:qGjIddxOk4grTu9JPOU31tVfq3cNdBlNa5sSznIX1xY= github.com/smartystreets/goconvey v1.8.1/go.mod h1:+/u4qLyY6x1jReYOp7GOM2FSt8aP9CzCZL03bI28W60= github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= @@ -485,8 +484,8 @@ golang.org/x/crypto v0.20.0/go.mod h1:Xwo95rrVNIoSMx9wa1JroENMToLWn3RNVrTBpLHgZP golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus= golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c h1:KL/ZBHXgKGVmuZBZ01Lt57yE5ws8ZPSkkihmEyq7FXc= -golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c/go.mod h1:tujkw807nyEEAamNbDrEGzRav+ilXA7PCRAd6xsmwiU= +golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac h1:l5+whBCLH3iH2ZNHYLbAe58bo7yrN4mVcnkHDYz5vvs= +golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac/go.mod h1:hH+7mtFmImwwcMvScyxUhjuVHR3HGaDPMn9rMSUUbxo= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= @@ -511,8 +510,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= -golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= -golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= +golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8= +golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.26.0 h1:afQXWNNaeC4nvZ0Ed9XvCCzXM6UHJG7iCg0W4fPqSBE= golang.org/x/oauth2 v0.26.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= @@ -569,8 +568,8 @@ golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= -golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY= -golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/time v0.10.0 h1:3usCWA8tQn0L8+hFJQNgzpWbd89begxN66o1Ojdn5L4= +golang.org/x/time v0.10.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -599,10 +598,10 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7 google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= -google.golang.org/genproto/googleapis/api v0.0.0-20250127172529-29210b9bc287 h1:A2ni10G3UlplFrWdCDJTl7D7mJ7GSRm37S+PDimaKRw= -google.golang.org/genproto/googleapis/api v0.0.0-20250127172529-29210b9bc287/go.mod h1:iYONQfRdizDB8JJBybql13nArx91jcUk7zCXEsOofM4= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250127172529-29210b9bc287 h1:J1H9f+LEdWAfHcez/4cvaVBox7cOYT+IU6rgqj5x++8= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250127172529-29210b9bc287/go.mod h1:8BS3B93F/U1juMFq9+EDk+qOT5CO1R9IzXxG3PTqiRk= +google.golang.org/genproto/googleapis/api v0.0.0-20250207221924-e9438ea467c6 h1:L9JNMl/plZH9wmzQUHleO/ZZDSN+9Gh41wPczNy+5Fk= +google.golang.org/genproto/googleapis/api v0.0.0-20250207221924-e9438ea467c6/go.mod h1:iYONQfRdizDB8JJBybql13nArx91jcUk7zCXEsOofM4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250207221924-e9438ea467c6 h1:2duwAxN2+k0xLNpjnHTXoMUgnv6VPSp5fiqTuwSxjmI= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250207221924-e9438ea467c6/go.mod h1:8BS3B93F/U1juMFq9+EDk+qOT5CO1R9IzXxG3PTqiRk= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= diff --git a/http/README.md b/http/README.md index e0be9d0236..8ef497f6b0 100644 --- a/http/README.md +++ b/http/README.md @@ -6,23 +6,23 @@ HTTP adapter provides an HTTP API for sending messages through the platform. The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values. -| Variable | Description | Default | -| ---------------------------------- | ----------------------------------------------------------------------------------- | --------------------------------- | -| SMQ_HTTP_ADAPTER_LOG_LEVEL | Log level for the HTTP Adapter (debug, info, warn, error) | info | -| SMQ_HTTP_ADAPTER_HOST | Service HTTP host | "" | -| SMQ_HTTP_ADAPTER_PORT | Service HTTP port | 80 | -| SMQ_HTTP_ADAPTER_SERVER_CERT | Path to the PEM encoded server certificate file | "" | -| SMQ_HTTP_ADAPTER_SERVER_KEY | Path to the PEM encoded server key file | "" | -| SMQ_CLIENTS_AUTH_GRPC_URL | Clients service Auth gRPC URL | | -| SMQ_CLIENTS_AUTH_GRPC_TIMEOUT | Clients service Auth gRPC request timeout in seconds | 1s | -| SMQ_CLIENTS_AUTH_GRPC_CLIENT_CERT | Path to the PEM encoded clients service Auth gRPC client certificate file | "" | -| SMQ_CLIENTS_AUTH_GRPC_CLIENT_KEY | Path to the PEM encoded clients service Auth gRPC client key file | "" | -| SMQ_CLIENTS_AUTH_GRPC_SERVER_CERTS | Path to the PEM encoded clients server Auth gRPC server trusted CA certificate file | "" | -| SMQ_MESSAGE_BROKER_URL | Message broker instance URL | | -| SMQ_JAEGER_URL | Jaeger server URL | | -| SMQ_JAEGER_TRACE_RATIO | Jaeger sampling ratio | 1.0 | -| SMQ_SEND_TELEMETRY | Send telemetry to supermq call home server | true | -| SMQ_HTTP_ADAPTER_INSTANCE_ID | Service instance ID | "" | +| Variable | Description | Default | +| ---------------------------------- | ----------------------------------------------------------------------------------- | ----------------------------------- | +| SMQ_HTTP_ADAPTER_LOG_LEVEL | Log level for the HTTP Adapter (debug, info, warn, error) | info | +| SMQ_HTTP_ADAPTER_HOST | Service HTTP host | "" | +| SMQ_HTTP_ADAPTER_PORT | Service HTTP port | 80 | +| SMQ_HTTP_ADAPTER_SERVER_CERT | Path to the PEM encoded server certificate file | "" | +| SMQ_HTTP_ADAPTER_SERVER_KEY | Path to the PEM encoded server key file | "" | +| SMQ_CLIENTS_AUTH_GRPC_URL | Clients service Auth gRPC URL | | +| SMQ_CLIENTS_AUTH_GRPC_TIMEOUT | Clients service Auth gRPC request timeout in seconds | 1s | +| SMQ_CLIENTS_AUTH_GRPC_CLIENT_CERT | Path to the PEM encoded clients service Auth gRPC client certificate file | "" | +| SMQ_CLIENTS_AUTH_GRPC_CLIENT_KEY | Path to the PEM encoded clients service Auth gRPC client key file | "" | +| SMQ_CLIENTS_AUTH_GRPC_SERVER_CERTS | Path to the PEM encoded clients server Auth gRPC server trusted CA certificate file | "" | +| SMQ_MESSAGE_BROKER_URL | Message broker instance URL | | +| SMQ_JAEGER_URL | Jaeger server URL | | +| SMQ_JAEGER_TRACE_RATIO | Jaeger sampling ratio | 1.0 | +| SMQ_SEND_TELEMETRY | Send telemetry to supermq call home server | true | +| SMQ_HTTP_ADAPTER_INSTANCE_ID | Service instance ID | "" | ## Deployment @@ -54,7 +54,7 @@ SMQ_CLIENTS_AUTH_GRPC_TIMEOUT=1s \ SMQ_CLIENTS_AUTH_GRPC_CLIENT_CERT="" \ SMQ_CLIENTS_AUTH_GRPC_CLIENT_KEY="" \ SMQ_CLIENTS_AUTH_GRPC_SERVER_CERTS="" \ -SMQ_MESSAGE_BROKER_URL=nats://localhost:4222 \ +SMQ_MESSAGE_BROKER_URL=amqp://guest:guest@rabbitmq:5672/ \ SMQ_JAEGER_URL=http://localhost:14268/api/traces \ SMQ_JAEGER_TRACE_RATIO=1.0 \ SMQ_SEND_TELEMETRY=true \ diff --git a/mqtt/README.md b/mqtt/README.md index 467cf090b5..fa34c7d5b5 100644 --- a/mqtt/README.md +++ b/mqtt/README.md @@ -6,28 +6,31 @@ MQTT adapter provides an MQTT API for sending messages through the platform. MQT The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values. -| Variable | Description | Default | -| ----------------------------------------- | ----------------------------------------------------------------------------------- | --------------------------------- | -| SMQ_MQTT_ADAPTER_LOG_LEVEL | Log level for the MQTT Adapter (debug, info, warn, error) | info | -| SMQ_MQTT_ADAPTER_MQTT_PORT | mProxy port | 1883 | -| SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST | MQTT broker host | localhost | -| SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT | MQTT broker port | 1883 | -| SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK | URL of broker health check | "" | -| SMQ_MQTT_ADAPTER_WS_PORT | mProxy MQTT over WS port | 8080 | -| SMQ_MQTT_ADAPTER_WS_TARGET_HOST | MQTT broker host for MQTT over WS | localhost | -| SMQ_MQTT_ADAPTER_WS_TARGET_PORT | MQTT broker port for MQTT over WS | 8080 | -| SMQ_MQTT_ADAPTER_INSTANCE | Instance name for MQTT adapter | "" | -| SMQ_CLIENTS_AUTH_GRPC_URL | Clients service Auth gRPC URL | | -| SMQ_CLIENTS_AUTH_GRPC_TIMEOUT | Clients service Auth gRPC request timeout in seconds | 1s | -| SMQ_CLIENTS_AUTH_GRPC_CLIENT_CERT | Path to the PEM encoded clients service Auth gRPC client certificate file | "" | -| SMQ_CLIENTS_AUTH_GRPC_CLIENT_KEY | Path to the PEM encoded clients service Auth gRPC client key file | "" | -| SMQ_CLIENTS_AUTH_GRPC_SERVER_CERTS | Path to the PEM encoded clients server Auth gRPC server trusted CA certificate file | "" | -| SMQ_ES_URL | Event sourcing URL | | -| SMQ_MESSAGE_BROKER_URL | Message broker instance URL | | -| SMQ_JAEGER_URL | Jaeger server URL | | -| SMQ_JAEGER_TRACE_RATIO | Jaeger sampling ratio | 1.0 | -| SMQ_SEND_TELEMETRY | Send telemetry to supermq call home server | true | -| SMQ_MQTT_ADAPTER_INSTANCE_ID | Service instance ID | "" | +| Variable | Description | Default | +| ----------------------------------------- | ----------------------------------------------------------------------------------- | ----------------------------------- | +| SMQ_MQTT_ADAPTER_LOG_LEVEL | Log level for the MQTT Adapter (debug, info, warn, error) | info | +| SMQ_MQTT_ADAPTER_MQTT_PORT | mProxy port | 1883 | +| SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST | MQTT broker host | localhost | +| SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT | MQTT broker port | 1883 | +| SMQ_MQTT_ADAPTER_MQTT_QOS | MQTT broker QoS | 1 | +| SMQ_MQTT_ADAPTER_FORWARDER_TIMEOUT | MQTT forwarder for multiprotocol communication timeout | 30s | +| SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK | URL of broker health check | "" | +| SMQ_MQTT_ADAPTER_WS_PORT | mProxy MQTT over WS port | 8080 | +| SMQ_MQTT_ADAPTER_WS_TARGET_HOST | MQTT broker host for MQTT over WS | localhost | +| SMQ_MQTT_ADAPTER_WS_TARGET_PORT | MQTT broker port for MQTT over WS | 8080 | +| SMQ_MQTT_ADAPTER_WS_TARGET_PATH | MQTT broker MQTT over WS path | /mqtt | +| SMQ_MQTT_ADAPTER_INSTANCE | Instance name for MQTT adapter | "" | +| SMQ_CLIENTS_AUTH_GRPC_URL | Clients service Auth gRPC URL | | +| SMQ_CLIENTS_AUTH_GRPC_TIMEOUT | Clients service Auth gRPC request timeout in seconds | 1s | +| SMQ_CLIENTS_AUTH_GRPC_CLIENT_CERT | Path to the PEM encoded clients service Auth gRPC client certificate file | "" | +| SMQ_CLIENTS_AUTH_GRPC_CLIENT_KEY | Path to the PEM encoded clients service Auth gRPC client key file | "" | +| SMQ_CLIENTS_AUTH_GRPC_SERVER_CERTS | Path to the PEM encoded clients server Auth gRPC server trusted CA certificate file | "" | +| SMQ_ES_URL | Event sourcing URL | | +| SMQ_MESSAGE_BROKER_URL | Message broker instance URL | | +| SMQ_JAEGER_URL | Jaeger server URL | | +| SMQ_JAEGER_TRACE_RATIO | Jaeger sampling ratio | 1.0 | +| SMQ_SEND_TELEMETRY | Send telemetry to supermq call home server | true | +| SMQ_MQTT_ADAPTER_INSTANCE_ID | Service instance ID | "" | ## Deployment @@ -53,18 +56,21 @@ SMQ_MQTT_ADAPTER_LOG_LEVEL=info \ SMQ_MQTT_ADAPTER_MQTT_PORT=1883 \ SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST=localhost \ SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT=1883 \ +SMQ_MQTT_ADAPTER_MQTT_QOS=1 \ +SMQ_MQTT_ADAPTER_FORWARDER_TIMEOUT=30s \ SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK="" \ SMQ_MQTT_ADAPTER_WS_PORT=8080 \ SMQ_MQTT_ADAPTER_WS_TARGET_HOST=localhost \ SMQ_MQTT_ADAPTER_WS_TARGET_PORT=8080 \ +SMQ_MQTT_ADAPTER_WS_TARGET_PATH=/mqtt \ SMQ_MQTT_ADAPTER_INSTANCE="" \ SMQ_CLIENTS_AUTH_GRPC_URL=localhost:7000 \ SMQ_CLIENTS_AUTH_GRPC_TIMEOUT=1s \ SMQ_CLIENTS_AUTH_GRPC_CLIENT_CERT="" \ SMQ_CLIENTS_AUTH_GRPC_CLIENT_KEY="" \ SMQ_CLIENTS_AUTH_GRPC_SERVER_CERTS="" \ -SMQ_ES_URL=nats://localhost:4222 \ -SMQ_MESSAGE_BROKER_URL=nats://localhost:4222 \ +SMQ_ES_URL=amqp://guest:guest@rabbitmq:5672/ \ +SMQ_MESSAGE_BROKER_URL=amqp://guest:guest@rabbitmq:5672/ \ SMQ_JAEGER_URL=http://localhost:14268/api/traces \ SMQ_JAEGER_TRACE_RATIO=1.0 \ SMQ_SEND_TELEMETRY=true \ diff --git a/mqtt/forwarder.go b/mqtt/forwarder.go new file mode 100644 index 0000000000..323854809c --- /dev/null +++ b/mqtt/forwarder.go @@ -0,0 +1,75 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package mqtt + +import ( + "context" + "fmt" + "log/slog" + "strings" + + "github.com/absmach/supermq/pkg/messaging" +) + +// Forwarder specifies MQTT forwarder interface API. +type Forwarder interface { + // Forward subscribes to the Subscriber and + // publishes messages using provided Publisher. + Forward(ctx context.Context, id string, sub messaging.Subscriber, pub messaging.Publisher) error +} + +type forwarder struct { + topic string + logger *slog.Logger +} + +// NewForwarder returns new Forwarder implementation. +func NewForwarder(topic string, logger *slog.Logger) Forwarder { + return forwarder{ + topic: topic, + logger: logger, + } +} + +func (f forwarder) Forward(ctx context.Context, id string, sub messaging.Subscriber, pub messaging.Publisher) error { + subCfg := messaging.SubscriberConfig{ + ID: id, + Topic: f.topic, + Handler: handle(ctx, pub, f.logger), + } + + return sub.Subscribe(ctx, subCfg) +} + +func handle(ctx context.Context, pub messaging.Publisher, logger *slog.Logger) handleFunc { + return func(msg *messaging.Message) error { + if msg.GetProtocol() == protocol { + return nil + } + // Use concatenation instead of fmt.Sprintf for the + // sake of simplicity and performance. + topic := "channels/" + msg.GetChannel() + "/messages" + if msg.GetSubtopic() != "" { + topic = topic + "/" + strings.ReplaceAll(msg.GetSubtopic(), ".", "/") + } + + go func() { + if err := pub.Publish(ctx, topic, msg); err != nil { + logger.Warn(fmt.Sprintf("Failed to forward message: %s", err)) + } + }() + + return nil + } +} + +type handleFunc func(msg *messaging.Message) error + +func (h handleFunc) Handle(msg *messaging.Message) error { + return h(msg) +} + +func (h handleFunc) Cancel() error { + return nil +} diff --git a/mqtt/tracing/doc.go b/mqtt/tracing/doc.go new file mode 100644 index 0000000000..557d3934f1 --- /dev/null +++ b/mqtt/tracing/doc.go @@ -0,0 +1,12 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +// Package tracing provides tracing instrumentation for SuperMQ MQTT adapter service. +// +// This package provides tracing middleware for SuperMQ MQTT adapter service. +// It can be used to trace incoming requests and add tracing capabilities to +// SuperMQ MQTT adapter service. +// +// For more details about tracing instrumentation for SuperMQ messaging refer +// to the documentation at https://docs.supermq.abstractmachines.fr/tracing/. +package tracing diff --git a/mqtt/tracing/forwarder.go b/mqtt/tracing/forwarder.go new file mode 100644 index 0000000000..c4db29ecbf --- /dev/null +++ b/mqtt/tracing/forwarder.go @@ -0,0 +1,63 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package tracing + +import ( + "context" + "fmt" + + "github.com/absmach/supermq/mqtt" + "github.com/absmach/supermq/pkg/messaging" + "github.com/absmach/supermq/pkg/server" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +const forwardOP = "process" + +var _ mqtt.Forwarder = (*forwarderMiddleware)(nil) + +type forwarderMiddleware struct { + topic string + forwarder mqtt.Forwarder + tracer trace.Tracer + host server.Config +} + +// New creates new mqtt forwarder tracing middleware. +func New(config server.Config, tracer trace.Tracer, forwarder mqtt.Forwarder, topic string) mqtt.Forwarder { + return &forwarderMiddleware{ + forwarder: forwarder, + tracer: tracer, + topic: topic, + host: config, + } +} + +// Forward traces mqtt forward operations. +func (fm *forwarderMiddleware) Forward(ctx context.Context, id string, sub messaging.Subscriber, pub messaging.Publisher) error { + subject := fmt.Sprintf("channels.%s.messages", fm.topic) + spanName := fmt.Sprintf("%s %s", subject, forwardOP) + + ctx, span := fm.tracer.Start(ctx, + spanName, + trace.WithAttributes( + attribute.String("messaging.system", "mqtt"), + attribute.Bool("messaging.destination.anonymous", false), + attribute.String("messaging.destination.template", "channels/{channelID}/messages/*"), + attribute.Bool("messaging.destination.temporary", true), + attribute.String("network.protocol.name", "mqtt"), + attribute.String("network.protocol.version", "3.1.1"), + attribute.String("network.transport", "tcp"), + attribute.String("network.type", "ipv4"), + attribute.String("messaging.operation", forwardOP), + attribute.String("messaging.client_id", id), + attribute.String("server.address", fm.host.Host), + attribute.String("server.socket.port", fm.host.Port), + ), + ) + defer span.End() + + return fm.forwarder.Forward(ctx, id, sub, pub) +} diff --git a/pkg/messaging/README.md b/pkg/messaging/README.md index f8b07f8eb1..6c5e8a042b 100644 --- a/pkg/messaging/README.md +++ b/pkg/messaging/README.md @@ -1,8 +1,8 @@ # Messaging -`messaging` package defines `Publisher`, `Subscriber` and an aggregate `Pubsub` interface. +`messaging` package defines `Publisher`, `Subscriber` and an aggregate `Pubsub` interface. -`Subscriber` interface defines methods used to subscribe to a message broker such as MQTT or NATS or RabbitMQ. +`Subscriber` interface defines methods used to subscribe to a message broker such as MQTT or NATS or RabbitMQ. `Publisher` interface defines methods used to publish messages to a message broker such as MQTT or NATS or RabbitMQ. diff --git a/pkg/messaging/brokers/brokers_nats.go b/pkg/messaging/brokers/brokers_nats.go index 2a7186e3ec..9fa57379ae 100644 --- a/pkg/messaging/brokers/brokers_nats.go +++ b/pkg/messaging/brokers/brokers_nats.go @@ -1,8 +1,8 @@ // Copyright (c) Abstract Machines // SPDX-License-Identifier: Apache-2.0 -//go:build !rabbitmq -// +build !rabbitmq +//go:build nats +// +build nats package brokers diff --git a/pkg/messaging/brokers/brokers_rabbitmq.go b/pkg/messaging/brokers/brokers_rabbitmq.go index 8c3b7dd5f7..c339cb246a 100644 --- a/pkg/messaging/brokers/brokers_rabbitmq.go +++ b/pkg/messaging/brokers/brokers_rabbitmq.go @@ -1,8 +1,8 @@ // Copyright (c) Abstract Machines // SPDX-License-Identifier: Apache-2.0 -//go:build rabbitmq -// +build rabbitmq +//go:build !nats +// +build !nats package brokers diff --git a/pkg/messaging/brokers/tracing/brokers_nats.go b/pkg/messaging/brokers/tracing/brokers_nats.go index 9bca4a6a72..3693db0e21 100644 --- a/pkg/messaging/brokers/tracing/brokers_nats.go +++ b/pkg/messaging/brokers/tracing/brokers_nats.go @@ -1,8 +1,8 @@ // Copyright (c) Abstract Machines // SPDX-License-Identifier: Apache-2.0 -//go:build !rabbitmq -// +build !rabbitmq +//go:build nats +// +build nats package brokers diff --git a/pkg/messaging/brokers/tracing/brokers_rabbitmq.go b/pkg/messaging/brokers/tracing/brokers_rabbitmq.go index 90d4567cc8..c914cc091d 100644 --- a/pkg/messaging/brokers/tracing/brokers_rabbitmq.go +++ b/pkg/messaging/brokers/tracing/brokers_rabbitmq.go @@ -1,9 +1,9 @@ -//go:build rabbitmq -// +build rabbitmq - // Copyright (c) Abstract Machines // SPDX-License-Identifier: Apache-2.0 +//go:build !nats +// +build !nats + package brokers import ( diff --git a/pkg/messaging/mqtt/docs.go b/pkg/messaging/mqtt/docs.go new file mode 100644 index 0000000000..2afbf14587 --- /dev/null +++ b/pkg/messaging/mqtt/docs.go @@ -0,0 +1,11 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +// Package mqtt hold the implementation of the Publisher and PubSub +// interfaces for the MQTT messaging system, the internal messaging +// broker of the SuperMQ IoT platform. Due to the practical requirements +// implementation Publisher is created alongside PubSub. The reason for +// this is that Subscriber implementation of MQTT brings the burden of +// additional struct fields which are not used by Publisher. Subscriber +// is not implemented separately because PubSub can be used where Subscriber is needed. +package mqtt diff --git a/pkg/messaging/mqtt/publisher.go b/pkg/messaging/mqtt/publisher.go new file mode 100644 index 0000000000..ad51a04b89 --- /dev/null +++ b/pkg/messaging/mqtt/publisher.go @@ -0,0 +1,61 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package mqtt + +import ( + "context" + "errors" + "time" + + "github.com/absmach/supermq/pkg/messaging" + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +var errPublishTimeout = errors.New("failed to publish due to timeout reached") + +var _ messaging.Publisher = (*publisher)(nil) + +type publisher struct { + client mqtt.Client + timeout time.Duration + qos uint8 +} + +// NewPublisher returns a new MQTT message publisher. +func NewPublisher(address, username, password string, qos uint8, timeout time.Duration) (messaging.Publisher, error) { + client, err := newClient(address, username, password, "mqtt-publisher", timeout) + if err != nil { + return nil, err + } + + ret := publisher{ + client: client, + timeout: timeout, + qos: qos, + } + return ret, nil +} + +func (pub publisher) Publish(ctx context.Context, topic string, msg *messaging.Message) error { + if topic == "" { + return ErrEmptyTopic + } + + // Publish only the payload and not the whole message. + token := pub.client.Publish(topic, byte(pub.qos), false, msg.GetPayload()) + if token.Error() != nil { + return token.Error() + } + + if ok := token.WaitTimeout(pub.timeout); !ok { + return errPublishTimeout + } + + return nil +} + +func (pub publisher) Close() error { + pub.client.Disconnect(uint(pub.timeout)) + return nil +} diff --git a/pkg/messaging/mqtt/pubsub.go b/pkg/messaging/mqtt/pubsub.go new file mode 100644 index 0000000000..273652eca6 --- /dev/null +++ b/pkg/messaging/mqtt/pubsub.go @@ -0,0 +1,235 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package mqtt + +import ( + "context" + "errors" + "fmt" + "log/slog" + "sync" + "time" + + "github.com/absmach/supermq/pkg/messaging" + mqtt "github.com/eclipse/paho.mqtt.golang" + "google.golang.org/protobuf/proto" +) + +var ( + // ErrConnect indicates that connection to MQTT broker failed. + ErrConnect = errors.New("failed to connect to MQTT broker") + + // errSubscribeTimeout indicates that the subscription failed due to timeout. + errSubscribeTimeout = errors.New("failed to subscribe due to timeout reached") + + // errUnsubscribeTimeout indicates that unsubscribe failed due to timeout. + errUnsubscribeTimeout = errors.New("failed to unsubscribe due to timeout reached") + + // errUnsubscribeDeleteTopic indicates that unsubscribe failed because the topic was deleted. + errUnsubscribeDeleteTopic = errors.New("failed to unsubscribe due to deletion of topic") + + // ErrNotSubscribed indicates that the topic is not subscribed to. + ErrNotSubscribed = errors.New("not subscribed") + + // ErrEmptyTopic indicates the absence of topic. + ErrEmptyTopic = errors.New("empty topic") + + // ErrEmptyID indicates the absence of ID. + ErrEmptyID = errors.New("empty ID") +) + +var _ messaging.PubSub = (*pubsub)(nil) + +type subscription struct { + client mqtt.Client + topics []string + cancel func() error +} + +type pubsub struct { + publisher + logger *slog.Logger + mu sync.RWMutex + address string + username string + password string + timeout time.Duration + subscriptions map[string]subscription +} + +// NewPubSub returns MQTT message publisher/subscriber. +func NewPubSub(url, username, password string, qos uint8, timeout time.Duration, logger *slog.Logger) (messaging.PubSub, error) { + client, err := newClient(url, username, password, "mqtt-publisher", timeout) + if err != nil { + return nil, err + } + ret := &pubsub{ + publisher: publisher{ + client: client, + timeout: timeout, + qos: qos, + }, + address: url, + username: username, + password: password, + timeout: timeout, + logger: logger, + subscriptions: make(map[string]subscription), + } + return ret, nil +} + +func (ps *pubsub) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig) error { + if cfg.ID == "" { + return ErrEmptyID + } + if cfg.Topic == "" { + return ErrEmptyTopic + } + ps.mu.Lock() + defer ps.mu.Unlock() + + s, ok := ps.subscriptions[cfg.ID] + // If the client exists, check if it's subscribed to the topic and unsubscribe if needed. + switch ok { + case true: + if ok := s.contains(cfg.Topic); ok { + if err := s.unsubscribe(cfg.Topic, ps.timeout); err != nil { + return err + } + } + default: + client, err := newClient(ps.address, ps.username, ps.password, cfg.ID, ps.timeout) + if err != nil { + return err + } + s = subscription{ + client: client, + topics: []string{}, + cancel: cfg.Handler.Cancel, + } + } + s.topics = append(s.topics, cfg.Topic) + ps.subscriptions[cfg.ID] = s + + token := s.client.Subscribe(cfg.Topic, byte(ps.qos), ps.mqttHandler(cfg.Handler)) + if token.Error() != nil { + return token.Error() + } + if ok := token.WaitTimeout(ps.timeout); !ok { + return errSubscribeTimeout + } + + return nil +} + +func (ps *pubsub) Unsubscribe(ctx context.Context, id, topic string) error { + if id == "" { + return ErrEmptyID + } + if topic == "" { + return ErrEmptyTopic + } + ps.mu.Lock() + defer ps.mu.Unlock() + + s, ok := ps.subscriptions[id] + if !ok || !s.contains(topic) { + return ErrNotSubscribed + } + + if err := s.unsubscribe(topic, ps.timeout); err != nil { + return err + } + ps.subscriptions[id] = s + + if len(s.topics) == 0 { + delete(ps.subscriptions, id) + } + return nil +} + +func (s *subscription) unsubscribe(topic string, timeout time.Duration) error { + if s.cancel != nil { + if err := s.cancel(); err != nil { + return err + } + } + + token := s.client.Unsubscribe(topic) + if token.Error() != nil { + return token.Error() + } + + if ok := token.WaitTimeout(timeout); !ok { + return errUnsubscribeTimeout + } + if ok := s.delete(topic); !ok { + return errUnsubscribeDeleteTopic + } + return token.Error() +} + +func newClient(address, username, password, id string, timeout time.Duration) (mqtt.Client, error) { + opts := mqtt.NewClientOptions(). + SetUsername(username). + SetPassword(password). + SetConnectRetry(true). + SetAutoReconnect(true). + AddBroker(address). + SetClientID(id) + client := mqtt.NewClient(opts) + token := client.Connect() + if token.Error() != nil { + return nil, token.Error() + } + + if ok := token.WaitTimeout(timeout); !ok { + return nil, ErrConnect + } + + return client, nil +} + +func (ps *pubsub) mqttHandler(h messaging.MessageHandler) mqtt.MessageHandler { + return func(_ mqtt.Client, m mqtt.Message) { + var msg messaging.Message + if err := proto.Unmarshal(m.Payload(), &msg); err != nil { + ps.logger.Warn(fmt.Sprintf("Failed to unmarshal received message: %s", err)) + return + } + + if err := h.Handle(&msg); err != nil { + ps.logger.Warn(fmt.Sprintf("Failed to handle SuperMQ message: %s", err)) + } + } +} + +// Contains checks if a topic is present. +func (s subscription) contains(topic string) bool { + return s.indexOf(topic) != -1 +} + +// Finds the index of an item in the topics. +func (s subscription) indexOf(element string) int { + for k, v := range s.topics { + if element == v { + return k + } + } + return -1 +} + +// Deletes a topic from the slice. +func (s *subscription) delete(topic string) bool { + index := s.indexOf(topic) + if index == -1 { + return false + } + topics := make([]string, len(s.topics)-1) + copy(topics[:index], s.topics[:index]) + copy(topics[index:], s.topics[index+1:]) + s.topics = topics + return true +} diff --git a/pkg/messaging/mqtt/pubsub_test.go b/pkg/messaging/mqtt/pubsub_test.go new file mode 100644 index 0000000000..9835dfb485 --- /dev/null +++ b/pkg/messaging/mqtt/pubsub_test.go @@ -0,0 +1,474 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package mqtt_test + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + "github.com/absmach/supermq/pkg/messaging" + mqttpubsub "github.com/absmach/supermq/pkg/messaging/mqtt" + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/proto" +) + +const ( + topic = "topic" + chansPrefix = "channels" + channel = "9b7b1b3f-b1b0-46a8-a717-b8213f9eda3b" + subtopic = "engine" + tokenTimeout = 100 * time.Millisecond +) + +var data = []byte("payload") + +// ErrFailedHandleMessage indicates that the message couldn't be handled. +var errFailedHandleMessage = errors.New("failed to handle supermq message") + +func TestPublisher(t *testing.T) { + msgChan := make(chan []byte) + + // Subscribing with topic, and with subtopic, so that we can publish messages. + client, err := newClient(address, "clientID1", brokerTimeout) + assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + + token := client.Subscribe(topic, qos, func(_ mqtt.Client, m mqtt.Message) { + msgChan <- m.Payload() + }) + if ok := token.WaitTimeout(tokenTimeout); !ok { + assert.Fail(t, fmt.Sprintf("failed to subscribe to topic %s", topic)) + } + assert.Nil(t, token.Error(), fmt.Sprintf("got unexpected error: %s", token.Error())) + + token = client.Subscribe(fmt.Sprintf("%s.%s", topic, subtopic), qos, func(_ mqtt.Client, m mqtt.Message) { + msgChan <- m.Payload() + }) + if ok := token.WaitTimeout(tokenTimeout); !ok { + assert.Fail(t, fmt.Sprintf("failed to subscribe to topic %s", fmt.Sprintf("%s.%s", topic, subtopic))) + } + assert.Nil(t, token.Error(), fmt.Sprintf("got unexpected error: %s", token.Error())) + + t.Cleanup(func() { + token := client.Unsubscribe(topic, fmt.Sprintf("%s.%s", topic, subtopic)) + token.WaitTimeout(tokenTimeout) + assert.Nil(t, token.Error(), fmt.Sprintf("got unexpected error: %s", token.Error())) + + client.Disconnect(100) + }) + + // Test publish with an empty topic. + err = pubsub.Publish(context.TODO(), "", &messaging.Message{Payload: data}) + assert.Equal(t, err, mqttpubsub.ErrEmptyTopic, fmt.Sprintf("Publish with empty topic: expected: %s, got: %s", mqttpubsub.ErrEmptyTopic, err)) + + cases := []struct { + desc string + channel string + subtopic string + payload []byte + }{ + { + desc: "publish message with nil payload", + payload: nil, + }, + { + desc: "publish message with string payload", + payload: data, + }, + { + desc: "publish message with channel", + payload: data, + channel: channel, + }, + { + desc: "publish message with subtopic", + payload: data, + subtopic: subtopic, + }, + { + desc: "publish message with channel and subtopic", + payload: data, + channel: channel, + subtopic: subtopic, + }, + } + for _, tc := range cases { + expectedMsg := messaging.Message{ + Publisher: "clientID11", + Channel: tc.channel, + Subtopic: tc.subtopic, + Payload: tc.payload, + } + + err := pubsub.Publish(context.TODO(), topic, &expectedMsg) + assert.Nil(t, err, fmt.Sprintf("%s: got unexpected error: %s\n", tc.desc, err)) + + data, err := proto.Marshal(&expectedMsg) + assert.Nil(t, err, fmt.Sprintf("%s: failed to serialize protobuf error: %s\n", tc.desc, err)) + + receivedMsg := <-msgChan + if tc.payload != nil { + assert.Equal(t, expectedMsg.GetPayload(), receivedMsg, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, data, receivedMsg)) + } + } +} + +func TestSubscribe(t *testing.T) { + msgChan := make(chan *messaging.Message) + + // Creating client to Publish messages to subscribed topic. + client, err := newClient(address, "supermq", brokerTimeout) + assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + + t.Cleanup(func() { + client.Unsubscribe() + client.Disconnect(100) + }) + + cases := []struct { + desc string + topic string + clientID string + err error + handler messaging.MessageHandler + }{ + { + desc: "Subscribe to a topic with an ID", + topic: topic, + clientID: "clientid1", + err: nil, + handler: handler{false, "clientid1", msgChan}, + }, + { + desc: "Subscribe to the same topic with a different ID", + topic: topic, + clientID: "clientid2", + err: nil, + handler: handler{false, "clientid2", msgChan}, + }, + { + desc: "Subscribe to an already subscribed topic with an ID", + topic: topic, + clientID: "clientid1", + err: nil, + handler: handler{false, "clientid1", msgChan}, + }, + { + desc: "Subscribe to a topic with a subtopic with an ID", + topic: fmt.Sprintf("%s.%s", topic, subtopic), + clientID: "clientid1", + err: nil, + handler: handler{false, "clientid1", msgChan}, + }, + { + desc: "Subscribe to an already subscribed topic with a subtopic with an ID", + topic: fmt.Sprintf("%s.%s", topic, subtopic), + clientID: "clientid1", + err: nil, + handler: handler{false, "clientid1", msgChan}, + }, + { + desc: "Subscribe to an empty topic with an ID", + topic: "", + clientID: "clientid1", + err: mqttpubsub.ErrEmptyTopic, + handler: handler{false, "clientid1", msgChan}, + }, + { + desc: "Subscribe to a topic with empty id", + topic: topic, + clientID: "", + err: mqttpubsub.ErrEmptyID, + handler: handler{false, "", msgChan}, + }, + } + for _, tc := range cases { + subCfg := messaging.SubscriberConfig{ + ID: tc.clientID, + Topic: tc.topic, + Handler: tc.handler, + } + err = pubsub.Subscribe(context.TODO(), subCfg) + assert.Equal(t, err, tc.err, fmt.Sprintf("%s: expected: %s, but got: %s", tc.desc, err, tc.err)) + + if tc.err == nil { + expectedMsg := messaging.Message{ + Publisher: "clientID1", + Channel: channel, + Subtopic: subtopic, + Payload: data, + } + data, err := proto.Marshal(&expectedMsg) + assert.Nil(t, err, fmt.Sprintf("%s: failed to serialize protobuf error: %s\n", tc.desc, err)) + + token := client.Publish(tc.topic, qos, false, data) + token.WaitTimeout(tokenTimeout) + assert.Nil(t, token.Error(), fmt.Sprintf("got unexpected error: %s", token.Error())) + + receivedMsg := <-msgChan + assert.Equal(t, expectedMsg.Channel, receivedMsg.Channel, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg)) + assert.Equal(t, expectedMsg.Created, receivedMsg.Created, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg)) + assert.Equal(t, expectedMsg.Protocol, receivedMsg.Protocol, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg)) + assert.Equal(t, expectedMsg.Publisher, receivedMsg.Publisher, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg)) + assert.Equal(t, expectedMsg.Subtopic, receivedMsg.Subtopic, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg)) + assert.Equal(t, expectedMsg.Payload, receivedMsg.Payload, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg)) + } + } +} + +func TestPubSub(t *testing.T) { + msgChan := make(chan *messaging.Message) + + cases := []struct { + desc string + topic string + clientID string + err error + handler messaging.MessageHandler + }{ + { + desc: "Subscribe to a topic with an ID", + topic: topic, + clientID: "clientid7", + err: nil, + handler: handler{false, "clientid7", msgChan}, + }, + { + desc: "Subscribe to the same topic with a different ID", + topic: topic, + clientID: "clientid8", + err: nil, + handler: handler{false, "clientid8", msgChan}, + }, + { + desc: "Subscribe to a topic with a subtopic with an ID", + topic: fmt.Sprintf("%s.%s", topic, subtopic), + clientID: "clientid7", + err: nil, + handler: handler{false, "clientid7", msgChan}, + }, + { + desc: "Subscribe to an empty topic with an ID", + topic: "", + clientID: "clientid7", + err: mqttpubsub.ErrEmptyTopic, + handler: handler{false, "clientid7", msgChan}, + }, + { + desc: "Subscribe to a topic with empty id", + topic: topic, + clientID: "", + err: mqttpubsub.ErrEmptyID, + handler: handler{false, "", msgChan}, + }, + } + for _, tc := range cases { + subCfg := messaging.SubscriberConfig{ + ID: tc.clientID, + Topic: tc.topic, + Handler: tc.handler, + } + err := pubsub.Subscribe(context.TODO(), subCfg) + assert.Equal(t, err, tc.err, fmt.Sprintf("%s: expected: %s, but got: %s", tc.desc, err, tc.err)) + + if tc.err == nil { + // Use pubsub to subscribe to a topic, and then publish messages to that topic. + expectedMsg := messaging.Message{ + Publisher: "clientID", + Channel: channel, + Subtopic: subtopic, + Payload: data, + } + data, err := proto.Marshal(&expectedMsg) + assert.Nil(t, err, fmt.Sprintf("%s: failed to serialize protobuf error: %s\n", tc.desc, err)) + + msg := messaging.Message{ + Payload: data, + } + // Publish message, and then receive it on message channel. + err = pubsub.Publish(context.TODO(), topic, &msg) + assert.Nil(t, err, fmt.Sprintf("%s: got unexpected error: %s\n", tc.desc, err)) + + receivedMsg := <-msgChan + assert.Equal(t, expectedMsg.Channel, receivedMsg.Channel, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg)) + assert.Equal(t, expectedMsg.Created, receivedMsg.Created, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg)) + assert.Equal(t, expectedMsg.Protocol, receivedMsg.Protocol, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg)) + assert.Equal(t, expectedMsg.Publisher, receivedMsg.Publisher, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg)) + assert.Equal(t, expectedMsg.Subtopic, receivedMsg.Subtopic, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg)) + assert.Equal(t, expectedMsg.Payload, receivedMsg.Payload, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg)) + } + } +} + +func TestUnsubscribe(t *testing.T) { + msgChan := make(chan *messaging.Message) + + cases := []struct { + desc string + topic string + clientID string + err error + subscribe bool // True for subscribe and false for unsubscribe. + handler messaging.MessageHandler + }{ + { + desc: "Subscribe to a topic with an ID", + topic: fmt.Sprintf("%s.%s", chansPrefix, topic), + clientID: "clientid4", + err: nil, + subscribe: true, + handler: handler{false, "clientid4", msgChan}, + }, + { + desc: "Subscribe to the same topic with a different ID", + topic: fmt.Sprintf("%s.%s", chansPrefix, topic), + clientID: "clientid9", + err: nil, + subscribe: true, + handler: handler{false, "clientid9", msgChan}, + }, + { + desc: "Unsubscribe from a topic with an ID", + topic: fmt.Sprintf("%s.%s", chansPrefix, topic), + clientID: "clientid4", + err: nil, + subscribe: false, + handler: handler{false, "clientid4", msgChan}, + }, + { + desc: "Unsubscribe from same topic with different ID", + topic: fmt.Sprintf("%s.%s", chansPrefix, topic), + clientID: "clientid9", + err: nil, + subscribe: false, + handler: handler{false, "clientid9", msgChan}, + }, + { + desc: "Unsubscribe from a non-existent topic with an ID", + topic: "h", + clientID: "clientid4", + err: mqttpubsub.ErrNotSubscribed, + subscribe: false, + handler: handler{false, "clientid4", msgChan}, + }, + { + desc: "Unsubscribe from an already unsubscribed topic with an ID", + topic: fmt.Sprintf("%s.%s", chansPrefix, topic), + clientID: "clientid4", + err: mqttpubsub.ErrNotSubscribed, + subscribe: false, + handler: handler{false, "clientid4", msgChan}, + }, + { + desc: "Subscribe to a topic with a subtopic with an ID", + topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic), + clientID: "clientidd4", + err: nil, + subscribe: true, + handler: handler{false, "clientidd4", msgChan}, + }, + { + desc: "Unsubscribe from a topic with a subtopic with an ID", + topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic), + clientID: "clientidd4", + err: nil, + subscribe: false, + handler: handler{false, "clientidd4", msgChan}, + }, + { + desc: "Unsubscribe from an already unsubscribed topic with a subtopic with an ID", + topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic), + clientID: "clientid4", + err: mqttpubsub.ErrNotSubscribed, + subscribe: false, + handler: handler{false, "clientid4", msgChan}, + }, + { + desc: "Unsubscribe from an empty topic with an ID", + topic: "", + clientID: "clientid4", + err: mqttpubsub.ErrEmptyTopic, + subscribe: false, + handler: handler{false, "clientid4", msgChan}, + }, + { + desc: "Unsubscribe from a topic with empty ID", + topic: fmt.Sprintf("%s.%s", chansPrefix, topic), + clientID: "", + err: mqttpubsub.ErrEmptyID, + subscribe: false, + handler: handler{false, "", msgChan}, + }, + { + desc: "Subscribe to a new topic with an ID", + topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"2"), + clientID: "clientid55", + err: nil, + subscribe: true, + handler: handler{true, "clientid5", msgChan}, + }, + { + desc: "Unsubscribe from a topic with an ID with failing handler", + topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"2"), + clientID: "clientid55", + err: errFailedHandleMessage, + subscribe: false, + handler: handler{true, "clientid5", msgChan}, + }, + { + desc: "Subscribe to a new topic with subtopic with an ID", + topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic+"2", subtopic), + clientID: "clientid55", + err: nil, + subscribe: true, + handler: handler{true, "clientid5", msgChan}, + }, + { + desc: "Unsubscribe from a topic with subtopic with an ID with failing handler", + topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic+"2", subtopic), + clientID: "clientid55", + err: errFailedHandleMessage, + subscribe: false, + handler: handler{true, "clientid5", msgChan}, + }, + } + for _, tc := range cases { + subCfg := messaging.SubscriberConfig{ + ID: tc.clientID, + Topic: tc.topic, + Handler: tc.handler, + } + switch tc.subscribe { + case true: + err := pubsub.Subscribe(context.TODO(), subCfg) + assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected: %s, but got: %s", tc.desc, tc.err, err)) + default: + err := pubsub.Unsubscribe(context.TODO(), tc.clientID, tc.topic) + assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected: %s, but got: %s", tc.desc, tc.err, err)) + } + } +} + +type handler struct { + fail bool + publisher string + msgChan chan *messaging.Message +} + +func (h handler) Handle(msg *messaging.Message) error { + if msg.GetPublisher() != h.publisher { + h.msgChan <- msg + } + return nil +} + +func (h handler) Cancel() error { + if h.fail { + return errFailedHandleMessage + } + return nil +} diff --git a/pkg/messaging/mqtt/setup_test.go b/pkg/messaging/mqtt/setup_test.go new file mode 100644 index 0000000000..4a90012e01 --- /dev/null +++ b/pkg/messaging/mqtt/setup_test.go @@ -0,0 +1,121 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package mqtt_test + +import ( + "fmt" + "log" + "log/slog" + "os" + "os/signal" + "syscall" + "testing" + "time" + + smqlog "github.com/absmach/supermq/logger" + "github.com/absmach/supermq/pkg/messaging" + mqttpubsub "github.com/absmach/supermq/pkg/messaging/mqtt" + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/ory/dockertest/v3" + "github.com/ory/dockertest/v3/docker" +) + +var ( + pubsub messaging.PubSub + logger *slog.Logger + address string +) + +const ( + username = "supermq-mqtt" + qos = 2 + port = "1883/tcp" + brokerTimeout = 30 * time.Second + poolMaxWait = 120 * time.Second +) + +func TestMain(m *testing.M) { + pool, err := dockertest.NewPool("") + if err != nil { + log.Fatalf("Could not connect to docker: %s", err) + } + + container, err := pool.RunWithOptions(&dockertest.RunOptions{ + Repository: "eclipse-mosquitto", + Tag: "1.6.15", + }, func(config *docker.HostConfig) { + config.AutoRemove = true + config.RestartPolicy = docker.RestartPolicy{Name: "no"} + }) + if err != nil { + log.Fatalf("Could not start container: %s", err) + } + + handleInterrupt(pool, container) + + address = fmt.Sprintf("%s:%s", "localhost", container.GetPort(port)) + pool.MaxWait = poolMaxWait + + logger, err = smqlog.New(os.Stdout, "debug") + if err != nil { + log.Fatal(err.Error()) + } + + if err := pool.Retry(func() error { + pubsub, err = mqttpubsub.NewPubSub(address, "supermq", "", 2, brokerTimeout, logger) + return err + }); err != nil { + log.Fatalf("Could not connect to docker: %s", err) + } + + code := m.Run() + if err := pool.Purge(container); err != nil { + log.Fatalf("Could not purge container: %s", err) + } + + os.Exit(code) + + defer func() { + err = pubsub.Close() + if err != nil { + log.Fatal(err.Error()) + } + }() +} + +func handleInterrupt(pool *dockertest.Pool, container *dockertest.Resource) { + c := make(chan os.Signal, 2) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + if err := pool.Purge(container); err != nil { + log.Fatalf("Could not purge container: %s", err) + } + os.Exit(0) + }() +} + +func newClient(address, id string, timeout time.Duration) (mqtt.Client, error) { + opts := mqtt.NewClientOptions(). + SetUsername(username). + AddBroker(address). + SetClientID(id) + + client := mqtt.NewClient(opts) + token := client.Connect() + if token.Error() != nil { + return nil, token.Error() + } + + ok := token.WaitTimeout(timeout) + if !ok { + return nil, mqttpubsub.ErrConnect + } + + if token.Error() != nil { + return nil, token.Error() + } + + return client, nil +} diff --git a/tools/config/golangci.yml b/tools/config/golangci.yml index 8c1b5f0177..1033af94a1 100644 --- a/tools/config/golangci.yml +++ b/tools/config/golangci.yml @@ -4,7 +4,7 @@ run: timeout: 10m build-tags: - - "nats" + - "rabbitmq" issues: max-issues-per-linter: 100 diff --git a/users/README.md b/users/README.md index 4db609d4d8..f1557eea4e 100644 --- a/users/README.md +++ b/users/README.md @@ -12,49 +12,49 @@ For in-depth explanation of the aforementioned scenarios, as well as thorough un The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values. -| Variable | Description | Default | -| ------------------------------ | ----------------------------------------------------------------------- | --------------------------------- | -| SMQ_USERS_LOG_LEVEL | Log level for users service (debug, info, warn, error) | info | -| SMQ_USERS_ADMIN_EMAIL | Default user, created on startup | | -| SMQ_USERS_ADMIN_PASSWORD | Default user password, created on startup | 12345678 | -| SMQ_USERS_PASS_REGEX | Password regex | ^.{8,}$ | -| SMQ_TOKEN_RESET_ENDPOINT | Password request reset endpoint, for constructing link | /reset-request | -| SMQ_USERS_HTTP_HOST | Users service HTTP host | localhost | -| SMQ_USERS_HTTP_PORT | Users service HTTP port | 9002 | -| SMQ_USERS_HTTP_SERVER_CERT | Path to the PEM encoded server certificate file | "" | -| SMQ_USERS_HTTP_SERVER_KEY | Path to the PEM encoded server key file | "" | -| SMQ_USERS_HTTP_SERVER_CA_CERTS | Path to the PEM encoded server CA certificate file | "" | -| SMQ_USERS_HTTP_CLIENT_CA_CERTS | Path to the PEM encoded client CA certificate file | "" | -| SMQ_AUTH_GRPC_URL | Auth service GRPC URL | localhost:8181 | -| SMQ_AUTH_GRPC_TIMEOUT | Auth service GRPC timeout | 1s | -| SMQ_AUTH_GRPC_CLIENT_CERT | Path to the PEM encoded client certificate file | "" | -| SMQ_AUTH_GRPC_CLIENT_KEY | Path to the PEM encoded client key file | "" | -| SMQ_AUTH_GRPC_SERVER_CA_CERTS | Path to the PEM encoded server CA certificate file | "" | -| SMQ_USERS_DB_HOST | Database host address | localhost | -| SMQ_USERS_DB_PORT | Database host port | 5432 | -| SMQ_USERS_DB_USER | Database user | supermq | -| SMQ_USERS_DB_PASS | Database password | supermq | -| SMQ_USERS_DB_NAME | Name of the database used by the service | users | -| SMQ_USERS_DB_SSL_MODE | Database connection SSL mode (disable, require, verify-ca, verify-full) | disable | -| SMQ_USERS_DB_SSL_CERT | Path to the PEM encoded certificate file | "" | -| SMQ_USERS_DB_SSL_KEY | Path to the PEM encoded key file | "" | -| SMQ_USERS_DB_SSL_ROOT_CERT | Path to the PEM encoded root certificate file | "" | -| SMQ_EMAIL_HOST | Mail server host | localhost | -| SMQ_EMAIL_PORT | Mail server port | 25 | -| SMQ_EMAIL_USERNAME | Mail server username | "" | -| SMQ_EMAIL_PASSWORD | Mail server password | "" | -| SMQ_EMAIL_FROM_ADDRESS | Email "from" address | "" | -| SMQ_EMAIL_FROM_NAME | Email "from" name | "" | -| SMQ_EMAIL_TEMPLATE | Email template for sending emails with password reset link | email.tmpl | -| SMQ_USERS_ES_URL | Event store URL | | -| SMQ_JAEGER_URL | Jaeger server URL | | -| SMQ_OAUTH_UI_REDIRECT_URL | OAuth UI redirect URL | | -| SMQ_OAUTH_UI_ERROR_URL | OAuth UI error URL | | -| SMQ_USERS_DELETE_INTERVAL | Interval for deleting users | 24h | -| SMQ_USERS_DELETE_AFTER | Time after which users are deleted | 720h | -| SMQ_JAEGER_TRACE_RATIO | Jaeger sampling ratio | 1.0 | -| SMQ_SEND_TELEMETRY | Send telemetry to supermq call home server. | true | -| SMQ_USERS_INSTANCE_ID | SuperMQ instance ID | "" | +| Variable | Description | Default | +| ------------------------------ | ----------------------------------------------------------------------- | ----------------------------------- | +| SMQ_USERS_LOG_LEVEL | Log level for users service (debug, info, warn, error) | info | +| SMQ_USERS_ADMIN_EMAIL | Default user, created on startup | | +| SMQ_USERS_ADMIN_PASSWORD | Default user password, created on startup | 12345678 | +| SMQ_USERS_PASS_REGEX | Password regex | ^.{8,}$ | +| SMQ_TOKEN_RESET_ENDPOINT | Password request reset endpoint, for constructing link | /reset-request | +| SMQ_USERS_HTTP_HOST | Users service HTTP host | localhost | +| SMQ_USERS_HTTP_PORT | Users service HTTP port | 9002 | +| SMQ_USERS_HTTP_SERVER_CERT | Path to the PEM encoded server certificate file | "" | +| SMQ_USERS_HTTP_SERVER_KEY | Path to the PEM encoded server key file | "" | +| SMQ_USERS_HTTP_SERVER_CA_CERTS | Path to the PEM encoded server CA certificate file | "" | +| SMQ_USERS_HTTP_CLIENT_CA_CERTS | Path to the PEM encoded client CA certificate file | "" | +| SMQ_AUTH_GRPC_URL | Auth service GRPC URL | localhost:8181 | +| SMQ_AUTH_GRPC_TIMEOUT | Auth service GRPC timeout | 1s | +| SMQ_AUTH_GRPC_CLIENT_CERT | Path to the PEM encoded client certificate file | "" | +| SMQ_AUTH_GRPC_CLIENT_KEY | Path to the PEM encoded client key file | "" | +| SMQ_AUTH_GRPC_SERVER_CA_CERTS | Path to the PEM encoded server CA certificate file | "" | +| SMQ_USERS_DB_HOST | Database host address | localhost | +| SMQ_USERS_DB_PORT | Database host port | 5432 | +| SMQ_USERS_DB_USER | Database user | supermq | +| SMQ_USERS_DB_PASS | Database password | supermq | +| SMQ_USERS_DB_NAME | Name of the database used by the service | users | +| SMQ_USERS_DB_SSL_MODE | Database connection SSL mode (disable, require, verify-ca, verify-full) | disable | +| SMQ_USERS_DB_SSL_CERT | Path to the PEM encoded certificate file | "" | +| SMQ_USERS_DB_SSL_KEY | Path to the PEM encoded key file | "" | +| SMQ_USERS_DB_SSL_ROOT_CERT | Path to the PEM encoded root certificate file | "" | +| SMQ_EMAIL_HOST | Mail server host | localhost | +| SMQ_EMAIL_PORT | Mail server port | 25 | +| SMQ_EMAIL_USERNAME | Mail server username | "" | +| SMQ_EMAIL_PASSWORD | Mail server password | "" | +| SMQ_EMAIL_FROM_ADDRESS | Email "from" address | "" | +| SMQ_EMAIL_FROM_NAME | Email "from" name | "" | +| SMQ_EMAIL_TEMPLATE | Email template for sending emails with password reset link | email.tmpl | +| SMQ_USERS_ES_URL | Event store URL | | +| SMQ_JAEGER_URL | Jaeger server URL | | +| SMQ_OAUTH_UI_REDIRECT_URL | OAuth UI redirect URL | | +| SMQ_OAUTH_UI_ERROR_URL | OAuth UI error URL | | +| SMQ_USERS_DELETE_INTERVAL | Interval for deleting users | 24h | +| SMQ_USERS_DELETE_AFTER | Time after which users are deleted | 720h | +| SMQ_JAEGER_TRACE_RATIO | Jaeger sampling ratio | 1.0 | +| SMQ_SEND_TELEMETRY | Send telemetry to supermq call home server. | true | +| SMQ_USERS_INSTANCE_ID | SuperMQ instance ID | "" | ## Deployment @@ -107,7 +107,7 @@ SMQ_EMAIL_PASSWORD="2b0d302e775b1e" \ SMQ_EMAIL_FROM_ADDRESS=from@example.com \ SMQ_EMAIL_FROM_NAME=Example \ SMQ_EMAIL_TEMPLATE="docker/templates/users.tmpl" \ -SMQ_USERS_ES_URL=nats://localhost:4222 \ +SMQ_USERS_ES_URL=amqp://guest:guest@rabbitmq:5672/ \ SMQ_JAEGER_URL=http://localhost:14268/api/traces \ SMQ_JAEGER_TRACE_RATIO=1.0 \ SMQ_SEND_TELEMETRY=true \ diff --git a/ws/README.md b/ws/README.md index d50f413ba1..28f686edd9 100644 --- a/ws/README.md +++ b/ws/README.md @@ -6,23 +6,23 @@ WebSocket adapter provides a [WebSocket](https://en.wikipedia.org/wiki/WebSocket The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values. -| Variable | Description | Default | -| ---------------------------------- | ----------------------------------------------------------------------------------- | --------------------------------- | -| SMQ_WS_ADAPTER_LOG_LEVEL | Log level for the WS Adapter (debug, info, warn, error) | info | -| SMQ_WS_ADAPTER_HTTP_HOST | Service WS host | "" | -| SMQ_WS_ADAPTER_HTTP_PORT | Service WS port | 8190 | -| SMQ_WS_ADAPTER_HTTP_SERVER_CERT | Path to the PEM encoded server certificate file | "" | -| SMQ_WS_ADAPTER_HTTP_SERVER_KEY | Path to the PEM encoded server key file | "" | -| SMQ_CLIENTS_AUTH_GRPC_URL | Clients service Auth gRPC URL | | -| SMQ_CLIENTS_AUTH_GRPC_TIMEOUT | Clients service Auth gRPC request timeout in seconds | 1s | -| SMQ_CLIENTS_AUTH_GRPC_CLIENT_CERT | Path to the PEM encoded clients service Auth gRPC client certificate file | "" | -| SMQ_CLIENTS_AUTH_GRPC_CLIENT_KEY | Path to the PEM encoded clients service Auth gRPC client key file | "" | -| SMQ_CLIENTS_AUTH_GRPC_SERVER_CERTS | Path to the PEM encoded clients server Auth gRPC server trusted CA certificate file | "" | -| SMQ_MESSAGE_BROKER_URL | Message broker instance URL | | -| SMQ_JAEGER_URL | Jaeger server URL | | -| SMQ_JAEGER_TRACE_RATIO | Jaeger sampling ratio | 1.0 | -| SMQ_SEND_TELEMETRY | Send telemetry to supermq call home server | true | -| SMQ_WS_ADAPTER_INSTANCE_ID | Service instance ID | "" | +| Variable | Description | Default | +| ---------------------------------- | ----------------------------------------------------------------------------------- | ----------------------------------- | +| SMQ_WS_ADAPTER_LOG_LEVEL | Log level for the WS Adapter (debug, info, warn, error) | info | +| SMQ_WS_ADAPTER_HTTP_HOST | Service WS host | "" | +| SMQ_WS_ADAPTER_HTTP_PORT | Service WS port | 8190 | +| SMQ_WS_ADAPTER_HTTP_SERVER_CERT | Path to the PEM encoded server certificate file | "" | +| SMQ_WS_ADAPTER_HTTP_SERVER_KEY | Path to the PEM encoded server key file | "" | +| SMQ_CLIENTS_AUTH_GRPC_URL | Clients service Auth gRPC URL | | +| SMQ_CLIENTS_AUTH_GRPC_TIMEOUT | Clients service Auth gRPC request timeout in seconds | 1s | +| SMQ_CLIENTS_AUTH_GRPC_CLIENT_CERT | Path to the PEM encoded clients service Auth gRPC client certificate file | "" | +| SMQ_CLIENTS_AUTH_GRPC_CLIENT_KEY | Path to the PEM encoded clients service Auth gRPC client key file | "" | +| SMQ_CLIENTS_AUTH_GRPC_SERVER_CERTS | Path to the PEM encoded clients server Auth gRPC server trusted CA certificate file | "" | +| SMQ_MESSAGE_BROKER_URL | Message broker instance URL | | +| SMQ_JAEGER_URL | Jaeger server URL | | +| SMQ_JAEGER_TRACE_RATIO | Jaeger sampling ratio | 1.0 | +| SMQ_SEND_TELEMETRY | Send telemetry to supermq call home server | true | +| SMQ_WS_ADAPTER_INSTANCE_ID | Service instance ID | "" | ## Deployment @@ -54,7 +54,7 @@ SMQ_CLIENTS_AUTH_GRPC_TIMEOUT=1s \ SMQ_CLIENTS_AUTH_GRPC_CLIENT_CERT="" \ SMQ_CLIENTS_AUTH_GRPC_CLIENT_KEY="" \ SMQ_CLIENTS_AUTH_GRPC_SERVER_CERTS="" \ -SMQ_MESSAGE_BROKER_URL=nats://localhost:4222 \ +SMQ_MESSAGE_BROKER_URL=amqp://guest:guest@rabbitmq:5672/ \ SMQ_JAEGER_URL=http://localhost:14268/api/traces \ SMQ_JAEGER_TRACE_RATIO=1.0 \ SMQ_SEND_TELEMETRY=true \ diff --git a/ws/adapter.go b/ws/adapter.go index 02c4cfe39e..0517cbbe83 100644 --- a/ws/adapter.go +++ b/ws/adapter.go @@ -6,6 +6,7 @@ package ws import ( "context" "fmt" + "strings" grpcChannelsV1 "github.com/absmach/supermq/api/grpc/channels/v1" grpcClientsV1 "github.com/absmach/supermq/api/grpc/clients/v1" @@ -93,6 +94,9 @@ func (svc *adapterService) authorize(ctx context.Context, clientKey, chanID stri authnReq := &grpcClientsV1.AuthnReq{ ClientSecret: clientKey, } + if strings.HasPrefix(clientKey, "Client") { + authnReq.ClientSecret = extractClientSecret(clientKey) + } authnRes, err := svc.clients.Authenticate(ctx, authnReq) if err != nil { return "", errors.Wrap(svcerr.ErrAuthentication, err) diff --git a/ws/handler.go b/ws/handler.go index 238011b28d..422e6fec4f 100644 --- a/ws/handler.go +++ b/ws/handler.go @@ -97,7 +97,9 @@ func (h *handler) AuthPublish(ctx context.Context, topic *string, payload *[]byt token = string(s.Password) } - return h.authAccess(ctx, token, *topic, connections.Publish) + _, _, err := h.authAccess(ctx, token, *topic, connections.Publish) + + return err } // AuthSubscribe is called on device publish, @@ -111,16 +113,8 @@ func (h *handler) AuthSubscribe(ctx context.Context, topics *[]string) error { return errMissingTopicSub } - var token string - switch { - case strings.HasPrefix(string(s.Password), "Client"): - token = strings.ReplaceAll(string(s.Password), "Client ", "") - default: - token = string(s.Password) - } - for _, topic := range *topics { - if err := h.authAccess(ctx, token, topic, connections.Subscribe); err != nil { + if _, _, err := h.authAccess(ctx, string(s.Password), topic, connections.Subscribe); err != nil { return err } } @@ -139,7 +133,6 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e if !ok { return errors.Wrap(errFailedPublish, errClientNotInitialized) } - h.logger.Info(fmt.Sprintf(LogInfoPublished, s.ID, *topic)) if len(*payload) == 0 { return errFailedMessagePublish @@ -160,41 +153,9 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e return errors.Wrap(errFailedParseSubtopic, err) } - var clientID, clientType string - switch { - case strings.HasPrefix(string(s.Password), "Client"): - clientKey := extractClientSecret(string(s.Password)) - authnRes, err := h.clients.Authenticate(ctx, &grpcClientsV1.AuthnReq{ClientSecret: clientKey}) - if err != nil { - return errors.Wrap(svcerr.ErrAuthentication, err) - } - if !authnRes.Authenticated { - return svcerr.ErrAuthentication - } - clientType = policies.ClientType - clientID = authnRes.GetId() - default: - token := string(s.Password) - authnSession, err := h.authn.Authenticate(ctx, extractBearerToken(token)) - if err != nil { - return err - } - clientType = policies.UserType - clientID = authnSession.DomainUserID - } - - ar := &grpcChannelsV1.AuthzReq{ - Type: uint32(connections.Publish), - ClientId: clientID, - ClientType: clientType, - ChannelId: chanID, - } - res, err := h.channels.Authorize(ctx, ar) + clientID, clientType, err := h.authAccess(ctx, string(s.Password), *topic, connections.Publish) if err != nil { - return err - } - if !res.GetAuthorized() { - return svcerr.ErrAuthorization + return errors.Wrap(errFailedPublish, err) } msg := messaging.Message{ @@ -213,6 +174,8 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e return errors.Wrap(errFailedPublishToMsgBroker, err) } + h.logger.Info(fmt.Sprintf(LogInfoPublished, s.ID, *topic)) + return nil } @@ -242,38 +205,33 @@ func (h *handler) Disconnect(ctx context.Context) error { return nil } -func (h *handler) authAccess(ctx context.Context, token, topic string, msgType connections.ConnType) error { - var clientID, clientType string - switch { - case strings.HasPrefix(token, "Client"): - clientKey := extractClientSecret(token) - authnRes, err := h.clients.Authenticate(ctx, &grpcClientsV1.AuthnReq{ClientSecret: clientKey}) - if err != nil { - return errors.Wrap(svcerr.ErrAuthentication, err) - } - if !authnRes.Authenticated { - return svcerr.ErrAuthentication - } - clientType = policies.ClientType - clientID = authnRes.GetId() - default: - authnSession, err := h.authn.Authenticate(ctx, extractBearerToken(token)) - if err != nil { - return err - } - clientType = policies.UserType - clientID = authnSession.DomainUserID +func (h *handler) authAccess(ctx context.Context, token, topic string, msgType connections.ConnType) (string, string, error) { + authnReq := &grpcClientsV1.AuthnReq{ + ClientSecret: token, } + if strings.HasPrefix(token, "Client") { + authnReq.ClientSecret = extractClientSecret(token) + } + + authnRes, err := h.clients.Authenticate(ctx, authnReq) + if err != nil { + return "", "", errors.Wrap(svcerr.ErrAuthentication, err) + } + if !authnRes.GetAuthenticated() { + return "", "", svcerr.ErrAuthentication + } + clientType := policies.ClientType + clientID := authnRes.GetId() // Topics are in the format: // channels//messages//.../ct/ if !channelRegExp.MatchString(topic) { - return errMalformedTopic + return "", "", errMalformedTopic } channelParts := channelRegExp.FindStringSubmatch(topic) if len(channelParts) < 1 { - return errMalformedTopic + return "", "", errMalformedTopic } chanID := channelParts[1] @@ -286,13 +244,13 @@ func (h *handler) authAccess(ctx context.Context, token, topic string, msgType c } res, err := h.channels.Authorize(ctx, ar) if err != nil { - return errors.Wrap(svcerr.ErrAuthorization, err) + return "", "", errors.Wrap(svcerr.ErrAuthorization, err) } if !res.GetAuthorized() { - return errors.Wrap(svcerr.ErrAuthorization, err) + return "", "", errors.Wrap(svcerr.ErrAuthorization, err) } - return nil + return clientID, clientType, nil } func parseSubtopic(subtopic string) (string, error) { @@ -325,19 +283,10 @@ func parseSubtopic(subtopic string) (string, error) { } // extractClientSecret returns value of the client secret. If there is no client key - an empty value is returned. -func extractClientSecret(topic string) string { - if !strings.HasPrefix(topic, apiutil.ClientPrefix) { - return "" - } - - return strings.TrimPrefix(topic, apiutil.ClientPrefix) -} - -// extractBearerToken returns value of the bearer token. If there is no bearer token - an empty value is returned. -func extractBearerToken(token string) string { - if !strings.HasPrefix(token, apiutil.BearerPrefix) { +func extractClientSecret(token string) string { + if !strings.HasPrefix(token, apiutil.ClientPrefix) { return "" } - return strings.TrimPrefix(token, apiutil.BearerPrefix) + return strings.TrimPrefix(token, apiutil.ClientPrefix) }