Part of TI's mqtt
Dependents: mqtt_V1 cc3100_Test_mqtt_CM3
client_mgmt.cpp
- Committer:
- dflet
- Date:
- 2015-06-06
- Revision:
- 0:547251f42a60
File content as of revision 0:547251f42a60:
/******************************************************************************
*
* Copyright (C) 2014 Texas Instruments Incorporated
*
* All rights reserved. Property of Texas Instruments Incorporated.
* Restricted rights to use, duplicate or disclose this code are
* granted through contract.
*
* The program may not be used without the written permission of
* Texas Instruments Incorporated or against the terms and conditions
* stipulated in the agreement under which this program has been supplied,
* and under no circumstances can it be used with non-TI connectivity device.
*
******************************************************************************/
#include "server_util.h"
#include "client_mgmt.h"
#include "server_pkts.h"
namespace mbed_mqtt {
#ifndef CFG_SR_MAX_CL_ID_SIZE
#define MAX_CLIENT_ID_LEN 64
#else
#define MAX_CLIENT_ID_LEN CFG_SR_MAX_CL_ID_SIZE
#endif
#ifndef CFG_SR_MAX_NUM_CLIENT
#define MAX_CLIENTS 16 /* Must not exceed 32 */
#else
#define MAX_CLIENTS CFG_SR_MAX_NUM_CLIENT
#if (CFG_SR_MAX_NUM_CLIENT > 32)
#error "CFG_SR_MAX_NUM_CLIENT must not exceed 32"
#endif
#endif
static struct client_usr {
void *ctx; /* Client net */
void *app; /* Client app */
void *will; /* Will topic */
uint32_t index;
uint32_t n_sub;
#define MQ_CONNECT_FLAG 0x00000001
#define CLEAN_SESS_FLAG 0x00000002
#define ASSIGNMENT_FLAG 0x00000004 /* No state or connected */
uint32_t flags;
char client_id[MAX_CLIENT_ID_LEN];
struct pub_qos2_cq qos2_rx_cq;
struct pub_qos2_cq qos2_tx_cq;
} users[MAX_CLIENTS];
// TBD consistency between inline and macro functions
static inline bool is_connected(struct client_usr *cl_usr)
{
return (cl_usr->flags & MQ_CONNECT_FLAG)? true : false;
}
static inline bool has_clean_session(struct client_usr *cl_usr)
{
return (cl_usr->flags & CLEAN_SESS_FLAG)? true : false;
}
static inline bool is_assigned(struct client_usr *cl_usr)
{
return (cl_usr->flags & ASSIGNMENT_FLAG)? true : false;
}
static inline void set_connect_state(struct client_usr *cl_usr,
bool connected)
{
if(connected)
cl_usr->flags |= MQ_CONNECT_FLAG;
else
cl_usr->flags &= ~MQ_CONNECT_FLAG;
}
static inline void set_clean_session(struct client_usr *cl_usr,
bool clean_session)
{
if(clean_session)
cl_usr->flags |= CLEAN_SESS_FLAG;
else
cl_usr->flags &= ~CLEAN_SESS_FLAG;
}
static inline void set_assigned(struct client_usr *cl_usr,
bool assignment)
{
if(assignment)
cl_usr->flags |= ASSIGNMENT_FLAG;
else
cl_usr->flags &= ~ASSIGNMENT_FLAG;
}
static void cl_usr_reset(struct client_usr *cl_usr)
{
cl_usr->ctx = NULL;
cl_usr->app = NULL;
cl_usr->will = NULL;
cl_usr->n_sub = 0;
cl_usr->flags = 0;
cl_usr->client_id[0] = '\0';
qos2_pub_cq_reset(&cl_usr->qos2_rx_cq);
qos2_pub_cq_reset(&cl_usr->qos2_tx_cq);
}
static void cl_usr_init(void)
{
int32_t idx = 0;
for(idx = 0; idx < MAX_CLIENTS; idx++) {
struct client_usr *cl_usr = users + idx;
cl_usr->index = idx;
cl_usr_reset(cl_usr);
}
}
static void cl_usr_free(struct client_usr *cl_usr)
{
cl_usr_reset(cl_usr);
cl_usr->flags &= ~(MQ_CONNECT_FLAG |
CLEAN_SESS_FLAG |
ASSIGNMENT_FLAG);
}
void cl_sub_count_add(void *usr_cl)
{
struct client_usr *cl_usr = (struct client_usr*) usr_cl;
if(is_connected(cl_usr)) {
cl_usr->n_sub++;
USR_INFO("%s has added a new sub, now total is %u\n\r",
cl_usr->client_id, cl_usr->n_sub);
}
return;
}
void cl_sub_count_del(void *usr_cl)
{
struct client_usr *cl_usr = (struct client_usr*) usr_cl;
if(is_connected(cl_usr)) {
cl_usr->n_sub--;
USR_INFO("%s has deleted a sub, now total is %u\n\r",
cl_usr->client_id, cl_usr->n_sub);
}
return;
}
/*----------------------------------------------------------------------------
* QoS2 PUB RX Message handling mechanism and associated house-keeping
*--------------------------------------------------------------------------*/
static inline bool qos2_pub_rx_logup(struct client_usr *cl_usr, uint16_t msg_id)
{
return qos2_pub_cq_logup(&cl_usr->qos2_rx_cq, msg_id);
}
static inline bool ack2_msg_id_logup(struct client_usr *cl_usr, uint16_t msg_id)
{
return qos2_pub_cq_logup(&cl_usr->qos2_tx_cq, msg_id);
}
static inline bool qos2_pub_rx_unlog(struct client_usr *cl_usr, uint16_t msg_id)
{
return qos2_pub_cq_unlog(&cl_usr->qos2_rx_cq, msg_id);
}
static inline bool ack2_msg_id_unlog(struct client_usr *cl_usr, uint16_t msg_id)
{
return qos2_pub_cq_unlog(&cl_usr->qos2_tx_cq, msg_id);
}
static inline bool qos2_pub_rx_is_done(struct client_usr *cl_usr, uint16_t msg_id)
{
return qos2_pub_cq_check(&cl_usr->qos2_rx_cq, msg_id);
}
bool cl_mgmt_qos2_pub_rx_update(void *usr_cl, uint16_t msg_id)
{
struct client_usr *cl_usr = (struct client_usr*) usr_cl;
return cl_usr && (qos2_pub_rx_is_done(cl_usr, msg_id) ||
qos2_pub_rx_logup(cl_usr, msg_id));
}
static void ack2_msg_id_dispatch(struct client_usr *cl_usr)
{
struct pub_qos2_cq *tx_cq = &cl_usr->qos2_tx_cq;
uint8_t rd_idx = tx_cq->rd_idx;
uint8_t n_free = tx_cq->n_free;
uint8_t i = 0;
for(i = rd_idx; i < (MAX_PUBREL_INFLT - n_free); i++) {
if(mqtt_vh_msg_send(cl_usr->ctx, MQTT_PUBREL, MQTT_QOS1,
true, tx_cq->id_vec[i]) <= 0)
break;
}
return;
}
static void ack2_msg_id_purge(struct client_usr *cl_usr)
{
qos2_pub_cq_reset(&cl_usr->qos2_tx_cq);
qos2_pub_cq_reset(&cl_usr->qos2_rx_cq);
}
/*
*/
static struct mqtt_ack_wlist wl_qos_ack1 = {NULL, NULL};
static struct mqtt_ack_wlist *qos_ack1_wl = &wl_qos_ack1;
/*
*/
static struct mqtt_ack_wlist wl_mqp_sess = {NULL, NULL};
static struct mqtt_ack_wlist *sess_mqp_wl = &wl_mqp_sess;
#define MQP_CL_MAP_GET(mqp) (mqp->private_)
#define MQP_CL_MAP_SET(mqp, cl_map) (mqp->private_ |= cl_map)
#define MQP_CL_MAP_CLR(mqp, cl_map) (mqp->private_ &= ~cl_map)
#define CL_BMAP(cl_usr) (1 << cl_usr->index)
static inline
int32_t _pub_dispatch(struct client_usr *cl_usr, struct mqtt_packet *mqp,
bool dup)
{
/* Error, if any, is handled in 'cl_net_close()' ....*/
return mqtt_server_pub_dispatch(cl_usr->ctx, mqp, dup);
}
static void ack1_wl_mqp_dispatch(struct client_usr *cl_usr)
{
struct mqtt_packet *mqp = NULL;
for(mqp = qos_ack1_wl->head; NULL != mqp; mqp = mqp->next)
if(MQP_CL_MAP_GET(mqp) & CL_BMAP(cl_usr))
_pub_dispatch(cl_usr, mqp, true);
}
#if 0
static struct client_usr *find_cl_usr(uint32_t index)
{
struct client_usr *cl_usr = users + 0;
int32_t i = 0;
for(i = 0; i < MAX_CLIENTS; i++, cl_usr++) {
if(index == cl_usr->index)
break;
}
return (MAX_CLIENTS == i)? NULL : cl_usr;
}
#endif
#define IS_CL_USR_FREE(cl_usr) (( 0 == cl_usr->flags) && \
('\0' == cl_usr->client_id[0]))
#define IS_CL_INACTIVE(cl_usr) (( 0 == cl_usr->flags) && \
('\0' != cl_usr->client_id[0]))
#define NEED_TO_WAIT_LIST_PUBLISH(qos, cl_usr) \
((((MQTT_QOS1 == qos) && has_clean_session(cl_usr)) || \
((MQTT_QOS0 == qos)))? \
false : true)
static inline uint32_t _cl_bmap_get(void *usr_cl)
{
struct client_usr *cl_usr = (struct client_usr*) usr_cl;
return IS_CL_USR_FREE(cl_usr)? 0 : CL_BMAP(cl_usr);
}
uint32_t cl_bmap_get(void *usr_cl)
{
return usr_cl? _cl_bmap_get(usr_cl) : 0;
}
void *cl_app_hndl_get(void *usr_cl)
{
struct client_usr *cl_usr = (struct client_usr*) usr_cl;
return (cl_usr && is_connected((client_usr*) usr_cl))? cl_usr->app : NULL;
}
void *cl_will_hndl_get(void *usr_cl)
{
struct client_usr *cl_usr = (struct client_usr*) usr_cl;
return (cl_usr && is_connected((client_usr*)usr_cl))? cl_usr->will : NULL;
}
static void pub_dispatch(uint32_t cl_map, struct mqtt_packet *mqp)
{
//uint32_t n_bits = sizeof(uint32_t) << 3; /* Multiply by 8 */
uint32_t sp_map = 0; /* client Map for sessions present */
enum mqtt_qos qos = ENUM_QOS(mqp->fh_byte1);/* QOS */
const uint32_t cl_ref = cl_map; /* Keep ref to original */
int32_t i = 0;
for(i = 0; i < MAX_CLIENTS; i++) {
if(cl_map & (1 << i)) {
struct client_usr *cl_usr = users + i; //find_cl_usr(i);
if(is_connected(cl_usr))
if((_pub_dispatch(cl_usr, mqp, false) > 0) &&
NEED_TO_WAIT_LIST_PUBLISH(qos, cl_usr))
continue;/* Processing done; next CL */
/* CL: unconnected / PUB Err / QOS1 PKT (clean sess) */
cl_map &= ~(1 << i);
if(IS_CL_INACTIVE(cl_usr))
sp_map |= (1 << i); /* CL: Maintain session */
}
}
if(sp_map) {
struct mqtt_packet *cpy = mqp_server_copy(mqp); /* Make copy */
if(cpy) {
MQP_CL_MAP_SET(cpy, sp_map);
mqp_ack_wlist_append(sess_mqp_wl, cpy);
} else
sp_map = 0;
}
if(cl_map) { /* Wait List Publish */
MQP_CL_MAP_SET(mqp, cl_map);
mqp_ack_wlist_append(qos_ack1_wl, mqp);
} else
mqp_free(mqp); /* PUB MQP now has no more use; must be freed */
USR_INFO("PUBLISH: CL Map(0x%08x): For Ack 0x%08x, Inactive 0x%08x\n\r",
cl_ref, cl_map, sp_map);
return;
}
void cl_pub_dispatch(uint32_t cl_map, struct mqtt_packet *mqp)
{
pub_dispatch(cl_map, mqp);
return;
}
/* Move to mqtt_common.h and remove it from mqtt_client.h */
static inline int32_t len_err_free_mqp(struct mqtt_packet *mqp)
{
mqp_free(mqp);
return MQP_ERR_PKT_LEN;
}
/* Move this to a common file */
int32_t cl_pub_msg_send(void *usr_cl,
const struct utf8_string *topic, const uint8_t *data_buf,
uint32_t data_len, enum mqtt_qos qos, bool retain)
{
struct mqtt_packet *mqp = NULL;
if((NULL == topic) ||
((data_len > 0) && (NULL == data_buf)) ||
(NULL == usr_cl))
return MQP_ERR_FNPARAM;
mqp = mqp_server_alloc(MQTT_PUBLISH, 2 + topic->length + 2 + data_len);
if(NULL == mqp)
return MQP_ERR_PKT_AVL;
if((0 > mqp_pub_append_topic(mqp, topic, qos? mqp_new_id_server(): 0)) ||
(data_len && (0 > mqp_pub_append_data(mqp, data_buf, data_len))))
return len_err_free_mqp(mqp);
mqp_prep_fh(mqp, MAKE_FH_FLAGS(false, qos, retain));
pub_dispatch(_cl_bmap_get(usr_cl), mqp);
return MQP_CONTENT_LEN(mqp);
}
static void wl_remove(struct mqtt_ack_wlist *list,
struct mqtt_packet *prev,
struct mqtt_packet *elem)
{
if(prev)
prev->next = elem->next;
else
list->head = elem->next;
if(NULL == list->head)
list->tail = NULL;
if(list->tail == elem)
list->tail = prev;
return;
}
static void sess_wl_mqp_dispatch(struct client_usr *cl_usr)
{
struct mqtt_packet *mqp = NULL, *prev = NULL, *next = NULL;
uint32_t cl_map = CL_BMAP(cl_usr);
for(mqp = sess_mqp_wl->head; NULL != mqp; prev = mqp, mqp = next) {
struct mqtt_packet *cpy = NULL;
if(0 == (MQP_CL_MAP_GET(mqp) & cl_map))
continue; /* MQP & CL: no association */
/* MQP has associated client(s) - process it */
/* Dissociate this client from MQP */
MQP_CL_MAP_CLR(mqp, cl_map);
next = mqp->next; /* House keeping */
if(0 == MQP_CL_MAP_GET(mqp)) {
/* MQP w/ no clients, remove from WL */
wl_remove(sess_mqp_wl, prev, mqp);
/* Ensure correct arrangement for WL */
cpy = mqp; mqp = prev;
} else {
/* MQP is associated w/ other clients */
cpy = mqp_server_copy(mqp);
if(NULL == cpy)
continue;
}
/* Got packet from session, dispatch it to CL */
pub_dispatch(cl_map, cpy);
}
return;
}
static
struct mqtt_packet *wl_mqp_unmap_find(struct mqtt_ack_wlist *wl,
struct client_usr *cl_usr, uint16_t msg_id,
struct mqtt_packet **prev)
{
struct mqtt_packet *mqp = NULL;
*prev = NULL;
for(mqp = wl->head; NULL != mqp; *prev = mqp, mqp = mqp->next) {
if(msg_id == mqp->msg_id) {
/* Found a MQP whose msg_id matches with input */
MQP_CL_MAP_CLR(mqp, CL_BMAP(cl_usr));
return mqp;
}
}
return NULL;
}
static bool wl_rmfree_try(struct mqtt_ack_wlist *wl, struct mqtt_packet *prev,
struct mqtt_packet *mqp)
{
if(0 == MQP_CL_MAP_GET(mqp)) {
wl_remove(wl, prev, mqp);
mqp_free(mqp);
return true;
}
return false;
}
static bool ack1_unmap_rmfree_try(struct client_usr *cl_usr, uint16_t msg_id)
{
struct mqtt_packet *prev = NULL, *mqp = NULL;
if(false == has_clean_session(cl_usr)) {
mqp = wl_mqp_unmap_find(qos_ack1_wl, cl_usr, msg_id, &prev);
if(mqp)
wl_rmfree_try(qos_ack1_wl, prev, mqp);
}
return true;
}
static
void wl_purge(struct mqtt_ack_wlist *wl, struct client_usr *cl_usr)
{
struct mqtt_packet *mqp = NULL, *prev = NULL, *next = NULL;
uint32_t bmap = CL_BMAP(cl_usr);
for(mqp = wl->head; NULL != mqp; prev = mqp, mqp = next) {
next = mqp->next;
/* Ideally, check should be done to see if cl_usr and mqp are
associated. If yes, then the bit should be cleared. At the
moment, blind clearing of the bit has been implemented and
it has no side effects.
*/
MQP_CL_MAP_CLR(mqp, bmap);
/* MQP with no clients has no more use, so try deleting MQP */
if(wl_rmfree_try(wl, prev, mqp))
mqp = prev; /* MQP deleted, prep for next iteration */
}
return;
}
static void ack1_wl_mqp_purge(struct client_usr *cl_usr)
{
wl_purge(qos_ack1_wl, cl_usr);
}
static void sess_wl_mqp_purge(struct client_usr *cl_usr)
{
wl_purge(sess_mqp_wl, cl_usr);
}
static void session_resume(struct client_usr *cl_usr)
{
ack1_wl_mqp_dispatch(cl_usr);
ack2_msg_id_dispatch(cl_usr);
sess_wl_mqp_dispatch(cl_usr);
}
static void session_delete(struct client_usr *cl_usr)
{
ack1_wl_mqp_purge(cl_usr);
ack2_msg_id_purge(cl_usr);
sess_wl_mqp_purge(cl_usr);
}
static bool has_session_data(struct client_usr *cl_usr)
{
uint32_t map = CL_BMAP(cl_usr);
struct mqtt_packet *elem;
if(cl_usr->n_sub)
return true;
elem = qos_ack1_wl->head;
while(elem) {
if(MQP_CL_MAP_GET(elem) & map)
return true;
elem = elem->next;
}
elem = sess_mqp_wl->head;
while(elem) {
if(MQP_CL_MAP_GET(elem) & map)
return true;
elem = elem->next;
}
return false;
}
bool cl_can_session_delete(void *usr_cl)
{
struct client_usr *cl_usr = (struct client_usr*) usr_cl;
return cl_usr? (has_clean_session(cl_usr) ||
!has_session_data(cl_usr)) : false;
}
void cl_on_net_close(void *usr_cl)
{
struct client_usr *cl_usr = (struct client_usr*) usr_cl;
if(is_assigned(cl_usr)) {
if(false == has_session_data(cl_usr))
cl_usr_free(cl_usr);
} else if(has_clean_session(cl_usr)) {
session_delete(cl_usr);
cl_usr_free(cl_usr);
} else {
set_connect_state(cl_usr, false);
cl_usr->ctx = NULL;
cl_usr->app = NULL;
}
return;
}
static bool proc_pub_rel_rx(struct client_usr *cl_usr, uint16_t msg_id)
{
mqtt_vh_msg_send(cl_usr->ctx, MQTT_PUBCOMP,
MQTT_QOS0, true, msg_id);
if(qos2_pub_rx_is_done(cl_usr, msg_id))
qos2_pub_rx_unlog(cl_usr, msg_id);
return true;
}
static bool proc_pub_rec_rx(struct client_usr *cl_usr, uint16_t msg_id)
{
struct mqtt_packet *prev = NULL, *mqp = NULL;
mqp = wl_mqp_unmap_find(qos_ack1_wl, cl_usr, msg_id, &prev);
if(mqp && ack2_msg_id_logup(cl_usr, msg_id)) {
wl_rmfree_try(qos_ack1_wl, prev, mqp);
mqtt_vh_msg_send(cl_usr->ctx, MQTT_PUBREL,
MQTT_QOS1, true, msg_id);
return true;
}
return false;
}
bool cl_notify_ack(void *usr_cl, uint8_t msg_type, uint16_t msg_id)
{
struct client_usr *cl_usr = (struct client_usr*) usr_cl;
bool rv = false;
if(NULL == cl_usr)
return rv;
switch(msg_type) {
case MQTT_PUBACK:
rv = ack1_unmap_rmfree_try(cl_usr, msg_id);
break;
case MQTT_PUBREC:
rv = proc_pub_rec_rx(cl_usr, msg_id);
break;
case MQTT_PUBREL:
rv = proc_pub_rel_rx(cl_usr, msg_id);
break;
case MQTT_PUBCOMP:
rv = ack2_msg_id_unlog(cl_usr, msg_id);
break;
default:
break;
}
return rv;
}
static void assign_cl_index_as_id(struct client_usr *cl_usr)
{
/* TBD: need a better implementation */
char *client_id = cl_usr->client_id;
client_id[0] = 'S';
client_id[1] = 'e';
client_id[2] = 'l';
client_id[3] = 'f';
client_id[4] = '-';
client_id[5] = ((cl_usr->index & 0xf0) >> 4) + 0x30;
client_id[6] = ((cl_usr->index & 0x0f)) + 0x30;
client_id[7] = '\0';
/* Make sure that above array size is with in MAX_CLIENT_ID_LEN */
return;
}
static struct client_usr *assign_cl_usr(char *client_id, uint8_t *vh_buf)
{
struct client_usr *cl_usr, *fr_usr = NULL;
int32_t idx = 0;
for(idx = 0; idx < MAX_CLIENTS; idx++) {
cl_usr = users + idx;
if((NULL == fr_usr) && IS_CL_USR_FREE(cl_usr))
fr_usr = cl_usr; /* Note 1st free cl_usr */
if((NULL == client_id) && (NULL != fr_usr)) {
/* Free cl_usr is present to create CL-ID */
break;
} else if((NULL != client_id) &&
(0 == strncmp(cl_usr->client_id, client_id,
MAX_CLIENT_ID_LEN))) {
/* Found a client obj with exact ID match */
if(is_connected(cl_usr)) {
/* Error: CL-ID is already active */
vh_buf[1] = CONNACK_RC_CLI_REJECT;
cl_usr = NULL;
}
break;
}
}
if(idx == MAX_CLIENTS) { /* Did not find a match */
cl_usr = fr_usr;
if(NULL == cl_usr)
/* Server Resource unavailable */
vh_buf[1] = CONNACK_RC_SVR_UNAVBL;
}
if(NULL != cl_usr) {
if(NULL == client_id)
assign_cl_index_as_id(cl_usr); /* Get ID */
else if(IS_CL_USR_FREE(cl_usr))
strncpy(cl_usr->client_id, client_id,
MAX_CLIENT_ID_LEN);
set_assigned(cl_usr, true);
}
USR_INFO("Assignment of cl_usr %s (detail: index %d and name %s) \n\r",
cl_usr? "Succeeded" : "Failed", cl_usr? (int32_t)cl_usr->index : -1,
cl_usr? cl_usr->client_id : "");
return cl_usr;
}
uint16_t cl_connect_rx(void *ctx_cl, bool clean_session, char *client_id,
void *app_cl, void *will, void **usr_cl)
{
uint8_t vh_buf[] = {0x00, CONNACK_RC_REQ_ACCEPT};
struct client_usr *cl_usr;
if(client_id && ('\0' == client_id[0]))
/* Shouldn't happen: CLIENT ID with a NUL only string */
return CONNACK_RC_CLI_REJECT;
cl_usr = assign_cl_usr(client_id, vh_buf);
if(NULL == cl_usr)
return vh_buf[1]; /* Use vh_buf from assign_cl_usr() */
if((false == clean_session) && has_session_data(cl_usr))
vh_buf[0] = 0x01; /* Set Session Present Flag */
#if 0
if(0x00 == vh_buf[1])
*usr_cl = (void*)cl_usr;
#endif
*usr_cl = (void*)cl_usr;
cl_usr->ctx = ctx_cl;
cl_usr->app = app_cl;
cl_usr->will = will;
return ((vh_buf[0] << 8) | vh_buf[1]);
}
void cl_on_connack_send(void *usr_cl, bool clean_session)
{
struct client_usr *cl_usr = (struct client_usr*) usr_cl;
if(false == is_assigned(cl_usr)) {
USR_INFO("Fatal: CONNACK for unassigned cl_usr Id %u, abort\n\r",
cl_usr->index);
return;
}
set_assigned(cl_usr, false);
set_clean_session(cl_usr, clean_session);
set_connect_state(cl_usr, true);
if(clean_session) {
session_delete(cl_usr);
} else {
session_resume(cl_usr);
}
DBG_INFO("Server is now connected to client %s\n\r", cl_usr->client_id);
return;
}
int32_t cl_mgmt_init(void)
{
cl_usr_init();
return 0;
}
}//namespace mbed_mqtt