S Morita / mbed-mros2

Dependents:   mbed-os-example-mros2 example-mbed-mros2-sub-pose example-mbed-mros2-pub-twist example-mbed-mros2-mturtle-teleop

Committer:
smoritaemb
Date:
Thu Dec 30 21:06:29 2021 +0900
Revision:
0:580aba13d1a1
Updated to catch up to mros2 v2.3

Who changed what in which revision?

User | Revision | Line number | New contents of line
smoritaemb 0:580aba13d1a1 1 /*
smoritaemb 0:580aba13d1a1 2 The MIT License
smoritaemb 0:580aba13d1a1 3 Copyright (c) 2019 Lehrstuhl Informatik 11 - RWTH Aachen University
smoritaemb 0:580aba13d1a1 4 Permission is hereby granted, free of charge, to any person obtaining a copy
smoritaemb 0:580aba13d1a1 5 of this software and associated documentation files (the "Software"), to deal
smoritaemb 0:580aba13d1a1 6 in the Software without restriction, including without limitation the rights
smoritaemb 0:580aba13d1a1 7 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
smoritaemb 0:580aba13d1a1 8 copies of the Software, and to permit persons to whom the Software is
smoritaemb 0:580aba13d1a1 9 furnished to do so, subject to the following conditions:
smoritaemb 0:580aba13d1a1 10 The above copyright notice and this permission notice shall be included in
smoritaemb 0:580aba13d1a1 11 all copies or substantial portions of the Software.
smoritaemb 0:580aba13d1a1 12 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
smoritaemb 0:580aba13d1a1 13 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
smoritaemb 0:580aba13d1a1 14 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
smoritaemb 0:580aba13d1a1 15 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
smoritaemb 0:580aba13d1a1 16 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
smoritaemb 0:580aba13d1a1 17 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
smoritaemb 0:580aba13d1a1 18 THE SOFTWARE
smoritaemb 0:580aba13d1a1 19
smoritaemb 0:580aba13d1a1 20 This file is part of embeddedRTPS.
smoritaemb 0:580aba13d1a1 21
smoritaemb 0:580aba13d1a1 22 Author: i11 - Embedded Software, RWTH Aachen University
smoritaemb 0:580aba13d1a1 23 */
smoritaemb 0:580aba13d1a1 24
smoritaemb 0:580aba13d1a1 25 #include "rtps/ThreadPool.h"
smoritaemb 0:580aba13d1a1 26
smoritaemb 0:580aba13d1a1 27 #include "lwip/tcpip.h"
smoritaemb 0:580aba13d1a1 28 #include "rtps/entities/Writer.h"
smoritaemb 0:580aba13d1a1 29 #include "rtps/utils/udpUtils.h"
smoritaemb 0:580aba13d1a1 30
smoritaemb 0:580aba13d1a1 31 using rtps::ThreadPool;
smoritaemb 0:580aba13d1a1 32
smoritaemb 0:580aba13d1a1 33 #define THREAD_POOL_VERBOSE 0
smoritaemb 0:580aba13d1a1 34
smoritaemb 0:580aba13d1a1 35 ThreadPool::ThreadPool(receiveJumppad_fp receiveCallback, void *callee)
smoritaemb 0:580aba13d1a1 36 : m_receiveJumppad(receiveCallback), m_callee(callee) {
smoritaemb 0:580aba13d1a1 37
smoritaemb 0:580aba13d1a1 38 if (!m_queueOutgoing.init() || !m_queueIncoming.init()) {
smoritaemb 0:580aba13d1a1 39 return;
smoritaemb 0:580aba13d1a1 40 }
smoritaemb 0:580aba13d1a1 41 err_t inputErr = sys_sem_new(&m_readerNotificationSem, 0);
smoritaemb 0:580aba13d1a1 42 err_t outputErr = sys_sem_new(&m_writerNotificationSem, 0);
smoritaemb 0:580aba13d1a1 43 #if THREAD_POOL_VERBOSE
smoritaemb 0:580aba13d1a1 44 if (inputErr != ERR_OK || outputErr != ERR_OK) {
smoritaemb 0:580aba13d1a1 45 printf("ThreadPool: Failed to create Semaphores.\n");
smoritaemb 0:580aba13d1a1 46 }
smoritaemb 0:580aba13d1a1 47 #endif
smoritaemb 0:580aba13d1a1 48 }
smoritaemb 0:580aba13d1a1 49
smoritaemb 0:580aba13d1a1 50 ThreadPool::~ThreadPool() {
smoritaemb 0:580aba13d1a1 51 if (m_running) {
smoritaemb 0:580aba13d1a1 52 stopThreads();
smoritaemb 0:580aba13d1a1 53 sys_msleep(500);
smoritaemb 0:580aba13d1a1 54 }
smoritaemb 0:580aba13d1a1 55
smoritaemb 0:580aba13d1a1 56 if (sys_sem_valid(&m_readerNotificationSem)) {
smoritaemb 0:580aba13d1a1 57 sys_sem_free(&m_readerNotificationSem);
smoritaemb 0:580aba13d1a1 58 }
smoritaemb 0:580aba13d1a1 59 if (sys_sem_valid(&m_writerNotificationSem)) {
smoritaemb 0:580aba13d1a1 60 sys_sem_free(&m_writerNotificationSem);
smoritaemb 0:580aba13d1a1 61 }
smoritaemb 0:580aba13d1a1 62 }
smoritaemb 0:580aba13d1a1 63
smoritaemb 0:580aba13d1a1 64 bool ThreadPool::startThreads() {
smoritaemb 0:580aba13d1a1 65 if (m_running) {
smoritaemb 0:580aba13d1a1 66 return true;
smoritaemb 0:580aba13d1a1 67 }
smoritaemb 0:580aba13d1a1 68 if (!sys_sem_valid(&m_readerNotificationSem) ||
smoritaemb 0:580aba13d1a1 69 !sys_sem_valid(&m_writerNotificationSem)) {
smoritaemb 0:580aba13d1a1 70 return false;
smoritaemb 0:580aba13d1a1 71 }
smoritaemb 0:580aba13d1a1 72
smoritaemb 0:580aba13d1a1 73 m_running = true;
smoritaemb 0:580aba13d1a1 74 for (auto &thread : m_writers) {
smoritaemb 0:580aba13d1a1 75 // TODO ID, err check, waitOnStop
smoritaemb 0:580aba13d1a1 76 #ifdef MROS2_USE_EMBEDDEDRTPS
smoritaemb 0:580aba13d1a1 77 thread = sys_thread_new("WriterThread", callWriterThreadFunction, this,
smoritaemb 0:580aba13d1a1 78 Config::THREAD_POOL_WRITER_STACKSIZE,
smoritaemb 0:580aba13d1a1 79 Config::THREAD_POOL_WRITER_PRIO);
smoritaemb 0:580aba13d1a1 80 #else
smoritaemb 0:580aba13d1a1 81 thread = sys_thread_new("WriterThread", writerThreadFunction, this,
smoritaemb 0:580aba13d1a1 82 Config::THREAD_POOL_WRITER_STACKSIZE,
smoritaemb 0:580aba13d1a1 83 Config::THREAD_POOL_WRITER_PRIO);
smoritaemb 0:580aba13d1a1 84 #endif
smoritaemb 0:580aba13d1a1 85 }
smoritaemb 0:580aba13d1a1 86
smoritaemb 0:580aba13d1a1 87 for (auto &thread : m_readers) {
smoritaemb 0:580aba13d1a1 88 // TODO ID, err check, waitOnStop
smoritaemb 0:580aba13d1a1 89 #ifdef MROS2_USE_EMBEDDEDRTPS
smoritaemb 0:580aba13d1a1 90 thread = sys_thread_new("ReaderThread", callReaderThreadFunction, this,
smoritaemb 0:580aba13d1a1 91 Config::THREAD_POOL_READER_STACKSIZE,
smoritaemb 0:580aba13d1a1 92 Config::THREAD_POOL_READER_PRIO);
smoritaemb 0:580aba13d1a1 93 # else
smoritaemb 0:580aba13d1a1 94 thread = sys_thread_new("ReaderThread", readerThreadFunction, this,
smoritaemb 0:580aba13d1a1 95 Config::THREAD_POOL_READER_STACKSIZE,
smoritaemb 0:580aba13d1a1 96 Config::THREAD_POOL_READER_PRIO);
smoritaemb 0:580aba13d1a1 97 #endif
smoritaemb 0:580aba13d1a1 98 }
smoritaemb 0:580aba13d1a1 99 return true;
smoritaemb 0:580aba13d1a1 100 }
smoritaemb 0:580aba13d1a1 101
smoritaemb 0:580aba13d1a1 102 void ThreadPool::stopThreads() {
smoritaemb 0:580aba13d1a1 103 m_running = false;
smoritaemb 0:580aba13d1a1 104 // TODO make sure they have finished. Seems to be sufficient for tests.
smoritaemb 0:580aba13d1a1 105 // Not sufficient if threads shall actually be stopped during runtime.
smoritaemb 0:580aba13d1a1 106 sys_msleep(10);
smoritaemb 0:580aba13d1a1 107 }
smoritaemb 0:580aba13d1a1 108
smoritaemb 0:580aba13d1a1 109 void ThreadPool::clearQueues() {
smoritaemb 0:580aba13d1a1 110 m_queueOutgoing.clear();
smoritaemb 0:580aba13d1a1 111 m_queueIncoming.clear();
smoritaemb 0:580aba13d1a1 112 }
smoritaemb 0:580aba13d1a1 113
smoritaemb 0:580aba13d1a1 114 bool ThreadPool::addWorkload(Writer *workload) {
smoritaemb 0:580aba13d1a1 115 bool res = m_queueOutgoing.moveElementIntoBuffer(std::move(workload));
smoritaemb 0:580aba13d1a1 116 if (res) {
smoritaemb 0:580aba13d1a1 117 sys_sem_signal(&m_writerNotificationSem);
smoritaemb 0:580aba13d1a1 118 }
smoritaemb 0:580aba13d1a1 119
smoritaemb 0:580aba13d1a1 120 return res;
smoritaemb 0:580aba13d1a1 121 }
smoritaemb 0:580aba13d1a1 122
smoritaemb 0:580aba13d1a1 123 bool ThreadPool::addNewPacket(PacketInfo &&packet) {
smoritaemb 0:580aba13d1a1 124 bool res = m_queueIncoming.moveElementIntoBuffer(std::move(packet));
smoritaemb 0:580aba13d1a1 125 if (res) {
smoritaemb 0:580aba13d1a1 126 sys_sem_signal(&m_readerNotificationSem);
smoritaemb 0:580aba13d1a1 127 }
smoritaemb 0:580aba13d1a1 128 return res;
smoritaemb 0:580aba13d1a1 129 }
smoritaemb 0:580aba13d1a1 130
smoritaemb 0:580aba13d1a1 131 void ThreadPool::writerThreadFunction(void *arg) {
smoritaemb 0:580aba13d1a1 132 auto pool = static_cast<ThreadPool *>(arg);
smoritaemb 0:580aba13d1a1 133 if (pool == nullptr) {
smoritaemb 0:580aba13d1a1 134 #if THREAD_POOL_VERBOSE
smoritaemb 0:580aba13d1a1 135 printf("nullptr passed to writer function\n");
smoritaemb 0:580aba13d1a1 136 #endif
smoritaemb 0:580aba13d1a1 137 return;
smoritaemb 0:580aba13d1a1 138 }
smoritaemb 0:580aba13d1a1 139
smoritaemb 0:580aba13d1a1 140 pool->doWriterWork();
smoritaemb 0:580aba13d1a1 141 }
smoritaemb 0:580aba13d1a1 142
smoritaemb 0:580aba13d1a1 143 void ThreadPool::doWriterWork() {
smoritaemb 0:580aba13d1a1 144 while (m_running) {
smoritaemb 0:580aba13d1a1 145 Writer *workload;
smoritaemb 0:580aba13d1a1 146 auto isWorkToDo = m_queueOutgoing.moveFirstInto(workload);
smoritaemb 0:580aba13d1a1 147 if (!isWorkToDo) {
smoritaemb 0:580aba13d1a1 148 sys_sem_wait(&m_writerNotificationSem);
smoritaemb 0:580aba13d1a1 149 continue;
smoritaemb 0:580aba13d1a1 150 }
smoritaemb 0:580aba13d1a1 151
smoritaemb 0:580aba13d1a1 152 workload->progress();
smoritaemb 0:580aba13d1a1 153 }
smoritaemb 0:580aba13d1a1 154 }
smoritaemb 0:580aba13d1a1 155
smoritaemb 0:580aba13d1a1 156 void ThreadPool::readCallback(void *args, udp_pcb *target, pbuf *pbuf,
smoritaemb 0:580aba13d1a1 157 const ip_addr_t * /*addr*/, Ip4Port_t port) {
smoritaemb 0:580aba13d1a1 158 auto &pool = *static_cast<ThreadPool *>(args);
smoritaemb 0:580aba13d1a1 159
smoritaemb 0:580aba13d1a1 160 PacketInfo packet;
smoritaemb 0:580aba13d1a1 161 packet.destAddr = {0}; // not relevant
smoritaemb 0:580aba13d1a1 162 packet.destPort = target->local_port;
smoritaemb 0:580aba13d1a1 163 packet.srcPort = port;
smoritaemb 0:580aba13d1a1 164 packet.buffer = PBufWrapper{pbuf};
smoritaemb 0:580aba13d1a1 165 if (!pool.addNewPacket(std::move(packet))) {
smoritaemb 0:580aba13d1a1 166 #if THREAD_POOL_VERBOSE
smoritaemb 0:580aba13d1a1 167 printf("ThreadPool: dropped packet\n");
smoritaemb 0:580aba13d1a1 168 #endif
smoritaemb 0:580aba13d1a1 169 }
smoritaemb 0:580aba13d1a1 170 }
smoritaemb 0:580aba13d1a1 171
smoritaemb 0:580aba13d1a1 172 void ThreadPool::readerThreadFunction(void *arg) {
smoritaemb 0:580aba13d1a1 173 auto pool = static_cast<ThreadPool *>(arg);
smoritaemb 0:580aba13d1a1 174 if (pool == nullptr) {
smoritaemb 0:580aba13d1a1 175 #if THREAD_POOL_VERBOSE
smoritaemb 0:580aba13d1a1 176 printf("nullptr passed to reader function\n");
smoritaemb 0:580aba13d1a1 177 #endif
smoritaemb 0:580aba13d1a1 178 return;
smoritaemb 0:580aba13d1a1 179 }
smoritaemb 0:580aba13d1a1 180 pool->doReaderWork();
smoritaemb 0:580aba13d1a1 181 }
smoritaemb 0:580aba13d1a1 182
smoritaemb 0:580aba13d1a1 183 void ThreadPool::doReaderWork() {
smoritaemb 0:580aba13d1a1 184
smoritaemb 0:580aba13d1a1 185 while (m_running) {
smoritaemb 0:580aba13d1a1 186 PacketInfo packet;
smoritaemb 0:580aba13d1a1 187 auto isWorkToDo = m_queueIncoming.moveFirstInto(packet);
smoritaemb 0:580aba13d1a1 188 if (!isWorkToDo) {
smoritaemb 0:580aba13d1a1 189 sys_sem_wait(&m_readerNotificationSem);
smoritaemb 0:580aba13d1a1 190 continue;
smoritaemb 0:580aba13d1a1 191 }
smoritaemb 0:580aba13d1a1 192
smoritaemb 0:580aba13d1a1 193 m_receiveJumppad(m_callee, const_cast<const PacketInfo &>(packet));
smoritaemb 0:580aba13d1a1 194 }
smoritaemb 0:580aba13d1a1 195 }
smoritaemb 0:580aba13d1a1 196
smoritaemb 0:580aba13d1a1 197 void callWriterThreadFunction(void *arg){
smoritaemb 0:580aba13d1a1 198 ThreadPool::writerThreadFunction(arg);
smoritaemb 0:580aba13d1a1 199 }
smoritaemb 0:580aba13d1a1 200
smoritaemb 0:580aba13d1a1 201 void callReaderThreadFunction(void *arg){
smoritaemb 0:580aba13d1a1 202 ThreadPool::readerThreadFunction(arg);
smoritaemb 0:580aba13d1a1 203 }
smoritaemb 0:580aba13d1a1 204
smoritaemb 0:580aba13d1a1 205 #undef THREAD_POOL_VERBOSE