LCOV - code coverage report
Current view: top level - include/utils - ParallelStudy.h (source / functions) Hit Total Coverage
Test: idaholab/moose ray_tracing: #31405 (292dce) with base fef103 Lines: 309 348 88.8 %
Date: 2025-09-04 07:56:07 Functions: 24 28 85.7 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : //* This file is part of the MOOSE framework
       2             : //* https://mooseframework.inl.gov
       3             : //*
       4             : //* All rights reserved, see COPYRIGHT for full restrictions
       5             : //* https://github.com/idaholab/moose/blob/master/COPYRIGHT
       6             : //*
       7             : //* Licensed under LGPL 2.1, please see LICENSE for details
       8             : //* https://www.gnu.org/licenses/lgpl-2.1.html
       9             : 
      10             : #pragma once
      11             : 
      12             : #include "ParallelStudyMethod.h"
      13             : 
      14             : // MOOSE includes
      15             : #include "InputParameters.h"
      16             : #include "SharedPool.h"
      17             : #include "MooseEnum.h"
      18             : #include "CircularBuffer.h"
      19             : #include "LIFOBuffer.h"
      20             : 
      21             : // Local includes
      22             : #include "SendBuffer.h"
      23             : #include "ReceiveBuffer.h"
      24             : 
      25             : // libMesh Includes
      26             : #include "libmesh/parallel_object.h"
      27             : 
      28             : template <typename WorkType, typename ParallelDataType>
      29             : class ParallelStudy : public libMesh::ParallelObject
      30             : {
      31             : public:
      32             :   typedef typename MooseUtils::Buffer<WorkType>::iterator work_iterator;
      33             :   typedef typename MooseUtils::Buffer<std::shared_ptr<ParallelDataType>>::iterator
      34             :       parallel_data_iterator;
      35             : 
      36             :   static InputParameters validParams();
      37             : 
      38             :   ParallelStudy(const libMesh::Parallel::Communicator & comm,
      39             :                 const InputParameters & params,
      40             :                 const std::string & name);
      41             : 
      42             :   /**
      43             :    * Pre-execute method that MUST be called before execute() and before adding work.
      44             :    */
      45             :   void preExecute();
      46             :   /**
      47             :    * Execute method.
      48             :    */
      49             :   void execute();
      50             : 
      51             :   /**
      52             :    * Adds work to the buffer to be executed. This will move the work into the buffer
      53             :    * (with std::move), therefore the passed in work will be invalid after this call.
      54             :    * For the purposes of the completion algorithm, this added work is considered
      55             :    * NEW work.
      56             :    *
      57             :    * During pre-execution (between preExecute() and execute()), this method can ONLY
      58             :    * be called on thread 0.
      59             :    *
      60             :    * During execute(), this method is thread safe and can be used to add work during execution.
      61             :    */
      62             :   ///@{
      63             :   void moveWorkToBuffer(WorkType & work, const THREAD_ID tid);
      64             :   void moveWorkToBuffer(const work_iterator begin, const work_iterator end, const THREAD_ID tid);
      65             :   void moveWorkToBuffer(std::vector<WorkType> & work, const THREAD_ID tid);
      66             :   ///@}
      67             : 
      68             :   /**
      69             :    * Acquire a parallel data object from the pool.
      70             :    */
      71             :   template <typename... Args>
      72             :   typename MooseUtils::SharedPool<ParallelDataType>::PtrType
      73             :   acquireParallelData(const THREAD_ID tid, Args &&... args)
      74             :   {
      75    12018600 :     return _parallel_data_pools[tid].acquire(std::forward<Args>(args)...);
      76             :   }
      77             : 
      78             :   /**
      79             :    * Moves parallel data objects to the send buffer to be communicated to processor \p dest_pid.
      80             :    */
      81             :   void moveParallelDataToBuffer(std::shared_ptr<ParallelDataType> & data,
      82             :                                 const processor_id_type dest_pid);
      83             : 
      84             :   /**
      85             :    * Gets the receive buffer.
      86             :    */
      87             :   const ReceiveBuffer<ParallelDataType, ParallelStudy<WorkType, ParallelDataType>> &
      88             :   receiveBuffer() const
      89             :   {
      90             :     return *_receive_buffer;
      91             :   }
      92             : 
      93             :   /**
      94             :    * Gets the work buffer.
      95             :    */
      96             :   const MooseUtils::Buffer<WorkType> & workBuffer() const { return *_work_buffer; }
      97             : 
      98             :   /**
      99             :    * Gets the total number of send buffer pools created.
     100             :    */
     101             :   unsigned long long int sendBufferPoolCreated() const;
     102             :   /**
     103             :    * Gets the total number of parallel data objects sent from this processor.
     104             :    */
     105             :   unsigned long long int parallelDataSent() const;
     106             :   /**
     107             :    * Gets the total number of buffers sent from this processor.
     108             :    */
     109             :   unsigned long long int buffersSent() const;
     110             :   /**
     111             :    * Gets the total number of parallel data created in all of the threaded pools
     112             :    */
     113             :   unsigned long long int poolParallelDataCreated() const;
     114             : 
     115             :   /**
     116             :    * Gets the total amount of work started from this processor.
     117             :    */
     118          14 :   unsigned long long int localWorkStarted() const { return _local_work_started; }
     119             :   /**
     120             :    * Gets the total amount of work executed on this processor.
     121             :    */
     122          14 :   unsigned long long int localWorkExecuted() const { return _local_work_executed; }
     123             :   /**
     124             :    * Gets the total amount of work completed across all processors.
     125             :    */
     126         284 :   unsigned long long int totalWorkCompleted() const { return _total_work_completed; }
     127             :   /**
     128             :    * Gets the total number of chunks of work executed on this processor.
     129             :    */
     130          14 :   unsigned long long int localChunksExecuted() const { return _local_chunks_executed; }
     131             : 
     132             :   /**
     133             :    * Whether or not this object is currently in execute().
     134             :    */
     135        1104 :   bool currentlyExecuting() const { return _currently_executing; }
     136             :   /**
     137             :    * Whether or not this object is between preExecute() and execute().
     138             :    */
     139        9625 :   bool currentlyPreExecuting() const { return _currently_pre_executing; }
     140             : 
     141             :   /**
     142             :    * Gets the max buffer size
     143             :    */
     144             :   unsigned int maxBufferSize() const { return _max_buffer_size; }
     145             :   /**
     146             :    * Gets the chunk size
     147             :    */
     148             :   unsigned int chunkSize() const { return _chunk_size; }
     149             : 
     150             :   /**
     151             :    * Gets the number of iterations to wait before communicating
     152             :    */
     153             :   unsigned int clicksPerCommunication() const { return _clicks_per_communication; }
     154             :   /**
     155             :    * Gets the number of iterations to wait before communicating with root
     156             :    */
     157             :   unsigned int clicksPerRootCommunication() const { return _clicks_per_root_communication; }
     158             :   /**
     159             :    * Gets the number of iterations to wait before checking for new parallel data
     160             :    */
     161             :   unsigned int clicksPerReceive() const { return _clicks_per_receive; }
     162             : 
     163             :   /**
     164             :    * Gets the method
     165             :    */
     166             :   ParallelStudyMethod method() const { return _method; }
     167             : 
     168             :   /**
     169             :    * Reserve \p size entries in the work buffer.
     170             :    *
     171             :    * This can only be used during the pre-execution phase (between preExecute() and execute()).
     172             :    *
     173             :    * This is particularly useful when one wants to move many work objects into the buffer using
     174             :    * moveWorkToBuffer() and wants to allocate the space ahead of time.
     175             :    */
     176             :   void reserveBuffer(const std::size_t size);
     177             : 
     178             : protected:
     179             :   /**
     180             :    * Enum for providing useful errors during work addition in moveWorkError().
     181             :    */
     182             :   enum MoveWorkError
     183             :   {
     184             :     DURING_EXECUTION_DISABLED,
     185             :     PRE_EXECUTION_AND_EXECUTION_ONLY,
     186             :     PRE_EXECUTION_ONLY,
     187             :     PRE_EXECUTION_THREAD_0_ONLY,
     188             :     CONTINUING_DURING_EXECUTING_WORK
     189             :   };
     190             : 
     191             :   /**
     192             :    * Creates the work buffer
     193             :    *
     194             :    * This is virtual so that derived classes can use their own specialized buffers
     195             :    */
     196             :   virtual std::unique_ptr<MooseUtils::Buffer<WorkType>> createWorkBuffer();
     197             : 
     198             :   /**
     199             :    * Pure virtual to be overridden that executes a single object of work on a given thread
     200             :    */
     201             :   virtual void executeWork(const WorkType & work, const THREAD_ID tid) = 0;
     202             : 
     203             :   /**
     204             :    * Virtual that allows for the customization of error text for moving work into the buffer.
     205             :    */
     206             :   virtual void moveWorkError(const MoveWorkError error, const WorkType * work = nullptr) const;
     207             : 
     208             :   /**
     209             :    * Insertion point for derived classes to provide an alternate ending criteria for
     210             :    * SMART execution. Only called when _has_alternate_ending_criteria == true.
     211             :    */
     212             :   virtual bool alternateSmartEndingCriteriaMet();
     213             : 
     214             :   /**
     215             :    * Insertion point for acting on work that was just executed.
     216             :    *
     217             :    * This is not called in threads.
     218             :    */
     219           0 :   virtual void postExecuteChunk(const work_iterator /* begin */, const work_iterator /* end */) {}
     220             : 
     221             :   /**
     222             :    * Insertion point called just after trying to receive work and just before beginning
     223             :    * work on the work buffer
     224             :    */
     225     4403306 :   virtual void preReceiveAndExecute() {}
     226             : 
     227             :   /**
     228             :    * Pure virtual for acting on parallel data that has JUST been received and
     229             :    * filled into the buffer.
     230             :    *
     231             :    * The parallel data in the range passed here will have its use count reduced
     232             :    * by one if it still exists after this call.
     233             :    */
     234             :   virtual void postReceiveParallelData(const parallel_data_iterator begin,
     235             :                                        const parallel_data_iterator end) = 0;
     236             : 
     237             :   /**
     238             :    * Can be overridden to denote if a piece of work is not complete yet.
     239             :    *
     240             :    * The complete terminology is used within the execution algorithms to
     241             :    * determine if the study is complete.
     242             :    */
     243           0 :   virtual bool workIsComplete(const WorkType & /* work */) { return true; }
     244             : 
     245             :   /**
     246             :    * Moves work that is considered continuing for the purposes of the execution
     247             :    * algorithm into the buffer.
     248             :    */
     249             :   ///@{
     250             :   void moveContinuingWorkToBuffer(WorkType & Work);
     251             :   void moveContinuingWorkToBuffer(const work_iterator begin, const work_iterator end);
     252             :   ///@}
     253             : 
     254             :   /**
     255             :    * Whether or not ALL of the buffers are empty:
     256             :    * Working buffer, threaded buffers, receive buffer, and send buffers.
     257             :    */
     258             :   bool buffersAreEmpty() const;
     259             : 
     260             :   /// This rank
     261             :   const processor_id_type _pid;
     262             :   /// Name for this object for use in error handling
     263             :   const std::string _name;
     264             :   /// The InputParameters
     265             :   const InputParameters & _params;
     266             :   /// The study method
     267             :   const ParallelStudyMethod _method;
     268             :   /// Whether or not this object has alternate ending criteria
     269             :   bool _has_alternate_ending_criteria;
     270             : 
     271             : private:
     272             :   /**
     273             :    * Flushes all parallel data out of the send buffers
     274             :    */
     275             :   void flushSendBuffers();
     276             : 
     277             :   /**
     278             :    * Execute work using SMART
     279             :    */
     280             :   void smartExecute();
     281             :   /**
     282             :    * Execute work using HARM
     283             :    */
     284             :   void harmExecute();
     285             :   /**
     286             :    * Execute work using BS
     287             :    */
     288             :   void bsExecute();
     289             : 
     290             :   /**
     291             :    * Receives packets of parallel data from other processors and executes work
     292             :    */
     293             :   bool receiveAndExecute();
     294             : 
     295             :   /**
     296             :    * Execute a chunk of work and buffer
     297             :    */
     298             :   void executeAndBuffer(const std::size_t chunk_size);
     299             : 
     300             :   /**
     301             :    * Internal check for if it is allowed to currently add work in moveWorkToBuffer().
     302             :    */
     303             :   void canMoveWorkCheck(const THREAD_ID tid);
     304             : 
     305             :   /**
     306             :    * Internal method for acting on the parallel data that has just been received into
     307             :    * the parallel buffer
     308             :    */
     309             :   void postReceiveParallelDataInternal();
     310             : 
     311             :   /// Minimum size of a SendBuffer
     312             :   const unsigned int _min_buffer_size;
     313             :   /// Number of objects to buffer before communication
     314             :   const unsigned int _max_buffer_size;
     315             :   /// Multiplier for the buffer size for growing the buffer
     316             :   const Real _buffer_growth_multiplier;
     317             :   /// Multiplier for the buffer size for shrinking the buffer
     318             :   const Real _buffer_shrink_multiplier;
     319             :   /// Number of objects to execute at once during communication
     320             :   const unsigned int _chunk_size;
     321             :   /// Whether or not to allow the addition of new work to the buffer during execution
     322             :   const bool _allow_new_work_during_execution;
     323             : 
     324             :   /// Iterations to wait before communicating
     325             :   const unsigned int _clicks_per_communication;
     326             :   /// Iterations to wait before communicating with root
     327             :   const unsigned int _clicks_per_root_communication;
     328             :   /// Iterations to wait before checking for new objects
     329             :   const unsigned int _clicks_per_receive;
     330             : 
     331             :   /// MessageTag for sending parallel data
     332             :   Parallel::MessageTag _parallel_data_buffer_tag;
     333             :   /// Pools for re-using destructed parallel data objects (one for each thread)
     334             :   std::vector<MooseUtils::SharedPool<ParallelDataType>> _parallel_data_pools;
     335             :   /// Threaded temporary storage for work added while we're using the _work_buffer (one for each thread)
     336             :   std::vector<std::vector<WorkType>> _temp_threaded_work;
     337             :   /// Buffer for executing work
     338             :   const std::unique_ptr<MooseUtils::Buffer<WorkType>> _work_buffer;
     339             :   /// The receive buffer
     340             :   const std::unique_ptr<ReceiveBuffer<ParallelDataType, ParallelStudy<WorkType, ParallelDataType>>>
     341             :       _receive_buffer;
     342             :   /// Send buffers for each processor
     343             :   std::unordered_map<
     344             :       processor_id_type,
     345             :       std::unique_ptr<SendBuffer<ParallelDataType, ParallelStudy<WorkType, ParallelDataType>>>>
     346             :       _send_buffers;
     347             : 
     348             :   /// Number of chunks of work executed on this processor
     349             :   unsigned long long int _local_chunks_executed;
     350             :   /// Amount of work completed on this processor
     351             :   unsigned long long int _local_work_completed;
     352             :   /// Amount of work started on this processor
     353             :   unsigned long long int _local_work_started;
     354             :   /// Amount of work executed on this processor
     355             :   unsigned long long int _local_work_executed;
     356             :   /// Amount of work started on all processors
     357             :   unsigned long long int _total_work_started;
     358             :   /// Amount of work completed on all processors
     359             :   unsigned long long int _total_work_completed;
     360             : 
     361             :   /// Whether we are within execute()
     362             :   bool _currently_executing;
     363             :   /// Whether we are between preExecute() and execute()
     364             :   bool _currently_pre_executing;
     365             :   /// Whether or not we are currently within executeAndBuffer()
     366             :   bool _currently_executing_work;
     367             : };
     368             : 
     369             : template <typename WorkType, typename ParallelDataType>
     370        3827 : ParallelStudy<WorkType, ParallelDataType>::ParallelStudy(
     371             :     const libMesh::Parallel::Communicator & comm,
     372             :     const InputParameters & params,
     373             :     const std::string & name)
     374             :   : ParallelObject(comm),
     375        3827 :     _pid(comm.rank()),
     376        3827 :     _name(name),
     377        3827 :     _params(params),
     378             : 
     379        3827 :     _method((ParallelStudyMethod)(int)(params.get<MooseEnum>("method"))),
     380        3827 :     _has_alternate_ending_criteria(false),
     381        7654 :     _min_buffer_size(params.isParamSetByUser("min_buffer_size")
     382        7654 :                          ? params.get<unsigned int>("min_buffer_size")
     383        7654 :                          : params.get<unsigned int>("send_buffer_size")),
     384        3827 :     _max_buffer_size(params.get<unsigned int>("send_buffer_size")),
     385        3827 :     _buffer_growth_multiplier(params.get<Real>("buffer_growth_multiplier")),
     386        3827 :     _buffer_shrink_multiplier(params.get<Real>("buffer_shrink_multiplier")),
     387        3827 :     _chunk_size(params.get<unsigned int>("chunk_size")),
     388        3827 :     _allow_new_work_during_execution(params.get<bool>("allow_new_work_during_execution")),
     389             : 
     390        3827 :     _clicks_per_communication(params.get<unsigned int>("clicks_per_communication")),
     391        3827 :     _clicks_per_root_communication(params.get<unsigned int>("clicks_per_root_communication")),
     392        3827 :     _clicks_per_receive(params.get<unsigned int>("clicks_per_receive")),
     393             : 
     394        3827 :     _parallel_data_buffer_tag(comm.get_unique_tag()),
     395        3827 :     _parallel_data_pools(libMesh::n_threads()),
     396        3827 :     _temp_threaded_work(libMesh::n_threads()),
     397        3827 :     _work_buffer(createWorkBuffer()),
     398        3827 :     _receive_buffer(std::make_unique<
     399             :                     ReceiveBuffer<ParallelDataType, ParallelStudy<WorkType, ParallelDataType>>>(
     400        3827 :         comm, this, _method, _clicks_per_receive, _parallel_data_buffer_tag)),
     401             : 
     402        3827 :     _currently_executing(false),
     403        3827 :     _currently_pre_executing(false),
     404        3827 :     _currently_executing_work(false)
     405             : {
     406             : #ifndef LIBMESH_HAVE_OPENMP
     407             :   if (libMesh::n_threads() != 1)
     408             :     mooseWarning(_name, ": Threading will not be used without OpenMP");
     409             : #endif
     410             : 
     411        3827 :   if (_method != ParallelStudyMethod::SMART && _allow_new_work_during_execution)
     412           0 :     mooseError(_name,
     413             :                ": When allowing new work addition during execution\n",
     414             :                "('allow_new_work_during_execution = true'), the method must be SMART");
     415        3827 : }
     416             : 
     417             : template <typename WorkType, typename ParallelDataType>
     418             : std::unique_ptr<MooseUtils::Buffer<WorkType>>
     419        3827 : ParallelStudy<WorkType, ParallelDataType>::createWorkBuffer()
     420             : {
     421        3827 :   std::unique_ptr<MooseUtils::Buffer<WorkType>> buffer;
     422             : 
     423        3827 :   const auto buffer_type = _params.get<MooseEnum>("work_buffer_type");
     424        3827 :   if (buffer_type == "lifo")
     425          19 :     buffer = std::make_unique<MooseUtils::LIFOBuffer<WorkType>>();
     426        3808 :   else if (buffer_type == "circular")
     427        3808 :     buffer = std::make_unique<MooseUtils::CircularBuffer<WorkType>>();
     428             :   else
     429           0 :     mooseError("Unknown work buffer type");
     430             : 
     431        3827 :   return buffer;
     432        3827 : }
     433             : 
     434             : template <typename WorkType, typename ParallelDataType>
     435             : InputParameters
     436        7576 : ParallelStudy<WorkType, ParallelDataType>::validParams()
     437             : {
     438        7576 :   auto params = emptyInputParameters();
     439             : 
     440       22728 :   params.addRangeCheckedParam<unsigned int>(
     441       15152 :       "send_buffer_size", 100, "send_buffer_size > 0", "The size of the send buffer");
     442       22728 :   params.addRangeCheckedParam<unsigned int>(
     443             :       "chunk_size",
     444       15152 :       100,
     445             :       "chunk_size > 0",
     446             :       "The number of objects to process at one time during execution");
     447       22728 :   params.addRangeCheckedParam<unsigned int>("clicks_per_communication",
     448       15152 :                                             10,
     449             :                                             "clicks_per_communication >= 0",
     450             :                                             "Iterations to wait before communicating");
     451       22728 :   params.addRangeCheckedParam<unsigned int>("clicks_per_root_communication",
     452       15152 :                                             10,
     453             :                                             "clicks_per_root_communication > 0",
     454             :                                             "Iterations to wait before communicating with root");
     455       22728 :   params.addRangeCheckedParam<unsigned int>("clicks_per_receive",
     456       15152 :                                             1,
     457             :                                             "clicks_per_receive > 0",
     458             :                                             "Iterations to wait before checking for new objects");
     459             : 
     460       15152 :   params.addParam<unsigned int>("min_buffer_size",
     461             :                                 "The initial size of the SendBuffer and the floor for shrinking "
     462             :                                 "it.  This defaults to send_buffer_size if not set (i.e. the "
     463             :                                 "buffer won't change size)");
     464       15152 :   params.addParam<Real>("buffer_growth_multiplier",
     465       15152 :                         2.,
     466             :                         "How much to grow a SendBuffer by if the buffer completely fills and "
     467             :                         "dumps.  Will max at send_buffer_size");
     468       22728 :   params.addRangeCheckedParam<Real>("buffer_shrink_multiplier",
     469       15152 :                                     0.5,
     470             :                                     "0 < buffer_shrink_multiplier <= 1.0",
     471             :                                     "Multiplier (between 0 and 1) to apply to the current buffer "
     472             :                                     "size if it is force dumped.  Will stop at "
     473             :                                     "min_buffer_size.");
     474             : 
     475       15152 :   params.addParam<bool>(
     476             :       "allow_new_work_during_execution",
     477       15152 :       true,
     478             :       "Whether or not to allow the addition of new work to the work buffer during execution");
     479             : 
     480       15152 :   MooseEnum methods("smart harm bs", "smart");
     481       15152 :   params.addParam<MooseEnum>("method", methods, "The algorithm to use");
     482             : 
     483       15152 :   MooseEnum work_buffers("lifo circular", "circular");
     484       15152 :   params.addParam<MooseEnum>("work_buffer_type", work_buffers, "The work buffer type to use");
     485             : 
     486       15152 :   params.addParamNamesToGroup(
     487             :       "send_buffer_size chunk_size clicks_per_communication clicks_per_root_communication "
     488             :       "clicks_per_receive min_buffer_size buffer_growth_multiplier buffer_shrink_multiplier method "
     489             :       "work_buffer_type allow_new_work_during_execution",
     490             :       "Advanced");
     491             : 
     492        7576 :   return params;
     493        7576 : }
     494             : 
     495             : template <typename WorkType, typename ParallelDataType>
     496             : void
     497       62535 : ParallelStudy<WorkType, ParallelDataType>::executeAndBuffer(const std::size_t chunk_size)
     498             : {
     499       62535 :   _currently_executing_work = true;
     500             : 
     501             :   // If chunk_size > the number of objects left, this will properly grab all of them
     502       62535 :   const auto begin = _work_buffer->beginChunk(chunk_size);
     503       62535 :   const auto end = _work_buffer->endChunk(chunk_size);
     504             : 
     505       62509 :   _local_chunks_executed++;
     506             : 
     507             : #ifdef LIBMESH_HAVE_OPENMP
     508       62509 : #pragma omp parallel
     509             : #endif
     510             :   {
     511             :     const THREAD_ID tid =
     512             : #ifdef LIBMESH_HAVE_OPENMP
     513             :         omp_get_thread_num();
     514             : #else
     515             :         0;
     516             : #endif
     517             : 
     518             : #ifdef LIBMESH_HAVE_OPENMP
     519             : #pragma omp for schedule(dynamic, 20) nowait
     520             : #endif
     521             :     for (auto it = begin; it < end; ++it)
     522             :       executeWork(*it, tid);
     523             :   }
     524             : 
     525             :   // Increment the executed and completed counters
     526       62509 :   _local_work_executed += std::distance(begin, end);
     527     5442679 :   for (auto it = begin; it != end; ++it)
     528     5380170 :     if (workIsComplete(*it))
     529     4120844 :       ++_local_work_completed;
     530             : 
     531             :   // Insertion point for derived classes to do something to the completed work
     532             :   // Example: Create ParallelData to spawn additional work on another processor
     533       62509 :   postExecuteChunk(begin, end);
     534             : 
     535             :   // Remove the objects we just worked on from the buffer
     536       62509 :   _work_buffer->eraseChunk(chunk_size);
     537             : 
     538             :   // If new work is allowed to be generated during execution, it goes into _temp_threaded_work
     539             :   // during the threaded execution phase and then must be moved into the working buffer
     540       62509 :   if (_allow_new_work_during_execution)
     541             :   {
     542             :     // Amount of work that needs to be moved into the main working buffer from
     543             :     // the temporary working buffer
     544             :     std::size_t threaded_work_size = 0;
     545      139017 :     for (const auto & work_objects : _temp_threaded_work)
     546       76570 :       threaded_work_size += work_objects.size();
     547             : 
     548       62447 :     if (threaded_work_size)
     549             :     {
     550             :       // We don't ever want to decrease the capacity, so only set it if we need more entries
     551         718 :       if (_work_buffer->capacity() < _work_buffer->size() + threaded_work_size)
     552             :         _work_buffer->setCapacity(_work_buffer->size() + threaded_work_size);
     553             : 
     554             :       // Move the work into the buffer
     555        1593 :       for (auto & threaded_work_vector : _temp_threaded_work)
     556             :       {
     557       83315 :         for (auto & work : threaded_work_vector)
     558       82440 :           _work_buffer->move(work);
     559         875 :         threaded_work_vector.clear();
     560             :       }
     561             : 
     562             :       // Variable that must be set when adding work so that the algorithm can keep count
     563             :       // of how much work still needs to be executed
     564         718 :       _local_work_started += threaded_work_size;
     565             :     }
     566             :   }
     567             : 
     568       62509 :   if (_method == ParallelStudyMethod::HARM)
     569          24 :     flushSendBuffers();
     570             : 
     571       62509 :   _currently_executing_work = false;
     572       62509 : }
     573             : // Hands `data` to the send buffer for processor `dest_pid`, creating that buffer on first use.
     574             : template <typename WorkType, typename ParallelDataType>
     575             : void
     576     1259326 : ParallelStudy<WorkType, ParallelDataType>::moveParallelDataToBuffer(
     577             :     std::shared_ptr<ParallelDataType> & data, const processor_id_type dest_pid)
     578             : {
     579             :   mooseAssert(comm().size() > dest_pid, "Invalid processor ID");
     580             :   mooseAssert(_pid != dest_pid, "Processor ID is self");
     581             :   // Parallel data may only be buffered while pre-executing or executing
     582     1259326 :   if (!_currently_executing && !_currently_pre_executing)
     583           0 :     mooseError(_name, ": Cannot sendParallelData() when not executing");
     584             : 
     585             :   // Look up the send buffer for the processor this object is going to
     586             :   auto find_pair = _send_buffers.find(dest_pid);
     587             :   // No send buffer exists for this processor yet: create one and move the data into it
     588     1259326 :   if (find_pair == _send_buffers.end())
     589             :     _send_buffers
     590        3653 :         .emplace(dest_pid,
     591             :                  std::make_unique<
     592             :                      SendBuffer<ParallelDataType, ParallelStudy<WorkType, ParallelDataType>>>(
     593             :                      comm(),
     594             :                      this,
     595             :                      dest_pid,
     596        3653 :                      _method,
     597        3653 :                      _min_buffer_size,
     598        3653 :                      _max_buffer_size,
     599        3653 :                      _buffer_growth_multiplier,
     600        3653 :                      _buffer_shrink_multiplier,
     601        3653 :                      _parallel_data_buffer_tag))
     602        3653 :         .first->second->moveObject(data);
     603             :   // A send buffer already exists for this processor: move the data into it
     604             :   else
     605     1255673 :     find_pair->second->moveObject(data);
     606     1259326 : }
     607             : // Calls forceSend() on every send buffer that currently exists.
     608             : template <typename WorkType, typename ParallelDataType>
     609             : void
     610      445549 : ParallelStudy<WorkType, ParallelDataType>::flushSendBuffers()
     611             : {
     612      784066 :   for (auto & send_buffer_iter : _send_buffers)
     613      338517 :     send_buffer_iter.second->forceSend();
     614      445549 : }
     615             : // Grows the work buffer capacity to at least `size`; errors unless called during pre-execution.
     616             : template <typename WorkType, typename ParallelDataType>
     617             : void
     618        7561 : ParallelStudy<WorkType, ParallelDataType>::reserveBuffer(const std::size_t size)
     619             : {
     620        7561 :   if (!_currently_pre_executing)
     621           0 :     mooseError(_name, ": Can only reserve in object buffer during pre-execution");
     622             : 
     623             :   // We never want to decrease the capacity, so only set it if we need more entries
     624        7561 :   if (_work_buffer->capacity() < size)
     625             :     _work_buffer->setCapacity(size);
     626        7561 : }
     627             : // Passes received parallel data to postReceiveParallelData(), then resets and clears the receive buffer entries.
     628             : template <typename WorkType, typename ParallelDataType>
     629             : void
     630     4417935 : ParallelStudy<WorkType, ParallelDataType>::postReceiveParallelDataInternal()
     631             : {
     632     4417935 :   if (_receive_buffer->buffer().empty())
     633             :     return;
     634             : 
     635             :   // Let derived classes work on the data, then release any entries that are still set
     636        7019 :   postReceiveParallelData(_receive_buffer->buffer().begin(), _receive_buffer->buffer().end());
     637     1266345 :   for (auto & data : _receive_buffer->buffer())
     638     1259326 :     if (data)
     639           0 :       data.reset();
     640             : 
     641             :   _receive_buffer->buffer().clear();
     642             : }
     643             : // Receives parallel data, then executes work until the work buffer is empty; returns true if any work ran.
     644             : template <typename WorkType, typename ParallelDataType>
     645             : bool
     646     4403306 : ParallelStudy<WorkType, ParallelDataType>::receiveAndExecute()
     647             : {
     648             :   bool executed_some = false;
     649             :   // SMART with receives already in progress calls cleanupRequests() instead of receive()
     650     4403306 :   if (_receive_buffer->currentlyReceiving() && _method == ParallelStudyMethod::SMART)
     651             :     _receive_buffer->cleanupRequests();
     652             :   else
     653     4403306 :     _receive_buffer->receive();
     654             : 
     655     4403306 :   postReceiveParallelDataInternal();
     656             : 
     657     4403306 :   preReceiveAndExecute();
     658             : 
     659     4465777 :   while (!_work_buffer->empty())
     660             :   {
     661             :     executed_some = true;
     662             : 
     663             :     // Switch between executing a chunk and buffering with SMART
     664       62497 :     if (_method == ParallelStudyMethod::SMART)
     665             :     {
     666             :       // Look for extra work first so that these transfers can be finishing while we're executing
     667             :       // Start receives only if our work buffer holds more than two chunks of work
     668       62473 :       const bool start_receives_only = _work_buffer->size() > (2 * _chunk_size);
     669       62473 :       _receive_buffer->receive(_work_buffer->size() > (2 * _chunk_size));
     670       62473 :       if (!start_receives_only)
     671       14554 :         postReceiveParallelDataInternal();
     672             : 
     673             :       // Execute one chunk of objects
     674       62473 :       executeAndBuffer(_chunk_size);
     675             :     }
     676             :     // Execute all of them and then buffer with the other methods
     677             :     else
     678          24 :       executeAndBuffer(_work_buffer->size());
     679             :   }
     680             : 
     681     4403280 :   return executed_some;
     682             : }
     683             : // SMART execution loop: repeats receiveAndExecute() until the global started and completed work sums match (or the alternate ending criteria are met).
     684             : template <typename WorkType, typename ParallelDataType>
     685             : void
     686        7925 : ParallelStudy<WorkType, ParallelDataType>::smartExecute()
     687             : {
     688             :   mooseAssert(_method == ParallelStudyMethod::SMART, "Should be called with SMART only");
     689             : 
     690             :   // Request for the sum of the started work
     691        7925 :   Parallel::Request started_request;
     692             :   // Request for the sum of the completed work
     693        7925 :   Parallel::Request completed_request;
     694             : 
     695             :   // Temp for use in sending the current value in a nonblocking sum instead of an updated value
     696             :   unsigned long long int temp;
     697             : 
     698             :   // Whether or not to make the started request first, or after every finished request.
     699             :   // When allowing adding new work during the execution phase, the starting object counts could
     700             :   // change after right now, so we must update them after each finished request is complete.
     701             :   // When not allowing generation during propagation, we know the counts up front.
     702        7925 :   const bool started_request_first = !_allow_new_work_during_execution;
     703             : 
     704             :   // Get the amount of work that was started in the whole domain, if applicable
     705        7925 :   if (started_request_first)
     706           0 :     comm().sum(_local_work_started, _total_work_started, started_request);
     707             : 
     708             :   // Whether or not the started request has been made
     709             :   bool made_started_request = started_request_first;
     710             :   // Whether or not the completed request has been made
     711             :   bool made_completed_request = false;
     712             : 
     713             :   // Good time to get rid of whatever's currently in our SendBuffers
     714        7925 :   flushSendBuffers();
     715             : 
     716             :   // Counters of consecutive iterations without executed work, used to delay forced communication
     717             :   unsigned int non_executing_clicks = 0;
     718             :   unsigned int non_executing_root_clicks = 0;
     719             :   bool executed_some = true;
     720             : 
     721             :   // Keep executing work until it has all completed
     722             :   while (true)
     723             :   {
     724     4403254 :     executed_some = receiveAndExecute();
     725             : 
     726     4403228 :     if (executed_some)
     727             :     {
     728             :       non_executing_clicks = 0;
     729             :       non_executing_root_clicks = 0;
     730             :     }
     731             :     else
     732             :     {
     733     4391913 :       non_executing_clicks++;
     734     4391913 :       non_executing_root_clicks++;
     735             :     }
     736             :     // Force out buffered sends once enough iterations have passed without executing work
     737     4403228 :     if (non_executing_clicks >= _clicks_per_communication)
     738             :     {
     739             :       non_executing_clicks = 0;
     740             : 
     741      437459 :       flushSendBuffers();
     742             :     }
     743             :     // With alternate ending criteria, the started/completed sums below are not used
     744     4403228 :     if (_has_alternate_ending_criteria)
     745             :     {
     746           0 :       if (buffersAreEmpty() && alternateSmartEndingCriteriaMet())
     747             :       {
     748           0 :         comm().barrier();
     749             :         return;
     750             :       }
     751             :     }
     752     4403228 :     else if (non_executing_root_clicks >= _clicks_per_root_communication)
     753             :     {
     754             :       non_executing_root_clicks = 0;
     755             : 
     756             :       // We need the starting work sum first but said request isn't complete yet
     757      437459 :       if (started_request_first && !started_request.test())
     758           0 :         continue;
     759             : 
     760             :       // At this point, we need to make a request for the completed work sum
     761      437459 :       if (!made_completed_request)
     762             :       {
     763             :         made_completed_request = true;
     764       14261 :         temp = _local_work_completed;
     765       14261 :         comm().sum(temp, _total_work_completed, completed_request);
     766       14261 :         continue;
     767             :       }
     768             : 
     769             :       // We have the completed work sum
     770      423198 :       if (completed_request.test())
     771             :       {
     772             :         // The starting work sum must be requested /after/ we have finishing counts and we
     773             :         // need to make the request for said sum
     774      153191 :         if (!made_started_request)
     775             :         {
     776             :           made_started_request = true;
     777       14261 :           temp = _local_work_started;
     778       14261 :           comm().sum(temp, _total_work_started, started_request);
     779       14261 :           continue;
     780             :         }
     781             : 
     782             :         // The starting work sum was requested /after/ the finishing sum, and that request
     783             :         // has not completed yet
     784      138930 :         if (!started_request_first && !started_request.test())
     785      124669 :           continue;
     786             : 
     787             :         // Started count is the same as the finished count - we're done!
     788       14261 :         if (_total_work_started == _total_work_completed)
     789             :           return;
     790             : 
     791             :         // Next time around we should make a completed sum request
     792             :         made_completed_request = false;
     793             :         // If we need the starting work sum after the completed work sum, we need those now as well
     794        6362 :         if (!started_request_first)
     795             :           made_started_request = false;
     796             :       }
     797             :     }
     798             :   }
     799        7899 : }
     800             : // HARM execution loop: each processor sends its completed-work count to every other processor until the completed total matches the started total.
     801             : template <typename WorkType, typename ParallelDataType>
     802             : void
     803          14 : ParallelStudy<WorkType, ParallelDataType>::harmExecute()
     804             : {
     805          14 :   if (_has_alternate_ending_criteria)
     806           0 :     mooseError("ParallelStudy: Alternate ending criteria not yet supported for HARM");
     807          14 :   if (_allow_new_work_during_execution)
     808           0 :     mooseError(_name, ": The addition of new work during execution is not supported by HARM");
     809             :   mooseAssert(_method == ParallelStudyMethod::HARM, "Should be called with HARM only");
     810             : 
     811             :   // Request for the total amount of work started
     812          14 :   Parallel::Request work_started_request;
     813             :   // Requests for sending the amount of finished work to every other processor
     814          14 :   std::vector<Parallel::Request> work_completed_requests(comm().size());
     815             :   // Whether or not the finished requests have been sent to each processor
     816          14 :   std::vector<bool> work_completed_requests_sent(comm().size(), false);
     817             :   // Values of work completed on this processor that are being sent to other processors
     818          14 :   std::vector<unsigned long long int> work_completed_requests_temps(comm().size(), 0);
     819             :   // Work completed by each processor
     820          14 :   std::vector<unsigned long long int> work_completed_per_proc(comm().size(), 0);
     821             :   // Tag for sending work finished
     822          14 :   const auto work_completed_requests_tag = comm().get_unique_tag();
     823             : 
     824             :   // Get the amount of work that was started in the whole domain
     825          14 :   comm().sum(_local_work_started, _total_work_started, work_started_request);
     826             : 
     827             :   // All work has been executed, so time to communicate
     828          14 :   flushSendBuffers();
     829             : 
     830             :   // HARM only does some communication based on times through the loop.
     831             :   // This counter will be used for that
     832             :   unsigned int communication_clicks = 0;
     833             : 
     834             :   Parallel::Status work_completed_probe_status;
     835             :   int work_completed_probe_flag;
     836             : 
     837             :   // Keep working until done
     838          38 :   while (true)
     839             :   {
     840          52 :     receiveAndExecute();
     841             : 
     842          52 :     flushSendBuffers();
     843             :     // Only poll for completed-work messages after more loop iterations than there are processors
     844          52 :     if (communication_clicks > comm().size())
     845             :     {
     846             :       // Receive messages about work being finished
     847             :       do
     848             :       {
     849          34 :         MPI_Iprobe(MPI_ANY_SOURCE,
     850             :                    work_completed_requests_tag.value(),
     851             :                    comm().get(),
     852             :                    &work_completed_probe_flag,
     853             :                    work_completed_probe_status.get());
     854             : 
     855          34 :         if (work_completed_probe_flag)
     856             :         {
     857             :           auto proc = work_completed_probe_status.source();
     858          20 :           comm().receive(proc, work_completed_per_proc[proc], work_completed_requests_tag);
     859             :         }
     860          34 :       } while (work_completed_probe_flag);
     861             :       // Total completed = the counts received from other processors plus our local count
     862          28 :       _total_work_completed = std::accumulate(
     863             :           work_completed_per_proc.begin(), work_completed_per_proc.end(), _local_work_completed);
     864             : 
     865             :       // Reset
     866             :       communication_clicks = 0;
     867             :     }
     868             : 
     869             :     // Send our completed count to each other processor if it grew since the last send to it
     870         144 :     for (processor_id_type pid = 0; pid < comm().size(); ++pid)
     871          40 :       if (pid != _pid &&
     872         132 :           (!work_completed_requests_sent[pid] || work_completed_requests[pid].test()) &&
     873          40 :           _local_work_completed > work_completed_requests_temps[pid])
     874             :       {
     875          20 :         work_completed_requests_temps[pid] = _local_work_completed;
     876          20 :         comm().send(pid,
     877             :                     work_completed_requests_temps[pid],
     878             :                     work_completed_requests[pid],
     879             :                     work_completed_requests_tag);
     880             :         work_completed_requests_sent[pid] = true;
     881             :       }
     882             : 
     883             :     // All procs agree on the amount of work started and we've finished all the work started
     884          52 :     if (work_started_request.test() && _total_work_started == _total_work_completed)
     885             :     {
     886             :       // Need to call the post wait work for all of the requests
     887          38 :       for (processor_id_type pid = 0; pid < comm().size(); ++pid)
     888          24 :         if (pid != _pid)
     889          10 :           work_completed_requests[pid].wait();
     890             : 
     891          14 :       return;
     892             :     }
     893             : 
     894          38 :     communication_clicks++;
     895             :   }
     896          28 : }
     897             : // BS execution loop: communicates until this processor is neither sending nor receiving and the nonblocking some_left sum is done, executes the whole work buffer, and repeats until the started and completed totals match.
     898             : template <typename WorkType, typename ParallelDataType>
     899             : void
     900          14 : ParallelStudy<WorkType, ParallelDataType>::bsExecute()
     901             : {
     902          14 :   if (_has_alternate_ending_criteria)
     903           0 :     mooseError("ParallelStudy: Alternate ending criteria not yet supported for BS");
     904          14 :   if (_allow_new_work_during_execution)
     905           0 :     mooseError(_name, ": The addition of new work during execution is not supported by BS");
     906             :   mooseAssert(_method == ParallelStudyMethod::BS, "Should be called with BS only");
     907             : 
     908          14 :   Parallel::Request work_completed_probe_status;
     909          14 :   Parallel::Request work_completed_request;
     910             :   // NOTE: despite its name, work_completed_probe_status is the request for the started-work sum below
     911             :   // Temp for use in sending the current value in a nonblocking sum instead of an updated value
     912             :   unsigned long long int temp;
     913             : 
     914             :   // Get the amount of work that was started in the whole domain
     915          14 :   comm().sum(_local_work_started, _total_work_started, work_completed_probe_status);
     916             : 
     917             :   // Keep working until done
     918          38 :   while (true)
     919             :   {
     920             :     bool receiving = false;
     921             :     bool sending = false;
     922             : 
     923          38 :     Parallel::Request some_left_request;
     924          38 :     unsigned int some_left = 0;
     925          38 :     unsigned int all_some_left = 1;
     926             :     // Communicate until nothing is in flight locally and the some_left sum has come back as zero
     927             :     do
     928             :     {
     929          75 :       _receive_buffer->receive();
     930          75 :       postReceiveParallelDataInternal();
     931          75 :       flushSendBuffers();
     932             : 
     933             :       receiving = _receive_buffer->currentlyReceiving();
     934             : 
     935             :       sending = false;
     936         109 :       for (auto & send_buffer : _send_buffers)
     937          34 :         sending = sending || send_buffer.second->currentlySending() ||
     938             :                   send_buffer.second->currentlyBuffered();
     939             :       // some_left is only assigned when both flags are false, so the value summed here is 0
     940          75 :       if (!receiving && !sending && some_left_request.test() && all_some_left)
     941             :       {
     942          38 :         some_left = receiving || sending;
     943          38 :         comm().sum(some_left, all_some_left, some_left_request);
     944             :       }
     945         112 :     } while (receiving || sending || !some_left_request.test() || all_some_left);
     946             : 
     947          38 :     executeAndBuffer(_work_buffer->size());
     948             : 
     949          38 :     comm().barrier();
     950             :     // Once both sum requests have completed, either finish or request a new completed-work sum
     951          38 :     if (work_completed_probe_status.test() && work_completed_request.test())
     952             :     {
     953          38 :       if (_total_work_started == _total_work_completed)
     954          14 :         return;
     955             : 
     956          24 :       temp = _local_work_completed;
     957          24 :       comm().sum(temp, _total_work_completed, work_completed_request);
     958             :     }
     959             :   }
     960          14 : }
     961             : // Clears the communication buffers and work counters and enters the pre-execution phase; errors if buffers are not empty.
     962             : template <typename WorkType, typename ParallelDataType>
     963             : void
     964        8023 : ParallelStudy<WorkType, ParallelDataType>::preExecute()
     965             : {
     966        8023 :   if (!buffersAreEmpty())
     967           0 :     mooseError(_name, ": Buffers are not empty in preExecute()");
     968             : 
     969             :   // Clear each send buffer, drop all send buffers, then clear the receive buffer
     970        9921 :   for (auto & send_buffer_pair : _send_buffers)
     971        1898 :     send_buffer_pair.second->clear();
     972             :   _send_buffers.clear();
     973        8023 :   _receive_buffer->clear();
     974             : 
     975             :   // Reset the local and total work counters to zero
     976        8023 :   _local_chunks_executed = 0;
     977        8023 :   _local_work_completed = 0;
     978        8023 :   _local_work_started = 0;
     979        8023 :   _local_work_executed = 0;
     980        8023 :   _total_work_started = 0;
     981        8023 :   _total_work_completed = 0;
     982             :   // execute() checks this flag to verify that preExecute() was called first
     983        8023 :   _currently_pre_executing = true;
     984        8023 : }
     985             : // Runs the execution loop for the configured method; preExecute() must have been called first.
     986             : template <typename WorkType, typename ParallelDataType>
     987             : void
     988        7953 : ParallelStudy<WorkType, ParallelDataType>::execute()
     989             : {
     990        7953 :   if (!_currently_pre_executing)
     991           0 :     mooseError(_name, ": preExecute() was not called before execute()");
     992             :   // Leave the pre-execution phase and enter the execution phase
     993        7953 :   _currently_pre_executing = false;
     994        7953 :   _currently_executing = true;
     995             :   // Dispatch to the loop that matches the configured method
     996        7953 :   switch (_method)
     997             :   {
     998        7925 :     case ParallelStudyMethod::SMART:
     999        7925 :       smartExecute();
    1000        7899 :       break;
    1001          14 :     case ParallelStudyMethod::HARM:
    1002          14 :       harmExecute();
    1003          14 :       break;
    1004          14 :     case ParallelStudyMethod::BS:
    1005          14 :       bsExecute();
    1006          14 :       break;
    1007           0 :     default:
    1008           0 :       mooseError("Unknown ParallelStudyMethod");
    1009             :   }
    1010             : 
    1011        7927 :   _currently_executing = false;
    1012             : 
    1013             :   // Sanity checks on whether we're really done
    1014        7927 :   comm().barrier();
    1015             : 
    1016        7927 :   if (!buffersAreEmpty())
    1017           0 :     mooseError(_name, ": Buffers are not empty after execution");
    1018        7927 : }
    1019             : // Calls mooseError() with the message that corresponds to `error`; the work pointer argument is unused here.
    1020             : template <typename WorkType, typename ParallelDataType>
    1021             : void
    1022           0 : ParallelStudy<WorkType, ParallelDataType>::moveWorkError(
    1023             :     const MoveWorkError error, const WorkType * /* work = nullptr */) const
    1024             : {
    1025             :   if (error == MoveWorkError::DURING_EXECUTION_DISABLED)
    1026           0 :     mooseError(_name,
    1027             :                ": The moving of new work into the buffer during work execution requires\n",
    1028             :                "that the parameter 'allow_new_work_during_execution = true'");
    1029             :   if (error == MoveWorkError::PRE_EXECUTION_AND_EXECUTION_ONLY)
    1030           0 :     mooseError(
    1031           0 :         _name,
    1032             :         ": Can only move work into the buffer in the pre-execution and execution phase\n(between "
    1033             :         "preExecute() and the end of execute()");
    1034             :   if (error == MoveWorkError::PRE_EXECUTION_ONLY)
    1035           0 :     mooseError(_name,
    1036             :                ": Can only move work into the buffer in the pre-execution phase\n(between "
    1037             :                "preExecute() and execute()");
    1038             :   if (error == MoveWorkError::PRE_EXECUTION_THREAD_0_ONLY)
    1039           0 :     mooseError(_name,
    1040             :                ": Can only move work into the buffer in the pre-execution phase\n(between "
    1041             :                "preExecute() and execute()) on thread 0");
    1042             :   if (error == CONTINUING_DURING_EXECUTING_WORK)
    1043           0 :     mooseError(_name, ": Cannot move continuing work into the buffer during executeAndBuffer()");
    1044             :   // Fallback for any error value not matched above
    1045           0 :   mooseError("Unknown MoveWorkError");
    1046             : }
    1047             : // Calls moveWorkError() if new work may not be moved into the buffer in the current phase on thread `tid`.
    1048             : template <typename WorkType, typename ParallelDataType>
    1049             : void
    1050     4120172 : ParallelStudy<WorkType, ParallelDataType>::canMoveWorkCheck(const THREAD_ID tid)
    1051             : {
    1052     4120172 :   if (_currently_executing)
    1053             :   {
    1054       82440 :     if (!_allow_new_work_during_execution)
    1055           0 :       moveWorkError(MoveWorkError::DURING_EXECUTION_DISABLED);
    1056             :   }
    1057     4037732 :   else if (!_currently_pre_executing)
    1058             :   {
    1059           0 :     if (_allow_new_work_during_execution)
    1060           0 :       moveWorkError(MoveWorkError::PRE_EXECUTION_AND_EXECUTION_ONLY);
    1061             :     else
    1062           0 :       moveWorkError(MoveWorkError::PRE_EXECUTION_ONLY);
    1063             :   }
    1064     4037732 :   else if (tid != 0)
    1065           0 :     moveWorkError(MoveWorkError::PRE_EXECUTION_THREAD_0_ONLY);
    1066     4120172 : }
    1067             : // Moves one work object into the work buffer, or into the temporary vector for `tid` while work is executing or when not on thread 0.
    1068             : template <typename WorkType, typename ParallelDataType>
    1069             : void
    1070     4119890 : ParallelStudy<WorkType, ParallelDataType>::moveWorkToBuffer(WorkType & work, const THREAD_ID tid)
    1071             : {
    1072             :   // Error checks for moving work into the buffer at unallowed times
    1073     4119890 :   canMoveWorkCheck(tid);
    1074             : 
    1075             :   // Can move directly into the work buffer on thread 0 when we're not executing work
    1076     4119890 :   if (!_currently_executing_work && tid == 0)
    1077             :   {
    1078     4037450 :     ++_local_work_started; // must ALWAYS increment when adding new work to the working buffer
    1079     4037450 :     _work_buffer->move(work);
    1080             :   }
    1081             :   // Otherwise the work goes into the temporary vector indexed by tid; _local_work_started is
    1082             :   // not incremented on this path
    1083             :   else
    1084       82440 :     _temp_threaded_work[tid].emplace_back(std::move(work));
    1085     4119890 : }
    1086             : // Moves the work in [begin, end) into the work buffer, or into the temporary vector for `tid` while work is executing or when not on thread 0.
    1087             : template <typename WorkType, typename ParallelDataType>
    1088             : void
    1089         282 : ParallelStudy<WorkType, ParallelDataType>::moveWorkToBuffer(const work_iterator begin,
    1090             :                                                             const work_iterator end,
    1091             :                                                             const THREAD_ID tid)
    1092             : {
    1093             :   // Error checks for moving work into the buffer at unallowed times
    1094         282 :   canMoveWorkCheck(tid);
    1095             : 
    1096             :   // Get work size beforehand so we can grow the destination once
    1097             :   const auto size = std::distance(begin, end);
    1098             : 
    1099             :   // Can move directly into the work buffer on thread 0 when we're not executing work
    1100         282 :   if (!_currently_executing_work && tid == 0)
    1101             :   {
    1102         282 :     if (_work_buffer->capacity() < _work_buffer->size() + size)
    1103             :       _work_buffer->setCapacity(_work_buffer->size() + size);
    1104         282 :     _local_work_started += size;
    1105             :   }
    1106             :   else
    1107           0 :     _temp_threaded_work[tid].reserve(_temp_threaded_work[tid].size() + size);
    1108             : 
    1109             :   // Move the objects into whichever destination was prepared above
    1110         282 :   if (!_currently_executing_work && tid == 0)
    1111        1272 :     for (auto it = begin; it != end; ++it)
    1112         990 :       _work_buffer->move(*it);
    1113             :   else
    1114           0 :     for (auto it = begin; it != end; ++it)
    1115           0 :       _temp_threaded_work[tid].emplace_back(std::move(*it));
    1116         282 : }
    1117             : 
    1118             : template <typename WorkType, typename ParallelDataType> // Convenience overload: moves all of work_vector via the (begin, end, tid) overload
    1119             : void
    1120         282 : ParallelStudy<WorkType, ParallelDataType>::moveWorkToBuffer(std::vector<WorkType> & work_vector,
    1121             :                                                             const THREAD_ID tid)
    1122             : {
    1123         282 :   moveWorkToBuffer(work_vector.begin(), work_vector.end(), tid); // the elements are moved from; work_vector itself is not resized here
    1124         282 : }
    1125             : 
    1126             : template <typename WorkType, typename ParallelDataType> // Moves one continuing work object into _work_buffer
    1127             : void
    1128             : ParallelStudy<WorkType, ParallelDataType>::moveContinuingWorkToBuffer(WorkType & work)
    1129             : {
    1130             :   if (_currently_executing_work)
    1131             :     moveWorkError(MoveWorkError::CONTINUING_DURING_EXECUTING_WORK); // NOTE(review): assumes moveWorkError() does not return - confirm
    1132             : 
    1133             :   _work_buffer->move(work); // NOTE(review): _local_work_started is not incremented here, unlike moveWorkToBuffer() - presumably already counted; confirm
    1134             : }
    1135             : 
    1136             : template <typename WorkType, typename ParallelDataType> // Moves every continuing work object in [begin, end) into _work_buffer
    1137             : void
    1138        7019 : ParallelStudy<WorkType, ParallelDataType>::moveContinuingWorkToBuffer(const work_iterator begin,
    1139             :                                                                       const work_iterator end)
    1140             : {
    1141        7019 :   if (_currently_executing_work)
    1142           0 :     moveWorkError(MoveWorkError::CONTINUING_DURING_EXECUTING_WORK); // NOTE(review): assumes moveWorkError() does not return - confirm
    1143             : 
    1144             :   const auto size = std::distance(begin, end); // number of incoming objects, needed to size the buffer once up front
    1145        7019 :   if (_work_buffer->capacity() < _work_buffer->size() + size)
    1146             :     _work_buffer->setCapacity(_work_buffer->size() + size); // capacity too small: raise it to current size + incoming count
    1147             : 
    1148     1266345 :   for (auto it = begin; it != end; ++it)
    1149     1259326 :     _work_buffer->move(*it); // _local_work_started is left untouched on this path
    1150        7019 : }
    1151             : 
    1152             : template <typename WorkType, typename ParallelDataType> // Returns the sum of bufferPoolCreated() over every entry in _send_buffers
    1153             : unsigned long long int
    1154          14 : ParallelStudy<WorkType, ParallelDataType>::sendBufferPoolCreated() const
    1155             : {
    1156             :   unsigned long long int total = 0; // stays 0 when _send_buffers is empty
    1157             : 
    1158          24 :   for (const auto & buffer : _send_buffers)
    1159          10 :     total += buffer.second->bufferPoolCreated(); // .second is the send buffer; .first (the key) is not used
    1160             : 
    1161          14 :   return total;
    1162             : }
    1163             : 
    1164             : template <typename WorkType, typename ParallelDataType> // Returns the sum of objectsSent() over every entry in _send_buffers
    1165             : unsigned long long int
    1166          14 : ParallelStudy<WorkType, ParallelDataType>::parallelDataSent() const
    1167             : {
    1168             :   unsigned long long int total_sent = 0; // stays 0 when _send_buffers is empty
    1169             : 
    1170          24 :   for (const auto & buffer : _send_buffers)
    1171          10 :     total_sent += buffer.second->objectsSent(); // .second is the send buffer; .first (the key) is not used
    1172             : 
    1173          14 :   return total_sent;
    1174             : }
    1175             : 
    1176             : template <typename WorkType, typename ParallelDataType> // Returns the sum of buffersSent() over every entry in _send_buffers
    1177             : unsigned long long int
    1178          14 : ParallelStudy<WorkType, ParallelDataType>::buffersSent() const
    1179             : {
    1180             :   unsigned long long int total_sent = 0; // stays 0 when _send_buffers is empty
    1181             : 
    1182          24 :   for (const auto & buffer : _send_buffers)
    1183          10 :     total_sent += buffer.second->buffersSent(); // calls the send buffer's own buffersSent(), not this method
    1184             : 
    1185          14 :   return total_sent;
    1186             : }
    1187             : 
    1188             : template <typename WorkType, typename ParallelDataType> // Returns the sum of num_created() over every pool in _parallel_data_pools
    1189             : unsigned long long int
    1190             : ParallelStudy<WorkType, ParallelDataType>::poolParallelDataCreated() const
    1191             : {
    1192             :   unsigned long long int num_created = 0; // stays 0 when _parallel_data_pools is empty
    1193             : 
    1194          31 :   for (const auto & pool : _parallel_data_pools)
    1195          17 :     num_created += pool.num_created(); // the local accumulator shares its name with the pool method it calls
    1196             : 
    1197             :   return num_created;
    1198             : }
    1199             : 
    1200             : template <typename WorkType, typename ParallelDataType> // Default implementation: only reports that the method is unimplemented
    1201             : bool
    1202           0 : ParallelStudy<WorkType, ParallelDataType>::alternateSmartEndingCriteriaMet()
    1203             : {
    1204           0 :   mooseError(_name, ": Unimplemented alternateSmartEndingCriteriaMet()"); // NOTE(review): no return statement follows; relies on mooseError() not returning - confirm
    1205             : }
    1206             : 
    1207             : template <typename WorkType, typename ParallelDataType> // Returns true only when none of the checks below finds work held, being received, being sent or buffered
    1208             : bool
    1209       15950 : ParallelStudy<WorkType, ParallelDataType>::buffersAreEmpty() const
    1210             : {
    1211       15950 :   if (!_work_buffer->empty())
    1212             :     return false; // work is still waiting in the work buffer
    1213       36594 :   for (const auto & threaded_buffer : _temp_threaded_work)
    1214       20644 :     if (!threaded_buffer.empty())
    1215             :       return false; // some entry of _temp_threaded_work still holds work
    1216       15950 :   if (_receive_buffer->currentlyReceiving())
    1217             :     return false; // the receive buffer reports a receive in progress
    1218       21501 :   for (const auto & map_pair : _send_buffers)
    1219        5551 :     if (map_pair.second->currentlySending() || map_pair.second->currentlyBuffered())
    1220             :       return false; // a send buffer is still sending or still has something buffered
    1221             : 
    1222             :   return true; // every check above passed
    1223             : }

Generated by: LCOV version 1.14