libMesh
parallel_sync_test.C
Go to the documentation of this file.
1 
2 // libMesh includes
3 #include <libmesh/int_range.h>
4 #include <libmesh/simple_range.h>
5 
6 // Using a *shim* here to test backwards compatibility
7 #include <libmesh/parallel_sync.h>
8 
9 #include <algorithm>
10 
11 #include "test_comm.h"
12 #include "libmesh_cppunit.h"
13 
14 
15 using namespace libMesh;
16 
17 class ParallelSyncTest : public CppUnit::TestCase {
18 public:
19  CPPUNIT_TEST_SUITE( ParallelSyncTest );
20 
21  // Our sync functions are most typically used with a map of
22  // processor ids that *only* includes ranks currently running.
23  CPPUNIT_TEST( testPush );
24  CPPUNIT_TEST( testPull );
25  CPPUNIT_TEST( testPushVecVec );
26  CPPUNIT_TEST( testPullVecVec );
27  CPPUNIT_TEST( testPushMultimap );
28  CPPUNIT_TEST( testPushMultimapVecVec );
29 
30  // Our sync functions need to support sending to ranks that don't
31  // exist! If we're on N processors but working on a mesh
32  // partitioned into M parts with M > N, then subpartition p belongs
33  // to processor p%N. Let's make M > N for these tests.
34  CPPUNIT_TEST( testPushOversized );
35  CPPUNIT_TEST( testPullOversized );
36  CPPUNIT_TEST( testPushVecVecOversized );
37  CPPUNIT_TEST( testPullVecVecOversized );
38  CPPUNIT_TEST( testPushMultimapOversized );
39  CPPUNIT_TEST( testPushMultimapVecVecOversized );
40 
41  CPPUNIT_TEST_SUITE_END();
42 
43 public:
44  void setUp()
45  {}
46 
47  void tearDown()
48  {}
49 
50 
51  // Data to send/recieve with each processor rank. For this test,
52  // processor p will send to destination d the integer d, in a vector
53  // with sqrt(c)+1 copies, iff c := |p-d| is a square number.
54  void fill_scalar_data
55  (std::map<processor_id_type, std::vector<unsigned int>> & data,
56  int M)
57  {
58  const int rank = TestCommWorld->rank();
59  for (int d=0; d != M; ++d)
60  {
61  int diffsize = std::abs(d-rank);
62  int diffsqrt = std::sqrt(diffsize);
63  if (diffsqrt*diffsqrt == diffsize)
64  for (int i=-1; i != diffsqrt; ++i)
65  data[d].push_back(d);
66  }
67  }
68 
69 
70  // Multimap data to send/recieve with each processor rank. For this
71  // test, processor p will send to destination d the integer d, in a
72  // vector with sqrt(c)+1 copies followed by a vector with 1 copy,
73  // iff c := |p-d| is a square number.
74  void fill_scalar_data
75  (std::multimap<processor_id_type, std::vector<unsigned int>> & data,
76  int M)
77  {
78  const int rank = TestCommWorld->rank();
79  for (int d=0; d != M; ++d)
80  {
81  int diffsize = std::abs(d-rank);
82  int diffsqrt = std::sqrt(diffsize);
83  if (diffsqrt*diffsqrt == diffsize)
84  {
85  std::vector<unsigned int> v;
86  for (int i=-1; i != diffsqrt; ++i)
87  v.push_back(d);
88  data.emplace(d, v);
89  v.resize(1, d);
90  data.emplace(d, v);
91  }
92  }
93  }
94 
95 
96  // Data to send/recieve with each processor rank. For this test,
97  // processor p will send to destination d the integer d, in two
98  // subvectors with sqrt(c) and 1 copies, iff c := |p-d| is a square
99  // number.
100  void fill_vector_data
101  (std::map<processor_id_type, std::vector<std::vector<unsigned int>>> & data,
102  int M)
103  {
104  const int rank = TestCommWorld->rank();
105  for (int d=0; d != M; ++d)
106  {
107  int diffsize = std::abs(d-rank);
108  int diffsqrt = std::sqrt(diffsize);
109  if (diffsqrt*diffsqrt == diffsize)
110  {
111  data[d].resize(2);
112  for (int i=-1; i != diffsqrt; ++i)
113  data[d][0].push_back(d);
114  data[d][1].push_back(d);
115  }
116  }
117  }
118 
119 
120 
121  // Multimap data to send/recieve with each processor rank. For this
122  // test, processor p will send to destination d the integer d, in
123  // two subvectors with sqrt(c) and 1 copies, followed by a vector
124  // with 1 copy, iff c := |p-d| is a square number.
125  void fill_vector_data
126  (std::multimap<processor_id_type, std::vector<std::vector<unsigned int>>> & data,
127  int M)
128  {
129  const int rank = TestCommWorld->rank();
130  for (int d=0; d != M; ++d)
131  {
132  int diffsize = std::abs(d-rank);
133  int diffsqrt = std::sqrt(diffsize);
134  if (diffsqrt*diffsqrt == diffsize)
135  {
136  std::vector<std::vector<unsigned int>> vv(2);
137  for (int i=-1; i != diffsqrt; ++i)
138  vv[0].push_back(d);
139  vv[1].push_back(d);
140  data.emplace(d, vv);
141  vv.resize(1);
142  vv[0].resize(1);
143  data.emplace(d, vv);
144  }
145  }
146  }
147 
148 
149  void testPushImpl(int M)
150  {
151  const int size = TestCommWorld->size(),
152  rank = TestCommWorld->rank();
153 
154  std::map<processor_id_type, std::vector<unsigned int> > data, received_data;
155 
156  fill_scalar_data(data, M);
157 
158  auto collect_data =
159  [&received_data]
160  (processor_id_type pid,
161  const typename std::vector<unsigned int> & data)
162  {
163  auto & vec = received_data[pid];
164  vec.insert(vec.end(), data.begin(), data.end());
165  };
166 
167  Parallel::push_parallel_vector_data(*TestCommWorld, data, collect_data);
168 
169  // Test the received results, for each processor id p we're in
170  // charge of.
171  std::vector<std::size_t> checked_sizes(size, 0);
172  for (int p=rank; p != M; p += size)
173  for (int srcp=0; srcp != size; ++srcp)
174  {
175  int diffsize = std::abs(srcp-p);
176  int diffsqrt = std::sqrt(diffsize);
177  if (diffsqrt*diffsqrt != diffsize)
178  {
179  if (received_data.count(srcp))
180  {
181  const std::vector<unsigned int> & datum = received_data[srcp];
182  CPPUNIT_ASSERT_EQUAL(std::count(datum.begin(), datum.end(), p), std::ptrdiff_t(0));
183  }
184  continue;
185  }
186 
187  CPPUNIT_ASSERT_EQUAL(received_data.count(srcp), std::size_t(1));
188  const std::vector<unsigned int> & datum = received_data[srcp];
189  CPPUNIT_ASSERT_EQUAL(std::count(datum.begin(), datum.end(), p), std::ptrdiff_t(diffsqrt+1));
190  checked_sizes[srcp] += diffsqrt+1;
191  }
192 
193  for (int srcp=0; srcp != size; ++srcp)
194  CPPUNIT_ASSERT_EQUAL(checked_sizes[srcp], received_data[srcp].size());
195  }
196 
197 
198  void testPush()
199  {
200  testPushImpl(TestCommWorld->size());
201  }
202 
203 
205  {
206  testPushImpl((TestCommWorld->size() + 4) * 2);
207  }
208 
209 
210  void testPullImpl(int M)
211  {
212  std::map<processor_id_type, std::vector<unsigned int> > data, received_data;
213 
214  fill_scalar_data(data, M);
215 
216  auto compose_replies =
217  []
218  (processor_id_type pid,
219  const std::vector<unsigned int> & query,
220  std::vector<unsigned int> & response)
221  {
222  const std::size_t query_size = query.size();
223  response.resize(query_size);
224  for (unsigned int i=0; i != query_size; ++i)
225  response[i] = query[i]*query[i];
226  };
227 
228 
229  auto collect_replies =
230  [&received_data]
231  (processor_id_type pid,
232  const std::vector<unsigned int> & query,
233  const std::vector<unsigned int> & response)
234  {
235  const std::size_t query_size = query.size();
236  CPPUNIT_ASSERT_EQUAL(query_size, response.size());
237  for (unsigned int i=0; i != query_size; ++i)
238  {
239  CPPUNIT_ASSERT_EQUAL(query[i]*query[i], response[i]);
240  }
241  received_data[pid] = response;
242  };
243 
244  // Do the pull
245  unsigned int * ex = nullptr;
246  Parallel::pull_parallel_vector_data
247  (*TestCommWorld, data, compose_replies, collect_replies, ex);
248 
249  // Test the received results, for each query we sent.
250  for (int p=0; p != M; ++p)
251  {
252  CPPUNIT_ASSERT_EQUAL(data[p].size(), received_data[p].size());
253  for (auto i : index_range(data[p]))
254  CPPUNIT_ASSERT_EQUAL(data[p][i]*data[p][i], received_data[p][i]);
255  }
256  }
257 
258 
259  void testPull()
260  {
261  testPullImpl(TestCommWorld->size());
262  }
263 
264 
266  {
267  testPullImpl((TestCommWorld->size() + 4) * 2);
268  }
269 
270 
271  void testPushVecVecImpl(int M)
272  {
273  const int size = TestCommWorld->size(),
274  rank = TestCommWorld->rank();
275 
276  std::map<processor_id_type, std::vector<std::vector<unsigned int>>> data;
277  std::map<processor_id_type, std::vector<unsigned int>> received_data;
278 
279  fill_vector_data(data, M);
280 
281  auto collect_data =
282  [&received_data]
283  (processor_id_type pid,
284  const typename std::vector<std::vector<unsigned int>> & data)
285  {
286  auto & vec = received_data[pid];
287  vec.insert(vec.end(), data[0].begin(), data[0].end());
288  CPPUNIT_ASSERT_EQUAL(data.size(), std::size_t(2));
289  CPPUNIT_ASSERT_EQUAL(data[1].size(), std::size_t(1));
290  CPPUNIT_ASSERT_EQUAL(data[0][0], data[1][0]);
291  };
292 
293  Parallel::push_parallel_vector_data(*TestCommWorld, data, collect_data);
294 
295  // Test the received results, for each processor id p we're in
296  // charge of.
297  std::vector<std::size_t> checked_sizes(size, 0);
298  for (int p=rank; p != M; p += size)
299  for (int srcp=0; srcp != size; ++srcp)
300  {
301  int diffsize = std::abs(srcp-p);
302  int diffsqrt = std::sqrt(diffsize);
303  if (diffsqrt*diffsqrt != diffsize)
304  {
305  if (received_data.count(srcp))
306  {
307  const std::vector<unsigned int> & datum = received_data[srcp];
308  CPPUNIT_ASSERT_EQUAL(std::count(datum.begin(), datum.end(), p), std::ptrdiff_t(0));
309  }
310  continue;
311  }
312 
313  CPPUNIT_ASSERT_EQUAL(received_data.count(srcp), std::size_t(1));
314  const std::vector<unsigned int> & datum = received_data[srcp];
315  CPPUNIT_ASSERT_EQUAL(std::count(datum.begin(), datum.end(), p), std::ptrdiff_t(diffsqrt+1));
316  checked_sizes[srcp] += diffsqrt+1;
317  }
318 
319  for (int srcp=0; srcp != size; ++srcp)
320  CPPUNIT_ASSERT_EQUAL(checked_sizes[srcp], received_data[srcp].size());
321  }
322 
323 
325  {
326  testPushVecVecImpl(TestCommWorld->size());
327  }
328 
329 
331  {
332  testPushVecVecImpl((TestCommWorld->size() + 4) * 2);
333  }
334 
335 
336  void testPullVecVecImpl(int M)
337  {
338  std::map<processor_id_type, std::vector<std::vector<unsigned int>>> data;
339  std::map<processor_id_type, std::vector<std::vector<unsigned int>>> received_data;
340 
341  fill_vector_data(data, M);
342 
343  auto compose_replies =
344  []
345  (processor_id_type pid,
346  const std::vector<std::vector<unsigned int>> & query,
347  std::vector<std::vector<unsigned int>> & response)
348  {
349  const std::size_t query_size = query.size();
350  response.resize(query_size);
351  for (unsigned int i=0; i != query_size; ++i)
352  {
353  const std::size_t query_i_size = query[i].size();
354  response[i].resize(query_i_size);
355  for (unsigned int j=0; j != query_i_size; ++j)
356  response[i][j] = query[i][j]*query[i][j];
357  }
358  };
359 
360 
361  auto collect_replies =
362  [&received_data]
363  (processor_id_type pid,
364  const std::vector<std::vector<unsigned int>> & query,
365  const std::vector<std::vector<unsigned int>> & response)
366  {
367  const std::size_t query_size = query.size();
368  CPPUNIT_ASSERT_EQUAL(query_size, response.size());
369  for (unsigned int i=0; i != query_size; ++i)
370  {
371  const std::size_t query_i_size = query[i].size();
372  CPPUNIT_ASSERT_EQUAL(query_i_size, response[i].size());
373  for (unsigned int j=0; j != query_i_size; ++j)
374  CPPUNIT_ASSERT_EQUAL(query[i][j]*query[i][j], response[i][j]);
375  }
376  auto & vec = received_data[pid];
377  vec.emplace_back(response[0].begin(), response[0].end());
378  CPPUNIT_ASSERT_EQUAL(response[1].size(), std::size_t(1));
379  CPPUNIT_ASSERT_EQUAL(response[1][0], response[0][0]);
380  vec.emplace_back(response[1].begin(), response[1].end());
381  };
382 
383  // Do the pull
384  std::vector<unsigned int> * ex = nullptr;
385  Parallel::pull_parallel_vector_data
386  (*TestCommWorld, data, compose_replies, collect_replies, ex);
387 
388  // Test the received results, for each query we sent.
389  for (int p=0; p != M; ++p)
390  {
391  CPPUNIT_ASSERT_EQUAL(data[p].size(), received_data[p].size());
392  for (auto i : index_range(data[p]))
393  for (auto j : index_range(data[p][i]))
394  CPPUNIT_ASSERT_EQUAL(data[p][i][j]*data[p][i][j], received_data[p][i][j]);
395  }
396  }
397 
398 
400  {
401  testPullVecVecImpl(TestCommWorld->size());
402  }
403 
404 
406  {
407  testPushVecVecImpl((TestCommWorld->size() + 4) * 2);
408  }
409 
410 
412  {
413  const int size = TestCommWorld->size(),
414  rank = TestCommWorld->rank();
415 
416  // This is going to make sense because of C++11's guarantees
417  // regarding preservation of insert ordering in multimaps,
418  // combined with MPI's guarantees about non-overtaking
419  std::multimap<processor_id_type, std::vector<unsigned int> > data, received_data;
420 
421  fill_scalar_data(data, M);
422 
423  auto collect_data =
424  [&received_data]
425  (processor_id_type pid,
426  const typename std::vector<unsigned int> & data)
427  {
428  received_data.emplace(pid, data);
429  };
430 
431  Parallel::push_parallel_vector_data(*TestCommWorld, data, collect_data);
432 
433  // Test the received results, for each processor id p we're in
434  // charge of.
435  std::vector<std::size_t> checked_sizes(size, 0);
436  for (int p=rank; p != M; p += size)
437  for (int srcp=0; srcp != size; ++srcp)
438  {
439  int diffsize = std::abs(srcp-p);
440  int diffsqrt = std::sqrt(diffsize);
441  auto rng = received_data.equal_range(srcp);
442  if (diffsqrt*diffsqrt != diffsize)
443  {
444  for (auto & pv_it : as_range(rng))
445  {
446  CPPUNIT_ASSERT_EQUAL(std::count(pv_it.second.begin(), pv_it.second.end(), p), std::ptrdiff_t(0));
447  }
448  continue;
449  }
450 
451  CPPUNIT_ASSERT(rng.first != rng.second);
452  for (auto pv_it = rng.first; pv_it != rng.second; ++pv_it)
453  {
454  std::ptrdiff_t cnt = std::count(pv_it->second.begin(), pv_it->second.end(), p);
455  if (cnt)
456  {
457  CPPUNIT_ASSERT_EQUAL(cnt, std::ptrdiff_t(diffsqrt+1));
458  auto pv_it2 = pv_it; ++pv_it2;
459  CPPUNIT_ASSERT(pv_it2 != rng.second);
460  std::ptrdiff_t cnt2 = std::count(pv_it2->second.begin(), pv_it2->second.end(), p);
461  CPPUNIT_ASSERT_EQUAL(cnt2, std::ptrdiff_t(1));
462  checked_sizes[srcp] += cnt + cnt2;
463  break;
464  }
465  }
466  }
467 
468  for (int srcp=0; srcp != size; ++srcp)
469  {
470  std::size_t total_size = 0;
471  for (auto & pv_it : as_range(received_data.equal_range(srcp)))
472  total_size += pv_it.second.size();
473  CPPUNIT_ASSERT_EQUAL(checked_sizes[srcp], total_size);
474  }
475  }
476 
477 
479  {
480  testPushMultimapImpl(TestCommWorld->size());
481  }
482 
483 
485  {
486  testPushMultimapImpl((TestCommWorld->size() + 4) * 2);
487  }
488 
489 
491  {
492  const int size = TestCommWorld->size(),
493  rank = TestCommWorld->rank();
494 
495  // This is going to make sense because of C++11's guarantees
496  // regarding preservation of insert ordering in multimaps,
497  // combined with MPI's guarantees about non-overtaking
498  std::multimap<processor_id_type, std::vector<std::vector<unsigned int>>> data, received_data;
499 
500  fill_vector_data(data, M);
501 
502  auto collect_data =
503  [&received_data]
504  (processor_id_type pid,
505  const typename std::vector<std::vector<unsigned int>> & data)
506  {
507  received_data.emplace(pid, data);
508  };
509 
510  Parallel::push_parallel_vector_data(*TestCommWorld, data, collect_data);
511 
512  // Test the received results, for each processor id p we're in
513  // charge of.
514  std::vector<std::size_t> checked_sizes(size, 0);
515  for (int p=rank; p != M; p += size)
516  for (int srcp=0; srcp != size; ++srcp)
517  {
518  int diffsize = std::abs(srcp-p);
519  int diffsqrt = std::sqrt(diffsize);
520  auto rng = received_data.equal_range(srcp);
521  if (diffsqrt*diffsqrt != diffsize)
522  {
523  for (auto & pvv : as_range(rng))
524  {
525  for (auto & v : pvv.second)
526  CPPUNIT_ASSERT_EQUAL(std::count(v.begin(), v.end(), p), std::ptrdiff_t(0));
527  }
528  continue;
529  }
530 
531  CPPUNIT_ASSERT(rng.first != rng.second);
532  for (auto pvv_it = rng.first; pvv_it != rng.second; ++pvv_it)
533  {
534  if(pvv_it->second.size() != std::size_t(2))
535  libmesh_error();
536  CPPUNIT_ASSERT_EQUAL(pvv_it->second.size(), std::size_t(2));
537  std::ptrdiff_t cnt = std::count(pvv_it->second[0].begin(), pvv_it->second[0].end(), p);
538  if (cnt)
539  {
540  CPPUNIT_ASSERT_EQUAL(cnt, std::ptrdiff_t(diffsqrt+1));
541  std::ptrdiff_t cnt2 = std::count(pvv_it->second[1].begin(), pvv_it->second[1].end(), p);
542  CPPUNIT_ASSERT_EQUAL(cnt2, std::ptrdiff_t(1));
543  auto pvv_it2 = pvv_it; ++pvv_it2;
544  CPPUNIT_ASSERT(pvv_it2 != rng.second);
545  CPPUNIT_ASSERT_EQUAL(pvv_it2->second.size(), std::size_t(1));
546  std::ptrdiff_t cnt3 = std::count(pvv_it2->second[0].begin(), pvv_it2->second[0].end(), p);
547  CPPUNIT_ASSERT_EQUAL(cnt3, std::ptrdiff_t(1));
548  checked_sizes[srcp] += cnt + cnt2 + cnt3;
549  break;
550  }
551  ++pvv_it;
552  libmesh_assert(pvv_it != rng.second);
553  }
554  }
555 
556  for (int srcp=0; srcp != size; ++srcp)
557  {
558  std::size_t total_size = 0;
559  for (auto & pvv : as_range(received_data.equal_range(srcp)))
560  for (auto & v : pvv.second)
561  total_size += v.size();
562  CPPUNIT_ASSERT_EQUAL(checked_sizes[srcp], total_size);
563  }
564  }
565 
566 
568  {
569  testPushMultimapVecVecImpl(TestCommWorld->size());
570  }
571 
572 
574  {
575  testPushMultimapVecVecImpl((TestCommWorld->size() + 4) * 2);
576  }
577 
578 };
579 
ParallelSyncTest::testPullVecVec
void testPullVecVec()
Definition: parallel_sync_test.C:399
ParallelSyncTest::testPushVecVecOversized
void testPushVecVecOversized()
Definition: parallel_sync_test.C:330
CPPUNIT_TEST_SUITE_REGISTRATION
CPPUNIT_TEST_SUITE_REGISTRATION(ParallelSyncTest)
ParallelSyncTest::testPushMultimapVecVecImpl
void testPushMultimapVecVecImpl(int M)
Definition: parallel_sync_test.C:490
ParallelSyncTest::testPullVecVecOversized
void testPullVecVecOversized()
Definition: parallel_sync_test.C:405
libMesh::index_range
IntRange< std::size_t > index_range(const std::vector< T > &vec)
Helper function that returns an IntRange<std::size_t> representing all the indices of the passed-in v...
Definition: int_range.h:106
libMesh
The libMesh namespace provides an interface to certain functionality in the library.
Definition: factoryfunction.C:55
end
IterBase * end
Also have a polymorphic pointer to the end object, this prevents iterating past the end.
Definition: variant_filter_iterator.h:343
std::sqrt
MetaPhysicL::DualNumber< T, D > sqrt(const MetaPhysicL::DualNumber< T, D > &in)
ParallelSyncTest::testPushVecVecImpl
void testPushVecVecImpl(int M)
Definition: parallel_sync_test.C:271
ParallelSyncTest::testPullOversized
void testPullOversized()
Definition: parallel_sync_test.C:265
libMesh::libmesh_assert
libmesh_assert(ctx)
ParallelSyncTest::testPushOversized
void testPushOversized()
Definition: parallel_sync_test.C:204
std::abs
MetaPhysicL::DualNumber< T, D > abs(const MetaPhysicL::DualNumber< T, D > &in)
ParallelSyncTest::testPushMultimapVecVec
void testPushMultimapVecVec()
Definition: parallel_sync_test.C:567
ParallelSyncTest::setUp
void setUp()
Definition: parallel_sync_test.C:44
ParallelSyncTest
Definition: parallel_sync_test.C:17
ParallelSyncTest::testPush
void testPush()
Definition: parallel_sync_test.C:198
TestCommWorld
libMesh::Parallel::Communicator * TestCommWorld
Definition: driver.C:111
ParallelSyncTest::testPullVecVecImpl
void testPullVecVecImpl(int M)
Definition: parallel_sync_test.C:336
libMesh::processor_id_type
uint8_t processor_id_type
Definition: id_types.h:104
ParallelSyncTest::testPushMultimap
void testPushMultimap()
Definition: parallel_sync_test.C:478
query
query_obj query
Definition: mesh_communication.C:1426
ParallelSyncTest::testPull
void testPull()
Definition: parallel_sync_test.C:259
libMesh::as_range
SimpleRange< IndexType > as_range(const std::pair< IndexType, IndexType > &p)
Helper function that allows us to treat a homogenous pair as a range.
Definition: simple_range.h:57
ParallelSyncTest::testPushImpl
void testPushImpl(int M)
Definition: parallel_sync_test.C:149
ParallelSyncTest::testPushMultimapVecVecOversized
void testPushMultimapVecVecOversized()
Definition: parallel_sync_test.C:573
ParallelSyncTest::testPullImpl
void testPullImpl(int M)
Definition: parallel_sync_test.C:210
libmesh_cppunit.h
ParallelSyncTest::testPushMultimapImpl
void testPushMultimapImpl(int M)
Definition: parallel_sync_test.C:411
data
IterBase * data
Ideally this private member data should have protected access.
Definition: variant_filter_iterator.h:337
ParallelSyncTest::testPushVecVec
void testPushVecVec()
Definition: parallel_sync_test.C:324
test_comm.h
ParallelSyncTest::testPushMultimapOversized
void testPushMultimapOversized()
Definition: parallel_sync_test.C:484
ParallelSyncTest::tearDown
void tearDown()
Definition: parallel_sync_test.C:47