This might be my last take on error handling in Go, and I think it's the best one too. Every instruction we execute runs in some context, and that context can carry errors. So I thought: why not build a wrapper on top of the current context? If every task is executed through one specific function, that function can check whether the context already holds an error. If it does, the task is skipped; if not, the task runs and any error it returns is collected. This might turn out to be an anti-pattern, but until it does, we can play around with it.
Well, Cursor had a few things to add:
The Problem
The main challenge is this: if you need to call four functions from the current one, you need four error-handling blocks,
the same old if err != nil { return nil, err }
Suppose instead we have a context that stores the errors of previous calls, and we pass it to a wrapper that checks for those errors before executing the actual function. Then a failed step automatically skips every step after it.
Now there is only one error check for four function calls, at the end: if taskCtx.Err() != nil { return nil, taskCtx.Err() }
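To make the problem concrete, here is a small sketch of the conventional style. The four step functions (loadUser, loadCart, priceCart, charge) and checkout are made up for illustration and are not part of the library; the point is only the four identical error checks.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical steps, each of which can fail.
func loadUser(id int) (string, error) {
	if id <= 0 {
		return "", errors.New("invalid user id")
	}
	return fmt.Sprintf("user-%d", id), nil
}

func loadCart(user string) ([]string, error) {
	return []string{"LAPTOP", "MOUSE"}, nil
}

func priceCart(items []string) (int, error) {
	return 100 * len(items), nil
}

func charge(user string, amount int) (string, error) {
	return fmt.Sprintf("receipt:%s:%d", user, amount), nil
}

// checkout calls four functions and needs four separate error checks.
func checkout(id int) (string, error) {
	user, err := loadUser(id)
	if err != nil {
		return "", err
	}
	items, err := loadCart(user)
	if err != nil {
		return "", err
	}
	total, err := priceCart(items)
	if err != nil {
		return "", err
	}
	receipt, err := charge(user, total)
	if err != nil {
		return "", err
	}
	return receipt, nil
}

func main() {
	fmt.Println(checkout(7))
	fmt.Println(checkout(0))
}
```

Every step repeats the same three lines, and that repetition is what the wrapper approach tries to collapse into a single check.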
The Solution: TaskContext
Let's build a TaskContext that solves these problems:
package goctx
import (
	"context"
	"errors"
	"fmt"
	"sync"
)
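// RunFn is a unit of work that returns a value or an error.
type RunFn[T any] func() (T, error)

// TaskContext wraps a context.Context and collects errors from the tasks run through it.
type TaskContext struct {
	context.Context
	mu       sync.RWMutex
	err      error   // all recorded errors, joined
	multiErr []error // each recorded error, in order
}

// NewTaskContext wraps parent; it panics if parent is nil.
func NewTaskContext(parent context.Context) *TaskContext {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
	return &TaskContext{Context: parent}
}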
Key Features
1. Thread-Safe Error Handling
// WithError records err under the write lock and returns c for chaining.
// A nil err is ignored.
func (c *TaskContext) WithError(err error) *TaskContext {
	if err == nil {
		return c
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	c.multiErr = append(c.multiErr, err)
	if c.err == nil {
		c.err = err
	} else {
		c.err = errors.Join(c.err, err)
	}
	return c
}
2. Single Task Execution
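// Run executes fn unless ctx already holds an error.
// On failure it records the error on ctx and returns the zero value of T.
func Run[T any](ctx *TaskContext, fn RunFn[T]) T {
	var zero T
	if err := ctx.Err(); err != nil {
		return zero // an earlier step failed: skip this one
	}
	result, err := fn()
	if err != nil {
		ctx.WithError(err)
		return zero
	}
	return result
}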
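Run calls ctx.Err(), but no Err method on TaskContext is shown here. Without one, the call resolves to the embedded context.Context and never sees the recorded errors. Below is a minimal sketch of how such an override might look, with a demonstration of the skip behaviour. It uses a trimmed-down stand-in type; taskContext, record, run, and demo are illustrative names, and giving the parent's cancellation error priority is an assumption, not something the post states.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// taskContext is a trimmed-down stand-in for the post's TaskContext.
type taskContext struct {
	context.Context
	mu  sync.RWMutex
	err error
}

// Err shadows the embedded context's Err method.
// Assumption: the parent's cancellation error wins over recorded task errors.
func (c *taskContext) Err() error {
	if err := c.Context.Err(); err != nil {
		return err
	}
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.err
}

// record stores err, joining it with anything recorded earlier.
func (c *taskContext) record(err error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.err = errors.Join(c.err, err)
}

// run mirrors Run from the post: fn is skipped once an error is recorded.
func run[T any](c *taskContext, fn func() (T, error)) T {
	var zero T
	if c.Err() != nil {
		return zero
	}
	v, err := fn()
	if err != nil {
		c.record(err)
		return zero
	}
	return v
}

// demo runs three steps where the second fails, and reports how many
// step functions were actually invoked plus the final error.
func demo() (calls int, err error) {
	c := &taskContext{Context: context.Background()}
	_ = run(c, func() (int, error) { calls++; return 1, nil })
	_ = run(c, func() (int, error) { calls++; return 0, errors.New("step 2 failed") })
	_ = run(c, func() (int, error) { calls++; return 3, nil }) // never invoked
	return calls, c.Err()
}

func main() {
	calls, err := demo()
	fmt.Println("functions invoked:", calls)
	fmt.Println("error:", err)
}
```

Only two of the three functions are invoked, because the third run call sees the recorded error and returns the zero value immediately.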
3. Parallel Task Execution
// RunParallel runs every fn in its own goroutine and waits for all of them.
// It takes RunFn[T] so that a []RunFn[T] can be passed with fns...
func RunParallel[T any](ctx *TaskContext, fns ...RunFn[T]) ([]T, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	results := make([]T, len(fns))
	var wg sync.WaitGroup
	wg.Add(len(fns))
	for i, fn := range fns {
		i, fn := i, fn // per-iteration copies (needed before Go 1.22)
		go func() {
			defer wg.Done()
			result, err := fn()
			if err != nil {
				ctx.AddError(fmt.Errorf("task %d: %w", i+1, err))
				return
			}
			// Each goroutine writes only its own index, so no lock is needed.
			results[i] = result
		}()
	}
	wg.Wait()
	return results, ctx.Errors()
}
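RunParallel relies on ctx.AddError and ctx.Errors(), neither of which is shown in this post. One plausible shape, assuming AddError appends under a lock and Errors returns everything combined with errors.Join (the test later in the post expects both messages inside a single error, which is consistent with that). errorBag, collect, and countJoined are hypothetical names used only for this sketch.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errorBag is a hypothetical stand-in for the error-collecting part of TaskContext.
type errorBag struct {
	mu   sync.Mutex
	errs []error
}

// AddError records a non-nil error; it is safe to call from many goroutines.
func (b *errorBag) AddError(err error) {
	if err == nil {
		return
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	b.errs = append(b.errs, err)
}

// Errors returns all recorded errors joined into one, or nil if there are none.
func (b *errorBag) Errors() error {
	b.mu.Lock()
	defer b.mu.Unlock()
	return errors.Join(b.errs...)
}

// collect starts n goroutines that each report one failure.
func collect(n int) error {
	var bag errorBag
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			bag.AddError(fmt.Errorf("task %d: failed", i+1))
		}(i)
	}
	wg.Wait()
	return bag.Errors()
}

// countJoined reports how many errors a value produced by errors.Join holds.
func countJoined(err error) int {
	if err == nil {
		return 0
	}
	if j, ok := err.(interface{ Unwrap() []error }); ok {
		return len(j.Unwrap())
	}
	return 1
}

func main() {
	err := collect(3)
	fmt.Println("collected:", countJoined(err))
	fmt.Println(err)
}
```

Because the goroutines finish in no fixed order, the order of the joined messages can differ between runs; only the count is stable.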
4. Controlled Concurrency
func RunParallelWithLimit[T any](ctx *TaskContext, limit int, fns ...func() (T, error)) ([]T, error) {
	// ... similar to RunParallel but with semaphore ...
	sem := make(chan struct{}, limit)
	// ... implementation ...
}
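The body above is elided, so here is a rough, standalone sketch of the buffered-channel semaphore idea it hints at. This is not the library's implementation: runLimited, fail, and peakConcurrency are hypothetical names, and the sketch collects errors in a plain slice instead of a TaskContext to stay self-contained.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLimited runs fns concurrently with at most limit of them in flight.
func runLimited[T any](limit int, fns ...func() (T, error)) ([]T, error) {
	if limit < 1 {
		return nil, errors.New("limit must be at least 1")
	}
	results := make([]T, len(fns))
	errs := make([]error, len(fns))
	sem := make(chan struct{}, limit) // one slot per allowed goroutine
	var wg sync.WaitGroup
	for i, fn := range fns {
		wg.Add(1)
		sem <- struct{}{} // blocks while limit goroutines are still running
		go func(i int, fn func() (T, error)) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot
			v, err := fn()
			if err != nil {
				errs[i] = fmt.Errorf("task %d: %w", i+1, err)
				return
			}
			results[i] = v
		}(i, fn)
	}
	wg.Wait()
	return results, errors.Join(errs...) // nil entries are dropped
}

// fail is a task that always returns an error.
func fail() (int, error) { return 0, errors.New("boom") }

// peakConcurrency runs n short tasks and reports the highest number
// that were observed running at the same time.
func peakConcurrency(limit, n int) int32 {
	var running, peak atomic.Int32
	fns := make([]func() (int, error), n)
	for i := range fns {
		fns[i] = func() (int, error) {
			cur := running.Add(1)
			for {
				old := peak.Load()
				if cur <= old || peak.CompareAndSwap(old, cur) {
					break
				}
			}
			time.Sleep(time.Millisecond)
			running.Add(-1)
			return 0, nil
		}
	}
	runLimited(limit, fns...)
	return peak.Load()
}

func main() {
	fmt.Println("peak with limit 2:", peakConcurrency(2, 8))
	_, err := runLimited(1, fail)
	fmt.Println(err)
}
```

Acquiring the slot in the loop, before the goroutine starts, also caps how many goroutines exist at once, not just how many are doing work.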
Usage Examples
Simple Task Execution
func ExampleTaskContext_ShipmentProcessing() {
	ctx := goctx.NewTaskContext(context.Background())
	order := dummyOrder()
	shipment := dummyShipment()

	// Step 1: Validate address
	_ = goctx.Run(ctx, validateAddress("123 Main St"))
	// Step 2: Calculate shipping cost
	cost := goctx.Run(ctx, calculateShipping(order))
	// Step 3: Generate label
	trackingNum := goctx.Run(ctx, generateLabel(shipment.OrderID, cost))

	if ctx.Err() != nil {
		fmt.Printf("Error: %v\n", ctx.Err())
		return
	}
	shipment.Status = "READY"
	shipment.TrackingNum = trackingNum
	fmt.Printf("Shipment processed: %+v\n", shipment)
	// Output:
	// Shipment processed: {OrderID:ORD123 Status:READY TrackingNum:TRACK-ORD123-1234567890}
}
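The example passes validateAddress("123 Main St") directly to Run, which implies that these helpers return a RunFn closure instead of doing the work right away. The helpers themselves are not shown, so the versions below are guesses at their shape: the signatures, the validation rule, and the pricing formula are all invented for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// RunFn matches the type defined in the post.
type RunFn[T any] func() (T, error)

// validateAddress is hypothetical: it captures addr now and validates it
// only when the returned function is invoked.
func validateAddress(addr string) RunFn[bool] {
	return func() (bool, error) {
		if strings.TrimSpace(addr) == "" {
			return false, errors.New("address is empty")
		}
		return true, nil
	}
}

// calculateShipping is hypothetical: a flat rate plus a per-item fee, in cents.
func calculateShipping(itemCount int) RunFn[int] {
	return func() (int, error) {
		if itemCount <= 0 {
			return 0, errors.New("order has no items")
		}
		return 500 + 150*itemCount, nil
	}
}

func main() {
	check := validateAddress("123 Main St") // nothing has run yet
	ok, err := check()                      // the validation happens here
	fmt.Println(ok, err)

	cost, err := calculateShipping(3)()
	fmt.Println(cost, err)
}
```

Deferring the work this way is what lets a wrapper like Run decide whether to invoke a step at all. The arguments are still evaluated eagerly, though, so a later step built from the zero value of a failed earlier step is constructed but never executed.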
Parallel Task Execution
func ExampleTaskContext_OrderProcessing() {
	// One TaskContext for every step, so a failure in one step is visible to the next.
	taskCtx := goctx.NewTaskContext(context.Background())

	// Mock order
	order := []OrderItem{
		{ProductID: "LAPTOP", Quantity: 2},
		{ProductID: "MOUSE", Quantity: 3},
	}

	// Create inventory checks for each item
	inventoryChecks := goctx.Run[[]goctx.RunFn[bool]](taskCtx,
		func() ([]goctx.RunFn[bool], error) {
			return streams.NewTransformer[OrderItem, goctx.RunFn[bool]](order).
				Transform(streams.MapItSimple(checkInventory)).
				Result()
		})

	// Run inventory checks in parallel
	_, err := goctx.RunParallel(taskCtx, inventoryChecks...)
	fmt.Printf("Inventory check error: %v\n", err)
	// Output:
	// Inventory check error: task 1: insufficient inventory for LAPTOP
}
Benefits
- Thread Safety: All operations are protected by mutexes
- Error Collection: Keeps every recorded error, both individually and joined into a single error value
- Context Integration: Works with Go's context package
- Generic Support: Works with any return type
- Concurrency Control: Built-in support for limiting parallel executions
Testing
Here's how to test the implementation:
func TestTaskContext(t *testing.T) {
	t.Run("handles parallel errors", func(t *testing.T) {
		ctx := NewTaskContext(context.Background())
		_, err := RunParallel(ctx,
			func() (int, error) { return 0, errors.New("error 1") },
			func() (int, error) { return 0, errors.New("error 2") },
		)
		assert.Error(t, err)
		assert.Contains(t, err.Error(), "error 1")
		assert.Contains(t, err.Error(), "error 2")
	})
}
Conclusion
This TaskContext implementation provides a robust solution for handling concurrent task execution with proper error handling in Go. It's particularly useful when you need to:
- Execute multiple tasks concurrently
- Collect errors from all tasks
- Limit concurrent executions
- Maintain thread safety
- Keep each individual error alongside the single joined error
The complete code is available on GitHub.
What patterns do you use for handling concurrent task execution in Go? Share your thoughts in the comments below!