Руслан Урядинский / libuavcan

Dependents:   UAVCAN UAVCAN_Subscriber

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers cluster_manager.hpp Source File

cluster_manager.hpp

00001 /*
00002  * Copyright (C) 2015 Pavel Kirienko <pavel.kirienko@gmail.com>
00003  */
00004 
00005 #ifndef UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_CLUSTER_MANAGER_HPP_INCLUDED
00006 #define UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_CLUSTER_MANAGER_HPP_INCLUDED
00007 
00008 #include <uavcan/build_config.hpp>
00009 #include <uavcan/debug.hpp>
00010 #include <uavcan/util/method_binder.hpp>
00011 #include <uavcan/node/timer.hpp>
00012 #include <uavcan/node/subscriber.hpp>
00013 #include <uavcan/node/publisher.hpp>
00014 #include <uavcan/protocol/dynamic_node_id_server/distributed/log.hpp>
00015 #include <uavcan/protocol/dynamic_node_id_server/distributed/types.hpp>
00016 #include <uavcan/protocol/dynamic_node_id_server/storage_marshaller.hpp>
00017 #include <uavcan/protocol/dynamic_node_id_server/event.hpp>
00018 // UAVCAN types
00019 #include <uavcan/protocol/dynamic_node_id/server/Discovery.hpp>
00020 
00021 namespace uavcan
00022 {
00023 namespace dynamic_node_id_server
00024 {
00025 namespace distributed
00026 {
00027 /**
00028  * This class maintains the cluster state.
00029  */
00030 class ClusterManager : private TimerBase
00031 {
00032 public:
00033     enum { MaxClusterSize = Discovery::FieldTypes::known_nodes::MaxSize };
00034 
00035 private:
00036     typedef MethodBinder<ClusterManager*,
00037                          void (ClusterManager::*)
00038                              (const ReceivedDataStructure<Discovery>&)>
00039         DiscoveryCallback;
00040 
00041     struct Server
00042     {
00043         NodeID node_id;
00044         Log::Index next_index;
00045         Log::Index match_index;
00046 
00047         Server()
00048             : next_index(0)
00049             , match_index(0)
00050         { }
00051 
00052         void resetIndices(const Log& log)
00053         {
00054             next_index = Log::Index(log.getLastIndex() + 1U);
00055             match_index = 0;
00056         }
00057     };
00058 
00059     IStorageBackend& storage_;
00060     IEventTracer& tracer_;
00061     const Log& log_;
00062 
00063     Subscriber<Discovery, DiscoveryCallback> discovery_sub_;
00064     mutable Publisher<Discovery>  discovery_pub_;
00065 
00066     Server servers_[MaxClusterSize - 1];   ///< Minus one because the local server is not listed there.
00067 
00068     uint8_t cluster_size_;
00069     uint8_t num_known_servers_;
00070 
00071     static IStorageBackend::String getStorageKeyForClusterSize() { return "cluster_size"; }
00072 
00073     INode&       getNode()       { return discovery_sub_.getNode(); }
00074     const INode& getNode() const { return discovery_sub_.getNode(); }
00075 
00076     const Server* findServer(NodeID node_id) const { return const_cast<ClusterManager*>(this)->findServer(node_id); }
00077     Server*       findServer(NodeID node_id)
00078     {
00079         for (uint8_t i = 0; i < num_known_servers_; i++)
00080         {
00081             UAVCAN_ASSERT(servers_[i].node_id.isUnicast());
00082             if (servers_[i].node_id == node_id)
00083             {
00084                 return &servers_[i];
00085             }
00086         }
00087         return UAVCAN_NULLPTR;
00088     }
00089 
00090     virtual void handleTimerEvent(const TimerEvent&)
00091     {
00092         UAVCAN_ASSERT(num_known_servers_ < cluster_size_);
00093 
00094         tracer_.onEvent(TraceRaftDiscoveryBroadcast, num_known_servers_);
00095 
00096         /*
00097          * Filling the message
00098          */
00099         Discovery msg;
00100         msg.configured_cluster_size = cluster_size_;
00101 
00102         msg.known_nodes.push_back(getNode().getNodeID().get());     // Putting ourselves at index 0
00103 
00104         for (uint8_t i = 0; i < num_known_servers_; i++)
00105         {
00106             UAVCAN_ASSERT(servers_[i].node_id.isUnicast());
00107             msg.known_nodes.push_back(servers_[i].node_id.get());
00108         }
00109 
00110         UAVCAN_ASSERT(msg.known_nodes.size() == (num_known_servers_ + 1));
00111 
00112         /*
00113          * Broadcasting
00114          */
00115         UAVCAN_TRACE("dynamic_node_id_server::distributed::ClusterManager",
00116                      "Broadcasting Discovery message; known nodes: %d of %d",
00117                      int(msg.known_nodes.size()), int(cluster_size_));
00118 
00119         const int res = discovery_pub_.broadcast(msg);
00120         if (res < 0)
00121         {
00122             UAVCAN_TRACE("dynamic_node_id_server::distributed::ClusterManager", "Discovery broadcst failed: %d", res);
00123             getNode().registerInternalFailure("Raft discovery broadcast");
00124         }
00125 
00126         /*
00127          * Termination condition
00128          */
00129         if (isClusterDiscovered())
00130         {
00131             UAVCAN_TRACE("dynamic_node_id_server::distributed::ClusterManager",
00132                          "Discovery broadcasting timer stopped");
00133             stop();
00134         }
00135     }
00136 
00137     void handleDiscovery(const ReceivedDataStructure<Discovery>& msg)
00138     {
00139         tracer_.onEvent(TraceRaftDiscoveryReceived, msg.getSrcNodeID().get());
00140 
00141         /*
00142          * Validating cluster configuration
00143          * If there's a case of misconfiguration, the message will be ignored.
00144          */
00145         if (msg.configured_cluster_size != cluster_size_)
00146         {
00147             tracer_.onEvent(TraceRaftBadClusterSizeReceived, msg.configured_cluster_size);
00148             getNode().registerInternalFailure("Bad Raft cluster size");
00149             return;
00150         }
00151 
00152         /*
00153          * Updating the set of known servers
00154          */
00155         for (uint8_t i = 0; i < msg.known_nodes.size(); i++)
00156         {
00157             if (isClusterDiscovered())
00158             {
00159                 break;
00160             }
00161 
00162             const NodeID node_id(msg.known_nodes[i]);
00163             if (node_id.isUnicast() && !isKnownServer(node_id))
00164             {
00165                 addServer(node_id);
00166             }
00167         }
00168 
00169         /*
00170          * Publishing a new Discovery request if the publishing server needs to learn about more servers.
00171          */
00172         if (msg.configured_cluster_size > msg.known_nodes.size())
00173         {
00174             startDiscoveryPublishingTimerIfNotRunning();
00175         }
00176     }
00177 
00178     void startDiscoveryPublishingTimerIfNotRunning()
00179     {
00180         if (!isRunning())
00181         {
00182             startPeriodic(MonotonicDuration::fromMSec(Discovery::BROADCASTING_PERIOD_MS));
00183         }
00184     }
00185 
00186 public:
00187     enum { ClusterSizeUnknown = 0 };
00188 
00189     /**
00190      * @param node          Needed to publish and subscribe to Discovery message
00191      * @param storage       Needed to read the cluster size parameter from the storage
00192      * @param log           Needed to initialize nextIndex[] values after elections
00193      */
00194     ClusterManager (INode& node, IStorageBackend& storage, const Log& log, IEventTracer& tracer)
00195         : TimerBase(node)
00196         , storage_(storage)
00197         , tracer_(tracer)
00198         , log_(log)
00199         , discovery_sub_(node)
00200         , discovery_pub_(node)
00201         , cluster_size_(0)
00202         , num_known_servers_(0)
00203     { }
00204 
00205     /**
00206      * If cluster_size is set to ClusterSizeUnknown, the class will try to read this parameter from the
00207      * storage backend using key 'cluster_size'.
00208      * Returns negative error code.
00209      */
00210     int init(const uint8_t init_cluster_size, const TransferPriority priority)
00211     {
00212         /*
00213          * Figuring out the cluster size
00214          */
00215         if (init_cluster_size == ClusterSizeUnknown)
00216         {
00217             // Reading from the storage
00218             StorageMarshaller io(storage_);
00219             uint32_t value = 0;
00220             int res = io.get(getStorageKeyForClusterSize(), value);
00221             if (res < 0)
00222             {
00223                 UAVCAN_TRACE("dynamic_node_id_server::distributed::ClusterManager",
00224                              "Cluster size is neither configured nor stored in the storage");
00225                 return res;
00226             }
00227             if ((value == 0) || (value > MaxClusterSize))
00228             {
00229                 UAVCAN_TRACE("dynamic_node_id_server::distributed::ClusterManager", "Cluster size is invalid");
00230                 return -ErrInvalidConfiguration;
00231             }
00232             cluster_size_ = static_cast<uint8_t>(value);
00233         }
00234         else
00235         {
00236             if ((init_cluster_size == 0) || (init_cluster_size > MaxClusterSize))
00237             {
00238                 return -ErrInvalidParam;
00239             }
00240             cluster_size_ = init_cluster_size;
00241 
00242             // Writing the storage
00243             StorageMarshaller io(storage_);
00244             uint32_t value = init_cluster_size;
00245             int res = io.setAndGetBack(getStorageKeyForClusterSize(), value);
00246             if ((res < 0) || (value != init_cluster_size))
00247             {
00248                 UAVCAN_TRACE("dynamic_node_id_server::distributed::ClusterManager", "Failed to store cluster size");
00249                 return -ErrFailure;
00250             }
00251         }
00252 
00253         tracer_.onEvent(TraceRaftClusterSizeInited, cluster_size_);
00254 
00255         UAVCAN_ASSERT(cluster_size_ > 0);
00256         UAVCAN_ASSERT(cluster_size_ <= MaxClusterSize);
00257 
00258         /*
00259          * Initializing pub/sub and timer
00260          */
00261         int res = discovery_pub_.init(priority);
00262         if (res < 0)
00263         {
00264             return res;
00265         }
00266 
00267         res = discovery_sub_.start(DiscoveryCallback(this, &ClusterManager::handleDiscovery));
00268         if (res < 0)
00269         {
00270             return res;
00271         }
00272 
00273         startDiscoveryPublishingTimerIfNotRunning();
00274 
00275         /*
00276          * Misc
00277          */
00278         resetAllServerIndices();
00279         return 0;
00280     }
00281 
00282     /**
00283      * Adds once server regardless of the discovery logic.
00284      */
00285     void addServer(NodeID node_id)
00286     {
00287         UAVCAN_ASSERT((num_known_servers_ + 1) < MaxClusterSize);
00288         if (!isKnownServer(node_id) && node_id.isUnicast())
00289         {
00290             tracer_.onEvent(TraceRaftNewServerDiscovered, node_id.get());
00291             servers_[num_known_servers_].node_id = node_id;
00292             servers_[num_known_servers_].resetIndices(log_);
00293             num_known_servers_ = static_cast<uint8_t>(num_known_servers_ + 1U);
00294         }
00295         else
00296         {
00297             UAVCAN_ASSERT(0);
00298         }
00299     }
00300 
00301     /**
00302      * Whether such server has been discovered.
00303      */
00304     bool isKnownServer(NodeID node_id) const
00305     {
00306         if (node_id == getNode().getNodeID())
00307         {
00308             return true;
00309         }
00310         for (uint8_t i = 0; i < num_known_servers_; i++)
00311         {
00312             UAVCAN_ASSERT(servers_[i].node_id.isUnicast());
00313             UAVCAN_ASSERT(servers_[i].node_id != getNode().getNodeID());
00314             if (servers_[i].node_id == node_id)
00315             {
00316                 return true;
00317             }
00318         }
00319         return false;
00320     }
00321 
00322     /**
00323      * An invalid node ID will be returned if there's no such server.
00324      * The local server is not listed there.
00325      */
00326     NodeID getRemoteServerNodeIDAtIndex(uint8_t index) const
00327     {
00328         if (index < num_known_servers_)
00329         {
00330             return servers_[index].node_id;
00331         }
00332         return NodeID();
00333     }
00334 
00335     /**
00336      * See next_index[] in Raft paper.
00337      */
00338     Log::Index getServerNextIndex(NodeID server_node_id) const
00339     {
00340         const Server* const s = findServer(server_node_id);
00341         if (s != UAVCAN_NULLPTR)
00342         {
00343             return s->next_index;
00344         }
00345         UAVCAN_ASSERT(0);
00346         return 0;
00347     }
00348 
00349     void incrementServerNextIndexBy(NodeID server_node_id, Log::Index increment)
00350     {
00351         Server* const s = findServer(server_node_id);
00352         if (s != UAVCAN_NULLPTR)
00353         {
00354             s->next_index = Log::Index(s->next_index + increment);
00355         }
00356         else
00357         {
00358             UAVCAN_ASSERT(0);
00359         }
00360     }
00361 
00362     void decrementServerNextIndex(NodeID server_node_id)
00363     {
00364         Server* const s = findServer(server_node_id);
00365         if (s != UAVCAN_NULLPTR)
00366         {
00367             s->next_index--;
00368         }
00369         else
00370         {
00371             UAVCAN_ASSERT(0);
00372         }
00373     }
00374 
00375     /**
00376      * See match_index[] in Raft paper.
00377      */
00378     Log::Index getServerMatchIndex(NodeID server_node_id) const
00379     {
00380         const Server* const s = findServer(server_node_id);
00381         if (s != UAVCAN_NULLPTR)
00382         {
00383             return s->match_index;
00384         }
00385         UAVCAN_ASSERT(0);
00386         return 0;
00387     }
00388 
00389     void setServerMatchIndex(NodeID server_node_id, Log::Index match_index)
00390     {
00391         Server* const s = findServer(server_node_id);
00392         if (s != UAVCAN_NULLPTR)
00393         {
00394             s->match_index = match_index;
00395         }
00396         else
00397         {
00398             UAVCAN_ASSERT(0);
00399         }
00400     }
00401 
00402     /**
00403      * This method must be called when the current server becomes leader.
00404      */
00405     void resetAllServerIndices()
00406     {
00407         for (uint8_t i = 0; i < num_known_servers_; i++)
00408         {
00409             UAVCAN_ASSERT(servers_[i].node_id.isUnicast());
00410             servers_[i].resetIndices(log_);
00411         }
00412     }
00413 
00414     /**
00415      * Number of known servers can only grow, and it never exceeds the cluster size value.
00416      * This number does not include the local server.
00417      */
00418     uint8_t getNumKnownServers() const { return num_known_servers_; }
00419 
00420     /**
00421      * Cluster size and quorum size are constant.
00422      */
00423     uint8_t getClusterSize() const { return cluster_size_; }
00424     uint8_t getQuorumSize() const { return static_cast<uint8_t>(cluster_size_ / 2U + 1U); }
00425 
00426     bool isClusterDiscovered() const { return num_known_servers_ == (cluster_size_ - 1); }
00427 };
00428 
00429 }
00430 }
00431 }
00432 
00433 #endif // Include guard