Condition variables
Condition variables differ from the previous concurrency primitives in the sense that, for Go, they are not an essential concurrency tool because, in most cases, a condition variable can be replaced with a channel. However, especially for shared memory systems, condition variables are important tools for synchronization. For example, the Java language builds one of its core synchronization features using condition variables.
A well-known problem of concurrent computing is the producer-consumer problem. There are one or more producer threads that produce a value. These values are consumed by one or more consumer threads. Since all producers and consumers are running concurrently, sometimes there are not enough values produced to satisfy all the consumers, and sometimes there are not enough consumers to consume values produced by the producers. There is usually a finite queue of values into which producers put, and from which consumers retrieve. There is already an elegant solution to this problem: use a channel. All producers write to the channel, and all consumers read from it, and the problem is solved. But in a shared memory system, a condition variable is usually employed for such a situation. A condition variable is a synchronization mechanism where multiple goroutines wait for a condition to occur, and another goroutine announces the occurrence of the condition to the waiting goroutines.
A condition variable supports three operations, as follows:
Wait
: Blocks the current goroutine until a condition happensSignal
: Wakes up one of the waiting goroutines when the condition happensBroadcast
: Wakes up all of the waiting goroutines when the condition happens
Unlike the other concurrency primitives, a condition variable needs a mutex. The mutex is used to lock the critical sections in the goroutines that modify the condition. It does not matter what the condition is; what matters is that the condition can only be modified in a critical section and that critical section must be entered by locking the mutex used to construct the condition variable, as shown in the following code:
lock := sync.Mutex{} cond := sync.NewCond(&lock)
Now let’s implement the producers-consumers problem using this condition variable. Our producers will produce integers and place them in a circular queue. The queue has a finite capacity, so the producer must wait until a consumer consumes from the queue if the queue is full. That means we need a condition variable that will cause the producers to wait until a consumer consumes a value. When the consumer consumes a value, the queue will have more space, and the producer can use it, but then the consumer who consumed that value has to signal the waiting producers that there is space available. Similarly, if consumers consume all the values before producers can produce new ones, the consumers have to wait until new values are available. So, we need another condition variable that will cause the consumers to wait until a producer produces a value. When a producer produces a new value, it has to signal to the waiting consumers that a new value is available.
Let’s start with a simple circular queue implementation:
type Queue struct { elements []int front, rear int len int } // NewQueue initializes an empty circular queue //with the given capacity func NewQueue(capacity int) *Queue { return &Queue{ elements: make([]int, capacity), front: 0, // Read from elements[front] rear: -1, // Write to elements[rear] len: 0, } } // Enqueue adds a value to the queue. Returns false // if queue is full func (q *Queue) Enqueue(value int) bool { if q.len == len(q.elements) { return false } // Advance the write pointer, go around in a circle q.rear = (q.rear + 1) % len(q.elements) // Write the value q.elements[q.rear] = value q.len++ return true } // Dequeue removes a value from the queue. Returns 0,false // if queue is empty func (q *Queue) Dequeue() (int, bool) { if q.len == 0 { return 0, false } // Read the value at the read pointer data := q.elements[q.front] // Advance the read pointer, go around in a circle q.front = (q.front + 1) % len(q.elements) q.len-- return data, true }
We need a lock, two condition variables, and a circular queue:
func main() { lock := sync.Mutex{} fullCond := sync.NewCond(&lock) emptyCond := sync.NewCond(&lock) queue := NewQueue(10)
Here is the producer
function. It runs in an infinite loop, producing random integer values:
producer := func() { for { // Produce value value := rand.Int() lock.Lock() for !queue.Enqueue(value) { fmt.Println("Queue is full") fullCond.Wait() } lock.Unlock() emptyCond.Signal() time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000))) } }
The producer generates a random integer, enters into its critical section, and attempts to enqueue the value. If it is successful, it unlocks the mutex and signals one of the consumers, letting it know that a value has been generated. If there are no consumers waiting on the emptyCond
variable at that point, the signal is lost. If, however, the queue is full, then the producer starts waiting on the fullCond
variable. Note that Wait
is called in the critical section, with the mutex locked. When called, Wait
atomically unlocks the mutex and suspends the execution of the goroutine. While waiting, the producer is no longer in its critical section, allowing the consumers to go into their own critical sections. When a consumer consumes a value, it will signal fullCond
, which will wake one of the waiting producers up. When the producer wakes up, it will lock the mutex again. Waking up and locking the mutex is not atomic, which means, when Wait
returns, the condition that woke up the goroutine may no longer hold, so Wait
must be called inside a loop to recheck the condition. When the condition is rechecked, the goroutine will be in its critical section again, so no race conditions are possible.
The consumer is as follows:
consumer := func() { for { lock.Lock() var v int for { var ok bool if v, ok = queue.Dequeue(); !ok { fmt.Println("Queue is empty") emptyCond.Wait() continue } break } lock.Unlock() fullCond.Signal() time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000))) fmt.Println(v) } }
Note the symmetry between the producer and the consumer. The consumer enters into its critical section and attempts to dequeue a value inside a for
loop. If the queue has a value in it, it is read, the for
loop terminates, and the mutex is unlocked. Then the goroutine notifies any potential producers that a value is read from the queue, so it is likely that the queue is not full. By the time the consumer exists in its critical section and signals the producer, it is possible that another producer produced values to fill up the queue. That’s why the producer has to check the condition again when it wakes up. The same logic applies to the consumer: if the consumer cannot read a value, it starts waiting, and when it wakes up, it has to check whether the queue has elements in it to be consumed.
The rest of the program is as follows:
for i := 0; i < 10; i++ { go producer() } for i := 0; i < 10; i++ { go consumer() } select {} // Wait indefinitely
You can run this program with different numbers of producers and consumers, and see how it behaves. When there are more producers than consumers, you should see more messages on the queue being full, and when there are more consumers than producers, you should see more messages on the queue being empty.