Skip to content

Commit

Permalink
[PH2][Test][Promise] MPSC (grpc#38408)
Browse files Browse the repository at this point in the history
Do Not Review [PH2][Test][Promise] MPSC

Closes grpc#38408

COPYBARA_INTEGRATE_REVIEW=grpc#38408 from tanvi-jagtap:ph2_mpsc_02 5f43282
PiperOrigin-RevId: 726726871
  • Loading branch information
tanvi-jagtap authored and copybara-github committed Feb 14, 2025
1 parent 4374486 commit 3d1f7b7
Show file tree
Hide file tree
Showing 6 changed files with 300 additions and 2 deletions.
43 changes: 43 additions & 0 deletions CMakeLists.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions build_autogenerated.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 31 additions & 0 deletions test/core/promise/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,37 @@ grpc_cc_test(
],
)

grpc_cc_test(
name = "party_mpsc_test",
srcs = ["party_mpsc_test.cc"],
external_deps = [
"absl/base:core_headers",
"absl/log:log",
"gtest",
],
tags = ["party_mpsc_test"],
uses_event_engine = False,
uses_polling = False,
deps = [
"//:exec_ctx",
"//:gpr",
"//:grpc_unsecure",
"//:ref_counted_ptr",
"//src/core:1999",
"//src/core:context",
"//src/core:default_event_engine",
"//src/core:event_engine_memory_allocator",
"//src/core:memory_quota",
"//src/core:mpsc",
"//src/core:notification",
"//src/core:poll",
"//src/core:resource_quota",
"//src/core:seq",
"//src/core:sleep",
"//src/core:time",
],
)

grpc_cc_benchmark(
name = "bm_party",
srcs = ["bm_party.cc"],
Expand Down
189 changes: 189 additions & 0 deletions test/core/promise/party_mpsc_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Copyright 2025 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/grpc.h>
#include <stdio.h>

#include <algorithm>
#include <atomic>
#include <memory>
#include <thread>
#include <vector>

#include "absl/base/thread_annotations.h"
#include "absl/log/log.h"
#include "gtest/gtest.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/event_engine/event_engine_context.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/mpsc.h"
#include "src/core/lib/promise/party.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/promise/sleep.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/util/notification.h"
#include "src/core/util/ref_counted_ptr.h"
#include "src/core/util/sync.h"
#include "src/core/util/time.h"

namespace grpc_core {

// Testing Promise Parties with MPSC Queues

class PartyMpscTest : public ::testing::Test {
protected:
RefCountedPtr<Party> MakeParty() {
auto arena = SimpleArenaAllocator()->MakeArena();
arena->SetContext<grpc_event_engine::experimental::EventEngine>(
event_engine_.get());
return Party::Make(std::move(arena));
}

private:
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_ =
grpc_event_engine::experimental::GetDefaultEventEngine();
};

struct Payload {
std::unique_ptr<int> x;
bool operator==(const Payload& other) const {
return (x == nullptr && other.x == nullptr) ||
(x != nullptr && other.x != nullptr && *x == *other.x);
}
bool operator!=(const Payload& other) const { return !(*this == other); }
explicit Payload(std::unique_ptr<int> x) : x(std::move(x)) {}
Payload(const Payload& other)
: x(other.x ? std::make_unique<int>(*other.x) : nullptr) {}

friend std::ostream& operator<<(std::ostream& os, const Payload& payload) {
if (payload.x == nullptr) return os << "Payload{nullptr}";
return os << "Payload{" << *payload.x << "}";
}
};

Payload MakePayload(int value) { return Payload{std::make_unique<int>(value)}; }

auto OnCompleteNoop() {
return [](Empty) {};
}

constexpr int kMpscNumPayloads = 20;
constexpr int kMpscNumThreads = 8;

TEST_F(PartyMpscTest, MpscManySendersManyPartyIntegrationStressTest) {
// This is a Integration and Stress Test.
// It tests if Promise Party works well with MPSC in an multi-threaded
// environment. Using multiple Party objects, with each Party on a different
// thread. We will Spawn promises on each Party that write to the MPSC queue,
// and this will ensure that we have multiple threads concurrently trying to
// Send on the same MPSC. We will have one receiver running on a spearate
// thread using a separate Party object.
//
// Asserts:
// 1. If there is a bug in MPSC which causes any resource to be accessed
// concurrently, we should see a TSAN failure with this test - because this
// test is multi-threaded and using different Party objects.
// 2. All payloads are sent and received.
// Note : Both MPSC and Party can be used independent of each other.
//
// Number of Receivers = 1 // Will be 1 always for MPSC
// Number of Senders = kMpscNumThreads - 1
// Number of Payloads = (kMpscNumThreads - 1) * kMpscNumPayloads
// Number of Parties = kMpscNumThreads
// Number of Threads = kMpscNumThreads

std::vector<std::string> execution_order(kMpscNumThreads);
MpscReceiver<Payload> receiver((kMpscNumThreads - 1) * kMpscNumPayloads);
std::vector<MpscSender<Payload>> senders;
std::vector<RefCountedPtr<Party>> parties;
for (int i = 0; i < kMpscNumThreads; i++) {
if (i < kMpscNumThreads - 1) {
senders.emplace_back(receiver.MakeSender());
}
parties.emplace_back(MakeParty());
}
std::vector<std::thread> threads;
threads.reserve(kMpscNumThreads);

// Spawn on different Party objects using different threads.
// Each Spawned promise will perform the MPSC Send operation.
for (int i = 0; i < kMpscNumThreads - 1; i++) {
MpscSender<Payload>& sender = senders[i];
std::string& order = execution_order[i];
RefCountedPtr<Party>& party = parties[i];
threads.emplace_back([&order, &party, &sender]() {
for (int j = 0; j < kMpscNumPayloads; j++) {
party->Spawn(
"send",
[&sender, &order, value = j]() {
auto send_promise = sender.Send(MakePayload(value));
Poll<bool> send_result = send_promise();
absl::StrAppend(&order, "S", value);
},
OnCompleteNoop());
}
});
}

// Spawn promises on the last Party object using the last thread.
// These Spawned promises will read from the MPSC queue.
int num_messages_sent = (kMpscNumThreads - 1) * kMpscNumPayloads;
std::string& receive_order = execution_order[kMpscNumThreads - 1];
RefCountedPtr<Party>& receive_party = parties[kMpscNumThreads - 1];
threads.emplace_back([&receive_order, &receive_party, &receiver,
&num_messages_sent]() {
for (int j = 0; j < num_messages_sent; j++) {
receive_party->Spawn(
"receive",
[&receiver, &receive_order]() {
auto receive_promise = receiver.Next();
Poll<ValueOrFailure<Payload>> receive_result = receive_promise();
absl::StrAppend(&receive_order, "R");
},
OnCompleteNoop());
}
});

for (auto& thread : threads) {
thread.join(); // Wait for all threads to finish and join.
}

// Asserting that all payloads were sent and received.
for (int i = 0; i < kMpscNumThreads - 1; i++) {
for (int j = 0; j < kMpscNumPayloads; j++) {
// This check ensures that we sent all the payloads.
EXPECT_TRUE(
absl::StrContains(execution_order[i], absl::StrFormat("S%d", j)));
}
}
// For every payload received, one "R" was appended to the receive order.
// This check ensures that we received all the payloads.
EXPECT_EQ(receive_order.length(), num_messages_sent);
}

} // namespace grpc_core

int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc_init();
int r = RUN_ALL_TESTS();
grpc_shutdown();
return r;
}
2 changes: 0 additions & 2 deletions test/core/promise/party_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -541,8 +541,6 @@ void StressTestAsserts(std::vector<Timestamp>& start_times,

LOG(INFO) << "Small thread run time : " << fastest_thread_run_time;

// TODO(tjagtap) : Too many ways to check the same thing. Explore what we
// want to keep and what to remove. Just presenting all the options here.
Duration total_sleep_time =
Duration::Milliseconds(kNumThreads * kNumSpawns * average_sleep_ms);
float run_time_by_sleep_time = 3.5;
Expand Down
24 changes: 24 additions & 0 deletions tools/run_tests/generated/tests.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 3d1f7b7

Please sign in to comment.