Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
MQTTV3112.py
00001 """ 00002 ******************************************************************* 00003 Copyright (c) 2013, 2014 IBM Corp. 00004 00005 All rights reserved. This program and the accompanying materials 00006 are made available under the terms of the Eclipse Public License v1.0 00007 and Eclipse Distribution License v1.0 which accompany this distribution. 00008 00009 The Eclipse Public License is available at 00010 http://www.eclipse.org/legal/epl-v10.html 00011 and the Eclipse Distribution License is available at 00012 http://www.eclipse.org/org/documents/edl-v10.php. 00013 00014 Contributors: 00015 Ian Craggs - initial implementation and/or documentation 00016 ******************************************************************* 00017 """ 00018 from __future__ import print_function 00019 00020 """ 00021 00022 Assertions are used to validate incoming data, but are omitted from outgoing packets. This is 00023 so that the tests that use this package can send invalid data for error testing. 00024 00025 """ 00026 00027 00028 import logging 00029 00030 logger = logging.getLogger("mqttsas") 00031 00032 # Low-level protocol interface 00033 00034 class MQTTException(Exception): 00035 pass 00036 00037 00038 # Message types 00039 CONNECT, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL, \ 00040 PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, \ 00041 PINGREQ, PINGRESP, DISCONNECT = range(1, 15) 00042 00043 packetNames = [ "reserved", \ 00044 "Connect", "Connack", "Publish", "Puback", "Pubrec", "Pubrel", \ 00045 "Pubcomp", "Subscribe", "Suback", "Unsubscribe", "Unsuback", \ 00046 "Pingreq", "Pingresp", "Disconnect"] 00047 00048 classNames = [ "reserved", \ 00049 "Connects", "Connacks", "Publishes", "Pubacks", "Pubrecs", "Pubrels", \ 00050 "Pubcomps", "Subscribes", "Subacks", "Unsubscribes", "Unsubacks", \ 00051 "Pingreqs", "Pingresps", "Disconnects"] 00052 00053 00054 def MessageType(byte): 00055 if byte != None: 00056 rc = ord(byte[0]) >> 4 00057 else: 00058 rc = None 00059 return rc 00060 00061 00062 def getPacket(aSocket): 00063 "receive the next packet" 00064 buf = aSocket.recv(1) # get the first byte fixed header 00065 if buf == b"": 00066 return None 00067 if str(aSocket).find("[closed]") != -1: 00068 closed = True 00069 else: 00070 closed = False 00071 if closed: 00072 return None 00073 # now get the remaining length 00074 multiplier = 1 00075 remlength = 0 00076 while 1: 00077 next = aSocket.recv(1) 00078 while len(next) == 0: 00079 next = aSocket.recv(1) 00080 buf += next 00081 digit = ord(buf[-1]) 00082 remlength += (digit & 127) * multiplier 00083 if digit & 128 == 0: 00084 break 00085 multiplier *= 128 00086 # receive the remaining length if there is any 00087 rest = '' 00088 if remlength > 0: 00089 while len(rest) < remlength: 00090 rest += aSocket.recv(remlength-len(rest)) 00091 assert len(rest) == remlength 00092 return buf + rest 00093 00094 00095 class FixedHeaders: 00096 00097 def __init__(self, aMessageType): 00098 self.MessageType = aMessageType 00099 self.DUP = False 00100 self.QoS = 0 00101 self.RETAIN = False 00102 self.remainingLength = 0 00103 00104 def __eq__(self, fh): 00105 return self.MessageType == fh.MessageType and \ 00106 self.DUP == fh.DUP and \ 00107 self.QoS == fh.QoS and \ 00108 self.RETAIN == fh.RETAIN # and \ 00109 # self.remainingLength == fh.remainingLength 00110 00111 def __repr__(self): 00112 "return printable representation of our data" 00113 return classNames[self.MessageType]+'(DUP='+repr(self.DUP)+ \ 00114 ", QoS="+repr(self.QoS)+", Retain="+repr(self.RETAIN) 00115 00116 def pack(self, length): 00117 "pack data into string buffer ready for transmission down socket" 00118 buffer = bytes([(self.MessageType << 4) | (self.DUP << 3) |\ 00119 (self.QoS << 1) | self.RETAIN]) 00120 self.remainingLength = length 00121 buffer += self.encode(length) 00122 return buffer 00123 00124 def encode(self, x): 00125 assert 0 <= x <= 268435455 00126 buffer = b'' 00127 while 1: 00128 digit = x % 128 00129 x //= 128 00130 if x > 0: 00131 digit |= 0x80 00132 buffer += bytes([digit]) 00133 if x == 0: 00134 break 00135 return buffer 00136 00137 def unpack(self, buffer): 00138 "unpack data from string buffer into separate fields" 00139 b0 = ord(buffer[0]) 00140 self.MessageType = b0 >> 4 00141 self.DUP = ((b0 >> 3) & 0x01) == 1 00142 self.QoS = (b0 >> 1) & 0x03 00143 self.RETAIN = (b0 & 0x01) == 1 00144 (self.remainingLength, bytes) = self.decode(buffer[1:]) 00145 return bytes + 1 # length of fixed header 00146 00147 def decode(self, buffer): 00148 multiplier = 1 00149 value = 0 00150 bytes = 0 00151 while 1: 00152 bytes += 1 00153 digit = ord(buffer[0]) 00154 buffer = buffer[1:] 00155 value += (digit & 127) * multiplier 00156 if digit & 128 == 0: 00157 break 00158 multiplier *= 128 00159 return (value, bytes) 00160 00161 00162 def writeInt16(length): 00163 return bytes([length // 256, length % 256]) 00164 00165 def readInt16(buf): 00166 return ord(buf[0])*256 + ord(buf[1]) 00167 00168 def writeUTF(data): 00169 # data could be a string, or bytes. If string, encode into bytes with utf-8 00170 return writeInt16(len(data)) + (data if type(data) == type(b"") else bytes(data, "utf-8")) 00171 00172 def readUTF(buffer, maxlen): 00173 if maxlen >= 2: 00174 length = readInt16(buffer) 00175 else: 00176 raise MQTTException("Not enough data to read string length") 00177 maxlen -= 2 00178 if length > maxlen: 00179 raise MQTTException("Length delimited string too long") 00180 buf = buffer[2:2+length].decode("utf-8") 00181 logger.info("[MQTT-4.7.3-2] topic names and filters not include null") 00182 zz = buf.find("\x00") # look for null in the UTF string 00183 if zz != -1: 00184 raise MQTTException("[MQTT-1.5.3-2] Null found in UTF data "+buf) 00185 """for c in range (0xD800, 0xDFFF): 00186 zz = buf.find(chr(c)) # look for D800-DFFF in the UTF string 00187 if zz != -1: 00188 raise MQTTException("[MQTT-1.5.3-1] D800-DFFF found in UTF data "+buf) 00189 """ 00190 if buf.find("\uFEFF") != -1: 00191 logger.info("[MQTT-1.5.3-3] U+FEFF in UTF string") 00192 return buf 00193 00194 def writeBytes(buffer): 00195 return writeInt16(len(buffer)) + buffer 00196 00197 def readBytes(buffer): 00198 length = readInt16(buffer) 00199 return buffer[2:2+length] 00200 00201 00202 class Packets: 00203 00204 def pack(self): 00205 buffer = self.fh.pack(0) 00206 return buffer 00207 00208 def __repr__(self): 00209 return repr(self.fh) 00210 00211 def __eq__(self, packet): 00212 return self.fh == packet.fh if packet else False 00213 00214 00215 class Connects(Packets): 00216 00217 def __init__(self, buffer = None): 00218 self.fh = FixedHeaders(CONNECT) 00219 00220 # variable header 00221 self.ProtocolName = "MQTT" 00222 self.ProtocolVersion = 4 00223 self.CleanSession = True 00224 self.WillFlag = False 00225 self.WillQoS = 0 00226 self.WillRETAIN = 0 00227 self.KeepAliveTimer = 30 00228 self.usernameFlag = False 00229 self.passwordFlag = False 00230 00231 # Payload 00232 self.ClientIdentifier = "" # UTF-8 00233 self.WillTopic = None # UTF-8 00234 self.WillMessage = None # binary 00235 self.username = None # UTF-8 00236 self.password = None # binary 00237 00238 if buffer != None: 00239 self.unpack(buffer) 00240 00241 def pack(self): 00242 connectFlags = bytes([(self.CleanSession << 1) | (self.WillFlag << 2) | \ 00243 (self.WillQoS << 3) | (self.WillRETAIN << 5) | \ 00244 (self.usernameFlag << 6) | (self.passwordFlag << 7)]) 00245 buffer = writeUTF(self.ProtocolName) + bytes([self.ProtocolVersion]) + \ 00246 connectFlags + writeInt16(self.KeepAliveTimer) 00247 buffer += writeUTF(self.ClientIdentifier) 00248 if self.WillFlag: 00249 buffer += writeUTF(self.WillTopic) 00250 buffer += writeBytes(self.WillMessage) 00251 if self.usernameFlag: 00252 buffer += writeUTF(self.username) 00253 if self.passwordFlag: 00254 buffer += writeBytes(self.password) 00255 buffer = self.fh.pack(len(buffer)) + buffer 00256 return buffer 00257 00258 def unpack(self, buffer): 00259 assert len(buffer) >= 2 00260 assert MessageType(buffer) == CONNECT 00261 00262 try: 00263 fhlen = self.fh.unpack(buffer) 00264 packlen = fhlen + self.fh.remainingLength 00265 assert len(buffer) >= packlen, "buffer length %d packet length %d" % (len(buffer), packlen) 00266 curlen = fhlen # points to after header + remaining length 00267 assert self.fh.DUP == False, "[MQTT-2.1.2-1]" 00268 assert self.fh.QoS == 0, "[MQTT-2.1.2-1] QoS was not 0, was %d" % self.fh.QoS 00269 assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]" 00270 00271 self.ProtocolName = readUTF(buffer[curlen:], packlen - curlen) 00272 curlen += len(self.ProtocolName) + 2 00273 assert self.ProtocolName == "MQTT", "Wrong protocol name %s" % self.ProtocolName 00274 00275 self.ProtocolVersion = ord(buffer[curlen]) 00276 curlen += 1 00277 00278 connectFlags = ord(buffer[curlen]) 00279 assert (connectFlags & 0x01) == 0, "[MQTT-3.1.2-3] reserved connect flag must be 0" 00280 self.CleanSession = ((connectFlags >> 1) & 0x01) == 1 00281 self.WillFlag = ((connectFlags >> 2) & 0x01) == 1 00282 self.WillQoS = (connectFlags >> 3) & 0x03 00283 self.WillRETAIN = (connectFlags >> 5) & 0x01 00284 self.passwordFlag = ((connectFlags >> 6) & 0x01) == 1 00285 self.usernameFlag = ((connectFlags >> 7) & 0x01) == 1 00286 curlen +=1 00287 00288 if self.WillFlag: 00289 assert self.WillQoS in [0, 1, 2], "[MQTT-3.1.2-14] will qos must not be 3" 00290 else: 00291 assert self.WillQoS == 0, "[MQTT-3.1.2-13] will qos must be 0, if will flag is false" 00292 assert self.WillRETAIN == False, "[MQTT-3.1.2-14] will retain must be false, if will flag is false" 00293 00294 self.KeepAliveTimer = readInt16(buffer[curlen:]) 00295 curlen += 2 00296 logger.info("[MQTT-3.1.3-3] Clientid must be present, and first field") 00297 logger.info("[MQTT-3.1.3-4] Clientid must be Unicode, and between 0 and 65535 bytes long") 00298 self.ClientIdentifier = readUTF(buffer[curlen:], packlen - curlen) 00299 curlen += len(self.ClientIdentifier) + 2 00300 00301 if self.WillFlag: 00302 self.WillTopic = readUTF(buffer[curlen:], packlen - curlen) 00303 curlen += len(self.WillTopic) + 2 00304 self.WillMessage = readBytes(buffer[curlen:]) 00305 curlen += len(self.WillMessage) + 2 00306 logger.info("[[MQTT-3.1.2-9] will topic and will message fields must be present") 00307 else: 00308 self.WillTopic = self.WillMessage = None 00309 00310 if self.usernameFlag: 00311 assert len(buffer) > curlen+2, "Buffer too short to read username length" 00312 self.username = readUTF(buffer[curlen:], packlen - curlen) 00313 curlen += len(self.username) + 2 00314 logger.info("[MQTT-3.1.2-19] username must be in payload if user name flag is 1") 00315 else: 00316 logger.info("[MQTT-3.1.2-18] username must not be in payload if user name flag is 0") 00317 assert self.passwordFlag == False, "[MQTT-3.1.2-22] password flag must be 0 if username flag is 0" 00318 00319 if self.passwordFlag: 00320 assert len(buffer) > curlen+2, "Buffer too short to read password length" 00321 self.password = readBytes(buffer[curlen:]) 00322 curlen += len(self.password) + 2 00323 logger.info("[MQTT-3.1.2-21] password must be in payload if password flag is 0") 00324 else: 00325 logger.info("[MQTT-3.1.2-20] password must not be in payload if password flag is 0") 00326 00327 if self.WillFlag and self.usernameFlag and self.passwordFlag: 00328 logger.info("[MQTT-3.1.3-1] clientid, will topic, will message, username and password all present") 00329 00330 assert curlen == packlen, "Packet is wrong length curlen %d != packlen %d" 00331 except: 00332 logger.exception("[MQTT-3.1.4-1] server must validate connect packet and close connection without connack if it does not conform") 00333 raise 00334 00335 00336 00337 def __repr__(self): 00338 buf = repr(self.fh)+", ProtocolName="+str(self.ProtocolName)+", ProtocolVersion=" +\ 00339 repr(self.ProtocolVersion)+", CleanSession="+repr(self.CleanSession) +\ 00340 ", WillFlag="+repr(self.WillFlag)+", KeepAliveTimer=" +\ 00341 repr(self.KeepAliveTimer)+", ClientId="+str(self.ClientIdentifier) +\ 00342 ", usernameFlag="+repr(self.usernameFlag)+", passwordFlag="+repr(self.passwordFlag) 00343 if self.WillFlag: 00344 buf += ", WillQoS=" + repr(self.WillQoS) +\ 00345 ", WillRETAIN=" + repr(self.WillRETAIN) +\ 00346 ", WillTopic='"+ self.WillTopic +\ 00347 "', WillMessage='"+str(self.WillMessage)+"'" 00348 if self.username: 00349 buf += ", username="+self.username 00350 if self.password: 00351 buf += ", password="+str(self.password) 00352 return buf+")" 00353 00354 def __eq__(self, packet): 00355 rc = Packets.__eq__(self, packet) and \ 00356 self.ProtocolName == packet.ProtocolName and \ 00357 self.ProtocolVersion == packet.ProtocolVersion and \ 00358 self.CleanSession == packet.CleanSession and \ 00359 self.WillFlag == packet.WillFlag and \ 00360 self.KeepAliveTimer == packet.KeepAliveTimer and \ 00361 self.ClientIdentifier == packet.ClientIdentifier and \ 00362 self.WillFlag == packet.WillFlag 00363 if rc and self.WillFlag: 00364 rc = self.WillQoS == packet.WillQoS and \ 00365 self.WillRETAIN == packet.WillRETAIN and \ 00366 self.WillTopic == packet.WillTopic and \ 00367 self.WillMessage == packet.WillMessage 00368 return rc 00369 00370 00371 class Connacks(Packets): 00372 00373 def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, ReturnCode=0): 00374 self.fh = FixedHeaders(CONNACK) 00375 self.fh.DUP = DUP 00376 self.fh.QoS = QoS 00377 self.fh.Retain = Retain 00378 self.flags = 0 00379 self.returnCode = ReturnCode 00380 if buffer != None: 00381 self.unpack(buffer) 00382 00383 def pack(self): 00384 buffer = bytes([self.flags, self.returnCode]) 00385 buffer = self.fh.pack(len(buffer)) + buffer 00386 return buffer 00387 00388 def unpack(self, buffer): 00389 assert len(buffer) >= 4 00390 assert MessageType(buffer) == CONNACK 00391 self.fh.unpack(buffer) 00392 assert self.fh.remainingLength == 2, "Connack packet is wrong length %d" % self.fh.remainingLength 00393 assert ord(buffer[2]) in [0, 1], "Connect Acknowledge Flags" 00394 self.returnCode = ord(buffer[3]) 00395 assert self.fh.DUP == False, "[MQTT-2.1.2-1]" 00396 assert self.fh.QoS == 0, "[MQTT-2.1.2-1]" 00397 assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]" 00398 00399 def __repr__(self): 00400 return repr(self.fh)+", Session present="+str((self.flags & 0x01) == 1)+", ReturnCode="+repr(self.returnCode)+")" 00401 00402 def __eq__(self, packet): 00403 return Packets.__eq__(self, packet) and \ 00404 self.returnCode == packet.returnCode 00405 00406 00407 class Disconnects(Packets): 00408 00409 def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False): 00410 self.fh = FixedHeaders(DISCONNECT) 00411 self.fh.DUP = DUP 00412 self.fh.QoS = QoS 00413 self.fh.Retain = Retain 00414 if buffer != None: 00415 self.unpack(buffer) 00416 00417 def unpack(self, buffer): 00418 assert len(buffer) >= 2 00419 assert MessageType(buffer) == DISCONNECT 00420 self.fh.unpack(buffer) 00421 assert self.fh.remainingLength == 0, "Disconnect packet is wrong length %d" % self.fh.remainingLength 00422 logger.info("[MQTT-3.14.1-1] disconnect reserved bits must be 0") 00423 assert self.fh.DUP == False, "[MQTT-2.1.2-1]" 00424 assert self.fh.QoS == 0, "[MQTT-2.1.2-1]" 00425 assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]" 00426 00427 def __repr__(self): 00428 return repr(self.fh)+")" 00429 00430 00431 class Publishes(Packets): 00432 00433 def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0, TopicName="", Payload=b""): 00434 self.fh = FixedHeaders(PUBLISH) 00435 self.fh.DUP = DUP 00436 self.fh.QoS = QoS 00437 self.fh.Retain = Retain 00438 # variable header 00439 self.topicName = TopicName 00440 self.messageIdentifier = MsgId 00441 # payload 00442 self.data = Payload 00443 if buffer != None: 00444 self.unpack(buffer) 00445 00446 def pack(self): 00447 buffer = writeUTF(self.topicName) 00448 if self.fh.QoS != 0: 00449 buffer += writeInt16(self.messageIdentifier) 00450 buffer += self.data 00451 buffer = self.fh.pack(len(buffer)) + buffer 00452 return buffer 00453 00454 def unpack(self, buffer): 00455 assert len(buffer) >= 2 00456 assert MessageType(buffer) == PUBLISH 00457 fhlen = self.fh.unpack(buffer) 00458 assert self.fh.QoS in [0, 1, 2], "QoS in Publish must be 0, 1, or 2" 00459 packlen = fhlen + self.fh.remainingLength 00460 assert len(buffer) >= packlen 00461 curlen = fhlen 00462 try: 00463 self.topicName = readUTF(buffer[fhlen:], packlen - curlen) 00464 except UnicodeDecodeError: 00465 logger.info("[MQTT-3.3.2-1] topic name in publish must be utf-8") 00466 raise 00467 curlen += len(self.topicName) + 2 00468 if self.fh.QoS != 0: 00469 self.messageIdentifier = readInt16(buffer[curlen:]) 00470 logger.info("[MQTT-2.3.1-1] packet indentifier must be in publish if QoS is 1 or 2") 00471 curlen += 2 00472 assert self.messageIdentifier > 0, "[MQTT-2.3.1-1] packet indentifier must be > 0" 00473 else: 00474 logger.info("[MQTT-2.3.1-5] no packet indentifier in publish if QoS is 0") 00475 self.messageIdentifier = 0 00476 self.data = buffer[curlen:fhlen + self.fh.remainingLength] 00477 if self.fh.QoS == 0: 00478 assert self.fh.DUP == False, "[MQTT-2.1.2-4]" 00479 return fhlen + self.fh.remainingLength 00480 00481 def __repr__(self): 00482 rc = repr(self.fh) 00483 if self.fh.QoS != 0: 00484 rc += ", MsgId="+repr(self.messageIdentifier) 00485 rc += ", TopicName="+repr(self.topicName)+", Payload="+repr(self.data)+")" 00486 return rc 00487 00488 def __eq__(self, packet): 00489 rc = Packets.__eq__(self, packet) and \ 00490 self.topicName == packet.topicName and \ 00491 self.data == packet.data 00492 if rc and self.fh.QoS != 0: 00493 rc = self.messageIdentifier == packet.messageIdentifier 00494 return rc 00495 00496 00497 class Pubacks(Packets): 00498 00499 def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0): 00500 self.fh = FixedHeaders(PUBACK) 00501 self.fh.DUP = DUP 00502 self.fh.QoS = QoS 00503 self.fh.Retain = Retain 00504 # variable header 00505 self.messageIdentifier = MsgId 00506 if buffer != None: 00507 self.unpack(buffer) 00508 00509 def pack(self): 00510 buffer = writeInt16(self.messageIdentifier) 00511 buffer = self.fh.pack(len(buffer)) + buffer 00512 return buffer 00513 00514 def unpack(self, buffer): 00515 assert len(buffer) >= 2 00516 assert MessageType(buffer) == PUBACK 00517 fhlen = self.fh.unpack(buffer) 00518 assert self.fh.remainingLength == 2, "Puback packet is wrong length %d" % self.fh.remainingLength 00519 assert len(buffer) >= fhlen + self.fh.remainingLength 00520 self.messageIdentifier = readInt16(buffer[fhlen:]) 00521 assert self.fh.DUP == False, "[MQTT-2.1.2-1] Puback reserved bits must be 0" 00522 assert self.fh.QoS == 0, "[MQTT-2.1.2-1] Puback reserved bits must be 0" 00523 assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] Puback reserved bits must be 0" 00524 return fhlen + 2 00525 00526 def __repr__(self): 00527 return repr(self.fh)+", MsgId "+repr(self.messageIdentifier) 00528 00529 def __eq__(self, packet): 00530 return Packets.__eq__(self, packet) and \ 00531 self.messageIdentifier == packet.messageIdentifier 00532 00533 00534 class Pubrecs(Packets): 00535 00536 def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0): 00537 self.fh = FixedHeaders(PUBREC) 00538 self.fh.DUP = DUP 00539 self.fh.QoS = QoS 00540 self.fh.Retain = Retain 00541 # variable header 00542 self.messageIdentifier = MsgId 00543 if buffer != None: 00544 self.unpack(buffer) 00545 00546 def pack(self): 00547 buffer = writeInt16(self.messageIdentifier) 00548 buffer = self.fh.pack(len(buffer)) + buffer 00549 return buffer 00550 00551 def unpack(self, buffer): 00552 assert len(buffer) >= 2 00553 assert MessageType(buffer) == PUBREC 00554 fhlen = self.fh.unpack(buffer) 00555 assert self.fh.remainingLength == 2, "Pubrec packet is wrong length %d" % self.fh.remainingLength 00556 assert len(buffer) >= fhlen + self.fh.remainingLength 00557 self.messageIdentifier = readInt16(buffer[fhlen:]) 00558 assert self.fh.DUP == False, "[MQTT-2.1.2-1] Pubrec reserved bits must be 0" 00559 assert self.fh.QoS == 0, "[MQTT-2.1.2-1] Pubrec reserved bits must be 0" 00560 assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] Pubrec reserved bits must be 0" 00561 return fhlen + 2 00562 00563 def __repr__(self): 00564 return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+")" 00565 00566 def __eq__(self, packet): 00567 return Packets.__eq__(self, packet) and \ 00568 self.messageIdentifier == packet.messageIdentifier 00569 00570 00571 class Pubrels(Packets): 00572 00573 def __init__(self, buffer=None, DUP=False, QoS=1, Retain=False, MsgId=0): 00574 self.fh = FixedHeaders(PUBREL) 00575 self.fh.DUP = DUP 00576 self.fh.QoS = QoS 00577 self.fh.Retain = Retain 00578 # variable header 00579 self.messageIdentifier = MsgId 00580 if buffer != None: 00581 self.unpack(buffer) 00582 00583 def pack(self): 00584 buffer = writeInt16(self.messageIdentifier) 00585 buffer = self.fh.pack(len(buffer)) + buffer 00586 return buffer 00587 00588 def unpack(self, buffer): 00589 assert len(buffer) >= 2 00590 assert MessageType(buffer) == PUBREL 00591 fhlen = self.fh.unpack(buffer) 00592 assert self.fh.remainingLength == 2, "Pubrel packet is wrong length %d" % self.fh.remainingLength 00593 assert len(buffer) >= fhlen + self.fh.remainingLength 00594 self.messageIdentifier = readInt16(buffer[fhlen:]) 00595 assert self.fh.DUP == False, "[MQTT-2.1.2-1] DUP should be False in PUBREL" 00596 assert self.fh.QoS == 1, "[MQTT-2.1.2-1] QoS should be 1 in PUBREL" 00597 assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] RETAIN should be False in PUBREL" 00598 logger.info("[MQTT-3.6.1-1] bits in fixed header for pubrel are ok") 00599 return fhlen + 2 00600 00601 def __repr__(self): 00602 return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+")" 00603 00604 def __eq__(self, packet): 00605 return Packets.__eq__(self, packet) and \ 00606 self.messageIdentifier == packet.messageIdentifier 00607 00608 00609 class Pubcomps(Packets): 00610 00611 def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0): 00612 self.fh = FixedHeaders(PUBCOMP) 00613 self.fh.DUP = DUP 00614 self.fh.QoS = QoS 00615 self.fh.Retain = Retain 00616 # variable header 00617 self.messageIdentifier = MsgId 00618 if buffer != None: 00619 self.unpack(buffer) 00620 00621 def pack(self): 00622 buffer = writeInt16(self.messageIdentifier) 00623 buffer = self.fh.pack(len(buffer)) + buffer 00624 return buffer 00625 00626 def unpack(self, buffer): 00627 assert len(buffer) >= 2 00628 assert MessageType(buffer) == PUBCOMP 00629 fhlen = self.fh.unpack(buffer) 00630 assert len(buffer) >= fhlen + self.fh.remainingLength 00631 assert self.fh.remainingLength == 2, "Pubcomp packet is wrong length %d" % self.fh.remainingLength 00632 self.messageIdentifier = readInt16(buffer[fhlen:]) 00633 assert self.fh.DUP == False, "[MQTT-2.1.2-1] DUP should be False in Pubcomp" 00634 assert self.fh.QoS == 0, "[MQTT-2.1.2-1] QoS should be 0 in Pubcomp" 00635 assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] Retain should be false in Pubcomp" 00636 return fhlen + 2 00637 00638 def __repr__(self): 00639 return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+")" 00640 00641 def __eq__(self, packet): 00642 return Packets.__eq__(self, packet) and \ 00643 self.messageIdentifier == packet.messageIdentifier 00644 00645 00646 class Subscribes(Packets): 00647 00648 def __init__(self, buffer=None, DUP=False, QoS=1, Retain=False, MsgId=0, Data=[]): 00649 self.fh = FixedHeaders(SUBSCRIBE) 00650 self.fh.DUP = DUP 00651 self.fh.QoS = QoS 00652 self.fh.Retain = Retain 00653 # variable header 00654 self.messageIdentifier = MsgId 00655 # payload - list of topic, qos pairs 00656 self.data = Data[:] 00657 if buffer != None: 00658 self.unpack(buffer) 00659 00660 def pack(self): 00661 buffer = writeInt16(self.messageIdentifier) 00662 for d in self.data: 00663 buffer += writeUTF(d[0]) + bytes([d[1]]) 00664 buffer = self.fh.pack(len(buffer)) + buffer 00665 return buffer 00666 00667 def unpack(self, buffer): 00668 assert len(buffer) >= 2 00669 assert MessageType(buffer) == SUBSCRIBE 00670 fhlen = self.fh.unpack(buffer) 00671 assert len(buffer) >= fhlen + self.fh.remainingLength 00672 logger.info("[MQTT-2.3.1-1] packet indentifier must be in subscribe") 00673 self.messageIdentifier = readInt16(buffer[fhlen:]) 00674 assert self.messageIdentifier > 0, "[MQTT-2.3.1-1] packet indentifier must be > 0" 00675 leftlen = self.fh.remainingLength - 2 00676 self.data = [] 00677 while leftlen > 0: 00678 topic = readUTF(buffer[-leftlen:], leftlen) 00679 leftlen -= len(topic) + 2 00680 qos = ord(buffer[-leftlen]) 00681 assert qos in [0, 1, 2], "[MQTT-3-8.3-2] reserved bits must be zero" 00682 leftlen -= 1 00683 self.data.append((topic, qos)) 00684 assert len(self.data) > 0, "[MQTT-3.8.3-1] at least one topic, qos pair must be in subscribe" 00685 assert leftlen == 0 00686 assert self.fh.DUP == False, "[MQTT-2.1.2-1] DUP must be false in subscribe" 00687 assert self.fh.QoS == 1, "[MQTT-2.1.2-1] QoS must be 1 in subscribe" 00688 assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] RETAIN must be false in subscribe" 00689 return fhlen + self.fh.remainingLength 00690 00691 def __repr__(self): 00692 return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+\ 00693 ", Data="+repr(self.data)+")" 00694 00695 def __eq__(self, packet): 00696 return Packets.__eq__(self, packet) and \ 00697 self.messageIdentifier == packet.messageIdentifier and \ 00698 self.data == packet.data 00699 00700 00701 class Subacks(Packets): 00702 00703 def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0, Data=[]): 00704 self.fh = FixedHeaders(SUBACK) 00705 self.fh.DUP = DUP 00706 self.fh.QoS = QoS 00707 self.fh.Retain = Retain 00708 # variable header 00709 self.messageIdentifier = MsgId 00710 # payload - list of qos 00711 self.data = Data[:] 00712 if buffer != None: 00713 self.unpack(buffer) 00714 00715 def pack(self): 00716 buffer = writeInt16(self.messageIdentifier) 00717 for d in self.data: 00718 buffer += bytes([d]) 00719 buffer = self.fh.pack(len(buffer)) + buffer 00720 return buffer 00721 00722 def unpack(self, buffer): 00723 assert len(buffer) >= 2 00724 assert MessageType(buffer) == SUBACK 00725 fhlen = self.fh.unpack(buffer) 00726 assert len(buffer) >= fhlen + self.fh.remainingLength 00727 self.messageIdentifier = readInt16(buffer[fhlen:]) 00728 leftlen = self.fh.remainingLength - 2 00729 self.data = [] 00730 while leftlen > 0: 00731 qos = buffer[-leftlen] 00732 assert ord(qos) in [0, 1, 2, 0x80], "[MQTT-3.9.3-2] return code in QoS must be 0, 1, 2 or 0x80, was "+ord(qos) 00733 leftlen -= 1 00734 self.data.append(qos) 00735 assert leftlen == 0 00736 assert self.fh.DUP == False, "[MQTT-2.1.2-1] DUP should be false in suback" 00737 assert self.fh.QoS == 0, "[MQTT-2.1.2-1] QoS should be 0 in suback" 00738 assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] Retain should be false in suback" 00739 return fhlen + self.fh.remainingLength 00740 00741 def __repr__(self): 00742 return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+\ 00743 ", Data="+repr(self.data)+")" 00744 00745 def __eq__(self, packet): 00746 return Packets.__eq__(self, packet) and \ 00747 self.messageIdentifier == packet.messageIdentifier and \ 00748 self.data == packet.data 00749 00750 00751 class Unsubscribes(Packets): 00752 00753 def __init__(self, buffer=None, DUP=False, QoS=1, Retain=False, MsgId=0, Data=[]): 00754 self.fh = FixedHeaders(UNSUBSCRIBE) 00755 self.fh.DUP = DUP 00756 self.fh.QoS = QoS 00757 self.fh.Retain = Retain 00758 # variable header 00759 self.messageIdentifier = MsgId 00760 # payload - list of topics 00761 self.data = Data[:] 00762 if buffer != None: 00763 self.unpack(buffer) 00764 00765 def pack(self): 00766 buffer = writeInt16(self.messageIdentifier) 00767 for d in self.data: 00768 buffer += writeUTF(d) 00769 buffer = self.fh.pack(len(buffer)) + buffer 00770 return buffer 00771 00772 def unpack(self, buffer): 00773 assert len(buffer) >= 2 00774 assert MessageType(buffer) == UNSUBSCRIBE 00775 fhlen = self.fh.unpack(buffer) 00776 assert len(buffer) >= fhlen + self.fh.remainingLength 00777 logger.info("[MQTT-2.3.1-1] packet indentifier must be in unsubscribe") 00778 self.messageIdentifier = readInt16(buffer[fhlen:]) 00779 assert self.messageIdentifier > 0, "[MQTT-2.3.1-1] packet indentifier must be > 0" 00780 leftlen = self.fh.remainingLength - 2 00781 self.data = [] 00782 while leftlen > 0: 00783 topic = readUTF(buffer[-leftlen:], leftlen) 00784 leftlen -= len(topic) + 2 00785 self.data.append(topic) 00786 assert leftlen == 0 00787 assert self.fh.DUP == False, "[MQTT-2.1.2-1]" 00788 assert self.fh.QoS == 1, "[MQTT-2.1.2-1]" 00789 assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]" 00790 logger.info("[MQTT-3-10.1-1] fixed header bits are 0,0,1,0") 00791 return fhlen + self.fh.remainingLength 00792 00793 def __repr__(self): 00794 return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+\ 00795 ", Data="+repr(self.data)+")" 00796 00797 def __eq__(self, packet): 00798 return Packets.__eq__(self, packet) and \ 00799 self.messageIdentifier == packet.messageIdentifier and \ 00800 self.data == packet.data 00801 00802 00803 class Unsubacks(Packets): 00804 00805 def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0): 00806 self.fh = FixedHeaders(UNSUBACK) 00807 self.fh.DUP = DUP 00808 self.fh.QoS = QoS 00809 self.fh.Retain = Retain 00810 # variable header 00811 self.messageIdentifier = MsgId 00812 if buffer != None: 00813 self.unpack(buffer) 00814 00815 def pack(self): 00816 buffer = writeInt16(self.messageIdentifier) 00817 buffer = self.fh.pack(len(buffer)) + buffer 00818 return buffer 00819 00820 def unpack(self, buffer): 00821 assert len(buffer) >= 2 00822 assert MessageType(buffer) == UNSUBACK 00823 fhlen = self.fh.unpack(buffer) 00824 assert len(buffer) >= fhlen + self.fh.remainingLength 00825 self.messageIdentifier = readInt16(buffer[fhlen:]) 00826 assert self.messageIdentifier > 0, "[MQTT-2.3.1-1] packet indentifier must be > 0" 00827 self.messageIdentifier = readInt16(buffer[fhlen:]) 00828 assert self.fh.DUP == False, "[MQTT-2.1.2-1]" 00829 assert self.fh.QoS == 0, "[MQTT-2.1.2-1]" 00830 assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]" 00831 return fhlen + self.fh.remainingLength 00832 00833 def __repr__(self): 00834 return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+")" 00835 00836 def __eq__(self, packet): 00837 return Packets.__eq__(self, packet) and \ 00838 self.messageIdentifier == packet.messageIdentifier 00839 00840 00841 class Pingreqs(Packets): 00842 00843 def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False): 00844 self.fh = FixedHeaders(PINGREQ) 00845 self.fh.DUP = DUP 00846 self.fh.QoS = QoS 00847 self.fh.Retain = Retain 00848 if buffer != None: 00849 self.unpack(buffer) 00850 00851 def unpack(self, buffer): 00852 assert len(buffer) >= 2 00853 assert MessageType(buffer) == PINGREQ 00854 fhlen = self.fh.unpack(buffer) 00855 assert self.fh.remainingLength == 0 00856 assert self.fh.DUP == False, "[MQTT-2.1.2-1]" 00857 assert self.fh.QoS == 0, "[MQTT-2.1.2-1]" 00858 assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]" 00859 return fhlen 00860 00861 def __repr__(self): 00862 return repr(self.fh)+")" 00863 00864 00865 class Pingresps(Packets): 00866 00867 def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False): 00868 self.fh = FixedHeaders(PINGRESP) 00869 self.fh.DUP = DUP 00870 self.fh.QoS = QoS 00871 self.fh.Retain = Retain 00872 if buffer != None: 00873 self.unpack(buffer) 00874 00875 def unpack(self, buffer): 00876 assert len(buffer) >= 2 00877 assert MessageType(buffer) == PINGRESP 00878 fhlen = self.fh.unpack(buffer) 00879 assert self.fh.remainingLength == 0 00880 assert self.fh.DUP == False, "[MQTT-2.1.2-1]" 00881 assert self.fh.QoS == 0, "[MQTT-2.1.2-1]" 00882 assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]" 00883 return fhlen 00884 00885 def __repr__(self): 00886 return repr(self.fh)+")" 00887 00888 classes = [None, Connects, Connacks, Publishes, Pubacks, Pubrecs, 00889 Pubrels, Pubcomps, Subscribes, Subacks, Unsubscribes, 00890 Unsubacks, Pingreqs, Pingresps, Disconnects] 00891 00892 def unpackPacket(buffer): 00893 if MessageType(buffer) != None: 00894 packet = classes[MessageType(buffer)]() 00895 packet.unpack(buffer) 00896 else: 00897 packet = None 00898 return packet 00899 00900 if __name__ == "__main__": 00901 fh = FixedHeaders(CONNECT) 00902 tests = [0, 56, 127, 128, 8888, 16383, 16384, 65535, 2097151, 2097152, 00903 20555666, 268435454, 268435455] 00904 for x in tests: 00905 try: 00906 assert x == fh.decode(fh.encode(x))[0] 00907 except AssertionError: 00908 print("Test failed for x =", x, fh.decode(fh.encode(x))) 00909 try: 00910 fh.decode(fh.encode(268435456)) 00911 print("Error") 00912 except AssertionError: 00913 pass 00914 00915 for packet in classes[1:]: 00916 before = str(packet()) 00917 after = str(unpackPacket(packet().pack())) 00918 try: 00919 assert before == after 00920 except: 00921 print("before:", before, "\nafter:", after) 00922 print("End") 00923
Generated on Wed Jul 13 2022 10:46:03 by
1.7.2