Skip to content

Commit

Permalink
fix(lib,tests,Samples): multiple fixes here and there and some more A…
Browse files Browse the repository at this point in the history
…PI breakages.

Updated todo.

Signed-off-by: Federico Di Pierro <nierro92@gmail.com>
  • Loading branch information
FedeDP committed Jun 9, 2023
1 parent 4995059 commit 60c13a1
Show file tree
Hide file tree
Showing 11 changed files with 111 additions and 88 deletions.
12 changes: 3 additions & 9 deletions Lib/core/evts.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,19 +89,13 @@ _public_ ssize_t m_mod_unstash(m_mod_t *mod, size_t len) {
return processed;
}

_public_ int m_mod_set_batch_size(m_mod_t *mod, size_t len) {
_public_ int m_mod_batch_set(m_mod_t *mod, size_t len, uint64_t timeout_ns) {
M_MOD_ASSERT(mod);
M_MOD_CONSUME_TOKEN(mod);

mod->batch.len = len;
return 0;
}

_public_ int m_mod_set_batch_timeout(m_mod_t *mod, uint64_t timeout_ns) {
M_MOD_ASSERT(mod);

// src_deregister and src_register already consume a token


// TODO: src_deregister and src_register already consume a token
/* If it was already set, remove old timer */
if (mod->batch.timer.ns != 0) {
m_mod_src_deregister_tmr(mod, &mod->batch.timer);
Expand Down
2 changes: 1 addition & 1 deletion Lib/core/mod.c
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ _public_ int m_mod_deregister(m_mod_t **mod) {
return mod_deregister(mod, true);
}

_public_ int m_mod_set_tokenbucket(m_mod_t *mod, uint32_t rate, uint64_t burst) {
_public_ int m_mod_tb_set(m_mod_t *mod, uint32_t rate, uint64_t burst) {
M_MOD_ASSERT(mod);
M_PARAM_ASSERT(rate <= BILLION);

Expand Down
4 changes: 2 additions & 2 deletions Lib/core/ps.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ static int send_msg(m_mod_t *mod, const m_mod_t *recipient, const char *topic,
M_PARAM_ASSERT(message);

mod->stats.sent_msgs++;
ps_priv_t m = { { false, mod, topic, message }, flags, NULL };
ps_priv_t m = { { mod, topic, message }, flags, NULL };
return tell_pubsub_msg(&m, recipient, mod->ctx);
}

Expand All @@ -146,7 +146,7 @@ int tell_system_pubsub_msg(const m_mod_t *recipient, m_ctx_t *c, m_mod_t *sender
// A module sent a M_PS_MOD_POISONPILL message to another, or it was stopped
sender->stats.sent_msgs++;
}
ps_priv_t m = { { true, sender, topic, NULL }, 0, NULL };
ps_priv_t m = { { sender, topic, NULL }, 0, NULL };
return tell_pubsub_msg(&m, recipient, c);
}

Expand Down
7 changes: 3 additions & 4 deletions Lib/core/public/module/mod.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ typedef enum {
M_SRC_AUTOFREE = 1 << 3, // Automatically free userdata upon source deregistation.
M_SRC_ONESHOT = 1 << 4, // Run just once then automatically deregister source.
M_SRC_DUP = 1 << 5, // Duplicate PubSub topic, source fd or source path.
M_SRC_FORCE = 1 << 6, // Force the registration of a src, even if it is already existent (it deregisters previous src)
M_SRC_FD_AUTOCLOSE = M_SRC_SHIFT(M_SRC_TYPE_FD, 1 << 0), // Automatically close fd upon deregistation.
M_SRC_TMR_ABSOLUTE = M_SRC_SHIFT(M_SRC_TYPE_TMR, 1 << 0), // Absolute timer
} m_src_flags;
Expand Down Expand Up @@ -87,7 +88,6 @@ typedef struct {

/* PubSub messages */
typedef struct {
bool system; // Is this a system message?
const m_mod_t *sender;
const char *topic;
const void *data; // NULL for system messages
Expand Down Expand Up @@ -269,11 +269,10 @@ int m_mod_src_register_thresh(m_mod_t *mod, const m_src_thresh_t *thr, m_src_fla
int m_mod_src_deregister_thresh(m_mod_t *mod, const m_src_thresh_t *thr);

/* Event batch management */
int m_mod_set_batch_size(m_mod_t *mod, size_t len);
int m_mod_set_batch_timeout(m_mod_t *mod, uint64_t timeout_ns);
int m_mod_batch_set(m_mod_t *mod, size_t len, uint64_t timeout_ns);

/* Mod tokenbucket */
int m_mod_set_tokenbucket(m_mod_t *mod, uint32_t rate, uint64_t burst);
int m_mod_tb_set(m_mod_t *mod, uint32_t rate, uint64_t burst);

/* Generic event source registering functions */
#define m_mod_src_register(mod, X, flags, userptr) _Generic((X) + 0, \
Expand Down
63 changes: 37 additions & 26 deletions Lib/core/src.c
Original file line number Diff line number Diff line change
Expand Up @@ -200,56 +200,56 @@ static ev_src_t *create_src(m_mod_t *mod, m_src_types type, process_cb proc,
}

static int fdcmp(void *my_data, void *node_data) {
ev_src_t *src = (ev_src_t *)node_data;
int fd = *((int *)my_data);
const ev_src_t *node = (const ev_src_t *)node_data;
const ev_src_t *other = (const ev_src_t *)my_data;

return fd - src->fd_src.fd;
return other->fd_src.fd - node->fd_src.fd;
}

static int tmrcmp(void *my_data, void *node_data) {
ev_src_t *src = (ev_src_t *)node_data;
const m_src_tmr_t *its = (const m_src_tmr_t *)my_data;
const ev_src_t *node = (const ev_src_t *)node_data;
const ev_src_t *other = (const ev_src_t *)my_data;

return its->ns - src->tmr_src.its.ns;
return other->tmr_src.its.ns - node->tmr_src.its.ns;
}

static int sgncmp(void *my_data, void *node_data) {
ev_src_t *src = (ev_src_t *)node_data;
const m_src_sgn_t *sgs = (const m_src_sgn_t *)my_data;
const ev_src_t *node = (const ev_src_t *)node_data;
const ev_src_t *other = (const ev_src_t *)my_data;

return sgs->signo - src->sgn_src.sgs.signo;
return other->sgn_src.sgs.signo - node->sgn_src.sgs.signo;
}

static int pathcmp(void *my_data, void *node_data) {
ev_src_t *src = (ev_src_t *)node_data;
const m_src_path_t *pt = (const m_src_path_t *)my_data;
const ev_src_t *node = (const ev_src_t *)node_data;
const ev_src_t *other = (const ev_src_t *)my_data;

return strcmp(pt->path, src->path_src.pt.path);
return strcmp(other->path_src.pt.path, node->path_src.pt.path);
}

static int pidcmp(void *my_data, void *node_data) {
ev_src_t *src = (ev_src_t *)node_data;
const m_src_pid_t *pid = (const m_src_pid_t *)my_data;
const ev_src_t *node = (const ev_src_t *)node_data;
const ev_src_t *other = (const ev_src_t *)my_data;

return pid->pid - src->pid_src.pid.pid;
return other->pid_src.pid.pid - node->pid_src.pid.pid;
}

static int taskcmp(void *my_data, void *node_data) {
ev_src_t *src = (ev_src_t *)node_data;
const m_src_task_t *tid = (const m_src_task_t *)my_data;
const ev_src_t *node = (const ev_src_t *)node_data;
const ev_src_t *other = (const ev_src_t *)my_data;

return tid->tid - src->task_src.tid.tid;
return other->task_src.tid.tid - node->task_src.tid.tid;
}

static int threshcmp(void *my_data, void *node_data) {
ev_src_t *src = (ev_src_t *)node_data;
const m_src_thresh_t *thr = (const m_src_thresh_t *)my_data;
const ev_src_t *node = (const ev_src_t *)node_data;
const ev_src_t *other = (const ev_src_t *)my_data;

long double my_val = (long double)thr->activity_freq
+ (long double)thr->inactive_ms;
long double their_val = (long double)src->thresh_src.thr.activity_freq
+ (long double)src->thresh_src.thr.inactive_ms;
return my_val - their_val;
long double other_val = (long double)other->thresh_src.thr.activity_freq
+ (long double)other->thresh_src.thr.inactive_ms;
long double node_val = (long double)node->thresh_src.thr.activity_freq
+ (long double)node->thresh_src.thr.inactive_ms;
return other_val - node_val;
}

static ev_src_t *process_ps(ev_src_t *this, m_ctx_t *c, int idx, evt_priv_t *evt) {
Expand Down Expand Up @@ -373,6 +373,10 @@ int register_mod_src(m_mod_t *mod, m_src_types type, const void *src_data,
return -EINVAL;
}
int ret = m_bst_insert(mod->srcs[type], src);
if (ret == -EEXIST && flags & M_SRC_FORCE) {
m_bst_remove(mod->srcs[type], src);
ret = m_bst_insert(mod->srcs[type], src);
}
if (ret == 0) {
/* If a src is registered at runtime, start receiving its events immediately */
if (m_mod_is(mod, M_MOD_RUNNING)) {
Expand All @@ -394,7 +398,14 @@ int deregister_mod_src(m_mod_t *mod, m_src_types type, void *src_data) {
M_MOD_ASSERT(mod);
M_MOD_CONSUME_TOKEN(mod);

return m_bst_remove(mod->srcs[type], src_data);
// Create onetime src to check the bst
ev_src_t *src = create_src(mod, type, src_procs_map[type], src_data, 0, NULL);
if (!src) {
return -EINVAL;
}
int ret = m_bst_remove(mod->srcs[type], src);
m_mem_unref(src);
return ret;
}

int start_task(m_ctx_t *c, ev_src_t *src) {
Expand Down
58 changes: 18 additions & 40 deletions Lib/utils/log.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ X_LOG_LEVELS
/** **/

static inline m_logger_level find_level(const char *level_str) {
if (!level_str) {
return -1;
}
static const char *lvl_names[] = {
#define X_LOG_LEVEL(name) #name,
X_LOG_LEVELS
Expand All @@ -47,27 +50,27 @@ static __attribute__((constructor (111))) void libmodule_log_init(void) {
X_LOG_CTXS
#undef X_LOG_CTX
};
char *log_env;

// Load fallback global level
int global_level = find_level(getenv("LIBMODULE_LOG"));
if (global_level == -1) {
// Default value
global_level = ERR;
}

char env_name[64];
bool log_set[X_LOG_CTX_MAX] = {0};
// Now load log levels for each context
for (int i = 0; i < X_LOG_CTX_MAX; i++) {
/* Default values */
// Default noop logger
libmodule_logger.DEBUG[i] = libmodule_log_noop;
libmodule_logger.INFO[i] = libmodule_log_noop;
libmodule_logger.WARN[i] = libmodule_log_noop;
libmodule_logger.ERR[i] = libmodule_log_noop;

int log_level = ERR;

snprintf(env_name, sizeof(env_name), "LIBMODULE_LOG_%s", ctx_names[i]);
log_env = getenv(env_name);
if (log_env) {
log_level = find_level(log_env);
if (log_level != -1) {
log_set[i] = true;
} else {
// Default value
log_level = ERR;
}
int log_level = find_level(getenv(env_name));
if (log_level == -1) {
log_level = global_level;
}
switch (log_level) {
case DEBUG:
Expand All @@ -81,32 +84,7 @@ static __attribute__((constructor (111))) void libmodule_log_init(void) {
break;
}
}

int log_level = ERR;
log_env = getenv("LIBMODULE_LOG");
if (log_env) {
log_level = find_level(log_env);
if (log_level == -1) {
// Default value
log_level = ERR;
}
}
for (int i = 0; i < X_LOG_CTX_MAX; i++) {
if (!log_set[i]) {
switch (log_level) {
case DEBUG:
libmodule_logger.DEBUG[i] = libmodule_log_DEBUG;
case INFO:
libmodule_logger.INFO[i] = libmodule_log_INFO;
case WARN:
libmodule_logger.WARN[i] = libmodule_log_WARN;
default:
libmodule_logger.ERR[i] = libmodule_log_ERR;
break;
}
}
}


const char *log_file_path = getenv("LIBMODULE_LOG_OUTPUT");
if (log_file_path) {
libmodule_logger.log_file = fopen(log_file_path, "w");
Expand Down
2 changes: 1 addition & 1 deletion Samples/Task/pippo.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ static bool m_mod_on_start(m_mod_t *mod) {

m_mod_src_register(mod, &((m_src_tmr_t) { CLOCK_MONOTONIC, (uint64_t)1 * 1000 * 1000 * 1000 }), 0, &tmrData); // 1s
m_mod_src_register(mod, &((m_src_task_t) { 8, inc }), 0, &thData);
m_mod_set_batch_timeout(mod, 1500); // 1500ms!
m_mod_batch_set(mod, 0, 1500); // 1500ms!
return true;
}

Expand Down
20 changes: 15 additions & 5 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,21 @@

#### Mod

- [ ] Drop `system` field from m_evt_ps_t?
- [ ] Double check `from_user` in `mod_deregister`
- [ ] Rename `m_mod_set_batch_size` to `m_mod_batch_set_size`
- [ ] Unify `m_mod_set_batch_size` and `m_mod_set_batch_timeout` under `m_mod_batch_set(size_t, uint64_t)`
- [ ] Rename `m_mod_set_tokenbucket` to `m_mod_tb_set`?
- [x] Drop `system` field from m_evt_ps_t?
- [x] Unify `m_mod_set_batch_size` and `m_mod_set_batch_timeout` under `m_mod_batch_set(size_t, uint64_t)`
- [x] Rename `m_mod_set_tokenbucket` to `m_mod_tb_set`?
- [ ] Properly fixup M_MOD_CONSUME_TOKEN() to only be called by external API (ie: user visible)

### Src
- [x] add an M_SRC_FORCE flag to register_mod_src to force register a src even if the same is already existing (deregistering the old one)?
- [x] double check m_bst_insert/remove usage in src API + add unit tests!

#### Ctx
- [ ] use pthread_setname_np() to store each context thread name (max 16chars len; drop ctx->name field) ?

#### Generic

- [ ] expose a `libmodule_set_loglevel` API?

#### DOC

Expand Down
2 changes: 2 additions & 0 deletions tests/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ int main(void) {
cmocka_unit_test(test_mod_rm_fd_NULL_self),
cmocka_unit_test(test_mod_rm_fd),

cmocka_unit_test(test_mod_srcs),

/* Test module subscribe */
cmocka_unit_test(test_mod_subscribe_NULL_topic),
cmocka_unit_test(test_mod_subscribe_NULL_self),
Expand Down
28 changes: 28 additions & 0 deletions tests/test_mod.c
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,34 @@ void test_mod_add_fd(void **state) {
assert_true(ret == -EEXIST);
}


void test_mod_srcs(void **state) {
(void) state; /* unused */

// 1000s just to test
const m_src_tmr_t my_tmr = {.ns = 1000000000000 };

int ret = m_mod_src_register_tmr(test_mod, &my_tmr, M_SRC_FD_AUTOCLOSE, NULL);
assert_true(ret == 0);

/* Try to register again, expect -EEXIST error */
ret = m_mod_src_register_tmr(test_mod, &my_tmr, M_SRC_FD_AUTOCLOSE, NULL);
assert_true(ret == -EEXIST);

/* Register again, forcing the registration. */
ret = m_mod_src_register_tmr(test_mod, &my_tmr, M_SRC_FD_AUTOCLOSE | M_SRC_FORCE, NULL);
assert_true(ret == 0);

size_t len = m_mod_src_len(test_mod, M_SRC_TYPE_TMR);
assert_true(len == 1);

ret = m_mod_src_deregister_tmr(test_mod, &my_tmr);
assert_true(ret == 0);

len = m_mod_src_len(test_mod, M_SRC_TYPE_TMR);
assert_true(len == 0);
}

void test_mod_rm_wrong_fd(void **state) {
(void) state; /* unused */

Expand Down
1 change: 1 addition & 0 deletions tests/test_mod.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ void test_mod_unbecome(void **state);
void test_mod_add_wrong_fd(void **state);
void test_mod_add_fd_NULL_self(void **state);
void test_mod_add_fd(void **state);
void test_mod_srcs(void **state);
void test_mod_rm_wrong_fd(void **state);
void test_mod_rm_wrong_fd_2(void **state);
void test_mod_rm_fd_NULL_self(void **state);
Expand Down

0 comments on commit 60c13a1

Please sign in to comment.