TI's MQTT Demo with FreeRTOS CM4F

Dependencies:   mbed

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers server_pkts.cpp Source File

server_pkts.cpp

00001 /******************************************************************************
00002 *
00003 *   Copyright (C) 2014 Texas Instruments Incorporated
00004 *
00005 *   All rights reserved. Property of Texas Instruments Incorporated.
00006 *   Restricted rights to use, duplicate or disclose this code are
00007 *   granted through contract.
00008 *
00009 *   The program may not be used without the written permission of
00010 *   Texas Instruments Incorporated or against the terms and conditions
00011 *   stipulated in the agreement under which this program has been supplied,
00012 *   and under no circumstances can it be used with non-TI connectivity device.
00013 *
00014 ******************************************************************************/
00015 
00016 #include "server_pkts.h"
00017 
00018 namespace mbed_mqtt {
00019 
00020 /*-----------------------------------------------------------------------------
00021  * Note: Do not create additional dependency of this file on any header other
00022  * than server_pkts.h. Specifically, server_pkts.[hc] in conjunction with the
00023  * mqtt_common.[hc] files must be facilitated to create a stand-alone library.
00024  *-----------------------------------------------------------------------------
00025  */
00026 
/* Optional lock handed over by the application in mqtt_server_lib_init().
 * While the function pointers are NULL, the lock macros do nothing.
 */
static void  *mutex = NULL;
static void (*mutex_lockin)(void*) = NULL;
static void (*mutex_unlock)(void*) = NULL;

/* Each macro is wrapped in do { } while(0) so that it expands to exactly one
 * statement and remains safe inside an un-braced if / else.
 */
#define MUTEX_LOCKIN() do { if(mutex_lockin) mutex_lockin(mutex); } while(0)
#define MUTEX_UNLOCK() do { if(mutex_unlock) mutex_unlock(mutex); } while(0)

static bool aux_dbg_enbl = false;
static int32_t (*debug_printf)(const char *fmt, ...) = NULL;

/* USR_INFO logs unconditionally; DBG_INFO logs only when auxiliary debug has
 * been enabled. Both are silent while no print routine is registered, so a
 * log statement executed before library init cannot call through NULL.
 */
#define USR_INFO(I, ...)                                                \
        do {                                                            \
                if(debug_printf)                                        \
                        debug_printf(I, ##__VA_ARGS__);                 \
        } while(0)

#define DBG_INFO(I, ...)                                                \
        do {                                                            \
                if(aux_dbg_enbl && debug_printf)                        \
                        debug_printf(I, ##__VA_ARGS__);                 \
        } while(0)
00039 
00040 static int32_t mqp_buf_rd_utf8(const uint8_t *buf, const uint8_t *end,
00041                            struct utf8_string *utf8)
00042 {
00043         const uint8_t *ref = buf; /* Reference */
00044         uint16_t len = 0;         /* UTF8 Size */
00045 
00046         if(end - buf < 2)
00047                 return -1; /* No valid buffer to hold UTF8 size */
00048 
00049         buf += buf_rd_nbo_2B(buf, &len);
00050         if(end - buf < len)
00051                 return -1; /* No valid buffer to hold UTF8 name */
00052 
00053         utf8->length = len;
00054         utf8->buffer = len? (char*)buf : NULL;
00055 
00056         return buf + len - ref;
00057 }
00058 
00059 static struct mqtt_server_msg_cbs usr_obj, *usr_cbs = NULL;
00060 static const struct device_net_services *net_ops = NULL;
00061 
00062 #ifndef CFG_SR_MQTT_CTXS
00063 #define MAX_NWCONN 6
00064 #else
00065 #define MAX_NWCONN CFG_SR_MQTT_CTXS
00066 #endif
00067 
00068 static struct client_ctx contexts[MAX_NWCONN];
00069 
00070 static struct client_ctx *used_ctxs = NULL;
00071 static struct client_ctx *free_ctxs = NULL;
00072 
00073 
00074 #define NETWORK_CLOSE_FLAG         0x00200000
00075 #define NW_CONN_ERROR_FLAG         0x00400000
00076 #define RCVD_CONN_MSG_FLAG         0x00800000
00077 
00078 #define NEED_NET_CLOSE(cl_ctx) (cl_ctx->flags & NETWORK_CLOSE_FLAG)
00079 
00080 static void cl_ctx_init(void)
00081 {
00082         int32_t i = 0;
00083         for(i = 0; i < MAX_NWCONN; i++) {
00084                 struct client_ctx *cl_ctx = contexts + i;
00085 
00086                 cl_ctx_reset(cl_ctx);
00087 
00088                 cl_ctx->next = free_ctxs;
00089                 free_ctxs = cl_ctx;
00090         }
00091 }
00092 
00093 static void cl_ctx_free(struct client_ctx *cl_ctx)
00094 {
00095         cl_ctx_reset(cl_ctx);
00096 
00097         cl_ctx->next = free_ctxs;
00098         free_ctxs = cl_ctx;
00099 
00100         return;
00101 }
00102 
00103 static struct client_ctx *cl_ctx_alloc(void)
00104 {
00105         struct client_ctx *cl_ctx = free_ctxs;
00106         if(cl_ctx) {
00107                 free_ctxs = cl_ctx->next;
00108                 cl_ctx->next = NULL;
00109         } else
00110                 USR_INFO("S: fatal, no free cl_ctx\n\r");
00111 
00112         return cl_ctx;
00113 }
00114 
00115 static inline bool had_rcvd_conn_msg(struct client_ctx *cl_ctx)
00116 {
00117         return (cl_ctx->flags & RCVD_CONN_MSG_FLAG);
00118 }
00119 
00120 static inline void set_rcvd_conn_msg(struct client_ctx *cl_ctx)
00121 {
00122         cl_ctx->flags |= RCVD_CONN_MSG_FLAG;
00123 }
00124 
00125 static void used_ctxs_insert(struct client_ctx *cl_ctx)
00126 {
00127         cl_ctx_timeout_insert(&used_ctxs, cl_ctx);
00128 }
00129 
00130 static void used_ctxs_remove(struct client_ctx *cl_ctx)
00131 {
00132         cl_ctx_remove(&used_ctxs, cl_ctx);
00133 }
00134 
00135 static int32_t loopb_net        = -1;
00136 static const uint8_t LOOP_DATA[] = {0x00, 0x01};
00137 #define LOOP_DLEN sizeof(LOOP_DATA)
00138 static uint16_t  loopback_port   = 0;
00139 static bool pending_trigs   = false;
00140 
00141 static int32_t loopb_trigger(void)
00142 {
00143         uint8_t ip_addr[] = {127,0,0,1};
00144         int32_t rv = 0;
00145         
00146         if((-1 != loopb_net) && (false == pending_trigs)) {
00147                 rv = net_ops->send_dest(loopb_net, LOOP_DATA, LOOP_DLEN,
00148                                         loopback_port, ip_addr, 4);
00149                 if(0 == rv)
00150                         pending_trigs = true;
00151         }
00152 
00153         return rv;
00154 }
00155 
00156 static void do_net_close_rx(struct client_ctx *cl_ctx, bool due2err)
00157 {
00158         DBG_INFO("S: RX closing Net %d ...\n\r", (int32_t)cl_ctx->net);
00159 
00160         net_ops->close(cl_ctx->net);
00161         cl_ctx->net = -1;
00162 
00163         if(cl_ctx->usr)
00164                 usr_cbs->on_cl_net_close(cl_ctx->usr, due2err);
00165 
00166         used_ctxs_remove(cl_ctx);
00167         cl_ctx_free(cl_ctx);
00168 }
00169 
00170 static void do_net_close_tx(struct client_ctx *cl_ctx, bool due2err)
00171 {
00172         if(due2err)
00173                 cl_ctx->flags |= NW_CONN_ERROR_FLAG;
00174 
00175         cl_ctx->flags |= NETWORK_CLOSE_FLAG;
00176 
00177         loopb_trigger();
00178 }
00179 
00180 static int32_t cl_ctx_send(struct client_ctx *cl_ctx, uint8_t *buf, uint32_t len)
00181 {
00182         int32_t rv = net_ops->send(cl_ctx->net, buf, len, NULL);
00183         if(rv <= 0) {
00184                 do_net_close_tx(cl_ctx, true);
00185                 rv = MQP_ERR_NETWORK;
00186         }
00187 
00188         USR_INFO("S: FH-B1 0x%02x, len %u to net %d: %s\n\r",
00189                  *buf, len, cl_ctx->net, rv? "Sent" : "Fail");
00190         return rv;
00191 }
00192 
00193 static
00194 int32_t vh_msg_send(struct client_ctx *cl_ctx, uint8_t msg_type, enum mqtt_qos qos,
00195                 bool has_vh, uint16_t vh_data)
00196 {
00197         uint8_t  buf[4];
00198         uint32_t len = 2;
00199 
00200         if(false == had_rcvd_conn_msg(cl_ctx))
00201                 return MQP_ERR_NOTCONN;
00202 
00203         buf[0] = MAKE_FH_BYTE1(msg_type, MAKE_FH_FLAGS(false, qos, false));
00204         buf[1] = has_vh ? 2 : 0;
00205 
00206         if(has_vh)
00207                 len += buf_wr_nbo_2B(buf + 2, vh_data);
00208 
00209         return cl_ctx_send(cl_ctx, buf, len);
00210 }
00211 
00212 static
00213 int32_t _mqtt_vh_msg_send(void *ctx_cl, uint8_t msg_type, enum mqtt_qos qos, bool has_vh,
00214                       uint16_t vh_data)
00215 {
00216         struct client_ctx *cl_ctx = (struct client_ctx*) ctx_cl;
00217 
00218         return cl_ctx? vh_msg_send((client_ctx*)ctx_cl, msg_type, qos,
00219                                    has_vh, vh_data) : -1;
00220 }
00221 
00222 int32_t  mqtt_vh_msg_send(void *ctx_cl, uint8_t msg_type, enum mqtt_qos qos, bool has_vh,
00223                       uint16_t vh_data)
00224 {
00225         return _mqtt_vh_msg_send(ctx_cl, msg_type, qos, has_vh, vh_data);
00226 }
00227 
00228 int32_t mqtt_vh_msg_send_locked(void *ctx_cl, uint8_t msg_type, enum mqtt_qos qos,
00229                             bool has_vh, uint16_t vh_data)
00230 {
00231         int32_t rv;
00232 
00233         MUTEX_LOCKIN();
00234         rv = _mqtt_vh_msg_send(ctx_cl, msg_type, qos, has_vh, vh_data);
00235         MUTEX_UNLOCK();
00236 
00237         return rv;
00238 }
00239 
00240 int32_t mqtt_connack_send(void *ctx_cl, uint8_t *vh_buf)
00241 {
00242         struct client_ctx *cl_ctx = (struct client_ctx *) ctx_cl;
00243 
00244         int32_t rv = vh_msg_send(cl_ctx, MQTT_CONNACK, MQTT_QOS0,
00245                              true, (vh_buf[0] << 8) | vh_buf[1]);
00246 
00247         if((rv > 0) && (0x00 != vh_buf[1]))
00248                 do_net_close_tx(cl_ctx, true);
00249 
00250         return rv;
00251 }
00252 
00253 static
00254 int32_t _mqtt_server_pub_dispatch(void *ctx_cl, struct mqtt_packet *mqp, bool dup)
00255 {
00256         int32_t rv = 0;
00257         uint8_t *buf = MQP_FHEADER_BUF(mqp);
00258 
00259         if(dup)
00260                 *buf |= DUP_FLAG_VAL(true);
00261 
00262         rv = cl_ctx_send((struct client_ctx*)ctx_cl, buf, MQP_CONTENT_LEN(mqp));
00263 
00264         *buf &= ~DUP_FLAG_VAL(true);
00265 
00266         return rv;
00267 }
00268 
00269 int32_t mqtt_server_pub_dispatch(void *ctx_cl, struct mqtt_packet *mqp, bool dup)
00270 {
00271         return _mqtt_server_pub_dispatch(ctx_cl, mqp, dup);
00272 }
00273 
00274 int32_t 
00275 mqtt_server_pub_dispatch_locked(void *ctx_cl, struct mqtt_packet *mqp, bool dup)
00276 {
00277         int32_t rv;
00278 
00279         MUTEX_LOCKIN();
00280         rv = _mqtt_server_pub_dispatch(ctx_cl, mqp, dup);
00281         MUTEX_UNLOCK();
00282 
00283         return rv;
00284 }
00285 
00286 #define MQP_MAX_TOPICS      16
00287 #define MQP_SUBACK_PAY_OFS (MAX_FH_LEN + 2)
00288 
00289 static int32_t sub_ack_send(struct client_ctx *cl_ctx, uint8_t *buf, uint8_t pay_ofs,
00290                         uint32_t pay_len, uint16_t msg_id)
00291 {
00292         uint8_t *ref = buf += MAX_FH_LEN;
00293 
00294         if(MQP_SUBACK_PAY_OFS != pay_ofs)
00295                 return MQP_ERR_PKT_LEN;
00296 
00297         buf += buf_wr_nbo_2B(buf, msg_id);
00298         ref -= mqp_buf_tail_wr_remlen(ref - MAX_REMLEN_BYTES, 
00299                                       pay_len + buf - ref);
00300 
00301         ref -= 1;
00302         *ref = MAKE_FH_BYTE1(MQTT_SUBACK, 
00303                              MAKE_FH_FLAGS(false, MQTT_QOS0, false));
00304 
00305         return cl_ctx_send(cl_ctx, ref, pay_len + buf - ref);
00306 }
00307 
00308 static inline int32_t unsub_ack_send(struct client_ctx *cl_ctx, uint16_t msg_id)
00309 {
00310         return vh_msg_send(cl_ctx, MQTT_UNSUBACK, MQTT_QOS0, true, msg_id);
00311 }
00312 
00313 /*----------------------------------------------------------------------------
00314  * Receive Routines
00315  *----------------------------------------------------------------------------
00316  */
00317 
00318 /* Candidate to be moved to mqtt_common.c file */
00319 static bool mqp_proc_vh_msg_id_rx(struct mqtt_packet *mqp_raw)
00320 {
00321         uint8_t *buf = MQP_VHEADER_BUF(mqp_raw);
00322 
00323         if(mqp_raw->pl_len < 2)
00324                 return false;    /* Bytes for MSG ID not available */
00325 
00326         buf += buf_rd_nbo_2B(buf, &mqp_raw->msg_id);
00327         mqp_raw->vh_len += 2;
00328         mqp_raw->pl_len -= 2;
00329 
00330         return true;
00331 }
00332 
00333 #define BRK_IF_RD_ERR_UTF8(buf, end, utf8)      \
00334         if(rd_buf_utf8(buf, end, utf8) < 0)     \
00335                 break;
00336 
00337 static int32_t buf_rd_utf8_qos(uint8_t *buf, uint8_t *end, struct utf8_strqos *utf8_qos)
00338 {
00339         struct utf8_string utf8;
00340         uint8_t *ref = buf;
00341 
00342         buf += mqp_buf_rd_utf8(buf, end, &utf8);
00343 
00344         /* Assess that UTF8 has been read and QOS can be read */
00345         if((buf > ref) && (end > buf)) {
00346                 utf8_qos->buffer = utf8.buffer;
00347                 utf8_qos->length = utf8.length;
00348                 utf8_qos->qosreq = (enum mqtt_qos)*buf++;
00349 
00350                 return buf - ref;
00351         }
00352 
00353         return -1;
00354 }
00355 
00356 static bool _proc_sub_msg_rx(struct mqtt_packet *mqp_raw,
00357                              struct utf8_strqos *qos_topics, uint32_t *n_topics)
00358 {
00359         uint8_t *buf, *end;
00360         uint32_t i = 0;
00361 
00362         if(false == mqp_proc_vh_msg_id_rx(mqp_raw))
00363                 return false;   /* Problem in contents received from client */
00364 
00365         buf = MQP_PAYLOAD_BUF(mqp_raw);
00366         end = buf + mqp_raw->pl_len;
00367 
00368         for(i = 0; (i < *n_topics) && (buf < end); i++) {
00369                 struct utf8_strqos *qos_top = qos_topics + i;
00370                 int32_t len = buf_rd_utf8_qos(buf, end, qos_top);
00371                 if(len < 0)
00372                         break; /* Failed to read Topic */
00373 
00374                 buf += len;
00375         }
00376 
00377         *n_topics = i;
00378 
00379         return ((0 == i) || (buf != end))? false : true;
00380 }
00381 
00382 static
00383 bool proc_sub_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
00384 {
00385         uint32_t n_topics = MQP_MAX_TOPICS;
00386         uint16_t msg_id;
00387 
00388         struct utf8_strqos qos_topics[MQP_MAX_TOPICS];
00389         uint8_t ack[MQP_MAX_TOPICS  +  MQP_SUBACK_PAY_OFS];
00390 
00391         if(false == _proc_sub_msg_rx(mqp_raw, qos_topics, &n_topics))
00392                 return false;
00393 
00394         msg_id = mqp_raw->msg_id;
00395 
00396         /* All topics have been now put in array, pass-on info to upper layer */
00397         if(usr_cbs->sub_msg_rx(cl_ctx->usr, qos_topics, n_topics,
00398                                msg_id, ack + MQP_SUBACK_PAY_OFS)) {
00399 
00400                 sub_ack_send(cl_ctx, ack, MQP_SUBACK_PAY_OFS, n_topics, msg_id);
00401 
00402                 return true;
00403         }
00404 
00405         return false;
00406 }
00407 
00408 
00409 static bool _proc_unsub_msg_rx(struct mqtt_packet *mqp_raw,
00410                                struct utf8_string *topics, uint32_t *n_topics)
00411 {
00412         uint8_t *buf, *end;
00413         uint32_t i = 0;
00414         
00415         if(false == mqp_proc_vh_msg_id_rx(mqp_raw))
00416                 return false;   /* Problem in contents received from client */
00417 
00418         buf = MQP_PAYLOAD_BUF(mqp_raw);
00419         end = buf + mqp_raw->pl_len;
00420 
00421         for(i = 0; (i < *n_topics) && (buf < end); i++) {
00422                 struct utf8_string *topic = topics + i;
00423                 int32_t len = mqp_buf_rd_utf8(buf, end, topic);
00424                 if(len < 0)
00425                         break; /* Failed to read Topic */
00426                 
00427                 buf += len;
00428         }
00429 
00430         *n_topics = i;
00431 
00432         return ((0 == i) || (buf != end))? false : true;
00433 }
00434 
00435 static
00436 bool proc_unsub_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
00437 {
00438         uint32_t n_topics = MQP_MAX_TOPICS;
00439         uint16_t msg_id;
00440 
00441         struct utf8_string topics[MQP_MAX_TOPICS];
00442 
00443         if(false == _proc_unsub_msg_rx(mqp_raw, topics, &n_topics))
00444                 return false;
00445 
00446         msg_id = mqp_raw->msg_id;
00447 
00448         /* All topics have been now put in array, pass-on info to upper layer */
00449         if(usr_cbs->un_sub_msg(cl_ctx->usr, topics, n_topics, msg_id)) {
00450                 unsub_ack_send(cl_ctx, msg_id);
00451                 return true;
00452         }
00453 
00454         return false;
00455 }
00456 
00457 static bool proc_pingreq_rx(struct client_ctx *cl_ctx)
00458 {
00459         vh_msg_send(cl_ctx, MQTT_PINGRSP, MQTT_QOS0, false, 0x00);
00460         return true;
00461 }
00462 
00463 static bool proc_disconn_rx(struct client_ctx *cl_ctx)
00464 {
00465         do_net_close_rx(cl_ctx, false);
00466         return true;
00467 }
00468 
00469 static
00470 bool proc_pub_msg_rx(struct client_ctx  *cl_ctx, struct mqtt_packet *mqp_raw)
00471 {
00472         bool rv = mqp_proc_pub_rx(mqp_raw);
00473         uint8_t B = mqp_raw->fh_byte1;
00474         enum mqtt_qos qos = ENUM_QOS(B);
00475         struct utf8_string topic;
00476         uint16_t msg_id = 0;
00477 
00478         if(false == rv)
00479                 return rv; /* Didn't get a good PUB Packet */
00480 
00481         msg_id = mqp_raw->msg_id;
00482 
00483         topic.buffer = (char*)MQP_PUB_TOP_BUF(mqp_raw);
00484         topic.length =      MQP_PUB_TOP_LEN(mqp_raw);
00485 
00486         rv = usr_cbs->pub_msg_rx(cl_ctx->usr, &topic, 
00487                                  MQP_PUB_PAY_BUF(mqp_raw),
00488                                  MQP_PUB_PAY_LEN(mqp_raw),
00489                                  msg_id, BOOL_DUP(B), qos, 
00490                                  BOOL_RETAIN(B));
00491         if(false == rv)
00492                 return rv;
00493 
00494         if(MQTT_QOS1 == qos)
00495                 vh_msg_send(cl_ctx, MQTT_PUBACK, MQTT_QOS0, true, msg_id);
00496 
00497         if(MQTT_QOS2 == qos)
00498                 vh_msg_send(cl_ctx, MQTT_PUBREC, MQTT_QOS0, true, msg_id);
00499 
00500         return rv;
00501 }
00502 
00503 static 
00504 bool proc_ack_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
00505 {
00506         if((false == mqp_proc_msg_id_ack_rx(mqp_raw, false)) ||
00507            (false == usr_cbs->ack_notify(cl_ctx->usr,
00508                                          mqp_raw->msg_type,
00509                                          mqp_raw->msg_id)))
00510                 return false;
00511 
00512         return true;
00513 }
00514 
00515 #define IO_MON_NO_TIMEOUT (0xFFFFFFFF) // TBD
00516 //#define KA_TIMEOUT_NONE   (0xFFFFFFFF)
00517 
00518 static void rx_timeout_update(struct client_ctx *cl_ctx)
00519 {
00520         if(false == had_rcvd_conn_msg(cl_ctx))
00521                 return;
00522 
00523         cl_ctx_timeout_update(cl_ctx, net_ops->time());
00524 
00525         used_ctxs_remove(cl_ctx);
00526         used_ctxs_insert(cl_ctx);
00527 
00528         return;
00529 }
00530 
00531 static bool proc_protocol_info(struct utf8_string *utf8, uint8_t ver)
00532 {
00533         const char *buf = utf8->buffer;
00534 
00535         /* Check for protocol version 3.1.1 */
00536         if((4 == utf8->length)  &&
00537            (buf[0] == 'M')      &&
00538            (buf[1] == 'Q')      &&
00539            (buf[2] == 'T')      &&
00540            (buf[3] == 'T')      &&
00541            (0x04 == ver))
00542                 return true;
00543 
00544         /* Check for protocol version 3.1   */
00545         if((6 == utf8->length)  &&
00546            (buf[0] == 'M')      &&
00547            (buf[1] == 'Q')      &&
00548            (buf[2] == 'I')      &&
00549            (buf[3] == 's')      &&
00550            (buf[4] == 'd')      &&
00551            (buf[5] == 'p')      &&
00552            (0x03 == ver))
00553                 return true;
00554 
00555         return false;
00556 }
00557 
00558 static uint16_t proc_connect_vh_rx(struct mqtt_packet  *mqp_raw,
00559                               uint8_t *conn_flags, uint16_t *ka_secs)
00560 {
00561         struct utf8_string utf8;
00562         uint8_t *buf = MQP_PAYLOAD_BUF(mqp_raw);
00563         uint8_t *end = buf + mqp_raw->pl_len;
00564         uint8_t *ref = buf;
00565 
00566         buf += mqp_buf_rd_utf8(buf, end, &utf8);
00567         if(end - buf < 1)
00568                 return CONNACK_RC_BAD_PROTOV; /* No proto ver */
00569 
00570         if(false == proc_protocol_info(&utf8, *buf++))
00571                 return CONNACK_RC_BAD_PROTOV;
00572 
00573         *conn_flags = *buf++;
00574 
00575         if(end - buf < 2)
00576                 return 0xFF;        /* Bad packet composition */
00577 
00578         *ka_secs  = (buf[0] << 8) | buf[1];
00579         buf      += 2;
00580         *ka_secs += *ka_secs >> 1;
00581 
00582         mqp_raw->vh_len  = buf - ref;
00583         mqp_raw->pl_len -= buf - ref;
00584 
00585         return 0;
00586 }
00587 
00588 #define RET_IF_RD_CONN_ERROR(buf, end, utf8)                   \
00589         {                                                      \
00590                 int32_t len = mqp_buf_rd_utf8(buf, end, utf8);     \
00591                 if(len < 0)                                    \
00592                         return 0x00FF;                         \
00593                                                                \
00594                 buf += len;                                    \
00595         }
00596 
00597 uint16_t proc_connect_pl_rx(const uint8_t *buf, const uint8_t *end, uint8_t conn_flags,
00598                        struct utf8_string *free_utf8s,
00599                        struct utf8_string **used_refs)
00600 {
00601         struct utf8_string *utf8;
00602 
00603         utf8 = used_refs[0] = free_utf8s + 0;
00604         RET_IF_RD_CONN_ERROR(buf, end, utf8);
00605 
00606         if(conn_flags & WILL_CONFIG_VAL) {
00607                 utf8 = used_refs[1] = free_utf8s + 1;
00608                 RET_IF_RD_CONN_ERROR(buf, end, utf8);
00609 
00610                 utf8 = used_refs[2] = free_utf8s + 2;
00611                 RET_IF_RD_CONN_ERROR(buf, end, utf8);
00612         }
00613 
00614         if((0 == (conn_flags & USER_NAME_OPVAL))  &&
00615            (0 != (conn_flags & PASS_WORD_OPVAL)))
00616                 return 0x00FF;  /* Bad combination */
00617 
00618         if(conn_flags & USER_NAME_OPVAL) {
00619                 utf8 = used_refs[3] = free_utf8s + 3;
00620                 RET_IF_RD_CONN_ERROR(buf, end, utf8);
00621         }
00622 
00623         if(conn_flags & PASS_WORD_OPVAL) {
00624                 utf8 = used_refs[4] = free_utf8s + 4;
00625                 RET_IF_RD_CONN_ERROR(buf, end, utf8);
00626         }
00627 
00628         return 0;
00629 }
00630 
00631 static
00632 bool proc_connect_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
00633 {
00634         struct utf8_string *used_refs[5] = {NULL, NULL, NULL, NULL, NULL};
00635         struct utf8_string free_utf8s[5];
00636         uint8_t conn_flags, *buf, *end;
00637         bool clean_session = false;
00638         uint16_t ack_vh16 = 0; /* Variable Header of CONNACK (response) Message */
00639 
00640         set_rcvd_conn_msg(cl_ctx);
00641 
00642         ack_vh16 = proc_connect_vh_rx(mqp_raw, &conn_flags, &cl_ctx->ka_secs);
00643         if(ack_vh16) 
00644                 goto proc_connect_rx_exit1;
00645 
00646         buf = MQP_PAYLOAD_BUF(mqp_raw);
00647         end = buf + mqp_raw->pl_len;
00648         ack_vh16 = proc_connect_pl_rx(buf,  end, conn_flags,
00649                                       free_utf8s, used_refs);
00650         if(ack_vh16)
00651                 goto proc_connect_rx_exit1;
00652 
00653         clean_session = (conn_flags & CLEAN_START_VAL)? true : false;
00654         ack_vh16 = (!used_refs[0]->length && !clean_session)?
00655                    CONNACK_RC_CLI_REJECT : 0;   /* Validate 0 byte Client ID */
00656         if(ack_vh16)
00657                 goto proc_connect_rx_exit1;
00658 
00659         /* UTF8 Order: Client ID, Will Topic, Will Msg, User Name, Pass Word */
00660         ack_vh16 = usr_cbs->connect_rx(cl_ctx, conn_flags, &used_refs[0],
00661                                        &cl_ctx->usr);
00662  proc_connect_rx_exit1:
00663 
00664         DBG_INFO("S: CONNACK RC (16bits) is %u (%s)\n\r", ack_vh16,
00665                  ack_vh16 & 0xFF? "error" : "good");
00666 
00667         if(0xFF != (ack_vh16 & 0xFF))
00668                 vh_msg_send(cl_ctx, MQTT_CONNACK, MQTT_QOS0, true, ack_vh16);
00669 
00670         if(CONNACK_RC_REQ_ACCEPT == (ack_vh16 & 0xFF)) {
00671                 rx_timeout_update(cl_ctx);
00672                 usr_cbs->on_connack_send(cl_ctx->usr, clean_session);
00673         } else {
00674                 return false;
00675         }
00676 
00677         return true;
00678 }
00679 
00680 
00681 static void recv_hvec_load(int32_t *recv_hvec, uint32_t size, struct client_ctx *list)
00682 {
00683         int32_t i = 0;
00684 
00685         for(i = 0; (i < size) && (NULL != list); i++, list = list->next)
00686                 recv_hvec[i] = list->net;
00687 
00688         recv_hvec[i] = -1;
00689 
00690         return;
00691 }
00692 
00693 static bool process_recv(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
00694 {
00695         uint8_t msg_type = mqp_raw->msg_type;
00696         bool rv = false;
00697 
00698         USR_INFO("S: Rcvd msg Fix-Hdr (Byte1) 0x%02x from net %d\n\r",
00699                  mqp_raw->fh_byte1, cl_ctx->net);
00700 
00701         if((MQTT_CONNECT != msg_type) ^ had_rcvd_conn_msg(cl_ctx))
00702                 goto process_recv_exit1; /* Proto Violation */
00703 
00704         rx_timeout_update(cl_ctx);
00705 
00706         switch(msg_type) {
00707 
00708         case MQTT_CONNECT:
00709                 rv = proc_connect_rx(cl_ctx, mqp_raw);
00710                 break;
00711 
00712         case MQTT_DISCONNECT:
00713                 rv = proc_disconn_rx(cl_ctx);
00714                 break;
00715 
00716         case MQTT_SUBSCRIBE:
00717                 rv = proc_sub_msg_rx(cl_ctx, mqp_raw);
00718                 break;
00719 
00720         case MQTT_UNSUBSCRIBE:
00721                 rv = proc_unsub_msg_rx(cl_ctx, mqp_raw);
00722                 break;
00723 
00724         case MQTT_PINGREQ:
00725                 rv = proc_pingreq_rx(cl_ctx);
00726                 break;
00727 
00728         case MQTT_PUBLISH:
00729                 rv = proc_pub_msg_rx(cl_ctx, mqp_raw);
00730                 break;
00731 
00732         case MQTT_PUBACK:
00733         case MQTT_PUBREC:
00734         case MQTT_PUBREL:
00735         case MQTT_PUBCOMP:
00736                 rv = proc_ack_msg_rx(cl_ctx, mqp_raw);
00737                 break;
00738 
00739         default:
00740                 break;
00741         }
00742 
00743  process_recv_exit1:
00744         DBG_INFO("S: Processing of MSG ID 0x%02x: %s\n\r",
00745                  mqp_raw->msg_id, rv? "Good" : "Fail");
00746 
00747         return rv;
00748 }
00749 
00750 
00751 /* Terminate net connections which haven't received PKTs beyond expected time.
00752    Caller must ensure atomic enviroment for execution of this routine.
00753 */
00754 static void ka_sequence(uint32_t *secs2wait)
00755 {
00756         struct client_ctx *cl_ctx = used_ctxs; /* Sorted for timeout (ascend) */
00757         uint32_t now_secs = net_ops->time();
00758 
00759         while(NULL != cl_ctx) {
00760                 struct client_ctx *next = cl_ctx->next;
00761                 if(NEED_NET_CLOSE(cl_ctx) || !(cl_ctx->timeout > now_secs)) {
00762                         bool due2err = false;
00763                         if(cl_ctx->flags & NW_CONN_ERROR_FLAG)
00764                                 due2err = true;
00765 
00766                         cl_ctx->flags &= ~(NW_CONN_ERROR_FLAG |
00767                                            NETWORK_CLOSE_FLAG);
00768 
00769                         /* Close network: Timeout or TX err */
00770                         do_net_close_rx(cl_ctx, due2err);
00771                 }
00772 
00773                 cl_ctx = next;
00774         }
00775 
00776         cl_ctx = used_ctxs;
00777         if(((NULL != cl_ctx) && (KA_TIMEOUT_NONE == cl_ctx->timeout)) ||
00778            ((NULL == cl_ctx)))
00779                 *secs2wait = IO_MON_NO_TIMEOUT;
00780         else
00781                 *secs2wait = cl_ctx->timeout - now_secs;
00782 
00783         return;
00784 }
00785 
00786 /* Put a new functiona name such as mk_new_ctx() or setup_ctx() and
00787    processing to restrict limit number of connections.
00788 
00789    Return value as well.
00790 */
00791 static bool accept_ctx(int32_t net, uint32_t wait_secs)
00792 {
00793         struct client_ctx *cl_ctx = cl_ctx_alloc();
00794         if(NULL == cl_ctx)
00795                 return false;
00796 
00797         cl_ctx->net = net_ops->accept(net, cl_ctx->remote_ip,
00798                                       &cl_ctx->ip_length);
00799         if(-1 == cl_ctx->net) {
00800                 cl_ctx_free(cl_ctx);
00801                 USR_INFO("S: failed to accept new NW connection\n\r");
00802                 return false;
00803         }
00804 
00805         DBG_INFO("Accepted new connection (fd) %d\n\r", (int32_t)cl_ctx->net);
00806 
00807         /* Timeout to receive MQTT_CONNECT */
00808         cl_ctx->timeout = wait_secs + net_ops->time(); 
00809 
00810         used_ctxs_insert(cl_ctx);
00811         return true;
00812 }
00813 
00814 static struct client_ctx *net_cl_ctx_find(int32_t net)
00815 {
00816         struct client_ctx *cl_ctx = used_ctxs;
00817         while(cl_ctx) {
00818                 if(net == cl_ctx->net)
00819                         break;
00820 
00821                 cl_ctx = cl_ctx->next;
00822         }
00823 
00824         if(NULL == cl_ctx)
00825                 USR_INFO("Did not find ctx for net %d\n\r", net); 
00826 
00827         return cl_ctx;
00828 }
00829 
00830 static int32_t recv_hvec[MAX_NWCONN + 1 + 1 + 1]; /* LISTEN + LOOPBACK + VEC END */
00831 static int32_t send_hvec  = -1;
00832 static int32_t rsvd_hvec  = -1;
00833 static int32_t listen_net = -1;
00834 
00835 static struct mqtt_packet rx_mqp;
00836 static uint8_t rxb[MQP_SERVER_RX_LEN];
00837 static uint16_t listener_port = 0;
00838 
00839 static inline
00840 int32_t net_recv(int32_t net, struct mqtt_packet *mqp, uint32_t wait_secs, bool *timed_out)
00841 {
00842         int32_t rv = mqp_recv(net, net_ops, mqp, wait_secs, timed_out, NULL);
00843         if(rv <= 0)
00844                 rv = MQP_ERR_NETWORK;
00845 
00846         return rv;
00847 }
00848 
00849 static int32_t proc_loopback_recv(int32_t net)
00850 {
00851         uint8_t buf[LOOP_DLEN];
00852 
00853         /* Thanks for waking-up thread and do nothing in this routine */
00854         int32_t rv = net_ops->recv_from(net, buf, LOOP_DLEN, NULL, NULL, 0);
00855         pending_trigs = false;
00856 
00857         if(rv <= 0) {
00858                 net_ops->close(net);
00859                 return MQP_ERR_LIBQUIT;
00860         }
00861 
00862         return rv;
00863 }
00864 
00865 static void proc_net_data_recv(int32_t net)
00866 {
00867         struct client_ctx *cl_ctx = net_cl_ctx_find(net);
00868         bool dummy;
00869         int32_t rv;
00870 
00871         mqp_reset(&rx_mqp); /* Start w/ a clean buffer */
00872 
00873         rv = net_recv(net, &rx_mqp, 0, &dummy);
00874         if(rv > 0)
00875                 /* Working Principle: Only RX processing errors should be
00876                    reported as 'false'. Status of TX as a follow-up to RX
00877                    messages need not be reported by the xyz_rx() routines.
00878                    Error observed in TX is either dealt in next iteration
00879                    of RX loop.
00880                 */
00881                 if(false == process_recv(cl_ctx, &rx_mqp))
00882                         rv = MQP_ERR_CONTENT;
00883 
00884         if(rv < 0)
00885                 do_net_close_rx(cl_ctx, rv);
00886 }
00887 
00888 static bool accept_and_recv_locked(int32_t *recv_hnds, int32_t n_hnds, uint32_t wait_secs)
00889 {
00890         bool rv = true;
00891         int32_t idx = 0;
00892 
00893         MUTEX_LOCKIN();
00894 
00895         for(idx = 0; (idx < n_hnds) && (rv == true); idx++) {
00896                 int32_t net = recv_hvec[idx];
00897                 if(net == listen_net) {
00898                         rv = accept_ctx(listen_net, wait_secs);
00899                 } else if(loopback_port && (net == loopb_net)) {
00900                         if(proc_loopback_recv(loopb_net) < 0)
00901                                 rv = false;
00902                 } else {
00903                         proc_net_data_recv(net);
00904                 }
00905         }
00906 
00907         MUTEX_UNLOCK();
00908 
00909         return rv;
00910 }
00911 
00912 int32_t mqtt_server_run(uint32_t wait_secs)   // TBD break into two functions
00913 {
00914         uint32_t secs2wait = 0;
00915         int32_t n_hnds = 0;
00916 
00917         USR_INFO("S: MQTT Server Run invoked ....\n\r");
00918 
00919         if(NULL == net_ops)
00920                 return MQP_ERR_NET_OPS;
00921 
00922         if(loopback_port) {
00923                 loopb_net  = net_ops->open(DEV_NETCONN_OPT_UDP, NULL, 
00924                                            loopback_port, NULL);
00925                 if(-1 == loopb_net)
00926                         return MQP_ERR_LIBQUIT;
00927         }
00928 
00929         listen_net = net_ops->listen(0, listener_port, NULL);
00930         if(-1 == listen_net)
00931                 return MQP_ERR_LIBQUIT;
00932 
00933         do {
00934                 int32_t *r_hvec = recv_hvec + 0;
00935 
00936                 *r_hvec++ = listen_net;
00937                 if(loopback_port)
00938                         *r_hvec++ = loopb_net;
00939 
00940                 /* MQTT Timeouts: close expired conns; get time to next expiry */
00941                 ka_sequence(&secs2wait); 
00942 
00943                 /* Prepare array of net handles. Must've atleast listen handle */
00944                 //                recv_hvec_load(&recv_hvec[2], MAX_NWCONN + 1,  used_ctxs);
00945                 recv_hvec_load(r_hvec, MAX_NWCONN + 1,  used_ctxs);
00946 
00947                 n_hnds = net_ops->io_mon(recv_hvec, &send_hvec,
00948                                          &rsvd_hvec, secs2wait);
00949                 if(n_hnds < 0)
00950                         return MQP_ERR_LIBQUIT;
00951 
00952                 if(false == accept_and_recv_locked(recv_hvec, n_hnds, wait_secs))
00953                         return MQP_ERR_LIBQUIT;
00954 
00955         } while(1);
00956 }
00957 
00958 int32_t mqtt_server_register_net_svc(const struct device_net_services *net)
00959 {
00960         if(net && net->io_mon && net->close && net->send &&
00961            net->recv && net->time && net->listen) {
00962                 net_ops = net;
00963                 return 0;
00964         }
00965 
00966         return -1;
00967 }
00968 
00969 int32_t mqtt_server_lib_init(const struct mqtt_server_lib_cfg *lib_cfg,
00970                          const struct mqtt_server_msg_cbs *msg_cbs)
00971 {
00972         cl_ctx_init();
00973 
00974         if((NULL == lib_cfg)                ||
00975            (0    == lib_cfg->listener_port) ||
00976            (NULL == lib_cfg->debug_printf))
00977                 return -1;
00978 
00979         debug_printf = lib_cfg->debug_printf; /* Facilitate debug */
00980 
00981         loopback_port = lib_cfg->loopback_port;
00982         listener_port = lib_cfg->listener_port;
00983 
00984         mutex         = lib_cfg->mutex;
00985         mutex_lockin  = lib_cfg->mutex_lockin;
00986         mutex_unlock  = lib_cfg->mutex_unlock;
00987 
00988         aux_dbg_enbl  = lib_cfg->aux_debug_en;
00989         debug_printf  = lib_cfg->debug_printf;
00990 
00991         usr_cbs = &usr_obj;
00992 
00993         memcpy(usr_cbs, msg_cbs, sizeof(struct mqtt_server_msg_cbs));
00994 
00995         mqp_buffer_attach(&rx_mqp, rxb, MQP_SERVER_RX_LEN, 0);
00996 
00997         return 0;
00998 }
00999 
01000 }//namespace mbed_mqtt