26th November 2013

Multi Processing Part 3: Jumping the Drupal Queue

John Ennew
Technical Director

In Part 1 of this series, we introduced the multi threading handler for Drush. In Part 2, we saw how the handler could be used to process migrations in parallel in order to speed them up. In Part 3, we look at using the handler with the Drupal Queue subsystem.

The Drupal Queue is used for storing items for later processing. Each item placed on the Queue is a potential work package which something else must come along and process later. Typically you would write a Drush script or a hook_cron implementation to handle this. If you do it with Drush, it can be enhanced with the Drush multi threaded handler, which can potentially complete the work faster. What we are doing is basically 'opening more checkouts' - turning one long queue into several smaller queues.

Note that this process is only useful if you do not care about the order in which the items in the Queue are processed, because you cannot predict the order in which the threads will process them.

An example implementation is provided in the GitHub code repository for this project, in the file mtq.drush.inc. Put this in your sites/all/drush directory alongside the mt.drush.inc file and clear the cache with drush cc all. You can then run the example multi threaded Queue processor with:

drush mtq-process --threads=2 --limit=100

This code populates an example Queue called 'mtq' with 100 items and then processes them in parallel with two threads.

As with the previous migrate example in Part 2, don't go crazy with the number of threads. You will need to find the best combination of parameters for your particular application. We've found that eight threads typically works best on a single server, but you might be able to handle more depending on your setup. As the number of threads increases, the bottleneck tends to move into the database, so watch your DB server for signs of high load.

How to guide

The code is composed of two Drush commands: 

/**
 * Implements hook_drush_command().
 */
function mtq_drush_command() { 
  $items = array();
  $items['mtq-process'] = array(
    'description' => 'A command to process items on the Queue.', 
    'options' => array(
      'threads' => 'The number of threads to use',
      'limit' => 'The total number of jobs to put on the Queue',
      'batch_size' => 'How many items from the Queue to process in each thread. Default is 10.', 
    ),
  );
  $items['mtq-consumer'] = array(
    'description' => 'A single consumer process.',
    'arguments' => array(
      'limit' => 'The number of jobs to process on the Queue',
    ),
  ); 
  return $items; 
} 

mtq-process is the multi threading command which manages the threads. mtq-consumer is the code for one thread which processes items from the Queue. 
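To make the split between the two commands concrete, here is a minimal sketch in plain PHP of how a manager could carve a job count into batches and build one consumer command per batch. The function name mtq_example_plan_batches is hypothetical, and the scheduling shown (fixed-size batches handed out in order, with a shorter final batch) is an assumption about the general approach, not a copy of the handler in mt.drush.inc.

```php
<?php
/**
 * Hypothetical helper: plan the consumer commands for a job count.
 *
 * Not part of mt.drush.inc; it only illustrates the batch arithmetic.
 */
function mtq_example_plan_batches($job_count, $batch_size) {
  $commands = array();
  if ($batch_size < 1) {
    // Guard against a zero or negative batch size, which would never finish.
    return $commands;
  }
  for ($offset = 0; $offset < $job_count; $offset += $batch_size) {
    // The final batch may be smaller than $batch_size.
    $size = min($batch_size, $job_count - $offset);
    $commands[] = "drush mtq-consumer $size";
  }
  return $commands;
}

// 100 items in batches of 10 gives 10 consumer runs, shared between
// however many threads are allowed to run at once.
$plan = mtq_example_plan_batches(100, 10);
print count($plan) . " batches, first command: {$plan[0]}\n";
```

With two threads and this plan, each thread would end up running roughly five consumer commands one after another, although which thread picks up which batch is not predictable.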

/**
 * Process the Queue, multi threaded.
 */
function drush_mtq_process() { 
  $time = time();
  $threads = drush_get_option('threads', 1); 
  $limit = drush_get_option('limit', 100); 
  $batch_size = drush_get_option('batch_size', 10); 

  // Populate the Queue with dummy data. 
  _mtq_populate_queue($limit); 
  $queue = DrupalQueue::get('mtq', TRUE); 
  $queue_size = $queue->numberOfItems(); 
  try { 
    drush_print("Going to work on {$queue_size} items from the Queue with {$threads} threads..."); 
    drush_thread_manager($queue_size, $batch_size, $threads, '_mtq_setup'); 
  } catch (Exception $e) { 
    drush_set_error($e->getMessage()); 
  } 
  $time = time() - $time; 
  drush_print("Time taken with {$threads} threads: {$time} seconds");
}

/**
 * A multi threaded setup function.
 *
 * @param int $thread_id
 *   An identifier for the thread which will execute this command.
 * @param int $batch_size
 *   How many tasks this command should work on.
 * @param int $offset
 *   The position in a Queue of jobs for the first job this command
 *   should work on.
 *
 * @return string
 *   A command which can be executed on the command line.
 */
function _mtq_setup($thread_id, $batch_size, $offset) { 
  return "drush mtq-consumer $batch_size";
} 

This is the code for the multi threaded Drush command you invoke yourself. The function _mtq_populate_queue would not be needed in your own implementation; it is provided as an example of how to add items to the Queue. The rest of the function gets the size of the Queue and tells the multi thread handler to work on that many items in small batches. A batch is the number of items from the Queue a thread will work on before finishing. The _mtq_setup callback builds the command line that each thread runs, passing the batch size to mtq-consumer as its limit argument.
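Since _mtq_populate_queue is not listed in this post, the following is a minimal sketch of what such a function could look like, not the version from the repository. The names mtq_example_populate and MtqExampleMemoryQueue are hypothetical. The in-memory class only stands in for a Drupal queue so that the snippet runs outside Drupal; inside Drupal 7 you would pass DrupalQueue::get('mtq', TRUE) instead, whose createItem() and numberOfItems() methods the stand-in mimics.

```php
<?php
/**
 * Hypothetical stand-in for a Drupal queue, for illustration only.
 */
class MtqExampleMemoryQueue {
  protected $items = array();

  public function createItem($data) {
    $this->items[] = $data;
    return TRUE;
  }

  public function numberOfItems() {
    return count($this->items);
  }
}

/**
 * Hypothetical populate function: add $limit dummy items to a queue.
 *
 * Each item carries an 'id' key, which is what _mtq_process_item() prints.
 */
function mtq_example_populate($queue, $limit) {
  for ($i = 1; $i <= $limit; $i++) {
    $queue->createItem(array('id' => $i));
  }
}

$queue = new MtqExampleMemoryQueue();
mtq_example_populate($queue, 100);
print $queue->numberOfItems() . " items queued\n";
```

In a real application the items would be added wherever the work originates, for example in a form submit handler or an import script, rather than by the processing command itself.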

/**
 * Consume items from the Queue.
 *
 * @param int $limit
 *   The maximum number of items to consume.
 */
function drush_mtq_consumer($limit = 10) {
  $queue = DrupalQueue::get('mtq', TRUE);
  for ($count = 0; $count < $limit; $count++) {
    if ($item = $queue->claimItem()) {
      $transaction = db_transaction(__FUNCTION__);
      try {
        // Do something with the item.
        _mtq_process_item($item->data);
      } catch (Exception $e) {
        $transaction->rollback();
        drush_set_error($e->getMessage());
        return;
      }
      // Unset the transaction to force a commit.
      unset($transaction);
      $queue->deleteItem($item);
    }
  }
}

/**
 * Process an item on the Queue.
 */ 
function _mtq_process_item($data) { 
  drush_print("Processing {$data['id']}"); 
  usleep(rand(100000, 1000000)); 
}
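The claim, process, delete cycle used by drush_mtq_consumer can be tried outside Drupal with a small stand-in queue. This is only a sketch under stated assumptions: MtqExampleClaimQueue, mtq_example_consume and mtq_example_worker are hypothetical names, the stand-in ignores lease times, and there is no database transaction involved. It shows that an item whose processing throws an exception is never deleted, so it remains on the queue.

```php
<?php
/**
 * Hypothetical in-memory queue mimicking claimItem() and deleteItem().
 */
class MtqExampleClaimQueue {
  protected $items = array();
  protected $claimed = array();

  public function __construct(array $payloads) {
    foreach ($payloads as $id => $data) {
      $item = new stdClass();
      $item->item_id = $id;
      $item->data = $data;
      $this->items[$id] = $item;
    }
  }

  public function claimItem() {
    foreach ($this->items as $id => $item) {
      if (empty($this->claimed[$id])) {
        $this->claimed[$id] = TRUE;
        return $item;
      }
    }
    return FALSE;
  }

  public function deleteItem($item) {
    unset($this->items[$item->item_id], $this->claimed[$item->item_id]);
  }

  public function numberOfItems() {
    return count($this->items);
  }
}

/**
 * Hypothetical consumer: stops at the first failure, like the example.
 */
function mtq_example_consume($queue, $limit, $worker) {
  $done = 0;
  for ($count = 0; $count < $limit; $count++) {
    $item = $queue->claimItem();
    if (!$item) {
      // Nothing left to claim, so stop early.
      break;
    }
    try {
      call_user_func($worker, $item->data);
    }
    catch (Exception $e) {
      // The item is not deleted, so it stays on the queue.
      return $done;
    }
    $queue->deleteItem($item);
    $done++;
  }
  return $done;
}

// Hypothetical worker that fails on the item with id 3.
function mtq_example_worker($data) {
  if ($data['id'] == 3) {
    throw new Exception('Simulated failure');
  }
}

$queue = new MtqExampleClaimQueue(array(
  array('id' => 1), array('id' => 2), array('id' => 3), array('id' => 4),
));
$done = mtq_example_consume($queue, 10, 'mtq_example_worker');
print "$done processed, " . $queue->numberOfItems() . " left on the queue\n";
```

One small variation from the example consumer is the early break when nothing can be claimed, which avoids looping up to the limit on an empty queue.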

drush_mtq_consumer is the code for the individual consumer thread. It claims the next item from the Queue and passes it to a helper function, _mtq_process_item, to process. The db_transaction inside drush_mtq_consumer is a useful pattern: if processing fails, any database changes made for that item are rolled back and the item is not deleted, so the item and its associated data are preserved on the Queue.

In most applications, the only thing you need to replace in the consumer code is the helper function _mtq_process_item, which deals with a single item from the Queue. The rest of the code is all boilerplate to support multi threading Queues.

As ever, let us know how you get on and share your thoughts and ideas below. Happy threading!