Trond Enger / d7a_1x

Fork of d7a_1x by WizziLab

src/d7a_com.cpp

Committer:
Jeej
Date:
2016-08-22
Revision:
34:1311cc53201a
Parent:
33:f9a542d3efaa
Child:
35:1fe2975c5a63

File content as of revision 34:1311cc53201a:

#include "mbed.h"
#include "rtos.h"
#include "dbg.h"
#include "d7a_com.h"
#include "d7a_common.h"
#include "d7a_fs.h"
#include "d7a_modem.h"
#include "d7a_sys.h"


// Thread signal flag: set on the TX thread by d7a_com_new_pkt() when an XON
// packet is received, and waited on by d7a_com_tx_thread() after an XACK was sent.
#define XON_SIGNAL      0x0001

// One frame waiting in the TX queue.
typedef struct {
    // Number of bytes to send from buf
    uint32_t len;
    // Frame bytes; released with FREE() by the code that dequeues the entry
    uint8_t* buf;
} d7a_com_tx_buf_t;


// State of the COM port: serial objects, circular RX buffer bookkeeping,
// sequence numbers, and the RX/TX threads with their synchronization objects.
typedef struct {
    // True once the port is started: rx_isr() only keeps received bytes and
    // the send functions only accept traffic while this is set
    bool                    started;
    
    // RTS output pin, driven high for the duration of a transmission
    DigitalOut*             rts;
    // CTS input pin, its rising edge is routed to cts_isr()
    InterruptIn*            cts;
        
    // RX buffer address
    uint8_t*                rx_buffer;
    // RX buffer size
    uint16_t                rx_buffer_size;
    // Write index in RX buffer
    volatile uint16_t       write_idx;
    // Read index in RX buffer
    uint16_t                read_idx;
    // Number of bytes available in RX buffer
    volatile uint16_t       data_available;
    // Last Number of available data
    volatile uint16_t       last_available;
    // Number of skipped bytes while parsing
    uint16_t                skipped_bytes;
    
    // Port TX sequence number
    uint8_t                 tx_seq;
    // Port RX sequence number
    uint8_t                 rx_seq;
    // Response to command maximum time
    uint32_t                timeout;
    
    // Waiting for data available in RX buffer
    Semaphore*              data_parsing;
    // CTS management
    Semaphore*              cts_int;
    
    // Data treatment threads
    Thread*                 rx_thread;
    // Serial port connected to the modem
    Serial*                 serial;
    
    // Tx Thread
    Thread*                 tx_thread;
    // Frames posted for transmission, consumed by the TX thread (32 entries max)
    Queue<d7a_com_tx_buf_t, 32> tx_queue;
    
    // Timer started when a PING frame is sent and read when the PONG is received
    Timer                   tim;

} d7a_com_ctx_t;


// Single COM context of this module, shared by the ISRs, the RX/TX threads
// and the API functions of this file.
static d7a_com_ctx_t g_com_ctx;

/**
    Thread for parsing packets from RX buffer.

    @param p                    Thread argument (not used by the implementation in this file)
    @return void
*/
void d7a_com_rx_thread( void const *p );

/**
    Thread sending the frames posted to the TX queue.

    @param p                    Thread argument (not used by the implementation in this file)
    @return void
*/
void d7a_com_tx_thread( void const *p );


/**
    Serial RX interrupt handler.
    Empties the UART and appends the received bytes to the circular RX buffer,
    then wakes the parsing thread. While the port is not started, bytes are
    still read from the UART but the buffer indexes are not advanced, so they
    are overwritten by the next byte.

    @param void
    @return void
*/
void rx_isr()
{
    // More than one character may be waiting in the UART's receive FIFO
    while (g_com_ctx.serial->readable())
    {
        // Always pull the character out of the UART, stored at the current write position
        g_com_ctx.rx_buffer[g_com_ctx.write_idx] = g_com_ctx.serial->getc();

        // Only account for the byte once the port is started
        if (!g_com_ctx.started)
        {
            continue;
        }

        g_com_ctx.data_available++;

        // Advance the write index, wrapping at the end of the circular buffer
        if (g_com_ctx.write_idx >= (g_com_ctx.rx_buffer_size-1))
        {
            g_com_ctx.write_idx = 0;
        }
        else
        {
            g_com_ctx.write_idx = g_com_ctx.write_idx+1;
        }
    }

    // More pending bytes than the buffer can hold means unread data was overwritten
    ASSERT(g_com_ctx.data_available <= g_com_ctx.rx_buffer_size, "RX Buffer overflow! (%d/%d)\r\n", g_com_ctx.data_available, g_com_ctx.rx_buffer_size);

    // Wake up the data parsing thread
    g_com_ctx.data_parsing->release();
}

/**
    CTS pin Interrupt Service Routine.
    Placeholder for flow control, which is not implemented yet: the handler
    currently does nothing.

    @param void
    @return void
*/
void cts_isr()
{
    // Disabled hook: would signal the CTS event through the cts_int semaphore
    //PRINT("CTS_INT\r\n");
    //g_com_ctx.cts_int->release();
}

// D7a_com constructor.
// Opens a serial port and monitors the input for ALP packets.
// Allocates the RX buffer, creates the serial/pin/semaphore objects, hooks
// the RX and CTS interrupts and creates the TX and RX threads.
// The port is left in the "not started" state: received bytes are not kept
// until d7a_com_start() is called.
// Pins are those of the host, not the modem:
// TX-host -> RX-modem
// RX-host <- TX-modem
// RTS-host -> CTS-modem
// CTS-host <- RTS-modem
// NOTE(review): the results of MALLOC and new are not checked here - confirm
// that these report allocation failures themselves.
void d7a_com_open( const d7a_com_config_t* config )
{
    FPRINT("\r\n");
    
    // Reset the circular buffer bookkeeping and the sequence numbers
    g_com_ctx.started = false;
    g_com_ctx.rx_buffer_size = config->rx_buffer_size;
    g_com_ctx.rx_buffer = (uint8_t*)MALLOC(g_com_ctx.rx_buffer_size);
    g_com_ctx.data_available = 0;
    g_com_ctx.last_available = 0;
    g_com_ctx.skipped_bytes = 0;
    g_com_ctx.write_idx = 0;
    g_com_ctx.read_idx = 0;
    g_com_ctx.tx_seq = 0;
    g_com_ctx.rx_seq = 0;
    
    // Both semaphores start with no token available
    g_com_ctx.serial =          new Serial(config->tx, config->rx);
    g_com_ctx.rts =             new DigitalOut(config->rts);
    g_com_ctx.cts =             new InterruptIn(config->cts);
    g_com_ctx.data_parsing =    new Semaphore(0);
    g_com_ctx.cts_int =         new Semaphore(0);
    
    // Call cts_isr() on every rising edge of the CTS pin
    g_com_ctx.cts->rise(&cts_isr);

    /* XXX: Unknown bug:
    Baud rate can be found with high error rate (> 4%).
    It is here manualy corrected after an oscilloscope measure.
    Normal Baudrate should be 115200.
    */
    //g_com_ctx.serial->baud(117000); // XXX
    g_com_ctx.serial->baud(115200); // XXX
    //g_com_ctx.serial->baud(57600); // XXX
    // 8 data bits, no parity, 1 stop bit
    g_com_ctx.serial->format(8, SerialBase::None, 1);
    // rx_isr() uses the data_parsing semaphore, which already exists at this point
    g_com_ctx.serial->attach(&rx_isr, Serial::RxIrq);
    
    // Both worker threads run at high priority with the default stack size
    g_com_ctx.tx_thread =       new Thread(d7a_com_tx_thread, NULL, osPriorityHigh, DEFAULT_STACK_SIZE);
    g_com_ctx.rx_thread =       new Thread(d7a_com_rx_thread, NULL, osPriorityHigh, DEFAULT_STACK_SIZE);
}

// Destructor
void d7a_com_close()
{
    FPRINT("\r\n");
    g_com_ctx.rx_thread->terminate();
    g_com_ctx.tx_thread->terminate();
    delete g_com_ctx.data_parsing;
    delete g_com_ctx.cts_int;
    delete g_com_ctx.serial;
    delete g_com_ctx.rts;
    delete g_com_ctx.cts;
    FREE(g_com_ctx.rx_buffer);
}

// Starts the COM port: from now on rx_isr() keeps the received bytes in the
// RX buffer and the send functions (which assert on this flag) can be used.
void d7a_com_start(void)
{
    FPRINT("\r\n");
    g_com_ctx.started = true;
}

void d7a_com_restart(void)
{
    FPRINT("\r\n");
    g_com_ctx.started = false;
    g_com_ctx.data_available = 0;
    g_com_ctx.last_available = 0;
    g_com_ctx.skipped_bytes = 0;
    g_com_ctx.write_idx = 0;
    g_com_ctx.read_idx = 0;
    g_com_ctx.tx_seq = 0;
    g_com_ctx.rx_seq = 0;
    
    d7a_com_tx_buf_t* msg;
    d7a_com_rx_msg_t* pkt;
    osEvent evt;
    
    // Flush TX queue
    do
    {
        // wait for data
        evt = g_com_ctx.tx_queue.get(1);
        msg = (evt.status == osEventMessage)? (d7a_com_tx_buf_t*)evt.value.p : NULL;
        
        // free message
        if (msg != NULL)
        {
            FREE(msg->buf);
            FREE(msg);
        }
    } while (msg != NULL);
    
    // Flush RX
    for (int i = 0 ; i < 3 ; i++)
    {
        do
        {
            // wait for data
            if (i == 0)
            {
                pkt = d7a_modem_wait_pkt(1);
            }
            else if (i == 1)
            {
                pkt = d7a_fs_wait_pkt(1);
            }
            else
            {
                pkt = d7a_sys_wait_pkt(1);
            }
            
            // free message
            if (pkt != NULL)
            {
                FREE(pkt);
            }
        } while (pkt != NULL);
    }
    
    g_com_ctx.started = true;
}

/**
    Copies a run of bytes out of the circular RX buffer into a linear buffer,
    handling the wrap at the end of the circular buffer.

    @param uint8_t*             Destination buffer (at least 'length' bytes)
    @param uint16_t             Start offset in the RX buffer (reduced modulo the buffer size)
    @param uint16_t             Number of bytes to copy (asserted not to exceed the buffer size)
    @return void
*/
static void d7a_com_copy_to_linear(uint8_t* linear_buffer, uint16_t read_start, uint16_t length)
{
    ASSERT(length <= g_com_ctx.rx_buffer_size, "Length too long (%d) for buffer size (%d)\r\n", length, g_com_ctx.rx_buffer_size);

    read_start %= g_com_ctx.rx_buffer_size;

    // Bytes that can be read before reaching the physical end of the buffer
    uint16_t until_end = g_com_ctx.rx_buffer_size - read_start;
    uint16_t head_len = (length > until_end)? until_end : length;

    memcpy((void*)linear_buffer, (const void*)(g_com_ctx.rx_buffer + read_start), head_len);

    // The requested run wraps: the remainder starts at the beginning of the buffer
    if (head_len < length)
    {
        uint16_t tail_len = length - head_len;
        memcpy((void*)(linear_buffer + head_len), (const void*)g_com_ctx.rx_buffer, tail_len);
    }
}

/**
    Wakes-up modem and send data through Serial.
    RTS is driven high, the function waits 5 ms, writes the bytes one by one,
    then drives RTS low again. Asserts that the port has been started.
    A zero or negative length sends nothing.

    @param const uint8_t*       Pointer to data buffer
    @param int                  Data length
    @return void
*/
static void d7a_com_send(const uint8_t* buffer, int length)
{    
    FPRINT("(len:%d)\r\n", length);
    ASSERT(g_com_ctx.started, "COM not started!\r\n");
    
    // Signal the modem through RTS and wait before sending
    // (presumably the modem wake-up time - TODO confirm)
    *(g_com_ctx.rts) = 1;
    
    Thread::wait(5);
    
    // Signed index: comparing an unsigned index with a negative 'length'
    // would turn the bound into a huge value and read far past the buffer
    for (int i = 0 ; i < length ; i++)
    {
        g_com_ctx.serial->putc(buffer[i]);
    }
   
    *(g_com_ctx.rts) = 0;
}


static void d7a_com_post_tx(uint8_t* buffer, uint32_t length)
{
    FPRINT("(len:%d)\r\n", length);
    ASSERT(g_com_ctx.started, "COM not started!\r\n");
    
    d7a_com_tx_buf_t* tx_buf = (d7a_com_tx_buf_t*)MALLOC(sizeof(d7a_com_tx_buf_t));
    
    tx_buf->len = length;
    tx_buf->buf = buffer;    
    
    g_com_ctx.tx_queue.put(tx_buf);
}

// Formats and send packet through Serial.
// Builds the frame (2 sync bytes, length, TX sequence number, flow id,
// followed by the payload 'pbuf' then the parameters 'abuf') and sends it
// immediately from the caller's context, bypassing the TX queue.
// The length byte of the header is the 8 low bits of alen + plen.
void d7a_com_send_msg(d7a_com_tx_msg_t* msg)
{    
    uint8_t* buf;
    uint16_t len = msg->alen + msg->plen;
    FPRINT("(len:%d)\r\n", len);
    
    buf = (uint8_t*)MALLOC(KAL_COM_HEADER_LEN + len);
    
    // construct serial header
    // concatenate and update tx_seq ID
    buf[0] = (uint8_t)KAL_COM_SYNC_BYTE_0;
    buf[1] = (uint8_t)KAL_COM_SYNC_BYTE_1;
    buf[2] = (uint8_t)len;
    buf[3] = (uint8_t)g_com_ctx.tx_seq++;
    buf[4] = (uint8_t)msg->id;
    len += KAL_COM_HEADER_LEN;

    // copy payload and parameters
    memcpy(buf + KAL_COM_HEADER_LEN, msg->pbuf, msg->plen);
    memcpy(buf + KAL_COM_HEADER_LEN + msg->plen, msg->abuf, msg->alen);
    
    DPRINT("<-- (0x%02X) %d\r\n", buf[4], (len - KAL_COM_HEADER_LEN));
    
    d7a_com_send(buf, len);
    
    // The send is synchronous and nobody else owns the frame: release it
    // (it was previously leaked on every call)
    FREE(buf);
}

// Formats a packet and queues it for transmission by the TX thread.
// The frame is made of the serial header (2 sync bytes, length, TX sequence
// number, flow id) followed by the payload 'pbuf' then the parameters 'abuf'.
// The length byte of the header is the 8 low bits of alen + plen.
// Ownership of the allocated frame is handed over to d7a_com_post_tx().
void d7a_com_post_msg(d7a_com_tx_msg_t* msg)
{
    uint16_t body_len = msg->alen + msg->plen;
    FPRINT("(len:%d)\r\n", body_len);

    uint8_t* frame = (uint8_t*)MALLOC(KAL_COM_HEADER_LEN + body_len);
    uint8_t* body = frame + KAL_COM_HEADER_LEN;

    // Serial header; the TX sequence number is consumed here
    frame[0] = (uint8_t)KAL_COM_SYNC_BYTE_0;
    frame[1] = (uint8_t)KAL_COM_SYNC_BYTE_1;
    frame[2] = (uint8_t)body_len;
    frame[3] = (uint8_t)g_com_ctx.tx_seq++;
    frame[4] = (uint8_t)msg->id;

    // Payload first, parameters right after it
    memcpy(body, msg->pbuf, msg->plen);
    memcpy(body + msg->plen, msg->abuf, msg->alen);

    // Total frame length, kept on 16 bits like the body length
    uint16_t frame_len = body_len;
    frame_len += KAL_COM_HEADER_LEN;

    d7a_com_post_tx(frame, frame_len);
}


// Queues 'len' bytes of 'buf' for transmission on the given flow, through
// d7a_com_post_msg() (the frame is sent later by the TX thread).
// The message carries a payload only, no parameters.
void d7a_com_dump(uint8_t* buf, uint8_t len, d7a_com_flow_t flow)
{
    d7a_com_tx_msg_t msg;

    msg.id = flow;
    msg.pbuf = buf;
    msg.plen = len;
    // No parameters: the pointer is still read by the frame builder, so do
    // not leave it uninitialized
    msg.abuf = NULL;
    msg.alen = 0;
    d7a_com_post_msg(&msg);
}

// Sends 'len' bytes of 'buf' on the given flow immediately, through
// d7a_com_send_msg() (caller's context, TX queue bypassed).
// The message carries a payload only, no parameters.
void d7a_com_force_dump(uint8_t* buf, uint8_t len, d7a_com_flow_t flow)
{
    d7a_com_tx_msg_t msg;

    msg.id = flow;
    msg.pbuf = buf;
    msg.plen = len;
    // No parameters: the pointer is still read by the frame builder, so do
    // not leave it uninitialized
    msg.abuf = NULL;
    msg.alen = 0;
    d7a_com_send_msg(&msg);
}

// Dispatches a received packet according to the flow of its id.
// FS and CMD packets are handed over to the fs and modem modules. SYS packets
// are handed over to the sys module, except XON (signals the TX thread) and
// XOFF (calls d7a_sys_xack()), which are handled and released here. TRC
// packets and packets of unknown flows are released.
static void d7a_com_new_pkt(d7a_com_rx_msg_t* pkt)
{
    // Trace packets are not logged
    if (KAL_COM_FLOWID_TRC != KAL_COM_FLOWID(pkt->id))
    {
        DPRINT("--> (0x%02X) %d\r\n", pkt->id, pkt->blen);
    }

    // Flows with a single, direct treatment
    switch (KAL_COM_FLOWID(pkt->id))
    {
        case KAL_COM_FLOWID_FS:
            d7a_fs_new_pkt(pkt);
            return;
        case KAL_COM_FLOWID_CMD:
            d7a_modem_new_pkt(pkt);
            return;
        case KAL_COM_FLOWID_TRC:
            FREE(pkt);
            return;
        case KAL_COM_FLOWID_SYS:
            // Treated below
            break;
        default:
            EPRINT("Untreated pkt type 0x%02X\r\n", pkt->id);
            FREE(pkt);
            return;
    }

    // SYS flow: flow control is handled here to avoid going to another process
    if (KAL_COM_FLOW_SYS_XON == pkt->id)
    {
        g_com_ctx.tx_thread->signal_set(XON_SIGNAL);
        FREE(pkt);
        return;
    }

    if (KAL_COM_FLOW_SYS_XOFF == pkt->id)
    {
        d7a_sys_xack();
        FREE(pkt);
        return;
    }

    // A PONG stops the timer started when the PING was sent
    if (KAL_COM_FLOW_SYS_PONG == pkt->id)
    {
        g_com_ctx.tim.stop();
        uint32_t elapsed_us = g_com_ctx.tim.read_us();
        IPRINT("Ping in %d.%03dms\r\n", elapsed_us/1000, elapsed_us%1000);
    }

    d7a_sys_new_pkt(pkt);
}


/**
    Reads the Rx buffer, parses the packets

    @param void
    @return void
*/
static void parse_packet(void)
{
    uint16_t base = 0;
    uint16_t current = 0;
    uint8_t byte;
    
    // Packet fields
    uint8_t seqnum = 0;
    uint8_t len = 0;
    uint8_t id = 0;
    
    // Stats
    uint16_t nb_pkt = 0;
    
    while (g_com_ctx.data_available >= (base + KAL_COM_HEADER_LEN + 1))
    {
        // Pop data
        byte = g_com_ctx.rx_buffer[(g_com_ctx.read_idx + (base + current)) % g_com_ctx.rx_buffer_size];
        DPRINT("Treating base %d byte %d of %d: 0x%02X\r\n", base, current, g_com_ctx.data_available, byte);

        // search first sync byte
        if (0 == current)
        {
            WARNING(g_com_ctx.skipped_bytes != 1, "COM not sync. Searching for header.\r\n");
        
            if(KAL_COM_SYNC_BYTE_0 == byte)
            {
                current++;
            }
            else
            {
                // Not a packet. Restart at base + 1
                g_com_ctx.skipped_bytes++;
                base++;
            }
        }
        // look up second sync byte
        else if (1 == current)
        {
            if(KAL_COM_SYNC_BYTE_1 == byte)
            {
                current++;
            }
            else
            {
                // Not a packet. Restart at base + 1
                g_com_ctx.skipped_bytes++;
                base++;
                current = 0;
            }
        }
        // look up packet size
        else if (2 == current)
        {
            len = byte;
            // if not enough data is available
            if (g_com_ctx.data_available < (KAL_COM_HEADER_LEN + len + base))
            {
                // exit parser
                DPRINT("len exit\r\n");
                break;
            }
            
            current++;
        }
        // get seqnum
        else if (3 == current)
        {
            seqnum = byte;
            current++;
        }
        // get id
        else if (4 == current)
        {
            id = byte;
            current++;
        }
        // we got a full packet
        else if (5 == current)
        {
            if (g_com_ctx.skipped_bytes)
            {
                WARNING(false, "COM header found, skipped %d bytes.\r\n", g_com_ctx.skipped_bytes);
#if 1
                uint8_t* temp = (uint8_t*)MALLOC(g_com_ctx.skipped_bytes);
                d7a_com_copy_to_linear(temp, g_com_ctx.read_idx + (base - g_com_ctx.skipped_bytes), g_com_ctx.skipped_bytes);
                PRINT_DATA("", "%02X ", temp, g_com_ctx.skipped_bytes, "\r\n");
                FREE(temp);
#endif
                g_com_ctx.skipped_bytes = 0;
            }
            
            // Update seqnum
            WARNING(g_com_ctx.rx_seq == seqnum, "COM Bad seqnum expected:%d got:%d (base:%d curr:%d avail:%d read:%d)\r\n",
                g_com_ctx.rx_seq, seqnum, base, current, g_com_ctx.data_available, g_com_ctx.read_idx);
                
            g_com_ctx.rx_seq = seqnum + 1;
            
            nb_pkt++;
            
            //DPRINT("Got packet id 0x%02X len %d\r\n", id, len);
            
            if (KAL_COM_FLOWID(id) != KAL_COM_FLOWID_TRC)
            {
                d7a_com_rx_msg_t* pkt = NULL;
                pkt = (d7a_com_rx_msg_t*)MALLOC(sizeof(d7a_com_rx_msg_t) - 1 + len);
    
                // copy data to buffer
                pkt->blen = len;
                pkt->id = id;
                d7a_com_copy_to_linear(pkt->buffer, g_com_ctx.read_idx + (base + current), len);
                
                // add packet to queue
                d7a_com_new_pkt(pkt);
            }
            else
            {
                // Ignore packet
            }
            
            // Search next packet
            base += current + len;
            current = 0;
        }
        else
        {
            ASSERT(false, "D7A COM parser!\r\n");
        }
    }
    
    // update buffer indexes
    g_com_ctx.read_idx = (g_com_ctx.read_idx + base) % g_com_ctx.rx_buffer_size;
    g_com_ctx.data_available -= base;
    
    if (base) {
        DPRINT("Parsed %d bytes of %d (%d packets)\r\n", base, g_com_ctx.data_available + base, nb_pkt);
    }
}


// Thread for parsing packets from RX buffer.
// Sleeps on the data_parsing semaphore (released by rx_isr()) and runs the
// parser when the amount of pending data differs from the one recorded at the
// previous parsing and exceeds a packet header.
// NOTE(review): last_available is recorded before parsing, so a wake-up where
// the pending amount happens to equal that recorded value is skipped -
// confirm this is intended.
void d7a_com_rx_thread( void const *p )
{
    FPRINT("\r\n");
    for (;;)
    {
        // wait for data available
        g_com_ctx.data_parsing->wait();

        // Nothing changed since the last parsing
        if (g_com_ctx.last_available == g_com_ctx.data_available)
        {
            continue;
        }

        // Not enough data for a header followed by at least one byte
        if (g_com_ctx.data_available < (KAL_COM_HEADER_LEN + 1))
        {
            continue;
        }

        // search for packets
        g_com_ctx.last_available = g_com_ctx.data_available;
        parse_packet();
    }
}

void d7a_com_tx_thread( void const *p )
{
    FPRINT("\r\n");
    
    d7a_com_tx_buf_t* msg;
    osEvent evt;
    uint8_t flow_id;
    
    while (true)
    {
        // wait for data to send
        evt = g_com_ctx.tx_queue.get();
        msg = (evt.status == osEventMessage)? (d7a_com_tx_buf_t*)evt.value.p : NULL;
        
        // send message
        if (msg != NULL)
        {
            flow_id = msg->buf[4];
            DPRINT("<-- (0x%02X) %d\r\n", flow_id, (msg->len - KAL_COM_HEADER_LEN));

            d7a_com_send(msg->buf, msg->len);
            FREE(msg->buf);
            FREE(msg);
                        
            if (KAL_COM_FLOW_SYS_PING == flow_id)
            {
                g_com_ctx.tim.reset();
                g_com_ctx.tim.start();
            }
            
            if (KAL_COM_FLOW_SYS_XACK == flow_id)
            {
                DPRINT("XOFF\r\n");
                g_com_ctx.tx_thread->signal_wait(XON_SIGNAL);
                DPRINT("XON\r\n");
            }
        }
    }
}