Library for MQTT
mbed-mqtt-master/paho_mqtt_embedded_c/test/mqttsas2.py@0:ba7e439238ab, 2022-06-20 (annotated)
- Committer:
- pavleradojkovic
- Date:
- Mon Jun 20 16:24:43 2022 +0000
- Revision:
- 0:ba7e439238ab
Initial commit
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
pavleradojkovic | 0:ba7e439238ab | 1 | """ |
pavleradojkovic | 0:ba7e439238ab | 2 | ******************************************************************* |
pavleradojkovic | 0:ba7e439238ab | 3 | Copyright (c) 2013, 2017 IBM Corp. |
pavleradojkovic | 0:ba7e439238ab | 4 | |
pavleradojkovic | 0:ba7e439238ab | 5 | All rights reserved. This program and the accompanying materials |
pavleradojkovic | 0:ba7e439238ab | 6 | are made available under the terms of the Eclipse Public License v1.0 |
pavleradojkovic | 0:ba7e439238ab | 7 | and Eclipse Distribution License v1.0 which accompany this distribution. |
pavleradojkovic | 0:ba7e439238ab | 8 | |
pavleradojkovic | 0:ba7e439238ab | 9 | The Eclipse Public License is available at |
pavleradojkovic | 0:ba7e439238ab | 10 | http://www.eclipse.org/legal/epl-v10.html |
pavleradojkovic | 0:ba7e439238ab | 11 | and the Eclipse Distribution License is available at |
pavleradojkovic | 0:ba7e439238ab | 12 | http://www.eclipse.org/org/documents/edl-v10.php. |
pavleradojkovic | 0:ba7e439238ab | 13 | |
pavleradojkovic | 0:ba7e439238ab | 14 | Contributors: |
pavleradojkovic | 0:ba7e439238ab | 15 | Ian Craggs - initial implementation and/or documentation |
pavleradojkovic | 0:ba7e439238ab | 16 | ******************************************************************* |
pavleradojkovic | 0:ba7e439238ab | 17 | """ |
pavleradojkovic | 0:ba7e439238ab | 18 | from __future__ import print_function |
pavleradojkovic | 0:ba7e439238ab | 19 | |
pavleradojkovic | 0:ba7e439238ab | 20 | # Trace MQTT traffic |
pavleradojkovic | 0:ba7e439238ab | 21 | import MQTTV3112 as MQTTV3 |
pavleradojkovic | 0:ba7e439238ab | 22 | |
pavleradojkovic | 0:ba7e439238ab | 23 | import socket, sys, select, traceback, datetime, os |
pavleradojkovic | 0:ba7e439238ab | 24 | import SocketServer as socketserver |
pavleradojkovic | 0:ba7e439238ab | 25 | |
# Module-level flags. Neither name is read by any code visible in this file.
# NOTE(review): presumably leftovers from a variant of this tracer with
# optional logging / a GUI window -- confirm before removing.
# Caution: `logging` would shadow the standard-library module of the same
# name if that module were ever imported here.
logging = True
myWindow = None
pavleradojkovic | 0:ba7e439238ab | 28 | |
pavleradojkovic | 0:ba7e439238ab | 29 | |
def timestamp(now=None):
    """Return a trace timestamp of the form ``YYYYMMDD HHMMSS.ffffff``.

    The fractional part is always the zero-padded, six-digit microsecond
    count.  The previous implementation built the fraction with
    ``float("." + str(now.microsecond))``, which lost the leading zeros
    (5 microseconds was rendered as ``.5``) and therefore printed wrong,
    non-monotonic times.

    Args:
        now: Optional ``datetime.datetime`` to format.  Defaults to the
            current local time, which is what existing callers rely on.

    Returns:
        The formatted timestamp string.
    """
    if now is None:
        now = datetime.datetime.now()
    # %f expands to the microsecond value zero-padded to six digits.
    return now.strftime('%Y%m%d %H%M%S.%f')
pavleradojkovic | 0:ba7e439238ab | 33 | |
pavleradojkovic | 0:ba7e439238ab | 34 | |
class MyHandler(socketserver.StreamRequestHandler):
    """Relay MQTT packets between one client and the broker, tracing each.

    For every accepted client connection a new socket is opened to the
    broker at the module-level ``brokerhost``/``brokerport`` (set by
    ``run()``).  Packets read from either side are printed and then
    forwarded unchanged to the other side.

    Attributes (created lazily in ``handle``):
        ids: maps ``id(client_socket)`` to the ClientIdentifier taken from
            that client's CONNECT packet.
        versions: maps ``id(client_socket)`` to ``3`` once a CONNECT packet
            has been seen.
    """

    def handle(self):
        """Proxy packets in both directions until either side stops.

        The loop ends when ``MQTTV3.getPacket`` returns ``None`` for either
        socket (presumably the peer closed the connection -- confirm against
        MQTTV3112), or when the client publishes ``b"TERMINATE"`` to the
        topic ``"MQTTSAS topic"``, in which case both sockets are closed
        and the packet is not forwarded.

        Errors while decoding or printing a packet are reported with a
        traceback and the raw packet is still forwarded.  Any other error
        is reported and ends the session.
        """
        if not hasattr(self, "ids"):
            self.ids = {}
        if not hasattr(self, "versions"):
            self.versions = {}
        inbuf = True
        i = o = e = None
        # Bound before the try block so the cleanup below can always use it.
        clients = self.request
        brokers = None
        try:
            brokers = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            brokers.connect((brokerhost, brokerport))
            terminated = False
            while inbuf is not None and not terminated:
                (i, o, e) = select.select([clients, brokers], [], [])
                for s in i:
                    if s == clients:
                        inbuf = MQTTV3.getPacket(clients)  # get one packet
                        if inbuf is None:
                            break
                        try:
                            packet = MQTTV3.unpackPacket(inbuf)
                            if packet.fh.MessageType == MQTTV3.PUBLISH and \
                                    packet.topicName == "MQTTSAS topic" and \
                                    packet.data == b"TERMINATE":
                                print("Terminating client", self.ids[id(clients)])
                                brokers.close()
                                clients.close()
                                terminated = True
                                break
                            elif packet.fh.MessageType == MQTTV3.CONNECT:
                                self.ids[id(clients)] = packet.ClientIdentifier
                                self.versions[id(clients)] = 3
                            print(timestamp(), "C to S", self.ids[id(clients)], repr(packet))
                        except Exception:
                            # Tracing is best-effort: report and still relay.
                            traceback.print_exc()
                        # sendall: send() may write only part of a large packet.
                        brokers.sendall(inbuf)  # pass it on
                    elif s == brokers:
                        inbuf = MQTTV3.getPacket(brokers)  # get one packet
                        if inbuf is None:
                            break
                        try:
                            print(timestamp(), "S to C", self.ids[id(clients)], repr(MQTTV3.unpackPacket(inbuf)))
                        except Exception:
                            traceback.print_exc()
                        clients.sendall(inbuf)
            print(timestamp()+" client "+self.ids[id(clients)]+" connection closing")
        except Exception:
            print(repr((i, o, e)), repr(inbuf))
            traceback.print_exc()
        finally:
            # Always release the broker-side socket; previously it was only
            # closed on the TERMINATE path.  Closing twice is harmless.
            if brokers is not None:
                brokers.close()
            # Drop both bookkeeping entries.  The old if/elif skipped
            # `versions` whenever an `ids` entry existed.
            self.ids.pop(id(clients), None)
            self.versions.pop(id(clients), None)
pavleradojkovic | 0:ba7e439238ab | 92 | |
class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """TCP server that handles each accepted connection in its own thread.

    Adds nothing of its own; it only combines ``ThreadingMixIn`` with
    ``TCPServer``.
    """
pavleradojkovic | 0:ba7e439238ab | 95 | |
def run():
    """Parse the command line and serve the tracing proxy forever.

    Usage: ``mqttsas2.py [brokerhost [brokerport [listenport]]]``

    * ``brokerhost`` defaults to ``127.0.0.1``.
    * ``brokerport`` defaults to ``1883``.
    * ``listenport`` defaults to ``brokerport + 1`` when the broker host is
      ``127.0.0.1``, otherwise to ``1883``.

    Sets the module globals ``brokerhost`` and ``brokerport`` read by
    ``MyHandler``.  The listening socket is bound to 127.0.0.1.  This call
    does not return under normal operation.
    """
    global brokerhost, brokerport
    args = sys.argv[1:]
    myhost = '127.0.0.1'

    brokerhost = args[0] if len(args) >= 1 else '127.0.0.1'
    brokerport = int(args[1]) if len(args) >= 2 else 1883

    if len(args) >= 3:
        myport = int(args[2])
    elif brokerhost == myhost:
        # Broker is on the proxy's own address: take the next port up.
        myport = brokerport + 1
    else:
        myport = 1883

    print("Listening on port", str(myport)+", broker on port", brokerport)
    server = ThreadingTCPServer(("127.0.0.1", myport), MyHandler)
    server.serve_forever()
pavleradojkovic | 0:ba7e439238ab | 120 | |
# Start the tracing proxy only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    run()