libMesh
parallel_sync_test.C
Go to the documentation of this file.
1 
2 // libMesh includes
3 #include <libmesh/int_range.h>
4 #include <libmesh/simple_range.h>
5 
6 // Using a *shim* here to test backwards compatibility
7 #include <libmesh/parallel_sync.h>
8 
9 #include <algorithm>
10 
11 #include "test_comm.h"
12 #include "libmesh_cppunit.h"
13 
14 
15 using namespace libMesh;
16 
17 class ParallelSyncTest : public CppUnit::TestCase {
18 public:
19  LIBMESH_CPPUNIT_TEST_SUITE( ParallelSyncTest );
20 
21  // Our sync functions are most typically used with a map of
22  // processor ids that *only* includes ranks currently running.
23  CPPUNIT_TEST( testPush );
24  CPPUNIT_TEST( testPull );
25  CPPUNIT_TEST( testPushVecVec );
26  CPPUNIT_TEST( testPullVecVec );
27 
28  // Our sync functions need to support sending to ranks that don't
29  // exist! If we're on N processors but working on a mesh
30  // partitioned into M parts with M > N, then subpartition p belongs
31  // to processor p%N. Let's make M > N for these tests.
32  CPPUNIT_TEST( testPushOversized );
33  CPPUNIT_TEST( testPushVecVecOversized );
34 
35  CPPUNIT_TEST_SUITE_END();
36 
37 public:
38  void setUp()
39  {}
40 
41  void tearDown()
42  {}
43 
44 
45  // Data to send/receive with each processor rank. For this test,
46  // processor p will send to destination d the integer d, in a vector
47  // with sqrt(c)+1 copies, iff c := |p-d| is a square number.
48  void fill_scalar_data
49  (std::map<processor_id_type, std::vector<unsigned int>> & data,
50  int M)
51  {
52  const int rank = TestCommWorld->rank();
53  for (int d=0; d != M; ++d)
54  {
55  int diffsize = std::abs(d-rank);
56  int diffsqrt = std::sqrt(diffsize);
57  if (diffsqrt*diffsqrt == diffsize)
58  for (int i=-1; i != diffsqrt; ++i)
59  data[d].push_back(d);
60  }
61  }
62 
63 
64  // Multimap data to send/receive with each processor rank. For this
65  // test, processor p will send to destination d the integer d, in a
66  // vector with sqrt(c)+1 copies followed by a vector with 1 copy,
67  // iff c := |p-d| is a square number.
68  void fill_scalar_data
69  (std::multimap<processor_id_type, std::vector<unsigned int>> & data,
70  int M)
71  {
72  const int rank = TestCommWorld->rank();
73  for (int d=0; d != M; ++d)
74  {
75  int diffsize = std::abs(d-rank);
76  int diffsqrt = std::sqrt(diffsize);
77  if (diffsqrt*diffsqrt == diffsize)
78  {
79  std::vector<unsigned int> v;
80  for (int i=-1; i != diffsqrt; ++i)
81  v.push_back(d);
82  data.emplace(d, v);
83  v.resize(1, d);
84  data.emplace(d, v);
85  }
86  }
87  }
88 
89 
90  // Data to send/receive with each processor rank. For this test,
91  // processor p will send to destination d the integer d, in two
92  // subvectors with sqrt(c)+1 and 1 copies, iff c := |p-d| is a square
93  // number.
94  void fill_vector_data
95  (std::map<processor_id_type, std::vector<std::vector<unsigned int>>> & data,
96  int M)
97  {
98  const int rank = TestCommWorld->rank();
99  for (int d=0; d != M; ++d)
100  {
101  int diffsize = std::abs(d-rank);
102  int diffsqrt = std::sqrt(diffsize);
103  if (diffsqrt*diffsqrt == diffsize)
104  {
105  data[d].resize(2);
106  for (int i=-1; i != diffsqrt; ++i)
107  data[d][0].push_back(d);
108  data[d][1].push_back(d);
109  }
110  }
111  }
112 
113 
114 
115  // Multimap data to send/receive with each processor rank. For this
116  // test, processor p will send to destination d the integer d, in
117  // two subvectors with sqrt(c)+1 and 1 copies, followed by a single
118  // subvector with 1 copy, iff c := |p-d| is a square number.
119  void fill_vector_data
120  (std::multimap<processor_id_type, std::vector<std::vector<unsigned int>>> & data,
121  int M)
122  {
123  const int rank = TestCommWorld->rank();
124  for (int d=0; d != M; ++d)
125  {
126  int diffsize = std::abs(d-rank);
127  int diffsqrt = std::sqrt(diffsize);
128  if (diffsqrt*diffsqrt == diffsize)
129  {
130  std::vector<std::vector<unsigned int>> vv(2);
131  for (int i=-1; i != diffsqrt; ++i)
132  vv[0].push_back(d);
133  vv[1].push_back(d);
134  data.emplace(d, vv);
135  vv.resize(1);
136  vv[0].resize(1);
137  data.emplace(d, vv);
138  }
139  }
140  }
141 
142 
143  void testPushImpl(int M)
144  {
145  const int size = TestCommWorld->size(),
146  rank = TestCommWorld->rank();
147 
148  std::map<processor_id_type, std::vector<unsigned int> > data, received_data;
149 
150  fill_scalar_data(data, M);
151 
152  auto collect_data =
153  [&received_data]
154  (processor_id_type pid,
155  const typename std::vector<unsigned int> & data)
156  {
157  auto & vec = received_data[pid];
158  vec.insert(vec.end(), data.begin(), data.end());
159  };
160 
161  Parallel::push_parallel_vector_data(*TestCommWorld, data, collect_data);
162 
163  // Test the received results, for each processor id p we're in
164  // charge of.
165  std::vector<std::size_t> checked_sizes(size, 0);
166  for (int p=rank; p < M; p += size)
167  for (int srcp=0; srcp != size; ++srcp)
168  {
169  int diffsize = std::abs(srcp-p);
170  int diffsqrt = std::sqrt(diffsize);
171  if (diffsqrt*diffsqrt != diffsize)
172  {
173  if (received_data.count(srcp))
174  {
175  const std::vector<unsigned int> & datum = received_data[srcp];
176  CPPUNIT_ASSERT_EQUAL(std::count(datum.begin(), datum.end(), p), std::ptrdiff_t(0));
177  }
178  continue;
179  }
180 
181  CPPUNIT_ASSERT_EQUAL(received_data.count(srcp), std::size_t(1));
182  const std::vector<unsigned int> & datum = received_data[srcp];
183  CPPUNIT_ASSERT_EQUAL(std::count(datum.begin(), datum.end(), p), std::ptrdiff_t(diffsqrt+1));
184  checked_sizes[srcp] += diffsqrt+1;
185  }
186 
187  for (int srcp=0; srcp != size; ++srcp)
188  CPPUNIT_ASSERT_EQUAL(checked_sizes[srcp], received_data[srcp].size());
189  }
190 
191 
192  void testPush()
193  {
194  LOG_UNIT_TEST;
195 
197  }
198 
199 
201  {
202  LOG_UNIT_TEST;
203 
204  testPushImpl((TestCommWorld->size() + 4) * 2);
205  }
206 
207 
208  void testPullImpl(int M)
209  {
210  std::map<processor_id_type, std::vector<unsigned int> > data, received_data;
211 
212  fill_scalar_data(data, M);
213 
214  auto compose_replies =
215  []
217  const std::vector<unsigned int> & query,
218  std::vector<unsigned int> & response)
219  {
220  const std::size_t query_size = query.size();
221  response.resize(query_size);
222  for (unsigned int i=0; i != query_size; ++i)
223  response[i] = query[i]*query[i];
224  };
225 
226 
227  auto collect_replies =
228  [&received_data]
229  (processor_id_type pid,
230  const std::vector<unsigned int> & query,
231  const std::vector<unsigned int> & response)
232  {
233  const std::size_t query_size = query.size();
234  CPPUNIT_ASSERT_EQUAL(query_size, response.size());
235  for (unsigned int i=0; i != query_size; ++i)
236  {
237  CPPUNIT_ASSERT_EQUAL(query[i]*query[i], response[i]);
238  }
239  received_data[pid] = response;
240  };
241 
242  // Do the pull
243  unsigned int * ex = nullptr;
244  Parallel::pull_parallel_vector_data
245  (*TestCommWorld, data, compose_replies, collect_replies, ex);
246 
247  // Test the received results, for each query we sent.
248  for (int p=0; p != M; ++p)
249  {
250  CPPUNIT_ASSERT_EQUAL(data[p].size(), received_data[p].size());
251  for (auto i : index_range(data[p]))
252  CPPUNIT_ASSERT_EQUAL(data[p][i]*data[p][i], received_data[p][i]);
253  }
254  }
255 
256 
257  void testPull()
258  {
259  LOG_UNIT_TEST;
260 
262  }
263 
264 
265  void testPushVecVecImpl(int M)
266  {
267  const int size = TestCommWorld->size(),
268  rank = TestCommWorld->rank();
269 
270  std::map<processor_id_type, std::vector<std::vector<unsigned int>>> data;
271  std::map<processor_id_type, std::vector<unsigned int>> received_data;
272 
273  fill_vector_data(data, M);
274 
275  auto collect_data =
276  [&received_data]
277  (processor_id_type pid,
278  const typename std::vector<std::vector<unsigned int>> & data)
279  {
280  auto & vec = received_data[pid];
281  vec.insert(vec.end(), data[0].begin(), data[0].end());
282  CPPUNIT_ASSERT_EQUAL(data.size(), std::size_t(2));
283  CPPUNIT_ASSERT_EQUAL(data[1].size(), std::size_t(1));
284  CPPUNIT_ASSERT_EQUAL(data[0][0], data[1][0]);
285  };
286 
287  Parallel::push_parallel_vector_data(*TestCommWorld, data, collect_data);
288 
289  // Test the received results, for each processor id p we're in
290  // charge of.
291  std::vector<std::size_t> checked_sizes(size, 0);
292  for (int p=rank; p < M; p += size)
293  for (int srcp=0; srcp != size; ++srcp)
294  {
295  int diffsize = std::abs(srcp-p);
296  int diffsqrt = std::sqrt(diffsize);
297  if (diffsqrt*diffsqrt != diffsize)
298  {
299  if (received_data.count(srcp))
300  {
301  const std::vector<unsigned int> & datum = received_data[srcp];
302  CPPUNIT_ASSERT_EQUAL(std::count(datum.begin(), datum.end(), p), std::ptrdiff_t(0));
303  }
304  continue;
305  }
306 
307  CPPUNIT_ASSERT_EQUAL(received_data.count(srcp), std::size_t(1));
308  const std::vector<unsigned int> & datum = received_data[srcp];
309  CPPUNIT_ASSERT_EQUAL(std::count(datum.begin(), datum.end(), p), std::ptrdiff_t(diffsqrt+1));
310  checked_sizes[srcp] += diffsqrt+1;
311  }
312 
313  for (int srcp=0; srcp != size; ++srcp)
314  CPPUNIT_ASSERT_EQUAL(checked_sizes[srcp], received_data[srcp].size());
315  }
316 
317 
319  {
320  LOG_UNIT_TEST;
321 
323  }
324 
325 
327  {
328  LOG_UNIT_TEST;
329 
330  testPushVecVecImpl((TestCommWorld->size() + 4) * 2);
331  }
332 
333 
334  void testPullVecVecImpl(int M)
335  {
336  std::map<processor_id_type, std::vector<std::vector<unsigned int>>> data;
337  std::map<processor_id_type, std::vector<std::vector<unsigned int>>> received_data;
338 
339  fill_vector_data(data, M);
340 
341  auto compose_replies =
342  []
344  const std::vector<std::vector<unsigned int>> & query,
345  std::vector<std::vector<unsigned int>> & response)
346  {
347  const std::size_t query_size = query.size();
348  response.resize(query_size);
349  for (unsigned int i=0; i != query_size; ++i)
350  {
351  const std::size_t query_i_size = query[i].size();
352  response[i].resize(query_i_size);
353  for (unsigned int j=0; j != query_i_size; ++j)
354  response[i][j] = query[i][j]*query[i][j];
355  }
356  };
357 
358 
359  auto collect_replies =
360  [&received_data]
361  (processor_id_type pid,
362  const std::vector<std::vector<unsigned int>> & query,
363  const std::vector<std::vector<unsigned int>> & response)
364  {
365  const std::size_t query_size = query.size();
366  CPPUNIT_ASSERT_EQUAL(query_size, response.size());
367  for (unsigned int i=0; i != query_size; ++i)
368  {
369  const std::size_t query_i_size = query[i].size();
370  CPPUNIT_ASSERT_EQUAL(query_i_size, response[i].size());
371  for (unsigned int j=0; j != query_i_size; ++j)
372  CPPUNIT_ASSERT_EQUAL(query[i][j]*query[i][j], response[i][j]);
373  }
374  auto & vec = received_data[pid];
375  vec.emplace_back(response[0].begin(), response[0].end());
376  CPPUNIT_ASSERT_EQUAL(response[1].size(), std::size_t(1));
377  CPPUNIT_ASSERT_EQUAL(response[1][0], response[0][0]);
378  vec.emplace_back(response[1].begin(), response[1].end());
379  };
380 
381  // Do the pull
382  std::vector<unsigned int> * ex = nullptr;
383  Parallel::pull_parallel_vector_data
384  (*TestCommWorld, data, compose_replies, collect_replies, ex);
385 
386  // Test the received results, for each query we sent.
387  for (int p=0; p != M; ++p)
388  {
389  CPPUNIT_ASSERT_EQUAL(data[p].size(), received_data[p].size());
390  for (auto i : index_range(data[p]))
391  for (auto j : index_range(data[p][i]))
392  CPPUNIT_ASSERT_EQUAL(data[p][i][j]*data[p][i][j], received_data[p][i][j]);
393  }
394  }
395 
396 
398  {
399  LOG_UNIT_TEST;
400 
402  }
403 };
404 
void testPullVecVec()
void testPullVecVecImpl(int M)
void testPushImpl(int M)
void testPullVecVecImpl(int M)
void testPullImpl(int M)
libMesh::Parallel::Communicator * TestCommWorld
Definition: driver.C:171
void testPushVecVecImpl(int M)
processor_id_type rank() const
void testPush()
The libMesh namespace provides an interface to certain functionality in the library.
uint8_t processor_id_type
Definition: id_types.h:104
processor_id_type size() const
uint8_t processor_id_type
void testPushOversized()
void testPushImpl(int M)
query_obj query
void testPushVecVecImpl(int M)
void testPull()
void fill_scalar_data(std::map< processor_id_type, std::vector< unsigned int >> &data, int M)
void testPullImpl(int M)
CPPUNIT_TEST_SUITE_REGISTRATION(ParallelSyncTest)
void fill_vector_data(std::map< processor_id_type, std::vector< std::vector< unsigned int >>> &data, int M)
void testPushVecVec()
void testPushVecVecOversized()
auto index_range(const T &sizable)
Helper function that returns an IntRange<std::size_t> representing all the indices of the passed-in v...
Definition: int_range.h:117