C++ Channel: A thread-safe container for sharing data between threads

Thread synchronization is a common task in multithreaded applications. You cannot get away without some form of protection for data that is accessed from multiple threads. Mutexes and atomic variables are two common ways of protecting data, and they are available in most programming languages that support multithreading.

There is another concept that offers the same protection in a different way. Instead of requiring you to explicitly protect data, it forces you to think about how data “flows” through your application and, implicitly, across the threads you’re using. This is what a channel is for, and one of the languages that offers it is Go. A channel feels very natural to use, hiding the lower-level details of sharing data across threads.

This is something I wanted to implement in C++ when I found out what the standard library offers for multithreading. Why? Just to practice thread-safe C++. A real channel has a far more complex implementation than the one you’ll find here. What I have built is a synchronized queue with a channel feel.

I wanted to have a container that is very easy to use. Data should get in on some threads and come out on some other threads, and the operations must be thread-safe:

int in = 1;
in >> channel;

int out = 0;
out << channel; // out is 1

This is the most common and simple use case.
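To make the snippet above concrete, here is a minimal sketch of one thread pushing a value and another reading it. MiniChannel and send_and_receive are hypothetical names I use for illustration only; this is a stripped-down, unbounded stand-in, not the actual Channel class from the repository.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical stand-in for the article's Channel (unbounded only).
template <typename T>
class MiniChannel {
   public:
    template <typename Type>
    friend void operator>>(const Type& in, MiniChannel<Type>& ch);
    template <typename Type>
    friend void operator<<(Type& out, MiniChannel<Type>& ch);

   private:
    std::queue<T> queue;
    std::mutex mtx;
    std::condition_variable cnd;
};

// Push by copy, then wake one waiting reader.
template <typename Type>
void operator>>(const Type& in, MiniChannel<Type>& ch)
{
    {
        std::lock_guard<std::mutex> lock{ch.mtx};
        ch.queue.push(in);
    }
    ch.cnd.notify_one();
}

// Block until an element is available, then move it out.
template <typename Type>
void operator<<(Type& out, MiniChannel<Type>& ch)
{
    std::unique_lock<std::mutex> lock{ch.mtx};
    ch.cnd.wait(lock, [&ch] { return !ch.queue.empty(); });
    out = std::move(ch.queue.front());
    ch.queue.pop();
}

// Pushes a value on a producer thread and reads it on the calling thread.
int send_and_receive(int value)
{
    MiniChannel<int> channel;
    std::thread producer{[&channel, value] { value >> channel; }};

    int out = 0;
    out << channel;  // blocks until the producer has pushed
    producer.join();
    assert(out == value);
    return out;
}
```

Calling send_and_receive(1) returns 1 regardless of which thread runs first, because the read waits for the push.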

Another common situation is to continuously read data from a channel:

while (true) {
    out << channel;
}

Or, more idiomatically in C++, with a range-based for loop:

for (auto out : channel) {
    // do something with "out"
}

A particularity of a channel is that it can hold an unlimited or a limited number of elements. For the unlimited case, the channel is called unbuffered. It allows elements to be pushed forever. But sometimes you want to control the quantity of data you’re handling, so you want to… buffer the channel, because every element processed from a channel consumes some resources. After a specific number of elements (the capacity), you may want to stop accepting more, blocking the thread that pushes elements into the channel until some elements are taken out (popped) from it.
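The effect of a capacity can be sketched with a small bounded queue. Everything here is an assumption made for illustration: BoundedInts, push, pop, peak_size, and peak_while_streaming are hypothetical names, the element type is fixed to int, and I use notify_all for simplicity. The point is only that the pushing thread waits while the buffer is full, so the number of queued elements never exceeds the capacity.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical bounded queue; capacity 0 means "no limit", as in the article.
class BoundedInts {
   public:
    explicit BoundedInts(std::size_t capacity) : cap{capacity} {}

    void push(int value)
    {
        std::unique_lock<std::mutex> lock{mtx};
        // Block the pushing thread while the buffer is full.
        cnd.wait(lock, [this] { return cap == 0 || queue.size() < cap; });
        queue.push(value);
        if (queue.size() > peak) peak = queue.size();
        cnd.notify_all();
    }

    int pop()
    {
        std::unique_lock<std::mutex> lock{mtx};
        // Block the reading thread while the buffer is empty.
        cnd.wait(lock, [this] { return !queue.empty(); });
        int value = queue.front();
        queue.pop();
        cnd.notify_all();
        return value;
    }

    std::size_t peak_size()
    {
        std::lock_guard<std::mutex> lock{mtx};
        return peak;
    }

   private:
    std::size_t cap;
    std::size_t peak{0};
    std::queue<int> queue;
    std::mutex mtx;
    std::condition_variable cnd;
};

// Streams `count` values through a buffer of the given capacity and
// returns the largest number of elements that were ever queued at once.
std::size_t peak_while_streaming(std::size_t capacity, int count)
{
    BoundedInts channel{capacity};
    std::thread producer{[&channel, count] {
        for (int i = 0; i < count; ++i) channel.push(i);
    }};

    for (int i = 0; i < count; ++i) {
        int value = channel.pop();
        assert(value == i);  // a single producer keeps FIFO order
    }
    producer.join();
    return channel.peak_size();
}
```

With a capacity of 2, the producer can run at most two elements ahead of the consumer, no matter how many values are streamed.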

Also, iterating over a channel is blocking. After the last element is read, the loop blocks the thread until new elements are pushed. Of course, you can break out of the loop at any time.

Other operations you may need concern the number of elements currently in the channel:

channel.size(); // Returns the number of elements in the channel
channel.empty(); // Returns true if there are no elements in channel

 

For small data, pushing can be done by copy, but some elements could be too large and you’ll want to move them into the channel:

std::move(in) >> channel;

 

Stability and performance are important for this type of container, so copying and moving a channel are forbidden (at least at the time of writing).

 

The API looks like this:

template <typename T>
class Channel {
   public:
    explicit constexpr Channel(size_type capacity = 0);

    template <typename Type>
    friend void operator>>(const Type&, Channel<Type>&);

    template <typename Type>
    friend void operator>>(Type&&, Channel<Type>&);

    template <typename Type>
    friend void operator<<(Type&, Channel<Type>&);

    NODISCARD size_type constexpr size() const;

    NODISCARD bool constexpr empty() const;

    iterator begin() noexcept;
    iterator end() noexcept;

    Channel(const Channel&) = delete;
    Channel& operator=(const Channel&) = delete;
    Channel(Channel&&) = delete;
    Channel& operator=(Channel&&) = delete;
    virtual ~Channel() = default;
};

 

The implementation is not too complicated:

    • When an element is pushed, a lock is acquired, and if the channel is not full (for buffered channels), the element is pushed into a queue. If the channel is full, the thread is blocked until an element is popped by a reading thread. After the element is pushed, one of the waiting threads is notified that it can continue, and the lock is released.
    • When an element is about to be popped, the situation is similar, except that the thread is blocked while the channel is empty, until an element is pushed.
    • The channel uses a blocking iterator, which is just a form of input iterator that reads from the channel as described in the previous point.
    • The size and empty operations are the ones a queue offers.

Here is the push-by-copy operation, broken down (follow the comments):

// Type is the type of the elements.
// operator>> is overloaded to take the value to be pushed and the channel.
template <typename Type>
void operator>>(const Type& in, Channel<Type>& ch)
{
    // A unique lock is acquired to protect the queue
    std::unique_lock<std::mutex> lock{ch.mtx};

    // For a buffered channel, the current thread will block if the channel is full...
    if (ch.cap > 0 && ch.queue.size() == ch.cap) {
        // ...until the number of elements is less than the capacity
        ch.cnd.wait(lock, [&ch]() { return ch.queue.size() < ch.cap; });
    }

    // The element is pushed on the queue
    ch.queue.push(in);

    // One of the waiting threads is notified that it can continue
    ch.cnd.notify_one();
}

For the push-by-move operation, the only difference is that I’m using perfect forwarding.
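A minimal sketch of how the two push overloads could sit side by side follows. It assumes the move overload takes Type&& and hands the element to the queue with std::forward; MiniChannel and moved_without_copy are hypothetical names, and capacity handling is left out. Because Type is also deduced from the channel argument, the Type&& overload only matches rvalues in this sketch, so lvalues fall back to the copy overload.

```cpp
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical stand-in for the article's Channel (unbounded only).
template <typename T>
class MiniChannel {
   public:
    template <typename Type>
    friend void operator>>(const Type& in, MiniChannel<Type>& ch);
    template <typename Type>
    friend void operator>>(Type&& in, MiniChannel<Type>& ch);
    template <typename Type>
    friend void operator<<(Type& out, MiniChannel<Type>& ch);

   private:
    std::queue<T> queue;
    std::mutex mtx;
    std::condition_variable cnd;
};

// Copy overload: selected for lvalues.
template <typename Type>
void operator>>(const Type& in, MiniChannel<Type>& ch)
{
    {
        std::lock_guard<std::mutex> lock{ch.mtx};
        ch.queue.push(in);
    }
    ch.cnd.notify_one();
}

// Move overload: selected for rvalues such as std::move(x) or temporaries.
template <typename Type>
void operator>>(Type&& in, MiniChannel<Type>& ch)
{
    {
        std::lock_guard<std::mutex> lock{ch.mtx};
        ch.queue.push(std::forward<Type>(in));
    }
    ch.cnd.notify_one();
}

// Read: block until an element is available, then move it out.
template <typename Type>
void operator<<(Type& out, MiniChannel<Type>& ch)
{
    std::unique_lock<std::mutex> lock{ch.mtx};
    ch.cnd.wait(lock, [&ch] { return !ch.queue.empty(); });
    out = std::move(ch.queue.front());
    ch.queue.pop();
}

// Sends a move-only element through the channel; this compiles only
// because no copy of the element is ever made along the way.
bool moved_without_copy()
{
    MiniChannel<std::unique_ptr<int>> channel;
    auto in = std::make_unique<int>(42);
    std::move(in) >> channel;  // rvalue, so the move overload is used

    std::unique_ptr<int> out;
    out << channel;
    return in == nullptr && out != nullptr && *out == 42;
}
```

Using std::unique_ptr as the element type is a convenient check: it cannot be copied, so the example would fail to compile if either the push or the read tried to copy it.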

The read operation is not very different. The same lock is acquired, but the calling thread is blocked while the channel is empty, until an element is pushed. Then an element is popped and a waiting thread is notified. Reading benefits from move semantics: the element is moved out of the queue instead of being copied.

template <typename Type>
void operator<<(Type& out, Channel<Type>& ch)
{
    std::unique_lock<std::mutex> lock{ch.mtx};
    ch.cnd.wait(lock, [&ch] { return ch.queue.size() > 0; });

    out = std::move(ch.queue.front());
    ch.queue.pop();

    ch.cnd.notify_one();
}

 

Another important detail is the iterator. As I mentioned earlier, it is an input iterator that receives the channel type as a template parameter and pops elements from a channel of that type.


// The increment operator does nothing, because advancing to the next element will be done by the channel read operation
BlockingIterator<Channel> operator++() { return *this; }

// The not equal operator returns true to have an infinite iteration
bool operator!=(BlockingIterator<Channel>) { return true; }

// Dereferencing will pop an element from the channel
value_type operator*()
{
    value_type value{};
    value << ch;

    return value;
}
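Put together, the three operators above are enough for a range-based for loop. The following is a self-contained sketch under my own assumptions: MiniChannel and sum_first are hypothetical names, the iterator stores a reference to the channel, and begin() and end() both return an iterator over the same channel, since the comparison always reports "not equal".

```cpp
#include <cassert>
#include <condition_variable>
#include <iterator>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Input iterator that pops from the channel it was given.
template <typename Channel>
class BlockingIterator {
   public:
    using iterator_category = std::input_iterator_tag;
    using value_type = typename Channel::value_type;

    explicit BlockingIterator(Channel& channel) : ch(channel) {}

    // Advancing is a no-op: dereferencing already consumes an element.
    BlockingIterator& operator++() noexcept { return *this; }
    // Always "not equal", so the loop never ends on its own.
    bool operator!=(const BlockingIterator&) const noexcept { return true; }
    // Dereferencing blocks until an element can be popped.
    value_type operator*()
    {
        value_type value{};
        value << ch;
        return value;
    }

   private:
    Channel& ch;
};

// Hypothetical stand-in for the article's Channel (unbounded only).
template <typename T>
class MiniChannel {
   public:
    using value_type = T;
    using iterator = BlockingIterator<MiniChannel<T>>;

    iterator begin() noexcept { return iterator{*this}; }
    iterator end() noexcept { return iterator{*this}; }

    template <typename Type>
    friend void operator>>(const Type& in, MiniChannel<Type>& ch);
    template <typename Type>
    friend void operator<<(Type& out, MiniChannel<Type>& ch);

   private:
    std::queue<T> queue;
    std::mutex mtx;
    std::condition_variable cnd;
};

template <typename Type>
void operator>>(const Type& in, MiniChannel<Type>& ch)
{
    {
        std::lock_guard<std::mutex> lock{ch.mtx};
        ch.queue.push(in);
    }
    ch.cnd.notify_one();
}

template <typename Type>
void operator<<(Type& out, MiniChannel<Type>& ch)
{
    std::unique_lock<std::mutex> lock{ch.mtx};
    ch.cnd.wait(lock, [&ch] { return !ch.queue.empty(); });
    out = std::move(ch.queue.front());
    ch.queue.pop();
}

// Sums the first `count` elements (count must be at least 1) pushed by a
// producer thread, then breaks out of the otherwise endless loop.
int sum_first(int count)
{
    MiniChannel<int> channel;
    std::thread producer{[&channel, count] {
        for (int i = 1; i <= count; ++i) i >> channel;
    }};

    int sum = 0;
    int seen = 0;
    for (auto out : channel) {  // each step blocks until an element arrives
        sum += out;
        if (++seen == count) break;
    }
    producer.join();
    return sum;
}
```

The explicit break matters here: without it, the loop would block forever once the producer has finished, which is exactly the blocking behavior described earlier.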

 

All other implementation aspects, tests, benchmarks, and examples can be found in the C++ Channel GitHub repository.

The outcome of this exercise is that I learned more about how to synchronize threads and how to build a simple iterator, understood the benefits of move semantics and perfect forwarding, and improved my templating skills. It was also the first time I used Google Benchmark. I have already started a list of improvements and features that I will probably work on in the future.
