Renesas GR-PEACH OpenCV Development / gr-peach-opencv-project-sd-card_update

Fork of gr-peach-opencv-project-sd-card by the do

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers parallel_pthreads.cpp Source File

parallel_pthreads.cpp

00001 /*M///////////////////////////////////////////////////////////////////////////////////////
00002 //
00003 //  IMPORTANT: READ BEFORE DOWNLOADING, COPYING, INSTALLING OR USING.
00004 //
00005 //  By downloading, copying, installing or using the software you agree to this license.
00006 //  If you do not agree to this license, do not download, install,
00007 //  copy or use the software.
00008 //
00009 //
00010 //                           License Agreement
00011 //                For Open Source Computer Vision Library
00012 //
00013 // Copyright (C) 2000-2008, Intel Corporation, all rights reserved.
00014 // Copyright (C) 2009-2011, Willow Garage Inc., all rights reserved.
00015 // Third party copyrights are property of their respective owners.
00016 //
00017 // Redistribution and use in source and binary forms, with or without modification,
00018 // are permitted provided that the following conditions are met:
00019 //
00020 //   * Redistribution's of source code must retain the above copyright notice,
00021 //     this list of conditions and the following disclaimer.
00022 //
00023 //   * Redistribution's in binary form must reproduce the above copyright notice,
00024 //     this list of conditions and the following disclaimer in the documentation
00025 //     and/or other materials provided with the distribution.
00026 //
00027 //   * The name of the copyright holders may not be used to endorse or promote products
00028 //     derived from this software without specific prior written permission.
00029 //
00030 // This software is provided by the copyright holders and contributors "as is" and
00031 // any express or implied warranties, including, but not limited to, the implied
00032 // warranties of merchantability and fitness for a particular purpose are disclaimed.
00033 // In no event shall the Intel Corporation or contributors be liable for any direct,
00034 // indirect, incidental, special, exemplary, or consequential damages
00035 // (including, but not limited to, procurement of substitute goods or services;
00036 // loss of use, data, or profits; or business interruption) however caused
00037 // and on any theory of liability, whether in contract, strict liability,
00038 // or tort (including negligence or otherwise) arising in any way out of
00039 // the use of this software, even if advised of the possibility of such damage.
00040 //
00041 //M*/
00042 
00043 #include "precomp.hpp"
00044 
00045 #ifdef HAVE_PTHREADS_PF
00046 
#include <algorithm>
#include <cstdlib>
#include <limits>
#include <pthread.h>
00049 
00050 namespace cv
00051 {
00052 
class ThreadManager;   // defined below; ForThread keeps a back-pointer to it (m_parent)

// Life cycle of a single pool worker, stored in ForThread::m_state.
enum ForThreadState
{
    eFTNotStarted = 0,  // constructed; the worker has not entered thread_body() yet
    eFTStarted    = 1,  // the worker is inside its wait/execute loop
    eFTToStop     = 2,  // stop() asked the worker to leave its loop
    eFTStoped     = 3   // stop() has finished (identifier spelling is referenced elsewhere)
};
00062 
// State of the ThreadManager's worker pool, stored in ThreadManager::m_pool_state.
enum ThreadManagerPoolState
{
    eTMNotInited      = 0,  // workers not created yet; initPool() will create them
    eTMFailedToInit   = 1,  // creating the manager primitives or the workers failed
    eTMInited         = 2,  // workers created and able to receive tasks
    eTMSingleThreaded = 3   // configured for exactly one thread
};
00070 
00071 struct work_load
00072 {
00073     work_load()
00074     {
00075         clear();
00076     }
00077 
00078     work_load(const cv::Range& range, const cv::ParallelLoopBody& body, int nstripes)
00079     {
00080         set(range, body, nstripes);
00081     }
00082 
00083     void set(const cv::Range& range, const cv::ParallelLoopBody& body, unsigned int nstripes)
00084     {
00085         m_body = &body;
00086         m_range = &range;
00087 
00088         //ensure that nstripes not larger than range length
00089         m_nstripes = std::min( unsigned(m_range->end - m_range->start) , nstripes);
00090 
00091         m_block_size = ((m_range->end - m_range->start - 1)/m_nstripes) + 1;
00092 
00093         //ensure that nstripes not larger than blocks count, so we would never go out of range
00094         m_nstripes = std::min(m_nstripes, unsigned(((m_range->end - m_range->start - 1)/m_block_size) + 1) );
00095     }
00096 
00097     const cv::ParallelLoopBody* m_body;
00098     const cv::Range*            m_range;
00099     unsigned int                         m_nstripes;
00100     int                m_block_size;
00101 
00102     void clear()
00103     {
00104         m_body = 0;
00105         m_range = 0;
00106         m_nstripes = 0;
00107         m_block_size = 0;
00108     }
00109 };
00110 
00111 class ForThread
00112 {
00113 public:
00114 
00115     ForThread(): m_task_start(false), m_parent(0), m_state(eFTNotStarted), m_id(0)
00116     {
00117     }
00118 
00119     //called from manager thread
00120     bool init(size_t id, ThreadManager* parent);
00121 
00122     //called from manager thread
00123     void run();
00124 
00125     //called from manager thread
00126     void stop();
00127 
00128     ~ForThread();
00129 
00130 private:
00131 
00132     //called from worker thread
00133     static void* thread_loop_wrapper(void* thread_object);
00134 
00135     //called from worker thread
00136     void execute();
00137 
00138     //called from worker thread
00139     void thread_body();
00140 
00141     pthread_t       m_posix_thread;
00142     pthread_mutex_t m_thread_mutex;
00143     pthread_cond_t  m_cond_thread_task;
00144     bool            m_task_start;
00145 
00146     ThreadManager*  m_parent;
00147     ForThreadState  m_state;
00148     size_t          m_id;
00149 };
00150 
00151 class ThreadManager
00152 {
00153 public:
00154     friend class ForThread;
00155 
00156     static ThreadManager& instance()
00157     {
00158         CV_SINGLETON_LAZY_INIT_REF(ThreadManager, new ThreadManager())
00159     }
00160 
00161     static void stop()
00162     {
00163         ThreadManager& manager = instance();
00164 
00165         if(manager.m_pool_state == eTMInited)
00166         {
00167             for(size_t i = 0; i < manager.m_num_threads; ++i)
00168             {
00169                 manager.m_threads[i].stop();
00170             }
00171         }
00172 
00173         manager.m_pool_state = eTMNotInited;
00174     }
00175 
00176     void run(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes);
00177 
00178     size_t getNumOfThreads();
00179 
00180     void setNumOfThreads(size_t n);
00181 
00182 private:
00183 
00184     ThreadManager();
00185 
00186     ~ThreadManager();
00187 
00188     void wait_complete();
00189 
00190     void notify_complete();
00191 
00192     bool initPool();
00193 
00194     size_t defaultNumberOfThreads();
00195 
00196     std::vector<ForThread> m_threads;
00197     size_t m_num_threads;
00198 
00199     pthread_mutex_t m_manager_task_mutex;
00200     pthread_cond_t  m_cond_thread_task_complete;
00201     bool            m_task_complete;
00202 
00203     unsigned int m_task_position;
00204     unsigned int m_num_of_completed_tasks;
00205 
00206     pthread_mutex_t m_manager_access_mutex;
00207 
00208     static const char m_env_name[];
00209     static const unsigned int m_default_number_of_threads;
00210 
00211     work_load m_work_load;
00212 
00213     struct work_thread_t
00214     {
00215         work_thread_t(): value(false) { }
00216         bool value;
00217     };
00218 
00219     cv::TLSData<work_thread_t> m_is_work_thread;
00220 
00221     ThreadManagerPoolState m_pool_state;
00222 };
00223 
00224 const char ThreadManager::m_env_name[] = "OPENCV_FOR_THREADS_NUM";
00225 
00226 #ifdef ANDROID
00227 // many modern phones/tables have 4-core CPUs. Let's use no more
00228 // than 2 threads by default not to overheat the devices
00229 const unsigned int ThreadManager::m_default_number_of_threads = 2;
00230 #else
00231 const unsigned int ThreadManager::m_default_number_of_threads = 8;
00232 #endif
00233 
00234 ForThread::~ForThread()
00235 {
00236     if(m_state == eFTStarted)
00237     {
00238         stop();
00239 
00240         pthread_mutex_destroy(&m_thread_mutex);
00241 
00242         pthread_cond_destroy(&m_cond_thread_task);
00243     }
00244 }
00245 
00246 bool ForThread::init(size_t id, ThreadManager* parent)
00247 {
00248     m_id = id;
00249 
00250     m_parent = parent;
00251 
00252     int res = 0;
00253 
00254     res |= pthread_mutex_init(&m_thread_mutex, NULL);
00255 
00256     res |= pthread_cond_init(&m_cond_thread_task, NULL);
00257 
00258     if(!res)
00259     {
00260         res = pthread_create(&m_posix_thread, NULL, thread_loop_wrapper, (void*)this);
00261     }
00262 
00263 
00264     return res == 0;
00265 }
00266 
00267 void ForThread::stop()
00268 {
00269     if(m_state == eFTStarted)
00270     {
00271         pthread_mutex_lock(&m_thread_mutex);
00272         m_state = eFTToStop;
00273         pthread_mutex_unlock(&m_thread_mutex);
00274 
00275         run();
00276 
00277         pthread_join(m_posix_thread, NULL);
00278     }
00279 
00280     pthread_mutex_lock(&m_thread_mutex);
00281     m_state = eFTStoped;
00282     pthread_mutex_unlock(&m_thread_mutex);
00283 }
00284 
00285 void ForThread::run()
00286 {
00287     pthread_mutex_lock(&m_thread_mutex);
00288 
00289     m_task_start = true;
00290 
00291     pthread_cond_signal(&m_cond_thread_task);
00292 
00293     pthread_mutex_unlock(&m_thread_mutex);
00294 }
00295 
00296 void* ForThread::thread_loop_wrapper(void* thread_object)
00297 {
00298     ((ForThread*)thread_object)->thread_body();
00299     return 0;
00300 }
00301 
00302 void ForThread::execute()
00303 {
00304     unsigned int m_current_pos = CV_XADD(&m_parent->m_task_position, 1);
00305 
00306     work_load& load = m_parent->m_work_load;
00307 
00308     while(m_current_pos < load.m_nstripes)
00309     {
00310         int start = load.m_range->start + m_current_pos*load.m_block_size;
00311         int end = std::min(start + load.m_block_size, load.m_range->end);
00312 
00313         load.m_body->operator()(cv::Range(start, end));
00314 
00315         m_current_pos = CV_XADD(&m_parent->m_task_position, 1);
00316     }
00317 }
00318 
00319 void ForThread::thread_body()
00320 {
00321     m_parent->m_is_work_thread.get()->value = true;
00322 
00323     pthread_mutex_lock(&m_thread_mutex);
00324 
00325     m_state = eFTStarted;
00326 
00327     while(m_state == eFTStarted)
00328     {
00329         //to handle spurious wakeups
00330         while( !m_task_start && m_state != eFTToStop )
00331             pthread_cond_wait(&m_cond_thread_task, &m_thread_mutex);
00332 
00333         if(m_state == eFTStarted)
00334         {
00335             execute();
00336 
00337             m_task_start = false;
00338 
00339             m_parent->notify_complete();
00340         }
00341     }
00342 
00343     pthread_mutex_unlock(&m_thread_mutex);
00344 }
00345 
00346 ThreadManager::ThreadManager(): m_num_threads(0), m_task_complete(false), m_num_of_completed_tasks(0), m_pool_state(eTMNotInited)
00347 {
00348     int res = 0;
00349 
00350     pthread_mutexattr_t attr;
00351     pthread_mutexattr_init(&attr);
00352     pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
00353     res |= pthread_mutex_init(&m_manager_access_mutex, &attr);
00354     pthread_mutexattr_destroy(&attr);
00355 
00356     res |= pthread_mutex_init(&m_manager_task_mutex, NULL);
00357 
00358     res |= pthread_cond_init(&m_cond_thread_task_complete, NULL);
00359 
00360     if(!res)
00361     {
00362         setNumOfThreads(defaultNumberOfThreads());
00363 
00364         m_task_position = 0;
00365     }
00366     else
00367     {
00368         m_num_threads = 1;
00369         m_pool_state = eTMFailedToInit;
00370         m_task_position = 0;
00371 
00372         //print error;
00373     }
00374 }
00375 
00376 ThreadManager::~ThreadManager()
00377 {
00378     stop();
00379 
00380     pthread_mutex_destroy(&m_manager_task_mutex);
00381 
00382     pthread_cond_destroy(&m_cond_thread_task_complete);
00383 
00384     pthread_mutex_destroy(&m_manager_access_mutex);
00385 }
00386 
00387 void ThreadManager::run(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes)
00388 {
00389     bool is_work_thread = m_is_work_thread.get()->value;
00390 
00391     if( (getNumOfThreads() > 1) && !is_work_thread &&
00392         (range.end - range.start > 1) && (nstripes <= 0 || nstripes >= 1.5) )
00393     {
00394         int res = pthread_mutex_trylock(&m_manager_access_mutex);
00395 
00396         if(!res)
00397         {
00398             if(initPool())
00399             {
00400                 if(nstripes < 1) nstripes = 4*m_threads.size();
00401 
00402                 double max_stripes = 4*m_threads.size();
00403 
00404                 nstripes = std::min(nstripes, max_stripes);
00405 
00406                 pthread_mutex_lock(&m_manager_task_mutex);
00407 
00408                 m_num_of_completed_tasks = 0;
00409 
00410                 m_task_position = 0;
00411 
00412                 m_task_complete = false;
00413 
00414                 m_work_load.set(range, body, cvCeil(nstripes));
00415 
00416                 for(size_t i = 0; i < m_threads.size(); ++i)
00417                 {
00418                     m_threads[i].run();
00419                 }
00420 
00421                 wait_complete();
00422             }
00423             else
00424             {
00425                 //print error
00426                 body(range);
00427             }
00428         }
00429         else
00430         {
00431             body(range);
00432         }
00433     }
00434     else
00435     {
00436         body(range);
00437     }
00438 }
00439 
00440 void ThreadManager::wait_complete()
00441 {
00442     //to handle spurious wakeups
00443     while(!m_task_complete)
00444         pthread_cond_wait(&m_cond_thread_task_complete, &m_manager_task_mutex);
00445 
00446     pthread_mutex_unlock(&m_manager_task_mutex);
00447 
00448     pthread_mutex_unlock(&m_manager_access_mutex);
00449 }
00450 
00451 void ThreadManager::notify_complete()
00452 {
00453 
00454     unsigned int comp = CV_XADD(&m_num_of_completed_tasks, 1);
00455 
00456     if(comp == (m_num_threads - 1))
00457     {
00458         pthread_mutex_lock(&m_manager_task_mutex);
00459 
00460         m_task_complete = true;
00461 
00462         pthread_cond_signal(&m_cond_thread_task_complete);
00463 
00464         pthread_mutex_unlock(&m_manager_task_mutex);
00465     }
00466 }
00467 
00468 bool ThreadManager::initPool()
00469 {
00470     if(m_pool_state != eTMNotInited || m_num_threads == 1)
00471         return true;
00472 
00473     m_threads.resize(m_num_threads);
00474 
00475     bool res = true;
00476 
00477     for(size_t i = 0; i < m_threads.size(); ++i)
00478     {
00479         res |= m_threads[i].init(i, this);
00480     }
00481 
00482     if(res)
00483     {
00484         m_pool_state = eTMInited;
00485     }
00486     else
00487     {
00488         //TODO: join threads?
00489         m_pool_state = eTMFailedToInit;
00490     }
00491 
00492     return res;
00493 }
00494 
00495 size_t ThreadManager::getNumOfThreads()
00496 {
00497     return m_num_threads;
00498 }
00499 
00500 void ThreadManager::setNumOfThreads(size_t n)
00501 {
00502     int res = pthread_mutex_lock(&m_manager_access_mutex);
00503 
00504     if(!res)
00505     {
00506         if(n == 0)
00507         {
00508             n = defaultNumberOfThreads();
00509         }
00510 
00511         if(n != m_num_threads && m_pool_state != eTMFailedToInit)
00512         {
00513             if(m_pool_state == eTMInited)
00514             {
00515                 stop();
00516                 m_threads.clear();
00517             }
00518 
00519             m_num_threads = n;
00520 
00521             if(m_num_threads == 1)
00522             {
00523                 m_pool_state = eTMSingleThreaded;
00524             }
00525             else
00526             {
00527                 m_pool_state = eTMNotInited;
00528             }
00529         }
00530 
00531         pthread_mutex_unlock(&m_manager_access_mutex);
00532     }
00533 }
00534 
00535 size_t ThreadManager::defaultNumberOfThreads()
00536 {
00537     unsigned int result = m_default_number_of_threads;
00538 
00539     char * env = getenv(m_env_name);
00540 
00541     if(env != NULL)
00542     {
00543         sscanf(env, "%u", &result);
00544 
00545         result = std::max(1u, result);
00546         //do we need upper limit of threads number?
00547     }
00548 
00549     return result;
00550 }
00551 
00552 void parallel_for_pthreads(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes);
00553 size_t parallel_pthreads_get_threads_num();
00554 void parallel_pthreads_set_threads_num(int num);
00555 
00556 size_t parallel_pthreads_get_threads_num()
00557 {
00558     return ThreadManager::instance().getNumOfThreads();
00559 }
00560 
00561 void parallel_pthreads_set_threads_num(int num)
00562 {
00563     if(num < 0)
00564     {
00565         ThreadManager::instance().setNumOfThreads(0);
00566     }
00567     else
00568     {
00569         ThreadManager::instance().setNumOfThreads(size_t(num));
00570     }
00571 }
00572 
00573 void parallel_for_pthreads(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes)
00574 {
00575     ThreadManager::instance().run(range, body, nstripes);
00576 }
00577 
00578 }
00579 
00580 #endif
00581