Tpetra parallel linear algebra  Version of the Day
Tpetra_Distributor.cpp
1 // ***********************************************************************
2 //
3 // Tpetra: Templated Linear Algebra Services Package
4 // Copyright (2008) Sandia Corporation
5 //
6 // Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
7 // the U.S. Government retains certain rights in this software.
8 //
9 // Redistribution and use in source and binary forms, with or without
10 // modification, are permitted provided that the following conditions are
11 // met:
12 //
13 // 1. Redistributions of source code must retain the above copyright
14 // notice, this list of conditions and the following disclaimer.
15 //
16 // 2. Redistributions in binary form must reproduce the above copyright
17 // notice, this list of conditions and the following disclaimer in the
18 // documentation and/or other materials provided with the distribution.
19 //
20 // 3. Neither the name of the Corporation nor the names of the
21 // contributors may be used to endorse or promote products derived from
22 // this software without specific prior written permission.
23 //
24 // THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
25 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
27 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
28 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
29 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
30 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
31 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
32 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
33 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
34 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 //
36 // Questions? Contact Michael A. Heroux (maherou@sandia.gov)
37 //
38 // ************************************************************************
39 // @HEADER
40 
41 #include "Tpetra_Distributor.hpp"
42 #include "Tpetra_Details_gathervPrint.hpp"
43 #include "Teuchos_StandardParameterEntryValidators.hpp"
44 #include "Teuchos_VerboseObjectParameterListHelpers.hpp"
45 #include <numeric>
46 
47 namespace Tpetra {
48  namespace Details {
49  std::string
51  {
52  if (sendType == DISTRIBUTOR_ISEND) {
53  return "Isend";
54  }
55  else if (sendType == DISTRIBUTOR_RSEND) {
56  return "Rsend";
57  }
58  else if (sendType == DISTRIBUTOR_SEND) {
59  return "Send";
60  }
61  else if (sendType == DISTRIBUTOR_SSEND) {
62  return "Ssend";
63  }
64  else {
65  TEUCHOS_TEST_FOR_EXCEPTION(true, std::invalid_argument, "Invalid "
66  "EDistributorSendType enum value " << sendType << ".");
67  }
68  }
69 
70  std::string
72  {
73  switch (how) {
74  case Details::DISTRIBUTOR_NOT_INITIALIZED:
75  return "Not initialized yet";
76  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS:
77  return "By createFromSends";
78  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS:
79  return "By createFromRecvs";
80  case Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE:
81  return "By createReverseDistributor";
82  case Details::DISTRIBUTOR_INITIALIZED_BY_COPY:
83  return "By copy constructor";
84  default:
85  return "INVALID";
86  }
87  }
88  } // namespace Details
89 
90  Teuchos::Array<std::string>
92  {
93  Teuchos::Array<std::string> sendTypes;
94  sendTypes.push_back ("Isend");
95  sendTypes.push_back ("Rsend");
96  sendTypes.push_back ("Send");
97  sendTypes.push_back ("Ssend");
98  return sendTypes;
99  }
100 
101  // We set default values of Distributor's Boolean parameters here,
102  // in this one place. That way, if we want to change the default
103  // value of a parameter, we don't have to search the whole file to
104  // ensure a consistent setting.
namespace {
  // "Debug" parameter: off unless the user asks for it.
  constexpr bool tpetraDistributorDebugDefault = false;

  // "Barrier between receives and sends" parameter: off by default.
  constexpr bool barrierBetween_default = false;

  // "Use distinct tags" parameter: on by default.
  constexpr bool useDistinctTags_default = true;
} // namespace (anonymous)
113 
114  int Distributor::getTag (const int pathTag) const {
115  return useDistinctTags_ ? pathTag : comm_->getTag ();
116  }
117 
118 
119 #ifdef TPETRA_DISTRIBUTOR_TIMERS
120  void Distributor::makeTimers () {
121  const std::string name_doPosts3 = "Tpetra::Distributor: doPosts(3)";
122  const std::string name_doPosts4 = "Tpetra::Distributor: doPosts(4)";
123  const std::string name_doWaits = "Tpetra::Distributor: doWaits";
124  const std::string name_doPosts3_recvs = "Tpetra::Distributor: doPosts(3): recvs";
125  const std::string name_doPosts4_recvs = "Tpetra::Distributor: doPosts(4): recvs";
126  const std::string name_doPosts3_barrier = "Tpetra::Distributor: doPosts(3): barrier";
127  const std::string name_doPosts4_barrier = "Tpetra::Distributor: doPosts(4): barrier";
128  const std::string name_doPosts3_sends = "Tpetra::Distributor: doPosts(3): sends";
129  const std::string name_doPosts4_sends = "Tpetra::Distributor: doPosts(4): sends";
130 
131  timer_doPosts3_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3);
132  timer_doPosts4_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4);
133  timer_doWaits_ = Teuchos::TimeMonitor::getNewTimer (name_doWaits);
134  timer_doPosts3_recvs_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_recvs);
135  timer_doPosts4_recvs_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_recvs);
136  timer_doPosts3_barrier_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_barrier);
137  timer_doPosts4_barrier_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_barrier);
138  timer_doPosts3_sends_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_sends);
139  timer_doPosts4_sends_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_sends);
140  }
141 #endif // TPETRA_DISTRIBUTOR_TIMERS
142 
143  void
144  Distributor::init (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
145  const Teuchos::RCP<Teuchos::FancyOStream>& out,
146  const Teuchos::RCP<Teuchos::ParameterList>& plist)
147  {
148  const bool verbose = Tpetra::Details::Behavior::verbose("Distributor");
149  this->out_ = out.is_null () ?
150  Teuchos::getFancyOStream (Teuchos::rcpFromRef (std::cerr)) : out;
151  if (! plist.is_null ()) {
152  this->setParameterList (plist);
153  }
154 
155 #ifdef TPETRA_DISTRIBUTOR_TIMERS
156  makeTimers ();
157 #endif // TPETRA_DISTRIBUTOR_TIMERS
158 
159  if (verbose || debug_) {
160  TEUCHOS_TEST_FOR_EXCEPTION
161  (out_.is_null (), std::logic_error, "Tpetra::Distributor::init: "
162  "verbose and/or debug_ are true but out_ (pointer to the output "
163  "stream) is NULL. Please report this bug to the Tpetra developers.");
164  }
165  if (verbose) {
166  Teuchos::OSTab tab (out_);
167  std::ostringstream os;
168  os << comm_->getRank ()
169  << ": Distributor ctor done" << std::endl;
170  *out_ << os.str ();
171  }
172  }
173 
174  Distributor::Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm)
175  : comm_ (comm)
176  , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
177  , sendType_ (Details::DISTRIBUTOR_SEND)
178  , barrierBetween_ (barrierBetween_default)
179  , debug_ (tpetraDistributorDebugDefault)
180  , selfMessage_ (false)
181  , numSends_ (0)
182  , maxSendLength_ (0)
183  , numReceives_ (0)
184  , totalReceiveLength_ (0)
185  , lastRoundBytesSend_ (0)
186  , lastRoundBytesRecv_ (0)
187  , useDistinctTags_ (useDistinctTags_default)
188  {
189  init (comm, Teuchos::null, Teuchos::null);
190  }
191 
192  Distributor::Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
193  const Teuchos::RCP<Teuchos::FancyOStream>& out)
194  : comm_ (comm)
195  , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
196  , sendType_ (Details::DISTRIBUTOR_SEND)
197  , barrierBetween_ (barrierBetween_default)
198  , debug_ (tpetraDistributorDebugDefault)
199  , selfMessage_ (false)
200  , numSends_ (0)
201  , maxSendLength_ (0)
202  , numReceives_ (0)
203  , totalReceiveLength_ (0)
204  , lastRoundBytesSend_ (0)
205  , lastRoundBytesRecv_ (0)
206  , useDistinctTags_ (useDistinctTags_default)
207  {
208  init (comm, out, Teuchos::null);
209  }
210 
211  Distributor::Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
212  const Teuchos::RCP<Teuchos::ParameterList>& plist)
213  : comm_ (comm)
214  , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
215  , sendType_ (Details::DISTRIBUTOR_SEND)
216  , barrierBetween_ (barrierBetween_default)
217  , debug_ (tpetraDistributorDebugDefault)
218  , selfMessage_ (false)
219  , numSends_ (0)
220  , maxSendLength_ (0)
221  , numReceives_ (0)
222  , totalReceiveLength_ (0)
223  , lastRoundBytesSend_ (0)
224  , lastRoundBytesRecv_ (0)
225  , useDistinctTags_ (useDistinctTags_default)
226  {
227  init (comm, Teuchos::null, plist);
228  }
229 
230  Distributor::Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
231  const Teuchos::RCP<Teuchos::FancyOStream>& out,
232  const Teuchos::RCP<Teuchos::ParameterList>& plist)
233  : comm_ (comm)
234  , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
235  , sendType_ (Details::DISTRIBUTOR_SEND)
236  , barrierBetween_ (barrierBetween_default)
237  , debug_ (tpetraDistributorDebugDefault)
238  , selfMessage_ (false)
239  , numSends_ (0)
240  , maxSendLength_ (0)
241  , numReceives_ (0)
242  , totalReceiveLength_ (0)
243  , lastRoundBytesSend_ (0)
244  , lastRoundBytesRecv_ (0)
245  , useDistinctTags_ (useDistinctTags_default)
246  {
247  init (comm, out, plist);
248  }
249 
250  Distributor::Distributor (const Distributor & distributor)
251  : comm_ (distributor.comm_)
252  , out_ (distributor.out_)
253  , howInitialized_ (Details::DISTRIBUTOR_INITIALIZED_BY_COPY)
254  , sendType_ (distributor.sendType_)
255  , barrierBetween_ (distributor.barrierBetween_)
256  , debug_ (distributor.debug_)
257  , selfMessage_ (distributor.selfMessage_)
258  , numSends_ (distributor.numSends_)
259  , procsTo_ (distributor.procsTo_)
260  , startsTo_ (distributor.startsTo_)
261  , lengthsTo_ (distributor.lengthsTo_)
262  , maxSendLength_ (distributor.maxSendLength_)
263  , indicesTo_ (distributor.indicesTo_)
264  , numReceives_ (distributor.numReceives_)
265  , totalReceiveLength_ (distributor.totalReceiveLength_)
266  , lengthsFrom_ (distributor.lengthsFrom_)
267  , procsFrom_ (distributor.procsFrom_)
268  , startsFrom_ (distributor.startsFrom_)
269  , indicesFrom_ (distributor.indicesFrom_)
270  , reverseDistributor_ (distributor.reverseDistributor_)
271  , lastRoundBytesSend_ (distributor.lastRoundBytesSend_)
272  , lastRoundBytesRecv_ (distributor.lastRoundBytesRecv_)
273  , useDistinctTags_ (distributor.useDistinctTags_)
274  {
275  using Teuchos::ParameterList;
276  using Teuchos::parameterList;
277  using Teuchos::RCP;
278  using Teuchos::rcp;
279 
280  // Clone the right-hand side's ParameterList, so that this' list
281  // is decoupled from the right-hand side's list. We don't need to
282  // do validation, since the right-hand side already has validated
283  // its parameters, so just call setMyParamList(). Note that this
284  // won't work if the right-hand side doesn't have a list set yet,
285  // so we first check for null.
286  RCP<const ParameterList> rhsList = distributor.getParameterList ();
287  if (! rhsList.is_null ()) {
288  this->setMyParamList (parameterList (* rhsList));
289  }
290 
291 #ifdef TPETRA_DISTRIBUTOR_TIMERS
292  makeTimers ();
293 #endif // TPETRA_DISTRIBUTOR_TIMERS
294 
295  const bool verbose = Tpetra::Details::Behavior::verbose("Distributor");
296  if (verbose || debug_) {
297  TEUCHOS_TEST_FOR_EXCEPTION
298  (out_.is_null (), std::logic_error, "Tpetra::Distributor::init: "
299  "verbose and/or debug_ are true but out_ (pointer to the output "
300  "stream) is NULL. Please report this bug to the Tpetra developers.");
301  Teuchos::OSTab tab (out_);
302  }
303  if (verbose) {
304  std::ostringstream os;
305  os << comm_->getRank ()
306  << ": Distributor copy ctor done" << std::endl;
307  *out_ << os.str ();
308  }
309  }
310 
312  using Teuchos::ParameterList;
313  using Teuchos::parameterList;
314  using Teuchos::RCP;
315 
316  std::swap (comm_, rhs.comm_);
317  std::swap (out_, rhs.out_);
318  std::swap (howInitialized_, rhs.howInitialized_);
319  std::swap (sendType_, rhs.sendType_);
320  std::swap (barrierBetween_, rhs.barrierBetween_);
321  std::swap (debug_, rhs.debug_);
322  std::swap (selfMessage_, rhs.selfMessage_);
323  std::swap (numSends_, rhs.numSends_);
324  std::swap (procsTo_, rhs.procsTo_);
325  std::swap (startsTo_, rhs.startsTo_);
326  std::swap (lengthsTo_, rhs.lengthsTo_);
327  std::swap (maxSendLength_, rhs.maxSendLength_);
328  std::swap (indicesTo_, rhs.indicesTo_);
329  std::swap (numReceives_, rhs.numReceives_);
330  std::swap (totalReceiveLength_, rhs.totalReceiveLength_);
331  std::swap (lengthsFrom_, rhs.lengthsFrom_);
332  std::swap (procsFrom_, rhs.procsFrom_);
333  std::swap (startsFrom_, rhs.startsFrom_);
334  std::swap (indicesFrom_, rhs.indicesFrom_);
335  std::swap (reverseDistributor_, rhs.reverseDistributor_);
336  std::swap (lastRoundBytesSend_, rhs.lastRoundBytesSend_);
337  std::swap (lastRoundBytesRecv_, rhs.lastRoundBytesRecv_);
338  std::swap (useDistinctTags_, rhs.useDistinctTags_);
339 
340  // Swap parameter lists. If they are the same object, make a deep
341  // copy first, so that modifying one won't modify the other one.
342  RCP<ParameterList> lhsList = this->getNonconstParameterList ();
343  RCP<ParameterList> rhsList = rhs.getNonconstParameterList ();
344  if (lhsList.getRawPtr () == rhsList.getRawPtr () && ! rhsList.is_null ()) {
345  rhsList = parameterList (*rhsList);
346  }
347  if (! rhsList.is_null ()) {
348  this->setMyParamList (rhsList);
349  }
350  if (! lhsList.is_null ()) {
351  rhs.setMyParamList (lhsList);
352  }
353 
354  // We don't need to swap timers, because all instances of
355  // Distributor use the same timers.
356  }
357 
359  {
360  // mfh 10 May 2017: We shouldn't have any outstanding
361  // communication requests at this point. It would be legitimate
362  // to check here, and report an error if requests_.size() != 0.
363  // However, throwing in a destructor is bad form. See #1303:
364  //
365  // https://github.com/trilinos/Trilinos/issues/1303
366  //
367  // If someone wants to restore the error check, please don't
368  // throw; instead, use MPI_Abort (or exit() in a non-MPI build).
369 
370  // TEUCHOS_TEST_FOR_EXCEPTION(requests_.size() != 0, std::runtime_error,
371  // "Tpetra::Distributor: Destructor called with " << requests_.size()
372  // << " outstanding posts (unfulfilled communication requests). There "
373  // "should be none at this point. Please report this bug to the Tpetra "
374  // "developers.");
375  }
376 
377  void
378  Distributor::setParameterList (const Teuchos::RCP<Teuchos::ParameterList>& plist)
379  {
380  using Teuchos::FancyOStream;
381  using Teuchos::getIntegralValue;
382  using Teuchos::includesVerbLevel;
383  using Teuchos::OSTab;
384  using Teuchos::ParameterList;
385  using Teuchos::parameterList;
386  using Teuchos::RCP;
387  using std::endl;
388 
389  RCP<const ParameterList> validParams = getValidParameters ();
390  plist->validateParametersAndSetDefaults (*validParams);
391 
392  const bool barrierBetween =
393  plist->get<bool> ("Barrier between receives and sends");
394  const Details::EDistributorSendType sendType =
395  getIntegralValue<Details::EDistributorSendType> (*plist, "Send type");
396  const bool useDistinctTags = plist->get<bool> ("Use distinct tags");
397  const bool debug = plist->get<bool> ("Debug");
398  {
399  // mfh 03 May 2016: We keep this option only for backwards
400  // compatibility, but it must always be true. See discussion of
401  // Github Issue #227.
402  const bool enable_cuda_rdma =
403  plist->get<bool> ("Enable MPI CUDA RDMA support");
404  TEUCHOS_TEST_FOR_EXCEPTION
405  (! enable_cuda_rdma, std::invalid_argument, "Tpetra::Distributor::"
406  "setParameterList: " << "You specified \"Enable MPI CUDA RDMA "
407  "support\" = false. This is no longer valid. You don't need to "
408  "specify this option any more; Tpetra assumes it is always true. "
409  "This is a very light assumption on the MPI implementation, and in "
410  "fact does not actually involve hardware or system RDMA support. "
411  "Tpetra just assumes that the MPI implementation can tell whether a "
412  "pointer points to host memory or CUDA device memory.");
413  }
414 
415  // We check this property explicitly, since we haven't yet learned
416  // how to make a validator that can cross-check properties.
417  // Later, turn this into a validator so that it can be embedded in
418  // the valid ParameterList and used in Optika.
419  TEUCHOS_TEST_FOR_EXCEPTION(
420  ! barrierBetween && sendType == Details::DISTRIBUTOR_RSEND,
421  std::invalid_argument, "Tpetra::Distributor::setParameterList: " << endl
422  << "You specified \"Send type\"=\"Rsend\", but turned off the barrier "
423  "between receives and sends." << endl << "This is invalid; you must "
424  "include the barrier if you use ready sends." << endl << "Ready sends "
425  "require that their corresponding receives have already been posted, "
426  "and the only way to guarantee that in general is with a barrier.");
427 
428  // Now that we've validated the input list, save the results.
429  sendType_ = sendType;
430  barrierBetween_ = barrierBetween;
431  useDistinctTags_ = useDistinctTags;
432  debug_ = debug;
433 
434  // ParameterListAcceptor semantics require pointer identity of the
435  // sublist passed to setParameterList(), so we save the pointer.
436  this->setMyParamList (plist);
437  }
438 
439  Teuchos::RCP<const Teuchos::ParameterList>
441  {
442  using Teuchos::Array;
443  using Teuchos::ParameterList;
444  using Teuchos::parameterList;
445  using Teuchos::RCP;
446  using Teuchos::setStringToIntegralParameter;
447 
448  const bool barrierBetween = barrierBetween_default;
449  const bool useDistinctTags = useDistinctTags_default;
450  const bool debug = tpetraDistributorDebugDefault;
451 
452  Array<std::string> sendTypes = distributorSendTypes ();
453  const std::string defaultSendType ("Send");
454  Array<Details::EDistributorSendType> sendTypeEnums;
455  sendTypeEnums.push_back (Details::DISTRIBUTOR_ISEND);
456  sendTypeEnums.push_back (Details::DISTRIBUTOR_RSEND);
457  sendTypeEnums.push_back (Details::DISTRIBUTOR_SEND);
458  sendTypeEnums.push_back (Details::DISTRIBUTOR_SSEND);
459 
460  RCP<ParameterList> plist = parameterList ("Tpetra::Distributor");
461  plist->set ("Barrier between receives and sends", barrierBetween,
462  "Whether to execute a barrier between receives and sends in do"
463  "[Reverse]Posts(). Required for correctness when \"Send type\""
464  "=\"Rsend\", otherwise correct but not recommended.");
465  setStringToIntegralParameter<Details::EDistributorSendType> ("Send type",
466  defaultSendType, "When using MPI, the variant of send to use in "
467  "do[Reverse]Posts()", sendTypes(), sendTypeEnums(), plist.getRawPtr());
468  plist->set ("Use distinct tags", useDistinctTags, "Whether to use distinct "
469  "MPI message tags for different code paths. Highly recommended"
470  " to avoid message collisions.");
471  plist->set ("Debug", debug, "Whether to print copious debugging output on "
472  "all processes.");
473  plist->set ("Enable MPI CUDA RDMA support", true, "Assume that MPI can "
474  "tell whether a pointer points to host memory or CUDA device "
475  "memory. You don't need to specify this option any more; "
476  "Tpetra assumes it is always true. This is a very light "
477  "assumption on the MPI implementation, and in fact does not "
478  "actually involve hardware or system RDMA support.");
479 
480  // mfh 24 Dec 2015: Tpetra no longer inherits from
481  // Teuchos::VerboseObject, so it doesn't need the "VerboseObject"
482  // sublist. However, we retain the "VerboseObject" sublist
483  // anyway, for backwards compatibility (otherwise the above
484  // validation would fail with an invalid parameter name, should
485  // the user still want to provide this list).
486  Teuchos::setupVerboseObjectSublist (&*plist);
487  return Teuchos::rcp_const_cast<const ParameterList> (plist);
488  }
489 
490 
492  { return totalReceiveLength_; }
493 
495  { return numReceives_; }
496 
498  { return selfMessage_; }
499 
501  { return numSends_; }
502 
504  { return maxSendLength_; }
505 
506  Teuchos::ArrayView<const int> Distributor::getProcsFrom() const
507  { return procsFrom_; }
508 
509  Teuchos::ArrayView<const size_t> Distributor::getLengthsFrom() const
510  { return lengthsFrom_; }
511 
512  Teuchos::ArrayView<const int> Distributor::getProcsTo() const
513  { return procsTo_; }
514 
515  Teuchos::ArrayView<const size_t> Distributor::getLengthsTo() const
516  { return lengthsTo_; }
517 
518  Teuchos::RCP<Distributor>
520  if (reverseDistributor_.is_null ()) {
521  createReverseDistributor ();
522  }
523  TEUCHOS_TEST_FOR_EXCEPTION
524  (reverseDistributor_.is_null (), std::logic_error, "The reverse "
525  "Distributor is null after createReverseDistributor returned. "
526  "Please report this bug to the Tpetra developers.");
527  return reverseDistributor_;
528  }
529 
530 
531  void
532  Distributor::createReverseDistributor() const
533  {
534  reverseDistributor_ = Teuchos::rcp (new Distributor (comm_, out_));
535  reverseDistributor_->howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE;
536  reverseDistributor_->sendType_ = sendType_;
537  reverseDistributor_->barrierBetween_ = barrierBetween_;
538  reverseDistributor_->debug_ = debug_;
539 
540  // The total length of all the sends of this Distributor. We
541  // calculate it because it's the total length of all the receives
542  // of the reverse Distributor.
543  size_t totalSendLength =
544  std::accumulate (lengthsTo_.begin(), lengthsTo_.end(), 0);
545 
546  // The maximum length of any of the receives of this Distributor.
547  // We calculate it because it's the maximum length of any of the
548  // sends of the reverse Distributor.
549  size_t maxReceiveLength = 0;
550  const int myProcID = comm_->getRank();
551  for (size_t i=0; i < numReceives_; ++i) {
552  if (procsFrom_[i] != myProcID) {
553  // Don't count receives for messages sent by myself to myself.
554  if (lengthsFrom_[i] > maxReceiveLength) {
555  maxReceiveLength = lengthsFrom_[i];
556  }
557  }
558  }
559 
560  // Initialize all of reverseDistributor's data members. This
561  // mainly just involves flipping "send" and "receive," or the
562  // equivalent "to" and "from."
563 
564  reverseDistributor_->selfMessage_ = selfMessage_;
565  reverseDistributor_->numSends_ = numReceives_;
566  reverseDistributor_->procsTo_ = procsFrom_;
567  reverseDistributor_->startsTo_ = startsFrom_;
568  reverseDistributor_->lengthsTo_ = lengthsFrom_;
569  reverseDistributor_->maxSendLength_ = maxReceiveLength;
570  reverseDistributor_->indicesTo_ = indicesFrom_;
571  reverseDistributor_->numReceives_ = numSends_;
572  reverseDistributor_->totalReceiveLength_ = totalSendLength;
573  reverseDistributor_->lengthsFrom_ = lengthsTo_;
574  reverseDistributor_->procsFrom_ = procsTo_;
575  reverseDistributor_->startsFrom_ = startsTo_;
576  reverseDistributor_->indicesFrom_ = indicesTo_;
577 
578  // requests_: Allocated on demand.
579  // reverseDistributor_: See note below
580 
581  // mfh 31 Mar 2016: These are statistics, kept on calls to
582  // doPostsAndWaits or doReversePostsAndWaits. They weren't here
583  // when I started, and I didn't add them, so I don't know if they
584  // are accurate.
585  reverseDistributor_->lastRoundBytesSend_ = 0;
586  reverseDistributor_->lastRoundBytesRecv_ = 0;
587 
588  reverseDistributor_->useDistinctTags_ = useDistinctTags_;
589 
590  // I am my reverse Distributor's reverse Distributor.
591  // Thus, it would be legit to do the following:
592  //
593  // reverseDistributor_->reverseDistributor_ = Teuchos::rcp (this, false);
594  //
595  // (Note use of a "weak reference" to avoid a circular RCP
596  // dependency.) The only issue is that if users hold on to the
597  // reverse Distributor but let go of the forward one, this
598  // reference won't be valid anymore. However, the reverse
599  // Distributor is really an implementation detail of Distributor
600  // and not meant to be used directly, so we don't need to do this.
601  reverseDistributor_->reverseDistributor_ = Teuchos::null;
602  }
603 
604 
606  using Teuchos::Array;
607  using Teuchos::CommRequest;
608  using Teuchos::FancyOStream;
609  using Teuchos::includesVerbLevel;
610  using Teuchos::is_null;
611  using Teuchos::OSTab;
612  using Teuchos::RCP;
613  using Teuchos::waitAll;
614  using std::endl;
615 
616  Teuchos::OSTab tab (out_);
617 
618 #ifdef TPETRA_DISTRIBUTOR_TIMERS
619  Teuchos::TimeMonitor timeMon (*timer_doWaits_);
620 #endif // TPETRA_DISTRIBUTOR_TIMERS
621 
622  const int myRank = comm_->getRank ();
623  const bool verbose = Tpetra::Details::Behavior::verbose("Distributor");
624 
625  if (verbose) {
626  std::ostringstream os;
627  os << myRank << ": doWaits: # reqs = "
628  << requests_.size () << endl;
629  *out_ << os.str ();
630  }
631 
632  if (requests_.size() > 0) {
633  waitAll (*comm_, requests_());
634 
635 #ifdef HAVE_TEUCHOS_DEBUG
636  // Make sure that waitAll() nulled out all the requests.
637  for (Array<RCP<CommRequest<int> > >::const_iterator it = requests_.begin();
638  it != requests_.end(); ++it)
639  {
640  TEUCHOS_TEST_FOR_EXCEPTION( ! is_null (*it), std::runtime_error,
641  Teuchos::typeName(*this) << "::doWaits(): Communication requests "
642  "should all be null aftr calling Teuchos::waitAll() on them, but "
643  "at least one request is not null.");
644  }
645 #endif // HAVE_TEUCHOS_DEBUG
646  // Restore the invariant that requests_.size() is the number of
647  // outstanding nonblocking communication requests.
648  requests_.resize (0);
649  }
650 
651 #ifdef HAVE_TEUCHOS_DEBUG
652  {
653  const int localSizeNonzero = (requests_.size () != 0) ? 1 : 0;
654  int globalSizeNonzero = 0;
655  Teuchos::reduceAll<int, int> (*comm_, Teuchos::REDUCE_MAX,
656  localSizeNonzero,
657  Teuchos::outArg (globalSizeNonzero));
658  TEUCHOS_TEST_FOR_EXCEPTION(
659  globalSizeNonzero != 0, std::runtime_error,
660  "Tpetra::Distributor::doWaits: After waitAll, at least one process has "
661  "a nonzero number of outstanding posts. There should be none at this "
662  "point. Please report this bug to the Tpetra developers.");
663  }
664 #endif // HAVE_TEUCHOS_DEBUG
665 
666  if (verbose) {
667  std::ostringstream os;
668  os << myRank << ": doWaits done" << endl;
669  *out_ << os.str ();
670  }
671  }
672 
674  // call doWaits() on the reverse Distributor, if it exists
675  if (! reverseDistributor_.is_null()) {
676  reverseDistributor_->doWaits();
677  }
678  }
679 
680  std::string Distributor::description () const {
681  std::ostringstream out;
682 
683  out << "\"Tpetra::Distributor\": {";
684  const std::string label = this->getObjectLabel ();
685  if (label != "") {
686  out << "Label: " << label << ", ";
687  }
688  out << "How initialized: "
690  << ", Parameters: {"
691  << "Send type: "
692  << DistributorSendTypeEnumToString (sendType_)
693  << ", Barrier between receives and sends: "
694  << (barrierBetween_ ? "true" : "false")
695  << ", Use distinct tags: "
696  << (useDistinctTags_ ? "true" : "false")
697  << ", Debug: " << (debug_ ? "true" : "false")
698  << "}}";
699  return out.str ();
700  }
701 
702  std::string
703  Distributor::
704  localDescribeToString (const Teuchos::EVerbosityLevel vl) const
705  {
706  using Teuchos::toString;
707  using Teuchos::VERB_HIGH;
708  using Teuchos::VERB_EXTREME;
709  using std::endl;
710 
711  // This preserves current behavior of Distributor.
712  if (vl <= Teuchos::VERB_LOW || comm_.is_null ()) {
713  return std::string ();
714  }
715 
716  auto outStringP = Teuchos::rcp (new std::ostringstream ());
717  auto outp = Teuchos::getFancyOStream (outStringP); // returns RCP
718  Teuchos::FancyOStream& out = *outp;
719 
720  const int myRank = comm_->getRank ();
721  const int numProcs = comm_->getSize ();
722  out << "Process " << myRank << " of " << numProcs << ":" << endl;
723  Teuchos::OSTab tab1 (out);
724 
725  out << "selfMessage: " << hasSelfMessage () << endl;
726  out << "numSends: " << getNumSends () << endl;
727  if (vl == VERB_HIGH || vl == VERB_EXTREME) {
728  out << "procsTo: " << toString (procsTo_) << endl;
729  out << "lengthsTo: " << toString (lengthsTo_) << endl;
730  out << "maxSendLength: " << getMaxSendLength () << endl;
731  }
732  if (vl == VERB_EXTREME) {
733  out << "startsTo: " << toString (startsTo_) << endl;
734  out << "indicesTo: " << toString (indicesTo_) << endl;
735  }
736  if (vl == VERB_HIGH || vl == VERB_EXTREME) {
737  out << "numReceives: " << getNumReceives () << endl;
738  out << "totalReceiveLength: " << getTotalReceiveLength () << endl;
739  out << "lengthsFrom: " << toString (lengthsFrom_) << endl;
740  out << "startsFrom: " << toString (startsFrom_) << endl;
741  out << "procsFrom: " << toString (procsFrom_) << endl;
742  }
743 
744  out.flush (); // make sure the ostringstream got everything
745  return outStringP->str ();
746  }
747 
748  void
750  describe (Teuchos::FancyOStream &out,
751  const Teuchos::EVerbosityLevel verbLevel) const
752  {
753  using std::endl;
754  using Teuchos::VERB_DEFAULT;
755  using Teuchos::VERB_NONE;
756  using Teuchos::VERB_LOW;
757  using Teuchos::VERB_MEDIUM;
758  using Teuchos::VERB_HIGH;
759  using Teuchos::VERB_EXTREME;
760  const Teuchos::EVerbosityLevel vl =
761  (verbLevel == VERB_DEFAULT) ? VERB_LOW : verbLevel;
762 
763  if (vl == VERB_NONE) {
764  return; // don't print anything
765  }
766  // If this Distributor's Comm is null, then the the calling
767  // process does not participate in Distributor-related collective
768  // operations with the other processes. In that case, it is not
769  // even legal to call this method. The reasonable thing to do in
770  // that case is nothing.
771  if (comm_.is_null ()) {
772  return;
773  }
774  const int myRank = comm_->getRank ();
775  const int numProcs = comm_->getSize ();
776 
777  // Only Process 0 should touch the output stream, but this method
778  // in general may need to do communication. Thus, we may need to
779  // preserve the current tab level across multiple "if (myRank ==
780  // 0) { ... }" inner scopes. This is why we sometimes create
781  // OSTab instances by pointer, instead of by value. We only need
782  // to create them by pointer if the tab level must persist through
783  // multiple inner scopes.
784  Teuchos::RCP<Teuchos::OSTab> tab0, tab1;
785 
786  if (myRank == 0) {
787  // At every verbosity level but VERB_NONE, Process 0 prints.
788  // By convention, describe() always begins with a tab before
789  // printing.
790  tab0 = Teuchos::rcp (new Teuchos::OSTab (out));
791  // We quote the class name because it contains colons.
792  // This makes the output valid YAML.
793  out << "\"Tpetra::Distributor\":" << endl;
794  tab1 = Teuchos::rcp (new Teuchos::OSTab (out));
795 
796  const std::string label = this->getObjectLabel ();
797  if (label != "") {
798  out << "Label: " << label << endl;
799  }
800  out << "Number of processes: " << numProcs << endl
801  << "How initialized: "
803  << endl;
804  {
805  out << "Parameters: " << endl;
806  Teuchos::OSTab tab2 (out);
807  out << "\"Send type\": "
808  << DistributorSendTypeEnumToString (sendType_) << endl
809  << "\"Barrier between receives and sends\": "
810  << (barrierBetween_ ? "true" : "false") << endl
811  << "\"Use distinct tags\": "
812  << (useDistinctTags_ ? "true" : "false") << endl
813  << "\"Debug\": " << (debug_ ? "true" : "false") << endl;
814  }
815  } // if myRank == 0
816 
817  // This is collective over the Map's communicator.
818  if (vl > VERB_LOW) {
819  const std::string lclStr = this->localDescribeToString (vl);
820  Tpetra::Details::gathervPrint (out, lclStr, *comm_);
821  }
822 
823  out << "Reverse Distributor:";
824  if (reverseDistributor_.is_null ()) {
825  out << " null" << endl;
826  }
827  else {
828  out << endl;
829  reverseDistributor_->describe (out, vl);
830  }
831  }
832 
833  void
834  Distributor::computeReceives ()
835  {
836  using Teuchos::Array;
837  using Teuchos::ArrayRCP;
838  using Teuchos::as;
839  using Teuchos::CommStatus;
840  using Teuchos::CommRequest;
841  using Teuchos::ireceive;
842  using Teuchos::RCP;
843  using Teuchos::rcp;
844  using Teuchos::REDUCE_SUM;
845  using Teuchos::receive;
846  using Teuchos::reduce;
847  using Teuchos::scatter;
848  using Teuchos::send;
849  using Teuchos::waitAll;
850  using std::endl;
851 
852  Teuchos::OSTab tab (out_);
853  const int myRank = comm_->getRank();
854  const int numProcs = comm_->getSize();
855 
856  // MPI tag for nonblocking receives and blocking sends in this method.
857  const int pathTag = 2;
858  const int tag = this->getTag (pathTag);
859  const bool verbose = Tpetra::Details::Behavior::verbose("Distributor");
860 
861  if (verbose) {
862  std::ostringstream os;
863  os << myRank << ": computeReceives: "
864  "{selfMessage_: " << (selfMessage_ ? "true" : "false")
865  << ", tag: " << tag << "}" << endl;
866  *out_ << os.str ();
867  }
868 
869  // toProcsFromMe[i] == the number of messages sent by this process
870  // to process i. The data in numSends_, procsTo_, and lengthsTo_
871  // concern the contiguous sends. Therefore, each process will be
872  // listed in procsTo_ at most once, and so toProcsFromMe[i] will
873  // either be 0 or 1.
874  {
875  Array<int> toProcsFromMe (numProcs, 0);
876 #ifdef HAVE_TEUCHOS_DEBUG
877  bool counting_error = false;
878 #endif // HAVE_TEUCHOS_DEBUG
879  for (size_t i = 0; i < (numSends_ + (selfMessage_ ? 1 : 0)); ++i) {
880 #ifdef HAVE_TEUCHOS_DEBUG
881  if (toProcsFromMe[procsTo_[i]] != 0) {
882  counting_error = true;
883  }
884 #endif // HAVE_TEUCHOS_DEBUG
885  toProcsFromMe[procsTo_[i]] = 1;
886  }
887 #ifdef HAVE_TEUCHOS_DEBUG
888  SHARED_TEST_FOR_EXCEPTION(counting_error, std::logic_error,
889  "Tpetra::Distributor::computeReceives: There was an error on at least "
890  "one process in counting the number of messages send by that process to "
891  "the other processs. Please report this bug to the Tpetra developers.",
892  *comm_);
893 #endif // HAVE_TEUCHOS_DEBUG
894 
895  if (verbose) {
896  std::ostringstream os;
897  os << myRank << ": computeReceives: Calling reduce and scatter" << endl;
898  *out_ << os.str ();
899  }
900 
901  // Compute the number of receives that this process needs to
902  // post. The number of receives includes any self sends (i.e.,
903  // messages sent by this process to itself).
904  //
905  // (We will use numReceives_ this below to post exactly that
906  // number of receives, with MPI_ANY_SOURCE as the sending rank.
907  // This will tell us from which processes this process expects
908  // to receive, and how many packets of data we expect to receive
909  // from each process.)
910  //
911  // toProcsFromMe[i] is the number of messages sent by this
912  // process to process i. Compute the sum (elementwise) of all
913  // the toProcsFromMe arrays on all processes in the
914  // communicator. If the array x is that sum, then if this
915  // process has rank j, x[j] is the number of messages sent
916  // to process j, that is, the number of receives on process j
917  // (including any messages sent by process j to itself).
918  //
919  // Yes, this requires storing and operating on an array of
920  // length P, where P is the number of processes in the
921  // communicator. Epetra does this too. Avoiding this O(P)
922  // memory bottleneck would require some research.
923  //
924  // mfh 09 Jan 2012, 15 Jul 2015: There are three ways to
925  // implement this O(P) memory algorithm.
926  //
927  // 1. Use MPI_Reduce and MPI_Scatter: reduce on the root
928  // process (0) from toProcsFromMe, to numRecvsOnEachProc.
929  // Then, scatter the latter, so that each process p gets
930  // numRecvsOnEachProc[p].
931  //
932  // 2. Like #1, but use MPI_Reduce_scatter instead of
933  // MPI_Reduce and MPI_Scatter. MPI_Reduce_scatter might be
934  // optimized to reduce the number of messages, but
935  // MPI_Reduce_scatter is more general than we need (it
936  // allows the equivalent of MPI_Scatterv). See Bug 6336.
937  //
938  // 3. Do an all-reduce on toProcsFromMe, and let my process
939  // (with rank myRank) get numReceives_ from
940  // toProcsFromMe[myRank]. The HPCCG miniapp uses the
941  // all-reduce method.
942  //
943  // Approaches 1 and 3 have the same critical path length.
944  // However, #3 moves more data. This is because the final
945  // result is just one integer, but #3 moves a whole array of
946  // results to all the processes. This is why we use Approach 1
947  // here.
948  //
949  // mfh 12 Apr 2013: See discussion in createFromSends() about
950  // how we could use this communication to propagate an error
951  // flag for "free" in a release build.
952 
953  const int root = 0; // rank of root process of the reduction
954  Array<int> numRecvsOnEachProc; // temp; only needed on root
955  if (myRank == root) {
956  numRecvsOnEachProc.resize (numProcs);
957  }
958  int numReceivesAsInt = 0; // output
959  reduce<int, int> (toProcsFromMe.getRawPtr (),
960  numRecvsOnEachProc.getRawPtr (),
961  numProcs, REDUCE_SUM, root, *comm_);
962  scatter<int, int> (numRecvsOnEachProc.getRawPtr (), 1,
963  &numReceivesAsInt, 1, root, *comm_);
964  numReceives_ = static_cast<size_t> (numReceivesAsInt);
965  }
966 
967  // Now we know numReceives_, which is this process' number of
968  // receives. Allocate the lengthsFrom_ and procsFrom_ arrays
969  // with this number of entries.
970  lengthsFrom_.assign (numReceives_, 0);
971  procsFrom_.assign (numReceives_, 0);
972 
973  //
974  // Ask (via nonblocking receive) each process from which we are
975  // receiving how many packets we should expect from it in the
976  // communication pattern.
977  //
978 
979  // At this point, numReceives_ includes any self message that
980  // there may be. At the end of this routine, we'll subtract off
981  // the self message (if there is one) from numReceives_. In this
982  // routine, we don't need to receive a message from ourselves in
983  // order to figure out our lengthsFrom_ and source process ID; we
984  // can just ask ourselves directly. Thus, the actual number of
985  // nonblocking receives we post here does not include the self
986  // message.
987  const size_t actualNumReceives = numReceives_ - (selfMessage_ ? 1 : 0);
988 
989  // Teuchos' wrapper for nonblocking receives requires receive
990  // buffers that it knows won't go away. This is why we use RCPs,
991  // one RCP per nonblocking receive request. They get allocated in
992  // the loop below.
993  Array<RCP<CommRequest<int> > > requests (actualNumReceives);
994  Array<ArrayRCP<size_t> > lengthsFromBuffers (actualNumReceives);
995  Array<RCP<CommStatus<int> > > statuses (actualNumReceives);
996 
997  // Teuchos::Comm treats a negative process ID as MPI_ANY_SOURCE
998  // (receive data from any process).
999 #ifdef HAVE_MPI
1000  const int anySourceProc = MPI_ANY_SOURCE;
1001 #else
1002  const int anySourceProc = -1;
1003 #endif
1004 
1005  if (verbose) {
1006  std::ostringstream os;
1007  os << myRank << ": computeReceives: Posting "
1008  << actualNumReceives << " irecvs" << endl;
1009  *out_ << os.str ();
1010  }
1011 
1012  // Post the (nonblocking) receives.
1013  for (size_t i = 0; i < actualNumReceives; ++i) {
1014  // Once the receive completes, we can ask the corresponding
1015  // CommStatus object (output by wait()) for the sending process'
1016  // ID (which we'll assign to procsFrom_[i] -- don't forget to
1017  // do that!).
1018  lengthsFromBuffers[i].resize (1);
1019  lengthsFromBuffers[i][0] = as<size_t> (0);
1020  requests[i] = ireceive<int, size_t> (lengthsFromBuffers[i], anySourceProc, tag, *comm_);
1021  if (verbose) {
1022  std::ostringstream os;
1023  os << myRank << ": computeReceives: "
1024  "Posted any-proc irecv w/ specified tag " << tag << endl;
1025  *out_ << os.str ();
1026  }
1027  }
1028 
1029  if (verbose) {
1030  std::ostringstream os;
1031  os << myRank << ": computeReceives: "
1032  "posting " << numSends_ << " sends" << endl;
1033  *out_ << os.str ();
1034  }
1035  // Post the sends: Tell each process to which we are sending how
1036  // many packets it should expect from us in the communication
1037  // pattern. We could use nonblocking sends here, as long as we do
1038  // a waitAll() on all the sends and receives at once.
1039  //
1040  // We assume that numSends_ and selfMessage_ have already been
1041  // set. The value of numSends_ (my process' number of sends) does
1042  // not include any message that it might send to itself.
1043  for (size_t i = 0; i < numSends_ + (selfMessage_ ? 1 : 0); ++i) {
1044  if (procsTo_[i] != myRank) {
1045  // Send a message to procsTo_[i], telling that process that
1046  // this communication pattern will send that process
1047  // lengthsTo_[i] blocks of packets.
1048  const size_t* const lengthsTo_i = &lengthsTo_[i];
1049  send<int, size_t> (lengthsTo_i, 1, as<int> (procsTo_[i]), tag, *comm_);
1050  if (verbose) {
1051  std::ostringstream os;
1052  os << myRank << ": computeReceives: "
1053  "Posted send to Proc " << procsTo_[i] << " w/ specified tag "
1054  << tag << endl;
1055  *out_ << os.str ();
1056  }
1057  }
1058  else {
1059  // We don't need a send in the self-message case. If this
1060  // process will send a message to itself in the communication
1061  // pattern, then the last element of lengthsFrom_ and
1062  // procsFrom_ corresponds to the self-message. Of course
1063  // this process knows how long the message is, and the process
1064  // ID is its own process ID.
1065  lengthsFrom_[numReceives_-1] = lengthsTo_[i];
1066  procsFrom_[numReceives_-1] = myRank;
1067  }
1068  }
1069 
1070  if (verbose) {
1071  std::ostringstream os;
1072  os << myRank << ": computeReceives: waitAll on "
1073  << requests.size () << " requests" << endl;
1074  *out_ << os.str ();
1075  }
1076  //
1077  // Wait on all the receives. When they arrive, check the status
1078  // output of wait() for the receiving process ID, unpack the
1079  // request buffers into lengthsFrom_, and set procsFrom_ from the
1080  // status.
1081  //
1082  waitAll (*comm_, requests (), statuses ());
1083  for (size_t i = 0; i < actualNumReceives; ++i) {
1084  lengthsFrom_[i] = *lengthsFromBuffers[i];
1085  procsFrom_[i] = statuses[i]->getSourceRank ();
1086  }
1087 
1088  // Sort the procsFrom_ array, and apply the same permutation to
1089  // lengthsFrom_. This ensures that procsFrom_[i] and
1090  // lengthsFrom_[i] refers to the same thing.
1091  sort2 (procsFrom_.begin(), procsFrom_.end(), lengthsFrom_.begin());
1092 
1093  // Compute indicesFrom_
1094  totalReceiveLength_ = std::accumulate (lengthsFrom_.begin(), lengthsFrom_.end(), 0);
1095  indicesFrom_.clear ();
1096  indicesFrom_.reserve (totalReceiveLength_);
1097  for (size_t i = 0; i < totalReceiveLength_; ++i) {
1098  indicesFrom_.push_back(i);
1099  }
1100 
1101  startsFrom_.clear ();
1102  startsFrom_.reserve (numReceives_);
1103  for (size_t i = 0, j = 0; i < numReceives_; ++i) {
1104  startsFrom_.push_back(j);
1105  j += lengthsFrom_[i];
1106  }
1107 
1108  if (selfMessage_) {
1109  --numReceives_;
1110  }
1111 
1112  if (verbose) {
1113  std::ostringstream os;
1114  os << myRank << ": computeReceives: done" << endl;
1115  *out_ << os.str ();
1116  }
1117  }
1118 
1119  size_t
1120  Distributor::createFromSends (const Teuchos::ArrayView<const int> &exportProcIDs)
1121  {
1122  using Teuchos::outArg;
1123  using Teuchos::REDUCE_MAX;
1124  using Teuchos::reduceAll;
1125  using std::endl;
1126 
1127  Teuchos::OSTab tab (out_);
1128  const size_t numExports = exportProcIDs.size();
1129  const int myProcID = comm_->getRank();
1130  const int numProcs = comm_->getSize();
1131  const bool verbose = Tpetra::Details::Behavior::verbose("Distributor");
1132  if (verbose) {
1133  std::ostringstream os;
1134  os << myProcID << ": createFromSends" << endl;
1135  *out_ << os.str ();
1136  }
1137 
1138  // exportProcIDs tells us the communication pattern for this
1139  // distributor. It dictates the way that the export data will be
1140  // interpreted in doPosts(). We want to perform at most one
1141  // send per process in doPosts; this is for two reasons:
1142  // * minimize latency / overhead in the comm routines (nice)
1143  // * match the number of receives and sends between processes
1144  // (necessary)
1145  //
1146  // Teuchos::Comm requires that the data for a send are contiguous
1147  // in a send buffer. Therefore, if the data in the send buffer
1148  // for doPosts() are not contiguous, they will need to be copied
1149  // into a contiguous buffer. The user has specified this
1150  // noncontiguous pattern and we can't do anything about it.
1151  // However, if they do not provide an efficient pattern, we will
1152  // warn them if one of the following compile-time options has been
1153  // set:
1154  // * HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS
1155  // * HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS
1156  //
1157  // If the data are contiguous, then we can post the sends in situ
1158  // (i.e., without needing to copy them into a send buffer).
1159  //
1160  // Determine contiguity. There are a number of ways to do this:
1161  // * If the export IDs are sorted, then all exports to a
1162  // particular proc must be contiguous. This is what Epetra does.
1163  // * If the export ID of the current export already has been
1164  // listed, then the previous listing should correspond to the
1165  // same export. This tests contiguity, but not sortedness.
1166  //
1167  // Both of these tests require O(n), where n is the number of
1168  // exports. However, the latter will positively identify a greater
1169  // portion of contiguous patterns. We use the latter method.
1170  //
1171  // Check to see if values are grouped by procs without gaps
1172  // If so, indices_to -> 0.
1173 
1174  // Set up data structures for quick traversal of arrays.
1175  // This contains the number of sends for each process ID.
1176  //
1177  // FIXME (mfh 20 Mar 2014) This is one of a few places in Tpetra
1178  // that create an array of length the number of processes in the
1179  // communicator (plus one). Given how this code uses this array,
1180  // it should be straightforward to replace it with a hash table or
1181  // some other more space-efficient data structure. In practice,
1182  // most of the entries of starts should be zero for a sufficiently
1183  // large process count, unless the communication pattern is dense.
1184  // Note that it's important to be able to iterate through keys (i
1185  // for which starts[i] is nonzero) in increasing order.
1186  Teuchos::Array<size_t> starts (numProcs + 1, 0);
1187 
1188  // numActive is the number of sends that are not Null
1189  size_t numActive = 0;
1190  int needSendBuff = 0; // Boolean
1191 
1192 #ifdef HAVE_TPETRA_DEBUG
1193  int badID = -1; // only used in a debug build
1194 #endif // HAVE_TPETRA_DEBUG
1195  for (size_t i = 0; i < numExports; ++i) {
1196  const int exportID = exportProcIDs[i];
1197  if (exportID >= numProcs) {
1198 #ifdef HAVE_TPETRA_DEBUG
1199  badID = myProcID;
1200 #endif // HAVE_TPETRA_DEBUG
1201  break;
1202  }
1203  else if (exportID >= 0) {
1204  // exportID is a valid process ID. Increment the number of
1205  // messages this process will send to that process.
1206  ++starts[exportID];
1207 
1208  // If we're sending more than one message to process exportID,
1209  // then it is possible that the data are not contiguous.
1210  // Check by seeing if the previous process ID in the list
1211  // (exportProcIDs[i-1]) is the same. It's safe to use i-1,
1212  // because if starts[exportID] > 1, then i must be > 1 (since
1213  // the starts array was filled with zeros initially).
1214 
1215  // null entries break continuity.
1216  // e.g., [ 0, 0, 0, 1, -99, 1, 2, 2, 2] is not contiguous
1217  if (needSendBuff==0 && starts[exportID] > 1 && exportID != exportProcIDs[i-1]) {
1218  needSendBuff = 1;
1219  }
1220  ++numActive;
1221  }
1222  }
1223 
1224 #ifdef HAVE_TPETRA_DEBUG
1225  // Test whether any process in the communicator got an invalid
1226  // process ID. If badID != -1 on this process, then it equals
1227  // this process' rank. The max of all badID over all processes is
1228  // the max rank which has an invalid process ID.
1229  {
1230  int gbl_badID;
1231  reduceAll<int, int> (*comm_, REDUCE_MAX, badID, outArg (gbl_badID));
1232  TEUCHOS_TEST_FOR_EXCEPTION(gbl_badID >= 0, std::runtime_error,
1233  Teuchos::typeName(*this) << "::createFromSends(): Process " << gbl_badID
1234  << ", perhaps among other processes, got a bad send process ID.");
1235  }
1236 #else
1237  // FIXME (mfh 12 Apr 2013, 15 Jul 2015) Rather than simply
1238  // ignoring this information, we should think about how to pass it
1239  // along so that all the processes find out about it. In a
1240  // release build with efficiency warnings turned off, the next
1241  // collective communication happens in computeReceives(). We
1242  // could figure out how to encode the error flag in that
1243  // operation, for example by adding an extra entry to the
1244  // collective's output array that encodes the error condition (0
1245  // on all processes if no error, else 1 on any process with the
1246  // error, so that the sum will produce a nonzero value if any
1247  // process had an error). I'll defer this change for now and
1248  // recommend instead that people with troubles try a debug build.
1249 #endif // HAVE_TPETRA_DEBUG
1250 
1251 #if defined(HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS) || defined(HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS)
1252  {
1253  int global_needSendBuff;
1254  reduceAll<int, int> (*comm_, REDUCE_MAX, needSendBuff,
1255  outArg (global_needSendBuff));
1257  global_needSendBuff != 0, std::runtime_error,
1258  "::createFromSends: Grouping export IDs together by process rank often "
1259  "improves performance.");
1260  }
1261 #endif
1262 
1263  // Determine from the caller's data whether or not the current
1264  // process should send (a) message(s) to itself.
1265  if (starts[myProcID] != 0) {
1266  selfMessage_ = true;
1267  }
1268  else {
1269  selfMessage_ = false;
1270  }
1271 
1272 #ifdef HAVE_TEUCHOS_DEBUG
1273  bool index_neq_numActive = false;
1274  bool send_neq_numSends = false;
1275 #endif
1276  if (! needSendBuff) {
1277  // grouped by proc, no send buffer or indicesTo_ needed
1278  numSends_ = 0;
1279  // Count total number of sends, i.e., total number of procs to
1280  // which we are sending. This includes myself, if applicable.
1281  for (int i = 0; i < numProcs; ++i) {
1282  if (starts[i]) {
1283  ++numSends_;
1284  }
1285  }
1286 
1287  // Not only do we not need these, but we must clear them, as
1288  // empty status of indicesTo is a flag used later.
1289  indicesTo_.resize(0);
1290  // Size these to numSends_; note, at the moment, numSends_
1291  // includes self sends. Set their values to zeros.
1292  procsTo_.assign(numSends_,0);
1293  startsTo_.assign(numSends_,0);
1294  lengthsTo_.assign(numSends_,0);
1295 
1296  // set startsTo to the offset for each send (i.e., each proc ID)
1297  // set procsTo to the proc ID for each send
1298  // in interpreting this code, remember that we are assuming contiguity
1299  // that is why index skips through the ranks
1300  {
1301  size_t index = 0, procIndex = 0;
1302  for (size_t i = 0; i < numSends_; ++i) {
1303  while (exportProcIDs[procIndex] < 0) {
1304  ++procIndex; // skip all negative proc IDs
1305  }
1306  startsTo_[i] = procIndex;
1307  int procID = exportProcIDs[procIndex];
1308  procsTo_[i] = procID;
1309  index += starts[procID];
1310  procIndex += starts[procID];
1311  }
1312 #ifdef HAVE_TEUCHOS_DEBUG
1313  if (index != numActive) {
1314  index_neq_numActive = true;
1315  }
1316 #endif
1317  }
1318  // sort the startsTo and proc IDs together, in ascending order, according
1319  // to proc IDs
1320  if (numSends_ > 0) {
1321  sort2(procsTo_.begin(), procsTo_.end(), startsTo_.begin());
1322  }
1323  // compute the maximum send length
1324  maxSendLength_ = 0;
1325  for (size_t i = 0; i < numSends_; ++i) {
1326  int procID = procsTo_[i];
1327  lengthsTo_[i] = starts[procID];
1328  if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
1329  maxSendLength_ = lengthsTo_[i];
1330  }
1331  }
1332  }
1333  else {
1334  // not grouped by proc, need send buffer and indicesTo_
1335 
1336  // starts[i] is the number of sends to proc i
1337  // numActive equals number of sends total, \sum_i starts[i]
1338 
1339  // this loop starts at starts[1], so explicitly check starts[0]
1340  if (starts[0] == 0 ) {
1341  numSends_ = 0;
1342  }
1343  else {
1344  numSends_ = 1;
1345  }
1346  for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
1347  im1=starts.begin();
1348  i != starts.end(); ++i)
1349  {
1350  if (*i != 0) ++numSends_;
1351  *i += *im1;
1352  im1 = i;
1353  }
1354  // starts[i] now contains the number of exports to procs 0 through i
1355 
1356  for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
1357  i=starts.rbegin()+1;
1358  i != starts.rend(); ++i)
1359  {
1360  *ip1 = *i;
1361  ip1 = i;
1362  }
1363  starts[0] = 0;
1364  // starts[i] now contains the number of exports to procs 0 through
1365  // i-1, i.e., all procs before proc i
1366 
1367  indicesTo_.resize(numActive);
1368 
1369  for (size_t i = 0; i < numExports; ++i) {
1370  if (exportProcIDs[i] >= 0) {
1371  // record the offset to the sendBuffer for this export
1372  indicesTo_[starts[exportProcIDs[i]]] = i;
1373  // now increment the offset for this proc
1374  ++starts[exportProcIDs[i]];
1375  }
1376  }
1377  // our send buffer will contain the export data for each of the procs
1378  // we communicate with, in order by proc id
1379  // sendBuffer = {proc_0_data, proc_1_data, ..., proc_np-1_data}
1380  // indicesTo now maps each export to the location in our send buffer
1381  // associated with the export
1382  // data for export i located at sendBuffer[indicesTo[i]]
1383  //
1384  // starts[i] once again contains the number of exports to
1385  // procs 0 through i
1386  for (int proc = numProcs-1; proc != 0; --proc) {
1387  starts[proc] = starts[proc-1];
1388  }
1389  starts.front() = 0;
1390  starts[numProcs] = numActive;
1391  //
1392  // starts[proc] once again contains the number of exports to
1393  // procs 0 through proc-1
1394  // i.e., the start of my data in the sendBuffer
1395 
1396  // this contains invalid data at procs we don't care about, that is okay
1397  procsTo_.resize(numSends_);
1398  startsTo_.resize(numSends_);
1399  lengthsTo_.resize(numSends_);
1400 
1401  // for each group of sends/exports, record the destination proc,
1402  // the length, and the offset for this send into the
1403  // send buffer (startsTo_)
1404  maxSendLength_ = 0;
1405  size_t snd = 0;
1406  for (int proc = 0; proc < numProcs; ++proc ) {
1407  if (starts[proc+1] != starts[proc]) {
1408  lengthsTo_[snd] = starts[proc+1] - starts[proc];
1409  startsTo_[snd] = starts[proc];
1410  // record max length for all off-proc sends
1411  if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
1412  maxSendLength_ = lengthsTo_[snd];
1413  }
1414  procsTo_[snd] = proc;
1415  ++snd;
1416  }
1417  }
1418 #ifdef HAVE_TEUCHOS_DEBUG
1419  if (snd != numSends_) {
1420  send_neq_numSends = true;
1421  }
1422 #endif
1423  }
1424 #ifdef HAVE_TEUCHOS_DEBUG
1425  SHARED_TEST_FOR_EXCEPTION(index_neq_numActive, std::logic_error,
1426  "Tpetra::Distributor::createFromSends: logic error. Please notify the Tpetra team.",*comm_);
1427  SHARED_TEST_FOR_EXCEPTION(send_neq_numSends, std::logic_error,
1428  "Tpetra::Distributor::createFromSends: logic error. Please notify the Tpetra team.",*comm_);
1429 #endif
1430 
1431  if (selfMessage_) --numSends_;
1432 
1433  // Invert map to see what msgs are received and what length
1434  computeReceives();
1435 
1436  if (verbose) {
1437  std::ostringstream os;
1438  os << myProcID << ": createFromSends: done" << endl;
1439  *out_ << os.str ();
1440  }
1441 
1442  // createFromRecvs() calls createFromSends(), but will set
1443  // howInitialized_ again after calling createFromSends().
1444  howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS;
1445 
1446  return totalReceiveLength_;
1447  }
1448 
1449  void
1451  createFromSendsAndRecvs (const Teuchos::ArrayView<const int>& exportProcIDs,
1452  const Teuchos::ArrayView<const int>& remoteProcIDs)
1453  {
1454  // note the exportProcIDs and remoteProcIDs _must_ be a list that has
1455  // an entry for each GID. If the export/remoteProcIDs is taken from
1456  // the getProcs{From|To} lists that are extracted from a previous distributor,
1457  // it will generate a wrong answer, because those lists have a unique entry
1458  // for each processor id. A version of this with lengthsTo and lengthsFrom
1459  // should be made.
1460 
1461  howInitialized_ = Tpetra::Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS;
1462 
1463 
1464  int myProcID = comm_->getRank ();
1465  int numProcs = comm_->getSize();
1466 
1467  const size_t numExportIDs = exportProcIDs.size();
1468  Teuchos::Array<size_t> starts (numProcs + 1, 0);
1469 
1470  size_t numActive = 0;
1471  int needSendBuff = 0; // Boolean
1472 
1473  for(size_t i = 0; i < numExportIDs; i++ )
1474  {
1475  if( needSendBuff==0 && i && (exportProcIDs[i] < exportProcIDs[i-1]) )
1476  needSendBuff = 1;
1477  if( exportProcIDs[i] >= 0 )
1478  {
1479  ++starts[ exportProcIDs[i] ];
1480  ++numActive;
1481  }
1482  }
1483 
1484  selfMessage_ = ( starts[myProcID] != 0 ) ? 1 : 0;
1485 
1486  numSends_ = 0;
1487 
1488  if( needSendBuff ) //grouped by processor, no send buffer or indicesTo_ needed
1489  {
1490  if (starts[0] == 0 ) {
1491  numSends_ = 0;
1492  }
1493  else {
1494  numSends_ = 1;
1495  }
1496  for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
1497  im1=starts.begin();
1498  i != starts.end(); ++i)
1499  {
1500  if (*i != 0) ++numSends_;
1501  *i += *im1;
1502  im1 = i;
1503  }
1504  // starts[i] now contains the number of exports to procs 0 through i
1505 
1506  for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
1507  i=starts.rbegin()+1;
1508  i != starts.rend(); ++i)
1509  {
1510  *ip1 = *i;
1511  ip1 = i;
1512  }
1513  starts[0] = 0;
1514  // starts[i] now contains the number of exports to procs 0 through
1515  // i-1, i.e., all procs before proc i
1516 
1517  indicesTo_.resize(numActive);
1518 
1519  for (size_t i = 0; i < numExportIDs; ++i) {
1520  if (exportProcIDs[i] >= 0) {
1521  // record the offset to the sendBuffer for this export
1522  indicesTo_[starts[exportProcIDs[i]]] = i;
1523  // now increment the offset for this proc
1524  ++starts[exportProcIDs[i]];
1525  }
1526  }
1527  for (int proc = numProcs-1; proc != 0; --proc) {
1528  starts[proc] = starts[proc-1];
1529  }
1530  starts.front() = 0;
1531  starts[numProcs] = numActive;
1532  procsTo_.resize(numSends_);
1533  startsTo_.resize(numSends_);
1534  lengthsTo_.resize(numSends_);
1535  maxSendLength_ = 0;
1536  size_t snd = 0;
1537  for (int proc = 0; proc < numProcs; ++proc ) {
1538  if (starts[proc+1] != starts[proc]) {
1539  lengthsTo_[snd] = starts[proc+1] - starts[proc];
1540  startsTo_[snd] = starts[proc];
1541  // record max length for all off-proc sends
1542  if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
1543  maxSendLength_ = lengthsTo_[snd];
1544  }
1545  procsTo_[snd] = proc;
1546  ++snd;
1547  }
1548  }
1549  }
1550  else {
1551  // grouped by proc, no send buffer or indicesTo_ needed
1552  numSends_ = 0;
1553  // Count total number of sends, i.e., total number of procs to
1554  // which we are sending. This includes myself, if applicable.
1555  for (int i = 0; i < numProcs; ++i) {
1556  if (starts[i]) {
1557  ++numSends_;
1558  }
1559  }
1560 
1561  // Not only do we not need these, but we must clear them, as
1562  // empty status of indicesTo is a flag used later.
1563  indicesTo_.resize(0);
1564  // Size these to numSends_; note, at the moment, numSends_
1565  // includes self sends. Set their values to zeros.
1566  procsTo_.assign(numSends_,0);
1567  startsTo_.assign(numSends_,0);
1568  lengthsTo_.assign(numSends_,0);
1569 
1570  // set startsTo to the offset for each send (i.e., each proc ID)
1571  // set procsTo to the proc ID for each send
1572  // in interpreting this code, remember that we are assuming contiguity
1573  // that is why index skips through the ranks
1574  {
1575  size_t index = 0, procIndex = 0;
1576  for (size_t i = 0; i < numSends_; ++i) {
1577  while (exportProcIDs[procIndex] < 0) {
1578  ++procIndex; // skip all negative proc IDs
1579  }
1580  startsTo_[i] = procIndex;
1581  int procID = exportProcIDs[procIndex];
1582  procsTo_[i] = procID;
1583  index += starts[procID];
1584  procIndex += starts[procID];
1585  }
1586  }
1587  // sort the startsTo and proc IDs together, in ascending order, according
1588  // to proc IDs
1589  if (numSends_ > 0) {
1590  sort2(procsTo_.begin(), procsTo_.end(), startsTo_.begin());
1591  }
1592  // compute the maximum send length
1593  maxSendLength_ = 0;
1594  for (size_t i = 0; i < numSends_; ++i) {
1595  int procID = procsTo_[i];
1596  lengthsTo_[i] = starts[procID];
1597  if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
1598  maxSendLength_ = lengthsTo_[i];
1599  }
1600  }
1601  }
1602 
1603 
1604  numSends_ -= selfMessage_;
1605  std::vector<int> recv_list;
1606  recv_list.reserve(numSends_); //reserve an initial guess for size needed
1607 
1608  int last_pid=-2;
1609  for(int i=0; i<remoteProcIDs.size(); i++) {
1610  if(remoteProcIDs[i]>last_pid) {
1611  recv_list.push_back(remoteProcIDs[i]);
1612  last_pid = remoteProcIDs[i];
1613  }
1614  else if (remoteProcIDs[i]<last_pid)
1615  throw std::runtime_error("Tpetra::Distributor:::createFromSendsAndRecvs expected RemotePIDs to be in sorted order");
1616  }
1617  numReceives_ = recv_list.size();
1618  if(numReceives_) {
1619  procsFrom_.assign(numReceives_,0);
1620  lengthsFrom_.assign(numReceives_,0);
1621  indicesFrom_.assign(numReceives_,0);
1622  startsFrom_.assign(numReceives_,0);
1623  }
1624  for(size_t i=0,j=0; i<numReceives_; ++i) {
1625  int jlast=j;
1626  procsFrom_[i] = recv_list[i];
1627  startsFrom_[i] = j;
1628  for( ; j<(size_t)remoteProcIDs.size() &&
1629  remoteProcIDs[jlast]==remoteProcIDs[j] ; j++){;}
1630  lengthsFrom_[i] = j-jlast;
1631  }
1632  totalReceiveLength_ = remoteProcIDs.size();
1633  indicesFrom_.clear ();
1634  indicesFrom_.reserve (totalReceiveLength_);
1635  for (size_t i = 0; i < totalReceiveLength_; ++i) {
1636  indicesFrom_.push_back(i);
1637  }
1638 
1639  numReceives_-=selfMessage_;
1640  }
1641 
1642 } // namespace Tpetra
Namespace Tpetra contains the class and methods constituting the Tpetra library.
Teuchos::RCP< Distributor > getReverse() const
A reverse communication plan Distributor.
EDistributorHowInitialized
Enum indicating how and whether a Distributor was initialized.
Teuchos::RCP< const Teuchos::ParameterList > getValidParameters() const
List of valid Distributor parameters.
size_t getTotalReceiveLength() const
Total number of values this process will receive from other processes.
size_t getMaxSendLength() const
Maximum number of values this process will send to another single process.
void swap(Distributor &rhs)
Swap the contents of rhs with those of *this.
std::string DistributorSendTypeEnumToString(EDistributorSendType sendType)
Convert an EDistributorSendType enum value to a string.
Teuchos::ArrayView< const size_t > getLengthsFrom() const
Number of values this process will receive from each process.
size_t getNumReceives() const
The number of processes from which we will receive data.
size_t getNumSends() const
The number of processes to which we will send data.
Implementation details of Tpetra.
size_t createFromSends(const Teuchos::ArrayView< const int > &exportProcIDs)
Set up Distributor using list of process ranks to which this process will send.
void gathervPrint(std::ostream &out, const std::string &s, const Teuchos::Comm< int > &comm)
On Process 0 in the given communicator, print strings from each process in that communicator, in rank order.
Teuchos::ArrayView< const size_t > getLengthsTo() const
Number of values this process will send to each process.
void describe(Teuchos::FancyOStream &out, const Teuchos::EVerbosityLevel verbLevel=Teuchos::Describable::verbLevel_default) const
Describe this object in a human-readable way to the given output stream.
void createFromSendsAndRecvs(const Teuchos::ArrayView< const int > &exportProcIDs, const Teuchos::ArrayView< const int > &remoteProcIDs)
Set up Distributor using list of process ranks to which to send, and list of process ranks from which to receive.
Teuchos::ArrayView< const int > getProcsFrom() const
Ranks of the processes sending values to this process.
Sets up and executes a communication plan for a Tpetra DistObject.
static bool verbose()
Whether Tpetra is in verbose mode.
void setParameterList(const Teuchos::RCP< Teuchos::ParameterList > &plist)
Set Distributor parameters.
#define TPETRA_EFFICIENCY_WARNING(throw_exception_test, Exception, msg)
Print or throw an efficiency warning.
Teuchos::ArrayView< const int > getProcsTo() const
Ranks of the processes to which this process will send values.
virtual ~Distributor()
Destructor (virtual for memory safety).
Teuchos::Array< std::string > distributorSendTypes()
Valid values for Distributor's "Send type" parameter.
void sort2(const IT1 &first1, const IT1 &last1, const IT2 &first2)
Sort the first array, and apply the resulting permutation to the second array.
std::string DistributorHowInitializedEnumToString(EDistributorHowInitialized how)
Convert an EDistributorHowInitialized enum value to a string.
std::string description() const
Return a one-line description of this object.
#define SHARED_TEST_FOR_EXCEPTION(throw_exception_test, Exception, msg, comm)
Test for exception, with reduction over the given communicator.
bool hasSelfMessage() const
Whether the calling process will send or receive messages to itself.
EDistributorSendType
The type of MPI send that Distributor should use.
Distributor(const Teuchos::RCP< const Teuchos::Comm< int > > &comm)
Construct using the specified communicator and default parameters.