M2XStreamClient fork with a workaround in M2XStreamClient.cpp for the MTS_Wifi_Connect_M2X example.
Dependents: STM32_MTS_Wifi_Connect_M2X M2X_STM32_MTS_Temp MTS_WiFi_Connect_M2X_Example
Fork of M2XStreamClient by
M2XStreamClient.cpp
00001 #include "M2XStreamClient.h" 00002 00003 #include <jsonlite.h> 00004 00005 #include "StreamParseFunctions.h" 00006 #include "LocationParseFunctions.h" 00007 00008 const char* M2XStreamClient::kDefaultM2XHost = "api-m2x.att.com"; 00009 00010 int print_encoded_string(Print* print, const char* str); 00011 int tolower(int ch); 00012 00013 #if defined(ARDUINO_PLATFORM) || defined(MBED_PLATFORM) 00014 int tolower(int ch) 00015 { 00016 // Arduino and mbed use ASCII table, so we can simplify the implementation 00017 if ((ch >= 'A') && (ch <= 'Z')) { 00018 return (ch + 32); 00019 } 00020 return ch; 00021 } 00022 #else 00023 // For other platform, we use libc's tolower by default 00024 #include <ctype.h> 00025 #endif 00026 00027 M2XStreamClient::M2XStreamClient(Client* client, 00028 const char* key, 00029 int case_insensitive, 00030 const char* host, 00031 int port) : _client(client), 00032 _key(key), 00033 _case_insensitive(case_insensitive), 00034 _host(host), 00035 _port(port), 00036 _null_print() { 00037 } 00038 00039 #define WRITE_QUERY_PART(client_, started_, name_, str_) { \ 00040 if (str_) { \ 00041 if (started_) { \ 00042 (client_)->print("&"); \ 00043 } else { \ 00044 (client_)->print("?"); \ 00045 started_ = true; \ 00046 } \ 00047 (client_)->print(name_ "="); \ 00048 (client_)->print(str_); \ 00049 } \ 00050 } 00051 00052 int M2XStreamClient::fetchValues(const char* feedId, const char* streamName, 00053 stream_value_read_callback callback, void* context, 00054 const char* startTime, const char* endTime, 00055 const char* limit) { 00056 if (_client->connect(_host, _port)) { 00057 bool query_started = false; 00058 00059 DBGLN("%s", "Connected to M2X server!"); 00060 _client->print("GET /v1/feeds/"); 00061 print_encoded_string(_client, feedId); 00062 _client->print("/streams/"); 00063 print_encoded_string(_client, streamName); 00064 _client->print("/values"); 00065 00066 WRITE_QUERY_PART(_client, query_started, "start", startTime); 00067 WRITE_QUERY_PART(_client, query_started, "end", endTime); 00068 WRITE_QUERY_PART(_client, query_started, "limit", limit); 00069 00070 _client->println(" HTTP/1.0"); 00071 writeHttpHeader(-1); 00072 } else { 00073 DBGLN("%s", "ERROR: Cannot connect to M2X server!"); 00074 return E_NOCONNECTION; 00075 } 00076 int status = readStatusCode(false); 00077 if (status == 200) { 00078 readStreamValue(callback, context); 00079 } 00080 00081 close(); 00082 return status; 00083 } 00084 00085 int M2XStreamClient::readLocation(const char* feedId, 00086 location_read_callback callback, 00087 void* context) { 00088 if (_client->connect(_host, _port)) { 00089 DBGLN("%s", "Connected to M2X server!"); 00090 _client->print("GET /v1/feeds/"); 00091 print_encoded_string(_client, feedId); 00092 _client->println("/location HTTP/1.0"); 00093 00094 writeHttpHeader(-1); 00095 } else { 00096 DBGLN("%s", "ERROR: Cannot connect to M2X server!"); 00097 return E_NOCONNECTION; 00098 } 00099 int status = readStatusCode(false); 00100 if (status == 200) { 00101 readLocation(callback, context); 00102 } 00103 00104 close(); 00105 return status; 00106 } 00107 00108 // Encodes and prints string using Percent-encoding specified 00109 // in RFC 1738, Section 2.2 00110 int print_encoded_string(Print* print, const char* str) { 00111 int bytes = 0; 00112 for (int i = 0; str[i] != 0; i++) { 00113 if (((str[i] >= 'A') && (str[i] <= 'Z')) || 00114 ((str[i] >= 'a') && (str[i] <= 'z')) || 00115 ((str[i] >= '0') && (str[i] <= '9')) || 00116 (str[i] == '-') || (str[i] == '_') || 00117 (str[i] == '.') || (str[i] == '~')) { 00118 bytes += print->print(str[i]); 00119 } else { 00120 // Encode all other characters 00121 bytes += print->print('%'); 00122 bytes += print->print(HEX(str[i] / 16)); 00123 bytes += print->print(HEX(str[i] % 16)); 00124 } 00125 } 00126 return bytes; 00127 } 00128 00129 void M2XStreamClient::writePostHeader(const char* feedId, 00130 const char* streamName, 00131 int contentLength) { 00132 _client->print("PUT /v1/feeds/"); 00133 print_encoded_string(_client, feedId); 00134 _client->print("/streams/"); 00135 print_encoded_string(_client, streamName); 00136 _client->println(" HTTP/1.0"); 00137 00138 writeHttpHeader(contentLength); 00139 } 00140 00141 void M2XStreamClient::writeHttpHeader(int contentLength) { 00142 _client->println(USER_AGENT); 00143 _client->print("X-M2X-KEY: "); 00144 _client->println(_key); 00145 00146 _client->print("Host: "); 00147 print_encoded_string(_client, _host); 00148 if (_port != kDefaultM2XPort) { 00149 _client->print(":"); 00150 // port is an integer, does not need encoding 00151 _client->print(_port); 00152 } 00153 _client->println(); 00154 00155 if (contentLength > 0) { 00156 _client->println("Content-Type: application/json"); 00157 DBG("%s", "Content Length: "); 00158 DBGLN("%d", contentLength); 00159 00160 _client->print("Content-Length: "); 00161 _client->println(contentLength); 00162 } 00163 _client->println(); 00164 } 00165 00166 int M2XStreamClient::waitForString(const char* str) { 00167 int currentIndex = 0; 00168 if (str[currentIndex] == '\0') return E_OK; 00169 00170 while (true) { 00171 while (_client->available()) { 00172 char c = _client->read(); 00173 DBG("%c", c); 00174 00175 int cmp; 00176 if (_case_insensitive) { 00177 cmp = tolower(c) - tolower(str[currentIndex]); 00178 } else { 00179 cmp = c - str[currentIndex]; 00180 } 00181 00182 if ((str[currentIndex] == '*') || (cmp == 0)) { 00183 currentIndex++; 00184 if (str[currentIndex] == '\0') { 00185 return E_OK; 00186 } 00187 } else { 00188 // start from the beginning 00189 currentIndex = 0; 00190 } 00191 } 00192 00193 if (!_client->connected()) { 00194 DBGLN("%s", "ERROR: The client is disconnected from the server!"); 00195 00196 close(); 00197 return E_DISCONNECTED; 00198 } 00199 00200 delay(1000); 00201 } 00202 // never reached here 00203 return E_NOTREACHABLE; 00204 } 00205 00206 int M2XStreamClient::readStatusCode(bool closeClient) { 00207 int responseCode = 0; 00208 int ret = waitForString("HTTP/*.* "); 00209 if (ret != E_OK) { 00210 if (closeClient) close(); 00211 return ret; 00212 } 00213 00214 // ret is not needed from here(since it must be E_OK), so we can use it 00215 // as a regular variable now. 00216 ret = 0; 00217 while (true) { 00218 while (_client->available()) { 00219 char c = _client->read(); 00220 DBG("%c", c); 00221 00222 responseCode = responseCode * 10 + (c - '0'); 00223 ret++; 00224 if (ret == 3) { 00225 if (closeClient) close(); 00226 return responseCode; 00227 } 00228 } 00229 00230 if (!_client->connected()) { 00231 DBGLN("%s", "ERROR: The client is disconnected from the server!"); 00232 00233 if (closeClient) close(); 00234 return E_DISCONNECTED; 00235 } 00236 00237 delay(1000); 00238 } 00239 00240 // never reached here 00241 return E_NOTREACHABLE; 00242 } 00243 00244 int M2XStreamClient::readContentLength() { 00245 int ret = waitForString("Content-Length: "); 00246 if (ret != E_OK) { 00247 return ret; 00248 } 00249 00250 // From now on, ret is not needed, we can use it 00251 // to keep the final result 00252 ret = 0; 00253 while (true) { 00254 while (_client->available()) { 00255 char c = _client->read(); 00256 DBG("%c", c); 00257 00258 if ((c == '\r') || (c == '\n')) { 00259 return (ret == 0) ? (E_INVALID) : (ret); 00260 } else { 00261 ret = ret * 10 + (c - '0'); 00262 } 00263 } 00264 00265 if (!_client->connected()) { 00266 DBGLN("%s", "ERROR: The client is disconnected from the server!"); 00267 00268 return E_DISCONNECTED; 00269 } 00270 00271 delay(1000); 00272 } 00273 00274 // never reached here 00275 return E_NOTREACHABLE; 00276 } 00277 00278 int M2XStreamClient::skipHttpHeader() { 00279 return waitForString("\r\n\r\n"); 00280 } 00281 00282 void M2XStreamClient::close() { 00283 // Eats up buffered data before closing 00284 _client->flush(); 00285 _client->stop(); 00286 } 00287 00288 int M2XStreamClient::readStreamValue(stream_value_read_callback callback, 00289 void* context) { 00290 const int BUF_LEN = 32; 00291 char buf[BUF_LEN]; 00292 00293 int length = readContentLength(); 00294 if (length < 0) { 00295 close(); 00296 return length; 00297 } 00298 00299 int index = skipHttpHeader(); 00300 if (index != E_OK) { 00301 close(); 00302 return index; 00303 } 00304 index = 0; 00305 00306 stream_parsing_context_state state; 00307 state.state = state.index = 0; 00308 state.callback = callback; 00309 state.context = context; 00310 00311 jsonlite_parser_callbacks cbs = jsonlite_default_callbacks; 00312 cbs.key_found = on_stream_key_found; 00313 cbs.string_found = on_stream_string_found; 00314 cbs.context.client_state = &state; 00315 00316 jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5)); 00317 jsonlite_parser_set_callback(p, &cbs); 00318 00319 jsonlite_result result = jsonlite_result_unknown; 00320 00321 wait_ms(100); // Workaround - 100ms delay necessary for data fetches to work 00322 while (index < length) { 00323 int i = 0; 00324 00325 DBG("%s", "Received Data: "); 00326 while ((i < BUF_LEN) && _client->available()) { 00327 buf[i++] = _client->read(); 00328 DBG("%c", buf[i - 1]); 00329 } 00330 DBGLNEND; 00331 00332 if ((!_client->connected()) && 00333 (!_client->available()) && 00334 ((index + i) < length)) { 00335 jsonlite_parser_release(p); 00336 close(); 00337 return E_NOCONNECTION; 00338 } 00339 00340 result = jsonlite_parser_tokenize(p, buf, i); 00341 if ((result != jsonlite_result_ok) && 00342 (result != jsonlite_result_end_of_stream)) { 00343 jsonlite_parser_release(p); 00344 close(); 00345 return E_JSON_INVALID; 00346 } 00347 00348 index += i; 00349 } 00350 00351 jsonlite_parser_release(p); 00352 close(); 00353 return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID); 00354 } 00355 00356 int M2XStreamClient::readLocation(location_read_callback callback, 00357 void* context) { 00358 const int BUF_LEN = 40; 00359 char buf[BUF_LEN]; 00360 00361 int length = readContentLength(); 00362 if (length < 0) { 00363 close(); 00364 return length; 00365 } 00366 00367 int index = skipHttpHeader(); 00368 if (index != E_OK) { 00369 close(); 00370 return index; 00371 } 00372 index = 0; 00373 00374 location_parsing_context_state state; 00375 state.state = state.index = 0; 00376 state.callback = callback; 00377 state.context = context; 00378 00379 jsonlite_parser_callbacks cbs = jsonlite_default_callbacks; 00380 cbs.key_found = on_location_key_found; 00381 cbs.string_found = on_location_string_found; 00382 cbs.context.client_state = &state; 00383 00384 jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5)); 00385 jsonlite_parser_set_callback(p, &cbs); 00386 00387 jsonlite_result result = jsonlite_result_unknown; 00388 00389 wait_ms(100); // Workaround - 100ms delay necessary for data fetches to work 00390 while (index < length) { 00391 int i = 0; 00392 00393 DBG("%s", "Received Data: "); 00394 while ((i < BUF_LEN) && _client->available()) { 00395 buf[i++] = _client->read(); 00396 DBG("%c", buf[i - 1]); 00397 } 00398 DBGLNEND; 00399 00400 if ((!_client->connected()) && 00401 (!_client->available()) && 00402 ((index + i) < length)) { 00403 jsonlite_parser_release(p); 00404 close(); 00405 return E_NOCONNECTION; 00406 } 00407 00408 result = jsonlite_parser_tokenize(p, buf, i); 00409 if ((result != jsonlite_result_ok) && 00410 (result != jsonlite_result_end_of_stream)) { 00411 jsonlite_parser_release(p); 00412 close(); 00413 return E_JSON_INVALID; 00414 } 00415 00416 index += i; 00417 } 00418 00419 jsonlite_parser_release(p); 00420 close(); 00421 return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID); 00422 }
Generated on Fri Jul 15 2022 12:10:59 by 1.7.2