Cycles: network render code updated for latest changes and improved

Network rendering now works to some extent, although viewport rendering is broken and any
kind of network error or connection failure will crash Blender.

* Experimental WITH_CYCLES_NETWORK cmake option
* Networked Device is shown as an option next to CPU and GPU Compute
* Various updates to work with the latest Cycles code
* Locks and thread safety for RPC calls and tiles
* Refactored pointer mapping code
* Fix error in CPU brand string retrieval code

This includes work by Doug Gale, Martijn Berger and Brecht Van Lommel.

Reviewers: brecht

Differential Revision: http://developer.blender.org/D36
This commit is contained in:
Martijn Berger 2013-12-07 02:29:53 +01:00 committed by Brecht Van Lommel
parent 5c07f62fab
commit 85a0c5d4e1
22 changed files with 490 additions and 152 deletions

View File

@ -737,6 +737,9 @@ if(UNIX AND NOT APPLE)
if(WITH_INTERNATIONAL)
list(APPEND __boost_packages locale)
endif()
if(WITH_CYCLES_NETWORK)
list(APPEND __boost_packages serialization)
endif()
find_package(Boost 1.48 COMPONENTS ${__boost_packages})
unset(__boost_packages)
if(Boost_USE_STATIC_LIBS AND WITH_BOOST_ICU)
@ -1700,6 +1703,9 @@ elseif(APPLE)
list(APPEND BOOST_LIBRARIES boost_locale-mt)
set(PLATFORM_LINKFLAGS "${PLATFORM_LINKFLAGS} -liconv") # boost_locale needs it !
endif()
if(WITH_CYCLES_NETWORK)
list(APPEND BOOST_LIBRARIES boost_serialization-mt)
endif()
set(BOOST_LIBPATH ${BOOST}/lib)
set(BOOST_DEFINITIONS)
endif()

View File

@ -30,6 +30,10 @@ set(LIBRARIES
${TIFF_LIBRARY}
)
if(UNIX)
list(APPEND LIBRARIES dl)
endif()
if(WIN32)
list(APPEND LIBRARIES ${PTHREADS_LIBRARIES})
endif()

View File

@ -35,6 +35,7 @@ int main(int argc, const char **argv)
string devicelist = "";
string devicename = "cpu";
bool list = false;
int threads = 0;
vector<DeviceType>& types = Device::available_types();
@ -51,6 +52,7 @@ int main(int argc, const char **argv)
ap.options ("Usage: cycles_server [options]",
"--device %s", &devicename, ("Devices to use: " + devicelist).c_str(),
"--list-devices", &list, "List information about all available devices",
"--threads %d", &threads, "Number of threads to use for CPU device",
NULL);
if(ap.parse(argc, argv) < 0) {
@ -84,11 +86,11 @@ int main(int argc, const char **argv)
}
}
TaskScheduler::init();
TaskScheduler::init(threads);
while(1) {
Stats stats;
Device *device = Device::create(device_info, stats);
Device *device = Device::create(device_info, stats, true);
printf("Cycles Server with device: %s\n", device->info.description.c_str());
device->server_run();
delete device;

View File

@ -30,7 +30,7 @@ typedef struct CCLDeviceInfo {
int value;
} CCLDeviceInfo;
CCLDeviceInfo *CCL_compute_device_list(int opencl);
CCLDeviceInfo *CCL_compute_device_list(int device_type);
/* create python module _cycles used by addon */

View File

@ -88,3 +88,7 @@ def available_devices():
def with_osl():
    """Return the ``with_osl`` flag exposed by the compiled ``_cycles`` module."""
    import _cycles as cycles_module
    return cycles_module.with_osl
def with_network():
    """Return the ``with_network`` flag exposed by the compiled ``_cycles`` module.

    The C module sets this to True only when it was compiled with WITH_NETWORK.
    """
    import _cycles as cycles_module
    return cycles_module.with_network

View File

@ -25,9 +25,15 @@ from bpy.props import (BoolProperty,
# enums
import _cycles
enum_devices = (
('CPU', "CPU", "Use CPU for rendering"),
('GPU', "GPU Compute", "Use GPU compute device for rendering, configured in user preferences"))
('CPU', "CPU", "Use CPU for rendering"),
('GPU', "GPU Compute", "Use GPU compute device for rendering, configured in user preferences"),
)
if _cycles.with_network:
enum_devices += (('NETWORK', "Networked Device", "Use networked device for rendering"),)
enum_feature_set = (
('SUPPORTED', "Supported", "Only use finished and supported features"),

View File

@ -1254,6 +1254,8 @@ def draw_device(self, context):
layout.prop(cscene, "device")
elif device_type == 'OPENCL':
layout.prop(cscene, "device")
elif device_type == 'NETWORK':
layout.prop(cscene, "device")
if engine.with_osl() and (cscene.device == 'CPU' or device_type == 'NONE'):
layout.prop(cscene, "shading_system")

View File

@ -482,12 +482,34 @@ void *CCL_python_module_init()
Py_INCREF(Py_False);
#endif
#ifdef WITH_NETWORK
PyModule_AddObject(mod, "with_network", Py_True);
Py_INCREF(Py_True);
#else /* WITH_NETWORK */
PyModule_AddObject(mod, "with_network", Py_False);
Py_INCREF(Py_False);
#endif /* WITH_NETWORK */
return (void*)mod;
}
CCLDeviceInfo *CCL_compute_device_list(int opencl)
CCLDeviceInfo *CCL_compute_device_list(int device_type)
{
ccl::DeviceType type = (opencl)? ccl::DEVICE_OPENCL: ccl::DEVICE_CUDA;
ccl::DeviceType type;
switch(device_type) {
case 0:
type = ccl::DEVICE_CUDA;
break;
case 1:
type = ccl::DEVICE_OPENCL;
break;
case 2:
type = ccl::DEVICE_NETWORK;
break;
default:
type = ccl::DEVICE_NONE;
break;
}
return ccl::compute_device_list(type);
}

View File

@ -26,7 +26,7 @@ set(SRC
device_task.cpp
)
if(WITH_NETWORK)
if(WITH_CYCLES_NETWORK)
list(APPEND SRC
device_network.cpp
)

View File

@ -127,7 +127,7 @@ Device *Device::create(DeviceInfo& info, Stats &stats, bool background)
switch(info.type) {
case DEVICE_CPU:
device = device_cpu_create(info, stats);
device = device_cpu_create(info, stats, background);
break;
#ifdef WITH_CUDA
case DEVICE_CUDA:
@ -159,9 +159,6 @@ Device *Device::create(DeviceInfo& info, Stats &stats, bool background)
return NULL;
}
if(device)
device->info = info;
return device;
}

View File

@ -71,7 +71,7 @@ public:
class Device {
protected:
Device(Stats &stats_) : stats(stats_) {}
Device(DeviceInfo& info_, Stats &stats_, bool background) : background(background), info(info_), stats(stats_) {}
bool background;
string error_msg;

View File

@ -45,11 +45,13 @@ class CPUDevice : public Device
public:
TaskPool task_pool;
KernelGlobals kernel_globals;
#ifdef WITH_OSL
OSLGlobals osl_globals;
#endif
CPUDevice(Stats &stats) : Device(stats)
CPUDevice(DeviceInfo& info, Stats &stats, bool background)
: Device(info, stats, background)
{
#ifdef WITH_OSL
kernel_globals.osl = &osl_globals;
@ -401,9 +403,9 @@ public:
}
};
Device *device_cpu_create(DeviceInfo& info, Stats &stats)
Device *device_cpu_create(DeviceInfo& info, Stats &stats, bool background)
{
return new CPUDevice(stats);
return new CPUDevice(info, stats, background);
}
void device_cpu_info(vector<DeviceInfo>& devices)

View File

@ -171,7 +171,8 @@ public:
cuda_assert(cuCtxSetCurrent(NULL));
}
CUDADevice(DeviceInfo& info, Stats &stats, bool background_) : Device(stats)
CUDADevice(DeviceInfo& info, Stats &stats, bool background_)
: Device(info, stats, background_)
{
first_error = true;
background = background_;

View File

@ -21,7 +21,7 @@ CCL_NAMESPACE_BEGIN
class Device;
Device *device_cpu_create(DeviceInfo& info, Stats &stats);
Device *device_cpu_create(DeviceInfo& info, Stats &stats, bool background);
Device *device_opencl_create(DeviceInfo& info, Stats &stats, bool background);
Device *device_cuda_create(DeviceInfo& info, Stats &stats, bool background);
Device *device_network_create(DeviceInfo& info, Stats &stats, const char *address);

View File

@ -45,25 +45,24 @@ public:
device_ptr unique_ptr;
MultiDevice(DeviceInfo& info, Stats &stats, bool background_)
: Device(stats), unique_ptr(1)
: Device(info, stats, background_), unique_ptr(1)
{
Device *device;
background = background_;
foreach(DeviceInfo& subinfo, info.multi_devices) {
device = Device::create(subinfo, stats, background);
devices.push_back(SubDevice(device));
}
#if 0 //def WITH_NETWORK
#ifdef WITH_NETWORK
/* try to add network devices */
ServerDiscovery discovery(true);
time_sleep(1.0);
list<string> servers = discovery.get_server_list();
vector<string> servers = discovery.get_server_list();
foreach(string& server, servers) {
device = device_network_create(info, server.c_str());
device = device_network_create(info, stats, server.c_str());
if(device)
devices.push_back(SubDevice(device));
}

View File

@ -20,9 +20,25 @@
#include "util_foreach.h"
#if defined(WITH_NETWORK)
CCL_NAMESPACE_BEGIN
#ifdef WITH_NETWORK
typedef map<device_ptr, device_ptr> PtrMap;
typedef vector<uint8_t> DataVector;
typedef map<device_ptr, DataVector> DataMap;
/* tile list */
typedef vector<RenderTile> TileList;
/* Search a list of tiles for the one matching the passed render tile.
 * Two tiles match when their x, y and start_sample are equal.
 * Returns an iterator to the first match, or tile_list.end() if there is none.
 * The searched-for tile is only read, so it is taken by const reference. */
static TileList::iterator tile_list_find(TileList& tile_list, const RenderTile& tile)
{
	for(TileList::iterator it = tile_list.begin(); it != tile_list.end(); ++it) {
		if(tile.x == it->x && tile.y == it->y && tile.start_sample == it->start_sample)
			return it;
	}
	return tile_list.end();
}
class NetworkDevice : public Device
{
@ -32,8 +48,10 @@ public:
device_ptr mem_counter;
DeviceTask the_task; /* todo: handle multiple tasks */
NetworkDevice(Stats &stats, const char *address)
: Device(stats), socket(io_service)
thread_mutex rpc_lock;
NetworkDevice(DeviceInfo& info, Stats &stats, const char *address)
: Device(info, stats, true), socket(io_service)
{
stringstream portstr;
portstr << SERVER_PORT;
@ -64,6 +82,8 @@ public:
void mem_alloc(device_memory& mem, MemoryType type)
{
thread_scoped_lock lock(rpc_lock);
mem.device_pointer = ++mem_counter;
RPCSend snd(socket, "mem_alloc");
@ -75,6 +95,8 @@ public:
void mem_copy_to(device_memory& mem)
{
thread_scoped_lock lock(rpc_lock);
RPCSend snd(socket, "mem_copy_to");
snd.add(mem);
@ -84,6 +106,10 @@ public:
void mem_copy_from(device_memory& mem, int y, int w, int h, int elem)
{
thread_scoped_lock lock(rpc_lock);
size_t data_size = mem.memory_size();
RPCSend snd(socket, "mem_copy_from");
snd.add(mem);
@ -94,11 +120,13 @@ public:
snd.write();
RPCReceive rcv(socket);
rcv.read_buffer((void*)mem.data_pointer, mem.memory_size());
rcv.read_buffer((void*)mem.data_pointer, data_size);
}
void mem_zero(device_memory& mem)
{
thread_scoped_lock lock(rpc_lock);
RPCSend snd(socket, "mem_zero");
snd.add(mem);
@ -108,6 +136,8 @@ public:
void mem_free(device_memory& mem)
{
if(mem.device_pointer) {
thread_scoped_lock lock(rpc_lock);
RPCSend snd(socket, "mem_free");
snd.add(mem);
@ -119,6 +149,8 @@ public:
void const_copy_to(const char *name, void *host, size_t size)
{
thread_scoped_lock lock(rpc_lock);
RPCSend snd(socket, "const_copy_to");
string name_string(name);
@ -131,6 +163,8 @@ public:
void tex_alloc(const char *name, device_memory& mem, bool interpolation, bool periodic)
{
thread_scoped_lock lock(rpc_lock);
mem.device_pointer = ++mem_counter;
RPCSend snd(socket, "tex_alloc");
@ -148,6 +182,8 @@ public:
void tex_free(device_memory& mem)
{
if(mem.device_pointer) {
thread_scoped_lock lock(rpc_lock);
RPCSend snd(socket, "tex_free");
snd.add(mem);
@ -157,8 +193,25 @@ public:
}
}
bool load_kernels(bool experimental)
{
	/* Ask the remote device to load its kernels and wait for its boolean reply.
	 * rpc_lock is held for the whole request/reply round trip, which serializes
	 * it against the other methods of this class that take rpc_lock. */
	thread_scoped_lock lock(rpc_lock);

	RPCSend snd(socket, "load_kernels");
	snd.add(experimental);
	snd.write();

	/* Default to failure so that a reply which does not assign the value
	 * never yields an uninitialized bool. */
	bool result = false;
	RPCReceive rcv(socket);
	rcv.read(result);

	return result;
}
void task_add(DeviceTask& task)
{
thread_scoped_lock lock(rpc_lock);
the_task = task;
RPCSend snd(socket, "task_add");
@ -168,55 +221,73 @@ public:
void task_wait()
{
thread_scoped_lock lock(rpc_lock);
RPCSend snd(socket, "task_wait");
snd.write();
list<RenderTile> the_tiles;
lock.unlock();
TileList the_tiles;
/* todo: run this threaded for connecting to multiple clients */
for(;;) {
RPCReceive rcv(socket);
RenderTile tile;
lock.lock();
RPCReceive rcv(socket);
if(rcv.name == "acquire_tile") {
lock.unlock();
/* todo: watch out for recursive calls! */
if(the_task.acquire_tile(this, tile)) { /* write return as bool */
the_tiles.push_back(tile);
lock.lock();
RPCSend snd(socket, "acquire_tile");
snd.add(tile);
snd.write();
lock.unlock();
}
else {
lock.lock();
RPCSend snd(socket, "acquire_tile_none");
snd.write();
lock.unlock();
}
}
else if(rcv.name == "release_tile") {
rcv.read(tile);
lock.unlock();
for(list<RenderTile>::iterator it = the_tiles.begin(); it != the_tiles.end(); it++) {
if(tile.x == it->x && tile.y == it->y && tile.start_sample == it->start_sample) {
tile.buffers = it->buffers;
the_tiles.erase(it);
break;
}
TileList::iterator it = tile_list_find(the_tiles, tile);
if (it != the_tiles.end()) {
tile.buffers = it->buffers;
the_tiles.erase(it);
}
assert(tile.buffers != NULL);
the_task.release_tile(tile);
lock.lock();
RPCSend snd(socket, "release_tile");
snd.write();
lock.unlock();
}
else if(rcv.name == "task_wait_done")
else if(rcv.name == "task_wait_done") {
lock.unlock();
break;
}
else
lock.unlock();
}
}
void task_cancel()
{
thread_scoped_lock lock(rpc_lock);
RPCSend snd(socket, "task_cancel");
snd.write();
}
@ -224,7 +295,7 @@ public:
Device *device_network_create(DeviceInfo& info, Stats &stats, const char *address)
{
return new NetworkDevice(stats, address);
return new NetworkDevice(info, stats, address);
}
void device_network_info(vector<DeviceInfo>& devices)
@ -243,8 +314,10 @@ void device_network_info(vector<DeviceInfo>& devices)
class DeviceServer {
public:
thread_mutex rpc_lock;
DeviceServer(Device *device_, tcp::socket& socket_)
: device(device_), socket(socket_)
: device(device_), socket(socket_), stop(false), blocked_waiting(false)
{
}
@ -252,56 +325,151 @@ public:
{
/* receive remote function calls */
for(;;) {
RPCReceive rcv(socket);
listen_step();
if(rcv.name == "stop")
if(stop)
break;
process(rcv);
}
}
protected:
void process(RPCReceive& rcv)
void listen_step()
{
// fprintf(stderr, "receive process %s\n", rcv.name.c_str());
thread_scoped_lock lock(rpc_lock);
RPCReceive rcv(socket);
if(rcv.name == "stop")
stop = true;
else
process(rcv, lock);
}
/* Create a host-side byte buffer of data_size bytes for a device buffer,
 * store it in mem_data keyed by the client's pointer, and return a reference
 * to it. The key is expected to be new (asserted in debug builds). */
DataVector &data_vector_insert(device_ptr client_pointer, size_t data_size)
{
	DataMap::value_type entry(client_pointer, DataVector());
	pair<DataMap::iterator, bool> inserted = mem_data.insert(entry);

	/* each client pointer must only be registered once */
	assert(inserted.second);

	DataVector &buffer = inserted.first->second;
	buffer.resize(data_size);
	return buffer;
}
/* Return the host-side buffer registered in mem_data for a client pointer.
 * The entry must already exist (e.g. created by data_vector_insert);
 * this is only checked with assert. */
DataVector &data_vector_find(device_ptr client_pointer)
{
	DataMap::iterator found = mem_data.find(client_pointer);
	assert(found != mem_data.end());
	DataVector &buffer = found->second;
	return buffer;
}
/* Record the two-way association between a client-chosen pointer and the
 * pointer returned by the real device:
 *   ptr_map:  client pointer      -> real device pointer
 *   ptr_imap: real device pointer -> client pointer
 * Both keys are expected to be new (asserted in debug builds). */
void pointer_mapping_insert(device_ptr client_pointer, device_ptr real_pointer)
{
	const bool forward_is_new =
		ptr_map.insert(PtrMap::value_type(client_pointer, real_pointer)).second;
	assert(forward_is_new);
	(void)forward_is_new;

	const bool reverse_is_new =
		ptr_imap.insert(PtrMap::value_type(real_pointer, client_pointer)).second;
	assert(reverse_is_new);
	(void)reverse_is_new;
}
/* Translate a pointer known to the client into the real device pointer.
 * The client pointer must already be mapped (asserted in debug builds). */
device_ptr device_ptr_from_client_pointer(device_ptr client_pointer)
{
	PtrMap::iterator mapping = ptr_map.find(client_pointer);
	assert(mapping != ptr_map.end());
	const device_ptr real_pointer = mapping->second;
	return real_pointer;
}
/* Translate a client pointer into the real device pointer and forget it.
 * Removes the forward mapping (ptr_map), the reverse mapping (ptr_imap) and
 * the host-side data buffer (mem_data), then returns the real device pointer.
 * The reverse mapping and the data buffer are erased by key rather than via
 * find() + erase(iterator). If an entry is unexpectedly missing in a release
 * build (asserts compiled out), this is then a no-op instead of erase(end()),
 * which is undefined behaviour. Debug builds still assert on every entry. */
device_ptr device_ptr_from_client_pointer_erase(device_ptr client_pointer)
{
	PtrMap::iterator i = ptr_map.find(client_pointer);
	assert(i != ptr_map.end());
	device_ptr result = i->second;

	/* erase the mapping */
	ptr_map.erase(i);

	/* erase the reverse mapping */
	size_t erased_reverse = ptr_imap.erase(result);
	assert(erased_reverse == 1);
	(void)erased_reverse;

	/* erase the data vector */
	size_t erased_data = mem_data.erase(client_pointer);
	assert(erased_data == 1);
	(void)erased_data;

	return result;
}
/* note that the lock must be already acquired upon entry.
* This is necessary because the caller often peeks at
* the header and delegates control to here when it doesn't
* specifically handle the current RPC.
* The lock must be unlocked before returning */
void process(RPCReceive& rcv, thread_scoped_lock &lock)
{
if(rcv.name == "mem_alloc") {
MemoryType type;
network_device_memory mem;
device_ptr remote_pointer;
device_ptr client_pointer;
rcv.read(mem);
rcv.read(type);
/* todo: CPU needs mem.data_pointer */
lock.unlock();
remote_pointer = mem.device_pointer;
client_pointer = mem.device_pointer;
mem_data[remote_pointer] = vector<uint8_t>();
mem_data[remote_pointer].resize(mem.memory_size());
if(mem.memory_size())
mem.data_pointer = (device_ptr)&(mem_data[remote_pointer][0]);
/* create a memory buffer for the device buffer */
size_t data_size = mem.memory_size();
DataVector &data_v = data_vector_insert(client_pointer, data_size);
if(data_size)
mem.data_pointer = (device_ptr)&(data_v[0]);
else
mem.data_pointer = 0;
/* perform the allocation on the actual device */
device->mem_alloc(mem, type);
ptr_map[remote_pointer] = mem.device_pointer;
ptr_imap[mem.device_pointer] = remote_pointer;
/* store a mapping to/from client_pointer and real device pointer */
pointer_mapping_insert(client_pointer, mem.device_pointer);
}
else if(rcv.name == "mem_copy_to") {
network_device_memory mem;
rcv.read(mem);
lock.unlock();
device_ptr remote_pointer = mem.device_pointer;
mem.data_pointer = (device_ptr)&(mem_data[remote_pointer][0]);
device_ptr client_pointer = mem.device_pointer;
rcv.read_buffer((uint8_t*)mem.data_pointer, mem.memory_size());
DataVector &data_v = data_vector_find(client_pointer);
mem.device_pointer = ptr_map[remote_pointer];
size_t data_size = mem.memory_size();
/* get pointer to memory buffer for device buffer */
mem.data_pointer = (device_ptr)&data_v[0];
/* copy data from network into memory buffer */
rcv.read_buffer((uint8_t*)mem.data_pointer, data_size);
/* translate the client pointer to a real device pointer */
mem.device_pointer = device_ptr_from_client_pointer(client_pointer);
/* copy the data from the memory buffer to the device buffer */
device->mem_copy_to(mem);
}
else if(rcv.name == "mem_copy_from") {
@ -314,37 +482,47 @@ protected:
rcv.read(h);
rcv.read(elem);
device_ptr remote_pointer = mem.device_pointer;
mem.device_pointer = ptr_map[remote_pointer];
mem.data_pointer = (device_ptr)&(mem_data[remote_pointer][0]);
device_ptr client_pointer = mem.device_pointer;
mem.device_pointer = device_ptr_from_client_pointer(client_pointer);
DataVector &data_v = data_vector_find(client_pointer);
mem.data_pointer = (device_ptr)&(data_v[0]);
device->mem_copy_from(mem, y, w, h, elem);
size_t data_size = mem.memory_size();
RPCSend snd(socket);
snd.write();
snd.write_buffer((uint8_t*)mem.data_pointer, mem.memory_size());
snd.write_buffer((uint8_t*)mem.data_pointer, data_size);
lock.unlock();
}
else if(rcv.name == "mem_zero") {
network_device_memory mem;
rcv.read(mem);
device_ptr remote_pointer = mem.device_pointer;
mem.device_pointer = ptr_map[mem.device_pointer];
mem.data_pointer = (device_ptr)&(mem_data[remote_pointer][0]);
lock.unlock();
device_ptr client_pointer = mem.device_pointer;
mem.device_pointer = device_ptr_from_client_pointer(client_pointer);
DataVector &data_v = data_vector_find(client_pointer);
mem.data_pointer = (device_ptr)&(data_v[0]);
device->mem_zero(mem);
}
else if(rcv.name == "mem_free") {
network_device_memory mem;
device_ptr remote_pointer;
device_ptr client_pointer;
rcv.read(mem);
lock.unlock();
remote_pointer = mem.device_pointer;
mem.device_pointer = ptr_map[mem.device_pointer];
ptr_map.erase(remote_pointer);
ptr_imap.erase(mem.device_pointer);
mem_data.erase(remote_pointer);
client_pointer = mem.device_pointer;
mem.device_pointer = device_ptr_from_client_pointer_erase(client_pointer);
device->mem_free(mem);
}
@ -357,6 +535,7 @@ protected:
vector<char> host_vector(size);
rcv.read_buffer(&host_vector[0], size);
lock.unlock();
device->const_copy_to(name_string.c_str(), &host_vector[0], size);
}
@ -365,53 +544,76 @@ protected:
string name;
bool interpolation;
bool periodic;
device_ptr remote_pointer;
device_ptr client_pointer;
rcv.read(name);
rcv.read(mem);
rcv.read(interpolation);
rcv.read(periodic);
lock.unlock();
remote_pointer = mem.device_pointer;
client_pointer = mem.device_pointer;
mem_data[remote_pointer] = vector<uint8_t>();
mem_data[remote_pointer].resize(mem.memory_size());
if(mem.memory_size())
mem.data_pointer = (device_ptr)&(mem_data[remote_pointer][0]);
size_t data_size = mem.memory_size();
DataVector &data_v = data_vector_insert(client_pointer, data_size);
if(data_size)
mem.data_pointer = (device_ptr)&(data_v[0]);
else
mem.data_pointer = 0;
rcv.read_buffer((uint8_t*)mem.data_pointer, mem.memory_size());
rcv.read_buffer((uint8_t*)mem.data_pointer, data_size);
device->tex_alloc(name.c_str(), mem, interpolation, periodic);
ptr_map[remote_pointer] = mem.device_pointer;
ptr_imap[mem.device_pointer] = remote_pointer;
pointer_mapping_insert(client_pointer, mem.device_pointer);
}
else if(rcv.name == "tex_free") {
network_device_memory mem;
device_ptr remote_pointer;
device_ptr client_pointer;
rcv.read(mem);
lock.unlock();
remote_pointer = mem.device_pointer;
mem.device_pointer = ptr_map[mem.device_pointer];
ptr_map.erase(remote_pointer);
ptr_map.erase(mem.device_pointer);
mem_data.erase(remote_pointer);
client_pointer = mem.device_pointer;
mem.device_pointer = device_ptr_from_client_pointer_erase(client_pointer);
device->tex_free(mem);
}
else if(rcv.name == "load_kernels") {
bool experimental;
rcv.read(experimental);
bool result;
result = device->load_kernels(experimental);
RPCSend snd(socket);
snd.add(result);
snd.write();
lock.unlock();
}
else if(rcv.name == "task_add") {
DeviceTask task;
rcv.read(task);
lock.unlock();
if(task.buffer)
task.buffer = device_ptr_from_client_pointer(task.buffer);
if(task.rgba_half)
task.rgba_half = device_ptr_from_client_pointer(task.rgba_half);
if(task.rgba_byte)
task.rgba_byte = device_ptr_from_client_pointer(task.rgba_byte);
if(task.shader_input)
task.shader_input = device_ptr_from_client_pointer(task.shader_input);
if(task.shader_output)
task.shader_output = device_ptr_from_client_pointer(task.shader_output);
if(task.buffer) task.buffer = ptr_map[task.buffer];
if(task.rgba_byte) task.rgba_byte = ptr_map[task.rgba_byte];
if(task.rgba_half) task.rgba_half = ptr_map[task.rgba_half];
if(task.shader_input) task.shader_input = ptr_map[task.shader_input];
if(task.shader_output) task.shader_output = ptr_map[task.shader_output];
task.acquire_tile = function_bind(&DeviceServer::task_acquire_tile, this, _1, _2);
task.release_tile = function_bind(&DeviceServer::task_release_tile, this, _1);
@ -422,14 +624,44 @@ protected:
device->task_add(task);
}
else if(rcv.name == "task_wait") {
device->task_wait();
lock.unlock();
blocked_waiting = true;
device->task_wait();
blocked_waiting = false;
lock.lock();
RPCSend snd(socket, "task_wait_done");
snd.write();
lock.unlock();
}
else if(rcv.name == "task_cancel") {
lock.unlock();
device->task_cancel();
}
else if(rcv.name == "acquire_tile") {
AcquireEntry entry;
entry.name = rcv.name;
rcv.read(entry.tile);
acquire_queue.push_back(entry);
lock.unlock();
}
else if(rcv.name == "acquire_tile_none") {
AcquireEntry entry;
entry.name = rcv.name;
acquire_queue.push_back(entry);
lock.unlock();
}
else if(rcv.name == "release_tile") {
AcquireEntry entry;
entry.name = rcv.name;
acquire_queue.push_back(entry);
lock.unlock();
}
else {
cout << "Error: unexpected RPC receive call \"" + rcv.name + "\"\n";
lock.unlock();
}
}
bool task_acquire_tile(Device *device, RenderTile& tile)
@ -441,23 +673,34 @@ protected:
RPCSend snd(socket, "acquire_tile");
snd.write();
while(1) {
RPCReceive rcv(socket);
do {
if(blocked_waiting)
listen_step();
if(rcv.name == "acquire_tile") {
rcv.read(tile);
/* todo: avoid busy wait loop */
thread_scoped_lock lock(rpc_lock);
if(tile.buffer) tile.buffer = ptr_map[tile.buffer];
if(tile.rng_state) tile.rng_state = ptr_map[tile.rng_state];
if(!acquire_queue.empty()) {
AcquireEntry entry = acquire_queue.front();
acquire_queue.pop_front();
result = true;
break;
if(entry.name == "acquire_tile") {
tile = entry.tile;
if(tile.buffer) tile.buffer = ptr_map[tile.buffer];
if(tile.rng_state) tile.rng_state = ptr_map[tile.rng_state];
result = true;
break;
}
else if(entry.name == "acquire_tile_none") {
break;
}
else {
cout << "Error: unexpected acquire RPC receive call \"" + entry.name + "\"\n";
}
}
else if(rcv.name == "acquire_tile_none")
break;
else
process(rcv);
}
} while(acquire_queue.empty() && !stop);
return result;
}
@ -479,18 +722,34 @@ protected:
if(tile.buffer) tile.buffer = ptr_imap[tile.buffer];
if(tile.rng_state) tile.rng_state = ptr_imap[tile.rng_state];
RPCSend snd(socket, "release_tile");
snd.add(tile);
snd.write();
while(1) {
RPCReceive rcv(socket);
if(rcv.name == "release_tile")
break;
else
process(rcv);
{
thread_scoped_lock lock(rpc_lock);
RPCSend snd(socket, "release_tile");
snd.add(tile);
snd.write();
lock.unlock();
}
do {
if(blocked_waiting)
listen_step();
/* todo: avoid busy wait loop */
thread_scoped_lock lock(rpc_lock);
if(!acquire_queue.empty()) {
AcquireEntry entry = acquire_queue.front();
acquire_queue.pop_front();
if(entry.name == "release_tile") {
lock.unlock();
break;
}
else {
cout << "Error: unexpected release RPC receive call \"" + entry.name + "\"\n";
}
}
} while(acquire_queue.empty() && !stop);
}
bool task_get_cancel()
@ -503,11 +762,20 @@ protected:
tcp::socket& socket;
/* mapping of remote to local pointer */
map<device_ptr, device_ptr> ptr_map;
map<device_ptr, device_ptr> ptr_imap;
map<device_ptr, vector<uint8_t> > mem_data;
PtrMap ptr_map;
PtrMap ptr_imap;
DataMap mem_data;
/* A tile-related reply from the client ("acquire_tile", "acquire_tile_none"
 * or "release_tile"). process() pushes it onto acquire_queue, and the tile
 * acquire/release callbacks pop it from there. */
struct AcquireEntry {
string name; /* RPC name of the reply */
RenderTile tile; /* only read from the network for "acquire_tile" replies */
};
thread_mutex acquire_mutex;
list<AcquireEntry> acquire_queue;
bool stop;
bool blocked_waiting;
/* todo: free memory and device (osl) on network error */
};
@ -540,7 +808,8 @@ void Device::server_run()
}
}
#endif
CCL_NAMESPACE_END
#endif

View File

@ -28,6 +28,8 @@
#include <boost/thread.hpp>
#include <iostream>
#include <sstream>
#include <deque>
#include "buffers.h"
@ -70,12 +72,12 @@ public:
: name(name_), socket(socket_), archive(archive_stream), sent(false)
{
archive & name_;
fprintf(stderr, "rpc send %s\n", name.c_str());
}
~RPCSend()
{
	/* Report messages that were constructed but never written to the socket. */
	if(sent)
		return;

	fprintf(stderr, "Error: RPC %s not sent\n", name.c_str());
}
void add(const device_memory& mem)
@ -98,13 +100,14 @@ public:
archive & task.offset & task.stride;
archive & task.shader_input & task.shader_output & task.shader_eval_type;
archive & task.shader_x & task.shader_w;
archive & task.need_finish_queue;
}
void add(const RenderTile& tile)
{
archive & tile.x & tile.y & tile.w & tile.h;
archive & tile.start_sample & tile.num_samples & tile.sample;
archive & tile.offset & tile.stride;
archive & tile.resolution & tile.offset & tile.stride;
archive & tile.buffer & tile.rng_state;
}
@ -178,6 +181,7 @@ public:
size_t data_size;
if((header_stream >> hex >> data_size)) {
vector<char> data(data_size);
size_t len = boost::asio::read(socket, boost::asio::buffer(data));
@ -191,15 +195,19 @@ public:
archive = new boost::archive::text_iarchive(*archive_stream);
*archive & name;
fprintf(stderr, "rpc receive %s\n", name.c_str());
}
else {
cout << "Network receive error: data size doesn't match header\n";
}
else
cout << "Network receive error: data size doens't match header\n";
}
else
else {
cout << "Network receive error: can't decode data size from header\n";
}
}
else
else {
cout << "Network receive error: invalid header size\n";
}
}
~RPCReceive()
@ -235,9 +243,10 @@ public:
*archive & type & task.x & task.y & task.w & task.h;
*archive & task.rgba_byte & task.rgba_half & task.buffer & task.sample & task.num_samples;
*archive & task.resolution & task.offset & task.stride;
*archive & task.offset & task.stride;
*archive & task.shader_input & task.shader_output & task.shader_eval_type;
*archive & task.shader_x & task.shader_w;
*archive & task.need_finish_queue;
task.type = (DeviceTask::Type)type;
}
@ -247,7 +256,7 @@ public:
*archive & tile.x & tile.y & tile.w & tile.h;
*archive & tile.start_sample & tile.num_samples & tile.sample;
*archive & tile.resolution & tile.offset & tile.stride;
*archive & tile.buffer & tile.rng_state & tile.rgba_byte & tile.rgba_half;
*archive & tile.buffer & tile.rng_state;
tile.buffers = NULL;
}
@ -303,12 +312,12 @@ public:
delete work;
}
list<string> get_server_list()
vector<string> get_server_list()
{
list<string> result;
vector<string> result;
mutex.lock();
result = servers;
result = vector<string>(servers.begin(), servers.end());
mutex.unlock();
return result;
@ -333,11 +342,8 @@ private:
mutex.lock();
/* add address if it's not already in the list */
bool found = false;
foreach(string& server, servers)
if(server == address)
found = true;
bool found = std::find(servers.begin(), servers.end(),
address) != servers.end();
if(!found)
servers.push_back(address);
@ -393,10 +399,21 @@ private:
/* buffer and endpoint for receiving messages */
char receive_buffer[256];
boost::asio::ip::udp::endpoint receive_endpoint;
// Descriptive information about a render server: Cycles version, OS, device
// count, status, host name, group name and IP address.
// NOTE(review): this struct is not referenced anywhere in the visible code;
// presumably intended for richer server discovery replies -- confirm before use.
struct ServerInfo {
string cycles_version;
string os;
int device_count;
string status;
string host_name;
string group_name;
string host_addr; // presumably the server's IP address -- TODO confirm
};
/* collection of server addresses in list */
bool collect_servers;
list<string> servers;
vector<string> servers;
};
CCL_NAMESPACE_END

View File

@ -423,9 +423,8 @@ public:
}
OpenCLDevice(DeviceInfo& info, Stats &stats, bool background_)
: Device(stats)
: Device(info, stats, background_)
{
background = background_;
cpPlatform = NULL;
cdDevice = NULL;
cxContext = NULL;

View File

@ -95,7 +95,7 @@ Session::~Session()
wait();
}
if(display && params.output_path != "") {
if(display && !params.output_path.empty()) {
tonemap();
progress.set_status("Writing Image", params.output_path);
@ -242,7 +242,7 @@ void Session::run_gpu()
/* update scene */
update_scene();
if(device->error_message() != "")
if(!device->error_message().empty())
progress.set_cancel(device->error_message());
if(progress.get_cancel())
@ -263,7 +263,7 @@ void Session::run_gpu()
device->task_wait();
if(device->error_message() != "")
if(!device->error_message().empty())
progress.set_cancel(device->error_message());
/* update status and timing */
@ -283,7 +283,7 @@ void Session::run_gpu()
}
}
if(device->error_message() != "")
if(!device->error_message().empty())
progress.set_cancel(device->error_message());
tiles_written = update_progressive_refine(progress.get_cancel());
@ -531,7 +531,7 @@ void Session::run_cpu()
/* update scene */
update_scene();
if(device->error_message() != "")
if(!device->error_message().empty())
progress.set_cancel(device->error_message());
if(progress.get_cancel())
@ -549,7 +549,7 @@ void Session::run_cpu()
if(!params.background)
need_tonemap = true;
if(device->error_message() != "")
if(!device->error_message().empty())
progress.set_cancel(device->error_message());
}
@ -571,7 +571,7 @@ void Session::run_cpu()
tonemap();
}
if(device->error_message() != "")
if(!device->error_message().empty())
progress.set_cancel(device->error_message());
tiles_written = update_progressive_refine(progress.get_cancel());
@ -592,7 +592,7 @@ void Session::run()
if(!device->load_kernels(params.experimental)) {
string message = device->error_message();
if(message == "")
if(message.empty())
message = "Failed loading render kernel, see console for errors";
progress.set_status("Error", message);
@ -796,7 +796,7 @@ void Session::update_status_time(bool show_pause, bool show_done)
}
else {
status = substatus;
substatus = "";
substatus.clear();
}
progress.set_status(status, substatus);

View File

@ -96,5 +96,14 @@ bool string_endswith(const string& s, const char *end)
return strncmp(s.c_str() + s.size() - len, end, len) == 0;
}
/* Return a copy of s with leading and trailing space characters (' ' only)
 * removed. An empty or all-space input yields an empty string. */
string string_strip(const string& s)
{
	const size_t first = s.find_first_not_of(' ');
	if(first == string::npos)
		return string();

	const size_t last = s.find_last_not_of(' ');
	return s.substr(first, last - first + 1);
}
CCL_NAMESPACE_END

View File

@ -41,6 +41,7 @@ string string_printf(const char *format, ...) PRINTF_ATTRIBUTE;
bool string_iequals(const string& a, const string& b);
void string_split(vector<string>& tokens, const string& str, const string& separators = "\t ");
bool string_endswith(const string& s, const char *end);
string string_strip(const string& s);
CCL_NAMESPACE_END

View File

@ -101,9 +101,7 @@ string system_cpu_brand_string()
replace_string(brand, "(TM)", "");
replace_string(brand, "(R)", "");
size_t i;
if((i = brand.find(" ")) != string::npos)
brand = brand.substr(0, i);
brand = string_strip(brand);
return brand;
}