Pipelines and workers in Go

I have some lists of users that I receive and, for each user, I need to apply some rules (text formatting, maximum length, and whatever other business rules come up in the future), then send the user on to another service. If I receive the same user again, I have to exclude them from the entire process. If one of the rules says the user is not eligible, I have to stop the entire process; there is no need to run the remaining rules.

If you read the previous paragraph again, you can see some if statements, ones you should avoid in the technical implementation, though of course not in the business rules:

  • If the user was already processed, skip them
  • If the max length is exceeded, truncate
  • If a rule says the user is not OK, stop
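
To see why this gets unpleasant, here is a sketch of what the naive, if-heavy version might look like: one function that grows a new condition for every rule. The `processNaively` name and its shape are my own illustration, not code from the actual service.

```go
package main

import (
	"fmt"
	"strings"
)

type User struct {
	ID        int
	FirstName string
	LastName  string
}

// processNaively bundles every rule into one function: each new
// business rule means yet another if statement in here.
func processNaively(seen map[int]bool, u User, maxLen int) (User, bool) {
	// If the user was already processed, skip them.
	if seen[u.ID] {
		return u, false
	}
	seen[u.ID] = true

	u.LastName = strings.ToUpper(u.LastName)

	// If the max length is exceeded, truncate.
	if len(u.FirstName) > maxLen {
		u.FirstName = u.FirstName[:maxLen]
	}
	if len(u.LastName) > maxLen {
		u.LastName = u.LastName[:maxLen]
	}
	return u, true
}

func main() {
	seen := map[int]bool{}
	u, ok := processNaively(seen, User{ID: 1, FirstName: "Johanna Anne Marie", LastName: "doe"}, 12)
	fmt.Println(u, ok) // first name truncated, last name uppercased

	_, ok = processNaively(seen, u, 12)
	fmt.Println(ok) // false: user was already processed
}
```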

I think of the rules as workers in a pipeline. Each worker does its job and passes its work to the next one. Here’s how I’ve handled this.

First, the user model and a struct for the list of users:

type User struct {
       ID        int    `json:"id"`
       FirstName string `json:"first_name"`
       LastName  string `json:"last_name"`
}

type Users []User

Now I have to allow users to be sent to my app, and this can be done by a simple HTTP server with a handler that will send incoming data to a channel.

var incoming = make(chan Users)

func defaultHandler(w http.ResponseWriter, r *http.Request) {
       defer r.Body.Close()
       body, err := io.ReadAll(r.Body)
       if err != nil {
              http.Error(w, err.Error(), http.StatusInternalServerError)
              return
       }

       var users Users
       if err := json.Unmarshal(body, &users); err != nil {
              http.Error(w, err.Error(), http.StatusBadRequest)
              return
       }

       log.Println("Incoming...")
       log.Println(users)

       // Send received users on the incoming channel
       incoming <- users

       w.Header().Set("Content-Type", "application/json")
       w.Write(body)
}

http.HandleFunc("/", defaultHandler)
http.ListenAndServe(":8080", nil)

I have a goroutine waiting for data on the incoming channel, and every time something arrives, I apply the rules. Here comes the pipeline to the rescue. Instead of having multiple if statements for each condition, I use workers (as mentioned earlier), each doing its own unique job and passing data on to the next one. If a worker returns an error, I don’t go on to the next workers; I just stop the pipeline.

type Pipeline struct {
       workers []Worker
}

func (pe *Pipeline) Execute(u User) {
       for _, w := range pe.workers {
              var err error
              u, err = w.Work(u)

              // If one worker returns an error, no need to continue
              if err != nil {
                     log.Printf("Error: %s %v", err, u)
                     break
              }
       }
}

The workers are very simple:

  • UniqueInsurer is an in-memory database of users and will send further only new ones.
  • Capitalizer capitalizes the last name.
  • BoundariesApplier truncates the first and last names if they exceed a maximum length.
  • Emitter just sends a processed user to another channel, from where it will be sent further. I’ve implemented this as a separate worker so I don’t give the responsibility of sending further to BoundariesApplier (which could be removed some day).

type Worker interface {
       Work(u User) (User, error)
}

// UniqueInsurer will keep users in memory and return only new ones
type UniqueInsurer struct {
       memory []*User
}

func (ui *UniqueInsurer) Work(u User) (User, error) {
       for _, m := range ui.memory {
              if u.ID == m.ID {
                     return u, errors.New("User exists in memory")
              }
       }

       ui.memory = append(ui.memory, &u)

       return u, nil
}
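
The linear scan in UniqueInsurer grows with every stored user. A map keyed by ID would make the duplicate check O(1); here is a hypothetical variant (the `MapInsurer` name is mine, not from the original service) sketching that trade-off:

```go
package main

import (
	"errors"
	"fmt"
)

type User struct {
	ID        int
	FirstName string
	LastName  string
}

// MapInsurer is a hypothetical alternative to UniqueInsurer: it
// remembers only the IDs it has seen, so checking for a duplicate
// is a single map lookup instead of a scan over stored users.
type MapInsurer struct {
	seen map[int]struct{}
}

func NewMapInsurer() *MapInsurer {
	return &MapInsurer{seen: make(map[int]struct{})}
}

func (mi *MapInsurer) Work(u User) (User, error) {
	if _, ok := mi.seen[u.ID]; ok {
		return u, errors.New("User exists in memory")
	}
	mi.seen[u.ID] = struct{}{}
	return u, nil
}

func main() {
	mi := NewMapInsurer()
	_, err := mi.Work(User{ID: 1})
	fmt.Println(err) // <nil>
	_, err = mi.Work(User{ID: 1})
	fmt.Println(err) // User exists in memory
}
```

The cost is that the full User values are no longer kept around, which is fine here because nothing else reads the memory.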

// Capitalizer will capitalize last name
type Capitalizer struct {
}

func (c Capitalizer) Work(u User) (User, error) {
       u.LastName = strings.ToUpper(u.LastName)
       return u, nil
}

// BoundariesApplier truncates names that are too long
type BoundariesApplier struct {
       MaxLength int
}

func (ba BoundariesApplier) Work(u User) (User, error) {
       u.FirstName = ba.truncate(u.FirstName)
       u.LastName = ba.truncate(u.LastName)
       return u, nil
}

func (ba BoundariesApplier) truncate(text string) string {
       if len(text) > ba.MaxLength {
              text = text[0:ba.MaxLength]
       }
       
       return text
}
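
One caveat with truncate: `len(text)` and `text[0:n]` work on bytes, so a name containing multi-byte UTF-8 characters can be cut in the middle of a character. If that matters, a rune-aware sketch could look like this (`truncateRunes` is my own illustration, not part of the original code):

```go
package main

import "fmt"

// truncateRunes counts characters instead of bytes: converting the
// string to []rune ensures the cut never splits a multi-byte
// UTF-8 character.
func truncateRunes(text string, max int) string {
	r := []rune(text)
	if len(r) > max {
		return string(r[:max])
	}
	return text
}

func main() {
	fmt.Println(truncateRunes("Ólafur Jóhann", 6)) // Ólafur
	fmt.Println(truncateRunes("Doe", 12))          // Doe
}
```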

type Emitter struct {
       ch chan User
}

func (e Emitter) Work(u User) (User, error) {
       e.ch <- u
       return u, nil
}

The pipeline and its workers are easy to set up:

outgoing := make(chan User)

workers := []Worker{
       &UniqueInsurer{
              make([]*User, 0),
       },
       Capitalizer{},
       BoundariesApplier{
              12,
       },
       Emitter{
              outgoing,
       },
}
pipeline := Pipeline{
       workers,
}

Now, a goroutine runs the pipeline for each incoming user:

go func(incoming chan Users, pipeline Pipeline) {
       for {
              users := <-incoming
              for _, u := range users {
                     pipeline.Execute(u)
              }
       }
}(incoming, pipeline)

Finally, I’m waiting on an outgoing channel for valid users:

go func(outgoing chan User) {
       for {
              u := <-outgoing
              log.Println("Success: Outgoing", u)
       }
}(outgoing)
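
The whole flow can also be exercised in-process, without the HTTP server. Here is a minimal, self-contained sketch using just two of the workers and a buffered outgoing channel (buffered so Execute doesn’t block without a reader goroutine):

```go
package main

import (
	"fmt"
	"strings"
)

type User struct {
	ID        int
	FirstName string
	LastName  string
}

type Worker interface {
	Work(u User) (User, error)
}

type Pipeline struct {
	workers []Worker
}

func (pe *Pipeline) Execute(u User) {
	for _, w := range pe.workers {
		var err error
		u, err = w.Work(u)
		if err != nil {
			return
		}
	}
}

// Capitalizer capitalizes the last name.
type Capitalizer struct{}

func (c Capitalizer) Work(u User) (User, error) {
	u.LastName = strings.ToUpper(u.LastName)
	return u, nil
}

// Emitter sends a processed user to another channel.
type Emitter struct {
	ch chan User
}

func (e Emitter) Work(u User) (User, error) {
	e.ch <- u
	return u, nil
}

func main() {
	// Buffered so the send in Emitter doesn't block.
	outgoing := make(chan User, 1)
	pipeline := Pipeline{workers: []Worker{Capitalizer{}, Emitter{outgoing}}}

	pipeline.Execute(User{ID: 1, FirstName: "John", LastName: "Doe"})
	fmt.Println(<-outgoing) // {1 John DOE}
}
```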

Now I can send users to my service. Change the IDs and other properties in the following request, and watch the server logs for the results.

curl -X POST \
    http://127.0.0.1:8080/ \
        -d '[
            {
                "id": 1,
                "first_name": "John",
                "last_name": "Doe"
            },
            {
                "id": 2,
                "first_name": "Johanna",
                "last_name": "Doe"
            },
            {
                "id": 3,
                "first_name": "Johanna Anne Marie",
                "last_name": "Doe Doe"
            }
    ]'

Take a look at the full pipeline service example on GitHub.
