Skip to content

Commit

Permalink
[22648] Unacknowledged sample removed in KeepAll mode (#5618)
Browse files Browse the repository at this point in the history
* Refs #22648: Regression test

Signed-off-by: Juanjo Garcia <juanjosegarcia@eprosima.com>

* Refs #22648: corrected bug

Signed-off-by: Juanjo Garcia <juanjosegarcia@eprosima.com>

* Update comment

Signed-off-by: EugenioCollado <121509066+EugenioCollado@users.noreply.github.com>

---------

Signed-off-by: Juanjo Garcia <juanjosegarcia@eprosima.com>
Signed-off-by: EugenioCollado <121509066+EugenioCollado@users.noreply.github.com>
Co-authored-by: EugenioCollado <121509066+EugenioCollado@users.noreply.github.com>
(cherry picked from commit 68f97fe)

# Conflicts:
#	test/blackbox/common/DDSBlackboxTestsListeners.cpp
  • Loading branch information
juanjo4936 authored and mergify[bot] committed Feb 3, 2025
1 parent 7d13f82 commit ee0d3d6
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/cpp/fastdds/publisher/DataWriterHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ bool DataWriterHistory::prepare_change(
if (history_qos_.kind == KEEP_ALL_HISTORY_QOS)
{
ret = this->mp_writer->try_remove_change(max_blocking_time, lock);
// If change was removed (ret == 1) in KeepAllHistory, it must have been acked
is_acked = ret;
}
else if (history_qos_.kind == KEEP_LAST_HISTORY_QOS)
{
Expand Down
133 changes: 133 additions & 0 deletions test/blackbox/common/DDSBlackboxTestsListeners.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3437,6 +3437,139 @@ TEST(DDSStatus, keyed_reliable_positive_acks_disabled_on_unack_sample_removed)
}

/*!
<<<<<<< HEAD
=======
* Regression Test for 22658: when the entire history is acked in volatile, given that the entries are deleted from the
* history, check_acked_status satisfies min_low_mark >= get_seq_num_min() because seq_num_min is unknown. This makes
* try_remove to fail, because it tries to remove changes but there were none. This causes prepare_change to not
* perform the changes, since the history was full and could not delete any changes.
*/

TEST(DDSStatus, entire_history_acked_volatile_unknown_pointer)
{
PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);
PubSubReader<HelloWorldPubSubType> reader(TEST_TOPIC_NAME);

writer.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS, eprosima::fastdds::dds::Duration_t (200, 0))
.durability_kind(eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS)
.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS)
.resource_limits_max_instances(1)
.resource_limits_max_samples(1)
.resource_limits_max_samples_per_instance(1)
.init();
ASSERT_TRUE(writer.isInitialized());

reader.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
.durability_kind(eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS)
.init();
ASSERT_TRUE(reader.isInitialized());

// Wait for discovery
writer.wait_discovery();
reader.wait_discovery();

auto data = default_helloworld_data_generator(2);
for (auto sample : data)
{
// A value of true means that the sample was sent successfully.
// This aligns with the expected behaviour of having the history
// acknowledged and emptied before the next message.
EXPECT_TRUE(writer.send_sample(sample));
}
}

/*¡
* Regression Test for 22648: on_unacknowledged_sample_removed callback is called when writer with keep all
* history is used, when the history was full but before max_blocking_time a sample was acknowledged, as is_acked was
* checked before the waiting time, and is not re-checked. This should not happen.
*/
TEST(DDSStatus, reliable_keep_all_unack_sample_removed_call)
{
auto test_transport = std::make_shared<test_UDPv4TransportDescriptor>();
test_transport->drop_data_messages_filter_ = [](eprosima::fastdds::rtps::CDRMessage_t& msg) -> bool
{
static std::vector<std::pair<eprosima::fastdds::rtps::SequenceNumber_t,
std::chrono::steady_clock::time_point>> delayed_messages;

uint32_t old_pos = msg.pos;

// Parse writer ID and sequence number
msg.pos += 2; // flags
msg.pos += 2; // inline QoS
msg.pos += 4; // reader ID
auto writerID = eprosima::fastdds::helpers::cdr_parse_entity_id((char*)&msg.buffer[msg.pos]);
msg.pos += 4;
eprosima::fastdds::rtps::SequenceNumber_t sn;
sn.high = (int32_t)eprosima::fastdds::helpers::cdr_parse_u32((char*)&msg.buffer[msg.pos]);
msg.pos += 4;
sn.low = eprosima::fastdds::helpers::cdr_parse_u32((char*)&msg.buffer[msg.pos]);

// Restore buffer position
msg.pos = old_pos;

// Delay logic for user endpoints only
if ((writerID.value[3] & 0xC0) == 0) // only user endpoints
{
auto now = std::chrono::steady_clock::now();
auto it = std::find_if(delayed_messages.begin(), delayed_messages.end(),
[&sn](const auto& pair)
{
return pair.first == sn;
});

if (it == delayed_messages.end())
{
// If the sequence number is encountered for the first time, start the delay
delayed_messages.emplace_back(sn, now + std::chrono::milliseconds(750)); // Add delay
return true; // Start dropping this message
}
else if (now < it->second)
{
// If the delay period has not elapsed, keep dropping the message
return true;
}
else
{
// Once the delay has elapsed, allow the message to proceed
delayed_messages.erase(it);
}
}
return false; // Allow message to proceed
};

PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);
PubSubReader<HelloWorldPubSubType> reader(TEST_TOPIC_NAME);

writer.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS, eprosima::fastdds::dds::Duration_t (200, 0))
.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS)
.resource_limits_max_instances(1)
.resource_limits_max_samples(1)
.resource_limits_max_samples_per_instance(1)
.disable_builtin_transport()
.add_user_transport_to_pparams(test_transport)
.init();
ASSERT_TRUE(writer.isInitialized());

reader.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
.init();
ASSERT_TRUE(reader.isInitialized());

// Wait for discovery
writer.wait_discovery();
reader.wait_discovery();

auto data = default_helloworld_data_generator(2);

for (auto sample : data)
{
writer.send_sample(sample);
}

EXPECT_EQ(writer.times_unack_sample_removed(), 0u);
}

/*!
>>>>>>> 68f97fe2 ([22648] Unacknowledged sample removed in KeepAll mode (#5618))
* Test that checks with a writer of each type that having the same listener attached, the notified writer in the
* callback is the corresponding writer that has removed a sample unacknowledged.
*/
Expand Down

0 comments on commit ee0d3d6

Please sign in to comment.