24th October 2013

Multi Processing Part 1: How to make Drush rush

John Ennew
Technical Director

In this first of two posts I'm going to focus on concurrency, also known as multi threading. This is the process of splitting up a program enabling it to be processed in parallel. Drush applications in PHP are single threaded and so are not using the maximum efficiency of the systems they are running on. On short running applications this is hardly noticeable but on long running applications this inefficiency adds up.

Migration example

A good example is a migration of data from a remote system into Drupal. Migrate will work on the records one at a time, requesting the source data, processing it when it is received and finally saving into Drupal before requesting the next record. When analysing what happens in this single threaded operation we see most of the time the migration program is waiting for something else to respond and is not working on something useful. Wouldn't it be great if whilst it is waiting for record 2 to arrive it could be saving record 1 into Drupal? So how about whilst it is waiting for record 1 to be saved to the Drupal database, it could already be asking for record 3?

The solution: Introducing the multi thread handler for Drush

So by multi threading we make use of potentially 'empty time'. In addition, concurrent applications take advantage of the multiple cores in modern servers. This means cores that were previously sitting idle are now working on bits of 'the problem'. The way multiple threads are generated is by executing a new processes via the PHP function proc_open. This means we have one Drush command to start the multi threading process with another Drush command which is then run multiple times with different parameters to deal with a part of the total problem. Tying this together is the multi threaded handler, or scheduler, which starts threads and watches them to see if they have completed. Once a thread has completed, the handler looks to see if there is any more work to do and then starts up another thread if there is, passing the necessary parameters into it. You can get the code for this from github.

Tutorial on how to use the multi threaded handler

Before we look at speeding up migrate, which has several interesting challenges itself to discuss, let's look how the multi threaded handler for Drush works through two example problems. The first is a very basic test application, the second is the archetypal concurrency problem in computer science, The Dining Philosophers.

Preparation

You will need a local install of Drupal 7, a folder at sites/all/drush and the mt.drush.inc file inside the sites/all/drush folder. Clear the cache to let Drush pickup the new file.

Simple multi threaded Drush command

The first example simply executes a command which sleeps for a random number of seconds before finishing. Run

drush help mt-test

to see the options for this command.

A test command for testing multi threading. Arguments: limit The total number of jobs Options: --threads The number of threads to use 

So if we run

drush mt-test 6 --threads=2

we say we want to work on 6 tasks using 2 threads. When run, each thread is an execution of the drush command drush mt-pause. Once a thread has completed this command, the parent drush command (mt-test) sees it has finished and checks to see if there are any more of the 6 original tasks which need doing. If there are, another thread is created which runs the drush mt-pause command again. Lets look at the functions which make this up in more detail. First, we have the hook_drush_command ... 

  /**
 * Implements of hook_drush_command().
 */ 
function mt_drush_command() { 
  $items = array(); 
  $items['mt-test'] = array( 
    'description' => 'A test command for testing multi threading.', 
    'arguments' => array( 
      'limit' => 'The total number of jobs', 
    ), 
    'options' => array( 
      'threads' => 'The number of threads to use', 
    ), 
    'bootstrap' => DRUSH_BOOTSTRAP_DRUPAL_ROOT, 
  ); 
  $items['mt-pause'] = array( 
    'description' => 'A test command for use with drush mt-test. Waits between 1 - 5 seconds before finishing', 
    'arguments' => array( 
      'name' => 'The name of this process, this will be the thread id', 
    ), 
    'bootstrap' => DRUSH_BOOTSTRAP_DRUPAL_ROOT,
  );
  return $items; 
} 

This should be fairly straight forward for anyone who has written Drush commands before. We are telling Drush that there are two new commands drush mt-test and drush mt-pause. The pause command is what is run by a single thread and looks like this: 

  /**
  * Pause for between 1 and 5 seconds. 
  */ 
function drush_mt_pause($name) { 
  $pause_time = rand(1, 5); 
  drush_print("Thread $name is working for $pause_time seconds ..."); 
  sleep($pause_time); 
} 

The drush mt-test command is the master command which needs to setup the threads. I provide a cut down version below with the error handling noise removed for clarity :- 

  /**
 * Multi threaded example.
 *
 * This is a very simple example of multi threaded management.
 * Each thread will be told to run the drush command:
 * drush mt-pause
 * This waits between 1 and 5 seconds before finishing.
 */ 
function drush_mt_test($limit = 10) { 
  $threads = drush_get_option('threads', 1);
  drush_print("Going to work on {$limit} jobs with {$threads} threads..."); 
  drush_thread_manager($limit, 1, $threads, '_mt_test_setup');
} 

Here we see that there is not a lot going on for the basic test command. We work out what the total number of threads we want is and tell the drush_thread_manager to start. Lets look at the function signature of the drush_thread_manager to understand what those arguments to it are: 

  /**
 * A multithreading manager for Drush.
 *
 * @param int $job_count 
 *   The total number of jobs to process
 * @param int $job_batch_size
 *   The number of jobs handed to each thread 
 * @param int $thread_count
 *   The maximum number of active threads
 * @param string $setup_function
 *   The function to generate the thread command. This is written for your 
 *   application its signature is: 
 *   my_setup_function($thread_id, $limit, $offset) : string; 
 *   Where $thread_id is the unique thread identifier, $limit is the total 
 *   number of records to process and offset is the starting point. The 
 *   function should return a string which is a shell command that the thread 
 *   will execute in a separate process. * @param string $teardown_function 
 *   A function which runs when a thread completes. It's signature is: 
 *   my_teardown_function($thread_id); 
 *   Your business logic can then run any cleanup which occurs when the 
 *   thread completes. 
 */
function drush_thread_manager($job_count, $job_batch_size, $thread_count, $setup_function, $teardown_function = ''); 

So it wants to know:

$job_count - the total number of things to work on
$job_batch_size - How many things each thread will work on in one execution, for the example application this is 1.
$thread_count - The maximum number of threads to use. It will never spawn more than this value to complete the job queue.
$setup_function - A function to run to generate the command which the thread executes
$teardown_function (optional) - A function which is run when a thread finishes
So the last thing we need to do for the example application is provide the setup function which creates the command which a thread will execute when it starts. 

  /**
 * A test multi threaded setup function.
 *
 * @param int $thread_id
 * An identifier for the thread which will execute this command.
 * @param int $batch_size
 * How many tasks this command should work on.
 * @param int $offset
 * The position in a queue of jobs for the first job this command
 * should work on.
 *
 * @return string
 * A command which can be executed on the command line. 
*/ 
function _mt_test_setup($thread_id, $batch_size, $offset) { 
  return "drush mt-pause $thread_id";
 } 

Here we see the function signature for the setup functions. They all will take the variables $thread_id, which is the identifier for the thread (a number from 0 to the maximum number of threads), $batch_size is the number of jobs the command should work on before finishing, for the example program this is 1. If we are running batches of jobs per thread then then thread will also need to know where in the list of jobs to start working on which is the $offset. For this example, this is not used. Hopefully this gives you a good overview of what is need to build a multi threaded Drush application. If you survived so far, lets look at something more complex...

The Dining Philosophers

This is an archetypal problem in computer science often used to teach concurrent techniques and highlight the need for methodologies to deal with multiple threads competing for the same resources. This is the problem of thread 1 and thread 2 trying to change the same thing at the same time. In the dining philosophers, imagine a circular dining table with 5 philosophers and 5 forks laid out such that there is one to the left and right of each philosopher. The philosophers do two things, think and then eat. In order to eat, a philosopher needs to pick up two forks, the one on their left and the one on their right. Philosophers do not release any fork they pick up until they have eaten. Because there are not enough forks for every philosopher to have two each, they are competing for the same resources. If philosopher 1 picks up the fork to their left, philosopher 2 cannot pick it up and so cannot eat until philosopher 1 has eaten and put down the forks to think. If all the philosophers pick up one fork, no philosopher will ever eat and they will stave to death - this is called a deadlock. To solve the problem, each fork is numbered from 1 to 5. Philosophers must always try and pick up the lowest number fork first. For all but the last philosopher this is the one on their right. For the last philosopher, between forks 5 and 1, this is the fork on their left. Edsger Dijkstra, the computer scientist who came up with this problem has proved that such a method will prevent deadlock. So, let us look at the code and see how we can implement locking using Drupal standard functionality to stop a thread from working when another thread has gained access to a resource. First we define two drush functions, as before, one is the master program and the other represents the sub threads (the philosophers in this case).

   /**
 * Implements of hook_drush_command().
 */
function mt_drush_command() {
  $items = array();
  $items['mt-dining-philosophers'] = array(
   'description' => 'Run an example of the dining philosphers problem',
   'arguments' => array(
     'number_of_philosophers' => 'The number of philosophers, must be at least 2', 
     'total_mouthfuls' => 'The number of times a philosopher eats before finishing',
     ), 
   );
  $items['mt-philosopher'] = array(
    'description' => 'A single philosopher. This is used by the drush command mt-dining-philosophers and is not intended to be run on its own', 
    'arguments' => array( 
      'name' => 'The name of this process, this will be the thread id', 
      'number_of_philosophers' => 'The number of philosophers, must be at least 2', 
      'total_mouthfuls' => 'The number of times the philosopher eats before finishing',
    ),
  );
  return $items; 
}

As we can see from the command descriptions, to run the program we would type drush mt-dining-philosophers 5 5 The master drush command looks like this (with the error checking clutter removed) ... 

  /**
 * An implementation of the dining philosophers problem.
 */
function drush_mt_dining_philosophers($number_of_philosophers, $total_mouthfulls) {
  $GLOBALS['number_of_philosophers'] = $number_of_philosophers;
  $GLOBALS['total_mouthfulls'] = $total_mouthfulls;
  drush_thread_manager($number_of_philosophers, 1, $number_of_philosophers, '_mt_philosophers_setup');
}

The thread setup function which generates the command to be a philosopher then looks like this ..

   /**
 * Thread setup function for dining philosophers. 
 */
function _mt_philosophers_setup($thread_id, $batch_size, $offset) { 
  $number_of_philosophers = $GLOBALS['number_of_philosophers']; 
  $total_mouthfulls = $GLOBALS['total_mouthfulls']; 
  $cmd = "drush mt-philosopher $thread_id $number_of_philosophers $total_mouthfulls"; 
  return $cmd;
}

Notice the use of $GLOBALS to pass information from the master command into the setup function. I've kept the implementation of this very procedural in line with the Drupal 7 architecture. A Drupal 8 implementation would ideally pass an object which implemented a threadExecutor interface into the drush_thread_manager and probably make use of the Symfony Process component, but that's a discussion for the future. The setup of the master function is very simple. The thread concurrency and resource management therefore happens in the threads themselves in this case and so they need to be aware that they are being multi threaded. Lets look at the code for the drush mt-philosopher command. 

  /**
  * A single philosopher. 
  */
function drush_mt_philosopher($name, $number_of_philosophers, $total_mouthfulls) { 
  $last_philosopher = $name == ($number_of_philosophers - 1);
  $low_fork = $last_philosopher ? 0 : "df_fork_{$name}"; 
  $high_fork = $last_philosopher ? "df_fork_{$name}" : 'df_fork_' . ($name + 1); 
  for ($count = 1; $count 

Here we see the first three lines decide which is the high or low fork for the philosopher. For most, this is the one with the same value as themselves (philosopher 1 picks up fork 1 first). For the last philosopher, this is the first fork (which has the value 0). Then for each mouthful they need to eat, the philosopher thinks then tries to eat with their two forks. Lets see how these two functions look ... 

  /** 
  * A philosopher thinks. 
*/ 
function _mt_philosopher_think($name, $count, $total_mouthfulls) { 
  $think_time = rand(1, 3); 
  drush_print("Philosopher $name is thinking... ($count/$total_mouthfulls)"); 
  sleep($think_time); 
} 

/** 
  * Philosopher tries to pick up two forks and eat. 
  */
function _mt_philosopher_eat($name, $low_fork, $high_fork, $count, $total_mouthfulls) { 
  $eat_time = rand(1, 3);
  _mt_philosopher_get_fork($name, $low_fork);
  _mt_philosopher_get_fork($name, $high_fork);
  drush_print("Philosopher $name is eating... ($count/$total_mouthfulls)"); 
  sleep($eat_time); 
  lock_release($low_fork); 
  lock_release($high_fork);
} 

Thinking is just a random wait. When eating the philosopher tries to pick up first their low value fork then their high value fork. They then eat (a random wait) and finally release the forks. Here we see the core Drupal 7 function lock_release($lock_name) releasing a lock so other threads might later acquire it. The last thing to see is the way the philosopher acquires the lock on the fork. 

  /** 
  * A philosopher picks up a fork. 
  */ 
function _mt_philosopher_get_fork($name, $fork_name) { 
  $waiting = TRUE; 
  $count = 0; 
  while ($waiting) { 
    if (lock_acquire($fork_name)) { 
      $waiting = FALSE; 
    }
    else {
      $count++; 
      if ($count > 30) { 
        throw new Exception("Philosopher $name starved to death waiting for fork $fork_name"); 
      }
      sleep(1); 
    } 
  }
} 

The philosopher repeatedly tries to get a lock on the fork using the Drupal 7 core function lock_acquire($lock_name). If this returns FALSE, another philosopher is using the fork. The philosopher waits 1 second and tries again. If he tries 30 times, this is a deadlock and the philosopher starves to death.

Coming next - Multi threading Part 2: making migrate move!

I hope these examples provide a good introduction to both concurrency and our Drush handler to provide it. In our next blog we will look at the promised practical example of multi threading migrate using this technique and the benefits and challenges that presents. Go to part 2 - How to make migrate move Picture credit: Benjamin D. Esham / Wikimedia Commons