Peano 4
Loading...
Searching...
No Matches
Rank.cpp
Go to the documentation of this file.
1#include "tarch/Assertions.h"
2#include "tarch/tarch.h"
4
5#include <sstream>
6#include <cstdlib>
7#include <chrono>
8
9#include "Rank.h"
12
17
22#ifdef CompilerHasUTSName
23#include <sys/utsname.h>
24#endif
25
29
31
33 if (tag==_tagCounter-1) {
35 }
36}
37
38
39int tarch::mpi::Rank::reserveFreeTag([[maybe_unused]] const std::string& fullQualifiedMessageName, [[maybe_unused]] int numberOfTags) {
40 assertion2(numberOfTags>=1,fullQualifiedMessageName,numberOfTags);
41 const int result = _tagCounter;
42 _tagCounter += numberOfTags;
43
44 // I protect the tag manually (not via log filter), as many tags are actually
45 // grabbed before most applications initialise their log filters properly.
46 //
47 // We may not use isGlobalMaster() as this query checks whether the code is
48 // properly initialised. Please note rank is -1 as long as MPI is not properly
49 // initialised, i.e. any tag booking prior to the MPI initialisation is not
50 // logged properly.
51 #if PeanoDebug>0
52 if ( getInstance()._rank==getGlobalMasterRank() ) {
53 std::cout << "rank " << getInstance()._rank << ": assigned message " << fullQualifiedMessageName
54 << " the free tag " << result << " (" << numberOfTags << " consecutive tags reserved)" << std::endl;
55 }
56 #endif
57
58 validateMaxTagIsSupported();
59
60 return result;
61}
62
63
65 return _initIsCalled;
66}
67
68
69void tarch::mpi::Rank::ensureThatMessageQueuesAreEmpty( [[maybe_unused]] int fromRank, [[maybe_unused]] int tag ) {
70 #ifdef Parallel
71 int flag;
72 MPI_Iprobe(fromRank, tag, _communicator, &flag, MPI_STATUS_IGNORE);
73 if (flag!=0) {
74 plotMessageQueues();
75 }
76 assertion3( flag==0, fromRank, tag, getRank() );
77 #endif
78}
79
80
82 #ifdef Parallel
83 int flag;
84 MPI_Status status;
85 MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, _communicator, &flag, &status);
86 if (flag==0) {
87 logError("plotMessageQueues()", "there are no messages from any sender in MPI queue");
88 }
89 else {
91 "plotMessageQueues()",
92 "there is still a message in queue "
93 " from rank " << status.MPI_SOURCE <<
94 " with tag " << status.MPI_TAG
95 );
96 }
97 #endif
98}
99
100
102 return _areTimeoutsEnabled
103 and
104 _timeOutWarning>std::chrono::seconds(0)
105 and
106 std::chrono::system_clock::now() > _globalTimeOutDeadlock;
107}
108
109
111 return _areTimeoutsEnabled
112 and
113 _deadlockTimeOut>std::chrono::seconds(0)
114 and
115 std::chrono::system_clock::now() > _globalTimeOutDeadlock;
116}
117
118
120 const std::string& className,
121 const std::string& methodName,
122 int communicationPartnerRank,
123 int tag,
124 int numberOfExpectedMessages,
125 const std::string& comment
126) {
127 if ( exceededDeadlockThreshold() ) {
128 std::ostringstream out;
129 out << "operation " << className << "::" << methodName << " on node "
130 << getRank() << " had to wait more than " << std::to_string(_deadlockTimeOut.count())
131 << " seconds for " << numberOfExpectedMessages
132 << " message(s) from node " << communicationPartnerRank << " with tag " << tag
133 << ". Timeout. " << comment;
134 logError( "triggerDeadlockTimeOut(...)", out.str() );
135
136 plotMessageQueues();
137
138 abort(DEADLOCK_EXIT_CODE);
139 }
140}
141
142
144 const std::string& className,
145 const std::string& methodName,
146 int communicationPartnerRank,
147 int tag,
148 int numberOfExpectedMessages
149) {
150 if ( exceededTimeOutWarningThreshold() ) {
152 "writeTimeOutWarning(...)",
153 "operation " << className << "::" << methodName << " on node "
154 << getRank() << " had to wait more than " << std::to_string(_timeOutWarning.count())
155 << " seconds for " << numberOfExpectedMessages
156 << " message(s) from node " << communicationPartnerRank << " with tag " << tag
157 );
158
159 if ( _deadlockTimeOut.count()>0 ) {
161 "writeTimeOutWarning(...)",
162 "application will terminate after " << std::to_string(_deadlockTimeOut.count()) << " seconds because of a deadlock"
163 );
164 } else {
166 "writeTimeOutWarning(...)",
167 "deadlock detection switched off as deadlock time out is set to " << _deadlockTimeOut.count() << ", i.e. you will continue to get warnings, but code will not shut down"
168 );
169 }
170
171 if (
172 _timeOutWarning<_deadlockTimeOut/2
173 or
174 _deadlockTimeOut.count()<=0
175 ) {
176 std::chrono::seconds newTimeOutWarning = _timeOutWarning*2;
178 "writeTimeOutWarning(...)",
179 "increase time out warning threshold from " << std::to_string(_timeOutWarning.count()) << " seconds to " << std::to_string(newTimeOutWarning.count()) << " seconds to avoid flood of warning messages"
180 );
181 _timeOutWarning = newTimeOutWarning;
182 _globalTimeOutWarning = _globalTimeOutWarning+_timeOutWarning;
183 }
184 }
185}
186
187
189 _globalTimeOutWarning = std::chrono::system_clock::now() + _timeOutWarning;
190}
191
192
194 _globalTimeOutDeadlock = std::chrono::system_clock::now() + _deadlockTimeOut;
195}
196
197
198void tarch::mpi::Rank::suspendTimeouts( bool timeoutsDisabled ) {
199 _areTimeoutsEnabled = !timeoutsDisabled;
200}
201
202
203#ifdef Parallel
204std::string tarch::mpi::MPIReturnValueToString( int result ) {
205 std::ostringstream out;
206
207 int resultlen;
208 char* string = new char[MPI_MAX_ERROR_STRING]; // (char *)malloc(MPI_MAX_ERROR_STRING * sizeof(char));
209 MPI_Error_string(result, string, &resultlen);
210
211 int errorclass;
212 MPI_Error_class(result, &errorclass);
213
214 out << "mpi error class: " << errorclass << "="
215 << ", mpi error text: " << string;
216
217 switch ( errorclass ) {
218 case MPI_SUCCESS: out << "MPI_SUCCESS [no error]"; break;
219 case MPI_ERR_BUFFER: out << "MPI_ERR_BUFFER [invalid buffer pointer]"; break;
220 case MPI_ERR_COUNT: out << "MPI_ERR_COUNT [invalid count argument]"; break;
221 case MPI_ERR_TYPE: out << "MPI_ERR_TYPE [invalid datatype]"; break;
222 case MPI_ERR_TAG: out << "MPI_ERR_TAG [invalid tag]"; break;
223 case MPI_ERR_COMM: out << "MPI_ERR_COMM [invalid communicator]"; break;
224 case MPI_ERR_RANK: out << "MPI_ERR_RANK [invalid rank]"; break;
225 case MPI_ERR_REQUEST: out << "MPI_ERR_REQUEST [invalid request handle]"; break;
226 case MPI_ERR_ROOT: out << "MPI_ERR_ROOT [invalid root argument]"; break;
227 case MPI_ERR_GROUP: out << "MPI_ERR_GROUP [invalid group]"; break;
228 case MPI_ERR_OP: out << "MPI_ERR_OP [invalid operation]"; break;
229 case MPI_ERR_TOPOLOGY: out << "MPI_ERR_TOPOLOGY [invalid topology]"; break;
230 case MPI_ERR_DIMS: out << "MPI_ERR_DIMS [invalid dimensions]"; break;
231 case MPI_ERR_ARG: out << "MPI_ERR_ARG [invalid argument]"; break;
232 case MPI_ERR_UNKNOWN: out << "MPI_ERR_UNKNOWN [unknown error]"; break;
233 case MPI_ERR_TRUNCATE: out << "MPI_ERR_TRUNCATE [message has been truncated by receiver]"; break;
234 case MPI_ERR_OTHER: out << "MPI_ERR_OTHER [other unknown error]"; break;
235 case MPI_ERR_INTERN: out << "MPI_ERR_INTERN [internal mpi error]"; break;
236 default: out << "unknown"; break;
237 }
238
239 delete[] string;
240
241 return out.str();
242}
243
244
245std::string tarch::mpi::MPIStatusToString( const MPI_Status& status ) {
246 std::ostringstream out;
247 out << "status flag:"
248 << " MPI_ERROR=" << status.MPI_ERROR
249 << " (" << MPIReturnValueToString(status.MPI_ERROR)
250 << ") ,MPI_SOURCE=" << status.MPI_SOURCE
251 << ",MPI_TAG=" << status.MPI_TAG;
252 return out.str();
253}
254#endif
255
256
257#ifdef Parallel
259 _initIsCalled(false),
260 _rank(-1),
261 _numberOfProcessors(-1),
262 _communicator( MPI_COMM_WORLD),
263 _timeOutWarning(0),
264 _deadlockTimeOut(0),
265 _areTimeoutsEnabled(true) {}
266#else
268 _initIsCalled(false),
269 _rank(0),
270 _numberOfProcessors(1),
271 _timeOutWarning(0),
272 _deadlockTimeOut(0),
273 _areTimeoutsEnabled(true) {}
274#endif
275
277
278#ifdef Parallel
280 const void *sendbuf, void *recvbuf, int count,
281 MPI_Datatype datatype,
282 MPI_Op op,
283 std::function<void()> waitor
284) {
285 logTraceIn( "allReduce()" );
286
287// if (getNumberOfRanks()>1) {
288 MPI_Request* request = new MPI_Request();
289 MPI_Iallreduce(sendbuf, recvbuf, count, datatype, op, getCommunicator(), request );
290
293
294 int flag = 0;
295 while (not flag) {
296 tarch::mpi::Rank::getInstance().writeTimeOutWarning( "tarch::mpi::Rank", "allReduce()", -1, -1 );
297 tarch::mpi::Rank::getInstance().triggerDeadlockTimeOut( "tarch::mpi::Rank", "allReduce()", -1, -1 );
298
299 waitor();
300
301 // leads to deadlock/starve situations
302 //tarch::multicore::yield();
303 MPI_Test( request, &flag, MPI_STATUS_IGNORE );
304 }
305
306 delete request;
307// }
308
309 logTraceOut( "allReduce()" );
310}
311
312
314 const void *sendbuf, void *recvbuf, int count,
315 MPI_Datatype datatype,
316 MPI_Op op, int root,
317 std::function<void()> waitor
318) {
319 logTraceIn( "reduce()" );
320
321// if (getNumberOfRanks()>1) {
322 MPI_Request* request = new MPI_Request();
323 MPI_Ireduce(sendbuf, recvbuf, count, datatype, op, root, getCommunicator(), request );
324
327 int flag = 0;
328 while (not flag) {
329 tarch::mpi::Rank::getInstance().writeTimeOutWarning( "tarch::mpi::Rank", "reduce()", -1, -1 );
330 tarch::mpi::Rank::getInstance().triggerDeadlockTimeOut( "tarch::mpi::Rank", "reduce()", -1, -1 );
331
332 waitor();
333
334 // leads to deadlock/starve situations
335 //tarch::multicore::yield();
336 MPI_Test( request, &flag, MPI_STATUS_IGNORE );
337 }
338
339 delete request;
340// }
341
342 logTraceOut( "reduce()" );
343}
344#endif
345
346
347void tarch::mpi::Rank::barrier([[maybe_unused]] std::function<void()> waitor) {
348 #ifdef Parallel
349 logTraceIn( "barrier()" );
350
351 if (getNumberOfRanks()>1) {
352 MPI_Request* request = new MPI_Request();
353 MPI_Ibarrier( getCommunicator(), request );
354
357 int flag = 0;
358 while (not flag) {
359 tarch::mpi::Rank::getInstance().writeTimeOutWarning( "tarch::mpi::Rank", "barrier()", -1, -1 );
360 tarch::mpi::Rank::getInstance().triggerDeadlockTimeOut( "tarch::mpi::Rank", "barrier()", -1, -1 );
361
362 waitor();
363
364 // leads to deadlock/starve situations
365 //tarch::multicore::yield();
366 MPI_Test( request, &flag, MPI_STATUS_IGNORE );
367 }
368
369 delete request;
370 }
371
372 logTraceOut( "barrier()" );
373 #endif
374}
375
376
377bool tarch::mpi::Rank::isMessageInQueue([[maybe_unused]] int tag) const {
378 #ifdef Parallel
379 int flag = 0;
380 MPI_Iprobe(
381 MPI_ANY_SOURCE, tag,
382 getCommunicator(), &flag, MPI_STATUS_IGNORE
383 );
384 return flag;
385 #else
386 return false;
387 #endif
388}
389
391 logTraceIn( "shutdown()" );
392 #ifdef Parallel
393 assertion( _rank!=-1 );
394
396
397#ifdef UseTargetDART
398 finalizeTargetDART();
399#endif
400
401 int errorCode = MPI_Finalize();
402 if (errorCode) {
403 logError( "shutdown()", MPIReturnValueToString(errorCode) );
404 }
405
406 _communicator = MPI_COMM_WORLD;
407 #endif
408
409 _rank = -1;
410 logTraceOut( "shutdown()" );
411}
412
413
415 return 0;
416}
417
418
420 #ifdef Parallel
421 assertion(_initIsCalled);
422 return getRank() == getGlobalMasterRank();
423 #else
424 return true;
425 #endif
426}
427
428
430 std::ostringstream statusMessage;
431 statusMessage << "MPI status:";
432
433 #ifdef CompilerHasUTSName
434 utsname* utsdata = new utsname();
435 assertion( utsdata!=NULL );
436 uname(utsdata);
437 statusMessage << " nodename=" << utsdata->nodename;
438 delete utsdata;
439 #else
440 statusMessage << " nodename=undef";
441 #endif
442
443 statusMessage << ", rank=" << _rank;
444 #ifdef Parallel
445 statusMessage << ", communicator=" << _communicator;
446 #endif
447 statusMessage << ", #processors=" << _numberOfProcessors;
448
449 logInfo( "logStatus()", statusMessage.str() );
450}
451
452
454 if (
455 _maxTags <= 0
456 or
457 _tagCounter < _maxTags
458 ) {
459 return true;
460 }
461 else {
462 logWarning( "validateMaxTagIsSupported()", "maximum tag value is " << _maxTags << " though we would need " << _tagCounter << " tags. Code will likely crash" );
463 return false;
464 }
465}
466
467#ifdef UseTargetDART
// Empty marker function. Its address is passed to initTargetDART() in init()
// below; presumably TargetDART uses it to identify this binary's text
// segment -- NOTE(review): confirm against the TargetDART documentation.
468static void targetDartTextPointer() {}
469#endif
470
471bool tarch::mpi::Rank::init([[maybe_unused]] int* argc, [[maybe_unused]] char*** argv) {
472 #ifdef Parallel
473 int result = MPI_SUCCESS;
474
475 #if defined( SharedMemoryParallelisation )
476 int initThreadProvidedThreadLevelSupport;
477 result = MPI_Init_thread( argc, argv, MPI_THREAD_MULTIPLE, &initThreadProvidedThreadLevelSupport );
478 if (initThreadProvidedThreadLevelSupport!=MPI_THREAD_MULTIPLE ) {
479 std::cerr << "warning: MPI implementation does not support MPI_THREAD_MULTIPLE. Support multithreading level is "
480 << initThreadProvidedThreadLevelSupport << " instead of " << MPI_THREAD_MULTIPLE
481 << ". Disable MultipleThreadsMayTriggerMPICalls in the compiler-specific settings or via -DnoMultipleThreadsMayTriggerMPICalls."<< std::endl;
482 }
483 #else
484 result = MPI_Init( argc, argv );
485 #endif
486
487#ifdef UseTargetDART
488 initTargetDART(reinterpret_cast<void*>(&targetDartTextPointer));
489#endif
490
491 if (result!=MPI_SUCCESS) {
492 std::cerr << "init(int*,char***)\t initialisation failed: " + MPIReturnValueToString(result) + " (no logging available yet)" << std::endl;
493 return false;
494 }
495
496 result = MPI_Comm_size( MPI_COMM_WORLD, &_numberOfProcessors );
497 if (result!=MPI_SUCCESS) {
498 std::cerr << "init(int*,char***)\t initialisation failed: " + MPIReturnValueToString(result) + " (no logging available yet)" << std::endl;
499 return false;
500 }
501
502 result = MPI_Comm_rank( MPI_COMM_WORLD, &_rank );
503 if (result!=MPI_SUCCESS) {
504 std::cerr << "init(int*,char***)\t initialisation failed: " + MPIReturnValueToString(result) + " (no logging available yet)" << std::endl;
505 return false;
506 }
507
508 int answerFlag;
509 void* rawMaxTag;
510 MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_TAG_UB, &rawMaxTag, &answerFlag);
511 if (answerFlag) {
512 _maxTags = *(int*)rawMaxTag;
513 }
514 else {
515 std::cerr << "init(int*,char***)\t was not able to query what the maximum tag value is" << std::endl;
516 return false;
517 }
518
521 #endif
522
523 _initIsCalled = true;
524 return _initIsCalled;
525}
526
527
529/*
530 #ifdef Parallel
531 assertion(_initIsCalled);
532 #endif
533*/
534 return _rank;
535}
536
537
539 return _singleton;
540}
541
542
543#ifdef Parallel
545 assertion(_initIsCalled);
546 return _communicator;
547}
548#endif
549
550
552 #ifdef Parallel
553 assertion(_initIsCalled);
554 #endif
555 return _numberOfProcessors;
556}
557
558
560 assertion( value>=0 );
561 _timeOutWarning = std::chrono::seconds(value);
562}
563
564
566 assertion( value>=0 );
567 _deadlockTimeOut = std::chrono::seconds(value);
568 if (value==0) {
569 logInfo( "setDeadlockTimeOut(int)", "set deadlock timeout to " << _deadlockTimeOut.count() << " and hence disabled timeout checks" );
570 }
571}
572
573
574#ifdef Parallel
575void tarch::mpi::Rank::setCommunicator( MPI_Comm communicator, [[maybe_unused]] bool recomputeRankAndWorld ) {
576 _communicator = communicator;
577
578 int result = MPI_Comm_size( _communicator, &_numberOfProcessors );
579 if (result!=MPI_SUCCESS) {
580 logError( "setCommunicator(...)", "initialisation failed: " + MPIReturnValueToString(result) );
581 }
582
583 result = MPI_Comm_rank( _communicator, &_rank );
584 if (result!=MPI_SUCCESS) {
585 logError( "setCommunicator(...)", "initialisation failed: " + MPIReturnValueToString(result) );
586 }
587}
588#endif
589
590
591void tarch::mpi::Rank::abort([[maybe_unused]] int errorCode) {
592 std::cout.flush();
593 std::cerr.flush();
594 #ifdef Parallel
595 MPI_Abort(MPI_COMM_WORLD,errorCode);
596 #else
597 std::abort();
598 #endif
599}
#define assertion2(expr, param0, param1)
#define assertion3(expr, param0, param1, param2)
#define assertion(expr)
#define logError(methodName, logMacroMessageStream)
Wrapper macro around tarch::logging::Log to improve logging.
Definition Log.h:464
#define logTraceOut(methodName)
Definition Log.h:379
#define logWarning(methodName, logMacroMessageStream)
Wrapper macro around tarch::logging::Log to improve logging.
Definition Log.h:440
#define logTraceIn(methodName)
Definition Log.h:369
#define logInfo(methodName, logMacroMessageStream)
Wrapper macro around tarch::logging::Log to improve logging.
Definition Log.h:411
Log Device.
Definition Log.h:516
Represents a program instance within a cluster.
Definition Rank.h:58
void setCommunicator(MPI_Comm communicator, bool recomputeRankAndWorld=true)
Set communicator to be used by Peano.
Definition Rank.cpp:575
Rank()
The standard constructor assigns the attributes default values and checks whether the program is com...
Definition Rank.cpp:258
void setDeadlockTimeOut(int valueInSeconds)
Set deadlock time out.
Definition Rank.cpp:565
static int _tagCounter
Count the tags that have already been handed out.
Definition Rank.h:140
static int getGlobalMasterRank()
Get the global master.
Definition Rank.cpp:414
int getNumberOfRanks() const
Definition Rank.cpp:551
bool isInitialised() const
Definition Rank.cpp:64
void triggerDeadlockTimeOut(const std::string &className, const std::string &methodName, int communicationPartnerRank, int tag, int numberOfExpectedMessages=1, const std::string &comment="")
Triggers a time out and shuts down the cluster if a timeout is violated.
Definition Rank.cpp:119
bool isGlobalMaster() const
Is this node the global master process, i.e.
Definition Rank.cpp:419
void setDeadlockWarningTimeStamp()
Memorise global timeout.
Definition Rank.cpp:188
bool exceededTimeOutWarningThreshold() const
Definition Rank.cpp:101
static int _maxTags
Set by init() and actually stores the number of valid tags.
Definition Rank.h:135
void writeTimeOutWarning(const std::string &className, const std::string &methodName, int communicationPartnerRank, int tag, int numberOfExpectedMessages=1)
Writes a warning if relevant.
Definition Rank.cpp:143
void setDeadlockTimeOutTimeStamp()
Definition Rank.cpp:193
void ensureThatMessageQueuesAreEmpty(int fromRank, int tag)
Ensure that there are no messages anymore from the specified rank.
Definition Rank.cpp:69
void reduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, std::function< void()> waitor=[]() -> void {})
Definition Rank.cpp:313
static Rank & getInstance()
This operation returns the singleton instance.
Definition Rank.cpp:538
bool isMessageInQueue(int tag) const
In older DaStGen version, I tried to find out whether a particular message type is in the MPI queue.
Definition Rank.cpp:377
void setTimeOutWarning(int valueInSeconds)
Set time out warning.
Definition Rank.cpp:559
int getRank() const
Return rank of this node.
Definition Rank.cpp:528
bool init(int *argc, char ***argv)
This operation initializes the MPI environment and the program instance.
Definition Rank.cpp:471
void allReduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, std::function< void()> waitor=[]() -> void {})
Definition Rank.cpp:279
void suspendTimeouts(bool timeoutsDisabled)
Definition Rank.cpp:198
static tarch::logging::Log _log
Logging device.
Definition Rank.h:66
void barrier(std::function< void()> waitor=[]() -> void {})
Definition Rank.cpp:347
void logStatus() const
Logs the status of the process onto the log device.
Definition Rank.cpp:429
static bool validateMaxTagIsSupported()
Just try to find out if a tag is actually supported.
Definition Rank.cpp:453
static int reserveFreeTag(const std::string &fullQualifiedMessageName, int numberOfTags=1)
Return a Free Tag.
Definition Rank.cpp:39
void plotMessageQueues()
Definition Rank.cpp:81
virtual ~Rank()
The standard destructor calls MPI_Finalize().
Definition Rank.cpp:276
static void releaseTag(int tag)
Definition Rank.cpp:32
bool exceededDeadlockThreshold() const
Definition Rank.cpp:110
void shutdown()
Shuts down the application.
Definition Rank.cpp:390
static void abort(int errorCode)
A proper abort in an MPI context has to use MPI_Abort.
Definition Rank.cpp:591
MPI_Comm getCommunicator() const
Definition Rank.cpp:544
static Rank _singleton
Definition Rank.h:68
std::string MPIStatusToString(const MPI_Status &status)
Returns a string representation of the mpi status.
Definition Rank.cpp:245
std::string MPIReturnValueToString(int result)
Definition Rank.cpp:204
static void initDatatype()
Wrapper around getDatatype() to trigger lazy evaluation if we use the lazy initialisation.
static void shutdownDatatype()
Free the underlying MPI datatype.
static void initDatatype()
Wrapper around getDatatype() to trigger lazy evaluation if we use the lazy initialisation.