diff --git a/distrib/distrib.cc b/distrib/distrib.cc index 5d9a8e9..c16571c 100644 --- a/distrib/distrib.cc +++ b/distrib/distrib.cc @@ -304,17 +304,18 @@ int distrib::send_data(int task, unsigned char * data, int num_bytes) int distrib::getTask() { + int task = -1; if (m_server) { pthread_mutex_lock(&m_task_mutex); - int task = -1; if (m_next_task < m_num_tasks) { task = m_next_task; m_next_task++; } pthread_mutex_unlock(&m_task_mutex); - return task; + if (task > -1) + recordTask(task); } else { @@ -323,9 +324,22 @@ int distrib::getTask() return -1; /* wait for a message back */ - int task = 0; if (read(m_client_socket, &task, sizeof(task)) < 0) return -1; - return task; } + return task; +} + +void distrib::recordTask(int task) +{ + pthread_mutex_lock(&m_tasks_in_progress_mutex); + m_tasks_in_progress[task] = 1; + pthread_mutex_unlock(&m_tasks_in_progress_mutex); +} + +void distrib::taskDone(int task) +{ + pthread_mutex_lock(&m_tasks_in_progress_mutex); + m_tasks_in_progress.erase(task); + pthread_mutex_unlock(&m_tasks_in_progress_mutex); } diff --git a/distrib/distrib.h b/distrib/distrib.h index dfa4820..eabf9c9 100644 --- a/distrib/distrib.h +++ b/distrib/distrib.h @@ -4,6 +4,7 @@ #include #include +#include #include #define UNIT_TASK_SIZE 50 @@ -25,6 +26,7 @@ class distrib void set_num_tasks(int num_tasks) { m_num_tasks = num_tasks; } int getTask(); int send_data(int task, unsigned char * data, int num_bytes); + int getNumTasksInProgress() { return m_tasks_in_progress.size(); } typedef struct { @@ -38,6 +40,8 @@ class distrib protected: int clientConnect(const std::string & host, const std::vector & client_options); + void recordTask(int task); + void taskDone(int task); std::vector m_hosts; std::vector m_children; @@ -54,6 +58,8 @@ class distrib int m_next_task; bool m_server; pthread_mutex_t m_task_mutex; + std::map m_tasks_in_progress; + pthread_mutex_t m_tasks_in_progress_mutex; }; #endif diff --git a/main/Scene.cc b/main/Scene.cc index b19cdf6..6f7b270 100644 --- a/main/Scene.cc +++ b/main/Scene.cc @@ -160,7 +160,13 @@ void Scene::render() { /* work on tasks in this thread until there are no more */ taskLoop(); - /* TODO: wait until all data has arrived */ + /* TODO: change wait condition */ + for (;;) + { + if (m_distrib.getNumTasksInProgress() == 0) + break; + usleep(100000); + } } if (m_verbose)