Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository ZIP archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Fork of gr-peach-opencv-project-sd-card by
parallel_pthreads.cpp
00001 /*M/////////////////////////////////////////////////////////////////////////////////////// 00002 // 00003 // IMPORTANT: READ BEFORE DOWNLOADING, COPYING, INSTALLING OR USING. 00004 // 00005 // By downloading, copying, installing or using the software you agree to this license. 00006 // If you do not agree to this license, do not download, install, 00007 // copy or use the software. 00008 // 00009 // 00010 // License Agreement 00011 // For Open Source Computer Vision Library 00012 // 00013 // Copyright (C) 2000-2008, Intel Corporation, all rights reserved. 00014 // Copyright (C) 2009-2011, Willow Garage Inc., all rights reserved. 00015 // Third party copyrights are property of their respective owners. 00016 // 00017 // Redistribution and use in source and binary forms, with or without modification, 00018 // are permitted provided that the following conditions are met: 00019 // 00020 // * Redistribution's of source code must retain the above copyright notice, 00021 // this list of conditions and the following disclaimer. 00022 // 00023 // * Redistribution's in binary form must reproduce the above copyright notice, 00024 // this list of conditions and the following disclaimer in the documentation 00025 // and/or other materials provided with the distribution. 00026 // 00027 // * The name of the copyright holders may not be used to endorse or promote products 00028 // derived from this software without specific prior written permission. 00029 // 00030 // This software is provided by the copyright holders and contributors "as is" and 00031 // any express or implied warranties, including, but not limited to, the implied 00032 // warranties of merchantability and fitness for a particular purpose are disclaimed. 
00033 // In no event shall the Intel Corporation or contributors be liable for any direct, 00034 // indirect, incidental, special, exemplary, or consequential damages 00035 // (including, but not limited to, procurement of substitute goods or services; 00036 // loss of use, data, or profits; or business interruption) however caused 00037 // and on any theory of liability, whether in contract, strict liability, 00038 // or tort (including negligence or otherwise) arising in any way out of 00039 // the use of this software, even if advised of the possibility of such damage. 00040 // 00041 //M*/ 00042 00043 #include "precomp.hpp" 00044 00045 #ifdef HAVE_PTHREADS_PF 00046 00047 #include <algorithm> 00048 #include <pthread.h> 00049 00050 namespace cv 00051 { 00052 00053 class ThreadManager; 00054 00055 enum ForThreadState 00056 { 00057 eFTNotStarted = 0, 00058 eFTStarted = 1, 00059 eFTToStop = 2, 00060 eFTStoped = 3 00061 }; 00062 00063 enum ThreadManagerPoolState 00064 { 00065 eTMNotInited = 0, 00066 eTMFailedToInit = 1, 00067 eTMInited = 2, 00068 eTMSingleThreaded = 3 00069 }; 00070 00071 struct work_load 00072 { 00073 work_load() 00074 { 00075 clear(); 00076 } 00077 00078 work_load(const cv::Range& range, const cv::ParallelLoopBody& body, int nstripes) 00079 { 00080 set(range, body, nstripes); 00081 } 00082 00083 void set(const cv::Range& range, const cv::ParallelLoopBody& body, unsigned int nstripes) 00084 { 00085 m_body = &body; 00086 m_range = ⦥ 00087 00088 //ensure that nstripes not larger than range length 00089 m_nstripes = std::min( unsigned(m_range->end - m_range->start) , nstripes); 00090 00091 m_block_size = ((m_range->end - m_range->start - 1)/m_nstripes) + 1; 00092 00093 //ensure that nstripes not larger than blocks count, so we would never go out of range 00094 m_nstripes = std::min(m_nstripes, unsigned(((m_range->end - m_range->start - 1)/m_block_size) + 1) ); 00095 } 00096 00097 const cv::ParallelLoopBody* m_body; 00098 const cv::Range* m_range; 00099 
unsigned int m_nstripes; 00100 int m_block_size; 00101 00102 void clear() 00103 { 00104 m_body = 0; 00105 m_range = 0; 00106 m_nstripes = 0; 00107 m_block_size = 0; 00108 } 00109 }; 00110 00111 class ForThread 00112 { 00113 public: 00114 00115 ForThread(): m_task_start(false), m_parent(0), m_state(eFTNotStarted), m_id(0) 00116 { 00117 } 00118 00119 //called from manager thread 00120 bool init(size_t id, ThreadManager* parent); 00121 00122 //called from manager thread 00123 void run(); 00124 00125 //called from manager thread 00126 void stop(); 00127 00128 ~ForThread(); 00129 00130 private: 00131 00132 //called from worker thread 00133 static void* thread_loop_wrapper(void* thread_object); 00134 00135 //called from worker thread 00136 void execute(); 00137 00138 //called from worker thread 00139 void thread_body(); 00140 00141 pthread_t m_posix_thread; 00142 pthread_mutex_t m_thread_mutex; 00143 pthread_cond_t m_cond_thread_task; 00144 bool m_task_start; 00145 00146 ThreadManager* m_parent; 00147 ForThreadState m_state; 00148 size_t m_id; 00149 }; 00150 00151 class ThreadManager 00152 { 00153 public: 00154 friend class ForThread; 00155 00156 static ThreadManager& instance() 00157 { 00158 CV_SINGLETON_LAZY_INIT_REF(ThreadManager, new ThreadManager()) 00159 } 00160 00161 static void stop() 00162 { 00163 ThreadManager& manager = instance(); 00164 00165 if(manager.m_pool_state == eTMInited) 00166 { 00167 for(size_t i = 0; i < manager.m_num_threads; ++i) 00168 { 00169 manager.m_threads[i].stop(); 00170 } 00171 } 00172 00173 manager.m_pool_state = eTMNotInited; 00174 } 00175 00176 void run(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes); 00177 00178 size_t getNumOfThreads(); 00179 00180 void setNumOfThreads(size_t n); 00181 00182 private: 00183 00184 ThreadManager(); 00185 00186 ~ThreadManager(); 00187 00188 void wait_complete(); 00189 00190 void notify_complete(); 00191 00192 bool initPool(); 00193 00194 size_t defaultNumberOfThreads(); 00195 
00196 std::vector<ForThread> m_threads; 00197 size_t m_num_threads; 00198 00199 pthread_mutex_t m_manager_task_mutex; 00200 pthread_cond_t m_cond_thread_task_complete; 00201 bool m_task_complete; 00202 00203 unsigned int m_task_position; 00204 unsigned int m_num_of_completed_tasks; 00205 00206 pthread_mutex_t m_manager_access_mutex; 00207 00208 static const char m_env_name[]; 00209 static const unsigned int m_default_number_of_threads; 00210 00211 work_load m_work_load; 00212 00213 struct work_thread_t 00214 { 00215 work_thread_t(): value(false) { } 00216 bool value; 00217 }; 00218 00219 cv::TLSData<work_thread_t> m_is_work_thread; 00220 00221 ThreadManagerPoolState m_pool_state; 00222 }; 00223 00224 const char ThreadManager::m_env_name[] = "OPENCV_FOR_THREADS_NUM"; 00225 00226 #ifdef ANDROID 00227 // many modern phones/tables have 4-core CPUs. Let's use no more 00228 // than 2 threads by default not to overheat the devices 00229 const unsigned int ThreadManager::m_default_number_of_threads = 2; 00230 #else 00231 const unsigned int ThreadManager::m_default_number_of_threads = 8; 00232 #endif 00233 00234 ForThread::~ForThread() 00235 { 00236 if(m_state == eFTStarted) 00237 { 00238 stop(); 00239 00240 pthread_mutex_destroy(&m_thread_mutex); 00241 00242 pthread_cond_destroy(&m_cond_thread_task); 00243 } 00244 } 00245 00246 bool ForThread::init(size_t id, ThreadManager* parent) 00247 { 00248 m_id = id; 00249 00250 m_parent = parent; 00251 00252 int res = 0; 00253 00254 res |= pthread_mutex_init(&m_thread_mutex, NULL); 00255 00256 res |= pthread_cond_init(&m_cond_thread_task, NULL); 00257 00258 if(!res) 00259 { 00260 res = pthread_create(&m_posix_thread, NULL, thread_loop_wrapper, (void*)this); 00261 } 00262 00263 00264 return res == 0; 00265 } 00266 00267 void ForThread::stop() 00268 { 00269 if(m_state == eFTStarted) 00270 { 00271 pthread_mutex_lock(&m_thread_mutex); 00272 m_state = eFTToStop; 00273 pthread_mutex_unlock(&m_thread_mutex); 00274 00275 run(); 00276 00277 
pthread_join(m_posix_thread, NULL); 00278 } 00279 00280 pthread_mutex_lock(&m_thread_mutex); 00281 m_state = eFTStoped; 00282 pthread_mutex_unlock(&m_thread_mutex); 00283 } 00284 00285 void ForThread::run() 00286 { 00287 pthread_mutex_lock(&m_thread_mutex); 00288 00289 m_task_start = true; 00290 00291 pthread_cond_signal(&m_cond_thread_task); 00292 00293 pthread_mutex_unlock(&m_thread_mutex); 00294 } 00295 00296 void* ForThread::thread_loop_wrapper(void* thread_object) 00297 { 00298 ((ForThread*)thread_object)->thread_body(); 00299 return 0; 00300 } 00301 00302 void ForThread::execute() 00303 { 00304 unsigned int m_current_pos = CV_XADD(&m_parent->m_task_position, 1); 00305 00306 work_load& load = m_parent->m_work_load; 00307 00308 while(m_current_pos < load.m_nstripes) 00309 { 00310 int start = load.m_range->start + m_current_pos*load.m_block_size; 00311 int end = std::min(start + load.m_block_size, load.m_range->end); 00312 00313 load.m_body->operator()(cv::Range(start, end)); 00314 00315 m_current_pos = CV_XADD(&m_parent->m_task_position, 1); 00316 } 00317 } 00318 00319 void ForThread::thread_body() 00320 { 00321 m_parent->m_is_work_thread.get()->value = true; 00322 00323 pthread_mutex_lock(&m_thread_mutex); 00324 00325 m_state = eFTStarted; 00326 00327 while(m_state == eFTStarted) 00328 { 00329 //to handle spurious wakeups 00330 while( !m_task_start && m_state != eFTToStop ) 00331 pthread_cond_wait(&m_cond_thread_task, &m_thread_mutex); 00332 00333 if(m_state == eFTStarted) 00334 { 00335 execute(); 00336 00337 m_task_start = false; 00338 00339 m_parent->notify_complete(); 00340 } 00341 } 00342 00343 pthread_mutex_unlock(&m_thread_mutex); 00344 } 00345 00346 ThreadManager::ThreadManager(): m_num_threads(0), m_task_complete(false), m_num_of_completed_tasks(0), m_pool_state(eTMNotInited) 00347 { 00348 int res = 0; 00349 00350 pthread_mutexattr_t attr; 00351 pthread_mutexattr_init(&attr); 00352 pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); 00353 res |= 
pthread_mutex_init(&m_manager_access_mutex, &attr); 00354 pthread_mutexattr_destroy(&attr); 00355 00356 res |= pthread_mutex_init(&m_manager_task_mutex, NULL); 00357 00358 res |= pthread_cond_init(&m_cond_thread_task_complete, NULL); 00359 00360 if(!res) 00361 { 00362 setNumOfThreads(defaultNumberOfThreads()); 00363 00364 m_task_position = 0; 00365 } 00366 else 00367 { 00368 m_num_threads = 1; 00369 m_pool_state = eTMFailedToInit; 00370 m_task_position = 0; 00371 00372 //print error; 00373 } 00374 } 00375 00376 ThreadManager::~ThreadManager() 00377 { 00378 stop(); 00379 00380 pthread_mutex_destroy(&m_manager_task_mutex); 00381 00382 pthread_cond_destroy(&m_cond_thread_task_complete); 00383 00384 pthread_mutex_destroy(&m_manager_access_mutex); 00385 } 00386 00387 void ThreadManager::run(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes) 00388 { 00389 bool is_work_thread = m_is_work_thread.get()->value; 00390 00391 if( (getNumOfThreads() > 1) && !is_work_thread && 00392 (range.end - range.start > 1) && (nstripes <= 0 || nstripes >= 1.5) ) 00393 { 00394 int res = pthread_mutex_trylock(&m_manager_access_mutex); 00395 00396 if(!res) 00397 { 00398 if(initPool()) 00399 { 00400 if(nstripes < 1) nstripes = 4*m_threads.size(); 00401 00402 double max_stripes = 4*m_threads.size(); 00403 00404 nstripes = std::min(nstripes, max_stripes); 00405 00406 pthread_mutex_lock(&m_manager_task_mutex); 00407 00408 m_num_of_completed_tasks = 0; 00409 00410 m_task_position = 0; 00411 00412 m_task_complete = false; 00413 00414 m_work_load.set(range, body, cvCeil(nstripes)); 00415 00416 for(size_t i = 0; i < m_threads.size(); ++i) 00417 { 00418 m_threads[i].run(); 00419 } 00420 00421 wait_complete(); 00422 } 00423 else 00424 { 00425 //print error 00426 body(range); 00427 } 00428 } 00429 else 00430 { 00431 body(range); 00432 } 00433 } 00434 else 00435 { 00436 body(range); 00437 } 00438 } 00439 00440 void ThreadManager::wait_complete() 00441 { 00442 //to handle spurious 
wakeups 00443 while(!m_task_complete) 00444 pthread_cond_wait(&m_cond_thread_task_complete, &m_manager_task_mutex); 00445 00446 pthread_mutex_unlock(&m_manager_task_mutex); 00447 00448 pthread_mutex_unlock(&m_manager_access_mutex); 00449 } 00450 00451 void ThreadManager::notify_complete() 00452 { 00453 00454 unsigned int comp = CV_XADD(&m_num_of_completed_tasks, 1); 00455 00456 if(comp == (m_num_threads - 1)) 00457 { 00458 pthread_mutex_lock(&m_manager_task_mutex); 00459 00460 m_task_complete = true; 00461 00462 pthread_cond_signal(&m_cond_thread_task_complete); 00463 00464 pthread_mutex_unlock(&m_manager_task_mutex); 00465 } 00466 } 00467 00468 bool ThreadManager::initPool() 00469 { 00470 if(m_pool_state != eTMNotInited || m_num_threads == 1) 00471 return true; 00472 00473 m_threads.resize(m_num_threads); 00474 00475 bool res = true; 00476 00477 for(size_t i = 0; i < m_threads.size(); ++i) 00478 { 00479 res |= m_threads[i].init(i, this); 00480 } 00481 00482 if(res) 00483 { 00484 m_pool_state = eTMInited; 00485 } 00486 else 00487 { 00488 //TODO: join threads? 
00489 m_pool_state = eTMFailedToInit; 00490 } 00491 00492 return res; 00493 } 00494 00495 size_t ThreadManager::getNumOfThreads() 00496 { 00497 return m_num_threads; 00498 } 00499 00500 void ThreadManager::setNumOfThreads(size_t n) 00501 { 00502 int res = pthread_mutex_lock(&m_manager_access_mutex); 00503 00504 if(!res) 00505 { 00506 if(n == 0) 00507 { 00508 n = defaultNumberOfThreads(); 00509 } 00510 00511 if(n != m_num_threads && m_pool_state != eTMFailedToInit) 00512 { 00513 if(m_pool_state == eTMInited) 00514 { 00515 stop(); 00516 m_threads.clear(); 00517 } 00518 00519 m_num_threads = n; 00520 00521 if(m_num_threads == 1) 00522 { 00523 m_pool_state = eTMSingleThreaded; 00524 } 00525 else 00526 { 00527 m_pool_state = eTMNotInited; 00528 } 00529 } 00530 00531 pthread_mutex_unlock(&m_manager_access_mutex); 00532 } 00533 } 00534 00535 size_t ThreadManager::defaultNumberOfThreads() 00536 { 00537 unsigned int result = m_default_number_of_threads; 00538 00539 char * env = getenv(m_env_name); 00540 00541 if(env != NULL) 00542 { 00543 sscanf(env, "%u", &result); 00544 00545 result = std::max(1u, result); 00546 //do we need upper limit of threads number? 
00547 } 00548 00549 return result; 00550 } 00551 00552 void parallel_for_pthreads(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes); 00553 size_t parallel_pthreads_get_threads_num(); 00554 void parallel_pthreads_set_threads_num(int num); 00555 00556 size_t parallel_pthreads_get_threads_num() 00557 { 00558 return ThreadManager::instance().getNumOfThreads(); 00559 } 00560 00561 void parallel_pthreads_set_threads_num(int num) 00562 { 00563 if(num < 0) 00564 { 00565 ThreadManager::instance().setNumOfThreads(0); 00566 } 00567 else 00568 { 00569 ThreadManager::instance().setNumOfThreads(size_t(num)); 00570 } 00571 } 00572 00573 void parallel_for_pthreads(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes) 00574 { 00575 ThreadManager::instance().run(range, body, nstripes); 00576 } 00577 00578 } 00579 00580 #endif 00581
Generated on Tue Jul 12 2022 14:47:31 by
1.7.2
