Lately I’ve been using Go’s concurrent map. And sometimes I needed to count the items I’ve stored in the map. I have ranged over the items and incremented a counter, and it was done:
package main import ( "sync" "log" ) func main() { m := sync.Map{} m.Store("k1", "v1") m.Store("k2", "v2") count := 0 m.Range(func(key, value interface{}) bool{ count++ return true }) log.Println(count) }
But I didn’t want to range over the items each time I wanted to count them. So I thought of incrementing the counter every time an element was added, and to decrementing it when one was removed. The following is just for practice, I didn’t perform any advanced tests.
Being a concurrent map, it obviously can be used in a concurrent context, so I had to keep that in mind. No matter where a value is added to the map from (or removed), the counter needs to be correctly updated. All counter update requests must be taken into account and applied only after the previous one had been applied, so I don’t have any consistency issues. It sounded like a queue between multiple routines: a channel.
So I had a map, a counter, and a channel.
m := sync.Map{} count := 0 countchan := make(chan int)
I’m going to send all counter update requests on that channel, and for this I’m going to use two functions:
// Send 1 to increment var inc = func() { countchan <- 1 } // And -1 to decrement var dec = func() { countchan <- -1 }
On a routine, I’m waiting for updates and apply them:
go func() { for { // Get value from queue update := <-countchan // Update counter count += update if update > 0 { log.Println("inc") } if update < 0 { log.Println("dec") } } }()
On another routine, I’m continuously printing the number of items in the map:
go func() { for { log.Println("count: ", count) time.Sleep(time.Second) } }()
Now let’s add some items to the map from multiple routines, then randomly delete them:
// Add go func() { for i := 1; i <= 500; i++ { go func(i int) { log.Println("add: ", i) m.Store(i, i) inc() }(i) } }() // delete go func() { for { m.Range(func(key, value interface{}) bool{ if random(1, 5) == 1 { go func(key interface{}){ m.Delete(key) dec() }(key) } return true }) time.Sleep(time.Second * 1) } }()
You’re going to see items being added and removed, while printing the map’s length.
And we can add/delete items manually:
// http://localhost:8800/add?key=123 http.HandleFunc("/add", func(w http.ResponseWriter, r *http.Request) { key := r.URL.Query().Get("key") log.Println("add: ", key) _, ok := m.Load(key) if ok == false { m.Store(key, key) inc() log.Println("added: ", key) } else { log.Println("already exists: ", key) } w.Write([]byte(key)) }) // http://localhost:8800/delete?key=123 http.HandleFunc("/delete", func(w http.ResponseWriter, r *http.Request) { key := r.URL.Query().Get("key") log.Println("delete: ", key) _, ok := m.Load(key) if ok == true { m.Delete(key) dec() log.Println("deleted: ", key) } else { log.Println("not found: ", key) } w.Write([]byte(key)) }) http.ListenAndServe(":8800", nil)
It was fun doing this. Channels are awesome. Here’s the whole code:
package main import ( "sync" "time" "log" "math/rand" "net/http" ) func random(min, max int) int { rand.Seed(time.Now().UTC().UnixNano()) return rand.Intn(max - min) + min } func main() { // Map to store items m := sync.Map{} // Map items counter count := 0 // Integer channel to send counter update requests (increment, decrement) // Send 1 for inc, -1 for dec countchan := make(chan int) // Send 1 to increment var inc = func() { countchan <- 1 } // And -1 to decrement var dec = func() { countchan <- -1 } // Wait for counter update requests go func() { for { // Get value from queue update := <-countchan // Update counter count += update if update > 0 { log.Println("inc") } if update < 0 { log.Println("dec") } } }() // Continuously print the counter go func() { for { log.Println("count: ", count) time.Sleep(time.Second) } }() // Add some data into the map go func() { for i := 1; i <= 500; i++ { // Let's abuse the counter by sending update requests concurrently go func(i int) { log.Println("add: ", i) m.Store(i, i) inc() }(i) } }() // Randomly delete data from the map to generate dec update requests go func() { for { m.Range(func(key, value interface{}) bool{ if random(1, 5) == 1 { go func(key interface{}){ m.Delete(key) dec() }(key) } return true }) time.Sleep(time.Second * 1) } }() // Let's also manually add and delete data http.HandleFunc("/add", func(w http.ResponseWriter, r *http.Request) { key := r.URL.Query().Get("key") log.Println("add: ", key) _, ok := m.Load(key) if ok == false { m.Store(key, key) inc() log.Println("added: ", key) } else { log.Println("already exists: ", key) } w.Write([]byte(key)) }) http.HandleFunc("/delete", func(w http.ResponseWriter, r *http.Request) { key := r.URL.Query().Get("key") log.Println("delete: ", key) _, ok := m.Load(key) if ok == true { m.Delete(key) dec() log.Println("deleted: ", key) } else { log.Println("not found: ", key) } w.Write([]byte(key)) }) http.ListenAndServe(":8800", nil) }