Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
MQTTSNGWClientRecvTask.cpp
00001 /************************************************************************************** 00002 * Copyright (c) 2016, Tomoaki Yamaguchi 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Tomoaki Yamaguchi - initial API and implementation and/or initial documentation 00015 **************************************************************************************/ 00016 00017 #include "MQTTSNGWClientRecvTask.h" 00018 #include "MQTTSNPacket.h" 00019 #include "MQTTSNGWQoSm1Proxy.h" 00020 #include "MQTTSNGWEncapsulatedPacket.h" 00021 #include <cstring> 00022 00023 //#include "MQTTSNGWForwarder.h" 00024 00025 using namespace MQTTSNGW; 00026 char* currentDateTime(void); 00027 /*===================================== 00028 Class ClientRecvTask 00029 =====================================*/ 00030 ClientRecvTask::ClientRecvTask(Gateway* gateway) 00031 { 00032 _gateway = gateway; 00033 _gateway->attach((Thread*)this); 00034 _sensorNetwork = _gateway->getSensorNetwork(); 00035 } 00036 00037 ClientRecvTask::~ClientRecvTask() 00038 { 00039 00040 } 00041 00042 /** 00043 * Initialize SensorNetwork 00044 */ 00045 void ClientRecvTask::initialize(int argc, char** argv) 00046 { 00047 if ( _sensorNetwork->initialize() < 0 ) 00048 { 00049 throw Exception(" Can't open the sensor network.\n"); 00050 } 00051 } 00052 00053 /* 00054 * Receive a packet from clients via sensor netwwork 00055 * and generate a event to execute the packet handling procedure 00056 * of MQTTSNPacketHandlingTask. 00057 */ 00058 void ClientRecvTask::run() 00059 { 00060 Event* ev = nullptr; 00061 AdapterManager* adpMgr = _gateway->getAdapterManager(); 00062 QoSm1Proxy* qosm1Proxy = adpMgr->getQoSm1Proxy(); 00063 bool isAggrActive = adpMgr->isAggregaterActive(); 00064 ClientList* clientList = _gateway->getClientList(); 00065 EventQue* packetEventQue = _gateway->getPacketEventQue(); 00066 00067 char buf[128]; 00068 00069 while (true) 00070 { 00071 Client* client = nullptr; 00072 Forwarder* fwd = nullptr; 00073 WirelessNodeId nodeId; 00074 00075 MQTTSNPacket* packet = new MQTTSNPacket(); 00076 int packetLen = packet->recv(_sensorNetwork); 00077 00078 if (CHK_SIGINT) 00079 { 00080 WRITELOG("%s ClientRecvTask stopped.\n", currentDateTime()); 00081 delete packet; 00082 return; 00083 } 00084 00085 if (packetLen < 2 ) 00086 { 00087 delete packet; 00088 continue; 00089 } 00090 00091 if ( packet->getType() <= MQTTSN_ADVERTISE || packet->getType() == MQTTSN_GWINFO ) 00092 { 00093 delete packet; 00094 continue; 00095 } 00096 00097 if ( packet->getType() == MQTTSN_SEARCHGW ) 00098 { 00099 /* write log and post Event */ 00100 log(0, packet, 0); 00101 ev = new Event(); 00102 ev->setBrodcastEvent(packet); 00103 packetEventQue->post(ev); 00104 continue; 00105 } 00106 00107 00108 SensorNetAddress* senderAddr = _gateway->getSensorNetwork()->getSenderAddress(); 00109 00110 if ( packet->getType() == MQTTSN_ENCAPSULATED ) 00111 { 00112 fwd = _gateway->getAdapterManager()->getForwarderList()->getForwarder(senderAddr); 00113 00114 if ( fwd != nullptr ) 00115 { 00116 MQTTSNString fwdName = MQTTSNString_initializer; 00117 fwdName.cstring = const_cast<char *>( fwd->getName() ); 00118 log(0, packet, &fwdName); 00119 00120 /* get the packet from the encapsulation message */ 00121 MQTTSNGWEncapsulatedPacket encap; 00122 encap.desirialize(packet->getPacketData(), packet->getPacketLength()); 00123 nodeId.setId( encap.getWirelessNodeId() ); 00124 client = fwd->getClient(&nodeId); 00125 packet = encap.getMQTTSNPacket(); 00126 } 00127 } 00128 else 00129 { 00130 /* Check the client belonging to QoS-1Proxy ? */ 00131 00132 if ( qosm1Proxy->isActive() ) 00133 { 00134 const char* clientName = qosm1Proxy->getClientId(senderAddr); 00135 00136 if ( clientName ) 00137 { 00138 if ( !packet->isQoSMinusPUBLISH() ) 00139 { 00140 client = qosm1Proxy->getClient(); 00141 log(clientName, packet); 00142 WRITELOG("%s %s %s can send only PUBLISH with QoS-1.%s\n", ERRMSG_HEADER, clientName, senderAddr->sprint(buf), ERRMSG_FOOTER); 00143 delete packet; 00144 continue; 00145 } 00146 } 00147 } 00148 } 00149 00150 client = _gateway->getClientList()->getClient(senderAddr); 00151 00152 if ( client ) 00153 { 00154 /* write log and post Event */ 00155 log(client, packet, 0); 00156 ev = new Event(); 00157 ev->setClientRecvEvent(client,packet); 00158 packetEventQue->post(ev); 00159 } 00160 else 00161 { 00162 /* new client */ 00163 if (packet->getType() == MQTTSN_CONNECT) 00164 { 00165 MQTTSNPacket_connectData data; 00166 memset(&data, 0, sizeof(MQTTSNPacket_connectData)); 00167 if ( !packet->getCONNECT(&data) ) 00168 { 00169 log(0, packet, &data.clientID); 00170 WRITELOG("%s CONNECT message form %s is incorrect.%s\n", ERRMSG_HEADER, senderAddr->sprint(buf), ERRMSG_FOOTER); 00171 delete packet; 00172 continue; 00173 } 00174 00175 client = clientList->getClient(&data.clientID); 00176 00177 if ( fwd ) 00178 { 00179 if ( client == nullptr ) 00180 { 00181 /* create a new client */ 00182 client = clientList->createClient(0, &data.clientID, isAggrActive); 00183 } 00184 /* Add to af forwarded client list of forwarder. */ 00185 fwd->addClient(client, &nodeId); 00186 } 00187 else 00188 { 00189 if ( client ) 00190 { 00191 /* Client exists. Set SensorNet Address of it. */ 00192 client->setClientAddress(senderAddr); 00193 } 00194 else 00195 { 00196 /* create a new client */ 00197 client = clientList->createClient(senderAddr, &data.clientID, isAggrActive); 00198 } 00199 } 00200 00201 log(client, packet, &data.clientID); 00202 00203 if (!client) 00204 { 00205 WRITELOG("%s Client(%s) was rejected. CONNECT message has been discarded.%s\n", ERRMSG_HEADER, senderAddr->sprint(buf), ERRMSG_FOOTER); 00206 delete packet; 00207 continue; 00208 } 00209 00210 /* post Client RecvEvent */ 00211 ev = new Event(); 00212 ev->setClientRecvEvent(client, packet); 00213 packetEventQue->post(ev); 00214 } 00215 else 00216 { 00217 log(client, packet, 0); 00218 if ( packet->getType() == MQTTSN_ENCAPSULATED ) 00219 { 00220 WRITELOG("%s Forwarder(%s) is not declared by ClientList file. message has been discarded.%s\n", ERRMSG_HEADER, _sensorNetwork->getSenderAddress()->sprint(buf), ERRMSG_FOOTER); 00221 } 00222 else 00223 { 00224 WRITELOG("%s Client(%s) is not connecting. message has been discarded.%s\n", ERRMSG_HEADER, senderAddr->sprint(buf), ERRMSG_FOOTER); 00225 } 00226 delete packet; 00227 } 00228 } 00229 } 00230 } 00231 00232 void ClientRecvTask::log(Client* client, MQTTSNPacket* packet, MQTTSNString* id) 00233 { 00234 const char* clientId; 00235 char cstr[MAX_CLIENTID_LENGTH + 1]; 00236 00237 if ( id ) 00238 { 00239 if ( id->cstring ) 00240 { 00241 strncpy(cstr, id->cstring, strlen(id->cstring) ); 00242 clientId = cstr; 00243 } 00244 else 00245 { 00246 memset((void*)cstr, 0, id->lenstring.len + 1); 00247 strncpy(cstr, id->lenstring.data, id->lenstring.len ); 00248 clientId = cstr; 00249 } 00250 } 00251 else if ( client ) 00252 { 00253 clientId = client->getClientId(); 00254 } 00255 else 00256 { 00257 clientId = UNKNOWNCL; 00258 } 00259 00260 log(clientId, packet); 00261 } 00262 00263 void ClientRecvTask::log(const char* clientId, MQTTSNPacket* packet) 00264 { 00265 char pbuf[ SIZE_OF_LOG_PACKET * 3 + 1]; 00266 char msgId[6]; 00267 00268 switch (packet->getType()) 00269 { 00270 case MQTTSN_SEARCHGW: 00271 WRITELOG(FORMAT_Y_G_G_NL, currentDateTime(), packet->getName(), LEFTARROW, CLIENT, packet->print(pbuf)); 00272 break; 00273 case MQTTSN_CONNECT: 00274 case MQTTSN_PINGREQ: 00275 WRITELOG(FORMAT_Y_G_G_NL, currentDateTime(), packet->getName(), LEFTARROW, clientId, packet->print(pbuf)); 00276 break; 00277 case MQTTSN_DISCONNECT: 00278 case MQTTSN_WILLTOPICUPD: 00279 case MQTTSN_WILLMSGUPD: 00280 case MQTTSN_WILLTOPIC: 00281 case MQTTSN_WILLMSG: 00282 WRITELOG(FORMAT_Y_G_G, currentDateTime(), packet->getName(), LEFTARROW, clientId, packet->print(pbuf)); 00283 break; 00284 case MQTTSN_PUBLISH: 00285 case MQTTSN_REGISTER: 00286 case MQTTSN_SUBSCRIBE: 00287 case MQTTSN_UNSUBSCRIBE: 00288 WRITELOG(FORMAT_G_MSGID_G_G_NL, currentDateTime(), packet->getName(), packet->getMsgId(msgId), LEFTARROW, clientId, packet->print(pbuf)); 00289 break; 00290 case MQTTSN_REGACK: 00291 case MQTTSN_PUBACK: 00292 case MQTTSN_PUBREC: 00293 case MQTTSN_PUBREL: 00294 case MQTTSN_PUBCOMP: 00295 WRITELOG(FORMAT_G_MSGID_G_G, currentDateTime(), packet->getName(), packet->getMsgId(msgId), LEFTARROW, clientId, packet->print(pbuf)); 00296 break; 00297 case MQTTSN_ENCAPSULATED: 00298 WRITELOG(FORMAT_Y_G_G, currentDateTime(), packet->getName(), LEFTARROW, clientId, packet->print(pbuf)); 00299 break; 00300 default: 00301 WRITELOG(FORMAT_W_NL, currentDateTime(), packet->getName(), LEFTARROW, clientId, packet->print(pbuf)); 00302 break; 00303 } 00304 }
Generated on Wed Jul 13 2022 10:46:03 by
1.7.2