Line data Source code
1 : //* This file is part of the MOOSE framework
2 : //* https://mooseframework.inl.gov
3 : //*
4 : //* All rights reserved, see COPYRIGHT for full restrictions
5 : //* https://github.com/idaholab/moose/blob/master/COPYRIGHT
6 : //*
7 : //* Licensed under LGPL 2.1, please see LICENSE for details
8 : //* https://www.gnu.org/licenses/lgpl-2.1.html
9 :
10 : #pragma once
11 :
12 : // Moose Includes
13 : #include "Moose.h"
14 : #include "SharedPool.h"
15 :
16 : // libMesh Includes
17 : #include "libmesh/parallel_object.h"
18 :
19 : // Local includes
20 : #include "ParallelStudyMethod.h"
21 :
22 : // System Includes
23 : #include <list>
24 :
25 : template <typename Object, typename Context>
26 : class SendBuffer : public libMesh::ParallelObject
27 : {
28 : public:
29 : SendBuffer(const libMesh::Parallel::Communicator & comm,
30 : const Context * const context,
31 : const processor_id_type pid,
32 : const ParallelStudyMethod & method,
33 : const unsigned int min_buffer_size,
34 : const unsigned int max_buffer_size,
35 : const Real buffer_growth_multiplier,
36 : const Real buffer_shrink_multiplier,
37 : const Parallel::MessageTag object_buffer_tag);
38 :
39 : /**
40 : * Destructor: ensures that all send requests have completed
41 : */
42 : ~SendBuffer() override;
43 :
44 : /**
45 : * Get the number of objects sent from this buffer
46 : */
47 10 : unsigned long int objectsSent() const { return _objects_sent; }
48 : /**
49 : * Get the number of buffers sent from this buffer
50 : */
51 10 : unsigned long int buffersSent() const { return _buffers_sent; }
52 : /**
53 : * Get the number of buffers created in the buffer pool
54 : */
55 : unsigned long int bufferPoolCreated() const { return _buffer_pool.num_created(); }
56 :
57 : /**
58 : * Whether or not messages are currently being sent
59 : */
60 : bool currentlySending() const { return _requests.size(); }
61 : /**
62 : * Whether or not objects are currently waiting to be sent
63 : */
64 : bool currentlyBuffered() const { return _buffer.size(); }
65 :
66 : /**
67 : * Move an object to the buffer. May cause the buffer to be communicated.
68 : *
69 : * This DOES call std::move on the object
70 : */
71 : void moveObject(std::shared_ptr<Object> & object);
72 :
73 : /**
74 : * Forces a Send for all currently buffered objects
75 : */
76 : void forceSend(const bool shrink_current_buffer_size = true);
77 :
78 : /**
79 : * Wait for all requests to finish
80 : */
81 : void waitAll();
82 :
83 : /**
84 : * Clear all existing data
85 : */
86 : void clear();
87 :
88 : /**
89 : * Checks to see if any Requests can be finished
90 : */
91 : void cleanupRequests();
92 :
93 : private:
94 : /// The context
95 : const Context * const _context;
96 : /// The processor ID this buffer will send to
97 : const processor_id_type _pid;
98 :
99 : const ParallelStudyMethod & _method;
100 :
101 : /// Minimum size of the buffer (in objects)
102 : const unsigned int _min_buffer_size;
103 : /// Maximum size of the buffer (in objects)
104 : const unsigned int _max_buffer_size;
105 :
106 : /// Multiplier for the buffer size for growing the buffer
107 : const Real _buffer_growth_multiplier;
108 : /// Multiplier for the buffer size for shrinking the buffer
109 : const Real _buffer_shrink_multiplier;
110 :
111 : /// MessageTag for sending objects
112 : const Parallel::MessageTag _object_buffer_tag;
113 :
114 : /// Current size of the buffer (in objects)
115 : unsigned int _current_buffer_size;
116 : /// Running buffer size
117 : Real _current_buffer_size_real;
118 :
119 : /// The buffer
120 : std::vector<std::shared_ptr<Object>> _buffer;
121 : /// The size of the objects in the buffer in bytes
122 : std::size_t _buffer_size_bytes;
123 :
124 : /// List of Requests
125 : std::list<std::shared_ptr<Parallel::Request>> _requests;
126 :
127 : /// Shared pool of buffers
128 : MooseUtils::SharedPool<
129 : std::vector<typename Parallel::Packing<std::shared_ptr<Object>>::buffer_type>>
130 : _buffer_pool;
131 :
132 : /// Counter for objects sent
133 : unsigned long int _objects_sent;
134 : /// Counter for buffers sent
135 : unsigned long int _buffers_sent;
136 : };
137 :
138 : template <typename Object, typename Context>
139 3653 : SendBuffer<Object, Context>::SendBuffer(const libMesh::Parallel::Communicator & comm,
140 : const Context * const context,
141 : const processor_id_type pid,
142 : const ParallelStudyMethod & method,
143 : const unsigned int min_buffer_size,
144 : const unsigned int max_buffer_size,
145 : const Real buffer_growth_multiplier,
146 : const Real buffer_shrink_multiplier,
147 : const Parallel::MessageTag object_buffer_tag)
148 : : ParallelObject(comm),
149 3653 : _context(context),
150 3653 : _pid(pid),
151 3653 : _method(method),
152 3653 : _min_buffer_size(min_buffer_size),
153 3653 : _max_buffer_size(max_buffer_size),
154 3653 : _buffer_growth_multiplier(buffer_growth_multiplier),
155 3653 : _buffer_shrink_multiplier(buffer_shrink_multiplier),
156 3653 : _object_buffer_tag(object_buffer_tag),
157 3653 : _current_buffer_size(_min_buffer_size),
158 3653 : _current_buffer_size_real(_min_buffer_size),
159 3653 : _buffer_size_bytes(0),
160 3653 : _objects_sent(0),
161 3653 : _buffers_sent(0)
162 : {
163 3653 : _buffer.reserve(max_buffer_size);
164 3653 : }
165 :
166 : template <typename Object, typename Context>
167 7306 : SendBuffer<Object, Context>::~SendBuffer()
168 : {
169 3653 : waitAll();
170 : cleanupRequests();
171 14612 : }
172 :
173 : template <typename Object, typename Context>
174 : void
175 1259326 : SendBuffer<Object, Context>::moveObject(std::shared_ptr<Object> & object)
176 : {
177 1259326 : _buffer_size_bytes +=
178 1259326 : Parallel::Packing<std::shared_ptr<Object>>::packable_size(object, _context) *
179 : sizeof(typename Parallel::Packing<std::shared_ptr<Object>>::buffer_type);
180 1259326 : _buffer.emplace_back(std::move(object));
181 :
182 : // Force a send with SMART if we find it appropriate
183 2517172 : if ((_method == ParallelStudyMethod::SMART &&
184 1259326 : (_buffer.size() >= _current_buffer_size || _buffer.size() == _max_buffer_size)) ||
185 1247695 : (_buffer_size_bytes > 1048576)) // 1 MB
186 : {
187 11631 : _current_buffer_size_real =
188 11631 : std::min(_buffer_growth_multiplier * _current_buffer_size_real, (Real)_max_buffer_size);
189 :
190 11631 : if (_current_buffer_size != (unsigned int)_current_buffer_size_real)
191 0 : _current_buffer_size = _current_buffer_size_real;
192 :
193 11631 : forceSend(false);
194 : }
195 1259326 : }
196 :
197 : template <typename Object, typename Context>
198 : void
199 350148 : SendBuffer<Object, Context>::forceSend(const bool shrink_current_buffer_size)
200 : {
201 350148 : if (!_buffer.empty())
202 : {
203 : auto req = std::make_shared<Parallel::Request>();
204 :
205 16959 : _requests.push_back(req);
206 :
207 16959 : _buffer_size_bytes = 0;
208 16959 : _objects_sent += _buffer.size();
209 16959 : _buffers_sent++;
210 :
211 : std::shared_ptr<std::vector<typename Parallel::Packing<std::shared_ptr<Object>>::buffer_type>>
212 33918 : buffer = _buffer_pool.acquire();
213 :
214 16959 : comm().nonblocking_send_packed_range(
215 16959 : _pid, _context, _buffer.begin(), _buffer.end(), *req, buffer, _object_buffer_tag);
216 :
217 16959 : _buffer.clear();
218 16959 : _buffer.reserve(_max_buffer_size);
219 :
220 16959 : if (_method == ParallelStudyMethod::SMART && shrink_current_buffer_size)
221 : {
222 5308 : _current_buffer_size_real =
223 5308 : std::max((Real)_min_buffer_size, _current_buffer_size_real * _buffer_shrink_multiplier);
224 :
225 5308 : if (_current_buffer_size != (unsigned int)_current_buffer_size_real)
226 0 : _current_buffer_size = _current_buffer_size_real;
227 : }
228 : }
229 :
230 : cleanupRequests();
231 350148 : }
232 :
233 : template <typename Object, typename Context>
234 : void
235 1898 : SendBuffer<Object, Context>::clear()
236 : {
237 1898 : _buffer.clear();
238 : _requests.clear();
239 :
240 1898 : _objects_sent = 0;
241 1898 : _buffers_sent = 0;
242 1898 : }
243 :
244 : template <typename Object, typename Context>
245 : void
246 : SendBuffer<Object, Context>::cleanupRequests()
247 : {
248 353801 : _requests.remove_if(
249 16959 : [](std::shared_ptr<Parallel::Request> & req)
250 : {
251 16959 : if (req->test())
252 : {
253 16959 : req->wait(); // MUST call wait() to do post_wait_work
254 16959 : return true;
255 : }
256 : else
257 : return false;
258 : });
259 : }
260 :
261 : template <typename Object, typename Context>
262 : void
263 3653 : SendBuffer<Object, Context>::waitAll()
264 : {
265 0 : std::for_each(_requests.begin(), _requests.end(), [](auto & request) { request->wait(); });
266 3653 : }
|