Lately I’ve been using Go’s concurrent map. And sometimes I needed to count the items I’ve stored in the map. I have ranged over the items and incremented a counter, and it was done:
package main
import (
"sync"
"log"
)
func main() {
m := sync.Map{}
m.Store("k1", "v1")
m.Store("k2", "v2")
count := 0
m.Range(func(key, value interface{}) bool{
count++
return true
})
log.Println(count)
}
But I didn’t want to range over the items each time I wanted to count them. So I thought of incrementing the counter every time an element was added, and to decrementing it when one was removed. The following is just for practice, I didn’t perform any advanced tests.
Being a concurrent map, it obviously can be used in a concurrent context, so I had to keep that in mind. No matter where a value is added to the map from (or removed), the counter needs to be correctly updated. All counter update requests must be taken into account and applied only after the previous one had been applied, so I don’t have any consistency issues. It sounded like a queue between multiple routines: a channel.
So I had a map, a counter, and a channel.
m := sync.Map{}
count := 0
countchan := make(chan int)
I’m going to send all counter update requests on that channel, and for this I’m going to use two functions:
// Send 1 to increment
var inc = func() {
countchan <- 1
}
// And -1 to decrement
var dec = func() {
countchan <- -1
}
On a routine, I’m waiting for updates and apply them:
go func() {
for {
// Get value from queue
update := <-countchan
// Update counter
count += update
if update > 0 {
log.Println("inc")
}
if update < 0 {
log.Println("dec")
}
}
}()
On another routine, I’m continuously printing the number of items in the map:
go func() {
for {
log.Println("count: ", count)
time.Sleep(time.Second)
}
}()
Now let’s add some items to the map from multiple routines, then randomly delete them:
// Add
go func() {
for i := 1; i <= 500; i++ {
go func(i int) {
log.Println("add: ", i)
m.Store(i, i)
inc()
}(i)
}
}()
// delete
go func() {
for {
m.Range(func(key, value interface{}) bool{
if random(1, 5) == 1 {
go func(key interface{}){
m.Delete(key)
dec()
}(key)
}
return true
})
time.Sleep(time.Second * 1)
}
}()
You’re going to see items being added and removed, while printing the map’s length.
And we can add/delete items manually:
// http://localhost:8800/add?key=123
http.HandleFunc("/add", func(w http.ResponseWriter, r *http.Request) {
key := r.URL.Query().Get("key")
log.Println("add: ", key)
_, ok := m.Load(key)
if ok == false {
m.Store(key, key)
inc()
log.Println("added: ", key)
} else {
log.Println("already exists: ", key)
}
w.Write([]byte(key))
})
// http://localhost:8800/delete?key=123
http.HandleFunc("/delete", func(w http.ResponseWriter, r *http.Request) {
key := r.URL.Query().Get("key")
log.Println("delete: ", key)
_, ok := m.Load(key)
if ok == true {
m.Delete(key)
dec()
log.Println("deleted: ", key)
} else {
log.Println("not found: ", key)
}
w.Write([]byte(key))
})
http.ListenAndServe(":8800", nil)
It was fun doing this. Channels are awesome. Here’s the whole code:
package main
import (
"sync"
"time"
"log"
"math/rand"
"net/http"
)
func random(min, max int) int {
rand.Seed(time.Now().UTC().UnixNano())
return rand.Intn(max - min) + min
}
func main() {
// Map to store items
m := sync.Map{}
// Map items counter
count := 0
// Integer channel to send counter update requests (increment, decrement)
// Send 1 for inc, -1 for dec
countchan := make(chan int)
// Send 1 to increment
var inc = func() {
countchan <- 1
}
// And -1 to decrement
var dec = func() {
countchan <- -1
}
// Wait for counter update requests
go func() {
for {
// Get value from queue
update := <-countchan
// Update counter
count += update
if update > 0 {
log.Println("inc")
}
if update < 0 {
log.Println("dec")
}
}
}()
// Continuously print the counter
go func() {
for {
log.Println("count: ", count)
time.Sleep(time.Second)
}
}()
// Add some data into the map
go func() {
for i := 1; i <= 500; i++ {
// Let's abuse the counter by sending update requests concurrently
go func(i int) {
log.Println("add: ", i)
m.Store(i, i)
inc()
}(i)
}
}()
// Randomly delete data from the map to generate dec update requests
go func() {
for {
m.Range(func(key, value interface{}) bool{
if random(1, 5) == 1 {
go func(key interface{}){
m.Delete(key)
dec()
}(key)
}
return true
})
time.Sleep(time.Second * 1)
}
}()
// Let's also manually add and delete data
http.HandleFunc("/add", func(w http.ResponseWriter, r *http.Request) {
key := r.URL.Query().Get("key")
log.Println("add: ", key)
_, ok := m.Load(key)
if ok == false {
m.Store(key, key)
inc()
log.Println("added: ", key)
} else {
log.Println("already exists: ", key)
}
w.Write([]byte(key))
})
http.HandleFunc("/delete", func(w http.ResponseWriter, r *http.Request) {
key := r.URL.Query().Get("key")
log.Println("delete: ", key)
_, ok := m.Load(key)
if ok == true {
m.Delete(key)
dec()
log.Println("deleted: ", key)
} else {
log.Println("not found: ", key)
}
w.Write([]byte(key))
})
http.ListenAndServe(":8800", nil)
}