
Using a Table as a Queue without sp_getapplock/sp_releaseapplock

7 votes
2 answers
580 views
I have a list of commands I need to execute, all of which are contained within a table I've named myQueue. This table is a little unusual in that some commands should be *grouped together* so that they are executed sequentially rather than concurrently, as executing them concurrently causes unwanted data artifacts and errors. Because of this, the queue cannot be classified in a typical **FIFO**/**LIFO** fashion, as the dequeue order is determined at run-time.

To summarize:

1. A table named myQueue will act as a command queue (where dequeue order is determined at run-time)
2. Commands are added to the table in a random way
3. Commands may fall into *groups*, and if so, must be executed by a single worker thread in an ordered, sequential manner
4. Any number of worker threads can be running when commands are being dequeued
5. Dequeuing is performed via an UPDATE rather than a DELETE, as this table is used for historical performance reporting for said commands

My current approach is to iterate over this table using explicit mutex logic via [sp_getapplock](https://learn.microsoft.com/en-us/sql/relational-databases/system-stored-procedures/sp-getapplock-transact-sql?view=sql-server-ver15)/[sp_releaseapplock](https://learn.microsoft.com/en-us/sql/relational-databases/system-stored-procedures/sp-releaseapplock-transact-sql?view=sql-server-ver15) calls. While this works as expected, the approach generates enough locking that running a high number of worker threads against the queue at any given time isn't feasible.

After reading through Remus Rusanu's [excellent blog post on the topic](https://rusanu.com/2010/03/26/using-tables-as-queues/), I decided to try table hints in hopes of further optimizing my approach. I'll include the test code below, but to summarize my results, using table hints and eliminating the calls to sp_getapplock/sp_releaseapplock results in up to three undesirable behaviors:

1. Deadlocking
2. Multiple threads execute commands that are contained within a single *group*
3. Thread assignments are missing within a *group* of commands

On a positive note, when the code accommodates the deadlocking (e.g. by retrying the offending operation, as is currently included), the methods that don't use sp_getapplock/sp_releaseapplock and don't exhibit undesirable behaviors 2 and 3 perform at least twice as fast.

**What I'm hoping for is that someone will point out how I'm not structuring my dequeuing statements correctly so I can still move forward with using table hints exclusively.** If that doesn't work, so be it, but I wanted to see if it could be done just the same.

The tests can be set up with the following code, which creates the myQueue table and populates it with *commands* that are similar enough to my workload:
CREATE TABLE myQueue
(
	ID INT	IDENTITY (1,1) PRIMARY KEY CLUSTERED,
	Main	INT,
	Sub		INT,
	Detail	INT,
	Command	VARCHAR(MAX),
	Thread	INT,
	StartDT	DATETIME2,
	EndDT	DATETIME2
)
GO
INSERT INTO myQueue WITH (TABLOCKX) (Main, Sub, Detail, Command)
SELECT	ABS(CHECKSUM(NEWID()) % 200),
		ABS(CHECKSUM(NEWID()) % 1280),
		ABS(CHECKSUM(NEWID())),
		'WAITFOR DELAY ''00:00:00.01'''
FROM sys.types t1 CROSS JOIN 
	 sys.types t2 CROSS JOIN
	 sys.types t3 CROSS JOIN
	 (VALUES (1), (2)) t4(x)
GO

CREATE NONCLUSTERED INDEX [IX_myQueue_Update]
ON [dbo].[myQueue] ([Main],[Sub])
INCLUDE (Thread, EndDT)
GO
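To get a feel for the generated workload, a query along these lines summarizes how many (Main, Sub) groups exist and how many commands each contains. This is only a sketch and not part of the test itself; the column aliases are illustrative names, and it assumes the table was created in the dbo schema:

```sql
-- Sketch: summarize how the generated rows fall into (Main, Sub) groups.
-- Aliases such as CommandsInGroup are illustrative, not part of the test.
SELECT  COUNT(*)             AS GroupCount,      -- number of distinct groups
        MIN(CommandsInGroup) AS SmallestGroup,   -- fewest commands in a group
        MAX(CommandsInGroup) AS LargestGroup,    -- most commands in a group
        SUM(CommandsInGroup) AS TotalCommands    -- total rows in the queue
FROM
(
    SELECT  Main, Sub, COUNT(*) AS CommandsInGroup
    FROM    dbo.myQueue
    GROUP BY Main, Sub
) AS g;
```

The exact numbers vary between instances, since the row count depends on how many rows sys.types returns.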
Worker threads all follow the same logic. If you run this locally, I recommend copying this code into separate query windows and running each query at the same time, making sure all worker threads use the same locking method (there are 7, each wrapped in its own comment block; uncomment exactly one):
SET NOCOUNT ON
DECLARE @updOUT TABLE
(
	Main	INT,
	Sub		INT
)
-- @CurrentThread should be a unique ID, which I'm assigning as @@SPID
DECLARE @CurrentThread INT = @@SPID, 
		@main INT, @sub INT,
		@id INT, @command VARCHAR(MAX), 
		@ErrorMessage NVARCHAR(4000)
WHILE	EXISTS(SELECT TOP 1 ID FROM myQueue WHERE EndDT IS NULL)
BEGIN
	BEGIN TRY

		--/*
		-- Method 1: Top 1 WITH TIES within CTE, direct update against CTE, Contained with sp_getapplock/sp_releaseapplock
		-- works
		-- high volume of xp_userlock waits
		BEGIN TRY
			BEGIN TRAN

				EXEC sp_getapplock @Resource = 'myQueue', @LockMode = 'Update'

				;WITH dequeue AS
				(
					SELECT TOP 1 WITH TIES
						Main, Sub, Thread
					FROM	myQueue
					WHERE	EndDT IS NULL
						AND	(Thread IS NULL OR Thread = @CurrentThread)
					ORDER BY Main, Sub
				)
				UPDATE	dequeue
				SET	Thread = @CurrentThread
				OUTPUT	DELETED.Main,
						DELETED.Sub
				INTO @updOUT

				EXEC sp_releaseapplock @Resource = 'myQueue'
			COMMIT
		END TRY
		BEGIN CATCH
			EXEC sp_releaseapplock @Resource = 'myQueue'
			ROLLBACK TRAN
		END CATCH
		--*/

		/*
		-- Method 2: Top 1 WITH TIES within CTE, direct update against CTE
		-- does not work
		-- some groupings contain multiple worker threads 
		-- missing thread assignments (e.g. NULL value in Thread Column)
		-- deadlocking experienced
		;WITH dequeue AS
		(
			SELECT TOP 1 WITH TIES
				Main, Sub, Thread
			FROM	myQueue WITH (ROWLOCK, UPDLOCK, READPAST)
			WHERE	EndDT IS NULL
				AND	(Thread IS NULL OR Thread = @CurrentThread)
			ORDER BY Main, Sub
		)
		UPDATE	dequeue
		SET	Thread = @CurrentThread
		OUTPUT	DELETED.Main,
				DELETED.Sub
		INTO @updOUT
		*/

		/*
		-- Method 3: Top 1 WITH TIES within CTE, join to myQueue table
		-- does not work
		-- some groupings contain multiple worker threads 
		-- missing thread assignments (e.g. NULL value in Thread Column)
		-- deadlocking experienced
		;WITH dequeue AS
		(
			SELECT TOP 1 WITH TIES
				Main, Sub, Thread
			FROM	myQueue WITH (ROWLOCK, UPDLOCK, READPAST)
			WHERE	EndDT IS NULL
				AND	(Thread IS NULL OR Thread = @CurrentThread)
			ORDER BY Main, Sub
		)
		UPDATE	myQ
		SET	Thread = @CurrentThread
		OUTPUT	DELETED.Main,
				DELETED.Sub
		INTO @updOUT
		FROM	myQueue myQ WITH (ROWLOCK, UPDLOCK, READPAST)
					INNER JOIN dequeue
						ON myQ.Main = dequeue.Main
						AND myQ.Sub = dequeue.Sub 
		*/

		/*
		-- Method 4: Top 1 within CTE, join to myQueue table
		-- does not work
		-- some groupings contain multiple worker threads
		;WITH dequeue AS
		(
			SELECT TOP 1
				Main, Sub, Thread
			FROM	myQueue WITH (ROWLOCK, UPDLOCK, READPAST)
			WHERE	EndDT IS NULL
				AND	(Thread IS NULL OR Thread = @CurrentThread)
			ORDER BY Main, Sub
		)
		UPDATE	myQ
		SET	Thread = @CurrentThread
		OUTPUT	DELETED.Main,
				DELETED.Sub
		INTO @updOUT
		FROM	myQueue myQ WITH (ROWLOCK, UPDLOCK, READPAST)
					INNER JOIN dequeue
						ON myQ.Main = dequeue.Main
						AND myQ.Sub = dequeue.Sub 
		*/

		/*
		-- Method 5: Top 1 WITH TIES within CTE, join to myQueue table, PAGLOCK hint instead of ROWLOCK
		-- works*
		-- deadlocking experienced
		;WITH dequeue AS
		(
			SELECT TOP 1 WITH TIES
				Main, Sub, Thread
			FROM	myQueue WITH (PAGLOCK, UPDLOCK, READPAST)
			WHERE	EndDT IS NULL
				AND	(Thread IS NULL OR Thread = @CurrentThread)
			ORDER BY Main, Sub
		)
		UPDATE	myQ
		SET	Thread = @CurrentThread
		OUTPUT	DELETED.Main,
				DELETED.Sub
		INTO @updOUT
		FROM	myQueue myQ WITH (PAGLOCK, UPDLOCK, READPAST)
					INNER JOIN dequeue
						ON myQ.Main = dequeue.Main
						AND myQ.Sub = dequeue.Sub 
		*/

		/*
		-- Method 6: Top 1 WITH TIES within CTE, direct update against CTE, PAGLOCK hint instead of ROWLOCK
		-- works*
		-- deadlocking experienced
		;WITH dequeue AS
		(
			SELECT TOP 1 WITH TIES
				Main, Sub, Thread
			FROM	myQueue WITH (PAGLOCK, UPDLOCK, READPAST)
			WHERE	EndDT IS NULL
				AND	(Thread IS NULL OR Thread = @CurrentThread)
			ORDER BY Main, Sub
		)
		UPDATE	dequeue
		SET	Thread = @CurrentThread
		OUTPUT	DELETED.Main,
				DELETED.Sub
		INTO @updOUT

		*/

		/*
		-- Method 7: Top 1 within CTE, join to myQueue table, PAGLOCK hint instead of ROWLOCK
		-- works*
		-- deadlocking experienced
		;WITH dequeue AS
		(
			SELECT TOP 1
				Main, Sub, Thread
			FROM	myQueue WITH (PAGLOCK, UPDLOCK, READPAST)
			WHERE	EndDT IS NULL
				AND	(Thread IS NULL OR Thread = @CurrentThread)
			ORDER BY Main, Sub
		)
		UPDATE	myQ
		SET	Thread = @CurrentThread
		OUTPUT	DELETED.Main,
				DELETED.Sub
		INTO @updOUT
		FROM	myQueue myQ WITH (PAGLOCK, UPDLOCK, READPAST)
					INNER JOIN dequeue
						ON myQ.Main = dequeue.Main
						AND myQ.Sub = dequeue.Sub 
		*/

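		-- Reset first: if @updOUT is empty, the assignment below leaves the
		-- variables untouched and the previous group would be processed again
		SELECT @main = NULL, @sub = NULL

		SELECT	TOP 1 
			  @main = Main
			, @sub = Sub
		FROM @updOUT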

		END TRY
		BEGIN CATCH
			SELECT @ErrorMessage = 'Msg ' + CAST(ERROR_NUMBER() AS VARCHAR(10)) + ', Level ' + CAST(ERROR_SEVERITY() AS VARCHAR(10)) 
			+ ', State ' + CAST(ERROR_STATE() AS VARCHAR(10)) + ', Line ' + CAST(ERROR_LINE() AS VARCHAR(10))
			+ CHAR(13) + CHAR(10) + ERROR_MESSAGE()

			RAISERROR(@ErrorMessage, 1, 1) WITH NOWAIT

			-- Set to useless values so cursor doesn't fire
			SELECT @main = -1, @sub = -1
		END CATCH

		DELETE FROM @updOUT

		DECLARE WorkQueueCur INSENSITIVE CURSOR
		FOR
			SELECT	ID, Command
			FROM	myQueue
			WHERE	Main = @main
				AND Sub = @sub
			ORDER BY Detail

		OPEN WorkQueueCur

		FETCH NEXT FROM WorkQueueCur
		INTO @id, @command

		WHILE @@FETCH_STATUS = 0
		BEGIN

			RETRY1:

			BEGIN TRY
				UPDATE	myQueue
				SET StartDT = GETDATE()
				WHERE ID = @id
			END TRY
			BEGIN CATCH
				SELECT @ErrorMessage = 'Retry1: Msg ' + CAST(ERROR_NUMBER() AS VARCHAR(10)) + ', Level ' + CAST(ERROR_SEVERITY() AS VARCHAR(10)) 
				+ ', State ' + CAST(ERROR_STATE() AS VARCHAR(10)) + ', Line ' + CAST(ERROR_LINE() AS VARCHAR(10))
				+ CHAR(13) + CHAR(10) + ERROR_MESSAGE()

				RAISERROR(@ErrorMessage, 1, 1) WITH NOWAIT

				GOTO RETRY1
			END CATCH

			EXEC(@command)

			RETRY2:
			
			BEGIN TRY
				UPDATE	myQueue
				SET	EndDT = GETDATE()
				WHERE ID = @id
			END TRY
			BEGIN CATCH
				SELECT @ErrorMessage = 'Retry2: Msg ' + CAST(ERROR_NUMBER() AS VARCHAR(10)) + ', Level ' + CAST(ERROR_SEVERITY() AS VARCHAR(10)) 
				+ ', State ' + CAST(ERROR_STATE() AS VARCHAR(10)) + ', Line ' + CAST(ERROR_LINE() AS VARCHAR(10))
				+ CHAR(13) + CHAR(10) + ERROR_MESSAGE()

				RAISERROR(@ErrorMessage, 1, 1) WITH NOWAIT

				GOTO RETRY2
			END CATCH

			FETCH NEXT FROM WorkQueueCur
			INTO @id, @command
		END

		CLOSE WorkQueueCur
		DEALLOCATE WorkQueueCur


END
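The RETRY1/RETRY2 blocks above retry indefinitely on any error. A bounded variant that retries only when the session is chosen as a deadlock victim (error 1205) might look like the following sketch. The values `@id = 1` and `@maxAttempts = 5` and the 50 ms back-off are hypothetical and chosen purely for illustration:

```sql
-- Sketch: bounded retry that only swallows deadlock-victim errors (1205).
DECLARE @id          INT = 1,   -- hypothetical row to update
        @attempt     INT = 0,
        @maxAttempts INT = 5,   -- hypothetical retry cap
        @done        BIT = 0;

WHILE @done = 0
BEGIN
    BEGIN TRY
        UPDATE  dbo.myQueue
        SET     StartDT = GETDATE()
        WHERE   ID = @id;

        SET @done = 1;          -- update succeeded, leave the loop
    END TRY
    BEGIN CATCH
        SET @attempt += 1;

        -- Re-raise anything that is not a deadlock, or give up after the cap
        IF ERROR_NUMBER() <> 1205 OR @attempt >= @maxAttempts
        BEGIN
            THROW;
        END;

        -- Short pause before retrying so the competing session can finish
        WAITFOR DELAY '00:00:00.050';
    END CATCH
END
```

Compared with the GOTO loops, this surfaces unexpected errors instead of spinning on them.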
Whether undesirable behaviors 2 and 3 above occurred (or not) can be determined by running the following statement:
;WITH invalidMThread AS (
	SELECT	*, DENSE_RANK() OVER (PARTITION BY Main, Sub ORDER BY Thread) AS ThreadCount
	FROM	dbo.myQueue WITH (NOLOCK)
	WHERE	StartDT IS NOT NULL
), invalidNThread AS (
	SELECT	*
	FROM	dbo.myQueue WITH (NOLOCK)
	WHERE	Thread IS NULL
			AND StartDT IS NOT NULL
)
SELECT	t1.*, 'Multiple Threads' AS Issue
FROM	dbo.myQueue t1 WITH (NOLOCK) 
		INNER JOIN invalidMThread i1
			ON i1.Main = t1.Main
			AND i1.Sub = t1.Sub
WHERE	i1.ThreadCount > 1

UNION

SELECT	t1.*, 'Unassigned Thread(s)' AS Issue
FROM	dbo.myQueue t1 WITH (NOLOCK) 
		INNER JOIN invalidNThread i2
			ON i2.Main = t1.Main
			AND i2.Sub = t1.Sub

ORDER BY t1.Main, t1.Sub
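For a more compact per-group view, something like the following sketch could be run once all workers have finished. It is an alternative formulation, not the check used for the results above, and it reports one row per offending group rather than every row in it. The aliases DistinctThreads and UnassignedRows are illustrative:

```sql
-- Sketch: one row per (Main, Sub) group that shows either problem.
-- COUNT(DISTINCT Thread) ignores NULLs, so unassigned rows are counted separately.
SELECT  Main,
        Sub,
        COUNT(DISTINCT Thread)                          AS DistinctThreads,
        SUM(CASE WHEN Thread IS NULL THEN 1 ELSE 0 END) AS UnassignedRows
FROM    dbo.myQueue
WHERE   StartDT IS NOT NULL
GROUP BY Main, Sub
HAVING  COUNT(DISTINCT Thread) > 1                              -- behavior 2
     OR SUM(CASE WHEN Thread IS NULL THEN 1 ELSE 0 END) > 0     -- behavior 3
ORDER BY Main, Sub;
```

An empty result set would indicate that neither behavior 2 nor behavior 3 occurred for the commands that were started.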
Again, I fully anticipate I missed some critical point Remus made in the blog post, so any help in pointing that out would be very much appreciated.
Asked by John Eisbrener (9547 rep)
Feb 6, 2020, 02:58 PM
Last activity: Jan 26, 2023, 04:34 PM