Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
MQTTSNGWSubscribeHandler.cpp
00001 /************************************************************************************** 00002 * Copyright (c) 2016, Tomoaki Yamaguchi 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Tomoaki Yamaguchi - initial API and implementation and/or initial documentation 00015 * Tieto Poland Sp. z o.o. - Gateway improvements 00016 **************************************************************************************/ 00017 00018 #include "MQTTSNGWSubscribeHandler.h" 00019 #include "MQTTSNGWPacket.h" 00020 #include "MQTTGWPacket.h" 00021 #include "MQTTSNGateway.h" 00022 #include "MQTTSNGWClient.h" 00023 00024 using namespace std; 00025 using namespace MQTTSNGW; 00026 00027 MQTTSNSubscribeHandler::MQTTSNSubscribeHandler(Gateway* gateway) 00028 { 00029 _gateway = gateway; 00030 } 00031 00032 MQTTSNSubscribeHandler::~MQTTSNSubscribeHandler() 00033 { 00034 00035 } 00036 00037 MQTTGWPacket* MQTTSNSubscribeHandler::handleSubscribe(Client* client, MQTTSNPacket* packet) 00038 { 00039 uint8_t dup; 00040 int qos; 00041 uint16_t msgId; 00042 MQTTSN_topicid topicFilter; 00043 Topic* topic = nullptr; 00044 uint16_t topicId = 0; 00045 MQTTGWPacket* subscribe; 00046 Event* ev1; 00047 Event* evsuback; 00048 00049 if ( packet->getSUBSCRIBE(&dup, &qos, &msgId, &topicFilter) == 0 ) 00050 { 00051 return nullptr; 00052 } 00053 00054 if ( msgId == 0 ) 00055 { 00056 return nullptr; 00057 } 00058 00059 if ( topicFilter.type == MQTTSN_TOPIC_TYPE_PREDEFINED ) 00060 { 00061 topic = client->getTopics()->getTopicById(&topicFilter); 00062 00063 00064 
if ( topic ) 00065 { 00066 topicId = topic->getTopicId(); 00067 subscribe = new MQTTGWPacket(); 00068 subscribe->setSUBSCRIBE((char*)topic->getTopicName()->c_str(), (uint8_t)qos, (uint16_t)msgId); 00069 } 00070 else 00071 { 00072 topic = _gateway->getTopics()->getTopicById(&topicFilter); 00073 if ( !topic ) 00074 { 00075 topic = client->getTopics()->add(topic->getTopicName()->c_str(), topic->getTopicId()); 00076 } 00077 else 00078 { 00079 goto RespExit; 00080 } 00081 } 00082 } 00083 else if (topicFilter.type == MQTTSN_TOPIC_TYPE_NORMAL) 00084 { 00085 topic = client->getTopics()->getTopicByName(&topicFilter); 00086 if ( topic == nullptr ) 00087 { 00088 topic = client->getTopics()->add(&topicFilter); 00089 if ( topic == nullptr ) 00090 { 00091 WRITELOG("%s Client(%s) can't add the Topic.%s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER); 00092 return nullptr; 00093 } 00094 } 00095 topicId = topic->getTopicId(); 00096 subscribe = new MQTTGWPacket(); 00097 00098 subscribe->setSUBSCRIBE((char*)topic->getTopicName()->c_str(), (uint8_t)qos, (uint16_t)msgId); 00099 } 00100 else //MQTTSN_TOPIC_TYPE_SHORT 00101 { 00102 char topicstr[3]; 00103 topicstr[0] = topicFilter.data.short_name[0]; 00104 topicstr[1] = topicFilter.data.short_name[1]; 00105 topicstr[2] = 0; 00106 topicId = 0; 00107 subscribe = new MQTTGWPacket(); 00108 subscribe->setSUBSCRIBE(topicstr, (uint8_t)qos, (uint16_t)msgId); 00109 } 00110 00111 client->setWaitedSubTopicId(msgId, topicId, topicFilter.type); 00112 00113 if ( !client->isAggregated() ) 00114 { 00115 ev1 = new Event(); 00116 ev1->setBrokerSendEvent(client, subscribe); 00117 _gateway->getBrokerSendQue()->post(ev1); 00118 return nullptr; 00119 } 00120 else 00121 { 00122 return subscribe; 00123 } 00124 00125 00126 RespExit: 00127 MQTTSNPacket* sSuback = new MQTTSNPacket(); 00128 sSuback->setSUBACK(qos, topicFilter.data.id, msgId, MQTTSN_RC_NOT_SUPPORTED); 00129 evsuback = new Event(); 00130 evsuback->setClientSendEvent(client, sSuback); 00131 
_gateway->getClientSendQue()->post(evsuback); 00132 return nullptr; 00133 } 00134 00135 MQTTGWPacket* MQTTSNSubscribeHandler::handleUnsubscribe(Client* client, MQTTSNPacket* packet) 00136 { 00137 uint16_t msgId; 00138 MQTTSN_topicid topicFilter; 00139 MQTTGWPacket* unsubscribe = nullptr; 00140 00141 if ( packet->getUNSUBSCRIBE(&msgId, &topicFilter) == 0 ) 00142 { 00143 return nullptr; 00144 } 00145 00146 if ( msgId == 0 ) 00147 { 00148 return nullptr; 00149 } 00150 00151 Topic* topic = client->getTopics()->getTopicById(&topicFilter); 00152 00153 if (topicFilter.type == MQTTSN_TOPIC_TYPE_SHORT) 00154 { 00155 char shortTopic[3]; 00156 shortTopic[0] = topicFilter.data.short_name[0]; 00157 shortTopic[1] = topicFilter.data.short_name[1]; 00158 shortTopic[2] = 0; 00159 unsubscribe = new MQTTGWPacket(); 00160 unsubscribe->setUNSUBSCRIBE(shortTopic, msgId); 00161 } 00162 else 00163 { 00164 if ( topic == nullptr ) 00165 { 00166 MQTTSNPacket* sUnsuback = new MQTTSNPacket(); 00167 sUnsuback->setUNSUBACK(msgId); 00168 Event* evsuback = new Event(); 00169 evsuback->setClientSendEvent(client, sUnsuback); 00170 _gateway->getClientSendQue()->post(evsuback); 00171 return nullptr; 00172 } 00173 else 00174 { 00175 unsubscribe = new MQTTGWPacket(); 00176 unsubscribe->setUNSUBSCRIBE(topic->getTopicName()->c_str(), msgId); 00177 } 00178 } 00179 00180 if ( !client->isAggregated() ) 00181 { 00182 Event* ev1 = new Event(); 00183 ev1->setBrokerSendEvent(client, unsubscribe); 00184 _gateway->getBrokerSendQue()->post(ev1); 00185 return nullptr; 00186 } 00187 else 00188 { 00189 return unsubscribe; 00190 } 00191 } 00192 00193 void MQTTSNSubscribeHandler::handleAggregateSubscribe(Client* client, MQTTSNPacket* packet) 00194 { 00195 MQTTGWPacket* subscribe = handleSubscribe(client, packet); 00196 00197 if ( subscribe != nullptr ) 00198 { 00199 UTF8String str = subscribe->getTopic(); 00200 string* topicName = new string(str.data, str.len); 00201 Topic topic = Topic(topicName, 
MQTTSN_TOPIC_TYPE_NORMAL); 00202 _gateway->getAdapterManager()->addAggregateTopic(&topic, client); 00203 00204 int msgId = 0; 00205 if ( packet->isDuplicate() ) 00206 { 00207 msgId = _gateway->getAdapterManager()->getAggregater()->getMsgId(client, packet->getMsgId()); 00208 } 00209 else 00210 { 00211 msgId = _gateway->getAdapterManager()->getAggregater()->addMessageIdTable(client, packet->getMsgId()); 00212 } 00213 00214 if ( msgId == 0 ) 00215 { 00216 WRITELOG("%s MQTTSNSubscribeHandler can't create MessageIdTableElement %s%s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER); 00217 return; 00218 } 00219 WRITELOG("msgId=%d\n",msgId); 00220 subscribe->setMsgId(msgId); 00221 Event* ev = new Event(); 00222 ev->setBrokerSendEvent(client, subscribe); 00223 _gateway->getBrokerSendQue()->post(ev); 00224 } 00225 } 00226 00227 void MQTTSNSubscribeHandler::handleAggregateUnsubscribe(Client* client, MQTTSNPacket* packet) 00228 { 00229 MQTTGWPacket* unsubscribe = handleUnsubscribe(client, packet); 00230 if ( unsubscribe != nullptr ) 00231 { 00232 UTF8String str = unsubscribe->getTopic(); 00233 string* topicName = new string(str.data, str.len); 00234 Topic topic = Topic(topicName, MQTTSN_TOPIC_TYPE_NORMAL); 00235 _gateway->getAdapterManager()->removeAggregateTopic(&topic, client); 00236 00237 int msgId = 0; 00238 if ( packet->isDuplicate() ) 00239 { 00240 msgId = _gateway->getAdapterManager()->getAggregater()->getMsgId(client, packet->getMsgId()); 00241 } 00242 else 00243 { 00244 msgId = _gateway->getAdapterManager()->getAggregater()->addMessageIdTable(client, packet->getMsgId()); 00245 } 00246 00247 if ( msgId == 0 ) 00248 { 00249 WRITELOG("%s MQTTSNUnsubscribeHandler can't create MessageIdTableElement %s%s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER); 00250 return; 00251 } 00252 unsubscribe->setMsgId(msgId); 00253 Event* ev = new Event(); 00254 ev->setBrokerSendEvent(client, unsubscribe); 00255 _gateway->getBrokerSendQue()->post(ev); 00256 } 00257 }
Generated on Wed Jul 13 2022 10:46:03 by
1.7.2