Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
app.py
"""Flask + Socket.IO front end for the mbed Device Connector quickstart.

Serves a page listing every endpoint together with its LED blink pattern and
relays Socket.IO events from the browser to connector calls (subscribe to
button presses, read the press count, update the blink pattern, start
blinking).  Notifications arriving from the connector are pushed to the
browser as 'presses' events.
"""
import io
import os
import time
from base64 import standard_b64decode as b64decode

import mbed_connector_api  # mbed Device Connector library
import pybars  # used to fill in handlebar templates
from flask import Flask  # framework for hosting webpages
from flask_socketio import SocketIO, emit, send, join_room, leave_room

# Resource paths used by the handlers below.
BUTTON_PRESSES_RESOURCE = '/3200/0/5501'  # number of button presses
BLINK_PATTERN_RESOURCE = '/3201/0/5853'   # pattern of the LED blink
BLINK_RESOURCE = '/3201/0/5850'           # POST here to start blinking the LED

# Delay between two isDone() checks while waiting on a connector request.
_POLL_INTERVAL_SECONDS = 0.05

app = Flask(__name__)
socketio = SocketIO(app, async_mode='threading')

# Take the access key from the environment when present; otherwise fall back
# to the placeholder, which must be replaced with a real API token.
token = os.environ.get('ACCESS_KEY', "ChangeMe")

connector = mbed_connector_api.connector(token)


def _wait_until_done(request, poll_interval=_POLL_INTERVAL_SECONDS):
    """Block until ``request.isDone()`` is true and return ``request``.

    Sleeps between checks instead of spinning, so a pending connector call
    does not keep a CPU core busy while the handler waits.
    """
    while not request.isDone():
        time.sleep(poll_interval)
    return request


def _report_error(request):
    """Print the error details of ``request``; return True if it failed."""
    if not request.error:
        return False
    print("Error: %s %s %s" % (request.error.errType,
                               request.error.error,
                               request.raw_data))
    return True


@app.route('/')
def index():
    """Render the index page listing each endpoint and its blink pattern."""
    # Get the list of endpoints; for each one fetch the blink pattern value.
    endpoints = connector.getEndpoints().result
    for endpoint in endpoints:
        print("Endpoint Found: %s" % (endpoint['name'],))
        request = _wait_until_done(
            connector.getResourceValue(endpoint['name'], BLINK_PATTERN_RESOURCE))
        _report_error(request)
        endpoint['blinkPattern'] = request.result
    print("Endpoint List : %s" % (endpoints,))

    # Fill out the HTML using the handlebar template.  io.open yields unicode
    # text on both Python 2 and 3 and the with-block closes the file.
    with io.open("./views/index.hbs", 'r', encoding='utf-8') as template_file:
        source = template_file.read()
    template = pybars.Compiler().compile(source)
    return "".join(template({'endpoints': endpoints}))


@socketio.on('connect')
def connect():
    """Add a newly connected client to the shared room."""
    print('connect ')
    join_room('room')


@socketio.on('disconnect')
def disconnect():
    """Remove a disconnecting client from the shared room."""
    print('Disconnect')
    leave_room('room')


@socketio.on('subscribe_to_presses')
def subscribeToPresses(data):
    """Subscribe to all changes of the button-press resource.

    ``data['endpointName']`` names the endpoint.  Emits
    'subscribed-to-presses' to the client when the subscription succeeds.
    """
    print("subscribe_to_presses: %s" % (data,))
    request = _wait_until_done(
        connector.putResourceSubscription(data['endpointName'],
                                          BUTTON_PRESSES_RESOURCE))
    if not _report_error(request):
        print("Subscribed Successfully!")
        emit('subscribed-to-presses')


@socketio.on('unsubscribe_to_presses')
def unsubscribeToPresses(data):
    """Remove the button-press subscription for ``data['endpointName']``.

    Emits 'unsubscribed-to-presses' to the client when the removal succeeds.
    """
    print("unsubscribe_to_presses: %s" % (data,))
    request = _wait_until_done(
        connector.deleteResourceSubscription(data['endpointName'],
                                             BUTTON_PRESSES_RESOURCE))
    if not _report_error(request):
        print("Unsubscribed Successfully!")
        emit('unsubscribed-to-presses',
             {"endpointName": data['endpointName'], "value": 'True'})


@socketio.on('get_presses')
def getPresses(data):
    """Read the button-press count and emit it to the client as 'presses'."""
    print("get_presses %s" % (data,))
    request = _wait_until_done(
        connector.getResourceValue(data['endpointName'],
                                   BUTTON_PRESSES_RESOURCE))
    if not _report_error(request):
        data_to_emit = {"endpointName": data['endpointName'],
                        "value": request.result}
        print("%s" % (data_to_emit,))
        emit('presses', data_to_emit)


@socketio.on('update_blink_pattern')
def updateBlinkPattern(data):
    """Write ``data['blinkPattern']`` to the endpoint's blink-pattern resource."""
    print("update_blink_pattern %s" % (data,))
    request = _wait_until_done(
        connector.putResourceValue(data['endpointName'],
                                   BLINK_PATTERN_RESOURCE,
                                   data['blinkPattern']))
    _report_error(request)


@socketio.on('blink')
def blink(data):
    """POST to the blink resource so the endpoint starts blinking its LED."""
    print("blink: %s" % (data,))
    request = _wait_until_done(
        connector.postResource(data['endpointName'], BLINK_RESOURCE))
    _report_error(request)


# 'notifications' are routed here, handle subscriptions and update webpage
def notificationHandler(data):
    """Forward each connector notification to the browser as 'presses'.

    Reads ``data['notifications']``; every entry supplies the endpoint name
    under "ep" and a base64-encoded value under "payload".
    """
    notifications = data['notifications']
    print("\r\nNotification Data Received : %s" % (notifications,))
    for notification in notifications:
        message = {"endpointName": notification["ep"],
                   "value": b64decode(notification["payload"])}
        print("Emitting : %s" % (message,))
        socketio.emit('presses', message)


if __name__ == "__main__":
    connector.deleteAllSubscriptions()  # remove all subscriptions, start fresh
    connector.startLongPolling()  # start long polling connector.mbed.com
    # send 'notifications' to the notificationHandler FN
    connector.setHandler('notifications', notificationHandler)
    socketio.run(app, host='0.0.0.0', port=8080)
Generated on Fri Jul 15 2022 01:15:22 by
1.7.2