41 #include "Tpetra_Distributor.hpp" 42 #include "Tpetra_Details_gathervPrint.hpp" 43 #include "Teuchos_StandardParameterEntryValidators.hpp" 44 #include "Teuchos_VerboseObjectParameterListHelpers.hpp" 52 if (sendType == DISTRIBUTOR_ISEND) {
55 else if (sendType == DISTRIBUTOR_RSEND) {
58 else if (sendType == DISTRIBUTOR_SEND) {
61 else if (sendType == DISTRIBUTOR_SSEND) {
65 TEUCHOS_TEST_FOR_EXCEPTION(
true, std::invalid_argument,
"Invalid " 66 "EDistributorSendType enum value " << sendType <<
".");
74 case Details::DISTRIBUTOR_NOT_INITIALIZED:
75 return "Not initialized yet";
76 case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS:
77 return "By createFromSends";
78 case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS:
79 return "By createFromRecvs";
80 case Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE:
81 return "By createReverseDistributor";
82 case Details::DISTRIBUTOR_INITIALIZED_BY_COPY:
83 return "By copy constructor";
90 Teuchos::Array<std::string>
93 Teuchos::Array<std::string> sendTypes;
94 sendTypes.push_back (
"Isend");
95 sendTypes.push_back (
"Rsend");
96 sendTypes.push_back (
"Send");
97 sendTypes.push_back (
"Ssend");
107 const bool tpetraDistributorDebugDefault =
false;
109 const bool barrierBetween_default =
false;
111 const bool useDistinctTags_default =
true;
114 int Distributor::getTag (
const int pathTag)
const {
115 return useDistinctTags_ ? pathTag : comm_->getTag ();
119 #ifdef TPETRA_DISTRIBUTOR_TIMERS 120 void Distributor::makeTimers () {
121 const std::string name_doPosts3 =
"Tpetra::Distributor: doPosts(3)";
122 const std::string name_doPosts4 =
"Tpetra::Distributor: doPosts(4)";
123 const std::string name_doWaits =
"Tpetra::Distributor: doWaits";
124 const std::string name_doPosts3_recvs =
"Tpetra::Distributor: doPosts(3): recvs";
125 const std::string name_doPosts4_recvs =
"Tpetra::Distributor: doPosts(4): recvs";
126 const std::string name_doPosts3_barrier =
"Tpetra::Distributor: doPosts(3): barrier";
127 const std::string name_doPosts4_barrier =
"Tpetra::Distributor: doPosts(4): barrier";
128 const std::string name_doPosts3_sends =
"Tpetra::Distributor: doPosts(3): sends";
129 const std::string name_doPosts4_sends =
"Tpetra::Distributor: doPosts(4): sends";
131 timer_doPosts3_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3);
132 timer_doPosts4_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4);
133 timer_doWaits_ = Teuchos::TimeMonitor::getNewTimer (name_doWaits);
134 timer_doPosts3_recvs_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_recvs);
135 timer_doPosts4_recvs_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_recvs);
136 timer_doPosts3_barrier_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_barrier);
137 timer_doPosts4_barrier_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_barrier);
138 timer_doPosts3_sends_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_sends);
139 timer_doPosts4_sends_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_sends);
141 #endif // TPETRA_DISTRIBUTOR_TIMERS 144 Distributor::init (
const Teuchos::RCP<
const Teuchos::Comm<int> >& comm,
145 const Teuchos::RCP<Teuchos::FancyOStream>& out,
146 const Teuchos::RCP<Teuchos::ParameterList>& plist)
149 this->out_ = out.is_null () ?
150 Teuchos::getFancyOStream (Teuchos::rcpFromRef (std::cerr)) : out;
151 if (! plist.is_null ()) {
152 this->setParameterList (plist);
155 #ifdef TPETRA_DISTRIBUTOR_TIMERS 157 #endif // TPETRA_DISTRIBUTOR_TIMERS 159 if (verbose || debug_) {
160 TEUCHOS_TEST_FOR_EXCEPTION
161 (out_.is_null (), std::logic_error,
"Tpetra::Distributor::init: " 162 "verbose and/or debug_ are true but out_ (pointer to the output " 163 "stream) is NULL. Please report this bug to the Tpetra developers.");
166 Teuchos::OSTab tab (out_);
167 std::ostringstream os;
168 os << comm_->getRank ()
169 <<
": Distributor ctor done" << std::endl;
176 , howInitialized_ (
Details::DISTRIBUTOR_NOT_INITIALIZED)
177 , sendType_ (
Details::DISTRIBUTOR_SEND)
178 , barrierBetween_ (barrierBetween_default)
179 , debug_ (tpetraDistributorDebugDefault)
180 , selfMessage_ (false)
184 , totalReceiveLength_ (0)
185 , lastRoundBytesSend_ (0)
186 , lastRoundBytesRecv_ (0)
187 , useDistinctTags_ (useDistinctTags_default)
189 init (comm, Teuchos::null, Teuchos::null);
193 const Teuchos::RCP<Teuchos::FancyOStream>& out)
195 , howInitialized_ (
Details::DISTRIBUTOR_NOT_INITIALIZED)
196 , sendType_ (
Details::DISTRIBUTOR_SEND)
197 , barrierBetween_ (barrierBetween_default)
198 , debug_ (tpetraDistributorDebugDefault)
199 , selfMessage_ (false)
203 , totalReceiveLength_ (0)
204 , lastRoundBytesSend_ (0)
205 , lastRoundBytesRecv_ (0)
206 , useDistinctTags_ (useDistinctTags_default)
208 init (comm, out, Teuchos::null);
212 const Teuchos::RCP<Teuchos::ParameterList>& plist)
214 , howInitialized_ (
Details::DISTRIBUTOR_NOT_INITIALIZED)
215 , sendType_ (
Details::DISTRIBUTOR_SEND)
216 , barrierBetween_ (barrierBetween_default)
217 , debug_ (tpetraDistributorDebugDefault)
218 , selfMessage_ (false)
222 , totalReceiveLength_ (0)
223 , lastRoundBytesSend_ (0)
224 , lastRoundBytesRecv_ (0)
225 , useDistinctTags_ (useDistinctTags_default)
227 init (comm, Teuchos::null, plist);
231 const Teuchos::RCP<Teuchos::FancyOStream>& out,
232 const Teuchos::RCP<Teuchos::ParameterList>& plist)
234 , howInitialized_ (
Details::DISTRIBUTOR_NOT_INITIALIZED)
235 , sendType_ (
Details::DISTRIBUTOR_SEND)
236 , barrierBetween_ (barrierBetween_default)
237 , debug_ (tpetraDistributorDebugDefault)
238 , selfMessage_ (false)
242 , totalReceiveLength_ (0)
243 , lastRoundBytesSend_ (0)
244 , lastRoundBytesRecv_ (0)
245 , useDistinctTags_ (useDistinctTags_default)
247 init (comm, out, plist);
251 : comm_ (distributor.comm_)
252 , out_ (distributor.out_)
253 , howInitialized_ (
Details::DISTRIBUTOR_INITIALIZED_BY_COPY)
254 , sendType_ (distributor.sendType_)
255 , barrierBetween_ (distributor.barrierBetween_)
256 , debug_ (distributor.debug_)
257 , selfMessage_ (distributor.selfMessage_)
258 , numSends_ (distributor.numSends_)
259 , procsTo_ (distributor.procsTo_)
260 , startsTo_ (distributor.startsTo_)
261 , lengthsTo_ (distributor.lengthsTo_)
262 , maxSendLength_ (distributor.maxSendLength_)
263 , indicesTo_ (distributor.indicesTo_)
264 , numReceives_ (distributor.numReceives_)
265 , totalReceiveLength_ (distributor.totalReceiveLength_)
266 , lengthsFrom_ (distributor.lengthsFrom_)
267 , procsFrom_ (distributor.procsFrom_)
268 , startsFrom_ (distributor.startsFrom_)
269 , indicesFrom_ (distributor.indicesFrom_)
270 , reverseDistributor_ (distributor.reverseDistributor_)
271 , lastRoundBytesSend_ (distributor.lastRoundBytesSend_)
272 , lastRoundBytesRecv_ (distributor.lastRoundBytesRecv_)
273 , useDistinctTags_ (distributor.useDistinctTags_)
275 using Teuchos::ParameterList;
276 using Teuchos::parameterList;
286 RCP<const ParameterList> rhsList = distributor.getParameterList ();
287 if (! rhsList.is_null ()) {
288 this->setMyParamList (parameterList (* rhsList));
291 #ifdef TPETRA_DISTRIBUTOR_TIMERS 293 #endif // TPETRA_DISTRIBUTOR_TIMERS 296 if (verbose || debug_) {
297 TEUCHOS_TEST_FOR_EXCEPTION
298 (out_.is_null (), std::logic_error,
"Tpetra::Distributor::init: " 299 "verbose and/or debug_ are true but out_ (pointer to the output " 300 "stream) is NULL. Please report this bug to the Tpetra developers.");
301 Teuchos::OSTab tab (out_);
304 std::ostringstream os;
305 os << comm_->getRank ()
306 <<
": Distributor copy ctor done" << std::endl;
312 using Teuchos::ParameterList;
313 using Teuchos::parameterList;
316 std::swap (comm_, rhs.comm_);
317 std::swap (out_, rhs.out_);
318 std::swap (howInitialized_, rhs.howInitialized_);
319 std::swap (sendType_, rhs.sendType_);
320 std::swap (barrierBetween_, rhs.barrierBetween_);
321 std::swap (debug_, rhs.debug_);
322 std::swap (selfMessage_, rhs.selfMessage_);
323 std::swap (numSends_, rhs.numSends_);
324 std::swap (procsTo_, rhs.procsTo_);
325 std::swap (startsTo_, rhs.startsTo_);
326 std::swap (lengthsTo_, rhs.lengthsTo_);
327 std::swap (maxSendLength_, rhs.maxSendLength_);
328 std::swap (indicesTo_, rhs.indicesTo_);
329 std::swap (numReceives_, rhs.numReceives_);
330 std::swap (totalReceiveLength_, rhs.totalReceiveLength_);
331 std::swap (lengthsFrom_, rhs.lengthsFrom_);
332 std::swap (procsFrom_, rhs.procsFrom_);
333 std::swap (startsFrom_, rhs.startsFrom_);
334 std::swap (indicesFrom_, rhs.indicesFrom_);
335 std::swap (reverseDistributor_, rhs.reverseDistributor_);
336 std::swap (lastRoundBytesSend_, rhs.lastRoundBytesSend_);
337 std::swap (lastRoundBytesRecv_, rhs.lastRoundBytesRecv_);
338 std::swap (useDistinctTags_, rhs.useDistinctTags_);
342 RCP<ParameterList> lhsList = this->getNonconstParameterList ();
343 RCP<ParameterList> rhsList = rhs.getNonconstParameterList ();
344 if (lhsList.getRawPtr () == rhsList.getRawPtr () && ! rhsList.is_null ()) {
345 rhsList = parameterList (*rhsList);
347 if (! rhsList.is_null ()) {
348 this->setMyParamList (rhsList);
350 if (! lhsList.is_null ()) {
351 rhs.setMyParamList (lhsList);
380 using Teuchos::FancyOStream;
381 using Teuchos::getIntegralValue;
382 using Teuchos::includesVerbLevel;
383 using Teuchos::OSTab;
384 using Teuchos::ParameterList;
385 using Teuchos::parameterList;
390 plist->validateParametersAndSetDefaults (*validParams);
392 const bool barrierBetween =
393 plist->get<
bool> (
"Barrier between receives and sends");
395 getIntegralValue<Details::EDistributorSendType> (*plist,
"Send type");
396 const bool useDistinctTags = plist->get<
bool> (
"Use distinct tags");
397 const bool debug = plist->get<
bool> (
"Debug");
402 const bool enable_cuda_rdma =
403 plist->get<
bool> (
"Enable MPI CUDA RDMA support");
404 TEUCHOS_TEST_FOR_EXCEPTION
405 (! enable_cuda_rdma, std::invalid_argument,
"Tpetra::Distributor::" 406 "setParameterList: " <<
"You specified \"Enable MPI CUDA RDMA " 407 "support\" = false. This is no longer valid. You don't need to " 408 "specify this option any more; Tpetra assumes it is always true. " 409 "This is a very light assumption on the MPI implementation, and in " 410 "fact does not actually involve hardware or system RDMA support. " 411 "Tpetra just assumes that the MPI implementation can tell whether a " 412 "pointer points to host memory or CUDA device memory.");
419 TEUCHOS_TEST_FOR_EXCEPTION(
420 ! barrierBetween && sendType == Details::DISTRIBUTOR_RSEND,
421 std::invalid_argument,
"Tpetra::Distributor::setParameterList: " << endl
422 <<
"You specified \"Send type\"=\"Rsend\", but turned off the barrier " 423 "between receives and sends." << endl <<
"This is invalid; you must " 424 "include the barrier if you use ready sends." << endl <<
"Ready sends " 425 "require that their corresponding receives have already been posted, " 426 "and the only way to guarantee that in general is with a barrier.");
429 sendType_ = sendType;
430 barrierBetween_ = barrierBetween;
431 useDistinctTags_ = useDistinctTags;
436 this->setMyParamList (plist);
439 Teuchos::RCP<const Teuchos::ParameterList>
442 using Teuchos::Array;
443 using Teuchos::ParameterList;
444 using Teuchos::parameterList;
446 using Teuchos::setStringToIntegralParameter;
448 const bool barrierBetween = barrierBetween_default;
449 const bool useDistinctTags = useDistinctTags_default;
450 const bool debug = tpetraDistributorDebugDefault;
453 const std::string defaultSendType (
"Send");
454 Array<Details::EDistributorSendType> sendTypeEnums;
455 sendTypeEnums.push_back (Details::DISTRIBUTOR_ISEND);
456 sendTypeEnums.push_back (Details::DISTRIBUTOR_RSEND);
457 sendTypeEnums.push_back (Details::DISTRIBUTOR_SEND);
458 sendTypeEnums.push_back (Details::DISTRIBUTOR_SSEND);
460 RCP<ParameterList> plist = parameterList (
"Tpetra::Distributor");
461 plist->set (
"Barrier between receives and sends", barrierBetween,
462 "Whether to execute a barrier between receives and sends in do" 463 "[Reverse]Posts(). Required for correctness when \"Send type\"" 464 "=\"Rsend\", otherwise correct but not recommended.");
465 setStringToIntegralParameter<Details::EDistributorSendType> (
"Send type",
466 defaultSendType,
"When using MPI, the variant of send to use in " 467 "do[Reverse]Posts()", sendTypes(), sendTypeEnums(), plist.getRawPtr());
468 plist->set (
"Use distinct tags", useDistinctTags,
"Whether to use distinct " 469 "MPI message tags for different code paths. Highly recommended" 470 " to avoid message collisions.");
471 plist->set (
"Debug", debug,
"Whether to print copious debugging output on " 473 plist->set (
"Enable MPI CUDA RDMA support",
true,
"Assume that MPI can " 474 "tell whether a pointer points to host memory or CUDA device " 475 "memory. You don't need to specify this option any more; " 476 "Tpetra assumes it is always true. This is a very light " 477 "assumption on the MPI implementation, and in fact does not " 478 "actually involve hardware or system RDMA support.");
486 Teuchos::setupVerboseObjectSublist (&*plist);
487 return Teuchos::rcp_const_cast<
const ParameterList> (plist);
492 {
return totalReceiveLength_; }
495 {
return numReceives_; }
498 {
return selfMessage_; }
501 {
return numSends_; }
504 {
return maxSendLength_; }
507 {
return procsFrom_; }
510 {
return lengthsFrom_; }
516 {
return lengthsTo_; }
518 Teuchos::RCP<Distributor>
520 if (reverseDistributor_.is_null ()) {
521 createReverseDistributor ();
523 TEUCHOS_TEST_FOR_EXCEPTION
524 (reverseDistributor_.is_null (), std::logic_error,
"The reverse " 525 "Distributor is null after createReverseDistributor returned. " 526 "Please report this bug to the Tpetra developers.");
527 return reverseDistributor_;
532 Distributor::createReverseDistributor()
const 534 reverseDistributor_ = Teuchos::rcp (
new Distributor (comm_, out_));
535 reverseDistributor_->howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE;
536 reverseDistributor_->sendType_ = sendType_;
537 reverseDistributor_->barrierBetween_ = barrierBetween_;
538 reverseDistributor_->debug_ = debug_;
543 size_t totalSendLength =
544 std::accumulate (lengthsTo_.begin(), lengthsTo_.end(), 0);
549 size_t maxReceiveLength = 0;
550 const int myProcID = comm_->getRank();
551 for (
size_t i=0; i < numReceives_; ++i) {
552 if (procsFrom_[i] != myProcID) {
554 if (lengthsFrom_[i] > maxReceiveLength) {
555 maxReceiveLength = lengthsFrom_[i];
564 reverseDistributor_->selfMessage_ = selfMessage_;
565 reverseDistributor_->numSends_ = numReceives_;
566 reverseDistributor_->procsTo_ = procsFrom_;
567 reverseDistributor_->startsTo_ = startsFrom_;
568 reverseDistributor_->lengthsTo_ = lengthsFrom_;
569 reverseDistributor_->maxSendLength_ = maxReceiveLength;
570 reverseDistributor_->indicesTo_ = indicesFrom_;
571 reverseDistributor_->numReceives_ = numSends_;
572 reverseDistributor_->totalReceiveLength_ = totalSendLength;
573 reverseDistributor_->lengthsFrom_ = lengthsTo_;
574 reverseDistributor_->procsFrom_ = procsTo_;
575 reverseDistributor_->startsFrom_ = startsTo_;
576 reverseDistributor_->indicesFrom_ = indicesTo_;
585 reverseDistributor_->lastRoundBytesSend_ = 0;
586 reverseDistributor_->lastRoundBytesRecv_ = 0;
588 reverseDistributor_->useDistinctTags_ = useDistinctTags_;
601 reverseDistributor_->reverseDistributor_ = Teuchos::null;
606 using Teuchos::Array;
607 using Teuchos::CommRequest;
608 using Teuchos::FancyOStream;
609 using Teuchos::includesVerbLevel;
610 using Teuchos::is_null;
611 using Teuchos::OSTab;
613 using Teuchos::waitAll;
616 Teuchos::OSTab tab (out_);
618 #ifdef TPETRA_DISTRIBUTOR_TIMERS 619 Teuchos::TimeMonitor timeMon (*timer_doWaits_);
620 #endif // TPETRA_DISTRIBUTOR_TIMERS 622 const int myRank = comm_->getRank ();
626 std::ostringstream os;
627 os << myRank <<
": doWaits: # reqs = " 628 << requests_.size () << endl;
632 if (requests_.size() > 0) {
633 waitAll (*comm_, requests_());
635 #ifdef HAVE_TEUCHOS_DEBUG 637 for (Array<RCP<CommRequest<int> > >::const_iterator it = requests_.begin();
638 it != requests_.end(); ++it)
640 TEUCHOS_TEST_FOR_EXCEPTION( ! is_null (*it), std::runtime_error,
641 Teuchos::typeName(*
this) <<
"::doWaits(): Communication requests " 642 "should all be null aftr calling Teuchos::waitAll() on them, but " 643 "at least one request is not null.");
645 #endif // HAVE_TEUCHOS_DEBUG 648 requests_.resize (0);
651 #ifdef HAVE_TEUCHOS_DEBUG 653 const int localSizeNonzero = (requests_.size () != 0) ? 1 : 0;
654 int globalSizeNonzero = 0;
655 Teuchos::reduceAll<int, int> (*comm_, Teuchos::REDUCE_MAX,
657 Teuchos::outArg (globalSizeNonzero));
658 TEUCHOS_TEST_FOR_EXCEPTION(
659 globalSizeNonzero != 0, std::runtime_error,
660 "Tpetra::Distributor::doWaits: After waitAll, at least one process has " 661 "a nonzero number of outstanding posts. There should be none at this " 662 "point. Please report this bug to the Tpetra developers.");
664 #endif // HAVE_TEUCHOS_DEBUG 667 std::ostringstream os;
668 os << myRank <<
": doWaits done" << endl;
675 if (! reverseDistributor_.is_null()) {
676 reverseDistributor_->doWaits();
681 std::ostringstream out;
683 out <<
"\"Tpetra::Distributor\": {";
684 const std::string label = this->getObjectLabel ();
686 out <<
"Label: " << label <<
", ";
688 out <<
"How initialized: " 692 << DistributorSendTypeEnumToString (sendType_)
693 <<
", Barrier between receives and sends: " 694 << (barrierBetween_ ?
"true" :
"false")
695 <<
", Use distinct tags: " 696 << (useDistinctTags_ ?
"true" :
"false")
697 <<
", Debug: " << (debug_ ?
"true" :
"false")
704 localDescribeToString (
const Teuchos::EVerbosityLevel vl)
const 706 using Teuchos::toString;
707 using Teuchos::VERB_HIGH;
708 using Teuchos::VERB_EXTREME;
712 if (vl <= Teuchos::VERB_LOW || comm_.is_null ()) {
713 return std::string ();
716 auto outStringP = Teuchos::rcp (
new std::ostringstream ());
717 auto outp = Teuchos::getFancyOStream (outStringP);
718 Teuchos::FancyOStream& out = *outp;
720 const int myRank = comm_->getRank ();
721 const int numProcs = comm_->getSize ();
722 out <<
"Process " << myRank <<
" of " << numProcs <<
":" << endl;
723 Teuchos::OSTab tab1 (out);
727 if (vl == VERB_HIGH || vl == VERB_EXTREME) {
728 out <<
"procsTo: " << toString (procsTo_) << endl;
729 out <<
"lengthsTo: " << toString (lengthsTo_) << endl;
732 if (vl == VERB_EXTREME) {
733 out <<
"startsTo: " << toString (startsTo_) << endl;
734 out <<
"indicesTo: " << toString (indicesTo_) << endl;
736 if (vl == VERB_HIGH || vl == VERB_EXTREME) {
739 out <<
"lengthsFrom: " << toString (lengthsFrom_) << endl;
740 out <<
"startsFrom: " << toString (startsFrom_) << endl;
741 out <<
"procsFrom: " << toString (procsFrom_) << endl;
745 return outStringP->str ();
751 const Teuchos::EVerbosityLevel verbLevel)
const 754 using Teuchos::VERB_DEFAULT;
755 using Teuchos::VERB_NONE;
756 using Teuchos::VERB_LOW;
757 using Teuchos::VERB_MEDIUM;
758 using Teuchos::VERB_HIGH;
759 using Teuchos::VERB_EXTREME;
760 const Teuchos::EVerbosityLevel vl =
761 (verbLevel == VERB_DEFAULT) ? VERB_LOW : verbLevel;
763 if (vl == VERB_NONE) {
771 if (comm_.is_null ()) {
774 const int myRank = comm_->getRank ();
775 const int numProcs = comm_->getSize ();
784 Teuchos::RCP<Teuchos::OSTab> tab0, tab1;
790 tab0 = Teuchos::rcp (
new Teuchos::OSTab (out));
793 out <<
"\"Tpetra::Distributor\":" << endl;
794 tab1 = Teuchos::rcp (
new Teuchos::OSTab (out));
796 const std::string label = this->getObjectLabel ();
798 out <<
"Label: " << label << endl;
800 out <<
"Number of processes: " << numProcs << endl
801 <<
"How initialized: " 805 out <<
"Parameters: " << endl;
806 Teuchos::OSTab tab2 (out);
807 out <<
"\"Send type\": " 808 << DistributorSendTypeEnumToString (sendType_) << endl
809 <<
"\"Barrier between receives and sends\": " 810 << (barrierBetween_ ?
"true" :
"false") << endl
811 <<
"\"Use distinct tags\": " 812 << (useDistinctTags_ ?
"true" :
"false") << endl
813 <<
"\"Debug\": " << (debug_ ?
"true" :
"false") << endl;
819 const std::string lclStr = this->localDescribeToString (vl);
823 out <<
"Reverse Distributor:";
824 if (reverseDistributor_.is_null ()) {
825 out <<
" null" << endl;
829 reverseDistributor_->describe (out, vl);
834 Distributor::computeReceives ()
836 using Teuchos::Array;
837 using Teuchos::ArrayRCP;
839 using Teuchos::CommStatus;
840 using Teuchos::CommRequest;
841 using Teuchos::ireceive;
844 using Teuchos::REDUCE_SUM;
845 using Teuchos::receive;
846 using Teuchos::reduce;
847 using Teuchos::scatter;
849 using Teuchos::waitAll;
852 Teuchos::OSTab tab (out_);
853 const int myRank = comm_->getRank();
854 const int numProcs = comm_->getSize();
857 const int pathTag = 2;
858 const int tag = this->getTag (pathTag);
862 std::ostringstream os;
863 os << myRank <<
": computeReceives: " 864 "{selfMessage_: " << (selfMessage_ ?
"true" :
"false")
865 <<
", tag: " << tag <<
"}" << endl;
875 Array<int> toProcsFromMe (numProcs, 0);
876 #ifdef HAVE_TEUCHOS_DEBUG 877 bool counting_error =
false;
878 #endif // HAVE_TEUCHOS_DEBUG 879 for (
size_t i = 0; i < (numSends_ + (selfMessage_ ? 1 : 0)); ++i) {
880 #ifdef HAVE_TEUCHOS_DEBUG 881 if (toProcsFromMe[procsTo_[i]] != 0) {
882 counting_error =
true;
884 #endif // HAVE_TEUCHOS_DEBUG 885 toProcsFromMe[procsTo_[i]] = 1;
887 #ifdef HAVE_TEUCHOS_DEBUG 889 "Tpetra::Distributor::computeReceives: There was an error on at least " 890 "one process in counting the number of messages send by that process to " 891 "the other processs. Please report this bug to the Tpetra developers.",
893 #endif // HAVE_TEUCHOS_DEBUG 896 std::ostringstream os;
897 os << myRank <<
": computeReceives: Calling reduce and scatter" << endl;
954 Array<int> numRecvsOnEachProc;
955 if (myRank == root) {
956 numRecvsOnEachProc.resize (numProcs);
958 int numReceivesAsInt = 0;
959 reduce<int, int> (toProcsFromMe.getRawPtr (),
960 numRecvsOnEachProc.getRawPtr (),
961 numProcs, REDUCE_SUM, root, *comm_);
962 scatter<int, int> (numRecvsOnEachProc.getRawPtr (), 1,
963 &numReceivesAsInt, 1, root, *comm_);
964 numReceives_ =
static_cast<size_t> (numReceivesAsInt);
970 lengthsFrom_.assign (numReceives_, 0);
971 procsFrom_.assign (numReceives_, 0);
987 const size_t actualNumReceives = numReceives_ - (selfMessage_ ? 1 : 0);
993 Array<RCP<CommRequest<int> > > requests (actualNumReceives);
994 Array<ArrayRCP<size_t> > lengthsFromBuffers (actualNumReceives);
995 Array<RCP<CommStatus<int> > > statuses (actualNumReceives);
1000 const int anySourceProc = MPI_ANY_SOURCE;
1002 const int anySourceProc = -1;
1006 std::ostringstream os;
1007 os << myRank <<
": computeReceives: Posting " 1008 << actualNumReceives <<
" irecvs" << endl;
1013 for (
size_t i = 0; i < actualNumReceives; ++i) {
1018 lengthsFromBuffers[i].resize (1);
1019 lengthsFromBuffers[i][0] = as<size_t> (0);
1020 requests[i] = ireceive<int, size_t> (lengthsFromBuffers[i], anySourceProc, tag, *comm_);
1022 std::ostringstream os;
1023 os << myRank <<
": computeReceives: " 1024 "Posted any-proc irecv w/ specified tag " << tag << endl;
1030 std::ostringstream os;
1031 os << myRank <<
": computeReceives: " 1032 "posting " << numSends_ <<
" sends" << endl;
1043 for (
size_t i = 0; i < numSends_ + (selfMessage_ ? 1 : 0); ++i) {
1044 if (procsTo_[i] != myRank) {
1048 const size_t*
const lengthsTo_i = &lengthsTo_[i];
1049 send<int, size_t> (lengthsTo_i, 1, as<int> (procsTo_[i]), tag, *comm_);
1051 std::ostringstream os;
1052 os << myRank <<
": computeReceives: " 1053 "Posted send to Proc " << procsTo_[i] <<
" w/ specified tag " 1065 lengthsFrom_[numReceives_-1] = lengthsTo_[i];
1066 procsFrom_[numReceives_-1] = myRank;
1071 std::ostringstream os;
1072 os << myRank <<
": computeReceives: waitAll on " 1073 << requests.size () <<
" requests" << endl;
1082 waitAll (*comm_, requests (), statuses ());
1083 for (
size_t i = 0; i < actualNumReceives; ++i) {
1084 lengthsFrom_[i] = *lengthsFromBuffers[i];
1085 procsFrom_[i] = statuses[i]->getSourceRank ();
1091 sort2 (procsFrom_.begin(), procsFrom_.end(), lengthsFrom_.begin());
1094 totalReceiveLength_ = std::accumulate (lengthsFrom_.begin(), lengthsFrom_.end(), 0);
1095 indicesFrom_.clear ();
1096 indicesFrom_.reserve (totalReceiveLength_);
1097 for (
size_t i = 0; i < totalReceiveLength_; ++i) {
1098 indicesFrom_.push_back(i);
1101 startsFrom_.clear ();
1102 startsFrom_.reserve (numReceives_);
1103 for (
size_t i = 0, j = 0; i < numReceives_; ++i) {
1104 startsFrom_.push_back(j);
1105 j += lengthsFrom_[i];
1113 std::ostringstream os;
1114 os << myRank <<
": computeReceives: done" << endl;
1122 using Teuchos::outArg;
1123 using Teuchos::REDUCE_MAX;
1124 using Teuchos::reduceAll;
1127 Teuchos::OSTab tab (out_);
1128 const size_t numExports = exportProcIDs.size();
1129 const int myProcID = comm_->getRank();
1130 const int numProcs = comm_->getSize();
1133 std::ostringstream os;
1134 os << myProcID <<
": createFromSends" << endl;
1186 Teuchos::Array<size_t> starts (numProcs + 1, 0);
1189 size_t numActive = 0;
1190 int needSendBuff = 0;
1192 #ifdef HAVE_TPETRA_DEBUG 1194 #endif // HAVE_TPETRA_DEBUG 1195 for (
size_t i = 0; i < numExports; ++i) {
1196 const int exportID = exportProcIDs[i];
1197 if (exportID >= numProcs) {
1198 #ifdef HAVE_TPETRA_DEBUG 1200 #endif // HAVE_TPETRA_DEBUG 1203 else if (exportID >= 0) {
1217 if (needSendBuff==0 && starts[exportID] > 1 && exportID != exportProcIDs[i-1]) {
1224 #ifdef HAVE_TPETRA_DEBUG 1231 reduceAll<int, int> (*comm_, REDUCE_MAX, badID, outArg (gbl_badID));
1232 TEUCHOS_TEST_FOR_EXCEPTION(gbl_badID >= 0, std::runtime_error,
1233 Teuchos::typeName(*
this) <<
"::createFromSends(): Process " << gbl_badID
1234 <<
", perhaps among other processes, got a bad send process ID.");
1249 #endif // HAVE_TPETRA_DEBUG 1251 #if defined(HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS) || defined(HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS) 1253 int global_needSendBuff;
1254 reduceAll<int, int> (*comm_, REDUCE_MAX, needSendBuff,
1255 outArg (global_needSendBuff));
1257 global_needSendBuff != 0, std::runtime_error,
1258 "::createFromSends: Grouping export IDs together by process rank often " 1259 "improves performance.");
1265 if (starts[myProcID] != 0) {
1266 selfMessage_ =
true;
1269 selfMessage_ =
false;
1272 #ifdef HAVE_TEUCHOS_DEBUG 1273 bool index_neq_numActive =
false;
1274 bool send_neq_numSends =
false;
1276 if (! needSendBuff) {
1281 for (
int i = 0; i < numProcs; ++i) {
1289 indicesTo_.resize(0);
1292 procsTo_.assign(numSends_,0);
1293 startsTo_.assign(numSends_,0);
1294 lengthsTo_.assign(numSends_,0);
1301 size_t index = 0, procIndex = 0;
1302 for (
size_t i = 0; i < numSends_; ++i) {
1303 while (exportProcIDs[procIndex] < 0) {
1306 startsTo_[i] = procIndex;
1307 int procID = exportProcIDs[procIndex];
1308 procsTo_[i] = procID;
1309 index += starts[procID];
1310 procIndex += starts[procID];
1312 #ifdef HAVE_TEUCHOS_DEBUG 1313 if (index != numActive) {
1314 index_neq_numActive =
true;
1320 if (numSends_ > 0) {
1321 sort2(procsTo_.begin(), procsTo_.end(), startsTo_.begin());
1325 for (
size_t i = 0; i < numSends_; ++i) {
1326 int procID = procsTo_[i];
1327 lengthsTo_[i] = starts[procID];
1328 if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
1329 maxSendLength_ = lengthsTo_[i];
1340 if (starts[0] == 0 ) {
1346 for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
1348 i != starts.end(); ++i)
1350 if (*i != 0) ++numSends_;
1356 for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
1357 i=starts.rbegin()+1;
1358 i != starts.rend(); ++i)
1367 indicesTo_.resize(numActive);
1369 for (
size_t i = 0; i < numExports; ++i) {
1370 if (exportProcIDs[i] >= 0) {
1372 indicesTo_[starts[exportProcIDs[i]]] = i;
1374 ++starts[exportProcIDs[i]];
1386 for (
int proc = numProcs-1; proc != 0; --proc) {
1387 starts[proc] = starts[proc-1];
1390 starts[numProcs] = numActive;
1397 procsTo_.resize(numSends_);
1398 startsTo_.resize(numSends_);
1399 lengthsTo_.resize(numSends_);
1406 for (
int proc = 0; proc < numProcs; ++proc ) {
1407 if (starts[proc+1] != starts[proc]) {
1408 lengthsTo_[snd] = starts[proc+1] - starts[proc];
1409 startsTo_[snd] = starts[proc];
1411 if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
1412 maxSendLength_ = lengthsTo_[snd];
1414 procsTo_[snd] = proc;
1418 #ifdef HAVE_TEUCHOS_DEBUG 1419 if (snd != numSends_) {
1420 send_neq_numSends =
true;
1424 #ifdef HAVE_TEUCHOS_DEBUG 1426 "Tpetra::Distributor::createFromSends: logic error. Please notify the Tpetra team.",*comm_);
1428 "Tpetra::Distributor::createFromSends: logic error. Please notify the Tpetra team.",*comm_);
1431 if (selfMessage_) --numSends_;
1437 std::ostringstream os;
1438 os << myProcID <<
": createFromSends: done" << endl;
1444 howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS;
1446 return totalReceiveLength_;
1452 const Teuchos::ArrayView<const int>& remoteProcIDs)
1461 howInitialized_ = Tpetra::Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS;
1464 int myProcID = comm_->getRank ();
1465 int numProcs = comm_->getSize();
1467 const size_t numExportIDs = exportProcIDs.size();
1468 Teuchos::Array<size_t> starts (numProcs + 1, 0);
1470 size_t numActive = 0;
1471 int needSendBuff = 0;
1473 for(
size_t i = 0; i < numExportIDs; i++ )
1475 if( needSendBuff==0 && i && (exportProcIDs[i] < exportProcIDs[i-1]) )
1477 if( exportProcIDs[i] >= 0 )
1479 ++starts[ exportProcIDs[i] ];
1484 selfMessage_ = ( starts[myProcID] != 0 ) ? 1 : 0;
1490 if (starts[0] == 0 ) {
1496 for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
1498 i != starts.end(); ++i)
1500 if (*i != 0) ++numSends_;
1506 for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
1507 i=starts.rbegin()+1;
1508 i != starts.rend(); ++i)
1517 indicesTo_.resize(numActive);
1519 for (
size_t i = 0; i < numExportIDs; ++i) {
1520 if (exportProcIDs[i] >= 0) {
1522 indicesTo_[starts[exportProcIDs[i]]] = i;
1524 ++starts[exportProcIDs[i]];
1527 for (
int proc = numProcs-1; proc != 0; --proc) {
1528 starts[proc] = starts[proc-1];
1531 starts[numProcs] = numActive;
1532 procsTo_.resize(numSends_);
1533 startsTo_.resize(numSends_);
1534 lengthsTo_.resize(numSends_);
1537 for (
int proc = 0; proc < numProcs; ++proc ) {
1538 if (starts[proc+1] != starts[proc]) {
1539 lengthsTo_[snd] = starts[proc+1] - starts[proc];
1540 startsTo_[snd] = starts[proc];
1542 if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
1543 maxSendLength_ = lengthsTo_[snd];
1545 procsTo_[snd] = proc;
1555 for (
int i = 0; i < numProcs; ++i) {
1563 indicesTo_.resize(0);
1566 procsTo_.assign(numSends_,0);
1567 startsTo_.assign(numSends_,0);
1568 lengthsTo_.assign(numSends_,0);
1575 size_t index = 0, procIndex = 0;
1576 for (
size_t i = 0; i < numSends_; ++i) {
1577 while (exportProcIDs[procIndex] < 0) {
1580 startsTo_[i] = procIndex;
1581 int procID = exportProcIDs[procIndex];
1582 procsTo_[i] = procID;
1583 index += starts[procID];
1584 procIndex += starts[procID];
1589 if (numSends_ > 0) {
1590 sort2(procsTo_.begin(), procsTo_.end(), startsTo_.begin());
1594 for (
size_t i = 0; i < numSends_; ++i) {
1595 int procID = procsTo_[i];
1596 lengthsTo_[i] = starts[procID];
1597 if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
1598 maxSendLength_ = lengthsTo_[i];
1604 numSends_ -= selfMessage_;
1605 std::vector<int> recv_list;
1606 recv_list.reserve(numSends_);
1609 for(
int i=0; i<remoteProcIDs.size(); i++) {
1610 if(remoteProcIDs[i]>last_pid) {
1611 recv_list.push_back(remoteProcIDs[i]);
1612 last_pid = remoteProcIDs[i];
1614 else if (remoteProcIDs[i]<last_pid)
1615 throw std::runtime_error(
"Tpetra::Distributor:::createFromSendsAndRecvs expected RemotePIDs to be in sorted order");
1617 numReceives_ = recv_list.size();
1619 procsFrom_.assign(numReceives_,0);
1620 lengthsFrom_.assign(numReceives_,0);
1621 indicesFrom_.assign(numReceives_,0);
1622 startsFrom_.assign(numReceives_,0);
1624 for(
size_t i=0,j=0; i<numReceives_; ++i) {
1626 procsFrom_[i] = recv_list[i];
1628 for( ; j<(size_t)remoteProcIDs.size() &&
1629 remoteProcIDs[jlast]==remoteProcIDs[j] ; j++){;}
1630 lengthsFrom_[i] = j-jlast;
1632 totalReceiveLength_ = remoteProcIDs.size();
1633 indicesFrom_.clear ();
1634 indicesFrom_.reserve (totalReceiveLength_);
1635 for (
size_t i = 0; i < totalReceiveLength_; ++i) {
1636 indicesFrom_.push_back(i);
1639 numReceives_-=selfMessage_;
Namespace Tpetra contains the class and methods constituting the Tpetra library.
Teuchos::RCP< Distributor > getReverse() const
A reverse communication plan Distributor.
EDistributorHowInitialized
Enum indicating how and whether a Distributor was initialized.
Teuchos::RCP< const Teuchos::ParameterList > getValidParameters() const
List of valid Distributor parameters.
size_t getTotalReceiveLength() const
Total number of values this process will receive from other processes.
size_t getMaxSendLength() const
Maximum number of values this process will send to another single process.
void swap(Distributor &rhs)
Swap the contents of rhs with those of *this.
std::string DistributorSendTypeEnumToString(EDistributorSendType sendType)
Convert an EDistributorSendType enum value to a string.
Teuchos::ArrayView< const size_t > getLengthsFrom() const
Number of values this process will receive from each process.
size_t getNumReceives() const
The number of processes from which we will receive data.
size_t getNumSends() const
The number of processes to which we will send data.
Implementation details of Tpetra.
size_t createFromSends(const Teuchos::ArrayView< const int > &exportProcIDs)
Set up Distributor using list of process ranks to which this process will send.
void gathervPrint(std::ostream &out, const std::string &s, const Teuchos::Comm< int > &comm)
On Process 0 in the given communicator, print strings from each process in that communicator, in rank order.
Teuchos::ArrayView< const size_t > getLengthsTo() const
Number of values this process will send to each process.
void describe(Teuchos::FancyOStream &out, const Teuchos::EVerbosityLevel verbLevel=Teuchos::Describable::verbLevel_default) const
Describe this object in a human-readable way to the given output stream.
void createFromSendsAndRecvs(const Teuchos::ArrayView< const int > &exportProcIDs, const Teuchos::ArrayView< const int > &remoteProcIDs)
Set up Distributor using list of process ranks to which to send, and list of process ranks from which to receive.
Teuchos::ArrayView< const int > getProcsFrom() const
Ranks of the processes sending values to this process.
Sets up and executes a communication plan for a Tpetra DistObject.
static bool verbose()
Whether Tpetra is in verbose mode.
void setParameterList(const Teuchos::RCP< Teuchos::ParameterList > &plist)
Set Distributor parameters.
#define TPETRA_EFFICIENCY_WARNING(throw_exception_test, Exception, msg)
Print or throw an efficiency warning.
Teuchos::ArrayView< const int > getProcsTo() const
Ranks of the processes to which this process will send values.
virtual ~Distributor()
Destructor (virtual for memory safety).
Teuchos::Array< std::string > distributorSendTypes()
Valid values for Distributor's "Send type" parameter.
void sort2(const IT1 &first1, const IT1 &last1, const IT2 &first2)
Sort the first array, and apply the resulting permutation to the second array.
std::string DistributorHowInitializedEnumToString(EDistributorHowInitialized how)
Convert an EDistributorHowInitialized enum value to a string.
std::string description() const
Return a one-line description of this object.
#define SHARED_TEST_FOR_EXCEPTION(throw_exception_test, Exception, msg, comm)
Test for exception, with reduction over the given communicator.
bool hasSelfMessage() const
Whether the calling process will send or receive messages to itself.
EDistributorSendType
The type of MPI send that Distributor should use.
Distributor(const Teuchos::RCP< const Teuchos::Comm< int > > &comm)
Construct using the specified communicator and default parameters.