Skip to content

Commit

Permalink
extproc: sets token usage into filter metadata (#62)
Browse files Browse the repository at this point in the history
  • Loading branch information
mathetake authored Jan 3, 2025
1 parent 282587e commit 1390554
Show file tree
Hide file tree
Showing 11 changed files with 122 additions and 12 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ test-cel: envtest apigen format
KUBEBUILDER_ASSETS="$$($(ENVTEST) use $$k8sVersion -p path)" \
go test ./tests/cel-validation --tags celvalidation -count=1; \
done

# This runs the end-to-end tests for extproc without controller or k8s at all.
# It is useful for the fast iteration of the extproc code.
.PHONY: test-extproc-e2e # This requires the extproc binary to be built.
Expand Down
22 changes: 21 additions & 1 deletion extprocconfig/extprocconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
// depending on the Envoy Gateway as well as it can be used outside the Envoy AI Gateway.
//
// This configuration must be decoupled from the Envoy Gateway types as well as its implementation
// details.
// details. Also, the configuration must not be tied to k8s so that it can be tested and iterated on
// without the need for a k8s cluster.
package extprocconfig

import (
Expand All @@ -23,6 +24,9 @@ import (
// schema: OpenAI
// backendRoutingHeaderKey: x-backend-name
// modelNameHeaderKey: x-model-name
// tokenUsageMetadata:
// namespace: ai_gateway_llm_ns
// key: token_usage_key
// rules:
// - backends:
// - name: kserve
Expand Down Expand Up @@ -53,6 +57,10 @@ import (
// From Envoy configuration perspective, configuring the header matching based on `x-backend-name` is enough to route the request to the selected backend.
// That is because the matching decision is made by the external processor and the selected backend is populated in the header `x-backend-name`.
type Config struct {
// TokenUsageMetadata is the namespace and key to be used in the filter metadata to store the token usage, optional.
// If this is provided, the external processor will populate the token usage in the filter metadata at the end of the
// response body processing.
TokenUsageMetadata *TokenUsageMetadata `yaml:"tokenUsageMetadata,omitempty"`
// InputSchema specifies the API schema of the input format of requests to the external processor.
InputSchema VersionedAPISchema `yaml:"inputSchema"`
// ModelNameHeaderKey is the header key to be populated with the model name by the external processor.
Expand All @@ -65,6 +73,18 @@ type Config struct {
Rules []RouteRule `yaml:"rules"`
}

// TokenUsageMetadata identifies where the token usage is stored in the filter metadata:
// the metadata namespace and the key within that namespace.
//
// The stored value can be used to subtract the consumed tokens from the usage quota in the
// rate limit filter when the request completes, in combination with the `apply_on_stream_done`
// and `hits_addend` fields of the rate limit configuration
// https://www.envoyproxy.io/docs/envoy/latest/api-v3/config/route/v3/route_components.proto#config-route-v3-ratelimit
// which is introduced in Envoy 1.33 (to be released soon as of writing).
type TokenUsageMetadata struct {
// Namespace is the filter metadata namespace under which the token usage is stored.
Namespace string `yaml:"namespace"`
// Key is the key, within Namespace, under which the token usage is stored.
Key string `yaml:"key"`
}

// VersionedAPISchema corresponds to LLMAPISchema in api/v1alpha1/api.go.
type VersionedAPISchema struct {
// Schema is the API schema.
Expand Down
5 changes: 5 additions & 0 deletions extprocconfig/extprocconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ inputSchema:
schema: OpenAI
backendRoutingHeaderKey: x-backend-name
modelNameHeaderKey: x-model-name
tokenUsageMetadata:
namespace: ai_gateway_llm_ns
key: token_usage_key
rules:
- backends:
- name: kserve
Expand All @@ -39,6 +42,8 @@ rules:
require.NoError(t, os.WriteFile(configPath, []byte(config), 0o600))
cfg, err := UnmarshalConfigYaml(configPath)
require.NoError(t, err)
require.Equal(t, "ai_gateway_llm_ns", cfg.TokenUsageMetadata.Namespace)
require.Equal(t, "token_usage_key", cfg.TokenUsageMetadata.Key)
require.Equal(t, "OpenAI", string(cfg.InputSchema.Schema))
require.Equal(t, "x-backend-name", cfg.BackendRoutingHeaderKey)
require.Equal(t, "x-model-name", cfg.ModelNameHeaderKey)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/stretchr/testify v1.10.0
golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e
google.golang.org/grpc v1.69.2
google.golang.org/protobuf v1.35.2
k8s.io/apimachinery v0.32.0
sigs.k8s.io/controller-runtime v0.19.3
sigs.k8s.io/gateway-api v1.2.1
Expand Down Expand Up @@ -84,7 +85,6 @@ require (
golang.org/x/time v0.7.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 // indirect
google.golang.org/protobuf v1.35.2 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.31.2 // indirect
Expand Down
3 changes: 2 additions & 1 deletion internal/extproc/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type mockTranslator struct {
retHeaderMutation *extprocv3.HeaderMutation
retBodyMutation *extprocv3.BodyMutation
retOverride *extprocv3http.ProcessingMode
retUsedToken uint32
retErr error
}

Expand All @@ -90,7 +91,7 @@ func (m mockTranslator) ResponseBody(body io.Reader, _ bool) (headerMutation *ex
require.NoError(m.t, err)
require.Equal(m.t, m.expResponseBody.Body, buf)
}
return m.retHeaderMutation, m.retBodyMutation, usedToken, m.retErr
return m.retHeaderMutation, m.retBodyMutation, m.retUsedToken, m.retErr
}

// mockRouter implements [router.Router] for testing.
Expand Down
24 changes: 21 additions & 3 deletions internal/extproc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
extprocv3 "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"google.golang.org/protobuf/types/known/structpb"

"github.com/envoyproxy/ai-gateway/extprocconfig"
"github.com/envoyproxy/ai-gateway/internal/extproc/backendauth"
Expand All @@ -25,6 +26,7 @@ type processorConfig struct {
ModelNameHeaderKey, backendRoutingHeaderKey string
factories map[extprocconfig.VersionedAPISchema]translator.Factory
backendAuthHandlers map[string]backendauth.Handler
tokenUsageMetadata *extprocconfig.TokenUsageMetadata
}

// ProcessorIface is the interface for the processor.
Expand Down Expand Up @@ -157,9 +159,6 @@ func (p *Processor) ProcessResponseBody(_ context.Context, body *extprocv3.HttpB
return nil, fmt.Errorf("failed to transform response: %w", err)
}

// TODO: set the used token in metadata.
_ = usedToken

resp := &extprocv3.ProcessingResponse{
Response: &extprocv3.ProcessingResponse_ResponseBody{
ResponseBody: &extprocv3.BodyResponse{
Expand All @@ -170,9 +169,28 @@ func (p *Processor) ProcessResponseBody(_ context.Context, body *extprocv3.HttpB
},
},
}
if p.config.tokenUsageMetadata != nil {
resp.DynamicMetadata = buildTokenUsageDynamicMetadata(p.config.tokenUsageMetadata, usedToken)
}
return resp, nil
}

// buildTokenUsageDynamicMetadata builds the dynamic metadata that carries the token usage.
//
// The result is a struct with a single field named md.Namespace whose value is a nested struct
// with a single field named md.Key holding usage as a number value.
func buildTokenUsageDynamicMetadata(md *extprocconfig.TokenUsageMetadata, usage uint32) *structpb.Struct {
	// structpb only has a float64 number kind, hence the conversion.
	usageValue := structpb.NewNumberValue(float64(usage))
	perNamespace := &structpb.Struct{
		Fields: map[string]*structpb.Value{md.Key: usageValue},
	}
	return &structpb.Struct{
		Fields: map[string]*structpb.Value{md.Namespace: structpb.NewStructValue(perNamespace)},
	}
}

// headersToMap converts a [corev3.HeaderMap] to a Go map for easier processing.
func headersToMap(headers *corev3.HeaderMap) map[string]string {
// TODO: handle multiple headers with the same key.
Expand Down
10 changes: 8 additions & 2 deletions internal/extproc/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,19 @@ func TestProcessor_ProcessResponseBody(t *testing.T) {
inBody := &extprocv3.HttpBody{Body: []byte("some-body")}
expBodyMut := &extprocv3.BodyMutation{}
expHeadMut := &extprocv3.HeaderMutation{}
mt := &mockTranslator{t: t, expResponseBody: inBody, retBodyMutation: expBodyMut, retHeaderMutation: expHeadMut}
p := &Processor{translator: mt}
mt := &mockTranslator{t: t, expResponseBody: inBody, retBodyMutation: expBodyMut, retHeaderMutation: expHeadMut, retUsedToken: 123}
p := &Processor{translator: mt, config: &processorConfig{tokenUsageMetadata: &extprocconfig.TokenUsageMetadata{
Namespace: "ai_gateway_llm_ns", Key: "token_usage",
}}}
res, err := p.ProcessResponseBody(context.Background(), inBody)
require.NoError(t, err)
commonRes := res.Response.(*extprocv3.ProcessingResponse_ResponseBody).ResponseBody.Response
require.Equal(t, expBodyMut, commonRes.BodyMutation)
require.Equal(t, expHeadMut, commonRes.HeaderMutation)

md := res.DynamicMetadata
require.NotNil(t, md)
require.Equal(t, float64(123), md.Fields["ai_gateway_llm_ns"].GetStructValue().Fields["token_usage"].GetNumberValue())
})
}

Expand Down
1 change: 1 addition & 0 deletions internal/extproc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func (s *Server[P]) LoadConfig(config *extprocconfig.Config) error {
ModelNameHeaderKey: config.ModelNameHeaderKey,
factories: factories,
backendAuthHandlers: backendAuthHandlers,
tokenUsageMetadata: config.TokenUsageMetadata,
}
s.config = newConfig // This is racey, but we don't care.
return nil
Expand Down
4 changes: 4 additions & 0 deletions internal/extproc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestServer_LoadConfig(t *testing.T) {
})
t.Run("ok", func(t *testing.T) {
config := &extprocconfig.Config{
TokenUsageMetadata: &extprocconfig.TokenUsageMetadata{Namespace: "ns", Key: "key"},
InputSchema: extprocconfig.VersionedAPISchema{Schema: extprocconfig.APISchemaOpenAI},
BackendRoutingHeaderKey: "x-backend-name",
ModelNameHeaderKey: "x-model-name",
Expand Down Expand Up @@ -70,6 +71,9 @@ func TestServer_LoadConfig(t *testing.T) {
require.NoError(t, err)

require.NotNil(t, s.config)
require.NotNil(t, s.config.tokenUsageMetadata)
require.Equal(t, "ns", s.config.tokenUsageMetadata.Namespace)
require.Equal(t, "key", s.config.tokenUsageMetadata.Key)
require.NotNil(t, s.config.router)
require.NotNil(t, s.config.bodyParser)
require.Equal(t, "x-backend-name", s.config.backendRoutingHeaderKey)
Expand Down
12 changes: 11 additions & 1 deletion tests/extproc/envoy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ static_resources:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
stat_prefix: ingress_http
codec_type: auto
access_log:
- name: log_used_token
typed_config:
"@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog
path: ACCESS_LOG_PATH
json_format:
"used_token": "%DYNAMIC_METADATA(ai_gateway_llm_ns:used_token)%"
route_config:
virtual_hosts:
- name: local_route
Expand Down Expand Up @@ -42,7 +49,6 @@ static_resources:
http_filters:
- name: envoy.filters.http.ext_proc
typed_config:
# https://github.com/envoyproxy/envoy/pull/27263/files
"@type": type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExternalProcessor
allow_mode_override: true
mutation_rules:
Expand All @@ -55,6 +61,10 @@ static_resources:
grpc_service:
envoy_grpc:
cluster_name: extproc_cluster
metadataOptions:
receivingNamespaces:
untyped:
- ai_gateway_llm_ns
- name: envoy.filters.http.router
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
Expand Down
50 changes: 47 additions & 3 deletions tests/extproc/extproc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
package extproc

import (
"bufio"
"bytes"
"context"
_ "embed"
"encoding/json"
"fmt"
"os"
"os/exec"
Expand Down Expand Up @@ -37,12 +40,17 @@ var (
// - TEST_AWS_SECRET_ACCESS_KEY
// - TEST_OPENAI_API_KEY
//
// The test will be skipped if any of these are not set.
// The test will fail if any of these are not set.
func TestE2E(t *testing.T) {
requireBinaries(t)
requireRunEnvoy(t)
accessLogPath := t.TempDir() + "/access.log"
requireRunEnvoy(t, accessLogPath)
configPath := t.TempDir() + "/extproc-config.yaml"
requireWriteExtProcConfig(t, configPath, &extprocconfig.Config{
TokenUsageMetadata: &extprocconfig.TokenUsageMetadata{
Namespace: "ai_gateway_llm_ns",
Key: "used_token",
},
InputSchema: openAISchema,
// This can be any header key, but it must match the envoy.yaml routing configuration.
BackendRoutingHeaderKey: "x-selected-backend-name",
Expand Down Expand Up @@ -92,6 +100,41 @@ func TestE2E(t *testing.T) {
}
})

// Read all access logs and check if the used token is logged.
// If the used token is set correctly in the metadata, it should be logged in the access log.
t.Run("check-used-token-metadata-access-log", func(t *testing.T) {
	// This should match the format of the access log in envoy.yaml.
	type lineFormat struct {
		UsedToken any `json:"used_token"`
	}
	// Since the access log might not be written immediately, we poll until a line with a positive
	// used token shows up.
	//
	// The condition is executed on a goroutine owned by require.Eventually, so it must not call
	// require.* there: FailNow is only valid on the goroutine running the test. Transient errors
	// (e.g. the file is not readable yet) are logged and reported as "not yet" so that the next
	// tick retries.
	require.Eventually(t, func() bool {
		accessLog, err := os.ReadFile(accessLogPath)
		if err != nil {
			t.Logf("error reading access log: %v", err)
			return false
		}
		scanner := bufio.NewScanner(bytes.NewReader(accessLog))
		for scanner.Scan() {
			line := scanner.Bytes()
			var l lineFormat
			if err := json.Unmarshal(line, &l); err != nil {
				t.Logf("error unmarshalling line: %v", err)
				continue
			}
			t.Logf("line: %s", line)
			// The access formatter somehow changed its behavior sometimes between 1.31 and the latest Envoy,
			// so we need to check for both float64 and string.
			switch v := l.UsedToken.(type) {
			case float64:
				if v > 0 {
					return true
				}
			case string:
				if num, err := strconv.Atoi(v); err == nil && num > 0 {
					return true
				}
			}
			t.Log("cannot find used token in line")
		}
		// Surface scanner failures (e.g. an overlong line) instead of silently treating them as EOF.
		if err := scanner.Err(); err != nil {
			t.Logf("error scanning access log: %v", err)
		}
		return false
	}, 10*time.Second, 1*time.Second)
})

// TODO: add streaming endpoints.
// TODO: add more tests like updating the config, signal handling, etc.
}
Expand All @@ -115,11 +158,12 @@ func requireExtProc(t *testing.T, configPath string) {
}

// requireRunEnvoy starts the Envoy proxy with the provided configuration.
func requireRunEnvoy(t *testing.T) {
func requireRunEnvoy(t *testing.T, accessLogPath string) {
openAIAPIKey := requireEnvVar(t, "TEST_OPENAI_API_KEY")

tmpDir := t.TempDir()
envoyYaml := strings.Replace(envoyYamlBase, "TEST_OPENAI_API_KEY", openAIAPIKey, 1)
envoyYaml = strings.Replace(envoyYaml, "ACCESS_LOG_PATH", accessLogPath, 1)

// Write the envoy.yaml file.
envoyYamlPath := tmpDir + "/envoy.yaml"
Expand Down

0 comments on commit 1390554

Please sign in to comment.