20 #ifndef TIMPI_PARALLEL_SYNC_H 21 #define TIMPI_PARALLEL_SYNC_H 31 #include <type_traits> 68 template <
typename MapToVectors,
70 typename ActionFunctor,
73 typename std::remove_reference<MapToVectors>::type::mapped_type::value_type
74 >::type>::type>>::value,
int>::type = 0>
77 const ActionFunctor & act_on_data);
88 template <
typename MapToVectors,
90 typename ActionFunctor,
93 typename std::remove_reference<MapToVectors>::type::mapped_type::value_type
94 >::type>::type>>::value,
int>::type = 0>
97 const ActionFunctor & act_on_data);
135 template <
typename datum,
136 typename MapToVectors,
137 typename GatherFunctor,
138 typename ActionFunctor>
140 const MapToVectors & queries,
141 GatherFunctor & gather_data,
142 const ActionFunctor & act_on_data,
143 const datum * example);
175 template <
typename MapToVectors,
177 typename ActionFunctor,
180 MapToVectors && data,
182 const ActionFunctor & act_on_data);
192 template <
typename datum,
194 typename MapToVectors,
195 typename GatherFunctor,
196 typename ActionFunctor>
198 const MapToVectors & queries,
199 GatherFunctor & gather_data,
200 ActionFunctor & act_on_data,
201 const std::vector<datum,A> * example);
222 comm.
max(someone_found_empty_send);
223 std::stringstream err_msg;
225 err_msg <<
" [" << comm.
rank() <<
"] sent an empty to [" <<
226 empty_target_pid <<
"]";
227 timpi_assert_msg(!someone_found_empty_send,
228 "Some rank(s) sent empty data!" + err_msg.str());
232 template <
typename MapToContainers,
233 typename SendFunctor,
234 typename PossiblyReceiveFunctor,
235 typename ActionFunctor>
238 MapToContainers && data,
239 const SendFunctor & send_functor,
240 const PossiblyReceiveFunctor & possibly_receive_functor,
241 const ActionFunctor & act_on_data)
243 typedef typename std::remove_reference<MapToContainers>::type::value_type::second_type
247 timpi_parallel_only(comm);
257 const auto old_send_mode = comm.
send_mode();
264 std::list<Request> send_requests;
278 for (
auto & datapair : data)
283 auto & datum = datapair.second;
288 empty_target_pid = dest_pid;
294 if (dest_pid == comm.
rank())
295 act_on_data(dest_pid, std::move(datum));
298 send_requests.emplace_back();
299 send_functor(dest_pid, datum, send_requests.back(), tag);
308 bool started_barrier =
false;
323 std::list<IncomingInfo> incoming;
324 incoming.emplace_back();
328 auto possibly_receive = [&incoming, &tag, &possibly_receive_functor]() {
329 auto & next_incoming = incoming.back();
330 timpi_assert_equal_to(next_incoming.src_pid,
any_source);
331 if (possibly_receive_functor(next_incoming.src_pid,
333 next_incoming.request, tag))
335 timpi_assert(next_incoming.src_pid !=
any_source);
339 incoming.emplace_back();
352 timpi_assert(incoming.size() > 0);
363 ](IncomingInfo &
info)
368 timpi_assert_equal_to(is_invalid_entry, &
info == &incoming.back());
370 if (is_invalid_entry)
374 if (
info.request.test())
380 act_on_data(
info.src_pid, std::move(
info.data));
391 send_requests.remove_if
415 if (send_requests.empty() && !started_barrier)
417 started_barrier =
true;
423 if (incoming.size() == 1)
427 if (barrier_request.
test())
433 timpi_assert(!possibly_receive());
436 const_cast<Communicator &
>(comm).send_mode(old_send_mode);
444 template <
typename MapToContainers,
445 typename SendFunctor,
446 typename ReceiveFunctor,
447 typename ActionFunctor>
450 MapToContainers && data,
451 const SendFunctor & send_functor,
452 const ReceiveFunctor & receive_functor,
453 const ActionFunctor & act_on_data)
455 typedef typename std::remove_reference<MapToContainers>::type::value_type::second_type
459 timpi_parallel_only(comm);
478 std::vector<std::size_t> will_send_to(num_procs, 0);
479 for (
auto & datapair : data)
487 if (datapair.second.empty())
490 empty_target_pid = dest_pid;
495 will_send_to[dest_pid]++;
503 auto & will_receive_from = will_send_to;
507 n_receives += will_receive_from[proc_id];
514 std::list<Request> requests;
517 for (
auto & datapair : data)
520 auto & datum = datapair.second;
526 if (destid == comm.
rank())
528 act_on_data(destid, std::move(datum));
533 requests.emplace_back();
534 send_functor(destid, datum, requests.back(), tag);
547 proc_id = cast_int<processor_id_type>(stat.source());
549 container_type received_data;
550 receive_functor(proc_id, received_data, tag);
551 act_on_data(proc_id, std::move(received_data));
555 for (
auto & req : requests)
564 template <
typename MapToContainers,
565 typename SendReceiveFunctor,
566 typename ActionFunctor>
569 MapToContainers && data,
570 const SendReceiveFunctor & sendreceive_functor,
571 const ActionFunctor & act_on_data)
573 typedef typename std::remove_reference<MapToContainers>::type::value_type::second_type
577 timpi_parallel_only(comm);
583 unsigned int num_procs = comm.
size();
597 for (
auto & datapair : data)
599 const unsigned int dest_pid = datapair.first;
600 n_exchanges = std::max(n_exchanges, dest_pid/num_procs+1);
602 if (datapair.second.empty())
605 empty_target_pid = dest_pid;
611 comm.
max(n_exchanges);
622 cast_int<processor_id_type>((comm.
rank() + p) %
625 cast_int<processor_id_type>((comm.
rank() + num_procs - p) %
628 container_type empty_container;
629 auto data_it = data.find(procup + e*num_procs);
630 auto *
const data_to_send =
631 (data_it == data.end()) ?
632 &empty_container : &data_it->second;
634 container_type received_data;
635 sendreceive_functor(procup, *data_to_send,
636 procdown, received_data, tag);
640 if (!received_data.empty())
641 act_on_data(procdown, std::move(received_data));
655 template <
typename MapToContainers,
656 typename ActionFunctor,
659 MapToContainers && data,
661 const ActionFunctor & act_on_data)
663 typedef typename std::remove_reference<MapToContainers>::type::mapped_type container_type;
664 typedef typename container_type::value_type nonref_type;
665 typename std::remove_const<nonref_type>::type * output_type =
nullptr;
671 const container_type & datum,
677 auto possibly_receive_functor = [&context, &output_type, &comm](
unsigned int & current_src_proc,
678 container_type & current_incoming_data,
684 std::inserter(current_incoming_data, current_incoming_data.end()),
691 (comm, data, send_functor, possibly_receive_functor, act_on_data);
697 const container_type & datum,
703 auto receive_functor = [&context, &output_type, &comm](
unsigned int current_src_proc,
704 container_type & current_incoming_data,
712 std::inserter(current_incoming_data, current_incoming_data.end()),
713 output_type, req, stat, tag);
718 (comm, data, send_functor, receive_functor, act_on_data);
723 auto sendreceive_functor = [&context, &output_type, &comm]
725 const container_type & data_to_send,
727 container_type & received_data,
730 data_to_send.begin(),
731 data_to_send.end(), src_pid,
733 std::inserter(received_data,
734 received_data.end()),
735 output_type, tag, tag);
739 (comm, data, sendreceive_functor, act_on_data);
743 timpi_error_msg(
"Invalid sync_type setting " << comm.
sync_type());
749 template <
typename MapToVectors,
750 typename ActionFunctor,
751 typename std::enable_if< std::is_base_of<DataType, StandardType<
752 typename InnermostType<
typename std::remove_const<
753 typename std::remove_reference<MapToVectors>::type::mapped_type::value_type
754 >::type>::type>>::value,
int>::type>
756 MapToVectors && data,
757 const ActionFunctor & act_on_data)
759 typedef typename std::remove_reference<MapToVectors>::type::mapped_type container_type;
760 typedef typename container_type::value_type nonref_type;
761 typedef typename std::remove_const<nonref_type>::type nonconst_nonref_type;
773 const container_type & datum,
776 comm.
send(dest_pid, datum, type, send_request, tag);
779 auto possibly_receive_functor = [&type, &comm](
unsigned int & current_src_proc,
780 container_type & current_incoming_data,
784 current_src_proc, current_incoming_data, type, current_request, tag);
788 (comm, data, send_functor, possibly_receive_functor, act_on_data);
793 #ifdef TIMPI_HAVE_MPI // We should never hit these functors in serial 795 const container_type & datum,
798 comm.
send(dest_pid, datum, type, send_request, tag);
801 auto receive_functor = [&type, &comm](
unsigned int current_src_proc,
802 container_type & current_incoming_data,
804 comm.
receive(current_src_proc, current_incoming_data, type, tag);
808 const container_type &,
814 auto receive_functor = [](
unsigned int,
822 (comm, data, send_functor, receive_functor, act_on_data);
828 const container_type & data_to_send,
830 container_type & received_data,
833 src_pid, received_data, tag, tag);
837 (comm, data, sendreceive_functor, act_on_data);
841 timpi_error_msg(
"Invalid sync_type setting " << comm.
sync_type());
847 template <
typename MapToVectors,
848 typename ActionFunctor,
849 typename std::enable_if<Has_buffer_type<Packing<
850 typename InnermostType<
typename std::remove_const<
851 typename std::remove_reference<MapToVectors>::type::mapped_type::value_type
852 >::type>::type>>::value,
int>::type>
854 MapToVectors && data,
855 const ActionFunctor & act_on_data)
857 void * context =
nullptr;
862 template <
typename datum,
863 typename MapToVectors,
864 typename GatherFunctor,
865 typename ActionFunctor>
867 const MapToVectors & queries,
868 GatherFunctor & gather_data,
869 ActionFunctor & act_on_data,
872 typedef typename MapToVectors::mapped_type query_type;
874 std::multimap<processor_id_type, std::vector<datum> >
879 for (
auto p : queries)
880 max_pid = std::max(max_pid, p.first);
885 max_pid > comm.
size())
886 timpi_not_implemented();
889 auto gather_functor =
890 [&gather_data, &response_data]
894 response_data.emplace(pid, std::vector<datum>());
895 gather_data(pid, query, new_data_it->second);
896 timpi_assert_equal_to(query.size(), new_data_it->second.size());
901 std::map<processor_id_type, unsigned int> responses_acted_on;
905 auto action_functor =
906 [&act_on_data, &queries, &responses_acted_on,
915 const unsigned int nth_query = responses_acted_on[pid]++;
917 auto q_pid_its = queries.equal_range(pid);
918 auto query_it = q_pid_its.first;
923 while (query_it == q_pid_its.second)
926 q_pid_its = queries.equal_range(pid);
927 timpi_assert_less_equal(pid, max_pid);
928 query_it = q_pid_its.first;
931 for (
unsigned int i=0; i != nth_query; ++i)
934 if (query_it == q_pid_its.second)
939 q_pid_its = queries.equal_range(pid);
940 timpi_assert_less_equal(pid, max_pid);
941 }
while (q_pid_its.first == q_pid_its.second);
942 query_it = q_pid_its.first;
946 act_on_data(pid, query_it->second, data);
955 template <
typename datum,
957 typename MapToVectors,
958 typename GatherFunctor,
959 typename ActionFunctor>
961 const MapToVectors & queries,
962 GatherFunctor & gather_data,
963 ActionFunctor & act_on_data,
964 const std::vector<datum,A> *)
966 typedef typename MapToVectors::mapped_type query_type;
969 std::vector<std::vector<std::vector<datum,A>>> response_data;
970 std::vector<Request> response_requests;
976 auto gather_functor =
977 [&comm, &gather_data, &act_on_data,
978 &response_data, &response_requests, &tag]
981 std::vector<std::vector<datum,A>> response;
982 gather_data(pid, query, response);
983 timpi_assert_equal_to(query.size(),
987 if (pid == comm.
rank())
989 act_on_data(pid, query, response);
994 comm.
send(pid, response, sendreq, tag);
995 response_requests.push_back(sendreq);
996 response_data.push_back(std::move(response));
1010 std::vector<Request> receive_requests;
1011 std::vector<processor_id_type> receive_procids;
1012 for (std::size_t i = 0,
1013 n_queries = queries.size() - queries.count(comm.
rank());
1014 i != n_queries; ++i)
1018 proc_id = cast_int<processor_id_type>(stat.source());
1020 std::vector<std::vector<datum,A>> received_data;
1021 comm.
receive(proc_id, received_data, tag);
1023 timpi_assert(queries.count(proc_id));
1024 auto & querydata = queries.at(proc_id);
1025 timpi_assert_equal_to(querydata.size(), received_data.size());
1026 act_on_data(proc_id, querydata, received_data);
1029 wait(response_requests);
1034 #endif // TIMPI_PARALLEL_SYNC_H void pull_parallel_vector_data(const Communicator &comm, const MapToVectors &queries, GatherFunctor &gather_data, const ActionFunctor &act_on_data, const datum *example)
Send query vectors, receive and answer them with vectors of data, then act on those answers...
MPI_Request request
Request object for non-blocking I/O.
void send_mode(const SendMode sm)
Explicitly sets the SendMode type used for send operations.
void nonblocking_send_packed_range(const unsigned int dest_processor_id, const Context *context, Iter range_begin, const Iter range_end, Request &req, const MessageTag &tag=no_tag) const
Similar to the above Nonblocking send_packed_range with a few important differences: ...
Status packed_range_probe(const unsigned int src_processor_id, const MessageTag &tag, bool &flag) const
Non-Blocking message probe for a packed range message.
void sync_type(const SyncType st)
Explicitly sets the SyncType used for sync operations.
MPI_Info info
Info object used by some MPI-3 methods.
MessageTag get_unique_tag(int tagvalue=MessageTag::invalid_tag) const
Get a tag that is unique to this Communicator.
void alltoall(std::vector< T, A > &r) const
Effectively transposes the input vector across all processors.
void push_parallel_packed_range(const Communicator &comm, MapToVectors &&data, Context *context, const ActionFunctor &act_on_data)
Send and receive and act on vectors of data.
void push_parallel_roundrobin_helper(const Communicator &comm, MapToContainers &&data, const SendReceiveFunctor &sendreceive_functor, const ActionFunctor &act_on_data)
processor_id_type rank() const
Encapsulates the MPI_Datatype.
Templated class to provide the appropriate MPI datatype for use with built-in C types or simple C++ c...
Define data types and (un)serialization functions for use when encoding a potentially-variable-size o...
void nonblocking_receive_packed_range(const unsigned int src_processor_id, Context *context, OutputIter out, const T *output_type, Request &req, Status &stat, const MessageTag &tag=any_tag) const
Non-Blocking-receive range-of-pointers from one processor.
const unsigned int any_source
Processor id meaning "Accept from any source".
bool possibly_receive(unsigned int &src_processor_id, std::vector< T, A > &buf, Request &req, const MessageTag &tag) const
Nonblocking-receive from one processor with user-defined type.
StandardType< T > build_standard_type(const T *example=nullptr)
Encapsulates the MPI_Comm object.
processor_id_type size() const
void push_parallel_vector_data(const Communicator &comm, MapToVectors &&data, const ActionFunctor &act_on_data)
Send and receive and act on vectors of data.
uint8_t processor_id_type
Status receive(const unsigned int dest_processor_id, T &buf, const MessageTag &tag=any_tag) const
Blocking-receive from one processor with data-defined type.
void empty_send_assertion(const Communicator &comm, processor_id_type empty_target_pid)
Encapsulates the MPI tag integers.
void push_parallel_alltoall_helper(const Communicator &comm, MapToContainers &&data, const SendFunctor &send_functor, const ReceiveFunctor &receive_functor, const ActionFunctor &act_on_data)
status probe(const unsigned int src_processor_id, const MessageTag &tag=any_tag) const
Blocking message probe.
void push_parallel_nbx_helper(const Communicator &comm, MapToContainers &&data, const SendFunctor &send_functor, const PossiblyReceiveFunctor &possibly_receive_functor, const ActionFunctor &act_on_data)
void send_receive(const unsigned int dest_processor_id, const T1 &send_data, const unsigned int source_processor_id, T2 &recv_data, const MessageTag &send_tag=no_tag, const MessageTag &recv_tag=any_tag) const
Send data send to one processor while simultaneously receiving other data recv from a (potentially di...
bool possibly_receive_packed_range(unsigned int &src_processor_id, Context *context, OutputIter out, const T *output_type, Request &req, const MessageTag &tag) const
Nonblocking packed range receive from one processor with user-defined type.
Encapsulates the MPI_Request.
void max(const T &r, T &o, Request &req) const
Non-blocking maximum of the local value r into o with the request req.
void send(const unsigned int dest_processor_id, const T &buf, const MessageTag &tag=no_tag) const
Blocking-send to one processor with data-defined type.
Status wait(Request &r)
Wait for a non-blocking send or receive to finish.
void send_receive_packed_range(const unsigned int dest_processor_id, const Context1 *context1, RangeIter send_begin, const RangeIter send_end, const unsigned int source_processor_id, Context2 *context2, OutputIter out, const T *output_type, const MessageTag &send_tag=no_tag, const MessageTag &recv_tag=any_tag, std::size_t approx_buffer_size=1000000) const
Send a range-of-pointers to one processor while simultaneously receiving another range from a (potent...
Encapsulates the MPI_Status struct.
void nonblocking_barrier(Request &req) const
Start a barrier that doesn't block.