Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Fork of M2XStreamClient by
M2XStreamClient.cpp
00001 #include "M2XStreamClient.h" 00002 00003 #include <jsonlite.h> 00004 00005 #include "StreamParseFunctions.h" 00006 #include "LocationParseFunctions.h" 00007 00008 const char* M2XStreamClient::kDefaultM2XHost = "api-m2x.att.com"; 00009 00010 static int write_delete_values(Print* print, const char* from, const char* end); 00011 int print_encoded_string(Print* print, const char* str); 00012 int tolower(int ch); 00013 00014 #if defined(ARDUINO_PLATFORM) || defined(MBED_PLATFORM) 00015 int tolower(int ch) 00016 { 00017 // Arduino and mbed use ASCII table, so we can simplify the implementation 00018 if ((ch >= 'A') && (ch <= 'Z')) { 00019 return (ch + 32); 00020 } 00021 return ch; 00022 } 00023 #else 00024 // For other platform, we use libc's tolower by default 00025 #include <ctype.h> 00026 #endif 00027 00028 M2XStreamClient::M2XStreamClient(Client* client, 00029 const char* key, 00030 int case_insensitive, 00031 const char* host, 00032 int port) : _client(client), 00033 _key(key), 00034 _case_insensitive(case_insensitive), 00035 _host(host), 00036 _port(port), 00037 _null_print() { 00038 } 00039 00040 #define WRITE_QUERY_PART(client_, started_, name_, str_) { \ 00041 if (str_) { \ 00042 if (started_) { \ 00043 (client_)->print("&"); \ 00044 } else { \ 00045 (client_)->print("?"); \ 00046 started_ = true; \ 00047 } \ 00048 (client_)->print(name_ "="); \ 00049 (client_)->print(str_); \ 00050 } \ 00051 } 00052 00053 int M2XStreamClient::fetchValues(const char* feedId, const char* streamName, 00054 stream_value_read_callback callback, void* context, 00055 const char* startTime, const char* endTime, 00056 const char* limit) { 00057 if (_client->connect(_host, _port)) { 00058 bool query_started = false; 00059 00060 DBGLN("%s", "Connected to M2X server!"); 00061 _client->print("GET /v1/feeds/"); 00062 print_encoded_string(_client, feedId); 00063 _client->print("/streams/"); 00064 print_encoded_string(_client, streamName); 00065 _client->print("/values"); 00066 00067 
WRITE_QUERY_PART(_client, query_started, "start", startTime); 00068 WRITE_QUERY_PART(_client, query_started, "end", endTime); 00069 WRITE_QUERY_PART(_client, query_started, "limit", limit); 00070 00071 _client->println(" HTTP/1.0"); 00072 writeHttpHeader(-1); 00073 } else { 00074 DBGLN("%s", "ERROR: Cannot connect to M2X server!"); 00075 return E_NOCONNECTION; 00076 } 00077 int status = readStatusCode(false); 00078 if (status == 200) { 00079 readStreamValue(callback, context); 00080 } 00081 00082 close(); 00083 return status; 00084 } 00085 00086 int M2XStreamClient::readLocation(const char* feedId, 00087 location_read_callback callback, 00088 void* context) { 00089 if (_client->connect(_host, _port)) { 00090 DBGLN("%s", "Connected to M2X server!"); 00091 _client->print("GET /v1/feeds/"); 00092 print_encoded_string(_client, feedId); 00093 _client->println("/location HTTP/1.0"); 00094 00095 writeHttpHeader(-1); 00096 } else { 00097 DBGLN("%s", "ERROR: Cannot connect to M2X server!"); 00098 return E_NOCONNECTION; 00099 } 00100 int status = readStatusCode(false); 00101 if (status == 200) { 00102 readLocation(callback, context); 00103 } 00104 00105 close(); 00106 return status; 00107 } 00108 00109 int M2XStreamClient::deleteValues(const char* feedId, const char* streamName, 00110 const char* from, const char* end) { 00111 if (_client->connect(_host, _port)) { 00112 DBGLN("%s", "Connected to M2X server!"); 00113 int length = write_delete_values(&_null_print, from, end); 00114 writeDeleteHeader(feedId, streamName, length); 00115 write_delete_values(_client, from, end); 00116 } else { 00117 DBGLN("%s", "ERROR: Cannot connect to M2X server!"); 00118 return E_NOCONNECTION; 00119 } 00120 00121 return readStatusCode(true); 00122 } 00123 00124 static int write_delete_values(Print* print, const char* from, 00125 const char* end) { 00126 int bytes = 0; 00127 bytes += print->print("{\"from\":\""); 00128 bytes += print->print(from); 00129 bytes += print->print("\",\"end\":\""); 
00130 bytes += print->print(end); 00131 bytes += print->print("\"}"); 00132 return bytes; 00133 } 00134 00135 // Encodes and prints string using Percent-encoding specified 00136 // in RFC 1738, Section 2.2 00137 int print_encoded_string(Print* print, const char* str) { 00138 int bytes = 0; 00139 for (int i = 0; str[i] != 0; i++) { 00140 if (((str[i] >= 'A') && (str[i] <= 'Z')) || 00141 ((str[i] >= 'a') && (str[i] <= 'z')) || 00142 ((str[i] >= '0') && (str[i] <= '9')) || 00143 (str[i] == '-') || (str[i] == '_') || 00144 (str[i] == '.') || (str[i] == '~')) { 00145 bytes += print->print(str[i]); 00146 } else { 00147 // Encode all other characters 00148 bytes += print->print('%'); 00149 bytes += print->print(HEX(str[i] / 16)); 00150 bytes += print->print(HEX(str[i] % 16)); 00151 } 00152 } 00153 return bytes; 00154 } 00155 00156 void M2XStreamClient::writePutHeader(const char* feedId, 00157 const char* streamName, 00158 int contentLength) { 00159 _client->print("PUT /v1/feeds/"); 00160 print_encoded_string(_client, feedId); 00161 _client->print("/streams/"); 00162 print_encoded_string(_client, streamName); 00163 _client->println(" HTTP/1.0"); 00164 00165 writeHttpHeader(contentLength); 00166 } 00167 00168 void M2XStreamClient::writeDeleteHeader(const char* feedId, 00169 const char* streamName, 00170 int contentLength) { 00171 _client->print("DELETE /v1/feeds/"); 00172 print_encoded_string(_client, feedId); 00173 _client->print("/streams/"); 00174 print_encoded_string(_client, streamName); 00175 _client->print("/values"); 00176 _client->println(" HTTP/1.0"); 00177 00178 writeHttpHeader(contentLength); 00179 } 00180 00181 void M2XStreamClient::writeHttpHeader(int contentLength) { 00182 _client->println(USER_AGENT); 00183 _client->print("X-M2X-KEY: "); 00184 _client->println(_key); 00185 00186 _client->print("Host: "); 00187 print_encoded_string(_client, _host); 00188 if (_port != kDefaultM2XPort) { 00189 _client->print(":"); 00190 // port is an integer, does not need 
encoding 00191 _client->print(_port); 00192 } 00193 _client->println(); 00194 00195 if (contentLength > 0) { 00196 _client->println("Content-Type: application/json"); 00197 DBG("%s", "Content Length: "); 00198 DBGLN("%d", contentLength); 00199 00200 _client->print("Content-Length: "); 00201 _client->println(contentLength); 00202 } 00203 _client->println(); 00204 } 00205 00206 int M2XStreamClient::waitForString(const char* str) { 00207 int currentIndex = 0; 00208 if (str[currentIndex] == '\0') return E_OK; 00209 00210 while (true) { 00211 while (_client->available()) { 00212 char c = _client->read(); 00213 DBG("%c", c); 00214 00215 int cmp; 00216 if (_case_insensitive) { 00217 cmp = tolower(c) - tolower(str[currentIndex]); 00218 } else { 00219 cmp = c - str[currentIndex]; 00220 } 00221 00222 if ((str[currentIndex] == '*') || (cmp == 0)) { 00223 currentIndex++; 00224 if (str[currentIndex] == '\0') { 00225 return E_OK; 00226 } 00227 } else { 00228 // start from the beginning 00229 currentIndex = 0; 00230 } 00231 } 00232 00233 if (!_client->connected()) { 00234 DBGLN("%s", "ERROR: The client is disconnected from the server!"); 00235 00236 close(); 00237 return E_DISCONNECTED; 00238 } 00239 00240 delay(1000); 00241 } 00242 // never reached here 00243 return E_NOTREACHABLE; 00244 } 00245 00246 int M2XStreamClient::readStatusCode(bool closeClient) { 00247 int responseCode = 0; 00248 int ret = waitForString("HTTP/*.* "); 00249 if (ret != E_OK) { 00250 if (closeClient) close(); 00251 return ret; 00252 } 00253 00254 // ret is not needed from here(since it must be E_OK), so we can use it 00255 // as a regular variable now. 
00256 ret = 0; 00257 while (true) { 00258 while (_client->available()) { 00259 char c = _client->read(); 00260 DBG("%c", c); 00261 00262 responseCode = responseCode * 10 + (c - '0'); 00263 ret++; 00264 if (ret == 3) { 00265 if (closeClient) close(); 00266 return responseCode; 00267 } 00268 } 00269 00270 if (!_client->connected()) { 00271 DBGLN("%s", "ERROR: The client is disconnected from the server!"); 00272 00273 if (closeClient) close(); 00274 return E_DISCONNECTED; 00275 } 00276 00277 delay(1000); 00278 } 00279 00280 // never reached here 00281 return E_NOTREACHABLE; 00282 } 00283 00284 int M2XStreamClient::readContentLength() { 00285 int ret = waitForString("Content-Length: "); 00286 if (ret != E_OK) { 00287 return ret; 00288 } 00289 00290 // From now on, ret is not needed, we can use it 00291 // to keep the final result 00292 ret = 0; 00293 while (true) { 00294 while (_client->available()) { 00295 char c = _client->read(); 00296 DBG("%c", c); 00297 00298 if ((c == '\r') || (c == '\n')) { 00299 return (ret == 0) ? 
(E_INVALID) : (ret); 00300 } else { 00301 ret = ret * 10 + (c - '0'); 00302 } 00303 } 00304 00305 if (!_client->connected()) { 00306 DBGLN("%s", "ERROR: The client is disconnected from the server!"); 00307 00308 return E_DISCONNECTED; 00309 } 00310 00311 delay(1000); 00312 } 00313 00314 // never reached here 00315 return E_NOTREACHABLE; 00316 } 00317 00318 int M2XStreamClient::skipHttpHeader() { 00319 return waitForString("\r\n\r\n"); 00320 } 00321 00322 void M2XStreamClient::close() { 00323 // Eats up buffered data before closing 00324 _client->flush(); 00325 _client->stop(); 00326 } 00327 00328 int M2XStreamClient::readStreamValue(stream_value_read_callback callback, 00329 void* context) { 00330 const int BUF_LEN = 64; 00331 char buf[BUF_LEN]; 00332 00333 int length = readContentLength(); 00334 if (length < 0) { 00335 close(); 00336 return length; 00337 } 00338 00339 int index = skipHttpHeader(); 00340 if (index != E_OK) { 00341 close(); 00342 return index; 00343 } 00344 index = 0; 00345 00346 stream_parsing_context_state state; 00347 state.state = state.index = 0; 00348 state.callback = callback; 00349 state.context = context; 00350 00351 jsonlite_parser_callbacks cbs = jsonlite_default_callbacks; 00352 cbs.key_found = on_stream_key_found; 00353 cbs.number_found = on_stream_number_found; 00354 cbs.string_found = on_stream_string_found; 00355 cbs.context.client_state = &state; 00356 00357 jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5)); 00358 jsonlite_parser_set_callback(p, &cbs); 00359 00360 jsonlite_result result = jsonlite_result_unknown; 00361 while (index < length) { 00362 int i = 0; 00363 00364 DBG("%s", "Received Data: "); 00365 while ((i < BUF_LEN) && _client->available()) { 00366 buf[i++] = _client->read(); 00367 DBG("%c", buf[i - 1]); 00368 } 00369 DBGLNEND; 00370 00371 if ((!_client->connected()) && 00372 (!_client->available()) && 00373 ((index + i) < length)) { 00374 jsonlite_parser_release(p); 00375 close(); 00376 return 
E_NOCONNECTION; 00377 } 00378 00379 result = jsonlite_parser_tokenize(p, buf, i); 00380 if ((result != jsonlite_result_ok) && 00381 (result != jsonlite_result_end_of_stream)) { 00382 jsonlite_parser_release(p); 00383 close(); 00384 return E_JSON_INVALID; 00385 } 00386 00387 index += i; 00388 } 00389 00390 jsonlite_parser_release(p); 00391 close(); 00392 return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID); 00393 } 00394 00395 int M2XStreamClient::readLocation(location_read_callback callback, 00396 void* context) { 00397 const int BUF_LEN = 40; 00398 char buf[BUF_LEN]; 00399 00400 int length = readContentLength(); 00401 if (length < 0) { 00402 close(); 00403 return length; 00404 } 00405 00406 int index = skipHttpHeader(); 00407 if (index != E_OK) { 00408 close(); 00409 return index; 00410 } 00411 index = 0; 00412 00413 location_parsing_context_state state; 00414 state.state = state.index = 0; 00415 state.callback = callback; 00416 state.context = context; 00417 00418 jsonlite_parser_callbacks cbs = jsonlite_default_callbacks; 00419 cbs.key_found = on_location_key_found; 00420 cbs.string_found = on_location_string_found; 00421 cbs.context.client_state = &state; 00422 00423 jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5)); 00424 jsonlite_parser_set_callback(p, &cbs); 00425 00426 jsonlite_result result = jsonlite_result_unknown; 00427 while (index < length) { 00428 int i = 0; 00429 00430 DBG("%s", "Received Data: "); 00431 while ((i < BUF_LEN) && _client->available()) { 00432 buf[i++] = _client->read(); 00433 DBG("%c", buf[i - 1]); 00434 } 00435 DBGLNEND; 00436 00437 if ((!_client->connected()) && 00438 (!_client->available()) && 00439 ((index + i) < length)) { 00440 jsonlite_parser_release(p); 00441 close(); 00442 return E_NOCONNECTION; 00443 } 00444 00445 result = jsonlite_parser_tokenize(p, buf, i); 00446 if ((result != jsonlite_result_ok) && 00447 (result != jsonlite_result_end_of_stream)) { 00448 jsonlite_parser_release(p); 
00449 close(); 00450 return E_JSON_INVALID; 00451 } 00452 00453 index += i; 00454 } 00455 00456 jsonlite_parser_release(p); 00457 close(); 00458 return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID); 00459 }
Generated on Wed Jul 13 2022 05:32:06 by
