Building a Robust Task Queue with PostgreSQL

At my current work, we're building a data analytics platform for marketing data. Imagine something like Excel for massive datasets. One key feature is the ability for customers to define formulas on column values, similar to Excel formulas.

Now, picture this: we're dealing with tens of millions of rows, and customers can define hundreds of rules that need to be applied across all this data. Our goal is to create a system that performs these operations efficiently and within our Service Level Agreements (SLAs).

Initially, a single-core processing approach was sufficient. However, as our data volume grew, we needed to significantly speed up processing to meet our SLAs. Fortunately, these operations are highly parallelizable, as the effect on each row is independent of the others.

Since our data was already stored in PostgreSQL, and given the [meme](https://www.amazingcto.com/postgres-for-everything/) that "Postgres can be used for anything," we decided to leverage its power as a task queue using the SELECT ... FOR UPDATE SKIP LOCKED feature.

Our primary challenge was to ensure that each worker could pick a unique task (a row in data_object) without stepping on each other's toes. We needed a mechanism that would:

  1. Prevent duplicate processing: Ensure no two workers process the same row.
  2. Distribute tasks fairly: Allow workers to pick available tasks efficiently.
  3. Handle failures gracefully: Ensure that if a worker fails, the task can be re-processed.

How it works

We have a table where each row corresponds to an idempotent task, which is indexed on id. We have multiple workers waiting for tasks. Each worker will pick tasks using the following procedure

START TRANSACTION;
SELECT * FROM data_objects FOR UPDATE SKIP LOCKED LIMIT 1;
-- Process the job
COMMIT;
  • FOR UPDATE: This clause locks the selected rows, preventing other transactions from modifying them until the current transaction is committed or rolled back.
  • SKIP LOCKED: This clause instructs PostgreSQL to skip any rows that are already locked by another transaction.

Multiple workers compete to claim the job. The database ensures that only one worker gets the lock. However in this case the other workers will move on to the next record. They will not wait. That's what SKIP LOCKED does.

Scaling Challenges and the Introduction of Work Batches

As our data volume continued to grow, we started to observe performance degradation. While SELECT ... FOR UPDATE SKIP LOCKED was effective initially, we encountered challenges when scaling to a significantly larger number of workers and tasks.

The primary bottleneck was lock contention on the data_objects table. With many workers attempting to acquire locks on individual rows, the database spent a significant amount of time managing lock requests, leading to increased latency and reduced throughput.

To mitigate this, we introduced a new table, work_batch, to manage tasks in batches. The work_batch table stores ranges of data_object IDs, representing batches of work. Here's a simplified schema:

CREATE TABLE work_batch (
    id SERIAL PRIMARY KEY,
    start_id INTEGER NOT NULL,
    end_id INTEGER NOT NULL,
    processed BOOLEAN DEFAULT FALSE
);
  1. Master Batch Creation: Before workers begin processing, a "master" process creates batches of work by inserting ranges of data_object IDs into the work_batch table. For example, a batch might represent 1000 consecutive IDs.
  2. Worker Batch Acquisition: Workers now acquire locks on rows in the work_batch table instead of individual rows in data_object. This significantly reduces lock contention.
  3. Worker Processing: Once a worker acquires a batch, it processes all the data_object rows within the start_id and end_id range.
  4. Batch Completion: After processing all rows in the batch, the worker marks the batch as processed in the work_batchtable.

Considerations: Building a Robust System

While work batches improved scaling, several factors are key for a reliable task queue:

  • Batch Size: Optimize for balance; smaller increases lock contention, larger risks worker failures.
  • Worker Heartbeats: Implement to detect failures and prevent stuck tasks.
  • Cleanup: Periodically remove completed batches to manage table growth.
  • Master Coordination: Ensure master process robustness and consider failure recovery.
  • Error Handling: Implement retries and dead letter queues for failed batches.
  • Monitoring: Track queue length, throughput, and errors; set up alerts.
  • Transactions: Use transactions for data integrity; manage size to avoid performance issues.
  • Connection Pooling: Use connection pools for efficient database access.
  • Idempotency: Ensure workers can safely re-process tasks.
  • Prioritization: If needed, prioritize tasks with a priority column.