libuav original

Dependents:   UAVCAN UAVCAN_Subscriber

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers log.hpp Source File

log.hpp

00001 /*
00002  * Copyright (C) 2015 Pavel Kirienko <pavel.kirienko@gmail.com>
00003  */
00004 
00005 #ifndef UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_LOG_HPP_INCLUDED
00006 #define UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_LOG_HPP_INCLUDED
00007 
00008 #include <uavcan/build_config.hpp>
00009 #include <uavcan/debug.hpp>
00010 #include <uavcan/protocol/dynamic_node_id_server/distributed/types.hpp>
00011 #include <uavcan/protocol/dynamic_node_id_server/storage_marshaller.hpp>
00012 #include <uavcan/protocol/dynamic_node_id_server/event.hpp>
00013 
00014 namespace uavcan
00015 {
00016 namespace dynamic_node_id_server
00017 {
00018 namespace distributed
00019 {
00020 /**
00021  * Raft log.
00022  * This class transparently replicates its state to the storage backend, keeping the most recent state in memory.
00023  * Writes are slow, reads are instantaneous.
00024  */
00025 class Log
00026 {
00027 public:
00028     typedef uint8_t Index;
00029 
00030     enum { Capacity = NodeID::Max + 1 };
00031 
00032 private:
00033     IStorageBackend& storage_;
00034     IEventTracer& tracer_;
00035     Entry entries_[Capacity];
00036     Index last_index_;             // Index zero always contains an empty entry
00037 
00038     static IStorageBackend::String getLastIndexKey() { return "log_last_index"; }
00039 
00040     static IStorageBackend::String makeEntryKey(Index index, const char* postfix)
00041     {
00042         IStorageBackend::String str;
00043         // "log0_foobar"
00044         str += "log";
00045         str.appendFormatted("%d", int(index));
00046         str += "_";
00047         str += postfix;
00048         return str;
00049     }
00050 
00051     int readEntryFromStorage(Index index, Entry& out_entry)
00052     {
00053         const StorageMarshaller io(storage_);
00054 
00055         // Term
00056         if (io.get(makeEntryKey(index, "term"), out_entry.term) < 0)
00057         {
00058             return -ErrFailure;
00059         }
00060 
00061         // Unique ID
00062         if (io.get(makeEntryKey(index, "unique_id"), out_entry.unique_id) < 0)
00063         {
00064             return -ErrFailure;
00065         }
00066 
00067         // Node ID
00068         uint32_t node_id = 0;
00069         if (io.get(makeEntryKey(index, "node_id"), node_id) < 0)
00070         {
00071             return -ErrFailure;
00072         }
00073         if (node_id > NodeID::Max)
00074         {
00075             return -ErrFailure;
00076         }
00077         out_entry.node_id = static_cast<uint8_t>(node_id);
00078 
00079         return 0;
00080     }
00081 
00082     int writeEntryToStorage(Index index, const Entry& entry)
00083     {
00084         Entry temp = entry;
00085 
00086         StorageMarshaller io(storage_);
00087 
00088         // Term
00089         if (io.setAndGetBack(makeEntryKey(index, "term"), temp.term) < 0)
00090         {
00091             return -ErrFailure;
00092         }
00093 
00094         // Unique ID
00095         if (io.setAndGetBack(makeEntryKey(index, "unique_id"), temp.unique_id) < 0)
00096         {
00097             return -ErrFailure;
00098         }
00099 
00100         // Node ID
00101         uint32_t node_id = entry.node_id;
00102         if (io.setAndGetBack(makeEntryKey(index, "node_id"), node_id) < 0)
00103         {
00104             return -ErrFailure;
00105         }
00106         temp.node_id = static_cast<uint8_t>(node_id);
00107 
00108         return (temp == entry) ? 0 : -ErrFailure;
00109     }
00110 
00111     int initEmptyLogStorage()
00112     {
00113         StorageMarshaller io(storage_);
00114 
00115         /*
00116          * Writing the zero entry - it must always be default-initialized
00117          */
00118         entries_[0] = Entry();
00119         int res = writeEntryToStorage(0, entries_[0]);
00120         if (res < 0)
00121         {
00122             return res;
00123         }
00124 
00125         /*
00126          * Initializing last index
00127          * Last index must be written AFTER the zero entry, otherwise if the write fails here the storage will be
00128          * left in an inconsistent state.
00129          */
00130         last_index_ = 0;
00131         uint32_t stored_index = 0;
00132         res = io.setAndGetBack(getLastIndexKey(), stored_index);
00133         if (res < 0)
00134         {
00135             return res;
00136         }
00137         if (stored_index != 0)
00138         {
00139             return -ErrFailure;
00140         }
00141 
00142         return 0;
00143     }
00144 
00145 public:
00146     Log(IStorageBackend& storage, IEventTracer& tracer)
00147         : storage_(storage)
00148         , tracer_(tracer)
00149         , last_index_(0)
00150     { }
00151 
00152     int init()
00153     {
00154         StorageMarshaller io(storage_);
00155 
00156         // Reading max index
00157         {
00158             uint32_t value = 0;
00159             if (io.get(getLastIndexKey(), value) < 0)
00160             {
00161                 if (storage_.get(getLastIndexKey()).empty())
00162                 {
00163                     UAVCAN_TRACE("dynamic_node_id_server::distributed::Log", "Initializing empty storage");
00164                     return initEmptyLogStorage();
00165                 }
00166                 else
00167                 {
00168                     // There's some data in the storage, but it cannot be parsed - reporting an error
00169                     UAVCAN_TRACE("dynamic_node_id_server::distributed::Log", "Failed to read last index");
00170                     return -ErrFailure;
00171                 }
00172             }
00173             if (value >= Capacity)
00174             {
00175                 return -ErrFailure;
00176             }
00177             last_index_ = Index(value);
00178         }
00179 
00180         tracer_.onEvent(TraceRaftLogLastIndexRestored, last_index_);
00181 
00182         // Restoring log entries - note that index 0 always exists
00183         for (Index index = 0; index <= last_index_; index++)
00184         {
00185             const int result = readEntryFromStorage(index, entries_[index]);
00186             if (result < 0)
00187             {
00188                 UAVCAN_TRACE("dynamic_node_id_server::distributed::Log", "Failed to read entry at index %u: %d",
00189                              unsigned(index), result);
00190                 return result;
00191             }
00192         }
00193 
00194         UAVCAN_TRACE("dynamic_node_id_server::distributed::Log", "Restored %u log entries", unsigned(last_index_));
00195         return 0;
00196     }
00197 
00198     /**
00199      * This method invokes storage IO.
00200      * Returned value indicates whether the entry was successfully appended.
00201      */
00202     int append(const Entry& entry)
00203     {
00204         if ((last_index_ + 1) >= Capacity)
00205         {
00206             return -ErrLogic;
00207         }
00208 
00209         tracer_.onEvent(TraceRaftLogAppend, last_index_ + 1U);
00210 
00211         // If next operations fail, we'll get a dangling entry, but it's absolutely OK.
00212         int res = writeEntryToStorage(Index(last_index_ + 1), entry);
00213         if (res < 0)
00214         {
00215             return res;
00216         }
00217 
00218         // Updating the last index
00219         StorageMarshaller io(storage_);
00220         uint32_t new_last_index = last_index_ + 1U;
00221         res = io.setAndGetBack(getLastIndexKey(), new_last_index);
00222         if (res < 0)
00223         {
00224             return res;
00225         }
00226         if (new_last_index != last_index_ + 1U)
00227         {
00228             return -ErrFailure;
00229         }
00230         entries_[new_last_index] = entry;
00231         last_index_ = Index(new_last_index);
00232 
00233         UAVCAN_TRACE("dynamic_node_id_server::distributed::Log", "New entry, index %u, node ID %u, term %u",
00234                      unsigned(last_index_), unsigned(entry.node_id), unsigned(entry.term));
00235         return 0;
00236     }
00237 
00238     /**
00239      * This method invokes storage IO.
00240      * Returned value indicates whether the requested operation has been carried out successfully.
00241      */
00242     int removeEntriesWhereIndexGreaterOrEqual(Index index)
00243     {
00244         UAVCAN_ASSERT(last_index_ < Capacity);
00245 
00246         if (((index) >= Capacity) || (index <= 0))
00247         {
00248             return -ErrLogic;
00249         }
00250 
00251         uint32_t new_last_index = index - 1U;
00252 
00253         tracer_.onEvent(TraceRaftLogRemove, new_last_index);
00254 
00255         if (new_last_index != last_index_)
00256         {
00257             StorageMarshaller io(storage_);
00258             int res = io.setAndGetBack(getLastIndexKey(), new_last_index);
00259             if (res < 0)
00260             {
00261                 return res;
00262             }
00263             if (new_last_index != index - 1U)
00264             {
00265                 return -ErrFailure;
00266             }
00267             UAVCAN_TRACE("dynamic_node_id_server::distributed::Log", "Entries removed, last index %u --> %u",
00268                          unsigned(last_index_), unsigned(new_last_index));
00269             last_index_ = Index(new_last_index);
00270         }
00271 
00272         // Removal operation leaves dangling entries in storage, it's OK
00273         return 0;
00274     }
00275 
00276     int removeEntriesWhereIndexGreater(Index index)
00277     {
00278         return removeEntriesWhereIndexGreaterOrEqual(Index(index + 1U));
00279     }
00280 
00281     /**
00282      * Returns nullptr if there's no such index.
00283      * This method does not use storage IO.
00284      */
00285     const Entry* getEntryAtIndex(Index index) const
00286     {
00287         UAVCAN_ASSERT(last_index_ < Capacity);
00288         return (index <= last_index_) ? &entries_[index] : UAVCAN_NULLPTR;
00289     }
00290 
00291     Index getLastIndex() const { return last_index_; }
00292 
00293     bool isOtherLogUpToDate(Index other_last_index, Term other_last_term) const
00294     {
00295         UAVCAN_ASSERT(last_index_ < Capacity);
00296         // Terms are different - the one with higher term is more up-to-date
00297         if (other_last_term != entries_[last_index_].term)
00298         {
00299             return other_last_term > entries_[last_index_].term;
00300         }
00301         // Terms are equal - longer log wins
00302         return other_last_index >= last_index_;
00303     }
00304 };
00305 
00306 }
00307 }
00308 }
00309 
00310 #endif // Include guard