libuav original
Dependents: UAVCAN UAVCAN_Subscriber
log.hpp
00001 /* 00002 * Copyright (C) 2015 Pavel Kirienko <pavel.kirienko@gmail.com> 00003 */ 00004 00005 #ifndef UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_LOG_HPP_INCLUDED 00006 #define UAVCAN_PROTOCOL_DYNAMIC_NODE_ID_SERVER_DISTRIBUTED_LOG_HPP_INCLUDED 00007 00008 #include <uavcan/build_config.hpp> 00009 #include <uavcan/debug.hpp> 00010 #include <uavcan/protocol/dynamic_node_id_server/distributed/types.hpp> 00011 #include <uavcan/protocol/dynamic_node_id_server/storage_marshaller.hpp> 00012 #include <uavcan/protocol/dynamic_node_id_server/event.hpp> 00013 00014 namespace uavcan 00015 { 00016 namespace dynamic_node_id_server 00017 { 00018 namespace distributed 00019 { 00020 /** 00021 * Raft log. 00022 * This class transparently replicates its state to the storage backend, keeping the most recent state in memory. 00023 * Writes are slow, reads are instantaneous. 00024 */ 00025 class Log 00026 { 00027 public: 00028 typedef uint8_t Index; 00029 00030 enum { Capacity = NodeID::Max + 1 }; 00031 00032 private: 00033 IStorageBackend& storage_; 00034 IEventTracer& tracer_; 00035 Entry entries_[Capacity]; 00036 Index last_index_; // Index zero always contains an empty entry 00037 00038 static IStorageBackend::String getLastIndexKey() { return "log_last_index"; } 00039 00040 static IStorageBackend::String makeEntryKey(Index index, const char* postfix) 00041 { 00042 IStorageBackend::String str; 00043 // "log0_foobar" 00044 str += "log"; 00045 str.appendFormatted("%d", int(index)); 00046 str += "_"; 00047 str += postfix; 00048 return str; 00049 } 00050 00051 int readEntryFromStorage(Index index, Entry& out_entry) 00052 { 00053 const StorageMarshaller io(storage_); 00054 00055 // Term 00056 if (io.get(makeEntryKey(index, "term"), out_entry.term) < 0) 00057 { 00058 return -ErrFailure; 00059 } 00060 00061 // Unique ID 00062 if (io.get(makeEntryKey(index, "unique_id"), out_entry.unique_id) < 0) 00063 { 00064 return -ErrFailure; 00065 } 00066 00067 // Node ID 00068 uint32_t node_id = 0; 00069 if (io.get(makeEntryKey(index, "node_id"), node_id) < 0) 00070 { 00071 return -ErrFailure; 00072 } 00073 if (node_id > NodeID::Max) 00074 { 00075 return -ErrFailure; 00076 } 00077 out_entry.node_id = static_cast<uint8_t>(node_id); 00078 00079 return 0; 00080 } 00081 00082 int writeEntryToStorage(Index index, const Entry& entry) 00083 { 00084 Entry temp = entry; 00085 00086 StorageMarshaller io(storage_); 00087 00088 // Term 00089 if (io.setAndGetBack(makeEntryKey(index, "term"), temp.term) < 0) 00090 { 00091 return -ErrFailure; 00092 } 00093 00094 // Unique ID 00095 if (io.setAndGetBack(makeEntryKey(index, "unique_id"), temp.unique_id) < 0) 00096 { 00097 return -ErrFailure; 00098 } 00099 00100 // Node ID 00101 uint32_t node_id = entry.node_id; 00102 if (io.setAndGetBack(makeEntryKey(index, "node_id"), node_id) < 0) 00103 { 00104 return -ErrFailure; 00105 } 00106 temp.node_id = static_cast<uint8_t>(node_id); 00107 00108 return (temp == entry) ? 0 : -ErrFailure; 00109 } 00110 00111 int initEmptyLogStorage() 00112 { 00113 StorageMarshaller io(storage_); 00114 00115 /* 00116 * Writing the zero entry - it must always be default-initialized 00117 */ 00118 entries_[0] = Entry(); 00119 int res = writeEntryToStorage(0, entries_[0]); 00120 if (res < 0) 00121 { 00122 return res; 00123 } 00124 00125 /* 00126 * Initializing last index 00127 * Last index must be written AFTER the zero entry, otherwise if the write fails here the storage will be 00128 * left in an inconsistent state. 00129 */ 00130 last_index_ = 0; 00131 uint32_t stored_index = 0; 00132 res = io.setAndGetBack(getLastIndexKey(), stored_index); 00133 if (res < 0) 00134 { 00135 return res; 00136 } 00137 if (stored_index != 0) 00138 { 00139 return -ErrFailure; 00140 } 00141 00142 return 0; 00143 } 00144 00145 public: 00146 Log(IStorageBackend& storage, IEventTracer& tracer) 00147 : storage_(storage) 00148 , tracer_(tracer) 00149 , last_index_(0) 00150 { } 00151 00152 int init() 00153 { 00154 StorageMarshaller io(storage_); 00155 00156 // Reading max index 00157 { 00158 uint32_t value = 0; 00159 if (io.get(getLastIndexKey(), value) < 0) 00160 { 00161 if (storage_.get(getLastIndexKey()).empty()) 00162 { 00163 UAVCAN_TRACE("dynamic_node_id_server::distributed::Log", "Initializing empty storage"); 00164 return initEmptyLogStorage(); 00165 } 00166 else 00167 { 00168 // There's some data in the storage, but it cannot be parsed - reporting an error 00169 UAVCAN_TRACE("dynamic_node_id_server::distributed::Log", "Failed to read last index"); 00170 return -ErrFailure; 00171 } 00172 } 00173 if (value >= Capacity) 00174 { 00175 return -ErrFailure; 00176 } 00177 last_index_ = Index(value); 00178 } 00179 00180 tracer_.onEvent(TraceRaftLogLastIndexRestored, last_index_); 00181 00182 // Restoring log entries - note that index 0 always exists 00183 for (Index index = 0; index <= last_index_; index++) 00184 { 00185 const int result = readEntryFromStorage(index, entries_[index]); 00186 if (result < 0) 00187 { 00188 UAVCAN_TRACE("dynamic_node_id_server::distributed::Log", "Failed to read entry at index %u: %d", 00189 unsigned(index), result); 00190 return result; 00191 } 00192 } 00193 00194 UAVCAN_TRACE("dynamic_node_id_server::distributed::Log", "Restored %u log entries", unsigned(last_index_)); 00195 return 0; 00196 } 00197 00198 /** 00199 * This method invokes storage IO. 00200 * Returned value indicates whether the entry was successfully appended. 00201 */ 00202 int append(const Entry& entry) 00203 { 00204 if ((last_index_ + 1) >= Capacity) 00205 { 00206 return -ErrLogic; 00207 } 00208 00209 tracer_.onEvent(TraceRaftLogAppend, last_index_ + 1U); 00210 00211 // If next operations fail, we'll get a dangling entry, but it's absolutely OK. 00212 int res = writeEntryToStorage(Index(last_index_ + 1), entry); 00213 if (res < 0) 00214 { 00215 return res; 00216 } 00217 00218 // Updating the last index 00219 StorageMarshaller io(storage_); 00220 uint32_t new_last_index = last_index_ + 1U; 00221 res = io.setAndGetBack(getLastIndexKey(), new_last_index); 00222 if (res < 0) 00223 { 00224 return res; 00225 } 00226 if (new_last_index != last_index_ + 1U) 00227 { 00228 return -ErrFailure; 00229 } 00230 entries_[new_last_index] = entry; 00231 last_index_ = Index(new_last_index); 00232 00233 UAVCAN_TRACE("dynamic_node_id_server::distributed::Log", "New entry, index %u, node ID %u, term %u", 00234 unsigned(last_index_), unsigned(entry.node_id), unsigned(entry.term)); 00235 return 0; 00236 } 00237 00238 /** 00239 * This method invokes storage IO. 00240 * Returned value indicates whether the requested operation has been carried out successfully. 00241 */ 00242 int removeEntriesWhereIndexGreaterOrEqual(Index index) 00243 { 00244 UAVCAN_ASSERT(last_index_ < Capacity); 00245 00246 if (((index) >= Capacity) || (index <= 0)) 00247 { 00248 return -ErrLogic; 00249 } 00250 00251 uint32_t new_last_index = index - 1U; 00252 00253 tracer_.onEvent(TraceRaftLogRemove, new_last_index); 00254 00255 if (new_last_index != last_index_) 00256 { 00257 StorageMarshaller io(storage_); 00258 int res = io.setAndGetBack(getLastIndexKey(), new_last_index); 00259 if (res < 0) 00260 { 00261 return res; 00262 } 00263 if (new_last_index != index - 1U) 00264 { 00265 return -ErrFailure; 00266 } 00267 UAVCAN_TRACE("dynamic_node_id_server::distributed::Log", "Entries removed, last index %u --> %u", 00268 unsigned(last_index_), unsigned(new_last_index)); 00269 last_index_ = Index(new_last_index); 00270 } 00271 00272 // Removal operation leaves dangling entries in storage, it's OK 00273 return 0; 00274 } 00275 00276 int removeEntriesWhereIndexGreater(Index index) 00277 { 00278 return removeEntriesWhereIndexGreaterOrEqual(Index(index + 1U)); 00279 } 00280 00281 /** 00282 * Returns nullptr if there's no such index. 00283 * This method does not use storage IO. 00284 */ 00285 const Entry* getEntryAtIndex(Index index) const 00286 { 00287 UAVCAN_ASSERT(last_index_ < Capacity); 00288 return (index <= last_index_) ? &entries_[index] : UAVCAN_NULLPTR; 00289 } 00290 00291 Index getLastIndex() const { return last_index_; } 00292 00293 bool isOtherLogUpToDate(Index other_last_index, Term other_last_term) const 00294 { 00295 UAVCAN_ASSERT(last_index_ < Capacity); 00296 // Terms are different - the one with higher term is more up-to-date 00297 if (other_last_term != entries_[last_index_].term) 00298 { 00299 return other_last_term > entries_[last_index_].term; 00300 } 00301 // Terms are equal - longer log wins 00302 return other_last_index >= last_index_; 00303 } 00304 }; 00305 00306 } 00307 } 00308 } 00309 00310 #endif // Include guard
Generated on Tue Jul 12 2022 17:17:33 by 1.7.2