Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository ZIP archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
mqttsas2.py
00001 """ 00002 ******************************************************************* 00003 Copyright (c) 2013, 2017 IBM Corp. 00004 00005 All rights reserved. This program and the accompanying materials 00006 are made available under the terms of the Eclipse Public License v1.0 00007 and Eclipse Distribution License v1.0 which accompany this distribution. 00008 00009 The Eclipse Public License is available at 00010 http://www.eclipse.org/legal/epl-v10.html 00011 and the Eclipse Distribution License is available at 00012 http://www.eclipse.org/org/documents/edl-v10.php. 00013 00014 Contributors: 00015 Ian Craggs - initial implementation and/or documentation 00016 ******************************************************************* 00017 """ 00018 from __future__ import print_function 00019 00020 # Trace MQTT traffic 00021 import MQTTV3112 as MQTTV3 00022 00023 import socket, sys, select, traceback, datetime, os 00024 import SocketServer as socketserver 00025 00026 logging = True 00027 myWindow = None 00028 00029 00030 def timestamp(): 00031 now = datetime.datetime.now() 00032 return now.strftime('%Y%m%d %H%M%S')+str(float("."+str(now.microsecond)))[1:] 00033 00034 00035 class MyHandler(socketserver.StreamRequestHandler): 00036 00037 def handle(self): 00038 if not hasattr(self, "ids"): 00039 self.ids = {} 00040 if not hasattr(self, "versions"): 00041 self.versions = {} 00042 inbuf = True 00043 i = o = e = None 00044 try: 00045 clients = self.request 00046 brokers = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 00047 brokers.connect((brokerhost, brokerport)) 00048 terminated = False 00049 while inbuf != None and not terminated: 00050 (i, o, e) = select.select([clients, brokers], [], []) 00051 for s in i: 00052 if s == clients: 00053 inbuf = MQTTV3.getPacket(clients) # get one packet 00054 if inbuf == None: 00055 break 00056 try: 00057 packet = MQTTV3.unpackPacket(inbuf) 00058 if packet.fh.MessageType == MQTTV3.PUBLISH and \ 00059 packet.topicName == "MQTTSAS topic" 
and \ 00060 packet.data == b"TERMINATE": 00061 print("Terminating client", self.ids[id(clients)]) 00062 brokers.close() 00063 clients.close() 00064 terminated = True 00065 break 00066 elif packet.fh.MessageType == MQTTV3.CONNECT: 00067 self.ids[id(clients)] = packet.ClientIdentifier 00068 self.versions[id(clients)] = 3 00069 print(timestamp() , "C to S", self.ids[id(clients)], repr(packet)) 00070 #print([hex(b) for b in inbuf]) 00071 #print(inbuf) 00072 except: 00073 traceback.print_exc() 00074 brokers.send(inbuf) # pass it on 00075 elif s == brokers: 00076 inbuf = MQTTV3.getPacket(brokers) # get one packet 00077 if inbuf == None: 00078 break 00079 try: 00080 print(timestamp(), "S to C", self.ids[id(clients)], repr(MQTTV3.unpackPacket(inbuf))) 00081 except: 00082 traceback.print_exc() 00083 clients.send(inbuf) 00084 print(timestamp()+" client "+self.ids[id(clients)]+" connection closing") 00085 except: 00086 print(repr((i, o, e)), repr(inbuf)) 00087 traceback.print_exc() 00088 if id(clients) in self.ids.keys(): 00089 del self.ids[id(clients)] 00090 elif id(clients) in self.versions.keys(): 00091 del self.versions[id(clients)] 00092 00093 class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): 00094 pass 00095 00096 def run(): 00097 global brokerhost, brokerport 00098 myhost = '127.0.0.1' 00099 if len(sys.argv) > 1: 00100 brokerhost = sys.argv[1] 00101 else: 00102 brokerhost = '127.0.0.1' 00103 00104 if len(sys.argv) > 2: 00105 brokerport = int(sys.argv[2]) 00106 else: 00107 brokerport = 1883 00108 00109 if len(sys.argv) > 3: 00110 myport = int(sys.argv[3]) 00111 else: 00112 if brokerhost == myhost: 00113 myport = brokerport + 1 00114 else: 00115 myport = 1883 00116 00117 print("Listening on port", str(myport)+", broker on port", brokerport) 00118 s = ThreadingTCPServer(("127.0.0.1", myport), MyHandler) 00119 s.serve_forever() 00120 00121 if __name__ == "__main__": 00122 run()
Generated on Wed Jul 13 2022 10:46:02 by
1.7.2