Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
MQTTSNGWBrokerRecvTask.cpp
00001 /************************************************************************************** 00002 * Copyright (c) 2016, Tomoaki Yamaguchi 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Tomoaki Yamaguchi - initial API and implementation and/or initial documentation 00015 **************************************************************************************/ 00016 00017 #include "MQTTSNGWBrokerRecvTask.h" 00018 #include "MQTTSNGWClient.h" 00019 #include "MQTTSNGWClientList.h" 00020 #include <unistd.h> 00021 00022 using namespace std; 00023 using namespace MQTTSNGW; 00024 00025 char* currentDateTime(void); 00026 00027 /*===================================== 00028 Class BrokerRecvTask 00029 =====================================*/ 00030 BrokerRecvTask::BrokerRecvTask(Gateway* gateway) 00031 { 00032 _gateway = gateway; 00033 _gateway->attach((Thread*)this); 00034 _light = nullptr; 00035 } 00036 00037 BrokerRecvTask::~BrokerRecvTask() 00038 { 00039 00040 } 00041 00042 /** 00043 * Initialize attributs of this class 00044 */ 00045 void BrokerRecvTask::initialize(int argc, char** argv) 00046 { 00047 _light = _gateway->getLightIndicator(); 00048 } 00049 00050 /** 00051 * receive a MQTT messge from the broker and post a event. 
00052 */ 00053 void BrokerRecvTask::run(void) 00054 { 00055 struct timeval timeout; 00056 MQTTGWPacket* packet = nullptr; 00057 int rc; 00058 Event* ev = nullptr; 00059 fd_set rset; 00060 fd_set wset; 00061 00062 while (true) 00063 { 00064 _light->blueLight(false); 00065 if (CHK_SIGINT) 00066 { 00067 WRITELOG("%s BrokerRecvTask stopped.\n", currentDateTime()); 00068 return; 00069 } 00070 timeout.tv_sec = 0; 00071 timeout.tv_usec = 500000; // 500 msec 00072 FD_ZERO(&rset); 00073 FD_ZERO(&wset); 00074 int maxSock = 0; 00075 int sockfd = 0; 00076 00077 /* Prepare sockets list to read */ 00078 Client* client = _gateway->getClientList()->getClient(0); 00079 00080 while ( client ) 00081 { 00082 if (client->getNetwork()->isValid()) 00083 { 00084 sockfd = client->getNetwork()->getSock(); 00085 FD_SET(sockfd, &rset); 00086 FD_SET(sockfd, &wset); 00087 if (sockfd > maxSock) 00088 { 00089 maxSock = sockfd; 00090 } 00091 } 00092 client = client->getNextClient(); 00093 } 00094 00095 if (maxSock == 0) 00096 { 00097 usleep(500 * 1000); 00098 } 00099 else 00100 { 00101 /* Check sockets is ready to read */ 00102 int activity = select(maxSock + 1, &rset, 0, 0, &timeout); 00103 if (activity > 0) 00104 { 00105 client = _gateway->getClientList()->getClient(0); 00106 00107 while ( client ) 00108 { 00109 _light->blueLight(false); 00110 if (client->getNetwork()->isValid()) 00111 { 00112 int sockfd = client->getNetwork()->getSock(); 00113 if (FD_ISSET(sockfd, &rset)) 00114 { 00115 packet = new MQTTGWPacket(); 00116 rc = 0; 00117 /* read sockets */ 00118 _light->blueLight(true); 00119 rc = packet->recv(client->getNetwork()); 00120 if ( rc > 0 ) 00121 { 00122 if ( log(client, packet) == -1 ) 00123 { 00124 delete packet; 00125 goto nextClient; 00126 } 00127 00128 /* post a BrokerRecvEvent */ 00129 ev = new Event(); 00130 ev->setBrokerRecvEvent(client, packet); 00131 _gateway->getPacketEventQue()->post(ev); 00132 } 00133 else 00134 { 00135 if ( rc == 0 ) // Disconnected 00136 { 00137 
client->getNetwork()->close(); 00138 delete packet; 00139 00140 /* delete client when the client is not authorized & session is clean */ 00141 _gateway->getClientList()->erase(client); 00142 00143 if ( client ) 00144 { 00145 client = client->getNextClient(); 00146 } 00147 continue; 00148 } 00149 else if (rc == -1) 00150 { 00151 WRITELOG("%s BrokerRecvTask can't receive a packet from the broker errno=%d %s%s\n", ERRMSG_HEADER, errno, client->getClientId(), ERRMSG_FOOTER); 00152 } 00153 else if ( rc == -2 ) 00154 { 00155 WRITELOG("%s BrokerRecvTask receive invalid length of packet from the broker. DISCONNECT %s %s\n", ERRMSG_HEADER, client->getClientId(),ERRMSG_FOOTER); 00156 } 00157 else if ( rc == -3 ) 00158 { 00159 WRITELOG("%s BrokerRecvTask can't get memories for the packet %s%s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER); 00160 } 00161 00162 delete packet; 00163 00164 if ( (rc == -1 || rc == -2) && client->isActive() ) 00165 { 00166 /* disconnect the client */ 00167 packet = new MQTTGWPacket(); 00168 packet->setHeader(DISCONNECT); 00169 ev = new Event(); 00170 ev->setBrokerRecvEvent(client, packet); 00171 _gateway->getPacketEventQue()->post(ev); 00172 } 00173 } 00174 } 00175 } 00176 nextClient: 00177 client = client->getNextClient(); 00178 } 00179 } 00180 } 00181 } 00182 } 00183 00184 /** 00185 * write message content into stdout or Ringbuffer 00186 */ 00187 int BrokerRecvTask::log(Client* client, MQTTGWPacket* packet) 00188 { 00189 char pbuf[(SIZE_OF_LOG_PACKET + 5 )* 3]; 00190 char msgId[6]; 00191 int rc = 0; 00192 00193 switch (packet->getType()) 00194 { 00195 case CONNACK: 00196 WRITELOG(FORMAT_Y_Y_W, currentDateTime(), packet->getName(), LEFTARROWB, client->getClientId(), packet->print(pbuf)); 00197 break; 00198 case PUBLISH: 00199 WRITELOG(FORMAT_W_MSGID_Y_W_NL, currentDateTime(), packet->getName(), packet->getMsgId(msgId), LEFTARROWB, client->getClientId(), packet->print(pbuf)); 00200 break; 00201 case PUBACK: 00202 case PUBREC: 00203 case 
PUBREL: 00204 case PUBCOMP: 00205 WRITELOG(FORMAT_W_MSGID_Y_W, currentDateTime(), packet->getName(), packet->getMsgId(msgId), LEFTARROWB, client->getClientId(), packet->print(pbuf)); 00206 break; 00207 case SUBACK: 00208 case UNSUBACK: 00209 WRITELOG(FORMAT_W_MSGID_Y_W, currentDateTime(), packet->getName(), packet->getMsgId(msgId), LEFTARROWB, client->getClientId(), packet->print(pbuf)); 00210 break; 00211 case PINGRESP: 00212 WRITELOG(FORMAT_Y_Y_W, currentDateTime(), packet->getName(), LEFTARROWB, client->getClientId(), packet->print(pbuf)); 00213 break; 00214 default: 00215 WRITELOG("Type=%x\n", packet->getType()); 00216 rc = -1; 00217 break; 00218 } 00219 return rc; 00220 }
Generated on Wed Jul 13 2022 10:46:02 by Doxygen 1.7.2