branch with improvemnts

Fork of M2XStreamClient by AT&T M2X Team

M2XStreamClient.cpp

Committer:
citrusbyte
Date:
2013-10-24
Revision:
5:ea68c8980ad8
Child:
6:e6d66d99dd6f

File content as of revision 5:ea68c8980ad8:

#include "M2XStreamClient.h"

#include <jsonlite.h>

#include "StreamParseFunctions.h"
#include "LocationParseFunctions.h"

#define HEX(t_) ((char) (((t_) > 9) ? ((t_) - 10 + 'A') : ((t_) + '0')))
#define MAX_DOUBLE_DIGITS 7

const char* M2XStreamClient::kDefaultM2XHost = "api-m2x.att.com";
const char* kUserAgentLine = "User-Agent: M2X Arduino Client/0.1";

static int print_encoded_string(Print* print, const char* str);

M2XStreamClient::M2XStreamClient(Client* client,
                                 const char* key,
                                 const char* host,
                                 int port) : _client(client),
                                             _key(key),
                                             _host(host),
                                             _port(port),
                                             _null_print() {
}

int M2XStreamClient::send(const char* feedId,
                          const char* streamName,
                          double value) {
  if (_client->connect(_host, _port)) {
#ifdef DEBUG
    printf("Connected to M2X server!\n");
#endif
    writeSendHeader(feedId, streamName,
                    // 6 for "value="
                    _null_print.print(value) + 6);
    _client->print("value=");
    // value is a double, does not need encoding
    _client->print(value);
  } else {
#ifdef DEBUG
    printf("ERROR: Cannot connect to M2X server!\n");
#endif
    return E_NOCONNECTION;
  }

  return readStatusCode(true);
}

int M2XStreamClient::send(const char* feedId,
                          const char* streamName,
                          long value) {
  if (_client->connect(_host, _port)) {
#ifdef DEBUG
    printf("Connected to M2X server!\n");
#endif
    writeSendHeader(feedId, streamName,
                    // 6 for "value="
                    _null_print.print(value) + 6);

    _client->print("value=");
    // value is a long, does not need encoding
    _client->print(value);
  } else {
#ifdef DEBUG
    printf("ERROR: Cannot connect to M2X server!\n");
#endif
    return E_NOCONNECTION;
  }

  return readStatusCode(true);
}

int M2XStreamClient::send(const char* feedId,
                          const char* streamName,
                          int value) {
  if (_client->connect(_host, _port)) {
#ifdef DEBUG
    printf("Connected to M2X server!\n");
#endif
    writeSendHeader(feedId, streamName,
                    // 6 for "value="
                    _null_print.print(value) + 6);

    _client->print("value=");
    // value is an int, does not need encoding
    _client->print(value);
  } else {
#ifdef DEBUG
    printf("ERROR: Cannot connect to M2X server!\n");
#endif
    return E_NOCONNECTION;
  }

  return readStatusCode(true);
}

int M2XStreamClient::send(const char* feedId,
                          const char* streamName,
                          const char* value) {
  if (_client->connect(_host, _port)) {
#ifdef DEBUG
    printf("Connected to M2X server!\n");
#endif
    writeSendHeader(feedId, streamName,
                    // 6 for "value="
                    _null_print.print(value) + 6);

    _client->print("value=");
    print_encoded_string(_client, value);
  } else {
#ifdef DEBUG
    printf("ERROR: Cannot connect to M2X server!\n");
#endif
    return E_NOCONNECTION;
  }

  return readStatusCode(true);
}

int M2XStreamClient::receive(const char* feedId, const char* streamName,
                             stream_value_read_callback callback, void* context) {
  if (_client->connect(_host, _port)) {
#ifdef DEBUG
    printf("Connected to M2X server!\n");
#endif
    _client->print("GET /v1/feeds/");
    print_encoded_string(_client, feedId);
    _client->print("/streams/");
    print_encoded_string(_client, streamName);
    _client->println("/values HTTP/1.0");

    writeHttpHeader(-1);
  } else {
#ifdef DEBUG
    printf("ERROR: Cannot connect to M2X server!\n");
#endif
    return E_NOCONNECTION;
  }
  int status = readStatusCode(false);
  if (status == 200) {
    readStreamValue(callback, context);
  }

  close();
  return status;
}

int M2XStreamClient::readLocation(const char* feedId,
                                  location_read_callback callback,
                                  void* context) {
  if (_client->connect(_host, _port)) {
#ifdef DEBUG
    printf("Connected to M2X server!\n");
#endif
    _client->print("GET /v1/feeds/");
    print_encoded_string(_client, feedId);
    _client->println("/location HTTP/1.0");

    writeHttpHeader(-1);
  } else {
#ifdef DEBUG
    printf("ERROR: Cannot connect to M2X server!\n");
#endif
    return E_NOCONNECTION;
  }
  int status = readStatusCode(false);
  if (status == 200) {
    readLocation(callback, context);
  }

  close();
  return status;
}

// Encodes and prints string using Percent-encoding specified
// in RFC 1738, Section 2.2
static int print_encoded_string(Print* print, const char* str) {
  int bytes = 0;
  for (int i = 0; str[i] != 0; i++) {
    if (((str[i] >= 'A') && (str[i] <= 'Z')) ||
        ((str[i] >= 'a') && (str[i] <= 'z')) ||
        ((str[i] >= '0') && (str[i] <= '9')) ||
        (str[i] == '-') || (str[i] == '_') ||
        (str[i] == '.') || (str[i] == '~')) {
      bytes += print->print(str[i]);
    } else {
      // Encode all other characters
      bytes += print->print('%');
      bytes += print->print(HEX(str[i] / 16));
      bytes += print->print(HEX(str[i] % 16));
    }
  }
  return bytes;
}

static int write_location_data(Print* print, const char* name,
                               double latitude, double longitude,
                               double elevation) {
  int bytes = 0;
  bytes += print->print("name=");
  bytes += print_encoded_string(print, name);
  bytes += print->print("&latitude=");
  bytes += print->print(latitude, MAX_DOUBLE_DIGITS);
  bytes += print->print("&longitude=");
  bytes += print->print(longitude, MAX_DOUBLE_DIGITS);
  bytes += print->print("&elevation=");
  bytes += print->print(elevation);
  return bytes;
}

static int write_location_data(Print* print, const char* name,
                               const char* latitude, const char* longitude,
                               const char* elevation) {
  int bytes = 0;
  bytes += print->print("name=");
  bytes += print_encoded_string(print, name);
  bytes += print->print("&latitude=");
  bytes += print_encoded_string(print, latitude);
  bytes += print->print("&longitude=");
  bytes += print_encoded_string(print, longitude);
  bytes += print->print("&elevation=");
  bytes += print_encoded_string(print, elevation);
  return bytes;
}

int M2XStreamClient::updateLocation(const char* feedId,
                                    const char* name,
                                    double latitude,
                                    double longitude,
                                    double elevation) {
  if (_client->connect(_host, _port)) {
#ifdef DEBUG
    printf("Connected to M2X server!\n");
#endif

    int length = write_location_data(&_null_print, name, latitude, longitude,
                                     elevation);
    _client->print("PUT /v1/feeds/");
    print_encoded_string(_client, feedId);
    _client->println("/location HTTP/1.0");

    writeHttpHeader(length);
    write_location_data(_client, name, latitude, longitude, elevation);
  } else {
#ifdef DEBUG
    printf("ERROR: Cannot connect to M2X server!\n");
#endif
    return E_NOCONNECTION;
  }
  return readStatusCode(true);
}

int M2XStreamClient::updateLocation(const char* feedId,
                                    const char* name,
                                    const char* latitude,
                                    const char* longitude,
                                    const char* elevation) {
  if (_client->connect(_host, _port)) {
#ifdef DEBUG
    printf("Connected to M2X server!\n");
#endif

    int length = write_location_data(&_null_print, name, latitude, longitude,
                                     elevation);
    _client->print("PUT /v1/feeds/");
    print_encoded_string(_client, feedId);
    _client->println("/location HTTP/1.0");

    writeHttpHeader(length);
    write_location_data(_client, name, latitude, longitude, elevation);
  } else {
#ifdef DEBUG
    printf("ERROR: Cannot connect to M2X server!\n");
#endif
    return E_NOCONNECTION;
  }
  return readStatusCode(true);
}

void M2XStreamClient::writeSendHeader(const char* feedId,
                                      const char* streamName,
                                      int contentLength) {
  _client->print("PUT /v1/feeds/");
  print_encoded_string(_client, feedId);
  _client->print("/streams/");
  print_encoded_string(_client, streamName);
  _client->println(" HTTP/1.0");
  
  writeHttpHeader(contentLength);
}

void M2XStreamClient::writeHttpHeader(int contentLength) {
  _client->println(kUserAgentLine);
  _client->print("X-M2X-KEY: ");
  _client->println(_key);
  
  _client->print("Host: ");
  print_encoded_string(_client, _host);
  if (_port != kDefaultM2XPort) {
    _client->print(":");
    // port is an integer, does not need encoding
    _client->print(_port);
  }
  _client->println();

  if (contentLength > 0) {
    _client->println("Content-Type: application/x-www-form-urlencoded");
#ifdef DEBUG
    printf("Content Length: %d\n", contentLength);
#endif
    _client->print("Content-Length: ");
    _client->println(contentLength);
  }
  _client->println();
}

int M2XStreamClient::waitForString(const char* str) {
  int currentIndex = 0;
  if (str[currentIndex] == '\0') return E_OK;

  while (true) {
    while (_client->available()) {
      char c = _client->read();
#ifdef DEBUG
      printf("%c", c);
#endif

      if ((str[currentIndex] == '*') ||
          (c == str[currentIndex])) {
        currentIndex++;
        if (str[currentIndex] == '\0') {
          return E_OK;
        }
      } else {
        // start from the beginning
        currentIndex = 0;
      }
    }

    if (!_client->connected()) {
#ifdef DEBUG
      printf("ERROR: The client is disconnected from the server!\n");
#endif
      close();
      return E_DISCONNECTED;
    }

    delay(1000);
  }
  // never reached here
  return E_NOTREACHABLE;
}

int M2XStreamClient::readStatusCode(bool closeClient) {
  int responseCode = 0;
  int ret = waitForString("HTTP/*.* ");
  if (ret != E_OK) {
    if (closeClient) close();
    return ret;
  }

  // ret is not needed from here(since it must be E_OK), so we can use it
  // as a regular variable now.
  ret = 0;
  while (true) {
    while (_client->available()) {
      char c = _client->read();
#ifdef DEBUG
      printf("%c", c);
#endif
      responseCode = responseCode * 10 + (c - '0');
      ret++;
      if (ret == 3) {
        if (closeClient) close();
        return responseCode;
      }
    }

    if (!_client->connected()) {
#ifdef DEBUG
      printf("ERROR: The client is disconnected from the server!\n");
#endif
      if (closeClient) close();
      return E_DISCONNECTED;
    }

    delay(1000);
  }

  // never reached here
  return E_NOTREACHABLE;
}

int M2XStreamClient::readContentLength() {
  int ret = waitForString("Content-Length: ");
  if (ret != E_OK) {
    return ret;
  }

  // From now on, ret is not needed, we can use it
  // to keep the final result
  ret = 0;
  while (true) {
    while (_client->available()) {
      char c = _client->read();
#ifdef DEBUG
      printf("%c", c);
#endif
      if ((c == '\r') || (c == '\n')) {
        return (ret == 0) ? (E_INVALID) : (ret);
      } else {
        ret = ret * 10 + (c - '0');
      }
    }

    if (!_client->connected()) {
#ifdef DEBUG
      printf("ERROR: The client is disconnected from the server!\n");
#endif
      return E_DISCONNECTED;
    }

    delay(1000);
  }

  // never reached here
  return E_NOTREACHABLE;
}

int M2XStreamClient::skipHttpHeader() {
  return waitForString("\r\n\r\n");
}

void M2XStreamClient::close() {
  // Eats up buffered data before closing
  _client->flush();
  _client->stop();
}

int M2XStreamClient::readStreamValue(stream_value_read_callback callback,
                                     void* context) {
  const int BUF_LEN = 32;
  char buf[BUF_LEN];

  int length = readContentLength();
  if (length < 0) {
    close();
    return length;
  }

  int index = skipHttpHeader();
  if (index != E_OK) {
    close();
    return index;
  }
  index = 0;

  stream_parsing_context_state state;
  state.state = state.index = 0;
  state.callback = callback;
  state.context = context;

  jsonlite_parser_callbacks cbs = jsonlite_default_callbacks;
  cbs.key_found = on_stream_key_found;
  cbs.string_found = on_stream_string_found;
  cbs.context.client_state = &state;

  jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5));
  jsonlite_parser_set_callback(p, &cbs);

  jsonlite_result result = jsonlite_result_unknown;
  while (index < length) {
    int i = 0;

#ifdef DEBUG
    printf("Received Data: ");
#endif
    while ((i < BUF_LEN) && _client->available()) {
      buf[i++] = _client->read();
#ifdef DEBUG
      printf("%c", buf[i - 1]);
#endif
    }
#ifdef DEBUG
      printf("\n");
#endif

    if ((!_client->connected()) &&
        (!_client->available()) &&
        ((index + i) < length)) {
      jsonlite_parser_release(p);
      close();
      return E_NOCONNECTION;
    }

    result = jsonlite_parser_tokenize(p, buf, i);
    if ((result != jsonlite_result_ok) &&
        (result != jsonlite_result_end_of_stream)) {
      jsonlite_parser_release(p);
      close();
      return E_JSON_INVALID;
    }

    index += i;
  }

  jsonlite_parser_release(p);
  close();
  return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID);
}

int M2XStreamClient::readLocation(location_read_callback callback,
                                  void* context) {
  const int BUF_LEN = 40;
  char buf[BUF_LEN];

  int length = readContentLength();
  if (length < 0) {
    close();
    return length;
  }

  int index = skipHttpHeader();
  if (index != E_OK) {
    close();
    return index;
  }
  index = 0;

  location_parsing_context_state state;
  state.state = state.index = 0;
  state.callback = callback;
  state.context = context;

  jsonlite_parser_callbacks cbs = jsonlite_default_callbacks;
  cbs.key_found = on_location_key_found;
  cbs.string_found = on_location_string_found;
  cbs.context.client_state = &state;

  jsonlite_parser p = jsonlite_parser_init(jsonlite_parser_estimate_size(5));
  jsonlite_parser_set_callback(p, &cbs);

  jsonlite_result result = jsonlite_result_unknown;
  while (index < length) {
    int i = 0;

#ifdef DEBUG
    printf("Received Data: ");
#endif
    while ((i < BUF_LEN) && _client->available()) {
      buf[i++] = _client->read();
#ifdef DEBUG
      printf("%c", buf[i - 1]);
#endif
    }
#ifdef DEBUG
    printf("\n");
#endif

    if ((!_client->connected()) &&
        (!_client->available()) &&
        ((index + i) < length)) {
      jsonlite_parser_release(p);
      close();
      return E_NOCONNECTION;
    }

    result = jsonlite_parser_tokenize(p, buf, i);
    if ((result != jsonlite_result_ok) &&
        (result != jsonlite_result_end_of_stream)) {
      jsonlite_parser_release(p);
      close();
      return E_JSON_INVALID;
    }

    index += i;
  }

  jsonlite_parser_release(p);
  close();
  return (result == jsonlite_result_ok) ? (E_OK) : (E_JSON_INVALID);
}