David Fletcher
/
cc3100_Test_mqtt_CM4F
TI's MQTT Demo with freertos CM4F
Embed:
(wiki syntax)
Show/hide line numbers
server_pkts.cpp
00001 /****************************************************************************** 00002 * 00003 * Copyright (C) 2014 Texas Instruments Incorporated 00004 * 00005 * All rights reserved. Property of Texas Instruments Incorporated. 00006 * Restricted rights to use, duplicate or disclose this code are 00007 * granted through contract. 00008 * 00009 * The program may not be used without the written permission of 00010 * Texas Instruments Incorporated or against the terms and conditions 00011 * stipulated in the agreement under which this program has been supplied, 00012 * and under no circumstances can it be used with non-TI connectivity device. 00013 * 00014 ******************************************************************************/ 00015 00016 #include "server_pkts.h" 00017 00018 namespace mbed_mqtt { 00019 00020 /*----------------------------------------------------------------------------- 00021 * Note: Do not create additional dependency of this file on any header other 00022 * than server_pkts.h. Specifically, server_pkts.[hc] in conjunction with the 00023 * mqtt_common.[hc] files must be facilitated to create a stand-alone library. 00024 *----------------------------------------------------------------------------- 00025 */ 00026 00027 static void *mutex = NULL; 00028 static void (*mutex_lockin)(void*) = NULL; 00029 static void (*mutex_unlock)(void*) = NULL; 00030 00031 #define MUTEX_LOCKIN() if(mutex_lockin) mutex_lockin(mutex); 00032 #define MUTEX_UNLOCK() if(mutex_unlock) mutex_unlock(mutex); 00033 00034 static bool aux_dbg_enbl = false; 00035 static int32_t (*debug_printf)(const char *fmt, ...) = NULL; 00036 00037 #define USR_INFO debug_printf 00038 #define DBG_INFO(I, ...) if(aux_dbg_enbl) debug_printf(I, ##__VA_ARGS__) 00039 00040 static int32_t mqp_buf_rd_utf8(const uint8_t *buf, const uint8_t *end, 00041 struct utf8_string *utf8) 00042 { 00043 const uint8_t *ref = buf; /* Reference */ 00044 uint16_t len = 0; /* UTF8 Size */ 00045 00046 if(end - buf < 2) 00047 return -1; /* No valid buffer to hold UTF8 size */ 00048 00049 buf += buf_rd_nbo_2B(buf, &len); 00050 if(end - buf < len) 00051 return -1; /* No valid buffer to hold UTF8 name */ 00052 00053 utf8->length = len; 00054 utf8->buffer = len? (char*)buf : NULL; 00055 00056 return buf + len - ref; 00057 } 00058 00059 static struct mqtt_server_msg_cbs usr_obj, *usr_cbs = NULL; 00060 static const struct device_net_services *net_ops = NULL; 00061 00062 #ifndef CFG_SR_MQTT_CTXS 00063 #define MAX_NWCONN 6 00064 #else 00065 #define MAX_NWCONN CFG_SR_MQTT_CTXS 00066 #endif 00067 00068 static struct client_ctx contexts[MAX_NWCONN]; 00069 00070 static struct client_ctx *used_ctxs = NULL; 00071 static struct client_ctx *free_ctxs = NULL; 00072 00073 00074 #define NETWORK_CLOSE_FLAG 0x00200000 00075 #define NW_CONN_ERROR_FLAG 0x00400000 00076 #define RCVD_CONN_MSG_FLAG 0x00800000 00077 00078 #define NEED_NET_CLOSE(cl_ctx) (cl_ctx->flags & NETWORK_CLOSE_FLAG) 00079 00080 static void cl_ctx_init(void) 00081 { 00082 int32_t i = 0; 00083 for(i = 0; i < MAX_NWCONN; i++) { 00084 struct client_ctx *cl_ctx = contexts + i; 00085 00086 cl_ctx_reset(cl_ctx); 00087 00088 cl_ctx->next = free_ctxs; 00089 free_ctxs = cl_ctx; 00090 } 00091 } 00092 00093 static void cl_ctx_free(struct client_ctx *cl_ctx) 00094 { 00095 cl_ctx_reset(cl_ctx); 00096 00097 cl_ctx->next = free_ctxs; 00098 free_ctxs = cl_ctx; 00099 00100 return; 00101 } 00102 00103 static struct client_ctx *cl_ctx_alloc(void) 00104 { 00105 struct client_ctx *cl_ctx = free_ctxs; 00106 if(cl_ctx) { 00107 free_ctxs = cl_ctx->next; 00108 cl_ctx->next = NULL; 00109 } else 00110 USR_INFO("S: fatal, no free cl_ctx\n\r"); 00111 00112 return cl_ctx; 00113 } 00114 00115 static inline bool had_rcvd_conn_msg(struct client_ctx *cl_ctx) 00116 { 00117 return (cl_ctx->flags & RCVD_CONN_MSG_FLAG); 00118 } 00119 00120 static inline void set_rcvd_conn_msg(struct client_ctx *cl_ctx) 00121 { 00122 cl_ctx->flags |= RCVD_CONN_MSG_FLAG; 00123 } 00124 00125 static void used_ctxs_insert(struct client_ctx *cl_ctx) 00126 { 00127 cl_ctx_timeout_insert(&used_ctxs, cl_ctx); 00128 } 00129 00130 static void used_ctxs_remove(struct client_ctx *cl_ctx) 00131 { 00132 cl_ctx_remove(&used_ctxs, cl_ctx); 00133 } 00134 00135 static int32_t loopb_net = -1; 00136 static const uint8_t LOOP_DATA[] = {0x00, 0x01}; 00137 #define LOOP_DLEN sizeof(LOOP_DATA) 00138 static uint16_t loopback_port = 0; 00139 static bool pending_trigs = false; 00140 00141 static int32_t loopb_trigger(void) 00142 { 00143 uint8_t ip_addr[] = {127,0,0,1}; 00144 int32_t rv = 0; 00145 00146 if((-1 != loopb_net) && (false == pending_trigs)) { 00147 rv = net_ops->send_dest(loopb_net, LOOP_DATA, LOOP_DLEN, 00148 loopback_port, ip_addr, 4); 00149 if(0 == rv) 00150 pending_trigs = true; 00151 } 00152 00153 return rv; 00154 } 00155 00156 static void do_net_close_rx(struct client_ctx *cl_ctx, bool due2err) 00157 { 00158 DBG_INFO("S: RX closing Net %d ...\n\r", (int32_t)cl_ctx->net); 00159 00160 net_ops->close(cl_ctx->net); 00161 cl_ctx->net = -1; 00162 00163 if(cl_ctx->usr) 00164 usr_cbs->on_cl_net_close(cl_ctx->usr, due2err); 00165 00166 used_ctxs_remove(cl_ctx); 00167 cl_ctx_free(cl_ctx); 00168 } 00169 00170 static void do_net_close_tx(struct client_ctx *cl_ctx, bool due2err) 00171 { 00172 if(due2err) 00173 cl_ctx->flags |= NW_CONN_ERROR_FLAG; 00174 00175 cl_ctx->flags |= NETWORK_CLOSE_FLAG; 00176 00177 loopb_trigger(); 00178 } 00179 00180 static int32_t cl_ctx_send(struct client_ctx *cl_ctx, uint8_t *buf, uint32_t len) 00181 { 00182 int32_t rv = net_ops->send(cl_ctx->net, buf, len, NULL); 00183 if(rv <= 0) { 00184 do_net_close_tx(cl_ctx, true); 00185 rv = MQP_ERR_NETWORK; 00186 } 00187 00188 USR_INFO("S: FH-B1 0x%02x, len %u to net %d: %s\n\r", 00189 *buf, len, cl_ctx->net, rv? "Sent" : "Fail"); 00190 return rv; 00191 } 00192 00193 static 00194 int32_t vh_msg_send(struct client_ctx *cl_ctx, uint8_t msg_type, enum mqtt_qos qos, 00195 bool has_vh, uint16_t vh_data) 00196 { 00197 uint8_t buf[4]; 00198 uint32_t len = 2; 00199 00200 if(false == had_rcvd_conn_msg(cl_ctx)) 00201 return MQP_ERR_NOTCONN; 00202 00203 buf[0] = MAKE_FH_BYTE1(msg_type, MAKE_FH_FLAGS(false, qos, false)); 00204 buf[1] = has_vh ? 2 : 0; 00205 00206 if(has_vh) 00207 len += buf_wr_nbo_2B(buf + 2, vh_data); 00208 00209 return cl_ctx_send(cl_ctx, buf, len); 00210 } 00211 00212 static 00213 int32_t _mqtt_vh_msg_send(void *ctx_cl, uint8_t msg_type, enum mqtt_qos qos, bool has_vh, 00214 uint16_t vh_data) 00215 { 00216 struct client_ctx *cl_ctx = (struct client_ctx*) ctx_cl; 00217 00218 return cl_ctx? vh_msg_send((client_ctx*)ctx_cl, msg_type, qos, 00219 has_vh, vh_data) : -1; 00220 } 00221 00222 int32_t mqtt_vh_msg_send(void *ctx_cl, uint8_t msg_type, enum mqtt_qos qos, bool has_vh, 00223 uint16_t vh_data) 00224 { 00225 return _mqtt_vh_msg_send(ctx_cl, msg_type, qos, has_vh, vh_data); 00226 } 00227 00228 int32_t mqtt_vh_msg_send_locked(void *ctx_cl, uint8_t msg_type, enum mqtt_qos qos, 00229 bool has_vh, uint16_t vh_data) 00230 { 00231 int32_t rv; 00232 00233 MUTEX_LOCKIN(); 00234 rv = _mqtt_vh_msg_send(ctx_cl, msg_type, qos, has_vh, vh_data); 00235 MUTEX_UNLOCK(); 00236 00237 return rv; 00238 } 00239 00240 int32_t mqtt_connack_send(void *ctx_cl, uint8_t *vh_buf) 00241 { 00242 struct client_ctx *cl_ctx = (struct client_ctx *) ctx_cl; 00243 00244 int32_t rv = vh_msg_send(cl_ctx, MQTT_CONNACK, MQTT_QOS0, 00245 true, (vh_buf[0] << 8) | vh_buf[1]); 00246 00247 if((rv > 0) && (0x00 != vh_buf[1])) 00248 do_net_close_tx(cl_ctx, true); 00249 00250 return rv; 00251 } 00252 00253 static 00254 int32_t _mqtt_server_pub_dispatch(void *ctx_cl, struct mqtt_packet *mqp, bool dup) 00255 { 00256 int32_t rv = 0; 00257 uint8_t *buf = MQP_FHEADER_BUF(mqp); 00258 00259 if(dup) 00260 *buf |= DUP_FLAG_VAL(true); 00261 00262 rv = cl_ctx_send((struct client_ctx*)ctx_cl, buf, MQP_CONTENT_LEN(mqp)); 00263 00264 *buf &= ~DUP_FLAG_VAL(true); 00265 00266 return rv; 00267 } 00268 00269 int32_t mqtt_server_pub_dispatch(void *ctx_cl, struct mqtt_packet *mqp, bool dup) 00270 { 00271 return _mqtt_server_pub_dispatch(ctx_cl, mqp, dup); 00272 } 00273 00274 int32_t 00275 mqtt_server_pub_dispatch_locked(void *ctx_cl, struct mqtt_packet *mqp, bool dup) 00276 { 00277 int32_t rv; 00278 00279 MUTEX_LOCKIN(); 00280 rv = _mqtt_server_pub_dispatch(ctx_cl, mqp, dup); 00281 MUTEX_UNLOCK(); 00282 00283 return rv; 00284 } 00285 00286 #define MQP_MAX_TOPICS 16 00287 #define MQP_SUBACK_PAY_OFS (MAX_FH_LEN + 2) 00288 00289 static int32_t sub_ack_send(struct client_ctx *cl_ctx, uint8_t *buf, uint8_t pay_ofs, 00290 uint32_t pay_len, uint16_t msg_id) 00291 { 00292 uint8_t *ref = buf += MAX_FH_LEN; 00293 00294 if(MQP_SUBACK_PAY_OFS != pay_ofs) 00295 return MQP_ERR_PKT_LEN; 00296 00297 buf += buf_wr_nbo_2B(buf, msg_id); 00298 ref -= mqp_buf_tail_wr_remlen(ref - MAX_REMLEN_BYTES, 00299 pay_len + buf - ref); 00300 00301 ref -= 1; 00302 *ref = MAKE_FH_BYTE1(MQTT_SUBACK, 00303 MAKE_FH_FLAGS(false, MQTT_QOS0, false)); 00304 00305 return cl_ctx_send(cl_ctx, ref, pay_len + buf - ref); 00306 } 00307 00308 static inline int32_t unsub_ack_send(struct client_ctx *cl_ctx, uint16_t msg_id) 00309 { 00310 return vh_msg_send(cl_ctx, MQTT_UNSUBACK, MQTT_QOS0, true, msg_id); 00311 } 00312 00313 /*---------------------------------------------------------------------------- 00314 * Receive Routines 00315 *---------------------------------------------------------------------------- 00316 */ 00317 00318 /* Candidate to be moved to mqtt_common.c file */ 00319 static bool mqp_proc_vh_msg_id_rx(struct mqtt_packet *mqp_raw) 00320 { 00321 uint8_t *buf = MQP_VHEADER_BUF(mqp_raw); 00322 00323 if(mqp_raw->pl_len < 2) 00324 return false; /* Bytes for MSG ID not available */ 00325 00326 buf += buf_rd_nbo_2B(buf, &mqp_raw->msg_id); 00327 mqp_raw->vh_len += 2; 00328 mqp_raw->pl_len -= 2; 00329 00330 return true; 00331 } 00332 00333 #define BRK_IF_RD_ERR_UTF8(buf, end, utf8) \ 00334 if(rd_buf_utf8(buf, end, utf8) < 0) \ 00335 break; 00336 00337 static int32_t buf_rd_utf8_qos(uint8_t *buf, uint8_t *end, struct utf8_strqos *utf8_qos) 00338 { 00339 struct utf8_string utf8; 00340 uint8_t *ref = buf; 00341 00342 buf += mqp_buf_rd_utf8(buf, end, &utf8); 00343 00344 /* Assess that UTF8 has been read and QOS can be read */ 00345 if((buf > ref) && (end > buf)) { 00346 utf8_qos->buffer = utf8.buffer; 00347 utf8_qos->length = utf8.length; 00348 utf8_qos->qosreq = (enum mqtt_qos)*buf++; 00349 00350 return buf - ref; 00351 } 00352 00353 return -1; 00354 } 00355 00356 static bool _proc_sub_msg_rx(struct mqtt_packet *mqp_raw, 00357 struct utf8_strqos *qos_topics, uint32_t *n_topics) 00358 { 00359 uint8_t *buf, *end; 00360 uint32_t i = 0; 00361 00362 if(false == mqp_proc_vh_msg_id_rx(mqp_raw)) 00363 return false; /* Problem in contents received from client */ 00364 00365 buf = MQP_PAYLOAD_BUF(mqp_raw); 00366 end = buf + mqp_raw->pl_len; 00367 00368 for(i = 0; (i < *n_topics) && (buf < end); i++) { 00369 struct utf8_strqos *qos_top = qos_topics + i; 00370 int32_t len = buf_rd_utf8_qos(buf, end, qos_top); 00371 if(len < 0) 00372 break; /* Failed to read Topic */ 00373 00374 buf += len; 00375 } 00376 00377 *n_topics = i; 00378 00379 return ((0 == i) || (buf != end))? false : true; 00380 } 00381 00382 static 00383 bool proc_sub_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) 00384 { 00385 uint32_t n_topics = MQP_MAX_TOPICS; 00386 uint16_t msg_id; 00387 00388 struct utf8_strqos qos_topics[MQP_MAX_TOPICS]; 00389 uint8_t ack[MQP_MAX_TOPICS + MQP_SUBACK_PAY_OFS]; 00390 00391 if(false == _proc_sub_msg_rx(mqp_raw, qos_topics, &n_topics)) 00392 return false; 00393 00394 msg_id = mqp_raw->msg_id; 00395 00396 /* All topics have been now put in array, pass-on info to upper layer */ 00397 if(usr_cbs->sub_msg_rx(cl_ctx->usr, qos_topics, n_topics, 00398 msg_id, ack + MQP_SUBACK_PAY_OFS)) { 00399 00400 sub_ack_send(cl_ctx, ack, MQP_SUBACK_PAY_OFS, n_topics, msg_id); 00401 00402 return true; 00403 } 00404 00405 return false; 00406 } 00407 00408 00409 static bool _proc_unsub_msg_rx(struct mqtt_packet *mqp_raw, 00410 struct utf8_string *topics, uint32_t *n_topics) 00411 { 00412 uint8_t *buf, *end; 00413 uint32_t i = 0; 00414 00415 if(false == mqp_proc_vh_msg_id_rx(mqp_raw)) 00416 return false; /* Problem in contents received from client */ 00417 00418 buf = MQP_PAYLOAD_BUF(mqp_raw); 00419 end = buf + mqp_raw->pl_len; 00420 00421 for(i = 0; (i < *n_topics) && (buf < end); i++) { 00422 struct utf8_string *topic = topics + i; 00423 int32_t len = mqp_buf_rd_utf8(buf, end, topic); 00424 if(len < 0) 00425 break; /* Failed to read Topic */ 00426 00427 buf += len; 00428 } 00429 00430 *n_topics = i; 00431 00432 return ((0 == i) || (buf != end))? false : true; 00433 } 00434 00435 static 00436 bool proc_unsub_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) 00437 { 00438 uint32_t n_topics = MQP_MAX_TOPICS; 00439 uint16_t msg_id; 00440 00441 struct utf8_string topics[MQP_MAX_TOPICS]; 00442 00443 if(false == _proc_unsub_msg_rx(mqp_raw, topics, &n_topics)) 00444 return false; 00445 00446 msg_id = mqp_raw->msg_id; 00447 00448 /* All topics have been now put in array, pass-on info to upper layer */ 00449 if(usr_cbs->un_sub_msg(cl_ctx->usr, topics, n_topics, msg_id)) { 00450 unsub_ack_send(cl_ctx, msg_id); 00451 return true; 00452 } 00453 00454 return false; 00455 } 00456 00457 static bool proc_pingreq_rx(struct client_ctx *cl_ctx) 00458 { 00459 vh_msg_send(cl_ctx, MQTT_PINGRSP, MQTT_QOS0, false, 0x00); 00460 return true; 00461 } 00462 00463 static bool proc_disconn_rx(struct client_ctx *cl_ctx) 00464 { 00465 do_net_close_rx(cl_ctx, false); 00466 return true; 00467 } 00468 00469 static 00470 bool proc_pub_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) 00471 { 00472 bool rv = mqp_proc_pub_rx(mqp_raw); 00473 uint8_t B = mqp_raw->fh_byte1; 00474 enum mqtt_qos qos = ENUM_QOS(B); 00475 struct utf8_string topic; 00476 uint16_t msg_id = 0; 00477 00478 if(false == rv) 00479 return rv; /* Didn't get a good PUB Packet */ 00480 00481 msg_id = mqp_raw->msg_id; 00482 00483 topic.buffer = (char*)MQP_PUB_TOP_BUF(mqp_raw); 00484 topic.length = MQP_PUB_TOP_LEN(mqp_raw); 00485 00486 rv = usr_cbs->pub_msg_rx(cl_ctx->usr, &topic, 00487 MQP_PUB_PAY_BUF(mqp_raw), 00488 MQP_PUB_PAY_LEN(mqp_raw), 00489 msg_id, BOOL_DUP(B), qos, 00490 BOOL_RETAIN(B)); 00491 if(false == rv) 00492 return rv; 00493 00494 if(MQTT_QOS1 == qos) 00495 vh_msg_send(cl_ctx, MQTT_PUBACK, MQTT_QOS0, true, msg_id); 00496 00497 if(MQTT_QOS2 == qos) 00498 vh_msg_send(cl_ctx, MQTT_PUBREC, MQTT_QOS0, true, msg_id); 00499 00500 return rv; 00501 } 00502 00503 static 00504 bool proc_ack_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) 00505 { 00506 if((false == mqp_proc_msg_id_ack_rx(mqp_raw, false)) || 00507 (false == usr_cbs->ack_notify(cl_ctx->usr, 00508 mqp_raw->msg_type, 00509 mqp_raw->msg_id))) 00510 return false; 00511 00512 return true; 00513 } 00514 00515 #define IO_MON_NO_TIMEOUT (0xFFFFFFFF) // TBD 00516 //#define KA_TIMEOUT_NONE (0xFFFFFFFF) 00517 00518 static void rx_timeout_update(struct client_ctx *cl_ctx) 00519 { 00520 if(false == had_rcvd_conn_msg(cl_ctx)) 00521 return; 00522 00523 cl_ctx_timeout_update(cl_ctx, net_ops->time()); 00524 00525 used_ctxs_remove(cl_ctx); 00526 used_ctxs_insert(cl_ctx); 00527 00528 return; 00529 } 00530 00531 static bool proc_protocol_info(struct utf8_string *utf8, uint8_t ver) 00532 { 00533 const char *buf = utf8->buffer; 00534 00535 /* Check for protocol version 3.1.1 */ 00536 if((4 == utf8->length) && 00537 (buf[0] == 'M') && 00538 (buf[1] == 'Q') && 00539 (buf[2] == 'T') && 00540 (buf[3] == 'T') && 00541 (0x04 == ver)) 00542 return true; 00543 00544 /* Check for protocol version 3.1 */ 00545 if((6 == utf8->length) && 00546 (buf[0] == 'M') && 00547 (buf[1] == 'Q') && 00548 (buf[2] == 'I') && 00549 (buf[3] == 's') && 00550 (buf[4] == 'd') && 00551 (buf[5] == 'p') && 00552 (0x03 == ver)) 00553 return true; 00554 00555 return false; 00556 } 00557 00558 static uint16_t proc_connect_vh_rx(struct mqtt_packet *mqp_raw, 00559 uint8_t *conn_flags, uint16_t *ka_secs) 00560 { 00561 struct utf8_string utf8; 00562 uint8_t *buf = MQP_PAYLOAD_BUF(mqp_raw); 00563 uint8_t *end = buf + mqp_raw->pl_len; 00564 uint8_t *ref = buf; 00565 00566 buf += mqp_buf_rd_utf8(buf, end, &utf8); 00567 if(end - buf < 1) 00568 return CONNACK_RC_BAD_PROTOV; /* No proto ver */ 00569 00570 if(false == proc_protocol_info(&utf8, *buf++)) 00571 return CONNACK_RC_BAD_PROTOV; 00572 00573 *conn_flags = *buf++; 00574 00575 if(end - buf < 2) 00576 return 0xFF; /* Bad packet composition */ 00577 00578 *ka_secs = (buf[0] << 8) | buf[1]; 00579 buf += 2; 00580 *ka_secs += *ka_secs >> 1; 00581 00582 mqp_raw->vh_len = buf - ref; 00583 mqp_raw->pl_len -= buf - ref; 00584 00585 return 0; 00586 } 00587 00588 #define RET_IF_RD_CONN_ERROR(buf, end, utf8) \ 00589 { \ 00590 int32_t len = mqp_buf_rd_utf8(buf, end, utf8); \ 00591 if(len < 0) \ 00592 return 0x00FF; \ 00593 \ 00594 buf += len; \ 00595 } 00596 00597 uint16_t proc_connect_pl_rx(const uint8_t *buf, const uint8_t *end, uint8_t conn_flags, 00598 struct utf8_string *free_utf8s, 00599 struct utf8_string **used_refs) 00600 { 00601 struct utf8_string *utf8; 00602 00603 utf8 = used_refs[0] = free_utf8s + 0; 00604 RET_IF_RD_CONN_ERROR(buf, end, utf8); 00605 00606 if(conn_flags & WILL_CONFIG_VAL) { 00607 utf8 = used_refs[1] = free_utf8s + 1; 00608 RET_IF_RD_CONN_ERROR(buf, end, utf8); 00609 00610 utf8 = used_refs[2] = free_utf8s + 2; 00611 RET_IF_RD_CONN_ERROR(buf, end, utf8); 00612 } 00613 00614 if((0 == (conn_flags & USER_NAME_OPVAL)) && 00615 (0 != (conn_flags & PASS_WORD_OPVAL))) 00616 return 0x00FF; /* Bad combination */ 00617 00618 if(conn_flags & USER_NAME_OPVAL) { 00619 utf8 = used_refs[3] = free_utf8s + 3; 00620 RET_IF_RD_CONN_ERROR(buf, end, utf8); 00621 } 00622 00623 if(conn_flags & PASS_WORD_OPVAL) { 00624 utf8 = used_refs[4] = free_utf8s + 4; 00625 RET_IF_RD_CONN_ERROR(buf, end, utf8); 00626 } 00627 00628 return 0; 00629 } 00630 00631 static 00632 bool proc_connect_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) 00633 { 00634 struct utf8_string *used_refs[5] = {NULL, NULL, NULL, NULL, NULL}; 00635 struct utf8_string free_utf8s[5]; 00636 uint8_t conn_flags, *buf, *end; 00637 bool clean_session = false; 00638 uint16_t ack_vh16 = 0; /* Variable Header of CONNACK (response) Message */ 00639 00640 set_rcvd_conn_msg(cl_ctx); 00641 00642 ack_vh16 = proc_connect_vh_rx(mqp_raw, &conn_flags, &cl_ctx->ka_secs); 00643 if(ack_vh16) 00644 goto proc_connect_rx_exit1; 00645 00646 buf = MQP_PAYLOAD_BUF(mqp_raw); 00647 end = buf + mqp_raw->pl_len; 00648 ack_vh16 = proc_connect_pl_rx(buf, end, conn_flags, 00649 free_utf8s, used_refs); 00650 if(ack_vh16) 00651 goto proc_connect_rx_exit1; 00652 00653 clean_session = (conn_flags & CLEAN_START_VAL)? true : false; 00654 ack_vh16 = (!used_refs[0]->length && !clean_session)? 00655 CONNACK_RC_CLI_REJECT : 0; /* Validate 0 byte Client ID */ 00656 if(ack_vh16) 00657 goto proc_connect_rx_exit1; 00658 00659 /* UTF8 Order: Client ID, Will Topic, Will Msg, User Name, Pass Word */ 00660 ack_vh16 = usr_cbs->connect_rx(cl_ctx, conn_flags, &used_refs[0], 00661 &cl_ctx->usr); 00662 proc_connect_rx_exit1: 00663 00664 DBG_INFO("S: CONNACK RC (16bits) is %u (%s)\n\r", ack_vh16, 00665 ack_vh16 & 0xFF? "error" : "good"); 00666 00667 if(0xFF != (ack_vh16 & 0xFF)) 00668 vh_msg_send(cl_ctx, MQTT_CONNACK, MQTT_QOS0, true, ack_vh16); 00669 00670 if(CONNACK_RC_REQ_ACCEPT == (ack_vh16 & 0xFF)) { 00671 rx_timeout_update(cl_ctx); 00672 usr_cbs->on_connack_send(cl_ctx->usr, clean_session); 00673 } else { 00674 return false; 00675 } 00676 00677 return true; 00678 } 00679 00680 00681 static void recv_hvec_load(int32_t *recv_hvec, uint32_t size, struct client_ctx *list) 00682 { 00683 int32_t i = 0; 00684 00685 for(i = 0; (i < size) && (NULL != list); i++, list = list->next) 00686 recv_hvec[i] = list->net; 00687 00688 recv_hvec[i] = -1; 00689 00690 return; 00691 } 00692 00693 static bool process_recv(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) 00694 { 00695 uint8_t msg_type = mqp_raw->msg_type; 00696 bool rv = false; 00697 00698 USR_INFO("S: Rcvd msg Fix-Hdr (Byte1) 0x%02x from net %d\n\r", 00699 mqp_raw->fh_byte1, cl_ctx->net); 00700 00701 if((MQTT_CONNECT != msg_type) ^ had_rcvd_conn_msg(cl_ctx)) 00702 goto process_recv_exit1; /* Proto Violation */ 00703 00704 rx_timeout_update(cl_ctx); 00705 00706 switch(msg_type) { 00707 00708 case MQTT_CONNECT: 00709 rv = proc_connect_rx(cl_ctx, mqp_raw); 00710 break; 00711 00712 case MQTT_DISCONNECT: 00713 rv = proc_disconn_rx(cl_ctx); 00714 break; 00715 00716 case MQTT_SUBSCRIBE: 00717 rv = proc_sub_msg_rx(cl_ctx, mqp_raw); 00718 break; 00719 00720 case MQTT_UNSUBSCRIBE: 00721 rv = proc_unsub_msg_rx(cl_ctx, mqp_raw); 00722 break; 00723 00724 case MQTT_PINGREQ: 00725 rv = proc_pingreq_rx(cl_ctx); 00726 break; 00727 00728 case MQTT_PUBLISH: 00729 rv = proc_pub_msg_rx(cl_ctx, mqp_raw); 00730 break; 00731 00732 case MQTT_PUBACK: 00733 case MQTT_PUBREC: 00734 case MQTT_PUBREL: 00735 case MQTT_PUBCOMP: 00736 rv = proc_ack_msg_rx(cl_ctx, mqp_raw); 00737 break; 00738 00739 default: 00740 break; 00741 } 00742 00743 process_recv_exit1: 00744 DBG_INFO("S: Processing of MSG ID 0x%02x: %s\n\r", 00745 mqp_raw->msg_id, rv? "Good" : "Fail"); 00746 00747 return rv; 00748 } 00749 00750 00751 /* Terminate net connections which haven't received PKTs beyond expected time. 00752 Caller must ensure atomic enviroment for execution of this routine. 00753 */ 00754 static void ka_sequence(uint32_t *secs2wait) 00755 { 00756 struct client_ctx *cl_ctx = used_ctxs; /* Sorted for timeout (ascend) */ 00757 uint32_t now_secs = net_ops->time(); 00758 00759 while(NULL != cl_ctx) { 00760 struct client_ctx *next = cl_ctx->next; 00761 if(NEED_NET_CLOSE(cl_ctx) || !(cl_ctx->timeout > now_secs)) { 00762 bool due2err = false; 00763 if(cl_ctx->flags & NW_CONN_ERROR_FLAG) 00764 due2err = true; 00765 00766 cl_ctx->flags &= ~(NW_CONN_ERROR_FLAG | 00767 NETWORK_CLOSE_FLAG); 00768 00769 /* Close network: Timeout or TX err */ 00770 do_net_close_rx(cl_ctx, due2err); 00771 } 00772 00773 cl_ctx = next; 00774 } 00775 00776 cl_ctx = used_ctxs; 00777 if(((NULL != cl_ctx) && (KA_TIMEOUT_NONE == cl_ctx->timeout)) || 00778 ((NULL == cl_ctx))) 00779 *secs2wait = IO_MON_NO_TIMEOUT; 00780 else 00781 *secs2wait = cl_ctx->timeout - now_secs; 00782 00783 return; 00784 } 00785 00786 /* Put a new functiona name such as mk_new_ctx() or setup_ctx() and 00787 processing to restrict limit number of connections. 00788 00789 Return value as well. 00790 */ 00791 static bool accept_ctx(int32_t net, uint32_t wait_secs) 00792 { 00793 struct client_ctx *cl_ctx = cl_ctx_alloc(); 00794 if(NULL == cl_ctx) 00795 return false; 00796 00797 cl_ctx->net = net_ops->accept(net, cl_ctx->remote_ip, 00798 &cl_ctx->ip_length); 00799 if(-1 == cl_ctx->net) { 00800 cl_ctx_free(cl_ctx); 00801 USR_INFO("S: failed to accept new NW connection\n\r"); 00802 return false; 00803 } 00804 00805 DBG_INFO("Accepted new connection (fd) %d\n\r", (int32_t)cl_ctx->net); 00806 00807 /* Timeout to receive MQTT_CONNECT */ 00808 cl_ctx->timeout = wait_secs + net_ops->time(); 00809 00810 used_ctxs_insert(cl_ctx); 00811 return true; 00812 } 00813 00814 static struct client_ctx *net_cl_ctx_find(int32_t net) 00815 { 00816 struct client_ctx *cl_ctx = used_ctxs; 00817 while(cl_ctx) { 00818 if(net == cl_ctx->net) 00819 break; 00820 00821 cl_ctx = cl_ctx->next; 00822 } 00823 00824 if(NULL == cl_ctx) 00825 USR_INFO("Did not find ctx for net %d\n\r", net); 00826 00827 return cl_ctx; 00828 } 00829 00830 static int32_t recv_hvec[MAX_NWCONN + 1 + 1 + 1]; /* LISTEN + LOOPBACK + VEC END */ 00831 static int32_t send_hvec = -1; 00832 static int32_t rsvd_hvec = -1; 00833 static int32_t listen_net = -1; 00834 00835 static struct mqtt_packet rx_mqp; 00836 static uint8_t rxb[MQP_SERVER_RX_LEN]; 00837 static uint16_t listener_port = 0; 00838 00839 static inline 00840 int32_t net_recv(int32_t net, struct mqtt_packet *mqp, uint32_t wait_secs, bool *timed_out) 00841 { 00842 int32_t rv = mqp_recv(net, net_ops, mqp, wait_secs, timed_out, NULL); 00843 if(rv <= 0) 00844 rv = MQP_ERR_NETWORK; 00845 00846 return rv; 00847 } 00848 00849 static int32_t proc_loopback_recv(int32_t net) 00850 { 00851 uint8_t buf[LOOP_DLEN]; 00852 00853 /* Thanks for waking-up thread and do nothing in this routine */ 00854 int32_t rv = net_ops->recv_from(net, buf, LOOP_DLEN, NULL, NULL, 0); 00855 pending_trigs = false; 00856 00857 if(rv <= 0) { 00858 net_ops->close(net); 00859 return MQP_ERR_LIBQUIT; 00860 } 00861 00862 return rv; 00863 } 00864 00865 static void proc_net_data_recv(int32_t net) 00866 { 00867 struct client_ctx *cl_ctx = net_cl_ctx_find(net); 00868 bool dummy; 00869 int32_t rv; 00870 00871 mqp_reset(&rx_mqp); /* Start w/ a clean buffer */ 00872 00873 rv = net_recv(net, &rx_mqp, 0, &dummy); 00874 if(rv > 0) 00875 /* Working Principle: Only RX processing errors should be 00876 reported as 'false'. Status of TX as a follow-up to RX 00877 messages need not be reported by the xyz_rx() routines. 00878 Error observed in TX is either dealt in next iteration 00879 of RX loop. 00880 */ 00881 if(false == process_recv(cl_ctx, &rx_mqp)) 00882 rv = MQP_ERR_CONTENT; 00883 00884 if(rv < 0) 00885 do_net_close_rx(cl_ctx, rv); 00886 } 00887 00888 static bool accept_and_recv_locked(int32_t *recv_hnds, int32_t n_hnds, uint32_t wait_secs) 00889 { 00890 bool rv = true; 00891 int32_t idx = 0; 00892 00893 MUTEX_LOCKIN(); 00894 00895 for(idx = 0; (idx < n_hnds) && (rv == true); idx++) { 00896 int32_t net = recv_hvec[idx]; 00897 if(net == listen_net) { 00898 rv = accept_ctx(listen_net, wait_secs); 00899 } else if(loopback_port && (net == loopb_net)) { 00900 if(proc_loopback_recv(loopb_net) < 0) 00901 rv = false; 00902 } else { 00903 proc_net_data_recv(net); 00904 } 00905 } 00906 00907 MUTEX_UNLOCK(); 00908 00909 return rv; 00910 } 00911 00912 int32_t mqtt_server_run(uint32_t wait_secs) // TBD break into two functions 00913 { 00914 uint32_t secs2wait = 0; 00915 int32_t n_hnds = 0; 00916 00917 USR_INFO("S: MQTT Server Run invoked ....\n\r"); 00918 00919 if(NULL == net_ops) 00920 return MQP_ERR_NET_OPS; 00921 00922 if(loopback_port) { 00923 loopb_net = net_ops->open(DEV_NETCONN_OPT_UDP, NULL, 00924 loopback_port, NULL); 00925 if(-1 == loopb_net) 00926 return MQP_ERR_LIBQUIT; 00927 } 00928 00929 listen_net = net_ops->listen(0, listener_port, NULL); 00930 if(-1 == listen_net) 00931 return MQP_ERR_LIBQUIT; 00932 00933 do { 00934 int32_t *r_hvec = recv_hvec + 0; 00935 00936 *r_hvec++ = listen_net; 00937 if(loopback_port) 00938 *r_hvec++ = loopb_net; 00939 00940 /* MQTT Timeouts: close expired conns; get time to next expiry */ 00941 ka_sequence(&secs2wait); 00942 00943 /* Prepare array of net handles. Must've atleast listen handle */ 00944 // recv_hvec_load(&recv_hvec[2], MAX_NWCONN + 1, used_ctxs); 00945 recv_hvec_load(r_hvec, MAX_NWCONN + 1, used_ctxs); 00946 00947 n_hnds = net_ops->io_mon(recv_hvec, &send_hvec, 00948 &rsvd_hvec, secs2wait); 00949 if(n_hnds < 0) 00950 return MQP_ERR_LIBQUIT; 00951 00952 if(false == accept_and_recv_locked(recv_hvec, n_hnds, wait_secs)) 00953 return MQP_ERR_LIBQUIT; 00954 00955 } while(1); 00956 } 00957 00958 int32_t mqtt_server_register_net_svc(const struct device_net_services *net) 00959 { 00960 if(net && net->io_mon && net->close && net->send && 00961 net->recv && net->time && net->listen) { 00962 net_ops = net; 00963 return 0; 00964 } 00965 00966 return -1; 00967 } 00968 00969 int32_t mqtt_server_lib_init(const struct mqtt_server_lib_cfg *lib_cfg, 00970 const struct mqtt_server_msg_cbs *msg_cbs) 00971 { 00972 cl_ctx_init(); 00973 00974 if((NULL == lib_cfg) || 00975 (0 == lib_cfg->listener_port) || 00976 (NULL == lib_cfg->debug_printf)) 00977 return -1; 00978 00979 debug_printf = lib_cfg->debug_printf; /* Facilitate debug */ 00980 00981 loopback_port = lib_cfg->loopback_port; 00982 listener_port = lib_cfg->listener_port; 00983 00984 mutex = lib_cfg->mutex; 00985 mutex_lockin = lib_cfg->mutex_lockin; 00986 mutex_unlock = lib_cfg->mutex_unlock; 00987 00988 aux_dbg_enbl = lib_cfg->aux_debug_en; 00989 debug_printf = lib_cfg->debug_printf; 00990 00991 usr_cbs = &usr_obj; 00992 00993 memcpy(usr_cbs, msg_cbs, sizeof(struct mqtt_server_msg_cbs)); 00994 00995 mqp_buffer_attach(&rx_mqp, rxb, MQP_SERVER_RX_LEN, 0); 00996 00997 return 0; 00998 } 00999 01000 }//namespace mbed_mqtt
Generated on Wed Jul 13 2022 09:55:39 by 1.7.2