branch with improvements
Fork of M2XStreamClient by
Embed:
(wiki syntax)
Show/hide line numbers
M2XStreamClient.cpp
00001 #include "M2XStreamClient.h" 00002 00003 #include <jsonlite.h> 00004 00005 #include "StreamParseFunctions.h" 00006 #include "LocationParseFunctions.h" 00007 00008 const char* M2XStreamClient::kDefaultM2XHost = "api-m2x.att.com"; 00009 00010 static int write_delete_values(Print* print, const char* from, const char* end); 00011 int print_encoded_string(Print* print, const char* str); 00012 int tolower(int ch); 00013 00014 #if defined(ARDUINO_PLATFORM) || defined(MBED_PLATFORM) 00015 int tolower(int ch) 00016 { 00017 // Arduino and mbed use ASCII table, so we can simplify the implementation 00018 if ((ch >= 'A') && (ch <= 'Z')) { 00019 return (ch + 32); 00020 } 00021 return ch; 00022 } 00023 #else 00024 // For other platform, we use libc's tolower by default 00025 #include <ctype.h> 00026 #endif 00027 00028 M2XStreamClient::M2XStreamClient(Client* client, 00029 const char* key, 00030 int case_insensitive, 00031 const char* host, 00032 int port) : _client(client), 00033 _key(key), 00034 _case_insensitive(case_insensitive), 00035 _host(host), 00036 _port(port), 00037 _null_print() { 00038 } 00039 00040 #define WRITE_QUERY_PART(client_, started_, name_, str_) { \ 00041 if (str_) { \ 00042 if (started_) { \ 00043 (client_)->print("&"); \ 00044 } else { \ 00045 (client_)->print("?"); \ 00046 started_ = true; \ 00047 } \ 00048 (client_)->print(name_ "="); \ 00049 (client_)->print(str_); \ 00050 } \ 00051 } 00052 00053 int M2XStreamClient::fetchValues(const char* feedId, const char* streamName, 00054 stream_value_read_callback callback, void* context, 00055 const char* startTime, const char* endTime, 00056 const char* limit) { 00057 if (_client->connect(_host, _port)) { 00058 bool query_started = false; 00059 00060 DBGLN("%s", "Connected to M2X server!"); 00061 _client->print("GET /v1/feeds/"); 00062 print_encoded_string(_client, feedId); 00063 _client->print("/streams/"); 00064 print_encoded_string(_client, streamName); 00065 _client->print("/values"); 00066 00067 
WRITE_QUERY_PART(_client, query_started, "start", startTime); 00068 WRITE_QUERY_PART(_client, query_started, "end", endTime); 00069 WRITE_QUERY_PART(_client, query_started, "limit", limit); 00070 00071 _client->println(" HTTP/1.0"); 00072 writeHttpHeader(-1); 00073 } else { 00074 DBGLN("%s", "ERROR: Cannot connect to M2X server!"); 00075 return E_NOCONNECTION; 00076 } 00077 int status = readStatusCode(false); 00078 if (status == 200) { 00079 readStreamValue(callback, context); 00080 } 00081 00082 close(); 00083 return status; 00084 } 00085 00086 int M2XStreamClient::readLocation(const char* feedId, 00087 location_read_callback callback, 00088 void* context) { 00089 if (_client->connect(_host, _port)) { 00090 DBGLN("%s", "Connected to M2X server!"); 00091 _client->print("GET /v1/feeds/"); 00092 print_encoded_string(_client, feedId); 00093 _client->println("/location HTTP/1.0"); 00094 00095 writeHttpHeader(-1); 00096 } else { 00097 DBGLN("%s", "ERROR: Cannot connect to M2X server!"); 00098 return E_NOCONNECTION; 00099 } 00100 int status = readStatusCode(false); 00101 if (status == 200) { 00102 readLocation(callback, context); 00103 } 00104 00105 close(); 00106 return status; 00107 } 00108 00109 int M2XStreamClient::deleteValues(const char* feedId, const char* streamName, 00110 const char* from, const char* end) { 00111 if (_client->connect(_host, _port)) { 00112 DBGLN("%s", "Connected to M2X server!"); 00113 int length = write_delete_values(&_null_print, from, end); 00114 writeDeleteHeader(feedId, streamName, length); 00115 write_delete_values(_client, from, end); 00116 } else { 00117 DBGLN("%s", "ERROR: Cannot connect to M2X server!"); 00118 return E_NOCONNECTION; 00119 } 00120 00121 return readStatusCode(true); 00122 } 00123 00124 static int write_delete_values(Print* print, const char* from, 00125 const char* end) { 00126 int bytes = 0; 00127 bytes += print->print("{\"from\":\""); 00128 bytes += print->print(from); 00129 bytes += print->print("\",\"end\":\""); 
00130 bytes += print->print(end); 00131 bytes += print->print("\"}"); 00132 return bytes; 00133 } 00134 00135 // Encodes and prints string using Percent-encoding specified 00136 // in RFC 1738, Section 2.2 00137 int print_encoded_string(Print* print, const char* str) { 00138 int bytes = 0; 00139 for (int i = 0; str[i] != 0; i++) { 00140 if (((str[i] >= 'A') && (str[i] <= 'Z')) || 00141 ((str[i] >= 'a') && (str[i] <= 'z')) || 00142 ((str[i] >= '0') && (str[i] <= '9')) || 00143 (str[i] == '-') || (str[i] == '_') || 00144 (str[i] == '.') || (str[i] == '~')) { 00145 bytes += print->print(str[i]); 00146 } else { 00147 // Encode all other characters 00148 bytes += print->print('%'); 00149 bytes += print->print(HEX(str[i] / 16)); 00150 bytes += print->print(HEX(str[i] % 16)); 00151 } 00152 } 00153 return bytes; 00154 } 00155 00156 void M2XStreamClient::writePutHeader(const char* feedId, 00157 const char* streamName, 00158 int contentLength) { 00159 _client->print("PUT /v1/feeds/"); 00160 print_encoded_string(_client, feedId); 00161 _client->print("/streams/"); 00162 print_encoded_string(_client, streamName); 00163 _client->println(" HTTP/1.0"); 00164 00165 writeHttpHeader(contentLength); 00166 } 00167 00168 void M2XStreamClient::writeDeleteHeader(const char* feedId, 00169 const char* streamName, 00170 int contentLength) { 00171 _client->print("DELETE /v1/feeds/"); 00172 print_encoded_string(_client, feedId); 00173 _client->print("/streams/"); 00174 print_encoded_string(_client, streamName); 00175 _client->print("/values"); 00176 _client->println(" HTTP/1.0"); 00177 00178 writeHttpHeader(contentLength); 00179 } 00180 00181 void M2XStreamClient::writeHttpHeader(int contentLength) { 00182 _client->println(USER_AGENT); 00183 _client->print("X-M2X-KEY: "); 00184 _client->println(_key); 00185 00186 _client->print("Host: "); 00187 print_encoded_string(_client, _host); 00188 if (_port != kDefaultM2XPort) { 00189 _client->print(":"); 00190 // port is an integer, does not need 
encoding 00191 _client->print(_port); 00192 } 00193 _client->println(); 00194 00195 if (contentLength > 0) { 00196 _client->println("Content-Type: application/json"); 00197 DBG("%s", "Content Length: "); 00198 DBGLN("%d", contentLength); 00199 00200 _client->print("Content-Length: "); 00201 _client->println(contentLength); 00202 } 00203 _client->println(); 00204 } 00205 00206 int M2XStreamClient::waitForString(const char* str) { 00207 int currentIndex = 0; 00208 if (str[currentIndex] == '\0') return E_OK; 00209 00210 while (true) { 00211 while (_client->available()) { 00212 char c = _client->read(); 00213 DBG("%c", c); 00214 00215 int cmp; 00216 if (_case_insensitive) { 00217 cmp = tolower(c) - tolower(str[currentIndex]); 00218 } else { 00219 cmp = c - str[currentIndex]; 00220 } 00221 00222 if ((str[currentIndex] == '*') || (cmp == 0)) { 00223 currentIndex++; 00224 if (str[currentIndex] == '\0') { 00225 return E_OK; 00226 } 00227 } else { 00228 // start from the beginning 00229 currentIndex = 0; 00230 } 00231 } 00232 00233 if (!_client->connected()) { 00234 DBGLN("%s", "ERROR: The client is disconnected from the server!"); 00235 00236 close(); 00237 return E_DISCONNECTED; 00238 } 00239 00240 delay(1000); 00241 } 00242 // never reached here 00243 return E_NOTREACHABLE; 00244 } 00245 00246 int M2XStreamClient::readStatusCode(bool closeClient) { 00247 int responseCode = 0; 00248 int ret = waitForString("HTTP/*.* "); 00249 if (ret != E_OK) { 00250 if (closeClient) close(); 00251 return ret; 00252 } 00253 00254 // ret is not needed from here(since it must be E_OK), so we can use it 00255 // as a regular variable now. 
00256 ret = 0; 00257 while (true) { 00258 while (_client->available()) { 00259 char c = _client->read(); 00260 DBG("%c", c); 00261 00262 responseCode = responseCode * 10 + (c - '0'); 00263 ret++; 00264 if (ret == 3) { 00265 if (closeClient) close(); 00266 return responseCode; 00267 } 00268 } 00269 00270 if (!_client->connected()) { 00271 DBGLN("%s", "ERROR: The client is disconnected from the server!"); 00272 00273 if (closeClient) close(); 00274 return E_DISCONNECTED; 00275 } 00276 00277 delay(1000); 00278 } 00279 00280 // never reached here 00281 return E_NOTREACHABLE; 00282 } 00283 00284 int M2XStreamClient::readContentLength() { 00285 int ret = waitForString("Content-Length: "); 00286 if (ret != E_OK) { 00287 return ret; 00288 } 00289 00290 // From now on, ret is not needed, we can use it 00291 // to keep the final result 00292 ret = 0; 00293 while (true) { 00294 while (_client->available()) { 00295 char c = _client->read(); 00296 DBG("%c", c); 00297 00298 if ((c == '\r') || (c == '\n')) { 00299 return (ret == 0) ? 
(E_INVALID) : (ret); 00300 } else { 00301 ret = ret * 10 + (c - '0'); 00302 } 00303 } 00304 00305 if (!_client->connected()) { 00306 DBGLN("%s", "ERROR: The client is disconnected from the server!"); 00307 00308 return E_DISCONNECTED; 00309 } 00310 00311 delay(1000); 00312 } 00313 00314 // never reached here 00315 return E_NOTREACHABLE; 00316 } 00317 00318 int M2XStreamClient::skipHttpHeader() { 00319 return waitForString("\r\n\r\n"); 00320 } 00321 00322 void M2XStreamClient::close() { 00323 // Eats up buffered data before closing 00324 _client->flush(); 00325 _client->stop(); 00326 } 00327 00328 int M2XStreamClient::readStreamValue(stream_value_read_callback callback, 00329 void* context) { 00330 const int BUF_LEN = 64; 00331 char buf[BUF_LEN]; 00332 00333 int length = readContentLength(); 00334 if (length < 0) { 00335 close(); 00336 return length; 00337 } 00338 00339 int index = skipHttpHeader(); 00340 if (index != E_OK) { 00341 close(); 00342 return index; 00343 } 00344 index = 0; 00345 00346 stream_parsing_context_state state; 00347 state.state = state.index = 0; 00348 state.callback = callback; 00349 state.context = context; 00350 00351 jsonlite_parser_callbacks cbs = jsonlite_default_callbacks; 00352 cbs.key_found = on_stream_key_found; 00353 cbs.number_found = on_stream_number_found; 00354 cbs.string_found = on_stream_string_found; 00355 cbs.context.client_state = &state; 00356 00357 jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5)); 00358 jsonlite_parser_set_callback(p, &cbs); 00359 00360 jsonlite_result result = jsonlite_result_unknown; 00361 while (index < length) { 00362 int i = 0; 00363 00364 DBG("%s", "Received Data: "); 00365 while ((i < BUF_LEN) && _client->available()) { 00366 buf[i++] = _client->read(); 00367 DBG("%c", buf[i - 1]); 00368 } 00369 DBGLNEND; 00370 00371 if ((!_client->connected()) && 00372 (!_client->available()) && 00373 ((index + i) < length)) { 00374 jsonlite_parser_release(p); 00375 close(); 00376 return 
E_NOCONNECTION; 00377 } 00378 00379 result = jsonlite_parser_tokenize(p, buf, i); 00380 if ((result != jsonlite_result_ok) && 00381 (result != jsonlite_result_end_of_stream)) { 00382 jsonlite_parser_release(p); 00383 close(); 00384 return E_JSON_INVALID; 00385 } 00386 00387 index += i; 00388 } 00389 00390 jsonlite_parser_release(p); 00391 close(); 00392 return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID); 00393 } 00394 00395 int M2XStreamClient::readLocation(location_read_callback callback, 00396 void* context) { 00397 const int BUF_LEN = 40; 00398 char buf[BUF_LEN]; 00399 00400 int length = readContentLength(); 00401 if (length < 0) { 00402 close(); 00403 return length; 00404 } 00405 00406 int index = skipHttpHeader(); 00407 if (index != E_OK) { 00408 close(); 00409 return index; 00410 } 00411 index = 0; 00412 00413 location_parsing_context_state state; 00414 state.state = state.index = 0; 00415 state.callback = callback; 00416 state.context = context; 00417 00418 jsonlite_parser_callbacks cbs = jsonlite_default_callbacks; 00419 cbs.key_found = on_location_key_found; 00420 cbs.string_found = on_location_string_found; 00421 cbs.context.client_state = &state; 00422 00423 jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5)); 00424 jsonlite_parser_set_callback(p, &cbs); 00425 00426 jsonlite_result result = jsonlite_result_unknown; 00427 while (index < length) { 00428 int i = 0; 00429 00430 DBG("%s", "Received Data: "); 00431 while ((i < BUF_LEN) && _client->available()) { 00432 buf[i++] = _client->read(); 00433 DBG("%c", buf[i - 1]); 00434 } 00435 DBGLNEND; 00436 00437 if ((!_client->connected()) && 00438 (!_client->available()) && 00439 ((index + i) < length)) { 00440 jsonlite_parser_release(p); 00441 close(); 00442 return E_NOCONNECTION; 00443 } 00444 00445 result = jsonlite_parser_tokenize(p, buf, i); 00446 if ((result != jsonlite_result_ok) && 00447 (result != jsonlite_result_end_of_stream)) { 00448 jsonlite_parser_release(p); 
00449 close(); 00450 return E_JSON_INVALID; 00451 } 00452 00453 index += i; 00454 } 00455 00456 jsonlite_parser_release(p); 00457 close(); 00458 return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID); 00459 }
Generated on Wed Jul 13 2022 05:32:06 by 1.7.2