DEV Community

Stella Achar Oiro
Stella Achar Oiro

Posted on

How to Build Real-Time Analytics Systems with Go: A Complete Guide

Real-time analytics has become essential for businesses seeking to extract immediate insights from streaming data. In this guide, we'll walk through how to build a scalable, high-performance real-time analytics system using Go, with a special focus on AI applications.

By the end of this guide, you'll understand how to:

  • Set up efficient data streaming pipelines in Go
  • Implement key stream processing patterns
  • Connect your system to time-series databases
  • Apply memory-efficient algorithms for online learning
  • Optimize your system for maximum performance

Prerequisites

Before getting started, make sure you have:

  • Go 1.16+ installed on your system
  • Basic familiarity with Go programming
  • A development environment set up (VS Code, GoLand, or your preferred IDE)
  • Docker (optional, for running databases locally)

1. Setting Up Your Go Environment for Real-Time Streaming

1.1 Create Your Project Structure

Start by creating a new Go module:

mkdir go-streaming-analytics
cd go-streaming-analytics
go mod init github.com/yourusername/go-streaming-analytics
Enter fullscreen mode Exit fullscreen mode

1.2 Adding Essential Dependencies

go get -u github.com/influxdata/influxdb-client-go/v2
go get -u github.com/gorilla/websocket
go get -u github.com/prometheus/client_golang/prometheus
Enter fullscreen mode Exit fullscreen mode

1.3 Creating a Basic Stream Processor

Create a file named main.go with this skeleton:

package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"
)

func main() {
    // Setup signal handling for graceful shutdown
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    signalChan := make(chan os.Signal, 1)
    signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

    // Start your streaming components here

    log.Println("Stream processor started. Press Ctrl+C to exit.")

    // Wait for termination signal
    <-signalChan
    log.Println("Shutdown signal received, initiating graceful shutdown...")

    // Add a timeout for shutdown
    shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer shutdownCancel()

    // Perform cleanup

    log.Println("Shutdown complete")
}
Enter fullscreen mode Exit fullscreen mode

2. Implementing Concurrent Data Streams with Go

Go's concurrency model makes it ideal for handling multiple data streams. Let's implement a basic streaming pipeline.

2.1 Defining Your Data Models

Create a file called models.go:

package main

import "time"

// DataPoint represents a single measurement from a data source
type DataPoint struct {
    Timestamp time.Time
    SourceID  string
    Value     float64
    Metadata  map[string]string
}

// ProcessedResult represents an analyzed data point
type ProcessedResult struct {
    OriginalPoint DataPoint
    AnomalyScore  float64
    Prediction    float64
    ProcessedAt   time.Time
}
Enter fullscreen mode Exit fullscreen mode

2.2 Building a Basic Pipeline

Now let's implement a simple processing pipeline in pipeline.go:

package main

import (
    "context"
    "log"
    "sync"
    "time"
)

// Pipeline stage function types
type SourceFunc func(ctx context.Context, out chan<- DataPoint)
type ProcessFunc func(ctx context.Context, in <-chan DataPoint, out chan<- ProcessedResult)
type SinkFunc func(ctx context.Context, in <-chan ProcessedResult)

// Pipeline represents a complete data processing pipeline
type Pipeline struct {
    source      SourceFunc
    processors  []ProcessFunc
    sink        SinkFunc
    bufferSize  int
    concurrency int
}

// NewPipeline creates a new processing pipeline
func NewPipeline(source SourceFunc, sink SinkFunc, bufferSize, concurrency int) *Pipeline {
    return &Pipeline{
        source:      source,
        sink:        sink,
        bufferSize:  bufferSize,
        concurrency: concurrency,
        processors:  make([]ProcessFunc, 0),
    }
}

// AddProcessor adds a processing stage to the pipeline
func (p *Pipeline) AddProcessor(processor ProcessFunc) {
    p.processors = append(p.processors, processor)
}

// Run starts the pipeline and blocks until the context is canceled
func (p *Pipeline) Run(ctx context.Context) error {
    // Create channels
    sourceChannel := make(chan DataPoint, p.bufferSize)

    // Create intermediate channels for each processor
    channels := make([]chan ProcessedResult, len(p.processors)+1)
    for i := range channels {
        channels[i] = make(chan ProcessedResult, p.bufferSize)
    }

    // WaitGroup to track all goroutines
    var wg sync.WaitGroup

    // Start source
    wg.Add(1)
    go func() {
        defer wg.Done()
        defer close(sourceChannel)
        p.source(ctx, sourceChannel)
    }()

    // Start processors (with fan-out for each stage)
    for i, processor := range p.processors {
        // Input is either source channel (for first processor) or previous processor's output
        var input chan DataPoint
        if i == 0 {
            input = sourceChannel
        } else {
            // We'd need to convert ProcessedResult back to DataPoint in a real implementation
            // This is simplified for the example
        }

        output := channels[i]

        // Fan out each processor to multiple goroutines
        for j := 0; j < p.concurrency; j++ {
            wg.Add(1)
            go func(proc ProcessFunc, in <-chan DataPoint, out chan<- ProcessedResult) {
                defer wg.Done()
                proc(ctx, in, out)
            }(processor, input, output)
        }
    }

    // Start sink
    wg.Add(1)
    go func() {
        defer wg.Done()
        p.sink(ctx, channels[len(channels)-1])
    }()

    // Wait for completion or cancellation
    go func() {
        wg.Wait()
        // Close the final channel when all processing is done
        close(channels[len(channels)-1])
    }()

    // Wait for context cancellation
    <-ctx.Done()
    log.Println("Pipeline shutdown initiated")

    return ctx.Err()
}
Enter fullscreen mode Exit fullscreen mode

2.3 Creating a Sample Data Source

Let's implement a simple data generator in source.go:

package main

import (
    "context"
    "math/rand"
    "time"
)

// RandomDataSource generates random data points at a specified interval
func RandomDataSource(interval time.Duration, sourceID string) SourceFunc {
    return func(ctx context.Context, out chan<- DataPoint) {
        ticker := time.NewTicker(interval)
        defer ticker.Stop()

        for {
            select {
            case <-ctx.Done():
                return
            case t := <-ticker.C:
                out <- DataPoint{
                    Timestamp: t,
                    SourceID:  sourceID,
                    Value:     rand.Float64() * 100.0, // Random value between 0-100
                    Metadata: map[string]string{
                        "type": "random",
                        "unit": "percentage",
                    },
                }
            }
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

2.4 Implementing a Processor

Now, let's create a simple anomaly detector in processor.go:

package main

import (
    "context"
    "math"
    "time"
)

// SimpleAnomalyDetector detects values that deviate significantly from recent history
func SimpleAnomalyDetector(windowSize int, threshold float64) ProcessFunc {
    return func(ctx context.Context, in <-chan DataPoint, out chan<- ProcessedResult) {
        // Store recent values in a circular buffer
        buffer := make([]float64, windowSize)
        bufferIndex := 0
        bufferFilled := 0

        for {
            select {
            case <-ctx.Done():
                return
            case dataPoint, ok := <-in:
                if !ok {
                    // Channel closed
                    return
                }

                // Add value to buffer
                buffer[bufferIndex] = dataPoint.Value
                bufferIndex = (bufferIndex + 1) % windowSize
                if bufferFilled < windowSize {
                    bufferFilled++
                }

                // Calculate average and standard deviation
                var sum, sumSquared float64
                for i := 0; i < bufferFilled; i++ {
                    sum += buffer[i]
                    sumSquared += buffer[i] * buffer[i]
                }

                mean := sum / float64(bufferFilled)
                variance := (sumSquared / float64(bufferFilled)) - (mean * mean)
                stdDev := math.Sqrt(variance)

                // Calculate z-score (how many standard deviations from mean)
                zScore := 0.0
                if stdDev > 0 {
                    zScore = math.Abs(dataPoint.Value - mean) / stdDev
                }

                // Output result
                out <- ProcessedResult{
                    OriginalPoint: dataPoint,
                    AnomalyScore:  zScore,
                    Prediction:    mean, // Simple prediction is just the mean
                    ProcessedAt:   time.Now(),
                }
            }
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

2.5 Creating a Data Sink

Finally, let's implement a simple sink that logs the results in sink.go:

package main

import (
    "context"
    "log"
)

// LoggingSink logs processed results to stdout
func LoggingSink() SinkFunc {
    return func(ctx context.Context, in <-chan ProcessedResult) {
        for {
            select {
            case <-ctx.Done():
                return
            case result, ok := <-in:
                if !ok {
                    // Channel closed
                    return
                }

                // Log results
                if result.AnomalyScore > 2.0 {
                    log.Printf("ANOMALY DETECTED: Source: %s, Value: %.2f, Score: %.2f",
                        result.OriginalPoint.SourceID, result.OriginalPoint.Value, result.AnomalyScore)
                } else {
                    log.Printf("Data: Source: %s, Value: %.2f, Prediction: %.2f",
                        result.OriginalPoint.SourceID, result.OriginalPoint.Value, result.Prediction)
                }
            }
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

2.6 Putting It All Together

Now, update main.go to use our pipeline components:

package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"
)

func main() {
    // Setup signal handling for graceful shutdown
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    signalChan := make(chan os.Signal, 1)
    signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

    // Create the pipeline
    pipeline := NewPipeline(
        RandomDataSource(100*time.Millisecond, "sensor-1"),
        LoggingSink(),
        100,  // Buffer size
        3,    // Concurrency level
    )

    // Add processors
    pipeline.AddProcessor(SimpleAnomalyDetector(50, 2.0))

    // Run the pipeline in a goroutine
    go func() {
        if err := pipeline.Run(ctx); err != nil && err != context.Canceled {
            log.Printf("Pipeline error: %v", err)
        }
    }()

    log.Println("Stream processor started. Press Ctrl+C to exit.")

    // Wait for termination signal
    <-signalChan
    log.Println("Shutdown signal received, initiating graceful shutdown...")

    // Cancel the context to signal all components to shut down
    cancel()

    // Give components time to shut down gracefully
    time.Sleep(2 * time.Second)

    log.Println("Shutdown complete")
}
Enter fullscreen mode Exit fullscreen mode

3. Implementing Key Stream Processing Patterns

3.1 Window Operations for Time-Based Analysis

Create a file called window.go to implement time-based windowing:

package main

import (
    "context"
    "time"
)

// Window represents a time window of data points
type Window struct {
    StartTime time.Time
    EndTime   time.Time
    Points    []DataPoint
}

// WindowedProcessor buffers data into time windows and processes them as a batch
func WindowedProcessor(windowDuration time.Duration, processFn func(Window) []ProcessedResult) ProcessFunc {
    return func(ctx context.Context, in <-chan DataPoint, out chan<- ProcessedResult) {
        var currentWindow Window
        currentWindow.StartTime = time.Now()
        currentWindow.EndTime = currentWindow.StartTime.Add(windowDuration)

        ticker := time.NewTicker(windowDuration)
        defer ticker.Stop()

        for {
            select {
            case <-ctx.Done():
                // Process any remaining points before shutting down
                results := processFn(currentWindow)
                for _, result := range results {
                    out <- result
                }
                return

            case <-ticker.C:
                // Window time is up, process points and create new window
                results := processFn(currentWindow)
                for _, result := range results {
                    out <- result
                }

                now := time.Now()
                currentWindow = Window{
                    StartTime: now,
                    EndTime:   now.Add(windowDuration),
                    Points:    make([]DataPoint, 0),
                }

            case point, ok := <-in:
                if !ok {
                    return
                }

                // Add point to current window
                currentWindow.Points = append(currentWindow.Points, point)
            }
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

3.2 Implementing the Fan-Out/Fan-In Pattern

Create a file called fanout.go:

package main

import (
    "context"
    "sync"
)

// FanOutFanIn distributes work across multiple workers and combines results
func FanOutFanIn(workerFunc func(DataPoint) ProcessedResult, workerCount int) ProcessFunc {
    return func(ctx context.Context, in <-chan DataPoint, out chan<- ProcessedResult) {
        // Create a WaitGroup to track workers
        var wg sync.WaitGroup

        // Create a channel for collecting results from workers
        results := make(chan ProcessedResult, workerCount)

        // Start the result collector
        go func() {
            for result := range results {
                select {
                case <-ctx.Done():
                    return
                case out <- result:
                    // Result sent downstream
                }
            }
        }()

        // Start workers
        for i := 0; i < workerCount; i++ {
            wg.Add(1)
            go func(workerID int) {
                defer wg.Done()

                for {
                    select {
                    case <-ctx.Done():
                        return
                    case data, ok := <-in:
                        if !ok {
                            return
                        }

                        // Process the data and send result
                        result := workerFunc(data)

                        select {
                        case <-ctx.Done():
                            return
                        case results <- result:
                            // Result sent to collector
                        }
                    }
                }
            }(i)
        }

        // Wait for all workers to complete and close the results channel
        go func() {
            wg.Wait()
            close(results)
        }()
    }
}
Enter fullscreen mode Exit fullscreen mode

4. Integrating with Time-Series Databases

4.1 Setting Up InfluxDB

First, let's create a Docker Compose file for InfluxDB:

# docker-compose.yml
version: '3'
services:
  influxdb:
    image: influxdb:2.0
    ports:
      - "8086:8086"
    volumes:
      - influxdb-data:/var/lib/influxdb2
    environment:
      - DOCKER_INFLUXDB_INIT_MODE=setup
      - DOCKER_INFLUXDB_INIT_USERNAME=admin
      - DOCKER_INFLUXDB_INIT_PASSWORD=mypassword123
      - DOCKER_INFLUXDB_INIT_ORG=myorg
      - DOCKER_INFLUXDB_INIT_BUCKET=metrics
      - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=myadmintoken

volumes:
  influxdb-data:
Enter fullscreen mode Exit fullscreen mode

Start InfluxDB with:

docker-compose up -d
Enter fullscreen mode Exit fullscreen mode

4.2 Creating an InfluxDB Sink

Now, let's create a sink that writes to InfluxDB in influxdb_sink.go:

package main

import (
    "context"
    "log"
    "time"

    influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)

// InfluxDBSink writes processed results to InfluxDB
func InfluxDBSink(serverURL, token, org, bucket string, batchSize int) SinkFunc {
    return func(ctx context.Context, in <-chan ProcessedResult) {
        // Create InfluxDB client
        client := influxdb2.NewClient(serverURL, token)
        defer client.Close()

        // Get non-blocking write API
        writeAPI := client.WriteAPI(org, bucket)

        // Listen for write errors
        errorsCh := writeAPI.Errors()
        go func() {
            for err := range errorsCh {
                log.Printf("InfluxDB write error: %s", err.Error())
            }
        }()

        // Process incoming results
        batch := make([]ProcessedResult, 0, batchSize)
        ticker := time.NewTicker(1 * time.Second)
        defer ticker.Stop()

        flushBatch := func() {
            for _, result := range batch {
                // Create InfluxDB point
                point := influxdb2.NewPoint(
                    "sensor_data",
                    map[string]string{
                        "source_id": result.OriginalPoint.SourceID,
                    },
                    map[string]interface{}{
                        "value":         result.OriginalPoint.Value,
                        "anomaly_score": result.AnomalyScore,
                        "prediction":    result.Prediction,
                    },
                    result.OriginalPoint.Timestamp,
                )

                // Write asynchronously
                writeAPI.WritePoint(point)
            }

            // Clear the batch
            batch = batch[:0]
        }

        for {
            select {
            case <-ctx.Done():
                // Flush any remaining points
                if len(batch) > 0 {
                    flushBatch()
                }
                // Ensure pending writes are flushed
                writeAPI.Flush()
                return

            case result, ok := <-in:
                if !ok {
                    // Channel closed
                    if len(batch) > 0 {
                        flushBatch()
                    }
                    writeAPI.Flush()
                    return
                }

                // Add to batch
                batch = append(batch, result)

                // Flush if batch is full
                if len(batch) >= batchSize {
                    flushBatch()
                }

            case <-ticker.C:
                // Time-based flush
                if len(batch) > 0 {
                    flushBatch()
                }
            }
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

4.3 Querying Data from InfluxDB

Create a file called query.go with functions to retrieve analytics data:

package main

import (
    "context"
    "fmt"
    "time"

    influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)

// TimeSeriesData represents a time series of values
type TimeSeriesData struct {
    Times  []time.Time
    Values []float64
}

// QueryRecentData retrieves recent data from InfluxDB
func QueryRecentData(serverURL, token, org, bucket, measurement string, duration time.Duration) (*TimeSeriesData, error) {
    // Create client
    client := influxdb2.NewClient(serverURL, token)
    defer client.Close()

    // Get query client
    queryAPI := client.QueryAPI(org)

    // Create Flux query
    query := fmt.Sprintf(`
        from(bucket: "%s")
          |> range(start: -%s)
          |> filter(fn: (r) => r._measurement == "%s")
          |> filter(fn: (r) => r._field == "value")
    `, bucket, duration.String(), measurement)

    // Run query
    result, err := queryAPI.Query(context.Background(), query)
    if err != nil {
        return nil, fmt.Errorf("query error: %w", err)
    }
    defer result.Close()

    // Process results
    data := &TimeSeriesData{
        Times:  make([]time.Time, 0),
        Values: make([]float64, 0),
    }

    for result.Next() {
        data.Times = append(data.Times, result.Record().Time())
        value := result.Record().Value()
        if v, ok := value.(float64); ok {
            data.Values = append(data.Values, v)
        }
    }

    // Check for query errors
    if result.Err() != nil {
        return nil, fmt.Errorf("result error: %w", result.Err())
    }

    return data, nil
}
Enter fullscreen mode Exit fullscreen mode

5. Implementing Memory-Efficient Algorithms for Online Learning

5.1 Creating an Online Learning Model

Create a file called online_learning.go for incremental model updates:

package main

import (
    "math"
)

// OnlineStats keeps statistics that update incrementally
type OnlineStats struct {
    Count  int64
    Mean   float64
    M2     float64 // Sum of squared differences from the mean
    Min    float64
    Max    float64
}

// NewOnlineStats creates a new online statistics tracker
func NewOnlineStats() *OnlineStats {
    return &OnlineStats{
        Min: math.Inf(1),
        Max: math.Inf(-1),
    }
}

// Update adds a value and updates statistics
func (s *OnlineStats) Update(value float64) {
    // Update count
    s.Count++

    // Update min/max
    if value < s.Min {
        s.Min = value
    }
    if value > s.Max {
        s.Max = value
    }

    // For the first value, just set the mean
    if s.Count == 1 {
        s.Mean = value
        return
    }

    // Update mean and variance using Welford's algorithm
    delta := value - s.Mean
    s.Mean += delta / float64(s.Count)
    delta2 := value - s.Mean
    s.M2 += delta * delta2
}

// Variance returns the current variance
func (s *OnlineStats) Variance() float64 {
    if s.Count < 2 {
        return 0
    }
    return s.M2 / float64(s.Count)
}

// StdDev returns the current standard deviation
func (s *OnlineStats) StdDev() float64 {
    return math.Sqrt(s.Variance())
}

// ZScore calculates how many standard deviations a value is from the mean
func (s *OnlineStats) ZScore(value float64) float64 {
    stdDev := s.StdDev()
    if stdDev == 0 {
        return 0
    }
    return (value - s.Mean) / stdDev
}

// Normalize returns a value scaled to [0,1] range based on observed min/max
func (s *OnlineStats) Normalize(value float64) float64 {
    if s.Max == s.Min {
        return 0.5
    }
    return (value - s.Min) / (s.Max - s.Min)
}
Enter fullscreen mode Exit fullscreen mode

5.2 Creating an Online Learning Processor

Now, let's use our online learning model in a processor:

package main

import (
    "context"
    "math"
    "sync"
)

// OnlineLearningProcessor uses online statistics to detect anomalies
func OnlineLearningProcessor() ProcessFunc {
    return func(ctx context.Context, in <-chan DataPoint, out chan<- ProcessedResult) {
        // Map to track stats for each source
        statsBySource := make(map[string]*OnlineStats)
        var mu sync.RWMutex

        for {
            select {
            case <-ctx.Done():
                return
            case point, ok := <-in:
                if !ok {
                    return
                }

                // Get or create stats tracker for this source
                mu.RLock()
                stats, exists := statsBySource[point.SourceID]
                mu.RUnlock()

                if !exists {
                    stats = NewOnlineStats()
                    mu.Lock()
                    statsBySource[point.SourceID] = stats
                    mu.Unlock()
                }

                // Calculate anomaly score before updating model
                zScore := stats.ZScore(point.Value)
                anomalyScore := math.Abs(zScore)

                // Update the model with the new data point
                stats.Update(point.Value)

                // Create result
                result := ProcessedResult{
                    OriginalPoint: point,
                    AnomalyScore:  anomalyScore,
                    Prediction:    stats.Mean,
                    ProcessedAt:   point.Timestamp,
                }

                out <- result
            }
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

6. Optimizing Performance for Real-Time Analytics

6.1 Implementing a Memory Pool

Create a file called memory_pool.go:

package main

import "sync"

// DataBuffer represents a reusable buffer
type DataBuffer struct {
    Data []byte
}

// BufferPool manages a pool of reusable buffers
type BufferPool struct {
    pool      sync.Pool
    bufferSize int
}

// NewBufferPool creates a new buffer pool
func NewBufferPool(bufferSize int) *BufferPool {
    return &BufferPool{
        pool: sync.Pool{
            New: func() interface{} {
                return &DataBuffer{
                    Data: make([]byte, bufferSize),
                }
            },
        },
        bufferSize: bufferSize,
    }
}

// Get retrieves a buffer from the pool
func (p *BufferPool) Get() *DataBuffer {
    return p.pool.Get().(*DataBuffer)
}

// Put returns a buffer to the pool
func (p *BufferPool) Put(buffer *DataBuffer) {
    // Clear for security (optional)
    for i := range buffer.Data {
        buffer.Data[i] = 0
    }
    p.pool.Put(buffer)
}
Enter fullscreen mode Exit fullscreen mode

6.2 Implementing a Ring Buffer

Create a file called ring_buffer.go:

package main

import (
    "errors"
    "sync"
)

var (
    ErrBufferFull  = errors.New("ring buffer: full")
    ErrBufferEmpty = errors.New("ring buffer: empty")
)

// RingBuffer is a thread-safe circular buffer
type RingBuffer struct {
    buffer []interface{}
    size   int
    mu     sync.Mutex
    read   int // Read position
    write  int // Write position
    count  int // Number of elements in buffer
}

// NewRingBuffer creates a new ring buffer with the given size
func NewRingBuffer(size int) *RingBuffer {
    return &RingBuffer{
        buffer: make([]interface{}, size),
        size:   size,
    }
}

// Push adds an item to the buffer
func (rb *RingBuffer) Push(item interface{}) error {
    rb.mu.Lock()
    defer rb.mu.Unlock()

    if rb.count == rb.size {
        return ErrBufferFull
    }

    rb.buffer[rb.write] = item
    rb.write = (rb.write + 1) % rb.size
    rb.count++

    return nil
}

// Pop removes and returns an item from the buffer
func (rb *RingBuffer) Pop() (interface{}, error) {
    rb.mu.Lock()
    defer rb.mu.Unlock()

    if rb.count == 0 {
        return nil, ErrBufferEmpty
    }

    item := rb.buffer[rb.read]
    rb.buffer[rb.read] = nil // Help GC
    rb.read = (rb.read + 1) % rb.size
    rb.count--

    return item, nil
}

// Len returns the current number of items in the buffer
func (rb *RingBuffer) Len() int {
    rb.mu.Lock()
    defer rb.mu.Unlock()
    return rb.count
}

// Cap returns the capacity of the buffer
func (rb *RingBuffer) Cap() int {
    return rb.size
}
Enter fullscreen mode Exit fullscreen mode

6.3 Setting Up Monitoring

Create a file called monitoring.go:

package main

import (
    "net/http"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

// Metrics holds Prometheus metrics for the application
type Metrics struct {
    ProcessedCounter   *prometheus.CounterVec
    AnomalyCounter     *prometheus.CounterVec
    ProcessingDuration *prometheus.HistogramVec
    QueueSize          *prometheus.GaugeVec
}

// NewMetrics creates and registers metrics
func NewMetrics(registry *prometheus.Registry) *Metrics {
    m := &Metrics{
        ProcessedCounter: prometheus.NewCounterVec(
            prometheus.CounterOpts{
                Name: "stream_processed_total",
                Help: "Total number of processed data points",
            },
            []string{"source_id"},
        ),
        AnomalyCounter: prometheus.NewCounterVec(
            prometheus.CounterOpts{
                Name: "stream_anomalies_total",
                Help: "Total number of detected anomalies",
            },
            []string{"source_id"},
        ),
        ProcessingDuration: prometheus.NewHistogramVec(
            prometheus.HistogramOpts{
                Name:    "stream_processing_duration_seconds",
                Help:    "Duration of processing operations",
                Buckets: prometheus.ExponentialBuckets(0.001, 2, 10), // From 1ms to ~1s
            },
            []string{"operation"},
        ),
        QueueSize: prometheus.NewGaugeVec(
            prometheus.GaugeOpts{
                Name: "stream_queue_size",
                Help: "Current number of items in processing queues",
            },
            []string{"queue"},
        ),
    }

    registry.MustRegister(
        m.ProcessedCounter,
        m.AnomalyCounter,
        m.ProcessingDuration,
        m.QueueSize,
    )

    return m
}

// StartMetricsServer starts an HTTP server for Prometheus metrics
func StartMetricsServer(addr string, registry *prometheus.Registry) {
    http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
    go func() {
        http.ListenAndServe(addr, nil)
    }()
}
Enter fullscreen mode Exit fullscreen mode

7. Deploying Your System

7.1 Building the Application

Create a Dockerfile:

FROM golang:1.18-alpine AS builder

WORKDIR /app

# Copy go mod files
COPY go.mod go.sum ./
RUN go mod download

# Copy source code
COPY *.go ./

# Build the application
RUN CGO_ENABLED=0 GOOS=linux go build -o /go-streaming-analytics

# Create final minimal image
FROM alpine:latest

WORKDIR /app

COPY --from=builder /go-streaming-analytics /app/

CMD ["/app/go-streaming-analytics"]
Enter fullscreen mode Exit fullscreen mode

Build and run with Docker:

docker build -t go-streaming-analytics .
docker run -p 8080:8080 -p 9090:9090 go-streaming-analytics
Enter fullscreen mode Exit fullscreen mode

7.2 Configuration Management

Create a file called config.go:

package main

import (
    "encoding/json"
    "os"
    "time"
)

// Config holds application configuration
type Config struct {
    ServerPort      string        `json:"server_port"`
    MetricsPort     string        `json:"metrics_port"`
    BufferSize      int           `json:"buffer_size"`
    ConcurrencyLevel int          `json:"concurrency_level"`
    WindowDuration  time.Duration `json:"window_duration"`
    InfluxDB        struct {
        URL     string `json:"url"`
        Token   string `json:"token"`
        Org     string `json:"org"`
        Bucket  string `json:"bucket"`
    } `json:"influxdb"`
}

// LoadConfig loads configuration from a JSON file
func LoadConfig(path string) (*Config, error) {
    file, err := os.Open(path)
    if err != nil {
        return nil, err
    }
    defer file.Close()

    config := &Config{}
    decoder := json.NewDecoder(file)
    if err := decoder.Decode(config); err != nil {
        return nil, err
    }

    return config, nil
}

// DefaultConfig returns a default configuration
func DefaultConfig() *Config {
    config := &Config{
        ServerPort:      "8080",
        MetricsPort:     "9090",
        BufferSize:      1000,
        ConcurrencyLevel: 4,
        WindowDuration:  10 * time.Second,
    }

    config.InfluxDB.URL = "http://localhost:8086"
    config.InfluxDB.Token = "myadmintoken"
    config.InfluxDB.Org = "myorg"
    config.InfluxDB.Bucket = "metrics"

    return config
}
Enter fullscreen mode Exit fullscreen mode

Conclusion and Next Steps

You've now built a complete real-time analytics pipeline in Go that can:

  • Process streaming data concurrently
  • Implement various stream processing patterns
  • Connect to time-series databases
  • Apply online learning algorithms
  • Optimize for performance

To extend this system, consider:

  1. Adding more sophisticated ML models
  2. Implementing stream joins for correlating multiple data sources
  3. Adding a real-time visualization layer
  4. Implementing fault tolerance with save points
  5. Scaling out to a distributed architecture

How Berrijam AI Leverages These Principles

While Berrijam AI uses a different technology stack, the principles demonstrated in this guide are similar to those that power their real-time analytics platform. Berrijam's system incorporates:

  • Efficient stream processing for immediate insights
  • Memory-optimized algorithms for online learning and prediction
  • Seamless integration with time-series data storage
  • Highly concurrent processing architecture

These capabilities allow Berrijam AI to provide instantaneous insights from streaming data, enabling businesses to make data-driven decisions based on the most current information available.

To learn more about how Berrijam AI can help your organization implement real-time analytics solutions without the complexity of building them yourself, visit Berrijam or contact their solutions team for a personalized demonstration.

Top comments (0)