Line data Source code
1 : // The libMesh Finite Element Library.
2 : // Copyright (C) 2002-2026 Benjamin S. Kirk, John W. Peterson, Roy H. Stogner
3 :
4 : // This library is free software; you can redistribute it and/or
5 : // modify it under the terms of the GNU Lesser General Public
6 : // License as published by the Free Software Foundation; either
7 : // version 2.1 of the License, or (at your option) any later version.
8 :
9 : // This library is distributed in the hope that it will be useful,
10 : // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 : // Lesser General Public License for more details.
13 :
14 : // You should have received a copy of the GNU Lesser General Public
15 : // License along with this library; if not, write to the Free Software
16 : // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 :
18 : #ifndef LIBMESH_THREADS_PTHREAD_H
19 : #define LIBMESH_THREADS_PTHREAD_H
20 :
21 : // Do not try to #include this header directly, it is designed to be
22 : // #included directly by threads.h
23 : #ifndef LIBMESH_SQUASH_HEADER_WARNING
24 : # warning "This file is designed to be included through libmesh/threads.h"
25 : #else
26 :
27 : #ifdef LIBMESH_HAVE_PTHREAD
28 :
29 : // C++ includes
30 : #ifdef LIBMESH_HAVE_CXX11_THREAD
31 : # include <thread>
32 : #endif
33 :
34 : #include <pthread.h>
35 : #include <algorithm>
36 : #include <vector>
37 : #include <memory> // std::unique_ptr, std::make_unique
38 :
39 : #ifdef __APPLE__
40 : # ifdef __MAC_10_12
41 : # include <os/lock.h>
42 : #else
43 : # include <libkern/OSAtomic.h>
44 : # endif
45 : #endif
46 :
// Thread-Local-Storage macros
//
// LIBMESH_TLS_TYPE(type) is written in place of a variable's type in a
// declaration; LIBMESH_TLS_REF(value) expands to (value) in every
// configuration below.
#ifdef LIBMESH_HAVE_CXX11_THREAD
// With C++11 thread support the declared variable is thread_local:
// each thread gets its own instance.
# define LIBMESH_TLS_TYPE(type) thread_local type
# define LIBMESH_TLS_REF(value) (value)
#else // Maybe support gcc __thread eventually?
// Without C++11 thread support the macro expands to the bare type, so
// the variable is NOT thread-local: every thread shares one instance.
# define LIBMESH_TLS_TYPE(type) type
# define LIBMESH_TLS_REF(value) (value)
#endif
55 :
56 : namespace libMesh
57 : {
58 :
59 : namespace Threads
60 : {
61 :
62 :
63 : #ifdef LIBMESH_HAVE_CXX11_THREAD
64 : /**
65 : * Use std::thread when available.
66 : */
67 : typedef std::thread Thread;
68 :
69 : #else
70 :
71 : /**
72 : * Use the non-concurrent placeholder.
73 : */
74 : typedef NonConcurrentThread Thread;
75 :
76 : #endif // LIBMESH_HAVE_CXX11_THREAD
77 :
78 :
/**
 * Spin mutex: a lightweight mutual-exclusion lock.
 *
 * The underlying primitive depends on the platform:
 *  - non-Apple systems: a pthread spin lock (pthread_spin_lock), so a
 *    waiting thread busy-waits in user space;
 *  - macOS with __MAC_10_12 defined: os_unfair_lock (Apple documents
 *    that contended waiters block in the kernel rather than spin);
 *  - older macOS: the deprecated OSSpinLock.
 *
 * Use spin_mutex::scoped_lock to hold the lock for the duration of a
 * scope.
 */
class spin_mutex
{
public:
#ifdef __APPLE__
# ifdef __MAC_10_12
  spin_mutex() { ulock = OS_UNFAIR_LOCK_INIT; }
  ~spin_mutex() = default;

  void lock () { os_unfair_lock_lock(&ulock); }
  void unlock () { os_unfair_lock_unlock(&ulock); }
# else
  spin_mutex() : slock(0) {} // The convention is that the lock being zero is _unlocked_
  ~spin_mutex() = default;

  void lock () { OSSpinLockLock(&slock); }
  void unlock () { OSSpinLockUnlock(&slock); }
# endif
#else
  // Might want to use PTHREAD_MUTEX_ADAPTIVE_NP on Linux, but it's not available on OSX.
  spin_mutex() { pthread_spin_init(&slock, PTHREAD_PROCESS_PRIVATE); }
  ~spin_mutex() { pthread_spin_destroy(&slock); }

  void lock () { pthread_spin_lock(&slock); }
  void unlock () { pthread_spin_unlock(&slock); }
#endif // __APPLE__

  /**
   * Scope guard: locks a spin_mutex on construction (or in acquire())
   * and unlocks it on destruction (or in release()).
   *
   * The guard is shared by every platform variant above.  It is not
   * copyable: a copy would unlock the same mutex a second time.
   */
  class scoped_lock
  {
  public:
    scoped_lock () : smutex(nullptr) {}
    explicit scoped_lock ( spin_mutex & in_smutex ) : smutex(&in_smutex) { smutex->lock(); }

    scoped_lock (const scoped_lock &) = delete;
    scoped_lock & operator= (const scoped_lock &) = delete;

    ~scoped_lock () { release(); }

    /// Locks \p in_smutex and remembers it for a later release().
    void acquire ( spin_mutex & in_smutex ) { smutex = &in_smutex; smutex->lock(); }

    /// Unlocks the remembered mutex, if any; a second call is a no-op.
    void release () { if (smutex) smutex->unlock(); smutex = nullptr; }

  private:
    spin_mutex * smutex;
  };

private:
#ifdef __APPLE__
# ifdef __MAC_10_12
  os_unfair_lock ulock;
# else
  OSSpinLock slock;
# endif
#else
  pthread_spinlock_t slock;
#endif // __APPLE__
};
171 :
172 :
173 :
/**
 * Recursive mutex built on a pthread mutex of type
 * PTHREAD_MUTEX_RECURSIVE: the thread holding the lock may lock it
 * again, and must call unlock() once per lock().  A contended lock()
 * waits inside pthread_mutex_lock().
 */
class recursive_mutex
{
public:
  // Might want to use PTHREAD_MUTEX_ADAPTIVE_NP on Linux, but it's not available on OSX.
  recursive_mutex()
  {
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
    pthread_mutex_init(&mutex, &attr);
    // The attributes object is only needed while initializing the
    // mutex.  POSIX guarantees that destroying it afterwards does not
    // affect the mutex, and it must be destroyed to avoid leaking
    // whatever pthread_mutexattr_init() allocated.
    pthread_mutexattr_destroy(&attr);
  }
  ~recursive_mutex() { pthread_mutex_destroy(&mutex); }

  void lock () { pthread_mutex_lock(&mutex); }
  void unlock () { pthread_mutex_unlock(&mutex); }

  /**
   * Scope guard: locks a recursive_mutex on construction (or in
   * acquire()) and unlocks it on destruction (or in release()).
   * Not copyable: a copy would unlock the mutex one extra time.
   */
  class scoped_lock
  {
  public:
    scoped_lock () : rmutex(nullptr) {}
    explicit scoped_lock ( recursive_mutex & in_rmutex ) : rmutex(&in_rmutex) { rmutex->lock(); }

    scoped_lock (const scoped_lock &) = delete;
    scoped_lock & operator= (const scoped_lock &) = delete;

    ~scoped_lock () { release(); }

    void acquire ( recursive_mutex & in_rmutex ) { rmutex = &in_rmutex; rmutex->lock(); }

    /// Unlocks the remembered mutex, if any; a second call is a no-op.
    void release () { if (rmutex) rmutex->unlock(); rmutex = nullptr; }

  private:
    recursive_mutex * rmutex;
  };

private:
  pthread_mutex_t mutex;
};
212 :
213 : template <typename Range>
214 151266 : unsigned int num_pthreads(const Range & range,
215 : unsigned int requested = libMesh::n_threads())
216 : {
217 6621765 : std::size_t mn = std::min((std::size_t)requested, range.size());
218 302510 : return mn > 0 ? cast_int<unsigned int>(mn) : 1;
219 : }
220 :
/**
 * Argument bundle passed, through a void pointer, to run_body(): the
 * subrange a worker should process and the body to apply to it.
 *
 * Both pointers are non-owning.  They default to nullptr so that a
 * default-initialized RangeBody never holds indeterminate pointers.
 */
template <typename Range, typename Body>
class RangeBody
{
public:
  Range * range = nullptr;
  Body * body = nullptr;
};
228 :
229 : template <typename Range, typename Body>
230 170382 : void * run_body(void * args)
231 : {
232 147642 : RangeBody<Range, Body> * range_body = (RangeBody<Range, Body> *)args;
233 :
234 167626 : Body & body = *range_body->body;
235 170382 : Range & range = *range_body->range;
236 :
237 147642 : body(range);
238 :
239 170382 : return nullptr;
240 : }
241 :
/**
 * Scheduler to manage threads.
 *
 * In this pthread implementation the scheduler holds no state: the
 * constructor, initialize() and terminate() all do nothing.
 */
class task_scheduler_init
{
public:
  static const int automatic = -1;

  explicit task_scheduler_init (int /* n_threads */ = automatic) {}

  void initialize (int /* n_threads */ = automatic) {}

  void terminate () {}
};
253 :
//-------------------------------------------------------------------
/**
 * Empty tag type.  Passing a Threads::split() as the second argument
 * selects a splitting constructor instead of the copy constructor.
 */
class split
{
};
260 :
261 :
262 :
263 :
264 : //-------------------------------------------------------------------
265 : /**
266 : * Execute the provided function object in parallel on the specified
267 : * range.
268 : */
269 : template <typename Range, typename Body>
270 : inline
271 2530687 : void parallel_for (const Range & range, const Body & body,
272 : unsigned int n_threads = libMesh::n_threads())
273 : {
274 2530687 : libmesh_error_msg_if(n_threads > libMesh::n_threads(),
275 : "Requested n_threads (" << n_threads << ") exceeds the "
276 : "global thread count (" << libMesh::n_threads() << ").");
277 70550 : Threads::BoolAcquire set_in_threads(Threads::in_threads);
278 :
279 70550 : unsigned int actual_threads = num_pthreads(range, n_threads);
280 :
281 : // If we're running in serial - just run!
282 2528131 : if (actual_threads == 1)
283 : {
284 2388016 : body(range);
285 25433 : return;
286 : }
287 :
288 175964 : Threads::RAIIAcquire<int> set_active_threads(Threads::active_threads, actual_threads);
289 :
290 180440 : DisablePerfLogInScope disable_perf;
291 :
292 221067 : std::vector<std::unique_ptr<Range>> ranges(actual_threads);
293 175964 : std::vector<RangeBody<const Range, const Body>> range_bodies(actual_threads);
294 130847 : std::vector<pthread_t> threads(actual_threads);
295 :
296 : // Create the ranges for each thread
297 130847 : std::size_t range_size = range.size() / actual_threads;
298 :
299 130847 : typename Range::const_iterator current_beginning = range.begin();
300 :
301 392541 : for (unsigned int i=0; i<actual_threads; i++)
302 : {
303 90234 : std::size_t this_range_size = range_size;
304 :
305 261694 : if (i+1 == actual_threads)
306 130847 : this_range_size += range.size() % actual_threads; // Give the last one the remaining work to do
307 :
308 351900 : ranges[i] = std::make_unique<Range>(range, current_beginning, current_beginning + this_range_size);
309 :
310 351900 : current_beginning = current_beginning + this_range_size;
311 : }
312 :
313 : // Create the RangeBody arguments
314 392541 : for (unsigned int i=0; i<actual_threads; i++)
315 : {
316 351900 : range_bodies[i].range = ranges[i].get();
317 261694 : range_bodies[i].body = &body;
318 : }
319 :
320 : // Create the threads. It may seem redundant to wrap a pragma in
321 : // #ifdefs... but GCC warns about an "unknown pragma" if it
322 : // encounters this line of code when -fopenmp is not passed to the
323 : // compiler.
324 : #ifdef LIBMESH_HAVE_OPENMP
325 130847 : #pragma omp parallel for schedule (static)
326 : #endif
327 : for (int i=0; i<static_cast<int>(actual_threads); i++)
328 : {
329 : #if !LIBMESH_HAVE_OPENMP
330 : pthread_create(&threads[i], nullptr, &run_body<Range, Body>, (void *)&range_bodies[i]);
331 : #else
332 : run_body<Range, Body>((void *)&range_bodies[i]);
333 : #endif
334 : }
335 :
336 : #if !LIBMESH_HAVE_OPENMP
337 : // Wait for them to finish
338 :
339 : // The use of 'int' instead of unsigned for the iteration variable
340 : // is deliberate here. This is an OpenMP loop, and some older
341 : // compilers warn when you don't use int for the loop index. The
342 : // reason has to do with signed vs. unsigned integer overflow
343 : // behavior and optimization.
344 : // http://blog.llvm.org/2011/05/what-every-c-programmer-should-know.html
345 : for (int i=0; i<static_cast<int>(actual_threads); i++)
346 : pthread_join(threads[i], nullptr);
347 : #endif
348 40627 : }
349 :
350 : /**
351 : * Execute the provided function object in parallel on the specified
352 : * range with the specified partitioner.
353 : */
354 : template <typename Range, typename Body, typename Partitioner>
355 : inline
356 : void parallel_for (const Range & range, const Body & body, const Partitioner &,
357 : unsigned int n_threads = libMesh::n_threads())
358 : {
359 : parallel_for (range, body, n_threads);
360 : }
361 :
362 : /**
363 : * Execute the provided reduction operation in parallel on the specified
364 : * range.
365 : */
366 : template <typename Range, typename Body>
367 : inline
368 4091078 : void parallel_reduce (const Range & range, Body & body,
369 : unsigned int n_threads = libMesh::n_threads())
370 : {
371 4091078 : libmesh_error_msg_if(n_threads > libMesh::n_threads(),
372 : "Requested n_threads (" << n_threads << ") exceeds the "
373 : "global thread count (" << libMesh::n_threads() << ").");
374 80716 : Threads::BoolAcquire set_in_threads(Threads::in_threads);
375 :
376 80716 : unsigned int actual_threads = num_pthreads(range, n_threads);
377 :
378 : // If we're running in serial - just run!
379 4050224 : if (actual_threads == 1)
380 : {
381 4006983 : body(range);
382 52012 : return;
383 : }
384 :
385 112799 : Threads::RAIIAcquire<int> set_active_threads(Threads::active_threads, actual_threads);
386 :
387 114826 : DisablePerfLogInScope disable_perf;
388 :
389 141508 : std::vector<std::unique_ptr<Range>> ranges(actual_threads);
390 141508 : std::vector<std::unique_ptr<Body>> managed_bodies(actual_threads); // bodies we are responsible for
391 112799 : std::vector<Body *> bodies(actual_threads); // dumb pointers to managed_bodies
392 112799 : std::vector<RangeBody<Range, Body>> range_bodies(actual_threads);
393 :
394 : // Create actual_threads-1 copies of "body". We manage the lifetime of
395 : // these copies with std::unique_ptrs.
396 168190 : for (unsigned int i=1; i<actual_threads; i++)
397 95462 : managed_bodies[i] = std::make_unique<Body>(body, Threads::split());
398 :
399 : // Set up the "bodies" vector. Use the passed in body for the first
400 : // one, point to managed_bodies entries for the others.
401 84095 : bodies[0] = &body;
402 168190 : for (unsigned int i=1; i<actual_threads; i++)
403 112804 : bodies[i] = managed_bodies[i].get();
404 :
405 : // Create the ranges for each thread
406 84095 : std::size_t range_size = range.size() / actual_threads;
407 :
408 83983 : typename Range::const_iterator current_beginning = range.begin();
409 :
410 252285 : for (unsigned int i=0; i<actual_threads; i++)
411 : {
412 57408 : std::size_t this_range_size = range_size;
413 :
414 168190 : if (i+1 == actual_threads)
415 84095 : this_range_size += range.size() % actual_threads; // Give the last one the remaining work to do
416 :
417 225480 : ranges[i] = std::make_unique<Range>(range, current_beginning, current_beginning + this_range_size);
418 :
419 225256 : current_beginning = current_beginning + this_range_size;
420 : }
421 :
422 : // Create the RangeBody arguments
423 252285 : for (unsigned int i=0; i<actual_threads; i++)
424 : {
425 225608 : range_bodies[i].range = ranges[i].get();
426 225608 : range_bodies[i].body = bodies[i];
427 : }
428 :
429 : // Create the threads
430 112799 : std::vector<pthread_t> threads(actual_threads);
431 :
432 : // It may seem redundant to wrap a pragma in #ifdefs... but GCC
433 : // warns about an "unknown pragma" if it encounters this line of
434 : // code when -fopenmp is not passed to the compiler.
435 : #ifdef LIBMESH_HAVE_OPENMP
436 84095 : #pragma omp parallel for schedule (static)
437 : #endif
438 : // The use of 'int' instead of unsigned for the iteration variable
439 : // is deliberate here. This is an OpenMP loop, and some older
440 : // compilers warn when you don't use int for the loop index. The
441 : // reason has to do with signed vs. unsigned integer overflow
442 : // behavior and optimization.
443 : // http://blog.llvm.org/2011/05/what-every-c-programmer-should-know.html
444 : for (int i=0; i<static_cast<int>(actual_threads); i++)
445 : {
446 : #if !LIBMESH_HAVE_OPENMP
447 : pthread_create(&threads[i], nullptr, &run_body<Range, Body>, (void *)&range_bodies[i]);
448 : #else
449 : run_body<Range, Body>((void *)&range_bodies[i]);
450 : #endif
451 : }
452 :
453 : #if !LIBMESH_HAVE_OPENMP
454 : // Wait for them to finish
455 : for (unsigned int i=0; i<actual_threads; i++)
456 : pthread_join(threads[i], nullptr);
457 : #endif
458 :
459 : // Join them all down to the original Body
460 168190 : for (unsigned int i=actual_threads-1; i != 0; i--)
461 141513 : bodies[i-1]->join(*bodies[i]);
462 26682 : }
463 :
464 : /**
465 : * Execute the provided reduction operation in parallel on the specified
466 : * range with the specified partitioner.
467 : */
468 : template <typename Range, typename Body, typename Partitioner>
469 : inline
470 : void parallel_reduce (const Range & range, Body & body, const Partitioner &,
471 : unsigned int n_threads = libMesh::n_threads())
472 : {
473 : parallel_reduce(range, body, n_threads);
474 : }
475 :
476 :
477 : /**
478 : * Defines atomic operations which can only be executed on a
479 : * single thread at a time.
480 : */
481 : template <typename T>
482 : class atomic
483 : {
484 : public:
485 468 : atomic () : val(0) {}
486 33668 : operator T () { return val; }
487 :
488 : T operator=( T value )
489 : {
490 : spin_mutex::scoped_lock lock(smutex);
491 : val = value;
492 : return val;
493 : }
494 :
495 : atomic<T> & operator=( const atomic<T> & value )
496 : {
497 : spin_mutex::scoped_lock lock(smutex);
498 : val = value;
499 : return *this;
500 : }
501 :
502 :
503 : T operator+=(T value)
504 : {
505 : spin_mutex::scoped_lock lock(smutex);
506 : val += value;
507 : return val;
508 : }
509 :
510 : T operator-=(T value)
511 : {
512 : spin_mutex::scoped_lock lock(smutex);
513 : val -= value;
514 : return val;
515 : }
516 :
517 504936722 : T operator++()
518 : {
519 30405942 : spin_mutex::scoped_lock lock(smutex);
520 1191179611 : val++;
521 504936722 : return val;
522 : }
523 :
524 : T operator++(int)
525 : {
526 : spin_mutex::scoped_lock lock(smutex);
527 : val++;
528 : return val;
529 : }
530 :
531 3720403410 : T operator--()
532 : {
533 11267906 : spin_mutex::scoped_lock lock(smutex);
534 3747561811 : val--;
535 3720403410 : return val;
536 : }
537 :
538 : T operator--(int)
539 : {
540 : spin_mutex::scoped_lock lock(smutex);
541 : val--;
542 : return val;
543 : }
544 :
545 : private:
546 : T val;
547 : spin_mutex smutex;
548 : };
549 :
550 : } // namespace Threads
551 :
552 : } // namespace libMesh
553 :
554 : #endif // #ifdef LIBMESH_HAVE_PTHREAD
555 :
556 : #endif // LIBMESH_SQUASH_HEADER_WARNING
557 :
558 : #endif // LIBMESH_THREADS_PTHREAD_H
|