Peano
Rank.cpp
#include "tarch/Assertions.h"
#include "tarch/tarch.h"

#include <sstream>
#include <cstdlib>
#include <chrono>

#include "Rank.h"


#ifdef CompilerHasUTSName
#include <sys/utsname.h>
#endif


tarch::logging::Log tarch::mpi::Rank::_log( "tarch::mpi::Rank" );
tarch::mpi::Rank tarch::mpi::Rank::_singleton;
int tarch::mpi::Rank::_tagCounter = 0;
int tarch::mpi::Rank::_maxTags = -1;


void tarch::mpi::Rank::releaseTag(int tag) {
  if (tag==_tagCounter-1) {
    _tagCounter--;
  }
}
int tarch::mpi::Rank::reserveFreeTag([[maybe_unused]] const std::string& fullQualifiedMessageName, [[maybe_unused]] int numberOfTags) {
  assertion2(numberOfTags>=1,fullQualifiedMessageName,numberOfTags);
  const int result = _tagCounter;
  _tagCounter += numberOfTags;

  // I protect the tag manually (not via the log filter), as many tags are
  // grabbed before most applications have initialised their log filters
  // properly.
  //
  // We may not use isGlobalMaster(), as this query checks whether the code is
  // properly initialised. Please note that the rank is -1 as long as MPI is
  // not properly initialised, i.e. any tag booking prior to the MPI
  // initialisation is not logged properly.
  //
  // We also may not use the instance, as the instance might not be up yet. I
  // have also seen issues with std::cout at this stage, so we skip the output
  // altogether.
  /*
  #if PeanoDebug>0
  if ( getInstance()._rank==getGlobalMasterRank() ) {
    std::cout << "assigned message " << fullQualifiedMessageName
              << " the free tag " << result << " (" << numberOfTags << " consecutive tags reserved)" << std::endl;
  }
  #endif
  */

  validateMaxTagIsSupported();

  return result;
}
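// Typical use of the tag bookkeeping (a sketch; the message name is
// illustrative, not taken from the source). Tags are handed out in a
// stack-like fashion, so releaseTag() above only reclaims the most
// recently reserved tag:
//
//   const int myTag = tarch::mpi::Rank::reserveFreeTag( "myproject::MyMessage" );
//   // ... use myTag in sends/receives ...
//   tarch::mpi::Rank::releaseTag( myTag );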


bool tarch::mpi::Rank::isInitialised() const {
  return _initIsCalled;
}


void tarch::mpi::Rank::ensureThatMessageQueuesAreEmpty( [[maybe_unused]] int fromRank, [[maybe_unused]] int tag ) {
  #ifdef Parallel
  int flag;
  MPI_Iprobe(fromRank, tag, _communicator, &flag, MPI_STATUS_IGNORE);
  if (flag!=0) {
    plotMessageQueues();
  }
  assertion3( flag==0, fromRank, tag, getRank() );
  #endif
}


void tarch::mpi::Rank::plotMessageQueues() {
  #ifdef Parallel
  int flag;
  MPI_Status status;
  MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, _communicator, &flag, &status);
  if (flag==0) {
    logError("plotMessageQueues()", "there are no messages from any sender in MPI queue");
  }
  else {
    logError(
      "plotMessageQueues()",
      "there is still a message in queue from rank " << status.MPI_SOURCE <<
      " with tag " << status.MPI_TAG
    );
  }
  #endif
}


bool tarch::mpi::Rank::exceededTimeOutWarningThreshold() const {
  return _areTimeoutsEnabled
     and
     _timeOutWarning>std::chrono::seconds(0)
     and
     std::chrono::system_clock::now() > _globalTimeOutWarning;
}


bool tarch::mpi::Rank::exceededDeadlockThreshold() const {
  return _areTimeoutsEnabled
     and
     _deadlockTimeOut>std::chrono::seconds(0)
     and
     std::chrono::system_clock::now() > _globalTimeOutDeadlock;
}


void tarch::mpi::Rank::triggerDeadlockTimeOut(
  const std::string& className,
  const std::string& methodName,
  int communicationPartnerRank,
  int tag,
  int numberOfExpectedMessages,
  const std::string& comment
) {
  if ( exceededDeadlockThreshold() ) {
    std::ostringstream out;
    out << "operation " << className << "::" << methodName << " on node "
        << getRank() << " had to wait more than " << std::to_string(_deadlockTimeOut.count())
        << " seconds for " << numberOfExpectedMessages
        << " message(s) from node " << communicationPartnerRank << " with tag " << tag
        << ". Timeout. " << comment;
    logError( "triggerDeadlockTimeOut(...)", out.str() );

    plotMessageQueues();

    abort(DEADLOCK_EXIT_CODE);
  }
}


void tarch::mpi::Rank::writeTimeOutWarning(
  const std::string& className,
  const std::string& methodName,
  int communicationPartnerRank,
  int tag,
  int numberOfExpectedMessages
) {
  if ( exceededTimeOutWarningThreshold() ) {
    logWarning(
      "writeTimeOutWarning(...)",
      "operation " << className << "::" << methodName << " on node "
      << getRank() << " had to wait more than " << std::to_string(_timeOutWarning.count())
      << " seconds for " << numberOfExpectedMessages
      << " message(s) from node " << communicationPartnerRank << " with tag " << tag
    );

    if ( _deadlockTimeOut.count()>0 ) {
      logWarning(
        "writeTimeOutWarning(...)",
        "application will terminate after " << std::to_string(_deadlockTimeOut.count()) << " seconds because of a deadlock"
      );
    } else {
      logWarning(
        "writeTimeOutWarning(...)",
        "deadlock detection switched off as deadlock time out is set to " << _deadlockTimeOut.count() << ", i.e. you will continue to get warnings, but the code will not shut down"
      );
    }

    if (
      _timeOutWarning<_deadlockTimeOut/2
      or
      _deadlockTimeOut.count()<=0
    ) {
      std::chrono::seconds newTimeOutWarning = _timeOutWarning*2;
      logWarning(
        "writeTimeOutWarning(...)",
        "increase time out warning threshold from " << std::to_string(_timeOutWarning.count()) << " seconds to " << std::to_string(newTimeOutWarning.count()) << " seconds to avoid a flood of warning messages"
      );
      _timeOutWarning = newTimeOutWarning;
      _globalTimeOutWarning = _globalTimeOutWarning+_timeOutWarning;
    }
  }
}
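// writeTimeOutWarning() and triggerDeadlockTimeOut() are designed to be used
// together in a polling loop, with the deadlines armed beforehand. A minimal
// sketch (messageHasArrived() is a hypothetical predicate; the pattern mirrors
// allReduce() further down):
//
//   tarch::mpi::Rank::getInstance().setDeadlockWarningTimeStamp();
//   tarch::mpi::Rank::getInstance().setDeadlockTimeOutTimeStamp();
//   while (not messageHasArrived()) {
//     tarch::mpi::Rank::getInstance().writeTimeOutWarning( "MyClass", "receive()", partnerRank, tag );
//     tarch::mpi::Rank::getInstance().triggerDeadlockTimeOut( "MyClass", "receive()", partnerRank, tag );
//   }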


void tarch::mpi::Rank::setDeadlockWarningTimeStamp() {
  _globalTimeOutWarning = std::chrono::system_clock::now() + _timeOutWarning;
}


void tarch::mpi::Rank::setDeadlockTimeOutTimeStamp() {
  _globalTimeOutDeadlock = std::chrono::system_clock::now() + _deadlockTimeOut;
}


void tarch::mpi::Rank::suspendTimeouts( bool timeoutsDisabled ) {
  _areTimeoutsEnabled = !timeoutsDisabled;
}


#ifdef Parallel
std::string tarch::mpi::MPIReturnValueToString( int result ) {
  std::ostringstream out;

  int resultlen;
  char* string = new char[MPI_MAX_ERROR_STRING];
  MPI_Error_string(result, string, &resultlen);

  int errorclass;
  MPI_Error_class(result, &errorclass);

  out << "mpi error class: " << errorclass << "=";
  switch ( errorclass ) {
    case MPI_SUCCESS:      out << "MPI_SUCCESS [no error]"; break;
    case MPI_ERR_BUFFER:   out << "MPI_ERR_BUFFER [invalid buffer pointer]"; break;
    case MPI_ERR_COUNT:    out << "MPI_ERR_COUNT [invalid count argument]"; break;
    case MPI_ERR_TYPE:     out << "MPI_ERR_TYPE [invalid datatype]"; break;
    case MPI_ERR_TAG:      out << "MPI_ERR_TAG [invalid tag]"; break;
    case MPI_ERR_COMM:     out << "MPI_ERR_COMM [invalid communicator]"; break;
    case MPI_ERR_RANK:     out << "MPI_ERR_RANK [invalid rank]"; break;
    case MPI_ERR_REQUEST:  out << "MPI_ERR_REQUEST [invalid request handle]"; break;
    case MPI_ERR_ROOT:     out << "MPI_ERR_ROOT [invalid root argument]"; break;
    case MPI_ERR_GROUP:    out << "MPI_ERR_GROUP [invalid group]"; break;
    case MPI_ERR_OP:       out << "MPI_ERR_OP [invalid operation]"; break;
    case MPI_ERR_TOPOLOGY: out << "MPI_ERR_TOPOLOGY [invalid topology]"; break;
    case MPI_ERR_DIMS:     out << "MPI_ERR_DIMS [invalid dimensions]"; break;
    case MPI_ERR_ARG:      out << "MPI_ERR_ARG [invalid argument]"; break;
    case MPI_ERR_UNKNOWN:  out << "MPI_ERR_UNKNOWN [unknown error]"; break;
    case MPI_ERR_TRUNCATE: out << "MPI_ERR_TRUNCATE [message has been truncated by receiver]"; break;
    case MPI_ERR_OTHER:    out << "MPI_ERR_OTHER [other unknown error]"; break;
    case MPI_ERR_INTERN:   out << "MPI_ERR_INTERN [internal mpi error]"; break;
    default:               out << "unknown"; break;
  }
  out << ", mpi error text: " << string;

  delete[] string;

  return out.str();
}
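// Example of checking an MPI return code with the helper above (a sketch;
// buffer, destRank and tag are placeholders):
//
//   int result = MPI_Send( &buffer, 1, MPI_INT, destRank, tag, MPI_COMM_WORLD );
//   if (result!=MPI_SUCCESS) {
//     logError( "send()", tarch::mpi::MPIReturnValueToString(result) );
//   }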


std::string tarch::mpi::MPIStatusToString( const MPI_Status& status ) {
  std::ostringstream out;
  out << "status flag:"
      << " MPI_ERROR=" << status.MPI_ERROR
      << " (" << MPIReturnValueToString(status.MPI_ERROR)
      << "), MPI_SOURCE=" << status.MPI_SOURCE
      << ", MPI_TAG=" << status.MPI_TAG;
  return out.str();
}
#endif


#ifdef Parallel
tarch::mpi::Rank::Rank():
  _initIsCalled(false),
  _rank(-1),
  _numberOfProcessors(-1),
  _communicator( MPI_COMM_WORLD),
  _timeOutWarning(0),
  _deadlockTimeOut(0),
  _areTimeoutsEnabled(true) {}
#else
tarch::mpi::Rank::Rank():
  _initIsCalled(false),
  _rank(0),
  _numberOfProcessors(1),
  _timeOutWarning(0),
  _deadlockTimeOut(0),
  _areTimeoutsEnabled(true) {}
#endif


tarch::mpi::Rank::~Rank() {}


#ifdef Parallel
void tarch::mpi::Rank::allReduce(
  const void *sendbuf, void *recvbuf, int count,
  MPI_Datatype datatype,
  MPI_Op op,
  std::function<void()> waitor
) {
  logTraceIn( "allReduce()" );

//  if (getNumberOfRanks()>1) {
    MPI_Request* request = new MPI_Request();
    MPI_Iallreduce(sendbuf, recvbuf, count, datatype, op, getCommunicator(), request );

    tarch::mpi::Rank::getInstance().setDeadlockWarningTimeStamp();
    tarch::mpi::Rank::getInstance().setDeadlockTimeOutTimeStamp();

    int flag = 0;
    while (not flag) {
      tarch::mpi::Rank::getInstance().writeTimeOutWarning( "tarch::mpi::Rank", "allReduce()", -1, -1 );
      tarch::mpi::Rank::getInstance().triggerDeadlockTimeOut( "tarch::mpi::Rank", "allReduce()", -1, -1 );

      waitor();

      // yielding here leads to deadlock/starvation situations
      //tarch::multicore::yield();
      MPI_Test( request, &flag, MPI_STATUS_IGNORE );
    }

    delete request;
//  }

  logTraceOut( "allReduce()" );
}
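// Example call (a sketch; the variables and the lambda body are illustrative):
// sum a local value over all ranks while the waitor keeps background work
// progressing between the MPI_Test polls:
//
//   double localValue = ...;   // each rank's contribution
//   double globalSum  = 0.0;
//   tarch::mpi::Rank::getInstance().allReduce(
//     &localValue, &globalSum, 1, MPI_DOUBLE, MPI_SUM,
//     []() -> void { /* e.g. process outstanding background tasks */ }
//   );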


void tarch::mpi::Rank::reduce(
  const void *sendbuf, void *recvbuf, int count,
  MPI_Datatype datatype,
  MPI_Op op, int root,
  std::function<void()> waitor
) {
  logTraceIn( "reduce()" );

//  if (getNumberOfRanks()>1) {
    MPI_Request* request = new MPI_Request();
    MPI_Ireduce(sendbuf, recvbuf, count, datatype, op, root, getCommunicator(), request );

    tarch::mpi::Rank::getInstance().setDeadlockWarningTimeStamp();
    tarch::mpi::Rank::getInstance().setDeadlockTimeOutTimeStamp();

    int flag = 0;
    while (not flag) {
      tarch::mpi::Rank::getInstance().writeTimeOutWarning( "tarch::mpi::Rank", "reduce()", -1, -1 );
      tarch::mpi::Rank::getInstance().triggerDeadlockTimeOut( "tarch::mpi::Rank", "reduce()", -1, -1 );

      waitor();

      // yielding here leads to deadlock/starvation situations
      //tarch::multicore::yield();
      MPI_Test( request, &flag, MPI_STATUS_IGNORE );
    }

    delete request;
//  }

  logTraceOut( "reduce()" );
}
#endif


void tarch::mpi::Rank::barrier([[maybe_unused]] std::function<void()> waitor) {
  #ifdef Parallel
  logTraceIn( "barrier()" );

  if (getNumberOfRanks()>1) {
    MPI_Request* request = new MPI_Request();
    MPI_Ibarrier( getCommunicator(), request );

    tarch::mpi::Rank::getInstance().setDeadlockWarningTimeStamp();
    tarch::mpi::Rank::getInstance().setDeadlockTimeOutTimeStamp();

    int flag = 0;
    while (not flag) {
      tarch::mpi::Rank::getInstance().writeTimeOutWarning( "tarch::mpi::Rank", "barrier()", -1, -1 );
      tarch::mpi::Rank::getInstance().triggerDeadlockTimeOut( "tarch::mpi::Rank", "barrier()", -1, -1 );

      waitor();

      // yielding here leads to deadlock/starvation situations
      //tarch::multicore::yield();
      MPI_Test( request, &flag, MPI_STATUS_IGNORE );
    }

    delete request;
  }

  logTraceOut( "barrier()" );
  #endif
}


bool tarch::mpi::Rank::isMessageInQueue([[maybe_unused]] int tag) const {
  #ifdef Parallel
  int flag = 0;
  MPI_Iprobe(
    MPI_ANY_SOURCE, tag,
    getCommunicator(), &flag, MPI_STATUS_IGNORE
  );
  return flag;
  #else
  return false;
  #endif
}


void tarch::mpi::Rank::shutdown() {
  logTraceIn( "shutdown()" );
  #ifdef Parallel
  assertion( _rank!=-1 );

  int errorCode = MPI_Finalize();
  if (errorCode) {
    logError( "shutdown()", MPIReturnValueToString(errorCode) );
  }

  _communicator = MPI_COMM_WORLD;
  #endif

  _rank = -1;
  logTraceOut( "shutdown()" );
}


int tarch::mpi::Rank::getGlobalMasterRank() {
  return 0;
}


bool tarch::mpi::Rank::isGlobalMaster() const {
  #ifdef Parallel
  assertion(_initIsCalled);
  return getRank() == getGlobalMasterRank();
  #else
  return true;
  #endif
}


void tarch::mpi::Rank::logStatus() const {
  std::ostringstream statusMessage;
  statusMessage << "MPI status:";

  #ifdef CompilerHasUTSName
  utsname* utsdata = new utsname();
  assertion( utsdata!=NULL );
  uname(utsdata);
  statusMessage << " nodename=" << utsdata->nodename;
  delete utsdata;
  #else
  statusMessage << " nodename=undef";
  #endif

  statusMessage << ", rank=" << _rank;
  #ifdef Parallel
  statusMessage << ", communicator=" << _communicator;
  #endif
  statusMessage << ", #processors=" << _numberOfProcessors;

  logInfo( "logStatus()", statusMessage.str() );
}


bool tarch::mpi::Rank::validateMaxTagIsSupported() {
  if (
    _maxTags <= 0
    or
    _tagCounter < _maxTags
  ) {
    return true;
  }
  else {
    logWarning( "validateMaxTagIsSupported()", "maximum tag value is " << _maxTags << " though we would need " << _tagCounter << " tags. The code will likely crash" );
    return false;
  }
}


#ifdef UseTargetDART
static void tdTextPointer() {}
#endif

bool tarch::mpi::Rank::init([[maybe_unused]] int* argc, [[maybe_unused]] char*** argv) {
  #ifdef Parallel
  int result = MPI_SUCCESS;

  #if defined( SharedMemoryParallelisation )
  int initThreadProvidedThreadLevelSupport;
  result = MPI_Init_thread( argc, argv, MPI_THREAD_MULTIPLE, &initThreadProvidedThreadLevelSupport );
  if (initThreadProvidedThreadLevelSupport!=MPI_THREAD_MULTIPLE ) {
    std::cerr << "warning: MPI implementation does not support MPI_THREAD_MULTIPLE. Supported multithreading level is "
              << initThreadProvidedThreadLevelSupport << " instead of " << MPI_THREAD_MULTIPLE
              << ". Disable MultipleThreadsMayTriggerMPICalls in the compiler-specific settings or via -DnoMultipleThreadsMayTriggerMPICalls." << std::endl;
  }
  #else
  result = MPI_Init( argc, argv );
  #endif

  #ifdef UseTargetDART
  td_init(reinterpret_cast<void*>(&tdTextPointer));
  #endif

  if (result!=MPI_SUCCESS) {
    std::cerr << "init(int*,char***)\t initialisation failed: " + MPIReturnValueToString(result) + " (no logging available yet)" << std::endl;
    return false;
  }

  result = MPI_Comm_size( MPI_COMM_WORLD, &_numberOfProcessors );
  if (result!=MPI_SUCCESS) {
    std::cerr << "init(int*,char***)\t initialisation failed: " + MPIReturnValueToString(result) + " (no logging available yet)" << std::endl;
    return false;
  }

  result = MPI_Comm_rank( MPI_COMM_WORLD, &_rank );
  if (result!=MPI_SUCCESS) {
    std::cerr << "init(int*,char***)\t initialisation failed: " + MPIReturnValueToString(result) + " (no logging available yet)" << std::endl;
    return false;
  }

  int answerFlag;
  void* rawMaxTag;
  MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_TAG_UB, &rawMaxTag, &answerFlag);
  if (answerFlag) {
    _maxTags = *(int*)rawMaxTag;
  }
  else {
    std::cerr << "init(int*,char***)\t was not able to query what the maximum tag value is" << std::endl;
    return false;
  }

  setDeadlockWarningTimeStamp();
  setDeadlockTimeOutTimeStamp();
  #endif

  _initIsCalled = true;
  return _initIsCalled;
}
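// Typical start-up/shutdown sequence in an application's main() (a sketch;
// runSimulation() is a placeholder for the actual program logic):
//
//   int main(int argc, char** argv) {
//     if (not tarch::mpi::Rank::getInstance().init(&argc,&argv)) {
//       return EXIT_FAILURE;
//     }
//     runSimulation();
//     tarch::mpi::Rank::getInstance().shutdown();
//     return EXIT_SUCCESS;
//   }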


int tarch::mpi::Rank::getRank() const {
/*
  #ifdef Parallel
  assertion(_initIsCalled);
  #endif
*/
  return _rank;
}


tarch::mpi::Rank& tarch::mpi::Rank::getInstance() {
  return _singleton;
}


#ifdef Parallel
MPI_Comm tarch::mpi::Rank::getCommunicator() const {
  assertion(_initIsCalled);
  return _communicator;
}
#endif


int tarch::mpi::Rank::getNumberOfRanks() const {
  #ifdef Parallel
  assertion(_initIsCalled);
  #endif
  return _numberOfProcessors;
}


void tarch::mpi::Rank::setTimeOutWarning( int value ) {
  assertion( value>=0 );
  _timeOutWarning = std::chrono::seconds(value);
}


void tarch::mpi::Rank::setDeadlockTimeOut( int value ) {
  assertion( value>=0 );
  _deadlockTimeOut = std::chrono::seconds(value);
  if (value==0) {
    logInfo( "setDeadlockTimeOut(int)", "set deadlock timeout to " << _deadlockTimeOut.count() << " and hence disabled timeout checks" );
  }
}
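// Example configuration (a sketch): warn after 30 seconds of waiting, treat
// 60 seconds of silence as a deadlock. Passing 0 to setDeadlockTimeOut()
// keeps the warnings but disables the shutdown:
//
//   tarch::mpi::Rank::getInstance().setTimeOutWarning( 30 );
//   tarch::mpi::Rank::getInstance().setDeadlockTimeOut( 60 );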


#ifdef Parallel
void tarch::mpi::Rank::setCommunicator( MPI_Comm communicator, [[maybe_unused]] bool recomputeRankAndWorld ) {
  _communicator = communicator;

  int result = MPI_Comm_size( _communicator, &_numberOfProcessors );
  if (result!=MPI_SUCCESS) {
    logError( "setCommunicator(...)", "initialisation failed: " + MPIReturnValueToString(result) );
  }

  result = MPI_Comm_rank( _communicator, &_rank );
  if (result!=MPI_SUCCESS) {
    logError( "setCommunicator(...)", "initialisation failed: " + MPIReturnValueToString(result) );
  }
}
#endif


void tarch::mpi::Rank::abort([[maybe_unused]] int errorCode) {
  std::cout.flush();
  std::cerr.flush();
  #ifdef Parallel
  MPI_Abort(MPI_COMM_WORLD,errorCode);
  #else
  std::abort();
  #endif
}