S Morita / mbed-mros2

Dependents:   mbed-os-example-mros2 example-mbed-mros2-sub-pose example-mbed-mros2-pub-twist example-mbed-mros2-mturtle-teleop

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers mros2.cpp Source File

mros2.cpp

00001 #include "mros2.h"
00002 #include "mros2_user_config.h"
00003 
00004 #include <rtps/rtps.h>
00005 
00006 #ifdef __MBED__
00007 #include "mbed.h"
00008 #else  /* __MBED__ */
00009 #include "cmsis_os.h"
00010 #endif /* __MBED__ */
00011 
00012 
00013 namespace mros2
00014 {
00015 
00016 rtps::Domain *domain_ptr = NULL;
00017 rtps::Participant *part_ptr = NULL; //TODO: detele this
00018 rtps::Writer *pub_ptr = NULL;
00019 
00020 #define SUB_MSG_SIZE    4   // addr size
00021 osMessageQueueId_t subscriber_msg_queue_id;
00022 
00023 bool completeNodeInit = false;
00024 uint8_t endpointId = 0;
00025 uint32_t subCbArray[10];
00026 
00027 uint8_t buf[100];
00028 uint8_t buf_index = 4;
00029 
00030 /* Callback function to set the boolean to true upon a match */
00031 void setTrue(void* args)
00032 {
00033   *static_cast<volatile bool*>(args) = true;
00034 }
00035 
00036 bool subMatched = false;
00037 bool pubMatched = false;
00038 
00039 void pubMatch(void* args)
00040 {
00041   MROS2_DEBUG("[MROS2LIB] publisher matched with remote subscriber");
00042 }
00043 
00044 void subMatch(void* args)
00045 {
00046   MROS2_DEBUG("[MROS2LIB] subscriber matched with remote publisher");
00047 }
00048 
00049 
00050 /*
00051  *  Initialization of mROS 2 environment
00052  */
00053 #ifdef __MBED__
00054 Thread *mros2_init_thread;
00055 #endif /* __MBED__ */
00056 void init(int argc, char * argv[])
00057 {
00058   buf[0] = 0;
00059   buf[1] = 1;
00060   buf[2] = 0;
00061   buf[3] = 0;
00062 
00063 #ifdef __MBED__
00064   mros2_init_thread =  new Thread(osPriorityAboveNormal, 2000, nullptr, "mROS2Thread");
00065   mros2_init_thread->start(callback(mros2_init, (void *)NULL));
00066 #else /* __MBED__ */
00067   osThreadAttr_t attributes;
00068 
00069   attributes.name = "mROS2Thread",
00070   attributes.stack_size = 1000,
00071   attributes.priority = (osPriority_t)24,
00072 
00073   osThreadNew(mros2_init, NULL, (const osThreadAttr_t*)&attributes);
00074 #endif /* __MBED__ */
00075 
00076 }
00077 
00078 void mros2_init(void *args)
00079 {
00080   osStatus_t ret;
00081 
00082   MROS2_DEBUG("[MROS2LIB] mros2_init task start");
00083 
00084 #ifndef __MBED__
00085   MX_LWIP_Init();
00086   MROS2_DEBUG("[MROS2LIB] Initilizing lwIP complete");
00087 #endif /* __MBED__ */
00088 
00089   int sub_msg_count;
00090   static rtps::Domain domain;
00091   domain_ptr = &domain;
00092 
00093 #ifndef __MBED__
00094   sub_msg_count = mros2_get_submsg_count();
00095   subscriber_msg_queue_id = osMessageQueueNew(sub_msg_count, SUB_MSG_SIZE, NULL);
00096   if (subscriber_msg_queue_id == NULL) {
00097     MROS2_ERROR("[MROS2LIB] ERROR: mROS2 init failed");
00098     return;
00099   }
00100 #endif /* __MBED__ */
00101 
00102   /* wait until participant(node) is created */
00103   while(!completeNodeInit) {
00104     osDelay(100);
00105   }
00106   domain.completeInit();
00107   MROS2_DEBUG("[MROS2LIB] Initilizing Domain complete");
00108 
00109   while(!subMatched && !pubMatched) {
00110     osDelay(1000);
00111   }
00112 
00113   MROS2_DEBUG("[MROS2LIB] mros2_init task end");
00114 
00115   ret = osThreadTerminate(NULL);
00116   if (ret != osOK) {
00117     MROS2_ERROR("[MROS2LIB] ERROR: mros2_init() task terminate error %d", ret);
00118   }
00119 }
00120 
00121 
00122 /*
00123  *  Node functions
00124  */
00125 Node Node::create_node(std::string node_name)
00126 {
00127   MROS2_DEBUG("[MROS2LIB] create_node");
00128   MROS2_DEBUG("[MROS2LIB] start creating participant");
00129 
00130   while(domain_ptr == NULL) {
00131     osDelay(100);
00132   }
00133 
00134   Node node;
00135   node.part = domain_ptr->createParticipant();
00136   /* TODO: utilize node name */
00137   node.node_name = node_name;
00138   part_ptr = node.part;
00139   if(node.part == nullptr) {
00140     MROS2_ERROR("[MROS2LIB] ERROR: create_node() failed");
00141     while(true) {}
00142   }
00143   completeNodeInit = true;
00144 
00145   MROS2_DEBUG("[MROS2LIB] successfully created participant");
00146   return node;
00147 }
00148 
00149 
00150 /*
00151  *  Publisher functions
00152  */
00153 template <class T>
00154 Publisher Node::create_publisher(std::string topic_name, int qos)
00155 {
00156   rtps::Writer* writer = domain_ptr->createWriter(*part_ptr, ("rt/"+topic_name).c_str(), message_traits::TypeName<T*>().value(), false);
00157   if(writer == nullptr) {
00158     MROS2_ERROR("[MROS2LIB] ERROR: failed to create writer in create_publisher()");
00159     while(true) {}
00160   }
00161 
00162   Publisher pub;
00163   pub_ptr = writer;
00164   pub.topic_name = topic_name;
00165 
00166   /* Register callback to ensure that a publisher is matched to the writer before sending messages */
00167   part_ptr->registerOnNewSubscriberMatchedCallback(pubMatch, &subMatched);
00168 
00169   MROS2_DEBUG("[MROS2LIB] create_publisher complete.");
00170   return pub;
00171 }
00172 
00173 template <class T>
00174 void Publisher::publish(T& msg)
00175 {
00176   msg.copyToBuf(&buf[4]);
00177   msg.memAlign(&buf[4]);
00178   pub_ptr->newChange(rtps::ChangeKind_t::ALIVE, buf, msg.getTotalSize() + 4);
00179 }
00180 
00181 
00182 /*
00183  *  Subscriber functions
00184  */
00185 typedef struct {
00186   void (*cb_fp)(intptr_t);
00187   intptr_t argp;
00188 } SubscribeDataType;
00189 
00190 template <class T>
00191 Subscriber Node::create_subscription(std::string topic_name, int qos, void(*fp)(T*))
00192 {
00193   rtps::Reader* reader = domain_ptr->createReader(*(this->part), ("rt/"+topic_name).c_str(), message_traits::TypeName<T*>().value(), false);
00194   if(reader == nullptr) {
00195     MROS2_ERROR("[MROS2LIB] ERROR: failed to create reader in create_subscription()");
00196     while(true) {}
00197   }
00198 
00199   Subscriber sub;
00200   sub.topic_name = topic_name;
00201   sub.cb_fp = (void (*)(intptr_t))fp;
00202 
00203   SubscribeDataType *data_p;
00204   data_p = new SubscribeDataType;
00205   data_p->cb_fp = (void (*)(intptr_t))fp;
00206   data_p->argp = (intptr_t)NULL;
00207   reader->registerCallback(sub.callback_handler<T>, (void *)data_p);
00208 
00209   /* Register callback to ensure that a subscriber is matched to the reader before receiving messages */
00210   part_ptr->registerOnNewPublisherMatchedCallback(subMatch, &pubMatched);
00211 
00212   MROS2_DEBUG("[MROS2LIB] create_subscription complete. data memory address=0x%x", data_p);
00213   return sub;
00214 }
00215 
00216 template <class T>
00217 void Subscriber::callback_handler(void *callee, const rtps::ReaderCacheChange &cacheChange)
00218 {
00219   T msg;
00220   msg.copyFromBuf(&cacheChange.data[4]);
00221 
00222   SubscribeDataType *sub = (SubscribeDataType *)callee;
00223   void (*fp)(intptr_t) = sub->cb_fp;
00224   fp((intptr_t)&msg);
00225 }
00226 
00227 
00228 /*
00229  *  Other utility functions
00230  */
00231 void spin()
00232 {
00233   while(true) {
00234 #ifndef __MBED__
00235     osStatus_t ret;
00236     SubscribeDataType* msg;
00237     ret = osMessageQueueGet(subscriber_msg_queue_id, &msg, NULL, osWaitForever);
00238     if (ret != osOK) {
00239       MROS2_ERROR("[MROS2LIB] ERROR: mROS2 spin() wait error %d", ret);
00240     }
00241 #else /* __MBED__ */
00242     // The queue above seems not to be pushed anywhere. So just sleep.
00243     ThisThread::sleep_for(1000);
00244 #endif /* __MBED__ */
00245   }
00246 }
00247 
00248 }  /* namespace mros2 */
00249 
00250 
00251 /*
00252  *  Declaration for embeddedRTPS participants
00253  */
00254 void *networkSubDriverPtr;
00255 void *networkPubDriverPtr;
00256 void (*hbPubFuncPtr)(void *);
00257 void (*hbSubFuncPtr)(void *);
00258 
00259 extern "C" void callHbPubFunc(void *arg)
00260 {
00261   if(hbPubFuncPtr != NULL && networkPubDriverPtr != NULL) {
00262     (*hbPubFuncPtr)(networkPubDriverPtr);
00263   }
00264 }
00265 extern "C" void callHbSubFunc(void *arg)
00266 {
00267   if(hbSubFuncPtr != NULL && networkSubDriverPtr != NULL) {
00268     (*hbSubFuncPtr)(networkSubDriverPtr);
00269   }
00270 }
00271 
00272 void setTrue(void* args)
00273 {
00274   *static_cast<volatile bool*>(args) = true;
00275 }
00276 
00277 /*
00278  * specialize template functions described in platform's workspace
00279  */
00280 #include "templates.hpp"