Jack Hansdampf / mbed-mqtt-GSOE1

Dependents:   ESP8266MQTT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers mqttsas2.py Source File

mqttsas2.py

00001 """
00002 *******************************************************************
00003   Copyright (c) 2013, 2017 IBM Corp.
00004 
00005   All rights reserved. This program and the accompanying materials
00006   are made available under the terms of the Eclipse Public License v1.0
00007   and Eclipse Distribution License v1.0 which accompany this distribution.
00008 
00009   The Eclipse Public License is available at
00010      http://www.eclipse.org/legal/epl-v10.html
00011   and the Eclipse Distribution License is available at
00012     http://www.eclipse.org/org/documents/edl-v10.php.
00013 
00014   Contributors:
00015      Ian Craggs - initial implementation and/or documentation
00016 *******************************************************************
00017 """
00018 from __future__ import print_function
00019 
00020 # Trace MQTT traffic
00021 import MQTTV3112 as MQTTV3
00022 
00023 import socket, sys, select, traceback, datetime, os
00024 import SocketServer as socketserver
00025 
00026 logging = True
00027 myWindow = None
00028 
00029 
00030 def timestamp():
00031   now = datetime.datetime.now()
00032   return now.strftime('%Y%m%d %H%M%S')+str(float("."+str(now.microsecond)))[1:]
00033 
00034 
00035 class MyHandler(socketserver.StreamRequestHandler):
00036 
00037   def handle(self):
00038     if not hasattr(self, "ids"):
00039       self.ids = {}
00040     if not hasattr(self, "versions"):
00041       self.versions = {}
00042     inbuf = True
00043     i = o = e = None
00044     try:
00045       clients = self.request
00046       brokers = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
00047       brokers.connect((brokerhost, brokerport))
00048       terminated = False
00049       while inbuf != None and not terminated:
00050         (i, o, e) = select.select([clients, brokers], [], [])
00051         for s in i:
00052           if s == clients:
00053             inbuf = MQTTV3.getPacket(clients) # get one packet
00054             if inbuf == None:
00055               break
00056             try:
00057               packet = MQTTV3.unpackPacket(inbuf)
00058               if packet.fh.MessageType == MQTTV3.PUBLISH and \
00059                   packet.topicName == "MQTTSAS topic" and \
00060                   packet.data == b"TERMINATE":
00061                 print("Terminating client", self.ids[id(clients)])
00062                 brokers.close()
00063                 clients.close()
00064                 terminated = True
00065                 break
00066               elif packet.fh.MessageType == MQTTV3.CONNECT:
00067                 self.ids[id(clients)] = packet.ClientIdentifier
00068                 self.versions[id(clients)] = 3
00069               print(timestamp() , "C to S", self.ids[id(clients)], repr(packet))
00070               #print([hex(b) for b in inbuf])
00071               #print(inbuf)
00072             except:
00073               traceback.print_exc()
00074             brokers.send(inbuf)       # pass it on
00075           elif s == brokers:
00076             inbuf = MQTTV3.getPacket(brokers) # get one packet
00077             if inbuf == None:
00078               break
00079             try:
00080               print(timestamp(), "S to C", self.ids[id(clients)], repr(MQTTV3.unpackPacket(inbuf)))
00081             except:
00082               traceback.print_exc()
00083             clients.send(inbuf)
00084       print(timestamp()+" client "+self.ids[id(clients)]+" connection closing")
00085     except:
00086       print(repr((i, o, e)), repr(inbuf))
00087       traceback.print_exc()
00088     if id(clients) in self.ids.keys():
00089       del self.ids[id(clients)]
00090     elif id(clients) in self.versions.keys():
00091       del self.versions[id(clients)]
00092 
00093 class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
00094   pass
00095 
00096 def run():
00097   global brokerhost, brokerport
00098   myhost = '127.0.0.1'
00099   if len(sys.argv) > 1:
00100     brokerhost = sys.argv[1]
00101   else:
00102     brokerhost = '127.0.0.1'
00103 
00104   if len(sys.argv) > 2:
00105     brokerport = int(sys.argv[2])
00106   else:
00107     brokerport = 1883
00108 
00109   if len(sys.argv) > 3:
00110     myport = int(sys.argv[3])
00111   else:
00112     if brokerhost == myhost:
00113       myport = brokerport + 1
00114     else:
00115       myport = 1883
00116 
00117   print("Listening on port", str(myport)+", broker on port", brokerport)
00118   s = ThreadingTCPServer(("127.0.0.1", myport), MyHandler)
00119   s.serve_forever()
00120 
00121 if __name__ == "__main__":
00122   run()