6#include <condition_variable>
19 { t() } -> std::same_as<void>;
24 { t(ctx) } -> std::same_as<void>;
38template<
typename T,
typename State>
40 { t(
s) } -> std::same_as<void>;
43template<
typename T,
typename State>
45 { t(
s, ctx) } -> std::same_as<void>;
48template<
typename T,
typename State>
51template<
typename T,
typename State>
54template<
typename T,
typename State>
60 return Lambda([](
auto &generator) {
61 auto taskOpt = generator();
63 using TaskType = std::remove_reference_t<
decltype(taskOpt.value())>;
66 return taskOpt ? std::optional(
Lambda([](
auto &task,
auto &acc) { task(); }, taskOpt.value())) : std::nullopt;
68 return taskOpt ? std::optional(
Lambda([](
auto &task,
auto &acc,
auto &ctx) { task(ctx); }, taskOpt.value())) : std::nullopt;
69 }
else static_assert(
false);
75 template<
typename Val, StatefulTask<Val> Task>
82 }
else static_assert(
false);
86 template<StatelessTaskGenerator T>
93 template<
typename StateInitF, StatefulTaskGenerator<std::invoke_result_t<StateInitF>> T>
95 using ValueType = std::invoke_result_t<StateInitF>;
99 auto accumulators = std::vector<ValueType>(numThreads);
101 #pragma omp parallel num_threads(numThreads) default(none) shared(numThreads, generator, accumulators, stateInitF)
102 #pragma omp single nowait
104 for (
int i = 0; i < numThreads; i++) {
105 #pragma omp task default(none) shared(stateInitF, accumulators) firstprivate(i)
106 accumulators[i] = stateInitF();
112 auto taskOpt = generator();
113 if (!taskOpt) [[unlikely]]
break;
115 auto task = taskOpt.value();
116 #pragma omp task default(none) shared(stateInitF, accumulators) firstprivate(task)
118 auto threadNum = omp_get_thread_num();
127 template<StatelessTaskGenerator T>
134 template<
typename StateInitF, StatefulTaskGenerator<std::invoke_result_t<StateInitF>> T>
136 using ValueType = std::invoke_result_t<StateInitF>;
140 auto accumulators = std::vector<ValueType>(numThreads);
142 #pragma omp parallel num_threads(numThreads) default(none) shared(generator, stateInitF, accumulators)
144 auto threadId = omp_get_thread_num();
145 accumulators[threadId] = stateInitF();
148 auto taskOpt = generator();
149 if (!taskOpt) [[unlikely]]
break;
151 auto task = taskOpt.value();
160 auto numThreads = (
u32) omp_get_max_threads();
172 template<
typename Task,
typename TaskGenerator>
175 std::atomic_int32_t *producerThreadId) {
177 producerThreadId->store(-1, std::memory_order_relaxed);
182 auto taskOpt = (*generator)();
184 if (!taskOpt) [[unlikely]] {
186 producerThreadId->store(-1, std::memory_order_relaxed);
190 auto task = taskOpt.value();
192 auto success = channel->
put(task);
193 if (!success) [[unlikely]]
return task;
197 template<
typename Val, StatefulTask<Val> Task>
204 }
else static_assert(
false);
207 template<
typename ValueType, StatefulTaskGenerator<ValueType> TaskGenerator, StatefulTask<ValueType> Task,
typename StateInitF>
209 TaskGenerator *generator,
211 StateInitF stateInitF,
213 std::atomic_int32_t *producerThreadId) {
215 auto acc = stateInitF();
217 if (threadId == producerThreadId->load(std::memory_order_relaxed)) [[unlikely]]
goto I_AM_PRODUCER;
218 else goto I_AM_CONSUMER;
222 auto myNewTaskOpt =
createNewTasks(generator, channel, producerThreadId);
223 if (!myNewTaskOpt) [[unlikely]]
goto I_AM_CONSUMER;
225 producerThreadId->store(-1, std::memory_order_relaxed);
226 auto myNewTask = myNewTaskOpt.value();
229 auto currentProducerThreadId = producerThreadId->load(std::memory_order_relaxed);
230 while (currentProducerThreadId == -1) {
231 producerThreadId->compare_exchange_weak(currentProducerThreadId, threadId);
234 if (currentProducerThreadId != threadId) [[unlikely]]
goto I_AM_CONSUMER;
239 auto taskOpt = rx.getFast();
242 auto task = taskOpt.value();
247 if (channel->
isClosed()) [[unlikely]] {
249 taskOpt = rx.getOther();
251 auto task = taskOpt.value();
258 *retVal = std::move(acc);
263 auto someElseGenerator = producerThreadId->compare_exchange_weak(m1, threadId);
264 if (!someElseGenerator)
continue;
275 template<
typename StateInitF, StatefulTaskGenerator<std::invoke_result_t<StateInitF>> T>
277 using ValueType = std::invoke_result_t<StateInitF>;
279 auto accumulators = std::vector<ValueType>(1);
280 accumulators[0] = stateInitF();
283 auto taskOpt = generator();
285 auto task = taskOpt.value();
292 template<
typename StateInitF, StatefulTaskGenerator<std::invoke_result_t<StateInitF>> T>
294 using ValueType = std::invoke_result_t<StateInitF>;
295 using Task = std::invoke_result_t<T>::value_type;
298 auto accumulators = std::vector<ValueType>(numThreads);
300 std::atomic_int32_t producerThreadId = 0;
302 for (
i32 localHelperId = 1; localHelperId < numThreads; localHelperId++) {
303 auto runnable =
Lambda([](
auto localHelperId,
auto *generator,
auto *channel,
auto stateInitF,
auto *accumulator,
auto *producerThreadId) {
304 producerConsumerLoop<ValueType, T, Task, StateInitF>(localHelperId, generator, channel, stateInitF, accumulator, producerThreadId);
305 }, localHelperId, &generator, &channel, stateInitF, &accumulators[localHelperId], &producerThreadId);
310 producerConsumerLoop<ValueType, T, Task, StateInitF>(0, &generator, &channel, stateInitF, &accumulators[0], &producerThreadId);
317 template<
typename StateInitF, StatefulTaskGenerator<std::invoke_result_t<StateInitF>> T>
319 using ValueType = std::invoke_result_t<StateInitF>;
321 auto accumulators = std::vector<ValueType>(numThreads);
323 for (
u32 localHelperId = 1; localHelperId < numThreads; localHelperId++) {
325 auto acc = stateInitF();
327 auto taskOpt = generator();
330 auto task = taskOpt.value();
333 accumulators[localHelperId] = acc;
337 auto &acc = accumulators[0];
340 auto taskOpt = generator();
343 auto task = taskOpt.value();
353 template<StatelessTaskGenerator T>
360 template<
typename StateInitF, StatefulTaskGenerator<std::invoke_result_t<StateInitF>> T>
367 template<StatelessTaskGenerator T>
374 template<
typename StateInitF, StatefulTaskGenerator<std::invoke_result_t<StateInitF>> T>
applications::exahype2::acoustic::VariableShortcuts s
static void runParallelGenerator(T generator, u32 numThreads=0)
static auto runSerialGenerator(T generator, StateInitF stateInitF, u32 numThreads=0)
static void runSerialGenerator(T generator, u32 numThreads=0)
static auto runParallelGenerator(T generator, StateInitF stateInitF, u32 numThreads=0)
static u32 THREAD_COUNT(u32 numThreads=0)
static void invokeTask(Task &task, Val &val, i32 threadId)
Reader getReader(u32 consumerIdHint=0)
static void runParallelGenerator(T generator, u32 numThreads=0)
static auto runParallelGeneratorParallel(T &generator, StateInitF stateInitF, u32 numThreads)
static auto runParallelGenerator(T generator, StateInitF stateInitF, u32 numThreads=0)
static ThreadPool & WORKER_POOL()
static auto runSerialGenerator(T generator, StateInitF stateInitF, u32 numThreads=0)
static u32 THREAD_COUNT(u32 numThreads=0)
static auto runSerialGeneratorParallel(T generator, StateInitF stateInitF, u32 numThreads)
static void invokeTask(Task &task, Val &val, i32 threadId)
static void runSerialGenerator(T generator, u32 numThreads=0)
static void producerConsumerLoop(i32 threadId, TaskGenerator *generator, SPMCChannel< Task > *channel, StateInitF stateInitF, ValueType *retVal, std::atomic_int32_t *producerThreadId)
static auto runSerialGeneratorSerial(T generator, StateInitF stateInitF)
static std::optional< Task > createNewTasks(TaskGenerator *generator, SPMCChannel< Task > *channel, std::atomic_int32_t *producerThreadId)
void dispatch(const std::function< void()> &work, u32 numThreads=0)
StatefulTaskGenerator< Any > auto ToStatefulGenerator(StatelessTaskGenerator auto generator)