FART is my semester project for the Computer Science 658 class at Grand Valley State University. My goal is for FART to be a distributed, fault-tolerant, object-oriented raytracer. The name is a recursive acronym similar to GNU or LAME.
The input to the program is a scene description file: a structured ASCII document laid out as a hierarchy of scene options and elements. After the scene file is parsed, the scene is raytraced (rendered) according to the render options. Some of these options can be specified on the command line and others in the scene description file; when an option can be specified in both places, the command-line value overrides the one in the scene file.
Rendering can be done using the built-in task distribution infrastructure. A command-line option specifies a "hosts file" containing a list of hosts to use while rendering the scene. The algorithm is fault-tolerant: if any worker node goes down, its tasks are reassigned so that other nodes complete the work the failed node did not.
I used an object-oriented model for the program. Each displayable scene object is represented by a C++ object which inherits from the Shape base class, while light objects inherit from a base Light class.
Shape objects implement an intersect() method. Polymorphism is utilized so that when the scene attempts to intersect a Ray with a shape object, it does not need to know the actual type of the shape being intersected.
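A minimal sketch of this polymorphic design might look like the following. Only Shape, intersect(), and the Ray concept come from the project itself; Vec3, Sphere, first_hit(), and all member names are illustrative assumptions, not the project's actual code.

```cpp
#include <cassert>
#include <cmath>
#include <memory>
#include <vector>

// Hypothetical 3-vector; the real project's math types may differ.
struct Vec3 {
    double x, y, z;
    Vec3 operator-(const Vec3& o) const { return {x - o.x, y - o.y, z - o.z}; }
    double dot(const Vec3& o) const { return x * o.x + y * o.y + z * o.z; }
};

struct Ray {
    Vec3 origin;
    Vec3 dir;  // assumed normalized
};

class Shape {
public:
    virtual ~Shape() = default;
    // Returns true on a hit and stores the ray parameter of the
    // nearest intersection in t.
    virtual bool intersect(const Ray& ray, double& t) const = 0;
};

class Sphere : public Shape {
public:
    Sphere(Vec3 c, double r) : center(c), radius(r) {}
    bool intersect(const Ray& ray, double& t) const override {
        // Standard ray-sphere quadratic (dir assumed unit length).
        Vec3 oc = ray.origin - center;
        double b = 2.0 * oc.dot(ray.dir);
        double c = oc.dot(oc) - radius * radius;
        double disc = b * b - 4.0 * c;
        if (disc < 0.0) return false;
        t = (-b - std::sqrt(disc)) / 2.0;
        return t > 0.0;
    }
private:
    Vec3 center;
    double radius;
};

// The scene only ever sees Shape pointers, never the concrete type.
bool first_hit(const std::vector<std::unique_ptr<Shape>>& shapes,
               const Ray& ray, double& nearest) {
    bool hit = false;
    nearest = 1e30;
    for (const auto& s : shapes) {
        double t;
        if (s->intersect(ray, t) && t < nearest) { nearest = t; hit = true; }
    }
    return hit;
}
```

Because intersect() is a virtual method on the base class, adding a new shape type requires no changes to the scene-traversal code.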
The task distribution infrastructure is implemented in a class called distrib. This class contains methods to start up a server thread, launch slave processes on the worker nodes, and connect back from the slave processes to the master server.
The design document for FART is available here.
To build a scalable parser that reads scene files according to a grammar I supplied, I used the bison parser generator. I chose bison because it is very portable and has a C++ interface, so I could connect it to the rest of my code. The parser produces a hierarchy of Node objects, each representing part of the parse tree obtained from the scene description file. The parse tree can therefore be traversed after parsing is complete to evaluate all of the scene objects specified in the scene file, maintaining node order and parent/child relationships.
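A sketch of such a parse-tree node and its ordered traversal is shown below. Only the Node name and the parent/child traversal idea come from the project; the fields, the kind strings, and the visitor interface are assumptions for illustration.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Illustrative parse-tree node in the spirit of the project's Node
// hierarchy; the real node types are defined by the scene grammar.
class Node {
public:
    explicit Node(std::string kind) : kind_(std::move(kind)) {}

    void add_child(std::unique_ptr<Node> child) {
        children_.push_back(std::move(child));
    }

    // Post-parse traversal: visit this node, then its children in order,
    // preserving node order and parent/child relationships.
    template <typename Visitor>
    void traverse(Visitor&& visit) const {
        visit(*this);
        for (const auto& c : children_) c->traverse(visit);
    }

    const std::string& kind() const { return kind_; }

private:
    std::string kind_;
    std::vector<std::unique_ptr<Node>> children_;
};
```

The bison actions would build this tree bottom-up as rules reduce; evaluation then becomes a single ordered walk over the finished tree.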
I designed the task distribution architecture using a combination of threads and processes to achieve a working master/slave setup. When the program starts up, it determines whether it is running in distributed mode. If it is not (no --hosts or --host argument was supplied), the scene is rendered one pixel at a time, sequentially. If a --hosts option is present, the process is the master process, and it reads the list of hosts to use as slave processes from the file specified. If --host and --port options are provided, the process is a slave process, and it connects to the master process at the given hostname on the given TCP port.
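The start-up decision reduces to a small classification over the flags. The enum and function names below are illustrative, not the project's actual identifiers:

```cpp
#include <cassert>

// Which role this process plays, decided from the command-line flags
// described above. Names are hypothetical.
enum class Mode { Sequential, Master, Slave };

Mode decide_mode(bool has_hosts, bool has_host, bool has_port) {
    if (has_hosts) return Mode::Master;            // --hosts file given
    if (has_host && has_port) return Mode::Slave;  // --host and --port given
    return Mode::Sequential;                       // render pixels in order
}
```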
In distributed mode, the master process creates a server thread, which listens on a free TCP port (chosen by the system) for incoming connections. Each time a slave node connects to this port, another connection thread is spawned to handle the master side of communication with that slave. After the master process starts the listen thread, it also forks and execs an ssh process as a sub-process. This ssh process connects to the slave node and begins executing a copy of the program there, informing it of the master node's hostname and port via the --host and --port command-line options.
When I first implemented the distribution architecture and did a test run, a slave process was created on each of the hosts I specified, but only 5-10% of each CPU was being utilized. I ran "netstat -taunp" and saw that the TCP connections had data sitting in their send queues: the program had already finished the work it was asked to do and was simply waiting for the network traffic to go out. The TCP implementation was accumulating data before sending it over the connection. To deal with this, I used the setsockopt() call to set the TCP_NODELAY flag to 1, which disables Nagle's algorithm on that socket so data is sent as soon as it is available. Setting this option on the communication socket immediately allowed each slave node to drive one of its cores at nearly 100%.
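The fix is a single setsockopt() call on each connected socket; a minimal sketch (with error handling trimmed for brevity) looks like this:

```cpp
#include <netinet/in.h>
#include <netinet/tcp.h>  // TCP_NODELAY
#include <sys/socket.h>
#include <unistd.h>

// Disable Nagle's algorithm on a TCP socket so small writes go out
// immediately instead of being coalesced. Returns 0 on success.
int disable_nagle(int sock) {
    int one = 1;
    return setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
}
```

This trades some extra small packets on the wire for lower latency, which is the right trade here since each task result is needed as soon as it is computed.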
My first attempt to utilize every core on a system was to enable OpenMP with the compiler flag -fopenmp and add omp parallel for directives to the loop carrying out the render task. This attempt failed: the program would abort with errors coming from the C library. Examining stack dumps in gdb showed the errors being thrown from pthread and C memory-management functions. I believe this failed because I was already creating and managing threads manually with pthreads while also trying to use OpenMP, which the compiler probably implements on top of pthreads as well.
Because OpenMP did not work for utilizing every core on a worker node, I switched to a different solution. The program was already computing a list of command-line arguments to pass to slave nodes, so I made the slave nodes record this list. If a process was the "master slave process" (the first process executed on a slave node), it called n = sysconf(_SC_NPROCESSORS_CONF) to retrieve the total number of processors on the system, then simply did a fork() and execvp() to execute n-1 additional copies of itself. In this way, one worker process runs per core on the slave node.
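A sketch of that spawn logic, under the assumption that argv holds the recorded argument list (the function names here are illustrative):

```cpp
#include <cassert>
#include <unistd.h>

// Number of additional worker processes to start: one per core beyond
// the process we are already running in.
long extra_worker_count() {
    long n = sysconf(_SC_NPROCESSORS_CONF);
    return n > 1 ? n - 1 : 0;
}

// Fork and exec n-1 fresh copies of this program using the recorded
// command-line argument list.
void spawn_extra_workers(char* const argv[]) {
    for (long i = 0; i < extra_worker_count(); ++i) {
        pid_t pid = fork();
        if (pid == 0) {
            execvp(argv[0], argv);  // child: become another worker
            _exit(127);             // only reached if execvp fails
        }
        // Parent continues; fork failure (pid < 0) is ignored in this sketch.
    }
}
```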
I had originally planned to implement fault tolerance in the distribution architecture by establishing a second TCP connection from each slave node to the master, serving as a polling connection to verify that the slaves were still alive. During implementation, I arrived at a more elegant solution. The master already kept track of the set of tasks considered "in progress." Normally, when a slave requested a task to work on, the master responded with the next available task number until all tasks had been given out, and then responded that there were no more tasks. I changed this slightly: if the master got a request for work after all tasks had been handed out, it responded with a task ID from the set of tasks currently in progress. Whichever node finished the task first, original or replacement, the data for it would be collected. If the original node was dead, the replacement slave took over the task and returned the data; if the original node was alive but responding very slowly, the replacement could finish the task and return results before it. This ended up working very well: I was able to kill all of the worker processes on a given slave node, and the tasks they were working on were finished by other nodes later on.
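The hand-out policy above can be sketched as a small tracker class. The class, method names, and data structures are assumptions for illustration; only the policy itself comes from the project.

```cpp
#include <cassert>
#include <optional>
#include <set>

// Sketch of the master's task hand-out policy (C++17).
class TaskTracker {
public:
    explicit TaskTracker(int total) : next_unassigned_(0), total_(total) {}

    // Called when a slave asks for work. Hands out the next fresh task
    // if any remain; otherwise re-issues a task still in progress so a
    // slow or dead node cannot stall the render. Empty only when every
    // task has completed.
    std::optional<int> request_task() {
        if (next_unassigned_ < total_) {
            int id = next_unassigned_++;
            in_progress_.insert(id);
            return id;
        }
        if (!in_progress_.empty())
            return *in_progress_.begin();  // re-issue an in-progress task
        return std::nullopt;               // all done
    }

    // Called when any slave, original or replacement, returns results.
    void complete_task(int id) { in_progress_.erase(id); }

    bool finished() const {
        return next_unassigned_ == total_ && in_progress_.empty();
    }

private:
    int next_unassigned_;
    int total_;
    std::set<int> in_progress_;
};
```

Note that complete_task() is idempotent: if both the original and the replacement node return the same task, the second completion simply erases nothing.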
I designed the raytracer itself in the first half of the semester and got it working well using a sequential approach. In the second half of the semester I added the task distribution infrastructure, which allowed the raytracer to break up the work of rendering an entire scene into small tasks. These tasks were then distributed among slave processes launched by the distribution infrastructure, one slave process per processor on each host involved in the render. I tested this design in the Grand Valley EOS computer lab. A serial render of a given scene on eos01 took 38 minutes; rendering it with the distribution infrastructure on all 24 nodes (64 cores) took about 38 seconds, a speedup of about 60 times.
I was able to implement my program very close to my original design. There are some raytracer features (apart from the distribution work) that I was not able to implement but plan to add later: other Light types such as a directional light, a jitter parameter on lights for rendering soft shadows, a scene pre-parser allowing shape definitions and looping with variables to create patterns of objects, texture support on object surfaces, and light refraction through semi-transparent objects.
The current method of utilizing each core of a worker node has the main process do a fork() and execvp() n-1 times, where n is the number of processors detected on the slave node. This has the disadvantage that scene-file parsing and initialization are repeated in each of these processes. That time could be saved by an implementation that instead created more threads with pthread_create() after parsing the scene file and building the scene, and used those threads to split up the work for a given task.
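A sketch of that thread-based alternative follows, using std::thread (typically implemented on top of pthreads) rather than raw pthread_create() for brevity. The render_range stand-in and the static row split are assumptions, not the project's code:

```cpp
#include <algorithm>
#include <cassert>
#include <thread>
#include <vector>

// Stand-in for the real per-pixel ray trace over a pixel range.
void render_range(std::vector<int>& pixels, int begin, int end) {
    for (int i = begin; i < end; ++i)
        pixels[i] = i;  // placeholder work
}

// Parse once, build the scene once, then split one task's pixels
// across nthreads threads instead of forking whole processes.
void render_task_threaded(std::vector<int>& pixels, unsigned nthreads) {
    int total = static_cast<int>(pixels.size());
    int chunk = (total + static_cast<int>(nthreads) - 1)
                / static_cast<int>(nthreads);
    std::vector<std::thread> workers;
    for (unsigned t = 0; t < nthreads; ++t) {
        int begin = static_cast<int>(t) * chunk;
        int end = std::min(total, begin + chunk);
        if (begin >= end) break;
        workers.emplace_back(render_range, std::ref(pixels), begin, end);
    }
    for (auto& w : workers) w.join();
}
```

Because the threads share the parsed scene in one address space, the per-process parsing and initialization cost from the fork()-based approach disappears.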