Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Threading.cpp
00001 /************************************************************************************** 00002 * Copyright (c) 2016, Tomoaki Yamaguchi 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Tomoaki Yamaguchi - initial API and implementation and/or initial documentation 00015 **************************************************************************************/ 00016 00017 #include "MQTTSNGWProcess.h" 00018 #include "Threading.h" 00019 #include <stdlib.h> 00020 #include <sys/types.h> 00021 #include <sys/ipc.h> 00022 #include <sys/shm.h> 00023 #include <sys/stat.h> 00024 #include <semaphore.h> 00025 #include <fcntl.h> 00026 #include <string.h> 00027 #include <pthread.h> 00028 #include <unistd.h> 00029 00030 using namespace std; 00031 using namespace MQTTSNGW; 00032 00033 #if defined(OSX) 00034 int sem_timedwait(sem_type sem, const struct timespec *timeout) 00035 { 00036 int rc = -1; 00037 int64_t tout = timeout->tv_sec * 1000L + tv_nsec * 1000000L 00038 rc = (int)dispatch_semaphore_wait(sem, dispatch_time(DISPATCH_TIME_NOW, tout)); 00039 if (rc != 0) 00040 { 00041 rc = ETIMEDOUT; 00042 } 00043 return rc; 00044 } 00045 #endif 00046 00047 /*===================================== 00048 Class Mutex 00049 =====================================*/ 00050 00051 Mutex::Mutex(void) 00052 { 00053 pthread_mutexattr_t attr; 00054 pthread_mutexattr_init(&attr); 00055 pthread_mutex_init(&_mutex, &attr); 00056 pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK); 00057 _shmid = 0; 00058 _pmutex = 0; 00059 } 00060 00061 Mutex::Mutex(const char* fileName) 00062 { 00063 pthread_mutexattr_t attr; 00064 00065 key_t key = ftok(fileName, 1); 00066 00067 if ((_shmid = shmget(key, sizeof(pthread_mutex_t), IPC_CREAT | 0666)) < 0) 00068 { 00069 throw Exception( -1, "Mutex can't create a shared memory."); 00070 } 00071 _pmutex = (pthread_mutex_t*) shmat(_shmid, NULL, 0); 00072 if (_pmutex < 0) 00073 { 00074 throw Exception( -1, "Mutex can't attach shared memory."); 00075 } 00076 00077 pthread_mutexattr_init(&attr); 00078 00079 if (pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED) != 0) 00080 { 00081 throw Exception( -1, "Mutex can't set the process-shared flag"); 00082 } 00083 if (pthread_mutex_init(_pmutex, &attr) != 0) 00084 { 00085 throw Exception( -1, "Mutex can't initialize."); 00086 } 00087 } 00088 00089 Mutex::~Mutex(void) 00090 { 00091 if (_pmutex) 00092 { 00093 pthread_mutex_lock(_pmutex); 00094 pthread_mutex_unlock(_pmutex); 00095 pthread_mutex_destroy(_pmutex); 00096 } 00097 else 00098 { 00099 pthread_mutex_lock(&_mutex); 00100 pthread_mutex_unlock(&_mutex); 00101 pthread_mutex_destroy(&_mutex); 00102 } 00103 if (_shmid) 00104 { 00105 shmctl(_shmid, IPC_RMID, NULL); 00106 } 00107 } 00108 00109 void Mutex::lock(void) 00110 { 00111 if (_pmutex) 00112 { 00113 pthread_mutex_lock(_pmutex); 00114 } 00115 else 00116 { 00117 try 00118 { 00119 if (pthread_mutex_lock(&_mutex)) 00120 { 00121 throw; 00122 } 00123 } catch (char* errmsg) 00124 { 00125 throw Exception( -1, "The same thread can't aquire a mutex twice."); 00126 } 00127 } 00128 } 00129 00130 void Mutex::unlock(void) 00131 { 00132 00133 if (_pmutex) 00134 { 00135 pthread_mutex_unlock(_pmutex); 00136 } 00137 else 00138 { 00139 try 00140 { 00141 if (pthread_mutex_unlock(&_mutex)) 00142 { 00143 throw; 00144 } 00145 } catch (char* errmsg) 00146 { 00147 throw Exception( -1, "Mutex can't unlock."); 00148 } 00149 } 00150 } 00151 00152 /*===================================== 00153 Class Semaphore 00154 =====================================*/ 00155 00156 Semaphore::Semaphore() 00157 { 00158 sem_init(&_sem, 0, 0); 00159 _name = 0; 00160 _psem = 0; 00161 } 00162 00163 Semaphore::Semaphore(unsigned int val) 00164 { 00165 sem_init(&_sem, 0, val); 00166 _name = 0; 00167 _psem = 0; 00168 } 00169 00170 Semaphore::Semaphore(const char* name, unsigned int val) 00171 { 00172 _psem = sem_open(name, O_CREAT, 0666, val); 00173 if (_psem == SEM_FAILED) 00174 { 00175 throw Exception( -1, "Semaphore can't be created."); 00176 } 00177 _name = strdup(name); 00178 if (_name == NULL) 00179 { 00180 throw Exception( -1, "Semaphore can't allocate memories."); 00181 } 00182 } 00183 00184 Semaphore::~Semaphore() 00185 { 00186 if (_name) 00187 { 00188 sem_close(_psem); 00189 sem_unlink(_name); 00190 free(_name); 00191 } 00192 else 00193 { 00194 sem_destroy(&_sem); 00195 } 00196 } 00197 00198 void Semaphore::post(void) 00199 { 00200 int val = 0; 00201 if (_psem) 00202 { 00203 sem_getvalue(_psem, &val); 00204 if (val <= 0) 00205 { 00206 sem_post(_psem); 00207 } 00208 } 00209 else 00210 { 00211 sem_getvalue(&_sem, &val); 00212 if (val <= 0) 00213 { 00214 sem_post(&_sem); 00215 } 00216 } 00217 } 00218 00219 void Semaphore::wait(void) 00220 { 00221 if (_psem) 00222 { 00223 sem_wait(_psem); 00224 } 00225 else 00226 { 00227 sem_wait(&_sem); 00228 } 00229 } 00230 00231 void Semaphore::timedwait(uint16_t millsec) 00232 { 00233 struct timespec ts; 00234 clock_gettime(CLOCK_REALTIME, &ts); 00235 ts.tv_sec += millsec / 1000; 00236 ts.tv_nsec = (millsec % 1000) * 1000000; 00237 if (_psem) 00238 { 00239 sem_timedwait(_psem, &ts); 00240 } 00241 else 00242 { 00243 sem_timedwait(&_sem, &ts); 00244 } 00245 } 00246 00247 /*========================================= 00248 Class RingBuffer 00249 =========================================*/ 00250 RingBuffer::RingBuffer() 00251 { 00252 RingBuffer(MQTTSNGW_KEY_DIRECTORY); 00253 } 00254 00255 RingBuffer::RingBuffer(const char* keyDirectory) 00256 { 00257 int fp = 0; 00258 string fileName = keyDirectory + string(MQTTSNGW_RINGBUFFER_KEY); 00259 fp = open(fileName.c_str(), O_CREAT, S_IRGRP); 00260 if ( fp > 0 ) 00261 { 00262 close(fp); 00263 } 00264 00265 fileName = keyDirectory + string(MQTTSNGW_RB_MUTEX_KEY); 00266 fp = open(fileName.c_str(), O_CREAT, S_IRGRP); 00267 if ( fp > 0 ) 00268 { 00269 close(fp); 00270 } 00271 00272 key_t key = ftok(MQTTSNGW_RINGBUFFER_KEY, 1); 00273 00274 if ((_shmid = shmget(key, PROCESS_LOG_BUFFER_SIZE, 00275 IPC_CREAT | IPC_EXCL | 0666)) >= 0) 00276 { 00277 if ((_shmaddr = (uint16_t*) shmat(_shmid, NULL, 0)) > 0) 00278 { 00279 _length = (uint16_t*) _shmaddr; 00280 _start = (uint16_t*) _length + sizeof(uint16_t*); 00281 _end = (uint16_t*) _start + sizeof(uint16_t*); 00282 _buffer = (char*) _end + sizeof(uint16_t*); 00283 _createFlg = true; 00284 00285 *_length = PROCESS_LOG_BUFFER_SIZE - sizeof(uint16_t*) * 3 - 16; 00286 *_start = *_end = 0; 00287 } 00288 else 00289 { 00290 throw Exception(-1, "RingBuffer can't attach shared memory."); 00291 } 00292 } 00293 else if ((_shmid = shmget(key, PROCESS_LOG_BUFFER_SIZE, IPC_CREAT | 0666)) >= 0) 00294 { 00295 if ((_shmaddr = (uint16_t*) shmat(_shmid, NULL, 0)) > 0) 00296 { 00297 _length = (uint16_t*) _shmaddr; 00298 _start = (uint16_t*) _length + sizeof(uint16_t*); 00299 _end = (uint16_t*) _start + sizeof(uint16_t*); 00300 _buffer = (char*) _end + sizeof(uint16_t*); 00301 _createFlg = false; 00302 } 00303 else 00304 { 00305 throw Exception(-1, "RingBuffer can't create a shared memory."); 00306 } 00307 } 00308 else 00309 { 00310 throw Exception(-1, "RingBuffer can't create a shared memory."); 00311 } 00312 00313 _pmx = new Mutex(MQTTSNGW_RB_MUTEX_KEY); 00314 } 00315 00316 RingBuffer::~RingBuffer() 00317 { 00318 if (_createFlg) 00319 { 00320 if (_shmid > 0) 00321 { 00322 shmctl(_shmid, IPC_RMID, NULL); 00323 } 00324 } 00325 else 00326 { 00327 if (_shmid > 0) 00328 { 00329 shmdt(_shmaddr); 00330 } 00331 } 00332 00333 if (_pmx > 0) 00334 { 00335 delete _pmx; 00336 } 00337 } 00338 00339 void RingBuffer::put(char* data) 00340 { 00341 _pmx->lock(); 00342 00343 uint16_t dlen = strlen(data); 00344 uint16_t blen = *_length - *_end; 00345 00346 if (*_end > *_start) 00347 { 00348 if (dlen < blen) 00349 { 00350 strncpy(_buffer + *_end, data, dlen); 00351 if (*_end - *_start == 1) 00352 { // Buffer is empty. 00353 *_start = *_end; 00354 } 00355 *_end += dlen; 00356 } 00357 else 00358 { 00359 strncpy(_buffer + *_end, data, blen); 00360 strncpy(_buffer, data + blen, dlen - blen); 00361 if (*_end - *_start == 1) 00362 { // Buffer is empty. 00363 *_start = *_end; 00364 *_end = dlen - blen; 00365 } 00366 else 00367 { 00368 *_end = dlen - blen; 00369 *_start = *_end + 1; 00370 } 00371 } 00372 } 00373 else if (*_end == *_start) 00374 { 00375 if (dlen < blen) 00376 { 00377 strncpy(_buffer + *_end, data, dlen); 00378 *_end += dlen; 00379 } 00380 else 00381 { 00382 const char* errmsg = "RingBuffer Error: data is too long"; 00383 strcpy(_buffer + *_end, errmsg); 00384 *_end += strlen(errmsg); 00385 } 00386 } 00387 else 00388 { // *_end < *_start 00389 if (dlen < *_start - *_end) 00390 { 00391 strncpy(_buffer + *_end, data, dlen); 00392 *_end += dlen; 00393 *_start = *_end + 1; 00394 } 00395 else 00396 { 00397 if (dlen < blen) 00398 { 00399 strncpy(_buffer + *_end, data, dlen); 00400 *_end += dlen; 00401 *_start = *_end + 1; 00402 } 00403 else 00404 { 00405 strncpy(_buffer + *_end, data, blen); 00406 strncpy(_buffer, data + blen, dlen - blen); 00407 *_start = *_end; 00408 *_end = dlen - blen; 00409 } 00410 } 00411 } 00412 _pmx->unlock(); 00413 } 00414 00415 int RingBuffer::get(char* buf, int length) 00416 { 00417 int len = 0; 00418 _pmx->lock(); 00419 00420 if (*_end > *_start) 00421 { 00422 if (length > *_end - *_start) 00423 { 00424 len = *_end - *_start; 00425 if (len == 1) 00426 { 00427 len = 0; 00428 } 00429 strncpy(buf, _buffer + *_start, len); 00430 *_start = *_end - 1; 00431 } 00432 else 00433 { 00434 len = length; 00435 strncpy(buf, _buffer + *_start, len); 00436 *_start = *_start + len; 00437 } 00438 } 00439 else if (*_end < *_start) 00440 { 00441 int blen = *_length - *_start; 00442 if (length > blen) 00443 { 00444 strncpy(buf, _buffer + *_start, blen); 00445 *_start = 0; 00446 if (length - (blen + *_end) > 0) 00447 { 00448 strncpy(buf + blen, _buffer, *_end); 00449 len = blen + *_end; 00450 if (*_end > 0) 00451 { 00452 *_start = *_end - 1; 00453 } 00454 } 00455 else 00456 { 00457 strncpy(buf + blen, _buffer, length - blen); 00458 len = length; 00459 *_start = length - blen; 00460 } 00461 } 00462 else 00463 { 00464 strncpy(buf, _buffer + *_start, length); 00465 *_start += length; 00466 len = length; 00467 } 00468 } 00469 _pmx->unlock(); 00470 return len; 00471 } 00472 00473 void RingBuffer::reset() 00474 { 00475 _pmx->lock(); 00476 if ( _start && _end ) 00477 { 00478 *_start = *_end = 0; 00479 } 00480 else 00481 { 00482 throw Exception(-1, "RingBuffer can't reset. need to clear shared memory."); 00483 } 00484 _pmx->unlock(); 00485 } 00486 00487 /*===================================== 00488 Class Thread 00489 =====================================*/ 00490 Thread::Thread() 00491 { 00492 _threadID = 0; 00493 } 00494 00495 Thread::~Thread() 00496 { 00497 } 00498 00499 void* Thread::_run(void* runnable) 00500 { 00501 static_cast<Runnable*>(runnable)->EXECRUN(); 00502 return 0; 00503 } 00504 00505 void Thread::initialize(int argc, char** argv) 00506 { 00507 00508 } 00509 00510 pthread_t Thread::getID() 00511 { 00512 return pthread_self(); 00513 } 00514 00515 bool Thread::equals(pthread_t *t1, pthread_t *t2) 00516 { 00517 return (pthread_equal(*t1, *t2) ? false : true); 00518 } 00519 00520 int Thread::start(void) 00521 { 00522 Runnable* runnable = this; 00523 return pthread_create(&_threadID, 0, _run, runnable); 00524 } 00525 00526 void Thread::stopProcess(void) 00527 { 00528 theMultiTaskProcess->threadStoped(); 00529 } 00530 00531 void Thread::stop(void) 00532 { 00533 if ( _threadID ) 00534 { 00535 pthread_join(_threadID, NULL); 00536 _threadID = 0; 00537 } 00538 }
Generated on Wed Jul 13 2022 10:46:03 by
1.7.2