Skip to content

Commit

Permalink
Merge pull request #56 from art-daq/eflumerf/RegisterMethods
Browse files Browse the repository at this point in the history
Add method to register and unregister as a reader or writer
  • Loading branch information
ron003 authored Jan 22, 2025
2 parents c30d054 + 76f7627 commit 5f0bd5a
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 36 deletions.
8 changes: 8 additions & 0 deletions artdaq-core/Core/SharedMemoryEventReceiver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,16 @@ artdaq::SharedMemoryEventReceiver::SharedMemoryEventReceiver(uint32_t shm_key, u
, broadcasts_(broadcast_shm_key)
{
TLOG(TLVL_DEBUG + 33) << "SharedMemoryEventReceiver CONSTRUCTOR";
data_.RegisterReader();
broadcasts_.RegisterReader();
}

artdaq::SharedMemoryEventReceiver::~SharedMemoryEventReceiver() noexcept
{
data_.UnregisterReader();
broadcasts_.UnregisterReader();
};

bool artdaq::SharedMemoryEventReceiver::ReadyForRead(bool broadcast, size_t timeout_us)
{
TLOG(TLVL_DEBUG + 33) << "ReadyForRead BEGIN timeout_us=" << timeout_us;
Expand Down
2 changes: 1 addition & 1 deletion artdaq-core/Core/SharedMemoryEventReceiver.hh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public:
/**
* \brief SharedMemoryEventReceiver Destructor
*/
virtual ~SharedMemoryEventReceiver() = default;
virtual ~SharedMemoryEventReceiver() noexcept;

/**
* \brief Determine whether an event is available for reading
Expand Down
6 changes: 6 additions & 0 deletions artdaq-core/Core/SharedMemoryFragmentManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,14 @@ artdaq::SharedMemoryFragmentManager::SharedMemoryFragmentManager(uint32_t shm_ke
: SharedMemoryManager(shm_key, buffer_count, max_buffer_size, buffer_timeout_us)
, active_buffer_(-1)
{
RegisterWriter();
}

artdaq::SharedMemoryFragmentManager::~SharedMemoryFragmentManager() noexcept
{
UnregisterWriter();
};

bool artdaq::SharedMemoryFragmentManager::ReadyForWrite(bool overwrite)
{
TLOG(TLVL_DEBUG + 40) << "ReadyForWrite: active_buffer is " << active_buffer_;
Expand Down
2 changes: 1 addition & 1 deletion artdaq-core/Core/SharedMemoryFragmentManager.hh
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public:
/**
* \brief SharedMemoryFragmentManager destructor
*/
virtual ~SharedMemoryFragmentManager() = default;
virtual ~SharedMemoryFragmentManager() noexcept;
SharedMemoryFragmentManager(SharedMemoryFragmentManager const&) = delete; ///< Copy Constructor is deleted
SharedMemoryFragmentManager(SharedMemoryFragmentManager&&) = delete; ///< Move Constructor is deleted
SharedMemoryFragmentManager& operator=(SharedMemoryFragmentManager const&) = delete; ///< Copy Assignment Operator is deleted
Expand Down
39 changes: 5 additions & 34 deletions artdaq-core/Core/SharedMemoryManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -294,12 +294,7 @@ bool artdaq::SharedMemoryManager::Attach(size_t timeout_usec)
int artdaq::SharedMemoryManager::GetBufferForReading()
{
TLOG(TLVL_GETBUFFER) << "GetBufferForReading BEGIN";

if (!registered_reader_)
{
shm_ptr_->reader_count++;
registered_reader_ = true;
}
RegisterReader();

TLOG(TLVL_GETBUFFER) << "Scanning " << shm_ptr_->buffer_count << " buffers";

Expand Down Expand Up @@ -330,7 +325,7 @@ int artdaq::SharedMemoryManager::GetBufferForReading()
if (semaphore.flags == BufferSemaphoreFlags::Full && (semaphore.id == -1 || semaphore.id == manager_id_) && (shm_ptr_->destructive_read_mode || sequence_id > last_seen_id_))
{
TLOG(TLVL_GETBUFFER + 1) << "ID " << manager_id_ << " Buffer " << buffer_num << ": sem=" << FlagToString(semaphore.flags)
<< " (looking for " << FlagToString(BufferSemaphoreFlags::Full) << "), sem_id=" << semaphore.id << ", seq_id=" << sequence_id << ", last_seen_id_=" << last_seen_id_;
<< " (looking for " << FlagToString(BufferSemaphoreFlags::Full) << "), sem_id=" << semaphore.id << ", seq_id=" << sequence_id << ", last_seen_id_=" << last_seen_id_ << ", reader_count=" << reader_count;
// Claim the buffer if it is in my sequence, I haven't claimed buffers before, or if we are in Broadcast mode
if (last_seen_id_ == 0 || !shm_ptr_->destructive_read_mode || sequence_id % reader_count == last_seen_id_ % reader_count || sequence_id + reader_count < last_seen_id_)
{
Expand Down Expand Up @@ -374,11 +369,7 @@ int artdaq::SharedMemoryManager::GetBufferForWriting(bool overwrite)
{
TLOG(TLVL_GETBUFFER + 1) << "GetBufferForWriting BEGIN, overwrite=" << (overwrite ? "true" : "false");

if (!registered_writer_)
{
shm_ptr_->writer_count++;
registered_writer_ = true;
}
RegisterWriter();

auto wp = shm_ptr_->writer_pos.load();

Expand Down Expand Up @@ -517,12 +508,6 @@ size_t artdaq::SharedMemoryManager::ReadReadyCount()
return 0;
}

if (!registered_reader_)
{
shm_ptr_->reader_count++;
registered_reader_ = true;
}

TLOG(TLVL_READREADY) << std::hex << std::showbase << shm_key_ << " ReadReadyCount BEGIN" << std::dec;
TLOG(TLVL_READREADY) << "ReadReadyCount scanning " << shm_ptr_->buffer_count << " buffers";
size_t count = 0;
Expand Down Expand Up @@ -595,12 +580,6 @@ bool artdaq::SharedMemoryManager::ReadyForRead()
return false;
}

if (!registered_reader_)
{
shm_ptr_->reader_count++;
registered_reader_ = true;
}

TLOG(TLVL_READREADY) << std::hex << std::showbase << shm_key_ << " ReadyForRead BEGIN" << std::dec;

auto rp = shm_ptr_->reader_pos.load();
Expand Down Expand Up @@ -1253,16 +1232,8 @@ void artdaq::SharedMemoryManager::Detach(bool throwException, const std::string&
shmBuf->semaphore.compare_exchange_strong(semaphore, release); // Ignoring return code in Detach
}
}
if (registered_reader_)
{
shm_ptr_->reader_count--;
registered_reader_ = false;
}
if (registered_writer_)
{
shm_ptr_->writer_count--;
registered_writer_ = false;
}
UnregisterReader();
UnregisterWriter();
}

if (shm_ptr_ != nullptr)
Expand Down
36 changes: 36 additions & 0 deletions artdaq-core/Core/SharedMemoryManager.hh
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,42 @@ public:
if (manager_id_ == 0 && IsValid()) shm_ptr_->next_id = 1;
}

void RegisterReader()
{
if (IsValid() && !registered_reader_)
{
shm_ptr_->reader_count++;
registered_reader_ = true;
}
}

void RegisterWriter()
{
if (IsValid() && !registered_writer_)
{
shm_ptr_->writer_count++;
registered_writer_ = true;
}
}

void UnregisterReader()
{
if (IsValid() && registered_reader_)
{
shm_ptr_->reader_count--;
registered_reader_ = false;
}
}

void UnregisterWriter()
{
if (IsValid() && registered_writer_)
{
shm_ptr_->writer_count--;
registered_writer_ = false;
}
}

/**
* \brief Get the ID number of the current SharedMemoryManager
* \return The ID number of the current SharedMemoryManager
Expand Down
2 changes: 2 additions & 0 deletions test/Core/SharedMemoryManager_t.cc
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ BOOST_AUTO_TEST_CASE(RoundRobin)
size_t misses_after_start = 0;
size_t last_read_id = 0;
artdaq::SharedMemoryManager reader_man(key);
reader_man.RegisterReader();
auto my_id = static_cast<size_t>(reader_man.GetMyId() - 1);

TLOG(TLVL_INFO) << "Reader " << my_id << " waiting for other readers..." << reader_man.GetReaderCount();
Expand Down Expand Up @@ -402,6 +403,7 @@ BOOST_AUTO_TEST_CASE(RoundRobin)
}
}
TLOG(TLVL_INFO) << "Reader " << my_id << " read " << counter << " buffers, " << ooo_counter << " of them were out of round-robin order, and " << misses_after_start << " times there was no data available";
reader_man.UnregisterReader();
};

auto writer_proc = [&man, n_writes]() {
Expand Down

0 comments on commit 5f0bd5a

Please sign in to comment.