Skip to content

Commit

Permalink
Connect to local mqtt broker
Browse files Browse the repository at this point in the history
  • Loading branch information
JeffMboya committed Jan 13, 2025
1 parent 9b10f26 commit 1321428
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 23 deletions.
7 changes: 7 additions & 0 deletions embed-proplet/src/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ void main(void)
return;
}

/* Wait for MQTT connection to be established */
LOG_INF("Waiting for MQTT connection...");
while (!mqtt_connected) {
k_sleep(K_MSEC(100));
}
LOG_INF("MQTT connected successfully.");

/* Publish discovery announcement */
if (mqtt_client_discovery_announce(PROPLET_ID, CHANNEL_ID) != 0) {
LOG_ERR("Discovery announcement failed");
Expand Down
73 changes: 58 additions & 15 deletions embed-proplet/src/mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,45 @@ static struct mqtt_client client_ctx;
static struct sockaddr_storage broker_addr;

/* Flag to indicate connection status */
static bool mqtt_connected = false;
bool mqtt_connected = false;

static void mqtt_event_handler(struct mqtt_client *client, const struct mqtt_evt *evt)
{
switch (evt->type) {
case MQTT_EVT_CONNACK:
if (evt->result == 0) {
case MQTT_EVT_CONNACK: {
const struct mqtt_connack_param *connack = &evt->param.connack;

if (evt->result == 0 && connack->return_code == MQTT_CONNECTION_ACCEPTED) {
mqtt_connected = true;
LOG_INF("Connected to MQTT broker");
LOG_INF("MQTT connection accepted by broker");
} else {
LOG_ERR("Connection failed, result: %d", evt->result);
mqtt_connected = false;

LOG_ERR("MQTT connection failed. Result: %d, Return Code: %d", evt->result, connack->return_code);

switch (connack->return_code) {
case MQTT_UNACCEPTABLE_PROTOCOL_VERSION:
LOG_ERR("Error: MQTT_UNACCEPTABLE_PROTOCOL_VERSION - The server does not support the MQTT protocol version requested.");
break;
case MQTT_IDENTIFIER_REJECTED:
LOG_ERR("Error: MQTT_IDENTIFIER_REJECTED - The client identifier is not allowed by the server.");
break;
case MQTT_SERVER_UNAVAILABLE:
LOG_ERR("Error: MQTT_SERVER_UNAVAILABLE - The MQTT service is unavailable.");
break;
case MQTT_BAD_USER_NAME_OR_PASSWORD:
LOG_ERR("Error: MQTT_BAD_USER_NAME_OR_PASSWORD - Username or password is malformed.");
break;
case MQTT_NOT_AUTHORIZED:
LOG_ERR("Error: MQTT_NOT_AUTHORIZED - The client is not authorized to connect.");
break;
default:
LOG_ERR("Error: Unknown connection return code (%d)", connack->return_code);
break;
}
}
break;
}

case MQTT_EVT_DISCONNECT:
mqtt_connected = false;
Expand All @@ -48,15 +74,6 @@ static void mqtt_event_handler(struct mqtt_client *client, const struct mqtt_evt
case MQTT_EVT_PUBLISH: {
const struct mqtt_publish_param *publish = &evt->param.publish;
LOG_INF("Message received on topic: %s", publish->message.topic.topic.utf8);

/* Handle messages */
if (strstr(publish->message.topic.topic.utf8, "start")) {
LOG_INF("Start command received");
/* Handle start command */
} else if (strstr(publish->message.topic.topic.utf8, "stop")) {
LOG_INF("Stop command received");
/* Handle stop command */
}
break;
}

Expand All @@ -69,6 +86,7 @@ static void mqtt_event_handler(struct mqtt_client *client, const struct mqtt_evt
break;

default:
LOG_WRN("Unhandled MQTT event: %d", evt->type);
break;
}
}
Expand Down Expand Up @@ -141,11 +159,36 @@ int mqtt_client_discovery_announce(const char *proplet_id, const char *channel_i
.retain_flag = 0,
};

if (!mqtt_connected) {
LOG_ERR("MQTT client is not connected. Discovery announcement aborted.");
return -ENOTCONN;
}

int ret = mqtt_publish(&client_ctx, &param);
if (ret != 0) {
LOG_ERR("Failed to publish discovery announcement, ret=%d", ret);
LOG_ERR("Failed to publish discovery announcement. Error code: %d", ret);

switch (ret) {
case -ENOTCONN:
LOG_ERR("Error: MQTT client is not connected to the broker.");
break;
case -EIO:
LOG_ERR("Error: I/O error occurred while publishing.");
break;
case -EINVAL:
LOG_ERR("Error: Invalid parameter provided to publish.");
break;
case -ENOMEM:
LOG_ERR("Error: Insufficient memory to process the publish request.");
break;
default:
LOG_ERR("Error: Unknown publishing error.");
break;
}
return ret;
}

LOG_INF("Discovery announcement published successfully to topic: %s", topic);
return ret;
}

Expand Down
12 changes: 4 additions & 8 deletions embed-proplet/src/mqtt_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,13 @@
#define MQTT_CLIENT_H

#include <zephyr/net/mqtt.h>
#include <stdbool.h>

/* Initialize and connect the MQTT client */
int mqtt_client_init_and_connect(void);

/* Process MQTT events */
void mqtt_client_process(void);
extern bool mqtt_connected;

/* Publish discovery announcement */
int mqtt_client_init_and_connect(void);
int mqtt_client_discovery_announce(const char *proplet_id, const char *channel_id);

/* Subscribe to required topics */
int mqtt_client_subscribe(const char *channel_id);
void mqtt_client_process(void);

#endif /* MQTT_CLIENT_H */

0 comments on commit 1321428

Please sign in to comment.