TIMPI
communicator.C
Go to the documentation of this file.
1 // The TIMPI Message-Passing Parallelism Library.
2 // Copyright (C) 2002-2025 Benjamin S. Kirk, John W. Peterson, Roy H. Stogner
3 
4 // This library is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU Lesser General Public
6 // License as published by the Free Software Foundation; either
7 // version 2.1 of the License, or (at your option) any later version.
8 
9 // This library is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 // Lesser General Public License for more details.
13 
14 // You should have received a copy of the GNU Lesser General Public
15 // License along with this library; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 
18 
19 // Local includes
20 #include "timpi/communicator.h"
21 
22 // TIMPI includes
23 #include "timpi/parallel_implementation.h" // for inline max(int)
24 #include "timpi/timpi_assert.h"
25 
26 // Disable libMesh logging until we decide how to port it best
27 // #include "libmesh/libmesh_logging.h"
28 #define TIMPI_LOG_SCOPE(f,c)
29 
30 namespace TIMPI
31 {
32 
33 
34 // ------------------------------------------------------------
35 // Simple Communicator member functions
36 
37 void Communicator::reference_unique_tag(int tagvalue) const
38 {
39  // This had better be an already-acquired tag.
40  timpi_assert(used_tag_values.count(tagvalue));
41 
42  used_tag_values[tagvalue]++;
43 }
44 
45 
46 void Communicator::dereference_unique_tag(int tagvalue) const
47 {
48  // This had better be an already-acquired tag.
49  timpi_assert(used_tag_values.count(tagvalue));
50 
51  used_tag_values[tagvalue]--;
52  // If we don't have any more outstanding references, we
53  // don't even need to keep this tag in our "used" set.
54  if (!used_tag_values[tagvalue])
55  used_tag_values.erase(tagvalue);
56 }
57 
58 
60 #ifdef TIMPI_HAVE_MPI
61  _communicator(MPI_COMM_SELF),
62 #endif
63  _rank(0),
64  _size(1),
65  _send_mode(DEFAULT),
66  _sync_type(NBX),
67  used_tag_values(),
68  _next_tag(0),
69  _max_tag(std::numeric_limits<int>::max()),
70  _I_duped_it(false) {}
71 
72 
74 #ifdef TIMPI_HAVE_MPI
75  _communicator(MPI_COMM_SELF),
76 #endif
77  _rank(0),
78  _size(1),
79  _send_mode(DEFAULT),
80  _sync_type(NBX),
81  used_tag_values(),
82  _next_tag(0),
83  _max_tag(std::numeric_limits<int>::max()),
84  _I_duped_it(false)
85 {
86  this->assign(comm);
87 }
88 
89 
91 {
92  this->clear();
93 }
94 
95 
96 #ifdef TIMPI_HAVE_MPI
97 void Communicator::split(int color, int key, Communicator & target) const
98 {
99  target.clear();
100  MPI_Comm newcomm;
101  timpi_call_mpi
102  (MPI_Comm_split(this->get(), color, key, &newcomm));
103 
104  target.assign(newcomm);
105  target._I_duped_it = (color != MPI_UNDEFINED);
106  target.send_mode(this->send_mode());
107  target.sync_type(this->sync_type());
108 }
109 
110 
111 void Communicator::split_by_type(int split_type, int key, info i, Communicator & target) const
112 {
113  target.clear();
114  MPI_Comm newcomm;
115  timpi_call_mpi
116  (MPI_Comm_split_type(this->get(), split_type, key, i, &newcomm));
117 
118  target.assign(newcomm);
119  target._I_duped_it = (split_type != MPI_UNDEFINED);
120  target.send_mode(this->send_mode());
121  target.sync_type(this->sync_type());
122 }
123 
124 #else
125 void Communicator::split(int, int, Communicator & target) const
126 {
127  target.assign(this->get());
128 }
129 
130 void Communicator::split_by_type(int, int, info, Communicator & target) const
131 {
132  target.assign(this->get());
133 }
134 #endif
135 
136 
138 {
139  this->duplicate(comm._communicator);
140  this->send_mode(comm.send_mode());
141  this->sync_type(comm.sync_type());
142 }
143 
144 
145 #ifdef TIMPI_HAVE_MPI
147 {
148  if (_communicator != MPI_COMM_NULL)
149  {
150  timpi_call_mpi
151  (MPI_Comm_dup(comm, &_communicator));
152 
153  _I_duped_it = true;
154  }
155  this->assign(_communicator);
156 }
157 #else
158 void Communicator::duplicate(const communicator &) { }
159 #endif
160 
161 
163 #ifdef TIMPI_HAVE_MPI
164  if (_I_duped_it)
165  {
166  timpi_assert (_communicator != MPI_COMM_NULL);
167  timpi_call_mpi
168  (MPI_Comm_free(&_communicator));
169 
170  _communicator = MPI_COMM_NULL;
171  }
172  _I_duped_it = false;
173 #endif
174 }
175 
176 
178 {
179  this->clear();
180  this->assign(comm);
181  return *this;
182 }
183 
184 
186 {
187  _communicator = comm;
188 #ifdef TIMPI_HAVE_MPI
189  if (_communicator != MPI_COMM_NULL)
190  {
191  int i;
192  timpi_call_mpi
193  (MPI_Comm_size(_communicator, &i));
194 
195  timpi_assert_greater (i, 0);
196  _size = cast_int<processor_id_type>(i);
197 
198  timpi_call_mpi
199  (MPI_Comm_rank(_communicator, &i));
200 
201  timpi_assert_greater_equal (i, 0);
202  _rank = cast_int<processor_id_type>(i);
203 
204  int * maxTag;
205  int flag = false;
206  timpi_call_mpi(MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_TAG_UB, &maxTag, &flag));
207  timpi_assert(flag);
208  _max_tag = *maxTag;
209  }
210  else
211  {
212  _rank = 0;
213  _size = 1;
214  _max_tag = std::numeric_limits<int>::max();
215  }
216  _next_tag = _max_tag / 2;
217 #endif
218 }
219 
220 
224 #ifdef TIMPI_HAVE_MPI
226 {
227  if (this->size() > 1)
228  {
229  TIMPI_LOG_SCOPE("barrier()", "Communicator");
230  timpi_call_mpi(MPI_Barrier (this->get()));
231  }
232 }
233 #else
234 void Communicator::barrier () const {}
235 #endif
236 
237 #ifdef TIMPI_HAVE_MPI
239 {
240  if (this->size() > 1)
241  {
242  TIMPI_LOG_SCOPE("nonblocking_barrier()", "Communicator");
243  timpi_call_mpi(MPI_Ibarrier (this->get(), req.get()));
244  }
245 }
246 #else
247 void Communicator::nonblocking_barrier (Request & /*req*/) const {}
248 #endif
249 
250 
252 {
253  if (tagvalue == MessageTag::invalid_tag)
254  {
255 #ifndef NDEBUG
256  // Automatic tag values have to be requested in sync
257  int maxval = _next_tag;
258  this->max(maxval);
259  timpi_assert_equal_to(_next_tag, maxval);
260 #endif
261  tagvalue = _next_tag++;
262  }
263 
264  if (used_tag_values.count(tagvalue))
265  {
266  // Get the largest value in the used values, and pick one
267  // larger
268  tagvalue = used_tag_values.rbegin()->first+1;
269  timpi_assert(!used_tag_values.count(tagvalue));
270  }
271  if (tagvalue >= _next_tag)
272  _next_tag = tagvalue+1;
273 
274  if (_next_tag >= _max_tag)
275  _next_tag = _max_tag/2;
276 
277  used_tag_values[tagvalue] = 1;
278 
279  return MessageTag(tagvalue, this);
280 }
281 
282 
283 status Communicator::probe (const unsigned int src_processor_id,
284  const MessageTag & tag) const
285 {
286  TIMPI_LOG_SCOPE("probe()", "Communicator");
287 
288 #ifndef TIMPI_HAVE_MPI
289  timpi_not_implemented();
290  ignore(src_processor_id, tag);
291 #endif
292 
293  status stat;
294 
295  timpi_assert(src_processor_id < this->size() ||
296  src_processor_id == any_source);
297 
298  timpi_call_mpi
299  (MPI_Probe (int(src_processor_id), tag.value(), this->get(), &stat));
300 
301  return stat;
302 }
303 
304 
305 bool Communicator::verify(const bool & r) const
306 {
307  const unsigned char rnew = r;
308  return this->verify(rnew);
309 }
310 
311 
312 bool Communicator::semiverify(const bool * r) const
313 {
314  if (r)
315  {
316  const unsigned char rnew = *r;
317  return this->semiverify(&rnew);
318  }
319 
320  const unsigned char * rptr = nullptr;
321  return this->semiverify(rptr);
322 }
323 
324 
325 bool Communicator::verify(const std::string & r) const
326 {
327  if (this->size() > 1)
328  {
329  // Cannot use <char> since MPI_MIN is not
330  // strictly defined for chars!
331  std::vector<short int> temp; temp.reserve(r.size());
332  for (std::size_t i=0; i != r.size(); ++i)
333  temp.push_back(r[i]);
334  return this->verify(temp);
335  }
336  return true;
337 }
338 
339 
340 bool Communicator::semiverify(const std::string * r) const
341 {
342  if (this->size() > 1)
343  {
344  std::size_t rsize = r ? r->size() : 0;
345  std::size_t * psize = r ? &rsize : nullptr;
346 
347  if (!this->semiverify(psize))
348  return false;
349 
350  this->max(rsize);
351 
352  // Cannot use <char> since MPI_MIN is not
353  // strictly defined for chars!
354  std::vector<short int> temp (rsize);
355  if (r)
356  {
357  temp.reserve(rsize);
358  for (std::size_t i=0; i != rsize; ++i)
359  temp.push_back((*r)[i]);
360  }
361 
362  std::vector<short int> * ptemp = r ? &temp: nullptr;
363 
364  return this->semiverify(ptemp);
365  }
366  return true;
367 }
368 
369 
370 void Communicator::min(bool & r) const
371 {
372  if (this->size() > 1)
373  {
374  TIMPI_LOG_SCOPE("min(bool)", "Communicator");
375 
376  unsigned int temp = r;
377  timpi_call_mpi
378  (MPI_Allreduce (MPI_IN_PLACE, &temp, 1,
379  StandardType<unsigned int>(),
380  OpFunction<unsigned int>::min(),
381  this->get()));
382  r = temp;
383  }
384 }
385 
386 
387 void Communicator::minloc(bool & r,
388  unsigned int & min_id) const
389 {
390  if (this->size() > 1)
391  {
392  TIMPI_LOG_SCOPE("minloc(bool)", "Communicator");
393 
394  DataPlusInt<int> data_in;
395  ignore(data_in); // unused ifndef TIMPI_HAVE_MPI
396  data_in.val = r;
397  data_in.rank = this->rank();
398  DataPlusInt<int> data_out = data_in;
399 
400  timpi_call_mpi
401  (MPI_Allreduce (&data_in, &data_out, 1,
402  dataplusint_type_acquire<int>().first,
403  OpFunction<int>::min_location(), this->get()));
404  r = data_out.val;
405  min_id = data_out.rank;
406  }
407  else
408  min_id = this->rank();
409 }
410 
411 
412 void Communicator::max(bool & r) const
413 {
414  if (this->size() > 1)
415  {
416  TIMPI_LOG_SCOPE("max(bool)", "Communicator");
417 
418  unsigned int temp = r;
419  timpi_call_mpi
420  (MPI_Allreduce (MPI_IN_PLACE, &temp, 1,
421  StandardType<unsigned int>(),
422  OpFunction<unsigned int>::max(),
423  this->get()));
424  r = temp;
425  }
426 }
427 
428 
429 void Communicator::maxloc(bool & r,
430  unsigned int & max_id) const
431 {
432  if (this->size() > 1)
433  {
434  TIMPI_LOG_SCOPE("maxloc(bool)", "Communicator");
435 
436  DataPlusInt<int> data_in;
437  ignore(data_in); // unused ifndef TIMPI_HAVE_MPI
438  data_in.val = r;
439  data_in.rank = this->rank();
440  DataPlusInt<int> data_out = data_in;
441 
442  timpi_call_mpi
443  (MPI_Allreduce (&data_in, &data_out, 1,
444  dataplusint_type_acquire<int>().first,
445  OpFunction<int>::max_location(),
446  this->get()));
447  r = data_out.val;
448  max_id = data_out.rank;
449  }
450  else
451  max_id = this->rank();
452 }
453 
454 
455 void Communicator::sync_type(const std::string & st)
456 {
457  SyncType type = NBX;
458  if (st == "sendreceive")
459  type = SENDRECEIVE;
460  else if (st == "alltoall")
461  type = ALLTOALL_COUNTS;
462  else if (st != "nbx")
463  timpi_error_msg("Unrecognized TIMPI sync type " << st);
464  this->sync_type(type);
465 }
466 
467 
468 } // namespace TIMPI
void send_mode(const SendMode sm)
Explicitly sets the SendMode type used for send operations.
Definition: communicator.h:333
communicator _communicator
Definition: communicator.h:236
void sync_type(const SyncType st)
Explicitly sets the SyncType used for sync operations.
Definition: communicator.h:343
MPI_Info info
Info object used by some MPI-3 methods.
Definition: communicator.h:79
std::map< int, unsigned int > used_tag_values
Definition: communicator.h:243
void minloc(T &r, unsigned int &min_id) const
Take a local variable and replace it with the minimum of it&#39;s values on all processors, returning the minimum rank of a processor which originally held the minimum value.
int value() const
Definition: message_tag.h:92
MessageTag get_unique_tag(int tagvalue=MessageTag::invalid_tag) const
Get a tag that is unique to this Communicator.
Definition: communicator.C:251
void barrier() const
Pause execution until all processors reach a certain point.
Definition: communicator.C:225
processor_id_type rank() const
Definition: communicator.h:208
SyncType sync_type() const
Gets the user-requested SyncType.
Definition: communicator.h:355
void clear()
Free and reset this communicator.
Definition: communicator.C:162
void assign(const communicator &comm)
Utility function for setting our member variables from an MPI communicator.
Definition: communicator.C:185
void ignore(const Args &...)
Definition: timpi_assert.h:54
processor_id_type _size
Definition: communicator.h:237
const unsigned int any_source
Processor id meaning "Accept from any source".
Definition: communicator.h:84
Encapsulates the MPI_Comm object.
Definition: communicator.h:108
processor_id_type size() const
Definition: communicator.h:211
static const int invalid_tag
Invalid tag, to allow for default construction.
Definition: message_tag.h:53
Encapsulates the MPI tag integers.
Definition: message_tag.h:46
void min(const T &r, T &o, Request &req) const
Non-blocking minimum of the local value r into o with the request req.
status probe(const unsigned int src_processor_id, const MessageTag &tag=any_tag) const
Blocking message probe.
Definition: communicator.C:283
processor_id_type _rank
Definition: communicator.h:237
void split(int color, int key, Communicator &target) const
Definition: communicator.C:97
Communicator & operator=(const Communicator &)=delete
SyncType
What algorithm to use for parallel synchronization?
Definition: communicator.h:225
void maxloc(T &r, unsigned int &max_id) const
Take a local variable and replace it with the maximum of it&#39;s values on all processors, returning the minimum rank of a processor which originally held the maximum value.
timpi_pure bool semiverify(const T *r) const
Check whether a local pointer points to the same value on all processors where it is not null...
void split_by_type(int split_type, int key, info i, Communicator &target) const
Definition: communicator.C:111
Encapsulates the MPI_Request.
Definition: request.h:67
timpi_pure bool verify(const T &r) const
Check whether a local variable has the same value on all processors, returning true if it does or fal...
void max(const T &r, T &o, Request &req) const
Non-blocking maximum of the local value r into o with the request req.
DIE A HORRIBLE DEATH HERE typedef MPI_Comm communicator
Communicator object for talking with subsets of processors.
Definition: communicator.h:74
Communicator()
Default Constructor.
Definition: communicator.C:59
void dereference_unique_tag(int tagvalue) const
Dereference an already-acquired tag, and see if we can re-release it.
Definition: communicator.C:46
request * get()
Definition: request.h:84
SendMode send_mode() const
Gets the user-requested SendMode.
Definition: communicator.h:338
void reference_unique_tag(int tagvalue) const
Reference an already-acquired tag, so that we know it will be dereferenced multiple times before we c...
Definition: communicator.C:37
void duplicate(const Communicator &comm)
Definition: communicator.C:137
void nonblocking_barrier(Request &req) const
Start a barrier that doesn&#39;t block.
Definition: communicator.C:238