Crafting a concurrent queue in Golang

Crafting a concurrent queue in Golang:

 Queue is one of the most important data structure. Using queues, we can model many computational tasks easily and efficiently. Queues are used everywhere, in Operating systems, they are used to schedule processes, in routers, queues are used as buffers to store packets before processing them. Even cloud native applications composed of miroservices and workers use queues in between them for communication. Queues are extensively used for asynchronous processing.

What is a Job queue?

If you have ever used node.js, you would be familiar with job queues already. The so called Event Loop in node is nothing but a job queue which schedules asynchronous operations to any on the free uv-threadpool workers. This type of pattern is common in many systems out there. So here is how queue is used for asynchronous processing, consider the scenario below:

  1. There will be a separate thread or a group of threads waiting for some kind of input.
  2. The main thread produces these inputs ( or tasks )

In this case, there has to be a medium which must act as a glue between input main thread and worker threads running in background. The medium should be designed in such a way that the input which is fed first should get a chance to be processed first, then the next. The input fed into this medium can be consumed by any of the free worker threads. Queue is the best data-structure for this scenario, because of FIFO (First-in, First-out) property, this property ensures ordering, if we make this data-structure somehow shareable between multiple threads we can achieve what we wanted to. 


This is how a job queue looks like, the main thread places input on to the queue and continues its operation without waiting for the output. Since the workers wait for input, any one of the worker pops the input from the queue and and processes it. Any of the thread which is free can pop the input. Since the thread which is doing the work will have no time to pop the next input from the queue, only the free thread will get a chance to pop from the queue and becomes busy. If no thread is free, the inputs will still be there in the queue following the right order.

What are the advantages?

Imagine a scenario where main thread did all the work, it would work much slower because the main thread cannot process next job until it finishes the current one. A single thread cannot run on multiple cores at once, that is the main reason it runs sequentially. But on a machine with multiple cores, this is not a good approach, because all other cores sit ideally without doing any work. Since we used thread pool here, the free threads in our pool runs on separate cores and we are exploiting parallelism, if we have a 8 core machine, we can now process 8 inputs at once in the same time we took to process a single input. The main thread can now concentrate on fetching new inputs rather than processing them, because putting an input on the queue doesn’t take much time.

Concurrent queue

Every advantage comes with a disadvantage as well. In the case we discussed above, we cannot use a normal queue, because it is not thread-safe. Since we have multiple threads operating on the same queue, synchronization problems will be obviously there, which we need to take care of. A concurrent-queue is a special queue designed to handle such issues. It allows multiple threads to operate on the same queue without any synchronization problems. In this blog we will be designing one such concurrent queue that can be used by multiple goroutines. The queue we design will have following properties:

  1. Can hold any type of data, i.e the queue is a generic implementation (interface type in golang)
  2. Expandable in size, it grows and shrinks as required. (i.e no pre-allocated memory)
  3. Thread-safe by nature, no synchronization issues.

Let’s create a normal queue first

Synchronization issues has nothing to do with how data is stored, we only have to control how data is accessed. This allows us to implement a normal queue first and use it as a storage backend for the concurrent queue.

Here is how we are going to design the queue:

  1. We will be using a doubly-linked list.
  2. The queue will have push and pop functions.
  3. We insert data at the head of the list and pop from the tail. This is how pushand pop will be implemented.
  4. We make the queue to grow only until maxSize. So we keep track of current size every time an enqueue or dequeue occurs and take decisions based on the current size. Here is an implementation of the queue using doubly linked list.
//Node storage of queue data
type Node struct {
    data interface{}
    prev *Node
    next *Node
}

//QueueBackend Backend storage of the queue, a double linked list
type QueueBackend struct {
    //Pointers to root and end
    head *Node
    tail *Node

    //keep track of current size
    size    uint32
    maxSize uint32
}

func (queue *QueueBackend) createNode(data interface{}) *Node {
    node := Node{}
    node.data = data
    node.next = nil
    node.prev = nil

    return &node
}

func (queue *QueueBackend) put(data interface{}) error {
    if queue.size >= queue.maxSize {
        err := errors.New("Queue full")
        return err
    }

    if queue.size == 0 {
        //new root node
        node := queue.createNode(data)
        queue.head = node
        queue.tail = node

        queue.size++

        return nil
    }

    //queue non-empty append to head
    currentHead := queue.head
    newHead := queue.createNode(data)
    newHead.next = currentHead
    currentHead.prev = newHead

    queue.head = currentHead
    queue.size++
    return nil
}

func (queue *QueueBackend) pop() (interface{}, error) {
    if queue.size == 0 {
        err := errors.New("Queue empty")
        return nil, err
    }

    currentEnd := queue.tail
    newEnd := currentEnd.prev

    if newEnd != nil {
        newEnd.next = nil
    }

    queue.size--
    if queue.size == 0 {
        queue.head = nil
        queue.tail = nil
    }

    return currentEnd.data, nil
}

func (queue *QueueBackend) isEmpty() bool {
    return queue.size == 0
}

func (queue *QueueBackend) isFull() bool {
    return queue.size >= queue.maxSize
}

We have created a QueueBackend, this is just a storage layer on top of which synchronization primitives are implemented. The code above is simple and anyone with basic data structures knowledge can understand it. Next we are going to implement synchronization primitives.

Adding concurrency support

We are going to create a ConcurrentQueue type which uses QueueBackend type internally. The ConcurrentQueue type is defined as shown below:

//ConcurrentQueue concurrent queue
type ConcurrentQueue struct {
    //mutex lock
    lock *sync.Mutex

    //empty and full locks
    notEmpty *sync.Cond
    notFull  *sync.Cond

    //queue storage backend
    backend *QueueBackend
}

Here, the backend is a pointer variable which holds the address of the queue storage layer we created before. Now let us see what other members actually mean.

  1. lock : This is a member pointer of type sync.Mutex, a mutex is a synchronization primitive, mutex simply means a lock, lock can be either locked or unclocked. If the mutex is locked, other threads will wait until the lock is unlocked. This ensures only one thread can access the queue at a time. Here is how a thread will operate on the queue: a. The thread first locks the mutex, this way it gains exclusive control over the queue by avoiding other threads from accessing the queue at the same time. b. It then calls euqueue or dequeue over the queue and once that is done, it unlocks the mutex, allowing other waiting threads to get access to the queue.

This is more than enough to provide a basic synchronization support for the queue, but we actually need more. Once a thread gains control over the queue, it pops the data if present, if no data is present, it has to wait. One way to implement this wait operation is to use a while loop and simply keep checking the queue for data every time. This is not so cool because it runs in an infinite loop taking all the CPU cycles (the core usage goes to 100%). We need some mechanism that allows us to suspend the thread until data becomes available, we can do so by using conditional locks. We have defined two such locks, notEmpty and notFullnotEmpty means that, the queue is not empty and has some data, so anyone can pop. notFull means that, the queue is not full and the main thread can still push the data into it. If queue is not in notEmpty state, the consumer (worker threads) will wait. Similarly, if notFull condition is not met, the producer (main thread) will wait for queue to become empty. Now let us see how euqueue is implemented:

func (c *ConcurrentQueue) enqueue(data interface{}) error {
    c.lock.Lock()

    for c.backend.isFull() {
        //wait for empty
        c.notFull.Wait()
    }

    //insert
    err := c.backend.put(data)

    //signal notEmpty
    c.notEmpty.Signal()

    c.lock.Unlock()

    return err
}

The equeue function checks if anyone has already locked the queue, if not, it locks the queue to gain control, next it checks if the queue is full, if yes, it waits until it is not full. (in the line c.notFull.Wait()), After waiting, a new input is pushed into the queue, calling push over the queue backend. Now the queue is not empty, because there is a new input, c.notEmpty.Signal() is called, this notifies the threads waiting over notEmpty that a new entry has been inserted and the queue is no more empty, finally the lock is released, allowing other threads to operate on the queue.

Now, let us see how dequeue is implemented at the consumer’s side:


func (c *ConcurrentQueue) dequeue() (interface{}, error) {
    c.lock.Lock()

    for c.backend.isEmpty() {
        c.notEmpty.Wait()
    }

    data, err := c.backend.pop()

    //signal notFull
    c.notFull.Signal()

    c.lock.Unlock()

    return data, err
}

The consumer tries to access the queue for reading, if it is locked, it waits for the thread which is already accessing the queue to release the lock. Next, it checks if the queue is empty, if yes, it waits over notEmpty mutex. (in the line c.notEmpty.Wait()). Once the queue is not empty (remember c.notEmpty.Signal() in enqueue), it pops the entry and notifies the producer to push new input, since there is a space in the queue now. This is done by calling c.notFull.Signal()).

That is it, we have implemented basic enqueue and dequeue methods with synchronization support. We also implement a miscellaneous method called getSize which simply returns the size, even this is done with lock because it might return the wrong value if another thread push/pops the data at the same time.

func (c *ConcurrentQueue) getSize() uint32 {
    c.lock.Lock()
    size := c.backend.size
    c.lock.Unlock()

    return size
}

To wrap this entire thing, we create a method called NewConcurrentQueue which simply creates an instance of ConcurrentQueue type and initializes it with proper values. Here we also initialize all the necessary mutexes. Look at it’s code below:

//NewConcurrentQueue Creates a new queue
func NewConcurrentQueue(maxSize uint32) *ConcurrentQueue {
    queue := ConcurrentQueue{}

    //init mutexes
    queue.lock = &sync.Mutex{}
    queue.notFull = sync.NewCond(queue.lock)
    queue.notEmpty = sync.NewCond(queue.lock)

    //init backend
    queue.backend = &QueueBackend{}
    queue.backend.size = 0
    queue.backend.head = nil
    queue.backend.tail = nil

    queue.backend.maxSize = maxSize
    return &queue
}

from Tumblr https://generouspiratequeen.tumblr.com/post/636830987906662400

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s