Peano
Loading...
Searching...
No Matches
Rank.cpp
Go to the documentation of this file.
1#include "tarch/Assertions.h"
2#include "tarch/tarch.h"
4
5#include <sstream>
6#include <cstdlib>
7#include <chrono>
8
9#include "Rank.h"
12
17
22#ifdef CompilerHasUTSName
23#include <sys/utsname.h>
24#endif
25
29
31
33 if (tag==_tagCounter-1) {
35 }
36}
37
38
int tarch::mpi::Rank::reserveFreeTag([[maybe_unused]] const std::string& fullQualifiedMessageName, [[maybe_unused]] int numberOfTags) {
  // Hand out numberOfTags consecutive MPI tags starting at the current value
  // of the static _tagCounter, advance the counter, and return the first tag
  // of the reserved range. fullQualifiedMessageName is only used for the
  // (currently disabled) diagnostics below.
  assertion2(numberOfTags>=1,fullQualifiedMessageName,numberOfTags);
  const int result = _tagCounter;
  _tagCounter += numberOfTags;

  // I protect the tag manually (not via log filter), as many tags are actually
  // grabbed before most applications initialise their log filters properly.
  //
  // We may not use isGlobalMaster() as this query checks whether the code is
  // properly initialised. Please note rank is -1 as long as MPI is not properly
  // initialised, i.e. any tag booking prior to the MPI initialisation is not
  // logged properly.
  //
  // We also may not use the instance, as the instance might not be up yet. I
  // also saw other stuff with cout, so better ignore it
  /*
  #if PeanoDebug>0
  if ( getInstance()._rank==getGlobalMasterRank() ) {
    std::cout << "assigned message " << fullQualifiedMessageName
              << " the free tag " << result << " (" << numberOfTags << " consecutive tags reserved)" << std::endl;
  }
  #endif
  */

  // NOTE(review): original line 63 was dropped by this extraction — verify
  // against the upstream file whether a statement belongs here.

  return result;
}
67
68
70 return _initIsCalled;
71}
72
73
void tarch::mpi::Rank::ensureThatMessageQueuesAreEmpty( [[maybe_unused]] int fromRank, [[maybe_unused]] int tag ) {
  // Assert that no message from fromRank with the given tag is still pending
  // in the MPI queue. No-op in non-parallel builds.
  #ifdef Parallel
  int flag;
  MPI_Iprobe(fromRank, tag, _communicator, &flag, MPI_STATUS_IGNORE);
  if (flag!=0) {
    // NOTE(review): the statement originally here (source line 79) was lost
    // in this extraction — presumably an error report such as
    // plotMessageQueues(); verify against upstream. The assertion below
    // still fires if a message is pending.
  }
  assertion3( flag==0, fromRank, tag, getRank() );
  #endif
}
84
85
87 #ifdef Parallel
88 int flag;
89 MPI_Status status;
90 MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, _communicator, &flag, &status);
91 if (flag==0) {
92 logError("plotMessageQueues()", "there are no messages from any sender in MPI queue");
93 }
94 else {
96 "plotMessageQueues()",
97 "there is still a message in queue "
98 " from rank " << status.MPI_SOURCE <<
99 " with tag " << status.MPI_TAG
100 );
101 }
102 #endif
103}
104
105
108 and
109 _timeOutWarning>std::chrono::seconds(0)
110 and
111 std::chrono::system_clock::now() > _globalTimeOutDeadlock;
112}
113
114
117 and
118 _deadlockTimeOut>std::chrono::seconds(0)
119 and
120 std::chrono::system_clock::now() > _globalTimeOutDeadlock;
121}
122
123
125 const std::string& className,
126 const std::string& methodName,
127 int communicationPartnerRank,
128 int tag,
129 int numberOfExpectedMessages,
130 const std::string& comment
131) {
133 std::ostringstream out;
134 out << "operation " << className << "::" << methodName << " on node "
135 << getRank() << " had to wait more than " << std::to_string(_deadlockTimeOut.count())
136 << " seconds for " << numberOfExpectedMessages
137 << " message(s) from node " << communicationPartnerRank << " with tag " << tag
138 << ". Timeout. " << comment;
139 logError( "triggerDeadlockTimeOut(...)", out.str() );
140
142
144 }
145}
146
147
149 const std::string& className,
150 const std::string& methodName,
151 int communicationPartnerRank,
152 int tag,
153 int numberOfExpectedMessages
154) {
157 "writeTimeOutWarning(...)",
158 "operation " << className << "::" << methodName << " on node "
159 << getRank() << " had to wait more than " << std::to_string(_timeOutWarning.count())
160 << " seconds for " << numberOfExpectedMessages
161 << " message(s) from node " << communicationPartnerRank << " with tag " << tag
162 );
163
164 if ( _deadlockTimeOut.count()>0 ) {
166 "writeTimeOutWarning(...)",
167 "application will terminate after " << std::to_string(_deadlockTimeOut.count()) << " seconds because of a deadlock"
168 );
169 } else {
171 "writeTimeOutWarning(...)",
172 "deadlock detection switched off as deadlock time out is set to " << _deadlockTimeOut.count() << ", i.e. you will continue to get warnings, but code will not shut down"
173 );
174 }
175
176 if (
178 or
179 _deadlockTimeOut.count()<=0
180 ) {
181 std::chrono::seconds newTimeOutWarning = _timeOutWarning*2;
183 "writeTimeOutWarning(...)",
184 "increase time out warning threshold from " << std::to_string(_timeOutWarning.count()) << " seconds to " << std::to_string(newTimeOutWarning.count()) << " seconds to avoid flood of warning messages"
185 );
186 _timeOutWarning = newTimeOutWarning;
188 }
189 }
190}
191
192
194 _globalTimeOutWarning = std::chrono::system_clock::now() + _timeOutWarning;
195}
196
197
199 _globalTimeOutDeadlock = std::chrono::system_clock::now() + _deadlockTimeOut;
200}
201
202
203void tarch::mpi::Rank::suspendTimeouts( bool timeoutsDisabled ) {
204 _areTimeoutsEnabled = !timeoutsDisabled;
205}
206
207
208#ifdef Parallel
210 _initIsCalled(false),
211 _rank(-1),
214 _communicator( MPI_COMM_WORLD),
217 _areTimeoutsEnabled(true) {}
218#else
220 _initIsCalled(false),
221 _rank(0),
222 _numberOfProcessors(1),
223 _timeOutWarning(0),
224 _deadlockTimeOut(0),
225 _areTimeoutsEnabled(true) {}
226#endif
227
229
230#ifdef Parallel
232 const void *sendbuf, void *recvbuf, int count,
233 MPI_Datatype datatype,
234 MPI_Op op,
235 std::function<void()> waitor
236) {
237 logTraceIn( "allReduce()" );
238
239// if (getNumberOfRanks()>1) {
240 MPI_Request* request = new MPI_Request();
241 MPI_Iallreduce(sendbuf, recvbuf, count, datatype, op, getCommunicator(), request );
242
245
246 int flag = 0;
247 while (not flag) {
248 tarch::mpi::Rank::getInstance().writeTimeOutWarning( "tarch::mpi::Rank", "allReduce()", -1, -1 );
249 tarch::mpi::Rank::getInstance().triggerDeadlockTimeOut( "tarch::mpi::Rank", "allReduce()", -1, -1 );
250
251 waitor();
252
253 // leads to deadlock/starve situations
254 //tarch::multicore::yield();
255 MPI_Test( request, &flag, MPI_STATUS_IGNORE );
256 }
257
258 delete request;
259// }
260
261 logTraceOut( "allReduce()" );
262}
263
264
266 const void *sendbuf, void *recvbuf, int count,
267 MPI_Datatype datatype,
268 MPI_Op op, int root,
269 std::function<void()> waitor
270) {
271 logTraceIn( "reduce()" );
272
273// if (getNumberOfRanks()>1) {
274 MPI_Request* request = new MPI_Request();
275 MPI_Ireduce(sendbuf, recvbuf, count, datatype, op, root, getCommunicator(), request );
276
279 int flag = 0;
280 while (not flag) {
281 tarch::mpi::Rank::getInstance().writeTimeOutWarning( "tarch::mpi::Rank", "reduce()", -1, -1 );
282 tarch::mpi::Rank::getInstance().triggerDeadlockTimeOut( "tarch::mpi::Rank", "reduce()", -1, -1 );
283
284 waitor();
285
286 // leads to deadlock/starve situations
287 //tarch::multicore::yield();
288 MPI_Test( request, &flag, MPI_STATUS_IGNORE );
289 }
290
291 delete request;
292// }
293
294 logTraceOut( "reduce()" );
295}
296#endif
297
298
void tarch::mpi::Rank::barrier([[maybe_unused]] std::function<void()> waitor) {
  // Global MPI barrier over this rank's communicator. Instead of blocking in
  // MPI_Barrier, it posts a non-blocking MPI_Ibarrier and polls it, invoking
  // the user-supplied waitor() callback in every polling iteration and
  // emitting timeout warnings/deadlock aborts while waiting.
  // No-op in non-parallel builds or for single-rank runs.
  #ifdef Parallel
  logTraceIn( "barrier()" );

  if (getNumberOfRanks()>1) {
    MPI_Request* request = new MPI_Request();
    MPI_Ibarrier( getCommunicator(), request );

    // NOTE(review): original lines 307-308 were dropped by this extraction —
    // presumably the timeout time-stamp initialisation (compare
    // setDeadlockWarningTimeStamp()/setDeadlockTimeOutTimeStamp());
    // verify against upstream.

    int flag = 0;
    while (not flag) {
      // -1/-1 encodes "no specific partner rank / tag" for the diagnostics.
      tarch::mpi::Rank::getInstance().writeTimeOutWarning( "tarch::mpi::Rank", "barrier()", -1, -1 );
      tarch::mpi::Rank::getInstance().triggerDeadlockTimeOut( "tarch::mpi::Rank", "barrier()", -1, -1 );

      waitor();

      // leads to deadlock/starve situations
      //tarch::multicore::yield();
      MPI_Test( request, &flag, MPI_STATUS_IGNORE );
    }

    delete request;
  }

  logTraceOut( "barrier()" );
  #endif
}
327
328
329bool tarch::mpi::Rank::isMessageInQueue([[maybe_unused]] int tag) const {
330 #ifdef Parallel
331 int flag = 0;
332 MPI_Iprobe(
333 MPI_ANY_SOURCE, tag,
334 getCommunicator(), &flag, MPI_STATUS_IGNORE
335 );
336 return flag;
337 #else
338 return false;
339 #endif
340}
341
343 logTraceIn( "shutdown()" );
344 #ifdef Parallel
345 assertion( _rank!=-1 );
346
348
349 int errorCode = MPI_Finalize();
350 if (errorCode) {
351 logError( "shutdown()", MPIReturnValueToString(errorCode) );
352 }
353
354 _communicator = MPI_COMM_WORLD;
355 #endif
356
357 _rank = -1;
358 logTraceOut( "shutdown()" );
359}
360
361
363 return 0;
364}
365
366
368 #ifdef Parallel
370 return getRank() == getGlobalMasterRank();
371 #else
372 return true;
373 #endif
374}
375
376
378 std::ostringstream statusMessage;
379 statusMessage << "MPI status:";
380
381 #ifdef CompilerHasUTSName
382 utsname* utsdata = new utsname();
383 assertion( utsdata!=NULL );
384 uname(utsdata);
385 statusMessage << " nodename=" << utsdata->nodename;
386 delete utsdata;
387 #else
388 statusMessage << " nodename=undef";
389 #endif
390
391 statusMessage << ", rank=" << _rank;
392 #ifdef Parallel
393 statusMessage << ", communicator=" << _communicator;
394 #endif
395 statusMessage << ", #processors=" << _numberOfProcessors;
396
397 logInfo( "logStatus()", statusMessage.str() );
398}
399
400
402 if (
403 _maxTags <= 0
404 or
406 ) {
407 return true;
408 }
409 else {
410 logWarning( "validateMaxTagIsSupported()", "maximum tag value is " << _maxTags << " though we would need " << _tagCounter << " tags. Code will likely crash" );
411 return false;
412 }
413}
414
#ifdef UseTargetDART
// Empty marker function: init() passes its address to td_init(), presumably
// so TargetDART can locate this binary's code/text segment — TODO confirm
// against the TargetDART documentation.
static void tdTextPointer() {}
#endif
418
419
423
424
bool tarch::mpi::Rank::init([[maybe_unused]] int* argc, [[maybe_unused]] char*** argv) {
  // Initialise the MPI environment: MPI_Init(_thread), query rank and size on
  // MPI_COMM_WORLD, and read the maximum supported tag (MPI_TAG_UB).
  // Returns false on any MPI failure; errors go to std::cerr because the
  // logging infrastructure is not available this early.
  // In non-parallel builds only the _initIsCalled flag is set.
  #ifdef Parallel
  int result = MPI_SUCCESS;

  #if defined( SharedMemoryParallelisation )
  // Hybrid MPI+threads builds need MPI_THREAD_MULTIPLE; warn (but continue)
  // if the MPI implementation offers a lower thread-support level.
  result = MPI_Init_thread( argc, argv, MPI_THREAD_MULTIPLE, &_providedThreadLevelSupport );
  if (_providedThreadLevelSupport!=MPI_THREAD_MULTIPLE ) {
    std::cerr << "warning: MPI implementation does not support MPI_THREAD_MULTIPLE. Support multithreading level is "
              << _providedThreadLevelSupport << " instead of " << MPI_THREAD_MULTIPLE
              << ". Disable MultipleThreadsMayTriggerMPICalls in the compiler-specific settings or via -DnoMultipleThreadsMayTriggerMPICalls."<< std::endl;
  }
  #else
  result = MPI_Init( argc, argv );
  #endif

#ifdef UseTargetDART
  // Hand TargetDART an address within this binary (see tdTextPointer above).
  td_init(reinterpret_cast<void*>(&tdTextPointer));
#endif

  if (result!=MPI_SUCCESS) {
    std::cerr << "init(int*,char***)\t initialisation failed: " + MPIReturnValueToString(result) + " (no logging available yet)" << std::endl;
    return false;
  }

  result = MPI_Comm_size( MPI_COMM_WORLD, &_numberOfProcessors );
  if (result!=MPI_SUCCESS) {
    std::cerr << "init(int*,char***)\t initialisation failed: " + MPIReturnValueToString(result) + " (no logging available yet)" << std::endl;
    return false;
  }

  result = MPI_Comm_rank( MPI_COMM_WORLD, &_rank );
  if (result!=MPI_SUCCESS) {
    std::cerr << "init(int*,char***)\t initialisation failed: " + MPIReturnValueToString(result) + " (no logging available yet)" << std::endl;
    return false;
  }

  // MPI_TAG_UB is a pointer-valued attribute; answerFlag tells us whether the
  // attribute was set at all.
  int answerFlag;
  void* rawMaxTag;
  MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_TAG_UB, &rawMaxTag, &answerFlag);
  if (answerFlag) {
    _maxTags = *(int*)rawMaxTag;
  }
  else {
    std::cerr << "init(int*,char***)\t was not able to query what the maximum tag value is" << std::endl;
    return false;
  }

  // NOTE(review): original lines 472-473 were dropped by this extraction —
  // verify against upstream which statements belong here.
  #endif

  _initIsCalled = true;
  return _initIsCalled;
}
479
480
482 return _rank;
483}
484
485
489
490
491#ifdef Parallel
496#endif
497
498
500 #ifdef Parallel
502 #endif
503 return _numberOfProcessors;
504}
505
506
508 assertion( value>=0 );
509 _timeOutWarning = std::chrono::seconds(value);
510}
511
512
514 assertion( value>=0 );
515 _deadlockTimeOut = std::chrono::seconds(value);
516 if (value==0) {
517 logInfo( "setDeadlockTimeOut(int)", "set deadlock timeout to " << _deadlockTimeOut.count() << " and hence disabled timeout checks" );
518 }
519}
520
521
522#ifdef Parallel
523void tarch::mpi::Rank::setCommunicator( MPI_Comm communicator, [[maybe_unused]] bool recomputeRankAndWorld ) {
524 _communicator = communicator;
525
526 int result = MPI_Comm_size( _communicator, &_numberOfProcessors );
527 if (result!=MPI_SUCCESS) {
528 logError( "setCommunicator(...)", "initialisation failed: " + MPIReturnValueToString(result) );
529 }
530
531 result = MPI_Comm_rank( _communicator, &_rank );
532 if (result!=MPI_SUCCESS) {
533 logError( "setCommunicator(...)", "initialisation failed: " + MPIReturnValueToString(result) );
534 }
535}
536#endif
537
538
539void tarch::mpi::Rank::abort([[maybe_unused]] int errorCode) {
540 std::cout.flush();
541 std::cerr.flush();
542 #ifdef Parallel
543 MPI_Abort(MPI_COMM_WORLD,errorCode);
544 #else
545 std::abort();
546 #endif
547}
#define assertion2(expr, param0, param1)
#define assertion3(expr, param0, param1, param2)
#define assertion(expr)
#define logError(methodName, logMacroMessageStream)
Wrapper macro around tarch::tarch::logging::Log to improve logging.
Definition Log.h:464
#define logTraceOut(methodName)
Definition Log.h:379
#define logWarning(methodName, logMacroMessageStream)
Wrapper macro around tarch::tarch::logging::Log to improve logging.
Definition Log.h:440
#define logTraceIn(methodName)
Definition Log.h:369
#define logInfo(methodName, logMacroMessageStream)
Wrapper macro around tarch::tarch::logging::Log to improve logging.
Definition Log.h:411
Log Device.
Definition Log.h:516
Represents a program instance within a cluster.
Definition Rank.h:48
void setCommunicator(MPI_Comm communicator, bool recomputeRankAndWorld=true)
Set communicator to be used by Peano.
Definition Rank.cpp:523
Rank()
The standard constructor assignes the attributes default values and checks whether the program is com...
Definition Rank.cpp:209
std::chrono::system_clock::time_point _globalTimeOutDeadlock
Definition Rank.h:113
int getProvidedThreadLevelSupport() const
Information on supported thread-level support.
Definition Rank.cpp:420
void setDeadlockTimeOut(int valueInSeconds)
Set deadlock time out.
Definition Rank.cpp:513
static int _tagCounter
Count the tags that have already been handed out.
Definition Rank.h:132
static int getGlobalMasterRank()
Get the global master.
Definition Rank.cpp:362
int getNumberOfRanks() const
Definition Rank.cpp:499
bool isInitialised() const
Definition Rank.cpp:69
void triggerDeadlockTimeOut(const std::string &className, const std::string &methodName, int communicationPartnerRank, int tag, int numberOfExpectedMessages=1, const std::string &comment="")
Triggers a time out and shuts down the cluster if a timeout is violated.
Definition Rank.cpp:124
bool isGlobalMaster() const
Is this node the global master process, i.e.
Definition Rank.cpp:367
void setDeadlockWarningTimeStamp()
Memorise global timeout.
Definition Rank.cpp:193
bool exceededTimeOutWarningThreshold() const
Definition Rank.cpp:106
static int _maxTags
Set by init() and actually stores the number of valid tags.
Definition Rank.h:127
MPI_Comm _communicator
MPI Communicator this process belongs to.
Definition Rank.h:81
void writeTimeOutWarning(const std::string &className, const std::string &methodName, int communicationPartnerRank, int tag, int numberOfExpectedMessages=1)
Writes a warning if relevant.
Definition Rank.cpp:148
int _numberOfProcessors
Number of processors available.
Definition Rank.h:73
std::chrono::system_clock::time_point _globalTimeOutWarning
Definition Rank.h:112
void setDeadlockTimeOutTimeStamp()
Definition Rank.cpp:198
void ensureThatMessageQueuesAreEmpty(int fromRank, int tag)
Ensure that there are no messages anymore from the specified rank.
Definition Rank.cpp:74
int _rank
Rank (id) of this process.
Definition Rank.h:68
void reduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, std::function< void()> waitor=[]() -> void {})
Definition Rank.cpp:265
static Rank & getInstance()
This operation returns the singleton instance.
Definition Rank.cpp:486
std::chrono::seconds _deadlockTimeOut
Time to timeout.
Definition Rank.h:110
bool isMessageInQueue(int tag) const
In older DaStGen version, I tried to find out whether a particular message type is in the MPI queue.
Definition Rank.cpp:329
void setTimeOutWarning(int valueInSeconds)
Set time out warning.
Definition Rank.cpp:507
bool _areTimeoutsEnabled
Global toggle to enable/disable timeouts.
Definition Rank.h:122
int getRank() const
Return rank of this node.
Definition Rank.cpp:481
bool init(int *argc, char ***argv)
This operation initializes the MPI environment and the program instance.
Definition Rank.cpp:425
void allReduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, std::function< void()> waitor=[]() -> void {})
Wrapper around allreduce.
Definition Rank.cpp:231
void suspendTimeouts(bool timeoutsDisabled)
Definition Rank.cpp:203
static tarch::logging::Log _log
Logging device.
Definition Rank.h:56
std::chrono::seconds _timeOutWarning
Timeout warning.
Definition Rank.h:102
void barrier(std::function< void()> waitor=[]() -> void {})
Global MPI barrier.
Definition Rank.cpp:299
bool _initIsCalled
Is set true if init() is called.
Definition Rank.h:63
void logStatus() const
Logs the status of the process onto the log device.
Definition Rank.cpp:377
static bool validateMaxTagIsSupported()
Just try to find out if a tag is actually supported.
Definition Rank.cpp:401
int _providedThreadLevelSupport
Definition Rank.h:75
static int reserveFreeTag(const std::string &fullQualifiedMessageName, int numberOfTags=1)
Return a Free Tag.
Definition Rank.cpp:39
void plotMessageQueues()
Definition Rank.cpp:86
virtual ~Rank()
The standard destructor calls MPI_Finalize().
Definition Rank.cpp:228
static void releaseTag(int tag)
Definition Rank.cpp:32
bool exceededDeadlockThreshold() const
Definition Rank.cpp:115
void shutdown()
Shuts down the application.
Definition Rank.cpp:342
static void abort(int errorCode)
A proper abort in an MPI context has to use MPI_Abort.
Definition Rank.cpp:539
MPI_Comm getCommunicator() const
Definition Rank.cpp:492
static Rank _singleton
Definition Rank.h:58
static const int DEADLOCK_EXIT_CODE
Definition Rank.h:50
std::string MPIReturnValueToString(int result)
Definition mpi.cpp:9
static void initDatatype()
Wrapper around getDatatype() to trigger lazy evaluation if we use the lazy initialisation.
static void shutdownDatatype()
Free the underlying MPI datatype.
static void initDatatype()
Wrapper around getDatatype() to trigger lazy evaluation if we use the lazy initialisation.