Skip to content

Commit

Permalink
Adds initial controller implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Takeshi Yoneda <t.y.mathetake@gmail.com>
  • Loading branch information
mathetake committed Jan 8, 2025
1 parent cb6b2e0 commit 86ab274
Show file tree
Hide file tree
Showing 17 changed files with 1,457 additions and 50 deletions.
51 changes: 49 additions & 2 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,54 @@
package main

import "github.com/envoyproxy/ai-gateway/internal/version"
import (
"context"
"flag"
"log"
"log/slog"
"os"
"os/signal"
"syscall"

"github.com/go-logr/logr"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"

"github.com/envoyproxy/ai-gateway/internal/controller"
)

// Command-line flags parsed in main.
var (
// logLevel is the textual log level (e.g. "debug", "info"); it is parsed via slog.Level.UnmarshalText in main.
logLevel = flag.String("logLevel", "info", "log level")
// extProcImage is the container image the controller deploys for the external processor.
extProcImage = flag.String("extprocImage",
"ghcr.io/envoyproxy/ai-gateway-extproc:latest", "image for the external processor")
)

func main() {
println(version.Version)
flag.Parse()
var level slog.Level
if err := level.UnmarshalText([]byte(*logLevel)); err != nil {
log.Fatalf("failed to unmarshal log level: %v", err)
}
l := logr.FromSlogHandler(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
Level: level,
}))
klog.SetLogger(l)

k8sConfig, err := ctrl.GetConfig()
if err != nil {
log.Fatalf("failed to get k8s config: %v", err)
}

ctx, cancel := context.WithCancel(context.Background())
signalsChan := make(chan os.Signal, 1)
signal.Notify(signalsChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-signalsChan
cancel()
}()

// TODO: starts the extension server?

if err := controller.StartControllers(ctx, k8sConfig, l, *logLevel, *extProcImage); err != nil {
log.Fatalf("failed to start controller: %v", err)
}
}
33 changes: 21 additions & 12 deletions extprocconfig/extprocconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@ import (
gwapiv1 "sigs.k8s.io/gateway-api/apis/v1"
)

// DefaultConfig is the default configuration for the external processor
// that can be used as a fallback when the configuration is not explicitly provided.
const DefaultConfig = `
inputSchema:
schema: OpenAI
selectedBackendHeaderKey: x-envoy-ai-gateway-selected-backend
modelNameHeaderKey: x-envoy-ai-gateway-llm-model
`

// Config is the configuration for the external processor.
//
// The configuration is loaded from a file path specified via the command line flag -configPath to the external processor.
Expand All @@ -22,8 +31,8 @@ import (
//
// inputSchema:
// schema: OpenAI
// backendRoutingHeaderKey: x-backend-name
// modelNameHeaderKey: x-model-name
// selectedBackendHeaderKey: x-envoy-ai-gateway-selected-backend
// modelNameHeaderKey: x-envoy-ai-gateway-llm-model
// tokenUsageMetadata:
// namespace: ai_gateway_llm_ns
// key: token_usage_key
Expand All @@ -38,24 +47,24 @@ import (
// outputSchema:
// schema: AWSBedrock
// headers:
// - name: x-model-name
// - name: x-envoy-ai-gateway-llm-model
// value: llama3.3333
// - backends:
// - name: openai
// outputSchema:
// schema: OpenAI
// headers:
// - name: x-model-name
// - name: x-envoy-ai-gateway-llm-model
// value: gpt4.4444
//
// where the input of the external processor is in the OpenAI schema, the model name is populated in the header x-model-name,
// The model name header `x-model-name` is used in the header matching to make the routing decision. **After** the routing decision is made,
// the selected backend name is populated in the header `x-backend-name`. For example, when the model name is `llama3.3333`,
// where the input of the external processor is in the OpenAI schema, and the model name is populated in the header x-envoy-ai-gateway-llm-model.
// The model name header `x-envoy-ai-gateway-llm-model` is used in the header matching to make the routing decision. **After** the routing decision is made,
// the selected backend name is populated in the header `x-envoy-ai-gateway-selected-backend`. For example, when the model name is `llama3.3333`,
// the request is routed to either backends `kserve` or `awsbedrock` with weights 1 and 10 respectively, and the selected
// backend, say `awsbedrock`, is populated in the header `x-backend-name`.
// backend, say `awsbedrock`, is populated in the header `x-envoy-ai-gateway-selected-backend`.
//
// From Envoy configuration perspective, configuring the header matching based on `x-backend-name` is enough to route the request to the selected backend.
// That is because the matching decision is made by the external processor and the selected backend is populated in the header `x-backend-name`.
// From Envoy configuration perspective, configuring the header matching based on `x-envoy-ai-gateway-selected-backend` is enough to route the request to the selected backend.
// That is because the matching decision is made by the external processor and the selected backend is populated in the header `x-envoy-ai-gateway-selected-backend`.
type Config struct {
// TokenUsageMetadata is the namespace and key to be used in the filter metadata to store the usage token, optional.
// If this is provided, the external processor will populate the usage token in the filter metadata at the end of the
Expand All @@ -65,9 +74,9 @@ type Config struct {
InputSchema VersionedAPISchema `yaml:"inputSchema"`
// ModelNameHeaderKey is the header key to be populated with the model name by the external processor.
ModelNameHeaderKey string `yaml:"modelNameHeaderKey"`
// BackendRoutingHeaderKey is the header key to be populated with the backend name by the external processor
// SelectedBackendHeaderKey is the header key to be populated with the backend name by the external processor
// **after** the routing decision is made by the external processor using Rules.
BackendRoutingHeaderKey string `yaml:"backendRoutingHeaderKey"`
SelectedBackendHeaderKey string `yaml:"selectedBackendHeaderKey"`
// Rules is the routing rules to be used by the external processor to make the routing decision.
// Inside the routing rules, the header ModelNameHeaderKey may be used to make the routing decision.
Rules []RouteRule `yaml:"rules"`
Expand Down
34 changes: 26 additions & 8 deletions extprocconfig/extprocconfig_test.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,38 @@
package extprocconfig
package extprocconfig_test

import (
"log/slog"
"os"
"path"
"testing"

"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/yaml"

"github.com/envoyproxy/ai-gateway/extprocconfig"
"github.com/envoyproxy/ai-gateway/internal/extproc"
)

// TestDefaultConfig checks that DefaultConfig is valid YAML and that a
// freshly constructed external-processor server accepts it.
func TestDefaultConfig(t *testing.T) {
	srv, err := extproc.NewServer(slog.Default(), extproc.NewProcessor)
	require.NoError(t, err)
	require.NotNil(t, srv)

	cfg := &extprocconfig.Config{}
	require.NoError(t, yaml.Unmarshal([]byte(extprocconfig.DefaultConfig), cfg))
	require.NoError(t, srv.LoadConfig(cfg))
}

func TestUnmarshalConfigYaml(t *testing.T) {
configPath := path.Join(t.TempDir(), "config.yaml")
const config = `
inputSchema:
schema: OpenAI
backendRoutingHeaderKey: x-backend-name
modelNameHeaderKey: x-model-name
selectedBackendHeaderKey: x-envoy-ai-gateway-selected-backend
modelNameHeaderKey: x-envoy-ai-gateway-llm-model
tokenUsageMetadata:
namespace: ai_gateway_llm_ns
key: token_usage_key
Expand All @@ -29,24 +47,24 @@ rules:
outputSchema:
schema: AWSBedrock
headers:
- name: x-model-name
- name: x-envoy-ai-gateway-llm-model
value: llama3.3333
- backends:
- name: openai
outputSchema:
schema: OpenAI
headers:
- name: x-model-name
- name: x-envoy-ai-gateway-llm-model
value: gpt4.4444
`
require.NoError(t, os.WriteFile(configPath, []byte(config), 0o600))
cfg, err := UnmarshalConfigYaml(configPath)
cfg, err := extprocconfig.UnmarshalConfigYaml(configPath)
require.NoError(t, err)
require.Equal(t, "ai_gateway_llm_ns", cfg.TokenUsageMetadata.Namespace)
require.Equal(t, "token_usage_key", cfg.TokenUsageMetadata.Key)
require.Equal(t, "OpenAI", string(cfg.InputSchema.Schema))
require.Equal(t, "x-backend-name", cfg.BackendRoutingHeaderKey)
require.Equal(t, "x-model-name", cfg.ModelNameHeaderKey)
require.Equal(t, "x-envoy-ai-gateway-selected-backend", cfg.SelectedBackendHeaderKey)
require.Equal(t, "x-envoy-ai-gateway-llm-model", cfg.ModelNameHeaderKey)
require.Len(t, cfg.Rules, 2)
require.Equal(t, "llama3.3333", cfg.Rules[0].Headers[0].Value)
require.Equal(t, "gpt4.4444", cfg.Rules[1].Headers[0].Value)
Expand Down
11 changes: 6 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,18 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.28.7
github.com/envoyproxy/gateway v1.2.4
github.com/envoyproxy/go-control-plane/envoy v1.32.2
github.com/go-logr/logr v1.4.2
github.com/google/go-cmp v0.6.0
github.com/openai/openai-go v0.1.0-alpha.43
github.com/stretchr/testify v1.10.0
golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e
google.golang.org/grpc v1.69.2
google.golang.org/protobuf v1.36.1
k8s.io/api v0.31.2
k8s.io/apiextensions-apiserver v0.31.2
k8s.io/apimachinery v0.32.0
k8s.io/client-go v0.31.2
k8s.io/klog/v2 v2.130.1
k8s.io/utils v0.0.0-20241104163129-6fe5fd82f078
sigs.k8s.io/controller-runtime v0.19.3
sigs.k8s.io/gateway-api v1.2.1
Expand Down Expand Up @@ -44,7 +49,6 @@ require (
github.com/evanphx/json-patch v5.9.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.9.0 // indirect
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/zapr v1.3.0 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/jsonreference v0.21.0 // indirect
Expand Down Expand Up @@ -86,12 +90,9 @@ require (
golang.org/x/time v0.7.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.31.2 // indirect
k8s.io/apiextensions-apiserver v0.31.2 // indirect
k8s.io/client-go v0.31.2 // indirect
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect
sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.3 // indirect
Expand Down
103 changes: 103 additions & 0 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package controller

import (
"context"
"fmt"

egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1"
"github.com/go-logr/logr"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
gwapiv1 "sigs.k8s.io/gateway-api/apis/v1"
gwapiv1b1 "sigs.k8s.io/gateway-api/apis/v1beta1"

aigv1a1 "github.com/envoyproxy/ai-gateway/api/v1alpha1"
)

// scheme holds every API type the controller manager must (de)serialize.
var scheme = runtime.NewScheme()

// init registers the core client-go types, this project's v1alpha1 types,
// CRD types, Envoy Gateway types, and both Gateway API versions into scheme.
func init() {
	for _, register := range []func(*runtime.Scheme) error{
		clientgoscheme.AddToScheme,
		aigv1a1.AddToScheme,
		apiextensionsv1.AddToScheme,
		egv1a1.AddToScheme,
		gwapiv1.Install,
		gwapiv1b1.Install,
	} {
		utilruntime.Must(register(scheme))
	}
}

// newClients creates, from one rest.Config, both a controller-runtime client
// (using the package-level scheme) and a plain Kubernetes clientset.
func newClients(config *rest.Config) (client.Client, kubernetes.Interface, error) {
	c, err := client.New(config, client.Options{Scheme: scheme})
	if err != nil {
		return nil, nil, fmt.Errorf("failed to create new client: %w", err)
	}

	cs, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to create kubernetes client: %w", err)
	}
	return c, cs, nil
}

// StartControllers starts the controllers for the AI Gateway.
// This blocks until the manager is stopped.
//
// BUG FIX: the previous version blocked on <-mgr.Elected() *before* calling
// mgr.Start. controller-runtime only closes the Elected channel after the
// manager has started and won leader election, so that ordering deadlocks
// forever. Start now runs in a goroutine and we select on election vs. exit.
func StartControllers(ctx context.Context, config *rest.Config, logger logr.Logger, logLevel string, extProcImage string) error {
	mgr, err := ctrl.NewManager(config, ctrl.Options{
		Scheme:           scheme,
		LeaderElection:   true,
		LeaderElectionID: "envoy-ai-gateway-controller",
	})
	if err != nil {
		return fmt.Errorf("failed to create new controller manager: %w", err)
	}

	clientForRouteC, kubeForRouteC, err := newClients(config)
	if err != nil {
		return fmt.Errorf("failed to create new clients: %w", err)
	}

	// Buffered so reconcilers are not blocked by a momentarily busy sink.
	sinkChan := make(chan configSinkEvent, 100)
	routeC := newLLMRouteController(clientForRouteC, kubeForRouteC, logger, logLevel, extProcImage, sinkChan)
	if err = ctrl.NewControllerManagedBy(mgr).
		For(&aigv1a1.LLMRoute{}).
		Complete(routeC); err != nil {
		return fmt.Errorf("failed to create controller for LLMRoute: %w", err)
	}

	clientForBackendC, kubeForBackendC, err := newClients(config)
	if err != nil {
		return fmt.Errorf("failed to create new clients: %w", err)
	}

	backendC := newLLMBackendController(clientForBackendC, kubeForBackendC, logger, sinkChan)
	if err = ctrl.NewControllerManagedBy(mgr).
		For(&aigv1a1.LLMBackend{}).
		Complete(backendC); err != nil {
		return fmt.Errorf("failed to create controller for LLMBackend: %w", err)
	}

	clientForConfigSink, kubeForConfigSink, err := newClients(config)
	if err != nil {
		return fmt.Errorf("failed to create new clients: %w", err)
	}

	sink := newConfigSink(clientForConfigSink, kubeForConfigSink, logger, sinkChan)

	// Start the manager concurrently; it blocks until the manager is stopped.
	startErr := make(chan error, 1)
	go func() { startErr <- mgr.Start(ctx) }()

	// Wait until this replica becomes the leader — or until the manager exits
	// first (e.g. the context was cancelled before election completed).
	select {
	case err = <-startErr:
		if err != nil {
			return fmt.Errorf("failed to start controller manager: %w", err)
		}
		return nil
	case <-mgr.Elected():
	}

	// Once leader, initialize the config sink to sync all LLMBackend and
	// LLMRoute objects in the cluster.
	if err = sink.init(ctx); err != nil {
		return fmt.Errorf("failed to initialize config sink: %w", err)
	}

	// Block until the manager is stopped.
	if err = <-startErr; err != nil {
		return fmt.Errorf("failed to start controller manager: %w", err)
	}
	return nil
}
49 changes: 49 additions & 0 deletions internal/controller/llmbackend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package controller

import (
"context"

"github.com/go-logr/logr"
"k8s.io/client-go/kubernetes"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

aigv1a1 "github.com/envoyproxy/ai-gateway/api/v1alpha1"
)

// llmBackendController implements [reconcile.TypedReconciler] for [aigv1a1.LLMBackend].
//
// This handles the LLMBackend resource and sends it to the config sink so that it can modify the configuration together with the state of other resources.
type llmBackendController struct {
// client reads LLMBackend objects from the API server during Reconcile.
client client.Client
// kube is the plain Kubernetes clientset; not referenced by the methods
// visible in this file — presumably used elsewhere, TODO confirm.
kube kubernetes.Interface
// logger is the controller-scoped structured logger.
logger logr.Logger
// eventChan delivers updated/deleted LLMBackend events to the config sink.
eventChan chan configSinkEvent
}

// newLLMBackendController builds the reconciler that watches LLMBackend
// resources and forwards them to the config sink via ch.
func newLLMBackendController(client client.Client, kube kubernetes.Interface, logger logr.Logger, ch chan configSinkEvent) *llmBackendController {
	c := &llmBackendController{eventChan: ch}
	c.client, c.kube, c.logger = client, kube, logger
	return c
}

// Reconcile implements the [reconcile.TypedReconciler] for [aigv1a1.LLMBackend].
//
// A not-found Get is treated as a deletion and reported to the config sink;
// any other Get error is returned for retry. Otherwise a deep copy of the
// current object is forwarded to the sink.
func (l *llmBackendController) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
	var llmBackend aigv1a1.LLMBackend
	if err := l.client.Get(ctx, req.NamespacedName, &llmBackend); err != nil {
		if client.IgnoreNotFound(err) == nil {
			l.eventChan <- configSinkEventLLMBackendDeleted{namespace: req.Namespace, name: req.Name}
			// Use the injected logger instead of the global ctrl.Log so this
			// controller's configured sink/level are honored consistently.
			l.logger.Info("Deleting LLMBackend",
				"namespace", req.Namespace, "name", req.Name)
			return ctrl.Result{}, nil
		}
		return ctrl.Result{}, err
	}
	// Send the LLMBackend to the config sink so that it can modify the configuration together with the state of other resources.
	l.eventChan <- llmBackend.DeepCopy()
	return ctrl.Result{}, nil
}
Loading

0 comments on commit 86ab274

Please sign in to comment.