Tuesday, January 22, 2013

Worker Queue System

In the good old days, Polyvore started out with a very simple, traditional LAMP architecture. Everything more or less directly accessed a bank of MySQL servers, both to service web requests and to run batch housekeeping jobs. But our traffic and data set have kept growing and growing. We started to experience massive spikes in DB load when nightly batch jobs kicked off. Jobs that used to complete in 3 minutes started to take 5 hours.

Fortunately, our infrastructure team has a ton of experience dealing with scalability issues. Their solution was to use a worker queue system to break up massive jobs into smaller chunks, executed by a bank of workers. This approach lets us use our machines more efficiently by spreading load throughout the day, and lets us scale jobs by executing chunks in parallel on different worker machines.

Since we had used RabbitMQ before, we considered using it as the building block of our worker queue system as well. However, we quickly found that RabbitMQ falls short in two important ways:

  • RabbitMQ is essentially a generic message bus. This means we would have needed to extend its functionality to make it a full-fledged worker queue system.
  • Once a task is added to a RabbitMQ queue, there is no native way to inspect it while it is queued. That means we wouldn't be able to check which worker is handling a task, dedupe tasks, and more.

After a bit of research we decided to use Gearman, a generic framework for farming out work to other machines or processes. It was a great fit for our needs, especially since we use Perl extensively here at Polyvore and Gearman has a Perl client and APIs. In addition, we had already deployed Cassandra in production, and Gearman integrates well with Cassandra as its persistent storage.

Our implementation ended up being a light wrapper around Gearman. Our API is very simple:

A way to push a new task onto a named queue:

$queue->send_task({ channel => 'xyz', payload => $payload });

And a worker for consuming tasks in a given queue:

package Polyvore::Worker::XYZ;
use base qw(Polyvore::Worker);

# process is called with each task on the queue
sub process {
    my ($self, $payload) = @_;

    # do stuff

    # driver will auto-retry a few times
    die $exception if ($error);
}

# instantiate a worker and attach to channel to start consuming tasks.
Polyvore::Worker::XYZ->new({ channel => 'xyz' })->run();

The driver for workers will automatically retry the task a few times (with progressive back-off) if an exception occurs. This is very handy in the world of finicky Facebook, Twitter, etc. APIs. The worker system is integrated with our stats collection system, which keeps track of jobs processed, time per task, exceptions and more.
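
Conceptually, the driver's retry loop looks something like the sketch below. The attempt count, delays, and the run_with_retries name are made up for illustration; only the die-to-retry contract comes from our actual API.

use strict;
use warnings;

# Sketch of the driver's retry behavior: attempt the task, back off
# progressively between failures, give up after a few tries. The limits
# here are illustrative.
sub run_with_retries {
    my ($worker, $payload) = @_;
    my $max_attempts = 3;

    for my $attempt (1 .. $max_attempts) {
        my $ok = eval { $worker->process($payload); 1 };
        return 1 if $ok;

        warn "attempt $attempt failed: $@";

        # progressive back-off: 2s, 4s, 8s, ...
        sleep(2 ** $attempt) if $attempt < $max_attempts;
    }

    return 0;    # retries exhausted; the failure is reported to our stats system
}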

We implemented other useful features as well. We have a worker manager that distributes worker processes and tasks based on the worker cluster's load and on queue lengths; it is preferable to let the pending task queue grow longer than to overload the worker cluster. We also implemented a dependency protocol in which a worker task can declare itself as dependent on other tasks in the system; that task won't execute until all of its dependencies are complete.
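
To give a feel for the dependency protocol, here is a hypothetical sketch; the depends_on key, the task id returned by send_task, and the is_complete call are illustrations of the idea rather than our exact internal API.

# Enqueue a parent task, then a task that should only run once the
# parent has completed. All names below are illustrative.
my $parent_id = $queue->send_task({
    channel => 'thumbnails',
    payload => { image_id => 42 },
});

$queue->send_task({
    channel    => 'publish',
    payload    => { image_id => 42 },
    depends_on => [$parent_id],
});

# Inside the driver, before handing a task to a worker:
sub ready_to_run {
    my ($queue, $task) = @_;
    for my $dep_id (@{ $task->{depends_on} || [] }) {
        return 0 unless $queue->is_complete($dep_id);
    }
    return 1;    # no pending dependencies; safe to dispatch
}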

We use the worker queue system both for scaling our backend processes and for performing asynchronous front-end tasks. For example, our users post a ton of content to external services. Done synchronously, these post operations can hold up the response for anywhere from 5 to 30 seconds (or fail entirely and have to be retried). Using the worker queue system, we perform these tasks asynchronously in the background and deliver a very responsive user experience.
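
In practice the web request just enqueues a task and returns immediately, along these lines; the channel name and payload fields here are illustrative.

# Instead of posting to the external service inside the web request,
# enqueue a task and respond right away; a worker performs the post in
# the background and the driver retries it on failure.
$queue->send_task({
    channel => 'external_post',
    payload => {
        user_id => $user_id,
        service => 'facebook',
        set_id  => $set_id,
    },
});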

Today, we have over 40 worker processes and handle over 18 million tasks per day.

Challenges

Sharding batch jobs

We started out by writing simple jobs that got all their data in one SQL statement. That worked for a while, until the number of rows in the DB grew to the point where the select would make our read slaves keel over. We have since been sharding our jobs so that they operate on smaller chunks, typically by id range. Instead of processing all 1M rows at once, we break the job into 1,000 ranges of 1K ids each and treat each range as a task for the worker system.
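
A sharded job ends up looking roughly like this; the chunk size, table, and channel names are illustrative, and $dbh is assumed to be an already-connected DBI handle.

use strict;
use warnings;

# Break a large table scan into id ranges and enqueue each range as a
# separate task; each worker then only touches its own slice.
my $chunk_size = 1_000;
my ($min_id, $max_id) = $dbh->selectrow_array(
    'SELECT MIN(id), MAX(id) FROM items'
);

for (my $start = $min_id; $start <= $max_id; $start += $chunk_size) {
    $queue->send_task({
        channel => 'recategorize',
        payload => { min_id => $start, max_id => $start + $chunk_size - 1 },
    });
}

# Each worker then runs something like:
#   SELECT ... FROM items WHERE id BETWEEN ? AND ?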

Sharding to even out load based on data density

Some of the problems we solve with our worker queue system have interesting characteristics: to maximize efficiency, we have to split the data into buckets that are not equal in size. For example, we use machine learning to categorize items that we import into our index. We categorize new items incrementally, but we also re-categorize older items that have changed recently. Since the distribution of updates is biased toward newer items, we create inverse-log sized buckets to even out the processing time for each group of items. This gives us larger buckets (~10M items) of old items with few changes, and smaller buckets (~10K items) of new items with more changes.
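
One way to picture the bucketing: walk backwards from the newest id and let each bucket be an order of magnitude larger than the previous one. The growth factor and starting size below are illustrative, not our exact parameters.

use strict;
use warnings;

# Build inverse-log sized buckets: new (frequently updated) items land in
# small buckets, old (rarely updated) items in progressively larger ones.
sub log_buckets {
    my ($min_id, $max_id) = @_;
    my @buckets;
    my $size = 10_000;            # newest bucket: ~10K items
    my $end  = $max_id;

    while ($end >= $min_id) {
        my $start = $end - $size + 1;
        $start = $min_id if $start < $min_id;
        push @buckets, { min_id => $start, max_id => $end };
        $end  = $start - 1;
        $size *= 10;              # older buckets grow toward ~10M items
    }
    return @buckets;              # each bucket becomes one task on the queue
}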

Testing

We have a great development environment that allows each of us to work in multiple checkouts. Each checkout can be previewed against development and production databases. We also have a per-checkout test environment, which lets us run our test suite against a particular checkout with its own isolated mysqld instance, Cassandra instance, and so on. We also run per-user worker queues in unit tests.
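
The per-user queues boil down to namespacing the channel, along these lines; this is an illustration of the idea rather than our actual test harness.

# Prefix the channel with the developer's username so tests running in
# different checkouts never consume each other's tasks.
my $channel = $ENV{USER} . '_xyz';

$queue->send_task({ channel => $channel, payload => $payload });
Polyvore::Worker::XYZ->new({ channel => $channel })->run();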

Worker Queues vs. Map/Reduce

We use Hadoop for big data analysis. Currently we use it for one specific analysis on a subset of our data, but we plan to expand our Hadoop deployment so we can run batch analysis on a larger portion of our data and across more use cases. Hadoop obviously lets us analyze our data in ways we couldn't before, but it also raises the question of which tasks are best suited to our worker queue system and which benefit more from map/reduce.

Our worker queue system is great for procedural tasks such as sending user notifications and emails, posting to a user's Facebook wall, or extracting metadata from uploaded images. In addition, we use the worker queue system for scheduling Hadoop jobs. All of these tasks are asynchronous, independent, and sometimes require a retry mechanism.
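
Scheduling a Hadoop job from the worker queue is just another worker whose process() shells out to the Hadoop CLI. The package name, payload fields, and command below are a sketch, not our production code.

package Polyvore::Worker::HadoopJob;
use strict;
use warnings;
use base qw(Polyvore::Worker);

# Kick off the Hadoop job described by the task payload; dying on failure
# lets the driver's retry mechanism re-submit it.
sub process {
    my ($self, $payload) = @_;

    my @cmd = ('hadoop', 'jar', $payload->{jar}, $payload->{class},
               @{ $payload->{args} || [] });

    system(@cmd) == 0
        or die "hadoop job failed with exit status $?";
}

1;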

Summary

Worker queues are a great way to scale batch jobs and to increase the utilization of compute resources by spreading load and avoiding spikes, which makes them a useful building block of a scalable architecture. They also let you deliver a better user experience by performing long, blocking tasks asynchronously. As we expand our usage of Hadoop, we will continue to assess which tasks are better suited to our worker queue system and which ones benefit from Hadoop's map/reduce design pattern. Using the right tool for the job is an important principle, and designing and implementing simple, scalable tools lets us uphold it.
