Jack Hansdampf / mbed-mqtt-GSOE1

Dependents:   ESP8266MQTT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTSNGWPublishHandler.cpp Source File

MQTTSNGWPublishHandler.cpp

00001 /**************************************************************************************
00002  * Copyright (c) 2016, Tomoaki Yamaguchi
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
00015  *    Tieto Poland Sp. z o.o. - Gateway improvements
00016  **************************************************************************************/
00017 
00018 #include "MQTTSNGWPublishHandler.h"
00019 #include "MQTTSNGWPacket.h"
00020 #include "MQTTGWPacket.h"
00021 #include "MQTTSNGateway.h"
00022 #include "MQTTSNGWClient.h"
00023 #include "MQTTSNGWQoSm1Proxy.h"
00024 #include <string.h>
00025 using namespace std;
00026 using namespace MQTTSNGW;
00027 
00028 MQTTSNPublishHandler::MQTTSNPublishHandler(Gateway* gateway)
00029 {
00030     _gateway = gateway;
00031 }
00032 
00033 MQTTSNPublishHandler::~MQTTSNPublishHandler()
00034 {
00035 
00036 }
00037 
00038 MQTTGWPacket* MQTTSNPublishHandler::handlePublish(Client* client, MQTTSNPacket* packet)
00039 {
00040     uint8_t dup;
00041     int qos;
00042     uint8_t retained;
00043     uint16_t msgId;
00044     uint8_t* payload;
00045     MQTTSN_topicid topicid;
00046     int payloadlen;
00047     Publish pub = MQTTPacket_Publish_Initializer;
00048 
00049     char  shortTopic[2];
00050 
00051     if ( !_gateway->getAdapterManager()->getQoSm1Proxy()->isActive() )
00052     {
00053         if ( client->isQoSm1() )
00054         {
00055             _gateway->getAdapterManager()->getQoSm1Proxy()->savePacket(client, packet);
00056 
00057             return nullptr;
00058         }
00059     }
00060 
00061     if ( packet->getPUBLISH(&dup, &qos, &retained, &msgId, &topicid, &payload, &payloadlen) ==0 )
00062     {
00063         return nullptr;
00064     }
00065     pub.msgId = msgId;
00066     pub.header.bits.dup = dup;
00067     pub.header.bits.qos = ( qos == 3 ? 0 : qos );
00068     pub.header.bits.retain = retained;
00069 
00070     Topic* topic = nullptr;
00071 
00072     if( topicid.type ==  MQTTSN_TOPIC_TYPE_SHORT )
00073     {
00074         shortTopic[0] = topicid.data.short_name[0];
00075         shortTopic[1] = topicid.data.short_name[1];
00076         pub.topic = shortTopic;
00077         pub.topiclen = 2;
00078     }
00079     else
00080     {
00081         topic = client->getTopics()->getTopicById(&topicid);
00082         if ( !topic )
00083         {
00084             topic = _gateway->getTopics()->getTopicById(&topicid);
00085             if ( topic )
00086             {
00087                 topic = client->getTopics()->add(topic->getTopicName()->c_str(), topic->getTopicId());
00088             }
00089         }
00090 
00091         if( !topic && qos == 3 )
00092         {
00093             WRITELOG("%s Invalid TopicId.%s %s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER);
00094             return nullptr;
00095         }
00096 
00097         if( !topic && msgId && qos > 0 && qos < 3 )
00098         {
00099             /* Reply PubAck with INVALID_TOPIC_ID to the client */
00100             MQTTSNPacket* pubAck = new MQTTSNPacket();
00101             pubAck->setPUBACK( topicid.data.id, msgId, MQTTSN_RC_REJECTED_INVALID_TOPIC_ID);
00102             Event* ev1 = new Event();
00103             ev1->setClientSendEvent(client, pubAck);
00104             _gateway->getClientSendQue()->post(ev1);
00105             return nullptr;
00106         }
00107         if ( topic )
00108         {
00109             pub.topic = (char*)topic->getTopicName()->data();
00110             pub.topiclen = topic->getTopicName()->length();
00111         }
00112     }
00113     /* Save a msgId & a TopicId pare for PUBACK */
00114     if( msgId && qos > 0 && qos < 3)
00115     {
00116         client->setWaitedPubTopicId(msgId, topicid.data.id, topicid.type);
00117     }
00118 
00119     pub.payload = (char*)payload;
00120     pub.payloadlen = payloadlen;
00121 
00122     MQTTGWPacket* publish = new MQTTGWPacket();
00123     publish->setPUBLISH(&pub);
00124 
00125     if ( _gateway->getAdapterManager()->isAggregaterActive() && client->isAggregated() )
00126     {
00127         return publish;
00128     }
00129     else
00130     {
00131         Event* ev1 = new Event();
00132         ev1->setBrokerSendEvent(client, publish);
00133         _gateway->getBrokerSendQue()->post(ev1);
00134         return nullptr;
00135     }
00136 }
00137 
00138 void MQTTSNPublishHandler::handlePuback(Client* client, MQTTSNPacket* packet)
00139 {
00140     uint16_t topicId;
00141     uint16_t msgId;
00142     uint8_t rc;
00143 
00144     if ( client->isActive() )
00145     {
00146         if ( packet->getPUBACK(&topicId, &msgId, &rc) == 0 )
00147         {
00148             return;
00149         }
00150 
00151         if ( rc == MQTTSN_RC_ACCEPTED)
00152         {
00153             if ( !_gateway->getAdapterManager()->getAggregater()->isActive() )
00154             {
00155                 MQTTGWPacket* pubAck = new MQTTGWPacket();
00156                 pubAck->setAck(PUBACK, msgId);
00157                 Event* ev1 = new Event();
00158                 ev1->setBrokerSendEvent(client, pubAck);
00159                 _gateway->getBrokerSendQue()->post(ev1);
00160             }
00161         }
00162         else if ( rc == MQTTSN_RC_REJECTED_INVALID_TOPIC_ID)
00163         {
00164             WRITELOG("  PUBACK   %d : Invalid Topic ID\n", msgId);
00165         }
00166     }
00167 }
00168 
00169 void MQTTSNPublishHandler::handleAck(Client* client, MQTTSNPacket* packet, uint8_t packetType)
00170 {
00171     uint16_t msgId;
00172 
00173     if ( client->isActive() )
00174     {
00175         if ( packet->getACK(&msgId) == 0 )
00176         {
00177             return;
00178         }
00179         MQTTGWPacket* ackPacket = new MQTTGWPacket();
00180         ackPacket->setAck(packetType, msgId);
00181         Event* ev1 = new Event();
00182         ev1->setBrokerSendEvent(client, ackPacket);
00183         _gateway->getBrokerSendQue()->post(ev1);
00184     }
00185 }
00186 
00187 void MQTTSNPublishHandler::handleRegister(Client* client, MQTTSNPacket* packet)
00188 {
00189     uint16_t id;
00190     uint16_t msgId;
00191     MQTTSNString topicName  = MQTTSNString_initializer;;
00192     MQTTSN_topicid topicid;
00193 
00194     if ( client->isActive() || client->isAwake())
00195     {
00196         if ( packet->getREGISTER(&id, &msgId, &topicName) == 0 )
00197         {
00198             return;
00199         }
00200 
00201         topicid.type = MQTTSN_TOPIC_TYPE_NORMAL;
00202         topicid.data.long_.len = topicName.lenstring.len;
00203         topicid.data.long_.name = topicName.lenstring.data;
00204 
00205         id = client->getTopics()->add(&topicid)->getTopicId();
00206 
00207         MQTTSNPacket* regAck = new MQTTSNPacket();
00208         regAck->setREGACK(id, msgId, MQTTSN_RC_ACCEPTED);
00209         Event* ev = new Event();
00210         ev->setClientSendEvent(client, regAck);
00211         _gateway->getClientSendQue()->post(ev);
00212     }
00213 }
00214 
00215 void MQTTSNPublishHandler::handleRegAck( Client* client, MQTTSNPacket* packet)
00216 {
00217     uint16_t id;
00218     uint16_t msgId;
00219     uint8_t rc;
00220     if ( client->isActive() || client->isAwake())
00221     {
00222         if ( packet->getREGACK(&id, &msgId, &rc) == 0 )
00223         {
00224             return;
00225         }
00226 
00227         MQTTSNPacket* regAck = client->getWaitREGACKPacketList()->getPacket(msgId);
00228 
00229         if ( regAck != nullptr )
00230         {
00231             client->getWaitREGACKPacketList()->erase(msgId);
00232             Event* ev = new Event();
00233             ev->setClientSendEvent(client, regAck);
00234             _gateway->getClientSendQue()->post(ev);
00235         }
00236         if (client->isHoldPringReqest() && client->getWaitREGACKPacketList()->getCount() == 0 )
00237         {
00238             /* send PINGREQ to the broker */
00239            client->resetPingRequest();
00240            MQTTGWPacket* pingreq = new MQTTGWPacket();
00241            pingreq->setHeader(PINGREQ);
00242            Event* evt = new Event();
00243            evt->setBrokerSendEvent(client, pingreq);
00244            _gateway->getBrokerSendQue()->post(evt);
00245         }
00246     }
00247 
00248 }
00249 
00250 
00251 
00252 
00253 void MQTTSNPublishHandler::handleAggregatePublish(Client* client, MQTTSNPacket* packet)
00254 {
00255     int msgId = 0;
00256     MQTTGWPacket* publish = handlePublish(client, packet);
00257     if ( publish != nullptr )
00258     {
00259         if ( publish->getMsgId() > 0 )
00260         {
00261             if ( packet->isDuplicate() )
00262             {
00263                 msgId = _gateway->getAdapterManager()->getAggregater()->getMsgId(client, packet->getMsgId());
00264             }
00265             else
00266             {
00267                 msgId = _gateway->getAdapterManager()->getAggregater()->addMessageIdTable(client, packet->getMsgId());
00268             }
00269             publish->setMsgId(msgId);
00270         }
00271         Event* ev1 = new Event();
00272         ev1->setBrokerSendEvent(client, publish);
00273         _gateway->getBrokerSendQue()->post(ev1);
00274     }
00275 }
00276 
00277 void MQTTSNPublishHandler::handleAggregateAck(Client* client, MQTTSNPacket* packet, int type)
00278 {
00279     if ( type == MQTTSN_PUBREC )
00280     {
00281         uint16_t msgId;
00282 
00283         if ( packet->getACK(&msgId) == 0 )
00284         {
00285             return;
00286         }
00287         MQTTSNPacket* ackPacket = new MQTTSNPacket();
00288         ackPacket->setPUBREL(msgId);
00289         Event* ev = new Event();
00290         ev->setClientSendEvent(client, ackPacket);
00291         _gateway->getClientSendQue()->post(ev);
00292     }
00293 }