Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduce void* parameter for user data in listeners #207

Merged
merged 4 commits into from
Apr 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 103 additions & 21 deletions core/pbcc_subscribe_event_listener.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,21 @@ typedef struct {
pbarray_t* listeners;
} pbcc_object_listener_t;

/**
* @brief Real-time status update listener definition.
*
* Object listeners entry object with information about the callback to use
* when status update arrives.
*/
typedef struct {
/** Real-time status update handling listener function. */
pubnub_subscribe_status_callback_t callback;
/** User provided data to be passed to the callback. */
void* user_data;
/** Object references counter. */
pbref_counter_t* counter;
} pbcc_status_listener_t;

/**
* @brief Listener definition.
*
Expand All @@ -67,6 +82,8 @@ typedef struct {
pubnub_subscribe_listener_type type;
/** Real-time update handling listener function. */
pubnub_subscribe_message_callback_t callback;
/** User provided data to be passed to the callback. */
void* user_data;
/** Object references counter. */
pbref_counter_t* counter;
} pbcc_listener_t;
Expand Down Expand Up @@ -157,14 +174,16 @@ static void pbcc_object_listener_free_(pbcc_object_listener_t* listener);
* @param type Type of real-time update for which listener will be
* called.
* @param callback Real-time update handling listener function.
* @param user_data User provided data to be passed to the callback.
* @return Pointer to the ready to use real-time events listener or `NULL` in
* case of insufficient memory error. The returned pointer must be
* passed to the `_pbcc_listener_free` to avoid a memory leak.
*/
static pbcc_listener_t* pbcc_listener_alloc_(
const void* subscription,
pubnub_subscribe_listener_type type,
pubnub_subscribe_message_callback_t callback);
pubnub_subscribe_message_callback_t callback,
void* user_data);

/**
* @brief Add / register real-time update listener.
Expand Down Expand Up @@ -243,6 +262,24 @@ static pbarray_t* pbcc_initialize_array_(
pbarray_t** array,
pbarray_element_free free_fn);

/** @brief Create real-time status update listener.
*
* @param callback Real-time status update handling listener function.
* @param user_data User provided data to be passed to the callback.
* @return Pointer to the ready to use real-time status listener or `NULL` in
* case of insufficient memory error. The returned pointer must be
* passed to the `_pbcc_status_listener_free` to avoid a memory leak.
*/
static pbcc_status_listener_t* pbcc_status_listener_alloc_(
pubnub_subscribe_status_callback_t callback,
void* user_data);

/** @brief Clean up resources used by status listener object.
*
* @param listener Pointer to the status listener, which should free up
* resources.
*/
static void pbcc_status_listener_free_(pbcc_status_listener_t* listener);

// ----------------------------------------------
// Functions
Expand All @@ -262,26 +299,35 @@ pbcc_event_listener_t* pbcc_event_listener_alloc(const pubnub_t* pb)

enum pubnub_res pbcc_event_listener_add_status_listener(
pbcc_event_listener_t* listener,
const pubnub_subscribe_status_callback_t cb)
const pubnub_subscribe_status_callback_t cb,
void* user_data)
{
if (NULL == listener || NULL == cb) { return PNR_INVALID_PARAMETERS; }

pubnub_mutex_lock(listener->mutw);
/** Check whether listeners array should be created on demand or not. */
if (NULL == pbcc_initialize_array_(&listener->global_status, NULL)) {
if (NULL == pbcc_initialize_array_(&listener->global_status,
(pbarray_element_free)pbcc_status_listener_free_)) {
pubnub_mutex_unlock(listener->mutw);
return PNR_OUT_OF_MEMORY;
}

pbcc_status_listener_t* _listener = pbcc_status_listener_alloc_(cb, user_data);
if (NULL == _listener) {
pubnub_mutex_unlock(listener->mutw);
return PNR_OUT_OF_MEMORY;
}

const pbarray_res result = pbarray_add(listener->global_status, cb);
const pbarray_res result = pbarray_add(listener->global_status, _listener);
pubnub_mutex_unlock(listener->mutw);

return PBAR_OUT_OF_MEMORY != result ? PNR_OK : PNR_OUT_OF_MEMORY;
}

enum pubnub_res pbcc_event_listener_remove_status_listener(
pbcc_event_listener_t* listener,
pubnub_subscribe_status_callback_t cb)
pubnub_subscribe_status_callback_t cb,
void* user_data)
{
if (NULL == listener || NULL == cb) { return PNR_INVALID_PARAMETERS; }

Expand All @@ -291,7 +337,18 @@ enum pubnub_res pbcc_event_listener_remove_status_listener(
return PNR_OK;
}

pbarray_remove(listener->global_status, (void**)&cb, true);
pbcc_status_listener_t* _listener = pbcc_status_listener_alloc_(cb, user_data);
if (NULL == _listener) {
pubnub_mutex_unlock(listener->mutw);
return PNR_OUT_OF_MEMORY;
}

pbarray_remove(listener->global_status, (void**)&_listener, true);

/**
* It is safe to release temporarily object which used only to match object.
*/
pbcc_status_listener_free_(_listener);
pubnub_mutex_unlock(listener->mutw);

return PNR_OK;
Expand All @@ -300,7 +357,8 @@ enum pubnub_res pbcc_event_listener_remove_status_listener(
enum pubnub_res pbcc_event_listener_add_message_listener(
pbcc_event_listener_t* listener,
const pubnub_subscribe_listener_type type,
const pubnub_subscribe_message_callback_t cb)
const pubnub_subscribe_message_callback_t cb,
void* user_data)
{
if (NULL == listener || NULL == cb) { return PNR_INVALID_PARAMETERS; }

Expand All @@ -313,7 +371,7 @@ enum pubnub_res pbcc_event_listener_add_message_listener(
return PNR_OUT_OF_MEMORY;
}

pbcc_listener_t* _listener = pbcc_listener_alloc_(NULL, type, cb);
pbcc_listener_t* _listener = pbcc_listener_alloc_(NULL, type, cb, user_data);
if (NULL == _listener) {
pubnub_mutex_unlock(listener->mutw);
return PNR_OUT_OF_MEMORY;
Expand All @@ -331,7 +389,8 @@ enum pubnub_res pbcc_event_listener_add_message_listener(
enum pubnub_res pbcc_event_listener_remove_message_listener(
pbcc_event_listener_t* listener,
const pubnub_subscribe_listener_type type,
const pubnub_subscribe_message_callback_t cb)
const pubnub_subscribe_message_callback_t cb,
void* user_data)
{
if (NULL == listener || NULL == cb)
return PNR_INVALID_PARAMETERS;
Expand All @@ -342,7 +401,7 @@ enum pubnub_res pbcc_event_listener_remove_message_listener(
return PNR_OK;
}

pbcc_listener_t* _listener = pbcc_listener_alloc_(NULL, type, cb);
pbcc_listener_t* _listener = pbcc_listener_alloc_(NULL, type, cb, user_data);
if (NULL == _listener) {
pubnub_mutex_unlock(listener->mutw);
return PNR_OUT_OF_MEMORY;
Expand All @@ -366,7 +425,8 @@ enum pubnub_res pbcc_event_listener_add_subscription_object_listener(
const pubnub_subscribe_listener_type type,
pbarray_t* names,
const void* subscription,
const pubnub_subscribe_message_callback_t cb)
const pubnub_subscribe_message_callback_t cb,
void* user_data)
{
if (NULL == listener || NULL == cb) { return PNR_INVALID_PARAMETERS; }

Expand All @@ -384,7 +444,7 @@ enum pubnub_res pbcc_event_listener_add_subscription_object_listener(
}
}

pbcc_listener_t* _listener = pbcc_listener_alloc_(subscription, type, cb);
pbcc_listener_t* _listener = pbcc_listener_alloc_(subscription, type, cb, user_data);
bool added = false;
if (NULL == _listener) {
pubnub_mutex_unlock(listener->mutw);
Expand Down Expand Up @@ -419,17 +479,18 @@ enum pubnub_res pbcc_event_listener_remove_subscription_object_listener(
const pubnub_subscribe_listener_type type,
pbarray_t* names,
const void* subscription,
const pubnub_subscribe_message_callback_t cb)
const pubnub_subscribe_message_callback_t cb,
void* user_data)
{
if (NULL == listener || NULL == cb) { return PNR_INVALID_PARAMETERS; }

pubnub_mutex_lock(listener->mutw);
if (NULL == listener->global_events) {
if (NULL == listener->listeners) {
pubnub_mutex_unlock(listener->mutw);
return PNR_OK;
}

pbcc_listener_t* _listener = pbcc_listener_alloc_(subscription, type, cb);
pbcc_listener_t* _listener = pbcc_listener_alloc_(subscription, type, cb, user_data);
if (NULL == _listener) {
pubnub_mutex_unlock(listener->mutw);
return PNR_OUT_OF_MEMORY;
Expand Down Expand Up @@ -464,10 +525,10 @@ void pbcc_event_listener_emit_status(
{ reason, channels, channel_groups };
const size_t status_count = pbarray_count(listener->global_status);
for (size_t i = 0; i < status_count; ++i) {
const pubnub_subscribe_status_callback_t cb = (
pubnub_subscribe_status_callback_t)
const pbcc_status_listener_t* sl = (
pbcc_status_listener_t*)
pbarray_element_at(listener->global_status, i);
cb(listener->pb, status, data);
sl->callback(listener->pb, status, data, sl->user_data);
}
pubnub_mutex_unlock(listener->mutw);
}
Expand Down Expand Up @@ -589,13 +650,27 @@ void pbcc_object_listener_free_(pbcc_object_listener_t* listener)
pbcc_listener_t* pbcc_listener_alloc_(
const void* subscription,
const pubnub_subscribe_listener_type type,
const pubnub_subscribe_message_callback_t callback)
const pubnub_subscribe_message_callback_t callback,
void* user_data)
{
PBCC_ALLOCATE_TYPE(listener, pbcc_listener_t, true, NULL);
listener->counter = pbref_counter_alloc();
listener->type = type;
listener->subscription_object = subscription;
listener->callback = callback;
listener->user_data = user_data;

return listener;
}

pbcc_status_listener_t* pbcc_status_listener_alloc_(
pubnub_subscribe_status_callback_t callback,
void* user_data)
{
PBCC_ALLOCATE_TYPE(listener, pbcc_status_listener_t, true, NULL);
listener->counter = pbref_counter_alloc();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see counter, but was it used anywhere? For pbcc_listener_t it is used with pbcc_add_listener_ and pbcc_remove_listener_.

listener->callback = callback;
listener->user_data = user_data;

return listener;
}
Expand Down Expand Up @@ -623,7 +698,8 @@ enum pubnub_res pbcc_remove_listener_(

if (_listener->type == listener->type &&
_listener->subscription_object == listener->subscription_object &&
_listener->callback == listener->callback) {
_listener->callback == listener->callback &&
_listener->user_data == listener->user_data) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if we should actually check _listener->user_data == listener->user_data when removing listeners? This forces user to store more data just to remove the listener, but if subscription objects and callbacks matches, maybe it should be enough to remove the listener?

pbref_counter_decrement(_listener->counter);
pbarray_remove(listeners, (void**)&_listener, true);
}
Expand All @@ -639,6 +715,12 @@ void pbcc_listener_free_(pbcc_listener_t* listener)
if (0 == pbref_counter_free(listener->counter)) { free(listener); }
}

void pbcc_status_listener_free_(pbcc_status_listener_t* listener)
{
if (NULL == listener) { return; }
if (0 == pbref_counter_free(listener->counter)) { free(listener); }
}

void pbcc_event_listener_emit_message_(
const pbcc_event_listener_t* listener,
pbarray_t* listeners,
Expand All @@ -652,7 +734,7 @@ void pbcc_event_listener_emit_message_(
pbcc_listener_t* _listener = (pbcc_listener_t*)
pbarray_element_at(listeners, i);
if (NULL == _listener->subscription_object || _listener->type == type)
_listener->callback(listener->pb, message);
_listener->callback(listener->pb, message, _listener->user_data);
}
}

Expand Down
Loading
Loading