// The line below carries the include guard, the <type_traits> include and
// the start of the first forward declaration.
// Forward declaration of the push overload that takes a map-like `data`
// container, a caller-supplied request container `reqs`, and a functor
// `act_on_data` (presumably push_parallel_vector_data -- confirm).
// NOTE(review): the return type, function name and any communicator
// parameter (original line 64) are not visible in this view.
20 #ifndef LIBMESH_PARALLEL_SYNC_H 21 #define LIBMESH_PARALLEL_SYNC_H 28 #include <type_traits> 61 template <
typename MapToVectors,
62 typename RequestContainer,
63 typename ActionFunctor>
65 const MapToVectors &
data,
66 RequestContainer & reqs,
67 ActionFunctor & act_on_data);
// Forward declaration of the push overload without a request container:
// it takes only `data` and `act_on_data` among the visible parameters.
// NOTE(review): return type, name and first parameter (original line 88)
// are not visible here; presumably push_parallel_vector_data -- confirm.
86 template <
typename MapToVectors,
87 typename ActionFunctor>
89 const MapToVectors &
data,
90 ActionFunctor & act_on_data);
// Forward declaration of the pull overload that takes `queries`, a
// caller-supplied request container `reqs`, a `gather_data` functor, an
// `act_on_data` functor and a `const datum *` argument named `example`
// (presumably pull_parallel_vector_data -- confirm).
// NOTE(review): return type, name and first parameter (original line 130)
// are not visible in this view.
125 template <
typename datum,
126 typename MapToVectors,
127 typename RequestContainer,
128 typename GatherFunctor,
129 typename ActionFunctor>
131 const MapToVectors & queries,
132 RequestContainer & reqs,
133 GatherFunctor & gather_data,
134 ActionFunctor & act_on_data,
135 const datum * example);
// Forward declaration of the pull overload without a request container:
// visible parameters are `queries`, `gather_data`, `act_on_data` and the
// `const datum *` argument `example`.
// NOTE(review): return type, name and first parameter (original line 168)
// are not visible here; presumably pull_parallel_vector_data -- confirm.
164 template <
typename datum,
165 typename MapToVectors,
166 typename GatherFunctor,
167 typename ActionFunctor>
169 const MapToVectors & queries,
170 GatherFunctor & gather_data,
171 ActionFunctor & act_on_data,
172 const datum * example);
// Forward declaration of an overload templated on a map class template
// `MapType` (a template taking two types plus a parameter pack), extra
// pack types, a request container and an action functor.
// NOTE(review): further template parameters (original lines 182-185 are
// only partly shown), the return type, the name and the data parameter
// (original lines 189-190) are not visible; presumably this is the push
// overload for vector-of-vectors payloads -- confirm.
181 template <
template <
typename,
typename,
typename ...>
class MapType,
186 typename ... ExtraTypes,
187 typename RequestContainer,
188 typename ActionFunctor>
191 RequestContainer & reqs,
192 ActionFunctor & act_on_data);
// Forward declaration of the `MapType`-templated overload that has no
// request-container parameter; only `act_on_data` is visible.
// NOTE(review): return type, name and the leading parameters (original
// lines 206-207) are not visible in this view -- confirm against the
// full header.
199 template <
template <
typename,
typename,
typename ...>
class MapType,
204 typename ... ExtraTypes,
205 typename ActionFunctor>
208 ActionFunctor & act_on_data);
// Forward declaration of the pull overload whose final argument is a
// `const std::vector<datum,A> *` named `example`.
// NOTE(review): `A` is used in the parameter list but its template
// parameter (original line 214) is not visible here; return type, name
// and first parameter (original line 219) are also missing from view.
213 template <
typename datum,
215 typename MapToVectors,
216 typename RequestContainer,
217 typename GatherFunctor,
218 typename ActionFunctor>
220 const MapToVectors & queries,
221 RequestContainer & reqs,
222 GatherFunctor & gather_data,
223 ActionFunctor & act_on_data,
224 const std::vector<datum,A> * example);
// Definition of the push overload taking (data, reqs, act_on_data);
// presumably push_parallel_vector_data -- confirm.
// NOTE(review): the return type, function name, braces and a number of
// statements are not visible in this view, so the comments below describe
// only the statements that are shown.
236 template <
typename MapToVectors,
237 typename RequestContainer,
238 typename ActionFunctor>
240 const MapToVectors &
data,
241 RequestContainer & reqs,
242 ActionFunctor & act_on_data)
// presumably checks that the call is made collectively on `comm` -- confirm
// against the macro definition.
245 libmesh_parallel_only(comm);
// One size_t slot per processor, all starting at zero.
250 std::vector<std::size_t> will_send_to(num_procs, 0);
// Record, per destination key in `data`, how many entries its vector holds.
252 for (
auto & datapair :
data)
// Destination ids must be below num_procs.
255 libmesh_assert_less(datapair.first, num_procs);
// Empty vectors are not allowed as map values.
258 libmesh_assert_greater(datapair.second.size(), 0);
260 will_send_to[datapair.first] = datapair.second.size();
// `will_receive_from` is another name for the same vector, not a copy.
// NOTE(review): presumably the counts are exchanged between ranks in the
// lines not visible here (originals 261-268) -- confirm.
269 auto & will_receive_from = will_send_to;
// Element type of the mapped vectors, first as returned by front(), then
// with the reference stripped.
285 typedef decltype(
data.begin()->second.front()) ref_type;
286 typedef typename std::remove_reference<ref_type>::type nonref_type;
// Buffers for incoming data, stored in the same container type as `data`.
293 MapToVectors received_data;
// Second pass over `data`: deliver each vector to its destination.
296 for (
auto & datapair :
data)
299 libmesh_assert_less(destid, num_procs);
300 auto & datum = datapair.second;
// Data addressed to this rank is passed straight to the functor.
303 if (destid == comm.
rank())
304 act_on_data(destid, datum);
// Otherwise (branch keyword not visible in this view) the vector is sent
// and the send request is appended to the caller's `reqs`.
308 comm.
send(destid, datum, datatype, sendreq, tag);
309 reqs.insert(reqs.end(), sendreq);
// Parallel arrays: receive_procids[i] is the source of receive_reqs[i].
314 std::vector<Request> receive_reqs;
315 std::vector<processor_id_type> receive_procids;
// Only ranks other than ourselves with a nonzero count get a receive.
317 if (will_receive_from[proc_id] && proc_id != comm.
rank())
320 auto & incoming_data = received_data[proc_id];
// Size the buffer to the recorded count before receiving into it.
321 incoming_data.resize(will_receive_from[proc_id]);
322 comm.
receive(proc_id, incoming_data, datatype, req, tag);
323 receive_reqs.push_back(req);
324 receive_procids.push_back(proc_id);
// Handle receives one at a time until none are outstanding.
327 while(receive_reqs.size())
// `completed` is used below as an index into both parallel arrays.
329 std::size_t completed =
waitany(receive_reqs);
// NOTE(review): the assignment of proc_id for this iteration (original
// line 330) is not visible; presumably receive_procids[completed].
331 receive_reqs.erase(receive_reqs.begin() + completed);
332 receive_procids.erase(receive_procids.begin() + completed);
// Hand the received vector to the functor, then drop the buffer.
334 act_on_data(proc_id, received_data[proc_id]);
335 received_data.erase(proc_id);
// Definition of the `MapType`-templated overload taking a request
// container; the received payload type below is a vector of vectors.
// NOTE(review): the return type, function name, data parameter, braces
// and several statements are not visible in this view; ValueType, A1 and
// A2 are declared in lines not shown.
341 template <
template <
typename,
typename,
typename ...>
class MapType,
345 typename ... ExtraTypes,
346 typename RequestContainer,
347 typename ActionFunctor>
350 RequestContainer & reqs,
351 ActionFunctor & act_on_data)
// presumably checks that the call is made collectively on `comm` -- confirm.
354 libmesh_parallel_only(comm);
// One size_t slot per processor, all starting at zero.
359 std::vector<std::size_t> will_send_to(num_procs, 0);
// Record, per destination key in `data`, the size of its mapped container.
361 for (
auto & datapair :
data)
364 libmesh_assert_less(datapair.first, num_procs);
// Empty containers are not allowed as map values.
367 libmesh_assert_greater(datapair.second.size(), 0);
369 will_send_to[datapair.first] = datapair.second.size();
// Alias of the same vector, not a copy.
// NOTE(review): presumably the counts are exchanged between ranks in the
// lines not visible here (originals 370-377) -- confirm.
378 auto & will_receive_from = will_send_to;
// True when the slot for proc_id is nonzero; the guarded statement is not
// visible in this view.
382 if (will_receive_from[proc_id])
// Second pass over `data`: deliver each payload to its destination.
393 for (
auto & datapair :
data)
396 libmesh_assert_less(destid, num_procs);
397 auto & datum = datapair.second;
// Data addressed to this rank is passed straight to the functor.
400 if (destid == comm.
rank())
402 act_on_data(destid, datum);
// Otherwise (branch keyword not visible in this view) the payload is sent
// and the send request is appended to the caller's `reqs`.
408 comm.
send(destid, datum, datatype, sendreq, tag);
409 reqs.insert(reqs.end(), sendreq);
// The sender is read from `stat` (presumably the status of a probe made
// in lines not visible here -- confirm).
423 proc_id = cast_int<processor_id_type>(stat.source());
// A fresh buffer is declared for each message; this receive call takes no
// request argument, and the functor is invoked right after it.
425 std::vector<std::vector<ValueType,A1>,A2> received_data;
426 comm.
receive(proc_id, received_data, datatype, tag);
427 act_on_data(proc_id, received_data);
// Definition of the push overload without a request-container parameter.
// It declares a local vector of Request objects.
// NOTE(review): the rest of the body (originals 440-446) is not visible;
// presumably it forwards to the overload taking `reqs` and then waits on
// `requests` -- confirm.
433 template <
typename MapToVectors,
434 typename ActionFunctor>
436 const MapToVectors &
data,
437 ActionFunctor & act_on_data)
439 std::vector<Request> requests;
// Definition of the `MapType`-templated overload without a
// request-container parameter. It declares a local vector of Request
// objects.
// NOTE(review): the leading parameters and the rest of the body are not
// visible; presumably it forwards to the overload taking `reqs` and then
// waits on `requests` -- confirm.
448 template <
template <
typename,
typename,
typename ...>
class MapType,
452 typename ... ExtraTypes,
453 typename ActionFunctor>
456 ActionFunctor & act_on_data)
458 std::vector<Request> requests;
// Definition of the pull overload taking (queries, reqs, gather_data,
// act_on_data, ...); presumably pull_parallel_vector_data -- confirm.
// NOTE(review): the return type, function name, last parameter (original
// line 477), braces, the lambda's parameter list and several statements
// are not visible in this view; comments describe only what is shown.
467 template <
typename datum,
468 typename MapToVectors,
469 typename RequestContainer,
470 typename GatherFunctor,
471 typename ActionFunctor>
473 const MapToVectors & queries,
474 RequestContainer & reqs,
475 GatherFunctor & gather_data,
476 ActionFunctor & act_on_data,
// Type of one query list, i.e. the mapped type of the `queries` container.
479 typedef typename MapToVectors::mapped_type query_type;
// Per-processor storage: responses gathered here for others, and responses
// received from others for our own queries.
481 std::map<processor_id_type, std::vector<datum> >
482 response_data, received_data;
// Requests for the response sends issued inside the lambda below.
483 std::vector<Request> response_reqs;
// Lambda capturing everything by reference; its body uses `pid` and `query`.
// NOTE(review): presumably it is passed as the action functor of a push of
// `queries` in lines not visible here -- confirm.
491 auto gather_functor =
492 [&comm, &gather_data, &response_data, &response_reqs, &datatype, &tag]
// Fill response_data[pid] from the query list via the caller's functor.
495 gather_data(pid,
query, response_data[pid]);
// The response must hold exactly one entry per query entry.
496 libmesh_assert_equal_to(
query.size(), response_data[pid].size());
// Responses for other ranks are sent; the own-rank response stays in
// response_data and is consumed further below.
499 if (pid != comm.
rank())
502 comm.
send(pid, response_data[pid], datatype, sendreq, tag);
503 response_reqs.push_back(sendreq);
// Parallel arrays: receive_procids[i] is the source of receive_reqs[i].
511 std::vector<Request> receive_reqs;
512 std::vector<processor_id_type> receive_procids;
// Walk every processor we sent a query list to.
513 for (
auto & querypair : queries)
516 libmesh_assert_less(proc_id, comm.
size());
// Own-rank queries: act on the locally gathered response directly.
518 if (proc_id == comm.
rank())
520 libmesh_assert(queries.count(proc_id));
521 libmesh_assert_equal_to(queries.at(proc_id).size(),
522 response_data.at(proc_id).size());
523 act_on_data(proc_id, queries.at(proc_id), response_data.at(proc_id));
// Otherwise (branch keyword not visible in this view) set up a receive
// whose buffer has one slot per query entry.
527 auto & querydata = querypair.second;
529 auto & incoming_data = received_data[proc_id];
530 incoming_data.resize(querydata.size());
531 comm.
receive(proc_id, incoming_data, datatype, req, tag);
532 receive_reqs.push_back(req);
533 receive_procids.push_back(proc_id);
// Handle receives one at a time until none are outstanding.
537 while(receive_reqs.size())
// `completed` is used below as an index into both parallel arrays.
539 std::size_t completed =
waitany(receive_reqs);
// NOTE(review): the assignment of proc_id for this iteration (original
// line 540) is not visible; presumably receive_procids[completed].
541 receive_reqs.erase(receive_reqs.begin() + completed);
542 receive_procids.erase(receive_procids.begin() + completed);
// Check the response matches the query list in size, act on it, then drop
// the buffer.
544 libmesh_assert(queries.count(proc_id));
545 libmesh_assert_equal_to(queries.at(proc_id).size(),
546 received_data[proc_id].size());
547 act_on_data(proc_id, queries.at(proc_id), received_data[proc_id]);
548 received_data.erase(proc_id);
// Definition of the pull overload without a request-container parameter.
// It declares a local vector of Request objects and makes a call whose
// trailing arguments are `act_on_data` and `example`.
// NOTE(review): the start of that call (originals 566-567) and anything
// after it are not visible; presumably it forwards to the overload taking
// `reqs` and then waits on `requests` -- confirm.
555 template <
typename datum,
556 typename MapToVectors,
557 typename GatherFunctor,
558 typename ActionFunctor>
560 const MapToVectors & queries,
561 GatherFunctor & gather_data,
562 ActionFunctor & act_on_data,
563 const datum * example)
565 std::vector<Request> requests;
568 act_on_data, example);
// Definition of the pull overload for vector-valued responses: the last
// parameter is an unnamed `const std::vector<datum,A> *`, so it is not
// read by name in the body.
// NOTE(review): the template parameter `A` (original line 575), the return
// type, function name, braces, the declarator on original line 590, the
// lambda's parameter list and several statements are not visible in this
// view; comments describe only what is shown.
574 template <
typename datum,
576 typename MapToVectors,
577 typename RequestContainer,
578 typename GatherFunctor,
579 typename ActionFunctor>
581 const MapToVectors & queries,
582 RequestContainer & reqs,
583 GatherFunctor & gather_data,
584 ActionFunctor & act_on_data,
585 const std::vector<datum,A> *)
// Type of one query list, i.e. the mapped type of the `queries` container.
587 typedef typename MapToVectors::mapped_type query_type;
// Per-processor map whose values are vectors of std::vector<datum,A>
// (variable name is on a line not visible here; presumably response_data).
589 std::map<processor_id_type, std::vector<std::vector<datum,A>>>
// Requests for the response sends issued inside the lambda below.
591 std::vector<Request> response_reqs;
// Lambda capturing everything by reference, including act_on_data; its
// body uses `pid` and `query`.
597 auto gather_functor =
598 [&comm, &gather_data, &act_on_data,
599 &response_data, &response_reqs, &tag]
// Fill response_data[pid] from the query list via the caller's functor.
602 gather_data(pid,
query, response_data[pid]);
// The response must hold exactly one entry per query entry.
603 libmesh_assert_equal_to(
query.size(),
604 response_data[pid].size());
// Own-rank queries are acted on immediately inside the lambda.
607 if (pid == comm.
rank())
609 act_on_data(pid,
query, response_data[pid]);
// Otherwise (branch keyword not visible in this view) the response is sent
// and its request is stored in response_reqs.
614 comm.
send(pid, response_data[pid], sendreq, tag);
615 response_reqs.push_back(sendreq);
629 std::vector<Request> receive_reqs;
630 std::vector<processor_id_type> receive_procids;
// n_queries is the number of entries in `queries` minus the entry for this
// rank, if present. The loop condition is not visible in this view.
631 for (std::size_t i = 0,
632 n_queries = queries.size() - queries.count(comm.
rank());
// The sender is read from `stat` (presumably the status of a probe made
// in lines not visible here -- confirm).
637 proc_id = cast_int<processor_id_type>(stat.source());
// A fresh buffer is declared for each message; this receive call takes no
// request argument.
639 std::vector<std::vector<datum,A>> received_data;
640 comm.
receive(proc_id, received_data, tag);
// The sender must be a processor we queried, and its response must match
// our query list in size, before the functor is invoked.
642 libmesh_assert(queries.count(proc_id));
643 auto & querydata = queries.at(proc_id);
644 libmesh_assert_equal_to(querydata.size(), received_data.size());
645 act_on_data(proc_id, querydata, received_data);
657 #endif // LIBMESH_PARALLEL_SYNC_H void send(const unsigned int dest_processor_id, const T &buf, const MessageTag &tag=no_tag) const
void wait(std::vector< Request > &r)
const unsigned int any_source
processor_id_type size() const
uint8_t processor_id_type
void alltoall(std::vector< T, A > &r) const
MessageTag get_unique_tag(int tagvalue) const
processor_id_type rank() const
void pull_parallel_vector_data(const Communicator &comm, const MapToVectors &queries, RequestContainer &reqs, GatherFunctor &gather_data, ActionFunctor &act_on_data, const datum *example)
status probe(const unsigned int src_processor_id, const MessageTag &tag=any_tag) const
std::size_t waitany(std::vector< Request > &r)
static PetscErrorCode Mat * A
Status receive(const unsigned int dest_processor_id, T &buf, const MessageTag &tag=any_tag) const
void push_parallel_vector_data(const Communicator &comm, const MapToVectors &data, RequestContainer &reqs, ActionFunctor &act_on_data)