20 #include "libmesh/parallel.h" 21 #include "libmesh/parallel_object.h" 27 template <
typename Object,
typename Context>
34 const unsigned int clicks_per_receive,
35 const Parallel::MessageTag object_buffer_tag);
77 void receive(
const bool start_receives_only =
false);
119 std::list<std::pair<std::shared_ptr<Parallel::Request>,
120 std::shared_ptr<std::vector<std::shared_ptr<Object>>>>>
142 std::vector<typename Parallel::Packing<std::shared_ptr<Object>>::buffer_type>>
145 template <
typename C,
typename OutputIter,
typename T>
151 Parallel::Request & req,
152 Parallel::Status & stat,
153 const Parallel::MessageTag & tag)
const 155 libmesh_experimental();
157 typedef typename Parallel::Packing<T>::buffer_type buffer_t;
163 std::vector<buffer_t> *
buffer =
new std::vector<buffer_t>(stat.size());
167 req.add_post_wait_work(
168 new libMesh::Parallel::PostWaitUnpackBuffer<std::vector<buffer_t>,
C, OutputIter, T>(
172 req.add_post_wait_work(
173 new libMesh::Parallel::PostWaitDeleteBuffer<std::vector<buffer_t>>(
buffer));
177 template <
typename Object,
typename Context>
181 const unsigned int clicks_per_receive,
182 const Parallel::MessageTag object_buffer_tag)
183 : ParallelObject(comm),
186 _clicks_per_receive(clicks_per_receive),
187 _object_buffer_tag(object_buffer_tag)
191 template <
typename Object,
typename Context>
194 if (!_requests.empty())
198 template <
typename Object,
typename Context>
203 Parallel::Status stat;
205 static unsigned int current_clicks = 0;
207 if (current_clicks % _clicks_per_receive == 0)
214 stat = _communicator.template packed_range_probe<std::shared_ptr<Object>>(
215 Parallel::any_source, _object_buffer_tag, flag);
221 auto req = std::make_shared<Parallel::Request>();
222 std::shared_ptr<std::vector<std::shared_ptr<Object>>> objects =
223 _object_buffer_pool.acquire();
229 blocking_receive_packed_range(comm(),
232 std::back_inserter(*objects),
233 (std::shared_ptr<Object> *)(libmesh_nullptr),
240 std::vector<typename Parallel::Packing<std::shared_ptr<Object>>::buffer_type>>
241 buffer = _buffer_pool.acquire();
243 _communicator.nonblocking_receive_packed_range(
246 std::back_inserter(*objects),
247 (std::shared_ptr<Object> *)(libmesh_nullptr),
254 _requests.emplace_back(req, objects);
261 if (!start_receives_only)
265 template <
typename Object,
typename Context>
271 _receive_loop_time = std::chrono::steady_clock::duration::zero();
272 _cleanup_requests_time = std::chrono::steady_clock::duration::zero();
274 _objects_received = 0;
275 _buffers_received = 0;
279 template <
typename Object,
typename Context>
284 [&](std::pair<std::shared_ptr<Parallel::Request>,
285 std::shared_ptr<std::vector<std::shared_ptr<Object>>>> & request_pair)
287 auto req = request_pair.first;
288 auto objects = request_pair.second;
296 _objects_received += objects->size();
298 if (_buffer.capacity() < _buffer.size() + objects->size())
299 _buffer.setCapacity(_buffer.size() + objects->size());
301 for (
auto &
object : *objects)
302 _buffer.move(
object);
size_t num_created() const
ReceiveBuffer(const libMesh::Parallel::Communicator &comm, Context *const context, const ParallelStudyMethod method, const unsigned int clicks_per_receive, const Parallel::MessageTag object_buffer_tag)
void cleanupRequests()
Checks to see if any Requests can be finished.
unsigned long int numProbes() const
The total number of times we've polled for messages.
Context *const _context
The context.
unsigned long int objectsReceived() const
The number of objects received since the last reset.
const Parallel::MessageTag _object_buffer_tag
MessageTag that identifies incoming object messages (used when probing for objects to receive).
unsigned long int bufferPoolCreated() const
Number of buffers created in the buffer pool.
void mooseError(Args &&... args)
unsigned long int buffersReceived() const
The number of buffers received since the last reset.
const Parallel::Communicator & comm() const
MooseUtils::LIFOBuffer< std::shared_ptr< Object > > & buffer()
Gets the buffer that the received objects are filled into after the requests are finished.
bool currentlyReceiving() const
Whether or not there are messages that are being currently received.
const unsigned int _clicks_per_receive
Iterations to wait before checking for new objects.
std::list< std::pair< std::shared_ptr< Parallel::Request >, std::shared_ptr< std::vector< std::shared_ptr< Object > > > > > _requests
List of Requests and buffers for each request.
const ParallelStudyMethod _method
The method.
void clear()
Clear all existing data.
std::chrono::steady_clock::duration _receive_loop_time
Receive loop time.
uint8_t processor_id_type
unsigned long int _buffers_received
Total object buffers received.
Status receive(const unsigned int dest_processor_id, T &buf, const MessageTag &tag=any_tag) const
unsigned long int objectPoolCreated() const
Number of buffers created in the object buffer pool.
unsigned long int _num_probes
Total number of times we've polled for messages.
~ReceiveBuffer() override
Destructor: ensures that all receive requests have completed.
DIE A HORRIBLE DEATH HERE typedef LIBMESH_DEFAULT_SCALAR_TYPE Real
unsigned long int _objects_received
Total objects received.
void blocking_receive_packed_range(const Parallel::Communicator &comm, const processor_id_type src_processor_id, C *context, OutputIter out, const T *, Parallel::Request &req, Parallel::Status &stat, const Parallel::MessageTag &tag) const
std::chrono::steady_clock::duration _cleanup_requests_time
Time cleaning up requests.
Real cleanupRequestsTime() const
Amount of time (in seconds) spent finishing receives and collecting objects.
MooseUtils::SharedPool< std::vector< typename Parallel::Packing< std::shared_ptr< Object > >::buffer_type > > _buffer_pool
Shared pool of buffers.
Real receiveLoopTime() const
Amount of time (in seconds) spent in the loop checking for messages and creating Receives.
void receive(const bool start_receives_only=false)
Start receives for all currently available messages.
MooseUtils::SharedPool< std::vector< std::shared_ptr< Object > > > _object_buffer_pool
Shared pool of object buffers for incoming messages.
MooseUtils::LIFOBuffer< std::shared_ptr< Object > > _buffer
The buffer that finished requests are filled into.
static const std::string C