libuav original

Dependents:   UAVCAN UAVCAN_Subscriber

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers server.hpp Source File

server.hpp

00001 /*
00002  * Copyright (C) 2015 Pavel Kirienko <pavel.kirienko@gmail.com>
00003  */
00004 
00005 #ifndef UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_SERVER_HPP_INCLUDED
00006 #define UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_SERVER_HPP_INCLUDED
00007 
00008 #include <uavcan/build_config.hpp>
00009 #include <uavcan/debug.hpp>
00010 #include <uavcan/protocol/dynamic_node_id_server/distributed/types.hpp>
00011 #include <uavcan/protocol/dynamic_node_id_server/distributed/raft_core.hpp>
00012 #include <uavcan/protocol/dynamic_node_id_server/abstract_server.hpp>
00013 #include <uavcan/protocol/dynamic_node_id_server/node_id_selector.hpp>
00014 #include <uavcan/protocol/dynamic_node_id_server/event.hpp>
00015 
00016 namespace uavcan
00017 {
00018 namespace dynamic_node_id_server
00019 {
00020 namespace distributed
00021 {
00022 /**
00023  * This class implements the top-level allocation logic and server API.
00024  */
00025 class UAVCAN_EXPORT Server : public AbstractServer
00026                            , IRaftLeaderMonitor
00027 {
00028     struct UniqueIDLogPredicate
00029     {
00030         const UniqueID unique_id;
00031 
00032         UniqueIDLogPredicate(const UniqueID& uid)
00033             : unique_id(uid)
00034         { }
00035 
00036         bool operator()(const RaftCore::LogEntryInfo& info) const
00037         {
00038             return info.entry.unique_id == unique_id;
00039         }
00040     };
00041 
00042     struct NodeIDLogPredicate
00043     {
00044         const NodeID node_id;
00045 
00046         NodeIDLogPredicate(const NodeID& nid)
00047             : node_id(nid)
00048         { }
00049 
00050         bool operator()(const RaftCore::LogEntryInfo& info) const
00051         {
00052             return info.entry.node_id == node_id.get();
00053         }
00054     };
00055 
00056     /*
00057      * States
00058      */
00059     RaftCore raft_core_;
00060 
00061     /*
00062      * Methods of IAllocationRequestHandler
00063      */
00064     virtual bool canPublishFollowupAllocationResponse() const
00065     {
00066         /*
00067          * The server is allowed to publish follow-up allocation responses only if both conditions are met:
00068          * - The server is leader.
00069          * - The last allocation request has been completed successfully.
00070          *
00071          * Why second condition? Imagine a case when there's two Raft nodes that don't hear each other - A and B,
00072          * both of them are leaders (but only A can commit to the log, B is in a minor partition); then there's a
00073          * client X that can exchange with both leaders, and a client Y that can exchange only with A. Such a
00074          * situation can occur in case of a very unlikely failure of redundant interfaces.
00075          *
00076          * Both clients X and Y initially send a first-stage Allocation request; A responds to Y with a first-stage
00077          * response, whereas B responds to X. Both X and Y will issue a follow-up second-stage requests, which may
00078          * cause A to mix second-stage Allocation requests from different nodes, leading to reception of an invalid
00079          * unique ID. When both leaders receive full unique IDs (A will receive an invalid one, B will receive a valid
00080          * unique ID of X), only A will be able to make a commit, because B is in a minority. Since both clients were
00081          * unable to receive node ID values in this round, they will try again later.
00082          *
00083          * Now, in order to prevent B from disrupting client-server communication second time around, we introduce this
00084          * second restriction: the server cannot exchange with clients as long as its log contains uncommitted entries.
00085          *
00086          * Note that this restriction does not apply to allocation requests sent via CAN FD frames, as in this case
00087          * no follow-up responses are necessary. So only CAN FD can offer reliable Allocation exchange.
00088          */
00089         return raft_core_.isLeader() && raft_core_.areAllLogEntriesCommitted();
00090     }
00091 
00092     virtual void handleAllocationRequest(const UniqueID& unique_id, const NodeID preferred_node_id)
00093     {
00094         /*
00095          * Note that it is possible that the local node is not leader. We will still perform the log search
00096          * and try to find the node that requested allocation. If the node is found, response will be sent;
00097          * otherwise the request will be ignored because only leader can add new allocations.
00098          */
00099         const LazyConstructor<RaftCore::LogEntryInfo> result =
00100             raft_core_.traverseLogFromEndUntil(UniqueIDLogPredicate(unique_id));
00101 
00102          if (result.isConstructed())
00103          {
00104              if (result->committed)
00105              {
00106                  tryPublishAllocationResult(result->entry);
00107                  UAVCAN_TRACE("dynamic_node_id_server::distributed::Server",
00108                               "Allocation request served with existing allocation; node ID %d",
00109                               int(result->entry.node_id));
00110              }
00111              else
00112              {
00113                  UAVCAN_TRACE("dynamic_node_id_server::distributed::Server",
00114                               "Allocation request ignored - allocation exists but not committed yet; node ID %d",
00115                               int(result->entry.node_id));
00116              }
00117          }
00118          else
00119          {
00120              if (raft_core_.isLeader() && !node_discoverer_.hasUnknownNodes())
00121              {
00122                  allocateNewNode(unique_id, preferred_node_id);
00123              }
00124          }
00125     }
00126 
00127     /*
00128      * Methods of INodeDiscoveryHandler
00129      */
00130     virtual bool canDiscoverNewNodes() const
00131     {
00132         return raft_core_.isLeader();
00133     }
00134 
00135     virtual NodeAwareness checkNodeAwareness(NodeID node_id) const
00136     {
00137         const LazyConstructor<RaftCore::LogEntryInfo> result =
00138             raft_core_.traverseLogFromEndUntil(NodeIDLogPredicate(node_id));
00139         if (result.isConstructed())
00140         {
00141             return result->committed ? NodeAwarenessKnownAndCommitted : NodeAwarenessKnownButNotCommitted;
00142         }
00143         else
00144         {
00145             return NodeAwarenessUnknown;
00146         }
00147     }
00148 
00149     virtual void handleNewNodeDiscovery(const UniqueID* unique_id_or_null, NodeID node_id)
00150     {
00151         if (raft_core_.traverseLogFromEndUntil(NodeIDLogPredicate(node_id)).isConstructed())
00152         {
00153             UAVCAN_ASSERT(0);   // Such node is already known, the class that called this method should have known that
00154             return;
00155         }
00156 
00157         const UniqueID uid = (unique_id_or_null == UAVCAN_NULLPTR) ? UniqueID() : *unique_id_or_null;
00158 
00159         if (raft_core_.isLeader())
00160         {
00161             raft_core_.appendLog(uid, node_id);
00162         }
00163     }
00164 
00165     /*
00166      * Methods of IRaftLeaderMonitor
00167      */
00168     virtual void handleLogCommitOnLeader(const protocol::dynamic_node_id::server::Entry& entry)
00169     {
00170         /*
00171          * Maybe this node did not request allocation at all, we don't care, we publish anyway.
00172          */
00173         tryPublishAllocationResult(entry);
00174     }
00175 
00176     virtual void handleLocalLeadershipChange(bool local_node_is_leader)
00177     {
00178         if (!local_node_is_leader)
00179         {
00180             return;
00181         }
00182 
00183         const LazyConstructor<RaftCore::LogEntryInfo> result =
00184             raft_core_.traverseLogFromEndUntil(NodeIDLogPredicate(node_.getNodeID()));
00185 
00186         if (!result.isConstructed())
00187         {
00188             raft_core_.appendLog(getOwnUniqueID(), node_.getNodeID());
00189         }
00190     }
00191 
00192     /*
00193      * Private methods
00194      */
00195     bool isNodeIDTaken(const NodeID node_id) const
00196     {
00197         UAVCAN_TRACE("dynamic_node_id_server::distributed::Server",
00198                      "Testing if node ID %d is taken", int(node_id.get()));
00199         return raft_core_.traverseLogFromEndUntil(NodeIDLogPredicate(node_id));
00200     }
00201 
00202     void allocateNewNode(const UniqueID& unique_id, const NodeID preferred_node_id)
00203     {
00204         const NodeID allocated_node_id =
00205             NodeIDSelector<Server>(this, &Server::isNodeIDTaken).findFreeNodeID(preferred_node_id);
00206         if (!allocated_node_id.isUnicast())
00207         {
00208             UAVCAN_TRACE("dynamic_node_id_server::distributed::Server", "Request ignored - no free node ID left");
00209             return;
00210         }
00211 
00212         UAVCAN_TRACE("dynamic_node_id_server::distributed::Server", "New node ID allocated: %d",
00213                      int(allocated_node_id.get()));
00214         raft_core_.appendLog(unique_id, allocated_node_id);
00215     }
00216 
00217     void tryPublishAllocationResult(const protocol::dynamic_node_id::server::Entry& entry)
00218     {
00219         const int res = allocation_request_manager_.broadcastAllocationResponse(entry.unique_id, entry.node_id);
00220         if (res < 0)
00221         {
00222             tracer_.onEvent(TraceError, res);
00223             node_.registerInternalFailure("Dynamic allocation response");
00224         }
00225     }
00226 
00227 public:
00228     Server(INode& node,
00229            IStorageBackend& storage,
00230            IEventTracer& tracer)
00231         : AbstractServer(node, tracer)
00232         , raft_core_(node, storage, tracer, *this)
00233     { }
00234 
00235     int init(const UniqueID& own_unique_id,
00236              const uint8_t cluster_size = ClusterManager::ClusterSizeUnknown,
00237              const TransferPriority priority = TransferPriority::OneHigherThanLowest)
00238     {
00239         /*
00240          * Initializing Raft core first, because the next step requires Log to be loaded
00241          */
00242         int res = raft_core_.init(cluster_size, priority);
00243         if (res < 0)
00244         {
00245             return res;
00246         }
00247 
00248         /*
00249          * Common logic
00250          */
00251         res = AbstractServer::init(own_unique_id, priority);
00252         if (res < 0)
00253         {
00254             return res;
00255         }
00256 
00257         /*
00258          * Making sure that the server is started with the same node ID
00259          */
00260         const LazyConstructor<RaftCore::LogEntryInfo> own_log_entry =
00261             raft_core_.traverseLogFromEndUntil(NodeIDLogPredicate(node_.getNodeID()));
00262 
00263         if (own_log_entry.isConstructed())
00264         {
00265             if (own_log_entry->entry.unique_id != getOwnUniqueID())
00266             {
00267                 return -ErrInvalidConfiguration;
00268             }
00269         }
00270 
00271         return 0;
00272     }
00273 
00274     Log::Index getNumAllocations() const { return raft_core_.getNumAllocations(); }
00275 
00276     /**
00277      * These accessors are needed for debugging, visualization and testing.
00278      */
00279     const RaftCore& getRaftCore() const { return raft_core_; }
00280 };
00281 
00282 /**
00283  * This structure represents immediate state of the server.
00284  * It can be used for state visualization and debugging.
00285  */
00286 struct StateReport
00287 {
00288     uint8_t cluster_size;
00289 
00290     RaftCore::ServerState state;
00291 
00292     Log::Index last_log_index;
00293     Log::Index commit_index;
00294 
00295     Term last_log_term;
00296     Term current_term;
00297 
00298     NodeID voted_for;
00299 
00300     MonotonicTime last_activity_timestamp;
00301     MonotonicDuration randomized_timeout;
00302 
00303     uint8_t num_unknown_nodes;
00304 
00305     struct FollowerState
00306     {
00307         NodeID node_id;
00308         Log::Index next_index;
00309         Log::Index match_index;
00310 
00311         FollowerState()
00312             : next_index(0)
00313             , match_index(0)
00314         { }
00315     } followers[ClusterManager::MaxClusterSize - 1];
00316 
00317     StateReport(const Server& s)
00318         : cluster_size           (s.getRaftCore().getClusterManager().getClusterSize())
00319         , state                  (s.getRaftCore().getServerState())
00320         , last_log_index         (s.getRaftCore().getPersistentState().getLog().getLastIndex())
00321         , commit_index           (s.getRaftCore().getCommitIndex())
00322         , last_log_term          (0)    // See below
00323         , current_term           (s.getRaftCore().getPersistentState().getCurrentTerm())
00324         , voted_for              (s.getRaftCore().getPersistentState().getVotedFor())
00325         , last_activity_timestamp(s.getRaftCore().getLastActivityTimestamp())
00326         , randomized_timeout     (s.getRaftCore().getRandomizedTimeout())
00327         , num_unknown_nodes      (s.getNodeDiscoverer().getNumUnknownNodes())
00328     {
00329         const Entry* const e = s.getRaftCore().getPersistentState().getLog().getEntryAtIndex(last_log_index);
00330         UAVCAN_ASSERT(e != UAVCAN_NULLPTR);
00331         if (e != UAVCAN_NULLPTR)
00332         {
00333             last_log_term = e->term;
00334         }
00335 
00336         for (uint8_t i = 0; i < (cluster_size - 1U); i++)
00337         {
00338             const ClusterManager& mgr = s.getRaftCore().getClusterManager();
00339             const NodeID node_id = mgr.getRemoteServerNodeIDAtIndex(i);
00340             if (node_id.isUnicast())
00341             {
00342                 followers[i].node_id     = node_id;
00343                 followers[i].next_index  = mgr.getServerNextIndex(node_id);
00344                 followers[i].match_index = mgr.getServerMatchIndex(node_id);
00345             }
00346         }
00347     }
00348 };
00349 
00350 }
00351 }
00352 }
00353 
00354 #endif // Include guard