Consider the following case: when creating a user (a database insert) together with their profile (another insert), all other users must be updated (a database update) with a new score value. The score is just a float computed with a dummy formula. Finally, an action record is needed (another insert) to mark the fact that a user was created.
The tech context is PostgreSQL in Go, with pgx as the database driver and the Echo framework for the HTTP server. The database setup is straightforward with Docker; it also includes a database management interface, available at http://localhost:54321. If you clone the sample repository and start the setup with Docker Compose (docker compose up -d), the database is created with the schema used in this post when the PostgreSQL container is built.
CREATE TABLE "users" ( "id" serial NOT NULL, "username" CHARACTER VARYING (100) NOT NULL, "score" DECIMAL NOT NULL DEFAULT 0, "created" TIMESTAMP(0) WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, "updated" TIMESTAMP(0) WITH TIME ZONE ); CREATE TABLE "user_profile" ( "user_id" INTEGER NOT NULL, "firstname" CHARACTER VARYING (100) NOT NULL, "lastname" CHARACTER VARYING (100) NOT NULL ); CREATE TABLE "actions" ( "id" serial NOT NULL, "description" text NOT NULL, "created" TIMESTAMP(0) WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP );
Data integrity is of interest here, so all the queries will be sent within a single database transaction. And because there are multiple user update queries, they will all be sent at once as a batch of operations.
Handling heavy data operations inside an HTTP request is not something you'd normally want to do; in some cases I would choose RabbitMQ to run the process in the background and to ensure reliability. But the focus here is on the database operations, and this implementation can be transposed to other contexts, too.
So we need an HTTP server with some routes to create and display data, a database connection, and some prepared statements. To create a user we POST the data, run some checks (for example, whether the username is already taken), and then the fun part begins.
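The statement names used below ("create_user", "update_user_score", "log_action") are registered on the pool at startup. The exact SQL lives in the sample repository; this is a minimal sketch with assumed query bodies that merely match the schema above:

func prepareStatements(db *pgx.ConnPool) error {
	// Assumed SQL bodies; the real queries are defined in the sample repository.
	statements := map[string]string{
		"create_user":       "INSERT INTO users (username, score) VALUES ($1, $2) RETURNING id, created",
		"update_user_score": "UPDATE users SET score = $2, updated = $3 WHERE id = $1",
		"log_action":        "INSERT INTO actions (description) VALUES ($1)",
	}
	for name, sql := range statements {
		if _, err := db.Prepare(name, sql); err != nil {
			return err
		}
	}
	return nil
}

Preparing the statements once on the pool lets every snippet below refer to them by name.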
It all starts with a timeout context, because we want to limit how long the entire operation can take. This protects against problems under heavy load: deadlocks when too many batched operations pile up, DoS attacks, or simply a traffic spike where you would rather return an error after a deadline than serve huge response times.
ctx, cancelFunc := context.WithTimeout(context.Background(), time.Second*2)
defer cancelFunc()
We use a connection pool on which we start a transaction.
var db *pgx.ConnPool // initialized at application startup

tx, err := db.BeginEx(ctx, nil)
if err != nil {
	return err
}
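For completeness, this is roughly how the pool itself can be created with pgx v3 at startup; the host, credentials, and pool size here are placeholder values, not necessarily the ones from the Docker setup:

// Placeholder connection values; adjust to match your environment.
db, err := pgx.NewConnPool(pgx.ConnPoolConfig{
	ConnConfig: pgx.ConnConfig{
		Host:     "localhost",
		Port:     5432,
		Database: "example",
		User:     "postgres",
		Password: "postgres",
	},
	MaxConnections: 5,
})
if err != nil {
	log.Fatal(err)
}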
Create the main user record and get back the auto-generated data (the user ID and the time the record was created). On any error, we roll back the transaction.
type User struct {
	ID       int        `json:"id"`
	Username string     `json:"username"`
	Score    float64    `json:"score"`
	Created  time.Time  `json:"created"`
	Updated  *time.Time `json:"updated"`
	Profile  struct {
		FirstName string `json:"firstname"`
		LastName  string `json:"lastname"`
	} `json:"profile"`
}

// Insert the user with the "create_user" prepared statement and read
// back the generated ID and creation timestamp.
args := []interface{}{
	user.Username,
	user.Score,
}
row := tx.QueryRow("create_user", args...)
err = row.Scan(&user.ID, &user.Created)
if err != nil {
	if e := tx.Rollback(); e != nil {
		c.Logger().Error(e)
	}
	return err
}
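Each user's score will be recalculated with the dummy formula mentioned in the introduction. The exact calculation is irrelevant here, so a hypothetical stand-in like this will do for the loop below:

// userScoreFormula is a stand-in for the dummy formula; the real
// calculation in the sample repository may differ.
func userScoreFormula(score, usersCount float64) float64 {
	return score + 1/usersCount
}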
Then, to update all users with a new score: create a batch, iterate over the users and calculate the score for each of them, queue the update query in the batch, then send all the queries to the database and close the batch.
// New batch on the open transaction
b := tx.BeginBatch()

usersCount := float64(len(users))
for _, u := range users {
	u.Score = userScoreFormula(u.Score, usersCount)
	args = []interface{}{
		u.ID,
		u.Score,
		time.Now(),
	}
	// Add the user score update query to the batch
	b.Queue("update_user_score", args, nil, nil)
}

// Send the queries to the database
err = b.Send(ctx, nil)
if err != nil {
	if e := tx.Rollback(); e != nil {
		c.Logger().Error(e)
	}
	// It's very important to close the batch operation on error
	if e := b.Close(); e != nil {
		c.Logger().Error(e)
	}
	return err
}

// Close the batch operation
err = b.Close()
if err != nil {
	if e := tx.Rollback(); e != nil {
		c.Logger().Error(e)
	}
	return err
}
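In addition to the error returned by Send, pgx v3 lets you read the result of each queued query individually with ExecResults (one call per queued query, in order), which is useful if you need the command tag or want to pinpoint which update failed; a minimal sketch:

// Optional: inspect each queued update's result before closing the batch.
for range users {
	if _, err := b.ExecResults(); err != nil {
		// roll back and close the batch as shown above, then bail out
		return err
	}
}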
The last requirement is to log the action.
// Record the action with the "log_action" prepared statement
args = []interface{}{
	fmt.Sprintf("new user created with id %d and username %s", user.ID, user.Username),
}
_, err = tx.Exec("log_action", args...)
if err != nil {
	if e := tx.Rollback(); e != nil {
		c.Logger().Error(e)
	}
	return err
}
Finally, commit all the operations.
err = tx.Commit()
if err != nil {
	return err
}
It's very important to handle every possible error: roll back the transaction and close the batch on any failure, otherwise the connection will not be released back to the pool. Also, if you get the panic "should never release when context is in progress", you have probably forgotten to close the batch.
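Since the rollback-and-close dance repeats on every error path, it can be pulled into a small helper; this is a hypothetical sketch, not something from the sample repository:

// failBatch rolls back the transaction and closes the batch,
// logging any cleanup errors through Echo's logger.
func failBatch(c echo.Context, tx *pgx.Tx, b *pgx.Batch) {
	if e := tx.Rollback(); e != nil {
		c.Logger().Error(e)
	}
	if b != nil {
		if e := b.Close(); e != nil {
			c.Logger().Error(e)
		}
	}
}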
I haven't tested the exact case presented here in production; it is derived from an implementation I had to add some features to.
Take a look at the entire source code on GitHub for Golang Postgres bulk insert/update.