Library for MQTT

Committer:
pavleradojkovic
Date:
Mon Jun 20 16:24:43 2022 +0000
Revision:
0:ba7e439238ab
Initial commit

Who changed what in which revision?

User | Revision | Line number | New contents of line
pavleradojkovic 0:ba7e439238ab 1 """
pavleradojkovic 0:ba7e439238ab 2 *******************************************************************
pavleradojkovic 0:ba7e439238ab 3 Copyright (c) 2013, 2017 IBM Corp.
pavleradojkovic 0:ba7e439238ab 4
pavleradojkovic 0:ba7e439238ab 5 All rights reserved. This program and the accompanying materials
pavleradojkovic 0:ba7e439238ab 6 are made available under the terms of the Eclipse Public License v1.0
pavleradojkovic 0:ba7e439238ab 7 and Eclipse Distribution License v1.0 which accompany this distribution.
pavleradojkovic 0:ba7e439238ab 8
pavleradojkovic 0:ba7e439238ab 9 The Eclipse Public License is available at
pavleradojkovic 0:ba7e439238ab 10 http://www.eclipse.org/legal/epl-v10.html
pavleradojkovic 0:ba7e439238ab 11 and the Eclipse Distribution License is available at
pavleradojkovic 0:ba7e439238ab 12 http://www.eclipse.org/org/documents/edl-v10.php.
pavleradojkovic 0:ba7e439238ab 13
pavleradojkovic 0:ba7e439238ab 14 Contributors:
pavleradojkovic 0:ba7e439238ab 15 Ian Craggs - initial implementation and/or documentation
pavleradojkovic 0:ba7e439238ab 16 *******************************************************************
pavleradojkovic 0:ba7e439238ab 17 """
pavleradojkovic 0:ba7e439238ab 18 from __future__ import print_function
pavleradojkovic 0:ba7e439238ab 19
pavleradojkovic 0:ba7e439238ab 20 # Trace MQTT traffic
pavleradojkovic 0:ba7e439238ab 21 import MQTTV3112 as MQTTV3
pavleradojkovic 0:ba7e439238ab 22
pavleradojkovic 0:ba7e439238ab 23 import socket, sys, select, traceback, datetime, os
pavleradojkovic 0:ba7e439238ab 24 import SocketServer as socketserver
pavleradojkovic 0:ba7e439238ab 25
# Module-level flags. Nothing in the visible part of this file reads either one.
# NOTE(review): `logging` would shadow the stdlib module name if that were ever imported here.
pavleradojkovic 0:ba7e439238ab 26 logging = True
pavleradojkovic 0:ba7e439238ab 27 myWindow = None
pavleradojkovic 0:ba7e439238ab 28
pavleradojkovic 0:ba7e439238ab 29
def timestamp(now=None):
    """Return a timestamp string of the form ``YYYYMMDD HHMMSS.ffffff``.

    :param now: optional ``datetime.datetime`` to format; when omitted the
        current local time (``datetime.datetime.now()``) is used.
    :returns: the formatted date, time and zero-padded microseconds.

    The microsecond field is rendered with ``%f`` so that it is always six
    digits wide.  Building the fraction from ``str(now.microsecond)`` loses
    the leading zeros (5000 microseconds would be shown as ``.5``).
    """
    if now is None:
        now = datetime.datetime.now()
    return now.strftime('%Y%m%d %H%M%S.%f')
pavleradojkovic 0:ba7e439238ab 33
pavleradojkovic 0:ba7e439238ab 34
class MyHandler(socketserver.StreamRequestHandler):
    """Relay one client connection to the broker and print each MQTT packet.

    For every accepted connection a new socket is opened to
    ``(brokerhost, brokerport)`` (module globals set by ``run()``).  Packets
    read from either side with ``MQTTV3.getPacket`` are printed and then
    forwarded unchanged to the other side.
    """

    # Label printed for traffic seen before the client's CONNECT packet.
    UNKNOWN_CLIENT = "<unknown>"

    def _client_name(self, key):
        """Return the client identifier recorded for *key*, or a placeholder."""
        return self.ids.get(key, self.UNKNOWN_CLIENT)

    def handle(self):
        """Pump packets between the client and the broker until one side ends.

        The loop stops when ``MQTTV3.getPacket`` returns ``None`` for either
        socket, or when the client publishes ``b"TERMINATE"`` to the topic
        ``"MQTTSAS topic"``; that packet closes both sockets and is not
        forwarded.  Errors while decoding or printing a packet are reported
        with a traceback and the raw packet is still forwarded.
        """
        if not hasattr(self, "ids"):
            self.ids = {}
        if not hasattr(self, "versions"):
            self.versions = {}
        inbuf = True
        i = o = e = None
        clients = self.request
        key = id(clients)
        brokers = None
        try:
            brokers = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            brokers.connect((brokerhost, brokerport))
            terminated = False
            while inbuf is not None and not terminated:
                (i, o, e) = select.select([clients, brokers], [], [])
                for s in i:
                    if s == clients:
                        inbuf = MQTTV3.getPacket(clients)  # get one packet
                        if inbuf is None:
                            break
                        try:
                            packet = MQTTV3.unpackPacket(inbuf)
                            if (packet.fh.MessageType == MQTTV3.PUBLISH and
                                    packet.topicName == "MQTTSAS topic" and
                                    packet.data == b"TERMINATE"):
                                print("Terminating client", self._client_name(key))
                                brokers.close()
                                clients.close()
                                terminated = True
                                break
                            elif packet.fh.MessageType == MQTTV3.CONNECT:
                                self.ids[key] = packet.ClientIdentifier
                                self.versions[key] = 3
                            print(timestamp(), "C to S", self._client_name(key), repr(packet))
                        except Exception:
                            # Tracing must never stop the relay: report and carry on.
                            traceback.print_exc()
                        # sendall: a plain send() may transmit only part of the packet.
                        brokers.sendall(inbuf)  # pass it on
                    elif s == brokers:
                        inbuf = MQTTV3.getPacket(brokers)  # get one packet
                        if inbuf is None:
                            break
                        try:
                            print(timestamp(), "S to C", self._client_name(key),
                                  repr(MQTTV3.unpackPacket(inbuf)))
                        except Exception:
                            traceback.print_exc()
                        clients.sendall(inbuf)
            print("%s client %s connection closing" % (timestamp(), self._client_name(key)))
        except Exception:
            print(repr((i, o, e)), repr(inbuf))
            traceback.print_exc()
        finally:
            # Release the broker-side socket on every exit path; closing an
            # already-closed socket is harmless.
            if brokers is not None:
                brokers.close()
            # Drop both bookkeeping entries (the two are independent).
            self.ids.pop(key, None)
            self.versions.pop(key, None)
pavleradojkovic 0:ba7e439238ab 92
class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """TCP server that handles each accepted connection in its own thread."""

    # Handler threads sit in select() for as long as a connection is open;
    # marking them as daemon threads lets the process exit when the main
    # thread stops instead of waiting for every connection to end.
    daemon_threads = True
pavleradojkovic 0:ba7e439238ab 95
def run():
    """Read the command line, then serve the tracing proxy until interrupted.

    Positional arguments (all optional):
      1. broker host  (default ``'127.0.0.1'``)
      2. broker port  (default ``1883``)
      3. listen port  (default: broker port + 1 when the broker host equals
         the listen address, otherwise ``1883``)

    Sets the module globals ``brokerhost`` and ``brokerport`` that
    ``MyHandler`` reads when it connects to the broker.

    :raises ValueError: if a port argument is not an integer.
    """
    global brokerhost, brokerport
    myhost = '127.0.0.1'
    args = sys.argv[1:]

    brokerhost = args[0] if len(args) > 0 else '127.0.0.1'
    brokerport = int(args[1]) if len(args) > 1 else 1883

    if len(args) > 2:
        myport = int(args[2])
    elif brokerhost == myhost:
        # Same machine as the broker: avoid colliding with its port.
        myport = brokerport + 1
    else:
        myport = 1883

    print("Listening on port", str(myport)+", broker on port", brokerport)
    s = ThreadingTCPServer((myhost, myport), MyHandler)
    try:
        s.serve_forever()
    finally:
        # Release the listening socket however serve_forever() ends
        # (including Ctrl-C, which still propagates to the caller).
        s.server_close()
pavleradojkovic 0:ba7e439238ab 120
# Start the proxy only when this file is executed as a script, not when it is imported.
pavleradojkovic 0:ba7e439238ab 121 if __name__ == "__main__":
pavleradojkovic 0:ba7e439238ab 122 run()