diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index f13d72669..9af24f5de 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -12,10 +12,12 @@ translate text from English to Spanish. --> **Related Issues/PRs (if applicable)**: + " ]]; then + echo "PR description contains '-->'. Please remove all the comment out lines in the template after carefully reading them." + exit 1 + fi + if [[ ! $TITLE =~ "**Commit Message**:" ]]; then + echo "PR description must begin with '**Commit Message**:'." + exit 1 + fi + + title: + name: Title runs-on: ubuntu-latest steps: - uses: amannn/action-semantic-pull-request@v5 @@ -33,3 +58,8 @@ jobs: controller translator examples + subjectPattern: ^(?![A-Z]).+$ + subjectPatternError: | + The subject "{subject}" found in the pull request title "{title}" + didn't match the configured pattern. Please ensure that the subject + doesn't start with an uppercase character. diff --git a/.github/workflows/style.yaml b/.github/workflows/style.yaml new file mode 100644 index 000000000..f7a9b3d5e --- /dev/null +++ b/.github/workflows/style.yaml @@ -0,0 +1,30 @@ +name: Style +on: + pull_request: + branches: + - main + push: + branches: + - main + +jobs: + style: + if: github.event_name == 'pull_request' || github.event_name == 'push' + name: Check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v5 + with: + cache: false + go-version-file: go.mod + - uses: actions/cache@v4 + with: + path: | + ~/.cache/go-build + ~/.cache/golangci-lint + ~/go/pkg/mod + ~/go/bin + key: code-style-check-${{ hashFiles('**/go.mod', '**/go.sum', '**/Makefile') }} + - name: Run code style check + run: make check diff --git a/.github/workflows/commit.yaml b/.github/workflows/tests.yaml similarity index 90% rename from .github/workflows/commit.yaml rename to .github/workflows/tests.yaml index 10dcb78e7..1ebf8db9d 100644 --- a/.github/workflows/commit.yaml +++ 
b/.github/workflows/tests.yaml @@ -1,11 +1,21 @@ -name: Commit +name: Tests on: pull_request: branches: - main + paths-ignore: + - '**/*.md' + - 'site/**' + - 'netlify.toml' + push: branches: - main + paths-ignore: + - '**/*.md' + - 'site/**' + - 'netlify.toml' + # If the PR is coming from a fork, they are not allowed to access secrets by default. # This even is triggered only if the PR gets labeled with 'safe to test' which can only be added by the maintainers. # Jobs do not use secrets in the workflow will ignore this event. @@ -20,27 +30,6 @@ concurrency: cancel-in-progress: true jobs: - style: - if: github.event_name == 'pull_request' || github.event_name == 'push' - name: Code Style Check - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - uses: actions/setup-go@v5 - with: - cache: false - go-version-file: go.mod - - uses: actions/cache@v4 - with: - path: | - ~/.cache/go-build - ~/.cache/golangci-lint - ~/go/pkg/mod - ~/go/bin - key: code-style-check-${{ hashFiles('**/go.mod', '**/go.sum', '**/Makefile') }} - - name: Run code style check - run: make check - unittest: if: github.event_name == 'pull_request' || github.event_name == 'push' name: Unit Test @@ -193,7 +182,7 @@ jobs: # Docker builds are verified in test_e2e job, so we only need to push the images when the event is a push event. if: github.event_name == 'push' name: Push Docker Images - needs: [style, unittest, test_cel_validation, test_controller, test_extproc, test_e2e] + needs: [unittest, test_cel_validation, test_controller, test_extproc, test_e2e] uses: ./.github/workflows/docker_builds_template.yaml push_helm: diff --git a/Makefile b/Makefile index 9f49d66d6..843253537 100644 --- a/Makefile +++ b/Makefile @@ -34,13 +34,12 @@ help: @echo " test Run the unit tests for the codebase." @echo " test-cel Run the integration tests of CEL validation rules in API definitions with envtest." @echo " This will be needed when changing API definitions." 
- @echo " test-doctest Run the integration tests for documentation site." @echo " test-extproc Run the integration tests for extproc without controller or k8s at all." @echo " test-controller Run the integration tests for the controller with envtest." @echo " test-e2e Run the end-to-end tests with a local kind cluster." @echo "" @echo "For example, 'make precommit test' should be enough for initial iterations, and later 'make test-cel' etc. for the normal development cycle." - @echo "Note that some cases run by test-e2e, test-extproc, and test-doctest use credentials and these will be skipped when not available." + @echo "Note that some cases run by test-e2e or test-extproc use credentials and these will be skipped when not available." @echo "" @echo "" @@ -48,7 +47,7 @@ help: .PHONY: lint lint: golangci-lint @echo "lint => ./..." - @$(GOLANGCI_LINT) run --build-tags==test_cel_validation,test_controller,test_extproc,test_doctest ./... + @$(GOLANGCI_LINT) run --build-tags==test_cel_validation,test_controller,test_extproc ./... .PHONY: codespell CODESPELL_SKIP := $(shell cat .codespell.skip | tr \\n ',') @@ -163,14 +162,6 @@ test-e2e: kind @echo "Run E2E tests" @go test ./tests/e2e/... $(GO_TEST_ARGS) $(GO_TEST_E2E_ARGS) -tags test_e2e -# This runs the integration tests for the documentation site. -# -# The dependencies for this target depends on the test case being run. -.PHONY: test-doctest -test-doctest: kind - @echo "Run doctest" - @go test ./tests/doctest/... $(GO_TEST_ARGS) $(GO_TEST_E2E_ARGS) -tags test_doctest - # This builds a binary for the given command under the internal/cmd directory. # # Example: diff --git a/cmd/controller/main.go b/cmd/controller/main.go index 9388b7c45..bdea057c0 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -72,7 +72,7 @@ func main() { }() // Start the controller. 
- if err := controller.StartControllers(ctx, k8sConfig, setupLog, options); err != nil { + if err := controller.StartControllers(ctx, k8sConfig, ctrl.Log.WithName("controller"), options); err != nil { setupLog.Error(err, "failed to start controller") } } diff --git a/docs/proposals/001-ai-gateway-proposal/control_plane.png b/docs/proposals/001-ai-gateway-proposal/control_plane.png new file mode 100644 index 000000000..d6086f6aa Binary files /dev/null and b/docs/proposals/001-ai-gateway-proposal/control_plane.png differ diff --git a/docs/proposals/001-ai-gateway-proposal/data_plane.png b/docs/proposals/001-ai-gateway-proposal/data_plane.png new file mode 100644 index 000000000..6b0df3a71 Binary files /dev/null and b/docs/proposals/001-ai-gateway-proposal/data_plane.png differ diff --git a/docs/proposals/001-ai-gateway-proposal/proposal.md b/docs/proposals/001-ai-gateway-proposal/proposal.md new file mode 100644 index 000000000..7efcf48e6 --- /dev/null +++ b/docs/proposals/001-ai-gateway-proposal/proposal.md @@ -0,0 +1,591 @@ +# Envoy AI Gateway + +## Table of Contents + + + +- [Summary](#summary) +- [Goals](#goals) +- [Non-Goals](#non-goals) +- [Design](#design) + - [Personas](#personas) + - [Axioms](#axioms) + - [AIGatewayRoute](#aigatewayroute) + - [AIServiceBackend](#aiservicebackend) + - [BackendSecurityPolicy](#backendsecuritypolicy) + - [Token Usage based Rate Limiting](#token-usage-rate-limiting) + - [Diagrams](#diagrams) + + + +## Summary +The `Envoy AI Gateway` is to act as a centralized access point for managing and controlling access to various AI models within an organization. +It provides a single interface for developers to interact with different AI Services while ensuring security, governance and observability over AI traffic. + +It introduces new Custom Resource Definitions(CRD) to support the requirements of the `Envoy AI Gateway`: **AIGatewayRoute**, **AIServiceBackend** and **BackendSecurityPolicy**. 
+ +* The `AIGatewayRoute` specifies the schema for the user requests and routing rules to the `AIServiceBackend`s. +* The `AIServiceBackend` defines the AI service backend schema and security policy for various AI Services. This resource is managed by the Inference Platform Admin persona. +* The `BackendSecurityPolicy` defines the authentication policy for upstream AI services using API key or cloud credentials. +* Rate Limiting for LLM workload is based on tokens, we extend `Envoy Gateway` to support generic cost based rate limiting as envoy so far only supports request based rate limiting. + +## Goals +- Documentation of the 0.1 API decisions for posterity. +- Document `Envoy AI Gateway` MVP features: + - Upstream Model Access: Support accessing models from an initial list of AI Services: AWS Bedrock, OpenAI. + - Unified Client Access: Support a unified AI gateway API across AI Services. + - Traffic Management: Monitor and regulate AI service traffic, including token rate limiting by tracking token usages for LLM models. + - Observability: Provide detailed insights into usage patterns, performance and potential issues through logging and metrics collection. + - Policy Enforcement: Allow organizations to set specific rules and guidelines for how AI models can be accessed and used. + + +## Non-Goals + +- non-MVP features +- Routing for LLM serving instances in a Kubernetes cluster + +## Design + +### Personas + +Before diving into the details of the API, descriptions of the personas will help shape the thought process of the API design. + +#### Inference Platform Admin + +The Inference Platform Admin manages the gateway infrastructure necessary to route inference requests to a variety of AI Services. +Including handling Ops for: + - A list of AI Services and supported models. + - AI Services API schema conversion and centralized upstream authentication configurations. + - Traffic policy including rate limiting, fallback resilience between AI Service backends. 
+ +#### Payment Team + +- Reports per user/tenant/model LLM token usage for billing purpose. + +#### Security Team + +- Security team to control the ACL for accessing the models from AI Services. + +### Axioms + +The API design is based on these axioms: + +- This solution should be composable with other Gateway solutions. +- Gateway architecture should be extensible when customization is required. +- The MVP heavily assumes that the requests are sent using the OpenAI spec, but open to the extension in the future. + + +### AIGatewayRoute + +`AIGatewayRoute` defines the unified user request schema and the routing rules to a list of supported `AIServiceBackend`s such as AWS Bedrock, GCP Vertex AI, Azure OpenAI and KServe for self-hosted LLMs. + +- `AIGatewayRoute` serves as a way to define the unified AI Gateway API which allows downstream clients to use a single schema API to interact with multiple `AIServiceBackend`s. +- `AIGatewayRoute`s are defined to route to the `AIServiceBackend`s based on the HTTP header/path matching. The rules are matched in the `Envoy AI Gateway` external proc as the backend needs to be determined for request body transformation and upstream authentication. +The `HTTPRoute` handles upstream routing once backend is selected using the AI gateway routing header. + + +```golang +// AIGatewayRouteSpec details the AIGatewayRoute configuration. +type AIGatewayRouteSpec struct { +// TargetRefs are the names of the Gateway resources this AIGatewayRoute is being attached to. +// +// +kubebuilder:validation:MinItems=1 +// +kubebuilder:validation:MaxItems=128 +TargetRefs []gwapiv1a2.LocalPolicyTargetReferenceWithSectionName `json:"targetRefs"` +// APISchema specifies the API schema of the input that the target Gateway(s) will receive. +// Based on this schema, the ai-gateway will perform the necessary transformation to the +// output schema specified in the selected AIServiceBackend during the routing process. 
+// +// Currently, the only supported schema is OpenAI as the input schema. +// +// +kubebuilder:validation:Required +// +kubebuilder:validation:XValidation:rule="self.name == 'OpenAI'" +APISchema VersionedAPISchema `json:"schema"` +// Rules is the list of AIGatewayRouteRule that this AIGatewayRoute will match the traffic to. +// Each rule is a subset of the HTTPRoute in the Gateway API (https://gateway-api.sigs.k8s.io/api-types/httproute/). +// +// AI Gateway controller will generate a HTTPRoute based on the configuration given here with the additional +// modifications to achieve the necessary jobs, notably inserting the AI Gateway filter responsible for +// the transformation of the request and response, etc. +// +// In the matching conditions in the AIGatewayRouteRule, `x-ai-eg-model` header is available +// if we want to describe the routing behavior based on the model name. The model name is extracted +// from the request content before the routing decision. +// +// How multiple rules are matched is the same as the Gateway API. See for the details: +// https://gateway-api.sigs.k8s.io/reference/spec/#gateway.networking.k8s.io%2fv1.HTTPRoute +// +// +kubebuilder:validation:Required +// +kubebuilder:validation:MaxItems=128 +Rules []AIGatewayRouteRule `json:"rules"` + +// FilterConfig is the configuration for the AI Gateway filter inserted in the generated HTTPRoute. +// +// An AI Gateway filter is responsible for the transformation of the request and response +// as well as the routing behavior based on the model name extracted from the request content, etc. +// +// Currently, the filter is only implemented as an external process filter, which might be +// extended to other types of filters in the future. See https://github.com/envoyproxy/ai-gateway/issues/90 +FilterConfig *AIGatewayFilterConfig `json:"filterConfig,omitempty"` + +// LLMRequestCosts specifies how to capture the cost of the LLM-related request, notably the token usage. 
+// The AI Gateway filter will capture each specified number and store it in the Envoy's dynamic +// metadata per HTTP request. The namespaced key is "io.envoy.ai_gateway", +// +// For example, let's say we have the following LLMRequestCosts configuration: +// +// llmRequestCosts: +// - metadataKey: llm_input_token +// type: InputToken +// - metadataKey: llm_output_token +// type: OutputToken +// - metadataKey: llm_total_token +// type: TotalToken +// +// Then, with the following BackendTrafficPolicy of Envoy Gateway, you can have three +// rate limit buckets for each unique x-user-id header value. One bucket is for the input token, +// the other is for the output token, and the last one is for the total token. +// Each bucket will be reduced by the corresponding token usage captured by the AI Gateway filter. +// +// +optional +// +kubebuilder:validation:MaxItems=36 +LLMRequestCosts []LLMRequestCost `json:"llmRequestCosts,omitempty"` +} + +// AIGatewayRouteRule is a rule that defines the routing behavior of the AIGatewayRoute. +type AIGatewayRouteRule struct { +// BackendRefs is the list of AIServiceBackend that this rule will route the traffic to. +// Each backend can have a weight that determines the traffic distribution. +// +// The namespace of each backend is "local", i.e. the same namespace as the AIGatewayRoute. +// +// +optional +// +kubebuilder:validation:MaxItems=128 +BackendRefs []AIGatewayRouteRuleBackendRef `json:"backendRefs,omitempty"` + +// Matches is the list of AIGatewayRouteMatch that this rule will match the traffic to. +// This is a subset of the HTTPRouteMatch in the Gateway API. 
See for the details: +// https://gateway-api.sigs.k8s.io/reference/spec/#gateway.networking.k8s.io%2fv1.HTTPRouteMatch +// +// +optional +// +kubebuilder:validation:MaxItems=128 +Matches []AIGatewayRouteRuleMatch `json:"matches,omitempty"` +} + +// LLMRequestCost specifies "where" the request cost is stored in the filter metadata as well as +// "how" the cost is calculated. By default, the cost is retrieved from "output token" in the response body. +// +// This can be used to subtract the usage token from the usage quota in the rate limit filter when +// the request completes combined with `apply_on_stream_done` and `hits_addend` fields of +// the rate limit configuration https://www.envoyproxy.io/docs/envoy/latest/api-v3/config/route/v3/route_components.proto#config-route-v3-ratelimit +// which is introduced in Envoy 1.33 (to be released soon as of writing). +type LLMRequestCost struct { +// MetadataKey is the key of the metadata storing the request cost. +MetadataKey string `json:"metadataKey"` +// Type is the kind of the request cost calculation. +Type LLMRequestCostType `json:"type"` +// CELExpression is the CEL expression to calculate the cost of the request. +// This is not empty when the Type is LLMRequestCostTypeCELExpression. +CELExpression string `json:"celExpression,omitempty"` +} +``` + + +### AIServiceBackend + +`AIServiceBackend` defines the AI service API schema and a reference to the `Envoy Gateway` backend for the target destination. + +- The Gateway routes the traffic to the appropriate `AIServiceBackend` by converting the unified API schema to the AI service API schema. +- The `AIServiceBackend` is attached with the `BackendSecurityPolicy` to perform the upstream authentication. + +```golang +// AIServiceBackendSpec details the AIServiceBackend configuration. +type AIServiceBackendSpec struct { +// APISchema specifies the API schema of the output format of requests from +// Envoy that this AIServiceBackend can accept as incoming requests. 
+// Based on this schema, the ai-gateway will perform the necessary transformation for +// the pair of AIGatewayRouteSpec.APISchema and AIServiceBackendSpec.APISchema. +// +// This is required to be set. +// +// +kubebuilder:validation:Required +APISchema VersionedAPISchema `json:"schema"` +// BackendRef is the reference to the Backend resource that this AIServiceBackend corresponds to. +// +// A backend can be of either k8s Service or Backend resource of Envoy Gateway. +// +// This is required to be set. +// +// +kubebuilder:validation:Required +BackendRef gwapiv1.BackendObjectReference `json:"backendRef"` + +// BackendSecurityPolicyRef is the name of the BackendSecurityPolicy resources this backend +// is being attached to. +// +// +optional +BackendSecurityPolicyRef *gwapiv1.LocalObjectReference `json:"backendSecurityPolicyRef,omitempty"` +} +``` + +### BackendSecurityPolicy +The `BackendSecurityPolicy` defines the authentication methods of the upstream AI service. `APIKey` provides a simple authentication method to +authenticate with upstream AI services such as OpenAI or Anthropic. For accessing models via cloud providers such as AWS, GCP, the cloud credential is managed with Kubernetes secrets or exchanged +using OIDC federation. + +```golang +// BackendSecurityPolicySpec specifies authentication rules on access the provider from the Gateway. +// Only one mechanism to access a backend(s) can be specified. +// +// Only one type of BackendSecurityPolicy can be defined. +// +kubebuilder:validation:MaxProperties=2 +type BackendSecurityPolicySpec struct { +// Type specifies the auth mechanism used to access the provider. Currently, only "APIKey", AND "AWSCredentials" are supported. +// +// +kubebuilder:validation:Enum=APIKey;AWSCredentials +Type BackendSecurityPolicyType `json:"type"` + +// APIKey is a mechanism to access a backend(s). The API key will be injected into the Authorization header.
+// +// +optional +APIKey *BackendSecurityPolicyAPIKey `json:"apiKey,omitempty"` + +// AWSCredentials is a mechanism to access a backend(s). AWS specific logic will be applied. +// +// +optional +AWSCredentials *BackendSecurityPolicyAWSCredentials `json:"awsCredentials,omitempty"` +} +// BackendSecurityPolicyAWSCredentials contains the supported authentication mechanisms to access aws +type BackendSecurityPolicyAWSCredentials struct { +// Region specifies the AWS region associated with the policy. +// +// +kubebuilder:validation:MinLength=1 +Region string `json:"region"` + +// CredentialsFile specifies the credentials file to use for the AWS provider. +// +// +optional +CredentialsFile *AWSCredentialsFile `json:"credentialsFile,omitempty"` + +// OIDCExchangeToken specifies the oidc configurations used to obtain an oidc token. The oidc token will be +// used to obtain temporary credentials to access AWS. +// +// +optional +OIDCExchangeToken *AWSOIDCExchangeToken `json:"oidcExchangeToken,omitempty"` +} +``` + +### Token Usage Rate Limiting + +AI Gateway project extended the `Envoy Gateway` `BackendTrafficPolicy` with a generic usage based rate limiting in [#4957](https://github.com/envoyproxy/gateway/pull/4957). +For supporting token usage based rate limiting, it reduces the rate limit counter in the response path. Since the reduction happens after the response is complete, the rate limiting is not enforced for the current but the subsequent requests. +The token usages are extracted from the standard token usage fields according to the OpenAI schema in the ext proc `processResponseBody` handler. + +```go +type RateLimitCost struct { + // Request specifies the number to reduce the rate limit counters + // on the request path. If this is not specified, the default behavior + // is to reduce the rate limit counters by 1. + // + // When Envoy receives a request that matches the rule, it tries to reduce the + // rate limit counters by the specified number. 
If the counter doesn't have + // enough capacity, the request is rate limited. + // + // +optional + // +notImplementedHide + Request *RateLimitCostSpecifier `json:"request,omitempty"` + // Response specifies the number to reduce the rate limit counters + // after the response is sent back to the client or the request stream is closed. + // + // The cost is used to reduce the rate limit counters for the matching requests. + // Since the reduction happens after the request stream is complete, the rate limit + // won't be enforced for the current request, but for the subsequent matching requests. + // + // This is optional and if not specified, the rate limit counters are not reduced + // on the response path. + // + // Currently, this is only supported for HTTP Global Rate Limits. + // + // +optional + // +notImplementedHide + Response *RateLimitCostSpecifier `json:"response,omitempty"` +} +// RateLimitCostSpecifier specifies where the Envoy retrieves the number to reduce the rate limit counters. +// +// +kubebuilder:validation:XValidation:rule="!(has(self.number) && has(self.metadata))",message="only one of number or metadata can be specified" +type RateLimitCostSpecifier struct { +// From specifies where to get the rate limit cost. Currently, only "Number" and "Metadata" are supported. +// +// +kubebuilder:validation:Required +From RateLimitCostFrom `json:"from"` +// Number specifies the fixed usage number to reduce the rate limit counters. +// Using zero can be used to only check the rate limit counters without reducing them. +// +// +optional +// +notImplementedHide +Number *uint64 `json:"number,omitempty"` +// Metadata specifies the per-request metadata to retrieve the usage number from. +// +// +optional +// +notImplementedHide +Metadata *RateLimitCostMetadata `json:"metadata,omitempty"` +} +// RateLimitCostMetadata specifies the filter metadata to retrieve the usage number from. 
+type RateLimitCostMetadata struct { +// Namespace is the namespace of the dynamic metadata. +// +// +kubebuilder:validation:Required +Namespace string `json:"namespace"` +// Key is the key to retrieve the usage number from the filter metadata. +// +// +kubebuilder:validation:Required +Key string `json:"key"` +} +``` + +```go +/// RateLimitRule defines the semantics for matching attributes +// from the incoming requests, and setting limits for them. +type RateLimitRule struct { +// ClientSelectors holds the list of select conditions to select +// specific clients using attributes from the traffic flow. +// All individual select conditions must hold True for this rule +// and its limit to be applied. +// +// If no client selectors are specified, the rule applies to all traffic of +// the targeted Route. +// +// If the policy targets a Gateway, the rule applies to each Route of the Gateway. +// Please note that each Route has its own rate limit counters. For example, +// if a Gateway has two Routes, and the policy has a rule with limit 10rps, +// each Route will have its own 10rps limit. +// +// +optional +// +kubebuilder:validation:MaxItems=8 +ClientSelectors []RateLimitSelectCondition `json:"clientSelectors,omitempty"` +// Limit holds the rate limit values. +// This limit is applied for traffic flows when the selectors +// compute to True, causing the request to be counted towards the limit. +// The limit is enforced and the request is rate limited, i.e. a response with +// 429 HTTP status code is sent back to the client when +// the selected requests have reached the limit. +Limit RateLimitValue `json:"limit"` +// Cost specifies the cost of requests and responses for the rule. +// +// This is optional and if not specified, the default behavior is to reduce the rate limit counters by 1 on +// the request path and do not reduce the rate limit counters on the response path. 
+// +// +optional +// +notImplementedHide +Cost *RateLimitCost `json:"cost,omitempty"` +} +``` + +### Yaml Examples + +#### AIGatewayRoute +The routing calculation is done in the `ExtProc` by analyzing the match rules on `AIGatewayRoute` spec to emulate the behavior in order to perform the AI Service specific transformation and authentication before the routing filter is applied, + because it happens at the very end of the filter chain. + +The `AIServiceBackend` rules are specified on the `AIGatewayRoute` based on model header matching, in this example `anthropic.claude-3-5-sonnet` is routed to the AWS Bedrock and `llama-3.3-70b-instruction` is routed to the KServe backend for the self-hosted llama model. +`LLMRequestCost` is specified with the metadata key `llm_total_token` to store the cost of the LLM request. + +```yaml +apiVersion: aigateway.envoyproxy.io/v1alpha1 +kind: AIGatewayRoute +metadata: + name: llmroute + namespace: ai-gateway +spec: + targetRefs: + - group: gateway.networking.k8s.io + kind: Gateway + name: eg + schema: + name: OpenAI + rules: + - matches: + - headers: + - type: Exact + name: x-ai-eg-model + value: anthropic.claude-3-5-sonnet-20240620-v1:0 + backendRefs: + - name: awsbedrock-backend + - matches: + - headers: + - type: Exact + name: x-ai-eg-model + value: llama-3.3-70b-instruction + backendRefs: + - name: kserve-llama-backend + # The following metadata keys are used to store the costs from the LLM request. + llmRequestCosts: + - metadataKey: llm_total_token + type: TotalToken + filterConfig: + externalProcess: + replicas: 1 +``` + +#### BackendSecurityPolicy +`BackendSecurityPolicy` specifies the API key or credentials that `Envoy AI Gateway` uses to authenticate with the upstream AI service. +In this example API key is used to authenticate with OpenAI service and AWS credential is used to authenticate with AWS Bedrock service.
+```yaml +apiVersion: aigateway.envoyproxy.io/v1alpha1 +kind: BackendSecurityPolicy +metadata: + name: aws-bedrock-credential + namespace: default +spec: + type: AWSCredentials + awsCredentials: + region: us-east-1 + credentialsFile: + secretRef: + name: aws-credential + profile: default +--- +apiVersion: aigateway.envoyproxy.io/v1alpha1 +kind: BackendSecurityPolicy +metadata: + name: openai-ai-key + namespace: default +spec: + type: APIKey + apiKey: + secretRef: + name: openai-api-key +``` + +#### AIServiceBackend +Based on the gateway routes, we define the AWS Bedrock and KServe `AIServiceBackend` along with the `Envoy Gateway` backend resource using the FQDN for the routing destination. +```yaml +apiVersion: aigateway.envoyproxy.io/v1alpha1 +kind: AIServiceBackend +metadata: + name: awsbedrock-backend + namespace: ai-gateway +spec: + schema: + name: "AWSBedrock" + backendRef: + group: "gateway.envoyproxy.io" + kind: "Backend" + name: "llm-bedrock-backend" + BackendSecurityPolicyRef: + group: "aigateway.envoyproxy.io" + kind: "BackendSecurityPolicy" + name: "aws-bedrock-credential" +--- +apiVersion: aigateway.envoyproxy.io/v1alpha1 +kind: AIServiceBackend +metadata: + name: kserve-llama-backend + namespace: ai-gateway +spec: + schema: + name: "OpenAI" + backendRef: + group: "gateway.envoyproxy.io" + kind: "Backend" + name: "kserve-llama-backend" + BackendSecurityPolicyRef: + group: "aigateway.envoyproxy.io" + kind: "BackendSecurityPolicy" + name: "openai-ai-key" +--- +apiVersion: gateway.envoyproxy.io/v1alpha1 +kind: Backend +metadata: + name: kserve-llama-backend + namespace: ai-gateway +spec: + endpoints: + - fqdn: + hostname: llama-3-3-70b-instruct-vllm.example.com + port: 443 +--- +apiVersion: gateway.envoyproxy.io/v1alpha1 +kind: Backend +metadata: + name: llm-bedrock-backend + namespace: ai-gateway +spec: + endpoints: + - fqdn: + hostname: bedrock-runtime.us-east-1.amazonaws.com + port: 443 + +``` + +#### BackendTrafficPolicy + +```yaml +apiVersion: 
gateway.envoyproxy.io/v1alpha1 +kind: BackendTrafficPolicy +metadata: + name: llama-ratelimit +spec: + # Applies the rate limit policy to the gateway. + targetRefs: + - name: eg + kind: Gateway + group: gateway.networking.k8s.io + rateLimit: + type: Global + global: + rules: + - clientSelectors: + - name: x-ai-eg-model + type: exact + value: llama-3.3-70b-instruction + - name: x-user-id + type: Distinct + limit: + # configure the number of allowed token per minute, per user and model + requests: 1000 + unit: Minute + cost: + response: + from: Metadata + metadata: + namespace: "io.envoy.ai_gateway" + key: "llm_total_token" +``` + +## Diagrams +### Control Plane +`Envoy AI Gateway` extends `Envoy Gateway` using an Extension Server. `Envoy Gateway` can be configured to call an external server over gRPC with +the xDS configuration before it is sent to Envoy Proxy. The `Envoy Gateway` extension Server provides a mechanism where `Envoy Gateway` tracks +custom resources and then calls a set of hooks that allow the generated xDS configuration to be modified before it is sent to Envoy Proxy. + +![Control Plane](./control_plane.png) + +AI Gateway ExtProc controller watches the `AIGatewayRoute` resource and performs the following steps: +- Reconciles the `Envoy Gateway` deployment and creates the extension policy. +- Reconciles the `Envoy Gateway` ext proc deployment and mount the API key or AWS credential secret if the `AIServiceBackend` is AWS. +- Reconciles `AIGatewayRoute` to calculate the backend and generates the `HTTPRoute` resource attaching the upstream host rewrite filter. + +Envoy Gateway controller watches the `BackendTrafficPolicy` to dynamically update the xDS configuration for the rate limiting filter. + +### Data Plane + +Much of this is better explained visually: + +Below is a detailed view of how an inference request works on `Envoy AI Gateway`.
+ +![Data Plane](./data_plane.png) + +This diagram lightly follows the example request for routing to Anthropic claude 3.5 sonnet model on AWS Bedrock. +The flow can be described as: +- The request comes into `Envoy AI Gateway` instances. +- Ext Authorization filter is applied for checking if the user or account is authorized to access the model. +- `Envoy AI Gateway` ExtProc calculates the backend by matching request headers such as model name and inject the routing header `x-ai-eg-selected-backend` for envoy routing filter. +- `Envoy AI Gateway` ExtProc translates the user inference request (OpenAI) to the API schema of the AI service backend. +- AI service authentication policy is applied based on the AI service backend: + - API key is injected to the request headers for the AI Services that supports API keys. + - AWS requests are signed by ExtProc and credentials are injected for AWS Bedrock service authentication. +- Rate limiting filter is applied for request based usage tracking. +- Request is routed by the envoy proxy to the specified or calculated AI service backend. +- Upon receiving the response from the AI service, the token usage limit is reduced by extracting the usage fields of the chat completion response. + - the rate limit is enforced on the subsequent request. + diff --git a/examples/extproc_custom_router/main.go b/examples/extproc_custom_router/main.go index 49ad5120e..56fe9fcfd 100644 --- a/examples/extproc_custom_router/main.go +++ b/examples/extproc_custom_router/main.go @@ -4,25 +4,25 @@ import ( "fmt" "github.com/envoyproxy/ai-gateway/cmd/extproc/mainlib" - "github.com/envoyproxy/ai-gateway/extprocapi" - "github.com/envoyproxy/ai-gateway/filterconfig" + "github.com/envoyproxy/ai-gateway/filterapi" + "github.com/envoyproxy/ai-gateway/filterapi/x" ) -// newCustomRouter implements [extprocapi.NewCustomRouter]. 
-func newCustomRouter(defaultRouter extprocapi.Router, config *filterconfig.Config) extprocapi.Router { +// newCustomRouter implements [x.NewCustomRouter]. +func newCustomRouter(defaultRouter x.Router, config *filterapi.Config) x.Router { // You can poke the current configuration of the routes, and the list of backends // specified in the AIGatewayRoute.Rules, etc. return &myCustomRouter{config: config, defaultRouter: defaultRouter} } -// myCustomRouter implements [extprocapi.Router]. +// myCustomRouter implements [filterapi.Router]. type myCustomRouter struct { - config *filterconfig.Config - defaultRouter extprocapi.Router + config *filterapi.Config + defaultRouter x.Router } -// Calculate implements [extprocapi.Router.Calculate]. -func (m *myCustomRouter) Calculate(headers map[string]string) (backend *filterconfig.Backend, err error) { +// Calculate implements [x.Router.Calculate]. +func (m *myCustomRouter) Calculate(headers map[string]string) (backend *filterapi.Backend, err error) { // Simply logs the headers and delegates the calculation to the default router. modelName, ok := headers[m.config.ModelNameHeaderKey] if !ok { @@ -35,7 +35,7 @@ func (m *myCustomRouter) Calculate(headers map[string]string) (backend *filterco // This demonstrates how to build a custom router for the external processor. func main() { // Initializes the custom router. - extprocapi.NewCustomRouter = newCustomRouter + x.NewCustomRouter = newCustomRouter // Executes the main function of the external processor. 
mainlib.Main() } diff --git a/filterconfig/filterconfig.go b/filterapi/filterconfig.go similarity index 98% rename from filterconfig/filterconfig.go rename to filterapi/filterconfig.go index 87ecb1227..a74d3afa1 100644 --- a/filterconfig/filterconfig.go +++ b/filterapi/filterconfig.go @@ -1,4 +1,4 @@ -// Package filterconfig provides the configuration for the AI Gateway-implemented filter +// Package filterapi provides the configuration for the AI Gateway-implemented filter // which is currently an external processor (See https://github.com/envoyproxy/ai-gateway/issues/90). // // This is a public package so that the filter can be testable without @@ -7,7 +7,7 @@ // This configuration must be decoupled from the Envoy Gateway types as well as its implementation // details. Also, the configuration must not be tied with k8s so it can be tested and iterated // without the need for the k8s cluster. -package filterconfig +package filterapi import ( "os" diff --git a/filterconfig/filterconfig_test.go b/filterapi/filterconfig_test.go similarity index 91% rename from filterconfig/filterconfig_test.go rename to filterapi/filterconfig_test.go index b55acb580..aca35fe17 100644 --- a/filterconfig/filterconfig_test.go +++ b/filterapi/filterconfig_test.go @@ -1,4 +1,4 @@ -package filterconfig_test +package filterapi_test import ( "log/slog" @@ -9,7 +9,7 @@ import ( "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/util/yaml" - "github.com/envoyproxy/ai-gateway/filterconfig" + "github.com/envoyproxy/ai-gateway/filterapi" "github.com/envoyproxy/ai-gateway/internal/extproc" ) @@ -18,8 +18,8 @@ func TestDefaultConfig(t *testing.T) { require.NoError(t, err) require.NotNil(t, server) - var cfg filterconfig.Config - err = yaml.Unmarshal([]byte(filterconfig.DefaultConfig), &cfg) + var cfg filterapi.Config + err = yaml.Unmarshal([]byte(filterapi.DefaultConfig), &cfg) require.NoError(t, err) err = server.LoadConfig(&cfg) @@ -66,7 +66,7 @@ rules: value: gpt4.4444 ` 
require.NoError(t, os.WriteFile(configPath, []byte(config), 0o600)) - cfg, err := filterconfig.UnmarshalConfigYaml(configPath) + cfg, err := filterapi.UnmarshalConfigYaml(configPath) require.NoError(t, err) require.Equal(t, "ai_gateway_llm_ns", cfg.MetadataNamespace) require.Equal(t, "token_usage_key", cfg.LLMRequestCosts[0].MetadataKey) diff --git a/extprocapi/extproc.go b/filterapi/x/x.go similarity index 65% rename from extprocapi/extproc.go rename to filterapi/x/x.go index 7efbfa221..5fc102a94 100644 --- a/extprocapi/extproc.go +++ b/filterapi/x/x.go @@ -1,7 +1,7 @@ -// Package extprocapi is for building a custom external process. -package extprocapi +// Package x is an experimental package that provides the customizability of the AI Gateway filter. +package x -import "github.com/envoyproxy/ai-gateway/filterconfig" +import "github.com/envoyproxy/ai-gateway/filterapi" // NewCustomRouter is the function to create a custom router over the default router. // This is nil by default and can be set by the custom build of external processor. @@ -13,7 +13,7 @@ var NewCustomRouter NewCustomRouterFn // This is called when the new configuration is loaded. // // The defaultRouter can be used to delegate the calculation to the default router implementation. -type NewCustomRouterFn func(defaultRouter Router, config *filterconfig.Config) Router +type NewCustomRouterFn func(defaultRouter Router, config *filterapi.Config) Router // Router is the interface for the router. // @@ -21,9 +21,9 @@ type NewCustomRouterFn func(defaultRouter Router, config *filterconfig.Config) R type Router interface { // Calculate determines the backend to route to based on the request headers. // - // The request headers include the populated [filterconfig.Config.ModelNameHeaderKey] - // with the parsed model name based on the [filterconfig.Config] given to the NewCustomRouterFn. 
+ // The request headers include the populated [filterapi.Config.ModelNameHeaderKey] + // with the parsed model name based on the [filterapi.Config] given to the NewCustomRouterFn. // // Returns the backend. - Calculate(requestHeaders map[string]string) (backend *filterconfig.Backend, err error) + Calculate(requestHeaders map[string]string) (backend *filterapi.Backend, err error) } diff --git a/go.mod b/go.mod index f2bf45a6f..cbbf95f2e 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,6 @@ require ( github.com/google/go-cmp v0.6.0 github.com/openai/openai-go v0.1.0-alpha.49 github.com/stretchr/testify v1.10.0 - github.com/yuin/goldmark v1.7.8 golang.org/x/exp v0.0.0-20250128144449-3edf0e91c1ae golang.org/x/oauth2 v0.25.0 google.golang.org/grpc v1.70.0 diff --git a/go.sum b/go.sum index 28e0b15bb..f48747539 100644 --- a/go.sum +++ b/go.sum @@ -169,8 +169,6 @@ github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -github.com/yuin/goldmark v1.7.8 h1:iERMLn0/QJeHFhxSt3p6PeN9mGnvIKSpG9YYorDMnic= -github.com/yuin/goldmark v1.7.8/go.mod h1:uzxRWxtg69N339t3louHJ7+O03ezfj6PlliRlaOzY1E= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= diff --git a/internal/controller/ai_gateway_route.go b/internal/controller/ai_gateway_route.go index a208981a4..b08ac1c2b 100644 --- a/internal/controller/ai_gateway_route.go +++ b/internal/controller/ai_gateway_route.go @@ -18,7 +18,7 @@ import ( gwapiv1 "sigs.k8s.io/gateway-api/apis/v1" aigv1a1 "github.com/envoyproxy/ai-gateway/api/v1alpha1" - 
"github.com/envoyproxy/ai-gateway/filterconfig" + "github.com/envoyproxy/ai-gateway/filterapi" ) const ( @@ -44,7 +44,7 @@ func NewAIGatewayRouteController( return &aiGatewayRouteController{ client: client, kube: kube, - logger: logger.WithName("ai-gateway-route-controller"), + logger: logger, eventChan: ch, } } @@ -152,7 +152,7 @@ func (c *aiGatewayRouteController) ensuresExtProcConfigMapExists(ctx context.Con Name: name, Namespace: aiGatewayRoute.Namespace, }, - Data: map[string]string{expProcConfigFileName: filterconfig.DefaultConfig}, + Data: map[string]string{expProcConfigFileName: filterapi.DefaultConfig}, } if err := ctrlutil.SetControllerReference(aiGatewayRoute, configMap, c.client.Scheme()); err != nil { c.logger.Error(err, "failed to set controller reference for service", "namespace", configMap.Namespace, "name", configMap.Name) diff --git a/internal/controller/ai_gateway_route_test.go b/internal/controller/ai_gateway_route_test.go index 2d073f5b8..9cc7b6b1c 100644 --- a/internal/controller/ai_gateway_route_test.go +++ b/internal/controller/ai_gateway_route_test.go @@ -17,7 +17,7 @@ import ( gwapiv1a2 "sigs.k8s.io/gateway-api/apis/v1alpha2" aigv1a1 "github.com/envoyproxy/ai-gateway/api/v1alpha1" - "github.com/envoyproxy/ai-gateway/filterconfig" + "github.com/envoyproxy/ai-gateway/filterapi" ) func Test_extProcName(t *testing.T) { @@ -46,7 +46,7 @@ func TestAIGatewayRouteController_ensuresExtProcConfigMapExists(t *testing.T) { require.Equal(t, extProcName(aiGatewayRoute), configMap.Name) require.Equal(t, "default", configMap.Namespace) require.Equal(t, ownerRef, configMap.OwnerReferences) - require.Equal(t, filterconfig.DefaultConfig, configMap.Data[expProcConfigFileName]) + require.Equal(t, filterapi.DefaultConfig, configMap.Data[expProcConfigFileName]) // Doing it again should not fail. 
err = c.ensuresExtProcConfigMapExists(context.Background(), aiGatewayRoute) diff --git a/internal/controller/ai_service_backend.go b/internal/controller/ai_service_backend.go index 148a67324..0b65893c2 100644 --- a/internal/controller/ai_service_backend.go +++ b/internal/controller/ai_service_backend.go @@ -27,7 +27,7 @@ func NewAIServiceBackendController(client client.Client, kube kubernetes.Interfa return &aiBackendController{ client: client, kube: kube, - logger: logger.WithName("ai-service-backend-controller"), + logger: logger, eventChan: ch, } } diff --git a/internal/controller/controller.go b/internal/controller/controller.go index e59dacf2b..21d4caaae 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -69,7 +69,8 @@ func StartControllers(ctx context.Context, config *rest.Config, logger logr.Logg } sinkChan := make(chan ConfigSinkEvent, 100) - routeC := NewAIGatewayRouteController(c, kubernetes.NewForConfigOrDie(config), logger, sinkChan) + routeC := NewAIGatewayRouteController(c, kubernetes.NewForConfigOrDie(config), logger. + WithName("ai-gateway-route"), sinkChan) if err = ctrl.NewControllerManagedBy(mgr). For(&aigv1a1.AIGatewayRoute{}). Owns(&egv1a1.EnvoyExtensionPolicy{}). @@ -80,36 +81,38 @@ func StartControllers(ctx context.Context, config *rest.Config, logger logr.Logg return fmt.Errorf("failed to create controller for AIGatewayRoute: %w", err) } - backendC := NewAIServiceBackendController(c, kubernetes.NewForConfigOrDie(config), logger, sinkChan) + backendC := NewAIServiceBackendController(c, kubernetes.NewForConfigOrDie(config), logger. + WithName("ai-service-backend"), sinkChan) if err = ctrl.NewControllerManagedBy(mgr). For(&aigv1a1.AIServiceBackend{}). 
Complete(backendC); err != nil { return fmt.Errorf("failed to create controller for AIServiceBackend: %w", err) } - backendSecurityPolicyC := newBackendSecurityPolicyController(c, kubernetes.NewForConfigOrDie(config), logger, sinkChan) + backendSecurityPolicyC := newBackendSecurityPolicyController(c, kubernetes.NewForConfigOrDie(config), logger. + WithName("backend-security-policy"), sinkChan) if err = ctrl.NewControllerManagedBy(mgr). For(&aigv1a1.BackendSecurityPolicy{}). Complete(backendSecurityPolicyC); err != nil { return fmt.Errorf("failed to create controller for BackendSecurityPolicy: %w", err) } - secretC := NewSecretController(c, kubernetes.NewForConfigOrDie(config), logger, sinkChan) + secretC := NewSecretController(c, kubernetes.NewForConfigOrDie(config), logger. + WithName("secret"), sinkChan) if err = ctrl.NewControllerManagedBy(mgr). For(&corev1.Secret{}). Complete(secretC); err != nil { return fmt.Errorf("failed to create controller for Secret: %w", err) } - sink := newConfigSink(c, kubernetes.NewForConfigOrDie(config), logger, sinkChan, options.ExtProcImage) + sink := newConfigSink(c, kubernetes.NewForConfigOrDie(config), logger. + WithName("config-sink"), sinkChan, options.ExtProcImage) // Before starting the manager, initialize the config sink to sync all AIServiceBackend and AIGatewayRoute objects in the cluster. - logger.Info("Initializing config sink") if err = sink.init(ctx); err != nil { return fmt.Errorf("failed to initialize config sink: %w", err) } - logger.Info("Starting controller manager") if err = mgr.Start(ctx); err != nil { // This blocks until the manager is stopped. 
return fmt.Errorf("failed to start controller manager: %w", err) } diff --git a/internal/controller/sink.go b/internal/controller/sink.go index 5a3f86a56..caf5cd3ba 100644 --- a/internal/controller/sink.go +++ b/internal/controller/sink.go @@ -11,6 +11,7 @@ import ( corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" uuid2 "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/client-go/kubernetes" "k8s.io/utils/ptr" @@ -20,13 +21,14 @@ import ( "sigs.k8s.io/yaml" aigv1a1 "github.com/envoyproxy/ai-gateway/api/v1alpha1" - "github.com/envoyproxy/ai-gateway/filterconfig" + "github.com/envoyproxy/ai-gateway/filterapi" "github.com/envoyproxy/ai-gateway/internal/llmcostcel" ) const ( - selectedBackendHeaderKey = "x-ai-eg-selected-backend" - hostRewriteHTTPFilterName = "ai-eg-host-rewrite" + selectedBackendHeaderKey = "x-ai-eg-selected-backend" + hostRewriteHTTPFilterName = "ai-eg-host-rewrite" + extProcConfigAnnotationKey = "aigateway.envoyproxy.io/extproc-config-uuid" ) // mountedExtProcSecretPath specifies the secret file mounted on the external proc. The idea is to update the mounted @@ -70,7 +72,7 @@ func newConfigSink( c := &configSink{ client: kubeClient, kube: kube, - logger: logger.WithName("config-sink"), + logger: logger, defaultExtProcImage: extProcImage, defaultExtProcImagePullPolicy: corev1.PullIfNotPresent, eventChan: eventChan, @@ -197,7 +199,8 @@ func (c *configSink) syncAIGatewayRoute(aiGatewayRoute *aigv1a1.AIGatewayRoute) } // Update the extproc configmap. 
- if err := c.updateExtProcConfigMap(aiGatewayRoute, string(uuid2.NewUUID())); err != nil { + uuid := string(uuid2.NewUUID()) + if err := c.updateExtProcConfigMap(aiGatewayRoute, uuid); err != nil { c.logger.Error(err, "failed to update extproc configmap", "namespace", aiGatewayRoute.Namespace, "name", aiGatewayRoute.Name) return } @@ -208,6 +211,13 @@ func (c *configSink) syncAIGatewayRoute(aiGatewayRoute *aigv1a1.AIGatewayRoute) c.logger.Error(err, "failed to deploy ext proc", "namespace", aiGatewayRoute.Namespace, "name", aiGatewayRoute.Name) return } + + // Annotate all pods with the new config. + err = c.annotateExtProcPods(context.Background(), aiGatewayRoute, uuid) + if err != nil { + c.logger.Error(err, "failed to annotate pods", "namespace", aiGatewayRoute.Namespace, "name", aiGatewayRoute.Name) + return + } } func (c *configSink) syncAIServiceBackend(aiBackend *aigv1a1.AIServiceBackend) { @@ -249,17 +259,17 @@ func (c *configSink) updateExtProcConfigMap(aiGatewayRoute *aigv1a1.AIGatewayRou panic(fmt.Errorf("failed to get configmap %s: %w", extProcName(aiGatewayRoute), err)) } - ec := &filterconfig.Config{UUID: uuid} + ec := &filterapi.Config{UUID: uuid} spec := &aiGatewayRoute.Spec - ec.Schema.Name = filterconfig.APISchemaName(spec.APISchema.Name) + ec.Schema.Name = filterapi.APISchemaName(spec.APISchema.Name) ec.Schema.Version = spec.APISchema.Version ec.ModelNameHeaderKey = aigv1a1.AIModelHeaderKey ec.SelectedBackendHeaderKey = selectedBackendHeaderKey - ec.Rules = make([]filterconfig.RouteRule, len(spec.Rules)) + ec.Rules = make([]filterapi.RouteRule, len(spec.Rules)) for i := range spec.Rules { rule := &spec.Rules[i] - ec.Rules[i].Backends = make([]filterconfig.Backend, len(rule.BackendRefs)) + ec.Rules[i].Backends = make([]filterapi.Backend, len(rule.BackendRefs)) for j := range rule.BackendRefs { backend := &rule.BackendRefs[j] key := fmt.Sprintf("%s.%s", backend.Name, aiGatewayRoute.Namespace) @@ -269,7 +279,7 @@ func (c *configSink) 
updateExtProcConfigMap(aiGatewayRoute *aigv1a1.AIGatewayRou if err != nil { return fmt.Errorf("failed to get AIServiceBackend %s: %w", key, err) } else { - ec.Rules[i].Backends[j].Schema.Name = filterconfig.APISchemaName(backendObj.Spec.APISchema.Name) + ec.Rules[i].Backends[j].Schema.Name = filterapi.APISchemaName(backendObj.Spec.APISchema.Name) ec.Rules[i].Backends[j].Schema.Version = backendObj.Spec.APISchema.Version } @@ -284,16 +294,16 @@ func (c *configSink) updateExtProcConfigMap(aiGatewayRoute *aigv1a1.AIGatewayRou switch backendSecurityPolicy.Spec.Type { case aigv1a1.BackendSecurityPolicyTypeAPIKey: - ec.Rules[i].Backends[j].Auth = &filterconfig.BackendAuth{ - APIKey: &filterconfig.APIKeyAuth{Filename: path.Join(backendSecurityMountPath(volumeName), "/apiKey")}, + ec.Rules[i].Backends[j].Auth = &filterapi.BackendAuth{ + APIKey: &filterapi.APIKeyAuth{Filename: path.Join(backendSecurityMountPath(volumeName), "/apiKey")}, } case aigv1a1.BackendSecurityPolicyTypeAWSCredentials: if backendSecurityPolicy.Spec.AWSCredentials == nil { return fmt.Errorf("AWSCredentials type selected but not defined %s", backendSecurityPolicy.Name) } if backendSecurityPolicy.Spec.AWSCredentials.CredentialsFile != nil { - ec.Rules[i].Backends[j].Auth = &filterconfig.BackendAuth{ - AWSAuth: &filterconfig.AWSAuth{ + ec.Rules[i].Backends[j].Auth = &filterapi.BackendAuth{ + AWSAuth: &filterapi.AWSAuth{ CredentialFileName: path.Join(backendSecurityMountPath(volumeName), "/credentials"), Region: backendSecurityPolicy.Spec.AWSCredentials.Region, }, @@ -317,7 +327,7 @@ func (c *configSink) updateExtProcConfigMap(aiGatewayRoute *aigv1a1.AIGatewayRou } } } - ec.Rules[i].Headers = make([]filterconfig.HeaderMatch, len(rule.Matches)) + ec.Rules[i].Headers = make([]filterapi.HeaderMatch, len(rule.Matches)) for j, match := range rule.Matches { ec.Rules[i].Headers[j].Name = match.Headers[0].Name ec.Rules[i].Headers[j].Value = match.Headers[0].Value @@ -326,16 +336,16 @@ func (c *configSink) 
updateExtProcConfigMap(aiGatewayRoute *aigv1a1.AIGatewayRou ec.MetadataNamespace = aigv1a1.AIGatewayFilterMetadataNamespace for _, cost := range aiGatewayRoute.Spec.LLMRequestCosts { - fc := filterconfig.LLMRequestCost{MetadataKey: cost.MetadataKey} + fc := filterapi.LLMRequestCost{MetadataKey: cost.MetadataKey} switch cost.Type { case aigv1a1.LLMRequestCostTypeInputToken: - fc.Type = filterconfig.LLMRequestCostTypeInputToken + fc.Type = filterapi.LLMRequestCostTypeInputToken case aigv1a1.LLMRequestCostTypeOutputToken: - fc.Type = filterconfig.LLMRequestCostTypeOutputToken + fc.Type = filterapi.LLMRequestCostTypeOutputToken case aigv1a1.LLMRequestCostTypeTotalToken: - fc.Type = filterconfig.LLMRequestCostTypeTotalToken + fc.Type = filterapi.LLMRequestCostTypeTotalToken case aigv1a1.LLMRequestCostTypeCEL: - fc.Type = filterconfig.LLMRequestCostTypeCELExpression + fc.Type = filterapi.LLMRequestCostTypeCELExpression expr := *cost.CELExpression // Sanity check the CEL expression. _, err := llmcostcel.NewProgram(expr) @@ -434,6 +444,31 @@ func (c *configSink) newHTTPRoute(dst *gwapiv1.HTTPRoute, aiGatewayRoute *aigv1a return nil } +// annotateExtProcPods annotates the external processor pods with the new config uuid. +// This is necessary to make the config update faster. +// +// See https://neonmirrors.net/post/2022-12/reducing-pod-volume-update-times/ for explanation. 
+func (c *configSink) annotateExtProcPods(ctx context.Context, aiGatewayRoute *aigv1a1.AIGatewayRoute, uuid string) error { + pods, err := c.kube.CoreV1().Pods(aiGatewayRoute.Namespace).List(ctx, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("app=%s", extProcName(aiGatewayRoute)), + }) + if err != nil { + return fmt.Errorf("failed to list pods: %w", err) + } + + for _, pod := range pods.Items { + c.logger.Info("annotating pod", "namespace", pod.Namespace, "name", pod.Name) + _, err = c.kube.CoreV1().Pods(pod.Namespace).Patch(ctx, pod.Name, types.MergePatchType, + []byte(fmt.Sprintf( + `{"metadata":{"annotations":{"%s":"%s"}}}`, extProcConfigAnnotationKey, uuid), + ), metav1.PatchOptions{}) + if err != nil { + return fmt.Errorf("failed to patch pod %s: %w", pod.Name, err) + } + } + return nil +} + // syncExtProcDeployment syncs the external processor's Deployment and Service. func (c *configSink) syncExtProcDeployment(ctx context.Context, aiGatewayRoute *aigv1a1.AIGatewayRoute) error { name := extProcName(aiGatewayRoute) diff --git a/internal/controller/sink_test.go b/internal/controller/sink_test.go index f700cc8d3..5eb4da08c 100644 --- a/internal/controller/sink_test.go +++ b/internal/controller/sink_test.go @@ -5,6 +5,7 @@ import ( "fmt" "log/slog" "os" + "strconv" "testing" "time" @@ -24,7 +25,7 @@ import ( gwapiv1a2 "sigs.k8s.io/gateway-api/apis/v1alpha2" aigv1a1 "github.com/envoyproxy/ai-gateway/api/v1alpha1" - "github.com/envoyproxy/ai-gateway/filterconfig" + "github.com/envoyproxy/ai-gateway/filterapi" ) func TestConfigSink_init(t *testing.T) { @@ -314,7 +315,7 @@ func Test_updateExtProcConfigMap(t *testing.T) { for _, tc := range []struct { name string route *aigv1a1.AIGatewayRoute - exp *filterconfig.Config + exp *filterapi.Config }{ { name: "basic", @@ -368,46 +369,46 @@ func Test_updateExtProcConfigMap(t *testing.T) { }, }, }, - exp: &filterconfig.Config{ + exp: &filterapi.Config{ UUID: string(uuid2.NewUUID()), - Schema: 
filterconfig.VersionedAPISchema{Name: filterconfig.APISchemaOpenAI, Version: "v123"}, + Schema: filterapi.VersionedAPISchema{Name: filterapi.APISchemaOpenAI, Version: "v123"}, ModelNameHeaderKey: aigv1a1.AIModelHeaderKey, MetadataNamespace: aigv1a1.AIGatewayFilterMetadataNamespace, SelectedBackendHeaderKey: selectedBackendHeaderKey, - Rules: []filterconfig.RouteRule{ + Rules: []filterapi.RouteRule{ { - Backends: []filterconfig.Backend{ - {Name: "apple.ns", Weight: 1, Schema: filterconfig.VersionedAPISchema{Name: filterconfig.APISchemaAWSBedrock}, Auth: &filterconfig.BackendAuth{ - APIKey: &filterconfig.APIKeyAuth{ + Backends: []filterapi.Backend{ + {Name: "apple.ns", Weight: 1, Schema: filterapi.VersionedAPISchema{Name: filterapi.APISchemaAWSBedrock}, Auth: &filterapi.BackendAuth{ + APIKey: &filterapi.APIKeyAuth{ Filename: "/etc/backend_security_policy/rule0-backref0-some-backend-security-policy-1/apiKey", }, }}, {Name: "pineapple.ns", Weight: 2}, }, - Headers: []filterconfig.HeaderMatch{{Name: aigv1a1.AIModelHeaderKey, Value: "some-ai"}}, + Headers: []filterapi.HeaderMatch{{Name: aigv1a1.AIModelHeaderKey, Value: "some-ai"}}, }, { - Backends: []filterconfig.Backend{{Name: "cat.ns", Weight: 1, Auth: &filterconfig.BackendAuth{ - APIKey: &filterconfig.APIKeyAuth{ + Backends: []filterapi.Backend{{Name: "cat.ns", Weight: 1, Auth: &filterapi.BackendAuth{ + APIKey: &filterapi.APIKeyAuth{ Filename: "/etc/backend_security_policy/rule1-backref0-some-backend-security-policy-1/apiKey", }, }}}, - Headers: []filterconfig.HeaderMatch{{Name: aigv1a1.AIModelHeaderKey, Value: "another-ai"}}, + Headers: []filterapi.HeaderMatch{{Name: aigv1a1.AIModelHeaderKey, Value: "another-ai"}}, }, { - Backends: []filterconfig.Backend{{Name: "pen.ns", Weight: 2, Auth: &filterconfig.BackendAuth{ - AWSAuth: &filterconfig.AWSAuth{ + Backends: []filterapi.Backend{{Name: "pen.ns", Weight: 2, Auth: &filterapi.BackendAuth{ + AWSAuth: &filterapi.AWSAuth{ CredentialFileName: 
"/etc/backend_security_policy/rule2-backref0-some-backend-security-policy-2/credentials", Region: "us-east-1", }, }}}, - Headers: []filterconfig.HeaderMatch{{Name: aigv1a1.AIModelHeaderKey, Value: "another-ai-2"}}, + Headers: []filterapi.HeaderMatch{{Name: aigv1a1.AIModelHeaderKey, Value: "another-ai-2"}}, }, }, - LLMRequestCosts: []filterconfig.LLMRequestCost{ - {Type: filterconfig.LLMRequestCostTypeOutputToken, MetadataKey: "output-token"}, - {Type: filterconfig.LLMRequestCostTypeInputToken, MetadataKey: "input-token"}, - {Type: filterconfig.LLMRequestCostTypeTotalToken, MetadataKey: "total-token"}, - {Type: filterconfig.LLMRequestCostTypeCELExpression, MetadataKey: "cel-token", CELExpression: "model == 'cool_model' ? input_tokens * output_tokens : total_tokens"}, + LLMRequestCosts: []filterapi.LLMRequestCost{ + {Type: filterapi.LLMRequestCostTypeOutputToken, MetadataKey: "output-token"}, + {Type: filterapi.LLMRequestCostTypeInputToken, MetadataKey: "input-token"}, + {Type: filterapi.LLMRequestCostTypeTotalToken, MetadataKey: "total-token"}, + {Type: filterapi.LLMRequestCostTypeCELExpression, MetadataKey: "cel-token", CELExpression: "model == 'cool_model' ? 
input_tokens * output_tokens : total_tokens"}, }, }, }, @@ -426,7 +427,7 @@ func Test_updateExtProcConfigMap(t *testing.T) { require.NotNil(t, cm) data := cm.Data[expProcConfigFileName] - var actual filterconfig.Config + var actual filterapi.Config require.NoError(t, yaml.Unmarshal([]byte(data), &actual)) require.Equal(t, tc.exp, &actual) }) @@ -783,3 +784,39 @@ func Test_backendSecurityPolicyVolumeName(t *testing.T) { mountPath := backendSecurityPolicyVolumeName(1, 2, "name") require.Equal(t, "rule1-backref2-name", mountPath) } + +func Test_annotateExtProcPods(t *testing.T) { + fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build() + kube := fake2.NewClientset() + + eventChan := make(chan ConfigSinkEvent) + s := newConfigSink(fakeClient, kube, logr.Discard(), eventChan, "defaultExtProcImage") + + aiGatewayRoute := &aigv1a1.AIGatewayRoute{ + ObjectMeta: metav1.ObjectMeta{Name: "myroute", Namespace: "foons"}, + } + + for i := range 5 { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "somepod" + strconv.Itoa(i), + Namespace: "foons", + Labels: map[string]string{"app": extProcName(aiGatewayRoute)}, + }, + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "someapp"}}}, + } + _, err := kube.CoreV1().Pods("foons").Create(context.Background(), pod, metav1.CreateOptions{}) + require.NoError(t, err) + } + + uuid := string(uuid2.NewUUID()) + err := s.annotateExtProcPods(context.Background(), aiGatewayRoute, uuid) + require.NoError(t, err) + + // Check that all pods have been annotated. 
+ for i := range 5 { + pod, err := kube.CoreV1().Pods("foons").Get(context.Background(), "somepod"+strconv.Itoa(i), metav1.GetOptions{}) + require.NoError(t, err) + require.Equal(t, uuid, pod.Annotations[extProcConfigAnnotationKey]) + } +} diff --git a/internal/extproc/backendauth/api_key.go b/internal/extproc/backendauth/api_key.go index 231624a9a..3a94fb0e5 100644 --- a/internal/extproc/backendauth/api_key.go +++ b/internal/extproc/backendauth/api_key.go @@ -8,7 +8,7 @@ import ( corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" extprocv3 "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" - "github.com/envoyproxy/ai-gateway/filterconfig" + "github.com/envoyproxy/ai-gateway/filterapi" ) // apiKeyHandler implements [Handler] for api key authz. @@ -16,7 +16,7 @@ type apiKeyHandler struct { apiKey string } -func newAPIKeyHandler(auth *filterconfig.APIKeyAuth) (Handler, error) { +func newAPIKeyHandler(auth *filterapi.APIKeyAuth) (Handler, error) { secret, err := os.ReadFile(auth.Filename) if err != nil { return nil, fmt.Errorf("failed to read api key file: %w", err) diff --git a/internal/extproc/backendauth/api_key_test.go b/internal/extproc/backendauth/api_key_test.go index 69e9b52ea..cfb3ce135 100644 --- a/internal/extproc/backendauth/api_key_test.go +++ b/internal/extproc/backendauth/api_key_test.go @@ -8,7 +8,7 @@ import ( extprocv3 "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" "github.com/stretchr/testify/require" - "github.com/envoyproxy/ai-gateway/filterconfig" + "github.com/envoyproxy/ai-gateway/filterapi" ) func TestNewAPIKeyHandler(t *testing.T) { @@ -21,7 +21,7 @@ func TestNewAPIKeyHandler(t *testing.T) { require.NoError(t, err) require.NoError(t, f.Sync()) - auth := filterconfig.APIKeyAuth{Filename: apiKeyFile} + auth := filterapi.APIKeyAuth{Filename: apiKeyFile} handler, err := newAPIKeyHandler(&auth) require.NoError(t, err) require.NotNil(t, handler) @@ -39,7 +39,7 @@ func TestApiKeyHandler_Do(t 
*testing.T) { require.NoError(t, err) require.NoError(t, f.Sync()) - auth := filterconfig.APIKeyAuth{Filename: apiKeyFile} + auth := filterapi.APIKeyAuth{Filename: apiKeyFile} handler, err := newAPIKeyHandler(&auth) require.NoError(t, err) require.NotNil(t, handler) diff --git a/internal/extproc/backendauth/auth.go b/internal/extproc/backendauth/auth.go index 6264230a4..e8375843f 100644 --- a/internal/extproc/backendauth/auth.go +++ b/internal/extproc/backendauth/auth.go @@ -5,7 +5,7 @@ import ( extprocv3 "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" - "github.com/envoyproxy/ai-gateway/filterconfig" + "github.com/envoyproxy/ai-gateway/filterapi" ) // Handler is the interface that deals with the backend auth for a specific backend. @@ -17,7 +17,7 @@ type Handler interface { } // NewHandler returns a new implementation of [Handler] based on the configuration. -func NewHandler(config *filterconfig.BackendAuth) (Handler, error) { +func NewHandler(config *filterapi.BackendAuth) (Handler, error) { if config.AWSAuth != nil { return newAWSHandler(config.AWSAuth) } else if config.APIKey != nil { diff --git a/internal/extproc/backendauth/aws.go b/internal/extproc/backendauth/aws.go index ea8eb7637..0d6385e3d 100644 --- a/internal/extproc/backendauth/aws.go +++ b/internal/extproc/backendauth/aws.go @@ -20,7 +20,7 @@ import ( corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" extprocv3 "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" - "github.com/envoyproxy/ai-gateway/filterconfig" + "github.com/envoyproxy/ai-gateway/filterapi" ) // awsHandler implements [Handler] for AWS Bedrock authz. 
@@ -34,7 +34,7 @@ type awsHandler struct { proxyURL string } -func newAWSHandler(awsAuth *filterconfig.AWSAuth) (*awsHandler, error) { +func newAWSHandler(awsAuth *filterapi.AWSAuth) (*awsHandler, error) { var credentials aws.Credentials var region string var oidcHandler *oidcHandler diff --git a/internal/extproc/backendauth/aws_test.go b/internal/extproc/backendauth/aws_test.go index d7eda2950..3a3da56cb 100644 --- a/internal/extproc/backendauth/aws_test.go +++ b/internal/extproc/backendauth/aws_test.go @@ -8,14 +8,14 @@ import ( extprocv3 "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" "github.com/stretchr/testify/require" - "github.com/envoyproxy/ai-gateway/filterconfig" + "github.com/envoyproxy/ai-gateway/filterapi" ) func TestNewAWSHandler(t *testing.T) { t.Setenv("AWS_ACCESS_KEY_ID", "test") t.Setenv("AWS_SECRET_ACCESS_KEY", "secret") - handler, err := newAWSHandler(&filterconfig.AWSAuth{}) + handler, err := newAWSHandler(&filterapi.AWSAuth{}) require.NoError(t, err) require.NotNil(t, handler) } @@ -35,7 +35,7 @@ func TestAWSHandler_Do(t *testing.T) { require.NoError(t, err) require.NoError(t, file.Sync()) - credentialFileHandler, err := newAWSHandler(&filterconfig.AWSAuth{ + credentialFileHandler, err := newAWSHandler(&filterapi.AWSAuth{ CredentialFileName: awsCredentialFile, Region: "us-east-1", }) diff --git a/internal/extproc/mocks_test.go b/internal/extproc/mocks_test.go index f0e684204..d1f2d5f9e 100644 --- a/internal/extproc/mocks_test.go +++ b/internal/extproc/mocks_test.go @@ -12,8 +12,8 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/grpc/metadata" - "github.com/envoyproxy/ai-gateway/extprocapi" - "github.com/envoyproxy/ai-gateway/filterconfig" + "github.com/envoyproxy/ai-gateway/filterapi" + "github.com/envoyproxy/ai-gateway/filterapi/x" "github.com/envoyproxy/ai-gateway/internal/extproc/router" "github.com/envoyproxy/ai-gateway/internal/extproc/translator" ) @@ -21,7 +21,7 @@ import ( var ( _ ProcessorIface 
= &mockProcessor{} _ translator.Translator = &mockTranslator{} - _ extprocapi.Router = &mockRouter{} + _ x.Router = &mockRouter{} ) func newMockProcessor(_ *processorConfig, _ *slog.Logger) *mockProcessor { @@ -111,14 +111,14 @@ type mockRouter struct { t *testing.T expHeaders map[string]string retBackendName string - retVersionedAPISchema filterconfig.VersionedAPISchema + retVersionedAPISchema filterapi.VersionedAPISchema retErr error } // Calculate implements [router.Router.Calculate]. -func (m mockRouter) Calculate(headers map[string]string) (*filterconfig.Backend, error) { +func (m mockRouter) Calculate(headers map[string]string) (*filterapi.Backend, error) { require.Equal(m.t, m.expHeaders, headers) - b := &filterconfig.Backend{Name: m.retBackendName, Schema: m.retVersionedAPISchema} + b := &filterapi.Backend{Name: m.retBackendName, Schema: m.retVersionedAPISchema} return b, m.retErr } diff --git a/internal/extproc/processor.go b/internal/extproc/processor.go index dcc7b3b21..7af784240 100644 --- a/internal/extproc/processor.go +++ b/internal/extproc/processor.go @@ -14,8 +14,8 @@ import ( "github.com/google/cel-go/cel" "google.golang.org/protobuf/types/known/structpb" - "github.com/envoyproxy/ai-gateway/extprocapi" - "github.com/envoyproxy/ai-gateway/filterconfig" + "github.com/envoyproxy/ai-gateway/filterapi" + "github.com/envoyproxy/ai-gateway/filterapi/x" "github.com/envoyproxy/ai-gateway/internal/extproc/backendauth" "github.com/envoyproxy/ai-gateway/internal/extproc/router" "github.com/envoyproxy/ai-gateway/internal/extproc/translator" @@ -25,17 +25,18 @@ import ( // processorConfig is the configuration for the processor. // This will be created by the server and passed to the processor when it detects a new configuration. 
type processorConfig struct { + uuid string bodyParser router.RequestBodyParser - router extprocapi.Router + router x.Router modelNameHeaderKey, selectedBackendHeaderKey string - factories map[filterconfig.VersionedAPISchema]translator.Factory + factories map[filterapi.VersionedAPISchema]translator.Factory backendAuthHandlers map[string]backendauth.Handler metadataNamespace string requestCosts []processorConfigRequestCost } type processorConfigRequestCost struct { - *filterconfig.LLMRequestCost + *filterapi.LLMRequestCost celProg cel.Program } @@ -218,13 +219,13 @@ func (p *Processor) maybeBuildDynamicMetadata() (*structpb.Struct, error) { c := &p.config.requestCosts[i] var cost uint32 switch c.Type { - case filterconfig.LLMRequestCostTypeInputToken: + case filterapi.LLMRequestCostTypeInputToken: cost = p.costs.InputTokens - case filterconfig.LLMRequestCostTypeOutputToken: + case filterapi.LLMRequestCostTypeOutputToken: cost = p.costs.OutputTokens - case filterconfig.LLMRequestCostTypeTotalToken: + case filterapi.LLMRequestCostTypeTotalToken: cost = p.costs.TotalTokens - case filterconfig.LLMRequestCostTypeCELExpression: + case filterapi.LLMRequestCostTypeCELExpression: costU64, err := llmcostcel.EvaluateProgram( c.celProg, p.requestHeaders[p.config.modelNameHeaderKey], diff --git a/internal/extproc/processor_test.go b/internal/extproc/processor_test.go index 5f9f687e0..862f84751 100644 --- a/internal/extproc/processor_test.go +++ b/internal/extproc/processor_test.go @@ -10,7 +10,7 @@ import ( extprocv3 "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" "github.com/stretchr/testify/require" - "github.com/envoyproxy/ai-gateway/filterconfig" + "github.com/envoyproxy/ai-gateway/filterapi" "github.com/envoyproxy/ai-gateway/internal/extproc/router" "github.com/envoyproxy/ai-gateway/internal/extproc/translator" "github.com/envoyproxy/ai-gateway/internal/llmcostcel" @@ -74,15 +74,15 @@ func TestProcessor_ProcessResponseBody(t *testing.T) { p := 
&Processor{translator: mt, logger: slog.Default(), config: &processorConfig{ metadataNamespace: "ai_gateway_llm_ns", requestCosts: []processorConfigRequestCost{ - {LLMRequestCost: &filterconfig.LLMRequestCost{Type: filterconfig.LLMRequestCostTypeOutputToken, MetadataKey: "output_token_usage"}}, - {LLMRequestCost: &filterconfig.LLMRequestCost{Type: filterconfig.LLMRequestCostTypeInputToken, MetadataKey: "input_token_usage"}}, + {LLMRequestCost: &filterapi.LLMRequestCost{Type: filterapi.LLMRequestCostTypeOutputToken, MetadataKey: "output_token_usage"}}, + {LLMRequestCost: &filterapi.LLMRequestCost{Type: filterapi.LLMRequestCostTypeInputToken, MetadataKey: "input_token_usage"}}, { celProg: celProgInt, - LLMRequestCost: &filterconfig.LLMRequestCost{Type: filterconfig.LLMRequestCostTypeCELExpression, MetadataKey: "cel_int"}, + LLMRequestCost: &filterapi.LLMRequestCost{Type: filterapi.LLMRequestCostTypeCELExpression, MetadataKey: "cel_int"}, }, { celProg: celProgUint, - LLMRequestCost: &filterconfig.LLMRequestCost{Type: filterconfig.LLMRequestCostTypeCELExpression, MetadataKey: "cel_uint"}, + LLMRequestCost: &filterapi.LLMRequestCost{Type: filterapi.LLMRequestCostTypeCELExpression, MetadataKey: "cel_uint"}, }, }, }} @@ -125,11 +125,11 @@ func TestProcessor_ProcessRequestBody(t *testing.T) { rbp := mockRequestBodyParser{t: t, retModelName: "some-model", expPath: "/foo"} rt := mockRouter{ t: t, expHeaders: headers, retBackendName: "some-backend", - retVersionedAPISchema: filterconfig.VersionedAPISchema{Name: "some-schema", Version: "v10.0"}, + retVersionedAPISchema: filterapi.VersionedAPISchema{Name: "some-schema", Version: "v10.0"}, } p := &Processor{config: &processorConfig{ bodyParser: rbp.impl, router: rt, - factories: make(map[filterconfig.VersionedAPISchema]translator.Factory), + factories: make(map[filterapi.VersionedAPISchema]translator.Factory), }, requestHeaders: headers, logger: slog.Default()} _, err := p.ProcessRequestBody(context.Background(), 
&extprocv3.HttpBody{}) require.ErrorContains(t, err, "failed to find factory for output schema {\"some-schema\" \"v10.0\"}") @@ -139,12 +139,12 @@ func TestProcessor_ProcessRequestBody(t *testing.T) { rbp := mockRequestBodyParser{t: t, retModelName: "some-model", expPath: "/foo"} rt := mockRouter{ t: t, expHeaders: headers, retBackendName: "some-backend", - retVersionedAPISchema: filterconfig.VersionedAPISchema{Name: "some-schema", Version: "v10.0"}, + retVersionedAPISchema: filterapi.VersionedAPISchema{Name: "some-schema", Version: "v10.0"}, } factory := mockTranslatorFactory{t: t, retErr: errors.New("test error"), expPath: "/foo"} p := &Processor{config: &processorConfig{ bodyParser: rbp.impl, router: rt, - factories: map[filterconfig.VersionedAPISchema]translator.Factory{ + factories: map[filterapi.VersionedAPISchema]translator.Factory{ {Name: "some-schema", Version: "v10.0"}: factory.impl, }, }, requestHeaders: headers, logger: slog.Default()} @@ -156,12 +156,12 @@ func TestProcessor_ProcessRequestBody(t *testing.T) { rbp := mockRequestBodyParser{t: t, retModelName: "some-model", expPath: "/foo"} rt := mockRouter{ t: t, expHeaders: headers, retBackendName: "some-backend", - retVersionedAPISchema: filterconfig.VersionedAPISchema{Name: "some-schema", Version: "v10.0"}, + retVersionedAPISchema: filterapi.VersionedAPISchema{Name: "some-schema", Version: "v10.0"}, } factory := mockTranslatorFactory{t: t, retTranslator: mockTranslator{t: t, retErr: errors.New("test error")}, expPath: "/foo"} p := &Processor{config: &processorConfig{ bodyParser: rbp.impl, router: rt, - factories: map[filterconfig.VersionedAPISchema]translator.Factory{ + factories: map[filterapi.VersionedAPISchema]translator.Factory{ {Name: "some-schema", Version: "v10.0"}: factory.impl, }, }, requestHeaders: headers, logger: slog.Default()} @@ -174,7 +174,7 @@ func TestProcessor_ProcessRequestBody(t *testing.T) { rbp := mockRequestBodyParser{t: t, retModelName: "some-model", expPath: "/foo", retRb: 
someBody} rt := mockRouter{ t: t, expHeaders: headers, retBackendName: "some-backend", - retVersionedAPISchema: filterconfig.VersionedAPISchema{Name: "some-schema", Version: "v10.0"}, + retVersionedAPISchema: filterapi.VersionedAPISchema{Name: "some-schema", Version: "v10.0"}, } headerMut := &extprocv3.HeaderMutation{} bodyMut := &extprocv3.BodyMutation{} @@ -182,7 +182,7 @@ func TestProcessor_ProcessRequestBody(t *testing.T) { factory := mockTranslatorFactory{t: t, retTranslator: mt, expPath: "/foo"} p := &Processor{config: &processorConfig{ bodyParser: rbp.impl, router: rt, - factories: map[filterconfig.VersionedAPISchema]translator.Factory{ + factories: map[filterapi.VersionedAPISchema]translator.Factory{ {Name: "some-schema", Version: "v10.0"}: factory.impl, }, selectedBackendHeaderKey: "x-ai-gateway-backend-key", diff --git a/internal/extproc/router/request_body.go b/internal/extproc/router/request_body.go index d1361fac8..724e039f7 100644 --- a/internal/extproc/router/request_body.go +++ b/internal/extproc/router/request_body.go @@ -6,7 +6,7 @@ import ( extprocv3 "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" - "github.com/envoyproxy/ai-gateway/filterconfig" + "github.com/envoyproxy/ai-gateway/filterapi" "github.com/envoyproxy/ai-gateway/internal/apischema/openai" ) @@ -14,8 +14,8 @@ import ( type RequestBodyParser func(path string, body *extprocv3.HttpBody) (modelName string, rb RequestBody, err error) // NewRequestBodyParser creates a new RequestBodyParser based on the schema. 
-func NewRequestBodyParser(schema filterconfig.VersionedAPISchema) (RequestBodyParser, error) { - if schema.Name == filterconfig.APISchemaOpenAI { +func NewRequestBodyParser(schema filterapi.VersionedAPISchema) (RequestBodyParser, error) { + if schema.Name == filterapi.APISchemaOpenAI { return openAIParseBody, nil } return nil, fmt.Errorf("unsupported API schema: %s", schema) diff --git a/internal/extproc/router/request_body_test.go b/internal/extproc/router/request_body_test.go index e42469e9f..dc76f9f1d 100644 --- a/internal/extproc/router/request_body_test.go +++ b/internal/extproc/router/request_body_test.go @@ -7,18 +7,18 @@ import ( extprocv3 "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" "github.com/stretchr/testify/require" - "github.com/envoyproxy/ai-gateway/filterconfig" + "github.com/envoyproxy/ai-gateway/filterapi" "github.com/envoyproxy/ai-gateway/internal/apischema/openai" ) func TestNewRequestBodyParser(t *testing.T) { t.Run("ok", func(t *testing.T) { - res, err := NewRequestBodyParser(filterconfig.VersionedAPISchema{Name: filterconfig.APISchemaOpenAI}) + res, err := NewRequestBodyParser(filterapi.VersionedAPISchema{Name: filterapi.APISchemaOpenAI}) require.NotNil(t, res) require.NoError(t, err) }) t.Run("error", func(t *testing.T) { - res, err := NewRequestBodyParser(filterconfig.VersionedAPISchema{Name: "foo"}) + res, err := NewRequestBodyParser(filterapi.VersionedAPISchema{Name: "foo"}) require.Nil(t, res) require.Error(t, err) }) diff --git a/internal/extproc/router/router.go b/internal/extproc/router/router.go index acb8996e5..87f5b4f96 100644 --- a/internal/extproc/router/router.go +++ b/internal/extproc/router/router.go @@ -6,18 +6,18 @@ import ( "golang.org/x/exp/rand" - "github.com/envoyproxy/ai-gateway/extprocapi" - "github.com/envoyproxy/ai-gateway/filterconfig" + "github.com/envoyproxy/ai-gateway/filterapi" + "github.com/envoyproxy/ai-gateway/filterapi/x" ) -// router implements [extprocapi.Router]. 
+// router implements [x.Router]. type router struct { - rules []filterconfig.RouteRule + rules []filterapi.RouteRule rng *rand.Rand } -// NewRouter creates a new [extprocapi.Router] implementation for the given config. -func NewRouter(config *filterconfig.Config, newCustomFn extprocapi.NewCustomRouterFn) (extprocapi.Router, error) { +// NewRouter creates a new [x.Router] implementation for the given config. +func NewRouter(config *filterapi.Config, newCustomFn x.NewCustomRouterFn) (x.Router, error) { r := &router{rules: config.Rules, rng: rand.New(rand.NewSource(uint64(time.Now().UnixNano())))} //nolint:gosec if newCustomFn != nil { customRouter := newCustomFn(r, config) @@ -26,9 +26,9 @@ func NewRouter(config *filterconfig.Config, newCustomFn extprocapi.NewCustomRout return r, nil } -// Calculate implements [extprocapi.Router.Calculate]. -func (r *router) Calculate(headers map[string]string) (backend *filterconfig.Backend, err error) { - var rule *filterconfig.RouteRule +// Calculate implements [x.Router.Calculate]. +func (r *router) Calculate(headers map[string]string) (backend *filterapi.Backend, err error) { + var rule *filterapi.RouteRule for i := range r.rules { _rule := &r.rules[i] for _, hdr := range _rule.Headers { @@ -46,7 +46,7 @@ func (r *router) Calculate(headers map[string]string) (backend *filterconfig.Bac return r.selectBackendFromRule(rule), nil } -func (r *router) selectBackendFromRule(rule *filterconfig.RouteRule) (backend *filterconfig.Backend) { +func (r *router) selectBackendFromRule(rule *filterapi.RouteRule) (backend *filterapi.Backend) { // Each backend has a weight, so we randomly select depending on the weight. // This is a pretty naive implementation and can be buggy, so fix it later.
totalWeight := 0 diff --git a/internal/extproc/router/router_test.go b/internal/extproc/router/router_test.go index 15f7f2c8c..c6a359b02 100644 --- a/internal/extproc/router/router_test.go +++ b/internal/extproc/router/router_test.go @@ -5,20 +5,20 @@ import ( "github.com/stretchr/testify/require" - "github.com/envoyproxy/ai-gateway/extprocapi" - "github.com/envoyproxy/ai-gateway/filterconfig" + "github.com/envoyproxy/ai-gateway/filterapi" + "github.com/envoyproxy/ai-gateway/filterapi/x" ) -// dummyCustomRouter implements [extprocapi.Router]. +// dummyCustomRouter implements [x.Router]. type dummyCustomRouter struct{ called bool } -func (c *dummyCustomRouter) Calculate(map[string]string) (*filterconfig.Backend, error) { +func (c *dummyCustomRouter) Calculate(map[string]string) (*filterapi.Backend, error) { c.called = true return nil, nil } func TestRouter_NewRouter_Custom(t *testing.T) { - r, err := NewRouter(&filterconfig.Config{}, func(defaultRouter extprocapi.Router, config *filterconfig.Config) extprocapi.Router { + r, err := NewRouter(&filterapi.Config{}, func(defaultRouter x.Router, config *filterapi.Config) x.Router { require.NotNil(t, defaultRouter) _, ok := defaultRouter.(*router) require.True(t, ok) // Checking if the default router is correctly passed.
@@ -34,23 +34,23 @@ func TestRouter_NewRouter_Custom(t *testing.T) { } func TestRouter_Calculate(t *testing.T) { - outSchema := filterconfig.VersionedAPISchema{Name: filterconfig.APISchemaOpenAI} - _r, err := NewRouter(&filterconfig.Config{ - Rules: []filterconfig.RouteRule{ + outSchema := filterapi.VersionedAPISchema{Name: filterapi.APISchemaOpenAI} + _r, err := NewRouter(&filterapi.Config{ + Rules: []filterapi.RouteRule{ { - Backends: []filterconfig.Backend{ + Backends: []filterapi.Backend{ {Name: "foo", Schema: outSchema, Weight: 1}, {Name: "bar", Schema: outSchema, Weight: 3}, }, - Headers: []filterconfig.HeaderMatch{ + Headers: []filterapi.HeaderMatch{ {Name: "x-model-name", Value: "llama3.3333"}, }, }, { - Backends: []filterconfig.Backend{ + Backends: []filterapi.Backend{ {Name: "openai", Schema: outSchema}, }, - Headers: []filterconfig.HeaderMatch{ + Headers: []filterapi.HeaderMatch{ {Name: "x-model-name", Value: "gpt4.4444"}, }, }, @@ -87,15 +87,15 @@ func TestRouter_Calculate(t *testing.T) { } func TestRouter_selectBackendFromRule(t *testing.T) { - _r, err := NewRouter(&filterconfig.Config{}, nil) + _r, err := NewRouter(&filterapi.Config{}, nil) require.NoError(t, err) r, ok := _r.(*router) require.True(t, ok) - outSchema := filterconfig.VersionedAPISchema{Name: filterconfig.APISchemaOpenAI} + outSchema := filterapi.VersionedAPISchema{Name: filterapi.APISchemaOpenAI} - rule := &filterconfig.RouteRule{ - Backends: []filterconfig.Backend{ + rule := &filterapi.RouteRule{ + Backends: []filterapi.Backend{ {Name: "foo", Schema: outSchema, Weight: 1}, {Name: "bar", Schema: outSchema, Weight: 3}, }, diff --git a/internal/extproc/server.go b/internal/extproc/server.go index 1e14b6b16..533e3418f 100644 --- a/internal/extproc/server.go +++ b/internal/extproc/server.go @@ -6,21 +6,30 @@ import ( "fmt" "io" "log/slog" + "slices" + "strings" + corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" extprocv3 
"github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" "github.com/google/cel-go/cel" "google.golang.org/grpc/codes" "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/status" - "github.com/envoyproxy/ai-gateway/extprocapi" - "github.com/envoyproxy/ai-gateway/filterconfig" + "github.com/envoyproxy/ai-gateway/filterapi" + "github.com/envoyproxy/ai-gateway/filterapi/x" "github.com/envoyproxy/ai-gateway/internal/extproc/backendauth" "github.com/envoyproxy/ai-gateway/internal/extproc/router" "github.com/envoyproxy/ai-gateway/internal/extproc/translator" "github.com/envoyproxy/ai-gateway/internal/llmcostcel" ) +const ( + redactedKey = "[REDACTED]" +) + +var sensitiveHeaderKeys = []string{"authorization"} + // Server implements the external process server. type Server[P ProcessorIface] struct { logger *slog.Logger @@ -35,17 +44,17 @@ func NewServer[P ProcessorIface](logger *slog.Logger, newProcessor func(*process } // LoadConfig updates the configuration of the external processor. 
-func (s *Server[P]) LoadConfig(config *filterconfig.Config) error { +func (s *Server[P]) LoadConfig(config *filterapi.Config) error { bodyParser, err := router.NewRequestBodyParser(config.Schema) if err != nil { return fmt.Errorf("cannot create request body parser: %w", err) } - rt, err := router.NewRouter(config, extprocapi.NewCustomRouter) + rt, err := router.NewRouter(config, x.NewCustomRouter) if err != nil { return fmt.Errorf("cannot create router: %w", err) } - factories := make(map[filterconfig.VersionedAPISchema]translator.Factory) + factories := make(map[filterapi.VersionedAPISchema]translator.Factory) backendAuthHandlers := make(map[string]backendauth.Handler) for _, r := range config.Rules { for _, b := range r.Backends { @@ -80,6 +89,7 @@ func (s *Server[P]) LoadConfig(config *filterconfig.Config) error { } newConfig := &processorConfig{ + uuid: config.UUID, bodyParser: bodyParser, router: rt, selectedBackendHeaderKey: config.SelectedBackendHeaderKey, modelNameHeaderKey: config.ModelNameHeaderKey, @@ -95,6 +105,7 @@ func (s *Server[P]) LoadConfig(config *filterconfig.Config) error { // Process implements [extprocv3.ExternalProcessorServer]. func (s *Server[P]) Process(stream extprocv3.ExternalProcessor_ProcessServer) error { p := s.newProcessor(s.config, s.logger) + s.logger.Debug("handling a new stream", slog.Any("config_uuid", s.config.uuid)) return s.process(p, stream) } @@ -131,7 +142,11 @@ func (s *Server[P]) processMsg(ctx context.Context, p P, req *extprocv3.Processi switch value := req.Request.(type) { case *extprocv3.ProcessingRequest_RequestHeaders: requestHdrs := req.GetRequestHeaders().Headers - s.logger.Debug("request headers processing", slog.Any("request_headers", requestHdrs)) + // If DEBUG log level is enabled, filter sensitive headers before logging. 
+ if s.logger.Enabled(ctx, slog.LevelDebug) { + filteredHdrs := filterSensitiveHeaders(requestHdrs, s.logger, sensitiveHeaderKeys) + s.logger.Debug("request headers processing", slog.Any("request_headers", filteredHdrs)) + } resp, err := p.ProcessRequestHeaders(ctx, requestHdrs) if err != nil { return nil, fmt.Errorf("cannot process request headers: %w", err) @@ -141,7 +156,11 @@ func (s *Server[P]) processMsg(ctx context.Context, p P, req *extprocv3.Processi case *extprocv3.ProcessingRequest_RequestBody: s.logger.Debug("request body processing", slog.Any("request", req)) resp, err := p.ProcessRequestBody(ctx, value.RequestBody) - s.logger.Debug("request body processed", slog.Any("response", resp)) + // If DEBUG log level is enabled, filter sensitive body before logging. + if s.logger.Enabled(ctx, slog.LevelDebug) { + filteredBody := filterSensitiveBody(resp, s.logger, sensitiveHeaderKeys) + s.logger.Debug("request body processed", slog.Any("response", filteredBody)) + } if err != nil { return nil, fmt.Errorf("cannot process request body: %w", err) } @@ -178,3 +197,58 @@ func (s *Server[P]) Check(context.Context, *grpc_health_v1.HealthCheckRequest) ( func (s *Server[P]) Watch(*grpc_health_v1.HealthCheckRequest, grpc_health_v1.Health_WatchServer) error { return status.Error(codes.Unimplemented, "Watch is not implemented") } + +// filterSensitiveHeaders filters out sensitive headers from the provided HeaderMap. +// Specifically, it redacts the value of the "authorization" header and logs this action. +// The function returns a new HeaderMap with the filtered headers. 
+func filterSensitiveHeaders(headers *corev3.HeaderMap, logger *slog.Logger, sensitiveKeys []string) *corev3.HeaderMap { + if headers == nil { + logger.Debug("received nil HeaderMap, returning empty HeaderMap") + return &corev3.HeaderMap{} + } + filteredHeaders := &corev3.HeaderMap{} + for _, header := range headers.Headers { + // We convert the header key to lowercase to make the comparison case-insensitive but we don't modify the original header. + if slices.Contains(sensitiveKeys, strings.ToLower(header.GetKey())) { + logger.Debug("filtering sensitive header", slog.String("header_key", header.Key)) + filteredHeaders.Headers = append(filteredHeaders.Headers, &corev3.HeaderValue{ + Key: header.Key, + Value: redactedKey, + }) + } else { + filteredHeaders.Headers = append(filteredHeaders.Headers, header) + } + } + return filteredHeaders +} + +// filterSensitiveBody filters out sensitive information from the response body. +// It creates a copy of the response body to avoid modifying the original body, +// as the API Key is needed for the request. The function returns a new +// ProcessingResponse with the filtered body for logging. 
+func filterSensitiveBody(resp *extprocv3.ProcessingResponse, logger *slog.Logger, sensitiveKeys []string) *extprocv3.ProcessingResponse { + if resp == nil { + logger.Debug("received nil ProcessingResponse, returning empty ProcessingResponse") + return &extprocv3.ProcessingResponse{} + } + filteredResp := &extprocv3.ProcessingResponse{ + Response: &extprocv3.ProcessingResponse_RequestBody{ + RequestBody: &extprocv3.BodyResponse{ + Response: &extprocv3.CommonResponse{ + HeaderMutation: resp.Response.(*extprocv3.ProcessingResponse_RequestBody).RequestBody.Response.GetHeaderMutation(), + BodyMutation: resp.Response.(*extprocv3.ProcessingResponse_RequestBody).RequestBody.Response.GetBodyMutation(), + ClearRouteCache: resp.Response.(*extprocv3.ProcessingResponse_RequestBody).RequestBody.Response.GetClearRouteCache(), + }, + }, + }, + ModeOverride: resp.ModeOverride, + } + for _, setHeader := range filteredResp.Response.(*extprocv3.ProcessingResponse_RequestBody).RequestBody.Response.GetHeaderMutation().GetSetHeaders() { + // We convert the header key to lowercase to make the comparison case-insensitive but we don't modify the original header. 
+ if slices.Contains(sensitiveKeys, strings.ToLower(setHeader.Header.GetKey())) { + logger.Debug("filtering sensitive header", slog.String("header_key", setHeader.Header.Key)) + setHeader.Header.RawValue = []byte(redactedKey) + } + } + return filteredResp +} diff --git a/internal/extproc/server_test.go b/internal/extproc/server_test.go index 91e0e00cb..4d44ebf7a 100644 --- a/internal/extproc/server_test.go +++ b/internal/extproc/server_test.go @@ -15,7 +15,7 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/status" - "github.com/envoyproxy/ai-gateway/filterconfig" + "github.com/envoyproxy/ai-gateway/filterapi" "github.com/envoyproxy/ai-gateway/internal/llmcostcel" ) @@ -23,35 +23,36 @@ func requireNewServerWithMockProcessor(t *testing.T) *Server[*mockProcessor] { s, err := NewServer[*mockProcessor](slog.Default(), newMockProcessor) require.NoError(t, err) require.NotNil(t, s) + s.config = &processorConfig{} return s } func TestServer_LoadConfig(t *testing.T) { t.Run("invalid input schema", func(t *testing.T) { s := requireNewServerWithMockProcessor(t) - err := s.LoadConfig(&filterconfig.Config{ - Schema: filterconfig.VersionedAPISchema{Name: "some-invalid-schema"}, + err := s.LoadConfig(&filterapi.Config{ + Schema: filterapi.VersionedAPISchema{Name: "some-invalid-schema"}, }) require.Error(t, err) require.ErrorContains(t, err, "cannot create request body parser") }) t.Run("ok", func(t *testing.T) { - config := &filterconfig.Config{ + config := &filterapi.Config{ MetadataNamespace: "ns", - LLMRequestCosts: []filterconfig.LLMRequestCost{ - {MetadataKey: "key", Type: filterconfig.LLMRequestCostTypeOutputToken}, - {MetadataKey: "cel_key", Type: filterconfig.LLMRequestCostTypeCELExpression, CELExpression: "1 + 1"}, + LLMRequestCosts: []filterapi.LLMRequestCost{ + {MetadataKey: "key", Type: filterapi.LLMRequestCostTypeOutputToken}, + {MetadataKey: "cel_key", Type: filterapi.LLMRequestCostTypeCELExpression, CELExpression: "1 + 1"}, }, - 
Schema: filterconfig.VersionedAPISchema{Name: filterconfig.APISchemaOpenAI}, + Schema: filterapi.VersionedAPISchema{Name: filterapi.APISchemaOpenAI}, SelectedBackendHeaderKey: "x-ai-eg-selected-backend", ModelNameHeaderKey: "x-model-name", - Rules: []filterconfig.RouteRule{ + Rules: []filterapi.RouteRule{ { - Backends: []filterconfig.Backend{ - {Name: "kserve", Schema: filterconfig.VersionedAPISchema{Name: filterconfig.APISchemaOpenAI}}, - {Name: "awsbedrock", Schema: filterconfig.VersionedAPISchema{Name: filterconfig.APISchemaAWSBedrock}}, + Backends: []filterapi.Backend{ + {Name: "kserve", Schema: filterapi.VersionedAPISchema{Name: filterapi.APISchemaOpenAI}}, + {Name: "awsbedrock", Schema: filterapi.VersionedAPISchema{Name: filterapi.APISchemaAWSBedrock}}, }, - Headers: []filterconfig.HeaderMatch{ + Headers: []filterapi.HeaderMatch{ { Name: "x-model-name", Value: "llama3.3333", @@ -59,10 +60,10 @@ func TestServer_LoadConfig(t *testing.T) { }, }, { - Backends: []filterconfig.Backend{ - {Name: "openai", Schema: filterconfig.VersionedAPISchema{Name: filterconfig.APISchemaOpenAI}}, + Backends: []filterapi.Backend{ + {Name: "openai", Schema: filterapi.VersionedAPISchema{Name: filterapi.APISchemaOpenAI}}, }, - Headers: []filterconfig.HeaderMatch{ + Headers: []filterapi.HeaderMatch{ { Name: "x-model-name", Value: "gpt4.4444", @@ -82,13 +83,13 @@ func TestServer_LoadConfig(t *testing.T) { require.Equal(t, "x-ai-eg-selected-backend", s.config.selectedBackendHeaderKey) require.Equal(t, "x-model-name", s.config.modelNameHeaderKey) require.Len(t, s.config.factories, 2) - require.NotNil(t, s.config.factories[filterconfig.VersionedAPISchema{Name: filterconfig.APISchemaOpenAI}]) - require.NotNil(t, s.config.factories[filterconfig.VersionedAPISchema{Name: filterconfig.APISchemaAWSBedrock}]) + require.NotNil(t, s.config.factories[filterapi.VersionedAPISchema{Name: filterapi.APISchemaOpenAI}]) + require.NotNil(t, s.config.factories[filterapi.VersionedAPISchema{Name: 
filterapi.APISchemaAWSBedrock}]) require.Len(t, s.config.requestCosts, 2) - require.Equal(t, filterconfig.LLMRequestCostTypeOutputToken, s.config.requestCosts[0].Type) + require.Equal(t, filterapi.LLMRequestCostTypeOutputToken, s.config.requestCosts[0].Type) require.Equal(t, "key", s.config.requestCosts[0].MetadataKey) - require.Equal(t, filterconfig.LLMRequestCostTypeCELExpression, s.config.requestCosts[1].Type) + require.Equal(t, filterapi.LLMRequestCostTypeCELExpression, s.config.requestCosts[1].Type) require.Equal(t, "1 + 1", s.config.requestCosts[1].CELExpression) prog := s.config.requestCosts[1].celProg require.NotNil(t, prog) @@ -259,3 +260,51 @@ func TestServer_Process(t *testing.T) { require.Error(t, err, "context canceled") }) } + +func TestFilterSensitiveHeaders(t *testing.T) { + logger, buf := newTestLoggerWithBuffer() + hm := &corev3.HeaderMap{Headers: []*corev3.HeaderValue{{Key: "foo", Value: "bar"}, {Key: "authorization", Value: "sensitive"}}} + filtered := filterSensitiveHeaders(hm, logger, []string{"authorization"}) + require.Len(t, filtered.Headers, 2) + for _, h := range filtered.Headers { + if h.Key == "authorization" { + require.Equal(t, "[REDACTED]", h.Value) + } else { + require.Equal(t, "bar", h.Value) + } + } + require.Contains(t, buf.String(), "filtering sensitive header") +} + +func TestFilterSensitiveBody(t *testing.T) { + logger, buf := newTestLoggerWithBuffer() + resp := &extprocv3.ProcessingResponse{ + Response: &extprocv3.ProcessingResponse_RequestBody{ + RequestBody: &extprocv3.BodyResponse{ + Response: &extprocv3.CommonResponse{ + HeaderMutation: &extprocv3.HeaderMutation{ + SetHeaders: []*corev3.HeaderValueOption{ + {Header: &corev3.HeaderValue{ + Key: ":path", + Value: "/model/some-random-model/converse", + }}, + {Header: &corev3.HeaderValue{ + Key: "Authorization", + Value: "sensitive", + }}, + }, + }, + BodyMutation: &extprocv3.BodyMutation{}, + }, + }, + }, + } + filtered := filterSensitiveBody(resp, logger, 
[]string{"authorization"}) + require.NotNil(t, filtered) + for _, h := range filtered.Response.(*extprocv3.ProcessingResponse_RequestBody).RequestBody.Response.GetHeaderMutation().GetSetHeaders() { + if h.Header.Key == "Authorization" { + require.Equal(t, "[REDACTED]", string(h.Header.RawValue)) + } + } + require.Contains(t, buf.String(), "filtering sensitive header") +} diff --git a/internal/extproc/translator/translator.go b/internal/extproc/translator/translator.go index ed2eb603a..aa2b1e90e 100644 --- a/internal/extproc/translator/translator.go +++ b/internal/extproc/translator/translator.go @@ -8,7 +8,7 @@ import ( extprocv3http "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ext_proc/v3" extprocv3 "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" - "github.com/envoyproxy/ai-gateway/filterconfig" + "github.com/envoyproxy/ai-gateway/filterapi" "github.com/envoyproxy/ai-gateway/internal/extproc/router" ) @@ -35,13 +35,13 @@ func isGoodStatusCode(code int) bool { type Factory func(path string) (Translator, error) // NewFactory returns a callback function that creates a translator for the given API schema combination. -func NewFactory(in, out filterconfig.VersionedAPISchema) (Factory, error) { - if in.Name == filterconfig.APISchemaOpenAI { +func NewFactory(in, out filterapi.VersionedAPISchema) (Factory, error) { + if in.Name == filterapi.APISchemaOpenAI { // TODO: currently, we ignore the LLMAPISchema."Version" field. 
switch out.Name { - case filterconfig.APISchemaOpenAI: + case filterapi.APISchemaOpenAI: return newOpenAIToOpenAITranslator, nil - case filterconfig.APISchemaAWSBedrock: + case filterapi.APISchemaAWSBedrock: return newOpenAIToAWSBedrockTranslator, nil } } diff --git a/internal/extproc/translator/translator_test.go b/internal/extproc/translator/translator_test.go index 64c420236..bec838993 100644 --- a/internal/extproc/translator/translator_test.go +++ b/internal/extproc/translator/translator_test.go @@ -5,21 +5,21 @@ import ( "github.com/stretchr/testify/require" - "github.com/envoyproxy/ai-gateway/filterconfig" + "github.com/envoyproxy/ai-gateway/filterapi" ) func TestNewFactory(t *testing.T) { t.Run("error", func(t *testing.T) { _, err := NewFactory( - filterconfig.VersionedAPISchema{Name: "Foo", Version: "v100"}, - filterconfig.VersionedAPISchema{Name: "Bar", Version: "v123"}, + filterapi.VersionedAPISchema{Name: "Foo", Version: "v100"}, + filterapi.VersionedAPISchema{Name: "Bar", Version: "v123"}, ) require.ErrorContains(t, err, "unsupported API schema combination: client={Foo v100}, backend={Bar v123}") }) t.Run("openai to openai", func(t *testing.T) { f, err := NewFactory( - filterconfig.VersionedAPISchema{Name: filterconfig.APISchemaOpenAI}, - filterconfig.VersionedAPISchema{Name: filterconfig.APISchemaOpenAI}, + filterapi.VersionedAPISchema{Name: filterapi.APISchemaOpenAI}, + filterapi.VersionedAPISchema{Name: filterapi.APISchemaOpenAI}, ) require.NoError(t, err) require.NotNil(t, f) @@ -32,8 +32,8 @@ func TestNewFactory(t *testing.T) { }) t.Run("openai to aws bedrock", func(t *testing.T) { f, err := NewFactory( - filterconfig.VersionedAPISchema{Name: filterconfig.APISchemaOpenAI}, - filterconfig.VersionedAPISchema{Name: filterconfig.APISchemaAWSBedrock}, + filterapi.VersionedAPISchema{Name: filterapi.APISchemaOpenAI}, + filterapi.VersionedAPISchema{Name: filterapi.APISchemaAWSBedrock}, ) require.NoError(t, err) require.NotNil(t, f) diff --git 
a/internal/extproc/watcher.go b/internal/extproc/watcher.go index 14dce6135..70ffab516 100644 --- a/internal/extproc/watcher.go +++ b/internal/extproc/watcher.go @@ -8,13 +8,13 @@ import ( "strings" "time" - "github.com/envoyproxy/ai-gateway/filterconfig" + "github.com/envoyproxy/ai-gateway/filterapi" ) -// ConfigReceiver is an interface that can receive *filterconfig.Config updates. +// ConfigReceiver is an interface that can receive *filterapi.Config updates. type ConfigReceiver interface { // LoadConfig updates the configuration. - LoadConfig(config *filterconfig.Config) error + LoadConfig(config *filterapi.Config) error } type configWatcher struct { @@ -81,7 +81,7 @@ func (cw *configWatcher) loadConfig(ctx context.Context) error { cw.diff(previous, current) } - cfg, err := filterconfig.UnmarshalConfigYaml(cw.path) + cfg, err := filterapi.UnmarshalConfigYaml(cw.path) if err != nil { return err } diff --git a/internal/extproc/watcher_test.go b/internal/extproc/watcher_test.go index 46000c62e..3f383bf57 100644 --- a/internal/extproc/watcher_test.go +++ b/internal/extproc/watcher_test.go @@ -12,24 +12,24 @@ import ( "github.com/stretchr/testify/require" - "github.com/envoyproxy/ai-gateway/filterconfig" + "github.com/envoyproxy/ai-gateway/filterapi" ) // mockReceiver is a mock implementation of Receiver. type mockReceiver struct { - cfg *filterconfig.Config + cfg *filterapi.Config mux sync.Mutex } // LoadConfig implements ConfigReceiver. 
-func (m *mockReceiver) LoadConfig(cfg *filterconfig.Config) error { +func (m *mockReceiver) LoadConfig(cfg *filterapi.Config) error { m.mux.Lock() defer m.mux.Unlock() m.cfg = cfg return nil } -func (m *mockReceiver) getConfig() *filterconfig.Config { +func (m *mockReceiver) getConfig() *filterapi.Config { m.mux.Lock() defer m.mux.Unlock() return m.cfg diff --git a/site/docs/getting-started/connect-providers/aws-bedrock.md b/site/docs/getting-started/connect-providers/aws-bedrock.md index 803abda0b..24a78ec55 100644 --- a/site/docs/getting-started/connect-providers/aws-bedrock.md +++ b/site/docs/getting-started/connect-providers/aws-bedrock.md @@ -47,7 +47,8 @@ The credentials will be stored in Kubernetes secrets. ### 2. Apply Configuration -Apply the updated configuration and wait for the Gateway pod to be ready, and restart the ext-proc to pick up the updated secrets: +Apply the updated configuration and wait for the Gateway pod to be ready. If you already have a Gateway running, +then the secret credential update will be picked up automatically in a few seconds. ```shell kubectl apply -f basic.yaml @@ -56,8 +57,6 @@ kubectl wait pods --timeout=2m \ -l gateway.envoyproxy.io/owning-gateway-name=envoy-ai-gateway-basic \ -n envoy-gateway-system \ --for=condition=Ready - -kubectl rollout restart deployment/ai-eg-route-extproc-envoy-ai-gateway-basic ``` ### 4. Test the Configuration diff --git a/site/docs/getting-started/connect-providers/index.md b/site/docs/getting-started/connect-providers/index.md index c1aeaddb8..211d0fcf0 100644 --- a/site/docs/getting-started/connect-providers/index.md +++ b/site/docs/getting-started/connect-providers/index.md @@ -20,7 +20,8 @@ Currently, Envoy AI Gateway supports the following providers: Before configuring any provider: 1. Complete the [Basic Usage](../basic-usage.md) guide -2. Remove the basic configuration with the mock backend: +2. 
Remove the basic configuration with the mock backend + ```shell kubectl delete -f https://raw.githubusercontent.com/envoyproxy/ai-gateway/main/examples/basic/basic.yaml @@ -29,11 +30,12 @@ Before configuring any provider: -n envoy-gateway-system \ --for=delete ``` -3. Download Configuration Template: -```shell -curl -O https://raw.githubusercontent.com/envoyproxy/ai-gateway/main/examples/basic/basic.yaml -``` +3. Download configuration template + + ```shell + curl -O https://raw.githubusercontent.com/envoyproxy/ai-gateway/main/examples/basic/basic.yaml + ``` ## Security Best Practices diff --git a/site/docs/getting-started/connect-providers/openai.md b/site/docs/getting-started/connect-providers/openai.md index 1d4df2d1e..ce04fed49 100644 --- a/site/docs/getting-started/connect-providers/openai.md +++ b/site/docs/getting-started/connect-providers/openai.md @@ -11,17 +11,21 @@ This guide will help you configure Envoy AI Gateway to work with OpenAI's models ## Prerequisites Before you begin, you'll need: + - An OpenAI API key from [OpenAI's platform](https://platform.openai.com) - Basic setup completed from the [Basic Usage](../basic-usage.md) guide - Basic configuration removed as described in the [Advanced Configuration](./index.md) overview ## Configuration Steps + :::info Ready to proceed? Ensure you have followed the steps in [Connect Providers](../connect-providers/) ::: + ### 1. Configure OpenAI Credentials Edit the `basic.yaml` file to replace the OpenAI placeholder value: + - Find the section containing `OPENAI_API_KEY` - Replace it with your actual OpenAI API key @@ -32,7 +36,8 @@ The key will be stored in a Kubernetes secret. ### 2. Apply Configuration -Apply the updated configuration and wait for the Gateway pod to be ready, and restart the ext-proc to pick up the updated secrets: +Apply the updated configuration and wait for the Gateway pod to be ready. 
If you already have a Gateway running, +then the secret credential update will be picked up automatically in a few seconds. ```shell kubectl apply -f basic.yaml @@ -41,11 +46,10 @@ kubectl wait pods --timeout=2m \ -l gateway.envoyproxy.io/owning-gateway-name=envoy-ai-gateway-basic \ -n envoy-gateway-system \ --for=condition=Ready - -kubectl rollout restart deployment/ai-eg-route-extproc-envoy-ai-gateway-basic ``` ### 3. Test the Configuration + You should have set `$GATEWAY_URL` as part of the basic setup before connecting to providers. See the [Basic Usage](../basic-usage.md) page for instructions. @@ -71,18 +75,22 @@ If you encounter issues: 1. Verify your API key is correct and active 2. Check pod status: + ```shell kubectl get pods -n envoy-gateway-system ``` + 3. View controller logs: + ```shell kubectl logs -n envoy-ai-gateway-system deployment/ai-gateway-controller ``` 4. View External Process Logs - ```shell - kubectl logs services/ai-eg-route-extproc-envoy-ai-gateway-basic - ``` + + ```shell + kubectl logs services/ai-eg-route-extproc-envoy-ai-gateway-basic + ``` 5. 
Common errors: - 401: Invalid API key @@ -92,4 +100,5 @@ If you encounter issues: ## Next Steps After configuring OpenAI: + - [Connect AWS Bedrock](./aws-bedrock.md) to add another provider diff --git a/site/docs/getting-started/prerequisites.md b/site/docs/getting-started/prerequisites.md index c7a2191f3..0dc53e8dd 100644 --- a/site/docs/getting-started/prerequisites.md +++ b/site/docs/getting-started/prerequisites.md @@ -9,6 +9,33 @@ import TabItem from '@theme/TabItem'; Before you begin using Envoy AI Gateway, you'll need to ensure you have the following prerequisites in place: +## Required Tools + +Make sure you have the following tools installed: + +- `kubectl` - The Kubernetes command-line tool +- `helm` - The package manager for Kubernetes +- `curl` - For testing API endpoints (installed by default on most systems) + +:::tip Verify Installation +Run these commands to verify your tools are properly installed: + +Verify kubectl installation: +```shell +kubectl version --client +``` + +Verify helm installation: +```shell +helm version +``` + +Verify curl installation: +```shell +curl --version +``` +::: + ## Kubernetes Cluster :::info Version Requirements @@ -114,30 +141,3 @@ helm upgrade -i eg oci://docker.io/envoyproxy/gateway-helm \ kubectl wait --timeout=2m -n envoy-gateway-system deployment/envoy-gateway --for=condition=Available ``` - -## Required Tools - -Make sure you have the following tools installed: - -- `kubectl` - The Kubernetes command-line tool -- `helm` - The package manager for Kubernetes -- `curl` - For testing API endpoints (installed by default on most systems) - -:::tip Verify Installation -Run these commands to verify your tools are properly installed: - -Verify kubectl installation: -```shell -kubectl version --client -``` - -Verify helm installation: -```shell -helm version -``` - -Verify curl installation: -```shell -curl --version -``` -::: diff --git a/site/docs/index.md b/site/docs/index.md index b2af47764..5fe720ec8 100644 --- 
a/site/docs/index.md +++ b/site/docs/index.md @@ -6,20 +6,26 @@ sidebar_position: 1 # Envoy AI Gateway Overview -Welcome to the **Envoy AI Gateway** documentation! This open-source project, built on **Envoy Proxy**, aims to simplify how application clients interact with **Generative AI (GenAI)** services. It provides a secure, scalable, and efficient way to manage LLM/AI traffic, with backend rate limiting and policy control. +Welcome to the **Envoy AI Gateway** documentation! This open-source project, built on **Envoy +Proxy**, aims to simplify how application clients interact with **Generative AI (GenAI)** services. +It provides a secure, scalable, and efficient way to manage LLM/AI traffic, with backend rate +limiting and policy control. ## **Project Overview** -The **Envoy AI Gateway** was created to address the complexity of connecting applications to GenAI services by leveraging Envoy's flexibility and Kubernetes-native features. The project has evolved through contributions from the Envoy community, fostering a collaborative approach to solving real-world challenges. +The **Envoy AI Gateway** was created to address the complexity of connecting applications to GenAI +services by leveraging Envoy's flexibility and Kubernetes-native features. The project has evolved +through contributions from the Envoy community, fostering a collaborative approach to solving +real-world challenges. ### **Key Objectives** + - Provide a unified layer for routing and managing LLM/AI traffic. - Support automatic failover mechanisms to ensure service reliability. - Ensure end-to-end security, including upstream authorization for LLM/AI traffic. - Implement a policy framework to support usage limiting use cases. - Foster an open-source community to address GenAI-specific routing and quality of service needs. 
- ## **Release Goals** The initial release focuses on key foundational features to provide LLM/AI traffic management: @@ -27,29 +33,41 @@ The initial release focuses on key foundational features to provide LLM/AI traff - **Request Routing**: Directs API requests to appropriate GenAI services - **Authentication and Authorization**: Implement API key validation to secure communication. - **Backend Security Policy**: Introduces fine-grained access control for backend services. -This also controls LLM/AI backend usage using token-per-second (TPS) policies to prevent overuse. -- **Multi-Upstream Provider Support for LLM/AI Services**: The ability to receive requests in the format of one LLM provider and route them to different upstream providers, ensuring compatibility with their expected formats. This is made possible through built-in transformation capabilities that adapt requests and responses accordingly. -- **AWS Request Signing**: Supports external processing for secure communication with AWS-hosted LLM/AI services. - -Documentation for installation, setup, and contribution guidelines is included to help new users and contributors get started easily. + This also controls LLM/AI backend usage using token-per-second (TPS) policies to prevent overuse. +- **Multi-Upstream Provider Support for LLM/AI Services**: The ability to receive requests in the + format of one LLM provider and route them to different upstream providers, ensuring compatibility + with their expected formats. This is made possible through built-in transformation capabilities that + adapt requests and responses accordingly. +- **AWS Request Signing**: Supports external processing for secure communication with AWS-hosted + LLM/AI services. +Documentation for installation, setup, and contribution guidelines is included to help new users and +contributors get started easily. 
## **Community Collaboration** -Weekly community meetings are held every Thursday to discuss updates, address issues, and review contributions. +[Weekly community meetings][meeting-notes] are held every Thursday to discuss updates, address +issues, and review contributions. ## **Architecture Overview** - ## **Get Involved** We welcome community contributions! Here's how you can participate: -- Attend the weekly community meetings to stay updated and share ideas. + +- Attend the [weekly community meetings][meeting-notes] to stay updated and share ideas. - Submit feature requests and pull requests via the GitHub repository. -- Join discussions in the #envoy-ai-gateway Slack channel. +- Join discussions in the [#envoy-ai-gateway] Slack channel. -Refer to the contribution guide in the GitHub repository for detailed instructions on setting up your environment and contributing. +Refer to [this contributing guide][contributing.md] for detailed instructions on setting up your +environment and contributing. --- -The **Envoy AI Gateway** addresses the growing demand for secure, scalable, and efficient AI/LLM traffic management. Your contributions and feedback are key to its success and to advancing the future of AI service integration. +The **Envoy AI Gateway** addresses the growing demand for secure, scalable, and efficient AI/LLM +traffic management. Your contributions and feedback are key to its success and to advancing the +future of AI service integration. 
+ +[meeting-notes]: https://docs.google.com/document/d/10e1sfsF-3G3Du5nBHGmLjXw5GVMqqCvFDqp_O65B0_w +[#envoy-ai-gateway]: https://envoyproxy.slack.com/archives/C07Q4N24VAA +[contributing.md]: https://github.com/envoyproxy/ai-gateway/blob/main/CONTRIBUTING.md diff --git a/site/docusaurus.config.ts b/site/docusaurus.config.ts index 05902a816..24211e69e 100644 --- a/site/docusaurus.config.ts +++ b/site/docusaurus.config.ts @@ -52,6 +52,10 @@ const config: Config = { theme: { customCss: './src/css/custom.css', }, + // Will be passed to @docusaurus/plugin-google-gtag (only enabled when explicitly specified) + gtag: { + trackingID: 'G-DXJEH1ZRXX', + }, } satisfies Preset.Options, ], ], diff --git a/site/src/pages/index.tsx b/site/src/pages/index.tsx index 08d705dab..a721f164f 100644 --- a/site/src/pages/index.tsx +++ b/site/src/pages/index.tsx @@ -29,7 +29,7 @@ export default function Home(): JSX.Element { return ( + description={`${siteConfig.tagline}`}>
diff --git a/tests/doctest/doctest.go b/tests/doctest/doctest.go deleted file mode 100644 index 3738d8a84..000000000 --- a/tests/doctest/doctest.go +++ /dev/null @@ -1,150 +0,0 @@ -//go:build test_doctest - -package doctest - -import ( - "bytes" - "context" - "fmt" - "os" - "os/exec" - "testing" - "time" - - "github.com/stretchr/testify/require" - "github.com/yuin/goldmark" - "github.com/yuin/goldmark/ast" - "github.com/yuin/goldmark/parser" - "github.com/yuin/goldmark/text" -) - -// codeBlock represents a single code block in a markdown file. -type codeBlock struct { - // lines is each line of the code block. - lines []string -} - -// String implements the fmt.Stringer interface for debugging. -func (c codeBlock) String() string { - var str string - for i, line := range c.lines { - str += fmt.Sprintf("%d: %s", i, line) - } - return str -} - -// requireRunAllLines runs all lines in a code block, skipping empty/commented lines. -func (c codeBlock) requireRunAllLines(t *testing.T) { - for _, line := range c.lines { - requireRunBashCommand(t, line) - } -} - -// requireExtractCodeBlocks extracts all code blocks from a markdown file. -// -// This skips all lines starting with "#", which are comments. 
-func requireExtractCodeBlocks(t *testing.T, path string) []codeBlock { - source, err := os.ReadFile(path) - require.NoError(t, err) - - md := goldmark.New(goldmark.WithParserOptions(parser.WithAutoHeadingID())) - doc := md.Parser().Parse(text.NewReader(source)) - - var codeBlocks []codeBlock - err = ast.Walk(doc, func(n ast.Node, entering bool) (ast.WalkStatus, error) { - if entering { - if rawBlock, ok := n.(*ast.FencedCodeBlock); ok { - var blk codeBlock - for i := 0; i < rawBlock.Lines().Len(); i++ { - line := rawBlock.Lines().At(i) - if len(line.Value(source)) == 0 || line.Value(source)[0] == '#' { - continue - } - blk.lines = append(blk.lines, string(line.Value(source))) - } - codeBlocks = append(codeBlocks, blk) - } - } - return ast.WalkContinue, nil - }) - require.NoError(t, err) - return codeBlocks -} - -// requireExecutableInPath checks if the executables are in the PATH. -func requireExecutableInPath(t *testing.T, executables ...string) { - // Always require "bash" to run the code blocks. - _, err := exec.LookPath("bash") - require.NoError(t, err, "bash not found in PATH") - for _, executable := range executables { - _, err := exec.LookPath(executable) - require.NoError(t, err, "executable %s not found in PATH", executable) - } -} - -// requireRunBashCommand runs a bash command. This is used to run the code blocks in the markdown file. 
-func requireRunBashCommand(t *testing.T, command string) { - fmt.Printf("\u001b[32m=== Running command: %s\u001B[0m\n", command) - cmd := newBashCommand(command) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - require.NoError(t, cmd.Run()) -} - -func requireRunBashCommandEventually(t *testing.T, command string, timeout, interval time.Duration) { - fmt.Printf("\u001b[32m=== Running command eventually (timeout=%s,interval=%s): %s\u001B[0m\n", - timeout, interval, command) - require.Eventually(t, func() bool { - cmd := newBashCommand(command) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - return cmd.Run() == nil - }, timeout, interval) -} - -func requireStartBackgroundBashCommand(t *testing.T, command string) (kill func()) { - fmt.Printf("\u001b[32m=== Starting background command: %s\u001B[0m\n", command) - cmd := newBashCommand(command) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - require.NoError(t, cmd.Start()) - return func() { _ = cmd.Process.Signal(os.Interrupt) } -} - -func runBashCommandAndIgnoreError(command string) { - cmd := newBashCommand(command) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - _ = cmd.Run() -} - -func newBashCommand(command string) *exec.Cmd { - return exec.Command("bash", "-c", command) -} - -// requireNewKindCluster creates a new kind cluster if it does not already exist. -func requireNewKindCluster(t *testing.T, clusterName string) { - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) - defer cancel() - const kindPath = "../../.bin/kind" // This is automatically installed as a dependency in the Makefile. 
- - cmd := exec.CommandContext(ctx, kindPath, "create", "cluster", "--name", clusterName) - out, err := cmd.CombinedOutput() - if err != nil && !bytes.Contains(out, []byte("already exist")) { - require.NoError(t, err, "error creating kind cluster") - } - - cmd = exec.CommandContext(ctx, kindPath, "export", "kubeconfig", "--name", clusterName) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - require.NoError(t, cmd.Run()) -} - -// getEnvVarOrSkip requires an environment variable to be set. -func getEnvVarOrSkip(t *testing.T, envVar string) string { - value := os.Getenv(envVar) - if value == "" { - t.Skipf("Environment variable %s is not set", envVar) - } - return value -} diff --git a/tests/doctest/getting_started_test.go b/tests/doctest/getting_started_test.go deleted file mode 100644 index 4eb71b1de..000000000 --- a/tests/doctest/getting_started_test.go +++ /dev/null @@ -1,102 +0,0 @@ -//go:build test_doctest - -package doctest - -import ( - "os" - "strings" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -// TestGettingStarted tests the code blocks of docs/getting_started.md file. 
-func TestGettingStarted(t *testing.T) { - t.Skip("TODO") - - requireNewKindCluster(t, "envoy-ai-gateway-getting-started") - requireExecutableInPath(t, "curl", "helm", "kubectl") - - path := "../../site/docs/getting_started.md" - codeBlocks := requireExtractCodeBlocks(t, path) - - for _, block := range codeBlocks { - t.Log(block) - } - - t.Run("EG Install", func(t *testing.T) { - egInstallBlock := codeBlocks[0] - require.Len(t, egInstallBlock.lines, 2) - egInstallBlock.requireRunAllLines(t) - }) - - t.Run("AI Gateway install", func(t *testing.T) { - aiGatewayBlock := codeBlocks[1] - require.Len(t, aiGatewayBlock.lines, 3) - aiGatewayBlock.requireRunAllLines(t) - }) - - t.Run("AI Gateway EG config", func(t *testing.T) { - aiGatewayEGConfigBlock := codeBlocks[2] - require.Len(t, aiGatewayEGConfigBlock.lines, 4) - aiGatewayEGConfigBlock.requireRunAllLines(t) - }) - - t.Run("Deploy Basic Gateway", func(t *testing.T) { - deployGatewayBlock := codeBlocks[3] - require.Len(t, deployGatewayBlock.lines, 2) - requireRunBashCommand(t, deployGatewayBlock.lines[0]) - // Gateway deployment may take a while to be ready (managed by the EG operator). - requireRunBashCommandEventually(t, deployGatewayBlock.lines[1], time.Minute, 2*time.Second) - }) - - t.Run("Make a request", func(t *testing.T) { - makeRequestBlock := codeBlocks[4] - require.Len(t, makeRequestBlock.lines, 2) - // Run the port-forward command in the background. - kill := requireStartBackgroundBashCommand(t, makeRequestBlock.lines[0]) - defer kill() - // Then make the request. - requireRunBashCommandEventually(t, makeRequestBlock.lines[1], time.Minute, 2*time.Second) - }) - - // The next code block is just the example output of the previous code block. - _ = codeBlocks[5] - - t.Run("Delete Gateway", func(t *testing.T) { - deleteGatewayBlock := codeBlocks[6] - require.Len(t, deleteGatewayBlock.lines, 2) - requireRunBashCommand(t, deleteGatewayBlock.lines[0]) // Delete the Gateway. 
- runBashCommandAndIgnoreError(deleteGatewayBlock.lines[1]) // Wait for the Gateway to be deleted. - }) - - t.Run("OpenAI and AWS", func(t *testing.T) { - openAIAPIKey := getEnvVarOrSkip(t, "TEST_OPENAI_API_KEY") - awsAccessKeyID := getEnvVarOrSkip(t, "TEST_AWS_ACCESS_KEY_ID") - awsSecretAccessKey := getEnvVarOrSkip(t, "TEST_AWS_SECRET_ACCESS_KEY") - - tmpFile := t.TempDir() + "/openai-and-aws.yaml" - // Replace the placeholders with the actual values. - _f, err := os.ReadFile("../../examples/basic/basic.yaml") - require.NoError(t, err) - f := strings.ReplaceAll(string(_f), "OPENAI_API_KEY", openAIAPIKey) - f = strings.ReplaceAll(f, "AWS_ACCESS_KEY_ID", awsAccessKeyID) - f = strings.ReplaceAll(f, "AWS_SECRET_ACCESS_KEY", awsSecretAccessKey) - require.NoError(t, os.WriteFile(tmpFile, []byte(f), 0o600)) - - // Apply the configuration. - requireRunBashCommand(t, "kubectl apply -f "+tmpFile) - - openAIAndAWSBlock := codeBlocks[7] - require.Len(t, openAIAndAWSBlock.lines, 4) - // Wait for the gateway to be ready. - requireRunBashCommandEventually(t, openAIAndAWSBlock.lines[0], time.Minute, 2*time.Second) - // Run the port-forward command in the background. - kill := requireStartBackgroundBashCommand(t, openAIAndAWSBlock.lines[1]) - defer kill() - // Then make the request to OpenAI and AWS. - requireRunBashCommandEventually(t, openAIAndAWSBlock.lines[2], 30*time.Second, 2*time.Second) - requireRunBashCommandEventually(t, openAIAndAWSBlock.lines[3], 30*time.Second, 2*time.Second) - }) -} diff --git a/tests/e2e/basic_test.go b/tests/e2e/basic_test.go index 8ae2a368c..2c8230a64 100644 --- a/tests/e2e/basic_test.go +++ b/tests/e2e/basic_test.go @@ -15,38 +15,40 @@ import ( ) // TestExamplesBasic tests the basic example in examples/basic directory. -// -// This requires the following environment variables to be set: -// - TEST_AWS_ACCESS_KEY_ID -// - TEST_AWS_SECRET_ACCESS_KEY -// - TEST_OPENAI_API_KEY -// -// The test will be skipped if any of these are not set. 
func Test_Examples_Basic(t *testing.T) { - openAiApiKey := getEnvVarOrSkip(t, "TEST_OPENAI_API_KEY") - awsAccessKeyID := getEnvVarOrSkip(t, "TEST_AWS_ACCESS_KEY_ID") - awsSecretAccessKey := getEnvVarOrSkip(t, "TEST_AWS_SECRET_ACCESS_KEY") - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) defer cancel() const manifest = "../../examples/basic/basic.yaml" - read, err := os.ReadFile(manifest) - require.NoError(t, err) - // Replace the placeholder with the actual API key. - replaced := strings.ReplaceAll(string(read), "OPENAI_API_KEY", openAiApiKey) - replaced = strings.ReplaceAll(replaced, "AWS_ACCESS_KEY_ID", awsAccessKeyID) - replaced = strings.ReplaceAll(replaced, "AWS_SECRET_ACCESS_KEY", awsSecretAccessKey) - require.NoError(t, kubectlApplyManifestStdin(ctx, replaced)) + require.NoError(t, kubectlApplyManifest(ctx, manifest)) const egSelector = "gateway.envoyproxy.io/owning-gateway-name=envoy-ai-gateway-basic" requireWaitForPodReady(t, egNamespace, egSelector) - t.Run("/chat/completions", func(t *testing.T) { - for _, tc := range []struct { - name string - modelName string - }{ + testUpstreamCase := examplesBasicTestCase{name: "testupsream", modelName: "some-cool-self-hosted-model"} + testUpstreamCase.run(t, egNamespace, egSelector) + + // This requires the following environment variables to be set: + // - TEST_AWS_ACCESS_KEY_ID + // - TEST_AWS_SECRET_ACCESS_KEY + // - TEST_OPENAI_API_KEY + // + // The test will be skipped if any of these are not set. + t.Run("with credentials", func(t *testing.T) { + openAiApiKey := getEnvVarOrSkip(t, "TEST_OPENAI_API_KEY") + awsAccessKeyID := getEnvVarOrSkip(t, "TEST_AWS_ACCESS_KEY_ID") + awsSecretAccessKey := getEnvVarOrSkip(t, "TEST_AWS_SECRET_ACCESS_KEY") + read, err := os.ReadFile(manifest) + require.NoError(t, err) + // Replace the placeholder with the actual API key. 
+ replaced := strings.ReplaceAll(string(read), "OPENAI_API_KEY", openAiApiKey) + replaced = strings.ReplaceAll(replaced, "AWS_ACCESS_KEY_ID", awsAccessKeyID) + replaced = strings.ReplaceAll(replaced, "AWS_SECRET_ACCESS_KEY", awsSecretAccessKey) + require.NoError(t, kubectlApplyManifestStdin(ctx, replaced)) + + time.Sleep(5 * time.Second) // At least 5 seconds for the updated secret to be propagated. + + for _, tc := range []examplesBasicTestCase{ { name: "openai", modelName: "gpt-4o-mini", @@ -55,41 +57,46 @@ func Test_Examples_Basic(t *testing.T) { name: "aws", modelName: "us.meta.llama3-2-1b-instruct-v1:0", }, - { - name: "testupsream", - modelName: "some-cool-self-hosted-model", - }, } { - t.Run(tc.name, func(t *testing.T) { - require.Eventually(t, func() bool { - fwd := requireNewHTTPPortForwarder(t, egNamespace, egSelector, egDefaultPort) - defer fwd.kill() + tc.run(t, egNamespace, egSelector) + } + }) +} + +type examplesBasicTestCase struct { + name string + modelName string +} + +func (tc examplesBasicTestCase) run(t *testing.T, egNamespace, egSelector string) { + t.Run(tc.name, func(t *testing.T) { + require.Eventually(t, func() bool { + fwd := requireNewHTTPPortForwarder(t, egNamespace, egSelector, egDefaultPort) + defer fwd.kill() - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() - client := openai.NewClient(option.WithBaseURL(fwd.address() + "/v1/")) + client := openai.NewClient(option.WithBaseURL(fwd.address() + "/v1/")) - chatCompletion, err := client.Chat.Completions.New(ctx, openai.ChatCompletionNewParams{ - Messages: openai.F([]openai.ChatCompletionMessageParamUnion{ - openai.UserMessage("Say this is a test"), - }), - Model: openai.F(tc.modelName), - }) - if err != nil { - t.Logf("error: %v", err) - return false - } - var choiceNonEmpty bool - for _, choice := range chatCompletion.Choices { - t.Logf("choice: %s", 
choice.Message.Content) - if choice.Message.Content != "" { - choiceNonEmpty = true - } - } - return choiceNonEmpty - }, 10*time.Second, 1*time.Second) + chatCompletion, err := client.Chat.Completions.New(ctx, openai.ChatCompletionNewParams{ + Messages: openai.F([]openai.ChatCompletionMessageParamUnion{ + openai.UserMessage("Say this is a test"), + }), + Model: openai.F(tc.modelName), }) - } + if err != nil { + t.Logf("error: %v", err) + return false + } + var choiceNonEmpty bool + for _, choice := range chatCompletion.Choices { + t.Logf("choice: %s", choice.Message.Content) + if choice.Message.Content != "" { + choiceNonEmpty = true + } + } + return choiceNonEmpty + }, 20*time.Second, 3*time.Second) }) } diff --git a/tests/extproc/custom_extproc_test.go b/tests/extproc/custom_extproc_test.go index 44f21a17e..b86ef2e89 100644 --- a/tests/extproc/custom_extproc_test.go +++ b/tests/extproc/custom_extproc_test.go @@ -15,7 +15,7 @@ import ( "github.com/openai/openai-go/option" "github.com/stretchr/testify/require" - "github.com/envoyproxy/ai-gateway/filterconfig" + "github.com/envoyproxy/ai-gateway/filterapi" "github.com/envoyproxy/ai-gateway/tests/internal/testupstreamlib" ) @@ -25,15 +25,15 @@ func TestExtProcCustomRouter(t *testing.T) { requireRunEnvoy(t, "/dev/null") requireTestUpstream(t) configPath := t.TempDir() + "/extproc-config.yaml" - requireWriteFilterConfig(t, configPath, &filterconfig.Config{ + requireWriteFilterConfig(t, configPath, &filterapi.Config{ Schema: openAISchema, // This can be any header key, but it must match the envoy.yaml routing configuration. 
SelectedBackendHeaderKey: "x-selected-backend-name", ModelNameHeaderKey: "x-model-name", - Rules: []filterconfig.RouteRule{ + Rules: []filterapi.RouteRule{ { - Backends: []filterconfig.Backend{{Name: "testupstream", Schema: openAISchema}}, - Headers: []filterconfig.HeaderMatch{{Name: "x-model-name", Value: "something-cool"}}, + Backends: []filterapi.Backend{{Name: "testupstream", Schema: openAISchema}}, + Headers: []filterapi.HeaderMatch{{Name: "x-model-name", Value: "something-cool"}}, }, }, }) diff --git a/tests/extproc/extproc_test.go b/tests/extproc/extproc_test.go index cb0be1f59..950c3beb9 100644 --- a/tests/extproc/extproc_test.go +++ b/tests/extproc/extproc_test.go @@ -16,7 +16,7 @@ import ( "github.com/stretchr/testify/require" "sigs.k8s.io/yaml" - "github.com/envoyproxy/ai-gateway/filterconfig" + "github.com/envoyproxy/ai-gateway/filterapi" ) const listenerAddress = "http://localhost:1062" @@ -25,14 +25,14 @@ const listenerAddress = "http://localhost:1062" var envoyYamlBase string var ( - openAISchema = filterconfig.VersionedAPISchema{Name: filterconfig.APISchemaOpenAI} - awsBedrockSchema = filterconfig.VersionedAPISchema{Name: filterconfig.APISchemaAWSBedrock} + openAISchema = filterapi.VersionedAPISchema{Name: filterapi.APISchemaOpenAI} + awsBedrockSchema = filterapi.VersionedAPISchema{Name: filterapi.APISchemaAWSBedrock} ) // requireExtProc starts the external processor with the provided executable and configPath // with additional environment variables. // -// The config must be in YAML format specified in [filterconfig.Config] type. +// The config must be in YAML format specified in [filterapi.Config] type. func requireExtProc(t *testing.T, stdout io.Writer, executable, configPath string, envs ...string) { cmd := exec.Command(executable) cmd.Stdout = stdout @@ -93,8 +93,8 @@ func getEnvVarOrSkip(t *testing.T, envVar string) string { return value } -// requireWriteFilterConfig writes the provided [filterconfig.Config] to the configPath in YAML format. 
-func requireWriteFilterConfig(t *testing.T, configPath string, config *filterconfig.Config) { +// requireWriteFilterConfig writes the provided [filterapi.Config] to the configPath in YAML format. +func requireWriteFilterConfig(t *testing.T, configPath string, config *filterapi.Config) { configBytes, err := yaml.Marshal(config) require.NoError(t, err) require.NoError(t, os.WriteFile(configPath, configBytes, 0o600)) diff --git a/tests/extproc/real_providers_test.go b/tests/extproc/real_providers_test.go index 0dd0a92a2..117882451 100644 --- a/tests/extproc/real_providers_test.go +++ b/tests/extproc/real_providers_test.go @@ -17,7 +17,7 @@ import ( "github.com/openai/openai-go/option" "github.com/stretchr/testify/require" - "github.com/envoyproxy/ai-gateway/filterconfig" + "github.com/envoyproxy/ai-gateway/filterapi" ) // TestRealProviders tests the end-to-end flow of the external processor with Envoy and real providers. @@ -58,31 +58,31 @@ func TestWithRealProviders(t *testing.T) { require.NoError(t, err) require.NoError(t, awsFile.Sync()) - requireWriteFilterConfig(t, configPath, &filterconfig.Config{ + requireWriteFilterConfig(t, configPath, &filterapi.Config{ MetadataNamespace: "ai_gateway_llm_ns", - LLMRequestCosts: []filterconfig.LLMRequestCost{ - {MetadataKey: "used_token", Type: filterconfig.LLMRequestCostTypeInputToken}, - {MetadataKey: "some_cel", Type: filterconfig.LLMRequestCostTypeCELExpression, CELExpression: "1+1"}, + LLMRequestCosts: []filterapi.LLMRequestCost{ + {MetadataKey: "used_token", Type: filterapi.LLMRequestCostTypeInputToken}, + {MetadataKey: "some_cel", Type: filterapi.LLMRequestCostTypeCELExpression, CELExpression: "1+1"}, }, Schema: openAISchema, // This can be any header key, but it must match the envoy.yaml routing configuration. 
SelectedBackendHeaderKey: "x-selected-backend-name", ModelNameHeaderKey: "x-model-name", - Rules: []filterconfig.RouteRule{ + Rules: []filterapi.RouteRule{ { - Backends: []filterconfig.Backend{{Name: "openai", Schema: openAISchema, Auth: &filterconfig.BackendAuth{ - APIKey: &filterconfig.APIKeyAuth{Filename: apiKeyFilePath}, + Backends: []filterapi.Backend{{Name: "openai", Schema: openAISchema, Auth: &filterapi.BackendAuth{ + APIKey: &filterapi.APIKeyAuth{Filename: apiKeyFilePath}, }}}, - Headers: []filterconfig.HeaderMatch{{Name: "x-model-name", Value: "gpt-4o-mini"}}, + Headers: []filterapi.HeaderMatch{{Name: "x-model-name", Value: "gpt-4o-mini"}}, }, { - Backends: []filterconfig.Backend{ - {Name: "aws-bedrock", Schema: awsBedrockSchema, Auth: &filterconfig.BackendAuth{AWSAuth: &filterconfig.AWSAuth{ + Backends: []filterapi.Backend{ + {Name: "aws-bedrock", Schema: awsBedrockSchema, Auth: &filterapi.BackendAuth{AWSAuth: &filterapi.AWSAuth{ CredentialFileName: awsFilePath, Region: "us-east-1", }}}, }, - Headers: []filterconfig.HeaderMatch{ + Headers: []filterapi.HeaderMatch{ {Name: "x-model-name", Value: "us.meta.llama3-2-1b-instruct-v1:0"}, {Name: "x-model-name", Value: "us.anthropic.claude-3-5-sonnet-20240620-v1:0"}, }, diff --git a/tests/extproc/testupstream_test.go b/tests/extproc/testupstream_test.go index 6354ddf22..eb426fe7c 100644 --- a/tests/extproc/testupstream_test.go +++ b/tests/extproc/testupstream_test.go @@ -15,7 +15,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" - "github.com/envoyproxy/ai-gateway/filterconfig" + "github.com/envoyproxy/ai-gateway/filterapi" "github.com/envoyproxy/ai-gateway/tests/internal/testupstreamlib" ) @@ -29,23 +29,23 @@ func TestWithTestUpstream(t *testing.T) { configPath := t.TempDir() + "/extproc-config.yaml" requireTestUpstream(t) - requireWriteFilterConfig(t, configPath, &filterconfig.Config{ + requireWriteFilterConfig(t, configPath, &filterapi.Config{ MetadataNamespace: 
"ai_gateway_llm_ns", - LLMRequestCosts: []filterconfig.LLMRequestCost{ - {MetadataKey: "used_token", Type: filterconfig.LLMRequestCostTypeInputToken}, + LLMRequestCosts: []filterapi.LLMRequestCost{ + {MetadataKey: "used_token", Type: filterapi.LLMRequestCostTypeInputToken}, }, Schema: openAISchema, // This can be any header key, but it must match the envoy.yaml routing configuration. SelectedBackendHeaderKey: "x-selected-backend-name", ModelNameHeaderKey: "x-model-name", - Rules: []filterconfig.RouteRule{ + Rules: []filterapi.RouteRule{ { - Backends: []filterconfig.Backend{{Name: "testupstream", Schema: openAISchema}}, - Headers: []filterconfig.HeaderMatch{{Name: "x-test-backend", Value: "openai"}}, + Backends: []filterapi.Backend{{Name: "testupstream", Schema: openAISchema}}, + Headers: []filterapi.HeaderMatch{{Name: "x-test-backend", Value: "openai"}}, }, { - Backends: []filterconfig.Backend{{Name: "testupstream", Schema: awsBedrockSchema}}, - Headers: []filterconfig.HeaderMatch{{Name: "x-test-backend", Value: "aws-bedrock"}}, + Backends: []filterapi.Backend{{Name: "testupstream", Schema: awsBedrockSchema}}, + Headers: []filterapi.HeaderMatch{{Name: "x-test-backend", Value: "aws-bedrock"}}, }, }, })