Line data Source code
1 : //* This file is part of the MOOSE framework
2 : //* https://mooseframework.inl.gov
3 : //*
4 : //* All rights reserved, see COPYRIGHT for full restrictions
5 : //* https://github.com/idaholab/moose/blob/master/COPYRIGHT
6 : //*
7 : //* Licensed under LGPL 2.1, please see LICENSE for details
8 : //* https://www.gnu.org/licenses/lgpl-2.1.html
9 :
10 : #pragma once
11 :
12 : // Moose Includes
13 : #include "LIFOBuffer.h"
14 : #include "SharedPool.h"
15 :
16 : // Local includes
17 : #include "ParallelStudyMethod.h"
18 :
19 : // libMesh Includes
20 : #include "libmesh/parallel.h"
21 : #include "libmesh/parallel_object.h"
22 :
23 : // System Includes
24 : #include <list>
25 : #include <chrono>
26 :
27 : template <typename Object, typename Context>
28 : class ReceiveBuffer : public libMesh::ParallelObject
29 : {
30 : public:
31 : ReceiveBuffer(const libMesh::Parallel::Communicator & comm,
32 : Context * const context,
33 : const ParallelStudyMethod method,
34 : const unsigned int clicks_per_receive,
35 : const Parallel::MessageTag object_buffer_tag);
36 :
37 : /**
38 : * Destructor: ensures that all send requests have completed
39 : */
40 : ~ReceiveBuffer() override;
41 :
42 : /**
43 : * Whether or not there are messages that are being currently received
44 : */
45 75 : bool currentlyReceiving() const { return _requests.size(); }
46 :
47 : /**
48 : * The work received since the last reset
49 : */
50 14 : unsigned long int objectsReceived() const { return _objects_received; }
51 :
52 : /**
53 : * The number of buffers received since the last reset
54 : */
55 14 : unsigned long int buffersReceived() const { return _buffers_received; }
56 :
57 : /**
58 : * The total number of times we've polled for messages
59 : */
60 14 : unsigned long int numProbes() const { return _num_probes; }
61 :
62 : /**
63 : * Number of buffers created in the object buffer pool
64 : */
65 : unsigned long int objectPoolCreated() const { return _object_buffer_pool.num_created(); }
66 :
67 : /**
68 : * Number of buffers created in the buffer pool
69 : */
70 : unsigned long int bufferPoolCreated() const { return _buffer_pool.num_created(); }
71 :
72 : /**
73 : * Start receives for all currently available messages
74 : *
75 : * Adds the to the working buffer
76 : */
77 : void receive(const bool start_receives_only = false);
78 :
79 : /**
80 : * Clear all existing data
81 : */
82 : void clear();
83 :
84 : /**
85 : * Amount of time (in seconds) spent in the loop checking for messages and creating Receives
86 : */
87 : Real receiveLoopTime() const { return std::chrono::duration<Real>(_receive_loop_time).count(); }
88 :
89 : /**
90 : * Amount of time (in seconds) spent finishing receives and collecting objects
91 : */
92 : Real cleanupRequestsTime() const
93 : {
94 : return std::chrono::duration<Real>(_cleanup_requests_time).count();
95 : }
96 :
97 : /**
98 : * Checks to see if any Requests can be finished
99 : */
100 : void cleanupRequests();
101 :
102 : /**
103 : * Gets the buffer that the received objects are filled into after the requests are finished.
104 : */
105 : MooseUtils::LIFOBuffer<std::shared_ptr<Object>> & buffer() { return _buffer; }
106 :
107 : private:
108 : /// The context
109 : Context * const _context;
110 : /// The buffer that finished requests are filled into
111 : MooseUtils::LIFOBuffer<std::shared_ptr<Object>> _buffer;
112 :
113 : /// The method
114 : const ParallelStudyMethod _method;
115 : /// Iterations to wait before checking for new objects
116 : const unsigned int _clicks_per_receive;
117 :
118 : /// List of Requests and buffers for each request
119 : std::list<std::pair<std::shared_ptr<Parallel::Request>,
120 : std::shared_ptr<std::vector<std::shared_ptr<Object>>>>>
121 : _requests;
122 :
123 : /// MessageTag for sending objects
124 : const Parallel::MessageTag _object_buffer_tag;
125 :
126 : /// Receive loop time
127 : std::chrono::steady_clock::duration _receive_loop_time;
128 : /// Time cleaning up requests
129 : std::chrono::steady_clock::duration _cleanup_requests_time;
130 :
131 : /// Total objects received
132 : unsigned long int _objects_received;
133 : /// Total object buffers received
134 : unsigned long int _buffers_received;
135 : /// Total number of times we've polled for messages
136 : unsigned long int _num_probes;
137 :
138 : /// Shared pool of object buffers for incoming messages
139 : MooseUtils::SharedPool<std::vector<std::shared_ptr<Object>>> _object_buffer_pool;
140 : /// Shared pool of buffers
141 : MooseUtils::SharedPool<
142 : std::vector<typename Parallel::Packing<std::shared_ptr<Object>>::buffer_type>>
143 : _buffer_pool;
144 :
145 : template <typename C, typename OutputIter, typename T>
146 20 : inline void blocking_receive_packed_range(const Parallel::Communicator & comm,
147 : const processor_id_type src_processor_id,
148 : C * context,
149 : OutputIter out,
150 : const T * /* output_type */,
151 : Parallel::Request & req,
152 : Parallel::Status & stat,
153 : const Parallel::MessageTag & tag) const
154 : {
155 : libmesh_experimental();
156 :
157 : typedef typename Parallel::Packing<T>::buffer_type buffer_t;
158 :
159 : // Receive serialized variable size objects as a sequence of
160 : // buffer_t.
161 : // Allocate a buffer on the heap so we don't have to free it until
162 : // after the Request::wait()
163 20 : std::vector<buffer_t> * buffer = new std::vector<buffer_t>(stat.size());
164 20 : comm.receive(src_processor_id, *buffer, tag);
165 :
166 : // Make the Request::wait() handle unpacking the buffer
167 20 : req.add_post_wait_work(
168 20 : new libMesh::Parallel::PostWaitUnpackBuffer<std::vector<buffer_t>, C, OutputIter, T>(
169 : *buffer, context, out));
170 :
171 : // Make the Request::wait() handle deleting the buffer
172 20 : req.add_post_wait_work(
173 20 : new libMesh::Parallel::PostWaitDeleteBuffer<std::vector<buffer_t>>(buffer));
174 20 : }
175 : };
176 :
177 : template <typename Object, typename Context>
178 3827 : ReceiveBuffer<Object, Context>::ReceiveBuffer(const libMesh::Parallel::Communicator & comm,
179 : Context * const context,
180 : const ParallelStudyMethod method,
181 : const unsigned int clicks_per_receive,
182 : const Parallel::MessageTag object_buffer_tag)
183 : : ParallelObject(comm),
184 3827 : _context(context),
185 3827 : _method(method),
186 3827 : _clicks_per_receive(clicks_per_receive),
187 3827 : _object_buffer_tag(object_buffer_tag)
188 : {
189 3827 : }
190 :
191 : template <typename Object, typename Context>
192 7134 : ReceiveBuffer<Object, Context>::~ReceiveBuffer()
193 : {
194 3567 : if (!_requests.empty())
195 0 : mooseError("Some requests not serviced!");
196 7134 : }
197 :
198 : template <typename Object, typename Context>
199 : void
200 4465854 : ReceiveBuffer<Object, Context>::receive(const bool start_receives_only /* = false */)
201 : {
202 4465854 : bool flag = false;
203 : Parallel::Status stat;
204 :
205 : static unsigned int current_clicks = 0;
206 :
207 4465854 : if (current_clicks % _clicks_per_receive == 0)
208 : {
209 4465854 : current_clicks = 0;
210 :
211 : // Receive and process a bunch of objects
212 4482813 : do
213 : {
214 4482813 : stat = _communicator.template packed_range_probe<std::shared_ptr<Object>>(
215 4482813 : Parallel::any_source, _object_buffer_tag, flag);
216 :
217 4482813 : _num_probes++;
218 :
219 4482813 : if (flag)
220 : {
221 : auto req = std::make_shared<Parallel::Request>();
222 33918 : std::shared_ptr<std::vector<std::shared_ptr<Object>>> objects =
223 : _object_buffer_pool.acquire();
224 :
225 : // Make sure the buffer is clear - this shouldn't resize the storage though.
226 16959 : objects->clear();
227 :
228 16959 : if (_method == ParallelStudyMethod::HARM || _method == ParallelStudyMethod::BS)
229 20 : blocking_receive_packed_range(comm(),
230 : stat.source(),
231 20 : _context,
232 : std::back_inserter(*objects),
233 : (std::shared_ptr<Object> *)(libmesh_nullptr),
234 : *req,
235 : stat,
236 : _object_buffer_tag);
237 : else
238 : {
239 : std::shared_ptr<
240 : std::vector<typename Parallel::Packing<std::shared_ptr<Object>>::buffer_type>>
241 33878 : buffer = _buffer_pool.acquire();
242 :
243 16939 : _communicator.nonblocking_receive_packed_range(
244 : stat.source(),
245 16939 : _context,
246 : std::back_inserter(*objects),
247 : (std::shared_ptr<Object> *)(libmesh_nullptr),
248 : *req,
249 : stat,
250 : buffer,
251 : _object_buffer_tag);
252 : }
253 :
254 16959 : _requests.emplace_back(req, objects);
255 : }
256 : } while (flag);
257 : }
258 :
259 4465854 : current_clicks++;
260 :
261 4465854 : if (!start_receives_only)
262 : cleanupRequests();
263 4465854 : }
264 :
265 : template <typename Object, typename Context>
266 : void
267 8023 : ReceiveBuffer<Object, Context>::clear()
268 : {
269 : _requests.clear();
270 :
271 8023 : _receive_loop_time = std::chrono::steady_clock::duration::zero();
272 8023 : _cleanup_requests_time = std::chrono::steady_clock::duration::zero();
273 :
274 8023 : _objects_received = 0;
275 8023 : _buffers_received = 0;
276 8023 : _num_probes = 0;
277 8023 : }
278 :
279 : template <typename Object, typename Context>
280 : void
281 : ReceiveBuffer<Object, Context>::cleanupRequests()
282 : {
283 4417935 : _requests.remove_if(
284 16959 : [&](std::pair<std::shared_ptr<Parallel::Request>,
285 : std::shared_ptr<std::vector<std::shared_ptr<Object>>>> & request_pair)
286 : {
287 : auto req = request_pair.first;
288 : auto objects = request_pair.second;
289 :
290 16959 : if (req->test()) // See if the receive has completed
291 : {
292 16959 : req->wait(); // MUST call wait() to do post_wait_work which actually fills the object
293 : // buffer
294 :
295 16959 : _buffers_received++;
296 16959 : _objects_received += objects->size();
297 :
298 16959 : if (_buffer.capacity() < _buffer.size() + objects->size())
299 : _buffer.setCapacity(_buffer.size() + objects->size());
300 :
301 1276285 : for (auto & object : *objects)
302 1259326 : _buffer.move(object);
303 :
304 16959 : objects->clear();
305 :
306 16959 : return true;
307 : }
308 : else
309 : return false;
310 : });
311 4417935 : }
|