
ADO.Net SqlBulkCopy, concurrency

0 votes
0 answers
124 views
I inherited a program that has to read a number of large jsonl zips, deserialize the json, normalize it into a DataSet, and then use SqlBulkCopy to save the results to a Sql Server database. Originally, it had X number of threads processing each file separately, but the performance (time and memory consumption) was very jaggy: each thread would read 50000 or 100000 objects from its file, transform them into a DataSet, and then wait synchronously in a loop for SqlBulkCopy to complete.

I wanted to introduce more parallelism on both the file reading and the db saving, so I added a db writer background thread. The file reading threads now put batches of 20000 json objects into DataSets and push each DataSet into a BlockingCollection. The writer thread pulls the DataSets off and does the SqlBulkCopy calls. (For some reason, maybe related to the issue I'm asking about, the old code randomized the order of the DataTables' SqlBulkCopy calls within each DataSet, and I kept that behavior.)

Most of the time the new parallelization works nicely to smooth things out, but when I have a larger than usual pile of files (all the threads busy for longer), occasionally the number of main rows in Sql Server won't equal the total from the manifest. The two times it has happened, oddly, it's been increments of *10000* rows that go missing (not the 20000 objects in each batch, and not the random dregs from the last run). I'm not seeing where my concurrency hole is. Are multiple SqlBulkCopy operations to the same table from different threads in the app colliding?
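For context, the producer/consumer shape described above can be reduced to a minimal sketch: several "file reader" producers push fixed-size batches into a BlockingCollection, and a single background "writer" drains it. This is illustrative only (no SQL involved); the 20000-object batch size is taken from the description, everything else is a stand-in.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

class ProducerConsumerSketch
{
    // Runs three "file reader" producers against one "db writer" consumer
    // and returns the total number of items the writer drained.
    public static long DrainTotal()
    {
        // Bounded queue: producers block when the writer falls behind,
        // which caps memory instead of letting DataSets pile up.
        var queue = new BlockingCollection<List<int>>(boundedCapacity: 4);

        // Single consumer draining the queue until CompleteAdding is called.
        var writer = Task.Run(() =>
        {
            long total = 0;
            foreach (var batch in queue.GetConsumingEnumerable())
                total += batch.Count;                      // stand-in for the SqlBulkCopy call
            return total;
        });

        // Three producers, each enqueuing one 20000-item batch.
        var readers = Enumerable.Range(0, 3).Select(_ => Task.Run(() =>
        {
            queue.Add(new List<int>(new int[20000]));
        })).ToArray();

        Task.WaitAll(readers);
        queue.CompleteAdding();                            // lets GetConsumingEnumerable finish
        return writer.Result;
    }

    static void Main() => Console.WriteLine(DrainTotal()); // prints 60000
}
```

In this simplified form no items can go missing, because GetConsumingEnumerable only terminates after CompleteAdding and the queue is empty; the question is whether the real writer thread preserves that guarantee.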
This is the background dbwriter thread code I put in place:

    private static BlockingCollection<DataSet> datasetQueue = new BlockingCollection<DataSet>();

    stopWriterToken = new CancellationTokenSource();
    var token = stopWriterToken.Token;
    writingThread = Task.Factory.StartNew(() =>
    {
        long timeSpentDbWrites = 0;
        var writers = new List<Task>();
        // throw each DataSet on its own thread for writing
        for (; ; )
        {
            try
            {
                writers.RemoveAll(w => w.IsCompleted); // thin the list of completed writes
                DataSet ds = null;
                if (datasetQueue.TryTake(out ds, -1, token))
                    writers.Add(Task.Run(() =>
                    {
                        var sw = Stopwatch.StartNew();
                        var r = new Random();
                        using (var sqlConn = new SqlConnection(ConnectionString))
                        {
                            sqlConn.Open();
                            // write the tables in random order (inherited behavior)
                            foreach (var i in Enumerable.Range(0, ds.Tables.Count).OrderBy(x => r.Next()))
                            {
                                var table = ds.Tables[i];
                                if (table.Rows.Count > 0)
                                {
                                    using (var sqlBulkCopy = new SqlBulkCopy(sqlConn)
                                    {
                                        BulkCopyTimeout = Timeout,
                                        DestinationTableName = table.TableName,
                                        BatchSize = ToSqlBatchSize
                                    })
                                    {
                                        sqlBulkCopy.WriteToServer(table);
                                    }
                                    table.Clear();
                                }
                            }
                        }
                        Interlocked.Add(ref timeSpentDbWrites, sw.ElapsedTicks);
                    }));
            }
            catch (Exception e)
            {
                Logger.Error($"Exception processing result sets in background thread: {e.Message}");
            }
            if ((datasetQueue.Count == 0 && datasetQueue.IsCompleted) || token.IsCancellationRequested)
                break;
        }
        Task.WaitAll(writers.ToArray()); // wait for all outstanding writers to finish
        Logger.Info($"Cumulative time on async DB Writes: {(timeSpentDbWrites * 1000) / Stopwatch.Frequency} ms");
    }, token);
Asked by user1664043 (379 rep)
Feb 18, 2022, 09:35 PM
Last activity: Feb 18, 2022, 10:23 PM