Skip to content

Commit

Permalink
[PH2][Test][Promise]
Browse files Browse the repository at this point in the history
  • Loading branch information
tanvi-jagtap committed Feb 3, 2025
1 parent c3f52a1 commit d5ed183
Showing 1 changed file with 14 additions and 40 deletions.
54 changes: 14 additions & 40 deletions test/core/promise/party_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1178,55 +1178,29 @@ struct Payload {

Payload MakePayload(int value) { return Payload{std::make_unique<int>(value)}; }

auto MakeSenderPromise(MpscSender<Payload>& sender, Notification& sent,
std::string& execution_order, int value) {
return [&sender, &sent, &execution_order, value]() {
auto send_promise = sender.Send(MakePayload(value));
Poll<bool> send_result = send_promise();
// Even though we know that sending is not complete, we don't know when it
// will be complete. There is no callback mechanism. We may send our sent
// notification before the send is complete. What is the solution for this?
// EXPECT_TRUE(send_result.ready());
absl::StrAppend(&execution_order, "S", value);
sent.Notify();
};
}

auto MakeReceiverPromise(MpscReceiver<Payload>& receiver, Notification& sent,
std::string& execution_order, int value) {
return [&receiver, &sent, &execution_order, value]() {
sent.WaitForNotification();
auto receive_promise = receiver.Next();
Poll<ValueOrFailure<Payload>> receive_result = receive_promise();
absl::StrAppend(&execution_order, "R", value);
LOG(INFO) << "Received " << value;
};
}

auto OnCompleteNotify(Notification& notification) {
return [&notification](Empty) { notification.Notify(); };
}

auto OnCompleteNoop() {
return [](Empty) {};
}

constexpr int kMpscNumPayloads = 20;
constexpr int kMpscNumThreads = 8;
constexpr int kMpscSleepMs = 10;

TEST_F(PartyTest, MpscManySendersManyPartyStressTest) {
// This is a Integration and Stress Test.
// It tests if Promise Party works well with MPSC in an multi-threaded
// environment. Using multiple parties, on a different thread will ensure that
// we have multiple threads concurrently trying to send on the same MPSC. We
// will have only one receiver. Asserts
// 1. All payloads are sent and received.
// 2. If there is a bug in MPSC which causes any resource to be accessed
// concurrently, we should see a TSAN with this test. Because this test is
// multi-threaded and using different parties.
// environment. Using multiple Party objects, each Party on a different
// thread. We will Spawn promises which write to the MPSC queue on each party,
// and this will ensure that we have multiple threads concurrently trying to
// send on the same MPSC. We will have only one receiver running on a spearate
// thread using a separate Party object.
//
// Asserts:
// 1. If there is a bug in MPSC which causes any resource to be accessed
// concurrently, we should see a TSAN failure with this test - because this
// test is multi-threaded and using different parties.
// 2. All payloads are sent and received.
// Note : Both MPSC and Party can be used independent of each other.
//

// Number of Receivers = 1 // Will be 1 always for MPSC
// Number of Senders = kMpscNumThreads - 1
// Number of Payloads = (kMpscNumThreads - 1) * kMpscNumPayloads
Expand Down Expand Up @@ -1265,7 +1239,7 @@ TEST_F(PartyTest, MpscManySendersManyPartyStressTest) {
});
}

// Receive payloads on the last party and last thread.
// Receive payloads on the last party and Spawn using the last thread.
int num_messages_sent = (kMpscNumThreads - 1) * kMpscNumPayloads;
std::string& receive_order = execution_order[kMpscNumThreads - 1];
RefCountedPtr<Party>& receive_party = parties[kMpscNumThreads - 1];
Expand All @@ -1278,6 +1252,7 @@ TEST_F(PartyTest, MpscManySendersManyPartyStressTest) {
[&receiver, &receive_order]() {
auto receive_promise = receiver.Next();
Poll<ValueOrFailure<Payload>> receive_result = receive_promise();
EXPECT_TRUE(receive_result.ready());
absl::StrAppend(&receive_order, "R");
},
OnCompleteNoop());
Expand All @@ -1287,7 +1262,6 @@ TEST_F(PartyTest, MpscManySendersManyPartyStressTest) {
thread.join();
}
for (int i = 0; i < kMpscNumThreads - 1; i++) {
// Generate the expected order for each thread.
for (int j = 0; j < kMpscNumPayloads; j++) {
// This check ensures that we sent all the payloads.
EXPECT_TRUE(
Expand Down

0 comments on commit d5ed183

Please sign in to comment.