TIMPI
parallel_sync.h
Go to the documentation of this file.
1 // The TIMPI Message-Passing Parallelism Library.
2 // Copyright (C) 2002-2025 Benjamin S. Kirk, John W. Peterson, Roy H. Stogner
3 
4 // This library is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU Lesser General Public
6 // License as published by the Free Software Foundation; either
7 // version 2.1 of the License, or (at your option) any later version.
8 
9 // This library is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 // Lesser General Public License for more details.
13 
14 // You should have received a copy of the GNU Lesser General Public
15 // License along with this library; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 
18 
19 
20 #ifndef TIMPI_PARALLEL_SYNC_H
21 #define TIMPI_PARALLEL_SYNC_H
22 
23 // Local Includes
25 
// C++ includes
#include <algorithm> // max
#include <iterator> // inserter
#include <list>
#include <map> // map, multimap, pair
#include <sstream> // stringstream, used by detail::empty_send_assertion
#include <type_traits> // remove_reference, remove_const
#include <utility> // move
#include <vector>
34 
35 
36 namespace TIMPI {
37 
38 //------------------------------------------------------------------------
68 template <typename MapToVectors,
70  typename ActionFunctor,
71  typename std::enable_if< std::is_base_of<DataType, StandardType<
72  typename InnermostType<typename std::remove_const<
73  typename std::remove_reference<MapToVectors>::type::mapped_type::value_type
74  >::type>::type>>::value, int>::type = 0>
75 void push_parallel_vector_data(const Communicator & comm,
76  MapToVectors && data,
77  const ActionFunctor & act_on_data);
79 
80 //------------------------------------------------------------------------
88 template <typename MapToVectors,
90  typename ActionFunctor,
91  typename std::enable_if<Has_buffer_type<Packing<
92  typename InnermostType<typename std::remove_const<
93  typename std::remove_reference<MapToVectors>::type::mapped_type::value_type
94  >::type>::type>>::value, int>::type = 0>
95 void push_parallel_vector_data(const Communicator & comm,
96  MapToVectors && data,
97  const ActionFunctor & act_on_data);
99 
135 template <typename datum,
136  typename MapToVectors,
137  typename GatherFunctor,
138  typename ActionFunctor>
139 void pull_parallel_vector_data(const Communicator & comm,
140  const MapToVectors & queries,
141  GatherFunctor & gather_data,
142  const ActionFunctor & act_on_data,
143  const datum * example);
144 
175 template <typename MapToVectors,
177  typename ActionFunctor,
178  typename Context>
179 void push_parallel_packed_range(const Communicator & comm,
180  MapToVectors && data,
181  Context * context,
182  const ActionFunctor & act_on_data);
184 
185 //------------------------------------------------------------------------
186 // Parallel function overloads
187 //
188 
189 /*
190  * A specialization for types that are harder to non-blocking receive.
191  */
192 template <typename datum,
193  typename A,
194  typename MapToVectors,
195  typename GatherFunctor,
196  typename ActionFunctor>
197 void pull_parallel_vector_data(const Communicator & comm,
198  const MapToVectors & queries,
199  GatherFunctor & gather_data,
200  ActionFunctor & act_on_data,
201  const std::vector<datum,A> * example);
202 
203 
204 
205 
206 
207 
208 //------------------------------------------------------------------------
209 // Parallel members
210 //
211 
212 // Separate namespace for not-for-public-use helper functions
213 namespace detail {
214 
215 #ifndef NDEBUG
216 inline
217 void
219  processor_id_type empty_target_pid)
220 {
221  bool someone_found_empty_send = (empty_target_pid != processor_id_type(-1));
222  comm.max(someone_found_empty_send);
223  std::stringstream err_msg;
224  if (empty_target_pid != processor_id_type(-1))
225  err_msg << " [" << comm.rank() << "] sent an empty to [" <<
226  empty_target_pid << "]";
227  timpi_assert_msg(!someone_found_empty_send,
228  "Some rank(s) sent empty data!" + err_msg.str());
229 }
230 #endif
231 
232 template <typename MapToContainers,
233  typename SendFunctor,
234  typename PossiblyReceiveFunctor,
235  typename ActionFunctor>
236 void
238  MapToContainers && data,
239  const SendFunctor & send_functor,
240  const PossiblyReceiveFunctor & possibly_receive_functor,
241  const ActionFunctor & act_on_data)
242 {
243  typedef typename std::remove_reference<MapToContainers>::type::value_type::second_type
244  container_type;
245 
246  // This function must be run on all processors at once
247  timpi_parallel_only(comm);
248 
249  // This function implements the "NBX" algorithm from
250  // https://htor.inf.ethz.ch/publications/img/hoefler-dsde-protocols.pdf
251 
252  // We'll grab a tag so we can overlap request sends and receives
253  // without confusing one for the other
254  const auto tag = comm.get_unique_tag();
255 
256  // Save off the old send_mode so we can restore it after this
257  const auto old_send_mode = comm.send_mode();
258 
259  // Set the sending to synchronous - this is so that we can know when
260  // the sends are complete
261  const_cast<Communicator &>(comm).send_mode(Communicator::SYNCHRONOUS);
262 
263  // The send requests
264  std::list<Request> send_requests;
265 
266  const processor_id_type num_procs = comm.size();
267 
268  // Don't give us empty vectors to send. We'll yell at you (after
269  // the sync is done, so all ranks get out of it and we can throw an
270  // assertion failure everywhere) if we're debugging and we catch
271  // one.
272  //
273  // But in opt mode we'll just skip empty vectors.
274 #ifndef NDEBUG
275  processor_id_type empty_target_pid = processor_id_type(-1);
276 #endif
277 
278  for (auto & datapair : data)
279  {
280  // In the case of data partitioned into more processors than we
281  // have ranks, we "wrap around"
282  processor_id_type dest_pid = datapair.first % num_procs;
283  auto & datum = datapair.second;
284 
285  if (datum.empty())
286  {
287 #ifndef NDEBUG
288  empty_target_pid = dest_pid;
289 #endif
290  continue;
291  }
292 
293  // Just act on data if the user requested a send-to-self
294  if (dest_pid == comm.rank())
295  act_on_data(dest_pid, std::move(datum));
296  else
297  {
298  send_requests.emplace_back();
299  send_functor(dest_pid, datum, send_requests.back(), tag);
300  }
301  }
302 
303  // In serial we've now acted on all our data.
304  if (num_procs == 1)
305  return;
306 
307  // Whether or not the nonblocking barrier has started
308  bool started_barrier = false;
309  // Request for the nonblocking barrier
310  Request barrier_request;
311 
312  struct IncomingInfo
313  {
314  unsigned int src_pid = any_source;
316  container_type data;
317  };
318 
319  // Storage for the incoming requests and data
320  // The last entry in this list will _always_ be an invalid entry
321  // that is available for use for processing the next incoming
322  // request. That is, its size will always be >= 1
323  std::list<IncomingInfo> incoming;
324  incoming.emplace_back(); // add the first invalid entry for receives
325 
326  // Helper for checking and processing receives if there are any; we
327  // need to check this in multiple places
328  auto possibly_receive = [&incoming, &tag, &possibly_receive_functor]() {
329  auto & next_incoming = incoming.back();
330  timpi_assert_equal_to(next_incoming.src_pid, any_source);
331  if (possibly_receive_functor(next_incoming.src_pid,
332  next_incoming.data,
333  next_incoming.request, tag))
334  {
335  timpi_assert(next_incoming.src_pid != any_source);
336 
337  // Insert another entry so that the next poll has something
338  // to fill into if needed
339  incoming.emplace_back();
340 
341  // We received something
342  return true;
343  }
344 
345  // We didn't receive anything
346  return false;
347  };
348 
349  // Keep looking for receives
350  while (true)
351  {
352  timpi_assert(incoming.size() > 0);
353 
354  // Check if there is a message and start receiving it
355  possibly_receive();
356 
357  // Work through the incoming requests and act on them if they're ready
358  incoming.remove_if
359  ([&act_on_data
360 #ifndef NDEBUG
361  ,&incoming
362 #endif
363  ](IncomingInfo & info)
364  {
365  // The last entry (marked by an invalid src pid) should be skipped;
366  // it needs to remain in the list for potential filling in the next poll
367  const bool is_invalid_entry = info.src_pid == any_source;
368  timpi_assert_equal_to(is_invalid_entry, &info == &incoming.back());
369 
370  if (is_invalid_entry)
371  return false;
372 
373  // If it's finished - let's act on it
374  if (info.request.test())
375  {
376  // Do any post-wait work
377  info.request.wait();
378 
379  // Act on the data
380  act_on_data(info.src_pid, std::move(info.data));
381 
382  // This removes it from the list
383  return true;
384  }
385 
386  // Not finished yet
387  return false;
388  });
389 
390  // Remove any sends that have completed in user space
391  send_requests.remove_if
392  ([](Request & req)
393  {
394  if (req.test())
395  {
396  // Do Post-Wait work
397  req.wait();
398  return true;
399  }
400 
401  // Not finished yet
402  return false;
403  });
404 
405  // If all of the sends are complete, we can start the barrier.
406  // We strongly believe that the MPI standard guarantees
407  // if a synchronous send is marked as completed, then there
408  // is a corresponding user-posted request for said send.
409  // Therefore, send_requests being empty is enough
410  // to state that our sends are done and everyone that we have
411  // sent data is expecting it. Double therefore, this condition
412  // being satisified on all processors in addition to all
413  // receive requests being complete is a sufficient stopping
414  // criteria
415  if (send_requests.empty() && !started_barrier)
416  {
417  started_barrier = true;
418  comm.nonblocking_barrier(barrier_request);
419  }
420 
421  // There is no data to act on (we reserve a single value in
422  // \p incoming for filling within the next poll loop)
423  if (incoming.size() == 1)
424  // We've started the barrier
425  if (started_barrier)
426  // The barrier is complete (everyone is done)
427  if (barrier_request.test())
428  // Profit
429  break;
430  }
431 
432  // There better not be anything left at this point
433  timpi_assert(!possibly_receive());
434 
435  // Reset the send mode
436  const_cast<Communicator &>(comm).send_mode(old_send_mode);
437 
438  // So, *did* we see any empty containers being sent?
439 #ifndef NDEBUG
440  empty_send_assertion(comm, empty_target_pid);
441 #endif // NDEBUG
442 }
443 
444 template <typename MapToContainers,
445  typename SendFunctor,
446  typename ReceiveFunctor,
447  typename ActionFunctor>
448 void
450  MapToContainers && data,
451  const SendFunctor & send_functor,
452  const ReceiveFunctor & receive_functor,
453  const ActionFunctor & act_on_data)
454 {
455  typedef typename std::remove_reference<MapToContainers>::type::value_type::second_type
456  container_type;
457 
458  // This function must be run on all processors at once
459  timpi_parallel_only(comm);
460 
461  // This function implements a simpler asynchronous protocol than
462  // NBX. Every processor will know exactly how many receives to
463  // post.
464 
465  processor_id_type num_procs = comm.size();
466 
467  // Don't give us empty vectors to send. We'll yell at you (after
468  // the sync is done, so all ranks get out of it and we can throw an
469  // assertion failure everywhere) if we're debugging and we catch
470  // one.
471  //
472  // But in opt mode we'll just skip empty vectors.
473 #ifndef NDEBUG
474  processor_id_type empty_target_pid = processor_id_type(-1);
475 #endif
476 
477  // Number of vectors to send to each procesor
478  std::vector<std::size_t> will_send_to(num_procs, 0);
479  for (auto & datapair : data)
480  {
481  // In the case of data partitioned into more processors than we
482  // have ranks, we "wrap around"
483  processor_id_type dest_pid = datapair.first % num_procs;
484 
485  // But in opt mode we'll just try to stay consistent with what
486  // we can do in the other backends
487  if (datapair.second.empty())
488  {
489 #ifndef NDEBUG
490  empty_target_pid = dest_pid;
491 #endif
492  continue;
493  }
494 
495  will_send_to[dest_pid]++;
496  }
497 
498  // Tell everyone about where everyone will send to
499  comm.alltoall(will_send_to);
500 
501  // will_send_to now represents how many vectors we'll receive from
502  // each processor; give it a better name.
503  auto & will_receive_from = will_send_to;
504 
505  processor_id_type n_receives = 0;
506  for (processor_id_type proc_id = 0; proc_id < num_procs; proc_id++)
507  n_receives += will_receive_from[proc_id];
508 
509  // We'll grab a tag so we can overlap request sends and receives
510  // without confusing one for the other
511  MessageTag tag = comm.get_unique_tag();
512 
513  // The send requests
514  std::list<Request> requests;
515 
516  // Post all of the non-empty sends, non-blocking
517  for (auto & datapair : data)
518  {
519  processor_id_type destid = datapair.first % num_procs;
520  auto & datum = datapair.second;
521 
522  if (datum.empty())
523  continue;
524 
525  // Just act on data if the user requested a send-to-self
526  if (destid == comm.rank())
527  {
528  act_on_data(destid, std::move(datum));
529  n_receives--;
530  }
531  else
532  {
533  requests.emplace_back();
534  send_functor(destid, datum, requests.back(), tag);
535  }
536  }
537 
538  // In serial we've now acted on all our data.
539  if (num_procs == 1)
540  return;
541 
542  // Post all of the receives.
543  for (processor_id_type i = 0; i != n_receives; ++i)
544  {
545  Status stat(comm.probe(any_source, tag));
546  const processor_id_type
547  proc_id = cast_int<processor_id_type>(stat.source());
548 
549  container_type received_data;
550  receive_functor(proc_id, received_data, tag);
551  act_on_data(proc_id, std::move(received_data));
552  }
553 
554  // Wait on all the sends to complete
555  for (auto & req : requests)
556  req.wait();
557 
558  // So, *did* we see any empty containers being sent?
559 #ifndef NDEBUG
560  empty_send_assertion(comm, empty_target_pid);
561 #endif // NDEBUG
562 }
563 
564 template <typename MapToContainers,
565  typename SendReceiveFunctor,
566  typename ActionFunctor>
567 void
569  MapToContainers && data,
570  const SendReceiveFunctor & sendreceive_functor,
571  const ActionFunctor & act_on_data)
572 {
573  typedef typename std::remove_reference<MapToContainers>::type::value_type::second_type
574  container_type;
575 
576  // This function must be run on all processors at once
577  timpi_parallel_only(comm);
578 
579  // This function implements the simplest protocol possible, fully
580  // synchronous. Every processor talks to every other. Only use this for
581  // debugging, and only when you're desperate.
582 
583  unsigned int num_procs = comm.size();
584 
585  // Don't give us empty vectors to send. We'll yell at you (after
586  // the sync is done, so all ranks get out of it and we can throw an
587  // assertion failure everywhere) if we're debugging and we catch
588  // one.
589  //
590  // But in opt mode we'll just skip empty vectors.
591 #ifndef NDEBUG
592  processor_id_type empty_target_pid = processor_id_type(-1);
593 #endif
594 
595  // Do multiple exchanges if we have an oversized data map
596  processor_id_type n_exchanges = 1;
597  for (auto & datapair : data)
598  {
599  const unsigned int dest_pid = datapair.first;
600  n_exchanges = std::max(n_exchanges, dest_pid/num_procs+1);
601 
602  if (datapair.second.empty())
603  {
604 #ifndef NDEBUG
605  empty_target_pid = dest_pid;
606 #endif
607  continue;
608  }
609  }
610 
611  comm.max(n_exchanges);
612 
613  // We'll grab a tag so responses and queries won't be confused when
614  // this is used within a pull
615  auto tag = comm.get_unique_tag();
616 
617  // Do the send_receives, blocking
618  for (processor_id_type e=0; e != n_exchanges; ++e)
619  for (processor_id_type p=0; p != num_procs; ++p)
620  {
621  const processor_id_type procup =
622  cast_int<processor_id_type>((comm.rank() + p) %
623  num_procs);
624  const processor_id_type procdown =
625  cast_int<processor_id_type>((comm.rank() + num_procs - p) %
626  num_procs);
627 
628  container_type empty_container;
629  auto data_it = data.find(procup + e*num_procs);
630  auto * const data_to_send =
631  (data_it == data.end()) ?
632  &empty_container : &data_it->second;
633 
634  container_type received_data;
635  sendreceive_functor(procup, *data_to_send,
636  procdown, received_data, tag);
637 
638  // Empty containers aren't *real* data, they're an artifact of
639  // doing send_receive with everyone. Just skip them.
640  if (!received_data.empty())
641  act_on_data(procdown, std::move(received_data));
642  }
643 
644  // So, *did* we see any empty containers being sent?
645 #ifndef NDEBUG
646  empty_send_assertion(comm, empty_target_pid);
647 #endif // NDEBUG
648 }
649 
650 
651 } // namespace detail
652 
653 
654 
655 template <typename MapToContainers,
656  typename ActionFunctor,
657  typename Context>
659  MapToContainers && data,
660  Context * context,
661  const ActionFunctor & act_on_data)
662 {
663  typedef typename std::remove_reference<MapToContainers>::type::mapped_type container_type;
664  typedef typename container_type::value_type nonref_type;
665  typename std::remove_const<nonref_type>::type * output_type = nullptr;
666 
667  switch (comm.sync_type()) {
668  case Communicator::NBX:
669  {
670  auto send_functor = [&context, &comm](const processor_id_type dest_pid,
671  const container_type & datum,
672  Request & send_request,
673  const MessageTag tag) {
674  comm.nonblocking_send_packed_range(dest_pid, context, datum.begin(), datum.end(), send_request, tag);
675  };
676 
677  auto possibly_receive_functor = [&context, &output_type, &comm](unsigned int & current_src_proc,
678  container_type & current_incoming_data,
679  Request & current_request,
680  const MessageTag tag) {
681  return comm.possibly_receive_packed_range(
682  current_src_proc,
683  context,
684  std::inserter(current_incoming_data, current_incoming_data.end()),
685  output_type,
686  current_request,
687  tag);
688  };
689 
691  (comm, data, send_functor, possibly_receive_functor, act_on_data);
692  }
693  break;
695  {
696  auto send_functor = [&context, &comm](const processor_id_type dest_pid,
697  const container_type & datum,
698  Request & send_request,
699  const MessageTag tag) {
700  comm.nonblocking_send_packed_range(dest_pid, context, datum.begin(), datum.end(), send_request, tag);
701  };
702 
703  auto receive_functor = [&context, &output_type, &comm](unsigned int current_src_proc,
704  container_type & current_incoming_data,
705  const MessageTag tag) {
706  bool flag = false;
707  Status stat(comm.packed_range_probe<container_type>(current_src_proc, tag, flag));
708  timpi_assert(flag);
709 
710  Request req;
711  comm.nonblocking_receive_packed_range(current_src_proc, context,
712  std::inserter(current_incoming_data, current_incoming_data.end()),
713  output_type, req, stat, tag);
714  req.wait();
715  };
716 
718  (comm, data, send_functor, receive_functor, act_on_data);
719  }
720  break;
722  {
723  auto sendreceive_functor = [&context, &output_type, &comm]
724  (const processor_id_type dest_pid,
725  const container_type & data_to_send,
726  const processor_id_type src_pid,
727  container_type & received_data,
728  const MessageTag tag) {
729  comm.send_receive_packed_range(dest_pid, context,
730  data_to_send.begin(),
731  data_to_send.end(), src_pid,
732  context,
733  std::inserter(received_data,
734  received_data.end()),
735  output_type, tag, tag);
736  };
737 
739  (comm, data, sendreceive_functor, act_on_data);
740  }
741  break;
742  default:
743  timpi_error_msg("Invalid sync_type setting " << comm.sync_type());
744  }
745 
746 }
747 
748 
749 template <typename MapToVectors,
750  typename ActionFunctor,
751  typename std::enable_if< std::is_base_of<DataType, StandardType<
752  typename InnermostType<typename std::remove_const<
753  typename std::remove_reference<MapToVectors>::type::mapped_type::value_type
754  >::type>::type>>::value, int>::type>
756  MapToVectors && data,
757  const ActionFunctor & act_on_data)
758 {
759  typedef typename std::remove_reference<MapToVectors>::type::mapped_type container_type;
760  typedef typename container_type::value_type nonref_type;
761  typedef typename std::remove_const<nonref_type>::type nonconst_nonref_type;
762 
763  // We'll construct the StandardType once rather than inside a loop.
764  // We can't pass in example data here, because we might have
765  // data.empty() on some ranks, so we'll need StandardType to be able
766  // to construct the user's data type without an example.
767  auto type = build_standard_type(static_cast<nonconst_nonref_type *>(nullptr));
768 
769  switch (comm.sync_type()) {
770  case Communicator::NBX:
771  {
772  auto send_functor = [&type, &comm](const processor_id_type dest_pid,
773  const container_type & datum,
774  Request & send_request,
775  const MessageTag tag) {
776  comm.send(dest_pid, datum, type, send_request, tag);
777  };
778 
779  auto possibly_receive_functor = [&type, &comm](unsigned int & current_src_proc,
780  container_type & current_incoming_data,
781  Request & current_request,
782  const MessageTag tag) {
783  return comm.possibly_receive(
784  current_src_proc, current_incoming_data, type, current_request, tag);
785  };
786 
788  (comm, data, send_functor, possibly_receive_functor, act_on_data);
789  }
790  break;
792  {
793 #ifdef TIMPI_HAVE_MPI // We should never hit these functors in serial
794  auto send_functor = [&type, &comm](const processor_id_type dest_pid,
795  const container_type & datum,
796  Request & send_request,
797  const MessageTag tag) {
798  comm.send(dest_pid, datum, type, send_request, tag);
799  };
800 
801  auto receive_functor = [&type, &comm](unsigned int current_src_proc,
802  container_type & current_incoming_data,
803  const MessageTag tag) {
804  comm.receive(current_src_proc, current_incoming_data, type, tag);
805  };
806 #else
807  auto send_functor = [](const processor_id_type,
808  const container_type &,
809  Request &,
810  const MessageTag) {
811  timpi_error(); // We should never hit these in serial
812  };
813 
814  auto receive_functor = [](unsigned int,
815  container_type &,
816  const MessageTag) {
817  timpi_error();
818  };
819 #endif
820 
822  (comm, data, send_functor, receive_functor, act_on_data);
823  }
824  break;
826  {
827  auto sendreceive_functor = [&comm](const processor_id_type dest_pid,
828  const container_type & data_to_send,
829  const processor_id_type src_pid,
830  container_type & received_data,
831  const MessageTag tag) {
832  comm.send_receive(dest_pid, data_to_send,
833  src_pid, received_data, tag, tag);
834  };
835 
837  (comm, data, sendreceive_functor, act_on_data);
838  }
839  break;
840  default:
841  timpi_error_msg("Invalid sync_type setting " << comm.sync_type());
842  }
843 }
844 
845 
846 
847 template <typename MapToVectors,
848  typename ActionFunctor,
849  typename std::enable_if<Has_buffer_type<Packing<
850  typename InnermostType<typename std::remove_const<
851  typename std::remove_reference<MapToVectors>::type::mapped_type::value_type
852  >::type>::type>>::value, int>::type>
853 void push_parallel_vector_data(const Communicator & comm,
854  MapToVectors && data,
855  const ActionFunctor & act_on_data)
856 {
857  void * context = nullptr;
858  push_parallel_packed_range(comm, data, context, act_on_data);
859 }
860 
861 
862 template <typename datum,
863  typename MapToVectors,
864  typename GatherFunctor,
865  typename ActionFunctor>
867  const MapToVectors & queries,
868  GatherFunctor & gather_data,
869  ActionFunctor & act_on_data,
870  const datum *)
871 {
872  typedef typename MapToVectors::mapped_type query_type;
873 
874  std::multimap<processor_id_type, std::vector<datum> >
875  response_data;
876 
877 #ifndef NDEBUG
878  processor_id_type max_pid = 0;
879  for (auto p : queries)
880  max_pid = std::max(max_pid, p.first);
881 
882  // Our SENDRECEIVE implementation doesn't preserve ordering, but we
883  // need ordering preserved for the multimap trick here to work.
884  if (comm.sync_type() == Communicator::SENDRECEIVE &&
885  max_pid > comm.size())
886  timpi_not_implemented();
887 #endif
888 
889  auto gather_functor =
890  [&gather_data, &response_data]
891  (processor_id_type pid, query_type query)
892  {
893  auto new_data_it =
894  response_data.emplace(pid, std::vector<datum>());
895  gather_data(pid, query, new_data_it->second);
896  timpi_assert_equal_to(query.size(), new_data_it->second.size());
897  };
898 
899  push_parallel_vector_data (comm, queries, gather_functor);
900 
901  std::map<processor_id_type, unsigned int> responses_acted_on;
902 
903  const processor_id_type num_procs = comm.size();
904 
905  auto action_functor =
906  [&act_on_data, &queries, &responses_acted_on,
907 #ifndef NDEBUG
908  max_pid,
909 #endif
910  num_procs
911  ]
912  (processor_id_type pid, const std::vector<datum> & data)
913  {
914  // We rely on responses coming in the same order as queries
915  const unsigned int nth_query = responses_acted_on[pid]++;
916 
917  auto q_pid_its = queries.equal_range(pid);
918  auto query_it = q_pid_its.first;
919 
920  // In an oversized pull we might not have any queries addressed
921  // to the *base* pid, but only to pid+N*num_procs for some N>1
922  // timpi_assert(query_it != q_pid_its.second);
923  while (query_it == q_pid_its.second)
924  {
925  pid += num_procs;
926  q_pid_its = queries.equal_range(pid);
927  timpi_assert_less_equal(pid, max_pid);
928  query_it = q_pid_its.first;
929  }
930 
931  for (unsigned int i=0; i != nth_query; ++i)
932  {
933  query_it++;
934  if (query_it == q_pid_its.second)
935  {
936  do
937  {
938  pid += num_procs;
939  q_pid_its = queries.equal_range(pid);
940  timpi_assert_less_equal(pid, max_pid);
941  } while (q_pid_its.first == q_pid_its.second);
942  query_it = q_pid_its.first;
943  }
944  }
945 
946  act_on_data(pid, query_it->second, data);
947  };
948 
949  push_parallel_vector_data (comm, response_data, action_functor);
950 }
951 
952 
953 
954 
955 template <typename datum,
956  typename A,
957  typename MapToVectors,
958  typename GatherFunctor,
959  typename ActionFunctor>
961  const MapToVectors & queries,
962  GatherFunctor & gather_data,
963  ActionFunctor & act_on_data,
964  const std::vector<datum,A> *)
965 {
966  typedef typename MapToVectors::mapped_type query_type;
967 
968  // First index: order of creation, irrelevant
969  std::vector<std::vector<std::vector<datum,A>>> response_data;
970  std::vector<Request> response_requests;
971 
972  // We'll grab a tag so we can overlap request sends and receives
973  // without confusing one for the other
974  MessageTag tag = comm.get_unique_tag();
975 
976  auto gather_functor =
977  [&comm, &gather_data, &act_on_data,
978  &response_data, &response_requests, &tag]
979  (processor_id_type pid, query_type query)
980  {
981  std::vector<std::vector<datum,A>> response;
982  gather_data(pid, query, response);
983  timpi_assert_equal_to(query.size(),
984  response.size());
985 
986  // Just act on data if the user requested a send-to-self
987  if (pid == comm.rank())
988  {
989  act_on_data(pid, query, response);
990  }
991  else
992  {
993  Request sendreq;
994  comm.send(pid, response, sendreq, tag);
995  response_requests.push_back(sendreq);
996  response_data.push_back(std::move(response));
997  }
998  };
999 
1000  push_parallel_vector_data (comm, queries, gather_functor);
1001 
1002  // Every outgoing query should now have an incoming response.
1003  //
1004  // Post all of the receives.
1005  //
1006  // Use blocking API here since we can't use the pre-sized
1007  // non-blocking APIs with this data type.
1008  //
1009  // FIXME - implement Derek's API from #1684, switch to that!
1010  std::vector<Request> receive_requests;
1011  std::vector<processor_id_type> receive_procids;
1012  for (std::size_t i = 0,
1013  n_queries = queries.size() - queries.count(comm.rank());
1014  i != n_queries; ++i)
1015  {
1016  Status stat(comm.probe(any_source, tag));
1017  const processor_id_type
1018  proc_id = cast_int<processor_id_type>(stat.source());
1019 
1020  std::vector<std::vector<datum,A>> received_data;
1021  comm.receive(proc_id, received_data, tag);
1022 
1023  timpi_assert(queries.count(proc_id));
1024  auto & querydata = queries.at(proc_id);
1025  timpi_assert_equal_to(querydata.size(), received_data.size());
1026  act_on_data(proc_id, querydata, received_data);
1027  }
1028 
1029  wait(response_requests);
1030 }
1031 
1032 } // namespace TIMPI
1033 
1034 #endif // TIMPI_PARALLEL_SYNC_H
void pull_parallel_vector_data(const Communicator &comm, const MapToVectors &queries, GatherFunctor &gather_data, const ActionFunctor &act_on_data, const datum *example)
Send query vectors, receive and answer them with vectors of data, then act on those answers...
MPI_Request request
Request object for non-blocking I/O.
Definition: request.h:41
void send_mode(const SendMode sm)
Explicitly sets the SendMode type used for send operations.
Definition: communicator.h:333
void nonblocking_send_packed_range(const unsigned int dest_processor_id, const Context *context, Iter range_begin, const Iter range_end, Request &req, const MessageTag &tag=no_tag) const
Similar to the above Nonblocking send_packed_range with a few important differences: ...
Status packed_range_probe(const unsigned int src_processor_id, const MessageTag &tag, bool &flag) const
Non-Blocking message probe for a packed range message.
void sync_type(const SyncType st)
Explicitly sets the SyncType used for sync operations.
Definition: communicator.h:343
MPI_Info info
Info object used by some MPI-3 methods.
Definition: communicator.h:79
MessageTag get_unique_tag(int tagvalue=MessageTag::invalid_tag) const
Get a tag that is unique to this Communicator.
Definition: communicator.C:251
void alltoall(std::vector< T, A > &r) const
Effectively transposes the input vector across all processors.
void push_parallel_packed_range(const Communicator &comm, MapToVectors &&data, Context *context, const ActionFunctor &act_on_data)
Send and receive and act on vectors of data.
void push_parallel_roundrobin_helper(const Communicator &comm, MapToContainers &&data, const SendReceiveFunctor &sendreceive_functor, const ActionFunctor &act_on_data)
processor_id_type rank() const
Definition: communicator.h:208
Encapsulates the MPI_Datatype.
Definition: data_type.h:50
Templated class to provide the appropriate MPI datatype for use with built-in C types or simple C++ c...
Definition: standard_type.h:83
Define data types and (un)serialization functions for use when encoding a potentially-variable-size o...
Definition: packing.h:60
void nonblocking_receive_packed_range(const unsigned int src_processor_id, Context *context, OutputIter out, const T *output_type, Request &req, Status &stat, const MessageTag &tag=any_tag) const
Non-Blocking-receive range-of-pointers from one processor.
const unsigned int any_source
Processor id meaning "Accept from any source".
Definition: communicator.h:84
bool possibly_receive(unsigned int &src_processor_id, std::vector< T, A > &buf, Request &req, const MessageTag &tag) const
Nonblocking-receive from one processor with user-defined type.
StandardType< T > build_standard_type(const T *example=nullptr)
Encapsulates the MPI_Comm object.
Definition: communicator.h:108
processor_id_type size() const
Definition: communicator.h:211
void push_parallel_vector_data(const Communicator &comm, MapToVectors &&data, const ActionFunctor &act_on_data)
Send and receive and act on vectors of data.
uint8_t processor_id_type
Definition: communicator.h:54
Status receive(const unsigned int dest_processor_id, T &buf, const MessageTag &tag=any_tag) const
Blocking-receive from one processor with data-defined type.
void empty_send_assertion(const Communicator &comm, processor_id_type empty_target_pid)
Encapsulates the MPI tag integers.
Definition: message_tag.h:46
void push_parallel_alltoall_helper(const Communicator &comm, MapToContainers &&data, const SendFunctor &send_functor, const ReceiveFunctor &receive_functor, const ActionFunctor &act_on_data)
status probe(const unsigned int src_processor_id, const MessageTag &tag=any_tag) const
Blocking message probe.
Definition: communicator.C:283
void push_parallel_nbx_helper(const Communicator &comm, MapToContainers &&data, const SendFunctor &send_functor, const PossiblyReceiveFunctor &possibly_receive_functor, const ActionFunctor &act_on_data)
Status wait()
Definition: request.C:121
void send_receive(const unsigned int dest_processor_id, const T1 &send_data, const unsigned int source_processor_id, T2 &recv_data, const MessageTag &send_tag=no_tag, const MessageTag &recv_tag=any_tag) const
Send data send to one processor while simultaneously receiving other data recv from a (potentially di...
bool possibly_receive_packed_range(unsigned int &src_processor_id, Context *context, OutputIter out, const T *output_type, Request &req, const MessageTag &tag) const
Nonblocking packed range receive from one processor with user-defined type.
Encapsulates the MPI_Request.
Definition: request.h:67
void max(const T &r, T &o, Request &req) const
Non-blocking maximum of the local value r into o with the request req.
void send(const unsigned int dest_processor_id, const T &buf, const MessageTag &tag=no_tag) const
Blocking-send to one processor with data-defined type.
Status wait(Request &r)
Wait for a non-blocking send or receive to finish.
Definition: request.h:135
void send_receive_packed_range(const unsigned int dest_processor_id, const Context1 *context1, RangeIter send_begin, const RangeIter send_end, const unsigned int source_processor_id, Context2 *context2, OutputIter out, const T *output_type, const MessageTag &send_tag=no_tag, const MessageTag &recv_tag=any_tag, std::size_t approx_buffer_size=1000000) const
Send a range-of-pointers to one processor while simultaneously receiving another range from a (potent...
Encapsulates the MPI_Status struct.
Definition: status.h:75
void nonblocking_barrier(Request &req) const
Start a barrier that doesn&#39;t block.
Definition: communicator.C:238
bool test()
Definition: request.C:153