Skip to content

Commit

Permalink
CCDB semaphore improvements
Browse files Browse the repository at this point in the history
- some function renaming
- better code modularity (factor out function to determine semaphore name)
- offer functions to remove a leaking CCDB semaphore from the system
- offer a function that can check for leaking semaphores given
  an existing CCDB cache directory
- offer a utility with which we can remove CCDB semaphores from the command line
  • Loading branch information
sawenzel committed Jan 26, 2024
1 parent 61ebac1 commit 81d66fb
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 29 deletions.
5 changes: 5 additions & 0 deletions CCDB/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ o2_add_executable(upload
SOURCES src/UploadTool.cxx
PUBLIC_LINK_LIBRARIES O2::CCDB)

o2_add_executable(cleansemaphores
COMPONENT_NAME ccdb
SOURCES src/CleanCCDBSemaphores.cxx
PUBLIC_LINK_LIBRARIES O2::CCDB)

o2_add_executable(downloadccdbfile
COMPONENT_NAME ccdb
SOURCES src/DownloadCCDBFile.cxx
Expand Down
10 changes: 8 additions & 2 deletions CCDB/include/CCDB/CcdbApi.h
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,14 @@ class CcdbApi //: public DatabaseInterface
void getFromSnapshot(bool createSnapshot, std::string const& path,
long timestamp, std::map<std::string, std::string>& headers,
std::string& snapshotpath, o2::pmr::vector<char>& dest, int& fromSnapshot, std::string const& etag) const;
void releaseNamedSemaphore(boost::interprocess::named_semaphore* sem, std::string path) const;
boost::interprocess::named_semaphore* createNamedSempahore(std::string path) const;
void releaseNamedSemaphore(boost::interprocess::named_semaphore* sem, std::string const& path) const;
boost::interprocess::named_semaphore* createNamedSemaphore(std::string const& path) const;
static std::string determineSemaphoreName(std::string const& basedir, std::string const& objectpath);
// queries and optionally removes a named semaphore from the system
// returns true when successful (either found or found + removed)
static bool removeSemaphore(std::string const& name, bool remove = false);
static void removeLeakingSemaphores(std::string const& basedir, bool remove = false);

void loadFileToMemory(o2::pmr::vector<char>& dest, const std::string& path, std::map<std::string, std::string>* localHeaders = nullptr) const;
void loadFileToMemory(o2::pmr::vector<char>& dest, std::string const& path,
std::map<std::string, std::string> const& metadata, long timestamp,
Expand Down
96 changes: 69 additions & 27 deletions CCDB/src/CcdbApi.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,12 @@ void CcdbApi::init(std::string const& host)

std::string snapshotReport{};
const char* cachedir = getenv("ALICEO2_CCDB_LOCALCACHE");
namespace fs = std::filesystem;
if (cachedir) {
if (cachedir[0] == 0) {
mSnapshotCachePath = ".";
mSnapshotCachePath = fs::canonical(fs::absolute("."));
} else {
mSnapshotCachePath = cachedir;
mSnapshotCachePath = fs::canonical(fs::absolute(cachedir));
}
snapshotReport = fmt::format("(cache snapshots to dir={}", mSnapshotCachePath);
}
Expand Down Expand Up @@ -1017,15 +1018,7 @@ void* CcdbApi::retrieveFromTFile(std::type_info const& tinfo, std::string const&
{
if (!mSnapshotCachePath.empty()) {
// protect this sensitive section by a multi-process named semaphore
boost::interprocess::named_semaphore* sem = nullptr;
std::hash<std::string> hasher;
const auto semhashedstring = "aliceccdb" + std::to_string(hasher(mSnapshotCachePath + path)).substr(0, 16);
try {
sem = new boost::interprocess::named_semaphore(boost::interprocess::open_or_create_t{}, semhashedstring.c_str(), 1);
} catch (std::exception e) {
LOG(warn) << "Exception occurred during CCDB (cache) semaphore setup; Continuing without";
sem = nullptr;
}
boost::interprocess::named_semaphore* sem = createNamedSemaphore(path);
if (sem) {
sem->wait(); // wait until we can enter (no one else there)
}
Expand All @@ -1046,14 +1039,7 @@ void* CcdbApi::retrieveFromTFile(std::type_info const& tinfo, std::string const&
} else {
out << "CCDB-access[" << getpid() << "] ... " << mUniqueAgentID << "serving from local snapshot " << snapshotfile << "\n";
}
if (sem) {
sem->post();
if (sem->try_wait()) {
// if nobody else is waiting remove the semaphore resource
sem->post();
boost::interprocess::named_semaphore::remove(semhashedstring.c_str());
}
}
releaseNamedSemaphore(sem, path);
auto res = extractFromLocalFile(snapshotfile, tinfo, headers);
if (!snapshoting) { // if snapshot was created at this call, the log was already done
logReading(path, timestamp, headers, "retrieve from snapshot");
Expand Down Expand Up @@ -1560,10 +1546,17 @@ void CcdbApi::scheduleDownload(RequestContext& requestContext, size_t* requestCo
asynchPerform(curl_handle, requestCounter);
}

boost::interprocess::named_semaphore* CcdbApi::createNamedSempahore(std::string path) const
std::string CcdbApi::determineSemaphoreName(std::string const& basedir, std::string const& ccdbpath)
{
std::hash<std::string> hasher;
std::string semhashedstring = "aliceccdb" + std::to_string(hasher(mSnapshotCachePath + path)).substr(0, 16);
std::string semhashedstring = "aliceccdb" + std::to_string(hasher(basedir + ccdbpath)).substr(0, 16);
return semhashedstring;
}

boost::interprocess::named_semaphore* CcdbApi::createNamedSemaphore(std::string const& path) const
{
std::string semhashedstring = determineSemaphoreName(mSnapshotCachePath, path);
// LOG(info) << "Creating named semaphore with name " << semhashedstring.c_str();
try {
return new boost::interprocess::named_semaphore(boost::interprocess::open_or_create_t{}, semhashedstring.c_str(), 1);
} catch (std::exception e) {
Expand All @@ -1572,15 +1565,64 @@ boost::interprocess::named_semaphore* CcdbApi::createNamedSempahore(std::string
}
}

void CcdbApi::releaseNamedSemaphore(boost::interprocess::named_semaphore* sem, std::string path) const
void CcdbApi::releaseNamedSemaphore(boost::interprocess::named_semaphore* sem, std::string const& path) const
{
if (sem) {
sem->post();
if (sem->try_wait()) { // if nobody else is waiting remove the semaphore resource
sem->post();
std::hash<std::string> hasher;
std::string semhashedstring = "aliceccdb" + std::to_string(hasher(mSnapshotCachePath + path)).substr(0, 16);
boost::interprocess::named_semaphore::remove(semhashedstring.c_str());
boost::interprocess::named_semaphore::remove(determineSemaphoreName(mSnapshotCachePath, path).c_str());
}
}
}

bool CcdbApi::removeSemaphore(std::string const& semaname, bool remove)
{
// removes a given named semaphore from the system
try {
boost::interprocess::named_semaphore semaphore(boost::interprocess::open_only, semaname.c_str());
std::cout << "Found CCDB semaphore: " << semaname << "\n";
if (remove) {
auto success = boost::interprocess::named_semaphore::remove(semaname.c_str());
if (success) {
std::cout << "Removed CCDB semaphore: " << semaname << "\n";
}
return success;
}
return true;
} catch (std::exception const& e) {
// no EXISTING under this name semaphore found
// nothing to be done
}
return false;
}

// helper function checking for leaking semaphores associated to CCDB cache files and removing them
// walks a local CCDB snapshot tree and checks
void CcdbApi::removeLeakingSemaphores(std::string const& snapshotdir, bool remove)
{
namespace fs = std::filesystem;
std::string fileName{"snapshot.root"};
auto absolutesnapshotdir = fs::canonical(fs::absolute(snapshotdir));
for (const auto& entry : fs::recursive_directory_iterator(absolutesnapshotdir)) {
if (entry.is_directory()) {
const fs::path& currentDir = fs::canonical(fs::absolute(entry.path()));
fs::path filePath = currentDir / fileName;
if (fs::exists(filePath) && fs::is_regular_file(filePath)) {
std::cout << "Directory with file '" << fileName << "': " << currentDir << std::endl;

// we need to obtain the path relative to snapshotdir
auto pathtokens = o2::utils::Str::tokenize(currentDir, '/', true);
auto numtokens = pathtokens.size();
if (numtokens < 3) {
// cannot be a CCDB path
continue;
}
// path are last 3 entries
std::string path = pathtokens[numtokens - 3] + "/" + pathtokens[numtokens - 2] + "/" + pathtokens[numtokens - 1];
auto semaname = o2::ccdb::CcdbApi::determineSemaphoreName(absolutesnapshotdir, path);
removeSemaphore(semaname, remove);
}
}
}
}
Expand Down Expand Up @@ -1614,7 +1656,7 @@ void CcdbApi::saveSnapshot(RequestContext& requestContext) const
{
// Consider saving snapshot
if (!mSnapshotCachePath.empty() && !(mInSnapshotMode && mSnapshotTopPath == mSnapshotCachePath)) { // store in the snapshot only if the object was not read from the snapshot
auto sem = createNamedSempahore(requestContext.path);
auto sem = createNamedSemaphore(requestContext.path);
if (sem) {
sem->wait(); // wait until we can enter (no one else there)
}
Expand Down Expand Up @@ -1665,7 +1707,7 @@ void CcdbApi::navigateSourcesAndLoadFile(RequestContext& requestContext, int& fr

std::string snapshotpath;
if (mInSnapshotMode || std::filesystem::exists(snapshotpath = getSnapshotFile(mSnapshotCachePath, requestContext.path))) {
boost::interprocess::named_semaphore* sem = createNamedSempahore(requestContext.path);
boost::interprocess::named_semaphore* sem = createNamedSemaphore(requestContext.path);
if (sem) {
sem->wait(); // wait until we can enter (no one else there)
}
Expand Down
62 changes: 62 additions & 0 deletions CCDB/src/CleanCCDBSemaphores.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#include "CCDB/CcdbApi.h"
#include <iostream>
#include <boost/program_options.hpp>

namespace bpo = boost::program_options;

bool initOptionsAndParse(bpo::options_description& options, int argc, char* argv[], bpo::variables_map& vm)
{
options.add_options()(
"cachepath,p", bpo::value<std::string>()->default_value("ccdb"), "path to whole CCDB cache dir as a basis for semaphore search")(
"sema,s", bpo::value<std::string>()->default_value(""), "Specific named semaphore to be remove")(
"help,h", "Produce help message.");

try {
bpo::store(parse_command_line(argc, argv, options), vm);
// help
if (vm.count("help")) {
std::cout << options << std::endl;
return false;
}
bpo::notify(vm);
} catch (const bpo::error& e) {
std::cerr << e.what() << "\n\n";
std::cerr << "Error parsing command line arguments; Available options:\n";

std::cerr << options << std::endl;
return false;
}
return true;
}

// A simple tool to clean CCDB related semaphores
int main(int argc, char* argv[])
{
bpo::options_description options("Tool to find and remove leaking CCDB semaphore from the system");
bpo::variables_map vm;
if (!initOptionsAndParse(options, argc, argv, vm)) {
return 1;
}

std::string sema = vm["sema"].as<std::string>();
if (sema.size() > 0) {
if (o2::ccdb::CcdbApi::removeSemaphore(sema, true)) {
std::cout << "Successfully removed " << sema << "\n";
}
}

std::string path = vm["cachepath"].as<std::string>();
o2::ccdb::CcdbApi::removeLeakingSemaphores(path, true);
return 0;
}

0 comments on commit 81d66fb

Please sign in to comment.