BLI_task: Add new 'BLI_task_parallel_range_finalize()'.
Together with the extended loop callback and userdata_chunk, this allows to perform cumulative tasks (like aggregation) in a lockfree way using local userdata_chunk to store temp data, and once all workers have finished, to merge those userdata_chunks in the finalize callback (from calling thread, so no need to lock here either). Note that this changes how userdata_chunk is handled (now fully from 'main' thread, which means a given worker thread will always get the same userdata_chunk, without being re-initialized anymore to init value at start of each iter chunk).
This commit is contained in:
parent
5a7429c363
commit
688858d3a8
|
@ -119,11 +119,13 @@ size_t BLI_task_pool_tasks_done(TaskPool *pool);
|
|||
/* Parallel for routines */
|
||||
typedef void (*TaskParallelRangeFunc)(void *userdata, const int iter);
|
||||
typedef void (*TaskParallelRangeFuncEx)(void *userdata, void *userdata_chunk, const int iter, const int thread_id);
|
||||
typedef void (*TaskParallelRangeFuncFinalize)(void *userdata, void *userdata_chunk);
|
||||
void BLI_task_parallel_range_ex(
|
||||
int start, int stop,
|
||||
void *userdata,
|
||||
void *userdata_chunk,
|
||||
const size_t userdata_chunk_size, TaskParallelRangeFuncEx func_ex,
|
||||
const size_t userdata_chunk_size,
|
||||
TaskParallelRangeFuncEx func_ex,
|
||||
const bool use_threading,
|
||||
const bool use_dynamic_scheduling);
|
||||
void BLI_task_parallel_range(
|
||||
|
@ -132,6 +134,16 @@ void BLI_task_parallel_range(
|
|||
TaskParallelRangeFunc func,
|
||||
const bool use_threading);
|
||||
|
||||
void BLI_task_parallel_range_finalize(
|
||||
int start, int stop,
|
||||
void *userdata,
|
||||
void *userdata_chunk,
|
||||
const size_t userdata_chunk_size,
|
||||
TaskParallelRangeFuncEx func_ex,
|
||||
TaskParallelRangeFuncFinalize func_finalize,
|
||||
const bool use_threading,
|
||||
const bool use_dynamic_scheduling);
|
||||
|
||||
typedef void (*TaskParallelListbaseFunc)(void *userdata,
|
||||
struct Link *iter,
|
||||
int index);
|
||||
|
|
|
@ -768,8 +768,6 @@ size_t BLI_task_pool_tasks_done(TaskPool *pool)
|
|||
typedef struct ParallelRangeState {
|
||||
int start, stop;
|
||||
void *userdata;
|
||||
void *userdata_chunk;
|
||||
size_t userdata_chunk_size;
|
||||
|
||||
TaskParallelRangeFunc func;
|
||||
TaskParallelRangeFuncEx func_ex;
|
||||
|
@ -792,24 +790,16 @@ BLI_INLINE bool parallel_range_next_iter_get(
|
|||
|
||||
static void parallel_range_func(
|
||||
TaskPool * __restrict pool,
|
||||
void *UNUSED(taskdata),
|
||||
void *userdata_chunk,
|
||||
int threadid)
|
||||
{
|
||||
ParallelRangeState * __restrict state = BLI_task_pool_userdata(pool);
|
||||
int iter, count;
|
||||
|
||||
const bool use_userdata_chunk = (state->func_ex != NULL) &&
|
||||
(state->userdata_chunk_size != 0) && (state->userdata_chunk != NULL);
|
||||
void *userdata_chunk = use_userdata_chunk ? MALLOCA(state->userdata_chunk_size) : NULL;
|
||||
|
||||
while (parallel_range_next_iter_get(state, &iter, &count)) {
|
||||
int i;
|
||||
|
||||
if (state->func_ex) {
|
||||
if (use_userdata_chunk) {
|
||||
memcpy(userdata_chunk, state->userdata_chunk, state->userdata_chunk_size);
|
||||
}
|
||||
|
||||
for (i = 0; i < count; ++i) {
|
||||
state->func_ex(state->userdata, userdata_chunk, iter + i, threadid);
|
||||
}
|
||||
|
@ -820,8 +810,6 @@ static void parallel_range_func(
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
MALLOCA_FREE(userdata_chunk, state->userdata_chunk_size);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -836,6 +824,7 @@ static void task_parallel_range_ex(
|
|||
const size_t userdata_chunk_size,
|
||||
TaskParallelRangeFunc func,
|
||||
TaskParallelRangeFuncEx func_ex,
|
||||
TaskParallelRangeFuncFinalize func_finalize,
|
||||
const bool use_threading,
|
||||
const bool use_dynamic_scheduling)
|
||||
{
|
||||
|
@ -844,6 +833,10 @@ static void task_parallel_range_ex(
|
|||
ParallelRangeState state;
|
||||
int i, num_threads, num_tasks;
|
||||
|
||||
void *userdata_chunk_local = NULL;
|
||||
void *userdata_chunk_array = NULL;
|
||||
const bool use_userdata_chunk = (func_ex != NULL) && (userdata_chunk_size != 0) && (userdata_chunk != NULL);
|
||||
|
||||
if (start == stop) {
|
||||
return;
|
||||
}
|
||||
|
@ -859,9 +852,6 @@ static void task_parallel_range_ex(
|
|||
*/
|
||||
if (!use_threading) {
|
||||
if (func_ex) {
|
||||
const bool use_userdata_chunk = (userdata_chunk_size != 0) && (userdata_chunk != NULL);
|
||||
void *userdata_chunk_local = NULL;
|
||||
|
||||
if (use_userdata_chunk) {
|
||||
userdata_chunk_local = MALLOCA(userdata_chunk_size);
|
||||
memcpy(userdata_chunk_local, userdata_chunk, userdata_chunk_size);
|
||||
|
@ -871,6 +861,10 @@ static void task_parallel_range_ex(
|
|||
func_ex(userdata, userdata_chunk, i, 0);
|
||||
}
|
||||
|
||||
if (func_finalize) {
|
||||
func_finalize(userdata, userdata_chunk);
|
||||
}
|
||||
|
||||
MALLOCA_FREE(userdata_chunk_local, userdata_chunk_size);
|
||||
}
|
||||
else {
|
||||
|
@ -895,8 +889,6 @@ static void task_parallel_range_ex(
|
|||
state.start = start;
|
||||
state.stop = stop;
|
||||
state.userdata = userdata;
|
||||
state.userdata_chunk = userdata_chunk;
|
||||
state.userdata_chunk_size = userdata_chunk_size;
|
||||
state.func = func;
|
||||
state.func_ex = func_ex;
|
||||
state.iter = start;
|
||||
|
@ -910,15 +902,34 @@ static void task_parallel_range_ex(
|
|||
num_tasks = min_ii(num_tasks, (stop - start) / state.chunk_size);
|
||||
atomic_fetch_and_add_uint32((uint32_t *)(&state.iter), 0);
|
||||
|
||||
if (use_userdata_chunk) {
|
||||
userdata_chunk_array = MALLOCA(userdata_chunk_size * num_tasks);
|
||||
}
|
||||
|
||||
for (i = 0; i < num_tasks; i++) {
|
||||
if (use_userdata_chunk) {
|
||||
userdata_chunk_local = (char *)userdata_chunk_array + (userdata_chunk_size * i);
|
||||
memcpy(userdata_chunk_local, userdata_chunk, userdata_chunk_size);
|
||||
}
|
||||
/* Use this pool's pre-allocated tasks. */
|
||||
BLI_task_pool_push_from_thread(task_pool,
|
||||
parallel_range_func,
|
||||
NULL, false,
|
||||
userdata_chunk_local, false,
|
||||
TASK_PRIORITY_HIGH, 0);
|
||||
}
|
||||
|
||||
BLI_task_pool_work_and_wait(task_pool);
|
||||
BLI_task_pool_free(task_pool);
|
||||
|
||||
if (use_userdata_chunk) {
|
||||
if (func_finalize) {
|
||||
for (i = 0; i < num_tasks; i++) {
|
||||
userdata_chunk_local = (char *)userdata_chunk_array + (userdata_chunk_size * i);
|
||||
func_finalize(userdata, userdata_chunk_local);
|
||||
}
|
||||
}
|
||||
MALLOCA_FREE(userdata_chunk_array, userdata_chunk_size * num_tasks);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -946,7 +957,7 @@ void BLI_task_parallel_range_ex(
|
|||
const bool use_dynamic_scheduling)
|
||||
{
|
||||
task_parallel_range_ex(
|
||||
start, stop, userdata, userdata_chunk, userdata_chunk_size, NULL, func_ex,
|
||||
start, stop, userdata, userdata_chunk, userdata_chunk_size, NULL, func_ex, NULL,
|
||||
use_threading, use_dynamic_scheduling);
|
||||
}
|
||||
|
||||
|
@ -967,7 +978,39 @@ void BLI_task_parallel_range(
|
|||
TaskParallelRangeFunc func,
|
||||
const bool use_threading)
|
||||
{
|
||||
task_parallel_range_ex(start, stop, userdata, NULL, 0, func, NULL, use_threading, false);
|
||||
task_parallel_range_ex(start, stop, userdata, NULL, 0, func, NULL, NULL, use_threading, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* This function allows to parallelize for loops in a similar way to OpenMP's 'parallel for' statement,
|
||||
* with an additional 'finalize' func called from the calling thread once the whole range has been processed.
|
||||
*
|
||||
* \param start First index to process.
|
||||
* \param stop Index to stop looping (excluded).
|
||||
* \param userdata Common userdata passed to all instances of \a func.
|
||||
* \param userdata_chunk Optional, each instance of looping chunks will get a copy of this data
|
||||
* (similar to OpenMP's firstprivate).
|
||||
* \param userdata_chunk_size Memory size of \a userdata_chunk.
|
||||
* \param func_ex Callback function (advanced version).
|
||||
* \param func_finalize Callback function, called after all workers have finished, useful to finalize accumulative tasks.
|
||||
* \param use_threading If \a true, actually split-execute loop in threads, else just do a sequential for loop
|
||||
* (allows caller to use any kind of test to switch on parallelization or not).
|
||||
* \param use_dynamic_scheduling If \a true, the whole range is divided in a lot of small chunks (of size 32 currently),
|
||||
* otherwise whole range is split in a few big chunks (num_threads * 2 chunks currently).
|
||||
*/
|
||||
void BLI_task_parallel_range_finalize(
|
||||
int start, int stop,
|
||||
void *userdata,
|
||||
void *userdata_chunk,
|
||||
const size_t userdata_chunk_size,
|
||||
TaskParallelRangeFuncEx func_ex,
|
||||
TaskParallelRangeFuncFinalize func_finalize,
|
||||
const bool use_threading,
|
||||
const bool use_dynamic_scheduling)
|
||||
{
|
||||
task_parallel_range_ex(
|
||||
start, stop, userdata, userdata_chunk, userdata_chunk_size, NULL, func_ex, func_finalize,
|
||||
use_threading, use_dynamic_scheduling);
|
||||
}
|
||||
|
||||
#undef MALLOCA
|
||||
|
|
Loading…
Reference in New Issue