Jack Hansdampf / mbed-mqtt-GSOE1

Dependents:   ESP8266MQTT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTGWPublishHandler.cpp Source File

MQTTGWPublishHandler.cpp

00001 /**************************************************************************************
00002  * Copyright (c) 2016, Tomoaki Yamaguchi
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
00015  *    Tieto Poland Sp. z o.o. - Gateway improvements
00016  **************************************************************************************/
00017 
00018 #include "MQTTGWPublishHandler.h"
00019 #include "MQTTSNGateway.h"
00020 #include "MQTTSNPacket.h"
00021 #include <string>
00022 
00023 using namespace std;
00024 using namespace MQTTSNGW;
00025 
00026 char* currentDateTime(void);
00027 
00028 MQTTGWPublishHandler::MQTTGWPublishHandler(Gateway* gateway)
00029 {
00030     _gateway = gateway;
00031 }
00032 
00033 MQTTGWPublishHandler::~MQTTGWPublishHandler()
00034 {
00035 
00036 }
00037 
00038 void MQTTGWPublishHandler::handlePublish(Client* client, MQTTGWPacket* packet)
00039 {
00040     if ( !client->isActive() && !client->isSleep() && !client->isAwake())
00041     {
00042         WRITELOG("%s     The client is neither active nor sleep %s%s\n", ERRMSG_HEADER, client->getStatus(), ERRMSG_FOOTER);
00043         return;
00044     }
00045 
00046     /* client is sleeping. save PUBLISH */
00047     if ( client->isSleep() )
00048     {
00049         Publish pub;
00050         packet->getPUBLISH(&pub);
00051 
00052         WRITELOG(FORMAT_Y_G_G, currentDateTime(), packet->getName(),
00053         RIGHTARROW, client->getClientId(), "is sleeping. a message was saved.");
00054 
00055         if (pub.header.bits.qos == 1)
00056         {
00057             replyACK(client, &pub, PUBACK);
00058         }
00059         else if ( pub.header.bits.qos == 2)
00060         {
00061             replyACK(client, &pub, PUBREC);
00062         }
00063 
00064         MQTTGWPacket* msg = new MQTTGWPacket();
00065         *msg = *packet;
00066         if ( msg->getType() == 0 )
00067         {
00068             WRITELOG("%s MQTTGWPublishHandler::handlePublish can't allocate memories for Packet.%s\n", ERRMSG_HEADER,ERRMSG_FOOTER);
00069             delete msg;
00070             return;
00071         }
00072         client->setClientSleepPacket(msg);
00073         return;
00074     }
00075 
00076     Publish pub;
00077     packet->getPUBLISH(&pub);
00078 
00079     MQTTSNPacket* snPacket = new MQTTSNPacket();
00080 
00081     /* create MQTTSN_topicid */
00082     MQTTSN_topicid topicId;
00083     uint16_t id = 0;
00084 
00085     if (pub.topiclen <= 2)
00086     {
00087         topicId.type = MQTTSN_TOPIC_TYPE_SHORT;
00088         *(topicId.data.short_name) = *pub.topic;
00089         *(topicId.data.short_name + 1) = *(pub.topic + 1);
00090     }
00091     else
00092     {
00093         topicId.data.long_.len = pub.topiclen;
00094         topicId.data.long_.name = pub.topic;
00095         Topic* tp = client->getTopics()->getTopicByName(&topicId);
00096 
00097         if ( tp )
00098         {
00099             topicId.type = tp->getType();
00100             topicId.data.long_.len = pub.topiclen;
00101             topicId.data.long_.name = pub.topic;
00102             topicId.data.id = tp->getTopicId();
00103         }
00104         else
00105         {
00106             /* This message might be subscribed with wild card. */
00107             topicId.type = MQTTSN_TOPIC_TYPE_NORMAL;
00108             Topic* topic = client->getTopics()->match(&topicId);
00109             if (topic == nullptr)
00110             {
00111                 WRITELOG(" Invalid Topic. PUBLISH message is canceled.\n");
00112                 if (pub.header.bits.qos == 1)
00113                 {
00114                     replyACK(client, &pub, PUBACK);
00115                 }
00116                 else if ( pub.header.bits.qos == 2 )
00117                 {
00118                     replyACK(client, &pub, PUBREC);
00119                 }
00120 
00121                 delete snPacket;
00122                 return;
00123             }
00124 
00125             /* add the Topic and get a TopicId */
00126             topic = client->getTopics()->add(&topicId);
00127             id = topic->getTopicId();
00128 
00129             if (id > 0)
00130             {
00131                 /* create REGISTER */
00132                 MQTTSNPacket* regPacket = new MQTTSNPacket();
00133 
00134                 MQTTSNString topicName = MQTTSNString_initializer;
00135                 topicName.lenstring.len = topicId.data.long_.len;
00136                 topicName.lenstring.data = topicId.data.long_.name;
00137 
00138                 uint16_t regackMsgId = client->getNextSnMsgId();
00139                 regPacket->setREGISTER(id, regackMsgId, &topicName);
00140 
00141                 /* send REGISTER */
00142                 Event* evrg = new Event();
00143                 evrg->setClientSendEvent(client, regPacket);
00144                 _gateway->getClientSendQue()->post(evrg);
00145 
00146                 /* send PUBLISH */
00147                 topicId.data.id = id;
00148                 snPacket->setPUBLISH((uint8_t) pub.header.bits.dup, (int) pub.header.bits.qos,
00149                         (uint8_t) pub.header.bits.retain, (uint16_t) pub.msgId, topicId, (uint8_t*) pub.payload,
00150                         pub.payloadlen);
00151                 client->getWaitREGACKPacketList()->setPacket(snPacket, regackMsgId);
00152                 return;
00153             }
00154             else
00155             {
00156                 WRITELOG("%sMQTTGWPublishHandler Can't create a Topic.%s\n", ERRMSG_HEADER,ERRMSG_FOOTER);
00157                 delete snPacket;
00158                 return;
00159             }
00160         }
00161     }
00162 
00163     snPacket->setPUBLISH((uint8_t) pub.header.bits.dup, (int) pub.header.bits.qos, (uint8_t) pub.header.bits.retain,
00164             (uint16_t) pub.msgId, topicId, (uint8_t*) pub.payload, pub.payloadlen);
00165     Event* ev1 = new Event();
00166     ev1->setClientSendEvent(client, snPacket);
00167     _gateway->getClientSendQue()->post(ev1);
00168 
00169 }
00170 
00171 void MQTTGWPublishHandler::replyACK(Client* client, Publish* pub, int type)
00172 {
00173     MQTTGWPacket* pubAck = new MQTTGWPacket();
00174     pubAck->setAck(type, (uint16_t)pub->msgId);
00175     Event* ev1 = new Event();
00176     ev1->setBrokerSendEvent(client, pubAck);
00177     _gateway->getBrokerSendQue()->post(ev1);
00178 }
00179 
00180 void MQTTGWPublishHandler::handlePuback(Client* client, MQTTGWPacket* packet)
00181 {
00182     Ack ack;
00183     packet->getAck(&ack);
00184     TopicIdMapElement* topicId = client->getWaitedPubTopicId((uint16_t)ack.msgId);
00185     if (topicId)
00186     {
00187         MQTTSNPacket* mqttsnPacket = new MQTTSNPacket();
00188         mqttsnPacket->setPUBACK(topicId->getTopicId(), (uint16_t)ack.msgId, 0);
00189 
00190         client->eraseWaitedPubTopicId((uint16_t)ack.msgId);
00191         Event* ev1 = new Event();
00192         ev1->setClientSendEvent(client, mqttsnPacket);
00193         _gateway->getClientSendQue()->post(ev1);
00194         return;
00195     }
00196     WRITELOG(" PUBACK from the Broker is invalid. PacketID : %04X  ClientID : %s \n", (uint16_t)ack.msgId, client->getClientId());
00197 }
00198 
00199 void MQTTGWPublishHandler::handleAck(Client* client, MQTTGWPacket* packet, int type)
00200 {
00201     Ack ack;
00202     packet->getAck(&ack);
00203 
00204     if ( client->isActive() || client->isAwake() )
00205     {
00206         MQTTSNPacket* mqttsnPacket = new MQTTSNPacket();
00207         if (type == PUBREC)
00208         {
00209             mqttsnPacket->setPUBREC((uint16_t) ack.msgId);
00210         }
00211         else if (type == PUBREL)
00212         {
00213             mqttsnPacket->setPUBREL((uint16_t) ack.msgId);
00214         }
00215         else if (type == PUBCOMP)
00216         {
00217             mqttsnPacket->setPUBCOMP((uint16_t) ack.msgId);
00218         }
00219 
00220         Event* ev1 = new Event();
00221         ev1->setClientSendEvent(client, mqttsnPacket);
00222         _gateway->getClientSendQue()->post(ev1);
00223     }
00224     else if ( client->isSleep() )
00225     {
00226         if (type == PUBREL)
00227         {
00228             MQTTGWPacket* pubComp = new MQTTGWPacket();
00229             pubComp->setAck(PUBCOMP, (uint16_t)ack.msgId);
00230             Event* ev1 = new Event();
00231             ev1->setBrokerSendEvent(client, pubComp);
00232             _gateway->getBrokerSendQue()->post(ev1);
00233         }
00234     }
00235 }
00236 
00237 
00238 
00239 void MQTTGWPublishHandler::handleAggregatePuback(Client* client, MQTTGWPacket* packet)
00240 {
00241     uint16_t msgId = packet->getMsgId();
00242     uint16_t clientMsgId = 0;
00243     Client* newClient = _gateway->getAdapterManager()->convertClient(msgId, &clientMsgId);
00244     if ( newClient != nullptr )
00245     {
00246         packet->setMsgId((int)clientMsgId);
00247         handlePuback(newClient, packet);
00248     }
00249 }
00250 
00251 void MQTTGWPublishHandler::handleAggregateAck(Client* client, MQTTGWPacket* packet, int type)
00252 {
00253     uint16_t msgId = packet->getMsgId();
00254     uint16_t clientMsgId = 0;
00255     Client* newClient = _gateway->getAdapterManager()->convertClient(msgId, &clientMsgId);
00256     if ( newClient != nullptr )
00257     {
00258         packet->setMsgId((int)clientMsgId);
00259         handleAck(newClient, packet,type);
00260     }
00261 }
00262 
00263 void MQTTGWPublishHandler::handleAggregatePubrel(Client* client, MQTTGWPacket* packet)
00264 {
00265     Publish pub;
00266     packet->getPUBLISH(&pub);
00267     replyACK(client, &pub, PUBCOMP);
00268 }
00269 
00270 void MQTTGWPublishHandler::handleAggregatePublish(Client* client, MQTTGWPacket* packet)
00271 {
00272     Publish pub;
00273     packet->getPUBLISH(&pub);
00274 
00275     WRITELOG(FORMAT_Y_G_G, currentDateTime(), packet->getName(),
00276     RIGHTARROW, client->getClientId(), "is sleeping. a message was saved.");
00277 
00278     if (pub.header.bits.qos == 1)
00279     {
00280         replyACK(client, &pub, PUBACK);
00281     }
00282     else if ( pub.header.bits.qos == 2)
00283     {
00284         replyACK(client, &pub, PUBREC);
00285     }
00286 
00287 
00288 
00289     string* topicName = new string(pub.topic, pub.topiclen);
00290     Topic topic = Topic(topicName, MQTTSN_TOPIC_TYPE_NORMAL);
00291     AggregateTopicElement* list = _gateway->getAdapterManager()->createClientList(&topic);
00292     if ( list != nullptr )
00293     {
00294         ClientTopicElement* p = list->getFirstElement();
00295 
00296         while ( p )
00297         {
00298             Client* devClient = p->getClient();
00299             if ( devClient != nullptr )
00300             {
00301                 MQTTGWPacket* msg = new MQTTGWPacket();
00302                 *msg = *packet;
00303                 if ( msg->getType() == 0 )
00304                 {
00305                     WRITELOG("%s MQTTGWPublishHandler::handleAggregatePublish can't allocate memories for Packet.%s\n", ERRMSG_HEADER,ERRMSG_FOOTER);
00306                     delete msg;
00307                     break;
00308                 }
00309                 Event* ev = new Event();
00310                 ev->setBrokerRecvEvent(devClient, msg);
00311                 _gateway->getPacketEventQue()->post(ev);
00312             }
00313             else
00314             {
00315                 break;
00316             }
00317 
00318             p = list->getNextElement(p);
00319         }
00320         delete list;
00321     }
00322 }
00323