Skip to content

Commit

Permalink
Merge Master
Browse files Browse the repository at this point in the history
  • Loading branch information
tanvi-jagtap committed Jan 20, 2025
2 parents 3f513cf + 3bffd17 commit b618813
Show file tree
Hide file tree
Showing 39 changed files with 381 additions and 211 deletions.
37 changes: 1 addition & 36 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -1615,6 +1615,7 @@ grpc_cc_library(
"//src/core:channel_args",
"//src/core:context",
"//src/core:error",
"//src/core:message",
"//src/core:metadata_batch",
"//src/core:ref_counted_string",
"//src/core:slice_buffer",
Expand Down Expand Up @@ -4668,42 +4669,6 @@ grpc_cc_library(
deps = ["gpr"],
)

grpc_cc_library(
name = "grpc_http2_client_transport",
srcs = [
"//src/core:ext/transport/chttp2/transport/http2_client_transport.cc",
],
hdrs = [
"//src/core:ext/transport/chttp2/transport/http2_client_transport.h",
],
external_deps = [],
language = "c++",
deps = [
"grpc_base",
"hpack_encoder",
"hpack_parser",
"//src/core:grpc_promise_endpoint",
],
)

grpc_cc_library(
name = "grpc_http2_server_transport",
srcs = [
"//src/core:ext/transport/chttp2/transport/http2_server_transport.cc",
],
hdrs = [
"//src/core:ext/transport/chttp2/transport/http2_server_transport.h",
],
external_deps = [],
language = "c++",
deps = [
"grpc_base",
"hpack_encoder",
"hpack_parser",
"//src/core:grpc_promise_endpoint",
],
)

grpc_cc_library(
name = "grpc_transport_chttp2",
srcs = [
Expand Down
2 changes: 2 additions & 0 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ load("//bazel:grpc_extra_deps.bzl", "grpc_extra_deps")

grpc_extra_deps()

# RBE

load("@bazel_toolchains//rules/exec_properties:exec_properties.bzl", "create_rbe_exec_properties_dict", "custom_exec_properties")

custom_exec_properties(
Expand Down
17 changes: 16 additions & 1 deletion bazel/grpc_build_system.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
Contains macros used throughout the repo.
"""

load("@build_bazel_apple_support//rules:universal_binary.bzl", "universal_binary")
load("@build_bazel_rules_apple//apple:ios.bzl", "ios_unit_test")
load("@build_bazel_rules_apple//apple/testing/default_runner:ios_test_runner.bzl", "ios_test_runner")
load("@com_google_protobuf//bazel:cc_proto_library.bzl", "cc_proto_library")
Expand Down Expand Up @@ -238,10 +239,24 @@ def grpc_cc_library(

def grpc_proto_plugin(name, srcs = [], deps = []):
native.cc_binary(
name = name,
name = name + "_native",
srcs = srcs,
deps = deps,
)
universal_binary(
name = name + "_universal",
binary = name + "_native",
)
native.genrule(
name = name,
srcs = select({
"@platforms//os:macos": [name + "_universal"],
"//conditions:default": [name + "_native"],
}),
outs = [name],
cmd = "cp $< $@",
executable = True,
)

def grpc_internal_proto_library(
name,
Expand Down
11 changes: 11 additions & 0 deletions bazel/grpc_deps.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,17 @@ def grpc_deps():
],
)

if "rules_shell" not in native.existing_rules():
http_archive(
name = "rules_shell",
sha256 = "d8cd4a3a91fc1dc68d4c7d6b655f09def109f7186437e3f50a9b60ab436a0c53",
strip_prefix = "rules_shell-0.3.0",
urls = [
"https://storage.googleapis.com/grpc-bazel-mirror/github.com/bazelbuild/rules_shell/releases/download/v0.3.0/rules_shell-v0.3.0.tar.gz",
"https://github.com/bazelbuild/rules_shell/releases/download/v0.3.0/rules_shell-v0.3.0.tar.gz",
],
)

if "io_bazel_rules_go" not in native.existing_rules():
http_archive(
name = "io_bazel_rules_go",
Expand Down
4 changes: 4 additions & 0 deletions bazel/grpc_extra_deps.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ load("@google_cloud_cpp//bazel:google_cloud_cpp_deps.bzl", "google_cloud_cpp_dep
load("@io_bazel_rules_go//go:deps.bzl", "go_register_toolchains", "go_rules_dependencies")
load("@rules_proto//proto:repositories.bzl", "rules_proto_dependencies")
load("@rules_python//python:repositories.bzl", "py_repositories")
load("@rules_shell//shell:repositories.bzl", "rules_shell_dependencies", "rules_shell_toolchains")

def grpc_extra_deps(ignore_version_differences = False):
"""Loads the extra dependencies.
Expand All @@ -49,6 +50,9 @@ def grpc_extra_deps(ignore_version_differences = False):
ignore_version_differences: Plumbed directly to the invocation of
apple_rules_dependencies.
"""
rules_shell_dependencies()
rules_shell_toolchains()

protobuf_deps()

rules_proto_dependencies()
Expand Down
36 changes: 36 additions & 0 deletions src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -7575,6 +7575,42 @@ grpc_cc_library(
],
)

grpc_cc_library(
name = "http2_client_transport",
srcs = [
"ext/transport/chttp2/transport/http2_client_transport.cc",
],
hdrs = [
"ext/transport/chttp2/transport/http2_client_transport.h",
],
external_deps = [],
language = "c++",
deps = [
"//:grpc_base",
"//:hpack_encoder",
"//:hpack_parser",
"//src/core:grpc_promise_endpoint",
],
)

grpc_cc_library(
name = "http2_server_transport",
srcs = [
"ext/transport/chttp2/transport/http2_server_transport.cc",
],
hdrs = [
"ext/transport/chttp2/transport/http2_server_transport.h",
],
external_deps = [],
language = "c++",
deps = [
"//:grpc_base",
"//:hpack_encoder",
"//:hpack_parser",
"//src/core:grpc_promise_endpoint",
],
)

grpc_cc_library(
name = "grpc_transport_chttp2_alpn",
srcs = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ MessageHandle ChannelCompression::CompressMessage(
<< " alg=" << algorithm << " flags=" << message->flags();
auto* call_tracer = MaybeGetContext<CallTracerInterface>();
if (call_tracer != nullptr) {
call_tracer->RecordSendMessage(*message->payload());
call_tracer->RecordSendMessage(*message);
}
// Check if we're allowed to compress this message
// (apps might want to disable compression for certain messages to avoid
Expand Down Expand Up @@ -140,7 +140,7 @@ MessageHandle ChannelCompression::CompressMessage(
tmp.Swap(payload);
flags |= GRPC_WRITE_INTERNAL_COMPRESS;
if (call_tracer != nullptr) {
call_tracer->RecordSendCompressedMessage(*message->payload());
call_tracer->RecordSendCompressedMessage(*message);
}
} else {
if (GRPC_TRACE_FLAG_ENABLED(compression)) {
Expand All @@ -162,7 +162,7 @@ absl::StatusOr<MessageHandle> ChannelCompression::DecompressMessage(
<< " alg=" << args.algorithm;
auto* call_tracer = MaybeGetContext<CallTracerInterface>();
if (call_tracer != nullptr) {
call_tracer->RecordReceivedMessage(*message->payload());
call_tracer->RecordReceivedMessage(*message);
}
// Check max message length.
if (args.max_recv_message_length.has_value() &&
Expand Down Expand Up @@ -192,7 +192,7 @@ absl::StatusOr<MessageHandle> ChannelCompression::DecompressMessage(
message->mutable_flags() &= ~GRPC_WRITE_INTERNAL_COMPRESS;
message->mutable_flags() |= GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED;
if (call_tracer != nullptr) {
call_tracer->RecordReceivedDecompressedMessage(*message->payload());
call_tracer->RecordReceivedDecompressedMessage(*message);
}
return std::move(message);
}
Expand Down
8 changes: 4 additions & 4 deletions src/core/ext/transport/chttp2/transport/call_tracer_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ class Chttp2CallTracerWrapper final : public CallTracerInterface {
grpc_metadata_batch* /*send_initial_metadata*/) override {}
void RecordSendTrailingMetadata(
grpc_metadata_batch* /*send_trailing_metadata*/) override {}
void RecordSendMessage(const SliceBuffer& /*send_message*/) override {}
void RecordSendMessage(const Message& /*send_message*/) override {}
void RecordSendCompressedMessage(
const SliceBuffer& /*send_compressed_message*/) override {}
const Message& /*send_compressed_message*/) override {}
void RecordReceivedInitialMetadata(
grpc_metadata_batch* /*recv_initial_metadata*/) override {}
void RecordReceivedMessage(const SliceBuffer& /*recv_message*/) override {}
void RecordReceivedMessage(const Message& /*recv_message*/) override {}
void RecordReceivedDecompressedMessage(
const SliceBuffer& /*recv_decompressed_message*/) override {}
const Message& /*recv_decompressed_message*/) override {}
void RecordCancel(grpc_error_handle /*cancel_error*/) override {}
std::shared_ptr<TcpTracerInterface> StartNewTcpTrace() override {
return nullptr;
Expand Down
23 changes: 23 additions & 0 deletions src/core/lib/channel/promise_based_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,17 @@ inline auto InterceptClientToServerMessageHandler(
};
}

template <typename Derived>
inline auto InterceptClientToServerMessageHandler(
void (Derived::Call::*fn)(const Message&, Derived*),
FilterCallData<Derived>* call_data, const CallArgs&) {
DCHECK(fn == &Derived::Call::OnClientToServerMessage);
return [call_data](MessageHandle msg) -> std::optional<MessageHandle> {
call_data->call.OnClientToServerMessage(*msg, call_data->channel);
return std::move(msg);
};
}

template <typename Derived>
inline auto InterceptClientToServerMessageHandler(
MessageHandle (Derived::Call::*fn)(MessageHandle, Derived*),
Expand Down Expand Up @@ -655,6 +666,18 @@ inline void InterceptServerToClientMessage(
});
}

template <typename Derived>
inline void InterceptServerToClientMessage(
void (Derived::Call::*fn)(const Message&, Derived*),
FilterCallData<Derived>* call_data, const CallArgs& call_args) {
DCHECK(fn == &Derived::Call::OnServerToClientMessage);
call_args.server_to_client_messages->InterceptAndMap(
[call_data](MessageHandle msg) -> std::optional<MessageHandle> {
call_data->call.OnServerToClientMessage(*msg, call_data->channel);
return std::move(msg);
});
}

template <typename Derived>
inline void InterceptServerToClientMessage(
ServerMetadataHandle (Derived::Call::*fn)(const Message&, Derived*),
Expand Down
3 changes: 2 additions & 1 deletion src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,8 @@ void CFStreamEndpointImpl::DoRead(
}

buffer->RemoveLastNBytes(buffer->Length() - read_size);
on_read(absl::OkStatus());
on_read(read_size == 0 ? absl::InternalError("Socket closed")
: absl::OkStatus());
}

bool CFStreamEndpointImpl::Write(
Expand Down
8 changes: 8 additions & 0 deletions src/core/lib/promise/detail/promise_like.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ class PromiseLike<
-> decltype(WrapInPoll(f_())) {
return WrapInPoll(f_());
}
PromiseLike(const PromiseLike&) = default;
PromiseLike& operator=(const PromiseLike&) = default;
PromiseLike(PromiseLike&&) = default;
PromiseLike& operator=(PromiseLike&&) = default;
using Result = typename PollTraits<decltype(WrapInPoll(f_()))>::Type;
};

Expand All @@ -109,6 +113,10 @@ class PromiseLike<
f_();
return Empty{};
}
PromiseLike(const PromiseLike&) = default;
PromiseLike& operator=(const PromiseLike&) = default;
PromiseLike(PromiseLike&&) = default;
PromiseLike& operator=(PromiseLike&&) = default;
using Result = Empty;
};

Expand Down
Loading

0 comments on commit b618813

Please sign in to comment.