I recently had the opportunity to use Go generics to separate concurrency from my business logic. This is my first time using Go generics to do something non-trivial, and I’m pleased with the results, so I figured I would share my experience here and show off what I have been able to come up with.
I’m going to do a lot of motivating using a concrete example based on GitHub’s public API. If you’re just here for the code, you might want to skip down to the solution.
The Problem
If you ever find yourself needing to retrieve lots of data from a third-party system, there are a few options. If you have access to the database, and own the schema, your choice is easy. Just use the database, write some SQL (or maybe some no-SQL), test it, deliver it, and call it an afternoon.
But even when you do have database access, you don’t always own the schema. This situation can arise when you’re working with hosted, third-party tools. The temptation is strong to just hook your code directly to the database and make use of whatever you find. That can work for small applications, but because you don’t own the database schema, your maintenance budget is at the mercy of inbound schema updates from the developers of the third-party tool, who probably don’t have your use-case in mind. You don’t want to have to wait on performing an upgrade until you have had time to reverse engineer and update your data-layer.
So what do we do? Well, we’re writing an Application, we’re doing some Programming, and we need an Interface. We can just use the API.
… but some APIs really suck – they’re very rarely designed to work well for your problem.
The Problem is the Relationship(s)
Let’s take GitHub’s API as an example. Let’s say that we’re auditing a GitHub organization, and we want to write something which reports on unreviewed Pull Requests which may have triggered an artifact publish via a GitHub Action. We want our solution to work for historical runs, not just new runs. Not every GitHub Action publishes an artifact, but we can read the workflow run logs and look for publish events. GitHub has an API for fetching its logs, which we can invoke over HTTP as:
GET /repos/{owner}/{repo}/actions/runs/{run_id}/logs
But in order to fetch logs, we need a run_id. Thankfully, we can list workflow runs for a repository which were triggered by a pull request. This should give us a list of run_ids to work with, and all we have to do to list the runs is:
GET /repos/{owner}/{repo}/actions/runs?event=pull_request
But of course, we need to do this for all the repos in our organization. At this point we’re committed, so we might as well use the API again to list organization repositories.
GET /orgs/{org}/repos
And we can assume that we were given a list of organizations (but if we weren’t, we could list them all too).
We also need to fetch the Pull Request metadata associated with each action. Thankfully, the payload from our GET .../runs request contains a list of pull request IDs. But in order to understand whether a pull request was reviewed, we need to make another API request to list reviews for a pull request.
GET /repos/{owner}/{repo}/pulls/{pull_number}/reviews
The problem with this is clear. The entities we’re fetching are in a one-to-many relationship with one another. Each repo has multiple workflow runs, each run has multiple pull-request reviews. There are going to be a lot of network requests and making all of them sequentially is going to be slow.
But we know Go, and Go is good at concurrency. However, concurrency in Go can be messy, and we would like to be able to flexibly maintain our solution as GitHub’s API and our use-case evolves. Oh, if only there were a way to separate our business logic from the concurrency mechanism!
The Solution
Separating what we’re doing from how we assign that work to goroutines is exactly what I plan to cover in this post. Using the above use-case as an example, let’s start by designing our planned computation.
Taking a look at what we have to do, we observe that most of our computation time will be spent waiting on network requests to GitHub (i.e. we’re working with an I/O-bound computation). We also have dependencies between requests; we can’t even start certain requests until we’ve completed other requests. Whenever these two properties combine, we have a likely candidate for pipelining.
Pipelining is a pattern best explained by analogy with a laundromat, where it’s quicker to complete multiple loads of laundry by washing one load while drying another. Similarly, we can be fetching data about a GitHub repository in one goroutine, while we’re fetching data about the Reviews in another. The approach to pipelining we’re going to follow has deep ties to stream processing and functional programming.
We can break down the fetches we need to perform into stages, based on the data dependencies between them. We’ll assume we start with a list of organizations to fetch, and we’ll add a “combine” stage at the end to combine pipeline logs with information about the review. In this post we’ll just focus on fetching the data, and leave the ‘combine’ stage for next time.
Each stage in this breakdown performs some computation, with data flowing from stage to stage. Once we’ve implemented this, each data flow will map to a channel, and each stage will map to one or more goroutines.
Most of our stages take data from an input channel and provide data to one or more output channels. We can represent each stage as a generic Stage func which does this explicitly.
func Stage[S,T any](in <-chan S) <-chan T
Each stage takes in a channel of some type S, does some work, and sends its output to a new channel, which it also creates and returns.
Knowing nothing else about what our pipeline stages will do, we can already write out a basic implementation of Stage:
func Stage[S, T any](in <-chan S) <-chan T { // CAUTION: unfinished!
    output := make(chan T)
    go func() {
        for s := range in {
            var t T // do some work: convert s to t
            output <- t
        }
    }()
    return output
}
But how do we ‘do some work’ to convert a generic S to a generic T? As the implementors of Stage, we don’t have any idea what S or T are, so we must rely on the caller to pass us a func that can perform the conversion for us. Such a func would have a signature like func(S) T. As the authors of Stage, we have no idea what this func might be doing, but we know we can give it a value of type S and use it to produce a T. That’s all we need to obtain a working implementation of Stage.
func Stage[S, T any](in <-chan S, f func(S) T) <-chan T { // CAUTION: unfinished!
    output := make(chan T)
    go func() {
        for s := range in {
            t := f(s)
            output <- t
        }
    }()
    return output
}
While this is a good start, it doesn’t follow Go’s concurrency best-practices. There are a few problems to address:

- The sender is usually responsible for closing a channel. The goroutine we start is the only sender to output, but we aren’t closing it.
- The only way to break out of the for range loop within the goroutine is to close in. This is inflexible, as the caller may want to reuse the channel for other purposes, or introduce a timeout if processing takes too long.
- If the goroutine we start is blocked on sending a value to the output channel, the goroutine will leak.

The first problem can be solved by deferring a close. The second and third problems can be solved by accepting a context.Context and handling context cancellation in a select statement. The result is less easy to read, but better.
func Map[S, T any](ctx context.Context, in <-chan S, f func(S) T) <-chan T {
    output := make(chan T)
    go func() {
        defer close(output)
        for {
            select {
            case s, ok := <-in:
                if !ok {
                    return
                }
                select {
                case output <- f(s):
                case <-ctx.Done():
                    return
                }
            case <-ctx.Done():
                return
            }
        }
    }()
    return output
}
Note that I’ve also renamed our generically named Stage to Map. Map uses f to convert the contents of a chan S to a chan T. The function f can be arbitrarily complicated, as long as it eventually returns a T. Notice that the signature of f doesn’t provide us with any option to fail or return an error, so in order to capture fallible operations, we would need to capture errors as part of T and handle them downstream. It’s usually cleaner to handle errors locally, so while I don’t recommend sending errors downstream, it is an option.
To understand the power of Go’s generics, close your eyes and imagine if we had to copy something like this code every time we wanted to add a stage to a pipeline.
Map is our first pipeline stage. There are a lot of variations we can (and will) introduce later, but before adding more complexity, let’s see whether we can use what we’ve got and iterate from there.
Introducing FlatMap
Let’s test our design by attempting to use Map to solve our original problem. We can assume that we’ve written the functions fetchRepos, fetchRuns, fetchReviews, and fetchLogs, which each make API calls to fetch data from GitHub.
We can already use what we have to chain a bunch of Map stages together explicitly and get something like what we want. Toy implementations of the domain entities used here can be found in the Appendix.
func AuditOrgs(ctx context.Context, orgIDs <-chan string) { // CAUTION: unfinished!
    var repos <-chan Repo = Map(ctx, orgIDs, fetchRepos)
    var runs <-chan []Run = Map(ctx, repos, fetchRuns)
    var reviews <-chan []Review = Map(ctx, runs, fetchReviews)
    var logs <-chan []Logs = Map(ctx, runs, fetchLogs)
    // TODO: combine reviews and logs somehow
    // Note: specifying the type of each of the channels is a reading convenience;
    // Go's compiler will infer the type properly.
}
To make this work using only Map, we have to write functions with the following signatures.
func fetchRepos(orgID string) Repo
func fetchRuns(repo Repo) []Run
func fetchReviews(runs []Run) []Review
func fetchLogs(runs []Run) []Logs
But wait a minute, each of these fetch functions is going to be performing network I/O. We don’t want our programs to hang on a busy server, so we should be passing a Context, to enable us to shut down or time out gracefully. In order to do this properly, we also have to plumb the Context through the func which we pass to Map.
This is easily solved by introducing a variant of Map named MapCtx, with the signature below:
func MapCtx[S, T any](
    ctx context.Context,
    in <-chan S,
    f func(context.Context, S) T,
) <-chan T
The implementation is nearly identical to Map. The only difference is that f also accepts a context, which is passed on every invocation. Moving forward, when we introduce a pipeline stage named Stage, if it accepts a func as an argument, we’ll assume we have also implemented a corresponding StageCtx, which takes a similar func with a Context as its first argument. Since implementing StageCtx is trivial given an implementation of Stage, we won’t belabor the details in this post.
A second problem arises from how Map forces us to design the API for fetchReviews and fetchLogs. Both of these functions take in a slice []Run and return a slice in response. But GitHub’s APIs only accept a single run_id as input, so both of our implementations are just going to be performing fetches in a loop. It would be much simpler to be able to pass functions that accept a single Run apiece.
func fetchReviews(run Run) []Review
func fetchLogs(run Run) []Logs
If we were to make this change, we’d see a compiler error in AuditOrgs. When we compose Map with fetchRuns, we get a <-chan []Run, when what we need in the stage using fetchReviews is a <-chan Run instead. To solve this, we could write a new pipeline stage called Flatten which converts a <-chan []T to a <-chan T by ‘flattening’ each slice it receives. But introducing Flatten would use an extra goroutine which, unlike our others, wouldn’t be performing any I/O. We expect that this heterogeneity could complicate our future attempts to tune the overall performance of our system, so we should look for other solutions.
Instead, we can modify Map to handle the case where one value of S can map to multiple values of T. The new function we write will act like a Map, followed immediately by a Flatten, so we’ll call it FlatMap, and give it the following signature:
func FlatMap[S, T any](ctx context.Context, in <-chan S, f func(S) []T) <-chan T
The implementation of FlatMap is identical to Map, except that the contents of the slice returned from f are each sent to the output channel individually.
After applying these tweaks to AuditOrgs, and adding ctx arguments to our fetch functions, we arrive at the following:
func AuditOrgs(ctx context.Context, orgIDs <-chan string) { // CAUTION: unfinished!
    var repos <-chan Repo = MapCtx(ctx, orgIDs, fetchRepos)
    var runs <-chan Run = FlatMapCtx(ctx, repos, fetchRuns)
    var reviews <-chan Review = FlatMapCtx(ctx, runs, fetchReviews)
    var logs <-chan Logs = FlatMapCtx(ctx, runs, fetchLogs)
    // TODO: combine reviews and logs somehow
}
This compiles, and it meets some of our requirements, but if we were to test it, we would see that we have a major
bug. The fetchReviews
and fetchLogs
stages are each consuming from the runs
channel. Thus, each run will only be
seen by one stage or the other, but never by both. Since we need to fetch reviews and logs for each run, this doesn’t
even come close to what we want. We’ll have to fix it before moving forward.
Introducing Tee
In order to ensure each downstream stage sees each Run, we need a way to duplicate the contents of the runs channel into multiple streams which can be consumed independently. Enter Tee, which has the following signature and implementation.
func Tee[T any](in <-chan T) (<-chan T, <-chan T) { // CAUTION: unfinished!
    out1, out2 := make(chan T), make(chan T)
    go func() {
        for t := range in {
            select {
            case out1 <- t:
                out2 <- t
            case out2 <- t:
                out1 <- t
            }
        }
    }()
    return out1, out2
}
Tee sends each value it receives from its input channel to each of its output channels. Armed with it, we can fix the bug we had in AuditOrgs.
func AuditOrgs(ctx context.Context, orgIDs <-chan string) { // CAUTION: unfinished!
    var repos <-chan Repo = MapCtx(ctx, orgIDs, fetchRepos)
    var runs <-chan Run = FlatMapCtx(ctx, repos, fetchRuns)
    runs1, runs2 := Tee(runs)
    var reviews <-chan Review = FlatMapCtx(ctx, runs1, fetchReviews)
    var logs <-chan Logs = FlatMapCtx(ctx, runs2, fetchLogs)
    // TODO: combine reviews and logs somehow
}
But this also has a subtle problem, which might not become apparent until long after the code has been deployed. A careful look at the implementation of Tee reveals that it can form a potential bottleneck in our pipeline system. As currently written, each value of t must be received by both out1 and out2 before the next value of t is received from in.
There are different ways to fix this. One way is to fork each pair of send operations off onto its own goroutine, as follows:
func Tee[T any](in <-chan T) (<-chan T, <-chan T) { // CAUTION: unfinished!
    out1, out2 := make(chan T), make(chan T)
    go func() {
        for t := range in {
            go func(t T) { // please do not do this
                select {
                case out1 <- t:
                    out2 <- t
                case out2 <- t:
                    out1 <- t
                }
            }(t)
        }
    }()
    return out1, out2
}
This would address the bottleneck, but it comes with its own set of problems. Since we start a new goroutine for each value of t, if out1 was being consumed much more quickly than out2, we could end up with an unbounded number of new goroutines, each just waiting to send a single value to out2. Goroutines are cheap, but an unbounded number of them can easily exhaust available memory and crash our process. We don’t want that, so we abandon this approach.
Instead, we can opt for bounded parallelism by buffering both of the output channels. This way, if one of the channels becomes full, the backpressure propagates and eventually slows processing upstream, which in turn limits our memory overhead.
We could introduce a new buffSize parameter to the Tee function to handle this, like so:
func Tee[T any](in <-chan T, buffSize int) (<-chan T, <-chan T)
But it’s a bit ugly to require each caller to pass a 0 whenever they want an unbuffered channel – which is what they should be using by default. That said, we might want to optionally buffer the channels returned by any of the stages. One pattern in Go for optionally configuring resources is functional options. We introduce variadic arguments to each pipeline stage, which gives us a way to perform optional pipeline configuration. An implementation could look like this:
func Tee[T any](in <-chan T, opts ...Option) (<-chan T, <-chan T) {
    c := configure(opts)
    out1, out2 := makeChan[T](c), makeChan[T](c)
    // ....
}

type Option func(*config)

func WithBuffer(buffSize int) Option {
    return func(c *config) {
        c.bufferSize = buffSize
    }
}

type config struct {
    bufferSize int
}

func configure(opts []Option) config {
    var result config
    for _, opt := range opts {
        opt(&result)
    }
    return result
}

func makeChan[T any](c config) chan T {
    return make(chan T, c.bufferSize)
}
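As a quick sanity check of the functional-options machinery, here’s a tiny self-contained demo of my own. It mirrors the Option, config, configure, and makeChan pieces above, and uses cap to report each channel’s buffer size:

```go
package main

import "fmt"

type Option func(*config)

type config struct {
	bufferSize int
}

// WithBuffer sets the buffer size used for channels created by a stage.
func WithBuffer(buffSize int) Option {
	return func(c *config) { c.bufferSize = buffSize }
}

func configure(opts []Option) config {
	var result config
	for _, opt := range opts {
		opt(&result)
	}
	return result
}

func makeChan[T any](c config) chan T {
	return make(chan T, c.bufferSize)
}

func main() {
	// No options: zero-valued config, so the channel is unbuffered.
	unbuffered := makeChan[int](configure(nil))
	fmt.Println(cap(unbuffered)) // 0

	// WithBuffer(100): the channel gets a buffer of 100.
	buffered := makeChan[int](configure([]Option{WithBuffer(100)}))
	fmt.Println(cap(buffered)) // 100
}
```

Callers who want the default just pass no options at all, which is exactly the ergonomics we were missing with a required buffSize parameter.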
With this in place, we have a way to alleviate the bottleneck while also ensuring that backpressure is eventually applied upstream to prevent out-of-memory crashes. As a final step, we plumb a Context through Tee, which makes the code ugly, but is the pattern we are using to shut down goroutines which our pipeline starts. See Tee’s final form in the Appendix.
const BuffSize = 100
func AuditOrgs(ctx context.Context, orgIDs <-chan string) { // CAUTION: unfinished!
    var repos <-chan Repo = MapCtx(ctx, orgIDs, fetchRepos)
    var runs <-chan Run = FlatMapCtx(ctx, repos, fetchRuns)
    runs1, runs2 := Tee(ctx, runs, WithBuffer(BuffSize))
    var reviews <-chan Review = FlatMapCtx(ctx, runs1, fetchReviews)
    var logs <-chan Logs = FlatMapCtx(ctx, runs2, fetchLogs)
    // TODO: combine reviews and logs somehow
}
We’ve also introduced a configuration pattern which we will leverage later to allow callers to further customize their pipeline stages. One way we’ll use this is to allow callers to customize the number of goroutines on which they run a pipeline stage. This should allow users of our package to scale out each stage to tune the overall pipeline’s throughput.
const BuffSize = 100
func AuditOrgs(ctx context.Context, orgIDs <-chan string) { // CAUTION: unfinished!
    var repos <-chan Repo = MapCtx(ctx, orgIDs, fetchRepos)
    var runs <-chan Run = FlatMapCtx(ctx, repos, fetchRuns, WithPool(2)) // runs on 2 goroutines
    runs1, runs2 := Tee(ctx, runs, WithBuffer(BuffSize))
    var reviews <-chan Review = FlatMapCtx(ctx, runs1, fetchReviews, WithPool(4)) // runs on 4 goroutines
    var logs <-chan Logs = FlatMapCtx(ctx, runs2, fetchLogs, WithPool(4))
    // TODO: combine reviews and logs somehow
}
Since pipeline stages are long-lived, we expect that any overhead which we incur from setting up a stage will be easily overshadowed by I/O latency during processing. Implementing worker pools without duplicating code across every pipeline stage can be a little subtle, and I’m happy with how the pipelines package achieves it. I’ll explain the most important details of how it works in my next post, but for the moment, readers are welcome to explore the implementation of doWithConf for details.
Conclusion
The API we’ve developed separates our business logic from concurrency concerns.
If we were solving this problem for real, all we would have to do is implement fetchRepos, fetchRuns, fetchReviews, and fetchLogs, drain the remaining channels, and move on with our lives. In case we notice a performance issue later, we can sprinkle in configuration to manage how a pipeline runs without altering what it does.
Another exciting property – at least to me – is that Go’s verbose concurrency syntax is tucked away into a library behind a well-defined abstraction, where it has been tested and can evolve separately.
We’ll cover how we can combine our reviews and logs output into a final answer later. For the moment, I hope that readers can imagine how it could be done. In the next post we’ll motivate and apply Combine, discuss how to implement optional worker pools, and cover some of our options for bringing termination signals and errors out of pipelines.
Until then, check out the pipelines library on pkg.go.dev.
Appendix
Toy Domain Entities
Below are some data types which can be used to make sense of the fetch functions above. GitHub undoubtedly does something a bit different with its types; these are only toys.
type Repo struct {
    Owner string
    Repo  string
}

type Run struct {
    Owner      string
    Repo       string
    RunID      int64
    PullNumber *int64
}

type Review struct {
    Owner       string
    Repo        string
    PullNumber  int64
    ReviewerIDs []string
}

type Logs struct {
    Owner string
    Repo  string
    RunID int64
    Logs  []byte
}
MapCtx
Below is a simple implementation of MapCtx. The only differences from Map are the type of f and passing in ctx on every call.
func MapCtx[S, T any](ctx context.Context, in <-chan S, f func(context.Context, S) T) <-chan T {
    output := make(chan T)
    go func() {
        defer close(output)
        for {
            select {
            case s, ok := <-in:
                if !ok {
                    return
                }
                select {
                case output <- f(ctx, s):
                case <-ctx.Done():
                    return
                }
            case <-ctx.Done():
                return
            }
        }
    }()
    return output
}
FlatMap
Below is a simple implementation of FlatMap. The only differences from Map are the type of f and the loop sending individual values of T to output.
func FlatMap[S, T any](ctx context.Context, in <-chan S, f func(S) []T) <-chan T {
    output := make(chan T)
    go func() {
        defer close(output)
        for {
            select {
            case s, ok := <-in:
                if !ok {
                    return
                }
                ts := f(s)
                for _, t := range ts {
                    select {
                    case output <- t:
                    case <-ctx.Done():
                        return
                    }
                }
            case <-ctx.Done():
                return
            }
        }
    }()
    return output
}
Tee with Context
Below is an implementation of Tee including a ctx parameter, and correctly closing the result channels when the sending goroutine halts.
func Tee[T any](ctx context.Context, in <-chan T, opts ...Option) (<-chan T, <-chan T) {
    c := configure(opts)
    out1, out2 := makeChan[T](c), makeChan[T](c)
    go func() {
        defer func() {
            close(out2)
            close(out1)
        }()
        for {
            select {
            case <-ctx.Done():
                return
            case t, ok := <-in:
                if !ok {
                    return
                }
                select {
                case out1 <- t:
                    select {
                    case out2 <- t:
                    case <-ctx.Done():
                        return
                    }
                case out2 <- t:
                    select {
                    case out1 <- t:
                    case <-ctx.Done():
                        return
                    }
                case <-ctx.Done():
                    return
                }
            }
        }
    }()
    return out1, out2
}