TIMPI
parallel_implementation.h
Go to the documentation of this file.
1 // The TIMPI Message-Passing Parallelism Library.
2 // Copyright (C) 2002-2025 Benjamin S. Kirk, John W. Peterson, Roy H. Stogner
3 
4 // This library is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU Lesser General Public
6 // License as published by the Free Software Foundation; either
7 // version 2.1 of the License, or (at your option) any later version.
8 
9 // This library is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 // Lesser General Public License for more details.
13 
14 // You should have received a copy of the GNU Lesser General Public
15 // License along with this library; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 
18 
19 #ifndef TIMPI_PARALLEL_IMPLEMENTATION_H
20 #define TIMPI_PARALLEL_IMPLEMENTATION_H
21 
22 // TIMPI includes
23 #include "timpi/attributes.h"
24 #include "timpi/communicator.h"
25 #include "timpi/data_type.h"
26 #include "timpi/timpi_call_mpi.h"
27 #include "timpi/message_tag.h"
28 #include "timpi/op_function.h"
29 #include "timpi/packing.h"
30 #include "timpi/timpi_assert.h"
38 #include "timpi/post_wait_work.h"
39 #include "timpi/request.h"
40 #include "timpi/status.h"
41 #include "timpi/standard_type.h"
42 
43 #ifndef TIMPI_HAVE_MPI
45 #endif
46 
47 // Boost include if necessary for float128
48 #ifdef TIMPI_DEFAULT_QUADRUPLE_PRECISION
49 # include <boost/multiprecision/float128.hpp>
50 #endif
51 
52 // Disable libMesh logging until we decide how to port it best
53 // #include "libmesh/libmesh_logging.h"
54 #define TIMPI_LOG_SCOPE(f,c)
55 
56 // C++ includes
57 #include <complex>
58 #include <cstddef>
59 #include <iterator>
60 #include <limits>
61 #include <map>
62 #include <memory>
63 #include <set>
64 #include <string>
65 #include <utility>
66 #include <vector>
67 #include <type_traits>
68 
69 namespace TIMPI {
70 
72 
73 #ifdef TIMPI_HAVE_MPI
74 
80 template <typename T>
81 inline data_type dataplusint_type() { return MPI_DATATYPE_NULL; }
82 
83 #endif // TIMPI_HAVE_MPI
84 
// Plain aggregate pairing a value of type T with an int rank.
// NOTE(review): the class-head line was missing from this listing;
// the name data_plus_int should be confirmed against the real header.
template <typename T>
class data_plus_int
{
public:
  T val;
  int rank;
};
95 
96 } // namespace TIMPI
97 
98 
99 // Anonymous namespace for helper functions
100 namespace {
101 
// Internal helper function to create vector<something_usable> from
// vector<bool> for compatibility with MPI bitwise operations.
//
// Bit i of vec_in is stored at bit (i % bits-per-T) of element
// (i / bits-per-T) of vec_out.  vec_out is overwritten and resized to
// the smallest number of elements that can hold every input bit;
// unused high bits of the last element are zero.
//
// T must be an integer type.
template <typename T, typename A1, typename A2>
inline void pack_vector_bool(const std::vector<bool,A1> & vec_in,
                             std::vector<T,A2> & vec_out)
{
  // Shift in T's own (unsigned) width: shifting a plain 1u by 32 or
  // more positions would be undefined when T is wider than unsigned int.
  typedef typename std::make_unsigned<T>::type UnsignedT;

  const std::size_t data_bits = 8*sizeof(T);
  const std::size_t in_size = vec_in.size();
  const std::size_t out_size = in_size/data_bits + ((in_size%data_bits)?1:0);

  vec_out.clear();
  vec_out.resize(out_size);

  for (std::size_t i=0; i != in_size; ++i)
    if (vec_in[i])
      vec_out[i/data_bits] |=
        static_cast<T>(UnsignedT(1) << (i%data_bits));
}
120 
// Internal helper: the inverse of the bit packing above.  Reads bit
// (i % bits-per-T) of vec_in[i / bits-per-T] into vec_out[i].
//
// vec_out is NOT resized here: its current size says how many bits to
// extract, and vec_in must hold exactly the number of elements needed
// for that many bits (asserted).
template <typename T, typename A1, typename A2>
inline void unpack_vector_bool(const std::vector<T,A1> & vec_in,
                               std::vector<bool,A2> & vec_out)
{
  const unsigned int bits_per_word = 8*sizeof(T);
  const std::size_t n_bits = vec_out.size();

  // The output vector must already be properly sized
  timpi_assert_equal_to
    (n_bits/bits_per_word + (n_bits%bits_per_word?1:0), vec_in.size());

  for (std::size_t bit = 0; bit != n_bits; ++bit)
    {
      const std::size_t word = bit/bits_per_word;
      const std::size_t shift = bit%bits_per_word;
      vec_out[bit] = (vec_in[word] >> shift) & 1;
    }
}
141 
142 
143 #ifdef TIMPI_HAVE_MPI
144 // We use a helper function here to avoid ambiguity when calling
145 // send_receive of (vector<vector<T>>,vector<vector<T>>)
146 template <typename T1, typename T2, typename A1, typename A2, typename A3, typename A4>
147 inline void send_receive_vec_of_vec(const unsigned int dest_processor_id,
148  const std::vector<std::vector<T1,A1>,A2> & send_data,
149  const unsigned int source_processor_id,
150  std::vector<std::vector<T2,A3>,A4> & recv_data,
151  const TIMPI::MessageTag & send_tag,
152  const TIMPI::MessageTag & recv_tag,
153  const TIMPI::Communicator & comm)
154 {
155  TIMPI_LOG_SCOPE("send_receive()", "Parallel");
156 
157  if (dest_processor_id == comm.rank() &&
158  source_processor_id == comm.rank())
159  {
160  recv_data = send_data;
161  return;
162  }
163 
164  TIMPI::Request req;
165  comm.send (dest_processor_id, send_data, req, send_tag);
166  comm.receive (source_processor_id, recv_data, recv_tag);
167  req.wait();
168 }
169 
170 #endif // TIMPI_HAVE_MPI
171 
172 } // Anonymous namespace
173 
174 
175 
176 namespace TIMPI
177 {
178 
179 #ifdef TIMPI_HAVE_MPI
180 template<>
181 inline data_type dataplusint_type<short int>() { return MPI_SHORT_INT; }
182 
183 template<>
184 inline data_type dataplusint_type<int>() { return MPI_2INT; }
185 
186 template<>
187 inline data_type dataplusint_type<long>() { return MPI_LONG_INT; }
188 
189 template<>
190 inline data_type dataplusint_type<float>() { return MPI_FLOAT_INT; }
191 
192 template<>
193 inline data_type dataplusint_type<double>() { return MPI_DOUBLE_INT; }
194 
195 template<>
196 inline data_type dataplusint_type<long double>() { return MPI_LONG_DOUBLE_INT; }
197 
198 template <typename T>
199 inline
200 std::pair<data_type, std::unique_ptr<StandardType<std::pair<T,int>>>>
202 {
203  std::pair<data_type, std::unique_ptr<StandardType<std::pair<T,int>>>> return_val;
204  return_val.first = dataplusint_type<T>();
205  if (return_val.first == MPI_DATATYPE_NULL)
206  {
207  return_val.second.reset(new StandardType<std::pair<T,int>>());
208  return_val.first = *return_val.second;
209  }
210  return return_val;
211 }
212 
213 
214 
// Select between the "_c" MPI bindings and the classic bindings.
// With MPI_VERSION > 3 the TIMPI_* macros name the MPI_*_c functions,
// counts use MPI_COUNT and displacements use MPI_Aint; otherwise they
// name the classic functions, with MPI_INT counts and int displacements.
#if MPI_VERSION > 3
  using DispType = MPI_Aint;
# define TIMPI_COUNT_TYPE MPI_COUNT
# define TIMPI_PACK_SIZE MPI_Pack_size_c
# define TIMPI_SEND MPI_Send_c
# define TIMPI_SSEND MPI_Ssend_c
# define TIMPI_ALLREDUCE MPI_Allreduce_c
# define TIMPI_IALLREDUCE MPI_Iallreduce_c
# define TIMPI_ISEND MPI_Isend_c
# define TIMPI_ISSEND MPI_Issend_c
# define TIMPI_PACK MPI_Pack_c
# define TIMPI_UNPACK MPI_Unpack_c
# define TIMPI_RECV MPI_Recv_c
# define TIMPI_IRECV MPI_Irecv_c
# define TIMPI_SENDRECV MPI_Sendrecv_c
# define TIMPI_ALLGATHERV MPI_Allgatherv_c
# define TIMPI_ALLGATHER MPI_Allgather_c
# define TIMPI_BCAST MPI_Bcast_c
# define TIMPI_GATHER MPI_Gather_c
# define TIMPI_GATHERV MPI_Gatherv_c
# define TIMPI_SCATTER MPI_Scatter_c
# define TIMPI_SCATTERV MPI_Scatterv_c
# define TIMPI_ALLTOALL MPI_Alltoall_c
#else
  using DispType = int;
# define TIMPI_COUNT_TYPE MPI_INT
# define TIMPI_PACK_SIZE MPI_Pack_size
# define TIMPI_SEND MPI_Send
# define TIMPI_SSEND MPI_Ssend
# define TIMPI_ALLREDUCE MPI_Allreduce
# define TIMPI_IALLREDUCE MPI_Iallreduce
# define TIMPI_ISEND MPI_Isend
# define TIMPI_ISSEND MPI_Issend
# define TIMPI_PACK MPI_Pack
# define TIMPI_UNPACK MPI_Unpack
# define TIMPI_RECV MPI_Recv
# define TIMPI_IRECV MPI_Irecv
# define TIMPI_SENDRECV MPI_Sendrecv
# define TIMPI_ALLGATHERV MPI_Allgatherv
# define TIMPI_ALLGATHER MPI_Allgather
# define TIMPI_BCAST MPI_Bcast
# define TIMPI_GATHER MPI_Gather
# define TIMPI_GATHERV MPI_Gatherv
# define TIMPI_SCATTER MPI_Scatter
# define TIMPI_SCATTERV MPI_Scatterv
# define TIMPI_ALLTOALL MPI_Alltoall
#endif
262 
263 
264 
265 template <typename T, typename A1, typename A2>
266 std::size_t Communicator::packed_size_of(const std::vector<std::vector<T,A1>,A2> & buf,
267  const DataType & type) const
268 {
269  // Figure out how many bytes we need to pack all the data
270  //
271  // Start with the outer buffer size
272  CountType packedsize=0;
273 
274  timpi_call_mpi
275  (TIMPI_PACK_SIZE (1, TIMPI_COUNT_TYPE, this->get(), &packedsize));
276 
277  std::size_t sendsize = packedsize;
278 
279  const std::size_t n_vecs = buf.size();
280 
281  for (std::size_t i = 0; i != n_vecs; ++i)
282  {
283  // The size of the ith inner buffer
284  timpi_call_mpi
285  (TIMPI_PACK_SIZE (1, TIMPI_COUNT_TYPE, this->get(), &packedsize));
286 
287  sendsize += packedsize;
288 
289  // The data for each inner buffer
290  timpi_call_mpi
291  (TIMPI_PACK_SIZE (cast_int<CountType>(buf[i].size()),
292  type, this->get(), &packedsize));
293 
294  sendsize += packedsize;
295  }
296 
297  timpi_assert (sendsize /* should at least be 1! */);
298  return sendsize;
299 }
300 
301 
302 template<typename T>
303 inline void Communicator::send (const unsigned int dest_processor_id,
304  const std::basic_string<T> & buf,
305  const MessageTag & tag) const
306 {
307  TIMPI_LOG_SCOPE("send()", "Parallel");
308 
309  T * dataptr = buf.empty() ? nullptr : const_cast<T *>(buf.data());
310 
311  timpi_assert_less(dest_processor_id, this->size());
312 
313  timpi_call_mpi
314  (((this->send_mode() == SYNCHRONOUS) ?
315  TIMPI_SSEND : TIMPI_SEND)
316  (dataptr, cast_int<CountType>(buf.size()),
317  StandardType<T>(dataptr), dest_processor_id, tag.value(),
318  this->get()));
319 }
320 
321 
322 
323 template <typename T>
324 inline void Communicator::send (const unsigned int dest_processor_id,
325  const std::basic_string<T> & buf,
326  Request & req,
327  const MessageTag & tag) const
328 {
329  TIMPI_LOG_SCOPE("send()", "Parallel");
330 
331  T * dataptr = buf.empty() ? nullptr : const_cast<T *>(buf.data());
332 
333  timpi_assert_less(dest_processor_id, this->size());
334 
335  timpi_call_mpi
336  (((this->send_mode() == SYNCHRONOUS) ?
337  TIMPI_ISSEND : TIMPI_ISEND)
338  (dataptr, cast_int<CountType>(buf.size()),
339  StandardType<T>(dataptr), dest_processor_id, tag.value(),
340  this->get(), req.get()));
341 
342  // The MessageTag should stay registered for the Request lifetime
344  (new PostWaitDereferenceTag(tag));
345 }
346 
347 
348 
349 template <typename T>
350 inline void Communicator::send (const unsigned int dest_processor_id,
351  const T & buf,
352  const MessageTag & tag) const
353 {
354  TIMPI_LOG_SCOPE("send()", "Parallel");
355 
356  T * dataptr = const_cast<T*> (&buf);
357 
358  timpi_assert_less(dest_processor_id, this->size());
359 
360  timpi_call_mpi
361  (((this->send_mode() == SYNCHRONOUS) ?
362  TIMPI_SSEND : TIMPI_SEND)
363  (dataptr, 1, StandardType<T>(dataptr), dest_processor_id,
364  tag.value(), this->get()));
365 }
366 
367 
368 
369 template <typename T>
370 inline void Communicator::send (const unsigned int dest_processor_id,
371  const T & buf,
372  Request & req,
373  const MessageTag & tag) const
374 {
375  TIMPI_LOG_SCOPE("send()", "Parallel");
376 
377  T * dataptr = const_cast<T*>(&buf);
378 
379  timpi_assert_less(dest_processor_id, this->size());
380 
381  timpi_call_mpi
382  (((this->send_mode() == SYNCHRONOUS) ?
383  TIMPI_ISSEND : TIMPI_ISEND)
384  (dataptr, 1, StandardType<T>(dataptr), dest_processor_id,
385  tag.value(), this->get(), req.get()));
386 
387  // The MessageTag should stay registered for the Request lifetime
389  (new PostWaitDereferenceTag(tag));
390 }
391 
392 
393 
394 template <typename T, typename C, typename A>
395 inline void Communicator::send (const unsigned int dest_processor_id,
396  const std::set<T,C,A> & buf,
397  const MessageTag & tag) const
398 {
399  this->send(dest_processor_id, buf,
400  StandardType<T>(buf.empty() ? nullptr : &(*buf.begin())), tag);
401 }
402 
403 
404 
405 template <typename T, typename C, typename A>
406 inline void Communicator::send (const unsigned int dest_processor_id,
407  const std::set<T,C,A> & buf,
408  Request & req,
409  const MessageTag & tag) const
410 {
411  this->send(dest_processor_id, buf,
412  StandardType<T>(buf.empty() ? nullptr : &(*buf.begin())), req, tag);
413 }
414 
415 
416 
417 template <typename T, typename C, typename A>
418 inline void Communicator::send (const unsigned int dest_processor_id,
419  const std::set<T,C,A> & buf,
420  const DataType & type,
421  const MessageTag & tag) const
422 {
423  TIMPI_LOG_SCOPE("send()", "Parallel");
424 
425  std::vector<T> vecbuf(buf.begin(), buf.end());
426  this->send(dest_processor_id, vecbuf, type, tag);
427 }
428 
429 
430 
431 template <typename T, typename C, typename A>
432 inline void Communicator::send (const unsigned int dest_processor_id,
433  const std::set<T,C,A> & buf,
434  const DataType & type,
435  Request & req,
436  const MessageTag & tag) const
437 {
438  TIMPI_LOG_SCOPE("send()", "Parallel");
439 
440  // Allocate temporary buffer on the heap so it lives until after
441  // the non-blocking send completes
442  std::vector<T> * vecbuf =
443  new std::vector<T,A>(buf.begin(), buf.end());
444 
445  // Make the Request::wait() handle deleting the buffer
446  req.add_post_wait_work
447  (new PostWaitDeleteBuffer<std::vector<T,A>>(vecbuf));
448 
449  this->send(dest_processor_id, *vecbuf, type, req, tag);
450 }
451 
452 
453 
454 template <typename T, typename A>
455 inline void Communicator::send (const unsigned int dest_processor_id,
456  const std::vector<T,A> & buf,
457  const MessageTag & tag) const
458 {
459  this->send(dest_processor_id, buf,
460  StandardType<T>(buf.empty() ? nullptr : &buf.front()), tag);
461 }
462 
463 
464 
465 template <typename T, typename A,
466  typename std::enable_if<std::is_base_of<DataType, StandardType<T>>::value, int>::type>
467 inline void Communicator::send (const unsigned int dest_processor_id,
468  const std::vector<T,A> & buf,
469  Request & req,
470  const MessageTag & tag) const
471 {
472  this->send(dest_processor_id, buf,
473  StandardType<T>(buf.empty() ? nullptr : &buf.front()), req, tag);
474 }
475 
476 template <typename T, typename A,
477  typename std::enable_if<Has_buffer_type<Packing<T>>::value, int>::type>
478 inline void Communicator::send (const unsigned int dest_processor_id,
479  const std::vector<T,A> & buf,
480  Request & req,
481  const MessageTag & tag) const
482 {
483  this->nonblocking_send_packed_range(dest_processor_id,
484  (void *)(nullptr),
485  buf.begin(),
486  buf.end(),
487  req,
488  tag);
489 }
490 
491 
492 template <typename T, typename A>
493 inline void Communicator::send (const unsigned int dest_processor_id,
494  const std::vector<T,A> & buf,
495  const DataType & type,
496  const MessageTag & tag) const
497 {
498  TIMPI_LOG_SCOPE("send()", "Parallel");
499 
500  timpi_call_mpi
501  (((this->send_mode() == SYNCHRONOUS) ?
502  TIMPI_SSEND : TIMPI_SEND)
503  (buf.empty() ? nullptr : const_cast<T*>(buf.data()),
504  cast_int<CountType>(buf.size()), type, dest_processor_id,
505  tag.value(), this->get()));
506 }
507 
508 
509 
510 template <typename T, typename A, typename std::enable_if<std::is_base_of<DataType, StandardType<T>>::value, int>::type>
511 inline void Communicator::send (const unsigned int dest_processor_id,
512  const std::vector<T,A> & buf,
513  const DataType & type,
514  Request & req,
515  const MessageTag & tag) const
516 {
517  TIMPI_LOG_SCOPE("send()", "Parallel");
518 
519  timpi_assert_less(dest_processor_id, this->size());
520 
521  timpi_call_mpi
522  (((this->send_mode() == SYNCHRONOUS) ?
523  TIMPI_ISSEND : TIMPI_ISEND)
524  (buf.empty() ? nullptr : const_cast<T*>(buf.data()),
525  cast_int<CountType>(buf.size()), type, dest_processor_id,
526  tag.value(), this->get(), req.get()));
527 
528  // The MessageTag should stay registered for the Request lifetime
529  req.add_post_wait_work
530  (new PostWaitDereferenceTag(tag));
531 }
532 
533 template <typename T, typename A, typename std::enable_if<Has_buffer_type<Packing<T>>::value, int>::type>
534 inline void Communicator::send (const unsigned int dest_processor_id,
535  const std::vector<T,A> & buf,
536  const NotADataType &,
537  Request & req,
538  const MessageTag & tag) const
539 {
540  TIMPI_LOG_SCOPE("send()", "Parallel");
541 
542  timpi_assert_less(dest_processor_id, this->size());
543 
544  this->nonblocking_send_packed_range(dest_processor_id,
545  (void *)(nullptr),
546  buf.begin(),
547  buf.end(),
548  req,
549  tag);
550 }
551 
552 
553 
554 template <typename T, typename A1, typename A2>
555 inline void Communicator::send (const unsigned int dest_processor_id,
556  const std::vector<std::vector<T,A1>,A2> & buf,
557  const MessageTag & tag) const
558 {
559  this->send(dest_processor_id, buf,
560  StandardType<T>((buf.empty() || buf.front().empty()) ?
561  nullptr : &(buf.front().front())), tag);
562 }
563 
564 
565 
566 template <typename T, typename A1, typename A2>
567 inline void Communicator::send (const unsigned int dest_processor_id,
568  const std::vector<std::vector<T,A1>,A2> & buf,
569  Request & req,
570  const MessageTag & tag) const
571 {
572  this->send(dest_processor_id, buf,
573  StandardType<T>((buf.empty() || buf.front().empty()) ?
574  nullptr : &(buf.front().front())), req, tag);
575 }
576 
577 
578 
579 template <typename T, typename A1, typename A2>
580 inline void Communicator::send (const unsigned int dest_processor_id,
581  const std::vector<std::vector<T,A1>,A2> & buf,
582  const DataType & type,
583  const MessageTag & tag) const
584 {
585  // We'll avoid redundant code (at the cost of using heap rather
586  // than stack buffer allocation) by reusing the non-blocking send
587  Request req;
588  this->send(dest_processor_id, buf, type, req, tag);
589  req.wait();
590 }
591 
592 
593 
594 template <typename T, typename A1, typename A2>
595 inline void Communicator::send (const unsigned int dest_processor_id,
596  const std::vector<std::vector<T,A1>,A2> & send_vecs,
597  const DataType & type,
598  Request & req,
599  const MessageTag & tag) const
600 {
601  // figure out how many bytes we need to pack all the data
602  const CountType sendsize =
603  cast_int<CountType>(this->packed_size_of(send_vecs, type));
604 
605  // temporary buffer - this will be sized in bytes
606  // and manipulated with MPI_Pack
607  std::vector<char> * sendbuf = new std::vector<char>(sendsize);
608 
609  // Pack the send buffer
610  CountType pos=0;
611 
612  // ... the size of the outer buffer
613  const std::size_t n_vecs = send_vecs.size();
614  const CountType mpi_n_vecs = cast_int<CountType>(n_vecs);
615 
616  timpi_call_mpi
617  (TIMPI_PACK (&mpi_n_vecs, 1, TIMPI_COUNT_TYPE, sendbuf->data(),
618  sendsize, &pos, this->get()));
619 
620  for (std::size_t i = 0; i != n_vecs; ++i)
621  {
622  // ... the size of the ith inner buffer
623  const CountType subvec_size =
624  cast_int<CountType>(send_vecs[i].size());
625 
626  timpi_call_mpi
627  (TIMPI_PACK (&subvec_size, 1, TIMPI_COUNT_TYPE,
628  sendbuf->data(), sendsize, &pos, this->get()));
629 
630  // ... the contents of the ith inner buffer
631  if (!send_vecs[i].empty())
632  timpi_call_mpi
633  (TIMPI_PACK (const_cast<T*>(send_vecs[i].data()),
634  subvec_size, type, sendbuf->data(), sendsize,
635  &pos, this->get()));
636  }
637 
638  timpi_assert_equal_to (pos, sendsize);
639 
640  req.add_post_wait_work
641  (new PostWaitDeleteBuffer<std::vector<char>> (sendbuf));
642 
643  this->send (dest_processor_id, *sendbuf, MPI_PACKED, req, tag);
644 }
645 
646 
647 template <typename Context, typename Iter>
648 inline void Communicator::send_packed_range (const unsigned int dest_processor_id,
649  const Context * context,
650  Iter range_begin,
651  const Iter range_end,
652  const MessageTag & tag,
653  std::size_t approx_buffer_size) const
654 {
655  // We will serialize variable size objects from *range_begin to
656  // *range_end as a sequence of plain data (e.g. ints) in this buffer
657  typedef typename std::iterator_traits<Iter>::value_type T;
658 
659  std::size_t total_buffer_size =
660  packed_range_size (context, range_begin, range_end);
661 
662  this->send(dest_processor_id, total_buffer_size, tag);
663 
664 #ifdef DEBUG
665  std::size_t used_buffer_size = 0;
666 #endif
667 
668  while (range_begin != range_end)
669  {
670  timpi_assert_greater (std::distance(range_begin, range_end), 0);
671 
672  std::vector<typename Packing<T>::buffer_type> buffer;
673 
674  const Iter next_range_begin = pack_range
675  (context, range_begin, range_end, buffer, approx_buffer_size);
676 
677  timpi_assert_greater (std::distance(range_begin, next_range_begin), 0);
678 
679  range_begin = next_range_begin;
680 
681 #ifdef DEBUG
682  used_buffer_size += buffer.size();
683 #endif
684 
685  // Blocking send of the buffer
686  this->send(dest_processor_id, buffer, tag);
687  }
688 
689 #ifdef DEBUG
690  timpi_assert_equal_to(used_buffer_size, total_buffer_size);
691 #endif
692 }
693 
694 
695 template <typename Context, typename Iter>
696 inline void Communicator::send_packed_range (const unsigned int dest_processor_id,
697  const Context * context,
698  Iter range_begin,
699  const Iter range_end,
700  Request & req,
701  const MessageTag & tag,
702  std::size_t approx_buffer_size) const
703 {
704  // Allocate a buffer on the heap so we don't have to free it until
705  // after the Request::wait()
706  typedef typename std::iterator_traits<Iter>::value_type T;
707  typedef typename Packing<T>::buffer_type buffer_t;
708 
709  std::size_t total_buffer_size =
710  packed_range_size (context, range_begin, range_end);
711 
712  // That local variable will be gone soon; we need a send buffer that
713  // will stick around. I heard you like buffering so I put a buffer
714  // for your buffer size so you can buffer the size of your buffer.
715  std::size_t * total_buffer_size_buffer = new std::size_t;
716  *total_buffer_size_buffer = total_buffer_size;
717 
718  // Delete the buffer size's buffer when we're done
719  Request intermediate_req = request();
720  intermediate_req.add_post_wait_work
721  (new PostWaitDeleteBuffer<std::size_t>(total_buffer_size_buffer));
722  this->send(dest_processor_id, *total_buffer_size_buffer, intermediate_req, tag);
723 
724  // And don't finish up the full request until we're done with its
725  // dependencies
726  req.add_prior_request(intermediate_req);
727 
728 #ifdef DEBUG
729  std::size_t used_buffer_size = 0;
730 #endif
731 
732  while (range_begin != range_end)
733  {
734  timpi_assert_greater (std::distance(range_begin, range_end), 0);
735 
736  std::vector<buffer_t> * buffer = new std::vector<buffer_t>();
737 
738  const Iter next_range_begin = pack_range
739  (context, range_begin, range_end, *buffer, approx_buffer_size);
740 
741  timpi_assert_greater (std::distance(range_begin, next_range_begin), 0);
742 
743  range_begin = next_range_begin;
744 
745 #ifdef DEBUG
746  used_buffer_size += buffer->size();
747 #endif
748 
749  Request next_intermediate_req;
750 
751  Request * my_req = (range_begin == range_end) ? &req : &next_intermediate_req;
752 
753  // Make the Request::wait() handle deleting the buffer
754  my_req->add_post_wait_work
755  (new PostWaitDeleteBuffer<std::vector<buffer_t>>
756  (buffer));
757 
758  // Non-blocking send of the buffer
759  this->send(dest_processor_id, *buffer, *my_req, tag);
760 
761  if (range_begin != range_end)
762  req.add_prior_request(*my_req);
763  }
764 }
765 
766 
767 
768 
769 
770 
771 
772 template <typename Context, typename Iter>
773 inline void Communicator::nonblocking_send_packed_range (const unsigned int dest_processor_id,
774  const Context * context,
775  Iter range_begin,
776  const Iter range_end,
777  Request & req,
778  const MessageTag & tag) const
779 {
780  // Allocate a buffer on the heap so we don't have to free it until
781  // after the Request::wait()
782  typedef typename std::iterator_traits<Iter>::value_type T;
783  typedef typename Packing<T>::buffer_type buffer_t;
784 
785  if (range_begin != range_end)
786  {
787  std::vector<buffer_t> * buffer = new std::vector<buffer_t>();
788 
789  range_begin =
790  pack_range(context,
791  range_begin,
792  range_end,
793  *buffer,
794  // MPI-2/3 can only use signed integers for size,
795  // and with this API we need to fit a non-blocking
796  // send into one buffer
797  std::numeric_limits<CountType>::max());
798 
799  if (range_begin != range_end)
800  timpi_error_msg("Non-blocking packed range sends cannot exceed " << std::numeric_limits<CountType>::max() << "in size");
801 
802  // Make the Request::wait() handle deleting the buffer
804  (new PostWaitDeleteBuffer<std::vector<buffer_t>>
805  (buffer));
806 
807  // Non-blocking send of the buffer
808  this->send(dest_processor_id, *buffer, req, tag);
809  }
810 }
811 
812 
813 template <typename T>
814 inline Status Communicator::receive (const unsigned int src_processor_id,
815  std::basic_string<T> & buf,
816  const MessageTag & tag) const
817 {
818  std::vector<T> tempbuf; // Officially C++ won't let us get a
819  // modifiable array from a string
820 
821  Status stat = this->receive(src_processor_id, tempbuf, tag);
822  buf.assign(tempbuf.begin(), tempbuf.end());
823  return stat;
824 }
825 
826 
827 
828 template <typename T>
829 inline void Communicator::receive (const unsigned int src_processor_id,
830  std::basic_string<T> & buf,
831  Request & req,
832  const MessageTag & tag) const
833 {
834  // Officially C++ won't let us get a modifiable array from a
835  // string, and we can't even put one on the stack for the
836  // non-blocking case.
837  std::vector<T> * tempbuf = new std::vector<T>();
838 
839  // We can clear the string, but the Request::wait() will need to
840  // handle copying our temporary buffer to it
841  buf.clear();
842 
844  (new PostWaitCopyBuffer<std::vector<T>,
845  std::back_insert_iterator<std::basic_string<T>>>
846  (tempbuf, std::back_inserter(buf)));
847 
848  // Make the Request::wait() then handle deleting the buffer
850  (new PostWaitDeleteBuffer<std::vector<T>>(tempbuf));
851 
852  this->receive(src_processor_id, tempbuf, req, tag);
853 }
854 
855 
856 
857 template <typename T>
858 inline Status Communicator::receive (const unsigned int src_processor_id,
859  T & buf,
860  const MessageTag & tag) const
861 {
862  TIMPI_LOG_SCOPE("receive()", "Parallel");
863 
864  // Get the status of the message, explicitly provide the
865  // datatype so we can later query the size
866  Status stat(this->probe(src_processor_id, tag), StandardType<T>(&buf));
867 
868  timpi_assert(src_processor_id < this->size() ||
869  src_processor_id == any_source);
870 
871  timpi_call_mpi
872  (TIMPI_RECV (&buf, 1, StandardType<T>(&buf), src_processor_id,
873  tag.value(), this->get(), stat.get()));
874 
875  return stat;
876 }
877 
878 
879 
880 template <typename T>
881 inline void Communicator::receive (const unsigned int src_processor_id,
882  T & buf,
883  Request & req,
884  const MessageTag & tag) const
885 {
886  TIMPI_LOG_SCOPE("receive()", "Parallel");
887 
888  timpi_assert(src_processor_id < this->size() ||
889  src_processor_id == any_source);
890 
891  timpi_call_mpi
892  (TIMPI_IRECV (&buf, 1, StandardType<T>(&buf), src_processor_id,
893  tag.value(), this->get(), req.get()));
894 
895  // The MessageTag should stay registered for the Request lifetime
897  (new PostWaitDereferenceTag(tag));
898 }
899 
900 
901 
902 template <typename T, typename C, typename A>
903 inline Status Communicator::receive (const unsigned int src_processor_id,
904  std::set<T,C,A> & buf,
905  const MessageTag & tag) const
906 {
907  return this->receive
908  (src_processor_id, buf,
909  StandardType<T>(buf.empty() ? nullptr : &(*buf.begin())), tag);
910 }
911 
912 
913 
914 /*
915  * No non-blocking receives of std::set until we figure out how to
916  * resize the temporary buffer
917  */
918 #if 0
919 template <typename T, typename C, typename A>
920 inline void Communicator::receive (const unsigned int src_processor_id,
921  std::set<T,C,A> & buf,
922  Request & req,
923  const MessageTag & tag) const
924 {
925  this->receive (src_processor_id, buf,
926  StandardType<T>(buf.empty() ? nullptr : &(*buf.begin())), req, tag);
927 }
928 #endif // 0
929 
930 
931 
932 template <typename T, typename C, typename A>
933 inline Status Communicator::receive (const unsigned int src_processor_id,
934  std::set<T,C,A> & buf,
935  const DataType & type,
936  const MessageTag & tag) const
937 {
938  TIMPI_LOG_SCOPE("receive()", "Parallel");
939 
940  std::vector<T> vecbuf;
941  Status stat = this->receive(src_processor_id, vecbuf, type, tag);
942  buf.clear();
943  buf.insert(vecbuf.begin(), vecbuf.end());
944 
945  return stat;
946 }
947 
948 
949 
950 /*
951  * No non-blocking receives of std::set until we figure out how to
952  * resize the temporary buffer
953  */
954 #if 0
955 template <typename T, typename C, typename A>
956 inline void Communicator::receive (const unsigned int src_processor_id,
957  std::set<T,C,A> & buf,
958  const DataType & type,
959  Request & req,
960  const MessageTag & tag) const
961 {
962  TIMPI_LOG_SCOPE("receive()", "Parallel");
963 
964 // Allocate temporary buffer on the heap so it lives until after
965 // the non-blocking receive completes
966  std::vector<T> * vecbuf = new std::vector<T>();
967 
968  // We can clear the set, but the Request::wait() will need to
969  // handle copying our temporary buffer to it
970  buf.clear();
971 
972  req.add_post_wait_work
973  (new PostWaitCopyBuffer<std::vector<T>,
974  std::insert_iterator<std::set<T,C,A>>>
975  (*vecbuf, std::inserter(buf,buf.end())));
976 
977  // Make the Request::wait() then handle deleting the buffer
978  req.add_post_wait_work
979  (new PostWaitDeleteBuffer<std::vector<T>>(vecbuf));
980 
981  this->receive(src_processor_id, *vecbuf, type, req, tag);
982 }
983 #endif // 0
984 
985 
986 
987 template <typename T, typename A>
988 inline Status Communicator::receive (const unsigned int src_processor_id,
989  std::vector<T,A> & buf,
990  const MessageTag & tag) const
991 {
992  return this->receive
993  (src_processor_id, buf,
994  StandardType<T>(buf.empty() ? nullptr : &(*buf.begin())), tag);
995 }
996 
997 
998 
999 template <typename T, typename A>
1000 inline void Communicator::receive (const unsigned int src_processor_id,
1001  std::vector<T,A> & buf,
1002  Request & req,
1003  const MessageTag & tag) const
1004 {
1005  this->receive (src_processor_id, buf,
1006  StandardType<T>(buf.empty() ? nullptr : &(*buf.begin())), req, tag);
1007 }
1008 
1009 
1010 
1011 template <typename T, typename A>
1012 inline Status Communicator::receive (const unsigned int src_processor_id,
1013  std::vector<T,A> & buf,
1014  const DataType & type,
1015  const MessageTag & tag) const
1016 {
1017  TIMPI_LOG_SCOPE("receive()", "Parallel");
1018 
1019  // Get the status of the message, explicitly provide the
1020  // datatype so we can later query the size
1021  Status stat(this->probe(src_processor_id, tag), type);
1022 
1023  buf.resize(stat.size());
1024 
1025  timpi_assert(src_processor_id < this->size() ||
1026  src_processor_id == any_source);
1027 
1028  // Use stat.source() and stat.tag() in the receive - if
1029  // src_processor_id is or tag is "any" then we want to be sure we
1030  // try to receive the same message we just probed.
1031  timpi_call_mpi
1032  (TIMPI_RECV (buf.empty() ? nullptr : buf.data(),
1033  cast_int<CountType>(buf.size()), type, stat.source(),
1034  stat.tag(), this->get(), stat.get()));
1035 
1036  timpi_assert_equal_to (cast_int<std::size_t>(stat.size()),
1037  buf.size());
1038 
1039  return stat;
1040 }
1041 
1042 
1043 
1044 template <typename T, typename A,
1045  typename std::enable_if<Has_buffer_type<Packing<T>>::value, int>::type>
1046 Status Communicator::receive (const unsigned int src_processor_id,
1047  std::vector<T,A> & buf,
1048  const DataType & type,
1049  const MessageTag & tag) const
1050 {
1051  bool flag = false;
1052  Status stat;
1053  while (!flag)
1054  stat = this->packed_range_probe<T>(src_processor_id, tag, flag);
1055 
1056  Request req;
1057  this->nonblocking_receive_packed_range(src_processor_id, (void *)(nullptr),
1058  std::inserter(buf, buf.end()),
1059  type, req, stat, tag);
1060  req.wait();
1061 
1062  return stat;
1063 }
1064 
1065 
1066 template <typename T, typename A,
1067  typename std::enable_if<Has_buffer_type<Packing<T>>::value, int>::type>
1068 Status Communicator::receive (const unsigned int src_processor_id,
1069  std::vector<T,A> & buf,
1070  const NotADataType &,
1071  const MessageTag & tag) const
1072 {
1073  bool flag = false;
1074  Status stat;
1075  while (!flag)
1076  stat = this->packed_range_probe<T>(src_processor_id, tag, flag);
1077 
1078  Request req;
1079  this->nonblocking_receive_packed_range(src_processor_id, (void *)(nullptr),
1080  std::inserter(buf, buf.end()),
1081  buf.data(), req, stat, tag);
1082  req.wait();
1083 
1084  return stat;
1085 }
1086 
1087 
1088 template <typename T, typename A>
1089 inline void Communicator::receive (const unsigned int src_processor_id,
1090  std::vector<T,A> & buf,
1091  const DataType & type,
1092  Request & req,
1093  const MessageTag & tag) const
1094 {
1095  TIMPI_LOG_SCOPE("receive()", "Parallel");
1096 
1097  timpi_assert(src_processor_id < this->size() ||
1098  src_processor_id == any_source);
1099 
1100  timpi_call_mpi
1101  (TIMPI_IRECV(buf.empty() ? nullptr : buf.data(),
1102  cast_int<CountType>(buf.size()), type, src_processor_id,
1103  tag.value(), this->get(), req.get()));
1104 
1105  // The MessageTag should stay registered for the Request lifetime
1106  req.add_post_wait_work
1107  (new PostWaitDereferenceTag(tag));
1108 }
1109 
1110 
1111 
1112 template <typename T, typename A1, typename A2>
1113 inline Status Communicator::receive (const unsigned int src_processor_id,
1114  std::vector<std::vector<T,A1>,A2> & buf,
1115  const MessageTag & tag) const
1116 {
1117  return this->receive
1118  (src_processor_id, buf,
1119  StandardType<T>((buf.empty() || buf.front().empty()) ?
1120  nullptr : &(buf.front().front())), tag);
1121 }
1122 
1123 
1124 
1125 template <typename T, typename A1, typename A2>
1126 inline void Communicator::receive (const unsigned int src_processor_id,
1127  std::vector<std::vector<T,A1>,A2> & buf,
1128  Request & req,
1129  const MessageTag & tag) const
1130 {
1131  this->receive (src_processor_id, buf,
1132  StandardType<T>((buf.empty() || buf.front().empty()) ?
1133  nullptr : &(buf.front().front())), req, tag);
1134 }
1135 
1136 
1137 
1138 template <typename T, typename A1, typename A2>
1139 inline Status Communicator::receive (const unsigned int src_processor_id,
1140  std::vector<std::vector<T,A1>,A2> & recv,
1141  const DataType & type,
1142  const MessageTag & tag) const
1143 {
1144  // temporary buffer - this will be sized in bytes
1145  // and manipulated with MPI_Unpack
1146  std::vector<char> recvbuf;
1147 
1148  Status stat = this->receive (src_processor_id, recvbuf, MPI_PACKED, tag);
1149 
1150  // We should at least have one header datum, for outer vector size
1151  timpi_assert (!recvbuf.empty());
1152 
1153  // Unpack the received buffer
1154  CountType bufsize = cast_int<CountType>(recvbuf.size());
1155  CountType recvsize, pos=0;
1156  timpi_call_mpi
1157  (TIMPI_UNPACK(recvbuf.data(), bufsize, &pos, &recvsize, 1,
1158  TIMPI_COUNT_TYPE, this->get()));
1159 
1160  // ... size the outer buffer
1161  recv.resize (recvsize);
1162 
1163  const std::size_t n_vecs = recvsize;
1164  for (std::size_t i = 0; i != n_vecs; ++i)
1165  {
1166  CountType subvec_size;
1167 
1168  timpi_call_mpi
1169  (TIMPI_UNPACK (recvbuf.data(), bufsize, &pos, &subvec_size, 1,
1170  TIMPI_COUNT_TYPE, this->get()));
1171 
1172  // ... size the inner buffer
1173  recv[i].resize (subvec_size);
1174 
1175  // ... unpack the inner buffer if it is not empty
1176  if (!recv[i].empty())
1177  timpi_call_mpi
1178  (TIMPI_UNPACK(recvbuf.data(), bufsize, &pos, recv[i].data(),
1179  subvec_size, type, this->get()));
1180  }
1181 
1182  return stat;
1183 }
1184 
1185 
1186 
1187 template <typename T, typename A1, typename A2>
1188 inline void Communicator::receive (const unsigned int src_processor_id,
1189  std::vector<std::vector<T,A1>,A2> & buf,
1190  const DataType & type,
1191  Request & req,
1192  const MessageTag & tag) const
1193 {
1194  // figure out how many bytes we need to receive all the data into
1195  // our properly pre-sized buf
1196  const CountType sendsize =
1197  cast_int<CountType>(this->packed_size_of(buf, type));
1198 
1199  // temporary buffer - this will be sized in bytes
1200  // and manipulated with MPI_Unpack
1201  std::vector<char> * recvbuf = new std::vector<char>(sendsize);
1202 
1203  // Get ready to receive the temporary buffer
1204  this->receive (src_processor_id, *recvbuf, MPI_PACKED, req, tag);
1205 
1206  // When we wait on the receive, we'll unpack the temporary buffer
1207  req.add_post_wait_work
1208  (new PostWaitUnpackNestedBuffer<std::vector<std::vector<T,A1>,A2>>
1209  (*recvbuf, buf, type, *this));
1210 
1211  // And then we'll free the temporary buffer
1212  req.add_post_wait_work
1213  (new PostWaitDeleteBuffer<std::vector<char>>(recvbuf));
1214 
1215  // The MessageTag should stay registered for the Request lifetime
1216  req.add_post_wait_work
1217  (new PostWaitDereferenceTag(tag));
1218 }
1219 
1220 
1221 template <typename Context, typename OutputIter, typename T>
1222 inline void Communicator::receive_packed_range (const unsigned int src_processor_id,
1223  Context * context,
1224  OutputIter out_iter,
1225  const T * output_type,
1226  const MessageTag & tag) const
1227 {
1228  typedef typename Packing<T>::buffer_type buffer_t;
1229 
1230  // Receive serialized variable size objects as sequences of buffer_t
1231  std::size_t total_buffer_size = 0;
1232  Status stat = this->receive(src_processor_id, total_buffer_size, tag);
1233 
1234  // Use stat.source() and stat.tag() in subsequent receives - if
1235  // src_processor_id is or tag is "any" then we want to be sure we
1236  // try to receive messages all corresponding to the same send.
1237 
1238  std::size_t received_buffer_size = 0;
1239 
1240  // OutputIter might not have operator= implemented; for maximum
1241  // compatibility we'll rely on its copy constructor.
1242  std::unique_ptr<OutputIter> next_out_iter =
1243  std::make_unique<OutputIter>(out_iter);
1244 
1245  while (received_buffer_size < total_buffer_size)
1246  {
1247  std::vector<buffer_t> buffer;
1248  this->receive(stat.source(), buffer, MessageTag(stat.tag()));
1249  received_buffer_size += buffer.size();
1250  auto return_out_iter = unpack_range
1251  (buffer, context, *next_out_iter, output_type);
1252  next_out_iter = std::make_unique<OutputIter>(return_out_iter);
1253  }
1254 }
1255 
1256 
1257 
1258 // template <typename Context, typename OutputIter>
1259 // inline void Communicator::receive_packed_range (const unsigned int src_processor_id,
1260 // Context * context,
1261 // OutputIter out_iter,
1262 // Request & req,
1263 // const MessageTag & tag) const
1264 // {
1265 // typedef typename std::iterator_traits<OutputIter>::value_type T;
1266 // typedef typename Packing<T>::buffer_type buffer_t;
1267 //
1268 // // Receive serialized variable size objects as a sequence of
1269 // // buffer_t.
1270 // // Allocate a buffer on the heap so we don't have to free it until
1271 // // after the Request::wait()
1272 // std::vector<buffer_t> * buffer = new std::vector<buffer_t>();
1273 // this->receive(src_processor_id, *buffer, req, tag);
1274 //
1275 // // Make the Request::wait() handle unpacking the buffer
1276 // req.add_post_wait_work
1277 // (new PostWaitUnpackBuffer<std::vector<buffer_t>, Context, OutputIter>
1278 // (buffer, context, out_iter));
1279 //
1280 // // Make the Request::wait() then handle deleting the buffer
1281 // req.add_post_wait_work
1282 // (new PostWaitDeleteBuffer<std::vector<buffer_t>>(buffer));
1283 // }
1284 
1285 template <typename Context, typename OutputIter, typename T>
1286 inline void Communicator::nonblocking_receive_packed_range (const unsigned int src_processor_id,
1287  Context * context,
1288  OutputIter out,
1289  const T * /* output_type */,
1290  Request & req,
1291  Status & stat,
1292  const MessageTag & tag) const
1293 {
1294  typedef typename Packing<T>::buffer_type buffer_t;
1295 
1296  // Receive serialized variable size objects as a sequence of
1297  // buffer_t.
1298  // Allocate a buffer on the heap so we don't have to free it until
1299  // after the Request::wait()
1300  std::vector<buffer_t> * buffer = new std::vector<buffer_t>(stat.size());
1301  this->receive(src_processor_id, *buffer, req, tag);
1302 
1303  // Make the Request::wait() handle unpacking the buffer
1304  req.add_post_wait_work
1305  (new PostWaitUnpackBuffer<std::vector<buffer_t>, Context, OutputIter, T>(*buffer, context, out));
1306 
1307  // Make the Request::wait() then handle deleting the buffer
1308  req.add_post_wait_work
1309  (new PostWaitDeleteBuffer<std::vector<buffer_t>>(buffer));
1310 
1311  // The MessageTag should stay registered for the Request lifetime
1312  req.add_post_wait_work
1313  (new PostWaitDereferenceTag(tag));
1314 }
1315 
1316 
1317 
1318 template <typename T1, typename T2, typename A1, typename A2>
1319 inline void Communicator::send_receive(const unsigned int dest_processor_id,
1320  const std::vector<T1,A1> & sendvec,
1321  const DataType & type1,
1322  const unsigned int source_processor_id,
1323  std::vector<T2,A2> & recv,
1324  const DataType & type2,
1325  const MessageTag & send_tag,
1326  const MessageTag & recv_tag) const
1327 {
1328  TIMPI_LOG_SCOPE("send_receive()", "Parallel");
1329 
1330  if (dest_processor_id == this->rank() &&
1331  source_processor_id == this->rank())
1332  {
1333  recv = sendvec;
1334  return;
1335  }
1336 
1337  Request req;
1338 
1339  this->send (dest_processor_id, sendvec, type1, req, send_tag);
1340 
1341  this->receive (source_processor_id, recv, type2, recv_tag);
1342 
1343  req.wait();
1344 }
1345 
1346 
1347 template <typename T1, typename T2, typename A1, typename A2,
1348  typename std::enable_if<Has_buffer_type<Packing<T1>>::value &&
1349  Has_buffer_type<Packing<T2>>::value, int>::type>
1350 inline
1351 void
1352 Communicator::send_receive(const unsigned int dest_processor_id,
1353  const std::vector<T1,A1> & send_data,
1354  const unsigned int source_processor_id,
1355  std::vector<T2,A2> &recv_data,
1356  const MessageTag &send_tag,
1357  const MessageTag &recv_tag) const
1358 {
1359  this->send_receive_packed_range(dest_processor_id, (void *)(nullptr),
1360  send_data.begin(), send_data.end(),
1361  source_processor_id, (void *)(nullptr),
1362  std::back_inserter(recv_data),
1363  (const T2 *)(nullptr),
1364  send_tag, recv_tag);
1365 }
1366 
1367 
1368 template <typename T, typename A,
1369  typename std::enable_if<Has_buffer_type<Packing<T>>::value, int>::type>
1370 inline
1371 void
1372 Communicator::send_receive(const unsigned int dest_processor_id,
1373  const std::vector<T,A> & send_data,
1374  const unsigned int source_processor_id,
1375  std::vector<T,A> &recv_data,
1376  const MessageTag &send_tag,
1377  const MessageTag &recv_tag) const
1378 {
1379  this->send_receive_packed_range(dest_processor_id, (void *)(nullptr),
1380  send_data.begin(), send_data.end(),
1381  source_processor_id, (void *)(nullptr),
1382  std::back_inserter(recv_data),
1383  (const T *)(nullptr),
1384  send_tag, recv_tag);
1385 }
1386 
1387 
1388 
1389 template <typename T1, typename T2,
1390  typename std::enable_if<std::is_base_of<DataType, StandardType<T1>>::value &&
1391  std::is_base_of<DataType, StandardType<T2>>::value,
1392  int>::type>
1393 inline void Communicator::send_receive(const unsigned int dest_processor_id,
1394  const T1 & sendvec,
1395  const unsigned int source_processor_id,
1396  T2 & recv,
1397  const MessageTag & send_tag,
1398  const MessageTag & recv_tag) const
1399 {
1400  TIMPI_LOG_SCOPE("send_receive()", "Parallel");
1401 
1402  if (dest_processor_id == this->rank() &&
1403  source_processor_id == this->rank())
1404  {
1405  recv = sendvec;
1406  return;
1407  }
1408 
1409  timpi_assert_less(dest_processor_id, this->size());
1410  timpi_assert(source_processor_id < this->size() ||
1411  source_processor_id == any_source);
1412 
1413  // MPI_STATUS_IGNORE is from MPI-2; using it with some versions of
1414  // MPICH may cause a crash:
1415  // https://bugzilla.mcs.anl.gov/globus/show_bug.cgi?id=1798
1416  timpi_call_mpi
1417  (TIMPI_SENDRECV(const_cast<T1*>(&sendvec), 1, StandardType<T1>(&sendvec),
1418  dest_processor_id, send_tag.value(), &recv, 1,
1419  StandardType<T2>(&recv), source_processor_id,
1420  recv_tag.value(), this->get(), MPI_STATUS_IGNORE));
1421 }
1422 
1423 
1424 
1425 // This is both a declaration and definition for a new overloaded
1426 // function template, so we have to re-specify the default
1427 // arguments.
1428 //
1429 // We specialize on the T1==T2 case so that we can handle
1430 // send_receive-to-self with a plain copy rather than going through
1431 // MPI.
1432 template <typename T, typename A,
1433  typename std::enable_if<std::is_base_of<DataType, StandardType<T>>::value,
1434  int>::type>
1435 inline void Communicator::send_receive(const unsigned int dest_processor_id,
1436  const std::vector<T,A> & sendvec,
1437  const unsigned int source_processor_id,
1438  std::vector<T,A> & recv,
1439  const MessageTag & send_tag,
1440  const MessageTag & recv_tag) const
1441 {
1442  if (dest_processor_id == this->rank() &&
1443  source_processor_id == this->rank())
1444  {
1445  TIMPI_LOG_SCOPE("send_receive()", "Parallel");
1446  recv = sendvec;
1447  return;
1448  }
1449 
1450  const T* example = sendvec.empty() ?
1451  (recv.empty() ? nullptr : recv.data()) : sendvec.data();
1452 
1453  // Call the user-defined type version with automatic
1454  // type conversion based on template argument:
1455  this->send_receive (dest_processor_id, sendvec,
1456  StandardType<T>(example),
1457  source_processor_id, recv,
1458  StandardType<T>(example),
1459  send_tag, recv_tag);
1460 }
1461 
1462 
1463 template <typename T1, typename T2, typename A1, typename A2,
1464  typename std::enable_if<std::is_base_of<DataType, StandardType<T1>>::value &&
1465  std::is_base_of<DataType, StandardType<T2>>::value,
1466  int>::type>
1467 inline void Communicator::send_receive(const unsigned int dest_processor_id,
1468  const std::vector<T1,A1> & sendvec,
1469  const unsigned int source_processor_id,
1470  std::vector<T2,A2> & recv,
1471  const MessageTag & send_tag,
1472  const MessageTag & recv_tag) const
1473 {
1474  // Call the user-defined type version with automatic
1475  // type conversion based on template argument:
1476  this->send_receive (dest_processor_id, sendvec,
1477  StandardType<T1>(sendvec.empty() ? nullptr : sendvec.data()),
1478  source_processor_id, recv,
1479  StandardType<T2>(recv.empty() ? nullptr : recv.data()),
1480  send_tag, recv_tag);
1481 }
1482 
1483 
1484 
1485 
1486 template <typename T1, typename T2, typename A1, typename A2, typename A3, typename A4>
1487 inline void Communicator::send_receive(const unsigned int dest_processor_id,
1488  const std::vector<std::vector<T1,A1>,A2> & sendvec,
1489  const unsigned int source_processor_id,
1490  std::vector<std::vector<T2,A3>,A4> & recv,
1491  const MessageTag & /* send_tag */,
1492  const MessageTag & /* recv_tag */) const
1493 {
1494  // FIXME - why aren't we honoring send_tag and recv_tag here?
1495  send_receive_vec_of_vec
1496  (dest_processor_id, sendvec, source_processor_id, recv,
1497  no_tag, any_tag, *this);
1498 }
1499 
1500 
1501 
1502 // This is both a declaration and definition for a new overloaded
1503 // function template, so we have to re-specify the default arguments
1504 template <typename T, typename A1, typename A2>
1505 inline void Communicator::send_receive(const unsigned int dest_processor_id,
1506  const std::vector<std::vector<T,A1>,A2> & sendvec,
1507  const unsigned int source_processor_id,
1508  std::vector<std::vector<T,A1>,A2> & recv,
1509  const MessageTag & send_tag,
1510  const MessageTag & recv_tag) const
1511 {
1512  send_receive_vec_of_vec
1513  (dest_processor_id, sendvec, source_processor_id, recv,
1514  send_tag, recv_tag, *this);
1515 }
1516 
1517 
1518 
1519 
1520 template <typename Context1, typename RangeIter, typename Context2,
1521  typename OutputIter, typename T>
1522 inline void
1523 Communicator::send_receive_packed_range (const unsigned int dest_processor_id,
1524  const Context1 * context1,
1525  RangeIter send_begin,
1526  const RangeIter send_end,
1527  const unsigned int source_processor_id,
1528  Context2 * context2,
1529  OutputIter out_iter,
1530  const T * output_type,
1531  const MessageTag & send_tag,
1532  const MessageTag & recv_tag,
1533  std::size_t approx_buffer_size) const
1534 {
1535  TIMPI_LOG_SCOPE("send_receive()", "Parallel");
1536 
1537  timpi_assert_equal_to
1538  ((dest_processor_id == this->rank()),
1539  (source_processor_id == this->rank()));
1540 
1541  if (dest_processor_id == this->rank() &&
1542  source_processor_id == this->rank())
1543  {
1544  // We need to pack and unpack, even if we don't need to
1545  // communicate the buffer, just in case user Packing
1546  // specializations have side effects
1547 
1548  // OutputIter might not have operator= implemented; for maximum
1549  // compatibility we'll rely on its copy constructor.
1550  std::unique_ptr<OutputIter> next_out_iter =
1551  std::make_unique<OutputIter>(out_iter);
1552 
1553  typedef typename Packing<T>::buffer_type buffer_t;
1554  while (send_begin != send_end)
1555  {
1556  std::vector<buffer_t> buffer;
1557  send_begin = pack_range
1558  (context1, send_begin, send_end, buffer, approx_buffer_size);
1559  auto return_out_iter = unpack_range
1560  (buffer, context2, *next_out_iter, output_type);
1561  next_out_iter = std::make_unique<OutputIter>(return_out_iter);
1562  }
1563  return;
1564  }
1565 
1566  Request req;
1567 
1568  this->send_packed_range (dest_processor_id, context1, send_begin, send_end,
1569  req, send_tag, approx_buffer_size);
1570 
1571  this->receive_packed_range (source_processor_id, context2, out_iter,
1572  output_type, recv_tag);
1573 
1574  req.wait();
1575 }
1576 
1577 
1578 
1579 template <typename Context, typename Iter>
1580 inline void Communicator::nonblocking_send_packed_range (const unsigned int dest_processor_id,
1581  const Context * context,
1582  Iter range_begin,
1583  const Iter range_end,
1584  Request & req,
1585  std::shared_ptr<std::vector<typename Packing<typename std::iterator_traits<Iter>::value_type>::buffer_type>> & buffer,
1586  const MessageTag & tag) const
1587 {
1588  // Allocate a buffer on the heap so we don't have to free it until
1589  // after the Request::wait()
1590  typedef typename std::iterator_traits<Iter>::value_type T;
1591  typedef typename Packing<T>::buffer_type buffer_t;
1592 
1593  if (range_begin != range_end)
1594  {
1595  if (buffer == nullptr)
1596  buffer = std::make_shared<std::vector<buffer_t>>();
1597  else
1598  buffer->clear();
1599 
1600  range_begin =
1601  pack_range(context,
1602  range_begin,
1603  range_end,
1604  *buffer,
1605  // MPI-2/3 can only use signed integers for size,
1606  // and with this API we need to fit a non-blocking
1607  // send into one buffer
1608  std::numeric_limits<CountType>::max());
1609 
1610  if (range_begin != range_end)
1611  timpi_error_msg("Non-blocking packed range sends cannot exceed " << std::numeric_limits<CountType>::max() << "in size");
1612 
1613  // Make it dereference the shared pointer (possibly freeing the buffer)
1614  req.add_post_wait_work
1615  (new PostWaitDereferenceSharedPtr<std::vector<buffer_t>>(buffer));
1616 
1617  // Non-blocking send of the buffer
1618  this->send(dest_processor_id, *buffer, req, tag);
1619  }
1620 }
1621 
1622 
1623 
1624 template <typename T, typename A>
1625 inline void Communicator::allgather(const std::basic_string<T> & sendval,
1626  std::vector<std::basic_string<T>,A> & recv,
1627  const bool identical_buffer_sizes) const
1628 {
1629  TIMPI_LOG_SCOPE ("allgather()","Parallel");
1630 
1631  timpi_assert(this->size());
1632  recv.assign(this->size(), "");
1633 
1634  // serial case
1635  if (this->size() < 2)
1636  {
1637  recv.resize(1);
1638  recv[0] = sendval;
1639  return;
1640  }
1641 
1642  std::vector<CountType>
1643  sendlengths (this->size(), 0);
1644  std::vector<DispType>
1645  displacements(this->size(), 0);
1646 
1647  const CountType mysize = cast_int<CountType>(sendval.size());
1648 
1649  if (identical_buffer_sizes)
1650  sendlengths.assign(this->size(), mysize);
1651  else
1652  // first comm step to determine buffer sizes from all processors
1653  this->allgather(mysize, sendlengths);
1654 
1655  // Find the total size of the final array and
1656  // set up the displacement offsets for each processor
1657  CountType globalsize = 0;
1658  for (unsigned int i=0; i != this->size(); ++i)
1659  {
1660  displacements[i] = globalsize;
1661  globalsize += sendlengths[i];
1662  }
1663 
1664  // Check for quick return
1665  if (globalsize == 0)
1666  return;
1667 
1668  // monolithic receive buffer
1669  std::basic_string<T> r(globalsize, 0);
1670 
1671  // and get the data from the remote processors.
1672  timpi_call_mpi
1673  (TIMPI_ALLGATHERV(const_cast<T*>(mysize ? sendval.data() : nullptr),
1674  mysize, StandardType<T>(),
1675  &r[0], sendlengths.data(), displacements.data(),
1676  StandardType<T>(), this->get()));
1677 
1678  // slice receive buffer up
1679  for (unsigned int i=0; i != this->size(); ++i)
1680  recv[i] = r.substr(displacements[i], sendlengths[i]);
1681 }
1682 
1683 
1684 
1685 inline void Communicator::broadcast (bool & data,
1686  const unsigned int root_id,
1687  const bool /* identical_sizes */) const
1688 {
1689  if (this->size() == 1)
1690  {
1691  timpi_assert (!this->rank());
1692  timpi_assert (!root_id);
1693  return;
1694  }
1695 
1696  timpi_assert_less (root_id, this->size());
1697 
1698  TIMPI_LOG_SCOPE("broadcast()", "Parallel");
1699 
1700  // We don't want to depend on MPI-2 or C++ MPI, so we don't have
1701  // MPI::BOOL available
1702  char char_data = data;
1703 
1704  timpi_assert_less(root_id, this->size());
1705 
1706  // Spread data to remote processors.
1707  timpi_call_mpi
1708  (TIMPI_BCAST (&char_data, 1, StandardType<char>(&char_data),
1709  root_id, this->get()));
1710 
1711  data = char_data;
1712 }
1713 
1714 
1715 template <typename T>
1716 inline void Communicator::broadcast (std::basic_string<T> & data,
1717  const unsigned int root_id,
1718  const bool identical_sizes) const
1719 {
1720  if (this->size() == 1)
1721  {
1722  timpi_assert (!this->rank());
1723  timpi_assert (!root_id);
1724  return;
1725  }
1726 
1727  timpi_assert_less (root_id, this->size());
1728  timpi_assert (this->verify(identical_sizes));
1729 
1730  TIMPI_LOG_SCOPE("broadcast()", "Parallel");
1731 
1732  std::size_t data_size = data.size();
1733 
1734  if (identical_sizes)
1735  timpi_assert(this->verify(data_size));
1736  else
1737  this->broadcast(data_size, root_id);
1738 
1739  std::vector<T> data_c(data_size);
1740 #ifndef NDEBUG
1741  std::basic_string<T> orig(data);
1742 #endif
1743 
1744  if (this->rank() == root_id)
1745  for (std::size_t i=0; i<data.size(); i++)
1746  data_c[i] = data[i];
1747 
1748  this->broadcast (data_c, root_id, StandardType<T>::is_fixed_type);
1749 
1750  data.assign(data_c.begin(), data_c.end());
1751 
1752 #ifndef NDEBUG
1753  if (this->rank() == root_id)
1754  timpi_assert_equal_to (data, orig);
1755 #endif
1756 }
1757 
1758 
1759 template <typename T, typename A>
1760 inline void Communicator::broadcast (std::vector<std::basic_string<T>,A> & data,
1761  const unsigned int root_id,
1762  const bool identical_sizes) const
1763 {
1764  if (this->size() == 1)
1765  {
1766  timpi_assert (!this->rank());
1767  timpi_assert (!root_id);
1768  return;
1769  }
1770 
1771  timpi_assert_less (root_id, this->size());
1772  timpi_assert (this->verify(identical_sizes));
1773 
1774  TIMPI_LOG_SCOPE("broadcast()", "Parallel");
1775 
1776  std::size_t bufsize=0;
1777  if (root_id == this->rank() || identical_sizes)
1778  {
1779  for (std::size_t i=0; i<data.size(); ++i)
1780  bufsize += data[i].size() + 1; // Add one for the string length word
1781  }
1782 
1783  if (identical_sizes)
1784  timpi_assert(this->verify(bufsize));
1785  else
1786  this->broadcast(bufsize, root_id);
1787 
1788  // Here we use unsigned int to store up to 32-bit characters
1789  std::vector<unsigned int> temp; temp.reserve(bufsize);
1790  // Pack the strings
1791  if (root_id == this->rank())
1792  {
1793  for (std::size_t i=0; i<data.size(); ++i)
1794  {
1795  temp.push_back(cast_int<unsigned int>(data[i].size()));
1796  for (std::size_t j=0; j != data[i].size(); ++j)
1801  temp.push_back(data[i][j]);
1802  }
1803  }
1804  else
1805  temp.resize(bufsize);
1806 
1807  // broad cast the packed strings
1808  this->broadcast(temp, root_id, true);
1809 
1810  // Unpack the strings
1811  if (root_id != this->rank())
1812  {
1813  data.clear();
1814  typename std::vector<unsigned int>::const_iterator iter = temp.begin();
1815  while (iter != temp.end())
1816  {
1817  std::size_t curr_len = *iter++;
1818  data.push_back(std::basic_string<T>(iter, iter+curr_len));
1819  iter += curr_len;
1820  }
1821  }
1822 }
1823 
1824 
1825 
1826 template <typename T, typename A1, typename A2>
1827 inline void Communicator::broadcast (std::vector<std::vector<T,A1>,A2> & data,
1828  const unsigned int root_id,
1829  const bool identical_sizes) const
1830 {
1831  if (this->size() == 1)
1832  {
1833  timpi_assert (!this->rank());
1834  timpi_assert (!root_id);
1835  return;
1836  }
1837 
1838  timpi_assert_less (root_id, this->size());
1839  timpi_assert (this->verify(identical_sizes));
1840 
1841  TIMPI_LOG_SCOPE("broadcast()", "Parallel");
1842 
1843  std::size_t size_sizes = data.size();
1844  if (identical_sizes)
1845  timpi_assert(this->verify(size_sizes));
1846  else
1847  this->broadcast(size_sizes, root_id);
1848  std::vector<std::size_t> sizes(size_sizes);
1849 
1850  if (root_id == this->rank() || identical_sizes)
1851  for (std::size_t i=0; i<size_sizes; ++i)
1852  sizes[i] = data[i].size();
1853 
1854  if (identical_sizes)
1855  timpi_assert(this->verify(sizes));
1856  else
1857  this->broadcast(sizes, root_id);
1858 
1859  std::size_t bufsize = 0;
1860  for (std::size_t i=0; i<size_sizes; ++i)
1861  bufsize += sizes[i];
1862 
1863  std::vector<T> temp; temp.reserve(bufsize);
1864  // Pack the vectors
1865  if (root_id == this->rank())
1866  {
1867  // The data will be packed in one long array
1868  for (std::size_t i=0; i<size_sizes; ++i)
1869  temp.insert(temp.end(), data[i].begin(), data[i].end());
1870  }
1871  else
1872  temp.resize(bufsize);
1873 
1874  // broad cast the packed data
1875  this->broadcast(temp, root_id, StandardType<T>::is_fixed_type);
1876 
1877  // Unpack the data
1878  if (root_id != this->rank())
1879  {
1880  data.clear();
1881  data.resize(size_sizes);
1882  typename std::vector<T>::const_iterator iter = temp.begin();
1883  for (std::size_t i=0; i<size_sizes; ++i)
1884  {
1885  data[i].insert(data[i].end(), iter, iter+sizes[i]);
1886  iter += sizes[i];
1887  }
1888  }
1889 }
1890 
1891 
1892 
1893 
1894 template <typename T, typename C, typename A>
1895 inline void Communicator::broadcast (std::set<T,C,A> & data,
1896  const unsigned int root_id,
1897  const bool identical_sizes) const
1898 {
1899  if (this->size() == 1)
1900  {
1901  timpi_assert (!this->rank());
1902  timpi_assert (!root_id);
1903  return;
1904  }
1905 
1906  timpi_assert_less (root_id, this->size());
1907  timpi_assert (this->verify(identical_sizes));
1908 
1909  TIMPI_LOG_SCOPE("broadcast()", "Parallel");
1910 
1911  std::vector<T> vecdata;
1912  if (this->rank() == root_id)
1913  vecdata.assign(data.begin(), data.end());
1914 
1915  std::size_t vecsize = vecdata.size();
1916  if (identical_sizes)
1917  timpi_assert(this->verify(vecsize));
1918  else
1919  this->broadcast(vecsize, root_id);
1920  if (this->rank() != root_id)
1921  vecdata.resize(vecsize);
1922 
1923  this->broadcast(vecdata, root_id, StandardType<T>::is_fixed_type);
1924  if (this->rank() != root_id)
1925  {
1926  data.clear();
1927  data.insert(vecdata.begin(), vecdata.end());
1928  }
1929 }
1930 
1931 
1932 template <typename Context, typename OutputIter, typename T>
1933 inline void Communicator::nonblocking_receive_packed_range (const unsigned int src_processor_id,
1934  Context * context,
1935  OutputIter out,
1936  const T * /*output_type*/,
1937  Request & req,
1938  Status & stat,
1939  std::shared_ptr<std::vector<typename Packing<T>::buffer_type>> & buffer,
1940  const MessageTag & tag) const
1941 {
1942  // If they didn't pass in a buffer - let's make one
1943  if (buffer == nullptr)
1944  buffer = std::make_shared<std::vector<typename Packing<T>::buffer_type>>();
1945  else
1946  buffer->clear();
1947 
1948  // Receive serialized variable size objects as a sequence of
1949  // buffer_t.
1950  // Allocate a buffer on the heap so we don't have to free it until
1951  // after the Request::wait()
1952  buffer->resize(stat.size());
1953  this->receive(src_processor_id, *buffer, req, tag);
1954 
1955  // Make the Request::wait() handle unpacking the buffer
1956  req.add_post_wait_work
1957  (new PostWaitUnpackBuffer<std::vector<typename Packing<T>::buffer_type>, Context, OutputIter, T>(*buffer, context, out));
1958 
1959  // Make it dereference the shared pointer (possibly freeing the buffer)
1960  req.add_post_wait_work
1961  (new PostWaitDereferenceSharedPtr<std::vector<typename Packing<T>::buffer_type>>(buffer));
1962 }
1963 
1964 
1965 
1966 template <typename T, typename A, typename std::enable_if<std::is_base_of<DataType, StandardType<T>>::value, int>::type>
1967 inline bool Communicator::possibly_receive (unsigned int & src_processor_id,
1968  std::vector<T,A> & buf,
1969  const DataType & type,
1970  Request & req,
1971  const MessageTag & tag) const
1972 {
1973  TIMPI_LOG_SCOPE("possibly_receive()", "Parallel");
1974 
1975  Status stat(type);
1976 
1977  int int_flag = 0;
1978 
1979  timpi_assert(src_processor_id < this->size() ||
1980  src_processor_id == any_source);
1981 
1982  timpi_call_mpi(MPI_Iprobe(int(src_processor_id),
1983  tag.value(),
1984  this->get(),
1985  &int_flag,
1986  stat.get()));
1987 
1988  if (int_flag)
1989  {
1990  buf.resize(stat.size());
1991 
1992  src_processor_id = stat.source();
1993 
1994  timpi_call_mpi
1995  (TIMPI_IRECV(buf.data(), cast_int<CountType>(buf.size()), type,
1996  src_processor_id, tag.value(), this->get(),
1997  req.get()));
1998 
1999  // The MessageTag should stay registered for the Request lifetime
2000  req.add_post_wait_work
2001  (new PostWaitDereferenceTag(tag));
2002  }
2003 
2004  return int_flag;
2005 }
2006 
2007 template <typename T, typename A, typename std::enable_if<Has_buffer_type<Packing<T>>::value, int>::type>
2008 inline bool Communicator::possibly_receive (unsigned int & src_processor_id,
2009  std::vector<T,A> & buf,
2010  const NotADataType &,
2011  Request & req,
2012  const MessageTag & tag) const
2013 {
2014  TIMPI_LOG_SCOPE("possibly_receive()", "Parallel");
2015 
2016  return this->possibly_receive_packed_range(src_processor_id,
2017  (void *)(nullptr),
2018  std::inserter(buf, buf.end()),
2019  (T *)(nullptr),
2020  req,
2021  tag);
2022 }
2023 
2024 
2025 
2026 template <typename T, typename A1, typename A2>
2027 inline bool Communicator::possibly_receive (unsigned int & src_processor_id,
2028  std::vector<std::vector<T,A1>,A2> & buf,
2029  const DataType & type,
2030  Request & req,
2031  const MessageTag & tag) const
2032 {
2033  TIMPI_LOG_SCOPE("possibly_receive()", "Parallel");
2034 
2035  Status stat(type);
2036 
2037  int int_flag = 0;
2038 
2039  timpi_assert(src_processor_id < this->size() ||
2040  src_processor_id == any_source);
2041 
2042  timpi_call_mpi(MPI_Iprobe(int(src_processor_id),
2043  tag.value(),
2044  this->get(),
2045  &int_flag,
2046  stat.get()));
2047 
2048  if (int_flag)
2049  {
2050  src_processor_id = stat.source();
2051 
2052  std::vector<char> * recvbuf =
2053  new std::vector<char>(stat.size(StandardType<char>()));
2054 
2055  this->receive(src_processor_id, *recvbuf, MPI_PACKED, req, tag);
2056 
2057  // When we wait on the receive, we'll unpack the temporary buffer
2058  req.add_post_wait_work
2059  (new PostWaitUnpackNestedBuffer<std::vector<std::vector<T,A1>,A2>>
2060  (*recvbuf, buf, type, *this));
2061 
2062  // And then we'll free the temporary buffer
2063  req.add_post_wait_work
2064  (new PostWaitDeleteBuffer<std::vector<char>>(recvbuf));
2065 
2066  // The MessageTag should stay registered for the Request lifetime
2067  req.add_post_wait_work
2068  (new PostWaitDereferenceTag(tag));
2069  }
2070 
2071  return int_flag;
2072 }
2073 
2074 #else
2075  typedef int DispType;
2076 #endif // TIMPI_HAVE_MPI
2077 
2078 
2079 // Some of our methods are implemented indirectly via other
2080 // MPI-encapsulated methods and the implementation works with or
2081 // without MPI.
2082 //
2083 // Other methods have a "this->size() == 1" shortcut which still
2084 // applies in the without-MPI case, and the timpi_call_mpi macro
2085 // becomes a no-op so wrapped MPI methods still compile without MPI
2086 
2087 template <typename T>
2088 inline bool Communicator::verify(const T & r) const
2089 {
2090  if (this->size() > 1 && Attributes<T>::has_min_max == true)
2091  {
2092  T tempmin = r, tempmax = r;
2093  this->min(tempmin);
2094  this->max(tempmax);
2095  bool verified = (r == tempmin) &&
2096  (r == tempmax);
2097  this->min(verified);
2098  return verified;
2099  }
2100 
2101  static_assert(Attributes<T>::has_min_max,
2102  "Tried to verify an unverifiable type");
2103 
2104  return true;
2105 }
2106 
2107 template <typename T>
2108 inline bool Communicator::semiverify(const T * r) const
2109 {
2110  if (this->size() > 1 && Attributes<T>::has_min_max == true)
2111  {
2112  T tempmin, tempmax;
2113  if (r)
2114  tempmin = tempmax = *r;
2115  else
2116  {
2117  Attributes<T>::set_highest(tempmin);
2118  Attributes<T>::set_lowest(tempmax);
2119  }
2120  this->min(tempmin);
2121  this->max(tempmax);
2122  bool invalid = r && ((*r != tempmin) ||
2123  (*r != tempmax));
2124  this->max(invalid);
2125  return !invalid;
2126  }
2127 
2128  static_assert(Attributes<T>::has_min_max,
2129  "Tried to semiverify an unverifiable type");
2130 
2131  return true;
2132 }
2133 
2134 
2135 
2136 template <typename T, typename A>
2137 inline bool Communicator::semiverify(const std::vector<T,A> * r) const
2138 {
2139  if (this->size() > 1 && Attributes<T>::has_min_max == true)
2140  {
2141  std::size_t rsize = r ? r->size() : 0;
2142  std::size_t * psize = r ? &rsize : nullptr;
2143 
2144  if (!this->semiverify(psize))
2145  return false;
2146 
2147  this->max(rsize);
2148 
2149  std::vector<T,A> tempmin, tempmax;
2150  if (r)
2151  {
2152  tempmin = tempmax = *r;
2153  }
2154  else
2155  {
2156  tempmin.resize(rsize);
2157  tempmax.resize(rsize);
2158  Attributes<std::vector<T,A>>::set_highest(tempmin);
2159  Attributes<std::vector<T,A>>::set_lowest(tempmax);
2160  }
2161  this->min(tempmin);
2162  this->max(tempmax);
2163  bool invalid = r && ((*r != tempmin) ||
2164  (*r != tempmax));
2165  this->max(invalid);
2166  return !invalid;
2167  }
2168 
2169  static_assert(Attributes<T>::has_min_max,
2170  "Tried to semiverify a vector of an unverifiable type");
2171 
2172  return true;
2173 }
2174 
2175 
2176 
2177 
2178 template <typename T>
2179 inline void Communicator::min(const T & r,
2180  T & o,
2181  Request & req) const
2182 {
2183  if (this->size() > 1)
2184  {
2185  TIMPI_LOG_SCOPE("min()", "Parallel");
2186 
2187  timpi_call_mpi
2188  (TIMPI_IALLREDUCE(&r, &o, 1, StandardType<T>(&r),
2189  OpFunction<T>::min(), this->get(),
2190  req.get()));
2191  }
2192  else
2193  {
2194  o = r;
2195  req = Request::null_request;
2196  }
2197 }
2198 
2199 
2200 
2201 template <typename T>
2202 inline void Communicator::min(T & timpi_mpi_var(r)) const
2203 {
2204  if (this->size() > 1)
2205  {
2206  TIMPI_LOG_SCOPE("min(scalar)", "Parallel");
2207 
2208  timpi_call_mpi
2209  (TIMPI_ALLREDUCE(MPI_IN_PLACE, &r, 1,
2211  this->get()));
2212  }
2213 }
2214 
2215 
2216 
2217 template <typename T, typename A>
2218 inline void Communicator::min(std::vector<T,A> & r) const
2219 {
2220  if (this->size() > 1 && !r.empty())
2221  {
2222  TIMPI_LOG_SCOPE("min(vector)", "Parallel");
2223 
2224  timpi_assert(this->verify(r.size()));
2225 
2226  timpi_call_mpi
2227  (TIMPI_ALLREDUCE
2228  (MPI_IN_PLACE, r.data(), cast_int<CountType>(r.size()),
2229  StandardType<T>(r.data()), OpFunction<T>::min(),
2230  this->get()));
2231  }
2232 }
2233 
2234 
2235 template <typename A>
2236 inline void Communicator::min(std::vector<bool,A> & r) const
2237 {
2238  if (this->size() > 1 && !r.empty())
2239  {
2240  TIMPI_LOG_SCOPE("min(vector<bool>)", "Parallel");
2241 
2242  timpi_assert(this->verify(r.size()));
2243 
2244  std::vector<unsigned int> ruint;
2245  pack_vector_bool(r, ruint);
2246  std::vector<unsigned int> temp(ruint.size());
2247  timpi_call_mpi
2248  (TIMPI_ALLREDUCE
2249  (ruint.data(), temp.data(),
2250  cast_int<CountType>(ruint.size()),
2251  StandardType<unsigned int>(), MPI_BAND, this->get()));
2252  unpack_vector_bool(temp, r);
2253  }
2254 }
2255 
2256 
2257 template <typename T>
2258 inline void Communicator::minloc(T & r,
2259  unsigned int & min_id) const
2260 {
2261  if (this->size() > 1)
2262  {
2263  TIMPI_LOG_SCOPE("minloc(scalar)", "Parallel");
2264 
2265  DataPlusInt<T> data_in;
2266  ignore(data_in); // unused ifndef TIMPI_HAVE_MPI
2267  data_in.val = r;
2268  data_in.rank = this->rank();
2269 
2270  timpi_call_mpi
2271  (TIMPI_ALLREDUCE (MPI_IN_PLACE, &data_in, 1,
2272  dataplusint_type_acquire<T>().first,
2273  OpFunction<T>::min_location(), this->get()));
2274  r = data_in.val;
2275  min_id = data_in.rank;
2276  }
2277  else
2278  min_id = this->rank();
2279 }
2280 
2281 
2282 template <typename T, typename A1, typename A2>
2283 inline void Communicator::minloc(std::vector<T,A1> & r,
2284  std::vector<unsigned int,A2> & min_id) const
2285 {
2286  if (this->size() > 1 && !r.empty())
2287  {
2288  TIMPI_LOG_SCOPE("minloc(vector)", "Parallel");
2289 
2290  timpi_assert(this->verify(r.size()));
2291 
2292  std::vector<DataPlusInt<T>> data_in(r.size());
2293  for (std::size_t i=0; i != r.size(); ++i)
2294  {
2295  data_in[i].val = r[i];
2296  data_in[i].rank = this->rank();
2297  }
2298  std::vector<DataPlusInt<T>> data_out(r.size());
2299 
2300  timpi_call_mpi
2301  (TIMPI_ALLREDUCE (data_in.data(), data_out.data(),
2302  cast_int<CountType>(r.size()),
2303  dataplusint_type_acquire<T>().first,
2304  OpFunction<T>::min_location(), this->get()));
2305  for (std::size_t i=0; i != r.size(); ++i)
2306  {
2307  r[i] = data_out[i].val;
2308  min_id[i] = data_out[i].rank;
2309  }
2310  }
2311  else if (!r.empty())
2312  {
2313  for (std::size_t i=0; i != r.size(); ++i)
2314  min_id[i] = this->rank();
2315  }
2316 }
2317 
2318 
2319 template <typename A1, typename A2>
2320 inline void Communicator::minloc(std::vector<bool,A1> & r,
2321  std::vector<unsigned int,A2> & min_id) const
2322 {
2323  if (this->size() > 1 && !r.empty())
2324  {
2325  TIMPI_LOG_SCOPE("minloc(vector<bool>)", "Parallel");
2326 
2327  timpi_assert(this->verify(r.size()));
2328 
2329  std::vector<DataPlusInt<int>> data_in(r.size());
2330  for (std::size_t i=0; i != r.size(); ++i)
2331  {
2332  data_in[i].val = r[i];
2333  data_in[i].rank = this->rank();
2334  }
2335  std::vector<DataPlusInt<int>> data_out(r.size());
2336  timpi_call_mpi
2337  (TIMPI_ALLREDUCE
2338  (data_in.data(), data_out.data(),
2339  cast_int<CountType>(r.size()), StandardType<int>(),
2340  OpFunction<int>::min_location(), this->get()));
2341  for (std::size_t i=0; i != r.size(); ++i)
2342  {
2343  r[i] = data_out[i].val;
2344  min_id[i] = data_out[i].rank;
2345  }
2346  }
2347  else if (!r.empty())
2348  {
2349  for (std::size_t i=0; i != r.size(); ++i)
2350  min_id[i] = this->rank();
2351  }
2352 }
2353 
2354 
2355 template <typename T>
2356 inline void Communicator::max(const T & r,
2357  T & o,
2358  Request & req) const
2359 {
2360  if (this->size() > 1)
2361  {
2362  TIMPI_LOG_SCOPE("max()", "Parallel");
2363 
2364  timpi_call_mpi
2365  (TIMPI_IALLREDUCE(&r, &o, 1, StandardType<T>(&r),
2366  OpFunction<T>::max(), this->get(),
2367  req.get()));
2368  }
2369  else
2370  {
2371  o = r;
2372  req = Request::null_request;
2373  }
2374 }
2375 
2376 
2377 template <typename T>
2378 inline void Communicator::max(T & timpi_mpi_var(r)) const
2379 {
2380  if (this->size() > 1)
2381  {
2382  TIMPI_LOG_SCOPE("max(scalar)", "Parallel");
2383 
2384  timpi_call_mpi
2385  (TIMPI_ALLREDUCE (MPI_IN_PLACE, &r, 1, StandardType<T>(&r),
2386  OpFunction<T>::max(), this->get()));
2387  }
2388 }
2389 
2390 
2391 template <typename T, typename A>
2392 inline void Communicator::max(std::vector<T,A> & r) const
2393 {
2394  if (this->size() > 1 && !r.empty())
2395  {
2396  TIMPI_LOG_SCOPE("max(vector)", "Parallel");
2397 
2398  timpi_assert(this->verify(r.size()));
2399 
2400  timpi_call_mpi
2401  (TIMPI_ALLREDUCE (MPI_IN_PLACE, r.data(),
2402  cast_int<CountType>(r.size()),
2403  StandardType<T>(r.data()),
2404  OpFunction<T>::max(), this->get()));
2405  }
2406 }
2407 
2408 
2409 template <typename A>
2410 inline void Communicator::max(std::vector<bool,A> & r) const
2411 {
2412  if (this->size() > 1 && !r.empty())
2413  {
2414  TIMPI_LOG_SCOPE("max(vector<bool>)", "Parallel");
2415 
2416  timpi_assert(this->verify(r.size()));
2417 
2418  std::vector<unsigned int> ruint;
2419  pack_vector_bool(r, ruint);
2420  std::vector<unsigned int> temp(ruint.size());
2421  timpi_call_mpi
2422  (TIMPI_ALLREDUCE (ruint.data(), temp.data(),
2423  cast_int<CountType>(ruint.size()),
2424  StandardType<unsigned int>(), MPI_BOR,
2425  this->get()));
2426  unpack_vector_bool(temp, r);
2427  }
2428 }
2429 
2430 
2431 
2432 template <typename Map,
2433  typename std::enable_if<std::is_base_of<DataType, StandardType<typename Map::key_type>>::value &&
2434  std::is_base_of<DataType, StandardType<typename Map::mapped_type>>::value,
2435  int>::type>
2436 void Communicator::map_max(Map & data) const
2437 {
2438  if (this->size() > 1)
2439  {
2440  TIMPI_LOG_SCOPE("max(map)", "Parallel");
2441 
2442  // Since the input map may have different keys on different
2443  // processors, we first gather all the keys and values, then for
2444  // each key we choose the max value over all procs. We
2445  // initialize the max with the first value we encounter rather
2446  // than some "global" minimum, since the latter is difficult to
2447  // do generically.
2448  std::vector<std::pair<typename Map::key_type, typename Map::mapped_type>>
2449  vecdata(data.begin(), data.end());
2450 
2451  this->allgather(vecdata, /*identical_buffer_sizes=*/false);
2452 
2453  data.clear();
2454 
2455  for (const auto & pr : vecdata)
2456  {
2457  // Attempt to insert this value. If it works, then the value didn't
2458  // already exist and we can go on. If it fails, compute the std::max
2459  // between the current and existing values.
2460  auto result = data.insert(pr);
2461 
2462  bool inserted = result.second;
2463 
2464  if (!inserted)
2465  {
2466  auto it = result.first;
2467  it->second = std::max(it->second, pr.second);
2468  }
2469  }
2470  }
2471 }
2472 
2473 
2474 
2475 template <typename Map,
2476  typename std::enable_if<!(std::is_base_of<DataType, StandardType<typename Map::key_type>>::value &&
2477  std::is_base_of<DataType, StandardType<typename Map::mapped_type>>::value),
2478  int>::type>
2479 void Communicator::map_max(Map & data) const
2480 {
2481  if (this->size() > 1)
2482  {
2483  TIMPI_LOG_SCOPE("max(map)", "Parallel");
2484 
2485  // Since the input map may have different keys on different
2486  // processors, we first gather all the keys and values, then for
2487  // each key we choose the max value over all procs. We
2488  // initialize the max with the first value we encounter rather
2489  // than some "global" minimum, since the latter is difficult to
2490  // do generically.
2491  std::vector<typename Map::key_type> keys;
2492  std::vector<typename Map::mapped_type> vals;
2493 
2494  auto data_size = data.size();
2495  keys.reserve(data_size);
2496  vals.reserve(data_size);
2497 
2498  for (const auto & pr : data)
2499  {
2500  keys.push_back(pr.first);
2501  vals.push_back(pr.second);
2502  }
2503 
2504  this->allgather(keys, /*identical_buffer_sizes=*/false);
2505  this->allgather(vals, /*identical_buffer_sizes=*/false);
2506 
2507  data.clear();
2508 
2509  for (std::size_t i=0; i<keys.size(); ++i)
2510  {
2511  // Attempt to emplace this value. If it works, then the value didn't
2512  // already exist and we can go on. If it fails, compute the std::max
2513  // between the current and existing values.
2514  auto pr = data.emplace(keys[i], vals[i]);
2515 
2516  bool emplaced = pr.second;
2517 
2518  if (!emplaced)
2519  {
2520  auto it = pr.first;
2521  it->second = std::max(it->second, vals[i]);
2522  }
2523  }
2524  }
2525 }
2526 
2527 
2528 
2529 template <typename K, typename V, typename C, typename A>
2530 inline
2531 void Communicator::max(std::map<K,V,C,A> & data) const
2532 {
2533  this->map_max(data);
2534 }
2535 
2536 
2537 
2538 template <typename K, typename V, typename H, typename E, typename A>
2539 inline
2540 void Communicator::max(std::unordered_map<K,V,H,E,A> & data) const
2541 {
2542  this->map_max(data);
2543 }
2544 
2545 
2546 
2547 template <typename T>
2548 inline void Communicator::maxloc(T & r,
2549  unsigned int & max_id) const
2550 {
2551  if (this->size() > 1)
2552  {
2553  TIMPI_LOG_SCOPE("maxloc(scalar)", "Parallel");
2554 
2555  DataPlusInt<T> data_in;
2556  ignore(data_in); // unused ifndef TIMPI_HAVE_MPI
2557  data_in.val = r;
2558  data_in.rank = this->rank();
2559 
2560  timpi_call_mpi
2561  (TIMPI_ALLREDUCE (MPI_IN_PLACE, &data_in, 1,
2562  dataplusint_type_acquire<T>().first,
2563  OpFunction<T>::max_location(), this->get()));
2564  r = data_in.val;
2565  max_id = data_in.rank;
2566  }
2567  else
2568  max_id = this->rank();
2569 }
2570 
2571 
2572 template <typename T, typename A1, typename A2>
2573 inline void Communicator::maxloc(std::vector<T,A1> & r,
2574  std::vector<unsigned int,A2> & max_id) const
2575 {
2576  if (this->size() > 1 && !r.empty())
2577  {
2578  TIMPI_LOG_SCOPE("maxloc(vector)", "Parallel");
2579 
2580  timpi_assert(this->verify(r.size()));
2581 
2582  std::vector<DataPlusInt<T>> data_in(r.size());
2583  for (std::size_t i=0; i != r.size(); ++i)
2584  {
2585  data_in[i].val = r[i];
2586  data_in[i].rank = this->rank();
2587  }
2588  std::vector<DataPlusInt<T>> data_out(r.size());
2589 
2590  timpi_call_mpi
2591  (TIMPI_ALLREDUCE(data_in.data(), data_out.data(),
2592  cast_int<CountType>(r.size()),
2593  dataplusint_type_acquire<T>().first,
2595  this->get()));
2596  for (std::size_t i=0; i != r.size(); ++i)
2597  {
2598  r[i] = data_out[i].val;
2599  max_id[i] = data_out[i].rank;
2600  }
2601  }
2602  else if (!r.empty())
2603  {
2604  for (std::size_t i=0; i != r.size(); ++i)
2605  max_id[i] = this->rank();
2606  }
2607 }
2608 
2609 
2610 template <typename A1, typename A2>
2611 inline void Communicator::maxloc(std::vector<bool,A1> & r,
2612  std::vector<unsigned int,A2> & max_id) const
2613 {
2614  if (this->size() > 1 && !r.empty())
2615  {
2616  TIMPI_LOG_SCOPE("maxloc(vector<bool>)", "Parallel");
2617 
2618  timpi_assert(this->verify(r.size()));
2619 
2620  std::vector<DataPlusInt<int>> data_in(r.size());
2621  for (std::size_t i=0; i != r.size(); ++i)
2622  {
2623  data_in[i].val = r[i];
2624  data_in[i].rank = this->rank();
2625  }
2626  std::vector<DataPlusInt<int>> data_out(r.size());
2627  timpi_call_mpi
2628  (TIMPI_ALLREDUCE(data_in.data(), data_out.data(),
2629  cast_int<CountType>(r.size()),
2632  this->get()));
2633  for (std::size_t i=0; i != r.size(); ++i)
2634  {
2635  r[i] = data_out[i].val;
2636  max_id[i] = data_out[i].rank;
2637  }
2638  }
2639  else if (!r.empty())
2640  {
2641  for (std::size_t i=0; i != r.size(); ++i)
2642  max_id[i] = this->rank();
2643  }
2644 }
2645 
2646 
2647 template <typename T>
2648 inline void Communicator::sum(const T & r,
2649  T & o,
2650  Request & req) const
2651 {
2652 #ifdef TIMPI_HAVE_MPI
2653  if (this->size() > 1)
2654  {
2655  TIMPI_LOG_SCOPE("sum()", "Parallel");
2656 
2657  timpi_call_mpi
2658  (TIMPI_IALLREDUCE(&r, &o, 1, StandardType<T>(&r),
2659  OpFunction<T>::sum(), this->get(),
2660  req.get()));
2661  }
2662  else
2663 #endif
2664  {
2665  o = r;
2666  req = Request::null_request;
2667  }
2668 }
2669 
2670 
2671 template <typename T>
2672 inline void Communicator::sum(T & timpi_mpi_var(r)) const
2673 {
2674  if (this->size() > 1)
2675  {
2676  TIMPI_LOG_SCOPE("sum()", "Parallel");
2677 
2678  timpi_call_mpi
2679  (TIMPI_ALLREDUCE(MPI_IN_PLACE, &r, 1,
2680  StandardType<T>(&r),
2682  this->get()));
2683  }
2684 }
2685 
2686 
2687 template <typename T, typename A>
2688 inline void Communicator::sum(std::vector<T,A> & r) const
2689 {
2690  if (this->size() > 1 && !r.empty())
2691  {
2692  TIMPI_LOG_SCOPE("sum()", "Parallel");
2693 
2694  timpi_assert(this->verify(r.size()));
2695 
2696  timpi_call_mpi
2697  (TIMPI_ALLREDUCE(MPI_IN_PLACE, r.data(),
2698  cast_int<CountType>(r.size()),
2699  StandardType<T>(r.data()),
2701  this->get()));
2702  }
2703 }
2704 
2705 
2706 // We still do function overloading for complex sums - in a perfect
2707 // world we'd have a StandardSumOp to go along with StandardType...
2708 template <typename T>
2709 inline void Communicator::sum(std::complex<T> & timpi_mpi_var(r)) const
2710 {
2711  if (this->size() > 1)
2712  {
2713  TIMPI_LOG_SCOPE("sum()", "Parallel");
2714 
2715  timpi_call_mpi
2716  (TIMPI_ALLREDUCE(MPI_IN_PLACE, &r, 2,
2717  StandardType<T>(),
2719  this->get()));
2720  }
2721 }
2722 
2723 
2724 template <typename T, typename A>
2725 inline void Communicator::sum(std::vector<std::complex<T>,A> & r) const
2726 {
2727  if (this->size() > 1 && !r.empty())
2728  {
2729  TIMPI_LOG_SCOPE("sum()", "Parallel");
2730 
2731  timpi_assert(this->verify(r.size()));
2732 
2733  timpi_call_mpi
2734  (TIMPI_ALLREDUCE(MPI_IN_PLACE, r.data(),
2735  cast_int<CountType>(r.size() * 2),
2736  StandardType<T>(nullptr),
2737  OpFunction<T>::sum(), this->get()));
2738  }
2739 }
2740 
2741 
2742 
2743 // Helper function for summing std::map and std::unordered_map with
2744 // fixed type (key, value) pairs.
2745 template <typename Map,
2746  typename std::enable_if<std::is_base_of<DataType, StandardType<typename Map::key_type>>::value &&
2747  std::is_base_of<DataType, StandardType<typename Map::mapped_type>>::value,
2748  int>::type>
2749 inline void Communicator::map_sum(Map & data) const
2750 {
2751  if (this->size() > 1)
2752  {
2753  TIMPI_LOG_SCOPE("sum(map)", "Parallel");
2754 
2755  // There may be different keys on different processors, so we
2756  // first gather all the (key, value) pairs and then insert
2757  // them, summing repeated keys, back into the map.
2758  //
2759  // Note: We don't simply use Map::value_type here because the
2760  // key type is const in that case and we don't have the proper
2761  // StandardType overloads for communicating const types.
2762  std::vector<std::pair<typename Map::key_type, typename Map::mapped_type>>
2763  vecdata(data.begin(), data.end());
2764 
2765  this->allgather(vecdata, /*identical_buffer_sizes=*/false);
2766 
2767  data.clear();
2768  for (const auto & pr : vecdata)
2769  data[pr.first] += pr.second;
2770  }
2771 }
2772 
2773 
2774 
2775 // Helper function for summing std::map and std::unordered_map with
2776 // non-fixed-type (key, value) pairs.
2777 template <typename Map,
2778  typename std::enable_if<!(std::is_base_of<DataType, StandardType<typename Map::key_type>>::value &&
2779  std::is_base_of<DataType, StandardType<typename Map::mapped_type>>::value),
2780  int>::type>
2781 inline void Communicator::map_sum(Map & data) const
2782 {
2783  if (this->size() > 1)
2784  {
2785  TIMPI_LOG_SCOPE("sum(map)", "Parallel");
2786 
2787  // There may be different keys on different processors, so we
2788  // first gather all the (key, value) pairs and then insert
2789  // them, summing repeated keys, back into the map.
2790  std::vector<typename Map::key_type> keys;
2791  std::vector<typename Map::mapped_type> vals;
2792 
2793  auto data_size = data.size();
2794  keys.reserve(data_size);
2795  vals.reserve(data_size);
2796 
2797  for (const auto & pr : data)
2798  {
2799  keys.push_back(pr.first);
2800  vals.push_back(pr.second);
2801  }
2802 
2803  this->allgather(keys, /*identical_buffer_sizes=*/false);
2804  this->allgather(vals, /*identical_buffer_sizes=*/false);
2805 
2806  data.clear();
2807 
2808  for (std::size_t i=0; i<keys.size(); ++i)
2809  data[keys[i]] += vals[i];
2810  }
2811 }
2812 
2813 
2814 
2815 template <typename K, typename V, typename C, typename A>
2816 inline void Communicator::sum(std::map<K,V,C,A> & data) const
2817 {
2818  return this->map_sum(data);
2819 }
2820 
2821 
2822 
2823 template <typename K, typename V, typename H, typename E, typename A>
2824 inline void Communicator::sum(std::unordered_map<K,V,H,E,A> & data) const
2825 {
2826  return this->map_sum(data);
2827 }
2828 
2829 
2830 
2831 template <typename T, typename A1, typename A2,
2832  typename std::enable_if<std::is_base_of<DataType, StandardType<T>>::value, int>::type>
2833 inline void Communicator::allgather(const std::vector<T,A1> & sendval,
2834  std::vector<std::vector<T,A1>,A2> & recv,
2835  const bool identical_buffer_sizes) const
2836 {
2837  TIMPI_LOG_SCOPE ("allgather()","Parallel");
2838 
2839  timpi_assert(this->size());
2840 
2841  // serial case
2842  if (this->size() < 2)
2843  {
2844  recv.resize(1);
2845  recv[0] = sendval;
2846  return;
2847  }
2848 
2849  recv.clear();
2850  recv.resize(this->size());
2851 
2852  std::vector<CountType>
2853  sendlengths (this->size(), 0);
2854  std::vector<DispType>
2855  displacements(this->size(), 0);
2856 
2857  const CountType mysize = cast_int<CountType>(sendval.size());
2858 
2859  if (identical_buffer_sizes)
2860  sendlengths.assign(this->size(), mysize);
2861  else
2862  // first comm step to determine buffer sizes from all processors
2863  this->allgather(mysize, sendlengths);
2864 
2865  // Find the total size of the final array and
2866  // set up the displacement offsets for each processor
2867  CountType globalsize = 0;
2868  for (unsigned int i=0; i != this->size(); ++i)
2869  {
2870  displacements[i] = globalsize;
2871  globalsize += sendlengths[i];
2872  }
2873 
2874  // Check for quick return
2875  if (globalsize == 0)
2876  return;
2877 
2878  // monolithic receive buffer
2879  std::vector<T,A1> r(globalsize, 0);
2880 
2881  // and get the data from the remote processors.
2882  timpi_call_mpi
2883  (TIMPI_ALLGATHERV(const_cast<T*>(mysize ? sendval.data() : nullptr),
2884  mysize, StandardType<T>(),
2885  &r[0], sendlengths.data(), displacements.data(),
2886  StandardType<T>(), this->get()));
2887 
2888  // slice receive buffer up
2889  for (unsigned int i=0; i != this->size(); ++i)
2890  recv[i].assign(r.begin()+displacements[i],
2891  r.begin()+displacements[i]+sendlengths[i]);
2892 }
2893 
2894 
2895 
2896 template <typename T, typename A1, typename A2,
2897  typename std::enable_if<Has_buffer_type<Packing<T>>::value, int>::type>
2898 inline void Communicator::allgather(const std::vector<T,A1> & sendval,
2899  std::vector<std::vector<T,A1>,A2> & recv,
2900  const bool /* identical_buffer_sizes */) const
2901 {
2902  TIMPI_LOG_SCOPE ("allgather()","Parallel");
2903 
2904  typedef typename Packing<T>::buffer_type buffer_t;
2905 
2906  std::vector<buffer_t> buffer;
2907  auto next_iter = pack_range ((void *)nullptr, sendval.begin(),
2908  sendval.end(), buffer,
2909  std::numeric_limits<CountType>::max());
2910 
2911  if (next_iter != sendval.end())
2912  timpi_error_msg("Non-blocking packed range sends cannot exceed " << std::numeric_limits<CountType>::max() << "in size");
2913 
2914  std::vector<std::vector<buffer_t>> allbuffers;
2915 
2916  timpi_assert(this->size());
2917  recv.clear();
2918  recv.resize(this->size());
2919 
2920  // Even if our vector sizes were identical, the variable-sized
2921  // data's buffer sizes might not be.
2922  this->allgather(buffer, allbuffers, false);
2923 
2924  for (processor_id_type i=0; i != this->size(); ++i)
2925  unpack_range(allbuffers[i], (void *)nullptr,
2926  std::back_inserter(recv[i]), (T*)nullptr);
2927 }
2928 
2929 
2930 
2931 template <typename T, typename C, typename A>
2932 inline void Communicator::set_union(std::set<T,C,A> & data,
2933  const unsigned int root_id) const
2934 {
2935  if (this->size() > 1)
2936  {
2937  std::vector<T> vecdata(data.begin(), data.end());
2938  this->gather(root_id, vecdata);
2939  if (this->rank() == root_id)
2940  data.insert(vecdata.begin(), vecdata.end());
2941  }
2942 }
2943 
2944 
2945 
2946 template <typename T, typename C, typename A>
2947 inline void Communicator::set_union(std::set<T,C,A> & data) const
2948 {
2949  if (this->size() > 1)
2950  {
2951  std::vector<T> vecdata(data.begin(), data.end());
2952  this->allgather(vecdata, false);
2953  data.insert(vecdata.begin(), vecdata.end());
2954  }
2955 }
2956 
2957 
2958 
2959 template <typename T, typename C, typename A>
2960 inline void Communicator::set_union(std::multiset<T,C,A> & data,
2961  const unsigned int root_id) const
2962 {
2963  if (this->size() > 1)
2964  {
2965  std::vector<T> vecdata(data.begin(), data.end());
2966  this->gather(root_id, vecdata);
2967  if (this->rank() == root_id)
2968  {
2969  // Clear first so the root's data doesn't get duplicated
2970  data.clear();
2971  data.insert(vecdata.begin(), vecdata.end());
2972  }
2973  }
2974 }
2975 
2976 
2977 template <typename T, typename C, typename A>
2978 inline void Communicator::set_union(std::multiset<T,C,A> & data) const
2979 {
2980  if (this->size() > 1)
2981  {
2982  std::vector<T> vecdata(data.begin(), data.end());
2983  this->allgather(vecdata, false);
2984 
2985  // Don't let our data duplicate itself
2986  data.clear();
2987 
2988  data.insert(vecdata.begin(), vecdata.end());
2989  }
2990 }
2991 
2992 
2993 
2994 template <typename T1, typename T2, typename C, typename A>
2995 inline void Communicator::set_union(std::map<T1,T2,C,A> & data,
2996  const unsigned int root_id) const
2997 {
2998  if (this->size() > 1)
2999  {
3000  std::vector<std::pair<T1,T2>> vecdata(data.begin(), data.end());
3001  this->gather(root_id, vecdata);
3002 
3003  if (this->rank() == root_id)
3004  {
3005  // If we have a non-zero root_id, we still want to let pid
3006  // 0's values take precedence in the event we have duplicate
3007  // keys
3008  data.clear();
3009 
3010  data.insert(vecdata.begin(), vecdata.end());
3011  }
3012  }
3013 }
3014 
3015 
3016 
3017 template <typename T1, typename T2, typename C, typename A>
3018 inline void Communicator::set_union(std::map<T1,T2,C,A> & data) const
3019 {
3020  if (this->size() > 1)
3021  {
3022  std::vector<std::pair<T1,T2>> vecdata(data.begin(), data.end());
3023  this->allgather(vecdata, false);
3024 
3025  // We want values on lower pids to take precedence in the event
3026  // we have duplicate keys
3027  data.clear();
3028 
3029  data.insert(vecdata.begin(), vecdata.end());
3030  }
3031 }
3032 
3033 
3034 
3035 template <typename T1, typename T2, typename C, typename A>
3036 inline void Communicator::set_union(std::multimap<T1,T2,C,A> & data,
3037  const unsigned int root_id) const
3038 {
3039  if (this->size() > 1)
3040  {
3041  std::vector<std::pair<T1,T2>> vecdata(data.begin(), data.end());
3042  this->gather(root_id, vecdata);
3043 
3044  if (this->rank() == root_id)
3045  {
3046  // Don't let root's data duplicate itself
3047  data.clear();
3048 
3049  data.insert(vecdata.begin(), vecdata.end());
3050  }
3051  }
3052 }
3053 
3054 
3055 
3056 template <typename T1, typename T2, typename C, typename A>
3057 inline void Communicator::set_union(std::multimap<T1,T2,C,A> & data) const
3058 {
3059  if (this->size() > 1)
3060  {
3061  std::vector<std::pair<T1,T2>> vecdata(data.begin(), data.end());
3062  this->allgather(vecdata, false);
3063 
3064  // Don't let our data duplicate itself
3065  data.clear();
3066 
3067  data.insert(vecdata.begin(), vecdata.end());
3068  }
3069 }
3070 
3071 
3072 
3073 template <typename K, typename H, typename KE, typename A>
3074 inline void Communicator::set_union(std::unordered_set<K,H,KE,A> & data,
3075  const unsigned int root_id) const
3076 {
3077  if (this->size() > 1)
3078  {
3079  std::vector<K> vecdata(data.begin(), data.end());
3080  this->gather(root_id, vecdata);
3081  if (this->rank() == root_id)
3082  data.insert(vecdata.begin(), vecdata.end());
3083  }
3084 }
3085 
3086 
3087 
3088 template <typename K, typename H, typename KE, typename A>
3089 inline void Communicator::set_union(std::unordered_set<K,H,KE,A> & data) const
3090 {
3091  if (this->size() > 1)
3092  {
3093  std::vector<K> vecdata(data.begin(), data.end());
3094  this->allgather(vecdata, false);
3095  data.insert(vecdata.begin(), vecdata.end());
3096  }
3097 }
3098 
3099 
3100 
3101 template <typename K, typename H, typename KE, typename A>
3102 inline void Communicator::set_union(std::unordered_multiset<K,H,KE,A> & data,
3103  const unsigned int root_id) const
3104 {
3105  if (this->size() > 1)
3106  {
3107  std::vector<K> vecdata(data.begin(), data.end());
3108  this->gather(root_id, vecdata);
3109  if (this->rank() == root_id)
3110  {
3111  // Don't let root's data duplicate itself
3112  data.clear();
3113 
3114  data.insert(vecdata.begin(), vecdata.end());
3115  }
3116  }
3117 }
3118 
3119 
3120 
3121 template <typename K, typename H, typename KE, typename A>
3122 inline void Communicator::set_union(std::unordered_multiset<K,H,KE,A> & data) const
3123 {
3124  if (this->size() > 1)
3125  {
3126  std::vector<K> vecdata(data.begin(), data.end());
3127  this->allgather(vecdata, false);
3128 
3129  // Don't let our data duplicate itself
3130  data.clear();
3131 
3132  data.insert(vecdata.begin(), vecdata.end());
3133  }
3134 }
3135 
3136 
3137 
3138 template <typename K, typename T, typename H, typename KE, typename A>
3139 inline void Communicator::set_union(std::unordered_map<K,T,H,KE,A> & data,
3140  const unsigned int root_id) const
3141 {
3142  if (this->size() > 1)
3143  {
3144  std::vector<std::pair<K,T>> vecdata(data.begin(), data.end());
3145  this->gather(root_id, vecdata);
3146 
3147  if (this->rank() == root_id)
3148  {
3149  // If we have a non-zero root_id, we still want to let pid
3150  // 0's values take precedence in the event we have duplicate
3151  // keys
3152  data.clear();
3153 
3154  data.insert(vecdata.begin(), vecdata.end());
3155  }
3156  }
3157 }
3158 
3159 
3160 
3161 template <typename K, typename T, typename H, typename KE, typename A>
3162 inline void Communicator::set_union(std::unordered_map<K,T,H,KE,A> & data) const
3163 {
3164  if (this->size() > 1)
3165  {
3166  std::vector<std::pair<K,T>> vecdata(data.begin(), data.end());
3167  this->allgather(vecdata, false);
3168 
3169  // We want values on lower pids to take precedence in the event
3170  // we have duplicate keys
3171  data.clear();
3172 
3173  data.insert(vecdata.begin(), vecdata.end());
3174  }
3175 }
3176 
3177 
3178 
3179 template <typename K, typename T, typename H, typename KE, typename A>
3180 inline void Communicator::set_union(std::unordered_multimap<K,T,H,KE,A> & data,
3181  const unsigned int root_id) const
3182 {
3183  if (this->size() > 1)
3184  {
3185  std::vector<std::pair<K,T>> vecdata(data.begin(), data.end());
3186  this->gather(root_id, vecdata);
3187 
3188  if (this->rank() == root_id)
3189  {
3190  // Don't let root's data duplicate itself
3191  data.clear();
3192 
3193  data.insert(vecdata.begin(), vecdata.end());
3194  }
3195  }
3196 }
3197 
3198 
3199 
3200 template <typename K, typename T, typename H, typename KE, typename A>
3201 inline void Communicator::set_union(std::unordered_multimap<K,T,H,KE,A> & data) const
3202 {
3203  if (this->size() > 1)
3204  {
3205  std::vector<std::pair<K,T>> vecdata(data.begin(), data.end());
3206  this->allgather(vecdata, false);
3207 
3208  // Don't let our data duplicate itself
3209  data.clear();
3210 
3211  data.insert(vecdata.begin(), vecdata.end());
3212  }
3213 }
3214 
3215 
3216 
3217 template <typename T, typename A>
3218 inline void Communicator::gather(const unsigned int root_id,
3219  const T & sendval,
3220  std::vector<T,A> & recv) const
3221 {
3222  timpi_assert_less (root_id, this->size());
3223 
3224  if (this->rank() == root_id)
3225  recv.resize(this->size());
3226 
3227  if (this->size() > 1)
3228  {
3229  TIMPI_LOG_SCOPE("gather()", "Parallel");
3230 
3231  StandardType<T> send_type(&sendval);
3232 
3233  timpi_assert_less(root_id, this->size());
3234 
3235  timpi_call_mpi
3236  (TIMPI_GATHER(const_cast<T*>(&sendval), 1, send_type,
3237  recv.empty() ? nullptr : recv.data(), 1, send_type,
3238  root_id, this->get()));
3239  }
3240  else
3241  recv[0] = sendval;
3242 }
3243 
3244 
3245 
3246 template <typename T, typename A,
3247  typename std::enable_if<std::is_base_of<DataType, StandardType<T>>::value, int>::type>
3248 inline void Communicator::gather(const unsigned int root_id,
3249  std::vector<T,A> & r) const
3250 {
3251  if (this->size() == 1)
3252  {
3253  timpi_assert (!this->rank());
3254  timpi_assert (!root_id);
3255  return;
3256  }
3257 
3258  timpi_assert_less (root_id, this->size());
3259 
3260  std::vector<CountType>
3261  sendlengths (this->size(), 0);
3262  std::vector<DispType>
3263  displacements(this->size(), 0);
3264 
3265  const CountType mysize = cast_int<CountType>(r.size());
3266  this->allgather(mysize, sendlengths);
3267 
3268  TIMPI_LOG_SCOPE("gather()", "Parallel");
3269 
3270  // Find the total size of the final array and
3271  // set up the displacement offsets for each processor.
3272  CountType globalsize = 0;
3273  for (unsigned int i=0; i != this->size(); ++i)
3274  {
3275  displacements[i] = globalsize;
3276  globalsize += sendlengths[i];
3277  }
3278 
3279  // Check for quick return
3280  if (globalsize == 0)
3281  return;
3282 
3283  // copy the input buffer
3284  std::vector<T,A> r_src(r);
3285 
3286  // now resize it to hold the global data
3287  // on the receiving processor
3288  if (root_id == this->rank())
3289  r.resize(globalsize);
3290 
3291  timpi_assert_less(root_id, this->size());
3292 
3293  // and get the data from the remote processors
3294  timpi_call_mpi
3295  (TIMPI_GATHERV(r_src.empty() ? nullptr : r_src.data(), mysize,
3296  StandardType<T>(), r.empty() ? nullptr : r.data(),
3297  sendlengths.data(), displacements.data(),
3298  StandardType<T>(), root_id, this->get()));
3299 }
3300 
3301 
3302 template <typename T, typename A,
3303  typename std::enable_if<Has_buffer_type<Packing<T>>::value, int>::type>
3304 inline void Communicator::gather(const unsigned int root_id,
3305  std::vector<T,A> & r) const
3306 {
3307  std::vector<T,A> gathered;
3308  this->gather_packed_range(root_id, (void *)(nullptr),
3309  r.begin(), r.end(),
3310  std::inserter(gathered, gathered.end()));
3311 
3312  gathered.swap(r);
3313 }
3314 
3315 
3316 
3317 template <typename T, typename A>
3318 inline void Communicator::gather(const unsigned int root_id,
3319  const std::basic_string<T> & sendval,
3320  std::vector<std::basic_string<T>,A> & recv,
3321  const bool identical_buffer_sizes) const
3322 {
3323  timpi_assert_less (root_id, this->size());
3324 
3325  if (this->rank() == root_id)
3326  recv.resize(this->size());
3327 
3328  if (this->size() > 1)
3329  {
3330  TIMPI_LOG_SCOPE ("gather()","Parallel");
3331 
3332  std::vector<CountType>
3333  sendlengths (this->size(), 0);
3334  std::vector<DispType>
3335  displacements(this->size(), 0);
3336 
3337  const CountType mysize = cast_int<CountType>(sendval.size());
3338 
3339  if (identical_buffer_sizes)
3340  sendlengths.assign(this->size(), mysize);
3341  else
3342  // first comm step to determine buffer sizes from all processors
3343  this->gather(root_id, mysize, sendlengths);
3344 
3345  // Find the total size of the final array and
3346  // set up the displacement offsets for each processor
3347  CountType globalsize = 0;
3348  for (unsigned int i=0; i < this->size(); ++i)
3349  {
3350  displacements[i] = globalsize;
3351  globalsize += sendlengths[i];
3352  }
3353 
3354  // monolithic receive buffer
3355  std::basic_string<T> r;
3356  if (this->rank() == root_id)
3357  r.resize(globalsize, 0);
3358 
3359  timpi_assert_less(root_id, this->size());
3360 
3361  // and get the data from the remote processors.
3362  timpi_call_mpi
3363  (TIMPI_GATHERV(const_cast<T*>(sendval.data()),
3364  mysize, StandardType<T>(),
3365  this->rank() == root_id ? &r[0] : nullptr,
3366  sendlengths.data(), displacements.data(),
3367  StandardType<T>(), root_id, this->get()));
3368 
3369  // slice receive buffer up
3370  if (this->rank() == root_id)
3371  for (unsigned int i=0; i != this->size(); ++i)
3372  recv[i] = r.substr(displacements[i], sendlengths[i]);
3373  }
3374  else
3375  recv[0] = sendval;
3376 }
3377 
3378 
3379 
3380 template <typename T, typename A,
3381  typename std::enable_if<std::is_base_of<DataType, StandardType<T>>::value, int>::type>
3382 inline void Communicator::allgather(const T & sendval,
3383  std::vector<T,A> & recv) const
3384 {
3385  TIMPI_LOG_SCOPE ("allgather()","Parallel");
3386 
3387  timpi_assert(this->size());
3388  recv.resize(this->size());
3389 
3390  const unsigned int comm_size = this->size();
3391  if (comm_size > 1)
3392  {
3393  StandardType<T> send_type(&sendval);
3394 
3395  timpi_call_mpi
3396  (TIMPI_ALLGATHER(const_cast<T*>(&sendval), 1, send_type, recv.data(), 1,
3397  send_type, this->get()));
3398  }
3399  else if (comm_size > 0)
3400  recv[0] = sendval;
3401 }
3402 
3403 template <typename T, typename A,
3404  typename std::enable_if<Has_buffer_type<Packing<T>>::value, int>::type>
3405 inline void Communicator::allgather(const T & sendval,
3406  std::vector<T,A> & recv) const
3407 {
3408  TIMPI_LOG_SCOPE ("allgather()","Parallel");
3409 
3410  timpi_assert(this->size());
3411  recv.resize(this->size());
3412 
3413  static const std::size_t approx_total_buffer_size = 1e8;
3414  const std::size_t approx_each_buffer_size =
3415  approx_total_buffer_size / this->size();
3416 
3417  unsigned int comm_size = this->size();
3418  if (comm_size > 1)
3419  {
3420  std::vector<T> range = {sendval};
3421 
3422  allgather_packed_range((void *)(nullptr), range.begin(), range.end(), recv.begin(),
3423  approx_each_buffer_size);
3424  }
3425  else if (comm_size > 0)
3426  recv[0] = sendval;
3427 }
3428 
3429 template <typename T, typename A,
3430  typename std::enable_if<std::is_base_of<DataType, StandardType<T>>::value, int>::type>
3431 inline void Communicator::allgather(std::vector<T,A> & r,
3432  const bool identical_buffer_sizes) const
3433 {
3434  if (this->size() < 2)
3435  return;
3436 
3437  TIMPI_LOG_SCOPE("allgather()", "Parallel");
3438 
3439  if (identical_buffer_sizes)
3440  {
3441  timpi_assert(this->verify(r.size()));
3442  if (r.empty())
3443  return;
3444 
3445  std::vector<T,A> r_src(r.size()*this->size());
3446  r_src.swap(r);
3447  StandardType<T> send_type(r_src.data());
3448 
3449  timpi_call_mpi
3450  (TIMPI_ALLGATHER(r_src.data(), cast_int<CountType>(r_src.size()),
3451  send_type, r.data(), cast_int<CountType>(r_src.size()),
3452  send_type, this->get()));
3453  // timpi_assert(this->verify(r));
3454  return;
3455  }
3456 
3457  std::vector<CountType>
3458  sendlengths (this->size(), 0);
3459  std::vector<DispType>
3460  displacements(this->size(), 0);
3461 
3462  const CountType mysize = cast_int<CountType>(r.size());
3463  this->allgather(mysize, sendlengths);
3464 
3465  // Find the total size of the final array and
3466  // set up the displacement offsets for each processor.
3467  CountType globalsize = 0;
3468  for (unsigned int i=0; i != this->size(); ++i)
3469  {
3470  displacements[i] = globalsize;
3471  globalsize += sendlengths[i];
3472  }
3473 
3474  // Check for quick return
3475  if (globalsize == 0)
3476  return;
3477 
3478  // copy the input buffer
3479  std::vector<T,A> r_src(globalsize);
3480  r_src.swap(r);
3481 
3482  StandardType<T> send_type(r.data());
3483 
3484  // and get the data from the remote processors.
3485  // Pass nullptr if our vector is empty.
3486  timpi_call_mpi
3487  (TIMPI_ALLGATHERV(r_src.empty() ? nullptr : r_src.data(), mysize,
3488  send_type, r.data(), sendlengths.data(),
3489  displacements.data(), send_type, this->get()));
3490 }
3491 
3492 template <typename T, typename A,
3493  typename std::enable_if<Has_buffer_type<Packing<T>>::value, int>::type>
3494 inline void Communicator::allgather(std::vector<T,A> & r,
3495  const bool identical_buffer_sizes) const
3496 {
3497  if (this->size() < 2)
3498  return;
3499 
3500  TIMPI_LOG_SCOPE("allgather()", "Parallel");
3501 
3502  if (identical_buffer_sizes)
3503  {
3504  timpi_assert(this->verify(r.size()));
3505  if (r.empty())
3506  return;
3507 
3508 
3509  std::vector<T,A> r_src(r.size()*this->size());
3510  r_src.swap(r);
3511 
3512  this->allgather_packed_range((void *)(nullptr),
3513  r_src.begin(),
3514  r_src.end(),
3515  r.begin());
3516  return;
3517  }
3518 
3519  std::vector<CountType>
3520  sendlengths (this->size(), 0);
3521  std::vector<DispType>
3522  displacements(this->size(), 0);
3523 
3524  const CountType mysize = cast_int<CountType>(r.size());
3525  this->allgather(mysize, sendlengths);
3526 
3527  // Find the total size of the final array
3528  CountType globalsize = 0;
3529  for (unsigned int i=0; i != this->size(); ++i)
3530  globalsize += sendlengths[i];
3531 
3532  // Check for quick return
3533  if (globalsize == 0)
3534  return;
3535 
3536  // copy the input buffer
3537  std::vector<T,A> r_src(globalsize);
3538  r_src.swap(r);
3539 
3540  this->allgather_packed_range((void *)(nullptr),
3541  r_src.begin(),
3542  r_src.end(),
3543  r.begin());
3544 }
3545 
3546 template <typename T, typename A>
3547 inline void Communicator::allgather(std::vector<std::basic_string<T>,A> & r,
3548  const bool identical_buffer_sizes) const
3549 {
3550  if (this->size() < 2)
3551  return;
3552 
3553  TIMPI_LOG_SCOPE("allgather()", "Parallel");
3554 
3555  if (identical_buffer_sizes)
3556  {
3557  timpi_assert(this->verify(r.size()));
3558 
3559  // identical_buffer_sizes doesn't buy us much since we have to
3560  // communicate the lengths of strings within each buffer anyway
3561  if (r.empty())
3562  return;
3563  }
3564 
3565  // Concatenate the input buffer into a send buffer, and keep track
3566  // of input string lengths
3567  std::vector<CountType> mystrlengths (r.size());
3568  std::vector<T> concat_src;
3569 
3570  CountType myconcatsize = 0;
3571  for (std::size_t i=0; i != r.size(); ++i)
3572  {
3573  CountType stringlen = cast_int<CountType>(r[i].size());
3574  mystrlengths[i] = stringlen;
3575  myconcatsize += stringlen;
3576  }
3577  concat_src.reserve(myconcatsize);
3578  for (std::size_t i=0; i != r.size(); ++i)
3579  concat_src.insert
3580  (concat_src.end(), r[i].begin(), r[i].end());
3581 
3582  // Get the string lengths from all other processors
3583  std::vector<CountType> strlengths = mystrlengths;
3584  this->allgather(strlengths, identical_buffer_sizes);
3585 
3586  // We now know how many strings we'll be receiving
3587  r.resize(strlengths.size());
3588 
3589  // Get the concatenated data sizes from all other processors
3590  std::vector<CountType> concat_sizes;
3591  this->allgather(myconcatsize, concat_sizes);
3592 
3593  // Find the total size of the final concatenated array and
3594  // set up the displacement offsets for each processor.
3595  std::vector<DispType> displacements(this->size(), 0);
3596  CountType globalsize = 0;
3597  for (unsigned int i=0; i != this->size(); ++i)
3598  {
3599  displacements[i] = globalsize;
3600  globalsize += concat_sizes[i];
3601  }
3602 
3603  // Check for quick return
3604  if (globalsize == 0)
3605  return;
3606 
3607  // Get the concatenated data from the remote processors.
3608  // Pass nullptr if our vector is empty.
3609  std::vector<T> concat(globalsize);
3610 
3611  // We may have concat_src.empty(), but we know concat has at least
3612  // one element we can use as an example for StandardType
3613  StandardType<T> send_type(concat.data());
3614 
3615  timpi_call_mpi
3616  (TIMPI_ALLGATHERV(concat_src.empty() ?
3617  nullptr : concat_src.data(), myconcatsize,
3618  send_type, concat.data(), concat_sizes.data(),
3619  displacements.data(), send_type, this->get()));
3620 
3621  // Finally, split concatenated data into strings
3622  const T * begin = concat.data();
3623  for (std::size_t i=0; i != r.size(); ++i)
3624  {
3625  const T * end = begin + strlengths[i];
3626  r[i].assign(begin, end);
3627  begin = end;
3628  }
3629 }
3630 
3631 
3632 
3633 template <typename T, typename A>
3634 void Communicator::scatter(const std::vector<T,A> & data,
3635  T & recv,
3636  const unsigned int root_id) const
3637 {
3638  ignore(root_id); // Only needed for MPI and/or dbg/devel
3639  timpi_assert_less (root_id, this->size());
3640 
3641  // Do not allow the root_id to scatter a nullptr vector.
3642  // That would leave recv in an indeterminate state.
3643  timpi_assert (this->rank() != root_id || this->size() == data.size());
3644 
3645  if (this->size() == 1)
3646  {
3647  timpi_assert (!this->rank());
3648  timpi_assert (!root_id);
3649  recv = data[0];
3650  return;
3651  }
3652 
3653  TIMPI_LOG_SCOPE("scatter()", "Parallel");
3654 
3655  T * data_ptr = const_cast<T*>(data.empty() ? nullptr : data.data());
3656  ignore(data_ptr); // unused ifndef TIMPI_HAVE_MPI
3657 
3658  timpi_assert_less(root_id, this->size());
3659 
3660  timpi_call_mpi
3661  (TIMPI_SCATTER(data_ptr, 1, StandardType<T>(data_ptr),
3662  &recv, 1, StandardType<T>(&recv), root_id, this->get()));
3663 }
3664 
3665 
3666 
3667 template <typename T, typename A>
3668 void Communicator::scatter(const std::vector<T,A> & data,
3669  std::vector<T,A> & recv,
3670  const unsigned int root_id) const
3671 {
3672  timpi_assert_less (root_id, this->size());
3673 
3674  if (this->size() == 1)
3675  {
3676  timpi_assert (!this->rank());
3677  timpi_assert (!root_id);
3678  recv.assign(data.begin(), data.end());
3679  return;
3680  }
3681 
3682  TIMPI_LOG_SCOPE("scatter()", "Parallel");
3683 
3684  std::size_t recv_buffer_size = 0;
3685  if (this->rank() == root_id)
3686  {
3687  timpi_assert(data.size() % this->size() == 0);
3688  recv_buffer_size = cast_int<std::size_t>(data.size() / this->size());
3689  }
3690 
3691  this->broadcast(recv_buffer_size);
3692  recv.resize(recv_buffer_size);
3693 
3694  T * data_ptr = const_cast<T*>(data.empty() ? nullptr : data.data());
3695  T * recv_ptr = recv.empty() ? nullptr : recv.data();
3696  ignore(data_ptr, recv_ptr); // unused ifndef TIMPI_HAVE_MPI
3697 
3698  timpi_assert_less(root_id, this->size());
3699 
3700  timpi_call_mpi
3701  (TIMPI_SCATTER(data_ptr, recv_buffer_size, StandardType<T>(data_ptr),
3702  recv_ptr, recv_buffer_size, StandardType<T>(recv_ptr),
3703  root_id, this->get()));
3704 }
3705 
3706 
3707 
3708 template <typename T, typename A1, typename A2>
3709 void Communicator::scatter(const std::vector<T,A1> & data,
3710  const std::vector<CountType,A2> counts,
3711  std::vector<T,A1> & recv,
3712  const unsigned int root_id) const
3713 {
3714  timpi_assert_less (root_id, this->size());
3715 
3716  if (this->size() == 1)
3717  {
3718  timpi_assert (!this->rank());
3719  timpi_assert (!root_id);
3720  timpi_assert (counts.size() == this->size());
3721  recv.assign(data.begin(), data.begin() + counts[0]);
3722  return;
3723  }
3724 
3725  std::vector<DispType> displacements(this->size(), 0);
3726  if (root_id == this->rank())
3727  {
3728  timpi_assert(counts.size() == this->size());
3729 
3730  // Create a displacements vector from the incoming counts vector
3731  std::size_t globalsize = 0;
3732  for (unsigned int i=0; i < this->size(); ++i)
3733  {
3734  displacements[i] = globalsize;
3735  globalsize += counts[i];
3736  }
3737 
3738  timpi_assert(data.size() == globalsize);
3739  }
3740 
3741  TIMPI_LOG_SCOPE("scatter()", "Parallel");
3742 
3743  // Scatter the buffer sizes to size remote buffers
3744  CountType recv_buffer_size = 0;
3745  this->scatter(counts, recv_buffer_size, root_id);
3746  recv.resize(recv_buffer_size);
3747 
3748  T * data_ptr = const_cast<T*>(data.empty() ? nullptr : data.data());
3749  CountType * count_ptr = const_cast<CountType*>(counts.empty() ? nullptr : counts.data());
3750  T * recv_ptr = recv.empty() ? nullptr : recv.data();
3751  ignore(data_ptr, count_ptr, recv_ptr); // unused ifndef TIMPI_HAVE_MPI
3752 
3753  timpi_assert_less(root_id, this->size());
3754 
3755  // Scatter the non-uniform chunks
3756  timpi_call_mpi
3757  (TIMPI_SCATTERV(data_ptr, count_ptr, displacements.data(), StandardType<T>(data_ptr),
3758  recv_ptr, recv_buffer_size, StandardType<T>(recv_ptr), root_id, this->get()));
3759 }
3760 
3761 
3762 #ifdef TIMPI_HAVE_MPI
3763 #if MPI_VERSION > 3
3764 
3767 template <typename T, typename A1, typename A2>
3768 void Communicator::scatter(const std::vector<T,A1> & data,
3769  const std::vector<int,A2> counts,
3770  std::vector<T,A1> & recv,
3771  const unsigned int root_id) const
3772 {
3773  std::vector<CountType> full_counts(counts.begin(), counts.end());
3774  this->scatter(data, full_counts, recv, root_id);
3775 }
3776 #endif
3777 #endif
3778 
3779 
3780 
3781 template <typename T, typename A1, typename A2>
3782 void Communicator::scatter(const std::vector<std::vector<T,A1>,A2> & data,
3783  std::vector<T,A1> & recv,
3784  const unsigned int root_id,
3785  const bool identical_buffer_sizes) const
3786 {
3787  timpi_assert_less (root_id, this->size());
3788 
3789  if (this->size() == 1)
3790  {
3791  timpi_assert (!this->rank());
3792  timpi_assert (!root_id);
3793  timpi_assert (data.size() == this->size());
3794  recv.assign(data[0].begin(), data[0].end());
3795  return;
3796  }
3797 
3798  std::vector<T,A1> stacked_data;
3799  std::vector<CountType> counts;
3800 
3801  if (root_id == this->rank())
3802  {
3803  timpi_assert (data.size() == this->size());
3804 
3805  if (!identical_buffer_sizes)
3806  counts.resize(this->size());
3807 
3808  for (std::size_t i=0; i < data.size(); ++i)
3809  {
3810  if (!identical_buffer_sizes)
3811  counts[i] = cast_int<CountType>(data[i].size());
3812 #ifndef NDEBUG
3813  else
3814  // Check that buffer sizes are indeed equal
3815  timpi_assert(!i || data[i-1].size() == data[i].size());
3816 #endif
3817  std::copy(data[i].begin(), data[i].end(), std::back_inserter(stacked_data));
3818  }
3819  }
3820 
3821  if (identical_buffer_sizes)
3822  this->scatter(stacked_data, recv, root_id);
3823  else
3824  this->scatter(stacked_data, counts, recv, root_id);
3825 }
3826 
3827 
3828 
3829 template <typename T, typename A>
3830 inline void Communicator::alltoall(std::vector<T,A> & buf) const
3831 {
3832  if (this->size() < 2 || buf.empty())
3833  return;
3834 
3835  TIMPI_LOG_SCOPE("alltoall()", "Parallel");
3836 
3837  // the per-processor size. this is the same for all
3838  // processors using MPI_Alltoall, could be variable
3839  // using MPI_Alltoallv
3840  const CountType size_per_proc =
3841  cast_int<CountType>(buf.size()/this->size());
3842  ignore(size_per_proc);
3843 
3844  timpi_assert_equal_to (buf.size()%this->size(), 0);
3845 
3846  timpi_assert(this->verify(size_per_proc));
3847 
3848  StandardType<T> send_type(buf.data());
3849 
3850  timpi_call_mpi
3851  (TIMPI_ALLTOALL(MPI_IN_PLACE, size_per_proc, send_type, buf.data(),
3852  size_per_proc, send_type, this->get()));
3853 }
3854 
3855 
3856 
3857 template <typename T
3858 #ifdef TIMPI_HAVE_MPI
3859  ,
3860  typename std::enable_if<std::is_base_of<DataType, StandardType<T>>::value, int>::type
3861 #endif
3862  >
3863 inline void Communicator::broadcast (T & timpi_mpi_var(data),
3864  const unsigned int root_id,
3865  const bool /* identical_sizes */) const
3866 {
3867  ignore(root_id); // Only needed for MPI and/or dbg/devel
3868  if (this->size() == 1)
3869  {
3870  timpi_assert (!this->rank());
3871  timpi_assert (!root_id);
3872  return;
3873  }
3874 
3875  timpi_assert_less (root_id, this->size());
3876 
3877  TIMPI_LOG_SCOPE("broadcast()", "Parallel");
3878 
3879  // Spread data to remote processors.
3880  timpi_call_mpi
3881  (TIMPI_BCAST(&data, 1, StandardType<T>(&data), root_id,
3882  this->get()));
3883 }
3884 
3885 #ifdef TIMPI_HAVE_MPI
3886 template <typename T,
3887  typename std::enable_if<Has_buffer_type<Packing<T>>::value, int>::type>
3888 inline void Communicator::broadcast (T & data,
3889  const unsigned int root_id,
3890  const bool /* identical_sizes */) const
3891 {
3892  ignore(root_id); // Only needed for MPI and/or dbg/devel
3893  if (this->size() == 1)
3894  {
3895  timpi_assert (!this->rank());
3896  timpi_assert (!root_id);
3897  return;
3898  }
3899 
3900  timpi_assert_less (root_id, this->size());
3901 
3902 // // If we don't have MPI, then we should be done, and calling the below can
3903 // // have the side effect of instantiating Packing<T> classes that are not
3904 // // defined. (Normally we would be calling a more specialized overload of
3905 // // broacast that would then call broadcast_packed_range with appropriate
3906 // // template arguments)
3907 // #ifdef TIMPI_HAVE_MPI
3908  std::vector<T> range = {data};
3909 
3910  this->broadcast_packed_range((void *)(nullptr),
3911  range.begin(),
3912  range.end(),
3913  (void *)(nullptr),
3914  range.begin(),
3915  root_id);
3916 
3917  data = range[0];
3918 // #endif
3919 }
3920 #endif
3921 
3922 template <typename T, typename A,
3923  typename std::enable_if<std::is_base_of<DataType, StandardType<T>>::value, int>::type>
3924 inline void Communicator::broadcast (std::vector<T,A> & timpi_mpi_var(data),
3925  const unsigned int root_id,
3926  const bool timpi_mpi_var(identical_sizes)) const
3927 {
3928  ignore(root_id); // Only needed for MPI and/or dbg/devel
3929  if (this->size() == 1)
3930  {
3931  timpi_assert (!this->rank());
3932  timpi_assert (!root_id);
3933  return;
3934  }
3935 
3936 #ifdef TIMPI_HAVE_MPI
3937 
3938  timpi_assert_less (root_id, this->size());
3939  timpi_assert (this->verify(identical_sizes));
3940 
3941  TIMPI_LOG_SCOPE("broadcast()", "Parallel");
3942 
3943  std::size_t data_size = data.size();
3944 
3945  if (identical_sizes)
3946  timpi_assert(this->verify(data_size));
3947  else
3948  this->broadcast(data_size, root_id);
3949 
3950  data.resize(data_size);
3951 
3952  // and get the data from the remote processors.
3953  // Pass nullptr if our vector is empty.
3954  T * data_ptr = data.empty() ? nullptr : data.data();
3955 
3956  timpi_assert_less(root_id, this->size());
3957 
3958  timpi_call_mpi
3959  (TIMPI_BCAST(data_ptr, cast_int<CountType>(data.size()),
3960  StandardType<T>(data_ptr), root_id, this->get()));
3961 #endif
3962 }
3963 
3964 template <typename T, typename A,
3965  typename std::enable_if<Has_buffer_type<Packing<T>>::value, int>::type>
3966 inline void Communicator::broadcast (std::vector<T,A> & data,
3967  const unsigned int root_id,
3968  const bool identical_sizes) const
3969 {
3970  if (this->size() == 1)
3971  {
3972  timpi_assert (!this->rank());
3973  timpi_assert (!root_id);
3974  return;
3975  }
3976 
3977  timpi_assert_less (root_id, this->size());
3978  timpi_assert (this->verify(identical_sizes));
3979 
3980  TIMPI_LOG_SCOPE("broadcast()", "Parallel");
3981 
3982  std::size_t data_size = data.size();
3983 
3984  if (identical_sizes)
3985  timpi_assert(this->verify(data_size));
3986  else
3987  this->broadcast(data_size, root_id);
3988 
3989  data.resize(data_size);
3990 
3991  timpi_assert_less(root_id, this->size());
3992 
3993  this->broadcast_packed_range((void *)(nullptr),
3994  data.begin(),
3995  data.end(),
3996  (void *)(nullptr),
3997  data.begin(),
3998  root_id);
3999 }
4000 
4001 template <typename Map,
4002  typename std::enable_if<std::is_base_of<DataType, StandardType<typename Map::key_type>>::value &&
4003  std::is_base_of<DataType, StandardType<typename Map::mapped_type>>::value,
4004  int>::type>
4005 inline void Communicator::map_broadcast(Map & timpi_mpi_var(data),
4006  const unsigned int root_id,
4007  const bool timpi_mpi_var(identical_sizes)) const
4008 {
4009  ignore(root_id); // Only needed for MPI and/or dbg/devel
4010  if (this->size() == 1)
4011  {
4012  timpi_assert (!this->rank());
4013  timpi_assert (!root_id);
4014  return;
4015  }
4016 
4017 #ifdef TIMPI_HAVE_MPI
4018  timpi_assert_less (root_id, this->size());
4019  timpi_assert (this->verify(identical_sizes));
4020 
4021  TIMPI_LOG_SCOPE("broadcast(map)", "Parallel");
4022 
4023  std::size_t data_size=data.size();
4024  if (identical_sizes)
4025  timpi_assert(this->verify(data_size));
4026  else
4027  this->broadcast(data_size, root_id);
4028 
4029  std::vector<std::pair<typename Map::key_type,
4030  typename Map::mapped_type>> comm_data;
4031 
4032  if (root_id == this->rank())
4033  comm_data.assign(data.begin(), data.end());
4034  else
4035  comm_data.resize(data_size);
4036 
4037  this->broadcast(comm_data, root_id, true);
4038 
4039  if (this->rank() != root_id)
4040  {
4041  data.clear();
4042  data.insert(comm_data.begin(), comm_data.end());
4043  }
4044 #endif
4045 }
4046 
4047 template <typename Map,
4048  typename std::enable_if<!(std::is_base_of<DataType, StandardType<typename Map::key_type>>::value &&
4049  std::is_base_of<DataType, StandardType<typename Map::mapped_type>>::value),
4050  int>::type>
4051 inline void Communicator::map_broadcast(Map & timpi_mpi_var(data),
4052  const unsigned int root_id,
4053  const bool timpi_mpi_var(identical_sizes)) const
4054 {
4055  ignore(root_id); // Only needed for MPI and/or dbg/devel
4056  if (this->size() == 1)
4057  {
4058  timpi_assert (!this->rank());
4059  timpi_assert (!root_id);
4060  return;
4061  }
4062 
4063 #ifdef TIMPI_HAVE_MPI
4064  timpi_assert_less (root_id, this->size());
4065  timpi_assert (this->verify(identical_sizes));
4066 
4067  TIMPI_LOG_SCOPE("broadcast()", "Parallel");
4068 
4069  std::size_t data_size=data.size();
4070  if (identical_sizes)
4071  timpi_assert(this->verify(data_size));
4072  else
4073  this->broadcast(data_size, root_id);
4074 
4075  std::vector<typename Map::key_type> pair_first; pair_first.reserve(data_size);
4076  std::vector<typename Map::mapped_type> pair_second; pair_first.reserve(data_size);
4077 
4078  if (root_id == this->rank())
4079  {
4080  for (const auto & pr : data)
4081  {
4082  pair_first.push_back(pr.first);
4083  pair_second.push_back(pr.second);
4084  }
4085  }
4086  else
4087  {
4088  pair_first.resize(data_size);
4089  pair_second.resize(data_size);
4090  }
4091 
4092  this->broadcast
4093  (pair_first, root_id,
4095  this->broadcast
4096  (pair_second, root_id,
4098 
4099  timpi_assert(pair_first.size() == pair_first.size());
4100 
4101  if (this->rank() != root_id)
4102  {
4103  data.clear();
4104  for (std::size_t i=0; i<pair_first.size(); ++i)
4105  data[pair_first[i]] = pair_second[i];
4106  }
4107 #endif
4108 }
4109 
4110 template <typename T1, typename T2, typename C, typename A>
4111 inline void Communicator::broadcast(std::map<T1,T2,C,A> & data,
4112  const unsigned int root_id,
4113  const bool identical_sizes) const
4114 {
4115  this->map_broadcast(data, root_id, identical_sizes);
4116 }
4117 
4118 
4119 
4120 template <typename K, typename V, typename H, typename E, typename A>
4121 inline void Communicator::broadcast(std::unordered_map<K,V,H,E,A> & data,
4122  const unsigned int root_id,
4123  const bool identical_sizes) const
4124 {
4125  this->map_broadcast(data, root_id, identical_sizes);
4126 }
4127 
4128 template <typename Context, typename OutputContext,
4129  typename Iter, typename OutputIter>
4130 inline void Communicator::broadcast_packed_range(const Context * context1,
4131  Iter range_begin,
4132  const Iter range_end,
4133  OutputContext * context2,
4134  OutputIter out_iter,
4135  const unsigned int root_id,
4136  std::size_t approx_buffer_size) const
4137 {
4138  typedef typename std::iterator_traits<Iter>::value_type T;
4139  typedef typename Packing<T>::buffer_type buffer_t;
4140 
4141  if (this->size() == 1)
4142  {
4143  timpi_assert (!this->rank());
4144  timpi_assert (!root_id);
4145  return;
4146  }
4147 
4148  do
4149  {
4150  // We will serialize variable size objects from *range_begin to
4151  // *range_end as a sequence of ints in this buffer
4152  std::vector<buffer_t> buffer;
4153 
4154  if (this->rank() == root_id)
4155  range_begin = pack_range
4156  (context1, range_begin, range_end, buffer, approx_buffer_size);
4157 
4158  // this->broadcast(vector) requires the receiving vectors to
4159  // already be the appropriate size
4160  std::size_t buffer_size = buffer.size();
4161  this->broadcast (buffer_size, root_id);
4162 
4163  // We continue until there's nothing left to broadcast
4164  if (!buffer_size)
4165  break;
4166 
4167  buffer.resize(buffer_size);
4168 
4169  // Broadcast the packed data
4170  this->broadcast (buffer, root_id);
4171 
4172  // OutputIter might not have operator= implemented; for maximum
4173  // compatibility we'll rely on its copy constructor.
4174  std::unique_ptr<OutputIter> next_out_iter =
4175  std::make_unique<OutputIter>(out_iter);
4176 
4177  if (this->rank() != root_id)
4178  {
4179  auto return_out_iter = unpack_range
4180  (buffer, context2, *next_out_iter, (T*)nullptr);
4181  next_out_iter = std::make_unique<OutputIter>(return_out_iter);
4182  }
4183  } while (true); // break above when we reach buffer_size==0
4184 }
4185 
4186 
4187 template <typename Context, typename Iter, typename OutputIter>
4188 inline void Communicator::gather_packed_range(const unsigned int root_id,
4189  Context * context,
4190  Iter range_begin,
4191  const Iter range_end,
4192  OutputIter out_iter,
4193  std::size_t approx_buffer_size) const
4194 {
4195  typedef typename std::iterator_traits<Iter>::value_type T;
4196  typedef typename Packing<T>::buffer_type buffer_t;
4197 
4198  bool nonempty_range = (range_begin != range_end);
4199  this->max(nonempty_range);
4200 
4201  // OutputIter might not have operator= implemented; for maximum
4202  // compatibility we'll rely on its copy constructor.
4203  std::unique_ptr<OutputIter> next_out_iter =
4204  std::make_unique<OutputIter>(out_iter);
4205 
4206  while (nonempty_range)
4207  {
4208  // We will serialize variable size objects from *range_begin to
4209  // *range_end as a sequence of ints in this buffer
4210  std::vector<buffer_t> buffer;
4211 
4212  range_begin = pack_range
4213  (context, range_begin, range_end, buffer, approx_buffer_size);
4214 
4215  this->gather(root_id, buffer);
4216 
4217  auto return_out_iter = unpack_range
4218  (buffer, context, *next_out_iter, (T*)(nullptr));
4219  next_out_iter = std::make_unique<OutputIter>(return_out_iter);
4220 
4221  nonempty_range = (range_begin != range_end);
4222  this->max(nonempty_range);
4223  }
4224 }
4225 
4226 
4227 template <typename Context, typename Iter, typename OutputIter>
4228 inline void Communicator::allgather_packed_range(Context * context,
4229  Iter range_begin,
4230  const Iter range_end,
4231  OutputIter out_iter,
4232  std::size_t approx_buffer_size) const
4233 {
4234  typedef typename std::iterator_traits<Iter>::value_type T;
4235  typedef typename Packing<T>::buffer_type buffer_t;
4236 
4237  bool nonempty_range = (range_begin != range_end);
4238  this->max(nonempty_range);
4239 
4240  // OutputIter might not have operator= implemented; for maximum
4241  // compatibility we'll rely on its copy constructor.
4242  std::unique_ptr<OutputIter> next_out_iter =
4243  std::make_unique<OutputIter>(out_iter);
4244 
4245  while (nonempty_range)
4246  {
4247  // We will serialize variable size objects from *range_begin to
4248  // *range_end as a sequence of ints in this buffer
4249  std::vector<buffer_t> buffer;
4250 
4251  range_begin = pack_range
4252  (context, range_begin, range_end, buffer, approx_buffer_size);
4253 
4254  this->allgather(buffer, false);
4255 
4256  timpi_assert(buffer.size());
4257 
4258  auto return_out_iter = unpack_range
4259  (buffer, context, *next_out_iter, (T*)nullptr);
4260  next_out_iter = std::make_unique<OutputIter>(return_out_iter);
4261 
4262  nonempty_range = (range_begin != range_end);
4263  this->max(nonempty_range);
4264  }
4265 }
4266 
4267 
4268 
4269 template<typename T>
4270 inline Status Communicator::packed_range_probe (const unsigned int src_processor_id,
4271  const MessageTag & tag,
4272  bool & flag) const
4273 {
4274  TIMPI_LOG_SCOPE("packed_range_probe()", "Parallel");
4275 
4276  ignore(src_processor_id, tag); // unused in opt mode w/o MPI
4277 
4278  Status stat((StandardType<typename Packing<T>::buffer_type>()));
4279 
4280  int int_flag = 0;
4281 
4282  timpi_assert(src_processor_id < this->size() ||
4283  src_processor_id == any_source);
4284 
4285  timpi_call_mpi(MPI_Iprobe(int(src_processor_id),
4286  tag.value(),
4287  this->get(),
4288  &int_flag,
4289  stat.get()));
4290 
4291  flag = int_flag;
4292 
4293  return stat;
4294 }
4295 
4296 
4297 
4298 template <typename T, typename A,
4299  typename std::enable_if<std::is_base_of<DataType, StandardType<T>>::value, int>::type>
4300 inline bool Communicator::possibly_receive (unsigned int & src_processor_id,
4301  std::vector<T,A> & buf,
4302  Request & req,
4303  const MessageTag & tag) const
4304 {
4305  T * dataptr = buf.empty() ? nullptr : buf.data();
4306 
4307  return this->possibly_receive(src_processor_id, buf, StandardType<T>(dataptr), req, tag);
4308 }
4309 
4310 template <typename T, typename A,
4311  typename std::enable_if<Has_buffer_type<Packing<T>>::value, int>::type>
4312 inline bool Communicator::possibly_receive (unsigned int & src_processor_id,
4313  std::vector<T,A> & buf,
4314  Request & req,
4315  const MessageTag & tag) const
4316 {
4317  return this->possibly_receive_packed_range(src_processor_id,
4318  (void *)(nullptr),
4319  buf.begin(),
4320  (T *)(nullptr),
4321  req,
4322  tag);
4323 }
4324 
4325 
4326 
4327 template <typename T, typename A1, typename A2>
4328 inline bool Communicator::possibly_receive (unsigned int & src_processor_id,
4329  std::vector<std::vector<T,A1>,A2> & buf,
4330  Request & req,
4331  const MessageTag & tag) const
4332 {
4333  T * dataptr = buf.empty() ? nullptr : (buf[0].empty() ? nullptr : buf[0].data());
4334 
4335  return this->possibly_receive(src_processor_id, buf, StandardType<T>(dataptr), req, tag);
4336 }
4337 
4338 
4339 template <typename Context, typename OutputIter, typename T>
4340 inline bool Communicator::possibly_receive_packed_range (unsigned int & src_processor_id,
4341  Context * context,
4342  OutputIter out,
4343  const T * type,
4344  Request & req,
4345  const MessageTag & tag) const
4346 {
4347  TIMPI_LOG_SCOPE("possibly_receive_packed_range()", "Parallel");
4348 
4349  bool int_flag = 0;
4350 
4351  auto stat = packed_range_probe<T>(src_processor_id, tag, int_flag);
4352 
4353  if (int_flag)
4354  {
4355  src_processor_id = stat.source();
4356 
4357  nonblocking_receive_packed_range(src_processor_id,
4358  context,
4359  out,
4360  type,
4361  req,
4362  stat,
4363  tag);
4364 
4365  // The MessageTag should stay registered for the Request lifetime
4366  req.add_post_wait_work
4367  (new PostWaitDereferenceTag(tag));
4368  }
4369 
4370  timpi_assert(!int_flag || (int_flag &&
4371  src_processor_id < this->size() &&
4372  src_processor_id != any_source));
4373 
4374  return int_flag;
4375 }
4376 
4377 
4378 } // namespace TIMPI
4379 
4380 #endif // TIMPI_PARALLEL_IMPLEMENTATION_H
void gather_packed_range(const unsigned int root_id, Context *context, Iter range_begin, const Iter range_end, OutputIter out, std::size_t approx_buffer_size=1000000) const
Take a range of local variables, combine it with ranges from all processors, and write the output to ...
void add_prior_request(const Request &req)
Definition: request.C:190
data_type dataplusint_type< short int >()
MPI_Request request
Request object for non-blocking I/O.
Definition: request.h:41
void allgather(const T &send_data, std::vector< T, A > &recv_data) const
Take a vector of length this->size(), and fill in recv[processor_id] = the value of send on that proc...
void nonblocking_send_packed_range(const unsigned int dest_processor_id, const Context *context, Iter range_begin, const Iter range_end, Request &req, const MessageTag &tag=no_tag) const
Similar to the above Nonblocking send_packed_range with a few important differences: ...
void broadcast_packed_range(const Context *context1, Iter range_begin, const Iter range_end, OutputContext *context2, OutputIter out, const unsigned int root_id=0, std::size_t approx_buffer_size=1000000) const
Blocking-broadcast range-of-pointers to one processor.
OutputIter unpack_range(const std::vector< buffertype > &buffer, Context *context, OutputIter out_iter, const T *)
Helper function for range unpacking.
Definition: packing.h:1103
Status packed_range_probe(const unsigned int src_processor_id, const MessageTag &tag, bool &flag) const
Non-Blocking message probe for a packed range message.
void scatter(const std::vector< T, A > &data, T &recv, const unsigned int root_id=0) const
Take a vector of local variables and scatter the ith item to the ith processor in the communicator...
void map_max(Map &data) const
Private implementation function called by the map-based max() specializations.
void minloc(T &r, unsigned int &min_id) const
Take a local variable and replace it with the minimum of its values on all processors, returning the minimum rank of a processor which originally held the minimum value.
static const request null_request
Definition: request.h:111
Types combined with an int.
int value() const
Definition: message_tag.h:92
void gather(const unsigned int root_id, const T &send_data, std::vector< T, A > &recv) const
Take a vector of length comm.size(), and on processor root_id fill in recv[processor_id] = the value ...
const MessageTag any_tag
Default message tag ids.
Definition: message_tag.h:114
void sum(T &r) const
Take a local variable and replace it with the sum of its values on all processors.
void alltoall(std::vector< T, A > &r) const
Effectively transposes the input vector across all processors.
processor_id_type rank() const
Definition: communicator.h:208
Encapsulates the MPI_Datatype.
Definition: data_type.h:50
void allgather_packed_range(Context *context, Iter range_begin, const Iter range_end, OutputIter out, std::size_t approx_buffer_size=1000000) const
Take a range of local variables, combine it with ranges from all processors, and write the output to ...
void map_broadcast(Map &data, const unsigned int root_id, const bool identical_sizes) const
Private implementation function called by the map-based broadcast() specializations.
Templated class to provide the appropriate MPI datatype for use with built-in C types or simple C++ c...
Definition: standard_type.h:83
void assign(const communicator &comm)
Utility function for setting our member variables from an MPI communicator.
Definition: communicator.C:185
data_type dataplusint_type< float >()
void ignore(const Args &...)
Definition: timpi_assert.h:54
int source() const
Definition: status.h:142
Define data types and (un)serialization functions for use when encoding a potentially-variable-size o...
Definition: packing.h:60
data_type dataplusint_type()
Templated function to return the appropriate MPI datatype for use with built-in C types when combined...
void send_packed_range(const unsigned int dest_processor_id, const Context *context, Iter range_begin, const Iter range_end, const MessageTag &tag=no_tag, std::size_t approx_buffer_size=1000000) const
Blocking-send range-of-pointers to one processor.
void nonblocking_receive_packed_range(const unsigned int src_processor_id, Context *context, OutputIter out, const T *output_type, Request &req, Status &stat, const MessageTag &tag=any_tag) const
Non-Blocking-receive range-of-pointers from one processor.
const unsigned int any_source
Processor id meaning "Accept from any source".
Definition: communicator.h:84
bool possibly_receive(unsigned int &src_processor_id, std::vector< T, A > &buf, Request &req, const MessageTag &tag) const
Nonblocking-receive from one processor with user-defined type.
Encapsulates the MPI_Comm object.
Definition: communicator.h:108
const MessageTag no_tag
Definition: message_tag.h:119
processor_id_type size() const
Definition: communicator.h:211
std::size_t packed_size_of(const std::vector< std::vector< T, A1 >, A2 > &buf, const DataType &type) const
uint8_t processor_id_type
Definition: communicator.h:54
Status receive(const unsigned int dest_processor_id, T &buf, const MessageTag &tag=any_tag) const
Blocking-receive from one processor with data-defined type.
Encapsulates the MPI tag integers.
Definition: message_tag.h:46
void min(const T &r, T &o, Request &req) const
Non-blocking minimum of the local value r into o with the request req.
void receive_packed_range(const unsigned int dest_processor_id, Context *context, OutputIter out, const T *output_type, const MessageTag &tag=any_tag) const
Blocking-receive range-of-pointers from one processor.
status probe(const unsigned int src_processor_id, const MessageTag &tag=any_tag) const
Blocking message probe.
Definition: communicator.C:283
int tag() const
Definition: status.h:151
data_type dataplusint_type< int >()
MPI_Count CountType
Definition: status.h:47
Status wait()
Definition: request.C:121
Iter pack_range(const Context *context, Iter range_begin, const Iter range_end, std::vector< buffertype > &buffer, std::size_t approx_buffer_size)
Helper function for range packing.
Definition: packing.h:1044
static const bool has_min_max
Definition: attributes.h:48
void maxloc(T &r, unsigned int &max_id) const
Take a local variable and replace it with the maximum of its values on all processors, returning the minimum rank of a processor which originally held the maximum value.
void send_receive(const unsigned int dest_processor_id, const T1 &send_data, const unsigned int source_processor_id, T2 &recv_data, const MessageTag &send_tag=no_tag, const MessageTag &recv_tag=any_tag) const
Send data send to one processor while simultaneously receiving other data recv from a (potentially di...
bool possibly_receive_packed_range(unsigned int &src_processor_id, Context *context, OutputIter out, const T *output_type, Request &req, const MessageTag &tag) const
Nonblocking packed range receive from one processor with user-defined type.
timpi_pure bool semiverify(const T *r) const
Check whether a local pointer points to the same value on all processors where it is not null...
static const bool is_fixed_type
Definition: data_type.h:130
data_type dataplusint_type< long >()
void broadcast(T &data, const unsigned int root_id=0, const bool identical_sizes=false) const
Take a local value and broadcast it to all processors.
void add_post_wait_work(PostWaitWork *work)
Definition: request.C:204
StandardType<T>'s which do not define a way to MPI_Type T should inherit from this class...
Definition: data_type.h:120
data_type dataplusint_type< long double >()
CountType size(const data_type &type) const
Definition: status.h:161
void map_sum(Map &data) const
Private implementation function called by the map-based sum() specializations.
Encapsulates the MPI_Request.
Definition: request.h:67
timpi_pure bool verify(const T &r) const
Check whether a local variable has the same value on all processors, returning true if it does or fal...
void max(const T &r, T &o, Request &req) const
Non-blocking maximum of the local value r into o with the request req.
void send(const unsigned int dest_processor_id, const T &buf, const MessageTag &tag=no_tag) const
Blocking-send to one processor with data-defined type.
request * get()
Definition: request.h:84
data_type dataplusint_type< double >()
SendMode send_mode() const
Gets the user-requested SendMode.
Definition: communicator.h:338
void send_receive_packed_range(const unsigned int dest_processor_id, const Context1 *context1, RangeIter send_begin, const RangeIter send_end, const unsigned int source_processor_id, Context2 *context2, OutputIter out, const T *output_type, const MessageTag &send_tag=no_tag, const MessageTag &recv_tag=any_tag, std::size_t approx_buffer_size=1000000) const
Send a range-of-pointers to one processor while simultaneously receiving another range from a (potent...
Encapsulates the MPI_Status struct.
Definition: status.h:75
std::size_t packed_range_size(const Context *context, Iter range_begin, const Iter range_end)
Helper function for range packing.
Definition: packing.h:1023
std::pair< data_type, std::unique_ptr< StandardType< std::pair< T, int > > > > dataplusint_type_acquire()
status * get()
Definition: status.h:95
void set_union(T &data, const unsigned int root_id) const
Take a container (set, map, unordered_set, multimap, etc) of local variables on each processor...
Templated class to provide the appropriate MPI reduction operations for use with built-in C types or ...
Definition: op_function.h:120