
Can a new transaction claim an older sequence id?

0 votes
3 answers
115 views
I'm using a PostgreSQL database as an event store. We used to use https://github.com/SQLStreamStore/SQLStreamStore, but it had issues under heavy parallel transaction load: essentially we suffered from a lot of 'skipped' events. A similar problem is explained here: https://github.com/eugene-khyst/postgresql-event-sourcing?tab=readme-ov-file#transactional-outbox-using-transaction-id

Together with a co-worker I decided to fork the library and reimplement it using [pg_current_snapshot()](https://pgpedia.info/p/pg_current_snapshot.html). After a few iterations we got it working: https://github.com/ArneSchoonvliet/SQLStreamStore

The main idea is: if we see a gap between positions, we only trust the events with a transaction_id lower than xmin. This has worked great for us and solved most problems. But sometimes we see a weird occurrence:
Position  MessageId                             CreatedAt                      TransactionId
31170300  be7b412a-103c-5cdd-8458-57fbb0e5c39e  2024-09-29 13:23:27.733 +0200  2306832989
31170299  38b9d7d9-540c-5440-a2a0-10b91cffb2ad  2024-09-29 13:23:27.736 +0200  2306832990
Query result
Position: 31170297, Array index: 0, Transaction id: 2306832974
Position: 31170298, Array index: 1, Transaction id: 2306832976
Position: 31170300, Array index: 2, Transaction id: 2306832989
Xmin: 2306832990
In the query result you can see that position 31170299 is missing, so our gap-checking code kicks in and checks whether all transaction_ids are lower than xmin. In this case they are: 31170299 wasn't visible yet, so that event gets skipped.

**Question:** Is it expected that this can happen, i.e. a newer transaction claiming a lower sequence value? We are using a Google Cloud managed PostgreSQL database.

I don't see how we could ever detect this without checking every time whether transactions are still in flight, and that would hurt performance, since we would lose a lot of time on 'actual' gaps (caused by transactions that were rolled back).

For anyone wondering what the insert / query SQL looks like:

INSERT: https://github.com/ArneSchoonvliet/SQLStreamStore/blob/master/src/SqlStreamStore.Postgres/PgSqlScripts/AppendToStream.sql

Important part:
INSERT INTO __schema__.messages (message_id,
                                 stream_id_internal,
                                 stream_version,
                                 created_utc,
                                 type,
                                 json_data,
                                 json_metadata,
                                 transaction_id)
SELECT m.message_id, _stream_id_internal, _current_version + (row_number()
    over ()) :: int, _created_utc, m.type, m.json_data, m.json_metadata, pg_current_xact_id()
FROM unnest(_new_stream_messages) m
ON CONFLICT DO NOTHING;
GET DIAGNOSTICS _success = ROW_COUNT;
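Note that `pg_current_xact_id()` and the sequence behind `position` are evaluated independently per session, so their orderings can diverge under concurrency. This is not the library's code, just a minimal Python sketch (with hypothetical counter names and the starting values taken from the query result above) simulating the interleaving that produces the anomaly:

```python
from itertools import count

# Simplified global counters standing in for PostgreSQL's transaction id
# counter and the messages_seq sequence (starting values are illustrative).
xid_counter = count(2306832989)
seq_counter = count(31170299)

def assign_xid():
    """Models pg_current_xact_id(): the xid is taken when first requested."""
    return next(xid_counter)

def assign_position():
    """Models nextval('messages_seq'): taken independently of the xid."""
    return next(seq_counter)

# Interleaving consistent with the table in the question:
# session A reaches pg_current_xact_id() first and grabs the lower xid...
xid_a = assign_xid()        # 2306832989
xid_b = assign_xid()        # 2306832990
# ...but session B reaches the position DEFAULT (nextval) first.
pos_b = assign_position()   # 31170299
pos_a = assign_position()   # 31170300

# The newer transaction (higher xid) ends up holding the older position.
assert xid_b > xid_a and pos_b < pos_a
```

Nothing synchronizes the two counters between the two function calls, which is why a newer transaction can legitimately claim an older sequence value.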
As you can see, the position isn't set explicitly. That's because it's an auto-increment column defined like this:

"position" int8 DEFAULT nextval('messages_seq'::regclass) NOT NULL

QUERY: https://github.com/ArneSchoonvliet/SQLStreamStore/blob/master/src/SqlStreamStore.Postgres/PgSqlScripts/ReadAll.sql

Important part:
BEGIN
  OPEN _txinfo FOR
  SELECT pg_snapshot_xmin(pg_current_snapshot());
  RETURN NEXT _txinfo;
    
  OPEN _messages FOR
  WITH messages AS (
      SELECT __schema__.streams.id_original,
             __schema__.messages.message_id,
             __schema__.messages.stream_version,
             __schema__.messages.position,
             __schema__.messages.created_utc,
             __schema__.messages.type,
             __schema__.messages.transaction_id,
             __schema__.messages.json_metadata,
             __schema__.messages.json_data,
             __schema__.streams.max_age
      FROM __schema__.messages
             INNER JOIN __schema__.streams ON __schema__.messages.stream_id_internal = __schema__.streams.id_internal
      WHERE  __schema__.messages.position >= _position
      ORDER BY __schema__.messages.position
      LIMIT _count
  )
  SELECT * FROM messages LIMIT _count;
  RETURN NEXT _messages;
END;
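For context, the gap check described above can be sketched like this. This is not the library's actual implementation; the function name and row tuples are illustrative, and the data is taken from the query result in the question:

```python
def safe_to_process(rows, xmin):
    """Gap check as described in the question: when positions are not
    contiguous, only trust the batch if every visible transaction_id is
    below xmin (i.e. none of the returned rows could still be in flight).

    rows: list of (position, transaction_id), ordered by position.
    """
    positions = [pos for pos, _ in rows]
    expected = list(range(positions[0], positions[0] + len(positions)))
    has_gap = positions != expected
    if not has_gap:
        return True
    return all(txid < xmin for _, txid in rows)

# Query result from the question: position 31170299 is missing, yet every
# visible txid is below xmin = 2306832990, so the check passes and the
# still-uncommitted event at 31170299 gets skipped (the false negative).
rows = [(31170297, 2306832974),
        (31170298, 2306832976),
        (31170300, 2306832989)]
print(safe_to_process(rows, xmin=2306832990))  # True
```

The sketch makes the failure mode concrete: the event at 31170299 belongs to the transaction that *is* xmin, so the `txid < xmin` test over the visible rows cannot distinguish it from a rolled-back gap.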
Asked by ErazerBrecht (101 rep)
Oct 9, 2024, 10:20 AM
Last activity: Nov 20, 2024, 11:30 AM