libuav original

Dependents:   UAVCAN UAVCAN_Subscriber

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers raft_core.hpp Source File

raft_core.hpp

00001 /*
00002  * Copyright (C) 2015 Pavel Kirienko <pavel.kirienko@gmail.com>
00003  */
00004 
00005 #ifndef UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_RAFT_CORE_HPP_INCLUDED
00006 #define UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_RAFT_CORE_HPP_INCLUDED
00007 
00008 #include <cstdlib>
00009 #include <uavcan/build_config.hpp>
00010 #include <uavcan/debug.hpp>
00011 #include <uavcan/util/method_binder.hpp>
00012 #include <uavcan/node/timer.hpp>
00013 #include <uavcan/node/service_server.hpp>
00014 #include <uavcan/node/service_client.hpp>
00015 #include <uavcan/protocol/dynamic_node_id_server/distributed/types.hpp>
00016 #include <uavcan/protocol/dynamic_node_id_server/distributed/persistent_state.hpp>
00017 #include <uavcan/protocol/dynamic_node_id_server/distributed/cluster_manager.hpp>
00018 #include <uavcan/protocol/dynamic_node_id_server/event.hpp>
00019 // UAVCAN types
00020 #include <uavcan/protocol/dynamic_node_id/server/AppendEntries.hpp>
00021 #include <uavcan/protocol/dynamic_node_id/server/RequestVote.hpp>
00022 
00023 namespace uavcan
00024 {
00025 namespace dynamic_node_id_server
00026 {
00027 namespace distributed
00028 {
00029 /**
00030  * Allocator has to implement this interface so the RaftCore can inform it when a new entry gets committed to the log.
00031  */
00032 class IRaftLeaderMonitor
00033 {
00034 public:
00035     /**
00036      * This method will be invoked when a new log entry is committed (only if the local server is the current Leader).
00037      */
00038     virtual void handleLogCommitOnLeader(const Entry& committed_entry) = 0;
00039 
00040     /**
00041      * Invoked by the Raft core when the local node becomes a leader or ceases to be one.
00042      * By default the local node is not leader.
00043      * It is possible to commit to the log right from this method.
00044      */
00045     virtual void handleLocalLeadershipChange(bool local_node_is_leader) = 0;
00046 
00047     virtual ~IRaftLeaderMonitor() { }
00048 };
00049 
00050 /**
00051  * This class implements log replication and voting.
00052  * It does not implement client-server interaction at all; instead it just exposes a public method for adding
00053  * allocation entries.
00054  *
00055  * Note that this class uses std::rand(), so the RNG must be properly seeded by the application.
00056  *
00057  * Activity registration:
00058  *   - persistent state update error
00059  *   - switch to candidate (this defines timeout between reelections)
00060  *   - newer term in response (also switch to follower)
00061  *   - append entries request with term >= currentTerm
00062  *   - vote granted
00063  */
00064 class RaftCore : private TimerBase
00065 {
00066 public:
/**
 * State of the local server.
 * The numeric value is passed to the event tracer on every state switch.
 */
enum ServerState
{
    ServerStateFollower  = 0,
    ServerStateCandidate = 1,
    ServerStateLeader    = 2
};
00073 
00074 private:
00075     typedef MethodBinder<RaftCore*, void (RaftCore::*)(const ReceivedDataStructure<AppendEntries::Request>&,
00076                                                        ServiceResponseDataStructure<AppendEntries::Response>&)>
00077         AppendEntriesCallback;
00078 
00079     typedef MethodBinder<RaftCore*, void (RaftCore::*)(const ServiceCallResult<AppendEntries>&)>
00080         AppendEntriesResponseCallback;
00081 
00082     typedef MethodBinder<RaftCore*, void (RaftCore::*)(const ReceivedDataStructure<RequestVote::Request>&,
00083                                                        ServiceResponseDataStructure<RequestVote::Response>&)>
00084         RequestVoteCallback;
00085 
00086     typedef MethodBinder<RaftCore*, void (RaftCore::*)(const ServiceCallResult<RequestVote>&)>
00087         RequestVoteResponseCallback;
00088 
00089     struct PendingAppendEntriesFields
00090     {
00091         Log::Index prev_log_index;
00092         Log::Index num_entries;
00093 
00094         PendingAppendEntriesFields()
00095             : prev_log_index(0)
00096             , num_entries(0)
00097         { }
00098     };
00099 
00100     /*
00101      * Constants
00102      */
00103     enum { MaxNumFollowers = ClusterManager::MaxClusterSize - 1 };
00104 
00105     IEventTracer& tracer_;
00106     IRaftLeaderMonitor& leader_monitor_;
00107 
00108     /*
00109      * States
00110      */
00111     PersistentState persistent_state_;
00112     ClusterManager cluster_;
00113     Log::Index commit_index_;
00114 
00115     MonotonicTime last_activity_timestamp_;
00116     MonotonicDuration randomized_activity_timeout_;
00117     ServerState server_state_;
00118 
00119     uint8_t next_server_index_;         ///< Next server to query AE from
00120     uint8_t num_votes_received_in_this_campaign_;
00121 
00122     PendingAppendEntriesFields pending_append_entries_fields_;
00123 
00124     /*
00125      * Transport
00126      */
00127     ServiceServer<AppendEntries, AppendEntriesCallback>         append_entries_srv_;
00128     ServiceClient<AppendEntries, AppendEntriesResponseCallback> append_entries_client_;
00129     ServiceServer<RequestVote, RequestVoteCallback>         request_vote_srv_;
00130     ServiceClient<RequestVote, RequestVoteResponseCallback> request_vote_client_;
00131 
00132     /*
00133      * Methods
00134      */
00135     void trace(TraceCode event, int64_t argument) { tracer_.onEvent(event, argument); }
00136 
00137     INode&       getNode()       { return append_entries_srv_.getNode(); }
00138     const INode& getNode() const { return append_entries_srv_.getNode(); }
00139 
00140     void checkInvariants() const
00141     {
00142         // Commit index
00143         UAVCAN_ASSERT(commit_index_ <= persistent_state_.getLog().getLastIndex());
00144 
00145         // Term
00146         UAVCAN_ASSERT(persistent_state_.getLog().getEntryAtIndex(persistent_state_.getLog().getLastIndex()) !=
00147                       UAVCAN_NULLPTR);
00148         UAVCAN_ASSERT(persistent_state_.getLog().getEntryAtIndex(persistent_state_.getLog().getLastIndex())->term <=
00149                       persistent_state_.getCurrentTerm());
00150 
00151         // Elections
00152         UAVCAN_ASSERT(server_state_ != ServerStateCandidate || !request_vote_client_.hasPendingCalls() ||
00153                       persistent_state_.getVotedFor() == getNode().getNodeID());
00154         UAVCAN_ASSERT(num_votes_received_in_this_campaign_ <= cluster_.getClusterSize());
00155 
00156         // Transport
00157         UAVCAN_ASSERT(append_entries_client_.getNumPendingCalls() <= 1);
00158         UAVCAN_ASSERT(request_vote_client_.getNumPendingCalls() <= cluster_.getNumKnownServers());
00159         UAVCAN_ASSERT(server_state_ != ServerStateCandidate || !append_entries_client_.hasPendingCalls());
00160         UAVCAN_ASSERT(server_state_ != ServerStateLeader    || !request_vote_client_.hasPendingCalls());
00161         UAVCAN_ASSERT(server_state_ != ServerStateFollower  ||
00162                       (!append_entries_client_.hasPendingCalls() && !request_vote_client_.hasPendingCalls()));
00163     }
00164 
00165     void registerActivity()
00166     {
00167         last_activity_timestamp_ = getNode().getMonotonicTime();
00168 
00169         static const int32_t randomization_range_msec = AppendEntries::Request::DEFAULT_MAX_ELECTION_TIMEOUT_MS -
00170                                                         AppendEntries::Request::DEFAULT_MIN_ELECTION_TIMEOUT_MS;
00171         // coverity[dont_call]
00172         const int32_t random_msec = (std::rand() % randomization_range_msec) + 1;
00173 
00174         randomized_activity_timeout_ =
00175             MonotonicDuration::fromMSec(AppendEntries::Request::DEFAULT_MIN_ELECTION_TIMEOUT_MS + random_msec);
00176 
00177         UAVCAN_ASSERT(randomized_activity_timeout_.toMSec() > AppendEntries::Request::DEFAULT_MIN_ELECTION_TIMEOUT_MS);
00178         UAVCAN_ASSERT(randomized_activity_timeout_.toMSec() <= AppendEntries::Request::DEFAULT_MAX_ELECTION_TIMEOUT_MS);
00179     }
00180 
00181     bool isActivityTimedOut() const
00182     {
00183         return getNode().getMonotonicTime() > (last_activity_timestamp_ + randomized_activity_timeout_);
00184     }
00185 
00186     void handlePersistentStateUpdateError(int error)
00187     {
00188         UAVCAN_ASSERT(error < 0);
00189         trace(TraceRaftPersistStateUpdateError, error);
00190         switchState(ServerStateFollower);
00191         registerActivity();                 // Deferring reelections
00192     }
00193 
00194     void updateFollower()
00195     {
00196         if (isActivityTimedOut())
00197         {
00198             switchState(ServerStateCandidate);
00199             registerActivity();
00200         }
00201     }
00202 
00203     void updateCandidate()
00204     {
00205         if (num_votes_received_in_this_campaign_ > 0)
00206         {
00207             trace(TraceRaftElectionComplete, num_votes_received_in_this_campaign_);
00208             const bool won = num_votes_received_in_this_campaign_ >= cluster_.getQuorumSize();
00209 
00210             UAVCAN_TRACE("dynamic_node_id_server::distributed::RaftCore", "Election complete, won: %d", int(won));
00211 
00212             switchState(won ? ServerStateLeader : ServerStateFollower);       // Start over or become leader
00213         }
00214         else
00215         {
00216             // Set votedFor, abort on failure
00217             int res = persistent_state_.setVotedFor(getNode().getNodeID());
00218             if (res < 0)
00219             {
00220                 handlePersistentStateUpdateError(res);
00221                 return;
00222             }
00223 
00224             // Increment current term, abort on failure
00225             res = persistent_state_.setCurrentTerm(persistent_state_.getCurrentTerm() + 1U);
00226             if (res < 0)
00227             {
00228                 handlePersistentStateUpdateError(res);
00229                 return;
00230             }
00231 
00232             num_votes_received_in_this_campaign_ = 1;               // Voting for self
00233 
00234             RequestVote::Request req;
00235             req.last_log_index = persistent_state_.getLog().getLastIndex();
00236             req.last_log_term = persistent_state_.getLog().getEntryAtIndex(req.last_log_index)->term;
00237             req.term = persistent_state_.getCurrentTerm();
00238 
00239             for (uint8_t i = 0; i < MaxNumFollowers; i++)
00240             {
00241                 const NodeID node_id = cluster_.getRemoteServerNodeIDAtIndex(i);
00242                 if (!node_id.isUnicast())
00243                 {
00244                     break;
00245                 }
00246 
00247                 UAVCAN_TRACE("dynamic_node_id_server::distributed::RaftCore",
00248                              "Requesting vote from %d", int(node_id.get()));
00249                 trace(TraceRaftVoteRequestInitiation, node_id.get());
00250 
00251                 res = request_vote_client_.call(node_id, req);
00252                 if (res < 0)
00253                 {
00254                     trace(TraceError, res);
00255                 }
00256             }
00257         }
00258     }
00259 
00260     void updateLeader()
00261     {
00262         if (append_entries_client_.hasPendingCalls())
00263         {
00264             append_entries_client_.cancelAllCalls();    // Refer to the response callback to learn why
00265         }
00266 
00267         if (cluster_.getClusterSize() > 1)
00268         {
00269             const NodeID node_id = cluster_.getRemoteServerNodeIDAtIndex(next_server_index_);
00270             UAVCAN_ASSERT(node_id.isUnicast());
00271 
00272             next_server_index_++;
00273             if (next_server_index_ >= cluster_.getNumKnownServers())
00274             {
00275                 next_server_index_ = 0;
00276             }
00277 
00278             AppendEntries::Request req;
00279             req.term = persistent_state_.getCurrentTerm();
00280             req.leader_commit = commit_index_;
00281 
00282             req.prev_log_index = Log::Index(cluster_.getServerNextIndex(node_id) - 1U);
00283 
00284             const Entry* const entry = persistent_state_.getLog().getEntryAtIndex(req.prev_log_index);
00285             if (entry == UAVCAN_NULLPTR)
00286             {
00287                 UAVCAN_ASSERT(0);
00288                 handlePersistentStateUpdateError(-ErrLogic);
00289                 return;
00290             }
00291 
00292             req.prev_log_term = entry->term;
00293 
00294             for (Log::Index index = cluster_.getServerNextIndex(node_id);
00295                  index <= persistent_state_.getLog().getLastIndex();
00296                  index++)
00297             {
00298                 req.entries.push_back(*persistent_state_.getLog().getEntryAtIndex(index));
00299                 if (req.entries.size() == req.entries.capacity())
00300                 {
00301                     break;
00302                 }
00303             }
00304 
00305             pending_append_entries_fields_.num_entries = req.entries.size();
00306             pending_append_entries_fields_.prev_log_index = req.prev_log_index;
00307 
00308             const int res = append_entries_client_.call(node_id, req);
00309             if (res < 0)
00310             {
00311                 trace(TraceRaftAppendEntriesCallFailure, res);
00312             }
00313         }
00314 
00315         propagateCommitIndex();
00316     }
00317 
00318     void switchState(ServerState new_state)
00319     {
00320         if (server_state_ == new_state)
00321         {
00322             return;
00323         }
00324 
00325         /*
00326          * Logging
00327          */
00328         UAVCAN_TRACE("dynamic_node_id_server::distributed::RaftCore", "State switch: %d --> %d",
00329                      int(server_state_), int(new_state));
00330         trace(TraceRaftStateSwitch, new_state);
00331 
00332         /*
00333          * Updating the current state
00334          */
00335         const ServerState old_state = server_state_;
00336         server_state_ = new_state;
00337 
00338         /*
00339          * Resetting specific states
00340          */
00341         cluster_.resetAllServerIndices();
00342 
00343         next_server_index_ = 0;
00344         num_votes_received_in_this_campaign_ = 0;
00345 
00346         request_vote_client_.cancelAllCalls();
00347         append_entries_client_.cancelAllCalls();
00348 
00349         /*
00350          * Calling the switch handler
00351          * Note that the handler may commit to the log directly
00352          */
00353         if ((old_state == ServerStateLeader) ||
00354             (new_state == ServerStateLeader))
00355         {
00356             leader_monitor_.handleLocalLeadershipChange(new_state == ServerStateLeader);
00357         }
00358     }
00359 
00360     void tryIncrementCurrentTermFromResponse(Term new_term)
00361     {
00362         trace(TraceRaftNewerTermInResponse, new_term);
00363         const int res = persistent_state_.setCurrentTerm(new_term);
00364         if (res < 0)
00365         {
00366             trace(TraceRaftPersistStateUpdateError, res);
00367         }
00368         registerActivity();                             // Deferring future elections
00369         switchState(ServerStateFollower);
00370     }
00371 
00372     void propagateCommitIndex()
00373     {
00374         // Objective is to estimate whether we can safely increment commit index value
00375         UAVCAN_ASSERT(server_state_ == ServerStateLeader);
00376         UAVCAN_ASSERT(commit_index_ <= persistent_state_.getLog().getLastIndex());
00377 
00378         if (commit_index_ < persistent_state_.getLog().getLastIndex())
00379         {
00380             /*
00381              * Not all local entries are committed.
00382              * Deciding if it is safe to increment commit index.
00383              */
00384             uint8_t num_nodes_with_next_log_entry_available = 1; // Local node
00385             for (uint8_t i = 0; i < cluster_.getNumKnownServers(); i++)
00386             {
00387                 const Log::Index match_index = cluster_.getServerMatchIndex(cluster_.getRemoteServerNodeIDAtIndex(i));
00388                 if (match_index > commit_index_)
00389                 {
00390                     num_nodes_with_next_log_entry_available++;
00391                 }
00392             }
00393 
00394             if (num_nodes_with_next_log_entry_available >= cluster_.getQuorumSize())
00395             {
00396                 commit_index_++;
00397                 UAVCAN_ASSERT(commit_index_ > 0);   // Index 0 is always committed
00398                 trace(TraceRaftNewEntryCommitted, commit_index_);
00399 
00400                 // AT THIS POINT ALLOCATION IS COMPLETE
00401                 leader_monitor_.handleLogCommitOnLeader(*persistent_state_.getLog().getEntryAtIndex(commit_index_));
00402             }
00403         }
00404     }
00405 
00406     void handleAppendEntriesRequest(const ReceivedDataStructure<AppendEntries::Request>& request,
00407                                     ServiceResponseDataStructure<AppendEntries::Response>& response)
00408     {
00409         checkInvariants();
00410 
00411         if (!cluster_.isKnownServer(request.getSrcNodeID()))
00412         {
00413             if (cluster_.isClusterDiscovered())
00414             {
00415                 trace(TraceRaftRequestIgnored, request.getSrcNodeID().get());
00416                 response.setResponseEnabled(false);
00417                 return;
00418             }
00419             else
00420             {
00421                 cluster_.addServer(request.getSrcNodeID());
00422             }
00423         }
00424 
00425         UAVCAN_ASSERT(response.isResponseEnabled());  // This is default
00426 
00427         /*
00428          * Checking if our current state is up to date.
00429          * The request will be ignored if persistent state cannot be updated.
00430          */
00431         if (request.term > persistent_state_.getCurrentTerm())
00432         {
00433             int res = persistent_state_.setCurrentTerm(request.term);
00434             if (res < 0)
00435             {
00436                 handlePersistentStateUpdateError(res);
00437                 response.setResponseEnabled(false);
00438                 return;
00439             }
00440 
00441             res = persistent_state_.resetVotedFor();
00442             if (res < 0)
00443             {
00444                 handlePersistentStateUpdateError(res);
00445                 response.setResponseEnabled(false);
00446                 return;
00447             }
00448         }
00449 
00450         /*
00451          * Preparing the response
00452          */
00453         response.term = persistent_state_.getCurrentTerm();
00454         response.success = false;
00455 
00456         /*
00457          * Step 1 (see Raft paper)
00458          * Reject the request if the leader has stale term number.
00459          */
00460         if (request.term < persistent_state_.getCurrentTerm())
00461         {
00462             response.setResponseEnabled(true);
00463             return;
00464         }
00465 
00466         registerActivity();
00467         switchState(ServerStateFollower);
00468 
00469         /*
00470          * Step 2
00471          * Reject the request if the assumed log index does not exist on the local node.
00472          */
00473         const Entry* const prev_entry = persistent_state_.getLog().getEntryAtIndex(request.prev_log_index);
00474         if (prev_entry == UAVCAN_NULLPTR)
00475         {
00476             response.setResponseEnabled(true);
00477             return;
00478         }
00479 
00480         /*
00481          * Step 3
00482          * Drop log entries if term number does not match.
00483          * Ignore the request if the persistent state cannot be updated.
00484          */
00485         if (prev_entry->term != request.prev_log_term)
00486         {
00487             const int res = persistent_state_.getLog().removeEntriesWhereIndexGreaterOrEqual(request.prev_log_index);
00488             response.setResponseEnabled(res >= 0);
00489             if (res < 0)
00490             {
00491                 trace(TraceRaftPersistStateUpdateError, res);
00492             }
00493             return;
00494         }
00495 
00496         /*
00497          * Step 4
00498          * Update the log with new entries - this will possibly require to rewrite existing entries.
00499          * Ignore the request if the persistent state cannot be updated.
00500          */
00501         if (request.prev_log_index != persistent_state_.getLog().getLastIndex())
00502         {
00503             const int res = persistent_state_.getLog().removeEntriesWhereIndexGreater(request.prev_log_index);
00504             if (res < 0)
00505             {
00506                 trace(TraceRaftPersistStateUpdateError, res);
00507                 response.setResponseEnabled(false);
00508                 return;
00509             }
00510         }
00511 
00512         for (uint8_t i = 0; i < request.entries.size(); i++)
00513         {
00514             const int res = persistent_state_.getLog().append(request.entries[i]);
00515             if (res < 0)
00516             {
00517                 trace(TraceRaftPersistStateUpdateError, res);
00518                 response.setResponseEnabled(false);
00519                 return;                     // Response will not be sent, the server will assume that we're dead
00520             }
00521         }
00522 
00523         /*
00524          * Step 5
00525          * Update the commit index.
00526          */
00527         if (request.leader_commit > commit_index_)
00528         {
00529             commit_index_ = min(request.leader_commit, persistent_state_.getLog().getLastIndex());
00530             trace(TraceRaftCommitIndexUpdate, commit_index_);
00531         }
00532 
00533         response.setResponseEnabled(true);
00534         response.success = true;
00535     }
00536 
00537     void handleAppendEntriesResponse(const ServiceCallResult<AppendEntries>& result)
00538     {
00539         UAVCAN_ASSERT(server_state_ == ServerStateLeader);  // When state switches, all requests must be cancelled
00540         checkInvariants();
00541 
00542         if (!result.isSuccessful())
00543         {
00544             return;
00545         }
00546 
00547         if (result.getResponse().term > persistent_state_.getCurrentTerm())
00548         {
00549             tryIncrementCurrentTermFromResponse(result.getResponse().term);
00550         }
00551         else
00552         {
00553             if (result.getResponse().success)
00554             {
00555                 cluster_.incrementServerNextIndexBy(result.getCallID().server_node_id,
00556                                                     pending_append_entries_fields_.num_entries);
00557                 cluster_.setServerMatchIndex(result.getCallID().server_node_id,
00558                                              Log::Index(pending_append_entries_fields_.prev_log_index +
00559                                                         pending_append_entries_fields_.num_entries));
00560             }
00561             else
00562             {
00563                 cluster_.decrementServerNextIndex(result.getCallID().server_node_id);
00564                 trace(TraceRaftAppendEntriesRespUnsucfl, result.getCallID().server_node_id.get());
00565             }
00566         }
00567 
00568         pending_append_entries_fields_ = PendingAppendEntriesFields();
00569         // Rest of the logic is implemented in periodic update handlers.
00570     }
00571 
00572     void handleRequestVoteRequest(const ReceivedDataStructure<RequestVote::Request>& request,
00573                                   ServiceResponseDataStructure<RequestVote::Response>& response)
00574     {
00575         checkInvariants();
00576         trace(TraceRaftVoteRequestReceived, request.getSrcNodeID().get());
00577 
00578         if (!cluster_.isKnownServer(request.getSrcNodeID()))
00579         {
00580             trace(TraceRaftRequestIgnored, request.getSrcNodeID().get());
00581             response.setResponseEnabled(false);
00582             return;
00583         }
00584 
00585         UAVCAN_ASSERT(response.isResponseEnabled());  // This is default
00586 
00587         /*
00588          * Checking if our current state is up to date.
00589          * The request will be ignored if persistent state cannot be updated.
00590          */
00591         if (request.term > persistent_state_.getCurrentTerm())
00592         {
00593             switchState(ServerStateFollower);   // Our term is stale, so we can't serve as leader
00594 
00595             int res = persistent_state_.setCurrentTerm(request.term);
00596             if (res < 0)
00597             {
00598                 handlePersistentStateUpdateError(res);
00599                 response.setResponseEnabled(false);
00600                 return;
00601             }
00602 
00603             res = persistent_state_.resetVotedFor();
00604             if (res < 0)
00605             {
00606                 handlePersistentStateUpdateError(res);
00607                 response.setResponseEnabled(false);
00608                 return;
00609             }
00610         }
00611 
00612         /*
00613          * Preparing the response
00614          */
00615         response.term = persistent_state_.getCurrentTerm();
00616 
00617         if (request.term < response.term)
00618         {
00619             response.vote_granted = false;
00620         }
00621         else
00622         {
00623             const bool can_vote = !persistent_state_.isVotedForSet() ||
00624                                   (persistent_state_.getVotedFor() == request.getSrcNodeID());
00625             const bool log_is_up_to_date =
00626                 persistent_state_.getLog().isOtherLogUpToDate(request.last_log_index, request.last_log_term);
00627 
00628             response.vote_granted = can_vote && log_is_up_to_date;
00629 
00630             if (response.vote_granted)
00631             {
00632                 switchState(ServerStateFollower);   // Avoiding race condition when Candidate
00633                 registerActivity();                 // This is necessary to avoid excessive elections
00634 
00635                 const int res = persistent_state_.setVotedFor(request.getSrcNodeID());
00636                 if (res < 0)
00637                 {
00638                     trace(TraceRaftPersistStateUpdateError, res);
00639                     response.setResponseEnabled(false);
00640                     return;
00641                 }
00642             }
00643         }
00644     }
00645 
00646     void handleRequestVoteResponse(const ServiceCallResult<RequestVote>& result)
00647     {
00648         UAVCAN_ASSERT(server_state_ == ServerStateCandidate); // When state switches, all requests must be cancelled
00649         checkInvariants();
00650 
00651         if (!result.isSuccessful())
00652         {
00653             return;
00654         }
00655 
00656         trace(TraceRaftVoteRequestSucceeded, result.getCallID().server_node_id.get());
00657 
00658         if (result.getResponse().term > persistent_state_.getCurrentTerm())
00659         {
00660             tryIncrementCurrentTermFromResponse(result.getResponse().term);
00661         }
00662         else
00663         {
00664             if (result.getResponse().vote_granted)
00665             {
00666                 num_votes_received_in_this_campaign_++;
00667             }
00668         }
00669         // Rest of the logic is implemented in periodic update handlers.
00670         // I'm no fan of asynchronous programming. At all.
00671     }
00672 
00673     virtual void handleTimerEvent(const TimerEvent&)
00674     {
00675         checkInvariants();
00676 
00677         switch (server_state_)
00678         {
00679         case ServerStateFollower:
00680         {
00681             updateFollower();
00682             break;
00683         }
00684         case ServerStateCandidate:
00685         {
00686             updateCandidate();
00687             break;
00688         }
00689         case ServerStateLeader:
00690         {
00691             updateLeader();
00692             break;
00693         }
00694         default:
00695         {
00696             UAVCAN_ASSERT(0);
00697             break;
00698         }
00699         }
00700     }
00701 
00702 public:
00703     RaftCore(INode& node,
00704              IStorageBackend& storage,
00705              IEventTracer& tracer,
00706              IRaftLeaderMonitor& leader_monitor)
00707         : TimerBase(node)
00708         , tracer_(tracer)
00709         , leader_monitor_(leader_monitor)
00710         , persistent_state_(storage, tracer)
00711         , cluster_(node, storage, persistent_state_.getLog(), tracer)
00712         , commit_index_(0)                                  // Per Raft paper, commitIndex must be initialized to zero
00713         , last_activity_timestamp_(node.getMonotonicTime())
00714         , randomized_activity_timeout_(
00715             MonotonicDuration::fromMSec(AppendEntries::Request::DEFAULT_MAX_ELECTION_TIMEOUT_MS))
00716         , server_state_(ServerStateFollower)
00717         , next_server_index_(0)
00718         , num_votes_received_in_this_campaign_(0)
00719         , append_entries_srv_(node)
00720         , append_entries_client_(node)
00721         , request_vote_srv_(node)
00722         , request_vote_client_(node)
00723     { }
00724 
00725     /**
00726      * Once started, the logic runs in the background until destructor is called.
00727      * @param cluster_size      If set, this value will be used and stored in the persistent storage. If not set,
00728      *                          value from the persistent storage will be used. If not set and there's no such key
00729      *                          in the persistent storage, initialization will fail.
00730      */
00731     int init(const uint8_t cluster_size, const TransferPriority priority)
00732     {
00733         /*
00734          * Initializing state variables
00735          */
00736         server_state_ = ServerStateFollower;
00737         next_server_index_ = 0;
00738         num_votes_received_in_this_campaign_ = 0;
00739         commit_index_ = 0;
00740 
00741         registerActivity();
00742 
00743         /*
00744          * Initializing internals
00745          */
00746         int res = persistent_state_.init();
00747         if (res < 0)
00748         {
00749             return res;
00750         }
00751 
00752         res = cluster_.init(cluster_size, priority);
00753         if (res < 0)
00754         {
00755             return res;
00756         }
00757 
00758         res = append_entries_srv_.start(AppendEntriesCallback(this, &RaftCore::handleAppendEntriesRequest));
00759         if (res < 0)
00760         {
00761             return res;
00762         }
00763 
00764         res = request_vote_srv_.start(RequestVoteCallback(this, &RaftCore::handleRequestVoteRequest));
00765         if (res < 0)
00766         {
00767             return res;
00768         }
00769 
00770         res = append_entries_client_.init(priority);
00771         if (res < 0)
00772         {
00773             return res;
00774         }
00775         append_entries_client_.setCallback(AppendEntriesResponseCallback(this,
00776                                                                          &RaftCore::handleAppendEntriesResponse));
00777 
00778         res = request_vote_client_.init(priority);
00779         if (res < 0)
00780         {
00781             return res;
00782         }
00783         request_vote_client_.setCallback(RequestVoteResponseCallback(this, &RaftCore::handleRequestVoteResponse));
00784 
00785         /*
00786          * Initializing timing constants
00787          * Refer to the specification for the formula
00788          */
00789         const uint8_t num_followers = static_cast<uint8_t>(cluster_.getClusterSize() - 1);
00790 
00791         const MonotonicDuration update_interval =
00792             MonotonicDuration::fromMSec(AppendEntries::Request::DEFAULT_MIN_ELECTION_TIMEOUT_MS /
00793                                         2 / max(static_cast<uint8_t>(2), num_followers));
00794 
00795         UAVCAN_TRACE("dynamic_node_id_server::distributed::RaftCore",
00796                      "Update interval: %ld msec", static_cast<long>(update_interval.toMSec()));
00797 
00798         append_entries_client_.setRequestTimeout(min(append_entries_client_.getDefaultRequestTimeout(),
00799                                                      update_interval));
00800 
00801         request_vote_client_.setRequestTimeout(min(request_vote_client_.getDefaultRequestTimeout(),
00802                                                    update_interval));
00803 
00804         startPeriodic(update_interval);
00805 
00806         trace(TraceRaftCoreInited, update_interval.toUSec());
00807 
00808         UAVCAN_ASSERT(res >= 0);
00809         return 0;
00810     }
00811 
00812     /**
00813      * This function is mostly needed for testing.
00814      */
00815     Log::Index getCommitIndex() const { return commit_index_; }
00816 
00817     /**
00818      * This essentially indicates whether the server could replicate log since last allocation.
00819      */
00820     bool areAllLogEntriesCommitted() const { return commit_index_ == persistent_state_.getLog().getLastIndex(); }
00821 
00822     /**
00823      * Only the leader can call @ref appendLog().
00824      */
00825     bool isLeader() const { return server_state_ == ServerStateLeader; }
00826 
00827     /**
00828      * Inserts one entry into log.
00829      * This method will trigger an assertion failure and do nothing if the current node is not the leader.
00830      * If operation fails, the node may give up its Leader status.
00831      */
00832     void appendLog(const Entry::FieldTypes::unique_id& unique_id, NodeID node_id)
00833     {
00834         if (isLeader())
00835         {
00836             Entry entry;
00837             entry.node_id = node_id.get();
00838             entry.unique_id = unique_id;
00839             entry.term = persistent_state_.getCurrentTerm();
00840 
00841             trace(TraceRaftNewLogEntry, entry.node_id);
00842             const int res = persistent_state_.getLog().append(entry);
00843             if (res < 0)
00844             {
00845                 handlePersistentStateUpdateError(res);
00846             }
00847         }
00848         else
00849         {
00850             UAVCAN_ASSERT(0);
00851         }
00852     }
00853 
00854     /**
00855      * This class is used to perform log searches.
00856      */
00857     struct LogEntryInfo
00858     {
00859         Entry entry;
00860         bool committed;
00861 
00862         LogEntryInfo(const Entry& arg_entry, bool arg_committed)
00863             : entry(arg_entry)
00864             , committed(arg_committed)
00865         { }
00866     };
00867 
00868     /**
00869      * This method is used by the allocator to query existence of certain entries in the Raft log.
00870      * Predicate is a callable of the following prototype:
00871      *  bool (const LogEntryInfo& entry)
00872      * Once the predicate returns true, the loop will be terminated and the method will return an initialized lazy
00873      * constructor with the last visited entry; otherwise the constructor will not be initialized.
00874      * In this case, lazy constructor is used as boost::optional.
00875      * The log is always traversed from HIGH to LOW index values, i.e. entry 0 will be traversed last.
00876      */
00877     template <typename Predicate>
00878     inline LazyConstructor<LogEntryInfo> traverseLogFromEndUntil(const Predicate& predicate) const
00879     {
00880         UAVCAN_ASSERT(coerceOrFallback<bool>(predicate, true));
00881         for (int index = static_cast<int>(persistent_state_.getLog().getLastIndex()); index >= 0; index--)
00882         {
00883             const Entry* const entry = persistent_state_.getLog().getEntryAtIndex(Log::Index(index));
00884             UAVCAN_ASSERT(entry != UAVCAN_NULLPTR);
00885             const LogEntryInfo info(*entry, Log::Index(index) <= commit_index_);
00886             if (predicate(info))
00887             {
00888                 LazyConstructor<LogEntryInfo> ret;
00889                 ret.template construct<const LogEntryInfo&>(info);
00890                 return ret;
00891             }
00892         }
00893         return LazyConstructor<LogEntryInfo>();
00894     }
00895 
00896     Log::Index getNumAllocations() const
00897     {
00898         // Remember that index zero contains a special-purpose entry that doesn't count as allocation
00899         return persistent_state_.getLog().getLastIndex();
00900     }
00901 
00902     /**
00903      * These accessors are needed for debugging, visualization and testing.
00904      */
00905     const PersistentState& getPersistentState() const { return persistent_state_; }
00906     const ClusterManager& getClusterManager()   const { return cluster_; }
00907     MonotonicTime getLastActivityTimestamp()    const { return last_activity_timestamp_; }
00908     ServerState getServerState()                const { return server_state_; }
00909     MonotonicDuration getUpdateInterval()       const { return getPeriod(); }
00910     MonotonicDuration getRandomizedTimeout()    const { return randomized_activity_timeout_; }
00911 };
00912 
00913 }
00914 }
00915 }
00916 
00917 #endif // Include guard