20 #ifndef TIMPI_PARALLEL_SYNC_H 21 #define TIMPI_PARALLEL_SYNC_H 31 #include <type_traits> 68 template <
typename MapToVectors,
70 typename ActionFunctor,
73 typename std::remove_reference<MapToVectors>::type::mapped_type::value_type
74 >::type>::type>>::value,
int>::type = 0>
77 const ActionFunctor & act_on_data);
88 template <
typename MapToVectors,
90 typename ActionFunctor,
93 typename std::remove_reference<MapToVectors>::type::mapped_type::value_type
94 >::type>::type>>::value,
int>::type = 0>
97 const ActionFunctor & act_on_data);
136 template <
typename datum,
137 typename MapToVectors,
138 typename GatherFunctor,
139 typename ActionFunctor>
141 const MapToVectors & queries,
142 GatherFunctor & gather_data,
143 const ActionFunctor & act_on_data,
144 const datum * example);
176 template <
typename MapToVectors,
178 typename ActionFunctor,
181 MapToVectors && data,
183 const ActionFunctor & act_on_data);
193 template <
typename datum,
195 typename MapToVectors,
196 typename GatherFunctor,
197 typename ActionFunctor>
199 const MapToVectors & queries,
200 GatherFunctor & gather_data,
201 ActionFunctor & act_on_data,
202 const std::vector<datum,A> * example);
223 comm.
max(someone_found_empty_send);
224 std::stringstream err_msg;
226 err_msg <<
" [" << comm.
rank() <<
"] sent an empty to [" <<
227 empty_target_pid <<
"]";
228 timpi_assert_msg(!someone_found_empty_send,
229 "Some rank(s) sent empty data!" + err_msg.str());
233 template <
typename MapToContainers,
234 typename SendFunctor,
235 typename PossiblyReceiveFunctor,
236 typename ActionFunctor>
239 MapToContainers && data,
240 const SendFunctor & send_functor,
241 const PossiblyReceiveFunctor & possibly_receive_functor,
242 const ActionFunctor & act_on_data)
244 typedef typename std::remove_reference<MapToContainers>::type::value_type::second_type
248 timpi_parallel_only(comm);
258 const auto old_send_mode = comm.
send_mode();
265 std::list<Request> send_requests;
279 for (
auto & datapair : data)
284 auto & datum = datapair.second;
289 empty_target_pid = dest_pid;
295 if (dest_pid == comm.
rank())
296 act_on_data(dest_pid, std::move(datum));
299 send_requests.emplace_back();
300 send_functor(dest_pid, datum, send_requests.back(), tag);
309 bool started_barrier =
false;
324 std::list<IncomingInfo> incoming;
325 incoming.emplace_back();
329 auto possibly_receive = [&incoming, &tag, &possibly_receive_functor]() {
330 auto & next_incoming = incoming.back();
331 timpi_assert_equal_to(next_incoming.src_pid,
any_source);
332 if (possibly_receive_functor(next_incoming.src_pid,
334 next_incoming.request, tag))
336 timpi_assert(next_incoming.src_pid !=
any_source);
340 incoming.emplace_back();
353 timpi_assert(incoming.size() > 0);
364 ](IncomingInfo &
info)
369 timpi_assert_equal_to(is_invalid_entry, &
info == &incoming.back());
371 if (is_invalid_entry)
375 if (
info.request.test())
381 act_on_data(
info.src_pid, std::move(
info.data));
392 send_requests.remove_if
416 if (send_requests.empty() && !started_barrier)
418 started_barrier =
true;
424 if (incoming.size() == 1)
428 if (barrier_request.
test())
434 timpi_assert(!possibly_receive());
437 const_cast<Communicator &
>(comm).send_mode(old_send_mode);
445 template <
typename MapToContainers,
446 typename SendFunctor,
447 typename ReceiveFunctor,
448 typename ActionFunctor>
451 MapToContainers && data,
452 const SendFunctor & send_functor,
453 const ReceiveFunctor & receive_functor,
454 const ActionFunctor & act_on_data)
456 typedef typename std::remove_reference<MapToContainers>::type::value_type::second_type
460 timpi_parallel_only(comm);
479 std::vector<std::size_t> will_send_to(num_procs, 0);
480 for (
auto & datapair : data)
488 if (datapair.second.empty())
491 empty_target_pid = dest_pid;
496 will_send_to[dest_pid]++;
504 auto & will_receive_from = will_send_to;
508 n_receives += will_receive_from[proc_id];
515 std::list<Request> requests;
518 for (
auto & datapair : data)
521 auto & datum = datapair.second;
527 if (destid == comm.
rank())
529 act_on_data(destid, std::move(datum));
534 requests.emplace_back();
535 send_functor(destid, datum, requests.back(), tag);
548 proc_id = cast_int<processor_id_type>(stat.source());
550 container_type received_data;
551 receive_functor(proc_id, received_data, tag);
552 act_on_data(proc_id, std::move(received_data));
556 for (
auto & req : requests)
565 template <
typename MapToContainers,
566 typename SendReceiveFunctor,
567 typename ActionFunctor>
570 MapToContainers && data,
571 const SendReceiveFunctor & sendreceive_functor,
572 const ActionFunctor & act_on_data)
574 typedef typename std::remove_reference<MapToContainers>::type::value_type::second_type
578 timpi_parallel_only(comm);
584 unsigned int num_procs = comm.
size();
598 for (
auto & datapair : data)
600 const unsigned int dest_pid = datapair.first;
601 n_exchanges = std::max(n_exchanges, dest_pid/num_procs+1);
603 if (datapair.second.empty())
606 empty_target_pid = dest_pid;
612 comm.
max(n_exchanges);
623 cast_int<processor_id_type>((comm.
rank() + p) %
626 cast_int<processor_id_type>((comm.
rank() + num_procs - p) %
629 container_type empty_container;
630 auto data_it = data.find(procup + e*num_procs);
631 auto *
const data_to_send =
632 (data_it == data.end()) ?
633 &empty_container : &data_it->second;
635 container_type received_data;
636 sendreceive_functor(procup, *data_to_send,
637 procdown, received_data, tag);
641 if (!received_data.empty())
642 act_on_data(procdown, std::move(received_data));
656 template <
typename MapToContainers,
657 typename ActionFunctor,
660 MapToContainers && data,
662 const ActionFunctor & act_on_data)
664 typedef typename std::remove_reference<MapToContainers>::type::mapped_type container_type;
665 typedef typename container_type::value_type nonref_type;
666 typename std::remove_const<nonref_type>::type * output_type =
nullptr;
672 const container_type & datum,
678 auto possibly_receive_functor = [&context, &output_type, &comm](
unsigned int & current_src_proc,
679 container_type & current_incoming_data,
685 std::inserter(current_incoming_data, current_incoming_data.end()),
692 (comm, data, send_functor, possibly_receive_functor, act_on_data);
698 const container_type & datum,
704 auto receive_functor = [&context, &output_type, &comm](
unsigned int current_src_proc,
705 container_type & current_incoming_data,
713 std::inserter(current_incoming_data, current_incoming_data.end()),
714 output_type, req, stat, tag);
719 (comm, data, send_functor, receive_functor, act_on_data);
724 auto sendreceive_functor = [&context, &output_type, &comm]
726 const container_type & data_to_send,
728 container_type & received_data,
731 data_to_send.begin(),
732 data_to_send.end(), src_pid,
734 std::inserter(received_data,
735 received_data.end()),
736 output_type, tag, tag);
740 (comm, data, sendreceive_functor, act_on_data);
744 timpi_error_msg(
"Invalid sync_type setting " << comm.
sync_type());
750 template <
typename MapToVectors,
751 typename ActionFunctor,
752 typename std::enable_if< std::is_base_of<DataType, StandardType<
753 typename InnermostType<
typename std::remove_const<
754 typename std::remove_reference<MapToVectors>::type::mapped_type::value_type
755 >::type>::type>>::value,
int>::type>
757 MapToVectors && data,
758 const ActionFunctor & act_on_data)
760 typedef typename std::remove_reference<MapToVectors>::type::mapped_type container_type;
761 typedef typename container_type::value_type nonref_type;
762 typedef typename std::remove_const<nonref_type>::type nonconst_nonref_type;
774 const container_type & datum,
777 comm.
send(dest_pid, datum, type, send_request, tag);
780 auto possibly_receive_functor = [&type, &comm](
unsigned int & current_src_proc,
781 container_type & current_incoming_data,
785 current_src_proc, current_incoming_data, type, current_request, tag);
789 (comm, data, send_functor, possibly_receive_functor, act_on_data);
794 #ifdef TIMPI_HAVE_MPI // We should never hit these functors in serial 796 const container_type & datum,
799 comm.
send(dest_pid, datum, type, send_request, tag);
802 auto receive_functor = [&type, &comm](
unsigned int current_src_proc,
803 container_type & current_incoming_data,
805 comm.
receive(current_src_proc, current_incoming_data, type, tag);
809 const container_type &,
815 auto receive_functor = [](
unsigned int,
823 (comm, data, send_functor, receive_functor, act_on_data);
829 const container_type & data_to_send,
831 container_type & received_data,
834 src_pid, received_data, tag, tag);
838 (comm, data, sendreceive_functor, act_on_data);
842 timpi_error_msg(
"Invalid sync_type setting " << comm.
sync_type());
848 template <
typename MapToVectors,
849 typename ActionFunctor,
850 typename std::enable_if<Has_buffer_type<Packing<
851 typename InnermostType<
typename std::remove_const<
852 typename std::remove_reference<MapToVectors>::type::mapped_type::value_type
853 >::type>::type>>::value,
int>::type>
855 MapToVectors && data,
856 const ActionFunctor & act_on_data)
858 void * context =
nullptr;
863 template <
typename datum,
864 typename MapToVectors,
865 typename GatherFunctor,
866 typename ActionFunctor>
868 const MapToVectors & queries,
869 GatherFunctor & gather_data,
870 ActionFunctor & act_on_data,
873 typedef typename MapToVectors::mapped_type query_type;
875 std::multimap<processor_id_type, std::vector<datum> >
880 for (
auto p : queries)
881 max_pid = std::max(max_pid, p.first);
886 max_pid > comm.
size())
887 timpi_not_implemented();
890 auto gather_functor =
891 [&gather_data, &response_data]
895 response_data.emplace(pid, std::vector<datum>());
896 gather_data(pid, query, new_data_it->second);
897 timpi_assert_equal_to(query.size(), new_data_it->second.size());
902 std::map<processor_id_type, unsigned int> responses_acted_on;
906 auto action_functor =
907 [&act_on_data, &queries, &responses_acted_on,
916 const unsigned int nth_query = responses_acted_on[pid]++;
918 auto q_pid_its = queries.equal_range(pid);
919 auto query_it = q_pid_its.first;
924 while (query_it == q_pid_its.second)
927 q_pid_its = queries.equal_range(pid);
928 timpi_assert_less_equal(pid, max_pid);
929 query_it = q_pid_its.first;
932 for (
unsigned int i=0; i != nth_query; ++i)
935 if (query_it == q_pid_its.second)
940 q_pid_its = queries.equal_range(pid);
941 timpi_assert_less_equal(pid, max_pid);
942 }
while (q_pid_its.first == q_pid_its.second);
943 query_it = q_pid_its.first;
947 act_on_data(pid, query_it->second, data);
956 template <
typename datum,
958 typename MapToVectors,
959 typename GatherFunctor,
960 typename ActionFunctor>
962 const MapToVectors & queries,
963 GatherFunctor & gather_data,
964 ActionFunctor & act_on_data,
965 const std::vector<datum,A> *)
967 typedef typename MapToVectors::mapped_type query_type;
970 std::vector<std::vector<std::vector<datum,A>>> response_data;
971 std::vector<Request> response_requests;
977 auto gather_functor =
978 [&comm, &gather_data, &act_on_data,
979 &response_data, &response_requests, &tag]
982 std::vector<std::vector<datum,A>> response;
983 gather_data(pid, query, response);
984 timpi_assert_equal_to(query.size(),
988 if (pid == comm.
rank())
990 act_on_data(pid, query, response);
995 comm.
send(pid, response, sendreq, tag);
996 response_requests.push_back(sendreq);
997 response_data.push_back(std::move(response));
1011 std::vector<Request> receive_requests;
1012 std::vector<processor_id_type> receive_procids;
1013 for (std::size_t i = 0,
1014 n_queries = queries.size() - queries.count(comm.
rank());
1015 i != n_queries; ++i)
1019 proc_id = cast_int<processor_id_type>(stat.source());
1021 std::vector<std::vector<datum,A>> received_data;
1022 comm.
receive(proc_id, received_data, tag);
1024 timpi_assert(queries.count(proc_id));
1025 auto & querydata = queries.at(proc_id);
1026 timpi_assert_equal_to(querydata.size(), received_data.size());
1027 act_on_data(proc_id, querydata, received_data);
1030 wait(response_requests);
1035 #endif // TIMPI_PARALLEL_SYNC_H void pull_parallel_vector_data(const Communicator &comm, const MapToVectors &queries, GatherFunctor &gather_data, const ActionFunctor &act_on_data, const datum *example)
Send query vectors, receive and answer them with vectors of data, then act on those answers...
MPI_Request request
Request object for non-blocking I/O.
void send_mode(const SendMode sm)
Explicitly sets the SendMode type used for send operations.
void nonblocking_send_packed_range(const unsigned int dest_processor_id, const Context *context, Iter range_begin, const Iter range_end, Request &req, const MessageTag &tag=no_tag) const
Similar to the above Nonblocking send_packed_range with a few important differences: ...
Status packed_range_probe(const unsigned int src_processor_id, const MessageTag &tag, bool &flag) const
Non-Blocking message probe for a packed range message.
void sync_type(const SyncType st)
Explicitly sets the SyncType used for sync operations.
MPI_Info info
Info object used by some MPI-3 methods.
MessageTag get_unique_tag(int tagvalue=MessageTag::invalid_tag) const
Get a tag that is unique to this Communicator.
void alltoall(std::vector< T, A > &r) const
Effectively transposes the input vector across all processors.
void push_parallel_packed_range(const Communicator &comm, MapToVectors &&data, Context *context, const ActionFunctor &act_on_data)
Send and receive and act on vectors of data.
void push_parallel_roundrobin_helper(const Communicator &comm, MapToContainers &&data, const SendReceiveFunctor &sendreceive_functor, const ActionFunctor &act_on_data)
processor_id_type rank() const
Encapsulates the MPI_Datatype.
Templated class to provide the appropriate MPI datatype for use with built-in C types or simple C++ constructions.
Define data types and (un)serialization functions for use when encoding a potentially-variable-size object of type T.
void nonblocking_receive_packed_range(const unsigned int src_processor_id, Context *context, OutputIter out, const T *output_type, Request &req, Status &stat, const MessageTag &tag=any_tag) const
Non-Blocking-receive range-of-pointers from one processor.
const unsigned int any_source
Processor id meaning "Accept from any source".
bool possibly_receive(unsigned int &src_processor_id, std::vector< T, A > &buf, Request &req, const MessageTag &tag) const
Nonblocking-receive from one processor with user-defined type.
StandardType< T > build_standard_type(const T *example=nullptr)
Encapsulates the MPI_Comm object.
processor_id_type size() const
void push_parallel_vector_data(const Communicator &comm, MapToVectors &&data, const ActionFunctor &act_on_data)
Send and receive and act on vectors of data.
uint8_t processor_id_type
Status receive(const unsigned int dest_processor_id, T &buf, const MessageTag &tag=any_tag) const
Blocking-receive from one processor with data-defined type.
void empty_send_assertion(const Communicator &comm, processor_id_type empty_target_pid)
Encapsulates the MPI tag integers.
void push_parallel_alltoall_helper(const Communicator &comm, MapToContainers &&data, const SendFunctor &send_functor, const ReceiveFunctor &receive_functor, const ActionFunctor &act_on_data)
status probe(const unsigned int src_processor_id, const MessageTag &tag=any_tag) const
Blocking message probe.
void push_parallel_nbx_helper(const Communicator &comm, MapToContainers &&data, const SendFunctor &send_functor, const PossiblyReceiveFunctor &possibly_receive_functor, const ActionFunctor &act_on_data)
void send_receive(const unsigned int dest_processor_id, const T1 &send_data, const unsigned int source_processor_id, T2 &recv_data, const MessageTag &send_tag=no_tag, const MessageTag &recv_tag=any_tag) const
Send data send_data to one processor while simultaneously receiving other data recv_data from a (potentially different) processor.
bool possibly_receive_packed_range(unsigned int &src_processor_id, Context *context, OutputIter out, const T *output_type, Request &req, const MessageTag &tag) const
Nonblocking packed range receive from one processor with user-defined type.
Encapsulates the MPI_Request.
void max(const T &r, T &o, Request &req) const
Non-blocking maximum of the local value r into o with the request req.
void send(const unsigned int dest_processor_id, const T &buf, const MessageTag &tag=no_tag) const
Blocking-send to one processor with data-defined type.
Status wait(Request &r)
Wait for a non-blocking send or receive to finish.
void send_receive_packed_range(const unsigned int dest_processor_id, const Context1 *context1, RangeIter send_begin, const RangeIter send_end, const unsigned int source_processor_id, Context2 *context2, OutputIter out, const T *output_type, const MessageTag &send_tag=no_tag, const MessageTag &recv_tag=any_tag, std::size_t approx_buffer_size=1000000) const
Send a range-of-pointers to one processor while simultaneously receiving another range from a (potentially different) processor.
Encapsulates the MPI_Status struct.
void nonblocking_barrier(Request &req) const
Start a barrier that doesn't block.