Limit the cluster cache by memory instead of number of clusters #956

Open

wants to merge 10 commits into main

42 changes: 21 additions & 21 deletions include/zim/archive.h
@@ -42,6 +42,27 @@ namespace zim
efficientOrder
};

/** Get the maximum size of the cluster cache.
*
* @return The maximum memory size used by the cluster cache.
*/
size_t LIBZIM_API getClusterCacheMaxSize();

/** Get the current size of the cluster cache.
*
* @return The current memory size used by the cluster cache.
*/
size_t LIBZIM_API getClusterCacheCurrentSize();

/** Set the size of the cluster cache.
*
* If the new size is lower than the memory currently used by stored clusters,
* some clusters will be dropped from the cache to respect the new limit.
*
* @param sizeInB The memory limit (in bytes) for the cluster cache.
*/
void LIBZIM_API setClusterCacheMaxSize(size_t sizeInB);
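
For illustration, a minimal usage sketch of this new process-wide API (the file name and byte values below are placeholders, not taken from this PR):

#include <zim/archive.h>
#include <iostream>

int main() {
  // The cluster cache is now global, shared by all Archive instances,
  // so the memory budget is configured once for the whole process.
  zim::setClusterCacheMaxSize(256 * 1024 * 1024); // e.g. a 256MB budget

  zim::Archive archive("content.zim"); // placeholder path
  std::cout << archive.getRandomEntry().getTitle() << "\n";

  std::cout << "cluster cache: " << zim::getClusterCacheCurrentSize()
            << " / " << zim::getClusterCacheMaxSize() << " bytes\n";
  return 0;
}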

/**
* The Archive class to access content in a zim file.
*
@@ -534,27 +555,6 @@ namespace zim
*/
std::shared_ptr<FileImpl> getImpl() const { return m_impl; }

/** Get the maximum size of the cluster cache.
*
* @return The maximum number of clusters stored in the cache.
*/
size_t getClusterCacheMaxSize() const;

/** Get the current size of the cluster cache.
*
* @return The number of clusters currently stored in the cache.
*/
size_t getClusterCacheCurrentSize() const;

/** Set the size of the cluster cache.
*
* If the new size is lower than the number of currently stored clusters
* some clusters will be dropped from cache to respect the new size.
*
* @param nbClusters The maximum number of clusters stored in the cache.
*/
void setClusterCacheMaxSize(size_t nbClusters);

/** Get the size of the dirent cache.
*
* @return The maximum number of dirents stored in the cache.
4 changes: 2 additions & 2 deletions meson_options.txt
@@ -1,5 +1,5 @@
option('CLUSTER_CACHE_SIZE', type : 'string', value : '16',
description : 'set cluster cache size to number (default:16)')
option('CLUSTER_CACHE_SIZE', type : 'string', value : '536870912',
description : 'set cluster cache size in bytes (default: 512MB)')
option('DIRENT_CACHE_SIZE', type : 'string', value : '512',
description : 'set dirent cache size to number (default:512)')
option('DIRENT_LOOKUP_CACHE_SIZE', type : 'string', value : '1024',
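For reference (not part of the diff): this default can be overridden at configure time in the usual Meson way, e.g. `meson setup build -DCLUSTER_CACHE_SIZE=268435456` for a 256MB limit.
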
13 changes: 6 additions & 7 deletions src/archive.cpp
@@ -504,19 +504,19 @@ namespace zim
return m_impl->hasNewNamespaceScheme();
}

size_t Archive::getClusterCacheMaxSize() const
size_t getClusterCacheMaxSize()
{
return m_impl->getClusterCacheMaxSize();
return getClusterCache().getMaxCost();
}

size_t Archive::getClusterCacheCurrentSize() const
size_t getClusterCacheCurrentSize()
{
return m_impl->getClusterCacheCurrentSize();
return getClusterCache().getCurrentCost();
}

void Archive::setClusterCacheMaxSize(size_t nbClusters)
void setClusterCacheMaxSize(size_t sizeInB)
{
m_impl->setClusterCacheMaxSize(nbClusters);
getClusterCache().setMaxCost(sizeInB);
}

size_t Archive::getDirentCacheMaxSize() const
@@ -534,7 +534,6 @@
m_impl->setDirentCacheMaxSize(nbDirents);
}


size_t Archive::getDirentLookupCacheMaxSize() const
{
return m_impl->getDirentLookupCacheMaxSize();
5 changes: 5 additions & 0 deletions src/buffer_reader.cpp
@@ -44,6 +44,11 @@ zsize_t BufferReader::size() const
return source.size();
}

size_t BufferReader::getMemorySize() const
{
return source.size().v;
}

offset_t BufferReader::offset() const
{
return offset_t((offset_type)(static_cast<const void*>(source.data(offset_t(0)))));
1 change: 1 addition & 0 deletions src/buffer_reader.h
@@ -31,6 +31,7 @@ class LIBZIM_PRIVATE_API BufferReader : public Reader {
virtual ~BufferReader() {};

zsize_t size() const override;
size_t getMemorySize() const override;
offset_t offset() const override;

const Buffer get_buffer(offset_t offset, zsize_t size) const override;
45 changes: 42 additions & 3 deletions src/cluster.cpp
@@ -31,8 +31,6 @@

#include "log.h"

#include "config.h"

log_define("zim.cluster")

#define log_debug1(e)
@@ -86,7 +84,8 @@ getClusterReader(const Reader& zimReader, offset_t offset, Cluster::Compression*
Cluster::Cluster(std::unique_ptr<IStreamReader> reader_, Compression comp, bool isExtended)
: compression(comp),
isExtended(isExtended),
m_reader(std::move(reader_))
m_reader(std::move(reader_)),
m_memorySize(0)
{
if (isExtended) {
read_header<uint64_t>();
@@ -179,4 +178,44 @@ getClusterReader(const Reader& zimReader, offset_t offset, Cluster::Compression*
}
}

// This function must return a constant size for a given cluster.
// This is important, as we want to remove the same size as the one we added when
// the cluster is dropped from the cache.
// However, because of partial decompression, this size can change:
// - As we advance in the decompression, we may create new blob readers in `m_blobReaders`.
// - The stream itself may allocate memory.
// To solve this, we take the average and assume that half of a cluster's blob readers
// will be created, so we count a readers size of half the full uncompressed cluster data size.
// If the cluster is not compressed, we never store its content (the mmap is created
// on demand and not cached), so we use a size of 0 for the readers.
// It also appears that, by the time we query the size of the stream, it has reached a
// state where no further allocation will be done by it. Probably because:
// - We have already started decompressing the stream to read the offsets.
// - The cluster data size is smaller than the window size associated with the compression level (?)
// We check that anyway and print a warning if it is not the case, hoping that the user
// will open an issue allowing us to analyse it further.
// Note:
// - There is no need to protect this method against concurrent access, as it is called
//   by the concurrent_cache, which holds a lock (on the lru cache) ensuring that only
//   one thread accesses it at a time.
size_t Cluster::getMemorySize() const {
  if (!m_memorySize) {
    auto offsets_size = sizeof(offset_t) * m_blobOffsets.size();
    size_t readers_size = 0; // `auto` would deduce `int` and could narrow the value below
    if (isCompressed()) {
      readers_size = m_blobOffsets.back().v / 2;
    }
    m_streamSize = m_reader->getMemorySize();
    // A high compression level defines a huge window and makes the decompression stream
    // allocate a huge amount of memory to store it.
    // However, the used memory will not be greater than the content itself, even if the
    // window is bigger.
    // On Linux (at least), the real memory used will be the memory actually touched,
    // not the amount allocated.
    // So, let's clamp the stream size to the size of the content itself.
    m_memorySize = offsets_size + readers_size + std::min<size_type>(m_streamSize, m_blobOffsets.back().v);
  }
  auto streamSize = m_reader->getMemorySize();
  if (streamSize != m_streamSize) {
    std::cerr << "WARNING: stream size has changed from " << m_streamSize << " to " << streamSize << std::endl;
    std::cerr << "Please open an issue on https://github.com/openzim/libzim/issues with this message and the zim file you use" << std::endl;
  }
  return m_memorySize;
}
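
As a worked example (illustrative numbers only): a compressed cluster holding 100 blobs and 2 MiB of uncompressed data would be charged roughly 800 bytes for the offsets (about 100 * sizeof(offset_t)), 1 MiB for the half-assumed readers, plus the stream size clamped to at most 2 MiB, i.e. about 3 MiB in total.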
}
10 changes: 10 additions & 0 deletions src/cluster.h
@@ -70,6 +70,8 @@ namespace zim

mutable std::mutex m_readerAccessMutex;
mutable BlobReaders m_blobReaders;
mutable size_t m_memorySize;
mutable size_t m_streamSize;


template<typename OFFSET_TYPE>
@@ -90,9 +92,17 @@
Blob getBlob(blob_index_t n) const;
Blob getBlob(blob_index_t n, offset_t offset, zsize_t size) const;

size_t getMemorySize() const;

static std::shared_ptr<Cluster> read(const Reader& zimReader, offset_t clusterOffset);
};

struct ClusterMemorySize {
static size_t cost(const std::shared_ptr<const Cluster>& cluster) {
return cluster->getMemorySize();
}
};
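
A sketch of how this cost policy presumably plugs into the concurrent cache (the key type below is illustrative, not taken from this PR):

typedef ConcurrentCache<cluster_index_type,
                        std::shared_ptr<const Cluster>,
                        ClusterMemorySize> ClusterCache;

With such an instantiation, eviction is driven by the estimated byte cost returned by `getMemorySize()` rather than by the number of cached clusters.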

}

#endif // ZIM_CLUSTER_H
13 changes: 13 additions & 0 deletions src/compression.cpp
@@ -60,6 +60,11 @@ void LZMA_INFO::stream_end_decode(stream_t* stream)
lzma_end(stream);
}

size_t LZMA_INFO::state_size(const stream_t& stream)
{
return lzma_memusage(&stream);
}


const std::string ZSTD_INFO::name = "zstd";

@@ -170,3 +175,11 @@ void ZSTD_INFO::stream_end_decode(stream_t* stream)
void ZSTD_INFO::stream_end_encode(stream_t* stream)
{
}

size_t ZSTD_INFO::state_size(const stream_t& stream) {
  // Measure whichever context actually exists: the decoder when decompressing,
  // otherwise the encoder.
  if (stream.decoder_stream) {
    return ZSTD_sizeof_DStream(stream.decoder_stream);
  } else {
    return ZSTD_sizeof_CStream(stream.encoder_stream);
  }
}
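
For context: `ZSTD_sizeof_CStream()` and `ZSTD_sizeof_DStream()` are the upstream zstd helpers that report the total memory currently held by a compression/decompression context, so this estimate reflects the library's own accounting rather than a guess.
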
2 changes: 2 additions & 0 deletions src/compression.h
@@ -65,6 +65,7 @@ struct LZMA_INFO {
static CompStatus stream_run_decode(stream_t* stream, CompStep step);
static CompStatus stream_run(stream_t* stream, CompStep step);
static void stream_end_decode(stream_t* stream);
static size_t state_size(const stream_t& stream);
};


@@ -94,6 +95,7 @@ struct LIBZIM_PRIVATE_API ZSTD_INFO {
static CompStatus stream_run_decode(stream_t* stream, CompStep step);
static void stream_end_encode(stream_t* stream);
static void stream_end_decode(stream_t* stream);
static size_t state_size(const stream_t& stream);
};


71 changes: 57 additions & 14 deletions src/concurrent_cache.h
@@ -23,13 +23,38 @@

#include "lrucache.h"

#include <chrono>
#include <cstddef>
#include <future>
#include <mutex>

namespace zim
{

template<typename CostEstimation>
struct FutureToValueCostEstimation {
template<typename T>
static size_t cost(const std::shared_future<T>& future) {
// The future is the value stored in the cache.
// When calling getOrPut, if the key is not in the cache,
// we add a future, then compute the value and set the future.
// But lru_cache calls us when we add the future, i.e. before we have
// computed the value. If we wait here (or use future.get), we will deadlock,
// as we need to return before the value is set.
// So in this case we return 0. `ConcurrentCache::getOrPut` will correctly increase
// the current cache size once it has an actual value.
// We still need to compute the size of the value if the future has one, as this
// cost is also used to decrease the cache size when the value is dropped.
std::future_status status = future.wait_for(std::chrono::nanoseconds::zero());
if (status == std::future_status::ready) {
return CostEstimation::cost(future.get());
} else {
return 0;
}
}

};
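
A small self-contained illustration of this pending-vs-ready behavior, with a toy cost function (the names and values are invented for the example, and the `concurrent_cache.h` header is assumed to be available):

#include <cassert>
#include <cstddef>
#include <future>
#include <string>

struct StringCost {
  static size_t cost(const std::string& s) { return s.size(); }
};

int main() {
  std::promise<std::string> p;
  auto fut = p.get_future().share();
  // Pending future: charged as 0, so inserting the placeholder never blocks.
  assert(zim::FutureToValueCostEstimation<StringCost>::cost(fut) == 0);
  p.set_value("hello");
  // Ready future: charged the value's real cost (5 here), which is also the
  // amount subtracted when the entry is evicted.
  assert(zim::FutureToValueCostEstimation<StringCost>::cost(fut) == 5);
  return 0;
}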

/**
ConcurrentCache implements a concurrent thread-safe cache

@@ -39,16 +64,16 @@
safe, and, in case of a cache miss, will block until that element becomes
available.
*/
template <typename Key, typename Value>
class ConcurrentCache
template <typename Key, typename Value, typename CostEstimation>
class ConcurrentCache: private lru_cache<Key, std::shared_future<Value>, FutureToValueCostEstimation<CostEstimation>>
{
private: // types
typedef std::shared_future<Value> ValuePlaceholder;
typedef lru_cache<Key, ValuePlaceholder> Impl;
typedef lru_cache<Key, ValuePlaceholder, FutureToValueCostEstimation<CostEstimation>> Impl;

public: // types
explicit ConcurrentCache(size_t maxEntries)
: impl_(maxEntries)
explicit ConcurrentCache(size_t maxCost)
: Impl(maxCost)
{}

// Gets the entry corresponding to the given key. If the entry is not in the
@@ -65,11 +90,24 @@
{
std::promise<Value> valuePromise;
std::unique_lock<std::mutex> l(lock_);
const auto x = impl_.getOrPut(key, valuePromise.get_future().share());
const auto x = Impl::getOrPut(key, valuePromise.get_future().share());
l.unlock();
if ( x.miss() ) {
try {
valuePromise.set_value(f());
auto cost = CostEstimation::cost(x.value().get());
// There is a small window during which the valuePromise may be dropped from the
// lru cache after we set the value but before we increase the size of the cache.
// In that case the cache size has been decreased by `cost` before we increase it.
// First of all, this should be pretty rare: we have just put the future in the
// cache, so it should not be the least recently used item.
// If it happens, it is not a problem as long as the current size is bigger than
// `cost` (most of the time it is).
// For the really rare case of the current cache size being lower than `cost`
// (if that is possible at all), `decreaseCost` will clamp the new size to 0.

Collaborator:

I don't like such subtle pitfalls here and there (that may turn into time-bombs). This makes me come up with the following question and proposal:

Is memory usage accounting going to expand beyond clusters? If not, then why don't we implement item cost tracking at ConcurrentCache level, leaving lru_cache absolutely intact?

Collaborator Author:

> Is memory usage accounting going to expand beyond clusters?

I don't know. The main purpose of all this is to avoid exaggerated memory consumption for a long-running kiwix-serve with a "lot" (not defined, relative to the system) of zim files to serve.

We have assumed it is the cluster cache taking all this memory, mainly because we currently have a limit per zim file (and so a lot of zim files make the cache mostly unlimited) and this cache should be more memory-consuming than the dirent cache.

So there is no current plan to extend this limitation to anything else for now, but we may if we find our assumption was wrong.

> If not, then why don't we implement item cost tracking at ConcurrentCache level, leaving lru_cache absolutely intact?

Your idea seems good at first glance. However, thinking twice, here are a few limitations (not necessarily blockers):

  • We would have to make the inner lru_cache used by ConcurrentCache unlimited (easily done with a huge size_t limit, but still).
  • ConcurrentCache would have to re-implement all the put/putMissing and drop methods to drop the clusters from the cache itself and track the allocated memory.
  • Everything about what the (constant) memory consumption of a cluster is would still stand.

In the end, it would simply move everything about memory consumption a level up (into ConcurrentCache) and make it call lru_cache to drop an item even if its internal limit is not reached.
It would indeed leave lru_cache almost unmodified, but I'm not sure it would be worth it.
(And I don't see a reason we would want to keep lru_cache clean of memory limitation but accept the modifications in ConcurrentCache.)

Collaborator:

> (And I don't see a reason we would want to keep lru_cache clean of memory limitation but accept the modifications in ConcurrentCache.)

The main benefit would be avoiding design quirks.

> • ConcurrentCache would have to re-implement all the put/putMissing and drop methods to drop the clusters from the cache itself and track the allocated memory.

ConcurrentCache is a relatively lightweight wrapper around lru_cache (without any is-a relation between them). Its public API related to memory usage management consists of just three member functions:

  • getOrPut()
  • drop()
  • setMaxSize()

Maintaining memory limits entirely within that small set of functions can be achieved with fewer changes. It also simplifies the implementation, in which the cost of an item is evaluated only once (on addition) and stored with the item.

Collaborator Author:

> The main benefit would be avoiding design quirks.

But ConcurrentCache is a thin wrapper making lru_cache thread-safe.
Adding a memory limitation to it seems to me to be a quirk we should avoid.

> Maintaining memory limits entirely within that small set of functions can be achieved with fewer changes. It also simplifies the implementation, in which the cost of an item is evaluated only once (on addition) and stored with the item.

I've started implementing your idea to see if I was wrong. Here are a few things I have faced:

  • We have to decrease the size when we drop an item, so we need to make lru_cache::dropLast return the dropped item. However, the current drop implementation returns false if the key is not found. As we want to return the value and can't use std::optional (we limit ourselves to C++14), we would have to throw std::out_of_range instead of returning false, and so all users of drop would have to catch it.
  • We still have to drop all clusters of an archive at archive destruction, so we need a dropAll. Either it is implemented in lru_cache (and we have to return all dropped items to be able to decrease the size) or it is implemented in ConcurrentCache (and we break encapsulation).
  • lru_cache may drop values by itself. This should not happen in practice, as ConcurrentCache will configure it to be unlimited, but it somehow creates an inconsistent API, as we have code which can drop values from lru_cache without ConcurrentCache noticing.

Anyway, the last commit implements your idea. We can discuss the actual implementation.

Collaborator:

Indeed, it was more complicated than I thought, but it helped to surface a potential problem that was present with the old implementation too. I felt that something was wrong, but I thought that it was just an uncomfortable feeling caused by the arcane interplay between lru_cache and ConcurrentCache. Now with the cost accounting fully done in a single class the problem was easier to spot.

Consider working with two ZIM files - one on a (very) slow drive and the other on a fast one. A cluster from the slow ZIM file is requested. A placeholder for it is created in the cache, and while the cluster is loading, a burst of activity on the fast ZIM file fills the cache with entries from the fast ZIM file and drops the placeholder entry of the slow ZIM file. Since the cost of that placeholder entry is not known, a fake value of 0 is used instead (the result being that the removal of the entry has no effect and leaves no trace). Then loading of the slow cluster completes and the cache cost is increased by its actual memory usage value, but that item is no longer in the cache! Repetition of that scenario can lead to the current "fake" cost of the cache being above the limit even though the cache is empty!
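
Spelled out as a timeline (illustrative): (1) a miss on the slow ZIM inserts a placeholder future charged at cost 0; (2) the burst on the fast ZIM evicts that placeholder, decreasing the accounted cost by 0; (3) the slow load completes and increases the accounted cost by the cluster's real size, but the entry is already gone, so the accounted cost stays inflated.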

Collaborator:

The same scenario can occur more naturally as follows:

  • slow access: cluster not in the cache (cache miss)
  • fast access: cluster already in the cache (cache hit, the cluster is moved to the beginning of the LRU list)

Thus, a cache miss, followed by a large enough number of cache hits (so that the cache miss is pushed to the end of the LRU queue), followed by another cache miss (occurring before the first cache miss is fully resolved), will result in the first cache miss being evicted from the cache before it was even properly recorded, and the cache memory usage balance being off by the cost of that ill-fated cluster.

Collaborator Author:

The last commit should fix this point.
(I have removed the "WIP" commit.)

Collaborator:

I think it doesn't.

{
std::unique_lock<std::mutex> l(lock_);
Impl::increaseCost(cost);
}
} catch (std::exception& e) {
drop(key);
throw;
@@ -82,26 +120,31 @@
bool drop(const Key& key)
{
std::unique_lock<std::mutex> l(lock_);
return impl_.drop(key);
return Impl::drop(key);
}

template<class F>
void dropAll(F f) {
std::unique_lock<std::mutex> l(lock_);
Impl::dropAll(f);
}

size_t getMaxSize() const {
size_t getMaxCost() const {
std::unique_lock<std::mutex> l(lock_);
return impl_.getMaxSize();
return Impl::getMaxCost();
}

size_t getCurrentSize() const {
size_t getCurrentCost() const {
std::unique_lock<std::mutex> l(lock_);
return impl_.size();
return Impl::cost();
}

void setMaxSize(size_t newSize) {
void setMaxCost(size_t newSize) {
std::unique_lock<std::mutex> l(lock_);
return impl_.setMaxSize(newSize);
return Impl::setMaxCost(newSize);
}

private: // data
Impl impl_;
mutable std::mutex lock_;
};

5 changes: 5 additions & 0 deletions src/decoderstreamreader.h
@@ -23,6 +23,7 @@

#include "compression.h"
#include "istreamreader.h"
#include <cstddef>

namespace zim
{
@@ -49,6 +50,10 @@ class DecoderStreamReader : public IStreamReader
Decoder::stream_end_decode(&m_decoderState);
}

size_t getMemorySize() const override {
return m_encodedDataReader->getMemorySize() + m_encodedDataChunk.size().v + Decoder::state_size(m_decoderState);
}

private: // functions
void readNextChunk()
{