diff --git a/c/mqtt.c b/c/mqtt.c index 20a56af..cdfeb50 100644 --- a/c/mqtt.c +++ b/c/mqtt.c @@ -19,7 +19,7 @@ #define O_PLMT 1 #undef MQTT_DEBUG_FOREIGN -#define MQTT_DEBUG_FOREIGN 1 +//#define MQTT_DEBUG_FOREIGN 1 #ifdef MQTT_DEBUG_FOREIGN #define _LOG(...) printf(__VA_ARGS__);fflush(stdout); @@ -29,7 +29,7 @@ #define PACK_MAJOR 1 #define PACK_MINOR 0 -#define PACK_REVISION 1 +#define PACK_REVISION 3 #define PACK_VERSION_NUMBER (PACK_MAJOR*10000+PACK_MINOR*100+PACK_REVISION) @@ -48,7 +48,7 @@ static PL_engine_t current_engine; static atom_t ATOM_is_async; static atom_t ATOM_client_id; static atom_t ATOM_keepalive; -static atom_t ATOM_use_callbacks; +//static atom_t ATOM_use_callbacks; static atom_t ATOM_message_id; static atom_t ATOM_topic; @@ -86,6 +86,7 @@ static atom_t ATOM_on_publish; static atom_t ATOM_on_subscribe; static atom_t ATOM_on_unsubscribe; static atom_t ATOM_hooks; +static atom_t ATOM_debug_hooks; // these are options going with hook to prolog static functor_t FUNCTOR_topic1; @@ -212,14 +213,14 @@ close_list(term_t list) static int destroy_mqtt(swi_mqtt *m) { - _LOG("--- (f-c) destroy_mqtt swi_mqtt: %p refs: %d\n", m, m->refs); + _LOG("--- (f-b) destroy_mqtt swi_mqtt: %p refs: %d\n", m, m->refs); if (m && m->mosq && m->refs == 0) { if (m->is_in_use == TRUE) { - _LOG("--- (f-c) destroy_mqtt - connection is in use\n"); + _LOG("--- (f-b) destroy_mqtt - connection is in use\n"); return FALSE; } else { - _LOG("--- (f-c) destroy_mqtt - release mosq and blob\n"); + _LOG("--- (f-b) destroy_mqtt - release mosq and blob\n"); mosquitto_destroy(m->mosq); m->mosq = NULL; free(m); @@ -256,7 +257,7 @@ static int set_engine_for_callbacks(swi_mqtt *m) return TRUE; } - _LOG("--- (f-c) set_engine_for_callbacks > failed to recover an engine. probably mqtt_disconnect was called...\n"); + _LOG("--- (f-x) set_engine_for_callbacks > failed to recover an engine. probably mqtt_disconnect was called...\n"); return FALSE; } @@ -270,38 +271,54 @@ static int unset_engine_for_callbacks(swi_mqtt *m) return FALSE; } */ - /******************************* - * CALLBACKS * - *******************************/ -void on_disconnect_callback(struct mosquitto *mosq, void *obj, int reason) +int have_thread_engine(swi_mqtt *m) { - swi_mqtt *m = (swi_mqtt *)obj; - int thread_id = PL_thread_self(); - _LOG("--- (f-c) on_disconnect_callback > mosq: %p obj: %p-%p, on thread: %d\n", mosq, obj, m->mosq, thread_id); - if (thread_id < 0) { if (set_engine_for_callbacks(m) == FALSE) { - return; + return FALSE; } else { thread_id = PL_thread_self(); - _LOG("--- (f-c) on_disconnect_callback > recovered engine with thread: %d\n", thread_id); + _LOG("--- (f-x) have_thread_engine > got on: %d\n", thread_id); } + } else { + _LOG("--- (f-x) have_thread_engine > have on: %d\n", thread_id); } + return TRUE; +} - fid_t fid = PL_open_foreign_frame(); - term_t t0 = PL_new_term_refs(2); - term_t t1 = t0 + 1; - unify_swi_mqtt(t0, m); + /******************************* + * CALLBACKS * + *******************************/ - add_int_option( t1, FUNCTOR_reason1, reason); +#define CB_PREPARE_FRAME() fid_t fid = PL_open_foreign_frame();predicate_t callback_to_use;term_t t0 = PL_new_term_refs(2);term_t t1 = t0 + 1;unify_swi_mqtt(t0, m); + +#define CB_CALL_N_CLOSE_FRAME(cb_sufx) if (m->use_callbacks == TRUE){callback_to_use = m->callback_##cb_sufx##2;} else {callback_to_use = PRED_##cb_sufx##2;} \ + PL_call_predicate(NULL, PL_Q_PASS_EXCEPTION, callback_to_use, t0);PL_discard_foreign_frame(fid); + +void on_log_callback(struct mosquitto *mosq, void *obj, int level, const char *str) +{ + swi_mqtt *m = (swi_mqtt *)obj; + //predicate_t callback_to_use; + + _LOG("--- (f-x) on_log > mosq: %p\n", mosq); + + if (have_thread_engine(m) == FALSE) + { + return; + } + + CB_PREPARE_FRAME() + + + add_int_option( t1, FUNCTOR_level1, level); + add_char_option(t1, FUNCTOR_log1, str); close_list(t1); - PL_call_predicate(NULL, PL_Q_PASS_EXCEPTION, PRED_on_disconnect2, t0); - PL_discard_foreign_frame(fid); + CB_CALL_N_CLOSE_FRAME(on_log) // unset_engine_for_callbacks(m); } @@ -310,212 +327,133 @@ void on_disconnect_callback(struct mosquitto *mosq, void *obj, int reason) void on_connect_callback(struct mosquitto *mosq, void *obj, int result) { swi_mqtt *m = (swi_mqtt *)obj; + //predicate_t callback_to_use; - int thread_id = PL_thread_self(); - _LOG("--- (f-c) on_connect_callback > mosq: %p obj: %p-%p, on thread: %d\n", mosq, obj, m->mosq, thread_id); + _LOG("--- (f-x) on_connect > mosq: %p\n", mosq); - if (thread_id < 0) + if (have_thread_engine(m) == FALSE) { - if (set_engine_for_callbacks(m) == FALSE) - { - return; - } else { - thread_id = PL_thread_self(); - _LOG("--- (f-c) on_connect_callback > recovered engine with thread: %d\n", thread_id); - } + return; } - fid_t fid = PL_open_foreign_frame(); - term_t t0 = PL_new_term_refs(2); - term_t t1 = t0 + 1; - unify_swi_mqtt(t0, m); + CB_PREPARE_FRAME() + add_int_option( t1, FUNCTOR_result1, result); close_list(t1); - PL_call_predicate(NULL, PL_Q_PASS_EXCEPTION, PRED_on_connect2, t0); - PL_discard_foreign_frame(fid); - - // unset_engine_for_callbacks(m); + CB_CALL_N_CLOSE_FRAME(on_connect) } -void on_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) +void on_disconnect_callback(struct mosquitto *mosq, void *obj, int reason) { swi_mqtt *m = (swi_mqtt *)obj; + //predicate_t callback_to_use; - int thread_id = PL_thread_self(); - _LOG("--- (f-c) on_message_callback > mosq: %p obj: %p-%p, on thread: %d\n", mosq, obj, m->mosq, thread_id); + _LOG("--- (f-x) on_disconnect > mosq: %p\n", mosq); - if (thread_id < 0) + if (have_thread_engine(m) == FALSE) { - if (set_engine_for_callbacks(m) == FALSE) - { - return; - } else { - thread_id = PL_thread_self(); - _LOG("--- (f-c) on_message_callback > recovered engine with thread: %d\n", thread_id); - } + return; } - - fid_t fid = PL_open_foreign_frame(); - term_t t0 = PL_new_term_refs(2); - term_t t1 = t0 + 1; - unify_swi_mqtt(t0, m); - - add_int_option( t1, FUNCTOR_message_id1, message->mid); - add_char_option(t1, FUNCTOR_topic1, message->topic); - add_char_option(t1, FUNCTOR_payload1, message->payload); - add_int_option( t1, FUNCTOR_payload_len1, message->payloadlen); - add_int_option( t1, FUNCTOR_qos1, message->qos); - add_int_option( t1, FUNCTOR_retain1, message->retain); - close_list(t1); + CB_PREPARE_FRAME() - PL_call_predicate(NULL, PL_Q_PASS_EXCEPTION, PRED_on_message2, t0); - PL_discard_foreign_frame(fid); + add_int_option( t1, FUNCTOR_reason1, reason); + close_list(t1); - // unset_engine_for_callbacks(m); + CB_CALL_N_CLOSE_FRAME(on_disconnect) } -void on_publish_callback(struct mosquitto *mosq, void *obj, int mid) +void on_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) { swi_mqtt *m = (swi_mqtt *)obj; + //predicate_t callback_to_use; - int thread_id = PL_thread_self(); - _LOG("--- (f-c) on_publish_callback > mosq: %p obj: %p-%p, on thread: %d\n", mosq, obj, m->mosq, thread_id); + _LOG("--- (f-x) on_message > mosq: %p\n", mosq); - if (thread_id < 0) + if (have_thread_engine(m) == FALSE) { - if (set_engine_for_callbacks(m) == FALSE) - { - return; - } else { - thread_id = PL_thread_self(); - _LOG("--- (f-c) on_publish_callback > recovered engine with thread: %d\n", thread_id); - } + return; } - fid_t fid = PL_open_foreign_frame(); - term_t t0 = PL_new_term_refs(2); - term_t t1 = t0 + 1; - unify_swi_mqtt(t0, m); + CB_PREPARE_FRAME() - add_int_option( t1, FUNCTOR_message_id1, mid); + add_int_option( t1, FUNCTOR_message_id1, message->mid); + add_char_option(t1, FUNCTOR_topic1, message->topic); + add_char_option(t1, FUNCTOR_payload1, message->payload); + add_int_option( t1, FUNCTOR_payload_len1, message->payloadlen); + add_int_option( t1, FUNCTOR_qos1, message->qos); + add_int_option( t1, FUNCTOR_retain1, message->retain); close_list(t1); - PL_call_predicate(NULL, PL_Q_PASS_EXCEPTION, PRED_on_publish2, t0); - PL_discard_foreign_frame(fid); - - // unset_engine_for_callbacks(m); + CB_CALL_N_CLOSE_FRAME(on_message) } - -void on_log_callback(struct mosquitto *mosq, void *obj, int level, const char *str) +void on_publish_callback(struct mosquitto *mosq, void *obj, int mid) { swi_mqtt *m = (swi_mqtt *)obj; + //predicate_t callback_to_use; - int thread_id = PL_thread_self(); - _LOG("--- (f-c) on_log_callback > mosq: %p obj: %p-%p, on thread: %d\n", mosq, obj, m->mosq, thread_id); + _LOG("--- (f-x) on_publish > mosq: %p\n", mosq); - if (thread_id < 0) + if (have_thread_engine(m) == FALSE) { - if (set_engine_for_callbacks(m) == FALSE) - { - return; - } else { - thread_id = PL_thread_self(); - _LOG("--- (f-c) on_log_callback > recovered engine with thread: %d\n", thread_id); - } + return; } - fid_t fid = PL_open_foreign_frame(); - term_t t0 = PL_new_term_refs(2); - term_t t1 = t0 + 1; - unify_swi_mqtt(t0, m); + CB_PREPARE_FRAME() - add_int_option( t1, FUNCTOR_level1, level); - add_char_option(t1, FUNCTOR_log1, str); + add_int_option( t1, FUNCTOR_message_id1, mid); close_list(t1); - - PL_call_predicate(NULL, PL_Q_PASS_EXCEPTION, PRED_on_log2, t0); - PL_discard_foreign_frame(fid); - - // unset_engine_for_callbacks(m); + + CB_CALL_N_CLOSE_FRAME(on_publish) } - void on_subscribe_callback(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos) { swi_mqtt *m = (swi_mqtt *)obj; + //predicate_t callback_to_use; - int thread_id = PL_thread_self(); - _LOG("--- (f-c) on_subscribe_callback > mosq: %p obj: %p-%p, on thread: %d\n", mosq, obj, m->mosq, thread_id); + _LOG("--- (f-x) on_subscribe > mosq: %p\n", mosq); - if (thread_id < 0) + if (have_thread_engine(m) == FALSE) { - if (set_engine_for_callbacks(m) == FALSE) - { - return; - } else { - thread_id = PL_thread_self(); - _LOG("--- (f-c) on_subscribe_callback > recovered engine with thread: %d\n", thread_id); - } + return; } - fid_t fid = PL_open_foreign_frame(); - term_t t0 = PL_new_term_refs(2); - term_t t1 = t0 + 1; - unify_swi_mqtt(t0, m); + CB_PREPARE_FRAME() add_int_option( t1, FUNCTOR_message_id1, mid); //add_int_option(t1, FUNCTOR_log1, qos_count); close_list(t1); - - PL_call_predicate(NULL, PL_Q_PASS_EXCEPTION, PRED_on_subscribe2, t0); - PL_discard_foreign_frame(fid); - + CB_CALL_N_CLOSE_FRAME(on_subscribe) /* obj - the user data provided in * mid - the message id of the subscribe message. * qos_count - the number of granted subscriptions (size of granted_qos). * granted_qos - an array of integers indicating the granted QoS for each of * the subscriptions. */ - // unset_engine_for_callbacks(m); } - - void on_unsubscribe_callback(struct mosquitto *mosq, void *obj, int mid) { swi_mqtt *m = (swi_mqtt *)obj; + //predicate_t callback_to_use; - int thread_id = PL_thread_self(); - _LOG("--- (f-c) on_unsubscribe_callback > mosq: %p obj: %p-%p, on thread: %d\n", mosq, obj, m->mosq, thread_id); + _LOG("--- (f-x) on_unsubscribe > mosq: %p\n", mosq); - if (thread_id < 0) + if (have_thread_engine(m) == FALSE) { - if (set_engine_for_callbacks(m) == FALSE) - { - return; - } else { - thread_id = PL_thread_self(); - _LOG("--- (f-c) on_unsubscribe_callback > recovered engine with thread: %d\n", thread_id); - } + return; } - fid_t fid = PL_open_foreign_frame(); - term_t t0 = PL_new_term_refs(2); - term_t t1 = t0 + 1; - unify_swi_mqtt(t0, m); + CB_PREPARE_FRAME() add_int_option( t1, FUNCTOR_message_id1, mid); close_list(t1); - - PL_call_predicate(NULL, PL_Q_PASS_EXCEPTION, PRED_on_unsubscribe2, t0); - PL_discard_foreign_frame(fid); - - // unset_engine_for_callbacks(m); + CB_CALL_N_CLOSE_FRAME(on_unsubscribe) } @@ -530,7 +468,7 @@ void on_unsubscribe_callback(struct mosquitto *mosq, void *obj, int mid) static void acquire_mqtt_symbol(atom_t symbol) { - _LOG("--- (f-c) acquire_mqtt_symbol symbol: %d\n", (int) symbol); + _LOG("--- (f-b) acquire_mqtt_symbol symbol: %d\n", (int) symbol); swi_mqtt *m = PL_blob_data(symbol, NULL, NULL); m->symbol = symbol; @@ -540,7 +478,7 @@ acquire_mqtt_symbol(atom_t symbol) static int release_mqtt_symbol(atom_t symbol) { - _LOG("--- (f-c) release_mqtt_symbol symbol: %d\n", (int) symbol); + _LOG("--- (f-b) release_mqtt_symbol symbol: %d\n", (int) symbol); swi_mqtt *m = PL_blob_data(symbol, NULL, NULL); m->refs--; @@ -551,7 +489,7 @@ release_mqtt_symbol(atom_t symbol) static int compare_mqtt_symbols(atom_t a, atom_t b) { - _LOG("--- (f-c) compare_mqtt_symbols\n"); + _LOG("--- (f-b) compare_mqtt_symbols\n"); swi_mqtt *ma = PL_blob_data(a, NULL, NULL); swi_mqtt *mb = PL_blob_data(b, NULL, NULL); @@ -563,7 +501,7 @@ compare_mqtt_symbols(atom_t a, atom_t b) static int write_mqtt_symbol(IOSTREAM *s, atom_t symbol, int flags) { - _LOG("--- (f-c) write_mqtt_symbol\n"); + _LOG("--- (f-b) write_mqtt_symbol\n"); swi_mqtt *m = PL_blob_data(symbol, NULL, NULL); // Sfprintf(s, "(%p-%p)", m, m->mosq); @@ -584,7 +522,7 @@ static PL_blob_t mqtt_blob = static int unify_swi_mqtt(term_t handle, swi_mqtt *m) { - _LOG("--- (f-c) unify_swi_mqtt\n"); + _LOG("--- (f-b) unify_swi_mqtt\n"); if ( PL_unify_blob(handle, m, sizeof(*m), &mqtt_blob) ) return TRUE; @@ -597,7 +535,7 @@ static int get_swi_mqtt(term_t handle, swi_mqtt **mp) { PL_blob_t *type; - _LOG("--- (f-c) get_swi_mqtt\n"); + _LOG("--- (f-b) get_swi_mqtt\n"); void *data; if ( PL_get_blob(handle, &data, NULL, &type) && type == &mqtt_blob) @@ -618,7 +556,7 @@ get_swi_mqtt(term_t handle, swi_mqtt **mp) static void release_swi_mqtt(swi_mqtt *m) { - _LOG("--- (f-c) release_swi_mqtt\n"); + _LOG("--- (f-b) release_swi_mqtt\n"); UNLOCK(m); } @@ -875,7 +813,20 @@ c_mqtt_connect(term_t conn, term_t host, term_t port, term_t options) char* client_id; int keepalive = 60; int is_async = FALSE; - int use_hooks = TRUE; + int use_hooks = FALSE; + int use_debug_hooks = FALSE; + + char* hook_module = NULL; + char* hook_on_log = NULL; + char* hook_on_connect = NULL; + char* hook_on_disconnect = NULL; + char* hook_on_message = NULL; + char* hook_on_publish = NULL; + char* hook_on_subscribe = NULL; + char* hook_on_unsubscribe = NULL; + + + _LOG("--- (f-c) c_mqtt_connect\n"); @@ -911,12 +862,25 @@ c_mqtt_connect(term_t conn, term_t host, term_t port, term_t options) _PL_get_arg(1, head, arg); - if ( name == ATOM_client_id ) { if (!PL_get_chars( arg, &client_id, CVT_WRITE | BUF_MALLOC) ) { result = FALSE; goto CLEANUP;} - } else if ( name == ATOM_is_async ) { if (!PL_get_bool( arg, &is_async) ) { result = FALSE;goto CLEANUP;} - } else if ( name == ATOM_hooks ) { if (!PL_get_bool( arg, &use_hooks) ) { result = FALSE;goto CLEANUP;} - } else if ( name == ATOM_keepalive ) { if (!PL_get_integer(arg, &keepalive) ) { result = FALSE; goto CLEANUP;} + if ( name == ATOM_client_id ) { if (!PL_get_chars( arg, &client_id, CVT_WRITE | BUF_MALLOC) ) { result = FALSE; goto CLEANUP;} + + } else if ( name == ATOM_module ) { if (!PL_get_chars( arg, &hook_module, CVT_WRITE | BUF_MALLOC) ) { result = FALSE; goto CLEANUP;} + } else if ( name == ATOM_on_log ) { if (!PL_get_chars( arg, &hook_on_log, CVT_WRITE | BUF_MALLOC) ) { result = FALSE; goto CLEANUP;} + + } else if ( name == ATOM_on_connect) { if (!PL_get_chars( arg, &hook_on_connect, CVT_WRITE | BUF_MALLOC) ) { result = FALSE; goto CLEANUP;} + } else if ( name == ATOM_on_disconnect) { if (!PL_get_chars( arg, &hook_on_disconnect, CVT_WRITE | BUF_MALLOC) ) { result = FALSE; goto CLEANUP;} + + } else if ( name == ATOM_on_message ) { if (!PL_get_chars( arg, &hook_on_message, CVT_WRITE | BUF_MALLOC) ) { result = FALSE; goto CLEANUP;} + } else if ( name == ATOM_on_publish ) { if (!PL_get_chars( arg, &hook_on_publish, CVT_WRITE | BUF_MALLOC) ) { result = FALSE; goto CLEANUP;} + + } else if ( name == ATOM_on_subscribe) { if (!PL_get_chars( arg, &hook_on_subscribe, CVT_WRITE | BUF_MALLOC) ) { result = FALSE; goto CLEANUP;} + } else if ( name == ATOM_on_unsubscribe){ if (!PL_get_chars( arg, &hook_on_unsubscribe,CVT_WRITE | BUF_MALLOC) ) { result = FALSE; goto CLEANUP;} + + } else if ( name == ATOM_hooks ) { if (!PL_get_bool( arg, &use_hooks) ) { result = FALSE;goto CLEANUP;} + } else if ( name == ATOM_debug_hooks) { if (!PL_get_bool( arg, &use_debug_hooks) ) { result = FALSE;goto CLEANUP;} + } else if ( name == ATOM_is_async ) { if (!PL_get_bool( arg, &is_async) ) { result = FALSE;goto CLEANUP;} + } else if ( name == ATOM_keepalive ) { if (!PL_get_integer(arg, &keepalive) ) { result = FALSE; goto CLEANUP;} } - } else { result = FALSE; //pl_error("c_mosquitto_connect", 4, NULL, ERR_TYPE, head, "option"); goto CLEANUP; @@ -942,6 +906,10 @@ c_mqtt_connect(term_t conn, term_t host, term_t port, term_t options) } m->refs = 0; + m->use_callbacks = FALSE; + m->is_async = is_async; + m->is_async_loop_started = false; + m->keepalive = keepalive; // pass swi_mqtt as user object (will be available in all callbacks) mosq = mosquitto_new(client_id, true, m); @@ -951,17 +919,64 @@ c_mqtt_connect(term_t conn, term_t host, term_t port, term_t options) _LOG("--- (f-c) c_mqtt_connect > new mqtt: %p\n", mosq); m->mosq = mosq; _LOG("--- (f-c) c_mqtt_connect > m->mosq: %p\n", m->mosq); - m->is_async = is_async; - m->is_async_loop_started = false; - - m->keepalive = keepalive; // if ATOM_use_callbacks set --> + _LOG("--- (f-c) c_mqtt_connect > hooks: %d debug_hooks: %d\n", use_hooks, use_debug_hooks); // set all callbacks - - // use custom hooks, or dump info with dummy... if (use_hooks) { + _LOG("--- (f-c) c_mqtt_connect > setting custom hooks in module: %s\n", hook_module); + m->use_callbacks = TRUE; + if (hook_on_log) + { + _LOG("--- (f-c) c_mqtt_connect > setting custom hook on_log: %s\n", hook_on_log); + m->callback_on_log2 = PL_predicate(hook_on_log, 2, hook_module); + mosquitto_log_callback_set(m->mosq, on_log_callback); + } + + if (hook_on_connect) + { + _LOG("--- (f-c) c_mqtt_connect > setting custom hook on_connect: %s\n", hook_on_connect); + m->callback_on_connect2 = PL_predicate(hook_on_connect, 2, hook_module); + mosquitto_connect_callback_set(m->mosq, on_connect_callback); + } + if (hook_on_disconnect) + { + _LOG("--- (f-c) c_mqtt_connect > setting custom hook on_disconnect: %s\n", hook_on_disconnect); + m->callback_on_disconnect2 = PL_predicate(hook_on_disconnect, 2, hook_module); + mosquitto_disconnect_callback_set(m->mosq, on_disconnect_callback); + } + + if (hook_on_message) + { + _LOG("--- (f-c) c_mqtt_connect > setting custom hook on_message: %s\n", hook_on_message); + m->callback_on_message2 = PL_predicate(hook_on_message, 2, hook_module); + mosquitto_message_callback_set(m->mosq, on_message_callback); + } + + if (hook_on_publish) + { + _LOG("--- (f-c) c_mqtt_connect > setting custom hook on_publish: %s\n", hook_on_publish); + m->callback_on_publish2 = PL_predicate(hook_on_publish, 2, hook_module); + mosquitto_publish_callback_set(m->mosq, on_publish_callback); + } + + if (hook_on_subscribe) + { + _LOG("--- (f-c) c_mqtt_connect > setting custom hook on_subscribe: %s\n", hook_on_subscribe); + m->callback_on_subscribe2 = PL_predicate(hook_on_subscribe, 2, hook_module); + mosquitto_subscribe_callback_set(m->mosq, on_subscribe_callback); + } + if (hook_on_unsubscribe) + { + _LOG("--- (f-c) c_mqtt_connect > setting custom hook on_unsubscribe: %s\n", hook_on_unsubscribe); + m->callback_on_unsubscribe2 = PL_predicate(hook_on_unsubscribe, 2, hook_module); + mosquitto_unsubscribe_callback_set(m->mosq, on_unsubscribe_callback); + } + + } else if (use_debug_hooks) + { + _LOG("--- (f-c) c_mqtt_connect > setting debug hooks\n"); mosquitto_log_callback_set(m->mosq, on_log_callback); mosquitto_connect_callback_set(m->mosq, on_connect_callback); @@ -1251,7 +1266,7 @@ install_mqtt(void) ATOM_is_async = PL_new_atom("is_async"); ATOM_client_id = PL_new_atom("client_id"); ATOM_keepalive = PL_new_atom("keepalive"); - ATOM_use_callbacks = PL_new_atom("use_callbacks"); + //ATOM_use_callbacks = PL_new_atom("use_callbacks"); ATOM_message_id = PL_new_atom("message_id"); ATOM_topic = PL_new_atom("topic"); @@ -1285,12 +1300,13 @@ install_mqtt(void) ATOM_on_connect = PL_new_atom("on_connect"); ATOM_on_disconnect = PL_new_atom("on_disconnect"); ATOM_on_log = PL_new_atom("on_log"); - ATOM_on_message = PL_new_atom("message"); + ATOM_on_message = PL_new_atom("on_message"); ATOM_on_publish = PL_new_atom("on_publish"); ATOM_on_subscribe = PL_new_atom("on_subscribe"); ATOM_on_unsubscribe = PL_new_atom("on_unsubscribe"); ATOM_hooks = PL_new_atom("hooks"); + ATOM_debug_hooks = PL_new_atom("debug_hooks"); // now options (functors with arity 1) FUNCTOR_topic1 = PL_new_functor(ATOM_topic, 1); diff --git a/c/mqtt.o b/c/mqtt.o index aaeeae3..31f7565 100644 Binary files a/c/mqtt.o and b/c/mqtt.o differ diff --git a/examples/callbacks.pl b/examples/callbacks.pl new file mode 100644 index 0000000..e30e81c --- /dev/null +++ b/examples/callbacks.pl @@ -0,0 +1,58 @@ +:- use_module(library(mqtt)). + +versions:- + mqtt_version(MqttVerMa, MqttVerMi, MqttVerRe), + pack_version(PackVerMa, PackVerMi, PackVerRe), + format('% Pack version: ~w.~w.~w Mosquitto version: ~w.~w.~w~n', [PackVerMa, PackVerMi, PackVerRe, MqttVerMa, MqttVerMi, MqttVerRe]), + true. + + +/* + + +*/ +simple_pub(Topic, Value) :- + + mqtt_connect(A, 'localhost', 1883, + [alias(swi_mqtt), client_id(swi_mqtt_client), keepalive(120), is_async(false), + debug_hooks(false), hooks(true), + module(user), + on_log(my_on_log), on_message(my_on_msg), on_publish(my_on_pub), + on_subscribe(my_on_sub), on_unsubscribe(my_on_uns), + on_connect(my_on_con), on_disconnect(my_on_dis) + ]), + mqtt_pub(A, Topic, Value), + mqtt_disconnect(A). + + +/* + + +*/ +simple_sub(Topic) :- + mqtt_connect(A, 'localhost', 1883, + [alias(swi_mqtt), client_id(swi_mqtt_client), keepalive(120), is_async(false), + debug_hooks(false), hooks(true), + module(user), + on_log(my_on_log), on_message(my_on_msg), on_publish(my_on_pub), + on_subscribe(my_on_sub), on_unsubscribe(my_on_uns), + on_connect(my_on_con), on_disconnect(my_on_dis) + ]), + mqtt_sub(A, Topic, []), + mqtt_loop(A),sleep(1), + mqtt_loop(A),sleep(1), + mqtt_loop(A), + mqtt_unsub(A, Topic), + mqtt_disconnect(A). + + +my_on_log(C,D) :- format('% log > ~w - ~w~n', [C,D]). + +my_on_msg(C,D) :- format('% msg > ~w - ~w~n', [C,D]). +my_on_pub(C,D) :- format('% pub > ~w - ~w~n', [C,D]). + +my_on_con(C,D) :- format('% con > ~w - ~w~n', [C,D]). +my_on_dis(C,D) :- format('% dis > ~w - ~w~n', [C,D]). + +my_on_sub(C,D) :- format('% sub > ~w - ~w~n', [C,D]). +my_on_uns(C,D) :- format('% uns > ~w - ~w~n', [C,D]). diff --git a/examples/mqtt_timer.pl b/examples/mqtt_timer.pl index ed2bc71..c5949f6 100644 --- a/examples/mqtt_timer.pl +++ b/examples/mqtt_timer.pl @@ -34,14 +34,27 @@ timer_sub :- - mqtt_connect(P, 'localhost', 1883, [alias(swi_mqtt_pub), client_id(swi_mqtt_pub_client), keepalive(60), is_async(true)]), + mqtt_connect(P, 'localhost', 1883, + [alias(swi_mqtt_pub), client_id(swi_mqtt_pub_client), keepalive(60), is_async(true), + debug_hooks(false), hooks(true), + module(user), + on_publish(my_on_pub), + on_disconnect(my_on_dis) + ]), assert(pub_connection(P)), - mqtt_connect(A, 'localhost', 1883, [alias(swi_mqtt_timer), client_id(swi_mqtt_timer_client), keepalive(120), is_async(true)]), + mqtt_connect(A, 'localhost', 1883, + [alias(swi_mqtt_timer), client_id(swi_mqtt_timer_client), keepalive(120), is_async(true), + debug_hooks(false), hooks(true), + module(user), + on_message(my_on_msg), on_log(my_on_log), + on_subscribe(my_on_sub), on_unsubscribe(my_on_uns), + on_disconnect(my_on_dis) + ]), create_thread_worker(W), assert( worker_running(W) ), assert( worker_data(W, A) ), - mqtt_sub(A, '/swi/+/timestamp', []), + mqtt_sub(A, 'timestamp', []), true. @@ -80,10 +93,9 @@ get_time(TimeRaw), % just seconds, no microsecs format_time(atom(Time), '%s', TimeRaw), - format_time(atom(TimeFmt), '%F %T', TimeRaw), - format('% pub time: ~w~n', [TimeFmt]), + %format_time(atom(TimeFmt), '%F %T', TimeRaw), format('% pub time: ~w~n', [TimeFmt]), %mqtt_pub(Conn, '/swi/host/timestamp', Time), - timer_pub('/swi/host/timestamp', Time), + timer_pub('timestamp', Time), sleep(10), do_mqtt_work(ThreadId), true. @@ -97,4 +109,24 @@ member(payload(T), Data), stamp_date_time(T, X, 'UTC'), format('% got time: ~w~n', X). -*/ \ No newline at end of file +*/ + + +my_on_log(C,D) :- format('% log > ~w - ~w~n', [C,D]). + +my_on_msg(_C,D) :- + format('% msg > ~w~n', [D]), + member(topic(timestamp), D), + member(payload(Time), D), + format('% msg > got: ~w seconds~n', [Time]), + true. + +my_on_pub(_C,D) :- + format('% pub > ~w~n', [D]), + true. + +my_on_con(C,D) :- format('% con > ~w - ~w~n', [C,D]). +my_on_dis(C,D) :- format('% dis > ~w - ~w~n', [C,D]). + +my_on_sub(C,D) :- format('% sub > ~w - ~w~n', [C,D]). +my_on_uns(C,D) :- format('% uns > ~w - ~w~n', [C,D]). diff --git a/examples/simple.pl b/examples/simple.pl index 62d6811..e0650c5 100644 --- a/examples/simple.pl +++ b/examples/simple.pl @@ -24,7 +24,7 @@ */ simple_pub(Topic, Value) :- - mqtt_connect(A, 'localhost', 1883, [alias(swi_mqtt), client_id(swi_mqtt_client), keepalive(120), is_async(false)]), + mqtt_connect(A, 'localhost', 1883, [alias(swi_mqtt), client_id(swi_mqtt_client), keepalive(120), is_async(false), debug_hooks(true)]), mqtt_pub(A, Topic, Value), mqtt_disconnect(A). @@ -47,7 +47,7 @@ */ simple_sub(Topic) :- - mqtt_connect(A, 'localhost', 1883, [alias(swi_mqtt), client_id(swi_mqtt_client), keepalive(120), is_async(false)]), + mqtt_connect(A, 'localhost', 1883, [alias(swi_mqtt), client_id(swi_mqtt_client), keepalive(120), is_async(false), debug_hooks(true)]), mqtt_sub(A, Topic, []), mqtt_loop(A), diff --git a/lib/x86_64-linux/mqtt.so b/lib/x86_64-linux/mqtt.so index 4208302..fe0c517 100644 Binary files a/lib/x86_64-linux/mqtt.so and b/lib/x86_64-linux/mqtt.so differ diff --git a/pack.pl b/pack.pl index be3996f..d659423 100644 --- a/pack.pl +++ b/pack.pl @@ -1,8 +1,8 @@ name(mqtt). title('mqtt - pub/sub pack for SWI-Prolog using mosquitto'). -version('1.0.2'). +version('1.0.3'). author('Oleh Lozynskyy', 'oleh.lozynskyy+github@gmail.com'). packager('Oleh Lozynskyy', 'oleh.lozynskyy+github@gmail.com'). maintainer('Oleh Lozynskyy', 'oleh.lozynskyy+github@gmail.com'). home('https://github.com/olsky/swi-mqtt-pack'). -download('https://github.com/olsky/swi-mqtt-pack/releases/download/v1.0.2/mqtt-1.0.2.zip'). +download('https://github.com/olsky/swi-mqtt-pack/releases/download/v1.0.3/mqtt-1.0.3.zip').