Jack Hansdampf / mbed-mqtt-GSOE1

Dependents:   ESP8266MQTT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTGWPacket.cpp Source File

MQTTGWPacket.cpp

00001 /**************************************************************************************
00002  * Copyright (c) 2009, 2014 IBM Corp. Tomoaki YAMAGUCHI
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Ian Craggs - initial API and implementation and/or initial documentation
00015  *    Tomoaki Yamaguchi - modify codes for MQTT-SN Gateway
00016  **************************************************************************************/
00017 
00018 #include "MQTTGWPacket.h"
00019 #include <string>
00020 #include <string.h>
00021 
00022 using namespace MQTTSNGW;
00023 
/* Forward declarations of the two-byte integer helpers defined further down. */
int readInt(char** pptr);
void writeInt(unsigned char** pptr, int anInt);

/* Largest number of remaining-length bytes MQTTGWPacket::recv() accepts. */
#define MAX_NO_OF_REMAINING_LENGTH_BYTES    3

/**
 * Names of the MQTT v3 control packets, indexed by the packet type value.
 */
static const char* mqtt_packet_names[] =
{
    "RESERVED",
    "CONNECT",
    "CONNACK",
    "PUBLISH",
    "PUBACK",
    "PUBREC",
    "PUBREL",
    "PUBCOMP",
    "SUBSCRIBE",
    "SUBACK",
    "UNSUBSCRIBE",
    "UNSUBACK",
    "PINGREQ",
    "PINGRESP",
    "DISCONNECT"
};
00034 
/**
 * Encodes a length using the MQTT variable-length scheme: seven bits of the
 * value per output byte, least significant group first, with the top bit of a
 * byte set whenever another byte follows.
 * @param buf the buffer that receives the encoded bytes
 * @param length the length to be encoded
 * @return the number of bytes written to buf
 */
int MQTTPacket_encode(char* buf, int length)
{
    int written = 0;
    for (;;)
    {
        char digit = (char)(length % 128);
        length = length / 128;
        const bool more = (length > 0);
        if (more)
        {
            /* continuation flag: more digits follow this one */
            digit = (char)(digit | 0x80);
        }
        buf[written] = digit;
        ++written;
        if (!more)
        {
            break;
        }
    }
    return written;
}
00055 
/**
 * Reads a big-endian two-byte integer from the input buffer.
 * @param pptr pointer to the read position - advanced by the two bytes consumed
 * @return the value read (high byte first)
 */
int readInt(char** pptr)
{
    const unsigned char* bytes = (const unsigned char*)(*pptr);
    const int high = bytes[0];
    const int low = bytes[1];
    *pptr += 2;
    return 256 * high + low;
}
00068 
00069 /**
00070  * Reads a "UTF" string from the input buffer.  UTF as in the MQTT v3 spec which really means
00071  * a length delimited string.  So it reads the two byte length then the data according to
00072  * that length.  The end of the buffer is provided too, so we can prevent buffer overruns caused
00073  * by an incorrect length.
00074  * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
00075  * @param enddata pointer to the end of the buffer not to be read beyond
00076  * @param len returns the calculcated value of the length bytes read
00077  * @return an allocated C string holding the characters read, or NULL if the length read would
00078  * have caused an overrun.
00079  *
00080  */
00081 char* readUTFlen(char** pptr, char* enddata, int* len)
00082 {
00083     char* string = NULL;
00084 
00085     if (enddata - (*pptr) > 1) /* enough length to read the integer? */
00086     {
00087         *len = readInt(pptr);
00088         if (&(*pptr)[*len] <= enddata)
00089         {
00090             string = (char*)calloc(*len + 1, 1);
00091             memcpy(string, *pptr, (size_t)*len);
00092             string[*len] = '\0';
00093             *pptr += *len;
00094         }
00095     }
00096     return string;
00097 }
00098 
00099 /**
00100  * Reads a "UTF" string from the input buffer.  UTF as in the MQTT v3 spec which really means
00101  * a length delimited string.  So it reads the two byte length then the data according to
00102  * that length.  The end of the buffer is provided too, so we can prevent buffer overruns caused
00103  * by an incorrect length.
00104  * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
00105  * @param enddata pointer to the end of the buffer not to be read beyond
00106  * @return an allocated C string holding the characters read, or NULL if the length read would
00107  * have caused an overrun.
00108  */
00109 char* readUTF(char** pptr, char* enddata)
00110 {
00111     int len;
00112     return readUTFlen(pptr, enddata, &len);
00113 }
00114 
/**
 * Reads a single byte from the input buffer.
 * @param pptr pointer to the read position - advanced by the one byte consumed
 * @return the byte read
 */
unsigned char readChar(char** pptr)
{
    char* cursor = *pptr;
    *pptr = cursor + 1;
    return (unsigned char)(*cursor);
}
00126 
/**
 * Writes a single byte to an output buffer.
 * @param pptr pointer to the write position - advanced by the one byte written
 * @param c the byte to write
 */
void writeChar(unsigned char** pptr, char c)
{
    unsigned char* cursor = *pptr;
    *cursor = (unsigned char)c;
    *pptr = cursor + 1;
}
00137 
/**
 * Writes an integer to an output buffer as two bytes, high byte first.
 * @param pptr pointer to the write position - advanced by the two bytes written
 * @param anInt the integer to write
 */
void writeInt(unsigned char** pptr, int anInt)
{
    unsigned char* out = *pptr;
    out[0] = (unsigned char)(anInt / 256);
    out[1] = (unsigned char)(anInt % 256);
    *pptr = out + 2;
}
00150 
00151 /**
00152  * Writes a "UTF" string to an output buffer.  Converts C string to length-delimited.
00153  * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
00154  * @param string the C string to write
00155  */
00156 void writeUTF(unsigned char** pptr, const char* string)
00157 {
00158     int len = (int)strlen(string);
00159     writeInt(pptr, len);
00160     memcpy(*pptr, string, (size_t)len);
00161     *pptr += len;
00162 }
00163 
00164 /**
00165  * Lapper class of MQTTPacket
00166  *
00167  */
00168 MQTTGWPacket::MQTTGWPacket()
00169 {
00170     _data = 0;
00171     _header.byte = 0;
00172     _remainingLength = 0;
00173 }
00174 
00175 MQTTGWPacket::~MQTTGWPacket()
00176 {
00177     if (_data)
00178     {
00179         free(_data);
00180     }
00181 }
00182 
00183 int MQTTGWPacket::recv(Network* network)
00184 {
00185     int len = 0;
00186     int multiplier = 1;
00187     unsigned char c;
00188 
00189     /* read First Byte of Packet */
00190     int rc = network->recv((unsigned char*)&_header.byte, 1);
00191     if ( rc <= 0)
00192     {
00193         return rc;
00194     }
00195     /* read RemainingLength */
00196     do
00197     {
00198         if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
00199         {
00200             return -2;
00201         }
00202         if (network->recv(&c, 1) == -1)
00203         {
00204             return -1;
00205         }
00206         _remainingLength += (c & 127) * multiplier;
00207         multiplier *= 128;
00208     } while ((c & 128) != 0);
00209 
00210     if ( _remainingLength > 0 )
00211     {
00212         /* allocate buffer */
00213         _data = (unsigned char*)calloc(_remainingLength, 1);
00214         if ( !_data )
00215         {
00216             return -3;
00217         }
00218 
00219         /* read Payload */
00220         int remlen = network->recv(_data, _remainingLength);
00221 
00222         if (remlen == -1 )
00223         {
00224             return -1;
00225         }
00226         else if ( remlen != _remainingLength )
00227         {
00228             return -2;
00229         }
00230     }
00231     return 1 + len + _remainingLength;
00232 }
00233 
00234 int MQTTGWPacket::send(Network* network)
00235 {
00236     unsigned char buf[MQTTSNGW_MAX_PACKET_SIZE];
00237     memset(buf, 0, MQTTSNGW_MAX_PACKET_SIZE);
00238     int len = getPacketData(buf);
00239     return network->send(buf, len);
00240 
00241 }
00242 
00243 int MQTTGWPacket::getAck(Ack* ack)
00244 {
00245     if (PUBACK != _header.bits.type && PUBREC != _header.bits.type && PUBREL != _header.bits.type
00246             && PUBCOMP != _header.bits.type && UNSUBACK != _header.bits.type)
00247     {
00248         return 0;
00249     }
00250     char* ptr = (char*) _data;
00251     ack->header.byte = _header.byte;
00252     ack->msgId = readInt((char**) &ptr);
00253     return 1;
00254 }
00255 
00256 int MQTTGWPacket::getCONNACK(Connack* resp)
00257 {
00258     if (_header.bits.type != CONNACK)
00259     {
00260         return 0;
00261     }
00262     char* ptr = (char*) _data;
00263     resp->header.byte = _header.byte;
00264     resp->flags.all = *ptr++;
00265     resp->rc = readChar(&ptr);
00266     return 1;
00267 }
00268 
00269 int MQTTGWPacket::getSUBACK(unsigned short* msgId, unsigned char* rc)
00270 {
00271     if (_header.bits.type != SUBACK)
00272     {
00273         return 0;
00274     }
00275     char *ptr = (char*) _data;
00276     *msgId = readInt((char**) &ptr);
00277     *rc = readChar(&ptr);
00278     return 1;
00279 }
00280 
00281 int MQTTGWPacket::getPUBLISH(Publish* pub)
00282 {
00283     if (_header.bits.type != PUBLISH)
00284     {
00285         return 0;
00286     }
00287     char* ptr = (char*) _data;
00288     pub->header = _header;
00289     pub->topiclen = readInt((char**) &ptr);
00290     pub->topic = (char*) _data + 2;
00291     ptr += pub->topiclen;
00292     if (_header.bits.qos > 0)
00293     {
00294         pub->msgId = readInt(&ptr);
00295         pub->payloadlen = _remainingLength - pub->topiclen - 4;
00296     }
00297     else
00298     {
00299         pub->msgId = 0;
00300         pub->payloadlen = _remainingLength - pub->topiclen - 2;
00301     }
00302     pub->payload = ptr;
00303     return 1;
00304 }
00305 
00306 int MQTTGWPacket::setCONNECT(Connect* connect, unsigned char* username, unsigned char* password)
00307 {
00308     clearData();
00309     _header = connect->header;
00310 
00311     _remainingLength = ((connect->version == 3) ? 12 : 10) + (int)strlen(connect->clientID) + 2;
00312     if (connect->flags.bits.will)
00313     {
00314         _remainingLength += (int)strlen(connect->willTopic) + 2 + (int)strlen(connect->willMsg) + 2;
00315     }
00316     if ( connect->flags.bits.username )
00317     {
00318         _remainingLength += (int)strlen((char*) username) + 2;
00319     }
00320     if (connect->flags.bits.password)
00321     {
00322         _remainingLength += (int)strlen((char*) password) + 2;
00323     }
00324 
00325     _data = (unsigned char*)calloc(_remainingLength, 1);
00326     unsigned char* ptr = _data;
00327 
00328     if (connect->version == 3)
00329     {
00330         writeUTF(&ptr, "MQIsdp");
00331         writeChar(&ptr, (char) 3);
00332     }
00333     else if (connect->version == 4)
00334     {
00335         writeUTF(&ptr, "MQTT");
00336         writeChar(&ptr, (char) 4);
00337     }
00338     else
00339     {
00340         return 0;
00341     }
00342 
00343     writeChar(&ptr, connect->flags.all);
00344     writeInt(&ptr, connect->keepAliveTimer);
00345     writeUTF(&ptr, connect->clientID);
00346     if (connect->flags.bits.will)
00347     {
00348         writeUTF(&ptr, connect->willTopic);
00349         writeUTF(&ptr, connect->willMsg);
00350     }
00351 
00352     if (connect->flags.bits.username)
00353     {
00354         writeUTF(&ptr, (const char*) username);
00355     }
00356     if (connect->flags.bits.password)
00357     {
00358         writeUTF(&ptr, (const char*) password);
00359     }
00360     return 1;
00361 }
00362 
00363 int MQTTGWPacket::setSUBSCRIBE(const char* topic, unsigned char qos, unsigned short msgId)
00364 {
00365     clearData();
00366     _header.byte = 0;
00367     _header.bits.type = SUBSCRIBE;
00368     _header.bits.qos = 1;          // Reserved
00369     _remainingLength = (int)strlen(topic) + 5;
00370     _data = (unsigned char*)calloc(_remainingLength, 1);
00371     if (_data)
00372     {
00373         unsigned char* ptr = _data;
00374         writeInt(&ptr, msgId);
00375         writeUTF(&ptr, topic);
00376         writeChar(&ptr, (char) qos);
00377         return 1;
00378     }
00379     clearData();
00380     return 0;
00381 }
00382 
00383 int MQTTGWPacket::setUNSUBSCRIBE(const char* topic, unsigned short msgid)
00384 {
00385     clearData();
00386     _header.byte = 0;
00387     _header.bits.type = UNSUBSCRIBE;
00388     _header.bits.qos = 1;
00389     _remainingLength = (int)strlen(topic) + 4;
00390     _data = (unsigned char*)calloc(_remainingLength, 1);
00391     if (_data)
00392     {
00393         unsigned char* ptr = _data;
00394         writeInt(&ptr, msgid);
00395         writeUTF(&ptr, topic);
00396         return 1;
00397     }
00398     clearData();
00399     return 0;
00400 
00401 }
00402 
00403 int MQTTGWPacket::setPUBLISH(Publish* pub)
00404 {
00405     clearData();
00406     _header.byte = pub->header.byte;
00407     _header.bits.type = PUBLISH;
00408     _remainingLength = 4 + pub->topiclen + pub->payloadlen;
00409     _data = (unsigned char*)calloc(_remainingLength, 1);
00410     if (_data)
00411     {
00412         unsigned char* ptr = _data;
00413         writeInt(&ptr, pub->topiclen);
00414         memcpy(ptr, pub->topic, pub->topiclen);
00415         ptr += pub->topiclen;
00416         if ( _header.bits.qos > 0 )
00417         {
00418             writeInt(&ptr, pub->msgId);
00419         }
00420         else
00421         {
00422             _remainingLength -= 2;
00423         }
00424         memcpy(ptr, pub->payload, pub->payloadlen);
00425         return 1;
00426     }
00427     else
00428     {
00429         clearData();
00430         return 0;
00431     }
00432 }
00433 
00434 int MQTTGWPacket::setAck(unsigned char msgType, unsigned short msgid)
00435 {
00436     clearData();
00437     _remainingLength = 2;
00438     _header.bits.type = msgType;
00439     _header.bits.qos = (msgType == PUBREL) ? 1 : 0;
00440 
00441     _data = (unsigned char*)calloc(_remainingLength, 1);
00442     if (_data)
00443     {
00444         unsigned char* data = _data;
00445         writeInt(&data, msgid);
00446         return 1;
00447     }
00448     return 0;
00449 }
00450 
00451 int MQTTGWPacket::setHeader(unsigned char msgType)
00452 {
00453     clearData();
00454     if (msgType < CONNECT || msgType > DISCONNECT)
00455     {
00456         return 0;
00457     }
00458     _header.bits.type = msgType;
00459     return 0;
00460 }
00461 
00462 int MQTTGWPacket::getType(void)
00463 {
00464     return _header.bits.type;
00465 }
00466 
00467 const char* MQTTGWPacket::getName(void)
00468 {
00469     return getType() > DISCONNECT ? "UNKNOWN" : mqtt_packet_names[getType()];
00470 }
00471 
00472 int MQTTGWPacket::getPacketData(unsigned char* buf)
00473 {
00474     unsigned char* ptr = buf;
00475     *ptr++ = _header.byte;
00476     int len = MQTTPacket_encode((char*)ptr, _remainingLength);
00477     ptr += len;
00478     memcpy(ptr, _data, _remainingLength);
00479     return 1 + len + _remainingLength;
00480 }
00481 
00482 int MQTTGWPacket::getPacketLength(void)
00483 {
00484     char buf[4];
00485     return 1 + MQTTPacket_encode(buf, _remainingLength) + _remainingLength;
00486 }
00487 
00488 void MQTTGWPacket::clearData(void)
00489 {
00490     if (_data)
00491     {
00492         free(_data);
00493     }
00494     _header.byte = 0;
00495     _remainingLength = 0;
00496 }
00497 
00498 char* MQTTGWPacket::getMsgId(char* pbuf)
00499 {
00500     int type = getType();
00501 
00502     switch ( type )
00503     {
00504     case PUBLISH:
00505         Publish pub;
00506         pub.msgId = 0;
00507         getPUBLISH(&pub);
00508         if ( _header.bits.dup )
00509         {
00510             sprintf(pbuf, "+%04X", pub.msgId);
00511         }
00512         else
00513         {
00514             sprintf(pbuf, " %04X", pub.msgId);
00515         }
00516         break;
00517     case SUBSCRIBE:
00518     case UNSUBSCRIBE:
00519     case PUBACK:
00520     case PUBREC:
00521     case PUBREL:
00522     case PUBCOMP:
00523     case SUBACK:
00524     case UNSUBACK:
00525         sprintf(pbuf, " %02X%02X", _data[0], _data[1]);
00526         break;
00527     default:
00528         sprintf(pbuf, "    ");
00529         break;
00530     }
00531     if ( strcmp(pbuf, " 0000") == 0 )
00532     {
00533         sprintf(pbuf, "    ");
00534     }
00535     return pbuf;
00536 }
00537 
00538 int MQTTGWPacket::getMsgId(void)
00539 {
00540     int type = getType();
00541     int msgId = 0;
00542 
00543     switch ( type )
00544     {
00545     case PUBLISH:
00546         Publish pub;
00547         pub.msgId = 0;
00548         getPUBLISH(&pub);
00549         msgId = pub.msgId;
00550         break;
00551     case PUBACK:
00552     case PUBREC:
00553     case PUBREL:
00554     case PUBCOMP:
00555     case SUBSCRIBE:
00556     case UNSUBSCRIBE:
00557     case SUBACK:
00558     case UNSUBACK:
00559         msgId = 256 * (unsigned char)_data[0] + (unsigned char)_data[1];
00560         break;
00561     default:
00562         break;
00563     }
00564     return msgId;
00565 }
00566 
00567 void MQTTGWPacket::setMsgId(int msgId)
00568 {
00569     int type = getType();
00570     unsigned char* ptr = 0;
00571 
00572     switch ( type )
00573     {
00574     case PUBLISH:
00575         Publish pub;
00576         pub.topiclen = 0;
00577         pub.msgId = 0;
00578         getPUBLISH(&pub);
00579         pub.msgId = msgId;
00580         ptr = _data + pub.topiclen;
00581         writeInt(&ptr, pub.msgId);
00582         *ptr++ = (unsigned char)(msgId / 256);
00583         *ptr = (unsigned char)(msgId % 256);
00584         break;
00585     case SUBSCRIBE:
00586     case UNSUBSCRIBE:
00587     case PUBACK:
00588     case PUBREC:
00589     case PUBREL:
00590     case PUBCOMP:
00591     case SUBACK:
00592     case UNSUBACK:
00593         ptr = _data;
00594         *ptr++ = (unsigned char)(msgId / 256);
00595         *ptr = (unsigned char)(msgId % 256);
00596         break;
00597     default:
00598         break;
00599     }
00600 }
00601 
00602 char* MQTTGWPacket::print(char* pbuf)
00603 {
00604     uint8_t packetData[MQTTSNGW_MAX_PACKET_SIZE];
00605     char* ptr = pbuf;
00606     char** pptr = &pbuf;
00607     int len = getPacketData(packetData);
00608     int size = len > SIZE_OF_LOG_PACKET ? SIZE_OF_LOG_PACKET : len;
00609     for (int i = 0; i < size; i++)
00610     {
00611         sprintf(*pptr, " %02X", packetData[i]);
00612         *pptr += 3;
00613     }
00614     **pptr = 0;
00615     return ptr;
00616 }
00617 
00618 MQTTGWPacket& MQTTGWPacket::operator =(MQTTGWPacket& packet)
00619 {
00620     clearData();
00621     this->_header.byte = packet._header.byte;
00622     this->_remainingLength = packet._remainingLength;
00623     _data = (unsigned char*)calloc(_remainingLength, 1);
00624     if (_data)
00625     {
00626         memcpy(this->_data, packet._data, _remainingLength);
00627     }
00628     else
00629     {
00630         clearData();
00631     }
00632     return *this;
00633 }
00634 
00635 UTF8String MQTTGWPacket::getTopic(void)
00636 {
00637     UTF8String str = {0, nullptr};
00638     if ( _header.bits.type == SUBSCRIBE || _header.bits.type == UNSUBSCRIBE )
00639     {
00640         char* ptr = (char*)(_data + 2);
00641         str.len = readInt(&ptr);
00642         str.data = (char*)(_data + 4);
00643     }
00644     return str;
00645 }