From 60c13a11ace946c241dfaafe838e970c19f9e63d Mon Sep 17 00:00:00 2001 From: Federico Di Pierro Date: Fri, 9 Jun 2023 15:32:45 +0200 Subject: [PATCH] fix(lib,tests,Samples): multiple fixes here and there and some more API breakages. Updated todo. Signed-off-by: Federico Di Pierro --- Lib/core/evts.c | 12 ++----- Lib/core/mod.c | 2 +- Lib/core/ps.c | 4 +-- Lib/core/public/module/mod.h | 7 ++-- Lib/core/src.c | 63 +++++++++++++++++++++--------------- Lib/utils/log.c | 58 +++++++++++---------------------- Samples/Task/pippo.c | 2 +- TODO.md | 20 +++++++++--- tests/main.c | 2 ++ tests/test_mod.c | 28 ++++++++++++++++ tests/test_mod.h | 1 + 11 files changed, 111 insertions(+), 88 deletions(-) diff --git a/Lib/core/evts.c b/Lib/core/evts.c index 9722c66f..e3c37edc 100644 --- a/Lib/core/evts.c +++ b/Lib/core/evts.c @@ -89,19 +89,13 @@ _public_ ssize_t m_mod_unstash(m_mod_t *mod, size_t len) { return processed; } -_public_ int m_mod_set_batch_size(m_mod_t *mod, size_t len) { +_public_ int m_mod_batch_set(m_mod_t *mod, size_t len, uint64_t timeout_ns) { M_MOD_ASSERT(mod); M_MOD_CONSUME_TOKEN(mod); mod->batch.len = len; - return 0; -} - -_public_ int m_mod_set_batch_timeout(m_mod_t *mod, uint64_t timeout_ns) { - M_MOD_ASSERT(mod); - - // src_deregister and src_register already consume a token - + + // TODO: src_deregister and src_register already consume a token /* If it was already set, remove old timer */ if (mod->batch.timer.ns != 0) { m_mod_src_deregister_tmr(mod, &mod->batch.timer); diff --git a/Lib/core/mod.c b/Lib/core/mod.c index b3297401..d029999b 100644 --- a/Lib/core/mod.c +++ b/Lib/core/mod.c @@ -547,7 +547,7 @@ _public_ int m_mod_deregister(m_mod_t **mod) { return mod_deregister(mod, true); } -_public_ int m_mod_set_tokenbucket(m_mod_t *mod, uint32_t rate, uint64_t burst) { +_public_ int m_mod_tb_set(m_mod_t *mod, uint32_t rate, uint64_t burst) { M_MOD_ASSERT(mod); M_PARAM_ASSERT(rate <= BILLION); diff --git a/Lib/core/ps.c b/Lib/core/ps.c index 38347264..cf2f58a2 100644 --- a/Lib/core/ps.c +++ b/Lib/core/ps.c @@ -135,7 +135,7 @@ static int send_msg(m_mod_t *mod, const m_mod_t *recipient, const char *topic, M_PARAM_ASSERT(message); mod->stats.sent_msgs++; - ps_priv_t m = { { false, mod, topic, message }, flags, NULL }; + ps_priv_t m = { { mod, topic, message }, flags, NULL }; return tell_pubsub_msg(&m, recipient, mod->ctx); } @@ -146,7 +146,7 @@ int tell_system_pubsub_msg(const m_mod_t *recipient, m_ctx_t *c, m_mod_t *sender // A module sent a M_PS_MOD_POISONPILL message to another, or it was stopped sender->stats.sent_msgs++; } - ps_priv_t m = { { true, sender, topic, NULL }, 0, NULL }; + ps_priv_t m = { { sender, topic, NULL }, 0, NULL }; return tell_pubsub_msg(&m, recipient, c); } diff --git a/Lib/core/public/module/mod.h b/Lib/core/public/module/mod.h index d41f2072..f8b228cf 100644 --- a/Lib/core/public/module/mod.h +++ b/Lib/core/public/module/mod.h @@ -34,6 +34,7 @@ typedef enum { M_SRC_AUTOFREE = 1 << 3, // Automatically free userdata upon source deregistation. M_SRC_ONESHOT = 1 << 4, // Run just once then automatically deregister source. M_SRC_DUP = 1 << 5, // Duplicate PubSub topic, source fd or source path. + M_SRC_FORCE = 1 << 6, // Force the registration of a src, even if it is already existent (it deregisters previous src) M_SRC_FD_AUTOCLOSE = M_SRC_SHIFT(M_SRC_TYPE_FD, 1 << 0), // Automatically close fd upon deregistation. M_SRC_TMR_ABSOLUTE = M_SRC_SHIFT(M_SRC_TYPE_TMR, 1 << 0), // Absolute timer } m_src_flags; @@ -87,7 +88,6 @@ typedef struct { /* PubSub messages */ typedef struct { - bool system; // Is this a system message? const m_mod_t *sender; const char *topic; const void *data; // NULL for system messages @@ -269,11 +269,10 @@ int m_mod_src_register_thresh(m_mod_t *mod, const m_src_thresh_t *thr, m_src_fla int m_mod_src_deregister_thresh(m_mod_t *mod, const m_src_thresh_t *thr); /* Event batch management */ -int m_mod_set_batch_size(m_mod_t *mod, size_t len); -int m_mod_set_batch_timeout(m_mod_t *mod, uint64_t timeout_ns); +int m_mod_batch_set(m_mod_t *mod, size_t len, uint64_t timeout_ns); /* Mod tokenbucket */ -int m_mod_set_tokenbucket(m_mod_t *mod, uint32_t rate, uint64_t burst); +int m_mod_tb_set(m_mod_t *mod, uint32_t rate, uint64_t burst); /* Generic event source registering functions */ #define m_mod_src_register(mod, X, flags, userptr) _Generic((X) + 0, \ diff --git a/Lib/core/src.c b/Lib/core/src.c index d140cb3a..3498f152 100644 --- a/Lib/core/src.c +++ b/Lib/core/src.c @@ -200,56 +200,56 @@ static ev_src_t *create_src(m_mod_t *mod, m_src_types type, process_cb proc, } static int fdcmp(void *my_data, void *node_data) { - ev_src_t *src = (ev_src_t *)node_data; - int fd = *((int *)my_data); + const ev_src_t *node = (const ev_src_t *)node_data; + const ev_src_t *other = (const ev_src_t *)my_data; - return fd - src->fd_src.fd; + return other->fd_src.fd - node->fd_src.fd; } static int tmrcmp(void *my_data, void *node_data) { - ev_src_t *src = (ev_src_t *)node_data; - const m_src_tmr_t *its = (const m_src_tmr_t *)my_data; + const ev_src_t *node = (const ev_src_t *)node_data; + const ev_src_t *other = (const ev_src_t *)my_data; - return its->ns - src->tmr_src.its.ns; + return other->tmr_src.its.ns - node->tmr_src.its.ns; } static int sgncmp(void *my_data, void *node_data) { - ev_src_t *src = (ev_src_t *)node_data; - const m_src_sgn_t *sgs = (const m_src_sgn_t *)my_data; + const ev_src_t *node = (const ev_src_t *)node_data; + const ev_src_t *other = (const ev_src_t *)my_data; - return sgs->signo - src->sgn_src.sgs.signo; + return other->sgn_src.sgs.signo - node->sgn_src.sgs.signo; } static int pathcmp(void *my_data, void *node_data) { - ev_src_t *src = (ev_src_t *)node_data; - const m_src_path_t *pt = (const m_src_path_t *)my_data; + const ev_src_t *node = (const ev_src_t *)node_data; + const ev_src_t *other = (const ev_src_t *)my_data; - return strcmp(pt->path, src->path_src.pt.path); + return strcmp(other->path_src.pt.path, node->path_src.pt.path); } static int pidcmp(void *my_data, void *node_data) { - ev_src_t *src = (ev_src_t *)node_data; - const m_src_pid_t *pid = (const m_src_pid_t *)my_data; + const ev_src_t *node = (const ev_src_t *)node_data; + const ev_src_t *other = (const ev_src_t *)my_data; - return pid->pid - src->pid_src.pid.pid; + return other->pid_src.pid.pid - node->pid_src.pid.pid; } static int taskcmp(void *my_data, void *node_data) { - ev_src_t *src = (ev_src_t *)node_data; - const m_src_task_t *tid = (const m_src_task_t *)my_data; + const ev_src_t *node = (const ev_src_t *)node_data; + const ev_src_t *other = (const ev_src_t *)my_data; - return tid->tid - src->task_src.tid.tid; + return other->task_src.tid.tid - node->task_src.tid.tid; } static int threshcmp(void *my_data, void *node_data) { - ev_src_t *src = (ev_src_t *)node_data; - const m_src_thresh_t *thr = (const m_src_thresh_t *)my_data; + const ev_src_t *node = (const ev_src_t *)node_data; + const ev_src_t *other = (const ev_src_t *)my_data; - long double my_val = (long double)thr->activity_freq - + (long double)thr->inactive_ms; - long double their_val = (long double)src->thresh_src.thr.activity_freq - + (long double)src->thresh_src.thr.inactive_ms; - return my_val - their_val; + long double other_val = (long double)other->thresh_src.thr.activity_freq + + (long double)other->thresh_src.thr.inactive_ms; + long double node_val = (long double)node->thresh_src.thr.activity_freq + + (long double)node->thresh_src.thr.inactive_ms; + return other_val - node_val; } static ev_src_t *process_ps(ev_src_t *this, m_ctx_t *c, int idx, evt_priv_t *evt) { @@ -373,6 +373,10 @@ int register_mod_src(m_mod_t *mod, m_src_types type, const void *src_data, return -EINVAL; } int ret = m_bst_insert(mod->srcs[type], src); + if (ret == -EEXIST && flags & M_SRC_FORCE) { + m_bst_remove(mod->srcs[type], src); + ret = m_bst_insert(mod->srcs[type], src); + } if (ret == 0) { /* If a src is registered at runtime, start receiving its events immediately */ if (m_mod_is(mod, M_MOD_RUNNING)) { @@ -394,7 +398,14 @@ int deregister_mod_src(m_mod_t *mod, m_src_types type, void *src_data) { M_MOD_ASSERT(mod); M_MOD_CONSUME_TOKEN(mod); - return m_bst_remove(mod->srcs[type], src_data); + // Create onetime src to check the bst + ev_src_t *src = create_src(mod, type, src_procs_map[type], src_data, 0, NULL); + if (!src) { + return -EINVAL; + } + int ret = m_bst_remove(mod->srcs[type], src); + m_mem_unref(src); + return ret; } int start_task(m_ctx_t *c, ev_src_t *src) { diff --git a/Lib/utils/log.c b/Lib/utils/log.c index de33d5de..fe22a272 100644 --- a/Lib/utils/log.c +++ b/Lib/utils/log.c @@ -27,6 +27,9 @@ X_LOG_LEVELS /** **/ static inline m_logger_level find_level(const char *level_str) { + if (!level_str) { + return -1; + } static const char *lvl_names[] = { #define X_LOG_LEVEL(name) #name, X_LOG_LEVELS @@ -47,27 +50,27 @@ static __attribute__((constructor (111))) void libmodule_log_init(void) { X_LOG_CTXS #undef X_LOG_CTX }; - char *log_env; + + // Load fallback global level + int global_level = find_level(getenv("LIBMODULE_LOG")); + if (global_level == -1) { + // Default value + global_level = ERR; + } + char env_name[64]; - bool log_set[X_LOG_CTX_MAX] = {0}; + // Now load log levels for each context for (int i = 0; i < X_LOG_CTX_MAX; i++) { - /* Default values */ + // Default noop logger libmodule_logger.DEBUG[i] = libmodule_log_noop; libmodule_logger.INFO[i] = libmodule_log_noop; libmodule_logger.WARN[i] = libmodule_log_noop; libmodule_logger.ERR[i] = libmodule_log_noop; - - int log_level = ERR; + snprintf(env_name, sizeof(env_name), "LIBMODULE_LOG_%s", ctx_names[i]); - log_env = getenv(env_name); - if (log_env) { - log_level = find_level(log_env); - if (log_level != -1) { - log_set[i] = true; - } else { - // Default value - log_level = ERR; - } + int log_level = find_level(getenv(env_name)); + if (log_level == -1) { + log_level = global_level; } switch (log_level) { case DEBUG: @@ -81,32 +84,7 @@ static __attribute__((constructor (111))) void libmodule_log_init(void) { break; } } - - int log_level = ERR; - log_env = getenv("LIBMODULE_LOG"); - if (log_env) { - log_level = find_level(log_env); - if (log_level == -1) { - // Default value - log_level = ERR; - } - } - for (int i = 0; i < X_LOG_CTX_MAX; i++) { - if (!log_set[i]) { - switch (log_level) { - case DEBUG: - libmodule_logger.DEBUG[i] = libmodule_log_DEBUG; - case INFO: - libmodule_logger.INFO[i] = libmodule_log_INFO; - case WARN: - libmodule_logger.WARN[i] = libmodule_log_WARN; - default: - libmodule_logger.ERR[i] = libmodule_log_ERR; - break; - } - } - } - + const char *log_file_path = getenv("LIBMODULE_LOG_OUTPUT"); if (log_file_path) { libmodule_logger.log_file = fopen(log_file_path, "w"); diff --git a/Samples/Task/pippo.c b/Samples/Task/pippo.c index 45695429..635c95b2 100644 --- a/Samples/Task/pippo.c +++ b/Samples/Task/pippo.c @@ -30,7 +30,7 @@ static bool m_mod_on_start(m_mod_t *mod) { m_mod_src_register(mod, &((m_src_tmr_t) { CLOCK_MONOTONIC, (uint64_t)1 * 1000 * 1000 * 1000 }), 0, &tmrData); // 1s m_mod_src_register(mod, &((m_src_task_t) { 8, inc }), 0, &thData); - m_mod_set_batch_timeout(mod, 1500); // 1500ms! + m_mod_batch_set(mod, 0, 1500); // 1500ms! return true; } diff --git a/TODO.md b/TODO.md index d022be59..e0357ea6 100644 --- a/TODO.md +++ b/TODO.md @@ -4,11 +4,21 @@ #### Mod -- [ ] Drop `system` field from m_evt_ps_t? -- [ ] Double check `from_user` in `mod_deregister` -- [ ] Rename `m_mod_set_batch_size` to `m_mod_batch_set_size` -- [ ] Unify `m_mod_set_batch_size` and `m_mod_set_batch_timeout` under `m_mod_batch_set(size_t, uint64_t)` -- [ ] Rename `m_mod_set_tokenbucket` to `m_mod_tb_set`? +- [x] Drop `system` field from m_evt_ps_t? +- [x] Unify `m_mod_set_batch_size` and `m_mod_set_batch_timeout` under `m_mod_batch_set(size_t, uint64_t)` +- [x] Rename `m_mod_set_tokenbucket` to `m_mod_tb_set`? +- [ ] Properly fixup M_MOD_CONSUME_TOKEN() to only be called by external API (ie: user visible) + +### Src +- [x] add an M_SRC_FORCE flag to register_mod_src to force register a src even if the same is already existing (deregistering the old one)? +- [x] double check m_bst_insert/remove usage in src API + add unit tests! + +#### Ctx +- [ ] use pthread_setname_np() to store each context thread name (max 16chars len; drop ctx->name field) ? + +#### Generic + +- [ ] expose a `libmodule_set_loglevel` API? #### DOC diff --git a/tests/main.c b/tests/main.c index b8638bc4..2833bfe2 100644 --- a/tests/main.c +++ b/tests/main.c @@ -79,6 +79,8 @@ int main(void) { cmocka_unit_test(test_mod_rm_fd_NULL_self), cmocka_unit_test(test_mod_rm_fd), + cmocka_unit_test(test_mod_srcs), + /* Test module subscribe */ cmocka_unit_test(test_mod_subscribe_NULL_topic), cmocka_unit_test(test_mod_subscribe_NULL_self), diff --git a/tests/test_mod.c b/tests/test_mod.c index 38936cbd..283e0c4e 100644 --- a/tests/test_mod.c +++ b/tests/test_mod.c @@ -261,6 +261,34 @@ void test_mod_add_fd(void **state) { assert_true(ret == -EEXIST); } + +void test_mod_srcs(void **state) { + (void) state; /* unused */ + + // 1000s just to test + const m_src_tmr_t my_tmr = {.ns = 1000000000000 }; + + int ret = m_mod_src_register_tmr(test_mod, &my_tmr, M_SRC_FD_AUTOCLOSE, NULL); + assert_true(ret == 0); + + /* Try to register again, expect -EEXIST error */ + ret = m_mod_src_register_tmr(test_mod, &my_tmr, M_SRC_FD_AUTOCLOSE, NULL); + assert_true(ret == -EEXIST); + + /* Register again, forcing the registration. */ + ret = m_mod_src_register_tmr(test_mod, &my_tmr, M_SRC_FD_AUTOCLOSE | M_SRC_FORCE, NULL); + assert_true(ret == 0); + + size_t len = m_mod_src_len(test_mod, M_SRC_TYPE_TMR); + assert_true(len == 1); + + ret = m_mod_src_deregister_tmr(test_mod, &my_tmr); + assert_true(ret == 0); + + len = m_mod_src_len(test_mod, M_SRC_TYPE_TMR); + assert_true(len == 0); +} + void test_mod_rm_wrong_fd(void **state) { (void) state; /* unused */ diff --git a/tests/test_mod.h b/tests/test_mod.h index ea6539e3..01475575 100644 --- a/tests/test_mod.h +++ b/tests/test_mod.h @@ -27,6 +27,7 @@ void test_mod_unbecome(void **state); void test_mod_add_wrong_fd(void **state); void test_mod_add_fd_NULL_self(void **state); void test_mod_add_fd(void **state); +void test_mod_srcs(void **state); void test_mod_rm_wrong_fd(void **state); void test_mod_rm_wrong_fd_2(void **state); void test_mod_rm_fd_NULL_self(void **state);