libuav original
Dependents: UAVCAN UAVCAN_Subscriber
test_multithreading.cpp
00001 /* 00002 * Copyright (C) 2015 Pavel Kirienko <pavel.kirienko@gmail.com> 00003 */ 00004 00005 #ifndef NDEBUG 00006 # define UAVCAN_DEBUG 1 00007 #endif 00008 00009 #include <iostream> 00010 #include <thread> 00011 #include <condition_variable> 00012 #include <uavcan_linux/uavcan_linux.hpp> 00013 #include <uavcan/node/sub_node.hpp> 00014 #include <uavcan/protocol/node_status_monitor.hpp> 00015 #include <uavcan/protocol/debug/KeyValue.hpp> 00016 #include "debug.hpp" 00017 00018 /** 00019 * Generic queue based on the linked list class defined in libuavcan. 00020 * This class does not use heap memory. 00021 */ 00022 template <typename T> 00023 class Queue 00024 { 00025 struct Item : public uavcan::LinkedListNode<Item> 00026 { 00027 T payload; 00028 00029 template <typename... Args> 00030 Item(Args... args) : payload(args...) { } 00031 }; 00032 00033 uavcan::LimitedPoolAllocator allocator_; 00034 uavcan::LinkedListRoot<Item> list_; 00035 00036 public: 00037 Queue(uavcan::IPoolAllocator& arg_allocator, std::size_t block_allocation_quota) : 00038 allocator_(arg_allocator, block_allocation_quota) 00039 { 00040 uavcan::IsDynamicallyAllocatable<Item>::check(); 00041 } 00042 00043 bool isEmpty() const { return list_.isEmpty(); } 00044 00045 /** 00046 * Creates one item in-place at the end of the list. 00047 * Returns true if the item was appended successfully, false if there's not enough memory. 00048 * Complexity is O(N) where N is queue length. 00049 */ 00050 template <typename... Args> 00051 bool tryEmplace(Args... args) 00052 { 00053 // Allocating memory 00054 void* const ptr = allocator_.allocate(sizeof(Item)); 00055 if (ptr == nullptr) 00056 { 00057 return false; 00058 } 00059 00060 // Constructing the new item 00061 Item* const item = new (ptr) Item(args...); 00062 assert(item != nullptr); 00063 00064 // Inserting the new item at the end of the list 00065 Item* p = list_.get(); 00066 if (p == nullptr) 00067 { 00068 list_.insert(item); 00069 } 00070 else 00071 { 00072 while (p->getNextListNode() != nullptr) 00073 { 00074 p = p->getNextListNode(); 00075 } 00076 assert(p->getNextListNode() == nullptr); 00077 p->setNextListNode(item); 00078 assert(p->getNextListNode()->getNextListNode() == nullptr); 00079 } 00080 00081 return true; 00082 } 00083 00084 /** 00085 * Accesses the first element. 00086 * Nullptr will be returned if the queue is empty. 00087 * Complexity is O(1). 00088 */ 00089 T* peek() { return isEmpty() ? nullptr : &list_.get()->payload; } 00090 const T* peek() const { return isEmpty() ? nullptr : &list_.get()->payload; } 00091 00092 /** 00093 * Removes the first element. 00094 * If the queue is empty, nothing will be done and assertion failure will be triggered. 00095 * Complexity is O(1). 00096 */ 00097 void pop() 00098 { 00099 Item* const item = list_.get(); 00100 assert(item != nullptr); 00101 if (item != nullptr) 00102 { 00103 list_.remove(item); 00104 item->~Item(); 00105 allocator_.deallocate(item); 00106 } 00107 } 00108 }; 00109 00110 /** 00111 * Feel free to remove. 00112 */ 00113 static void testQueue() 00114 { 00115 uavcan::PoolAllocator<1024, uavcan::MemPoolBlockSize> allocator; 00116 Queue<typename uavcan::MakeString<50>::Type> q(allocator, 4); 00117 ENFORCE(q.isEmpty()); 00118 ENFORCE(q.peek() == nullptr); 00119 ENFORCE(q.tryEmplace("One")); 00120 ENFORCE(q.tryEmplace("Two")); 00121 ENFORCE(q.tryEmplace("Three")); 00122 ENFORCE(q.tryEmplace("Four")); 00123 ENFORCE(!q.tryEmplace("Five")); 00124 ENFORCE(*q.peek() == "One"); 00125 q.pop(); 00126 ENFORCE(*q.peek() == "Two"); 00127 q.pop(); 00128 ENFORCE(*q.peek() == "Three"); 00129 q.pop(); 00130 ENFORCE(*q.peek() == "Four"); 00131 q.pop(); 00132 ENFORCE(q.isEmpty()); 00133 ENFORCE(q.peek() == nullptr); 00134 } 00135 00136 /** 00137 * Objects of this class are owned by the sub-node thread. 00138 * This class does not use heap memory. 00139 */ 00140 class VirtualCanIface : public uavcan::ICanIface, 00141 uavcan::Noncopyable 00142 { 00143 struct RxItem 00144 { 00145 const uavcan::CanRxFrame frame; 00146 const uavcan::CanIOFlags flags; 00147 00148 RxItem(const uavcan::CanRxFrame& arg_frame, uavcan::CanIOFlags arg_flags) : 00149 frame(arg_frame), 00150 flags(arg_flags) 00151 { } 00152 }; 00153 00154 std::mutex& mutex_; 00155 uavcan::CanTxQueue prioritized_tx_queue_; 00156 Queue<RxItem> rx_queue_; 00157 00158 int16_t send(const uavcan::CanFrame& frame, uavcan::MonotonicTime tx_deadline, uavcan::CanIOFlags flags) override 00159 { 00160 std::lock_guard<std::mutex> lock(mutex_); 00161 prioritized_tx_queue_.push(frame, tx_deadline, uavcan::CanTxQueue::Volatile, flags); 00162 return 1; 00163 } 00164 00165 int16_t receive(uavcan::CanFrame& out_frame, uavcan::MonotonicTime& out_ts_monotonic, 00166 uavcan::UtcTime& out_ts_utc, uavcan::CanIOFlags& out_flags) override 00167 { 00168 std::lock_guard<std::mutex> lock(mutex_); 00169 00170 if (rx_queue_.isEmpty()) 00171 { 00172 return 0; 00173 } 00174 00175 const auto item = *rx_queue_.peek(); 00176 rx_queue_.pop(); 00177 00178 out_frame = item.frame; 00179 out_ts_monotonic = item.frame.ts_mono; 00180 out_ts_utc = item.frame.ts_utc; 00181 out_flags = item.flags; 00182 00183 return 1; 00184 } 00185 00186 int16_t configureFilters(const uavcan::CanFilterConfig*, std::uint16_t) override { return -uavcan::ErrDriver; } 00187 uint16_t getNumFilters() const override { return 0; } 00188 uint64_t getErrorCount() const override { return 0; } 00189 00190 public: 00191 VirtualCanIface(uavcan::IPoolAllocator& allocator, uavcan::ISystemClock& clock, 00192 std::mutex& arg_mutex, unsigned quota_per_queue) : 00193 mutex_(arg_mutex), 00194 prioritized_tx_queue_(allocator, clock, quota_per_queue), 00195 rx_queue_(allocator, quota_per_queue) 00196 { } 00197 00198 /** 00199 * Note that RX queue overwrites oldest items when overflowed. 00200 * Call this from the main thread only. 00201 * No additional locking is required. 00202 */ 00203 void addRxFrame(const uavcan::CanRxFrame& frame, uavcan::CanIOFlags flags) 00204 { 00205 std::lock_guard<std::mutex> lock(mutex_); 00206 if (!rx_queue_.tryEmplace(frame, flags) && !rx_queue_.isEmpty()) 00207 { 00208 rx_queue_.pop(); 00209 (void)rx_queue_.tryEmplace(frame, flags); 00210 } 00211 } 00212 00213 /** 00214 * Call this from the main thread only. 00215 * No additional locking is required. 00216 */ 00217 void flushTxQueueTo(uavcan::INode& main_node, std::uint8_t iface_index) 00218 { 00219 std::lock_guard<std::mutex> lock(mutex_); 00220 00221 const std::uint8_t iface_mask = static_cast<std::uint8_t>(1U << iface_index); 00222 00223 while (auto e = prioritized_tx_queue_.peek()) 00224 { 00225 UAVCAN_TRACE("VirtualCanIface", "TX injection [iface=0x%02x]: %s", 00226 unsigned(iface_mask), e->toString().c_str()); 00227 00228 const int res = main_node.injectTxFrame(e->frame, e->deadline, iface_mask, 00229 uavcan::CanTxQueue::Qos(e->qos), e->flags); 00230 prioritized_tx_queue_.remove(e); 00231 if (res <= 0) 00232 { 00233 break; 00234 } 00235 } 00236 } 00237 00238 /** 00239 * Call this from the sub-node thread only. 00240 * No additional locking is required. 00241 */ 00242 bool hasDataInRxQueue() const 00243 { 00244 std::lock_guard<std::mutex> lock(mutex_); 00245 return !rx_queue_.isEmpty(); 00246 } 00247 }; 00248 00249 /** 00250 * This interface defines one method that will be called by the main node thread periodically in order to 00251 * transfer contents of TX queue of the sub-node into the TX queue of the main node. 00252 */ 00253 class ITxQueueInjector 00254 { 00255 public: 00256 virtual ~ITxQueueInjector() { } 00257 00258 /** 00259 * Flush contents of TX queues into the main node. 00260 * @param main_node Reference to the main node. 00261 */ 00262 virtual void injectTxFramesInto(uavcan::INode& main_node) = 0; 00263 }; 00264 00265 /** 00266 * Objects of this class are owned by the sub-node thread. 00267 * This class does not use heap memory. 00268 * @tparam SharedMemoryPoolSize Amount of memory, in bytes, that will be statically allocated for the 00269 * memory pool that will be shared across all interfaces for RX/TX queues. 00270 * Typically this value should be no less than 4K per interface. 00271 */ 00272 template <unsigned SharedMemoryPoolSize> 00273 class VirtualCanDriver : public uavcan::ICanDriver, 00274 public uavcan::IRxFrameListener, 00275 public ITxQueueInjector, 00276 uavcan::Noncopyable 00277 { 00278 class Event 00279 { 00280 std::mutex m_; 00281 std::condition_variable cv_; 00282 00283 public: 00284 /** 00285 * Note that this method may return spuriously. 00286 */ 00287 void waitFor(uavcan::MonotonicDuration duration) 00288 { 00289 std::unique_lock<std::mutex> lk(m_); 00290 (void)cv_.wait_for(lk, std::chrono::microseconds(duration.toUSec())); 00291 } 00292 00293 void signal() { cv_.notify_all(); } 00294 }; 00295 00296 Event event_; ///< Used to unblock the select() call when IO happens. 00297 std::mutex mutex_; ///< Shared across all ifaces 00298 uavcan::PoolAllocator<SharedMemoryPoolSize, uavcan::MemPoolBlockSize> allocator_; ///< Shared across all ifaces 00299 uavcan::LazyConstructor<VirtualCanIface> ifaces_[uavcan::MaxCanIfaces]; 00300 const unsigned num_ifaces_; 00301 uavcan_linux::SystemClock clock_; 00302 00303 uavcan::ICanIface* getIface(uint8_t iface_index) override 00304 { 00305 return (iface_index < num_ifaces_) ? ifaces_[iface_index].operator VirtualCanIface*() : nullptr; 00306 } 00307 00308 uint8_t getNumIfaces() const override { return num_ifaces_; } 00309 00310 /** 00311 * This and other methods of ICanDriver will be invoked by the sub-node thread. 00312 */ 00313 int16_t select(uavcan::CanSelectMasks& inout_masks, 00314 const uavcan::CanFrame* (&)[uavcan::MaxCanIfaces], 00315 uavcan::MonotonicTime blocking_deadline) override 00316 { 00317 bool need_block = (inout_masks.write == 0); // Write queue is infinite 00318 for (unsigned i = 0; need_block && (i < num_ifaces_); i++) 00319 { 00320 const bool need_read = inout_masks.read & (1U << i); 00321 if (need_read && ifaces_[i]->hasDataInRxQueue()) 00322 { 00323 need_block = false; 00324 } 00325 } 00326 00327 if (need_block) 00328 { 00329 event_.waitFor(blocking_deadline - clock_.getMonotonic()); 00330 } 00331 00332 inout_masks = uavcan::CanSelectMasks(); 00333 for (unsigned i = 0; i < num_ifaces_; i++) 00334 { 00335 const std::uint8_t iface_mask = 1U << i; 00336 inout_masks.write |= iface_mask; // Always ready to write 00337 if (ifaces_[i]->hasDataInRxQueue()) 00338 { 00339 inout_masks.read |= iface_mask; 00340 } 00341 } 00342 00343 return num_ifaces_; // We're always ready to write, hence > 0. 00344 } 00345 00346 /** 00347 * This handler will be invoked by the main node thread. 00348 */ 00349 void handleRxFrame(const uavcan::CanRxFrame& frame, uavcan::CanIOFlags flags) override 00350 { 00351 UAVCAN_TRACE("VirtualCanDriver", "RX [flags=%u]: %s", unsigned(flags), frame.toString().c_str()); 00352 if (frame.iface_index < num_ifaces_) 00353 { 00354 ifaces_[frame.iface_index]->addRxFrame(frame, flags); 00355 event_.signal(); 00356 } 00357 } 00358 00359 /** 00360 * This method will be invoked by the main node thread. 00361 */ 00362 void injectTxFramesInto(uavcan::INode& main_node) override 00363 { 00364 for (unsigned i = 0; i < num_ifaces_; i++) 00365 { 00366 ifaces_[i]->flushTxQueueTo(main_node, i); 00367 } 00368 event_.signal(); 00369 } 00370 00371 public: 00372 VirtualCanDriver(unsigned arg_num_ifaces) : num_ifaces_(arg_num_ifaces) 00373 { 00374 assert(num_ifaces_ > 0 && num_ifaces_ <= uavcan::MaxCanIfaces); 00375 00376 const unsigned quota_per_iface = allocator_.getBlockCapacity() / num_ifaces_; 00377 const unsigned quota_per_queue = quota_per_iface; // 2x overcommit 00378 00379 UAVCAN_TRACE("VirtualCanDriver", "Total blocks: %u, quota per queue: %u", 00380 unsigned(allocator_.getBlockCapacity()), unsigned(quota_per_queue)); 00381 00382 for (unsigned i = 0; i < num_ifaces_; i++) 00383 { 00384 ifaces_[i].template construct<uavcan::IPoolAllocator&, uavcan::ISystemClock&, 00385 std::mutex&, unsigned>(allocator_, clock_, mutex_, quota_per_queue); 00386 } 00387 } 00388 }; 00389 00390 static uavcan_linux::NodePtr initMainNode(const std::vector<std::string>& ifaces, uavcan::NodeID nid, 00391 const std::string& name) 00392 { 00393 std::cout << "Initializing main node" << std::endl; 00394 00395 auto node = uavcan_linux::makeNode(ifaces, name.c_str(), 00396 uavcan::protocol::SoftwareVersion(), uavcan::protocol::HardwareVersion(), nid); 00397 node->getLogger().setLevel(uavcan::protocol::debug::LogLevel::DEBUG); 00398 node->setModeOperational(); 00399 return node; 00400 } 00401 00402 template <unsigned QueuePoolSize> 00403 static uavcan_linux::SubNodePtr initSubNode(unsigned num_ifaces, uavcan::INode& main_node) 00404 { 00405 std::cout << "Initializing sub node" << std::endl; 00406 00407 typedef VirtualCanDriver<QueuePoolSize> Driver; 00408 00409 std::shared_ptr<Driver> driver(new Driver(num_ifaces)); 00410 00411 auto node = uavcan_linux::makeSubNode(driver, main_node.getNodeID()); 00412 00413 main_node.getDispatcher().installRxFrameListener(driver.get()); 00414 00415 return node; 00416 } 00417 00418 static void runMainNode(const uavcan_linux::NodePtr& node) 00419 { 00420 std::cout << "Running main node" << std::endl; 00421 00422 auto timer = node->makeTimer(uavcan::MonotonicDuration::fromMSec(10000), [&node](const uavcan::TimerEvent&) 00423 { 00424 node->logInfo("timer", "Your time is running out."); 00425 // coverity[dont_call] 00426 node->setVendorSpecificStatusCode(static_cast<std::uint16_t>(std::rand())); 00427 }); 00428 00429 /* 00430 * We know that in this implementation, VirtualCanDriver inherits uavcan::IRxFrameListener, so we can simply 00431 * restore the reference to ITxQueueInjector using dynamic_cast. In other implementations this may be 00432 * unacceptable, so a reference to ITxQueueInjector will have to be passed using some other means. 00433 */ 00434 if (node->getDispatcher().getRxFrameListener() == nullptr) 00435 { 00436 throw std::logic_error("RX frame listener is not configured"); 00437 } 00438 ITxQueueInjector& tx_injector = dynamic_cast<ITxQueueInjector&>(*node->getDispatcher().getRxFrameListener()); 00439 00440 while (true) 00441 { 00442 const int res = node->spin(uavcan::MonotonicDuration::fromMSec(1)); 00443 if (res < 0) 00444 { 00445 node->logError("spin", "Error %*", res); 00446 } 00447 // TX queue transfer occurs here. 00448 tx_injector.injectTxFramesInto(*node); 00449 } 00450 } 00451 00452 static void runSubNode(const uavcan_linux::SubNodePtr& node) 00453 { 00454 std::cout << "Running sub node" << std::endl; 00455 00456 /* 00457 * Log subscriber 00458 */ 00459 auto log_sub = node->makeSubscriber<uavcan::protocol::debug::LogMessage>( 00460 [](const uavcan::ReceivedDataStructure<uavcan::protocol::debug::LogMessage>& msg) 00461 { 00462 std::cout << msg << std::endl; 00463 }); 00464 00465 /* 00466 * Node status monitor 00467 */ 00468 struct NodeStatusMonitor : public uavcan::NodeStatusMonitor 00469 { 00470 explicit NodeStatusMonitor(uavcan::INode& node) : uavcan::NodeStatusMonitor(node) { } 00471 00472 virtual void handleNodeStatusChange(const NodeStatusChangeEvent& event) override 00473 { 00474 std::cout << "Remote node NID " << int(event.node_id.get()) << " changed status: " 00475 << event.old_status.toString() << " --> " << event.status.toString() << std::endl; 00476 } 00477 }; 00478 NodeStatusMonitor nsm(*node); 00479 ENFORCE(0 == nsm.start()); 00480 00481 /* 00482 * KV subscriber 00483 */ 00484 auto kv_sub = node->makeSubscriber<uavcan::protocol::debug::KeyValue>( 00485 [](const uavcan::ReceivedDataStructure<uavcan::protocol::debug::KeyValue>& msg) 00486 { 00487 std::cout << msg << std::endl; 00488 }); 00489 00490 /* 00491 * KV publisher 00492 */ 00493 unsigned kv_value = 0; 00494 auto kv_pub = node->makePublisher<uavcan::protocol::debug::KeyValue>(); 00495 auto timer = node->makeTimer(uavcan::MonotonicDuration::fromMSec(5000), [&](const uavcan::TimerEvent&) 00496 { 00497 uavcan::protocol::debug::KeyValue kv; 00498 kv.key = "five_seconds"; 00499 kv.value = kv_value++; 00500 const int res = kv_pub->broadcast(kv); 00501 if (res < 0) 00502 { 00503 std::cerr << "Sub KV pub err " << res << std::endl; 00504 } 00505 }); 00506 00507 while (true) 00508 { 00509 const int res = node->spin(uavcan::MonotonicDuration::fromMSec(1000)); 00510 if (res < 0) 00511 { 00512 std::cerr << "SubNode spin error: " << res << std::endl; 00513 } 00514 } 00515 } 00516 00517 int main(int argc, const char** argv) 00518 { 00519 try 00520 { 00521 testQueue(); 00522 00523 constexpr unsigned VirtualIfacePoolSize = 32768; 00524 00525 if (argc < 3) 00526 { 00527 std::cerr << "Usage:\n\t" << argv[0] << " <node-id> <can-iface-name-1> [can-iface-name-N...]" << std::endl; 00528 return 1; 00529 } 00530 00531 const int self_node_id = std::stoi(argv[1]); 00532 std::vector<std::string> iface_names(argv + 2, argv + argc); 00533 00534 auto node = initMainNode(iface_names, self_node_id, "org.uavcan.linux_test_node"); 00535 auto sub_node = initSubNode<VirtualIfacePoolSize>(iface_names.size(), *node); 00536 00537 std::thread sub_thread([&sub_node](){ runSubNode(sub_node); }); 00538 00539 runMainNode(node); 00540 00541 if (sub_thread.joinable()) 00542 { 00543 std::cout << "Waiting for the sub thread to join" << std::endl; 00544 sub_thread.join(); 00545 } 00546 00547 return 0; 00548 } 00549 catch (const std::exception& ex) 00550 { 00551 std::cerr << "Exception: " << ex.what() << std::endl; 00552 return 1; 00553 } 00554 }
Generated on Tue Jul 12 2022 17:17:34 by 1.7.2