PostgreSQL batch operations in Go

Consider the following case: creating a user (a database insert) together with their profile (another insert) requires updating all other users (database updates) with a new score value. The score is just a float computed with a dummy formula. Finally, an action record is inserted to mark the fact that a user was created.

The tech context is PostgreSQL in Go, with pgx as the database driver and the Echo framework for the HTTP server. The database setup is straightforward using Docker; it also includes a database management interface, available at http://localhost:54321. If you clone the sample repository and start the setup with Docker Compose (docker compose up -d), the database is created with the schema used in this post when the PostgreSQL container is built.

CREATE TABLE "users" (
  "id" serial NOT NULL,
  "username" CHARACTER VARYING (100) NOT NULL,
  "score" DECIMAL NOT NULL DEFAULT 0,
  "created" TIMESTAMP(0) WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
  "updated" TIMESTAMP(0) WITH TIME ZONE
);

CREATE TABLE "user_profile" (
  "user_id" INTEGER NOT NULL,
  "firstname" CHARACTER VARYING (100) NOT NULL,
  "lastname" CHARACTER VARYING (100) NOT NULL
);

CREATE TABLE "actions" (
  "id" serial NOT NULL,
  "description" text NOT NULL,
  "created" TIMESTAMP(0) WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);

Data integrity matters here, so all the queries run inside a single database transaction. And because there are multiple user update queries, they are sent to the database together, as one batch of operations.

Handling big data operations inside an HTTP request is not something you’d normally want to do, so in some cases I would choose RabbitMQ to handle the process in the background and to ensure reliability. But the focus here is on the database operations; this implementation can be transposed to other contexts, too.

So we need an HTTP server with some routes to create and display data, a database connection and some prepared statements. To create a user we POST the data and run some checks (for example, whether the username is already taken); then the fun part begins.
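
The snippets in this post refer to three prepared statements by name: create_user, update_user_score and log_action. Their SQL is not shown here, so the following is only a sketch of what they might look like, inferred from the schema above and from the arguments each call passes. The SQL text is an assumption and the repository may word it differently; the profile insert is left out because its statement name does not appear in the post.

```go
package main

import (
	"fmt"
	"sort"
)

// statements maps the prepared statement names used in this post to SQL text.
// The SQL is an assumption inferred from the schema and from the arguments
// passed to each statement; it is not copied from the repository.
var statements = map[string]string{
	// args: username, score; returns the generated id and creation time
	"create_user": `INSERT INTO users (username, score) VALUES ($1, $2) RETURNING id, created`,
	// args: user id, new score, update time
	"update_user_score": `UPDATE users SET score = $2, updated = $3 WHERE id = $1`,
	// args: description
	"log_action": `INSERT INTO actions (description) VALUES ($1)`,
}

func main() {
	// Print the statements in a stable order.
	names := make([]string, 0, len(statements))
	for name := range statements {
		names = append(names, name)
	}
	sort.Strings(names)
	for _, name := range names {
		fmt.Printf("%s: %s\n", name, statements[name])
	}
}
```

Statements like these would presumably be registered by name on the connection pool when the application starts, so that the handlers can refer to them by name only.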

It all starts with a timeout context, because we want to limit the time allowed for the entire operation and prevent crashes in high-load situations: deadlocks caused by too many batched operations, DoS attacks, or just a traffic spike where you would rather return an error after a fixed time than let response times grow.

ctx, cancelFunc := context.WithTimeout(context.Background(), time.Second*2)
defer cancelFunc()

We use a connection pool on which we start a transaction.

tx, err := db.BeginEx(ctx, nil) // db is the *pgx.ConnPool created at startup
if err != nil {
   return err
}

Create the main user record and read back the auto-generated data (the user ID and the time the record was created). On any error, we roll back the transaction.

type User struct {
   ID       int        `json:"id"`
   Username string     `json:"username"`
   Score    float64    `json:"score"`
   Created  time.Time  `json:"created"`
   Updated  *time.Time `json:"updated"`
   Profile  struct {
      FirstName string `json:"firstname"`
      LastName  string `json:"lastname"`
   } `json:"profile"`
}

args := []interface{}{
   user.Username,
   user.Score,
}
row := tx.QueryRow("create_user", args...)

err = row.Scan(&user.ID, &user.Created)
if err != nil {
   if e := tx.Rollback(); e != nil {
      c.Logger().Error(e)
   }
   return
}

Then, to update all users with the new score, create a batch, iterate over all users, calculate the new score for each of them and add the corresponding update query to the batch. Finally, send all the queries to the database and close the batch.

// New batch
b := tx.BeginBatch()

usersCount := float64(len(users))
for _, u := range users {
   u.Score = userScoreFormula(u.Score, usersCount)
   args = []interface{}{
      u.ID,
      u.Score,
      time.Now(),
   }

   // Add user update score query to batch
   b.Queue("update_user_score", args, nil, nil)
}

// Send the queries to db
err = b.Send(ctx, nil)
if err != nil {
   if e := tx.Rollback(); e != nil {
      c.Logger().Error(e)
   }

   // It's very important to close the batch operation on error
   if e := b.Close(); e != nil {
      c.Logger().Error(e)
   }
   return
}

// Close batch operation
err = b.Close()
if err != nil {
   if e := tx.Rollback(); e != nil {
      c.Logger().Error(e)
   }
   return
}
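
The loop above calls userScoreFormula, which the post only describes as a dummy formula. A minimal sketch of such a function could look like the following; the arithmetic is purely an assumption, chosen so that the result depends on both the current score and the number of users.

```go
package main

import "fmt"

// userScoreFormula is a hypothetical placeholder for the dummy formula.
// It adds a bonus that shrinks as the number of users grows and leaves
// the score unchanged when there are no users to divide by.
func userScoreFormula(score, usersCount float64) float64 {
	if usersCount <= 0 {
		return score
	}
	return score + 1/usersCount
}

func main() {
	fmt.Println(userScoreFormula(1.5, 4)) // prints 1.75
}
```

Any pure function with the same signature would fit into the loop; only the queued arguments change.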

The last requirement is to log the action.

args = []interface{}{
   fmt.Sprintf("new user created with id %d and username %s", user.ID, user.Username),
}
_, err = tx.Exec("log_action", args...)
if err != nil {
   if e := tx.Rollback(); e != nil {
      c.Logger().Error(e)
   }
   return
}

Finally, commit all the operations.

err = tx.Commit()
if err != nil {
   return err
}

It’s very important to handle every possible error: roll back the transaction and close the batch on any error, otherwise the connection will not be released. Also, if you get the panic “should never release when context is in progress”, you may have forgotten to close the batch.
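
Since the same rollback block repeats after every step, one option is to defer a small helper right after the transaction starts. The sketch below is not how the sample repository does it. The names rollbacker, rollbackOnError, fakeTx and createUser are hypothetical, and fakeTx stands in for a real transaction so the example runs without a database.

```go
package main

import (
	"errors"
	"fmt"
)

// rollbacker is the subset of a transaction the helper needs: a
// Rollback() error method like the one called on tx in this post.
type rollbacker interface {
	Rollback() error
}

// rollbackOnError rolls tx back when *err is non-nil. It is meant to be
// deferred right after the transaction starts.
func rollbackOnError(tx rollbacker, err *error, logf func(format string, args ...interface{})) {
	if *err == nil {
		return
	}
	if e := tx.Rollback(); e != nil {
		logf("rollback failed: %v", e)
	}
}

// fakeTx records whether Rollback was called; for demonstration only.
type fakeTx struct{ rolledBack bool }

func (f *fakeTx) Rollback() error {
	f.rolledBack = true
	return nil
}

// createUser shows the deferred pattern with a named return value, so
// the helper sees whatever error the function returns.
func createUser(tx rollbacker, fail bool) (err error) {
	defer rollbackOnError(tx, &err, func(format string, args ...interface{}) {
		fmt.Printf(format+"\n", args...)
	})
	if fail {
		return errors.New("insert failed")
	}
	return nil
}

func main() {
	tx := &fakeTx{}
	err := createUser(tx, true)
	fmt.Println(err, tx.rolledBack) // prints "insert failed true"
}
```

The batch still has to be closed separately, and a rollback attempted after a failed commit may itself report an error, which this helper only logs.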

I haven’t tested the exact case presented here in production; it is derived from an implementation I had to add some features to.

Take a look at the entire source code on GitHub.
