https://mooseframework.inl.gov
SendBuffer.h
Go to the documentation of this file.
1 //* This file is part of the MOOSE framework
2 //* https://mooseframework.inl.gov
3 //*
4 //* All rights reserved, see COPYRIGHT for full restrictions
5 //* https://github.com/idaholab/moose/blob/master/COPYRIGHT
6 //*
7 //* Licensed under LGPL 2.1, please see LICENSE for details
8 //* https://www.gnu.org/licenses/lgpl-2.1.html
9 
10 #pragma once
11 
12 // Moose Includes
13 #include "Moose.h"
14 #include "SharedPool.h"
15 
16 // libMesh Includes
17 #include "libmesh/parallel_object.h"
18 
19 // Local includes
20 #include "ParallelStudyMethod.h"
21 
22 // System Includes
23 #include <list>
24 
25 template <typename Object, typename Context>
27 {
28 public:
30  const Context * const context,
31  const processor_id_type pid,
32  const ParallelStudyMethod & method,
33  const unsigned int min_buffer_size,
34  const unsigned int max_buffer_size,
35  const Real buffer_growth_multiplier,
36  const Real buffer_shrink_multiplier,
37  const Parallel::MessageTag object_buffer_tag);
38 
42  ~SendBuffer() override;
43 
47  unsigned long int objectsSent() const { return _objects_sent; }
51  unsigned long int buffersSent() const { return _buffers_sent; }
55  unsigned long int bufferPoolCreated() const { return _buffer_pool.num_created(); }
56 
60  bool currentlySending() const { return _requests.size(); }
64  bool currentlyBuffered() const { return _buffer.size(); }
65 
71  void moveObject(std::shared_ptr<Object> & object);
72 
76  void forceSend(const bool shrink_current_buffer_size = true);
77 
81  void waitAll();
82 
86  void clear();
87 
91  void cleanupRequests();
92 
93 private:
95  const Context * const _context;
98 
100 
102  const unsigned int _min_buffer_size;
104  const unsigned int _max_buffer_size;
105 
110 
112  const Parallel::MessageTag _object_buffer_tag;
113 
115  unsigned int _current_buffer_size;
118 
120  std::vector<std::shared_ptr<Object>> _buffer;
122  std::size_t _buffer_size_bytes;
123 
125  std::list<std::shared_ptr<Parallel::Request>> _requests;
126 
129  std::vector<typename Parallel::Packing<std::shared_ptr<Object>>::buffer_type>>
131 
133  unsigned long int _objects_sent;
135  unsigned long int _buffers_sent;
136 };
137 
138 template <typename Object, typename Context>
140  const Context * const context,
141  const processor_id_type pid,
142  const ParallelStudyMethod & method,
143  const unsigned int min_buffer_size,
144  const unsigned int max_buffer_size,
145  const Real buffer_growth_multiplier,
146  const Real buffer_shrink_multiplier,
147  const Parallel::MessageTag object_buffer_tag)
148  : ParallelObject(comm),
149  _context(context),
150  _pid(pid),
151  _method(method),
152  _min_buffer_size(min_buffer_size),
153  _max_buffer_size(max_buffer_size),
154  _buffer_growth_multiplier(buffer_growth_multiplier),
155  _buffer_shrink_multiplier(buffer_shrink_multiplier),
156  _object_buffer_tag(object_buffer_tag),
157  _current_buffer_size(_min_buffer_size),
158  _current_buffer_size_real(_min_buffer_size),
159  _buffer_size_bytes(0),
160  _objects_sent(0),
161  _buffers_sent(0)
162 {
163  _buffer.reserve(max_buffer_size);
164 }
165 
166 template <typename Object, typename Context>
168 {
169  waitAll();
170  cleanupRequests();
171 }
172 
173 template <typename Object, typename Context>
174 void
175 SendBuffer<Object, Context>::moveObject(std::shared_ptr<Object> & object)
176 {
177  _buffer_size_bytes +=
178  Parallel::Packing<std::shared_ptr<Object>>::packable_size(object, _context) *
179  sizeof(typename Parallel::Packing<std::shared_ptr<Object>>::buffer_type);
180  _buffer.emplace_back(std::move(object));
181 
182  // Force a send with SMART if we find it appropriate
183  if ((_method == ParallelStudyMethod::SMART &&
184  (_buffer.size() >= _current_buffer_size || _buffer.size() == _max_buffer_size)) ||
185  (_buffer_size_bytes > 1048576)) // 1 MB
186  {
187  _current_buffer_size_real =
188  std::min(_buffer_growth_multiplier * _current_buffer_size_real, (Real)_max_buffer_size);
189 
190  if (_current_buffer_size != (unsigned int)_current_buffer_size_real)
191  _current_buffer_size = _current_buffer_size_real;
192 
193  forceSend(false);
194  }
195 }
196 
197 template <typename Object, typename Context>
198 void
199 SendBuffer<Object, Context>::forceSend(const bool shrink_current_buffer_size)
200 {
201  if (!_buffer.empty())
202  {
203  auto req = std::make_shared<Parallel::Request>();
204 
205  _requests.push_back(req);
206 
207  _buffer_size_bytes = 0;
208  _objects_sent += _buffer.size();
209  _buffers_sent++;
210 
211  std::shared_ptr<std::vector<typename Parallel::Packing<std::shared_ptr<Object>>::buffer_type>>
212  buffer = _buffer_pool.acquire();
213 
214  comm().nonblocking_send_packed_range(
215  _pid, _context, _buffer.begin(), _buffer.end(), *req, buffer, _object_buffer_tag);
216 
217  _buffer.clear();
218  _buffer.reserve(_max_buffer_size);
219 
220  if (_method == ParallelStudyMethod::SMART && shrink_current_buffer_size)
221  {
222  _current_buffer_size_real =
223  std::max((Real)_min_buffer_size, _current_buffer_size_real * _buffer_shrink_multiplier);
224 
225  if (_current_buffer_size != (unsigned int)_current_buffer_size_real)
226  _current_buffer_size = _current_buffer_size_real;
227  }
228  }
229 
230  cleanupRequests();
231 }
232 
233 template <typename Object, typename Context>
234 void
236 {
237  _buffer.clear();
238  _requests.clear();
239 
240  _objects_sent = 0;
241  _buffers_sent = 0;
242 }
243 
244 template <typename Object, typename Context>
245 void
247 {
248  _requests.remove_if(
249  [](std::shared_ptr<Parallel::Request> & req)
250  {
251  if (req->test())
252  {
253  req->wait(); // MUST call wait() to do post_wait_work
254  return true;
255  }
256  else
257  return false;
258  });
259 }
260 
261 template <typename Object, typename Context>
262 void
264 {
265  std::for_each(_requests.begin(), _requests.end(), [](auto & request) { request->wait(); });
266 }
unsigned long int buffersSent() const
Get the number of buffers sent from this buffer.
Definition: SendBuffer.h:51
size_t num_created() const
const Parallel::MessageTag _object_buffer_tag
MessageTag for sending objects.
Definition: SendBuffer.h:112
MPI_Request request
unsigned long int _objects_sent
Counter for objects sent.
Definition: SendBuffer.h:133
SendBuffer(const libMesh::Parallel::Communicator &comm, const Context *const context, const processor_id_type pid, const ParallelStudyMethod &method, const unsigned int min_buffer_size, const unsigned int max_buffer_size, const Real buffer_growth_multiplier, const Real buffer_shrink_multiplier, const Parallel::MessageTag object_buffer_tag)
Definition: SendBuffer.h:139
const Context *const _context
The context.
Definition: SendBuffer.h:95
unsigned long int bufferPoolCreated() const
Get the number of buffers created in the buffer pool.
Definition: SendBuffer.h:55
unsigned int _current_buffer_size
Current size of the buffer (in objects)
Definition: SendBuffer.h:115
const unsigned int _max_buffer_size
Maximum size of the buffer (in objects)
Definition: SendBuffer.h:104
bool currentlySending() const
Whether or not messages are currently being sent.
Definition: SendBuffer.h:60
const Parallel::Communicator & comm() const
const Real _buffer_shrink_multiplier
Multiplier for the buffer size for shrinking the buffer.
Definition: SendBuffer.h:109
std::list< std::shared_ptr< Parallel::Request > > _requests
List of Requests.
Definition: SendBuffer.h:125
Real _current_buffer_size_real
Running buffer size.
Definition: SendBuffer.h:117
uint8_t processor_id_type
void forceSend(const bool shrink_current_buffer_size=true)
Forces a Send for all currently buffered objects.
Definition: SendBuffer.h:199
void waitAll()
Wait for all requests to finish.
Definition: SendBuffer.h:263
Context
void moveObject(std::shared_ptr< Object > &object)
Move an object to the buffer.
Definition: SendBuffer.h:175
bool currentlyBuffered() const
Whether or not objects are currently waiting to be sent.
Definition: SendBuffer.h:64
const Real _buffer_growth_multiplier
Multiplier for the buffer size for growing the buffer.
Definition: SendBuffer.h:107
const unsigned int _min_buffer_size
Minimum size of the buffer (in objects)
Definition: SendBuffer.h:102
const ParallelStudyMethod & _method
Definition: SendBuffer.h:99
ParallelStudyMethod
DIE A HORRIBLE DEATH HERE typedef LIBMESH_DEFAULT_SCALAR_TYPE Real
std::size_t _buffer_size_bytes
The size of the objects in the buffer in bytes.
Definition: SendBuffer.h:122
std::vector< std::shared_ptr< Object > > _buffer
The buffer.
Definition: SendBuffer.h:120
void cleanupRequests()
Checks to see if any Requests can be finished.
Definition: SendBuffer.h:246
unsigned long int objectsSent() const
Get the number of objects sent from this buffer.
Definition: SendBuffer.h:47
const processor_id_type _pid
The processor ID this buffer will send to.
Definition: SendBuffer.h:97
unsigned long int _buffers_sent
Counter for buffers sent.
Definition: SendBuffer.h:135
void clear()
Clear all existing data.
Definition: SendBuffer.h:235
~SendBuffer() override
Destructor: ensures that all send requests have completed.
Definition: SendBuffer.h:167
MooseUtils::SharedPool< std::vector< typename Parallel::Packing< std::shared_ptr< Object > >::buffer_type > > _buffer_pool
Shared pool of buffers.
Definition: SendBuffer.h:130