Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependents: UAVCAN, UAVCAN_Subscriber
cluster_manager.hpp
00001 /* 00002 * Copyright (C) 2015 Pavel Kirienko <pavel.kirienko@gmail.com> 00003 */ 00004 00005 #ifndef UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_CLUSTER_MANAGER_HPP_INCLUDED 00006 #define UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_CLUSTER_MANAGER_HPP_INCLUDED 00007 00008 #include <uavcan/build_config.hpp> 00009 #include <uavcan/debug.hpp> 00010 #include <uavcan/util/method_binder.hpp> 00011 #include <uavcan/node/timer.hpp> 00012 #include <uavcan/node/subscriber.hpp> 00013 #include <uavcan/node/publisher.hpp> 00014 #include <uavcan/protocol/dynamic_node_id_server/distributed/log.hpp> 00015 #include <uavcan/protocol/dynamic_node_id_server/distributed/types.hpp> 00016 #include <uavcan/protocol/dynamic_node_id_server/storage_marshaller.hpp> 00017 #include <uavcan/protocol/dynamic_node_id_server/event.hpp> 00018 // UAVCAN types 00019 #include <uavcan/protocol/dynamic_node_id/server/Discovery.hpp> 00020 00021 namespace uavcan 00022 { 00023 namespace dynamic_node_id_server 00024 { 00025 namespace distributed 00026 { 00027 /** 00028 * This class maintains the cluster state. 
00029 */ 00030 class ClusterManager : private TimerBase 00031 { 00032 public: 00033 enum { MaxClusterSize = Discovery::FieldTypes::known_nodes::MaxSize }; 00034 00035 private: 00036 typedef MethodBinder<ClusterManager*, 00037 void (ClusterManager::*) 00038 (const ReceivedDataStructure<Discovery>&)> 00039 DiscoveryCallback; 00040 00041 struct Server 00042 { 00043 NodeID node_id; 00044 Log::Index next_index; 00045 Log::Index match_index; 00046 00047 Server() 00048 : next_index(0) 00049 , match_index(0) 00050 { } 00051 00052 void resetIndices(const Log& log) 00053 { 00054 next_index = Log::Index(log.getLastIndex() + 1U); 00055 match_index = 0; 00056 } 00057 }; 00058 00059 IStorageBackend& storage_; 00060 IEventTracer& tracer_; 00061 const Log& log_; 00062 00063 Subscriber<Discovery, DiscoveryCallback> discovery_sub_; 00064 mutable Publisher<Discovery> discovery_pub_; 00065 00066 Server servers_[MaxClusterSize - 1]; ///< Minus one because the local server is not listed there. 00067 00068 uint8_t cluster_size_; 00069 uint8_t num_known_servers_; 00070 00071 static IStorageBackend::String getStorageKeyForClusterSize() { return "cluster_size"; } 00072 00073 INode& getNode() { return discovery_sub_.getNode(); } 00074 const INode& getNode() const { return discovery_sub_.getNode(); } 00075 00076 const Server* findServer(NodeID node_id) const { return const_cast<ClusterManager*>(this)->findServer(node_id); } 00077 Server* findServer(NodeID node_id) 00078 { 00079 for (uint8_t i = 0; i < num_known_servers_; i++) 00080 { 00081 UAVCAN_ASSERT(servers_[i].node_id.isUnicast()); 00082 if (servers_[i].node_id == node_id) 00083 { 00084 return &servers_[i]; 00085 } 00086 } 00087 return UAVCAN_NULLPTR; 00088 } 00089 00090 virtual void handleTimerEvent(const TimerEvent&) 00091 { 00092 UAVCAN_ASSERT(num_known_servers_ < cluster_size_); 00093 00094 tracer_.onEvent(TraceRaftDiscoveryBroadcast, num_known_servers_); 00095 00096 /* 00097 * Filling the message 00098 */ 00099 Discovery msg; 00100 
msg.configured_cluster_size = cluster_size_; 00101 00102 msg.known_nodes.push_back(getNode().getNodeID().get()); // Putting ourselves at index 0 00103 00104 for (uint8_t i = 0; i < num_known_servers_; i++) 00105 { 00106 UAVCAN_ASSERT(servers_[i].node_id.isUnicast()); 00107 msg.known_nodes.push_back(servers_[i].node_id.get()); 00108 } 00109 00110 UAVCAN_ASSERT(msg.known_nodes.size() == (num_known_servers_ + 1)); 00111 00112 /* 00113 * Broadcasting 00114 */ 00115 UAVCAN_TRACE("dynamic_node_id_server::distributed::ClusterManager", 00116 "Broadcasting Discovery message; known nodes: %d of %d", 00117 int(msg.known_nodes.size()), int(cluster_size_)); 00118 00119 const int res = discovery_pub_.broadcast(msg); 00120 if (res < 0) 00121 { 00122 UAVCAN_TRACE("dynamic_node_id_server::distributed::ClusterManager", "Discovery broadcst failed: %d", res); 00123 getNode().registerInternalFailure("Raft discovery broadcast"); 00124 } 00125 00126 /* 00127 * Termination condition 00128 */ 00129 if (isClusterDiscovered()) 00130 { 00131 UAVCAN_TRACE("dynamic_node_id_server::distributed::ClusterManager", 00132 "Discovery broadcasting timer stopped"); 00133 stop(); 00134 } 00135 } 00136 00137 void handleDiscovery(const ReceivedDataStructure<Discovery>& msg) 00138 { 00139 tracer_.onEvent(TraceRaftDiscoveryReceived, msg.getSrcNodeID().get()); 00140 00141 /* 00142 * Validating cluster configuration 00143 * If there's a case of misconfiguration, the message will be ignored. 
00144 */ 00145 if (msg.configured_cluster_size != cluster_size_) 00146 { 00147 tracer_.onEvent(TraceRaftBadClusterSizeReceived, msg.configured_cluster_size); 00148 getNode().registerInternalFailure("Bad Raft cluster size"); 00149 return; 00150 } 00151 00152 /* 00153 * Updating the set of known servers 00154 */ 00155 for (uint8_t i = 0; i < msg.known_nodes.size(); i++) 00156 { 00157 if (isClusterDiscovered()) 00158 { 00159 break; 00160 } 00161 00162 const NodeID node_id(msg.known_nodes[i]); 00163 if (node_id.isUnicast() && !isKnownServer(node_id)) 00164 { 00165 addServer(node_id); 00166 } 00167 } 00168 00169 /* 00170 * Publishing a new Discovery request if the publishing server needs to learn about more servers. 00171 */ 00172 if (msg.configured_cluster_size > msg.known_nodes.size()) 00173 { 00174 startDiscoveryPublishingTimerIfNotRunning(); 00175 } 00176 } 00177 00178 void startDiscoveryPublishingTimerIfNotRunning() 00179 { 00180 if (!isRunning()) 00181 { 00182 startPeriodic(MonotonicDuration::fromMSec(Discovery::BROADCASTING_PERIOD_MS)); 00183 } 00184 } 00185 00186 public: 00187 enum { ClusterSizeUnknown = 0 }; 00188 00189 /** 00190 * @param node Needed to publish and subscribe to Discovery message 00191 * @param storage Needed to read the cluster size parameter from the storage 00192 * @param log Needed to initialize nextIndex[] values after elections 00193 */ 00194 ClusterManager (INode& node, IStorageBackend& storage, const Log& log, IEventTracer& tracer) 00195 : TimerBase(node) 00196 , storage_(storage) 00197 , tracer_(tracer) 00198 , log_(log) 00199 , discovery_sub_(node) 00200 , discovery_pub_(node) 00201 , cluster_size_(0) 00202 , num_known_servers_(0) 00203 { } 00204 00205 /** 00206 * If cluster_size is set to ClusterSizeUnknown, the class will try to read this parameter from the 00207 * storage backend using key 'cluster_size'. 00208 * Returns negative error code. 
00209 */ 00210 int init(const uint8_t init_cluster_size, const TransferPriority priority) 00211 { 00212 /* 00213 * Figuring out the cluster size 00214 */ 00215 if (init_cluster_size == ClusterSizeUnknown) 00216 { 00217 // Reading from the storage 00218 StorageMarshaller io(storage_); 00219 uint32_t value = 0; 00220 int res = io.get(getStorageKeyForClusterSize(), value); 00221 if (res < 0) 00222 { 00223 UAVCAN_TRACE("dynamic_node_id_server::distributed::ClusterManager", 00224 "Cluster size is neither configured nor stored in the storage"); 00225 return res; 00226 } 00227 if ((value == 0) || (value > MaxClusterSize)) 00228 { 00229 UAVCAN_TRACE("dynamic_node_id_server::distributed::ClusterManager", "Cluster size is invalid"); 00230 return -ErrInvalidConfiguration; 00231 } 00232 cluster_size_ = static_cast<uint8_t>(value); 00233 } 00234 else 00235 { 00236 if ((init_cluster_size == 0) || (init_cluster_size > MaxClusterSize)) 00237 { 00238 return -ErrInvalidParam; 00239 } 00240 cluster_size_ = init_cluster_size; 00241 00242 // Writing the storage 00243 StorageMarshaller io(storage_); 00244 uint32_t value = init_cluster_size; 00245 int res = io.setAndGetBack(getStorageKeyForClusterSize(), value); 00246 if ((res < 0) || (value != init_cluster_size)) 00247 { 00248 UAVCAN_TRACE("dynamic_node_id_server::distributed::ClusterManager", "Failed to store cluster size"); 00249 return -ErrFailure; 00250 } 00251 } 00252 00253 tracer_.onEvent(TraceRaftClusterSizeInited, cluster_size_); 00254 00255 UAVCAN_ASSERT(cluster_size_ > 0); 00256 UAVCAN_ASSERT(cluster_size_ <= MaxClusterSize); 00257 00258 /* 00259 * Initializing pub/sub and timer 00260 */ 00261 int res = discovery_pub_.init(priority); 00262 if (res < 0) 00263 { 00264 return res; 00265 } 00266 00267 res = discovery_sub_.start(DiscoveryCallback(this, &ClusterManager::handleDiscovery)); 00268 if (res < 0) 00269 { 00270 return res; 00271 } 00272 00273 startDiscoveryPublishingTimerIfNotRunning(); 00274 00275 /* 00276 * Misc 00277 */ 
00278 resetAllServerIndices(); 00279 return 0; 00280 } 00281 00282 /** 00283 * Adds once server regardless of the discovery logic. 00284 */ 00285 void addServer(NodeID node_id) 00286 { 00287 UAVCAN_ASSERT((num_known_servers_ + 1) < MaxClusterSize); 00288 if (!isKnownServer(node_id) && node_id.isUnicast()) 00289 { 00290 tracer_.onEvent(TraceRaftNewServerDiscovered, node_id.get()); 00291 servers_[num_known_servers_].node_id = node_id; 00292 servers_[num_known_servers_].resetIndices(log_); 00293 num_known_servers_ = static_cast<uint8_t>(num_known_servers_ + 1U); 00294 } 00295 else 00296 { 00297 UAVCAN_ASSERT(0); 00298 } 00299 } 00300 00301 /** 00302 * Whether such server has been discovered. 00303 */ 00304 bool isKnownServer(NodeID node_id) const 00305 { 00306 if (node_id == getNode().getNodeID()) 00307 { 00308 return true; 00309 } 00310 for (uint8_t i = 0; i < num_known_servers_; i++) 00311 { 00312 UAVCAN_ASSERT(servers_[i].node_id.isUnicast()); 00313 UAVCAN_ASSERT(servers_[i].node_id != getNode().getNodeID()); 00314 if (servers_[i].node_id == node_id) 00315 { 00316 return true; 00317 } 00318 } 00319 return false; 00320 } 00321 00322 /** 00323 * An invalid node ID will be returned if there's no such server. 00324 * The local server is not listed there. 00325 */ 00326 NodeID getRemoteServerNodeIDAtIndex(uint8_t index) const 00327 { 00328 if (index < num_known_servers_) 00329 { 00330 return servers_[index].node_id; 00331 } 00332 return NodeID(); 00333 } 00334 00335 /** 00336 * See next_index[] in Raft paper. 
00337 */ 00338 Log::Index getServerNextIndex(NodeID server_node_id) const 00339 { 00340 const Server* const s = findServer(server_node_id); 00341 if (s != UAVCAN_NULLPTR) 00342 { 00343 return s->next_index; 00344 } 00345 UAVCAN_ASSERT(0); 00346 return 0; 00347 } 00348 00349 void incrementServerNextIndexBy(NodeID server_node_id, Log::Index increment) 00350 { 00351 Server* const s = findServer(server_node_id); 00352 if (s != UAVCAN_NULLPTR) 00353 { 00354 s->next_index = Log::Index(s->next_index + increment); 00355 } 00356 else 00357 { 00358 UAVCAN_ASSERT(0); 00359 } 00360 } 00361 00362 void decrementServerNextIndex(NodeID server_node_id) 00363 { 00364 Server* const s = findServer(server_node_id); 00365 if (s != UAVCAN_NULLPTR) 00366 { 00367 s->next_index--; 00368 } 00369 else 00370 { 00371 UAVCAN_ASSERT(0); 00372 } 00373 } 00374 00375 /** 00376 * See match_index[] in Raft paper. 00377 */ 00378 Log::Index getServerMatchIndex(NodeID server_node_id) const 00379 { 00380 const Server* const s = findServer(server_node_id); 00381 if (s != UAVCAN_NULLPTR) 00382 { 00383 return s->match_index; 00384 } 00385 UAVCAN_ASSERT(0); 00386 return 0; 00387 } 00388 00389 void setServerMatchIndex(NodeID server_node_id, Log::Index match_index) 00390 { 00391 Server* const s = findServer(server_node_id); 00392 if (s != UAVCAN_NULLPTR) 00393 { 00394 s->match_index = match_index; 00395 } 00396 else 00397 { 00398 UAVCAN_ASSERT(0); 00399 } 00400 } 00401 00402 /** 00403 * This method must be called when the current server becomes leader. 00404 */ 00405 void resetAllServerIndices() 00406 { 00407 for (uint8_t i = 0; i < num_known_servers_; i++) 00408 { 00409 UAVCAN_ASSERT(servers_[i].node_id.isUnicast()); 00410 servers_[i].resetIndices(log_); 00411 } 00412 } 00413 00414 /** 00415 * Number of known servers can only grow, and it never exceeds the cluster size value. 00416 * This number does not include the local server. 
00417 */ 00418 uint8_t getNumKnownServers() const { return num_known_servers_; } 00419 00420 /** 00421 * Cluster size and quorum size are constant. 00422 */ 00423 uint8_t getClusterSize() const { return cluster_size_; } 00424 uint8_t getQuorumSize() const { return static_cast<uint8_t>(cluster_size_ / 2U + 1U); } 00425 00426 bool isClusterDiscovered() const { return num_known_servers_ == (cluster_size_ - 1); } 00427 }; 00428 00429 } 00430 } 00431 } 00432 00433 #endif // Include guard
Generated on Tue Jul 12 2022 17:17:30 by
