Skip to content

Commit

Permalink
Add option for cache miss on failure (#1296)
Browse files Browse the repository at this point in the history
* Add option for cache miss on failure
* Update config docs
* fix tests and address comments
* address comments
  • Loading branch information
V-FEXrt authored Jun 16, 2023
1 parent a2dcf9f commit 996e405
Show file tree
Hide file tree
Showing 13 changed files with 96 additions and 33 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ contain JSON5 source where the root object may contain the following keys.
| log_header_align | A boolean that specifies whether or not to align log header output | No | Boolean | Yes | Yes | False
| max_cache_size | The number of bytes after which the shared cache will start a collection | No | Integer | Yes | Yes | 25GB
| low_cache_size | The number of bytes that the cache tries to reach during a collection | No | Integer | Yes | Yes | 15 GB
| cache_miss_on_failure | If `true`, the shared cache will report a cache miss instead of terminating when something goes wrong | No | Boolean | Yes | Yes | False

Below is a full example

Expand Down
38 changes: 22 additions & 16 deletions src/job_cache/job_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,21 +215,27 @@ wcl::optional<ConnectError> Cache::backoff_try_connect(int attempts) {
return {};
}

Cache::Cache(std::string dir, uint64_t max, uint64_t low) {
Cache::Cache(std::string dir, uint64_t max, uint64_t low, bool miss) {
cache_dir = dir;
max_size = max;
low_threshold = low;
miss_on_failure = miss;

mkdir_no_fail(cache_dir.c_str());

launch_daemon();

// TODO: Add config var to determine if fail is a cache miss
auto error = backoff_try_connect(14);
if (error && true) {
wcl::log::error("could not connect to daemon. dir = %s", cache_dir.c_str()).urgent()();
exit(1);
if (!error) {
return;
}

wcl::log::error("could not connect to daemon. dir = %s", cache_dir.c_str()).urgent()();
if (miss_on_failure) {
return;
}

exit(1);
}

wcl::result<FindJobResponse, FindJobError> Cache::read_impl(const FindJobRequest &find_request) {
Expand Down Expand Up @@ -287,7 +293,7 @@ wcl::result<FindJobResponse, FindJobError> Cache::read_impl(const FindJobRequest
}

FindJobResponse Cache::read(const FindJobRequest &find_request) {
static int lifetime_retries = 0;
static int misses_from_failure = 0;

wcl::xoshiro_256 rng(wcl::xoshiro_256::get_rng_seed());
useconds_t backoff = 1000;
Expand All @@ -299,9 +305,10 @@ FindJobResponse Cache::read(const FindJobRequest &find_request) {
return *response;
}

// TODO: Add config var to determine if fail is a cache miss
if (lifetime_retries > 100 && false) {
wcl::log::warning("Cache::read(): reached maximum lifetime retries. Triggering cache miss")();
if (miss_on_failure && misses_from_failure > 30) {
wcl::log::warning(
"Cache::read(): reached maximum cache misses for this invocation. Triggering early "
"miss.")();
return FindJobResponse(wcl::optional<MatchingJob>{});
}

Expand All @@ -310,8 +317,6 @@ FindJobResponse Cache::read(const FindJobRequest &find_request) {
backoff *= 2;

// Retry
lifetime_retries++;

wcl::log::info("Relaunching the daemon.")();
launch_daemon();

Expand All @@ -324,13 +329,14 @@ FindJobResponse Cache::read(const FindJobRequest &find_request) {
wcl::log::error("Cache::read(): at least one connect failure occured")();
}

// TODO: Add config var to determine if fail is a cache miss
if (true) {
wcl::log::error("Cache::read(): Failed to read from daemon cache.").urgent()();
exit(1);
wcl::log::error("Cache::read(): Failed to read from daemon cache.").urgent()();

if (miss_on_failure) {
misses_from_failure++;
return FindJobResponse(wcl::optional<MatchingJob>{});
}

return FindJobResponse(wcl::optional<MatchingJob>{});
exit(1);
}

void Cache::add(const AddJobRequest &add_request) {
Expand Down
3 changes: 2 additions & 1 deletion src/job_cache/job_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ enum class FindJobError {
class Cache {
private:
wcl::unique_fd socket_fd;
bool miss_on_failure = false;

// Daemon parameters
std::string cache_dir;
Expand All @@ -65,7 +66,7 @@ class Cache {
Cache() = delete;
Cache(const Cache &) = delete;

Cache(std::string dir, uint64_t max, uint64_t low);
Cache(std::string dir, uint64_t max, uint64_t low, bool miss);

FindJobResponse read(const FindJobRequest &find_request);
void add(const AddJobRequest &add_request);
Expand Down
14 changes: 11 additions & 3 deletions src/runtime/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ POLICY_STATIC_DEFINES(LogHeaderSourceWidthPolicy)
POLICY_STATIC_DEFINES(LabelFilterPolicy)
POLICY_STATIC_DEFINES(SharedCacheMaxSize)
POLICY_STATIC_DEFINES(SharedCacheLowSize)
POLICY_STATIC_DEFINES(SharedCacheMissOnFailure)
POLICY_STATIC_DEFINES(LogHeaderAlignPolicy)

/********************************************************************
Expand Down Expand Up @@ -268,10 +269,17 @@ void SharedCacheLowSize::set(SharedCacheLowSize& p, const JAST& json) {
}
}

// Applies the `cache_miss_on_failure` value read from a JSON config node to
// the policy `p`. If `expect_boolean()` yields no value, `p` is left untouched
// (presumably because the node is not a boolean -- confirm against JAST).
void SharedCacheMissOnFailure::set(SharedCacheMissOnFailure& p, const JAST& json) {
  auto parsed_flag = json.expect_boolean();
  if (!parsed_flag) {
    return;
  }
  p.cache_miss_on_failure = *parsed_flag;
}

void LogHeaderAlignPolicy::set(LogHeaderAlignPolicy& p, const JAST& json) {
auto json_log_header_algih = json.expect_boolean();
if (json_log_header_algih) {
p.log_header_align = *json_log_header_algih;
auto json_log_header_align = json.expect_boolean();
if (json_log_header_align) {
p.log_header_align = *json_log_header_align;
}
}

Expand Down
47 changes: 38 additions & 9 deletions src/runtime/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,19 @@ struct WakeConfigOverrides {
// Determines the size of the cache that collection
// tries to get us back to.
wcl::optional<uint64_t> low_cache_size;

// Determines if log headers should be aligned
wcl::optional<bool> log_header_align;

// Determines if job cache should terminate on error or return a cache miss
wcl::optional<bool> cache_miss_on_failure;
};

template <class T>
using Override = wcl::optional<T> WakeConfigOverrides::*;

/********************************************************************
* Polcies
* Policies
********************************************************************/

struct VersionPolicy {
Expand Down Expand Up @@ -172,6 +177,29 @@ struct SharedCacheLowSize {
static void emit(const SharedCacheLowSize& p, std::ostream& os) { os << p.*value; }
};

// Config policy for the `cache_miss_on_failure` key: when set, the job cache
// reports a cache miss on failure instead of terminating.
struct SharedCacheMissOnFailure {
  using type = bool;
  using input_type = type;

  // Key name as it appears in the config files.
  static constexpr const char* key = "cache_miss_on_failure";

  // The key is accepted in both the wakeroot and the user config.
  static constexpr bool allowed_in_wakeroot = true;
  static constexpr bool allowed_in_userconfig = true;

  // Stored setting; defaults to false.
  type cache_miss_on_failure = false;

  // Pointer-to-member handles: `value` selects the stored setting,
  // `override_value` selects the matching optional in WakeConfigOverrides.
  static constexpr type SharedCacheMissOnFailure::*value =
      &SharedCacheMissOnFailure::cache_miss_on_failure;
  static constexpr Override<input_type> override_value =
      &WakeConfigOverrides::cache_miss_on_failure;

  // Sets the value from a JSON node (defined out of line).
  static void set(SharedCacheMissOnFailure& p, const JAST& json);

  // Sets the value directly from an already-typed input.
  static void set_input(SharedCacheMissOnFailure& p, const input_type& v) {
    p.cache_miss_on_failure = v;
  }

  // Writes the setting to `os` as the text "true" or "false".
  static void emit(const SharedCacheMissOnFailure& p, std::ostream& os) {
    os << (p.*value ? "true" : "false");
  }
};

struct LogHeaderAlignPolicy {
using type = bool;
using input_type = type;
Expand Down Expand Up @@ -214,8 +242,8 @@ static inline const char* to_string(WakeConfigProvenance p) {
return "Unknown (this is an error, please report this to Wake devs)";
}

template <class... Polcies>
struct WakeConfigImpl : public Polcies... {
template <class... Policies>
struct WakeConfigImpl : public Policies... {
WakeConfigImpl(const WakeConfigImpl&) = delete;
WakeConfigImpl(WakeConfigImpl&&) = delete;
WakeConfigImpl() = default;
Expand Down Expand Up @@ -295,35 +323,36 @@ struct WakeConfigImpl : public Polcies... {
protected:
static std::set<std::string> wakeroot_allowed_keys() {
std::set<std::string> out;
call_all(add_wakeroot_key<Polcies>(out)...);
call_all(add_wakeroot_key<Policies>(out)...);
return out;
}

static std::set<std::string> userconfig_allowed_keys() {
std::set<std::string> out;
call_all(add_userconfig_key<Polcies>(out)...);
call_all(add_userconfig_key<Policies>(out)...);
return out;
}

template <WakeConfigProvenance p>
void set_all(const JAST& json) {
call_all(set_policy<p, Polcies>(json)...);
call_all(set_policy<p, Policies>(json)...);
}

void override_all(const WakeConfigOverrides& overrides) {
call_all(override_policy<Polcies>(overrides)...);
call_all(override_policy<Policies>(overrides)...);
}

public:
void emit(std::ostream& os) const {
os << "Wake config:" << std::endl;
call_all(emit_each<Polcies>(os)...);
call_all(emit_each<Policies>(os)...);
}
};

using WakeConfigImplFull =
WakeConfigImpl<UserConfigPolicy, VersionPolicy, LogHeaderPolicy, LogHeaderSourceWidthPolicy,
LabelFilterPolicy, SharedCacheMaxSize, SharedCacheLowSize, LogHeaderAlignPolicy>;
LabelFilterPolicy, SharedCacheMaxSize, SharedCacheLowSize,
SharedCacheMissOnFailure, LogHeaderAlignPolicy>;

struct WakeConfig final : public WakeConfigImplFull {
static bool init(const std::string& wakeroot_path, const WakeConfigOverrides& overrides);
Expand Down
1 change: 1 addition & 0 deletions tests/config/empty/stdout
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ Wake config:
label_filter = '.*' (Default)
max_cache_size = '26843545600' (Default)
low_cache_size = '16106127360' (Default)
cache_miss_on_failure = 'false' (Default)
log_header_align = 'false' (Default)
1 change: 1 addition & 0 deletions tests/config/extra_keys/stdout
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ Wake config:
label_filter = '.*' (Default)
max_cache_size = '26843545600' (Default)
low_cache_size = '16106127360' (Default)
cache_miss_on_failure = 'false' (Default)
log_header_align = 'false' (Default)
1 change: 1 addition & 0 deletions tests/config/missing_user_config/stdout
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ Wake config:
label_filter = '.*' (Default)
max_cache_size = '26843545600' (Default)
low_cache_size = '16106127360' (Default)
cache_miss_on_failure = 'false' (Default)
log_header_align = 'false' (Default)
3 changes: 2 additions & 1 deletion tests/config/nominal/.wakeroot
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@
"log_header_source_width": 13,
"max_cache_size": 1024,
"low_cache_size": 512,
"log_header_align": true
"log_header_align": true,
"cache_miss_on_failure": true
}
1 change: 1 addition & 0 deletions tests/config/nominal/stdout
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ Wake config:
label_filter = '.*' (Default)
max_cache_size = '1024' (WakeRoot)
low_cache_size = '512' (WakeRoot)
cache_miss_on_failure = 'true' (WakeRoot)
log_header_align = 'true' (WakeRoot)
4 changes: 2 additions & 2 deletions tools/wake-unit/fuzz_test_job_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ TEST_FUNC(void, fuzz_loop, const FuzzLoopConfig& config, wcl::xoshiro_256 gen) {

mkdir(config.cache_dir.c_str(), 0777);
mkdir(config.dir.c_str(), 0777);
job_cache::Cache cache(config.cache_dir, 1ULL << 24ULL, (1 << 23ULL) + (1 << 22ULL));
job_cache::Cache cache(config.cache_dir, 1ULL << 24ULL, (1 << 23ULL) + (1 << 22ULL), false);

std::string out_dir = wcl::join_paths(config.dir, "outputs");
for (size_t i = 0; i < config.number_of_steps; ++i) {
Expand Down Expand Up @@ -424,4 +424,4 @@ TEST(job_cache_large_par_fuzz) {
if (fut.valid()) fut.wait();
}
}
*/
*/
11 changes: 11 additions & 0 deletions tools/wake/cli_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ struct CommandLineOptions {
bool clean;
bool list_outputs;
wcl::optional<bool> log_header_align;
wcl::optional<bool> cache_miss_on_failure;

const char *percent_str;
const char *jobs_str;
Expand Down Expand Up @@ -152,6 +153,8 @@ struct CommandLineOptions {
{0, "log-header-source-width", GOPT_ARGUMENT_REQUIRED},
{0, "log-header-align", GOPT_ARGUMENT_FORBIDDEN},
{0, "no-log-header-align", GOPT_ARGUMENT_FORBIDDEN},
{0, "cache-miss-on-failure", GOPT_ARGUMENT_FORBIDDEN},
{0, "no-cache-miss-on-failure", GOPT_ARGUMENT_FORBIDDEN},
{':', "shebang", GOPT_ARGUMENT_REQUIRED},
{0, 0, GOPT_LAST}
};
Expand Down Expand Up @@ -221,6 +224,14 @@ struct CommandLineOptions {
log_header_align = wcl::some(false);
}

if (arg(options, "cache-miss-on-failure")->count) {
cache_miss_on_failure = wcl::some(true);
}

if (arg(options, "no-cache-miss-on-failure")->count) {
cache_miss_on_failure = wcl::some(false);
}

auto lhsw_str = arg(options, "log-header-source-width")->argument;
if (lhsw_str) log_header_source_width = wcl::make_some<int64_t>(std::stol(lhsw_str));

Expand Down
4 changes: 3 additions & 1 deletion tools/wake/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ int main(int argc, char **argv) {
}
config_override.log_header_source_width = clo.log_header_source_width;
config_override.log_header_align = clo.log_header_align;
config_override.cache_miss_on_failure = clo.cache_miss_on_failure;

if (!WakeConfig::init(".wakeroot", config_override)) {
return 1;
Expand Down Expand Up @@ -369,7 +370,8 @@ int main(int argc, char **argv) {
const char *job_cache_dir = getenv("WAKE_EXPERIMENTAL_JOB_CACHE");
if (job_cache_dir != nullptr) {
cache = std::make_unique<job_cache::Cache>(job_cache_dir, WakeConfig::get()->max_cache_size,
WakeConfig::get()->low_cache_size);
WakeConfig::get()->low_cache_size,
WakeConfig::get()->cache_miss_on_failure);
set_job_cache(cache.get());
}

Expand Down

0 comments on commit 996e405

Please sign in to comment.