TIMPI
Functions | Variables
parallel_sync_unit.C File Reference

Go to the source code of this file.

Functions

void fill_scalar_data (std::map< processor_id_type, std::vector< unsigned int >> &data, int M)
 
void fill_scalar_data (std::multimap< processor_id_type, std::vector< unsigned int >> &data, int M)
 
void fill_vector_data (std::map< processor_id_type, std::vector< std::vector< unsigned int >>> &data, int M)
 
void fill_vector_data (std::multimap< processor_id_type, std::vector< std::vector< unsigned int >>> &data, int M)
 
void testPushImpl (int M)
 
void testPush ()
 
void testPushOversized ()
 
void testPushMove ()
 
void testPullImpl (int M)
 
void testPull ()
 
void testPullOversized ()
 
void testPushVecVecImpl (int M)
 
void testPushVecVec ()
 
void testPushVecVecOversized ()
 
void testPullVecVecImpl (int M)
 
void testPullVecVec ()
 
void testPullVecVecOversized ()
 
void testPushMultimapImpl (int M)
 
void testPushMultimap ()
 
void testPushMultimapOversized ()
 
void testPushMultimapVecVecImpl (int M)
 
void testPushMultimapVecVec ()
 
void testPushMultimapVecVecOversized ()
 
void testEmptyEntry ()
 
void testStringSyncType ()
 
void run_tests ()
 
int main (int argc, const char *const *argv)
 

Variables

Communicator * TestCommWorld
 

Function Documentation

◆ fill_scalar_data() [1/2]

void fill_scalar_data ( std::map< processor_id_type, std::vector< unsigned int >> &  data,
int  M 
)

Definition at line 19 of file parallel_sync_unit.C.

References TIMPI::Communicator::rank(), and TestCommWorld.

Referenced by testEmptyEntry(), testPullImpl(), testPushImpl(), testPushMove(), and testPushMultimapImpl().

21  {
22  const int rank = TestCommWorld->rank();
23  for (int d=0; d != M; ++d)
24  {
25  int diffsize = std::abs(d-rank);
26  int diffsqrt = std::sqrt(diffsize);
27  if (diffsqrt*diffsqrt == diffsize)
28  for (int i=-1; i != diffsqrt; ++i)
29  data[d].push_back(d);
30  }
31  }
processor_id_type rank() const
Definition: communicator.h:208
Communicator * TestCommWorld

◆ fill_scalar_data() [2/2]

void fill_scalar_data ( std::multimap< processor_id_type, std::vector< unsigned int >> &  data,
int  M 
)

Definition at line 39 of file parallel_sync_unit.C.

References TIMPI::Communicator::rank(), and TestCommWorld.

41  {
42  const int rank = TestCommWorld->rank();
43  for (int d=0; d != M; ++d)
44  {
45  int diffsize = std::abs(d-rank);
46  int diffsqrt = std::sqrt(diffsize);
47  if (diffsqrt*diffsqrt == diffsize)
48  {
49  std::vector<unsigned int> v;
50  for (int i=-1; i != diffsqrt; ++i)
51  v.push_back(d);
52  data.emplace(d, v);
53  v.resize(1, d);
54  data.emplace(d, v);
55  }
56  }
57  }
processor_id_type rank() const
Definition: communicator.h:208
Communicator * TestCommWorld

◆ fill_vector_data() [1/2]

void fill_vector_data ( std::map< processor_id_type, std::vector< std::vector< unsigned int >>> &  data,
int  M 
)

Definition at line 65 of file parallel_sync_unit.C.

References TIMPI::Communicator::rank(), and TestCommWorld.

Referenced by testPullVecVecImpl(), testPushMultimapVecVecImpl(), and testPushVecVecImpl().

67  {
68  const int rank = TestCommWorld->rank();
69  for (int d=0; d != M; ++d)
70  {
71  int diffsize = std::abs(d-rank);
72  int diffsqrt = std::sqrt(diffsize);
73  if (diffsqrt*diffsqrt == diffsize)
74  {
75  data[d].resize(2);
76  for (int i=-1; i != diffsqrt; ++i)
77  data[d][0].push_back(d);
78  data[d][1].push_back(d);
79  }
80  }
81  }
processor_id_type rank() const
Definition: communicator.h:208
Communicator * TestCommWorld

◆ fill_vector_data() [2/2]

void fill_vector_data ( std::multimap< processor_id_type, std::vector< std::vector< unsigned int >>> &  data,
int  M 
)

Definition at line 90 of file parallel_sync_unit.C.

References TIMPI::Communicator::rank(), and TestCommWorld.

92  {
93  const int rank = TestCommWorld->rank();
94  for (int d=0; d != M; ++d)
95  {
96  int diffsize = std::abs(d-rank);
97  int diffsqrt = std::sqrt(diffsize);
98  if (diffsqrt*diffsqrt == diffsize)
99  {
100  std::vector<std::vector<unsigned int>> vv(2);
101  for (int i=-1; i != diffsqrt; ++i)
102  vv[0].push_back(d);
103  vv[1].push_back(d);
104  data.emplace(d, vv);
105  vv.resize(1);
106  vv[0].resize(1);
107  data.emplace(d, vv);
108  }
109  }
110  }
processor_id_type rank() const
Definition: communicator.h:208
Communicator * TestCommWorld

◆ main()

int main ( int  argc,
const char *const *  argv 
)

Definition at line 741 of file parallel_sync_unit.C.

References TIMPI::Communicator::ALLTOALL_COUNTS, TIMPI::TIMPIInit::comm(), run_tests(), TIMPI::Communicator::SENDRECEIVE, TIMPI::Communicator::sync_type(), and TestCommWorld.

742 {
743  TIMPI::TIMPIInit init(argc, argv);
744  TestCommWorld = &init.comm();
745 
746  run_tests();
747 
748  TestCommWorld->sync_type(Communicator::ALLTOALL_COUNTS);
749  run_tests();
750 
751  TestCommWorld->sync_type(Communicator::SENDRECEIVE);
752  run_tests();
753 
754  return 0;
755 }
void sync_type(const SyncType st)
Explicitly sets the SyncType used for sync operations.
Definition: communicator.h:343
The TIMPIInit class, when constructed, initializes any dependent libraries (e.g. MPI).
Definition: timpi_init.h:57
void run_tests()
Communicator * TestCommWorld

◆ run_tests()

void run_tests ( )

Definition at line 715 of file parallel_sync_unit.C.

References testEmptyEntry(), testPull(), testPullOversized(), testPullVecVec(), testPullVecVecOversized(), testPush(), testPushMove(), testPushMultimap(), testPushMultimapOversized(), testPushMultimapVecVec(), testPushMultimapVecVecOversized(), testPushOversized(), testPushVecVec(), testPushVecVecOversized(), and testStringSyncType().

Referenced by main().

716 {
717  testPush();
718  testPushMove();
719  testPull();
720  testPushVecVec();
721  testPullVecVec();
722  testPushMultimap();
723  testPushMultimapVecVec();
724 
725  testEmptyEntry();
726 
727  // Our sync functions need to support sending to ranks that don't
728  // exist! If we're on N processors but working on a mesh
729  // partitioned into M parts with M > N, then subpartition p belongs
730  // to processor p%N. Let's make M > N for these tests.
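731  testPushOversized();
732  testPullOversized();
733  testPushVecVecOversized();
734  testPullVecVecOversized();
735  testPushMultimapOversized();
736  testPushMultimapVecVecOversized();
737 
738  testStringSyncType();
739 }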
void testPullVecVec()
void testPushMultimapVecVec()
void testPush()
void testPull()
void testPullOversized()
void testPullVecVecOversized()
void testPushMultimapVecVecOversized()
void testPushMultimapOversized()
void testPushOversized()
void testEmptyEntry()
void testPushVecVec()
void testPushMove()
void testPushVecVecOversized()
void testStringSyncType()
void testPushMultimap()

◆ testEmptyEntry()

void testEmptyEntry ( )

Definition at line 626 of file parallel_sync_unit.C.

References fill_scalar_data(), TIMPI::push_parallel_vector_data(), TIMPI::Communicator::rank(), TIMPI::Communicator::size(), and TestCommWorld.

Referenced by run_tests().

627  {
628  const int size = TestCommWorld->size(),
629  rank = TestCommWorld->rank();
630  const int M=size;
631 
632  std::map<processor_id_type, std::vector<unsigned int> > data, received_data;
633 
634  fill_scalar_data(data, M);
635 
636  // Give some processors empty sends
637  for (int i=0; i != M; ++i)
638  if (!(rank % 3))
639  data[i];
640 
641  auto collect_data =
642  [&received_data]
643  (processor_id_type pid,
644  const typename std::vector<unsigned int> & vec_received)
645  {
646  // Even if we send them we shouldn't get them
647  TIMPI_UNIT_ASSERT (!vec_received.empty())
648 
649  auto & vec = received_data[pid];
650  vec.insert(vec.end(), vec_received.begin(), vec_received.end());
651  };
652 
653 #ifndef NDEBUG
654  bool caught_exception = false;
655  try {
656 #endif
657  TIMPI::push_parallel_vector_data(*TestCommWorld, data, collect_data);
658 #ifndef NDEBUG
659  }
660  catch (std::logic_error & e) {
661  caught_exception = true;
662  std::regex msg_regex("empty data");
663  TIMPI_UNIT_ASSERT(std::regex_search(e.what(), msg_regex));
664  }
665 
666  // We didn't leave room for empty sends in the 2 processor case
667  if (M > 2)
668  TIMPI_UNIT_ASSERT(caught_exception);
669 #endif
670 
671  // Test the received results, for each processor id p we're in
672  // charge of.
673  std::vector<std::size_t> checked_sizes(size, 0);
674  for (int p=rank; p < M; p += size)
675  for (int srcp=0; srcp != size; ++srcp)
676  {
677  int diffsize = std::abs(srcp-p);
678  int diffsqrt = std::sqrt(diffsize);
679  if (diffsqrt*diffsqrt != diffsize)
680  {
681  if (received_data.count(srcp))
682  {
683  const std::vector<unsigned int> & datum = received_data[srcp];
684  TIMPI_UNIT_ASSERT(std::count(datum.begin(), datum.end(), p) == std::ptrdiff_t(0));
685  }
686  continue;
687  }
688 
689  TIMPI_UNIT_ASSERT(received_data.count(srcp) == std::size_t(1));
690  const std::vector<unsigned int> & datum = received_data[srcp];
691  TIMPI_UNIT_ASSERT(std::count(datum.begin(), datum.end(), p) == std::ptrdiff_t(diffsqrt+1));
692  checked_sizes[srcp] += diffsqrt+1;
693  }
694 
695  for (int srcp=0; srcp != size; ++srcp)
696  TIMPI_UNIT_ASSERT(checked_sizes[srcp] == received_data[srcp].size());
697  }
processor_id_type rank() const
Definition: communicator.h:208
processor_id_type size() const
Definition: communicator.h:211
void push_parallel_vector_data(const Communicator &comm, MapToVectors &&data, const ActionFunctor &act_on_data)
Send and receive and act on vectors of data.
uint8_t processor_id_type
Definition: communicator.h:54
Communicator * TestCommWorld
void fill_scalar_data(std::map< processor_id_type, std::vector< unsigned int >> &data, int M)

◆ testPull()

void testPull ( )

Definition at line 283 of file parallel_sync_unit.C.

References TIMPI::Communicator::size(), TestCommWorld, and testPullImpl().

Referenced by run_tests().

284  {
285  testPullImpl(TestCommWorld->size());
286  }
processor_id_type size() const
Definition: communicator.h:211
Communicator * TestCommWorld
void testPullImpl(int M)

◆ testPullImpl()

void testPullImpl ( int  M)

Definition at line 225 of file parallel_sync_unit.C.

References fill_scalar_data(), TIMPI::pull_parallel_vector_data(), TIMPI::Communicator::SENDRECEIVE, TIMPI::Communicator::size(), TIMPI::Communicator::sync_type(), and TestCommWorld.

Referenced by testPull(), and testPullOversized().

226  {
227  // Oversized pulls are well-defined with NBX and ALLTOALL_COUNTS
228  // because of C++11's guarantees regarding preservation of insert
229  // ordering in multimaps, combined with MPI's guarantees about
230  // non-overtaking ... but we do receives in a different order with
231  // SENDRECEIVE mode, so let's skip oversized test there.
232  if (TestCommWorld->sync_type() == Communicator::SENDRECEIVE &&
233  M > int(TestCommWorld->size()))
234  return;
235 
236  std::map<processor_id_type, std::vector<unsigned int> > data, received_data;
237 
238  fill_scalar_data(data, M);
239 
240  auto compose_replies =
241  []
242  (processor_id_type /* pid */,
243  const std::vector<unsigned int> & query,
244  std::vector<unsigned int> & response)
245  {
246  const std::size_t query_size = query.size();
247  response.resize(query_size);
248  for (unsigned int i=0; i != query_size; ++i)
249  response[i] = query[i]*query[i];
250  };
251 
252 
253  auto collect_replies =
254  [&received_data]
255  (processor_id_type pid,
256  const std::vector<unsigned int> & query,
257  const std::vector<unsigned int> & response)
258  {
259  const std::size_t query_size = query.size();
260  TIMPI_UNIT_ASSERT(query_size == response.size());
261  for (unsigned int i=0; i != query_size; ++i)
262  {
263  TIMPI_UNIT_ASSERT(query[i]*query[i] == response[i]);
264  }
265  received_data[pid] = response;
266  };
267 
268  // Do the pull
269  unsigned int * ex = nullptr;
270  TIMPI::pull_parallel_vector_data
271  (*TestCommWorld, data, compose_replies, collect_replies, ex);
272 
273  // Test the received results, for each query we sent.
274  for (int p=0; p != M; ++p)
275  {
276  TIMPI_UNIT_ASSERT(data[p].size() == received_data[p].size());
277  for (std::size_t i = 0; i != data[p].size(); ++i)
278  TIMPI_UNIT_ASSERT(data[p][i]*data[p][i] == received_data[p][i]);
279  }
280  }
void pull_parallel_vector_data(const Communicator &comm, const MapToVectors &queries, GatherFunctor &gather_data, const ActionFunctor &act_on_data, const datum *example)
Send query vectors, receive and answer them with vectors of data, then act on those answers...
void sync_type(const SyncType st)
Explicitly sets the SyncType used for sync operations.
Definition: communicator.h:343
processor_id_type size() const
Definition: communicator.h:211
uint8_t processor_id_type
Definition: communicator.h:54
Communicator * TestCommWorld
void fill_scalar_data(std::map< processor_id_type, std::vector< unsigned int >> &data, int M)

◆ testPullOversized()

void testPullOversized ( )

Definition at line 289 of file parallel_sync_unit.C.

References TIMPI::Communicator::size(), TestCommWorld, and testPullImpl().

Referenced by run_tests().

290  {
291  testPullImpl((TestCommWorld->size() + 4) * 2);
292  }
processor_id_type size() const
Definition: communicator.h:211
Communicator * TestCommWorld
void testPullImpl(int M)

◆ testPullVecVec()

void testPullVecVec ( )

Definition at line 432 of file parallel_sync_unit.C.

References TIMPI::Communicator::size(), TestCommWorld, and testPullVecVecImpl().

Referenced by run_tests().

433  {
434  testPullVecVecImpl(TestCommWorld->size());
435  }
void testPullVecVecImpl(int M)
processor_id_type size() const
Definition: communicator.h:211
Communicator * TestCommWorld

◆ testPullVecVecImpl()

void testPullVecVecImpl ( int  M)

Definition at line 360 of file parallel_sync_unit.C.

References fill_vector_data(), TIMPI::pull_parallel_vector_data(), TIMPI::Communicator::SENDRECEIVE, TIMPI::Communicator::size(), TIMPI::Communicator::sync_type(), and TestCommWorld.

Referenced by testPullVecVec().

361  {
362  // Oversized pulls are well-defined with NBX and ALLTOALL_COUNTS
363  // because of C++11's guarantees regarding preservation of insert
364  // ordering in multimaps, combined with MPI's guarantees about
365  // non-overtaking ... but we do receives in a different order with
366  // SENDRECEIVE mode, so let's skip oversized test there.
367  if (TestCommWorld->sync_type() == Communicator::SENDRECEIVE &&
368  M > int(TestCommWorld->size()))
369  return;
370 
371  std::map<processor_id_type, std::vector<std::vector<unsigned int>>> data;
372  std::map<processor_id_type, std::vector<std::vector<unsigned int>>> received_data;
373 
374  fill_vector_data(data, M);
375 
376  auto compose_replies =
377  []
378  (processor_id_type /* pid */,
379  const std::vector<std::vector<unsigned int>> & query,
380  std::vector<std::vector<unsigned int>> & response)
381  {
382  const std::size_t query_size = query.size();
383  response.resize(query_size);
384  for (unsigned int i=0; i != query_size; ++i)
385  {
386  const std::size_t query_i_size = query[i].size();
387  response[i].resize(query_i_size);
388  for (unsigned int j=0; j != query_i_size; ++j)
389  response[i][j] = query[i][j]*query[i][j];
390  }
391  };
392 
393 
394  auto collect_replies =
395  [&received_data]
396  (processor_id_type pid,
397  const std::vector<std::vector<unsigned int>> & query,
398  const std::vector<std::vector<unsigned int>> & response)
399  {
400  const std::size_t query_size = query.size();
401  TIMPI_UNIT_ASSERT(query_size == response.size());
402  for (unsigned int i=0; i != query_size; ++i)
403  {
404  const std::size_t query_i_size = query[i].size();
405  TIMPI_UNIT_ASSERT(query_i_size == response[i].size());
406  for (unsigned int j=0; j != query_i_size; ++j)
407  TIMPI_UNIT_ASSERT(query[i][j]*query[i][j] == response[i][j]);
408  }
409  auto & vec = received_data[pid];
410  vec.emplace_back(response[0].begin(), response[0].end());
411  TIMPI_UNIT_ASSERT(response[1].size() == std::size_t(1));
412  TIMPI_UNIT_ASSERT(response[1][0] == response[0][0]);
413  vec.emplace_back(response[1].begin(), response[1].end());
414  };
415 
416  // Do the pull
417  std::vector<unsigned int> * ex = nullptr;
418  TIMPI::pull_parallel_vector_data
419  (*TestCommWorld, data, compose_replies, collect_replies, ex);
420 
421  // Test the received results, for each query we sent.
422  for (int p=0; p != M; ++p)
423  {
424  TIMPI_UNIT_ASSERT(data[p].size() == received_data[p].size());
425  for (std::size_t i = 0; i != data[p].size(); ++i)
426  for (std::size_t j = 0; j != data[p][i].size(); ++j)
427  TIMPI_UNIT_ASSERT(data[p][i][j]*data[p][i][j] == received_data[p][i][j]);
428  }
429  }
void pull_parallel_vector_data(const Communicator &comm, const MapToVectors &queries, GatherFunctor &gather_data, const ActionFunctor &act_on_data, const datum *example)
Send query vectors, receive and answer them with vectors of data, then act on those answers...
void sync_type(const SyncType st)
Explicitly sets the SyncType used for sync operations.
Definition: communicator.h:343
processor_id_type size() const
Definition: communicator.h:211
uint8_t processor_id_type
Definition: communicator.h:54
Communicator * TestCommWorld
void fill_vector_data(std::map< processor_id_type, std::vector< std::vector< unsigned int >>> &data, int M)

◆ testPullVecVecOversized()

void testPullVecVecOversized ( )

Definition at line 438 of file parallel_sync_unit.C.

References TIMPI::Communicator::size(), TestCommWorld, and testPushVecVecImpl().

Referenced by run_tests().

439  {
440  testPushVecVecImpl((TestCommWorld->size() + 4) * 2);
441  }
void testPushVecVecImpl(int M)
processor_id_type size() const
Definition: communicator.h:211
Communicator * TestCommWorld

◆ testPush()

void testPush ( )

Definition at line 162 of file parallel_sync_unit.C.

References TIMPI::Communicator::size(), TestCommWorld, and testPushImpl().

Referenced by run_tests().

163  {
164  testPushImpl(TestCommWorld->size());
165  }
processor_id_type size() const
Definition: communicator.h:211
void testPushImpl(int M)
Communicator * TestCommWorld

◆ testPushImpl()

void testPushImpl ( int  M)

Definition at line 113 of file parallel_sync_unit.C.

References fill_scalar_data(), TIMPI::push_parallel_vector_data(), TIMPI::Communicator::rank(), TIMPI::Communicator::size(), and TestCommWorld.

Referenced by testPush(), and testPushOversized().

114  {
115  const int size = TestCommWorld->size(),
116  rank = TestCommWorld->rank();
117 
118  std::map<processor_id_type, std::vector<unsigned int> > data, received_data;
119 
120  fill_scalar_data(data, M);
121 
122  auto collect_data =
123  [&received_data]
124  (processor_id_type pid,
125  const typename std::vector<unsigned int> & vec_received)
126  {
127  auto & vec = received_data[pid];
128  vec.insert(vec.end(), vec_received.begin(), vec_received.end());
129  };
130 
131  TIMPI::push_parallel_vector_data(*TestCommWorld, data, collect_data);
132 
133  // Test the received results, for each processor id p we're in
134  // charge of.
135  std::vector<std::size_t> checked_sizes(size, 0);
136  for (int p=rank; p < M; p += size)
137  for (int srcp=0; srcp != size; ++srcp)
138  {
139  int diffsize = std::abs(srcp-p);
140  int diffsqrt = std::sqrt(diffsize);
141  if (diffsqrt*diffsqrt != diffsize)
142  {
143  if (received_data.count(srcp))
144  {
145  const std::vector<unsigned int> & datum = received_data[srcp];
146  TIMPI_UNIT_ASSERT(std::count(datum.begin(), datum.end(), p) == std::ptrdiff_t(0));
147  }
148  continue;
149  }
150 
151  TIMPI_UNIT_ASSERT(received_data.count(srcp) == std::size_t(1));
152  const std::vector<unsigned int> & datum = received_data[srcp];
153  TIMPI_UNIT_ASSERT(std::count(datum.begin(), datum.end(), p) == std::ptrdiff_t(diffsqrt+1));
154  checked_sizes[srcp] += diffsqrt+1;
155  }
156 
157  for (int srcp=0; srcp != size; ++srcp)
158  TIMPI_UNIT_ASSERT(checked_sizes[srcp] == received_data[srcp].size());
159  }
processor_id_type rank() const
Definition: communicator.h:208
processor_id_type size() const
Definition: communicator.h:211
void push_parallel_vector_data(const Communicator &comm, MapToVectors &&data, const ActionFunctor &act_on_data)
Send and receive and act on vectors of data.
uint8_t processor_id_type
Definition: communicator.h:54
Communicator * TestCommWorld
void fill_scalar_data(std::map< processor_id_type, std::vector< unsigned int >> &data, int M)

◆ testPushMove()

void testPushMove ( )

Definition at line 174 of file parallel_sync_unit.C.

References fill_scalar_data(), TIMPI::push_parallel_vector_data(), TIMPI::Communicator::rank(), TIMPI::Communicator::size(), and TestCommWorld.

Referenced by run_tests().

175  {
176  const int size = TestCommWorld->size(),
177  rank = TestCommWorld->rank();
178 
179  std::map<processor_id_type, std::vector<unsigned int>> data, received_data;
180 
181  const int M = TestCommWorld->size();
182  fill_scalar_data(data, M);
183 
184  auto collect_data =
185  [&received_data]
186  (processor_id_type pid,
187  typename std::vector<unsigned int> && vec_received)
188  {
189  auto & vec = received_data[pid];
190  for (auto & val : vec_received)
191  vec.emplace_back(std::move(val));
192  };
193 
194  TIMPI::push_parallel_vector_data(*TestCommWorld, std::move(data), collect_data);
195 
196  // Test the received results, for each processor id p we're in
197  // charge of.
198  std::vector<std::size_t> checked_sizes(size, 0);
199  for (int p=rank; p < M; p += size)
200  for (int srcp=0; srcp != size; ++srcp)
201  {
202  int diffsize = std::abs(srcp-p);
203  int diffsqrt = std::sqrt(diffsize);
204  if (diffsqrt*diffsqrt != diffsize)
205  {
206  if (received_data.count(srcp))
207  {
208  const std::vector<unsigned int> & datum = received_data[srcp];
209  TIMPI_UNIT_ASSERT(std::count(datum.begin(), datum.end(), p) == std::ptrdiff_t(0));
210  }
211  continue;
212  }
213 
214  TIMPI_UNIT_ASSERT(received_data.count(srcp) == std::size_t(1));
215  const std::vector<unsigned int> & datum = received_data[srcp];
216  TIMPI_UNIT_ASSERT(std::count(datum.begin(), datum.end(), p) == std::ptrdiff_t(diffsqrt+1));
217  checked_sizes[srcp] += diffsqrt+1;
218  }
219 
220  for (int srcp=0; srcp != size; ++srcp)
221  TIMPI_UNIT_ASSERT(checked_sizes[srcp] == received_data[srcp].size());
222  }
processor_id_type rank() const
Definition: communicator.h:208
processor_id_type size() const
Definition: communicator.h:211
void push_parallel_vector_data(const Communicator &comm, MapToVectors &&data, const ActionFunctor &act_on_data)
Send and receive and act on vectors of data.
uint8_t processor_id_type
Definition: communicator.h:54
Communicator * TestCommWorld
void fill_scalar_data(std::map< processor_id_type, std::vector< unsigned int >> &data, int M)

◆ testPushMultimap()

void testPushMultimap ( )

Definition at line 517 of file parallel_sync_unit.C.

References TIMPI::Communicator::size(), TestCommWorld, and testPushMultimapImpl().

Referenced by run_tests().

518  {
519  testPushMultimapImpl(TestCommWorld->size());
520  }
processor_id_type size() const
Definition: communicator.h:211
Communicator * TestCommWorld
void testPushMultimapImpl(int M)

◆ testPushMultimapImpl()

void testPushMultimapImpl ( int  M)

Definition at line 444 of file parallel_sync_unit.C.

References fill_scalar_data(), TIMPI::push_parallel_vector_data(), TIMPI::Communicator::rank(), TIMPI::Communicator::SENDRECEIVE, TIMPI::Communicator::size(), TIMPI::Communicator::sync_type(), and TestCommWorld.

Referenced by testPushMultimap(), and testPushMultimapOversized().

445  {
446  const int size = TestCommWorld->size(),
447  rank = TestCommWorld->rank();
448 
449  // This is going to be well-defined with NBX and ALLTOALL_COUNTS
450  // because of C++11's guarantees regarding preservation of insert
451  // ordering in multimaps, combined with MPI's guarantees about
452  // non-overtaking ... but we do receives in a different order with
453  // SENDRECEIVE mode, so let's skip this test there.
454  if (TestCommWorld->sync_type() == Communicator::SENDRECEIVE)
455  return;
456 
457  std::multimap<processor_id_type, std::vector<unsigned int> > data, received_data;
458 
459  fill_scalar_data(data, M);
460 
461  auto collect_data =
462  [&received_data]
463  (processor_id_type pid,
464  const typename std::vector<unsigned int> & vec_received)
465  {
466  received_data.emplace(pid, vec_received);
467  };
468 
469  TIMPI::push_parallel_vector_data(*TestCommWorld, data, collect_data);
470 
471  // Test the received results, for each processor id p we're in
472  // charge of.
473  std::vector<std::size_t> checked_sizes(size, 0);
474  for (int p=rank; p < M; p += size)
475  for (int srcp=0; srcp != size; ++srcp)
476  {
477  int diffsize = std::abs(srcp-p);
478  int diffsqrt = std::sqrt(diffsize);
479  auto rng = received_data.equal_range(srcp);
480  if (diffsqrt*diffsqrt != diffsize)
481  {
482  for (auto pv_it = rng.first; pv_it != rng.second; ++pv_it)
483  {
484  TIMPI_UNIT_ASSERT(std::count(pv_it->second.begin(), pv_it->second.end(), p) == std::ptrdiff_t(0));
485  }
486  continue;
487  }
488 
489  TIMPI_UNIT_ASSERT(rng.first != rng.second);
490  for (auto pv_it = rng.first; pv_it != rng.second; ++pv_it)
491  {
492  std::ptrdiff_t cnt = std::count(pv_it->second.begin(), pv_it->second.end(), p);
493  if (cnt)
494  {
495  TIMPI_UNIT_ASSERT(cnt == std::ptrdiff_t(diffsqrt+1));
496  auto pv_it2 = pv_it; ++pv_it2;
497  TIMPI_UNIT_ASSERT(pv_it2 != rng.second);
498  std::ptrdiff_t cnt2 = std::count(pv_it2->second.begin(), pv_it2->second.end(), p);
499  TIMPI_UNIT_ASSERT(cnt2 == std::ptrdiff_t(1));
500  checked_sizes[srcp] += cnt + cnt2;
501  break;
502  }
503  }
504  }
505 
506  for (int srcp=0; srcp != size; ++srcp)
507  {
508  std::size_t total_size = 0;
509  auto rng = received_data.equal_range(srcp);
510  for (auto pv_it = rng.first; pv_it != rng.second; ++pv_it)
511  total_size += pv_it->second.size();
512  TIMPI_UNIT_ASSERT(checked_sizes[srcp] == total_size);
513  }
514  }
void sync_type(const SyncType st)
Explicitly sets the SyncType used for sync operations.
Definition: communicator.h:343
processor_id_type rank() const
Definition: communicator.h:208
processor_id_type size() const
Definition: communicator.h:211
void push_parallel_vector_data(const Communicator &comm, MapToVectors &&data, const ActionFunctor &act_on_data)
Send and receive and act on vectors of data.
uint8_t processor_id_type
Definition: communicator.h:54
Communicator * TestCommWorld
void fill_scalar_data(std::map< processor_id_type, std::vector< unsigned int >> &data, int M)

◆ testPushMultimapOversized()

void testPushMultimapOversized ( )

Definition at line 523 of file parallel_sync_unit.C.

References TIMPI::Communicator::size(), TestCommWorld, and testPushMultimapImpl().

Referenced by run_tests().

524  {
525  testPushMultimapImpl((TestCommWorld->size() + 4) * 2);
526  }
processor_id_type size() const
Definition: communicator.h:211
Communicator * TestCommWorld
void testPushMultimapImpl(int M)

◆ testPushMultimapVecVec()

void testPushMultimapVecVec ( )

Definition at line 612 of file parallel_sync_unit.C.

References TIMPI::Communicator::size(), TestCommWorld, and testPushMultimapVecVecImpl().

Referenced by run_tests().

613  {
614  testPushMultimapVecVecImpl(TestCommWorld->size());
615  }
processor_id_type size() const
Definition: communicator.h:211
void testPushMultimapVecVecImpl(int M)
Communicator * TestCommWorld

◆ testPushMultimapVecVecImpl()

void testPushMultimapVecVecImpl ( int  M)

Definition at line 529 of file parallel_sync_unit.C.

References fill_vector_data(), TIMPI::push_parallel_vector_data(), TIMPI::Communicator::rank(), TIMPI::Communicator::SENDRECEIVE, TIMPI::Communicator::size(), TIMPI::Communicator::sync_type(), and TestCommWorld.

Referenced by testPushMultimapVecVec(), and testPushMultimapVecVecOversized().

530  {
531  const int size = TestCommWorld->size(),
532  rank = TestCommWorld->rank();
533 
534  // This is going to be well-defined with NBX and ALLTOALL_COUNTS
535  // because of C++11's guarantees regarding preservation of insert
536  // ordering in multimaps, combined with MPI's guarantees about
537  // non-overtaking ... but we do receives in a different order with
538  // SENDRECEIVE mode, so let's skip this test there.
539  if (TestCommWorld->sync_type() == Communicator::SENDRECEIVE)
540  return;
541 
542  std::multimap<processor_id_type, std::vector<std::vector<unsigned int>>> data, received_data;
543 
544  fill_vector_data(data, M);
545 
546  auto collect_data =
547  [&received_data]
548  (processor_id_type pid,
549  const typename std::vector<std::vector<unsigned int>> & vecvec_received)
550  {
551  received_data.emplace(pid, vecvec_received);
552  };
553 
554  TIMPI::push_parallel_vector_data(*TestCommWorld, data, collect_data);
555 
556  // Test the received results, for each processor id p we're in
557  // charge of.
558  std::vector<std::size_t> checked_sizes(size, 0);
559  for (int p=rank; p < M; p += size)
560  for (int srcp=0; srcp != size; ++srcp)
561  {
562  int diffsize = std::abs(srcp-p);
563  int diffsqrt = std::sqrt(diffsize);
564  auto rng = received_data.equal_range(srcp);
565  if (diffsqrt*diffsqrt != diffsize)
566  {
567  for (auto pvv = rng.first; pvv != rng.second; ++pvv)
568  {
569  for (auto & v : pvv->second)
570  TIMPI_UNIT_ASSERT(std::count(v.begin(), v.end(), p) == std::ptrdiff_t(0));
571  }
572  continue;
573  }
574 
575  TIMPI_UNIT_ASSERT(rng.first != rng.second);
576  for (auto pvv_it = rng.first; pvv_it != rng.second; ++pvv_it)
577  {
578  if(pvv_it->second.size() != std::size_t(2))
579  timpi_error();
580  TIMPI_UNIT_ASSERT(pvv_it->second.size() == std::size_t(2));
581  std::ptrdiff_t cnt = std::count(pvv_it->second[0].begin(), pvv_it->second[0].end(), p);
582  if (cnt)
583  {
584  TIMPI_UNIT_ASSERT(cnt == std::ptrdiff_t(diffsqrt+1));
585  std::ptrdiff_t cnt2 = std::count(pvv_it->second[1].begin(), pvv_it->second[1].end(), p);
586  TIMPI_UNIT_ASSERT(cnt2 == std::ptrdiff_t(1));
587  auto pvv_it2 = pvv_it; ++pvv_it2;
588  TIMPI_UNIT_ASSERT(pvv_it2 != rng.second);
589  TIMPI_UNIT_ASSERT(pvv_it2->second.size() == std::size_t(1));
590  std::ptrdiff_t cnt3 = std::count(pvv_it2->second[0].begin(), pvv_it2->second[0].end(), p);
591  TIMPI_UNIT_ASSERT(cnt3 == std::ptrdiff_t(1));
592  checked_sizes[srcp] += cnt + cnt2 + cnt3;
593  break;
594  }
595  ++pvv_it;
596  timpi_assert(pvv_it != rng.second);
597  }
598  }
599 
600  for (int srcp=0; srcp != size; ++srcp)
601  {
602  std::size_t total_size = 0;
603  auto rng = received_data.equal_range(srcp);
604  for (auto pvv = rng.first; pvv != rng.second; ++pvv)
605  for (auto & v : pvv->second)
606  total_size += v.size();
607  TIMPI_UNIT_ASSERT(checked_sizes[srcp] == total_size);
608  }
609  }
void sync_type(const SyncType st)
Explicitly sets the SyncType used for sync operations.
Definition: communicator.h:343
processor_id_type rank() const
Definition: communicator.h:208
processor_id_type size() const
Definition: communicator.h:211
void push_parallel_vector_data(const Communicator &comm, MapToVectors &&data, const ActionFunctor &act_on_data)
Send and receive and act on vectors of data.
uint8_t processor_id_type
Definition: communicator.h:54
Communicator * TestCommWorld
void fill_vector_data(std::map< processor_id_type, std::vector< std::vector< unsigned int >>> &data, int M)

◆ testPushMultimapVecVecOversized()

void testPushMultimapVecVecOversized ( )

Definition at line 618 of file parallel_sync_unit.C.

References TIMPI::Communicator::size(), TestCommWorld, and testPushMultimapVecVecImpl().

Referenced by run_tests().

619  {
620  testPushMultimapVecVecImpl((TestCommWorld->size() + 4) * 2);
621  }
processor_id_type size() const
Definition: communicator.h:211
void testPushMultimapVecVecImpl(int M)
Communicator * TestCommWorld

◆ testPushOversized()

void testPushOversized ( )

Definition at line 168 of file parallel_sync_unit.C.

References TIMPI::Communicator::size(), TestCommWorld, and testPushImpl().

Referenced by run_tests().

169  {
170  testPushImpl((TestCommWorld->size() + 4) * 2);
171  }
processor_id_type size() const
Definition: communicator.h:211
void testPushImpl(int M)
Communicator * TestCommWorld

◆ testPushVecVec()

void testPushVecVec ( )

Definition at line 348 of file parallel_sync_unit.C.

References TIMPI::Communicator::size(), TestCommWorld, and testPushVecVecImpl().

Referenced by run_tests().

349  {
350  testPushVecVecImpl(TestCommWorld->size());
351  }
void testPushVecVecImpl(int M)
processor_id_type size() const
Definition: communicator.h:211
Communicator * TestCommWorld

◆ testPushVecVecImpl()

void testPushVecVecImpl ( int  M)

Definition at line 295 of file parallel_sync_unit.C.

References fill_vector_data(), TIMPI::push_parallel_vector_data(), TIMPI::Communicator::rank(), TIMPI::Communicator::size(), and TestCommWorld.

Referenced by testPullVecVecOversized(), testPushVecVec(), and testPushVecVecOversized().

296  {
297  const int size = TestCommWorld->size(),
298  rank = TestCommWorld->rank();
299 
300  std::map<processor_id_type, std::vector<std::vector<unsigned int>>> data;
301  std::map<processor_id_type, std::vector<unsigned int>> received_data;
302 
303  fill_vector_data(data, M);
304 
305  auto collect_data =
306  [&received_data]
307  (processor_id_type pid,
308  const typename std::vector<std::vector<unsigned int>> & vecvec_received)
309  {
310  TIMPI_UNIT_ASSERT(vecvec_received.size() == std::size_t(2));
311  TIMPI_UNIT_ASSERT(vecvec_received[1].size() == std::size_t(1));
312  TIMPI_UNIT_ASSERT(vecvec_received[0][0] == vecvec_received[1][0]);
313  auto & vec = received_data[pid];
314  vec.insert(vec.end(), vecvec_received[0].begin(), vecvec_received[0].end());
315  };
316 
317  TIMPI::push_parallel_vector_data(*TestCommWorld, data, collect_data);
318 
319  // Test the received results, for each processor id p we're in
320  // charge of.
321  std::vector<std::size_t> checked_sizes(size, 0);
322  for (int p=rank; p < M; p += size)
323  for (int srcp=0; srcp != size; ++srcp)
324  {
325  int diffsize = std::abs(srcp-p);
326  int diffsqrt = std::sqrt(diffsize);
327  if (diffsqrt*diffsqrt != diffsize)
328  {
329  if (received_data.count(srcp))
330  {
331  const std::vector<unsigned int> & datum = received_data[srcp];
332  TIMPI_UNIT_ASSERT(std::count(datum.begin(), datum.end(), p) == std::ptrdiff_t(0));
333  }
334  continue;
335  }
336 
337  TIMPI_UNIT_ASSERT(received_data.count(srcp) == std::size_t(1));
338  const std::vector<unsigned int> & datum = received_data[srcp];
339  TIMPI_UNIT_ASSERT(std::count(datum.begin(), datum.end(), p) == std::ptrdiff_t(diffsqrt+1));
340  checked_sizes[srcp] += diffsqrt+1;
341  }
342 
343  for (int srcp=0; srcp != size; ++srcp)
344  TIMPI_UNIT_ASSERT(checked_sizes[srcp] == received_data[srcp].size());
345  }
processor_id_type rank() const
Definition: communicator.h:208
processor_id_type size() const
Definition: communicator.h:211
void push_parallel_vector_data(const Communicator &comm, MapToVectors &&data, const ActionFunctor &act_on_data)
Send and receive and act on vectors of data.
uint8_t processor_id_type
Definition: communicator.h:54
Communicator * TestCommWorld
void fill_vector_data(std::map< processor_id_type, std::vector< std::vector< unsigned int >>> &data, int M)

◆ testPushVecVecOversized()

void testPushVecVecOversized ( )

Definition at line 354 of file parallel_sync_unit.C.

References TIMPI::Communicator::size(), TestCommWorld, and testPushVecVecImpl().

Referenced by run_tests().

355  {
356  testPushVecVecImpl((TestCommWorld->size() + 4) * 2);
357  }
void testPushVecVecImpl(int M)
processor_id_type size() const
Definition: communicator.h:211
Communicator * TestCommWorld

◆ testStringSyncType()

void testStringSyncType ( )

Definition at line 700 of file parallel_sync_unit.C.

References TIMPI::Communicator::ALLTOALL_COUNTS, TIMPI::Communicator::duplicate(), TIMPI::Communicator::NBX, TIMPI::Communicator::SENDRECEIVE, TIMPI::Communicator::sync_type(), and TestCommWorld.

Referenced by run_tests().

701  {
702  Communicator comm;
703  comm.duplicate(*TestCommWorld);
704 
705  comm.sync_type("nbx");
706  TIMPI_UNIT_ASSERT(comm.sync_type() == Communicator::NBX);
707 
708  comm.sync_type("sendreceive");
709  TIMPI_UNIT_ASSERT(comm.sync_type() == Communicator::SENDRECEIVE);
710 
711  comm.sync_type("alltoall");
712  TIMPI_UNIT_ASSERT(comm.sync_type() == Communicator::ALLTOALL_COUNTS);
713  }
void sync_type(const SyncType st)
Explicitly sets the SyncType used for sync operations.
Definition: communicator.h:343
TIMPI::Communicator
Encapsulates the MPI_Comm object.
Definition: communicator.h:108
Communicator * TestCommWorld
void duplicate(const Communicator &comm)
Definition: communicator.C:137

Variable Documentation

◆ TestCommWorld

Communicator* TestCommWorld