Tasks.cpp
#include "Tasks.h"

#include <queue>
#include <set>
#include <thread>

#include "BooleanSemaphore.h"
#include "config.h"
#include "Core.h"
#include "Lock.h"
#include "multicore.h"
#include "tarch/Assertions.h"
#include "tarch/logging/Log.h"
#include "tarch/logging/Statistics.h"
#include "tarch/multicore/orchestration/Strategy.h"
#include "tarch/multicore/otter.h"

#ifdef UseSmartMPI
#include "smartmpi.h"
#endif
namespace {
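  /**
   * A non-blocking task queue: the actual task list plus the semaphore
   * that guards it. Copying is forbidden, as the semaphore must not be
   * duplicated.
   */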
  struct NonblockingTasks {
    NonblockingTasks() = default;
    ~NonblockingTasks() = default;
    NonblockingTasks(const NonblockingTasks&) = delete;

    typedef std::list<tarch::multicore::Task*> Tasks;
    Tasks tasks;
    tarch::multicore::BooleanSemaphore semaphore;
  };

  NonblockingTasks globalNonblockingTasks;
  std::vector<NonblockingTasks*> localNonblockingTasks;

  tarch::multicore::orchestration::Strategy* orchestrationStrategy = tarch::multicore::orchestration::createDefaultStrategy();

  tarch::logging::Log _log("tarch::multicore");

  const std::string BSPConcurrencyLevelStatisticsIdentifier("tarch::multicore::bsp-concurrency-level");
  const std::string GlobalPendingTasksStatisticsIdentifier("tarch::multicore::global-pending-tasks");
  const std::string ThreadLocalPendingTasksStatisticsIdentifier("tarch::multicore::thread-local-pending-tasks");
  const std::string FuseTasksStatisticsIdentifier("tarch::multicore::fuse-tasks");

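  /**
   * Take the most recently spawned task from the global queue (LIFO) and
   * run it. If the task asks to be requeued, it is spawned again;
   * otherwise it is deleted.
   *
   * @return true if a task has been processed
   */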
  bool processOnePendingTaskLIFO() {
    tarch::multicore::Task* myTask = nullptr;

    tarch::multicore::Lock lock(globalNonblockingTasks.semaphore);
    if (not globalNonblockingTasks.tasks.empty()) {
      myTask = globalNonblockingTasks.tasks.back();
      globalNonblockingTasks.tasks.pop_back();
    }
    lock.free();

    if (myTask != nullptr) {
      bool requeue = myTask->run();
      if (requeue) {
        spawnTask(myTask);
      } else {
        delete myTask;
      }
    }
    return myTask != nullptr;
  }

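  /**
   * FIFO counterpart of processOnePendingTaskLIFO(): take the oldest task
   * from the global queue and run it.
   */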
  bool processOnePendingTaskFIFO() {
    tarch::multicore::Task* myTask = nullptr;

    tarch::multicore::Lock lock(globalNonblockingTasks.semaphore);
    if (not globalNonblockingTasks.tasks.empty()) {
      myTask = globalNonblockingTasks.tasks.front();
      globalNonblockingTasks.tasks.pop_front();
    }
    lock.free();

    if (myTask != nullptr) {
      bool requeue = myTask->run();
      if (requeue) {
        spawnTask(myTask);
      } else {
        delete myTask;
      }
      return true;
    } else {
      return false;
    }
  }
} // namespace

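/**
 * Take up to maxTasks tasks that are internally buffered in the global
 * queue and hand them over to the native tasking runtime.
 *
 * @return number of tasks that have been mapped onto native tasks
 */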
int tarch::multicore::internal::mapPendingTasksOntoNativeTasks(int maxTasks) {
  NonblockingTasks::Tasks extractedTasks;

  tarch::multicore::Lock lock(globalNonblockingTasks.semaphore);
  maxTasks = std::min(static_cast<int>(globalNonblockingTasks.tasks.size()), maxTasks);
  NonblockingTasks::Tasks::iterator cutIteration = globalNonblockingTasks.tasks.begin();
  std::advance(cutIteration, globalNonblockingTasks.tasks.size() - maxTasks);
  extractedTasks.splice(extractedTasks.begin(), globalNonblockingTasks.tasks, cutIteration, globalNonblockingTasks.tasks.end());
  lock.free();

  for (NonblockingTasks::Tasks::reverse_iterator task = extractedTasks.rbegin(); task != extractedTasks.rend(); task++) {
    tarch::multicore::native::spawnTask(*task, std::set<TaskNumber>(), std::set<TaskNumber>(), NoDependencies);
  }

  return extractedTasks.size();
}

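/**
 * Search the global task queue for tasks of the same type as its first
 * task and try to fuse them, subject to the passed fuse instruction.
 *
 * @return whether any task has been processed
 */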
bool tarch::multicore::internal::fusePendingTasks(const tarch::multicore::orchestration::Strategy::FuseInstruction& fuseInstruction) {
  logDebug("fusePendingTasks(...)", "process " << globalNonblockingTasks.tasks.size() << " tasks subject to " << fuseInstruction.toString());

  tarch::multicore::Task* myTask = nullptr;
  std::list<tarch::multicore::Task*> tasksOfSameType;

  tarch::multicore::Lock lock(globalNonblockingTasks.semaphore);
  if (not globalNonblockingTasks.tasks.empty()) {
    myTask = globalNonblockingTasks.tasks.front();
    globalNonblockingTasks.tasks.pop_front();
  }

  if (myTask != nullptr and myTask->canFuse()) {
    // Collect subsequent tasks of the same, fusable type, taking at most
    // fuseInstruction.maxTasks-1 of them out of the queue.
    auto pp = globalNonblockingTasks.tasks.begin();
    while (
      pp != globalNonblockingTasks.tasks.end() and (*pp)->getTaskType() == myTask->getTaskType() and (*pp)->canFuse()
      and tasksOfSameType.size() < fuseInstruction.maxTasks - 1
    ) {
      tasksOfSameType.push_back(*pp);
      pp = globalNonblockingTasks.tasks.erase(pp);
    }
    lock.free();

    ::tarch::logging::Statistics::getInstance().log(FuseTasksStatisticsIdentifier, tasksOfSameType.size());

    native::processFusedTask(myTask, tasksOfSameType, fuseInstruction.device);
  } else if (myTask != nullptr and not myTask->canFuse()) {
    lock.free();
    tarch::multicore::native::spawnTask(myTask, std::set<TaskNumber>(), std::set<TaskNumber>(), NoDependencies);
  }

  return myTask == nullptr ? 0 : tasksOfSameType.size() + 1;
}

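/**
 * Replace the active orchestration strategy. The previous strategy is
 * deleted.
 */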
void tarch::multicore::setOrchestration(tarch::multicore::orchestration::Strategy* realisation) {
  assertion(orchestrationStrategy != nullptr);
  assertion(realisation != nullptr);

  delete orchestrationStrategy;
  orchestrationStrategy = realisation;
}

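/**
 * Swap the active orchestration strategy and hand the previous one back
 * to the caller, who becomes responsible for deleting it.
 */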
tarch::multicore::orchestration::Strategy* tarch::multicore::swapOrchestration(tarch::multicore::orchestration::Strategy* realisation) {
  assertion(orchestrationStrategy != nullptr);
  assertion(realisation != nullptr);

  tarch::multicore::orchestration::Strategy* result = orchestrationStrategy;
  orchestrationStrategy = realisation;
  return result;
}

bool operator<(const tarch::multicore::Task& lhs, const tarch::multicore::Task& rhs) { return lhs.getPriority() < rhs.getPriority(); }

bool tarch::multicore::TaskComparison::operator()(const Task& lhs, const Task& rhs) const { return lhs < rhs; }

bool tarch::multicore::TaskComparison::operator()(Task* lhs, Task* rhs) const { return *lhs < *rhs; }

tarch::multicore::Task::Task(int taskType, int priority):
  _taskType(taskType),
  _priority(priority) {
  assertion2(priority >= 0, taskType, priority);
}

bool tarch::multicore::Task::canFuse() const { return _taskType != DontFuse; }

int tarch::multicore::Task::getPriority() const { return _priority; }

void tarch::multicore::Task::setPriority(int priority) {
  assertion3(priority >= 0, _taskType, _priority, priority);
  _priority = priority;
}

int tarch::multicore::Task::getTaskType() const { return _taskType; }

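/**
 * Default fusion: run all other tasks directly on the host one by one and
 * delete them. The device argument is ignored by this default
 * implementation. Returning true signals that the first task still has to
 * be executed by the caller.
 */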
bool tarch::multicore::Task::fuse(const std::list<Task*>& otherTasks, int device) {
  assertion(canFuse());
  for (auto pp : otherTasks) {
    tarch::multicore::Task* currentTask = pp;
    while (currentTask->run()) {
    }
    delete currentTask;
  }
  return true;
}

tarch::multicore::TaskWithCopyOfFunctor::TaskWithCopyOfFunctor(int taskType, int priority, const std::function<bool()>& taskFunctor):
  Task(taskType, priority),
  _taskFunctor(taskFunctor) {
  OTTER_DEFINE_TASK(this_task, step_task, otter_add_to_pool, otter::label::TaskWithCopyOfFunctor);
}

bool tarch::multicore::TaskWithCopyOfFunctor::run() {
  OTTER_TASK_START(this_task);
  bool result = _taskFunctor();
  OTTER_TASK_END(this_task);
  return result;
}

tarch::multicore::TaskWithoutCopyOfFunctor::TaskWithoutCopyOfFunctor(int taskType, int priority, std::function<bool()>& taskFunctor):
  Task(taskType, priority),
  _taskFunctor(taskFunctor) {
  OTTER_DEFINE_TASK(this_task, step_task, otter_add_to_pool, otter::label::TaskWithoutCopyOfFunctor);
}

bool tarch::multicore::TaskWithoutCopyOfFunctor::run() {
  OTTER_TASK_START(this_task);
  bool result = _taskFunctor();
  OTTER_TASK_END(this_task);
  return result;
}

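/**
 * Process up to maxTasks tasks from the backlog of pending tasks, either
 * in FIFO or in LIFO order.
 *
 * @return whether at least one task has been processed
 */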
bool tarch::multicore::processPendingTasks(int maxTasks, bool fifo) {
  assertion(maxTasks >= 0);

  bool result = false;
  while (maxTasks > 0) {
    if (fifo) {
      maxTasks--;
      result |= processOnePendingTaskFIFO();
    } else {
      maxTasks--;
      result |= processOnePendingTaskLIFO();
    }

    if (not result and maxTasks == 0) {
      Core::getInstance().yield();
    }
  }

  return result;
}

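/**
 * Spawn a single task in a non-blocking fashion. Tasks without
 * dependencies may be buffered in a thread-local queue, subject to the
 * orchestration strategy; tasks with dependencies go straight to the
 * native backend.
 */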
void tarch::multicore::spawnTask(
  Task*                        task,
  const std::set<TaskNumber>&  inDependencies,
  const std::set<TaskNumber>&  conflicts,
  const TaskNumber&            taskNumber
) {
  assertion(task != nullptr);

  if (taskNumber == NoDependencies and inDependencies.empty() and conflicts.empty()) {
    int threadNumber = Core::getInstance().getThreadNumber();
    assertion1(threadNumber >= 0, threadNumber);
    assertion2(threadNumber < localNonblockingTasks.size(), threadNumber, localNonblockingTasks.size());

#ifdef UseSmartMPI
    if (task->isSmartMPITask() and smartmpi::spawn(dynamic_cast<smartmpi::Task*>(task))) {
    } else
#endif
    if (localNonblockingTasks[threadNumber]->tasks.size() >= orchestrationStrategy->getNumberOfTasksToHoldBack(task->getTaskType())) {
      logDebug("spawnTask(...)", "spawn native task of type " << task->getTaskType());
      native::spawnTask(task, std::set<TaskNumber>(), std::set<TaskNumber>(), NoDependencies);
    } else {
      tarch::multicore::Lock lock(localNonblockingTasks[threadNumber]->semaphore);
      localNonblockingTasks[threadNumber]->tasks.push_back(task);
      lock.free();

      ::tarch::logging::Statistics::getInstance().log(ThreadLocalPendingTasksStatisticsIdentifier, localNonblockingTasks[threadNumber]->tasks.size());

      logDebug("spawnTask(...)", "enqueued task (#tasks=" << localNonblockingTasks[threadNumber]->tasks.size() << ")");

      auto fusionCommand = orchestrationStrategy->getNumberOfTasksToFuseAndTargetDevice(task->getTaskType());
      if (localNonblockingTasks[threadNumber]->tasks.size() >= fusionCommand.minTasks and orchestrationStrategy->fuseTasksImmediatelyWhenSpawned(task->getTaskType())) {
        internal::fusePendingTasks(fusionCommand);
      }
    }
  } else {
    tarch::multicore::native::spawnTask(task, inDependencies, conflicts, taskNumber);
  }
}

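/**
 * Fork-join task submission: spawn the given tasks and wait for all of
 * them. The orchestration strategy decides whether the tasks run serially
 * or in parallel, and what happens to tasks that are still held back at
 * the end of the BSP section.
 */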
void tarch::multicore::spawnAndWait(const std::vector<Task*>& tasks) {
  static tarch::logging::Log _log("tarch::multicore");

  if (not tasks.empty()) {
    static int nestedSpawnAndWaits = 0;
    static tarch::multicore::BooleanSemaphore nestingSemaphore;

    tarch::multicore::Lock nestingLock(nestingSemaphore, false);
    nestingLock.lock();
    nestedSpawnAndWaits++;
    const int localNestedSpawnAndWaits = nestedSpawnAndWaits;
    nestingLock.free();

    orchestrationStrategy->startBSPSection(localNestedSpawnAndWaits);

    switch (orchestrationStrategy->paralleliseForkJoinSection(localNestedSpawnAndWaits, tasks.size(), tasks[0]->getTaskType())) {
      case orchestration::Strategy::ExecutionPolicy::RunSerially: {
        for (auto& p : tasks) {
          while (p->run()) {
          }
          delete p;
        }
      } break;
      case orchestration::Strategy::ExecutionPolicy::RunParallel: {
        ::tarch::logging::Statistics::getInstance().inc(BSPConcurrencyLevelStatisticsIdentifier, static_cast<int>(tasks.size()), true);
        native::spawnAndWaitAsTaskLoop(tasks);
        ::tarch::logging::Statistics::getInstance().inc(BSPConcurrencyLevelStatisticsIdentifier, -static_cast<int>(tasks.size()), true);

        auto fusionInstruction = orchestrationStrategy->getNumberOfTasksToFuseAndTargetDevice(tarch::multicore::orchestration::Strategy::EndOfBSPSection);

        bool successInFusing = true;
        while (successInFusing and globalNonblockingTasks.tasks.size() >= fusionInstruction.minTasks) {
          successInFusing = tarch::multicore::internal::fusePendingTasks(fusionInstruction);
        }

        if (not globalNonblockingTasks.tasks.empty()) {
          int numberOfTasksToProcessNow = std::max(
            0, static_cast<int>(globalNonblockingTasks.tasks.size()) - orchestrationStrategy->getNumberOfTasksToHoldBack(tarch::multicore::orchestration::Strategy::EndOfBSPSection)
          );
          internal::mapPendingTasksOntoNativeTasks(numberOfTasksToProcessNow);
        }
      } break;
      case orchestration::Strategy::ExecutionPolicy::RunParallelAndIgnoreWithholdSubtasks: {
        ::tarch::logging::Statistics::getInstance().inc(BSPConcurrencyLevelStatisticsIdentifier, static_cast<int>(tasks.size()), true);
        native::spawnAndWaitAsTaskLoop(tasks);
        ::tarch::logging::Statistics::getInstance().inc(BSPConcurrencyLevelStatisticsIdentifier, -static_cast<int>(tasks.size()), true);

      } break;
    }

    orchestrationStrategy->endBSPSection(localNestedSpawnAndWaits);

    nestingLock.lock();
    nestedSpawnAndWaits--;
    nestingLock.free();
  }
}

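/**
 * Set up the internal task queues: ensure there is one thread-local queue
 * per thread.
 */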
void tarch::multicore::internal::configureInternalTaskQueues(int numberOfThreads) {
  while (localNonblockingTasks.size() < numberOfThreads) {
    localNonblockingTasks.push_back(new NonblockingTasks());
  }
}

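/**
 * Commit all thread-local task queues into the global queue.
 */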
void tarch::multicore::internal::copyInternalTaskQueuesOverIntoGlobalQueue() {
  for (int i = 0; i < localNonblockingTasks.size(); i++) {
    copyInternalTaskQueueOverIntoGlobalQueue(i);
  }
}

void tarch::multicore::internal::copyInternalTaskQueueOverIntoGlobalQueue(int threadNumber) {
  assertion(threadNumber >= 0);

  ::tarch::logging::Statistics::getInstance().log(ThreadLocalPendingTasksStatisticsIdentifier, localNonblockingTasks[threadNumber]->tasks.size());
  tarch::multicore::Lock globalLock(globalNonblockingTasks.semaphore);
  tarch::multicore::Lock localLock(localNonblockingTasks[threadNumber]->semaphore);
  std::copy(localNonblockingTasks[threadNumber]->tasks.begin(), localNonblockingTasks[threadNumber]->tasks.end(), std::back_inserter(globalNonblockingTasks.tasks));
  logDebug(
    "copyInternalTaskQueueOverIntoGlobalQueue(int)",
    "added "
    << localNonblockingTasks[threadNumber]->tasks.size() << " tasks to global tasks. #global pending tasks=" << tarch::multicore::internal::getNumberOfWithholdPendingTasks()
  );
  localNonblockingTasks[threadNumber]->tasks.clear();
  ::tarch::logging::Statistics::getInstance().log(GlobalPendingTasksStatisticsIdentifier, globalNonblockingTasks.tasks.size());
}

int tarch::multicore::internal::getNumberOfWithholdPendingTasks() { return globalNonblockingTasks.tasks.size(); }


void tarch::multicore::waitForAllTasks() { native::waitForAllTasks(); }


void tarch::multicore::waitForTasks(const std::set<TaskNumber>& inDependencies) { tarch::multicore::native::waitForTasks(inDependencies); }


void tarch::multicore::waitForTask(const int taskNumber) {
  std::set<int> tmp;
  tmp.insert(taskNumber);
  waitForTasks(tmp);
}

#ifndef SharedMemoryParallelisation

#include <thread>

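/**
 * Serial fallback implementations of the native tasking routines. They
 * are used when Peano is built without a shared memory backend: every
 * task is executed immediately on the spawning thread.
 */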
void tarch::multicore::native::spawnAndWaitAsTaskLoop(const std::vector<Task*>& tasks) {
  for (auto& p : tasks) {
    while (p->run()) {
    }
    delete p;
  }
}

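/**
 * Process a fused task: hand the list of fused tasks over to Task::fuse().
 * If fuse() reports that the first task still has to be executed locally,
 * it is re-spawned; otherwise it is deleted.
 */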
void tarch::multicore::native::processFusedTask(Task* myTask, const std::list<tarch::multicore::Task*>& tasksOfSameType, int device) {
  bool stillExecuteLocally = myTask->fuse(tasksOfSameType, device);
  if (stillExecuteLocally) {
    tarch::multicore::native::spawnTask(myTask, std::set<TaskNumber>(), std::set<TaskNumber>(), NoDependencies);
  } else {
    delete myTask;
  }
}

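/**
 * Without a tasking backend, a spawned task is simply executed and
 * deleted right away; all dependencies are trivially fulfilled and hence
 * ignored.
 */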
void tarch::multicore::native::spawnTask(
  Task*                        job,
  const std::set<TaskNumber>&  inDependencies,
  const std::set<TaskNumber>&  conflicts,
  const TaskNumber&            taskNumber
) {
  while (job->run()) {
  }
  delete job;
}


void tarch::multicore::native::waitForTasks(const std::set<TaskNumber>& inDependencies) {}


void tarch::multicore::native::waitForAllTasks() { logDebug("waitForAllTasks()", "wait"); }

#endif