C# – Concurrent collections

By | 06/03/2024

In this post, we will see some concurrent collections that we could use in our multi-threaded application where, managing shared data between threads, can introduce complexity and potential error such as race conditions or deadlocks.
.NET addresses these cases with its concurrent collections, found in the System.Collections.Concurrent namespace. These collections are designed to be thread-safe without the need for external synchronization.
Let’s see how to utilize these collections with some examples.


ConcurrentQueue<T>
It is a thread-safe variant of the queue data structure, following a first-in, first-out (FIFO) order.
It’s particularly useful in producer-consumer scenarios where items are produced by one or more threads and consumed by others.

using System;
using System.Collections.Concurrent;
using System.Threading;

ConcurrentQueue<int> queue = new ConcurrentQueue<int>();
int numberOfItems = 5;

// Start a producer thread that enqueues items
Thread producerThread = new Thread(() =>
{
    for (int i = 0; i < numberOfItems; i++)
    {
        queue.Enqueue(i); // Enqueue an item
        Console.WriteLine($"Produced: {i}");
        Thread.Sleep(50); // Simulate work to allow consumer to process
    }
});

// Start a consumer thread that dequeues items
Thread consumerThread = new Thread(() =>
{
    for (int i = 0; i < numberOfItems; i++)
    {
        int item;
        while (!queue.TryDequeue(out item)) // Try to dequeue an item
        {
            Thread.Sleep(50); // Wait if queue is empty
        }
        Console.WriteLine($"Consumed: {item}");
    }
});

// Start threads
producerThread.Start();
consumerThread.Start();

// Wait for both threads to complete
producerThread.Join();
consumerThread.Join();

If we run the application, the following will be the result:


ConcurrentStack<T>
It provides a thread-safe last-in, first-out (LIFO) stack.
It’s suitable for tasks that require reverse-order processing.

using System.Collections.Concurrent;

ConcurrentStack<int> stack = new ConcurrentStack<int>();
int numberOfItems = 5;

// Start a producer thread that pushes items onto the stack
Thread producerThread = new Thread(() =>
{
    for (int i = 0; i < numberOfItems; i++)
    {
        stack.Push(i); // Push an item onto the stack
        Console.WriteLine($"Pushed: {i}");
        Thread.Sleep(50); // Simulate work
    }
});

// Start a consumer thread that pops items from the stack
Thread consumerThread = new Thread(() =>
{
    for (int i = 0; i < numberOfItems; i++)
    {
        int item;
        while (!stack.TryPop(out item)) // Try to pop an item
        {
            Thread.Sleep(50); // Wait if stack is empty
        }
        Console.WriteLine($"Popped: {item}");
    }
});

// Start threads
producerThread.Start();
consumerThread.Start();

// Wait for both threads to complete
producerThread.Join();
consumerThread.Join();

If we run the application, the following will be the result:


ConcurrentyDictionary<TKey, TValue>
It is a thread-safe dictionary that allows concurrent read and write operations, making it ideal for caching scenarios or storing shared data that needs to be updated by multiple threads.

using System.Collections.Concurrent;

// Initialize a concurrent dictionary
ConcurrentDictionary<int, string> dictionary = new ConcurrentDictionary<int, string>();
int numberOfItems = 5;

// Start a thread to update the dictionary
Thread updateThread = new Thread(() =>
{
    for (int i = 0; i < numberOfItems; i++)
    {
        dictionary[i] = $"Value {i}"; // Add or update value for the key
        Console.WriteLine($"Added key {i} with Value {i}");
        Thread.Sleep(100); // Simulate work

        // Attempt to update the key's value
        dictionary.TryUpdate(i, $"Updated Value {i}", $"Value {i+10}");
        Console.WriteLine($"Updated key {i} to Updated Value {i + 10}");
    }
});

// Start a thread to read from the dictionary
Thread readThread = new Thread(() =>
{
    for (int i = 0; i < numberOfItems; i++)
    {
        string value;
        while (!dictionary.TryGetValue(i, out value)) // Try to get value for the key
        {
            Thread.Sleep(50); // Wait if the key is not yet added
        }
        Console.WriteLine($"Key {i} has value {value}");
    }
});

// Start threads
updateThread.Start();
readThread.Start();

// Wait for both threads to complete
updateThread.Join();
readThread.Join();

If we run the application, the following will be the result:


BlockingCollection<T>
It is a versatile collection that supports bounding and blocking functionalities. It’s particularly effective in producer-consumer scenarios where controlling the collection’s size or blocking until an operation can proceed is necessary.

using System.Collections.Concurrent;

// Initialize a blocking collection with a bounded capacity of 5
BlockingCollection<int> collection = new BlockingCollection<int>(5);
int numberOfItems = 5;

// Producer thread: Adds items to the collection
Thread producerThread = new Thread(() =>
{
    for (int i = 0; i < numberOfItems; i++)
    {
        collection.Add(i); // Add an item to the collection
        Console.WriteLine($"Produced: {i}");
        Thread.Sleep(50); // Simulate work to slow down the production rate
    }
    collection.CompleteAdding(); // Signals that no more items will be added
});

// Consumer thread: Takes and processes items from the collection
Thread consumerThread = new Thread(() =>
{
    // GetConsumingEnumerable() provides a blocking and consuming enumerator for the collection
    foreach (var item in collection.GetConsumingEnumerable())
    {
        Console.WriteLine($"Consumed: {item}");
        Thread.Sleep(100); // Simulate work to slow down the consumption rate
    }
});

// Start the producer and consumer threads
producerThread.Start();
consumerThread.Start();

// Wait for both threads to complete their work
producerThread.Join();
consumerThread.Join();

If we run the application, the following will be the result:



Leave a Reply

Your email address will not be published. Required fields are marked *