Peano
Loading...
Searching...
No Matches
BooleanSemaphore.cpp
Go to the documentation of this file.
1#include "BooleanSemaphore.h"
2#include "Rank.h"
3
4#include "tarch/Assertions.h"
7
8tarch::logging::Log tarch::mpi::BooleanSemaphore::_log( "tarch::mpi::BooleanSemaphore" );
9
11
13
14
19
21 BooleanSemaphoreService::getInstance().releaseLock(_semaphoreNumber);
22 _localRankLockRequestSemaphore.leaveCriticalSection();
23}
24
26 _semaphoreNumber( _semaphoreCounter ) {
28}
29
30
32
33
38
39
41 if (locked) {
42 return "(locked,by-rank=" + std::to_string(rankThatLastLocked) + ")";
43 }
44 else {
45 return "(free,last-lock-by-rank=" + std::to_string(rankThatLastLocked) + ")";
46 }
47}
48
49
52 tarch::services::ServiceRepository::getInstance().addService( this, "tarch::mpi::BooleanSemaphore::BooleanSemaphoreService" );
53}
54
58
60 std::ostringstream msg;
61 msg << "(";
62 msg << "#sections:" << _map.size();
63 for (auto& p: _map) {
64 msg << "," << p.first << ":" << p.second.toString();
65 }
66 msg << ",#requests:" << _pendingLockRequests.size();
67 for (auto& p: _pendingLockRequests) {
68 msg << "," << "lock " << p.second << " from " << p.first;
69 }
70 msg << ")";
71 return msg.str();
72}
73
74
76 assertion( tarch::mpi::Rank::getInstance().isGlobalMaster() );
77
79
80 addMapEntryLazily(number);
81
82 bool gotLock = false;
83 assertion( _map.count(number)==1 );
84 if ( not _map[number].locked ) {
85 gotLock = true;
86 _map[number].locked = true;
87 _map[number].rankThatLastLocked = forRank;
88 logDebug( "tryLockSemaphoreOnGlobalMaster(int,int)", "successfully locked semaphore " << number << " for rank " << forRank );
89 }
90
91 return gotLock;
92}
93
94
96 assertion( tarch::mpi::Rank::getInstance().isGlobalMaster() );
97
100
101 logDebug( "lockSemaphoreOnGlobalMaster(int,int)", "will try to lock semahpore " << number << " for rank " << forRank );
102
103 bool gotLock = false;
104 bool wroteWarning = false;
105 while (not gotLock) {
106 gotLock = tryLockSemaphoreOnGlobalMaster(number,forRank);
107
108 if ( not wroteWarning and tarch::mpi::Rank::getInstance().exceededTimeOutWarningThreshold() ) {
109 wroteWarning = true;
110 logWarning( "lockSemaphoreOnGlobalMaster()", "semaphore " << number << " is locked by " << _map[number].rankThatLastLocked << " and therefore cannot be locked for " << forRank );
111 }
112
114 }
115}
116
117
119 assertion( tarch::mpi::Rank::getInstance().isGlobalMaster() );
120 assertion2( number>0, number, forRank );
121
123 assertion( _map.count(number)==1 );
124 assertion( _map[number].locked );
125 assertionEquals( _map[number].rankThatLastLocked, forRank );
126 _map[number].locked = false;
127
128 logDebug( "unlockSemaphoreOnGlobalMaster()", "successfully released lock " << number << ". state=" << toString() );
129}
130
131
133 assertion(tarch::mpi::Rank::getInstance().isGlobalMaster());
134
135 #ifdef Parallel
137
138 bool servedLockRequest = true;
139 while (servedLockRequest) {
140 servedLockRequest = false;
141 {
142 for (auto request = _pendingLockRequests.begin(); request != _pendingLockRequests.end(); ) {
143 if ( tryLockSemaphoreOnGlobalMaster(request->second, request->first) ) {
144 servedLockRequest = true;
145 logDebug( "receiveDanglingMessages()", "locked sempahore " << request->second << " for rank " << request->first << ". state=" << toString() );
146 MPI_Send( &(request->second), 1, MPI_INT, request->first, _semaphoreTag, tarch::mpi::Rank::getInstance().getCommunicator());
147 request = _pendingLockRequests.erase(request);
148 }
149 else {
150 request++;
151 }
152 }
153 }
154 }
155 #endif
156}
157
158
160 if (tarch::mpi::Rank::getInstance().isGlobalMaster()) {
161 #ifdef Parallel
162 int flag = false;
163 MPI_Status status;
164 MPI_Iprobe(MPI_ANY_SOURCE, _semaphoreTag, tarch::mpi::Rank::getInstance().getCommunicator(), &flag, &status);
165
166 if (flag) {
167 int number;
168 logDebug( "receiveDanglingMessages()", "there's a pending message from " << status.MPI_SOURCE );
169 MPI_Recv( &number, 1, MPI_INT, status.MPI_SOURCE, _semaphoreTag, tarch::mpi::Rank::getInstance().getCommunicator(), MPI_STATUS_IGNORE);
170 assertion(number!=0);
171
172 logDebug( "receiveDanglingMessages()", "received number " << number << " from rank " << status.MPI_SOURCE );
173 if (number>0) {
175 std::pair<int,int> newEntry( status.MPI_SOURCE, number );
176 _pendingLockRequests.push_back(newEntry);
177 logDebug( "receiveDanglingMessages()", "there are " << _pendingLockRequests.size() << " lock requests in total. state=" << toString() );
178 }
179 else {
180 releaseLock(-number);
181 }
182 }
183 else {
185 }
186 #endif
187 }
188}
189
190
199
200
204
205
207 assertion1( number>0, number );
208 if ( _map.count(number)==0 ) {
209 _map.insert( std::pair<int,SemaphoreMapEntry>(number,SemaphoreMapEntry()) );
210 }
211}
212
213
215 if ( tarch::mpi::Rank::getInstance().isGlobalMaster() ) {
217 }
218 else {
219 #ifdef Parallel
220 logDebug( "acquireLock()", "have to acquire lock on global master " << tarch::mpi::Rank::getGlobalMasterRank() << " and thus send master a " << number );
221
222 MPI_Send( &number, 1, MPI_INT, tarch::mpi::Rank::getGlobalMasterRank(), _semaphoreTag, tarch::mpi::Rank::getInstance().getCommunicator() );
223
224 logDebug( "acquireLock()", "wait for confirmation from global master rank" );
225 MPI_Request request;
228
229 MPI_Irecv( &number, 1, MPI_INT, tarch::mpi::Rank::getGlobalMasterRank(), _semaphoreTag, tarch::mpi::Rank::getInstance().getCommunicator(), &request );
230 int flag = 0;
231 while ( not flag ) {
232 tarch::mpi::Rank::getInstance().writeTimeOutWarning( "tarch::mpi::BooleanSemaphore::BooleanSemaphoreService::", "acquireLock(int)", tarch::mpi::Rank::getGlobalMasterRank(), _semaphoreTag );
233 tarch::mpi::Rank::getInstance().triggerDeadlockTimeOut( "tarch::mpi::BooleanSemaphore::BooleanSemaphoreService::", "acquireLock(int)", tarch::mpi::Rank::getGlobalMasterRank(), _semaphoreTag );
235
236 // See documentation on dangling messages
237 //receiveDanglingMessages();
238 MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
239 }
240 #else
241 assertionMsg( false, "may not happen" );
242 #endif
243 }
244}
245
247 assertion( number>0 );
248 if ( tarch::mpi::Rank::getInstance().isGlobalMaster() ) {
251 }
252 else {
253 #ifdef Parallel
254 number = -number;
255 logDebug( "releaseLock()", "send global master " << number << " to release global lock" );
256
257 MPI_Send( &number, 1, MPI_INT, tarch::mpi::Rank::getGlobalMasterRank(), _semaphoreTag, tarch::mpi::Rank::getInstance().getCommunicator() );
258 #else
259 assertionMsg( false, "may not happen" );
260 #endif
261 }
262}
#define assertion2(expr, param0, param1)
#define assertionEquals(lhs, rhs)
#define assertion1(expr, param)
#define assertionMsg(expr, message)
#define assertion(expr)
#define logDebug(methodName, logMacroMessageStream)
Definition Log.h:50
#define logWarning(methodName, logMacroMessageStream)
Wrapper macro around tarch::tarch::logging::Log to improve logging.
Definition Log.h:440
Log Device.
Definition Log.h:516
int _semaphoreTag
Tag used to exchange locking MPI messages.
tarch::multicore::BooleanSemaphore _mapAccessSemaphore
Semaphore for the global master's map of locks.
virtual void receiveDanglingMessages() override
Receive any lock-related dangling messages.
bool tryLockSemaphoreOnGlobalMaster(int number, int forRank)
Try to lock a semaphore.
tarch::multicore::BooleanSemaphore _reserverationRequestsSemaphore
std::vector< std::pair< int, int > > _pendingLockRequests
List of pending lock requests.
static BooleanSemaphoreService & getInstance()
Don't use this routine.
std::map< int, SemaphoreMapEntry > _map
Map of semaphores.
void unlockSemaphoreOnGlobalMaster(int number, int forRank)
Unlock semaphore on global master.
static tarch::logging::Log _log
void leaveCriticalSection()
Tells the semaphore that it is about to leave.
tarch::multicore::BooleanSemaphore _localRankLockRequestSemaphore
void enterCriticalSection()
Waits until I can enter the critical section.
BooleanSemaphore()
Create new boolean semaphore spanning all MPI ranks.
static int getGlobalMasterRank()
Get the global master.
Definition Rank.cpp:415
void triggerDeadlockTimeOut(const std::string &className, const std::string &methodName, int communicationPartnerRank, int tag, int numberOfExpectedMessages=1, const std::string &comment="")
Triggers a time out and shuts down the cluster if a timeout is violated.
Definition Rank.cpp:124
void setDeadlockWarningTimeStamp()
Memorise global timeout.
Definition Rank.cpp:193
void writeTimeOutWarning(const std::string &className, const std::string &methodName, int communicationPartnerRank, int tag, int numberOfExpectedMessages=1)
Writes a warning if relevant.
Definition Rank.cpp:148
void setDeadlockTimeOutTimeStamp()
Definition Rank.cpp:198
static Rank & getInstance()
This operation returns the singleton instance.
Definition Rank.cpp:539
static int reserveFreeTag(const std::string &fullQualifiedMessageName, int numberOfTags=1)
Return a Free Tag.
Definition Rank.cpp:39
MPI_Comm getCommunicator() const
Definition Rank.cpp:545
Create a lock around a boolean semaphore region.
Definition Lock.h:19
virtual void receiveDanglingMessages() override
Answer to MPI Messages.
static ServiceRepository & getInstance()
void removeService(Service *const service)
This routine is thread-safe, i.e.
void addService(Service *const service, const std::string &name)
Add a new service.