diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index abc2e0be..eab37fca 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -49,3 +49,5 @@ install( COMPONENT dev) add_subdirectory(roundtrip) + +add_subdirectory(hop) diff --git a/examples/hop/CMakeLists.txt b/examples/hop/CMakeLists.txt new file mode 100644 index 00000000..f6d2e5ab --- /dev/null +++ b/examples/hop/CMakeLists.txt @@ -0,0 +1,32 @@ +# +# Copyright(c) 2024 ZettaScale Technology and others +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License v. 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License +# v. 1.0 which is available at +# http://www.eclipse.org/org/documents/edl-v10.php. +# +# SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause +# +project(helloworld LANGUAGES C CXX) +cmake_minimum_required(VERSION 3.16) + +set(CMAKE_CXX_STANDARD 17) + +find_package(CycloneDDS REQUIRED) +if(NOT TARGET CycloneDDS-CXX::ddscxx) + find_package(CycloneDDS-CXX REQUIRED) +endif() + +idlcxx_generate(TARGET hop_type FILES hop_type.idl) +idlcxx_generate(TARGET mop_type FILES mop_type.idl) + +add_executable(hop hop.cpp) +target_link_libraries(hop CycloneDDS-CXX::ddscxx hop_type) + +add_executable(mop mop.cpp) +target_link_libraries(mop CycloneDDS-CXX::ddscxx mop_type) + +add_executable(cwpl cwpl.cpp) +target_link_libraries(cwpl CycloneDDS-CXX::ddscxx mop_type) diff --git a/examples/hop/cwpl.cpp b/examples/hop/cwpl.cpp new file mode 100644 index 00000000..0e70b5c0 --- /dev/null +++ b/examples/hop/cwpl.cpp @@ -0,0 +1,293 @@ +/* + * Copyright(c) 2024 ZettaScale Technology and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "dds/dds.hpp" +#include "mop_type.hpp" + +using namespace org::eclipse::cyclonedds; +using namespace std::chrono_literals; +using namespace std::chrono; + +using CLK = high_resolution_clock; + +enum class Type { T8, T128, T1k, T8k, T128k }; + +static Type type = Type::T128; +static uint32_t pairid = 0; +static uint32_t ntopics = 10; +static bool random_timing = false; +static std::optional datafile; + +static dds::core::Time mkDDSTime (const time_point x) +{ + int64_t t = duration_cast(x.time_since_epoch()).count(); + return dds::core::Time(t / 1000000000, static_cast(t % 1000000000)); +} + +static volatile std::atomic interrupted = false; +static void sigh(int sig) +{ + static_cast(sig); + interrupted = true; +} + +template +static dds::sub::DataReader make_reader(dds::topic::Topic tp) +{ + dds::domain::DomainParticipant dp = tp.domain_participant(); + std::vector spart{"P" + std::to_string(pairid + 1 - 2 * (pairid % 2))}; + dds::sub::qos::SubscriberQos sqos = dp.default_subscriber_qos() << dds::core::policy::Partition(spart); + dds::sub::Subscriber sub{dp, sqos}; + return dds::sub::DataReader{sub, tp, tp.qos()}; +} + +template +static dds::pub::DataWriter make_writer(dds::topic::Topic tp) +{ + dds::domain::DomainParticipant dp = tp.domain_participant(); + std::vector ppart{"P" + std::to_string(pairid)}; + dds::pub::qos::PublisherQos pqos = dp.default_publisher_qos() << dds::core::policy::Partition(ppart); + dds::pub::Publisher pub{dp, pqos}; + return dds::pub::DataWriter{pub, tp, tp.qos()}; +} + +template +static void source(dds::pub::DataWriter wr) +{ + T sample{}; + sample.k(0); + auto now = CLK::now(); + while (!interrupted) + { + wr.write(sample, mkDDSTime(CLK::now())); + ++sample.seq(); + now += 10ms; + std::this_thread::sleep_until(now); + } +} + +template +static void randomsource(dds::pub::DataWriter wr) +{ + std::random_device ran_dev; + std::mt19937 prng(ran_dev()); + std::exponential_distribution intvdist(100); + + T sample{}; + sample.k(0); + auto now = CLK::now(); + while (!interrupted) + { + wr.write(sample, mkDDSTime(CLK::now())); + ++sample.seq(); + auto delay = std::chrono::duration(intvdist(prng)); + if (delay > 1s) + delay = 1s; + now += std::chrono::duration_cast(delay); + std::this_thread::sleep_until(now); + } +} + +// t = reception time, l = latency, i = topic index, k = source key +struct TLK { int64_t t; double l; uint32_t k; }; +struct TLIK { int64_t t; double l; size_t i; uint32_t k; }; +struct LIK { double l; size_t i; uint32_t k; }; + +template +class Sink : public dds::sub::NoOpDataReaderListener { +public: + Sink() = default; + + const std::vector& lats() const { + return lats_; + }; + +private: + void on_data_available(dds::sub::DataReader& rd) + { + const auto now_clk = CLK::now(); + const int64_t now = duration_cast(now_clk.time_since_epoch()).count(); + auto xs = rd.take(); + for (const auto& x : xs) { + if (x.info().valid()) { + const auto lat = now - (x.info().timestamp().sec() * 1000000000 + x.info().timestamp().nanosec()); + lats_.push_back(TLK{now, lat / 1e3, x.data().k()}); + } else { + interrupted = true; + } + }; + } + + std::vector lats_; +}; + +template +static void run() +{ + dds::domain::DomainParticipant dp{0}; + auto tpqos = dp.default_topic_qos() + << dds::core::policy::Reliability::Reliable(dds::core::Duration::infinite()) + << dds::core::policy::History::KeepLast(1); + std::vector> tps; + std::vector> wrs; + for (uint32_t i = 0; i < ntopics; i++) + tps.push_back(dds::topic::Topic{dp, "Mop" + std::to_string(i), tpqos}); + for (auto& tp : tps) + wrs.push_back(make_writer(tp)); + std::vector> rds; + std::vector> ls; + for (size_t i = 0; i < tps.size(); i++) + ls.push_back(Sink{}); + for (size_t i = 0; i < tps.size(); i++) + rds.push_back(make_reader(tps[i])); + for (size_t i = 0; i < tps.size(); i++) + rds[i].listener(&ls[i], dds::core::status::StatusMask::data_available()); + + signal(SIGINT, sigh); + signal(SIGTERM, sigh); + std::vector threads; + for (auto wr : wrs) + threads.push_back(std::thread(random_timing ? randomsource : source, wr)); + + // latencies in microseconds + std::vector lats; + while (!interrupted) + std::this_thread::sleep_for(103ms); + for (auto& t : threads) + t.join(); + for (auto rd : rds) + rd.close(); + for (auto wr : wrs) + wr.close(); + // collect latencies for all topics and sort by reception time + std::vector tlats; + for (size_t i = 0; i < ls.size(); i++) + for (const auto& x : ls[i].lats()) + tlats.push_back(TLIK{x.t, x.l, i, x.k}); + std::sort(tlats.begin(), tlats.end(), [](const TLIK& a, const TLIK& b) -> bool { return a.t < b.t; }); + // then reduce to just latency, topic and key + for (const auto& x : tlats) + lats.push_back(LIK{x.l, x.i, x.k}); + + if (datafile.has_value()) + { + std::ofstream f; + f.open(datafile.value()); + for (const auto& l : lats) + f << l.l << " " << l.i << " " << l.k << std::endl; + f.close(); + } + const size_t n = lats.size(); + if (n < 2) { + std::cout << "insufficient data" << std::endl; + } else { + std::sort(lats.begin(), lats.end(), [](const LIK& a, const LIK& b) -> bool { return a.l < b.l; }); + std::cout + << "received " << n + << " samples; min " << lats[0].l + << " max-1 " << lats[n-2].l + << " max " << lats[n-1].l << std::endl; + } +} + +[[noreturn]] +static void usage() +{ + std::cout + << "usage: cwpl [OPTIONS] id" << std::endl + << std::endl + << "OPTIONS:" << std::endl + << "-tTYPE type to use one of 8, 128 (def), 1k, 8k, 128k" << std::endl + << "-nNTPS use N (def = 10) topics in parallel" << std::endl + << "-r use randomized write intervals with average 10ms" << std::endl + << "-oFILE write latencies to FILE" << std::endl + << std::endl + << "id = 0 writes in partition P0, reads from P1" << std::endl + << "id = 1 writes in partition P1, reads from P0" << std::endl + << "id = 2 writes in partition P2, reads from P3" << std::endl + << "etc." << std::endl; + std::exit(1); +} + +static Type convert_typestr (const std::string& typestr) +{ + if (typestr == "8") { + return Type::T8; + } else if (typestr == "128") { + return Type::T128; + } else if (typestr == "1k") { + return Type::T1k; + } else if (typestr == "8k") { + return Type::T8k; + } else if (typestr == "128k") { + return Type::T128k; + } else { + std::cout << "invalid type, should be 8, 128, 1k, 8k, 128k" << std::endl; + std::exit(1); + return Type::T128; + } +} + +int main (int argc, char **argv) +{ + if (argc < 1) + usage(); + + int opt; + while ((opt = getopt (argc, argv, "n:o:rt:")) != EOF) + { + switch (opt) + { + case 'n': + ntopics = static_cast(std::atoi(optarg)); + break; + case 'o': + datafile = std::string(optarg); + break; + case 'r': + random_timing = true; + break; + case 't': + type = convert_typestr(std::string(optarg)); + break; + default: + usage(); + } + } + if (argc - optind != 1) + { + usage(); + } + pairid = static_cast(std::atoi(argv[optind])); + switch (type) + { + case Type::T8: run(); break; + case Type::T128: run(); break; + case Type::T1k: run(); break; + case Type::T8k: run(); break; + case Type::T128k: run(); break; + } + return 0; +} diff --git a/examples/hop/hop.cpp b/examples/hop/hop.cpp new file mode 100644 index 00000000..5c5cd7bb --- /dev/null +++ b/examples/hop/hop.cpp @@ -0,0 +1,355 @@ +/* + * Copyright(c) 2024 ZettaScale Technology and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "dds/dds.hpp" +#include "hop_type.hpp" + +using namespace org::eclipse::cyclonedds; +using namespace std::chrono_literals; + +static bool use_listener = true; +static double junkrate = 0.0; + +template +static dds::core::Time mkDDSTime (const std::chrono::time_point x) +{ + int64_t t = std::chrono::duration_cast(x.time_since_epoch()).count(); + return dds::core::Time(t / 1000000000, static_cast(t % 1000000000)); +} + +static volatile std::atomic interrupted = false; +static void sigh(int sig) +{ + static_cast(sig); + interrupted = true; +} + +static const dds::sub::status::DataState not_read() +{ + return dds::sub::status::DataState(dds::sub::status::SampleState::not_read(), + dds::sub::status::ViewState::any(), + dds::sub::status::InstanceState::any()); +} + +template +static dds::sub::DataReader make_reader(dds::topic::Topic tp, int stage) +{ + dds::domain::DomainParticipant dp = tp.domain_participant(); + std::vector spart{"P" + std::to_string(stage)}; + dds::sub::qos::SubscriberQos sqos = dp.default_subscriber_qos() << dds::core::policy::Partition(spart); + dds::sub::Subscriber sub{dp, sqos}; + return dds::sub::DataReader{sub, tp, tp.qos()}; +} + +template +static dds::pub::DataWriter make_writer(dds::topic::Topic tp, int stage) +{ + dds::domain::DomainParticipant dp = tp.domain_participant(); + std::vector ppart{"P" + std::to_string(stage)}; + dds::pub::qos::PublisherQos pqos = dp.default_publisher_qos() << dds::core::policy::Partition(ppart); + dds::pub::Publisher pub{dp, pqos}; + return dds::pub::DataWriter{pub, tp, tp.qos()}; +} + +// to be run on a separate thread +template +static void junksource(dds::topic::Topic tp) +{ + std::random_device ran_dev; + std::mt19937 prng(ran_dev()); + //std::uniform_int_distribution<> wrdist(0, tps.size() - 1); + std::exponential_distribution intvdist(junkrate); + std::vector> wrs; + wrs.push_back(make_writer(tp, -1)); + T sample{}; + auto now = std::chrono::high_resolution_clock::now(); + while (!interrupted) + { + //wrs[wrdist(prng)] << sample; + wrs[0] << sample; + ++sample.seq(); + auto delay = std::chrono::duration(intvdist(prng)); + if (delay > 1s) + delay = 1s; + now += std::chrono::duration_cast(delay); + std::this_thread::sleep_until(now); + } + std::cout << "wrote " << sample.seq() << " junk samples" << std::endl; +} + +template +static dds::sub::DataReader make_junkreader(dds::topic::Topic tp) +{ + return make_reader(tp, -1); +} + +template +static void source(dds::topic::Topic tp, int stage, const std::optional) +{ + auto wr = make_writer(tp, stage); + signal(SIGINT, sigh); + T sample{}; + auto now = std::chrono::high_resolution_clock::now(); + // give forwarders and sink time to start & discovery to run + std::cout << "starting in 1s" << std::endl; + now += 1s; + std::this_thread::sleep_until(now); + while (!interrupted) + { + wr.write(sample, mkDDSTime(now)); + ++sample.seq(); + now += 10ms; + std::this_thread::sleep_until(now); + } + std::cout << "wrote " << sample.seq() << " samples" << std::endl; +} + +template +static void run_reader(dds::sub::DataReaderListener *list, dds::sub::DataReader rd, std::function action) +{ + if (use_listener) + { + rd.listener(list, dds::core::status::StatusMask::data_available()); + while (!interrupted) + std::this_thread::sleep_for(103ms); + } + else + { + dds::core::cond::WaitSet ws; + dds::sub::cond::ReadCondition rc{rd, not_read()}; + ws += rc; + while (!interrupted) + { + ws.wait(); + action(); + } + } +} + +template +class Forward : public dds::sub::NoOpDataReaderListener { +public: + Forward() = delete; + Forward(dds::sub::DataReader rd, dds::pub::DataWriter wr) : rd_{rd}, wr_{wr} { } + + void run() + { + run_reader(this, rd_, [this](){action();}); + } + +private: + void action() + { + auto xs = rd_.take(); + for (const auto& x : xs) { + if (x.info().valid()) { + wr_.write (x.data(), x.info().timestamp()); + } else { + interrupted = true; + } + }; + } + + void on_data_available(dds::sub::DataReader&) + { + action(); + } + + dds::sub::DataReader rd_; + dds::pub::DataWriter wr_; +}; + +template +static void forward(dds::topic::Topic tp, int stage, const std::optional) +{ + auto rd = make_reader(tp, stage); + auto wr = make_writer(tp, stage + 1); + Forward x{rd, wr}; + x.run(); +} + +template +class Sink : public dds::sub::NoOpDataReaderListener { +public: + Sink() = delete; + Sink(dds::sub::DataReader rd, std::vector& lats) : rd_{rd}, lats_{lats} { } + + void run() + { + run_reader(this, rd_, [this](){action();}); + } + +private: + void action() + { + const auto now = std::chrono::duration_cast(std::chrono::high_resolution_clock::now().time_since_epoch()).count(); + auto xs = rd_.take(); + for (const auto& x : xs) { + if (x.info().valid()) { + const auto lat = now - (x.info().timestamp().sec() * 1000000000 + x.info().timestamp().nanosec()); + lats_.push_back(lat / 1e3); + } else { + interrupted = true; + } + }; + } + + void on_data_available(dds::sub::DataReader&) + { + action(); + } + + dds::sub::DataReader rd_; + std::vector& lats_; +}; + +template +static void sink(dds::topic::Topic tp, int stage, const std::optional datafile) +{ + // latencies in microseconds + std::vector lats; + // read until source disappears + // always create the "junk reader": it costs us nothing if no junk data is being published + { + auto rd = make_reader(tp, stage); + Sink x{rd, lats}; + x.run(); + } + // destructors will have run, latencies are ours now + if (datafile.has_value()) + { + std::ofstream f; + f.open(datafile.value()); + for (const auto l : lats) + f << l << std::endl; + f.close(); + } + const size_t n = lats.size(); + if (n < 2) { + std::cout << "insufficient data" << std::endl; + } else { + std::sort(lats.begin(), lats.end()); + std::cout << "received " << n << " samples; min " << lats[0] << " max-1 " << lats[n-2] << " max " << lats[n-1] << std::endl; + } +} + +enum class Mode { Source, Forward, Sink }; + +template +static void run(const Mode mode, int stage, const std::optional datafile) +{ + dds::domain::DomainParticipant dp{0}; + auto tpqos = dp.default_topic_qos() + << dds::core::policy::Reliability::Reliable(dds::core::Duration::infinite()) + << dds::core::policy::History::KeepLast(1); + dds::topic::Topic tp(dp, "Hop", tpqos); + std::thread junkthr; + if (junkrate > 0) + junkthr = std::thread(junksource, tp); + auto junkrd = make_junkreader(tp); + switch (mode) + { + case Mode::Source: source(tp, stage, datafile); break; + case Mode::Forward: forward(tp, stage, datafile); break; + case Mode::Sink: sink(tp, stage, datafile); break; + } + if (junkthr.joinable()) + junkthr.join(); +} + +// type=128 n=1 bash -c 'bin/hop sink -ohop-result.$n.txt $n $type & i=0;while [[ i -lt n ]]; do bin/hop forward $i $type & i=$((i+1)) ; done ; bin/hop source 0 $type' +// for n in {8..10} ; do n=$n type=128 bash -c 'bin/hop sink -ohop-result.$n.txt $n $type & i=0;while [[ i -lt n ]]; do bin/hop forward $i $type & i=$((i+1)) ; done ; bin/hop source 0 $type & p=$! ; sleep 10 ; kill -INT $p ; wait' ; done + +[[noreturn]] +static void usage() +{ + std::cout + << "usage: hop {source|forward|sink} [OPTIONS] STAGE TYPE" << std::endl + << "OPTIONS:" << std::endl + << "-jRATE write junk at RATE Hz" << std::endl + << "-w: use waitset instead of listener (forward, sink)" << std::endl + << "-oFILE write latencies to FILE (sink)" << std::endl + << "TYPE: one of 8, 128, 1k, 8k, 128k" << std::endl; + std::exit(1); +} + +int main (int argc, char **argv) +{ + if (argc < 2) + usage(); + const std::string modestr = std::string(argv[1]); + Mode mode; + if (modestr == "source") { + mode = Mode::Source; + } else if (modestr == "forward") { + mode = Mode::Forward; + } else if (modestr == "sink") { + mode = Mode::Sink; + } else { + std::cout << "invalid mode, should be source, forward or sink" << std::endl; + return 1; + } + + std::optional datafile; + optind = 2; + int opt; + while ((opt = getopt (argc, argv, "j:o:w")) != EOF) + { + switch (opt) + { + case 'j': + junkrate = std::atof(optarg); + break; + case 'o': + datafile = std::string(optarg); + break; + case 'w': + use_listener = false; + break; + default: + usage(); + } + } + + if (argc - optind != 2) + usage(); + const int stage = std::atoi(argv[optind]); + const std::string typestr = std::string(argv[optind + 1]); + if (typestr == "8") { + run(mode, stage, datafile); + } else if (typestr == "128") { + run(mode, stage, datafile); + } else if (typestr == "1k") { + run(mode, stage, datafile); + } else if (typestr == "8k") { + run(mode, stage, datafile); + } else if (typestr == "128k") { + run(mode, stage, datafile); + } else { + std::cout << "invalid type, should be 8, 128, 1k, 8k, 128k" << std::endl; + return 1; + } + return 0; +} diff --git a/examples/hop/hop_type.idl b/examples/hop/hop_type.idl new file mode 100644 index 00000000..004abf96 --- /dev/null +++ b/examples/hop/hop_type.idl @@ -0,0 +1,37 @@ +/* + * Copyright(c) 2024 ZettaScale Technology and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ + +@final @topic +struct Hop8 { + uint32 seq; + octet z[8 - 4]; +}; +@final @topic +struct Hop128 { + uint32 seq; + octet z[128 - 4]; +}; +@final @topic +struct Hop1k { + uint32 seq; + octet z[1024 - 4]; +}; +@final @topic +struct Hop8k { + uint32 seq; + octet z[8*1024 - 4]; +}; +@final @topic +struct Hop128k { + uint32 seq; + octet z[128*1024 - 4]; +}; diff --git a/examples/hop/mop.cpp b/examples/hop/mop.cpp new file mode 100644 index 00000000..fbe168b7 --- /dev/null +++ b/examples/hop/mop.cpp @@ -0,0 +1,396 @@ +/* + * Copyright(c) 2024 ZettaScale Technology and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "dds/dds.hpp" +#include "mop_type.hpp" + +using namespace org::eclipse::cyclonedds; +using namespace std::chrono_literals; +using namespace std::chrono; + +using CLK = high_resolution_clock; + +enum class Type { T8, T128, T1k, T8k, T128k }; + +static Type type = Type::T128; +static bool sleep_between_write = false; +static uint32_t keyval = static_cast(getpid()); +static uint32_t ntopics = 10; +static uint32_t stagger = 0; // ms +static std::optional pubidx; +static std::optional datafile; + +static dds::core::Time mkDDSTime (const time_point x) +{ + int64_t t = duration_cast(x.time_since_epoch()).count(); + return dds::core::Time(t / 1000000000, static_cast(t % 1000000000)); +} + +static volatile std::atomic interrupted = false; +static void sigh(int sig) +{ + static_cast(sig); + interrupted = true; +} + +template +static dds::sub::DataReader make_reader(dds::topic::Topic tp) +{ + dds::domain::DomainParticipant dp = tp.domain_participant(); + std::vector spart{"P"}; + dds::sub::qos::SubscriberQos sqos = dp.default_subscriber_qos() << dds::core::policy::Partition(spart); + dds::sub::Subscriber sub{dp, sqos}; + return dds::sub::DataReader{sub, tp, tp.qos()}; +} + +template +static dds::pub::DataWriter make_writer(dds::topic::Topic tp) +{ + dds::domain::DomainParticipant dp = tp.domain_participant(); + std::vector ppart{"P"}; + dds::pub::qos::PublisherQos pqos = dp.default_publisher_qos() << dds::core::policy::Partition(ppart); + dds::pub::Publisher pub{dp, pqos}; + return dds::pub::DataWriter{pub, tp, tp.qos()}; +} + +static void make_start_entities (dds::domain::DomainParticipant dp, dds::sub::DataReader& rd, dds::pub::DataWriter& wr) +{ + auto tpqos = dp.default_topic_qos() + << dds::core::policy::Reliability::Reliable(dds::core::Duration::infinite()) + << dds::core::policy::Durability::TransientLocal() + << dds::core::policy::History::KeepLast(1); + auto tp = dds::topic::Topic(dp, "MopSync", tpqos); + rd = make_reader(tp); + wr = make_writer(tp); +} + +static time_point get_start_time(dds::sub::DataReader rd, dds::pub::DataWriter wr) +{ + auto tstart = CLK::now() + 1s; + int64_t tstart_int64 = duration_cast(tstart.time_since_epoch()).count(); + std::cout << keyval << " proposing " << tstart.time_since_epoch().count() << std::endl; + wr << MopSync{keyval, tstart_int64}; + while (CLK::now() < tstart - 2ms) + { + auto ms = rd.take(); + for (const auto& m : ms) + { + if (!m.info().valid()) + continue; + auto prop = time_point(nanoseconds(m.data().tstart())); + if (prop > tstart) + { + tstart = prop; + std::cout << keyval << " updating to " << tstart.time_since_epoch().count() << " from " << m.data().k() << std::endl; + } + } + std::this_thread::sleep_for(1ms); + } + tstart += milliseconds(stagger); + std::cout << keyval << " starting at " << tstart.time_since_epoch().count() << std::endl; + return tstart; +} + +template +static void source(std::vector>& tps) +{ + // make entities for synchronised start first, make sure they stay around while + // we measure to avoid disturbing the measurement with the entity deletion and + // associated discovery work + dds::sub::DataReader start_rd = dds::core::null; + dds::pub::DataWriter start_wr = dds::core::null; + make_start_entities(tps[0].domain_participant(), start_rd, start_wr); + std::vector> wrs; + for (auto tp : tps) + wrs.push_back(make_writer(tp)); + signal(SIGINT, sigh); + signal(SIGTERM, sigh); + T sample{}; + sample.k(keyval); + auto now = get_start_time (start_rd, start_wr); + std::this_thread::sleep_until(now); + while (!interrupted) + { + if (pubidx.has_value()) + { + wrs[pubidx.value()].write(sample, mkDDSTime(CLK::now())); + } + else + { + auto nowx = now; + for (auto wr : wrs) + { + wr.write(sample, mkDDSTime(CLK::now())); + if (sleep_between_write) + { + nowx += 100us; + std::this_thread::sleep_until(nowx); + } + } + } + ++sample.seq(); + now += 10ms; + std::this_thread::sleep_until(now); + } + std::cout << keyval << "wrote " << ntopics << " * " << sample.seq() << " samples" << std::endl; +} + +// t = reception time, l = latency, i = topic index, k = source key +struct TLK { int64_t t; double l; uint32_t k; }; +struct TLIK { int64_t t; double l; size_t i; uint32_t k; }; +struct LIK { double l; size_t i; uint32_t k; }; + +template +class Sink : public dds::sub::NoOpDataReaderListener { +public: + Sink() = default; + + const std::vector& lats() const { + return lats_; + }; + +private: + void on_data_available(dds::sub::DataReader& rd) + { + const auto now_clk = CLK::now(); + const int64_t now = duration_cast(now_clk.time_since_epoch()).count(); + auto xs = rd.take(); + for (const auto& x : xs) { + if (x.info().valid()) { + const auto lat = now - (x.info().timestamp().sec() * 1000000000 + x.info().timestamp().nanosec()); + lats_.push_back(TLK{now, lat / 1e3, x.data().k()}); + } else { + interrupted = true; + } + }; + } + + std::vector lats_; +}; + +template +static void sink(std::vector>& tps) +{ + // latencies in microseconds + std::vector lats; + // read until source disappears + { + std::vector> rds; + std::vector> ls; + for (size_t i = 0; i < tps.size(); i++) + ls.push_back(Sink{}); + for (size_t i = 0; i < tps.size(); i++) + rds.push_back(make_reader(tps[i])); + for (size_t i = 0; i < tps.size(); i++) + rds[i].listener(&ls[i], dds::core::status::StatusMask::data_available()); + while (!interrupted) + std::this_thread::sleep_for(103ms); + for (auto rd : rds) + rd.close(); + // collect latencies for all topics and sort by reception time + std::vector tlats; + for (size_t i = 0; i < ls.size(); i++) + for (const auto& x : ls[i].lats()) + tlats.push_back(TLIK{x.t, x.l, i, x.k}); + std::sort(tlats.begin(), tlats.end(), [](const TLIK& a, const TLIK& b) -> bool { return a.t < b.t; }); + // then reduce to just latency, topic and key + for (const auto& x : tlats) + lats.push_back(LIK{x.l, x.i, x.k}); + } + if (datafile.has_value()) + { + std::ofstream f; + f.open(datafile.value()); + for (const auto& l : lats) + f << l.l << " " << l.i << " " << l.k << std::endl; + f.close(); + } + const size_t n = lats.size(); + if (n < 2) { + std::cout << "insufficient data" << std::endl; + } else { + std::sort(lats.begin(), lats.end(), [](const LIK& a, const LIK& b) -> bool { return a.l < b.l; }); + std::cout + << "received " << n + << " samples; min " << lats[0].l + << " max-1 " << lats[n-2].l + << " max " << lats[n-1].l << std::endl; + } +} + +enum class Mode { Source, Sink }; + +template +static void run(const Mode mode) +{ + dds::domain::DomainParticipant dp{0}; + auto tpqos = dp.default_topic_qos() + << dds::core::policy::Reliability::Reliable(dds::core::Duration::infinite()) + << dds::core::policy::History::KeepLast(1); + std::vector> tps; + for (uint32_t i = 0; i < ntopics; i++) + tps.push_back(dds::topic::Topic{dp, "Mop" + std::to_string(i), tpqos}); + switch (mode) + { + case Mode::Source: source(tps); break; + case Mode::Sink: sink(tps); break; + } +} + +[[noreturn]] +static void usage() +{ + std::cout + << "usage: mop {source|sink} [OPTIONS]" << std::endl + << std::endl + << "COMMON OPTIONS:" << std::endl + << "-tTYPE type to use one of 8, 128 (def), 1k, 8k, 128k" << std::endl + << "-nNTPS use N (def = 10) topics in parallel" << std::endl + << std::endl + << "SOURCE OPTIONS:" << std::endl + << "-kKVAL use KVAL as key value instead of process id" << std::endl + << "-pIDX publish only on topic IDX" << std::endl + << "-sDELAY stagger: offset by DELAY ms (def = 0)" << std::endl + << "-x sleep 100us between successive writes" << std::endl + << std::endl + << "SINK OPTIONS:" << std::endl + << "-oFILE write latencies to FILE" << std::endl; + std::exit(1); +} + +static Type convert_typestr (const std::string& typestr) +{ + if (typestr == "8") { + return Type::T8; + } else if (typestr == "128") { + return Type::T128; + } else if (typestr == "1k") { + return Type::T1k; + } else if (typestr == "8k") { + return Type::T8k; + } else if (typestr == "128k") { + return Type::T128k; + } else { + std::cout << "invalid type, should be 8, 128, 1k, 8k, 128k" << std::endl; + std::exit(1); + return Type::T128; + } +} + +static bool handle_common_opt (int opt) +{ + switch (opt) + { + case 'n': + ntopics = static_cast(std::atoi(optarg)); + return true; + case 't': + type = convert_typestr(std::string(optarg)); + return true; + default: + // not a common option + return false; + } +} + +int main (int argc, char **argv) +{ + if (argc < 2) + usage(); + const std::string modestr = std::string(argv[1]); + Mode mode; + if (modestr == "source") { + mode = Mode::Source; + } else if (modestr == "sink") { + mode = Mode::Sink; + } else { + std::cout << "invalid mode, should be source or sink" << std::endl; + return 1; + } + + const std::string common_opt = "n:t:"; + optind = 2; + int opt; + switch (mode) + { + case Mode::Source: + while ((opt = getopt (argc, argv, (common_opt + "k:p:s:x").c_str())) != EOF) + { + if (handle_common_opt (opt)) + continue; + switch (opt) + { + case 'k': + keyval = static_cast(std::atoi(optarg)); + break; + case 'p': + pubidx = static_cast(std::atoi(optarg)); + break; + case 's': + stagger = static_cast(std::atoi(optarg)); + break; + case 'x': + sleep_between_write = true; + break; + default: + usage(); + } + } + break; + case Mode::Sink: + while ((opt = getopt (argc, argv, (common_opt + "o:").c_str())) != EOF) + { + if (handle_common_opt (opt)) + continue; + switch (opt) + { + case 'o': + datafile = std::string(optarg); + break; + default: + usage(); + } + } + break; + } + if (pubidx.has_value() && pubidx.value() >= ntopics) + { + std::cout << "topic index for publishing out of range" << std::endl; + return 1; + } + if (argc - optind != 0) + { + usage(); + } + switch (type) + { + case Type::T8: run(mode); break; + case Type::T128: run(mode); break; + case Type::T1k: run(mode); break; + case Type::T8k: run(mode); break; + case Type::T128k: run(mode); break; + } + return 0; +} diff --git a/examples/hop/mop_type.idl b/examples/hop/mop_type.idl new file mode 100644 index 00000000..220398da --- /dev/null +++ b/examples/hop/mop_type.idl @@ -0,0 +1,48 @@ +/* + * Copyright(c) 2024 ZettaScale Technology and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ + +@final @topic +struct MopSync { + @key uint32 k; + int64 tstart; +}; + +@final @topic +struct Mop8 { + @key uint32 k; + uint32 seq; + //octet z[8 - 8]; +}; +@final @topic +struct Mop128 { + @key uint32 k; + uint32 seq; + octet z[128 - 8]; +}; +@final @topic +struct Mop1k { + @key uint32 k; + uint32 seq; + octet z[1024 - 8]; +}; +@final @topic +struct Mop8k { + @key uint32 k; + uint32 seq; + octet z[8*1024 - 8]; +}; +@final @topic +struct Mop128k { + @key uint32 k; + uint32 seq; + octet z[128*1024 - 8]; +}; diff --git a/examples/hop/runexp.bash b/examples/hop/runexp.bash new file mode 100644 index 00000000..5ad38812 --- /dev/null +++ b/examples/hop/runexp.bash @@ -0,0 +1,29 @@ +# using default 10 topics, 128 bytes + +# first arg: part of filename of output file +# second arg: if empty, no stagger; if non-empty: stagger by 1ms +# remaining args: passed on to source +run () { + logname=$1 ; shift + stagger=$1 ; shift + + # writing all this junk keeps my CPU awake (macOS, M1) + bin/hop sink -j100000 0 128 & + bin/hop source -j100000 0 128 & kpids=$! + sleep 2 + for i in {0..1} ; do + bin/mop sink -o mop-$logname-$i.txt & + done + for i in {0..1} ; do + s=$((2 * i)) + bin/mop source -k$i ${stagger:+-s$s} "$@" & kpids="$kpids $!" + done + sleep 10 + kill -INT $kpids + wait +} + +run s0x0 "" # both sources start at same time, no sleep between writes +run s0x1 "" -x # both sources start at same time, 100us sleep between writes +run s1x0 "s" # source 1 starts 1ms after source 1, no sleep between writes +run s1x1 "s" -x # source 1 starts 1ms after source 1, 100us sleep between writes