Skip to content

Commit

Permalink
Merge Master
Browse files Browse the repository at this point in the history
  • Loading branch information
tanvi-jagtap committed Jan 24, 2025
2 parents b72317a + ed39aad commit cf66e64
Show file tree
Hide file tree
Showing 73 changed files with 755 additions and 222 deletions.
2 changes: 1 addition & 1 deletion src/core/ext/filters/message_size/message_size_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ ServerMetadataHandle CheckPayload(const Message& msg,
<< (is_send ? "send" : "recv") << " len:" << msg.payload()->Length()
<< " max:" << *max_length;
if (msg.payload()->Length() <= *max_length) return nullptr;
return ServerMetadataFromStatus(
return CancelledServerMetadataFromStatus(
GRPC_STATUS_RESOURCE_EXHAUSTED,
absl::StrFormat("%s: %s message larger than max (%u vs. %d)",
is_client ? "CLIENT" : "SERVER",
Expand Down
51 changes: 39 additions & 12 deletions src/core/lib/promise/try_seq.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,45 @@ namespace grpc_core {

namespace promise_detail {

// TrySeq Promise combinator.
//
// Input :
// 1. The TrySeq combinator needs minimum one promise as input.
// 2. The first input to TrySeq combinator is a promise.
// 3. The remaining inputs to TrySeq combinator are Promise Factories (functors
// that return a promise). The input type of the Nth functor should be the
// return value of the (N-1)th promise.
// 4. Functors can return promises with return type any of the following :
// 1. StatusOr<> to signal that a value is fed forward, or Status to
// indicate only success/failure. In the case of returning Status, the
// next functor in the chain takes no arguments.
// 2. StatusFlag and ValueOrStatus can be return types if rich error
// information is not necessar. In this case the next functor in the chain
// takes no arguments.
//
// Return :
// Polling the TrySeq Promise combinator returns Poll<StatusOr<T>> where T is
// the type returned by the last promise in the list of input promises.
//
// Polling the TrySeq combinator works in the following way :
// Run the first promise. If it returns Pending{}, nothing else is executed.
// If the first promise returns a value, pass this result to the second functor,
// and run the returned promise. If it returns Pending{}, nothing else is
// executed. If it returns a value, pass this result to the third functor, and
// run the returned promise. etc. Return the final value.
//
// If any of the promises in the TrySeq chain returns a failure status, TrySeq
// will NOT proceed with the execution of the remaining promises. If you want
// the execution to continue when a failure status is received, use the Seq
// combinator instead.
//
// Promises in the TrySeq combinator are run in order, serially and on the same
// thread.
//
// Example :
// The unit tests (esp ThreeTypedPendingThens) in try_seq_test.cc provide all
// possible permutations of how TrySeq combinator can be used.

template <typename T, typename Ignored = void>
struct TrySeqTraitsWithSfinae {
using UnwrappedType = T;
Expand Down Expand Up @@ -256,18 +295,6 @@ struct TrySeqContainerResultTraits {

} // namespace promise_detail

// Try a sequence of operations.
// * Run the first functor as a promise.
// * Feed its success result into the second functor to create a promise,
// then run that.
// * ...
// * Feed the second-final success result into the final functor to create a
// promise, then run that, with the overall success result being that
// promises success result.
// If any step fails, fail everything.
// Functors can return StatusOr<> to signal that a value is fed forward, or
// Status to indicate only success/failure. In the case of returning Status,
// the construction functors take no arguments.
template <typename F>
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline F TrySeq(F functor) {
return functor;
Expand Down
13 changes: 10 additions & 3 deletions src/core/lib/surface/client_call.cc
Original file line number Diff line number Diff line change
Expand Up @@ -335,9 +335,16 @@ void ClientCall::CommitBatch(const grpc_op* ops, size_t nops, void* notify_tag,
});
};
});
auto primary_ops = AllOk<StatusFlag>(
TrySeq(std::move(send_message), std::move(send_close_from_client)),
TrySeq(std::move(recv_initial_metadata), std::move(recv_message)));
// We capture 'this' in the op handlers, but the call may be destroyed before
// the party owned by CallInitiator/CallHandler is destroyed -- meaning that
// op callbacks may happen after call destruction if we don't hold a ref.
// We do that via an implicitly captured one in a Map() here so that we don't
// need a ref held per batch operation -- they have the same lifetime always.
auto primary_ops = Map(
AllOk<StatusFlag>(
TrySeq(std::move(send_message), std::move(send_close_from_client)),
TrySeq(std::move(recv_initial_metadata), std::move(recv_message))),
[self = WeakRef()](StatusFlag x) { return x; });
Party::WakeupHold wakeup_hold;
if (const grpc_op* op = op_index.op(GRPC_OP_SEND_INITIAL_METADATA)) {
wakeup_hold = StartCall(*op);
Expand Down
4 changes: 3 additions & 1 deletion src/core/util/uri.cc
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,9 @@ struct QueryParameterFormatter {

std::string URI::ToString() const {
std::vector<std::string> parts = {PercentEncode(scheme_, IsSchemeChar), ":"};
if (!authority_.empty()) {
// If path starts with '//' we need to encode the authority to ensure that
// we can round-trip the URI through a parse/encode/parse loop.
if (!authority_.empty() || absl::StartsWith(path_, "//")) {
parts.emplace_back("//");
parts.emplace_back(PercentEncode(authority_, IsAuthorityChar));
}
Expand Down
7 changes: 7 additions & 0 deletions src/core/util/uri.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ class URI {
// the wire in an HTTP request.
std::string EncodedPathAndQueryParams() const;

bool operator==(const URI& other) const {
return scheme_ == other.scheme_ && authority_ == other.authority_ &&
path_ == other.path_ &&
query_parameter_pairs_ == other.query_parameter_pairs_ &&
fragment_ == other.fragment_;
}

private:
URI(std::string scheme, std::string authority, std::string path,
std::vector<QueryParam> query_parameter_pairs, std::string fragment);
Expand Down
8 changes: 8 additions & 0 deletions test/core/end2end/tests/disappearing_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ static void OneRequestAndShutdownServer(CoreEnd2endTest& test) {
}

CORE_END2END_TEST(CoreClientChannelTest, DisappearingServer) {
// TODO(ctiller): Currently v3 connections are tracked as a set of
// OrphanablePtr<ServerTransport> in the Server class. This allows us to only
// remove and destroy them which means we have no means of sending a goaway
// (and chaotic good anyway doesn't yet support goaways).
// After the `server_listener` experiment is completely rolled out we should
// migrate both v1 server channels and v3 transports to a common data
// structure around LogicalConnection instances. We could then use that
// data structure to broadcast goaways to transports at the appropriate time.
SKIP_IF_V3();
OneRequestAndShutdownServer(*this);
InitServer(ChannelArgs());
Expand Down
6 changes: 0 additions & 6 deletions test/core/end2end/tests/max_message_length.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ void TestMaxMessageLengthOnServerOnResponse(CoreEnd2endTest& test) {
CORE_END2END_TEST(CoreEnd2endTest,
MaxMessageLengthOnClientOnRequestViaChannelArg) {
SKIP_IF_MINSTACK();
SKIP_IF_V3();
InitServer(ChannelArgs());
InitClient(ChannelArgs().Set(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, 5));
TestMaxMessageLengthOnClientOnRequest(*this);
Expand Down Expand Up @@ -182,7 +181,6 @@ CORE_END2END_TEST(
CORE_END2END_TEST(CoreEnd2endTest,
MaxMessageLengthOnServerOnRequestViaChannelArg) {
SKIP_IF_MINSTACK();
SKIP_IF_V3();
InitServer(ChannelArgs().Set(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, 5));
InitClient(ChannelArgs());
TestMaxMessageLengthOnServerOnRequest(*this);
Expand All @@ -191,7 +189,6 @@ CORE_END2END_TEST(CoreEnd2endTest,
CORE_END2END_TEST(CoreEnd2endTest,
MaxMessageLengthOnClientOnResponseViaChannelArg) {
SKIP_IF_MINSTACK();
SKIP_IF_V3();
InitServer(ChannelArgs());
InitClient(ChannelArgs().Set(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, 5));
TestMaxMessageLengthOnClientOnResponse(*this);
Expand Down Expand Up @@ -238,15 +235,13 @@ CORE_END2END_TEST(
CORE_END2END_TEST(CoreEnd2endTest,
MaxMessageLengthOnServerOnResponseViaChannelArg) {
SKIP_IF_MINSTACK();
SKIP_IF_V3();
InitServer(ChannelArgs().Set(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, 5));
InitClient(ChannelArgs());
TestMaxMessageLengthOnServerOnResponse(*this);
}

CORE_END2END_TEST(Http2Test, MaxMessageLengthOnServerOnRequestWithCompression) {
SKIP_IF_MINSTACK();
SKIP_IF_V3();
// Set limit via channel args.
InitServer(ChannelArgs().Set(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, 5));
InitClient(ChannelArgs());
Expand Down Expand Up @@ -283,7 +278,6 @@ CORE_END2END_TEST(Http2Test, MaxMessageLengthOnServerOnRequestWithCompression) {
CORE_END2END_TEST(Http2Test,
MaxMessageLengthOnClientOnResponseWithCompression) {
SKIP_IF_MINSTACK();
SKIP_IF_V3();
// Set limit via channel args.
InitServer(ChannelArgs());
InitClient(ChannelArgs().Set(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, 5));
Expand Down
Loading

0 comments on commit cf66e64

Please sign in to comment.