set sockets to close on exec

git-svn-id: svn://anubis/fart/trunk@228 7f9b0f55-74a9-4bce-be96-3c2cd072584d
This commit is contained in:
Josh Holtrop 2009-04-06 21:57:18 +00:00
parent 68450b8c4d
commit 8424e34a91
4 changed files with 125 additions and 12 deletions

View File

@ -8,18 +8,24 @@
#include <errno.h> #include <errno.h>
#include <netdb.h> /* gethostbyname() */ #include <netdb.h> /* gethostbyname() */
#include <string.h> /* memset() */ #include <string.h> /* memset() */
#include <fcntl.h> /* fcntl(), F_GETFD, F_SETFD, FD_CLOEXEC */
#include <sys/types.h> #include <sys/types.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <netinet/ip.h> #include <netinet/ip.h>
using namespace std; using namespace std;
#define MSG_WANT_DATA 1
#define MSG_SEND_DATA 2
distrib::distrib() distrib::distrib()
{ {
pthread_cond_init(&m_listen_cond, NULL); pthread_cond_init(&m_listen_cond, NULL);
pthread_mutex_init(&m_listen_mutex, NULL); pthread_mutex_init(&m_listen_mutex, NULL);
pthread_mutex_init(&m_task_mutex, NULL);
m_num_clients = 0; m_num_clients = 0;
m_data = NULL; m_data = NULL;
m_server = true; m_server = true;
m_next_task = 0;
} }
int distrib::readHostFile(const char * filename) int distrib::readHostFile(const char * filename)
@ -103,6 +109,24 @@ int distrib::clientConnect(const string & host,
return 0; return 0;
} }
void connection_thread(distrib::connection_thread_arg_t * arg)
{
distrib * the_distrib = arg->the_distrib;
int client_socket = arg->client_socket;
delete arg;
/* loop listening for messages from the client */
for (;;)
{
int msg_type;
int nread = read(client_socket, &msg_type, sizeof(msg_type));
if (nread < 0)
break;
cout << "Read message type " << msg_type << "!" << endl;
}
close(client_socket);
}
void distrib_server(distrib * the_distrib) void distrib_server(distrib * the_distrib)
{ {
char hostname[1000]; char hostname[1000];
@ -116,6 +140,10 @@ void distrib_server(distrib * the_distrib)
exit(39); exit(39);
} }
int flags = fcntl(listen_socket, F_GETFD);
flags |= FD_CLOEXEC;
fcntl(listen_socket, F_SETFD, flags);
if ( listen(listen_socket, 5) == -1 ) if ( listen(listen_socket, 5) == -1 )
{ {
cerr << "Error " << errno << " when trying to listen!" << endl; cerr << "Error " << errno << " when trying to listen!" << endl;
@ -146,6 +174,29 @@ void distrib_server(distrib * the_distrib)
pthread_mutex_lock(&the_distrib->m_listen_mutex); pthread_mutex_lock(&the_distrib->m_listen_mutex);
pthread_cond_signal(&the_distrib->m_listen_cond); pthread_cond_signal(&the_distrib->m_listen_cond);
pthread_mutex_unlock(&the_distrib->m_listen_mutex); pthread_mutex_unlock(&the_distrib->m_listen_mutex);
for (;;)
{
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
int client_socket = accept(listen_socket,
(struct sockaddr *) &client_addr,
&client_addr_len);
if (client_socket < 0)
break;
distrib::connection_thread_arg_t * arg =
new distrib::connection_thread_arg_t;
arg->the_distrib = the_distrib;
arg->client_socket = client_socket;
pthread_t client_thread;
pthread_create(&client_thread,
NULL,
(void * (*)(void *)) &connection_thread,
arg);
}
} }
int distrib::startServer() int distrib::startServer()
@ -201,17 +252,55 @@ int distrib::startClient(const char * server, int port)
int distrib::send_data(int task, unsigned char * data, int num_bytes) int distrib::send_data(int task, unsigned char * data, int num_bytes)
{ {
/* TODO: finish */ if (m_server)
{
if (m_data != NULL)
{
int num_to_copy = num_bytes;
if (3 * task * UNIT_TASK_SIZE + num_to_copy > m_data_size)
num_to_copy = m_data_size - 3 * task * UNIT_TASK_SIZE;
if (num_to_copy > 0)
{
memcpy(m_data + 3 * task * UNIT_TASK_SIZE, data, num_to_copy);
}
}
}
else
{
int msg_header = MSG_SEND_DATA; /* send data */
if ( write(m_client_socket, &msg_header, sizeof(msg_header)) < 0
|| write(m_client_socket, data, num_bytes) < 0)
{
return -1;
}
}
return 0;
} }
int distrib::getTask() int distrib::getTask()
{ {
/* TODO: finish */
if (m_server) if (m_server)
{ {
pthread_mutex_lock(&m_task_mutex);
int task = -1;
if (m_next_task < m_num_tasks)
{
task = m_next_task;
m_next_task++;
}
pthread_mutex_unlock(&m_task_mutex);
return task;
} }
else else
{ {
int msg_header = MSG_WANT_DATA;
if (write(m_client_socket, &msg_header, sizeof(msg_header)) < 0)
return -1;
/* wait for a message back */
int task = 0;
if (read(m_client_socket, &task, sizeof(task)) < 0)
return -1;
return task;
} }
return -1;
} }

View File

@ -6,6 +6,8 @@
#include <vector> #include <vector>
#include <pthread.h> #include <pthread.h>
#define UNIT_TASK_SIZE 50
class distrib class distrib
{ {
public: public:
@ -15,11 +17,23 @@ class distrib
int startClient(const char * server, int port); int startClient(const char * server, int port);
int startClients(const std::vector<std::string> & client_options); int startClients(const std::vector<std::string> & client_options);
int getNumClients() { return m_num_clients; } int getNumClients() { return m_num_clients; }
void set_data(unsigned char * data) { m_data = data; } void set_data(unsigned char * data, int size)
{
m_data = data;
m_data_size = size;
}
void set_num_tasks(int num_tasks) { m_num_tasks = num_tasks; }
int getTask(); int getTask();
int send_data(int task, unsigned char * data, int num_bytes); int send_data(int task, unsigned char * data, int num_bytes);
typedef struct
{
distrib * the_distrib;
int client_socket;
} connection_thread_arg_t;
friend void distrib_server(distrib * the_distrib); friend void distrib_server(distrib * the_distrib);
friend void connection_thread(connection_thread_arg_t * arg);
protected: protected:
int clientConnect(const std::string & host, int clientConnect(const std::string & host,
@ -29,14 +43,17 @@ class distrib
std::vector<int> m_children; std::vector<int> m_children;
std::string m_servername; std::string m_servername;
int m_serverport; int m_serverport;
int m_listen_socket;
int m_client_socket; int m_client_socket;
pthread_t m_server_thread; pthread_t m_server_thread;
pthread_cond_t m_listen_cond; pthread_cond_t m_listen_cond;
pthread_mutex_t m_listen_mutex; pthread_mutex_t m_listen_mutex;
int m_num_clients; int m_num_clients;
unsigned char * m_data; unsigned char * m_data;
int m_data_size;
int m_num_tasks;
int m_next_task;
bool m_server; bool m_server;
pthread_mutex_t m_task_mutex;
}; };
#endif #endif

View File

@ -104,6 +104,12 @@ Scene::Scene(const map<string, const char *> & options,
m_distrib.readHostFile(m_hosts_file.c_str()); m_distrib.readHostFile(m_hosts_file.c_str());
m_distrib.startServer(); m_distrib.startServer();
m_distrib.startClients(m_client_options); m_distrib.startClients(m_client_options);
m_data = new unsigned char[m_width * m_height * 3]; /* 24bpp */
int num_tasks = (m_width * m_height + (UNIT_TASK_SIZE - 1))
/ UNIT_TASK_SIZE;
m_distrib.set_num_tasks(num_tasks);
m_distrib.set_data(m_data, 3 * m_width * m_height);
} }
else else
{ {
@ -140,8 +146,6 @@ void Scene::render()
cout << "----------------------------------------" << endl; cout << "----------------------------------------" << endl;
} }
m_data = new unsigned char[m_width * m_height * 3]; /* 24bpp */
if (m_distrib.getNumClients() < 1) if (m_distrib.getNumClients() < 1)
{ {
for (int i = 0; i < m_height; i++) for (int i = 0; i < m_height; i++)
@ -154,9 +158,9 @@ void Scene::render()
} }
else else
{ {
m_distrib.set_data(m_data);
/* work on tasks in this thread until there are no more */ /* work on tasks in this thread until there are no more */
taskLoop(); taskLoop();
/* TODO: wait until all data has arrived */
} }
if (m_verbose) if (m_verbose)
@ -176,25 +180,29 @@ void Scene::render()
void Scene::taskLoop() void Scene::taskLoop()
{ {
unsigned char data[3 * UNIT_TASK_SIZE];
for (;;) for (;;)
{ {
int task_id = m_distrib.getTask(); int task_id = m_distrib.getTask();
if (task_id < 0) if (task_id < 0)
break; break;
unsigned char data[3 * UNIT_TASK_SIZE];
int i = task_id / m_width; int i = task_id / m_width;
int j = task_id % m_width; int j = task_id % m_width;
for (int t = 0; t < UNIT_TASK_SIZE; t++) for (int t = 0; t < UNIT_TASK_SIZE; t++)
{ {
renderPixel(j, i, &data[3 * t]); renderPixel(j, i, &data[3 * t]);
j++; j++;
if (j == m_width) if (j >= m_width)
{ {
j = 0; j = 0;
i++; i++;
if (i >= m_height)
break;
} }
} }
m_distrib.send_data(task_id, data, 3 * UNIT_TASK_SIZE); int ret = m_distrib.send_data(task_id, data, 3 * UNIT_TASK_SIZE);
if (ret != 0)
break;
} }
} }

View File

@ -23,7 +23,6 @@
#include "Light.h" #include "Light.h"
#define SCENE_FACTOR_THRESHOLD 0.02 #define SCENE_FACTOR_THRESHOLD 0.02
#define UNIT_TASK_SIZE 50
class Scene class Scene
{ {