Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

udp_proxy: fix crash during buffer watermarks callbacks #37689

Merged
merged 6 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ bug_fixes:
- area: csrf
change: |
Handle requests that have a "privacy sensitive" / opaque origin (``Origin: null``) as if the request had no origin information.
- area: udp_proxy
change: |
Fix a bug that cause Envoy to crash due to segmentation fault when onBelowWriteBufferLowWatermark callback is called.
- area: orca
change: |
The previous ORCA parser will use ``:`` as the delimiter of key/value pair in the native HTTP report. This is wrong
Expand Down
22 changes: 17 additions & 5 deletions source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,12 @@ UdpProxyFilter::createSession(Network::UdpRecvData::LocalPeerAddresses&& address
UdpProxyFilter::ActiveSession*
UdpProxyFilter::createSessionWithOptionalHost(Network::UdpRecvData::LocalPeerAddresses&& addresses,
const Upstream::HostConstSharedPtr& host) {
ActiveSessionPtr new_session;
ActiveSessionSharedPtr new_session;
if (config_->tunnelingConfig()) {
ASSERT(!host);
new_session = std::make_unique<TunnelingActiveSession>(*this, std::move(addresses));
new_session = std::make_shared<TunnelingActiveSession>(*this, std::move(addresses));
} else {
new_session = std::make_unique<UdpActiveSession>(*this, std::move(addresses), host);
new_session = std::make_shared<UdpActiveSession>(*this, std::move(addresses), host);
}

if (!new_session->createFilterChain()) {
Expand Down Expand Up @@ -1109,17 +1109,29 @@ void UdpProxyFilter::TunnelingActiveSession::onAboveWriteBufferHighWatermark() {
}

void UdpProxyFilter::TunnelingActiveSession::onBelowWriteBufferLowWatermark() {
can_send_upstream_ = true;
flushBuffer();
// In cases where onBelowWriteBufferLowWatermark is called during an active
// write process to the upstream, flushBuffer will cause another write operation
// on the upstream connection, which causes nghttp2 based codec to segmentation fault,
// and oghttp2 based codec to drop the new written data. To avoid this, posting the
// flush operation on the dispatcher, so it can be done after current write operation
// had finished.
filter_.read_callbacks_->udpListener().dispatcher().post(
[session = shared_from_this()]() { session->flushBuffer(); });
}

void UdpProxyFilter::TunnelingActiveSession::flushBuffer() {
if (!upstream_) {
return;
}

while (!datagrams_buffer_.empty()) {
BufferedDatagramPtr buffered_datagram = std::move(datagrams_buffer_.front());
datagrams_buffer_.pop();
buffered_bytes_ -= buffered_datagram->buffer_->length();
upstream_->encodeData(*buffered_datagram->buffer_);
}

can_send_upstream_ = true;
}

void UdpProxyFilter::TunnelingActiveSession::maybeBufferDatagram(Network::UdpRecvData& data) {
Expand Down
22 changes: 13 additions & 9 deletions source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter,
bool on_session_complete_called_{false};
};

using ActiveSessionPtr = std::unique_ptr<ActiveSession>;
using ActiveSessionSharedPtr = std::shared_ptr<ActiveSession>;

class UdpActiveSession : public Network::UdpPacketProcessor, public ActiveSession {
public:
Expand Down Expand Up @@ -732,7 +732,8 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter,
*/
class TunnelingActiveSession : public ActiveSession,
public UpstreamTunnelCallbacks,
public HttpStreamCallbacks {
public HttpStreamCallbacks,
public std::enable_shared_from_this<TunnelingActiveSession> {
public:
TunnelingActiveSession(UdpProxyFilter& filter,
Network::UdpRecvData::LocalPeerAddresses&& addresses);
Expand Down Expand Up @@ -810,7 +811,9 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter,
LocalPeerHostAddresses key{value->addresses(), value->host()};
return this->operator()(key);
}
size_t operator()(const ActiveSessionPtr& value) const { return this->operator()(value.get()); }
size_t operator()(const ActiveSessionSharedPtr& value) const {
return this->operator()(value.get());
}

private:
const bool consider_host_;
Expand All @@ -822,19 +825,19 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter,

HeterogeneousActiveSessionEqual(const bool consider_host) : consider_host_(consider_host) {}

bool operator()(const ActiveSessionPtr& lhs,
bool operator()(const ActiveSessionSharedPtr& lhs,
const Network::UdpRecvData::LocalPeerAddresses& rhs) const {
return lhs->addresses() == rhs;
}
bool operator()(const ActiveSessionPtr& lhs, const LocalPeerHostAddresses& rhs) const {
bool operator()(const ActiveSessionSharedPtr& lhs, const LocalPeerHostAddresses& rhs) const {
return this->operator()(lhs, rhs.local_peer_addresses_) &&
(consider_host_ ? &lhs->host().value().get() == &rhs.host_.value().get() : true);
}
bool operator()(const ActiveSessionPtr& lhs, const ActiveSession* rhs) const {
bool operator()(const ActiveSessionSharedPtr& lhs, const ActiveSession* rhs) const {
LocalPeerHostAddresses key{rhs->addresses(), rhs->host()};
return this->operator()(lhs, key);
}
bool operator()(const ActiveSessionPtr& lhs, const ActiveSessionPtr& rhs) const {
bool operator()(const ActiveSessionSharedPtr& lhs, const ActiveSessionSharedPtr& rhs) const {
return this->operator()(lhs, rhs.get());
}

Expand Down Expand Up @@ -878,8 +881,9 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter,
};

using ClusterInfoPtr = std::unique_ptr<ClusterInfo>;
using SessionStorageType = absl::flat_hash_set<ActiveSessionPtr, HeterogeneousActiveSessionHash,
HeterogeneousActiveSessionEqual>;
using SessionStorageType =
absl::flat_hash_set<ActiveSessionSharedPtr, HeterogeneousActiveSessionHash,
HeterogeneousActiveSessionEqual>;

const UdpProxyFilterConfigSharedPtr config_;
SessionStorageType sessions_;
Expand Down
1 change: 1 addition & 0 deletions test/extensions/filters/udp/udp_proxy/mocks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ MockUdpTunnelingConfig::~MockUdpTunnelingConfig() = default;

MockTunnelCreationCallbacks::~MockTunnelCreationCallbacks() = default;
MockUpstreamTunnelCallbacks::~MockUpstreamTunnelCallbacks() = default;
MockHttpUpstream::~MockHttpUpstream() = default;
MockHttpStreamCallbacks::~MockHttpStreamCallbacks() = default;

} // namespace SessionFilters
Expand Down
8 changes: 8 additions & 0 deletions test/extensions/filters/udp/udp_proxy/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ class MockUpstreamTunnelCallbacks : public UpstreamTunnelCallbacks {
MOCK_METHOD(void, onUpstreamData, (Buffer::Instance & data, bool end_stream));
};

class MockHttpUpstream : public HttpUpstream {
public:
~MockHttpUpstream() override;

MOCK_METHOD(void, encodeData, (Buffer::Instance & data));
MOCK_METHOD(void, onDownstreamEvent, (Network::ConnectionEvent event));
};

class MockHttpStreamCallbacks : public HttpStreamCallbacks {
public:
~MockHttpStreamCallbacks() override;
Expand Down
156 changes: 156 additions & 0 deletions test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ using testing::Return;
using testing::ReturnNew;
using testing::ReturnRef;
using testing::SaveArg;
using testing::Throw;

namespace Envoy {
namespace Extensions {
Expand All @@ -54,6 +55,19 @@ class TestUdpProxyFilter : public virtual UdpProxyFilter {
public:
using UdpProxyFilter::UdpProxyFilter;

std::shared_ptr<TunnelingActiveSession> createTunnelingSession() {
Network::UdpRecvData::LocalPeerAddresses addresses;
addresses.peer_ = Network::Utility::parseInternetAddressAndPortNoThrow("10.0.0.1:1000");
addresses.local_ = Network::Utility::parseInternetAddressAndPortNoThrow("10.0.0.2:80");

std::shared_ptr<TunnelingActiveSession> tunneling_session =
std::make_shared<TunnelingActiveSession>(*this, std::move(addresses));
sessions_.emplace(tunneling_session);
config_->stats().downstream_sess_active_.inc();

return tunneling_session;
}

TestUdpProxyFilter(Network::UdpReadFilterCallbacks& callbacks,
const UdpProxyFilterConfigSharedPtr& config)
: UdpProxyFilter(callbacks, config) {}
Expand Down Expand Up @@ -1811,6 +1825,148 @@ stat_prefix: foo
EXPECT_EQ(output_.back(), "1");
}

TEST_F(UdpProxyFilterTest, TunnelingSessionHighWatermarkDoesNotThrow) {
Event::MockTimer* idle_timer = new Event::MockTimer(&callbacks_.udp_listener_.dispatcher_);
EXPECT_CALL(*idle_timer, enableTimer(_, _)).Times(0);

setup(readConfig(R"EOF(
stat_prefix: foo
matcher:
on_no_match:
action:
name: route
typed_config:
'@type': type.googleapis.com/envoy.extensions.filters.udp.udp_proxy.v3.Route
cluster: fake_cluster
tunneling_config:
proxy_host: host.com
target_host: host.com
default_target_port: 30
buffer_options:
max_buffered_datagrams: 1000
max_buffered_bytes: 10000
)EOF"),
true);

auto session = filter_->createTunnelingSession();
EXPECT_NO_THROW(session->onAboveWriteBufferHighWatermark());
session->onSessionComplete();
}

TEST_F(UdpProxyFilterTest, TunnelingSessionUpstreamClosedDuringFlush) {
Event::MockTimer* idle_timer = new Event::MockTimer(&callbacks_.udp_listener_.dispatcher_);
EXPECT_CALL(*idle_timer, enableTimer(_, _)).Times(0);

setup(readConfig(R"EOF(
stat_prefix: foo
matcher:
on_no_match:
action:
name: route
typed_config:
'@type': type.googleapis.com/envoy.extensions.filters.udp.udp_proxy.v3.Route
cluster: fake_cluster
tunneling_config:
proxy_host: host.com
target_host: host.com
default_target_port: 30
buffer_options:
max_buffered_datagrams: 1000
max_buffered_bytes: 10000
)EOF"),
true);

NiceMock<StreamInfo::MockStreamInfo> stream_info;
stream_info.downstream_connection_info_provider_->setConnectionID(0);
Upstream::HostDescriptionConstSharedPtr upstream_host;
Network::ConnectionInfoSetterImpl address_provider(nullptr, nullptr);

auto session = filter_->createTunnelingSession();

Network::UdpRecvData data1;
data1.buffer_ = std::make_unique<Buffer::OwnedImpl>("initial buffered data");
session->writeUpstream(data1);

Event::PostCb resume_post_cb;
EXPECT_CALL(callbacks_.udp_listener_.dispatcher_, post(_)).WillOnce([&](Event::PostCb cb) {
session->onUpstreamEvent(Network::ConnectionEvent::RemoteClose);
resume_post_cb = std::move(cb);
});

auto* upstream = new NiceMock<SessionFilters::MockHttpUpstream>();
EXPECT_CALL(*upstream, encodeData(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void {
session->onBelowWriteBufferLowWatermark();
}));

session->onNewSession();
session->onStreamReady(&stream_info, std::unique_ptr<HttpUpstream>{upstream}, upstream_host,
address_provider, nullptr);
}

TEST_F(UdpProxyFilterTest, TunnelingSessionFlushBufferCauseLowWatermark) {
Event::MockTimer* idle_timer = new Event::MockTimer(&callbacks_.udp_listener_.dispatcher_);
EXPECT_CALL(*idle_timer, enableTimer(_, _)).Times(0);

setup(readConfig(R"EOF(
stat_prefix: foo
matcher:
on_no_match:
action:
name: route
typed_config:
'@type': type.googleapis.com/envoy.extensions.filters.udp.udp_proxy.v3.Route
cluster: fake_cluster
tunneling_config:
proxy_host: host.com
target_host: host.com
default_target_port: 30
buffer_options:
max_buffered_datagrams: 1000
max_buffered_bytes: 10000
)EOF"),
true);

NiceMock<StreamInfo::MockStreamInfo> stream_info;
stream_info.downstream_connection_info_provider_->setConnectionID(0);
Upstream::HostDescriptionConstSharedPtr upstream_host;
Network::ConnectionInfoSetterImpl address_provider(nullptr, nullptr);

auto session = filter_->createTunnelingSession();

// Since stream is not ready, two datagrams will be buffered.
Network::UdpRecvData data1;
data1.buffer_ = std::make_unique<Buffer::OwnedImpl>("initial buffered data");
session->writeUpstream(data1);
Network::UdpRecvData data2;
data2.buffer_ = std::make_unique<Buffer::OwnedImpl>("initial buffered data");
session->writeUpstream(data2);

bool writing = false;
Event::PostCb resume_post_cb;
EXPECT_CALL(callbacks_.udp_listener_.dispatcher_, post(_)).WillOnce([&](Event::PostCb cb) {
writing = false;
resume_post_cb = std::move(cb);
});

auto* upstream = new NiceMock<SessionFilters::MockHttpUpstream>();
EXPECT_CALL(*upstream, encodeData(_))
.WillOnce(Invoke([&](Buffer::Instance&) -> void {
writing = true;
session->onBelowWriteBufferLowWatermark();
}))
.WillOnce(Invoke([&](Buffer::Instance&) -> void {
if (writing) {
throw EnvoyException("write upstream operation while already during previous write");
}

resume_post_cb();
}));

session->onNewSession();
session->onStreamReady(&stream_info, std::unique_ptr<HttpUpstream>{upstream}, upstream_host,
address_provider, nullptr);
}

using MockUdpTunnelingConfig = SessionFilters::MockUdpTunnelingConfig;
using MockUpstreamTunnelCallbacks = SessionFilters::MockUpstreamTunnelCallbacks;
using MockTunnelCreationCallbacks = SessionFilters::MockTunnelCreationCallbacks;
Expand Down