I recently had the opportunity to use Go generics to separate concurrency from my business logic. This is my first time using Go generics to do something non-trivial, and I’m pleased with the results, so I figured I would share my experience here and show off what I have been able to come up with.

I’m going to do a lot of motivating using a concrete example based on GitHub’s public API. If you’re just here for the code, you might want to skip down to the solution.

The Problem

If you ever find yourself needing to retrieve lots of data from a third-party system, there are a few options. If you have access to the database, and own the schema, your choice is easy. Just use the database, write some SQL (or maybe some no-SQL), test it, deliver it, and call it an afternoon.

But even when you do have database access, you don’t always own the schema. This situation can arise when you’re working with hosted, third-party tools. The temptation is strong to just hook your code directly to the database and make use of whatever you find. That can work for small applications, but because you don’t own the database schema, your maintenance budget is at the mercy of inbound schema updates from the developers of the third-party tool, who probably don’t have your use-case in mind. You don’t want to have to wait on performing an upgrade until you have had time to reverse engineer and update your data-layer.

So what do we do? Well, we’re writing an Application, we’re doing some Programming, and we need an Interface. We can just use the API.

… but some APIs really suck – they’re very rarely designed to work well for your problem.

The Problem is the Relationship(s)

Let’s take GitHub’s API as an example. Let’s say that we’re auditing a GitHub organization, and we want to write something which reports on unreviewed Pull Requests which may have triggered an artifact publish via a GitHub Action. We want our solution to work for historical runs, not just new runs. Not every GitHub Action publishes an artifact, but we can read the workflow run logs and look for publish events. GitHub has an API for fetching its logs, which we can invoke over HTTP as:

GET /repos/{owner}/{repo}/actions/runs/{run_id}/logs

But in order to fetch logs, we need a run_id. Thankfully, we can list workflow runs for a repository which were triggered by a pull request. This should give us a list of run_ids to work with, and all we have to do to list the runs is:

GET /repos/{owner}/{repo}/actions/runs?event=pull_request

But of course, we need to do this for all the repos in our organization. At this point we’re committed, so we might as well use the API again to list organization repositories.

GET /orgs/{org}/repos

And we can assume that we were given a list of organizations (but if we weren’t, we could list them all too).

We also need to fetch the Pull Request metadata associated with each action. Thankfully, the payload from our GET .../runs request contains a list of pull request IDs. But in order to understand whether a pull request was reviewed, we need to make another API request to list reviews for a pull request.

GET /repos/{owner}/{repo}/pulls/{pull_number}/reviews

The problem with this is clear. The entities we’re fetching are in a one-to-many relationship with one another. Each repo has multiple workflow runs, each run has multiple pull-request reviews. There are going to be a lot of network requests and making all of them sequentially is going to be slow.

But we know Go, and Go is good at concurrency. However, concurrency in Go can be messy, and we would like to be able to flexibly maintain our solution as GitHub’s API and our use-case evolve. Oh, if only there were a way to separate our business logic from the concurrency mechanism!

The Solution

Separating what we’re doing from how we assign that work to goroutines is exactly what I plan to cover in this post. Using the above use-case as an example, let’s start by designing our planned computation.

Taking a look at what we have to do, we observe that most of our computation time will be spent waiting on network requests to GitHub (i.e. we’re working with an I/O-bound computation). We also have dependencies between requests; we can’t even start certain requests until we’ve completed other requests. Anytime these two properties combine, it’s a likely candidate for pipelining.

Pipelining is a pattern best explained by analogy with a laundromat, where it’s quicker to complete multiple loads of laundry by washing one load while drying another. Similarly, we can be fetching data about a GitHub repository in one goroutine, while we’re fetching data about the Reviews in another. The approach to pipelining we’re going to follow has deep ties to stream processing and functional programming.

We can break down the fetches we need to perform into stages, based on the data dependencies between them. We’ll assume we start with a list of organizations to fetch, and we’ll add a “combine” stage at the end to combine pipeline logs with information about the review. In this post we’ll just focus on fetching the data, and leave the ‘combine’ stage for next time.

flowchart LR
  repos(GET Repos)
  runs(GET Runs)
  prs(GET Reviews)
  logs(GET Logs)
  Combine
  repos -- repo_id --> runs
  runs -- run_id --> prs
  runs -- run_id --> logs
  prs -- "(run_id, review)" --> Combine
  logs -- "(run_id, logs)" --> Combine

Each of the boxes above represents a different pipeline stage which performs some computation. The arrows between each box represent the data flowing between them. Once we’ve implemented this, each arrow will map to a channel, and each box will map to one or more goroutines.

Most of our boxes take data from an input channel and provide data to one or more output channels. We can represent each box as a generic Stage func which does this explicitly.

func Stage[S,T any](in <-chan S) <-chan T

Each stage takes in a channel of some type S, does some work, and sends its output to a new channel, which it also creates and returns. Knowing nothing else about what our pipeline stages will do, we can already write out a basic implementation of Stage:

func Stage[S, T any](in <-chan S) <-chan T { // CAUTION: unfinished!
  output := make(chan T)
  go func() {
    for s := range in {
      // do some work, convert s to t
      output <- t
    }
  }()
  return output
}

But how do we ‘do some work’ to convert a generic S to a generic T? As the implementors of Stage, we don’t have any idea what S or T are, so we must rely on the caller to pass us a func that can perform the conversion for us. Such a func would have a signature like func(S) T. As the authors of Stage, we have no idea what this func might be doing, but we know we can give it a value of type S and use it to produce a T. That’s all we need to obtain a working implementation of Stage.

func Stage[S, T any](in <-chan S, f func(S) T) <-chan T { // CAUTION: unfinished!
   output := make(chan T)
   go func() {
      for s := range in {
         t := f(s)
         output <- t
      }
   }()
   return output
}

While this is a good start, it doesn’t follow Go’s concurrency best practices. There are a few problems to address:

  1. The sender is usually responsible for closing a channel. The goroutine we start is the only sender to output, but we aren’t closing it.
  2. The only way to break out of the for range loop within the goroutine is to close in. This is inflexible, as the caller may want to reuse the channel for other purposes, or introduce a timeout if processing takes too long.
  3. If nothing ever receives from the output channel, the goroutine we start stays blocked on its send forever, and the goroutine will leak.

The first problem can be solved by deferring a close. The second and third problems can be solved by accepting a context.Context and handling context cancellation in a select statement. The result is less easy to read, but better.

func Map[S, T any](ctx context.Context, in <-chan S, f func(S) T) <-chan T {
   output := make(chan T)
   go func() {
      defer close(output)
      for {
         select {
         case s, ok := <-in:
            if !ok {
               return
            }
            select {
            case output <- f(s):
            case <-ctx.Done():
               return
            }
         case <-ctx.Done():
            return
         }
      }
   }()
   return output
}

Note that I’ve also renamed our generically named Stage to Map. Map uses f to convert the contents of a chan S to a chan T. The function f can be arbitrarily complicated, as long as it eventually returns a T. Notice that the signature of f doesn’t provide us with any option to fail or return an error, so in order to capture fallible operations, we would need to capture errors as part of T and handle them downstream. It’s usually cleaner to handle errors locally, so while I don’t recommend sending errors downstream, it is an option.
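As a rough illustration of capturing errors as part of T, the sketch below wraps each output in a Result type. Result, parse, and collect are hypothetical names invented for this example; Map is repeated from above so the snippet compiles on its own.

```go
package main

import (
	"context"
	"fmt"
	"strconv"
)

// Map is the context-aware stage from the text above, repeated so this
// example compiles on its own.
func Map[S, T any](ctx context.Context, in <-chan S, f func(S) T) <-chan T {
	output := make(chan T)
	go func() {
		defer close(output)
		for {
			select {
			case s, ok := <-in:
				if !ok {
					return
				}
				select {
				case output <- f(s):
				case <-ctx.Done():
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()
	return output
}

// Result is a hypothetical wrapper that lets a func(S) T report failure.
type Result[T any] struct {
	Value T
	Err   error
}

// parse stands in for a fallible operation such as an API call.
func parse(s string) Result[int] {
	n, err := strconv.Atoi(s)
	return Result[int]{Value: n, Err: err}
}

// collect runs parse over the inputs and separates successes from failures.
func collect(inputs []string) (values []int, failures int) {
	in := make(chan string, len(inputs))
	for _, s := range inputs {
		in <- s
	}
	close(in)
	for r := range Map(context.Background(), in, parse) {
		if r.Err != nil {
			failures++ // the error is handled downstream of the stage
			continue
		}
		values = append(values, r.Value)
	}
	return values, failures
}

func main() {
	values, failures := collect([]string{"1", "two", "3"})
	fmt.Println(values, failures) // prints "[1 3] 1"
}
```

The cost of this approach is visible in collect: every consumer has to check Err before touching Value, which is part of why handling errors locally tends to be cleaner.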

To understand the power of Go’s generics, close your eyes and imagine if we had to copy something like this code every time we wanted to add a stage to a pipeline.

Map is our first pipeline stage. There are a lot of variations we can (and will) introduce later, but before adding more complexity, let’s see whether we can use what we’ve got and iterate from there.

Introducing FlatMap

Let’s test our design by attempting to use Map to solve our original problem. We can assume that we’ve written the functions fetchRepos, fetchRuns, fetchReviews, and fetchLogs, which each make API calls to fetch data from GitHub.

We can already use what we have to chain a bunch of Map stages together explicitly and get something like what we want. Toy implementations of the domain entities used here can be found in the Appendix.

func AuditOrgs(ctx context.Context, orgIDs <-chan string) { // CAUTION: unfinished!
  var repos <-chan Repo  = Map(ctx, orgIDs, fetchRepos)
  var runs  <-chan []Run = Map(ctx, repos, fetchRuns)
  
  var reviews <-chan []Review = Map(ctx, runs, fetchReviews)
  var logs    <-chan []Logs   = Map(ctx, runs, fetchLogs)

  // TODO: combine reviews and logs somehow
  
  // Note: specifying the type of each of the channels is a reading convenience; 
  // Go's compiler will infer the type properly.
}

To make this work using only Map, we have to write functions with the following signatures.

func fetchRepos(orgID string) Repo
func fetchRuns(repo Repo) []Run
func fetchReviews(runs []Run) []Review
func fetchLogs(runs []Run) []Logs

But wait a minute, each of these fetch functions is going to be performing network I/O. We don’t want our programs to hang on a busy server, so we should be passing a Context to enable us to shut down or time out gracefully. In order to do this properly, we also have to plumb the Context through the func which we pass to Map.

This is easily solved by introducing a variant of Map named MapCtx, with the signature below:

func MapCtx[S, T any](
	ctx context.Context,
	in <-chan S,
	f func(context.Context, S) T,
) <-chan T

The implementation is nearly identical to Map. The only difference is that f also accepts a context, which is passed on every invocation. Moving forward, when we introduce a pipeline stage named Stage, if it accepts a func as an argument, we’ll assume we have also implemented a corresponding StageCtx, which takes a similar func, with a Context as the first argument. Since implementing StageCtx is trivial given an implementation of Stage, we won’t belabor the details in this post.
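To see why plumbing the Context through f matters, here is a small sketch in which the stage function itself observes a deadline. slowFetch and fetchWithDeadline are made-up names, and the 50ms deadline is an arbitrary choice for the example; slowFetch only imitates a server that never answers.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// MapCtx mirrors Map, except that f also receives the stage's context.
func MapCtx[S, T any](ctx context.Context, in <-chan S, f func(context.Context, S) T) <-chan T {
	output := make(chan T)
	go func() {
		defer close(output)
		for {
			select {
			case s, ok := <-in:
				if !ok {
					return
				}
				select {
				case output <- f(ctx, s):
				case <-ctx.Done():
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()
	return output
}

// slowFetch is a hypothetical stand-in for a call to a server that never
// answers in time: it returns only once the context ends.
func slowFetch(ctx context.Context, id int) string {
	select {
	case <-time.After(time.Minute):
		return fmt.Sprintf("item %d", id)
	case <-ctx.Done():
		return "gave up: " + ctx.Err().Error()
	}
}

// fetchWithDeadline pushes three IDs through MapCtx under a 50ms deadline
// and returns whatever the stage emitted before shutting down.
func fetchWithDeadline() []string {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	in := make(chan int, 3)
	for id := 1; id <= 3; id++ {
		in <- id
	}
	close(in)

	var got []string
	for s := range MapCtx(ctx, in, slowFetch) {
		got = append(got, s)
	}
	return got
}

func main() {
	for _, s := range fetchWithDeadline() {
		fmt.Println(s)
	}
}
```

Once the deadline passes, the stage may emit anywhere from zero to three "gave up" results, because each select picks at random between a ready send and the cancelled context. Either way the output channel is closed and the range loop ends promptly instead of hanging for a minute per item.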

A second problem arises from how Map forces us to design the API for fetchReviews and fetchLogs. Both of these functions take in a slice []Run and return a slice in response. But GitHub’s APIs only accept a single run_id as input, so both of our implementations are just going to be performing fetches in a loop. It would be much simpler to be able to pass functions that accept a single Run apiece.

func fetchReviews(run Run) []Review
func fetchLogs(run Run)    []Logs

If we were to make this change, we’d see a compiler error in AuditOrgs. When we compose Map with fetchRuns, we get a <-chan []Run, when what we need in the stage using fetchReviews is a <-chan Run instead. To solve this, we could write a new pipeline stage called Flatten which converts a <-chan []T to a <-chan T by ‘flattening’ each slice it receives. But introducing Flatten would use an extra goroutine which, unlike our others, wouldn’t be performing any I/O. We expect that this heterogeneity could complicate our future attempts to tune the overall performance of our system, so we should look for other solutions.
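For reference, a Flatten stage along those lines might look like the sketch below. This is only an illustration of the rejected option, written in the same style as Map; flattenDemo is a made-up driver.

```go
package main

import (
	"context"
	"fmt"
)

// Flatten converts a channel of slices into a channel of their elements.
// It is a sketch of the stage described above, not code we end up using.
func Flatten[T any](ctx context.Context, in <-chan []T) <-chan T {
	output := make(chan T)
	go func() {
		defer close(output)
		for {
			select {
			case ts, ok := <-in:
				if !ok {
					return
				}
				// Send each element on its own; empty slices send nothing.
				for _, t := range ts {
					select {
					case output <- t:
					case <-ctx.Done():
						return
					}
				}
			case <-ctx.Done():
				return
			}
		}
	}()
	return output
}

// flattenDemo feeds three slices through Flatten and collects the elements.
func flattenDemo() []int {
	in := make(chan []int, 3)
	in <- []int{1, 2}
	in <- nil
	in <- []int{3}
	close(in)

	var got []int
	for n := range Flatten(context.Background(), in) {
		got = append(got, n)
	}
	return got
}

func main() {
	fmt.Println(flattenDemo()) // prints "[1 2 3]"
}
```

Note that the goroutine here does nothing but copy values between channels, which is the non-I/O work the paragraph above objects to.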

Instead, we can modify Map to handle the case where one value of S can map to multiple values of T. The new function we write will act like a Map, followed immediately by a Flatten, so we’ll call it FlatMap, and give it the following signature:

func FlatMap[S, T any](ctx context.Context, in <-chan S, f func(S) []T) <-chan T

The implementation of FlatMap is nearly identical to Map, except that the contents of the slice returned from f are each sent to output individually.

After applying these tweaks to AuditOrgs, and adding ctx arguments to our fetch functions, we arrive at the following:

func AuditOrgs(ctx context.Context, orgIDs <-chan string) { // CAUTION: unfinished!
  var repos <-chan Repo = MapCtx(ctx, orgIDs, fetchRepos)
  var runs <-chan Run = FlatMapCtx(ctx, repos, fetchRuns)
  
  var reviews <-chan Review = FlatMapCtx(ctx, runs, fetchReviews)
  var logs <-chan Logs = FlatMapCtx(ctx, runs, fetchLogs)
  
  // TODO: combine reviews and logs somehow
}

This compiles, and it meets some of our requirements, but if we were to test it, we would see that we have a major bug. The fetchReviews and fetchLogs stages are each consuming from the runs channel. Thus, each run will only be seen by one stage or the other, but never by both. Since we need to fetch reviews and logs for each run, this doesn’t even come close to what we want. We’ll have to fix it before moving forward.
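The bug is easy to reproduce in isolation. In this sketch, two goroutines stand in for the reviews and logs stages and both range over the same channel; splitDemo is a name made up for the example, and plain ints stand in for runs.

```go
package main

import (
	"fmt"
	"sync"
)

// splitDemo sends 0..n-1 on a single channel and lets two consumers read
// from it, returning what each consumer saw.
func splitDemo(n int) (reviews, logs []int) {
	runs := make(chan int)
	go func() {
		defer close(runs)
		for i := 0; i < n; i++ {
			runs <- i
		}
	}()

	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // stands in for the fetchReviews stage
		defer wg.Done()
		for r := range runs {
			reviews = append(reviews, r)
		}
	}()
	go func() { // stands in for the fetchLogs stage
		defer wg.Done()
		for r := range runs {
			logs = append(logs, r)
		}
	}()
	wg.Wait()
	return reviews, logs
}

func main() {
	reviews, logs := splitDemo(100)
	// The two counts always add up to 100: every value went to exactly one consumer.
	fmt.Println(len(reviews), len(logs), len(reviews)+len(logs))
}
```

How the values are divided varies from run to run, but no value is ever delivered to both consumers.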

Introducing Tee

In order to ensure each downstream stage sees each Run, we need a way to duplicate the contents of the runs channel into multiple streams which can be consumed independently. Enter Tee, which has the following signature and implementation.

func Tee[T any](in <-chan T) (<-chan T, <-chan T) { // CAUTION: unfinished!
   out1, out2 := make(chan T), make(chan T)
   go func() {
      for t := range in {
         select {
         case out1 <- t:
            out2 <- t
         case out2 <- t:
            out1 <- t
         }
      }
   }()
   return out1, out2
}

Tee sends each value it receives from its input channel to each of its output channels. Armed with it, we can fix the bug we had in AuditOrgs.

func AuditOrgs(ctx context.Context, orgIDs <-chan string) { // CAUTION: unfinished!
   var repos <-chan Repo = MapCtx(ctx, orgIDs, fetchRepos)
   var runs <-chan Run = FlatMapCtx(ctx, repos, fetchRuns)
   runs1, runs2 := Tee(runs)

   var reviews <-chan Review = FlatMapCtx(ctx, runs1, fetchReviews)
   var logs <-chan Logs = FlatMapCtx(ctx, runs2, fetchLogs)

   // TODO: combine reviews and logs somehow
}

But this also has a subtle problem, which might not become apparent until long after the code has been deployed. A careful look at the implementation of Tee reveals that it can form a potential bottleneck in our pipeline system. As currently written, each value of t must be received from both out1 and out2 before the next value of t is read from in, so the slower of the two consumers sets the pace for both.

There are different ways to fix this. One way is to fork each pair of send operations off onto its own goroutine, as follows:

func Tee[T any](in <-chan T) (<-chan T, <-chan T) { // CAUTION: unfinished!
   out1, out2 := make(chan T), make(chan T)
   go func() {
      for t := range in {
         go func() {    // please do not do this
            select {
            case out1 <- t:
               out2 <- t
            case out2 <- t:
               out1 <- t
            }
         }()
      }
   }()
   return out1, out2
}

This would address the bottleneck, but it comes with its own set of problems. Since we start a new goroutine for each value of t, if out1 was being consumed much more quickly than out2, we could end up with an unbounded number of new goroutines, each just waiting to send a single value to out2. Goroutines are cheap, but an unbounded number of them can easily exhaust available memory and crash our process. We don’t want that, so we abandon this approach.

Instead, we can opt for bounded parallelism by buffering both of the output channels. This way, if one of the channels becomes full, the backpressure propagates and eventually slows processing upstream, which in turn limits our memory overhead.
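To make the bound concrete: a buffered channel accepts only as many values as its capacity before a send would block, and that blocking is the backpressure referred to above. The helper name below is made up for illustration.

```go
package main

import "fmt"

// sendsBeforeBlocking counts how many sends succeed on a channel with the
// given buffer size when nothing is receiving from it.
func sendsBeforeBlocking(buffSize int) int {
	ch := make(chan int, buffSize)
	sent := 0
	for {
		select {
		case ch <- sent:
			sent++
		default:
			// The send would block: the buffer is full (or the channel is
			// unbuffered and no receiver is waiting).
			return sent
		}
	}
}

func main() {
	fmt.Println(sendsBeforeBlocking(0))   // prints "0"
	fmt.Println(sendsBeforeBlocking(100)) // prints "100"
}
```

In a real stage there is no default case, so the sender simply waits, which is what slows the upstream stages down.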

We could introduce a new buffSize parameter to the Tee function to handle this, like so:

func Tee[T any](in <-chan T, buffSize int) (<-chan T, <-chan T)

But it’s a bit ugly to require each caller to send a 0 whenever they want an unbuffered channel – which they should be using by default. That said, we might want to optionally buffer the channels returned by any of the stages. One pattern in Go for optionally configuring resources is functional options. We introduce variadic arguments to each pipeline stage which will give us a way to perform optional pipeline configurations. An implementation could look like this:

func Tee[T any](in <-chan T, opts ...Option) (<-chan T, <-chan T) {
   c := configure(opts)
   out1, out2 := makeChan[T](c), makeChan[T](c)
   // ....
}

type Option func(*config)

func WithBuffer(buffSize int) Option {
   return func(c *config) {
      c.bufferSize = buffSize
   }
}

type config struct {
   bufferSize int
}

func configure(opts []Option) config {
   var result config
   for _, opt := range opts {
      opt(&result)
   }
   return result
}

func makeChan[T any](c config) chan T {
   return make(chan T, c.bufferSize)
}
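A quick way to check how these pieces fit together is to look at the capacity of the channel they produce. The option types below are copied from the listing above; newOutput is a hypothetical helper added only for this example.

```go
package main

import "fmt"

type Option func(*config)

func WithBuffer(buffSize int) Option {
	return func(c *config) {
		c.bufferSize = buffSize
	}
}

type config struct {
	bufferSize int
}

func configure(opts []Option) config {
	var result config
	for _, opt := range opts {
		opt(&result)
	}
	return result
}

func makeChan[T any](c config) chan T {
	return make(chan T, c.bufferSize)
}

// newOutput is a hypothetical helper showing how a stage turns its variadic
// options into an output channel.
func newOutput[T any](opts ...Option) chan T {
	return makeChan[T](configure(opts))
}

func main() {
	fmt.Println(cap(newOutput[int]()))                // prints "0": unbuffered by default
	fmt.Println(cap(newOutput[int](WithBuffer(100)))) // prints "100"
	// Options are applied in order, so the last one wins.
	fmt.Println(cap(newOutput[int](WithBuffer(100), WithBuffer(8)))) // prints "8"
}
```

Callers who are happy with the default pass nothing at all, which is the ergonomic win over a mandatory buffSize parameter.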

With this in place, we have a way to alleviate the bottleneck while also ensuring that backpressure is eventually applied upstream to prevent out of memory crashes. As a final step, we plumb a Context through Tee, which makes the code ugly, but is the pattern we are using to shut down goroutines which our pipeline starts. See Tee’s final form in the Appendix.

const BuffSize = 100
func AuditOrgs(ctx context.Context, orgIDs <-chan string) { // CAUTION: unfinished!
   var repos <-chan Repo = MapCtx(ctx, orgIDs, fetchRepos)
   var runs <-chan Run = FlatMapCtx(ctx, repos, fetchRuns)
   runs1, runs2 := Tee(ctx, runs, WithBuffer(BuffSize))

   var reviews <-chan Review = FlatMapCtx(ctx, runs1, fetchReviews)
   var logs <-chan Logs = FlatMapCtx(ctx, runs2, fetchLogs)

   // TODO: combine reviews and logs somehow
}

We’ve also introduced a configuration pattern which we will leverage later to allow callers to further customize their pipeline stages. One way we’ll use this is to allow callers to customize the number of goroutines on which they run a pipeline stage. This should allow users of our package to scale out each stage to tune the overall pipeline’s throughput.

const BuffSize = 100
func AuditOrgs(ctx context.Context, orgIDs <-chan string) { // CAUTION: unfinished!
   var repos <-chan Repo = MapCtx(ctx, orgIDs, fetchRepos)
   var runs <-chan Run = FlatMapCtx(ctx, repos, fetchRuns, WithPool(2)) // runs on 2 goroutines
   runs1, runs2 := Tee(ctx, runs, WithBuffer(BuffSize))

   var reviews <-chan Review = FlatMapCtx(ctx, runs1, fetchReviews, WithPool(4)) // runs on 4 goroutines
   var logs <-chan Logs = FlatMapCtx(ctx, runs2, fetchLogs, WithPool(4))

   // TODO: combine reviews and logs somehow
}

Since pipeline stages are long-lived, we expect that any overhead which we incur from setting up a stage will be easily overshadowed by I/O latency during processing. Implementing worker pools without duplicating code across every pipeline stage can be a little subtle, and I’m happy with how the pipelines package achieves it. I’ll explain the most important details of how it works in my next post, but for the moment, readers are welcome to explore the implementation of doWithConf for details.
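In the meantime, here is one naive way a pooled stage could be written. This is an assumption-laden sketch and not how the pipelines package or doWithConf does it: MapPool and its workers parameter are invented for this example, and the code is duplicated from Map rather than shared across stages.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// MapPool is a hypothetical variant of Map that runs f on several
// goroutines. Output order is not guaranteed to match input order.
func MapPool[S, T any](ctx context.Context, in <-chan S, f func(S) T, workers int) <-chan T {
	if workers < 1 {
		workers = 1
	}
	output := make(chan T)
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for {
				select {
				case s, ok := <-in:
					if !ok {
						return
					}
					select {
					case output <- f(s):
					case <-ctx.Done():
						return
					}
				case <-ctx.Done():
					return
				}
			}
		}()
	}
	// Close output only after every worker has stopped sending.
	go func() {
		wg.Wait()
		close(output)
	}()
	return output
}

// poolDemo squares 1..10 on four workers and summarizes the results.
func poolDemo() (count, sum int) {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 10; i++ {
			in <- i
		}
	}()
	square := func(n int) int { return n * n }
	for v := range MapPool(context.Background(), in, square, 4) {
		count++
		sum += v
	}
	return count, sum
}

func main() {
	fmt.Println(poolDemo()) // prints "10 385"
}
```

The one subtlety worth noticing is the close: with several senders, no single worker can close output, so a separate goroutine waits for all of them first.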

Conclusion

The API we’ve developed separates our business logic from concurrency concerns. If we were solving this problem for real, all we would have to do is implement fetchRepos, fetchRuns, fetchReviews, and fetchLogs, drain the remaining channels, and move on with our lives. In case we notice a performance issue later, we can sprinkle in configuration to manage how a pipeline runs without altering what it does.
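Draining a channel can itself be written once and reused. The helper below is a guess at what such a function could look like; the name Drain and its signature are assumptions made for this sketch rather than a description of the library.

```go
package main

import (
	"context"
	"fmt"
)

// Drain collects every value from in until it is closed or ctx is done.
// Hypothetical helper: name and signature are assumptions for this sketch.
func Drain[T any](ctx context.Context, in <-chan T) ([]T, error) {
	var out []T
	for {
		select {
		case t, ok := <-in:
			if !ok {
				return out, nil
			}
			out = append(out, t)
		case <-ctx.Done():
			return out, ctx.Err()
		}
	}
}

// drainClosed drains a channel that is closed after three values.
func drainClosed() ([]string, error) {
	in := make(chan string, 3)
	in <- "a"
	in <- "b"
	in <- "c"
	close(in)
	return Drain(context.Background(), in)
}

// drainCancelled drains a channel that never produces anything, using a
// context that has already been cancelled.
func drainCancelled() ([]int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	var never chan int // a nil channel blocks forever on receive
	return Drain(ctx, never)
}

func main() {
	fmt.Println(drainClosed())    // prints "[a b c] <nil>"
	fmt.Println(drainCancelled()) // prints "[] context canceled"
}
```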

Another exciting property – at least to me – is that Go’s verbose concurrency syntax is tucked away behind a well-defined abstraction into a library where it’s been tested and can evolve separately.

We’ll cover how we can combine our reviews and logs output into a final answer later. For the moment, I hope that readers can imagine how it could be done. In the next post we’ll motivate and apply Combine, discuss how to implement optional worker pools, and cover some of our options for bringing termination signals and errors out of pipelines.

Until then, check out the pipelines library on pkg.go.dev.

Appendix

Toy Domain Entities

Below are some data types which can be used to make sense of the fetch functions above. GitHub undoubtedly does something a bit different with the types; these are only toys.

type Repo struct {
   Owner string
   Repo  string
}

type Run struct {
   Owner      string
   Repo       string
   RunID      int64
   PullNumber *int64
}

type Review struct {
   Owner       string
   Repo        string
   PullNumber  int64
   ReviewerIDs []string
}

type Logs struct {
   Owner string
   Repo  string
   RunID int64
   Logs  []byte
}

MapCtx

Below is a simple implementation of MapCtx. The only differences from Map are the type of f and passing in ctx on every call.

func MapCtx[S, T any](ctx context.Context, in <-chan S, f func(context.Context, S) T) <-chan T {
   output := make(chan T)
   go func() {
      defer close(output)
      for {
         select {
         case s, ok := <-in:
            if !ok {
               return
            }
            select {
            case output <- f(ctx, s):
            case <-ctx.Done():
               return
            }
         case <-ctx.Done():
            return
         }
      }
   }()
   return output
}

FlatMap

Below is a simple implementation of FlatMap. The only differences from Map are the type of f and the loop which sends individual values of T to output.

func FlatMap[S, T any](ctx context.Context, in <-chan S, f func(S) []T) <-chan T {
   output := make(chan T)
   go func() {
      defer close(output)
      for {
         select {
         case s, ok := <-in:
            if !ok {
               return
            }
            ts := f(s)
            for _, t := range ts {
               select {
               case output <- t:
               case <-ctx.Done():
                  return
               }
            }
         case <-ctx.Done():
            return
         }
      }
   }()
   return output
}
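AuditOrgs uses FlatMapCtx, which is not listed in this post. Following the StageCtx convention described earlier, a minimal sketch could look like the following; splitWords and flatMapDemo are made-up names that exist only to exercise it.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// FlatMapCtx is FlatMap with a context passed to f on every call. This is a
// sketch under the StageCtx convention, not a published implementation.
func FlatMapCtx[S, T any](ctx context.Context, in <-chan S, f func(context.Context, S) []T) <-chan T {
	output := make(chan T)
	go func() {
		defer close(output)
		for {
			select {
			case s, ok := <-in:
				if !ok {
					return
				}
				for _, t := range f(ctx, s) {
					select {
					case output <- t:
					case <-ctx.Done():
						return
					}
				}
			case <-ctx.Done():
				return
			}
		}
	}()
	return output
}

// splitWords stands in for a fetch that yields several results per input.
func splitWords(_ context.Context, line string) []string {
	return strings.Fields(line)
}

// flatMapDemo sends two lines through FlatMapCtx and collects the words.
func flatMapDemo() []string {
	in := make(chan string, 2)
	in <- "list repos"
	in <- "then list runs"
	close(in)

	var words []string
	for w := range FlatMapCtx(context.Background(), in, splitWords) {
		words = append(words, w)
	}
	return words
}

func main() {
	fmt.Println(flatMapDemo()) // prints "[list repos then list runs]"
}
```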

Tee with Context

Below is an implementation of Tee including a ctx parameter, and correctly closing the result channels when the sending goroutine halts.

func Tee[T any](ctx context.Context, in <-chan T, opts ...Option) (<-chan T, <-chan T) {
   c := configure(opts)
   out1, out2 := makeChan[T](c), makeChan[T](c)
   go func() {
      defer func() {
         close(out2)
         close(out1)
      }()
      for {
         select {
         case <-ctx.Done():
            return
         case t, ok := <-in:
            if !ok {
               return
            }
            select {
            case out1 <- t:
               select {
               case out2 <- t:
               case <-ctx.Done():
                  return
               }
            case out2 <- t:
               select {
               case out1 <- t:
               case <-ctx.Done():
                  return
               }
            case <-ctx.Done():
               return
            }
         }
      }
   }()
   return out1, out2
}
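As a usage sketch, the program below pushes a few values through this Tee and checks that each branch sees all of them. The option helpers are condensed copies of the ones shown earlier, and teeDemo is a made-up driver; the buffer size of 4 is arbitrary.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type config struct{ bufferSize int }

type Option func(*config)

func WithBuffer(n int) Option { return func(c *config) { c.bufferSize = n } }

func configure(opts []Option) config {
	var c config
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func makeChan[T any](c config) chan T { return make(chan T, c.bufferSize) }

// Tee is the context-aware implementation listed above.
func Tee[T any](ctx context.Context, in <-chan T, opts ...Option) (<-chan T, <-chan T) {
	c := configure(opts)
	out1, out2 := makeChan[T](c), makeChan[T](c)
	go func() {
		defer func() {
			close(out2)
			close(out1)
		}()
		for {
			select {
			case <-ctx.Done():
				return
			case t, ok := <-in:
				if !ok {
					return
				}
				select {
				case out1 <- t:
					select {
					case out2 <- t:
					case <-ctx.Done():
						return
					}
				case out2 <- t:
					select {
					case out1 <- t:
					case <-ctx.Done():
						return
					}
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out1, out2
}

// teeDemo sends 0..n-1 through Tee and collects what each branch receives.
func teeDemo(n int) (left, right []int) {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 0; i < n; i++ {
			in <- i
		}
	}()
	out1, out2 := Tee(context.Background(), in, WithBuffer(4))

	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // both branches are drained concurrently
		defer wg.Done()
		for v := range out1 {
			left = append(left, v)
		}
	}()
	go func() {
		defer wg.Done()
		for v := range out2 {
			right = append(right, v)
		}
	}()
	wg.Wait()
	return left, right
}

func main() {
	left, right := teeDemo(10)
	fmt.Println(left)  // prints "[0 1 2 3 4 5 6 7 8 9]"
	fmt.Println(right) // prints "[0 1 2 3 4 5 6 7 8 9]"
}
```

Draining the two branches one after the other would deadlock as soon as the undrained branch's buffer filled up, which is why the demo reads both concurrently.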