Go concurrency is elegant and simple

Recently I wanted to speed up some data retrieval with Go. Its concurrency model is elegant and simple, and everything you need is built in.

Let’s say there are some articles that need to be fetched from an API. I have the IDs of all the articles, and I can fetch them one by one. A single request can take as long as a second, so I added a 1 second sleep to simulate this.

type Article struct {
       ID    uint
       Title string
}

func GetArticle(ID uint) Article {
       time.Sleep(time.Second * 1)
       return Article{ID, fmt.Sprintf("Title %d", ID)}
}

The classic way of doing this is to make a request for each article, wait for it to finish, and store the data.

var articles []Article
var id uint

for id = 1; id <= 10; id++ {
       log.Printf("Fetching article %d...", id)
       article := GetArticle(id)
       articles = append(articles, article)
}

log.Println(articles)

With a 1 second response time it takes 10 seconds. Now imagine 100 articles or more. It’s gonna take a while. So I’m gonna make concurrent requests with goroutines. I’ve used two ways to do this, depending on the needs.

First, I used a channel to receive each article as it was fetched concurrently. One goroutine waits on the channel and handles every article it receives (here it just logs it, but you could store it somewhere). Then I launched all the requests concurrently, and each one pushed its article onto the channel as soon as it was fetched.

Waiting for an article in a goroutine:

ch := make(chan Article)

go func() {
       for {
              article := <-ch
              log.Println(article)
       }

}()

Launching the concurrent requests:

var id uint
for id = 1; id <= 10; id++ {
       go func(id uint) {
              log.Printf("Fetching article %d...", id)
              article := GetArticle(id)
              ch <- article
       }(id)
}

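And you need to keep the main goroutine blocked, otherwise the program exits before any article arrives. An empty loop does the job for a demo, although it keeps a CPU core busy and never returns:

for {}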
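
For a program that should finish on its own, one option is to count the results instead of blocking forever. The sketch below is a minimal variation on the snippets above, not the only way to do it. FetchWithChannel is a name made up for this example, and the simulated request is shortened to 100ms to keep the run quick.

```go
package main

import (
	"fmt"
	"log"
	"time"
)

type Article struct {
	ID    uint
	Title string
}

// GetArticle simulates a slow API call (shortened to 100ms for this sketch).
func GetArticle(id uint) Article {
	time.Sleep(100 * time.Millisecond)
	return Article{ID: id, Title: fmt.Sprintf("Title %d", id)}
}

// FetchWithChannel (hypothetical helper) starts one goroutine per ID and then
// receives exactly len(ids) results, so it returns once the last one arrives.
func FetchWithChannel(ids []uint) []Article {
	ch := make(chan Article)
	for _, id := range ids {
		go func(id uint) {
			ch <- GetArticle(id)
		}(id)
	}

	articles := make([]Article, 0, len(ids))
	for range ids {
		// Articles arrive in completion order, not in ID order.
		articles = append(articles, <-ch)
	}
	return articles
}

func main() {
	articles := FetchWithChannel([]uint{1, 2, 3, 4, 5})
	log.Println(articles)
}
```

Because the receive loop runs in the calling goroutine, no extra blocking is needed at the end of main.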

With a channel like the one above, you are notified after each article. If you want to know when they’re all finished, there is a second way: a concurrent map (sync.Map, which I found out about recently and which was released in Go 1.9), used together with a wait group.

Declare the concurrent map and the wait group:

wg := sync.WaitGroup{}
var articlesMap sync.Map

Launch requests concurrently while adding them to the wait group, acknowledge the group when each request is done, and tell it to wait for all requests to be finished.

var id uint
for id = 1; id <= 10; id++ {
       wg.Add(1) // add to group
       go func(id uint) {
              article := GetArticle(id)
              articlesMap.Store(id, article) // store article in map
              wg.Done() // acknowledge job is done
       }(id)
}
wg.Wait() // wait for all
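
As a side note, the sync package documentation suggests that most code is fine with a plain map plus separate locking, which also keeps the values typed. The sketch below shows that alternative under the same assumptions as the snippets above. FetchIntoMap is a hypothetical name, and the simulated request is shortened to 100ms.

```go
package main

import (
	"fmt"
	"log"
	"sync"
	"time"
)

type Article struct {
	ID    uint
	Title string
}

// GetArticle simulates a slow API call (shortened to 100ms for this sketch).
func GetArticle(id uint) Article {
	time.Sleep(100 * time.Millisecond)
	return Article{ID: id, Title: fmt.Sprintf("Title %d", id)}
}

// FetchIntoMap (hypothetical helper) fetches every ID concurrently and
// collects the results in a plain map guarded by a mutex.
func FetchIntoMap(ids []uint) map[uint]Article {
	var (
		mu sync.Mutex
		wg sync.WaitGroup
	)
	articles := make(map[uint]Article, len(ids))

	for _, id := range ids {
		wg.Add(1)
		go func(id uint) {
			defer wg.Done()
			article := GetArticle(id) // the slow call runs outside the lock
			mu.Lock()
			articles[id] = article
			mu.Unlock()
		}(id)
	}
	wg.Wait() // after this, no goroutine touches the map anymore
	return articles
}

func main() {
	articles := FetchIntoMap([]uint{1, 2, 3, 4, 5})
	log.Println(articles[3].Title)
}
```

The result is an ordinary map[uint]Article, so no type assertion is needed when reading from it.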

Now iterate over the map and store all the articles as needed. Keep in mind that Range visits the entries in no particular order. I used the article ID as the key for each item, so if needed you can fetch a specific article from the map by its ID.

var articles []Article

articlesMap.Range(func(key, value interface{}) bool {
       article := value.(Article)
       articles = append(articles, article)
       return true
})

log.Println(articles) // wg.Wait() has already returned, so main does not need to block here
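
Since the iteration order is unspecified, it can help to sort the collected articles, and a single article can be read back with Load. The following is a small sketch of both. SortedArticles, FindArticle and NewArticleMap are names invented for this example, and NewArticleMap only stands in for the concurrent fetch shown earlier.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

type Article struct {
	ID    uint
	Title string
}

// NewArticleMap (hypothetical helper) fills a sync.Map the same way the
// fetching goroutines do: the article ID is the key.
func NewArticleMap(ids ...uint) *sync.Map {
	m := &sync.Map{}
	for _, id := range ids {
		m.Store(id, Article{ID: id, Title: fmt.Sprintf("Title %d", id)})
	}
	return m
}

// SortedArticles copies the values out of the map and orders them by ID.
func SortedArticles(m *sync.Map) []Article {
	var articles []Article
	m.Range(func(_, value interface{}) bool {
		if a, ok := value.(Article); ok {
			articles = append(articles, a)
		}
		return true // keep iterating
	})
	sort.Slice(articles, func(i, j int) bool {
		return articles[i].ID < articles[j].ID
	})
	return articles
}

// FindArticle looks up one article by its ID. The key must have the same
// type that was used with Store (uint here).
func FindArticle(m *sync.Map, id uint) (Article, bool) {
	value, ok := m.Load(id)
	if !ok {
		return Article{}, false
	}
	a, ok := value.(Article)
	return a, ok
}

func main() {
	m := NewArticleMap(3, 1, 2)
	fmt.Println(SortedArticles(m))
	if a, ok := FindArticle(m, 2); ok {
		fmt.Println("found:", a.Title)
	}
}
```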

If you have many requests, they could hit the API server really hard, so you may want to take a break from time to time. You can sleep a bit after every few articles.

for id = 1; id <= 10; id++ {
       if id % 3 == 0 {
              log.Println("Taking a break...")
              time.Sleep(time.Second * 1)
       }
       wg.Add(1)
       go func(id uint) {
              article := GetArticle(id)
              articlesMap.Store(id, article)
              wg.Done()
       }(id)
}
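
Another way to go easy on the server, different from the sleep above, is to cap how many requests run at the same time with a buffered channel used as a semaphore. This is only a sketch of that technique. FetchLimited and its limit parameter are hypothetical, and the simulated request is shortened to 50ms.

```go
package main

import (
	"fmt"
	"log"
	"sync"
	"time"
)

type Article struct {
	ID    uint
	Title string
}

// GetArticle simulates a slow API call (shortened to 50ms for this sketch).
func GetArticle(id uint) Article {
	time.Sleep(50 * time.Millisecond)
	return Article{ID: id, Title: fmt.Sprintf("Title %d", id)}
}

// FetchLimited (hypothetical helper) fetches all IDs but never runs more
// than limit requests at once. limit must be at least 1.
func FetchLimited(ids []uint, limit int) []Article {
	// One slot per request: each goroutine writes only its own index,
	// so no extra locking is needed and the input order is preserved.
	articles := make([]Article, len(ids))
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup

	for i, id := range ids {
		wg.Add(1)
		sem <- struct{}{} // blocks while limit requests are in flight
		go func(i int, id uint) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot for the next request
			articles[i] = GetArticle(id)
		}(i, id)
	}
	wg.Wait()
	return articles
}

func main() {
	ids := []uint{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	log.Println(FetchLimited(ids, 3))
}
```

With a limit of 3 and 10 articles, the requests go out in a steady stream of at most three at a time instead of in bursts separated by pauses.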

Here’s the full example:

package main

import (
       "fmt"
       "log"
       "sync"
       "time"
)

type Article struct {
       ID    uint
       Title string
}

func GetArticle(ID uint) Article {
       time.Sleep(time.Second * 1)
       return Article{ID, fmt.Sprintf("Title %d", ID)}
}

func SyncFetch() {
       var articles []Article
       var id uint

       for id = 1; id <= 10; id++ {
              log.Printf("Fetching article %d...", id)
              article := GetArticle(id)
              articles = append(articles, article)
       }

       log.Println(articles)
}

func AsyncFetchWithChannel() {
       // Async with channel
       ch := make(chan Article)

       go func() {
              for {
                     article := <-ch
                     log.Println(article)
              }

       }()

       var id uint
       for id = 1; id <= 10; id++ {
              go func(id uint) {
                     log.Printf("Fetching article %d...", id)
                     article := GetArticle(id)
                     ch <- article
              }(id)
       }
}

func AsyncFetchWithMap() {
       // Async with map
       var articlesMap sync.Map
       wg := sync.WaitGroup{}

       log.Println("Fetching articles...")

       var id uint
       for id = 1; id <= 10; id++ {
              wg.Add(1)
              go func(id uint) {
                     article := GetArticle(id)
                     articlesMap.Store(id, article)
                     wg.Done()
              }(id)
       }
       wg.Wait()

       var articles []Article

       articlesMap.Range(func(key, value interface{}) bool {
              article := value.(Article)
              articles = append(articles, article)
              return true
       })

       log.Println(articles)
}

func AsyncFetchWithMapAndBreaks() {
       wg := sync.WaitGroup{}
       var articlesMap sync.Map

       log.Println("Fetching articles...")

       var id uint
       for id = 1; id <= 10; id++ {
              if id%3 == 0 {
                     log.Println("Taking a break...")
                     time.Sleep(time.Second * 1)
              }
              wg.Add(1)
              go func(id uint) {
                     article := GetArticle(id)
                     articlesMap.Store(id, article)
                     wg.Done()
              }(id)
       }
       wg.Wait()

       var articles []Article

       articlesMap.Range(func(key, value interface{}) bool {
              article := value.(Article)
              articles = append(articles, article)
              return true
       })

       log.Println(articles)
}

func main() {
       log.Println("SyncFetch")
       SyncFetch()
       time.Sleep(time.Second * 1)

       log.Println("\n\nAsyncFetchWithChannel")
       AsyncFetchWithChannel()
       time.Sleep(time.Second * 1)

       log.Println("\n\nAsyncFetchWithMap")
       AsyncFetchWithMap()
       time.Sleep(time.Second * 1)

       log.Println("\n\nAsyncFetchWithMapAndBreaks")
       AsyncFetchWithMapAndBreaks()
       // All fetches have finished by now, so main can simply return.
}

Update: I just saw another nice way of using channels for this situation: “Concurrent HTTP downloads using Go”.
