TIMPI
Functions
TIMPI::detail Namespace Reference

Functions

void empty_send_assertion (const Communicator &comm, processor_id_type empty_target_pid)
 
template<typename MapToContainers , typename SendFunctor , typename PossiblyReceiveFunctor , typename ActionFunctor >
void push_parallel_nbx_helper (const Communicator &comm, MapToContainers &&data, const SendFunctor &send_functor, const PossiblyReceiveFunctor &possibly_receive_functor, const ActionFunctor &act_on_data)
 
template<typename MapToContainers , typename SendFunctor , typename ReceiveFunctor , typename ActionFunctor >
void push_parallel_alltoall_helper (const Communicator &comm, MapToContainers &&data, const SendFunctor &send_functor, const ReceiveFunctor &receive_functor, const ActionFunctor &act_on_data)
 
template<typename MapToContainers , typename SendReceiveFunctor , typename ActionFunctor >
void push_parallel_roundrobin_helper (const Communicator &comm, MapToContainers &&data, const SendReceiveFunctor &sendreceive_functor, const ActionFunctor &act_on_data)
 

Function Documentation

◆ empty_send_assertion()

void TIMPI::detail::empty_send_assertion ( const Communicator &  comm,
processor_id_type  empty_target_pid 
)
inline

Definition at line 219 of file parallel_sync.h.

References TIMPI::Communicator::max(), and TIMPI::Communicator::rank().

Referenced by push_parallel_alltoall_helper(), push_parallel_nbx_helper(), and push_parallel_roundrobin_helper().

221 {
222  bool someone_found_empty_send = (empty_target_pid != processor_id_type(-1));
223  comm.max(someone_found_empty_send);
224  std::stringstream err_msg;
225  if (empty_target_pid != processor_id_type(-1))
226  err_msg << " [" << comm.rank() << "] sent an empty to [" <<
227  empty_target_pid << "]";
228  timpi_assert_msg(!someone_found_empty_send,
229  "Some rank(s) sent empty data!" + err_msg.str());
230 }
uint8_t processor_id_type
Definition: communicator.h:54

◆ push_parallel_alltoall_helper()

template<typename MapToContainers , typename SendFunctor , typename ReceiveFunctor , typename ActionFunctor >
void TIMPI::detail::push_parallel_alltoall_helper ( const Communicator &  comm,
MapToContainers &&  data,
const SendFunctor &  send_functor,
const ReceiveFunctor &  receive_functor,
const ActionFunctor &  act_on_data 
)

Definition at line 450 of file parallel_sync.h.

References TIMPI::Communicator::alltoall(), TIMPI::any_source, empty_send_assertion(), TIMPI::Communicator::get_unique_tag(), TIMPI::Communicator::probe(), TIMPI::Communicator::rank(), and TIMPI::Communicator::size().

Referenced by TIMPI::push_parallel_packed_range(), and TIMPI::push_parallel_vector_data().

455 {
456  typedef typename std::remove_reference<MapToContainers>::type::value_type::second_type
457  container_type;
458 
459  // This function must be run on all processors at once
460  timpi_parallel_only(comm);
461 
462  // This function implements a simpler asynchronous protocol than
463  // NBX. Every processor will know exactly how many receives to
464  // post.
465 
466  processor_id_type num_procs = comm.size();
467 
468  // Don't give us empty vectors to send. We'll yell at you (after
469  // the sync is done, so all ranks get out of it and we can throw an
470  // assertion failure everywhere) if we're debugging and we catch
471  // one.
472  //
473  // But in opt mode we'll just skip empty vectors.
474 #ifndef NDEBUG
475  processor_id_type empty_target_pid = processor_id_type(-1);
476 #endif
477 
478  // Number of vectors to send to each processor
479  std::vector<std::size_t> will_send_to(num_procs, 0);
480  for (auto & datapair : data)
481  {
482  // In the case of data partitioned into more processors than we
483  // have ranks, we "wrap around"
484  processor_id_type dest_pid = datapair.first % num_procs;
485 
486  // But in opt mode we'll just try to stay consistent with what
487  // we can do in the other backends
488  if (datapair.second.empty())
489  {
490 #ifndef NDEBUG
491  empty_target_pid = dest_pid;
492 #endif
493  continue;
494  }
495 
496  will_send_to[dest_pid]++;
497  }
498 
499  // Tell everyone about where everyone will send to
500  comm.alltoall(will_send_to);
501 
502  // will_send_to now represents how many vectors we'll receive from
503  // each processor; give it a better name.
504  auto & will_receive_from = will_send_to;
505 
506  processor_id_type n_receives = 0;
507  for (processor_id_type proc_id = 0; proc_id < num_procs; proc_id++)
508  n_receives += will_receive_from[proc_id];
509 
510  // We'll grab a tag so we can overlap request sends and receives
511  // without confusing one for the other
512  MessageTag tag = comm.get_unique_tag();
513 
514  // The send requests
515  std::list<Request> requests;
516 
517  // Post all of the non-empty sends, non-blocking
518  for (auto & datapair : data)
519  {
520  processor_id_type destid = datapair.first % num_procs;
521  auto & datum = datapair.second;
522 
523  if (datum.empty())
524  continue;
525 
526  // Just act on data if the user requested a send-to-self
527  if (destid == comm.rank())
528  {
529  act_on_data(destid, std::move(datum));
530  n_receives--;
531  }
532  else
533  {
534  requests.emplace_back();
535  send_functor(destid, datum, requests.back(), tag);
536  }
537  }
538 
539  // In serial we've now acted on all our data.
540  if (num_procs == 1)
541  return;
542 
543  // Post all of the receives.
544  for (processor_id_type i = 0; i != n_receives; ++i)
545  {
546  Status stat(comm.probe(any_source, tag));
547  const processor_id_type
548  proc_id = cast_int<processor_id_type>(stat.source());
549 
550  container_type received_data;
551  receive_functor(proc_id, received_data, tag);
552  act_on_data(proc_id, std::move(received_data));
553  }
554 
555  // Wait on all the sends to complete
556  for (auto & req : requests)
557  req.wait();
558 
559  // So, *did* we see any empty containers being sent?
560 #ifndef NDEBUG
561  empty_send_assertion(comm, empty_target_pid);
562 #endif // NDEBUG
563 }
const unsigned int any_source
Processor id meaning "Accept from any source".
Definition: communicator.h:84
uint8_t processor_id_type
Definition: communicator.h:54
void empty_send_assertion(const Communicator &comm, processor_id_type empty_target_pid)

◆ push_parallel_nbx_helper()

template<typename MapToContainers , typename SendFunctor , typename PossiblyReceiveFunctor , typename ActionFunctor >
void TIMPI::detail::push_parallel_nbx_helper ( const Communicator &  comm,
MapToContainers &&  data,
const SendFunctor &  send_functor,
const PossiblyReceiveFunctor &  possibly_receive_functor,
const ActionFunctor &  act_on_data 
)

Definition at line 238 of file parallel_sync.h.

References TIMPI::any_source, empty_send_assertion(), TIMPI::Communicator::get_unique_tag(), TIMPI::Communicator::nonblocking_barrier(), TIMPI::Communicator::rank(), TIMPI::Communicator::send_mode(), TIMPI::Communicator::size(), TIMPI::Communicator::SYNCHRONOUS, TIMPI::Request::test(), and TIMPI::Request::wait().

Referenced by TIMPI::push_parallel_packed_range(), and TIMPI::push_parallel_vector_data().

243 {
244  typedef typename std::remove_reference<MapToContainers>::type::value_type::second_type
245  container_type;
246 
247  // This function must be run on all processors at once
248  timpi_parallel_only(comm);
249 
250  // This function implements the "NBX" algorithm from
251  // https://htor.inf.ethz.ch/publications/img/hoefler-dsde-protocols.pdf
252 
253  // We'll grab a tag so we can overlap request sends and receives
254  // without confusing one for the other
255  const auto tag = comm.get_unique_tag();
256 
257  // Save off the old send_mode so we can restore it after this
258  const auto old_send_mode = comm.send_mode();
259 
260  // Set the sending to synchronous - this is so that we can know when
261  // the sends are complete
262  const_cast<Communicator &>(comm).send_mode(Communicator::SYNCHRONOUS);
263 
264  // The send requests
265  std::list<Request> send_requests;
266 
267  const processor_id_type num_procs = comm.size();
268 
269  // Don't give us empty vectors to send. We'll yell at you (after
270  // the sync is done, so all ranks get out of it and we can throw an
271  // assertion failure everywhere) if we're debugging and we catch
272  // one.
273  //
274  // But in opt mode we'll just skip empty vectors.
275 #ifndef NDEBUG
276  processor_id_type empty_target_pid = processor_id_type(-1);
277 #endif
278 
279  for (auto & datapair : data)
280  {
281  // In the case of data partitioned into more processors than we
282  // have ranks, we "wrap around"
283  processor_id_type dest_pid = datapair.first % num_procs;
284  auto & datum = datapair.second;
285 
286  if (datum.empty())
287  {
288 #ifndef NDEBUG
289  empty_target_pid = dest_pid;
290 #endif
291  continue;
292  }
293 
294  // Just act on data if the user requested a send-to-self
295  if (dest_pid == comm.rank())
296  act_on_data(dest_pid, std::move(datum));
297  else
298  {
299  send_requests.emplace_back();
300  send_functor(dest_pid, datum, send_requests.back(), tag);
301  }
302  }
303 
304  // In serial we've now acted on all our data.
305  if (num_procs == 1)
306  return;
307 
308  // Whether or not the nonblocking barrier has started
309  bool started_barrier = false;
310  // Request for the nonblocking barrier
311  Request barrier_request;
312 
313  struct IncomingInfo
314  {
315  unsigned int src_pid = any_source;
316  Request request;
317  container_type data;
318  };
319 
320  // Storage for the incoming requests and data
321  // The last entry in this list will _always_ be an invalid entry
322  // that is available for use for processing the next incoming
323  // request. That is, its size will always be >= 1
324  std::list<IncomingInfo> incoming;
325  incoming.emplace_back(); // add the first invalid entry for receives
326 
327  // Helper for checking and processing receives if there are any; we
328  // need to check this in multiple places
329  auto possibly_receive = [&incoming, &tag, &possibly_receive_functor]() {
330  auto & next_incoming = incoming.back();
331  timpi_assert_equal_to(next_incoming.src_pid, any_source);
332  if (possibly_receive_functor(next_incoming.src_pid,
333  next_incoming.data,
334  next_incoming.request, tag))
335  {
336  timpi_assert(next_incoming.src_pid != any_source);
337 
338  // Insert another entry so that the next poll has something
339  // to fill into if needed
340  incoming.emplace_back();
341 
342  // We received something
343  return true;
344  }
345 
346  // We didn't receive anything
347  return false;
348  };
349 
350  // Keep looking for receives
351  while (true)
352  {
353  timpi_assert(incoming.size() > 0);
354 
355  // Check if there is a message and start receiving it
356  possibly_receive();
357 
358  // Work through the incoming requests and act on them if they're ready
359  incoming.remove_if
360  ([&act_on_data
361 #ifndef NDEBUG
362  ,&incoming
363 #endif
364  ](IncomingInfo & info)
365  {
366  // The last entry (marked by an invalid src pid) should be skipped;
367  // it needs to remain in the list for potential filling in the next poll
368  const bool is_invalid_entry = info.src_pid == any_source;
369  timpi_assert_equal_to(is_invalid_entry, &info == &incoming.back());
370 
371  if (is_invalid_entry)
372  return false;
373 
374  // If it's finished - let's act on it
375  if (info.request.test())
376  {
377  // Do any post-wait work
378  info.request.wait();
379 
380  // Act on the data
381  act_on_data(info.src_pid, std::move(info.data));
382 
383  // This removes it from the list
384  return true;
385  }
386 
387  // Not finished yet
388  return false;
389  });
390 
391  // Remove any sends that have completed in user space
392  send_requests.remove_if
393  ([](Request & req)
394  {
395  if (req.test())
396  {
397  // Do Post-Wait work
398  req.wait();
399  return true;
400  }
401 
402  // Not finished yet
403  return false;
404  });
405 
406  // If all of the sends are complete, we can start the barrier.
407  // We strongly believe that the MPI standard guarantees
408  // if a synchronous send is marked as completed, then there
409  // is a corresponding user-posted request for said send.
410  // Therefore, send_requests being empty is enough
411  // to state that our sends are done and everyone that we have
412  // sent data is expecting it. Double therefore, this condition
413  // being satisfied on all processors in addition to all
414  // receive requests being complete is a sufficient stopping
415  // criterion
416  if (send_requests.empty() && !started_barrier)
417  {
418  started_barrier = true;
419  comm.nonblocking_barrier(barrier_request);
420  }
421 
422  // There is no data to act on (we reserve a single value in
423  // \p incoming for filling within the next poll loop)
424  if (incoming.size() == 1)
425  // We've started the barrier
426  if (started_barrier)
427  // The barrier is complete (everyone is done)
428  if (barrier_request.test())
429  // Profit
430  break;
431  }
432 
433  // There better not be anything left at this point
434  timpi_assert(!possibly_receive());
435 
436  // Reset the send mode
437  const_cast<Communicator &>(comm).send_mode(old_send_mode);
438 
439  // So, *did* we see any empty containers being sent?
440 #ifndef NDEBUG
441  empty_send_assertion(comm, empty_target_pid);
442 #endif // NDEBUG
443 }
MPI_Request request
Request object for non-blocking I/O.
Definition: request.h:41
MPI_Info info
Info object used by some MPI-3 methods.
Definition: communicator.h:79
const unsigned int any_source
Processor id meaning "Accept from any source".
Definition: communicator.h:84
uint8_t processor_id_type
Definition: communicator.h:54
void empty_send_assertion(const Communicator &comm, processor_id_type empty_target_pid)

◆ push_parallel_roundrobin_helper()

template<typename MapToContainers , typename SendReceiveFunctor , typename ActionFunctor >
void TIMPI::detail::push_parallel_roundrobin_helper ( const Communicator &  comm,
MapToContainers &&  data,
const SendReceiveFunctor &  sendreceive_functor,
const ActionFunctor &  act_on_data 
)

Definition at line 569 of file parallel_sync.h.

References empty_send_assertion(), TIMPI::Communicator::get_unique_tag(), TIMPI::Communicator::max(), TIMPI::Communicator::rank(), and TIMPI::Communicator::size().

Referenced by TIMPI::push_parallel_packed_range(), and TIMPI::push_parallel_vector_data().

573 {
574  typedef typename std::remove_reference<MapToContainers>::type::value_type::second_type
575  container_type;
576 
577  // This function must be run on all processors at once
578  timpi_parallel_only(comm);
579 
580  // This function implements the simplest protocol possible, fully
581  // synchronous. Every processor talks to every other. Only use this for
582  // debugging, and only when you're desperate.
583 
584  unsigned int num_procs = comm.size();
585 
586  // Don't give us empty vectors to send. We'll yell at you (after
587  // the sync is done, so all ranks get out of it and we can throw an
588  // assertion failure everywhere) if we're debugging and we catch
589  // one.
590  //
591  // But in opt mode we'll just skip empty vectors.
592 #ifndef NDEBUG
593  processor_id_type empty_target_pid = processor_id_type(-1);
594 #endif
595 
596  // Do multiple exchanges if we have an oversized data map
597  processor_id_type n_exchanges = 1;
598  for (auto & datapair : data)
599  {
600  const unsigned int dest_pid = datapair.first;
601  n_exchanges = std::max(n_exchanges, dest_pid/num_procs+1);
602 
603  if (datapair.second.empty())
604  {
605 #ifndef NDEBUG
606  empty_target_pid = dest_pid;
607 #endif
608  continue;
609  }
610  }
611 
612  comm.max(n_exchanges);
613 
614  // We'll grab a tag so responses and queries won't be confused when
615  // this is used within a pull
616  auto tag = comm.get_unique_tag();
617 
618  // Do the send_receives, blocking
619  for (processor_id_type e=0; e != n_exchanges; ++e)
620  for (processor_id_type p=0; p != num_procs; ++p)
621  {
622  const processor_id_type procup =
623  cast_int<processor_id_type>((comm.rank() + p) %
624  num_procs);
625  const processor_id_type procdown =
626  cast_int<processor_id_type>((comm.rank() + num_procs - p) %
627  num_procs);
628 
629  container_type empty_container;
630  auto data_it = data.find(procup + e*num_procs);
631  auto * const data_to_send =
632  (data_it == data.end()) ?
633  &empty_container : &data_it->second;
634 
635  container_type received_data;
636  sendreceive_functor(procup, *data_to_send,
637  procdown, received_data, tag);
638 
639  // Empty containers aren't *real* data, they're an artifact of
640  // doing send_receive with everyone. Just skip them.
641  if (!received_data.empty())
642  act_on_data(procdown, std::move(received_data));
643  }
644 
645  // So, *did* we see any empty containers being sent?
646 #ifndef NDEBUG
647  empty_send_assertion(comm, empty_target_pid);
648 #endif // NDEBUG
649 }
uint8_t processor_id_type
Definition: communicator.h:54
void empty_send_assertion(const Communicator &comm, processor_id_type empty_target_pid)