ParallelStudy.h
//* This file is part of the MOOSE framework
//* https://mooseframework.inl.gov
//*
//* All rights reserved, see COPYRIGHT for full restrictions
//* https://github.com/idaholab/moose/blob/master/COPYRIGHT
//*
//* Licensed under LGPL 2.1, please see LICENSE for details
//* https://www.gnu.org/licenses/lgpl-2.1.html

#pragma once

#include "ParallelStudyMethod.h"

// MOOSE includes
#include "InputParameters.h"
#include "SharedPool.h"
#include "MooseEnum.h"
#include "CircularBuffer.h"
#include "LIFOBuffer.h"

// Local includes
#include "SendBuffer.h"
#include "ReceiveBuffer.h"

// libMesh Includes
#include "libmesh/parallel_object.h"
template <typename WorkType, typename ParallelDataType>
class ParallelStudy : public libMesh::ParallelObject
{
public:
  typedef typename MooseUtils::Buffer<WorkType>::iterator work_iterator;
  typedef typename MooseUtils::Buffer<std::shared_ptr<ParallelDataType>>::iterator
      parallel_data_iterator;

  static InputParameters validParams();

  ParallelStudy(const libMesh::Parallel::Communicator & comm,
                const InputParameters & params,
                const std::string & name);

  /**
   * Pre-execute method that MUST be called before execute() and before adding work.
   */
  void preExecute();

  /**
   * Execute method.
   */
  void execute();

  /**
   * Adds work to the buffer to be executed (a single object, a range, or a vector).
   */
  void moveWorkToBuffer(WorkType & work, const THREAD_ID tid);
  void moveWorkToBuffer(const work_iterator begin, const work_iterator end, const THREAD_ID tid);
  void moveWorkToBuffer(std::vector<WorkType> & work, const THREAD_ID tid);

  /**
   * Acquire a parallel data object from the pool.
   */
  template <typename... Args>
  typename MooseUtils::SharedPool<ParallelDataType>::PtrType
  acquireParallelData(const THREAD_ID tid, Args &&... args)
  {
    return _parallel_data_pools[tid].acquire(std::forward<Args>(args)...);
  }
  /**
   * Moves parallel data objects to the send buffer to be communicated to processor dest_pid.
   */
  void moveParallelDataToBuffer(std::shared_ptr<ParallelDataType> & data,
                                const processor_id_type dest_pid);

  /**
   * Gets the receive buffer.
   */
  const ReceiveBuffer<ParallelDataType, ParallelStudy<WorkType, ParallelDataType>> &
  receiveBuffer() const
  {
    return *_receive_buffer;
  }

  /**
   * Gets the work buffer.
   */
  const MooseUtils::Buffer<WorkType> & workBuffer() const { return *_work_buffer; }

  /**
   * Gets the total number of send buffer pools created.
   */
  unsigned long long int sendBufferPoolCreated() const;
  /**
   * Gets the total number of parallel data objects sent from this processor.
   */
  unsigned long long int parallelDataSent() const;
  /**
   * Gets the total number of buffers sent from this processor.
   */
  unsigned long long int buffersSent() const;
  /**
   * Gets the total number of parallel data created in all of the threaded pools.
   */
  unsigned long long int poolParallelDataCreated() const;

  /**
   * Gets the total amount of work started from this processor.
   */
  unsigned long long int localWorkStarted() const { return _local_work_started; }
  /**
   * Gets the total amount of work executed on this processor.
   */
  unsigned long long int localWorkExecuted() const { return _local_work_executed; }
  /**
   * Gets the total amount of work completed across all processors.
   */
  unsigned long long int totalWorkCompleted() const { return _total_work_completed; }
  /**
   * Gets the total number of chunks of work executed on this processor.
   */
  unsigned long long int localChunksExecuted() const { return _local_chunks_executed; }

  /**
   * Whether or not this object is currently in execute().
   */
  bool currentlyExecuting() const { return _currently_executing; }
  /**
   * Whether or not this object is between preExecute() and execute().
   */
  bool currentlyPreExecuting() const { return _currently_pre_executing; }

  /**
   * Gets the max buffer size.
   */
  unsigned int maxBufferSize() const { return _max_buffer_size; }
  /**
   * Gets the chunk size.
   */
  unsigned int chunkSize() const { return _chunk_size; }

  /**
   * Gets the number of iterations to wait before communicating.
   */
  unsigned int clicksPerCommunication() const { return _clicks_per_communication; }
  /**
   * Gets the number of iterations to wait before communicating with root.
   */
  unsigned int clicksPerRootCommunication() const { return _clicks_per_root_communication; }
  /**
   * Gets the number of iterations to wait before checking for new parallel data.
   */
  unsigned int clicksPerReceive() const { return _clicks_per_receive; }

  /**
   * Gets the method.
   */
  ParallelStudyMethod method() const { return _method; }

  /**
   * Reserve size entries in the work buffer.
   *
   * This can only be done during the pre-execution phase (between preExecute() and execute()).
   */
  void reserveBuffer(const std::size_t size);

protected:
  /**
   * Enum for providing useful errors during work addition in moveWorkError().
   */
  enum MoveWorkError
  {
    DURING_EXECUTION_DISABLED,
    PRE_EXECUTION_AND_EXECUTION_ONLY,
    PRE_EXECUTION_ONLY,
    PRE_EXECUTION_THREAD_0_ONLY,
    CONTINUING_DURING_EXECUTING_WORK
  };

  /**
   * Creates the work buffer.
   */
  virtual std::unique_ptr<MooseUtils::Buffer<WorkType>> createWorkBuffer();

  /**
   * Pure virtual to be overridden that executes a single object of work on a given thread.
   */
  virtual void executeWork(const WorkType & work, const THREAD_ID tid) = 0;

  /**
   * Virtual that allows for the customization of error text for moving work into the buffer.
   */
  virtual void moveWorkError(const MoveWorkError error, const WorkType * work = nullptr) const;

  /**
   * Insertion point for derived classes to provide an alternate ending criteria for
   * SMART execution.
   */
  virtual bool alternateSmartEndingCriteriaMet();

  /**
   * Insertion point for acting on work that was just executed.
   */
  virtual void postExecuteChunk(const work_iterator /* begin */, const work_iterator /* end */) {}

  /**
   * Insertion point called just after trying to receive work and just before beginning
   * work on the work buffer.
   */
  virtual void preReceiveAndExecute() {}

  /**
   * Pure virtual for acting on parallel data that has JUST been received and filled
   * into the buffer.
   */
  virtual void postReceiveParallelData(const parallel_data_iterator begin,
                                       const parallel_data_iterator end) = 0;

  /**
   * Can be overridden to denote if a piece of work is not complete yet.
   */
  virtual bool workIsComplete(const WorkType & /* work */) { return true; }

  /**
   * Moves work that is considered continuing for the purposes of the execution algorithm
   * into the buffer.
   */
  void moveContinuingWorkToBuffer(WorkType & work);
  void moveContinuingWorkToBuffer(const work_iterator begin, const work_iterator end);

  /**
   * Whether or not ALL of the buffers are empty: the working buffer, the threaded
   * temporary work, the receive buffer, and the send buffers.
   */
  bool buffersAreEmpty() const;

  /// This rank
  const processor_id_type _pid;
  /// Name for this object for use in error handling
  const std::string _name;
  /// The InputParameters
  const InputParameters & _params;
  /// The study method
  const ParallelStudyMethod _method;
  /// Whether or not this object has alternate ending criteria
  bool _has_alternate_ending_criteria;

private:
  /**
   * Flushes all parallel data out of the send buffers.
   */
  void flushSendBuffers();

  /**
   * Execute work using SMART.
   */
  void smartExecute();
  /**
   * Execute work using HARM.
   */
  void harmExecute();
  /**
   * Execute work using BS.
   */
  void bsExecute();

  /**
   * Receive packets of parallel data from other processors and executes work.
   */
  bool receiveAndExecute();

  /**
   * Execute a chunk of work and buffer.
   */
  void executeAndBuffer(const std::size_t chunk_size);

  /**
   * Internal check for if it is allowed to currently add work in moveWorkToBuffer().
   */
  void canMoveWorkCheck(const THREAD_ID tid);

  /**
   * Internal method for acting on the parallel data that has just been received into
   * the parallel buffer.
   */
  void postReceiveParallelDataInternal();

  /// Minimum size of a SendBuffer
  const unsigned int _min_buffer_size;
  /// Number of objects to buffer before communication
  const unsigned int _max_buffer_size;
  /// Multiplier for the buffer size for growing the buffer
  const Real _buffer_growth_multiplier;
  /// Multiplier for the buffer size for shrinking the buffer
  const Real _buffer_shrink_multiplier;
  /// Number of objects to execute at once during communication
  const unsigned int _chunk_size;
  /// Whether or not to allow the addition of new work to the buffer during execution
  const bool _allow_new_work_during_execution;

  /// Iterations to wait before communicating
  const unsigned int _clicks_per_communication;
  /// Iterations to wait before communicating with root
  const unsigned int _clicks_per_root_communication;
  /// Iterations to wait before checking for new objects
  const unsigned int _clicks_per_receive;

  /// MessageTag for sending parallel data
  Parallel::MessageTag _parallel_data_buffer_tag;
  /// Pools for re-using destructed parallel data objects (one for each thread)
  std::vector<MooseUtils::SharedPool<ParallelDataType>> _parallel_data_pools;
  /// Threaded temporary storage for work added while we're using the _work_buffer
  /// (one for each thread)
  std::vector<std::vector<WorkType>> _temp_threaded_work;
  /// Buffer for executing work
  const std::unique_ptr<MooseUtils::Buffer<WorkType>> _work_buffer;
  /// The receive buffer
  const std::unique_ptr<ReceiveBuffer<ParallelDataType, ParallelStudy<WorkType, ParallelDataType>>>
      _receive_buffer;
  /// Send buffers for each processor
  std::unordered_map<
      processor_id_type,
      std::unique_ptr<SendBuffer<ParallelDataType, ParallelStudy<WorkType, ParallelDataType>>>>
      _send_buffers;

  /// Number of chunks of work executed on this processor
  unsigned long long int _local_chunks_executed;
  /// Amount of work completed on this processor
  unsigned long long int _local_work_completed;
  /// Amount of work started on this processor
  unsigned long long int _local_work_started;
  /// Amount of work executed on this processor
  unsigned long long int _local_work_executed;
  /// Amount of work started on all processors
  unsigned long long int _total_work_started;
  /// Amount of work completed on all processors
  unsigned long long int _total_work_completed;

  /// Whether we are within execute()
  bool _currently_executing;
  /// Whether we are between preExecute() and execute()
  bool _currently_pre_executing;
  /// Whether or not we are currently within executeAndBuffer()
  bool _currently_executing_work;
};
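
// Example (a sketch, not part of this header): the smallest possible derived study.
// The types MyWork and MyData and the class MyStudy are hypothetical; a concrete study
// must implement at least the two pure virtuals, executeWork() and postReceiveParallelData().
//
// class MyStudy : public ParallelStudy<MyWork, MyData>
// {
// public:
//   MyStudy(const libMesh::Parallel::Communicator & comm, const InputParameters & params)
//     : ParallelStudy<MyWork, MyData>(comm, params, "MyStudy")
//   {
//   }
//
// protected:
//   // Required: execute a single piece of work on the given thread
//   void executeWork(const MyWork & work, const THREAD_ID tid) override
//   {
//     // ... act on 'work'; with 'allow_new_work_during_execution = true', new work
//     // may be added here via moveWorkToBuffer(new_work, tid) ...
//   }
//
//   // Required: act on just-received parallel data, e.g., convert it back into local work
//   void postReceiveParallelData(const parallel_data_iterator begin,
//                                const parallel_data_iterator end) override
//   {
//     // ... each entry in [begin, end) is a std::shared_ptr<MyData> ...
//   }
// };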

template <typename WorkType, typename ParallelDataType>
ParallelStudy<WorkType, ParallelDataType>::ParallelStudy(
    const libMesh::Parallel::Communicator & comm,
    const InputParameters & params,
    const std::string & name)
  : ParallelObject(comm),
    _pid(comm.rank()),
    _name(name),
    _params(params),

    _method((ParallelStudyMethod)(int)(params.get<MooseEnum>("method"))),
    _has_alternate_ending_criteria(false),
    _min_buffer_size(params.isParamSetByUser("min_buffer_size")
                         ? params.get<unsigned int>("min_buffer_size")
                         : params.get<unsigned int>("send_buffer_size")),
    _max_buffer_size(params.get<unsigned int>("send_buffer_size")),
    _buffer_growth_multiplier(params.get<Real>("buffer_growth_multiplier")),
    _buffer_shrink_multiplier(params.get<Real>("buffer_shrink_multiplier")),
    _chunk_size(params.get<unsigned int>("chunk_size")),
    _allow_new_work_during_execution(params.get<bool>("allow_new_work_during_execution")),

    _clicks_per_communication(params.get<unsigned int>("clicks_per_communication")),
    _clicks_per_root_communication(params.get<unsigned int>("clicks_per_root_communication")),
    _clicks_per_receive(params.get<unsigned int>("clicks_per_receive")),

    _parallel_data_buffer_tag(comm.get_unique_tag()),
    _parallel_data_pools(libMesh::n_threads()),
    _temp_threaded_work(libMesh::n_threads()),
    _work_buffer(createWorkBuffer()),
    _receive_buffer(std::make_unique<
                    ReceiveBuffer<ParallelDataType, ParallelStudy<WorkType, ParallelDataType>>>(
        comm, this, _method, _clicks_per_receive, _parallel_data_buffer_tag)),

    _currently_executing(false),
    _currently_pre_executing(false),
    _currently_executing_work(false)
{
#ifndef LIBMESH_HAVE_OPENMP
  if (libMesh::n_threads() != 1)
    mooseWarning(_name, ": Threading will not be used without OpenMP");
#endif

  if (_allow_new_work_during_execution && _method != ParallelStudyMethod::SMART)
    mooseError(_name,
               ": When allowing new work addition during execution\n",
               "('allow_new_work_during_execution = true'), the method must be SMART");
}

template <typename WorkType, typename ParallelDataType>
std::unique_ptr<MooseUtils::Buffer<WorkType>>
ParallelStudy<WorkType, ParallelDataType>::createWorkBuffer()
{
  std::unique_ptr<MooseUtils::Buffer<WorkType>> buffer;

  const auto buffer_type = _params.get<MooseEnum>("work_buffer_type");
  if (buffer_type == "lifo")
    buffer = std::make_unique<MooseUtils::LIFOBuffer<WorkType>>();
  else if (buffer_type == "circular")
    buffer = std::make_unique<MooseUtils::CircularBuffer<WorkType>>();
  else
    mooseError("Unknown work buffer type");

  return buffer;
}
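
// Note: the buffer type above is selected by the 'work_buffer_type' parameter declared in
// validParams() below. As the names suggest, a circular buffer hands out work roughly in
// the order it was added, while a LIFO buffer hands out the most recently added work first.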

template <typename WorkType, typename ParallelDataType>
InputParameters
ParallelStudy<WorkType, ParallelDataType>::validParams()
{
  auto params = emptyInputParameters();

  params.addRangeCheckedParam<unsigned int>(
      "send_buffer_size", 100, "send_buffer_size > 0", "The size of the send buffer");
  params.addRangeCheckedParam<unsigned int>(
      "chunk_size",
      100,
      "chunk_size > 0",
      "The number of objects to process at one time during execution");
  params.addRangeCheckedParam<unsigned int>("clicks_per_communication",
                                            10,
                                            "clicks_per_communication >= 0",
                                            "Iterations to wait before communicating");
  params.addRangeCheckedParam<unsigned int>("clicks_per_root_communication",
                                            10,
                                            "clicks_per_root_communication > 0",
                                            "Iterations to wait before communicating with root");
  params.addRangeCheckedParam<unsigned int>("clicks_per_receive",
                                            1,
                                            "clicks_per_receive > 0",
                                            "Iterations to wait before checking for new objects");

  params.addParam<unsigned int>("min_buffer_size",
                                "The initial size of the SendBuffer and the floor for shrinking "
                                "it. This defaults to send_buffer_size if not set (i.e. the "
                                "buffer won't change size)");
  params.addParam<Real>("buffer_growth_multiplier",
                        2.,
                        "How much to grow a SendBuffer by if the buffer completely fills and "
                        "dumps. Will max at send_buffer_size");
  params.addRangeCheckedParam<Real>("buffer_shrink_multiplier",
                                    0.5,
                                    "0 < buffer_shrink_multiplier <= 1.0",
                                    "Multiplier (between 0 and 1) to apply to the current buffer "
                                    "size if it is force dumped. Will stop at "
                                    "min_buffer_size.");

  params.addParam<bool>(
      "allow_new_work_during_execution",
      true,
      "Whether or not to allow the addition of new work to the work buffer during execution");

  MooseEnum methods("smart harm bs", "smart");
  params.addParam<MooseEnum>("method", methods, "The algorithm to use");

  MooseEnum work_buffers("lifo circular", "circular");
  params.addParam<MooseEnum>("work_buffer_type", work_buffers, "The work buffer type to use");

  params.addParamNamesToGroup(
      "send_buffer_size chunk_size clicks_per_communication clicks_per_root_communication "
      "clicks_per_receive min_buffer_size buffer_growth_multiplier buffer_shrink_multiplier method "
      "work_buffer_type allow_new_work_during_execution",
      "Advanced");

  return params;
}
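
// Example (a sketch): a derived study's validParams() would typically append these
// parameters to its own; MyStudy, MyWork, and MyData are hypothetical:
//
// InputParameters
// MyStudy::validParams()
// {
//   auto params = emptyInputParameters();
//   params += ParallelStudy<MyWork, MyData>::validParams();
//   // ... add study-specific parameters here ...
//   return params;
// }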

template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::executeAndBuffer(const std::size_t chunk_size)
{
  _currently_executing_work = true;

  // If chunk_size > the number of objects left, this will properly grab all of them
  const auto begin = _work_buffer->beginChunk(chunk_size);
  const auto end = _work_buffer->endChunk(chunk_size);

  _local_chunks_executed++;

#ifdef LIBMESH_HAVE_OPENMP
#pragma omp parallel
#endif
  {
    const THREAD_ID tid =
#ifdef LIBMESH_HAVE_OPENMP
        omp_get_thread_num();
#else
        0;
#endif

#ifdef LIBMESH_HAVE_OPENMP
#pragma omp for schedule(dynamic, 20) nowait
#endif
    for (auto it = begin; it < end; ++it)
      executeWork(*it, tid);
  }

  // Increment the executed and completed counters
  _local_work_executed += std::distance(begin, end);
  for (auto it = begin; it != end; ++it)
    if (workIsComplete(*it))
      ++_local_work_completed;

  // Insertion point for derived classes to do something with the completed work
  // Example: create ParallelData to spawn additional work on another processor
  postExecuteChunk(begin, end);

  // Remove the objects we just worked on from the buffer
  _work_buffer->eraseChunk(chunk_size);

  // If new work is allowed to be generated during execution, it goes into _temp_threaded_work
  // during the threaded execution phase and then must be moved into the working buffer
  if (_allow_new_work_during_execution)
  {
    // Amount of work that needs to be moved into the main working buffer from
    // the temporary working buffer
    std::size_t threaded_work_size = 0;
    for (const auto & work_objects : _temp_threaded_work)
      threaded_work_size += work_objects.size();

    if (threaded_work_size)
    {
      // We don't ever want to decrease the capacity, so only set it if we need more entries
      if (_work_buffer->capacity() < _work_buffer->size() + threaded_work_size)
        _work_buffer->setCapacity(_work_buffer->size() + threaded_work_size);

      // Move the work into the buffer
      for (auto & threaded_work_vector : _temp_threaded_work)
      {
        for (auto & work : threaded_work_vector)
          _work_buffer->move(work);
        threaded_work_vector.clear();
      }

      // Must be incremented when adding work so that the algorithm can keep count
      // of how much work still needs to be executed
      _local_work_started += threaded_work_size;
    }
  }

  if (_method == ParallelStudyMethod::HARM)
    flushSendBuffers();

  _currently_executing_work = false;
}

template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::moveParallelDataToBuffer(
    std::shared_ptr<ParallelDataType> & data, const processor_id_type dest_pid)
{
  mooseAssert(comm().size() > dest_pid, "Invalid processor ID");
  mooseAssert(_pid != dest_pid, "Processor ID is self");

  if (!_currently_executing && !_currently_pre_executing)
    mooseError(_name, ": Cannot moveParallelDataToBuffer() when not executing");

  // Get the send buffer for the proc this object is going to
  auto find_pair = _send_buffers.find(dest_pid);
  // Need to create a send buffer for said processor
  if (find_pair == _send_buffers.end())
    _send_buffers
        .emplace(dest_pid,
                 std::make_unique<
                     SendBuffer<ParallelDataType, ParallelStudy<WorkType, ParallelDataType>>>(
                     comm(),
                     this,
                     dest_pid,
                     _method,
                     _min_buffer_size,
                     _max_buffer_size,
                     _buffer_growth_multiplier,
                     _buffer_shrink_multiplier,
                     _parallel_data_buffer_tag))
        .first->second->moveObject(data);
  // Send buffer exists for this processor
  else
    find_pair->second->moveObject(data);
}
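
// Example (a sketch): the typical flow inside a derived study for communicating data to
// another rank; 'x' and 'dest_pid' are hypothetical, and ParallelDataType is assumed to be
// a MyData constructible from 'x'. The pooled pointer returned by acquireParallelData()
// converts to the std::shared_ptr taken here:
//
// std::shared_ptr<MyData> data = acquireParallelData(tid, x); // reuses a pooled MyData
// moveParallelDataToBuffer(data, dest_pid);                   // buffered, non-blocking send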

template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::flushSendBuffers()
{
  for (auto & send_buffer_iter : _send_buffers)
    send_buffer_iter.second->forceSend();
}

template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::reserveBuffer(const std::size_t size)
{
  if (!_currently_pre_executing)
    mooseError(_name, ": Can only reserve in the work buffer during pre-execution");

  // We don't ever want to decrease the capacity, so only set if we need more entries
  if (_work_buffer->capacity() < size)
    _work_buffer->setCapacity(size);
}

template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::postReceiveParallelDataInternal()
{
  if (_receive_buffer->buffer().empty())
    return;

  // Let derived classes work on the data and then clear it after
  postReceiveParallelData(_receive_buffer->buffer().begin(), _receive_buffer->buffer().end());
  for (auto & data : _receive_buffer->buffer())
    if (data)
      data.reset();

  _receive_buffer->buffer().clear();
}

template <typename WorkType, typename ParallelDataType>
bool
ParallelStudy<WorkType, ParallelDataType>::receiveAndExecute()
{
  bool executed_some = false;

  if (_receive_buffer->currentlyReceiving() && _method == ParallelStudyMethod::SMART)
    _receive_buffer->cleanupRequests();
  else
    _receive_buffer->receive();

  postReceiveParallelDataInternal();

  preReceiveAndExecute();

  while (!_work_buffer->empty())
  {
    executed_some = true;

    // Switch between executing a chunk and buffering with SMART
    if (_method == ParallelStudyMethod::SMART)
    {
      // Look for extra work first so that these transfers can be finishing while we're
      // executing. Only start the receives if our work buffer is decently sized.
      const bool start_receives_only = _work_buffer->size() > (2 * _chunk_size);
      _receive_buffer->receive(start_receives_only);
      if (!start_receives_only)
        postReceiveParallelDataInternal();

      // Execute some objects
      executeAndBuffer(_chunk_size);
    }
    // Execute all of them and then buffer with the other methods
    else
      executeAndBuffer(_work_buffer->size());
  }

  return executed_some;
}

template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::smartExecute()
{
  mooseAssert(_method == ParallelStudyMethod::SMART, "Should be called with SMART only");

  // Request for the sum of the started work
  Parallel::Request started_request;
  // Request for the sum of the completed work
  Parallel::Request completed_request;

  // Temp for use in sending the current value in a nonblocking sum instead of an updated value
  unsigned long long int temp;

  // Whether to make the started request first, or after every completed request.
  // When allowing the addition of new work during the execution phase, the started work
  // counts can change during execution, so we must update them after each completed request
  // finishes. When not allowing generation during propagation, we know the counts up front.
  const bool started_request_first = !_allow_new_work_during_execution;

  // Get the amount of work that was started in the whole domain, if applicable
  if (started_request_first)
    comm().sum(_local_work_started, _total_work_started, started_request);

  // Whether or not the started request has been made
  bool made_started_request = started_request_first;
  // Whether or not the completed request has been made
  bool made_completed_request = false;

  // Good time to get rid of whatever's currently in our SendBuffers
  flushSendBuffers();

  // Use these to try to delay some forced communication
  unsigned int non_executing_clicks = 0;
  unsigned int non_executing_root_clicks = 0;
  bool executed_some = true;

  // Keep executing work until it has all completed
  while (true)
  {
    executed_some = receiveAndExecute();

    if (executed_some)
    {
      non_executing_clicks = 0;
      non_executing_root_clicks = 0;
    }
    else
    {
      non_executing_clicks++;
      non_executing_root_clicks++;
    }

    if (non_executing_clicks >= _clicks_per_communication)
    {
      non_executing_clicks = 0;

      flushSendBuffers();
    }

    if (_has_alternate_ending_criteria)
    {
      if (buffersAreEmpty() && alternateSmartEndingCriteriaMet())
      {
        comm().barrier();
        return;
      }
    }
    else if (non_executing_root_clicks >= _clicks_per_root_communication)
    {
      non_executing_root_clicks = 0;

      // We need the started work sum first but said request isn't complete yet
      if (started_request_first && !started_request.test())
        continue;

      // At this point, we need to make a request for the completed work sum
      if (!made_completed_request)
      {
        made_completed_request = true;
        temp = _local_work_completed;
        comm().sum(temp, _total_work_completed, completed_request);
        continue;
      }

      // We have the completed work sum
      if (completed_request.test())
      {
        // The started work sum must be requested /after/ we have the completed counts,
        // and we still need to make the request for said sum
        if (!made_started_request)
        {
          made_started_request = true;
          temp = _local_work_started;
          comm().sum(temp, _total_work_started, started_request);
          continue;
        }

        // The started work sum must be requested /after/ we have the completed sum,
        // and we don't have the started sum yet
        if (!started_request_first && !started_request.test())
          continue;

        // Started count is the same as the completed count - we're done!
        if (_total_work_started == _total_work_completed)
          return;

        // Next time around we should make a completed sum request
        made_completed_request = false;
        // If we need the started work sum after the completed work sum, we need it again as well
        if (!started_request_first)
          made_started_request = false;
      }
    }
  }
}

template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::harmExecute()
{
  if (_has_alternate_ending_criteria)
    mooseError("ParallelStudy: Alternate ending criteria not yet supported for HARM");
  if (_allow_new_work_during_execution)
    mooseError(_name, ": The addition of new work during execution is not supported by HARM");
  mooseAssert(_method == ParallelStudyMethod::HARM, "Should be called with HARM only");

  // Request for the total amount of work started
  Parallel::Request work_started_request;
  // Requests for sending the amount of finished work to every other processor
  std::vector<Parallel::Request> work_completed_requests(comm().size());
  // Whether or not the finished requests have been sent to each processor
  std::vector<bool> work_completed_requests_sent(comm().size(), false);
  // Values of work completed on this processor that are being sent to other processors
  std::vector<unsigned long long int> work_completed_requests_temps(comm().size(), 0);
  // Work completed by each processor
  std::vector<unsigned long long int> work_completed_per_proc(comm().size(), 0);
  // Tag for sending work finished
  const auto work_completed_requests_tag = comm().get_unique_tag();

  // Get the amount of work that was started in the whole domain
  comm().sum(_local_work_started, _total_work_started, work_started_request);

  // All work has been executed, so time to communicate
  flushSendBuffers();

  // HARM only does some communication based on times through the loop.
  // This counter will be used for that.
  unsigned int communication_clicks = 0;

  Parallel::Status work_completed_probe_status;
  int work_completed_probe_flag;

  // Keep working until done
  while (true)
  {
    receiveAndExecute();

    flushSendBuffers();

    if (communication_clicks > comm().size())
    {
      // Receive messages about work being finished
      do
      {
        MPI_Iprobe(MPI_ANY_SOURCE,
                   work_completed_requests_tag.value(),
                   comm().get(),
                   &work_completed_probe_flag,
                   work_completed_probe_status.get());

        if (work_completed_probe_flag)
        {
          auto proc = work_completed_probe_status.source();
          comm().receive(proc, work_completed_per_proc[proc], work_completed_requests_tag);
        }
      } while (work_completed_probe_flag);

      _total_work_completed = std::accumulate(
          work_completed_per_proc.begin(), work_completed_per_proc.end(), _local_work_completed);

      // Reset
      communication_clicks = 0;
    }

    // Send messages about objects being finished
    for (processor_id_type pid = 0; pid < comm().size(); ++pid)
      if (pid != _pid &&
          (!work_completed_requests_sent[pid] || work_completed_requests[pid].test()) &&
          _local_work_completed > work_completed_requests_temps[pid])
      {
        work_completed_requests_temps[pid] = _local_work_completed;
        comm().send(pid,
                    work_completed_requests_temps[pid],
                    work_completed_requests[pid],
                    work_completed_requests_tag);
        work_completed_requests_sent[pid] = true;
      }

    // All procs agree on the amount of work started and we've finished all the work started
    if (work_started_request.test() && _total_work_started == _total_work_completed)
    {
      // Need to call the post wait work for all of the requests
      for (processor_id_type pid = 0; pid < comm().size(); ++pid)
        if (pid != _pid)
          work_completed_requests[pid].wait();

      return;
    }

    communication_clicks++;
  }
}

template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::bsExecute()
{
  if (_has_alternate_ending_criteria)
    mooseError("ParallelStudy: Alternate ending criteria not yet supported for BS");
  if (_allow_new_work_during_execution)
    mooseError(_name, ": The addition of new work during execution is not supported by BS");
  mooseAssert(_method == ParallelStudyMethod::BS, "Should be called with BS only");

  // Request for the total amount of work started
  Parallel::Request work_started_request;
  // Request for the total amount of work completed
  Parallel::Request work_completed_request;

  // Temp for use in sending the current value in a nonblocking sum instead of an updated value
  unsigned long long int temp;

  // Get the amount of work that was started in the whole domain
  comm().sum(_local_work_started, _total_work_started, work_started_request);

  // Keep working until done
  while (true)
  {
    bool receiving = false;
    bool sending = false;

    Parallel::Request some_left_request;
    unsigned int some_left = 0;
    unsigned int all_some_left = 1;

    do
    {
      _receive_buffer->receive();
      postReceiveParallelDataInternal();
      flushSendBuffers();

      receiving = _receive_buffer->currentlyReceiving();

      sending = false;
      for (auto & send_buffer : _send_buffers)
        sending = sending || send_buffer.second->currentlySending() ||
                  send_buffer.second->currentlyBuffered();

      if (!receiving && !sending && some_left_request.test() && all_some_left)
      {
        some_left = receiving || sending;
        comm().sum(some_left, all_some_left, some_left_request);
      }
    } while (receiving || sending || !some_left_request.test() || all_some_left);

    executeAndBuffer(_work_buffer->size());

    comm().barrier();

    if (work_started_request.test() && work_completed_request.test())
    {
      if (_total_work_started == _total_work_completed)
        return;

      temp = _local_work_completed;
      comm().sum(temp, _total_work_completed, work_completed_request);
    }
  }
}

template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::preExecute()
{
  if (!buffersAreEmpty())
    mooseError(_name, ": Buffers are not empty in preExecute()");

  // Clear communication buffers
  for (auto & send_buffer_pair : _send_buffers)
    send_buffer_pair.second->clear();
  _send_buffers.clear();
  _receive_buffer->clear();

  // Clear counters
  _local_chunks_executed = 0;
  _local_work_completed = 0;
  _local_work_started = 0;
  _local_work_executed = 0;
  _total_work_started = 0;
  _total_work_completed = 0;

  _currently_pre_executing = true;
}

template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::execute()
{
  if (!_currently_pre_executing)
    mooseError(_name, ": preExecute() was not called before execute()");

  _currently_pre_executing = false;
  _currently_executing = true;

  switch (_method)
  {
    case ParallelStudyMethod::SMART:
      smartExecute();
      break;
    case ParallelStudyMethod::HARM:
      harmExecute();
      break;
    case ParallelStudyMethod::BS:
      bsExecute();
      break;
    default:
      mooseError("Unknown ParallelStudyMethod");
  }

  _currently_executing = false;

  // Sanity checks on if we're really done
  comm().barrier();

  if (!buffersAreEmpty())
    mooseError(_name, ": Buffers are not empty after execution");
}

template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::moveWorkError(
    const MoveWorkError error, const WorkType * /* work = nullptr */) const
{
  if (error == MoveWorkError::DURING_EXECUTION_DISABLED)
    mooseError(_name,
               ": The moving of new work into the buffer during work execution requires\n",
               "that the parameter 'allow_new_work_during_execution = true'");
  if (error == MoveWorkError::PRE_EXECUTION_AND_EXECUTION_ONLY)
    mooseError(
        _name,
        ": Can only move work into the buffer in the pre-execution and execution phase\n(between "
        "preExecute() and the end of execute())");
  if (error == MoveWorkError::PRE_EXECUTION_ONLY)
    mooseError(_name,
               ": Can only move work into the buffer in the pre-execution phase\n(between "
               "preExecute() and execute())");
  if (error == MoveWorkError::PRE_EXECUTION_THREAD_0_ONLY)
    mooseError(_name,
               ": Can only move work into the buffer in the pre-execution phase\n(between "
               "preExecute() and execute()) on thread 0");
  if (error == MoveWorkError::CONTINUING_DURING_EXECUTING_WORK)
    mooseError(_name, ": Cannot move continuing work into the buffer during executeAndBuffer()");

  mooseError("Unknown MoveWorkError");
}

template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::canMoveWorkCheck(const THREAD_ID tid)
{
  if (_currently_executing)
  {
    if (!_allow_new_work_during_execution)
      moveWorkError(MoveWorkError::DURING_EXECUTION_DISABLED);
  }
  else if (!_currently_pre_executing)
  {
    if (_allow_new_work_during_execution)
      moveWorkError(MoveWorkError::PRE_EXECUTION_AND_EXECUTION_ONLY);
    else
      moveWorkError(MoveWorkError::PRE_EXECUTION_ONLY);
  }
  else if (tid != 0)
    moveWorkError(MoveWorkError::PRE_EXECUTION_THREAD_0_ONLY);
}

template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::moveWorkToBuffer(WorkType & work, const THREAD_ID tid)
{
  // Error checks for moving work into the buffer at unallowed times
  canMoveWorkCheck(tid);

  // Can move directly into the work buffer on thread 0 when we're not executing work
  if (!_currently_executing_work && tid == 0)
  {
    ++_local_work_started; // must ALWAYS increment when adding new work to the working buffer
    _work_buffer->move(work);
  }
  // Objects added during execution go into a temporary threaded vector (which is thread safe)
  // to be moved into the working buffer when possible
  else
    _temp_threaded_work[tid].emplace_back(std::move(work));
}

template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::moveWorkToBuffer(const work_iterator begin,
                                                            const work_iterator end,
                                                            const THREAD_ID tid)
{
  // Error checks for moving work into the buffer at unallowed times
  canMoveWorkCheck(tid);

  // Get work size beforehand so we can resize
  const auto size = std::distance(begin, end);

  // Can move directly into the work buffer on thread 0 when we're not executing work
  if (!_currently_executing_work && tid == 0)
  {
    if (_work_buffer->capacity() < _work_buffer->size() + size)
      _work_buffer->setCapacity(_work_buffer->size() + size);
    _local_work_started += size;
  }
  else
    _temp_threaded_work[tid].reserve(_temp_threaded_work[tid].size() + size);

  // Move the objects
  if (!_currently_executing_work && tid == 0)
    for (auto it = begin; it != end; ++it)
      _work_buffer->move(*it);
  else
    for (auto it = begin; it != end; ++it)
      _temp_threaded_work[tid].emplace_back(std::move(*it));
}

template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::moveWorkToBuffer(std::vector<WorkType> & work_vector,
                                                            const THREAD_ID tid)
{
  moveWorkToBuffer(work_vector.begin(), work_vector.end(), tid);
}

template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::moveContinuingWorkToBuffer(WorkType & work)
{
  if (_currently_executing_work)
    moveWorkError(MoveWorkError::CONTINUING_DURING_EXECUTING_WORK);

  _work_buffer->move(work);
}

template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::moveContinuingWorkToBuffer(const work_iterator begin,
                                                                      const work_iterator end)
{
  if (_currently_executing_work)
    moveWorkError(MoveWorkError::CONTINUING_DURING_EXECUTING_WORK);

  const auto size = std::distance(begin, end);
  if (_work_buffer->capacity() < _work_buffer->size() + size)
    _work_buffer->setCapacity(_work_buffer->size() + size);

  for (auto it = begin; it != end; ++it)
    _work_buffer->move(*it);
}

template <typename WorkType, typename ParallelDataType>
unsigned long long int
ParallelStudy<WorkType, ParallelDataType>::sendBufferPoolCreated() const
{
  unsigned long long int total = 0;

  for (const auto & buffer : _send_buffers)
    total += buffer.second->bufferPoolCreated();

  return total;
}

template <typename WorkType, typename ParallelDataType>
unsigned long long int
ParallelStudy<WorkType, ParallelDataType>::parallelDataSent() const
{
  unsigned long long int total_sent = 0;

  for (const auto & buffer : _send_buffers)
    total_sent += buffer.second->objectsSent();

  return total_sent;
}

template <typename WorkType, typename ParallelDataType>
unsigned long long int
ParallelStudy<WorkType, ParallelDataType>::buffersSent() const
{
  unsigned long long int total_sent = 0;

  for (const auto & buffer : _send_buffers)
    total_sent += buffer.second->buffersSent();

  return total_sent;
}

template <typename WorkType, typename ParallelDataType>
unsigned long long int
ParallelStudy<WorkType, ParallelDataType>::poolParallelDataCreated() const
{
  unsigned long long int num_created = 0;

  for (const auto & pool : _parallel_data_pools)
    num_created += pool.num_created();

  return num_created;
}

template <typename WorkType, typename ParallelDataType>
bool
ParallelStudy<WorkType, ParallelDataType>::alternateSmartEndingCriteriaMet()
{
  mooseError(_name, ": Unimplemented alternateSmartEndingCriteriaMet()");
}

template <typename WorkType, typename ParallelDataType>
bool
ParallelStudy<WorkType, ParallelDataType>::buffersAreEmpty() const
{
  if (!_work_buffer->empty())
    return false;
  for (const auto & threaded_buffer : _temp_threaded_work)
    if (!threaded_buffer.empty())
      return false;
  if (_receive_buffer->currentlyReceiving())
    return false;
  for (const auto & map_pair : _send_buffers)
    if (map_pair.second->currentlySending() || map_pair.second->currentlyBuffered())
      return false;

  return true;
}
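
// Example (a sketch): the required calling sequence for any study; 'study' and 'work'
// are hypothetical:
//
// study.preExecute();               // must be called first; clears buffers and counters
// study.moveWorkToBuffer(work, 0);  // seed initial work on thread 0
// study.execute();                  // runs SMART/HARM/BS until all started work completes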