Skip to content

Commit

Permalink
[ext_proc] Add extension point to run custom logic after onReceiveMes…
Browse files Browse the repository at this point in the history
…sage (#37916)

Commit Message:
Additional Description:

Fixes #34501 
Risk Level:
Testing:
Docs Changes:
Release Notes:
Platform Specific Features:

---------

Signed-off-by: pcrao <pcrao@google.com>
  • Loading branch information
pradeepcrao authored Feb 20, 2025
1 parent d76d53b commit 71d1eb6
Show file tree
Hide file tree
Showing 25 changed files with 1,610 additions and 8 deletions.
1 change: 1 addition & 0 deletions CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ extensions/filters/common/original_src @klarose @mattklein123
# external processing filter
/*/extensions/filters/http/ext_proc @gbrail @stevenzzzz @tyxia @mattklein123 @yanavlasov @yanjunxiang-google
/*/extensions/filters/common/mutation_rules @gbrail @tyxia @mattklein123 @yanavlasov
/*/extensions/http/ext_proc/response_processors/save_processing_response @pradeepcrao @botengyao @yanjunxiang-google
# jwt_authn http filter extension
/*/extensions/filters/http/jwt_authn @taoxuy @lizan @tyxia @yanavlasov
# grpc_field_extraction http filter extension
Expand Down
1 change: 1 addition & 0 deletions api/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ proto_library(
"//envoy/extensions/http/custom_response/local_response_policy/v3:pkg",
"//envoy/extensions/http/custom_response/redirect_policy/v3:pkg",
"//envoy/extensions/http/early_header_mutation/header_mutation/v3:pkg",
"//envoy/extensions/http/ext_proc/response_processors/save_processing_response/v3:pkg",
"//envoy/extensions/http/header_formatters/preserve_case/v3:pkg",
"//envoy/extensions/http/header_validators/envoy_default/v3:pkg",
"//envoy/extensions/http/injected_credentials/generic/v3:pkg",
Expand Down
8 changes: 7 additions & 1 deletion api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package envoy.extensions.filters.http.ext_proc.v3;

import "envoy/config/common/mutation_rules/v3/mutation_rules.proto";
import "envoy/config/core/v3/base.proto";
import "envoy/config/core/v3/extension.proto";
import "envoy/config/core/v3/grpc_service.proto";
import "envoy/config/core/v3/http_service.proto";
import "envoy/extensions/filters/http/ext_proc/v3/processing_mode.proto";
Expand Down Expand Up @@ -96,7 +97,7 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE;
// <arch_overview_advanced_filter_state_sharing>` object in a namespace matching the filter
// name.
//
// [#next-free-field: 23]
// [#next-free-field: 24]
message ExternalProcessor {
// Describes the route cache action to be taken when an external processor response
// is received in response to request headers.
Expand Down Expand Up @@ -328,6 +329,11 @@ message ExternalProcessor {
// the ``allowed_override_modes`` allow-list below.
// Since request_header_mode is not applicable in any way, it's ignored in comparison.
repeated ProcessingMode allowed_override_modes = 22;

// Decorator to introduce custom logic that runs after a message received from
// the External Processor is processed.
// [#extension-category: envoy.http.ext_proc.response_processors]
config.core.v3.TypedExtensionConfig on_processing_response = 23;
}

// ExtProcHttpService is used for HTTP communication between the filter and the external processing service.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py.

load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package")

licenses(["notice"]) # Apache 2

api_proto_package(
deps = [
"@com_github_cncf_xds//udpa/annotations:pkg",
"@com_github_cncf_xds//xds/annotations/v3:pkg",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
syntax = "proto3";

package envoy.extensions.http.ext_proc.response_processors.save_processing_response.v3;

import "xds/annotations/v3/status.proto";

import "udpa/annotations/status.proto";

option java_package = "io.envoyproxy.envoy.extensions.http.ext_proc.response_processors.save_processing_response.v3";
option java_outer_classname = "SaveProcessingResponseProto";
option java_multiple_files = true;
option go_package = "github.com/envoyproxy/go-control-plane/envoy/extensions/http/ext_proc/response_processors/save_processing_response/v3;save_processing_responsev3";
option (udpa.annotations.file_status).package_version_status = ACTIVE;
option (xds.annotations.v3.file_status).work_in_progress = true;

// [#protodoc-title: Save Processing Response from external processor.]
// [#extension: envoy.http.ext_proc.response_processors.save_processing_response]

// Extension to save the :ref:`response
// <envoy_v3_api_msg_service.ext_proc.v3.ProcessingResponse>` from the external processor as
// filter state with name
// "envoy.http.ext_proc.response_processors.save_processing_response[.:ref:`filter_state_name_suffix
// <envoy_v3_api_field_extensions.http.ext_proc.response_processors.save_processing_response.v3.SaveProcessingResponse.filter_state_name>`].
// This extension supports saving of request and response headers and trailers,
// and immediate response.
// [#next-free-field: 7]
message SaveProcessingResponse {
message SaveOptions {
// Whether or not to save the response for the response type.
bool save_response = 1;

// When true, saves the response if there was an error when processing
// the response from the external processor.
bool save_on_error = 2;
}

// The default filter state name is
// "envoy.http.ext_proc.response_processors.save_processing_response".
// If defined, ``filter_state_name_suffix`` is appended to this.
// For example, setting ``filter_state_name_suffix`` to "xyz" will set the
// filter state name to "envoy.http.ext_proc.response_processors.save_processing_response.xyz"
string filter_state_name_suffix = 1;

// Save the response to filter state when :ref:`request_headers
// <envoy_v3_api_field_service.ext_proc.v3.ProcessingResponse.request_headers>` is set.
SaveOptions save_request_headers = 2;

// Save the response to filter state when :ref:`response_headers
// <envoy_v3_api_field_service.ext_proc.v3.ProcessingResponse.response_headers>` is set.
SaveOptions save_response_headers = 3;

// Save the response to filter state when :ref:`request_trailers
// <envoy_v3_api_field_service.ext_proc.v3.ProcessingResponse.request_trailers>` is set.
SaveOptions save_request_trailers = 4;

// Save the response to filter state when :ref:`response_trailers
// <envoy_v3_api_field_service.ext_proc.v3.ProcessingResponse.response_trailers>` is set.
SaveOptions save_response_trailers = 5;

// Save the response to filter state when :ref:`immediate_response
// <envoy_v3_api_field_service.ext_proc.v3.ProcessingResponse.immediate_response>` is set.
SaveOptions save_immediate_response = 6;
}
1 change: 1 addition & 0 deletions api/versioning/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ proto_library(
"//envoy/extensions/http/custom_response/local_response_policy/v3:pkg",
"//envoy/extensions/http/custom_response/redirect_policy/v3:pkg",
"//envoy/extensions/http/early_header_mutation/header_mutation/v3:pkg",
"//envoy/extensions/http/ext_proc/response_processors/save_processing_response/v3:pkg",
"//envoy/extensions/http/header_formatters/preserve_case/v3:pkg",
"//envoy/extensions/http/header_validators/envoy_default/v3:pkg",
"//envoy/extensions/http/injected_credentials/generic/v3:pkg",
Expand Down
5 changes: 5 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ new_features:
change: |
Added the initial support for shared libraries to be loaded by Envoy at runtime. Please refer to the overview documentation for the
feature :ref:`here <arch_overview_dynamic_modules>`.
- area: ext_proc
change: |
Added an :ref:`extension
<envoy_v3_api_msg_extensions.http.ext_proc.response_processors.save_processing_response.v3.SaveProcessingResponse>` to save the
:ref:`response <envoy_v3_api_msg_service.ext_proc.v3.ProcessingResponse>` from the external processor to filter state.
- area: http
change: |
Made the :ref:`credential injector filter <envoy_v3_api_msg_extensions.filters.http.credential_injector.v3.CredentialInjector>`
Expand Down
1 change: 1 addition & 0 deletions docs/root/api-v3/config/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Extensions
health_checker/health_checker
http/early_header_mutation
http/custom_response
http/ext_proc
http/header_formatters
http/header_validators
http/original_ip_detection
Expand Down
8 changes: 8 additions & 0 deletions docs/root/api-v3/config/http/ext_proc.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
External Processing Extensions
==============================

.. toctree::
:glob:
:maxdepth: 2

../../extensions/http/ext_proc/*/*/v3/*
5 changes: 5 additions & 0 deletions source/extensions/extensions_build_config.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,11 @@ EXTENSIONS = {
"envoy.http.custom_response.redirect_policy": "//source/extensions/http/custom_response/redirect_policy:redirect_policy_lib",
"envoy.http.custom_response.local_response_policy": "//source/extensions/http/custom_response/local_response_policy:local_response_policy_lib",

#
# External Processing Response Processors
#
"envoy.http.ext_proc.response_processors.save_processing_response": "//source/extensions/http/ext_proc/response_processors/save_processing_response:save_processing_response_lib",

#
# Injected credentials
#
Expand Down
5 changes: 5 additions & 0 deletions source/extensions/extensions_metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1739,6 +1739,11 @@ envoy.http.custom_response.local_response_policy:
status: wip
type_urls:
- envoy.extensions.http.custom_response.local_response_policy.v3.LocalResponsePolicy
envoy.http.ext_proc.response_processors.save_processing_response:
categories:
- envoy.http.ext_proc.response_processors
security_posture: unknown
status: wip
envoy.config_subscription.rest:
categories:
- envoy.config_subscription
Expand Down
13 changes: 13 additions & 0 deletions source/extensions/filters/http/ext_proc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ envoy_cc_library(
":client_interface",
":matching_utils_lib",
":mutation_utils_lib",
":on_processing_response_interface",
"//envoy/event:timer_interface",
"//envoy/http:filter_interface",
"//envoy/http:header_map_interface",
Expand Down Expand Up @@ -141,3 +142,15 @@ envoy_cc_library(
"@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto",
],
)

envoy_cc_library(
name = "on_processing_response_interface",
hdrs = ["on_processing_response.h"],
tags = ["skip_on_windows"],
deps = [
"//envoy/config:typed_config_interface",
"//envoy/server:factory_context_interface",
"//source/common/stats:symbol_table_lib",
"@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto",
],
)
68 changes: 68 additions & 0 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
#include "source/extensions/filters/http/ext_proc/ext_proc.h"

#include <functional>
#include <memory>
#include <utility>

#include "envoy/config/common/mutation_rules/v3/mutation_rules.pb.h"
#include "envoy/config/core/v3/grpc_service.pb.h"
#include "envoy/extensions/filters/http/ext_proc/v3/processing_mode.pb.h"

#include "source/common/config/utility.h"
#include "source/common/http/utility.h"
#include "source/common/protobuf/utility.h"
#include "source/common/runtime/runtime_features.h"
#include "source/extensions/filters/http/ext_proc/http_client/http_client_impl.h"
#include "source/extensions/filters/http/ext_proc/mutation_utils.h"
#include "source/extensions/filters/http/ext_proc/on_processing_response.h"

#include "absl/strings/str_format.h"
#include "absl/strings/string_view.h"
Expand Down Expand Up @@ -225,6 +229,8 @@ FilterConfig::FilterConfig(const ExternalProcessor& config,
expression_manager_(builder, context.localInfo(), config.request_attributes(),
config.response_attributes()),
immediate_mutation_checker_(context.regexEngine()),
on_processing_response_factory_cb_(
createOnProcessingResponseCb(config, context, stats_prefix)),
thread_local_stream_manager_slot_(context.threadLocal().allocateSlot()) {

if (config.disable_clear_route_cache()) {
Expand Down Expand Up @@ -1297,26 +1303,50 @@ void Filter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&& r) {
case ProcessingResponse::ResponseCase::kRequestHeaders:
setDecoderDynamicMetadata(*response);
processing_status = decoding_state_.handleHeadersResponse(response->request_headers());
if (on_processing_response_) {
on_processing_response_->afterProcessingRequestHeaders(*response, processing_status,
decoder_callbacks_->streamInfo());
}
break;
case ProcessingResponse::ResponseCase::kResponseHeaders:
setEncoderDynamicMetadata(*response);
processing_status = encoding_state_.handleHeadersResponse(response->response_headers());
if (on_processing_response_) {
on_processing_response_->afterProcessingResponseHeaders(*response, processing_status,
decoder_callbacks_->streamInfo());
}
break;
case ProcessingResponse::ResponseCase::kRequestBody:
setDecoderDynamicMetadata(*response);
processing_status = decoding_state_.handleBodyResponse(response->request_body());
if (on_processing_response_) {
on_processing_response_->afterProcessingRequestBody(*response, processing_status,
decoder_callbacks_->streamInfo());
}
break;
case ProcessingResponse::ResponseCase::kResponseBody:
setEncoderDynamicMetadata(*response);
processing_status = encoding_state_.handleBodyResponse(response->response_body());
if (on_processing_response_) {
on_processing_response_->afterProcessingResponseBody(*response, processing_status,
decoder_callbacks_->streamInfo());
}
break;
case ProcessingResponse::ResponseCase::kRequestTrailers:
setDecoderDynamicMetadata(*response);
processing_status = decoding_state_.handleTrailersResponse(response->request_trailers());
if (on_processing_response_) {
on_processing_response_->afterProcessingRequestTrailers(*response, processing_status,
decoder_callbacks_->streamInfo());
}
break;
case ProcessingResponse::ResponseCase::kResponseTrailers:
setEncoderDynamicMetadata(*response);
processing_status = encoding_state_.handleTrailersResponse(response->response_trailers());
if (on_processing_response_) {
on_processing_response_->afterProcessingResponseTrailers(*response, processing_status,
decoder_callbacks_->streamInfo());
}
break;
case ProcessingResponse::ResponseCase::kImmediateResponse:
if (config_->disableImmediateResponse()) {
Expand All @@ -1325,6 +1355,11 @@ void Filter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&& r) {
"Treat the immediate response message as spurious response.");
processing_status =
absl::FailedPreconditionError("unhandled immediate response due to config disabled it");

if (on_processing_response_) {
on_processing_response_->afterReceivingImmediateResponse(*response, processing_status,
decoder_callbacks_->streamInfo());
}
} else {
setDecoderDynamicMetadata(*response);
// We won't be sending anything more to the stream after we
Expand All @@ -1333,6 +1368,10 @@ void Filter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&& r) {
processing_complete_ = true;
onFinishProcessorCalls(Grpc::Status::Ok);
closeStream();
if (on_processing_response_) {
on_processing_response_->afterReceivingImmediateResponse(*response, processing_status,
decoder_callbacks_->streamInfo());
}
sendImmediateResponse(response->immediate_response());
processing_status = absl::OkStatus();
}
Expand Down Expand Up @@ -1624,6 +1663,35 @@ std::string responseCaseToString(const ProcessingResponse::ResponseCase response
}
}

std::function<std::unique_ptr<OnProcessingResponse>()> FilterConfig::createOnProcessingResponseCb(
const ExternalProcessor& config, Envoy::Server::Configuration::CommonFactoryContext& context,
const std::string& stats_prefix) {
if (!config.has_on_processing_response()) {
return nullptr;
}
auto& factory = Envoy::Config::Utility::getAndCheckFactory<OnProcessingResponseFactory>(
config.on_processing_response());
auto on_processing_response_config = Envoy::Config::Utility::translateAnyToFactoryConfig(
config.on_processing_response().typed_config(), context.messageValidationVisitor(), factory);
if (on_processing_response_config == nullptr) {
return nullptr;
}
std::shared_ptr<const Protobuf::Message> shared_on_processing_response_config =
std::move(on_processing_response_config);
return [&factory, shared_on_processing_response_config, &context,
stats_prefix]() -> std::unique_ptr<OnProcessingResponse> {
return factory.createOnProcessingResponse(*shared_on_processing_response_config, context,
stats_prefix);
};
}

std::unique_ptr<OnProcessingResponse> FilterConfig::createOnProcessingResponse() const {
if (!on_processing_response_factory_cb_) {
return nullptr;
}
return on_processing_response_factory_cb_();
}

} // namespace ExternalProcessing
} // namespace HttpFilters
} // namespace Extensions
Expand Down
Loading

0 comments on commit 71d1eb6

Please sign in to comment.