TIMPI
request.C
Go to the documentation of this file.
1 // The TIMPI Message-Passing Parallelism Library.
2 // Copyright (C) 2002-2025 Benjamin S. Kirk, John W. Peterson, Roy H. Stogner
3 
4 // This library is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU Lesser General Public
6 // License as published by the Free Software Foundation; either
7 // version 2.1 of the License, or (at your option) any later version.
8 
9 // This library is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 // Lesser General Public License for more details.
13 
14 // You should have received a copy of the GNU Lesser General Public
15 // License along with this library; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 
18 // Local includes
19 #include "timpi/request.h"
20 
21 // TIMPI includes
22 #include "timpi/timpi_call_mpi.h"
23 #include "timpi/timpi_assert.h"
24 #include "timpi/post_wait_work.h"
25 #include "timpi/status.h"
26 
27 // Disable libMesh logging until we decide how to port it best
28 // #include "libmesh/libmesh_logging.h"
29 #define TIMPI_LOG_SCOPE(f,c)
30 
31 // C++ includes
32 #include <memory>
33 #include <vector>
34 #include <utility>
35 
36 
37 namespace TIMPI
38 {
39 
40 #ifdef TIMPI_HAVE_MPI
41 const request Request::null_request = MPI_REQUEST_NULL;
42 #else
44 #endif
45 
46 // ------------------------------------------------------------
47 // Request member functions
49  _request(null_request),
50  post_wait_work(nullptr)
51 {}
52 
54  _request(r),
55  post_wait_work(nullptr)
56 {}
57 
58 Request::Request (const Request & other) :
59  _request(other._request),
60  post_wait_work(other.post_wait_work)
61 {
62  if (other._prior_request.get())
63  _prior_request = std::unique_ptr<Request>
64  (new Request(*other._prior_request.get()));
65 
66  // operator= should behave like a shared pointer
67  if (post_wait_work)
68  post_wait_work->second++;
69 }
70 
72 {
73  if (post_wait_work)
74  {
75  // Decrement the use count
76  post_wait_work->second--;
77 
78  if (!post_wait_work->second)
79  {
80 #ifdef DEBUG
81  // If we're done using this request, then we'd better have
82  // done the work we waited for
83  for (const auto & item : post_wait_work->first)
84  timpi_assert(!item);
85 #endif
86  delete post_wait_work;
87  post_wait_work = nullptr;
88  }
89  }
90 }
91 
93 {
94  this->cleanup();
95  _request = other._request;
97 
98  if (other._prior_request.get())
99  _prior_request = std::unique_ptr<Request>
100  (new Request(*other._prior_request.get()));
101 
102  // operator= should behave like a shared pointer
103  if (post_wait_work)
104  post_wait_work->second++;
105 
106  return *this;
107 }
108 
110 {
111  this->cleanup();
112  _request = r;
113  post_wait_work = nullptr;
114  return *this;
115 }
116 
118  this->cleanup();
119 }
120 
122 {
123  TIMPI_LOG_SCOPE("wait()", "Request");
124 
125  if (_prior_request.get())
126  {
127  _prior_request->wait();
128  _prior_request.reset(nullptr);
129  }
130 
131  Status stat {};
132 #ifdef TIMPI_HAVE_MPI
133  timpi_call_mpi
134  (MPI_Wait (&_request, stat.get()));
135 #endif
136  if (post_wait_work)
137  {
138  for (auto & item : post_wait_work->first)
139  {
140  // The user should never try to give us non-existent work or try
141  // to wait() twice.
142  timpi_assert (item);
143  item->run();
144  delete item;
145  item = nullptr;
146  }
147  post_wait_work->first.clear();
148  }
149 
150  return stat;
151 }
152 
154 {
155 #ifdef TIMPI_HAVE_MPI
156  int val=0;
157 
158  timpi_call_mpi
159  (MPI_Test (&_request, &val, MPI_STATUS_IGNORE));
160 
161  if (val)
162  {
163  timpi_assert (_request == null_request);
164  timpi_assert_equal_to (val, 1);
165  }
166 
167  return val;
168 #else
169  return true;
170 #endif
171 }
172 
173 #ifdef TIMPI_HAVE_MPI
174 bool Request::test (status & stat)
175 {
176  int val=0;
177 
178  timpi_call_mpi
179  (MPI_Test (&_request, &val, &stat));
180 
181  return val;
182 }
183 #else
184 bool Request::test (status &)
185 {
186  return true;
187 }
188 #endif
189 
191 {
192  // We're making a chain of prior requests, not a tree
193  timpi_assert(!req._prior_request.get());
194 
195  Request * new_prior_req = new Request(req);
196 
197  // new_prior_req takes ownership of our existing _prior_request
198  new_prior_req->_prior_request.reset(this->_prior_request.release());
199 
200  // Our _prior_request now manages the new resource we just set up
201  this->_prior_request.reset(new_prior_req);
202 }
203 
205 {
206  if (!post_wait_work)
207  post_wait_work = new
208  std::pair<std::vector <PostWaitWork * >, unsigned int>
209  (std::vector <PostWaitWork * >(), 1);
210  post_wait_work->first.push_back(work);
211 }
212 
213 void wait (std::vector<Request> & r)
214 {
215  for (auto & req : r)
216  req.wait();
217 }
218 
219 std::size_t waitany (std::vector<Request> & r)
220 {
221  timpi_assert(!r.empty());
222 
223  int r_size = cast_int<int>(r.size());
224  std::vector<request> raw(r_size);
225  int non_null = r_size;
226  for (int i=0; i != r_size; ++i)
227  {
228  Request * root = &r[i];
229  // If we have prior requests, we need to complete the first one
230  // first
231  while (root->_prior_request.get())
232  root = root->_prior_request.get();
233  raw[i] = *root->get();
234 
235  if (raw[i] != Request::null_request)
236  non_null = std::min(non_null,i);
237  }
238 
239  if (non_null == r_size)
240  return std::size_t(-1);
241 
242  int index = non_null;
243 
244 #ifdef TIMPI_HAVE_MPI
245  bool only_priors_completed = false;
246  Request * next;
247 
248  do
249  {
250  timpi_call_mpi
251  (MPI_Waitany(r_size, raw.data(), &index, MPI_STATUS_IGNORE));
252 
253  timpi_assert_not_equal_to(index, MPI_UNDEFINED);
254 
255  timpi_assert_less(index, r_size);
256 
257  Request * completed = &r[index];
258  next = completed;
259 
260  // If we completed a prior request, we're not really done yet,
261  // so find the next in that line to try again.
262  while (completed->_prior_request.get())
263  {
264  only_priors_completed = true;
265  next = completed;
266  completed = completed->_prior_request.get();
267  }
268 
269  // MPI sets a completed MPI_Request to MPI_REQUEST_NULL; we want
270  // to preserve that
271  completed->_request = raw[index];
272 
273  // Do any post-wait work for the completed request
274  if (completed->post_wait_work)
275  for (auto & item : completed->post_wait_work->first)
276  {
277  // The user should never try to give us non-existent work or try
278  // to wait() twice.
279  timpi_assert (item);
280  item->run();
281  delete item;
282  item = nullptr;
283  }
284 
285  next->_prior_request.reset(nullptr);
286  raw[index] = *next->get();
287 
288  } while(only_priors_completed);
289 #else
290  r[index]._request = Request::null_request;
291 #endif
292 
293  return index;
294 }
295 
296 } // namespace TIMPI
void add_prior_request(const Request &req)
Definition: request.C:190
MPI_Request request
Request object for non-blocking I/O.
Definition: request.h:41
static const request null_request
Definition: request.h:111
void cleanup()
Definition: request.C:71
std::unique_ptr< Request > _prior_request
Definition: request.h:116
request _request
Definition: request.h:114
An abstract base class that can be subclassed to allow other code to perform work after an MPI_Wait succeeds.
Status wait()
Definition: request.C:121
void add_post_wait_work(PostWaitWork *work)
Definition: request.C:204
Encapsulates the MPI_Request.
Definition: request.h:67
std::size_t waitany(std::vector< Request > &r)
Wait for at least one non-blocking operation to finish.
Definition: request.C:219
request * get()
Definition: request.h:84
Status wait(Request &r)
Wait for a non-blocking send or receive to finish.
Definition: request.h:135
Encapsulates the MPI_Status struct.
Definition: status.h:75
std::pair< std::vector< PostWaitWork *>, unsigned int > * post_wait_work
Definition: request.h:124
bool test()
Definition: request.C:153
Request & operator=(const Request &other)
Definition: request.C:92