Parallelism Fundamentals in Go: Learn how to Implement Goroutines and WaitGroups to Run Tasks Concurrently
Introduction
A fundamental technique for improving performance is parallel computing, which allows multiple tasks to run simultaneously to make the best use of system resources. In this blog, we will explore how to implement parallel programming techniques in Go, using practical examples and key concepts such as goroutines, WaitGroups, Mutexes and atomic operations.
Introduction to Parallel Computing
Parallel computing is a form of processing in which tasks are divided into sub-tasks that can be executed at the same time. This contrasts with sequential computing, where tasks are executed one after the other. The main idea is that by executing multiple operations simultaneously, the overall execution time can be reduced and the overall efficiency of the system can be improved.
Parallelism vs. Concurrency
It is important to differentiate between parallelism and concurrency:
Parallelism: This refers to the simultaneous execution of multiple tasks. In the context of computing, this generally involves the use of multiple CPU cores to execute tasks at the same time. In Go, this is achieved through the use of goroutines and the thread-based execution model.
Concurrency: The ability to handle multiple tasks in such a way that they appear to be running simultaneously, even if they are not necessarily running in parallel. Concurrency can involve switching between tasks on a single CPU core. In Go, concurrency is handled by goroutines and channels, facilitating the creation of programs that can do multiple things at once.
Parallel Computing in Go
Go, known for its simplicity and efficiency, offers powerful tools for parallel programming. Among them, goroutines are fundamental. Goroutines are functions that run concurrently with other functions. They are lightweight and managed by the Go runtime, allowing highly parallelised applications to be created without additional complications.
Goroutines: These are the basic unit of execution in Go. They are created using the
go
keyword, and the Go runtime handles the concurrent and parallel execution of these goroutines.WaitGroups: These are used to wait for a set of goroutines to finish execution. They provide a form of synchronisation between goroutines, ensuring that all parallel tasks complete before continuing.
Mutexes: These are mechanisms to control concurrent access to shared resources. They prevent race condition problems, where two or more goroutines attempt to access and modify the same resource simultaneously, which can lead to unexpected results or errors.
In this blog, we will use a practical example of parallel processing in Go to illustrate these concepts. Through the analysis of code that performs parallel mapping, filtering and reduction operations, we will see how to apply these concepts to improve performance and efficiency in Go applications. We will explore how goroutines and other tools provided by the language can be used to achieve faster and more efficient data processing.
Key Components in the Code
In this section, we will discuss in detail the essential components of our code, explaining how goroutines, sync.WaitGroup
, sync.Mutex
and atomic operations are used to implement parallel programming in Go.
You can find the complete code here:
Goroutines
Goroutines are a fundamental feature of Go that allows concurrent execution of functions. They are created by using the go
keyword before a function or function expression. Goroutines are lightweight and managed by the Go runtime, which means that they can be created and managed in large numbers without a high resource cost.
In the code, goroutines are used to perform parallel operations on different parts of a slice. This is done in the parallelMap
, parallelFilter
, and parallelReduce
functions:
// Each worker applies the mapping function to its chunk of the slice.
go func(start, end int) {
defer wg.Done()
for j := start; j < end; j++ {
result[j] = mapFunc(nums[j])
}
}(start, end)
Each goroutine takes a segment of the nums
slice and applies the mapFunc
function to each element of that segment. The use of goroutines allows multiple segments of the slice to be processed at the same time, reducing the overall execution time.
WaitGroups
The sync.WaitGroup
is a synchronisation tool used to wait for a group of goroutines to complete their execution. It is used to coordinate the completion of multiple goroutines and ensure that all goroutines have finished before continuing the execution of the program.
In the code, sync.WaitGroup
is used to ensure that the main program waits for all goroutines to finish before continuing:
// Create a WaitGroup to synchronize the workers.
var wg sync.WaitGroup
// Start the workers.
for i := 0; i < numWorkers; i++ {
start := i * chunkSize
end := start + chunkSize
if i == numWorkers-1 {
end = n
}
wg.Add(1)
// Each worker applies the mapping function to its chunk of the slice.
go func(start, end int) {
defer wg.Done()
for j := start; j < end; j++ {
result[j] = mapFunc(nums[j])
}
}(start, end)
}
// Wait for all workers to finish.
wg.Wait()
Here, wg.Add(1)
increments the WaitGroup counter each time a new goroutine is started. Each goroutine calls wg.Done()
when it finishes, decrementing the counter. wg.Wait()
blocks the main thread until the counter reaches zero, i.e. until all goroutines have finished their work.
Mutexes
The sync.Mutex
is a synchronisation mechanism used to prevent race conditions, which occur when multiple goroutines attempt to access and modify shared data at the same time. A mutex ensures that only one goroutine can access a critical section of code at a time.
In the code, sync.mutex
is used to protect access to the result
slice during filtering and reduction operations:
// Create a Mutex to protect the result slice.
var mu sync.Mutex
// Each worker filters its chunk of the slice and appends the results to the local result slice.
go func(start, end int) {
defer wg.Done()
localResult := make([]int, 0, end-start)
for j := start; j < end; j++ {
if filterFunc(nums[j]) {
localResult = append(localResult, nums[j])
}
}
// Lock the Mutex and append the local result slice to the global result slice.
mu.Lock()
result = append(result, localResult...)
mu.Unlock()
}(start, end)
The mutex mu
is used to ensure that only one goroutine can modify result
at any given time. mu.Lock()
acquires the mutex before modifying result
, and mu.Unlock()
releases the mutex after the modification.
Atomic Operations
Atomic operations allow operations on shared variables to be performed safely without the need for mutexes. The sync/atomic
package provides operations such as addition, subtraction, and compare and shift, which are atomic and ensure that operations are safe in a concurrent environment.
In the code, atomic.AddInt32
is used to perform the reduction safely:
var result int32
// Each worker reduces its chunk of the slice and atomically adds the result to the global result variable.
go func(start, end int) {
defer wg.Done()
localResult := nums[start]
for j := start + 1; j < end; j++ {
localResult = reduceFunc(localResult, nums[j])
}
// Lock the Mutex and atomically add the local result to the global result variable.
mu.Lock()
atomic.AddInt32(&result, int32(localResult))
mu.Unlock()
}(start, end)
Here, atomic.AddInt32(&result, int32(localResult))
adds the localResult
value to the result
variable atomically, avoiding synchronisation problems without the need to use a mutex.
Parallel Execution in Action: Mapping, Filtering and Reduction
Now, we will examine how mapping, filtering, and reduction operations are implemented in parallel using goroutines, sync.WaitGroup
, sync.Mutex
, and atomic operations. These operations are common in data processing and benefit greatly from parallel programming to improve performance.
Parallel Mapping
Mapping is the process of applying a function to each element of a data set and transforming those elements into new values. In the code provided, parallelMap
performs this operation in parallel.
Implementation
// parallelMap applies a mapping function to each element of a slice in parallel.
func parallelMap(nums []int, mapFunc func(int) int, numWorkers int) []int {
n := len(nums)
// Create a result slice of the same length as the input slice.
result := make([]int, n)
// Calculate the chunk size for each worker.
chunkSize := n / numWorkers
// Create a WaitGroup to synchronize the workers.
var wg sync.WaitGroup
// Start the workers.
for i := 0; i < numWorkers; i++ {
start := i * chunkSize
end := start + chunkSize
if i == numWorkers-1 {
end = n
}
wg.Add(1)
// Each worker applies the mapping function to its chunk of the slice.
go func(start, end int) {
defer wg.Done()
for j := start; j < end; j++ {
result[j] = mapFunc(nums[j])
}
}(start, end)
}
// Wait for all workers to finish.
wg.Wait()
return result
}
Explanation
Creating the Result Slice: A
result
slice is created with the same length as thenums
input slice. This slice will store the mapping results.Division of Labour: The chunk size for each goroutine is calculated as
n / numWorkers
. Each goroutine will process a section of thenums
slice, from indexstart
toend
.Launching Goroutines: A goroutine is started for each slice segment. Each goroutine applies the
mapFunc
function to its segment ofnums
and stores the result inresult
.Synchronisation:
sync.WaitGroup
is used to wait for all goroutines to finish.wg.Add(1)
is called before starting each goroutine, andwg.Done()
is called at the end.Wait for Completion:
wg.Wait()
blocks execution until all goroutines have finished their work, ensuring thatresult
is completely filled with mapped values.
Parallel Filtering
Filtering is the process of selecting elements from a dataset that meet a specific condition. In parallelFilter
, this process is done in parallel to improve performance.
Implementation
// parallelFilter filters a slice in parallel using a filtering function.
func parallelFilter(nums []int, filterFunc func(int) bool, numWorkers int) []int {
n := len(nums)
chunkSize := n / numWorkers
var wg sync.WaitGroup
// Create a Mutex to protect the result slice.
var mu sync.Mutex
result := make([]int, 0, n)
for i := 0; i < numWorkers; i++ {
start := i * chunkSize
end := start + chunkSize
if i == numWorkers-1 {
end = n
}
wg.Add(1)
// Each worker filters its chunk of the slice and appends the results to the local result slice.
go func(start, end int) {
defer wg.Done()
localResult := make([]int, 0, end-start)
for j := start; j < end; j++ {
if filterFunc(nums[j]) {
localResult = append(localResult, nums[j])
}
}
// Lock the Mutex and append the local result slice to the global result slice.
mu.Lock()
result = append(result, localResult...)
mu.Unlock()
}(start, end)
}
// Wait for all workers to finish.
wg.Wait()
return result
}
Explanation
Creating the Result Slice: A
result
slice is created which is initially empty. This slice will store the elements that pass the filter.Division of Labour: Each goroutine processes a section of the
nums
slice, applying thefilterFunc
function to select the elements that meet the condition. Accumulation ofLocal Results: Each goroutine maintains a local slice
localResult
to temporarily store the elements that pass the filter. This avoids data contention between goroutines.Mutex Access Protection: After a goroutine completes its filtering, it uses
mu.Lock()
to ensure exclusive access to theresult
slice and aggregate the filtered items.mu.Unlock()
releases the mutex after the operation.Synchronisation:
sync.WaitGroup
is used to wait for all goroutines to finish.wg.Add(1)
is called before starting each goroutine, andwg.Done()
is called at the end.Wait for Completion:
wg.Wait()
ensures thatresult
is completely filled before returning it.
Parallel Reduction
Reduction is the process of combining all elements of a dataset into a single value using a reduce function. In parallelReduce
, this process is performed in parallel.
Implementation
// parallelReduce reduces a slice in parallel using a reduction function.
func parallelReduce(nums []int, reduceFunc func(int, int) int, numWorkers int) int {
n := len(nums)
chunkSize := n / numWorkers
var wg sync.WaitGroup
var mu sync.Mutex
var result int32
for i := 0; i < numWorkers; i++ {
start := i * chunkSize
end := start + chunkSize
if i == numWorkers-1 {
end = n
}
wg.Add(1)
// Each worker reduces its chunk of the slice and atomically adds the result to the global result variable.
go func(start, end int) {
defer wg.Done()
localResult := nums[start]
for j := start + 1; j < end; j++ {
localResult = reduceFunc(localResult, nums[j])
}
// Lock the Mutex and atomically add the local result to the global result variable.
mu.Lock()
atomic.AddInt32(&result, int32(localResult))
mu.Unlock()
}(start, end)
}
// Wait for all workers to finish.
wg.Wait()
return int(result)
}
Explanation
Result initialisation:
result
is a variable of typeint32
that is used to store the final accumulated value. It is initialised with a default value.Division of Labour: Each goroutine processes a segment of the
nums
slice, applying thereduceFunc
function to combine the values into a partial result.Local Result Calculation: Each goroutine maintains a
localResult
variable to accumulate the partial result of its segment. ThereduceFunc
function is applied iteratively to the elements of the segment.Atomic Update: After calculating the partial result,
atomic.AddInt32
is used to addlocalResult
to theresult
variable atomically. This ensures that the update is safe in a concurrent environment without using a mutex for synchronisation.Synchronisation:
sync.WaitGroup
is used to wait for all goroutines to finish.wg.Add(1)
is called before starting each goroutine, andwg.Done()
is called on completion.Wait for Completion:
wg.Wait()
blocks until all goroutines have finished, ensuring thatresult
contains the final accumulated value.
Detailed Explanation of main
main()
is the main function in a Go program, and in this case, it coordinates the execution of parallel processing in the program. In the following, we will break down each part of main to understand how it works and what its purpose is.
- ### Data Generation
// Generate a random slice of 1000000 integers between 1 and 9.
nums := generateRandomSlice(1000000, 1, 9)
fmt.Println("First numbers:", nums[:10])
Data Generation: The
generateRandomSlice
function is used to create a 1,000,000 slice of random integers in the range 1 to 9. This slice simulates a large data set to process.Print First Numbers: A portion of the slice (
nums[:10]
) is printed to show the first 10 numbers generated. This is useful to verify that the data generation was successful.
- ### Measuring Execution Time
// Start measuring execution time.
start := time.Now()
-
Measurement Start:
time.Now()
is used to capture the current time, marking the start of the measurement of parallel processing execution time. This will allow to calculate how long it takes to complete all parallel operations.
- ### Configuring the Number of Workers
// Get the number of CPUs available.
numWorkers := runtime.NumCPU()
-
Number of Workers:
runtime.NumCPU()
returns the number of CPU cores available on the machine. This value is used to define how many workers (goroutines) will be used in parallel processing. Using a number equal to the number of CPU cores helps to optimise performance. In the next section when we run the code we will go more into this.
- ### Application of Parallel Functions
// Apply the mapping function in parallel.
mapped := parallelMap(nums, func(x int) int { return x * 2 }, numWorkers)
-
Application of parallelMap: The
parallelMap
function is called to apply a mapping function to each element of the slice nums. In this case, the mapping function multiplies each number by 2.
// Apply the filtering function in parallel.
filtered := parallelFilter(mapped, func(x int) bool { return x > 10 }, numWorkers)
-
Application of parallelFilter: The
parallelFilter
function is called to filter the mapped slice, keeping only elements greater than 10.
// Apply the reduction function in parallel.
result := parallelReduce(filtered, func(acc, x int) int { return acc + x }, numWorkers)
-
Application of parallelReduce: The
parallelReduce
function is called to reduce the filtered slice by summing all elements. The reduce function takes an accumulator and an element, and returns their sum.
- ### Printout of Results
fmt.Println("Result:", result)
- Print Result: The final result of the reduction is printed. This is the aggregated value of all elements that passed the filter.
- ### Measuring Execution Time
// Stop measuring execution time and print the duration.
duration := time.Since(start)
fmt.Println("Execution time:", duration)
-
End of Measurement:
time.Since(start)
calculates the duration from the start of parallel processing to the current time. This duration is printed to show how long it took to perform all parallel operations.
Examples of results and numWorkers
numWorkers := runtime.NumCPU()
The line numWorkers := runtime.NumCPU()
is a key part of the code that allocates the number of workers to be used to process tasks in parallel. This line uses the runtime.NumCPU()
function, which returns the number of CPU cores available on the system where the program is running. This number is then assigned to numWorkers
, which determines how many goroutines will be created to split the work.
Use Cases and Considerations
- Parallelism Optimisation:
* **CPU Leverage:** By using `runtime.NumCPU()`, the code attempts to take full advantage of the available hardware by allocating one goroutine per CPU core. This can improve performance by distributing the workload across all cores, allowing multiple tasks to run in parallel without saturating a single core.
- Avoid Goroutine Overload:
* **Worker/Task Balance:** If `numWorkers` is equal to or close to `runtime.NumCPU()`, the program is likely to maintain an appropriate balance between the number of workers and system capabilities. However, if this number is significantly exceeded, performance may suffer due to system overhead from creating and managing too many goroutines, which can lead to unnecessary competition for system resources.
- Problems with Small Data Sizes:
* **More Workers than Tasks:** If the number of workers (`numWorkers`) is greater than the number of elements in the array, a problem can occur. For example, if you have an array with 4 elements and `numWorkers` is 8 (assuming the CPU has 8 cores), some goroutines will not have enough elements to process. This can cause incorrect or inefficient behaviour, such as:
* **Resource Waste:** Some goroutines may end up not doing any useful work, which wastes system resources.
* **Incorrect results:** Depending on how the code is structured, if the work is divided into more goroutines than elements in the array, some goroutines might not process any data, which could lead to incorrect results.
- Manual Tuning of numWorkers:
* **Optimisation for Specific Data:** In some cases, it may be beneficial to manually set `numWorkers` instead of using `runtime.NumCPU()`. For example, if you know the array size is small, you can set `numWorkers` to a lower number to avoid over-allocating goroutines. This can result in more efficient code that is easier to debug.
- Behaviour with Very Large Arrays:
* **Equal Division of Labor:** When working with very large arrays, `runtime.NumCPU()` can help ensure that each CPU core gets a reasonable share of the work. This is especially useful in intensive operations where parallelism can significantly reduce execution time.
Practical Example
Suppose you are running code on a system with 8 CPU cores (runtime.NumCPU()
returns 8). If you have an array with 100 elements, and you use numWorkers := 8
, each goroutine will process 12 or 13 elements (100/8 = 12.5). This is usually efficient.
However, if the array has only 4 elements and numWorkers
is still 8, then each goroutine will handle half an element, which is impractical and may cause some goroutines to do nothing, or even throw errors if the code does not handle this scenario correctly.
Execution and result verification
To:
nums := generateRandomSlice(10, 1, 9)
fmt.Println("First numbers:", nums[:10])
// Start measuring execution time.
start := time.Now()
// Get the number of CPUs available.
numWorkers := 3
When running the code with the command go run parallel_map_filter_reduce.go
, with the above parameters set in the main()
, we get in console:
First numbers: [3 6 1 2 8 1 7 2 8 6]
Result: 70
Execution time: 514.3µs
Execution Explanation
- Random Slice Generation:
* `generateRandomSlice(10, 1, 9)` generates a slice of 10 random integers, each within the range of 1 to 9.
* In this particular run, the numbers generated were: `[3, 6, 1, 2, 2, 8, 1, 7, 2, 8, 6]`.
* This line also prints the numbers generated: First numbers: `[3 6 1 2 8 1 7 2 8 6]`.
- Number of Workers:
* We define `numWorkers := 3`, indicating that 3 goroutines (workers) will be used to process the slice in parallel.
- Execution Result:
* The code processed the generated slice using the parallel mapping, filtering and reduction functions.
* The final result was `70`.
Checking the Result
Let's check the result step by step:
- Mapping (Multiplication by 2):
* The numbers `[3, 6, 1, 2, 2, 8, 1, 7, 2, 8, 6]` are mapped to `[6, 12, 2, 4, 16, 2, 14, 4, 16, 12]`.
- Filtering (Greater than 10):
* Numbers greater than 10 are filtered out, leaving `[12, 16, 14, 16, 16, 12]`.
- Reduction (Sum):
* The sum of the filtered numbers is `12 + 16 + 14 + 16 + 16 + 12 = 70`.
The result obtained (70
) is correct, as it corresponds to the sum of the values that passed the filter after being mapped.
Warning about numWorkers
When using numWorkers
, it is important to choose an appropriate value for the number of workers in relation to the size of the slice. Here are some warnings:
Too many Workers: if
numWorkers
is greater than the number of elements in the slice, some workers will have no work, which could cause parallelism to be inefficient. This can also lead to problems if the code does not handle these cases correctly, as happened in the example above with an incorrect result.Too Few Workers: If the number of workers is too low, parallelism will not be properly exploited, which could cause the program to run slower than if more workers were used.
In this case, with numWorkers := 3
for a 10-element slice, each worker processes approximately 3-4 elements, which is reasonable and results in correct processing.
Conclusion
We have explored the fascinating world of parallel computing in Go, focusing on how to take advantage of the language's features to improve the efficiency and performance of our applications. Through a practical example that performs parallel mapping, filtering and reduction operations, we have seen how goroutines, WaitGroups and Mutexes can be used to handle concurrent tasks and synchronise access to shared resources.
How would you improve this code to make it even more efficient or robust?
Your perspective can help to further optimise the code and adapt it to new situations.
I look forward to hearing from you in the comments.
You may be interested:
Top comments (0)