Using channels to control flow

When I first met channels in Go, I thought they were just data containers: when you want to send data between goroutines, they are the go-to tool. Later I saw that they can also control an application's flow, for example by waiting until something happens and only then carrying on with the normal flow.
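As a minimal sketch of that idea, here is a channel that carries no data at all and is used only to say "I'm done". The helper name waitForWorker and the done channel are made up for this illustration.

```go
package main

import (
	"fmt"
	"time"
)

// waitForWorker (hypothetical helper) runs work in a goroutine and blocks
// until that goroutine reports that it has finished.
func waitForWorker(work func()) {
	done := make(chan struct{}) // carries no data, only the "finished" event

	go func() {
		defer close(done) // closing the channel unblocks the receiver below
		work()
	}()

	<-done // the normal flow stops here until the goroutine is done
}

func main() {
	finished := false
	waitForWorker(func() {
		time.Sleep(100 * time.Millisecond) // stand-in for real work
		finished = true
	})
	fmt.Println("worker finished:", finished) // prints: worker finished: true
}
```

Nothing is ever sent on done; closing it is enough to wake up whoever is waiting.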

Here’s an approach for waiting until some tasks have finished. Consider some input data that can come from anywhere: HTTP requests, files, sockets, etc. The data can arrive quickly, but processing it can take some time. If the application needs to exit (you restart it or stop it) but you want to wait until all the data held in memory has been processed, you can use channels.

First, I have a goroutine which processes input data read from a buffered channel. I simulate the slow processing with a sleep.

// Buffered channel: up to 20 messages can wait here for processing.
input := make(chan string, 20)

go func() {
	for {
		s := <-input
		time.Sleep(time.Second) // simulate slow processing
		fmt.Printf("%+v\n", s)
	}
}()

I need some data pushed into that channel, just some timestamps.

go func() {
	// Push 20 timestamps, one every 50 ms.
	for i := 0; i < 20; i++ {
		input <- time.Now().String()
		time.Sleep(time.Millisecond * 50)
	}
}()

The input is pushed into the channel pretty fast, but processing is slow: one message per second. While this is happening, if I try to stop the app I don't want it to exit; once everything is done, I want to be able to close it. When you press Ctrl + C, the OS sends the app an interrupt signal, which I can intercept on a channel using the standard os/signal package. The channel is of os.Signal type, and I tell Go to deliver interrupt signals (os.Interrupt) on it.

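// Buffered, because the signal package does not block when sending:
// with an unbuffered channel a signal can be dropped if nobody is receiving.
sig := make(chan os.Signal, 1)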
signal.Notify(sig, os.Interrupt)

I’m waiting for the os.Interrupt signal in another goroutine, where I can control the application exit.

go func() {
	for {
		<-sig
	}
}()

In the above goroutine I get the OS signal, and I can decide what to do. In this case, I’m gonna allow the app to exit only if there are no messages on the input channel.
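To check whether messages are still waiting, the built-in len can be called on a buffered channel. A small sketch, where pending is a hypothetical helper name used only for this illustration:

```go
package main

import "fmt"

// pending (hypothetical helper) reports how many messages are queued in the
// channel's buffer. A message that a receiver has already taken out and is
// still working on is not counted.
func pending(ch chan string) int {
	return len(ch)
}

func main() {
	input := make(chan string, 20)
	input <- "a"
	input <- "b"
	fmt.Println(pending(input), cap(input)) // prints: 2 20

	<-input
	fmt.Println(pending(input)) // prints: 1
}
```

Keep in mind that the value is only a snapshot: another goroutine can send or receive right after len returns.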

For the app to stay alive, I’m using another channel (called “exit”) on the last line of the app, which just blocks until it receives something or the channel is closed. When that happens, execution moves on and the app exits. In the goroutine which waits for the os.Interrupt signal I’m gonna close the exit channel if there are no more messages on the input channel.

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"time"
)

func main() {
	input := make(chan string, 20)

	go func() {
		for {
			s := <-input
			time.Sleep(time.Second)
			fmt.Printf("%+v\n", s)
		}
	}()

	go func() {
		// Push 20 timestamps, one every 50 ms.
		for i := 0; i < 20; i++ {
			input <- time.Now().String()
			time.Sleep(time.Millisecond * 50)
		}
	}()

	// Buffered so that a signal is not dropped while nobody is receiving.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt)

	exit := make(chan bool)

	go func() {
		for {
			<-sig

			if len(input) > 0 {
				log.Printf("Waiting for %d items to finish.", len(input))
			} else {
				close(exit)
				return // stop listening, so exit is never closed twice
			}
		}
	}()

	<-exit
}

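Run it and press Ctrl + C: while messages are still queued, the app logs how many are waiting and keeps running; once the channel is empty, Ctrl + C lets it exit. It can always be force stopped by sending the kill signal (kill -9), which a program cannot intercept.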
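One thing to watch for: len(input) only counts what is still in the buffer. The last message may already have been taken out by the processing goroutine, which is still sleeping, so exiting at that moment can skip its output. A possible variation, sketched here under my own assumptions and not part of the program above, is to close the input channel on the first Ctrl + C and wait on a second channel until the worker has drained everything. The drain helper and the done channel are hypothetical names, and the timeout exists only so the demo also ends by itself. It also assumes the producer has stopped before close(input), because sending on a closed channel panics.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"time"
)

// drain (hypothetical helper) processes every message from input and closes
// the returned channel once input has been closed and emptied.
func drain(input <-chan string, process func(string)) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for s := range input { // the loop ends when input is closed and empty
			process(s)
		}
	}()
	return done
}

func main() {
	input := make(chan string, 20)
	done := drain(input, func(s string) {
		time.Sleep(300 * time.Millisecond) // simulate slow processing
		fmt.Println(s)
	})

	// The producer finishes here, before the channel can be closed.
	for i := 0; i < 5; i++ {
		input <- time.Now().String()
	}

	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt)

	select {
	case <-sig: // Ctrl + C
	case <-time.After(500 * time.Millisecond): // demo only: stop by itself
	}

	close(input) // no more input is accepted from here on
	<-done       // wait until the queued messages have been processed
}
```

With this shape a single Ctrl + C is enough: the app stops taking input, finishes what it already has, and then exits.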
