BLI_task: Add new generic `BLI_task_parallel_iterator()`.

This new function is part of the 'parallel for loops' functions. It
takes an iterator callback to generate items to be processed, in
addition to the usual 'process' func callback.

This allows using common code from BLI_task for a wide range of custom
iterators, without having to re-invent the wheel of the whole tasks &
data chunks handling.

This supports all settings features from `BLI_task_parallel_range()`,
including dynamic and static (if total number of items is known)
scheduling, TLS data and its finalize callback, etc.

One question here is whether we should provide usercode with a spinlock
by default, or enforce it to always handle its own sync mechanism.
I kept it, since imho it will be needed very often, and generating one
is pretty cheap even if unused...

----------

Additionally, this commit converts (currently unused)
`BLI_task_parallel_listbase()` to use that generic code. This was done
mostly as proof of concept, but performance-wise it shows some
interesting data, roughly:
 - Very light processing (that should not be threaded anyway) is several
   times slower, which is expected due to more overhead in loop management
   code.
 - Heavier processing can be up to 10% quicker (probably thanks to the
   switch from dynamic to static scheduling, which greatly reduces the
   locking needed to fill in the per-task chunks of data). A similar
   speed-up in the non-threaded case comes as a surprise though; it is
   not clear what explains it.

While this conversion is not really needed, imho we should keep it
(instead of existing code for that function), it's easier to have
complex handling logic in as few places as possible, for maintaining and
for improving it.

Note: That work was initially done to allow for D5372 to be possible... Unfortunately, that one proved to be no better than the original code from a performance point of view.

Reviewed By: sergey

Differential Revision: https://developer.blender.org/D5371
This commit is contained in:
Bastien Montagne 2019-10-30 12:23:45 +01:00 committed by Bastien Montagne
parent 3e7af19bf1
commit 29433da4c6
4 changed files with 332 additions and 112 deletions

View File

@ -198,11 +198,45 @@ void BLI_task_parallel_range(const int start,
TaskParallelRangeFunc func,
const TaskParallelSettings *settings);
typedef void (*TaskParallelListbaseFunc)(void *userdata, struct Link *iter, int index);
/* Iteration state shared between all tasks working on the same iterator.
 * Its access needs thread lock or similar protection. */
typedef struct TaskParallelIteratorStateShared {
/* Maximum amount of items to acquire at once (i.e. per locked 'acquire' step). */
int chunk_size;
/* Next item to be acquired. */
void *next_item;
/* Index of the next item to be acquired. */
int next_index;
/* Indicates that end of iteration has been reached. */
bool is_finished;
/* Helper lock to protect access to this data in iterator getter callback,
 * can be ignored (if the callback implements its own protection system, using atomics e.g.).
 * Will be NULL when iterator is actually processed in a single thread. */
SpinLock *spin_lock;
} TaskParallelIteratorStateShared;
/* Iterator ('generator') callback: advances the iteration by one step.
 * On entry, `r_next_item` and `r_next_index` hold the current item and its index; the callback
 * is expected to replace them with the following item and index, and to set `r_do_abort` to true
 * once the end of the iteration has been reached. */
typedef void (*TaskParallelIteratorIterFunc)(void *__restrict userdata,
const TaskParallelTLS *__restrict tls,
void **r_next_item,
int *r_next_index,
bool *r_do_abort);
/* Processing callback: called once for each item acquired from the iterator, with the index
 * recorded for that item and the TLS data of the calling task. */
typedef void (*TaskParallelIteratorFunc)(void *__restrict userdata,
void *item,
int index,
const TaskParallelTLS *__restrict tls);
/* Parallel 'for' loop over items generated by `iter_func`, starting from `init_item` and
 * `init_index`, each item being processed by `func`.
 * `tot_items` is the total amount of items (a negative value means it is unknown). */
void BLI_task_parallel_iterator(void *userdata,
TaskParallelIteratorIterFunc iter_func,
void *init_item,
const int init_index,
const int tot_items,
TaskParallelIteratorFunc func,
const TaskParallelSettings *settings);
void BLI_task_parallel_listbase(struct ListBase *listbase,
void *userdata,
TaskParallelListbaseFunc func,
const bool use_threading);
TaskParallelIteratorFunc func,
const TaskParallelSettings *settings);
typedef struct MempoolIterData MempoolIterData;
typedef void (*TaskParallelMempoolFunc)(void *userdata, MempoolIterData *iter);

View File

@ -149,7 +149,7 @@ typedef struct TaskThreadLocalStorage {
* without "interrupting" for task execution.
*
* We try to accumulate as much tasks as possible in a local queue without
* any locks first, and then we push all of them into a schedulers queue
* any locks first, and then we push all of them into a scheduler's queue
* from within a single mutex lock.
*/
bool do_delayed_push;
@ -1052,14 +1052,20 @@ typedef struct ParallelRangeState {
int chunk_size;
} ParallelRangeState;
BLI_INLINE void task_parallel_range_calc_chunk_size(const TaskParallelSettings *settings,
const int num_tasks,
ParallelRangeState *state)
BLI_INLINE void task_parallel_calc_chunk_size(const TaskParallelSettings *settings,
const int tot_items,
int num_tasks,
int *r_chunk_size)
{
const int tot_items = state->stop - state->start;
int chunk_size = 0;
if (settings->min_iter_per_thread > 0) {
if (!settings->use_threading) {
/* Some users of this helper will still need a valid chunk size in case processing is not
* threaded. We can use a bigger one than in default threaded case then. */
chunk_size = 1024;
num_tasks = 1;
}
else if (settings->min_iter_per_thread > 0) {
/* Already set by user, no need to do anything here. */
chunk_size = settings->min_iter_per_thread;
}
@ -1091,14 +1097,28 @@ BLI_INLINE void task_parallel_range_calc_chunk_size(const TaskParallelSettings *
BLI_assert(chunk_size > 0);
switch (settings->scheduling_mode) {
case TASK_SCHEDULING_STATIC:
state->chunk_size = max_ii(chunk_size, tot_items / (num_tasks));
break;
case TASK_SCHEDULING_DYNAMIC:
state->chunk_size = chunk_size;
break;
if (tot_items > 0) {
switch (settings->scheduling_mode) {
case TASK_SCHEDULING_STATIC:
*r_chunk_size = max_ii(chunk_size, tot_items / num_tasks);
break;
case TASK_SCHEDULING_DYNAMIC:
*r_chunk_size = chunk_size;
break;
}
}
else {
/* If total amount of items is unknown, we can only use dynamic scheduling. */
*r_chunk_size = chunk_size;
}
}
/* Range flavor of the chunk size computation: the amount of items is the length of the
 * `[start, stop)` range stored in `state`, and the result is written to `state->chunk_size`. */
BLI_INLINE void task_parallel_range_calc_chunk_size(const TaskParallelSettings *settings,
                                                    const int num_tasks,
                                                    ParallelRangeState *state)
{
  const int range_length = state->stop - state->start;
  int *r_chunk_size = &state->chunk_size;

  task_parallel_calc_chunk_size(settings, range_length, num_tasks, r_chunk_size);
}
BLI_INLINE bool parallel_range_next_iter_get(ParallelRangeState *__restrict state,
@ -1256,77 +1276,239 @@ void BLI_task_parallel_range(const int start,
}
}
#undef MALLOCA
#undef MALLOCA_FREE
typedef struct ParallelListbaseState {
typedef struct TaskParallelIteratorState {
void *userdata;
TaskParallelListbaseFunc func;
TaskParallelIteratorIterFunc iter_func;
TaskParallelIteratorFunc func;
int chunk_size;
int index;
Link *link;
SpinLock lock;
} ParallelListState;
/* *** Data used to 'acquire' chunks of items from the iterator. *** */
/* Common data also passed to the generator callback. */
TaskParallelIteratorStateShared iter_shared;
/* Total number of items. If unknown, set it to a negative number. */
int tot_items;
} TaskParallelIteratorState;
BLI_INLINE Link *parallel_listbase_next_iter_get(ParallelListState *__restrict state,
int *__restrict index,
int *__restrict count)
BLI_INLINE void task_parallel_iterator_calc_chunk_size(const TaskParallelSettings *settings,
const int num_tasks,
TaskParallelIteratorState *state)
{
int task_count = 0;
BLI_spin_lock(&state->lock);
Link *result = state->link;
if (LIKELY(result != NULL)) {
*index = state->index;
while (state->link != NULL && task_count < state->chunk_size) {
task_count++;
state->link = state->link->next;
}
state->index += task_count;
}
BLI_spin_unlock(&state->lock);
*count = task_count;
return result;
task_parallel_calc_chunk_size(
settings, state->tot_items, num_tasks, &state->iter_shared.chunk_size);
}
static void parallel_listbase_func(TaskPool *__restrict pool,
void *UNUSED(taskdata),
int UNUSED(threadid))
static void parallel_iterator_func_do(TaskParallelIteratorState *__restrict state,
void *userdata_chunk,
int threadid)
{
ParallelListState *__restrict state = BLI_task_pool_userdata(pool);
Link *link;
int index, count;
TaskParallelTLS tls = {
.thread_id = threadid,
.userdata_chunk = userdata_chunk,
};
while ((link = parallel_listbase_next_iter_get(state, &index, &count)) != NULL) {
for (int i = 0; i < count; i++) {
state->func(state->userdata, link, index + i);
link = link->next;
void **current_chunk_items;
int *current_chunk_indices;
int current_chunk_size;
const size_t items_size = sizeof(*current_chunk_items) * (size_t)state->iter_shared.chunk_size;
const size_t indices_size = sizeof(*current_chunk_indices) *
(size_t)state->iter_shared.chunk_size;
current_chunk_items = MALLOCA(items_size);
current_chunk_indices = MALLOCA(indices_size);
current_chunk_size = 0;
for (bool do_abort = false; !do_abort;) {
if (state->iter_shared.spin_lock != NULL) {
BLI_spin_lock(state->iter_shared.spin_lock);
}
/* Get current status. */
int index = state->iter_shared.next_index;
void *item = state->iter_shared.next_item;
int i;
/* 'Acquire' a chunk of items from the iterator function. */
for (i = 0; i < state->iter_shared.chunk_size && !state->iter_shared.is_finished; i++) {
current_chunk_indices[i] = index;
current_chunk_items[i] = item;
state->iter_func(state->userdata, &tls, &item, &index, &state->iter_shared.is_finished);
}
/* Update current status. */
state->iter_shared.next_index = index;
state->iter_shared.next_item = item;
current_chunk_size = i;
do_abort = state->iter_shared.is_finished;
if (state->iter_shared.spin_lock != NULL) {
BLI_spin_unlock(state->iter_shared.spin_lock);
}
for (i = 0; i < current_chunk_size; ++i) {
state->func(state->userdata, current_chunk_items[i], current_chunk_indices[i], &tls);
}
}
MALLOCA_FREE(current_chunk_items, items_size);
MALLOCA_FREE(current_chunk_indices, indices_size);
}
static void task_parallel_listbase_no_threads(struct ListBase *listbase,
void *userdata,
TaskParallelListbaseFunc func)
static void parallel_iterator_func(TaskPool *__restrict pool, void *userdata_chunk, int threadid)
{
int i = 0;
for (Link *link = listbase->first; link != NULL; link = link->next, i++) {
func(userdata, link, i);
TaskParallelIteratorState *__restrict state = BLI_task_pool_userdata(pool);
parallel_iterator_func_do(state, userdata_chunk, threadid);
}
/**
 * Run the whole iteration from the calling thread, without using any task pool.
 *
 * When the settings provide a TLS chunk (non-NULL pointer and non-zero size), the callbacks work
 * on a private copy of it, and that same copy is then given to `func_finalize` (if any). This
 * matches the threaded code path, where each task processes and finalizes its own copy, and it
 * leaves the caller's template chunk untouched.
 *
 * \param settings: Provides the TLS chunk template, its size, and the finalize callback.
 * \param state: Iterator state. Its shared spin lock pointer is reset to NULL by this function,
 *               which marks the iteration as non-threaded for the iterator callback.
 */
static void task_parallel_iterator_no_threads(const TaskParallelSettings *settings,
                                              TaskParallelIteratorState *state)
{
  /* Prepare user's TLS data. */
  void *userdata_chunk = settings->userdata_chunk;
  const size_t userdata_chunk_size = settings->userdata_chunk_size;
  void *userdata_chunk_local = NULL;
  const bool use_userdata_chunk = (userdata_chunk_size != 0) && (userdata_chunk != NULL);
  if (use_userdata_chunk) {
    userdata_chunk_local = MALLOCA(userdata_chunk_size);
    memcpy(userdata_chunk_local, userdata_chunk, userdata_chunk_size);
  }

  /* Also marking it as non-threaded for the iterator callback. */
  state->iter_shared.spin_lock = NULL;

  /* Process using the local copy (NULL when no TLS chunk is in use), so that the data seen by
   * `func_finalize` below is the data the callbacks actually worked on. */
  parallel_iterator_func_do(state, userdata_chunk_local, 0);

  if (use_userdata_chunk) {
    if (settings->func_finalize != NULL) {
      settings->func_finalize(state->userdata, userdata_chunk_local);
    }
    MALLOCA_FREE(userdata_chunk_local, userdata_chunk_size);
  }
}
/* NOTE: The idea here is to compensate for rather measurable threading
* overhead caused by fetching tasks. With too many CPU threads we are starting
* to spend too much time in those overheads. */
BLI_INLINE int task_parallel_listbasecalc_chunk_size(const int num_threads)
static void task_parallel_iterator_do(const TaskParallelSettings *settings,
TaskParallelIteratorState *state)
{
if (num_threads > 32) {
return 128;
TaskScheduler *task_scheduler = BLI_task_scheduler_get();
const int num_threads = BLI_task_scheduler_num_threads(task_scheduler);
task_parallel_iterator_calc_chunk_size(settings, num_threads, state);
if (!settings->use_threading) {
task_parallel_iterator_no_threads(settings, state);
return;
}
else if (num_threads > 16) {
return 64;
const int chunk_size = state->iter_shared.chunk_size;
const int tot_items = state->tot_items;
const size_t num_tasks = tot_items >= 0 ?
(size_t)min_ii(num_threads, state->tot_items / chunk_size) :
(size_t)num_threads;
BLI_assert(num_tasks > 0);
if (num_tasks == 1) {
task_parallel_iterator_no_threads(settings, state);
return;
}
return 32;
SpinLock spin_lock;
BLI_spin_init(&spin_lock);
state->iter_shared.spin_lock = &spin_lock;
void *userdata_chunk = settings->userdata_chunk;
const size_t userdata_chunk_size = settings->userdata_chunk_size;
void *userdata_chunk_local = NULL;
void *userdata_chunk_array = NULL;
const bool use_userdata_chunk = (userdata_chunk_size != 0) && (userdata_chunk != NULL);
TaskPool *task_pool = BLI_task_pool_create_suspended(task_scheduler, state);
if (use_userdata_chunk) {
userdata_chunk_array = MALLOCA(userdata_chunk_size * num_tasks);
}
for (size_t i = 0; i < num_tasks; i++) {
if (use_userdata_chunk) {
userdata_chunk_local = (char *)userdata_chunk_array + (userdata_chunk_size * i);
memcpy(userdata_chunk_local, userdata_chunk, userdata_chunk_size);
}
/* Use this pool's pre-allocated tasks. */
BLI_task_pool_push_from_thread(task_pool,
parallel_iterator_func,
userdata_chunk_local,
false,
TASK_PRIORITY_HIGH,
task_pool->thread_id);
}
BLI_task_pool_work_and_wait(task_pool);
BLI_task_pool_free(task_pool);
if (use_userdata_chunk) {
if (settings->func_finalize != NULL) {
for (size_t i = 0; i < num_tasks; i++) {
userdata_chunk_local = (char *)userdata_chunk_array + (userdata_chunk_size * i);
settings->func_finalize(state->userdata, userdata_chunk_local);
}
}
MALLOCA_FREE(userdata_chunk_array, userdata_chunk_size * num_tasks);
}
BLI_spin_end(&spin_lock);
state->iter_shared.spin_lock = NULL;
}
/**
 * Parallelized 'for' loop driven by a generic iterator callback.
 *
 * \param userdata: Common userdata passed to all instances of \a func.
 * \param iter_func: Callback function used to generate chunks of items.
 * \param init_item: The initial item, if necessary (may be NULL if unused).
 * \param init_index: The initial index.
 * \param tot_items: The total amount of items to iterate over
 *                   (if unknown, set it to a negative number).
 * \param func: Callback function.
 * \param settings: See public API doc of TaskParallelSettings for description of all settings.
 *
 * \note Static scheduling is only available when \a tot_items is >= 0.
 */
void BLI_task_parallel_iterator(void *userdata,
                                TaskParallelIteratorIterFunc iter_func,
                                void *init_item,
                                const int init_index,
                                const int tot_items,
                                TaskParallelIteratorFunc func,
                                const TaskParallelSettings *settings)
{
  /* Members that are not listed here (chunk size, spin lock pointer) start zeroed. */
  TaskParallelIteratorState state = {
      .userdata = userdata,
      .iter_func = iter_func,
      .func = func,
      .iter_shared =
          {
              .next_item = init_item,
              .next_index = init_index,
              .is_finished = false,
          },
      .tot_items = tot_items,
  };

  task_parallel_iterator_do(settings, &state);
}
static void task_parallel_listbase_get(void *__restrict UNUSED(userdata),
const TaskParallelTLS *__restrict UNUSED(tls),
void **r_next_item,
int *r_next_index,
bool *r_do_abort)
{
/* Get current status. */
Link *link = *r_next_item;
if (link->next == NULL) {
*r_do_abort = true;
}
*r_next_item = link->next;
(*r_next_index)++;
}
/**
@ -1335,58 +1517,36 @@ BLI_INLINE int task_parallel_listbasecalc_chunk_size(const int num_threads)
* \param listbase: The double linked list to loop over.
* \param userdata: Common userdata passed to all instances of \a func.
* \param func: Callback function.
* \param use_threading: If \a true, actually split-execute loop in threads,
* else just do a sequential forloop
* (allows caller to use any kind of test to switch on parallelization or not).
* \param settings: See public API doc of ParallelRangeSettings for description of all settings.
*
* \note There is no static scheduling here,
* since it would need another full loop over items to count them.
*/
void BLI_task_parallel_listbase(struct ListBase *listbase,
void BLI_task_parallel_listbase(ListBase *listbase,
void *userdata,
TaskParallelListbaseFunc func,
const bool use_threading)
TaskParallelIteratorFunc func,
const TaskParallelSettings *settings)
{
if (BLI_listbase_is_empty(listbase)) {
return;
}
if (!use_threading) {
task_parallel_listbase_no_threads(listbase, userdata, func);
return;
}
TaskScheduler *task_scheduler = BLI_task_scheduler_get();
const int num_threads = BLI_task_scheduler_num_threads(task_scheduler);
/* TODO(sergey): Consider making chunk size configurable. */
const int chunk_size = task_parallel_listbasecalc_chunk_size(num_threads);
const int num_tasks = min_ii(num_threads, BLI_listbase_count(listbase) / chunk_size);
if (num_tasks <= 1) {
task_parallel_listbase_no_threads(listbase, userdata, func);
return;
}
ParallelListState state;
TaskPool *task_pool = BLI_task_pool_create_suspended(task_scheduler, &state);
TaskParallelIteratorState state = {0};
state.index = 0;
state.link = listbase->first;
state.tot_items = BLI_listbase_count(listbase);
state.iter_shared.next_index = 0;
state.iter_shared.next_item = listbase->first;
state.iter_shared.is_finished = false;
state.userdata = userdata;
state.iter_func = task_parallel_listbase_get;
state.func = func;
state.chunk_size = chunk_size;
BLI_spin_init(&state.lock);
BLI_assert(num_tasks > 0);
for (int i = 0; i < num_tasks; i++) {
/* Use this pool's pre-allocated tasks. */
BLI_task_pool_push_from_thread(
task_pool, parallel_listbase_func, NULL, false, TASK_PRIORITY_HIGH, task_pool->thread_id);
}
BLI_task_pool_work_and_wait(task_pool);
BLI_task_pool_free(task_pool);
BLI_spin_end(&state.lock);
task_parallel_iterator_do(settings, &state);
}
#undef MALLOCA
#undef MALLOCA_FREE
typedef struct ParallelMempoolState {
void *userdata;
TaskParallelMempoolFunc func;

View File

@ -38,14 +38,22 @@ static uint gen_pseudo_random_number(uint num)
return ((num & 255) << 6) + 1;
}
static void task_listbase_light_iter_func(void *UNUSED(userdata), Link *item, int index)
static void task_listbase_light_iter_func(void *UNUSED(userdata),
void *item,
int index,
const TaskParallelTLS *__restrict UNUSED(tls))
{
LinkData *data = (LinkData *)item;
data->data = POINTER_FROM_INT(POINTER_AS_INT(data->data) + index);
}
static void task_listbase_light_membarrier_iter_func(void *userdata, Link *item, int index)
static void task_listbase_light_membarrier_iter_func(void *userdata,
void *item,
int index,
const TaskParallelTLS *__restrict UNUSED(tls))
{
LinkData *data = (LinkData *)item;
int *count = (int *)userdata;
@ -54,7 +62,11 @@ static void task_listbase_light_membarrier_iter_func(void *userdata, Link *item,
atomic_sub_and_fetch_uint32((uint32_t *)count, 1);
}
static void task_listbase_heavy_iter_func(void *UNUSED(userdata), Link *item, int index)
static void task_listbase_heavy_iter_func(void *UNUSED(userdata),
void *item,
int index,
const TaskParallelTLS *__restrict UNUSED(tls))
{
LinkData *data = (LinkData *)item;
@ -66,7 +78,11 @@ static void task_listbase_heavy_iter_func(void *UNUSED(userdata), Link *item, in
}
}
static void task_listbase_heavy_membarrier_iter_func(void *userdata, Link *item, int index)
static void task_listbase_heavy_membarrier_iter_func(void *userdata,
void *item,
int index,
const TaskParallelTLS *__restrict UNUSED(tls))
{
LinkData *data = (LinkData *)item;
int *count = (int *)userdata;
@ -84,14 +100,18 @@ static void task_listbase_test_do(ListBase *list,
const int num_items,
int *num_items_tmp,
const char *id,
TaskParallelListbaseFunc func,
TaskParallelIteratorFunc func,
const bool use_threads,
const bool check_num_items_tmp)
{
TaskParallelSettings settings;
BLI_parallel_range_settings_defaults(&settings);
settings.use_threading = use_threads;
double averaged_timing = 0.0;
for (int i = 0; i < NUM_RUN_AVERAGED; i++) {
const double init_time = PIL_check_seconds_timer();
BLI_task_parallel_listbase(list, num_items_tmp, func, use_threads);
BLI_task_parallel_listbase(list, num_items_tmp, func, &settings);
averaged_timing += PIL_check_seconds_timer() - init_time;
/* Those checks should ensure us all items of the listbase were processed once, and only once -

View File

@ -88,7 +88,10 @@ TEST(task, MempoolIter)
/* *** Parallel iterations over double-linked list items. *** */
static void task_listbase_iter_func(void *userdata, Link *item, int index)
static void task_listbase_iter_func(void *userdata,
void *item,
int index,
const TaskParallelTLS *__restrict UNUSED(tls))
{
LinkData *data = (LinkData *)item;
int *count = (int *)userdata;
@ -112,7 +115,10 @@ TEST(task, ListBaseIter)
num_items++;
}
BLI_task_parallel_listbase(&list, &num_items, task_listbase_iter_func, true);
TaskParallelSettings settings;
BLI_parallel_range_settings_defaults(&settings);
BLI_task_parallel_listbase(&list, &num_items, task_listbase_iter_func, &settings);
/* Those checks should ensure us all items of the listbase were processed once, and only once -
* as expected. */