Jack Hansdampf / mbed-mqtt-GSOE1

Dependents:   ESP8266MQTT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers Threading.cpp Source File

Threading.cpp

00001 /**************************************************************************************
00002  * Copyright (c) 2016, Tomoaki Yamaguchi
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
00015  **************************************************************************************/
00016 
00017 #include "MQTTSNGWProcess.h"
00018 #include "Threading.h"
00019 #include <stdlib.h>
00020 #include <sys/types.h>
00021 #include <sys/ipc.h>
00022 #include <sys/shm.h>
00023 #include <sys/stat.h>
00024 #include <semaphore.h>
00025 #include <fcntl.h>
00026 #include <string.h>
00027 #include <pthread.h>
00028 #include <unistd.h>
00029 
00030 using namespace std;
00031 using namespace MQTTSNGW;
00032 
00033 #if defined(OSX)
00034 int sem_timedwait(sem_type sem, const struct timespec *timeout)
00035 {
00036     int rc = -1;
00037     int64_t tout = timeout->tv_sec * 1000L + tv_nsec * 1000000L
00038     rc = (int)dispatch_semaphore_wait(sem, dispatch_time(DISPATCH_TIME_NOW, tout));
00039     if (rc != 0)
00040     {
00041         rc = ETIMEDOUT;
00042     }
00043     return rc;
00044 }
00045 #endif
00046 
00047 /*=====================================
00048  Class Mutex
00049  =====================================*/
00050 
00051 Mutex::Mutex(void)
00052 {
00053     pthread_mutexattr_t attr;
00054     pthread_mutexattr_init(&attr);
00055     pthread_mutex_init(&_mutex, &attr);
00056     pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
00057     _shmid = 0;
00058     _pmutex = 0;
00059 }
00060 
00061 Mutex::Mutex(const char* fileName)
00062 {
00063     pthread_mutexattr_t attr;
00064 
00065     key_t key = ftok(fileName, 1);
00066 
00067     if ((_shmid = shmget(key, sizeof(pthread_mutex_t), IPC_CREAT | 0666)) < 0)
00068     {
00069         throw Exception( -1, "Mutex can't create a shared memory.");
00070     }
00071     _pmutex = (pthread_mutex_t*) shmat(_shmid, NULL, 0);
00072     if (_pmutex < 0)
00073     {
00074         throw Exception( -1, "Mutex can't attach shared memory.");
00075     }
00076 
00077     pthread_mutexattr_init(&attr);
00078 
00079     if (pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED) != 0)
00080     {
00081         throw Exception( -1, "Mutex can't set the process-shared flag");
00082     }
00083     if (pthread_mutex_init(_pmutex, &attr) != 0)
00084     {
00085         throw Exception( -1, "Mutex can't initialize.");
00086     }
00087 }
00088 
00089 Mutex::~Mutex(void)
00090 {
00091     if (_pmutex)
00092     {
00093         pthread_mutex_lock(_pmutex);
00094         pthread_mutex_unlock(_pmutex);
00095         pthread_mutex_destroy(_pmutex);
00096     }
00097     else
00098     {
00099         pthread_mutex_lock(&_mutex);
00100         pthread_mutex_unlock(&_mutex);
00101         pthread_mutex_destroy(&_mutex);
00102     }
00103     if (_shmid)
00104     {
00105         shmctl(_shmid, IPC_RMID, NULL);
00106     }
00107 }
00108 
00109 void Mutex::lock(void)
00110 {
00111     if (_pmutex)
00112     {
00113         pthread_mutex_lock(_pmutex);
00114     }
00115     else
00116     {
00117         try
00118         {
00119             if (pthread_mutex_lock(&_mutex))
00120             {
00121                 throw;
00122             }
00123         } catch (char* errmsg)
00124         {
00125             throw Exception( -1, "The same thread can't aquire a mutex twice.");
00126         }
00127     }
00128 }
00129 
00130 void Mutex::unlock(void)
00131 {
00132 
00133     if (_pmutex)
00134     {
00135         pthread_mutex_unlock(_pmutex);
00136     }
00137     else
00138     {
00139         try
00140         {
00141             if (pthread_mutex_unlock(&_mutex))
00142             {
00143                 throw;
00144             }
00145         } catch (char* errmsg)
00146         {
00147             throw Exception( -1, "Mutex can't unlock.");
00148         }
00149     }
00150 }
00151 
00152 /*=====================================
00153  Class Semaphore
00154  =====================================*/
00155 
00156 Semaphore::Semaphore()
00157 {
00158     sem_init(&_sem, 0, 0);
00159     _name = 0;
00160     _psem = 0;
00161 }
00162 
00163 Semaphore::Semaphore(unsigned int val)
00164 {
00165     sem_init(&_sem, 0, val);
00166     _name = 0;
00167     _psem = 0;
00168 }
00169 
00170 Semaphore::Semaphore(const char* name, unsigned int val)
00171 {
00172     _psem = sem_open(name, O_CREAT, 0666, val);
00173     if (_psem == SEM_FAILED)
00174     {
00175         throw Exception( -1, "Semaphore can't be created.");
00176     }
00177     _name = strdup(name);
00178     if (_name == NULL)
00179     {
00180         throw Exception( -1, "Semaphore can't allocate memories.");
00181     }
00182 }
00183 
00184 Semaphore::~Semaphore()
00185 {
00186     if (_name)
00187     {
00188         sem_close(_psem);
00189         sem_unlink(_name);
00190         free(_name);
00191     }
00192     else
00193     {
00194         sem_destroy(&_sem);
00195     }
00196 }
00197 
00198 void Semaphore::post(void)
00199 {
00200     int val = 0;
00201     if (_psem)
00202     {
00203         sem_getvalue(_psem, &val);
00204         if (val <= 0)
00205         {
00206             sem_post(_psem);
00207         }
00208     }
00209     else
00210     {
00211         sem_getvalue(&_sem, &val);
00212         if (val <= 0)
00213         {
00214             sem_post(&_sem);
00215         }
00216     }
00217 }
00218 
00219 void Semaphore::wait(void)
00220 {
00221     if (_psem)
00222     {
00223         sem_wait(_psem);
00224     }
00225     else
00226     {
00227         sem_wait(&_sem);
00228     }
00229 }
00230 
00231 void Semaphore::timedwait(uint16_t millsec)
00232 {
00233     struct timespec ts;
00234     clock_gettime(CLOCK_REALTIME, &ts);
00235     ts.tv_sec += millsec / 1000;
00236     ts.tv_nsec = (millsec % 1000) * 1000000;
00237     if (_psem)
00238     {
00239         sem_timedwait(_psem, &ts);
00240     }
00241     else
00242     {
00243         sem_timedwait(&_sem, &ts);
00244     }
00245 }
00246 
00247 /*=========================================
00248  Class RingBuffer
00249  =========================================*/
00250 RingBuffer::RingBuffer()
00251 {
00252     RingBuffer(MQTTSNGW_KEY_DIRECTORY);
00253 }
00254 
00255 RingBuffer::RingBuffer(const char* keyDirectory)
00256 {
00257     int fp = 0;
00258     string fileName = keyDirectory + string(MQTTSNGW_RINGBUFFER_KEY);
00259     fp = open(fileName.c_str(), O_CREAT, S_IRGRP);
00260     if ( fp > 0 )
00261     {
00262         close(fp);
00263     }
00264 
00265     fileName = keyDirectory + string(MQTTSNGW_RB_MUTEX_KEY);
00266     fp = open(fileName.c_str(), O_CREAT, S_IRGRP);
00267     if ( fp > 0 )
00268     {
00269         close(fp);
00270     }
00271 
00272     key_t key = ftok(MQTTSNGW_RINGBUFFER_KEY, 1);
00273 
00274     if ((_shmid = shmget(key, PROCESS_LOG_BUFFER_SIZE,
00275     IPC_CREAT | IPC_EXCL | 0666)) >= 0)
00276     {
00277         if ((_shmaddr = (uint16_t*) shmat(_shmid, NULL, 0)) > 0)
00278         {
00279             _length = (uint16_t*) _shmaddr;
00280             _start = (uint16_t*) _length + sizeof(uint16_t*);
00281             _end = (uint16_t*) _start + sizeof(uint16_t*);
00282             _buffer = (char*) _end + sizeof(uint16_t*);
00283             _createFlg = true;
00284 
00285             *_length = PROCESS_LOG_BUFFER_SIZE - sizeof(uint16_t*) * 3 - 16;
00286             *_start = *_end = 0;
00287         }
00288         else
00289         {
00290             throw Exception(-1, "RingBuffer can't attach shared memory.");
00291         }
00292     }
00293     else if ((_shmid = shmget(key, PROCESS_LOG_BUFFER_SIZE, IPC_CREAT | 0666)) >= 0)
00294     {
00295         if ((_shmaddr = (uint16_t*) shmat(_shmid, NULL, 0)) > 0)
00296         {
00297             _length = (uint16_t*) _shmaddr;
00298             _start = (uint16_t*) _length + sizeof(uint16_t*);
00299             _end = (uint16_t*) _start + sizeof(uint16_t*);
00300             _buffer = (char*) _end + sizeof(uint16_t*);
00301             _createFlg = false;
00302         }
00303         else
00304         {
00305             throw Exception(-1, "RingBuffer can't create a shared memory.");
00306         }
00307     }
00308     else
00309     {
00310         throw Exception(-1, "RingBuffer can't create a shared memory.");
00311     }
00312 
00313     _pmx = new Mutex(MQTTSNGW_RB_MUTEX_KEY);
00314 }
00315 
00316 RingBuffer::~RingBuffer()
00317 {
00318     if (_createFlg)
00319     {
00320         if (_shmid > 0)
00321         {
00322             shmctl(_shmid, IPC_RMID, NULL);
00323         }
00324     }
00325     else
00326     {
00327         if (_shmid > 0)
00328         {
00329             shmdt(_shmaddr);
00330         }
00331     }
00332 
00333     if (_pmx > 0)
00334     {
00335         delete _pmx;
00336     }
00337 }
00338 
00339 void RingBuffer::put(char* data)
00340 {
00341     _pmx->lock();
00342 
00343     uint16_t dlen = strlen(data);
00344     uint16_t blen = *_length - *_end;
00345 
00346     if (*_end > *_start)
00347     {
00348         if (dlen < blen)
00349         {
00350             strncpy(_buffer + *_end, data, dlen);
00351             if (*_end - *_start == 1)
00352             { // Buffer is empty.
00353                 *_start = *_end;
00354             }
00355             *_end += dlen;
00356         }
00357         else
00358         {
00359             strncpy(_buffer + *_end, data, blen);
00360             strncpy(_buffer, data + blen, dlen - blen);
00361             if (*_end - *_start == 1)
00362             { // Buffer is empty.
00363                 *_start = *_end;
00364                 *_end = dlen - blen;
00365             }
00366             else
00367             {
00368                 *_end = dlen - blen;
00369                 *_start = *_end + 1;
00370             }
00371         }
00372     }
00373     else if (*_end == *_start)
00374     {
00375         if (dlen < blen)
00376         {
00377             strncpy(_buffer + *_end, data, dlen);
00378             *_end += dlen;
00379         }
00380         else
00381         {
00382             const char* errmsg = "RingBuffer Error: data is too long";
00383             strcpy(_buffer + *_end, errmsg);
00384             *_end += strlen(errmsg);
00385         }
00386     }
00387     else
00388     {    // *_end < *_start
00389         if (dlen < *_start - *_end)
00390         {
00391             strncpy(_buffer + *_end, data, dlen);
00392             *_end += dlen;
00393             *_start = *_end + 1;
00394         }
00395         else
00396         {
00397             if (dlen < blen)
00398             {
00399                 strncpy(_buffer + *_end, data, dlen);
00400                 *_end += dlen;
00401                 *_start = *_end + 1;
00402             }
00403             else
00404             {
00405                 strncpy(_buffer + *_end, data, blen);
00406                 strncpy(_buffer, data + blen, dlen - blen);
00407                 *_start = *_end;
00408                 *_end = dlen - blen;
00409             }
00410         }
00411     }
00412     _pmx->unlock();
00413 }
00414 
00415 int RingBuffer::get(char* buf, int length)
00416 {
00417     int len = 0;
00418     _pmx->lock();
00419 
00420     if (*_end > *_start)
00421     {
00422         if (length > *_end - *_start)
00423         {
00424             len = *_end - *_start;
00425             if (len == 1)
00426             {
00427                 len = 0;
00428             }
00429             strncpy(buf, _buffer + *_start, len);
00430             *_start = *_end - 1;
00431         }
00432         else
00433         {
00434             len = length;
00435             strncpy(buf, _buffer + *_start, len);
00436             *_start = *_start + len;
00437         }
00438     }
00439     else if (*_end < *_start)
00440     {
00441         int blen = *_length - *_start;
00442         if (length > blen)
00443         {
00444             strncpy(buf, _buffer + *_start, blen);
00445             *_start = 0;
00446             if (length - (blen + *_end) > 0)
00447             {
00448                 strncpy(buf + blen, _buffer, *_end);
00449                 len = blen + *_end;
00450                 if (*_end > 0)
00451                 {
00452                     *_start = *_end - 1;
00453                 }
00454             }
00455             else
00456             {
00457                 strncpy(buf + blen, _buffer, length - blen);
00458                 len = length;
00459                 *_start = length - blen;
00460             }
00461         }
00462         else
00463         {
00464             strncpy(buf, _buffer + *_start, length);
00465             *_start += length;
00466             len = length;
00467         }
00468     }
00469     _pmx->unlock();
00470     return len;
00471 }
00472 
00473 void RingBuffer::reset()
00474 {
00475     _pmx->lock();
00476     if ( _start && _end )
00477     {
00478         *_start = *_end = 0;
00479     }
00480     else
00481     {
00482         throw Exception(-1, "RingBuffer can't reset. need to clear shared memory.");
00483     }
00484     _pmx->unlock();
00485 }
00486 
00487 /*=====================================
00488  Class Thread
00489  =====================================*/
00490 Thread::Thread()
00491 {
00492     _threadID = 0;
00493 }
00494 
00495 Thread::~Thread()
00496 {
00497 }
00498 
00499 void* Thread::_run(void* runnable)
00500 {
00501     static_cast<Runnable*>(runnable)->EXECRUN();
00502     return 0;
00503 }
00504 
00505 void Thread::initialize(int argc, char** argv)
00506 {
00507 
00508 }
00509 
00510 pthread_t Thread::getID()
00511 {
00512     return pthread_self();
00513 }
00514 
00515 bool Thread::equals(pthread_t *t1, pthread_t *t2)
00516 {
00517     return (pthread_equal(*t1, *t2) ? false : true);
00518 }
00519 
00520 int Thread::start(void)
00521 {
00522     Runnable* runnable = this;
00523     return pthread_create(&_threadID, 0, _run, runnable);
00524 }
00525 
00526 void Thread::stopProcess(void)
00527 {
00528     theMultiTaskProcess->threadStoped();
00529 }
00530 
00531 void Thread::stop(void)
00532 {
00533     if ( _threadID )
00534     {
00535         pthread_join(_threadID, NULL);
00536         _threadID = 0;
00537     }
00538 }