Part of TI's mqtt
Dependents: mqtt_V1 cc3100_Test_mqtt_CM3
Diff: server_pkts.cpp
- Revision:
- 0:547251f42a60
diff -r 000000000000 -r 547251f42a60 server_pkts.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/server_pkts.cpp Sat Jun 06 13:29:08 2015 +0000 @@ -0,0 +1,1000 @@ +/****************************************************************************** +* +* Copyright (C) 2014 Texas Instruments Incorporated +* +* All rights reserved. Property of Texas Instruments Incorporated. +* Restricted rights to use, duplicate or disclose this code are +* granted through contract. +* +* The program may not be used without the written permission of +* Texas Instruments Incorporated or against the terms and conditions +* stipulated in the agreement under which this program has been supplied, +* and under no circumstances can it be used with non-TI connectivity device. +* +******************************************************************************/ + +#include "server_pkts.h" + +namespace mbed_mqtt { + +/*----------------------------------------------------------------------------- + * Note: Do not create additional dependency of this file on any header other + * than server_pkts.h. Specifically, server_pkts.[hc] in conjunction with the + * mqtt_common.[hc] files must be facilitated to create a stand-alone library. + *----------------------------------------------------------------------------- + */ + +static void *mutex = NULL; +static void (*mutex_lockin)(void*) = NULL; +static void (*mutex_unlock)(void*) = NULL; + +#define MUTEX_LOCKIN() if(mutex_lockin) mutex_lockin(mutex); +#define MUTEX_UNLOCK() if(mutex_unlock) mutex_unlock(mutex); + +static bool aux_dbg_enbl = false; +static int32_t (*debug_printf)(const char *fmt, ...) = NULL; + +#define USR_INFO debug_printf +#define DBG_INFO(I, ...) if(aux_dbg_enbl) debug_printf(I, ##__VA_ARGS__) + +static int32_t mqp_buf_rd_utf8(const uint8_t *buf, const uint8_t *end, + struct utf8_string *utf8) +{ + const uint8_t *ref = buf; /* Reference */ + uint16_t len = 0; /* UTF8 Size */ + + if(end - buf < 2) + return -1; /* No valid buffer to hold UTF8 size */ + + buf += buf_rd_nbo_2B(buf, &len); + if(end - buf < len) + return -1; /* No valid buffer to hold UTF8 name */ + + utf8->length = len; + utf8->buffer = len? (char*)buf : NULL; + + return buf + len - ref; +} + +static struct mqtt_server_msg_cbs usr_obj, *usr_cbs = NULL; +static const struct device_net_services *net_ops = NULL; + +#ifndef CFG_SR_MQTT_CTXS +#define MAX_NWCONN 6 +#else +#define MAX_NWCONN CFG_SR_MQTT_CTXS +#endif + +static struct client_ctx contexts[MAX_NWCONN]; + +static struct client_ctx *used_ctxs = NULL; +static struct client_ctx *free_ctxs = NULL; + + +#define NETWORK_CLOSE_FLAG 0x00200000 +#define NW_CONN_ERROR_FLAG 0x00400000 +#define RCVD_CONN_MSG_FLAG 0x00800000 + +#define NEED_NET_CLOSE(cl_ctx) (cl_ctx->flags & NETWORK_CLOSE_FLAG) + +static void cl_ctx_init(void) +{ + int32_t i = 0; + for(i = 0; i < MAX_NWCONN; i++) { + struct client_ctx *cl_ctx = contexts + i; + + cl_ctx_reset(cl_ctx); + + cl_ctx->next = free_ctxs; + free_ctxs = cl_ctx; + } +} + +static void cl_ctx_free(struct client_ctx *cl_ctx) +{ + cl_ctx_reset(cl_ctx); + + cl_ctx->next = free_ctxs; + free_ctxs = cl_ctx; + + return; +} + +static struct client_ctx *cl_ctx_alloc(void) +{ + struct client_ctx *cl_ctx = free_ctxs; + if(cl_ctx) { + free_ctxs = cl_ctx->next; + cl_ctx->next = NULL; + } else + USR_INFO("S: fatal, no free cl_ctx\n\r"); + + return cl_ctx; +} + +static inline bool had_rcvd_conn_msg(struct client_ctx *cl_ctx) +{ + return (cl_ctx->flags & RCVD_CONN_MSG_FLAG); +} + +static inline void set_rcvd_conn_msg(struct client_ctx *cl_ctx) +{ + cl_ctx->flags |= RCVD_CONN_MSG_FLAG; +} + +static void used_ctxs_insert(struct client_ctx *cl_ctx) +{ + cl_ctx_timeout_insert(&used_ctxs, cl_ctx); +} + +static void used_ctxs_remove(struct client_ctx *cl_ctx) +{ + cl_ctx_remove(&used_ctxs, cl_ctx); +} + +static int32_t loopb_net = -1; +static const uint8_t LOOP_DATA[] = {0x00, 0x01}; +#define LOOP_DLEN sizeof(LOOP_DATA) +static uint16_t loopback_port = 0; +static bool pending_trigs = false; + +static int32_t loopb_trigger(void) +{ + uint8_t ip_addr[] = {127,0,0,1}; + int32_t rv = 0; + + if((-1 != loopb_net) && (false == pending_trigs)) { + rv = net_ops->send_dest(loopb_net, LOOP_DATA, LOOP_DLEN, + loopback_port, ip_addr, 4); + if(0 == rv) + pending_trigs = true; + } + + return rv; +} + +static void do_net_close_rx(struct client_ctx *cl_ctx, bool due2err) +{ + DBG_INFO("S: RX closing Net %d ...\n\r", (int32_t)cl_ctx->net); + + net_ops->close(cl_ctx->net); + cl_ctx->net = -1; + + if(cl_ctx->usr) + usr_cbs->on_cl_net_close(cl_ctx->usr, due2err); + + used_ctxs_remove(cl_ctx); + cl_ctx_free(cl_ctx); +} + +static void do_net_close_tx(struct client_ctx *cl_ctx, bool due2err) +{ + if(due2err) + cl_ctx->flags |= NW_CONN_ERROR_FLAG; + + cl_ctx->flags |= NETWORK_CLOSE_FLAG; + + loopb_trigger(); +} + +static int32_t cl_ctx_send(struct client_ctx *cl_ctx, uint8_t *buf, uint32_t len) +{ + int32_t rv = net_ops->send(cl_ctx->net, buf, len, NULL); + if(rv <= 0) { + do_net_close_tx(cl_ctx, true); + rv = MQP_ERR_NETWORK; + } + + USR_INFO("S: FH-B1 0x%02x, len %u to net %d: %s\n\r", + *buf, len, cl_ctx->net, rv? "Sent" : "Fail"); + return rv; +} + +static +int32_t vh_msg_send(struct client_ctx *cl_ctx, uint8_t msg_type, enum mqtt_qos qos, + bool has_vh, uint16_t vh_data) +{ + uint8_t buf[4]; + uint32_t len = 2; + + if(false == had_rcvd_conn_msg(cl_ctx)) + return MQP_ERR_NOTCONN; + + buf[0] = MAKE_FH_BYTE1(msg_type, MAKE_FH_FLAGS(false, qos, false)); + buf[1] = has_vh ? 2 : 0; + + if(has_vh) + len += buf_wr_nbo_2B(buf + 2, vh_data); + + return cl_ctx_send(cl_ctx, buf, len); +} + +static +int32_t _mqtt_vh_msg_send(void *ctx_cl, uint8_t msg_type, enum mqtt_qos qos, bool has_vh, + uint16_t vh_data) +{ + struct client_ctx *cl_ctx = (struct client_ctx*) ctx_cl; + + return cl_ctx? vh_msg_send((client_ctx*)ctx_cl, msg_type, qos, + has_vh, vh_data) : -1; +} + +int32_t mqtt_vh_msg_send(void *ctx_cl, uint8_t msg_type, enum mqtt_qos qos, bool has_vh, + uint16_t vh_data) +{ + return _mqtt_vh_msg_send(ctx_cl, msg_type, qos, has_vh, vh_data); +} + +int32_t mqtt_vh_msg_send_locked(void *ctx_cl, uint8_t msg_type, enum mqtt_qos qos, + bool has_vh, uint16_t vh_data) +{ + int32_t rv; + + MUTEX_LOCKIN(); + rv = _mqtt_vh_msg_send(ctx_cl, msg_type, qos, has_vh, vh_data); + MUTEX_UNLOCK(); + + return rv; +} + +int32_t mqtt_connack_send(void *ctx_cl, uint8_t *vh_buf) +{ + struct client_ctx *cl_ctx = (struct client_ctx *) ctx_cl; + + int32_t rv = vh_msg_send(cl_ctx, MQTT_CONNACK, MQTT_QOS0, + true, (vh_buf[0] << 8) | vh_buf[1]); + + if((rv > 0) && (0x00 != vh_buf[1])) + do_net_close_tx(cl_ctx, true); + + return rv; +} + +static +int32_t _mqtt_server_pub_dispatch(void *ctx_cl, struct mqtt_packet *mqp, bool dup) +{ + int32_t rv = 0; + uint8_t *buf = MQP_FHEADER_BUF(mqp); + + if(dup) + *buf |= DUP_FLAG_VAL(true); + + rv = cl_ctx_send((struct client_ctx*)ctx_cl, buf, MQP_CONTENT_LEN(mqp)); + + *buf &= ~DUP_FLAG_VAL(true); + + return rv; +} + +int32_t mqtt_server_pub_dispatch(void *ctx_cl, struct mqtt_packet *mqp, bool dup) +{ + return _mqtt_server_pub_dispatch(ctx_cl, mqp, dup); +} + +int32_t +mqtt_server_pub_dispatch_locked(void *ctx_cl, struct mqtt_packet *mqp, bool dup) +{ + int32_t rv; + + MUTEX_LOCKIN(); + rv = _mqtt_server_pub_dispatch(ctx_cl, mqp, dup); + MUTEX_UNLOCK(); + + return rv; +} + +#define MQP_MAX_TOPICS 16 +#define MQP_SUBACK_PAY_OFS (MAX_FH_LEN + 2) + +static int32_t sub_ack_send(struct client_ctx *cl_ctx, uint8_t *buf, uint8_t pay_ofs, + uint32_t pay_len, uint16_t msg_id) +{ + uint8_t *ref = buf += MAX_FH_LEN; + + if(MQP_SUBACK_PAY_OFS != pay_ofs) + return MQP_ERR_PKT_LEN; + + buf += buf_wr_nbo_2B(buf, msg_id); + ref -= mqp_buf_tail_wr_remlen(ref - MAX_REMLEN_BYTES, + pay_len + buf - ref); + + ref -= 1; + *ref = MAKE_FH_BYTE1(MQTT_SUBACK, + MAKE_FH_FLAGS(false, MQTT_QOS0, false)); + + return cl_ctx_send(cl_ctx, ref, pay_len + buf - ref); +} + +static inline int32_t unsub_ack_send(struct client_ctx *cl_ctx, uint16_t msg_id) +{ + return vh_msg_send(cl_ctx, MQTT_UNSUBACK, MQTT_QOS0, true, msg_id); +} + +/*---------------------------------------------------------------------------- + * Receive Routines + *---------------------------------------------------------------------------- + */ + +/* Candidate to be moved to mqtt_common.c file */ +static bool mqp_proc_vh_msg_id_rx(struct mqtt_packet *mqp_raw) +{ + uint8_t *buf = MQP_VHEADER_BUF(mqp_raw); + + if(mqp_raw->pl_len < 2) + return false; /* Bytes for MSG ID not available */ + + buf += buf_rd_nbo_2B(buf, &mqp_raw->msg_id); + mqp_raw->vh_len += 2; + mqp_raw->pl_len -= 2; + + return true; +} + +#define BRK_IF_RD_ERR_UTF8(buf, end, utf8) \ + if(rd_buf_utf8(buf, end, utf8) < 0) \ + break; + +static int32_t buf_rd_utf8_qos(uint8_t *buf, uint8_t *end, struct utf8_strqos *utf8_qos) +{ + struct utf8_string utf8; + uint8_t *ref = buf; + + buf += mqp_buf_rd_utf8(buf, end, &utf8); + + /* Assess that UTF8 has been read and QOS can be read */ + if((buf > ref) && (end > buf)) { + utf8_qos->buffer = utf8.buffer; + utf8_qos->length = utf8.length; + utf8_qos->qosreq = (enum mqtt_qos)*buf++; + + return buf - ref; + } + + return -1; +} + +static bool _proc_sub_msg_rx(struct mqtt_packet *mqp_raw, + struct utf8_strqos *qos_topics, uint32_t *n_topics) +{ + uint8_t *buf, *end; + uint32_t i = 0; + + if(false == mqp_proc_vh_msg_id_rx(mqp_raw)) + return false; /* Problem in contents received from client */ + + buf = MQP_PAYLOAD_BUF(mqp_raw); + end = buf + mqp_raw->pl_len; + + for(i = 0; (i < *n_topics) && (buf < end); i++) { + struct utf8_strqos *qos_top = qos_topics + i; + int32_t len = buf_rd_utf8_qos(buf, end, qos_top); + if(len < 0) + break; /* Failed to read Topic */ + + buf += len; + } + + *n_topics = i; + + return ((0 == i) || (buf != end))? false : true; +} + +static +bool proc_sub_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) +{ + uint32_t n_topics = MQP_MAX_TOPICS; + uint16_t msg_id; + + struct utf8_strqos qos_topics[MQP_MAX_TOPICS]; + uint8_t ack[MQP_MAX_TOPICS + MQP_SUBACK_PAY_OFS]; + + if(false == _proc_sub_msg_rx(mqp_raw, qos_topics, &n_topics)) + return false; + + msg_id = mqp_raw->msg_id; + + /* All topics have been now put in array, pass-on info to upper layer */ + if(usr_cbs->sub_msg_rx(cl_ctx->usr, qos_topics, n_topics, + msg_id, ack + MQP_SUBACK_PAY_OFS)) { + + sub_ack_send(cl_ctx, ack, MQP_SUBACK_PAY_OFS, n_topics, msg_id); + + return true; + } + + return false; +} + + +static bool _proc_unsub_msg_rx(struct mqtt_packet *mqp_raw, + struct utf8_string *topics, uint32_t *n_topics) +{ + uint8_t *buf, *end; + uint32_t i = 0; + + if(false == mqp_proc_vh_msg_id_rx(mqp_raw)) + return false; /* Problem in contents received from client */ + + buf = MQP_PAYLOAD_BUF(mqp_raw); + end = buf + mqp_raw->pl_len; + + for(i = 0; (i < *n_topics) && (buf < end); i++) { + struct utf8_string *topic = topics + i; + int32_t len = mqp_buf_rd_utf8(buf, end, topic); + if(len < 0) + break; /* Failed to read Topic */ + + buf += len; + } + + *n_topics = i; + + return ((0 == i) || (buf != end))? false : true; +} + +static +bool proc_unsub_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) +{ + uint32_t n_topics = MQP_MAX_TOPICS; + uint16_t msg_id; + + struct utf8_string topics[MQP_MAX_TOPICS]; + + if(false == _proc_unsub_msg_rx(mqp_raw, topics, &n_topics)) + return false; + + msg_id = mqp_raw->msg_id; + + /* All topics have been now put in array, pass-on info to upper layer */ + if(usr_cbs->un_sub_msg(cl_ctx->usr, topics, n_topics, msg_id)) { + unsub_ack_send(cl_ctx, msg_id); + return true; + } + + return false; +} + +static bool proc_pingreq_rx(struct client_ctx *cl_ctx) +{ + vh_msg_send(cl_ctx, MQTT_PINGRSP, MQTT_QOS0, false, 0x00); + return true; +} + +static bool proc_disconn_rx(struct client_ctx *cl_ctx) +{ + do_net_close_rx(cl_ctx, false); + return true; +} + +static +bool proc_pub_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) +{ + bool rv = mqp_proc_pub_rx(mqp_raw); + uint8_t B = mqp_raw->fh_byte1; + enum mqtt_qos qos = ENUM_QOS(B); + struct utf8_string topic; + uint16_t msg_id = 0; + + if(false == rv) + return rv; /* Didn't get a good PUB Packet */ + + msg_id = mqp_raw->msg_id; + + topic.buffer = (char*)MQP_PUB_TOP_BUF(mqp_raw); + topic.length = MQP_PUB_TOP_LEN(mqp_raw); + + rv = usr_cbs->pub_msg_rx(cl_ctx->usr, &topic, + MQP_PUB_PAY_BUF(mqp_raw), + MQP_PUB_PAY_LEN(mqp_raw), + msg_id, BOOL_DUP(B), qos, + BOOL_RETAIN(B)); + if(false == rv) + return rv; + + if(MQTT_QOS1 == qos) + vh_msg_send(cl_ctx, MQTT_PUBACK, MQTT_QOS0, true, msg_id); + + if(MQTT_QOS2 == qos) + vh_msg_send(cl_ctx, MQTT_PUBREC, MQTT_QOS0, true, msg_id); + + return rv; +} + +static +bool proc_ack_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) +{ + if((false == mqp_proc_msg_id_ack_rx(mqp_raw, false)) || + (false == usr_cbs->ack_notify(cl_ctx->usr, + mqp_raw->msg_type, + mqp_raw->msg_id))) + return false; + + return true; +} + +#define IO_MON_NO_TIMEOUT (0xFFFFFFFF) // TBD +//#define KA_TIMEOUT_NONE (0xFFFFFFFF) + +static void rx_timeout_update(struct client_ctx *cl_ctx) +{ + if(false == had_rcvd_conn_msg(cl_ctx)) + return; + + cl_ctx_timeout_update(cl_ctx, net_ops->time()); + + used_ctxs_remove(cl_ctx); + used_ctxs_insert(cl_ctx); + + return; +} + +static bool proc_protocol_info(struct utf8_string *utf8, uint8_t ver) +{ + const char *buf = utf8->buffer; + + /* Check for protocol version 3.1.1 */ + if((4 == utf8->length) && + (buf[0] == 'M') && + (buf[1] == 'Q') && + (buf[2] == 'T') && + (buf[3] == 'T') && + (0x04 == ver)) + return true; + + /* Check for protocol version 3.1 */ + if((6 == utf8->length) && + (buf[0] == 'M') && + (buf[1] == 'Q') && + (buf[2] == 'I') && + (buf[3] == 's') && + (buf[4] == 'd') && + (buf[5] == 'p') && + (0x03 == ver)) + return true; + + return false; +} + +static uint16_t proc_connect_vh_rx(struct mqtt_packet *mqp_raw, + uint8_t *conn_flags, uint16_t *ka_secs) +{ + struct utf8_string utf8; + uint8_t *buf = MQP_PAYLOAD_BUF(mqp_raw); + uint8_t *end = buf + mqp_raw->pl_len; + uint8_t *ref = buf; + + buf += mqp_buf_rd_utf8(buf, end, &utf8); + if(end - buf < 1) + return CONNACK_RC_BAD_PROTOV; /* No proto ver */ + + if(false == proc_protocol_info(&utf8, *buf++)) + return CONNACK_RC_BAD_PROTOV; + + *conn_flags = *buf++; + + if(end - buf < 2) + return 0xFF; /* Bad packet composition */ + + *ka_secs = (buf[0] << 8) | buf[1]; + buf += 2; + *ka_secs += *ka_secs >> 1; + + mqp_raw->vh_len = buf - ref; + mqp_raw->pl_len -= buf - ref; + + return 0; +} + +#define RET_IF_RD_CONN_ERROR(buf, end, utf8) \ + { \ + int32_t len = mqp_buf_rd_utf8(buf, end, utf8); \ + if(len < 0) \ + return 0x00FF; \ + \ + buf += len; \ + } + +uint16_t proc_connect_pl_rx(const uint8_t *buf, const uint8_t *end, uint8_t conn_flags, + struct utf8_string *free_utf8s, + struct utf8_string **used_refs) +{ + struct utf8_string *utf8; + + utf8 = used_refs[0] = free_utf8s + 0; + RET_IF_RD_CONN_ERROR(buf, end, utf8); + + if(conn_flags & WILL_CONFIG_VAL) { + utf8 = used_refs[1] = free_utf8s + 1; + RET_IF_RD_CONN_ERROR(buf, end, utf8); + + utf8 = used_refs[2] = free_utf8s + 2; + RET_IF_RD_CONN_ERROR(buf, end, utf8); + } + + if((0 == (conn_flags & USER_NAME_OPVAL)) && + (0 != (conn_flags & PASS_WORD_OPVAL))) + return 0x00FF; /* Bad combination */ + + if(conn_flags & USER_NAME_OPVAL) { + utf8 = used_refs[3] = free_utf8s + 3; + RET_IF_RD_CONN_ERROR(buf, end, utf8); + } + + if(conn_flags & PASS_WORD_OPVAL) { + utf8 = used_refs[4] = free_utf8s + 4; + RET_IF_RD_CONN_ERROR(buf, end, utf8); + } + + return 0; +} + +static +bool proc_connect_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) +{ + struct utf8_string *used_refs[5] = {NULL, NULL, NULL, NULL, NULL}; + struct utf8_string free_utf8s[5]; + uint8_t conn_flags, *buf, *end; + bool clean_session = false; + uint16_t ack_vh16 = 0; /* Variable Header of CONNACK (response) Message */ + + set_rcvd_conn_msg(cl_ctx); + + ack_vh16 = proc_connect_vh_rx(mqp_raw, &conn_flags, &cl_ctx->ka_secs); + if(ack_vh16) + goto proc_connect_rx_exit1; + + buf = MQP_PAYLOAD_BUF(mqp_raw); + end = buf + mqp_raw->pl_len; + ack_vh16 = proc_connect_pl_rx(buf, end, conn_flags, + free_utf8s, used_refs); + if(ack_vh16) + goto proc_connect_rx_exit1; + + clean_session = (conn_flags & CLEAN_START_VAL)? true : false; + ack_vh16 = (!used_refs[0]->length && !clean_session)? + CONNACK_RC_CLI_REJECT : 0; /* Validate 0 byte Client ID */ + if(ack_vh16) + goto proc_connect_rx_exit1; + + /* UTF8 Order: Client ID, Will Topic, Will Msg, User Name, Pass Word */ + ack_vh16 = usr_cbs->connect_rx(cl_ctx, conn_flags, &used_refs[0], + &cl_ctx->usr); + proc_connect_rx_exit1: + + DBG_INFO("S: CONNACK RC (16bits) is %u (%s)\n\r", ack_vh16, + ack_vh16 & 0xFF? "error" : "good"); + + if(0xFF != (ack_vh16 & 0xFF)) + vh_msg_send(cl_ctx, MQTT_CONNACK, MQTT_QOS0, true, ack_vh16); + + if(CONNACK_RC_REQ_ACCEPT == (ack_vh16 & 0xFF)) { + rx_timeout_update(cl_ctx); + usr_cbs->on_connack_send(cl_ctx->usr, clean_session); + } else { + return false; + } + + return true; +} + + +static void recv_hvec_load(int32_t *recv_hvec, uint32_t size, struct client_ctx *list) +{ + int32_t i = 0; + + for(i = 0; (i < size) && (NULL != list); i++, list = list->next) + recv_hvec[i] = list->net; + + recv_hvec[i] = -1; + + return; +} + +static bool process_recv(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) +{ + uint8_t msg_type = mqp_raw->msg_type; + bool rv = false; + + USR_INFO("S: Rcvd msg Fix-Hdr (Byte1) 0x%02x from net %d\n\r", + mqp_raw->fh_byte1, cl_ctx->net); + + if((MQTT_CONNECT != msg_type) ^ had_rcvd_conn_msg(cl_ctx)) + goto process_recv_exit1; /* Proto Violation */ + + rx_timeout_update(cl_ctx); + + switch(msg_type) { + + case MQTT_CONNECT: + rv = proc_connect_rx(cl_ctx, mqp_raw); + break; + + case MQTT_DISCONNECT: + rv = proc_disconn_rx(cl_ctx); + break; + + case MQTT_SUBSCRIBE: + rv = proc_sub_msg_rx(cl_ctx, mqp_raw); + break; + + case MQTT_UNSUBSCRIBE: + rv = proc_unsub_msg_rx(cl_ctx, mqp_raw); + break; + + case MQTT_PINGREQ: + rv = proc_pingreq_rx(cl_ctx); + break; + + case MQTT_PUBLISH: + rv = proc_pub_msg_rx(cl_ctx, mqp_raw); + break; + + case MQTT_PUBACK: + case MQTT_PUBREC: + case MQTT_PUBREL: + case MQTT_PUBCOMP: + rv = proc_ack_msg_rx(cl_ctx, mqp_raw); + break; + + default: + break; + } + + process_recv_exit1: + DBG_INFO("S: Processing of MSG ID 0x%02x: %s\n\r", + mqp_raw->msg_id, rv? "Good" : "Fail"); + + return rv; +} + + +/* Terminate net connections which haven't received PKTs beyond expected time. + Caller must ensure atomic enviroment for execution of this routine. +*/ +static void ka_sequence(uint32_t *secs2wait) +{ + struct client_ctx *cl_ctx = used_ctxs; /* Sorted for timeout (ascend) */ + uint32_t now_secs = net_ops->time(); + + while(NULL != cl_ctx) { + struct client_ctx *next = cl_ctx->next; + if(NEED_NET_CLOSE(cl_ctx) || !(cl_ctx->timeout > now_secs)) { + bool due2err = false; + if(cl_ctx->flags & NW_CONN_ERROR_FLAG) + due2err = true; + + cl_ctx->flags &= ~(NW_CONN_ERROR_FLAG | + NETWORK_CLOSE_FLAG); + + /* Close network: Timeout or TX err */ + do_net_close_rx(cl_ctx, due2err); + } + + cl_ctx = next; + } + + cl_ctx = used_ctxs; + if(((NULL != cl_ctx) && (KA_TIMEOUT_NONE == cl_ctx->timeout)) || + ((NULL == cl_ctx))) + *secs2wait = IO_MON_NO_TIMEOUT; + else + *secs2wait = cl_ctx->timeout - now_secs; + + return; +} + +/* Put a new functiona name such as mk_new_ctx() or setup_ctx() and + processing to restrict limit number of connections. + + Return value as well. +*/ +static bool accept_ctx(int32_t net, uint32_t wait_secs) +{ + struct client_ctx *cl_ctx = cl_ctx_alloc(); + if(NULL == cl_ctx) + return false; + + cl_ctx->net = net_ops->accept(net, cl_ctx->remote_ip, + &cl_ctx->ip_length); + if(-1 == cl_ctx->net) { + cl_ctx_free(cl_ctx); + USR_INFO("S: failed to accept new NW connection\n\r"); + return false; + } + + DBG_INFO("Accepted new connection (fd) %d\n\r", (int32_t)cl_ctx->net); + + /* Timeout to receive MQTT_CONNECT */ + cl_ctx->timeout = wait_secs + net_ops->time(); + + used_ctxs_insert(cl_ctx); + return true; +} + +static struct client_ctx *net_cl_ctx_find(int32_t net) +{ + struct client_ctx *cl_ctx = used_ctxs; + while(cl_ctx) { + if(net == cl_ctx->net) + break; + + cl_ctx = cl_ctx->next; + } + + if(NULL == cl_ctx) + USR_INFO("Did not find ctx for net %d\n\r", net); + + return cl_ctx; +} + +static int32_t recv_hvec[MAX_NWCONN + 1 + 1 + 1]; /* LISTEN + LOOPBACK + VEC END */ +static int32_t send_hvec = -1; +static int32_t rsvd_hvec = -1; +static int32_t listen_net = -1; + +static struct mqtt_packet rx_mqp; +static uint8_t rxb[MQP_SERVER_RX_LEN]; +static uint16_t listener_port = 0; + +static inline +int32_t net_recv(int32_t net, struct mqtt_packet *mqp, uint32_t wait_secs, bool *timed_out) +{ + int32_t rv = mqp_recv(net, net_ops, mqp, wait_secs, timed_out, NULL); + if(rv <= 0) + rv = MQP_ERR_NETWORK; + + return rv; +} + +static int32_t proc_loopback_recv(int32_t net) +{ + uint8_t buf[LOOP_DLEN]; + + /* Thanks for waking-up thread and do nothing in this routine */ + int32_t rv = net_ops->recv_from(net, buf, LOOP_DLEN, NULL, NULL, 0); + pending_trigs = false; + + if(rv <= 0) { + net_ops->close(net); + return MQP_ERR_LIBQUIT; + } + + return rv; +} + +static void proc_net_data_recv(int32_t net) +{ + struct client_ctx *cl_ctx = net_cl_ctx_find(net); + bool dummy; + int32_t rv; + + mqp_reset(&rx_mqp); /* Start w/ a clean buffer */ + + rv = net_recv(net, &rx_mqp, 0, &dummy); + if(rv > 0) + /* Working Principle: Only RX processing errors should be + reported as 'false'. Status of TX as a follow-up to RX + messages need not be reported by the xyz_rx() routines. + Error observed in TX is either dealt in next iteration + of RX loop. + */ + if(false == process_recv(cl_ctx, &rx_mqp)) + rv = MQP_ERR_CONTENT; + + if(rv < 0) + do_net_close_rx(cl_ctx, rv); +} + +static bool accept_and_recv_locked(int32_t *recv_hnds, int32_t n_hnds, uint32_t wait_secs) +{ + bool rv = true; + int32_t idx = 0; + + MUTEX_LOCKIN(); + + for(idx = 0; (idx < n_hnds) && (rv == true); idx++) { + int32_t net = recv_hvec[idx]; + if(net == listen_net) { + rv = accept_ctx(listen_net, wait_secs); + } else if(loopback_port && (net == loopb_net)) { + if(proc_loopback_recv(loopb_net) < 0) + rv = false; + } else { + proc_net_data_recv(net); + } + } + + MUTEX_UNLOCK(); + + return rv; +} + +int32_t mqtt_server_run(uint32_t wait_secs) // TBD break into two functions +{ + uint32_t secs2wait = 0; + int32_t n_hnds = 0; + + USR_INFO("S: MQTT Server Run invoked ....\n\r"); + + if(NULL == net_ops) + return MQP_ERR_NET_OPS; + + if(loopback_port) { + loopb_net = net_ops->open(DEV_NETCONN_OPT_UDP, NULL, + loopback_port, NULL); + if(-1 == loopb_net) + return MQP_ERR_LIBQUIT; + } + + listen_net = net_ops->listen(0, listener_port, NULL); + if(-1 == listen_net) + return MQP_ERR_LIBQUIT; + + do { + int32_t *r_hvec = recv_hvec + 0; + + *r_hvec++ = listen_net; + if(loopback_port) + *r_hvec++ = loopb_net; + + /* MQTT Timeouts: close expired conns; get time to next expiry */ + ka_sequence(&secs2wait); + + /* Prepare array of net handles. Must've atleast listen handle */ + // recv_hvec_load(&recv_hvec[2], MAX_NWCONN + 1, used_ctxs); + recv_hvec_load(r_hvec, MAX_NWCONN + 1, used_ctxs); + + n_hnds = net_ops->io_mon(recv_hvec, &send_hvec, + &rsvd_hvec, secs2wait); + if(n_hnds < 0) + return MQP_ERR_LIBQUIT; + + if(false == accept_and_recv_locked(recv_hvec, n_hnds, wait_secs)) + return MQP_ERR_LIBQUIT; + + } while(1); +} + +int32_t mqtt_server_register_net_svc(const struct device_net_services *net) +{ + if(net && net->io_mon && net->close && net->send && + net->recv && net->time && net->listen) { + net_ops = net; + return 0; + } + + return -1; +} + +int32_t mqtt_server_lib_init(const struct mqtt_server_lib_cfg *lib_cfg, + const struct mqtt_server_msg_cbs *msg_cbs) +{ + cl_ctx_init(); + + if((NULL == lib_cfg) || + (0 == lib_cfg->listener_port) || + (NULL == lib_cfg->debug_printf)) + return -1; + + debug_printf = lib_cfg->debug_printf; /* Facilitate debug */ + + loopback_port = lib_cfg->loopback_port; + listener_port = lib_cfg->listener_port; + + mutex = lib_cfg->mutex; + mutex_lockin = lib_cfg->mutex_lockin; + mutex_unlock = lib_cfg->mutex_unlock; + + aux_dbg_enbl = lib_cfg->aux_debug_en; + debug_printf = lib_cfg->debug_printf; + + usr_cbs = &usr_obj; + + memcpy(usr_cbs, msg_cbs, sizeof(struct mqtt_server_msg_cbs)); + + mqp_buffer_attach(&rx_mqp, rxb, MQP_SERVER_RX_LEN, 0); + + return 0; +} + +}//namespace mbed_mqtt