sandbox / mbed-client-classic

Fork of mbed-client-classic by Christopher Haster

Committer:
geky
Date:
Thu Feb 11 23:49:50 2016 +0000
Revision:
2:c3434146c3d2
Parent:
1:2dc916e504c9
Child:
4:0c58f5786538
Restructured M2MConnectionHandlerPimpl to use dedicated threads for recving/sending

Who changed what in which revision?

User | Revision | Line number | New contents of line
Christopher Haster 1:2dc916e504c9 1 /*
Christopher Haster 1:2dc916e504c9 2 * Copyright (c) 2015 ARM Limited. All rights reserved.
Christopher Haster 1:2dc916e504c9 3 * SPDX-License-Identifier: Apache-2.0
Christopher Haster 1:2dc916e504c9 4 * Licensed under the Apache License, Version 2.0 (the License); you may
Christopher Haster 1:2dc916e504c9 5 * not use this file except in compliance with the License.
Christopher Haster 1:2dc916e504c9 6 * You may obtain a copy of the License at
Christopher Haster 1:2dc916e504c9 7 *
Christopher Haster 1:2dc916e504c9 8 * http://www.apache.org/licenses/LICENSE-2.0
Christopher Haster 1:2dc916e504c9 9 *
Christopher Haster 1:2dc916e504c9 10 * Unless required by applicable law or agreed to in writing, software
Christopher Haster 1:2dc916e504c9 11 * distributed under the License is distributed on an AS IS BASIS, WITHOUT
Christopher Haster 1:2dc916e504c9 12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Christopher Haster 1:2dc916e504c9 13 * See the License for the specific language governing permissions and
Christopher Haster 1:2dc916e504c9 14 * limitations under the License.
Christopher Haster 1:2dc916e504c9 15 */
Christopher Haster 1:2dc916e504c9 16 #include "mbed-client-classic/m2mconnectionhandlerpimpl.h"
Christopher Haster 1:2dc916e504c9 17 #include "mbed-client/m2mconnectionobserver.h"
Christopher Haster 1:2dc916e504c9 18 #include "mbed-client/m2mconstants.h"
Christopher Haster 1:2dc916e504c9 19 #include "mbed-client/m2msecurity.h"
Christopher Haster 1:2dc916e504c9 20 #include "mbed-client/m2mconnectionhandler.h"
Christopher Haster 1:2dc916e504c9 21
Christopher Haster 1:2dc916e504c9 22 #include "Socket/TCPSocketConnection.h"
Christopher Haster 1:2dc916e504c9 23 #include "Socket/UDPSocket.h"
Christopher Haster 1:2dc916e504c9 24 #include "threadwrapper.h"
Christopher Haster 1:2dc916e504c9 25 #include "mbed_error.h"
Christopher Haster 1:2dc916e504c9 26
Christopher Haster 1:2dc916e504c9 27
Christopher Haster 1:2dc916e504c9 28 M2MConnectionHandlerPimpl::M2MConnectionHandlerPimpl(M2MConnectionHandler* base, M2MConnectionObserver &observer,
Christopher Haster 1:2dc916e504c9 29 M2MConnectionSecurity* sec,
Christopher Haster 1:2dc916e504c9 30 M2MInterface::BindingMode mode,
Christopher Haster 1:2dc916e504c9 31 M2MInterface::NetworkStack stack)
Christopher Haster 1:2dc916e504c9 32 :_base(base),
Christopher Haster 1:2dc916e504c9 33 _observer(observer),
Christopher Haster 1:2dc916e504c9 34 _security_impl(sec),
Christopher Haster 1:2dc916e504c9 35 _use_secure_connection(false),
Christopher Haster 1:2dc916e504c9 36 _binding_mode(mode),
Christopher Haster 1:2dc916e504c9 37 _network_stack(stack),
Christopher Haster 1:2dc916e504c9 38 _resolved(true),
Christopher Haster 1:2dc916e504c9 39 _is_handshaking(false),
Christopher Haster 1:2dc916e504c9 40 _listening(false),
geky 2:c3434146c3d2 41 _listen_thread(0),
geky 2:c3434146c3d2 42 _recv_thread(0),
geky 2:c3434146c3d2 43 _send_thread(0)
Christopher Haster 1:2dc916e504c9 44 {
Christopher Haster 1:2dc916e504c9 45 memset(&_address_buffer, 0, sizeof _address_buffer);
Christopher Haster 1:2dc916e504c9 46 memset(&_address, 0, sizeof _address);
Christopher Haster 1:2dc916e504c9 47 _address._address = _address_buffer;
Christopher Haster 1:2dc916e504c9 48
Christopher Haster 1:2dc916e504c9 49 if (_network_stack != M2MInterface::LwIP_IPv4) {
Christopher Haster 1:2dc916e504c9 50 error("ConnectionHandler: Unsupported network stack, only IPv4 is currently supported");
Christopher Haster 1:2dc916e504c9 51 }
Christopher Haster 1:2dc916e504c9 52
Christopher Haster 1:2dc916e504c9 53 if (_binding_mode == M2MInterface::TCP ||
Christopher Haster 1:2dc916e504c9 54 _binding_mode == M2MInterface::TCP_QUEUE) {
Christopher Haster 1:2dc916e504c9 55 error("ConnectionHandler: Unsupported binding mode, only UDP based modes are currently supported");
Christopher Haster 1:2dc916e504c9 56 }
geky 2:c3434146c3d2 57
geky 2:c3434146c3d2 58 _running = true;
geky 2:c3434146c3d2 59 _recv_thread = rtos::create_thread<
geky 2:c3434146c3d2 60 M2MConnectionHandlerPimpl,
geky 2:c3434146c3d2 61 &M2MConnectionHandlerPimpl::recv_handler>(this, osPriorityAboveNormal);
geky 2:c3434146c3d2 62 _send_thread = rtos::create_thread<
geky 2:c3434146c3d2 63 M2MConnectionHandlerPimpl,
geky 2:c3434146c3d2 64 &M2MConnectionHandlerPimpl::send_handler>(this);
geky 2:c3434146c3d2 65 _listen_thread = rtos::create_thread<
geky 2:c3434146c3d2 66 M2MConnectionHandlerPimpl,
geky 2:c3434146c3d2 67 &M2MConnectionHandlerPimpl::listen_handler>(this);
Christopher Haster 1:2dc916e504c9 68 }
Christopher Haster 1:2dc916e504c9 69
Christopher Haster 1:2dc916e504c9 70 M2MConnectionHandlerPimpl::~M2MConnectionHandlerPimpl()
Christopher Haster 1:2dc916e504c9 71 {
geky 2:c3434146c3d2 72 _listening = false;
geky 2:c3434146c3d2 73 _running = false;
geky 2:c3434146c3d2 74
geky 2:c3434146c3d2 75 if (_listen_thread) {
geky 2:c3434146c3d2 76 delete _listen_thread;
geky 2:c3434146c3d2 77 _listen_thread = 0;
geky 2:c3434146c3d2 78 }
geky 2:c3434146c3d2 79
geky 2:c3434146c3d2 80 if (_recv_thread) {
geky 2:c3434146c3d2 81 delete _recv_thread;
geky 2:c3434146c3d2 82 _recv_thread = 0;
geky 2:c3434146c3d2 83 }
geky 2:c3434146c3d2 84
geky 2:c3434146c3d2 85 if (_send_thread) {
geky 2:c3434146c3d2 86 delete _send_thread;
geky 2:c3434146c3d2 87 _send_thread = 0;
geky 2:c3434146c3d2 88 }
geky 2:c3434146c3d2 89
Christopher Haster 1:2dc916e504c9 90 delete _security_impl;
Christopher Haster 1:2dc916e504c9 91 }
Christopher Haster 1:2dc916e504c9 92
Christopher Haster 1:2dc916e504c9 93 bool M2MConnectionHandlerPimpl::bind_connection(const uint16_t listen_port)
Christopher Haster 1:2dc916e504c9 94 {
Christopher Haster 1:2dc916e504c9 95 return !(_socket.bind(listen_port) < 0);
Christopher Haster 1:2dc916e504c9 96 }
Christopher Haster 1:2dc916e504c9 97
Christopher Haster 1:2dc916e504c9 98 bool M2MConnectionHandlerPimpl::resolve_server_address(const String& server_address,
Christopher Haster 1:2dc916e504c9 99 const uint16_t server_port,
Christopher Haster 1:2dc916e504c9 100 M2MConnectionObserver::ServerType server_type,
Christopher Haster 1:2dc916e504c9 101 const M2MSecurity* security)
Christopher Haster 1:2dc916e504c9 102 {
Christopher Haster 1:2dc916e504c9 103 if (_endpoint.set_address(server_address.c_str(), server_port) < 0) {
Christopher Haster 1:2dc916e504c9 104 return false;
Christopher Haster 1:2dc916e504c9 105 }
Christopher Haster 1:2dc916e504c9 106
Christopher Haster 1:2dc916e504c9 107 inet_aton(_endpoint.get_address(), _address._address);
Christopher Haster 1:2dc916e504c9 108 _address._port = _endpoint.get_port();
Christopher Haster 1:2dc916e504c9 109 _address._length = 4;
Christopher Haster 1:2dc916e504c9 110 _address._stack = _network_stack;
Christopher Haster 1:2dc916e504c9 111
Christopher Haster 1:2dc916e504c9 112 if (security) {
Christopher Haster 1:2dc916e504c9 113 if (security->resource_value_int(M2MSecurity::SecurityMode) == M2MSecurity::Certificate ||
Christopher Haster 1:2dc916e504c9 114 security->resource_value_int(M2MSecurity::SecurityMode) == M2MSecurity::Psk) {
Christopher Haster 1:2dc916e504c9 115 if (_security_impl != NULL) {
Christopher Haster 1:2dc916e504c9 116 _security_impl->reset();
Christopher Haster 1:2dc916e504c9 117 _security_impl->init(security);
Christopher Haster 1:2dc916e504c9 118
Christopher Haster 1:2dc916e504c9 119 if (_security_impl->connect(_base) < 0) {
Christopher Haster 1:2dc916e504c9 120 return false;
Christopher Haster 1:2dc916e504c9 121 }
Christopher Haster 1:2dc916e504c9 122
Christopher Haster 1:2dc916e504c9 123 _use_secure_connection = true;
Christopher Haster 1:2dc916e504c9 124 }
Christopher Haster 1:2dc916e504c9 125 }
Christopher Haster 1:2dc916e504c9 126 }
Christopher Haster 1:2dc916e504c9 127
Christopher Haster 1:2dc916e504c9 128 _observer.address_ready(_address,
Christopher Haster 1:2dc916e504c9 129 server_type,
Christopher Haster 1:2dc916e504c9 130 _address._port);
Christopher Haster 1:2dc916e504c9 131
Christopher Haster 1:2dc916e504c9 132 return true;
Christopher Haster 1:2dc916e504c9 133 }
Christopher Haster 1:2dc916e504c9 134
geky 2:c3434146c3d2 135
geky 2:c3434146c3d2 136 void M2MConnectionHandlerPimpl::recv_handler()
geky 2:c3434146c3d2 137 {
geky 2:c3434146c3d2 138 while (_running) {
geky 2:c3434146c3d2 139 Endpoint recv_endpoint;
geky 2:c3434146c3d2 140 int size = _socket.receiveFrom(recv_endpoint,
geky 2:c3434146c3d2 141 (char*)_recv_buffer, sizeof _recv_buffer);
geky 2:c3434146c3d2 142
geky 2:c3434146c3d2 143 if (size > 0) {
geky 2:c3434146c3d2 144 _recv_queue.put(new std::string(_recv_buffer, _recv_buffer+size));
geky 2:c3434146c3d2 145 } else {
geky 2:c3434146c3d2 146 rtos::Thread::wait(1000);
geky 2:c3434146c3d2 147 }
geky 2:c3434146c3d2 148 }
geky 2:c3434146c3d2 149 }
geky 2:c3434146c3d2 150
geky 2:c3434146c3d2 151 void M2MConnectionHandlerPimpl::send_handler()
geky 2:c3434146c3d2 152 {
geky 2:c3434146c3d2 153 while (_running) {
geky 2:c3434146c3d2 154 osEvent e = _send_queue.get();
geky 2:c3434146c3d2 155 if (e.status == osEventMessage) {
geky 2:c3434146c3d2 156 std::string *packet = (std::string*)e.value.p;
geky 2:c3434146c3d2 157 int size = _socket.sendTo(_endpoint, (char*)packet->data(), packet->size());
geky 2:c3434146c3d2 158 delete packet;
geky 2:c3434146c3d2 159 } else {
geky 2:c3434146c3d2 160 rtos::Thread::wait(1000);
geky 2:c3434146c3d2 161 }
geky 2:c3434146c3d2 162 }
geky 2:c3434146c3d2 163 }
geky 2:c3434146c3d2 164
Christopher Haster 1:2dc916e504c9 165 bool M2MConnectionHandlerPimpl::send_data(uint8_t *data,
Christopher Haster 1:2dc916e504c9 166 uint16_t data_len,
Christopher Haster 1:2dc916e504c9 167 sn_nsdl_addr_s *address)
Christopher Haster 1:2dc916e504c9 168 {
Christopher Haster 1:2dc916e504c9 169 if (address == NULL || data == NULL) {
Christopher Haster 1:2dc916e504c9 170 return false;
Christopher Haster 1:2dc916e504c9 171 }
Christopher Haster 1:2dc916e504c9 172
Christopher Haster 1:2dc916e504c9 173 if (_use_secure_connection) {
Christopher Haster 1:2dc916e504c9 174 if (_security_impl->send_message(data, data_len) < 0) {
Christopher Haster 1:2dc916e504c9 175 return false;
Christopher Haster 1:2dc916e504c9 176 }
Christopher Haster 1:2dc916e504c9 177 } else {
geky 2:c3434146c3d2 178 if (send_to_socket(data, data_len) < 0) {
Christopher Haster 1:2dc916e504c9 179 return false;
Christopher Haster 1:2dc916e504c9 180 }
Christopher Haster 1:2dc916e504c9 181 }
Christopher Haster 1:2dc916e504c9 182
Christopher Haster 1:2dc916e504c9 183 _observer.data_sent();
Christopher Haster 1:2dc916e504c9 184 return true;
Christopher Haster 1:2dc916e504c9 185 }
Christopher Haster 1:2dc916e504c9 186
Christopher Haster 1:2dc916e504c9 187 bool M2MConnectionHandlerPimpl::start_listening_for_data()
Christopher Haster 1:2dc916e504c9 188 {
Christopher Haster 1:2dc916e504c9 189 _listening = true;
Christopher Haster 1:2dc916e504c9 190 return true;
Christopher Haster 1:2dc916e504c9 191 }
Christopher Haster 1:2dc916e504c9 192
Christopher Haster 1:2dc916e504c9 193 void M2MConnectionHandlerPimpl::stop_listening()
Christopher Haster 1:2dc916e504c9 194 {
geky 2:c3434146c3d2 195 _listening = false;
Christopher Haster 1:2dc916e504c9 196 }
Christopher Haster 1:2dc916e504c9 197
geky 2:c3434146c3d2 198 void M2MConnectionHandlerPimpl::listen_handler()
Christopher Haster 1:2dc916e504c9 199 {
geky 2:c3434146c3d2 200 while (_running) {
geky 2:c3434146c3d2 201 if (_listening) {
geky 2:c3434146c3d2 202 memset(_listen_buffer, 0, sizeof _listen_buffer);
geky 2:c3434146c3d2 203 int size;
geky 2:c3434146c3d2 204
geky 2:c3434146c3d2 205 if (_use_secure_connection) {
geky 2:c3434146c3d2 206 size = _security_impl->read(_listen_buffer, sizeof _listen_buffer);
geky 2:c3434146c3d2 207 } else {
geky 2:c3434146c3d2 208 size = receive_from_socket(_listen_buffer, sizeof _listen_buffer);
geky 2:c3434146c3d2 209 }
geky 2:c3434146c3d2 210
Christopher Haster 1:2dc916e504c9 211 if (size > 0) {
geky 2:c3434146c3d2 212 _observer.data_available((uint8_t*)_listen_buffer, size, _address);
Christopher Haster 1:2dc916e504c9 213 } else if (size != 0) {
Christopher Haster 1:2dc916e504c9 214 _listening = false;
Christopher Haster 1:2dc916e504c9 215 _observer.socket_error(2);
Christopher Haster 1:2dc916e504c9 216 }
geky 2:c3434146c3d2 217 } else {
geky 2:c3434146c3d2 218 rtos::Thread::wait(1000);
Christopher Haster 1:2dc916e504c9 219 }
Christopher Haster 1:2dc916e504c9 220 }
Christopher Haster 1:2dc916e504c9 221 }
Christopher Haster 1:2dc916e504c9 222
Christopher Haster 1:2dc916e504c9 223
Christopher Haster 1:2dc916e504c9 224 int M2MConnectionHandlerPimpl::send_to_socket(const unsigned char *buf, size_t len)
Christopher Haster 1:2dc916e504c9 225 {
geky 2:c3434146c3d2 226 if (_send_queue.put(new std::string(buf, buf+len)) != osOK) {
Christopher Haster 1:2dc916e504c9 227 return M2MConnectionHandler::CONNECTION_ERROR_WANTS_WRITE;
Christopher Haster 1:2dc916e504c9 228 } else {
geky 2:c3434146c3d2 229 return len;
Christopher Haster 1:2dc916e504c9 230 }
Christopher Haster 1:2dc916e504c9 231 }
Christopher Haster 1:2dc916e504c9 232
Christopher Haster 1:2dc916e504c9 233 int M2MConnectionHandlerPimpl::receive_from_socket(unsigned char *buf, size_t len)
Christopher Haster 1:2dc916e504c9 234 {
geky 2:c3434146c3d2 235 osEvent e = _recv_queue.get();
geky 2:c3434146c3d2 236 if (e.status == osEventMessage) {
geky 2:c3434146c3d2 237 std::string *packet = (std::string*)e.value.p;
geky 2:c3434146c3d2 238 int size = packet->size();
geky 2:c3434146c3d2 239
geky 2:c3434146c3d2 240 if (size <= len) {
geky 2:c3434146c3d2 241 memcpy(buf, packet->data(), size);
geky 2:c3434146c3d2 242 delete packet;
geky 2:c3434146c3d2 243 return size;
geky 2:c3434146c3d2 244 }
geky 2:c3434146c3d2 245 }
Christopher Haster 1:2dc916e504c9 246
geky 2:c3434146c3d2 247 return M2MConnectionHandler::CONNECTION_ERROR_WANTS_READ;
Christopher Haster 1:2dc916e504c9 248 }
Christopher Haster 1:2dc916e504c9 249
Christopher Haster 1:2dc916e504c9 250 void M2MConnectionHandlerPimpl::handle_connection_error(int /*error*/)
Christopher Haster 1:2dc916e504c9 251 {
Christopher Haster 1:2dc916e504c9 252 _observer.socket_error(4);
Christopher Haster 1:2dc916e504c9 253 }