diff --git a/distrib/distrib.cc b/distrib/distrib.cc index b04570f..9040fab 100644 --- a/distrib/distrib.cc +++ b/distrib/distrib.cc @@ -8,18 +8,24 @@ #include #include /* gethostbyname() */ #include /* memset() */ +#include /* fcntl(), F_GETFD, F_SETFD, FD_CLOEXEC */ #include #include #include using namespace std; +#define MSG_WANT_DATA 1 +#define MSG_SEND_DATA 2 + distrib::distrib() { pthread_cond_init(&m_listen_cond, NULL); pthread_mutex_init(&m_listen_mutex, NULL); + pthread_mutex_init(&m_task_mutex, NULL); m_num_clients = 0; m_data = NULL; m_server = true; + m_next_task = 0; } int distrib::readHostFile(const char * filename) @@ -103,6 +109,24 @@ int distrib::clientConnect(const string & host, return 0; } +void connection_thread(distrib::connection_thread_arg_t * arg) +{ + distrib * the_distrib = arg->the_distrib; + int client_socket = arg->client_socket; + delete arg; + + /* loop listening for messages from the client */ + for (;;) + { + int msg_type; + int nread = read(client_socket, &msg_type, sizeof(msg_type)); + if (nread < 0) + break; + cout << "Read message type " << msg_type << "!" << endl; + } + close(client_socket); +} + void distrib_server(distrib * the_distrib) { char hostname[1000]; @@ -116,6 +140,10 @@ void distrib_server(distrib * the_distrib) exit(39); } + int flags = fcntl(listen_socket, F_GETFD); + flags |= FD_CLOEXEC; + fcntl(listen_socket, F_SETFD, flags); + if ( listen(listen_socket, 5) == -1 ) { cerr << "Error " << errno << " when trying to listen!" << endl; @@ -146,6 +174,29 @@ void distrib_server(distrib * the_distrib) pthread_mutex_lock(&the_distrib->m_listen_mutex); pthread_cond_signal(&the_distrib->m_listen_cond); pthread_mutex_unlock(&the_distrib->m_listen_mutex); + + for (;;) + { + struct sockaddr_in client_addr; + socklen_t client_addr_len = sizeof(client_addr); + int client_socket = accept(listen_socket, + (struct sockaddr *) &client_addr, + &client_addr_len); + + if (client_socket < 0) + break; + + distrib::connection_thread_arg_t * arg = + new distrib::connection_thread_arg_t; + arg->the_distrib = the_distrib; + arg->client_socket = client_socket; + + pthread_t client_thread; + pthread_create(&client_thread, + NULL, + (void * (*)(void *)) &connection_thread, + arg); + } } int distrib::startServer() @@ -201,17 +252,55 @@ int distrib::startClient(const char * server, int port) int distrib::send_data(int task, unsigned char * data, int num_bytes) { - /* TODO: finish */ + if (m_server) + { + if (m_data != NULL) + { + int num_to_copy = num_bytes; + if (3 * task * UNIT_TASK_SIZE + num_to_copy > m_data_size) + num_to_copy = m_data_size - 3 * task * UNIT_TASK_SIZE; + if (num_to_copy > 0) + { + memcpy(m_data + 3 * task * UNIT_TASK_SIZE, data, num_to_copy); + } + } + } + else + { + int msg_header = MSG_SEND_DATA; /* send data */ + if ( write(m_client_socket, &msg_header, sizeof(msg_header)) < 0 + || write(m_client_socket, data, num_bytes) < 0) + { + return -1; + } + } + return 0; } int distrib::getTask() { - /* TODO: finish */ if (m_server) { + pthread_mutex_lock(&m_task_mutex); + int task = -1; + if (m_next_task < m_num_tasks) + { + task = m_next_task; + m_next_task++; + } + pthread_mutex_unlock(&m_task_mutex); + return task; } else { + int msg_header = MSG_WANT_DATA; + if (write(m_client_socket, &msg_header, sizeof(msg_header)) < 0) + return -1; + + /* wait for a message back */ + int task = 0; + if (read(m_client_socket, &task, sizeof(task)) < 0) + return -1; + return task; } - return -1; } diff --git a/distrib/distrib.h b/distrib/distrib.h index 0635394..dfa4820 100644 --- a/distrib/distrib.h +++ b/distrib/distrib.h @@ -6,6 +6,8 @@ #include #include +#define UNIT_TASK_SIZE 50 + class distrib { public: @@ -15,11 +17,23 @@ class distrib int startClient(const char * server, int port); int startClients(const std::vector & client_options); int getNumClients() { return m_num_clients; } - void set_data(unsigned char * data) { m_data = data; } + void set_data(unsigned char * data, int size) + { + m_data = data; + m_data_size = size; + } + void set_num_tasks(int num_tasks) { m_num_tasks = num_tasks; } int getTask(); int send_data(int task, unsigned char * data, int num_bytes); + typedef struct + { + distrib * the_distrib; + int client_socket; + } connection_thread_arg_t; + friend void distrib_server(distrib * the_distrib); + friend void connection_thread(connection_thread_arg_t * arg); protected: int clientConnect(const std::string & host, @@ -29,14 +43,17 @@ class distrib std::vector m_children; std::string m_servername; int m_serverport; - int m_listen_socket; int m_client_socket; pthread_t m_server_thread; pthread_cond_t m_listen_cond; pthread_mutex_t m_listen_mutex; int m_num_clients; unsigned char * m_data; + int m_data_size; + int m_num_tasks; + int m_next_task; bool m_server; + pthread_mutex_t m_task_mutex; }; #endif diff --git a/main/Scene.cc b/main/Scene.cc index 2b0c83d..58c2d69 100644 --- a/main/Scene.cc +++ b/main/Scene.cc @@ -104,6 +104,12 @@ Scene::Scene(const map & options, m_distrib.readHostFile(m_hosts_file.c_str()); m_distrib.startServer(); m_distrib.startClients(m_client_options); + + m_data = new unsigned char[m_width * m_height * 3]; /* 24bpp */ + int num_tasks = (m_width * m_height + (UNIT_TASK_SIZE - 1)) + / UNIT_TASK_SIZE; + m_distrib.set_num_tasks(num_tasks); + m_distrib.set_data(m_data, 3 * m_width * m_height); } else { @@ -140,8 +146,6 @@ void Scene::render() cout << "----------------------------------------" << endl; } - m_data = new unsigned char[m_width * m_height * 3]; /* 24bpp */ - if (m_distrib.getNumClients() < 1) { for (int i = 0; i < m_height; i++) @@ -154,9 +158,9 @@ void Scene::render() } else { - m_distrib.set_data(m_data); /* work on tasks in this thread until there are no more */ taskLoop(); + /* TODO: wait until all data has arrived */ } if (m_verbose) @@ -176,25 +180,29 @@ void Scene::render() void Scene::taskLoop() { + unsigned char data[3 * UNIT_TASK_SIZE]; for (;;) { int task_id = m_distrib.getTask(); if (task_id < 0) break; - unsigned char data[3 * UNIT_TASK_SIZE]; int i = task_id / m_width; int j = task_id % m_width; for (int t = 0; t < UNIT_TASK_SIZE; t++) { renderPixel(j, i, &data[3 * t]); j++; - if (j == m_width) + if (j >= m_width) { j = 0; i++; + if (i >= m_height) + break; } } - m_distrib.send_data(task_id, data, 3 * UNIT_TASK_SIZE); + int ret = m_distrib.send_data(task_id, data, 3 * UNIT_TASK_SIZE); + if (ret != 0) + break; } } diff --git a/main/Scene.h b/main/Scene.h index 2f2daf0..dd1de38 100644 --- a/main/Scene.h +++ b/main/Scene.h @@ -23,7 +23,6 @@ #include "Light.h" #define SCENE_FACTOR_THRESHOLD 0.02 -#define UNIT_TASK_SIZE 50 class Scene {