threads_pthread.h
// The libMesh Finite Element Library.
// Copyright (C) 2002-2018 Benjamin S. Kirk, John W. Peterson, Roy H. Stogner

// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.

// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
// Lesser General Public License for more details.

// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

#ifndef LIBMESH_THREADS_PTHREAD_H
#define LIBMESH_THREADS_PTHREAD_H

// Do not try to #include this header directly, it is designed to be
// #included directly by threads.h
#ifndef LIBMESH_SQUASH_HEADER_WARNING
# warning "This file is designed to be included through libmesh/threads.h"
#else

#ifdef LIBMESH_HAVE_PTHREAD

// C++ includes
#ifdef LIBMESH_HAVE_CXX11_THREAD
# include <thread>
#endif

#include <pthread.h>
#include <algorithm>
#include <vector>

#ifdef __APPLE__
# ifdef __MAC_10_12
#  include <os/lock.h>
# else
#  include <libkern/OSAtomic.h>
# endif
#endif

// Thread-Local-Storage macros
#ifdef LIBMESH_HAVE_CXX11_THREAD
# define LIBMESH_TLS_TYPE(type)  thread_local type
# define LIBMESH_TLS_REF(value)  (value)
#else // Maybe support gcc __thread eventually?
# define LIBMESH_TLS_TYPE(type)  type
# define LIBMESH_TLS_REF(value)  (value)
#endif

namespace libMesh
{

namespace Threads
{


#ifdef LIBMESH_HAVE_CXX11_THREAD

// Use std::thread when C++11 threads are available.
typedef std::thread Thread;

#else

// Otherwise fall back on the serial placeholder from threads_none.h.
typedef NonConcurrentThread Thread;

#endif // LIBMESH_HAVE_CXX11_THREAD

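
// Usage sketch (illustrative only): when C++11 threads are available, Thread
// is exactly std::thread; otherwise the NonConcurrentThread fallback simply
// runs the callable in the calling thread (see threads_none.h).  The "work"
// callable below is hypothetical:
//
//   Threads::Thread t(work);  // start (or, in the serial fallback, just run) "work"
//   t.join();                 // wait for completion
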

/**
 * Spin mutex.  Implements mutual exclusion by busy-waiting in user
 * space for the lock to be acquired.
 */
#ifdef __APPLE__
#ifdef __MAC_10_12
class spin_mutex
{
public:
  spin_mutex() { ulock = OS_UNFAIR_LOCK_INIT; }
  ~spin_mutex() {}

  void lock () { os_unfair_lock_lock(&ulock); }
  void unlock () { os_unfair_lock_unlock(&ulock); }

  class scoped_lock
  {
  public:
    scoped_lock () : smutex(nullptr) {}
    explicit scoped_lock ( spin_mutex & in_smutex ) : smutex(&in_smutex) { smutex->lock(); }

    ~scoped_lock () { release(); }

    void acquire ( spin_mutex & in_smutex ) { smutex = &in_smutex; smutex->lock(); }
    void release () { if (smutex) smutex->unlock(); smutex = nullptr; }

  private:
    spin_mutex * smutex;
  };

private:
  os_unfair_lock ulock;
};
#else
class spin_mutex
{
public:
  spin_mutex() : slock(0) {} // The convention is that the lock being zero is _unlocked_
  ~spin_mutex() {}

  void lock () { OSSpinLockLock(&slock); }
  void unlock () { OSSpinLockUnlock(&slock); }

  class scoped_lock
  {
  public:
    scoped_lock () : smutex(nullptr) {}
    explicit scoped_lock ( spin_mutex & in_smutex ) : smutex(&in_smutex) { smutex->lock(); }

    ~scoped_lock () { release(); }

    void acquire ( spin_mutex & in_smutex ) { smutex = &in_smutex; smutex->lock(); }
    void release () { if (smutex) smutex->unlock(); smutex = nullptr; }

  private:
    spin_mutex * smutex;
  };

private:
  OSSpinLock slock;
};
#endif
#else
class spin_mutex
{
public:
  // Might want to use PTHREAD_MUTEX_ADAPTIVE_NP on Linux, but it's not available on OSX.
  spin_mutex() { pthread_spin_init(&slock, PTHREAD_PROCESS_PRIVATE); }
  ~spin_mutex() { pthread_spin_destroy(&slock); }

  void lock () { pthread_spin_lock(&slock); }
  void unlock () { pthread_spin_unlock(&slock); }

  class scoped_lock
  {
  public:
    scoped_lock () : smutex(nullptr) {}
    explicit scoped_lock ( spin_mutex & in_smutex ) : smutex(&in_smutex) { smutex->lock(); }

    ~scoped_lock () { release(); }

    void acquire ( spin_mutex & in_smutex ) { smutex = &in_smutex; smutex->lock(); }
    void release () { if (smutex) smutex->unlock(); smutex = nullptr; }

  private:
    spin_mutex * smutex;
  };

private:
  pthread_spinlock_t slock;
};
#endif // __APPLE__
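
// Usage sketch (illustrative only; the shared counter and its mutex below are
// hypothetical, not part of this header):
//
//   static Threads::spin_mutex counter_mtx;
//
//   void bump (int & counter)
//   {
//     Threads::spin_mutex::scoped_lock lock(counter_mtx); // busy-waits until acquired
//     ++counter;
//   }                                                     // released when lock leaves scope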


/**
 * Recursive mutex.  Implements mutual exclusion, and may be locked
 * repeatedly by the thread that already holds it.
 */
class recursive_mutex
{
public:
  // Might want to use PTHREAD_MUTEX_ADAPTIVE_NP on Linux, but it's not available on OSX.
  recursive_mutex()
  {
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
    pthread_mutex_init(&mutex, &attr);
  }
  ~recursive_mutex() { pthread_mutex_destroy(&mutex); }

  void lock () { pthread_mutex_lock(&mutex); }
  void unlock () { pthread_mutex_unlock(&mutex); }

  class scoped_lock
  {
  public:
    scoped_lock () : rmutex(nullptr) {}
    explicit scoped_lock ( recursive_mutex & in_rmutex ) : rmutex(&in_rmutex) { rmutex->lock(); }

    ~scoped_lock () { release(); }

    void acquire ( recursive_mutex & in_rmutex ) { rmutex = &in_rmutex; rmutex->lock(); }
    void release () { if (rmutex) rmutex->unlock(); rmutex = nullptr; }

  private:
    recursive_mutex * rmutex;
  };

private:
  pthread_mutex_t mutex;
  pthread_mutexattr_t attr;
};
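
// Usage sketch (illustrative only): unlike spin_mutex, the same thread may
// re-lock a recursive_mutex it already holds, e.g. when a locked routine
// calls another locked routine.  "log_mtx" and the functions are hypothetical:
//
//   static Threads::recursive_mutex log_mtx;
//
//   void log_line (const std::string & s)
//   { Threads::recursive_mutex::scoped_lock lock(log_mtx); /* write s */ }
//
//   void log_pair (const std::string & a, const std::string & b)
//   {
//     Threads::recursive_mutex::scoped_lock lock(log_mtx);
//     log_line(a);  // re-locks the already-held mutex; fine because it is recursive
//     log_line(b);
//   }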

template <typename Range>
unsigned int num_pthreads(Range & range)
{
  std::size_t min = std::min((std::size_t)libMesh::n_threads(), range.size());
  return min > 0 ? cast_int<unsigned int>(min) : 1;
}

template <typename Range, typename Body>
class RangeBody
{
public:
  Range * range;
  Body * body;
};

template <typename Range, typename Body>
void * run_body(void * args)
{
  RangeBody<Range, Body> * range_body = (RangeBody<Range, Body> *)args;

  Body & body = *range_body->body;
  Range & range = *range_body->range;

  body(range);

  return nullptr;
}
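
// Note: run_body() has the void * (*)(void *) signature that pthread_create()
// expects for its start routine.  parallel_for() and parallel_reduce() below
// pack each thread's Range and Body pointers into a RangeBody and hand its
// address to run_body() as the single void * argument.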

/**
 * Scheduler to manage threads.
 */
class task_scheduler_init
{
public:
  static const int automatic = -1;
  explicit task_scheduler_init (int = automatic) {}
  void initialize (int = automatic) {}
  void terminate () {}
};
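
// Compatibility note: in this pthreads backend, task_scheduler_init only
// mirrors the tbb::task_scheduler_init interface used by the TBB backend
// (threads_tbb.h); the constructor, initialize(), and terminate() are
// deliberately no-ops.  Code written against the TBB-style API, e.g.
//
//   Threads::task_scheduler_init init (libMesh::n_threads());
//
// therefore compiles unchanged with either backend.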

//-------------------------------------------------------------------
/**
 * Dummy "splitting object" used to distinguish splitting constructors
 * from copy constructors.
 */
class split {};




//-------------------------------------------------------------------
/**
 * Execute the provided function object in parallel on the specified
 * range.
 */
template <typename Range, typename Body>
inline
void parallel_for (const Range & range, const Body & body)
{
  Threads::BoolAcquire b(Threads::in_threads);

  // If we're running in serial - just run!
  if (libMesh::n_threads() == 1)
    {
      body(range);
      return;
    }

#ifdef LIBMESH_ENABLE_PERFORMANCE_LOGGING
  const bool logging_was_enabled = libMesh::perflog.logging_enabled();

  if (libMesh::n_threads() > 1)
    libMesh::perflog.disable_logging();
#endif

  unsigned int n_threads = num_pthreads(range);

  std::vector<Range *> ranges(n_threads);
  std::vector<RangeBody<const Range, const Body>> range_bodies(n_threads);
  std::vector<pthread_t> threads(n_threads);

  // Create the ranges for each thread
  std::size_t range_size = range.size() / n_threads;

  typename Range::const_iterator current_beginning = range.begin();

  for (unsigned int i=0; i<n_threads; i++)
    {
      std::size_t this_range_size = range_size;

      if (i+1 == n_threads)
        this_range_size += range.size() % n_threads; // Give the last one the remaining work to do

      ranges[i] = new Range(range, current_beginning, current_beginning + this_range_size);

      current_beginning = current_beginning + this_range_size;
    }

  // Create the RangeBody arguments
  for (unsigned int i=0; i<n_threads; i++)
    {
      range_bodies[i].range = ranges[i];
      range_bodies[i].body = &body;
    }

  // Create the threads.  It may seem redundant to wrap a pragma in
  // #ifdefs... but GCC warns about an "unknown pragma" if it
  // encounters this line of code when -fopenmp is not passed to the
  // compiler.
#ifdef LIBMESH_HAVE_OPENMP
#pragma omp parallel for schedule (static)
#endif
  for (unsigned int i=0; i<n_threads; i++)
    {
#if !LIBMESH_HAVE_OPENMP
      pthread_create(&threads[i], nullptr, &run_body<Range, Body>, (void *)&range_bodies[i]);
#else
      run_body<Range, Body>((void *)&range_bodies[i]);
#endif
    }

#if !LIBMESH_HAVE_OPENMP
  // Wait for them to finish

  // The use of 'int' instead of unsigned for the iteration variable
  // is deliberate here.  This is an OpenMP loop, and some older
  // compilers warn when you don't use int for the loop index.  The
  // reason has to do with signed vs. unsigned integer overflow
  // behavior and optimization.
  // http://blog.llvm.org/2011/05/what-every-c-programmer-should-know.html
  for (int i=0; i<static_cast<int>(n_threads); i++)
    pthread_join(threads[i], nullptr);
#endif

  // Clean up
  for (unsigned int i=0; i<n_threads; i++)
    delete ranges[i];

#ifdef LIBMESH_ENABLE_PERFORMANCE_LOGGING
  if (libMesh::n_threads() > 1 && logging_was_enabled)
    libMesh::perflog.enable_logging();
#endif
}

/**
 * Execute the provided function object in parallel on the specified
 * range with the specified partitioner.  The partitioner is ignored
 * in this implementation.
 */
template <typename Range, typename Body, typename Partitioner>
inline
void parallel_for (const Range & range, const Body & body, const Partitioner &)
{
  parallel_for (range, body);
}
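
// Usage sketch (illustrative only, not part of this header).  The Range type
// must provide size(), begin(), end(), and a (range, new_begin, new_end)
// splitting constructor -- libMesh's StoredRange-based ranges such as
// ConstElemRange do -- and the Body must provide a const operator()(const
// Range &).  "mesh", "system", and the AssembleBody functor are hypothetical:
//
//   // #include "libmesh/elem_range.h"
//   ConstElemRange range (mesh.active_local_elements_begin(),
//                         mesh.active_local_elements_end());
//   AssembleBody body (system);
//   Threads::parallel_for (range, body);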

/**
 * Execute the provided reduction operation in parallel on the
 * specified range.
 */
template <typename Range, typename Body>
inline
void parallel_reduce (const Range & range, Body & body)
{
  Threads::BoolAcquire b(Threads::in_threads);

  // If we're running in serial - just run!
  if (libMesh::n_threads() == 1)
    {
      body(range);
      return;
    }

#ifdef LIBMESH_ENABLE_PERFORMANCE_LOGGING
  const bool logging_was_enabled = libMesh::perflog.logging_enabled();

  if (libMesh::n_threads() > 1)
    libMesh::perflog.disable_logging();
#endif

  unsigned int n_threads = num_pthreads(range);

  std::vector<Range *> ranges(n_threads);
  std::vector<Body *> bodies(n_threads);
  std::vector<RangeBody<Range, Body>> range_bodies(n_threads);

  // Create copies of the body for each thread
  bodies[0] = &body; // Use the original body for the first one
  for (unsigned int i=1; i<n_threads; i++)
    bodies[i] = new Body(body, Threads::split());

  // Create the ranges for each thread
  std::size_t range_size = range.size() / n_threads;

  typename Range::const_iterator current_beginning = range.begin();

  for (unsigned int i=0; i<n_threads; i++)
    {
      std::size_t this_range_size = range_size;

      if (i+1 == n_threads)
        this_range_size += range.size() % n_threads; // Give the last one the remaining work to do

      ranges[i] = new Range(range, current_beginning, current_beginning + this_range_size);

      current_beginning = current_beginning + this_range_size;
    }

  // Create the RangeBody arguments
  for (unsigned int i=0; i<n_threads; i++)
    {
      range_bodies[i].range = ranges[i];
      range_bodies[i].body = bodies[i];
    }

  // Create the threads
  std::vector<pthread_t> threads(n_threads);

  // It may seem redundant to wrap a pragma in #ifdefs... but GCC
  // warns about an "unknown pragma" if it encounters this line of
  // code when -fopenmp is not passed to the compiler.
#ifdef LIBMESH_HAVE_OPENMP
#pragma omp parallel for schedule (static)
#endif
  // The use of 'int' instead of unsigned for the iteration variable
  // is deliberate here.  This is an OpenMP loop, and some older
  // compilers warn when you don't use int for the loop index.  The
  // reason has to do with signed vs. unsigned integer overflow
  // behavior and optimization.
  // http://blog.llvm.org/2011/05/what-every-c-programmer-should-know.html
  for (int i=0; i<static_cast<int>(n_threads); i++)
    {
#if !LIBMESH_HAVE_OPENMP
      pthread_create(&threads[i], nullptr, &run_body<Range, Body>, (void *)&range_bodies[i]);
#else
      run_body<Range, Body>((void *)&range_bodies[i]);
#endif
    }

#if !LIBMESH_HAVE_OPENMP
  // Wait for them to finish
  for (unsigned int i=0; i<n_threads; i++)
    pthread_join(threads[i], nullptr);
#endif

  // Join them all down to the original Body
  for (unsigned int i=n_threads-1; i != 0; i--)
    bodies[i-1]->join(*bodies[i]);

  // Clean up
  for (unsigned int i=1; i<n_threads; i++)
    delete bodies[i];
  for (unsigned int i=0; i<n_threads; i++)
    delete ranges[i];

#ifdef LIBMESH_ENABLE_PERFORMANCE_LOGGING
  if (libMesh::n_threads() > 1 && logging_was_enabled)
    libMesh::perflog.enable_logging();
#endif
}

/**
 * Execute the provided reduction operation in parallel on the
 * specified range with the specified partitioner.  The partitioner
 * is ignored in this implementation.
 */
template <typename Range, typename Body, typename Partitioner>
inline
void parallel_reduce (const Range & range, Body & body, const Partitioner &)
{
  parallel_reduce(range, body);
}
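
// Usage sketch (illustrative only, not part of this header).  In addition to
// operator()(const Range &), a reduction Body needs a "splitting" constructor
// Body(Body &, Threads::split) and a join() member, because parallel_reduce()
// copies the body once per thread and then folds the copies back into the
// original.  VolumeSum and "mesh" below are hypothetical:
//
//   // #include "libmesh/elem.h"
//   // #include "libmesh/elem_range.h"
//   struct VolumeSum
//   {
//     double vol = 0.;
//     VolumeSum () = default;
//     VolumeSum (VolumeSum &, Threads::split) : vol(0.) {}      // fresh accumulator per thread
//     void operator()(const ConstElemRange & r)
//     { for (const Elem * elem : r) vol += elem->volume(); }
//     void join (const VolumeSum & other) { vol += other.vol; } // fold partial sums
//   };
//
//   ConstElemRange range (mesh.active_local_elements_begin(),
//                         mesh.active_local_elements_end());
//   VolumeSum body;
//   Threads::parallel_reduce (range, body);
//   // body.vol now holds the total volume over the local elements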


/**
 * Defines atomic operations which can only be executed on a
 * single thread at a time.
 */
template <typename T>
class atomic
{
public:
  atomic () : val(0) {}
  operator T () { return val; }

  T operator=( T value )
  {
    spin_mutex::scoped_lock lock(smutex);
    val = value;
    return val;
  }

  atomic<T> & operator=( const atomic<T> & value )
  {
    spin_mutex::scoped_lock lock(smutex);
    val = value.val;
    return *this;
  }


  T operator+=( T value )
  {
    spin_mutex::scoped_lock lock(smutex);
    val += value;
    return val;
  }

  T operator-=( T value )
  {
    spin_mutex::scoped_lock lock(smutex);
    val -= value;
    return val;
  }

  T operator++()
  {
    spin_mutex::scoped_lock lock(smutex);
    val++;
    return val;
  }

  T operator++(int)
  {
    spin_mutex::scoped_lock lock(smutex);
    val++;
    return val;
  }

  T operator--()
  {
    spin_mutex::scoped_lock lock(smutex);
    val--;
    return val;
  }

  T operator--(int)
  {
    spin_mutex::scoped_lock lock(smutex);
    val--;
    return val;
  }

private:
  T val;
  spin_mutex smutex;
};
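
// Usage sketch (illustrative only): Threads::atomic<T> serializes each
// individual operation through an internal spin_mutex, which makes it
// adequate for simple shared counters.  Note that, as written above, the
// post-increment and post-decrement operators return the new value rather
// than the old one.  The counter below is hypothetical:
//
//   static Threads::atomic<unsigned int> n_elems_visited;
//
//   void visit_one ()
//   {
//     n_elems_visited++;                            // thread-safe increment
//     const unsigned int so_far = n_elems_visited;  // implicit conversion to T reads the value
//     libmesh_assert_greater (so_far, 0);
//   }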

} // namespace Threads

} // namespace libMesh

#endif // #ifdef LIBMESH_HAVE_PTHREAD

#endif // LIBMESH_SQUASH_HEADER_WARNING

#endif // LIBMESH_THREADS_PTHREAD_H