From 38a290159d74d3ef1703b5dd02e2b835f2781c2b Mon Sep 17 00:00:00 2001 From: Hannah Shi Date: Fri, 17 Jan 2025 08:44:31 -0800 Subject: [PATCH 1/7] [ObjC] Raise Socket closed error when cfstream endpoint receive 0 byte data (#38359) While investigating https://github.com/firebase/firebase-ios-sdk/issues/14018, I noticed a different behavior of cfstream endpoint, where it returns ok when reading 0 bytes data while [posix_endpoint](https://github.com/grpc/grpc/blob/v1.69.0/src/core/lib/event_engine/posix_engine/posix_endpoint.cc#L355-L356) raises socket closed error. This PR aligns the cfstream endpoint behavior with posix endpoint Closes #38359 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/38359 from HannahShiSFB:event-engine-endpoint-lifecycle bbeab2aec4e25ab65443cf9fae673ad6df45d652 PiperOrigin-RevId: 716683888 --- src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc | 3 ++- tools/internal_ci/macos/grpc_objc_bazel_test.sh | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc b/src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc index f231f9c5e342a..60f48bf22c710 100644 --- a/src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc +++ b/src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc @@ -298,7 +298,8 @@ void CFStreamEndpointImpl::DoRead( } buffer->RemoveLastNBytes(buffer->Length() - read_size); - on_read(absl::OkStatus()); + on_read(read_size == 0 ? absl::InternalError("Socket closed") + : absl::OkStatus()); } bool CFStreamEndpointImpl::Write( diff --git a/tools/internal_ci/macos/grpc_objc_bazel_test.sh b/tools/internal_ci/macos/grpc_objc_bazel_test.sh index 2fc8a6cb215b1..fba5688cf3d0d 100644 --- a/tools/internal_ci/macos/grpc_objc_bazel_test.sh +++ b/tools/internal_ci/macos/grpc_objc_bazel_test.sh @@ -131,6 +131,7 @@ EVENT_ENGINE_TEST_TARGETS=( //src/objective-c/tests:MacTests //src/objective-c/tests:UnitTests //src/objective-c/tests:EventEngineUnitTests + //src/objective-c/tests:CFStreamTests //src/objective-c/tests:tvtests_build_test ) @@ -143,7 +144,7 @@ objc_event_engine_bazel_tests/bazel_wrapper \ "${BAZEL_REMOTE_CACHE_ARGS[@]}" \ $BAZEL_FLAGS \ --test_env=GRPC_EXPERIMENTS=event_engine_client \ - --test_env=GRPC_VERBOSITY=debug --test_env=GRPC_TRACE=event_engine,api \ + --test_env=GRPC_VERBOSITY=debug --test_env=GRPC_TRACE=event_engine*,api \ "${OBJC_TEST_ENV_ARGS[@]}" \ -- \ "${EXAMPLE_TARGETS[@]}" \ From ad8fcaa20bc2a43057cd2addfb8cc61a6827eb2d Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Fri, 17 Jan 2025 13:51:49 -0800 Subject: [PATCH 2/7] [CallTracer] Add hint on whether inbound message is compressed (#38485) Needed for https://github.com/grpc/proposal/blob/master/A72-open-telemetry-tracing.md Closes #38485 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/38485 from yashykt:AddCompressedHintToRecordReceivedMessage a468e0a5387b35460620fb04dff8db70cea28f3d PiperOrigin-RevId: 716788992 --- BUILD | 1 + .../message_compress/compression_filter.cc | 8 +++--- .../chttp2/transport/call_tracer_wrapper.h | 8 +++--- src/core/telemetry/call_tracer.cc | 16 +++++------ src/core/telemetry/call_tracer.h | 9 ++++--- src/cpp/ext/filters/census/client_filter.cc | 26 +++++++++--------- .../filters/census/open_census_call_tracer.h | 9 +++---- .../ext/filters/census/server_call_tracer.cc | 27 ++++++++++--------- src/cpp/ext/otel/otel_client_call_tracer.cc | 26 +++++++++--------- src/cpp/ext/otel/otel_client_call_tracer.h | 9 +++---- src/cpp/ext/otel/otel_server_call_tracer.h | 27 ++++++++++--------- .../grpc_observability/client_call_tracer.cc | 4 +-- .../grpc_observability/client_call_tracer.h | 9 +++---- .../grpc_observability/server_call_tracer.cc | 26 +++++++++--------- .../grpc_observability/server_call_tracer.h | 9 +++---- test/core/end2end/tests/http2_stats.cc | 16 +++++------ test/core/test_util/fake_stats_plugin.h | 16 +++++------ .../transport/chttp2/hpack_encoder_test.cc | 8 +++--- test/cpp/microbenchmarks/bm_chttp2_hpack.cc | 8 +++--- 19 files changed, 134 insertions(+), 128 deletions(-) diff --git a/BUILD b/BUILD index 9949296ccfb7e..564208a2f294b 100644 --- a/BUILD +++ b/BUILD @@ -1615,6 +1615,7 @@ grpc_cc_library( "//src/core:channel_args", "//src/core:context", "//src/core:error", + "//src/core:message", "//src/core:metadata_batch", "//src/core:ref_counted_string", "//src/core:slice_buffer", diff --git a/src/core/ext/filters/http/message_compress/compression_filter.cc b/src/core/ext/filters/http/message_compress/compression_filter.cc index 1f721e29b695c..a752ea76b3fa4 100644 --- a/src/core/ext/filters/http/message_compress/compression_filter.cc +++ b/src/core/ext/filters/http/message_compress/compression_filter.cc @@ -107,7 +107,7 @@ MessageHandle ChannelCompression::CompressMessage( << " alg=" << algorithm << " flags=" << message->flags(); auto* call_tracer = MaybeGetContext(); if (call_tracer != nullptr) { - call_tracer->RecordSendMessage(*message->payload()); + call_tracer->RecordSendMessage(*message); } // Check if we're allowed to compress this message // (apps might want to disable compression for certain messages to avoid @@ -140,7 +140,7 @@ MessageHandle ChannelCompression::CompressMessage( tmp.Swap(payload); flags |= GRPC_WRITE_INTERNAL_COMPRESS; if (call_tracer != nullptr) { - call_tracer->RecordSendCompressedMessage(*message->payload()); + call_tracer->RecordSendCompressedMessage(*message); } } else { if (GRPC_TRACE_FLAG_ENABLED(compression)) { @@ -162,7 +162,7 @@ absl::StatusOr ChannelCompression::DecompressMessage( << " alg=" << args.algorithm; auto* call_tracer = MaybeGetContext(); if (call_tracer != nullptr) { - call_tracer->RecordReceivedMessage(*message->payload()); + call_tracer->RecordReceivedMessage(*message); } // Check max message length. if (args.max_recv_message_length.has_value() && @@ -192,7 +192,7 @@ absl::StatusOr ChannelCompression::DecompressMessage( message->mutable_flags() &= ~GRPC_WRITE_INTERNAL_COMPRESS; message->mutable_flags() |= GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED; if (call_tracer != nullptr) { - call_tracer->RecordReceivedDecompressedMessage(*message->payload()); + call_tracer->RecordReceivedDecompressedMessage(*message); } return std::move(message); } diff --git a/src/core/ext/transport/chttp2/transport/call_tracer_wrapper.h b/src/core/ext/transport/chttp2/transport/call_tracer_wrapper.h index cf535afc6cdf6..d1c1a99df5d22 100644 --- a/src/core/ext/transport/chttp2/transport/call_tracer_wrapper.h +++ b/src/core/ext/transport/chttp2/transport/call_tracer_wrapper.h @@ -45,14 +45,14 @@ class Chttp2CallTracerWrapper final : public CallTracerInterface { grpc_metadata_batch* /*send_initial_metadata*/) override {} void RecordSendTrailingMetadata( grpc_metadata_batch* /*send_trailing_metadata*/) override {} - void RecordSendMessage(const SliceBuffer& /*send_message*/) override {} + void RecordSendMessage(const Message& /*send_message*/) override {} void RecordSendCompressedMessage( - const SliceBuffer& /*send_compressed_message*/) override {} + const Message& /*send_compressed_message*/) override {} void RecordReceivedInitialMetadata( grpc_metadata_batch* /*recv_initial_metadata*/) override {} - void RecordReceivedMessage(const SliceBuffer& /*recv_message*/) override {} + void RecordReceivedMessage(const Message& /*recv_message*/) override {} void RecordReceivedDecompressedMessage( - const SliceBuffer& /*recv_decompressed_message*/) override {} + const Message& /*recv_decompressed_message*/) override {} void RecordCancel(grpc_error_handle /*cancel_error*/) override {} std::shared_ptr StartNewTcpTrace() override { return nullptr; diff --git a/src/core/telemetry/call_tracer.cc b/src/core/telemetry/call_tracer.cc index 80191bcbbf3f4..49467582cb115 100644 --- a/src/core/telemetry/call_tracer.cc +++ b/src/core/telemetry/call_tracer.cc @@ -101,13 +101,13 @@ class DelegatingClientCallTracer : public ClientCallTracer { tracer->RecordSendTrailingMetadata(send_trailing_metadata); } } - void RecordSendMessage(const SliceBuffer& send_message) override { + void RecordSendMessage(const Message& send_message) override { for (auto* tracer : tracers_) { tracer->RecordSendMessage(send_message); } } void RecordSendCompressedMessage( - const SliceBuffer& send_compressed_message) override { + const Message& send_compressed_message) override { for (auto* tracer : tracers_) { tracer->RecordSendCompressedMessage(send_compressed_message); } @@ -118,13 +118,13 @@ class DelegatingClientCallTracer : public ClientCallTracer { tracer->RecordReceivedInitialMetadata(recv_initial_metadata); } } - void RecordReceivedMessage(const SliceBuffer& recv_message) override { + void RecordReceivedMessage(const Message& recv_message) override { for (auto* tracer : tracers_) { tracer->RecordReceivedMessage(recv_message); } } void RecordReceivedDecompressedMessage( - const SliceBuffer& recv_decompressed_message) override { + const Message& recv_decompressed_message) override { for (auto* tracer : tracers_) { tracer->RecordReceivedDecompressedMessage(recv_decompressed_message); } @@ -247,13 +247,13 @@ class DelegatingServerCallTracer : public ServerCallTracer { tracer->RecordSendTrailingMetadata(send_trailing_metadata); } } - void RecordSendMessage(const SliceBuffer& send_message) override { + void RecordSendMessage(const Message& send_message) override { for (auto* tracer : tracers_) { tracer->RecordSendMessage(send_message); } } void RecordSendCompressedMessage( - const SliceBuffer& send_compressed_message) override { + const Message& send_compressed_message) override { for (auto* tracer : tracers_) { tracer->RecordSendCompressedMessage(send_compressed_message); } @@ -264,13 +264,13 @@ class DelegatingServerCallTracer : public ServerCallTracer { tracer->RecordReceivedInitialMetadata(recv_initial_metadata); } } - void RecordReceivedMessage(const SliceBuffer& recv_message) override { + void RecordReceivedMessage(const Message& recv_message) override { for (auto* tracer : tracers_) { tracer->RecordReceivedMessage(recv_message); } } void RecordReceivedDecompressedMessage( - const SliceBuffer& recv_decompressed_message) override { + const Message& recv_decompressed_message) override { for (auto* tracer : tracers_) { tracer->RecordReceivedDecompressedMessage(recv_decompressed_message); } diff --git a/src/core/telemetry/call_tracer.h b/src/core/telemetry/call_tracer.h index 427b958425a8f..13402d839eb4f 100644 --- a/src/core/telemetry/call_tracer.h +++ b/src/core/telemetry/call_tracer.h @@ -34,6 +34,7 @@ #include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/slice/slice_buffer.h" #include "src/core/lib/transport/call_final_info.h" +#include "src/core/lib/transport/message.h" #include "src/core/lib/transport/metadata_batch.h" #include "src/core/telemetry/tcp_tracer.h" #include "src/core/util/ref_counted_string.h" @@ -95,19 +96,19 @@ class CallTracerInterface : public CallTracerAnnotationInterface { grpc_metadata_batch* send_initial_metadata) = 0; virtual void RecordSendTrailingMetadata( grpc_metadata_batch* send_trailing_metadata) = 0; - virtual void RecordSendMessage(const SliceBuffer& send_message) = 0; + virtual void RecordSendMessage(const Message& send_message) = 0; // Only invoked if message was actually compressed. virtual void RecordSendCompressedMessage( - const SliceBuffer& send_compressed_message) = 0; + const Message& send_compressed_message) = 0; // The `RecordReceivedInitialMetadata()` and `RecordReceivedMessage()` // methods should only be invoked when the metadata/message was // successfully received, i.e., without any error. virtual void RecordReceivedInitialMetadata( grpc_metadata_batch* recv_initial_metadata) = 0; - virtual void RecordReceivedMessage(const SliceBuffer& recv_message) = 0; + virtual void RecordReceivedMessage(const Message& recv_message) = 0; // Only invoked if message was actually decompressed. virtual void RecordReceivedDecompressedMessage( - const SliceBuffer& recv_decompressed_message) = 0; + const Message& recv_decompressed_message) = 0; virtual void RecordCancel(grpc_error_handle cancel_error) = 0; struct TransportByteSize { diff --git a/src/cpp/ext/filters/census/client_filter.cc b/src/cpp/ext/filters/census/client_filter.cc index c8e25dec01560..f43efaac9f1d4 100644 --- a/src/cpp/ext/filters/census/client_filter.cc +++ b/src/cpp/ext/filters/census/client_filter.cc @@ -154,31 +154,33 @@ void OpenCensusCallTracer::OpenCensusCallAttemptTracer:: } void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordSendMessage( - const grpc_core::SliceBuffer& send_message) { - RecordAnnotation( - absl::StrFormat("Send message: %ld bytes", send_message.Length())); + const grpc_core::Message& send_message) { + RecordAnnotation(absl::StrFormat("Send message: %ld bytes", + send_message.payload()->Length())); ++sent_message_count_; } void OpenCensusCallTracer::OpenCensusCallAttemptTracer:: RecordSendCompressedMessage( - const grpc_core::SliceBuffer& send_compressed_message) { - RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes", - send_compressed_message.Length())); + const grpc_core::Message& send_compressed_message) { + RecordAnnotation( + absl::StrFormat("Send compressed message: %ld bytes", + send_compressed_message.payload()->Length())); } void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordReceivedMessage( - const grpc_core::SliceBuffer& recv_message) { - RecordAnnotation( - absl::StrFormat("Received message: %ld bytes", recv_message.Length())); + const grpc_core::Message& recv_message) { + RecordAnnotation(absl::StrFormat("Received message: %ld bytes", + recv_message.payload()->Length())); ++recv_message_count_; } void OpenCensusCallTracer::OpenCensusCallAttemptTracer:: RecordReceivedDecompressedMessage( - const grpc_core::SliceBuffer& recv_decompressed_message) { - RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes", - recv_decompressed_message.Length())); + const grpc_core::Message& recv_decompressed_message) { + RecordAnnotation( + absl::StrFormat("Received decompressed message: %ld bytes", + recv_decompressed_message.payload()->Length())); } namespace { diff --git a/src/cpp/ext/filters/census/open_census_call_tracer.h b/src/cpp/ext/filters/census/open_census_call_tracer.h index 6b50bc0c756cd..daa9fc47fd191 100644 --- a/src/cpp/ext/filters/census/open_census_call_tracer.h +++ b/src/cpp/ext/filters/census/open_census_call_tracer.h @@ -83,15 +83,14 @@ class OpenCensusCallTracer : public grpc_core::ClientCallTracer { grpc_metadata_batch* send_initial_metadata) override; void RecordSendTrailingMetadata( grpc_metadata_batch* /*send_trailing_metadata*/) override {} - void RecordSendMessage(const grpc_core::SliceBuffer& send_message) override; + void RecordSendMessage(const grpc_core::Message& send_message) override; void RecordSendCompressedMessage( - const grpc_core::SliceBuffer& send_compressed_message) override; + const grpc_core::Message& send_compressed_message) override; void RecordReceivedInitialMetadata( grpc_metadata_batch* /*recv_initial_metadata*/) override {} - void RecordReceivedMessage( - const grpc_core::SliceBuffer& recv_message) override; + void RecordReceivedMessage(const grpc_core::Message& recv_message) override; void RecordReceivedDecompressedMessage( - const grpc_core::SliceBuffer& recv_decompressed_message) override; + const grpc_core::Message& recv_decompressed_message) override; void RecordReceivedTrailingMetadata( absl::Status status, grpc_metadata_batch* recv_trailing_metadata, const grpc_transport_stream_stats* transport_stream_stats) override; diff --git a/src/cpp/ext/filters/census/server_call_tracer.cc b/src/cpp/ext/filters/census/server_call_tracer.cc index c685806d05b04..14d7c3ee57b7e 100644 --- a/src/cpp/ext/filters/census/server_call_tracer.cc +++ b/src/cpp/ext/filters/census/server_call_tracer.cc @@ -118,30 +118,31 @@ class OpenCensusServerCallTracer : public grpc_core::ServerCallTracer { void RecordSendTrailingMetadata( grpc_metadata_batch* send_trailing_metadata) override; - void RecordSendMessage(const grpc_core::SliceBuffer& send_message) override { - RecordAnnotation( - absl::StrFormat("Send message: %ld bytes", send_message.Length())); + void RecordSendMessage(const grpc_core::Message& send_message) override { + RecordAnnotation(absl::StrFormat("Send message: %ld bytes", + send_message.payload()->Length())); ++sent_message_count_; } void RecordSendCompressedMessage( - const grpc_core::SliceBuffer& send_compressed_message) override { - RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes", - send_compressed_message.Length())); + const grpc_core::Message& send_compressed_message) override { + RecordAnnotation( + absl::StrFormat("Send compressed message: %ld bytes", + send_compressed_message.payload()->Length())); } void RecordReceivedInitialMetadata( grpc_metadata_batch* recv_initial_metadata) override; - void RecordReceivedMessage( - const grpc_core::SliceBuffer& recv_message) override { - RecordAnnotation( - absl::StrFormat("Received message: %ld bytes", recv_message.Length())); + void RecordReceivedMessage(const grpc_core::Message& recv_message) override { + RecordAnnotation(absl::StrFormat("Received message: %ld bytes", + recv_message.payload()->Length())); ++recv_message_count_; } void RecordReceivedDecompressedMessage( - const grpc_core::SliceBuffer& recv_decompressed_message) override { - RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes", - recv_decompressed_message.Length())); + const grpc_core::Message& recv_decompressed_message) override { + RecordAnnotation( + absl::StrFormat("Received decompressed message: %ld bytes", + recv_decompressed_message.payload()->Length())); } void RecordReceivedTrailingMetadata( diff --git a/src/cpp/ext/otel/otel_client_call_tracer.cc b/src/cpp/ext/otel/otel_client_call_tracer.cc index c1ee654ef1ed9..facf1f4a8447f 100644 --- a/src/cpp/ext/otel/otel_client_call_tracer.cc +++ b/src/cpp/ext/otel/otel_client_call_tracer.cc @@ -111,29 +111,31 @@ void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer:: } void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer:: - RecordSendMessage(const grpc_core::SliceBuffer& send_message) { - RecordAnnotation( - absl::StrFormat("Send message: %ld bytes", send_message.Length())); + RecordSendMessage(const grpc_core::Message& send_message) { + RecordAnnotation(absl::StrFormat("Send message: %ld bytes", + send_message.payload()->Length())); } void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer:: RecordSendCompressedMessage( - const grpc_core::SliceBuffer& send_compressed_message) { - RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes", - send_compressed_message.Length())); + const grpc_core::Message& send_compressed_message) { + RecordAnnotation( + absl::StrFormat("Send compressed message: %ld bytes", + send_compressed_message.payload()->Length())); } void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer:: - RecordReceivedMessage(const grpc_core::SliceBuffer& recv_message) { - RecordAnnotation( - absl::StrFormat("Received message: %ld bytes", recv_message.Length())); + RecordReceivedMessage(const grpc_core::Message& recv_message) { + RecordAnnotation(absl::StrFormat("Received message: %ld bytes", + recv_message.payload()->Length())); } void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer:: RecordReceivedDecompressedMessage( - const grpc_core::SliceBuffer& recv_decompressed_message) { - RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes", - recv_decompressed_message.Length())); + const grpc_core::Message& recv_decompressed_message) { + RecordAnnotation( + absl::StrFormat("Received decompressed message: %ld bytes", + recv_decompressed_message.payload()->Length())); } void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer:: diff --git a/src/cpp/ext/otel/otel_client_call_tracer.h b/src/cpp/ext/otel/otel_client_call_tracer.h index e489c7fa5f376..145a471877642 100644 --- a/src/cpp/ext/otel/otel_client_call_tracer.h +++ b/src/cpp/ext/otel/otel_client_call_tracer.h @@ -72,15 +72,14 @@ class OpenTelemetryPluginImpl::ClientCallTracer grpc_metadata_batch* send_initial_metadata) override; void RecordSendTrailingMetadata( grpc_metadata_batch* /*send_trailing_metadata*/) override {} - void RecordSendMessage(const grpc_core::SliceBuffer& send_message) override; + void RecordSendMessage(const grpc_core::Message& send_message) override; void RecordSendCompressedMessage( - const grpc_core::SliceBuffer& send_compressed_message) override; + const grpc_core::Message& send_compressed_message) override; void RecordReceivedInitialMetadata( grpc_metadata_batch* recv_initial_metadata) override; - void RecordReceivedMessage( - const grpc_core::SliceBuffer& recv_message) override; + void RecordReceivedMessage(const grpc_core::Message& recv_message) override; void RecordReceivedDecompressedMessage( - const grpc_core::SliceBuffer& recv_decompressed_message) override; + const grpc_core::Message& recv_decompressed_message) override; void RecordReceivedTrailingMetadata( absl::Status status, grpc_metadata_batch* recv_trailing_metadata, const grpc_transport_stream_stats* transport_stream_stats) override; diff --git a/src/cpp/ext/otel/otel_server_call_tracer.h b/src/cpp/ext/otel/otel_server_call_tracer.h index b4f32f6ae848a..cd3cb1325ce1e 100644 --- a/src/cpp/ext/otel/otel_server_call_tracer.h +++ b/src/cpp/ext/otel/otel_server_call_tracer.h @@ -66,28 +66,29 @@ class OpenTelemetryPluginImpl::ServerCallTracer void RecordSendTrailingMetadata( grpc_metadata_batch* /*send_trailing_metadata*/) override; - void RecordSendMessage(const grpc_core::SliceBuffer& send_message) override { - RecordAnnotation( - absl::StrFormat("Send message: %ld bytes", send_message.Length())); + void RecordSendMessage(const grpc_core::Message& send_message) override { + RecordAnnotation(absl::StrFormat("Send message: %ld bytes", + send_message.payload()->Length())); } void RecordSendCompressedMessage( - const grpc_core::SliceBuffer& send_compressed_message) override { - RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes", - send_compressed_message.Length())); + const grpc_core::Message& send_compressed_message) override { + RecordAnnotation( + absl::StrFormat("Send compressed message: %ld bytes", + send_compressed_message.payload()->Length())); } void RecordReceivedInitialMetadata( grpc_metadata_batch* recv_initial_metadata) override; - void RecordReceivedMessage( - const grpc_core::SliceBuffer& recv_message) override { - RecordAnnotation( - absl::StrFormat("Received message: %ld bytes", recv_message.Length())); + void RecordReceivedMessage(const grpc_core::Message& recv_message) override { + RecordAnnotation(absl::StrFormat("Received message: %ld bytes", + recv_message.payload()->Length())); } void RecordReceivedDecompressedMessage( - const grpc_core::SliceBuffer& recv_decompressed_message) override { - RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes", - recv_decompressed_message.Length())); + const grpc_core::Message& recv_decompressed_message) override { + RecordAnnotation( + absl::StrFormat("Received decompressed message: %ld bytes", + recv_decompressed_message.payload()->Length())); } void RecordReceivedTrailingMetadata( diff --git a/src/python/grpcio_observability/grpc_observability/client_call_tracer.cc b/src/python/grpcio_observability/grpc_observability/client_call_tracer.cc index 66b83488752e6..35644d59d630e 100644 --- a/src/python/grpcio_observability/grpc_observability/client_call_tracer.cc +++ b/src/python/grpcio_observability/grpc_observability/client_call_tracer.cc @@ -201,12 +201,12 @@ void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer:: } void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer:: - RecordSendMessage(const grpc_core::SliceBuffer& /*send_message*/) { + RecordSendMessage(const grpc_core::Message& /*send_message*/) { ++sent_message_count_; } void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer:: - RecordReceivedMessage(const grpc_core::SliceBuffer& /*recv_message*/) { + RecordReceivedMessage(const grpc_core::Message& /*recv_message*/) { ++recv_message_count_; } diff --git a/src/python/grpcio_observability/grpc_observability/client_call_tracer.h b/src/python/grpcio_observability/grpc_observability/client_call_tracer.h index 34d7196ba749f..b059bcb6310b8 100644 --- a/src/python/grpcio_observability/grpc_observability/client_call_tracer.h +++ b/src/python/grpcio_observability/grpc_observability/client_call_tracer.h @@ -55,16 +55,15 @@ class PythonOpenCensusCallTracer : public grpc_core::ClientCallTracer { grpc_metadata_batch* send_initial_metadata) override; void RecordSendTrailingMetadata( grpc_metadata_batch* /*send_trailing_metadata*/) override {} - void RecordSendMessage( - const grpc_core::SliceBuffer& /*send_message*/) override; + void RecordSendMessage(const grpc_core::Message& /*send_message*/) override; void RecordSendCompressedMessage( - const grpc_core::SliceBuffer& /*send_compressed_message*/) override {} + const grpc_core::Message& /*send_compressed_message*/) override {} void RecordReceivedInitialMetadata( grpc_metadata_batch* /*recv_initial_metadata*/) override; void RecordReceivedMessage( - const grpc_core::SliceBuffer& /*recv_message*/) override; + const grpc_core::Message& /*recv_message*/) override; void RecordReceivedDecompressedMessage( - const grpc_core::SliceBuffer& /*recv_decompressed_message*/) override {} + const grpc_core::Message& /*recv_decompressed_message*/) override {} void RecordReceivedTrailingMetadata( absl::Status status, grpc_metadata_batch* recv_trailing_metadata, const grpc_transport_stream_stats* transport_stream_stats) override; diff --git a/src/python/grpcio_observability/grpc_observability/server_call_tracer.cc b/src/python/grpcio_observability/grpc_observability/server_call_tracer.cc index 4695758b908ad..93a602bcf58b5 100644 --- a/src/python/grpcio_observability/grpc_observability/server_call_tracer.cc +++ b/src/python/grpcio_observability/grpc_observability/server_call_tracer.cc @@ -141,29 +141,31 @@ void PythonOpenCensusServerCallTracer::RecordSendTrailingMetadata( } void PythonOpenCensusServerCallTracer::RecordSendMessage( - const grpc_core::SliceBuffer& send_message) { - RecordAnnotation( - absl::StrFormat("Send message: %ld bytes", send_message.Length())); + const grpc_core::Message& send_message) { + RecordAnnotation(absl::StrFormat("Send message: %ld bytes", + send_message.payload()->Length())); ++sent_message_count_; } void PythonOpenCensusServerCallTracer::RecordSendCompressedMessage( - const grpc_core::SliceBuffer& send_compressed_message) { - RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes", - send_compressed_message.Length())); + const grpc_core::Message& send_compressed_message) { + RecordAnnotation( + absl::StrFormat("Send compressed message: %ld bytes", + send_compressed_message.payload()->Length())); } void PythonOpenCensusServerCallTracer::RecordReceivedMessage( - const grpc_core::SliceBuffer& recv_message) { - RecordAnnotation( - absl::StrFormat("Received message: %ld bytes", recv_message.Length())); + const grpc_core::Message& recv_message) { + RecordAnnotation(absl::StrFormat("Received message: %ld bytes", + recv_message.payload()->Length())); ++recv_message_count_; } void PythonOpenCensusServerCallTracer::RecordReceivedDecompressedMessage( - const grpc_core::SliceBuffer& recv_decompressed_message) { - RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes", - recv_decompressed_message.Length())); + const grpc_core::Message& recv_decompressed_message) { + RecordAnnotation( + absl::StrFormat("Received decompressed message: %ld bytes", + recv_decompressed_message.payload()->Length())); } void PythonOpenCensusServerCallTracer::RecordCancel( diff --git a/src/python/grpcio_observability/grpc_observability/server_call_tracer.h b/src/python/grpcio_observability/grpc_observability/server_call_tracer.h index ccb4a21bd3bce..15b2128d457cc 100644 --- a/src/python/grpcio_observability/grpc_observability/server_call_tracer.h +++ b/src/python/grpcio_observability/grpc_observability/server_call_tracer.h @@ -82,19 +82,18 @@ class PythonOpenCensusServerCallTracer : public grpc_core::ServerCallTracer { void RecordSendTrailingMetadata( grpc_metadata_batch* send_trailing_metadata) override; - void RecordSendMessage(const grpc_core::SliceBuffer& send_message) override; + void RecordSendMessage(const grpc_core::Message& send_message) override; void RecordSendCompressedMessage( - const grpc_core::SliceBuffer& send_compressed_message) override; + const grpc_core::Message& send_compressed_message) override; void RecordReceivedInitialMetadata( grpc_metadata_batch* recv_initial_metadata) override; - void RecordReceivedMessage( - const grpc_core::SliceBuffer& recv_message) override; + void RecordReceivedMessage(const grpc_core::Message& recv_message) override; void RecordReceivedDecompressedMessage( - const grpc_core::SliceBuffer& recv_decompressed_message) override; + const grpc_core::Message& recv_decompressed_message) override; void RecordReceivedTrailingMetadata( grpc_metadata_batch* /*recv_trailing_metadata*/) override {} diff --git a/test/core/end2end/tests/http2_stats.cc b/test/core/end2end/tests/http2_stats.cc index 2cd3649117b4e..2be4102eed938 100644 --- a/test/core/end2end/tests/http2_stats.cc +++ b/test/core/end2end/tests/http2_stats.cc @@ -73,14 +73,14 @@ class FakeCallTracer : public ClientCallTracer { grpc_metadata_batch* /*send_initial_metadata*/) override {} void RecordSendTrailingMetadata( grpc_metadata_batch* /*send_trailing_metadata*/) override {} - void RecordSendMessage(const SliceBuffer& /*send_message*/) override {} + void RecordSendMessage(const Message& /*send_message*/) override {} void RecordSendCompressedMessage( - const SliceBuffer& /*send_compressed_message*/) override {} + const Message& /*send_compressed_message*/) override {} void RecordReceivedInitialMetadata( grpc_metadata_batch* /*recv_initial_metadata*/) override {} - void RecordReceivedMessage(const SliceBuffer& /*recv_message*/) override {} + void RecordReceivedMessage(const Message& /*recv_message*/) override {} void RecordReceivedDecompressedMessage( - const SliceBuffer& /*recv_decompressed_message*/) override {} + const Message& /*recv_decompressed_message*/) override {} void RecordReceivedTrailingMetadata( absl::Status /*status*/, @@ -173,14 +173,14 @@ class FakeServerCallTracer : public ServerCallTracer { grpc_metadata_batch* /*send_initial_metadata*/) override {} void RecordSendTrailingMetadata( grpc_metadata_batch* /*send_trailing_metadata*/) override {} - void RecordSendMessage(const SliceBuffer& /*send_message*/) override {} + void RecordSendMessage(const Message& /*send_message*/) override {} void RecordSendCompressedMessage( - const SliceBuffer& /*send_compressed_message*/) override {} + const Message& /*send_compressed_message*/) override {} void RecordReceivedInitialMetadata( grpc_metadata_batch* /*recv_initial_metadata*/) override {} - void RecordReceivedMessage(const SliceBuffer& /*recv_message*/) override {} + void RecordReceivedMessage(const Message& /*recv_message*/) override {} void RecordReceivedDecompressedMessage( - const SliceBuffer& /*recv_decompressed_message*/) override {} + const Message& /*recv_decompressed_message*/) override {} void RecordCancel(grpc_error_handle /*cancel_error*/) override {} std::shared_ptr StartNewTcpTrace() override { return nullptr; diff --git a/test/core/test_util/fake_stats_plugin.h b/test/core/test_util/fake_stats_plugin.h index ee293bc9acf02..8e64c57f49160 100644 --- a/test/core/test_util/fake_stats_plugin.h +++ b/test/core/test_util/fake_stats_plugin.h @@ -71,14 +71,14 @@ class FakeClientCallTracer : public ClientCallTracer { grpc_metadata_batch* /*send_initial_metadata*/) override {} void RecordSendTrailingMetadata( grpc_metadata_batch* /*send_trailing_metadata*/) override {} - void RecordSendMessage(const SliceBuffer& /*send_message*/) override {} + void RecordSendMessage(const Message& /*send_message*/) override {} void RecordSendCompressedMessage( - const SliceBuffer& /*send_compressed_message*/) override {} + const Message& /*send_compressed_message*/) override {} void RecordReceivedInitialMetadata( grpc_metadata_batch* /*recv_initial_metadata*/) override {} - void RecordReceivedMessage(const SliceBuffer& /*recv_message*/) override {} + void RecordReceivedMessage(const Message& /*recv_message*/) override {} void RecordReceivedDecompressedMessage( - const SliceBuffer& /*recv_decompressed_message*/) override {} + const Message& /*recv_decompressed_message*/) override {} void RecordCancel(grpc_error_handle /*cancel_error*/) override {} void RecordReceivedTrailingMetadata( absl::Status /*status*/, @@ -171,14 +171,14 @@ class FakeServerCallTracer : public ServerCallTracer { grpc_metadata_batch* /*send_initial_metadata*/) override {} void RecordSendTrailingMetadata( grpc_metadata_batch* /*send_trailing_metadata*/) override {} - void RecordSendMessage(const SliceBuffer& /*send_message*/) override {} + void RecordSendMessage(const Message& /*send_message*/) override {} void RecordSendCompressedMessage( - const SliceBuffer& /*send_compressed_message*/) override {} + const Message& /*send_compressed_message*/) override {} void RecordReceivedInitialMetadata( grpc_metadata_batch* /*recv_initial_metadata*/) override {} - void RecordReceivedMessage(const SliceBuffer& /*recv_message*/) override {} + void RecordReceivedMessage(const Message& /*recv_message*/) override {} void RecordReceivedDecompressedMessage( - const SliceBuffer& /*recv_decompressed_message*/) override {} + const Message& /*recv_decompressed_message*/) override {} void RecordCancel(grpc_error_handle /*cancel_error*/) override {} void RecordReceivedTrailingMetadata( grpc_metadata_batch* /*recv_trailing_metadata*/) override {} diff --git a/test/core/transport/chttp2/hpack_encoder_test.cc b/test/core/transport/chttp2/hpack_encoder_test.cc index 947161ecf7583..89f916e6c5d4a 100644 --- a/test/core/transport/chttp2/hpack_encoder_test.cc +++ b/test/core/transport/chttp2/hpack_encoder_test.cc @@ -158,14 +158,14 @@ class FakeCallTracer final : public CallTracerInterface { grpc_metadata_batch* send_initial_metadata) override {} void RecordSendTrailingMetadata( grpc_metadata_batch* send_trailing_metadata) override {} - void RecordSendMessage(const SliceBuffer& send_message) override {} + void RecordSendMessage(const Message& send_message) override {} void RecordSendCompressedMessage( - const SliceBuffer& send_compressed_message) override {} + const Message& send_compressed_message) override {} void RecordReceivedInitialMetadata( grpc_metadata_batch* recv_initial_metadata) override {} - void RecordReceivedMessage(const SliceBuffer& recv_message) override {} + void RecordReceivedMessage(const Message& recv_message) override {} void RecordReceivedDecompressedMessage( - const SliceBuffer& recv_decompressed_message) override {} + const Message& recv_decompressed_message) override {} void RecordCancel(grpc_error_handle cancel_error) override {} std::shared_ptr StartNewTcpTrace() override { return nullptr; diff --git a/test/cpp/microbenchmarks/bm_chttp2_hpack.cc b/test/cpp/microbenchmarks/bm_chttp2_hpack.cc index 37006f4eede7a..dfe50ed4c8d07 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_hpack.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_hpack.cc @@ -63,14 +63,14 @@ class FakeCallTracer final : public CallTracerInterface { grpc_metadata_batch* send_initial_metadata) override {} void RecordSendTrailingMetadata( grpc_metadata_batch* send_trailing_metadata) override {} - void RecordSendMessage(const SliceBuffer& send_message) override {} + void RecordSendMessage(const Message& send_message) override {} void RecordSendCompressedMessage( - const SliceBuffer& send_compressed_message) override {} + const Message& send_compressed_message) override {} void RecordReceivedInitialMetadata( grpc_metadata_batch* recv_initial_metadata) override {} - void RecordReceivedMessage(const SliceBuffer& recv_message) override {} + void RecordReceivedMessage(const Message& recv_message) override {} void RecordReceivedDecompressedMessage( - const SliceBuffer& recv_decompressed_message) override {} + const Message& recv_decompressed_message) override {} void RecordCancel(grpc_error_handle cancel_error) override {} std::shared_ptr StartNewTcpTrace() override { return nullptr; From 62d2909098c20c34390568e38c5ac6704796c0ad Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 17 Jan 2025 13:54:10 -0800 Subject: [PATCH 3/7] Fill in some missing filter signatures Make a slice buffer function take a const where it should PiperOrigin-RevId: 716789806 --- src/core/lib/channel/promise_based_filter.h | 23 +++++++++++++++++++ src/core/lib/slice/slice_buffer.cc | 4 ++-- src/core/lib/slice/slice_buffer.h | 4 ++-- src/core/lib/transport/call_filters.h | 25 +++++++++++++++++++++ 4 files changed, 52 insertions(+), 4 deletions(-) diff --git a/src/core/lib/channel/promise_based_filter.h b/src/core/lib/channel/promise_based_filter.h index 3fd30764c308a..4622e02f2aa94 100644 --- a/src/core/lib/channel/promise_based_filter.h +++ b/src/core/lib/channel/promise_based_filter.h @@ -511,6 +511,17 @@ inline auto InterceptClientToServerMessageHandler( }; } +template +inline auto InterceptClientToServerMessageHandler( + void (Derived::Call::*fn)(const Message&, Derived*), + FilterCallData* call_data, const CallArgs&) { + DCHECK(fn == &Derived::Call::OnClientToServerMessage); + return [call_data](MessageHandle msg) -> std::optional { + call_data->call.OnClientToServerMessage(*msg, call_data->channel); + return std::move(msg); + }; +} + template inline auto InterceptClientToServerMessageHandler( MessageHandle (Derived::Call::*fn)(MessageHandle, Derived*), @@ -655,6 +666,18 @@ inline void InterceptServerToClientMessage( }); } +template +inline void InterceptServerToClientMessage( + void (Derived::Call::*fn)(const Message&, Derived*), + FilterCallData* call_data, const CallArgs& call_args) { + DCHECK(fn == &Derived::Call::OnServerToClientMessage); + call_args.server_to_client_messages->InterceptAndMap( + [call_data](MessageHandle msg) -> std::optional { + call_data->call.OnServerToClientMessage(*msg, call_data->channel); + return std::move(msg); + }); +} + template inline void InterceptServerToClientMessage( ServerMetadataHandle (Derived::Call::*fn)(const Message&, Derived*), diff --git a/src/core/lib/slice/slice_buffer.cc b/src/core/lib/slice/slice_buffer.cc index 7bb88f2d549aa..9b6b3a3f69b39 100644 --- a/src/core/lib/slice/slice_buffer.cc +++ b/src/core/lib/slice/slice_buffer.cc @@ -421,8 +421,8 @@ void grpc_slice_buffer_move_first_into_buffer(grpc_slice_buffer* src, size_t n, } } -void grpc_slice_buffer_copy_first_into_buffer(grpc_slice_buffer* src, size_t n, - void* dst) { +void grpc_slice_buffer_copy_first_into_buffer(const grpc_slice_buffer* src, + size_t n, void* dst) { uint8_t* dstp = static_cast(dst); CHECK(src->length >= n); diff --git a/src/core/lib/slice/slice_buffer.h b/src/core/lib/slice/slice_buffer.h index ab6c19d4f55f8..2cca969930b98 100644 --- a/src/core/lib/slice/slice_buffer.h +++ b/src/core/lib/slice/slice_buffer.h @@ -27,8 +27,8 @@ #include "src/core/lib/slice/slice.h" // Copy the first n bytes of src into memory pointed to by dst. -void grpc_slice_buffer_copy_first_into_buffer(grpc_slice_buffer* src, size_t n, - void* dst); +void grpc_slice_buffer_copy_first_into_buffer(const grpc_slice_buffer* src, + size_t n, void* dst); void grpc_slice_buffer_move_first_no_inline(grpc_slice_buffer* src, size_t n, grpc_slice_buffer* dst); diff --git a/src/core/lib/transport/call_filters.h b/src/core/lib/transport/call_filters.h index b4b5d96773812..733382e86e15e 100644 --- a/src/core/lib/transport/call_filters.h +++ b/src/core/lib/transport/call_filters.h @@ -540,6 +540,31 @@ struct AddOpImpl< } }; +// void $INTERCEPTOR_NAME(const $VALUE_TYPE&, FilterType*) +template +struct AddOpImpl { + static void Add(FilterType* channel_data, size_t call_offset, Layout& to) { + to.Add(0, 0, + Operator{ + channel_data, + call_offset, + [](void*, void* call_data, void* channel_data, + T value) -> Poll> { + (static_cast(call_data)->*impl)( + *value, static_cast(channel_data)); + return ResultOr{std::move(value), nullptr}; + }, + nullptr, + nullptr, + }); + } +}; + // $VALUE_HANDLE $INTERCEPTOR_NAME($VALUE_HANDLE, FilterType*) template From 0f3d88ae5cfd5c05e0ee196e11d365b1739f09db Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 17 Jan 2025 16:28:56 -0800 Subject: [PATCH 4/7] [promises] Optimize Map(Map(Promise, Fn0), Fn1) --> Map(Promise, Fn1 . Fn0) (#38449) Our current promise `Map` function needs to check promise completion prior to executing the map, however we now have situations where the promise passed to `Map` is a `Map` itself (indeed we have nesting at least three levels deep in places!) We can optimize this slightly by flattening the map and composing the map functions, so that we don't need to check the finished edge at every layer. Closes #38449 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/38449 from ctiller:mappy 22f7f14af43642eea9699a273d5468f68eb3fe21 PiperOrigin-RevId: 716833799 --- src/core/lib/promise/detail/promise_like.h | 8 ++ src/core/lib/promise/map.h | 136 +++++++++++++++------ src/core/lib/promise/poll.h | 3 + 3 files changed, 113 insertions(+), 34 deletions(-) diff --git a/src/core/lib/promise/detail/promise_like.h b/src/core/lib/promise/detail/promise_like.h index 18e9fa80b5b07..9e2a0a685737b 100644 --- a/src/core/lib/promise/detail/promise_like.h +++ b/src/core/lib/promise/detail/promise_like.h @@ -92,6 +92,10 @@ class PromiseLike< -> decltype(WrapInPoll(f_())) { return WrapInPoll(f_()); } + PromiseLike(const PromiseLike&) = default; + PromiseLike& operator=(const PromiseLike&) = default; + PromiseLike(PromiseLike&&) = default; + PromiseLike& operator=(PromiseLike&&) = default; using Result = typename PollTraits::Type; }; @@ -109,6 +113,10 @@ class PromiseLike< f_(); return Empty{}; } + PromiseLike(const PromiseLike&) = default; + PromiseLike& operator=(const PromiseLike&) = default; + PromiseLike(PromiseLike&&) = default; + PromiseLike& operator=(PromiseLike&&) = default; using Result = Empty; }; diff --git a/src/core/lib/promise/map.h b/src/core/lib/promise/map.h index d60ff5b46e60e..552d3c0d38dff 100644 --- a/src/core/lib/promise/map.h +++ b/src/core/lib/promise/map.h @@ -19,6 +19,7 @@ #include #include +#include #include #include "absl/status/status.h" @@ -31,16 +32,82 @@ namespace grpc_core { namespace promise_detail { -// Implementation of mapping combinator - use this via the free function below! -// Promise is the type of promise to poll on, Fn is a function that takes the -// result of Promise and maps it to some new type. -template -class Map; +template +class WrappedFn; +template +class WrappedFn< + Fn, Arg, std::enable_if_t>>> { + public: + using Result = RemoveCVRef>; + GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION explicit WrappedFn(Fn&& fn) + : fn_(std::move(fn)) {} + WrappedFn(const WrappedFn&) = delete; + WrappedFn& operator=(const WrappedFn&) = delete; + WrappedFn(WrappedFn&&) = default; + WrappedFn& operator=(WrappedFn&&) = default; + GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION Result operator()(Arg&& arg) { + return fn_(std::forward(arg)); + } + + private: + GPR_NO_UNIQUE_ADDRESS Fn fn_; +}; + +template +class WrappedFn< + Fn, Arg, std::enable_if_t>>> { + public: + using Result = Empty; + GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION explicit WrappedFn(Fn&& fn) + : fn_(std::move(fn)) {} + WrappedFn(const WrappedFn&) = delete; + WrappedFn& operator=(const WrappedFn&) = delete; + WrappedFn(WrappedFn&&) = default; + WrappedFn& operator=(WrappedFn&&) = default; + GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION Empty operator()(Arg&& arg) { + fn_(std::forward(arg)); + return Empty{}; + } + + private: + GPR_NO_UNIQUE_ADDRESS Fn fn_; +}; + +template +class FusedFns { + using InnerResult = + decltype(std::declval()(std::declval())); + + public: + using Result = typename WrappedFn::Result; + + GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION FusedFns(Fn0 fn0, Fn1 fn1) + : fn0_(std::move(fn0)), fn1_(std::move(fn1)) {} + FusedFns(const FusedFns&) = delete; + FusedFns& operator=(const FusedFns&) = delete; + FusedFns(FusedFns&&) = default; + FusedFns& operator=(FusedFns&&) = default; + + GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION Result operator()(PromiseResult arg) { + InnerResult inner_result = fn0_(std::move(arg)); + return fn1_(std::move(inner_result)); + } + + private: + GPR_NO_UNIQUE_ADDRESS Fn0 fn0_; + GPR_NO_UNIQUE_ADDRESS WrappedFn fn1_; +}; + +} // namespace promise_detail + +// Mapping combinator. +// Takes a promise, and a synchronous function to mutate its result, and +// returns a promise. template -class Map::Result>>::value>> { +class Map { + using PromiseType = promise_detail::PromiseLike; + public: GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION Map(Promise promise, Fn fn) : promise_(std::move(promise)), fn_(std::move(fn)) {} @@ -52,9 +119,8 @@ class Map::Result; - using Result = - RemoveCVRef()(std::declval()))>; + using PromiseResult = typename PromiseType::Result; + using Result = typename promise_detail::WrappedFn::Result; GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION Poll operator()() { Poll r = promise_(); @@ -65,17 +131,25 @@ class Map promise_; - Fn fn_; + template + friend class Map; + + GPR_NO_UNIQUE_ADDRESS PromiseType promise_; + GPR_NO_UNIQUE_ADDRESS promise_detail::WrappedFn fn_; }; -template -class Map::Result>>::value>> { +template +class Map, Fn1> { + using InnerMapFn = decltype(std::declval>().fn_); + using FusedFn = + promise_detail::FusedFns::PromiseResult, + InnerMapFn, Fn1>; + using PromiseType = typename Map::PromiseType; + public: - GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION Map(Promise promise, Fn fn) - : promise_(std::move(promise)), fn_(std::move(fn)) {} + GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION Map(Map map, Fn1 fn1) + : promise_(std::move(map.promise_)), + fn_(FusedFn(std::move(map.fn_), std::move(fn1))) {} Map(const Map&) = delete; Map& operator=(const Map&) = delete; @@ -84,33 +158,27 @@ class Map::Result; - using Result = Empty; + using PromiseResult = typename Map::PromiseResult; + using Result = typename FusedFn::Result; GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION Poll operator()() { Poll r = promise_(); if (auto* p = r.value_if_ready()) { - fn_(std::move(*p)); - return Empty{}; + return fn_(std::move(*p)); } return Pending(); } private: - PromiseLike promise_; - Fn fn_; -}; + template + friend class Map; -} // namespace promise_detail + GPR_NO_UNIQUE_ADDRESS PromiseType promise_; + GPR_NO_UNIQUE_ADDRESS FusedFn fn_; +}; -// Mapping combinator. -// Takes a promise, and a synchronous function to mutate its result, and -// returns a promise. template -GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline promise_detail::Map -Map(Promise promise, Fn fn) { - return promise_detail::Map(std::move(promise), std::move(fn)); -} +Map(Promise, Fn) -> Map; // Maps a promise to a new promise that returns a tuple of the original result // and a bool indicating whether there was ever a Pending{} value observed from diff --git a/src/core/lib/promise/poll.h b/src/core/lib/promise/poll.h index 01c6df53fdca5..b530bf32fd3b5 100644 --- a/src/core/lib/promise/poll.h +++ b/src/core/lib/promise/poll.h @@ -152,6 +152,9 @@ class Poll { }; }; +template +class Poll; + template <> class Poll { public: From 957589f2a8b0e058abf8a4c4c646b7840d1b862b Mon Sep 17 00:00:00 2001 From: Esun Kim Date: Fri, 17 Jan 2025 19:33:50 -0800 Subject: [PATCH 5/7] [CI] Added BAZEL_SH to rbe_windows2019/Dockerfile (#38484) Bazel 8 now requires gRPC to explicitly call `rules_shell_toolchains` when using shell. This was previously handled implicitly by Bazel. Without this explicit step, Bash-based targets will fail on Windows. Partial commit of #38254 Closes #38484 PiperOrigin-RevId: 716878551 --- WORKSPACE | 2 ++ bazel/grpc_deps.bzl | 11 +++++++++++ bazel/grpc_extra_deps.bzl | 4 ++++ .../toolchains/dockerfile/rbe_windows2019/Dockerfile | 4 ++-- .../toolchains/generate_windows_rbe_configs.sh | 2 +- .../toolchains/rbe_windows_vs2022_bazel7/config/BUILD | 2 +- tools/run_tests/sanity/check_bazel_workspace.py | 2 ++ 7 files changed, 23 insertions(+), 4 deletions(-) diff --git a/WORKSPACE b/WORKSPACE index 0f6ccf50d53c2..691f983ed728d 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -10,6 +10,8 @@ load("//bazel:grpc_extra_deps.bzl", "grpc_extra_deps") grpc_extra_deps() +# RBE + load("@bazel_toolchains//rules/exec_properties:exec_properties.bzl", "create_rbe_exec_properties_dict", "custom_exec_properties") custom_exec_properties( diff --git a/bazel/grpc_deps.bzl b/bazel/grpc_deps.bzl index dcb1a7161df7d..a9354b97acebf 100644 --- a/bazel/grpc_deps.bzl +++ b/bazel/grpc_deps.bzl @@ -237,6 +237,17 @@ def grpc_deps(): ], ) + if "rules_shell" not in native.existing_rules(): + http_archive( + name = "rules_shell", + sha256 = "d8cd4a3a91fc1dc68d4c7d6b655f09def109f7186437e3f50a9b60ab436a0c53", + strip_prefix = "rules_shell-0.3.0", + urls = [ + "https://storage.googleapis.com/grpc-bazel-mirror/github.com/bazelbuild/rules_shell/releases/download/v0.3.0/rules_shell-v0.3.0.tar.gz", + "https://github.com/bazelbuild/rules_shell/releases/download/v0.3.0/rules_shell-v0.3.0.tar.gz", + ], + ) + if "io_bazel_rules_go" not in native.existing_rules(): http_archive( name = "io_bazel_rules_go", diff --git a/bazel/grpc_extra_deps.bzl b/bazel/grpc_extra_deps.bzl index 6ac6093a5fe78..3417c34473816 100644 --- a/bazel/grpc_extra_deps.bzl +++ b/bazel/grpc_extra_deps.bzl @@ -25,6 +25,7 @@ load("@google_cloud_cpp//bazel:google_cloud_cpp_deps.bzl", "google_cloud_cpp_dep load("@io_bazel_rules_go//go:deps.bzl", "go_register_toolchains", "go_rules_dependencies") load("@rules_proto//proto:repositories.bzl", "rules_proto_dependencies") load("@rules_python//python:repositories.bzl", "py_repositories") +load("@rules_shell//shell:repositories.bzl", "rules_shell_dependencies", "rules_shell_toolchains") def grpc_extra_deps(ignore_version_differences = False): """Loads the extra dependencies. @@ -49,6 +50,9 @@ def grpc_extra_deps(ignore_version_differences = False): ignore_version_differences: Plumbed directly to the invocation of apple_rules_dependencies. """ + rules_shell_dependencies() + rules_shell_toolchains() + protobuf_deps() rules_proto_dependencies() diff --git a/third_party/toolchains/dockerfile/rbe_windows2019/Dockerfile b/third_party/toolchains/dockerfile/rbe_windows2019/Dockerfile index 8b88954d8b230..777edf2883747 100644 --- a/third_party/toolchains/dockerfile/rbe_windows2019/Dockerfile +++ b/third_party/toolchains/dockerfile/rbe_windows2019/Dockerfile @@ -31,7 +31,6 @@ RUN New-Item -Path "C:/" -Name "TEMP" -ItemType "directory"; \ -OutFile C:/TEMP/vc_redist.x64.exe -UseBasicParsing; \ Start-Process -filepath C:/TEMP/vc_redist.x64.exe -ArgumentList '/install', '/passive', '/norestart' -Wait; \ Remove-Item C:/TEMP/vc_redist.x64.exe - # Install Visual Studio 2022 Build Tools. RUN Invoke-WebRequest "https://aka.ms/vs/17/release/vs_buildtools.exe" \ -OutFile C:/TEMP/vs_buildtools.exe -UseBasicParsing; \ @@ -53,7 +52,8 @@ RUN [Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tl msys 'pacman --noconfirm -Syy zstd'; \ msys 'pacman --noconfirm -Syy git curl zip unzip'; \ $old_path = [Environment]::GetEnvironmentVariable(\"PATH\", \"Machine\"); \ - [Environment]::SetEnvironmentVariable(\"PATH\", $old_path + \";C:\msys64;C:\msys64\usr\bin\", \"Machine\"); + [Environment]::SetEnvironmentVariable(\"PATH\", $old_path + \";C:\msys64;C:\msys64\usr\bin\", \"Machine\"); \ + [Environment]::SetEnvironmentVariable(\"BAZEL_SH\", \"C:\msys64\usr\bin\bash.exe\", \"Machine\") # Install Python 3. RUN Invoke-WebRequest "https://www.python.org/ftp/python/3.10.4/python-3.10.4-amd64.exe" \ -OutFile C:/TEMP/python_install.exe -UseBasicParsing; \ diff --git a/third_party/toolchains/generate_windows_rbe_configs.sh b/third_party/toolchains/generate_windows_rbe_configs.sh index 31a4d915665a9..5ac540ea8d8e4 100755 --- a/third_party/toolchains/generate_windows_rbe_configs.sh +++ b/third_party/toolchains/generate_windows_rbe_configs.sh @@ -28,7 +28,7 @@ wget https://github.com/bazelbuild/bazel-toolchains/releases/download/v5.1.2/rbe RBE_CONFIGS_GEN_TOOL_PATH="./rbe_configs_gen_windows_amd64.exe" # Actions on RBE will run under a dedicated docker image. -WINDOWS_RBE_DOCKER_IMAGE=us-docker.pkg.dev/grpc-testing/testing-images-public/rbe_windows2019@sha256:5a97eb384a3089ac9180e6086ca89b1fdafa57735057624245b3d4a96b4744fe +WINDOWS_RBE_DOCKER_IMAGE=us-docker.pkg.dev/grpc-testing/testing-images-public/rbe_windows2019@sha256:cfef0ae3681d4070f6a93a88afea44d616f5cbdfae36559b9394274e74873bc6 # Bazel version used for configuring # Needs to be one of the versions from bazel/supported_versions.txt chosen so that the result is compatible diff --git a/third_party/toolchains/rbe_windows_vs2022_bazel7/config/BUILD b/third_party/toolchains/rbe_windows_vs2022_bazel7/config/BUILD index 7c0f6165d6a48..e194c7616db96 100755 --- a/third_party/toolchains/rbe_windows_vs2022_bazel7/config/BUILD +++ b/third_party/toolchains/rbe_windows_vs2022_bazel7/config/BUILD @@ -40,7 +40,7 @@ platform( "@platforms//cpu:x86_64", ], exec_properties = { - "container-image": "docker://us-docker.pkg.dev/grpc-testing/testing-images-public/rbe_windows2019@sha256:5a97eb384a3089ac9180e6086ca89b1fdafa57735057624245b3d4a96b4744fe", + "container-image": "docker://us-docker.pkg.dev/grpc-testing/testing-images-public/rbe_windows2019@sha256:cfef0ae3681d4070f6a93a88afea44d616f5cbdfae36559b9394274e74873bc6", "OSFamily": "Windows", }, ) diff --git a/tools/run_tests/sanity/check_bazel_workspace.py b/tools/run_tests/sanity/check_bazel_workspace.py index e6ebd880cf102..cdcfed0f9bbad 100755 --- a/tools/run_tests/sanity/check_bazel_workspace.py +++ b/tools/run_tests/sanity/check_bazel_workspace.py @@ -81,6 +81,7 @@ "com_google_libprotobuf_mutator", "com_github_cncf_xds", "google_cloud_cpp", + "rules_shell", ] _GRPC_BAZEL_ONLY_DEPS = [ @@ -109,6 +110,7 @@ "com_google_googleapis", "com_google_libprotobuf_mutator", "google_cloud_cpp", + "rules_shell", ] From 9a6bcd4c2f2913c1bfe8dccf9e536d8f53c360c2 Mon Sep 17 00:00:00 2001 From: Hannah Shi Date: Fri, 17 Jan 2025 20:45:16 -0800 Subject: [PATCH 6/7] [ObjC] build universal binary for grpc_proto_plugin on mac (#38378) https://github.com/grpc/grpc/issues/36801 ``` file bazel-bin/src/compiler/grpc_objective_c_plugin bazel-bin/src/compiler/grpc_objective_c_plugin: Mach-O universal binary with 2 architectures: [x86_64:Mach-O 64-bit executable x86_64] [arm64] bazel-bin/src/compiler/grpc_objective_c_plugin (for architecture x86_64): Mach-O 64-bit executable x86_64 bazel-bin/src/compiler/grpc_objective_c_plugin (for architecture arm64): Mach-O 64-bit executable arm64 ``` Closes #38378 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/38378 from HannahShiSFB:build-universal-protoc-plugin-2 95335565637d25873c0c41c806c88bee1918a0c4 PiperOrigin-RevId: 716892888 --- bazel/grpc_build_system.bzl | 17 ++++++++++++++++- .../buildgen/extract_metadata_from_bazel_xml.py | 1 + 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/bazel/grpc_build_system.bzl b/bazel/grpc_build_system.bzl index 36fc71a295680..4f39602d3bd2d 100644 --- a/bazel/grpc_build_system.bzl +++ b/bazel/grpc_build_system.bzl @@ -27,6 +27,7 @@ Contains macros used throughout the repo. """ +load("@build_bazel_apple_support//rules:universal_binary.bzl", "universal_binary") load("@build_bazel_rules_apple//apple:ios.bzl", "ios_unit_test") load("@build_bazel_rules_apple//apple/testing/default_runner:ios_test_runner.bzl", "ios_test_runner") load("@com_google_protobuf//bazel:cc_proto_library.bzl", "cc_proto_library") @@ -238,10 +239,24 @@ def grpc_cc_library( def grpc_proto_plugin(name, srcs = [], deps = []): native.cc_binary( - name = name, + name = name + "_native", srcs = srcs, deps = deps, ) + universal_binary( + name = name + "_universal", + binary = name + "_native", + ) + native.genrule( + name = name, + srcs = select({ + "@platforms//os:macos": [name + "_universal"], + "//conditions:default": [name + "_native"], + }), + outs = [name], + cmd = "cp $< $@", + executable = True, + ) def grpc_internal_proto_library( name, diff --git a/tools/buildgen/extract_metadata_from_bazel_xml.py b/tools/buildgen/extract_metadata_from_bazel_xml.py index 21b4da8bafe12..a9fa9cba24816 100755 --- a/tools/buildgen/extract_metadata_from_bazel_xml.py +++ b/tools/buildgen/extract_metadata_from_bazel_xml.py @@ -193,6 +193,7 @@ def _extract_rules_from_bazel_xml(xml_tree): "upb_proto_reflection_library", "alias", "bind", + "genrule", ]: if rule_name in result: raise Exception("Rule %s already present" % rule_name) From 3bffd176be7225fc09d811f24d08b95895bc30cd Mon Sep 17 00:00:00 2001 From: Tanvi Jagtap <139093547+tanvi-jagtap@users.noreply.github.com> Date: Sat, 18 Jan 2025 18:36:52 -0800 Subject: [PATCH 7/7] [PH2][BUILD] Moving to the right BUILD file (#38489) [PH2][BUILD] Moving to the right BUILD file Closes #38489 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/38489 from tanvi-jagtap:ph2_move_build_file 5cc29809f54654ec70c3f655cd191bd31b8e7388 PiperOrigin-RevId: 717112275 --- BUILD | 36 -------------------------------- src/core/BUILD | 36 ++++++++++++++++++++++++++++++++ test/core/transport/chttp2/BUILD | 4 ++-- 3 files changed, 38 insertions(+), 38 deletions(-) diff --git a/BUILD b/BUILD index 564208a2f294b..4d48011ed6586 100644 --- a/BUILD +++ b/BUILD @@ -4669,42 +4669,6 @@ grpc_cc_library( deps = ["gpr"], ) -grpc_cc_library( - name = "grpc_http2_client_transport", - srcs = [ - "//src/core:ext/transport/chttp2/transport/http2_client_transport.cc", - ], - hdrs = [ - "//src/core:ext/transport/chttp2/transport/http2_client_transport.h", - ], - external_deps = [], - language = "c++", - deps = [ - "grpc_base", - "hpack_encoder", - "hpack_parser", - "//src/core:grpc_promise_endpoint", - ], -) - -grpc_cc_library( - name = "grpc_http2_server_transport", - srcs = [ - "//src/core:ext/transport/chttp2/transport/http2_server_transport.cc", - ], - hdrs = [ - "//src/core:ext/transport/chttp2/transport/http2_server_transport.h", - ], - external_deps = [], - language = "c++", - deps = [ - "grpc_base", - "hpack_encoder", - "hpack_parser", - "//src/core:grpc_promise_endpoint", - ], -) - grpc_cc_library( name = "grpc_transport_chttp2", srcs = [ diff --git a/src/core/BUILD b/src/core/BUILD index 7cbc205ea7102..b6a41c497c012 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -7575,6 +7575,42 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "http2_client_transport", + srcs = [ + "ext/transport/chttp2/transport/http2_client_transport.cc", + ], + hdrs = [ + "ext/transport/chttp2/transport/http2_client_transport.h", + ], + external_deps = [], + language = "c++", + deps = [ + "//:grpc_base", + "//:hpack_encoder", + "//:hpack_parser", + "//src/core:grpc_promise_endpoint", + ], +) + +grpc_cc_library( + name = "http2_server_transport", + srcs = [ + "ext/transport/chttp2/transport/http2_server_transport.cc", + ], + hdrs = [ + "ext/transport/chttp2/transport/http2_server_transport.h", + ], + external_deps = [], + language = "c++", + deps = [ + "//:grpc_base", + "//:hpack_encoder", + "//:hpack_parser", + "//src/core:grpc_promise_endpoint", + ], +) + grpc_cc_library( name = "grpc_transport_chttp2_alpn", srcs = [ diff --git a/test/core/transport/chttp2/BUILD b/test/core/transport/chttp2/BUILD index 83b8810b05ca7..b09ce7776bcdb 100644 --- a/test/core/transport/chttp2/BUILD +++ b/test/core/transport/chttp2/BUILD @@ -495,7 +495,7 @@ grpc_cc_test( deps = [ "//:gpr", "//:grpc", - "//:grpc_http2_client_transport", + "//src/core:http2_client_transport", "//test/core/test_util:grpc_test_util", "//test/core/test_util:grpc_test_util_base", "//test/core/transport/util:mock_promise_endpoint", @@ -511,7 +511,7 @@ grpc_cc_test( deps = [ "//:gpr", "//:grpc", - "//:grpc_http2_server_transport", + "//src/core:http2_server_transport", "//test/core/test_util:grpc_test_util", "//test/core/test_util:grpc_test_util_base", "//test/core/transport/util:mock_promise_endpoint",