
Saleh Rahimzadeh


Data Processing Pipeline in Go (Golang)

A data processing pipeline is a series of stages or steps where data is processed sequentially.
Each stage performs a specific operation on the data, and the output of one stage becomes the input of the next.
This pattern is commonly used in scenarios like ETL (Extract, Transform, Load), stream processing, or batch processing.

In Go (Golang), pipelines are often implemented using channels and goroutines, which are core features of the language for concurrent programming.
Channels allow you to pass data between stages, and goroutines enable concurrent execution of each stage.


Key Concepts of a Data Processing Pipeline in Go

  1. Stages:

    • Each stage is a function that takes input data, processes it, and produces output data.
    • Stages are connected via channels.
  2. Channels:

    • Channels are used to pass data between stages.
    • They ensure safe communication between goroutines.
  3. Goroutines:

    • Each stage can run concurrently as a goroutine.
    • This allows for efficient utilization of CPU and I/O resources.
  4. Fan-Out and Fan-In:

    • Fan-Out: Distributing work across multiple goroutines (parallelism).
    • Fan-In: Combining results from multiple goroutines into a single channel.

Simple Data Processing Pipeline

Let’s build a simple pipeline that:

  1. Generates numbers.
  2. Squares the numbers.
  3. Prints the squared numbers.
package main

import "fmt"

// Stage 1: Generate numbers
func generate(count int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range count { // ranging over an integer requires Go 1.22+
            out <- n
        }
    }()
    return out
}

// Stage 2: Square numbers
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

// Stage 3: Print numbers
func print(in <-chan int) {
    for n := range in {
        fmt.Println(n)
    }
}

func main() {
    // Set up the pipeline
    numbers := generate(5)
    squaredNumbers := square(numbers)
    print(squaredNumbers)
}

Explanation of the Code

  1. generate function:

    • Takes a count, ranges over it (Go 1.22+ supports ranging over an integer), and sends each number into the output channel.
    • Runs in a goroutine to avoid blocking the main program.
  2. square function:

    • Reads numbers from the input channel, squares them, and sends the result to the output channel.
    • Also runs in a goroutine.
  3. print function:

    • Reads squared numbers from the input channel and prints them.
  4. Pipeline Setup:

    • The generate function produces numbers.
    • The square function processes them.
    • The print function consumes the final output.

Adding Concurrency with Fan-Out and Fan-In

To make the pipeline more efficient, you can introduce fan-out (multiple goroutines processing data in parallel) and fan-in (combining results back into a single channel) patterns.

Example: Fan-Out and Fan-In

package main

import (
    "fmt"
    "sync"
)

// Stage 1: Generate numbers
func generate(count int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range count {
            out <- n
        }
    }()
    return out
}

// Stage 2: Square numbers (with fan-out)
func square(in <-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup

    // Fan-out: Start multiple workers
    worker := func() {
        defer wg.Done()
        for n := range in {
            out <- n * n
        }
    }

    // Start 3 workers
    wg.Add(3)
    go worker()
    go worker()
    go worker()

    // Close the output channel when all workers are done
    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

// Stage 3: Print numbers
func print(in <-chan int) {
    for n := range in {
        fmt.Println(n)
    }
}

func main() {
    // Set up the pipeline
    numbers := generate(5)
    squaredNumbers := square(numbers)
    print(squaredNumbers)
}

Key Points in the Fan-Out/Fan-In Example

  1. Fan-Out:

    • Multiple goroutines (worker) are started to process data concurrently.
    • This is useful when the processing stage is CPU-intensive or involves I/O operations.
  2. Fan-In:

    • The sync.WaitGroup ensures that the output channel is closed only after all workers are done.
    • This combines the results from multiple goroutines into a single channel. Note that output ordering is no longer guaranteed, since the workers run concurrently.
  3. Scalability:

    • You can adjust the number of workers based on the available resources (e.g., CPU cores).
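As a sketch of the scalability point, the hard-coded three workers can become a parameter sized from `runtime.NumCPU()`. `squareN` below is a hypothetical variant of the square stage with a configurable worker count; the sum at the end sidesteps the unordered output:

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// squareN fans out the squaring work across `workers` goroutines
// and fans the results back into a single output channel.
func squareN(in <-chan int, workers int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for n := range in {
				out <- n * n
			}
		}()
	}
	// Close the output channel once every worker has finished.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for n := 0; n < 5; n++ {
			in <- n
		}
	}()
	sum := 0
	for n := range squareN(in, runtime.NumCPU()) {
		sum += n // results arrive in no particular order
	}
	fmt.Println(sum) // 0+1+4+9+16 = 30
}
```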

Best Practices for Data Processing Pipelines in Go

  1. Use Buffered Channels:
    If one stage is slower than the others, buffered channels let faster stages run ahead instead of blocking on every send. A buffer smooths bursts, but it will not compensate for a stage that is persistently slower.

  2. Graceful Shutdown:
    Use context.Context to handle cancellation and timeouts gracefully.

  3. Error Handling:
    Propagate errors through channels or use a separate error channel.

  4. Resource Management:
    Ensure channels are properly closed to avoid goroutine leaks.

  5. Testing:
    Test each stage independently to ensure correctness.
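For the buffered-channel advice, only the `make` call changes. The sketch below is a hypothetical `generateBuffered` variant; the capacity of 10 is an arbitrary choice that lets the producer queue values without blocking on every send:

```go
package main

import "fmt"

// generateBuffered is the generate stage with a buffered output
// channel: the producer can queue up to cap(out) values before a
// send blocks on the consumer.
func generateBuffered(count, buf int) <-chan int {
	out := make(chan int, buf)
	go func() {
		defer close(out)
		for n := 0; n < count; n++ {
			out <- n
		}
	}()
	return out
}

func main() {
	for n := range generateBuffered(5, 10) {
		fmt.Println(n)
	}
}
```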

Example with Error Handling and Context:

package main

import (
    "context"
    "fmt"
    "time"
)

// Stage 1: Generate numbers
func generate(ctx context.Context, count int) (<-chan int, <-chan error) {
    out := make(chan int)
    errCh := make(chan error, 1)
    go func() {
        defer close(out)
        defer close(errCh)
        for n := range count {
            select {
            case out <- n:
            case <-ctx.Done():
                errCh <- ctx.Err()
                return
            }
        }
    }()
    return out, errCh
}

// Stage 2: Square numbers
func square(ctx context.Context, in <-chan int) (<-chan int, <-chan error) {
    out := make(chan int)
    errCh := make(chan error, 1)
    go func() {
        defer close(out)
        defer close(errCh)
        for n := range in {
            select {
            case out <- n * n:
            case <-ctx.Done():
                errCh <- ctx.Err()
                return
            }
        }
    }()
    return out, errCh
}

// Stage 3: Print numbers
func print(ctx context.Context, in <-chan int) <-chan error {
    errCh := make(chan error, 1)
    go func() {
        defer close(errCh)
        for n := range in {
            select {
            case <-ctx.Done():
                errCh <- ctx.Err()
                return
            default:
                fmt.Println(n)
            }
        }
    }()
    return errCh
}

func main() {
    const timeout = 2 * time.Second

    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    // Set up the pipeline
    numbers, genErr := generate(ctx, 5)
    squaredNumbers, squareErr := square(ctx, numbers)
    printErr := print(ctx, squaredNumbers)

    // Handle errors: wait on the print stage first so the pipeline has
    // fully drained, then check the upstream stages. (A select here would
    // fire on the first *closed* channel with a nil error and could exit
    // before the pipeline finishes.)
    if err := <-printErr; err != nil {
        fmt.Println("Error in print:", err)
    }
    if err := <-squareErr; err != nil {
        fmt.Println("Error in square:", err)
    }
    if err := <-genErr; err != nil {
        fmt.Println("Error in generate:", err)
    }
}

This example adds error handling and context cancellation to ensure the pipeline can handle errors and timeouts gracefully.
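Per the testing advice above, each stage is just a function from channel to channel, so it can be checked in isolation. In a real project this check would live in a `_test.go` file using the `testing` package; the sketch below does the same verification as a plain program, feeding the `square` stage known inputs:

```go
package main

import "fmt"

// square mirrors the stage defined earlier in the article.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

func main() {
	// Feed the stage a known input sequence...
	in := make(chan int)
	go func() {
		defer close(in)
		for _, n := range []int{1, 2, 3} {
			in <- n
		}
	}()

	// ...and assert on its output, value by value.
	want := []int{1, 4, 9}
	i := 0
	for got := range square(in) {
		if got != want[i] {
			panic(fmt.Sprintf("got %d, want %d", got, want[i]))
		}
		i++
	}
	if i != len(want) {
		panic(fmt.Sprintf("received %d values, want %d", i, len(want)))
	}
	fmt.Println("square stage OK")
}
```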
