Editor's Note: This article is the second part of a two-part series about how to use parallelization techniques in Perl scripts. You can download the Process Farm files from the Win32 Scripting Journal Web site. See Part 1 for installation instructions.
In Part 1 of this series, I demonstrated how to install and use the Process Farm system to speed up Perl scripts that can benefit from parallel execution (i.e., parallelization). The Process Farm system lets you start a pool of child processes and control them with one parent process. The parent process assigns jobs to the child processes in a round-robin fashion and collates the results. In Part 2, I explain how the Process Farm system works.
Figure 1 shows an overview of the Process Farm system. The MyRpcPool object contains, among other items, an array of references to MyRpcParent objects. The MyRpcParent objects communicate over TCP sockets with their respective child processes. The combination of the MyRpcParent code and the functions within MyRpcChild.pl constitutes the asynchronous remote procedure call (RPC) system that lets MyRpcPool communicate with the child processes. MyRpcPool provides a simple interface for starting a pool of child processes and having those processes execute jobs.
The RPC system enables asynchronous startup of the child processes and asynchronous command execution. The RPC system also ensures reliable destruction of the child processes when the parent process terminates, even if the parent process doesn't run to completion. Because the RPC system is the core of the Process Farm system, I discuss the interaction between a MyRpcParent object and its associated child process in detail. However, before I delve into the details, here's an overview of how the MyRpcParent and MyRpcPort objects function.
The MyRpcParent Object
As Figure 2 shows, the MyRpcParent object supports five methods: new_async, connect, execute, get_state, and get_retval. The new_async method creates a new MyRpcParent object and prompts creation of a child process. At this point, a communication pipeline between the parent object and the child process doesn't exist—a state referred to as the init state. The connect method tells a MyRpcParent object to accept an inbound connection from the child process and moves the parent object into the idle state. With the connection in place, the child process is ready for a job assignment. The execute method prompts the MyRpcParent object to tell the child process to perform a given command, causing the parent object to enter the wait state.
Calls made to the get_state method on a MyRpcParent object in the wait state first check to see whether the child process has finished executing the command. If the child process has finished, the method places the object in the fin state and returns fin. (Calls to the get_state method return the state that the MyRpcParent object is left in.) If the child process isn't finished, get_state leaves the MyRpcParent object in the wait state and returns wait. When in the fin state, the MyRpcParent object is ready to retrieve the results the child process transmitted. The get_retval method retrieves those results and places the MyRpcParent object back in the idle state, where it's ready to execute another command.
The MyRpcPort Object
Before you can create a MyRpcParent object, you must create a MyRpcPort object. This object creates an IO::Socket object to listen for incoming connections.
The MyRpcPort object supports three methods: new, get_port_num, and get_next_connection. The new method requires two parameters: the port number to listen on and the number of listeners to create. The second parameter is crucial because it limits the number of child processes that the parent process can start before the MyRpcParent objects must accept the inbound connections.
The get_port_num method provides an external interface to the internals of the MyRpcPort object. Standard object-oriented design methodology dictates that you never access object internals directly but instead use appropriate external interfaces, such as the get_port_num method. External interfaces provide more flexibility in changing the object's internals and offer more protection—as long as the interface doesn't change, the code won't break.
The get_next_connection method calls the accept method on the IO::Socket object. The accept method returns the resultant socket handle (i.e., a bidirectional filehandle representing a network connection). The gen_next_connection method verifies that the connection is from the same computer (the loopback IP address 127.0.0.1) and then returns the handle.
The Asynchronous RPC System Details
Figure 3, page 12, is a process flow diagram illustrating what occurs when a MyRpcPool object calls the various methods that the MyRpcParent object supports. (However, the MyRpcPool object isn't the only code that can call the MyRpcParent object. The MyRpcParent object is independent of the code that calls it.) Here's what is happening in each of the steps that Figure 3 depicts:
Step A. The MyRpcPool object calls the MyRpcParent::new_async class method, which first creates a new MyRpcParent object and initializes its instance variables. The method then creates a Win32::Process object, which spins off a child process, passing it the TCP port number and a unique identifier. Creating a Win32::Process object is an asynchronous process—Win32::Process::new returns before the child process has even finished loading Perl.exe, let alone the Perl script. The new_async method then places the MyRpcParent object in the init state and returns. (Perl methods, functions, and subroutines return when they have finished executing. Sometimes they return a value or a list of values; other times they just return.)
Step B. While the parent process is spinning off more child processes, the first child process calls and starts the init subroutine, which MyRpcChild.pl defines. The child process creates an IO::Socket object and tells this object to connect to the specified TCP port number on the local host. At this point, the child process blocks (i.e., sits idle) until the parent process accepts the connection.
Step C. The MyRpcPool object calls the connect method on the MyRpcParent object, which causes the MyRpcParent object to call the get_next_connection method on its MyRpcPort object. MyRpcPort::get_next_connection, in turn, calls the accept method on its IO::Socket object and returns the resultant socket handle. (The MyRpcPort object is a wrapper for the IO::Socket object.)
At this point, there is no guarantee that the resultant socket handle will communicate with the first child process that Win32::Process spun off in step A. The IO::Socket object returns socket handles in the order in which the child processes connect to the listening port. However, if a particular child process is momentarily starved for CPU, the child process might be delayed and thus out of order with its brethren.
One situation in which you might want to have the Win32::Process object and the socket handle point at the same child process is if you're supporting timeouts. With timeout support, if a child process takes too long to execute a given command, the parent process terminates the child process and spins off a new one. However, if the socket handle and Win32::Process objects point at different child processes, killing the correct child process is difficult.
One approach that would ensure correspondence between the socket handle and the Win32::Process object is to use synchronous startup. With synchronous startup, the new method doesn't return until the child process finishes connecting to the parent object. Because only one child process starts up at a time, you can be reasonably certain that the Win32::Process object and the socket handle point to the same child process. However, when I performance tested this approach, I experienced a 75 percent increase in startup time, which is clearly undesirable.
A better approach—and the approach I used in the Process Farm—is to pass each child process a unique identifier. The MyRpcParent object maintains a global class variable, and each call to the new_async method passes the current value to the child process on the command line and then increments the variable. After spinning off a new child process, the parent process places the returned Win32::Process object in a global hash, indexed on the passed unique identifier for later retrieval. The child process returns the unique identifier when it connects back to the parent process, which can use the identifier to retrieve the appropriate Win32::Process object from the global hash.
Step D. The child process returns from creating the IO::Socket object and continues executing the init subroutine. The child process then sends the unique identifier to the MyRpcParent object. To send the identifier, the child process first uses the pack function to encode the integer as a 4-byte Little Endian value and then uses the print function to write that value to the socket handle. (Little Endian is a way to store numbers that puts the least significant byte first—i.e., VAX order.) Because Perl buffers the print operation, that operation is effectively asynchronous. After the child process executes the print function, it proceeds to step F, where it enters the main_loop subroutine.
Step E. The connect method call on the MyRpcParent object continues by retrieving and unpacking the Little Endian value from the socket handle. The method then retrieves the appropriate Win32::Process object from the global hash and stores it in an instance variable. Finally, the connect method places the MyRpcParent object in the idle state and returns.
Step F. The child process enters the main_loop subroutine, which initiates the read function on the socket handle. The child process is waiting for a message from the parent process that specifies what command to execute. The read function blocks until the child process receives the message.
Step G. After the MyRpcPool object finishes connecting to all the child processes, the user-supplied code in the parent process submits a list of jobs and then instructs the MyRpcPool object to execute those jobs by calling the do_all_jobs method. This process results in the MyRpcPool object calling the execute method on one of its MyRpcParent objects. The execute method encodes the message containing the command and its associated parameters and uses the print function to write the message to the socket handle. The print function is effectively nonblocking, so the MyRpcParent object quickly enters the wait state and the execute method returns.
Step H. The child process retrieves the message from the socket handle and decodes it. The child process then begins executing the custom subroutine, which typically takes a while to execute. If it didn't take a while, there wouldn't be much point in placing it in the child process.
Step I. While the child process is executing the custom subroutine, the MyRpcParent object might receive numerous get_state method calls from the MyRpcPool object. Because the MyRpcParent object is in the wait state, the get_state method calls the four-argument form of the select function on the socket handle, which determines whether the handle contains any buffered data. If the socket handle is empty, the MyRpcParent object remains in the wait state. If the socket handle contains data, the MyRpcParent object enters the fin state. In either case, the get_state method returns whatever state the MyRpcParent object is in.
Step J. When the custom subroutine finishes executing, the main_loop subroutine serializes the custom subroutine's returned values and prints the result to the socket handle. The child process then loops and makes another call to the main_loop subroutine.
Step K. Now that the socket contains a message, the MyRpcPool object's calls to the MyRpcParent object's get_state method notice the presence of buffered data in the socket and therefore return fin. The MyRpcPool object now knows that the child process has finished executing the current command and that the MyRpcParent object can retrieve the returned data when the get_retval method requests it.
Step L. The get_retval method call causes the MyRpcParent object to retrieve the encoded returned values from the socket handle. The MyRpcParent object then enters the idle state (which signals that it is ready to accept another command) and returns the retrieved values.
The Asynchronous RPC System Protocol
In steps F and J, I mentioned that the processes exchange messages, but I didn't give any details about those messages. The socket connection between these two processes behaves like two pipes, each one transporting bytes from one process to the other. To transfer Perl objects between the two processes, I use the Data::Dumper module, which comes standard with Perl. This handy module takes in Perl objects and returns Perl code that reproduces them. Another Perl process can then take that code and run it through the eval function to get the original Perl objects. However, when you use the Data::Dumper module, you need to be aware of the security implications of an unguarded eval function, which the sidebar "An Inherent Danger in Data::Dumper" discusses.
The child process needs to know how many bytes to read out of the socket handle, so you need to specify how long the message is. In Process Farm messages, the first 4 bytes are a Little Endian integer representing the length of the Perl code that follows. The child process first reads 4 bytes out of the socket handle, unpacks them to retrieve the integer, then reads that many more bytes out of the socket handle. The child process passes the text string it read out of the socket handle to the eval function to produce the original data. For example, a message telling the child process to execute the ping command on the IP address 220.127.116.11 looks like
0x51000000$command = 'ping'; $ptr2params = \[ _18.104.22.168' \];
where 0x51000000 is a hexadecimal representation for the 4-byte Little Endian integer. If you print the message, it contains the letter Q (0x51 is Q in ASCII) followed by three NUL characters. Evaluation of this message results in the creation of two variables: $command, which contains the name of the command to execute, and $ptr2params, which contains a reference to an anonymous array of the parameters.
The return message from the child process to the parent process might look like
0x35000000$ptr2retval = \[ '0' \];
Evaluation of this message results in the creation of one variable, $ptr2retval, which contains a reference to an anonymous array listing the returned values from executing the custom subroutine.
The Termination Process
Although Figure 3 doesn't depict the termination process, it is an important part of the RPC system. When the parent process exits, you need to ensure that no child processes remain. Otherwise, they might drain system resources until you reboot the computer.
The Process Farm system implements an object destructor, the DESTROY method, to ensure child process termination, even if the parent process prematurely terminates (for example, through a Ctrl+C call). Whenever a Perl object's reference count drops to zero or a Perl script ends, the system calls the DESTROY method. This method provides a reliable way for objects to clean up after themselves.
The DESTROY method first checks to see whether any Win32::Process objects remain in the global hash. Objects might remain if, for example, the person running the program called Ctrl+C while MyRpcPool was initializing the pool. If any objects remain in the hash, the DESTROY method waits 1 second for them to exit and then terminates them if they are still present. The DESTROY method empties the global hash to ensure that additional calls to the method won't founder on attempts to terminate objects that no longer exist.
After attending to any incompletely initialized child processes, the DESTROY method turns its attention to the object on which it was called. The method first closes its socket handle. At this point, a child process is most likely waiting for a command to execute. When the DESTROY socket closes, the read function that is blocking the child process returns the value of 0, which represents the number of bytes it read. In the main_loop subroutine, the code that reads commands takes this value as a sign that the parent process is shutting down and calls the exit function. The DESTROY method waits up to 1 second for the child process to exit; if the child process still remains, the method terminates that process.
The MyRpcPool Module
The MyRpcPool module is anticlimactic compared with the supporting RPC system. The MyRpcPool object consists of scalars (namely, the number of processes and a reference to the associated MyRpcPort object), a hash to store the returned data from the finished jobs, and two arrays that keep track of the child processes and the jobs waiting to execute. The thread_pool array consists of references to hashes that contain two crucial attributes (key and RpcParent) about each child process. The key attribute contains the index that will store the returned data in the return_data hash. The RpcParent attribute contains a reference to a MyRpcParent object. The waiting_pool array consists of references to hashes that store the keys, commands, and parameters supplied with the add_waiting_job methods.
MyRpcPool supports 10 methods: new, add_waiting_job, count_waiting, count_running, get_return_data, clear_return_data, cleanse_pool, dispatch_jobs, cleanse_and_dispatch, and do_all_jobs. The new method first creates the MyRpcPool object and then creates a MyRpcPort object based on the port number and number of processes requested. The new method calls the MyRpcParent::new_async class method with the appropriate parameters and populates the thread_pool array. Finally, the new method loops through the thread_pool array and calls the connect method on each MyRpcParent object.
The add_waiting_job method adds another element to the waiting_pool array. The count_waiting and count_running methods, respectively, return the size of the waiting_pool and the number of running entries in the thread_pool. The get_return_data and clear_return_data methods provide access to the return_data area of the object.
The cleanse_pool and dispatch_jobs methods do the dirty work. The cleanse_pool method loops over the thread_pool, looking for MyRpcParent objects that are in the fin state. When the method finds one, it retrieves the return data with the get_retval method and uses the key to store the data in the return_data hash. The dispatch_jobs method loops over the thread_pool, looking for MyRpcParent objects in the idle state. When the method finds one, it assigns them a job from waiting_pool. If the method runs out of jobs to assign, it stops checking the pool.
The cleanse_and_dispatch method combines the previous two methods to improve performance. As this method cleanses the jobs from the thread_pool, it dispatches new ones from the waiting_pool. Finally, the do_all_jobs method makes repeated calls to cleanse_and_dispatch until no more jobs are waiting or running.
Achieve Improvements in Your Scripts
The trick to tackling any large job is breaking it down into smaller jobs. The Process Farm system does just that—instead of one process performing all the jobs, several processes perform the jobs simultaneously. As a result, the Process Farm system can help you achieve significant improvements in execution speed. And because the system is easy to use and versatile, you can apply it to any script that can benefit from parallelization.