Skip to content

Commit

Permalink
Reload filecache if offsets missing
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisspyB committed Nov 14, 2024
1 parent e584f7e commit 2b6403d
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 13 deletions.
45 changes: 34 additions & 11 deletions src/gribjump/info/InfoCache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,14 @@ std::shared_ptr<JumpInfo> InfoCache::get(const eckit::URI& uri) {
std::shared_ptr<JumpInfo> InfoCache::get(const eckit::PathName& path, const eckit::Offset offset) {

std::shared_ptr<FileCache> filecache = getFileCache(path);
filecache->load();
filecache->reloadMissing({offset});

// return it if in memory cache
{
std::shared_ptr<JumpInfo> info = filecache->find(offset);
if (info) return info;

LOG_DEBUG_LIB(LibGribJump) << "InfoCache file " << path << " does not contain JumpInfo for field at offset " << offset << std::endl;

}

// Extract explicitly
Expand All @@ -137,15 +136,7 @@ std::shared_ptr<JumpInfo> InfoCache::get(const eckit::PathName& path, const ecki
std::vector<std::shared_ptr<JumpInfo>> InfoCache::get(const eckit::PathName& path, const eckit::OffsetList& offsets) {

std::shared_ptr<FileCache> filecache = getFileCache(path);
filecache->load();

std::vector<eckit::Offset> missingOffsets;

for (const auto& offset : offsets) {
if (!filecache->find(offset)) {
missingOffsets.push_back(offset);
}
}
std::vector<eckit::Offset> missingOffsets = filecache->reloadMissing(offsets);

if (!missingOffsets.empty()) {
if (!lazy_) {
Expand Down Expand Up @@ -297,6 +288,12 @@ void FileCache::load() {
loaded_ = true;
}

// Discard the current contents of this cache and load them again,
// e.g. if the file on disk has been updated since the cache was last loaded.
// NOTE(review): this relies on clear() resetting whatever state would otherwise
// make the subsequent load() a no-op -- confirm against clear()/load().
void FileCache::reload() {
clear();
load();
}

void FileCache::encode(eckit::Stream& s) {
std::lock_guard<std::mutex> lock(mutex_);

Expand Down Expand Up @@ -411,6 +408,32 @@ std::shared_ptr<JumpInfo> FileCache::find(eckit::Offset offset) {
return nullptr;
}

// Check the requested offsets against the cache, reloading it (at most once) as soon as one of
// them cannot be found, e.g. because the file has been updated since the cache was loaded.
//
// Returns the offsets that are still absent after the reload, in the order they were requested.
// If every offset is found up front, no reload happens and the result is empty.
//
// Offsets that were found before the first miss are not looked up again after the reload: we
// assume that entries which were previously present have not been removed.
std::vector<eckit::Offset> FileCache::reloadMissing(const eckit::OffsetList& offsets) {

    // Advance to the first requested offset that the cache does not know about.
    auto firstMissing = offsets.begin();
    while (firstMissing != offsets.end() && find(*firstMissing)) {
        ++firstMissing;
    }

    std::vector<eckit::Offset> stillMissing;

    // Everything was found: nothing to reload, nothing to report.
    if (firstMissing == offsets.end()) {
        return stillMissing;
    }

    reload();

    // Re-check from the first miss onwards (inclusive), now against the reloaded cache.
    for (auto it = firstMissing; it != offsets.end(); ++it) {
        if (!find(*it)) {
            stillMissing.push_back(*it);
        }
    }

    return stillMissing;
}

size_t FileCache::count() {
std::lock_guard<std::mutex> lock(mutex_);
return map_.size();
Expand Down
5 changes: 4 additions & 1 deletion src/gribjump/info/InfoCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,12 @@ class FileCache {
~FileCache();

void load();
void reload();
void print(std::ostream& s);
bool loaded() const { return loaded_; }

std::vector<eckit::Offset> reloadMissing(const eckit::OffsetList& offsets);

// For tests only
size_t size() const { return map_.size(); }

Expand Down Expand Up @@ -140,7 +143,7 @@ class FileCache {

eckit::PathName path_;
bool loaded_ = false;
std::mutex mutex_; //< mutex for map_
mutable std::mutex mutex_; //< mutex for map_
infomap_t map_;
};

Expand Down
14 changes: 13 additions & 1 deletion tests/test_plugin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ namespace test {
// See also tests/tools/callback_vs_scan.sh.in
CASE( "test_plugin" ){

// --- Setup ------------------------------------------------------------------------------------------
// write the test_plugin.yaml file
std::string s1 = eckit::LocalPathName::cwd();
eckit::PathName configPath(s1.c_str());
Expand Down Expand Up @@ -70,13 +71,16 @@ CASE( "test_plugin" ){

eckit::PathName gribName = "extract_ranges.grib"; // contains 3 messages, expver=xxxx, step=1,2,3. Expect 2 messages selected by the regex

// --- Write fdb data ---------------------------------------------------------------------------------

fdb5::FDB fdb; // callback should be automatically registered
fdb.archive(*gribName.fileHandle());
fdb.flush();

fdb.archive(*gribName.fileHandle());
fdb.flush();

// --- Test -------------------------------------------------------------------------------------------
// Look at the gribjump files in tmpdir
std::vector<eckit::PathName> files;
std::vector<eckit::PathName> dir;
Expand All @@ -88,7 +92,15 @@ CASE( "test_plugin" ){
std::cout << file << std::endl;
FileCache cache = FileCache(file);
cache.print(std::cout);
EXPECT(cache.size() == 4); // match 2 messages, twice.
EXPECT_EQUAL(cache.size(), 4); // match 2 messages, twice.

// Check what happens when a cached file is modified
fdb.archive(*gribName.fileHandle());
fdb.flush();
EXPECT_EQUAL(cache.size(), 4); // does not autoreload
cache.reload();
EXPECT_EQUAL(cache.size(), 6);

}
EXPECT(count == 1 ); // because we should be appending to the same file
}
Expand Down

0 comments on commit 2b6403d

Please sign in to comment.