From 6c13917d1b32b348d022fb39fc74426562b92dfc Mon Sep 17 00:00:00 2001 From: nyagamunene Date: Fri, 20 Dec 2024 13:42:14 +0300 Subject: [PATCH] address comments Signed-off-by: nyagamunene --- .golangci.yaml | 1 + cmd/proxy/main.go | 47 ++++++++++++++++++++++++++++---------- proplet/requests.go | 18 +++++++++------ proplet/service.go | 25 ++++++++++---------- proxy/config/mqtt.go | 8 ------- proxy/{config => }/http.go | 32 ++++++++++++++------------ proxy/{mqtt => }/mqtt.go | 23 ++++++++++++------- proxy/service.go | 46 ++++++++++++++++--------------------- task/task.go | 26 ++++++++++----------- task/url.go | 45 ++++++++++++++++++++++++++++++++++++ 10 files changed, 169 insertions(+), 102 deletions(-) delete mode 100644 proxy/config/mqtt.go rename proxy/{config => }/http.go (77%) rename proxy/{mqtt => }/mqtt.go (87%) create mode 100644 task/url.go diff --git a/.golangci.yaml b/.golangci.yaml index 6bf3b58..25c0799 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -34,6 +34,7 @@ linters: - noctx - cyclop - tagalign + - recvcheck linters-settings: gocritic: diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index 7c5c90d..40ac1ce 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -8,21 +8,38 @@ import ( "os" "github.com/absmach/propeller/proxy" - "github.com/absmach/propeller/proxy/config" "github.com/caarlos0/env/v11" "golang.org/x/sync/errgroup" ) -const ( - svcName = "proxy" - logLevel = "info" -) +const svcName = "proxy" + +type config struct { + LogLevel string `env:"PROXY_LOG_LEVEL" envDefault:"info"` + + BrokerURL string `env:"PROXY_MQTT_ADDRESS" envDefault:"tcp://localhost:1883"` + PropletKey string `env:"PROXY_PROPLET_KEY,notEmpty"` + PropletID string `env:"PROXY_PROPLET_ID,notEmpty" ` + ChannelID string `env:"PROXY_CHANNEL_ID,notEmpty"` + + ChunkSize int `env:"PROXY_CHUNK_SIZE" envDefault:"512000"` + Authenticate bool `env:"PROXY_AUTHENTICATE" envDefault:"false"` + Token string `env:"PROXY_REGISTRY_TOKEN" envDefault:""` + Username string `env:"PROXY_REGISTRY_USERNAME" envDefault:""` + Password string `env:"PROXY_REGISTRY_PASSWORD" envDefault:""` + RegistryURL string `env:"PROXY_REGISTRY_URL,notEmpty"` +} func main() { g, ctx := errgroup.WithContext(context.Background()) + cfg := config{} + if err := env.Parse(&cfg); err != nil { + log.Fatalf("failed to load configuration : %s", err.Error()) + } + var level slog.Level - if err := level.UnmarshalText([]byte(logLevel)); err != nil { + if err := level.UnmarshalText([]byte(cfg.LogLevel)); err != nil { log.Fatalf("failed to parse log level: %s", err.Error()) } logHandler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ @@ -31,14 +48,20 @@ func main() { logger := slog.New(logHandler) slog.SetDefault(logger) - mqttCfg := config.MQTTProxyConfig{} - if err := env.Parse(&mqttCfg); err != nil { - logger.Error("failed to load mqtt config", slog.Any("error", err)) + mqttCfg := proxy.MQTTProxyConfig{ + BrokerURL: cfg.BrokerURL, + Password: cfg.PropletKey, + PropletID: cfg.PropletID, + ChannelID: cfg.ChannelID, } - httpCfg := config.HTTPProxyConfig{} - if err := env.Parse(&httpCfg); err != nil { - logger.Error("failed to load http config", slog.Any("error", err)) + httpCfg := proxy.HTTPProxyConfig{ + ChunkSize: cfg.ChunkSize, + Authenticate: cfg.Authenticate, + Token: cfg.Token, + Username: cfg.Username, + Password: cfg.Password, + RegistryURL: cfg.RegistryURL, } logger.Info("successfully initialized MQTT and HTTP config") diff --git a/proplet/requests.go b/proplet/requests.go index 8d42405..12d5687 100644 --- a/proplet/requests.go +++ b/proplet/requests.go @@ -1,13 +1,17 @@ package proplet -import "errors" +import ( + "errors" + + "github.com/absmach/propeller/task" +) type startRequest struct { - ID string - FunctionName string - WasmFile []byte - WasmFileDownloadPath string - Params []uint64 + ID string + FunctionName string + WasmFile []byte + imageURL task.URLValue + Params []uint64 } func (r startRequest) Validate() error { @@ -17,7 +21,7 @@ func (r startRequest) Validate() error { if r.FunctionName == "" { return errors.New("function name is required") } - if r.WasmFile == nil && r.WasmFileDownloadPath == "" { + if r.WasmFile == nil && r.imageURL == (task.URLValue{}) { return errors.New("either wasm file or wasm file download path is required") } diff --git a/proplet/service.go b/proplet/service.go index f5428c3..77b0bfe 100644 --- a/proplet/service.go +++ b/proplet/service.go @@ -138,11 +138,11 @@ func (p *PropletService) handleStartCommand(ctx context.Context) func(topic stri } req := startRequest{ - ID: payload.ID, - FunctionName: payload.Name, - WasmFile: payload.File, - WasmFileDownloadPath: payload.DownloadFile, - Params: payload.Inputs, + ID: payload.ID, + FunctionName: payload.Name, + WasmFile: payload.File, + imageURL: payload.ImageURL, + Params: payload.Inputs, } if err := req.Validate(); err != nil { return err @@ -159,7 +159,7 @@ func (p *PropletService) handleStartCommand(ctx context.Context) func(topic stri } pl := map[string]interface{}{ - "app_name": req.WasmFileDownloadPath, + "app_name": req.imageURL, } tp := fmt.Sprintf(fetchRequestTopicTemplate, p.channelID) if err := p.pubsub.Publish(ctx, tp, pl); err != nil { @@ -167,19 +167,20 @@ func (p *PropletService) handleStartCommand(ctx context.Context) func(topic stri } go func() { - p.logger.Info("Waiting for chunks", slog.String("app_name", req.WasmFileDownloadPath)) + p.logger.Info("Waiting for chunks", slog.String("app_name", req.imageURL.String())) for { p.chunksMutex.Lock() - metadata, exists := p.chunkMetadata[req.WasmFileDownloadPath] - receivedChunks := len(p.chunks[req.WasmFileDownloadPath]) + urlStr := req.imageURL.String() + metadata, exists := p.chunkMetadata[urlStr] + receivedChunks := len(p.chunks[urlStr]) p.chunksMutex.Unlock() if exists && receivedChunks == metadata.TotalChunks { - p.logger.Info("All chunks received, deploying app", slog.String("app_name", req.WasmFileDownloadPath)) - wasmBinary := assembleChunks(p.chunks[req.WasmFileDownloadPath]) + p.logger.Info("All chunks received, deploying app", slog.String("app_name", urlStr)) + wasmBinary := assembleChunks(p.chunks[urlStr]) if err := p.runtime.StartApp(ctx, wasmBinary, req.ID, req.FunctionName, req.Params...); err != nil { - p.logger.Error("Failed to start app", slog.String("app_name", req.WasmFileDownloadPath), slog.Any("error", err)) + p.logger.Error("Failed to start app", slog.String("app_name", urlStr), slog.Any("error", err)) } break diff --git a/proxy/config/mqtt.go b/proxy/config/mqtt.go deleted file mode 100644 index a1f8be2..0000000 --- a/proxy/config/mqtt.go +++ /dev/null @@ -1,8 +0,0 @@ -package config - -type MQTTProxyConfig struct { - BrokerURL string `env:"PROXY_MQTT_ADDRESS" envDefault:"tcp://localhost:1883"` - Password string `env:"PROXY_PROPLET_KEY,notEmpty"` - PropletID string `env:"PROXY_PROPLET_ID,notEmpty" ` - ChannelID string `env:"PROXY_CHANNEL_ID,notEmpty"` -} diff --git a/proxy/config/http.go b/proxy/http.go similarity index 77% rename from proxy/config/http.go rename to proxy/http.go index 6b7336c..db69c4e 100644 --- a/proxy/config/http.go +++ b/proxy/http.go @@ -1,4 +1,4 @@ -package config +package proxy import ( "context" @@ -9,6 +9,7 @@ import ( "log" "github.com/absmach/propeller/proplet" + "github.com/absmach/propeller/task" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "oras.land/oras-go/v2/registry/remote" "oras.land/oras-go/v2/registry/remote/auth" @@ -21,12 +22,12 @@ const ( ) type HTTPProxyConfig struct { - ChunkSize int `env:"PROXY_CHUNK_SIZE" envDefault:"512000"` - Authenticate bool `env:"PROXY_AUTHENTICATE" envDefault:"false"` - Token string `env:"PROXY_REGISTRY_TOKEN" envDefault:""` - Username string `env:"PROXY_REGISTRY_USERNAME" envDefault:""` - Password string `env:"PROXY_REGISTRY_PASSWORD" envDefault:""` - RegistryURL string `env:"PROXY_REGISTRY_URL,notEmpty"` + ChunkSize int + Authenticate bool + Token string + Username string + Password string + RegistryURL string } func (c *HTTPProxyConfig) setupAuthentication(repo *remote.Repository) { @@ -123,36 +124,37 @@ func createChunks(data []byte, containerPath string, chunkSize int) []proplet.Ch return chunks } -func (c *HTTPProxyConfig) FetchFromReg(ctx context.Context, containerPath string, chunkSize int) ([]proplet.ChunkPayload, error) { - repo, err := remote.NewRepository(containerPath) +func (c *HTTPProxyConfig) FetchFromReg(ctx context.Context, containerPath task.URLValue, chunkSize int) ([]proplet.ChunkPayload, error) { + reference := containerPath.String() + repo, err := remote.NewRepository(reference) if err != nil { - return nil, fmt.Errorf("failed to create repository for %s: %w", containerPath, err) + return nil, fmt.Errorf("failed to create repository for %s: %w", reference, err) } c.setupAuthentication(repo) - manifest, err := c.fetchManifest(ctx, repo, containerPath) + manifest, err := c.fetchManifest(ctx, repo, reference) if err != nil { return nil, err } largestLayer, err := findLargestLayer(manifest) if err != nil { - return nil, fmt.Errorf("failed to find layer for %s: %w", containerPath, err) + return nil, fmt.Errorf("failed to find layer for %s: %w", reference, err) } log.Printf("Container size: %d bytes (%.2f MB)", largestLayer.Size, float64(largestLayer.Size)/size) layerReader, err := repo.Fetch(ctx, largestLayer) if err != nil { - return nil, fmt.Errorf("failed to fetch layer for %s: %w", containerPath, err) + return nil, fmt.Errorf("failed to fetch layer for %s: %w", reference, err) } defer layerReader.Close() data, err := io.ReadAll(layerReader) if err != nil { - return nil, fmt.Errorf("failed to read layer for %s: %w", containerPath, err) + return nil, fmt.Errorf("failed to read layer for %s: %w", reference, err) } - return createChunks(data, containerPath, chunkSize), nil + return createChunks(data, reference, chunkSize), nil } diff --git a/proxy/mqtt/mqtt.go b/proxy/mqtt.go similarity index 87% rename from proxy/mqtt/mqtt.go rename to proxy/mqtt.go index e1afb0e..2bd9279 100644 --- a/proxy/mqtt/mqtt.go +++ b/proxy/mqtt.go @@ -1,4 +1,4 @@ -package mqtt +package proxy import ( "context" @@ -8,10 +8,17 @@ import ( "time" "github.com/absmach/propeller/proplet" - "github.com/absmach/propeller/proxy/config" + "github.com/absmach/propeller/task" mqtt "github.com/eclipse/paho.mqtt.golang" ) +type MQTTProxyConfig struct { + BrokerURL string + Password string + PropletID string + ChannelID string +} + const ( connTimeout = 10 reconnTimeout = 1 @@ -22,10 +29,10 @@ const ( type RegistryClient struct { client mqtt.Client - config *config.MQTTProxyConfig + config *MQTTProxyConfig } -func NewMQTTClient(cfg *config.MQTTProxyConfig) (*RegistryClient, error) { +func NewMQTTClient(cfg *MQTTProxyConfig) (*RegistryClient, error) { opts := mqtt.NewClientOptions(). AddBroker(cfg.BrokerURL). SetClientID("Proplet-" + cfg.PropletID). @@ -71,14 +78,14 @@ func (c *RegistryClient) Connect(ctx context.Context) error { return nil } -func (c *RegistryClient) Subscribe(ctx context.Context, containerChan chan<- string) error { +func (c *RegistryClient) Subscribe(ctx context.Context, containerChan chan<- task.URLValue) error { handler := func(client mqtt.Client, msg mqtt.Message) { data := msg.Payload() payLoad := struct { - Appname string `json:"app_name"` + Appname task.URLValue `json:"app_name"` }{ - Appname: "", + Appname: task.URLValue{}, } err := json.Unmarshal(data, &payLoad) @@ -90,7 +97,7 @@ func (c *RegistryClient) Subscribe(ctx context.Context, containerChan chan<- str select { case containerChan <- payLoad.Appname: - log.Printf("Received container request: %s", payLoad.Appname) + log.Printf("Received container request: %s", payLoad.Appname.String()) case <-ctx.Done(): return diff --git a/proxy/service.go b/proxy/service.go index 74d5e47..439e79a 100644 --- a/proxy/service.go +++ b/proxy/service.go @@ -6,42 +6,39 @@ import ( "log/slog" "github.com/absmach/propeller/proplet" - "github.com/absmach/propeller/proxy/config" - "github.com/absmach/propeller/proxy/mqtt" + "github.com/absmach/propeller/task" ) const chunkBuffer = 10 type ProxyService struct { - orasconfig *config.HTTPProxyConfig - mqttClient *mqtt.RegistryClient + orasconfig *HTTPProxyConfig + mqttClient *RegistryClient logger *slog.Logger - containerChan chan string + containerChan chan task.URLValue dataChan chan proplet.ChunkPayload } -func NewService(ctx context.Context, mqttCfg *config.MQTTProxyConfig, httpCfg *config.HTTPProxyConfig, logger *slog.Logger) (*ProxyService, error) { - mqttClient, err := mqtt.NewMQTTClient(mqttCfg) +func NewService(ctx context.Context, mqttCfg *MQTTProxyConfig, httpCfg *HTTPProxyConfig, logger *slog.Logger) (*ProxyService, error) { + mqttClient, err := NewMQTTClient(mqttCfg) if err != nil { return nil, fmt.Errorf("failed to initialize MQTT client: %w", err) } - logger.Info("successfully initialized MQTT client") - return &ProxyService{ orasconfig: httpCfg, mqttClient: mqttClient, logger: logger, - containerChan: make(chan string, 1), + containerChan: make(chan task.URLValue, 1), dataChan: make(chan proplet.ChunkPayload, chunkBuffer), }, nil } -func (s *ProxyService) MQTTClient() *mqtt.RegistryClient { +func (s *ProxyService) MQTTClient() *RegistryClient { return s.mqttClient } -func (s *ProxyService) ContainerChan() chan string { +func (s *ProxyService) ContainerChan() chan task.URLValue { return s.containerChan } @@ -53,7 +50,9 @@ func (s *ProxyService) StreamHTTP(ctx context.Context) error { case containerName := <-s.containerChan: chunks, err := s.orasconfig.FetchFromReg(ctx, containerName, s.orasconfig.ChunkSize) if err != nil { - s.logger.Error("failed to fetch container", "container", containerName, "error", err) + s.logger.Error("failed to fetch container", + slog.Any("container name", containerName), + slog.Any("error", err)) continue } @@ -63,9 +62,9 @@ func (s *ProxyService) StreamHTTP(ctx context.Context) error { select { case s.dataChan <- chunk: s.logger.Info("sent container chunk to MQTT stream", - "container", containerName, - "chunk", chunk.ChunkIdx, - "total", chunk.TotalChunks) + slog.Any("container", containerName), + slog.Int("chunk", chunk.ChunkIdx), + slog.Int("total", chunk.TotalChunks)) case <-ctx.Done(): return ctx.Err() } @@ -84,24 +83,19 @@ func (s *ProxyService) StreamMQTT(ctx context.Context) error { case chunk := <-s.dataChan: if err := s.mqttClient.PublishContainer(ctx, chunk); err != nil { s.logger.Error("failed to publish container chunk", - "error", err, - "chunk", chunk.ChunkIdx, - "total", chunk.TotalChunks) + slog.Any("error", err), + slog.Int("chunk", chunk.ChunkIdx), + slog.Int("total", chunk.TotalChunks)) continue } - s.logger.Info("published container chunk", - "chunk_name", chunk.AppName, - "chunk_no", chunk.ChunkIdx, - "total", chunk.TotalChunks) - containerChunks[chunk.AppName]++ if containerChunks[chunk.AppName] == chunk.TotalChunks { s.logger.Info("successfully sent all chunks", - "container", chunk.AppName, - "total_chunks", chunk.TotalChunks) + slog.String("container", chunk.AppName), + slog.Int("total_chunks", chunk.TotalChunks)) delete(containerChunks, chunk.AppName) } } diff --git a/task/task.go b/task/task.go index c018fef..daf0595 100644 --- a/task/task.go +++ b/task/task.go @@ -1,8 +1,6 @@ package task -import ( - "time" -) +import "time" type State uint8 @@ -32,17 +30,17 @@ func (s State) String() string { } type Task struct { - ID string `json:"id"` - Name string `json:"name"` - State State `json:"state"` - DownloadFile string `json:"download_file,omitempty"` - File []byte `json:"file,omitempty"` - Inputs []uint64 `json:"inputs,omitempty"` - Results []uint64 `json:"results,omitempty"` - StartTime time.Time `json:"start_time"` - FinishTime time.Time `json:"finish_time"` - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` + ID string `json:"id"` + Name string `json:"name"` + State State `json:"state"` + ImageURL URLValue `json:"image_url,omitempty"` + File []byte `json:"file,omitempty"` + Inputs []uint64 `json:"inputs,omitempty"` + Results []uint64 `json:"results,omitempty"` + StartTime time.Time `json:"start_time"` + FinishTime time.Time `json:"finish_time"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` } type TaskPage struct { diff --git a/task/url.go b/task/url.go new file mode 100644 index 0000000..2f70d13 --- /dev/null +++ b/task/url.go @@ -0,0 +1,45 @@ +package task + +import ( + "encoding/json" + "net/url" +) + +type URLValue url.URL + +func (u URLValue) MarshalJSON() ([]byte, error) { + return json.Marshal((*url.URL)(&u).String()) +} + +func (u *URLValue) UnmarshalJSON(data []byte) error { + var urlStr string + if err := json.Unmarshal(data, &urlStr); err != nil { + return err + } + + if urlStr == "" { + return nil + } + + parsedURL, err := url.Parse(urlStr) + if err != nil { + return err + } + + *u = URLValue(*parsedURL) + + return nil +} + +func (u *URLValue) URL() *url.URL { + if u == nil { + return nil + } + val := url.URL(*u) + + return &val +} + +func (u URLValue) String() string { + return (*url.URL)(&u).String() +}