Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
MQTTSNGWPacketHandleTask.cpp
00001 /************************************************************************************** 00002 * Copyright (c) 2016, Tomoaki Yamaguchi 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Tomoaki Yamaguchi - initial API and implementation and/or initial documentation 00015 * Tieto Poland Sp. z o.o. - Gateway improvements 00016 **************************************************************************************/ 00017 00018 #include "MQTTSNGWDefines.h" 00019 #include "MQTTSNGWPacketHandleTask.h" 00020 #include "MQTTSNGWProcess.h" 00021 #include "MQTTGWPacket.h" 00022 #include "MQTTSNGWClient.h" 00023 #include "MQTTSNGWProcess.h" 00024 #include "MQTTSNGWAdapterManager.h" 00025 #include "MQTTGWConnectionHandler.h" 00026 #include "MQTTGWPublishHandler.h" 00027 #include "MQTTGWSubscribeHandler.h" 00028 #include "MQTTSNGWConnectionHandler.h" 00029 #include "MQTTSNGWPublishHandler.h" 00030 #include "MQTTSNGWSubscribeHandler.h" 00031 #include "Timer.h" 00032 #include "MQTTSNAggregateConnectionHandler.h" 00033 00034 #include <string.h> 00035 00036 using namespace std; 00037 using namespace MQTTSNGW; 00038 00039 #define EVENT_QUE_TIME_OUT 2000 // 2000 msecs 00040 char* currentDateTime(void); 00041 /*===================================== 00042 Class PacketHandleTask 00043 =====================================*/ 00044 00045 PacketHandleTask::PacketHandleTask(Gateway* gateway) 00046 { 00047 _gateway = gateway; 00048 _gateway->attach((Thread*)this); 00049 _mqttConnection = new MQTTGWConnectionHandler(_gateway); 00050 _mqttPublish = new MQTTGWPublishHandler(_gateway); 00051 _mqttSubscribe = new MQTTGWSubscribeHandler(_gateway); 00052 _mqttsnConnection = new MQTTSNConnectionHandler(_gateway); 00053 _mqttsnPublish = new MQTTSNPublishHandler(_gateway); 00054 _mqttsnSubscribe = new MQTTSNSubscribeHandler(_gateway); 00055 00056 _mqttsnAggrConnection = new MQTTSNAggregateConnectionHandler(_gateway); 00057 } 00058 00059 /** 00060 * Destructor is called by Gateway's destructor indirectly. 00061 */ 00062 PacketHandleTask::~PacketHandleTask() 00063 { 00064 if ( _mqttConnection ) 00065 { 00066 delete _mqttConnection; 00067 } 00068 if ( _mqttPublish ) 00069 { 00070 delete _mqttPublish; 00071 } 00072 if ( _mqttSubscribe ) 00073 { 00074 delete _mqttSubscribe; 00075 } 00076 if ( _mqttsnConnection ) 00077 { 00078 delete _mqttsnConnection; 00079 } 00080 if ( _mqttsnPublish ) 00081 { 00082 delete _mqttsnPublish; 00083 } 00084 if ( _mqttsnSubscribe ) 00085 { 00086 delete _mqttsnSubscribe; 00087 } 00088 00089 if ( _mqttsnAggrConnection ) 00090 { 00091 delete _mqttsnAggrConnection; 00092 } 00093 } 00094 00095 void PacketHandleTask::run() 00096 { 00097 Event* ev = nullptr; 00098 EventQue* eventQue = _gateway->getPacketEventQue(); 00099 AdapterManager* adpMgr = _gateway->getAdapterManager(); 00100 00101 Client* client = nullptr; 00102 MQTTSNPacket* snPacket = nullptr; 00103 MQTTGWPacket* brPacket = nullptr; 00104 char msgId[6]; 00105 memset(msgId, 0, 6); 00106 00107 _advertiseTimer.start(_gateway->getGWParams()->keepAlive * 1000UL); 00108 00109 while (true) 00110 { 00111 /* wait Event */ 00112 ev = eventQue->timedwait(EVENT_QUE_TIME_OUT); 00113 00114 if (ev->getEventType() == EtStop) 00115 { 00116 WRITELOG("%s PacketHandleTask stopped.\n", currentDateTime()); 00117 delete ev; 00118 return; 00119 } 00120 00121 if (ev->getEventType() == EtTimeout) 00122 { 00123 /*------ Check Keep Alive Timer & send Advertise ------*/ 00124 if (_advertiseTimer.isTimeup()) 00125 { 00126 _mqttsnConnection->sendADVERTISE(); 00127 _advertiseTimer.start(_gateway->getGWParams()->keepAlive * 1000UL); 00128 } 00129 00130 /*------ Check Adapters Connect or PINGREQ ------*/ 00131 adpMgr->checkConnection(); 00132 } 00133 00134 /*------ Handle SEARCHGW Message ---------*/ 00135 else if (ev->getEventType() == EtBroadcast) 00136 { 00137 snPacket = ev->getMQTTSNPacket(); 00138 _mqttsnConnection->handleSearchgw(snPacket); 00139 } 00140 00141 /*------ Handle Messages form Clients ---------*/ 00142 else if (ev->getEventType() == EtClientRecv) 00143 { 00144 client = ev->getClient(); 00145 snPacket = ev->getMQTTSNPacket(); 00146 00147 DEBUGLOG(" PacketHandleTask gets %s %s from the client.\n", snPacket->getName(), snPacket->getMsgId(msgId)); 00148 00149 if ( adpMgr->isAggregatedClient(client) ) 00150 { 00151 aggregatePacketHandler(client, snPacket); 00152 } 00153 else 00154 { 00155 transparentPacketHandler(client, snPacket); 00156 } 00157 00158 00159 /* Reset the Timer for PINGREQ. */ 00160 client->updateStatus(snPacket); 00161 } 00162 /*------ Handle Messages form Broker ---------*/ 00163 else if ( ev->getEventType() == EtBrokerRecv ) 00164 { 00165 client = ev->getClient(); 00166 brPacket = ev->getMQTTGWPacket(); 00167 DEBUGLOG(" PacketHandleTask gets %s %s from the broker.\n", brPacket->getName(), brPacket->getMsgId(msgId)); 00168 00169 00170 if ( client->isAggregater() ) 00171 { 00172 aggregatePacketHandler(client, brPacket); 00173 } 00174 else 00175 { 00176 transparentPacketHandler(client, brPacket); 00177 } 00178 } 00179 delete ev; 00180 } 00181 } 00182 00183 00184 00185 void PacketHandleTask::aggregatePacketHandler(Client*client, MQTTSNPacket* packet) 00186 { 00187 switch (packet->getType()) 00188 { 00189 case MQTTSN_CONNECT: 00190 _mqttsnAggrConnection->handleConnect(client, packet); 00191 break; 00192 case MQTTSN_WILLTOPIC: 00193 _mqttsnConnection->handleWilltopic(client, packet); 00194 break; 00195 case MQTTSN_WILLMSG: 00196 _mqttsnAggrConnection->handleWillmsg(client, packet); 00197 break; 00198 case MQTTSN_DISCONNECT: 00199 _mqttsnAggrConnection->handleDisconnect(client, packet); 00200 break; 00201 case MQTTSN_WILLMSGUPD: 00202 _mqttsnConnection->handleWillmsgupd(client, packet); 00203 break; 00204 case MQTTSN_PINGREQ: 00205 _mqttsnAggrConnection->handlePingreq(client, packet); 00206 break; 00207 case MQTTSN_PUBLISH: 00208 _mqttsnPublish->handleAggregatePublish(client, packet); 00209 break; 00210 case MQTTSN_PUBACK: 00211 _mqttsnPublish->handleAggregateAck(client, packet, MQTTSN_PUBACK); 00212 break; 00213 case MQTTSN_PUBREC: 00214 _mqttsnPublish->handleAggregateAck(client, packet, MQTTSN_PUBREC); 00215 break; 00216 case MQTTSN_PUBREL: 00217 _mqttsnPublish->handleAggregateAck(client, packet, MQTTSN_PUBREL); 00218 break; 00219 case MQTTSN_PUBCOMP: 00220 _mqttsnPublish->handleAggregateAck(client, packet, MQTTSN_PUBCOMP); 00221 break; 00222 case MQTTSN_REGISTER: 00223 _mqttsnPublish->handleRegister(client, packet); 00224 break; 00225 case MQTTSN_REGACK: 00226 _mqttsnPublish->handleRegAck(client, packet); 00227 break; 00228 case MQTTSN_SUBSCRIBE: 00229 _mqttsnSubscribe->handleAggregateSubscribe(client, packet); 00230 break; 00231 case MQTTSN_UNSUBSCRIBE: 00232 _mqttsnSubscribe->handleAggregateUnsubscribe(client, packet); 00233 break; 00234 default: 00235 break; 00236 } 00237 } 00238 00239 00240 void PacketHandleTask::aggregatePacketHandler(Client*client, MQTTGWPacket* packet) 00241 { 00242 switch (packet->getType()) 00243 { 00244 case CONNACK: 00245 _mqttConnection->handleConnack(client, packet); 00246 break; 00247 case PINGRESP: 00248 _mqttConnection->handlePingresp(client, packet); 00249 break; 00250 case PUBLISH: 00251 _mqttPublish->handleAggregatePublish(client, packet); 00252 break; 00253 case PUBACK: 00254 _mqttPublish->handleAggregatePuback(client, packet); 00255 break; 00256 case PUBREC: 00257 _mqttPublish->handleAggregateAck(client, packet, PUBREC); 00258 break; 00259 case PUBREL: 00260 _mqttPublish->handleAggregatePubrel(client, packet); 00261 break; 00262 case PUBCOMP: 00263 _mqttPublish->handleAggregateAck(client, packet, PUBCOMP); 00264 break; 00265 case SUBACK: 00266 _mqttSubscribe->handleAggregateSuback(client, packet); 00267 break; 00268 case UNSUBACK: 00269 _mqttSubscribe->handleAggregateUnsuback(client, packet); 00270 break; 00271 default: 00272 break; 00273 } 00274 } 00275 00276 void PacketHandleTask::transparentPacketHandler(Client*client, MQTTSNPacket* packet) 00277 { 00278 switch (packet->getType()) 00279 { 00280 case MQTTSN_CONNECT: 00281 _mqttsnConnection->handleConnect(client, packet); 00282 break; 00283 case MQTTSN_WILLTOPIC: 00284 _mqttsnConnection->handleWilltopic(client, packet); 00285 break; 00286 case MQTTSN_WILLMSG: 00287 _mqttsnConnection->handleWillmsg(client, packet); 00288 break; 00289 case MQTTSN_DISCONNECT: 00290 _mqttsnConnection->handleDisconnect(client, packet); 00291 break; 00292 case MQTTSN_WILLMSGUPD: 00293 _mqttsnConnection->handleWillmsgupd(client, packet); 00294 break; 00295 case MQTTSN_PINGREQ: 00296 _mqttsnConnection->handlePingreq(client, packet); 00297 break; 00298 case MQTTSN_PUBLISH: 00299 _mqttsnPublish->handlePublish(client, packet); 00300 break; 00301 case MQTTSN_PUBACK: 00302 _mqttsnPublish->handlePuback(client, packet); 00303 break; 00304 case MQTTSN_PUBREC: 00305 _mqttsnPublish->handleAck(client, packet, PUBREC); 00306 break; 00307 case MQTTSN_PUBREL: 00308 _mqttsnPublish->handleAck(client, packet, PUBREL); 00309 break; 00310 case MQTTSN_PUBCOMP: 00311 _mqttsnPublish->handleAck(client, packet, PUBCOMP); 00312 break; 00313 case MQTTSN_REGISTER: 00314 _mqttsnPublish->handleRegister(client, packet); 00315 break; 00316 case MQTTSN_REGACK: 00317 _mqttsnPublish->handleRegAck(client, packet); 00318 break; 00319 case MQTTSN_SUBSCRIBE: 00320 _mqttsnSubscribe->handleSubscribe(client, packet); 00321 break; 00322 case MQTTSN_UNSUBSCRIBE: 00323 _mqttsnSubscribe->handleUnsubscribe(client, packet); 00324 break; 00325 default: 00326 break; 00327 } 00328 } 00329 00330 00331 void PacketHandleTask::transparentPacketHandler(Client*client, MQTTGWPacket* packet) 00332 { 00333 switch (packet->getType()) 00334 { 00335 case CONNACK: 00336 _mqttConnection->handleConnack(client, packet); 00337 break; 00338 case PINGRESP: 00339 _mqttConnection->handlePingresp(client, packet); 00340 break; 00341 case PUBLISH: 00342 _mqttPublish->handlePublish(client, packet); 00343 break; 00344 case PUBACK: 00345 _mqttPublish->handlePuback(client, packet); 00346 break; 00347 case PUBREC: 00348 _mqttPublish->handleAck(client, packet, PUBREC); 00349 break; 00350 case PUBREL: 00351 _mqttPublish->handleAck(client, packet, PUBREL); 00352 break; 00353 case PUBCOMP: 00354 _mqttPublish->handleAck(client, packet, PUBCOMP); 00355 break; 00356 case SUBACK: 00357 _mqttSubscribe->handleSuback(client, packet); 00358 break; 00359 case UNSUBACK: 00360 _mqttSubscribe->handleUnsuback(client, packet); 00361 break; 00362 default: 00363 break; 00364 } 00365 } 00366
Generated on Wed Jul 13 2022 10:46:03 by
1.7.2