Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

extproc: refactors flags and adds test #247

Merged
merged 4 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@ func parseAndValidateFlags(args []string) (
extensionServerPort string,
err error,
) {
// Create a new FlagSet, rather than using the global flag.CommandLine.
fs := flag.NewFlagSet("my-service-flags", flag.ContinueOnError)
fs := flag.NewFlagSet("AI Gateway Controller", flag.ContinueOnError)

// Define flags within this FlagSet.
extProcLogLevelPtr := fs.String(
"extProcLogLevel",
"info",
Expand All @@ -58,13 +56,11 @@ func parseAndValidateFlags(args []string) (
"gRPC port for the extension server",
)

// Parse the passed-in arguments.
if err = fs.Parse(args); err != nil {
err = fmt.Errorf("failed to parse flags: %w", err)
return
}

// Validate log levels by trying to unmarshal them.
var slogLevel slog.Level
if err = slogLevel.UnmarshalText([]byte(*extProcLogLevelPtr)); err != nil {
err = fmt.Errorf("invalid external processor log level: %q", *extProcLogLevelPtr)
Expand Down
3 changes: 1 addition & 2 deletions cmd/controller/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ import (

func Test_parseAndValidateFlags(t *testing.T) {
t.Run("no flags", func(t *testing.T) {
args := []string{}
extProcLogLevel, extProcImage, enableLeaderElection, logLevel, extensionServerPort, err := parseAndValidateFlags(args)
extProcLogLevel, extProcImage, enableLeaderElection, logLevel, extensionServerPort, err := parseAndValidateFlags([]string{})
require.Equal(t, "info", extProcLogLevel)
require.Equal(t, "ghcr.io/envoyproxy/ai-gateway/extproc:latest", extProcImage)
require.True(t, enableLeaderElection)
Expand Down
76 changes: 48 additions & 28 deletions cmd/extproc/mainlib/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mainlib
import (
"context"
"flag"
"fmt"
"log"
"log/slog"
"net"
Expand All @@ -20,38 +21,60 @@ import (
"github.com/envoyproxy/ai-gateway/internal/version"
)

const (
defaultAddress = ":1063"
defaultLogLevel = "info"
)
// parseAndValidateFlags parses and validates the flags passed to the external processor.
func parseAndValidateFlags(args []string) (configPath, addr string, logLevel slog.Level, err error) {
fs := flag.NewFlagSet("AI Gateway External Processor", flag.ContinueOnError)
configPathPtr := fs.String(
"configPath",
"",
"path to the configuration file. The file must be in YAML format specified in filterapi.Config type. "+
"The configuration file is watched for changes.",
)
extProcAddrPtr := fs.String(
"extProcAddr",
":1063",
"gRPC address for the external processor. For example, :1063 or unix:///tmp/ext_proc.sock",
)
logLevelPtr := fs.String(
"logLevel",
"info", "log level for the external processor. One of 'debug', 'info', 'warn', or 'error'.",
)

var (
configPath = flag.String("configPath", "", "path to the configuration file. "+
"The file must be in YAML format specified in filterconfig.Config type. The configuration file is watched for changes.")
extProcAddr = flag.String("extProcAddr", defaultAddress, "gRPC address for the external processor")
logLevel = flag.String("logLevel", defaultLogLevel, "log level")
)
if err = fs.Parse(args); err != nil {
err = fmt.Errorf("failed to parse flags: %w", err)
return
}

if *configPathPtr == "" {
err = fmt.Errorf("configPath must be provided")
return
}

if err = logLevel.UnmarshalText([]byte(*logLevelPtr)); err != nil {
err = fmt.Errorf("failed to unmarshal log level: %w", err)
return
}

configPath = *configPathPtr
addr = *extProcAddrPtr
return
}

// Main is a main function for the external processor exposed
// for allowing users to build their own external processor.
func Main() {
flag.Parse()

var level slog.Level
if err := level.UnmarshalText([]byte(*logLevel)); err != nil {
log.Fatalf("failed to unmarshal log level: %v", err)
configPath, extProcAddr, level, err := parseAndValidateFlags(os.Args[1:])
if err != nil {
log.Fatalf("failed to parse and validate flags: %v", err)
}
l := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
Level: level,
}))

l := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: level}))

l.Info("starting external processor",
slog.String("version", version.Version),
slog.String("address", *extProcAddr))

if *configPath == "" {
log.Fatal("configPath must be provided")
}
slog.String("address", extProcAddr),
slog.String("configPath", configPath),
)

ctx, cancel := context.WithCancel(context.Background())
signalsChan := make(chan os.Signal, 1)
Expand All @@ -61,7 +84,7 @@ func Main() {
cancel()
}()

lis, err := net.Listen(listenAddress(*extProcAddr))
lis, err := net.Listen(listenAddress(extProcAddr))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
Expand All @@ -71,7 +94,7 @@ func Main() {
log.Fatalf("failed to create external processor server: %v", err)
}

if err := extproc.StartConfigWatcher(ctx, *configPath, server, l, time.Second*5); err != nil {
if err := extproc.StartConfigWatcher(ctx, configPath, server, l, time.Second*5); err != nil {
log.Fatalf("failed to start config watcher: %v", err)
}

Expand All @@ -87,9 +110,6 @@ func Main() {

// listenAddress returns the network and address for the given address flag.
func listenAddress(addrFlag string) (string, string) {
if addrFlag == "" {
return "tcp", defaultAddress
}
if strings.HasPrefix(addrFlag, "unix://") {
return "unix", strings.TrimPrefix(addrFlag, "unix://")
}
Expand Down
88 changes: 87 additions & 1 deletion cmd/extproc/mainlib/main_test.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,104 @@
package mainlib

import (
"log/slog"
"testing"

"github.com/stretchr/testify/assert"
)

func Test_parseAndValidateFlags(t *testing.T) {
t.Run("ok flags", func(t *testing.T) {
for _, tc := range []struct {
name string
args []string
configPath string
addr string
logLevel slog.Level
}{
{
name: "minimal flags",
args: []string{"-configPath", "/path/to/config.yaml"},
configPath: "/path/to/config.yaml",
addr: ":1063",
logLevel: slog.LevelInfo,
},
{
name: "custom addr",
args: []string{"-configPath", "/path/to/config.yaml", "-extProcAddr", "unix:///tmp/ext_proc.sock"},
configPath: "/path/to/config.yaml",
addr: "unix:///tmp/ext_proc.sock",
logLevel: slog.LevelInfo,
},
{
name: "log level debug",
args: []string{"-configPath", "/path/to/config.yaml", "-logLevel", "debug"},
configPath: "/path/to/config.yaml",
addr: ":1063",
logLevel: slog.LevelDebug,
},
{
name: "log level warn",
args: []string{"-configPath", "/path/to/config.yaml", "-logLevel", "warn"},
configPath: "/path/to/config.yaml",
addr: ":1063",
logLevel: slog.LevelWarn,
},
{
name: "log level error",
args: []string{"-configPath", "/path/to/config.yaml", "-logLevel", "error"},
configPath: "/path/to/config.yaml",
addr: ":1063",
logLevel: slog.LevelError,
},
{
name: "all flags",
args: []string{"-configPath", "/path/to/config.yaml", "-extProcAddr", "unix:///tmp/ext_proc.sock", "-logLevel", "debug"},
configPath: "/path/to/config.yaml",
addr: "unix:///tmp/ext_proc.sock",
logLevel: slog.LevelDebug,
},
} {
t.Run(tc.name, func(t *testing.T) {
configPath, addr, logLevel, err := parseAndValidateFlags(tc.args)
assert.Equal(t, tc.configPath, configPath)
assert.Equal(t, tc.addr, addr)
assert.Equal(t, tc.logLevel, logLevel)
assert.NoError(t, err)
})
}
})
t.Run("invalid flags", func(t *testing.T) {
for _, tc := range []struct {
name string
flags []string
expErr string
}{
{
name: "missing configPath",
flags: []string{"-extProcAddr", ":1063"},
expErr: "configPath must be provided",
},
{
name: "invalid logLevel",
flags: []string{"-configPath", "/path/to/config.yaml", "-logLevel", "invalid"},
expErr: `failed to unmarshal log level: slog: level string "invalid": unknown name`,
},
} {
t.Run(tc.name, func(t *testing.T) {
_, _, _, err := parseAndValidateFlags(tc.flags)
assert.EqualError(t, err, tc.expErr)
})
}
})
}

func TestListenAddress(t *testing.T) {
tests := []struct {
addr string
wantNetwork string
wantAddress string
}{
{"", "tcp", defaultAddress},
{":8080", "tcp", ":8080"},
{"unix:///var/run/ai-gateway/extproc.sock", "unix", "/var/run/ai-gateway/extproc.sock"},
}
Expand Down
2 changes: 1 addition & 1 deletion examples/extproc_custom_router/README.md
Original file line number Diff line number Diff line change
@@ -1 +1 @@
This example shows how to insert a custom router in the custom external process using `filterconfig` package.
This example shows how to insert a custom router in the custom external process using `filterapi` package.