Adds initial controller implementation #71

Merged: 5 commits merged on Jan 9, 2025
Changes from 2 commits
51 changes: 49 additions & 2 deletions cmd/controller/main.go
@@ -1,7 +1,54 @@
package main

import "github.com/envoyproxy/ai-gateway/internal/version"
import (
"context"
"flag"
"log"
"log/slog"
"os"
"os/signal"
"syscall"

"github.com/go-logr/logr"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"

"github.com/envoyproxy/ai-gateway/internal/controller"
)

var (
logLevel = flag.String("logLevel", "info", "log level")
extProcImage = flag.String("extprocImage",
"ghcr.io/envoyproxy/ai-gateway-extproc:latest", "image for the external processor")
)

func main() {
println(version.Version)
flag.Parse()
var level slog.Level
if err := level.UnmarshalText([]byte(*logLevel)); err != nil {
log.Fatalf("failed to unmarshal log level: %v", err)
}
l := logr.FromSlogHandler(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
Level: level,
}))
klog.SetLogger(l)

k8sConfig, err := ctrl.GetConfig()
if err != nil {
log.Fatalf("failed to get k8s config: %v", err)
}

ctx, cancel := context.WithCancel(context.Background())
signalsChan := make(chan os.Signal, 1)
signal.Notify(signalsChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-signalsChan
cancel()
}()

// TODO: starts the extension server?

if err := controller.StartControllers(ctx, k8sConfig, l, *logLevel, *extProcImage); err != nil {
log.Fatalf("failed to start controller: %v", err)
}
}
33 changes: 21 additions & 12 deletions extprocconfig/extprocconfig.go
@@ -14,6 +14,15 @@ import (
gwapiv1 "sigs.k8s.io/gateway-api/apis/v1"
)

// DefaultConfig is the default configuration for the external processor
// that can be used as a fallback when the configuration is not explicitly provided.
const DefaultConfig = `
inputSchema:
schema: OpenAI
selectedBackendHeaderKey: x-envoy-ai-gateway-selected-backend
modelNameHeaderKey: x-envoy-ai-gateway-llm-model
`

// Config is the configuration for the external processor.
//
// The configuration is loaded from a file path specified via the command line flag -configPath to the external processor.
@@ -22,8 +31,8 @@ import (
//
// inputSchema:
// schema: OpenAI
// backendRoutingHeaderKey: x-backend-name
// modelNameHeaderKey: x-model-name
// selectedBackendHeaderKey: x-envoy-ai-gateway-selected-backend
Contributor:
Not sure if we want to call it "selected-backend"; the backend may not necessarily be selected by the gateway. The user can still specify the backend in the request.

Member Author:
Well, the backend will be "calculated" and "selected" following the matching rules of the LLMRoute, so the word "selected" makes sense regardless of who actually does the selecting.

Member Author:
This is for internal use only and not user-facing, so it should be fine.

Contributor:
This is not true; you might not be able to select the backend based on the model alone. For example, there are cases where the same Anthropic models are supported by both Google and AWS. In that case the user needs to set the backend header to determine where to route to.

See https://aws.amazon.com/bedrock/claude/ and https://cloud.google.com/solutions/anthropic?hl=en.

Member Author:
Wait a second, this has nothing to do with the model...

Member Author:
Say you have:

apiVersion: aigateway.envoyproxy.io/v1alpha1
kind: LLMRoute
metadata:
  name: some-route
  namespace: default
spec:
  inputSchema:
    schema: OpenAI
  rules:
    - matches:
      - headers:
        - type: Exact
          name: some-random-header-that-can-be-sent-directly-by-clients
          value: foo
      backendRefs:
        - name: some-backend-name
......

and clients send curl -H "some-random-header-that-can-be-sent-directly-by-clients: foo"; then internally the extproc sets the header $selectedBackendHeaderKey: some-backend-name. That's what this does, and it is entirely an implementation detail. This package is not a CRD but the configuration of the extproc itself.
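
To make the flow in this thread concrete, here is a minimal sketch of that selection step. The types and the function below are simplified placeholders for illustration, not the actual extproc code in this PR:

// Simplified placeholder types; the real definitions live in the extprocconfig package.
type headerMatch struct{ Name, Value string }

type routeRule struct {
	Headers  []headerMatch
	Backends []string // backend names; the real type also carries weight and output schema
}

// selectBackend returns the first backend whose header matchers are all present
// in the request headers with the expected values.
func selectBackend(rules []routeRule, reqHeaders map[string]string) (string, bool) {
	for _, rule := range rules {
		matched := true
		for _, h := range rule.Headers {
			if reqHeaders[h.Name] != h.Value {
				matched = false
				break
			}
		}
		if matched && len(rule.Backends) > 0 {
			// The real extproc would also honor per-backend weights here.
			return rule.Backends[0], true
		}
	}
	return "", false
}

The caller would then set the header named by selectedBackendHeaderKey to the returned backend name, so Envoy's header matching routes the request to that backend.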

Contributor:
So you are saying we are not providing a standard AI gateway backend routing header to users, and they MUST define the matching rules on the LLMRoute for each backend they create. Is that correct?

Member Author (@mathetake, Jan 8, 2025):
Yes, exactly, for now. Though we could provide some "canonical-backend-choosing-header-key" defined in the api/v1alpha1 package and prioritize that header value when present, which would effectively ignore LLMRoute.Rules whenever the header exists. That is another topic we should discuss in a separate issue/PR if you want to support it. This configuration key is a complete implementation detail regardless of whether or not we do that.

Member Author:
I will raise an issue tomorrow!

Contributor:
That is exactly what I was thinking of for those rules for the user: if this routing header exists, then we ignore the rules on the LLMRoute. I think this can provide a better user experience.
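
A rough sketch of the override being proposed in this thread (hypothetical: the header name, the constant, and the helper are placeholders that reuse the simplified types from the earlier sketch; nothing below is defined in this PR):

// Hypothetical canonical override header; this name is a placeholder only.
const backendOverrideHeader = "x-ai-gateway-backend-override"

// resolveBackend honors an explicit client-provided backend header when present,
// and otherwise falls back to the LLMRoute-derived rules.
func resolveBackend(rules []routeRule, reqHeaders map[string]string) (string, bool) {
	if b, ok := reqHeaders[backendOverrideHeader]; ok && b != "" {
		return b, true
	}
	return selectBackend(rules, reqHeaders)
}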

// modelNameHeaderKey: x-envoy-ai-gateway-llm-model
// tokenUsageMetadata:
// namespace: ai_gateway_llm_ns
// key: token_usage_key
@@ -38,24 +47,24 @@ import (
// outputSchema:
// schema: AWSBedrock
// headers:
// - name: x-model-name
// - name: x-envoy-ai-gateway-llm-model
// value: llama3.3333
// - backends:
// - name: openai
// outputSchema:
// schema: OpenAI
// headers:
// - name: x-model-name
// - name: x-envoy-ai-gateway-llm-model
// value: gpt4.4444
//
// where the input of the external processor is in the OpenAI schema, the model name is populated in the header x-model-name,
// The model name header `x-model-name` is used in the header matching to make the routing decision. **After** the routing decision is made,
// the selected backend name is populated in the header `x-backend-name`. For example, when the model name is `llama3.3333`,
// where the input of the external processor is in the OpenAI schema, the model name is populated in the header x-envoy-ai-gateway-llm-model,
Contributor:
Since the user can configure the model routing header name, we could say "the configured modelNameHeaderKey" here.

Member Author (@mathetake, Jan 8, 2025):
Well, no, users do not choose the model routing header name at this point:

	LLMModelHeaderKey = "x-envoy-ai-gateway-llm-model"

It would be good to make this part of the LLMRoute resource, but the comment here matches what it is now.

// The model name header `x-envoy-ai-gateway-llm-model` is used in the header matching to make the routing decision. **After** the routing decision is made,
// the selected backend name is populated in the header `x-envoy-ai-gateway-selected-backend`. For example, when the model name is `llama3.3333`,
// the request is routed to either backends `kserve` or `awsbedrock` with weights 1 and 10 respectively, and the selected
// backend, say `awsbedrock`, is populated in the header `x-backend-name`.
// backend, say `awsbedrock`, is populated in the header `x-envoy-ai-gateway-selected-backend`.
//
// From Envoy configuration perspective, configuring the header matching based on `x-backend-name` is enough to route the request to the selected backend.
// That is because the matching decision is made by the external processor and the selected backend is populated in the header `x-backend-name`.
// From Envoy configuration perspective, configuring the header matching based on `x-envoy-ai-gateway-selected-backend` is enough to route the request to the selected backend.
// That is because the matching decision is made by the external processor and the selected backend is populated in the header `x-envoy-ai-gateway-selected-backend`.
type Config struct {
// TokenUsageMetadata is the namespace and key to be used in the filter metadata to store the usage token, optional.
// If this is provided, the external processor will populate the usage token in the filter metadata at the end of the
@@ -65,9 +74,9 @@ type Config struct {
InputSchema VersionedAPISchema `yaml:"inputSchema"`
// ModelNameHeaderKey is the header key to be populated with the model name by the external processor.
ModelNameHeaderKey string `yaml:"modelNameHeaderKey"`
// BackendRoutingHeaderKey is the header key to be populated with the backend name by the external processor
// SelectedBackendHeaderKey is the header key to be populated with the backend name by the external processor
// **after** the routing decision is made by the external processor using Rules.
BackendRoutingHeaderKey string `yaml:"backendRoutingHeaderKey"`
SelectedBackendHeaderKey string `yaml:"selectedBackendHeaderKey"`
// Rules is the routing rules to be used by the external processor to make the routing decision.
// Inside the routing rules, the header ModelNameHeaderKey may be used to make the routing decision.
Rules []RouteRule `yaml:"rules"`
34 changes: 26 additions & 8 deletions extprocconfig/extprocconfig_test.go
@@ -1,20 +1,38 @@
package extprocconfig
package extprocconfig_test

import (
"log/slog"
"os"
"path"
"testing"

"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/yaml"

"github.com/envoyproxy/ai-gateway/extprocconfig"
"github.com/envoyproxy/ai-gateway/internal/extproc"
)

func TestDefaultConfig(t *testing.T) {
server, err := extproc.NewServer(slog.Default(), extproc.NewProcessor)
require.NoError(t, err)
require.NotNil(t, server)

var cfg extprocconfig.Config
err = yaml.Unmarshal([]byte(extprocconfig.DefaultConfig), &cfg)
require.NoError(t, err)

err = server.LoadConfig(&cfg)
require.NoError(t, err)
}

func TestUnmarshalConfigYaml(t *testing.T) {
configPath := path.Join(t.TempDir(), "config.yaml")
const config = `
inputSchema:
schema: OpenAI
backendRoutingHeaderKey: x-backend-name
modelNameHeaderKey: x-model-name
selectedBackendHeaderKey: x-envoy-ai-gateway-selected-backend
modelNameHeaderKey: x-envoy-ai-gateway-llm-model
tokenUsageMetadata:
namespace: ai_gateway_llm_ns
key: token_usage_key
@@ -29,24 +47,24 @@ rules:
outputSchema:
schema: AWSBedrock
headers:
- name: x-model-name
- name: x-envoy-ai-gateway-llm-model
value: llama3.3333
- backends:
- name: openai
outputSchema:
schema: OpenAI
headers:
- name: x-model-name
- name: x-envoy-ai-gateway-llm-model
value: gpt4.4444
`
require.NoError(t, os.WriteFile(configPath, []byte(config), 0o600))
cfg, err := UnmarshalConfigYaml(configPath)
cfg, err := extprocconfig.UnmarshalConfigYaml(configPath)
require.NoError(t, err)
require.Equal(t, "ai_gateway_llm_ns", cfg.TokenUsageMetadata.Namespace)
require.Equal(t, "token_usage_key", cfg.TokenUsageMetadata.Key)
require.Equal(t, "OpenAI", string(cfg.InputSchema.Schema))
require.Equal(t, "x-backend-name", cfg.BackendRoutingHeaderKey)
require.Equal(t, "x-model-name", cfg.ModelNameHeaderKey)
require.Equal(t, "x-envoy-ai-gateway-selected-backend", cfg.SelectedBackendHeaderKey)
require.Equal(t, "x-envoy-ai-gateway-llm-model", cfg.ModelNameHeaderKey)
require.Len(t, cfg.Rules, 2)
require.Equal(t, "llama3.3333", cfg.Rules[0].Headers[0].Value)
require.Equal(t, "gpt4.4444", cfg.Rules[1].Headers[0].Value)
11 changes: 6 additions & 5 deletions go.mod
@@ -10,13 +10,18 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.28.7
github.com/envoyproxy/gateway v1.2.4
github.com/envoyproxy/go-control-plane/envoy v1.32.2
github.com/go-logr/logr v1.4.2
github.com/google/go-cmp v0.6.0
github.com/openai/openai-go v0.1.0-alpha.43
github.com/stretchr/testify v1.10.0
golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e
google.golang.org/grpc v1.69.2
google.golang.org/protobuf v1.36.1
k8s.io/api v0.31.2
k8s.io/apiextensions-apiserver v0.31.2
k8s.io/apimachinery v0.32.0
k8s.io/client-go v0.31.2
k8s.io/klog/v2 v2.130.1
k8s.io/utils v0.0.0-20241104163129-6fe5fd82f078
sigs.k8s.io/controller-runtime v0.19.3
sigs.k8s.io/gateway-api v1.2.1
@@ -44,7 +49,6 @@ require (
github.com/evanphx/json-patch v5.9.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.9.0 // indirect
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/zapr v1.3.0 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/jsonreference v0.21.0 // indirect
@@ -86,12 +90,9 @@ require (
golang.org/x/time v0.7.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.31.2 // indirect
k8s.io/apiextensions-apiserver v0.31.2 // indirect
k8s.io/client-go v0.31.2 // indirect
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect
sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.3 // indirect
103 changes: 103 additions & 0 deletions internal/controller/controller.go
@@ -0,0 +1,103 @@
package controller

import (
"context"
"fmt"

egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1"
"github.com/go-logr/logr"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
gwapiv1 "sigs.k8s.io/gateway-api/apis/v1"
gwapiv1b1 "sigs.k8s.io/gateway-api/apis/v1beta1"

aigv1a1 "github.com/envoyproxy/ai-gateway/api/v1alpha1"
)

var scheme = runtime.NewScheme()

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(aigv1a1.AddToScheme(scheme))
utilruntime.Must(apiextensionsv1.AddToScheme(scheme))
utilruntime.Must(egv1a1.AddToScheme(scheme))
utilruntime.Must(gwapiv1.Install(scheme))
utilruntime.Must(gwapiv1b1.Install(scheme))
}

func newClients(config *rest.Config) (kubeClient client.Client, kube kubernetes.Interface, err error) {
kubeClient, err = client.New(config, client.Options{Scheme: scheme})
if err != nil {
return nil, nil, fmt.Errorf("failed to create new client: %w", err)
}

kube, err = kubernetes.NewForConfig(config)
if err != nil {
return nil, nil, fmt.Errorf("failed to create kubernetes client: %w", err)
}
return kubeClient, kube, nil
}

// StartControllers starts the controllers for the AI Gateway.
// This blocks until the manager is stopped.
func StartControllers(ctx context.Context, config *rest.Config, logger logr.Logger, logLevel string, extProcImage string) error {
mgr, err := ctrl.NewManager(config, ctrl.Options{
Scheme: scheme,
LeaderElection: true,
LeaderElectionID: "envoy-ai-gateway-controller",
})
if err != nil {
return fmt.Errorf("failed to create new controller manager: %w", err)
}

clientForRouteC, kubeForRouteC, err := newClients(config)
if err != nil {
return fmt.Errorf("failed to create new clients: %w", err)
}

sinkChan := make(chan configSinkEvent, 100)
routeC := newLLMRouteController(clientForRouteC, kubeForRouteC, logger, logLevel, extProcImage, sinkChan)
if err = ctrl.NewControllerManagedBy(mgr).
For(&aigv1a1.LLMRoute{}).
Complete(routeC); err != nil {
return fmt.Errorf("failed to create controller for LLMRoute: %w", err)
}

clientForBackendC, kubeForBackendC, err := newClients(config)
if err != nil {
return fmt.Errorf("failed to create new clients: %w", err)
}

backendC := newLLMBackendController(clientForBackendC, kubeForBackendC, logger, sinkChan)
if err = ctrl.NewControllerManagedBy(mgr).
For(&aigv1a1.LLMBackend{}).
Complete(backendC); err != nil {
return fmt.Errorf("failed to create controller for LLMBackend: %w", err)
}

clientForConfigSink, kubeForConfigSink, err := newClients(config)
if err != nil {
return fmt.Errorf("failed to create new clients: %w", err)
}

sink := newConfigSink(clientForConfigSink, kubeForConfigSink, logger, sinkChan)

// Wait for the manager to become the leader before starting the controllers.
<-mgr.Elected()

// Before starting the manager, initialize the config sink to sync all LLMBackend and LLMRoute objects in the cluster.
if err = sink.init(ctx); err != nil {
return fmt.Errorf("failed to initialize config sink: %w", err)
}

if err = mgr.Start(ctx); err != nil { // This blocks until the manager is stopped.
return fmt.Errorf("failed to start controller manager: %w", err)
}
return nil
}
49 changes: 49 additions & 0 deletions internal/controller/llmbackend.go
@@ -0,0 +1,49 @@
package controller

import (
"context"

"github.com/go-logr/logr"
"k8s.io/client-go/kubernetes"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

aigv1a1 "github.com/envoyproxy/ai-gateway/api/v1alpha1"
)

// llmBackendController implements [reconcile.TypedReconciler] for [aigv1a1.LLMBackend].
//
// This handles the LLMBackend resource and sends it to the config sink so that it can modify the configuration together with the state of other resources.
type llmBackendController struct {
client client.Client
kube kubernetes.Interface
logger logr.Logger
eventChan chan configSinkEvent
}

func newLLMBackendController(client client.Client, kube kubernetes.Interface, logger logr.Logger, ch chan configSinkEvent) *llmBackendController {
return &llmBackendController{
client: client,
kube: kube,
logger: logger,
eventChan: ch,
}
}

// Reconcile implements the [reconcile.TypedReconciler] for [aigv1a1.LLMBackend].
func (l *llmBackendController) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
var llmBackend aigv1a1.LLMBackend
if err := l.client.Get(ctx, req.NamespacedName, &llmBackend); err != nil {
if client.IgnoreNotFound(err) == nil {
l.eventChan <- configSinkEventLLMBackendDeleted{namespace: req.Namespace, name: req.Name}
ctrl.Log.Info("Deleting LLMBackend",
"namespace", req.Namespace, "name", req.Name)
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}
// Send the LLMBackend to the config sink so that it can modify the configuration together with the state of other resources.
l.eventChan <- llmBackend.DeepCopy()
return ctrl.Result{}, nil
}