Building and Testing a Worker Pool in Go | by Stephen Wayne | Nov, 2022

Implementing a concurrent pattern for processing tasks using Go

A worker pool (also known as a thread pool) is a software design pattern in which a set of workers (the "pool") concurrently processes tasks from a shared queue.

In some languages, each worker is a thread (hence the name "thread pool"), but in Go we have goroutines – lightweight threads managed by the Go runtime that make concurrent programming cheap and easy.

The queuing system is there to distribute tasks to workers. In Go, the queue will often be a channel, and idle workers will pick up tasks on a first-come, first-served basis.

The benefits

There are many advantages to the worker pool pattern. The first is simplicity and developer experience. By placing the concurrency tooling in a package behind an API, developers don't have to think about properly closing channels, avoiding worker starvation, signaling, synchronization, and so on. The calling code can simply say "here is some work to do, and here are some function calls that help me do that!"

Worker pools can also be useful for performance. By using a defined set of workers (instead of spinning up a goroutine for each task), we can limit how far we fan out. Many systems have bottlenecks at various levels (be it the number of allowed network requests, in-flight storage requests, processing speed in other parts of the system, etc.), and a worker pool lets us process jobs while applying backpressure, rather than consuming ever more resources down the line.

Furthermore, for long-running processing jobs (say, handling incoming API requests), a worker pool avoids the overhead of creating and tearing down a new worker for each job. This is less of a concern in Go because goroutines are so lightweight, but in general it's good to be aware of the overhead of allocating resources.

Tuning

The size of the worker pool should be tailored to the specific application. Things to consider are:

  • How many jobs are there to process?
  • How quickly can jobs come in? Are they bursty, or more steady-state?
  • What is downstream of me? How quickly can it handle the results of these tasks (if applicable)?
  • How much of my system's resources am I willing to devote to processing these tasks? Should processing expand to the maximum the system can handle, or should I limit it to some fraction of the system?
  • Should the number of workers be dynamic? (We won't build that today.)

Often, the engineer has a reasonable estimate of how many workers they need (within an order of magnitude), and the system can then be tuned based on how it performs under representative integration tests, load tests, or real-world conditions. By including metrics or tracing, the engineer can take a closer look at exactly whether the worker pool is over- or under-allocated. Perhaps we can explore this in more depth in a future post.

Today we'll implement a very simple (and reusable!) worker pool. Tasks will be defined as an interface (the worker pool doesn't need to care what it's processing, only how to process it), and idle workers will wait on a channel for additional work. We'll also need a way to create, start, and stop worker pools, and to add work to them – we'll define an interface for that, too.

Below you can see the basic algorithm of how work is sent to and removed from the pool.

Simple worker pool diagram: the calling code generates tasks, which are placed in a queue (a channel in Go). Idle workers read from this queue and pick up work as they become available.

We’ll start by defining the interface for our worker pool and the tasks it can process:

The worker pool really only needs to provide a way to start and stop processing, and a way to add work to the queue.

A Task needs a way to run the job, Execute(), and a way to handle any errors that result, OnFailure(). With this interface, we leave the calling code a lot of flexibility to define what happens, without being too prescriptive.
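A minimal sketch of those two interfaces, following the description above (the exact method sets match the text; `noopTask` is a hypothetical example implementation, not part of the article's repo):

```go
package main

// Task is a unit of work the pool can process.
type Task interface {
	// Execute runs the job and returns any error it produced.
	Execute() error
	// OnFailure lets the task decide how to handle an Execute error.
	OnFailure(error)
}

// Pool is the worker pool itself: it can be started and stopped,
// and work can be queued onto it.
type Pool interface {
	Start()
	Stop()
	AddWork(Task)
}

// noopTask is a hypothetical example implementation of Task,
// included only to show what satisfying the interface looks like.
type noopTask struct {
	ran    bool
	failed error
}

func (t *noopTask) Execute() error       { t.ran = true; return nil }
func (t *noopTask) OnFailure(err error) { t.failed = err }
```

Because the pool only sees these interfaces, the calling code owns all of the domain logic.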

OK, now we need to implement something that satisfies this Pool interface. We'll do that with SimplePool below:

The important things here are that the worker pool has access to a channel from which work can be consumed (tasks), and that it provides a channel (quit) for stopping workers when necessary. Some of the nitty-gritty details include tracking how many workers are in the pool and using a sync.Once to ensure that the pool can only be started or stopped once.
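Based on that description, SimplePool might be sketched like this (the field names are my guesses; the original repo may differ):

```go
package main

import "sync"

// Task as described earlier; repeated so this snippet stands on its own.
type Task interface {
	Execute() error
	OnFailure(error)
}

// SimplePool is a basic Pool implementation. Workers consume from the
// tasks channel, and closing quit tells them to shut down.
type SimplePool struct {
	// numWorkers tracks how many workers are in the pool.
	numWorkers int

	// tasks is the buffered queue workers consume from.
	tasks chan Task

	// quit signals the workers to stop processing.
	quit chan struct{}

	// start/stop ensure the pool is only started or stopped once.
	start sync.Once
	stop  sync.Once
}
```

The two sync.Once fields are what make repeated Start/Stop calls safe later on.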

We'll add a constructor that requires the calling code to tell us how many workers the pool should have and how large the buffered task channel should be. It will also set up some details internal to the implementation:

Note that we define some custom errors here – ErrNoWorkers and ErrNegativeChannelSize – a technique that makes testing easier (as you can see in workerpool/workerpool_test.go in the project repo).

Starting and stopping the pool notifies the workers to begin or stop processing jobs from the tasks channel:
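Start and Stop might look roughly like this, each guarded by a sync.Once (startWorkers is included in minimal form so the snippet compiles on its own; it is covered in detail below):

```go
package main

import "sync"

// Task and SimplePool repeated from earlier so this snippet stands alone.
type Task interface {
	Execute() error
	OnFailure(error)
}

type SimplePool struct {
	numWorkers int
	tasks      chan Task
	quit       chan struct{}
	start      sync.Once
	stop       sync.Once
}

// startWorkers launches the worker goroutines (detailed later).
func (p *SimplePool) startWorkers() {
	for i := 0; i < p.numWorkers; i++ {
		go func() {
			for {
				select {
				case <-p.quit:
					return
				case task := <-p.tasks:
					if err := task.Execute(); err != nil {
						task.OnFailure(err)
					}
				}
			}
		}()
	}
}

// Start begins processing; calling it more than once has no further effect.
func (p *SimplePool) Start() {
	p.start.Do(p.startWorkers)
}

// Stop closes quit to shut the workers down. sync.Once makes repeated
// calls safe: closing an already-closed channel would otherwise panic.
func (p *SimplePool) Stop() {
	p.stop.Do(func() {
		close(p.quit)
	})
}
```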

AddWork() provides a way to add a Task to the worker pool. Our implementation blocks when the channel is full, so we use a select to free any blocked AddWork() goroutines when the worker pool is closed:
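A sketch of that blocking AddWork – the select lets a caller stuck on a full tasks channel escape once quit is closed (noopTask is a trivial illustrative Task, not from the repo):

```go
package main

import "sync"

// Task and SimplePool repeated from earlier so this snippet stands alone.
type Task interface {
	Execute() error
	OnFailure(error)
}

type SimplePool struct {
	numWorkers int
	tasks      chan Task
	quit       chan struct{}
	start      sync.Once
	stop       sync.Once
}

// AddWork queues a task, blocking while the tasks channel is full.
// If the pool is stopped (quit is closed), any blocked caller is released.
func (p *SimplePool) AddWork(t Task) {
	select {
	case p.tasks <- t:
	case <-p.quit:
	}
}

// noopTask is a trivial Task used for illustration.
type noopTask struct{}

func (noopTask) Execute() error  { return nil }
func (noopTask) OnFailure(error) {}
```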

Note that we could have made this non-blocking with minor tweaks – it's dealer's choice, really (and might even justify extending the interface to include both!):
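For example, a non-blocking variant might look like the sketch below (TryAddWork is a hypothetical name; it is not part of the interface defined above):

```go
package main

import "sync"

// Task and SimplePool repeated from earlier so this snippet stands alone.
type Task interface {
	Execute() error
	OnFailure(error)
}

type SimplePool struct {
	numWorkers int
	tasks      chan Task
	quit       chan struct{}
	start      sync.Once
	stop       sync.Once
}

// TryAddWork (hypothetical name) attempts to queue the task without
// blocking. It reports false if the queue is full or the pool is stopped.
func (p *SimplePool) TryAddWork(t Task) bool {
	select {
	case p.tasks <- t:
		return true
	case <-p.quit:
		return false
	default:
		// The default case fires when neither channel is ready,
		// i.e. the queue is full – so we never block the caller.
		return false
	}
}

type noopTask struct{}

func (noopTask) Execute() error  { return nil }
func (noopTask) OnFailure(error) {}
```

The trade-off: callers must now handle the "queue full" result themselves.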

Now we'll implement startWorkers, which will create and start numWorkers workers that read from the tasks channel and process tasks:

Using a select statement, an idle worker either waits for a new task on the tasks channel, or for a signal on the quit channel that the worker pool is done.

If something arrives on the tasks channel, the worker will run whatever the task's Execute() function defines, and if that returns an error, it will call the task's OnFailure() function to handle whatever went wrong.

Once a worker has finished processing a task, it will go back to waiting until a new task becomes available, or until the worker pool is closed.

Automated tests are essential when developing code, especially for packages that may become dependencies of other projects. They can also greatly help the development process!

They allow time to stop and consider the interface and implementation (I changed things here as a result of the testing process), and they can help you find implementation bugs or interface inefficiencies before they become a problem.

To that end, I've added workerpool/workerpool_test.go to walk through some expected use cases.

We’ll start by making sure our constructor functions:
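That constructor check might look like the sketch below. In the repo this is a standard Go test in workerpool_test.go; here it's written as a plain helper (with the constructor repeated) so the snippet stands alone:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Task, SimplePool, and NewSimplePool repeated from earlier.
type Task interface {
	Execute() error
	OnFailure(error)
}

type SimplePool struct {
	numWorkers int
	tasks      chan Task
	quit       chan struct{}
	start      sync.Once
	stop       sync.Once
}

var (
	ErrNoWorkers           = errors.New("cannot create a pool with no workers")
	ErrNegativeChannelSize = errors.New("cannot create a pool with a negative channel size")
)

func NewSimplePool(numWorkers int, channelSize int) (*SimplePool, error) {
	if numWorkers <= 0 {
		return nil, ErrNoWorkers
	}
	if channelSize < 0 {
		return nil, ErrNegativeChannelSize
	}
	return &SimplePool{
		numWorkers: numWorkers,
		tasks:      make(chan Task, channelSize),
		quit:       make(chan struct{}),
	}, nil
}

// checkConstructor exercises the constructor's validation paths.
func checkConstructor() error {
	if _, err := NewSimplePool(0, 0); !errors.Is(err, ErrNoWorkers) {
		return fmt.Errorf("expected ErrNoWorkers, got %v", err)
	}
	if _, err := NewSimplePool(1, -1); !errors.Is(err, ErrNegativeChannelSize) {
		return fmt.Errorf("expected ErrNegativeChannelSize, got %v", err)
	}
	p, err := NewSimplePool(5, 10)
	if err != nil || p == nil {
		return fmt.Errorf("expected a valid pool, got %v (err %v)", p, err)
	}
	return nil
}
```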

Here we're mainly testing that the constructor catches common cases of bad input (like a negative channel size, zero workers, etc.), and that we actually get a Pool back when the input is valid.

Next, we'll test that our Start() and Stop() implementations do the right thing when called multiple times:
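A sketch of that idempotency check (again written as a plain helper, with the implementation repeated, so it runs standalone):

```go
package main

import "sync"

// Task and SimplePool repeated from earlier so this snippet stands alone.
type Task interface {
	Execute() error
	OnFailure(error)
}

type SimplePool struct {
	numWorkers int
	tasks      chan Task
	quit       chan struct{}
	start      sync.Once
	stop       sync.Once
}

func (p *SimplePool) startWorkers() {
	for i := 0; i < p.numWorkers; i++ {
		go func() {
			for {
				select {
				case <-p.quit:
					return
				case task := <-p.tasks:
					if err := task.Execute(); err != nil {
						task.OnFailure(err)
					}
				}
			}
		}()
	}
}

func (p *SimplePool) Start() { p.start.Do(p.startWorkers) }
func (p *SimplePool) Stop()  { p.stop.Do(func() { close(p.quit) }) }

// startStopIsIdempotent exercises repeated Start/Stop calls. Without the
// sync.Once guards, the second Stop would panic on a double channel close.
func startStopIsIdempotent() bool {
	p := &SimplePool{numWorkers: 1, tasks: make(chan Task, 1), quit: make(chan struct{})}
	p.Start()
	p.Start() // no-op
	p.Stop()
	p.Stop() // no-op; would panic without sync.Once
	select {
	case <-p.quit:
		return true // quit was closed exactly once
	default:
		return false
	}
}
```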

Next, we want to make sure the worker pool can actually process work. To do this, we'll create a test implementation that satisfies our Task interface and feed some instances of it to the workerpool.

Our testTask implements Execute() and OnFailure() to satisfy the Task interface, but it also adds tooling to make Execute() return an error, to record whether we hit the failure case, and to notify the calling code through a wait group when the task has been processed.

As we can see here, sometimes it makes sense to build test-specific infrastructure. Because the worker pool operates on interfaces (instead of concrete types), we can simply create a new type that implements the interface, instead of hacking test hooks into otherwise-production code!
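Putting the pieces together, the processing test might look like this sketch (a plain helper here rather than a Go test function, with the pool and a minimal wait-group task repeated so it runs standalone):

```go
package main

import "sync"

// Task and SimplePool repeated from earlier so this snippet stands alone.
type Task interface {
	Execute() error
	OnFailure(error)
}

type SimplePool struct {
	numWorkers int
	tasks      chan Task
	quit       chan struct{}
	start      sync.Once
	stop       sync.Once
}

func (p *SimplePool) Start() { p.start.Do(p.startWorkers) }
func (p *SimplePool) Stop()  { p.stop.Do(func() { close(p.quit) }) }

func (p *SimplePool) AddWork(t Task) {
	select {
	case p.tasks <- t:
	case <-p.quit:
	}
}

func (p *SimplePool) startWorkers() {
	for i := 0; i < p.numWorkers; i++ {
		go func() {
			for {
				select {
				case <-p.quit:
					return
				case task := <-p.tasks:
					if err := task.Execute(); err != nil {
						task.OnFailure(err)
					}
				}
			}
		}()
	}
}

// wgTask marks a wait group when the pool has executed it.
type wgTask struct{ wg *sync.WaitGroup }

func (t wgTask) Execute() error  { t.wg.Done(); return nil }
func (t wgTask) OnFailure(error) {}

// processWork creates a pool, queues tasks, and waits until every task
// has been executed before stopping the pool.
func processWork(numWorkers, numTasks int) bool {
	p := &SimplePool{numWorkers: numWorkers, tasks: make(chan Task, numWorkers), quit: make(chan struct{})}
	p.Start()
	defer p.Stop()

	var wg sync.WaitGroup
	for i := 0; i < numTasks; i++ {
		wg.Add(1)
		p.AddWork(wgTask{wg: &wg})
	}
	wg.Wait() // every queued task was processed
	return true
}
```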

Here we're essentially making sure that we can create a worker pool, add work to it, and have that work processed as expected. workerpool_test.go includes a similar function to validate the error-handling logic, but we'll omit it here for brevity.

Finally, let's make sure anything waiting on AddWork will be freed if the worker pool is closed (proper resource management is important!):
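A sketch of that test, matching the walkthrough that follows (written as a plain helper with the needed pieces repeated; the pool is deliberately never started, so the tasks channel fills up):

```go
package main

import (
	"sync"
	"time"
)

// Task and SimplePool repeated from earlier so this snippet stands alone.
type Task interface {
	Execute() error
	OnFailure(error)
}

type SimplePool struct {
	numWorkers int
	tasks      chan Task
	quit       chan struct{}
	start      sync.Once
	stop       sync.Once
}

func (p *SimplePool) AddWork(t Task) {
	select {
	case p.tasks <- t:
	case <-p.quit:
	}
}

func (p *SimplePool) Stop() { p.stop.Do(func() { close(p.quit) }) }

type noopTask struct{}

func (noopTask) Execute() error  { return nil }
func (noopTask) OnFailure(error) {}

// addWorkFreedOnClose spawns more task-adding goroutines than the channel
// can hold, stops the pool, and reports whether every blocked AddWork call
// was released within a second.
func addWorkFreedOnClose() bool {
	// Never started: the tasks channel fills and later callers block.
	p := &SimplePool{numWorkers: 1, tasks: make(chan Task, 2), quit: make(chan struct{})}

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ { // more tasks than channel capacity
		wg.Add(1)
		go func() {
			defer wg.Done() // signals that this AddWork call returned
			p.AddWork(noopTask{})
		}()
	}

	p.Stop() // closing quit should release every blocked AddWork

	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	select {
	case <-done:
		return true // all AddWork calls returned
	case <-time.After(time.Second):
		return false // goroutines still blocked: a resource leak
	}
}
```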

This test creates a worker pool and a set of goroutines to add work to it. There are more tasks than the channel has capacity for, so some goroutines will block while waiting to add their tasks.

When the task-adding goroutines unblock, they signal this by calling Done() on a wait group. This means we can know when all of the goroutines have unblocked by waiting on that wait group, wg.

Waiting on that wait group will block, though, so we need a way to time out. We'll spin up a goroutine to wait until all of the task-adding goroutines have finished and then signal on a new channel, done. With that, we can use a select to wait for the done signal. But in the failure case, done will never fire, and we could wait forever (or at least until the test itself times out).

To get around this, we can use time.After, which sends on a channel after a specified period has elapsed. Thus we can say "wait until either one second has elapsed, or all goroutines have completed." If we hit the timeout, we fail (because goroutines are still blocked in AddWork after the worker pool has stopped). If we get the done signal instead, we know that the blocked goroutines completed, and that the worker pool is working as expected.

Worker pools can range from simple (even simpler than this one!) to incredibly complex. Some common enhancements include:

  • Adding metrics to track how long tasks take, how much idle time workers have, and more.
  • Adding a task manager with fair-sharing logic to ensure that different types of tasks are processed fairly, for some definition of "fair".
  • Adding more logging.
  • Adding any logic specific to your application.

Some of this can be seen in a fair-sharing worker pool that I designed and implemented for HashiCorp Vault!

Worker pools are an essential tool in your concurrency tool belt, and now we understand a bit more about how they work. Today we designed and built a fairly simple (but reusable!) implementation, and figured out how to test it. We thought about how calling code might use it (for example, AddWork() should not block forever after the pool is closed), and we left opportunities to extend it.

As always, I welcome any feedback and hope it was helpful!
