libMesh
threads_pthread.h
Go to the documentation of this file.
1 // The libMesh Finite Element Library.
2 // Copyright (C) 2002-2026 Benjamin S. Kirk, John W. Peterson, Roy H. Stogner
3 
4 // This library is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU Lesser General Public
6 // License as published by the Free Software Foundation; either
7 // version 2.1 of the License, or (at your option) any later version.
8 
9 // This library is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 // Lesser General Public License for more details.
13 
14 // You should have received a copy of the GNU Lesser General Public
15 // License along with this library; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 
18 #ifndef LIBMESH_THREADS_PTHREAD_H
19 #define LIBMESH_THREADS_PTHREAD_H
20 
21 // Do not try to #include this header directly, it is designed to be
22 // #included directly by threads.h
23 #ifndef LIBMESH_SQUASH_HEADER_WARNING
24 # warning "This file is designed to be included through libmesh/threads.h"
25 #else
26 
27 #ifdef LIBMESH_HAVE_PTHREAD
28 
29 // C++ includes
30 #ifdef LIBMESH_HAVE_CXX11_THREAD
31 # include <thread>
32 #endif
33 
34 #include <pthread.h>
35 #include <algorithm>
36 #include <vector>
37 #include <memory> // std::unique_ptr, std::make_unique
38 
39 #ifdef __APPLE__
40 # ifdef __MAC_10_12
41 # include <os/lock.h>
42 #else
43 # include <libkern/OSAtomic.h>
44 # endif
45 #endif
46 
47 // Thread-Local-Storage macros
48 #ifdef LIBMESH_HAVE_CXX11_THREAD
49 # define LIBMESH_TLS_TYPE(type) thread_local type
50 # define LIBMESH_TLS_REF(value) (value)
51 #else // Maybe support gcc __thread eventually?
52 # define LIBMESH_TLS_TYPE(type) type
53 # define LIBMESH_TLS_REF(value) (value)
54 #endif
55 
56 namespace libMesh
57 {
58 
59 namespace Threads
60 {
61 
62 
63 #ifdef LIBMESH_HAVE_CXX11_THREAD
64 
67 typedef std::thread Thread;
68 
69 #else
70 
74 typedef NonConcurrentThread Thread;
75 
76 #endif // LIBMESH_HAVE_CXX11_THREAD
77 
78 
/**
 * Spin mutex.
 *
 * A thin wrapper around the platform's native lock primitive:
 *  - os_unfair_lock when building on Apple with __MAC_10_12 defined,
 *  - OSSpinLock on other Apple builds,
 *  - pthread_spinlock_t everywhere else.
 *
 * lock()/try_lock()/unlock() operate on the primitive directly; the
 * nested scoped_lock class is the guard type that releases the mutex
 * when it goes out of scope.
 */
class spin_mutex
{
public:
#if defined(__APPLE__) && defined(__MAC_10_12)
  spin_mutex() { ulock = OS_UNFAIR_LOCK_INIT; }
  ~spin_mutex() = default;

  void lock () { os_unfair_lock_lock(&ulock); }
  /// \returns true if the lock was acquired, false if it is already held.
  bool try_lock () { return os_unfair_lock_trylock(&ulock); }
  void unlock () { os_unfair_lock_unlock(&ulock); }
#elif defined(__APPLE__)
  spin_mutex() : slock(0) {} // The convention is that the lock being zero is _unlocked_
  ~spin_mutex() = default;

  void lock () { OSSpinLockLock(&slock); }
  /// \returns true if the lock was acquired, false if it is already held.
  bool try_lock () { return OSSpinLockTry(&slock); }
  void unlock () { OSSpinLockUnlock(&slock); }
#else
  // Might want to use PTHREAD_MUTEX_ADAPTIVE_NP on Linux, but it's not available on OSX.
  spin_mutex() { pthread_spin_init(&slock, PTHREAD_PROCESS_PRIVATE); }
  ~spin_mutex() { pthread_spin_destroy(&slock); }

  void lock () { pthread_spin_lock(&slock); }
  /// \returns true if the lock was acquired, false if it is already held.
  bool try_lock () { return pthread_spin_trylock(&slock) == 0; }
  void unlock () { pthread_spin_unlock(&slock); }
#endif

  /**
   * Guard object: holds at most one spin_mutex and unlocks it on
   * release() or on destruction, whichever comes first.
   */
  class scoped_lock
  {
  public:
    /// Creates a guard that holds nothing until acquire() is called.
    scoped_lock () : smutex(nullptr) {}

    /// Locks \p in_smutex immediately and holds it.
    explicit scoped_lock ( spin_mutex & in_smutex ) : smutex(&in_smutex) { smutex->lock(); }

    /// Unlocks the held mutex, if any.
    ~scoped_lock () { release(); }

    // A copied guard would unlock the same mutex twice, so copying is
    // not allowed.
    scoped_lock (const scoped_lock &) = delete;
    scoped_lock & operator= (const scoped_lock &) = delete;

    /// Locks \p in_smutex and makes this guard responsible for it.
    void acquire ( spin_mutex & in_smutex ) { smutex = &in_smutex; smutex->lock(); }

    /// Unlocks the held mutex, if any; calling it again is a no-op.
    void release () { if (smutex) smutex->unlock(); smutex = nullptr; }

  private:
    spin_mutex * smutex;
  };

private:
#if defined(__APPLE__) && defined(__MAC_10_12)
  os_unfair_lock ulock;
#elif defined(__APPLE__)
  OSSpinLock slock;
#else
  pthread_spinlock_t slock;
#endif
};
171 
172 
173 
/**
 * Recursive mutex.
 *
 * Wraps a pthread mutex created with PTHREAD_MUTEX_RECURSIVE, so the
 * thread that already holds it may lock it again; each lock() must be
 * balanced by an unlock().  The nested scoped_lock class is the guard
 * type that unlocks the mutex when it goes out of scope.
 */
class recursive_mutex
{
public:
  // Might want to use PTHREAD_MUTEX_ADAPTIVE_NP on Linux, but it's not available on OSX.
  recursive_mutex()
  {
    // The attribute object is only needed while the mutex is being
    // initialized, so it is created and destroyed right here.
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
    pthread_mutex_init(&mutex, &attr);
    pthread_mutexattr_destroy(&attr);
  }
  ~recursive_mutex() { pthread_mutex_destroy(&mutex); }

  void lock () { pthread_mutex_lock(&mutex); }
  /// \returns true if the lock was acquired, false otherwise.
  bool try_lock () { return pthread_mutex_trylock(&mutex) == 0; }
  void unlock () { pthread_mutex_unlock(&mutex); }

  /**
   * Guard object: holds at most one recursive_mutex and unlocks it on
   * release() or on destruction, whichever comes first.
   */
  class scoped_lock
  {
  public:
    /// Creates a guard that holds nothing until acquire() is called.
    scoped_lock () : rmutex(nullptr) {}

    /// Locks \p in_rmutex immediately and holds it.
    explicit scoped_lock ( recursive_mutex & in_rmutex ) : rmutex(&in_rmutex) { rmutex->lock(); }

    /// Unlocks the held mutex, if any.
    ~scoped_lock () { release(); }

    // A copied guard would unlock once more than it locked, so copying
    // is not allowed.
    scoped_lock (const scoped_lock &) = delete;
    scoped_lock & operator= (const scoped_lock &) = delete;

    /// Locks \p in_rmutex and makes this guard responsible for it.
    void acquire ( recursive_mutex & in_rmutex ) { rmutex = &in_rmutex; rmutex->lock(); }

    /// Unlocks the held mutex, if any; calling it again is a no-op.
    void release () { if (rmutex) rmutex->unlock(); rmutex = nullptr; }

  private:
    recursive_mutex * rmutex;
  };

private:
  pthread_mutex_t mutex;
};
212 
213 template <typename Range>
214 unsigned int num_pthreads(const Range & range,
215  unsigned int requested = libMesh::n_threads())
216 {
217  std::size_t mn = std::min((std::size_t)requested, range.size()/range.grainsize());
218  return mn > 0 ? cast_int<unsigned int>(mn) : 1;
219 }
220 
/**
 * The argument bundle handed to run_body(): a pointer to the sub-range
 * to process and a pointer to the function object to call on it.
 * Neither pointer is owned.
 */
template <typename Range, typename Body>
class RangeBody
{
public:
  Range * range = nullptr;
  Body * body = nullptr;
};

/**
 * Thread entry point with the signature pthread_create() expects.
 *
 * \param args  Must point to a RangeBody<Range, Body> whose two
 *              pointers are set.
 * \returns Always nullptr.
 *
 * Calls (*body)(*range) and nothing else.
 */
template <typename Range, typename Body>
void * run_body(void * args)
{
  auto * range_body = static_cast<RangeBody<Range, Body> *>(args);

  Body & body = *range_body->body;
  Range & range = *range_body->range;

  body(range);

  return nullptr;
}
241 
/**
 * Scheduler placeholder for the pthread backend.  The constructor,
 * initialize() and terminate() all accept the same arguments as a real
 * scheduler would but do nothing.
 */
class task_scheduler_init
{
public:
  /// Default argument value for the constructor and initialize().
  static const int automatic = -1;

  /// Does nothing; the thread-count argument is ignored.
  explicit task_scheduler_init (int = automatic) {}

  /// Does nothing; the thread-count argument is ignored.
  void initialize (int = automatic) {}

  /// Does nothing.
  void terminate () {}
};
253 
254 //-------------------------------------------------------------------
/**
 * Dummy "splitting object" used to distinguish splitting constructors
 * from copy constructors.  It carries no data.
 */
class split
{
};
260 
261 
262 
263 
264 //-------------------------------------------------------------------
269 template <typename Range, typename Body>
270 inline
271 void parallel_for (const Range & range, const Body & body,
272  unsigned int n_threads = libMesh::n_threads())
273 {
274  libmesh_error_msg_if(n_threads > libMesh::n_threads(),
275  "Requested n_threads (" << n_threads << ") exceeds the "
276  "global thread count (" << libMesh::n_threads() << ").");
278 
279  unsigned int actual_threads = num_pthreads(range, n_threads);
280 
281  // If we're running in serial - just run!
282  if (actual_threads == 1)
283  {
284  body(range);
285  return;
286  }
287 
288  Threads::RAIIAcquire<int> set_active_threads(Threads::active_threads, actual_threads);
289 
290  DisablePerfLogInScope disable_perf;
291 
292  std::vector<std::unique_ptr<Range>> ranges(actual_threads);
293  std::vector<RangeBody<const Range, const Body>> range_bodies(actual_threads);
294  std::vector<pthread_t> threads(actual_threads);
295 
296  // Create the ranges for each thread
297  std::size_t range_size = range.size() / actual_threads;
298 
299  typename Range::const_iterator current_beginning = range.begin();
300 
301  for (unsigned int i=0; i<actual_threads; i++)
302  {
303  std::size_t this_range_size = range_size;
304 
305  if (i+1 == actual_threads)
306  this_range_size += range.size() % actual_threads; // Give the last one the remaining work to do
307 
308  ranges[i] = std::make_unique<Range>(range, current_beginning, current_beginning + this_range_size);
309 
310  current_beginning = current_beginning + this_range_size;
311  }
312 
313  // Create the RangeBody arguments
314  for (unsigned int i=0; i<actual_threads; i++)
315  {
316  range_bodies[i].range = ranges[i].get();
317  range_bodies[i].body = &body;
318  }
319 
320  // Create the threads. It may seem redundant to wrap a pragma in
321  // #ifdefs... but GCC warns about an "unknown pragma" if it
322  // encounters this line of code when -fopenmp is not passed to the
323  // compiler.
324 #ifdef LIBMESH_HAVE_OPENMP
325 #pragma omp parallel for schedule (static)
326 #endif
327  for (int i=0; i<static_cast<int>(actual_threads); i++)
328  {
329 #if !LIBMESH_HAVE_OPENMP
330  pthread_create(&threads[i], nullptr, &run_body<Range, Body>, (void *)&range_bodies[i]);
331 #else
332  run_body<Range, Body>((void *)&range_bodies[i]);
333 #endif
334  }
335 
336 #if !LIBMESH_HAVE_OPENMP
337  // Wait for them to finish
338 
339  // The use of 'int' instead of unsigned for the iteration variable
340  // is deliberate here. This is an OpenMP loop, and some older
341  // compilers warn when you don't use int for the loop index. The
342  // reason has to do with signed vs. unsigned integer overflow
343  // behavior and optimization.
344  // http://blog.llvm.org/2011/05/what-every-c-programmer-should-know.html
345  for (int i=0; i<static_cast<int>(actual_threads); i++)
346  pthread_join(threads[i], nullptr);
347 #endif
348 }
349 
354 template <typename Range, typename Body, typename Partitioner>
355 inline
356 void parallel_for (const Range & range, const Body & body, const Partitioner &,
357  unsigned int n_threads = libMesh::n_threads())
358 {
359  parallel_for (range, body, n_threads);
360 }
361 
366 template <typename Range, typename Body>
367 inline
368 void parallel_reduce (const Range & range, Body & body,
369  unsigned int n_threads = libMesh::n_threads())
370 {
371  libmesh_error_msg_if(n_threads > libMesh::n_threads(),
372  "Requested n_threads (" << n_threads << ") exceeds the "
373  "global thread count (" << libMesh::n_threads() << ").");
375 
376  unsigned int actual_threads = num_pthreads(range, n_threads);
377 
378  // If we're running in serial - just run!
379  if (actual_threads == 1)
380  {
381  body(range);
382  return;
383  }
384 
385  Threads::RAIIAcquire<int> set_active_threads(Threads::active_threads, actual_threads);
386 
387  DisablePerfLogInScope disable_perf;
388 
389  std::vector<std::unique_ptr<Range>> ranges(actual_threads);
390  std::vector<std::unique_ptr<Body>> managed_bodies(actual_threads); // bodies we are responsible for
391  std::vector<Body *> bodies(actual_threads); // dumb pointers to managed_bodies
392  std::vector<RangeBody<Range, Body>> range_bodies(actual_threads);
393 
394  // Create actual_threads-1 copies of "body". We manage the lifetime of
395  // these copies with std::unique_ptrs.
396  for (unsigned int i=1; i<actual_threads; i++)
397  managed_bodies[i] = std::make_unique<Body>(body, Threads::split());
398 
399  // Set up the "bodies" vector. Use the passed in body for the first
400  // one, point to managed_bodies entries for the others.
401  bodies[0] = &body;
402  for (unsigned int i=1; i<actual_threads; i++)
403  bodies[i] = managed_bodies[i].get();
404 
405  // Create the ranges for each thread
406  std::size_t range_size = range.size() / actual_threads;
407 
408  typename Range::const_iterator current_beginning = range.begin();
409 
410  for (unsigned int i=0; i<actual_threads; i++)
411  {
412  std::size_t this_range_size = range_size;
413 
414  if (i+1 == actual_threads)
415  this_range_size += range.size() % actual_threads; // Give the last one the remaining work to do
416 
417  ranges[i] = std::make_unique<Range>(range, current_beginning, current_beginning + this_range_size);
418 
419  current_beginning = current_beginning + this_range_size;
420  }
421 
422  // Create the RangeBody arguments
423  for (unsigned int i=0; i<actual_threads; i++)
424  {
425  range_bodies[i].range = ranges[i].get();
426  range_bodies[i].body = bodies[i];
427  }
428 
429  // Create the threads
430  std::vector<pthread_t> threads(actual_threads);
431 
432  // It may seem redundant to wrap a pragma in #ifdefs... but GCC
433  // warns about an "unknown pragma" if it encounters this line of
434  // code when -fopenmp is not passed to the compiler.
435 #ifdef LIBMESH_HAVE_OPENMP
436 #pragma omp parallel for schedule (static)
437 #endif
438  // The use of 'int' instead of unsigned for the iteration variable
439  // is deliberate here. This is an OpenMP loop, and some older
440  // compilers warn when you don't use int for the loop index. The
441  // reason has to do with signed vs. unsigned integer overflow
442  // behavior and optimization.
443  // http://blog.llvm.org/2011/05/what-every-c-programmer-should-know.html
444  for (int i=0; i<static_cast<int>(actual_threads); i++)
445  {
446 #if !LIBMESH_HAVE_OPENMP
447  pthread_create(&threads[i], nullptr, &run_body<Range, Body>, (void *)&range_bodies[i]);
448 #else
449  run_body<Range, Body>((void *)&range_bodies[i]);
450 #endif
451  }
452 
453 #if !LIBMESH_HAVE_OPENMP
454  // Wait for them to finish
455  for (unsigned int i=0; i<actual_threads; i++)
456  pthread_join(threads[i], nullptr);
457 #endif
458 
459  // Join them all down to the original Body
460  for (unsigned int i=actual_threads-1; i != 0; i--)
461  bodies[i-1]->join(*bodies[i]);
462 }
463 
468 template <typename Range, typename Body, typename Partitioner>
469 inline
470 void parallel_reduce (const Range & range, Body & body, const Partitioner &,
471  unsigned int n_threads = libMesh::n_threads())
472 {
473  parallel_reduce(range, body, n_threads);
474 }
475 
476 
481 template <typename T>
482 class atomic
483 {
484 public:
485  atomic () : val(0) {}
486  operator T () { return val; }
487 
489  {
490  spin_mutex::scoped_lock lock(smutex);
491  val = value;
492  return val;
493  }
494 
496  {
497  spin_mutex::scoped_lock lock(smutex);
498  val = value;
499  return *this;
500  }
501 
502 
504  {
505  spin_mutex::scoped_lock lock(smutex);
506  val += value;
507  return val;
508  }
509 
511  {
512  spin_mutex::scoped_lock lock(smutex);
513  val -= value;
514  return val;
515  }
516 
518  {
519  spin_mutex::scoped_lock lock(smutex);
520  val++;
521  return val;
522  }
523 
524  T operator++(int)
525  {
526  spin_mutex::scoped_lock lock(smutex);
527  val++;
528  return val;
529  }
530 
532  {
533  spin_mutex::scoped_lock lock(smutex);
534  val--;
535  return val;
536  }
537 
538  T operator--(int)
539  {
540  spin_mutex::scoped_lock lock(smutex);
541  val--;
542  return val;
543  }
544 
545 private:
546  T val;
548 };
549 
550 } // namespace Threads
551 
552 } // namespace libMesh
553 
554 #endif // #ifdef LIBMESH_HAVE_PTHREAD
555 
556 #endif // LIBMESH_SQUASH_HEADER_WARNING
557 
558 #endif // LIBMESH_THREADS_PTHREAD_H
void parallel_for(const Range &range, const Body &body, unsigned int n_threads=libMesh::n_threads())
Execute the provided function object in parallel on the specified range.
Definition: threads_none.h:73
unsigned int n_threads()
Definition: libmesh_base.h:109
void acquire(recursive_mutex &in_rmutex)
void * run_body(void *args)
The libMesh namespace provides an interface to certain functionality in the library.
bool in_threads
A boolean which is true iff we are in a Threads:: function It may be useful to assert(!Threadsin_thre...
Definition: threads.C:33
tbb::task_scheduler_init task_scheduler_init
Scheduler to manage the TBB thread pool.
Definition: threads_tbb.h:102
unsigned int num_pthreads(const Range &range, unsigned int requested=libMesh::n_threads())
void parallel_reduce(const Range &range, Body &body, unsigned int n_threads=libMesh::n_threads())
Execute the provided reduction operation in parallel on the specified range.
Definition: threads_none.h:109
tbb::spin_mutex spin_mutex
Spin mutex.
tbb::split split
Dummy "splitting object" used to distinguish splitting constructors from copy constructors.
Definition: threads_tbb.h:136
Defines atomic operations which can only be executed on a single thread at a time.
Definition: threads_none.h:188
atomic< T > & operator=(const atomic< T > &value)
int active_threads
An integer which is set to the number of active threads when we are in a Threads:: parallel operation...
Definition: threads.C:32
static const bool value
Definition: xdr_io.C:55
RAIIAcquire< bool, true, true > BoolAcquire
Definition: threads.h:84
NonConcurrentThread Thread
Use the non-concurrent placeholder.
Definition: threads_none.h:43