First class data visualization and communication library with embedded devices. Code is maintained at github.com/Overdrivr/Telemetry

Dependents:   telemetry_car_demo telemetry_demo_FRDM-TFC telemetry_example_01 telemetry_indexed_data_demo ... more

Files at this revision

API Documentation at this revision

Comitter:
Overdrivr
Date:
Wed Jan 27 17:39:36 2016 +0000
Child:
1:e51abb43c074
Commit message:
Port of github.com/Overdrivr/Telemetry to mbed. Current commit corresponds to release tagged 1.0.1

Changed in this revision

crc16.cpp Show annotated file Show diff for this revision Revisions of this file
crc16.hpp Show annotated file Show diff for this revision Revisions of this file
driver.hpp Show annotated file Show diff for this revision Revisions of this file
framing.cpp Show annotated file Show diff for this revision Revisions of this file
framing.hpp Show annotated file Show diff for this revision Revisions of this file
telemetry.cpp Show annotated file Show diff for this revision Revisions of this file
telemetry.hpp Show annotated file Show diff for this revision Revisions of this file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/crc16.cpp	Wed Jan 27 17:39:36 2016 +0000
@@ -0,0 +1,33 @@
+#include "crc16.hpp"
+
+uint16_t crc16(uint8_t* data, uint32_t len)
+{
+    uint16_t rem  = 0;
+    for(uint16_t i = 0 ; i < len ; i++)
+    {
+        rem = crc16_recursive(data[i],rem);
+    }
+  return rem;
+ }
+
+uint16_t crc16_recursive(uint8_t byte, uint16_t remainder)
+{
+    uint16_t n = 16;
+
+    remainder  = remainder ^ (byte << (n-8));
+
+    for(uint16_t j = 1 ; j < 8 ; j++)
+    {
+        if(remainder & 0x8000)
+        {
+            remainder  = (remainder << 1) ^ 0x1021;
+        }
+        else
+        {
+            remainder  = remainder << 1;
+        }
+        remainder &= 0xffff;
+    }
+
+    return remainder;
+}
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/crc16.hpp	Wed Jan 27 17:39:36 2016 +0000
@@ -0,0 +1,11 @@
+#ifndef CRC16_H_
+#define CRC16_H_
+
+//From : https://en.wikipedia.org/wiki/Computation_of_cyclic_redundancy_checks
+
+#include "stdint.h"
+
+uint16_t crc16(uint8_t * data, uint32_t lenght);
+uint16_t crc16_recursive(uint8_t byte, uint16_t remainder);
+
+#endif
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/driver.hpp	Wed Jan 27 17:39:36 2016 +0000
@@ -0,0 +1,44 @@
+#include "MODSERIAL.h"
+
+// Driver implementation for telemetry (github.com/Overdrivr/Telemetry)
+// Don't forget in your main to aggregate the driver to telemetry (see below)
+/*
+  // Add this code to your main 
+  
+  TM_transport transport;
+  transport.read = read;
+  transport.write = write;
+  transport.readable = readable;
+  transport.writeable = writeable;
+
+  // Feed to transport to telemetry on init
+  init_telemetry(.., &transport);
+*/
+
+MODSERIAL pc(USBTX, USBRX);
+
+int32_t read(void * buf, uint32_t sizeToRead)
+{
+    *(uint8_t*)(buf) = pc.getc();
+    return 1;
+}
+
+int32_t readable()
+{
+    return pc.rxBufferGetCount();
+}
+
+int32_t write(void * buf, uint32_t sizeToWrite)
+{
+    char * ptr = (char *)buf;
+    for(uint32_t i = 0 ; i < sizeToWrite ; i++)
+    {
+        pc.putc(ptr[i]);
+    }
+    return 0;
+}
+
+int32_t writeable()
+{
+    return pc.writeable();
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/framing.cpp	Wed Jan 27 17:39:36 2016 +0000
@@ -0,0 +1,184 @@
+#include "framing.hpp"
+
+typedef enum _state _state;
+enum _state
+{
+  IDLE, // No incoming frame is in process
+  ESCAPING, // incoming frame in process, next character to be escaped
+  ACTIVE // frame in process
+};
+
+typedef struct storage storage;
+struct storage
+{
+   uint8_t * ptr;
+   uint32_t size;
+   uint32_t cursor;
+
+};
+
+static storage incomingStorage;
+static storage outgoingStorage;
+
+int8_t safe_append(storage * s, uint8_t byte);
+
+static uint8_t SOF_;
+static uint8_t EOF_;
+static uint8_t ESC_;
+
+static _state incoming_state;
+
+void (*on_incoming_frame_cb)(uint8_t * storage, uint32_t occupiedSize);
+void (*on_error_cb)(int32_t errCode);
+
+void initialize_framing()
+{
+  incomingStorage.ptr = NULL;
+  outgoingStorage.ptr = NULL;
+
+  incomingStorage.size = 0;
+  outgoingStorage.size = 0;
+
+  incomingStorage.cursor = 0;
+  incomingStorage.cursor = 0;
+
+  SOF_ = 0xF7;
+  EOF_ = 0x7F;
+  ESC_ = 0x7D;
+
+  incoming_state = IDLE;
+}
+
+void outgoing_storage(uint8_t * buf, uint32_t bufSize)
+{
+  outgoingStorage.ptr = buf;
+  outgoingStorage.size = bufSize;
+}
+
+void begin()
+{
+  if(outgoingStorage.size == 0 || outgoingStorage.ptr == NULL)
+    return;
+
+  outgoingStorage.cursor = 0;
+
+  // Should not fail
+  safe_append(&outgoingStorage,SOF_);
+}
+
+void append(uint8_t byte)
+{
+  if(outgoingStorage.size == 0 || outgoingStorage.ptr == NULL)
+    return;
+
+  // byte == to flag, need to escape it
+  if(byte == SOF_ || byte == EOF_ || byte == ESC_)
+  {
+    if(!safe_append(&outgoingStorage,ESC_))
+      return;
+  }
+
+  if(!safe_append(&outgoingStorage,byte))
+    return;
+}
+
+void append2(uint16_t twobytes)
+{
+  uint8_t * ptr = (uint8_t*)(&twobytes);
+  append(ptr[0]);
+  append(ptr[1]);
+}
+
+void append4(uint32_t fourbytes)
+{
+  uint8_t * ptr = (uint8_t*)(&fourbytes);
+  append(ptr[0]);
+  append(ptr[1]);
+  append(ptr[2]);
+  append(ptr[3]);
+}
+
+uint32_t end()
+{
+  if(outgoingStorage.size == 0 || outgoingStorage.ptr == NULL)
+    return 0;
+
+  if(!safe_append(&outgoingStorage,EOF_))
+    return 0;
+
+  return outgoingStorage.cursor;
+}
+
+void incoming_storage(uint8_t * buf, uint32_t bufSize)
+{
+  incomingStorage.ptr = buf;
+  incomingStorage.size = bufSize;
+}
+
+void set_on_incoming_frame(void (*callback)(uint8_t * storage, uint32_t occupiedSize))
+{
+  on_incoming_frame_cb = callback;
+}
+
+void set_on_incoming_error(void (*callback)(int32_t errCode))
+{
+  on_error_cb = callback;
+}
+
+void feed(uint8_t byte)
+{
+  if(incomingStorage.size == 0 || incomingStorage.ptr == NULL)
+    return;
+
+  if(incoming_state == ESCAPING)
+  {
+    if(!safe_append(&incomingStorage,byte))
+    {
+      incoming_state = IDLE;
+      return;
+    }
+    incoming_state = ACTIVE;
+    return;
+  }
+
+  if(byte == SOF_)
+  {
+      incoming_state = ACTIVE;
+      incomingStorage.cursor = 0;
+      return;
+  }
+
+  if(incoming_state == ACTIVE)
+  {
+    if(byte == EOF_)
+    {
+        incoming_state = IDLE;
+        on_incoming_frame_cb(incomingStorage.ptr, incomingStorage.cursor);
+    }
+    // Escape next character
+    else if(byte == ESC_)
+    {
+        incoming_state = ESCAPING;
+    }
+    else
+    {
+      if(!safe_append(&incomingStorage,byte))
+      {
+        incoming_state = IDLE;
+        return;
+      }
+      incoming_state = ACTIVE;
+    }
+  }
+}
+
+int8_t safe_append(storage * s, uint8_t byte)
+{
+  // Not enough space for 1 more character
+  if(s->cursor + 1 >= s->size)
+    return 0;
+
+  s->ptr[s->cursor++] = byte;
+
+  return 1;
+}
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/framing.hpp	Wed Jan 27 17:39:36 2016 +0000
@@ -0,0 +1,25 @@
+#ifndef FRAMING_H_
+#define FRAMING_H_
+
+#include "stddef.h"
+#include "stdint.h"
+
+void initialize_framing();
+// Outgoing data
+// Set storage for the outgoing frame
+void outgoing_storage(uint8_t * buf, uint32_t bufSize);
+
+void begin();
+void append(uint8_t byte);
+void append2(uint16_t twobytes);
+void append4(uint32_t fourbytes);
+uint32_t end();
+
+// Incoming data
+// Set storage for the incoming data
+void incoming_storage(uint8_t * buf, uint32_t bufSize);
+
+void set_on_incoming_frame(void (*callback)(uint8_t * storage, uint32_t occupiedSize));
+void set_on_incoming_error(void (*callback)(int32_t errCode));
+void feed(uint8_t byte);
+#endif
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/telemetry.cpp	Wed Jan 27 17:39:36 2016 +0000
@@ -0,0 +1,313 @@
+#include "telemetry.hpp"
+#include "framing.hpp"
+#include "crc16.hpp"
+#include "string.h"
+
+static TM_state * statePtr;
+static TM_transport * transportPtr;
+static uint8_t incomingBuffer[INCOMING_BUFFER_SIZE];
+static uint8_t outgoingBuffer[OUTGOING_BUFFER_SIZE];
+static char topicBuffer[TOPIC_BUFFER_SIZE];
+
+static void (*userCallback)(TM_state * s, TM_msg * m);
+
+uint16_t header(TM_type type);
+uint16_t topic(const char * topic, uint16_t crc);
+uint16_t payload(const void * payload, uint32_t size, uint16_t crc);
+void frame(const char * t, TM_type type, const void * data, uint32_t datasize);
+void send(void * buf, uint32_t size);
+void on_incoming_frame(uint8_t * storage, uint32_t size);
+void on_incoming_error(int32_t errCode);
+void emptyCallback(TM_state * s, TM_msg * m);
+
+void init_telemetry(TM_state* s, TM_transport * t)
+{
+  statePtr = s;
+  transportPtr = t;
+  userCallback = emptyCallback;
+
+  // Setup framing
+  initialize_framing();
+  incoming_storage(incomingBuffer,INCOMING_BUFFER_SIZE);
+  outgoing_storage(outgoingBuffer, OUTGOING_BUFFER_SIZE);
+  set_on_incoming_frame(on_incoming_frame);
+  set_on_incoming_error(on_incoming_error);
+}
+
+uint32_t emplace(TM_msg* m, char * buf, size_t bufSize)
+{
+  if(m->type != TM_string)
+    return 0;
+
+  uint32_t size = m->size;
+
+  if(bufSize - 1 < size)
+    size = bufSize - 1;
+
+  strncpy(buf, (char*)(m->buffer), size);
+  buf[size] = '\0';
+
+  return 1;
+}
+
+uint32_t emplace_u8(TM_msg* m, uint8_t* dst)
+{
+  if(m->type != TM_uint8)
+    return 0;
+
+  memcpy(dst,m->buffer,1);
+  return 1;
+}
+
+uint32_t emplace_u16(TM_msg* m, uint16_t* dst)
+{
+  if(m->type != TM_uint16)
+    return 0;
+
+  memcpy(dst,m->buffer,2);
+  return 1;
+}
+
+uint32_t emplace_u32(TM_msg* m, uint32_t* dst)
+{
+  if(m->type != TM_uint32)
+    return 0;
+
+  memcpy(dst,m->buffer,4);
+  return 1;
+}
+
+uint32_t emplace_i8(TM_msg* m, int8_t* dst)
+{
+  if(m->type != TM_int8)
+    return 0;
+
+  memcpy(dst,m->buffer,1);
+  return 1;
+}
+
+uint32_t emplace_i16(TM_msg* m, int16_t* dst)
+{
+  if(m->type != TM_int16)
+    return 0;
+
+  memcpy(dst,m->buffer,2);
+  return 1;
+}
+
+uint32_t emplace_i32(TM_msg* m, int32_t* dst)
+{
+  if(m->type != TM_int32)
+    return 0;
+
+  memcpy(dst,m->buffer,4);
+  return 1;
+}
+
+uint32_t emplace_f32(TM_msg* m, float* dst)
+{
+  if(m->type != TM_float32)
+    return 0;
+
+  memcpy(dst,m->buffer,4);
+  return 1;
+}
+
+void publish(const char * t, char * msg)
+{
+  frame(t,TM_string,msg,strlen(msg));
+}
+
+void publish_u8(const char * t, uint8_t  msg)
+{
+  void * ptr = (void *)(&msg);
+  frame(t,TM_uint8,ptr,1);
+}
+
+void publish_u16(const char * t, uint16_t msg)
+{
+  void * ptr = (void *)(&msg);
+  frame(t,TM_uint16,ptr,2);
+}
+
+void publish_u32(const char * t, uint32_t msg)
+{
+  void * ptr = (void *)(&msg);
+  frame(t,TM_uint32,ptr,4);
+}
+
+void publish_i8(const char * t, int8_t   msg)
+{
+  void * ptr = (void *)(&msg);
+  frame(t,TM_int8,ptr,1);
+}
+
+void publish_i16(const char * t, int16_t  msg)
+{
+  void * ptr = (void *)(&msg);
+  frame(t,TM_int16,ptr,2);
+}
+
+void publish_i32(const char * t, int32_t  msg)
+{
+  void * ptr = (void *)(&msg);
+  frame(t,TM_int32,ptr,4);
+}
+
+void publish_f32(const char * t, float    msg)
+{
+  void * ptr = (void *)(&msg);
+  frame(t,TM_float32,ptr,4);
+}
+
+void subscribe(void (*callback)(TM_state* s, TM_msg* m))
+{
+  userCallback = callback;
+}
+
+void update_telemetry(float elapsedTime)
+{
+  uint32_t amount = transportPtr->readable();
+  for(uint32_t i = 0 ; i < amount ; i++)
+  {
+    uint8_t c;
+    transportPtr->read(&c,1);
+    feed(c);
+  }
+}
+
+uint16_t header(TM_type type)
+{
+  // header data
+  uint16_t h = type;
+  uint8_t * ptr = (uint8_t*)(&h);
+
+  // add data to frame
+  append2(h);
+
+  // compute crc and return it
+  return crc16(ptr, 2);
+}
+
+uint16_t topic(const char * t, uint16_t crc)
+{
+  const uint8_t * ptr = (uint8_t*)t;
+  for(uint32_t i = 0 ; i < strlen(t) ; i++)
+  {
+    // TODO : Replace with Huffman compression
+    append(ptr[i]);
+    crc = crc16_recursive(ptr[i], crc);
+  }
+  // Add NULL character
+  append(0);
+  return crc16_recursive(0,crc);
+}
+
+uint16_t payload(const void * p, uint32_t size, uint16_t crc)
+{
+  const uint8_t * ptr = (uint8_t*)p;
+  for(uint32_t i = 0 ; i < size ; i++)
+  {
+    append(ptr[i]);
+    crc = crc16_recursive(ptr[i], crc);
+  }
+  return crc;
+}
+
+void frame(const char * t, TM_type type, const void * data, uint32_t datasize)
+{
+  // start new frame
+  begin();
+
+  // header
+  uint16_t crc = header(type);
+
+  // topic
+  crc = topic(t, crc);
+
+  // payload
+  crc = payload(data, datasize, crc);
+
+  // crc
+  append2(crc);
+
+  // complete frame
+  uint32_t bytesAmount = end();
+
+  // send data
+  send(outgoingBuffer, bytesAmount);
+}
+
+void send(void * buf, uint32_t size)
+{
+  if(transportPtr->writeable() && size > 0)
+  {
+    transportPtr->write(outgoingBuffer, size);
+  }
+}
+
+void on_incoming_frame(uint8_t * storage, uint32_t size)
+{
+  if(size < 2)
+    return;
+  // Read header
+  uint16_t head;
+  uint8_t * ptr;
+  ptr = (uint8_t*)(&head);
+  memcpy(ptr,storage,2);
+
+  // Read topic
+  uint32_t cursor = 2;
+  uint32_t topicSize = 0;
+  while(cursor < size)
+  {
+    if(storage[cursor] == 0)
+      break;
+    topicSize++;
+    cursor++;
+  }
+
+  if(topicSize == 0)
+    return;
+
+  // payload = total - header - topic - /0 - crc
+  int32_t payloadSize = size - 2 - topicSize - 1 - 2;
+
+  if(payloadSize <= 0)
+    return;
+
+  // Check crc
+  uint16_t expected_crc = crc16(storage, size-2);
+  uint16_t rcv_crc;
+  ptr = (uint8_t*)(&rcv_crc);
+  memcpy(ptr,storage+size-2,2);
+
+  if(expected_crc != rcv_crc)
+    return;
+
+  // Store topic
+  char * t = (char*)(storage);
+  strcpy(topicBuffer, t + 2);
+
+  // ptr to beginning of payload
+  ptr = (uint8_t*)(storage) + (uint32_t)(2 + topicSize + 1);
+
+  TM_msg packet;
+  packet.topic = topicBuffer;
+  packet.type = (TM_type)head;
+  packet.buffer = (void *)(ptr);
+  packet.size = (uint32_t)payloadSize;
+
+  // Call callback
+  userCallback(statePtr,&packet);
+}
+
+void on_incoming_error(int32_t errCode)
+{
+  // TODO : Error management
+}
+
+void emptyCallback(TM_state * s, TM_msg * m)
+{
+  // Called only if the user forgot to subscribe a callback
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/telemetry.hpp	Wed Jan 27 17:39:36 2016 +0000
@@ -0,0 +1,72 @@
+#ifndef TELEMETRY_H_
+#define TELEMETRY_H_
+
+#include "stddef.h"
+#include "stdint.h"
+
+#define INCOMING_BUFFER_SIZE 128
+#define OUTGOING_BUFFER_SIZE 128
+#define TOPIC_BUFFER_SIZE 64
+
+// Forward declaration of user state
+typedef struct TM_state TM_state;
+
+// Enumeration of supported message payloads
+enum TM_type {
+  TM_float32 = 0,
+  TM_uint8 = 1,
+  TM_uint16 = 2,
+  TM_uint32 = 3,
+  TM_int8 = 4,
+  TM_int16 = 5,
+  TM_int32 = 6,
+  TM_string = 7
+};
+typedef enum TM_type TM_type;
+
+// Data structure for received messages
+typedef struct TM_msg TM_msg;
+struct TM_msg {
+  TM_type type;
+  char * topic;
+  void * buffer;
+  uint32_t size;
+};
+
+// Data structure for holding transport interface
+typedef struct TM_transport TM_transport;
+struct TM_transport {
+  int32_t (*read)(void * buf, uint32_t sizeToRead);
+  int32_t (*readable)();
+  int32_t (*write)(void * buf, uint32_t sizeToWrite);
+  int32_t (*writeable)();
+};
+
+void init_telemetry(TM_state * s, TM_transport * t);
+
+// Decodes TM_msg buffer and emplaces its value into dst
+// Returns 0 if decoding was successful
+uint32_t emplace(TM_msg * m, char * buf, size_t bufSize);
+uint32_t emplace_u8(TM_msg * m, uint8_t * dst);
+uint32_t emplace_u16(TM_msg * m, uint16_t * dst);
+uint32_t emplace_u32(TM_msg * m, uint32_t * dst);
+uint32_t emplace_i8(TM_msg * m, int8_t * dst);
+uint32_t emplace_i16(TM_msg * m, int16_t * dst);
+uint32_t emplace_i32(TM_msg * m, int32_t * dst);
+uint32_t emplace_f32(TM_msg * m, float * dst);
+
+void publish(const char * topic, char * msg);
+void publish_u8(const char * topic, uint8_t msg);
+void publish_u16(const char * topic, uint16_t msg);
+void publish_u32(const char * topic, uint32_t msg);
+void publish_i8(const char * topic, int8_t msg);
+void publish_i16(const char * topic, int16_t msg);
+void publish_i32(const char * topic, int32_t msg);
+void publish_f32(const char * topic, float msg);
+
+void subscribe(void (*callback)(TM_state * s, TM_msg * m));
+
+void update_telemetry(float elapsedTime);
+
+
+#endif