Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
MQTTSNGWPublishHandler.cpp
00001 /************************************************************************************** 00002 * Copyright (c) 2016, Tomoaki Yamaguchi 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Tomoaki Yamaguchi - initial API and implementation and/or initial documentation 00015 * Tieto Poland Sp. z o.o. - Gateway improvements 00016 **************************************************************************************/ 00017 00018 #include "MQTTSNGWPublishHandler.h" 00019 #include "MQTTSNGWPacket.h" 00020 #include "MQTTGWPacket.h" 00021 #include "MQTTSNGateway.h" 00022 #include "MQTTSNGWClient.h" 00023 #include "MQTTSNGWQoSm1Proxy.h" 00024 #include <string.h> 00025 using namespace std; 00026 using namespace MQTTSNGW; 00027 00028 MQTTSNPublishHandler::MQTTSNPublishHandler(Gateway* gateway) 00029 { 00030 _gateway = gateway; 00031 } 00032 00033 MQTTSNPublishHandler::~MQTTSNPublishHandler() 00034 { 00035 00036 } 00037 00038 MQTTGWPacket* MQTTSNPublishHandler::handlePublish(Client* client, MQTTSNPacket* packet) 00039 { 00040 uint8_t dup; 00041 int qos; 00042 uint8_t retained; 00043 uint16_t msgId; 00044 uint8_t* payload; 00045 MQTTSN_topicid topicid; 00046 int payloadlen; 00047 Publish pub = MQTTPacket_Publish_Initializer; 00048 00049 char shortTopic[2]; 00050 00051 if ( !_gateway->getAdapterManager()->getQoSm1Proxy()->isActive() ) 00052 { 00053 if ( client->isQoSm1() ) 00054 { 00055 _gateway->getAdapterManager()->getQoSm1Proxy()->savePacket(client, packet); 00056 00057 return nullptr; 00058 } 00059 } 00060 00061 if ( packet->getPUBLISH(&dup, &qos, &retained, &msgId, &topicid, &payload, &payloadlen) ==0 ) 00062 { 00063 return nullptr; 00064 } 00065 pub.msgId = msgId; 00066 pub.header.bits.dup = dup; 00067 pub.header.bits.qos = ( qos == 3 ? 0 : qos ); 00068 pub.header.bits.retain = retained; 00069 00070 Topic* topic = nullptr; 00071 00072 if( topicid.type == MQTTSN_TOPIC_TYPE_SHORT ) 00073 { 00074 shortTopic[0] = topicid.data.short_name[0]; 00075 shortTopic[1] = topicid.data.short_name[1]; 00076 pub.topic = shortTopic; 00077 pub.topiclen = 2; 00078 } 00079 else 00080 { 00081 topic = client->getTopics()->getTopicById(&topicid); 00082 if ( !topic ) 00083 { 00084 topic = _gateway->getTopics()->getTopicById(&topicid); 00085 if ( topic ) 00086 { 00087 topic = client->getTopics()->add(topic->getTopicName()->c_str(), topic->getTopicId()); 00088 } 00089 } 00090 00091 if( !topic && qos == 3 ) 00092 { 00093 WRITELOG("%s Invalid TopicId.%s %s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER); 00094 return nullptr; 00095 } 00096 00097 if( !topic && msgId && qos > 0 && qos < 3 ) 00098 { 00099 /* Reply PubAck with INVALID_TOPIC_ID to the client */ 00100 MQTTSNPacket* pubAck = new MQTTSNPacket(); 00101 pubAck->setPUBACK( topicid.data.id, msgId, MQTTSN_RC_REJECTED_INVALID_TOPIC_ID); 00102 Event* ev1 = new Event(); 00103 ev1->setClientSendEvent(client, pubAck); 00104 _gateway->getClientSendQue()->post(ev1); 00105 return nullptr; 00106 } 00107 if ( topic ) 00108 { 00109 pub.topic = (char*)topic->getTopicName()->data(); 00110 pub.topiclen = topic->getTopicName()->length(); 00111 } 00112 } 00113 /* Save a msgId & a TopicId pare for PUBACK */ 00114 if( msgId && qos > 0 && qos < 3) 00115 { 00116 client->setWaitedPubTopicId(msgId, topicid.data.id, topicid.type); 00117 } 00118 00119 pub.payload = (char*)payload; 00120 pub.payloadlen = payloadlen; 00121 00122 MQTTGWPacket* publish = new MQTTGWPacket(); 00123 publish->setPUBLISH(&pub); 00124 00125 if ( _gateway->getAdapterManager()->isAggregaterActive() && client->isAggregated() ) 00126 { 00127 return publish; 00128 } 00129 else 00130 { 00131 Event* ev1 = new Event(); 00132 ev1->setBrokerSendEvent(client, publish); 00133 _gateway->getBrokerSendQue()->post(ev1); 00134 return nullptr; 00135 } 00136 } 00137 00138 void MQTTSNPublishHandler::handlePuback(Client* client, MQTTSNPacket* packet) 00139 { 00140 uint16_t topicId; 00141 uint16_t msgId; 00142 uint8_t rc; 00143 00144 if ( client->isActive() ) 00145 { 00146 if ( packet->getPUBACK(&topicId, &msgId, &rc) == 0 ) 00147 { 00148 return; 00149 } 00150 00151 if ( rc == MQTTSN_RC_ACCEPTED) 00152 { 00153 if ( !_gateway->getAdapterManager()->getAggregater()->isActive() ) 00154 { 00155 MQTTGWPacket* pubAck = new MQTTGWPacket(); 00156 pubAck->setAck(PUBACK, msgId); 00157 Event* ev1 = new Event(); 00158 ev1->setBrokerSendEvent(client, pubAck); 00159 _gateway->getBrokerSendQue()->post(ev1); 00160 } 00161 } 00162 else if ( rc == MQTTSN_RC_REJECTED_INVALID_TOPIC_ID) 00163 { 00164 WRITELOG(" PUBACK %d : Invalid Topic ID\n", msgId); 00165 } 00166 } 00167 } 00168 00169 void MQTTSNPublishHandler::handleAck(Client* client, MQTTSNPacket* packet, uint8_t packetType) 00170 { 00171 uint16_t msgId; 00172 00173 if ( client->isActive() ) 00174 { 00175 if ( packet->getACK(&msgId) == 0 ) 00176 { 00177 return; 00178 } 00179 MQTTGWPacket* ackPacket = new MQTTGWPacket(); 00180 ackPacket->setAck(packetType, msgId); 00181 Event* ev1 = new Event(); 00182 ev1->setBrokerSendEvent(client, ackPacket); 00183 _gateway->getBrokerSendQue()->post(ev1); 00184 } 00185 } 00186 00187 void MQTTSNPublishHandler::handleRegister(Client* client, MQTTSNPacket* packet) 00188 { 00189 uint16_t id; 00190 uint16_t msgId; 00191 MQTTSNString topicName = MQTTSNString_initializer;; 00192 MQTTSN_topicid topicid; 00193 00194 if ( client->isActive() || client->isAwake()) 00195 { 00196 if ( packet->getREGISTER(&id, &msgId, &topicName) == 0 ) 00197 { 00198 return; 00199 } 00200 00201 topicid.type = MQTTSN_TOPIC_TYPE_NORMAL; 00202 topicid.data.long_.len = topicName.lenstring.len; 00203 topicid.data.long_.name = topicName.lenstring.data; 00204 00205 id = client->getTopics()->add(&topicid)->getTopicId(); 00206 00207 MQTTSNPacket* regAck = new MQTTSNPacket(); 00208 regAck->setREGACK(id, msgId, MQTTSN_RC_ACCEPTED); 00209 Event* ev = new Event(); 00210 ev->setClientSendEvent(client, regAck); 00211 _gateway->getClientSendQue()->post(ev); 00212 } 00213 } 00214 00215 void MQTTSNPublishHandler::handleRegAck( Client* client, MQTTSNPacket* packet) 00216 { 00217 uint16_t id; 00218 uint16_t msgId; 00219 uint8_t rc; 00220 if ( client->isActive() || client->isAwake()) 00221 { 00222 if ( packet->getREGACK(&id, &msgId, &rc) == 0 ) 00223 { 00224 return; 00225 } 00226 00227 MQTTSNPacket* regAck = client->getWaitREGACKPacketList()->getPacket(msgId); 00228 00229 if ( regAck != nullptr ) 00230 { 00231 client->getWaitREGACKPacketList()->erase(msgId); 00232 Event* ev = new Event(); 00233 ev->setClientSendEvent(client, regAck); 00234 _gateway->getClientSendQue()->post(ev); 00235 } 00236 if (client->isHoldPringReqest() && client->getWaitREGACKPacketList()->getCount() == 0 ) 00237 { 00238 /* send PINGREQ to the broker */ 00239 client->resetPingRequest(); 00240 MQTTGWPacket* pingreq = new MQTTGWPacket(); 00241 pingreq->setHeader(PINGREQ); 00242 Event* evt = new Event(); 00243 evt->setBrokerSendEvent(client, pingreq); 00244 _gateway->getBrokerSendQue()->post(evt); 00245 } 00246 } 00247 00248 } 00249 00250 00251 00252 00253 void MQTTSNPublishHandler::handleAggregatePublish(Client* client, MQTTSNPacket* packet) 00254 { 00255 int msgId = 0; 00256 MQTTGWPacket* publish = handlePublish(client, packet); 00257 if ( publish != nullptr ) 00258 { 00259 if ( publish->getMsgId() > 0 ) 00260 { 00261 if ( packet->isDuplicate() ) 00262 { 00263 msgId = _gateway->getAdapterManager()->getAggregater()->getMsgId(client, packet->getMsgId()); 00264 } 00265 else 00266 { 00267 msgId = _gateway->getAdapterManager()->getAggregater()->addMessageIdTable(client, packet->getMsgId()); 00268 } 00269 publish->setMsgId(msgId); 00270 } 00271 Event* ev1 = new Event(); 00272 ev1->setBrokerSendEvent(client, publish); 00273 _gateway->getBrokerSendQue()->post(ev1); 00274 } 00275 } 00276 00277 void MQTTSNPublishHandler::handleAggregateAck(Client* client, MQTTSNPacket* packet, int type) 00278 { 00279 if ( type == MQTTSN_PUBREC ) 00280 { 00281 uint16_t msgId; 00282 00283 if ( packet->getACK(&msgId) == 0 ) 00284 { 00285 return; 00286 } 00287 MQTTSNPacket* ackPacket = new MQTTSNPacket(); 00288 ackPacket->setPUBREL(msgId); 00289 Event* ev = new Event(); 00290 ev->setClientSendEvent(client, ackPacket); 00291 _gateway->getClientSendQue()->post(ev); 00292 } 00293 }
Generated on Wed Jul 13 2022 10:46:03 by
1.7.2