diff --git a/CODEOWNERS b/CODEOWNERS index 815be15111f9..dd995bd374a7 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -31,6 +31,7 @@ extensions/filters/common/original_src @klarose @mattklein123 # external processing filter /*/extensions/filters/http/ext_proc @gbrail @stevenzzzz @tyxia @mattklein123 @yanavlasov @yanjunxiang-google /*/extensions/filters/common/mutation_rules @gbrail @tyxia @mattklein123 @yanavlasov +/*/extensions/http/ext_proc/response_processors/save_processing_response @pradeepcrao @botengyao @yanjunxiang-google # jwt_authn http filter extension /*/extensions/filters/http/jwt_authn @taoxuy @lizan @tyxia @yanavlasov # grpc_field_extraction http filter extension diff --git a/api/BUILD b/api/BUILD index 79e325dab2a5..7ae8106dec4e 100644 --- a/api/BUILD +++ b/api/BUILD @@ -273,6 +273,7 @@ proto_library( "//envoy/extensions/http/custom_response/local_response_policy/v3:pkg", "//envoy/extensions/http/custom_response/redirect_policy/v3:pkg", "//envoy/extensions/http/early_header_mutation/header_mutation/v3:pkg", + "//envoy/extensions/http/ext_proc/response_processors/save_processing_response/v3:pkg", "//envoy/extensions/http/header_formatters/preserve_case/v3:pkg", "//envoy/extensions/http/header_validators/envoy_default/v3:pkg", "//envoy/extensions/http/injected_credentials/generic/v3:pkg", diff --git a/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto b/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto index 7aa4b23c20b4..7e4fbffb658b 100644 --- a/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto +++ b/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto @@ -4,6 +4,7 @@ package envoy.extensions.filters.http.ext_proc.v3; import "envoy/config/common/mutation_rules/v3/mutation_rules.proto"; import "envoy/config/core/v3/base.proto"; +import "envoy/config/core/v3/extension.proto"; import "envoy/config/core/v3/grpc_service.proto"; import "envoy/config/core/v3/http_service.proto"; import "envoy/extensions/filters/http/ext_proc/v3/processing_mode.proto"; @@ -96,7 +97,7 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE; // ` object in a namespace matching the filter // name. // -// [#next-free-field: 23] +// [#next-free-field: 24] message ExternalProcessor { // Describes the route cache action to be taken when an external processor response // is received in response to request headers. @@ -328,6 +329,11 @@ message ExternalProcessor { // the ``allowed_override_modes`` allow-list below. // Since request_header_mode is not applicable in any way, it's ignored in comparison. repeated ProcessingMode allowed_override_modes = 22; + + // Decorator to introduce custom logic that runs after a message received from + // the External Processor is processed. + // [#extension-category: envoy.http.ext_proc.response_processors] + config.core.v3.TypedExtensionConfig on_processing_response = 23; } // ExtProcHttpService is used for HTTP communication between the filter and the external processing service. diff --git a/api/envoy/extensions/http/ext_proc/response_processors/save_processing_response/v3/BUILD b/api/envoy/extensions/http/ext_proc/response_processors/save_processing_response/v3/BUILD new file mode 100644 index 000000000000..d49202b74ab4 --- /dev/null +++ b/api/envoy/extensions/http/ext_proc/response_processors/save_processing_response/v3/BUILD @@ -0,0 +1,12 @@ +# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py. + +load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") + +licenses(["notice"]) # Apache 2 + +api_proto_package( + deps = [ + "@com_github_cncf_xds//udpa/annotations:pkg", + "@com_github_cncf_xds//xds/annotations/v3:pkg", + ], +) diff --git a/api/envoy/extensions/http/ext_proc/response_processors/save_processing_response/v3/save_processing_response.proto b/api/envoy/extensions/http/ext_proc/response_processors/save_processing_response/v3/save_processing_response.proto new file mode 100644 index 000000000000..bfea5fd1878e --- /dev/null +++ b/api/envoy/extensions/http/ext_proc/response_processors/save_processing_response/v3/save_processing_response.proto @@ -0,0 +1,63 @@ +syntax = "proto3"; + +package envoy.extensions.http.ext_proc.response_processors.save_processing_response.v3; + +import "xds/annotations/v3/status.proto"; + +import "udpa/annotations/status.proto"; + +option java_package = "io.envoyproxy.envoy.extensions.http.ext_proc.response_processors.save_processing_response.v3"; +option java_outer_classname = "SaveProcessingResponseProto"; +option java_multiple_files = true; +option go_package = "github.com/envoyproxy/go-control-plane/envoy/extensions/http/ext_proc/response_processors/save_processing_response/v3;save_processing_responsev3"; +option (udpa.annotations.file_status).package_version_status = ACTIVE; +option (xds.annotations.v3.file_status).work_in_progress = true; + +// [#protodoc-title: Save Processing Response from external processor.] +// [#extension: envoy.http.ext_proc.response_processors.save_processing_response] + +// Extension to save the :ref:`response +// ` from the external processor as +// filter state with name +// "envoy.http.ext_proc.response_processors.save_processing_response[.:ref:`filter_state_name_suffix +// `]. +// This extension supports saving of request and response headers and trailers, +// and immediate response. +// [#next-free-field: 7] +message SaveProcessingResponse { + message SaveOptions { + // Whether or not to save the response for the response type. + bool save_response = 1; + + // When true, saves the response if there was an error when processing + // the response from the external processor. + bool save_on_error = 2; + } + + // The default filter state name is + // "envoy.http.ext_proc.response_processors.save_processing_response". + // If defined, ``filter_state_name_suffix`` is appended to this. + // For example, setting ``filter_state_name_suffix`` to "xyz" will set the + // filter state name to "envoy.http.ext_proc.response_processors.save_processing_response.xyz" + string filter_state_name_suffix = 1; + + // Save the response to filter state when :ref:`request_headers + // ` is set. + SaveOptions save_request_headers = 2; + + // Save the response to filter state when :ref:`response_headers + // ` is set. + SaveOptions save_response_headers = 3; + + // Save the response to filter state when :ref:`request_trailers + // ` is set. + SaveOptions save_request_trailers = 4; + + // Save the response to filter state when :ref:`response_trailers + // ` is set. + SaveOptions save_response_trailers = 5; + + // Save the response to filter state when :ref:`immediate_response + // ` is set. + SaveOptions save_immediate_response = 6; +} diff --git a/api/versioning/BUILD b/api/versioning/BUILD index 2f72d3743bb9..b06da412ad58 100644 --- a/api/versioning/BUILD +++ b/api/versioning/BUILD @@ -211,6 +211,7 @@ proto_library( "//envoy/extensions/http/custom_response/local_response_policy/v3:pkg", "//envoy/extensions/http/custom_response/redirect_policy/v3:pkg", "//envoy/extensions/http/early_header_mutation/header_mutation/v3:pkg", + "//envoy/extensions/http/ext_proc/response_processors/save_processing_response/v3:pkg", "//envoy/extensions/http/header_formatters/preserve_case/v3:pkg", "//envoy/extensions/http/header_validators/envoy_default/v3:pkg", "//envoy/extensions/http/injected_credentials/generic/v3:pkg", diff --git a/changelogs/current.yaml b/changelogs/current.yaml index d9b1fa3a4bba..7d879edc9806 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -175,6 +175,11 @@ new_features: change: | Added the initial support for shared libraries to be loaded by Envoy at runtime. Please refer to the overview documentation for the feature :ref:`here `. +- area: ext_proc + change: | + Added an :ref:`extension + ` to save the + :ref:`response ` from the external processor to filter state. - area: http change: | Made the :ref:`credential injector filter ` diff --git a/docs/root/api-v3/config/config.rst b/docs/root/api-v3/config/config.rst index b425c6e684a5..f12fad766ea9 100644 --- a/docs/root/api-v3/config/config.rst +++ b/docs/root/api-v3/config/config.rst @@ -23,6 +23,7 @@ Extensions health_checker/health_checker http/early_header_mutation http/custom_response + http/ext_proc http/header_formatters http/header_validators http/original_ip_detection diff --git a/docs/root/api-v3/config/http/ext_proc.rst b/docs/root/api-v3/config/http/ext_proc.rst new file mode 100644 index 000000000000..d74ca5110648 --- /dev/null +++ b/docs/root/api-v3/config/http/ext_proc.rst @@ -0,0 +1,8 @@ +External Processing Extensions +============================== + +.. toctree:: + :glob: + :maxdepth: 2 + + ../../extensions/http/ext_proc/*/*/v3/* diff --git a/source/extensions/extensions_build_config.bzl b/source/extensions/extensions_build_config.bzl index c074ac86a3e2..e249070462a1 100644 --- a/source/extensions/extensions_build_config.bzl +++ b/source/extensions/extensions_build_config.bzl @@ -410,6 +410,11 @@ EXTENSIONS = { "envoy.http.custom_response.redirect_policy": "//source/extensions/http/custom_response/redirect_policy:redirect_policy_lib", "envoy.http.custom_response.local_response_policy": "//source/extensions/http/custom_response/local_response_policy:local_response_policy_lib", + # + # External Processing Response Processors + # + "envoy.http.ext_proc.response_processors.save_processing_response": "//source/extensions/http/ext_proc/response_processors/save_processing_response:save_processing_response_lib", + # # Injected credentials # diff --git a/source/extensions/extensions_metadata.yaml b/source/extensions/extensions_metadata.yaml index 5f39f086d84c..4854da4018ee 100644 --- a/source/extensions/extensions_metadata.yaml +++ b/source/extensions/extensions_metadata.yaml @@ -1739,6 +1739,11 @@ envoy.http.custom_response.local_response_policy: status: wip type_urls: - envoy.extensions.http.custom_response.local_response_policy.v3.LocalResponsePolicy +envoy.http.ext_proc.response_processors.save_processing_response: + categories: + - envoy.http.ext_proc.response_processors + security_posture: unknown + status: wip envoy.config_subscription.rest: categories: - envoy.config_subscription diff --git a/source/extensions/filters/http/ext_proc/BUILD b/source/extensions/filters/http/ext_proc/BUILD index 2fafe8df478b..63cad33a57b3 100644 --- a/source/extensions/filters/http/ext_proc/BUILD +++ b/source/extensions/filters/http/ext_proc/BUILD @@ -34,6 +34,7 @@ envoy_cc_library( ":client_interface", ":matching_utils_lib", ":mutation_utils_lib", + ":on_processing_response_interface", "//envoy/event:timer_interface", "//envoy/http:filter_interface", "//envoy/http:header_map_interface", @@ -141,3 +142,15 @@ envoy_cc_library( "@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto", ], ) + +envoy_cc_library( + name = "on_processing_response_interface", + hdrs = ["on_processing_response.h"], + tags = ["skip_on_windows"], + deps = [ + "//envoy/config:typed_config_interface", + "//envoy/server:factory_context_interface", + "//source/common/stats:symbol_table_lib", + "@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto", + ], +) diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index 9b0287de85cc..f4efcc23a508 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -1,16 +1,20 @@ #include "source/extensions/filters/http/ext_proc/ext_proc.h" +#include #include +#include #include "envoy/config/common/mutation_rules/v3/mutation_rules.pb.h" #include "envoy/config/core/v3/grpc_service.pb.h" #include "envoy/extensions/filters/http/ext_proc/v3/processing_mode.pb.h" +#include "source/common/config/utility.h" #include "source/common/http/utility.h" #include "source/common/protobuf/utility.h" #include "source/common/runtime/runtime_features.h" #include "source/extensions/filters/http/ext_proc/http_client/http_client_impl.h" #include "source/extensions/filters/http/ext_proc/mutation_utils.h" +#include "source/extensions/filters/http/ext_proc/on_processing_response.h" #include "absl/strings/str_format.h" #include "absl/strings/string_view.h" @@ -225,6 +229,8 @@ FilterConfig::FilterConfig(const ExternalProcessor& config, expression_manager_(builder, context.localInfo(), config.request_attributes(), config.response_attributes()), immediate_mutation_checker_(context.regexEngine()), + on_processing_response_factory_cb_( + createOnProcessingResponseCb(config, context, stats_prefix)), thread_local_stream_manager_slot_(context.threadLocal().allocateSlot()) { if (config.disable_clear_route_cache()) { @@ -1297,26 +1303,50 @@ void Filter::onReceiveMessage(std::unique_ptr&& r) { case ProcessingResponse::ResponseCase::kRequestHeaders: setDecoderDynamicMetadata(*response); processing_status = decoding_state_.handleHeadersResponse(response->request_headers()); + if (on_processing_response_) { + on_processing_response_->afterProcessingRequestHeaders(*response, processing_status, + decoder_callbacks_->streamInfo()); + } break; case ProcessingResponse::ResponseCase::kResponseHeaders: setEncoderDynamicMetadata(*response); processing_status = encoding_state_.handleHeadersResponse(response->response_headers()); + if (on_processing_response_) { + on_processing_response_->afterProcessingResponseHeaders(*response, processing_status, + decoder_callbacks_->streamInfo()); + } break; case ProcessingResponse::ResponseCase::kRequestBody: setDecoderDynamicMetadata(*response); processing_status = decoding_state_.handleBodyResponse(response->request_body()); + if (on_processing_response_) { + on_processing_response_->afterProcessingRequestBody(*response, processing_status, + decoder_callbacks_->streamInfo()); + } break; case ProcessingResponse::ResponseCase::kResponseBody: setEncoderDynamicMetadata(*response); processing_status = encoding_state_.handleBodyResponse(response->response_body()); + if (on_processing_response_) { + on_processing_response_->afterProcessingResponseBody(*response, processing_status, + decoder_callbacks_->streamInfo()); + } break; case ProcessingResponse::ResponseCase::kRequestTrailers: setDecoderDynamicMetadata(*response); processing_status = decoding_state_.handleTrailersResponse(response->request_trailers()); + if (on_processing_response_) { + on_processing_response_->afterProcessingRequestTrailers(*response, processing_status, + decoder_callbacks_->streamInfo()); + } break; case ProcessingResponse::ResponseCase::kResponseTrailers: setEncoderDynamicMetadata(*response); processing_status = encoding_state_.handleTrailersResponse(response->response_trailers()); + if (on_processing_response_) { + on_processing_response_->afterProcessingResponseTrailers(*response, processing_status, + decoder_callbacks_->streamInfo()); + } break; case ProcessingResponse::ResponseCase::kImmediateResponse: if (config_->disableImmediateResponse()) { @@ -1325,6 +1355,11 @@ void Filter::onReceiveMessage(std::unique_ptr&& r) { "Treat the immediate response message as spurious response."); processing_status = absl::FailedPreconditionError("unhandled immediate response due to config disabled it"); + + if (on_processing_response_) { + on_processing_response_->afterReceivingImmediateResponse(*response, processing_status, + decoder_callbacks_->streamInfo()); + } } else { setDecoderDynamicMetadata(*response); // We won't be sending anything more to the stream after we @@ -1333,6 +1368,10 @@ void Filter::onReceiveMessage(std::unique_ptr&& r) { processing_complete_ = true; onFinishProcessorCalls(Grpc::Status::Ok); closeStream(); + if (on_processing_response_) { + on_processing_response_->afterReceivingImmediateResponse(*response, processing_status, + decoder_callbacks_->streamInfo()); + } sendImmediateResponse(response->immediate_response()); processing_status = absl::OkStatus(); } @@ -1624,6 +1663,35 @@ std::string responseCaseToString(const ProcessingResponse::ResponseCase response } } +std::function()> FilterConfig::createOnProcessingResponseCb( + const ExternalProcessor& config, Envoy::Server::Configuration::CommonFactoryContext& context, + const std::string& stats_prefix) { + if (!config.has_on_processing_response()) { + return nullptr; + } + auto& factory = Envoy::Config::Utility::getAndCheckFactory( + config.on_processing_response()); + auto on_processing_response_config = Envoy::Config::Utility::translateAnyToFactoryConfig( + config.on_processing_response().typed_config(), context.messageValidationVisitor(), factory); + if (on_processing_response_config == nullptr) { + return nullptr; + } + std::shared_ptr shared_on_processing_response_config = + std::move(on_processing_response_config); + return [&factory, shared_on_processing_response_config, &context, + stats_prefix]() -> std::unique_ptr { + return factory.createOnProcessingResponse(*shared_on_processing_response_config, context, + stats_prefix); + }; +} + +std::unique_ptr FilterConfig::createOnProcessingResponse() const { + if (!on_processing_response_factory_cb_) { + return nullptr; + } + return on_processing_response_factory_cb_(); +} + } // namespace ExternalProcessing } // namespace HttpFilters } // namespace Extensions diff --git a/source/extensions/filters/http/ext_proc/ext_proc.h b/source/extensions/filters/http/ext_proc/ext_proc.h index 731004e50edc..5434301b4e39 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.h +++ b/source/extensions/filters/http/ext_proc/ext_proc.h @@ -26,6 +26,7 @@ #include "source/extensions/filters/http/ext_proc/client.h" #include "source/extensions/filters/http/ext_proc/client_base.h" #include "source/extensions/filters/http/ext_proc/matching_utils.h" +#include "source/extensions/filters/http/ext_proc/on_processing_response.h" #include "source/extensions/filters/http/ext_proc/processor_state.h" namespace Envoy { @@ -280,12 +281,17 @@ class FilterConfig { return grpc_service_; } + std::unique_ptr createOnProcessingResponse() const; + private: ExtProcFilterStats generateStats(const std::string& prefix, const std::string& filter_stats_prefix, Stats::Scope& scope) { const std::string final_prefix = absl::StrCat(prefix, "ext_proc.", filter_stats_prefix); return {ALL_EXT_PROC_FILTER_STATS(POOL_COUNTER_PREFIX(scope, final_prefix))}; } + static std::function()> createOnProcessingResponseCb( + const envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor& config, + Envoy::Server::Configuration::CommonFactoryContext& context, const std::string& stats_prefix); const bool failure_mode_allow_; const bool observability_mode_; envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor::RouteCacheAction @@ -319,6 +325,9 @@ class FilterConfig { const ExpressionManager expression_manager_; const ImmediateMutationChecker immediate_mutation_checker_; + + const std::function()> on_processing_response_factory_cb_; + ThreadLocal::SlotPtr thread_local_stream_manager_slot_; }; @@ -398,7 +407,8 @@ class Filter : public Logger::Loggable, encoding_state_(*this, config->processingMode(), config->untypedForwardingMetadataNamespaces(), config->typedForwardingMetadataNamespaces(), - config->untypedReceivingMetadataNamespaces()) {} + config->untypedReceivingMetadataNamespaces()), + on_processing_response_(config->createOnProcessingResponse()) {} const FilterConfig& config() const { return *config_; } const envoy::config::core::v3::GrpcService& grpcServiceConfig() const { @@ -516,10 +526,18 @@ class Filter : public Logger::Loggable, DecodingProcessorState decoding_state_; EncodingProcessorState encoding_state_; + std::vector untyped_forwarding_namespaces_{}; + std::vector typed_forwarding_namespaces_{}; + std::vector untyped_receiving_namespaces_{}; + Http::StreamFilterCallbacks* filter_callbacks_; + Http::StreamFilterSidestreamWatermarkCallbacks watermark_callbacks_; + // The gRPC stream to the external processor, which will be opened // when it's time to send the first message. ExternalProcessorStream* stream_ = nullptr; + std::unique_ptr on_processing_response_; + // Set to true when no more messages need to be sent to the processor. // This happens when the processor has closed the stream, or when it has // failed. @@ -531,12 +549,6 @@ class Filter : public Logger::Loggable, // Set to true when the mergePerRouteConfig() method has been called. bool route_config_merged_ = false; - - std::vector untyped_forwarding_namespaces_{}; - std::vector typed_forwarding_namespaces_{}; - std::vector untyped_receiving_namespaces_{}; - Http::StreamFilterCallbacks* filter_callbacks_; - Http::StreamFilterSidestreamWatermarkCallbacks watermark_callbacks_; }; extern std::string responseCaseToString( diff --git a/source/extensions/filters/http/ext_proc/on_processing_response.h b/source/extensions/filters/http/ext_proc/on_processing_response.h new file mode 100644 index 000000000000..7423ccce4767 --- /dev/null +++ b/source/extensions/filters/http/ext_proc/on_processing_response.h @@ -0,0 +1,86 @@ +#pragma once + +#include +#include + +#include "envoy/config/typed_config.h" +#include "envoy/server/factory_context.h" +#include "envoy/service/ext_proc/v3/external_processor.pb.h" +#include "envoy/stream_info/stream_info.h" + +#include "source/common/stats/symbol_table.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace ExternalProcessing { + +// Interface to inject custom decorators that are called after receiving a response from the +// external processor. +class OnProcessingResponse { +public: + virtual ~OnProcessingResponse() = default; + + // Called after processing the response from the external processor with :ref:`request_headers + // ` set. + virtual void + afterProcessingRequestHeaders(const envoy::service::ext_proc::v3::ProcessingResponse& response, + absl::Status processing_status, + Envoy::StreamInfo::StreamInfo&) PURE; + + // Called after processing the response from the external processor with :ref:`response_headers + // ` set. + virtual void + afterProcessingResponseHeaders(const envoy::service::ext_proc::v3::ProcessingResponse& response, + absl::Status processing_status, + Envoy::StreamInfo::StreamInfo&) PURE; + + // Called after processing the response from the external processor with :ref:`request_body + // ` set. + virtual void + afterProcessingRequestBody(const envoy::service::ext_proc::v3::ProcessingResponse& response, + absl::Status processing_status, Envoy::StreamInfo::StreamInfo&) PURE; + + // Called after processing the response from the external processor with :ref:`response_body + // ` set. + virtual void + afterProcessingResponseBody(const envoy::service::ext_proc::v3::ProcessingResponse& response, + absl::Status processing_status, Envoy::StreamInfo::StreamInfo&) PURE; + + // Called after processing the response from the external processor with :ref:`request_trailers + // ` set. + virtual void + afterProcessingRequestTrailers(const envoy::service::ext_proc::v3::ProcessingResponse& response, + absl::Status processing_status, + Envoy::StreamInfo::StreamInfo&) PURE; + + // Called after processing the response from the external processor with :ref:`response_trailers + // ` set. + virtual void + afterProcessingResponseTrailers(const envoy::service::ext_proc::v3::ProcessingResponse& response, + absl::Status processing_status, + Envoy::StreamInfo::StreamInfo&) PURE; + + // Called after processing the response from the external processor with :ref:`immediate_response + // ` set. + virtual void + afterReceivingImmediateResponse(const envoy::service::ext_proc::v3::ProcessingResponse& response, + absl::Status processing_status, + Envoy::StreamInfo::StreamInfo&) PURE; +}; + +class OnProcessingResponseFactory : public Config::TypedFactory { +public: + ~OnProcessingResponseFactory() override = default; + virtual std::unique_ptr + createOnProcessingResponse(const Protobuf::Message& config, + Envoy::Server::Configuration::CommonFactoryContext& context, + const std::string& stats_prefix) const PURE; + + std::string category() const override { return "envoy.http.ext_proc.response_processors"; } +}; + +} // namespace ExternalProcessing +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/http/ext_proc/response_processors/save_processing_response/BUILD b/source/extensions/http/ext_proc/response_processors/save_processing_response/BUILD new file mode 100644 index 000000000000..13826fae867d --- /dev/null +++ b/source/extensions/http/ext_proc/response_processors/save_processing_response/BUILD @@ -0,0 +1,30 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_extension", + "envoy_extension_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_extension_package() + +envoy_cc_extension( + name = "save_processing_response_lib", + srcs = [ + "save_processing_response.cc", + "save_processing_response_factory.cc", + ], + hdrs = [ + "save_processing_response.h", + "save_processing_response_factory.h", + ], + deps = [ + "//envoy/registry", + "//envoy/server:factory_context_interface", + "//envoy/stream_info:stream_info_interface", + "//source/extensions/filters/http/ext_proc:on_processing_response_interface", + "@com_google_absl//absl/strings:string_view", + "@envoy_api//envoy/extensions/http/ext_proc/response_processors/save_processing_response/v3:pkg_cc_proto", + "@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto", + ], +) diff --git a/source/extensions/http/ext_proc/response_processors/save_processing_response/save_processing_response.cc b/source/extensions/http/ext_proc/response_processors/save_processing_response/save_processing_response.cc new file mode 100644 index 000000000000..17e4c38ff5bf --- /dev/null +++ b/source/extensions/http/ext_proc/response_processors/save_processing_response/save_processing_response.cc @@ -0,0 +1,76 @@ +#include "source/extensions/http/ext_proc/response_processors/save_processing_response/save_processing_response.h" + +#include "envoy/extensions/http/ext_proc/response_processors/save_processing_response/v3/save_processing_response.pb.h" +#include "envoy/stream_info/stream_info.h" + +namespace Envoy { +namespace Http { +namespace ExternalProcessing { + +SaveProcessingResponse::SaveProcessingResponse(const SaveProcessingResponseProto& config) + : filter_state_name_(config.filter_state_name_suffix().empty() + ? SaveProcessingResponseFilterState::kFilterStateName + : absl::StrCat(SaveProcessingResponseFilterState::kFilterStateName, + ".", config.filter_state_name_suffix())), + save_request_headers_(config.save_request_headers()), + save_response_headers_(config.save_response_headers()), + save_request_trailers_(config.save_request_trailers()), + save_response_trailers_(config.save_response_trailers()), + save_immediate_response_(config.save_immediate_response()) {} + +void SaveProcessingResponse::addToFilterState( + const SaveOptions& save_options, + const envoy::service::ext_proc::v3::ProcessingResponse& processing_response, + absl::Status status, Envoy::StreamInfo::StreamInfo& stream_info) { + if (!save_options.save_response) { + return; + } + if (status.ok() || save_options.save_on_error) { + SaveProcessingResponseFilterState* filter_state = + stream_info.filterState()->getDataMutable( + filter_state_name_); + if (filter_state == nullptr) { + auto shared_filter_state = std::make_shared(); + filter_state = shared_filter_state.get(); + stream_info.filterState()->setData(filter_state_name_, shared_filter_state, + Envoy::StreamInfo::FilterState::StateType::Mutable); + } + + filter_state->response.emplace(SaveProcessingResponseFilterState::Response{ + .processing_status = status, .processing_response = processing_response}); + } +} + +void SaveProcessingResponse::afterProcessingRequestHeaders( + const envoy::service::ext_proc::v3::ProcessingResponse& response, absl::Status status, + Envoy::StreamInfo::StreamInfo& stream_info) { + addToFilterState(save_request_headers_, response, status, stream_info); +} + +void SaveProcessingResponse::afterProcessingResponseHeaders( + const envoy::service::ext_proc::v3::ProcessingResponse& response, absl::Status status, + Envoy::StreamInfo::StreamInfo& stream_info) { + addToFilterState(save_response_headers_, response, status, stream_info); +} + +void SaveProcessingResponse::afterProcessingRequestTrailers( + const envoy::service::ext_proc::v3::ProcessingResponse& response, absl::Status status, + Envoy::StreamInfo::StreamInfo& stream_info) { + addToFilterState(save_request_trailers_, response, status, stream_info); +} + +void SaveProcessingResponse::afterProcessingResponseTrailers( + const envoy::service::ext_proc::v3::ProcessingResponse& response, absl::Status status, + Envoy::StreamInfo::StreamInfo& stream_info) { + addToFilterState(save_response_trailers_, response, status, stream_info); +} + +void SaveProcessingResponse::afterReceivingImmediateResponse( + const envoy::service::ext_proc::v3::ProcessingResponse& response, absl::Status status, + Envoy::StreamInfo::StreamInfo& stream_info) { + addToFilterState(save_immediate_response_, response, status, stream_info); +} + +} // namespace ExternalProcessing +} // namespace Http +} // namespace Envoy diff --git a/source/extensions/http/ext_proc/response_processors/save_processing_response/save_processing_response.h b/source/extensions/http/ext_proc/response_processors/save_processing_response/save_processing_response.h new file mode 100644 index 000000000000..3e4799c4bf24 --- /dev/null +++ b/source/extensions/http/ext_proc/response_processors/save_processing_response/save_processing_response.h @@ -0,0 +1,83 @@ +#pragma once + +#include +#include + +#include "envoy/extensions/http/ext_proc/response_processors/save_processing_response/v3/save_processing_response.pb.h" +#include "envoy/server/factory_context.h" +#include "envoy/service/ext_proc/v3/external_processor.pb.h" +#include "envoy/stream_info/filter_state.h" +#include "envoy/stream_info/stream_info.h" + +#include "source/extensions/filters/http/ext_proc/on_processing_response.h" + +#include "absl/strings/string_view.h" + +namespace Envoy { +namespace Http { +namespace ExternalProcessing { + +struct SaveProcessingResponseFilterState + : public std::enable_shared_from_this, + public Envoy::StreamInfo::FilterState::Object { + static constexpr absl::string_view kFilterStateName = + "envoy.http.ext_proc.response_processors.save_processing_response"; + struct Response { + absl::Status processing_status; + envoy::service::ext_proc::v3::ProcessingResponse processing_response; + }; + absl::optional response; +}; + +class SaveProcessingResponse + : public Envoy::Extensions::HttpFilters::ExternalProcessing::OnProcessingResponse { +public: + using SaveProcessingResponseProto = envoy::extensions::http::ext_proc::response_processors:: + save_processing_response::v3::SaveProcessingResponse; + + SaveProcessingResponse(const SaveProcessingResponseProto&); + + void + afterProcessingRequestHeaders(const envoy::service::ext_proc::v3::ProcessingResponse& response, + absl::Status processing_status, + Envoy::StreamInfo::StreamInfo&) override; + void afterProcessingResponseHeaders(const envoy::service::ext_proc::v3::ProcessingResponse&, + absl::Status, Envoy::StreamInfo::StreamInfo&) override; + // Not implemented. + void afterProcessingRequestBody(const envoy::service::ext_proc::v3::ProcessingResponse&, + absl::Status, Envoy::StreamInfo::StreamInfo&) override{}; + // Not implemented. + void afterProcessingResponseBody(const envoy::service::ext_proc::v3::ProcessingResponse&, + absl::Status, Envoy::StreamInfo::StreamInfo&) override{}; + void afterProcessingRequestTrailers(const envoy::service::ext_proc::v3::ProcessingResponse&, + absl::Status, Envoy::StreamInfo::StreamInfo&) override; + void afterProcessingResponseTrailers(const envoy::service::ext_proc::v3::ProcessingResponse&, + absl::Status, Envoy::StreamInfo::StreamInfo&) override; + void afterReceivingImmediateResponse(const envoy::service::ext_proc::v3::ProcessingResponse&, + absl::Status, Envoy::StreamInfo::StreamInfo&) override; + +private: + struct SaveOptions { + SaveOptions(const SaveProcessingResponseProto::SaveOptions& save_options) + : save_response{save_options.save_response()}, save_on_error{save_options.save_on_error()} { + } + const bool save_response = false; + const bool save_on_error = false; + }; + + void addToFilterState(const SaveOptions& save_options, + const envoy::service::ext_proc::v3::ProcessingResponse& processing_response, + absl::Status status, Envoy::StreamInfo::StreamInfo& stream_info); + + const std::string filter_state_name_; + + SaveOptions save_request_headers_; + SaveOptions save_response_headers_; + SaveOptions save_request_trailers_; + SaveOptions save_response_trailers_; + SaveOptions save_immediate_response_; +}; + +} // namespace ExternalProcessing +} // namespace Http +} // namespace Envoy diff --git a/source/extensions/http/ext_proc/response_processors/save_processing_response/save_processing_response_factory.cc b/source/extensions/http/ext_proc/response_processors/save_processing_response/save_processing_response_factory.cc new file mode 100644 index 000000000000..2b0691fd7674 --- /dev/null +++ b/source/extensions/http/ext_proc/response_processors/save_processing_response/save_processing_response_factory.cc @@ -0,0 +1,31 @@ +#include "source/extensions/http/ext_proc/response_processors/save_processing_response/save_processing_response_factory.h" + +#include + +#include "envoy/extensions/http/ext_proc/response_processors/save_processing_response/v3/save_processing_response.pb.h" +#include "envoy/registry/registry.h" + +namespace Envoy { +namespace Http { +namespace ExternalProcessing { + +std::string SaveProcessingResponseFactory::name() const { + return "envoy.extensions.http.ext_proc.save_processing_response"; +} + +std::unique_ptr +SaveProcessingResponseFactory::createOnProcessingResponse( + const Protobuf::Message& config, Envoy::Server::Configuration::CommonFactoryContext& context, + const std::string&) const { + const SaveProcessingResponseProto& save_processing_response_config = + MessageUtil::downcastAndValidate( + config, context.messageValidationVisitor()); + return std::make_unique(save_processing_response_config); +} + +REGISTER_FACTORY(SaveProcessingResponseFactory, + Envoy::Extensions::HttpFilters::ExternalProcessing::OnProcessingResponseFactory); + +} // namespace ExternalProcessing +} // namespace Http +} // namespace Envoy diff --git a/source/extensions/http/ext_proc/response_processors/save_processing_response/save_processing_response_factory.h b/source/extensions/http/ext_proc/response_processors/save_processing_response/save_processing_response_factory.h new file mode 100644 index 000000000000..f056aea900ba --- /dev/null +++ b/source/extensions/http/ext_proc/response_processors/save_processing_response/save_processing_response_factory.h @@ -0,0 +1,36 @@ +#pragma once + +#include "envoy/extensions/http/ext_proc/response_processors/save_processing_response/v3/save_processing_response.pb.h" +#include "envoy/extensions/http/ext_proc/response_processors/save_processing_response/v3/save_processing_response.pb.validate.h" +#include "envoy/server/factory_context.h" + +#include "source/extensions/filters/http/ext_proc/on_processing_response.h" +#include "source/extensions/http/ext_proc/response_processors/save_processing_response/save_processing_response.h" + +namespace Envoy { +namespace Http { +namespace ExternalProcessing { + +class SaveProcessingResponseFactory + : public Envoy::Extensions::HttpFilters::ExternalProcessing::OnProcessingResponseFactory { +public: + using SaveProcessingResponseProto = envoy::extensions::http::ext_proc::response_processors:: + save_processing_response::v3::SaveProcessingResponse; + ~SaveProcessingResponseFactory() override = default; + + std::unique_ptr + createOnProcessingResponse(const Protobuf::Message& config, + Envoy::Server::Configuration::CommonFactoryContext& context, + const std::string&) const override; + + ProtobufTypes::MessagePtr createEmptyConfigProto() override { + return ProtobufTypes::MessagePtr{new envoy::extensions::http::ext_proc::response_processors:: + save_processing_response::v3::SaveProcessingResponse()}; + } + + std::string name() const override; +}; + +} // namespace ExternalProcessing +} // namespace Http +} // namespace Envoy diff --git a/test/extensions/filters/http/ext_proc/BUILD b/test/extensions/filters/http/ext_proc/BUILD index eff8008e92ba..6bdfce930138 100644 --- a/test/extensions/filters/http/ext_proc/BUILD +++ b/test/extensions/filters/http/ext_proc/BUILD @@ -46,12 +46,15 @@ envoy_extension_cc_test( ":utils_lib", "//envoy/http:filter_interface", "//envoy/network:filter_interface", + "//envoy/registry", "//source/common/http:context_lib", "//source/common/http:rds_lib", "//source/common/network:address_lib", "//source/common/protobuf", "//source/common/stats:isolated_store_lib", "//source/extensions/filters/http/ext_proc", + "//source/extensions/filters/http/ext_proc:on_processing_response_interface", + "//source/extensions/http/ext_proc/response_processors/save_processing_response:save_processing_response_lib", "//test/common/http:common_lib", "//test/common/http:conn_manager_impl_test_base_lib", "//test/mocks/event:event_mocks", @@ -61,7 +64,10 @@ envoy_extension_cc_test( "//test/mocks/server:overload_manager_mocks", "//test/mocks/server:server_factory_context_mocks", "//test/proto:helloworld_proto_cc_proto", + "//test/test_common:registry_lib", + "//test/test_common:utility_lib", "@envoy_api//envoy/config/core/v3:pkg_cc_proto", + "@envoy_api//envoy/extensions/http/ext_proc/response_processors/save_processing_response/v3:pkg_cc_proto", "@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto", ], ) @@ -292,7 +298,14 @@ envoy_extension_cc_test_library( tags = ["skip_on_windows"], deps = [ "//envoy/http:header_map_interface", + "//envoy/stream_info:stream_info_interface", + "//source/common/common:assert_lib", + "//source/common/common:macros", + "//source/common/protobuf", + "//source/extensions/filters/http/ext_proc:on_processing_response_interface", "//test/test_common:utility_lib", + "@com_google_absl//absl/container:flat_hash_set", + "@com_google_absl//absl/status", "@com_google_absl//absl/strings:str_format", "@envoy_api//envoy/config/core/v3:pkg_cc_proto", ], diff --git a/test/extensions/filters/http/ext_proc/filter_test.cc b/test/extensions/filters/http/ext_proc/filter_test.cc index 00e9f49c3bee..544c0bb4fec2 100644 --- a/test/extensions/filters/http/ext_proc/filter_test.cc +++ b/test/extensions/filters/http/ext_proc/filter_test.cc @@ -2,9 +2,11 @@ #include "envoy/config/core/v3/base.pb.h" #include "envoy/config/core/v3/grpc_service.pb.h" +#include "envoy/extensions/http/ext_proc/response_processors/save_processing_response/v3/save_processing_response.pb.h" #include "envoy/http/filter.h" #include "envoy/network/connection.h" #include "envoy/network/filter.h" +#include "envoy/registry/registry.h" #include "envoy/service/ext_proc/v3/external_processor.pb.h" #include "source/common/http/conn_manager_impl.h" @@ -13,6 +15,9 @@ #include "source/common/protobuf/protobuf.h" #include "source/common/stats/isolated_store_impl.h" #include "source/extensions/filters/http/ext_proc/ext_proc.h" +#include "source/extensions/filters/http/ext_proc/on_processing_response.h" +#include "source/extensions/http/ext_proc/response_processors/save_processing_response/save_processing_response.h" +#include "source/extensions/http/ext_proc/response_processors/save_processing_response/save_processing_response_factory.h" #include "test/common/http/common.h" #include "test/common/http/conn_manager_impl_test_base.h" @@ -30,6 +35,7 @@ #include "test/mocks/tracing/mocks.h" #include "test/mocks/upstream/cluster_manager.h" #include "test/test_common/printers.h" +#include "test/test_common/registry.h" #include "test/test_common/utility.h" #include "gmock/gmock.h" @@ -70,6 +76,8 @@ using ::Envoy::Http::TestRequestHeaderMapImpl; using ::Envoy::Http::TestRequestTrailerMapImpl; using ::Envoy::Http::TestResponseHeaderMapImpl; using ::Envoy::Http::TestResponseTrailerMapImpl; +using ::Envoy::Http::ExternalProcessing::SaveProcessingResponseFactory; +using ::Envoy::Http::ExternalProcessing::SaveProcessingResponseFilterState; using ::testing::AnyNumber; using ::testing::Eq; @@ -5295,6 +5303,880 @@ TEST_F(HttpFilterTest, InvalidResponseContentLength) { filter_->onDestroy(); } +TEST_F(HttpFilterTest, OnProcessingResponseHeaders) { + TestOnProcessingResponseFactory factory; + Envoy::Registry::InjectFactory registration(factory); + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + on_processing_response: + name: "abc" + typed_config: + "@type": type.googleapis.com/google.protobuf.Struct + )EOF"); + + request_headers_.addCopy(LowerCaseString("x-some-other-header"), "yes"); + request_headers_.addCopy(LowerCaseString("x-do-we-want-this"), "no"); + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); + processRequestHeaders( + false, [](const HttpHeaders&, ProcessingResponse&, HeadersResponse& header_resp) { + auto headers_mut = header_resp.mutable_response()->mutable_header_mutation(); + auto add1 = headers_mut->add_set_headers(); + add1->mutable_header()->set_key("x-new-header"); + add1->mutable_header()->set_raw_value("new"); + add1->mutable_append()->set_value(false); + auto add2 = headers_mut->add_set_headers(); + add2->mutable_header()->set_key("x-some-other-header"); + add2->mutable_header()->set_raw_value("no"); + add2->mutable_append()->set_value(true); + *headers_mut->add_remove_headers() = "x-do-we-want-this"; + }); + + // We should now have changed the original header a bit + TestRequestHeaderMapImpl expected{{":path", "/"}, + {":method", "POST"}, + {":scheme", "http"}, + {"host", "host"}, + {"x-new-header", "new"}, + {"x-some-other-header", "yes"}, + {"x-some-other-header", "no"}}; + EXPECT_THAT(&request_headers_, HeaderMapEqualIgnoreOrder(&expected)); + + ASSERT_TRUE( + dynamic_metadata_.filter_metadata().contains("envoy.test.ext_proc.request_headers_response")); + const auto& request_headers_struct_metadata = + dynamic_metadata_.filter_metadata().at("envoy.test.ext_proc.request_headers_response"); + ProtobufWkt::Struct expected_request_headers; + TestUtility::loadFromJson(R"EOF( +{ + "x-do-we-want-this": "remove", + "x-new-header": "new", + "x-some-other-header": "no" +})EOF", + expected_request_headers); + EXPECT_TRUE( + TestUtility::protoEqual(request_headers_struct_metadata, expected_request_headers, true)); + + Buffer::OwnedImpl req_data("foo"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(req_data, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + + response_headers_.addCopy(LowerCaseString(":status"), "200"); + response_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + response_headers_.addCopy(LowerCaseString("content-length"), "3"); + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + + processResponseHeaders(false, [](const HttpHeaders& response_headers, ProcessingResponse&, + HeadersResponse& header_resp) { + EXPECT_FALSE(response_headers.end_of_stream()); + TestRequestHeaderMapImpl expected_response{ + {":status", "200"}, {"content-type", "text/plain"}, {"content-length", "3"}}; + EXPECT_THAT(response_headers.headers(), HeaderProtosEqual(expected_response)); + + auto* resp_headers_mut = header_resp.mutable_response()->mutable_header_mutation(); + auto* resp_add1 = resp_headers_mut->add_set_headers(); + resp_add1->mutable_header()->set_key("x-new-header"); + resp_add1->mutable_header()->set_raw_value("new"); + }); + + // We should now have changed the original header a bit + TestRequestHeaderMapImpl final_expected_response{{":status", "200"}, + {"content-type", "text/plain"}, + {"content-length", "3"}, + {"x-new-header", "new"}}; + EXPECT_THAT(&response_headers_, HeaderMapEqualIgnoreOrder(&final_expected_response)); + + ASSERT_TRUE(dynamic_metadata_.filter_metadata().contains( + "envoy.test.ext_proc.response_headers_response")); + const auto& response_headers_struct_metadata = + dynamic_metadata_.filter_metadata().at("envoy.test.ext_proc.response_headers_response"); + ProtobufWkt::Struct expected_response_headers; + TestUtility::loadFromJson(R"EOF( +{ + "x-new-header": "new", +})EOF", + expected_response_headers); + + EXPECT_TRUE( + TestUtility::protoEqual(response_headers_struct_metadata, expected_response_headers, true)); + + Buffer::OwnedImpl resp_data("bar"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(resp_data, false)); + Buffer::OwnedImpl empty_data; + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(empty_data, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + filter_->onDestroy(); + + EXPECT_EQ(1, config_->stats().streams_started_.value()); + EXPECT_EQ(2, config_->stats().stream_msgs_sent_.value()); + EXPECT_EQ(2, config_->stats().stream_msgs_received_.value()); + EXPECT_EQ(1, config_->stats().streams_closed_.value()); +} + +TEST_F(HttpFilterTest, SaveProcessingResponseHeaders) { + SaveProcessingResponseFactory factory; + Envoy::Registry::InjectFactory registration(factory); + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + on_processing_response: + name: "abc" + typed_config: + '@type': type.googleapis.com/envoy.extensions.http.ext_proc.response_processors.save_processing_response.v3.SaveProcessingResponse + save_request_headers: + save_response: true + save_response_headers: + save_response: true + )EOF"); + + request_headers_.addCopy(LowerCaseString("x-some-other-header"), "yes"); + request_headers_.addCopy(LowerCaseString("x-do-we-want-this"), "no"); + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); + processRequestHeaders( + false, [](const HttpHeaders&, ProcessingResponse&, HeadersResponse& header_resp) { + auto headers_mut = header_resp.mutable_response()->mutable_header_mutation(); + auto add1 = headers_mut->add_set_headers(); + add1->mutable_header()->set_key("x-new-header"); + add1->mutable_header()->set_raw_value("new"); + add1->mutable_append()->set_value(false); + auto add2 = headers_mut->add_set_headers(); + add2->mutable_header()->set_key("x-some-other-header"); + add2->mutable_header()->set_raw_value("no"); + add2->mutable_append()->set_value(true); + *headers_mut->add_remove_headers() = "x-do-we-want-this"; + }); + + // We should now have changed the original header a bit + TestRequestHeaderMapImpl expected{{":path", "/"}, + {":method", "POST"}, + {":scheme", "http"}, + {"host", "host"}, + {"x-new-header", "new"}, + {"x-some-other-header", "yes"}, + {"x-some-other-header", "no"}}; + EXPECT_THAT(&request_headers_, HeaderMapEqualIgnoreOrder(&expected)); + auto filter_state = stream_info_.filterState()->getDataMutable( + SaveProcessingResponseFilterState::kFilterStateName); + ASSERT_TRUE(filter_state->response.has_value()); + + envoy::service::ext_proc::v3::ProcessingResponse expected_response; + TestUtility::loadFromJson( + R"EOF( + { + "requestHeaders": { + "response": { + "headerMutation": { + "setHeaders": [{ + "header": { + "key": "x-new-header", + "rawValue": "bmV3" + }, + "append": false + }, { + "header": { + "key": "x-some-other-header", + "rawValue": "bm8=" + }, + "append": true + }], + "removeHeaders": ["x-do-we-want-this"] + } + } + } +})EOF", + expected_response); + + EXPECT_TRUE(TestUtility::protoEqual(filter_state->response.value().processing_response, + expected_response)); + + filter_state->response.reset(); + + Buffer::OwnedImpl req_data("foo"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(req_data, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + + response_headers_.addCopy(LowerCaseString(":status"), "200"); + response_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + response_headers_.addCopy(LowerCaseString("content-length"), "3"); + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + + processResponseHeaders(false, [](const HttpHeaders& response_headers, ProcessingResponse&, + HeadersResponse& header_resp) { + EXPECT_FALSE(response_headers.end_of_stream()); + TestRequestHeaderMapImpl expected_response{ + {":status", "200"}, {"content-type", "text/plain"}, {"content-length", "3"}}; + EXPECT_THAT(response_headers.headers(), HeaderProtosEqual(expected_response)); + + auto* resp_headers_mut = header_resp.mutable_response()->mutable_header_mutation(); + auto* resp_add1 = resp_headers_mut->add_set_headers(); + resp_add1->mutable_header()->set_key("x-new-header"); + resp_add1->mutable_header()->set_raw_value("new"); + }); + + // We should now have changed the original header a bit + TestRequestHeaderMapImpl final_expected_response{{":status", "200"}, + {"content-type", "text/plain"}, + {"content-length", "3"}, + {"x-new-header", "new"}}; + EXPECT_THAT(&response_headers_, HeaderMapEqualIgnoreOrder(&final_expected_response)); + + ASSERT_TRUE(filter_state->response.has_value()); + + envoy::service::ext_proc::v3::ProcessingResponse expected_response_headers; + TestUtility::loadFromJson( + R"EOF( +{ + "responseHeaders": { + "response": { + "headerMutation": { + "setHeaders": [{ + "header": { + "key": "x-new-header", + "rawValue": "bmV3" + }, + "append": false + }] + } + } + } +})EOF", + expected_response_headers); + + EXPECT_TRUE(TestUtility::protoEqual(filter_state->response.value().processing_response, + expected_response_headers)); + filter_state->response.reset(); + + Buffer::OwnedImpl resp_data("bar"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(resp_data, false)); + Buffer::OwnedImpl empty_data; + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(empty_data, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + filter_->onDestroy(); + + EXPECT_EQ(1, config_->stats().streams_started_.value()); + EXPECT_EQ(2, config_->stats().stream_msgs_sent_.value()); + EXPECT_EQ(2, config_->stats().stream_msgs_received_.value()); + EXPECT_EQ(1, config_->stats().streams_closed_.value()); +} + +TEST_F(HttpFilterTest, OnProcessingResponseBodies) { + TestOnProcessingResponseFactory factory; + Envoy::Registry::InjectFactory registration(factory); + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + request_header_mode: "SKIP" + response_header_mode: "SKIP" + request_body_mode: "BUFFERED" + response_body_mode: "BUFFERED" + request_trailer_mode: "SKIP" + response_trailer_mode: "SKIP" + on_processing_response: + name: "abc" + typed_config: + "@type": type.googleapis.com/google.protobuf.Struct + )EOF"); + + // Create synthetic HTTP request + request_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + request_headers_.addCopy(LowerCaseString("content-length"), 100); + + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers_, false)); + + Buffer::OwnedImpl req_data; + TestUtility::feedBufferWithRandomCharacters(req_data, 100); + Buffer::OwnedImpl buffered_request_data; + setUpDecodingBuffering(buffered_request_data, true); + + EXPECT_EQ(FilterDataStatus::StopIterationNoBuffer, filter_->decodeData(req_data, true)); + + processRequestBody([&buffered_request_data](const HttpBody& req_body, ProcessingResponse&, + BodyResponse& body_resp) { + EXPECT_TRUE(req_body.end_of_stream()); + EXPECT_EQ(buffered_request_data.toString(), req_body.body()); + auto* body_mut = body_resp.mutable_response()->mutable_body_mutation(); + body_mut->set_clear_body(true); + }); + EXPECT_EQ(0, buffered_request_data.length()); + + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + + ASSERT_TRUE( + dynamic_metadata_.filter_metadata().contains("envoy.test.ext_proc.request_body_response")); + const auto& request_body_struct_metadata = + dynamic_metadata_.filter_metadata().at("envoy.test.ext_proc.request_body_response"); + ProtobufWkt::Struct expected_request_body; + TestUtility::loadFromJson(R"EOF( +{ + "clear_body": "1" +})EOF", + expected_request_body); + + EXPECT_TRUE(TestUtility::protoEqual(request_body_struct_metadata, expected_request_body, true)); + response_headers_.addCopy(LowerCaseString(":status"), "200"); + response_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + response_headers_.addCopy(LowerCaseString("content-length"), "100"); + + EXPECT_EQ(Filter1xxHeadersStatus::Continue, filter_->encode1xxHeaders(response_headers_)); + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->encodeHeaders(response_headers_, false)); + + Buffer::OwnedImpl resp_data; + TestUtility::feedBufferWithRandomCharacters(resp_data, 100); + Buffer::OwnedImpl buffered_response_data; + setUpEncodingBuffering(buffered_response_data, true); + EXPECT_EQ(FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(resp_data, true)); + + processResponseBody([&buffered_response_data](const HttpBody& req_body, ProcessingResponse&, + BodyResponse& body_resp) { + EXPECT_TRUE(req_body.end_of_stream()); + EXPECT_EQ(buffered_response_data.toString(), req_body.body()); + auto* body_mut = body_resp.mutable_response()->mutable_body_mutation(); + body_mut->set_body("Hello, World!"); + }); + EXPECT_EQ("Hello, World!", buffered_response_data.toString()); + + ASSERT_TRUE( + dynamic_metadata_.filter_metadata().contains("envoy.test.ext_proc.response_body_response")); + const auto& response_body_struct_metadata = + dynamic_metadata_.filter_metadata().at("envoy.test.ext_proc.response_body_response"); + ProtobufWkt::Struct expected_response_body; + TestUtility::loadFromJson(R"EOF( +{ + "body": "Hello, World!" +})EOF", + expected_response_body); + EXPECT_TRUE(TestUtility::protoEqual(response_body_struct_metadata, expected_response_body, true)); + + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + filter_->onDestroy(); + + EXPECT_EQ(1, config_->stats().streams_started_.value()); + EXPECT_EQ(2, config_->stats().stream_msgs_sent_.value()); + EXPECT_EQ(2, config_->stats().stream_msgs_received_.value()); + EXPECT_EQ(1, config_->stats().streams_closed_.value()); +} + +TEST_F(HttpFilterTest, SaveImmediateResponse) { + SaveProcessingResponseFactory factory; + Envoy::Registry::InjectFactory registration(factory); + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + on_processing_response: + name: "abc" + typed_config: + '@type': type.googleapis.com/envoy.extensions.http.ext_proc.response_processors.save_processing_response.v3.SaveProcessingResponse + save_immediate_response: + save_response: true + )EOF"); + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); + test_time_->advanceTimeWait(std::chrono::microseconds(10)); + TestResponseHeaderMapImpl immediate_response_headers; + EXPECT_CALL(encoder_callbacks_, sendLocalReply(::Envoy::Http::Code::BadRequest, "Bad request", _, + Eq(absl::nullopt), "Got_a_bad_request")) + .WillOnce(Invoke([&immediate_response_headers]( + Unused, Unused, + std::function modify_headers, Unused, + Unused) { modify_headers(immediate_response_headers); })); + std::unique_ptr resp1 = std::make_unique(); + auto* immediate_response = resp1->mutable_immediate_response(); + immediate_response->mutable_status()->set_code(envoy::type::v3::StatusCode::BadRequest); + immediate_response->set_body("Bad request"); + immediate_response->set_details("Got a bad request"); + auto* immediate_headers = immediate_response->mutable_headers(); + auto* hdr1 = immediate_headers->add_set_headers(); + hdr1->mutable_header()->set_key("content-type"); + hdr1->mutable_header()->set_raw_value("text/plain"); + auto* hdr2 = immediate_headers->add_set_headers(); + hdr2->mutable_append()->set_value(true); + hdr2->mutable_header()->set_key("x-another-thing"); + hdr2->mutable_header()->set_raw_value("1"); + auto* hdr3 = immediate_headers->add_set_headers(); + hdr3->mutable_append()->set_value(true); + hdr3->mutable_header()->set_key("x-another-thing"); + hdr3->mutable_header()->set_raw_value("2"); + stream_callbacks_->onReceiveMessage(std::move(resp1)); + + TestResponseHeaderMapImpl expected_response_headers{ + {"content-type", "text/plain"}, {"x-another-thing", "1"}, {"x-another-thing", "2"}}; + EXPECT_THAT(&immediate_response_headers, HeaderMapEqualIgnoreOrder(&expected_response_headers)); + + Buffer::OwnedImpl req_data("foo"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(req_data, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->encodeHeaders(response_headers_, true)); + Buffer::OwnedImpl resp_data("bar"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(resp_data, false)); + Buffer::OwnedImpl empty_data; + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(empty_data, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + auto filter_state = stream_info_.filterState()->getDataMutable( + SaveProcessingResponseFilterState::kFilterStateName); + ASSERT_TRUE(filter_state->response.has_value()); + envoy::service::ext_proc::v3::ProcessingResponse expected_response; + TestUtility::loadFromJson( + R"EOF( +{ + "immediateResponse": { + "status": { + "code": "BadRequest" + }, + "headers": { + "setHeaders": [{ + "header": { + "key": "content-type", + "rawValue": "dGV4dC9wbGFpbg==" + } + }, { + "header": { + "key": "x-another-thing", + "rawValue": "MQ==" + }, + "append": true + }, { + "header": { + "key": "x-another-thing", + "rawValue": "Mg==" + }, + "append": true + }] + }, + "body": "QmFkIHJlcXVlc3Q=", + "details": "Got a bad request" + } +})EOF", + expected_response); + + EXPECT_TRUE(TestUtility::protoEqual(filter_state->response.value().processing_response, + expected_response)); + + filter_state->response.reset(); + + filter_->onDestroy(); + + EXPECT_EQ(1, config_->stats().streams_started_.value()); + EXPECT_EQ(1, config_->stats().stream_msgs_sent_.value()); + EXPECT_EQ(1, config_->stats().stream_msgs_received_.value()); + EXPECT_EQ(1, config_->stats().streams_closed_.value()); + + checkGrpcCallHeaderOnlyStats(envoy::config::core::v3::TrafficDirection::INBOUND); + expectNoGrpcCall(envoy::config::core::v3::TrafficDirection::OUTBOUND); + + expectFilterState(Envoy::ProtobufWkt::Struct()); +} + +TEST_F(HttpFilterTest, SaveImmediateResponseOnError) { + SaveProcessingResponseFactory factory; + Envoy::Registry::InjectFactory registration(factory); + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + disable_immediate_response: true + on_processing_response: + name: "abc" + typed_config: + '@type': type.googleapis.com/envoy.extensions.http.ext_proc.response_processors.save_processing_response.v3.SaveProcessingResponse + save_immediate_response: + save_response: true + save_on_error: true + )EOF"); + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); + test_time_->advanceTimeWait(std::chrono::microseconds(10)); + TestResponseHeaderMapImpl immediate_response_headers; + std::unique_ptr resp1 = std::make_unique(); + auto* immediate_response = resp1->mutable_immediate_response(); + immediate_response->mutable_status()->set_code(envoy::type::v3::StatusCode::BadRequest); + immediate_response->set_body("Bad request"); + immediate_response->set_details("Got a bad request"); + auto* immediate_headers = immediate_response->mutable_headers(); + auto* hdr1 = immediate_headers->add_set_headers(); + hdr1->mutable_header()->set_key("content-type"); + hdr1->mutable_header()->set_raw_value("text/plain"); + auto* hdr2 = immediate_headers->add_set_headers(); + hdr2->mutable_append()->set_value(true); + hdr2->mutable_header()->set_key("x-another-thing"); + hdr2->mutable_header()->set_raw_value("1"); + auto* hdr3 = immediate_headers->add_set_headers(); + hdr3->mutable_append()->set_value(true); + hdr3->mutable_header()->set_key("x-another-thing"); + hdr3->mutable_header()->set_raw_value("2"); + stream_callbacks_->onReceiveMessage(std::move(resp1)); + + Buffer::OwnedImpl req_data("foo"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(req_data, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->encodeHeaders(response_headers_, true)); + Buffer::OwnedImpl resp_data("bar"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(resp_data, false)); + Buffer::OwnedImpl empty_data; + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(empty_data, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + auto filter_state = stream_info_.filterState()->getDataMutable( + SaveProcessingResponseFilterState::kFilterStateName); + ASSERT_TRUE(filter_state->response.has_value()); + envoy::service::ext_proc::v3::ProcessingResponse expected_response; + TestUtility::loadFromJson( + R"EOF( +{ + "immediateResponse": { + "status": { + "code": "BadRequest" + }, + "headers": { + "setHeaders": [{ + "header": { + "key": "content-type", + "rawValue": "dGV4dC9wbGFpbg==" + } + }, { + "header": { + "key": "x-another-thing", + "rawValue": "MQ==" + }, + "append": true + }, { + "header": { + "key": "x-another-thing", + "rawValue": "Mg==" + }, + "append": true + }] + }, + "body": "QmFkIHJlcXVlc3Q=", + "details": "Got a bad request" + } +})EOF", + expected_response); + + EXPECT_TRUE(TestUtility::protoEqual(filter_state->response.value().processing_response, + expected_response)); + + filter_state->response.reset(); + + filter_->onDestroy(); + + EXPECT_EQ(1, config_->stats().streams_started_.value()); + EXPECT_EQ(1, config_->stats().stream_msgs_sent_.value()); + EXPECT_EQ(0, config_->stats().stream_msgs_received_.value()); + EXPECT_EQ(1, config_->stats().streams_closed_.value()); + + expectNoGrpcCall(envoy::config::core::v3::TrafficDirection::OUTBOUND); + + expectFilterState(Envoy::ProtobufWkt::Struct()); +} + +TEST_F(HttpFilterTest, DontSaveImmediateResponse) { + SaveProcessingResponseFactory factory; + Envoy::Registry::InjectFactory registration(factory); + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + on_processing_response: + name: "abc" + typed_config: + '@type': type.googleapis.com/envoy.extensions.http.ext_proc.response_processors.save_processing_response.v3.SaveProcessingResponse + save_immediate_response: + save_response: false + )EOF"); + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); + test_time_->advanceTimeWait(std::chrono::microseconds(10)); + TestResponseHeaderMapImpl immediate_response_headers; + EXPECT_CALL(encoder_callbacks_, sendLocalReply(::Envoy::Http::Code::BadRequest, "Bad request", _, + Eq(absl::nullopt), "Got_a_bad_request")) + .WillOnce(Invoke([&immediate_response_headers]( + Unused, Unused, + std::function modify_headers, Unused, + Unused) { modify_headers(immediate_response_headers); })); + std::unique_ptr resp1 = std::make_unique(); + auto* immediate_response = resp1->mutable_immediate_response(); + immediate_response->mutable_status()->set_code(envoy::type::v3::StatusCode::BadRequest); + immediate_response->set_body("Bad request"); + immediate_response->set_details("Got a bad request"); + auto* immediate_headers = immediate_response->mutable_headers(); + auto* hdr1 = immediate_headers->add_set_headers(); + hdr1->mutable_header()->set_key("content-type"); + hdr1->mutable_header()->set_raw_value("text/plain"); + stream_callbacks_->onReceiveMessage(std::move(resp1)); + + TestResponseHeaderMapImpl expected_response_headers{{"content-type", "text/plain"}}; + EXPECT_THAT(&immediate_response_headers, HeaderMapEqualIgnoreOrder(&expected_response_headers)); + + Buffer::OwnedImpl req_data("foo"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(req_data, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->encodeHeaders(response_headers_, true)); + Buffer::OwnedImpl resp_data("bar"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(resp_data, false)); + Buffer::OwnedImpl empty_data; + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(empty_data, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + EXPECT_EQ(stream_info_.filterState()->getDataMutable( + SaveProcessingResponseFilterState::kFilterStateName), + nullptr); + + filter_->onDestroy(); + + EXPECT_EQ(1, config_->stats().streams_started_.value()); + EXPECT_EQ(1, config_->stats().stream_msgs_sent_.value()); + EXPECT_EQ(1, config_->stats().stream_msgs_received_.value()); + EXPECT_EQ(1, config_->stats().streams_closed_.value()); + + checkGrpcCallHeaderOnlyStats(envoy::config::core::v3::TrafficDirection::INBOUND); + expectNoGrpcCall(envoy::config::core::v3::TrafficDirection::OUTBOUND); + + expectFilterState(Envoy::ProtobufWkt::Struct()); +} + +TEST_F(HttpFilterTest, DontSaveImmediateResponseOnError) { + SaveProcessingResponseFactory factory; + Envoy::Registry::InjectFactory registration(factory); + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + disable_immediate_response: true + on_processing_response: + name: "abc" + typed_config: + '@type': type.googleapis.com/envoy.extensions.http.ext_proc.response_processors.save_processing_response.v3.SaveProcessingResponse + save_immediate_response: + save_response: true + )EOF"); + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); + test_time_->advanceTimeWait(std::chrono::microseconds(10)); + std::unique_ptr resp1 = std::make_unique(); + auto* immediate_response = resp1->mutable_immediate_response(); + immediate_response->mutable_status()->set_code(envoy::type::v3::StatusCode::BadRequest); + immediate_response->set_body("Bad request"); + immediate_response->set_details("Got a bad request"); + auto* immediate_headers = immediate_response->mutable_headers(); + auto* hdr1 = immediate_headers->add_set_headers(); + hdr1->mutable_header()->set_key("content-type"); + hdr1->mutable_header()->set_raw_value("text/plain"); + stream_callbacks_->onReceiveMessage(std::move(resp1)); + + TestResponseHeaderMapImpl expected_response_headers{{"content-type", "text/plain"}}; + + Buffer::OwnedImpl req_data("foo"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(req_data, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->encodeHeaders(response_headers_, true)); + Buffer::OwnedImpl resp_data("bar"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(resp_data, false)); + Buffer::OwnedImpl empty_data; + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(empty_data, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + EXPECT_EQ(stream_info_.filterState()->getDataMutable( + SaveProcessingResponseFilterState::kFilterStateName), + nullptr); + + filter_->onDestroy(); + + EXPECT_EQ(1, config_->stats().streams_started_.value()); + EXPECT_EQ(1, config_->stats().stream_msgs_sent_.value()); + EXPECT_EQ(0, config_->stats().stream_msgs_received_.value()); + EXPECT_EQ(1, config_->stats().streams_closed_.value()); + + expectNoGrpcCall(envoy::config::core::v3::TrafficDirection::OUTBOUND); + + expectFilterState(Envoy::ProtobufWkt::Struct()); +} + +TEST_F(HttpFilterTest, SaveResponseTrailers) { + SaveProcessingResponseFactory factory; + Envoy::Registry::InjectFactory registration(factory); + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + request_header_mode: "SEND" + response_header_mode: "SEND" + request_body_mode: "STREAMED" + response_body_mode: "STREAMED" + request_trailer_mode: "SEND" + response_trailer_mode: "SEND" + on_processing_response: + name: "abc" + typed_config: + '@type': type.googleapis.com/envoy.extensions.http.ext_proc.response_processors.save_processing_response.v3.SaveProcessingResponse + filter_state_name_suffix: "test" + save_request_trailers: + save_response: true + save_response_trailers: + save_response: true + )EOF"); + + HttpTestUtility::addDefaultHeaders(request_headers_); + request_headers_.setMethod("POST"); + EXPECT_CALL(decoder_callbacks_, decodingBuffer()).WillRepeatedly(Return(nullptr)); + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); + processRequestHeaders(false, absl::nullopt); + + const uint32_t chunk_number = 20; + sendChunkRequestData(chunk_number, true); + const std::string filter_state_name = + absl::StrCat(SaveProcessingResponseFilterState::kFilterStateName, ".test"); + EXPECT_EQ(FilterTrailersStatus::StopIteration, filter_->decodeTrailers(request_trailers_)); + processRequestTrailers( + [](const HttpTrailers&, ProcessingResponse&, TrailersResponse& trailers_resp) { + auto headers_mut = trailers_resp.mutable_header_mutation(); + auto add1 = headers_mut->add_set_headers(); + add1->mutable_header()->set_key("x-new-header1"); + add1->mutable_header()->set_raw_value("new"); + add1->mutable_append()->set_value(false); + auto add2 = headers_mut->add_set_headers(); + add2->mutable_header()->set_key("x-some-other-header1"); + add2->mutable_header()->set_raw_value("no"); + add2->mutable_append()->set_value(true); + *headers_mut->add_remove_headers() = "x-do-we-want-this"; + }, + true); + auto filter_state = stream_info_.filterState()->getDataMutable( + filter_state_name); + ASSERT_TRUE(filter_state->response.has_value()); + envoy::service::ext_proc::v3::ProcessingResponse expected_response; + TestUtility::loadFromJson( + R"EOF( + { + "requestTrailers": { + "headerMutation": { + "setHeaders": [{ + "header": { + "key": "x-new-header1", + "rawValue": "bmV3" + }, + "append": false + }, { + "header": { + "key": "x-some-other-header1", + "rawValue": "bm8=" + }, + "append": true + }], + "removeHeaders": ["x-do-we-want-this"] + } + } +})EOF", + expected_response); + EXPECT_TRUE(TestUtility::protoEqual(filter_state->response.value().processing_response, + expected_response)); + + filter_state->response.reset(); + + response_headers_.addCopy(LowerCaseString(":status"), "200"); + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + processResponseHeaders(true, absl::nullopt); + sendChunkResponseData(chunk_number * 2, true); + EXPECT_EQ(FilterTrailersStatus::StopIteration, filter_->encodeTrailers(response_trailers_)); + processResponseTrailers( + [](const HttpTrailers&, ProcessingResponse&, TrailersResponse& trailers_resp) { + auto headers_mut = trailers_resp.mutable_header_mutation(); + auto* resp_add1 = headers_mut->add_set_headers(); + resp_add1->mutable_header()->set_key("x-new-header1"); + resp_add1->mutable_header()->set_raw_value("new"); + }, + true); + processResponseTrailers(absl::nullopt, false); + envoy::service::ext_proc::v3::ProcessingResponse expected_response_trailers; + TestUtility::loadFromJson( + R"EOF( + { + "responseTrailers": { + "headerMutation": { + "setHeaders": [{ + "header": { + "key": "x-new-header1", + "rawValue": "bmV3" + }, + "append": false + }] + } + } +})EOF", + expected_response_trailers); + EXPECT_TRUE(TestUtility::protoEqual(filter_state->response.value().processing_response, + expected_response_trailers)); + filter_->onDestroy(); + + EXPECT_EQ(1, config_->stats().streams_started_.value()); + // Total gRPC messages include two headers and two trailers on top of the req/resp chunk data. + uint32_t total_msg = 3 * chunk_number + 4; + EXPECT_EQ(total_msg, config_->stats().stream_msgs_sent_.value()); + EXPECT_EQ(total_msg, config_->stats().stream_msgs_received_.value()); + EXPECT_EQ(1, config_->stats().streams_closed_.value()); + + checkGrpcCallStatsAll(envoy::config::core::v3::TrafficDirection::INBOUND, chunk_number); + checkGrpcCallStatsAll(envoy::config::core::v3::TrafficDirection::OUTBOUND, 2 * chunk_number); +} + +TEST_F(HttpFilterTest, DontSaveProcessingResponse) { + SaveProcessingResponseFactory factory; + Envoy::Registry::InjectFactory registration(factory); + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + request_header_mode: "SEND" + response_header_mode: "SEND" + request_body_mode: "STREAMED" + response_body_mode: "STREAMED" + request_trailer_mode: "SEND" + response_trailer_mode: "SEND" + on_processing_response: + name: "abc" + typed_config: + '@type': type.googleapis.com/envoy.extensions.http.ext_proc.response_processors.save_processing_response.v3.SaveProcessingResponse + )EOF"); + + HttpTestUtility::addDefaultHeaders(request_headers_); + request_headers_.setMethod("POST"); + EXPECT_CALL(decoder_callbacks_, decodingBuffer()).WillRepeatedly(Return(nullptr)); + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); + processRequestHeaders(false, absl::nullopt); + + const uint32_t chunk_number = 20; + sendChunkRequestData(chunk_number, true); + EXPECT_EQ(FilterTrailersStatus::StopIteration, filter_->decodeTrailers(request_trailers_)); + processRequestTrailers(absl::nullopt, true); + + response_headers_.addCopy(LowerCaseString(":status"), "200"); + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + processResponseHeaders(true, absl::nullopt); + sendChunkResponseData(chunk_number * 2, true); + EXPECT_EQ(FilterTrailersStatus::StopIteration, filter_->encodeTrailers(response_trailers_)); + processResponseTrailers(absl::nullopt, false); + EXPECT_EQ(stream_info_.filterState()->getDataMutable( + SaveProcessingResponseFilterState::kFilterStateName), + nullptr); + filter_->onDestroy(); + + EXPECT_EQ(1, config_->stats().streams_started_.value()); + // Total gRPC messages include two headers and two trailers on top of the req/resp chunk data. + uint32_t total_msg = 3 * chunk_number + 4; + EXPECT_EQ(total_msg, config_->stats().stream_msgs_sent_.value()); + EXPECT_EQ(total_msg, config_->stats().stream_msgs_received_.value()); + EXPECT_EQ(1, config_->stats().streams_closed_.value()); + + checkGrpcCallStatsAll(envoy::config::core::v3::TrafficDirection::INBOUND, chunk_number); + checkGrpcCallStatsAll(envoy::config::core::v3::TrafficDirection::OUTBOUND, 2 * chunk_number); +} } // namespace } // namespace ExternalProcessing } // namespace HttpFilters diff --git a/test/extensions/filters/http/ext_proc/utils.cc b/test/extensions/filters/http/ext_proc/utils.cc index 896a3515f7b4..3baaefeb516f 100644 --- a/test/extensions/filters/http/ext_proc/utils.cc +++ b/test/extensions/filters/http/ext_proc/utils.cc @@ -1,7 +1,20 @@ #include "test/extensions/filters/http/ext_proc/utils.h" +#include +#include + +#include "envoy/http/header_map.h" +#include "envoy/stream_info/stream_info.h" + +#include "source/common/common/assert.h" +#include "source/common/common/macros.h" +#include "source/common/protobuf/protobuf.h" + #include "test/test_common/utility.h" +#include "absl/container/flat_hash_set.h" +#include "absl/status/status.h" + namespace Envoy { namespace Extensions { namespace HttpFilters { @@ -33,6 +46,77 @@ envoy::config::core::v3::HeaderValue makeHeaderValue(const std::string& key, return v; } +void TestOnProcessingResponse::afterProcessingRequestHeaders( + const envoy::service::ext_proc::v3::ProcessingResponse& response, absl::Status, + Envoy::StreamInfo::StreamInfo& stream_info) { + ASSERT(response.has_request_headers()); + ASSERT(response.request_headers().has_response() && + response.request_headers().response().has_header_mutation()); + stream_info.setDynamicMetadata( + "envoy.test.ext_proc.request_headers_response", + getHeaderMutations(response.request_headers().response().header_mutation())); +} + +void TestOnProcessingResponse::afterProcessingResponseHeaders( + const envoy::service::ext_proc::v3::ProcessingResponse& response, absl::Status, + Envoy::StreamInfo::StreamInfo& stream_info) { + ASSERT(response.has_response_headers()); + ASSERT(response.response_headers().has_response() && + response.response_headers().response().has_header_mutation()); + stream_info.setDynamicMetadata( + "envoy.test.ext_proc.response_headers_response", + getHeaderMutations(response.response_headers().response().header_mutation())); +} + +void TestOnProcessingResponse::afterProcessingRequestBody( + const envoy::service::ext_proc::v3::ProcessingResponse& response, absl::Status, + Envoy::StreamInfo::StreamInfo& stream_info) { + ASSERT(response.has_request_body()); + ASSERT(response.request_body().has_response() && + response.request_body().response().has_body_mutation()); + stream_info.setDynamicMetadata( + "envoy.test.ext_proc.request_body_response", + getBodyMutation(response.request_body().response().body_mutation())); +} + +void TestOnProcessingResponse::afterProcessingResponseBody( + const envoy::service::ext_proc::v3::ProcessingResponse& response, absl::Status, + Envoy::StreamInfo::StreamInfo& stream_info) { + ASSERT(response.has_response_body()); + ASSERT(response.response_body().has_response() && + response.response_body().response().has_body_mutation()); + stream_info.setDynamicMetadata( + "envoy.test.ext_proc.response_body_response", + getBodyMutation(response.response_body().response().body_mutation())); +} + +void TestOnProcessingResponse::afterProcessingRequestTrailers( + const envoy::service::ext_proc::v3::ProcessingResponse& response, absl::Status, + Envoy::StreamInfo::StreamInfo& stream_info) { + ASSERT(response.has_request_trailers()); + ASSERT(response.request_trailers().has_header_mutation()); + stream_info.setDynamicMetadata("envoy.test.ext_proc.request_trailers_response", + getHeaderMutations(response.request_trailers().header_mutation())); +} + +void TestOnProcessingResponse::afterProcessingResponseTrailers( + const envoy::service::ext_proc::v3::ProcessingResponse& response, absl::Status, + Envoy::StreamInfo::StreamInfo& stream_info) { + ASSERT(response.has_response_trailers()); + ASSERT(response.response_trailers().has_header_mutation()); + stream_info.setDynamicMetadata( + "envoy.test.ext_proc.response_trailers_response", + getHeaderMutations(response.response_trailers().header_mutation())); +} + +void TestOnProcessingResponse::afterReceivingImmediateResponse( + const envoy::service::ext_proc::v3::ProcessingResponse& response, absl::Status, + Envoy::StreamInfo::StreamInfo& stream_info) { + ASSERT(response.has_immediate_response()); + ASSERT(response.immediate_response().has_headers()); + stream_info.setDynamicMetadata("envoy.test.ext_proc.response_immediate_response", + getHeaderMutations(response.immediate_response().headers())); +} } // namespace ExternalProcessing } // namespace HttpFilters } // namespace Extensions diff --git a/test/extensions/filters/http/ext_proc/utils.h b/test/extensions/filters/http/ext_proc/utils.h index 351d40bd747e..c3832f7348f0 100644 --- a/test/extensions/filters/http/ext_proc/utils.h +++ b/test/extensions/filters/http/ext_proc/utils.h @@ -1,7 +1,13 @@ #pragma once +#include + #include "envoy/config/core/v3/base.pb.h" #include "envoy/http/header_map.h" +#include "envoy/server/factory_context.h" +#include "envoy/stream_info/stream_info.h" + +#include "source/extensions/filters/http/ext_proc/on_processing_response.h" #include "absl/strings/str_format.h" #include "gmock/gmock.h" @@ -56,6 +62,79 @@ MATCHER_P2(SingleProtoHeaderValueIs, key, value, envoy::config::core::v3::HeaderValue makeHeaderValue(const std::string& key, const std::string& value); + +class TestOnProcessingResponse : public OnProcessingResponse { +public: + void + afterProcessingRequestHeaders(const envoy::service::ext_proc::v3::ProcessingResponse& response, + absl::Status processing_status, + Envoy::StreamInfo::StreamInfo&) override; + + void afterProcessingResponseHeaders(const envoy::service::ext_proc::v3::ProcessingResponse&, + absl::Status, Envoy::StreamInfo::StreamInfo&) override; + void afterProcessingRequestBody(const envoy::service::ext_proc::v3::ProcessingResponse&, + absl::Status, Envoy::StreamInfo::StreamInfo&) override; + void afterProcessingResponseBody(const envoy::service::ext_proc::v3::ProcessingResponse&, + absl::Status, Envoy::StreamInfo::StreamInfo&) override; + void afterProcessingRequestTrailers(const envoy::service::ext_proc::v3::ProcessingResponse&, + absl::Status, Envoy::StreamInfo::StreamInfo&) override; + void afterProcessingResponseTrailers(const envoy::service::ext_proc::v3::ProcessingResponse&, + absl::Status, Envoy::StreamInfo::StreamInfo&) override; + void + afterReceivingImmediateResponse(const envoy::service::ext_proc::v3::ProcessingResponse& response, + absl::Status processing_status, + Envoy::StreamInfo::StreamInfo&) override; + +private: + Envoy::ProtobufWkt::Struct + getHeaderMutations(const envoy::service::ext_proc::v3::HeaderMutation& header_mutation) { + Envoy::ProtobufWkt::Struct struct_metadata; + for (auto& header : header_mutation.set_headers()) { + Envoy::ProtobufWkt::Value value; + value.mutable_string_value()->assign(header.header().raw_value()); + struct_metadata.mutable_fields()->insert(std::make_pair(header.header().key(), value)); + } + for (auto& header : header_mutation.remove_headers()) { + Envoy::ProtobufWkt::Value value; + value.mutable_string_value()->assign("remove"); + struct_metadata.mutable_fields()->insert(std::make_pair(header, value)); + } + return struct_metadata; + } + Envoy::ProtobufWkt::Struct + getBodyMutation(const envoy::service::ext_proc::v3::BodyMutation& body_mutation) { + Envoy::ProtobufWkt::Struct struct_metadata; + if (body_mutation.has_body()) { + Envoy::ProtobufWkt::Value value; + value.mutable_string_value()->assign(body_mutation.body()); + struct_metadata.mutable_fields()->insert(std::make_pair("body", value)); + } else { + Envoy::ProtobufWkt::Value value; + value.mutable_string_value()->assign(absl::StrCat(body_mutation.clear_body())); + struct_metadata.mutable_fields()->insert(std::make_pair("clear_body", value)); + } + return struct_metadata; + } +}; + +class TestOnProcessingResponseFactory : public OnProcessingResponseFactory { +public: + ~TestOnProcessingResponseFactory() override = default; + std::unique_ptr + createOnProcessingResponse(const Protobuf::Message&, + Envoy::Server::Configuration::CommonFactoryContext&, + const std::string&) const override { + return std::make_unique(); + } + + ProtobufTypes::MessagePtr createEmptyConfigProto() override { + // Using Struct instead of a custom filter config proto. This is only allowed in tests. + return ProtobufTypes::MessagePtr{new Envoy::ProtobufWkt::Struct()}; + } + + std::string name() const override { return "on_processing_response"; } +}; + } // namespace ExternalProcessing } // namespace HttpFilters } // namespace Extensions diff --git a/tools/extensions/extensions_schema.yaml b/tools/extensions/extensions_schema.yaml index 7f817e263fae..1ed5df37dfb7 100644 --- a/tools/extensions/extensions_schema.yaml +++ b/tools/extensions/extensions_schema.yaml @@ -140,6 +140,7 @@ categories: - envoy.load_balancing_policies - envoy.http.early_header_mutation - envoy.http.custom_response +- envoy.http.ext_proc.response_processors - envoy.router.cluster_specifier_plugin - envoy.tap_sinks.udp_sink - envoy.tracers.opentelemetry.resource_detectors