Trond Enger / d7a_1x

Fork of d7a_1x by WizziLab

src/d7a_com.cpp

Committer:
Jeej
Date:
2016-05-25
Revision:
30:d775c1409849
Parent:
28:0376b97b4b55
Child:
31:ab9bfdbc6b44

File content as of revision 30:d775c1409849:

#include "mbed.h"
#include "rtos.h"
#include "dbg.h"
#include "d7a_com.h"
#include "d7a_common.h"
#include "d7a_fs.h"
#include "d7a_modem.h"
#include "d7a_sys.h"

// Initial token count of the XON/XOFF semaphore created in d7a_com_open();
// all tokens are consumed there so that senders block until an XON arrives.
#define MAX_WAITING_THREADS     16
// true while transmission is allowed: set by d7a_com_open() and on reception
// of an XON packet, cleared on reception of an XOFF packet.
static volatile bool g_xon;
// Number of threads that found g_xon false in d7a_com_send() and are waiting
// on the xonoff semaphore; reset to 0 once they are released on XON.
static volatile uint8_t g_waiting_nb;


// Runtime context of the serial COM port (single instance: g_com_ctx).
typedef struct {
    // RTS output, driven high for the duration of a transmission (d7a_com_send)
    DigitalOut*             rts;
    // CTS input, its rising edge is routed to cts_isr
    InterruptIn*            cts;
        
    // RX buffer address (ring buffer filled by rx_isr)
    uint8_t*                rx_buffer;
    // RX buffer size, in bytes
    uint16_t                rx_buffer_size;
    // Write index in RX buffer (advanced by rx_isr)
    volatile uint16_t       write_idx;
    // Read index in RX buffer (advanced by parse_packet)
    uint16_t                read_idx;
    // Number of bytes available in RX buffer
    // (incremented by rx_isr, decremented by parse_packet)
    volatile uint16_t       data_available;
    // Port TX sequence number (incremented for every packet sent)
    uint8_t                 tx_seq;
    // Port RX sequence number (next sequence number expected from the peer)
    uint8_t                 rx_seq;
    // Response to command maximum time
    uint32_t                timeout;
    
    // Packet reception queue
    Queue<d7a_com_rx_msg_t, 32> pkt_queue;
    
    // Waiting for data available in RX buffer
    // (released by rx_isr, awaited by d7a_com_thread)
    Semaphore*              data_parsing;
    // CTS management (released by cts_isr)
    Semaphore*              cts_int;
    // XON/XOFF management (senders wait on it while g_xon is false)
    Semaphore*              xonoff;
    
    // Data treatment thread (runs d7a_com_thread)
    Thread*                 thread;
    // Serial port connected to the modem
    Serial*                 serial;
    
    // Statistics: packets / bytes received and sent (bytes include the header)
    uint32_t                rx_pkt_count;
    uint32_t                rx_byte_count;
    uint32_t                tx_pkt_count;
    uint32_t                tx_byte_count;
} d7a_com_ctx_t;

// The one and only COM port context used by every function of this file.
static d7a_com_ctx_t g_com_ctx;

/**
    Thread for parsing packets from RX buffer.

    @param p        Thread argument (not used by the implementation)
    @return void    (the thread body loops forever)
*/
void d7a_com_thread( void const *p );


/**
    Serial Rx Interrupt Service Routine.
    Adds received bytes to the RX ring buffer and wakes the parsing thread.

    @param void
    @return void
*/
void rx_isr()
{
    // Drain the UART: more than one character may be pending in its FIFO.
    for (;;)
    {
        if (!g_com_ctx.serial->readable())
        {
            // Nothing left to read
            break;
        }

        // One slot is always kept free so that a full buffer can be
        // told apart from an empty one.
        const uint16_t next_idx = (g_com_ctx.write_idx + 1) % g_com_ctx.rx_buffer_size;
        if (next_idx == g_com_ctx.read_idx)
        {
            // Buffer full: stop without reading the pending byte
            break;
        }

        g_com_ctx.rx_buffer[g_com_ctx.write_idx] = g_com_ctx.serial->getc();
        g_com_ctx.data_available++;
        //DPRINT(".");

        g_com_ctx.write_idx = next_idx;
    }

    // Unlock the data parsing thread, but only once more than a bare
    // header is buffered (a smaller amount cannot hold a packet).
    if (g_com_ctx.data_available > KAL_COM_HEADER_LEN)
    {
        g_com_ctx.data_parsing->release();
    }
}

/**
    CTS pin Interrupt Service Routine.
    Signals the CTS semaphore (flow control is not yet implemented).

    @param void
    @return void
*/
void cts_isr()
{
    // Only record the event; nothing in this file waits on it yet.
    g_com_ctx.cts_int->release();
}

// D7a_com constructor.
// Opens a serial port and monitors the input for ALP packets.
// Pins are those of the host, not the modem:
// TX-host -> RX-modem
// RX-host <- TX-modem
// RTS-host -> CTS-modem
// CTS-host <- RTS-modem
//
// @param config    Port configuration: RX buffer size and the tx/rx/rts/cts pins.
// @return void
void d7a_com_open( const d7a_com_config_t* config )
{
    FPRINT("\r\n");
    
    // RX ring buffer and its bookkeeping
    g_com_ctx.rx_buffer_size = config->rx_buffer_size;
    g_com_ctx.rx_buffer = (uint8_t*)MALLOC(g_com_ctx.rx_buffer_size);
    g_com_ctx.data_available = 0;
    g_com_ctx.write_idx = 0;
    g_com_ctx.read_idx = 0;
    g_com_ctx.tx_seq = 0;
    g_com_ctx.rx_seq = 0;
    
    g_com_ctx.serial =          new Serial(config->tx, config->rx);
    g_com_ctx.rts =             new DigitalOut(config->rts);
    g_com_ctx.cts =             new InterruptIn(config->cts);
    g_com_ctx.data_parsing =    new Semaphore(1);
    g_com_ctx.cts_int =         new Semaphore(1);
    g_com_ctx.xonoff =          new Semaphore(MAX_WAITING_THREADS);
    g_com_ctx.thread =          new Thread(d7a_com_thread, NULL, osPriorityBelowNormal, DEFAULT_STACK_SIZE*2);
    
    // Hook the CTS interrupt only now: the handler must not be registered
    // through g_com_ctx.cts before that object has been created, and
    // cts_isr() releases g_com_ctx.cts_int which must exist as well.
    g_com_ctx.cts->rise(&cts_isr);
        
    /* XXX: Unknown bug:
    Baud rate can be found with high error rate (> 4%).
    It is here manually corrected after an oscilloscope measure.
    Normal Baudrate should be 115200.
    */
    g_com_ctx.serial->baud(117000); // XXX
    //g_com_ctx.serial->baud(58500); // XXX
    g_com_ctx.serial->format(8, SerialBase::None, 1);
    g_com_ctx.serial->attach(&rx_isr, Serial::RxIrq);
    
    // Consume every token of the XON/XOFF semaphore so that a sender
    // calling wait() while the port is XOFF really blocks.
    for(uint8_t i=0 ; i<MAX_WAITING_THREADS ; i++)
    {
        g_com_ctx.xonoff->wait();
    }
    g_xon = true;
    g_waiting_nb = 0;
}

// Destructor.
// Stops the parsing thread and releases every resource allocated by
// d7a_com_open().
//
// @param void
// @return void
void d7a_com_close()
{
    FPRINT("\r\n");
    
    // Stop the parsing thread first so that it no longer touches the
    // context, then release the Thread object itself (it was leaked before).
    g_com_ctx.thread->terminate();
    delete g_com_ctx.thread;
    
    // Release the serial port and the CTS input before the semaphores that
    // rx_isr()/cts_isr() release.
    // NOTE(review): this assumes that deleting these objects stops their
    // interrupts from being delivered - confirm against the mbed version used.
    delete g_com_ctx.serial;
    delete g_com_ctx.cts;
    delete g_com_ctx.rts;
    
    delete g_com_ctx.data_parsing;
    delete g_com_ctx.cts_int;
    // The XON/XOFF semaphore was allocated in d7a_com_open() but never freed.
    delete g_com_ctx.xonoff;
    
    FREE(g_com_ctx.rx_buffer);
}


/**
    Wakes-up modem and send data throught Serial.

    @param const uint8_t*       Pointer to data buffer
    @param int                  Data length
    @return void
*/
static void d7a_com_send(const uint8_t* buffer, int length)
{    
    int i;
    
    if (!g_xon)
    {
        // Wait if COM port is OFF
        g_waiting_nb++;
        DPRINT("COM WAITING (%d threads)\r\n", g_waiting_nb);
        g_com_ctx.xonoff->wait();
    }
    
    *(g_com_ctx.rts) = 1;
    
    Thread::wait(5);
    
    for (i=0 ; i<length ; i++)
    {
        g_com_ctx.serial->putc(buffer[i]);
    }
   
    *(g_com_ctx.rts) = 0;
}

// Formats and sends a packet through Serial.
// The frame is: sync byte 0, sync byte 1, payload length, TX sequence
// number, flow id, then msg->pbuf (plen bytes) followed by msg->abuf
// (alen bytes).
//
// @param msg   Message to send (id, pbuf/plen, abuf/alen)
// @return void
void d7a_com_send_msg(d7a_com_tx_msg_t* msg)
{
    uint16_t len = msg->alen + msg->plen;
    FPRINT("(len:%d)\r\n", len);

    // Temporary frame: serial header followed by both data parts
    uint8_t* frame = (uint8_t*)MALLOC(KAL_COM_HEADER_LEN + len);
    uint8_t* body = frame + KAL_COM_HEADER_LEN;

    // Serial header; the TX sequence number is consumed here
    frame[0] = (uint8_t)KAL_COM_SYNC_BYTE_0;
    frame[1] = (uint8_t)KAL_COM_SYNC_BYTE_1;
    frame[2] = (uint8_t)len;
    frame[3] = (uint8_t)g_com_ctx.tx_seq++;
    frame[4] = (uint8_t)msg->id;

    // Parameters first, then the additional buffer
    memcpy(body, msg->pbuf, msg->plen);
    memcpy(body + msg->plen, msg->abuf, msg->alen);

    // From here on len is the total frame length
    len += KAL_COM_HEADER_LEN;
    d7a_com_send(frame, len);

    // Stats
    g_com_ctx.tx_byte_count += len;
    g_com_ctx.tx_pkt_count++;

    FREE(frame);

    DPRINT("<-- %d (0x%02X)\r\n", (len - KAL_COM_HEADER_LEN), msg->id);
}

void d7a_com_dump(uint8_t* buf, uint8_t len, d7a_com_flow_t flow)
{
    d7a_com_tx_msg_t msg;

    msg.id = flow;
    msg.pbuf = buf;
    msg.plen = len;
    msg.alen = 0;
    d7a_com_send_msg(&msg);
}



// Dispatches a freshly parsed packet according to its flow id.
// Packets handled locally (XON, XOFF, trace, unknown flow) are freed here;
// the others are handed over to d7a_fs_new_pkt / d7a_modem_new_pkt /
// d7a_sys_new_pkt.
// NOTE(review): those handlers are assumed to take ownership of the packet
// and free it - confirm in their modules.
//
// @param pkt   Packet allocated by parse_packet
// @return void
static void d7a_com_new_pkt(d7a_com_rx_msg_t* pkt)
{
    //FPRINT("\r\n");
    if (KAL_COM_FLOWID(pkt->id) != KAL_COM_FLOWID_TRC)
    {
        DPRINT("--> %d (0x%02X)\r\n", pkt->blen, pkt->id);
    }
    
    // Distribute packet types to processes
    switch (KAL_COM_FLOWID(pkt->id))
    {
        case KAL_COM_FLOWID_FS:
            d7a_fs_new_pkt(pkt);
            break;
        case KAL_COM_FLOWID_CMD:
            d7a_modem_new_pkt(pkt);
            break;
        case KAL_COM_FLOWID_SYS:
            // This has to be here to avoid going to another process
            if (pkt->id == KAL_COM_FLOW_SYS_XON)
            {
                DPRINT("XON (%d threads)\r\n", g_waiting_nb);
                g_xon = true;
                // Free all threads blocked in d7a_com_send
                for(uint8_t i=0 ; i<g_waiting_nb ; i++)
                {
                    g_com_ctx.xonoff->release();
                }
                g_waiting_nb = 0;
                FREE(pkt);
            }
            else if (pkt->id == KAL_COM_FLOW_SYS_XOFF)
            {
                // Acknowledge while the port is still ON: once g_xon is
                // false, d7a_com_send would block this very thread.
                uint8_t buf[] = "X";
                d7a_com_dump(buf, 1, KAL_COM_FLOW_SYS_XACK);
                g_xon = false;
                FREE(pkt);
                DPRINT("XOFF\r\n");
            }
            else
            {
                d7a_sys_new_pkt(pkt);
            }
            break;
        case KAL_COM_FLOWID_TRC:
            // Trace packets are not processed; release them, otherwise
            // every received trace packet is leaked.
            FREE(pkt);
            break;
        default:
            EPRINT("Untreated pkt type 0x%02X\r\n", pkt->id);
            FREE(pkt);
            break;
    }
}

/**
    Scans the RX ring buffer, starting at g_com_ctx.read_idx, for one packet.

    A packet is: sync byte 0, sync byte 1, payload length (non zero),
    sequence number, flow id, then the payload. When a complete packet is
    found it is copied into a newly allocated d7a_com_rx_msg_t and handed to
    d7a_com_new_pkt(). Bytes preceding the packet (or all scanned bytes when
    no packet is found) are reported and dropped. If a header is found but
    its payload is not fully received yet, the header is left in the buffer
    for a later call.

    @param start_idx        Not used: scanning always starts at g_com_ctx.read_idx
    @param data_available   Number of buffered bytes that may be examined
    @return true if a packet was extracted and dispatched, false otherwise
*/
static bool parse_packet(const uint16_t start_idx, const uint16_t data_available)
{
    uint16_t data_read = 0;
    uint8_t data = 0;
    uint8_t state = 0;
    uint8_t seqnum = 0;
    uint8_t len = 0;
    uint8_t id = 0;
    bool pkt_found = false;
    
    (void)start_idx;
    
    while (data_read < data_available)
    {
        // do not parse if minimum packet size is not available
        if (0 == state && (data_available - data_read) <= KAL_COM_HEADER_LEN)
        {
            break;
        }
        
        // Pop data
        data = g_com_ctx.rx_buffer[(g_com_ctx.read_idx + data_read) % g_com_ctx.rx_buffer_size];
        data_read++;
        //DPRINT("Read %d of %d bytes: 0x%02X\r\n", data_read, data_available, data);
        
        // search first sync byte
        if (0 == state && KAL_COM_SYNC_BYTE_0 == data)
        {
            state++;
        }
        // look up second sync byte
        else if (1 == state && KAL_COM_SYNC_BYTE_1 == data)
        {
            state++;
        }
        // look up packet size
        else if (2 == state && 0 != data)
        {
            len = data;
            // if not enough data is available
            if (data_available < data_read + 1 + 1 + len) // data read + seqnum + id + data length
            {
                // go back to beginning of packet (both sync bytes + length)
                // so that the header is kept for the next call
                data_read -= 3;
                break;
            }
            else
            {
                state++;
            }
        }
        // get seqnum
        else if (3 == state)
        {
            seqnum = data;
            if (g_com_ctx.rx_seq != seqnum)
            {
                WARNING(false, "COM Missing %d seqnum\r\n", g_com_ctx.rx_seq - seqnum);
                // resynchronize on the sequence number of the peer
                g_com_ctx.rx_seq = seqnum;
            }

            g_com_ctx.rx_seq++;
            state++;
        }
        // get id
        else if (4 == state)
        {
            id = data;
            state++;
        }
        // we got a full packet
        else if (5 == state)
        {
            d7a_com_rx_msg_t* pkt = NULL;
            // buffer[] already accounts for one byte in the structure size
            pkt = (d7a_com_rx_msg_t*)MALLOC(sizeof(d7a_com_rx_msg_t) - 1 + len);
            
            //DPRINT("Got packet fid %d len %d\r\n", fid, len);

            // copy data to buffer
            // (the first payload byte has already been popped, hence the -1)
            pkt->blen = len;
            pkt->id = id;
            for (uint16_t i=0 ; i<len ; i++)
            {
                pkt->buffer[i] = g_com_ctx.rx_buffer[(g_com_ctx.read_idx + data_read + i - 1) % g_com_ctx.rx_buffer_size];
            }
            data_read += len - 1;
            
            // dispatch packet
            d7a_com_new_pkt(pkt);
            
            pkt_found = true;
            
            // Stats
            g_com_ctx.rx_pkt_count++;
            g_com_ctx.rx_byte_count += (len + KAL_COM_HEADER_LEN);
            
            break;
        }
        else
        {
            // Restart search
            state = 0;
        }
    }

    if (data_read)
    {
        uint16_t discarded_bytes;
        if (pkt_found)
        {
            // If packet is found, bytes before the packet are discarded
            discarded_bytes = data_read - len - KAL_COM_HEADER_LEN;
        }
        else
        {
            // Discard all bytes
            discarded_bytes = data_read;
        }
        
        if (discarded_bytes)
        {
            EPRINT("Discarding %d bytes\r\n", discarded_bytes);
            // 16-bit index: an 8-bit one never reaches discarded_bytes once
            // it is 256 or more, which made this loop endless.
            for (uint16_t i=0 ; i<discarded_bytes ; i++)
            {
                DPRINT("%02X ", g_com_ctx.rx_buffer[(g_com_ctx.read_idx + i) % g_com_ctx.rx_buffer_size]);
            }
            DPRINT("\r\n");
        }
        
        // update buffer indexes
        g_com_ctx.read_idx = (g_com_ctx.read_idx + data_read) % g_com_ctx.rx_buffer_size;
        // NOTE(review): data_available is also incremented by rx_isr; this
        // read-modify-write is not protected against the interrupt.
        g_com_ctx.data_available -= data_read;
    }
    
    return pkt_found;
}





// Thread for parsing packets from RX buffer.
// Sleeps on the data_parsing semaphore and runs one parsing pass each time
// it is woken up.
//
// @param p     Not used
void d7a_com_thread( void const *p )
{
    FPRINT("\r\n");
    for (;;)
    {
        // Block until rx_isr signals that data is available
        g_com_ctx.data_parsing->wait();

        // Look for a packet in what has been buffered so far
        parse_packet(g_com_ctx.read_idx, g_com_ctx.data_available);
    }
}