Duplicate key errors in Postgres queue-like table when rows should be locked by SELECT FOR UPDATE SKIP LOCKED

2 votes
1 answer
457 views
I have a table called queue with items that need to be processed:
CREATE TABLE public.queue (
    id serial NOT NULL
        CONSTRAINT queue_pkey
            PRIMARY KEY
);
Another table, process, represents processed items from the queue (e.g. a report has been generated). In reality there are several such tables (several processes need to be performed on each item). There is a one-to-one relation between queue and process: each item can be processed only once.
CREATE TABLE public.process (
    id            serial  NOT NULL
        CONSTRAINT process_pkey
            PRIMARY KEY,
    queue_item_id integer NOT NULL
        CONSTRAINT process_queue_item_id_key
            UNIQUE
        CONSTRAINT process_queue_item_id_8953ec7b_fk_datastore
            REFERENCES public.queue
            DEFERRABLE INITIALLY DEFERRED
);
Here are some test data:
BEGIN;
TRUNCATE TABLE queue, process
    RESTART IDENTITY CASCADE;
INSERT INTO queue
SELECT GENERATE_SERIES(1, 10000);
COMMIT;
The worker that processes the items is implemented as follows. The code uses Django (a Python web framework), but I am convinced that my error is not caused by Django or its ORM. (For simplicity, there is no termination condition.)
while True:
    with transaction.atomic():
        queue_items = Queue.objects \
                          .filter(process=None) \
                          .order_by() \
                          .select_for_update(skip_locked=True, of=('self',))[:8]

        print('Generating report...')
        time.sleep(0.1)

        Process.objects.bulk_create(
            (Process(queue_item=q)
             for q in queue_items)
        )
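For the record, the same loop without the ORM looks roughly like this (a minimal psycopg2 sketch; the DSN and the `work` helper are placeholders for illustration, not my actual setup):

```python
# Rough raw-SQL equivalent of the Django worker above.
CLAIM_SQL = """
    SELECT queue.id
      FROM queue
      LEFT OUTER JOIN process
        ON queue.id = process.queue_item_id
     WHERE process.id IS NULL
     LIMIT %s
       FOR UPDATE OF queue SKIP LOCKED
"""


def work(dsn, batch_size=8):
    # Imported lazily so the SQL above can be inspected
    # without the driver installed.
    import psycopg2

    conn = psycopg2.connect(dsn)
    while True:
        with conn:  # one transaction per iteration (BEGIN ... COMMIT)
            with conn.cursor() as cur:
                cur.execute(CLAIM_SQL, (batch_size,))
                ids = [row[0] for row in cur.fetchall()]
                if not ids:
                    break
                # print('Generating report...'); time.sleep(0.1)
                cur.executemany(
                    "INSERT INTO process (queue_item_id) VALUES (%s)",
                    [(i,) for i in ids],
                )
```

The `with conn:` block commits on success, so the `FOR UPDATE` locks are held until the `INSERT` has committed, exactly as in the ORM version.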
Here is a transcript of the SQL queries that are sent to the database:
-- while True:

BEGIN;

SELECT queue."id"
FROM queue
         LEFT OUTER JOIN "process"
                         ON (queue."id" = "process"."queue_item_id")
WHERE "process"."id" IS NULL
LIMIT 8 FOR UPDATE OF queue SKIP LOCKED;

-- print('Generating report...')
-- time.sleep(0.1)

INSERT INTO "process" ("queue_item_id")
VALUES (1),
       (2),
       (3),
       (4),
       (5),
       (6),
       (7),
       (8)
RETURNING "process"."id";

COMMIT;
If I start one worker, the queue is processed perfectly fine. If I run two or more workers, I start getting this error:
duplicate key value violates unique constraint "process_queue_item_id_key"
DETAIL:  Key (queue_item_id)=(**) already exists.
**How can another transaction create rows in process for queue items whose rows are locked?** What I tried: 1. I rewrote the SELECT query with EXISTS:
SELECT "queue"."id"
  FROM "queue"
 WHERE NOT (EXISTS(SELECT U0."id", U0."queue_item_id" FROM "process" U0 WHERE U0."queue_item_id" = "queue"."id"))
 LIMIT 8
   FOR UPDATE OF "queue" SKIP LOCKED
without success; the same error occurs. 2. If I order the rows randomly, the error occurs much later (almost at the end of the queue):
SELECT "queue"."id"
  FROM "queue"
  LEFT OUTER JOIN "process"
    ON ("queue"."id" = "process"."queue_item_id")
 WHERE "process"."id" IS NULL
 ORDER BY RANDOM()
 LIMIT 8
   FOR UPDATE OF "queue" SKIP LOCKED
3. I paused one worker at a breakpoint in the middle of its transaction and, from another session, checked that the row locking works as I expect:
SELECT id
FROM queue
WHERE id NOT IN (
    SELECT id
    FROM queue
        FOR UPDATE SKIP LOCKED
);
* My Postgres version: PostgreSQL 13.1, compiled by Visual C++ build 1914, 64-bit.
* Each worker has its own connection to Postgres with the default isolation level (read committed).
Asked by illagrenan (123 rep)
Feb 8, 2021, 03:20 PM
Last activity: Feb 9, 2021, 03:58 PM