Andrew Werner

Quick and Easy, Exactly-Once, Distributed Work Queues Using Serializable Transactions

Coordinating work across multiple processes almost always pops up in one form or another when building applications. There are well-known recipes to perform this coordination on top of both distributed coordination services like etcd or Apache Zookeeper/Curator, and single-machine SQL databases like Postgres. Given the strong consistency of CockroachDB, similar patterns always seemed possible but weren't well documented. This short post tries to provide a recipe to achieve correct, fault-tolerant, distributed coordination on top of CockroachDB.

I want to clarify that providing true execute-exactly-once semantics or mutual exclusion for work outside of the coordinating storage system is impossible in a fault-tolerant, highly available way (see Section 3.2 in the Jepsen etcd evaluation). In an arbitrarily asynchronous network where nodes may pause for unbounded amounts of time, we can’t know whether an operation has failed or is merely slow.

Fortunately, we can get pretty close. First, we can make some reasonable assumptions about failure detection. Second, we can provide a different guarantee: exactly-once acknowledgement. With this guarantee, only one worker will ever be able to successfully acknowledge completion of a given work item. If completing a task means writing its result to the database, this gives us exactly-once semantics 🎉.

The key concept that enables coordination over work items is the session. The session encapsulates coordinated failure detection. The system will refer to sessions when attempting to perform work. Work items can only be successfully consumed by workers which have a live session throughout the entire processing of a work item.

Imagine we have work items that we want to acknowledge having worked on exactly once. To do this we’ll need two tables, a sessions table and an items table. The trick is that we’re going to use the sessions table to track the liveness of our workers. Each worker will heartbeat its session to indicate its liveness. Workers will lock work items prior to processing by associating the item with their session.

We’ll have workers do 3 things continuously:

1) Heartbeat the sessions table to keep their session alive.
2) Poll the sessions table to see if there are any expired sessions which need to be deleted.
3) Poll the items table to claim work items.

For each claimed work item, we'll process it and then run a transaction which records its outcome and removes it from the queue. That transaction will ensure that the session under which the item was claimed is still valid when the result is recorded.
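
To make the shape of a worker concrete, here's a minimal structural sketch in Python (hypothetical; each of the three loops is sketched in the sections that follow):

import threading

def run_worker(loops):
    # `loops` holds the worker's three loop functions: the session heartbeat,
    # the expired-session reaper, and the item claim/process loop.
    threads = [threading.Thread(target=loop) for loop in loops]
    for t in threads:
        t.start()
    for t in threads:
        t.join()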

Sessions and Heartbeating

CREATE TABLE sessions (
    id
        UUID DEFAULT gen_random_uuid() PRIMARY KEY,
    heartbeated_at
        TIMESTAMPTZ NOT NULL
    -- Probably also a good idea to put some metadata about the session
    -- e.g. the hostname and pid which owns the session and maybe the
    -- timestamp at which it was created.
);

When a process starts up, it will need to create a session:

INSERT INTO sessions (heartbeated_at) VALUES (now()) RETURNING id

Then, the process will need to kick off a thread to periodically update the heartbeated_at timestamp to now(). This should generally happen quite a bit more frequently than the expiration interval (which you’ll also need to pick).

-- Here $1 is the session ID for the process.
UPDATE sessions SET heartbeated_at = now() WHERE id = $1

If the process fails to heartbeat, likely because its session has expired, then the worker process should exit, as it won’t be able to acknowledge any work it has currently claimed. This is the case where the system has decided the worker's session has failed.
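
As a concrete illustration, here's a minimal sketch of session creation and the heartbeat loop, assuming Python with a psycopg2 connection conn (note that psycopg2 uses %s placeholders where the queries above use $1):

import time

HEARTBEAT_INTERVAL = 1.0  # seconds; should be well below the expiration

def create_session(conn):
    # Register a new session for this process and return its id.
    with conn.cursor() as cur:
        cur.execute("INSERT INTO sessions (heartbeated_at) VALUES (now()) RETURNING id")
        (session_id,) = cur.fetchone()
    conn.commit()
    return session_id

def heartbeat_loop(conn, session_id):
    # Periodically refresh heartbeated_at. If no row was updated, the session
    # has expired and been deleted, so the worker should exit.
    while True:
        with conn.cursor() as cur:
            cur.execute("UPDATE sessions SET heartbeated_at = now() WHERE id = %s",
                        (session_id,))
            updated = cur.rowcount
        conn.commit()
        if updated != 1:
            raise SystemExit("session lost; exiting worker")
        time.sleep(HEARTBEAT_INTERVAL)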

Removing Expired Sessions

We want to ensure that live sessions are eventually able to claim work items that had previously been claimed by now dead sessions. We do this by detecting expired sessions and deleting them.

-- Here $1 is the chosen expiration as an INTERVAL.
DELETE FROM sessions WHERE heartbeated_at < now() - $1

Reasonable values for the heartbeat interval and expiration might be 1 and 5 seconds respectively. With these settings, it would take up to 5 seconds for a failed node's items to become claimable by another node. Note that during graceful shutdown, a process can remove its session cooperatively, before it expires, so its work can be reclaimed immediately.
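
Continuing the same hypothetical Python sketch, the reaper loop and cooperative shutdown might look like:

import time

EXPIRATION = '5 seconds'  # the chosen expiration, passed as an INTERVAL
REAP_INTERVAL = 1.0       # how often to look for expired sessions

def reap_loop(conn):
    # Delete expired sessions; the ON DELETE SET NULL foreign key on
    # items.claim (see below) releases any items they had claimed.
    while True:
        with conn.cursor() as cur:
            cur.execute("DELETE FROM sessions WHERE heartbeated_at < now() - %s::INTERVAL",
                        (EXPIRATION,))
        conn.commit()
        time.sleep(REAP_INTERVAL)

def shutdown(conn, session_id):
    # Graceful shutdown: delete our own session so any items we still hold
    # become claimable immediately rather than after the expiration.
    with conn.cursor() as cur:
        cur.execute("DELETE FROM sessions WHERE id = %s", (session_id,))
    conn.commit()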

Claiming and Processing Work

We'll need an items table to store work items. We could use an index to order the items or just grab them arbitrarily.

CREATE TABLE items (
    id
        UUID NOT NULL PRIMARY KEY,
    claim
        UUID REFERENCES sessions (id) ON DELETE SET NULL
    -- other fields for the item's details, a priority score, the time it
    -- was added, etc. A score or timestamp could be used to prioritize
    -- items (ORDER BY it when claiming), ideally backed by an index.
);

In our work claiming loop, while we have capacity for more work items, we'll periodically poll the database for a new item. The polling rate should be chosen based on the latency requirements of the application. Another consideration is the total number of worker nodes: if more nodes are polling for items, consider polling less frequently. Something on the order of 100-200ms seems pretty reasonable for many applications with tens of worker nodes.

We'll use the below query to find a work item and claim it. We could add an ORDER BY clause to prioritize work items based on properties.

-- Here $1 is the process's session ID.
UPDATE items SET claim = $1 WHERE claim IS NULL LIMIT 1 RETURNING id
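
In the same hypothetical Python/psycopg2 style, the claiming loop might look something like this (process_item, the callback that does the work and acknowledges it, is sketched in the next section):

import time

POLL_INTERVAL = 0.2  # seconds between polls when no work is available

def claim_loop(conn, session_id, process_item):
    # Repeatedly try to claim an unclaimed item; process_item is a callback
    # that processes and acknowledges the item.
    while True:
        with conn.cursor() as cur:
            cur.execute("UPDATE items SET claim = %s WHERE claim IS NULL "
                        "LIMIT 1 RETURNING id",
                        (session_id,))
            row = cur.fetchone()
        conn.commit()
        if row is None:
            time.sleep(POLL_INTERVAL)  # nothing to claim right now
        else:
            process_item(conn, session_id, row[0])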

Here’s the magic that achieves exactly-once semantics: to finish processing an item, you run a transaction that records the item's outcome and also deletes the item from the queue. It’s critical that this transaction verifies that the process’s session still holds the claim on the item. That looks like:

-- do some processing of the item.
BEGIN;

-- ... Record the results of processing the item.

-- Here $1 is the item id and $2 is the process's session.
DELETE FROM items WHERE id = $1 AND claim = $2;
-- SQL clients give access to the number of rows affected by a statement.
-- Ensure that the DELETE affected 1 row. If it didn't, then your session
-- has lost its claim and the transaction needs to be aborted with a
-- ROLLBACK. Otherwise, proceed to COMMIT.

COMMIT;

If the above transaction commits, because of the properties of serializable isolation and transactions, you’ll be sure that the item was processed exactly once!
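
To make the rows-affected check concrete, here's a sketch of processing and acknowledgement, continuing the hypothetical Python/psycopg2 examples above; do_work and the results table are placeholders for whatever your application actually does with an item:

def do_work(item_id):
    # Placeholder for the application's actual processing of the item.
    return 'processed %s' % item_id

def process_item(conn, session_id, item_id):
    result = do_work(item_id)
    with conn.cursor() as cur:
        # Record the result and delete the item in a single transaction.
        # The results table here is a stand-in for your application's output.
        cur.execute("INSERT INTO results (item_id, result) VALUES (%s, %s)",
                    (item_id, result))
        cur.execute("DELETE FROM items WHERE id = %s AND claim = %s",
                    (item_id, session_id))
        if cur.rowcount != 1:
            # We no longer hold the claim: our session expired and the item
            # may have been claimed by another worker. Abort the transaction.
            conn.rollback()
            return
    conn.commit()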

Top comments (4)

Spas Bojanov

Great article!

How would you approach use cases where the workers need to "commit" their result outside of the database? For example, backing up a table to a file system. Say the task is to back up table foo to the file /backups/foo.csv. The worker can create a temp file with the backup and then rename it to the final name /backups/foo.csv to make sure that only one worker succeeds. Unfortunately, the worker won't be able to do that in the same transaction that deletes the work item. I guess one approach may be to rename the temp file before deleting the item from the database, hence actually "committing" the work before the item is deleted. If the worker dies immediately after, the next worker that picks up the work item would notice that the work has been done by checking for the existence of the file named /backups/foo.csv and will delete the item from the table.
I am curious what your thoughts are on the above.

Andrew Werner

Yeah, the fact that the backup is itself an idempotent operation means that there's no hazard here due to the interaction with the external system. Even if the second iteration could not detect that the first iteration had completed its work, everything here would be okay. Detecting the file relies on the fact that rename is an atomic operation which itself indicates complete success.

The problem becomes much trickier if instead the task has non-idempotent side-effects. Coordinating with sending a push-notification for example is a real bummer. Sometimes those sorts of systems will provide a mechanism whereby the publishing of the event is made to be idempotent. For example documentation.onesignal.com/docs/i....

Georgy Savva

Thanks a lot for shedding light on this interesting topic!
I have a few questions about scalability and indexes in this type of workflow. Maybe you could help me figure them out.

  1. Let's imagine that we have our queue (CockroachDB) and workers handling a throughput of N tasks per second. For the sake of this example, the items table has only one index, on the id field (primary key).
    Now the load increases and we need to handle 4N tasks per second. Scaling workers is simple: I just increase the number of workers by 4 and it's fine. I should also increase the number of CockroachDB nodes by 4 to scale the queue part as well. That will work perfectly when we put items into the queue, because each request goes to a single leaseholder and requests are distributed evenly based on item id. But I am a bit concerned about when we get items from the queue, i.e. this query:

    UPDATE items SET claim = $1 WHERE claim IS NULL LIMIT 1 RETURNING id

    Does LIMIT 1 mean that CockroachDB can serve the data from the gateway node if it contains a suitable record and doesn't need to contact all the other leaseholders in the cluster? If yes, that's great, because as far as I understand, without LIMIT 1 CockroachDB will need to search all leaseholders, i.e. involve all nodes, and increasing the number of nodes by 4 won't scale this part very well. Or am I missing something?

  2. You mentioned that we can add a priority for items. Let's imagine the items table has a created_at timestamp field, and it still has only one index, on the id field.
    We want our queue to act as a FIFO, i.e. we get the items with the lowest created_at:

    UPDATE items SET claim = $1 WHERE claim IS NULL ORDER BY created_at LIMIT 1 RETURNING id

    Does it mean that, again, CockroachDB needs to get items from all leaseholders to pick the one with the lowest created_at and hence involve all nodes in the cluster? Is it linearly scalable with the number of nodes?

  3. Let's assume that we do have an index on the claim field in the items table. All new records that we insert into the items table have NULL in the claim field. Doesn't that mean we always insert into the same range, as when we have an index on an always-increasing field? Can it become a bottleneck? And what happens if this single range for the NULL value exceeds the limit (64 MB)? Can it be split?

Andrew Werner

I feel like I should start by saying that this scheme is especially well suited to tasks where the processing time is much larger than the time it takes to acquire the lease. Hopefully that contextualizes some of the scalability concerns you raise, as they are generally valid.

  1. When performing a LIMIT 1 query here, we'll always scan the primary index in its sorted order. As you pointed out, this will require going to at least one leaseholder and potentially many. Furthermore, it will be subject to contention. In the best case, concurrent requests to acquire will serialize in memory due to the SELECT FOR UPDATE read locks added in 20.1. The point of this design is not to optimize concurrent lease acquisition; we'll explore how to better optimize concurrency for a different set of assumptions below.

  2. I was imagining that in the FIFO scenario, to make it efficient, we'd either add the ordering condition to the primary key or add a secondary index. Perhaps the best would be to add a partial secondary index (a new feature in 20.1) over unclaimed items. That way you'd at least only need to go to one leaseholder rather than all of them.

  3. We can split secondary index ranges, so I don't think this is really a problem. It is the case that you'll constantly be contending to read and update that index, which is likely to lead to serialization of concurrent requests. Fortunately, the more pessimistic locking should lead to reasonable behavior here rather than the backoff-retry loops that might have occurred in previous versions.

As I noted at the beginning, none of these scalability concerns really come up if the work being done for each task takes orders of magnitude longer than acquiring the lease. That was the envisioned use case for this article, and I should have spelled that out more clearly. A different set of requirements would be an event processing system that wants to process short events at high throughput. In that setting, any contention reading or processing events would be completely detrimental to scalability. It is also the case that in that world one is likely to be willing to accept slightly different performance characteristics. For that set of requirements I'd encourage us to look towards other pub-sub systems like Kafka. Kafka provides parallelism by statically sharding the topic. This means that each partition becomes a FIFO but the whole topic is no longer totally ordered. In the initial presentation of this article we didn't have ordering, so adding a level of coarse-grained locking above the work pool is easy to imagine and would help to avoid contention.

Let's for simplicity just talk about static partitioning. We could imagine later talking about dynamic partitioning but it'd be slightly more involved. Even this is going to appear somewhat kludgy given we're just talking pseudo-code but I hope you get the picture.

To avoid contention we're going to want to add another layer of indirection whereby sessions grab locks on portions of the items table.

CREATE TABLE session_parts (
    id UUID REFERENCES sessions (id) ON DELETE CASCADE,
    part INT8,
    PRIMARY KEY (id, part)
);
CREATE TABLE items (
    id UUID NOT NULL PRIMARY KEY,
    claim UUID,
    claim_part INT8,
    CONSTRAINT fk_session_parts
        FOREIGN KEY (claim, claim_part)
        REFERENCES session_parts (id, part)
        ON DELETE SET NULL
);

So as written I'm leaving a lot of logic up to the client, but I envision that the client would be configured with a certain number of parts that should be claimed. Then the sessions claim parts by keeping track of the set of existing sessions and choosing parts based on lexical order. For example, imagine there are 2 sessions and we want to split the work into 8 parts.

INSERT INTO sessions (id, heartbeated_at) VALUES ('b4b3722f-ac65-4495-afca-5883da1ff861', now()), ('c11f8e9b-1d67-493a-98d1-a7a3f645ab2a', now());

These two sessions would notice each other and the first would grab parts 0-3 and the second would grab parts 4-7. Then, each session would use its parts to break up the item keyspace. For example, for part 0 you might do:

UPDATE
    items
SET
    claim = $1, claim_part = $2
WHERE
    claim IS NULL
    AND id >= '00000000-0000-0000-0000-000000000000'
    AND id < '20000000-0000-0000-0000-000000000000'
LIMIT
    1
RETURNING
    id;

Let's say a new session popped up. The other sessions would eventually detect that (let's say with polling though there could be other means). Then they'd determine the need to drop some parts.

INSERT INTO sessions (id, heartbeated_at) VALUES ('c006dd7c-1880-40ce-a8e6-ac07aa22163d', now());

In this case the new breakdown would be (0, 1), (2, 3, 4), (5, 6, 7): the first session would drop parts 2 and 3 and the second session would drop part 4, leaving those parts for the new session.

That way each part's claims would be completely uncontended. The downsides are slower failover to pick up a dead session's claims and increased latency due to imbalance in the partitions. For what it's worth, this is exactly how Kafka consumer groups work.
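
A minimal sketch of that client-side part-assignment logic (hypothetical, just to illustrate the lexical-order scheme) could be:

def parts_for_session(session_id, all_session_ids, total_parts=8):
    # Sort the live session ids and give each session a contiguous chunk of
    # the parts based on its position in that order. With 2 sessions and 8
    # parts this yields 0-3 and 4-7; with 3 sessions it yields (0, 1),
    # (2, 3, 4), and (5, 6, 7), matching the example above.
    members = sorted(all_session_ids)
    idx = members.index(session_id)
    start = idx * total_parts // len(members)
    end = (idx + 1) * total_parts // len(members)
    return list(range(start, end))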

Hope this all made sense!