TIMPI
dispatch_to_packed_unit.C
Go to the documentation of this file.
#include <timpi/timpi.h>
// timpi.h doesn't pull in parallel_sync
#include <timpi/parallel_sync.h>

#include <algorithm>
#include <array>
#include <cmath>   // std::sqrt
#include <cstddef> // std::size_t
#include <cstdlib> // std::abs(int)
#include <iterator>
#include <list>
#include <map>
#include <numeric> // std::iota
#include <set>
#include <string>
#include <tuple>
#include <unordered_map>
#include <unordered_set>
#include <utility> // pair
#include <vector>
#include <unistd.h>
17 
// Test assertion: invokes timpi_error() when expr evaluates to false.
// The do/while(0) wrapper turns the expansion into a single statement, so
// the macro can be used in an unbraced if/else or loop body without its
// internal `if` capturing a following `else`.
#define TIMPI_UNIT_ASSERT(expr) \
  do { \
    if (!(expr)) \
      timpi_error(); \
  } while (0)
21 
22 namespace TIMPI
23 {
24 template <typename T>
25 class StandardType<std::set<T>> : public NotADataType
26 {
27 public:
28  StandardType(const std::set<T> *)
29  {
30  }
31 };
32 }
33 
34 using namespace TIMPI;
35 
37 
38 
// Returns a Container built from the consecutive values 0, 1, ..., size-1
// (converted to the container's value_type).
template <typename Container>
Container createContainer(std::size_t size)
{
  using value_type = typename Container::value_type;

  // Stage the sequence in a vector first, so any container that offers an
  // iterator-range constructor can be filled the same way.
  std::vector<value_type> sequence(size);
  std::iota(sequence.begin(), sequence.end(), 0);

  Container filled(sequence.begin(), sequence.end());
  return filled;
}
46 
47 
48 template <typename T>
49 std::set<T> createSet(std::size_t size)
50 { return createContainer<std::set<T>>(size); }
51 
52 
// Returns an associative Container with `size` entries, where entry i maps
// key i*10 to value i*50.
template <typename Container>
Container createMapContainer(std::size_t size)
{
  Container result;

  for (std::size_t index = 0; index < size; ++index)
    {
      const std::size_t key = index * 10;
      const std::size_t mapped = index * 50;
      result.insert(std::make_pair(key, mapped));
    }

  return result;
}
63 
64 
65 
66  template <typename Container>
68  {
69  std::vector<Container> vals;
70  const unsigned int my_rank = TestCommWorld->rank();
71 
72  auto my_val = createContainer<Container>(my_rank + 1);
73 
74  TestCommWorld->allgather(my_val, vals);
75 
76  const std::size_t comm_size = TestCommWorld->size();
77  const std::size_t vec_size = vals.size();
78  TIMPI_UNIT_ASSERT(comm_size == vec_size);
79 
80  for (std::size_t i = 0; i < vec_size; ++i)
81  {
82  const auto & current_container = vals[i];
83  TIMPI_UNIT_ASSERT(current_container.size() == i+1);
84  for (std::size_t n = 0; n != i+1; ++n)
85  {
86  auto it = std::find(current_container.begin(),
87  current_container.end(), n);
88  TIMPI_UNIT_ASSERT(it != current_container.end());
89  }
90  }
91  }
92 
93  template <typename Container>
95  {
96  std::vector<Container> vals;
97  const unsigned int my_rank = TestCommWorld->rank();
98 
99  auto my_val = createMapContainer<Container>(my_rank + 1);
100 
101  TestCommWorld->allgather(my_val, vals);
102 
103  const std::size_t comm_size = TestCommWorld->size();
104  const std::size_t vec_size = vals.size();
105  TIMPI_UNIT_ASSERT(comm_size == vec_size);
106 
107  for (std::size_t i = 0; i < vec_size; ++i)
108  {
109  const auto & current_container = vals[i];
110  TIMPI_UNIT_ASSERT(current_container.size() == i+1);
111  for (std::size_t n = 0; n != i+1; ++n)
112  {
113  auto it = current_container.find(n*10);
114  TIMPI_UNIT_ASSERT(it != current_container.end());
115  TIMPI_UNIT_ASSERT(it->second == n*50);
116  }
117  }
118  }
119 
121  {
122  std::set<std::vector<std::tuple<int,int>>> data;
123  const int N = TestCommWorld->size();
124 
125  auto set_inserter = [&data](int i)
126  {
127  std::vector<std::tuple<int,int>> datum(1);
128  std::get<0>(datum[0]) = i;
129  std::get<1>(datum[0]) = 2*i;
130  data.insert(datum);
131  };
132 
133  auto set_tester = [&data](int i)
134  {
135  std::vector<std::tuple<int,int>> datum(1);
136  std::get<0>(datum[0]) = i;
137  std::get<1>(datum[0]) = 2*i;
138  TIMPI_UNIT_ASSERT(data.count(datum) == std::size_t(1));
139  };
140 
141  set_inserter(TestCommWorld->rank());
142  set_inserter(2*N);
143  set_inserter(3*N + TestCommWorld->rank());
144 
145  TestCommWorld->set_union(data);
146 
147  TIMPI_UNIT_ASSERT( data.size() == std::size_t(2*N+1) );
148  set_tester(2*N);
149  for (int p=0; p<N; ++p)
150  {
151  set_tester(p);
152  set_tester(3*N+p);
153  }
154  }
155 
157  {
158  std::vector<std::set<unsigned int>> vals(1);
159  const unsigned int my_rank = TestCommWorld->rank();
160  vals[0] = createSet<unsigned int>(my_rank + 1);
161 
162  TestCommWorld->allgather(vals);
163 
164  const std::size_t comm_size = TestCommWorld->size();
165  const std::size_t vec_size = vals.size();
166  TIMPI_UNIT_ASSERT(comm_size == vec_size);
167 
168  for (std::size_t i = 0; i < vec_size; ++i)
169  {
170  const auto & current_set = vals[i];
171  TIMPI_UNIT_ASSERT(current_set.size() == i+1);
172  unsigned int value = 0;
173  for (auto number : current_set)
174  TIMPI_UNIT_ASSERT(number == value++);
175  }
176  }
177 
179  {
180  std::vector<std::array<std::set<unsigned int>, 2>> vals;
181  const unsigned int my_rank = TestCommWorld->rank();
182 
183  std::array<std::set<unsigned int>, 2> vals_out
184  {{createSet<unsigned int>(my_rank + 1),
185  createSet<unsigned int>(my_rank + 10)}};
186 
187  TestCommWorld->allgather(vals_out, vals);
188 
189  const std::size_t comm_size = TestCommWorld->size();
190  const std::size_t vec_size = vals.size();
191  TIMPI_UNIT_ASSERT(comm_size == vec_size);
192 
193  for (std::size_t i = 0; i < vec_size; ++i)
194  {
195  const auto & first_set = vals[i][0];
196  TIMPI_UNIT_ASSERT(first_set.size() == i+1);
197  unsigned int value = 0;
198  for (auto number : first_set)
199  TIMPI_UNIT_ASSERT(number == value++);
200  const auto & second_set = vals[i][1];
201  TIMPI_UNIT_ASSERT(second_set.size() == i+10);
202  value = 0;
203  for (auto number : second_set)
204  TIMPI_UNIT_ASSERT(number == value++);
205  }
206  }
207 
209  {
210  std::vector<std::tuple<std::set<unsigned int>, unsigned int, unsigned int>> vals;
211  const unsigned int my_rank = TestCommWorld->rank();
212 
213  TestCommWorld->allgather(std::make_tuple(
214  createSet<unsigned int>(my_rank + 1),
215  my_rank, 2*my_rank), vals);
216 
217  const std::size_t comm_size = TestCommWorld->size();
218  const std::size_t vec_size = vals.size();
219  TIMPI_UNIT_ASSERT(comm_size == vec_size);
220 
221  for (std::size_t i = 0; i < vec_size; ++i)
222  {
223  const auto & current_set = std::get<0>(vals[i]);
224  unsigned int value = 0;
225  for (auto number : current_set)
226  TIMPI_UNIT_ASSERT(number == value++);
227  TIMPI_UNIT_ASSERT(std::get<1>(vals[i]) == i);
228  TIMPI_UNIT_ASSERT(std::get<2>(vals[i]) == 2*i);
229  }
230  }
231 
233  {
234  std::vector<std::pair<std::set<unsigned int>, unsigned int>> vals;
235  const unsigned int my_rank = TestCommWorld->rank();
236 
237  TestCommWorld->allgather(std::make_pair(
238  createSet<unsigned int>(my_rank + 1),
239  my_rank), vals);
240 
241  const std::size_t comm_size = TestCommWorld->size();
242  const std::size_t vec_size = vals.size();
243  TIMPI_UNIT_ASSERT(comm_size == vec_size);
244 
245  for (std::size_t i = 0; i < vec_size; ++i)
246  {
247  const auto & current_set = vals[i].first;
248  unsigned int value = 0;
249  for (auto number : current_set)
250  TIMPI_UNIT_ASSERT(number == value++);
251  TIMPI_UNIT_ASSERT(vals[i].second == i);
252  }
253  }
254 
256  {
257  std::set<unsigned int> val;
258  const unsigned int my_rank = TestCommWorld->rank();
259 
260  if (my_rank == 0)
261  val.insert(0);
262 
263  TestCommWorld->broadcast(val);
264 
265  TIMPI_UNIT_ASSERT(val.size() == 1);
266  TIMPI_UNIT_ASSERT(*val.begin() == 0);
267  }
268 
270  {
271  std::vector<std::set<unsigned int>> vals;
272  const unsigned int my_rank = TestCommWorld->rank();
273  const std::size_t comm_size = TestCommWorld->size();
274 
275  if (my_rank == 0)
276  {
277  vals.resize(comm_size + 1);
278  unsigned int counter = 1;
279  for (auto & val : vals)
280  {
281  for (unsigned int number = 0; number < counter; ++number)
282  val.insert(number);
283  ++counter;
284  }
285  }
286  TestCommWorld->broadcast(vals);
287 
288  const std::size_t vec_size = vals.size();
289  TIMPI_UNIT_ASSERT((comm_size + 1) == vec_size);
290 
291  std::size_t counter = 1;
292  for (const auto & current_set : vals)
293  {
294  TIMPI_UNIT_ASSERT(current_set.size() == counter);
295  unsigned int number = 0;
296  for (auto elem : current_set)
297  TIMPI_UNIT_ASSERT(elem == number++);
298  ++counter;
299  }
300  }
301 
302  // Data to send/recieve with each processor rank. For this test, processor p
303  // will send to destination d a set with d+1 elements numbered from 0 to d, in
304  // a vector with sqrt(c)+1 copies, iff c := |p-d| is a square number.
305  void fill_data
306  (std::map<processor_id_type, std::vector<std::set<unsigned int>>> & data,
307  int M)
308  {
309  const int rank = TestCommWorld->rank();
310  for (int d=0; d != M; ++d)
311  {
312  const int diffsize = std::abs(d-rank);
313  const int diffsqrt = std::sqrt(diffsize);
314  if (diffsqrt*diffsqrt == diffsize)
315  for (int i=-1; i != diffsqrt; ++i)
316  data[d].push_back(createSet<unsigned int>(d+1));
317  }
318  }
319 
320  void fill_data
321  (std::map<processor_id_type, std::vector<std::set<std::string>>> & data,
322  int M)
323  {
324  auto stringy_number = [] (int number)
325  {
326  std::string digit_strings [10] = {"zero", "one", "two",
327  "three", "four", "five", "six", "seven", "eight", "nine"};
328 
329  std::string returnval = "done";
330  while (number)
331  {
332  returnval = digit_strings[number%10]+" "+returnval;
333  number = number/10;
334  };
335 
336  return returnval;
337  };
338 
339  const int rank = TestCommWorld->rank();
340  for (int d=0; d != M; ++d)
341  {
342  const int diffsize = std::abs(d-rank);
343  const int diffsqrt = std::sqrt(diffsize);
344  if (diffsqrt*diffsqrt == diffsize)
345  for (int i=-1; i != diffsqrt; ++i)
346  {
347  data[d].push_back(std::set<std::string>());
348  for (int j=0; j!=d+1; ++j)
349  data[d].back().insert(stringy_number(j));
350  }
351  }
352  }
353 
354  void testPush()
355  {
356  const int size = TestCommWorld->size(),
357  rank = TestCommWorld->rank();
358 
359  std::map<processor_id_type, std::vector<std::set<unsigned int>>> data, received_data;
360 
361  fill_data(data, size);
362 
363  auto collect_data =
364  [&received_data]
365  (processor_id_type pid,
366  const typename std::vector<std::set<unsigned int>> & vecset_received)
367  {
368  auto & vec = received_data[pid];
369  vec.insert(vec.end(), vecset_received.begin(), vecset_received.end());
370  };
371 
372  push_parallel_vector_data(*TestCommWorld, data, collect_data);
373 
374  // We only need to check ourselves to see what we were sent
375  int p = rank;
376  for (int srcp=0; srcp != size; ++srcp)
377  {
378  auto map_it = received_data.find(srcp);
379 
380  const int diffsize = std::abs(srcp-p);
381  const int diffsqrt = std::sqrt(diffsize);
382  if (diffsqrt*diffsqrt != diffsize)
383  {
384  // We shouldn't have been sent anything from srcp!
385  TIMPI_UNIT_ASSERT(map_it == received_data.end() || map_it->second.empty());
386  continue;
387  }
388 
389  TIMPI_UNIT_ASSERT(map_it != received_data.end());
390  const std::vector<std::set<unsigned int>> & datum = map_it->second;
391  TIMPI_UNIT_ASSERT(datum.size() == static_cast<std::size_t>(diffsqrt+1));
392 
393  for (const auto & set : datum)
394  {
395  TIMPI_UNIT_ASSERT(set.size() == static_cast<std::size_t>((p+1)));
396 
397  unsigned int comparator = 0;
398  for (const auto element : set)
399  TIMPI_UNIT_ASSERT(element == comparator++);
400  }
401  }
402  }
403 
404  void testPull()
405  {
406  const int size = TestCommWorld->size();
407 
408  std::map<processor_id_type, std::vector<std::set<unsigned int>> > data, received_data;
409 
410  fill_data(data, size);
411 
412  auto compose_replies =
413  []
414  (processor_id_type /* pid */,
415  const std::vector<std::set<unsigned int>> & query,
416  std::vector<std::set<unsigned int>> & response)
417  {
418  const std::size_t query_size = query.size();
419  response.resize(query_size);
420  for (unsigned int i=0; i != query_size; ++i)
421  {
422  const auto & query_set = query[i];
423  for (const unsigned int elem : query_set)
424  response[i].insert(elem*elem);
425  }
426  };
427 
428 
429  auto collect_replies =
430  [&received_data]
431  (processor_id_type pid,
432  const std::vector<std::set<unsigned int>> & query,
433  const std::vector<std::set<unsigned int>> & response)
434  {
435  const std::size_t query_size = query.size();
436  TIMPI_UNIT_ASSERT(query_size == response.size());
437  for (unsigned int i=0; i != query_size; ++i)
438  {
439  TIMPI_UNIT_ASSERT(query[i].size() == response[i].size());
440 
441  auto query_set_it = query[i].begin(), response_set_it = response[i].begin();
442 
443  for (; query_set_it != query[i].end(); ++query_set_it, ++response_set_it)
444  {
445  const auto query_elem = *query_set_it, response_elem = *response_set_it;
446  TIMPI_UNIT_ASSERT(query_elem * query_elem == response_elem);
447  }
448  }
449  received_data[pid] = response;
450  };
451 
452  // Do the pull
453  std::set<unsigned int> * ex = nullptr;
455  (*TestCommWorld, data, compose_replies, collect_replies, ex);
456 
457  // Test the received results, for each query we sent.
458  for (int p=0; p != size; ++p)
459  {
460  TIMPI_UNIT_ASSERT(data[p].size() == received_data[p].size());
461  for (std::size_t i = 0; i != data[p].size(); ++i)
462  {
463  TIMPI_UNIT_ASSERT(data[p][i].size() == received_data[p][i].size());
464 
465  auto data_set_it = data[p][i].begin(), received_set_it = received_data[p][i].begin();
466 
467  for (; data_set_it != data[p][i].end(); ++data_set_it, ++received_set_it)
468  {
469  const auto data_elem = *data_set_it, received_elem = *received_set_it;
470  TIMPI_UNIT_ASSERT(data_elem * data_elem == received_elem);
471  }
472  }
473  }
474  }
475 
477  {
478  const int size = TestCommWorld->size();
479 
480  std::map<processor_id_type, std::vector<std::set<std::string>> > data, received_data;
481 
482  fill_data(data, size);
483 
484  auto compose_replies =
485  []
486  (processor_id_type /* pid */,
487  const std::vector<std::set<std::string>> & query,
488  std::vector<std::set<std::string>> & response)
489  {
490  const std::size_t query_size = query.size();
491  response.resize(query_size);
492  for (unsigned int i=0; i != query_size; ++i)
493  {
494  const auto & query_set = query[i];
495  for (const std::string & elem : query_set)
496  response[i].insert(elem+elem);
497  }
498  };
499 
500 
501  auto collect_replies =
502  [&received_data]
503  (processor_id_type pid,
504  const std::vector<std::set<std::string>> & query,
505  const std::vector<std::set<std::string>> & response)
506  {
507  const std::size_t query_size = query.size();
508  TIMPI_UNIT_ASSERT(query_size == response.size());
509  for (unsigned int i=0; i != query_size; ++i)
510  {
511  TIMPI_UNIT_ASSERT(query[i].size() == response[i].size());
512 
513  auto query_set_it = query[i].begin(), response_set_it = response[i].begin();
514 
515  for (; query_set_it != query[i].end(); ++query_set_it, ++response_set_it)
516  {
517  const auto query_elem = *query_set_it, response_elem = *response_set_it;
518  TIMPI_UNIT_ASSERT(query_elem + query_elem == response_elem);
519  }
520  }
521  received_data[pid] = response;
522  };
523 
524  // Do the pull
525  std::set<std::string> * ex = nullptr;
527  (*TestCommWorld, data, compose_replies, collect_replies, ex);
528 
529  // Test the received results, for each query we sent.
530  for (int p=0; p != size; ++p)
531  {
532  TIMPI_UNIT_ASSERT(data[p].size() == received_data[p].size());
533  for (std::size_t i = 0; i != data[p].size(); ++i)
534  {
535  TIMPI_UNIT_ASSERT(data[p][i].size() == received_data[p][i].size());
536 
537  auto data_set_it = data[p][i].begin(), received_set_it = received_data[p][i].begin();
538 
539  for (; data_set_it != data[p][i].end(); ++data_set_it, ++received_set_it)
540  {
541  const auto data_elem = *data_set_it, received_elem = *received_set_it;
542  TIMPI_UNIT_ASSERT(data_elem + data_elem == received_elem);
543  }
544  }
545  }
546  }
547 
548 int main(int argc, const char * const * argv)
549 {
550  TIMPI::TIMPIInit init(argc, argv);
551  TestCommWorld = &init.comm();
552 
553  testContainerAllGather<std::list<unsigned int>>();
554  testContainerAllGather<std::set<unsigned int>>();
555  testContainerAllGather<std::unordered_set<unsigned int>>();
556  testContainerAllGather<std::multiset<unsigned int>>();
557  testContainerAllGather<std::unordered_multiset<unsigned int>>();
558  testMapContainerAllGather<std::map<unsigned int, unsigned int>>();
559  testMapContainerAllGather<std::unordered_map<unsigned int, unsigned int>>();
560  testMapContainerAllGather<std::multimap<unsigned int, unsigned int>>();
561  testMapContainerAllGather<std::unordered_multimap<unsigned int, unsigned int>>();
569 
570  testPush();
571  testPull();
572  testPullPacked();
573 
575  testPush();
576  testPull();
577  testPullPacked();
578 
580  testPush();
581  testPull();
582  testPullPacked();
583 
584  return 0;
585 }
void testPullPacked()
void pull_parallel_vector_data(const Communicator &comm, const MapToVectors &queries, GatherFunctor &gather_data, const ActionFunctor &act_on_data, const datum *example)
Send query vectors, receive and answer them with vectors of data, then act on those answers...
void allgather(const T &send_data, std::vector< T, A > &recv_data) const
Take a vector of length this->size(), and fill in recv[processor_id] = the value of send on that proc...
void fill_data(std::map< processor_id_type, std::vector< std::set< unsigned int >>> &data, int M)
Container createMapContainer(std::size_t size)
Communicator * TestCommWorld
void sync_type(const SyncType st)
Explicitly sets the SyncType used for sync operations.
Definition: communicator.h:343
void testMapContainerAllGather()
void testContainerBroadcast()
The TIMPIInit class, when constructed, initializes any dependent libraries (e.g.
Definition: timpi_init.h:57
processor_id_type rank() const
Definition: communicator.h:208
void testPush()
Templated class to provide the appropriate MPI datatype for use with built-in C types or simple C++ c...
Definition: standard_type.h:83
void testPackedSetUnion()
std::set< T > createSet(std::size_t size)
Encapsulates the MPI_Comm object.
Definition: communicator.h:108
processor_id_type size() const
Definition: communicator.h:211
void testArrayContainerAllGather()
void push_parallel_vector_data(const Communicator &comm, MapToVectors &&data, const ActionFunctor &act_on_data)
Send and receive and act on vectors of data.
std::string stringy_number(int number)
uint8_t processor_id_type
Definition: communicator.h:54
void testPairContainerAllGather()
int main(int argc, const char *const *argv)
void testVectorOfContainersBroadcast()
void broadcast(T &data, const unsigned int root_id=0, const bool identical_sizes=false) const
Take a local value and broadcast it to all processors.
StandardType<T>'s which do not define a way to MPI_Type T should inherit from this class...
Definition: data_type.h:120
void testTupleContainerAllGather()
void testVectorOfContainersAllGather()
void testContainerAllGather()
void testPull()
Container createContainer(std::size_t size)
const Communicator & comm() const
Returns the Communicator created by this object, which will be a compatibility shim if MPI is not ena...
Definition: timpi_init.h:100
void set_union(T &data, const unsigned int root_id) const
Take a container (set, map, unordered_set, multimap, etc) of local variables on each processor...