17 #include "libmesh/parallel_object.h" 25 template <
typename Object,
typename Context>
33 const unsigned int min_buffer_size,
34 const unsigned int max_buffer_size,
35 const Real buffer_growth_multiplier,
36 const Real buffer_shrink_multiplier,
37 const Parallel::MessageTag object_buffer_tag);
/**
 * Move an object to the buffer.
 *
 * @param object Shared pointer to the object, taken by non-const reference.
 *        NOTE(review): presumably left in a moved-from state on return; the
 *        matching definition's signature is not visible in this listing -- confirm.
 */
71 void moveObject(std::shared_ptr<Object> &
object);
/**
 * Forces a send of all currently buffered objects.
 *
 * @param shrink_current_buffer_size Defaults to true.
 *        NOTE(review): presumably selects whether the running buffer size is
 *        reduced after the send; the branch that reads this flag is not visible
 *        in this listing -- confirm against the definition.
 */
76 void forceSend(
const bool shrink_current_buffer_size =
true);
/// List of Requests, each held through a std::shared_ptr. Code elsewhere in
/// this file iterates the list and calls wait() on every entry.
125 std::list<std::shared_ptr<Parallel::Request>>
_requests;
129 std::vector<typename Parallel::Packing<std::shared_ptr<Object>>::buffer_type>>
138 template <
typename Object,
typename Context>
143 const unsigned int min_buffer_size,
144 const unsigned int max_buffer_size,
145 const Real buffer_growth_multiplier,
146 const Real buffer_shrink_multiplier,
147 const Parallel::MessageTag object_buffer_tag)
148 : ParallelObject(comm),
152 _min_buffer_size(min_buffer_size),
153 _max_buffer_size(max_buffer_size),
154 _buffer_growth_multiplier(buffer_growth_multiplier),
155 _buffer_shrink_multiplier(buffer_shrink_multiplier),
156 _object_buffer_tag(object_buffer_tag),
157 _current_buffer_size(_min_buffer_size),
158 _current_buffer_size_real(_min_buffer_size),
159 _buffer_size_bytes(0),
163 _buffer.reserve(max_buffer_size);
166 template <
typename Object,
typename Context>
173 template <
typename Object,
typename Context>
177 _buffer_size_bytes +=
178 Parallel::Packing<std::shared_ptr<Object>>::packable_size(
object, _context) *
179 sizeof(
typename Parallel::Packing<std::shared_ptr<Object>>::buffer_type);
180 _buffer.emplace_back(std::move(
object));
184 (_buffer.size() >= _current_buffer_size || _buffer.size() == _max_buffer_size)) ||
185 (_buffer_size_bytes > 1048576))
187 _current_buffer_size_real =
188 std::min(_buffer_growth_multiplier * _current_buffer_size_real, (
Real)_max_buffer_size);
190 if (_current_buffer_size != (
unsigned int)_current_buffer_size_real)
191 _current_buffer_size = _current_buffer_size_real;
197 template <
typename Object,
typename Context>
201 if (!_buffer.empty())
203 auto req = std::make_shared<Parallel::Request>();
205 _requests.push_back(req);
207 _buffer_size_bytes = 0;
208 _objects_sent += _buffer.size();
211 std::shared_ptr<std::vector<typename Parallel::Packing<std::shared_ptr<Object>>::buffer_type>>
212 buffer = _buffer_pool.acquire();
214 comm().nonblocking_send_packed_range(
215 _pid, _context, _buffer.begin(), _buffer.end(), *req, buffer, _object_buffer_tag);
218 _buffer.reserve(_max_buffer_size);
222 _current_buffer_size_real =
223 std::max((
Real)_min_buffer_size, _current_buffer_size_real * _buffer_shrink_multiplier);
225 if (_current_buffer_size != (
unsigned int)_current_buffer_size_real)
226 _current_buffer_size = _current_buffer_size_real;
233 template <
typename Object,
typename Context>
244 template <
typename Object,
typename Context>
249 [](std::shared_ptr<Parallel::Request> & req)
261 template <
typename Object,
typename Context>
265 std::for_each(_requests.begin(), _requests.end(), [](
auto &
request) {
request->wait(); });
unsigned long int buffersSent() const
Get the number of buffers sent from this buffer.
size_t num_created() const
const Parallel::MessageTag _object_buffer_tag
MessageTag for sending objects.
unsigned long int _objects_sent
Counter for objects sent.
SendBuffer(const libMesh::Parallel::Communicator &comm, const Context *const context, const processor_id_type pid, const ParallelStudyMethod &method, const unsigned int min_buffer_size, const unsigned int max_buffer_size, const Real buffer_growth_multiplier, const Real buffer_shrink_multiplier, const Parallel::MessageTag object_buffer_tag)
const Context *const _context
The context.
unsigned long int bufferPoolCreated() const
Get the number of buffers created in the buffer pool.
unsigned int _current_buffer_size
Current size of the buffer (in objects)
const unsigned int _max_buffer_size
Maximum size of the buffer (in objects)
bool currentlySending() const
Whether or not messages are currently being sent.
const Parallel::Communicator & comm() const
const Real _buffer_shrink_multiplier
Multiplier for the buffer size for shrinking the buffer.
std::list< std::shared_ptr< Parallel::Request > > _requests
List of Requests.
Real _current_buffer_size_real
Running buffer size.
uint8_t processor_id_type
void forceSend(const bool shrink_current_buffer_size=true)
Forces a Send for all currently buffered objects.
void waitAll()
Wait for all requests to finish.
void moveObject(std::shared_ptr< Object > &object)
Move an object to the buffer.
bool currentlyBuffered() const
Whether or not objects are currently waiting to be sent.
const Real _buffer_growth_multiplier
Multiplier for the buffer size for growing the buffer.
const unsigned int _min_buffer_size
Minimum size of the buffer (in objects)
const ParallelStudyMethod & _method
typedef LIBMESH_DEFAULT_SCALAR_TYPE Real
std::size_t _buffer_size_bytes
The size of the objects in the buffer in bytes.
std::vector< std::shared_ptr< Object > > _buffer
The buffer.
void cleanupRequests()
Checks to see if any Requests can be finished.
unsigned long int objectsSent() const
Get the number of objects sent from this buffer.
const processor_id_type _pid
The processor ID this buffer will send to.
unsigned long int _buffers_sent
Counter for buffers sent.
void clear()
Clear all existing data.
~SendBuffer() override
Destructor: ensures that all send requests have completed.
MooseUtils::SharedPool< std::vector< typename Parallel::Packing< std::shared_ptr< Object > >::buffer_type > > _buffer_pool
Shared pool of buffers.