TIMPI
parallel_sync.h
Go to the documentation of this file.
1 // The TIMPI Message-Passing Parallelism Library.
2 // Copyright (C) 2002-2025 Benjamin S. Kirk, John W. Peterson, Roy H. Stogner
3 
4 // This library is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU Lesser General Public
6 // License as published by the Free Software Foundation; either
7 // version 2.1 of the License, or (at your option) any later version.
8 
9 // This library is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 // Lesser General Public License for more details.
13 
14 // You should have received a copy of the GNU Lesser General Public
15 // License along with this library; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 
18 
19 
20 #ifndef TIMPI_PARALLEL_SYNC_H
21 #define TIMPI_PARALLEL_SYNC_H
22 
23 // Local Includes
25 
26 // C++ includes
27 #include <algorithm> // max
28 #include <iterator> // inserter
29 #include <list>
30 #include <map> // map, multimap, pair
31 #include <type_traits> // remove_reference, remove_const
32 #include <utility> // move
33 #include <vector>
34 
35 
36 namespace TIMPI {
37 
38 //------------------------------------------------------------------------
// Forward declaration: push_parallel_vector_data() for containers whose
// innermost value type has a StandardType deriving from DataType.
// This overload is selected by the enable_if below.
//
// The definition later in this header:
// - Dispatches on comm.sync_type() to the NBX, alltoall-counts or
//   round-robin helper.
// - Sends each non-empty container in data to rank (key % comm.size()).
// - Calls act_on_data(source_pid, container) on every container received,
//   including containers "sent" to this rank, which are acted on locally.
// - Never sends empty containers; in debug builds they trigger a
//   collective assertion.
//
// NOTE(review): this listing skips source lines 69 and 78; verify against
// the real header.
68 template <typename MapToVectors,
70  typename ActionFunctor,
71  typename std::enable_if< std::is_base_of<DataType, StandardType<
72  typename InnermostType<typename std::remove_const<
73  typename std::remove_reference<MapToVectors>::type::mapped_type::value_type
74  >::type>::type>>::value, int>::type = 0>
75 void push_parallel_vector_data(const Communicator & comm,
76  MapToVectors && data,
77  const ActionFunctor & act_on_data);
79 
80 //------------------------------------------------------------------------
// Forward declaration: push_parallel_vector_data() for containers whose
// innermost value type has a Packing specialization with a buffer_type.
// This overload is selected by the enable_if below.
// Its definition forwards to push_parallel_packed_range() with a null
// (void *) context.
// NOTE(review): this listing skips source lines 89 and 98; verify against
// the real header.
88 template <typename MapToVectors,
90  typename ActionFunctor,
91  typename std::enable_if<Has_buffer_type<Packing<
92  typename InnermostType<typename std::remove_const<
93  typename std::remove_reference<MapToVectors>::type::mapped_type::value_type
94  >::type>::type>>::value, int>::type = 0>
95 void push_parallel_vector_data(const Communicator & comm,
96  MapToVectors && data,
97  const ActionFunctor & act_on_data);
99 
136 template <typename datum,
137  typename MapToVectors,
138  typename GatherFunctor,
139  typename ActionFunctor>
140 void pull_parallel_vector_data(const Communicator & comm,
141  const MapToVectors & queries,
142  GatherFunctor & gather_data,
143  const ActionFunctor & act_on_data,
144  const datum * example);
145 
// Forward declaration: push_parallel_packed_range() sends each non-empty
// container in data to rank (key % comm.size()) using the packed-range
// communication routines.
// `context` is passed through to the pack/unpack calls on both ends.
// act_on_data(source_pid, container) is called on each received container.
// NOTE(review): this listing skips source lines 177 and 184; verify
// against the real header.
176 template <typename MapToVectors,
178  typename ActionFunctor,
179  typename Context>
180 void push_parallel_packed_range(const Communicator & comm,
181  MapToVectors && data,
182  Context * context,
183  const ActionFunctor & act_on_data);
185 
186 //------------------------------------------------------------------------
187 // Parallel function overloads
188 //
189 
190 /*
 * An overload (not a template specialization) of pull_parallel_vector_data
 * for answers of type std::vector<std::vector<datum,A>>. Such answers are
 * harder to receive with the pre-sized non-blocking APIs, so the definition
 * probes for each response and receives it with a blocking call.
 */
193 template <typename datum,
194  typename A,
195  typename MapToVectors,
196  typename GatherFunctor,
197  typename ActionFunctor>
198 void pull_parallel_vector_data(const Communicator & comm,
199  const MapToVectors & queries,
200  GatherFunctor & gather_data,
201  ActionFunctor & act_on_data,
202  const std::vector<datum,A> * example);
203 
204 
205 
206 
207 
208 
209 //------------------------------------------------------------------------
210 // Parallel members
211 //
212 
213 // Separate namespace for not-for-public-use helper functions
214 namespace detail {
215 
216 #ifndef NDEBUG
217 inline
218 void
220  processor_id_type empty_target_pid)
221 {
222  bool someone_found_empty_send = (empty_target_pid != processor_id_type(-1));
223  comm.max(someone_found_empty_send);
224  std::stringstream err_msg;
225  if (empty_target_pid != processor_id_type(-1))
226  err_msg << " [" << comm.rank() << "] sent an empty to [" <<
227  empty_target_pid << "]";
228  timpi_assert_msg(!someone_found_empty_send,
229  "Some rank(s) sent empty data!" + err_msg.str());
230 }
231 #endif
232 
233 template <typename MapToContainers,
234  typename SendFunctor,
235  typename PossiblyReceiveFunctor,
236  typename ActionFunctor>
237 void
239  MapToContainers && data,
240  const SendFunctor & send_functor,
241  const PossiblyReceiveFunctor & possibly_receive_functor,
242  const ActionFunctor & act_on_data)
243 {
244  typedef typename std::remove_reference<MapToContainers>::type::value_type::second_type
245  container_type;
246 
247  // This function must be run on all processors at once
248  timpi_parallel_only(comm);
249 
250  // This function implements the "NBX" algorithm from
251  // https://htor.inf.ethz.ch/publications/img/hoefler-dsde-protocols.pdf
252 
253  // We'll grab a tag so we can overlap request sends and receives
254  // without confusing one for the other
255  const auto tag = comm.get_unique_tag();
256 
257  // Save off the old send_mode so we can restore it after this
258  const auto old_send_mode = comm.send_mode();
259 
260  // Set the sending to synchronous - this is so that we can know when
261  // the sends are complete
262  const_cast<Communicator &>(comm).send_mode(Communicator::SYNCHRONOUS);
263 
264  // The send requests
265  std::list<Request> send_requests;
266 
267  const processor_id_type num_procs = comm.size();
268 
269  // Don't give us empty vectors to send. We'll yell at you (after
270  // the sync is done, so all ranks get out of it and we can throw an
271  // assertion failure everywhere) if we're debugging and we catch
272  // one.
273  //
274  // But in opt mode we'll just skip empty vectors.
275 #ifndef NDEBUG
276  processor_id_type empty_target_pid = processor_id_type(-1);
277 #endif
278 
279  for (auto & datapair : data)
280  {
281  // In the case of data partitioned into more processors than we
282  // have ranks, we "wrap around"
283  processor_id_type dest_pid = datapair.first % num_procs;
284  auto & datum = datapair.second;
285 
286  if (datum.empty())
287  {
288 #ifndef NDEBUG
289  empty_target_pid = dest_pid;
290 #endif
291  continue;
292  }
293 
294  // Just act on data if the user requested a send-to-self
295  if (dest_pid == comm.rank())
296  act_on_data(dest_pid, std::move(datum));
297  else
298  {
299  send_requests.emplace_back();
300  send_functor(dest_pid, datum, send_requests.back(), tag);
301  }
302  }
303 
304  // In serial we've now acted on all our data.
305  if (num_procs == 1)
306  return;
307 
308  // Whether or not the nonblocking barrier has started
309  bool started_barrier = false;
310  // Request for the nonblocking barrier
311  Request barrier_request;
312 
313  struct IncomingInfo
314  {
315  unsigned int src_pid = any_source;
317  container_type data;
318  };
319 
320  // Storage for the incoming requests and data
321  // The last entry in this list will _always_ be an invalid entry
322  // that is available for use for processing the next incoming
323  // request. That is, its size will always be >= 1
324  std::list<IncomingInfo> incoming;
325  incoming.emplace_back(); // add the first invalid entry for receives
326 
327  // Helper for checking and processing receives if there are any; we
328  // need to check this in multiple places
329  auto possibly_receive = [&incoming, &tag, &possibly_receive_functor]() {
330  auto & next_incoming = incoming.back();
331  timpi_assert_equal_to(next_incoming.src_pid, any_source);
332  if (possibly_receive_functor(next_incoming.src_pid,
333  next_incoming.data,
334  next_incoming.request, tag))
335  {
336  timpi_assert(next_incoming.src_pid != any_source);
337 
338  // Insert another entry so that the next poll has something
339  // to fill into if needed
340  incoming.emplace_back();
341 
342  // We received something
343  return true;
344  }
345 
346  // We didn't receive anything
347  return false;
348  };
349 
350  // Keep looking for receives
351  while (true)
352  {
353  timpi_assert(incoming.size() > 0);
354 
355  // Check if there is a message and start receiving it
356  possibly_receive();
357 
358  // Work through the incoming requests and act on them if they're ready
359  incoming.remove_if
360  ([&act_on_data
361 #ifndef NDEBUG
362  ,&incoming
363 #endif
364  ](IncomingInfo & info)
365  {
366  // The last entry (marked by an invalid src pid) should be skipped;
367  // it needs to remain in the list for potential filling in the next poll
368  const bool is_invalid_entry = info.src_pid == any_source;
369  timpi_assert_equal_to(is_invalid_entry, &info == &incoming.back());
370 
371  if (is_invalid_entry)
372  return false;
373 
374  // If it's finished - let's act on it
375  if (info.request.test())
376  {
377  // Do any post-wait work
378  info.request.wait();
379 
380  // Act on the data
381  act_on_data(info.src_pid, std::move(info.data));
382 
383  // This removes it from the list
384  return true;
385  }
386 
387  // Not finished yet
388  return false;
389  });
390 
391  // Remove any sends that have completed in user space
392  send_requests.remove_if
393  ([](Request & req)
394  {
395  if (req.test())
396  {
397  // Do Post-Wait work
398  req.wait();
399  return true;
400  }
401 
402  // Not finished yet
403  return false;
404  });
405 
406  // If all of the sends are complete, we can start the barrier.
407  // We strongly believe that the MPI standard guarantees
408  // if a synchronous send is marked as completed, then there
409  // is a corresponding user-posted request for said send.
410  // Therefore, send_requests being empty is enough
411  // to state that our sends are done and everyone that we have
412  // sent data is expecting it. Double therefore, this condition
413  // being satisified on all processors in addition to all
414  // receive requests being complete is a sufficient stopping
415  // criteria
416  if (send_requests.empty() && !started_barrier)
417  {
418  started_barrier = true;
419  comm.nonblocking_barrier(barrier_request);
420  }
421 
422  // There is no data to act on (we reserve a single value in
423  // \p incoming for filling within the next poll loop)
424  if (incoming.size() == 1)
425  // We've started the barrier
426  if (started_barrier)
427  // The barrier is complete (everyone is done)
428  if (barrier_request.test())
429  // Profit
430  break;
431  }
432 
433  // There better not be anything left at this point
434  timpi_assert(!possibly_receive());
435 
436  // Reset the send mode
437  const_cast<Communicator &>(comm).send_mode(old_send_mode);
438 
439  // So, *did* we see any empty containers being sent?
440 #ifndef NDEBUG
441  empty_send_assertion(comm, empty_target_pid);
442 #endif // NDEBUG
443 }
444 
445 template <typename MapToContainers,
446  typename SendFunctor,
447  typename ReceiveFunctor,
448  typename ActionFunctor>
449 void
451  MapToContainers && data,
452  const SendFunctor & send_functor,
453  const ReceiveFunctor & receive_functor,
454  const ActionFunctor & act_on_data)
455 {
456  typedef typename std::remove_reference<MapToContainers>::type::value_type::second_type
457  container_type;
458 
459  // This function must be run on all processors at once
460  timpi_parallel_only(comm);
461 
462  // This function implements a simpler asynchronous protocol than
463  // NBX. Every processor will know exactly how many receives to
464  // post.
465 
466  processor_id_type num_procs = comm.size();
467 
468  // Don't give us empty vectors to send. We'll yell at you (after
469  // the sync is done, so all ranks get out of it and we can throw an
470  // assertion failure everywhere) if we're debugging and we catch
471  // one.
472  //
473  // But in opt mode we'll just skip empty vectors.
474 #ifndef NDEBUG
475  processor_id_type empty_target_pid = processor_id_type(-1);
476 #endif
477 
478  // Number of vectors to send to each procesor
479  std::vector<std::size_t> will_send_to(num_procs, 0);
480  for (auto & datapair : data)
481  {
482  // In the case of data partitioned into more processors than we
483  // have ranks, we "wrap around"
484  processor_id_type dest_pid = datapair.first % num_procs;
485 
486  // But in opt mode we'll just try to stay consistent with what
487  // we can do in the other backends
488  if (datapair.second.empty())
489  {
490 #ifndef NDEBUG
491  empty_target_pid = dest_pid;
492 #endif
493  continue;
494  }
495 
496  will_send_to[dest_pid]++;
497  }
498 
499  // Tell everyone about where everyone will send to
500  comm.alltoall(will_send_to);
501 
502  // will_send_to now represents how many vectors we'll receive from
503  // each processor; give it a better name.
504  auto & will_receive_from = will_send_to;
505 
506  processor_id_type n_receives = 0;
507  for (processor_id_type proc_id = 0; proc_id < num_procs; proc_id++)
508  n_receives += will_receive_from[proc_id];
509 
510  // We'll grab a tag so we can overlap request sends and receives
511  // without confusing one for the other
512  MessageTag tag = comm.get_unique_tag();
513 
514  // The send requests
515  std::list<Request> requests;
516 
517  // Post all of the non-empty sends, non-blocking
518  for (auto & datapair : data)
519  {
520  processor_id_type destid = datapair.first % num_procs;
521  auto & datum = datapair.second;
522 
523  if (datum.empty())
524  continue;
525 
526  // Just act on data if the user requested a send-to-self
527  if (destid == comm.rank())
528  {
529  act_on_data(destid, std::move(datum));
530  n_receives--;
531  }
532  else
533  {
534  requests.emplace_back();
535  send_functor(destid, datum, requests.back(), tag);
536  }
537  }
538 
539  // In serial we've now acted on all our data.
540  if (num_procs == 1)
541  return;
542 
543  // Post all of the receives.
544  for (processor_id_type i = 0; i != n_receives; ++i)
545  {
546  Status stat(comm.probe(any_source, tag));
547  const processor_id_type
548  proc_id = cast_int<processor_id_type>(stat.source());
549 
550  container_type received_data;
551  receive_functor(proc_id, received_data, tag);
552  act_on_data(proc_id, std::move(received_data));
553  }
554 
555  // Wait on all the sends to complete
556  for (auto & req : requests)
557  req.wait();
558 
559  // So, *did* we see any empty containers being sent?
560 #ifndef NDEBUG
561  empty_send_assertion(comm, empty_target_pid);
562 #endif // NDEBUG
563 }
564 
565 template <typename MapToContainers,
566  typename SendReceiveFunctor,
567  typename ActionFunctor>
568 void
570  MapToContainers && data,
571  const SendReceiveFunctor & sendreceive_functor,
572  const ActionFunctor & act_on_data)
573 {
574  typedef typename std::remove_reference<MapToContainers>::type::value_type::second_type
575  container_type;
576 
577  // This function must be run on all processors at once
578  timpi_parallel_only(comm);
579 
580  // This function implements the simplest protocol possible, fully
581  // synchronous. Every processor talks to every other. Only use this for
582  // debugging, and only when you're desperate.
583 
584  unsigned int num_procs = comm.size();
585 
586  // Don't give us empty vectors to send. We'll yell at you (after
587  // the sync is done, so all ranks get out of it and we can throw an
588  // assertion failure everywhere) if we're debugging and we catch
589  // one.
590  //
591  // But in opt mode we'll just skip empty vectors.
592 #ifndef NDEBUG
593  processor_id_type empty_target_pid = processor_id_type(-1);
594 #endif
595 
596  // Do multiple exchanges if we have an oversized data map
597  processor_id_type n_exchanges = 1;
598  for (auto & datapair : data)
599  {
600  const unsigned int dest_pid = datapair.first;
601  n_exchanges = std::max(n_exchanges, dest_pid/num_procs+1);
602 
603  if (datapair.second.empty())
604  {
605 #ifndef NDEBUG
606  empty_target_pid = dest_pid;
607 #endif
608  continue;
609  }
610  }
611 
612  comm.max(n_exchanges);
613 
614  // We'll grab a tag so responses and queries won't be confused when
615  // this is used within a pull
616  auto tag = comm.get_unique_tag();
617 
618  // Do the send_receives, blocking
619  for (processor_id_type e=0; e != n_exchanges; ++e)
620  for (processor_id_type p=0; p != num_procs; ++p)
621  {
622  const processor_id_type procup =
623  cast_int<processor_id_type>((comm.rank() + p) %
624  num_procs);
625  const processor_id_type procdown =
626  cast_int<processor_id_type>((comm.rank() + num_procs - p) %
627  num_procs);
628 
629  container_type empty_container;
630  auto data_it = data.find(procup + e*num_procs);
631  auto * const data_to_send =
632  (data_it == data.end()) ?
633  &empty_container : &data_it->second;
634 
635  container_type received_data;
636  sendreceive_functor(procup, *data_to_send,
637  procdown, received_data, tag);
638 
639  // Empty containers aren't *real* data, they're an artifact of
640  // doing send_receive with everyone. Just skip them.
641  if (!received_data.empty())
642  act_on_data(procdown, std::move(received_data));
643  }
644 
645  // So, *did* we see any empty containers being sent?
646 #ifndef NDEBUG
647  empty_send_assertion(comm, empty_target_pid);
648 #endif // NDEBUG
649 }
650 
651 
652 } // namespace detail
653 
654 
655 
// push_parallel_packed_range(): send each non-empty container in data to
// rank (key % comm.size()) with the packed-range communication routines.
// `context` is passed through to both the packing and unpacking calls.
// The switch on comm.sync_type() picks one protocol per call.
// NOTE(review): this Doxygen listing skips source lines 659, 691, 695,
// 718, 722 and 739. Presumably these are the function-name line, the
// detail:: helper calls and further case labels; verify against the
// real header.
656 template <typename MapToContainers,
657  typename ActionFunctor,
658  typename Context>
660  MapToContainers && data,
661  Context * context,
662  const ActionFunctor & act_on_data)
663 {
664  typedef typename std::remove_reference<MapToContainers>::type::mapped_type container_type;
665  typedef typename container_type::value_type nonref_type;
// A typed null pointer; it is only passed as the output_type argument of
// the receive calls below.
666  typename std::remove_const<nonref_type>::type * output_type = nullptr;
667 
668  switch (comm.sync_type()) {
669  case Communicator::NBX:
670  {
// NBX: one nonblocking packed send per container ...
671  auto send_functor = [&context, &comm](const processor_id_type dest_pid,
672  const container_type & datum,
673  Request & send_request,
674  const MessageTag tag) {
675  comm.nonblocking_send_packed_range(dest_pid, context, datum.begin(), datum.end(), send_request, tag);
676  };
677 
// ... and a polled receive that starts unpacking into
// current_incoming_data if a message is waiting.
678  auto possibly_receive_functor = [&context, &output_type, &comm](unsigned int & current_src_proc,
679  container_type & current_incoming_data,
680  Request & current_request,
681  const MessageTag tag) {
682  return comm.possibly_receive_packed_range(
683  current_src_proc,
684  context,
685  std::inserter(current_incoming_data, current_incoming_data.end()),
686  output_type,
687  current_request,
688  tag);
689  };
690 
692  (comm, data, send_functor, possibly_receive_functor, act_on_data);
693  }
694  break;
696  {
// Counted-alltoall protocol: nonblocking packed sends; each receive
// probes for the packed message and then waits for it to complete.
697  auto send_functor = [&context, &comm](const processor_id_type dest_pid,
698  const container_type & datum,
699  Request & send_request,
700  const MessageTag tag) {
701  comm.nonblocking_send_packed_range(dest_pid, context, datum.begin(), datum.end(), send_request, tag);
702  };
703 
704  auto receive_functor = [&context, &output_type, &comm](unsigned int current_src_proc,
705  container_type & current_incoming_data,
706  const MessageTag tag) {
707  bool flag = false;
708  Status stat(comm.packed_range_probe<container_type>(current_src_proc, tag, flag));
709  timpi_assert(flag);
710 
711  Request req;
712  comm.nonblocking_receive_packed_range(current_src_proc, context,
713  std::inserter(current_incoming_data, current_incoming_data.end()),
714  output_type, req, stat, tag);
715  req.wait();
716  };
717 
719  (comm, data, send_functor, receive_functor, act_on_data);
720  }
721  break;
723  {
// Round-robin protocol: one blocking packed send_receive per exchange,
// using the same context for sending and receiving.
724  auto sendreceive_functor = [&context, &output_type, &comm]
725  (const processor_id_type dest_pid,
726  const container_type & data_to_send,
727  const processor_id_type src_pid,
728  container_type & received_data,
729  const MessageTag tag) {
730  comm.send_receive_packed_range(dest_pid, context,
731  data_to_send.begin(),
732  data_to_send.end(), src_pid,
733  context,
734  std::inserter(received_data,
735  received_data.end()),
736  output_type, tag, tag);
737  };
738 
740  (comm, data, sendreceive_functor, act_on_data);
741  }
742  break;
743  default:
744  timpi_error_msg("Invalid sync_type setting " << comm.sync_type());
745  }
746 
747 }
748 
749 
// push_parallel_vector_data() for StandardType-compatible data: send each
// non-empty container in data to rank (key % comm.size()) using the MPI
// datatype built once below.
// The switch on comm.sync_type() picks one protocol per call.
// NOTE(review): this Doxygen listing skips source lines 756, 788, 792,
// 822, 826 and 837. Presumably these are the function-name line, the
// detail:: helper calls and further case labels; verify against the
// real header.
750 template <typename MapToVectors,
751  typename ActionFunctor,
752  typename std::enable_if< std::is_base_of<DataType, StandardType<
753  typename InnermostType<typename std::remove_const<
754  typename std::remove_reference<MapToVectors>::type::mapped_type::value_type
755  >::type>::type>>::value, int>::type>
757  MapToVectors && data,
758  const ActionFunctor & act_on_data)
759 {
760  typedef typename std::remove_reference<MapToVectors>::type::mapped_type container_type;
761  typedef typename container_type::value_type nonref_type;
762  typedef typename std::remove_const<nonref_type>::type nonconst_nonref_type;
763 
764  // We'll construct the StandardType once rather than inside a loop.
765  // We can't pass in example data here, because we might have
766  // data.empty() on some ranks, so we'll need StandardType to be able
767  // to construct the user's data type without an example.
768  auto type = build_standard_type(static_cast<nonconst_nonref_type *>(nullptr));
769 
770  switch (comm.sync_type()) {
771  case Communicator::NBX:
772  {
// NBX: nonblocking sends with the prebuilt type, plus a polled receive.
773  auto send_functor = [&type, &comm](const processor_id_type dest_pid,
774  const container_type & datum,
775  Request & send_request,
776  const MessageTag tag) {
777  comm.send(dest_pid, datum, type, send_request, tag);
778  };
779 
780  auto possibly_receive_functor = [&type, &comm](unsigned int & current_src_proc,
781  container_type & current_incoming_data,
782  Request & current_request,
783  const MessageTag tag) {
784  return comm.possibly_receive(
785  current_src_proc, current_incoming_data, type, current_request, tag);
786  };
787 
789  (comm, data, send_functor, possibly_receive_functor, act_on_data);
790  }
791  break;
793  {
// Counted-alltoall protocol. Without MPI these functors only raise an
// error; the helper returns before any send or receive when comm.size()
// is 1.
794 #ifdef TIMPI_HAVE_MPI // We should never hit these functors in serial
795  auto send_functor = [&type, &comm](const processor_id_type dest_pid,
796  const container_type & datum,
797  Request & send_request,
798  const MessageTag tag) {
799  comm.send(dest_pid, datum, type, send_request, tag);
800  };
801 
802  auto receive_functor = [&type, &comm](unsigned int current_src_proc,
803  container_type & current_incoming_data,
804  const MessageTag tag) {
805  comm.receive(current_src_proc, current_incoming_data, type, tag);
806  };
807 #else
808  auto send_functor = [](const processor_id_type,
809  const container_type &,
810  Request &,
811  const MessageTag) {
812  timpi_error(); // We should never hit these in serial
813  };
814 
815  auto receive_functor = [](unsigned int,
816  container_type &,
817  const MessageTag) {
818  timpi_error();
819  };
820 #endif
821 
823  (comm, data, send_functor, receive_functor, act_on_data);
824  }
825  break;
827  {
// Round-robin protocol: one blocking send_receive per exchange. This
// functor does not use the prebuilt `type`.
828  auto sendreceive_functor = [&comm](const processor_id_type dest_pid,
829  const container_type & data_to_send,
830  const processor_id_type src_pid,
831  container_type & received_data,
832  const MessageTag tag) {
833  comm.send_receive(dest_pid, data_to_send,
834  src_pid, received_data, tag, tag);
835  };
836 
838  (comm, data, sendreceive_functor, act_on_data);
839  }
840  break;
841  default:
842  timpi_error_msg("Invalid sync_type setting " << comm.sync_type());
843  }
844 }
845 
846 
847 
848 template <typename MapToVectors,
849  typename ActionFunctor,
850  typename std::enable_if<Has_buffer_type<Packing<
851  typename InnermostType<typename std::remove_const<
852  typename std::remove_reference<MapToVectors>::type::mapped_type::value_type
853  >::type>::type>>::value, int>::type>
854 void push_parallel_vector_data(const Communicator & comm,
855  MapToVectors && data,
856  const ActionFunctor & act_on_data)
857 {
858  void * context = nullptr;
859  push_parallel_packed_range(comm, data, context, act_on_data);
860 }
861 
862 
863 template <typename datum,
864  typename MapToVectors,
865  typename GatherFunctor,
866  typename ActionFunctor>
868  const MapToVectors & queries,
869  GatherFunctor & gather_data,
870  ActionFunctor & act_on_data,
871  const datum *)
872 {
873  typedef typename MapToVectors::mapped_type query_type;
874 
875  std::multimap<processor_id_type, std::vector<datum> >
876  response_data;
877 
878 #ifndef NDEBUG
879  processor_id_type max_pid = 0;
880  for (auto p : queries)
881  max_pid = std::max(max_pid, p.first);
882 
883  // Our SENDRECEIVE implementation doesn't preserve ordering, but we
884  // need ordering preserved for the multimap trick here to work.
885  if (comm.sync_type() == Communicator::SENDRECEIVE &&
886  max_pid > comm.size())
887  timpi_not_implemented();
888 #endif
889 
890  auto gather_functor =
891  [&gather_data, &response_data]
892  (processor_id_type pid, query_type query)
893  {
894  auto new_data_it =
895  response_data.emplace(pid, std::vector<datum>());
896  gather_data(pid, query, new_data_it->second);
897  timpi_assert_equal_to(query.size(), new_data_it->second.size());
898  };
899 
900  push_parallel_vector_data (comm, queries, gather_functor);
901 
902  std::map<processor_id_type, unsigned int> responses_acted_on;
903 
904  const processor_id_type num_procs = comm.size();
905 
906  auto action_functor =
907  [&act_on_data, &queries, &responses_acted_on,
908 #ifndef NDEBUG
909  max_pid,
910 #endif
911  num_procs
912  ]
913  (processor_id_type pid, const std::vector<datum> & data)
914  {
915  // We rely on responses coming in the same order as queries
916  const unsigned int nth_query = responses_acted_on[pid]++;
917 
918  auto q_pid_its = queries.equal_range(pid);
919  auto query_it = q_pid_its.first;
920 
921  // In an oversized pull we might not have any queries addressed
922  // to the *base* pid, but only to pid+N*num_procs for some N>1
923  // timpi_assert(query_it != q_pid_its.second);
924  while (query_it == q_pid_its.second)
925  {
926  pid += num_procs;
927  q_pid_its = queries.equal_range(pid);
928  timpi_assert_less_equal(pid, max_pid);
929  query_it = q_pid_its.first;
930  }
931 
932  for (unsigned int i=0; i != nth_query; ++i)
933  {
934  query_it++;
935  if (query_it == q_pid_its.second)
936  {
937  do
938  {
939  pid += num_procs;
940  q_pid_its = queries.equal_range(pid);
941  timpi_assert_less_equal(pid, max_pid);
942  } while (q_pid_its.first == q_pid_its.second);
943  query_it = q_pid_its.first;
944  }
945  }
946 
947  act_on_data(pid, query_it->second, data);
948  };
949 
950  push_parallel_vector_data (comm, response_data, action_functor);
951 }
952 
953 
954 
955 
956 template <typename datum,
957  typename A,
958  typename MapToVectors,
959  typename GatherFunctor,
960  typename ActionFunctor>
962  const MapToVectors & queries,
963  GatherFunctor & gather_data,
964  ActionFunctor & act_on_data,
965  const std::vector<datum,A> *)
966 {
967  typedef typename MapToVectors::mapped_type query_type;
968 
969  // First index: order of creation, irrelevant
970  std::vector<std::vector<std::vector<datum,A>>> response_data;
971  std::vector<Request> response_requests;
972 
973  // We'll grab a tag so we can overlap request sends and receives
974  // without confusing one for the other
975  MessageTag tag = comm.get_unique_tag();
976 
977  auto gather_functor =
978  [&comm, &gather_data, &act_on_data,
979  &response_data, &response_requests, &tag]
980  (processor_id_type pid, query_type query)
981  {
982  std::vector<std::vector<datum,A>> response;
983  gather_data(pid, query, response);
984  timpi_assert_equal_to(query.size(),
985  response.size());
986 
987  // Just act on data if the user requested a send-to-self
988  if (pid == comm.rank())
989  {
990  act_on_data(pid, query, response);
991  }
992  else
993  {
994  Request sendreq;
995  comm.send(pid, response, sendreq, tag);
996  response_requests.push_back(sendreq);
997  response_data.push_back(std::move(response));
998  }
999  };
1000 
1001  push_parallel_vector_data (comm, queries, gather_functor);
1002 
1003  // Every outgoing query should now have an incoming response.
1004  //
1005  // Post all of the receives.
1006  //
1007  // Use blocking API here since we can't use the pre-sized
1008  // non-blocking APIs with this data type.
1009  //
1010  // FIXME - implement Derek's API from #1684, switch to that!
1011  std::vector<Request> receive_requests;
1012  std::vector<processor_id_type> receive_procids;
1013  for (std::size_t i = 0,
1014  n_queries = queries.size() - queries.count(comm.rank());
1015  i != n_queries; ++i)
1016  {
1017  Status stat(comm.probe(any_source, tag));
1018  const processor_id_type
1019  proc_id = cast_int<processor_id_type>(stat.source());
1020 
1021  std::vector<std::vector<datum,A>> received_data;
1022  comm.receive(proc_id, received_data, tag);
1023 
1024  timpi_assert(queries.count(proc_id));
1025  auto & querydata = queries.at(proc_id);
1026  timpi_assert_equal_to(querydata.size(), received_data.size());
1027  act_on_data(proc_id, querydata, received_data);
1028  }
1029 
1030  wait(response_requests);
1031 }
1032 
1033 } // namespace TIMPI
1034 
1035 #endif // TIMPI_PARALLEL_SYNC_H
void pull_parallel_vector_data(const Communicator &comm, const MapToVectors &queries, GatherFunctor &gather_data, const ActionFunctor &act_on_data, const datum *example)
Send query vectors, receive and answer them with vectors of data, then act on those answers...
MPI_Request request
Request object for non-blocking I/O.
Definition: request.h:41
void send_mode(const SendMode sm)
Explicitly sets the SendMode type used for send operations.
Definition: communicator.h:333
void nonblocking_send_packed_range(const unsigned int dest_processor_id, const Context *context, Iter range_begin, const Iter range_end, Request &req, const MessageTag &tag=no_tag) const
Similar to the above Nonblocking send_packed_range with a few important differences: ...
Status packed_range_probe(const unsigned int src_processor_id, const MessageTag &tag, bool &flag) const
Non-Blocking message probe for a packed range message.
void sync_type(const SyncType st)
Explicitly sets the SyncType used for sync operations.
Definition: communicator.h:343
MPI_Info info
Info object used by some MPI-3 methods.
Definition: communicator.h:79
MessageTag get_unique_tag(int tagvalue=MessageTag::invalid_tag) const
Get a tag that is unique to this Communicator.
Definition: communicator.C:251
void alltoall(std::vector< T, A > &r) const
Effectively transposes the input vector across all processors.
void push_parallel_packed_range(const Communicator &comm, MapToVectors &&data, Context *context, const ActionFunctor &act_on_data)
Send and receive and act on vectors of data.
void push_parallel_roundrobin_helper(const Communicator &comm, MapToContainers &&data, const SendReceiveFunctor &sendreceive_functor, const ActionFunctor &act_on_data)
processor_id_type rank() const
Definition: communicator.h:208
Encapsulates the MPI_Datatype.
Definition: data_type.h:50
Templated class to provide the appropriate MPI datatype for use with built-in C types or simple C++ c...
Definition: standard_type.h:83
Define data types and (un)serialization functions for use when encoding a potentially-variable-size o...
Definition: packing.h:60
void nonblocking_receive_packed_range(const unsigned int src_processor_id, Context *context, OutputIter out, const T *output_type, Request &req, Status &stat, const MessageTag &tag=any_tag) const
Non-Blocking-receive range-of-pointers from one processor.
const unsigned int any_source
Processor id meaning "Accept from any source".
Definition: communicator.h:84
bool possibly_receive(unsigned int &src_processor_id, std::vector< T, A > &buf, Request &req, const MessageTag &tag) const
Nonblocking-receive from one processor with user-defined type.
StandardType< T > build_standard_type(const T *example=nullptr)
Encapsulates the MPI_Comm object.
Definition: communicator.h:108
processor_id_type size() const
Definition: communicator.h:211
void push_parallel_vector_data(const Communicator &comm, MapToVectors &&data, const ActionFunctor &act_on_data)
Send and receive and act on vectors of data.
uint8_t processor_id_type
Definition: communicator.h:54
Status receive(const unsigned int dest_processor_id, T &buf, const MessageTag &tag=any_tag) const
Blocking-receive from one processor with data-defined type.
void empty_send_assertion(const Communicator &comm, processor_id_type empty_target_pid)
Encapsulates the MPI tag integers.
Definition: message_tag.h:46
void push_parallel_alltoall_helper(const Communicator &comm, MapToContainers &&data, const SendFunctor &send_functor, const ReceiveFunctor &receive_functor, const ActionFunctor &act_on_data)
status probe(const unsigned int src_processor_id, const MessageTag &tag=any_tag) const
Blocking message probe.
Definition: communicator.C:283
void push_parallel_nbx_helper(const Communicator &comm, MapToContainers &&data, const SendFunctor &send_functor, const PossiblyReceiveFunctor &possibly_receive_functor, const ActionFunctor &act_on_data)
Status wait()
Definition: request.C:121
void send_receive(const unsigned int dest_processor_id, const T1 &send_data, const unsigned int source_processor_id, T2 &recv_data, const MessageTag &send_tag=no_tag, const MessageTag &recv_tag=any_tag) const
Send data send to one processor while simultaneously receiving other data recv from a (potentially di...
bool possibly_receive_packed_range(unsigned int &src_processor_id, Context *context, OutputIter out, const T *output_type, Request &req, const MessageTag &tag) const
Nonblocking packed range receive from one processor with user-defined type.
Encapsulates the MPI_Request.
Definition: request.h:67
void max(const T &r, T &o, Request &req) const
Non-blocking maximum of the local value r into o with the request req.
void send(const unsigned int dest_processor_id, const T &buf, const MessageTag &tag=no_tag) const
Blocking-send to one processor with data-defined type.
Status wait(Request &r)
Wait for a non-blocking send or receive to finish.
Definition: request.h:135
void send_receive_packed_range(const unsigned int dest_processor_id, const Context1 *context1, RangeIter send_begin, const RangeIter send_end, const unsigned int source_processor_id, Context2 *context2, OutputIter out, const T *output_type, const MessageTag &send_tag=no_tag, const MessageTag &recv_tag=any_tag, std::size_t approx_buffer_size=1000000) const
Send a range-of-pointers to one processor while simultaneously receiving another range from a (potent...
Encapsulates the MPI_Status struct.
Definition: status.h:75
void nonblocking_barrier(Request &req) const
Start a barrier that doesn't block.
Definition: communicator.C:238
bool test()
Definition: request.C:153