Limit the cluster cache by memory instead of number of clusters #956

Open
wants to merge 10 commits into base: main
42 changes: 21 additions & 21 deletions include/zim/archive.h
@@ -42,6 +42,27 @@ namespace zim
efficientOrder
};

/** Get the maximum size of the cluster cache.
*
* @return The maximum memory size used by the cluster cache.
*/
size_t LIBZIM_API getClusterCacheMaxSize();

/** Get the current size of the cluster cache.
*
* @return The current memory size used by the cluster cache.
*/
size_t LIBZIM_API getClusterCacheCurrentSize();

/** Set the maximum size of the cluster cache.
*
* If the new size is lower than the memory currently used by cached clusters,
* some clusters will be dropped from the cache to respect the new limit.
*
* @param sizeInB The memory limit (in bytes) for the cluster cache.
*/
void LIBZIM_API setClusterCacheMaxSize(size_t sizeInB);
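
For illustration, a minimal usage sketch of the new archive-wide cache functions (the file name is hypothetical; only the three functions documented above are assumed):

```cpp
#include <zim/archive.h>
#include <iostream>

int main() {
  // Cap the shared cluster cache at 256 MiB instead of the 512 MiB default.
  zim::setClusterCacheMaxSize(256UL * 1024 * 1024);

  zim::Archive archive("wikipedia.zim");              // hypothetical file
  archive.getRandomEntry().getItem(true).getData();   // populates the cluster cache

  std::cout << zim::getClusterCacheCurrentSize() << " / "
            << zim::getClusterCacheMaxSize() << " bytes used\n";
}
```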

/**
* The Archive class to access content in a zim file.
*
@@ -534,27 +555,6 @@ namespace zim
*/
std::shared_ptr<FileImpl> getImpl() const { return m_impl; }

/** Get the maximum size of the cluster cache.
*
* @return The maximum number of clusters stored in the cache.
*/
size_t getClusterCacheMaxSize() const;

/** Get the current size of the cluster cache.
*
* @return The number of clusters currently stored in the cache.
*/
size_t getClusterCacheCurrentSize() const;

/** Set the size of the cluster cache.
*
* If the new size is lower than the number of currently stored clusters
* some clusters will be dropped from cache to respect the new size.
*
* @param nbClusters The maximum number of clusters stored in the cache.
*/
void setClusterCacheMaxSize(size_t nbClusters);

/** Get the size of the dirent cache.
*
* @return The maximum number of dirents stored in the cache.
4 changes: 2 additions & 2 deletions meson_options.txt
@@ -1,5 +1,5 @@
option('CLUSTER_CACHE_SIZE', type : 'string', value : '16',
description : 'set cluster cache size to number (default:16)')
option('CLUSTER_CACHE_SIZE', type : 'string', value : '536870912',
description : 'set cluster cache size in bytes (default: 512MB)')
option('DIRENT_CACHE_SIZE', type : 'string', value : '512',
description : 'set dirent cache size to number (default:512)')
option('DIRENT_LOOKUP_CACHE_SIZE', type : 'string', value : '1024',
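For reference, the byte-based value can still be overridden at configure time using meson's usual option syntax, e.g. `meson setup build -DCLUSTER_CACHE_SIZE=268435456` for a 256 MB cache (the new default of 536870912 bytes corresponds to 512 MB).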
13 changes: 6 additions & 7 deletions src/archive.cpp
@@ -504,19 +504,19 @@ namespace zim
return m_impl->hasNewNamespaceScheme();
}

size_t Archive::getClusterCacheMaxSize() const
size_t getClusterCacheMaxSize()
{
return m_impl->getClusterCacheMaxSize();
return getClusterCache().getMaxCost();
}

size_t Archive::getClusterCacheCurrentSize() const
size_t getClusterCacheCurrentSize()
{
return m_impl->getClusterCacheCurrentSize();
return getClusterCache().getCurrentCost();
}

void Archive::setClusterCacheMaxSize(size_t nbClusters)
void setClusterCacheMaxSize(size_t sizeInB)
{
m_impl->setClusterCacheMaxSize(nbClusters);
getClusterCache().setMaxCost(sizeInB);
}

size_t Archive::getDirentCacheMaxSize() const
@@ -534,7 +534,6 @@
m_impl->setDirentCacheMaxSize(nbDirents);
}


size_t Archive::getDirentLookupCacheMaxSize() const
{
return m_impl->getDirentLookupCacheMaxSize();
5 changes: 5 additions & 0 deletions src/buffer_reader.cpp
@@ -44,6 +44,11 @@ zsize_t BufferReader::size() const
return source.size();
}

size_t BufferReader::getMemorySize() const
{
return source.size().v;
}

offset_t BufferReader::offset() const
{
return offset_t((offset_type)(static_cast<const void*>(source.data(offset_t(0)))));
1 change: 1 addition & 0 deletions src/buffer_reader.h
@@ -31,6 +31,7 @@ class LIBZIM_PRIVATE_API BufferReader : public Reader {
virtual ~BufferReader() {};

zsize_t size() const override;
size_t getMemorySize() const override;
offset_t offset() const override;

const Buffer get_buffer(offset_t offset, zsize_t size) const override;
45 changes: 42 additions & 3 deletions src/cluster.cpp
@@ -31,8 +31,6 @@

#include "log.h"

#include "config.h"

log_define("zim.cluster")

#define log_debug1(e)
@@ -86,7 +84,8 @@ getClusterReader(const Reader& zimReader, offset_t offset, Cluster::Compression*
Cluster::Cluster(std::unique_ptr<IStreamReader> reader_, Compression comp, bool isExtended)
: compression(comp),
isExtended(isExtended),
m_reader(std::move(reader_))
m_reader(std::move(reader_)),
m_memorySize(0)
{
if (isExtended) {
read_header<uint64_t>();
@@ -179,4 +178,44 @@ getClusterReader(const Reader& zimReader, offset_t offset, Cluster::Compression*
}
}

// This function must return a constant size for a given cluster.
// This is important as we want to remove the same size as the one we added when the
// cluster is dropped from the cache.
// However, because of partial decompression, the real size can change:
// - As decompression advances, we may create new blob readers in `m_blobReaders`
// - The stream itself may allocate memory.
// To solve this, we take an average and assume that half of a cluster's blob readers will be
// created, so we count the readers as half of the full uncompressed cluster data size.
// If the cluster is not compressed, we never store its content (the mmap is created on demand
// and not cached), so we use a size of 0 for the readers.
// It also appears that once we query the size of the stream, it has reached a state where it
// will do no further allocation. Probably because:
// - We have already started to decompress the stream to read the offsets
// - The cluster data size is smaller than the window size associated with the compression level (?)
// We check that anyway and print a warning if it is not the case, hoping that the user will open
// an issue allowing further analysis.
// Note:
// - There is no need to protect this method from concurrent access as it is called by the
// concurrent cache, which holds a lock (on the lru cache) to ensure only one thread accesses it at a time.
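// Illustrative figures only (not taken from real data), assuming 8-byte offsets: a compressed
// cluster with 100 blobs, 1 MiB of uncompressed data and an 8 MiB decompression stream state
// would be charged roughly 100 * 8 B (offsets) + 512 KiB (readers) + min(8 MiB, 1 MiB) (stream),
// i.e. about 1.5 MiB.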
size_t Cluster::getMemorySize() const {
if (!m_memorySize) {
auto offsets_size = sizeof(offset_t) * m_blobOffsets.size();
size_t readers_size = 0;
if (isCompressed()) {
readers_size = m_blobOffsets.back().v / 2;
}
m_streamSize = m_reader->getMemorySize();
// The compression level defines a huge window and makes the decompression stream allocate a lot of memory to store it.
// However, the memory actually used will not be greater than the content itself, even if the window is bigger.
// On Linux (at least), the resident memory is the memory actually touched, not the amount allocated.
// So, let's clamp the stream size to the size of the content itself.
m_memorySize = offsets_size + readers_size + std::min<size_type>(m_streamSize, m_blobOffsets.back().v);
}
auto streamSize = m_reader->getMemorySize();
if (streamSize != m_streamSize) {
std::cerr << "WARNING: stream size have changed from " << m_streamSize << " to " << streamSize << std::endl;
std::cerr << "Please open an issue on https://github.com/openzim/libzim/issues with this message and the zim file you use" << std::endl;
}
return m_memorySize;
}
}
10 changes: 10 additions & 0 deletions src/cluster.h
@@ -70,6 +70,8 @@ namespace zim

mutable std::mutex m_readerAccessMutex;
mutable BlobReaders m_blobReaders;
mutable size_t m_memorySize;
mutable size_t m_streamSize;


template<typename OFFSET_TYPE>
@@ -90,9 +92,17 @@
Blob getBlob(blob_index_t n) const;
Blob getBlob(blob_index_t n, offset_t offset, zsize_t size) const;

size_t getMemorySize() const;

static std::shared_ptr<Cluster> read(const Reader& zimReader, offset_t clusterOffset);
};

struct ClusterMemorySize {
static size_t cost(const std::shared_ptr<const Cluster>& cluster) {
return cluster->getMemorySize();
}
};

}

#endif // ZIM_CLUSTER_H
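
For context, a hedged sketch of how a cost policy such as ClusterMemorySize is meant to plug into the byte-budgeted cache; the alias name and key type are illustrative, and the real instantiation is not part of this excerpt:

```cpp
// Illustrative alias only -- libzim's actual cluster-cache instantiation lives elsewhere.
// The third template parameter tells the cache how to charge each entry, so the limit
// is expressed in bytes of estimated cluster memory rather than in number of entries.
using ClusterCache = zim::ConcurrentCache<zim::offset_type,
                                          std::shared_ptr<const zim::Cluster>,
                                          zim::ClusterMemorySize>;
```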
13 changes: 13 additions & 0 deletions src/compression.cpp
@@ -60,6 +60,11 @@ void LZMA_INFO::stream_end_decode(stream_t* stream)
lzma_end(stream);
}

size_t LZMA_INFO::state_size(const stream_t& stream)
{
return lzma_memusage(&stream);
}


const std::string ZSTD_INFO::name = "zstd";

@@ -170,3 +175,11 @@ void ZSTD_INFO::stream_end_decode(stream_t* stream)
void ZSTD_INFO::stream_end_encode(stream_t* stream)
{
}

size_t ZSTD_INFO::state_size(const stream_t& stream) {
if (stream.encoder_stream) {
return ZSTD_sizeof_CStream(stream.encoder_stream);
} else {
return ZSTD_sizeof_DStream(stream.decoder_stream);
}
}
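
The new state_size hooks report the codec's internal state, which is what a stream reader's getMemorySize() (consumed by Cluster::getMemorySize above) can build on. A standalone illustration of the underlying zstd call, independent of libzim:

```cpp
#include <zstd.h>
#include <cstdio>

int main() {
  ZSTD_DStream* ds = ZSTD_createDStream();
  ZSTD_initDStream(ds);
  // ZSTD_sizeof_DStream reports the memory held by the decompression context;
  // this is the quantity ZSTD_INFO::state_size forwards for decoder streams.
  std::printf("decoder state: %zu bytes\n", ZSTD_sizeof_DStream(ds));
  ZSTD_freeDStream(ds);
  return 0;
}
```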
2 changes: 2 additions & 0 deletions src/compression.h
@@ -65,6 +65,7 @@ struct LZMA_INFO {
static CompStatus stream_run_decode(stream_t* stream, CompStep step);
static CompStatus stream_run(stream_t* stream, CompStep step);
static void stream_end_decode(stream_t* stream);
static size_t state_size(const stream_t& stream);
};


@@ -94,6 +95,7 @@ struct LIBZIM_PRIVATE_API ZSTD_INFO {
static CompStatus stream_run_decode(stream_t* stream, CompStep step);
static void stream_end_encode(stream_t* stream);
static void stream_end_decode(stream_t* stream);
static size_t state_size(const stream_t& stream);
};


80 changes: 66 additions & 14 deletions src/concurrent_cache.h
@@ -23,13 +23,38 @@

#include "lrucache.h"

#include <chrono>
#include <cstddef>
#include <future>
#include <mutex>

namespace zim
{

template<typename CostEstimation>
struct FutureToValueCostEstimation {
template<typename T>
static size_t cost(const std::shared_future<T>& future) {
// The future is the value stored in the cache.
// When calling getOrPut, if the key is not in the cache,
// we add a future and only then compute the value and set the future.
// But lrucache calls us when we add the future, i.e. before we have
// computed the value. If we wait here (or call future.get), we will deadlock,
// as we must return before the value can be set.
// So in that case we return 0. `ConcurrentCache::getOrPut` will correctly increase
// the current cache size once it has an actual value.
// We still need to compute the size of the value if the future holds one, as this
// is also used to decrease the cache size when the value is dropped.
std::future_status status = future.wait_for(std::chrono::nanoseconds::zero());
if (status == std::future_status::ready) {
return CostEstimation::cost(future.get());
} else {
return 0;
}
}

};

/**
ConcurrentCache implements a concurrent thread-safe cache

Expand All @@ -39,16 +64,16 @@
safe, and, in case of a cache miss, will block until that element becomes
available.
*/
template <typename Key, typename Value>
class ConcurrentCache
template <typename Key, typename Value, typename CostEstimation>
class ConcurrentCache: private lru_cache<Key, std::shared_future<Value>, FutureToValueCostEstimation<CostEstimation>>
{
private: // types
typedef std::shared_future<Value> ValuePlaceholder;
typedef lru_cache<Key, ValuePlaceholder> Impl;
typedef lru_cache<Key, ValuePlaceholder, FutureToValueCostEstimation<CostEstimation>> Impl;

public: // types
explicit ConcurrentCache(size_t maxEntries)
: impl_(maxEntries)
explicit ConcurrentCache(size_t maxCost)
: Impl(maxCost)
{}

// Gets the entry corresponding to the given key. If the entry is not in the
Expand All @@ -65,11 +90,33 @@
{
std::promise<Value> valuePromise;
std::unique_lock<std::mutex> l(lock_);
const auto x = impl_.getOrPut(key, valuePromise.get_future().share());
auto shared_future = valuePromise.get_future().share();
const auto x = Impl::getOrPut(key, shared_future);
l.unlock();
if ( x.miss() ) {
try {
valuePromise.set_value(f());
auto cost = CostEstimation::cost(x.value().get());
// There is a small window where the valuePromise may be dropped from the lru cache after
// we set the value but before we increase the size of the cache.
// In that case the cache size is decreased by `cost` before we increase it.
// First of all, this should be pretty rare: we have just put the future in the cache, so it
// should not be the least recently used item.
// If it happens, it is not a problem as long as the current size is bigger than `cost` (most of the time).
// For the really rare case of the current cache size being lower than `cost` (if that is possible),
// `decreaseCost` will clamp the new size to 0.
{
std::unique_lock<std::mutex> l(lock_);
// There is a window where the shared_future may be dropped from the cache while we are computing the value.
// If that is the case, we re-add the shared_future to the cache.
if (!Impl::exists(key)) {
// We don't have to increase the cost ourselves: the future is already set, so the cache will account for its real cost.
Impl::put(key, shared_future);
} else {
// We just have to increase the cost, as we used 0 for the unset future.
Impl::increaseCost(cost);
}
}
} catch (std::exception& e) {
drop(key);
throw;
@@ -82,26 +129,31 @@
bool drop(const Key& key)
{
std::unique_lock<std::mutex> l(lock_);
return impl_.drop(key);
return Impl::drop(key);
}

template<class F>
void dropAll(F f) {
std::unique_lock<std::mutex> l(lock_);
Impl::dropAll(f);
}

size_t getMaxSize() const {
size_t getMaxCost() const {
std::unique_lock<std::mutex> l(lock_);
return impl_.getMaxSize();
return Impl::getMaxCost();
}

size_t getCurrentSize() const {
size_t getCurrentCost() const {
std::unique_lock<std::mutex> l(lock_);
return impl_.size();
return Impl::cost();
}

void setMaxSize(size_t newSize) {
void setMaxCost(size_t newSize) {
std::unique_lock<std::mutex> l(lock_);
return impl_.setMaxSize(newSize);
return Impl::setMaxCost(newSize);
}

private: // data
Impl impl_;
mutable std::mutex lock_;
};

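
A minimal sketch of how this byte-budgeted cache can be driven; the key/value types and the toy cost policy below are placeholders rather than libzim's real cluster-loading code, and only the methods shown in this diff are assumed:

```cpp
#include "concurrent_cache.h"
#include <iostream>
#include <string>

// Toy cost policy: charge each cached string by its length in bytes.
struct StringCost {
  static size_t cost(const std::string& s) { return s.size(); }
};

int main() {
  zim::ConcurrentCache<int, std::string, StringCost> cache(1024);  // 1 KiB budget

  // getOrPut() returns the cached value or runs the factory exactly once;
  // concurrent callers for the same key wait on the same shared_future.
  auto v = cache.getOrPut(42, []() { return std::string(100, 'x'); });

  std::cout << "value size: " << v.size() << "\n";
  std::cout << cache.getCurrentCost() << " / " << cache.getMaxCost() << " bytes\n";
  cache.setMaxCost(64);  // shrinking the budget evicts least-recently-used entries
  return 0;
}
```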