// libuav original
// Dependents: UAVCAN UAVCAN_Subscriber
// raft_core.hpp
00001 /* 00002 * Copyright (C) 2015 Pavel Kirienko <pavel.kirienko@gmail.com> 00003 */ 00004 00005 #ifndef UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_RAFT_CORE_HPP_INCLUDED 00006 #define UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_RAFT_CORE_HPP_INCLUDED 00007 00008 #include <cstdlib> 00009 #include <uavcan/build_config.hpp> 00010 #include <uavcan/debug.hpp> 00011 #include <uavcan/util/method_binder.hpp> 00012 #include <uavcan/node/timer.hpp> 00013 #include <uavcan/node/service_server.hpp> 00014 #include <uavcan/node/service_client.hpp> 00015 #include <uavcan/protocol/dynamic_node_id_server/distributed/types.hpp> 00016 #include <uavcan/protocol/dynamic_node_id_server/distributed/persistent_state.hpp> 00017 #include <uavcan/protocol/dynamic_node_id_server/distributed/cluster_manager.hpp> 00018 #include <uavcan/protocol/dynamic_node_id_server/event.hpp> 00019 // UAVCAN types 00020 #include <uavcan/protocol/dynamic_node_id/server/AppendEntries.hpp> 00021 #include <uavcan/protocol/dynamic_node_id/server/RequestVote.hpp> 00022 00023 namespace uavcan 00024 { 00025 namespace dynamic_node_id_server 00026 { 00027 namespace distributed 00028 { 00029 /** 00030 * Allocator has to implement this interface so the RaftCore can inform it when a new entry gets committed to the log. 00031 */ 00032 class IRaftLeaderMonitor 00033 { 00034 public: 00035 /** 00036 * This method will be invoked when a new log entry is committed (only if the local server is the current Leader). 00037 */ 00038 virtual void handleLogCommitOnLeader(const Entry& committed_entry) = 0; 00039 00040 /** 00041 * Invoked by the Raft core when the local node becomes a leader or ceases to be one. 00042 * By default the local node is not leader. 00043 * It is possible to commit to the log right from this method. 
00044 */ 00045 virtual void handleLocalLeadershipChange(bool local_node_is_leader) = 0; 00046 00047 virtual ~IRaftLeaderMonitor() { } 00048 }; 00049 00050 /** 00051 * This class implements log replication and voting. 00052 * It does not implement client-server interaction at all; instead it just exposes a public method for adding 00053 * allocation entries. 00054 * 00055 * Note that this class uses std::rand(), so the RNG must be properly seeded by the application. 00056 * 00057 * Activity registration: 00058 * - persistent state update error 00059 * - switch to candidate (this defines timeout between reelections) 00060 * - newer term in response (also switch to follower) 00061 * - append entries request with term >= currentTerm 00062 * - vote granted 00063 */ 00064 class RaftCore : private TimerBase 00065 { 00066 public: 00067 enum ServerState 00068 { 00069 ServerStateFollower, 00070 ServerStateCandidate, 00071 ServerStateLeader 00072 }; 00073 00074 private: 00075 typedef MethodBinder<RaftCore*, void (RaftCore::*)(const ReceivedDataStructure<AppendEntries::Request>&, 00076 ServiceResponseDataStructure<AppendEntries::Response>&)> 00077 AppendEntriesCallback; 00078 00079 typedef MethodBinder<RaftCore*, void (RaftCore::*)(const ServiceCallResult<AppendEntries>&)> 00080 AppendEntriesResponseCallback; 00081 00082 typedef MethodBinder<RaftCore*, void (RaftCore::*)(const ReceivedDataStructure<RequestVote::Request>&, 00083 ServiceResponseDataStructure<RequestVote::Response>&)> 00084 RequestVoteCallback; 00085 00086 typedef MethodBinder<RaftCore*, void (RaftCore::*)(const ServiceCallResult<RequestVote>&)> 00087 RequestVoteResponseCallback; 00088 00089 struct PendingAppendEntriesFields 00090 { 00091 Log::Index prev_log_index; 00092 Log::Index num_entries; 00093 00094 PendingAppendEntriesFields() 00095 : prev_log_index(0) 00096 , num_entries(0) 00097 { } 00098 }; 00099 00100 /* 00101 * Constants 00102 */ 00103 enum { MaxNumFollowers = ClusterManager::MaxClusterSize - 1 }; 
00104 00105 IEventTracer& tracer_; 00106 IRaftLeaderMonitor& leader_monitor_; 00107 00108 /* 00109 * States 00110 */ 00111 PersistentState persistent_state_; 00112 ClusterManager cluster_; 00113 Log::Index commit_index_; 00114 00115 MonotonicTime last_activity_timestamp_; 00116 MonotonicDuration randomized_activity_timeout_; 00117 ServerState server_state_; 00118 00119 uint8_t next_server_index_; ///< Next server to query AE from 00120 uint8_t num_votes_received_in_this_campaign_; 00121 00122 PendingAppendEntriesFields pending_append_entries_fields_; 00123 00124 /* 00125 * Transport 00126 */ 00127 ServiceServer<AppendEntries, AppendEntriesCallback> append_entries_srv_; 00128 ServiceClient<AppendEntries, AppendEntriesResponseCallback> append_entries_client_; 00129 ServiceServer<RequestVote, RequestVoteCallback> request_vote_srv_; 00130 ServiceClient<RequestVote, RequestVoteResponseCallback> request_vote_client_; 00131 00132 /* 00133 * Methods 00134 */ 00135 void trace(TraceCode event, int64_t argument) { tracer_.onEvent(event, argument); } 00136 00137 INode& getNode() { return append_entries_srv_.getNode(); } 00138 const INode& getNode() const { return append_entries_srv_.getNode(); } 00139 00140 void checkInvariants() const 00141 { 00142 // Commit index 00143 UAVCAN_ASSERT(commit_index_ <= persistent_state_.getLog().getLastIndex()); 00144 00145 // Term 00146 UAVCAN_ASSERT(persistent_state_.getLog().getEntryAtIndex(persistent_state_.getLog().getLastIndex()) != 00147 UAVCAN_NULLPTR); 00148 UAVCAN_ASSERT(persistent_state_.getLog().getEntryAtIndex(persistent_state_.getLog().getLastIndex())->term <= 00149 persistent_state_.getCurrentTerm()); 00150 00151 // Elections 00152 UAVCAN_ASSERT(server_state_ != ServerStateCandidate || !request_vote_client_.hasPendingCalls() || 00153 persistent_state_.getVotedFor() == getNode().getNodeID()); 00154 UAVCAN_ASSERT(num_votes_received_in_this_campaign_ <= cluster_.getClusterSize()); 00155 00156 // Transport 00157 
UAVCAN_ASSERT(append_entries_client_.getNumPendingCalls() <= 1); 00158 UAVCAN_ASSERT(request_vote_client_.getNumPendingCalls() <= cluster_.getNumKnownServers()); 00159 UAVCAN_ASSERT(server_state_ != ServerStateCandidate || !append_entries_client_.hasPendingCalls()); 00160 UAVCAN_ASSERT(server_state_ != ServerStateLeader || !request_vote_client_.hasPendingCalls()); 00161 UAVCAN_ASSERT(server_state_ != ServerStateFollower || 00162 (!append_entries_client_.hasPendingCalls() && !request_vote_client_.hasPendingCalls())); 00163 } 00164 00165 void registerActivity() 00166 { 00167 last_activity_timestamp_ = getNode().getMonotonicTime(); 00168 00169 static const int32_t randomization_range_msec = AppendEntries::Request::DEFAULT_MAX_ELECTION_TIMEOUT_MS - 00170 AppendEntries::Request::DEFAULT_MIN_ELECTION_TIMEOUT_MS; 00171 // coverity[dont_call] 00172 const int32_t random_msec = (std::rand() % randomization_range_msec) + 1; 00173 00174 randomized_activity_timeout_ = 00175 MonotonicDuration::fromMSec(AppendEntries::Request::DEFAULT_MIN_ELECTION_TIMEOUT_MS + random_msec); 00176 00177 UAVCAN_ASSERT(randomized_activity_timeout_.toMSec() > AppendEntries::Request::DEFAULT_MIN_ELECTION_TIMEOUT_MS); 00178 UAVCAN_ASSERT(randomized_activity_timeout_.toMSec() <= AppendEntries::Request::DEFAULT_MAX_ELECTION_TIMEOUT_MS); 00179 } 00180 00181 bool isActivityTimedOut() const 00182 { 00183 return getNode().getMonotonicTime() > (last_activity_timestamp_ + randomized_activity_timeout_); 00184 } 00185 00186 void handlePersistentStateUpdateError(int error) 00187 { 00188 UAVCAN_ASSERT(error < 0); 00189 trace(TraceRaftPersistStateUpdateError, error); 00190 switchState(ServerStateFollower); 00191 registerActivity(); // Deferring reelections 00192 } 00193 00194 void updateFollower() 00195 { 00196 if (isActivityTimedOut()) 00197 { 00198 switchState(ServerStateCandidate); 00199 registerActivity(); 00200 } 00201 } 00202 00203 void updateCandidate() 00204 { 00205 if (num_votes_received_in_this_campaign_ 
> 0) 00206 { 00207 trace(TraceRaftElectionComplete, num_votes_received_in_this_campaign_); 00208 const bool won = num_votes_received_in_this_campaign_ >= cluster_.getQuorumSize(); 00209 00210 UAVCAN_TRACE("dynamic_node_id_server::distributed::RaftCore", "Election complete, won: %d", int(won)); 00211 00212 switchState(won ? ServerStateLeader : ServerStateFollower); // Start over or become leader 00213 } 00214 else 00215 { 00216 // Set votedFor, abort on failure 00217 int res = persistent_state_.setVotedFor(getNode().getNodeID()); 00218 if (res < 0) 00219 { 00220 handlePersistentStateUpdateError(res); 00221 return; 00222 } 00223 00224 // Increment current term, abort on failure 00225 res = persistent_state_.setCurrentTerm(persistent_state_.getCurrentTerm() + 1U); 00226 if (res < 0) 00227 { 00228 handlePersistentStateUpdateError(res); 00229 return; 00230 } 00231 00232 num_votes_received_in_this_campaign_ = 1; // Voting for self 00233 00234 RequestVote::Request req; 00235 req.last_log_index = persistent_state_.getLog().getLastIndex(); 00236 req.last_log_term = persistent_state_.getLog().getEntryAtIndex(req.last_log_index)->term; 00237 req.term = persistent_state_.getCurrentTerm(); 00238 00239 for (uint8_t i = 0; i < MaxNumFollowers; i++) 00240 { 00241 const NodeID node_id = cluster_.getRemoteServerNodeIDAtIndex(i); 00242 if (!node_id.isUnicast()) 00243 { 00244 break; 00245 } 00246 00247 UAVCAN_TRACE("dynamic_node_id_server::distributed::RaftCore", 00248 "Requesting vote from %d", int(node_id.get())); 00249 trace(TraceRaftVoteRequestInitiation, node_id.get()); 00250 00251 res = request_vote_client_.call(node_id, req); 00252 if (res < 0) 00253 { 00254 trace(TraceError, res); 00255 } 00256 } 00257 } 00258 } 00259 00260 void updateLeader() 00261 { 00262 if (append_entries_client_.hasPendingCalls()) 00263 { 00264 append_entries_client_.cancelAllCalls(); // Refer to the response callback to learn why 00265 } 00266 00267 if (cluster_.getClusterSize() > 1) 00268 { 00269 const 
NodeID node_id = cluster_.getRemoteServerNodeIDAtIndex(next_server_index_); 00270 UAVCAN_ASSERT(node_id.isUnicast()); 00271 00272 next_server_index_++; 00273 if (next_server_index_ >= cluster_.getNumKnownServers()) 00274 { 00275 next_server_index_ = 0; 00276 } 00277 00278 AppendEntries::Request req; 00279 req.term = persistent_state_.getCurrentTerm(); 00280 req.leader_commit = commit_index_; 00281 00282 req.prev_log_index = Log::Index(cluster_.getServerNextIndex(node_id) - 1U); 00283 00284 const Entry* const entry = persistent_state_.getLog().getEntryAtIndex(req.prev_log_index); 00285 if (entry == UAVCAN_NULLPTR) 00286 { 00287 UAVCAN_ASSERT(0); 00288 handlePersistentStateUpdateError(-ErrLogic); 00289 return; 00290 } 00291 00292 req.prev_log_term = entry->term; 00293 00294 for (Log::Index index = cluster_.getServerNextIndex(node_id); 00295 index <= persistent_state_.getLog().getLastIndex(); 00296 index++) 00297 { 00298 req.entries.push_back(*persistent_state_.getLog().getEntryAtIndex(index)); 00299 if (req.entries.size() == req.entries.capacity()) 00300 { 00301 break; 00302 } 00303 } 00304 00305 pending_append_entries_fields_.num_entries = req.entries.size(); 00306 pending_append_entries_fields_.prev_log_index = req.prev_log_index; 00307 00308 const int res = append_entries_client_.call(node_id, req); 00309 if (res < 0) 00310 { 00311 trace(TraceRaftAppendEntriesCallFailure, res); 00312 } 00313 } 00314 00315 propagateCommitIndex(); 00316 } 00317 00318 void switchState(ServerState new_state) 00319 { 00320 if (server_state_ == new_state) 00321 { 00322 return; 00323 } 00324 00325 /* 00326 * Logging 00327 */ 00328 UAVCAN_TRACE("dynamic_node_id_server::distributed::RaftCore", "State switch: %d --> %d", 00329 int(server_state_), int(new_state)); 00330 trace(TraceRaftStateSwitch, new_state); 00331 00332 /* 00333 * Updating the current state 00334 */ 00335 const ServerState old_state = server_state_; 00336 server_state_ = new_state; 00337 00338 /* 00339 * Resetting specific 
states 00340 */ 00341 cluster_.resetAllServerIndices(); 00342 00343 next_server_index_ = 0; 00344 num_votes_received_in_this_campaign_ = 0; 00345 00346 request_vote_client_.cancelAllCalls(); 00347 append_entries_client_.cancelAllCalls(); 00348 00349 /* 00350 * Calling the switch handler 00351 * Note that the handler may commit to the log directly 00352 */ 00353 if ((old_state == ServerStateLeader) || 00354 (new_state == ServerStateLeader)) 00355 { 00356 leader_monitor_.handleLocalLeadershipChange(new_state == ServerStateLeader); 00357 } 00358 } 00359 00360 void tryIncrementCurrentTermFromResponse(Term new_term) 00361 { 00362 trace(TraceRaftNewerTermInResponse, new_term); 00363 const int res = persistent_state_.setCurrentTerm(new_term); 00364 if (res < 0) 00365 { 00366 trace(TraceRaftPersistStateUpdateError, res); 00367 } 00368 registerActivity(); // Deferring future elections 00369 switchState(ServerStateFollower); 00370 } 00371 00372 void propagateCommitIndex() 00373 { 00374 // Objective is to estimate whether we can safely increment commit index value 00375 UAVCAN_ASSERT(server_state_ == ServerStateLeader); 00376 UAVCAN_ASSERT(commit_index_ <= persistent_state_.getLog().getLastIndex()); 00377 00378 if (commit_index_ < persistent_state_.getLog().getLastIndex()) 00379 { 00380 /* 00381 * Not all local entries are committed. 00382 * Deciding if it is safe to increment commit index. 
00383 */ 00384 uint8_t num_nodes_with_next_log_entry_available = 1; // Local node 00385 for (uint8_t i = 0; i < cluster_.getNumKnownServers(); i++) 00386 { 00387 const Log::Index match_index = cluster_.getServerMatchIndex(cluster_.getRemoteServerNodeIDAtIndex(i)); 00388 if (match_index > commit_index_) 00389 { 00390 num_nodes_with_next_log_entry_available++; 00391 } 00392 } 00393 00394 if (num_nodes_with_next_log_entry_available >= cluster_.getQuorumSize()) 00395 { 00396 commit_index_++; 00397 UAVCAN_ASSERT(commit_index_ > 0); // Index 0 is always committed 00398 trace(TraceRaftNewEntryCommitted, commit_index_); 00399 00400 // AT THIS POINT ALLOCATION IS COMPLETE 00401 leader_monitor_.handleLogCommitOnLeader(*persistent_state_.getLog().getEntryAtIndex(commit_index_)); 00402 } 00403 } 00404 } 00405 00406 void handleAppendEntriesRequest(const ReceivedDataStructure<AppendEntries::Request>& request, 00407 ServiceResponseDataStructure<AppendEntries::Response>& response) 00408 { 00409 checkInvariants(); 00410 00411 if (!cluster_.isKnownServer(request.getSrcNodeID())) 00412 { 00413 if (cluster_.isClusterDiscovered()) 00414 { 00415 trace(TraceRaftRequestIgnored, request.getSrcNodeID().get()); 00416 response.setResponseEnabled(false); 00417 return; 00418 } 00419 else 00420 { 00421 cluster_.addServer(request.getSrcNodeID()); 00422 } 00423 } 00424 00425 UAVCAN_ASSERT(response.isResponseEnabled()); // This is default 00426 00427 /* 00428 * Checking if our current state is up to date. 00429 * The request will be ignored if persistent state cannot be updated. 
00430 */ 00431 if (request.term > persistent_state_.getCurrentTerm()) 00432 { 00433 int res = persistent_state_.setCurrentTerm(request.term); 00434 if (res < 0) 00435 { 00436 handlePersistentStateUpdateError(res); 00437 response.setResponseEnabled(false); 00438 return; 00439 } 00440 00441 res = persistent_state_.resetVotedFor(); 00442 if (res < 0) 00443 { 00444 handlePersistentStateUpdateError(res); 00445 response.setResponseEnabled(false); 00446 return; 00447 } 00448 } 00449 00450 /* 00451 * Preparing the response 00452 */ 00453 response.term = persistent_state_.getCurrentTerm(); 00454 response.success = false; 00455 00456 /* 00457 * Step 1 (see Raft paper) 00458 * Reject the request if the leader has stale term number. 00459 */ 00460 if (request.term < persistent_state_.getCurrentTerm()) 00461 { 00462 response.setResponseEnabled(true); 00463 return; 00464 } 00465 00466 registerActivity(); 00467 switchState(ServerStateFollower); 00468 00469 /* 00470 * Step 2 00471 * Reject the request if the assumed log index does not exist on the local node. 00472 */ 00473 const Entry* const prev_entry = persistent_state_.getLog().getEntryAtIndex(request.prev_log_index); 00474 if (prev_entry == UAVCAN_NULLPTR) 00475 { 00476 response.setResponseEnabled(true); 00477 return; 00478 } 00479 00480 /* 00481 * Step 3 00482 * Drop log entries if term number does not match. 00483 * Ignore the request if the persistent state cannot be updated. 00484 */ 00485 if (prev_entry->term != request.prev_log_term) 00486 { 00487 const int res = persistent_state_.getLog().removeEntriesWhereIndexGreaterOrEqual(request.prev_log_index); 00488 response.setResponseEnabled(res >= 0); 00489 if (res < 0) 00490 { 00491 trace(TraceRaftPersistStateUpdateError, res); 00492 } 00493 return; 00494 } 00495 00496 /* 00497 * Step 4 00498 * Update the log with new entries - this will possibly require to rewrite existing entries. 00499 * Ignore the request if the persistent state cannot be updated. 
00500 */ 00501 if (request.prev_log_index != persistent_state_.getLog().getLastIndex()) 00502 { 00503 const int res = persistent_state_.getLog().removeEntriesWhereIndexGreater(request.prev_log_index); 00504 if (res < 0) 00505 { 00506 trace(TraceRaftPersistStateUpdateError, res); 00507 response.setResponseEnabled(false); 00508 return; 00509 } 00510 } 00511 00512 for (uint8_t i = 0; i < request.entries.size(); i++) 00513 { 00514 const int res = persistent_state_.getLog().append(request.entries[i]); 00515 if (res < 0) 00516 { 00517 trace(TraceRaftPersistStateUpdateError, res); 00518 response.setResponseEnabled(false); 00519 return; // Response will not be sent, the server will assume that we're dead 00520 } 00521 } 00522 00523 /* 00524 * Step 5 00525 * Update the commit index. 00526 */ 00527 if (request.leader_commit > commit_index_) 00528 { 00529 commit_index_ = min(request.leader_commit, persistent_state_.getLog().getLastIndex()); 00530 trace(TraceRaftCommitIndexUpdate, commit_index_); 00531 } 00532 00533 response.setResponseEnabled(true); 00534 response.success = true; 00535 } 00536 00537 void handleAppendEntriesResponse(const ServiceCallResult<AppendEntries>& result) 00538 { 00539 UAVCAN_ASSERT(server_state_ == ServerStateLeader); // When state switches, all requests must be cancelled 00540 checkInvariants(); 00541 00542 if (!result.isSuccessful()) 00543 { 00544 return; 00545 } 00546 00547 if (result.getResponse().term > persistent_state_.getCurrentTerm()) 00548 { 00549 tryIncrementCurrentTermFromResponse(result.getResponse().term); 00550 } 00551 else 00552 { 00553 if (result.getResponse().success) 00554 { 00555 cluster_.incrementServerNextIndexBy(result.getCallID().server_node_id, 00556 pending_append_entries_fields_.num_entries); 00557 cluster_.setServerMatchIndex(result.getCallID().server_node_id, 00558 Log::Index(pending_append_entries_fields_.prev_log_index + 00559 pending_append_entries_fields_.num_entries)); 00560 } 00561 else 00562 { 00563 
cluster_.decrementServerNextIndex(result.getCallID().server_node_id); 00564 trace(TraceRaftAppendEntriesRespUnsucfl, result.getCallID().server_node_id.get()); 00565 } 00566 } 00567 00568 pending_append_entries_fields_ = PendingAppendEntriesFields(); 00569 // Rest of the logic is implemented in periodic update handlers. 00570 } 00571 00572 void handleRequestVoteRequest(const ReceivedDataStructure<RequestVote::Request>& request, 00573 ServiceResponseDataStructure<RequestVote::Response>& response) 00574 { 00575 checkInvariants(); 00576 trace(TraceRaftVoteRequestReceived, request.getSrcNodeID().get()); 00577 00578 if (!cluster_.isKnownServer(request.getSrcNodeID())) 00579 { 00580 trace(TraceRaftRequestIgnored, request.getSrcNodeID().get()); 00581 response.setResponseEnabled(false); 00582 return; 00583 } 00584 00585 UAVCAN_ASSERT(response.isResponseEnabled()); // This is default 00586 00587 /* 00588 * Checking if our current state is up to date. 00589 * The request will be ignored if persistent state cannot be updated. 
00590 */ 00591 if (request.term > persistent_state_.getCurrentTerm()) 00592 { 00593 switchState(ServerStateFollower); // Our term is stale, so we can't serve as leader 00594 00595 int res = persistent_state_.setCurrentTerm(request.term); 00596 if (res < 0) 00597 { 00598 handlePersistentStateUpdateError(res); 00599 response.setResponseEnabled(false); 00600 return; 00601 } 00602 00603 res = persistent_state_.resetVotedFor(); 00604 if (res < 0) 00605 { 00606 handlePersistentStateUpdateError(res); 00607 response.setResponseEnabled(false); 00608 return; 00609 } 00610 } 00611 00612 /* 00613 * Preparing the response 00614 */ 00615 response.term = persistent_state_.getCurrentTerm(); 00616 00617 if (request.term < response.term) 00618 { 00619 response.vote_granted = false; 00620 } 00621 else 00622 { 00623 const bool can_vote = !persistent_state_.isVotedForSet() || 00624 (persistent_state_.getVotedFor() == request.getSrcNodeID()); 00625 const bool log_is_up_to_date = 00626 persistent_state_.getLog().isOtherLogUpToDate(request.last_log_index, request.last_log_term); 00627 00628 response.vote_granted = can_vote && log_is_up_to_date; 00629 00630 if (response.vote_granted) 00631 { 00632 switchState(ServerStateFollower); // Avoiding race condition when Candidate 00633 registerActivity(); // This is necessary to avoid excessive elections 00634 00635 const int res = persistent_state_.setVotedFor(request.getSrcNodeID()); 00636 if (res < 0) 00637 { 00638 trace(TraceRaftPersistStateUpdateError, res); 00639 response.setResponseEnabled(false); 00640 return; 00641 } 00642 } 00643 } 00644 } 00645 00646 void handleRequestVoteResponse(const ServiceCallResult<RequestVote>& result) 00647 { 00648 UAVCAN_ASSERT(server_state_ == ServerStateCandidate); // When state switches, all requests must be cancelled 00649 checkInvariants(); 00650 00651 if (!result.isSuccessful()) 00652 { 00653 return; 00654 } 00655 00656 trace(TraceRaftVoteRequestSucceeded, result.getCallID().server_node_id.get()); 00657 
00658 if (result.getResponse().term > persistent_state_.getCurrentTerm()) 00659 { 00660 tryIncrementCurrentTermFromResponse(result.getResponse().term); 00661 } 00662 else 00663 { 00664 if (result.getResponse().vote_granted) 00665 { 00666 num_votes_received_in_this_campaign_++; 00667 } 00668 } 00669 // Rest of the logic is implemented in periodic update handlers. 00670 // I'm no fan of asynchronous programming. At all. 00671 } 00672 00673 virtual void handleTimerEvent(const TimerEvent&) 00674 { 00675 checkInvariants(); 00676 00677 switch (server_state_) 00678 { 00679 case ServerStateFollower: 00680 { 00681 updateFollower(); 00682 break; 00683 } 00684 case ServerStateCandidate: 00685 { 00686 updateCandidate(); 00687 break; 00688 } 00689 case ServerStateLeader: 00690 { 00691 updateLeader(); 00692 break; 00693 } 00694 default: 00695 { 00696 UAVCAN_ASSERT(0); 00697 break; 00698 } 00699 } 00700 } 00701 00702 public: 00703 RaftCore(INode& node, 00704 IStorageBackend& storage, 00705 IEventTracer& tracer, 00706 IRaftLeaderMonitor& leader_monitor) 00707 : TimerBase(node) 00708 , tracer_(tracer) 00709 , leader_monitor_(leader_monitor) 00710 , persistent_state_(storage, tracer) 00711 , cluster_(node, storage, persistent_state_.getLog(), tracer) 00712 , commit_index_(0) // Per Raft paper, commitIndex must be initialized to zero 00713 , last_activity_timestamp_(node.getMonotonicTime()) 00714 , randomized_activity_timeout_( 00715 MonotonicDuration::fromMSec(AppendEntries::Request::DEFAULT_MAX_ELECTION_TIMEOUT_MS)) 00716 , server_state_(ServerStateFollower) 00717 , next_server_index_(0) 00718 , num_votes_received_in_this_campaign_(0) 00719 , append_entries_srv_(node) 00720 , append_entries_client_(node) 00721 , request_vote_srv_(node) 00722 , request_vote_client_(node) 00723 { } 00724 00725 /** 00726 * Once started, the logic runs in the background until destructor is called. 00727 * @param cluster_size If set, this value will be used and stored in the persistent storage. 
If not set, 00728 * value from the persistent storage will be used. If not set and there's no such key 00729 * in the persistent storage, initialization will fail. 00730 */ 00731 int init(const uint8_t cluster_size, const TransferPriority priority) 00732 { 00733 /* 00734 * Initializing state variables 00735 */ 00736 server_state_ = ServerStateFollower; 00737 next_server_index_ = 0; 00738 num_votes_received_in_this_campaign_ = 0; 00739 commit_index_ = 0; 00740 00741 registerActivity(); 00742 00743 /* 00744 * Initializing internals 00745 */ 00746 int res = persistent_state_.init(); 00747 if (res < 0) 00748 { 00749 return res; 00750 } 00751 00752 res = cluster_.init(cluster_size, priority); 00753 if (res < 0) 00754 { 00755 return res; 00756 } 00757 00758 res = append_entries_srv_.start(AppendEntriesCallback(this, &RaftCore::handleAppendEntriesRequest)); 00759 if (res < 0) 00760 { 00761 return res; 00762 } 00763 00764 res = request_vote_srv_.start(RequestVoteCallback(this, &RaftCore::handleRequestVoteRequest)); 00765 if (res < 0) 00766 { 00767 return res; 00768 } 00769 00770 res = append_entries_client_.init(priority); 00771 if (res < 0) 00772 { 00773 return res; 00774 } 00775 append_entries_client_.setCallback(AppendEntriesResponseCallback(this, 00776 &RaftCore::handleAppendEntriesResponse)); 00777 00778 res = request_vote_client_.init(priority); 00779 if (res < 0) 00780 { 00781 return res; 00782 } 00783 request_vote_client_.setCallback(RequestVoteResponseCallback(this, &RaftCore::handleRequestVoteResponse)); 00784 00785 /* 00786 * Initializing timing constants 00787 * Refer to the specification for the formula 00788 */ 00789 const uint8_t num_followers = static_cast<uint8_t>(cluster_.getClusterSize() - 1); 00790 00791 const MonotonicDuration update_interval = 00792 MonotonicDuration::fromMSec(AppendEntries::Request::DEFAULT_MIN_ELECTION_TIMEOUT_MS / 00793 2 / max(static_cast<uint8_t>(2), num_followers)); 00794 00795 
UAVCAN_TRACE("dynamic_node_id_server::distributed::RaftCore", 00796 "Update interval: %ld msec", static_cast<long>(update_interval.toMSec())); 00797 00798 append_entries_client_.setRequestTimeout(min(append_entries_client_.getDefaultRequestTimeout(), 00799 update_interval)); 00800 00801 request_vote_client_.setRequestTimeout(min(request_vote_client_.getDefaultRequestTimeout(), 00802 update_interval)); 00803 00804 startPeriodic(update_interval); 00805 00806 trace(TraceRaftCoreInited, update_interval.toUSec()); 00807 00808 UAVCAN_ASSERT(res >= 0); 00809 return 0; 00810 } 00811 00812 /** 00813 * This function is mostly needed for testing. 00814 */ 00815 Log::Index getCommitIndex() const { return commit_index_; } 00816 00817 /** 00818 * This essentially indicates whether the server could replicate log since last allocation. 00819 */ 00820 bool areAllLogEntriesCommitted() const { return commit_index_ == persistent_state_.getLog().getLastIndex(); } 00821 00822 /** 00823 * Only the leader can call @ref appendLog(). 00824 */ 00825 bool isLeader() const { return server_state_ == ServerStateLeader; } 00826 00827 /** 00828 * Inserts one entry into log. 00829 * This method will trigger an assertion failure and return error if the current node is not the leader. 00830 * If operation fails, the node may give up its Leader status. 00831 */ 00832 void appendLog(const Entry::FieldTypes::unique_id& unique_id, NodeID node_id) 00833 { 00834 if (isLeader()) 00835 { 00836 Entry entry; 00837 entry.node_id = node_id.get(); 00838 entry.unique_id = unique_id; 00839 entry.term = persistent_state_.getCurrentTerm(); 00840 00841 trace(TraceRaftNewLogEntry, entry.node_id); 00842 const int res = persistent_state_.getLog().append(entry); 00843 if (res < 0) 00844 { 00845 handlePersistentStateUpdateError(res); 00846 } 00847 } 00848 else 00849 { 00850 UAVCAN_ASSERT(0); 00851 } 00852 } 00853 00854 /** 00855 * This class is used to perform log searches. 
00856 */ 00857 struct LogEntryInfo 00858 { 00859 Entry entry; 00860 bool committed; 00861 00862 LogEntryInfo(const Entry& arg_entry, bool arg_committed) 00863 : entry(arg_entry) 00864 , committed(arg_committed) 00865 { } 00866 }; 00867 00868 /** 00869 * This method is used by the allocator to query existence of certain entries in the Raft log. 00870 * Predicate is a callable of the following prototype: 00871 * bool (const LogEntryInfo& entry) 00872 * Once the predicate returns true, the loop will be terminated and the method will return an initialized lazy 00873 * contructor with the last visited entry; otherwise the constructor will not be initialized. 00874 * In this case, lazy constructor is used as boost::optional. 00875 * The log is always traversed from HIGH to LOW index values, i.e. entry 0 will be traversed last. 00876 */ 00877 template <typename Predicate> 00878 inline LazyConstructor<LogEntryInfo> traverseLogFromEndUntil(const Predicate& predicate) const 00879 { 00880 UAVCAN_ASSERT(coerceOrFallback<bool>(predicate, true)); 00881 for (int index = static_cast<int>(persistent_state_.getLog().getLastIndex()); index >= 0; index--) 00882 { 00883 const Entry* const entry = persistent_state_.getLog().getEntryAtIndex(Log::Index(index)); 00884 UAVCAN_ASSERT(entry != UAVCAN_NULLPTR); 00885 const LogEntryInfo info(*entry, Log::Index(index) <= commit_index_); 00886 if (predicate(info)) 00887 { 00888 LazyConstructor<LogEntryInfo> ret; 00889 ret.template construct<const LogEntryInfo&>(info); 00890 return ret; 00891 } 00892 } 00893 return LazyConstructor<LogEntryInfo>(); 00894 } 00895 00896 Log::Index getNumAllocations() const 00897 { 00898 // Remember that index zero contains a special-purpose entry that doesn't count as allocation 00899 return persistent_state_.getLog().getLastIndex(); 00900 } 00901 00902 /** 00903 * These accessors are needed for debugging, visualization and testing. 
00904 */ 00905 const PersistentState& getPersistentState() const { return persistent_state_; } 00906 const ClusterManager& getClusterManager() const { return cluster_; } 00907 MonotonicTime getLastActivityTimestamp() const { return last_activity_timestamp_; } 00908 ServerState getServerState() const { return server_state_; } 00909 MonotonicDuration getUpdateInterval() const { return getPeriod(); } 00910 MonotonicDuration getRandomizedTimeout() const { return randomized_activity_timeout_; } 00911 }; 00912 00913 } 00914 } 00915 } 00916 00917 #endif // Include guard
// Generated on Tue Jul 12 2022 17:17:33 by 1.7.2