Skip to content

Commit

Permalink
fix the test on binpb
Browse files Browse the repository at this point in the history
  • Loading branch information
YanzhaoW committed Nov 20, 2024
1 parent e7750b2 commit b475f1d
Show file tree
Hide file tree
Showing 16 changed files with 162 additions and 69 deletions.
1 change: 1 addition & 0 deletions backend/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ auto main(int argc, char** argv) -> int
app.read_data();
app.switch_on();
app.start_analysis();
app.wait_for_finish();
}
catch (const CLI::ParseError& e)
{
Expand Down
11 changes: 5 additions & 6 deletions backend/srs/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ namespace srs

App::~App() noexcept
{
io_work_guard_.reset();
if (working_thread_.joinable())
{
spdlog::debug("Application: Working thread is still running. Trying to stop the io context ...");
Expand All @@ -44,6 +43,7 @@ namespace srs
});
io_context_.join();
end_of_work();
spdlog::debug("Application: working thread is finished");
};
working_thread_ = std::jthread{ monitoring_action };
}
Expand Down Expand Up @@ -80,7 +80,9 @@ namespace srs
{
set_status_acq_off(true);
}
io_work_guard_.reset();
set_status_acq_on(false);
spdlog::debug("Application is exited");
}

void App::set_print_mode(DataPrintMode mode) { data_processor_->set_print_mode(mode); }
Expand Down Expand Up @@ -123,9 +125,6 @@ namespace srs
data_reader_->start();
}

void App::start_analysis()
{
data_processor_->start();
// working_thread_.join();
}
void App::start_analysis() { data_processor_->start(); }
void App::wait_for_finish() { working_thread_.join(); }
} // namespace srs
3 changes: 3 additions & 0 deletions backend/srs/Application.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ namespace srs
App& operator=(App&&) = delete;
~App() noexcept;

// public APIs
void configure_fec() {}
void switch_on();
void switch_off();
Expand All @@ -33,6 +34,7 @@ namespace srs
void notify_status_change() { status_.status_change.notify_all(); }
void start_analysis();
void exit();
void wait_for_finish();
void wait_for_status(auto&& condition, std::chrono::seconds time_duration = DEFAULT_STATUS_WAITING_TIME_SECONDS)
{
status_.wait_for_status(std::forward<decltype(condition)>(condition), time_duration);
Expand All @@ -52,6 +54,7 @@ namespace srs
// [[nodiscard]] auto get_fec_config() const -> const auto& { return fec_config_; }
[[nodiscard]] auto get_status() const -> const auto& { return status_; }
[[nodiscard]] auto get_io_context() -> auto& { return io_context_; }
auto get_data_reader() -> DataReader* { return data_reader_.get(); }

private:
using udp = asio::ip::udp;
Expand Down
4 changes: 2 additions & 2 deletions backend/srs/analysis/DataProcessManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ namespace srs

if (is_stopped)
{
spdlog::info("Shutting down all data writers...");
spdlog::debug("Shutting down all data writers...");
}

auto make_writer_future = [&](auto& writer)
Expand Down Expand Up @@ -75,7 +75,7 @@ namespace srs
writers_.wait_for_finished();
if (is_stopped)
{
spdlog::debug("All data writers are shutdown.");
spdlog::info("All data writers are shutdown.");
}
}

Expand Down
18 changes: 13 additions & 5 deletions backend/srs/analysis/DataProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,12 @@ namespace srs
spdlog::trace("Try to stop data monitor");
monitor_.stop();
data_queue_.abort();
spdlog::info("Stopping analysis loop ...");
data_processes_.stop();
spdlog::info("Analysis loop is stopped");
// spdlog::info("Stopping analysis loop ...");
// data_processes_.stop();
// spdlog::info("Analysis loop is stopped");
}

spdlog::trace("Data processor is stopped");
}

void DataProcessor::read_data_once(std::span<BufferElementType> read_data)
Expand All @@ -128,9 +130,14 @@ namespace srs
spdlog::trace("entering analysis loop");
// TODO: Use direct binary data

while (not is_stopped.load())
while (true)
{
data_processes_.analysis_one(data_queue_);
if (is_stopped.load())
{
data_processes_.analysis_one(data_queue_, true);
break;
}
data_processes_.analysis_one(data_queue_, false);
update_monitor();
print_data();

Expand All @@ -146,6 +153,7 @@ namespace srs
spdlog::critical("Exception occured: {}", ex.what());
app_->exit();
}
spdlog::debug("Analysis loop is stopped");
}

void DataProcessor::update_monitor()
Expand Down
2 changes: 1 addition & 1 deletion backend/srs/converters/RawToDelimRawConveter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ namespace srs
auto size = static_cast<SizeType>(input.size());
output.reserve(size + sizeof(size));
auto deserialize_to = zpp::bits::out{ output, zpp::bits::append{}, zpp::bits::endian::big{} };
deserialize_to(size, input).or_throw();
deserialize_to(size, zpp::bits::unsized(input)).or_throw();
}
};
} // namespace srs
16 changes: 13 additions & 3 deletions backend/srs/readers/RawFrameReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@

#include <algorithm>
#include <fstream>
#include <print>
#include <srs/converters/RawToDelimRawConveter.hpp>
#include <srs/utils/CommonDefitions.hpp>
#include <string>
#include <vector>
#include <print>
#include <zpp_bits.h>

namespace srs
{
Expand Down Expand Up @@ -35,15 +36,23 @@ namespace srs
auto read_one_frame() -> std::string_view
{
std::ranges::fill(input_buffer_, 0);
std::ranges::fill(size_buffer_, 0);
auto size = Raw2DelimRawConverter::SizeType{};

if (input_file_.eof())
{
spdlog::info("End of the binary file.");
return {};
}

std::println("==================passing here 3");
input_file_ >> size;
input_file_.read(size_buffer_.data(), static_cast<int64_t>(size_buffer_.size()));
auto serialize_to = zpp::bits::in{ size_buffer_, zpp::bits::endian::big{} };
serialize_to(size).or_throw();
if (size > input_buffer_.size())
{
spdlog::critical("Read buffer size too small for the frame.");
return {};
}
auto read_size = static_cast<std::size_t>(input_file_.read(input_buffer_.data(), size).gcount());

if (read_size != size)
Expand All @@ -57,6 +66,7 @@ namespace srs
private:
std::string input_filename_;
std::ifstream input_file_;
std::array<char, sizeof(Raw2DelimRawConverter::SizeType)> size_buffer_{};
std::array<char, LARGE_READ_MSG_BUFFER_SIZE> input_buffer_{};
};
} // namespace srs
28 changes: 17 additions & 11 deletions backend/srs/utils/ConnectionBase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,12 @@ namespace srs

// possible overload from derived class
void read_data_handle(std::span<BufferElementType> read_data) {}
void close() {}
void close() { close_socket(); }
void on_fail() { spdlog::debug("default on_fail is called!"); }
auto get_executor() { return app_->get_io_context().get_executor(); }

void listen(this auto&& self, bool is_continuous = false);
void communicate(this auto&& self, const std::vector<CommunicateEntryType>& data, uint16_t address);
void close_socket();

auto send_continuous_message() -> asio::experimental::coro<int(std::optional<std::string_view>)>;

Expand All @@ -60,9 +59,11 @@ namespace srs

protected:
auto new_shared_socket(int port_number) -> std::unique_ptr<udp::socket>;
void close_socket();

private:
int local_port_number_ = 0;
std::atomic<bool> is_socket_closed_{ false };
uint32_t counter_ = INIT_COUNT_VALUE;
std::string name_ = "ConnectionBase";
gsl::not_null<App*> app_;
Expand Down Expand Up @@ -108,7 +109,7 @@ namespace srs

if (not msg.has_value())
{
close_socket();
close();
co_return;
}
else
Expand All @@ -124,13 +125,15 @@ namespace srs
spdlog::debug("Connection {}: starting to listen ...", connection->get_name());
while (true)
{
if (not connection->socket_->is_open())
if (not connection->socket_->is_open() or connection->is_socket_closed_.load())
{
break;
co_return;
}

spdlog::trace("reading messages ...");
auto receive_data_size = co_await connection->socket_->async_receive(
asio::buffer(connection->read_msg_buffer_), asio::use_awaitable);
spdlog::trace("Messages received");
auto read_msg = std::span{ connection->read_msg_buffer_.data(), receive_data_size };
connection->read_data_handle(read_msg);
// spdlog::info("Connection {}: received {} bytes data", connection->get_name(), read_msg.size());
Expand Down Expand Up @@ -169,14 +172,15 @@ namespace srs
auto interrupt_signal = asio::signal_set(co_await asio::this_coro::executor, SIGINT);
spdlog::trace("Connection {}: waiting for signals", connection->get_name());
auto [error, sig_num] = co_await interrupt_signal.async_wait(asio::as_tuple(asio::use_awaitable));
if (error)
if (error == asio::error::operation_aborted)
{
spdlog::trace("Connection {}: Signal ended with {}", connection->get_name(), error.message());
}
else
{
fmt::print("\n");
spdlog::trace("Connection {}: Signal ID {} is called!", connection->get_name(), sig_num);
spdlog::trace(
"Connection {}: Signal ID {} is called with {:?}!", connection->get_name(), sig_num, error.message());
connection->close();
}
}
Expand All @@ -186,9 +190,11 @@ namespace srs
{
auto socket = std::make_unique<udp::socket>(
app_->get_io_context(), udp::endpoint{ udp::v4(), static_cast<asio::ip::port_type>(port_number) });
spdlog::trace("Openning the socket from ip: {} with port: {}",
spdlog::debug("Connection {}: Openning the socket from ip: {} with port: {}",
name_,
socket->local_endpoint().address().to_string(),
socket->local_endpoint().port());
local_port_number_ = socket->local_endpoint().port();
return socket;
}

Expand All @@ -204,7 +210,6 @@ namespace srs
void ConnectionBase<size>::listen(this auto&& self, bool is_continuous)
{
using asio::experimental::awaitable_operators::operator||;
spdlog::debug("Connection {}: creating socket with local port number: {}", self.name_, self.local_port_number_);
if (self.socket_ == nullptr)
{
self.socket_ = self.new_shared_socket(self.local_port_number_);
Expand Down Expand Up @@ -233,12 +238,13 @@ namespace srs
template <int size>
void ConnectionBase<size>::close_socket()
{
if (socket_->is_open())
if (not is_socket_closed_.load())
{
is_socket_closed_.store(true);
spdlog::trace("Connection {}: Closing the socket ...", name_);
socket_->cancel();
socket_->close();
spdlog::trace("Connection {}: Socket is closed.", name_);
spdlog::trace("Connection {}: Socket is closed and cancelled.", name_);
}
}
} // namespace srs
6 changes: 3 additions & 3 deletions backend/srs/utils/Connections.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ namespace srs
{
public:
Stopper(const Stopper&) = delete;
Stopper(Stopper&&) = default;
Stopper(Stopper&&) = delete;
Stopper& operator=(const Stopper&) = delete;
Stopper& operator=(Stopper&&) = default;
Stopper& operator=(Stopper&&) = delete;

explicit Stopper(const ConnectionInfo& info)
: ConnectionBase(info, "Stopper")
Expand All @@ -50,7 +50,7 @@ namespace srs

static void on_fail() { spdlog::debug("on_fail of stopper is called"); }
void acq_off();
void close() {}
// void close() {}
};

class DataReader : public ConnectionBase<LARGE_READ_MSG_BUFFER_SIZE>
Expand Down
1 change: 1 addition & 0 deletions backend/srs/writers/DataWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <boost/thread/future.hpp>
#include <map>
#include <spdlog/spdlog.h>
#include <srs/converters/DataConvertOptions.hpp>
#include <srs/data/SRSDataStructs.hpp>
#include <srs/writers/DataWriterOptions.hpp>
Expand Down
2 changes: 1 addition & 1 deletion backend/srs/writers/UDPWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ namespace srs
}

// INFO: this will be called in coroutine
void close() { connection_.close_socket(); }
void close() { connection_.close(); }

// Getters:
auto get_local_socket() -> const auto& { return connection_.get_socket(); }
Expand Down
5 changes: 4 additions & 1 deletion test/backend/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
enable_testing()

add_executable(srs_backend_test test_main.cpp)
add_executable(srs_backend_test)
target_sources(srs_backend_test PRIVATE test_main.cpp)
target_link_libraries(
srs_backend_test
PRIVATE GTest::gtest_main
Expand All @@ -15,3 +16,5 @@ target_link_libraries(

target_include_directories(srs_backend_test PRIVATE ${CMAKE_SOURCE_DIR}/test/backend
${CMAKE_SOURCE_DIR}/backend)

add_subdirectory(srs)
1 change: 1 addition & 0 deletions test/backend/srs/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target_sources(srs_backend_test PRIVATE TestOutputs.cpp)
7 changes: 3 additions & 4 deletions test/backend/srs/SRSEmulator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@
#include <boost/asio.hpp>
#include <boost/thread/future.hpp>
#include <optional>
#include <print>
#include <srs/readers/RawFrameReader.hpp>
#include <srs/utils/CommonAlias.hpp>
#include <srs/writers/UDPWriter.hpp>
#include <string>
#include <print>

namespace srs::test
{
class SRSEmulator
{
public:
explicit SRSEmulator(std::string_view filename, App& app)
explicit SRSEmulator(std::string_view filename, int port, App& app)
: source_filename_{ filename }
, frame_reader_{ source_filename_ }
, udp_writer_{ app, asio::ip::udp::endpoint{ udp::v4(), 0 } }
, udp_writer_{ app, asio::ip::udp::endpoint{ udp::v4(), static_cast<asio::ip::port_type>(port) } }
{
}

Expand All @@ -30,7 +30,6 @@ namespace srs::test
{
return;
}
std::println("data: {}", read_str);
auto send_fut = boost::async([read_str]() { return std::optional<std::string_view>{ read_str }; });
udp_writer_.write(std::move(send_fut)).get();
}
Expand Down
Loading

0 comments on commit b475f1d

Please sign in to comment.