diff --git a/changelogs/current.yaml b/changelogs/current.yaml index aa4910780d1d..3de9dcd78f16 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -163,6 +163,9 @@ bug_fixes: - area: csrf change: | Handle requests that have a "privacy sensitive" / opaque origin (``Origin: null``) as if the request had no origin information. +- area: udp_proxy + change: | + Fix a bug that cause Envoy to crash due to segmentation fault when onBelowWriteBufferLowWatermark callback is called. - area: orca change: | The previous ORCA parser will use ``:`` as the delimiter of key/value pair in the native HTTP report. This is wrong diff --git a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc index d0e941b2a347..e85cf26c86f0 100644 --- a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc +++ b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc @@ -279,12 +279,12 @@ UdpProxyFilter::createSession(Network::UdpRecvData::LocalPeerAddresses&& address UdpProxyFilter::ActiveSession* UdpProxyFilter::createSessionWithOptionalHost(Network::UdpRecvData::LocalPeerAddresses&& addresses, const Upstream::HostConstSharedPtr& host) { - ActiveSessionPtr new_session; + ActiveSessionSharedPtr new_session; if (config_->tunnelingConfig()) { ASSERT(!host); - new_session = std::make_unique(*this, std::move(addresses)); + new_session = std::make_shared(*this, std::move(addresses)); } else { - new_session = std::make_unique(*this, std::move(addresses), host); + new_session = std::make_shared(*this, std::move(addresses), host); } if (!new_session->createFilterChain()) { @@ -1109,17 +1109,29 @@ void UdpProxyFilter::TunnelingActiveSession::onAboveWriteBufferHighWatermark() { } void UdpProxyFilter::TunnelingActiveSession::onBelowWriteBufferLowWatermark() { - can_send_upstream_ = true; - flushBuffer(); + // In cases where onBelowWriteBufferLowWatermark is called during an active + // write process to the upstream, flushBuffer will cause another write operation + // on the upstream connection, which causes nghttp2 based codec to segmentation fault, + // and oghttp2 based codec to drop the new written data. To avoid this, posting the + // flush operation on the dispatcher, so it can be done after current write operation + // had finished. + filter_.read_callbacks_->udpListener().dispatcher().post( + [session = shared_from_this()]() { session->flushBuffer(); }); } void UdpProxyFilter::TunnelingActiveSession::flushBuffer() { + if (!upstream_) { + return; + } + while (!datagrams_buffer_.empty()) { BufferedDatagramPtr buffered_datagram = std::move(datagrams_buffer_.front()); datagrams_buffer_.pop(); buffered_bytes_ -= buffered_datagram->buffer_->length(); upstream_->encodeData(*buffered_datagram->buffer_); } + + can_send_upstream_ = true; } void UdpProxyFilter::TunnelingActiveSession::maybeBufferDatagram(Network::UdpRecvData& data) { diff --git a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h index b4e46ca5cd74..fcb7bb14910d 100644 --- a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h +++ b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h @@ -673,7 +673,7 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, bool on_session_complete_called_{false}; }; - using ActiveSessionPtr = std::unique_ptr; + using ActiveSessionSharedPtr = std::shared_ptr; class UdpActiveSession : public Network::UdpPacketProcessor, public ActiveSession { public: @@ -732,7 +732,8 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, */ class TunnelingActiveSession : public ActiveSession, public UpstreamTunnelCallbacks, - public HttpStreamCallbacks { + public HttpStreamCallbacks, + public std::enable_shared_from_this { public: TunnelingActiveSession(UdpProxyFilter& filter, Network::UdpRecvData::LocalPeerAddresses&& addresses); @@ -810,7 +811,9 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, LocalPeerHostAddresses key{value->addresses(), value->host()}; return this->operator()(key); } - size_t operator()(const ActiveSessionPtr& value) const { return this->operator()(value.get()); } + size_t operator()(const ActiveSessionSharedPtr& value) const { + return this->operator()(value.get()); + } private: const bool consider_host_; @@ -822,19 +825,19 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, HeterogeneousActiveSessionEqual(const bool consider_host) : consider_host_(consider_host) {} - bool operator()(const ActiveSessionPtr& lhs, + bool operator()(const ActiveSessionSharedPtr& lhs, const Network::UdpRecvData::LocalPeerAddresses& rhs) const { return lhs->addresses() == rhs; } - bool operator()(const ActiveSessionPtr& lhs, const LocalPeerHostAddresses& rhs) const { + bool operator()(const ActiveSessionSharedPtr& lhs, const LocalPeerHostAddresses& rhs) const { return this->operator()(lhs, rhs.local_peer_addresses_) && (consider_host_ ? &lhs->host().value().get() == &rhs.host_.value().get() : true); } - bool operator()(const ActiveSessionPtr& lhs, const ActiveSession* rhs) const { + bool operator()(const ActiveSessionSharedPtr& lhs, const ActiveSession* rhs) const { LocalPeerHostAddresses key{rhs->addresses(), rhs->host()}; return this->operator()(lhs, key); } - bool operator()(const ActiveSessionPtr& lhs, const ActiveSessionPtr& rhs) const { + bool operator()(const ActiveSessionSharedPtr& lhs, const ActiveSessionSharedPtr& rhs) const { return this->operator()(lhs, rhs.get()); } @@ -878,8 +881,9 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, }; using ClusterInfoPtr = std::unique_ptr; - using SessionStorageType = absl::flat_hash_set; + using SessionStorageType = + absl::flat_hash_set; const UdpProxyFilterConfigSharedPtr config_; SessionStorageType sessions_; diff --git a/test/extensions/filters/udp/udp_proxy/mocks.cc b/test/extensions/filters/udp/udp_proxy/mocks.cc index 7c654662a83e..61668b63a571 100644 --- a/test/extensions/filters/udp/udp_proxy/mocks.cc +++ b/test/extensions/filters/udp/udp_proxy/mocks.cc @@ -38,6 +38,7 @@ MockUdpTunnelingConfig::~MockUdpTunnelingConfig() = default; MockTunnelCreationCallbacks::~MockTunnelCreationCallbacks() = default; MockUpstreamTunnelCallbacks::~MockUpstreamTunnelCallbacks() = default; +MockHttpUpstream::~MockHttpUpstream() = default; MockHttpStreamCallbacks::~MockHttpStreamCallbacks() = default; } // namespace SessionFilters diff --git a/test/extensions/filters/udp/udp_proxy/mocks.h b/test/extensions/filters/udp/udp_proxy/mocks.h index fc9135e99b90..0a68c166faf7 100644 --- a/test/extensions/filters/udp/udp_proxy/mocks.h +++ b/test/extensions/filters/udp/udp_proxy/mocks.h @@ -92,6 +92,14 @@ class MockUpstreamTunnelCallbacks : public UpstreamTunnelCallbacks { MOCK_METHOD(void, onUpstreamData, (Buffer::Instance & data, bool end_stream)); }; +class MockHttpUpstream : public HttpUpstream { +public: + ~MockHttpUpstream() override; + + MOCK_METHOD(void, encodeData, (Buffer::Instance & data)); + MOCK_METHOD(void, onDownstreamEvent, (Network::ConnectionEvent event)); +}; + class MockHttpStreamCallbacks : public HttpStreamCallbacks { public: ~MockHttpStreamCallbacks() override; diff --git a/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc b/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc index 219dbe974929..f40f4fc9b046 100644 --- a/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc +++ b/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc @@ -43,6 +43,7 @@ using testing::Return; using testing::ReturnNew; using testing::ReturnRef; using testing::SaveArg; +using testing::Throw; namespace Envoy { namespace Extensions { @@ -54,6 +55,19 @@ class TestUdpProxyFilter : public virtual UdpProxyFilter { public: using UdpProxyFilter::UdpProxyFilter; + std::shared_ptr createTunnelingSession() { + Network::UdpRecvData::LocalPeerAddresses addresses; + addresses.peer_ = Network::Utility::parseInternetAddressAndPortNoThrow("10.0.0.1:1000"); + addresses.local_ = Network::Utility::parseInternetAddressAndPortNoThrow("10.0.0.2:80"); + + std::shared_ptr tunneling_session = + std::make_shared(*this, std::move(addresses)); + sessions_.emplace(tunneling_session); + config_->stats().downstream_sess_active_.inc(); + + return tunneling_session; + } + TestUdpProxyFilter(Network::UdpReadFilterCallbacks& callbacks, const UdpProxyFilterConfigSharedPtr& config) : UdpProxyFilter(callbacks, config) {} @@ -1811,6 +1825,148 @@ stat_prefix: foo EXPECT_EQ(output_.back(), "1"); } +TEST_F(UdpProxyFilterTest, TunnelingSessionHighWatermarkDoesNotThrow) { + Event::MockTimer* idle_timer = new Event::MockTimer(&callbacks_.udp_listener_.dispatcher_); + EXPECT_CALL(*idle_timer, enableTimer(_, _)).Times(0); + + setup(readConfig(R"EOF( +stat_prefix: foo +matcher: + on_no_match: + action: + name: route + typed_config: + '@type': type.googleapis.com/envoy.extensions.filters.udp.udp_proxy.v3.Route + cluster: fake_cluster +tunneling_config: + proxy_host: host.com + target_host: host.com + default_target_port: 30 + buffer_options: + max_buffered_datagrams: 1000 + max_buffered_bytes: 10000 + )EOF"), + true); + + auto session = filter_->createTunnelingSession(); + EXPECT_NO_THROW(session->onAboveWriteBufferHighWatermark()); + session->onSessionComplete(); +} + +TEST_F(UdpProxyFilterTest, TunnelingSessionUpstreamClosedDuringFlush) { + Event::MockTimer* idle_timer = new Event::MockTimer(&callbacks_.udp_listener_.dispatcher_); + EXPECT_CALL(*idle_timer, enableTimer(_, _)).Times(0); + + setup(readConfig(R"EOF( +stat_prefix: foo +matcher: + on_no_match: + action: + name: route + typed_config: + '@type': type.googleapis.com/envoy.extensions.filters.udp.udp_proxy.v3.Route + cluster: fake_cluster +tunneling_config: + proxy_host: host.com + target_host: host.com + default_target_port: 30 + buffer_options: + max_buffered_datagrams: 1000 + max_buffered_bytes: 10000 + )EOF"), + true); + + NiceMock stream_info; + stream_info.downstream_connection_info_provider_->setConnectionID(0); + Upstream::HostDescriptionConstSharedPtr upstream_host; + Network::ConnectionInfoSetterImpl address_provider(nullptr, nullptr); + + auto session = filter_->createTunnelingSession(); + + Network::UdpRecvData data1; + data1.buffer_ = std::make_unique("initial buffered data"); + session->writeUpstream(data1); + + Event::PostCb resume_post_cb; + EXPECT_CALL(callbacks_.udp_listener_.dispatcher_, post(_)).WillOnce([&](Event::PostCb cb) { + session->onUpstreamEvent(Network::ConnectionEvent::RemoteClose); + resume_post_cb = std::move(cb); + }); + + auto* upstream = new NiceMock(); + EXPECT_CALL(*upstream, encodeData(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void { + session->onBelowWriteBufferLowWatermark(); + })); + + session->onNewSession(); + session->onStreamReady(&stream_info, std::unique_ptr{upstream}, upstream_host, + address_provider, nullptr); +} + +TEST_F(UdpProxyFilterTest, TunnelingSessionFlushBufferCauseLowWatermark) { + Event::MockTimer* idle_timer = new Event::MockTimer(&callbacks_.udp_listener_.dispatcher_); + EXPECT_CALL(*idle_timer, enableTimer(_, _)).Times(0); + + setup(readConfig(R"EOF( +stat_prefix: foo +matcher: + on_no_match: + action: + name: route + typed_config: + '@type': type.googleapis.com/envoy.extensions.filters.udp.udp_proxy.v3.Route + cluster: fake_cluster +tunneling_config: + proxy_host: host.com + target_host: host.com + default_target_port: 30 + buffer_options: + max_buffered_datagrams: 1000 + max_buffered_bytes: 10000 + )EOF"), + true); + + NiceMock stream_info; + stream_info.downstream_connection_info_provider_->setConnectionID(0); + Upstream::HostDescriptionConstSharedPtr upstream_host; + Network::ConnectionInfoSetterImpl address_provider(nullptr, nullptr); + + auto session = filter_->createTunnelingSession(); + + // Since stream is not ready, two datagrams will be buffered. + Network::UdpRecvData data1; + data1.buffer_ = std::make_unique("initial buffered data"); + session->writeUpstream(data1); + Network::UdpRecvData data2; + data2.buffer_ = std::make_unique("initial buffered data"); + session->writeUpstream(data2); + + bool writing = false; + Event::PostCb resume_post_cb; + EXPECT_CALL(callbacks_.udp_listener_.dispatcher_, post(_)).WillOnce([&](Event::PostCb cb) { + writing = false; + resume_post_cb = std::move(cb); + }); + + auto* upstream = new NiceMock(); + EXPECT_CALL(*upstream, encodeData(_)) + .WillOnce(Invoke([&](Buffer::Instance&) -> void { + writing = true; + session->onBelowWriteBufferLowWatermark(); + })) + .WillOnce(Invoke([&](Buffer::Instance&) -> void { + if (writing) { + throw EnvoyException("write upstream operation while already during previous write"); + } + + resume_post_cb(); + })); + + session->onNewSession(); + session->onStreamReady(&stream_info, std::unique_ptr{upstream}, upstream_host, + address_provider, nullptr); +} + using MockUdpTunnelingConfig = SessionFilters::MockUdpTunnelingConfig; using MockUpstreamTunnelCallbacks = SessionFilters::MockUpstreamTunnelCallbacks; using MockTunnelCreationCallbacks = SessionFilters::MockTunnelCreationCallbacks;