Line data Source code
1 : //* This file is part of the MOOSE framework
2 : //* https://mooseframework.inl.gov
3 : //*
4 : //* All rights reserved, see COPYRIGHT for full restrictions
5 : //* https://github.com/idaholab/moose/blob/master/COPYRIGHT
6 : //*
7 : //* Licensed under LGPL 2.1, please see LICENSE for details
8 : //* https://www.gnu.org/licenses/lgpl-2.1.html
9 :
10 : #pragma once
11 :
12 : #include "ParallelStudyMethod.h"
13 :
14 : // MOOSE includes
15 : #include "InputParameters.h"
16 : #include "SharedPool.h"
17 : #include "MooseEnum.h"
18 : #include "CircularBuffer.h"
19 : #include "LIFOBuffer.h"
20 :
21 : // Local includes
22 : #include "SendBuffer.h"
23 : #include "ReceiveBuffer.h"
24 :
25 : // libMesh Includes
26 : #include "libmesh/parallel_object.h"
27 :
28 : template <typename WorkType, typename ParallelDataType>
29 : class ParallelStudy : public libMesh::ParallelObject
30 : {
31 : public:
32 : typedef typename MooseUtils::Buffer<WorkType>::iterator work_iterator;
33 : typedef typename MooseUtils::Buffer<std::shared_ptr<ParallelDataType>>::iterator
34 : parallel_data_iterator;
35 :
36 : static InputParameters validParams();
37 :
38 : ParallelStudy(const libMesh::Parallel::Communicator & comm,
39 : const InputParameters & params,
40 : const std::string & name);
41 :
42 : /**
43 : * Pre-execute method that MUST be called before execute() and before adding work.
44 : */
45 : void preExecute();
46 : /**
47 : * Execute method.
48 : */
49 : void execute();
50 :
51 : /**
52 : * Adds work to the buffer to be executed. This will move the work into the buffer
53 : * (with std::move), therefore the passed in work will be invalid after this call.
54 : * For the purposes of the completion algorithm, this added work is considered
55 : * NEW work.
56 : *
57 : * During pre-execution (between preExecute() and execute()), this method can ONLY
58 : * be called on thread 0.
59 : *
60 : * During execute(), this method is thread safe and can be used to add work during execution.
61 : */
62 : ///@{
63 : void moveWorkToBuffer(WorkType & work, const THREAD_ID tid);
64 : void moveWorkToBuffer(const work_iterator begin, const work_iterator end, const THREAD_ID tid);
65 : void moveWorkToBuffer(std::vector<WorkType> & work, const THREAD_ID tid);
66 : ///@}
67 :
68 : /**
69 : * Acquire a parallel data object from the pool.
70 : */
71 : template <typename... Args>
72 : typename MooseUtils::SharedPool<ParallelDataType>::PtrType
73 : acquireParallelData(const THREAD_ID tid, Args &&... args)
74 : {
75 12018600 : return _parallel_data_pools[tid].acquire(std::forward<Args>(args)...);
76 : }
77 :
78 : /**
79 : * Moves parallel data objects to the send buffer to be communicated to processor \p dest_pid.
80 : */
81 : void moveParallelDataToBuffer(std::shared_ptr<ParallelDataType> & data,
82 : const processor_id_type dest_pid);
83 :
84 : /**
85 : * Gets the receive buffer.
86 : */
87 : const ReceiveBuffer<ParallelDataType, ParallelStudy<WorkType, ParallelDataType>> &
88 : receiveBuffer() const
89 : {
90 : return *_receive_buffer;
91 : }
92 :
93 : /**
94 : * Gets the work buffer.
95 : */
96 : const MooseUtils::Buffer<WorkType> & workBuffer() const { return *_work_buffer; }
97 :
98 : /**
99 : * Gets the total number of send buffer pools created.
100 : */
101 : unsigned long long int sendBufferPoolCreated() const;
102 : /**
103 : * Gets the total number of parallel data objects sent from this processor.
104 : */
105 : unsigned long long int parallelDataSent() const;
106 : /**
107 : * Gets the total number of buffers sent from this processor.
108 : */
109 : unsigned long long int buffersSent() const;
110 : /**
111 : * Gets the total number of parallel data created in all of the threaded pools
112 : */
113 : unsigned long long int poolParallelDataCreated() const;
114 :
115 : /**
116 : * Gets the total amount of work started from this processor.
117 : */
118 14 : unsigned long long int localWorkStarted() const { return _local_work_started; }
119 : /**
120 : * Gets the total amount of work executed on this processor.
121 : */
122 14 : unsigned long long int localWorkExecuted() const { return _local_work_executed; }
123 : /**
124 : * Gets the total amount of work completeed across all processors.
125 : */
126 284 : unsigned long long int totalWorkCompleted() const { return _total_work_completed; }
127 : /**
128 : * Gets the total number of chunks of work executed on this processor.
129 : */
130 14 : unsigned long long int localChunksExecuted() const { return _local_chunks_executed; }
131 :
132 : /**
133 : * Whether or not this object is currently in execute().
134 : */
135 1104 : bool currentlyExecuting() const { return _currently_executing; }
136 : /**
137 : * Whether or not this object is between preExecute() and execute().
138 : */
139 9625 : bool currentlyPreExecuting() const { return _currently_pre_executing; }
140 :
141 : /**
142 : * Gets the max buffer size
143 : */
144 : unsigned int maxBufferSize() const { return _max_buffer_size; }
145 : /**
146 : * Gets the chunk size
147 : */
148 : unsigned int chunkSize() const { return _chunk_size; }
149 :
150 : /**
151 : * Gets the number of iterations to wait before communicating
152 : */
153 : unsigned int clicksPerCommunication() const { return _clicks_per_communication; }
154 : /**
155 : * Gets the number of iterations to wait before communicating with root
156 : */
157 : unsigned int clicksPerRootCommunication() const { return _clicks_per_root_communication; }
158 : /**
159 : * Gets the number of iterations to wait before checking for new parallel data
160 : */
161 : unsigned int clicksPerReceive() const { return _clicks_per_receive; }
162 :
163 : /**
164 : * Gets the method
165 : */
166 : ParallelStudyMethod method() const { return _method; }
167 :
168 : /**
169 : * Reserve \p size entries in the work buffer.
170 : *
171 : * This can only be used during the pre-execution phase (between preExecute() and execute()).
172 : *
173 : * This is particularly useful when one wants to move many work objects into the buffer using
174 : * moveWorkToBuffer() and wants to allocate the space ahead of time.
175 : */
176 : void reserveBuffer(const std::size_t size);
177 :
178 : protected:
179 : /**
180 : * Enum for providing useful errors during work addition in moveWorkError().
181 : */
182 : enum MoveWorkError
183 : {
184 : DURING_EXECUTION_DISABLED,
185 : PRE_EXECUTION_AND_EXECUTION_ONLY,
186 : PRE_EXECUTION_ONLY,
187 : PRE_EXECUTION_THREAD_0_ONLY,
188 : CONTINUING_DURING_EXECUTING_WORK
189 : };
190 :
191 : /**
192 : * Creates the work buffer
193 : *
194 : * This is virtual so that derived classes can use their own specialized buffers
195 : */
196 : virtual std::unique_ptr<MooseUtils::Buffer<WorkType>> createWorkBuffer();
197 :
198 : /**
199 : * Pure virtual to be overridden that executes a single object of work on a given thread
200 : */
201 : virtual void executeWork(const WorkType & work, const THREAD_ID tid) = 0;
202 :
203 : /**
204 : * Virtual that allows for the customization of error text for moving work into the buffer.
205 : */
206 : virtual void moveWorkError(const MoveWorkError error, const WorkType * work = nullptr) const;
207 :
208 : /**
209 : * Insertion point for derived classes to provide an alternate ending criteria for
210 : * SMART execution. Only called when _has_alternate_ending_criteria == true.
211 : */
212 : virtual bool alternateSmartEndingCriteriaMet();
213 :
214 : /**
215 : * Insertion point for acting on work that was just executed.
216 : *
217 : * This is not called in threads.
218 : */
219 0 : virtual void postExecuteChunk(const work_iterator /* begin */, const work_iterator /* end */) {}
220 :
221 : /**
222 : * Insertion point called just after trying to receive work and just before beginning
223 : * work on the work buffer
224 : */
225 4403306 : virtual void preReceiveAndExecute() {}
226 :
227 : /**
228 : * Pure virtual for acting on parallel data that has JUST been received and
229 : * filled into the buffer.
230 : *
231 : * The parallel data in the range passed here will have its use count reduced
232 : * by one if it still exists after this call.
233 : */
234 : virtual void postReceiveParallelData(const parallel_data_iterator begin,
235 : const parallel_data_iterator end) = 0;
236 :
237 : /**
238 : * Can be overridden to denote if a piece of work is not complete yet.
239 : *
240 : * The complete terminology is used within the execution algorithms to
241 : * determine if the study is complete.
242 : */
243 0 : virtual bool workIsComplete(const WorkType & /* work */) { return true; }
244 :
245 : /**
246 : * Moves work that is considered continuing for the purposes of the execution
247 : * algorithm into the buffer.
248 : */
249 : ///@{
250 : void moveContinuingWorkToBuffer(WorkType & Work);
251 : void moveContinuingWorkToBuffer(const work_iterator begin, const work_iterator end);
252 : ///@}
253 :
254 : /**
255 : * Whether or not ALL of the buffers are empty:
256 : * Working buffer, threaded buffers, receive buffer, and send buffers.
257 : */
258 : bool buffersAreEmpty() const;
259 :
260 : /// This rank
261 : const processor_id_type _pid;
262 : /// Name for this object for use in error handling
263 : const std::string _name;
264 : /// The InputParameters
265 : const InputParameters & _params;
266 : /// The study method
267 : const ParallelStudyMethod _method;
268 : /// Whether or not this object has alternate ending criteria
269 : bool _has_alternate_ending_criteria;
270 :
271 : private:
272 : /**
273 : * Flushes all parallel data out of the send buffers
274 : */
275 : void flushSendBuffers();
276 :
277 : /**
278 : * Execute work using SMART
279 : */
280 : void smartExecute();
281 : /**
282 : * Execute work using HARM
283 : */
284 : void harmExecute();
285 : /**
286 : * Execute work using BS
287 : */
288 : void bsExecute();
289 :
290 : /**
291 : * Receive packets of parallel data from other processors and executes work
292 : */
293 : bool receiveAndExecute();
294 :
295 : /**
296 : * Execute a chunk of work and buffer
297 : */
298 : void executeAndBuffer(const std::size_t chunk_size);
299 :
300 : /**
301 : * Internal check for if it is allowed to currently add work in moveWorkToBuffer().
302 : */
303 : void canMoveWorkCheck(const THREAD_ID tid);
304 :
305 : /**
306 : * Internal method for acting on the parallel data that has just been received into
307 : * the parallel buffer
308 : */
309 : void postReceiveParallelDataInternal();
310 :
311 : /// Minimum size of a SendBuffer
312 : const unsigned int _min_buffer_size;
313 : /// Number of objects to buffer before communication
314 : const unsigned int _max_buffer_size;
315 : /// Multiplier for the buffer size for growing the buffer
316 : const Real _buffer_growth_multiplier;
317 : /// Multiplier for the buffer size for shrinking the buffer
318 : const Real _buffer_shrink_multiplier;
319 : /// Number of objects to execute at once during communication
320 : const unsigned int _chunk_size;
321 : /// Whether or not to allow the addition of new work to the buffer during execution
322 : const bool _allow_new_work_during_execution;
323 :
324 : /// Iterations to wait before communicating
325 : const unsigned int _clicks_per_communication;
326 : /// Iterations to wait before communicating with root
327 : const unsigned int _clicks_per_root_communication;
328 : /// Iterations to wait before checking for new objects
329 : const unsigned int _clicks_per_receive;
330 :
331 : /// MessageTag for sending parallel data
332 : Parallel::MessageTag _parallel_data_buffer_tag;
333 : /// Pools for re-using destructed parallel data objects (one for each thread)
334 : std::vector<MooseUtils::SharedPool<ParallelDataType>> _parallel_data_pools;
335 : /// Threaded temprorary storage for work added while we're using the _work_buffer (one for each thread)
336 : std::vector<std::vector<WorkType>> _temp_threaded_work;
337 : /// Buffer for executing work
338 : const std::unique_ptr<MooseUtils::Buffer<WorkType>> _work_buffer;
339 : /// The receive buffer
340 : const std::unique_ptr<ReceiveBuffer<ParallelDataType, ParallelStudy<WorkType, ParallelDataType>>>
341 : _receive_buffer;
342 : /// Send buffers for each processor
343 : std::unordered_map<
344 : processor_id_type,
345 : std::unique_ptr<SendBuffer<ParallelDataType, ParallelStudy<WorkType, ParallelDataType>>>>
346 : _send_buffers;
347 :
348 : /// Number of chunks of work executed on this processor
349 : unsigned long long int _local_chunks_executed;
350 : /// Amount of work completed on this processor
351 : unsigned long long int _local_work_completed;
352 : /// Amount of work started on this processor
353 : unsigned long long int _local_work_started;
354 : /// Amount of work executed on this processor
355 : unsigned long long int _local_work_executed;
356 : /// Amount of work started on all processors
357 : unsigned long long int _total_work_started;
358 : /// Amount of work completed on all processors
359 : unsigned long long int _total_work_completed;
360 :
361 : /// Whether we are within execute()
362 : bool _currently_executing;
363 : /// Whether we are between preExecute() and execute()
364 : bool _currently_pre_executing;
365 : /// Whether or not we are currently within executeAndBuffer()
366 : bool _currently_executing_work;
367 : };
368 :
369 : template <typename WorkType, typename ParallelDataType>
370 3827 : ParallelStudy<WorkType, ParallelDataType>::ParallelStudy(
371 : const libMesh::Parallel::Communicator & comm,
372 : const InputParameters & params,
373 : const std::string & name)
374 : : ParallelObject(comm),
375 3827 : _pid(comm.rank()),
376 3827 : _name(name),
377 3827 : _params(params),
378 :
379 3827 : _method((ParallelStudyMethod)(int)(params.get<MooseEnum>("method"))),
380 3827 : _has_alternate_ending_criteria(false),
381 7654 : _min_buffer_size(params.isParamSetByUser("min_buffer_size")
382 7654 : ? params.get<unsigned int>("min_buffer_size")
383 7654 : : params.get<unsigned int>("send_buffer_size")),
384 3827 : _max_buffer_size(params.get<unsigned int>("send_buffer_size")),
385 3827 : _buffer_growth_multiplier(params.get<Real>("buffer_growth_multiplier")),
386 3827 : _buffer_shrink_multiplier(params.get<Real>("buffer_shrink_multiplier")),
387 3827 : _chunk_size(params.get<unsigned int>("chunk_size")),
388 3827 : _allow_new_work_during_execution(params.get<bool>("allow_new_work_during_execution")),
389 :
390 3827 : _clicks_per_communication(params.get<unsigned int>("clicks_per_communication")),
391 3827 : _clicks_per_root_communication(params.get<unsigned int>("clicks_per_root_communication")),
392 3827 : _clicks_per_receive(params.get<unsigned int>("clicks_per_receive")),
393 :
394 3827 : _parallel_data_buffer_tag(comm.get_unique_tag()),
395 3827 : _parallel_data_pools(libMesh::n_threads()),
396 3827 : _temp_threaded_work(libMesh::n_threads()),
397 3827 : _work_buffer(createWorkBuffer()),
398 3827 : _receive_buffer(std::make_unique<
399 : ReceiveBuffer<ParallelDataType, ParallelStudy<WorkType, ParallelDataType>>>(
400 3827 : comm, this, _method, _clicks_per_receive, _parallel_data_buffer_tag)),
401 :
402 3827 : _currently_executing(false),
403 3827 : _currently_pre_executing(false),
404 3827 : _currently_executing_work(false)
405 : {
406 : #ifndef LIBMESH_HAVE_OPENMP
407 : if (libMesh::n_threads() != 1)
408 : mooseWarning(_name, ": Threading will not be used without OpenMP");
409 : #endif
410 :
411 3827 : if (_method != ParallelStudyMethod::SMART && _allow_new_work_during_execution)
412 0 : mooseError(_name,
413 : ": When allowing new work addition during execution\n",
414 : "('allow_new_work_during_execution = true'), the method must be SMART");
415 3827 : }
416 :
417 : template <typename WorkType, typename ParallelDataType>
418 : std::unique_ptr<MooseUtils::Buffer<WorkType>>
419 3827 : ParallelStudy<WorkType, ParallelDataType>::createWorkBuffer()
420 : {
421 3827 : std::unique_ptr<MooseUtils::Buffer<WorkType>> buffer;
422 :
423 3827 : const auto buffer_type = _params.get<MooseEnum>("work_buffer_type");
424 3827 : if (buffer_type == "lifo")
425 19 : buffer = std::make_unique<MooseUtils::LIFOBuffer<WorkType>>();
426 3808 : else if (buffer_type == "circular")
427 3808 : buffer = std::make_unique<MooseUtils::CircularBuffer<WorkType>>();
428 : else
429 0 : mooseError("Unknown work buffer type");
430 :
431 3827 : return buffer;
432 3827 : }
433 :
434 : template <typename WorkType, typename ParallelDataType>
435 : InputParameters
436 7576 : ParallelStudy<WorkType, ParallelDataType>::validParams()
437 : {
438 7576 : auto params = emptyInputParameters();
439 :
440 22728 : params.addRangeCheckedParam<unsigned int>(
441 15152 : "send_buffer_size", 100, "send_buffer_size > 0", "The size of the send buffer");
442 22728 : params.addRangeCheckedParam<unsigned int>(
443 : "chunk_size",
444 15152 : 100,
445 : "chunk_size > 0",
446 : "The number of objects to process at one time during execution");
447 22728 : params.addRangeCheckedParam<unsigned int>("clicks_per_communication",
448 15152 : 10,
449 : "clicks_per_communication >= 0",
450 : "Iterations to wait before communicating");
451 22728 : params.addRangeCheckedParam<unsigned int>("clicks_per_root_communication",
452 15152 : 10,
453 : "clicks_per_root_communication > 0",
454 : "Iterations to wait before communicating with root");
455 22728 : params.addRangeCheckedParam<unsigned int>("clicks_per_receive",
456 15152 : 1,
457 : "clicks_per_receive > 0",
458 : "Iterations to wait before checking for new objects");
459 :
460 15152 : params.addParam<unsigned int>("min_buffer_size",
461 : "The initial size of the SendBuffer and the floor for shrinking "
462 : "it. This defaults to send_buffer_size if not set (i.e. the "
463 : "buffer won't change size)");
464 15152 : params.addParam<Real>("buffer_growth_multiplier",
465 15152 : 2.,
466 : "How much to grow a SendBuffer by if the buffer completely fills and "
467 : "dumps. Will max at send_buffer_size");
468 22728 : params.addRangeCheckedParam<Real>("buffer_shrink_multiplier",
469 15152 : 0.5,
470 : "0 < buffer_shrink_multiplier <= 1.0",
471 : "Multiplier (between 0 and 1) to apply to the current buffer "
472 : "size if it is force dumped. Will stop at "
473 : "min_buffer_size.");
474 :
475 15152 : params.addParam<bool>(
476 : "allow_new_work_during_execution",
477 15152 : true,
478 : "Whether or not to allow the addition of new work to the work buffer during execution");
479 :
480 15152 : MooseEnum methods("smart harm bs", "smart");
481 15152 : params.addParam<MooseEnum>("method", methods, "The algorithm to use");
482 :
483 15152 : MooseEnum work_buffers("lifo circular", "circular");
484 15152 : params.addParam<MooseEnum>("work_buffer_type", work_buffers, "The work buffer type to use");
485 :
486 15152 : params.addParamNamesToGroup(
487 : "send_buffer_size chunk_size clicks_per_communication clicks_per_root_communication "
488 : "clicks_per_receive min_buffer_size buffer_growth_multiplier buffer_shrink_multiplier method "
489 : "work_buffer_type allow_new_work_during_execution",
490 : "Advanced");
491 :
492 7576 : return params;
493 7576 : }
494 :
495 : template <typename WorkType, typename ParallelDataType>
496 : void
497 62535 : ParallelStudy<WorkType, ParallelDataType>::executeAndBuffer(const std::size_t chunk_size)
498 : {
499 62535 : _currently_executing_work = true;
500 :
501 : // If chunk_size > the number of objects left, this will properly grab all of them
502 62535 : const auto begin = _work_buffer->beginChunk(chunk_size);
503 62535 : const auto end = _work_buffer->endChunk(chunk_size);
504 :
505 62509 : _local_chunks_executed++;
506 :
507 : #ifdef LIBMESH_HAVE_OPENMP
508 62509 : #pragma omp parallel
509 : #endif
510 : {
511 : const THREAD_ID tid =
512 : #ifdef LIBMESH_HAVE_OPENMP
513 : omp_get_thread_num();
514 : #else
515 : 0;
516 : #endif
517 :
518 : #ifdef LIBMESH_HAVE_OPENMP
519 : #pragma omp for schedule(dynamic, 20) nowait
520 : #endif
521 : for (auto it = begin; it < end; ++it)
522 : executeWork(*it, tid);
523 : }
524 :
525 : // Increment the executed and completed counters
526 62509 : _local_work_executed += std::distance(begin, end);
527 5442679 : for (auto it = begin; it != end; ++it)
528 5380170 : if (workIsComplete(*it))
529 4120844 : ++_local_work_completed;
530 :
531 : // Insertion point for derived classes to do something to the completed work
532 : // Example: Create ParallelData to spawn additional work on another processor
533 62509 : postExecuteChunk(begin, end);
534 :
535 : // Remove the objects we just worked on from the buffer
536 62509 : _work_buffer->eraseChunk(chunk_size);
537 :
538 : // If new work is allowed to be geneated during execution, it goes into _temp_threaded_work
539 : // during the threaded execution phase and then must be moved into the working buffer
540 62509 : if (_allow_new_work_during_execution)
541 : {
542 : // Amount of work that needs to be moved into the main working buffer from
543 : // the temporary working buffer
544 : std::size_t threaded_work_size = 0;
545 139017 : for (const auto & work_objects : _temp_threaded_work)
546 76570 : threaded_work_size += work_objects.size();
547 :
548 62447 : if (threaded_work_size)
549 : {
550 : // We don't ever want to decrease the capacity, so only set it if we need more entries
551 718 : if (_work_buffer->capacity() < _work_buffer->size() + threaded_work_size)
552 : _work_buffer->setCapacity(_work_buffer->size() + threaded_work_size);
553 :
554 : // Move the work into the buffer
555 1593 : for (auto & threaded_work_vector : _temp_threaded_work)
556 : {
557 83315 : for (auto & work : threaded_work_vector)
558 82440 : _work_buffer->move(work);
559 875 : threaded_work_vector.clear();
560 : }
561 :
562 : // Variable that must be set when adding work so that the algorithm can keep count
563 : // of how much work still needs to be executed
564 718 : _local_work_started += threaded_work_size;
565 : }
566 : }
567 :
568 62509 : if (_method == ParallelStudyMethod::HARM)
569 24 : flushSendBuffers();
570 :
571 62509 : _currently_executing_work = false;
572 62509 : }
573 :
574 : template <typename WorkType, typename ParallelDataType>
575 : void
576 1259326 : ParallelStudy<WorkType, ParallelDataType>::moveParallelDataToBuffer(
577 : std::shared_ptr<ParallelDataType> & data, const processor_id_type dest_pid)
578 : {
579 : mooseAssert(comm().size() > dest_pid, "Invalid processor ID");
580 : mooseAssert(_pid != dest_pid, "Processor ID is self");
581 :
582 1259326 : if (!_currently_executing && !_currently_pre_executing)
583 0 : mooseError(_name, ": Cannot sendParallelData() when not executing");
584 :
585 : // Get the send buffer for the proc this object is going to
586 : auto find_pair = _send_buffers.find(dest_pid);
587 : // Need to create a send buffer for said processor
588 1259326 : if (find_pair == _send_buffers.end())
589 : _send_buffers
590 3653 : .emplace(dest_pid,
591 : std::make_unique<
592 : SendBuffer<ParallelDataType, ParallelStudy<WorkType, ParallelDataType>>>(
593 : comm(),
594 : this,
595 : dest_pid,
596 3653 : _method,
597 3653 : _min_buffer_size,
598 3653 : _max_buffer_size,
599 3653 : _buffer_growth_multiplier,
600 3653 : _buffer_shrink_multiplier,
601 3653 : _parallel_data_buffer_tag))
602 3653 : .first->second->moveObject(data);
603 : // Send buffer exists for this processor
604 : else
605 1255673 : find_pair->second->moveObject(data);
606 1259326 : }
607 :
608 : template <typename WorkType, typename ParallelDataType>
609 : void
610 445549 : ParallelStudy<WorkType, ParallelDataType>::flushSendBuffers()
611 : {
612 784066 : for (auto & send_buffer_iter : _send_buffers)
613 338517 : send_buffer_iter.second->forceSend();
614 445549 : }
615 :
616 : template <typename WorkType, typename ParallelDataType>
617 : void
618 7561 : ParallelStudy<WorkType, ParallelDataType>::reserveBuffer(const std::size_t size)
619 : {
620 7561 : if (!_currently_pre_executing)
621 0 : mooseError(_name, ": Can only reserve in object buffer during pre-execution");
622 :
623 : // We don't ever want to decrease the capacity, so only set if we need more entries
624 7561 : if (_work_buffer->capacity() < size)
625 : _work_buffer->setCapacity(size);
626 7561 : }
627 :
628 : template <typename WorkType, typename ParallelDataType>
629 : void
630 4417935 : ParallelStudy<WorkType, ParallelDataType>::postReceiveParallelDataInternal()
631 : {
632 4417935 : if (_receive_buffer->buffer().empty())
633 : return;
634 :
635 : // Let derived classes work on the data and then clear it after
636 7019 : postReceiveParallelData(_receive_buffer->buffer().begin(), _receive_buffer->buffer().end());
637 1266345 : for (auto & data : _receive_buffer->buffer())
638 1259326 : if (data)
639 0 : data.reset();
640 :
641 : _receive_buffer->buffer().clear();
642 : }
643 :
644 : template <typename WorkType, typename ParallelDataType>
645 : bool
646 4403306 : ParallelStudy<WorkType, ParallelDataType>::receiveAndExecute()
647 : {
648 : bool executed_some = false;
649 :
650 4403306 : if (_receive_buffer->currentlyReceiving() && _method == ParallelStudyMethod::SMART)
651 : _receive_buffer->cleanupRequests();
652 : else
653 4403306 : _receive_buffer->receive();
654 :
655 4403306 : postReceiveParallelDataInternal();
656 :
657 4403306 : preReceiveAndExecute();
658 :
659 4465777 : while (!_work_buffer->empty())
660 : {
661 : executed_some = true;
662 :
663 : // Switch between tracing a chunk and buffering with SMART
664 62497 : if (_method == ParallelStudyMethod::SMART)
665 : {
666 : // Look for extra work first so that these transfers can be finishing while we're executing
667 : // Start receives only if our work buffer is decently sized
668 62473 : const bool start_receives_only = _work_buffer->size() > (2 * _chunk_size);
669 62473 : _receive_buffer->receive(_work_buffer->size() > (2 * _chunk_size));
670 62473 : if (!start_receives_only)
671 14554 : postReceiveParallelDataInternal();
672 :
673 : // Execute some objects
674 62473 : executeAndBuffer(_chunk_size);
675 : }
676 : // Execute all of them and then buffer with the other methods
677 : else
678 24 : executeAndBuffer(_work_buffer->size());
679 : }
680 :
681 4403280 : return executed_some;
682 : }
683 :
684 : template <typename WorkType, typename ParallelDataType>
685 : void
686 7925 : ParallelStudy<WorkType, ParallelDataType>::smartExecute()
687 : {
688 : mooseAssert(_method == ParallelStudyMethod::SMART, "Should be called with SMART only");
689 :
690 : // Request for the sum of the started work
691 7925 : Parallel::Request started_request;
692 : // Request for the sum of the completed work
693 7925 : Parallel::Request completed_request;
694 :
695 : // Temp for use in sending the current value in a nonblocking sum instead of an updated value
696 : unsigned long long int temp;
697 :
698 : // Whether or not to make the started request first, or after every finished request.
699 : // When allowing adding new work during the execution phase, the starting object counts could
700 : // change after right now, so we must update them after each finished request is complete.
701 : // When not allowing generation during propagation, we know the counts up front.
702 7925 : const bool started_request_first = !_allow_new_work_during_execution;
703 :
704 : // Get the amount of work that was started in the whole domain, if applicable
705 7925 : if (started_request_first)
706 0 : comm().sum(_local_work_started, _total_work_started, started_request);
707 :
708 : // Whether or not the started request has been made
709 : bool made_started_request = started_request_first;
710 : // Whether or not the completed request has been made
711 : bool made_completed_request = false;
712 :
713 : // Good time to get rid of whatever's currently in our SendBuffers
714 7925 : flushSendBuffers();
715 :
716 : // Use these to try to delay some forced communication
717 : unsigned int non_executing_clicks = 0;
718 : unsigned int non_executing_root_clicks = 0;
719 : bool executed_some = true;
720 :
721 : // Keep executing work until it has all completed
722 : while (true)
723 : {
724 4403254 : executed_some = receiveAndExecute();
725 :
726 4403228 : if (executed_some)
727 : {
728 : non_executing_clicks = 0;
729 : non_executing_root_clicks = 0;
730 : }
731 : else
732 : {
733 4391913 : non_executing_clicks++;
734 4391913 : non_executing_root_clicks++;
735 : }
736 :
737 4403228 : if (non_executing_clicks >= _clicks_per_communication)
738 : {
739 : non_executing_clicks = 0;
740 :
741 437459 : flushSendBuffers();
742 : }
743 :
744 4403228 : if (_has_alternate_ending_criteria)
745 : {
746 0 : if (buffersAreEmpty() && alternateSmartEndingCriteriaMet())
747 : {
748 0 : comm().barrier();
749 : return;
750 : }
751 : }
752 4403228 : else if (non_executing_root_clicks >= _clicks_per_root_communication)
753 : {
754 : non_executing_root_clicks = 0;
755 :
756 : // We need the starting work sum first but said request isn't complete yet
757 437459 : if (started_request_first && !started_request.test())
758 0 : continue;
759 :
760 : // At this point, we need to make a request for the completed work sum
761 437459 : if (!made_completed_request)
762 : {
763 : made_completed_request = true;
764 14261 : temp = _local_work_completed;
765 14261 : comm().sum(temp, _total_work_completed, completed_request);
766 14261 : continue;
767 : }
768 :
769 : // We have the completed work sum
770 423198 : if (completed_request.test())
771 : {
772 : // The starting work sum must be requested /after/ we have finishing counts and we
773 : // need to make the request for said sum
774 153191 : if (!made_started_request)
775 : {
776 : made_started_request = true;
777 14261 : temp = _local_work_started;
778 14261 : comm().sum(temp, _total_work_started, started_request);
779 14261 : continue;
780 : }
781 :
782 : // The starting work sum must be requested /after/ we have finishing sum and we
783 : // don't have the starting sum yet
784 138930 : if (!started_request_first && !started_request.test())
785 124669 : continue;
786 :
787 : // Started count is the same as the finished count - we're done!
788 14261 : if (_total_work_started == _total_work_completed)
789 : return;
790 :
791 : // Next time around we should make a completed sum request
792 : made_completed_request = false;
793 : // If we need the starting work sum after the completed work sum, we need those now as well
794 6362 : if (!started_request_first)
795 : made_started_request = false;
796 : }
797 : }
798 : }
799 7899 : }
800 :
801 : template <typename WorkType, typename ParallelDataType>
802 : void
803 14 : ParallelStudy<WorkType, ParallelDataType>::harmExecute()
804 : {
805 14 : if (_has_alternate_ending_criteria)
806 0 : mooseError("ParallelStudy: Alternate ending criteria not yet supported for HARM");
807 14 : if (_allow_new_work_during_execution)
808 0 : mooseError(_name, ": The addition of new work during execution is not supported by HARM");
809 : mooseAssert(_method == ParallelStudyMethod::HARM, "Should be called with HARM only");
810 :
811 : // Request for the total amount of work started
812 14 : Parallel::Request work_started_request;
813 : // Requests for sending the amount of finished worked to every other processor
814 14 : std::vector<Parallel::Request> work_completed_requests(comm().size());
815 : // Whether or not the finished requests have been sent to each processor
816 14 : std::vector<bool> work_completed_requests_sent(comm().size(), false);
817 : // Values of work completed on this processor that are being sent to other processors
818 14 : std::vector<unsigned long long int> work_completed_requests_temps(comm().size(), 0);
819 : // Work completed by each processor
820 14 : std::vector<unsigned long long int> work_completed_per_proc(comm().size(), 0);
821 : // Tag for sending work finished
822 14 : const auto work_completed_requests_tag = comm().get_unique_tag();
823 :
824 : // Get the amount of work that was started in the whole domain
825 14 : comm().sum(_local_work_started, _total_work_started, work_started_request);
826 :
827 : // All work has been executed, so time to communicate
828 14 : flushSendBuffers();
829 :
830 : // HARM only does some communication based on times through the loop.
831 : // This counter will be used for that
832 : unsigned int communication_clicks = 0;
833 :
834 : Parallel::Status work_completed_probe_status;
835 : int work_completed_probe_flag;
836 :
837 : // Keep working until done
838 38 : while (true)
839 : {
840 52 : receiveAndExecute();
841 :
842 52 : flushSendBuffers();
843 :
844 52 : if (communication_clicks > comm().size())
845 : {
846 : // Receive messages about work being finished
847 : do
848 : {
849 34 : MPI_Iprobe(MPI_ANY_SOURCE,
850 : work_completed_requests_tag.value(),
851 : comm().get(),
852 : &work_completed_probe_flag,
853 : work_completed_probe_status.get());
854 :
855 34 : if (work_completed_probe_flag)
856 : {
857 : auto proc = work_completed_probe_status.source();
858 20 : comm().receive(proc, work_completed_per_proc[proc], work_completed_requests_tag);
859 : }
860 34 : } while (work_completed_probe_flag);
861 :
862 28 : _total_work_completed = std::accumulate(
863 : work_completed_per_proc.begin(), work_completed_per_proc.end(), _local_work_completed);
864 :
865 : // Reset
866 : communication_clicks = 0;
867 : }
868 :
869 : // Send messages about objects being finished
870 144 : for (processor_id_type pid = 0; pid < comm().size(); ++pid)
871 40 : if (pid != _pid &&
872 132 : (!work_completed_requests_sent[pid] || work_completed_requests[pid].test()) &&
873 40 : _local_work_completed > work_completed_requests_temps[pid])
874 : {
875 20 : work_completed_requests_temps[pid] = _local_work_completed;
876 20 : comm().send(pid,
877 : work_completed_requests_temps[pid],
878 : work_completed_requests[pid],
879 : work_completed_requests_tag);
880 : work_completed_requests_sent[pid] = true;
881 : }
882 :
883 : // All procs agree on the amount of work started and we've finished all the work started
884 52 : if (work_started_request.test() && _total_work_started == _total_work_completed)
885 : {
886 : // Need to call the post wait work for all of the requests
887 38 : for (processor_id_type pid = 0; pid < comm().size(); ++pid)
888 24 : if (pid != _pid)
889 10 : work_completed_requests[pid].wait();
890 :
891 14 : return;
892 : }
893 :
894 38 : communication_clicks++;
895 : }
896 28 : }
897 :
898 : template <typename WorkType, typename ParallelDataType>
899 : void
900 14 : ParallelStudy<WorkType, ParallelDataType>::bsExecute()
901 : {
902 14 : if (_has_alternate_ending_criteria)
903 0 : mooseError("ParallelStudy: Alternate ending criteria not yet supported for BS");
904 14 : if (_allow_new_work_during_execution)
905 0 : mooseError(_name, ": The addition of new work during execution is not supported by BS");
906 : mooseAssert(_method == ParallelStudyMethod::BS, "Should be called with BS only");
907 :
908 14 : Parallel::Request work_completed_probe_status;
909 14 : Parallel::Request work_completed_request;
910 :
911 : // Temp for use in sending the current value in a nonblocking sum instead of an updated value
912 : unsigned long long int temp;
913 :
914 : // Get the amount of work that were started in the whole domain
915 14 : comm().sum(_local_work_started, _total_work_started, work_completed_probe_status);
916 :
917 : // Keep working until done
918 38 : while (true)
919 : {
920 : bool receiving = false;
921 : bool sending = false;
922 :
923 38 : Parallel::Request some_left_request;
924 38 : unsigned int some_left = 0;
925 38 : unsigned int all_some_left = 1;
926 :
927 : do
928 : {
929 75 : _receive_buffer->receive();
930 75 : postReceiveParallelDataInternal();
931 75 : flushSendBuffers();
932 :
933 : receiving = _receive_buffer->currentlyReceiving();
934 :
935 : sending = false;
936 109 : for (auto & send_buffer : _send_buffers)
937 34 : sending = sending || send_buffer.second->currentlySending() ||
938 : send_buffer.second->currentlyBuffered();
939 :
940 75 : if (!receiving && !sending && some_left_request.test() && all_some_left)
941 : {
942 38 : some_left = receiving || sending;
943 38 : comm().sum(some_left, all_some_left, some_left_request);
944 : }
945 112 : } while (receiving || sending || !some_left_request.test() || all_some_left);
946 :
947 38 : executeAndBuffer(_work_buffer->size());
948 :
949 38 : comm().barrier();
950 :
951 38 : if (work_completed_probe_status.test() && work_completed_request.test())
952 : {
953 38 : if (_total_work_started == _total_work_completed)
954 14 : return;
955 :
956 24 : temp = _local_work_completed;
957 24 : comm().sum(temp, _total_work_completed, work_completed_request);
958 : }
959 : }
960 14 : }
961 :
962 : template <typename WorkType, typename ParallelDataType>
963 : void
964 8023 : ParallelStudy<WorkType, ParallelDataType>::preExecute()
965 : {
966 8023 : if (!buffersAreEmpty())
967 0 : mooseError(_name, ": Buffers are not empty in preExecute()");
968 :
969 : // Clear communication buffers
970 9921 : for (auto & send_buffer_pair : _send_buffers)
971 1898 : send_buffer_pair.second->clear();
972 : _send_buffers.clear();
973 8023 : _receive_buffer->clear();
974 :
975 : // Clear counters
976 8023 : _local_chunks_executed = 0;
977 8023 : _local_work_completed = 0;
978 8023 : _local_work_started = 0;
979 8023 : _local_work_executed = 0;
980 8023 : _total_work_started = 0;
981 8023 : _total_work_completed = 0;
982 :
983 8023 : _currently_pre_executing = true;
984 8023 : }
985 :
986 : template <typename WorkType, typename ParallelDataType>
987 : void
988 7953 : ParallelStudy<WorkType, ParallelDataType>::execute()
989 : {
990 7953 : if (!_currently_pre_executing)
991 0 : mooseError(_name, ": preExecute() was not called before execute()");
992 :
993 7953 : _currently_pre_executing = false;
994 7953 : _currently_executing = true;
995 :
996 7953 : switch (_method)
997 : {
998 7925 : case ParallelStudyMethod::SMART:
999 7925 : smartExecute();
1000 7899 : break;
1001 14 : case ParallelStudyMethod::HARM:
1002 14 : harmExecute();
1003 14 : break;
1004 14 : case ParallelStudyMethod::BS:
1005 14 : bsExecute();
1006 14 : break;
1007 0 : default:
1008 0 : mooseError("Unknown ParallelStudyMethod");
1009 : }
1010 :
1011 7927 : _currently_executing = false;
1012 :
1013 : // Sanity checks on if we're really done
1014 7927 : comm().barrier();
1015 :
1016 7927 : if (!buffersAreEmpty())
1017 0 : mooseError(_name, ": Buffers are not empty after execution");
1018 7927 : }
1019 :
1020 : template <typename WorkType, typename ParallelDataType>
1021 : void
1022 0 : ParallelStudy<WorkType, ParallelDataType>::moveWorkError(
1023 : const MoveWorkError error, const WorkType * /* work = nullptr */) const
1024 : {
1025 : if (error == MoveWorkError::DURING_EXECUTION_DISABLED)
1026 0 : mooseError(_name,
1027 : ": The moving of new work into the buffer during work execution requires\n",
1028 : "that the parameter 'allow_new_work_during_execution = true'");
1029 : if (error == MoveWorkError::PRE_EXECUTION_AND_EXECUTION_ONLY)
1030 0 : mooseError(
1031 0 : _name,
1032 : ": Can only move work into the buffer in the pre-execution and execution phase\n(between "
1033 : "preExecute() and the end of execute()");
1034 : if (error == MoveWorkError::PRE_EXECUTION_ONLY)
1035 0 : mooseError(_name,
1036 : ": Can only move work into the buffer in the pre-execution phase\n(between "
1037 : "preExecute() and execute()");
1038 : if (error == MoveWorkError::PRE_EXECUTION_THREAD_0_ONLY)
1039 0 : mooseError(_name,
1040 : ": Can only move work into the buffer in the pre-execution phase\n(between "
1041 : "preExecute() and execute()) on thread 0");
1042 : if (error == CONTINUING_DURING_EXECUTING_WORK)
1043 0 : mooseError(_name, ": Cannot move continuing work into the buffer during executeAndBuffer()");
1044 :
1045 0 : mooseError("Unknown MoveWorkError");
1046 : }
1047 :
1048 : template <typename WorkType, typename ParallelDataType>
1049 : void
1050 4120172 : ParallelStudy<WorkType, ParallelDataType>::canMoveWorkCheck(const THREAD_ID tid)
1051 : {
1052 4120172 : if (_currently_executing)
1053 : {
1054 82440 : if (!_allow_new_work_during_execution)
1055 0 : moveWorkError(MoveWorkError::DURING_EXECUTION_DISABLED);
1056 : }
1057 4037732 : else if (!_currently_pre_executing)
1058 : {
1059 0 : if (_allow_new_work_during_execution)
1060 0 : moveWorkError(MoveWorkError::PRE_EXECUTION_AND_EXECUTION_ONLY);
1061 : else
1062 0 : moveWorkError(MoveWorkError::PRE_EXECUTION_ONLY);
1063 : }
1064 4037732 : else if (tid != 0)
1065 0 : moveWorkError(MoveWorkError::PRE_EXECUTION_THREAD_0_ONLY);
1066 4120172 : }
1067 :
1068 : template <typename WorkType, typename ParallelDataType>
1069 : void
1070 4119890 : ParallelStudy<WorkType, ParallelDataType>::moveWorkToBuffer(WorkType & work, const THREAD_ID tid)
1071 : {
1072 : // Error checks for moving work into the buffer at unallowed times
1073 4119890 : canMoveWorkCheck(tid);
1074 :
1075 : // Can move directly into the work buffer on thread 0 when we're not executing work
1076 4119890 : if (!_currently_executing_work && tid == 0)
1077 : {
1078 4037450 : ++_local_work_started; // must ALWAYS increment when adding new work to the working buffer
1079 4037450 : _work_buffer->move(work);
1080 : }
1081 : // Objects added during execution go into a temporary threaded vector (is thread safe) to be
1082 : // moved into the working buffer when possible
1083 : else
1084 82440 : _temp_threaded_work[tid].emplace_back(std::move(work));
1085 4119890 : }
1086 :
1087 : template <typename WorkType, typename ParallelDataType>
1088 : void
1089 282 : ParallelStudy<WorkType, ParallelDataType>::moveWorkToBuffer(const work_iterator begin,
1090 : const work_iterator end,
1091 : const THREAD_ID tid)
1092 : {
1093 : // Error checks for moving work into the buffer at unallowed times
1094 282 : canMoveWorkCheck(tid);
1095 :
1096 : // Get work size beforehand so we can resize
1097 : const auto size = std::distance(begin, end);
1098 :
1099 : // Can move directly into the work buffer on thread 0 when we're not executing work
1100 282 : if (!_currently_executing_work && tid == 0)
1101 : {
1102 282 : if (_work_buffer->capacity() < _work_buffer->size() + size)
1103 : _work_buffer->setCapacity(_work_buffer->size() + size);
1104 282 : _local_work_started += size;
1105 : }
1106 : else
1107 0 : _temp_threaded_work[tid].reserve(_temp_threaded_work[tid].size() + size);
1108 :
1109 : // Move the objects
1110 282 : if (!_currently_executing_work && tid == 0)
1111 1272 : for (auto it = begin; it != end; ++it)
1112 990 : _work_buffer->move(*it);
1113 : else
1114 0 : for (auto it = begin; it != end; ++it)
1115 0 : _temp_threaded_work[tid].emplace_back(std::move(*it));
1116 282 : }
1117 :
1118 : template <typename WorkType, typename ParallelDataType>
1119 : void
1120 282 : ParallelStudy<WorkType, ParallelDataType>::moveWorkToBuffer(std::vector<WorkType> & work_vector,
1121 : const THREAD_ID tid)
1122 : {
1123 282 : moveWorkToBuffer(work_vector.begin(), work_vector.end(), tid);
1124 282 : }
1125 :
1126 : template <typename WorkType, typename ParallelDataType>
1127 : void
1128 : ParallelStudy<WorkType, ParallelDataType>::moveContinuingWorkToBuffer(WorkType & work)
1129 : {
1130 : if (_currently_executing_work)
1131 : moveWorkError(MoveWorkError::CONTINUING_DURING_EXECUTING_WORK);
1132 :
1133 : _work_buffer->move(work);
1134 : }
1135 :
1136 : template <typename WorkType, typename ParallelDataType>
1137 : void
1138 7019 : ParallelStudy<WorkType, ParallelDataType>::moveContinuingWorkToBuffer(const work_iterator begin,
1139 : const work_iterator end)
1140 : {
1141 7019 : if (_currently_executing_work)
1142 0 : moveWorkError(MoveWorkError::CONTINUING_DURING_EXECUTING_WORK);
1143 :
1144 : const auto size = std::distance(begin, end);
1145 7019 : if (_work_buffer->capacity() < _work_buffer->size() + size)
1146 : _work_buffer->setCapacity(_work_buffer->size() + size);
1147 :
1148 1266345 : for (auto it = begin; it != end; ++it)
1149 1259326 : _work_buffer->move(*it);
1150 7019 : }
1151 :
1152 : template <typename WorkType, typename ParallelDataType>
1153 : unsigned long long int
1154 14 : ParallelStudy<WorkType, ParallelDataType>::sendBufferPoolCreated() const
1155 : {
1156 : unsigned long long int total = 0;
1157 :
1158 24 : for (const auto & buffer : _send_buffers)
1159 10 : total += buffer.second->bufferPoolCreated();
1160 :
1161 14 : return total;
1162 : }
1163 :
1164 : template <typename WorkType, typename ParallelDataType>
1165 : unsigned long long int
1166 14 : ParallelStudy<WorkType, ParallelDataType>::parallelDataSent() const
1167 : {
1168 : unsigned long long int total_sent = 0;
1169 :
1170 24 : for (const auto & buffer : _send_buffers)
1171 10 : total_sent += buffer.second->objectsSent();
1172 :
1173 14 : return total_sent;
1174 : }
1175 :
1176 : template <typename WorkType, typename ParallelDataType>
1177 : unsigned long long int
1178 14 : ParallelStudy<WorkType, ParallelDataType>::buffersSent() const
1179 : {
1180 : unsigned long long int total_sent = 0;
1181 :
1182 24 : for (const auto & buffer : _send_buffers)
1183 10 : total_sent += buffer.second->buffersSent();
1184 :
1185 14 : return total_sent;
1186 : }
1187 :
1188 : template <typename WorkType, typename ParallelDataType>
1189 : unsigned long long int
1190 : ParallelStudy<WorkType, ParallelDataType>::poolParallelDataCreated() const
1191 : {
1192 : unsigned long long int num_created = 0;
1193 :
1194 31 : for (const auto & pool : _parallel_data_pools)
1195 17 : num_created += pool.num_created();
1196 :
1197 : return num_created;
1198 : }
1199 :
1200 : template <typename WorkType, typename ParallelDataType>
1201 : bool
1202 0 : ParallelStudy<WorkType, ParallelDataType>::alternateSmartEndingCriteriaMet()
1203 : {
1204 0 : mooseError(_name, ": Unimplemented alternateSmartEndingCriteriaMet()");
1205 : }
1206 :
1207 : template <typename WorkType, typename ParallelDataType>
1208 : bool
1209 15950 : ParallelStudy<WorkType, ParallelDataType>::buffersAreEmpty() const
1210 : {
1211 15950 : if (!_work_buffer->empty())
1212 : return false;
1213 36594 : for (const auto & threaded_buffer : _temp_threaded_work)
1214 20644 : if (!threaded_buffer.empty())
1215 : return false;
1216 15950 : if (_receive_buffer->currentlyReceiving())
1217 : return false;
1218 21501 : for (const auto & map_pair : _send_buffers)
1219 5551 : if (map_pair.second->currentlySending() || map_pair.second->currentlyBuffered())
1220 : return false;
1221 :
1222 : return true;
1223 : }
|