libuav original
Dependents: UAVCAN UAVCAN_Subscriber
server.hpp
00001 /* 00002 * Copyright (C) 2015 Pavel Kirienko <pavel.kirienko@gmail.com> 00003 */ 00004 00005 #ifndef UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_SERVER_HPP_INCLUDED 00006 #define UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_SERVER_HPP_INCLUDED 00007 00008 #include <uavcan/build_config.hpp> 00009 #include <uavcan/debug.hpp> 00010 #include <uavcan/protocol/dynamic_node_id_server/distributed/types.hpp> 00011 #include <uavcan/protocol/dynamic_node_id_server/distributed/raft_core.hpp> 00012 #include <uavcan/protocol/dynamic_node_id_server/abstract_server.hpp> 00013 #include <uavcan/protocol/dynamic_node_id_server/node_id_selector.hpp> 00014 #include <uavcan/protocol/dynamic_node_id_server/event.hpp> 00015 00016 namespace uavcan 00017 { 00018 namespace dynamic_node_id_server 00019 { 00020 namespace distributed 00021 { 00022 /** 00023 * This class implements the top-level allocation logic and server API. 00024 */ 00025 class UAVCAN_EXPORT Server : public AbstractServer 00026 , IRaftLeaderMonitor 00027 { 00028 struct UniqueIDLogPredicate 00029 { 00030 const UniqueID unique_id; 00031 00032 UniqueIDLogPredicate(const UniqueID& uid) 00033 : unique_id(uid) 00034 { } 00035 00036 bool operator()(const RaftCore::LogEntryInfo& info) const 00037 { 00038 return info.entry.unique_id == unique_id; 00039 } 00040 }; 00041 00042 struct NodeIDLogPredicate 00043 { 00044 const NodeID node_id; 00045 00046 NodeIDLogPredicate(const NodeID& nid) 00047 : node_id(nid) 00048 { } 00049 00050 bool operator()(const RaftCore::LogEntryInfo& info) const 00051 { 00052 return info.entry.node_id == node_id.get(); 00053 } 00054 }; 00055 00056 /* 00057 * States 00058 */ 00059 RaftCore raft_core_; 00060 00061 /* 00062 * Methods of IAllocationRequestHandler 00063 */ 00064 virtual bool canPublishFollowupAllocationResponse() const 00065 { 00066 /* 00067 * The server is allowed to publish follow-up allocation responses only if both conditions are met: 00068 * - The server is leader. 00069 * - The last allocation request has been completed successfully. 00070 * 00071 * Why second condition? Imagine a case when there's two Raft nodes that don't hear each other - A and B, 00072 * both of them are leaders (but only A can commit to the log, B is in a minor partition); then there's a 00073 * client X that can exchange with both leaders, and a client Y that can exchange only with A. Such a 00074 * situation can occur in case of a very unlikely failure of redundant interfaces. 00075 * 00076 * Both clients X and Y initially send a first-stage Allocation request; A responds to Y with a first-stage 00077 * response, whereas B responds to X. Both X and Y will issue a follow-up second-stage requests, which may 00078 * cause A to mix second-stage Allocation requests from different nodes, leading to reception of an invalid 00079 * unique ID. When both leaders receive full unique IDs (A will receive an invalid one, B will receive a valid 00080 * unique ID of X), only A will be able to make a commit, because B is in a minority. Since both clients were 00081 * unable to receive node ID values in this round, they will try again later. 00082 * 00083 * Now, in order to prevent B from disrupting client-server communication second time around, we introduce this 00084 * second restriction: the server cannot exchange with clients as long as its log contains uncommitted entries. 00085 * 00086 * Note that this restriction does not apply to allocation requests sent via CAN FD frames, as in this case 00087 * no follow-up responses are necessary. So only CAN FD can offer reliable Allocation exchange. 00088 */ 00089 return raft_core_.isLeader() && raft_core_.areAllLogEntriesCommitted(); 00090 } 00091 00092 virtual void handleAllocationRequest(const UniqueID& unique_id, const NodeID preferred_node_id) 00093 { 00094 /* 00095 * Note that it is possible that the local node is not leader. We will still perform the log search 00096 * and try to find the node that requested allocation. If the node is found, response will be sent; 00097 * otherwise the request will be ignored because only leader can add new allocations. 00098 */ 00099 const LazyConstructor<RaftCore::LogEntryInfo> result = 00100 raft_core_.traverseLogFromEndUntil(UniqueIDLogPredicate(unique_id)); 00101 00102 if (result.isConstructed()) 00103 { 00104 if (result->committed) 00105 { 00106 tryPublishAllocationResult(result->entry); 00107 UAVCAN_TRACE("dynamic_node_id_server::distributed::Server", 00108 "Allocation request served with existing allocation; node ID %d", 00109 int(result->entry.node_id)); 00110 } 00111 else 00112 { 00113 UAVCAN_TRACE("dynamic_node_id_server::distributed::Server", 00114 "Allocation request ignored - allocation exists but not committed yet; node ID %d", 00115 int(result->entry.node_id)); 00116 } 00117 } 00118 else 00119 { 00120 if (raft_core_.isLeader() && !node_discoverer_.hasUnknownNodes()) 00121 { 00122 allocateNewNode(unique_id, preferred_node_id); 00123 } 00124 } 00125 } 00126 00127 /* 00128 * Methods of INodeDiscoveryHandler 00129 */ 00130 virtual bool canDiscoverNewNodes() const 00131 { 00132 return raft_core_.isLeader(); 00133 } 00134 00135 virtual NodeAwareness checkNodeAwareness(NodeID node_id) const 00136 { 00137 const LazyConstructor<RaftCore::LogEntryInfo> result = 00138 raft_core_.traverseLogFromEndUntil(NodeIDLogPredicate(node_id)); 00139 if (result.isConstructed()) 00140 { 00141 return result->committed ? NodeAwarenessKnownAndCommitted : NodeAwarenessKnownButNotCommitted; 00142 } 00143 else 00144 { 00145 return NodeAwarenessUnknown; 00146 } 00147 } 00148 00149 virtual void handleNewNodeDiscovery(const UniqueID* unique_id_or_null, NodeID node_id) 00150 { 00151 if (raft_core_.traverseLogFromEndUntil(NodeIDLogPredicate(node_id)).isConstructed()) 00152 { 00153 UAVCAN_ASSERT(0); // Such node is already known, the class that called this method should have known that 00154 return; 00155 } 00156 00157 const UniqueID uid = (unique_id_or_null == UAVCAN_NULLPTR) ? UniqueID() : *unique_id_or_null; 00158 00159 if (raft_core_.isLeader()) 00160 { 00161 raft_core_.appendLog(uid, node_id); 00162 } 00163 } 00164 00165 /* 00166 * Methods of IRaftLeaderMonitor 00167 */ 00168 virtual void handleLogCommitOnLeader(const protocol::dynamic_node_id::server::Entry& entry) 00169 { 00170 /* 00171 * Maybe this node did not request allocation at all, we don't care, we publish anyway. 00172 */ 00173 tryPublishAllocationResult(entry); 00174 } 00175 00176 virtual void handleLocalLeadershipChange(bool local_node_is_leader) 00177 { 00178 if (!local_node_is_leader) 00179 { 00180 return; 00181 } 00182 00183 const LazyConstructor<RaftCore::LogEntryInfo> result = 00184 raft_core_.traverseLogFromEndUntil(NodeIDLogPredicate(node_.getNodeID())); 00185 00186 if (!result.isConstructed()) 00187 { 00188 raft_core_.appendLog(getOwnUniqueID(), node_.getNodeID()); 00189 } 00190 } 00191 00192 /* 00193 * Private methods 00194 */ 00195 bool isNodeIDTaken(const NodeID node_id) const 00196 { 00197 UAVCAN_TRACE("dynamic_node_id_server::distributed::Server", 00198 "Testing if node ID %d is taken", int(node_id.get())); 00199 return raft_core_.traverseLogFromEndUntil(NodeIDLogPredicate(node_id)); 00200 } 00201 00202 void allocateNewNode(const UniqueID& unique_id, const NodeID preferred_node_id) 00203 { 00204 const NodeID allocated_node_id = 00205 NodeIDSelector<Server>(this, &Server::isNodeIDTaken).findFreeNodeID(preferred_node_id); 00206 if (!allocated_node_id.isUnicast()) 00207 { 00208 UAVCAN_TRACE("dynamic_node_id_server::distributed::Server", "Request ignored - no free node ID left"); 00209 return; 00210 } 00211 00212 UAVCAN_TRACE("dynamic_node_id_server::distributed::Server", "New node ID allocated: %d", 00213 int(allocated_node_id.get())); 00214 raft_core_.appendLog(unique_id, allocated_node_id); 00215 } 00216 00217 void tryPublishAllocationResult(const protocol::dynamic_node_id::server::Entry& entry) 00218 { 00219 const int res = allocation_request_manager_.broadcastAllocationResponse(entry.unique_id, entry.node_id); 00220 if (res < 0) 00221 { 00222 tracer_.onEvent(TraceError, res); 00223 node_.registerInternalFailure("Dynamic allocation response"); 00224 } 00225 } 00226 00227 public: 00228 Server(INode& node, 00229 IStorageBackend& storage, 00230 IEventTracer& tracer) 00231 : AbstractServer(node, tracer) 00232 , raft_core_(node, storage, tracer, *this) 00233 { } 00234 00235 int init(const UniqueID& own_unique_id, 00236 const uint8_t cluster_size = ClusterManager::ClusterSizeUnknown, 00237 const TransferPriority priority = TransferPriority::OneHigherThanLowest) 00238 { 00239 /* 00240 * Initializing Raft core first, because the next step requires Log to be loaded 00241 */ 00242 int res = raft_core_.init(cluster_size, priority); 00243 if (res < 0) 00244 { 00245 return res; 00246 } 00247 00248 /* 00249 * Common logic 00250 */ 00251 res = AbstractServer::init(own_unique_id, priority); 00252 if (res < 0) 00253 { 00254 return res; 00255 } 00256 00257 /* 00258 * Making sure that the server is started with the same node ID 00259 */ 00260 const LazyConstructor<RaftCore::LogEntryInfo> own_log_entry = 00261 raft_core_.traverseLogFromEndUntil(NodeIDLogPredicate(node_.getNodeID())); 00262 00263 if (own_log_entry.isConstructed()) 00264 { 00265 if (own_log_entry->entry.unique_id != getOwnUniqueID()) 00266 { 00267 return -ErrInvalidConfiguration; 00268 } 00269 } 00270 00271 return 0; 00272 } 00273 00274 Log::Index getNumAllocations() const { return raft_core_.getNumAllocations(); } 00275 00276 /** 00277 * These accessors are needed for debugging, visualization and testing. 00278 */ 00279 const RaftCore& getRaftCore() const { return raft_core_; } 00280 }; 00281 00282 /** 00283 * This structure represents immediate state of the server. 00284 * It can be used for state visualization and debugging. 00285 */ 00286 struct StateReport 00287 { 00288 uint8_t cluster_size; 00289 00290 RaftCore::ServerState state; 00291 00292 Log::Index last_log_index; 00293 Log::Index commit_index; 00294 00295 Term last_log_term; 00296 Term current_term; 00297 00298 NodeID voted_for; 00299 00300 MonotonicTime last_activity_timestamp; 00301 MonotonicDuration randomized_timeout; 00302 00303 uint8_t num_unknown_nodes; 00304 00305 struct FollowerState 00306 { 00307 NodeID node_id; 00308 Log::Index next_index; 00309 Log::Index match_index; 00310 00311 FollowerState() 00312 : next_index(0) 00313 , match_index(0) 00314 { } 00315 } followers[ClusterManager::MaxClusterSize - 1]; 00316 00317 StateReport(const Server& s) 00318 : cluster_size (s.getRaftCore().getClusterManager().getClusterSize()) 00319 , state (s.getRaftCore().getServerState()) 00320 , last_log_index (s.getRaftCore().getPersistentState().getLog().getLastIndex()) 00321 , commit_index (s.getRaftCore().getCommitIndex()) 00322 , last_log_term (0) // See below 00323 , current_term (s.getRaftCore().getPersistentState().getCurrentTerm()) 00324 , voted_for (s.getRaftCore().getPersistentState().getVotedFor()) 00325 , last_activity_timestamp(s.getRaftCore().getLastActivityTimestamp()) 00326 , randomized_timeout (s.getRaftCore().getRandomizedTimeout()) 00327 , num_unknown_nodes (s.getNodeDiscoverer().getNumUnknownNodes()) 00328 { 00329 const Entry* const e = s.getRaftCore().getPersistentState().getLog().getEntryAtIndex(last_log_index); 00330 UAVCAN_ASSERT(e != UAVCAN_NULLPTR); 00331 if (e != UAVCAN_NULLPTR) 00332 { 00333 last_log_term = e->term; 00334 } 00335 00336 for (uint8_t i = 0; i < (cluster_size - 1U); i++) 00337 { 00338 const ClusterManager& mgr = s.getRaftCore().getClusterManager(); 00339 const NodeID node_id = mgr.getRemoteServerNodeIDAtIndex(i); 00340 if (node_id.isUnicast()) 00341 { 00342 followers[i].node_id = node_id; 00343 followers[i].next_index = mgr.getServerNextIndex(node_id); 00344 followers[i].match_index = mgr.getServerMatchIndex(node_id); 00345 } 00346 } 00347 } 00348 }; 00349 00350 } 00351 } 00352 } 00353 00354 #endif // Include guard
Generated on Tue Jul 12 2022 17:17:34 by 1.7.2