Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
MQTTGWPacket.cpp
00001 /************************************************************************************** 00002 * Copyright (c) 2009, 2014 IBM Corp. Tomoaki YAMAGUCHI 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Ian Craggs - initial API and implementation and/or initial documentation 00015 * Tomoaki Yamaguchi - modify codes for MATT-SN Gateway 00016 **************************************************************************************/ 00017 00018 #include "MQTTGWPacket.h" 00019 #include <string> 00020 #include <string.h> 00021 00022 using namespace MQTTSNGW; 00023 00024 int readInt(char** ptr); 00025 void writeInt(unsigned char** pptr, int msgId); 00026 00027 #define MAX_NO_OF_REMAINING_LENGTH_BYTES 3 00028 /** 00029 * List of the predefined MQTT v3 packet names. 
00030 */ 00031 static const char* mqtt_packet_names[] = 00032 { "RESERVED", "CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC", "PUBREL", "PUBCOMP", "SUBSCRIBE", "SUBACK", 00033 "UNSUBSCRIBE", "UNSUBACK", "PINGREQ", "PINGRESP", "DISCONNECT" }; 00034 00035 /** 00036 * Encodes the message length according to the MQTT algorithm 00037 * @param buf the buffer into which the encoded data is written 00038 * @param length the length to be encoded 00039 * @return the number of bytes written to buffer 00040 */ 00041 int MQTTPacket_encode(char* buf, int length) 00042 { 00043 int rc = 0; 00044 do 00045 { 00046 char d = length % 128; 00047 length /= 128; 00048 /* if there are more digits to encode, set the top bit of this digit */ 00049 if (length > 0) 00050 d |= 0x80; 00051 buf[rc++] = d; 00052 } while (length > 0); 00053 return rc; 00054 } 00055 00056 /** 00057 * Calculates an integer from two bytes read from the input buffer 00058 * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned 00059 * @return the integer value calculated 00060 */ 00061 int readInt(char** pptr) 00062 { 00063 char* ptr = *pptr; 00064 int len = 256 * ((unsigned char) (*ptr)) + (unsigned char) (*(ptr + 1)); 00065 *pptr += 2; 00066 return len; 00067 } 00068 00069 /** 00070 * Reads a "UTF" string from the input buffer. UTF as in the MQTT v3 spec which really means 00071 * a length delimited string. So it reads the two byte length then the data according to 00072 * that length. The end of the buffer is provided too, so we can prevent buffer overruns caused 00073 * by an incorrect length. 00074 * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned 00075 * @param enddata pointer to the end of the buffer not to be read beyond 00076 * @param len returns the calculcated value of the length bytes read 00077 * @return an allocated C string holding the characters read, or NULL if the length read would 00078 * have caused an overrun. 
00079 * 00080 */ 00081 char* readUTFlen(char** pptr, char* enddata, int* len) 00082 { 00083 char* string = NULL; 00084 00085 if (enddata - (*pptr) > 1) /* enough length to read the integer? */ 00086 { 00087 *len = readInt(pptr); 00088 if (&(*pptr)[*len] <= enddata) 00089 { 00090 string = (char*)calloc(*len + 1, 1); 00091 memcpy(string, *pptr, (size_t)*len); 00092 string[*len] = '\0'; 00093 *pptr += *len; 00094 } 00095 } 00096 return string; 00097 } 00098 00099 /** 00100 * Reads a "UTF" string from the input buffer. UTF as in the MQTT v3 spec which really means 00101 * a length delimited string. So it reads the two byte length then the data according to 00102 * that length. The end of the buffer is provided too, so we can prevent buffer overruns caused 00103 * by an incorrect length. 00104 * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned 00105 * @param enddata pointer to the end of the buffer not to be read beyond 00106 * @return an allocated C string holding the characters read, or NULL if the length read would 00107 * have caused an overrun. 00108 */ 00109 char* readUTF(char** pptr, char* enddata) 00110 { 00111 int len; 00112 return readUTFlen(pptr, enddata, &len); 00113 } 00114 00115 /** 00116 * Reads one character from the input buffer. 00117 * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned 00118 * @return the character read 00119 */ 00120 unsigned char readChar(char** pptr) 00121 { 00122 unsigned char c = **pptr; 00123 (*pptr)++; 00124 return c; 00125 } 00126 00127 /** 00128 * Writes one character to an output buffer. 00129 * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned 00130 * @param c the character to write 00131 */ 00132 void writeChar(unsigned char** pptr, char c) 00133 { 00134 **pptr = c; 00135 (*pptr)++; 00136 } 00137 00138 /** 00139 * Writes an integer as 2 bytes to an output buffer. 
00140 * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned 00141 * @param anInt the integer to write 00142 */ 00143 void writeInt(unsigned char** pptr, int anInt) 00144 { 00145 **pptr = (unsigned char)(anInt / 256); 00146 (*pptr)++; 00147 **pptr = (unsigned char)(anInt % 256); 00148 (*pptr)++; 00149 } 00150 00151 /** 00152 * Writes a "UTF" string to an output buffer. Converts C string to length-delimited. 00153 * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned 00154 * @param string the C string to write 00155 */ 00156 void writeUTF(unsigned char** pptr, const char* string) 00157 { 00158 int len = (int)strlen(string); 00159 writeInt(pptr, len); 00160 memcpy(*pptr, string, (size_t)len); 00161 *pptr += len; 00162 } 00163 00164 /** 00165 * Lapper class of MQTTPacket 00166 * 00167 */ 00168 MQTTGWPacket::MQTTGWPacket() 00169 { 00170 _data = 0; 00171 _header.byte = 0; 00172 _remainingLength = 0; 00173 } 00174 00175 MQTTGWPacket::~MQTTGWPacket() 00176 { 00177 if (_data) 00178 { 00179 free(_data); 00180 } 00181 } 00182 00183 int MQTTGWPacket::recv(Network* network) 00184 { 00185 int len = 0; 00186 int multiplier = 1; 00187 unsigned char c; 00188 00189 /* read First Byte of Packet */ 00190 int rc = network->recv((unsigned char*)&_header.byte, 1); 00191 if ( rc <= 0) 00192 { 00193 return rc; 00194 } 00195 /* read RemainingLength */ 00196 do 00197 { 00198 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) 00199 { 00200 return -2; 00201 } 00202 if (network->recv(&c, 1) == -1) 00203 { 00204 return -1; 00205 } 00206 _remainingLength += (c & 127) * multiplier; 00207 multiplier *= 128; 00208 } while ((c & 128) != 0); 00209 00210 if ( _remainingLength > 0 ) 00211 { 00212 /* allocate buffer */ 00213 _data = (unsigned char*)calloc(_remainingLength, 1); 00214 if ( !_data ) 00215 { 00216 return -3; 00217 } 00218 00219 /* read Payload */ 00220 int remlen = network->recv(_data, _remainingLength); 00221 
00222 if (remlen == -1 ) 00223 { 00224 return -1; 00225 } 00226 else if ( remlen != _remainingLength ) 00227 { 00228 return -2; 00229 } 00230 } 00231 return 1 + len + _remainingLength; 00232 } 00233 00234 int MQTTGWPacket::send(Network* network) 00235 { 00236 unsigned char buf[MQTTSNGW_MAX_PACKET_SIZE]; 00237 memset(buf, 0, MQTTSNGW_MAX_PACKET_SIZE); 00238 int len = getPacketData(buf); 00239 return network->send(buf, len); 00240 00241 } 00242 00243 int MQTTGWPacket::getAck(Ack* ack) 00244 { 00245 if (PUBACK != _header.bits.type && PUBREC != _header.bits.type && PUBREL != _header.bits.type 00246 && PUBCOMP != _header.bits.type && UNSUBACK != _header.bits.type) 00247 { 00248 return 0; 00249 } 00250 char* ptr = (char*) _data; 00251 ack->header.byte = _header.byte; 00252 ack->msgId = readInt((char**) &ptr); 00253 return 1; 00254 } 00255 00256 int MQTTGWPacket::getCONNACK(Connack* resp) 00257 { 00258 if (_header.bits.type != CONNACK) 00259 { 00260 return 0; 00261 } 00262 char* ptr = (char*) _data; 00263 resp->header.byte = _header.byte; 00264 resp->flags.all = *ptr++; 00265 resp->rc = readChar(&ptr); 00266 return 1; 00267 } 00268 00269 int MQTTGWPacket::getSUBACK(unsigned short* msgId, unsigned char* rc) 00270 { 00271 if (_header.bits.type != SUBACK) 00272 { 00273 return 0; 00274 } 00275 char *ptr = (char*) _data; 00276 *msgId = readInt((char**) &ptr); 00277 *rc = readChar(&ptr); 00278 return 1; 00279 } 00280 00281 int MQTTGWPacket::getPUBLISH(Publish* pub) 00282 { 00283 if (_header.bits.type != PUBLISH) 00284 { 00285 return 0; 00286 } 00287 char* ptr = (char*) _data; 00288 pub->header = _header; 00289 pub->topiclen = readInt((char**) &ptr); 00290 pub->topic = (char*) _data + 2; 00291 ptr += pub->topiclen; 00292 if (_header.bits.qos > 0) 00293 { 00294 pub->msgId = readInt(&ptr); 00295 pub->payloadlen = _remainingLength - pub->topiclen - 4; 00296 } 00297 else 00298 { 00299 pub->msgId = 0; 00300 pub->payloadlen = _remainingLength - pub->topiclen - 2; 00301 } 00302 
pub->payload = ptr; 00303 return 1; 00304 } 00305 00306 int MQTTGWPacket::setCONNECT(Connect* connect, unsigned char* username, unsigned char* password) 00307 { 00308 clearData(); 00309 _header = connect->header; 00310 00311 _remainingLength = ((connect->version == 3) ? 12 : 10) + (int)strlen(connect->clientID) + 2; 00312 if (connect->flags.bits.will) 00313 { 00314 _remainingLength += (int)strlen(connect->willTopic) + 2 + (int)strlen(connect->willMsg) + 2; 00315 } 00316 if ( connect->flags.bits.username ) 00317 { 00318 _remainingLength += (int)strlen((char*) username) + 2; 00319 } 00320 if (connect->flags.bits.password) 00321 { 00322 _remainingLength += (int)strlen((char*) password) + 2; 00323 } 00324 00325 _data = (unsigned char*)calloc(_remainingLength, 1); 00326 unsigned char* ptr = _data; 00327 00328 if (connect->version == 3) 00329 { 00330 writeUTF(&ptr, "MQIsdp"); 00331 writeChar(&ptr, (char) 3); 00332 } 00333 else if (connect->version == 4) 00334 { 00335 writeUTF(&ptr, "MQTT"); 00336 writeChar(&ptr, (char) 4); 00337 } 00338 else 00339 { 00340 return 0; 00341 } 00342 00343 writeChar(&ptr, connect->flags.all); 00344 writeInt(&ptr, connect->keepAliveTimer); 00345 writeUTF(&ptr, connect->clientID); 00346 if (connect->flags.bits.will) 00347 { 00348 writeUTF(&ptr, connect->willTopic); 00349 writeUTF(&ptr, connect->willMsg); 00350 } 00351 00352 if (connect->flags.bits.username) 00353 { 00354 writeUTF(&ptr, (const char*) username); 00355 } 00356 if (connect->flags.bits.password) 00357 { 00358 writeUTF(&ptr, (const char*) password); 00359 } 00360 return 1; 00361 } 00362 00363 int MQTTGWPacket::setSUBSCRIBE(const char* topic, unsigned char qos, unsigned short msgId) 00364 { 00365 clearData(); 00366 _header.byte = 0; 00367 _header.bits.type = SUBSCRIBE; 00368 _header.bits.qos = 1; // Reserved 00369 _remainingLength = (int)strlen(topic) + 5; 00370 _data = (unsigned char*)calloc(_remainingLength, 1); 00371 if (_data) 00372 { 00373 unsigned char* ptr = _data; 00374 
writeInt(&ptr, msgId); 00375 writeUTF(&ptr, topic); 00376 writeChar(&ptr, (char) qos); 00377 return 1; 00378 } 00379 clearData(); 00380 return 0; 00381 } 00382 00383 int MQTTGWPacket::setUNSUBSCRIBE(const char* topic, unsigned short msgid) 00384 { 00385 clearData(); 00386 _header.byte = 0; 00387 _header.bits.type = UNSUBSCRIBE; 00388 _header.bits.qos = 1; 00389 _remainingLength = (int)strlen(topic) + 4; 00390 _data = (unsigned char*)calloc(_remainingLength, 1); 00391 if (_data) 00392 { 00393 unsigned char* ptr = _data; 00394 writeInt(&ptr, msgid); 00395 writeUTF(&ptr, topic); 00396 return 1; 00397 } 00398 clearData(); 00399 return 0; 00400 00401 } 00402 00403 int MQTTGWPacket::setPUBLISH(Publish* pub) 00404 { 00405 clearData(); 00406 _header.byte = pub->header.byte; 00407 _header.bits.type = PUBLISH; 00408 _remainingLength = 4 + pub->topiclen + pub->payloadlen; 00409 _data = (unsigned char*)calloc(_remainingLength, 1); 00410 if (_data) 00411 { 00412 unsigned char* ptr = _data; 00413 writeInt(&ptr, pub->topiclen); 00414 memcpy(ptr, pub->topic, pub->topiclen); 00415 ptr += pub->topiclen; 00416 if ( _header.bits.qos > 0 ) 00417 { 00418 writeInt(&ptr, pub->msgId); 00419 } 00420 else 00421 { 00422 _remainingLength -= 2; 00423 } 00424 memcpy(ptr, pub->payload, pub->payloadlen); 00425 return 1; 00426 } 00427 else 00428 { 00429 clearData(); 00430 return 0; 00431 } 00432 } 00433 00434 int MQTTGWPacket::setAck(unsigned char msgType, unsigned short msgid) 00435 { 00436 clearData(); 00437 _remainingLength = 2; 00438 _header.bits.type = msgType; 00439 _header.bits.qos = (msgType == PUBREL) ? 
1 : 0; 00440 00441 _data = (unsigned char*)calloc(_remainingLength, 1); 00442 if (_data) 00443 { 00444 unsigned char* data = _data; 00445 writeInt(&data, msgid); 00446 return 1; 00447 } 00448 return 0; 00449 } 00450 00451 int MQTTGWPacket::setHeader(unsigned char msgType) 00452 { 00453 clearData(); 00454 if (msgType < CONNECT || msgType > DISCONNECT) 00455 { 00456 return 0; 00457 } 00458 _header.bits.type = msgType; 00459 return 0; 00460 } 00461 00462 int MQTTGWPacket::getType(void) 00463 { 00464 return _header.bits.type; 00465 } 00466 00467 const char* MQTTGWPacket::getName(void) 00468 { 00469 return getType() > DISCONNECT ? "UNKNOWN" : mqtt_packet_names[getType()]; 00470 } 00471 00472 int MQTTGWPacket::getPacketData(unsigned char* buf) 00473 { 00474 unsigned char* ptr = buf; 00475 *ptr++ = _header.byte; 00476 int len = MQTTPacket_encode((char*)ptr, _remainingLength); 00477 ptr += len; 00478 memcpy(ptr, _data, _remainingLength); 00479 return 1 + len + _remainingLength; 00480 } 00481 00482 int MQTTGWPacket::getPacketLength(void) 00483 { 00484 char buf[4]; 00485 return 1 + MQTTPacket_encode(buf, _remainingLength) + _remainingLength; 00486 } 00487 00488 void MQTTGWPacket::clearData(void) 00489 { 00490 if (_data) 00491 { 00492 free(_data); 00493 } 00494 _header.byte = 0; 00495 _remainingLength = 0; 00496 } 00497 00498 char* MQTTGWPacket::getMsgId(char* pbuf) 00499 { 00500 int type = getType(); 00501 00502 switch ( type ) 00503 { 00504 case PUBLISH: 00505 Publish pub; 00506 pub.msgId = 0; 00507 getPUBLISH(&pub); 00508 if ( _header.bits.dup ) 00509 { 00510 sprintf(pbuf, "+%04X", pub.msgId); 00511 } 00512 else 00513 { 00514 sprintf(pbuf, " %04X", pub.msgId); 00515 } 00516 break; 00517 case SUBSCRIBE: 00518 case UNSUBSCRIBE: 00519 case PUBACK: 00520 case PUBREC: 00521 case PUBREL: 00522 case PUBCOMP: 00523 case SUBACK: 00524 case UNSUBACK: 00525 sprintf(pbuf, " %02X%02X", _data[0], _data[1]); 00526 break; 00527 default: 00528 sprintf(pbuf, " "); 00529 break; 00530 } 00531 
if ( strcmp(pbuf, " 0000") == 0 ) 00532 { 00533 sprintf(pbuf, " "); 00534 } 00535 return pbuf; 00536 } 00537 00538 int MQTTGWPacket::getMsgId(void) 00539 { 00540 int type = getType(); 00541 int msgId = 0; 00542 00543 switch ( type ) 00544 { 00545 case PUBLISH: 00546 Publish pub; 00547 pub.msgId = 0; 00548 getPUBLISH(&pub); 00549 msgId = pub.msgId; 00550 break; 00551 case PUBACK: 00552 case PUBREC: 00553 case PUBREL: 00554 case PUBCOMP: 00555 case SUBSCRIBE: 00556 case UNSUBSCRIBE: 00557 case SUBACK: 00558 case UNSUBACK: 00559 msgId = 256 * (unsigned char)_data[0] + (unsigned char)_data[1]; 00560 break; 00561 default: 00562 break; 00563 } 00564 return msgId; 00565 } 00566 00567 void MQTTGWPacket::setMsgId(int msgId) 00568 { 00569 int type = getType(); 00570 unsigned char* ptr = 0; 00571 00572 switch ( type ) 00573 { 00574 case PUBLISH: 00575 Publish pub; 00576 pub.topiclen = 0; 00577 pub.msgId = 0; 00578 getPUBLISH(&pub); 00579 pub.msgId = msgId; 00580 ptr = _data + pub.topiclen; 00581 writeInt(&ptr, pub.msgId); 00582 *ptr++ = (unsigned char)(msgId / 256); 00583 *ptr = (unsigned char)(msgId % 256); 00584 break; 00585 case SUBSCRIBE: 00586 case UNSUBSCRIBE: 00587 case PUBACK: 00588 case PUBREC: 00589 case PUBREL: 00590 case PUBCOMP: 00591 case SUBACK: 00592 case UNSUBACK: 00593 ptr = _data; 00594 *ptr++ = (unsigned char)(msgId / 256); 00595 *ptr = (unsigned char)(msgId % 256); 00596 break; 00597 default: 00598 break; 00599 } 00600 } 00601 00602 char* MQTTGWPacket::print(char* pbuf) 00603 { 00604 uint8_t packetData[MQTTSNGW_MAX_PACKET_SIZE]; 00605 char* ptr = pbuf; 00606 char** pptr = &pbuf; 00607 int len = getPacketData(packetData); 00608 int size = len > SIZE_OF_LOG_PACKET ? 
SIZE_OF_LOG_PACKET : len; 00609 for (int i = 0; i < size; i++) 00610 { 00611 sprintf(*pptr, " %02X", packetData[i]); 00612 *pptr += 3; 00613 } 00614 **pptr = 0; 00615 return ptr; 00616 } 00617 00618 MQTTGWPacket& MQTTGWPacket::operator =(MQTTGWPacket& packet) 00619 { 00620 clearData(); 00621 this->_header.byte = packet._header.byte; 00622 this->_remainingLength = packet._remainingLength; 00623 _data = (unsigned char*)calloc(_remainingLength, 1); 00624 if (_data) 00625 { 00626 memcpy(this->_data, packet._data, _remainingLength); 00627 } 00628 else 00629 { 00630 clearData(); 00631 } 00632 return *this; 00633 } 00634 00635 UTF8String MQTTGWPacket::getTopic(void) 00636 { 00637 UTF8String str = {0, nullptr}; 00638 if ( _header.bits.type == SUBSCRIBE || _header.bits.type == UNSUBSCRIBE ) 00639 { 00640 char* ptr = (char*)(_data + 2); 00641 str.len = readInt(&ptr); 00642 str.data = (char*)(_data + 4); 00643 } 00644 return str; 00645 }
Generated on Wed Jul 13 2022 10:46:02 by Doxygen 1.7.2