TIMPI
parallel_implementation.h
Go to the documentation of this file.
1 // The TIMPI Message-Passing Parallelism Library.
2 // Copyright (C) 2002-2025 Benjamin S. Kirk, John W. Peterson, Roy H. Stogner
3 
4 // This library is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU Lesser General Public
6 // License as published by the Free Software Foundation; either
7 // version 2.1 of the License, or (at your option) any later version.
8 
9 // This library is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 // Lesser General Public License for more details.
13 
14 // You should have received a copy of the GNU Lesser General Public
15 // License along with this library; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 
18 
19 #ifndef TIMPI_PARALLEL_IMPLEMENTATION_H
20 #define TIMPI_PARALLEL_IMPLEMENTATION_H
21 
22 // TIMPI includes
23 #include "timpi/attributes.h"
24 #include "timpi/communicator.h"
25 #include "timpi/data_type.h"
26 #include "timpi/timpi_call_mpi.h"
27 #include "timpi/message_tag.h"
28 #include "timpi/op_function.h"
29 #include "timpi/packing.h"
30 #include "timpi/timpi_assert.h"
38 #include "timpi/post_wait_work.h"
39 #include "timpi/request.h"
40 #include "timpi/status.h"
41 #include "timpi/standard_type.h"
42 
43 #ifndef TIMPI_HAVE_MPI
45 #endif
46 
47 // Boost include if necessary for float128
48 #ifdef TIMPI_DEFAULT_QUADRUPLE_PRECISION
49 # include <boost/multiprecision/float128.hpp>
50 #endif
51 
52 // Disable libMesh logging until we decide how to port it best
53 // #include "libmesh/libmesh_logging.h"
54 #define TIMPI_LOG_SCOPE(f,c)
55 
56 // C++ includes
57 #include <complex>
58 #include <cstddef>
59 #include <iterator>
60 #include <limits>
61 #include <map>
62 #include <memory>
63 #include <set>
64 #include <string>
65 #include <utility>
66 #include <vector>
67 #include <type_traits>
68 
69 namespace TIMPI {
70 
72 
73 #ifdef TIMPI_HAVE_MPI
74 
80 template <typename T>
81 inline data_type dataplusint_type() { return MPI_DATATYPE_NULL; }
82 
83 #endif // TIMPI_HAVE_MPI
84 
// NOTE(review): the line naming this class template appears to be
// missing from this listing (between the template header and the
// opening brace).
//
// A plain aggregate holding a value of type T followed by an int.
// Presumably its layout is meant to mirror the predefined MPI
// (value, int) pair datatypes returned by dataplusint_type(), such as
// MPI_2INT or MPI_DOUBLE_INT -- TODO confirm against its users.
88 template <typename T>
90 {
91 public:
// The value half of the pair.
92  T val;
// The int half of the pair; presumably a processor rank (guessed from
// the member name -- confirm against callers).
93  int rank;
94 };
95 
96 } // namespace Parallel
97 
98 
99 // Anonymous namespace for helper functions
100 namespace {
101 
102 // Internal helper function to create vector<something_usable> from
103 // vector<bool> for compatibility with MPI bitwise operations
template <typename T, typename A1, typename A2>
inline void pack_vector_bool(const std::vector<bool,A1> & vec_in,
                             std::vector<T,A2> & vec_out)
{
  // Pack a vector<bool> into whole words of type T, one bit per flag,
  // so the data can be used with MPI bitwise reductions.  Flag i lands
  // in word i/(8*sizeof(T)) at bit i%(8*sizeof(T)).  Any previous
  // contents of vec_out are discarded.

  // Number of flags stored per packed word
  const std::size_t data_bits = 8*sizeof(T);
  const std::size_t in_size = vec_in.size();
  const std::size_t out_size = in_size/data_bits + ((in_size%data_bits)?1:0);

  // Start from all-zero words so only the set flags need to be written
  vec_out.clear();
  vec_out.resize(out_size);

  for (std::size_t i=0; i != in_size; ++i)
    if (vec_in[i])
      {
        const std::size_t index = i/data_bits;
        const std::size_t offset = i%data_bits;
        // Shift a T-typed one rather than an unsigned int: for T wider
        // than unsigned int the offset can exceed 31, and `1u << offset`
        // would then be undefined behavior.
        vec_out[index] |= static_cast<T>(static_cast<T>(1) << offset);
      }
}
120 
121 // Internal helper function to create vector<bool> from
122 // vector<something usable> for compatibility with MPI byte
123 // operations
template <typename T, typename A1, typename A2>
inline void unpack_vector_bool(const std::vector<T,A1> & vec_in,
                               std::vector<bool,A2> & vec_out)
{
  // Inverse of pack_vector_bool: expand packed words of type T back
  // into individual flags.  vec_out must already have the number of
  // flags to recover; vec_in must hold exactly enough words for them.
  const unsigned int bits_per_word = 8*sizeof(T);
  const std::size_t n_flags = vec_out.size();

  timpi_assert_equal_to
    (n_flags/bits_per_word + (n_flags%bits_per_word?1:0), vec_in.size());

  // Walk word/bit positions incrementally instead of dividing each time
  std::size_t word = 0;
  std::size_t bit = 0;
  for (std::size_t k = 0; k != n_flags; ++k)
    {
      vec_out[k] = (vec_in[word] >> bit) & 1;
      if (++bit == bits_per_word)
        {
          bit = 0;
          ++word;
        }
    }
}
141 
142 
143 #ifdef TIMPI_HAVE_MPI
144 // We use a helper function here to avoid ambiguity when calling
145 // send_receive of (vector<vector<T>>,vector<vector<T>>)
146 template <typename T1, typename T2, typename A1, typename A2, typename A3, typename A4>
147 inline void send_receive_vec_of_vec(const unsigned int dest_processor_id,
148  const std::vector<std::vector<T1,A1>,A2> & send_data,
149  const unsigned int source_processor_id,
150  std::vector<std::vector<T2,A3>,A4> & recv_data,
151  const TIMPI::MessageTag & send_tag,
152  const TIMPI::MessageTag & recv_tag,
153  const TIMPI::Communicator & comm)
154 {
155  TIMPI_LOG_SCOPE("send_receive()", "Parallel");
156 
157  if (dest_processor_id == comm.rank() &&
158  source_processor_id == comm.rank())
159  {
160  recv_data = send_data;
161  return;
162  }
163 
164  TIMPI::Request req;
165  comm.send (dest_processor_id, send_data, req, send_tag);
166  comm.receive (source_processor_id, recv_data, recv_tag);
167  req.wait();
168 }
169 
170 #endif // TIMPI_HAVE_MPI
171 
172 } // Anonymous namespace
173 
174 
175 
176 namespace TIMPI
177 {
178 
179 #ifdef TIMPI_HAVE_MPI
180 template<>
181 inline data_type dataplusint_type<short int>() { return MPI_SHORT_INT; }
182 
183 template<>
184 inline data_type dataplusint_type<int>() { return MPI_2INT; }
185 
186 template<>
187 inline data_type dataplusint_type<long>() { return MPI_LONG_INT; }
188 
189 template<>
190 inline data_type dataplusint_type<float>() { return MPI_FLOAT_INT; }
191 
192 template<>
193 inline data_type dataplusint_type<double>() { return MPI_DOUBLE_INT; }
194 
195 template<>
196 inline data_type dataplusint_type<long double>() { return MPI_LONG_DOUBLE_INT; }
197 
// NOTE(review): the line carrying this function's name and parameter
// list appears to be missing from this listing.
//
// Returns a (handle, owner) pair describing an MPI datatype for
// std::pair<T,int>:
//  - if dataplusint_type<T>() yields a predefined MPI pair datatype,
//    that handle is returned in .first and .second stays null;
//  - otherwise (it yields MPI_DATATYPE_NULL) a new
//    StandardType<std::pair<T,int>> is created and owned by .second,
//    and .first is set from it.
// Keep .second alive for as long as .first is in use: in the second
// case .first refers to the type that .second owns (presumably released
// when .second is destroyed -- confirm in StandardType).
198 template <typename T>
199 inline
200 std::pair<data_type, std::unique_ptr<StandardType<std::pair<T,int>>>>
202 {
203  std::pair<data_type, std::unique_ptr<StandardType<std::pair<T,int>>>> return_val;
// Prefer a predefined MPI pair datatype when one exists for T
204  return_val.first = dataplusint_type<T>();
205  if (return_val.first == MPI_DATATYPE_NULL)
206  {
// No predefined type: build (and own) a custom pair datatype
207  return_val.second.reset(new StandardType<std::pair<T,int>>());
208  return_val.first = *return_val.second;
209  }
210  return return_val;
211 }
212 
213 
214 
// Select the MPI entry points and integer types used by the functions
// below.  With an MPI standard version above 3 (MPI-4 or later) the
// large-count "_c" variants are used, displacements are MPI_Aint, and
// packed counts use the MPI_COUNT datatype.  Otherwise the classic
// int-count API is used, with int displacements and MPI_INT counts.
// NOTE(review): TIMPI_COUNT_TYPE presumably has to describe the same C
// type as CountType (declared elsewhere) -- confirm they stay in sync.
215 #if MPI_VERSION > 3
216  typedef MPI_Aint DispType;
217 # define TIMPI_COUNT_TYPE MPI_COUNT
218 # define TIMPI_PACK_SIZE MPI_Pack_size_c
219 # define TIMPI_SEND MPI_Send_c
220 # define TIMPI_SSEND MPI_Ssend_c
221 # define TIMPI_ALLREDUCE MPI_Allreduce_c
222 # define TIMPI_IALLREDUCE MPI_Iallreduce_c
223 # define TIMPI_ISEND MPI_Isend_c
224 # define TIMPI_ISSEND MPI_Issend_c
225 # define TIMPI_PACK MPI_Pack_c
226 # define TIMPI_UNPACK MPI_Unpack_c
227 # define TIMPI_RECV MPI_Recv_c
228 # define TIMPI_IRECV MPI_Irecv_c
229 # define TIMPI_SENDRECV MPI_Sendrecv_c
230 # define TIMPI_ALLGATHERV MPI_Allgatherv_c
231 # define TIMPI_ALLGATHER MPI_Allgather_c
232 # define TIMPI_BCAST MPI_Bcast_c
233 # define TIMPI_GATHER MPI_Gather_c
234 # define TIMPI_GATHERV MPI_Gatherv_c
235 # define TIMPI_SCATTER MPI_Scatter_c
236 # define TIMPI_SCATTERV MPI_Scatterv_c
237 # define TIMPI_ALLTOALL MPI_Alltoall_c
238 #else
// MPI-3 and earlier: int counts and displacements throughout
239  typedef int DispType;
240 # define TIMPI_COUNT_TYPE MPI_INT
241 # define TIMPI_PACK_SIZE MPI_Pack_size
242 # define TIMPI_SEND MPI_Send
243 # define TIMPI_SSEND MPI_Ssend
244 # define TIMPI_ALLREDUCE MPI_Allreduce
245 # define TIMPI_IALLREDUCE MPI_Iallreduce
246 # define TIMPI_ISEND MPI_Isend
247 # define TIMPI_ISSEND MPI_Issend
248 # define TIMPI_PACK MPI_Pack
249 # define TIMPI_UNPACK MPI_Unpack
250 # define TIMPI_RECV MPI_Recv
251 # define TIMPI_IRECV MPI_Irecv
252 # define TIMPI_SENDRECV MPI_Sendrecv
253 # define TIMPI_ALLGATHERV MPI_Allgatherv
254 # define TIMPI_ALLGATHER MPI_Allgather
255 # define TIMPI_BCAST MPI_Bcast
256 # define TIMPI_GATHER MPI_Gather
257 # define TIMPI_GATHERV MPI_Gatherv
258 # define TIMPI_SCATTER MPI_Scatter
259 # define TIMPI_SCATTERV MPI_Scatterv
260 # define TIMPI_ALLTOALL MPI_Alltoall
261 #endif
262 
263 
264 
265 template <typename T, typename A1, typename A2>
266 std::size_t Communicator::packed_size_of(const std::vector<std::vector<T,A1>,A2> & buf,
267  const DataType & type) const
268 {
269  // Figure out how many bytes we need to pack all the data
270  //
271  // Start with the outer buffer size
272  CountType packedsize=0;
273 
274  timpi_call_mpi
275  (TIMPI_PACK_SIZE (1, TIMPI_COUNT_TYPE, this->get(), &packedsize));
276 
277  std::size_t sendsize = packedsize;
278 
279  const std::size_t n_vecs = buf.size();
280 
281  for (std::size_t i = 0; i != n_vecs; ++i)
282  {
283  // The size of the ith inner buffer
284  timpi_call_mpi
285  (TIMPI_PACK_SIZE (1, TIMPI_COUNT_TYPE, this->get(), &packedsize));
286 
287  sendsize += packedsize;
288 
289  // The data for each inner buffer
290  timpi_call_mpi
291  (TIMPI_PACK_SIZE (cast_int<CountType>(buf[i].size()),
292  type, this->get(), &packedsize));
293 
294  sendsize += packedsize;
295  }
296 
297  timpi_assert (sendsize /* should at least be 1! */);
298  return sendsize;
299 }
300 
301 
302 template<typename T>
303 inline void Communicator::send (const unsigned int dest_processor_id,
304  const std::basic_string<T> & buf,
305  const MessageTag & tag) const
306 {
307  TIMPI_LOG_SCOPE("send()", "Parallel");
308 
309  T * dataptr = buf.empty() ? nullptr : const_cast<T *>(buf.data());
310 
311  timpi_assert_less(dest_processor_id, this->size());
312 
313  timpi_call_mpi
314  (((this->send_mode() == SYNCHRONOUS) ?
315  TIMPI_SSEND : TIMPI_SEND)
316  (dataptr, cast_int<CountType>(buf.size()),
317  StandardType<T>(dataptr), dest_processor_id, tag.value(),
318  this->get()));
319 }
320 
321 
322 
323 template <typename T>
324 inline void Communicator::send (const unsigned int dest_processor_id,
325  const std::basic_string<T> & buf,
326  Request & req,
327  const MessageTag & tag) const
328 {
329  TIMPI_LOG_SCOPE("send()", "Parallel");
330 
331  T * dataptr = buf.empty() ? nullptr : const_cast<T *>(buf.data());
332 
333  timpi_assert_less(dest_processor_id, this->size());
334 
335  timpi_call_mpi
336  (((this->send_mode() == SYNCHRONOUS) ?
337  TIMPI_ISSEND : TIMPI_ISEND)
338  (dataptr, cast_int<CountType>(buf.size()),
339  StandardType<T>(dataptr), dest_processor_id, tag.value(),
340  this->get(), req.get()));
341 
342  // The MessageTag should stay registered for the Request lifetime
344  (new PostWaitDereferenceTag(tag));
345 }
346 
347 
348 
349 template <typename T>
350 inline void Communicator::send (const unsigned int dest_processor_id,
351  const T & buf,
352  const MessageTag & tag) const
353 {
354  TIMPI_LOG_SCOPE("send()", "Parallel");
355 
356  T * dataptr = const_cast<T*> (&buf);
357 
358  timpi_assert_less(dest_processor_id, this->size());
359 
360  timpi_call_mpi
361  (((this->send_mode() == SYNCHRONOUS) ?
362  TIMPI_SSEND : TIMPI_SEND)
363  (dataptr, 1, StandardType<T>(dataptr), dest_processor_id,
364  tag.value(), this->get()));
365 }
366 
367 
368 
369 template <typename T>
370 inline void Communicator::send (const unsigned int dest_processor_id,
371  const T & buf,
372  Request & req,
373  const MessageTag & tag) const
374 {
375  TIMPI_LOG_SCOPE("send()", "Parallel");
376 
377  T * dataptr = const_cast<T*>(&buf);
378 
379  timpi_assert_less(dest_processor_id, this->size());
380 
381  timpi_call_mpi
382  (((this->send_mode() == SYNCHRONOUS) ?
383  TIMPI_ISSEND : TIMPI_ISEND)
384  (dataptr, 1, StandardType<T>(dataptr), dest_processor_id,
385  tag.value(), this->get(), req.get()));
386 
387  // The MessageTag should stay registered for the Request lifetime
389  (new PostWaitDereferenceTag(tag));
390 }
391 
392 
393 
394 template <typename T, typename C, typename A>
395 inline void Communicator::send (const unsigned int dest_processor_id,
396  const std::set<T,C,A> & buf,
397  const MessageTag & tag) const
398 {
399  this->send(dest_processor_id, buf,
400  StandardType<T>(buf.empty() ? nullptr : &(*buf.begin())), tag);
401 }
402 
403 
404 
405 template <typename T, typename C, typename A>
406 inline void Communicator::send (const unsigned int dest_processor_id,
407  const std::set<T,C,A> & buf,
408  Request & req,
409  const MessageTag & tag) const
410 {
411  this->send(dest_processor_id, buf,
412  StandardType<T>(buf.empty() ? nullptr : &(*buf.begin())), req, tag);
413 }
414 
415 
416 
417 template <typename T, typename C, typename A>
418 inline void Communicator::send (const unsigned int dest_processor_id,
419  const std::set<T,C,A> & buf,
420  const DataType & type,
421  const MessageTag & tag) const
422 {
423  TIMPI_LOG_SCOPE("send()", "Parallel");
424 
425  std::vector<T> vecbuf(buf.begin(), buf.end());
426  this->send(dest_processor_id, vecbuf, type, tag);
427 }
428 
429 
430 
431 template <typename T, typename C, typename A>
432 inline void Communicator::send (const unsigned int dest_processor_id,
433  const std::set<T,C,A> & buf,
434  const DataType & type,
435  Request & req,
436  const MessageTag & tag) const
437 {
438  TIMPI_LOG_SCOPE("send()", "Parallel");
439 
440  // Allocate temporary buffer on the heap so it lives until after
441  // the non-blocking send completes
442  std::vector<T> * vecbuf =
443  new std::vector<T,A>(buf.begin(), buf.end());
444 
445  // Make the Request::wait() handle deleting the buffer
446  req.add_post_wait_work
447  (new PostWaitDeleteBuffer<std::vector<T,A>>(vecbuf));
448 
449  this->send(dest_processor_id, *vecbuf, type, req, tag);
450 }
451 
452 
453 
454 template <typename T, typename A>
455 inline void Communicator::send (const unsigned int dest_processor_id,
456  const std::vector<T,A> & buf,
457  const MessageTag & tag) const
458 {
459  this->send(dest_processor_id, buf,
460  StandardType<T>(buf.empty() ? nullptr : &buf.front()), tag);
461 }
462 
463 
464 
465 template <typename T, typename A,
466  typename std::enable_if<std::is_base_of<DataType, StandardType<T>>::value, int>::type>
467 inline void Communicator::send (const unsigned int dest_processor_id,
468  const std::vector<T,A> & buf,
469  Request & req,
470  const MessageTag & tag) const
471 {
472  this->send(dest_processor_id, buf,
473  StandardType<T>(buf.empty() ? nullptr : &buf.front()), req, tag);
474 }
475 
476 template <typename T, typename A,
477  typename std::enable_if<Has_buffer_type<Packing<T>>::value, int>::type>
478 inline void Communicator::send (const unsigned int dest_processor_id,
479  const std::vector<T,A> & buf,
480  Request & req,
481  const MessageTag & tag) const
482 {
483  this->nonblocking_send_packed_range(dest_processor_id,
484  (void *)(nullptr),
485  buf.begin(),
486  buf.end(),
487  req,
488  tag);
489 }
490 
491 
492 template <typename T, typename A>
493 inline void Communicator::send (const unsigned int dest_processor_id,
494  const std::vector<T,A> & buf,
495  const DataType & type,
496  const MessageTag & tag) const
497 {
498  TIMPI_LOG_SCOPE("send()", "Parallel");
499 
500  timpi_call_mpi
501  (((this->send_mode() == SYNCHRONOUS) ?
502  TIMPI_SSEND : TIMPI_SEND)
503  (buf.empty() ? nullptr : const_cast<T*>(buf.data()),
504  cast_int<CountType>(buf.size()), type, dest_processor_id,
505  tag.value(), this->get()));
506 }
507 
508 
509 
510 template <typename T, typename A, typename std::enable_if<std::is_base_of<DataType, StandardType<T>>::value, int>::type>
511 inline void Communicator::send (const unsigned int dest_processor_id,
512  const std::vector<T,A> & buf,
513  const DataType & type,
514  Request & req,
515  const MessageTag & tag) const
516 {
517  TIMPI_LOG_SCOPE("send()", "Parallel");
518 
519  timpi_assert_less(dest_processor_id, this->size());
520 
521  timpi_call_mpi
522  (((this->send_mode() == SYNCHRONOUS) ?
523  TIMPI_ISSEND : TIMPI_ISEND)
524  (buf.empty() ? nullptr : const_cast<T*>(buf.data()),
525  cast_int<CountType>(buf.size()), type, dest_processor_id,
526  tag.value(), this->get(), req.get()));
527 
528  // The MessageTag should stay registered for the Request lifetime
529  req.add_post_wait_work
530  (new PostWaitDereferenceTag(tag));
531 }
532 
533 template <typename T, typename A, typename std::enable_if<Has_buffer_type<Packing<T>>::value, int>::type>
534 inline void Communicator::send (const unsigned int dest_processor_id,
535  const std::vector<T,A> & buf,
536  const NotADataType &,
537  Request & req,
538  const MessageTag & tag) const
539 {
540  TIMPI_LOG_SCOPE("send()", "Parallel");
541 
542  timpi_assert_less(dest_processor_id, this->size());
543 
544  this->nonblocking_send_packed_range(dest_processor_id,
545  (void *)(nullptr),
546  buf.begin(),
547  buf.end(),
548  req,
549  tag);
550 }
551 
552 
553 
554 template <typename T, typename A1, typename A2>
555 inline void Communicator::send (const unsigned int dest_processor_id,
556  const std::vector<std::vector<T,A1>,A2> & buf,
557  const MessageTag & tag) const
558 {
559  this->send(dest_processor_id, buf,
560  StandardType<T>((buf.empty() || buf.front().empty()) ?
561  nullptr : &(buf.front().front())), tag);
562 }
563 
564 
565 
566 template <typename T, typename A1, typename A2>
567 inline void Communicator::send (const unsigned int dest_processor_id,
568  const std::vector<std::vector<T,A1>,A2> & buf,
569  Request & req,
570  const MessageTag & tag) const
571 {
572  this->send(dest_processor_id, buf,
573  StandardType<T>((buf.empty() || buf.front().empty()) ?
574  nullptr : &(buf.front().front())), req, tag);
575 }
576 
577 
578 
579 template <typename T, typename A1, typename A2>
580 inline void Communicator::send (const unsigned int dest_processor_id,
581  const std::vector<std::vector<T,A1>,A2> & buf,
582  const DataType & type,
583  const MessageTag & tag) const
584 {
585  // We'll avoid redundant code (at the cost of using heap rather
586  // than stack buffer allocation) by reusing the non-blocking send
587  Request req;
588  this->send(dest_processor_id, buf, type, req, tag);
589  req.wait();
590 }
591 
592 
593 
594 template <typename T, typename A1, typename A2>
595 inline void Communicator::send (const unsigned int dest_processor_id,
596  const std::vector<std::vector<T,A1>,A2> & send_vecs,
597  const DataType & type,
598  Request & req,
599  const MessageTag & tag) const
600 {
601  // figure out how many bytes we need to pack all the data
602  const CountType sendsize =
603  cast_int<CountType>(this->packed_size_of(send_vecs, type));
604 
605  // temporary buffer - this will be sized in bytes
606  // and manipulated with MPI_Pack
607  std::vector<char> * sendbuf = new std::vector<char>(sendsize);
608 
609  // Pack the send buffer
610  CountType pos=0;
611 
612  // ... the size of the outer buffer
613  const std::size_t n_vecs = send_vecs.size();
614  const CountType mpi_n_vecs = cast_int<CountType>(n_vecs);
615 
616  timpi_call_mpi
617  (TIMPI_PACK (&mpi_n_vecs, 1, TIMPI_COUNT_TYPE, sendbuf->data(),
618  sendsize, &pos, this->get()));
619 
620  for (std::size_t i = 0; i != n_vecs; ++i)
621  {
622  // ... the size of the ith inner buffer
623  const CountType subvec_size =
624  cast_int<CountType>(send_vecs[i].size());
625 
626  timpi_call_mpi
627  (TIMPI_PACK (&subvec_size, 1, TIMPI_COUNT_TYPE,
628  sendbuf->data(), sendsize, &pos, this->get()));
629 
630  // ... the contents of the ith inner buffer
631  if (!send_vecs[i].empty())
632  timpi_call_mpi
633  (TIMPI_PACK (const_cast<T*>(send_vecs[i].data()),
634  subvec_size, type, sendbuf->data(), sendsize,
635  &pos, this->get()));
636  }
637 
638  timpi_assert_equal_to (pos, sendsize);
639 
640  req.add_post_wait_work
641  (new PostWaitDeleteBuffer<std::vector<char>> (sendbuf));
642 
643  this->send (dest_processor_id, *sendbuf, MPI_PACKED, req, tag);
644 }
645 
646 
647 template <typename Context, typename Iter>
648 inline void Communicator::send_packed_range (const unsigned int dest_processor_id,
649  const Context * context,
650  Iter range_begin,
651  const Iter range_end,
652  const MessageTag & tag,
653  std::size_t approx_buffer_size) const
654 {
655  // We will serialize variable size objects from *range_begin to
656  // *range_end as a sequence of plain data (e.g. ints) in this buffer
657  typedef typename std::iterator_traits<Iter>::value_type T;
658 
659  std::size_t total_buffer_size =
660  packed_range_size (context, range_begin, range_end);
661 
662  this->send(dest_processor_id, total_buffer_size, tag);
663 
664 #ifdef DEBUG
665  std::size_t used_buffer_size = 0;
666 #endif
667 
668  while (range_begin != range_end)
669  {
670  timpi_assert_greater (std::distance(range_begin, range_end), 0);
671 
672  std::vector<typename Packing<T>::buffer_type> buffer;
673 
674  const Iter next_range_begin = pack_range
675  (context, range_begin, range_end, buffer, approx_buffer_size);
676 
677  timpi_assert_greater (std::distance(range_begin, next_range_begin), 0);
678 
679  range_begin = next_range_begin;
680 
681 #ifdef DEBUG
682  used_buffer_size += buffer.size();
683 #endif
684 
685  // Blocking send of the buffer
686  this->send(dest_processor_id, buffer, tag);
687  }
688 
689 #ifdef DEBUG
690  timpi_assert_equal_to(used_buffer_size, total_buffer_size);
691 #endif
692 }
693 
694 
// Non-blocking variant of send_packed_range().  As implemented:
//  1. The total packed size of the range is sent first, on an
//     intermediate Request whose wait() deletes the heap copy of that
//     size; req is made to depend on it via add_prior_request().
//  2. The range is then packed chunk by chunk (each chunk sized by
//     pack_range() from approx_buffer_size) into heap buffers, each sent
//     without blocking.  Every chunk except the last is sent on its own
//     Request, which is registered as a prior request of req.  The last
//     chunk is sent on req itself.  Each buffer is deleted by the wait()
//     of the Request it was sent on.
// Waiting on req presumably also completes its prior requests (see the
// add_prior_request() comment below) -- confirm in Request.
// NOTE(review): for an empty range, req only gets the size message as a
// prior request and no send of its own; confirm that waiting on such a
// Request is supported.
695 template <typename Context, typename Iter>
696 inline void Communicator::send_packed_range (const unsigned int dest_processor_id,
697  const Context * context,
698  Iter range_begin,
699  const Iter range_end,
700  Request & req,
701  const MessageTag & tag,
702  std::size_t approx_buffer_size) const
703 {
704  // Element type of the range and the plain type it is packed into; the
705  // packed buffers below live on the heap until Request::wait()
706  typedef typename std::iterator_traits<Iter>::value_type T;
707  typedef typename Packing<T>::buffer_type buffer_t;
708 
709  std::size_t total_buffer_size =
710  packed_range_size (context, range_begin, range_end);
711 
712  // That local variable will be gone soon; we need a send buffer that
713  // will stick around. I heard you like buffering so I put a buffer
714  // for your buffer size so you can buffer the size of your buffer.
715  std::size_t * total_buffer_size_buffer = new std::size_t;
716  *total_buffer_size_buffer = total_buffer_size;
717 
718  // Delete the buffer size's buffer when we're done
719  Request intermediate_req = request();
720  intermediate_req.add_post_wait_work
721  (new PostWaitDeleteBuffer<std::size_t>(total_buffer_size_buffer));
722  this->send(dest_processor_id, *total_buffer_size_buffer, intermediate_req, tag);
723 
724  // And don't finish up the full request until we're done with its
725  // dependencies
726  req.add_prior_request(intermediate_req);
727 
728 #ifdef DEBUG
729  std::size_t used_buffer_size = 0;
730 #endif
731 
732  while (range_begin != range_end)
733  {
734  timpi_assert_greater (std::distance(range_begin, range_end), 0);
735 
736  std::vector<buffer_t> * buffer = new std::vector<buffer_t>();
737 
738  const Iter next_range_begin = pack_range
739  (context, range_begin, range_end, *buffer, approx_buffer_size);
740 
741  // pack_range must consume at least one object per chunk
741  timpi_assert_greater (std::distance(range_begin, next_range_begin), 0);
742 
743  range_begin = next_range_begin;
744 
745 #ifdef DEBUG
746  used_buffer_size += buffer->size();
747 #endif
748 
749  Request next_intermediate_req;
750 
751  // The final chunk is sent on req itself; earlier chunks get their
751  // own Request, which then becomes a prior request of req
751  Request * my_req = (range_begin == range_end) ? &req : &next_intermediate_req;
752 
753  // Make the Request::wait() handle deleting the buffer
754  my_req->add_post_wait_work
755  (new PostWaitDeleteBuffer<std::vector<buffer_t>>
756  (buffer));
757 
758  // Non-blocking send of the buffer
759  this->send(dest_processor_id, *buffer, *my_req, tag);
760 
761  if (range_begin != range_end)
762  req.add_prior_request(*my_req);
763  }
764 
765 #ifdef DEBUG
766  timpi_assert_equal_to(used_buffer_size, total_buffer_size);
767 #endif
768 }
769 
770 
771 
772 
773 
774 
775 
776 template <typename Context, typename Iter>
777 inline void Communicator::nonblocking_send_packed_range (const unsigned int dest_processor_id,
778  const Context * context,
779  Iter range_begin,
780  const Iter range_end,
781  Request & req,
782  const MessageTag & tag) const
783 {
784  // Allocate a buffer on the heap so we don't have to free it until
785  // after the Request::wait()
786  typedef typename std::iterator_traits<Iter>::value_type T;
787  typedef typename Packing<T>::buffer_type buffer_t;
788 
789  if (range_begin != range_end)
790  {
791  std::vector<buffer_t> * buffer = new std::vector<buffer_t>();
792 
793  range_begin =
794  pack_range(context,
795  range_begin,
796  range_end,
797  *buffer,
798  // MPI-2/3 can only use signed integers for size,
799  // and with this API we need to fit a non-blocking
800  // send into one buffer
801  std::numeric_limits<CountType>::max());
802 
803  if (range_begin != range_end)
804  timpi_error_msg("Non-blocking packed range sends cannot exceed " << std::numeric_limits<CountType>::max() << "in size");
805 
806  // Make the Request::wait() handle deleting the buffer
808  (new PostWaitDeleteBuffer<std::vector<buffer_t>>
809  (buffer));
810 
811  // Non-blocking send of the buffer
812  this->send(dest_processor_id, *buffer, req, tag);
813  }
814 }
815 
816 
817 template <typename T>
818 inline Status Communicator::receive (const unsigned int src_processor_id,
819  std::basic_string<T> & buf,
820  const MessageTag & tag) const
821 {
822  std::vector<T> tempbuf; // Officially C++ won't let us get a
823  // modifiable array from a string
824 
825  Status stat = this->receive(src_processor_id, tempbuf, tag);
826  buf.assign(tempbuf.begin(), tempbuf.end());
827  return stat;
828 }
829 
830 
831 
832 template <typename T>
833 inline void Communicator::receive (const unsigned int src_processor_id,
834  std::basic_string<T> & buf,
835  Request & req,
836  const MessageTag & tag) const
837 {
838  // Officially C++ won't let us get a modifiable array from a
839  // string, and we can't even put one on the stack for the
840  // non-blocking case.
841  std::vector<T> * tempbuf = new std::vector<T>(buf.size());
842 
843  // We can clear the string, but the Request::wait() will need to
844  // handle copying our temporary buffer to it
845  buf.clear();
846 
848  (new PostWaitCopyBuffer<std::vector<T>,
849  std::back_insert_iterator<std::basic_string<T>>>
850  (*tempbuf, std::back_inserter(buf)));
851 
852  // Make the Request::wait() then handle deleting the buffer
854  (new PostWaitDeleteBuffer<std::vector<T>>(tempbuf));
855 
856  this->receive(src_processor_id, *tempbuf, req, tag);
857 }
858 
859 
860 
861 template <typename T>
862 inline Status Communicator::receive (const unsigned int src_processor_id,
863  T & buf,
864  const MessageTag & tag) const
865 {
866  TIMPI_LOG_SCOPE("receive()", "Parallel");
867 
868  // Get the status of the message, explicitly provide the
869  // datatype so we can later query the size
870  Status stat(this->probe(src_processor_id, tag), StandardType<T>(&buf));
871 
872  timpi_assert(src_processor_id < this->size() ||
873  src_processor_id == any_source);
874 
875  timpi_call_mpi
876  (TIMPI_RECV (&buf, 1, StandardType<T>(&buf), src_processor_id,
877  tag.value(), this->get(), stat.get()));
878 
879  return stat;
880 }
881 
882 
883 
884 template <typename T>
885 inline void Communicator::receive (const unsigned int src_processor_id,
886  T & buf,
887  Request & req,
888  const MessageTag & tag) const
889 {
890  TIMPI_LOG_SCOPE("receive()", "Parallel");
891 
892  timpi_assert(src_processor_id < this->size() ||
893  src_processor_id == any_source);
894 
895  timpi_call_mpi
896  (TIMPI_IRECV (&buf, 1, StandardType<T>(&buf), src_processor_id,
897  tag.value(), this->get(), req.get()));
898 
899  // The MessageTag should stay registered for the Request lifetime
901  (new PostWaitDereferenceTag(tag));
902 }
903 
904 
905 
906 template <typename T, typename C, typename A>
907 inline Status Communicator::receive (const unsigned int src_processor_id,
908  std::set<T,C,A> & buf,
909  const MessageTag & tag) const
910 {
911  return this->receive
912  (src_processor_id, buf,
913  StandardType<T>(buf.empty() ? nullptr : &(*buf.begin())), tag);
914 }
915 
916 
917 
918 /*
919  * No non-blocking receives of std::set until we figure out how to
920  * resize the temporary buffer.
 *
 * Disabled: this would forward to the (set, DataType, Request) receive
 * (also disabled below), which receives into a temporary std::vector.
 * The non-blocking std::vector receive does not probe for the message
 * size, so it only receives as many entries as the buffer already
 * holds, and that temporary starts out empty.
921  */
922 #if 0
923 template <typename T, typename C, typename A>
924 inline void Communicator::receive (const unsigned int src_processor_id,
925  std::set<T,C,A> & buf,
926  Request & req,
927  const MessageTag & tag) const
928 {
929  this->receive (src_processor_id, buf,
930  StandardType<T>(buf.empty() ? nullptr : &(*buf.begin())), req, tag);
931 }
932 #endif // 0
933 
934 
935 
936 template <typename T, typename C, typename A>
937 inline Status Communicator::receive (const unsigned int src_processor_id,
938  std::set<T,C,A> & buf,
939  const DataType & type,
940  const MessageTag & tag) const
941 {
942  TIMPI_LOG_SCOPE("receive()", "Parallel");
943 
944  std::vector<T> vecbuf;
945  Status stat = this->receive(src_processor_id, vecbuf, type, tag);
946  buf.clear();
947  buf.insert(vecbuf.begin(), vecbuf.end());
948 
949  return stat;
950 }
951 
952 
953 
954 /*
955  * No non-blocking receives of std::set until we figure out how to
956  * resize the temporary buffer.
 *
 * Disabled: vecbuf below starts out empty, and the non-blocking
 * std::vector receive only receives as many entries as its buffer
 * already holds (it does not probe for the message size).
957  */
958 #if 0
959 template <typename T, typename C, typename A>
960 inline void Communicator::receive (const unsigned int src_processor_id,
961  std::set<T,C,A> & buf,
962  const DataType & type,
963  Request & req,
964  const MessageTag & tag) const
965 {
966  TIMPI_LOG_SCOPE("receive()", "Parallel");
967 
968  // Allocate temporary buffer on the heap so it lives until after
969  // the non-blocking receive completes
970  std::vector<T> * vecbuf = new std::vector<T>();
971 
972  // We can clear the set, but the Request::wait() will need to
973  // handle copying our temporary buffer to it
974  buf.clear();
975 
976  req.add_post_wait_work
977  (new PostWaitCopyBuffer<std::vector<T>,
978  std::insert_iterator<std::set<T,C,A>>>
979  (*vecbuf, std::inserter(buf,buf.end())));
980 
981  // Make the Request::wait() then handle deleting the buffer
982  req.add_post_wait_work
983  (new PostWaitDeleteBuffer<std::vector<T>>(vecbuf));
984 
985  this->receive(src_processor_id, *vecbuf, type, req, tag);
986 }
987 #endif // 0
988 
989 
990 
991 template <typename T, typename A>
992 inline Status Communicator::receive (const unsigned int src_processor_id,
993  std::vector<T,A> & buf,
994  const MessageTag & tag) const
995 {
996  return this->receive
997  (src_processor_id, buf,
998  StandardType<T>(buf.empty() ? nullptr : &(*buf.begin())), tag);
999 }
1000 
1001 
1002 
1003 template <typename T, typename A>
1004 inline void Communicator::receive (const unsigned int src_processor_id,
1005  std::vector<T,A> & buf,
1006  Request & req,
1007  const MessageTag & tag) const
1008 {
1009  this->receive (src_processor_id, buf,
1010  StandardType<T>(buf.empty() ? nullptr : &(*buf.begin())), req, tag);
1011 }
1012 
1013 
1014 
1015 template <typename T, typename A>
1016 inline Status Communicator::receive (const unsigned int src_processor_id,
1017  std::vector<T,A> & buf,
1018  const DataType & type,
1019  const MessageTag & tag) const
1020 {
1021  TIMPI_LOG_SCOPE("receive()", "Parallel");
1022 
1023  // Get the status of the message, explicitly provide the
1024  // datatype so we can later query the size
1025  Status stat(this->probe(src_processor_id, tag), type);
1026 
1027  buf.resize(stat.size());
1028 
1029  timpi_assert(src_processor_id < this->size() ||
1030  src_processor_id == any_source);
1031 
1032  // Use stat.source() and stat.tag() in the receive - if
1033  // src_processor_id is or tag is "any" then we want to be sure we
1034  // try to receive the same message we just probed.
1035  timpi_call_mpi
1036  (TIMPI_RECV (buf.empty() ? nullptr : buf.data(),
1037  cast_int<CountType>(buf.size()), type, stat.source(),
1038  stat.tag(), this->get(), stat.get()));
1039 
1040  timpi_assert_equal_to (cast_int<std::size_t>(stat.size()),
1041  buf.size());
1042 
1043  return stat;
1044 }
1045 
1046 
1047 
1048 template <typename T, typename A,
1049  typename std::enable_if<Has_buffer_type<Packing<T>>::value, int>::type>
1050 Status Communicator::receive (const unsigned int src_processor_id,
1051  std::vector<T,A> & buf,
1052  const DataType & type,
1053  const MessageTag & tag) const
1054 {
1055  bool flag = false;
1056  Status stat;
1057  while (!flag)
1058  stat = this->packed_range_probe<T>(src_processor_id, tag, flag);
1059 
1060  Request req;
1061  this->nonblocking_receive_packed_range(src_processor_id, (void *)(nullptr),
1062  std::inserter(buf, buf.end()),
1063  type, req, stat, tag);
1064  req.wait();
1065 
1066  return stat;
1067 }
1068 
1069 
1070 template <typename T, typename A,
1071  typename std::enable_if<Has_buffer_type<Packing<T>>::value, int>::type>
1072 Status Communicator::receive (const unsigned int src_processor_id,
1073  std::vector<T,A> & buf,
1074  const NotADataType &,
1075  const MessageTag & tag) const
1076 {
1077  bool flag = false;
1078  Status stat;
1079  while (!flag)
1080  stat = this->packed_range_probe<T>(src_processor_id, tag, flag);
1081 
1082  Request req;
1083  this->nonblocking_receive_packed_range(src_processor_id, (void *)(nullptr),
1084  std::inserter(buf, buf.end()),
1085  buf.data(), req, stat, tag);
1086  req.wait();
1087 
1088  return stat;
1089 }
1090 
1091 
1092 template <typename T, typename A>
1093 inline void Communicator::receive (const unsigned int src_processor_id,
1094  std::vector<T,A> & buf,
1095  const DataType & type,
1096  Request & req,
1097  const MessageTag & tag) const
1098 {
1099  TIMPI_LOG_SCOPE("receive()", "Parallel");
1100 
1101  timpi_assert(src_processor_id < this->size() ||
1102  src_processor_id == any_source);
1103 
1104  timpi_call_mpi
1105  (TIMPI_IRECV(buf.empty() ? nullptr : buf.data(),
1106  cast_int<CountType>(buf.size()), type, src_processor_id,
1107  tag.value(), this->get(), req.get()));
1108 
1109  // The MessageTag should stay registered for the Request lifetime
1110  req.add_post_wait_work
1111  (new PostWaitDereferenceTag(tag));
1112 }
1113 
1114 
1115 
1116 template <typename T, typename A1, typename A2>
1117 inline Status Communicator::receive (const unsigned int src_processor_id,
1118  std::vector<std::vector<T,A1>,A2> & buf,
1119  const MessageTag & tag) const
1120 {
1121  return this->receive
1122  (src_processor_id, buf,
1123  StandardType<T>((buf.empty() || buf.front().empty()) ?
1124  nullptr : &(buf.front().front())), tag);
1125 }
1126 
1127 
1128 
1129 template <typename T, typename A1, typename A2>
1130 inline void Communicator::receive (const unsigned int src_processor_id,
1131  std::vector<std::vector<T,A1>,A2> & buf,
1132  Request & req,
1133  const MessageTag & tag) const
1134 {
1135  this->receive (src_processor_id, buf,
1136  StandardType<T>((buf.empty() || buf.front().empty()) ?
1137  nullptr : &(buf.front().front())), req, tag);
1138 }
1139 
1140 
1141 
1142 template <typename T, typename A1, typename A2>
1143 inline Status Communicator::receive (const unsigned int src_processor_id,
1144  std::vector<std::vector<T,A1>,A2> & recv,
1145  const DataType & type,
1146  const MessageTag & tag) const
1147 {
1148  // temporary buffer - this will be sized in bytes
1149  // and manipulated with MPI_Unpack
1150  std::vector<char> recvbuf;
1151 
1152  Status stat = this->receive (src_processor_id, recvbuf, MPI_PACKED, tag);
1153 
1154  // We should at least have one header datum, for outer vector size
1155  timpi_assert (!recvbuf.empty());
1156 
1157  // Unpack the received buffer
1158  CountType bufsize = cast_int<CountType>(recvbuf.size());
1159  CountType recvsize, pos=0;
1160  timpi_call_mpi
1161  (TIMPI_UNPACK(recvbuf.data(), bufsize, &pos, &recvsize, 1,
1162  TIMPI_COUNT_TYPE, this->get()));
1163 
1164  // ... size the outer buffer
1165  recv.resize (recvsize);
1166 
1167  const std::size_t n_vecs = recvsize;
1168  for (std::size_t i = 0; i != n_vecs; ++i)
1169  {
1170  CountType subvec_size;
1171 
1172  timpi_call_mpi
1173  (TIMPI_UNPACK (recvbuf.data(), bufsize, &pos, &subvec_size, 1,
1174  TIMPI_COUNT_TYPE, this->get()));
1175 
1176  // ... size the inner buffer
1177  recv[i].resize (subvec_size);
1178 
1179  // ... unpack the inner buffer if it is not empty
1180  if (!recv[i].empty())
1181  timpi_call_mpi
1182  (TIMPI_UNPACK(recvbuf.data(), bufsize, &pos, recv[i].data(),
1183  subvec_size, type, this->get()));
1184  }
1185 
1186  return stat;
1187 }
1188 
1189 
1190 
1191 template <typename T, typename A1, typename A2>
1192 inline void Communicator::receive (const unsigned int src_processor_id,
1193  std::vector<std::vector<T,A1>,A2> & buf,
1194  const DataType & type,
1195  Request & req,
1196  const MessageTag & tag) const
1197 {
1198  // figure out how many bytes we need to receive all the data into
1199  // our properly pre-sized buf
1200  const CountType sendsize =
1201  cast_int<CountType>(this->packed_size_of(buf, type));
1202 
1203  // temporary buffer - this will be sized in bytes
1204  // and manipulated with MPI_Unpack
1205  std::vector<char> * recvbuf = new std::vector<char>(sendsize);
1206 
1207  // Get ready to receive the temporary buffer
1208  this->receive (src_processor_id, *recvbuf, MPI_PACKED, req, tag);
1209 
1210  // When we wait on the receive, we'll unpack the temporary buffer
1211  req.add_post_wait_work
1212  (new PostWaitUnpackNestedBuffer<std::vector<std::vector<T,A1>,A2>>
1213  (*recvbuf, buf, type, *this));
1214 
1215  // And then we'll free the temporary buffer
1216  req.add_post_wait_work
1217  (new PostWaitDeleteBuffer<std::vector<char>>(recvbuf));
1218 
1219  // The MessageTag should stay registered for the Request lifetime
1220  req.add_post_wait_work
1221  (new PostWaitDereferenceTag(tag));
1222 }
1223 
1224 
1225 template <typename Context, typename OutputIter, typename T>
1226 inline void Communicator::receive_packed_range (const unsigned int src_processor_id,
1227  Context * context,
1228  OutputIter out_iter,
1229  const T * output_type,
1230  const MessageTag & tag) const
1231 {
1232  typedef typename Packing<T>::buffer_type buffer_t;
1233 
1234  // Receive serialized variable size objects as sequences of buffer_t
1235  std::size_t total_buffer_size = 0;
1236  Status stat = this->receive(src_processor_id, total_buffer_size, tag);
1237 
1238  // Use stat.source() and stat.tag() in subsequent receives - if
1239  // src_processor_id is or tag is "any" then we want to be sure we
1240  // try to receive messages all corresponding to the same send.
1241 
1242  std::size_t received_buffer_size = 0;
1243 
1244  // OutputIter might not have operator= implemented; for maximum
1245  // compatibility we'll rely on its copy constructor.
1246  std::unique_ptr<OutputIter> next_out_iter =
1247  std::make_unique<OutputIter>(out_iter);
1248 
1249  while (received_buffer_size < total_buffer_size)
1250  {
1251  std::vector<buffer_t> buffer;
1252  this->receive(stat.source(), buffer, MessageTag(stat.tag()));
1253  received_buffer_size += buffer.size();
1254  auto return_out_iter = unpack_range
1255  (buffer, context, *next_out_iter, output_type);
1256  next_out_iter = std::make_unique<OutputIter>(return_out_iter);
1257  }
1258 }
1259 
1260 
1261 
1262 // template <typename Context, typename OutputIter>
1263 // inline void Communicator::receive_packed_range (const unsigned int src_processor_id,
1264 // Context * context,
1265 // OutputIter out_iter,
1266 // Request & req,
1267 // const MessageTag & tag) const
1268 // {
1269 // typedef typename std::iterator_traits<OutputIter>::value_type T;
1270 // typedef typename Packing<T>::buffer_type buffer_t;
1271 //
1272 // // Receive serialized variable size objects as a sequence of
1273 // // buffer_t.
1274 // // Allocate a buffer on the heap so we don't have to free it until
1275 // // after the Request::wait()
1276 // std::vector<buffer_t> * buffer = new std::vector<buffer_t>();
1277 // this->receive(src_processor_id, *buffer, req, tag);
1278 //
1279 // // Make the Request::wait() handle unpacking the buffer
1280 // req.add_post_wait_work
1281 // (new PostWaitUnpackBuffer<std::vector<buffer_t>, Context, OutputIter>
1282 // (buffer, context, out_iter));
1283 //
1284 // // Make the Request::wait() then handle deleting the buffer
1285 // req.add_post_wait_work
1286 // (new PostWaitDeleteBuffer<std::vector<buffer_t>>(buffer));
1287 // }
1288 
1289 template <typename Context, typename OutputIter, typename T>
1290 inline void Communicator::nonblocking_receive_packed_range (const unsigned int src_processor_id,
1291  Context * context,
1292  OutputIter out,
1293  const T * /* output_type */,
1294  Request & req,
1295  Status & stat,
1296  const MessageTag & tag) const
1297 {
1298  typedef typename Packing<T>::buffer_type buffer_t;
1299 
1300  // Receive serialized variable size objects as a sequence of
1301  // buffer_t.
1302  // Allocate a buffer on the heap so we don't have to free it until
1303  // after the Request::wait()
1304  std::vector<buffer_t> * buffer = new std::vector<buffer_t>(stat.size());
1305  this->receive(src_processor_id, *buffer, req, tag);
1306 
1307  // Make the Request::wait() handle unpacking the buffer
1308  req.add_post_wait_work
1309  (new PostWaitUnpackBuffer<std::vector<buffer_t>, Context, OutputIter, T>(*buffer, context, out));
1310 
1311  // Make the Request::wait() then handle deleting the buffer
1312  req.add_post_wait_work
1313  (new PostWaitDeleteBuffer<std::vector<buffer_t>>(buffer));
1314 
1315  // The MessageTag should stay registered for the Request lifetime
1316  req.add_post_wait_work
1317  (new PostWaitDereferenceTag(tag));
1318 }
1319 
1320 
1321 
1322 template <typename T1, typename T2, typename A1, typename A2>
1323 inline void Communicator::send_receive(const unsigned int dest_processor_id,
1324  const std::vector<T1,A1> & sendvec,
1325  const DataType & type1,
1326  const unsigned int source_processor_id,
1327  std::vector<T2,A2> & recv,
1328  const DataType & type2,
1329  const MessageTag & send_tag,
1330  const MessageTag & recv_tag) const
1331 {
1332  TIMPI_LOG_SCOPE("send_receive()", "Parallel");
1333 
1334  if (dest_processor_id == this->rank() &&
1335  source_processor_id == this->rank())
1336  {
1337  recv = sendvec;
1338  return;
1339  }
1340 
1341  Request req;
1342 
1343  this->send (dest_processor_id, sendvec, type1, req, send_tag);
1344 
1345  this->receive (source_processor_id, recv, type2, recv_tag);
1346 
1347  req.wait();
1348 }
1349 
1350 
1351 template <typename T1, typename T2, typename A1, typename A2,
1352  typename std::enable_if<Has_buffer_type<Packing<T1>>::value &&
1353  Has_buffer_type<Packing<T2>>::value, int>::type>
1354 inline
1355 void
1356 Communicator::send_receive(const unsigned int dest_processor_id,
1357  const std::vector<T1,A1> & send_data,
1358  const unsigned int source_processor_id,
1359  std::vector<T2,A2> &recv_data,
1360  const MessageTag &send_tag,
1361  const MessageTag &recv_tag) const
1362 {
1363  this->send_receive_packed_range(dest_processor_id, (void *)(nullptr),
1364  send_data.begin(), send_data.end(),
1365  source_processor_id, (void *)(nullptr),
1366  std::back_inserter(recv_data),
1367  (const T2 *)(nullptr),
1368  send_tag, recv_tag);
1369 }
1370 
1371 
1372 template <typename T, typename A,
1373  typename std::enable_if<Has_buffer_type<Packing<T>>::value, int>::type>
1374 inline
1375 void
1376 Communicator::send_receive(const unsigned int dest_processor_id,
1377  const std::vector<T,A> & send_data,
1378  const unsigned int source_processor_id,
1379  std::vector<T,A> &recv_data,
1380  const MessageTag &send_tag,
1381  const MessageTag &recv_tag) const
1382 {
1383  this->send_receive_packed_range(dest_processor_id, (void *)(nullptr),
1384  send_data.begin(), send_data.end(),
1385  source_processor_id, (void *)(nullptr),
1386  std::back_inserter(recv_data),
1387  (const T *)(nullptr),
1388  send_tag, recv_tag);
1389 }
1390 
1391 
1392 
1393 template <typename T1, typename T2,
1394  typename std::enable_if<std::is_base_of<DataType, StandardType<T1>>::value &&
1395  std::is_base_of<DataType, StandardType<T2>>::value,
1396  int>::type>
1397 inline void Communicator::send_receive(const unsigned int dest_processor_id,
1398  const T1 & sendvec,
1399  const unsigned int source_processor_id,
1400  T2 & recv,
1401  const MessageTag & send_tag,
1402  const MessageTag & recv_tag) const
1403 {
1404  TIMPI_LOG_SCOPE("send_receive()", "Parallel");
1405 
1406  if (dest_processor_id == this->rank() &&
1407  source_processor_id == this->rank())
1408  {
1409  recv = sendvec;
1410  return;
1411  }
1412 
1413  timpi_assert_less(dest_processor_id, this->size());
1414  timpi_assert(source_processor_id < this->size() ||
1415  source_processor_id == any_source);
1416 
1417  // MPI_STATUS_IGNORE is from MPI-2; using it with some versions of
1418  // MPICH may cause a crash:
1419  // https://bugzilla.mcs.anl.gov/globus/show_bug.cgi?id=1798
1420  timpi_call_mpi
1421  (TIMPI_SENDRECV(const_cast<T1*>(&sendvec), 1, StandardType<T1>(&sendvec),
1422  dest_processor_id, send_tag.value(), &recv, 1,
1423  StandardType<T2>(&recv), source_processor_id,
1424  recv_tag.value(), this->get(), MPI_STATUS_IGNORE));
1425 }
1426 
1427 
1428 
1429 // This is both a declaration and definition for a new overloaded
1430 // function template, so we have to re-specify the default
1431 // arguments.
1432 //
1433 // We specialize on the T1==T2 case so that we can handle
1434 // send_receive-to-self with a plain copy rather than going through
1435 // MPI.
1436 template <typename T, typename A,
1437  typename std::enable_if<std::is_base_of<DataType, StandardType<T>>::value,
1438  int>::type>
1439 inline void Communicator::send_receive(const unsigned int dest_processor_id,
1440  const std::vector<T,A> & sendvec,
1441  const unsigned int source_processor_id,
1442  std::vector<T,A> & recv,
1443  const MessageTag & send_tag,
1444  const MessageTag & recv_tag) const
1445 {
1446  if (dest_processor_id == this->rank() &&
1447  source_processor_id == this->rank())
1448  {
1449  TIMPI_LOG_SCOPE("send_receive()", "Parallel");
1450  recv = sendvec;
1451  return;
1452  }
1453 
1454  const T* example = sendvec.empty() ?
1455  (recv.empty() ? nullptr : recv.data()) : sendvec.data();
1456 
1457  // Call the user-defined type version with automatic
1458  // type conversion based on template argument:
1459  this->send_receive (dest_processor_id, sendvec,
1460  StandardType<T>(example),
1461  source_processor_id, recv,
1462  StandardType<T>(example),
1463  send_tag, recv_tag);
1464 }
1465 
1466 
1467 template <typename T1, typename T2, typename A1, typename A2,
1468  typename std::enable_if<std::is_base_of<DataType, StandardType<T1>>::value &&
1469  std::is_base_of<DataType, StandardType<T2>>::value,
1470  int>::type>
1471 inline void Communicator::send_receive(const unsigned int dest_processor_id,
1472  const std::vector<T1,A1> & sendvec,
1473  const unsigned int source_processor_id,
1474  std::vector<T2,A2> & recv,
1475  const MessageTag & send_tag,
1476  const MessageTag & recv_tag) const
1477 {
1478  // Call the user-defined type version with automatic
1479  // type conversion based on template argument:
1480  this->send_receive (dest_processor_id, sendvec,
1481  StandardType<T1>(sendvec.empty() ? nullptr : sendvec.data()),
1482  source_processor_id, recv,
1483  StandardType<T2>(recv.empty() ? nullptr : recv.data()),
1484  send_tag, recv_tag);
1485 }
1486 
1487 
1488 
1489 
1490 template <typename T1, typename T2, typename A1, typename A2, typename A3, typename A4>
1491 inline void Communicator::send_receive(const unsigned int dest_processor_id,
1492  const std::vector<std::vector<T1,A1>,A2> & sendvec,
1493  const unsigned int source_processor_id,
1494  std::vector<std::vector<T2,A3>,A4> & recv,
1495  const MessageTag & /* send_tag */,
1496  const MessageTag & /* recv_tag */) const
1497 {
1498  // FIXME - why aren't we honoring send_tag and recv_tag here?
1499  send_receive_vec_of_vec
1500  (dest_processor_id, sendvec, source_processor_id, recv,
1501  no_tag, any_tag, *this);
1502 }
1503 
1504 
1505 
1506 // This is both a declaration and definition for a new overloaded
1507 // function template, so we have to re-specify the default arguments
1508 template <typename T, typename A1, typename A2>
1509 inline void Communicator::send_receive(const unsigned int dest_processor_id,
1510  const std::vector<std::vector<T,A1>,A2> & sendvec,
1511  const unsigned int source_processor_id,
1512  std::vector<std::vector<T,A1>,A2> & recv,
1513  const MessageTag & send_tag,
1514  const MessageTag & recv_tag) const
1515 {
1516  send_receive_vec_of_vec
1517  (dest_processor_id, sendvec, source_processor_id, recv,
1518  send_tag, recv_tag, *this);
1519 }
1520 
1521 
1522 
1523 
1524 template <typename Context1, typename RangeIter, typename Context2,
1525  typename OutputIter, typename T>
1526 inline void
1527 Communicator::send_receive_packed_range (const unsigned int dest_processor_id,
1528  const Context1 * context1,
1529  RangeIter send_begin,
1530  const RangeIter send_end,
1531  const unsigned int source_processor_id,
1532  Context2 * context2,
1533  OutputIter out_iter,
1534  const T * output_type,
1535  const MessageTag & send_tag,
1536  const MessageTag & recv_tag,
1537  std::size_t approx_buffer_size) const
1538 {
1539  TIMPI_LOG_SCOPE("send_receive()", "Parallel");
1540 
1541  timpi_assert_equal_to
1542  ((dest_processor_id == this->rank()),
1543  (source_processor_id == this->rank()));
1544 
1545  if (dest_processor_id == this->rank() &&
1546  source_processor_id == this->rank())
1547  {
1548  // We need to pack and unpack, even if we don't need to
1549  // communicate the buffer, just in case user Packing
1550  // specializations have side effects
1551 
1552  // OutputIter might not have operator= implemented; for maximum
1553  // compatibility we'll rely on its copy constructor.
1554  std::unique_ptr<OutputIter> next_out_iter =
1555  std::make_unique<OutputIter>(out_iter);
1556 
1557  typedef typename Packing<T>::buffer_type buffer_t;
1558  while (send_begin != send_end)
1559  {
1560  std::vector<buffer_t> buffer;
1561  send_begin = pack_range
1562  (context1, send_begin, send_end, buffer, approx_buffer_size);
1563  auto return_out_iter = unpack_range
1564  (buffer, context2, *next_out_iter, output_type);
1565  next_out_iter = std::make_unique<OutputIter>(return_out_iter);
1566  }
1567  return;
1568  }
1569 
1570  Request req;
1571 
1572  this->send_packed_range (dest_processor_id, context1, send_begin, send_end,
1573  req, send_tag, approx_buffer_size);
1574 
1575  this->receive_packed_range (source_processor_id, context2, out_iter,
1576  output_type, recv_tag);
1577 
1578  req.wait();
1579 }
1580 
1581 
1582 
1583 template <typename Context, typename Iter>
1584 inline void Communicator::nonblocking_send_packed_range (const unsigned int dest_processor_id,
1585  const Context * context,
1586  Iter range_begin,
1587  const Iter range_end,
1588  Request & req,
1589  std::shared_ptr<std::vector<typename Packing<typename std::iterator_traits<Iter>::value_type>::buffer_type>> & buffer,
1590  const MessageTag & tag) const
1591 {
1592  // Allocate a buffer on the heap so we don't have to free it until
1593  // after the Request::wait()
1594  typedef typename std::iterator_traits<Iter>::value_type T;
1595  typedef typename Packing<T>::buffer_type buffer_t;
1596 
1597  if (range_begin != range_end)
1598  {
1599  if (buffer == nullptr)
1600  buffer = std::make_shared<std::vector<buffer_t>>();
1601  else
1602  buffer->clear();
1603 
1604  range_begin =
1605  pack_range(context,
1606  range_begin,
1607  range_end,
1608  *buffer,
1609  // MPI-2/3 can only use signed integers for size,
1610  // and with this API we need to fit a non-blocking
1611  // send into one buffer
1612  std::numeric_limits<CountType>::max());
1613 
1614  if (range_begin != range_end)
1615  timpi_error_msg("Non-blocking packed range sends cannot exceed " << std::numeric_limits<CountType>::max() << "in size");
1616 
1617  // Make it dereference the shared pointer (possibly freeing the buffer)
1618  req.add_post_wait_work
1619  (new PostWaitDereferenceSharedPtr<std::vector<buffer_t>>(buffer));
1620 
1621  // Non-blocking send of the buffer
1622  this->send(dest_processor_id, *buffer, req, tag);
1623  }
1624 }
1625 
1626 
1627 
1628 template <typename T, typename A>
1629 inline void Communicator::allgather(const std::basic_string<T> & sendval,
1630  std::vector<std::basic_string<T>,A> & recv,
1631  const bool identical_buffer_sizes) const
1632 {
1633  TIMPI_LOG_SCOPE ("allgather()","Parallel");
1634 
1635  timpi_assert(this->size());
1636  recv.assign(this->size(), "");
1637 
1638  // serial case
1639  if (this->size() < 2)
1640  {
1641  recv.resize(1);
1642  recv[0] = sendval;
1643  return;
1644  }
1645 
1646  std::vector<CountType>
1647  sendlengths (this->size(), 0);
1648  std::vector<DispType>
1649  displacements(this->size(), 0);
1650 
1651  const CountType mysize = cast_int<CountType>(sendval.size());
1652 
1653  if (identical_buffer_sizes)
1654  sendlengths.assign(this->size(), mysize);
1655  else
1656  // first comm step to determine buffer sizes from all processors
1657  this->allgather(mysize, sendlengths);
1658 
1659  // Find the total size of the final array and
1660  // set up the displacement offsets for each processor
1661  CountType globalsize = 0;
1662  for (unsigned int i=0; i != this->size(); ++i)
1663  {
1664  displacements[i] = globalsize;
1665  globalsize += sendlengths[i];
1666  }
1667 
1668  // Check for quick return
1669  if (globalsize == 0)
1670  return;
1671 
1672  // monolithic receive buffer
1673  std::basic_string<T> r(globalsize, 0);
1674 
1675  // and get the data from the remote processors.
1676  timpi_call_mpi
1677  (TIMPI_ALLGATHERV(const_cast<T*>(mysize ? sendval.data() : nullptr),
1678  mysize, StandardType<T>(),
1679  &r[0], sendlengths.data(), displacements.data(),
1680  StandardType<T>(), this->get()));
1681 
1682  // slice receive buffer up
1683  for (unsigned int i=0; i != this->size(); ++i)
1684  recv[i] = r.substr(displacements[i], sendlengths[i]);
1685 }
1686 
1687 
1688 
1689 inline void Communicator::broadcast (bool & data,
1690  const unsigned int root_id,
1691  const bool /* identical_sizes */) const
1692 {
1693  if (this->size() == 1)
1694  {
1695  timpi_assert (!this->rank());
1696  timpi_assert (!root_id);
1697  return;
1698  }
1699 
1700  timpi_assert_less (root_id, this->size());
1701 
1702  TIMPI_LOG_SCOPE("broadcast()", "Parallel");
1703 
1704  // We don't want to depend on MPI-2 or C++ MPI, so we don't have
1705  // MPI::BOOL available
1706  char char_data = data;
1707 
1708  timpi_assert_less(root_id, this->size());
1709 
1710  // Spread data to remote processors.
1711  timpi_call_mpi
1712  (TIMPI_BCAST (&char_data, 1, StandardType<char>(&char_data),
1713  root_id, this->get()));
1714 
1715  data = char_data;
1716 }
1717 
1718 
1719 template <typename T>
1720 inline void Communicator::broadcast (std::basic_string<T> & data,
1721  const unsigned int root_id,
1722  const bool identical_sizes) const
1723 {
1724  if (this->size() == 1)
1725  {
1726  timpi_assert (!this->rank());
1727  timpi_assert (!root_id);
1728  return;
1729  }
1730 
1731  timpi_assert_less (root_id, this->size());
1732  timpi_assert (this->verify(identical_sizes));
1733 
1734  TIMPI_LOG_SCOPE("broadcast()", "Parallel");
1735 
1736  std::size_t data_size = data.size();
1737 
1738  if (identical_sizes)
1739  timpi_assert(this->verify(data_size));
1740  else
1741  this->broadcast(data_size, root_id);
1742 
1743  std::vector<T> data_c(data_size);
1744 #ifndef NDEBUG
1745  std::basic_string<T> orig(data);
1746 #endif
1747 
1748  if (this->rank() == root_id)
1749  for (std::size_t i=0; i<data.size(); i++)
1750  data_c[i] = data[i];
1751 
1752  this->broadcast (data_c, root_id, StandardType<T>::is_fixed_type);
1753 
1754  data.assign(data_c.begin(), data_c.end());
1755 
1756 #ifndef NDEBUG
1757  if (this->rank() == root_id)
1758  timpi_assert_equal_to (data, orig);
1759 #endif
1760 }
1761 
1762 
1763 template <typename T, typename A>
1764 inline void Communicator::broadcast (std::vector<std::basic_string<T>,A> & data,
1765  const unsigned int root_id,
1766  const bool identical_sizes) const
1767 {
1768  if (this->size() == 1)
1769  {
1770  timpi_assert (!this->rank());
1771  timpi_assert (!root_id);
1772  return;
1773  }
1774 
1775  timpi_assert_less (root_id, this->size());
1776  timpi_assert (this->verify(identical_sizes));
1777 
1778  TIMPI_LOG_SCOPE("broadcast()", "Parallel");
1779 
1780  std::size_t bufsize=0;
1781  if (root_id == this->rank() || identical_sizes)
1782  {
1783  for (std::size_t i=0; i<data.size(); ++i)
1784  bufsize += data[i].size() + 1; // Add one for the string length word
1785  }
1786 
1787  if (identical_sizes)
1788  timpi_assert(this->verify(bufsize));
1789  else
1790  this->broadcast(bufsize, root_id);
1791 
1792  // Here we use unsigned int to store up to 32-bit characters
1793  std::vector<unsigned int> temp; temp.reserve(bufsize);
1794  // Pack the strings
1795  if (root_id == this->rank())
1796  {
1797  for (std::size_t i=0; i<data.size(); ++i)
1798  {
1799  temp.push_back(cast_int<unsigned int>(data[i].size()));
1800  for (std::size_t j=0; j != data[i].size(); ++j)
1805  temp.push_back(data[i][j]);
1806  }
1807  }
1808  else
1809  temp.resize(bufsize);
1810 
1811  // broad cast the packed strings
1812  this->broadcast(temp, root_id, true);
1813 
1814  // Unpack the strings
1815  if (root_id != this->rank())
1816  {
1817  data.clear();
1818  typename std::vector<unsigned int>::const_iterator iter = temp.begin();
1819  while (iter != temp.end())
1820  {
1821  std::size_t curr_len = *iter++;
1822  data.push_back(std::basic_string<T>(iter, iter+curr_len));
1823  iter += curr_len;
1824  }
1825  }
1826 }
1827 
1828 
1829 
1830 template <typename T, typename A1, typename A2>
1831 inline void Communicator::broadcast (std::vector<std::vector<T,A1>,A2> & data,
1832  const unsigned int root_id,
1833  const bool identical_sizes) const
1834 {
1835  if (this->size() == 1)
1836  {
1837  timpi_assert (!this->rank());
1838  timpi_assert (!root_id);
1839  return;
1840  }
1841 
1842  timpi_assert_less (root_id, this->size());
1843  timpi_assert (this->verify(identical_sizes));
1844 
1845  TIMPI_LOG_SCOPE("broadcast()", "Parallel");
1846 
1847  std::size_t size_sizes = data.size();
1848  if (identical_sizes)
1849  timpi_assert(this->verify(size_sizes));
1850  else
1851  this->broadcast(size_sizes, root_id);
1852  std::vector<std::size_t> sizes(size_sizes);
1853 
1854  if (root_id == this->rank() || identical_sizes)
1855  for (std::size_t i=0; i<size_sizes; ++i)
1856  sizes[i] = data[i].size();
1857 
1858  if (identical_sizes)
1859  timpi_assert(this->verify(sizes));
1860  else
1861  this->broadcast(sizes, root_id);
1862 
1863  std::size_t bufsize = 0;
1864  for (std::size_t i=0; i<size_sizes; ++i)
1865  bufsize += sizes[i];
1866 
1867  std::vector<T> temp; temp.reserve(bufsize);
1868  // Pack the vectors
1869  if (root_id == this->rank())
1870  {
1871  // The data will be packed in one long array
1872  for (std::size_t i=0; i<size_sizes; ++i)
1873  temp.insert(temp.end(), data[i].begin(), data[i].end());
1874  }
1875  else
1876  temp.resize(bufsize);
1877 
1878  // broad cast the packed data
1879  this->broadcast(temp, root_id, StandardType<T>::is_fixed_type);
1880 
1881  // Unpack the data
1882  if (root_id != this->rank())
1883  {
1884  data.clear();
1885  data.resize(size_sizes);
1886  typename std::vector<T>::const_iterator iter = temp.begin();
1887  for (std::size_t i=0; i<size_sizes; ++i)
1888  {
1889  data[i].insert(data[i].end(), iter, iter+sizes[i]);
1890  iter += sizes[i];
1891  }
1892  }
1893 }
1894 
1895 
1896 
1897 
1898 template <typename T, typename C, typename A>
1899 inline void Communicator::broadcast (std::set<T,C,A> & data,
1900  const unsigned int root_id,
1901  const bool identical_sizes) const
1902 {
1903  if (this->size() == 1)
1904  {
1905  timpi_assert (!this->rank());
1906  timpi_assert (!root_id);
1907  return;
1908  }
1909 
1910  timpi_assert_less (root_id, this->size());
1911  timpi_assert (this->verify(identical_sizes));
1912 
1913  TIMPI_LOG_SCOPE("broadcast()", "Parallel");
1914 
1915  std::vector<T> vecdata;
1916  if (this->rank() == root_id)
1917  vecdata.assign(data.begin(), data.end());
1918 
1919  std::size_t vecsize = vecdata.size();
1920  if (identical_sizes)
1921  timpi_assert(this->verify(vecsize));
1922  else
1923  this->broadcast(vecsize, root_id);
1924  if (this->rank() != root_id)
1925  vecdata.resize(vecsize);
1926 
1927  this->broadcast(vecdata, root_id, StandardType<T>::is_fixed_type);
1928  if (this->rank() != root_id)
1929  {
1930  data.clear();
1931  data.insert(vecdata.begin(), vecdata.end());
1932  }
1933 }
1934 
1935 
1936 template <typename Context, typename OutputIter, typename T>
1937 inline void Communicator::nonblocking_receive_packed_range (const unsigned int src_processor_id,
1938  Context * context,
1939  OutputIter out,
1940  const T * /*output_type*/,
1941  Request & req,
1942  Status & stat,
1943  std::shared_ptr<std::vector<typename Packing<T>::buffer_type>> & buffer,
1944  const MessageTag & tag) const
1945 {
1946  // If they didn't pass in a buffer - let's make one
1947  if (buffer == nullptr)
1948  buffer = std::make_shared<std::vector<typename Packing<T>::buffer_type>>();
1949  else
1950  buffer->clear();
1951 
1952  // Receive serialized variable size objects as a sequence of
1953  // buffer_t.
1954  // Allocate a buffer on the heap so we don't have to free it until
1955  // after the Request::wait()
1956  buffer->resize(stat.size());
1957  this->receive(src_processor_id, *buffer, req, tag);
1958 
1959  // Make the Request::wait() handle unpacking the buffer
1960  req.add_post_wait_work
1961  (new PostWaitUnpackBuffer<std::vector<typename Packing<T>::buffer_type>, Context, OutputIter, T>(*buffer, context, out));
1962 
1963  // Make it dereference the shared pointer (possibly freeing the buffer)
1964  req.add_post_wait_work
1965  (new PostWaitDereferenceSharedPtr<std::vector<typename Packing<T>::buffer_type>>(buffer));
1966 }
1967 
1968 
1969 
1970 template <typename T, typename A, typename std::enable_if<std::is_base_of<DataType, StandardType<T>>::value, int>::type>
1971 inline bool Communicator::possibly_receive (unsigned int & src_processor_id,
1972  std::vector<T,A> & buf,
1973  const DataType & type,
1974  Request & req,
1975  const MessageTag & tag) const
1976 {
1977  TIMPI_LOG_SCOPE("possibly_receive()", "Parallel");
1978 
1979  Status stat(type);
1980 
1981  int int_flag = 0;
1982 
1983  timpi_assert(src_processor_id < this->size() ||
1984  src_processor_id == any_source);
1985 
1986  timpi_call_mpi(MPI_Iprobe(int(src_processor_id),
1987  tag.value(),
1988  this->get(),
1989  &int_flag,
1990  stat.get()));
1991 
1992  if (int_flag)
1993  {
1994  buf.resize(stat.size());
1995 
1996  src_processor_id = stat.source();
1997 
1998  timpi_call_mpi
1999  (TIMPI_IRECV(buf.data(), cast_int<CountType>(buf.size()), type,
2000  src_processor_id, tag.value(), this->get(),
2001  req.get()));
2002 
2003  // The MessageTag should stay registered for the Request lifetime
2004  req.add_post_wait_work
2005  (new PostWaitDereferenceTag(tag));
2006  }
2007 
2008  return int_flag;
2009 }
2010 
2011 template <typename T, typename A, typename std::enable_if<Has_buffer_type<Packing<T>>::value, int>::type>
2012 inline bool Communicator::possibly_receive (unsigned int & src_processor_id,
2013  std::vector<T,A> & buf,
2014  const NotADataType &,
2015  Request & req,
2016  const MessageTag & tag) const
2017 {
2018  TIMPI_LOG_SCOPE("possibly_receive()", "Parallel");
2019 
2020  return this->possibly_receive_packed_range(src_processor_id,
2021  (void *)(nullptr),
2022  std::inserter(buf, buf.end()),
2023  (T *)(nullptr),
2024  req,
2025  tag);
2026 }
2027 
2028 
2029 
2030 template <typename T, typename A1, typename A2>
2031 inline bool Communicator::possibly_receive (unsigned int & src_processor_id,
2032  std::vector<std::vector<T,A1>,A2> & buf,
2033  const DataType & type,
2034  Request & req,
2035  const MessageTag & tag) const
2036 {
2037  TIMPI_LOG_SCOPE("possibly_receive()", "Parallel");
2038 
2039  Status stat(type);
2040 
2041  int int_flag = 0;
2042 
2043  timpi_assert(src_processor_id < this->size() ||
2044  src_processor_id == any_source);
2045 
2046  timpi_call_mpi(MPI_Iprobe(int(src_processor_id),
2047  tag.value(),
2048  this->get(),
2049  &int_flag,
2050  stat.get()));
2051 
2052  if (int_flag)
2053  {
2054  src_processor_id = stat.source();
2055 
2056  std::vector<char> * recvbuf =
2057  new std::vector<char>(stat.size(StandardType<char>()));
2058 
2059  this->receive(src_processor_id, *recvbuf, MPI_PACKED, req, tag);
2060 
2061  // When we wait on the receive, we'll unpack the temporary buffer
2062  req.add_post_wait_work
2063  (new PostWaitUnpackNestedBuffer<std::vector<std::vector<T,A1>,A2>>
2064  (*recvbuf, buf, type, *this));
2065 
2066  // And then we'll free the temporary buffer
2067  req.add_post_wait_work
2068  (new PostWaitDeleteBuffer<std::vector<char>>(recvbuf));
2069 
2070  // The MessageTag should stay registered for the Request lifetime
2071  req.add_post_wait_work
2072  (new PostWaitDereferenceTag(tag));
2073  }
2074 
2075  return int_flag;
2076 }
2077 
#else
  // Fallback displacement type for builds without MPI. It lets the
  // MPI-independent code below that declares std::vector<DispType>
  // displacement arrays (allgather, gather) still compile.
  typedef int DispType;
#endif // TIMPI_HAVE_MPI
2081 
2082 
2083 // Some of our methods are implemented indirectly via other
2084 // MPI-encapsulated methods and the implementation works with or
2085 // without MPI.
2086 //
2087 // Other methods have a "this->size() == 1" shortcut which still
2088 // applies in the without-MPI case, and the timpi_call_mpi macro
2089 // becomes a no-op so wrapped MPI methods still compile without MPI
2090 
2091 template <typename T>
2092 inline bool Communicator::verify(const T & r) const
2093 {
2094  if (this->size() > 1 && Attributes<T>::has_min_max == true)
2095  {
2096  T tempmin = r, tempmax = r;
2097  this->min(tempmin);
2098  this->max(tempmax);
2099  bool verified = (r == tempmin) &&
2100  (r == tempmax);
2101  this->min(verified);
2102  return verified;
2103  }
2104 
2105  static_assert(Attributes<T>::has_min_max,
2106  "Tried to verify an unverifiable type");
2107 
2108  return true;
2109 }
2110 
2111 template <typename T>
2112 inline bool Communicator::semiverify(const T * r) const
2113 {
2114  if (this->size() > 1 && Attributes<T>::has_min_max == true)
2115  {
2116  T tempmin, tempmax;
2117  if (r)
2118  tempmin = tempmax = *r;
2119  else
2120  {
2121  Attributes<T>::set_highest(tempmin);
2122  Attributes<T>::set_lowest(tempmax);
2123  }
2124  this->min(tempmin);
2125  this->max(tempmax);
2126  bool invalid = r && ((*r != tempmin) ||
2127  (*r != tempmax));
2128  this->max(invalid);
2129  return !invalid;
2130  }
2131 
2132  static_assert(Attributes<T>::has_min_max,
2133  "Tried to semiverify an unverifiable type");
2134 
2135  return true;
2136 }
2137 
2138 
2139 
2140 template <typename T, typename A>
2141 inline bool Communicator::semiverify(const std::vector<T,A> * r) const
2142 {
2143  if (this->size() > 1 && Attributes<T>::has_min_max == true)
2144  {
2145  std::size_t rsize = r ? r->size() : 0;
2146  std::size_t * psize = r ? &rsize : nullptr;
2147 
2148  if (!this->semiverify(psize))
2149  return false;
2150 
2151  this->max(rsize);
2152 
2153  std::vector<T,A> tempmin, tempmax;
2154  if (r)
2155  {
2156  tempmin = tempmax = *r;
2157  }
2158  else
2159  {
2160  tempmin.resize(rsize);
2161  tempmax.resize(rsize);
2162  Attributes<std::vector<T,A>>::set_highest(tempmin);
2163  Attributes<std::vector<T,A>>::set_lowest(tempmax);
2164  }
2165  this->min(tempmin);
2166  this->max(tempmax);
2167  bool invalid = r && ((*r != tempmin) ||
2168  (*r != tempmax));
2169  this->max(invalid);
2170  return !invalid;
2171  }
2172 
2173  static_assert(Attributes<T>::has_min_max,
2174  "Tried to semiverify a vector of an unverifiable type");
2175 
2176  return true;
2177 }
2178 
2179 
2180 
2181 template <typename A>
2182 inline void Communicator::min(std::vector<bool,A> & r) const
2183 {
2184  if (this->size() > 1 && !r.empty())
2185  {
2186  TIMPI_LOG_SCOPE("min(vector<bool>)", "Parallel");
2187 
2188  timpi_assert(this->verify(r.size()));
2189 
2190  std::vector<unsigned int> ruint;
2191  pack_vector_bool(r, ruint);
2192  std::vector<unsigned int> temp(ruint.size());
2193  timpi_call_mpi
2194  (TIMPI_ALLREDUCE
2195  (ruint.data(), temp.data(),
2196  cast_int<CountType>(ruint.size()),
2197  StandardType<unsigned int>(), MPI_BAND, this->get()));
2198  unpack_vector_bool(temp, r);
2199  }
2200 }
2201 
2202 
2203 template <typename T>
2204 inline void Communicator::minloc(T & r,
2205  unsigned int & min_id) const
2206 {
2207  if (this->size() > 1)
2208  {
2209  TIMPI_LOG_SCOPE("minloc(scalar)", "Parallel");
2210 
2211  DataPlusInt<T> data_in;
2212  ignore(data_in); // unused ifndef TIMPI_HAVE_MPI
2213  data_in.val = r;
2214  data_in.rank = this->rank();
2215 
2216  timpi_call_mpi
2217  (TIMPI_ALLREDUCE (MPI_IN_PLACE, &data_in, 1,
2218  dataplusint_type_acquire<T>().first,
2219  OpFunction<T>::min_location(), this->get()));
2220  r = data_in.val;
2221  min_id = data_in.rank;
2222  }
2223  else
2224  min_id = this->rank();
2225 }
2226 
2227 
2228 template <typename T, typename A1, typename A2>
2229 inline void Communicator::minloc(std::vector<T,A1> & r,
2230  std::vector<unsigned int,A2> & min_id) const
2231 {
2232  if (this->size() > 1 && !r.empty())
2233  {
2234  TIMPI_LOG_SCOPE("minloc(vector)", "Parallel");
2235 
2236  timpi_assert(this->verify(r.size()));
2237 
2238  std::vector<DataPlusInt<T>> data_in(r.size());
2239  for (std::size_t i=0; i != r.size(); ++i)
2240  {
2241  data_in[i].val = r[i];
2242  data_in[i].rank = this->rank();
2243  }
2244  std::vector<DataPlusInt<T>> data_out(r.size());
2245 
2246  timpi_call_mpi
2247  (TIMPI_ALLREDUCE (data_in.data(), data_out.data(),
2248  cast_int<CountType>(r.size()),
2249  dataplusint_type_acquire<T>().first,
2250  OpFunction<T>::min_location(), this->get()));
2251  for (std::size_t i=0; i != r.size(); ++i)
2252  {
2253  r[i] = data_out[i].val;
2254  min_id[i] = data_out[i].rank;
2255  }
2256  }
2257  else if (!r.empty())
2258  {
2259  for (std::size_t i=0; i != r.size(); ++i)
2260  min_id[i] = this->rank();
2261  }
2262 }
2263 
2264 
2265 template <typename A1, typename A2>
2266 inline void Communicator::minloc(std::vector<bool,A1> & r,
2267  std::vector<unsigned int,A2> & min_id) const
2268 {
2269  if (this->size() > 1 && !r.empty())
2270  {
2271  TIMPI_LOG_SCOPE("minloc(vector<bool>)", "Parallel");
2272 
2273  timpi_assert(this->verify(r.size()));
2274 
2275  std::vector<DataPlusInt<int>> data_in(r.size());
2276  for (std::size_t i=0; i != r.size(); ++i)
2277  {
2278  data_in[i].val = r[i];
2279  data_in[i].rank = this->rank();
2280  }
2281  std::vector<DataPlusInt<int>> data_out(r.size());
2282  timpi_call_mpi
2283  (TIMPI_ALLREDUCE
2284  (data_in.data(), data_out.data(),
2285  cast_int<CountType>(r.size()), StandardType<int>(),
2286  OpFunction<int>::min_location(), this->get()));
2287  for (std::size_t i=0; i != r.size(); ++i)
2288  {
2289  r[i] = data_out[i].val;
2290  min_id[i] = data_out[i].rank;
2291  }
2292  }
2293  else if (!r.empty())
2294  {
2295  for (std::size_t i=0; i != r.size(); ++i)
2296  min_id[i] = this->rank();
2297  }
2298 }
2299 
2300 
2301 
2302 template <typename A>
2303 inline void Communicator::max(std::vector<bool,A> & r) const
2304 {
2305  if (this->size() > 1 && !r.empty())
2306  {
2307  TIMPI_LOG_SCOPE("max(vector<bool>)", "Parallel");
2308 
2309  timpi_assert(this->verify(r.size()));
2310 
2311  std::vector<unsigned int> ruint;
2312  pack_vector_bool(r, ruint);
2313  std::vector<unsigned int> temp(ruint.size());
2314  timpi_call_mpi
2315  (TIMPI_ALLREDUCE (ruint.data(), temp.data(),
2316  cast_int<CountType>(ruint.size()),
2317  StandardType<unsigned int>(), MPI_BOR,
2318  this->get()));
2319  unpack_vector_bool(temp, r);
2320  }
2321 }
2322 
2323 
2324 
2325 template <typename Map,
2326  typename std::enable_if<std::is_base_of<DataType, StandardType<typename Map::key_type>>::value &&
2327  std::is_base_of<DataType, StandardType<typename Map::mapped_type>>::value,
2328  int>::type>
2329 void Communicator::map_max(Map & data) const
2330 {
2331  if (this->size() > 1)
2332  {
2333  TIMPI_LOG_SCOPE("max(map)", "Parallel");
2334 
2335  // Since the input map may have different keys on different
2336  // processors, we first gather all the keys and values, then for
2337  // each key we choose the max value over all procs. We
2338  // initialize the max with the first value we encounter rather
2339  // than some "global" minimum, since the latter is difficult to
2340  // do generically.
2341  std::vector<std::pair<typename Map::key_type, typename Map::mapped_type>>
2342  vecdata(data.begin(), data.end());
2343 
2344  this->allgather(vecdata, /*identical_buffer_sizes=*/false);
2345 
2346  data.clear();
2347 
2348  for (const auto & pr : vecdata)
2349  {
2350  // Attempt to insert this value. If it works, then the value didn't
2351  // already exist and we can go on. If it fails, compute the std::max
2352  // between the current and existing values.
2353  auto result = data.insert(pr);
2354 
2355  bool inserted = result.second;
2356 
2357  if (!inserted)
2358  {
2359  auto it = result.first;
2360  it->second = std::max(it->second, pr.second);
2361  }
2362  }
2363  }
2364 }
2365 
2366 
2367 
2368 template <typename Map,
2369  typename std::enable_if<!(std::is_base_of<DataType, StandardType<typename Map::key_type>>::value &&
2370  std::is_base_of<DataType, StandardType<typename Map::mapped_type>>::value),
2371  int>::type>
2372 void Communicator::map_max(Map & data) const
2373 {
2374  if (this->size() > 1)
2375  {
2376  TIMPI_LOG_SCOPE("max(map)", "Parallel");
2377 
2378  // Since the input map may have different keys on different
2379  // processors, we first gather all the keys and values, then for
2380  // each key we choose the max value over all procs. We
2381  // initialize the max with the first value we encounter rather
2382  // than some "global" minimum, since the latter is difficult to
2383  // do generically.
2384  std::vector<typename Map::key_type> keys;
2385  std::vector<typename Map::mapped_type> vals;
2386 
2387  auto data_size = data.size();
2388  keys.reserve(data_size);
2389  vals.reserve(data_size);
2390 
2391  for (const auto & pr : data)
2392  {
2393  keys.push_back(pr.first);
2394  vals.push_back(pr.second);
2395  }
2396 
2397  this->allgather(keys, /*identical_buffer_sizes=*/false);
2398  this->allgather(vals, /*identical_buffer_sizes=*/false);
2399 
2400  data.clear();
2401 
2402  for (std::size_t i=0; i<keys.size(); ++i)
2403  {
2404  // Attempt to emplace this value. If it works, then the value didn't
2405  // already exist and we can go on. If it fails, compute the std::max
2406  // between the current and existing values.
2407  auto pr = data.emplace(keys[i], vals[i]);
2408 
2409  bool emplaced = pr.second;
2410 
2411  if (!emplaced)
2412  {
2413  auto it = pr.first;
2414  it->second = std::max(it->second, vals[i]);
2415  }
2416  }
2417  }
2418 }
2419 
2420 
2421 
2422 template <typename K, typename V, typename C, typename A>
2423 inline
2424 void Communicator::max(std::map<K,V,C,A> & data) const
2425 {
2426  this->map_max(data);
2427 }
2428 
2429 
2430 
2431 template <typename K, typename V, typename H, typename E, typename A>
2432 inline
2433 void Communicator::max(std::unordered_map<K,V,H,E,A> & data) const
2434 {
2435  this->map_max(data);
2436 }
2437 
2438 
2439 
2440 template <typename T>
2441 inline void Communicator::maxloc(T & r,
2442  unsigned int & max_id) const
2443 {
2444  if (this->size() > 1)
2445  {
2446  TIMPI_LOG_SCOPE("maxloc(scalar)", "Parallel");
2447 
2448  DataPlusInt<T> data_in;
2449  ignore(data_in); // unused ifndef TIMPI_HAVE_MPI
2450  data_in.val = r;
2451  data_in.rank = this->rank();
2452 
2453  timpi_call_mpi
2454  (TIMPI_ALLREDUCE (MPI_IN_PLACE, &data_in, 1,
2455  dataplusint_type_acquire<T>().first,
2456  OpFunction<T>::max_location(), this->get()));
2457  r = data_in.val;
2458  max_id = data_in.rank;
2459  }
2460  else
2461  max_id = this->rank();
2462 }
2463 
2464 
2465 template <typename T, typename A1, typename A2>
2466 inline void Communicator::maxloc(std::vector<T,A1> & r,
2467  std::vector<unsigned int,A2> & max_id) const
2468 {
2469  if (this->size() > 1 && !r.empty())
2470  {
2471  TIMPI_LOG_SCOPE("maxloc(vector)", "Parallel");
2472 
2473  timpi_assert(this->verify(r.size()));
2474 
2475  std::vector<DataPlusInt<T>> data_in(r.size());
2476  for (std::size_t i=0; i != r.size(); ++i)
2477  {
2478  data_in[i].val = r[i];
2479  data_in[i].rank = this->rank();
2480  }
2481  std::vector<DataPlusInt<T>> data_out(r.size());
2482 
2483  timpi_call_mpi
2484  (TIMPI_ALLREDUCE(data_in.data(), data_out.data(),
2485  cast_int<CountType>(r.size()),
2486  dataplusint_type_acquire<T>().first,
2488  this->get()));
2489  for (std::size_t i=0; i != r.size(); ++i)
2490  {
2491  r[i] = data_out[i].val;
2492  max_id[i] = data_out[i].rank;
2493  }
2494  }
2495  else if (!r.empty())
2496  {
2497  for (std::size_t i=0; i != r.size(); ++i)
2498  max_id[i] = this->rank();
2499  }
2500 }
2501 
2502 
2503 template <typename A1, typename A2>
2504 inline void Communicator::maxloc(std::vector<bool,A1> & r,
2505  std::vector<unsigned int,A2> & max_id) const
2506 {
2507  if (this->size() > 1 && !r.empty())
2508  {
2509  TIMPI_LOG_SCOPE("maxloc(vector<bool>)", "Parallel");
2510 
2511  timpi_assert(this->verify(r.size()));
2512 
2513  std::vector<DataPlusInt<int>> data_in(r.size());
2514  for (std::size_t i=0; i != r.size(); ++i)
2515  {
2516  data_in[i].val = r[i];
2517  data_in[i].rank = this->rank();
2518  }
2519  std::vector<DataPlusInt<int>> data_out(r.size());
2520  timpi_call_mpi
2521  (TIMPI_ALLREDUCE(data_in.data(), data_out.data(),
2522  cast_int<CountType>(r.size()),
2525  this->get()));
2526  for (std::size_t i=0; i != r.size(); ++i)
2527  {
2528  r[i] = data_out[i].val;
2529  max_id[i] = data_out[i].rank;
2530  }
2531  }
2532  else if (!r.empty())
2533  {
2534  for (std::size_t i=0; i != r.size(); ++i)
2535  max_id[i] = this->rank();
2536  }
2537 }
2538 
// TIMPI_DEFINE_COMMUNICATOR_OPS(OPNAME) defines three Communicator member
// templates named OPNAME. Each reduces with OpFunction<T>::OPNAME():
//   * OPNAME(T & r): blocking, in-place scalar reduction;
//   * OPNAME(std::vector<T,A> & r): blocking, in-place elementwise
//     reduction. It is skipped when r is empty and asserts (via verify())
//     that all ranks pass the same length;
//   * OPNAME(const T & r, T & o, Request & req): nonblocking scalar
//     reduction of r into o, to be completed through req.
// On a single processor the blocking forms do nothing. The nonblocking
// form copies r into o and sets req to Request::null_request.
// Documentation lives here rather than inside the macro, because a //
// comment on a backslash-continued line would absorb the next line.
#define TIMPI_DEFINE_COMMUNICATOR_OPS(OPNAME) \
  template <typename T> \
  inline void Communicator::OPNAME(T &timpi_mpi_var(r)) const { \
    if (this->size() > 1) { \
      TIMPI_LOG_SCOPE(#OPNAME "(scalar, blocking)", "Parallel"); \
 \
      timpi_call_mpi(TIMPI_ALLREDUCE(MPI_IN_PLACE, &r, 1, StandardType<T>(&r), \
                                     OpFunction<T>::OPNAME(), this->get())); \
    } \
  } \
 \
  template <typename T, typename A> \
  inline void Communicator::OPNAME(std::vector<T, A> &r) const { \
    if (this->size() > 1 && !r.empty()) { \
      TIMPI_LOG_SCOPE(#OPNAME "(vector, blocking)", "Parallel"); \
 \
      timpi_assert(this->verify(r.size())); \
 \
      timpi_call_mpi(TIMPI_ALLREDUCE( \
          MPI_IN_PLACE, r.data(), cast_int<CountType>(r.size()), \
          StandardType<T>(r.data()), OpFunction<T>::OPNAME(), this->get())); \
    } \
  } \
  template <typename T> \
  inline void Communicator::OPNAME(const T &r, T &o, Request &req) const { \
    if (this->size() > 1) { \
      TIMPI_LOG_SCOPE(#OPNAME "(scalar, nonblocking)", "Parallel"); \
 \
      timpi_call_mpi(TIMPI_IALLREDUCE(&r, &o, 1, StandardType<T>(&r), \
                                      OpFunction<T>::OPNAME(), this->get(), \
                                      req.get())); \
    } else { \
      o = r; \
      req = Request::null_request; \
    } \
  }

// Instantiate the blocking and nonblocking overloads for each supported
// reduction operation.
TIMPI_DEFINE_COMMUNICATOR_OPS(sum)
TIMPI_DEFINE_COMMUNICATOR_OPS(max)
TIMPI_DEFINE_COMMUNICATOR_OPS(min)
TIMPI_DEFINE_COMMUNICATOR_OPS(product)
TIMPI_DEFINE_COMMUNICATOR_OPS(logical_and)
TIMPI_DEFINE_COMMUNICATOR_OPS(bitwise_and)
TIMPI_DEFINE_COMMUNICATOR_OPS(logical_or)
TIMPI_DEFINE_COMMUNICATOR_OPS(bitwise_or)
TIMPI_DEFINE_COMMUNICATOR_OPS(logical_xor)
TIMPI_DEFINE_COMMUNICATOR_OPS(bitwise_xor)
2586 
2587 
2588 
2589 // We still do function overloading for complex sums - in a perfect
2590 // world we'd have a StandardSumOp to go along with StandardType...
2591 template <typename T>
2592 inline void Communicator::sum(std::complex<T> & timpi_mpi_var(r)) const
2593 {
2594  if (this->size() > 1)
2595  {
2596  TIMPI_LOG_SCOPE("sum()", "Parallel");
2597 
2598  timpi_call_mpi
2599  (TIMPI_ALLREDUCE(MPI_IN_PLACE, &r, 2,
2600  StandardType<T>(),
2602  this->get()));
2603  }
2604 }
2605 
2606 
2607 template <typename T, typename A>
2608 inline void Communicator::sum(std::vector<std::complex<T>,A> & r) const
2609 {
2610  if (this->size() > 1 && !r.empty())
2611  {
2612  TIMPI_LOG_SCOPE("sum()", "Parallel");
2613 
2614  timpi_assert(this->verify(r.size()));
2615 
2616  timpi_call_mpi
2617  (TIMPI_ALLREDUCE(MPI_IN_PLACE, r.data(),
2618  cast_int<CountType>(r.size() * 2),
2619  StandardType<T>(nullptr),
2620  OpFunction<T>::sum(), this->get()));
2621  }
2622 }
2623 
2624 
2625 
2626 // Helper function for summing std::map and std::unordered_map with
2627 // fixed type (key, value) pairs.
2628 template <typename Map,
2629  typename std::enable_if<std::is_base_of<DataType, StandardType<typename Map::key_type>>::value &&
2630  std::is_base_of<DataType, StandardType<typename Map::mapped_type>>::value,
2631  int>::type>
2632 inline void Communicator::map_sum(Map & data) const
2633 {
2634  if (this->size() > 1)
2635  {
2636  TIMPI_LOG_SCOPE("sum(map)", "Parallel");
2637 
2638  // There may be different keys on different processors, so we
2639  // first gather all the (key, value) pairs and then insert
2640  // them, summing repeated keys, back into the map.
2641  //
2642  // Note: We don't simply use Map::value_type here because the
2643  // key type is const in that case and we don't have the proper
2644  // StandardType overloads for communicating const types.
2645  std::vector<std::pair<typename Map::key_type, typename Map::mapped_type>>
2646  vecdata(data.begin(), data.end());
2647 
2648  this->allgather(vecdata, /*identical_buffer_sizes=*/false);
2649 
2650  data.clear();
2651  for (const auto & pr : vecdata)
2652  data[pr.first] += pr.second;
2653  }
2654 }
2655 
2656 
2657 
2658 // Helper function for summing std::map and std::unordered_map with
2659 // non-fixed-type (key, value) pairs.
2660 template <typename Map,
2661  typename std::enable_if<!(std::is_base_of<DataType, StandardType<typename Map::key_type>>::value &&
2662  std::is_base_of<DataType, StandardType<typename Map::mapped_type>>::value),
2663  int>::type>
2664 inline void Communicator::map_sum(Map & data) const
2665 {
2666  if (this->size() > 1)
2667  {
2668  TIMPI_LOG_SCOPE("sum(map)", "Parallel");
2669 
2670  // There may be different keys on different processors, so we
2671  // first gather all the (key, value) pairs and then insert
2672  // them, summing repeated keys, back into the map.
2673  std::vector<typename Map::key_type> keys;
2674  std::vector<typename Map::mapped_type> vals;
2675 
2676  auto data_size = data.size();
2677  keys.reserve(data_size);
2678  vals.reserve(data_size);
2679 
2680  for (const auto & pr : data)
2681  {
2682  keys.push_back(pr.first);
2683  vals.push_back(pr.second);
2684  }
2685 
2686  this->allgather(keys, /*identical_buffer_sizes=*/false);
2687  this->allgather(vals, /*identical_buffer_sizes=*/false);
2688 
2689  data.clear();
2690 
2691  for (std::size_t i=0; i<keys.size(); ++i)
2692  data[keys[i]] += vals[i];
2693  }
2694 }
2695 
2696 
2697 
2698 template <typename K, typename V, typename C, typename A>
2699 inline void Communicator::sum(std::map<K,V,C,A> & data) const
2700 {
2701  return this->map_sum(data);
2702 }
2703 
2704 
2705 
2706 template <typename K, typename V, typename H, typename E, typename A>
2707 inline void Communicator::sum(std::unordered_map<K,V,H,E,A> & data) const
2708 {
2709  return this->map_sum(data);
2710 }
2711 
2712 
2713 
2714 template <typename T, typename A1, typename A2,
2715  typename std::enable_if<std::is_base_of<DataType, StandardType<T>>::value, int>::type>
2716 inline void Communicator::allgather(const std::vector<T,A1> & sendval,
2717  std::vector<std::vector<T,A1>,A2> & recv,
2718  const bool identical_buffer_sizes) const
2719 {
2720  TIMPI_LOG_SCOPE ("allgather()","Parallel");
2721 
2722  timpi_assert(this->size());
2723 
2724  // serial case
2725  if (this->size() < 2)
2726  {
2727  recv.resize(1);
2728  recv[0] = sendval;
2729  return;
2730  }
2731 
2732  recv.clear();
2733  recv.resize(this->size());
2734 
2735  std::vector<CountType>
2736  sendlengths (this->size(), 0);
2737  std::vector<DispType>
2738  displacements(this->size(), 0);
2739 
2740  const CountType mysize = cast_int<CountType>(sendval.size());
2741 
2742  if (identical_buffer_sizes)
2743  sendlengths.assign(this->size(), mysize);
2744  else
2745  // first comm step to determine buffer sizes from all processors
2746  this->allgather(mysize, sendlengths);
2747 
2748  // Find the total size of the final array and
2749  // set up the displacement offsets for each processor
2750  CountType globalsize = 0;
2751  for (unsigned int i=0; i != this->size(); ++i)
2752  {
2753  displacements[i] = globalsize;
2754  globalsize += sendlengths[i];
2755  }
2756 
2757  // Check for quick return
2758  if (globalsize == 0)
2759  return;
2760 
2761  // monolithic receive buffer
2762  std::vector<T,A1> r(globalsize, 0);
2763 
2764  // and get the data from the remote processors.
2765  timpi_call_mpi
2766  (TIMPI_ALLGATHERV(const_cast<T*>(mysize ? sendval.data() : nullptr),
2767  mysize, StandardType<T>(),
2768  &r[0], sendlengths.data(), displacements.data(),
2769  StandardType<T>(), this->get()));
2770 
2771  // slice receive buffer up
2772  for (unsigned int i=0; i != this->size(); ++i)
2773  recv[i].assign(r.begin()+displacements[i],
2774  r.begin()+displacements[i]+sendlengths[i]);
2775 }
2776 
2777 
2778 
2779 template <typename T, typename A1, typename A2,
2780  typename std::enable_if<Has_buffer_type<Packing<T>>::value, int>::type>
2781 inline void Communicator::allgather(const std::vector<T,A1> & sendval,
2782  std::vector<std::vector<T,A1>,A2> & recv,
2783  const bool /* identical_buffer_sizes */) const
2784 {
2785  TIMPI_LOG_SCOPE ("allgather()","Parallel");
2786 
2787  typedef typename Packing<T>::buffer_type buffer_t;
2788 
2789  std::vector<buffer_t> buffer;
2790  auto next_iter = pack_range ((void *)nullptr, sendval.begin(),
2791  sendval.end(), buffer,
2792  std::numeric_limits<CountType>::max());
2793 
2794  if (next_iter != sendval.end())
2795  timpi_error_msg("Non-blocking packed range sends cannot exceed " << std::numeric_limits<CountType>::max() << "in size");
2796 
2797  std::vector<std::vector<buffer_t>> allbuffers;
2798 
2799  timpi_assert(this->size());
2800  recv.clear();
2801  recv.resize(this->size());
2802 
2803  // Even if our vector sizes were identical, the variable-sized
2804  // data's buffer sizes might not be.
2805  this->allgather(buffer, allbuffers, false);
2806 
2807  for (processor_id_type i=0; i != this->size(); ++i)
2808  unpack_range(allbuffers[i], (void *)nullptr,
2809  std::back_inserter(recv[i]), (T*)nullptr);
2810 }
2811 
2812 
2813 
2814 template <typename T, typename C, typename A>
2815 inline void Communicator::set_union(std::set<T,C,A> & data,
2816  const unsigned int root_id) const
2817 {
2818  if (this->size() > 1)
2819  {
2820  std::vector<T> vecdata(data.begin(), data.end());
2821  this->gather(root_id, vecdata);
2822  if (this->rank() == root_id)
2823  data.insert(vecdata.begin(), vecdata.end());
2824  }
2825 }
2826 
2827 
2828 
2829 template <typename T, typename C, typename A>
2830 inline void Communicator::set_union(std::set<T,C,A> & data) const
2831 {
2832  if (this->size() > 1)
2833  {
2834  std::vector<T> vecdata(data.begin(), data.end());
2835  this->allgather(vecdata, false);
2836  data.insert(vecdata.begin(), vecdata.end());
2837  }
2838 }
2839 
2840 
2841 
2842 template <typename T, typename C, typename A>
2843 inline void Communicator::set_union(std::multiset<T,C,A> & data,
2844  const unsigned int root_id) const
2845 {
2846  if (this->size() > 1)
2847  {
2848  std::vector<T> vecdata(data.begin(), data.end());
2849  this->gather(root_id, vecdata);
2850  if (this->rank() == root_id)
2851  {
2852  // Clear first so the root's data doesn't get duplicated
2853  data.clear();
2854  data.insert(vecdata.begin(), vecdata.end());
2855  }
2856  }
2857 }
2858 
2859 
2860 template <typename T, typename C, typename A>
2861 inline void Communicator::set_union(std::multiset<T,C,A> & data) const
2862 {
2863  if (this->size() > 1)
2864  {
2865  std::vector<T> vecdata(data.begin(), data.end());
2866  this->allgather(vecdata, false);
2867 
2868  // Don't let our data duplicate itself
2869  data.clear();
2870 
2871  data.insert(vecdata.begin(), vecdata.end());
2872  }
2873 }
2874 
2875 
2876 
2877 template <typename T1, typename T2, typename C, typename A>
2878 inline void Communicator::set_union(std::map<T1,T2,C,A> & data,
2879  const unsigned int root_id) const
2880 {
2881  if (this->size() > 1)
2882  {
2883  std::vector<std::pair<T1,T2>> vecdata(data.begin(), data.end());
2884  this->gather(root_id, vecdata);
2885 
2886  if (this->rank() == root_id)
2887  {
2888  // If we have a non-zero root_id, we still want to let pid
2889  // 0's values take precedence in the event we have duplicate
2890  // keys
2891  data.clear();
2892 
2893  data.insert(vecdata.begin(), vecdata.end());
2894  }
2895  }
2896 }
2897 
2898 
2899 
2900 template <typename T1, typename T2, typename C, typename A>
2901 inline void Communicator::set_union(std::map<T1,T2,C,A> & data) const
2902 {
2903  if (this->size() > 1)
2904  {
2905  std::vector<std::pair<T1,T2>> vecdata(data.begin(), data.end());
2906  this->allgather(vecdata, false);
2907 
2908  // We want values on lower pids to take precedence in the event
2909  // we have duplicate keys
2910  data.clear();
2911 
2912  data.insert(vecdata.begin(), vecdata.end());
2913  }
2914 }
2915 
2916 
2917 
2918 template <typename T1, typename T2, typename C, typename A>
2919 inline void Communicator::set_union(std::multimap<T1,T2,C,A> & data,
2920  const unsigned int root_id) const
2921 {
2922  if (this->size() > 1)
2923  {
2924  std::vector<std::pair<T1,T2>> vecdata(data.begin(), data.end());
2925  this->gather(root_id, vecdata);
2926 
2927  if (this->rank() == root_id)
2928  {
2929  // Don't let root's data duplicate itself
2930  data.clear();
2931 
2932  data.insert(vecdata.begin(), vecdata.end());
2933  }
2934  }
2935 }
2936 
2937 
2938 
2939 template <typename T1, typename T2, typename C, typename A>
2940 inline void Communicator::set_union(std::multimap<T1,T2,C,A> & data) const
2941 {
2942  if (this->size() > 1)
2943  {
2944  std::vector<std::pair<T1,T2>> vecdata(data.begin(), data.end());
2945  this->allgather(vecdata, false);
2946 
2947  // Don't let our data duplicate itself
2948  data.clear();
2949 
2950  data.insert(vecdata.begin(), vecdata.end());
2951  }
2952 }
2953 
2954 
2955 
2956 template <typename K, typename H, typename KE, typename A>
2957 inline void Communicator::set_union(std::unordered_set<K,H,KE,A> & data,
2958  const unsigned int root_id) const
2959 {
2960  if (this->size() > 1)
2961  {
2962  std::vector<K> vecdata(data.begin(), data.end());
2963  this->gather(root_id, vecdata);
2964  if (this->rank() == root_id)
2965  data.insert(vecdata.begin(), vecdata.end());
2966  }
2967 }
2968 
2969 
2970 
2971 template <typename K, typename H, typename KE, typename A>
2972 inline void Communicator::set_union(std::unordered_set<K,H,KE,A> & data) const
2973 {
2974  if (this->size() > 1)
2975  {
2976  std::vector<K> vecdata(data.begin(), data.end());
2977  this->allgather(vecdata, false);
2978  data.insert(vecdata.begin(), vecdata.end());
2979  }
2980 }
2981 
2982 
2983 
2984 template <typename K, typename H, typename KE, typename A>
2985 inline void Communicator::set_union(std::unordered_multiset<K,H,KE,A> & data,
2986  const unsigned int root_id) const
2987 {
2988  if (this->size() > 1)
2989  {
2990  std::vector<K> vecdata(data.begin(), data.end());
2991  this->gather(root_id, vecdata);
2992  if (this->rank() == root_id)
2993  {
2994  // Don't let root's data duplicate itself
2995  data.clear();
2996 
2997  data.insert(vecdata.begin(), vecdata.end());
2998  }
2999  }
3000 }
3001 
3002 
3003 
3004 template <typename K, typename H, typename KE, typename A>
3005 inline void Communicator::set_union(std::unordered_multiset<K,H,KE,A> & data) const
3006 {
3007  if (this->size() > 1)
3008  {
3009  std::vector<K> vecdata(data.begin(), data.end());
3010  this->allgather(vecdata, false);
3011 
3012  // Don't let our data duplicate itself
3013  data.clear();
3014 
3015  data.insert(vecdata.begin(), vecdata.end());
3016  }
3017 }
3018 
3019 
3020 
3021 template <typename K, typename T, typename H, typename KE, typename A>
3022 inline void Communicator::set_union(std::unordered_map<K,T,H,KE,A> & data,
3023  const unsigned int root_id) const
3024 {
3025  if (this->size() > 1)
3026  {
3027  std::vector<std::pair<K,T>> vecdata(data.begin(), data.end());
3028  this->gather(root_id, vecdata);
3029 
3030  if (this->rank() == root_id)
3031  {
3032  // If we have a non-zero root_id, we still want to let pid
3033  // 0's values take precedence in the event we have duplicate
3034  // keys
3035  data.clear();
3036 
3037  data.insert(vecdata.begin(), vecdata.end());
3038  }
3039  }
3040 }
3041 
3042 
3043 
3044 template <typename K, typename T, typename H, typename KE, typename A>
3045 inline void Communicator::set_union(std::unordered_map<K,T,H,KE,A> & data) const
3046 {
3047  if (this->size() > 1)
3048  {
3049  std::vector<std::pair<K,T>> vecdata(data.begin(), data.end());
3050  this->allgather(vecdata, false);
3051 
3052  // We want values on lower pids to take precedence in the event
3053  // we have duplicate keys
3054  data.clear();
3055 
3056  data.insert(vecdata.begin(), vecdata.end());
3057  }
3058 }
3059 
3060 
3061 
3062 template <typename K, typename T, typename H, typename KE, typename A>
3063 inline void Communicator::set_union(std::unordered_multimap<K,T,H,KE,A> & data,
3064  const unsigned int root_id) const
3065 {
3066  if (this->size() > 1)
3067  {
3068  std::vector<std::pair<K,T>> vecdata(data.begin(), data.end());
3069  this->gather(root_id, vecdata);
3070 
3071  if (this->rank() == root_id)
3072  {
3073  // Don't let root's data duplicate itself
3074  data.clear();
3075 
3076  data.insert(vecdata.begin(), vecdata.end());
3077  }
3078  }
3079 }
3080 
3081 
3082 
3083 template <typename K, typename T, typename H, typename KE, typename A>
3084 inline void Communicator::set_union(std::unordered_multimap<K,T,H,KE,A> & data) const
3085 {
3086  if (this->size() > 1)
3087  {
3088  std::vector<std::pair<K,T>> vecdata(data.begin(), data.end());
3089  this->allgather(vecdata, false);
3090 
3091  // Don't let our data duplicate itself
3092  data.clear();
3093 
3094  data.insert(vecdata.begin(), vecdata.end());
3095  }
3096 }
3097 
3098 
3099 
3100 template <typename T, typename A>
3101 inline void Communicator::gather(const unsigned int root_id,
3102  const T & sendval,
3103  std::vector<T,A> & recv) const
3104 {
3105  timpi_assert_less (root_id, this->size());
3106 
3107  if (this->rank() == root_id)
3108  recv.resize(this->size());
3109 
3110  if (this->size() > 1)
3111  {
3112  TIMPI_LOG_SCOPE("gather()", "Parallel");
3113 
3114  StandardType<T> send_type(&sendval);
3115 
3116  timpi_assert_less(root_id, this->size());
3117 
3118  timpi_call_mpi
3119  (TIMPI_GATHER(const_cast<T*>(&sendval), 1, send_type,
3120  recv.empty() ? nullptr : recv.data(), 1, send_type,
3121  root_id, this->get()));
3122  }
3123  else
3124  recv[0] = sendval;
3125 }
3126 
3127 
3128 
3129 template <typename T, typename A,
3130  typename std::enable_if<std::is_base_of<DataType, StandardType<T>>::value, int>::type>
3131 inline void Communicator::gather(const unsigned int root_id,
3132  std::vector<T,A> & r) const
3133 {
3134  if (this->size() == 1)
3135  {
3136  timpi_assert (!this->rank());
3137  timpi_assert (!root_id);
3138  return;
3139  }
3140 
3141  timpi_assert_less (root_id, this->size());
3142 
3143  std::vector<CountType>
3144  sendlengths (this->size(), 0);
3145  std::vector<DispType>
3146  displacements(this->size(), 0);
3147 
3148  const CountType mysize = cast_int<CountType>(r.size());
3149  this->allgather(mysize, sendlengths);
3150 
3151  TIMPI_LOG_SCOPE("gather()", "Parallel");
3152 
3153  // Find the total size of the final array and
3154  // set up the displacement offsets for each processor.
3155  CountType globalsize = 0;
3156  for (unsigned int i=0; i != this->size(); ++i)
3157  {
3158  displacements[i] = globalsize;
3159  globalsize += sendlengths[i];
3160  }
3161 
3162  // Check for quick return
3163  if (globalsize == 0)
3164  return;
3165 
3166  // copy the input buffer
3167  std::vector<T,A> r_src(r);
3168 
3169  // now resize it to hold the global data
3170  // on the receiving processor
3171  if (root_id == this->rank())
3172  r.resize(globalsize);
3173 
3174  timpi_assert_less(root_id, this->size());
3175 
3176  // and get the data from the remote processors
3177  timpi_call_mpi
3178  (TIMPI_GATHERV(r_src.empty() ? nullptr : r_src.data(), mysize,
3179  StandardType<T>(), r.empty() ? nullptr : r.data(),
3180  sendlengths.data(), displacements.data(),
3181  StandardType<T>(), root_id, this->get()));
3182 }
3183 
3184 
3185 template <typename T, typename A,
3186  typename std::enable_if<Has_buffer_type<Packing<T>>::value, int>::type>
3187 inline void Communicator::gather(const unsigned int root_id,
3188  std::vector<T,A> & r) const
3189 {
3190  std::vector<T,A> gathered;
3191  this->gather_packed_range(root_id, (void *)(nullptr),
3192  r.begin(), r.end(),
3193  std::inserter(gathered, gathered.end()));
3194 
3195  gathered.swap(r);
3196 }
3197 
3198 
3199 
3200 template <typename T, typename A>
3201 inline void Communicator::gather(const unsigned int root_id,
3202  const std::basic_string<T> & sendval,
3203  std::vector<std::basic_string<T>,A> & recv,
3204  const bool identical_buffer_sizes) const
3205 {
3206  timpi_assert_less (root_id, this->size());
3207 
3208  if (this->rank() == root_id)
3209  recv.resize(this->size());
3210 
3211  if (this->size() > 1)
3212  {
3213  TIMPI_LOG_SCOPE ("gather()","Parallel");
3214 
3215  std::vector<CountType>
3216  sendlengths (this->size(), 0);
3217  std::vector<DispType>
3218  displacements(this->size(), 0);
3219 
3220  const CountType mysize = cast_int<CountType>(sendval.size());
3221 
3222  if (identical_buffer_sizes)
3223  sendlengths.assign(this->size(), mysize);
3224  else
3225  // first comm step to determine buffer sizes from all processors
3226  this->gather(root_id, mysize, sendlengths);
3227 
3228  // Find the total size of the final array and
3229  // set up the displacement offsets for each processor
3230  CountType globalsize = 0;
3231  for (unsigned int i=0; i < this->size(); ++i)
3232  {
3233  displacements[i] = globalsize;
3234  globalsize += sendlengths[i];
3235  }
3236 
3237  // monolithic receive buffer
3238  std::basic_string<T> r;
3239  if (this->rank() == root_id)
3240  r.resize(globalsize, 0);
3241 
3242  timpi_assert_less(root_id, this->size());
3243 
3244  // and get the data from the remote processors.
3245  timpi_call_mpi
3246  (TIMPI_GATHERV(const_cast<T*>(sendval.data()),
3247  mysize, StandardType<T>(),
3248  this->rank() == root_id ? &r[0] : nullptr,
3249  sendlengths.data(), displacements.data(),
3250  StandardType<T>(), root_id, this->get()));
3251 
3252  // slice receive buffer up
3253  if (this->rank() == root_id)
3254  for (unsigned int i=0; i != this->size(); ++i)
3255  recv[i] = r.substr(displacements[i], sendlengths[i]);
3256  }
3257  else
3258  recv[0] = sendval;
3259 }
3260 
3261 
3262 
3263 template <typename T, typename A,
3264  typename std::enable_if<std::is_base_of<DataType, StandardType<T>>::value, int>::type>
3265 inline void Communicator::allgather(const T & sendval,
3266  std::vector<T,A> & recv) const
3267 {
3268  TIMPI_LOG_SCOPE ("allgather()","Parallel");
3269 
3270  timpi_assert(this->size());
3271  recv.resize(this->size());
3272 
3273  const unsigned int comm_size = this->size();
3274  if (comm_size > 1)
3275  {
3276  StandardType<T> send_type(&sendval);
3277 
3278  timpi_call_mpi
3279  (TIMPI_ALLGATHER(const_cast<T*>(&sendval), 1, send_type, recv.data(), 1,
3280  send_type, this->get()));
3281  }
3282  else if (comm_size > 0)
3283  recv[0] = sendval;
3284 }
3285 
3286 template <typename T, typename A,
3287  typename std::enable_if<Has_buffer_type<Packing<T>>::value, int>::type>
3288 inline void Communicator::allgather(const T & sendval,
3289  std::vector<T,A> & recv) const
3290 {
3291  TIMPI_LOG_SCOPE ("allgather()","Parallel");
3292 
3293  timpi_assert(this->size());
3294  recv.resize(this->size());
3295 
3296  static const std::size_t approx_total_buffer_size = 1e8;
3297  const std::size_t approx_each_buffer_size =
3298  approx_total_buffer_size / this->size();
3299 
3300  unsigned int comm_size = this->size();
3301  if (comm_size > 1)
3302  {
3303  std::vector<T> range = {sendval};
3304 
3305  allgather_packed_range((void *)(nullptr), range.begin(), range.end(), recv.begin(),
3306  approx_each_buffer_size);
3307  }
3308  else if (comm_size > 0)
3309  recv[0] = sendval;
3310 }
3311 
3312 template <typename T, typename A,
3313  typename std::enable_if<std::is_base_of<DataType, StandardType<T>>::value, int>::type>
3314 inline void Communicator::allgather(std::vector<T,A> & r,
3315  const bool identical_buffer_sizes) const
3316 {
3317  if (this->size() < 2)
3318  return;
3319 
3320  TIMPI_LOG_SCOPE("allgather()", "Parallel");
3321 
3322  if (identical_buffer_sizes)
3323  {
3324  timpi_assert(this->verify(r.size()));
3325  if (r.empty())
3326  return;
3327 
3328  std::vector<T,A> r_src(r.size()*this->size());
3329  r_src.swap(r);
3330  StandardType<T> send_type(r_src.data());
3331 
3332  timpi_call_mpi
3333  (TIMPI_ALLGATHER(r_src.data(), cast_int<CountType>(r_src.size()),
3334  send_type, r.data(), cast_int<CountType>(r_src.size()),
3335  send_type, this->get()));
3336  // timpi_assert(this->verify(r));
3337  return;
3338  }
3339 
3340  std::vector<CountType>
3341  sendlengths (this->size(), 0);
3342  std::vector<DispType>
3343  displacements(this->size(), 0);
3344 
3345  const CountType mysize = cast_int<CountType>(r.size());
3346  this->allgather(mysize, sendlengths);
3347 
3348  // Find the total size of the final array and
3349  // set up the displacement offsets for each processor.
3350  CountType globalsize = 0;
3351  for (unsigned int i=0; i != this->size(); ++i)
3352  {
3353  displacements[i] = globalsize;
3354  globalsize += sendlengths[i];
3355  }
3356 
3357  // Check for quick return
3358  if (globalsize == 0)
3359  return;
3360 
3361  // copy the input buffer
3362  std::vector<T,A> r_src(globalsize);
3363  r_src.swap(r);
3364 
3365  StandardType<T> send_type(r.data());
3366 
3367  // and get the data from the remote processors.
3368  // Pass nullptr if our vector is empty.
3369  timpi_call_mpi
3370  (TIMPI_ALLGATHERV(r_src.empty() ? nullptr : r_src.data(), mysize,
3371  send_type, r.data(), sendlengths.data(),
3372  displacements.data(), send_type, this->get()));
3373 }
3374 
3375 template <typename T, typename A,
3376  typename std::enable_if<Has_buffer_type<Packing<T>>::value, int>::type>
3377 inline void Communicator::allgather(std::vector<T,A> & r,
3378  const bool identical_buffer_sizes) const
3379 {
3380  if (this->size() < 2)
3381  return;
3382 
3383  TIMPI_LOG_SCOPE("allgather()", "Parallel");
3384 
3385  if (identical_buffer_sizes)
3386  {
3387  timpi_assert(this->verify(r.size()));
3388  if (r.empty())
3389  return;
3390 
3391 
3392  std::vector<T,A> r_src(r.size()*this->size());
3393  r_src.swap(r);
3394 
3395  this->allgather_packed_range((void *)(nullptr),
3396  r_src.begin(),
3397  r_src.end(),
3398  r.begin());
3399  return;
3400  }
3401 
3402  std::vector<CountType>
3403  sendlengths (this->size(), 0);
3404  std::vector<DispType>
3405  displacements(this->size(), 0);
3406 
3407  const CountType mysize = cast_int<CountType>(r.size());
3408  this->allgather(mysize, sendlengths);
3409 
3410  // Find the total size of the final array
3411  CountType globalsize = 0;
3412  for (unsigned int i=0; i != this->size(); ++i)
3413  globalsize += sendlengths[i];
3414 
3415  // Check for quick return
3416  if (globalsize == 0)
3417  return;
3418 
3419  // copy the input buffer
3420  std::vector<T,A> r_src(globalsize);
3421  r_src.swap(r);
3422 
3423  this->allgather_packed_range((void *)(nullptr),
3424  r_src.begin(),
3425  r_src.end(),
3426  r.begin());
3427 }
3428 
3429 template <typename T, typename A>
3430 inline void Communicator::allgather(std::vector<std::basic_string<T>,A> & r,
3431  const bool identical_buffer_sizes) const
3432 {
3433  if (this->size() < 2)
3434  return;
3435 
3436  TIMPI_LOG_SCOPE("allgather()", "Parallel");
3437 
3438  if (identical_buffer_sizes)
3439  {
3440  timpi_assert(this->verify(r.size()));
3441 
3442  // identical_buffer_sizes doesn't buy us much since we have to
3443  // communicate the lengths of strings within each buffer anyway
3444  if (r.empty())
3445  return;
3446  }
3447 
3448  // Concatenate the input buffer into a send buffer, and keep track
3449  // of input string lengths
3450  std::vector<CountType> mystrlengths (r.size());
3451  std::vector<T> concat_src;
3452 
3453  CountType myconcatsize = 0;
3454  for (std::size_t i=0; i != r.size(); ++i)
3455  {
3456  CountType stringlen = cast_int<CountType>(r[i].size());
3457  mystrlengths[i] = stringlen;
3458  myconcatsize += stringlen;
3459  }
3460  concat_src.reserve(myconcatsize);
3461  for (std::size_t i=0; i != r.size(); ++i)
3462  concat_src.insert
3463  (concat_src.end(), r[i].begin(), r[i].end());
3464 
3465  // Get the string lengths from all other processors
3466  std::vector<CountType> strlengths = mystrlengths;
3467  this->allgather(strlengths, identical_buffer_sizes);
3468 
3469  // We now know how many strings we'll be receiving
3470  r.resize(strlengths.size());
3471 
3472  // Get the concatenated data sizes from all other processors
3473  std::vector<CountType> concat_sizes;
3474  this->allgather(myconcatsize, concat_sizes);
3475 
3476  // Find the total size of the final concatenated array and
3477  // set up the displacement offsets for each processor.
3478  std::vector<DispType> displacements(this->size(), 0);
3479  CountType globalsize = 0;
3480  for (unsigned int i=0; i != this->size(); ++i)
3481  {
3482  displacements[i] = globalsize;
3483  globalsize += concat_sizes[i];
3484  }
3485 
3486  // Check for quick return
3487  if (globalsize == 0)
3488  return;
3489 
3490  // Get the concatenated data from the remote processors.
3491  // Pass nullptr if our vector is empty.
3492  std::vector<T> concat(globalsize);
3493 
3494  // We may have concat_src.empty(), but we know concat has at least
3495  // one element we can use as an example for StandardType
3496  StandardType<T> send_type(concat.data());
3497 
3498  timpi_call_mpi
3499  (TIMPI_ALLGATHERV(concat_src.empty() ?
3500  nullptr : concat_src.data(), myconcatsize,
3501  send_type, concat.data(), concat_sizes.data(),
3502  displacements.data(), send_type, this->get()));
3503 
3504  // Finally, split concatenated data into strings
3505  const T * begin = concat.data();
3506  for (std::size_t i=0; i != r.size(); ++i)
3507  {
3508  const T * end = begin + strlengths[i];
3509  r[i].assign(begin, end);
3510  begin = end;
3511  }
3512 }
3513 
3514 
3515 
3516 template <typename T, typename A>
3517 void Communicator::scatter(const std::vector<T,A> & data,
3518  T & recv,
3519  const unsigned int root_id) const
3520 {
3521  ignore(root_id); // Only needed for MPI and/or dbg/devel
3522  timpi_assert_less (root_id, this->size());
3523 
3524  // Do not allow the root_id to scatter a nullptr vector.
3525  // That would leave recv in an indeterminate state.
3526  timpi_assert (this->rank() != root_id || this->size() == data.size());
3527 
3528  if (this->size() == 1)
3529  {
3530  timpi_assert (!this->rank());
3531  timpi_assert (!root_id);
3532  recv = data[0];
3533  return;
3534  }
3535 
3536  TIMPI_LOG_SCOPE("scatter()", "Parallel");
3537 
3538  T * data_ptr = const_cast<T*>(data.empty() ? nullptr : data.data());
3539  ignore(data_ptr); // unused ifndef TIMPI_HAVE_MPI
3540 
3541  timpi_assert_less(root_id, this->size());
3542 
3543  timpi_call_mpi
3544  (TIMPI_SCATTER(data_ptr, 1, StandardType<T>(data_ptr),
3545  &recv, 1, StandardType<T>(&recv), root_id, this->get()));
3546 }
3547 
3548 
3549 
3550 template <typename T, typename A>
3551 void Communicator::scatter(const std::vector<T,A> & data,
3552  std::vector<T,A> & recv,
3553  const unsigned int root_id) const
3554 {
3555  timpi_assert_less (root_id, this->size());
3556 
3557  if (this->size() == 1)
3558  {
3559  timpi_assert (!this->rank());
3560  timpi_assert (!root_id);
3561  recv.assign(data.begin(), data.end());
3562  return;
3563  }
3564 
3565  TIMPI_LOG_SCOPE("scatter()", "Parallel");
3566 
3567  std::size_t recv_buffer_size = 0;
3568  if (this->rank() == root_id)
3569  {
3570  timpi_assert(data.size() % this->size() == 0);
3571  recv_buffer_size = cast_int<std::size_t>(data.size() / this->size());
3572  }
3573 
3574  this->broadcast(recv_buffer_size);
3575  recv.resize(recv_buffer_size);
3576 
3577  T * data_ptr = const_cast<T*>(data.empty() ? nullptr : data.data());
3578  T * recv_ptr = recv.empty() ? nullptr : recv.data();
3579  ignore(data_ptr, recv_ptr); // unused ifndef TIMPI_HAVE_MPI
3580 
3581  timpi_assert_less(root_id, this->size());
3582 
3583  timpi_call_mpi
3584  (TIMPI_SCATTER(data_ptr, recv_buffer_size, StandardType<T>(data_ptr),
3585  recv_ptr, recv_buffer_size, StandardType<T>(recv_ptr),
3586  root_id, this->get()));
3587 }
3588 
3589 
3590 
3591 template <typename T, typename A1, typename A2>
3592 void Communicator::scatter(const std::vector<T,A1> & data,
3593  const std::vector<CountType,A2> counts,
3594  std::vector<T,A1> & recv,
3595  const unsigned int root_id) const
3596 {
3597  timpi_assert_less (root_id, this->size());
3598 
3599  if (this->size() == 1)
3600  {
3601  timpi_assert (!this->rank());
3602  timpi_assert (!root_id);
3603  timpi_assert (counts.size() == this->size());
3604  recv.assign(data.begin(), data.begin() + counts[0]);
3605  return;
3606  }
3607 
3608  std::vector<DispType> displacements(this->size(), 0);
3609  if (root_id == this->rank())
3610  {
3611  timpi_assert(counts.size() == this->size());
3612 
3613  // Create a displacements vector from the incoming counts vector
3614  std::size_t globalsize = 0;
3615  for (unsigned int i=0; i < this->size(); ++i)
3616  {
3617  displacements[i] = globalsize;
3618  globalsize += counts[i];
3619  }
3620 
3621  timpi_assert(data.size() == globalsize);
3622  }
3623 
3624  TIMPI_LOG_SCOPE("scatter()", "Parallel");
3625 
3626  // Scatter the buffer sizes to size remote buffers
3627  CountType recv_buffer_size = 0;
3628  this->scatter(counts, recv_buffer_size, root_id);
3629  recv.resize(recv_buffer_size);
3630 
3631  T * data_ptr = const_cast<T*>(data.empty() ? nullptr : data.data());
3632  CountType * count_ptr = const_cast<CountType*>(counts.empty() ? nullptr : counts.data());
3633  T * recv_ptr = recv.empty() ? nullptr : recv.data();
3634  ignore(data_ptr, count_ptr, recv_ptr); // unused ifndef TIMPI_HAVE_MPI
3635 
3636  timpi_assert_less(root_id, this->size());
3637 
3638  // Scatter the non-uniform chunks
3639  timpi_call_mpi
3640  (TIMPI_SCATTERV(data_ptr, count_ptr, displacements.data(), StandardType<T>(data_ptr),
3641  recv_ptr, recv_buffer_size, StandardType<T>(recv_ptr), root_id, this->get()));
3642 }
3643 
3644 
#ifdef TIMPI_HAVE_MPI
#if MPI_VERSION > 3
// Compatibility overload accepting int counts.  Under MPI 4+, CountType
// presumably differs from int (otherwise this would redefine the
// CountType overload); the counts are widened and forwarded.
template <typename T, typename A1, typename A2>
void Communicator::scatter(const std::vector<T,A1> & data,
                           const std::vector<int,A2> counts,
                           std::vector<T,A1> & recv,
                           const unsigned int root_id) const
{
  std::vector<CountType> wide_counts;
  wide_counts.reserve(counts.size());
  for (const int c : counts)
    wide_counts.push_back(c);

  this->scatter(data, wide_counts, recv, root_id);
}
#endif
#endif
3661 
3662 
3663 
3664 template <typename T, typename A1, typename A2>
3665 void Communicator::scatter(const std::vector<std::vector<T,A1>,A2> & data,
3666  std::vector<T,A1> & recv,
3667  const unsigned int root_id,
3668  const bool identical_buffer_sizes) const
3669 {
3670  timpi_assert_less (root_id, this->size());
3671 
3672  if (this->size() == 1)
3673  {
3674  timpi_assert (!this->rank());
3675  timpi_assert (!root_id);
3676  timpi_assert (data.size() == this->size());
3677  recv.assign(data[0].begin(), data[0].end());
3678  return;
3679  }
3680 
3681  std::vector<T,A1> stacked_data;
3682  std::vector<CountType> counts;
3683 
3684  if (root_id == this->rank())
3685  {
3686  timpi_assert (data.size() == this->size());
3687 
3688  if (!identical_buffer_sizes)
3689  counts.resize(this->size());
3690 
3691  for (std::size_t i=0; i < data.size(); ++i)
3692  {
3693  if (!identical_buffer_sizes)
3694  counts[i] = cast_int<CountType>(data[i].size());
3695 #ifndef NDEBUG
3696  else
3697  // Check that buffer sizes are indeed equal
3698  timpi_assert(!i || data[i-1].size() == data[i].size());
3699 #endif
3700  std::copy(data[i].begin(), data[i].end(), std::back_inserter(stacked_data));
3701  }
3702  }
3703 
3704  if (identical_buffer_sizes)
3705  this->scatter(stacked_data, recv, root_id);
3706  else
3707  this->scatter(stacked_data, counts, recv, root_id);
3708 }
3709 
3710 
3711 
3712 template <typename T, typename A>
3713 inline void Communicator::alltoall(std::vector<T,A> & buf) const
3714 {
3715  if (this->size() < 2 || buf.empty())
3716  return;
3717 
3718  TIMPI_LOG_SCOPE("alltoall()", "Parallel");
3719 
3720  // the per-processor size. this is the same for all
3721  // processors using MPI_Alltoall, could be variable
3722  // using MPI_Alltoallv
3723  const CountType size_per_proc =
3724  cast_int<CountType>(buf.size()/this->size());
3725  ignore(size_per_proc);
3726 
3727  timpi_assert_equal_to (buf.size()%this->size(), 0);
3728 
3729  timpi_assert(this->verify(size_per_proc));
3730 
3731  StandardType<T> send_type(buf.data());
3732 
3733  timpi_call_mpi
3734  (TIMPI_ALLTOALL(MPI_IN_PLACE, size_per_proc, send_type, buf.data(),
3735  size_per_proc, send_type, this->get()));
3736 }
3737 
3738 
3739 
3740 template <typename T
3741 #ifdef TIMPI_HAVE_MPI
3742  ,
3743  typename std::enable_if<std::is_base_of<DataType, StandardType<T>>::value, int>::type
3744 #endif
3745  >
3746 inline void Communicator::broadcast (T & timpi_mpi_var(data),
3747  const unsigned int root_id,
3748  const bool /* identical_sizes */) const
3749 {
3750  ignore(root_id); // Only needed for MPI and/or dbg/devel
3751  if (this->size() == 1)
3752  {
3753  timpi_assert (!this->rank());
3754  timpi_assert (!root_id);
3755  return;
3756  }
3757 
3758  timpi_assert_less (root_id, this->size());
3759 
3760  TIMPI_LOG_SCOPE("broadcast()", "Parallel");
3761 
3762  // Spread data to remote processors.
3763  timpi_call_mpi
3764  (TIMPI_BCAST(&data, 1, StandardType<T>(&data), root_id,
3765  this->get()));
3766 }
3767 
#ifdef TIMPI_HAVE_MPI
// Broadcast a single Packing<T>-serializable object from root_id.
// Compiled only with MPI: without MPI there is nothing to do, and
// instantiating the packed-range machinery could require Packing<T>
// classes that are not defined.
template <typename T,
          typename std::enable_if<Has_buffer_type<Packing<T>>::value, int>::type>
inline void Communicator::broadcast (T & data,
                                     const unsigned int root_id,
                                     const bool /* identical_sizes */) const
{
  ignore(root_id); // Only needed for MPI and/or dbg/devel

  if (this->size() == 1)
    {
      timpi_assert (!this->rank());
      timpi_assert (!root_id);
      return;
    }

  timpi_assert_less (root_id, this->size());

  // Wrap the object in a one-element range so it can be packed,
  // broadcast, and unpacked back into the same slot.
  std::vector<T> wrapper(1, data);

  this->broadcast_packed_range((void *)(nullptr),
                               wrapper.begin(),
                               wrapper.end(),
                               (void *)(nullptr),
                               wrapper.begin(),
                               root_id);

  data = wrapper[0];
}
#endif
3804 
3805 template <typename T, typename A,
3806  typename std::enable_if<std::is_base_of<DataType, StandardType<T>>::value, int>::type>
3807 inline void Communicator::broadcast (std::vector<T,A> & timpi_mpi_var(data),
3808  const unsigned int root_id,
3809  const bool timpi_mpi_var(identical_sizes)) const
3810 {
3811  ignore(root_id); // Only needed for MPI and/or dbg/devel
3812  if (this->size() == 1)
3813  {
3814  timpi_assert (!this->rank());
3815  timpi_assert (!root_id);
3816  return;
3817  }
3818 
3819 #ifdef TIMPI_HAVE_MPI
3820 
3821  timpi_assert_less (root_id, this->size());
3822  timpi_assert (this->verify(identical_sizes));
3823 
3824  TIMPI_LOG_SCOPE("broadcast()", "Parallel");
3825 
3826  std::size_t data_size = data.size();
3827 
3828  if (identical_sizes)
3829  timpi_assert(this->verify(data_size));
3830  else
3831  this->broadcast(data_size, root_id);
3832 
3833  data.resize(data_size);
3834 
3835  // and get the data from the remote processors.
3836  // Pass nullptr if our vector is empty.
3837  T * data_ptr = data.empty() ? nullptr : data.data();
3838 
3839  timpi_assert_less(root_id, this->size());
3840 
3841  timpi_call_mpi
3842  (TIMPI_BCAST(data_ptr, cast_int<CountType>(data.size()),
3843  StandardType<T>(data_ptr), root_id, this->get()));
3844 #endif
3845 }
3846 
3847 template <typename T, typename A,
3848  typename std::enable_if<Has_buffer_type<Packing<T>>::value, int>::type>
3849 inline void Communicator::broadcast (std::vector<T,A> & data,
3850  const unsigned int root_id,
3851  const bool identical_sizes) const
3852 {
3853  if (this->size() == 1)
3854  {
3855  timpi_assert (!this->rank());
3856  timpi_assert (!root_id);
3857  return;
3858  }
3859 
3860  timpi_assert_less (root_id, this->size());
3861  timpi_assert (this->verify(identical_sizes));
3862 
3863  TIMPI_LOG_SCOPE("broadcast()", "Parallel");
3864 
3865  std::size_t data_size = data.size();
3866 
3867  if (identical_sizes)
3868  timpi_assert(this->verify(data_size));
3869  else
3870  this->broadcast(data_size, root_id);
3871 
3872  data.resize(data_size);
3873 
3874  timpi_assert_less(root_id, this->size());
3875 
3876  this->broadcast_packed_range((void *)(nullptr),
3877  data.begin(),
3878  data.end(),
3879  (void *)(nullptr),
3880  data.begin(),
3881  root_id);
3882 }
3883 
3884 template <typename Map,
3885  typename std::enable_if<std::is_base_of<DataType, StandardType<typename Map::key_type>>::value &&
3886  std::is_base_of<DataType, StandardType<typename Map::mapped_type>>::value,
3887  int>::type>
3888 inline void Communicator::map_broadcast(Map & timpi_mpi_var(data),
3889  const unsigned int root_id,
3890  const bool timpi_mpi_var(identical_sizes)) const
3891 {
3892  ignore(root_id); // Only needed for MPI and/or dbg/devel
3893  if (this->size() == 1)
3894  {
3895  timpi_assert (!this->rank());
3896  timpi_assert (!root_id);
3897  return;
3898  }
3899 
3900 #ifdef TIMPI_HAVE_MPI
3901  timpi_assert_less (root_id, this->size());
3902  timpi_assert (this->verify(identical_sizes));
3903 
3904  TIMPI_LOG_SCOPE("broadcast(map)", "Parallel");
3905 
3906  std::size_t data_size=data.size();
3907  if (identical_sizes)
3908  timpi_assert(this->verify(data_size));
3909  else
3910  this->broadcast(data_size, root_id);
3911 
3912  std::vector<std::pair<typename Map::key_type,
3913  typename Map::mapped_type>> comm_data;
3914 
3915  if (root_id == this->rank())
3916  comm_data.assign(data.begin(), data.end());
3917  else
3918  comm_data.resize(data_size);
3919 
3920  this->broadcast(comm_data, root_id, true);
3921 
3922  if (this->rank() != root_id)
3923  {
3924  data.clear();
3925  data.insert(comm_data.begin(), comm_data.end());
3926  }
3927 #endif
3928 }
3929 
3930 template <typename Map,
3931  typename std::enable_if<!(std::is_base_of<DataType, StandardType<typename Map::key_type>>::value &&
3932  std::is_base_of<DataType, StandardType<typename Map::mapped_type>>::value),
3933  int>::type>
3934 inline void Communicator::map_broadcast(Map & timpi_mpi_var(data),
3935  const unsigned int root_id,
3936  const bool timpi_mpi_var(identical_sizes)) const
3937 {
3938  ignore(root_id); // Only needed for MPI and/or dbg/devel
3939  if (this->size() == 1)
3940  {
3941  timpi_assert (!this->rank());
3942  timpi_assert (!root_id);
3943  return;
3944  }
3945 
3946 #ifdef TIMPI_HAVE_MPI
3947  timpi_assert_less (root_id, this->size());
3948  timpi_assert (this->verify(identical_sizes));
3949 
3950  TIMPI_LOG_SCOPE("broadcast()", "Parallel");
3951 
3952  std::size_t data_size=data.size();
3953  if (identical_sizes)
3954  timpi_assert(this->verify(data_size));
3955  else
3956  this->broadcast(data_size, root_id);
3957 
3958  std::vector<typename Map::key_type> pair_first; pair_first.reserve(data_size);
3959  std::vector<typename Map::mapped_type> pair_second; pair_first.reserve(data_size);
3960 
3961  if (root_id == this->rank())
3962  {
3963  for (const auto & pr : data)
3964  {
3965  pair_first.push_back(pr.first);
3966  pair_second.push_back(pr.second);
3967  }
3968  }
3969  else
3970  {
3971  pair_first.resize(data_size);
3972  pair_second.resize(data_size);
3973  }
3974 
3975  this->broadcast
3976  (pair_first, root_id,
3978  this->broadcast
3979  (pair_second, root_id,
3981 
3982  timpi_assert(pair_first.size() == pair_first.size());
3983 
3984  if (this->rank() != root_id)
3985  {
3986  data.clear();
3987  for (std::size_t i=0; i<pair_first.size(); ++i)
3988  data[pair_first[i]] = pair_second[i];
3989  }
3990 #endif
3991 }
3992 
3993 template <typename T1, typename T2, typename C, typename A>
3994 inline void Communicator::broadcast(std::map<T1,T2,C,A> & data,
3995  const unsigned int root_id,
3996  const bool identical_sizes) const
3997 {
3998  this->map_broadcast(data, root_id, identical_sizes);
3999 }
4000 
4001 
4002 
4003 template <typename K, typename V, typename H, typename E, typename A>
4004 inline void Communicator::broadcast(std::unordered_map<K,V,H,E,A> & data,
4005  const unsigned int root_id,
4006  const bool identical_sizes) const
4007 {
4008  this->map_broadcast(data, root_id, identical_sizes);
4009 }
4010 
4011 template <typename Context, typename OutputContext,
4012  typename Iter, typename OutputIter>
4013 inline void Communicator::broadcast_packed_range(const Context * context1,
4014  Iter range_begin,
4015  const Iter range_end,
4016  OutputContext * context2,
4017  OutputIter out_iter,
4018  const unsigned int root_id,
4019  std::size_t approx_buffer_size) const
4020 {
4021  typedef typename std::iterator_traits<Iter>::value_type T;
4022  typedef typename Packing<T>::buffer_type buffer_t;
4023 
4024  if (this->size() == 1)
4025  {
4026  timpi_assert (!this->rank());
4027  timpi_assert (!root_id);
4028  return;
4029  }
4030 
4031  do
4032  {
4033  // We will serialize variable size objects from *range_begin to
4034  // *range_end as a sequence of ints in this buffer
4035  std::vector<buffer_t> buffer;
4036 
4037  if (this->rank() == root_id)
4038  range_begin = pack_range
4039  (context1, range_begin, range_end, buffer, approx_buffer_size);
4040 
4041  // this->broadcast(vector) requires the receiving vectors to
4042  // already be the appropriate size
4043  std::size_t buffer_size = buffer.size();
4044  this->broadcast (buffer_size, root_id);
4045 
4046  // We continue until there's nothing left to broadcast
4047  if (!buffer_size)
4048  break;
4049 
4050  buffer.resize(buffer_size);
4051 
4052  // Broadcast the packed data
4053  this->broadcast (buffer, root_id);
4054 
4055  // OutputIter might not have operator= implemented; for maximum
4056  // compatibility we'll rely on its copy constructor.
4057  std::unique_ptr<OutputIter> next_out_iter =
4058  std::make_unique<OutputIter>(out_iter);
4059 
4060  if (this->rank() != root_id)
4061  {
4062  auto return_out_iter = unpack_range
4063  (buffer, context2, *next_out_iter, (T*)nullptr);
4064  next_out_iter = std::make_unique<OutputIter>(return_out_iter);
4065  }
4066  } while (true); // break above when we reach buffer_size==0
4067 }
4068 
4069 
4070 template <typename Context, typename Iter, typename OutputIter>
4071 inline void Communicator::gather_packed_range(const unsigned int root_id,
4072  Context * context,
4073  Iter range_begin,
4074  const Iter range_end,
4075  OutputIter out_iter,
4076  std::size_t approx_buffer_size) const
4077 {
4078  typedef typename std::iterator_traits<Iter>::value_type T;
4079  typedef typename Packing<T>::buffer_type buffer_t;
4080 
4081  bool nonempty_range = (range_begin != range_end);
4082  this->max(nonempty_range);
4083 
4084  // OutputIter might not have operator= implemented; for maximum
4085  // compatibility we'll rely on its copy constructor.
4086  std::unique_ptr<OutputIter> next_out_iter =
4087  std::make_unique<OutputIter>(out_iter);
4088 
4089  while (nonempty_range)
4090  {
4091  // We will serialize variable size objects from *range_begin to
4092  // *range_end as a sequence of ints in this buffer
4093  std::vector<buffer_t> buffer;
4094 
4095  range_begin = pack_range
4096  (context, range_begin, range_end, buffer, approx_buffer_size);
4097 
4098  this->gather(root_id, buffer);
4099 
4100  auto return_out_iter = unpack_range
4101  (buffer, context, *next_out_iter, (T*)(nullptr));
4102  next_out_iter = std::make_unique<OutputIter>(return_out_iter);
4103 
4104  nonempty_range = (range_begin != range_end);
4105  this->max(nonempty_range);
4106  }
4107 }
4108 
4109 
4110 template <typename Context, typename Iter, typename OutputIter>
4111 inline void Communicator::allgather_packed_range(Context * context,
4112  Iter range_begin,
4113  const Iter range_end,
4114  OutputIter out_iter,
4115  std::size_t approx_buffer_size) const
4116 {
4117  typedef typename std::iterator_traits<Iter>::value_type T;
4118  typedef typename Packing<T>::buffer_type buffer_t;
4119 
4120  bool nonempty_range = (range_begin != range_end);
4121  this->max(nonempty_range);
4122 
4123  // OutputIter might not have operator= implemented; for maximum
4124  // compatibility we'll rely on its copy constructor.
4125  std::unique_ptr<OutputIter> next_out_iter =
4126  std::make_unique<OutputIter>(out_iter);
4127 
4128  while (nonempty_range)
4129  {
4130  // We will serialize variable size objects from *range_begin to
4131  // *range_end as a sequence of ints in this buffer
4132  std::vector<buffer_t> buffer;
4133 
4134  range_begin = pack_range
4135  (context, range_begin, range_end, buffer, approx_buffer_size);
4136 
4137  this->allgather(buffer, false);
4138 
4139  timpi_assert(buffer.size());
4140 
4141  auto return_out_iter = unpack_range
4142  (buffer, context, *next_out_iter, (T*)nullptr);
4143  next_out_iter = std::make_unique<OutputIter>(return_out_iter);
4144 
4145  nonempty_range = (range_begin != range_end);
4146  this->max(nonempty_range);
4147  }
4148 }
4149 
4150 
4151 
4152 template<typename T>
4153 inline Status Communicator::packed_range_probe (const unsigned int src_processor_id,
4154  const MessageTag & tag,
4155  bool & flag) const
4156 {
4157  TIMPI_LOG_SCOPE("packed_range_probe()", "Parallel");
4158 
4159  ignore(src_processor_id, tag); // unused in opt mode w/o MPI
4160 
4161  Status stat((StandardType<typename Packing<T>::buffer_type>()));
4162 
4163  int int_flag = 0;
4164 
4165  timpi_assert(src_processor_id < this->size() ||
4166  src_processor_id == any_source);
4167 
4168  timpi_call_mpi(MPI_Iprobe(int(src_processor_id),
4169  tag.value(),
4170  this->get(),
4171  &int_flag,
4172  stat.get()));
4173 
4174  flag = int_flag;
4175 
4176  return stat;
4177 }
4178 
4179 
4180 
4181 template <typename T, typename A,
4182  typename std::enable_if<std::is_base_of<DataType, StandardType<T>>::value, int>::type>
4183 inline bool Communicator::possibly_receive (unsigned int & src_processor_id,
4184  std::vector<T,A> & buf,
4185  Request & req,
4186  const MessageTag & tag) const
4187 {
4188  T * dataptr = buf.empty() ? nullptr : buf.data();
4189 
4190  return this->possibly_receive(src_processor_id, buf, StandardType<T>(dataptr), req, tag);
4191 }
4192 
4193 template <typename T, typename A,
4194  typename std::enable_if<Has_buffer_type<Packing<T>>::value, int>::type>
4195 inline bool Communicator::possibly_receive (unsigned int & src_processor_id,
4196  std::vector<T,A> & buf,
4197  Request & req,
4198  const MessageTag & tag) const
4199 {
4200  return this->possibly_receive_packed_range(src_processor_id,
4201  (void *)(nullptr),
4202  buf.begin(),
4203  (T *)(nullptr),
4204  req,
4205  tag);
4206 }
4207 
4208 
4209 
4210 template <typename T, typename A1, typename A2>
4211 inline bool Communicator::possibly_receive (unsigned int & src_processor_id,
4212  std::vector<std::vector<T,A1>,A2> & buf,
4213  Request & req,
4214  const MessageTag & tag) const
4215 {
4216  T * dataptr = buf.empty() ? nullptr : (buf[0].empty() ? nullptr : buf[0].data());
4217 
4218  return this->possibly_receive(src_processor_id, buf, StandardType<T>(dataptr), req, tag);
4219 }
4220 
4221 
4222 template <typename Context, typename OutputIter, typename T>
4223 inline bool Communicator::possibly_receive_packed_range (unsigned int & src_processor_id,
4224  Context * context,
4225  OutputIter out,
4226  const T * type,
4227  Request & req,
4228  const MessageTag & tag) const
4229 {
4230  TIMPI_LOG_SCOPE("possibly_receive_packed_range()", "Parallel");
4231 
4232  bool int_flag = 0;
4233 
4234  auto stat = packed_range_probe<T>(src_processor_id, tag, int_flag);
4235 
4236  if (int_flag)
4237  {
4238  src_processor_id = stat.source();
4239 
4240  nonblocking_receive_packed_range(src_processor_id,
4241  context,
4242  out,
4243  type,
4244  req,
4245  stat,
4246  tag);
4247 
4248  // The MessageTag should stay registered for the Request lifetime
4249  req.add_post_wait_work
4250  (new PostWaitDereferenceTag(tag));
4251  }
4252 
4253  timpi_assert(!int_flag || (int_flag &&
4254  src_processor_id < this->size() &&
4255  src_processor_id != any_source));
4256 
4257  return int_flag;
4258 }
4259 
4260 
4261 } // namespace TIMPI
4262 
4263 #endif // TIMPI_PARALLEL_IMPLEMENTATION_H
void gather_packed_range(const unsigned int root_id, Context *context, Iter range_begin, const Iter range_end, OutputIter out, std::size_t approx_buffer_size=1000000) const
Take a range of local variables, combine it with ranges from all processors, and write the output to ...
void add_prior_request(const Request &req)
Definition: request.C:190
data_type dataplusint_type< short int >()
MPI_Request request
Request object for non-blocking I/O.
Definition: request.h:41
void allgather(const T &send_data, std::vector< T, A > &recv_data) const
Take a vector of length this->size(), and fill in recv[processor_id] = the value of send on that proc...
void nonblocking_send_packed_range(const unsigned int dest_processor_id, const Context *context, Iter range_begin, const Iter range_end, Request &req, const MessageTag &tag=no_tag) const
Similar to the above Nonblocking send_packed_range with a few important differences: ...
void broadcast_packed_range(const Context *context1, Iter range_begin, const Iter range_end, OutputContext *context2, OutputIter out, const unsigned int root_id=0, std::size_t approx_buffer_size=1000000) const
Blocking-broadcast range-of-pointers to one processor.
OutputIter unpack_range(const std::vector< buffertype > &buffer, Context *context, OutputIter out_iter, const T *)
Helper function for range unpacking.
Definition: packing.h:1103
Status packed_range_probe(const unsigned int src_processor_id, const MessageTag &tag, bool &flag) const
Non-Blocking message probe for a packed range message.
void scatter(const std::vector< T, A > &data, T &recv, const unsigned int root_id=0) const
Take a vector of local variables and scatter the ith item to the ith processor in the communicator...
void map_max(Map &data) const
Private implementation function called by the map-based max() specializations.
void minloc(T &r, unsigned int &min_id) const
Take a local variable and replace it with the minimum of its values on all processors, returning the minimum rank of a processor which originally held the minimum value.
Types combined with an int.
int value() const
Definition: message_tag.h:92
void gather(const unsigned int root_id, const T &send_data, std::vector< T, A > &recv) const
Take a vector of length comm.size(), and on processor root_id fill in recv[processor_id] = the value ...
const MessageTag any_tag
Default message tag ids.
Definition: message_tag.h:114
void sum(T &r) const
Take a local variable and replace it with the sum of its values on all processors.
void alltoall(std::vector< T, A > &r) const
Effectively transposes the input vector across all processors.
processor_id_type rank() const
Definition: communicator.h:208
Encapsulates the MPI_Datatype.
Definition: data_type.h:50
void allgather_packed_range(Context *context, Iter range_begin, const Iter range_end, OutputIter out, std::size_t approx_buffer_size=1000000) const
Take a range of local variables, combine it with ranges from all processors, and write the output to ...
void map_broadcast(Map &data, const unsigned int root_id, const bool identical_sizes) const
Private implementation function called by the map-based broadcast() specializations.
Templated class to provide the appropriate MPI datatype for use with built-in C types or simple C++ c...
Definition: standard_type.h:83
void assign(const communicator &comm)
Utility function for setting our member variables from an MPI communicator.
Definition: communicator.C:185
data_type dataplusint_type< float >()
void ignore(const Args &...)
Definition: timpi_assert.h:54
int source() const
Definition: status.h:142
Define data types and (un)serialization functions for use when encoding a potentially-variable-size o...
Definition: packing.h:60
data_type dataplusint_type()
Templated function to return the appropriate MPI datatype for use with built-in C types when combined...
void send_packed_range(const unsigned int dest_processor_id, const Context *context, Iter range_begin, const Iter range_end, const MessageTag &tag=no_tag, std::size_t approx_buffer_size=1000000) const
Blocking-send range-of-pointers to one processor.
void nonblocking_receive_packed_range(const unsigned int src_processor_id, Context *context, OutputIter out, const T *output_type, Request &req, Status &stat, const MessageTag &tag=any_tag) const
Non-Blocking-receive range-of-pointers from one processor.
const unsigned int any_source
Processor id meaning "Accept from any source".
Definition: communicator.h:84
bool possibly_receive(unsigned int &src_processor_id, std::vector< T, A > &buf, Request &req, const MessageTag &tag) const
Nonblocking-receive from one processor with user-defined type.
Encapsulates the MPI_Comm object.
Definition: communicator.h:108
const MessageTag no_tag
Definition: message_tag.h:119
processor_id_type size() const
Definition: communicator.h:211
std::size_t packed_size_of(const std::vector< std::vector< T, A1 >, A2 > &buf, const DataType &type) const
uint8_t processor_id_type
Definition: communicator.h:54
Status receive(const unsigned int dest_processor_id, T &buf, const MessageTag &tag=any_tag) const
Blocking-receive from one processor with data-defined type.
Encapsulates the MPI tag integers.
Definition: message_tag.h:46
void min(const T &r, T &o, Request &req) const
Non-blocking minimum of the local value r into o with the request req.
void receive_packed_range(const unsigned int dest_processor_id, Context *context, OutputIter out, const T *output_type, const MessageTag &tag=any_tag) const
Blocking-receive range-of-pointers from one processor.
status probe(const unsigned int src_processor_id, const MessageTag &tag=any_tag) const
Blocking message probe.
Definition: communicator.C:283
int tag() const
Definition: status.h:151
data_type dataplusint_type< int >()
MPI_Count CountType
Definition: status.h:47
Status wait()
Definition: request.C:121
Iter pack_range(const Context *context, Iter range_begin, const Iter range_end, std::vector< buffertype > &buffer, std::size_t approx_buffer_size)
Helper function for range packing.
Definition: packing.h:1044
static const bool has_min_max
Definition: attributes.h:48
void maxloc(T &r, unsigned int &max_id) const
Take a local variable and replace it with the maximum of its values on all processors, returning the minimum rank of a processor which originally held the maximum value.
void send_receive(const unsigned int dest_processor_id, const T1 &send_data, const unsigned int source_processor_id, T2 &recv_data, const MessageTag &send_tag=no_tag, const MessageTag &recv_tag=any_tag) const
Send data send to one processor while simultaneously receiving other data recv from a (potentially di...
bool possibly_receive_packed_range(unsigned int &src_processor_id, Context *context, OutputIter out, const T *output_type, Request &req, const MessageTag &tag) const
Nonblocking packed range receive from one processor with user-defined type.
timpi_pure bool semiverify(const T *r) const
Check whether a local pointer points to the same value on all processors where it is not null...
static const bool is_fixed_type
Definition: data_type.h:130
data_type dataplusint_type< long >()
void broadcast(T &data, const unsigned int root_id=0, const bool identical_sizes=false) const
Take a local value and broadcast it to all processors.
void add_post_wait_work(PostWaitWork *work)
Definition: request.C:204
StandardType<T> specializations which do not define a way to MPI_Type T should inherit from this class...
Definition: data_type.h:120
data_type dataplusint_type< long double >()
CountType size(const data_type &type) const
Definition: status.h:161
void map_sum(Map &data) const
Private implementation function called by the map-based sum() specializations.
Encapsulates the MPI_Request.
Definition: request.h:67
timpi_pure bool verify(const T &r) const
Check whether a local variable has the same value on all processors, returning true if it does or fal...
void max(const T &r, T &o, Request &req) const
Non-blocking maximum of the local value r into o with the request req.
void send(const unsigned int dest_processor_id, const T &buf, const MessageTag &tag=no_tag) const
Blocking-send to one processor with data-defined type.
request * get()
Definition: request.h:84
data_type dataplusint_type< double >()
SendMode send_mode() const
Gets the user-requested SendMode.
Definition: communicator.h:338
void send_receive_packed_range(const unsigned int dest_processor_id, const Context1 *context1, RangeIter send_begin, const RangeIter send_end, const unsigned int source_processor_id, Context2 *context2, OutputIter out, const T *output_type, const MessageTag &send_tag=no_tag, const MessageTag &recv_tag=any_tag, std::size_t approx_buffer_size=1000000) const
Send a range-of-pointers to one processor while simultaneously receiving another range from a (potent...
Encapsulates the MPI_Status struct.
Definition: status.h:75
std::size_t packed_range_size(const Context *context, Iter range_begin, const Iter range_end)
Helper function for range packing.
Definition: packing.h:1023
std::pair< data_type, std::unique_ptr< StandardType< std::pair< T, int > > > > dataplusint_type_acquire()
status * get()
Definition: status.h:95
void set_union(T &data, const unsigned int root_id) const
Take a container (set, map, unordered_set, multimap, etc) of local variables on each processor...
Templated class to provide the appropriate MPI reduction operations for use with built-in C types or ...
Definition: op_function.h:120