diff --git a/examples/cpp/hello_world/HelloWorld_main.cpp b/examples/cpp/hello_world/HelloWorld_main.cpp index 77870d6539d..4be8ad2f496 100644 --- a/examples/cpp/hello_world/HelloWorld_main.cpp +++ b/examples/cpp/hello_world/HelloWorld_main.cpp @@ -34,13 +34,13 @@ int main( char** argv) { auto ret = EXIT_SUCCESS; - hello_world_config config = parse_cli_options(argc, argv); + CLIParser::hello_world_config config = CLIParser::parse_cli_options(argc, argv); if (config.entity == "publisher") { try { - HelloWorldPublisher hello_world_publisher; + HelloWorldPublisher hello_world_publisher(config); hello_world_publisher.run(); } catch (const std::runtime_error& e) @@ -55,7 +55,7 @@ int main( { try { - HelloWorldSubscriberWaitset hello_world_subscriber_waitset; + HelloWorldSubscriberWaitset hello_world_subscriber_waitset(config); hello_world_subscriber_waitset.run(); } catch (const std::runtime_error& e) @@ -68,7 +68,7 @@ int main( { try { - HelloWorldSubscriber hello_world_subscriber; + HelloWorldSubscriber hello_world_subscriber(config); hello_world_subscriber.run(); } catch (const std::runtime_error& e) @@ -82,7 +82,7 @@ int main( else { EPROSIMA_LOG_ERROR(CLI_PARSE, "unknown entity " + config.entity); - print_help(EXIT_FAILURE); + CLIParser::print_help(EXIT_FAILURE); } Log::Reset(); diff --git a/examples/cpp/hello_world/Publisher.cpp b/examples/cpp/hello_world/Publisher.cpp index bee64531215..cdfcdd5ae35 100644 --- a/examples/cpp/hello_world/Publisher.cpp +++ b/examples/cpp/hello_world/Publisher.cpp @@ -33,8 +33,10 @@ using namespace eprosima::fastdds::dds; std::atomic HelloWorldPublisher::stop_(false); +std::condition_variable HelloWorldPublisher::matched_cv_; -HelloWorldPublisher::HelloWorldPublisher() +HelloWorldPublisher::HelloWorldPublisher( + const CLIParser::hello_world_config& config) : participant_(nullptr) , publisher_(nullptr) , topic_(nullptr) @@ -44,6 +46,10 @@ HelloWorldPublisher::HelloWorldPublisher() // Set up the data type with initial values hello_.index(0); hello_.message("Hello world"); + matched_ = 0; + + // Get CLI options + samples_ = config.samples; // Create the participant auto factory = DomainParticipantFactory::get_instance(); @@ -119,7 +125,7 @@ void HelloWorldPublisher::run() { std::thread pub_thread([&] { - while (!is_stopped()) + while (!is_stopped() && (samples_ == 0 || hello_.index() < samples_)) { if (publish()) { @@ -129,7 +135,15 @@ void HelloWorldPublisher::run() std::this_thread::sleep_for(std::chrono::milliseconds(period_)); } }); - std::cout << "Publisher running. Please press Ctrl+C to stop the Publisher at any time." << std::endl; + if (samples_ == 0) + { + std::cout << "Publisher running. Please press Ctrl+C to stop the Publisher at any time." << std::endl; + } + else + { + std::cout << "Publisher running " << samples_ << + " samples. Please press Ctrl+C to stop the Publisher at any time." << std::endl; + } signal(SIGINT, [](int /*signum*/) { std::cout << "\nSIGINT received, stopping Publisher execution." << std::endl; @@ -162,10 +176,14 @@ bool HelloWorldPublisher::publish() matched_cv_.wait(matched_lock, [&]() { // at least one has been discovered - return matched_ > 0; + return matched_ > 0 || is_stopped(); }); - hello_.index(hello_.index() + 1); - return writer_->write(&hello_); + if (!is_stopped()) + { + hello_.index(hello_.index() + 1); + return writer_->write(&hello_); + } + return false; } bool HelloWorldPublisher::is_stopped() @@ -176,4 +194,5 @@ bool HelloWorldPublisher::is_stopped() void HelloWorldPublisher::stop() { stop_.store(true); + matched_cv_.notify_one(); } diff --git a/examples/cpp/hello_world/Publisher.hpp b/examples/cpp/hello_world/Publisher.hpp index 529eee83e9d..c3c252abb6e 100644 --- a/examples/cpp/hello_world/Publisher.hpp +++ b/examples/cpp/hello_world/Publisher.hpp @@ -26,6 +26,7 @@ #include #include +#include "cli_options.hpp" #include "HelloWorldPubSubTypes.h" using namespace eprosima::fastdds::dds; @@ -34,7 +35,8 @@ class HelloWorldPublisher : public DataWriterListener { public: - HelloWorldPublisher(); + HelloWorldPublisher( + const CLIParser::hello_world_config& config); ~HelloWorldPublisher() override; @@ -73,9 +75,11 @@ class HelloWorldPublisher : public DataWriterListener int16_t matched_; + uint16_t samples_; + std::mutex mutex_; - std::condition_variable matched_cv_; + static std::condition_variable matched_cv_; const uint8_t period_ = 100; // in ms }; diff --git a/examples/cpp/hello_world/Subscriber.cpp b/examples/cpp/hello_world/Subscriber.cpp index 4520f4a5470..0bd1276b325 100644 --- a/examples/cpp/hello_world/Subscriber.cpp +++ b/examples/cpp/hello_world/Subscriber.cpp @@ -37,13 +37,18 @@ using namespace eprosima::fastdds::dds; std::atomic HelloWorldSubscriber::stop_(false); std::condition_variable HelloWorldSubscriber::terminate_cv_; -HelloWorldSubscriber::HelloWorldSubscriber() +HelloWorldSubscriber::HelloWorldSubscriber( + const CLIParser::hello_world_config& config) : participant_(nullptr) , subscriber_(nullptr) , topic_(nullptr) , reader_(nullptr) , type_(new HelloWorldPubSubType()) { + // Get CLI options + samples_ = config.samples; + received_samples_ = 0; + // Create the participant auto factory = DomainParticipantFactory::get_instance(); participant_ = factory->create_participant_with_default_profile(nullptr, StatusMask::none()); @@ -119,16 +124,30 @@ void HelloWorldSubscriber::on_data_available( { if (info.instance_state == ALIVE_INSTANCE_STATE && info.valid_data) { + received_samples_++; // Print Hello world message data std::cout << "Message: '" << hello_.message() << "' with index: '" << hello_.index() << "' RECEIVED" << std::endl; + if (samples_ > 0 && (received_samples_ >= samples_)) + { + stop(); + } } } } void HelloWorldSubscriber::run() { - std::cout << "Subscriber running. Please press Ctrl+C to stop the Subscriber at any time." << std::endl; + if (samples_ == 0) + { + std::cout << "Subscriber running. Please press Ctrl+C to stop the Subscriber at any time." << std::endl; + } + else + { + std::cout << "Subscriber running until " << samples_ << + " samples have been received. Please press Ctrl+C to stop the Subscriber at any time." << std::endl; + } + signal(SIGINT, [](int /*signum*/) { std::cout << "\nSIGINT received, stopping Subscriber execution." << std::endl; diff --git a/examples/cpp/hello_world/Subscriber.hpp b/examples/cpp/hello_world/Subscriber.hpp index 2f1e0e4c565..fa3ab6a30a4 100644 --- a/examples/cpp/hello_world/Subscriber.hpp +++ b/examples/cpp/hello_world/Subscriber.hpp @@ -26,6 +26,7 @@ #include #include +#include "cli_options.hpp" #include "HelloWorldPubSubTypes.h" using namespace eprosima::fastdds::dds; @@ -34,7 +35,8 @@ class HelloWorldSubscriber : public DataReaderListener { public: - HelloWorldSubscriber(); + HelloWorldSubscriber( + const CLIParser::hello_world_config& config); virtual ~HelloWorldSubscriber(); @@ -70,6 +72,10 @@ class HelloWorldSubscriber : public DataReaderListener TypeSupport type_; + uint16_t samples_; + + uint16_t received_samples_; + static std::atomic stop_; mutable std::mutex terminate_cv_mtx_; diff --git a/examples/cpp/hello_world/SubscriberWaitset.cpp b/examples/cpp/hello_world/SubscriberWaitset.cpp index 2239a2d8c78..c588edbacb9 100644 --- a/examples/cpp/hello_world/SubscriberWaitset.cpp +++ b/examples/cpp/hello_world/SubscriberWaitset.cpp @@ -39,13 +39,18 @@ using namespace eprosima::fastdds::dds; std::atomic HelloWorldSubscriberWaitset::stop_(false); GuardCondition HelloWorldSubscriberWaitset::terminate_condition_; -HelloWorldSubscriberWaitset::HelloWorldSubscriberWaitset() +HelloWorldSubscriberWaitset::HelloWorldSubscriberWaitset( + const CLIParser::hello_world_config& config) : participant_(nullptr) , subscriber_(nullptr) , topic_(nullptr) , reader_(nullptr) , type_(new HelloWorldPubSubType()) { + // Get CLI options + samples_ = config.samples; + received_samples_ = 0; + // Create the participant auto factory = DomainParticipantFactory::get_instance(); participant_ = factory->create_participant_with_default_profile(nullptr, StatusMask::none()); @@ -145,9 +150,14 @@ void HelloWorldSubscriberWaitset::run() { if (info.instance_state == ALIVE_INSTANCE_STATE && info.valid_data) { + received_samples_++; // Print Hello world message data std::cout << "Message: '" << hello_.message() << "' with index: '" << hello_.index() << "' RECEIVED" << std::endl; + if (samples_ > 0 && (received_samples_ >= samples_)) + { + stop(); + } } } } @@ -156,8 +166,16 @@ void HelloWorldSubscriberWaitset::run() } }); - std::cout << "Waitset Subscriber running. Please press Ctrl+C to stop the Waitset Subscriber at any time." - << std::endl; + if (samples_ == 0) + { + std::cout << "Waitset Subscriber running. Please press Ctrl+C to stop the Waitset Subscriber at any time." + << std::endl; + } + else + { + std::cout << "Waitset Subscriber running until " << samples_ << + " samples have been received. Please press Ctrl+C to stop the Waitset Subscriber at any time." << std::endl; + } signal(SIGINT, [](int /*signum*/) { std::cout << "\nSIGINT received, stopping Waitset Subscriber execution." << std::endl; diff --git a/examples/cpp/hello_world/SubscriberWaitset.hpp b/examples/cpp/hello_world/SubscriberWaitset.hpp index 14d064d0ade..60959f618de 100644 --- a/examples/cpp/hello_world/SubscriberWaitset.hpp +++ b/examples/cpp/hello_world/SubscriberWaitset.hpp @@ -28,6 +28,7 @@ #include #include +#include "cli_options.hpp" #include "HelloWorldPubSubTypes.h" using namespace eprosima::fastdds::dds; @@ -36,7 +37,8 @@ class HelloWorldSubscriberWaitset { public: - HelloWorldSubscriberWaitset(); + HelloWorldSubscriberWaitset( + const CLIParser::hello_world_config& config); virtual ~HelloWorldSubscriberWaitset(); @@ -65,6 +67,10 @@ class HelloWorldSubscriberWaitset WaitSet wait_set_; + uint16_t samples_; + + uint16_t received_samples_; + static std::atomic stop_; static GuardCondition terminate_condition_; diff --git a/examples/cpp/hello_world/cli_options.hpp b/examples/cpp/hello_world/cli_options.hpp index 5236ed3ffad..1c70f397589 100644 --- a/examples/cpp/hello_world/cli_options.hpp +++ b/examples/cpp/hello_world/cli_options.hpp @@ -15,76 +15,111 @@ #include #include -struct subscriber_config -{ - bool use_waitset; -}; +#ifndef _FASTDDS_HELLO_WORLD_CLI_PARSER_HPP_ +#define _FASTDDS_HELLO_WORLD_CLI_PARSER_HPP_ -struct hello_world_config +class CLIParser { - std::string entity; - subscriber_config sub_config; -}; +public: + CLIParser() = delete; -void print_help( - uint8_t return_code) -{ - std::cout << "Usage: hello_world [options]" << std::endl; - std::cout << "" << std::endl; - std::cout << "Entities:" << std::endl; - std::cout << " publisher Run a publisher entity" << std::endl; - std::cout << " subscriber Run a subscriber entity" << std::endl; - std::cout << "Common options:" << std::endl; - std::cout << " -h, --help Print this help message" << std::endl; - std::cout << "Subscriber options:" << std::endl; - std::cout << " -w, --waitset Use waitset read condition" << std::endl; - std::exit(return_code); -} + struct subscriber_config + { + bool use_waitset; + }; -hello_world_config parse_cli_options ( - int argc, - char* argv[]) -{ - hello_world_config config; - config.entity = ""; - config.sub_config.use_waitset = false; + struct hello_world_config + { + std::string entity; + uint16_t samples; + subscriber_config sub_config; + }; - for (int i = 1; i < argc; ++i) + static void print_help( + uint8_t return_code) { - std::string arg = argv[i]; - if (arg == "-h" || arg == "--help") - { - print_help(EXIT_SUCCESS); - } - else if (arg == "publisher" || arg == "subscriber") - { - config.entity = arg; - } - else if (arg == "-w" || arg == "--waitset") + std::cout << "Usage: hello_world [options]" << std::endl; + std::cout << "" << std::endl; + std::cout << "Entities:" << std::endl; + std::cout << " publisher Run a publisher entity" << std::endl; + std::cout << " subscriber Run a subscriber entity" << std::endl; + std::cout << "Common options:" << std::endl; + std::cout << " -h, --help Print this help message" << std::endl; + std::cout << " -s, --samples Amount of samples to be sent or" << std::endl; + std::cout << " received (default: 0 [unlimited])" << std::endl; + std::cout << "Subscriber options:" << std::endl; + std::cout << " -w, --waitset Use waitset read condition" << std::endl; + std::exit(return_code); + } + + static hello_world_config parse_cli_options( + int argc, + char* argv[]) + { + hello_world_config config; + config.entity = ""; + config.samples = 0; + config.sub_config.use_waitset = false; + + for (int i = 1; i < argc; ++i) { - if (config.entity == "subscriber") + std::string arg = argv[i]; + if (arg == "-h" || arg == "--help") + { + print_help(EXIT_SUCCESS); + } + else if (arg == "publisher" || arg == "subscriber") + { + config.entity = arg; + } + else if (arg == "-s" || arg == "--samples") { - config.sub_config.use_waitset = true; + if (i + 1 < argc) + { + try + { + config.samples = std::stoi(argv[++i]); + } + catch (const std::invalid_argument& e) + { + EPROSIMA_LOG_ERROR(CLI_PARSE, "invalid sample argument for " + arg); + print_help(EXIT_FAILURE); + } + } + else + { + EPROSIMA_LOG_ERROR(CLI_PARSE, "missing argument for " + arg); + print_help(EXIT_FAILURE); + } + } + else if (arg == "-w" || arg == "--waitset") + { + if (config.entity == "subscriber") + { + config.sub_config.use_waitset = true; + } + else + { + EPROSIMA_LOG_ERROR(CLI_PARSE, "waitset can only be used with the subscriber entity"); + print_help(EXIT_FAILURE); + } } else { - EPROSIMA_LOG_ERROR(CLI_PARSE, "waitset can only be used with the subscriber entity"); + EPROSIMA_LOG_ERROR(CLI_PARSE, "unknown option " + arg); print_help(EXIT_FAILURE); } } - else + + if (config.entity == "") { - EPROSIMA_LOG_ERROR(CLI_PARSE, "unknown option " + arg); + EPROSIMA_LOG_ERROR(CLI_PARSE, "entity not specified"); print_help(EXIT_FAILURE); } - } - if (config.entity == "") - { - EPROSIMA_LOG_ERROR(CLI_PARSE, "entity not specified"); - print_help(EXIT_FAILURE); + return config; } +}; - return config; -} +#endif // _FASTDDS_HELLO_WORLD_CLI_PARSER_HPP_ diff --git a/test/examples/hello_world.compose.yml b/test/examples/hello_world.compose.yml index 587db20ad03..e2ad7390b7d 100644 --- a/test/examples/hello_world.compose.yml +++ b/test/examples/hello_world.compose.yml @@ -12,7 +12,7 @@ services: LD_LIBRARY_PATH: @PROJECT_BINARY_DIR@/src/cpp:@fastcdr_LIB_DIR@ EXAMPLE_DIR: @PROJECT_BINARY_DIR@/examples/cpp/hello_world@FILE_EXTENSION@ FASTDDS_DEFAULT_PROFILES_FILE: @PROJECT_BINARY_DIR@/examples/cpp/hello_world/hello_world_profile.xml - command: @SHELL_EXECUTABLE@ -c "$${EXAMPLE_DIR}/hello_world@FILE_EXTENSION@ subscriber --waitset" + command: @SHELL_EXECUTABLE@ -c "$${EXAMPLE_DIR}/hello_world@FILE_EXTENSION@ subscriber --waitset --samples 10" subscriber-publisher: image: @DOCKER_IMAGE_NAME@ @@ -24,6 +24,6 @@ services: LD_LIBRARY_PATH: @PROJECT_BINARY_DIR@/src/cpp:@fastcdr_LIB_DIR@ EXAMPLE_DIR: @PROJECT_BINARY_DIR@/examples/cpp/hello_world FASTDDS_DEFAULT_PROFILES_FILE: @PROJECT_BINARY_DIR@/examples/cpp/hello_world/hello_world_profile.xml - command: @SHELL_EXECUTABLE@ -c "$${EXAMPLE_DIR}/hello_world@FILE_EXTENSION@ subscriber & $${EXAMPLE_DIR}/hello_world@FILE_EXTENSION@ publisher" + command: @SHELL_EXECUTABLE@ -c "$${EXAMPLE_DIR}/hello_world@FILE_EXTENSION@ subscriber --samples 10 & $${EXAMPLE_DIR}/hello_world@FILE_EXTENSION@ publisher --samples 10" depends_on: - waitset-subscriber diff --git a/test/examples/test_examples.py b/test/examples/test_examples.py index d6fc93974e9..a1e89044349 100644 --- a/test/examples/test_examples.py +++ b/test/examples/test_examples.py @@ -1,8 +1,5 @@ """.""" -import os -import signal import subprocess -from subprocess import Popen, PIPE, TimeoutExpired def test_basic_configuration(): """.""" @@ -44,30 +41,34 @@ def test_hello_world(): """.""" ret = False out = '' - with Popen('@DOCKER_EXECUTABLE@ compose -f hello_world.compose.yml up', - shell=True, - stdout=PIPE, - preexec_fn=os.setsid) as process: - try: - out = process.communicate(timeout=3)[0].decode().split('\n') - except TimeoutExpired: - os.killpg(process.pid, signal.SIGINT) - out = process.communicate()[0].decode().split('\n') - sent = 0 - received = 0 - for line in out: - if 'SENT' in line: - sent += 1 - continue + try: + out = subprocess.check_output( + '@DOCKER_EXECUTABLE@ compose -f hello_world.compose.yml up', + stderr=subprocess.STDOUT, + shell=True, + timeout=30 + ).decode().split('\n') + + sent = 0 + received = 0 + for line in out: + if 'SENT' in line: + sent += 1 + continue + + if 'RECEIVED' in line: + received += 1 + continue - if 'RECEIVED' in line: - received += 1 - continue + if sent != 0 and received != 0 and sent * 2 == received: + ret = True + else: + raise subprocess.CalledProcessError(1, '') - if sent != 0 and received != 0 and sent * 2 == received: - ret = True - else: + except subprocess.CalledProcessError: for l in out: print(l) + except subprocess.TimeoutExpired: + print('TIMEOUT') assert(ret)