22#ifdef CompilerHasUTSName
23#include <sys/utsname.h>
40 assertion2(numberOfTags>=1,fullQualifiedMessageName,numberOfTags);
41 const int result = _tagCounter;
42 _tagCounter += numberOfTags;
63 validateMaxTagIsSupported();
77 MPI_Iprobe(fromRank, tag, _communicator, &flag, MPI_STATUS_IGNORE);
81 assertion3( flag==0, fromRank, tag, getRank() );
90 MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, _communicator, &flag, &status);
92 logError(
"plotMessageQueues()",
"there are no messages from any sender in MPI queue");
96 "plotMessageQueues()",
97 "there is still a message in queue "
98 " from rank " << status.MPI_SOURCE <<
99 " with tag " << status.MPI_TAG
107 return _areTimeoutsEnabled
109 _timeOutWarning>std::chrono::seconds(0)
111 std::chrono::system_clock::now() > _globalTimeOutDeadlock;
116 return _areTimeoutsEnabled
118 _deadlockTimeOut>std::chrono::seconds(0)
120 std::chrono::system_clock::now() > _globalTimeOutDeadlock;
125 const std::string& className,
126 const std::string& methodName,
127 int communicationPartnerRank,
129 int numberOfExpectedMessages,
130 const std::string& comment
132 if ( exceededDeadlockThreshold() ) {
133 std::ostringstream out;
134 out <<
"operation " << className <<
"::" << methodName <<
" on node "
135 << getRank() <<
" had to wait more than " << std::to_string(_deadlockTimeOut.count())
136 <<
" seconds for " << numberOfExpectedMessages
137 <<
" message(s) from node " << communicationPartnerRank <<
" with tag " << tag
138 <<
". Timeout. " << comment;
139 logError(
"triggerDeadlockTimeOut(...)", out.str() );
143 abort(DEADLOCK_EXIT_CODE);
149 const std::string& className,
150 const std::string& methodName,
151 int communicationPartnerRank,
153 int numberOfExpectedMessages
155 if ( exceededTimeOutWarningThreshold() ) {
157 "writeTimeOutWarning(...)",
158 "operation " << className <<
"::" << methodName <<
" on node "
159 << getRank() <<
" had to wait more than " << std::to_string(_timeOutWarning.count())
160 <<
" seconds for " << numberOfExpectedMessages
161 <<
" message(s) from node " << communicationPartnerRank <<
" with tag " << tag
164 if ( _deadlockTimeOut.count()>0 ) {
166 "writeTimeOutWarning(...)",
167 "application will terminate after " << std::to_string(_deadlockTimeOut.count()) <<
" seconds because of a deadlock"
171 "writeTimeOutWarning(...)",
172 "deadlock detection switched off as deadlock time out is set to " << _deadlockTimeOut.count() <<
", i.e. you will continue to get warnings, but code will not shut down"
177 _timeOutWarning<_deadlockTimeOut/2
179 _deadlockTimeOut.count()<=0
181 std::chrono::seconds newTimeOutWarning = _timeOutWarning*2;
183 "writeTimeOutWarning(...)",
184 "increase time out warning threshold from " << std::to_string(_timeOutWarning.count()) <<
" seconds to " << std::to_string(newTimeOutWarning.count()) <<
" seconds to avoid flood of warning messages"
186 _timeOutWarning = newTimeOutWarning;
187 _globalTimeOutWarning = _globalTimeOutWarning+_timeOutWarning;
194 _globalTimeOutWarning = std::chrono::system_clock::now() + _timeOutWarning;
199 _globalTimeOutDeadlock = std::chrono::system_clock::now() + _deadlockTimeOut;
204 _areTimeoutsEnabled = !timeoutsDisabled;
210 std::ostringstream out;
213 char*
string =
new char[MPI_MAX_ERROR_STRING];
214 MPI_Error_string(result,
string, &resultlen);
217 MPI_Error_class(result, &errorclass);
219 out <<
"mpi error class: " << errorclass <<
"="
220 <<
", mpi error text: " << string;
222 switch ( errorclass ) {
223 case MPI_SUCCESS: out <<
"MPI_SUCCESS [no error]";
break;
224 case MPI_ERR_BUFFER: out <<
"MPI_ERR_BUFFER [invalid buffer pointer]";
break;
225 case MPI_ERR_COUNT: out <<
"MPI_ERR_COUNT [invalid count argument]";
break;
226 case MPI_ERR_TYPE: out <<
"MPI_ERR_TYPE [invalid datatype]";
break;
227 case MPI_ERR_TAG: out <<
"MPI_ERR_TAG [invalid tag]";
break;
228 case MPI_ERR_COMM: out <<
"MPI_ERR_COMM [invalid communicator]";
break;
229 case MPI_ERR_RANK: out <<
"MPI_ERR_RANK [invalid rank]";
break;
230 case MPI_ERR_REQUEST: out <<
"MPI_ERR_REQUEST [invalid request handle]";
break;
231 case MPI_ERR_ROOT: out <<
"MPI_ERR_ROOT [invalid root argument]";
break;
232 case MPI_ERR_GROUP: out <<
"MPI_ERR_GROUP [invalid group]";
break;
233 case MPI_ERR_OP: out <<
"MPI_ERR_OP [invalid operation]";
break;
234 case MPI_ERR_TOPOLOGY: out <<
"MPI_ERR_TOPOLOGY [invalid topology]";
break;
235 case MPI_ERR_DIMS: out <<
"MPI_ERR_DIMS [invalid dimensions]";
break;
236 case MPI_ERR_ARG: out <<
"MPI_ERR_ARG [invalid argument]";
break;
237 case MPI_ERR_UNKNOWN: out <<
"MPI_ERR_UNKNOWN [unknown error]";
break;
238 case MPI_ERR_TRUNCATE: out <<
"MPI_ERR_TRUNCATE [message has been truncated by receiver]";
break;
239 case MPI_ERR_OTHER: out <<
"MPI_ERR_OTHER [other unknown error]";
break;
240 case MPI_ERR_INTERN: out <<
"MPI_ERR_INTERN [internal mpi error]";
break;
241 default: out <<
"unknown";
break;
251 std::ostringstream out;
252 out <<
"status flag:"
253 <<
" MPI_ERROR=" << status.MPI_ERROR
255 <<
") ,MPI_SOURCE=" << status.MPI_SOURCE
256 <<
",MPI_TAG=" << status.MPI_TAG;
264 _initIsCalled(false),
266 _numberOfProcessors(-1),
267 _communicator( MPI_COMM_WORLD),
270 _areTimeoutsEnabled(true) {}
273 _initIsCalled(false),
275 _numberOfProcessors(1),
278 _areTimeoutsEnabled(true) {}
285 const void *sendbuf,
void *recvbuf,
int count,
286 MPI_Datatype datatype,
288 std::function<
void()> waitor
293 MPI_Request* request =
new MPI_Request();
294 MPI_Iallreduce(sendbuf, recvbuf, count, datatype, op, getCommunicator(), request );
308 MPI_Test( request, &flag, MPI_STATUS_IGNORE );
319 const void *sendbuf,
void *recvbuf,
int count,
320 MPI_Datatype datatype,
322 std::function<
void()> waitor
327 MPI_Request* request =
new MPI_Request();
328 MPI_Ireduce(sendbuf, recvbuf, count, datatype, op, root, getCommunicator(), request );
341 MPI_Test( request, &flag, MPI_STATUS_IGNORE );
356 if (getNumberOfRanks()>1) {
357 MPI_Request* request =
new MPI_Request();
358 MPI_Ibarrier( getCommunicator(), request );
371 MPI_Test( request, &flag, MPI_STATUS_IGNORE );
387 getCommunicator(), &flag, MPI_STATUS_IGNORE
402 int errorCode = MPI_Finalize();
407 _communicator = MPI_COMM_WORLD;
423 return getRank() == getGlobalMasterRank();
431 std::ostringstream statusMessage;
432 statusMessage <<
"MPI status:";
434 #ifdef CompilerHasUTSName
435 utsname* utsdata =
new utsname();
438 statusMessage <<
" nodename=" << utsdata->nodename;
441 statusMessage <<
" nodename=undef";
444 statusMessage <<
", rank=" << _rank;
446 statusMessage <<
", communicator=" << _communicator;
448 statusMessage <<
", #processors=" << _numberOfProcessors;
450 logInfo(
"logStatus()", statusMessage.str() );
458 _tagCounter < _maxTags
463 logWarning(
"validateMaxTagIsSupported()",
"maximum tag value is " << _maxTags <<
" though we would need " << _tagCounter <<
" tags. Code will likely crash" );
469static void tdTextPointer() {}
474 int result = MPI_SUCCESS;
476 #if defined( SharedMemoryParallelisation )
477 int initThreadProvidedThreadLevelSupport;
478 result = MPI_Init_thread( argc, argv, MPI_THREAD_MULTIPLE, &initThreadProvidedThreadLevelSupport );
479 if (initThreadProvidedThreadLevelSupport!=MPI_THREAD_MULTIPLE ) {
480 std::cerr <<
"warning: MPI implementation does not support MPI_THREAD_MULTIPLE. Support multithreading level is "
481 << initThreadProvidedThreadLevelSupport <<
" instead of " << MPI_THREAD_MULTIPLE
482 <<
". Disable MultipleThreadsMayTriggerMPICalls in the compiler-specific settings or via -DnoMultipleThreadsMayTriggerMPICalls."<< std::endl;
485 result = MPI_Init( argc, argv );
489 td_init(
reinterpret_cast<void*
>(&tdTextPointer));
492 if (result!=MPI_SUCCESS) {
493 std::cerr <<
"init(int*,char***)\t initialisation failed: " +
MPIReturnValueToString(result) +
" (no logging available yet)" << std::endl;
497 result = MPI_Comm_size( MPI_COMM_WORLD, &_numberOfProcessors );
498 if (result!=MPI_SUCCESS) {
499 std::cerr <<
"init(int*,char***)\t initialisation failed: " +
MPIReturnValueToString(result) +
" (no logging available yet)" << std::endl;
503 result = MPI_Comm_rank( MPI_COMM_WORLD, &_rank );
504 if (result!=MPI_SUCCESS) {
505 std::cerr <<
"init(int*,char***)\t initialisation failed: " +
MPIReturnValueToString(result) +
" (no logging available yet)" << std::endl;
511 MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_TAG_UB, &rawMaxTag, &answerFlag);
513 _maxTags = *(
int*)rawMaxTag;
516 std::cerr <<
"init(int*,char***)\t was not able to query what the maximum tag value is" << std::endl;
524 _initIsCalled =
true;
525 return _initIsCalled;
547 return _communicator;
556 return _numberOfProcessors;
562 _timeOutWarning = std::chrono::seconds(value);
568 _deadlockTimeOut = std::chrono::seconds(value);
570 logInfo(
"setDeadlockTimeOut(int)",
"set deadlock timeout to " << _deadlockTimeOut.count() <<
" and hence disabled timeout checks" );
577 _communicator = communicator;
579 int result = MPI_Comm_size( _communicator, &_numberOfProcessors );
580 if (result!=MPI_SUCCESS) {
584 result = MPI_Comm_rank( _communicator, &_rank );
585 if (result!=MPI_SUCCESS) {
596 MPI_Abort(MPI_COMM_WORLD,errorCode);
#define assertion2(expr, param0, param1)
#define assertion3(expr, param0, param1, param2)
#define logError(methodName, logMacroMessageStream)
Wrapper macro around tarch::logging::Log to improve logging.
#define logTraceOut(methodName)
#define logWarning(methodName, logMacroMessageStream)
Wrapper macro around tarch::logging::Log to improve logging.
#define logTraceIn(methodName)
#define logInfo(methodName, logMacroMessageStream)
Wrapper macro around tarch::logging::Log to improve logging.
Represents a program instance within a cluster.
void setCommunicator(MPI_Comm communicator, bool recomputeRankAndWorld=true)
Set communicator to be used by Peano.
Rank()
The standard constructor assigns the attributes default values and checks whether the program is com...
void setDeadlockTimeOut(int valueInSeconds)
Set deadlock time out.
static int _tagCounter
Count the tags that have already been handed out.
static int getGlobalMasterRank()
Get the global master.
int getNumberOfRanks() const
bool isInitialised() const
void triggerDeadlockTimeOut(const std::string &className, const std::string &methodName, int communicationPartnerRank, int tag, int numberOfExpectedMessages=1, const std::string &comment="")
Triggers a time out and shuts down the cluster if a timeout is violated.
bool isGlobalMaster() const
Is this node the global master process, i.e.
void setDeadlockWarningTimeStamp()
Memorise global timeout.
bool exceededTimeOutWarningThreshold() const
static int _maxTags
Set by init() and actually stores the number of valid tags.
void writeTimeOutWarning(const std::string &className, const std::string &methodName, int communicationPartnerRank, int tag, int numberOfExpectedMessages=1)
Writes a warning if relevant.
void setDeadlockTimeOutTimeStamp()
void ensureThatMessageQueuesAreEmpty(int fromRank, int tag)
Ensure that no messages from the specified rank remain in the MPI queue.
void reduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, std::function< void()> waitor=[]() -> void {})
static Rank & getInstance()
This operation returns the singleton instance.
bool isMessageInQueue(int tag) const
In older DaStGen versions, I tried to find out whether a particular message type is in the MPI queue.
void setTimeOutWarning(int valueInSeconds)
Set time out warning.
int getRank() const
Return rank of this node.
bool init(int *argc, char ***argv)
This operation initializes the MPI environment and the program instance.
void allReduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, std::function< void()> waitor=[]() -> void {})
void suspendTimeouts(bool timeoutsDisabled)
static tarch::logging::Log _log
Logging device.
void barrier(std::function< void()> waitor=[]() -> void {})
void logStatus() const
Logs the status of the process onto the log device.
static bool validateMaxTagIsSupported()
Just try to find out if a tag is actually supported.
static int reserveFreeTag(const std::string &fullQualifiedMessageName, int numberOfTags=1)
Return a free tag.
virtual ~Rank()
The standard destructor calls MPI_Finalize().
static void releaseTag(int tag)
bool exceededDeadlockThreshold() const
void shutdown()
Shuts down the application.
static void abort(int errorCode)
A proper abort in an MPI context has to use MPI_Abort.
MPI_Comm getCommunicator() const
std::string MPIStatusToString(const MPI_Status &status)
Returns a string representation of the mpi status.
std::string MPIReturnValueToString(int result)
static void initDatatype()
Wrapper around getDatatype() to trigger lazy evaluation if we use the lazy initialisation.
static void shutdownDatatype()
Free the underlying MPI datatype.
static void initDatatype()
Wrapper around getDatatype() to trigger lazy evaluation if we use the lazy initialisation.