Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
MQTTSNGWBrokerSendTask.cpp
00001 /************************************************************************************** 00002 * Copyright (c) 2016, Tomoaki Yamaguchi 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Tomoaki Yamaguchi - initial API and implementation and/or initial documentation 00015 **************************************************************************************/ 00016 00017 #include <MQTTSNGWAdapterManager.h> 00018 #include "MQTTSNGWBrokerSendTask.h" 00019 #include "MQTTSNGWDefines.h" 00020 #include "MQTTSNGateway.h" 00021 #include "MQTTSNGWClient.h" 00022 #include "MQTTGWPacket.h" 00023 #include <string.h> 00024 00025 using namespace std; 00026 using namespace MQTTSNGW; 00027 00028 char* currentDateTime(); 00029 #define ERRMSG_FORMAT "\n%s \x1b[0m\x1b[31merror:\x1b[0m\x1b[37m Can't Xmit to the Broker. 
errno=%d\n" 00030 00031 /*===================================== 00032 Class BrokerSendTask 00033 =====================================*/ 00034 BrokerSendTask::BrokerSendTask(Gateway* gateway) 00035 { 00036 _gateway = gateway; 00037 _gateway->attach((Thread*)this); 00038 _gwparams = nullptr; 00039 _light = nullptr; 00040 } 00041 00042 BrokerSendTask::~BrokerSendTask() 00043 { 00044 00045 } 00046 00047 /** 00048 * Initialize attributs of this class 00049 */ 00050 void BrokerSendTask::initialize(int argc, char** argv) 00051 { 00052 _gwparams = _gateway->getGWParams(); 00053 _light = _gateway->getLightIndicator(); 00054 } 00055 00056 /** 00057 * connect to the broker and send MQTT messges 00058 */ 00059 void BrokerSendTask::run() 00060 { 00061 Event* ev = nullptr; 00062 MQTTGWPacket* packet = nullptr; 00063 Client* client = nullptr; 00064 AdapterManager* adpMgr = _gateway->getAdapterManager(); 00065 int rc = 0; 00066 00067 while (true) 00068 { 00069 ev = _gateway->getBrokerSendQue()->wait(); 00070 00071 if ( ev->getEventType() == EtStop ) 00072 { 00073 WRITELOG("%s BrokerSendTask stopped.\n", currentDateTime()); 00074 delete ev; 00075 return; 00076 } 00077 00078 if ( ev->getEventType() == EtBrokerSend) 00079 { 00080 client = ev->getClient(); 00081 packet = ev->getMQTTGWPacket(); 00082 00083 /* Check Client is managed by Adapters */ 00084 client = adpMgr->getClient(*client); 00085 00086 if ( packet->getType() == CONNECT && client->getNetwork()->isValid() ) 00087 { 00088 client->getNetwork()->close(); 00089 } 00090 00091 if ( !client->getNetwork()->isValid() ) 00092 { 00093 /* connect to the broker and send a packet */ 00094 00095 if (client->isSecureNetwork()) 00096 { 00097 rc = client->getNetwork()->connect((const char*)_gwparams->brokerName, (const char*)_gwparams->portSecure, (const char*)_gwparams->rootCApath, 00098 (const char*)_gwparams->rootCAfile, (const char*)_gwparams->certKey, (const char*)_gwparams->privateKey); 00099 } 00100 else 00101 { 00102 rc = 
client->getNetwork()->connect((const char*)_gwparams->brokerName, (const char*)_gwparams->port); 00103 } 00104 00105 if ( !rc ) 00106 { 00107 /* disconnect the broker and the client */ 00108 WRITELOG("%s BrokerSendTask: %s can't connect to the broker. errno=%d %s %s\n", 00109 ERRMSG_HEADER, client->getClientId(), errno, strerror(errno), ERRMSG_FOOTER); 00110 delete ev; 00111 client->getNetwork()->close(); 00112 continue; 00113 } 00114 } 00115 00116 /* send a packet */ 00117 _light->blueLight(true); 00118 if ( (rc = packet->send(client->getNetwork())) > 0 ) 00119 { 00120 if ( packet->getType() == CONNECT ) 00121 { 00122 client->connectSended(); 00123 } 00124 log(client, packet); 00125 } 00126 else 00127 { 00128 WRITELOG("%s BrokerSendTask: %s can't send a packet to the broker. errno=%d %s %s\n", 00129 ERRMSG_HEADER, client->getClientId(), rc == -1 ? errno : 0, strerror(errno), ERRMSG_FOOTER); 00130 client->getNetwork()->close(); 00131 00132 /* Disconnect the client */ 00133 packet = new MQTTGWPacket(); 00134 packet->setHeader(DISCONNECT); 00135 Event* ev1 = new Event(); 00136 ev1->setBrokerRecvEvent(client, packet); 00137 _gateway->getPacketEventQue()->post(ev1); 00138 } 00139 00140 _light->blueLight(false); 00141 } 00142 delete ev; 00143 } 00144 } 00145 00146 00147 /** 00148 * write message content into stdout or Ringbuffer 00149 */ 00150 void BrokerSendTask::log(Client* client, MQTTGWPacket* packet) 00151 { 00152 char pbuf[(SIZE_OF_LOG_PACKET + 5 )* 3]; 00153 char msgId[6]; 00154 00155 switch (packet->getType()) 00156 { 00157 case CONNECT: 00158 WRITELOG(FORMAT_Y_Y_W, currentDateTime(), packet->getName(), RIGHTARROWB, client->getClientId(), packet->print(pbuf)); 00159 break; 00160 case PUBLISH: 00161 WRITELOG(FORMAT_W_MSGID_Y_W, currentDateTime(), packet->getName(), packet->getMsgId(msgId), RIGHTARROWB, client->getClientId(), packet->print(pbuf)); 00162 break; 00163 case SUBSCRIBE: 00164 case UNSUBSCRIBE: 00165 case PUBACK: 00166 case PUBREC: 00167 case PUBREL: 
00168 case PUBCOMP: 00169 WRITELOG(FORMAT_W_MSGID_Y_W, currentDateTime(), packet->getName(), packet->getMsgId(msgId), RIGHTARROWB, client->getClientId(), packet->print(pbuf)); 00170 break; 00171 case PINGREQ: 00172 WRITELOG(FORMAT_Y_Y_W, currentDateTime(), packet->getName(), RIGHTARROWB, client->getClientId(), packet->print(pbuf)); 00173 break; 00174 case DISCONNECT: 00175 WRITELOG(FORMAT_Y_Y_W, currentDateTime(), packet->getName(), RIGHTARROWB, client->getClientId(), packet->print(pbuf)); 00176 break; 00177 default: 00178 break; 00179 } 00180 } 00181
Generated on Wed Jul 13 2022 10:46:02 by Doxygen 1.7.2