Skip to content

Commit

Permalink
Consolidate chunk handling
Browse files Browse the repository at this point in the history
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
  • Loading branch information
JeffMboya committed Dec 15, 2024
1 parent dd4354d commit 4473324
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 26 deletions.
6 changes: 3 additions & 3 deletions proplet/proplet.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ var _ Service = (*PropletService)(nil)
func (p *PropletService) Run(ctx context.Context, logger *slog.Logger) error {
if err := p.mqttService.SubscribeToManagerTopics(ctx,
func(topic string, msg map[string]interface{}) error {
return p.handleStartCommand(ctx, topic, msg, logger)
return p.handleStartCmd(ctx, topic, msg, logger)
},
func(topic string, msg map[string]interface{}) error {
return p.handleStopCommand(ctx, topic, msg, logger)
return p.handleStopCmd(ctx, topic, msg, logger)
},
func(topic string, msg map[string]interface{}) error {
return p.registryUpdate(ctx, topic, msg, logger)
Expand All @@ -75,7 +75,7 @@ func (p *PropletService) Run(ctx context.Context, logger *slog.Logger) error {
}

if err := p.mqttService.SubscribeToRegistryTopic(ctx, func(topic string, msg map[string]interface{}) error {
return p.handleChunk(ctx, topic, msg)
return p.handleAppChunks(ctx, topic, msg)
}); err != nil {
return fmt.Errorf("failed to subscribe to registry topic: %w", err)
}
Expand Down
37 changes: 14 additions & 23 deletions proplet/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func NewWazeroRuntime(ctx context.Context) *WazeroRuntime {
}
}

func (p *PropletService) handleStartCommand(ctx context.Context, _ string, msg map[string]interface{}, logger *slog.Logger) error {
func (p *PropletService) handleStartCmd(ctx context.Context, _ string, msg map[string]interface{}, logger *slog.Logger) error {
rpcReq, err := parseRPCRequest(msg)
if err != nil {
return err
Expand Down Expand Up @@ -108,7 +108,7 @@ func (p *PropletService) handleStartCommand(ctx context.Context, _ string, msg m
return nil
}

func (p *PropletService) handleStopCommand(ctx context.Context, _ string, msg map[string]interface{}, logger *slog.Logger) error {
func (p *PropletService) handleStopCmd(ctx context.Context, _ string, msg map[string]interface{}, logger *slog.Logger) error {
rpcReq, err := parseRPCRequest(msg)
if err != nil {
return err
Expand All @@ -131,7 +131,7 @@ func (p *PropletService) handleStopCommand(ctx context.Context, _ string, msg ma
return nil
}

func (p *PropletService) handleChunk(ctx context.Context, _ string, msg map[string]interface{}) error {
func (p *PropletService) handleAppChunks(_ context.Context, _ string, msg map[string]interface{}) error {
var chunk ChunkPayload
data, err := json.Marshal(msg)
if err != nil {
Expand Down Expand Up @@ -159,23 +159,23 @@ func (p *PropletService) handleChunk(ctx context.Context, _ string, msg map[stri

if len(p.chunks[chunk.AppName]) == p.chunkMetadata[chunk.AppName].TotalChunks {
log.Printf("All chunks received for app '%s'. Deploying...\n", chunk.AppName)
go p.deployAndRunApp(ctx, chunk.AppName)
var wasmBinary []byte
for _, chunk := range p.chunks[chunk.AppName] {
wasmBinary = append(wasmBinary, chunk...)
}
p.wasmBinary = wasmBinary
delete(p.chunks, chunk.AppName)

log.Printf("Binary for app '%s' assembled successfully. Ready to deploy.\n", chunk.AppName)
}

return nil
}

func (p *PropletService) deployAndRunApp(ctx context.Context, appName string) {
log.Printf("Assembling chunks for app '%s'\n", appName)

p.chunksMutex.Lock()
chunks := p.chunks[appName]
delete(p.chunks, appName)
p.chunksMutex.Unlock()

wasmBinary := assembleChunks(chunks)
func (p *PropletService) deployApp(ctx context.Context, appName string) {

Check failure on line 175 in proplet/service.go

View workflow job for this annotation

GitHub Actions / Lint and Build

func `(*PropletService).deployApp` is unused (unused)
log.Printf("Deploying app '%s'\n", appName)

function, err := p.runtime.StartApp(ctx, appName, wasmBinary, "main")
function, err := p.runtime.StartApp(ctx, appName, p.wasmBinary, "main")
if err != nil {
log.Printf("Failed to start app '%s': %v\n", appName, err)

Expand All @@ -192,15 +192,6 @@ func (p *PropletService) deployAndRunApp(ctx context.Context, appName string) {
log.Printf("App '%s' started successfully\n", appName)
}

func assembleChunks(chunks [][]byte) []byte {
var wasmBinary []byte
for _, chunk := range chunks {
wasmBinary = append(wasmBinary, chunk...)
}

return wasmBinary
}

func (c *ChunkPayload) Validate() error {
if c.AppName == "" {
return errors.New("chunk validation: app_name is required but missing")
Expand Down

0 comments on commit 4473324

Please sign in to comment.