Skip to content
Open
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 68 additions & 31 deletions src/iocore/cache/RamCacheCLFUS.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,22 @@
#include "iocore/eventsystem/Tasks.h"
#include "fastlz/fastlz.h"
#include "tscore/CryptoHash.h"
#include "tscore/Regression.h"
#include <zlib.h>
#ifdef HAVE_LZMA_H
#include <lzma.h>
#endif

#define REQUIRED_COMPRESSION 0.9 // must get to this size or declared incompressible
#define REQUIRED_SHRINK 0.8 // must get to this size or keep original buffer (with padding)
#define HISTORY_HYSTERIA 10 // extra temporary history
#define ENTRY_OVERHEAD 256 // per-entry overhead to consider when computing cache value/size
#define LZMA_BASE_MEMLIMIT (64 * 1024 * 1024)
// #define CHECK_ACOUNTING 1 // very expensive double checking of all sizes

#define REQUEUE_HITS(_h) ((_h) ? ((_h) - 1) : 0)
#define CACHE_VALUE_HITS_SIZE(_h, _s) (static_cast<float>(((_h) + 1) / ((_s) + ENTRY_OVERHEAD)))
#define CACHE_VALUE(_x) CACHE_VALUE_HITS_SIZE((_x)->hits, (_x)->size)
constexpr float required_compression = 0.9f;
constexpr float required_shrink = 0.8f;
Comment thread
phongn marked this conversation as resolved.
Outdated
constexpr uint32_t history_hysteria = 10;
constexpr uint32_t entry_overhead = 256; // per-entry overhead to consider when computing cache value/size
constexpr uint32_t lzma_base_memlimit = 64 * 1024 * 1024;
Comment thread
phongn marked this conversation as resolved.
Outdated

#define AVERAGE_VALUE_OVER 100
#define REQUEUE_LIMIT 100
constexpr uint32_t average_value_over = 100;
constexpr uint32_t requeue_limit = 100;

#ifdef DEBUG

Expand Down Expand Up @@ -83,6 +81,24 @@ struct RamCacheCLFUSEntry {
Ptr<IOBufferData> data;
};

constexpr uint64_t
requeue_hits(const uint64_t hits)
{
return hits ? (hits - 1) : 0;
}

constexpr double
cache_value_hits_size(const uint64_t hits, const uint32_t size)
{
return static_cast<double>(hits + 1) / (size + entry_overhead);
}

constexpr double
cache_value(const RamCacheCLFUSEntry *const e)
{
return cache_value_hits_size(e->hits, e->size);
}

class RamCacheCLFUS : public RamCache
{
public:
Expand Down Expand Up @@ -230,7 +246,7 @@ check_accounting(RamCacheCLFUS *c)
RamCacheCLFUSEntry *y = c->lru[0].head;
while (y) {
x++;
xsize += y->size + ENTRY_OVERHEAD;
xsize += y->size + entry_overhead;
y = y->lru_link.next;
}
y = c->lru[1].head;
Expand Down Expand Up @@ -259,7 +275,7 @@ RamCacheCLFUS::get(CryptoHash *key, Ptr<IOBufferData> *ret_data, uint64_t auxkey
if (e->key == *key && e->auxkey == auxkey) {
this->_move_compressed(e);
if (!e->flag_bits.lru) { // in memory
if (CACHE_VALUE(e) > this->_average_value) {
if (cache_value(e) > this->_average_value) {
this->_lru[e->flag_bits.lru].remove(e);
this->_lru[e->flag_bits.lru].enqueue(e);
}
Expand Down Expand Up @@ -290,7 +306,7 @@ RamCacheCLFUS::get(CryptoHash *key, Ptr<IOBufferData> *ret_data, uint64_t auxkey
#ifdef HAVE_LZMA_H
case CACHE_COMPRESSION_LIBLZMA: {
size_t l = static_cast<size_t>(e->len), ipos = 0, opos = 0;
uint64_t memlimit = e->len * 2 + LZMA_BASE_MEMLIMIT;
uint64_t memlimit = e->len * 2 + lzma_base_memlimit;
if (LZMA_OK != lzma_stream_buffer_decode(&memlimit, 0, nullptr, reinterpret_cast<uint8_t *>(e->data->data()), &ipos,
e->compressed_len, reinterpret_cast<uint8_t *>(b), &opos, l)) {
goto Lfailed;
Expand Down Expand Up @@ -357,12 +373,12 @@ RamCacheCLFUS::_tick()
}
e->hits >>= 1;
if (e->hits) {
e->hits = REQUEUE_HITS(e->hits);
e->hits = requeue_hits(e->hits);
this->_lru[1].enqueue(e);
} else {
goto Lfree;
}
if (this->_history <= this->_objects + HISTORY_HYSTERIA) {
if (this->_history <= this->_objects + history_hysteria) {
return;
}
e = this->_lru[1].dequeue();
Expand Down Expand Up @@ -410,7 +426,7 @@ RamCacheCLFUS::_destroy(RamCacheCLFUSEntry *e)
this->_lru[e->flag_bits.lru].remove(e);
if (!e->flag_bits.lru) {
this->_objects--;
this->_bytes -= e->size + ENTRY_OVERHEAD;
this->_bytes -= e->size + entry_overhead;
ts::Metrics::Gauge::decrement(cache_rsb.ram_cache_bytes, e->size);
ts::Metrics::Gauge::decrement(stripe->cache_vol->vol_rsb.ram_cache_bytes, e->size);
e->data = nullptr;
Expand Down Expand Up @@ -526,10 +542,10 @@ RamCacheCLFUS::compress_entries(EThread *thread, int do_at_most)
goto Lcontinue;
}
}
if (l > REQUIRED_COMPRESSION * e->len) {
if (l > required_compression * e->len) {
e->flag_bits.incompressible = true;
}
if (l > REQUIRED_SHRINK * e->size) {
if (l > required_shrink * e->size) {
goto Lfailed;
}
if (l < e->len) {
Expand Down Expand Up @@ -581,10 +597,10 @@ RamCacheCLFUS::_requeue_victims(Que(RamCacheCLFUSEntry, lru_link) & victims)
{
RamCacheCLFUSEntry *victim = nullptr;
while ((victim = victims.dequeue())) {
this->_bytes += victim->size + ENTRY_OVERHEAD;
this->_bytes += victim->size + entry_overhead;
ts::Metrics::Gauge::increment(cache_rsb.ram_cache_bytes, victim->size);
ts::Metrics::Gauge::increment(stripe->cache_vol->vol_rsb.ram_cache_bytes, victim->size);
victim->hits = REQUEUE_HITS(victim->hits);
victim->hits = requeue_hits(victim->hits);
this->_lru[0].enqueue(victim);
}
}
Expand Down Expand Up @@ -637,15 +653,15 @@ RamCacheCLFUS::put(CryptoHash *key, IOBufferData *data, uint32_t len, bool copy,
return 1;
} else {
this->_lru[1].remove(e);
if (CACHE_VALUE(e) < this->_average_value) {
if (cache_value(e) < this->_average_value) {
this->_lru[1].enqueue(e);
return 0;
}
}
}
Que(RamCacheCLFUSEntry, lru_link) victims;
RamCacheCLFUSEntry *victim = nullptr;
int requeue_limit = REQUEUE_LIMIT;
int requeue_count = requeue_limit;
if (!this->_lru[1].head) { // initial fill
if (this->_bytes + size <= this->_max_bytes) {
goto Linsert;
Expand Down Expand Up @@ -674,12 +690,12 @@ RamCacheCLFUS::put(CryptoHash *key, IOBufferData *data, uint32_t len, bool copy,
DDbg(dbg_ctl_ram_cache, "put %X %" PRId64 " NO VICTIM", key->slice32(3), auxkey);
return 0;
}
this->_average_value = (CACHE_VALUE(victim) + (this->_average_value * (AVERAGE_VALUE_OVER - 1))) / AVERAGE_VALUE_OVER;
if (CACHE_VALUE(victim) > this->_average_value && requeue_limit-- > 0) {
this->_average_value = (cache_value(victim) + (this->_average_value * (average_value_over - 1))) / average_value_over;
if (cache_value(victim) > this->_average_value && requeue_count-- > 0) {
this->_lru[0].enqueue(victim);
continue;
}
this->_bytes -= victim->size + ENTRY_OVERHEAD;
this->_bytes -= victim->size + entry_overhead;
ts::Metrics::Gauge::decrement(cache_rsb.ram_cache_bytes, victim->size);
ts::Metrics::Gauge::decrement(stripe->cache_vol->vol_rsb.ram_cache_bytes, victim->size);
victims.enqueue(victim);
Expand All @@ -688,13 +704,13 @@ RamCacheCLFUS::put(CryptoHash *key, IOBufferData *data, uint32_t len, bool copy,
} else {
this->_ncompressed--;
}
victim_value += CACHE_VALUE(victim);
victim_value += cache_value(victim);
this->_tick();
if (!e) {
goto Lhistory;
} else { // e from history
DDbg(dbg_ctl_ram_cache_compare, "put %f %f", victim_value, CACHE_VALUE(e));
if (this->_bytes + victim->size + size > this->_max_bytes && victim_value > CACHE_VALUE(e)) {
DDbg(dbg_ctl_ram_cache_compare, "put %f %f", victim_value, cache_value(e));
if (this->_bytes + victim->size + size > this->_max_bytes && victim_value > cache_value(e)) {
this->_requeue_victims(victims);
this->_lru[1].enqueue(e);
DDbg(dbg_ctl_ram_cache, "put %X %" PRId64 " size %d INC %" PRId64 " HISTORY", key->slice32(3), auxkey, e->size, e->hits);
Expand All @@ -708,10 +724,10 @@ RamCacheCLFUS::put(CryptoHash *key, IOBufferData *data, uint32_t len, bool copy,
Linsert:
while ((victim = victims.dequeue())) {
if (this->_bytes + size + victim->size <= this->_max_bytes) {
this->_bytes += victim->size + ENTRY_OVERHEAD;
this->_bytes += victim->size + entry_overhead;
ts::Metrics::Gauge::increment(cache_rsb.ram_cache_bytes, victim->size);
ts::Metrics::Gauge::increment(stripe->cache_vol->vol_rsb.ram_cache_bytes, victim->size);
victim->hits = REQUEUE_HITS(victim->hits);
victim->hits = requeue_hits(victim->hits);
this->_lru[0].enqueue(victim);
} else {
this->_victimize(victim);
Expand Down Expand Up @@ -741,7 +757,7 @@ RamCacheCLFUS::put(CryptoHash *key, IOBufferData *data, uint32_t len, bool copy,
e->data->_mem_type = DEFAULT_ALLOC;
}
e->flag_bits.copy = copy;
this->_bytes += size + ENTRY_OVERHEAD;
this->_bytes += size + entry_overhead;
ts::Metrics::Gauge::increment(cache_rsb.ram_cache_bytes, size);
ts::Metrics::Gauge::increment(stripe->cache_vol->vol_rsb.ram_cache_bytes, size);
e->size = size;
Expand Down Expand Up @@ -792,3 +808,24 @@ new_RamCacheCLFUS()
RamCacheCLFUS *r = new RamCacheCLFUS;
return r;
}

// Guards against PR #11733-style regressions of the CLFUS value metric: the value density
// must be computed in floating point. Integer division truncates (hits + 1) / (size + overhead)
// to 0 for normal object sizes, zeroing the metric and silently collapsing CLFUS to FIFO (no
// promote-on-hit, no clock second chance, no value-based ghost re-admission).
REGRESSION_TEST(ram_cache_clfus_value)([[maybe_unused]] RegressionTest *t, [[maybe_unused]] int level, int *pstatus)
{
*pstatus = REGRESSION_TEST_FAILED;

constexpr float v_one = cache_value_hits_size(1u, 16384u); // a typical 16 KiB object, seen once
constexpr float v_hot = cache_value_hits_size(100u, 16384u); // same size, many more hits
constexpr float v_small = cache_value_hits_size(10u, 1024u); // smaller object, equal hits
constexpr float v_large = cache_value_hits_size(10u, 16384u);

// A non-zero fraction: the integer-division regression makes this exactly 0.0f.
static_assert(v_one > 0.0f, "CLFUS value metric truncated to zero (integer division)");
static_assert(v_hot > v_one, "CLFUS value metric does not increase with hits");
static_assert(v_small > v_large, "CLFUS value metric does not decrease with size");

*pstatus = REGRESSION_TEST_PASSED;
}