Geometry Nodes: improve evaluator with lazy threading

In large node setups, the threading overhead was sometimes very significant.
That's especially true when most nodes do very little work.

This commit improves the scheduling by not using multi-threading in many
cases unless it's likely that it will be worth it. For more details see the comments
in `BLI_lazy_threading.hh`.

Differential Revision: https://developer.blender.org/D15976
Jacques Lucke 2022-09-20 10:59:12 +02:00
parent 7a239812ca
commit 5c81d3bd46
13 changed files with 447 additions and 109 deletions

View File

@ -0,0 +1,83 @@
/* SPDX-License-Identifier: GPL-2.0-or-later */
#pragma once
/** \file
* \ingroup bli
*
* The goal of "lazy threading" is to avoid using threads unless one can reasonably assume that it
* is worth distributing work over multiple threads. Using threads can lead to worse overall
* performance by introducing inter-thread communication overhead. Keeping all work on a single
* thread reduces this overhead to zero and also makes better use of the CPU cache.
*
* Functions like #parallel_for also solve this to some degree by using a "grain size". When the
* number of individual tasks is too small, no multi-threading is used. This works very well when
* there are many homogeneous tasks that can be expected to take approximately the same time.
*
* The situation becomes more difficult when:
* - The individual tasks are not homogeneous, i.e. they take different amounts of time to compute.
* - It is practically impossible to guess how long each task will take in advance.
*
* Given those constraints, a single grain size cannot be determined. One could just schedule all
* tasks individually but that would create a lot of overhead when the tasks happen to be very
* small. While TBB will keep all tasks on a single thread if the other threads are busy, if they
* are idle they will start stealing the work even if that's not beneficial for overall
* performance.
*
* This file provides a simple API that allows a task scheduler to properly handle tasks whose size
* is not known in advance. The key idea is this:
*
* > By default, all work stays on a single thread. If an individual task notices that it is about
* > to start a computation that will take a while, it notifies the task scheduler further up on the
* > stack. The scheduler then allows other threads to take over other tasks that were originally
* > meant for the current thread.
*
* This way, when all tasks are small, no threading overhead has to be paid for. Whenever there is
* a task that keeps the current thread busy for a while, the other tasks are moved to a separate
* thread so that they can be executed without waiting for the long computation to finish.
*
* Consequently, the earlier a task knows during its execution that it will take a while, the
* better. That's because if it is blocking anyway, it's more efficient to move the other tasks to
* another thread earlier.
*
* To make this work, three things have to be solved:
* 1. The task scheduler has to be able to start single-threaded and become multi-threaded after
* tasks have started executing. This has to be solved in the specific task scheduler.
* 2. There has to be a way for the currently running task to tell the task scheduler that it is
* about to perform a computation that will take a while and that it would be reasonable to move
* other tasks to other threads. This part is implemented in the API provided by this file.
* 3. Individual tasks have to decide when a computation is long enough to justify talking to the
* scheduler. This is always based on heuristics that have to be fine tuned over time. One could
* assume that this means adding new work-size checks to many parts in Blender, but that's
* actually not necessary, because these checks exist already in the form of grain sizes passed
* to e.g. #parallel_for. The assumption here is that when the task thinks the current work load
* is big enough to justify using threads, it's also big enough to justify using another thread
* for waiting tasks on the current thread.
*/
#include "BLI_function_ref.hh"
namespace blender::lazy_threading {
/**
* Tell task schedulers on the current thread that the current task is about to start a long
* computation and that other waiting tasks should be moved to another thread if possible.
*/
void send_hint();
/**
* Used by the task scheduler to receive hints from current tasks that they will take a while.
* This should only be allocated on the stack.
*/
class HintReceiver {
public:
/**
* The passed in function is called when a task signals that it will take a while.
* \note The function has to stay alive after the call to the constructor. So one must not pass a
* lambda directly into this constructor but store it in a separate variable on the stack first.
*/
HintReceiver(FunctionRef<void()> fn);
~HintReceiver();
};
} // namespace blender::lazy_threading

View File

@ -31,6 +31,7 @@
#endif
#include "BLI_index_range.hh"
#include "BLI_lazy_threading.hh"
#include "BLI_utildefines.h"
namespace blender::threading {
@ -56,6 +57,7 @@ void parallel_for(IndexRange range, int64_t grain_size, const Function &function
#ifdef WITH_TBB
/* Invoking tbb for small workloads has a large overhead. */
if (range.size() >= grain_size) {
lazy_threading::send_hint();
tbb::parallel_for(
tbb::blocked_range<int64_t>(range.first(), range.one_after_last(), grain_size),
[&](const tbb::blocked_range<int64_t> &subrange) {
@ -78,6 +80,7 @@ Value parallel_reduce(IndexRange range,
{
#ifdef WITH_TBB
if (range.size() >= grain_size) {
lazy_threading::send_hint();
return tbb::parallel_reduce(
tbb::blocked_range<int64_t>(range.first(), range.one_after_last(), grain_size),
identity,
@ -114,6 +117,7 @@ template<typename... Functions>
void parallel_invoke(const bool use_threading, Functions &&...functions)
{
if (use_threading) {
lazy_threading::send_hint();
parallel_invoke(std::forward<Functions>(functions)...);
}
else {
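
The three hunks above apply one rule: the hint is sent only where the code has already decided that the workload is big enough for threads. Below is a minimal sketch of that gate. It assumes a hypothetical `send_hint_stub` that only counts calls and a serial loop in place of TBB, so it illustrates the control flow rather than Blender's implementation.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>

// Hypothetical stand-in for lazy_threading::send_hint(); it only counts calls.
int hint_count = 0;

void send_hint_stub()
{
  hint_count++;
}

// Simplified parallel_for (hypothetical name). The grain-size check doubles as the
// "will this take a while?" heuristic. Assumes grain_size > 0.
void parallel_for_sketch(const int64_t start,
                         const int64_t size,
                         const int64_t grain_size,
                         const std::function<void(int64_t, int64_t)> &fn)
{
  if (size >= grain_size) {
    // Large workload: tell schedulers further up the stack before starting.
    send_hint_stub();
    // The chunks are processed serially here; real code would hand them to TBB.
    for (int64_t i = 0; i < size; i += grain_size) {
      const int64_t chunk = (size - i < grain_size) ? (size - i) : grain_size;
      fn(start + i, chunk);
    }
  }
  else {
    // Small workload: no hint and no threading.
    fn(start, size);
  }
}
```

With a grain size of 100, a range of 10 elements is processed without a hint, while a range of 1000 elements sends exactly one hint before the chunks are processed.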

View File

@ -85,6 +85,7 @@ set(SRC
intern/kdtree_3d.c
intern/kdtree_4d.c
intern/lasso_2d.c
intern/lazy_threading.cc
intern/length_parameterize.cc
intern/listbase.c
intern/math_base.c

View File

@ -0,0 +1,30 @@
/* SPDX-License-Identifier: GPL-2.0-or-later */
#include "BLI_lazy_threading.hh"
#include "BLI_vector.hh"
namespace blender::lazy_threading {
/**
* This is a #RawVector so that it can be destructed after Blender checks for memory leaks.
*/
thread_local RawVector<FunctionRef<void()>, 0> hint_receivers;
void send_hint()
{
for (const FunctionRef<void()> &fn : hint_receivers) {
fn();
}
}
HintReceiver::HintReceiver(const FunctionRef<void()> fn)
{
hint_receivers.append(fn);
}
HintReceiver::~HintReceiver()
{
hint_receivers.pop_last();
}
} // namespace blender::lazy_threading
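
The implementation above is a thread-local stack of non-owning callbacks. The following standalone model shows the same shape using only the standard library. The names `receivers`, `send_hint_sketch` and `HintReceiverSketch` are hypothetical, and `std::function` pointers stand in for `FunctionRef`, so this is a sketch of the pattern and not the Blender code.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Every thread gets its own stack of receivers, so no locking is needed.
thread_local std::vector<const std::function<void()> *> receivers;

// Calls every receiver that is currently registered on the calling thread.
void send_hint_sketch()
{
  for (const std::function<void()> *fn : receivers) {
    (*fn)();
  }
}

// RAII registration: pushes in the constructor and pops in the destructor.
// Only a pointer is stored, so the callable has to outlive the receiver.
class HintReceiverSketch {
 public:
  explicit HintReceiverSketch(const std::function<void()> &fn)
  {
    receivers.push_back(&fn);
  }
  ~HintReceiverSketch()
  {
    receivers.pop_back();
  }
  HintReceiverSketch(const HintReceiverSketch &) = delete;
  HintReceiverSketch &operator=(const HintReceiverSketch &) = delete;
};
```

Because the stack stores pointers, passing a lambda directly to the constructor would leave a dangling pointer once the temporary is destroyed. That is the same constraint the `\note` on `HintReceiver` describes: keep the callable in a named variable that lives at least as long as the receiver.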

View File

@ -12,6 +12,7 @@
#include "DNA_listBase.h"
#include "BLI_lazy_threading.hh"
#include "BLI_task.h"
#include "BLI_threads.h"
@ -104,6 +105,8 @@ void BLI_task_parallel_range(const int start,
const size_t grainsize = MAX2(settings->min_iter_per_thread, 1);
const tbb::blocked_range<int> range(start, stop, grainsize);
blender::lazy_threading::send_hint();
if (settings->func_reduce) {
parallel_reduce(range, task);
if (settings->userdata_chunk) {

View File

@ -43,6 +43,13 @@
#include "BLI_linear_allocator.hh"
#include "BLI_vector.hh"
#include <atomic>
#include <thread>
#ifdef DEBUG
# define FN_LAZY_FUNCTION_DEBUG_THREADS
#endif
namespace blender::fn::lazy_function {
enum class ValueUsage {
@ -102,9 +109,13 @@ class Params {
* The lazy-function this #Params has been prepared for.
*/
const LazyFunction &fn_;
#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
std::thread::id main_thread_id_;
std::atomic<bool> allow_multi_threading_;
#endif
public:
Params(const LazyFunction &fn);
Params(const LazyFunction &fn, bool allow_multi_threading_initially);
/**
* Get a pointer to an input value if the value is available already. Otherwise null is returned.
@ -154,7 +165,7 @@ class Params {
* Typed utility methods that wrap the methods above.
*/
template<typename T> T extract_input(int index);
template<typename T> const T &get_input(int index);
template<typename T> const T &get_input(int index) const;
template<typename T> T *try_get_input_data_ptr_or_request(int index);
template<typename T> void set_output(int index, T &&value);
@ -163,7 +174,15 @@ class Params {
*/
void set_default_remaining_outputs();
/**
* Returns true when the lazy-function is now allowed to use multi-threading when interacting
* with this #Params. That means, it is allowed to call non-const methods from different threads.
*/
bool try_enable_multi_threading();
private:
void assert_valid_thread() const;
/**
* Methods that need to be implemented by subclasses. Those are separate from the non-virtual
* methods above to make it easy to insert additional debugging logic on top of the
@ -176,6 +195,7 @@ class Params {
virtual bool output_was_set_impl(int index) const = 0;
virtual ValueUsage get_output_usage_impl(int index) const = 0;
virtual void set_input_unused_impl(int index) = 0;
virtual bool try_enable_multi_threading_impl();
};
/**
@ -312,7 +332,14 @@ inline void LazyFunction::execute(Params &params, const Context &context) const
/** \name #Params Inline Methods
* \{ */
inline Params::Params(const LazyFunction &fn) : fn_(fn)
inline Params::Params(const LazyFunction &fn,
[[maybe_unused]] bool allow_multi_threading_initially)
: fn_(fn)
#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
,
main_thread_id_(std::this_thread::get_id()),
allow_multi_threading_(allow_multi_threading_initially)
#endif
{
}
@ -323,16 +350,19 @@ inline void *Params::try_get_input_data_ptr(const int index) const
inline void *Params::try_get_input_data_ptr_or_request(const int index)
{
this->assert_valid_thread();
return this->try_get_input_data_ptr_or_request_impl(index);
}
inline void *Params::get_output_data_ptr(const int index)
{
this->assert_valid_thread();
return this->get_output_data_ptr_impl(index);
}
inline void Params::output_set(const int index)
{
this->assert_valid_thread();
this->output_set_impl(index);
}
@ -348,18 +378,20 @@ inline ValueUsage Params::get_output_usage(const int index) const
inline void Params::set_input_unused(const int index)
{
this->assert_valid_thread();
this->set_input_unused_impl(index);
}
template<typename T> inline T Params::extract_input(const int index)
{
this->assert_valid_thread();
void *data = this->try_get_input_data_ptr(index);
BLI_assert(data != nullptr);
T return_value = std::move(*static_cast<T *>(data));
return return_value;
}
template<typename T> inline const T &Params::get_input(const int index)
template<typename T> inline const T &Params::get_input(const int index) const
{
const void *data = this->try_get_input_data_ptr(index);
BLI_assert(data != nullptr);
@ -368,17 +400,43 @@ template<typename T> inline const T &Params::get_input(const int index)
template<typename T> inline T *Params::try_get_input_data_ptr_or_request(const int index)
{
this->assert_valid_thread();
return static_cast<T *>(this->try_get_input_data_ptr_or_request(index));
}
template<typename T> inline void Params::set_output(const int index, T &&value)
{
using DecayT = std::decay_t<T>;
this->assert_valid_thread();
void *data = this->get_output_data_ptr(index);
new (data) DecayT(std::forward<T>(value));
this->output_set(index);
}
inline bool Params::try_enable_multi_threading()
{
this->assert_valid_thread();
const bool success = this->try_enable_multi_threading_impl();
#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
if (success) {
allow_multi_threading_ = true;
}
#endif
return success;
}
inline void Params::assert_valid_thread() const
{
#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
if (allow_multi_threading_) {
return;
}
if (main_thread_id_ != std::this_thread::get_id()) {
BLI_assert_unreachable();
}
#endif
}
/** \} */
} // namespace blender::fn::lazy_function
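
The debug-only bookkeeping added to `Params` reduces to one rule: until multi-threading has been enabled, only the thread that created the object may call the mutating methods. A minimal sketch of that rule follows. `ThreadGuardSketch` is a hypothetical class, and it returns a `bool` where the patch asserts, so that the behavior can be checked without aborting.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical reduction of the debug checks that the patch adds to #Params.
class ThreadGuardSketch {
 private:
  std::thread::id owner_thread_id_;
  std::atomic<bool> allow_multi_threading_;

 public:
  explicit ThreadGuardSketch(const bool allow_multi_threading_initially)
      : owner_thread_id_(std::this_thread::get_id()),
        allow_multi_threading_(allow_multi_threading_initially)
  {
  }

  // Once this has been called, every thread passes the check.
  void enable_multi_threading()
  {
    allow_multi_threading_ = true;
  }

  // The caller id is a parameter only to make the rule easy to test.
  bool is_valid_thread(const std::thread::id caller = std::this_thread::get_id()) const
  {
    if (allow_multi_threading_) {
      return true;
    }
    return caller == owner_thread_id_;
  }
};
```

A default-constructed `std::thread::id` never equals the id of a running thread, so it can serve as "some other thread" when exercising the check.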

View File

@ -41,6 +41,7 @@ class BasicParams : public Params {
bool output_was_set_impl(const int index) const override;
ValueUsage get_output_usage_impl(const int index) const override;
void set_input_unused_impl(const int index) override;
bool try_enable_multi_threading_impl() override;
};
namespace detail {

View File

@ -63,4 +63,9 @@ void Params::set_default_remaining_outputs()
}
}
bool Params::try_enable_multi_threading_impl()
{
return false;
}
} // namespace blender::fn::lazy_function

View File

@ -14,7 +14,7 @@ BasicParams::BasicParams(const LazyFunction &fn,
MutableSpan<std::optional<ValueUsage>> input_usages,
Span<ValueUsage> output_usages,
MutableSpan<bool> set_outputs)
: Params(fn),
: Params(fn, true),
inputs_(inputs),
outputs_(outputs),
input_usages_(input_usages),
@ -62,4 +62,9 @@ void BasicParams::set_input_unused_impl(const int index)
input_usages_[index] = ValueUsage::Unused;
}
bool BasicParams::try_enable_multi_threading_impl()
{
return true;
}
} // namespace blender::fn::lazy_function

View File

@ -3,18 +3,20 @@
/**
* This file implements the evaluation of a lazy-function graph. Its main objectives are:
* - Only compute values that are actually used.
* - Allow spreading the work over an arbitrary number of CPU cores.
* - Stay single threaded when nodes are executed quickly.
* - Allow spreading the work over an arbitrary number of threads efficiently.
*
* Other (simpler) executors with different main objectives could be implemented in the future. For
* some scenarios those could be simpler when many nodes do very little work or most nodes have to
* be processed sequentially. Those assumptions make the first and second objective less important
* respectively.
* This executor makes use of `BLI_lazy_threading.hh` to enable multi-threading only when it seems
* beneficial. It operates in two modes: single- and multi-threaded. The use of a task pool and
* locks is avoided in single-threaded mode. Once multi-threading is enabled the executor starts
* using both. It is not possible to switch back from multi-threaded to single-threaded mode.
*
* The design implemented in this executor requires *no* main thread that coordinates everything.
* Instead, one thread will trigger some initial work and then many threads coordinate themselves
* in a distributed fashion. In an ideal situation, every thread ends up processing a separate part
* of the graph which results in less communication overhead. The way TBB schedules tasks helps
* with that: a thread will next process the task that it added to a task pool just before.
* The multi-threading design implemented in this executor requires *no* main thread that
* coordinates everything. Instead, one thread will trigger some initial work and then many threads
* coordinate themselves in a distributed fashion. In an ideal situation, every thread ends up
* processing a separate part of the graph which results in less communication overhead. The way
* TBB schedules tasks helps with that: a thread will next process the task that it added to a task
* pool just before.
*
* Communication between threads is synchronized by using a mutex in every node. When a thread
* wants to access the state of a node, its mutex has to be locked first (with some documented
@ -26,15 +28,14 @@
* state of its inputs and outputs. Every time a node is executed, it has to advance its state in
* some way (e.g. it requests a new input or computes a new output).
*
* At the core of the executor is a task pool. Every task in that pool represents a node execution.
* When a node is executed it may send notifications to other nodes which may in turn add those
* nodes to the task pool. For example, the current node has computed one of its outputs, then the
* When a node is executed it may send notifications to other nodes which may in turn schedule
* those nodes. For example, when the current node has computed one of its outputs, then the
* computed value is forwarded to all linked inputs, changing their node states in the process. If
* this input was the last missing required input, the node will be added to the task pool so that
* it is executed next.
* this input was the last missing required input, the node will be scheduled so that it is
* executed next.
*
* When the task pool is empty, the executor gives back control to the caller which may later
* provide new inputs to the graph which in turn adds new nodes to the task pool and the process
* When all tasks are completed, the executor gives back control to the caller which may later
* provide new inputs to the graph which in turn leads to new nodes being scheduled and the process
* starts again.
*/
@ -190,27 +191,31 @@ struct LockedNode {
*/
Vector<const OutputSocket *> delayed_required_outputs;
Vector<const OutputSocket *> delayed_unused_outputs;
Vector<const FunctionNode *> delayed_scheduled_nodes;
LockedNode(const Node &node, NodeState &node_state) : node(node), node_state(node_state)
{
}
};
class Executor;
class GraphExecutorLFParams;
struct CurrentTask {
/**
* The node that should be run on the same thread after the current node is done. This avoids
* some overhead by skipping a round trip through the task pool.
* Mutex used to protect #scheduled_nodes when the executor uses multi-threading.
*/
std::atomic<const FunctionNode *> next_node = nullptr;
std::mutex mutex;
/**
* Indicates that some node has been added to the task pool.
* Nodes that have been scheduled to execute next.
*/
std::atomic<bool> added_node_to_pool = false;
Vector<const FunctionNode *> scheduled_nodes;
/**
* Makes it cheaper to check if there are any scheduled nodes because it avoids locking the
* mutex.
*/
std::atomic<bool> has_scheduled_nodes = false;
};
class GraphExecutorLFParams;
class Executor {
private:
const GraphExecutor &self_;
@ -230,13 +235,18 @@ class Executor {
const Context *context_ = nullptr;
/**
* Used to distribute work on separate nodes to separate threads.
* If this is null, the executor is in single-threaded mode.
*/
TaskPool *task_pool_ = nullptr;
std::atomic<TaskPool *> task_pool_ = nullptr;
#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
std::thread::id current_main_thread_;
#endif
/**
* A separate linear allocator for every thread. We could potentially reuse some memory, but that
* doesn't seem worth it yet.
*/
threading::EnumerableThreadSpecific<LinearAllocator<>> local_allocators_;
LinearAllocator<> *main_local_allocator_ = nullptr;
/**
* Set to false when the first execution ends.
*/
@ -249,11 +259,14 @@ class Executor {
{
/* The indices are necessary, because they are used as keys in #node_states_. */
BLI_assert(self_.graph_.node_indices_are_valid());
main_local_allocator_ = &local_allocators_.local();
}
~Executor()
{
BLI_task_pool_free(task_pool_);
if (TaskPool *task_pool = task_pool_.load()) {
BLI_task_pool_free(task_pool);
}
threading::parallel_for(node_states_.index_range(), 1024, [&](const IndexRange range) {
for (const int node_index : range) {
const Node &node = *self_.graph_.nodes()[node_index];
@ -270,18 +283,23 @@ class Executor {
{
params_ = &params;
context_ = &context;
BLI_SCOPED_DEFER([&]() {
/* Make sure the #params_ pointer is not dangling, even when it shouldn't be accessed by
* anyone. */
#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
current_main_thread_ = std::this_thread::get_id();
#endif
const auto deferred_func = [&]() {
/* Make sure the pointers are not dangling, even when they shouldn't be accessed by anyone. */
params_ = nullptr;
context_ = nullptr;
is_first_execution_ = false;
});
#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
current_main_thread_ = {};
#endif
};
BLI_SCOPED_DEFER(deferred_func);
CurrentTask current_task;
if (is_first_execution_) {
this->initialize_node_states();
task_pool_ = BLI_task_pool_create(this, TASK_PRIORITY_HIGH);
/* Initialize atomics to zero. */
memset(static_cast<void *>(loaded_inputs_.data()), 0, loaded_inputs_.size() * sizeof(bool));
@ -294,21 +312,11 @@ class Executor {
this->schedule_newly_requested_outputs(current_task);
this->forward_newly_provided_inputs(current_task);
/* Avoid using task pool when there is no parallel work to do. */
while (!current_task.added_node_to_pool) {
if (current_task.next_node == nullptr) {
/* Nothing to do. */
return;
}
const FunctionNode &node = *current_task.next_node;
current_task.next_node = nullptr;
this->run_node_task(node, current_task);
}
if (current_task.next_node != nullptr) {
this->add_node_to_task_pool(*current_task.next_node);
}
this->run_task(current_task);
BLI_task_pool_work_and_wait(task_pool_);
if (TaskPool *task_pool = task_pool_.load()) {
BLI_task_pool_work_and_wait(task_pool);
}
}
private:
@ -426,7 +434,7 @@ class Executor {
NodeState &node_state = *node_states_[node->index_in_graph()];
node_state.has_side_effects = true;
this->with_locked_node(*node, node_state, current_task, [&](LockedNode &locked_node) {
this->schedule_node(locked_node);
this->schedule_node(locked_node, current_task);
});
}
}
@ -434,7 +442,7 @@ class Executor {
void forward_newly_provided_inputs(CurrentTask &current_task)
{
LinearAllocator<> &allocator = local_allocators_.local();
LinearAllocator<> &allocator = this->get_main_or_local_allocator();
for (const int graph_input_index : self_.graph_inputs_.index_range()) {
std::atomic<uint8_t> &was_loaded = loaded_inputs_[graph_input_index];
if (was_loaded.load()) {
@ -488,7 +496,7 @@ class Executor {
return;
}
this->forward_newly_provided_input(
current_task, local_allocators_.local(), graph_input_index, input_data);
current_task, this->get_main_or_local_allocator(), graph_input_index, input_data);
return;
}
@ -498,7 +506,7 @@ class Executor {
return;
}
output_state.usage = ValueUsage::Used;
this->schedule_node(locked_node);
this->schedule_node(locked_node, current_task);
});
}
@ -520,25 +528,28 @@ class Executor {
params_->set_input_unused(graph_input_index);
}
else {
this->schedule_node(locked_node);
this->schedule_node(locked_node, current_task);
}
}
}
});
}
void schedule_node(LockedNode &locked_node)
void schedule_node(LockedNode &locked_node, CurrentTask &current_task)
{
BLI_assert(locked_node.node.is_function());
switch (locked_node.node_state.schedule_state) {
case NodeScheduleState::NotScheduled: {
/* Don't add the node to the task pool immediately, because the task pool might start
* executing it immediately (when Blender is started with a single thread).
* That would often result in a deadlock, because we are still holding the mutex of the
* current node. Also see comments in #LockedNode. */
locked_node.node_state.schedule_state = NodeScheduleState::Scheduled;
locked_node.delayed_scheduled_nodes.append(
&static_cast<const FunctionNode &>(locked_node.node));
const FunctionNode &node = static_cast<const FunctionNode &>(locked_node.node);
if (this->use_multi_threading()) {
std::lock_guard lock{current_task.mutex};
current_task.scheduled_nodes.append(&node);
}
else {
current_task.scheduled_nodes.append(&node);
}
current_task.has_scheduled_nodes.store(true, std::memory_order_relaxed);
break;
}
case NodeScheduleState::Scheduled: {
@ -562,14 +573,16 @@ class Executor {
BLI_assert(&node_state == node_states_[node.index_in_graph()]);
LockedNode locked_node{node, node_state};
{
if (this->use_multi_threading()) {
std::lock_guard lock{node_state.mutex};
threading::isolate_task([&]() { f(locked_node); });
}
else {
f(locked_node);
}
this->send_output_required_notifications(locked_node.delayed_required_outputs, current_task);
this->send_output_unused_notifications(locked_node.delayed_unused_outputs, current_task);
this->schedule_new_nodes(locked_node.delayed_scheduled_nodes, current_task);
}
void send_output_required_notifications(const Span<const OutputSocket *> sockets,
@ -588,49 +601,21 @@ class Executor {
}
}
void schedule_new_nodes(const Span<const FunctionNode *> nodes, CurrentTask &current_task)
void run_task(CurrentTask &current_task)
{
for (const FunctionNode *node_to_schedule : nodes) {
/* Avoid a round trip through the task pool for the first node that is scheduled by the
* current node execution. Other nodes are added to the pool so that other threads can pick
* them up. */
const FunctionNode *expected = nullptr;
if (current_task.next_node.compare_exchange_strong(
expected, node_to_schedule, std::memory_order_relaxed)) {
continue;
while (!current_task.scheduled_nodes.is_empty()) {
const FunctionNode &node = *current_task.scheduled_nodes.pop_last();
if (current_task.scheduled_nodes.is_empty()) {
current_task.has_scheduled_nodes.store(false, std::memory_order_relaxed);
}
this->add_node_to_task_pool(*node_to_schedule);
current_task.added_node_to_pool.store(true, std::memory_order_relaxed);
}
}
void add_node_to_task_pool(const Node &node)
{
BLI_task_pool_push(
task_pool_, Executor::run_node_from_task_pool, (void *)&node, false, nullptr);
}
static void run_node_from_task_pool(TaskPool *task_pool, void *task_data)
{
void *user_data = BLI_task_pool_user_data(task_pool);
Executor &executor = *static_cast<Executor *>(user_data);
const FunctionNode &node = *static_cast<const FunctionNode *>(task_data);
/* This loop reduces the number of round trips through the task pool as long as the current
* node is scheduling more nodes. */
CurrentTask current_task;
current_task.next_node = &node;
while (current_task.next_node != nullptr) {
const FunctionNode &node_to_run = *current_task.next_node;
current_task.next_node = nullptr;
executor.run_node_task(node_to_run, current_task);
this->run_node_task(node, current_task);
}
}
void run_node_task(const FunctionNode &node, CurrentTask &current_task)
{
NodeState &node_state = *node_states_[node.index_in_graph()];
LinearAllocator<> &allocator = local_allocators_.local();
LinearAllocator<> &allocator = this->get_main_or_local_allocator();
const LazyFunction &fn = node.function();
bool node_needs_execution = false;
@ -672,7 +657,7 @@ class Executor {
}
void *buffer = allocator.allocate(type.size(), type.alignment());
type.copy_construct(default_value, buffer);
this->forward_value_to_input(locked_node, input_state, {type, buffer});
this->forward_value_to_input(locked_node, input_state, {type, buffer}, current_task);
}
/* Request linked inputs that are always needed. */
@ -723,7 +708,7 @@ class Executor {
NodeScheduleState::RunningAndRescheduled;
node_state.schedule_state = NodeScheduleState::NotScheduled;
if (reschedule_requested && !node_state.node_has_finished) {
this->schedule_node(locked_node);
this->schedule_node(locked_node, current_task);
}
});
}
@ -887,7 +872,7 @@ class Executor {
CurrentTask &current_task)
{
BLI_assert(value_to_forward.get() != nullptr);
LinearAllocator<> &allocator = local_allocators_.local();
LinearAllocator<> &allocator = this->get_main_or_local_allocator();
const CPPType &type = *value_to_forward.type();
if (self_.logger_ != nullptr) {
@ -938,13 +923,13 @@ class Executor {
}
if (is_last_target) {
/* No need to make a copy if this is the last target. */
this->forward_value_to_input(locked_node, input_state, value_to_forward);
this->forward_value_to_input(locked_node, input_state, value_to_forward, current_task);
value_to_forward = {};
}
else {
void *buffer = allocator.allocate(type.size(), type.alignment());
type.copy_construct(value_to_forward.get(), buffer);
this->forward_value_to_input(locked_node, input_state, {type, buffer});
this->forward_value_to_input(locked_node, input_state, {type, buffer}, current_task);
}
});
}
@ -955,7 +940,8 @@ class Executor {
void forward_value_to_input(LockedNode &locked_node,
InputState &input_state,
GMutablePointer value)
GMutablePointer value,
CurrentTask &current_task)
{
NodeState &node_state = locked_node.node_state;
@ -966,10 +952,82 @@ class Executor {
if (input_state.usage == ValueUsage::Used) {
node_state.missing_required_inputs -= 1;
if (node_state.missing_required_inputs == 0) {
this->schedule_node(locked_node);
this->schedule_node(locked_node, current_task);
}
}
}
bool use_multi_threading() const
{
return task_pool_.load() != nullptr;
}
bool try_enable_multi_threading()
{
if (this->use_multi_threading()) {
return true;
}
#ifdef FN_LAZY_FUNCTION_DEBUG_THREADS
/* Only the current main thread is allowed to enable multi-threading, because the executor is
* still in single-threaded mode. */
if (current_main_thread_ != std::this_thread::get_id()) {
BLI_assert_unreachable();
}
#endif
/* Check if the caller supports multi-threading. */
if (!params_->try_enable_multi_threading()) {
return false;
}
/* Avoid using multiple threads when only one thread can be used anyway. */
if (BLI_system_thread_count() <= 1) {
return false;
}
task_pool_.store(BLI_task_pool_create(this, TASK_PRIORITY_HIGH));
return true;
}
/**
* Allow other threads to steal all the nodes that are currently scheduled on this thread.
*/
void move_scheduled_nodes_to_task_pool(CurrentTask &current_task)
{
BLI_assert(this->use_multi_threading());
using FunctionNodeVector = Vector<const FunctionNode *>;
FunctionNodeVector *nodes = MEM_new<FunctionNodeVector>(__func__);
{
std::lock_guard lock{current_task.mutex};
if (current_task.scheduled_nodes.is_empty()) {
return;
}
*nodes = std::move(current_task.scheduled_nodes);
current_task.has_scheduled_nodes.store(false, std::memory_order_relaxed);
}
/* All nodes are pushed as a single task in the pool. This avoids unnecessary threading
* overhead when the nodes are fast to compute. */
BLI_task_pool_push(
task_pool_.load(),
[](TaskPool *pool, void *data) {
Executor &executor = *static_cast<Executor *>(BLI_task_pool_user_data(pool));
FunctionNodeVector &nodes = *static_cast<FunctionNodeVector *>(data);
CurrentTask new_current_task;
new_current_task.scheduled_nodes = std::move(nodes);
new_current_task.has_scheduled_nodes.store(true, std::memory_order_relaxed);
executor.run_task(new_current_task);
},
nodes,
true,
[](TaskPool * /*pool*/, void *data) {
MEM_delete(static_cast<FunctionNodeVector *>(data));
});
}
LinearAllocator<> &get_main_or_local_allocator()
{
if (this->use_multi_threading()) {
return local_allocators_.local();
}
return *main_local_allocator_;
}
};
class GraphExecutorLFParams final : public Params {
@ -985,7 +1043,7 @@ class GraphExecutorLFParams final : public Params {
const Node &node,
NodeState &node_state,
CurrentTask &current_task)
: Params(fn),
: Params(fn, executor.use_multi_threading()),
executor_(executor),
node_(node),
node_state_(node_state),
@ -1017,7 +1075,7 @@ class GraphExecutorLFParams final : public Params {
OutputState &output_state = node_state_.outputs[index];
BLI_assert(!output_state.has_been_computed);
if (output_state.value == nullptr) {
LinearAllocator<> &allocator = executor_.local_allocators_.local();
LinearAllocator<> &allocator = executor_.get_main_or_local_allocator();
const CPPType &type = node_.output(index).type();
output_state.value = allocator.allocate(type.size(), type.alignment());
}
@ -1052,6 +1110,11 @@ class GraphExecutorLFParams final : public Params {
{
executor_.set_input_unused_during_execution(node_, node_state_, index, current_task_);
}
bool try_enable_multi_threading_impl() override
{
return executor_.try_enable_multi_threading();
}
};
/**
@ -1073,6 +1136,20 @@ inline void Executor::execute_node(const FunctionNode &node,
self_.logger_->log_before_node_execute(node, node_params, fn_context);
}
/* This is run when the execution of the node calls `lazy_threading::send_hint` to indicate that
* the execution will take a while. In this case, other tasks waiting on this thread should be
* allowed to be picked up by another thread. */
auto blocking_hint_fn = [&]() {
if (!current_task.has_scheduled_nodes.load()) {
return;
}
if (!this->try_enable_multi_threading()) {
return;
}
this->move_scheduled_nodes_to_task_pool(current_task);
};
lazy_threading::HintReceiver blocking_hint_receiver{blocking_hint_fn};
fn.execute(node_params, fn_context);
if (self_.logger_ != nullptr) {
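
The executor changes above combine into one flow: nodes run from a local list on the current thread, and when a running node sends a hint, everything still waiting is handed over as a single batch. The sketch below simulates that flow on one thread. `ExecutorSketch`, `TaskSketch` and the vector that stands in for the task pool are all hypothetical, and no real threads are involved, so it only illustrates the ordering and the batching.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// A task receives a callback that it may invoke to signal "this will take a while".
using TaskSketch = std::function<void(const std::function<void()> &send_hint)>;

struct ExecutorSketch {
  // Stand-in for the task pool: each entry is one batch that another thread could steal.
  std::vector<std::vector<TaskSketch>> pool_batches;
  int handoff_count = 0;

  // Runs tasks last-in-first-out, as #run_task does with #scheduled_nodes.
  void run_batch(std::vector<TaskSketch> scheduled)
  {
    const std::function<void()> on_hint = [&]() {
      if (scheduled.empty()) {
        return;
      }
      // Hand over everything that is still waiting as one batch, not task by task.
      pool_batches.push_back(std::move(scheduled));
      scheduled.clear();
      handoff_count++;
    };
    while (!scheduled.empty()) {
      TaskSketch task = std::move(scheduled.back());
      scheduled.pop_back();
      task(on_hint);
    }
  }

  // Simulates the pool workers on the calling thread; real code would use threads here.
  void run_all(std::vector<TaskSketch> initial)
  {
    this->run_batch(std::move(initial));
    while (!pool_batches.empty()) {
      std::vector<TaskSketch> batch = std::move(pool_batches.back());
      pool_batches.pop_back();
      this->run_batch(std::move(batch));
    }
  }
};
```

If no task ever calls `send_hint`, `pool_batches` stays empty and `handoff_count` stays at zero, which corresponds to the single-threaded mode in which the executor never creates a task pool.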

View File

@ -124,6 +124,11 @@ struct GeometryNodesLazyFunctionGraphInfo {
* Mappings between the lazy-function graph and the #bNodeTree.
*/
GeometryNodeLazyFunctionGraphMapping mapping;
/**
* Approximate number of nodes in the graph if all sub-graphs were inlined.
* This can be used as a simple heuristic for the complexity of the node group.
*/
int num_inline_nodes_approximate = 0;
GeometryNodesLazyFunctionGraphInfo();
~GeometryNodesLazyFunctionGraphInfo();
@ -148,6 +153,9 @@ class GeometryNodesLazyFunctionLogger : public fn::lazy_function::GraphExecutor:
void dump_when_input_is_set_twice(const lf::InputSocket &target_socket,
const lf::OutputSocket &from_socket,
const lf::Context &context) const override;
void log_before_node_execute(const lf::FunctionNode &node,
const lf::Params &params,
const lf::Context &context) const override;
};
/**

View File

@ -533,6 +533,8 @@ static void node_geo_exec(GeoNodeExecParams params)
attribute_outputs.rotation_id = StrongAnonymousAttributeID("Rotation");
}
lazy_threading::send_hint();
geometry_set.modify_geometry_sets([&](GeometrySet &geometry_set) {
point_distribution_calculate(
geometry_set, selection_field, method, seed, attribute_outputs, params);

View File

@ -16,6 +16,7 @@
#include "NOD_multi_function.hh"
#include "NOD_node_declaration.hh"
#include "BLI_lazy_threading.hh"
#include "BLI_map.hh"
#include "DNA_ID.h"
@ -559,6 +560,7 @@ class LazyFunctionForViewerNode : public LazyFunction {
class LazyFunctionForGroupNode : public LazyFunction {
private:
const bNode &group_node_;
bool has_many_nodes_ = false;
std::optional<GeometryNodesLazyFunctionLogger> lf_logger_;
std::optional<GeometryNodesLazyFunctionSideEffectProvider> lf_side_effect_provider_;
std::optional<lf::GraphExecutor> graph_executor_;
@ -577,6 +579,8 @@ class LazyFunctionForGroupNode : public LazyFunction {
bNodeTree *group_btree = reinterpret_cast<bNodeTree *>(group_node_.id);
BLI_assert(group_btree != nullptr);
has_many_nodes_ = lf_graph_info.num_inline_nodes_approximate > 1000;
Vector<const lf::OutputSocket *> graph_inputs;
for (const lf::OutputSocket *socket : lf_graph_info.mapping.group_input_sockets) {
if (socket != nullptr) {
@ -608,6 +612,12 @@ class LazyFunctionForGroupNode : public LazyFunction {
GeoNodesLFUserData *user_data = dynamic_cast<GeoNodesLFUserData *>(context.user_data);
BLI_assert(user_data != nullptr);
if (has_many_nodes_) {
/* If the called node group has many nodes, it's likely that executing it takes a while even
* if every individual node is very small. */
lazy_threading::send_hint();
}
/* The compute context changes when entering a node group. */
bke::NodeGroupComputeContext compute_context{user_data->compute_context, group_node_.name};
GeoNodesLFUserData group_user_data = *user_data;
@ -699,6 +709,7 @@ struct GeometryNodesLazyFunctionGraphBuilder {
this->add_default_inputs();
lf_graph_->update_node_indices();
lf_graph_info_->num_inline_nodes_approximate += lf_graph_->nodes().size();
}
private:
@ -915,6 +926,8 @@ struct GeometryNodesLazyFunctionGraphBuilder {
mapping_->bsockets_by_lf_socket_map.add(&lf_socket, &bsocket);
}
mapping_->group_node_map.add(&bnode, &lf_node);
lf_graph_info_->num_inline_nodes_approximate +=
group_lf_graph_info->num_inline_nodes_approximate;
}
void handle_geometry_node(const bNode &bnode)
@ -1358,4 +1371,52 @@ GeometryNodesLazyFunctionGraphInfo::~GeometryNodesLazyFunctionGraphInfo()
}
}
static void add_thread_id_debug_message(const GeometryNodesLazyFunctionGraphInfo &lf_graph_info,
const lf::FunctionNode &node,
const lf::Context &context)
{
static std::atomic<int> thread_id_source = 0;
static thread_local const int thread_id = thread_id_source.fetch_add(1);
static thread_local const std::string thread_id_str = "Thread: " + std::to_string(thread_id);
GeoNodesLFUserData *user_data = dynamic_cast<GeoNodesLFUserData *>(context.user_data);
BLI_assert(user_data != nullptr);
if (user_data->modifier_data->eval_log == nullptr) {
return;
}
geo_eval_log::GeoTreeLogger &tree_logger =
user_data->modifier_data->eval_log->get_local_tree_logger(*user_data->compute_context);
/* Find corresponding node based on the socket mapping. */
auto check_sockets = [&](const Span<const lf::Socket *> lf_sockets) {
for (const lf::Socket *lf_socket : lf_sockets) {
const Span<const bNodeSocket *> bsockets =
lf_graph_info.mapping.bsockets_by_lf_socket_map.lookup(lf_socket);
if (!bsockets.is_empty()) {
const bNodeSocket &bsocket = *bsockets[0];
const bNode &bnode = bsocket.owner_node();
tree_logger.debug_messages.append(
{tree_logger.allocator->copy_string(bnode.name), thread_id_str});
return true;
}
}
return false;
};
if (check_sockets(node.inputs().cast<const lf::Socket *>())) {
return;
}
check_sockets(node.outputs().cast<const lf::Socket *>());
}
void GeometryNodesLazyFunctionLogger::log_before_node_execute(const lf::FunctionNode &node,
const lf::Params &UNUSED(params),
const lf::Context &context) const
{
/* Enable this to see the threads that invoked a node. */
if constexpr (false) {
add_thread_id_debug_message(lf_graph_info_, node, context);
}
}
} // namespace blender::nodes
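
The node-count heuristic in this file can be summarized in isolation: each graph adds its own node count plus the already accumulated counts of the groups it uses, and a group node sends a hint when the total exceeds 1000. The sketch below restates that logic under simplifying assumptions. `count_sketch`, `GraphInfo`, `build_graph_info`, and `has_many_nodes` are hypothetical names, and only the field name `num_inline_nodes_approximate` and the threshold of 1000 are taken from the diff.

```cpp
#include <vector>

namespace count_sketch {

/* Stand-in for the per-graph info that caches the approximate count. */
struct GraphInfo {
  int num_inline_nodes_approximate = 0;
};

/* Accumulates the graph's own nodes and the cached counts of every used group.
 * A group that is used twice is counted twice, as if it were inlined. */
inline GraphInfo build_graph_info(const int own_node_count,
                                  const std::vector<const GraphInfo *> &used_groups)
{
  GraphInfo info;
  info.num_inline_nodes_approximate += own_node_count;
  for (const GraphInfo *group : used_groups) {
    info.num_inline_nodes_approximate += group->num_inline_nodes_approximate;
  }
  return info;
}

/* Mirrors the `> 1000` check used to decide whether to send a hint. */
inline bool has_many_nodes(const GraphInfo &info, const int threshold = 1000)
{
  return info.num_inline_nodes_approximate > threshold;
}

}  // namespace count_sketch
```

Because each graph stores its accumulated count, a parent graph only reads one integer per used group instead of walking nested groups again.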