Counting number of items in a concurrent map

Lately I’ve been using Go’s concurrent map (sync.Map), and sometimes I needed to count the items stored in it. sync.Map has no length method, so I ranged over the items and incremented a counter:

package main

import (
       "sync"
       "log"
)

// main stores two entries in a sync.Map and logs how many entries it holds.
func main() {
	var items sync.Map

	items.Store("k1", "v1")
	items.Store("k2", "v2")

	// Visit every entry and tally them; returning true keeps the walk going.
	var total int
	tally := func(_, _ interface{}) bool {
		total++
		return true
	}
	items.Range(tally)

	log.Println(total)
}

But I didn’t want to range over the items each time I wanted to count them. So I thought of incrementing a counter every time an element was added, and decrementing it when one was removed. The following is just for practice; I didn’t perform any advanced tests.

Being a concurrent map, it can obviously be used in a concurrent context, so I had to keep that in mind. No matter where a value is added to the map (or removed from it), the counter needs to be updated correctly. Every counter update request must be taken into account, and each one applied only after the previous one has been applied, so that I don’t have any consistency issues. That sounded like a queue between multiple goroutines: a channel.

So I had a map, a counter, and a channel.

m := sync.Map{}

count := 0

countchan := make(chan int)

I’m going to send all counter update requests on that channel, and for this I’m going to use two functions:

// Send 1 to increment
var inc = func() {
       countchan <- 1
}

// And -1 to decrement
var dec = func() {
       countchan <- -1
}

In a goroutine, I wait for updates and apply them:

// Counter goroutine: receives queued updates one at a time and applies them.
go func() {
	for {
		// Block until an update is queued, then apply it to the counter.
		delta := <-countchan
		count += delta

		// Log which kind of update was applied; a zero delta logs nothing.
		switch {
		case delta > 0:
			log.Println("inc")
		case delta < 0:
			log.Println("dec")
		}
	}
}()

In another goroutine, I continuously print the number of items in the map:

// Reporter goroutine: logs the counter, sleeps one second, and repeats forever.
// NOTE: count is read here without synchronization while the counter
// goroutine writes it, so the logged value is only a best-effort snapshot.
go func() {
	for ; ; time.Sleep(time.Second) {
		log.Println("count: ", count)
	}
}()

Now let’s add some items to the map from multiple goroutines, then randomly delete them:

// Add
go func() {
       for i := 1; i <= 500; i++ {              
              go func(i int) {
                     log.Println("add: ", i)
                     m.Store(i, i)
                     inc()
              }(i)
       }
}()

// Delete: once per second, walk the map and remove each entry with a
// one-in-four chance (random(1, 5) yields a value from 1 to 4).
go func() {
	for {
		m.Range(func(key, _ interface{}) bool {
			if random(1, 5) == 1 {
				go func(key interface{}) {
					// LoadAndDelete reports whether this call actually
					// removed the entry, so the counter is decremented
					// exactly once even if the same key is being deleted
					// elsewhere at the same time.
					if _, removed := m.LoadAndDelete(key); removed {
						dec()
					}
				}(key)
			}
			return true
		})
		time.Sleep(time.Second * 1)
	}
}()

You’re going to see items being added and removed, while printing the map’s length.

And we can add/delete items manually:

// http://localhost:8800/add?key=123
// Adds the "key" query parameter to the map (if absent) and echoes it back.
http.HandleFunc("/add", func(w http.ResponseWriter, r *http.Request) {
	key := r.URL.Query().Get("key")
	log.Println("add: ", key)

	// LoadOrStore checks and inserts in one atomic step, so two concurrent
	// requests for the same key cannot both increment the counter.
	if _, exists := m.LoadOrStore(key, key); exists {
		log.Println("already exists: ", key)
	} else {
		inc()
		log.Println("added: ", key)
	}

	// The key is user input: force plain text so it is never sniffed as HTML.
	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
	if _, err := w.Write([]byte(key)); err != nil {
		log.Println("write response: ", err)
	}
})

// http://localhost:8800/delete?key=123
// Removes the "key" query parameter from the map (if present) and echoes it back.
http.HandleFunc("/delete", func(w http.ResponseWriter, r *http.Request) {
	key := r.URL.Query().Get("key")
	log.Println("delete: ", key)

	// LoadAndDelete reports whether this call removed the entry, so the
	// counter is decremented exactly once per removed key even when several
	// deletions of the same key race with each other.
	if _, removed := m.LoadAndDelete(key); removed {
		dec()
		log.Println("deleted: ", key)
	} else {
		log.Println("not found: ", key)
	}

	// The key is user input: force plain text so it is never sniffed as HTML.
	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
	if _, err := w.Write([]byte(key)); err != nil {
		log.Println("write response: ", err)
	}
})

http.ListenAndServe(":8800", nil)

It was fun doing this. Channels are awesome. Here’s the whole code:

package main

import (
       "sync"
       "time"
       "log"
       "math/rand"
       "net/http"
)

// seedOnce guards the one-time seeding of the global math/rand generator.
var seedOnce sync.Once

// random returns a pseudo-random integer in the half-open range [min, max).
// If the range is empty (max <= min) it returns min instead of panicking.
func random(min, max int) int {
	// Seed exactly once. Reseeding on every call is wasteful and, when two
	// calls land on the same clock reading, makes them return the same value.
	seedOnce.Do(func() {
		rand.Seed(time.Now().UTC().UnixNano())
	})

	// rand.Intn panics when its argument is not positive.
	if max <= min {
		return min
	}
	return rand.Intn(max-min) + min
}

// main demonstrates keeping a running count of the entries in a sync.Map.
//
// Every insertion or removal sends +1 or -1 on a channel. A single goroutine
// owns the counter, applies those updates in order and hands the current
// value to readers over a second channel, so the counter is never accessed
// from two goroutines at once. The program then serves /add and /delete on
// port 8800 and only returns if the server fails to start.
func main() {
	// Map to store items.
	m := sync.Map{}

	// Counter update requests: send 1 to increment, -1 to decrement.
	countchan := make(chan int)

	// The counter goroutine offers the current count on this channel.
	current := make(chan int)

	// inc queues an increment.
	inc := func() { countchan <- 1 }

	// dec queues a decrement.
	dec := func() { countchan <- -1 }

	// Counter goroutine: the only goroutine that touches count. It either
	// applies a queued update or hands the current value to a waiting reader.
	go func() {
		count := 0
		for {
			select {
			case update := <-countchan:
				count += update

				if update > 0 {
					log.Println("inc")
				}
				if update < 0 {
					log.Println("dec")
				}
			case current <- count:
			}
		}
	}()

	// Continuously print the counter, once per second.
	go func() {
		for {
			log.Println("count: ", <-current)
			time.Sleep(time.Second)
		}
	}()

	// Add some data into the map.
	go func() {
		for i := 1; i <= 500; i++ {
			// Let's abuse the counter by sending update requests concurrently.
			go func(i int) {
				log.Println("add: ", i)
				m.Store(i, i)
				inc()
			}(i)
		}
	}()

	// Randomly delete data from the map to generate dec update requests:
	// once per second each entry is removed with a one-in-four chance.
	go func() {
		for {
			m.Range(func(key, _ interface{}) bool {
				if random(1, 5) == 1 {
					go func(key interface{}) {
						// Decrement only if this call removed the entry, so
						// a concurrent /delete of the same key cannot make
						// the counter drop twice.
						if _, removed := m.LoadAndDelete(key); removed {
							dec()
						}
					}(key)
				}
				return true
			})
			time.Sleep(time.Second * 1)
		}
	}()

	// Let's also manually add and delete data.
	// http://localhost:8800/add?key=123
	http.HandleFunc("/add", func(w http.ResponseWriter, r *http.Request) {
		key := r.URL.Query().Get("key")
		log.Println("add: ", key)

		// LoadOrStore checks and inserts atomically, so two concurrent
		// requests for the same key cannot both increment the counter.
		if _, exists := m.LoadOrStore(key, key); exists {
			log.Println("already exists: ", key)
		} else {
			inc()
			log.Println("added: ", key)
		}

		// The key is user input: force plain text so it is never sniffed as HTML.
		w.Header().Set("Content-Type", "text/plain; charset=utf-8")
		if _, err := w.Write([]byte(key)); err != nil {
			log.Println("write response: ", err)
		}
	})

	// http://localhost:8800/delete?key=123
	http.HandleFunc("/delete", func(w http.ResponseWriter, r *http.Request) {
		key := r.URL.Query().Get("key")
		log.Println("delete: ", key)

		// LoadAndDelete reports whether this call removed the entry, so the
		// counter is decremented exactly once per removed key.
		if _, removed := m.LoadAndDelete(key); removed {
			dec()
			log.Println("deleted: ", key)
		} else {
			log.Println("not found: ", key)
		}

		// The key is user input: force plain text so it is never sniffed as HTML.
		w.Header().Set("Content-Type", "text/plain; charset=utf-8")
		if _, err := w.Write([]byte(key)); err != nil {
			log.Println("write response: ", err)
		}
	})

	// ListenAndServe only returns on failure (e.g. the port is already in
	// use); report the error instead of exiting silently.
	log.Fatal(http.ListenAndServe(":8800", nil))
}

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.