From 2b6403d8b83704d35894b810764fdf090c1a34ff Mon Sep 17 00:00:00 2001 From: Chris Bradley Date: Thu, 14 Nov 2024 21:49:51 +0000 Subject: [PATCH] Reload filecache if offsets missing --- src/gribjump/info/InfoCache.cc | 45 +++++++++++++++++++++++++--------- src/gribjump/info/InfoCache.h | 5 +++- tests/test_plugin.cc | 14 ++++++++++- 3 files changed, 51 insertions(+), 13 deletions(-) diff --git a/src/gribjump/info/InfoCache.cc b/src/gribjump/info/InfoCache.cc index 8eb7a94..7ed2647 100644 --- a/src/gribjump/info/InfoCache.cc +++ b/src/gribjump/info/InfoCache.cc @@ -110,7 +110,7 @@ std::shared_ptr InfoCache::get(const eckit::URI& uri) { std::shared_ptr InfoCache::get(const eckit::PathName& path, const eckit::Offset offset) { std::shared_ptr filecache = getFileCache(path); - filecache->load(); + filecache->reloadMissing({offset}); // return it if in memory cache { @@ -118,7 +118,6 @@ std::shared_ptr InfoCache::get(const eckit::PathName& path, const ecki if (info) return info; LOG_DEBUG_LIB(LibGribJump) << "InfoCache file " << path << " does not contain JumpInfo for field at offset " << offset << std::endl; - } // Extract explicitly @@ -137,15 +136,7 @@ std::shared_ptr InfoCache::get(const eckit::PathName& path, const ecki std::vector> InfoCache::get(const eckit::PathName& path, const eckit::OffsetList& offsets) { std::shared_ptr filecache = getFileCache(path); - filecache->load(); - - std::vector missingOffsets; - - for (const auto& offset : offsets) { - if (!filecache->find(offset)) { - missingOffsets.push_back(offset); - } - } + std::vector missingOffsets = filecache->reloadMissing(offsets); if (!missingOffsets.empty()) { if (!lazy_) { @@ -297,6 +288,12 @@ void FileCache::load() { loaded_ = true; } +// e.g. 
if the file on disk has been updated +void FileCache::reload() { + clear(); + load(); +} + void FileCache::encode(eckit::Stream& s) { std::lock_guard lock(mutex_); @@ -411,6 +408,32 @@ std::shared_ptr FileCache::find(eckit::Offset offset) { return nullptr; } +// Reload the cache if any offsets are missing, e.g. because the file has been updated +std::vector FileCache::reloadMissing(const eckit::OffsetList& offsets) { + + // Check if any are missing and reload if necessary + size_t j=offsets.size(); + for (size_t i = 0; i < offsets.size(); i++) { + const auto& offset = offsets[i]; + if (!find(offset)) { + reload(); + j=i; + break; + } + } + + // Find if any are still missing. We assume previously found offsets have not been removed. + std::vector missingOffsets; + for (size_t i = j; i < offsets.size(); i++) { + const auto& offset = offsets[i]; + if (!find(offset)) { + missingOffsets.push_back(offset); + } + } + + return missingOffsets; +} + size_t FileCache::count() { std::lock_guard lock(mutex_); return map_.size(); diff --git a/src/gribjump/info/InfoCache.h b/src/gribjump/info/InfoCache.h index b8de63e..b695a5e 100644 --- a/src/gribjump/info/InfoCache.h +++ b/src/gribjump/info/InfoCache.h @@ -99,9 +99,12 @@ class FileCache { ~FileCache(); void load(); + void reload(); void print(std::ostream& s); bool loaded() const { return loaded_; } + std::vector reloadMissing(const eckit::OffsetList& offsets); + // For tests only size_t size() const { return map_.size(); } @@ -140,7 +143,7 @@ class FileCache { eckit::PathName path_; bool loaded_ = false; - std::mutex mutex_; //< mutex for map_ + mutable std::mutex mutex_; //< mutex for map_ infomap_t map_; }; diff --git a/tests/test_plugin.cc b/tests/test_plugin.cc index 1696af9..b16bb56 100644 --- a/tests/test_plugin.cc +++ b/tests/test_plugin.cc @@ -35,6 +35,7 @@ namespace test { // See also tests/tools/callback_vs_scan.sh.in CASE( "test_plugin" ){ + // --- Setup 
------------------------------------------------------------------------------------------ // write the test_plugin.yaml file std::string s1 = eckit::LocalPathName::cwd(); eckit::PathName configPath(s1.c_str()); @@ -70,6 +71,8 @@ CASE( "test_plugin" ){ eckit::PathName gribName = "extract_ranges.grib"; // contains 3 messages, expver=xxxx, step=1,2,3. Expect 2 messages selected by the regex + // --- Write fdb data --------------------------------------------------------------------------------- + fdb5::FDB fdb; // callback should be automatically registered fdb.archive(*gribName.fileHandle()); fdb.flush(); @@ -77,6 +80,7 @@ CASE( "test_plugin" ){ fdb.archive(*gribName.fileHandle()); fdb.flush(); + // --- Test ------------------------------------------------------------------------------------------- // Look at the gribjump files in tmpdir std::vector files; std::vector dir; @@ -88,7 +92,15 @@ CASE( "test_plugin" ){ std::cout << file << std::endl; FileCache cache = FileCache(file); cache.print(std::cout); - EXPECT(cache.size() == 4); // match 2 messages, twice. + EXPECT_EQUAL(cache.size(), 4); // match 2 messages, twice. + + // Check what happens when a cached file is modified + fdb.archive(*gribName.fileHandle()); + fdb.flush(); + EXPECT_EQUAL(cache.size(), 4); // does not autoreload + cache.reload(); + EXPECT_EQUAL(cache.size(), 6); + } EXPECT(count == 1 ); // because we should be appending to the same file }