#ifndef DISTRIB_H
#define DISTRIB_H

// NOTE(review): this header had lost everything written between angle
// brackets (include targets and template arguments) and all of its line
// breaks. The includes below are the ones required by the names the class
// uses: std::string, std::vector, std::map and the pthread_* types.
#include <map>
#include <string>
#include <vector>

#include <pthread.h>

#define UNIT_TASK_SIZE 100

/**
 * Bookkeeping for handing out numbered tasks between a server and a set of
 * clients.
 *
 * Only the inline accessors are defined in this header; every other member
 * function is defined out of line, so its exact behaviour must be read from
 * the implementation file.
 *
 * NOTE(review): the class stores pthread mutexes and condition variables by
 * value and declares a destructor, but copying is not disabled. Copying a
 * distrib is presumably never intended -- confirm, and consider making the
 * copy constructor and assignment operator inaccessible.
 */
class distrib
{
    public:
        distrib();
        ~distrib();

        /* Connection setup (all defined out of line). */
        int readHostFile(const char * filename);
        int startServer();
        int startClient(const char * server, int port);
        int startClients(const std::vector<std::string> & client_options);

        /** @return the current value of m_num_clients. */
        int getNumClients() const { return m_num_clients; }

        /**
         * Record the buffer and its size. Only the pointer is stored; no copy
         * of the bytes is made here.
         */
        void set_data(unsigned char * data, int size)
        {
            m_data = data;
            m_data_size = size;
        }

        /** Record the total number of tasks. */
        void set_num_tasks(int num_tasks) { m_num_tasks = num_tasks; }

        int getTask();
        int send_data(int task, unsigned char * data, int num_bytes);

        /**
         * @return the number of entries in m_tasks_in_progress.
         *
         * The container reports a size_t; the conversion to int is made
         * explicit to keep the original return type. This accessor does not
         * take m_tasks_in_progress_mutex itself.
         */
        int getNumTasksInProgress() const
        {
            return static_cast<int>(m_tasks_in_progress.size());
        }

        void waitAllTasks();

        /** @return a mutable reference to the stored server name. */
        std::string & getServerName() { return m_servername; }

        /** @return the stored server port. */
        int getServerPort() const { return m_serverport; }

        /** Argument bundle taken (by pointer) by connection_thread(). */
        typedef struct
        {
            distrib * the_distrib;
            int client_socket;
        } connection_thread_arg_t;

        /* Free functions that are granted access to the protected state. */
        friend void distrib_server(distrib * the_distrib);
        friend void connection_thread(connection_thread_arg_t * arg);

    protected:
        int clientConnect(const std::string & host,
                          const std::vector<std::string> & client_options);
        void startTask(int task);
        void taskDone(int task);

        std::vector<std::string> m_hosts;

        // NOTE(review): the element type was lost; int is an assumption
        // (e.g. process ids or descriptors) -- confirm against the
        // implementation file.
        std::vector<int> m_children;

        std::string m_servername;
        int m_serverport;
        int m_client_socket;

        pthread_t m_server_thread;
        pthread_cond_t m_listen_cond;
        pthread_mutex_t m_listen_mutex;

        int m_num_clients;
        unsigned char * m_data;     // set by set_data(); not copied there
        int m_data_size;
        int m_num_tasks;
        int m_tasks_complete;
        int m_next_task;
        bool m_server;
        pthread_mutex_t m_task_mutex;

        // NOTE(review): key and value types were lost. The key is presumably
        // the int task id passed to startTask()/taskDone(); the int value
        // type is an assumption -- confirm against the implementation file.
        std::map<int, int> m_tasks_in_progress;
        pthread_mutex_t m_tasks_in_progress_mutex;

        pthread_mutex_t m_tasks_complete_mutex;
        pthread_cond_t m_tasks_complete_cond;
};

#endif