sandbox / mbed-client-classic

Fork of mbed-client-classic by Christopher Haster

Revision:
2:c3434146c3d2
Parent:
1:2dc916e504c9
Child:
4:0c58f5786538
diff -r 2dc916e504c9 -r c3434146c3d2 source/m2mconnectionhandlerpimpl.cpp
--- a/source/m2mconnectionhandlerpimpl.cpp	Fri Jan 22 16:53:32 2016 -0600
+++ b/source/m2mconnectionhandlerpimpl.cpp	Thu Feb 11 23:49:50 2016 +0000
@@ -38,7 +38,9 @@
  _resolved(true),
  _is_handshaking(false),
  _listening(false),
- _reading_thread(0)
+ _listen_thread(0),
+ _recv_thread(0),
+ _send_thread(0)
 {
     memset(&_address_buffer, 0, sizeof _address_buffer);
     memset(&_address, 0, sizeof _address);
@@ -52,11 +54,39 @@
         _binding_mode == M2MInterface::TCP_QUEUE) {
         error("ConnectionHandler: Unsupported binding mode, only UDP based modes are currently supported");
     }
+    
+    _running = true;
+    _recv_thread = rtos::create_thread<
+        M2MConnectionHandlerPimpl,
+        &M2MConnectionHandlerPimpl::recv_handler>(this, osPriorityAboveNormal);
+    _send_thread = rtos::create_thread<
+        M2MConnectionHandlerPimpl,
+        &M2MConnectionHandlerPimpl::send_handler>(this);
+    _listen_thread = rtos::create_thread<
+        M2MConnectionHandlerPimpl,
+        &M2MConnectionHandlerPimpl::listen_handler>(this);    
 }
 
 M2MConnectionHandlerPimpl::~M2MConnectionHandlerPimpl()
 {
-    stop_listening();
+    _listening = false;
+    _running = false;
+    
+    if (_listen_thread) {
+        delete _listen_thread;
+        _listen_thread = 0;
+    }
+    
+    if (_recv_thread) {
+        delete _recv_thread;
+        _recv_thread = 0;
+    }
+    
+    if (_send_thread) {
+        delete _send_thread;
+        _send_thread = 0;
+    }
+    
     delete _security_impl;
 }
 
@@ -102,6 +132,36 @@
     return true;
 }
 
+
+void M2MConnectionHandlerPimpl::recv_handler()
+{
+    while (_running) {
+        Endpoint recv_endpoint;
+        int size = _socket.receiveFrom(recv_endpoint,
+            (char*)_recv_buffer, sizeof _recv_buffer);
+    
+        if (size > 0) {
+            _recv_queue.put(new std::string(_recv_buffer, _recv_buffer+size));
+        } else {
+            rtos::Thread::wait(1000);
+        }
+    }
+}
+
+void M2MConnectionHandlerPimpl::send_handler()
+{    
+    while (_running) {
+        osEvent e = _send_queue.get();
+        if (e.status == osEventMessage) {
+            std::string *packet = (std::string*)e.value.p;
+            int size = _socket.sendTo(_endpoint, (char*)packet->data(), packet->size());
+            delete packet;
+        } else {
+            rtos::Thread::wait(1000);
+        }
+    }
+}
+
 bool M2MConnectionHandlerPimpl::send_data(uint8_t *data,
                                           uint16_t data_len,
                                           sn_nsdl_addr_s *address)
@@ -115,7 +175,7 @@
             return false;
         }
     } else {
-        if (_socket.sendTo(_endpoint, (char*)data, data_len) < 0) {
+        if (send_to_socket(data, data_len) < 0) {
             return false;
         }
     }
@@ -127,45 +187,35 @@
 bool M2MConnectionHandlerPimpl::start_listening_for_data()
 {
     _listening = true;
-    _reading_thread = create_thread<M2MConnectionHandlerPimpl,
-                                   &M2MConnectionHandlerPimpl::receive_data>(this);
     return true;
 }
 
 void M2MConnectionHandlerPimpl::stop_listening()
 {
-    if (_reading_thread) {
-        _listening = false;
-        delete _reading_thread;
-        _reading_thread = 0;
-    }
+    _listening = false;
 }
 
-void M2MConnectionHandlerPimpl::receive_data()
+void M2MConnectionHandlerPimpl::listen_handler()
 {
-    while (_listening) {
-        if (_use_secure_connection) {
-            memset(_receive_buffer, 0, sizeof _receive_buffer);
-
-            int size = _security_impl->read(_receive_buffer, sizeof _receive_buffer);
+    while (_running) {
+        if (_listening) {
+            memset(_listen_buffer, 0, sizeof _listen_buffer);
+            int size;
+            
+            if (_use_secure_connection) {
+                size = _security_impl->read(_listen_buffer, sizeof _listen_buffer);
+            } else {
+                size = receive_from_socket(_listen_buffer, sizeof _listen_buffer);
+            }
+            
             if (size > 0) {
-                _observer.data_available((uint8_t*)_receive_buffer,
-                                         size, _address);
-            } else if (size != 0 && size != M2MConnectionHandler::CONNECTION_ERROR_WANTS_READ) {
-                _listening = false;
-                _observer.socket_error(1);
-            }
-        } else {
-            memset(_receive_buffer, 0, sizeof _receive_buffer);
-
-            int size = _socket.receiveFrom(_endpoint, (char*)_receive_buffer, sizeof _receive_buffer);
-            if (size > 0) {
-                _observer.data_available((uint8_t*)_receive_buffer,
-                                         size, _address);
+                _observer.data_available((uint8_t*)_listen_buffer, size, _address);
             } else if (size != 0) {
                 _listening = false;
                 _observer.socket_error(2);
             }
+        } else {
+            rtos::Thread::wait(1000);
         }
     }
 }
@@ -173,25 +223,28 @@
 
 int M2MConnectionHandlerPimpl::send_to_socket(const unsigned char *buf, size_t len)
 {
-    int size = _socket.sendTo(_endpoint, (char*)buf, len);
-    
-    if (size == 0) {
+    if (_send_queue.put(new std::string(buf, buf+len)) != osOK) {
         return M2MConnectionHandler::CONNECTION_ERROR_WANTS_WRITE;
     } else {
-        return size;
+        return len;
     }
 }
 
 int M2MConnectionHandlerPimpl::receive_from_socket(unsigned char *buf, size_t len)
 {
-    Endpoint recv_endpoint;
-    int size = _socket.receiveFrom(recv_endpoint, (char*)buf, len);
+    osEvent e = _recv_queue.get();
+    if (e.status == osEventMessage) {
+        std::string *packet = (std::string*)e.value.p;
+        int size = packet->size();
+        
+        if (size <= len) {
+            memcpy(buf, packet->data(), size);
+            delete packet;
+            return size;
+        }
+    }
     
-    if (size == 0) {
-        return M2MConnectionHandler::CONNECTION_ERROR_WANTS_READ;
-    } else {
-        return size;
-    }
+    return M2MConnectionHandler::CONNECTION_ERROR_WANTS_READ;
 }
 
 void M2MConnectionHandlerPimpl::handle_connection_error(int /*error*/)