This article was originally published at tech.blinemedical.com

In .NET 4, a new class called BlockingCollection was introduced, which gives you a thread-safe producer/consumer queue. Anyone consuming a BlockingCollection blocks automatically until new items are added. This lets you easily add items to the collection in one thread and consume them in another. This class is great because, before it existed, you had to do all of this synchronization yourself with mutexes, which was a lot of extra work (and more error-prone). In general, a good time to use a decoupled producer/consumer pattern is when you have a slow consuming function and a producer thread that is time sensitive.

Even though BlockingCollection effectively synchronizes your producer and consumer, you still have to write boilerplate to manage the producer thread and the consumer thread. Also, if you want extra exception handling or a cancellation token, you have to add all of that yourself too. I wrapped this all up in a BlockingCollectionWrapper class that handles it for you.
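To make that boilerplate concrete, here is a minimal sketch of a producer and consumer built directly on BlockingCollection, without the wrapper. The class name RawProducerConsumer and its Run method are hypothetical names invented for this illustration; they are not part of the wrapper or its source.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical illustration only: not part of BlockingCollectionWrapper.
public static class RawProducerConsumer
{
    // Produces itemCount strings on the calling thread, consumes them on a
    // dedicated thread, and returns the items in the order they were consumed.
    public static List<string> Run(int itemCount)
    {
        var consumed = new List<string>();

        using (var queue = new BlockingCollection<string>())
        using (var cancellation = new CancellationTokenSource())
        {
            var consumer = new Thread(() =>
            {
                try
                {
                    // Blocks until an item arrives; the loop ends once
                    // CompleteAdding() has been called and the queue is empty.
                    foreach (var item in queue.GetConsumingEnumerable(cancellation.Token))
                    {
                        consumed.Add(item);
                    }
                }
                catch (OperationCanceledException)
                {
                    // The token was cancelled while waiting for an item.
                }
            }) { Name = "RawConsumer" };

            consumer.Start();

            for (int i = 0; i < itemCount; i++)
            {
                queue.Add(i.ToString());
            }

            // Signal that no more items are coming, then wait for the drain.
            queue.CompleteAdding();
            consumer.Join();
        }

        return consumed;
    }
}
```

Even in this small form you have to own the thread, the cancellation token, the exception handling, and the shutdown order, which is the repetition the wrapper is meant to remove.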

An example

Here is an example where the consumer takes one second each time it consumes an item.

  
private readonly ManualResetEvent _testMutex = new ManualResetEvent(false);

[Test]
public void TestCollection()
{
    // create the wrapper
    var asyncCollection = new BlockingCollectionWrapper<string>();

    asyncCollection.FinishedEvent += FinishedEventHandler;

    // make sure we dispose of it. this will stop the internal thread
    using (asyncCollection)
    {
        // register a consuming action
        asyncCollection.QueueConsumingAction = (producedItem) =>
        {
            Thread.Sleep(TimeSpan.FromSeconds(1));
            Console.WriteLine(DateTime.Now + ": Consuming item: " + producedItem);
        };

        // start consuming
        asyncCollection.Start();

        // start producing
        for (int i = 0; i < 10; i++)
        {
            Console.WriteLine(DateTime.Now + ": Produced item " + i);
            asyncCollection.AddItem(i.ToString());
        }
    }

    // wait for the finished handler to pulse this
    _testMutex.WaitOne();

    Assert.True(asyncCollection.Finished);
}

private void FinishedEventHandler(object sender, BlockingCollectionEventArgs e)
{
    _testMutex.Set();
}

This prints out

  
9/17/2012 6:22:43 PM: Produced item 0  
9/17/2012 6:22:43 PM: Produced item 1  
9/17/2012 6:22:43 PM: Produced item 2  
9/17/2012 6:22:43 PM: Produced item 3  
9/17/2012 6:22:43 PM: Produced item 4  
9/17/2012 6:22:43 PM: Produced item 5  
9/17/2012 6:22:43 PM: Produced item 6  
9/17/2012 6:22:43 PM: Produced item 7  
9/17/2012 6:22:43 PM: Produced item 8  
9/17/2012 6:22:43 PM: Produced item 9  
9/17/2012 6:22:44 PM: Consuming item: 0  
9/17/2012 6:22:45 PM: Consuming item: 1  
9/17/2012 6:22:46 PM: Consuming item: 2  
9/17/2012 6:22:47 PM: Consuming item: 3  
9/17/2012 6:22:48 PM: Consuming item: 4  
9/17/2012 6:22:49 PM: Consuming item: 5  
9/17/2012 6:22:50 PM: Consuming item: 6  
9/17/2012 6:22:51 PM: Consuming item: 7  
9/17/2012 6:22:52 PM: Consuming item: 8  
9/17/2012 6:22:53 PM: Consuming item: 9  

First, I created the blocking collection wrapper and made sure to put it in a using block since it’s disposable (the thread waiting on the blocking collection will need to be cleaned up). Then I registered a function to be executed each time an item is consumed. Calling Start() begins consuming. Once I’m done - even after the using block disposes of the wrapper - the separate consumer thread could still be running (processing whatever is left), but it is no longer blocking on additions and will complete consuming any pending items.

The wrapper

When you call .Start() we start our independent consumer thread.

  
/// <summary>
/// Start the consumer
/// </summary>
public void Start()
{
    _cancellationTokenSource = new CancellationTokenSource();
    _thread = new Thread(QueueConsumer) { Name = "BlockingConsumer" };
    _thread.Start();
}

This is the queue consumer that runs in the separate thread that executes the registered consumer action. The consuming action is locked to make changing the consuming action threadsafe.

  
/// <summary>
/// The actual consumer queue that runs in a separate thread
/// </summary>
private void QueueConsumer()
{
    try
    {
        // Block on _queue.GetConsumingEnumerable
        // When an item is added to the _queue it will unblock and let us consume
        foreach (var item in _queue.GetConsumingEnumerable(_cancellationTokenSource.Token))
        {
            // get a synchronized snapshot of the action
            Action<T> consumerAction = QueueConsumingAction;

            // execute our registered consuming action
            if (consumerAction != null)
            {
                consumerAction(item);
            }
        }

        // dispose of the token source
        if (_cancellationTokenSource != null)
        {
            _cancellationTokenSource.Dispose();
        }

        //Log.Debug(this, "Done with queue consumer");

        Finished = true;

        if (FinishedEvent != null)
        {
            FinishedEvent(this, new BlockingCollectionEventArgs());
        }
    }
    catch (OperationCanceledException)
    {
        //Log.Debug(this, "Blocking collection<{0}> cancelled", typeof(T));
    }
    catch (Exception ex)
    {
        //Log.Error(this, ex, "Error consuming from queue of type {0}", typeof(T));
    }
}
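The locked consuming action mentioned above is not shown in this excerpt. One way such a property could look is sketched below; the class name ConsumingActionHolder and the field names are assumptions made for illustration, and the real wrapper may do this differently.

```csharp
using System;

// Hypothetical illustration only: the wrapper's actual property is not shown
// in the article, so this is one plausible shape rather than its real code.
public class ConsumingActionHolder<T>
{
    private readonly object _actionLock = new object();
    private Action<T> _queueConsumingAction;

    // Reads and writes go through the same lock, so the consumer thread
    // always gets a consistent snapshot even if another thread swaps the
    // action while items are being consumed.
    public Action<T> QueueConsumingAction
    {
        get { lock (_actionLock) { return _queueConsumingAction; } }
        set { lock (_actionLock) { _queueConsumingAction = value; } }
    }
}
```

Taking the snapshot into a local variable before invoking it, as QueueConsumer does, means the action runs outside the lock, so a slow consumer never blocks a thread that wants to replace the action.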

And when the wrapper is disposed, we call CompleteAdding() on the blocking collection, which tells the collection to stop waiting for new additions and finish out whatever is left in the queue.

  
protected void Dispose(bool disposing)
{
    if (disposing)
    {
        if (_queue != null && !_queue.IsAddingCompleted)
        {
            // mark the queue as complete
            // the BlockingConsumer thread will now
            // just process the remaining items
            _queue.CompleteAdding();
        }
    }
}

public void Dispose()
{
    Dispose(true);
}

The remaining properties and functions on the wrapper let you

  • Force abort the consumer thread
  • Register a Finished event handler; disposing of the wrapper doesn’t mean that no more work is being done. It means that you are no longer adding items and the queue is effectively “closed”. Depending on your consumer function though, this could take some time to complete. This is why it’s good to hook into the finished event so you can be sure that all your processing is complete.
  • Manually mark the queue as complete for adding (so the thread stops blocking once the remaining items are consumed)
  • Manually cancel the queue
  • Check if the queue is ended by looking at the Finished property
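The difference between completing the queue and cancelling it can be seen directly on BlockingCollection. The sketch below does not use the wrapper; ShutdownModes and its two methods are hypothetical names for this illustration. Completing lets the consumer drain what is left, while cancelling interrupts the wait with an OperationCanceledException.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical illustration only: not part of BlockingCollectionWrapper.
public static class ShutdownModes
{
    // Closes the queue with CompleteAdding() and returns how many of the
    // already queued items the consumer loop still processed.
    public static int DrainAfterCompleteAdding(int itemCount)
    {
        int consumed = 0;
        using (var queue = new BlockingCollection<int>())
        {
            for (int i = 0; i < itemCount; i++)
            {
                queue.Add(i);
            }

            queue.CompleteAdding();

            // The loop ends on its own once the queue is empty.
            foreach (var item in queue.GetConsumingEnumerable())
            {
                consumed++;
            }
        }
        return consumed;
    }

    // Waits on an empty queue that is still open, using a token that has
    // been cancelled; returns true if the wait ended by cancellation.
    public static bool WaitIsCancelled()
    {
        using (var queue = new BlockingCollection<int>())
        using (var cancellation = new CancellationTokenSource())
        {
            cancellation.Cancel();
            try
            {
                foreach (var item in queue.GetConsumingEnumerable(cancellation.Token))
                {
                    // Never reached: the queue is empty and the token is cancelled.
                }
                return false;
            }
            catch (OperationCanceledException)
            {
                return true;
            }
        }
    }
}
```

This is also why the consumer in the wrapper catches OperationCanceledException separately: cancellation is an expected way for the loop to end, not an error.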

So to reiterate, the basic idea here is

  • Create a separate thread, with appropriate exception handling, that blocks until there are queued items to consume
  • Handle cancellation gracefully
  • Be able to properly end our spawned thread so we don’t have anything leftover

It should be noted that even though this wrapper is built for a single consumer/single producer design, since we are leveraging GetConsumingEnumerable we could modify the wrapper to allow for multiple threads acting as consumers on the same enumerable. This could give us a single producer/multiple synchronized consumer pattern where only one consumer thread gets the particular item but multiple consumer threads exist and can do work.
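As a rough sketch of that multiple-consumer variation, the code below starts several threads that all enumerate the same BlockingCollection. It is written against BlockingCollection directly rather than the wrapper, and MultiConsumerSketch and its Run method are hypothetical names for this illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

// Hypothetical illustration only: not part of BlockingCollectionWrapper.
public static class MultiConsumerSketch
{
    // Produces itemCount integers, consumes them on consumerCount threads,
    // and returns every consumed item in ascending order.
    public static List<int> Run(int itemCount, int consumerCount)
    {
        var consumed = new ConcurrentBag<int>();

        using (var queue = new BlockingCollection<int>())
        {
            var consumers = new List<Thread>();
            for (int c = 0; c < consumerCount; c++)
            {
                var thread = new Thread(() =>
                {
                    // Each item is handed to exactly one of the
                    // enumerating threads.
                    foreach (var item in queue.GetConsumingEnumerable())
                    {
                        consumed.Add(item);
                    }
                }) { Name = "BlockingConsumer" + c };

                thread.Start();
                consumers.Add(thread);
            }

            for (int i = 0; i < itemCount; i++)
            {
                queue.Add(i);
            }

            // Close the queue so every consumer loop ends after the drain.
            queue.CompleteAdding();
            foreach (var thread in consumers)
            {
                thread.Join();
            }
        }

        return consumed.OrderBy(x => x).ToList();
    }
}
```

Note that with more than one consumer the order in which items finish processing is no longer guaranteed, so this only fits work where items are independent of each other.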

Full source and tests are provided at our GitHub.