#include "libmesh/parallel_object.h"
// ...
template <typename WorkType, typename ParallelDataType>
class ParallelStudy : public libMesh::ParallelObject
{
  // ...
  ParallelStudy(const libMesh::Parallel::Communicator & comm,
                const InputParameters & params,
                const std::string & name);
  // ...
  template <typename... Args>
  typename MooseUtils::SharedPool<ParallelDataType>::PtrType
  acquireParallelData(const THREAD_ID tid, Args &&... args);
  // ...
  /// The receive buffer
  const std::unique_ptr<ReceiveBuffer<ParallelDataType, ParallelStudy<WorkType, ParallelDataType>>>
      _receive_buffer;
  /// Send buffers for each processor
  std::unordered_map<processor_id_type,
                     std::unique_ptr<SendBuffer<ParallelDataType, ParallelStudy<WorkType, ParallelDataType>>>>
      _send_buffers;
  // ...
};
template <typename WorkType, typename ParallelDataType>
ParallelStudy<WorkType, ParallelDataType>::ParallelStudy(
    const libMesh::Parallel::Communicator & comm,
    const InputParameters & params,
    const std::string & name)
  : ParallelObject(comm),
    // ...
    _has_alternate_ending_criteria(false),
    _min_buffer_size(params.isParamSetByUser("min_buffer_size")
                         ? params.get<unsigned int>("min_buffer_size")
                         : params.get<unsigned int>("send_buffer_size")),
    _max_buffer_size(params.get<unsigned int>("send_buffer_size")),
    _buffer_growth_multiplier(params.get<Real>("buffer_growth_multiplier")),
    _buffer_shrink_multiplier(params.get<Real>("buffer_shrink_multiplier")),
    _chunk_size(params.get<unsigned int>("chunk_size")),
    _allow_new_work_during_execution(params.get<bool>("allow_new_work_during_execution")),
    _clicks_per_communication(params.get<unsigned int>("clicks_per_communication")),
    _clicks_per_root_communication(params.get<unsigned int>("clicks_per_root_communication")),
    _clicks_per_receive(params.get<unsigned int>("clicks_per_receive")),
    _parallel_data_buffer_tag(comm.get_unique_tag()),
    _work_buffer(createWorkBuffer()),
    _receive_buffer(std::make_unique<
                    ReceiveBuffer<ParallelDataType, ParallelStudy<WorkType, ParallelDataType>>>(
        comm, this, _method, _clicks_per_receive, _parallel_data_buffer_tag)),
    _currently_executing(false),
    _currently_pre_executing(false),
    _currently_executing_work(false)
{
#ifndef LIBMESH_HAVE_OPENMP
  // ...
#endif

  // ...

  if (_method != ParallelStudyMethod::SMART && _allow_new_work_during_execution)
    mooseError(_name,
               ": When allowing new work addition during execution\n",
               "('allow_new_work_during_execution = true'), the method must be SMART");
}
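// Note on the SendBuffer sizing members initialized above (illustrative numbers, not the
// parameter defaults): with send_buffer_size = 100 and min_buffer_size = 25, a SendBuffer
// starts at 25 entries and, each time it completely fills and dumps, grows by
// buffer_growth_multiplier (e.g. 2.0: 25 -> 50 -> 100, capped at send_buffer_size).
// If it is force dumped, its size is scaled by buffer_shrink_multiplier (e.g. 0.5), never
// dropping below min_buffer_size. Leaving min_buffer_size unset pins the size at
// send_buffer_size, so the buffer never changes size.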
template <typename WorkType, typename ParallelDataType>
std::unique_ptr<MooseUtils::Buffer<WorkType>>
ParallelStudy<WorkType, ParallelDataType>::createWorkBuffer()
{
  std::unique_ptr<MooseUtils::Buffer<WorkType>> buffer;

  const auto buffer_type = _params.get<MooseEnum>("work_buffer_type");
  if (buffer_type == "lifo")
    buffer = std::make_unique<MooseUtils::LIFOBuffer<WorkType>>();
  else if (buffer_type == "circular")
    buffer = std::make_unique<MooseUtils::CircularBuffer<WorkType>>();
  // ... (elided: error on an unknown buffer type)

  return buffer;
}
template <typename WorkType, typename ParallelDataType>
InputParameters
ParallelStudy<WorkType, ParallelDataType>::validParams()
{
  // ... (elided: construct the empty 'params' object)

  params.addRangeCheckedParam<unsigned int>(
      "send_buffer_size", 100, "send_buffer_size > 0", "The size of the send buffer");
  params.addRangeCheckedParam<unsigned int>(
      "chunk_size", /* ... */ "The number of objects to process at one time during execution");
  params.addRangeCheckedParam<unsigned int>("clicks_per_communication",
                                            /* ... */
                                            "clicks_per_communication >= 0",
                                            "Iterations to wait before communicating");
  params.addRangeCheckedParam<unsigned int>("clicks_per_root_communication",
                                            /* ... */
                                            "clicks_per_root_communication > 0",
                                            "Iterations to wait before communicating with root");
  params.addRangeCheckedParam<unsigned int>("clicks_per_receive",
                                            /* ... */
                                            "clicks_per_receive > 0",
                                            "Iterations to wait before checking for new objects");
  params.addParam<unsigned int>("min_buffer_size",
                                "The initial size of the SendBuffer and the floor for shrinking "
                                "it. This defaults to send_buffer_size if not set (i.e. the "
                                "buffer won't change size)");
  params.addParam<Real>("buffer_growth_multiplier",
                        /* ... */
                        "How much to grow a SendBuffer by if the buffer completely fills and "
                        "dumps. Will max at send_buffer_size");
  params.addRangeCheckedParam<Real>("buffer_shrink_multiplier",
                                    /* ... */
                                    "0 < buffer_shrink_multiplier <= 1.0",
                                    "Multiplier (between 0 and 1) to apply to the current buffer "
                                    "size if it is force dumped. Will stop at " /* ... */);
  params.addParam<bool>(
      "allow_new_work_during_execution",
      /* ... */
      "Whether or not to allow the addition of new work to the work buffer during execution");

  MooseEnum methods("smart harm bs", "smart");
  params.addParam<MooseEnum>("method", methods, "The algorithm to use");

  MooseEnum work_buffers("lifo circular", "circular");
  params.addParam<MooseEnum>("work_buffer_type", work_buffers, "The work buffer type to use");

  params.addParamNamesToGroup(
      "send_buffer_size chunk_size clicks_per_communication clicks_per_root_communication "
      "clicks_per_receive min_buffer_size buffer_growth_multiplier buffer_shrink_multiplier method "
      "work_buffer_type allow_new_work_during_execution",
      /* ... */);

  return params;
}
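// --------------------------------------------------------------------------------------
// Illustrative sketch (not part of ParallelStudy.h): a minimal derived study wired up
// against the two pure virtuals and the public API documented in this header. The types
// DummyWork / DummyData and the class DummyStudy are hypothetical, and a real use would
// also need the parallel packing support for DummyData that is omitted here.
struct DummyWork
{
  unsigned int id;
};
struct DummyData
{
  unsigned int payload;
};

class DummyStudy : public ParallelStudy<DummyWork, DummyData>
{
public:
  DummyStudy(const libMesh::Parallel::Communicator & comm, const InputParameters & params)
    : ParallelStudy<DummyWork, DummyData>(comm, params, "DummyStudy")
  {
  }

protected:
  // Required override: execute a single piece of work on the given thread
  void executeWork(const DummyWork & /*work*/, const THREAD_ID /*tid*/) override
  {
    // act on the work here; data destined for another rank would be created with
    // acquireParallelData() and handed off via moveParallelDataToBuffer()
  }

  // Required override: act on parallel data that has just been received into the buffer
  void postReceiveParallelData(const parallel_data_iterator /*begin*/,
                               const parallel_data_iterator /*end*/) override
  {
    // typically converts the received data back into work for this rank
  }
};

// Typical driver flow, assuming 'params' holds the parameters declared in validParams():
//   DummyStudy study(comm, params);
//   study.preExecute();                // must precede adding work and execute()
//   DummyWork work{0};
//   study.moveWorkToBuffer(work, 0);   // seed initial work on thread 0
//   study.execute();                   // runs SMART/HARM/BS until all work is complete
// --------------------------------------------------------------------------------------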
template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::executeAndBuffer(const std::size_t chunk_size)
{
  _currently_executing_work = true;

  // Grab the chunk of work to execute from the front of the buffer
  const auto begin = _work_buffer->beginChunk(chunk_size);
  const auto end = _work_buffer->endChunk(chunk_size);

  _local_chunks_executed++;

#ifdef LIBMESH_HAVE_OPENMP
#pragma omp parallel
#endif
  {
#ifdef LIBMESH_HAVE_OPENMP
    const THREAD_ID tid = omp_get_thread_num();
#else
    const THREAD_ID tid = 0;
#endif

#ifdef LIBMESH_HAVE_OPENMP
#pragma omp for schedule(dynamic, 20) nowait
#endif
    for (auto it = begin; it < end; ++it)
      executeWork(*it, tid);
  }

  // Tally the work that was executed and completed in this chunk
  _local_work_executed += std::distance(begin, end);
  for (auto it = begin; it != end; ++it)
    if (workIsComplete(*it))
      ++_local_work_completed;

  // Insertion point for derived classes to act on the work that was just executed
  postExecuteChunk(begin, end);

  // Remove the executed chunk from the work buffer
  _work_buffer->eraseChunk(chunk_size);

  if (_allow_new_work_during_execution)
  {
    // Amount of work that was added into the threaded temporary storage during execution
    std::size_t threaded_work_size = 0;
    for (const auto & work_objects : _temp_threaded_work)
      threaded_work_size += work_objects.size();

    if (threaded_work_size)
    {
      // Make room in the work buffer before moving the threaded work into it
      if (_work_buffer->capacity() < _work_buffer->size() + threaded_work_size)
        _work_buffer->setCapacity(_work_buffer->size() + threaded_work_size);

      for (auto & threaded_work_vector : _temp_threaded_work)
      {
        for (auto & work : threaded_work_vector)
          _work_buffer->move(work);
        threaded_work_vector.clear();
      }

      _local_work_started += threaded_work_size;
    }
  }

  // ...

  _currently_executing_work = false;
}
template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::moveParallelDataToBuffer(
    std::shared_ptr<ParallelDataType> & data, const processor_id_type dest_pid)
{
  mooseAssert(comm().size() > dest_pid, "Invalid processor ID");
  mooseAssert(_pid != dest_pid, "Processor ID is self");

  if (!_currently_executing && !_currently_pre_executing)
    mooseError(_name, ": Cannot sendParallelData() when not executing");

  // Get the send buffer for the destination processor, creating one if it does not exist yet
  auto find_pair = _send_buffers.find(dest_pid);
  if (find_pair == _send_buffers.end())
    _send_buffers
        .emplace(dest_pid,
                 std::make_unique<SendBuffer<ParallelDataType,
                                             ParallelStudy<WorkType, ParallelDataType>>>(
                     // ... (elided: leading constructor arguments)
                     _buffer_growth_multiplier,
                     _buffer_shrink_multiplier,
                     _parallel_data_buffer_tag))
        .first->second->moveObject(data);
  else
    find_pair->second->moveObject(data);
}
template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::flushSendBuffers()
{
  for (auto & send_buffer_iter : _send_buffers)
    send_buffer_iter.second->forceSend();
}
template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::reserveBuffer(const std::size_t size)
{
  if (!_currently_pre_executing)
    mooseError(_name, ": Can only reserve in object buffer during pre-execution");

  // Never decrease the capacity
  if (_work_buffer->capacity() < size)
    _work_buffer->setCapacity(size);
}
template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::postReceiveParallelDataInternal()
{
  if (_receive_buffer->buffer().empty())
    return;

  // Let derived classes act on the data that was just received
  postReceiveParallelData(_receive_buffer->buffer().begin(), _receive_buffer->buffer().end());

  for (auto & data : _receive_buffer->buffer())
  {
    // ... (elided)
  }

  _receive_buffer->buffer().clear();
}
template <typename WorkType, typename ParallelDataType>
bool
ParallelStudy<WorkType, ParallelDataType>::receiveAndExecute()
{
  bool executed_some = false;

  // ...
  _receive_buffer->cleanupRequests();
  // ...
  _receive_buffer->receive();

  postReceiveParallelDataInternal();

  preReceiveAndExecute();

  while (!_work_buffer->empty())
  {
    executed_some = true;

    // Only unpack received parallel data when the work buffer is getting small; otherwise
    // just start (post) the receives and keep executing
    const bool start_receives_only = _work_buffer->size() > (2 * _chunk_size);
    _receive_buffer->receive(_work_buffer->size() > (2 * _chunk_size));
    if (!start_receives_only)
      postReceiveParallelDataInternal();

    // Execute a chunk of work (or whatever is left, if less than a chunk remains)
    if (_work_buffer->size() >= _chunk_size)
      executeAndBuffer(_chunk_size);
    else
      executeAndBuffer(_work_buffer->size());
  }

  return executed_some;
}
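// Worked example of the receive heuristic in receiveAndExecute() above, with
// chunk_size = 100: while the work buffer holds more than 200 entries the flag
// start_receives_only is true, so receive() is only asked to post receives and the
// received parallel data is left packed; once the buffer drains to 200 entries or fewer,
// postReceiveParallelDataInternal() is also called each pass so the received data is
// unpacked and handed to postReceiveParallelData() before the buffer runs dry.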
template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::smartExecute()
{
  // ...

  // Request for the sum of the started work
  Parallel::Request started_request;
  // Request for the sum of the completed work
  Parallel::Request completed_request;

  // Temp for sending the current counts in the non-blocking sums
  unsigned long long int temp;

  // Whether to request the started-work sum up front; when new work can be added during
  // execution, the started sum must be taken later so late additions are counted
  const bool started_request_first = !_allow_new_work_during_execution;

  if (started_request_first)
    comm().sum(_local_work_started, _total_work_started, started_request);

  bool made_started_request = started_request_first;
  bool made_completed_request = false;

  // ...

  // Counters for delaying forced communication while no work is being executed
  unsigned int non_executing_clicks = 0;
  unsigned int non_executing_root_clicks = 0;
  bool executed_some = true;

  while (true)
  {
    executed_some = receiveAndExecute();

    if (executed_some)
    {
      non_executing_clicks = 0;
      non_executing_root_clicks = 0;
    }
    else
    {
      non_executing_clicks++;
      non_executing_root_clicks++;
    }

    if (non_executing_clicks >= _clicks_per_communication)
    {
      non_executing_clicks = 0;
      // ... (elided: flush communication)
    }

    if (_has_alternate_ending_criteria)
    {
      if (buffersAreEmpty() && alternateSmartEndingCriteriaMet())
      {
        // ... (elided: finish up and return)
      }
    }
    else if (non_executing_root_clicks >= _clicks_per_root_communication)
    {
      non_executing_root_clicks = 0;

      // Wait until the up-front started-work sum has finished, if it was requested
      if (started_request_first && !started_request.test())
        continue;

      // Request the sum of completed work if it is not already in flight
      if (!made_completed_request)
      {
        made_completed_request = true;
        temp = _local_work_completed;
        comm().sum(temp, _total_work_completed, completed_request);
        continue;
      }

      if (completed_request.test())
      {
        // Request the started-work sum now, if it was not requested up front
        if (!made_started_request)
        {
          made_started_request = true;
          temp = _local_work_started;
          comm().sum(temp, _total_work_started, started_request);
          continue;
        }

        if (!started_request_first && !started_request.test())
          continue;

        // Everything that was started has been completed: done
        if (_total_work_started == _total_work_completed)
          return;

        // Not done yet: request fresh sums on a later pass
        made_completed_request = false;
        if (!started_request_first)
          made_started_request = false;
      }
    }
  }
}
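// Termination note for smartExecute() above: each rank keeps non-blocking global sums of
// the work it has started and the work it has completed in flight, and the study only ends
// once both sums have arrived and _total_work_started == _total_work_completed, i.e. every
// piece of work started anywhere has been completed somewhere. When
// allow_new_work_during_execution is true, the started-work sum is requested after the
// completed-work sum so that work added while executing is still counted.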
template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::harmExecute()
{
  if (_has_alternate_ending_criteria)
    mooseError("ParallelStudy: Alternate ending criteria not yet supported for HARM");
  if (_allow_new_work_during_execution)
    mooseError(_name, ": The addition of new work during execution is not supported by HARM");

  // Request for the sum of the started work
  Parallel::Request work_started_request;
  // Requests for sending the local completed-work count to every other processor
  std::vector<Parallel::Request> work_completed_requests(comm().size());
  // Whether a completed-work send has been made to each processor
  std::vector<bool> work_completed_requests_sent(comm().size(), false);
  // The last completed-work count sent to each processor
  std::vector<unsigned long long int> work_completed_requests_temps(comm().size(), 0);
  // The completed-work count received from each processor
  std::vector<unsigned long long int> work_completed_per_proc(comm().size(), 0);
  // Tag for the completed-work messages
  const auto work_completed_requests_tag = comm().get_unique_tag();

  // Get the amount of work that was started in the whole domain
  comm().sum(_local_work_started, _total_work_started, work_started_request);

  // ...

  unsigned int communication_clicks = 0;

  Parallel::Status work_completed_probe_status;
  int work_completed_probe_flag;

  while (true)
  {
    // ... (elided: receive and execute work, flush send buffers)

    if (communication_clicks > comm().size())
    {
      // Receive any completed-work counts that other processors have sent
      do
      {
        MPI_Iprobe(MPI_ANY_SOURCE,
                   work_completed_requests_tag.value(),
                   comm().get(),
                   &work_completed_probe_flag,
                   work_completed_probe_status.get());

        if (work_completed_probe_flag)
        {
          auto proc = work_completed_probe_status.source();
          comm().receive(proc, work_completed_per_proc[proc], work_completed_requests_tag);
        }
      } while (work_completed_probe_flag);

      _total_work_completed = std::accumulate(
          work_completed_per_proc.begin(), work_completed_per_proc.end(), _local_work_completed);

      communication_clicks = 0;
    }

    // Send the local completed-work count to the other processors when it has changed
    for (processor_id_type pid = 0; pid < comm().size(); ++pid)
      if (pid != _pid &&
          (!work_completed_requests_sent[pid] || work_completed_requests[pid].test()) &&
          _local_work_completed > work_completed_requests_temps[pid])
      {
        work_completed_requests_temps[pid] = _local_work_completed;
        comm().send(pid,
                    work_completed_requests_temps[pid],
                    work_completed_requests[pid],
                    work_completed_requests_tag);
        work_completed_requests_sent[pid] = true;
      }

    // All of the work that was started has been completed: clean up and exit
    if (work_started_request.test() && _total_work_started == _total_work_completed)
    {
      for (processor_id_type pid = 0; pid < comm().size(); ++pid)
        if (work_completed_requests_sent[pid])
          work_completed_requests[pid].wait();

      return;
    }

    communication_clicks++;
  }
}
template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::bsExecute()
{
  if (_has_alternate_ending_criteria)
    mooseError("ParallelStudy: Alternate ending criteria not yet supported for BS");
  if (_allow_new_work_during_execution)
    mooseError(_name, ": The addition of new work during execution is not supported by BS");

  Parallel::Request work_completed_probe_status;
  Parallel::Request work_completed_request;

  // Temp for sending the current count in the non-blocking sum
  unsigned long long int temp;

  // Get the amount of work that was started in the whole domain
  comm().sum(_local_work_started, _total_work_started, work_completed_probe_status);

  // ...
  while (true)
  {
    bool receiving = false;
    bool sending = false;

    Parallel::Request some_left_request;
    unsigned int some_left = 0;
    unsigned int all_some_left = 1;

    // Spin on receives/sends until nothing is in flight anywhere
    do
    {
      _receive_buffer->receive();
      postReceiveParallelDataInternal();
      // ...

      receiving = _receive_buffer->currentlyReceiving();

      for (auto & send_buffer : _send_buffers)
        sending = sending || send_buffer.second->currentlySending() ||
                  send_buffer.second->currentlyBuffered();

      if (!receiving && !sending && some_left_request.test() && all_some_left)
      {
        some_left = receiving || sending;
        comm().sum(some_left, all_some_left, some_left_request);
      }
    } while (receiving || sending || !some_left_request.test() || all_some_left);

    // Execute everything that is currently in the work buffer
    executeAndBuffer(_work_buffer->size());

    // ...
    if (work_completed_probe_status.test() && work_completed_request.test())
    {
      if (_total_work_started == _total_work_completed)
        return;

      temp = _local_work_completed;
      comm().sum(temp, _total_work_completed, work_completed_request);
    }
  }
}
template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::preExecute()
{
  if (!buffersAreEmpty())
    mooseError(_name, ": Buffers are not empty in preExecute()");

  // Clear the communication buffers
  for (auto & send_buffer_pair : _send_buffers)
    send_buffer_pair.second->clear();
  _send_buffers.clear();
  _receive_buffer->clear();

  // Reset the counters
  _local_chunks_executed = 0;
  _local_work_completed = 0;
  _local_work_started = 0;
  _local_work_executed = 0;
  _total_work_started = 0;
  _total_work_completed = 0;

  _currently_pre_executing = true;
}
template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::execute()
{
  if (!_currently_pre_executing)
    mooseError(_name, ": preExecute() was not called before execute()");

  _currently_pre_executing = false;
  _currently_executing = true;

  // ... (elided: run the study via smartExecute(), harmExecute(), or bsExecute() per _method)

  _currently_executing = false;

  if (!buffersAreEmpty())
    mooseError(_name, ": Buffers are not empty after execution");
}
template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::moveWorkError(const MoveWorkError error,
                                                         const WorkType * /*work*/) const
{
  if (error == MoveWorkError::DURING_EXECUTION_DISABLED)
    mooseError(_name,
               ": The moving of new work into the buffer during work execution requires\n",
               "that the parameter 'allow_new_work_during_execution = true'");
  if (error == MoveWorkError::PRE_EXECUTION_AND_EXECUTION_ONLY)
    mooseError(_name,
               ": Can only move work into the buffer in the pre-execution and execution phase\n(between "
               "preExecute() and the end of execute()");
  if (error == MoveWorkError::PRE_EXECUTION_ONLY)
    mooseError(_name,
               ": Can only move work into the buffer in the pre-execution phase\n(between "
               "preExecute() and execute()");
  if (error == MoveWorkError::PRE_EXECUTION_THREAD_0_ONLY)
    mooseError(_name,
               ": Can only move work into the buffer in the pre-execution phase\n(between "
               "preExecute() and execute()) on thread 0");
  if (error == CONTINUING_DURING_EXECUTING_WORK)
    mooseError(_name, ": Cannot move continuing work into the buffer during executeAndBuffer()");

  // ...
}
template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::canMoveWorkCheck(const THREAD_ID tid)
{
  if (_currently_executing)
  {
    if (!_allow_new_work_during_execution)
      moveWorkError(MoveWorkError::DURING_EXECUTION_DISABLED);
  }
  else if (!_currently_pre_executing)
  {
    if (_allow_new_work_during_execution)
      moveWorkError(MoveWorkError::PRE_EXECUTION_AND_EXECUTION_ONLY);
    else
      moveWorkError(MoveWorkError::PRE_EXECUTION_ONLY);
  }
  else if (tid != 0)
    moveWorkError(MoveWorkError::PRE_EXECUTION_THREAD_0_ONLY);
}
template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::moveWorkToBuffer(WorkType & work, const THREAD_ID tid)
{
  // Make sure work can be added at this point in execution
  canMoveWorkCheck(tid);

  // Move directly into the work buffer on thread 0 when not within executeAndBuffer()
  if (!_currently_executing_work && tid == 0)
  {
    ++_local_work_started;
    _work_buffer->move(work);
  }
  // Otherwise, hold it in the threaded temporary storage until it can be merged
  else
    _temp_threaded_work[tid].emplace_back(std::move(work));
}
template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::moveWorkToBuffer(const work_iterator begin,
                                                            const work_iterator end,
                                                            const THREAD_ID tid)
{
  // Make sure work can be added at this point in execution
  canMoveWorkCheck(tid);

  // Amount of work that is being added
  const auto size = std::distance(begin, end);

  // Make room for the work in the appropriate storage
  if (!_currently_executing_work && tid == 0)
  {
    if (_work_buffer->capacity() < _work_buffer->size() + size)
      _work_buffer->setCapacity(_work_buffer->size() + size);
    _local_work_started += size;
  }
  else
    _temp_threaded_work[tid].reserve(_temp_threaded_work[tid].size() + size);

  // Move the work
  if (!_currently_executing_work && tid == 0)
    for (auto it = begin; it != end; ++it)
      _work_buffer->move(*it);
  else
    for (auto it = begin; it != end; ++it)
      _temp_threaded_work[tid].emplace_back(std::move(*it));
}
template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::moveWorkToBuffer(std::vector<WorkType> & work_vector,
                                                            const THREAD_ID tid)
{
  moveWorkToBuffer(work_vector.begin(), work_vector.end(), tid);
}
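// Illustrative sketch (hypothetical, a variation on the DummyStudy example above): with
// 'allow_new_work_during_execution = true', an executeWork() override may spawn follow-on
// work from any thread. It lands in the per-thread _temp_threaded_work storage and is
// merged into the work buffer by executeAndBuffer() once the current chunk is finished.
//
//   void DummyStudy::executeWork(const DummyWork & work, const THREAD_ID tid)
//   {
//     if (work.id < 10)
//     {
//       DummyWork follow_on{work.id + 1};
//       moveWorkToBuffer(follow_on, tid); // allowed during execution on any thread
//     }
//   }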
template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::moveContinuingWorkToBuffer(WorkType & work)
{
  if (_currently_executing_work)
    moveWorkError(MoveWorkError::CONTINUING_DURING_EXECUTING_WORK);

  _work_buffer->move(work);
}
template <typename WorkType, typename ParallelDataType>
void
ParallelStudy<WorkType, ParallelDataType>::moveContinuingWorkToBuffer(const work_iterator begin,
                                                                      const work_iterator end)
{
  if (_currently_executing_work)
    moveWorkError(MoveWorkError::CONTINUING_DURING_EXECUTING_WORK);

  const auto size = std::distance(begin, end);
  if (_work_buffer->capacity() < _work_buffer->size() + size)
    _work_buffer->setCapacity(_work_buffer->size() + size);

  for (auto it = begin; it != end; ++it)
    _work_buffer->move(*it);
}
template <typename WorkType, typename ParallelDataType>
unsigned long long int
ParallelStudy<WorkType, ParallelDataType>::sendBufferPoolCreated() const
{
  unsigned long long int total = 0;

  for (const auto & buffer : _send_buffers)
    total += buffer.second->bufferPoolCreated();

  return total;
}
template <typename WorkType, typename ParallelDataType>
unsigned long long int
ParallelStudy<WorkType, ParallelDataType>::parallelDataSent() const
{
  unsigned long long int total_sent = 0;

  for (const auto & buffer : _send_buffers)
    total_sent += buffer.second->objectsSent();

  return total_sent;
}
template <typename WorkType, typename ParallelDataType>
unsigned long long int
ParallelStudy<WorkType, ParallelDataType>::buffersSent() const
{
  unsigned long long int total_sent = 0;

  for (const auto & buffer : _send_buffers)
    total_sent += buffer.second->buffersSent();

  return total_sent;
}
template <typename WorkType, typename ParallelDataType>
unsigned long long int
ParallelStudy<WorkType, ParallelDataType>::poolParallelDataCreated() const
{
  unsigned long long int num_created = 0;

  for (const auto & pool : _parallel_data_pools)
    num_created += pool.num_created();

  return num_created;
}
template <typename WorkType, typename ParallelDataType>
bool
ParallelStudy<WorkType, ParallelDataType>::alternateSmartEndingCriteriaMet()
{
  mooseError(_name, ": Unimplemented alternateSmartEndingCriteriaMet()");
}
template <typename WorkType, typename ParallelDataType>
bool
ParallelStudy<WorkType, ParallelDataType>::buffersAreEmpty() const
{
  if (!_work_buffer->empty())
    return false;
  for (const auto & threaded_buffer : _temp_threaded_work)
    if (!threaded_buffer.empty())
      return false;
  if (_receive_buffer->currentlyReceiving())
    return false;
  for (const auto & map_pair : _send_buffers)
    if (map_pair.second->currentlySending() || map_pair.second->currentlyBuffered())
      return false;

  return true;
}
void reserveBuffer(const std::size_t size)
Reserve size entries in the work buffer.
const std::unique_ptr< ReceiveBuffer< ParallelDataType, ParallelStudy< WorkType, ParallelDataType > > > _receive_buffer
The receive buffer.
unsigned int clicksPerCommunication() const
Gets the number of iterations to wait before communicating.
const unsigned int _max_buffer_size
Number of objects to buffer before communication.
const unsigned int _chunk_size
Number of objects to execute at once during communication.
unsigned long long int poolParallelDataCreated() const
Gets the total number of parallel data created in all of the threaded pools.
unsigned long long int parallelDataSent() const
Gets the total number of parallel data objects sent from this processor.
std::vector< std::vector< WorkType > > _temp_threaded_work
Threaded temporary storage for work added while we're using the _work_buffer (one for each thread) ...
const unsigned int _clicks_per_communication
Iterations to wait before communicating.
unsigned long long int localChunksExecuted() const
Gets the total number of chunks of work executed on this processor.
void execute()
Execute method.
const Real _buffer_shrink_multiplier
Multiplier for the buffer size for shrinking the buffer.
const unsigned int _min_buffer_size
Minimum size of a SendBuffer.
void mooseError(Args &&... args)
unsigned long long int totalWorkCompleted() const
Gets the total amount of work completed across all processors.
bool _currently_pre_executing
Whether we are between preExecute() and execute()
unsigned long long int _local_work_executed
Amount of work executed on this processor.
unsigned long long int localWorkStarted() const
Gets the total amount of work started from this processor.
void mooseWarning(Args &&... args)
void moveWorkToBuffer(WorkType &work, const THREAD_ID tid)
Adds work to the buffer to be executed.
virtual bool alternateSmartEndingCriteriaMet()
Insertion point for derived classes to provide an alternate ending criteria for SMART execution...
const std::string _name
Name for this object for use in error handling.
void canMoveWorkCheck(const THREAD_ID tid)
Internal check for if it is allowed to currently add work in moveWorkToBuffer().
const Parallel::Communicator & comm() const
bool _currently_executing
Whether we are within execute()
ParallelStudyMethod method() const
Gets the method.
unsigned long long int _local_work_completed
Amount of work completed on this processor.
void flushSendBuffers()
Flushes all parallel data out of the send buffers.
const std::unique_ptr< MooseUtils::Buffer< WorkType > > _work_buffer
Buffer for executing work.
std::vector< T >::iterator iterator
The following methods are specializations for using the Parallel::packed_range_* routines for a vecto...
const processor_id_type _pid
This rank.
void moveParallelDataToBuffer(std::shared_ptr< ParallelDataType > &data, const processor_id_type dest_pid)
Moves parallel data objects to the send buffer to be communicated to processor dest_pid.
const Real _buffer_growth_multiplier
Multiplier for the buffer size for growing the buffer.
const ParallelStudyMethod _method
The study method.
uint8_t processor_id_type
void moveContinuingWorkToBuffer(WorkType &Work)
Moves work that is considered continuing for the purposes of the execution algorithm into the buffer...
virtual bool workIsComplete(const WorkType &)
Can be overridden to denote if a piece of work is not complete yet.
void smartExecute()
Execute work using SMART.
std::unique_ptr< T, ExternalDeleter > PtrType
bool receiveAndExecute()
Receive packets of parallel data from other processors and executes work.
ParallelStudy(const libMesh::Parallel::Communicator &comm, const InputParameters ¶ms, const std::string &name)
virtual void preReceiveAndExecute()
Insertion point called just after trying to receive work and just before beginning work on the work b...
const unsigned int _clicks_per_root_communication
Iterations to wait before communicating with root.
bool buffersAreEmpty() const
Whether or not ALL of the buffers are empty: Working buffer, threaded buffers, receive buffer...
MoveWorkError
Enum for providing useful errors during work addition in moveWorkError().
static InputParameters validParams()
unsigned long long int _local_chunks_executed
Number of chunks of work executed on this processor.
unsigned int chunkSize() const
Gets the chunk size.
void harmExecute()
Execute work using HARM.
Parallel::MessageTag _parallel_data_buffer_tag
MessageTag for sending parallel data.
unsigned long long int _total_work_completed
Amount of work completed on all processors.
virtual void moveWorkError(const MoveWorkError error, const WorkType *work=nullptr) const
Virtual that allows for the customization of error text for moving work into the buffer.
unsigned int clicksPerRootCommunication() const
Gets the number of iterations to wait before communicating with root.
MooseUtils::SharedPool< ParallelDataType >::PtrType acquireParallelData(const THREAD_ID tid, Args &&... args)
Acquire a parallel data object from the pool.
void preExecute()
Pre-execute method that MUST be called before execute() and before adding work.
void postReceiveParallelDataInternal()
Internal method for acting on the parallel data that has just been received into the parallel buffer...
const ReceiveBuffer< ParallelDataType, ParallelStudy< WorkType, ParallelDataType > > & receiveBuffer() const
Gets the receive buffer.
unsigned long long int _local_work_started
Amount of work started on this processor.
std::vector< MooseUtils::SharedPool< ParallelDataType > > _parallel_data_pools
Pools for re-using destructed parallel data objects (one for each thread)
void bsExecute()
Execute work using BS.
const MooseUtils::Buffer< WorkType > & workBuffer() const
Gets the work buffer.
virtual void postReceiveParallelData(const parallel_data_iterator begin, const parallel_data_iterator end)=0
Pure virtual for acting on parallel data that has JUST been received and filled into the buffer...
typedef LIBMESH_DEFAULT_SCALAR_TYPE Real
const bool _allow_new_work_during_execution
Whether or not to allow the addition of new work to the buffer during execution.
MooseUtils::Buffer< WorkType >::iterator work_iterator
unsigned long long int buffersSent() const
Gets the total number of buffers sent from this processor.
MooseUtils::Buffer< std::shared_ptr< ParallelDataType > >::iterator parallel_data_iterator
virtual void postExecuteChunk(const work_iterator, const work_iterator)
Insertion point for acting on work that was just executed.
bool currentlyPreExecuting() const
Whether or not this object is between preExecute() and execute().
const unsigned int _clicks_per_receive
Iterations to wait before checking for new objects.
const InputParameters & _params
The InputParameters.
bool _currently_executing_work
Whether or not we are currently within executeAndBuffer()
unsigned int maxBufferSize() const
Gets the max buffer size.
bool currentlyExecuting() const
Whether or not this object is currently in execute().
void executeAndBuffer(const std::size_t chunk_size)
Execute a chunk of work and buffer.
virtual std::unique_ptr< MooseUtils::Buffer< WorkType > > createWorkBuffer()
Creates the work buffer.
virtual void executeWork(const WorkType &work, const THREAD_ID tid)=0
Pure virtual to be overridden that executes a single object of work on a given thread.
unsigned int clicksPerReceive() const
Gets the number of iterations to wait before checking for new parallel data.
unsigned long long int _total_work_started
Amount of work started on all processors.
bool _has_alternate_ending_criteria
Whether or not this object has alternate ending criteria.
unsigned long long int localWorkExecuted() const
Gets the total amount of work executed on this processor.
std::unordered_map< processor_id_type, std::unique_ptr< SendBuffer< ParallelDataType, ParallelStudy< WorkType, ParallelDataType > > > > _send_buffers
Send buffers for each processor.
unsigned long long int sendBufferPoolCreated() const
Gets the total number of send buffer pools created.