tcp_proxy: Add filter state receive_before_connect to tcp_proxy (#38189)
Signed-off-by: Akshita Agarwal <akshita.agarwal@airbnb.com>
Co-authored-by: Jarno Rajahalme <jarno@isovalent.com>
akshita31 and jrajahalme authored Feb 20, 2025
1 parent 6317db1 commit bcaa95f
Showing 11 changed files with 392 additions and 21 deletions.
6 changes: 5 additions & 1 deletion changelogs/current.yaml
@@ -184,5 +184,9 @@ new_features:
change: |
Made the :ref:`credential injector filter <envoy_v3_api_msg_extensions.filters.http.credential_injector.v3.CredentialInjector>`
work as an upstream filter.
- area: tcp_proxy
change: |
Added :ref:`an option <config_network_filters_tcp_proxy_receive_before_connect>` to allow filters to read from the
downstream connection before TCP proxy has opened the upstream connection, by setting a filter state object for the key
``envoy.tcp_proxy.receive_before_connect``.
deprecated:
Original file line number Diff line number Diff line change
@@ -37,6 +37,19 @@ To define metadata that a suitable upstream host must match, use one of the foll
In addition, dynamic metadata can be set by earlier network filters on the ``StreamInfo``. Setting the dynamic metadata
must happen before ``onNewConnection()`` is called on the ``TcpProxy`` filter to affect load balancing.

.. _config_network_filters_tcp_proxy_receive_before_connect:

Early reception and delayed upstream connection establishment
-------------------------------------------------------------

The ``TcpProxy`` filter normally disables reading on the downstream connection until the upstream connection has been established. In some situations, earlier filters in the filter chain may need to read data from the downstream connection before allowing the upstream connection to be established (see https://github.com/envoyproxy/envoy/issues/9023 for an example).
This can be done by setting the ``StreamInfo`` filter state object for the key ``envoy.tcp_proxy.receive_before_connect`` to ``true``. This filter state must be set in the ``initializeReadFilterCallbacks()`` callback of the network filter so that it is in place before the ``TcpProxy`` filter is initialized.

When the ``envoy.tcp_proxy.receive_before_connect`` filter state is set, the ``TcpProxy`` filter may receive data before the upstream connection has been established.
In that case, the ``TcpProxy`` filter buffers the data and flushes it to the upstream once the upstream connection is established.
Filters can also delay the upstream connection setup by returning ``StopIteration`` from their ``onNewConnection`` and ``onData`` callbacks.
On receiving early data, the ``TcpProxy`` filter read-disables the downstream connection until the upstream connection is established. This protects the early data buffer from overflowing.
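
The interaction described above can be sketched as a small stand-alone model. This is not Envoy code: ``FilterStatus``, ``DownstreamModel``, ``GatingFilterModel``, ``TcpProxyModel`` and ``deliver`` are hypothetical names invented for illustration, and plain strings stand in for Envoy buffers. The sketch only mirrors the behavior stated in this section, assuming an earlier filter that holds the chain until it has seen a fixed number of bytes.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Stand-in for Envoy's Network::FilterStatus; not the real type.
enum class FilterStatus { Continue, StopIteration };

// Hypothetical model of the downstream connection's read-disable counter.
struct DownstreamModel {
  int read_disable_count = 0;
  void readDisable(bool disable) {
    read_disable_count += disable ? 1 : -1;
    assert(read_disable_count >= 0);
  }
  bool readEnabled() const { return read_disable_count == 0; }
};

// Hypothetical earlier filter: holds the chain until `needed` bytes are buffered.
class GatingFilterModel {
public:
  explicit GatingFilterModel(std::size_t needed) : needed_(needed) {}
  FilterStatus onData(const std::string& buffered) const {
    return buffered.size() >= needed_ ? FilterStatus::Continue : FilterStatus::StopIteration;
  }

private:
  std::size_t needed_;
};

// Hypothetical model of the TcpProxy early-data path described in this section.
class TcpProxyModel {
public:
  TcpProxyModel(DownstreamModel& downstream, bool receive_before_connect)
      : downstream_(downstream), receive_before_connect_(receive_before_connect) {
    // Without the filter state, reads stay disabled until the upstream connects.
    if (!receive_before_connect_) {
      downstream_.readDisable(true);
    }
  }

  FilterStatus onData(std::string& data, bool end_stream) {
    if (upstream_connected_) {
      upstream_received_ += data;
    } else if (receive_before_connect_) {
      // Buffer early data and pause reads to protect the buffer.
      early_data_ += data;
      early_end_stream_ = early_end_stream_ || end_stream;
      downstream_.readDisable(true);
      ++early_data_count_;
    }
    data.clear();
    return FilterStatus::StopIteration;
  }

  void onUpstreamConnection() {
    upstream_connected_ = true;
    if (!early_data_.empty()) {
      // Flush the early data, then resume downstream reads.
      upstream_received_ += early_data_;
      early_data_.clear();
      downstream_.readDisable(false);
    } else if (!receive_before_connect_) {
      downstream_.readDisable(false);
    }
  }

  const std::string& upstreamReceived() const { return upstream_received_; }
  int earlyDataCount() const { return early_data_count_; }
  bool sawEndStream() const { return early_end_stream_; }

private:
  DownstreamModel& downstream_;
  bool receive_before_connect_;
  bool upstream_connected_ = false;
  bool early_end_stream_ = false;
  int early_data_count_ = 0;
  std::string early_data_;
  std::string upstream_received_;
};

// Runs the two-filter chain over the accumulated downstream bytes; the proxy
// only sees the data once the gating filter returns Continue.
inline void deliver(const GatingFilterModel& gate, TcpProxyModel& proxy, std::string& buffer,
                    bool end_stream) {
  if (gate.onData(buffer) == FilterStatus::Continue) {
    proxy.onData(buffer, end_stream);
  }
}
```

In this model, a gate constructed with ``GatingFilterModel(5)`` keeps a two-byte read away from the proxy; once five bytes have accumulated, the proxy buffers them, pauses downstream reads, and forwards them when ``onUpstreamConnection()`` runs.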

.. _config_network_filters_tcp_proxy_tunneling_over_http:

Tunneling TCP over HTTP
Expand Down Expand Up @@ -72,6 +85,7 @@ The downstream statistics are rooted at *tcp.<stat_prefix>.* with the following
downstream_cx_rx_bytes_buffered, Gauge, Total bytes currently buffered from the downstream connection
downstream_flow_control_paused_reading_total, Counter, Total number of times flow control paused reading from downstream
downstream_flow_control_resumed_reading_total, Counter, Total number of times flow control resumed reading from downstream
early_data_received_count_total, Counter, Total number of connections where tcp proxy received data before upstream connection establishment is complete
idle_timeout, Counter, Total number of connections closed due to idle timeout
max_downstream_connection_duration, Counter, Total number of connections closed due to max_downstream_connection_duration timeout
on_demand_cluster_attempt, Counter, Total number of connections that requested on demand cluster
1 change: 1 addition & 0 deletions source/common/tcp_proxy/BUILD
@@ -58,6 +58,7 @@ envoy_cc_library(
"//envoy/stats:stats_interface",
"//envoy/stats:stats_macros",
"//envoy/stats:timespan_interface",
"//envoy/stream_info:bool_accessor_interface",
"//envoy/stream_info:filter_state_interface",
"//envoy/tcp:conn_pool_interface",
"//envoy/tcp:upstream_interface",
74 changes: 65 additions & 9 deletions source/common/tcp_proxy/tcp_proxy.cc
@@ -12,6 +12,7 @@
#include "envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.pb.validate.h"
#include "envoy/registry/registry.h"
#include "envoy/stats/scope.h"
#include "envoy/stream_info/bool_accessor.h"
#include "envoy/upstream/cluster_manager.h"
#include "envoy/upstream/upstream.h"

@@ -300,10 +301,25 @@ void Filter::initialize(Network::ReadFilterCallbacks& callbacks, bool set_connec
ASSERT(getStreamInfo().getDownstreamBytesMeter() == nullptr);
ASSERT(getStreamInfo().getUpstreamBytesMeter() != nullptr);

// Need to disable reads so that we don't write to an upstream that might fail
// in onData(). This will get re-enabled when the upstream connection is
// established.
read_callbacks_->connection().readDisable(true);
const StreamInfo::BoolAccessor* receive_before_connect =
read_callbacks_->connection()
.streamInfo()
.filterState()
->getDataReadOnly<StreamInfo::BoolAccessor>(ReceiveBeforeConnectKey);

// If receive_before_connect is set, we will not read disable the downstream connection
// as a filter before TCP_PROXY has set this state so that it can process data before
// the upstream connection is established.
if (receive_before_connect != nullptr && receive_before_connect->value()) {
ENVOY_CONN_LOG(debug, "receive_before_connect is enabled", read_callbacks_->connection());
receive_before_connect_ = true;
} else {
// Need to disable reads so that we don't write to an upstream that might fail
// in onData(). This will get re-enabled when the upstream connection is
// established.
read_callbacks_->connection().readDisable(true);
}

getStreamInfo().setDownstreamBytesMeter(std::make_shared<StreamInfo::BytesMeter>());
getStreamInfo().setUpstreamInfo(std::make_shared<StreamInfo::UpstreamInfoImpl>());

@@ -545,8 +561,12 @@ Network::FilterStatus Filter::establishUpstreamConnection() {
// cluster->trafficStats()->upstream_cx_none_healthy in the latter case.
getStreamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::NoHealthyUpstream);
onInitFailure(UpstreamFailureReason::NoHealthyUpstream);
return Network::FilterStatus::StopIteration;
}
return Network::FilterStatus::StopIteration;
// If receive before connect is set, allow the FilterChain iteration to
// continue so that the other filters in the filter chain can process the data.
return receive_before_connect_ ? Network::FilterStatus::Continue
: Network::FilterStatus::StopIteration;
}

void Filter::onClusterDiscoveryCompletion(Upstream::ClusterDiscoveryStatus cluster_status) {
@@ -795,12 +815,32 @@ Network::FilterStatus Filter::onData(Buffer::Instance& data, bool end_stream) {
if (upstream_) {
getStreamInfo().getUpstreamBytesMeter()->addWireBytesSent(data.length());
upstream_->encodeData(data, end_stream);
resetIdleTimer(); // TODO(ggreenway) PERF: do we need to reset timer on both send and receive?
} else if (receive_before_connect_) {
ENVOY_CONN_LOG(trace, "Early data received. Length: {}", read_callbacks_->connection(),
data.length());

// Buffer data received before upstream connection exists.
early_data_buffer_.move(data);

// TCP_PROXY cannot correctly make a decision on the amount of data
// the preceding filters need to read before the upstream connection is established.
// Hence, to protect the early data buffer, TCP_PROXY read disables the downstream on
// receiving the first chunk of data. The filter setting the receive_before_connect state should
// have a limit on the amount of data it needs to read before the upstream connection is
// established and pause the filter chain (by returning `StopIteration`) till it has read the
// data it needs or a max limit has been reached.
read_callbacks_->connection().readDisable(true);

config_->stats().early_data_received_count_total_.inc();
if (!early_data_end_stream_) {
early_data_end_stream_ = end_stream;
}
}
// The upstream should consume all of the data.
// Before there is an upstream the connection should be readDisabled. If the upstream is
// destroyed, there should be no further reads as well.
ASSERT(0 == data.length());
resetIdleTimer(); // TODO(ggreenway) PERF: do we need to reset timer on both send and receive?
return Network::FilterStatus::StopIteration;
}

@@ -947,9 +987,25 @@ void Filter::onConnectMaxAttempts() {

void Filter::onUpstreamConnection() {
connecting_ = false;
// Re-enable downstream reads now that the upstream connection is established
// so we have a place to send downstream data to.
read_callbacks_->connection().readDisable(false);

// If we have received any data before upstream connection is established, send it to
// the upstream connection.
if (early_data_buffer_.length() > 0) {
// Early data should only happen when receive_before_connect is enabled.
ASSERT(receive_before_connect_);

ENVOY_CONN_LOG(trace, "TCP:onUpstreamEvent() Flushing early data buffer to upstream",
read_callbacks_->connection());
getStreamInfo().getUpstreamBytesMeter()->addWireBytesSent(early_data_buffer_.length());
upstream_->encodeData(early_data_buffer_, early_data_end_stream_);
ASSERT(0 == early_data_buffer_.length());

// Re-enable downstream reads now that the early data buffer is flushed.
read_callbacks_->connection().readDisable(false);
} else if (!receive_before_connect_) {
// Re-enable downstream reads now that the upstream connection is established
read_callbacks_->connection().readDisable(false);
}

read_callbacks_->upstreamHost()->outlierDetector().putResult(
Upstream::Outlier::Result::LocalOriginConnectSuccessFinal);
19 changes: 19 additions & 0 deletions source/common/tcp_proxy/tcp_proxy.h
@@ -23,6 +23,7 @@
#include "envoy/upstream/cluster_manager.h"
#include "envoy/upstream/upstream.h"

#include "source/common/buffer/buffer_impl.h"
#include "source/common/common/assert.h"
#include "source/common/common/logger.h"
#include "source/common/formatter/substitution_format_string.h"
@@ -43,6 +44,15 @@ namespace TcpProxy {

constexpr absl::string_view PerConnectionIdleTimeoutMs =
"envoy.tcp_proxy.per_connection_idle_timeout_ms";
/**
* ReceiveBeforeConnectKey is the key for the receive_before_connect filter state. The
* filter state value is a ``StreamInfo::BoolAccessor`` indicating whether the
* receive_before_connect functionality should be enabled. Network filters setting this filter
* state should return `StopIteration` in their `onNewConnection` and `onData` methods until they
* have read the data they need before the upstream connection establishment, and only then allow
* the filter chain to proceed to the TCP_PROXY filter.
*/
constexpr absl::string_view ReceiveBeforeConnectKey = "envoy.tcp_proxy.receive_before_connect";

/**
* All tcp proxy stats. @see stats_macros.h
@@ -54,6 +64,7 @@ constexpr absl::string_view PerConnectionIdleTimeoutMs =
COUNTER(downstream_cx_tx_bytes_total) \
COUNTER(downstream_flow_control_paused_reading_total) \
COUNTER(downstream_flow_control_resumed_reading_total) \
COUNTER(early_data_received_count_total) \
COUNTER(idle_timeout) \
COUNTER(max_downstream_connection_duration) \
COUNTER(upstream_flush_total) \
@@ -665,6 +676,14 @@ class Filter : public Network::ReadFilter,
uint32_t connect_attempts_{};
bool connecting_{};
bool downstream_closed_{};
// Stores the ReceiveBeforeConnect filter state value which can be set by preceding
// filters in the filter chain. When the filter state is set, TCP_PROXY doesn't disable
// downstream read during initialization. This feature can hence be used by preceding filters
// in the filter chain to read data from the downstream connection (for eg: to parse SNI) before
// the upstream connection is established.
bool receive_before_connect_{false};
bool early_data_end_stream_{false};
Buffer::OwnedImpl early_data_buffer_{};
HttpStreamDecoderFilterCallbacks upstream_decoder_filter_callbacks_;
};

1 change: 1 addition & 0 deletions test/common/tcp_proxy/BUILD
@@ -24,6 +24,7 @@ envoy_cc_test_library(
"//source/common/network:upstream_server_name_lib",
"//source/common/network:upstream_socket_options_filter_state_lib",
"//source/common/stats:stats_lib",
"//source/common/stream_info:bool_accessor_lib",
"//source/common/tcp_proxy",
"//source/common/upstream:upstream_includes",
"//source/common/upstream:upstream_lib",
99 changes: 95 additions & 4 deletions test/common/tcp_proxy/tcp_proxy_test.cc
@@ -21,6 +21,7 @@
#include "source/common/network/upstream_socket_options_filter_state.h"
#include "source/common/network/win32_redirect_records_option_impl.h"
#include "source/common/router/metadatamatchcriteria_impl.h"
#include "source/common/stream_info/bool_accessor_impl.h"
#include "source/common/stream_info/uint64_accessor_impl.h"
#include "source/common/tcp_proxy/tcp_proxy.h"
#include "source/common/upstream/upstream_impl.h"
@@ -71,7 +72,7 @@ class TcpProxyTest : public TcpProxyTestBase {
}));
}
using TcpProxyTestBase::setup;
void setup(uint32_t connections, bool set_redirect_records,
void setup(uint32_t connections, bool set_redirect_records, bool receive_before_connect,
const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy& config) override {
if (config.has_on_demand()) {
EXPECT_CALL(factory_context_.server_factory_context_.cluster_manager_,
@@ -142,17 +143,31 @@ class TcpProxyTest : public TcpProxyTestBase {
->addOption(
Network::SocketOptionFactory::buildWFPRedirectRecordsOptions(*redirect_records));
}

filter_callbacks_.connection().streamInfo().filterState()->setData(
TcpProxy::ReceiveBeforeConnectKey,
std::make_unique<StreamInfo::BoolAccessorImpl>(receive_before_connect),
StreamInfo::FilterState::StateType::ReadOnly,
StreamInfo::FilterState::LifeSpan::Connection);

filter_ = std::make_unique<Filter>(config_,
factory_context_.server_factory_context_.cluster_manager_);
EXPECT_CALL(filter_callbacks_.connection_, enableHalfClose(true));
EXPECT_CALL(filter_callbacks_.connection_, readDisable(true));

if (!receive_before_connect) {
EXPECT_CALL(filter_callbacks_.connection_, readDisable(true));
}

filter_->initializeReadFilterCallbacks(filter_callbacks_);
filter_callbacks_.connection_.stream_info_.downstream_connection_info_provider_
->setSslConnection(filter_callbacks_.connection_.ssl());
}

if (connections > 0) {
EXPECT_EQ(Network::FilterStatus::StopIteration, filter_->onNewConnection());
auto expected_status_on_new_connection = receive_before_connect
? Network::FilterStatus::Continue
: Network::FilterStatus::StopIteration;
EXPECT_EQ(expected_status_on_new_connection, filter_->onNewConnection());
EXPECT_EQ(absl::optional<uint64_t>(), filter_->computeHashKey());
EXPECT_EQ(&filter_callbacks_.connection_, filter_->downstreamConnection());
EXPECT_EQ(nullptr, filter_->metadataMatchCriteria());
@@ -735,6 +750,82 @@ TEST_P(TcpProxyTest, UpstreamDisconnectDownstreamFlowControl) {
filter_callbacks_.connection_.runLowWatermarkCallbacks();
}

TEST_P(TcpProxyTest, ReceiveBeforeConnectBuffersOnEarlyData) {
setup(/*connections=*/1, /*set_redirect_records=*/false, /*receive_before_connect=*/true);
std::string early_data("early data");
Buffer::OwnedImpl early_data_buffer(early_data);

// Check that the early data is buffered and flushed to upstream when connection is established.
// Also check that downstream connection is read disabled.
EXPECT_CALL(*upstream_connections_.at(0), write(_, _)).Times(0);
EXPECT_CALL(filter_callbacks_.connection_, readDisable(true));
filter_->onData(early_data_buffer, /*end_stream=*/false);

// Now when upstream connection is established, early buffer will be sent.
EXPECT_CALL(*upstream_connections_.at(0), write(BufferStringEqual(early_data), false));
raiseEventUpstreamConnected(/*conn_index=*/0);

// Any further communications between client and server can resume normally.
Buffer::OwnedImpl buffer("hello");
EXPECT_CALL(*upstream_connections_.at(0), write(BufferEqual(&buffer), _));
filter_->onData(buffer, false);

Buffer::OwnedImpl response("world");
EXPECT_CALL(filter_callbacks_.connection_, write(BufferEqual(&response), _));
upstream_callbacks_->onUpstreamData(response, false);
}

TEST_P(TcpProxyTest, ReceiveBeforeConnectEarlyDataWithEndStream) {
setup(/*connections=*/1, /*set_redirect_records=*/false, /*receive_before_connect=*/true);
std::string early_data("early data");
Buffer::OwnedImpl early_data_buffer(early_data);

// Early data is sent and downstream connection has indicated end of stream.
EXPECT_CALL(*upstream_connections_.at(0), write(_, _)).Times(0);
EXPECT_CALL(filter_callbacks_.connection_, readDisable(true));
filter_->onData(early_data_buffer, /*end_stream=*/true);

// Now when upstream connection is established, early buffer will be sent.
EXPECT_CALL(*upstream_connections_.at(0),
write(BufferStringEqual(early_data), /*end_stream*/ true));
raiseEventUpstreamConnected(/*conn_index=*/0);

// Any further communications between client and server can resume normally.
Buffer::OwnedImpl response("hello");
EXPECT_CALL(filter_callbacks_.connection_, write(BufferEqual(&response), _));
upstream_callbacks_->onUpstreamData(response, false);
}

TEST_P(TcpProxyTest, ReceiveBeforeConnectNoEarlyData) {
setup(1, /*set_redirect_records=*/false, /*receive_before_connect=*/true);
raiseEventUpstreamConnected(/*conn_index=*/0, /*expect_read_enable=*/false);

// Any data sent after upstream connection is established is flushed directly to upstream,
// and downstream connection is not read disabled.
Buffer::OwnedImpl buffer("hello");
EXPECT_CALL(filter_callbacks_.connection_, readDisable(_)).Times(0);
EXPECT_CALL(*upstream_connections_.at(0), write(BufferEqual(&buffer), _));
filter_->onData(buffer, /*end_stream=*/false);

Buffer::OwnedImpl response("world");
EXPECT_CALL(filter_callbacks_.connection_, write(BufferEqual(&response), _));
upstream_callbacks_->onUpstreamData(response, false);
}

TEST_P(TcpProxyTest, ReceiveBeforeConnectSetToFalse) {
setup(1, /*set_redirect_records=*/false, /*receive_before_connect=*/false);
raiseEventUpstreamConnected(/*conn_index=*/0, /*expect_read_enable=*/true);

// Any further communications between client and server can resume normally.
Buffer::OwnedImpl buffer("hello");
EXPECT_CALL(*upstream_connections_.at(0), write(BufferEqual(&buffer), _));
filter_->onData(buffer, false);

Buffer::OwnedImpl response("world");
EXPECT_CALL(filter_callbacks_.connection_, write(BufferEqual(&response), _));
upstream_callbacks_->onUpstreamData(response, false);
}

TEST_P(TcpProxyTest, DownstreamDisconnectRemote) {
setup(1);

@@ -1642,7 +1733,7 @@ TEST_P(TcpProxyTest, UpstreamSocketOptionsReturnedEmpty) {
}

TEST_P(TcpProxyTest, TcpProxySetRedirectRecordsToUpstream) {
setup(1, true);
setup(/*connections=*/1, /*set_redirect_records=*/true, /*receive_before_connect=*/false);
EXPECT_TRUE(filter_->upstreamSocketOptions());
auto iterator = std::find_if(
filter_->upstreamSocketOptions()->begin(), filter_->upstreamSocketOptions()->end(),