Target Audience
People who have never written a single line of code in Go but have programming experience and can tell the difference between dynamically typed languages such as JavaScript, Python or Ruby and statically typed languages such as Go, Java, or C#.
I will not be covering language basics because there’s an excellent Go Tour for that, but along the way, I will cover some characteristics of Go that are required for decision making and that will lead us to the final version of the methods we will introduce.
Preamble
Before diving in, let’s start with some facts about Go for those who are new to the language:
Go is a fast, statically typed, compiled language.
Go’s concurrency mechanisms make it easy to write programs that get the most out of multicore and networked machines.
Go does not have generics (yet).
Motivation for Writing the Methods
The nature of our business demands processing vast amounts of data, and processing it concurrently is a good way to do it fast.
Even though it is easy to write concurrent programs in Go, programmers need to be mindful of certain details, and we wanted to create a library that frees them from worrying about those details.
Starting with “Each”
Having some Ruby background, we wanted to use familiar names for methods that apply actions to all elements of a collection, such as those that exist for Ruby’s arrays, so let’s start with the each method.
Go is statically typed, so you cannot have arrays containing values of different types. What it does offer is the empty interface type, interface{}, which allows you to get around some type restrictions. However, this comes at a cost that I’ll talk about later in this article.
To mimic how an each method works in dynamically typed languages, we can declare a new type that is a slice of empty interfaces:
type Array []interface{}
Then we make this type the receiver of an Each method that prints all of its values.
func (a Array) Each() {
    for _, anything := range a {
        fmt.Println(anything)
    }
}
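A quick usage sketch (the variable name is made up for the example; note that, thanks to interface{}, the slice can even mix value types):

values := Array{"hello", "world", 42}
values.Each() // prints each element on its own line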
The next step is to add the ability to tell it what to do with each of the values, instead of the hard-coded Println. So, we modify the method and allow it to receive a function as a parameter.
Yes, Go supports first-class functions.
The function we pass as a parameter must accept a value whose type matches the type of the slice’s items, because it will receive one of those items each time it is invoked.
In this case, we are still using interface{} as the type, which means pretty much any type will be accepted.
func (a Array) Each(fn func(interface{})) {
    for _, anything := range a {
        fn(anything)
    }
}
Usage:
arrayOfStrings.Each(func(value interface{}) {
    fmt.Printf("Say %v\n", value)
})
So far so good, right? We have just defined a type with a method that behaves just like Ruby’s each method for arrays — or have we?
It has some caveats:
- We are not taking advantage of one of the main benefits of a statically typed language, which is to detect errors at compile time instead of at runtime.
- Detecting the type at runtime to know how to deal with the value has a performance cost; in the example, the methods fmt.Println and fmt.Printf internally use something called reflection to deal with it.
- A slice of a certain type cannot be used as a slice of interface{} without allocating the whole slice again. In other words, var i int can be used as an interface{}, but a []int cannot be used as a []interface{}. This doesn’t compile; it would require us to do something like this.
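For illustration, here is a minimal sketch of the kind of copy that would be required (the variable names are made up for the example):

// var values []interface{} = []int{1, 2, 3} // does not compile
ints := []int{1, 2, 3}
values := make([]interface{}, len(ints))
for i, v := range ints {
    values[i] = v // each element is copied (and boxed) individually
}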
Examples like this are what make engineers ask for generics in Go.
In the meantime, until Go 2.0 is launched, and because we like to stick to the YAGNI principle, we’ll focus on our most immediate need: processing strings.
type Strings []string

func (s Strings) Each(fn func(string)) {
    for _, str := range s {
        fn(str)
    }
}
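A possible usage, mirroring the earlier example:

Strings{"hello", "world"}.Each(func(value string) {
    fmt.Printf("Say %v\n", value)
})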
And whenever the time comes that we need something similar for an int or any other type, we could write a code generator.
Error Handling
So far we’re only printing strings and not expecting errors, but what if the processing we want to apply to each string could result in an error? We should handle it properly, Go style, which means returning it. In Go, functions can return more than one value, so it is typical to return the function’s normal result plus an error value; in our example, we only need to return the error value.
The signature of the function that we receive changes to:
fn func(string) error
Note: For simplicity of the example, we arbitrarily decided it should exit the loop and return the first inner error that it encounters. Alternatively, we could have wrapped all of them into a single error value that we return after the for loop.
func (s Strings) Each(fn func(string) error) error {
    for _, str := range s {
        if err := fn(str); err != nil {
            return err
        }
    }
    return nil
}
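A small, hypothetical usage that exercises the error path (the empty-string check is just an example):

err := Strings{"hello", "", "world"}.Each(func(value string) error {
    if value == "" {
        return errors.New("empty string found")
    }
    fmt.Println(value)
    return nil
})
if err != nil {
    fmt.Println("There was an error:", err)
}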
Concurrency
Here is where the fun begins 😄
Go’s concurrency mechanism is composed of goroutines that communicate through channels.
“Don’t communicate by sharing memory, share memory by communicating.” – Rob Pike
Then if we wanted to concurrently apply certain logic to a big collection of string values, it makes sense to launch thousands of goroutines and have them all read their input from a channel.
Channelize
The first step is to change our type
type Strings <-chan string
The for loop also needs a minor adjustment, because ranging over a channel yields only one value per iteration:
for str := range s
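Putting both changes together, the channel-based version of Each would look roughly like this:

func (s Strings) Each(fn func(string) error) error {
    for str := range s {
        if err := fn(str); err != nil {
            return err
        }
    }
    return nil
}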
Finally, we need to feed the channel with values:
stringsCh := make(chan string)

go func() { // separate goroutine to avoid starvation
    for _, str := range arrayOfStrings {
        stringsCh <- str
    }
    close(stringsCh)
}()
And now we have a working version of the method that processes each item, but this time reading from a strings channel.
To make it more interesting (more data), let’s simulate a file reader by writing a generator function that creates, feeds and returns a strings channel:
func generator(lines int) Strings {
    c := make(chan string)
    go func() {
        for i := 1; i <= lines; i++ {
            // build line i containing i tab-delimited values
            var buf bytes.Buffer
            buf.WriteString(strconv.Itoa(i))
            for j := 0; j < i-1; j++ {
                buf.WriteString("\t")
                buf.WriteString(strconv.Itoa(i))
            }
            c <- buf.String()
        }
        close(c)
    }()
    return c
}
And change the function passed so that instead of just printing the string, it prints how many tab-delimited values it has. Let’s also add an artificial delay and let it error if the count equals 42.
err := generator(100).Each(func(str string) error {
    values := strings.Split(str, "\t")
    if len(values) == 42 {
        return fmt.Errorf("you already found the meaning of life, universe and everything: %d", len(values))
    }
    fmt.Println(len(values))
    time.Sleep(50 * time.Millisecond) // artificial delay
    return nil
})
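And, as before, the returned error can be checked once Each is done:

if err != nil {
    fmt.Println("There was an error:", err)
}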
Launch N Workers
Now, instead of processing each line sequentially, we will pass a new parameter to our Each method: the number of workers wanted. We will use that parameter to start N goroutines that read from the channel.
func (s Strings) Each(workers int, fn func(int, string) error) error {
    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func(wid int) {
            for str := range s {
                if err := fn(wid, str); err != nil {
                    panic(err)
                }
            }
            wg.Done()
        }(i)
    }
    wg.Wait()
    return nil
}
As you can see, we use the WaitGroup type provided by the sync standard library package to block the main goroutine until all workers are done; otherwise, the program would just exit before the goroutines finish.
We also added a worker ID parameter, wid, to the function received, just in case we need to identify each of the workers (mostly for debugging purposes).
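A possible way to call it with, say, 10 workers (the worker count is arbitrary, and the returned error is ignored here for brevity):

generator(100).Each(10, func(wid int, str string) error {
    fmt.Println(wid, "->", len(strings.Split(str, "\t")))
    time.Sleep(50 * time.Millisecond)
    return nil
})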
The last — but very important — thing to notice is that because the passed function is now running inside a goroutine, we can’t capture its return value. Therefore, in case of error, we just panic instead of returning it. We will be taking a more appropriate action next.
Errors Occurring in the Background (inside goroutines)
Because the program timeline isn’t a single line anymore, we don’t have a way of immediately bailing out to handle errors if they should arise. In this scenario, one way to handle errors is to put them into a channel and postpone dealing with them until later.
func (s Strings) Each(workers int, fn func(int, string) error) <-chan error {
    errCh := make(chan error, workers)
    go func() {
        defer close(errCh)
        var wg sync.WaitGroup
        for i := 0; i < workers; i++ {
            wg.Add(1)
            go func(wid int) {
                for str := range s {
                    if err := fn(wid, str); err != nil {
                        errCh <- err
                    }
                }
                wg.Done()
            }(i)
        }
        wg.Wait()
    }()
    return errCh
}
This technique also allows us to have the body of Each run in the background, turning it into a non-blocking method. Now, the way we wait for Each to finish is by reading from the errors channel it returns, because that channel is closed as soon as all workers are done.
for err := range errors {
    fmt.Println("There was an error:", err)
}
Early Exit
In the current state of our code, an error message is printed when it finds an offending line, but the method itself does not provide the user any other meaningful course of action. What if a certain error should result in a full stop of every worker?
We can use the context standard library package to send a cancel signal to the workers. But we will have to replace the for range over the channel with an infinite for loop and a select statement.
var ErrCancelled = errors.New("cancel signal received")
func (s Strings) Each(ctx context.Context, workers int, fn func(int, string) error) <-chan error {
    errCh := make(chan error, workers)
    go func() {
        defer close(errCh)
        var wg sync.WaitGroup
        for i := 0; i < workers; i++ {
            wg.Add(1)
            go func(wid int) {
                defer wg.Done()
                for {
                    select {
                    case <-ctx.Done():
                        errCh <- ErrCancelled
                        return
                    case item, hasMore := <-s:
                        if !hasMore {
                            return
                        }
                        if err := fn(wid, item); err != nil {
                            errCh <- err
                        }
                    }
                }
            }(i)
        }
        wg.Wait()
    }()
    return errCh
}
But we are still not doing anything meaningful with the error returned by the processing function and we don’t really need to. We can leave that responsibility to the user because they have to deal with it one way or another. Placing it in an errors channel isn’t saving them any lines of code; it is probably adding a burden instead.
func (s Strings) Each(ctx context.Context, workers int, fn func(int, string)) <-chan error {
    errCh := make(chan error, workers)
    go func() {
        defer close(errCh)
        var wg sync.WaitGroup
        for i := 0; i < workers; i++ {
            wg.Add(1)
            go func(wid int) {
                defer wg.Done()
                for {
                    select {
                    case <-ctx.Done():
                        errCh <- ErrCancelled
                        return
                    case item, hasMore := <-s:
                        if !hasMore {
                            return
                        }
                        fn(wid, item)
                    }
                }
            }(i)
        }
        wg.Wait()
    }()
    return errCh
}
And to call it:
errors := generator(100).Each(ctx, 10, func(wid int, str string) {
    values := strings.Split(str, "\t")
    if len(values) == 42 {
        log.Printf("you already found the meaning of life, universe and everything: %d", len(values))
        cancel()
    }
    log.Println(wid, "->", len(values))
    time.Sleep(50 * time.Millisecond)
})
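The ctx and cancel values used above are assumed to come from the standard context package, for example:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()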
As a last minor improvement, we can eliminate the chance of a worker reading from the channel instead of receiving the cancellation signal by using a double select. See the explanation in this Stack Overflow answer.
for {
    select {
    case <-ctx.Done():
        errCh <- ErrCancelled
        return
    default:
    }

    select {
    case <-ctx.Done():
        errCh <- ErrCancelled
        return
    case item, hasMore := <-s:
        if !hasMore {
            return
        }
        fn(wid, item)
    }
}
“Map” & “Select”
Having written the Each method, writing the Map and Select methods should be an easy task because they can be built on top of Each.
func (sch Strings) Map(
    ctx context.Context, workers int, mapFn func(int, string) string,
) (Strings, <-chan error) {
    outCh := make(chan string, workers)
    return outCh, sch.Each(ctx, workers, func(wid int, item string) {
        outCh <- mapFn(wid, item)
    })
}

func (sch Strings) Select(
    ctx context.Context, workers int, selectFn func(int, string) bool,
) (Strings, <-chan error) {
    outCh := make(chan string, workers)
    return outCh, sch.Each(ctx, workers, func(wid int, item string) {
        if selectFn(wid, item) {
            outCh <- item
        }
    })
}
Except for one issue: we aren’t closing the output channels.
When working with channels, it is recommended that the responsibility for closing them lies with whoever writes to them. This prevents the panics that occur when attempting to write to a closed channel.
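A minimal illustration of that panic:

ch := make(chan string, 1)
close(ch)
ch <- "boom" // panic: send on closed channel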
So, we need to close the output channels, and it needs to happen after we know the Each method has finished writing. However, Each works in the background, so how can we, as its users, know when it finishes? We can’t.
What we can do instead is to ask Each to perform some action right after all of its workers are done.
func (sch Strings) Each(ctx context.Context, workers int, fn func(int, string), doneFns ...func()) <-chan error {
    errCh := make(chan error, workers)
    go func() {
        defer close(errCh)
        var wg sync.WaitGroup
        for i := 0; i < workers; i++ {
            // ... same worker loop as before ...
        }
        wg.Wait()
        for _, doneFn := range doneFns {
            doneFn()
        }
    }()
    return errCh
}
Then we rewrite Map and Select:
func (sch Strings) Map(
    ctx context.Context, workers int, mapFn func(int, string) string,
) (Strings, <-chan error) {
    outCh := make(chan string, workers)
    return outCh, sch.Each(ctx, workers, func(wid int, item string) {
        outCh <- mapFn(wid, item)
    }, func() {
        close(outCh)
    })
}

func (sch Strings) Select(
    ctx context.Context, workers int, selectFn func(int, string) bool,
) (Strings, <-chan error) {
    outCh := make(chan string, workers)
    return outCh, sch.Each(ctx, workers, func(wid int, item string) {
        if selectFn(wid, item) {
            outCh <- item
        }
    }, func() {
        close(outCh)
    })
}
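With the channels being closed properly, the methods can now be chained. A possible usage, reusing the generator from earlier and a cancellable context (the variable names are illustrative):

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

counts, mapErrs := generator(100).Map(ctx, 10, func(wid int, line string) string {
    return strconv.Itoa(len(strings.Split(line, "\t")))
})
safe, selectErrs := counts.Select(ctx, 10, func(wid int, count string) bool {
    return count != "42"
})

for count := range safe {
    fmt.Println(count)
}
for err := range mapErrs {
    fmt.Println("map error:", err)
}
for err := range selectErrs {
    fmt.Println("select error:", err)
}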
Conclusion
Coming from sequential (linear) programming models, it can be difficult to mentally follow all the paths and states that a concurrent Go program goes through. In fact, it is difficult to do so not only mentally but even with the support of debugging tools.
When you get over the first hurdles and some patterns start to repeat, it is time to turn the solutions into libraries that hide complexity from others. In the example presented in this blog post, we free programmers from worrying too much about:
- Deadlocks caused by channel starvation
- Panics caused by attempting to write on closed channels
- Goroutine leaks
- Having to come up with ways to handle errors that can occur in parallel timelines
It is also very important that such libraries are intuitive and easy to use. They will not be perfect in the first iterations, but as more people use them and contribute to them, they end up becoming solid pieces of software and a valuable asset for the organization or for a developer community.
Interested in working with us? Have a look at our careers page and reach out to us if you would like to be a part of our team!