Recently I wanted to speed up some data retrieval with Go. Its concurrency model is elegant and simple, and everything you need is built in.
Let’s say there are some articles that need to be fetched from an API. I have the IDs of all the articles, and I can fetch them one by one. A single request can take as long as a second, so I added a 1 second sleep to simulate this.
```go
type Article struct {
	ID    uint
	Title string
}

func GetArticle(ID uint) Article {
	time.Sleep(time.Second * 1)
	return Article{ID, fmt.Sprintf("Title %d", ID)}
}
```
The classic way of doing this is to make a request for each article, wait for it to finish, and store the data.
```go
var articles []Article
var id uint
for id = 1; id <= 10; id++ {
	log.Println(fmt.Sprintf("Fetching article %d...", id))
	article := GetArticle(id)
	articles = append(articles, article)
}
log.Println(articles)
```
With a 1 second response time it takes 10 seconds. Now imagine 100 articles or more; it’s going to take a while. So I’m going to make concurrent requests with goroutines. I’ve used two ways to do this, depending on the needs.
First, I’ve used a channel to wait on for each article fetched concurrently. When I received an article on the channel, I stored it somewhere. Then I launched all the requests concurrently, and after an article was fetched, I pushed it onto the channel.
Waiting for an article in a routine:
```go
ch := make(chan Article)
go func() {
	for {
		article := <-ch
		log.Println(article)
	}
}()
```
Launching the concurrent requests:
```go
var id uint
for id = 1; id <= 10; id++ {
	go func(id uint) {
		log.Println(fmt.Sprintf("Fetching article %d...", id))
		article := GetArticle(id)
		ch <- article
	}(id)
}
```
And you need to keep the main goroutine blocked. An empty `select {}` blocks forever without spinning the CPU (a busy `for {}` loop would peg a core):

```go
select {}
```
If you use a channel like above, you’re notified after each article. If you want to know when they’re all finished, there is a second way, which I found out about recently: the concurrent map (`sync.Map`, released in Go 1.9), together with a wait group.
Declare the concurrent map and the wait group:
```go
wg := sync.WaitGroup{}
var articlesMap sync.Map
```
Launch the requests concurrently, adding each one to the wait group, mark each as done when its request finishes, and then tell the group to wait for all requests to complete.
```go
var id uint
for id = 1; id <= 10; id++ {
	wg.Add(1) // add to group
	go func(id uint) {
		article := GetArticle(id)
		articlesMap.Store(id, article) // store article in map
		wg.Done()                      // acknowledge job is done
	}(id)
}
wg.Wait() // wait for all
```
Now iterate the map and collect the articles as needed. I used the article ID as each item’s key, so if needed you can fetch a specific article from the map by its ID.
```go
var articles []Article
articlesMap.Range(func(key, value interface{}) bool {
	article := value.(Article)
	articles = append(articles, article)
	return true
})
log.Println(articles)
```
If you have many requests, they could hammer the API server pretty badly, so maybe you want to take a break from time to time. You can sleep a bit after every few articles.
```go
for id = 1; id <= 10; id++ {
	if id%3 == 0 {
		log.Println("Taking a break...")
		time.Sleep(time.Second * 1)
	}
	wg.Add(1)
	go func(id uint) {
		article := GetArticle(id)
		articlesMap.Store(id, article)
		wg.Done()
	}(id)
}
```
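Another way to go easy on the API, sketched here as my own alternative rather than the post’s original code, is to cap how many requests are in flight at once with a buffered channel used as a semaphore. `FetchWithLimit` is a name I introduced for this sketch:

```go
package main

import (
	"fmt"
	"log"
	"sync"
	"time"
)

type Article struct {
	ID    uint
	Title string
}

func GetArticle(id uint) Article {
	time.Sleep(time.Second)
	return Article{id, fmt.Sprintf("Title %d", id)}
}

// FetchWithLimit fetches `total` articles but keeps at most `limit`
// requests running at any moment, using a buffered channel as a semaphore.
func FetchWithLimit(total uint, limit int) *sync.Map {
	wg := sync.WaitGroup{}
	var articlesMap sync.Map
	sem := make(chan struct{}, limit)

	var id uint
	for id = 1; id <= total; id++ {
		wg.Add(1)
		go func(id uint) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot (blocks when limit reached)
			defer func() { <-sem }() // release the slot when done
			articlesMap.Store(id, GetArticle(id))
		}(id)
	}
	wg.Wait()
	return &articlesMap
}

func main() {
	m := FetchWithLimit(10, 3)
	count := 0
	m.Range(func(_, _ interface{}) bool {
		count++
		return true
	})
	log.Println("fetched", count, "articles")
}
```

Compared with sleeping every three articles, this smooths the load: a new request starts as soon as a slot frees up, instead of launching everything and pausing in bursts.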
Here’s the full example:
```go
package main

import (
	"fmt"
	"log"
	"sync"
	"time"
)

type Article struct {
	ID    uint
	Title string
}

func GetArticle(ID uint) Article {
	time.Sleep(time.Second * 1)
	return Article{ID, fmt.Sprintf("Title %d", ID)}
}

func SyncFetch() {
	var articles []Article
	var id uint
	for id = 1; id <= 10; id++ {
		log.Println(fmt.Sprintf("Fetching article %d...", id))
		article := GetArticle(id)
		articles = append(articles, article)
	}
	log.Println(articles)
}

func AsyncFetchWithChannel() {
	// Async with channel
	ch := make(chan Article)
	go func() {
		for {
			article := <-ch
			log.Println(article)
		}
	}()
	var id uint
	for id = 1; id <= 10; id++ {
		go func(id uint) {
			log.Println(fmt.Sprintf("Fetching article %d...", id))
			article := GetArticle(id)
			ch <- article
		}(id)
	}
}

func AsyncFetchWithMap() {
	// Async with map
	var articlesMap sync.Map
	wg := sync.WaitGroup{}
	log.Println("Fetching articles...")
	var id uint
	for id = 1; id <= 10; id++ {
		wg.Add(1)
		go func(id uint) {
			article := GetArticle(id)
			articlesMap.Store(id, article)
			wg.Done()
		}(id)
	}
	wg.Wait()
	var articles []Article
	articlesMap.Range(func(key, value interface{}) bool {
		article := value.(Article)
		articles = append(articles, article)
		return true
	})
	log.Println(articles)
}

func AsyncFetchWithMapAndBreaks() {
	wg := sync.WaitGroup{}
	var articlesMap sync.Map
	log.Println("Fetching articles...")
	var id uint
	for id = 1; id <= 10; id++ {
		if id%3 == 0 {
			log.Println("Taking a break...")
			time.Sleep(time.Second * 1)
		}
		wg.Add(1)
		go func(id uint) {
			article := GetArticle(id)
			articlesMap.Store(id, article)
			wg.Done()
		}(id)
	}
	wg.Wait()
	var articles []Article
	articlesMap.Range(func(key, value interface{}) bool {
		article := value.(Article)
		articles = append(articles, article)
		return true
	})
	log.Println(articles)
}

func main() {
	log.Println("SyncFetch")
	SyncFetch()
	time.Sleep(time.Second * 1)

	log.Println("\n\nAsyncFetchWithChannel")
	AsyncFetchWithChannel()
	time.Sleep(time.Second * 1)

	log.Println("\n\nAsyncFetchWithMap")
	AsyncFetchWithMap()
	time.Sleep(time.Second * 1)

	log.Println("\n\nAsyncFetchWithMapAndBreaks")
	AsyncFetchWithMapAndBreaks()

	select {} // keep the program alive
}
```
Update: I just saw another nice way of using channels for this situation: “Concurrent HTTP downloads using Go”.