parallel_sync.h
// The libMesh Finite Element Library.
// Copyright (C) 2002-2018 Benjamin S. Kirk, John W. Peterson, Roy H. Stogner

// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.

// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
// Lesser General Public License for more details.

// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA



#ifndef LIBMESH_PARALLEL_SYNC_H
#define LIBMESH_PARALLEL_SYNC_H

// Local Includes
#include "libmesh/parallel.h"

// C++ includes
#include <map>
#include <type_traits>
#include <vector>


namespace libMesh
{



//--------------------------------------------------------------------------
namespace Parallel {

//------------------------------------------------------------------------
template <typename MapToVectors,
          typename RequestContainer,
          typename ActionFunctor>
void push_parallel_vector_data(const Communicator & comm,
                               const MapToVectors & data,
                               RequestContainer & reqs,
                               ActionFunctor & act_on_data);



template <typename MapToVectors,
          typename ActionFunctor>
void push_parallel_vector_data(const Communicator & comm,
                               const MapToVectors & data,
                               ActionFunctor & act_on_data);

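// Illustrative example only -- a minimal usage sketch, not from the libMesh
// sources: push_parallel_vector_data() takes a map from destination
// processor id to a vector of entries, sends each vector to its
// destination, and calls act_on_data(pid, vec) once per vector received
// (including a send-to-self).  It must be called on every processor.
// The names "elem_ids_to_push" and the lambda below are hypothetical;
// "comm" stands for any existing Communicator.
//
//   std::map<processor_id_type, std::vector<dof_id_type>> elem_ids_to_push;
//   elem_ids_to_push[1] = {10, 11, 12};   // entries destined for processor 1
//
//   auto act_on_data =
//     [](processor_id_type pid, const std::vector<dof_id_type> & received)
//     {
//       // process the ids that processor 'pid' pushed to us
//     };
//
//   Parallel::push_parallel_vector_data(comm, elem_ids_to_push, act_on_data);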

template <typename datum,
          typename MapToVectors,
          typename RequestContainer,
          typename GatherFunctor,
          typename ActionFunctor>
void pull_parallel_vector_data(const Communicator & comm,
                               const MapToVectors & queries,
                               RequestContainer & reqs,
                               GatherFunctor & gather_data,
                               ActionFunctor & act_on_data,
                               const datum * example);

template <typename datum,
          typename MapToVectors,
          typename GatherFunctor,
          typename ActionFunctor>
void pull_parallel_vector_data(const Communicator & comm,
                               const MapToVectors & queries,
                               GatherFunctor & gather_data,
                               ActionFunctor & act_on_data,
                               const datum * example);

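// Illustrative example only -- a minimal usage sketch, not from the libMesh
// sources: pull_parallel_vector_data() pushes the "queries" map to its
// destinations, has each destination fill one answer per query entry via
// gather_data(), sends the answers back, and hands them to act_on_data()
// alongside the original queries.  The trailing "example" pointer only
// fixes the answer type (datum); its value is never read.  All names below
// are hypothetical.
//
//   // ids whose locally-owned values we want from each processor
//   std::map<processor_id_type, std::vector<dof_id_type>> queries;
//
//   auto gather_data =
//     [](processor_id_type /*pid*/, const std::vector<dof_id_type> & ids,
//        std::vector<Real> & answers)
//     {
//       answers.resize(ids.size());
//       // fill answers[i] with the value we own for ids[i]
//     };
//
//   auto act_on_data =
//     [](processor_id_type /*pid*/, const std::vector<dof_id_type> & ids,
//        const std::vector<Real> & answers)
//     {
//       // use answers[i], the value returned for ids[i]
//     };
//
//   const Real * ex = nullptr;
//   Parallel::pull_parallel_vector_data(comm, queries, gather_data,
//                                       act_on_data, ex);
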
//------------------------------------------------------------------------
// Parallel function overloads
//

/*
 * A specialization for types that are harder to non-blocking receive.
 */
template <template <typename, typename, typename ...> class MapType,
          typename ValueType,
          typename A1,
          typename A2,
          typename ... ExtraTypes,
          typename RequestContainer,
          typename ActionFunctor>
void push_parallel_vector_data(const Communicator & comm,
                               const MapType<processor_id_type, std::vector<std::vector<ValueType,A1>,A2>, ExtraTypes...> & data,
                               RequestContainer & reqs,
                               ActionFunctor & act_on_data);



/*
 * A specialization for types that are harder to non-blocking receive.
 */
template <template <typename, typename, typename ...> class MapType,
          typename ValueType,
          typename A1,
          typename A2,
          typename ... ExtraTypes,
          typename ActionFunctor>
void push_parallel_vector_data(const Communicator & comm,
                               const MapType<processor_id_type, std::vector<std::vector<ValueType,A1>,A2>, ExtraTypes...> & data,
                               ActionFunctor & act_on_data);

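// Illustrative example of the vector-of-vectors overload -- a minimal
// usage sketch, not from the libMesh sources; the names below are
// hypothetical:
//
//   // one ragged inner vector per entry pushed to processor 2
//   std::map<processor_id_type, std::vector<std::vector<Real>>> values_to_push;
//   values_to_push[2] = {{1., 2.}, {3.}};
//
//   auto act_on_data =
//     [](processor_id_type pid,
//        const std::vector<std::vector<Real>> & received)
//     {
//       // received[i] is the i-th ragged entry pushed to us by 'pid'
//     };
//
//   Parallel::push_parallel_vector_data(comm, values_to_push, act_on_data);
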
/*
 * A specialization for types that are harder to non-blocking receive.
 */
template <typename datum,
          typename A,
          typename MapToVectors,
          typename RequestContainer,
          typename GatherFunctor,
          typename ActionFunctor>
void pull_parallel_vector_data(const Communicator & comm,
                               const MapToVectors & queries,
                               RequestContainer & reqs,
                               GatherFunctor & gather_data,
                               ActionFunctor & act_on_data,
                               const std::vector<datum,A> * example);





//------------------------------------------------------------------------
// Parallel members
//

template <typename MapToVectors,
          typename RequestContainer,
          typename ActionFunctor>
void push_parallel_vector_data(const Communicator & comm,
                               const MapToVectors & data,
                               RequestContainer & reqs,
                               ActionFunctor & act_on_data)
{
  // This function must be run on all processors at once
  libmesh_parallel_only(comm);

  processor_id_type num_procs = comm.size();

  // Size of vectors to send to each processor
  std::vector<std::size_t> will_send_to(num_procs, 0);
  processor_id_type num_sends = 0;
  for (auto & datapair : data)
    {
      // Don't try to send anywhere that doesn't exist
      libmesh_assert_less(datapair.first, num_procs);

      // Don't give us empty vectors to send
      libmesh_assert_greater(datapair.second.size(), 0);

      will_send_to[datapair.first] = datapair.second.size();
      num_sends++;
    }

  // Tell everyone about where everyone will send to
  comm.alltoall(will_send_to);
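  // (Illustration, not from the original source: with 3 processors, if
  // rank 0's only entry is 5 items destined for rank 2, then rank 0's
  // will_send_to is {0, 0, 5} before the alltoall; afterwards rank 2 holds
  // {5, 0, 0}, i.e. entry p now records how much rank p will send to us.)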

  // will_send_to now represents who we'll receive from
  // give it a good name
  auto & will_receive_from = will_send_to;

  // This function only works for "flat" data that we can pre-size
  // receive buffers for: a map to vectors-of-standard-types, not e.g.
  // vectors-of-vectors.
  //
  // Trying to instantiate a StandardType<T> gives us a compiler error
  // where otherwise we would have had a runtime error.
  //
  // Creating a StandardType<T> manually also saves our APIs from
  // having to do a bunch of automatic creations later.
  //
  // This object will be free'd before all non-blocking communications
  // complete, but the MPI standard for MPI_Type_free specifies "Any
  // communication that is currently using this datatype will
  // complete normally." so we're cool.
  typedef decltype(data.begin()->second.front()) ref_type;
  typedef typename std::remove_reference<ref_type>::type nonref_type;
  StandardType<typename std::remove_const<nonref_type>::type> datatype;

  // We'll grab a tag so we can overlap request sends and receives
  // without confusing one for the other
  MessageTag tag = comm.get_unique_tag(1225);

  MapToVectors received_data;

  // Post all of the sends, non-blocking
  for (auto & datapair : data)
    {
      processor_id_type destid = datapair.first;
      libmesh_assert_less(destid, num_procs);
      auto & datum = datapair.second;

      // Just act on data if the user requested a send-to-self
      if (destid == comm.rank())
        act_on_data(destid, datum);
      else
        {
          Request sendreq;
          comm.send(destid, datum, datatype, sendreq, tag);
          reqs.insert(reqs.end(), sendreq);
        }
    }

  // Post all of the receives, non-blocking
  std::vector<Request> receive_reqs;
  std::vector<processor_id_type> receive_procids;
  for (processor_id_type proc_id = 0; proc_id < num_procs; proc_id++)
    if (will_receive_from[proc_id] && proc_id != comm.rank())
      {
        Request req;
        auto & incoming_data = received_data[proc_id];
        incoming_data.resize(will_receive_from[proc_id]);
        comm.receive(proc_id, incoming_data, datatype, req, tag);
        receive_reqs.push_back(req);
        receive_procids.push_back(proc_id);
      }

  while (receive_reqs.size())
    {
      std::size_t completed = waitany(receive_reqs);
      processor_id_type proc_id = receive_procids[completed];
      receive_reqs.erase(receive_reqs.begin() + completed);
      receive_procids.erase(receive_procids.begin() + completed);

      act_on_data(proc_id, received_data[proc_id]);
      received_data.erase(proc_id);
    }
}



template <template <typename, typename, typename ...> class MapType,
          typename ValueType,
          typename A1,
          typename A2,
          typename ... ExtraTypes,
          typename RequestContainer,
          typename ActionFunctor>
void push_parallel_vector_data(const Communicator & comm,
                               const MapType<processor_id_type, std::vector<std::vector<ValueType,A1>,A2>, ExtraTypes...> & data,
                               RequestContainer & reqs,
                               ActionFunctor & act_on_data)
{
  // This function must be run on all processors at once
  libmesh_parallel_only(comm);

  processor_id_type num_procs = comm.size();

  // Size of vectors to send to each processor
  std::vector<std::size_t> will_send_to(num_procs, 0);
  processor_id_type num_sends = 0;
  for (auto & datapair : data)
    {
      // Don't try to send anywhere that doesn't exist
      libmesh_assert_less(datapair.first, num_procs);

      // Don't give us empty vectors to send
      libmesh_assert_greater(datapair.second.size(), 0);

      will_send_to[datapair.first] = datapair.second.size();
      num_sends++;
    }

  // Tell everyone about where everyone will send to
  comm.alltoall(will_send_to);

  // will_send_to now represents who we'll receive from
  // give it a good name
  auto & will_receive_from = will_send_to;

  processor_id_type n_receives = 0;
  for (processor_id_type proc_id = 0; proc_id < num_procs; proc_id++)
    if (will_receive_from[proc_id])
      n_receives++;

  // We'll construct a datatype once for repeated use
  StandardType<ValueType> datatype;

  // We'll grab a tag so we can overlap request sends and receives
  // without confusing one for the other
  MessageTag tag = comm.get_unique_tag(1225);

  // Post all of the sends, non-blocking
  for (auto & datapair : data)
    {
      processor_id_type destid = datapair.first;
      libmesh_assert_less(destid, num_procs);
      auto & datum = datapair.second;

      // Just act on data if the user requested a send-to-self
      if (destid == comm.rank())
        {
          act_on_data(destid, datum);
          n_receives--;
        }
      else
        {
          Request sendreq;
          comm.send(destid, datum, datatype, sendreq, tag);
          reqs.insert(reqs.end(), sendreq);
        }
    }

  // Post all of the receives.
  //
  // Use blocking API here since we can't use the pre-sized
  // non-blocking APIs with this data type.
  //
  // FIXME - implement Derek's API from #1684, switch to that!
  for (processor_id_type i = 0; i != n_receives; ++i)
    {
      Status stat(comm.probe(any_source, tag));
      const processor_id_type
        proc_id = cast_int<processor_id_type>(stat.source());

      std::vector<std::vector<ValueType,A1>,A2> received_data;
      comm.receive(proc_id, received_data, datatype, tag);
      act_on_data(proc_id, received_data);
    }
}



template <typename MapToVectors,
          typename ActionFunctor>
void push_parallel_vector_data(const Communicator & comm,
                               const MapToVectors & data,
                               ActionFunctor & act_on_data)
{
  std::vector<Request> requests;

  push_parallel_vector_data(comm, data, requests, act_on_data);

  wait(requests);
}



template <template <typename, typename, typename ...> class MapType,
          typename ValueType,
          typename A1,
          typename A2,
          typename ... ExtraTypes,
          typename ActionFunctor>
void push_parallel_vector_data(const Communicator & comm,
                               const MapType<processor_id_type, std::vector<std::vector<ValueType,A1>,A2>, ExtraTypes...> & data,
                               ActionFunctor & act_on_data)
{
  std::vector<Request> requests;

  push_parallel_vector_data(comm, data, requests, act_on_data);

  wait(requests);
}



template <typename datum,
          typename MapToVectors,
          typename RequestContainer,
          typename GatherFunctor,
          typename ActionFunctor>
void pull_parallel_vector_data(const Communicator & comm,
                               const MapToVectors & queries,
                               RequestContainer & reqs,
                               GatherFunctor & gather_data,
                               ActionFunctor & act_on_data,
                               const datum *)
{
  typedef typename MapToVectors::mapped_type query_type;

  std::map<processor_id_type, std::vector<datum> >
    response_data, received_data;
  std::vector<Request> response_reqs;

  StandardType<datum> datatype;

  // We'll grab a tag so we can overlap request sends and receives
  // without confusing one for the other
  MessageTag tag = comm.get_unique_tag(105);

  auto gather_functor =
    [&comm, &gather_data, &response_data, &response_reqs, &datatype, &tag]
    (processor_id_type pid, query_type query)
    {
      gather_data(pid, query, response_data[pid]);
      libmesh_assert_equal_to(query.size(), response_data[pid].size());

      // Just act on data later if the user requested a send-to-self
      if (pid != comm.rank())
        {
          Request sendreq;
          comm.send(pid, response_data[pid], datatype, sendreq, tag);
          response_reqs.push_back(sendreq);
        }
    };

  push_parallel_vector_data (comm, queries, reqs, gather_functor);

  // Every outgoing query should now have an incoming response.
  // Post all of the receives, non-blocking
  std::vector<Request> receive_reqs;
  std::vector<processor_id_type> receive_procids;
  for (auto & querypair : queries)
    {
      processor_id_type proc_id = querypair.first;
      libmesh_assert_less(proc_id, comm.size());

      if (proc_id == comm.rank())
        {
          libmesh_assert(queries.count(proc_id));
          libmesh_assert_equal_to(queries.at(proc_id).size(),
                                  response_data.at(proc_id).size());
          act_on_data(proc_id, queries.at(proc_id), response_data.at(proc_id));
        }
      else
        {
          auto & querydata = querypair.second;
          Request req;
          auto & incoming_data = received_data[proc_id];
          incoming_data.resize(querydata.size());
          comm.receive(proc_id, incoming_data, datatype, req, tag);
          receive_reqs.push_back(req);
          receive_procids.push_back(proc_id);
        }
    }

  while (receive_reqs.size())
    {
      std::size_t completed = waitany(receive_reqs);
      processor_id_type proc_id = receive_procids[completed];
      receive_reqs.erase(receive_reqs.begin() + completed);
      receive_procids.erase(receive_procids.begin() + completed);

      libmesh_assert(queries.count(proc_id));
      libmesh_assert_equal_to(queries.at(proc_id).size(),
                              received_data[proc_id].size());
      act_on_data(proc_id, queries.at(proc_id), received_data[proc_id]);
      received_data.erase(proc_id);
    }

  wait(response_reqs);
}


template <typename datum,
          typename MapToVectors,
          typename GatherFunctor,
          typename ActionFunctor>
void pull_parallel_vector_data(const Communicator & comm,
                               const MapToVectors & queries,
                               GatherFunctor & gather_data,
                               ActionFunctor & act_on_data,
                               const datum * example)
{
  std::vector<Request> requests;

  pull_parallel_vector_data(comm, queries, requests, gather_data,
                            act_on_data, example);

  wait(requests);
}


template <typename datum,
          typename A,
          typename MapToVectors,
          typename RequestContainer,
          typename GatherFunctor,
          typename ActionFunctor>
void pull_parallel_vector_data(const Communicator & comm,
                               const MapToVectors & queries,
                               RequestContainer & reqs,
                               GatherFunctor & gather_data,
                               ActionFunctor & act_on_data,
                               const std::vector<datum,A> *)
{
  typedef typename MapToVectors::mapped_type query_type;

  std::map<processor_id_type, std::vector<std::vector<datum,A>>>
    response_data;
  std::vector<Request> response_reqs;

  // We'll grab a tag so we can overlap request sends and receives
  // without confusing one for the other
  MessageTag tag = comm.get_unique_tag(105);

  auto gather_functor =
    [&comm, &gather_data, &act_on_data,
     &response_data, &response_reqs, &tag]
    (processor_id_type pid, query_type query)
    {
      gather_data(pid, query, response_data[pid]);
      libmesh_assert_equal_to(query.size(),
                              response_data[pid].size());

      // Just act on data if the user requested a send-to-self
      if (pid == comm.rank())
        {
          act_on_data(pid, query, response_data[pid]);
        }
      else
        {
          Request sendreq;
          comm.send(pid, response_data[pid], sendreq, tag);
          response_reqs.push_back(sendreq);
        }
    };

  push_parallel_vector_data (comm, queries, reqs, gather_functor);

  // Every outgoing query should now have an incoming response.
  //
  // Post all of the receives.
  //
  // Use blocking API here since we can't use the pre-sized
  // non-blocking APIs with this data type.
  //
  // FIXME - implement Derek's API from #1684, switch to that!
  std::vector<Request> receive_reqs;
  std::vector<processor_id_type> receive_procids;
  for (std::size_t i = 0,
       n_queries = queries.size() - queries.count(comm.rank());
       i != n_queries; ++i)
    {
      Status stat(comm.probe(any_source, tag));
      const processor_id_type
        proc_id = cast_int<processor_id_type>(stat.source());

      std::vector<std::vector<datum,A>> received_data;
      comm.receive(proc_id, received_data, tag);

      libmesh_assert(queries.count(proc_id));
      auto & querydata = queries.at(proc_id);
      libmesh_assert_equal_to(querydata.size(), received_data.size());
      act_on_data(proc_id, querydata, received_data);
    }

  wait(response_reqs);
}


} // namespace Parallel


} // namespace libMesh

#endif // LIBMESH_PARALLEL_SYNC_H