Task scheduler: Add thread-aware task push routines

This commit implements a new function, BLI_task_pool_push_from_thread(),
whose main goal is to put less parasitic load on the CPU by avoiding
memory allocations as much as possible, making task pushing cheaper.

This function expects a thread ID, which must be 0 for the thread the
pool was created from (and from which wait_work() is called); for other
threads it must be the ID which was passed to the thread's run function.
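
For illustration, a minimal usage sketch (the callback and data names
here are hypothetical; only the BLI_task_pool_* calls and their
signatures come from this patch):

    static void my_run(TaskPool *pool, void *taskdata, int thread_id)
    {
        /* ... perform the actual work ... */

        /* Follow-up work pushed from a worker re-uses that worker's ID. */
        BLI_task_pool_push_from_thread(pool, my_run, next_taskdata, false,
                                       TASK_PRIORITY_LOW, thread_id);
    }

    /* From the thread which created the pool (and which will later call
     * BLI_task_pool_work_and_wait()) the thread ID must be 0. */
    BLI_task_pool_push_from_thread(pool, my_run, taskdata, false,
                                   TASK_PRIORITY_LOW, 0);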

This reduces allocations quite a bit in the new dependency graph,
hopefully gaining some visible speedup on fewzillion-core machines
(on my own machine the benefit is only visible in the profiler, which
shows a significant reduction of time spent in memory allocation).
Sergey Sharybin 2016-05-10 09:55:58 +02:00
parent 401e710807
commit 7efa34d078
3 changed files with 215 additions and 19 deletions


@@ -84,7 +84,9 @@ void BLI_task_pool_push_ex(
TaskPool *pool, TaskRunFunction run, void *taskdata,
bool free_taskdata, TaskFreeFunction freedata, TaskPriority priority);
void BLI_task_pool_push(TaskPool *pool, TaskRunFunction run,
void *taskdata, bool free_taskdata, TaskPriority priority);
void BLI_task_pool_push_from_thread(TaskPool *pool, TaskRunFunction run,
void *taskdata, bool free_taskdata, TaskPriority priority, int thread_id);
/* work and wait until all tasks are done */
void BLI_task_pool_work_and_wait(TaskPool *pool);


@@ -35,8 +35,17 @@
#include "atomic_ops.h"
/* Define this to enable some detailed statistic print. */
#undef DEBUG_STATS
/* Types */
/* Number of per-thread pre-allocated tasks.
*
* For more details see description of TaskMemPool.
*/
#define MEMPOOL_SIZE 256
typedef struct Task {
struct Task *next, *prev;
@@ -47,6 +56,50 @@ typedef struct Task {
TaskPool *pool;
} Task;
/* This is a per-thread storage of pre-allocated tasks.
*
* The idea behind this is simple: reduce the number of malloc() calls when
* pushing a new task to the pool. This is done by keeping memory from tasks
* which have already finished, so instead of freeing that memory we put it
* into the pool for later re-use.
*
* The tricky part here is to avoid any inter-thread synchronization, hence
* no lock is allowed around this pool. The pool becomes the owner of the
* pointer from the freed task, and only the corresponding thread is able to
* use this pool (no memory stealing and such).
*
* This leads to the following use of the pool:
*
* - task_push() should provide the proper thread ID from which the task is
* being pushed.
*
* - The task allocation function checks the corresponding memory pool, and
* if there is any memory in there it marks the memory as re-used, removes
* it from the pool and uses that memory for the new task.
*
* At this moment the task queue owns the memory.
*
* - When the task is done and task_free() is called, the memory is put into
* the pool which corresponds to the thread which handled the task.
*/
typedef struct TaskMemPool {
/* Number of pre-allocated tasks in the pool. */
int num_tasks;
/* Pre-allocated task memory pointers. */
Task *tasks[MEMPOOL_SIZE];
} TaskMemPool;
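
In essence this is a per-thread LIFO stack of Task pointers which needs no
locking because each stack is only ever touched by its owning thread. A
minimal sketch of the pattern (mempool_pop()/mempool_push() are illustrative
names; task_alloc() and task_free() further down are the actual
implementation):

    /* Pop: try to re-use the memory of a previously finished task. */
    static Task *mempool_pop(TaskMemPool *mem_pool)
    {
        if (mem_pool->num_tasks > 0) {
            return mem_pool->tasks[--mem_pool->num_tasks];  /* No malloc(). */
        }
        return MEM_mallocN(sizeof(Task), "New task");
    }

    /* Push: keep a finished task's memory for later re-use. */
    static void mempool_push(TaskMemPool *mem_pool, Task *task)
    {
        if (mem_pool->num_tasks < MEMPOOL_SIZE) {
            mem_pool->tasks[mem_pool->num_tasks++] = task;
        }
        else {
            MEM_freeN(task);  /* Pool saturated, actually free. */
        }
    }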
#ifdef DEBUG_STATS
typedef struct TaskMemPoolStats {
/* Number of allocations. */
int num_alloc;
/* Number of avoided allocations (pointer was re-used from the pool). */
int num_reuse;
/* Number of times memory was discarded due to pool saturation. */
int num_discard;
} TaskMemPoolStats;
#endif
struct TaskPool {
TaskScheduler *scheduler;
@@ -62,14 +115,32 @@ struct TaskPool {
volatile bool do_cancel;
/* If set, this pool may never be work_and_wait'ed, which means TaskScheduler
* has to use its special background fallback thread in case we are in a
* single-threaded situation.
*/
bool run_in_background;
/* This pool is used for caching task pointers for thread ID 0.
* It either points to the global scheduler's task_mempool[0] if the
* pool is handled from the main thread, or points to task_mempool_local
* otherwise.
*
* This way we avoid possible threading conflicts when accessing the same
* global memory pool from multiple threads from which wait_work() is
* called.
*/
TaskMemPool *task_mempool;
TaskMemPool task_mempool_local;
#ifdef DEBUG_STATS
TaskMemPoolStats *mempool_stats;
#endif
};
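
The split matters because wait_work() executes tasks with thread ID 0, so
every pool doing work_and_wait() uses mempool slot 0. A hypothetical scenario
the local pool avoids (assuming the existing BLI_task_pool_create()
signature):

    /* Threads A and B each create a pool and then call work_and_wait(),
     * so both execute tasks as "thread 0". If both pools aliased
     * scheduler->task_mempool[0], the two threads would pop/push that one
     * stack with no lock: a data race. */
    TaskPool *pool_a = BLI_task_pool_create(scheduler, NULL);  /* on thread A */
    TaskPool *pool_b = BLI_task_pool_create(scheduler, NULL);  /* on thread B */
    BLI_task_pool_work_and_wait(pool_a);  /* A caches via its own local slot */
    BLI_task_pool_work_and_wait(pool_b);  /* B likewise: no shared state */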
struct TaskScheduler {
pthread_t *threads;
struct TaskThread *task_threads;
TaskMemPool *task_mempool;
int num_threads;
bool background_thread_only;
@@ -98,6 +169,63 @@ static void task_data_free(Task *task, const int thread_id)
}
}
BLI_INLINE TaskMemPool *get_task_mempool(TaskPool *pool, const int thread_id)
{
if (thread_id == 0) {
return pool->task_mempool;
}
return &pool->scheduler->task_mempool[thread_id];
}
static Task *task_alloc(TaskPool *pool, const int thread_id)
{
assert(thread_id <= pool->scheduler->num_threads);
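/* A thread ID of -1 means the caller does not know which thread it is
* running on (BLI_task_pool_push_ex() passes this), so the per-thread
* pool can not be used and we fall through to a regular allocation. */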
if (thread_id != -1) {
assert(thread_id >= 0);
TaskMemPool *mem_pool = get_task_mempool(pool, thread_id);
/* Try to re-use task memory from a thread local storage. */
if (mem_pool->num_tasks > 0) {
--mem_pool->num_tasks;
/* Success! We've just avoided task allocation. */
#ifdef DEBUG_STATS
pool->mempool_stats[thread_id].num_reuse++;
#endif
return mem_pool->tasks[mem_pool->num_tasks];
}
/* We are doomed to allocate new task data. */
#ifdef DEBUG_STATS
pool->mempool_stats[thread_id].num_alloc++;
#endif
}
return MEM_mallocN(sizeof(Task), "New task");
}
static void task_free(TaskPool *pool, Task *task, const int thread_id)
{
task_data_free(task, thread_id);
assert(thread_id >= 0);
assert(thread_id <= pool->scheduler->num_threads);
TaskMemPool *mem_pool = get_task_mempool(pool, thread_id);
if (mem_pool->num_tasks < MEMPOOL_SIZE - 1) {
/* Successfully allowed the task to be re-used later. */
mem_pool->tasks[mem_pool->num_tasks] = task;
++mem_pool->num_tasks;
}
else {
/* Local storage saturated, no other way than to just discard
* the memory.
*
* TODO(sergey): We could perhaps store such a pointer in the global
* scheduler pool, maybe it'd be faster than discarding and
* allocating again.
*/
MEM_freeN(task);
#ifdef DEBUG_STATS
pool->mempool_stats[thread_id].num_discard++;
#endif
}
}
/* Task Scheduler */
static void task_pool_num_decrease(TaskPool *pool, size_t done)
@@ -196,8 +324,7 @@ static void *task_scheduler_thread_run(void *thread_p)
task->run(pool, task->taskdata, thread_id);
/* delete task */
task_free(pool, task, thread_id);
/* notify pool task was done */
task_pool_num_decrease(pool, 1);
@@ -249,6 +376,9 @@ TaskScheduler *BLI_task_scheduler_create(int num_threads)
fprintf(stderr, "TaskScheduler failed to launch thread %d/%d\n", i, num_threads);
}
}
scheduler->task_mempool = MEM_callocN(sizeof(*scheduler->task_mempool) * (num_threads + 1),
"TaskScheduler task_mempool");
}
return scheduler;
@@ -281,6 +411,16 @@ void BLI_task_scheduler_free(TaskScheduler *scheduler)
MEM_freeN(scheduler->task_threads);
}
/* Delete task memory pool */
if (scheduler->task_mempool) {
for (int i = 0; i <= scheduler->num_threads; ++i) {
for (int j = 0; j < scheduler->task_mempool[i].num_tasks; ++j) {
MEM_freeN(scheduler->task_mempool[i].tasks[j]);
}
}
MEM_freeN(scheduler->task_mempool);
}
/* delete leftover tasks */
for (task = scheduler->queue.first; task; task = task->next) {
task_data_free(task, 0);
@@ -372,6 +512,20 @@ static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, void *userdata, c
pool->userdata = userdata;
BLI_mutex_init(&pool->user_mutex);
if (BLI_thread_is_main()) {
pool->task_mempool = scheduler->task_mempool;
}
else {
pool->task_mempool = &pool->task_mempool_local;
pool->task_mempool_local.num_tasks = 0;
}
#ifdef DEBUG_STATS
pool->mempool_stats =
MEM_callocN(sizeof(*pool->mempool_stats) * (scheduler->num_threads + 1),
"per-taskpool mempool stats");
#endif
/* Ensure malloc will go fine from threads,
*
* This is needed because we could be in main thread here
@@ -417,16 +571,36 @@ void BLI_task_pool_free(TaskPool *pool)
BLI_mutex_end(&pool->user_mutex);
/* Free local memory pool, those pointers are lost forever. */
if (pool->task_mempool == &pool->task_mempool_local) {
for (int i = 0; i < pool->task_mempool_local.num_tasks; i++) {
MEM_freeN(pool->task_mempool_local.tasks[i]);
}
}
#ifdef DEBUG_STATS
printf("Thread ID Allocated Reused Discarded\n");
for (int i = 0; i < pool->scheduler->num_threads + 1; ++i) {
printf("%02d %05d %05d %05d\n",
i,
pool->mempool_stats[i].num_alloc,
pool->mempool_stats[i].num_reuse,
pool->mempool_stats[i].num_discard);
}
MEM_freeN(pool->mempool_stats);
#endif
MEM_freeN(pool);
BLI_end_threaded_malloc();
}
static void task_pool_push(
TaskPool *pool, TaskRunFunction run, void *taskdata,
bool free_taskdata, TaskFreeFunction freedata, TaskPriority priority,
int thread_id)
{
Task *task = task_alloc(pool, thread_id);
task->run = run;
task->taskdata = taskdata;
@@ -437,12 +611,25 @@ void BLI_task_pool_push_ex(
task_scheduler_push(pool->scheduler, task, priority);
}
void BLI_task_pool_push_ex(
TaskPool *pool, TaskRunFunction run, void *taskdata,
bool free_taskdata, TaskFreeFunction freedata, TaskPriority priority)
{
task_pool_push(pool, run, taskdata, free_taskdata, freedata, priority, -1);
}
void BLI_task_pool_push(
TaskPool *pool, TaskRunFunction run, void *taskdata, bool free_taskdata, TaskPriority priority)
{
BLI_task_pool_push_ex(pool, run, taskdata, free_taskdata, NULL, priority);
}
void BLI_task_pool_push_from_thread(TaskPool *pool, TaskRunFunction run,
void *taskdata, bool free_taskdata, TaskPriority priority, int thread_id)
{
task_pool_push(pool, run, taskdata, free_taskdata, NULL, priority, thread_id);
}
void BLI_task_pool_work_and_wait(TaskPool *pool)
{
TaskScheduler *scheduler = pool->scheduler;
@@ -482,8 +669,7 @@ void BLI_task_pool_work_and_wait(TaskPool *pool)
work_task->run(pool, work_task->taskdata, 0);
/* delete task */
task_free(pool, task, 0);
/* notify pool task was done */
task_pool_num_decrease(pool, 1);


@@ -122,7 +122,8 @@ void DEG_evaluation_context_free(EvaluationContext *eval_ctx)
static void schedule_children(TaskPool *pool,
Depsgraph *graph,
OperationDepsNode *node,
const int layers,
const int thread_id);
struct DepsgraphEvalState {
EvaluationContext *eval_ctx;
@@ -132,7 +133,7 @@
static void deg_task_run_func(TaskPool *pool,
void *taskdata,
int thread_id)
{
DepsgraphEvalState *state = (DepsgraphEvalState *)BLI_task_pool_userdata(pool);
OperationDepsNode *node = (OperationDepsNode *)taskdata;
@@ -161,7 +162,7 @@
node,
end_time - start_time);
schedule_children(pool, state->graph, node, state->layers, thread_id);
}
static void calculate_pending_parents(Depsgraph *graph, int layers)
@@ -235,7 +236,8 @@ static void calculate_eval_priority(OperationDepsNode *node)
* after a task has been completed.
*/
static void schedule_node(TaskPool *pool, Depsgraph *graph, int layers,
OperationDepsNode *node, bool dec_parents,
const int thread_id)
{
int id_layers = node->owner->owner->layers;
@@ -252,11 +254,16 @@ static void schedule_node(TaskPool *pool, Depsgraph *graph, int layers,
if (!is_scheduled) {
if (node->is_noop()) {
/* skip NOOP node, schedule children right away */
schedule_children(pool, graph, node, layers, thread_id);
}
else {
/* children are scheduled once this task is completed */
BLI_task_pool_push_from_thread(pool,
deg_task_run_func,
node,
false,
TASK_PRIORITY_LOW,
thread_id);
}
}
}
@@ -272,14 +279,15 @@ static void schedule_graph(TaskPool *pool,
++it)
{
OperationDepsNode *node = *it;
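/* Initial scheduling is done from the thread which owns the pool and
* will call BLI_task_pool_work_and_wait(), hence thread ID 0. */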
schedule_node(pool, graph, layers, node, false, 0);
}
}
static void schedule_children(TaskPool *pool,
Depsgraph *graph,
OperationDepsNode *node,
const int layers,
const int thread_id)
{
DEPSNODE_RELATIONS_ITER_BEGIN(node->outlinks, rel)
{
@@ -289,7 +297,7 @@ static void schedule_children(TaskPool *pool,
/* Happens when having cyclic dependencies. */
continue;
}
schedule_node(pool, graph, layers, child, (rel->flag & DEPSREL_FLAG_CYCLIC) == 0, thread_id);
}
DEPSNODE_RELATIONS_ITER_END;
}