Library for MQTT
Diff: mbed-mqtt-master/paho_mqtt_embedded_c/test/MQTTV3112.py
- Revision:
- 0:ba7e439238ab
diff -r 000000000000 -r ba7e439238ab mbed-mqtt-master/paho_mqtt_embedded_c/test/MQTTV3112.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/mbed-mqtt-master/paho_mqtt_embedded_c/test/MQTTV3112.py Mon Jun 20 16:24:43 2022 +0000 @@ -0,0 +1,923 @@ +""" +******************************************************************* + Copyright (c) 2013, 2014 IBM Corp. + + All rights reserved. This program and the accompanying materials + are made available under the terms of the Eclipse Public License v1.0 + and Eclipse Distribution License v1.0 which accompany this distribution. + + The Eclipse Public License is available at + http://www.eclipse.org/legal/epl-v10.html + and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + + Contributors: + Ian Craggs - initial implementation and/or documentation +******************************************************************* +""" +from __future__ import print_function + +""" + +Assertions are used to validate incoming data, but are omitted from outgoing packets. This is +so that the tests that use this package can send invalid data for error testing. + +""" + + +import logging + +logger = logging.getLogger("mqttsas") + +# Low-level protocol interface + +class MQTTException(Exception): + pass + + +# Message types +CONNECT, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL, \ +PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, \ +PINGREQ, PINGRESP, DISCONNECT = range(1, 15) + +packetNames = [ "reserved", \ +"Connect", "Connack", "Publish", "Puback", "Pubrec", "Pubrel", \ +"Pubcomp", "Subscribe", "Suback", "Unsubscribe", "Unsuback", \ +"Pingreq", "Pingresp", "Disconnect"] + +classNames = [ "reserved", \ +"Connects", "Connacks", "Publishes", "Pubacks", "Pubrecs", "Pubrels", \ +"Pubcomps", "Subscribes", "Subacks", "Unsubscribes", "Unsubacks", \ +"Pingreqs", "Pingresps", "Disconnects"] + + +def MessageType(byte): + if byte != None: + rc = ord(byte[0]) >> 4 + else: + rc = None + return rc + + +def getPacket(aSocket): + "receive the next packet" + buf = aSocket.recv(1) # get the first byte fixed header + if buf == b"": + return None + if str(aSocket).find("[closed]") != -1: + closed = True + else: + closed = False + if closed: + return None + # now get the remaining length + multiplier = 1 + remlength = 0 + while 1: + next = aSocket.recv(1) + while len(next) == 0: + next = aSocket.recv(1) + buf += next + digit = ord(buf[-1]) + remlength += (digit & 127) * multiplier + if digit & 128 == 0: + break + multiplier *= 128 + # receive the remaining length if there is any + rest = '' + if remlength > 0: + while len(rest) < remlength: + rest += aSocket.recv(remlength-len(rest)) + assert len(rest) == remlength + return buf + rest + + +class FixedHeaders: + + def __init__(self, aMessageType): + self.MessageType = aMessageType + self.DUP = False + self.QoS = 0 + self.RETAIN = False + self.remainingLength = 0 + + def __eq__(self, fh): + return self.MessageType == fh.MessageType and \ + self.DUP == fh.DUP and \ + self.QoS == fh.QoS and \ + self.RETAIN == fh.RETAIN # and \ + # self.remainingLength == fh.remainingLength + + def __repr__(self): + "return printable representation of our data" + return classNames[self.MessageType]+'(DUP='+repr(self.DUP)+ \ + ", QoS="+repr(self.QoS)+", Retain="+repr(self.RETAIN) + + def pack(self, length): + "pack data into string buffer ready for transmission down socket" + buffer = bytes([(self.MessageType << 4) | (self.DUP << 3) |\ + (self.QoS << 1) | self.RETAIN]) + self.remainingLength = length + buffer += self.encode(length) + return buffer + + def encode(self, x): + assert 0 <= x <= 268435455 + buffer = b'' + while 1: + digit = x % 128 + x //= 128 + if x > 0: + digit |= 0x80 + buffer += bytes([digit]) + if x == 0: + break + return buffer + + def unpack(self, buffer): + "unpack data from string buffer into separate fields" + b0 = ord(buffer[0]) + self.MessageType = b0 >> 4 + self.DUP = ((b0 >> 3) & 0x01) == 1 + self.QoS = (b0 >> 1) & 0x03 + self.RETAIN = (b0 & 0x01) == 1 + (self.remainingLength, bytes) = self.decode(buffer[1:]) + return bytes + 1 # length of fixed header + + def decode(self, buffer): + multiplier = 1 + value = 0 + bytes = 0 + while 1: + bytes += 1 + digit = ord(buffer[0]) + buffer = buffer[1:] + value += (digit & 127) * multiplier + if digit & 128 == 0: + break + multiplier *= 128 + return (value, bytes) + + +def writeInt16(length): + return bytes([length // 256, length % 256]) + +def readInt16(buf): + return ord(buf[0])*256 + ord(buf[1]) + +def writeUTF(data): + # data could be a string, or bytes. If string, encode into bytes with utf-8 + return writeInt16(len(data)) + (data if type(data) == type(b"") else bytes(data, "utf-8")) + +def readUTF(buffer, maxlen): + if maxlen >= 2: + length = readInt16(buffer) + else: + raise MQTTException("Not enough data to read string length") + maxlen -= 2 + if length > maxlen: + raise MQTTException("Length delimited string too long") + buf = buffer[2:2+length].decode("utf-8") + logger.info("[MQTT-4.7.3-2] topic names and filters not include null") + zz = buf.find("\x00") # look for null in the UTF string + if zz != -1: + raise MQTTException("[MQTT-1.5.3-2] Null found in UTF data "+buf) + """for c in range (0xD800, 0xDFFF): + zz = buf.find(chr(c)) # look for D800-DFFF in the UTF string + if zz != -1: + raise MQTTException("[MQTT-1.5.3-1] D800-DFFF found in UTF data "+buf) + """ + if buf.find("\uFEFF") != -1: + logger.info("[MQTT-1.5.3-3] U+FEFF in UTF string") + return buf + +def writeBytes(buffer): + return writeInt16(len(buffer)) + buffer + +def readBytes(buffer): + length = readInt16(buffer) + return buffer[2:2+length] + + +class Packets: + + def pack(self): + buffer = self.fh.pack(0) + return buffer + + def __repr__(self): + return repr(self.fh) + + def __eq__(self, packet): + return self.fh == packet.fh if packet else False + + +class Connects(Packets): + + def __init__(self, buffer = None): + self.fh = FixedHeaders(CONNECT) + + # variable header + self.ProtocolName = "MQTT" + self.ProtocolVersion = 4 + self.CleanSession = True + self.WillFlag = False + self.WillQoS = 0 + self.WillRETAIN = 0 + self.KeepAliveTimer = 30 + self.usernameFlag = False + self.passwordFlag = False + + # Payload + self.ClientIdentifier = "" # UTF-8 + self.WillTopic = None # UTF-8 + self.WillMessage = None # binary + self.username = None # UTF-8 + self.password = None # binary + + if buffer != None: + self.unpack(buffer) + + def pack(self): + connectFlags = bytes([(self.CleanSession << 1) | (self.WillFlag << 2) | \ + (self.WillQoS << 3) | (self.WillRETAIN << 5) | \ + (self.usernameFlag << 6) | (self.passwordFlag << 7)]) + buffer = writeUTF(self.ProtocolName) + bytes([self.ProtocolVersion]) + \ + connectFlags + writeInt16(self.KeepAliveTimer) + buffer += writeUTF(self.ClientIdentifier) + if self.WillFlag: + buffer += writeUTF(self.WillTopic) + buffer += writeBytes(self.WillMessage) + if self.usernameFlag: + buffer += writeUTF(self.username) + if self.passwordFlag: + buffer += writeBytes(self.password) + buffer = self.fh.pack(len(buffer)) + buffer + return buffer + + def unpack(self, buffer): + assert len(buffer) >= 2 + assert MessageType(buffer) == CONNECT + + try: + fhlen = self.fh.unpack(buffer) + packlen = fhlen + self.fh.remainingLength + assert len(buffer) >= packlen, "buffer length %d packet length %d" % (len(buffer), packlen) + curlen = fhlen # points to after header + remaining length + assert self.fh.DUP == False, "[MQTT-2.1.2-1]" + assert self.fh.QoS == 0, "[MQTT-2.1.2-1] QoS was not 0, was %d" % self.fh.QoS + assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]" + + self.ProtocolName = readUTF(buffer[curlen:], packlen - curlen) + curlen += len(self.ProtocolName) + 2 + assert self.ProtocolName == "MQTT", "Wrong protocol name %s" % self.ProtocolName + + self.ProtocolVersion = ord(buffer[curlen]) + curlen += 1 + + connectFlags = ord(buffer[curlen]) + assert (connectFlags & 0x01) == 0, "[MQTT-3.1.2-3] reserved connect flag must be 0" + self.CleanSession = ((connectFlags >> 1) & 0x01) == 1 + self.WillFlag = ((connectFlags >> 2) & 0x01) == 1 + self.WillQoS = (connectFlags >> 3) & 0x03 + self.WillRETAIN = (connectFlags >> 5) & 0x01 + self.passwordFlag = ((connectFlags >> 6) & 0x01) == 1 + self.usernameFlag = ((connectFlags >> 7) & 0x01) == 1 + curlen +=1 + + if self.WillFlag: + assert self.WillQoS in [0, 1, 2], "[MQTT-3.1.2-14] will qos must not be 3" + else: + assert self.WillQoS == 0, "[MQTT-3.1.2-13] will qos must be 0, if will flag is false" + assert self.WillRETAIN == False, "[MQTT-3.1.2-14] will retain must be false, if will flag is false" + + self.KeepAliveTimer = readInt16(buffer[curlen:]) + curlen += 2 + logger.info("[MQTT-3.1.3-3] Clientid must be present, and first field") + logger.info("[MQTT-3.1.3-4] Clientid must be Unicode, and between 0 and 65535 bytes long") + self.ClientIdentifier = readUTF(buffer[curlen:], packlen - curlen) + curlen += len(self.ClientIdentifier) + 2 + + if self.WillFlag: + self.WillTopic = readUTF(buffer[curlen:], packlen - curlen) + curlen += len(self.WillTopic) + 2 + self.WillMessage = readBytes(buffer[curlen:]) + curlen += len(self.WillMessage) + 2 + logger.info("[[MQTT-3.1.2-9] will topic and will message fields must be present") + else: + self.WillTopic = self.WillMessage = None + + if self.usernameFlag: + assert len(buffer) > curlen+2, "Buffer too short to read username length" + self.username = readUTF(buffer[curlen:], packlen - curlen) + curlen += len(self.username) + 2 + logger.info("[MQTT-3.1.2-19] username must be in payload if user name flag is 1") + else: + logger.info("[MQTT-3.1.2-18] username must not be in payload if user name flag is 0") + assert self.passwordFlag == False, "[MQTT-3.1.2-22] password flag must be 0 if username flag is 0" + + if self.passwordFlag: + assert len(buffer) > curlen+2, "Buffer too short to read password length" + self.password = readBytes(buffer[curlen:]) + curlen += len(self.password) + 2 + logger.info("[MQTT-3.1.2-21] password must be in payload if password flag is 0") + else: + logger.info("[MQTT-3.1.2-20] password must not be in payload if password flag is 0") + + if self.WillFlag and self.usernameFlag and self.passwordFlag: + logger.info("[MQTT-3.1.3-1] clientid, will topic, will message, username and password all present") + + assert curlen == packlen, "Packet is wrong length curlen %d != packlen %d" + except: + logger.exception("[MQTT-3.1.4-1] server must validate connect packet and close connection without connack if it does not conform") + raise + + + + def __repr__(self): + buf = repr(self.fh)+", ProtocolName="+str(self.ProtocolName)+", ProtocolVersion=" +\ + repr(self.ProtocolVersion)+", CleanSession="+repr(self.CleanSession) +\ + ", WillFlag="+repr(self.WillFlag)+", KeepAliveTimer=" +\ + repr(self.KeepAliveTimer)+", ClientId="+str(self.ClientIdentifier) +\ + ", usernameFlag="+repr(self.usernameFlag)+", passwordFlag="+repr(self.passwordFlag) + if self.WillFlag: + buf += ", WillQoS=" + repr(self.WillQoS) +\ + ", WillRETAIN=" + repr(self.WillRETAIN) +\ + ", WillTopic='"+ self.WillTopic +\ + "', WillMessage='"+str(self.WillMessage)+"'" + if self.username: + buf += ", username="+self.username + if self.password: + buf += ", password="+str(self.password) + return buf+")" + + def __eq__(self, packet): + rc = Packets.__eq__(self, packet) and \ + self.ProtocolName == packet.ProtocolName and \ + self.ProtocolVersion == packet.ProtocolVersion and \ + self.CleanSession == packet.CleanSession and \ + self.WillFlag == packet.WillFlag and \ + self.KeepAliveTimer == packet.KeepAliveTimer and \ + self.ClientIdentifier == packet.ClientIdentifier and \ + self.WillFlag == packet.WillFlag + if rc and self.WillFlag: + rc = self.WillQoS == packet.WillQoS and \ + self.WillRETAIN == packet.WillRETAIN and \ + self.WillTopic == packet.WillTopic and \ + self.WillMessage == packet.WillMessage + return rc + + +class Connacks(Packets): + + def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, ReturnCode=0): + self.fh = FixedHeaders(CONNACK) + self.fh.DUP = DUP + self.fh.QoS = QoS + self.fh.Retain = Retain + self.flags = 0 + self.returnCode = ReturnCode + if buffer != None: + self.unpack(buffer) + + def pack(self): + buffer = bytes([self.flags, self.returnCode]) + buffer = self.fh.pack(len(buffer)) + buffer + return buffer + + def unpack(self, buffer): + assert len(buffer) >= 4 + assert MessageType(buffer) == CONNACK + self.fh.unpack(buffer) + assert self.fh.remainingLength == 2, "Connack packet is wrong length %d" % self.fh.remainingLength + assert ord(buffer[2]) in [0, 1], "Connect Acknowledge Flags" + self.returnCode = ord(buffer[3]) + assert self.fh.DUP == False, "[MQTT-2.1.2-1]" + assert self.fh.QoS == 0, "[MQTT-2.1.2-1]" + assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]" + + def __repr__(self): + return repr(self.fh)+", Session present="+str((self.flags & 0x01) == 1)+", ReturnCode="+repr(self.returnCode)+")" + + def __eq__(self, packet): + return Packets.__eq__(self, packet) and \ + self.returnCode == packet.returnCode + + +class Disconnects(Packets): + + def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False): + self.fh = FixedHeaders(DISCONNECT) + self.fh.DUP = DUP + self.fh.QoS = QoS + self.fh.Retain = Retain + if buffer != None: + self.unpack(buffer) + + def unpack(self, buffer): + assert len(buffer) >= 2 + assert MessageType(buffer) == DISCONNECT + self.fh.unpack(buffer) + assert self.fh.remainingLength == 0, "Disconnect packet is wrong length %d" % self.fh.remainingLength + logger.info("[MQTT-3.14.1-1] disconnect reserved bits must be 0") + assert self.fh.DUP == False, "[MQTT-2.1.2-1]" + assert self.fh.QoS == 0, "[MQTT-2.1.2-1]" + assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]" + + def __repr__(self): + return repr(self.fh)+")" + + +class Publishes(Packets): + + def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0, TopicName="", Payload=b""): + self.fh = FixedHeaders(PUBLISH) + self.fh.DUP = DUP + self.fh.QoS = QoS + self.fh.Retain = Retain + # variable header + self.topicName = TopicName + self.messageIdentifier = MsgId + # payload + self.data = Payload + if buffer != None: + self.unpack(buffer) + + def pack(self): + buffer = writeUTF(self.topicName) + if self.fh.QoS != 0: + buffer += writeInt16(self.messageIdentifier) + buffer += self.data + buffer = self.fh.pack(len(buffer)) + buffer + return buffer + + def unpack(self, buffer): + assert len(buffer) >= 2 + assert MessageType(buffer) == PUBLISH + fhlen = self.fh.unpack(buffer) + assert self.fh.QoS in [0, 1, 2], "QoS in Publish must be 0, 1, or 2" + packlen = fhlen + self.fh.remainingLength + assert len(buffer) >= packlen + curlen = fhlen + try: + self.topicName = readUTF(buffer[fhlen:], packlen - curlen) + except UnicodeDecodeError: + logger.info("[MQTT-3.3.2-1] topic name in publish must be utf-8") + raise + curlen += len(self.topicName) + 2 + if self.fh.QoS != 0: + self.messageIdentifier = readInt16(buffer[curlen:]) + logger.info("[MQTT-2.3.1-1] packet indentifier must be in publish if QoS is 1 or 2") + curlen += 2 + assert self.messageIdentifier > 0, "[MQTT-2.3.1-1] packet indentifier must be > 0" + else: + logger.info("[MQTT-2.3.1-5] no packet indentifier in publish if QoS is 0") + self.messageIdentifier = 0 + self.data = buffer[curlen:fhlen + self.fh.remainingLength] + if self.fh.QoS == 0: + assert self.fh.DUP == False, "[MQTT-2.1.2-4]" + return fhlen + self.fh.remainingLength + + def __repr__(self): + rc = repr(self.fh) + if self.fh.QoS != 0: + rc += ", MsgId="+repr(self.messageIdentifier) + rc += ", TopicName="+repr(self.topicName)+", Payload="+repr(self.data)+")" + return rc + + def __eq__(self, packet): + rc = Packets.__eq__(self, packet) and \ + self.topicName == packet.topicName and \ + self.data == packet.data + if rc and self.fh.QoS != 0: + rc = self.messageIdentifier == packet.messageIdentifier + return rc + + +class Pubacks(Packets): + + def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0): + self.fh = FixedHeaders(PUBACK) + self.fh.DUP = DUP + self.fh.QoS = QoS + self.fh.Retain = Retain + # variable header + self.messageIdentifier = MsgId + if buffer != None: + self.unpack(buffer) + + def pack(self): + buffer = writeInt16(self.messageIdentifier) + buffer = self.fh.pack(len(buffer)) + buffer + return buffer + + def unpack(self, buffer): + assert len(buffer) >= 2 + assert MessageType(buffer) == PUBACK + fhlen = self.fh.unpack(buffer) + assert self.fh.remainingLength == 2, "Puback packet is wrong length %d" % self.fh.remainingLength + assert len(buffer) >= fhlen + self.fh.remainingLength + self.messageIdentifier = readInt16(buffer[fhlen:]) + assert self.fh.DUP == False, "[MQTT-2.1.2-1] Puback reserved bits must be 0" + assert self.fh.QoS == 0, "[MQTT-2.1.2-1] Puback reserved bits must be 0" + assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] Puback reserved bits must be 0" + return fhlen + 2 + + def __repr__(self): + return repr(self.fh)+", MsgId "+repr(self.messageIdentifier) + + def __eq__(self, packet): + return Packets.__eq__(self, packet) and \ + self.messageIdentifier == packet.messageIdentifier + + +class Pubrecs(Packets): + + def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0): + self.fh = FixedHeaders(PUBREC) + self.fh.DUP = DUP + self.fh.QoS = QoS + self.fh.Retain = Retain + # variable header + self.messageIdentifier = MsgId + if buffer != None: + self.unpack(buffer) + + def pack(self): + buffer = writeInt16(self.messageIdentifier) + buffer = self.fh.pack(len(buffer)) + buffer + return buffer + + def unpack(self, buffer): + assert len(buffer) >= 2 + assert MessageType(buffer) == PUBREC + fhlen = self.fh.unpack(buffer) + assert self.fh.remainingLength == 2, "Pubrec packet is wrong length %d" % self.fh.remainingLength + assert len(buffer) >= fhlen + self.fh.remainingLength + self.messageIdentifier = readInt16(buffer[fhlen:]) + assert self.fh.DUP == False, "[MQTT-2.1.2-1] Pubrec reserved bits must be 0" + assert self.fh.QoS == 0, "[MQTT-2.1.2-1] Pubrec reserved bits must be 0" + assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] Pubrec reserved bits must be 0" + return fhlen + 2 + + def __repr__(self): + return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+")" + + def __eq__(self, packet): + return Packets.__eq__(self, packet) and \ + self.messageIdentifier == packet.messageIdentifier + + +class Pubrels(Packets): + + def __init__(self, buffer=None, DUP=False, QoS=1, Retain=False, MsgId=0): + self.fh = FixedHeaders(PUBREL) + self.fh.DUP = DUP + self.fh.QoS = QoS + self.fh.Retain = Retain + # variable header + self.messageIdentifier = MsgId + if buffer != None: + self.unpack(buffer) + + def pack(self): + buffer = writeInt16(self.messageIdentifier) + buffer = self.fh.pack(len(buffer)) + buffer + return buffer + + def unpack(self, buffer): + assert len(buffer) >= 2 + assert MessageType(buffer) == PUBREL + fhlen = self.fh.unpack(buffer) + assert self.fh.remainingLength == 2, "Pubrel packet is wrong length %d" % self.fh.remainingLength + assert len(buffer) >= fhlen + self.fh.remainingLength + self.messageIdentifier = readInt16(buffer[fhlen:]) + assert self.fh.DUP == False, "[MQTT-2.1.2-1] DUP should be False in PUBREL" + assert self.fh.QoS == 1, "[MQTT-2.1.2-1] QoS should be 1 in PUBREL" + assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] RETAIN should be False in PUBREL" + logger.info("[MQTT-3.6.1-1] bits in fixed header for pubrel are ok") + return fhlen + 2 + + def __repr__(self): + return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+")" + + def __eq__(self, packet): + return Packets.__eq__(self, packet) and \ + self.messageIdentifier == packet.messageIdentifier + + +class Pubcomps(Packets): + + def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0): + self.fh = FixedHeaders(PUBCOMP) + self.fh.DUP = DUP + self.fh.QoS = QoS + self.fh.Retain = Retain + # variable header + self.messageIdentifier = MsgId + if buffer != None: + self.unpack(buffer) + + def pack(self): + buffer = writeInt16(self.messageIdentifier) + buffer = self.fh.pack(len(buffer)) + buffer + return buffer + + def unpack(self, buffer): + assert len(buffer) >= 2 + assert MessageType(buffer) == PUBCOMP + fhlen = self.fh.unpack(buffer) + assert len(buffer) >= fhlen + self.fh.remainingLength + assert self.fh.remainingLength == 2, "Pubcomp packet is wrong length %d" % self.fh.remainingLength + self.messageIdentifier = readInt16(buffer[fhlen:]) + assert self.fh.DUP == False, "[MQTT-2.1.2-1] DUP should be False in Pubcomp" + assert self.fh.QoS == 0, "[MQTT-2.1.2-1] QoS should be 0 in Pubcomp" + assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] Retain should be false in Pubcomp" + return fhlen + 2 + + def __repr__(self): + return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+")" + + def __eq__(self, packet): + return Packets.__eq__(self, packet) and \ + self.messageIdentifier == packet.messageIdentifier + + +class Subscribes(Packets): + + def __init__(self, buffer=None, DUP=False, QoS=1, Retain=False, MsgId=0, Data=[]): + self.fh = FixedHeaders(SUBSCRIBE) + self.fh.DUP = DUP + self.fh.QoS = QoS + self.fh.Retain = Retain + # variable header + self.messageIdentifier = MsgId + # payload - list of topic, qos pairs + self.data = Data[:] + if buffer != None: + self.unpack(buffer) + + def pack(self): + buffer = writeInt16(self.messageIdentifier) + for d in self.data: + buffer += writeUTF(d[0]) + bytes([d[1]]) + buffer = self.fh.pack(len(buffer)) + buffer + return buffer + + def unpack(self, buffer): + assert len(buffer) >= 2 + assert MessageType(buffer) == SUBSCRIBE + fhlen = self.fh.unpack(buffer) + assert len(buffer) >= fhlen + self.fh.remainingLength + logger.info("[MQTT-2.3.1-1] packet indentifier must be in subscribe") + self.messageIdentifier = readInt16(buffer[fhlen:]) + assert self.messageIdentifier > 0, "[MQTT-2.3.1-1] packet indentifier must be > 0" + leftlen = self.fh.remainingLength - 2 + self.data = [] + while leftlen > 0: + topic = readUTF(buffer[-leftlen:], leftlen) + leftlen -= len(topic) + 2 + qos = ord(buffer[-leftlen]) + assert qos in [0, 1, 2], "[MQTT-3-8.3-2] reserved bits must be zero" + leftlen -= 1 + self.data.append((topic, qos)) + assert len(self.data) > 0, "[MQTT-3.8.3-1] at least one topic, qos pair must be in subscribe" + assert leftlen == 0 + assert self.fh.DUP == False, "[MQTT-2.1.2-1] DUP must be false in subscribe" + assert self.fh.QoS == 1, "[MQTT-2.1.2-1] QoS must be 1 in subscribe" + assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] RETAIN must be false in subscribe" + return fhlen + self.fh.remainingLength + + def __repr__(self): + return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+\ + ", Data="+repr(self.data)+")" + + def __eq__(self, packet): + return Packets.__eq__(self, packet) and \ + self.messageIdentifier == packet.messageIdentifier and \ + self.data == packet.data + + +class Subacks(Packets): + + def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0, Data=[]): + self.fh = FixedHeaders(SUBACK) + self.fh.DUP = DUP + self.fh.QoS = QoS + self.fh.Retain = Retain + # variable header + self.messageIdentifier = MsgId + # payload - list of qos + self.data = Data[:] + if buffer != None: + self.unpack(buffer) + + def pack(self): + buffer = writeInt16(self.messageIdentifier) + for d in self.data: + buffer += bytes([d]) + buffer = self.fh.pack(len(buffer)) + buffer + return buffer + + def unpack(self, buffer): + assert len(buffer) >= 2 + assert MessageType(buffer) == SUBACK + fhlen = self.fh.unpack(buffer) + assert len(buffer) >= fhlen + self.fh.remainingLength + self.messageIdentifier = readInt16(buffer[fhlen:]) + leftlen = self.fh.remainingLength - 2 + self.data = [] + while leftlen > 0: + qos = buffer[-leftlen] + assert ord(qos) in [0, 1, 2, 0x80], "[MQTT-3.9.3-2] return code in QoS must be 0, 1, 2 or 0x80, was "+ord(qos) + leftlen -= 1 + self.data.append(qos) + assert leftlen == 0 + assert self.fh.DUP == False, "[MQTT-2.1.2-1] DUP should be false in suback" + assert self.fh.QoS == 0, "[MQTT-2.1.2-1] QoS should be 0 in suback" + assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] Retain should be false in suback" + return fhlen + self.fh.remainingLength + + def __repr__(self): + return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+\ + ", Data="+repr(self.data)+")" + + def __eq__(self, packet): + return Packets.__eq__(self, packet) and \ + self.messageIdentifier == packet.messageIdentifier and \ + self.data == packet.data + + +class Unsubscribes(Packets): + + def __init__(self, buffer=None, DUP=False, QoS=1, Retain=False, MsgId=0, Data=[]): + self.fh = FixedHeaders(UNSUBSCRIBE) + self.fh.DUP = DUP + self.fh.QoS = QoS + self.fh.Retain = Retain + # variable header + self.messageIdentifier = MsgId + # payload - list of topics + self.data = Data[:] + if buffer != None: + self.unpack(buffer) + + def pack(self): + buffer = writeInt16(self.messageIdentifier) + for d in self.data: + buffer += writeUTF(d) + buffer = self.fh.pack(len(buffer)) + buffer + return buffer + + def unpack(self, buffer): + assert len(buffer) >= 2 + assert MessageType(buffer) == UNSUBSCRIBE + fhlen = self.fh.unpack(buffer) + assert len(buffer) >= fhlen + self.fh.remainingLength + logger.info("[MQTT-2.3.1-1] packet indentifier must be in unsubscribe") + self.messageIdentifier = readInt16(buffer[fhlen:]) + assert self.messageIdentifier > 0, "[MQTT-2.3.1-1] packet indentifier must be > 0" + leftlen = self.fh.remainingLength - 2 + self.data = [] + while leftlen > 0: + topic = readUTF(buffer[-leftlen:], leftlen) + leftlen -= len(topic) + 2 + self.data.append(topic) + assert leftlen == 0 + assert self.fh.DUP == False, "[MQTT-2.1.2-1]" + assert self.fh.QoS == 1, "[MQTT-2.1.2-1]" + assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]" + logger.info("[MQTT-3-10.1-1] fixed header bits are 0,0,1,0") + return fhlen + self.fh.remainingLength + + def __repr__(self): + return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+\ + ", Data="+repr(self.data)+")" + + def __eq__(self, packet): + return Packets.__eq__(self, packet) and \ + self.messageIdentifier == packet.messageIdentifier and \ + self.data == packet.data + + +class Unsubacks(Packets): + + def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0): + self.fh = FixedHeaders(UNSUBACK) + self.fh.DUP = DUP + self.fh.QoS = QoS + self.fh.Retain = Retain + # variable header + self.messageIdentifier = MsgId + if buffer != None: + self.unpack(buffer) + + def pack(self): + buffer = writeInt16(self.messageIdentifier) + buffer = self.fh.pack(len(buffer)) + buffer + return buffer + + def unpack(self, buffer): + assert len(buffer) >= 2 + assert MessageType(buffer) == UNSUBACK + fhlen = self.fh.unpack(buffer) + assert len(buffer) >= fhlen + self.fh.remainingLength + self.messageIdentifier = readInt16(buffer[fhlen:]) + assert self.messageIdentifier > 0, "[MQTT-2.3.1-1] packet indentifier must be > 0" + self.messageIdentifier = readInt16(buffer[fhlen:]) + assert self.fh.DUP == False, "[MQTT-2.1.2-1]" + assert self.fh.QoS == 0, "[MQTT-2.1.2-1]" + assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]" + return fhlen + self.fh.remainingLength + + def __repr__(self): + return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+")" + + def __eq__(self, packet): + return Packets.__eq__(self, packet) and \ + self.messageIdentifier == packet.messageIdentifier + + +class Pingreqs(Packets): + + def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False): + self.fh = FixedHeaders(PINGREQ) + self.fh.DUP = DUP + self.fh.QoS = QoS + self.fh.Retain = Retain + if buffer != None: + self.unpack(buffer) + + def unpack(self, buffer): + assert len(buffer) >= 2 + assert MessageType(buffer) == PINGREQ + fhlen = self.fh.unpack(buffer) + assert self.fh.remainingLength == 0 + assert self.fh.DUP == False, "[MQTT-2.1.2-1]" + assert self.fh.QoS == 0, "[MQTT-2.1.2-1]" + assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]" + return fhlen + + def __repr__(self): + return repr(self.fh)+")" + + +class Pingresps(Packets): + + def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False): + self.fh = FixedHeaders(PINGRESP) + self.fh.DUP = DUP + self.fh.QoS = QoS + self.fh.Retain = Retain + if buffer != None: + self.unpack(buffer) + + def unpack(self, buffer): + assert len(buffer) >= 2 + assert MessageType(buffer) == PINGRESP + fhlen = self.fh.unpack(buffer) + assert self.fh.remainingLength == 0 + assert self.fh.DUP == False, "[MQTT-2.1.2-1]" + assert self.fh.QoS == 0, "[MQTT-2.1.2-1]" + assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]" + return fhlen + + def __repr__(self): + return repr(self.fh)+")" + +classes = [None, Connects, Connacks, Publishes, Pubacks, Pubrecs, + Pubrels, Pubcomps, Subscribes, Subacks, Unsubscribes, + Unsubacks, Pingreqs, Pingresps, Disconnects] + +def unpackPacket(buffer): + if MessageType(buffer) != None: + packet = classes[MessageType(buffer)]() + packet.unpack(buffer) + else: + packet = None + return packet + +if __name__ == "__main__": + fh = FixedHeaders(CONNECT) + tests = [0, 56, 127, 128, 8888, 16383, 16384, 65535, 2097151, 2097152, + 20555666, 268435454, 268435455] + for x in tests: + try: + assert x == fh.decode(fh.encode(x))[0] + except AssertionError: + print("Test failed for x =", x, fh.decode(fh.encode(x))) + try: + fh.decode(fh.encode(268435456)) + print("Error") + except AssertionError: + pass + + for packet in classes[1:]: + before = str(packet()) + after = str(unpackPacket(packet().pack())) + try: + assert before == after + except: + print("before:", before, "\nafter:", after) + print("End") +