Consider the following case: when creating a user (a database insert) together with their profile (another insert), all other users must be updated (a database update) with a new score value. The score is just a float, computed with a dummy formula. Finally, an action record (another insert) marks the fact that a user was created.
The tech context is PostgreSQL in Go, with pgx as the database driver and the Echo framework for the HTTP server. The database setup is straightforward using Docker; it also includes a database management interface, available at http://localhost:54321. If you clone the sample repository and start the setup with Docker Compose (docker compose up -d), a database with the schema used in this post is created when the PostgreSQL container is built.
CREATE TABLE "users" ( "id" serial NOT NULL, "username" CHARACTER VARYING (100) NOT NULL, "score" DECIMAL NOT NULL DEFAULT 0, "created" TIMESTAMP(0) WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, "updated" TIMESTAMP(0) WITH TIME ZONE ); CREATE TABLE "user_profile" ( "user_id" INTEGER NOT NULL, "firstname" CHARACTER VARYING (100) NOT NULL, "lastname" CHARACTER VARYING (100) NOT NULL ); CREATE TABLE "actions" ( "id" serial NOT NULL, "description" text NOT NULL, "created" TIMESTAMP(0) WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP );
Data integrity is of interest here, so all the queries will be sent within a database transaction. And because there are multiple user update queries, they will all be sent at once, in a batch of operations.
Handling big data operations inside an HTTP request is not something you'd normally want to do; in some cases I would choose RabbitMQ to handle the process in the background and to ensure reliability. But the focus here is on the database operations, and this implementation can be transposed to other contexts, too.
So we need an HTTP server with some routes to create and display data, a database connection, and some prepared statements. When we want to create a user, we POST the data and run some checks (like whether the username is already taken), and then the fun part begins.
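For reference, the named statements used throughout the handler could be prepared once on the pool at startup. This is only a sketch: the SQL bodies are my assumption based on the schema above, not copied from the sample repository.

// Prepare the named statements on the pool; db is the *pgx.ConnPool used below.
// The SQL is an assumption derived from the schema, not the repository's exact queries.
statements := map[string]string{
    "create_user":       "INSERT INTO users (username, score) VALUES ($1, $2) RETURNING id, created",
    "update_user_score": "UPDATE users SET score = $2, updated = $3 WHERE id = $1",
    "log_action":        "INSERT INTO actions (description) VALUES ($1)",
}
for name, sql := range statements {
    if _, err := db.Prepare(name, sql); err != nil {
        log.Fatalf("prepare %s: %v", name, err)
    }
}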
It all starts with a timeout context, because we want to limit the time the entire operation can take and prevent crashes in heavy-load situations: deadlocks when there are too many batched operations, DoS attacks, or maybe just a traffic spike where you prefer returning an error after a while instead of serving huge response times.
ctx, cancelFunc := context.WithTimeout(context.Background(), time.Second*2)
defer cancelFunc()
We use a connection pool on which we start a transaction.
var db *pgx.ConnPool

tx, err := db.BeginEx(ctx, nil)
Create the main user record and get back the auto-generated data (the user ID and the time the record was created). If any error occurs, we roll back the transaction.
type User struct {
    ID       int        `json:"id"`
    Username string     `json:"username"`
    Score    float64    `json:"score"`
    Created  time.Time  `json:"created"`
    Updated  *time.Time `json:"updated"`
    Profile  struct {
        FirstName string `json:"firstname"`
        LastName  string `json:"lastname"`
    } `json:"profile"`
}

args := []interface{}{
    user.Username,
    user.Score,
}

row := tx.QueryRow("create_user", args...)

err = row.Scan(&user.ID, &user.Created)
if err != nil {
    if e := tx.Rollback(); e != nil {
        c.Logger().Error(e)
    }
    return
}
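The case also calls for the profile insert. On the same transaction it could look like the sketch below, assuming a hypothetical create_user_profile prepared statement (INSERT INTO user_profile (user_id, firstname, lastname) VALUES ($1, $2, $3)); the sample repository may do this differently.

// Insert the profile row tied to the freshly generated user ID.
// "create_user_profile" is a hypothetical prepared statement, not from the repository.
args = []interface{}{
    user.ID,
    user.Profile.FirstName,
    user.Profile.LastName,
}

_, err = tx.Exec("create_user_profile", args...)
if err != nil {
    if e := tx.Rollback(); e != nil {
        c.Logger().Error(e)
    }
    return
}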
Then, to update all users with the new score: create a batch, iterate over all users and calculate the new score for each of them, add the update query to the batch, send all queries to the database, and close the batch.
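The users slice iterated below has to be loaded first. A minimal sketch, assuming a hypothetical get_users prepared statement that selects the other users' IDs and scores (something like SELECT id, score FROM users WHERE id <> $1):

// Load the other users inside the same transaction.
// "get_users" is a hypothetical prepared statement, not from the repository.
rows, err := tx.Query("get_users", user.ID)
if err != nil {
    if e := tx.Rollback(); e != nil {
        c.Logger().Error(e)
    }
    return
}

// Use pointers so the score updates in the loop below stick.
var users []*User
for rows.Next() {
    u := new(User)
    if err := rows.Scan(&u.ID, &u.Score); err != nil {
        if e := tx.Rollback(); e != nil {
            c.Logger().Error(e)
        }
        return
    }
    users = append(users, u)
}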
// New batch
b := tx.BeginBatch()

usersCount := float64(len(users))
for _, u := range users {
    u.Score = userScoreFormula(u.Score, usersCount)

    args = []interface{}{
        u.ID,
        u.Score,
        time.Now(),
    }

    // Add user update score query to batch
    b.Queue("update_user_score", args, nil, nil)
}

// Send the queries to db
err = b.Send(ctx, nil)
if err != nil {
    if e := tx.Rollback(); e != nil {
        c.Logger().Error(e)
    }
    // It's very important to close the batch operation on error
    if e := b.Close(); e != nil {
        c.Logger().Error(e)
    }
    return
}

// Close batch operation
err = b.Close()
if err != nil {
    if e := tx.Rollback(); e != nil {
        c.Logger().Error(e)
    }
    return
}
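The score formula itself is not the point; any placeholder works. A hypothetical dummy version:

// userScoreFormula is a stand-in for real scoring logic; this body is
// an assumption, not the formula from the sample repository.
func userScoreFormula(score, usersCount float64) float64 {
    return score + 1/usersCount
}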
The last requirement is to log the action.
args = []interface{}{
    fmt.Sprintf("new user created with id %d and username %s", user.ID, user.Username),
}

_, err = tx.Exec("log_action", args...)
if err != nil {
    if e := tx.Rollback(); e != nil {
        c.Logger().Error(e)
    }
    return
}
Finally, commit all the operations.
err = tx.Commit()
if err != nil {
    return err
}
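With the transaction committed, the Echo handler can respond with the created user; the status code here is my choice, not necessarily the repository's.

// Respond with the newly created user, including the generated ID and timestamps.
return c.JSON(http.StatusCreated, user)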
It's very important to handle each possible error: roll back the transaction and close the batch on any error, otherwise the connection will not be released. Also, if you get the panic "should never release when context is in progress", you have probably forgotten to close the batch.
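Since the rollback-and-log pattern repeats at every step, it can be worth extracting into a helper; this one is hypothetical, not part of the sample repository:

// rollbackTx rolls back the transaction and logs the rollback error, if any,
// so the original error stays the one reported to the caller.
func rollbackTx(tx *pgx.Tx, logger echo.Logger) {
    if e := tx.Rollback(); e != nil {
        logger.Error(e)
    }
}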
I haven't tested the exact case presented here in production; it is derived from an implementation I had to add some features to.
Take a look at the entire source code on GitHub for Golang Postgres bulk insert/update.