Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>
  • Loading branch information
nyagamunene committed Dec 20, 2024
1 parent b439ca0 commit 6c13917
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 102 deletions.
1 change: 1 addition & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ linters:
- noctx
- cyclop
- tagalign
- recvcheck

linters-settings:
gocritic:
Expand Down
47 changes: 35 additions & 12 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,38 @@ import (
"os"

"github.com/absmach/propeller/proxy"
"github.com/absmach/propeller/proxy/config"
"github.com/caarlos0/env/v11"
"golang.org/x/sync/errgroup"
)

const (
svcName = "proxy"
logLevel = "info"
)
const svcName = "proxy"

type config struct {
LogLevel string `env:"PROXY_LOG_LEVEL" envDefault:"info"`

BrokerURL string `env:"PROXY_MQTT_ADDRESS" envDefault:"tcp://localhost:1883"`
PropletKey string `env:"PROXY_PROPLET_KEY,notEmpty"`
PropletID string `env:"PROXY_PROPLET_ID,notEmpty" `
ChannelID string `env:"PROXY_CHANNEL_ID,notEmpty"`

ChunkSize int `env:"PROXY_CHUNK_SIZE" envDefault:"512000"`
Authenticate bool `env:"PROXY_AUTHENTICATE" envDefault:"false"`
Token string `env:"PROXY_REGISTRY_TOKEN" envDefault:""`
Username string `env:"PROXY_REGISTRY_USERNAME" envDefault:""`
Password string `env:"PROXY_REGISTRY_PASSWORD" envDefault:""`
RegistryURL string `env:"PROXY_REGISTRY_URL,notEmpty"`
}

func main() {
g, ctx := errgroup.WithContext(context.Background())

cfg := config{}
if err := env.Parse(&cfg); err != nil {
log.Fatalf("failed to load configuration : %s", err.Error())
}

var level slog.Level
if err := level.UnmarshalText([]byte(logLevel)); err != nil {
if err := level.UnmarshalText([]byte(cfg.LogLevel)); err != nil {
log.Fatalf("failed to parse log level: %s", err.Error())
}
logHandler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Expand All @@ -31,14 +48,20 @@ func main() {
logger := slog.New(logHandler)
slog.SetDefault(logger)

mqttCfg := config.MQTTProxyConfig{}
if err := env.Parse(&mqttCfg); err != nil {
logger.Error("failed to load mqtt config", slog.Any("error", err))
mqttCfg := proxy.MQTTProxyConfig{
BrokerURL: cfg.BrokerURL,
Password: cfg.PropletKey,
PropletID: cfg.PropletID,
ChannelID: cfg.ChannelID,
}

httpCfg := config.HTTPProxyConfig{}
if err := env.Parse(&httpCfg); err != nil {
logger.Error("failed to load http config", slog.Any("error", err))
httpCfg := proxy.HTTPProxyConfig{
ChunkSize: cfg.ChunkSize,
Authenticate: cfg.Authenticate,
Token: cfg.Token,
Username: cfg.Username,
Password: cfg.Password,
RegistryURL: cfg.RegistryURL,
}

logger.Info("successfully initialized MQTT and HTTP config")
Expand Down
18 changes: 11 additions & 7 deletions proplet/requests.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package proplet

import "errors"
import (
"errors"

"github.com/absmach/propeller/task"
)

type startRequest struct {
ID string
FunctionName string
WasmFile []byte
WasmFileDownloadPath string
Params []uint64
ID string
FunctionName string
WasmFile []byte
imageURL task.URLValue
Params []uint64
}

func (r startRequest) Validate() error {
Expand All @@ -17,7 +21,7 @@ func (r startRequest) Validate() error {
if r.FunctionName == "" {
return errors.New("function name is required")
}
if r.WasmFile == nil && r.WasmFileDownloadPath == "" {
if r.WasmFile == nil && r.imageURL == (task.URLValue{}) {
return errors.New("either wasm file or wasm file download path is required")
}

Expand Down
25 changes: 13 additions & 12 deletions proplet/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,11 @@ func (p *PropletService) handleStartCommand(ctx context.Context) func(topic stri
}

req := startRequest{
ID: payload.ID,
FunctionName: payload.Name,
WasmFile: payload.File,
WasmFileDownloadPath: payload.DownloadFile,
Params: payload.Inputs,
ID: payload.ID,
FunctionName: payload.Name,
WasmFile: payload.File,
imageURL: payload.ImageURL,
Params: payload.Inputs,
}
if err := req.Validate(); err != nil {
return err
Expand All @@ -159,27 +159,28 @@ func (p *PropletService) handleStartCommand(ctx context.Context) func(topic stri
}

pl := map[string]interface{}{
"app_name": req.WasmFileDownloadPath,
"app_name": req.imageURL,
}
tp := fmt.Sprintf(fetchRequestTopicTemplate, p.channelID)
if err := p.pubsub.Publish(ctx, tp, pl); err != nil {
return err
}

go func() {
p.logger.Info("Waiting for chunks", slog.String("app_name", req.WasmFileDownloadPath))
p.logger.Info("Waiting for chunks", slog.String("app_name", req.imageURL.String()))

for {
p.chunksMutex.Lock()
metadata, exists := p.chunkMetadata[req.WasmFileDownloadPath]
receivedChunks := len(p.chunks[req.WasmFileDownloadPath])
urlStr := req.imageURL.String()
metadata, exists := p.chunkMetadata[urlStr]
receivedChunks := len(p.chunks[urlStr])
p.chunksMutex.Unlock()

if exists && receivedChunks == metadata.TotalChunks {
p.logger.Info("All chunks received, deploying app", slog.String("app_name", req.WasmFileDownloadPath))
wasmBinary := assembleChunks(p.chunks[req.WasmFileDownloadPath])
p.logger.Info("All chunks received, deploying app", slog.String("app_name", urlStr))
wasmBinary := assembleChunks(p.chunks[urlStr])
if err := p.runtime.StartApp(ctx, wasmBinary, req.ID, req.FunctionName, req.Params...); err != nil {
p.logger.Error("Failed to start app", slog.String("app_name", req.WasmFileDownloadPath), slog.Any("error", err))
p.logger.Error("Failed to start app", slog.String("app_name", urlStr), slog.Any("error", err))
}

break
Expand Down
8 changes: 0 additions & 8 deletions proxy/config/mqtt.go

This file was deleted.

32 changes: 17 additions & 15 deletions proxy/config/http.go → proxy/http.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package config
package proxy

import (
"context"
Expand All @@ -9,6 +9,7 @@ import (
"log"

"github.com/absmach/propeller/proplet"
"github.com/absmach/propeller/task"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras-go/v2/registry/remote"
"oras.land/oras-go/v2/registry/remote/auth"
Expand All @@ -21,12 +22,12 @@ const (
)

type HTTPProxyConfig struct {
ChunkSize int `env:"PROXY_CHUNK_SIZE" envDefault:"512000"`
Authenticate bool `env:"PROXY_AUTHENTICATE" envDefault:"false"`
Token string `env:"PROXY_REGISTRY_TOKEN" envDefault:""`
Username string `env:"PROXY_REGISTRY_USERNAME" envDefault:""`
Password string `env:"PROXY_REGISTRY_PASSWORD" envDefault:""`
RegistryURL string `env:"PROXY_REGISTRY_URL,notEmpty"`
ChunkSize int
Authenticate bool
Token string
Username string
Password string
RegistryURL string
}

func (c *HTTPProxyConfig) setupAuthentication(repo *remote.Repository) {
Expand Down Expand Up @@ -123,36 +124,37 @@ func createChunks(data []byte, containerPath string, chunkSize int) []proplet.Ch
return chunks
}

func (c *HTTPProxyConfig) FetchFromReg(ctx context.Context, containerPath string, chunkSize int) ([]proplet.ChunkPayload, error) {
repo, err := remote.NewRepository(containerPath)
func (c *HTTPProxyConfig) FetchFromReg(ctx context.Context, containerPath task.URLValue, chunkSize int) ([]proplet.ChunkPayload, error) {
reference := containerPath.String()
repo, err := remote.NewRepository(reference)
if err != nil {
return nil, fmt.Errorf("failed to create repository for %s: %w", containerPath, err)
return nil, fmt.Errorf("failed to create repository for %s: %w", reference, err)
}

c.setupAuthentication(repo)

manifest, err := c.fetchManifest(ctx, repo, containerPath)
manifest, err := c.fetchManifest(ctx, repo, reference)
if err != nil {
return nil, err
}

largestLayer, err := findLargestLayer(manifest)
if err != nil {
return nil, fmt.Errorf("failed to find layer for %s: %w", containerPath, err)
return nil, fmt.Errorf("failed to find layer for %s: %w", reference, err)
}

log.Printf("Container size: %d bytes (%.2f MB)", largestLayer.Size, float64(largestLayer.Size)/size)

layerReader, err := repo.Fetch(ctx, largestLayer)
if err != nil {
return nil, fmt.Errorf("failed to fetch layer for %s: %w", containerPath, err)
return nil, fmt.Errorf("failed to fetch layer for %s: %w", reference, err)
}
defer layerReader.Close()

data, err := io.ReadAll(layerReader)
if err != nil {
return nil, fmt.Errorf("failed to read layer for %s: %w", containerPath, err)
return nil, fmt.Errorf("failed to read layer for %s: %w", reference, err)
}

return createChunks(data, containerPath, chunkSize), nil
return createChunks(data, reference, chunkSize), nil
}
23 changes: 15 additions & 8 deletions proxy/mqtt/mqtt.go → proxy/mqtt.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package mqtt
package proxy

import (
"context"
Expand All @@ -8,10 +8,17 @@ import (
"time"

"github.com/absmach/propeller/proplet"
"github.com/absmach/propeller/proxy/config"
"github.com/absmach/propeller/task"
mqtt "github.com/eclipse/paho.mqtt.golang"
)

type MQTTProxyConfig struct {
BrokerURL string
Password string
PropletID string
ChannelID string
}

const (
connTimeout = 10
reconnTimeout = 1
Expand All @@ -22,10 +29,10 @@ const (

type RegistryClient struct {
client mqtt.Client
config *config.MQTTProxyConfig
config *MQTTProxyConfig
}

func NewMQTTClient(cfg *config.MQTTProxyConfig) (*RegistryClient, error) {
func NewMQTTClient(cfg *MQTTProxyConfig) (*RegistryClient, error) {
opts := mqtt.NewClientOptions().
AddBroker(cfg.BrokerURL).
SetClientID("Proplet-" + cfg.PropletID).
Expand Down Expand Up @@ -71,14 +78,14 @@ func (c *RegistryClient) Connect(ctx context.Context) error {
return nil
}

func (c *RegistryClient) Subscribe(ctx context.Context, containerChan chan<- string) error {
func (c *RegistryClient) Subscribe(ctx context.Context, containerChan chan<- task.URLValue) error {
handler := func(client mqtt.Client, msg mqtt.Message) {
data := msg.Payload()

payLoad := struct {
Appname string `json:"app_name"`
Appname task.URLValue `json:"app_name"`
}{
Appname: "",
Appname: task.URLValue{},
}

err := json.Unmarshal(data, &payLoad)
Expand All @@ -90,7 +97,7 @@ func (c *RegistryClient) Subscribe(ctx context.Context, containerChan chan<- str

select {
case containerChan <- payLoad.Appname:
log.Printf("Received container request: %s", payLoad.Appname)
log.Printf("Received container request: %s", payLoad.Appname.String())
case <-ctx.Done():

return
Expand Down
Loading

0 comments on commit 6c13917

Please sign in to comment.