https://mooseframework.inl.gov
ReceiveBuffer< Object, Context > Class Template Reference

#include <ReceiveBuffer.h>

Inherits ParallelObject.

Public Member Functions

 ReceiveBuffer (const libMesh::Parallel::Communicator &comm, Context *const context, const ParallelStudyMethod method, const unsigned int clicks_per_receive, const Parallel::MessageTag object_buffer_tag)
 
 ~ReceiveBuffer () override
 Destructor: ensures that all receive requests have completed (calls mooseError() otherwise).

bool currentlyReceiving () const
 Whether or not there are messages that are currently being received.

unsigned long int objectsReceived () const
 The work received since the last reset.

unsigned long int buffersReceived () const
 The number of buffers received since the last reset.

unsigned long int numProbes () const
 The total number of times we've polled for messages.

unsigned long int objectPoolCreated () const
 Number of buffers created in the object buffer pool.

unsigned long int bufferPoolCreated () const
 Number of buffers created in the buffer pool.

void receive (const bool start_receives_only=false)
 Start receives for all currently available messages.

void clear ()
 Clear all existing data.

Real receiveLoopTime () const
 Amount of time (in seconds) spent in the loop checking for messages and creating Receives.

Real cleanupRequestsTime () const
 Amount of time (in seconds) spent finishing receives and collecting objects.

void cleanupRequests ()
 Checks to see if any Requests can be finished.

MooseUtils::LIFOBuffer< std::shared_ptr< Object > > & buffer ()
 Gets the buffer that the received objects are filled into after the requests are finished.
 
const Parallel::Communicator & comm () const
 
processor_id_type n_processors () const
 
processor_id_type processor_id () const
 

Protected Attributes

const Parallel::Communicator & _communicator
 

Private Member Functions

template<typename C , typename OutputIter , typename T >
void blocking_receive_packed_range (const Parallel::Communicator &comm, const processor_id_type src_processor_id, C *context, OutputIter out, const T *, Parallel::Request &req, Parallel::Status &stat, const Parallel::MessageTag &tag) const
 

Private Attributes

Context *const _context
 The context.

MooseUtils::LIFOBuffer< std::shared_ptr< Object > > _buffer
 The buffer that finished requests are filled into.

const ParallelStudyMethod _method
 The method.

const unsigned int _clicks_per_receive
 Iterations to wait before checking for new objects.

std::list< std::pair< std::shared_ptr< Parallel::Request >, std::shared_ptr< std::vector< std::shared_ptr< Object > > > > > _requests
 List of Requests and buffers for each request.

const Parallel::MessageTag _object_buffer_tag
 MessageTag for sending objects.

std::chrono::steady_clock::duration _receive_loop_time
 Receive loop time.

std::chrono::steady_clock::duration _cleanup_requests_time
 Time cleaning up requests.

unsigned long int _objects_received
 Total objects received.

unsigned long int _buffers_received
 Total object buffers received.

unsigned long int _num_probes
 Total number of times we've polled for messages.

MooseUtils::SharedPool< std::vector< std::shared_ptr< Object > > > _object_buffer_pool
 Shared pool of object buffers for incoming messages.

MooseUtils::SharedPool< std::vector< typename Parallel::Packing< std::shared_ptr< Object > >::buffer_type > > _buffer_pool
 Shared pool of buffers.
 

Detailed Description

template<typename Object, typename Context>
class ReceiveBuffer< Object, Context >

Definition at line 28 of file ReceiveBuffer.h.

Constructor & Destructor Documentation

◆ ReceiveBuffer()

template<typename Object , typename Context >
ReceiveBuffer< Object, Context >::ReceiveBuffer ( const libMesh::Parallel::Communicator &  comm,
Context *const  context,
const ParallelStudyMethod  method,
const unsigned int  clicks_per_receive,
const Parallel::MessageTag  object_buffer_tag 
)

Definition at line 178 of file ReceiveBuffer.h.

  : ParallelObject(comm),
    _context(context),
    _method(method),
    _clicks_per_receive(clicks_per_receive),
    _object_buffer_tag(object_buffer_tag)
{
}

◆ ~ReceiveBuffer()

template<typename Object , typename Context >
ReceiveBuffer< Object, Context >::~ReceiveBuffer ( )
override

Destructor: ensures that all receive requests have completed (calls mooseError() otherwise).

Definition at line 192 of file ReceiveBuffer.h.

{
  if (!_requests.empty())
    mooseError("Some requests not serviced!");
}

Member Function Documentation

◆ blocking_receive_packed_range()

template<typename Object , typename Context >
template<typename C , typename OutputIter , typename T >
void ReceiveBuffer< Object, Context >::blocking_receive_packed_range ( const Parallel::Communicator &  comm,
const processor_id_type  src_processor_id,
C *  context,
OutputIter  out,
const T *  ,
Parallel::Request &  req,
Parallel::Status &  stat,
const Parallel::MessageTag &  tag 
) const
inline private

Definition at line 146 of file ReceiveBuffer.h.

  {
    libmesh_experimental();

    typedef typename Parallel::Packing<T>::buffer_type buffer_t;

    // Receive serialized variable size objects as a sequence of
    // buffer_t.
    // Allocate a buffer on the heap so we don't have to free it until
    // after the Request::wait()
    std::vector<buffer_t> * buffer = new std::vector<buffer_t>(stat.size());
    comm.receive(src_processor_id, *buffer, tag);

    // Make the Request::wait() handle unpacking the buffer
    req.add_post_wait_work(
        new libMesh::Parallel::PostWaitUnpackBuffer<std::vector<buffer_t>, C, OutputIter, T>(
            *buffer, context, out));

    // Make the Request::wait() handle deleting the buffer
    req.add_post_wait_work(
        new libMesh::Parallel::PostWaitDeleteBuffer<std::vector<buffer_t>>(buffer));
  }
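
The deferred-work pattern used above (queue the unpacking first, then the buffer deletion, and let wait() run both) can be sketched without libMesh. This is a minimal illustration only: `DeferredRequest`, `addPostWaitWork` and `queueUnpack` are hypothetical names, and the class is a simplified stand-in rather than libMesh's Parallel::Request.

```cpp
#include <functional>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for a request that runs deferred work
// when wait() is called; it is not libMesh's Parallel::Request.
class DeferredRequest
{
public:
  void addPostWaitWork(std::function<void()> work) { _work.push_back(std::move(work)); }

  // Runs the queued work in the order it was added, then forgets it.
  void wait()
  {
    for (auto & work : _work)
      work();
    _work.clear();
  }

private:
  std::vector<std::function<void()>> _work;
};

// Queues "unpack" and then "delete" against a heap-allocated raw buffer,
// in the same order as the listing above.
inline void
queueUnpack(DeferredRequest & req, std::vector<int> * raw, std::vector<int> & out)
{
  req.addPostWaitWork([raw, &out]() { out.insert(out.end(), raw->begin(), raw->end()); });
  req.addPostWaitWork([raw]() { delete raw; });
}
```

In this sketch nothing reaches `out` until `wait()` is called, which is why a caller has to call wait() even after test() reports completion.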

◆ buffer()

template<typename Object , typename Context >
MooseUtils::LIFOBuffer<std::shared_ptr<Object> >& ReceiveBuffer< Object, Context >::buffer ( )
inline

Gets the buffer that the received objects are filled into after the requests are finished.

Definition at line 105 of file ReceiveBuffer.h.

{ return _buffer; }

◆ bufferPoolCreated()

template<typename Object , typename Context >
unsigned long int ReceiveBuffer< Object, Context >::bufferPoolCreated ( ) const
inline

Number of buffers created in the buffer pool.

Definition at line 70 of file ReceiveBuffer.h.

{ return _buffer_pool.num_created(); }

◆ buffersReceived()

template<typename Object , typename Context >
unsigned long int ReceiveBuffer< Object, Context >::buffersReceived ( ) const
inline

The number of buffers received since the last reset.

Definition at line 55 of file ReceiveBuffer.h.

{ return _buffers_received; }

◆ cleanupRequests()

template<typename Object , typename Context >
void ReceiveBuffer< Object, Context >::cleanupRequests ( )

Checks to see if any Requests can be finished.

Definition at line 281 of file ReceiveBuffer.h.

{
  _requests.remove_if(
      [&](std::pair<std::shared_ptr<Parallel::Request>,
                    std::shared_ptr<std::vector<std::shared_ptr<Object>>>> & request_pair)
      {
        auto req = request_pair.first;
        auto objects = request_pair.second;

        if (req->test()) // See if the receive has completed
        {
          req->wait(); // MUST call wait() to do post_wait_work which actually fills the object
                       // buffer

          _buffers_received++;
          _objects_received += objects->size();

          if (_buffer.capacity() < _buffer.size() + objects->size())
            _buffer.setCapacity(_buffer.size() + objects->size());

          for (auto & object : *objects)
            _buffer.move(object);

          objects->clear();

          return true;
        }
        else
          return false;
      });
}
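
The test-then-wait sweep above can be tried in isolation with standard containers. The following is a sketch under stated assumptions: `MockRequest`, `Objects`, `Pending` and `collectFinished` are hypothetical names, the request is a trivial stand-in for Parallel::Request, and a plain `std::vector` takes the place of the LIFOBuffer.

```cpp
#include <cstddef>
#include <list>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for Parallel::Request: test() reports completion and
// wait() is where deferred unpacking would run.
struct MockRequest
{
  bool done = false;
  bool test() const { return done; }
  void wait() {}
};

using Objects = std::vector<std::shared_ptr<int>>;
using Pending = std::pair<std::shared_ptr<MockRequest>, std::shared_ptr<Objects>>;

// Moves the objects of every completed request into `out`, erases those
// requests from `pending`, and returns how many objects were collected.
inline std::size_t
collectFinished(std::list<Pending> & pending, Objects & out)
{
  std::size_t collected = 0;
  pending.remove_if(
      [&](const Pending & entry)
      {
        if (!entry.first->test())
          return false; // still in flight: keep it in the list

        entry.first->wait(); // run deferred unpacking before touching the objects
        collected += entry.second->size();
        for (auto & object : *entry.second)
          out.push_back(std::move(object));
        entry.second->clear();
        return true; // finished: remove_if erases this entry
      });
  return collected;
}
```

Requests that have not completed stay in the list and are examined again on the next sweep, so the function can be called repeatedly from a polling loop.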

◆ cleanupRequestsTime()

template<typename Object , typename Context >
Real ReceiveBuffer< Object, Context >::cleanupRequestsTime ( ) const
inline

Amount of time (in seconds) spent finishing receives and collecting objects.

Definition at line 92 of file ReceiveBuffer.h.

  {
    return std::chrono::duration<Real>(_cleanup_requests_time).count();
  }
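
The conversion from the stored steady_clock duration to seconds can be checked on its own. This is a minimal sketch that assumes `Real` is `double` (in libMesh it is a configurable floating-point type); `toSeconds` is a hypothetical helper and not a member of ReceiveBuffer.

```cpp
#include <chrono>

// Assumption: Real is double here; in libMesh it is a configurable
// floating-point type.
using Real = double;

// Hypothetical helper mirroring the body of cleanupRequestsTime():
// re-express a steady_clock tick count as fractional seconds.
inline Real
toSeconds(const std::chrono::steady_clock::duration d)
{
  return std::chrono::duration<Real>(d).count();
}
```

Constructing a `std::chrono::duration<Real>` (default period of one second) from the tick-based duration performs the unit conversion, so no manual division by the clock period is needed.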

◆ clear()

template<typename Object , typename Context >
void ReceiveBuffer< Object, Context >::clear ( )

Clear all existing data.

Definition at line 267 of file ReceiveBuffer.h.

{
  _requests.clear();

  _receive_loop_time = std::chrono::steady_clock::duration::zero();
  _cleanup_requests_time = std::chrono::steady_clock::duration::zero();

  _objects_received = 0;
  _buffers_received = 0;
  _num_probes = 0;
}

◆ currentlyReceiving()

template<typename Object , typename Context >
bool ReceiveBuffer< Object, Context >::currentlyReceiving ( ) const
inline

Whether or not there are messages that are being currently received.

Definition at line 45 of file ReceiveBuffer.h.

{ return _requests.size(); }

◆ numProbes()

template<typename Object , typename Context >
unsigned long int ReceiveBuffer< Object, Context >::numProbes ( ) const
inline

The total number of times we've polled for messages.

Definition at line 60 of file ReceiveBuffer.h.

{ return _num_probes; }

◆ objectPoolCreated()

template<typename Object , typename Context >
unsigned long int ReceiveBuffer< Object, Context >::objectPoolCreated ( ) const
inline

Number of buffers created in the object buffer pool.

Definition at line 65 of file ReceiveBuffer.h.

{ return _object_buffer_pool.num_created(); }
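
What a "number created" counter measures in a pool like this can be illustrated with a small standalone pool. The sketch below is an assumption-laden simplification, not the MooseUtils::SharedPool implementation: `SimplePool`, `numCreated` and the internal `State` struct are hypothetical names.

```cpp
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical, simplified pool; MooseUtils::SharedPool may differ in detail.
// acquire() hands out a shared_ptr whose deleter returns the object to the
// pool instead of freeing it, so storage is reused across messages.
template <typename T>
class SimplePool
{
public:
  SimplePool() : _state(std::make_shared<State>()) {}

  std::shared_ptr<T> acquire()
  {
    std::unique_ptr<T> object;
    if (_state->idle.empty())
    {
      object.reset(new T());
      ++_state->created;
    }
    else
    {
      object = std::move(_state->idle.back());
      _state->idle.pop_back();
    }

    // The deleter holds only a weak reference, so objects that outlive the
    // pool are destroyed normally.
    std::weak_ptr<State> weak = _state;
    return std::shared_ptr<T>(object.release(),
                              [weak](T * ptr)
                              {
                                std::unique_ptr<T> owned(ptr);
                                if (auto state = weak.lock())
                                  state->idle.push_back(std::move(owned));
                              });
  }

  // Number of objects constructed so far (reused objects are not counted).
  std::size_t numCreated() const { return _state->created; }

private:
  struct State
  {
    std::vector<std::unique_ptr<T>> idle;
    std::size_t created = 0;
  };

  std::shared_ptr<State> _state;
};
```

In this sketch a reused object keeps whatever contents and capacity it had when it was released, which is consistent with receive() calling `objects->clear()` on a freshly acquired buffer. A creation count that stays small relative to the number of buffers received suggests the pool is being reused effectively.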

◆ objectsReceived()

template<typename Object , typename Context >
unsigned long int ReceiveBuffer< Object, Context >::objectsReceived ( ) const
inline

The work received since the last reset.

Definition at line 50 of file ReceiveBuffer.h.

{ return _objects_received; }

◆ receive()

template<typename Object , typename Context >
void ReceiveBuffer< Object, Context >::receive ( const bool  start_receives_only = false)

Start receives for all currently available messages.

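Unless start_receives_only is true, also calls cleanupRequests(), which moves the objects from finished receives into the working buffer.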

Definition at line 200 of file ReceiveBuffer.h.

{
  bool flag = false;
  Parallel::Status stat;

  static unsigned int current_clicks = 0;

  if (current_clicks % _clicks_per_receive == 0)
  {
    current_clicks = 0;

    // Receive and process a bunch of objects
    do
    {
      stat = _communicator.template packed_range_probe<std::shared_ptr<Object>>(
          Parallel::any_source, _object_buffer_tag, flag);

      _num_probes++;

      if (flag)
      {
        auto req = std::make_shared<Parallel::Request>();
        std::shared_ptr<std::vector<std::shared_ptr<Object>>> objects =
            _object_buffer_pool.acquire();

        // Make sure the buffer is clear - this shouldn't resize the storage though.
        objects->clear();

        if (/* condition on _method; not shown in this listing */)
          blocking_receive_packed_range(comm(),
                                        stat.source(),
                                        _context,
                                        std::back_inserter(*objects),
                                        (std::shared_ptr<Object> *)(libmesh_nullptr),
                                        *req,
                                        stat,
                                        /* message tag argument; not shown in this listing */);
        else
        {
          std::shared_ptr<
              std::vector<typename Parallel::Packing<std::shared_ptr<Object>>::buffer_type>>
              buffer = _buffer_pool.acquire();

          _communicator.nonblocking_receive_packed_range(
              stat.source(),
              _context,
              std::back_inserter(*objects),
              (std::shared_ptr<Object> *)(libmesh_nullptr),
              *req,
              stat,
              buffer,
              /* message tag argument; not shown in this listing */);
        }

        _requests.emplace_back(req, objects);
      }
    } while (flag);
  }

  current_clicks++;

  if (!start_receives_only)
    cleanupRequests();
}
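
The click-based throttling at the top of receive() (probe for messages only on every `_clicks_per_receive`-th call) can be sketched separately. This is an illustration under assumptions: `ClickThrottle` and `shouldProbe` are hypothetical names, and the counter is stored per instance here, whereas the listing keeps it in a function-local static that is shared by every ReceiveBuffer of the same template instantiation.

```cpp
#include <cassert>

// Hypothetical stand-in for the throttling logic at the top of receive().
// Unlike the listing, which keeps the counter in a function-local static,
// this sketch stores it per instance.
class ClickThrottle
{
public:
  explicit ClickThrottle(const unsigned int clicks_per_receive)
    : _clicks_per_receive(clicks_per_receive)
  {
    assert(clicks_per_receive > 0); // a zero period would divide by zero below
  }

  // Returns true on the calls where the message probe loop would run.
  bool shouldProbe()
  {
    const bool probe = (_current_clicks % _clicks_per_receive == 0);
    if (probe)
      _current_clicks = 0;
    ++_current_clicks;
    return probe;
  }

private:
  const unsigned int _clicks_per_receive;
  unsigned int _current_clicks = 0;
};
```

With a period of 3, the first call probes, the next two do not, and the fourth probes again. A period of 1 probes on every call.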

◆ receiveLoopTime()

template<typename Object , typename Context >
Real ReceiveBuffer< Object, Context >::receiveLoopTime ( ) const
inline

Amount of time (in seconds) spent in the loop checking for messages and creating Receives.

Definition at line 87 of file ReceiveBuffer.h.

{ return std::chrono::duration<Real>(_receive_loop_time).count(); }

Member Data Documentation

◆ _buffer

template<typename Object , typename Context >
MooseUtils::LIFOBuffer<std::shared_ptr<Object> > ReceiveBuffer< Object, Context >::_buffer
private

The buffer that finished requests are filled into.

Definition at line 111 of file ReceiveBuffer.h.

Referenced by ReceiveBuffer< Object, Context >::buffer().

◆ _buffer_pool

template<typename Object , typename Context >
MooseUtils::SharedPool< std::vector<typename Parallel::Packing<std::shared_ptr<Object> >::buffer_type> > ReceiveBuffer< Object, Context >::_buffer_pool
private

Shared pool of buffers.

Definition at line 143 of file ReceiveBuffer.h.

Referenced by ReceiveBuffer< Object, Context >::bufferPoolCreated().

◆ _buffers_received

template<typename Object , typename Context >
unsigned long int ReceiveBuffer< Object, Context >::_buffers_received
private

Total object buffers received.

Definition at line 134 of file ReceiveBuffer.h.

Referenced by ReceiveBuffer< Object, Context >::buffersReceived().

◆ _cleanup_requests_time

template<typename Object , typename Context >
std::chrono::steady_clock::duration ReceiveBuffer< Object, Context >::_cleanup_requests_time
private

Time cleaning up requests.

Definition at line 129 of file ReceiveBuffer.h.

Referenced by ReceiveBuffer< Object, Context >::cleanupRequestsTime().

◆ _clicks_per_receive

template<typename Object , typename Context >
const unsigned int ReceiveBuffer< Object, Context >::_clicks_per_receive
private

Iterations to wait before checking for new objects.

Definition at line 116 of file ReceiveBuffer.h.

◆ _context

template<typename Object , typename Context >
Context* const ReceiveBuffer< Object, Context >::_context
private

The context.

Definition at line 109 of file ReceiveBuffer.h.

◆ _method

template<typename Object , typename Context >
const ParallelStudyMethod ReceiveBuffer< Object, Context >::_method
private

The method.

Definition at line 114 of file ReceiveBuffer.h.

◆ _num_probes

template<typename Object , typename Context >
unsigned long int ReceiveBuffer< Object, Context >::_num_probes
private

Total number of times we've polled for messages.

Definition at line 136 of file ReceiveBuffer.h.

Referenced by ReceiveBuffer< Object, Context >::numProbes().

◆ _object_buffer_pool

template<typename Object , typename Context >
MooseUtils::SharedPool<std::vector<std::shared_ptr<Object> > > ReceiveBuffer< Object, Context >::_object_buffer_pool
private

Shared pool of object buffers for incoming messages.

Definition at line 139 of file ReceiveBuffer.h.

Referenced by ReceiveBuffer< Object, Context >::objectPoolCreated().

◆ _object_buffer_tag

template<typename Object , typename Context >
const Parallel::MessageTag ReceiveBuffer< Object, Context >::_object_buffer_tag
private

MessageTag for sending objects.

Definition at line 124 of file ReceiveBuffer.h.

◆ _objects_received

template<typename Object , typename Context >
unsigned long int ReceiveBuffer< Object, Context >::_objects_received
private

Total objects received.

Definition at line 132 of file ReceiveBuffer.h.

Referenced by ReceiveBuffer< Object, Context >::objectsReceived().

◆ _receive_loop_time

template<typename Object , typename Context >
std::chrono::steady_clock::duration ReceiveBuffer< Object, Context >::_receive_loop_time
private

Receive loop time.

Definition at line 127 of file ReceiveBuffer.h.

Referenced by ReceiveBuffer< Object, Context >::receiveLoopTime().

◆ _requests

template<typename Object , typename Context >
std::list<std::pair<std::shared_ptr<Parallel::Request>, std::shared_ptr<std::vector<std::shared_ptr<Object> > > > > ReceiveBuffer< Object, Context >::_requests
private

List of Requests and buffers for each request.

Definition at line 121 of file ReceiveBuffer.h.

Referenced by ReceiveBuffer< Object, Context >::currentlyReceiving().


The documentation for this class was generated from the following file:

ReceiveBuffer.h