Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Fork of d7a_1x by
src/d7a_com.cpp
- Committer:
- Jeej
- Date:
- 2016-10-21
- Revision:
- 66:492b1d7ba370
- Parent:
- 65:ac3844adfe49
- Child:
- 69:18852c154df9
File content as of revision 66:492b1d7ba370:
#include "mbed.h"
#include "rtos.h"
#include "dbg.h"
#include "d7a.h"
#include "d7a_com.h"
#include "d7a_alp.h"
#include "d7a_common.h"
#include "d7a_fs.h"
#include "d7a_modem.h"
#include "d7a_sys.h"
// Thread signal flags (bit masks) passed to signal_set()/signal_wait() in this file.
// XON_SIGNAL: set on the TX thread when a KAL_COM_FLOW_SYS_XON packet is received;
// the TX thread waits for it after having sent an XACK.
#define XON_SIGNAL              (0x0001 << 0)
// START_SIGNAL: set on the RX/TX thread by d7a_com_start_rx()/d7a_com_start_tx();
// each thread waits for it while its "started" flag is false.
#define START_SIGNAL            (0x0001 << 1)
// Serialized message as queued for the TX thread.
// d7a_com_new_msg() allocates it as ONE block of sizeof(d7a_com_tx_buf_t) - 1 + len
// bytes, so buf[] extends past its declared size and is released together with
// the structure (it must not be freed on its own).
typedef struct {
    // Number of valid bytes in buf
    uint32_t len;
    // Serialized packet (serial header followed by the data)
    uint8_t buf[1];
} d7a_com_tx_buf_t;
// RX parser states, stored in g_com_ctx.state.
enum {
    // Waiting until at least KAL_COM_HEADER_LEN bytes are buffered
    SEARCH_HEADER,
    // Enough bytes for a header: the RX thread runs parse_packet_header()
    PARSE_HEADER,
    // Header parsed, waiting until msg.blen body bytes are buffered
    SEARCH_BODY,
    // Whole body buffered: the RX thread runs parse_packet_body()
    PARSE_BODY,
};
// Whole state of the serial COM layer. A single instance (g_com_ctx) is shared
// by the RX interrupt handler, the RX/TX threads and the public API below.
typedef struct {
    // RX thread enabled (set by d7a_com_start_rx, cleared by d7a_com_stop_rx)
    bool                    rx_started;
    // TX thread enabled (set by d7a_com_start_tx, cleared by d7a_com_stop_tx)
    bool                    tx_started;
    // Current RX parser state (SEARCH_HEADER ... PARSE_BODY)
    uint8_t                 state;
    
    // Host RTS output, driven high by d7a_com_send() around each transmission
    DigitalOut*             rts;
    // Host CTS input, its rising edge is routed to cts_isr()
    InterruptIn*            cts;
        
    // RX buffer address
    uint8_t*                rx_buffer;
    // RX buffer size
    uint16_t                rx_buffer_size;
    // Write index in RX buffer
    volatile uint16_t       write_idx;
    // Read index in RX buffer
    uint16_t                read_idx;
    // Number of bytes available in RX buffer
    volatile uint16_t       data_available;
    // Header fields (blen, id) of the packet currently being received
    d7a_com_rx_msg_t        msg;
    // Number of skipped bytes while parsing
    uint16_t                skipped_bytes;
    
    // Port TX sequence number
    uint8_t                 tx_seq;
    // Port RX sequence number
    uint8_t                 rx_seq;
    // Response to command maximum time
    uint32_t                timeout;
    
    // Released when the RX buffer holds enough data for the next parsing step
    Semaphore*              data_parsing;
    // CTS management (its release in cts_isr() is currently commented out)
    Semaphore*              cts_int;
    
    // Data treatment threads
    Thread*                 rx_thread;
    // Serial port connected to the modem
    Serial*                 serial;
    
    // Tx Thread
    Thread*                 tx_thread;
    // Serialized messages waiting to be sent by the TX thread
    Queue<d7a_com_tx_buf_t, 32> tx_queue;
    
    Timer                   tim;
} d7a_com_ctx_t;
// The one and only COM context of this file
static d7a_com_ctx_t g_com_ctx;
/**
    Thread for parsing packets from RX buffer.
    @param p                    Thread argument (not used by the thread body)
    @return void
*/
void d7a_com_rx_thread( void const *p );
/**
    Thread sending the messages queued in the TX queue.
    @param p                    Thread argument (not used by the thread body)
    @return void
*/
void d7a_com_tx_thread( void const *p );
/**
    Serial RX interrupt service routine.
    Moves every byte waiting in the UART into the circular RX buffer and
    releases the parsing semaphore as soon as enough bytes are buffered for
    the step the parser is waiting for (header or body).

    NOTE(review): the write is not prevented when the buffer is already full;
    an overflow is only reported by the ASSERT once the loop is over.

    @param void
    @return void
*/
void rx_isr()
{
    // More than one character may be pending in the UART receive FIFO
    while (g_com_ctx.serial->readable())
    {
        g_com_ctx.rx_buffer[g_com_ctx.write_idx] = g_com_ctx.serial->getc();
        g_com_ctx.data_available++;

        // Move the write index forward, wrapping at the end of the buffer
        if (g_com_ctx.write_idx >= (g_com_ctx.rx_buffer_size - 1))
        {
            g_com_ctx.write_idx = 0;
        }
        else
        {
            g_com_ctx.write_idx = g_com_ctx.write_idx + 1;
        }

        // Wake the parsing thread only on the SEARCH_* -> PARSE_* transition
        switch (g_com_ctx.state)
        {
            case SEARCH_HEADER:
                if (g_com_ctx.data_available >= KAL_COM_HEADER_LEN)
                {
                    g_com_ctx.state = PARSE_HEADER;
                    g_com_ctx.data_parsing->release();
                }
                break;
            case SEARCH_BODY:
                if (g_com_ctx.data_available >= g_com_ctx.msg.blen)
                {
                    g_com_ctx.state = PARSE_BODY;
                    g_com_ctx.data_parsing->release();
                }
                break;
            default:
                // Parser already notified, nothing more to do for this byte
                break;
        }
    }

    ASSERT(g_com_ctx.data_available <= g_com_ctx.rx_buffer_size, "RX Buffer overflow! (%d/%d)\r\n", g_com_ctx.data_available, g_com_ctx.rx_buffer_size);
}
/**
    CTS pin Interrupt Service Routine.
    For flow control (not yet implemented): the body is currently empty, the
    semaphore release below is still commented out.
    @param void
    @return void
*/
void cts_isr()
{
    //PRINT("CTS_INT\r\n");
    //g_com_ctx.cts_int->release();
}
// D7a_com constructor.
// Opens a serial port and monitors the input for ALP packets.
// Pins are those of the host, not the modem:
// TX-host -> RX-modem
// RX-host <- TX-modem
// RTS-host -> CTS-modem
// CTS-host <- RTS-modem
//
// Resets the COM context, allocates the RX buffer (config->rx_buffer_size bytes),
// creates the serial port, RTS/CTS pins and semaphores, attaches the RX interrupt
// and finally creates the TX and RX threads. Both threads stay parked until
// d7a_com_start_tx()/d7a_com_start_rx() are called.
//
// @param config    Pins (tx, rx, rts, cts) and RX buffer size to use
// @return          Always D7A_ERR_NONE
// NOTE(review): the result of MALLOC is not checked in this function.
d7a_errors_t d7a_com_open( const d7a_com_config_t* config )
{
    FPRINT("\r\n");
    
    // Parser and buffer state
    g_com_ctx.rx_started = false;
    g_com_ctx.tx_started = false;
    g_com_ctx.state = SEARCH_HEADER;
    g_com_ctx.rx_buffer_size = config->rx_buffer_size;
    g_com_ctx.rx_buffer = (uint8_t*)MALLOC(g_com_ctx.rx_buffer_size);
    g_com_ctx.data_available = 0;
    g_com_ctx.skipped_bytes = 0;
    g_com_ctx.write_idx = 0;
    g_com_ctx.read_idx = 0;
    g_com_ctx.tx_seq = 0;
    g_com_ctx.rx_seq = 0;
    
    // Peripherals and synchronisation objects (released by d7a_com_close)
    g_com_ctx.serial =          new Serial(config->tx, config->rx);
    g_com_ctx.rts =             new DigitalOut(config->rts);
    g_com_ctx.cts =             new InterruptIn(config->cts);
    g_com_ctx.data_parsing =    new Semaphore(0);
    g_com_ctx.cts_int =         new Semaphore(0);
    
    g_com_ctx.cts->rise(&cts_isr);
    /* XXX: Unknown bug:
    Baud rate can be found with high error rate (> 4%).
    It is here manually corrected after an oscilloscope measure.
    Normal Baudrate should be 115200.
    */
    //g_com_ctx.serial->baud(117000); // XXX
    g_com_ctx.serial->baud(115200); // XXX
    //g_com_ctx.serial->baud(57600); // XXX
    g_com_ctx.serial->format(8, SerialBase::None, 1);
    // From here on rx_isr() may run: the context above must be fully initialised
    g_com_ctx.serial->attach(&rx_isr, Serial::RxIrq);
    
    g_com_ctx.tx_thread =       new Thread(d7a_com_tx_thread, NULL, osPriorityHigh, DEFAULT_STACK_SIZE*3);
    g_com_ctx.rx_thread =       new Thread(d7a_com_rx_thread, NULL, osPriorityHigh, DEFAULT_STACK_SIZE*3);
    
    return D7A_ERR_NONE;
}
// Destructor
// Stops both COM threads and releases everything d7a_com_open() allocated.
//
// The Thread objects are created with `new` in d7a_com_open(); terminating
// them is not enough to give their memory back, so they are deleted here as
// well. Every pointer is cleared once released so that a stale value cannot
// be reused by mistake.
//
// @return          Always D7A_ERR_NONE
d7a_errors_t d7a_com_close(void)
{
    FPRINT("\r\n");

    // Stop both threads before touching anything they may be using
    g_com_ctx.rx_thread->terminate();
    g_com_ctx.tx_thread->terminate();
    delete g_com_ctx.rx_thread;
    delete g_com_ctx.tx_thread;
    g_com_ctx.rx_thread = NULL;
    g_com_ctx.tx_thread = NULL;

    delete g_com_ctx.data_parsing;
    delete g_com_ctx.cts_int;
    delete g_com_ctx.serial;
    delete g_com_ctx.rts;
    delete g_com_ctx.cts;
    g_com_ctx.data_parsing = NULL;
    g_com_ctx.cts_int = NULL;
    g_com_ctx.serial = NULL;
    g_com_ctx.rts = NULL;
    g_com_ctx.cts = NULL;

    FREE(g_com_ctx.rx_buffer);
    g_com_ctx.rx_buffer = NULL;

    return D7A_ERR_NONE;
}
// Enables packet reception: sets the rx_started flag, then signals the RX
// thread in case it is parked waiting for START_SIGNAL.
void d7a_com_start_rx(void)
{
    FPRINT("\r\n");
    g_com_ctx.rx_started = true;
    g_com_ctx.rx_thread->signal_set(START_SIGNAL);
}
// Disables packet reception by clearing the rx_started flag.
// The RX thread only looks at the flag at the top of its loop, so it parks
// itself the next time it gets there, not immediately.
void d7a_com_stop_rx(void)
{
    FPRINT("\r\n");
    g_com_ctx.rx_started = false;
}
// Enables transmission: sets the tx_started flag, then signals the TX thread
// in case it is parked waiting for START_SIGNAL.
void d7a_com_start_tx(void)
{
    FPRINT("\r\n");
    g_com_ctx.tx_started = true;
    g_com_ctx.tx_thread->signal_set(START_SIGNAL);
}
// Disables transmission by clearing the tx_started flag.
// The TX thread only looks at the flag at the top of its loop, so it parks
// itself the next time it gets there, not immediately.
void d7a_com_stop_tx(void)
{
    FPRINT("\r\n");
    g_com_ctx.tx_started = false;
}
// Restarts the COM layer.
// Stops RX and TX, resets the parser state, buffer indexes and sequence
// numbers, drops every message still waiting in the TX queue and every
// packet still pending for the modem, fs and sys layers, then enables RX
// and TX again.
void d7a_com_restart(void)
{
    FPRINT("\r\n");

    d7a_com_stop_rx();
    d7a_com_stop_tx();

    g_com_ctx.state = SEARCH_HEADER;
    g_com_ctx.data_available = 0;
    g_com_ctx.skipped_bytes = 0;
    g_com_ctx.write_idx = 0;
    g_com_ctx.read_idx = 0;
    g_com_ctx.tx_seq = 0;
    g_com_ctx.rx_seq = 0;

    // Flush TX queue
    for (;;)
    {
        osEvent evt = g_com_ctx.tx_queue.get(1);
        d7a_com_tx_buf_t* msg = (evt.status == osEventMessage)? (d7a_com_tx_buf_t*)evt.value.p : NULL;

        if (msg == NULL)
        {
            break;
        }

        // msg->buf is an array embedded in the message allocation (see
        // d7a_com_new_msg), not a separate block: only the message itself
        // may be freed. Freeing msg->buf would hand the allocator a pointer
        // it never returned.
        FREE(msg);
    }

    // Flush RX: drain the pending packets of each consumer in turn
    for (int i = 0 ; i < 3 ; i++)
    {
        for (;;)
        {
            d7a_com_rx_msg_t* pkt;

            if (i == 0)
            {
                pkt = d7a_modem_wait_pkt(1);
            }
            else if (i == 1)
            {
                pkt = d7a_fs_wait_pkt(1);
            }
            else
            {
                pkt = d7a_sys_wait_pkt(1);
            }

            if (pkt == NULL)
            {
                break;
            }

            FREE(pkt);
        }
    }

    d7a_com_start_rx();
    d7a_com_start_tx();
}
// Copies `length` bytes from the circular RX buffer, starting at index
// `read_start` (taken modulo the buffer size), into `linear_buffer`.
// When the requested span runs past the end of the RX buffer the copy is
// done in two pieces so that the destination ends up contiguous.
static void d7a_com_copy_to_linear(uint8_t* linear_buffer, uint16_t read_start, uint16_t length)
{
    ASSERT(length, "Can't copy 0 bytes\r\n");
    ASSERT(length <= g_com_ctx.rx_buffer_size, "Length too long (%d) for buffer size (%d)\r\n", length, g_com_ctx.rx_buffer_size);

    const uint16_t start = read_start % g_com_ctx.rx_buffer_size;
    // Bytes that can be read before reaching the end of the RX buffer
    const uint16_t until_end = g_com_ctx.rx_buffer_size - start;
    const uint16_t head = (length < until_end)? length : until_end;

    memcpy(linear_buffer, g_com_ctx.rx_buffer + start, head);

    // The circular buffer is wrapping: the rest sits at its beginning
    if (head < length)
    {
        memcpy(linear_buffer + head, g_com_ctx.rx_buffer, length - head);
    }
}
/**
    Wakes up the modem and sends a serialized message through Serial.
    RTS is raised before the first byte and lowered after the last one,
    with a short wait on each side.
    @param tx_buf               Message to send (tx_buf->len bytes of tx_buf->buf)
    @return void
*/
static void d7a_com_send(d7a_com_tx_buf_t* tx_buf)
{
    FPRINT("\r\n");

    DPRINT("<-- (0x%02X) %d\r\n", tx_buf->buf[4], (tx_buf->len - KAL_COM_HEADER_LEN));

    // Assert RTS, then wait before starting the transfer
    *(g_com_ctx.rts) = 1;
    Thread::wait(2);

    // Push the message out byte by byte
    const uint8_t* cursor = tx_buf->buf;
    const uint8_t* const end = cursor + tx_buf->len;
    while (cursor != end)
    {
        g_com_ctx.serial->putc(*cursor);
        cursor++;
    }

    // Important to not release the resource too soon
    Thread::wait(1);
    *(g_com_ctx.rts) = 0;
}
// Formats a message for the serial link.
// Builds, in a newly allocated d7a_com_tx_buf_t, the serial header (two sync
// bytes, body length, TX sequence number, flow id) followed by the parameters
// (pbuf/plen) and then the payload (abuf/alen). The TX sequence number is
// incremented for every message built.
//
// @param msg       Message description (id, pbuf/plen, abuf/alen)
// @return          The serialized message; the caller owns it and must
//                  release it with a single FREE()
d7a_com_tx_buf_t* d7a_com_new_msg(d7a_com_tx_msg_t* msg)
{
    // Lengths are summed on 32 bits: with an 8-bit total, any body longer
    // than 255 - KAL_COM_HEADER_LEN bytes wrapped around, the buffer was
    // allocated too small and the copies below wrote past its end.
    const uint32_t body_len = (uint32_t)msg->alen + (uint32_t)msg->plen;
    const uint32_t len = KAL_COM_HEADER_LEN + body_len;
    FPRINT("(len:%d)\r\n", (int)len);

    // The header stores the body length in a single byte
    ASSERT(body_len <= 0xFF, "New msg body too long %d (max %d)\r\n", (int)body_len, 0xFF);

    d7a_com_tx_buf_t* tx_buf = (d7a_com_tx_buf_t*)MALLOC(sizeof(d7a_com_tx_buf_t) - 1 + len);

    // construct serial header
    // concatenate and update tx_seq ID
    uint8_t* p = tx_buf->buf;
    uint8_t* t = p;
    *p++ = (uint8_t)KAL_COM_SYNC_BYTE_0;
    *p++ = (uint8_t)KAL_COM_SYNC_BYTE_1;
    *p++ = (uint8_t)body_len;
    *p++ = (uint8_t)g_com_ctx.tx_seq++;
    *p++ = (uint8_t)msg->id;
    // copy payload and parameters
    memcpy(p, msg->pbuf, msg->plen);
    p += msg->plen;
    memcpy(p, msg->abuf, msg->alen);
    p += msg->alen;

    tx_buf->len = (uint32_t)(p - t);

    ASSERT(tx_buf->len == len, "New msg wrong length %d expected %d\r\n", (int)tx_buf->len, (int)len);

    return tx_buf;
}
// Serializes `msg` and queues it for the TX thread.
// The serialized buffer belongs to the TX thread once queued. When it cannot
// be queued (put() is called without a timeout, so it fails instead of
// waiting for a free slot) nobody else will ever release it: the message is
// reported as dropped and freed here instead of being leaked.
//
// @param msg       Message description, see d7a_com_new_msg()
void d7a_com_post_msg(d7a_com_tx_msg_t* msg)
{
    FPRINT("\r\n");

    d7a_com_tx_buf_t* tx_buf = d7a_com_new_msg(msg);

    if (g_com_ctx.tx_queue.put(tx_buf) != osOK)
    {
        EPRINT("Failed to queue TX message, message dropped\r\n");
        FREE(tx_buf);
    }
}
void d7a_com_dump(uint8_t* buf, uint8_t len, d7a_com_flow_t flow)
{
    d7a_com_tx_msg_t msg;
    msg.id = flow;
    msg.pbuf = buf;
    msg.plen = len;
    msg.alen = 0;
    d7a_com_post_msg(&msg);
}
// Dispatches a received packet to the layer in charge of its flow.
// Ownership of `pkt` goes to the layer it is handed to. XON/XOFF system
// packets and packets of an unknown flow are consumed and freed here.
static void d7a_com_new_pkt(d7a_com_rx_msg_t* pkt)
{
    DPRINT("--> (0x%02X) %d\r\n", pkt->id, pkt->blen);

    switch (KAL_COM_FLOWID(pkt->id))
    {
        case KAL_COM_FLOWID_FS:
            d7a_fs_new_pkt(pkt);
            break;

        case KAL_COM_FLOWID_CMD:
            d7a_modem_new_pkt(pkt);
            break;

        case KAL_COM_FLOWID_ALP:
            d7a_alp_new_pkt(pkt);
            break;

        case KAL_COM_FLOWID_SYS:
        {
            // Flow control is handled here to avoid going to another process
            const bool is_xon = (pkt->id == KAL_COM_FLOW_SYS_XON);
            const bool is_xoff = !is_xon && (pkt->id == KAL_COM_FLOW_SYS_XOFF);

            if (!is_xon && !is_xoff)
            {
                d7a_sys_new_pkt(pkt);
                break;
            }

            // The packet carries nothing more than its id: release it first
            FREE(pkt);
            if (is_xon)
            {
                DPRINT("XON\r\n");
                g_com_ctx.tx_thread->signal_set(XON_SIGNAL);
            }
            else
            {
                DPRINT("XOFF\r\n");
                d7a_sys_xack();
            }
            break;
        }

        default:
            EPRINT("Untreated pkt type 0x%02X\r\n", pkt->id);
            FREE(pkt);
            break;
    }
}
// Consumes `size` bytes of the circular RX buffer: moves the read index
// forward (wrapping at the buffer size) and lowers the available byte count.
// NOTE(review): data_available is also incremented by rx_isr(); this
// read-modify-write is not protected against the interrupt - confirm whether
// a critical section is needed.
void d7a_com_forward_buffer(uint8_t size)
{
    const uint16_t next_read = (g_com_ctx.read_idx + size) % g_com_ctx.rx_buffer_size;

    g_com_ctx.read_idx = next_read;
    g_com_ctx.data_available = g_com_ctx.data_available - size;
}
/**
    Reads the Rx buffer, parses the packets
    @param void
    @return void
*/
static void parse_packet_header(void)
{
    uint8_t header[KAL_COM_HEADER_LEN];
    uint8_t seqnum;
    
    ASSERT(g_com_ctx.data_available >= KAL_COM_HEADER_LEN, "Not enough data for header\r\n");
    
    g_com_ctx.skipped_bytes = 0;
    
    // Lookup fist sync byte
    while (g_com_ctx.data_available >= KAL_COM_HEADER_LEN)
    {
        if(KAL_COM_SYNC_BYTE_0 == g_com_ctx.rx_buffer[g_com_ctx.read_idx])
        {
            // copy following data
            d7a_com_copy_to_linear(header, g_com_ctx.read_idx, KAL_COM_HEADER_LEN);
            if (KAL_COM_SYNC_BYTE_1 == header[1])
            {
                // Fill temp header
                g_com_ctx.msg.blen = header[2];
                seqnum = header[3];
                g_com_ctx.msg.id = header[4];
                
                // Update seqnum
                WARNING(g_com_ctx.rx_seq == seqnum, "COM Bad seqnum expected:%d got:%d\r\n", g_com_ctx.rx_seq, seqnum);
                g_com_ctx.rx_seq = seqnum + 1;
                
                // search for body
                d7a_com_forward_buffer(KAL_COM_HEADER_LEN);
                g_com_ctx.state = SEARCH_BODY;
                
                // Start parsing if data is already available
                if (g_com_ctx.data_available >= g_com_ctx.msg.blen)
                {
                    g_com_ctx.state = PARSE_BODY;
                    g_com_ctx.data_parsing->release();
                }
                
                //DPRINT("COM header found (id: %02X seq: %d body: %d/%d bytes)\r\n", g_com_ctx.msg.id, seqnum, g_com_ctx.data_available, g_com_ctx.msg.blen);
                break;
            }
        }
        
        // byte skipped
        WARNING(g_com_ctx.skipped_bytes != 1, "COM not sync. Searching for header.\r\n");
        d7a_com_forward_buffer(1);
    }
    
    if (g_com_ctx.skipped_bytes)
    {
        WARNING(false, "COM Skipped %d bytes.\r\n", g_com_ctx.skipped_bytes);
#if 0
        uint8_t* temp = (uint8_t*)MALLOC(g_com_ctx.skipped_bytes);
        d7a_com_copy_to_linear(temp, g_com_ctx.read_idx - g_com_ctx.skipped_bytes, g_com_ctx.skipped_bytes);
        PRINT_DATA("", "%02X ", temp, g_com_ctx.skipped_bytes, "\r\n");
        FREE(temp);
#endif
        g_com_ctx.skipped_bytes = 0;
    }
}
/**
    Extracts the body of the packet whose header has just been parsed.
    Unless the packet belongs to the trace flow (which is ignored), the body
    is copied into a newly allocated packet and handed to d7a_com_new_pkt().
    The body is then consumed from the RX buffer and the parser goes back to
    looking for a header, waking itself up if one may already be buffered.
    @param void
    @return void
*/
static void parse_packet_body(void)
{
    ASSERT(g_com_ctx.data_available >= g_com_ctx.msg.blen, "Not enough data for body\r\n");

    const bool is_trace = (KAL_COM_FLOWID(g_com_ctx.msg.id) == KAL_COM_FLOWID_TRC);

    if (!is_trace)
    {
        // The packet is sized for its body: the buffer field is extended by blen bytes
        d7a_com_rx_msg_t* pkt = (d7a_com_rx_msg_t*)MALLOC(sizeof(d7a_com_rx_msg_t) - 1 + g_com_ctx.msg.blen);

        pkt->blen = g_com_ctx.msg.blen;
        pkt->id = g_com_ctx.msg.id;
        d7a_com_copy_to_linear(pkt->buffer, g_com_ctx.read_idx, g_com_ctx.msg.blen);

        // From here the packet belongs to whoever d7a_com_new_pkt() gives it to
        d7a_com_new_pkt(pkt);
    }

    // The body is consumed whether or not it was forwarded
    d7a_com_forward_buffer(g_com_ctx.msg.blen);
    g_com_ctx.state = SEARCH_HEADER;

    if (g_com_ctx.data_available < KAL_COM_HEADER_LEN)
    {
        // rx_isr() will wake the parser once a full header is buffered
        return;
    }

    // A header may already be there: parse it right away
    g_com_ctx.state = PARSE_HEADER;
    g_com_ctx.data_parsing->release();
}
// RX thread: parses the packets found in the RX buffer.
// Parks itself until reception is started, then waits on the data_parsing
// semaphore and runs the parsing step matching the current parser state.
// The thread argument is not used.
void d7a_com_rx_thread( void const *p )
{
    FPRINT("(id:0x%08x)\r\n", osThreadGetId());

    for (;;)
    {
        // Reception disabled: sleep until d7a_com_start_rx() signals us
        if (!g_com_ctx.rx_started)
        {
            g_com_ctx.rx_thread->signal_wait(START_SIGNAL);
        }

        // Sleep until there is something to parse
        g_com_ctx.data_parsing->wait();

        if (g_com_ctx.state == PARSE_HEADER)
        {
            parse_packet_header();
            continue;
        }

        if (g_com_ctx.state == PARSE_BODY)
        {
            parse_packet_body();
        }
    }
}
// TX thread: sends the messages posted in the TX queue, one at a time.
// Parks itself until transmission is started. Each message is freed once
// sent. After sending an XACK the thread holds any further transmission
// until XON_SIGNAL is received.
// The thread argument is not used.
void d7a_com_tx_thread( void const *p )
{
    FPRINT("(id:0x%08x)\r\n", osThreadGetId());

    for (;;)
    {
        // Transmission disabled: sleep until d7a_com_start_tx() signals us
        if (!g_com_ctx.tx_started)
        {
            g_com_ctx.tx_thread->signal_wait(START_SIGNAL);
        }

        // Wait for a message to send
        osEvent evt = g_com_ctx.tx_queue.get();
        if (evt.status != osEventMessage)
        {
            continue;
        }

        d7a_com_tx_buf_t* msg = (d7a_com_tx_buf_t*)evt.value.p;
        if (msg == NULL)
        {
            continue;
        }

        // The id byte must be read before the message is released
        const uint8_t flow_id = msg->buf[4];
        d7a_com_send(msg);
        FREE(msg);

        if (KAL_COM_FLOW_SYS_XACK == flow_id)
        {
            DPRINT("XACK\r\n");
            g_com_ctx.tx_thread->signal_wait(XON_SIGNAL);
        }
    }
}
            
    