TIMPI::detail Namespace Reference

Functions

void empty_send_assertion (const Communicator &comm, processor_id_type empty_target_pid)
 
template<typename MapToContainers , typename SendFunctor , typename PossiblyReceiveFunctor , typename ActionFunctor >
void push_parallel_nbx_helper (const Communicator &comm, MapToContainers &&data, const SendFunctor &send_functor, const PossiblyReceiveFunctor &possibly_receive_functor, const ActionFunctor &act_on_data)
 
template<typename MapToContainers , typename SendFunctor , typename ReceiveFunctor , typename ActionFunctor >
void push_parallel_alltoall_helper (const Communicator &comm, MapToContainers &&data, const SendFunctor &send_functor, const ReceiveFunctor &receive_functor, const ActionFunctor &act_on_data)
 
template<typename MapToContainers , typename SendReceiveFunctor , typename ActionFunctor >
void push_parallel_roundrobin_helper (const Communicator &comm, MapToContainers &&data, const SendReceiveFunctor &sendreceive_functor, const ActionFunctor &act_on_data)
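
These detail helpers are not meant to be called directly; they are reached through the public entry points TIMPI::push_parallel_vector_data() and TIMPI::push_parallel_packed_range() (see the "Referenced by" notes below). A minimal, hedged usage sketch follows; the include paths and the exact act_on_data signature are assumptions inferred from the listings on this page, not a verbatim copy of TIMPI's documentation.

// Hedged usage sketch: reach the detail helpers through the public
// push_parallel_vector_data() entry point. Include paths and the functor
// signature are assumptions inferred from the act_on_data(pid, std::move(...))
// calls shown in the listings below.
#include "timpi/communicator.h"
#include "timpi/parallel_sync.h"

#include <map>
#include <vector>

void example_push(const TIMPI::Communicator & comm)
{
  // Containers we want specific destination ranks to receive from us
  std::map<TIMPI::processor_id_type, std::vector<int>> data_to_push;
  data_to_push[(comm.rank() + 1) % comm.size()] = {1, 2, 3};

  // Called once for each rank that pushed data to us
  auto act_on_data =
    [](TIMPI::processor_id_type src_pid, const std::vector<int> & received)
    {
      // ... consume 'received', which arrived from rank 'src_pid' ...
      (void) src_pid;
      (void) received;
    };

  TIMPI::push_parallel_vector_data(comm, data_to_push, act_on_data);
}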
 

Function Documentation

◆ empty_send_assertion()

void TIMPI::detail::empty_send_assertion ( const Communicator &  comm,
                                           processor_id_type     empty_target_pid 
                                         )
inline

Definition at line 218 of file parallel_sync.h.

References TIMPI::Communicator::max(), and TIMPI::Communicator::rank().

Referenced by push_parallel_alltoall_helper(), push_parallel_nbx_helper(), and push_parallel_roundrobin_helper().

220 {
221  bool someone_found_empty_send = (empty_target_pid != processor_id_type(-1));
222  comm.max(someone_found_empty_send);
223  std::stringstream err_msg;
224  if (empty_target_pid != processor_id_type(-1))
225  err_msg << " [" << comm.rank() << "] sent an empty to [" <<
226  empty_target_pid << "]";
227  timpi_assert_msg(!someone_found_empty_send,
228  "Some rank(s) sent empty data!" + err_msg.str());
229 }
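
The assertion above is deliberately collective: every rank participates in the max() reduction before any rank raises the error, so the failure is reported consistently everywhere rather than hanging the ranks that never saw an empty send. Below is a minimal, hypothetical illustration of that pattern in plain MPI (not TIMPI's API); the "problem on rank 1" and the abort-on-failure behavior are invented for the example.

// Hypothetical stand-alone illustration of the collective-assertion idea:
// every rank reaches the reduction, so the error is raised consistently
// on all ranks instead of only on the rank that saw the problem.
#include <mpi.h>
#include <cstdio>

int main(int argc, char ** argv)
{
  MPI_Init(&argc, &argv);

  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  // Pretend rank 1 detected a local problem (an "empty send" in TIMPI's case)
  int found_problem_locally = (rank == 1) ? 1 : 0;

  // Collective max: afterwards every rank agrees whether *anyone* saw a problem
  int someone_found_problem = 0;
  MPI_Allreduce(&found_problem_locally, &someone_found_problem, 1,
                MPI_INT, MPI_MAX, MPI_COMM_WORLD);

  if (someone_found_problem)
    {
      if (found_problem_locally)
        std::fprintf(stderr, "[%d] saw the problem locally\n", rank);
      MPI_Abort(MPI_COMM_WORLD, 1);  // fail everywhere, consistently
    }

  MPI_Finalize();
  return 0;
}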

◆ push_parallel_alltoall_helper()

template<typename MapToContainers , typename SendFunctor , typename ReceiveFunctor , typename ActionFunctor >
void TIMPI::detail::push_parallel_alltoall_helper ( const Communicator &    comm,
                                                    MapToContainers &&      data,
                                                    const SendFunctor &     send_functor,
                                                    const ReceiveFunctor &  receive_functor,
                                                    const ActionFunctor &   act_on_data 
                                                  )

Definition at line 449 of file parallel_sync.h.

References TIMPI::Communicator::alltoall(), TIMPI::any_source, empty_send_assertion(), TIMPI::Communicator::get_unique_tag(), TIMPI::Communicator::probe(), TIMPI::Communicator::rank(), and TIMPI::Communicator::size().

Referenced by TIMPI::push_parallel_packed_range(), and TIMPI::push_parallel_vector_data().

454 {
455  typedef typename std::remove_reference<MapToContainers>::type::value_type::second_type
456  container_type;
457 
458  // This function must be run on all processors at once
459  timpi_parallel_only(comm);
460 
461  // This function implements a simpler asynchronous protocol than
462  // NBX. Every processor will know exactly how many receives to
463  // post.
464 
465  processor_id_type num_procs = comm.size();
466 
467  // Don't give us empty vectors to send. We'll yell at you (after
468  // the sync is done, so all ranks get out of it and we can throw an
469  // assertion failure everywhere) if we're debugging and we catch
470  // one.
471  //
472  // But in opt mode we'll just skip empty vectors.
473 #ifndef NDEBUG
474  processor_id_type empty_target_pid = processor_id_type(-1);
475 #endif
476 
477  // Number of vectors to send to each processor
478  std::vector<std::size_t> will_send_to(num_procs, 0);
479  for (auto & datapair : data)
480  {
481  // In the case of data partitioned into more processors than we
482  // have ranks, we "wrap around"
483  processor_id_type dest_pid = datapair.first % num_procs;
484 
485  // But in opt mode we'll just try to stay consistent with what
486  // we can do in the other backends
487  if (datapair.second.empty())
488  {
489 #ifndef NDEBUG
490  empty_target_pid = dest_pid;
491 #endif
492  continue;
493  }
494 
495  will_send_to[dest_pid]++;
496  }
497 
498  // Tell everyone about where everyone will send to
499  comm.alltoall(will_send_to);
500 
501  // will_send_to now represents how many vectors we'll receive from
502  // each processor; give it a better name.
503  auto & will_receive_from = will_send_to;
504 
505  processor_id_type n_receives = 0;
506  for (processor_id_type proc_id = 0; proc_id < num_procs; proc_id++)
507  n_receives += will_receive_from[proc_id];
508 
509  // We'll grab a tag so we can overlap request sends and receives
510  // without confusing one for the other
511  MessageTag tag = comm.get_unique_tag();
512 
513  // The send requests
514  std::list<Request> requests;
515 
516  // Post all of the non-empty sends, non-blocking
517  for (auto & datapair : data)
518  {
519  processor_id_type destid = datapair.first % num_procs;
520  auto & datum = datapair.second;
521 
522  if (datum.empty())
523  continue;
524 
525  // Just act on data if the user requested a send-to-self
526  if (destid == comm.rank())
527  {
528  act_on_data(destid, std::move(datum));
529  n_receives--;
530  }
531  else
532  {
533  requests.emplace_back();
534  send_functor(destid, datum, requests.back(), tag);
535  }
536  }
537 
538  // In serial we've now acted on all our data.
539  if (num_procs == 1)
540  return;
541 
542  // Post all of the receives.
543  for (processor_id_type i = 0; i != n_receives; ++i)
544  {
545  Status stat(comm.probe(any_source, tag));
546  const processor_id_type
547  proc_id = cast_int<processor_id_type>(stat.source());
548 
549  container_type received_data;
550  receive_functor(proc_id, received_data, tag);
551  act_on_data(proc_id, std::move(received_data));
552  }
553 
554  // Wait on all the sends to complete
555  for (auto & req : requests)
556  req.wait();
557 
558  // So, *did* we see any empty containers being sent?
559 #ifndef NDEBUG
560  empty_send_assertion(comm, empty_target_pid);
561 #endif // NDEBUG
562 }
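
The key step of this backend is the count exchange at line 499: after the alltoall, every processor knows exactly how many receives it must post, so no termination detection is needed. A minimal, hypothetical sketch of that step in plain MPI (not TIMPI's API) follows; the neighbor-to-neighbor payload is an invented stand-in for the real data map.

// Hedged illustration of the count-exchange step: each rank announces how
// many containers it will send to every other rank, and after the all-to-all
// the same buffer tells each rank exactly how many receives to post.
#include <mpi.h>
#include <cstdio>
#include <numeric>
#include <vector>

int main(int argc, char ** argv)
{
  MPI_Init(&argc, &argv);

  int rank, size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  // will_send_to[p] = how many containers this rank intends to send to rank p
  // (here: one message to our right neighbor, as a stand-in for real data)
  std::vector<int> will_send_to(size, 0);
  will_send_to[(rank + 1) % size] = 1;

  // After the all-to-all, element p holds how many containers rank p will
  // send to *us*
  std::vector<int> will_receive_from(size, 0);
  MPI_Alltoall(will_send_to.data(), 1, MPI_INT,
               will_receive_from.data(), 1, MPI_INT, MPI_COMM_WORLD);

  const int n_receives =
    std::accumulate(will_receive_from.begin(), will_receive_from.end(), 0);

  std::printf("[%d] will post %d receive(s)\n", rank, n_receives);

  MPI_Finalize();
  return 0;
}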

◆ push_parallel_nbx_helper()

template<typename MapToContainers , typename SendFunctor , typename PossiblyReceiveFunctor , typename ActionFunctor >
void TIMPI::detail::push_parallel_nbx_helper ( const Communicator &            comm,
                                               MapToContainers &&              data,
                                               const SendFunctor &             send_functor,
                                               const PossiblyReceiveFunctor &  possibly_receive_functor,
                                               const ActionFunctor &           act_on_data 
                                             )

Definition at line 237 of file parallel_sync.h.

References TIMPI::any_source, empty_send_assertion(), TIMPI::Communicator::get_unique_tag(), TIMPI::Communicator::nonblocking_barrier(), TIMPI::Communicator::rank(), TIMPI::Communicator::send_mode(), TIMPI::Communicator::size(), TIMPI::Communicator::SYNCHRONOUS, TIMPI::Request::test(), and TIMPI::Request::wait().

Referenced by TIMPI::push_parallel_packed_range(), and TIMPI::push_parallel_vector_data().

242 {
243  typedef typename std::remove_reference<MapToContainers>::type::value_type::second_type
244  container_type;
245 
246  // This function must be run on all processors at once
247  timpi_parallel_only(comm);
248 
249  // This function implements the "NBX" algorithm from
250  // https://htor.inf.ethz.ch/publications/img/hoefler-dsde-protocols.pdf
251 
252  // We'll grab a tag so we can overlap request sends and receives
253  // without confusing one for the other
254  const auto tag = comm.get_unique_tag();
255 
256  // Save off the old send_mode so we can restore it after this
257  const auto old_send_mode = comm.send_mode();
258 
259  // Set the sending to synchronous - this is so that we can know when
260  // the sends are complete
261  const_cast<Communicator &>(comm).send_mode(Communicator::SYNCHRONOUS);
262 
263  // The send requests
264  std::list<Request> send_requests;
265 
266  const processor_id_type num_procs = comm.size();
267 
268  // Don't give us empty vectors to send. We'll yell at you (after
269  // the sync is done, so all ranks get out of it and we can throw an
270  // assertion failure everywhere) if we're debugging and we catch
271  // one.
272  //
273  // But in opt mode we'll just skip empty vectors.
274 #ifndef NDEBUG
275  processor_id_type empty_target_pid = processor_id_type(-1);
276 #endif
277 
278  for (auto & datapair : data)
279  {
280  // In the case of data partitioned into more processors than we
281  // have ranks, we "wrap around"
282  processor_id_type dest_pid = datapair.first % num_procs;
283  auto & datum = datapair.second;
284 
285  if (datum.empty())
286  {
287 #ifndef NDEBUG
288  empty_target_pid = dest_pid;
289 #endif
290  continue;
291  }
292 
293  // Just act on data if the user requested a send-to-self
294  if (dest_pid == comm.rank())
295  act_on_data(dest_pid, std::move(datum));
296  else
297  {
298  send_requests.emplace_back();
299  send_functor(dest_pid, datum, send_requests.back(), tag);
300  }
301  }
302 
303  // In serial we've now acted on all our data.
304  if (num_procs == 1)
305  return;
306 
307  // Whether or not the nonblocking barrier has started
308  bool started_barrier = false;
309  // Request for the nonblocking barrier
310  Request barrier_request;
311 
312  struct IncomingInfo
313  {
314  unsigned int src_pid = any_source;
315  Request request;
316  container_type data;
317  };
318 
319  // Storage for the incoming requests and data
320  // The last entry in this list will _always_ be an invalid entry
321  // that is available for use for processing the next incoming
322  // request. That is, its size will always be >= 1
323  std::list<IncomingInfo> incoming;
324  incoming.emplace_back(); // add the first invalid entry for receives
325 
326  // Helper for checking and processing receives if there are any; we
327  // need to check this in multiple places
328  auto possibly_receive = [&incoming, &tag, &possibly_receive_functor]() {
329  auto & next_incoming = incoming.back();
330  timpi_assert_equal_to(next_incoming.src_pid, any_source);
331  if (possibly_receive_functor(next_incoming.src_pid,
332  next_incoming.data,
333  next_incoming.request, tag))
334  {
335  timpi_assert(next_incoming.src_pid != any_source);
336 
337  // Insert another entry so that the next poll has something
338  // to fill into if needed
339  incoming.emplace_back();
340 
341  // We received something
342  return true;
343  }
344 
345  // We didn't receive anything
346  return false;
347  };
348 
349  // Keep looking for receives
350  while (true)
351  {
352  timpi_assert(incoming.size() > 0);
353 
354  // Check if there is a message and start receiving it
355  possibly_receive();
356 
357  // Work through the incoming requests and act on them if they're ready
358  incoming.remove_if
359  ([&act_on_data
360 #ifndef NDEBUG
361  ,&incoming
362 #endif
363  ](IncomingInfo & info)
364  {
365  // The last entry (marked by an invalid src pid) should be skipped;
366  // it needs to remain in the list for potential filling in the next poll
367  const bool is_invalid_entry = info.src_pid == any_source;
368  timpi_assert_equal_to(is_invalid_entry, &info == &incoming.back());
369 
370  if (is_invalid_entry)
371  return false;
372 
373  // If it's finished - let's act on it
374  if (info.request.test())
375  {
376  // Do any post-wait work
377  info.request.wait();
378 
379  // Act on the data
380  act_on_data(info.src_pid, std::move(info.data));
381 
382  // This removes it from the list
383  return true;
384  }
385 
386  // Not finished yet
387  return false;
388  });
389 
390  // Remove any sends that have completed in user space
391  send_requests.remove_if
392  ([](Request & req)
393  {
394  if (req.test())
395  {
396  // Do Post-Wait work
397  req.wait();
398  return true;
399  }
400 
401  // Not finished yet
402  return false;
403  });
404 
405  // If all of the sends are complete, we can start the barrier.
406  // We strongly believe that the MPI standard guarantees
407  // if a synchronous send is marked as completed, then there
408  // is a corresponding user-posted request for said send.
409  // Therefore, send_requests being empty is enough
410  // to state that our sends are done and everyone that we have
411  // sent data is expecting it. Double therefore, this condition
412  being satisfied on all processors in addition to all
413  receive requests being complete is a sufficient stopping
414  criterion
415  if (send_requests.empty() && !started_barrier)
416  {
417  started_barrier = true;
418  comm.nonblocking_barrier(barrier_request);
419  }
420 
421  // There is no data to act on (we reserve a single value in
422  // \p incoming for filling within the next poll loop)
423  if (incoming.size() == 1)
424  // We've started the barrier
425  if (started_barrier)
426  // The barrier is complete (everyone is done)
427  if (barrier_request.test())
428  // Profit
429  break;
430  }
431 
432  // There better not be anything left at this point
433  timpi_assert(!possibly_receive());
434 
435  // Reset the send mode
436  const_cast<Communicator &>(comm).send_mode(old_send_mode);
437 
438  // So, *did* we see any empty containers being sent?
439 #ifndef NDEBUG
440  empty_send_assertion(comm, empty_target_pid);
441 #endif // NDEBUG
442 }
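
For reference, below is a minimal, hypothetical skeleton of the NBX termination scheme cited above (Hoefler et al.), written in plain MPI-3 rather than TIMPI's API: synchronous nonblocking sends, a probe-and-receive poll loop, and a nonblocking barrier entered once a rank's own sends have completed. The ring pattern and single-int payload are invented for illustration; this is a sketch of the idea, not TIMPI's implementation.

// Hedged NBX skeleton: Issend + Iprobe/Recv polling + Ibarrier termination.
#include <mpi.h>
#include <cstdio>

int main(int argc, char ** argv)
{
  MPI_Init(&argc, &argv);

  int rank, size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  const int tag = 42;

  // Each rank "pushes" one int to its right neighbor (stand-in for 'data')
  int payload = rank;
  const int dest = (rank + 1) % size;
  MPI_Request send_request;
  // Synchronous send: its completion implies the receiver has matched it
  MPI_Issend(&payload, 1, MPI_INT, dest, tag, MPI_COMM_WORLD, &send_request);

  bool started_barrier = false;
  MPI_Request barrier_request;

  while (true)
    {
      // Poll for an incoming message and receive it if one is pending
      int message_waiting = 0;
      MPI_Status status;
      MPI_Iprobe(MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &message_waiting, &status);
      if (message_waiting)
        {
          int incoming;
          MPI_Recv(&incoming, 1, MPI_INT, status.MPI_SOURCE, tag,
                   MPI_COMM_WORLD, MPI_STATUS_IGNORE);
          std::printf("[%d] received %d from %d\n", rank, incoming, status.MPI_SOURCE);
        }

      if (!started_barrier)
        {
          // Once our own synchronous send has completed, everyone we sent to
          // has posted a matching receive, so it is safe to enter the barrier
          int send_done = 0;
          MPI_Test(&send_request, &send_done, MPI_STATUS_IGNORE);
          if (send_done)
            {
              MPI_Ibarrier(MPI_COMM_WORLD, &barrier_request);
              started_barrier = true;
            }
        }
      else
        {
          // The barrier completes only after every rank has entered it, i.e.
          // after every send everywhere has been matched, so we can stop polling
          int barrier_done = 0;
          MPI_Test(&barrier_request, &barrier_done, MPI_STATUS_IGNORE);
          if (barrier_done)
            break;
        }
    }

  MPI_Finalize();
  return 0;
}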

◆ push_parallel_roundrobin_helper()

template<typename MapToContainers , typename SendReceiveFunctor , typename ActionFunctor >
void TIMPI::detail::push_parallel_roundrobin_helper ( const Communicator &        comm,
                                                      MapToContainers &&          data,
                                                      const SendReceiveFunctor &  sendreceive_functor,
                                                      const ActionFunctor &       act_on_data 
                                                    )

Definition at line 568 of file parallel_sync.h.

References empty_send_assertion(), TIMPI::Communicator::get_unique_tag(), TIMPI::Communicator::max(), TIMPI::Communicator::rank(), and TIMPI::Communicator::size().

Referenced by TIMPI::push_parallel_packed_range(), and TIMPI::push_parallel_vector_data().

572 {
573  typedef typename std::remove_reference<MapToContainers>::type::value_type::second_type
574  container_type;
575 
576  // This function must be run on all processors at once
577  timpi_parallel_only(comm);
578 
579  // This function implements the simplest protocol possible, fully
580  // synchronous. Every processor talks to every other. Only use this for
581  // debugging, and only when you're desperate.
582 
583  unsigned int num_procs = comm.size();
584 
585  // Don't give us empty vectors to send. We'll yell at you (after
586  // the sync is done, so all ranks get out of it and we can throw an
587  // assertion failure everywhere) if we're debugging and we catch
588  // one.
589  //
590  // But in opt mode we'll just skip empty vectors.
591 #ifndef NDEBUG
592  processor_id_type empty_target_pid = processor_id_type(-1);
593 #endif
594 
595  // Do multiple exchanges if we have an oversized data map
596  processor_id_type n_exchanges = 1;
597  for (auto & datapair : data)
598  {
599  const unsigned int dest_pid = datapair.first;
600  n_exchanges = std::max(n_exchanges, dest_pid/num_procs+1);
601 
602  if (datapair.second.empty())
603  {
604 #ifndef NDEBUG
605  empty_target_pid = dest_pid;
606 #endif
607  continue;
608  }
609  }
610 
611  comm.max(n_exchanges);
612 
613  // We'll grab a tag so responses and queries won't be confused when
614  // this is used within a pull
615  auto tag = comm.get_unique_tag();
616 
617  // Do the send_receives, blocking
618  for (processor_id_type e=0; e != n_exchanges; ++e)
619  for (processor_id_type p=0; p != num_procs; ++p)
620  {
621  const processor_id_type procup =
622  cast_int<processor_id_type>((comm.rank() + p) %
623  num_procs);
624  const processor_id_type procdown =
625  cast_int<processor_id_type>((comm.rank() + num_procs - p) %
626  num_procs);
627 
628  container_type empty_container;
629  auto data_it = data.find(procup + e*num_procs);
630  auto * const data_to_send =
631  (data_it == data.end()) ?
632  &empty_container : &data_it->second;
633 
634  container_type received_data;
635  sendreceive_functor(procup, *data_to_send,
636  procdown, received_data, tag);
637 
638  // Empty containers aren't *real* data, they're an artifact of
639  // doing send_receive with everyone. Just skip them.
640  if (!received_data.empty())
641  act_on_data(procdown, std::move(received_data));
642  }
643 
644  // So, *did* we see any empty containers being sent?
645 #ifndef NDEBUG
646  empty_send_assertion(comm, empty_target_pid);
647 #endif // NDEBUG
648 }
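
A minimal, hypothetical plain-MPI illustration of the pairing used above: on step p each rank sends to (rank + p) % size while receiving from (rank + size - p) % size, so every ordered pair of ranks is matched exactly once per sweep. The integer payload stands in for the real containers, and the fully blocking exchange is exactly why this backend is recommended only for debugging.

// Hedged round-robin sketch: one blocking MPI_Sendrecv per step, every rank
// paired with every other rank exactly once per sweep.
#include <mpi.h>
#include <cstdio>

int main(int argc, char ** argv)
{
  MPI_Init(&argc, &argv);

  int rank, size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  const int tag = 7;

  for (int p = 0; p < size; ++p)
    {
      const int procup   = (rank + p) % size;         // we send to this rank
      const int procdown = (rank + size - p) % size;  // we receive from this rank

      int to_send = rank * 1000 + procup;  // stand-in for the real payload
      int received = -1;

      // Blocking, fully synchronous exchange; simple but slow
      MPI_Sendrecv(&to_send, 1, MPI_INT, procup, tag,
                   &received, 1, MPI_INT, procdown, tag,
                   MPI_COMM_WORLD, MPI_STATUS_IGNORE);

      std::printf("[%d] step %d: received %d from %d\n",
                  rank, p, received, procdown);
    }

  MPI_Finalize();
  return 0;
}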