Skip to content

Commit

Permalink
refactor: seperate proplet and manager as 2 seperate binaries
Browse files Browse the repository at this point in the history
Signed-off-by: Rodney Osodo <socials@rodneyosodo.com>
  • Loading branch information
rodneyosodo committed Dec 17, 2024
1 parent deb6325 commit 789e3d0
Show file tree
Hide file tree
Showing 12 changed files with 236 additions and 513 deletions.
1 change: 1 addition & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ linters:
- dupl
- err113
- noctx
- cyclop

linters-settings:
gocritic:
Expand Down
19 changes: 12 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,29 @@ TIME=$(shell date -u '+%Y-%m-%dT%H:%M:%SZ')
VERSION ?= $(shell git describe --abbrev=0 --tags 2>/dev/null || echo 'v0.0.0')
COMMIT ?= $(shell git rev-parse HEAD)
EXAMPLES = addition long-addition
SERVICES = manager proplet cli

define compile_service
CGO_ENABLED=$(CGO_ENABLED) GOOS=$(GOOS) GOARCH=$(GOARCH) \
go build -ldflags "-s -w \
-X 'github.com/absmach/magistrala.BuildTime=$(TIME)' \
-X 'github.com/absmach/magistrala.Version=$(VERSION)' \
-X 'github.com/absmach/magistrala.Commit=$(COMMIT)'" \
-o ${BUILD_DIR}/propellerd cmd/propellerd/main.go
-o ${BUILD_DIR}/$(1) cmd/$(1)/main.go
endef

.PHONY: build
build:
$(call compile_service)
$(SERVICES):
$(call compile_service,$(@))

install:
cp ${BUILD_DIR}/propellerd $(GOBIN)/propellerd

all: build
for file in $(BUILD_DIR)/*; do \
if [[ ! "$$file" =~ \.wasm$$ ]]; then \
cp "$$file" $(GOBIN)/propeller-`basename "$$file"`; \
fi \
done

.PHONY: all $(SERVICES)
all: $(SERVICES)

clean:
rm -rf build
Expand Down
10 changes: 3 additions & 7 deletions cmd/propellerd/main.go → cmd/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (

func main() {
rootCmd := &cobra.Command{
Use: "propellerd",
Short: "Propeller Daemon",
Long: `Propeller Daemon is a daemon that manages the lifecycle of Propeller components.`,
Use: "propeller-cli",
Short: "Propeller CLI",
Long: `Propeller CLI is a command line interface for interacting with Propeller components.`,
PersistentPreRun: func(_ *cobra.Command, _ []string) {
sdkConf := sdk.Config{
ManagerURL: propellerd.DefManagerURL,
Expand All @@ -23,13 +23,9 @@ func main() {
},
}

managerCmd := propellerd.NewManagerCmd()
tasksCmd := propellerd.NewTasksCmd()
propletCmd := propellerd.NewPropletCmd()

rootCmd.AddCommand(managerCmd)
rootCmd.AddCommand(tasksCmd)
rootCmd.AddCommand(propletCmd)

if err := rootCmd.Execute(); err != nil {
log.Fatal(err)
Expand Down
74 changes: 51 additions & 23 deletions cmd/manager/start.go → cmd/manager/main.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package manager
package main

import (
"context"
"fmt"
"log"
"log/slog"
"net/url"
"os"
Expand All @@ -18,33 +19,49 @@ import (
"github.com/absmach/propeller/pkg/mqtt"
"github.com/absmach/propeller/pkg/scheduler"
"github.com/absmach/propeller/pkg/storage"
"github.com/caarlos0/env/v11"
"github.com/google/uuid"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"golang.org/x/sync/errgroup"
)

const svcName = "manager"

type Config struct {
LogLevel string
InstanceID string
MQTTAddress string
MQTTQoS uint8
MQTTTimeout time.Duration
ChannelID string
ThingID string
ThingKey string
const (
svcName = "manager"
defHTTPPort = "7070"
envPrefixHTTP = "MANAGER_HTTP_"
)

type config struct {
LogLevel string `env:"MANAGER_LOG_LEVEL" envDefault:"info"`
InstanceID string `env:"MANAGER_INSTANCE_ID"`
MQTTAddress string `env:"MANAGER_MQTT_ADDRESS" envDefault:"tcp://localhost:1883"`
MQTTQoS uint8 `env:"MANAGER_MQTT_QOS" envDefault:"2"`
MQTTTimeout time.Duration `env:"MANAGER_MQTT_TIMEOUT" envDefault:"30s"`
ChannelID string `env:"MANAGER_CHANNEL_ID,notEmpty"`
ThingID string `env:"MANAGER_THING_ID,notEmpty"`
ThingKey string `env:"MANAGER_THING_KEY,notEmpty"`
Server server.Config
OTELURL url.URL
TraceRatio float64
OTELURL url.URL `env:"MANAGER_OTEL_URL"`
TraceRatio float64 `env:"MANAGER_TRACE_RATIO" envDefault:"0"`
}

func StartManager(ctx context.Context, cancel context.CancelFunc, cfg Config) error {
func main() {
ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)

cfg := config{}
if err := env.Parse(&cfg); err != nil {
log.Fatalf("failed to load configuration : %s", err.Error())
}

if cfg.InstanceID == "" {
cfg.InstanceID = uuid.NewString()
}

var level slog.Level
if err := level.UnmarshalText([]byte(cfg.LogLevel)); err != nil {
return fmt.Errorf("failed to parse log level: %s", err.Error())
log.Fatalf("failed to parse log level: %s", err.Error())
}
logHandler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: level,
Expand All @@ -59,11 +76,13 @@ func StartManager(ctx context.Context, cancel context.CancelFunc, cfg Config) er
default:
sdktp, err := jaeger.NewProvider(ctx, svcName, cfg.OTELURL, "", cfg.TraceRatio)
if err != nil {
return fmt.Errorf("failed to initialize opentelemetry: %s", err.Error())
logger.Error("failed to initialize opentelemetry", slog.String("error", err.Error()))

return
}
defer func() {
if err := sdktp.Shutdown(ctx); err != nil {
slog.Error("error shutting down tracer provider", slog.Any("error", err))
logger.Error("error shutting down tracer provider", slog.Any("error", err))
}
}()
tp = sdktp
Expand All @@ -72,7 +91,9 @@ func StartManager(ctx context.Context, cancel context.CancelFunc, cfg Config) er

mqttPubSub, err := mqtt.NewPubSub(cfg.MQTTAddress, cfg.MQTTQoS, svcName, cfg.ThingID, cfg.ThingKey, cfg.ChannelID, cfg.MQTTTimeout, logger)
if err != nil {
return fmt.Errorf("failed to initialize mqtt pubsub: %s", err.Error())
logger.Error("failed to initialize mqtt pubsub", slog.String("error", err.Error()))

return
}

svc := manager.NewService(
Expand All @@ -90,10 +111,19 @@ func StartManager(ctx context.Context, cancel context.CancelFunc, cfg Config) er
svc = middleware.Metrics(counter, latency, svc)

if err := svc.Subscribe(ctx); err != nil {
return fmt.Errorf("failed to subscribe to manager channel: %s", err.Error())
logger.Error("failed to subscribe to manager channel", slog.String("error", err.Error()))

return
}

httpServerConfig := server.Config{Port: defHTTPPort}
if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err.Error()))

return
}

hs := httpserver.NewServer(ctx, cancel, svcName, cfg.Server, api.MakeHandler(svc, logger, cfg.InstanceID), logger)
hs := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svc, logger, cfg.InstanceID), logger)

g.Go(func() error {
return hs.Start()
Expand All @@ -106,6 +136,4 @@ func StartManager(ctx context.Context, cancel context.CancelFunc, cfg Config) er
if err := g.Wait(); err != nil {
logger.Error(fmt.Sprintf("%s service exited with error: %s", svcName, err))
}

return nil
}
122 changes: 122 additions & 0 deletions cmd/proplet/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package main

import (
"context"
"fmt"
"log"
"log/slog"
"net/http"
"os"
"time"

"github.com/absmach/magistrala/pkg/server"
"github.com/absmach/propeller/pkg/mqtt"
"github.com/absmach/propeller/proplet"
"github.com/caarlos0/env/v11"
"github.com/google/uuid"
"golang.org/x/sync/errgroup"
)

const svcName = "proplet"

type config struct {
LogLevel string `env:"PROPLET_LOG_LEVEL" envDefault:"info"`
InstanceID string `env:"PROPLET_INSTANCE_ID"`
MQTTAddress string `env:"PROPLET_MQTT_ADDRESS" envDefault:"tcp://localhost:1883"`
MQTTTimeout time.Duration `env:"PROPLET_MQTT_TIMEOUT" envDefault:"30s"`
MQTTQoS byte `env:"PROPLET_MQTT_QOS" envDefault:"2"`
LivelinessInterval time.Duration `env:"PROPLET_LIVELINESS_INTERVAL" envDefault:"10s"`
RegistryURL string `env:"PROPLET_REGISTRY_URL"`
RegistryToken string `env:"PROPLET_REGISTRY_TOKEN"`
RegistryTimeout time.Duration `env:"PROPLET_REGISTRY_TIMEOUT" envDefault:"30s"`
ChannelID string `env:"PROPLET_CHANNEL_ID,notEmpty"`
ThingID string `env:"PROPLET_THING_ID,notEmpty"`
ThingKey string `env:"PROPLET_THING_KEY,notEmpty"`
}

func main() {
ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)

cfg := config{}
if err := env.Parse(&cfg); err != nil {
log.Fatalf("failed to load configuration : %s", err.Error())
}

if cfg.InstanceID == "" {
cfg.InstanceID = uuid.NewString()
}

var level slog.Level
if err := level.UnmarshalText([]byte(cfg.LogLevel)); err != nil {
log.Fatalf("failed to parse log level: %s", err.Error())
}
logHandler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: level,
})
logger := slog.New(logHandler)
slog.SetDefault(logger)

if cfg.RegistryURL != "" {
if err := checkRegistryConnectivity(ctx, cfg.RegistryURL, cfg.RegistryTimeout); err != nil {
logger.Error("failed to connect to registry URL", slog.String("url", cfg.RegistryURL), slog.Any("error", err))

return
}

logger.Info("successfully connected to registry URL", slog.String("url", cfg.RegistryURL))
}

mqttPubSub, err := mqtt.NewPubSub(cfg.MQTTAddress, cfg.MQTTQoS, cfg.InstanceID, cfg.ThingID, cfg.ThingKey, cfg.ChannelID, cfg.MQTTTimeout, logger)
if err != nil {
logger.Error("failed to initialize mqtt client", slog.Any("error", err))

return
}
wazero := proplet.NewWazeroRuntime(logger, mqttPubSub, cfg.ChannelID)

service, err := proplet.NewService(ctx, cfg.ChannelID, cfg.ThingID, cfg.ThingKey, cfg.RegistryURL, cfg.RegistryToken, cfg.LivelinessInterval, mqttPubSub, logger, wazero)
if err != nil {
logger.Error("failed to initialize service", slog.Any("error", err))

return
}

if err := service.Run(ctx, logger); err != nil {
logger.Error("failed to run service", slog.Any("error", err))

return
}

g.Go(func() error {
return server.StopSignalHandler(ctx, cancel, logger, svcName)
})

if err := g.Wait(); err != nil {
logger.Error(fmt.Sprintf("%s service exited with error: %s", svcName, err))
}
}

func checkRegistryConnectivity(ctx context.Context, registryURL string, registryTimeout time.Duration) error {
ctx, cancel := context.WithTimeout(ctx, registryTimeout)
defer cancel()

client := http.DefaultClient

req, err := http.NewRequestWithContext(ctx, http.MethodGet, registryURL, http.NoBody)
if err != nil {
return fmt.Errorf("failed to create HTTP request: %w", err)
}

resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("failed to connect to registry URL: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("fegistry returned unexpected status: %d", resp.StatusCode)
}

return nil
}
Loading

0 comments on commit 789e3d0

Please sign in to comment.