https://mooseframework.inl.gov
ReceiveBuffer.h
Go to the documentation of this file.
1 //* This file is part of the MOOSE framework
2 //* https://mooseframework.inl.gov
3 //*
4 //* All rights reserved, see COPYRIGHT for full restrictions
5 //* https://github.com/idaholab/moose/blob/master/COPYRIGHT
6 //*
7 //* Licensed under LGPL 2.1, please see LICENSE for details
8 //* https://www.gnu.org/licenses/lgpl-2.1.html
9 
10 #pragma once
11 
12 // Moose Includes
13 #include "LIFOBuffer.h"
14 #include "SharedPool.h"
15 
16 // Local includes
17 #include "ParallelStudyMethod.h"
18 
19 // libMesh Includes
20 #include "libmesh/parallel.h"
21 #include "libmesh/parallel_object.h"
22 
23 // System Includes
24 #include <list>
25 #include <chrono>
26 
27 template <typename Object, typename Context>
29 {
30 public:
32  Context * const context,
33  const ParallelStudyMethod method,
34  const unsigned int clicks_per_receive,
35  const Parallel::MessageTag object_buffer_tag);
36 
40  ~ReceiveBuffer() override;
41 
45  bool currentlyReceiving() const { return _requests.size(); }
46 
50  unsigned long int objectsReceived() const { return _objects_received; }
51 
55  unsigned long int buffersReceived() const { return _buffers_received; }
56 
60  unsigned long int numProbes() const { return _num_probes; }
61 
65  unsigned long int objectPoolCreated() const { return _object_buffer_pool.num_created(); }
66 
70  unsigned long int bufferPoolCreated() const { return _buffer_pool.num_created(); }
71 
77  void receive(const bool start_receives_only = false);
78 
82  void clear();
83 
87  Real receiveLoopTime() const { return std::chrono::duration<Real>(_receive_loop_time).count(); }
88 
93  {
94  return std::chrono::duration<Real>(_cleanup_requests_time).count();
95  }
96 
100  void cleanupRequests();
101 
106 
107 private:
109  Context * const _context;
112 
116  const unsigned int _clicks_per_receive;
117 
119  std::list<std::pair<std::shared_ptr<Parallel::Request>,
120  std::shared_ptr<std::vector<std::shared_ptr<Object>>>>>
122 
124  const Parallel::MessageTag _object_buffer_tag;
125 
127  std::chrono::steady_clock::duration _receive_loop_time;
129  std::chrono::steady_clock::duration _cleanup_requests_time;
130 
132  unsigned long int _objects_received;
134  unsigned long int _buffers_received;
136  unsigned long int _num_probes;
137 
142  std::vector<typename Parallel::Packing<std::shared_ptr<Object>>::buffer_type>>
144 
145  template <typename C, typename OutputIter, typename T>
146  inline void blocking_receive_packed_range(const Parallel::Communicator & comm,
147  const processor_id_type src_processor_id,
148  C * context,
149  OutputIter out,
150  const T * /* output_type */,
151  Parallel::Request & req,
152  Parallel::Status & stat,
153  const Parallel::MessageTag & tag) const
154  {
155  libmesh_experimental();
156 
157  typedef typename Parallel::Packing<T>::buffer_type buffer_t;
158 
159  // Receive serialized variable size objects as a sequence of
160  // buffer_t.
161  // Allocate a buffer on the heap so we don't have to free it until
162  // after the Request::wait()
163  std::vector<buffer_t> * buffer = new std::vector<buffer_t>(stat.size());
164  comm.receive(src_processor_id, *buffer, tag);
165 
166  // Make the Request::wait() handle unpacking the buffer
167  req.add_post_wait_work(
168  new libMesh::Parallel::PostWaitUnpackBuffer<std::vector<buffer_t>, C, OutputIter, T>(
169  *buffer, context, out));
170 
171  // Make the Request::wait() handle deleting the buffer
172  req.add_post_wait_work(
173  new libMesh::Parallel::PostWaitDeleteBuffer<std::vector<buffer_t>>(buffer));
174  }
175 };
176 
177 template <typename Object, typename Context>
179  Context * const context,
180  const ParallelStudyMethod method,
181  const unsigned int clicks_per_receive,
182  const Parallel::MessageTag object_buffer_tag)
183  : ParallelObject(comm),
184  _context(context),
185  _method(method),
186  _clicks_per_receive(clicks_per_receive),
187  _object_buffer_tag(object_buffer_tag)
188 {
189 }
190 
191 template <typename Object, typename Context>
193 {
194  if (!_requests.empty())
195  mooseError("Some requests not serviced!");
196 }
197 
198 template <typename Object, typename Context>
199 void
200 ReceiveBuffer<Object, Context>::receive(const bool start_receives_only /* = false */)
201 {
202  bool flag = false;
203  Parallel::Status stat;
204 
205  static unsigned int current_clicks = 0;
206 
207  if (current_clicks % _clicks_per_receive == 0)
208  {
209  current_clicks = 0;
210 
211  // Receive and process a bunch of objects
212  do
213  {
214  stat = _communicator.template packed_range_probe<std::shared_ptr<Object>>(
215  Parallel::any_source, _object_buffer_tag, flag);
216 
217  _num_probes++;
218 
219  if (flag)
220  {
221  auto req = std::make_shared<Parallel::Request>();
222  std::shared_ptr<std::vector<std::shared_ptr<Object>>> objects =
223  _object_buffer_pool.acquire();
224 
225  // Make sure the buffer is clear - this shouldn't resize the storage though.
226  objects->clear();
227 
228  if (_method == ParallelStudyMethod::HARM || _method == ParallelStudyMethod::BS)
229  blocking_receive_packed_range(comm(),
230  stat.source(),
231  _context,
232  std::back_inserter(*objects),
233  (std::shared_ptr<Object> *)(libmesh_nullptr),
234  *req,
235  stat,
236  _object_buffer_tag);
237  else
238  {
239  std::shared_ptr<
240  std::vector<typename Parallel::Packing<std::shared_ptr<Object>>::buffer_type>>
241  buffer = _buffer_pool.acquire();
242 
243  _communicator.nonblocking_receive_packed_range(
244  stat.source(),
245  _context,
246  std::back_inserter(*objects),
247  (std::shared_ptr<Object> *)(libmesh_nullptr),
248  *req,
249  stat,
250  buffer,
251  _object_buffer_tag);
252  }
253 
254  _requests.emplace_back(req, objects);
255  }
256  } while (flag);
257  }
258 
259  current_clicks++;
260 
261  if (!start_receives_only)
262  cleanupRequests();
263 }
264 
265 template <typename Object, typename Context>
266 void
268 {
269  _requests.clear();
270 
271  _receive_loop_time = std::chrono::steady_clock::duration::zero();
272  _cleanup_requests_time = std::chrono::steady_clock::duration::zero();
273 
274  _objects_received = 0;
275  _buffers_received = 0;
276  _num_probes = 0;
277 }
278 
279 template <typename Object, typename Context>
280 void
282 {
283  _requests.remove_if(
284  [&](std::pair<std::shared_ptr<Parallel::Request>,
285  std::shared_ptr<std::vector<std::shared_ptr<Object>>>> & request_pair)
286  {
287  auto req = request_pair.first;
288  auto objects = request_pair.second;
289 
290  if (req->test()) // See if the receive has completed
291  {
292  req->wait(); // MUST call wait() to do post_wait_work which actually fills the object
293  // buffer
294 
295  _buffers_received++;
296  _objects_received += objects->size();
297 
298  if (_buffer.capacity() < _buffer.size() + objects->size())
299  _buffer.setCapacity(_buffer.size() + objects->size());
300 
301  for (auto & object : *objects)
302  _buffer.move(object);
303 
304  objects->clear();
305 
306  return true;
307  }
308  else
309  return false;
310  });
311 }
size_t num_created() const
ReceiveBuffer(const libMesh::Parallel::Communicator &comm, Context *const context, const ParallelStudyMethod method, const unsigned int clicks_per_receive, const Parallel::MessageTag object_buffer_tag)
void cleanupRequests()
Checks to see if any Requests can be finished.
unsigned long int numProbes() const
The total number of times we've polled for messages.
Definition: ReceiveBuffer.h:60
Context *const _context
The context.
unsigned long int objectsReceived() const
The work received since the last reset.
Definition: ReceiveBuffer.h:50
const Parallel::MessageTag _object_buffer_tag
MessageTag for sending objects.
unsigned long int bufferPoolCreated() const
Number of buffers created in the buffer pool.
Definition: ReceiveBuffer.h:70
void mooseError(Args &&... args)
unsigned long int buffersReceived() const
The number of buffers received since the last reset.
Definition: ReceiveBuffer.h:55
const Parallel::Communicator & comm() const
MooseUtils::LIFOBuffer< std::shared_ptr< Object > > & buffer()
Gets the buffer that the received objects are filled into after the requests are finished.
bool currentlyReceiving() const
Whether or not there are messages that are being currently received.
Definition: ReceiveBuffer.h:45
const unsigned int _clicks_per_receive
Iterations to wait before checking for new objects.
std::list< std::pair< std::shared_ptr< Parallel::Request >, std::shared_ptr< std::vector< std::shared_ptr< Object > > > > > _requests
List of Requests and buffers for each request.
const ParallelStudyMethod _method
The method.
void clear()
Clear all existing data.
std::chrono::steady_clock::duration _receive_loop_time
Receive loop time.
uint8_t processor_id_type
unsigned long int _buffers_received
Total object buffers received.
Status receive(const unsigned int dest_processor_id, T &buf, const MessageTag &tag=any_tag) const
unsigned long int objectPoolCreated() const
Number of buffers created in the object buffer pool.
Definition: ReceiveBuffer.h:65
unsigned long int _num_probes
Total number of times we've polled for messages.
Context
~ReceiveBuffer() override
Destructor: reports an error if any receive requests have not been serviced.
ParallelStudyMethod
DIE A HORRIBLE DEATH HERE typedef LIBMESH_DEFAULT_SCALAR_TYPE Real
unsigned long int _objects_received
Total objects received.
OStreamProxy out
void blocking_receive_packed_range(const Parallel::Communicator &comm, const processor_id_type src_processor_id, C *context, OutputIter out, const T *, Parallel::Request &req, Parallel::Status &stat, const Parallel::MessageTag &tag) const
std::chrono::steady_clock::duration _cleanup_requests_time
Time cleaning up requests.
Real cleanupRequestsTime() const
Amount of time (in seconds) spent finishing receives and collecting objects.
Definition: ReceiveBuffer.h:92
MooseUtils::SharedPool< std::vector< typename Parallel::Packing< std::shared_ptr< Object > >::buffer_type > > _buffer_pool
Shared pool of buffers.
Real receiveLoopTime() const
Amount of time (in seconds) spent in the loop checking for messages and creating Receives.
Definition: ReceiveBuffer.h:87
void receive(const bool start_receives_only=false)
Start receives for all currently available messages.
MooseUtils::SharedPool< std::vector< std::shared_ptr< Object > > > _object_buffer_pool
Shared pool of object buffers for incoming messages.
MooseUtils::LIFOBuffer< std::shared_ptr< Object > > _buffer
The buffer that finished requests are filled into.
static const std::string C
Definition: NS.h:168