Jack Hansdampf / mbed-mqtt-GSOE1

Dependents:   ESP8266MQTT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTSNGWSubscribeHandler.cpp Source File

MQTTSNGWSubscribeHandler.cpp

00001 /**************************************************************************************
00002  * Copyright (c) 2016, Tomoaki Yamaguchi
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
00015  *    Tieto Poland Sp. z o.o. - Gateway improvements
00016  **************************************************************************************/
00017 
00018 #include "MQTTSNGWSubscribeHandler.h"
00019 #include "MQTTSNGWPacket.h"
00020 #include "MQTTGWPacket.h"
00021 #include "MQTTSNGateway.h"
00022 #include "MQTTSNGWClient.h"
00023 
00024 using namespace std;
00025 using namespace MQTTSNGW;
00026 
00027 MQTTSNSubscribeHandler::MQTTSNSubscribeHandler(Gateway* gateway)
00028 {
00029     _gateway = gateway;
00030 }
00031 
00032 MQTTSNSubscribeHandler::~MQTTSNSubscribeHandler()
00033 {
00034 
00035 }
00036 
00037 MQTTGWPacket* MQTTSNSubscribeHandler::handleSubscribe(Client* client, MQTTSNPacket* packet)
00038 {
00039     uint8_t dup;
00040     int qos;
00041     uint16_t msgId;
00042     MQTTSN_topicid topicFilter;
00043     Topic* topic = nullptr;
00044     uint16_t topicId = 0;
00045     MQTTGWPacket* subscribe;
00046     Event* ev1;
00047     Event* evsuback;
00048 
00049     if ( packet->getSUBSCRIBE(&dup, &qos, &msgId, &topicFilter) == 0 )
00050     {
00051         return nullptr;
00052     }
00053 
00054     if ( msgId == 0 )
00055     {
00056         return nullptr;
00057     }
00058 
00059     if ( topicFilter.type == MQTTSN_TOPIC_TYPE_PREDEFINED )
00060     {
00061         topic = client->getTopics()->getTopicById(&topicFilter);
00062 
00063 
00064         if ( topic )
00065         {
00066             topicId = topic->getTopicId();
00067             subscribe = new MQTTGWPacket();
00068             subscribe->setSUBSCRIBE((char*)topic->getTopicName()->c_str(), (uint8_t)qos, (uint16_t)msgId);
00069         }
00070         else
00071         {
00072             topic = _gateway->getTopics()->getTopicById(&topicFilter);
00073             if ( !topic )
00074             {
00075                 topic = client->getTopics()->add(topic->getTopicName()->c_str(), topic->getTopicId());
00076             }
00077             else
00078             {
00079                 goto RespExit;
00080             }
00081         }
00082     }
00083     else if (topicFilter.type == MQTTSN_TOPIC_TYPE_NORMAL)
00084     {
00085         topic = client->getTopics()->getTopicByName(&topicFilter);
00086         if ( topic  == nullptr )
00087         {
00088             topic = client->getTopics()->add(&topicFilter);
00089             if ( topic == nullptr )
00090             {
00091                 WRITELOG("%s Client(%s) can't add the Topic.%s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER);
00092                 return nullptr;
00093             }
00094         }
00095         topicId = topic->getTopicId();
00096         subscribe = new MQTTGWPacket();
00097 
00098         subscribe->setSUBSCRIBE((char*)topic->getTopicName()->c_str(), (uint8_t)qos, (uint16_t)msgId);
00099     }
00100     else  //MQTTSN_TOPIC_TYPE_SHORT
00101     {
00102         char topicstr[3];
00103         topicstr[0] = topicFilter.data.short_name[0];
00104         topicstr[1] = topicFilter.data.short_name[1];
00105         topicstr[2] = 0;
00106         topicId = 0;
00107         subscribe = new MQTTGWPacket();
00108         subscribe->setSUBSCRIBE(topicstr, (uint8_t)qos, (uint16_t)msgId);
00109     }
00110 
00111     client->setWaitedSubTopicId(msgId, topicId, topicFilter.type);
00112 
00113     if ( !client->isAggregated() )
00114     {
00115         ev1 = new Event();
00116         ev1->setBrokerSendEvent(client, subscribe);
00117         _gateway->getBrokerSendQue()->post(ev1);
00118         return nullptr;
00119     }
00120     else
00121     {
00122         return subscribe;
00123     }
00124 
00125 
00126 RespExit:
00127      MQTTSNPacket* sSuback = new MQTTSNPacket();
00128      sSuback->setSUBACK(qos, topicFilter.data.id, msgId, MQTTSN_RC_NOT_SUPPORTED);
00129      evsuback = new Event();
00130      evsuback->setClientSendEvent(client, sSuback);
00131      _gateway->getClientSendQue()->post(evsuback);
00132      return nullptr;
00133 }
00134 
00135 MQTTGWPacket* MQTTSNSubscribeHandler::handleUnsubscribe(Client* client, MQTTSNPacket* packet)
00136 {
00137     uint16_t msgId;
00138     MQTTSN_topicid topicFilter;
00139     MQTTGWPacket* unsubscribe = nullptr;
00140 
00141     if ( packet->getUNSUBSCRIBE(&msgId, &topicFilter) == 0 )
00142     {
00143         return nullptr;
00144     }
00145 
00146     if ( msgId == 0 )
00147     {
00148         return nullptr;
00149     }
00150 
00151     Topic* topic = client->getTopics()->getTopicById(&topicFilter);
00152 
00153     if (topicFilter.type == MQTTSN_TOPIC_TYPE_SHORT)
00154     {
00155         char shortTopic[3];
00156         shortTopic[0] = topicFilter.data.short_name[0];
00157         shortTopic[1] = topicFilter.data.short_name[1];
00158         shortTopic[2] = 0;
00159         unsubscribe = new MQTTGWPacket();
00160         unsubscribe->setUNSUBSCRIBE(shortTopic, msgId);
00161     }
00162     else
00163     {
00164         if ( topic == nullptr )
00165         {
00166             MQTTSNPacket* sUnsuback = new MQTTSNPacket();
00167             sUnsuback->setUNSUBACK(msgId);
00168             Event* evsuback = new Event();
00169             evsuback->setClientSendEvent(client, sUnsuback);
00170             _gateway->getClientSendQue()->post(evsuback);
00171             return nullptr;
00172         }
00173         else
00174         {
00175             unsubscribe = new MQTTGWPacket();
00176             unsubscribe->setUNSUBSCRIBE(topic->getTopicName()->c_str(), msgId);
00177         }
00178     }
00179 
00180     if ( !client->isAggregated() )
00181     {
00182         Event* ev1 = new Event();
00183         ev1->setBrokerSendEvent(client, unsubscribe);
00184         _gateway->getBrokerSendQue()->post(ev1);
00185         return nullptr;
00186     }
00187     else
00188     {
00189         return unsubscribe;
00190     }
00191 }
00192 
00193 void MQTTSNSubscribeHandler::handleAggregateSubscribe(Client* client, MQTTSNPacket* packet)
00194 {
00195     MQTTGWPacket* subscribe = handleSubscribe(client, packet);
00196 
00197     if ( subscribe != nullptr )
00198     {
00199         UTF8String str = subscribe->getTopic();
00200         string* topicName = new string(str.data, str.len);
00201         Topic topic = Topic(topicName, MQTTSN_TOPIC_TYPE_NORMAL);
00202          _gateway->getAdapterManager()->addAggregateTopic(&topic, client);
00203 
00204         int msgId = 0;
00205         if ( packet->isDuplicate() )
00206         {
00207             msgId = _gateway->getAdapterManager()->getAggregater()->getMsgId(client, packet->getMsgId());
00208         }
00209         else
00210         {
00211             msgId = _gateway->getAdapterManager()->getAggregater()->addMessageIdTable(client, packet->getMsgId());
00212         }
00213 
00214         if ( msgId == 0 )
00215         {
00216             WRITELOG("%s MQTTSNSubscribeHandler can't create MessageIdTableElement  %s%s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER);
00217             return;
00218         }
00219 WRITELOG("msgId=%d\n",msgId);
00220         subscribe->setMsgId(msgId);
00221         Event* ev = new Event();
00222         ev->setBrokerSendEvent(client, subscribe);
00223         _gateway->getBrokerSendQue()->post(ev);
00224     }
00225 }
00226 
00227 void MQTTSNSubscribeHandler::handleAggregateUnsubscribe(Client* client, MQTTSNPacket* packet)
00228 {
00229     MQTTGWPacket* unsubscribe = handleUnsubscribe(client, packet);
00230     if ( unsubscribe != nullptr )
00231     {
00232         UTF8String str = unsubscribe->getTopic();
00233         string* topicName = new string(str.data, str.len);
00234         Topic topic = Topic(topicName, MQTTSN_TOPIC_TYPE_NORMAL);
00235         _gateway->getAdapterManager()->removeAggregateTopic(&topic, client);
00236 
00237         int msgId = 0;
00238         if ( packet->isDuplicate() )
00239         {
00240             msgId = _gateway->getAdapterManager()->getAggregater()->getMsgId(client, packet->getMsgId());
00241         }
00242         else
00243         {
00244             msgId = _gateway->getAdapterManager()->getAggregater()->addMessageIdTable(client, packet->getMsgId());
00245         }
00246 
00247         if ( msgId == 0 )
00248         {
00249             WRITELOG("%s MQTTSNUnsubscribeHandler can't create MessageIdTableElement  %s%s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER);
00250             return;
00251         }
00252         unsubscribe->setMsgId(msgId);
00253         Event* ev = new Event();
00254         ev->setBrokerSendEvent(client, unsubscribe);
00255         _gateway->getBrokerSendQue()->post(ev);
00256     }
00257 }