Skip to content

Commit

Permalink
[CallAttemptTracer] Fix call attempt tracer lifetimes for retries (grpc#38729)
Browse files Browse the repository at this point in the history

Fixes the heap-use-after-free reported in grpc#38728; see that issue for details.

Also fixes a bug in chttp2 where the parent call tracer, instead of the call attempt tracer, was used to record annotations for a stream.

A test is being added in grpc#38437.

Closes grpc#38729

COPYBARA_INTEGRATE_REVIEW=grpc#38729 from yashykt:FixCallAttemptTracer cb09add
PiperOrigin-RevId: 729307540
  • Loading branch information
yashykt authored and copybara-github committed Feb 21, 2025
1 parent ac9ceb2 commit 211f9ec
Show file tree
Hide file tree
Showing 16 changed files with 112 additions and 50 deletions.
1 change: 1 addition & 0 deletions bazel/experiments.bzl

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions src/core/client_channel/client_channel_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2379,12 +2379,14 @@ class ClientChannelFilter::LoadBalancedCall::BackendMetricAccessor final

namespace {

void CreateCallAttemptTracer(Arena* arena, bool is_transparent_retry) {
ClientCallTracer::CallAttemptTracer* CreateCallAttemptTracer(
Arena* arena, bool is_transparent_retry) {
auto* call_tracer = DownCast<ClientCallTracer*>(
arena->GetContext<CallTracerAnnotationInterface>());
if (call_tracer == nullptr) return;
if (call_tracer == nullptr) return nullptr;
auto* tracer = call_tracer->StartNewAttempt(is_transparent_retry);
arena->SetContext<CallTracerInterface>(tracer);
return tracer;
}

} // namespace
Expand All @@ -2396,9 +2398,10 @@ ClientChannelFilter::LoadBalancedCall::LoadBalancedCall(
? "LoadBalancedCall"
: nullptr),
chand_(chand),
call_attempt_tracer_(
CreateCallAttemptTracer(arena, is_transparent_retry)),
on_commit_(std::move(on_commit)),
arena_(arena) {
CreateCallAttemptTracer(arena, is_transparent_retry);
GRPC_TRACE_LOG(client_channel_lb_call, INFO)
<< "chand=" << chand_ << " lb_call=" << this << ": created";
}
Expand Down
8 changes: 6 additions & 2 deletions src/core/client_channel/client_channel_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,7 @@ class ClientChannelFilter::LoadBalancedCall
protected:
ClientChannelFilter* chand() const { return chand_; }
ClientCallTracer::CallAttemptTracer* call_attempt_tracer() const {
return DownCast<ClientCallTracer::CallAttemptTracer*>(
arena_->GetContext<CallTracerInterface>());
return call_attempt_tracer_;
}
ConnectedSubchannel* connected_subchannel() const {
return connected_subchannel_.get();
Expand Down Expand Up @@ -431,6 +430,11 @@ class ClientChannelFilter::LoadBalancedCall
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::lb_mu_) = 0;

ClientChannelFilter* chand_;
// When we start a new attempt for a call, we might not have cleaned up the
// previous attempt yet, leading to a situation where we have two active call
// attempt tracers. We therefore cannot rely on the arena to give us the right
// tracer when performing cleanup.
ClientCallTracer::CallAttemptTracer* call_attempt_tracer_;

absl::AnyInvocable<void()> on_commit_;

Expand Down
23 changes: 13 additions & 10 deletions src/core/ext/filters/http/message_compress/compression_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ ChannelCompression::ChannelCompression(const ChannelArgs& args)
}

MessageHandle ChannelCompression::CompressMessage(
MessageHandle message, grpc_compression_algorithm algorithm) const {
MessageHandle message, grpc_compression_algorithm algorithm,
CallTracerInterface* call_tracer) const {
GRPC_TRACE_LOG(compression, INFO)
<< "CompressMessage: len=" << message->payload()->Length()
<< " alg=" << algorithm << " flags=" << message->flags();
auto* call_tracer = MaybeGetContext<CallTracerInterface>();
if (call_tracer != nullptr) {
call_tracer->RecordSendMessage(*message);
}
Expand Down Expand Up @@ -155,12 +155,12 @@ MessageHandle ChannelCompression::CompressMessage(
}

absl::StatusOr<MessageHandle> ChannelCompression::DecompressMessage(
bool is_client, MessageHandle message, DecompressArgs args) const {
bool is_client, MessageHandle message, DecompressArgs args,
CallTracerInterface* call_tracer) const {
GRPC_TRACE_LOG(compression, INFO)
<< "DecompressMessage: len=" << message->payload()->Length()
<< " max=" << args.max_recv_message_length.value_or(-1)
<< " alg=" << args.algorithm;
auto* call_tracer = MaybeGetContext<CallTracerInterface>();
if (call_tracer != nullptr) {
call_tracer->RecordReceivedMessage(*message);
}
Expand Down Expand Up @@ -233,14 +233,15 @@ void ClientCompressionFilter::Call::OnClientInitialMetadata(
"ClientCompressionFilter::Call::OnClientInitialMetadata");
compression_algorithm_ =
filter->compression_engine_.HandleOutgoingMetadata(md);
call_tracer_ = MaybeGetContext<CallTracerInterface>();
}

MessageHandle ClientCompressionFilter::Call::OnClientToServerMessage(
MessageHandle message, ClientCompressionFilter* filter) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ClientCompressionFilter::Call::OnClientToServerMessage");
return filter->compression_engine_.CompressMessage(std::move(message),
compression_algorithm_);
return filter->compression_engine_.CompressMessage(
std::move(message), compression_algorithm_, call_tracer_);
}

void ClientCompressionFilter::Call::OnServerInitialMetadata(
Expand All @@ -256,7 +257,7 @@ ClientCompressionFilter::Call::OnServerToClientMessage(
GRPC_LATENT_SEE_INNER_SCOPE(
"ClientCompressionFilter::Call::OnServerToClientMessage");
return filter->compression_engine_.DecompressMessage(
/*is_client=*/true, std::move(message), decompress_args_);
/*is_client=*/true, std::move(message), decompress_args_, call_tracer_);
}

void ServerCompressionFilter::Call::OnClientInitialMetadata(
Expand All @@ -272,7 +273,8 @@ ServerCompressionFilter::Call::OnClientToServerMessage(
GRPC_LATENT_SEE_INNER_SCOPE(
"ServerCompressionFilter::Call::OnClientToServerMessage");
return filter->compression_engine_.DecompressMessage(
/*is_client=*/false, std::move(message), decompress_args_);
/*is_client=*/false, std::move(message), decompress_args_,
MaybeGetContext<CallTracerInterface>());
}

void ServerCompressionFilter::Call::OnServerInitialMetadata(
Expand All @@ -287,8 +289,9 @@ MessageHandle ServerCompressionFilter::Call::OnServerToClientMessage(
MessageHandle message, ServerCompressionFilter* filter) {
GRPC_LATENT_SEE_INNER_SCOPE(
"ServerCompressionFilter::Call::OnServerToClientMessage");
return filter->compression_engine_.CompressMessage(std::move(message),
compression_algorithm_);
return filter->compression_engine_.CompressMessage(
std::move(message), compression_algorithm_,
MaybeGetContext<CallTracerInterface>());
}

} // namespace grpc_core
12 changes: 8 additions & 4 deletions src/core/ext/filters/http/message_compress/compression_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,12 @@ class ChannelCompression {

// Compress one message synchronously.
MessageHandle CompressMessage(MessageHandle message,
grpc_compression_algorithm algorithm) const;
grpc_compression_algorithm algorithm,
CallTracerInterface* call_tracer) const;
// Decompress one message synchronously.
absl::StatusOr<MessageHandle> DecompressMessage(bool is_client,
MessageHandle message,
DecompressArgs args) const;
absl::StatusOr<MessageHandle> DecompressMessage(
bool is_client, MessageHandle message, DecompressArgs args,
CallTracerInterface* call_tracer) const;

private:
// Max receive message length, if set.
Expand Down Expand Up @@ -137,6 +138,9 @@ class ClientCompressionFilter final
private:
grpc_compression_algorithm compression_algorithm_;
ChannelCompression::DecompressArgs decompress_args_;
// TODO(yashykt): Remove call_tracer_ after migration to call v3 stack. (See
// https://github.com/grpc/grpc/pull/38729 for more information.)
CallTracerInterface* call_tracer_ = nullptr;
};

private:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ void Chttp2CallTracerWrapper::RecordIncomingBytes(
stream_->stats.incoming.header_bytes += transport_byte_size.header_bytes;
// Update new API.
if (!IsCallTracerInTransportEnabled()) return;
auto* call_tracer = stream_->arena->GetContext<CallTracerInterface>();
auto* call_tracer = stream_->CallTracer();
if (call_tracer != nullptr) {
call_tracer->RecordIncomingBytes(transport_byte_size);
}
Expand All @@ -44,7 +44,7 @@ void Chttp2CallTracerWrapper::RecordOutgoingBytes(
stream_->stats.outgoing.header_bytes +=
transport_byte_size.header_bytes; // Update new API.
if (!IsCallTracerInTransportEnabled()) return;
auto* call_tracer = stream_->arena->GetContext<CallTracerInterface>();
auto* call_tracer = stream_->CallTracer();
if (call_tracer != nullptr) {
call_tracer->RecordOutgoingBytes(transport_byte_size);
}
Expand Down
27 changes: 17 additions & 10 deletions src/core/ext/transport/chttp2/transport/chttp2_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -227,14 +227,14 @@ namespace {
using EventEngine = ::grpc_event_engine::experimental::EventEngine;
using TaskHandle = ::grpc_event_engine::experimental::EventEngine::TaskHandle;

grpc_core::CallTracerAnnotationInterface* CallTracerIfSampled(
grpc_core::CallTracerAnnotationInterface* ParentCallTracerIfSampled(
grpc_chttp2_stream* s) {
auto* call_tracer =
auto* parent_call_tracer =
s->arena->GetContext<grpc_core::CallTracerAnnotationInterface>();
if (call_tracer == nullptr || !call_tracer->IsSampled()) {
if (parent_call_tracer == nullptr || !parent_call_tracer->IsSampled()) {
return nullptr;
}
return call_tracer;
return parent_call_tracer;
}

std::shared_ptr<grpc_core::TcpTracerInterface> TcpTracerIfSampled(
Expand Down Expand Up @@ -1381,16 +1381,16 @@ static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id,
}

static void trace_annotations(grpc_chttp2_stream* s) {
if (!grpc_core::IsCallTracerInTransportEnabled()) {
if (s->call_tracer != nullptr) {
s->call_tracer->RecordAnnotation(
if (!grpc_core::IsCallTracerTransportFixEnabled()) {
if (s->parent_call_tracer != nullptr) {
s->parent_call_tracer->RecordAnnotation(
grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kStart,
gpr_now(GPR_CLOCK_REALTIME))
.Add(s->t->flow_control.stats())
.Add(s->flow_control.stats()));
}
} else {
auto* call_tracer = s->arena->GetContext<grpc_core::CallTracerInterface>();
auto* call_tracer = s->CallTracer();
if (call_tracer != nullptr && call_tracer->IsSampled()) {
call_tracer->RecordAnnotation(
grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kStart,
Expand Down Expand Up @@ -1646,8 +1646,15 @@ static void perform_stream_op_locked(void* stream_op,
grpc_chttp2_transport* t = s->t.get();

s->traced = op->is_traced;
if (!grpc_core::IsCallTracerInTransportEnabled()) {
s->call_tracer = CallTracerIfSampled(s);
if (!grpc_core::IsCallTracerTransportFixEnabled()) {
s->parent_call_tracer = ParentCallTracerIfSampled(s);
}
// TODO(yashykt): Remove call_tracer field after transition to call v3. (See
// https://github.com/grpc/grpc/pull/38729 for more information.) On the
// client, the call attempt tracer will be available for use when the
// send_initial_metadata op arrives.
if (s->t->is_client && op->send_initial_metadata) {
s->call_tracer = s->arena->GetContext<grpc_core::CallTracerInterface>();
}
s->tcp_tracer = TcpTracerIfSampled(s);
if (GRPC_TRACE_FLAG_ENABLED(http)) {
Expand Down
3 changes: 2 additions & 1 deletion src/core/ext/transport/chttp2/transport/hpack_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1138,7 +1138,8 @@ grpc_error_handle HPackParser::ParseInput(
HandleMetadataSoftSizeLimitExceeded(&input);
}
global_stats().IncrementHttp2MetadataSize(state_.frame_length);
if (call_tracer != nullptr && metadata_buffer_ != nullptr) {
if (call_tracer != nullptr && call_tracer->IsSampled() &&
metadata_buffer_ != nullptr) {
MetadataSizesAnnotation metadata_sizes_annotation(
metadata_buffer_, state_.metadata_early_detection.soft_limit(),
state_.metadata_early_detection.hard_limit());
Expand Down
16 changes: 12 additions & 4 deletions src/core/ext/transport/chttp2/transport/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -672,10 +672,10 @@ struct grpc_chttp2_stream {

grpc_core::Chttp2CallTracerWrapper call_tracer_wrapper;

/// Only set when enabled.
// TODO(roth): Remove this when the call_tracer_in_transport
// experiment finishes rolling out.
grpc_core::CallTracerAnnotationInterface* call_tracer = nullptr;
// TODO(roth): Remove this when call v3 is supported.
grpc_core::CallTracerInterface* call_tracer = nullptr;
// TODO(yashykt): Remove this once call_tracer_transport_fix is rolled out.
grpc_core::CallTracerAnnotationInterface* parent_call_tracer = nullptr;

/// Only set when enabled.
std::shared_ptr<grpc_core::TcpTracerInterface> tcp_tracer;
Expand All @@ -700,6 +700,14 @@ struct grpc_chttp2_stream {
// The last time a stream window update was received.
grpc_core::Timestamp last_window_update_time =
grpc_core::Timestamp::InfPast();

// Returns the call tracer to use for this stream; the result may be null.
// When the owning transport is a client (t->is_client), this is the
// call_tracer member stored on the stream. Otherwise the tracer is looked up
// in the arena as its CallTracerInterface context.
// TODO(yashykt): Remove this when call v3 is supported.
grpc_core::CallTracerInterface* CallTracer() const {
if (t->is_client) {
return call_tracer;
}
return arena->GetContext<grpc_core::CallTracerInterface>();
}
};

#define GRPC_ARG_PING_TIMEOUT_MS "grpc.http2.ping_timeout_ms"
Expand Down
4 changes: 2 additions & 2 deletions src/core/ext/transport/chttp2/transport/parsing.cc
Original file line number Diff line number Diff line change
Expand Up @@ -960,8 +960,8 @@ grpc_error_handle grpc_chttp2_header_parser_parse(void* hpack_parser,
s->call_tracer_wrapper.RecordIncomingBytes(
{0, 0, GRPC_SLICE_LENGTH(slice)});
call_tracer =
grpc_core::IsCallTracerInTransportEnabled()
? s->arena->GetContext<grpc_core::CallTracerInterface>()
grpc_core::IsCallTracerTransportFixEnabled()
? s->CallTracer()
: s->arena->GetContext<grpc_core::CallTracerAnnotationInterface>();
}
grpc_error_handle error = parser->Parse(
Expand Down
18 changes: 8 additions & 10 deletions src/core/ext/transport/chttp2/transport/writing.cc
Original file line number Diff line number Diff line change
Expand Up @@ -472,11 +472,11 @@ class StreamWriteContext {
grpc_chttp2_complete_closure_step(t_, &s_->send_initial_metadata_finished,
absl::OkStatus(),
"send_initial_metadata_finished");
if (!grpc_core::IsCallTracerInTransportEnabled()) {
if (s_->call_tracer) {
if (!grpc_core::IsCallTracerTransportFixEnabled()) {
if (s_->parent_call_tracer != nullptr) {
grpc_core::HttpAnnotation::WriteStats write_stats;
write_stats.target_write_size = write_context_->target_write_size();
s_->call_tracer->RecordAnnotation(
s_->parent_call_tracer->RecordAnnotation(
grpc_core::HttpAnnotation(
grpc_core::HttpAnnotation::Type::kHeadWritten,
gpr_now(GPR_CLOCK_REALTIME))
Expand All @@ -485,8 +485,7 @@ class StreamWriteContext {
.Add(write_stats));
}
} else {
auto* call_tracer =
s_->arena->GetContext<grpc_core::CallTracerInterface>();
auto* call_tracer = s_->CallTracer();
if (call_tracer != nullptr && call_tracer->IsSampled()) {
grpc_core::HttpAnnotation::WriteStats write_stats;
write_stats.target_write_size = write_context_->target_write_size();
Expand Down Expand Up @@ -638,17 +637,16 @@ class StreamWriteContext {
}
grpc_chttp2_mark_stream_closed(t_, s_, !t_->is_client, true,
absl::OkStatus());
if (!grpc_core::IsCallTracerInTransportEnabled()) {
if (s_->call_tracer) {
s_->call_tracer->RecordAnnotation(
if (!grpc_core::IsCallTracerTransportFixEnabled()) {
if (s_->parent_call_tracer != nullptr) {
s_->parent_call_tracer->RecordAnnotation(
grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kEnd,
gpr_now(GPR_CLOCK_REALTIME))
.Add(s_->t->flow_control.stats())
.Add(s_->flow_control.stats()));
}
} else {
auto* call_tracer =
s_->arena->GetContext<grpc_core::CallTracerInterface>();
auto* call_tracer = s_->CallTracer();
if (call_tracer != nullptr && call_tracer->IsSampled()) {
call_tracer->RecordAnnotation(
grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kEnd,
Expand Down
15 changes: 15 additions & 0 deletions src/core/lib/experiments/experiments.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 211f9ec

Please sign in to comment.