libuav original

Dependents:   UAVCAN UAVCAN_Subscriber

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers test_multithreading.cpp Source File

test_multithreading.cpp

00001 /*
00002  * Copyright (C) 2015 Pavel Kirienko <pavel.kirienko@gmail.com>
00003  */
00004 
00005 #ifndef NDEBUG
00006 # define UAVCAN_DEBUG 1
00007 #endif
00008 
00009 #include <iostream>
00010 #include <thread>
00011 #include <condition_variable>
00012 #include <uavcan_linux/uavcan_linux.hpp>
00013 #include <uavcan/node/sub_node.hpp>
00014 #include <uavcan/protocol/node_status_monitor.hpp>
00015 #include <uavcan/protocol/debug/KeyValue.hpp>
00016 #include "debug.hpp"
00017 
00018 /**
00019  * Generic queue based on the linked list class defined in libuavcan.
00020  * This class does not use heap memory.
00021  */
00022 template <typename T>
00023 class Queue
00024 {
00025     struct Item : public uavcan::LinkedListNode<Item>
00026     {
00027         T payload;
00028 
00029         template <typename... Args>
00030         Item(Args... args) : payload(args...) { }
00031     };
00032 
00033     uavcan::LimitedPoolAllocator allocator_;
00034     uavcan::LinkedListRoot<Item>  list_;
00035 
00036 public:
00037     Queue(uavcan::IPoolAllocator& arg_allocator, std::size_t block_allocation_quota) :
00038         allocator_(arg_allocator, block_allocation_quota)
00039     {
00040         uavcan::IsDynamicallyAllocatable<Item>::check();
00041     }
00042 
00043     bool isEmpty() const { return list_.isEmpty(); }
00044 
00045     /**
00046      * Creates one item in-place at the end of the list.
00047      * Returns true if the item was appended successfully, false if there's not enough memory.
00048      * Complexity is O(N) where N is queue length.
00049      */
00050     template <typename... Args>
00051     bool tryEmplace(Args... args)
00052     {
00053         // Allocating memory
00054         void* const ptr = allocator_.allocate(sizeof(Item));
00055         if (ptr == nullptr)
00056         {
00057             return false;
00058         }
00059 
00060         // Constructing the new item
00061         Item* const item = new (ptr) Item(args...);
00062         assert(item != nullptr);
00063 
00064         // Inserting the new item at the end of the list
00065         Item* p = list_.get();
00066         if (p == nullptr)
00067         {
00068             list_.insert(item);
00069         }
00070         else
00071         {
00072             while (p->getNextListNode() != nullptr)
00073             {
00074                 p = p->getNextListNode();
00075             }
00076             assert(p->getNextListNode() == nullptr);
00077             p->setNextListNode(item);
00078             assert(p->getNextListNode()->getNextListNode() == nullptr);
00079         }
00080 
00081         return true;
00082     }
00083 
00084     /**
00085      * Accesses the first element.
00086      * Nullptr will be returned if the queue is empty.
00087      * Complexity is O(1).
00088      */
00089     T*       peek()       { return isEmpty() ? nullptr : &list_.get()->payload; }
00090     const T* peek() const { return isEmpty() ? nullptr : &list_.get()->payload; }
00091 
00092     /**
00093      * Removes the first element.
00094      * If the queue is empty, nothing will be done and assertion failure will be triggered.
00095      * Complexity is O(1).
00096      */
00097     void pop()
00098     {
00099         Item* const item = list_.get();
00100         assert(item != nullptr);
00101         if (item != nullptr)
00102         {
00103             list_.remove(item);
00104             item->~Item();
00105             allocator_.deallocate(item);
00106         }
00107     }
00108 };
00109 
00110 /**
00111  * Feel free to remove.
00112  */
00113 static void testQueue()
00114 {
00115     uavcan::PoolAllocator<1024, uavcan::MemPoolBlockSize> allocator;
00116     Queue<typename uavcan::MakeString<50>::Type> q(allocator, 4);
00117     ENFORCE(q.isEmpty());
00118     ENFORCE(q.peek() == nullptr);
00119     ENFORCE(q.tryEmplace("One"));
00120     ENFORCE(q.tryEmplace("Two"));
00121     ENFORCE(q.tryEmplace("Three"));
00122     ENFORCE(q.tryEmplace("Four"));
00123     ENFORCE(!q.tryEmplace("Five"));
00124     ENFORCE(*q.peek() == "One");
00125     q.pop();
00126     ENFORCE(*q.peek() == "Two");
00127     q.pop();
00128     ENFORCE(*q.peek() == "Three");
00129     q.pop();
00130     ENFORCE(*q.peek() == "Four");
00131     q.pop();
00132     ENFORCE(q.isEmpty());
00133     ENFORCE(q.peek() == nullptr);
00134 }
00135 
00136 /**
00137  * Objects of this class are owned by the sub-node thread.
00138  * This class does not use heap memory.
00139  */
00140 class VirtualCanIface : public uavcan::ICanIface,
00141                         uavcan::Noncopyable
00142 {
00143     struct RxItem
00144     {
00145         const uavcan::CanRxFrame frame;
00146         const uavcan::CanIOFlags flags;
00147 
00148         RxItem(const uavcan::CanRxFrame& arg_frame, uavcan::CanIOFlags arg_flags) :
00149             frame(arg_frame),
00150             flags(arg_flags)
00151         { }
00152     };
00153 
00154     std::mutex& mutex_;
00155     uavcan::CanTxQueue prioritized_tx_queue_;
00156     Queue<RxItem> rx_queue_;
00157 
00158     int16_t send(const uavcan::CanFrame& frame, uavcan::MonotonicTime tx_deadline, uavcan::CanIOFlags flags) override
00159     {
00160         std::lock_guard<std::mutex> lock(mutex_);
00161         prioritized_tx_queue_.push(frame, tx_deadline, uavcan::CanTxQueue::Volatile, flags);
00162         return 1;
00163     }
00164 
00165     int16_t receive(uavcan::CanFrame& out_frame, uavcan::MonotonicTime& out_ts_monotonic,
00166                     uavcan::UtcTime& out_ts_utc, uavcan::CanIOFlags& out_flags) override
00167     {
00168         std::lock_guard<std::mutex> lock(mutex_);
00169 
00170         if (rx_queue_.isEmpty())
00171         {
00172             return 0;
00173         }
00174 
00175         const auto item = *rx_queue_.peek();
00176         rx_queue_.pop();
00177 
00178         out_frame = item.frame;
00179         out_ts_monotonic = item.frame.ts_mono;
00180         out_ts_utc = item.frame.ts_utc;
00181         out_flags = item.flags;
00182 
00183         return 1;
00184     }
00185 
00186     int16_t configureFilters(const uavcan::CanFilterConfig*, std::uint16_t) override { return -uavcan::ErrDriver; }
00187     uint16_t getNumFilters() const override { return 0; }
00188     uint64_t getErrorCount() const override { return 0; }
00189 
00190 public:
00191     VirtualCanIface(uavcan::IPoolAllocator& allocator, uavcan::ISystemClock& clock,
00192                     std::mutex& arg_mutex, unsigned quota_per_queue) :
00193         mutex_(arg_mutex),
00194         prioritized_tx_queue_(allocator, clock, quota_per_queue),
00195         rx_queue_(allocator, quota_per_queue)
00196     { }
00197 
00198     /**
00199      * Note that RX queue overwrites oldest items when overflowed.
00200      * Call this from the main thread only.
00201      * No additional locking is required.
00202      */
00203     void addRxFrame(const uavcan::CanRxFrame& frame, uavcan::CanIOFlags flags)
00204     {
00205         std::lock_guard<std::mutex> lock(mutex_);
00206         if (!rx_queue_.tryEmplace(frame, flags) && !rx_queue_.isEmpty())
00207         {
00208             rx_queue_.pop();
00209             (void)rx_queue_.tryEmplace(frame, flags);
00210         }
00211     }
00212 
00213     /**
00214      * Call this from the main thread only.
00215      * No additional locking is required.
00216      */
00217     void flushTxQueueTo(uavcan::INode& main_node, std::uint8_t iface_index)
00218     {
00219         std::lock_guard<std::mutex> lock(mutex_);
00220 
00221         const std::uint8_t iface_mask = static_cast<std::uint8_t>(1U << iface_index);
00222 
00223         while (auto e = prioritized_tx_queue_.peek())
00224         {
00225             UAVCAN_TRACE("VirtualCanIface", "TX injection [iface=0x%02x]: %s",
00226                          unsigned(iface_mask), e->toString().c_str());
00227 
00228             const int res = main_node.injectTxFrame(e->frame, e->deadline, iface_mask,
00229                                                     uavcan::CanTxQueue::Qos(e->qos), e->flags);
00230             prioritized_tx_queue_.remove(e);
00231             if (res <= 0)
00232             {
00233                 break;
00234             }
00235         }
00236     }
00237 
00238     /**
00239      * Call this from the sub-node thread only.
00240      * No additional locking is required.
00241      */
00242     bool hasDataInRxQueue() const
00243     {
00244         std::lock_guard<std::mutex> lock(mutex_);
00245         return !rx_queue_.isEmpty();
00246     }
00247 };
00248 
00249 /**
00250  * This interface defines one method that will be called by the main node thread periodically in order to
00251  * transfer contents of TX queue of the sub-node into the TX queue of the main node.
00252  */
00253 class ITxQueueInjector
00254 {
00255 public:
00256     virtual ~ITxQueueInjector() { }
00257 
00258     /**
00259      * Flush contents of TX queues into the main node.
00260      * @param main_node         Reference to the main node.
00261      */
00262     virtual void injectTxFramesInto(uavcan::INode& main_node) = 0;
00263 };
00264 
00265 /**
00266  * Objects of this class are owned by the sub-node thread.
00267  * This class does not use heap memory.
00268  * @tparam SharedMemoryPoolSize         Amount of memory, in bytes, that will be statically allocated for the
00269  *                                      memory pool that will be shared across all interfaces for RX/TX queues.
00270  *                                      Typically this value should be no less than 4K per interface.
00271  */
00272 template <unsigned SharedMemoryPoolSize>
00273 class VirtualCanDriver : public uavcan::ICanDriver,
00274                          public uavcan::IRxFrameListener,
00275                          public ITxQueueInjector,
00276                          uavcan::Noncopyable
00277 {
00278     class Event
00279     {
00280         std::mutex m_;
00281         std::condition_variable cv_;
00282 
00283     public:
00284         /**
00285          * Note that this method may return spuriously.
00286          */
00287         void waitFor(uavcan::MonotonicDuration duration)
00288         {
00289             std::unique_lock<std::mutex> lk(m_);
00290             (void)cv_.wait_for(lk, std::chrono::microseconds(duration.toUSec()));
00291         }
00292 
00293         void signal() { cv_.notify_all(); }
00294     };
00295 
00296     Event event_;               ///< Used to unblock the select() call when IO happens.
00297     std::mutex mutex_;                                                                  ///< Shared across all ifaces
00298     uavcan::PoolAllocator<SharedMemoryPoolSize, uavcan::MemPoolBlockSize>  allocator_;   ///< Shared across all ifaces
00299     uavcan::LazyConstructor<VirtualCanIface>  ifaces_[uavcan::MaxCanIfaces];
00300     const unsigned num_ifaces_;
00301     uavcan_linux::SystemClock clock_;
00302 
00303     uavcan::ICanIface* getIface(uint8_t iface_index) override
00304     {
00305         return (iface_index < num_ifaces_) ? ifaces_[iface_index].operator VirtualCanIface*() : nullptr;
00306     }
00307 
00308     uint8_t getNumIfaces() const override { return num_ifaces_; }
00309 
00310     /**
00311      * This and other methods of ICanDriver will be invoked by the sub-node thread.
00312      */
00313     int16_t select(uavcan::CanSelectMasks& inout_masks,
00314                    const uavcan::CanFrame* (&)[uavcan::MaxCanIfaces],
00315                    uavcan::MonotonicTime blocking_deadline) override
00316     {
00317         bool need_block = (inout_masks.write == 0);    // Write queue is infinite
00318         for (unsigned i = 0; need_block && (i < num_ifaces_); i++)
00319         {
00320             const bool need_read = inout_masks.read & (1U << i);
00321             if (need_read && ifaces_[i]->hasDataInRxQueue())
00322             {
00323                 need_block = false;
00324             }
00325         }
00326 
00327         if (need_block)
00328         {
00329             event_.waitFor(blocking_deadline - clock_.getMonotonic());
00330         }
00331 
00332         inout_masks = uavcan::CanSelectMasks();
00333         for (unsigned i = 0; i < num_ifaces_; i++)
00334         {
00335             const std::uint8_t iface_mask = 1U << i;
00336             inout_masks.write |= iface_mask;           // Always ready to write
00337             if (ifaces_[i]->hasDataInRxQueue())
00338             {
00339                 inout_masks.read |= iface_mask;
00340             }
00341         }
00342 
00343         return num_ifaces_;       // We're always ready to write, hence > 0.
00344     }
00345 
00346     /**
00347      * This handler will be invoked by the main node thread.
00348      */
00349     void handleRxFrame(const uavcan::CanRxFrame& frame, uavcan::CanIOFlags flags) override
00350     {
00351         UAVCAN_TRACE("VirtualCanDriver", "RX [flags=%u]: %s", unsigned(flags), frame.toString().c_str());
00352         if (frame.iface_index < num_ifaces_)
00353         {
00354             ifaces_[frame.iface_index]->addRxFrame(frame, flags);
00355             event_.signal();
00356         }
00357     }
00358 
00359     /**
00360      * This method will be invoked by the main node thread.
00361      */
00362     void injectTxFramesInto(uavcan::INode& main_node) override
00363     {
00364         for (unsigned i = 0; i < num_ifaces_; i++)
00365         {
00366             ifaces_[i]->flushTxQueueTo(main_node, i);
00367         }
00368         event_.signal();
00369     }
00370 
00371 public:
00372     VirtualCanDriver(unsigned arg_num_ifaces) : num_ifaces_(arg_num_ifaces)
00373     {
00374         assert(num_ifaces_ > 0 && num_ifaces_ <= uavcan::MaxCanIfaces);
00375 
00376         const unsigned quota_per_iface = allocator_.getBlockCapacity() / num_ifaces_;
00377         const unsigned quota_per_queue = quota_per_iface;             // 2x overcommit
00378 
00379         UAVCAN_TRACE("VirtualCanDriver", "Total blocks: %u, quota per queue: %u",
00380                      unsigned(allocator_.getBlockCapacity()), unsigned(quota_per_queue));
00381 
00382         for (unsigned i = 0; i < num_ifaces_; i++)
00383         {
00384             ifaces_[i].template construct<uavcan::IPoolAllocator&, uavcan::ISystemClock&,
00385                                           std::mutex&, unsigned>(allocator_, clock_, mutex_, quota_per_queue);
00386         }
00387     }
00388 };
00389 
00390 static uavcan_linux::NodePtr initMainNode(const std::vector<std::string>& ifaces, uavcan::NodeID nid,
00391                                           const std::string& name)
00392 {
00393     std::cout << "Initializing main node" << std::endl;
00394 
00395     auto node = uavcan_linux::makeNode(ifaces, name.c_str(),
00396                                        uavcan::protocol::SoftwareVersion(), uavcan::protocol::HardwareVersion(), nid);
00397     node->getLogger().setLevel(uavcan::protocol::debug::LogLevel::DEBUG);
00398     node->setModeOperational();
00399     return node;
00400 }
00401 
00402 template <unsigned QueuePoolSize>
00403 static uavcan_linux::SubNodePtr initSubNode(unsigned num_ifaces, uavcan::INode& main_node)
00404 {
00405     std::cout << "Initializing sub node" << std::endl;
00406 
00407     typedef VirtualCanDriver<QueuePoolSize> Driver;
00408 
00409     std::shared_ptr<Driver> driver(new Driver(num_ifaces));
00410 
00411     auto node = uavcan_linux::makeSubNode(driver, main_node.getNodeID());
00412 
00413     main_node.getDispatcher().installRxFrameListener(driver.get());
00414 
00415     return node;
00416 }
00417 
00418 static void runMainNode(const uavcan_linux::NodePtr& node)
00419 {
00420     std::cout << "Running main node" << std::endl;
00421 
00422     auto timer = node->makeTimer(uavcan::MonotonicDuration::fromMSec(10000), [&node](const uavcan::TimerEvent&)
00423         {
00424             node->logInfo("timer", "Your time is running out.");
00425             // coverity[dont_call]
00426             node->setVendorSpecificStatusCode(static_cast<std::uint16_t>(std::rand()));
00427         });
00428 
00429     /*
00430      * We know that in this implementation, VirtualCanDriver inherits uavcan::IRxFrameListener, so we can simply
00431      * restore the reference to ITxQueueInjector using dynamic_cast. In other implementations this may be
00432      * unacceptable, so a reference to ITxQueueInjector will have to be passed using some other means.
00433      */
00434     if (node->getDispatcher().getRxFrameListener() == nullptr)
00435     {
00436         throw std::logic_error("RX frame listener is not configured");
00437     }
00438     ITxQueueInjector& tx_injector = dynamic_cast<ITxQueueInjector&>(*node->getDispatcher().getRxFrameListener());
00439 
00440     while (true)
00441     {
00442         const int res = node->spin(uavcan::MonotonicDuration::fromMSec(1));
00443         if (res < 0)
00444         {
00445             node->logError("spin", "Error %*", res);
00446         }
00447         // TX queue transfer occurs here.
00448         tx_injector.injectTxFramesInto(*node);
00449     }
00450 }
00451 
00452 static void runSubNode(const uavcan_linux::SubNodePtr& node)
00453 {
00454     std::cout << "Running sub node" << std::endl;
00455 
00456     /*
00457      * Log subscriber
00458      */
00459     auto log_sub = node->makeSubscriber<uavcan::protocol::debug::LogMessage>(
00460         [](const uavcan::ReceivedDataStructure<uavcan::protocol::debug::LogMessage>& msg)
00461         {
00462             std::cout << msg << std::endl;
00463         });
00464 
00465     /*
00466      * Node status monitor
00467      */
00468     struct NodeStatusMonitor : public uavcan::NodeStatusMonitor
00469     {
00470         explicit NodeStatusMonitor(uavcan::INode& node) : uavcan::NodeStatusMonitor(node) { }
00471 
00472         virtual void handleNodeStatusChange(const NodeStatusChangeEvent& event) override
00473         {
00474             std::cout << "Remote node NID " << int(event.node_id.get()) << " changed status: "
00475                       << event.old_status.toString() << " --> " << event.status.toString() << std::endl;
00476         }
00477     };
00478     NodeStatusMonitor nsm(*node);
00479     ENFORCE(0 == nsm.start());
00480 
00481     /*
00482      * KV subscriber
00483      */
00484     auto kv_sub = node->makeSubscriber<uavcan::protocol::debug::KeyValue>(
00485         [](const uavcan::ReceivedDataStructure<uavcan::protocol::debug::KeyValue>& msg)
00486         {
00487             std::cout << msg << std::endl;
00488         });
00489 
00490     /*
00491      * KV publisher
00492      */
00493     unsigned kv_value = 0;
00494     auto kv_pub = node->makePublisher<uavcan::protocol::debug::KeyValue>();
00495     auto timer = node->makeTimer(uavcan::MonotonicDuration::fromMSec(5000), [&](const uavcan::TimerEvent&)
00496         {
00497             uavcan::protocol::debug::KeyValue kv;
00498             kv.key = "five_seconds";
00499             kv.value = kv_value++;
00500             const int res = kv_pub->broadcast(kv);
00501             if (res < 0)
00502             {
00503                 std::cerr << "Sub KV pub err " << res << std::endl;
00504             }
00505         });
00506 
00507     while (true)
00508     {
00509         const int res = node->spin(uavcan::MonotonicDuration::fromMSec(1000));
00510         if (res < 0)
00511         {
00512             std::cerr << "SubNode spin error: " << res << std::endl;
00513         }
00514     }
00515 }
00516 
00517 int main(int argc, const char** argv)
00518 {
00519     try
00520     {
00521         testQueue();
00522 
00523         constexpr unsigned VirtualIfacePoolSize = 32768;
00524 
00525         if (argc < 3)
00526         {
00527             std::cerr << "Usage:\n\t" << argv[0] << " <node-id> <can-iface-name-1> [can-iface-name-N...]" << std::endl;
00528             return 1;
00529         }
00530 
00531         const int self_node_id = std::stoi(argv[1]);
00532         std::vector<std::string> iface_names(argv + 2, argv + argc);
00533 
00534         auto node = initMainNode(iface_names, self_node_id, "org.uavcan.linux_test_node");
00535         auto sub_node = initSubNode<VirtualIfacePoolSize>(iface_names.size(), *node);
00536 
00537         std::thread sub_thread([&sub_node](){ runSubNode(sub_node); });
00538 
00539         runMainNode(node);
00540 
00541         if (sub_thread.joinable())
00542         {
00543             std::cout << "Waiting for the sub thread to join" << std::endl;
00544             sub_thread.join();
00545         }
00546 
00547         return 0;
00548     }
00549     catch (const std::exception& ex)
00550     {
00551         std::cerr << "Exception: " << ex.what() << std::endl;
00552         return 1;
00553     }
00554 }