Forem

Jones Charles
Jones Charles

Posted on

Real-time Updates Made Easy: Building Server-Sent Events with GoFrame 🚀

Hey there, fellow developers! 👋 Ever needed to add real-time updates to your Go application but found WebSockets a bit too complex for your needs? Enter Server-Sent Events (SSE) - a simpler alternative that's perfect for one-way server-to-client communication.

In this guide, I'll walk you through implementing SSE using GoFrame, taking you from basic implementation all the way to production-ready code. Let's dive in!

What are Server-Sent Events? 🤔

SSE is a standard that enables servers to push real-time updates to clients over HTTP. Unlike WebSocket, SSE:

  • Is one-way (server to client only)
  • Uses standard HTTP
  • Automatically reconnects if the connection is lost
  • Is simpler to implement

Perfect for: real-time notifications, live feeds, status updates, and monitoring dashboards!

Getting Started: Basic SSE Implementation 🌱

Let's start with a simple example. Here's how to create your first SSE endpoint in GoFrame:

func SseHandler(r *ghttp.Request) {
    // Set SSE headers
    r.Response.Header().Set("Content-Type", "text/event-stream")
    r.Response.Header().Set("Cache-Control", "no-cache")
    r.Response.Header().Set("Connection", "keep-alive")
    r.Response.Header().Set("Access-Control-Allow-Origin", "*")

    // Create message channel
    messageChan := make(chan string)
    defer close(messageChan)

    // Send updates every 2 seconds
    go func() {
        for {
            message := fmt.Sprintf("Current time: %s", gtime.Now().String())
            messageChan <- message
            time.Sleep(time.Second * 2)
        }
    }()

    // Write to client
    for {
        select {
        case message := <-messageChan:
            r.Response.Write([]byte(fmt.Sprintf("data: %s\n\n", message)))
            r.Response.Flush()
        case <-r.Context().Done():
            return
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Register the route:

func main() {
    s := g.Server()
    s.BindHandler("/sse", SseHandler)
    s.Run()
}
Enter fullscreen mode Exit fullscreen mode

And here's how to connect from the frontend:

const eventSource = new EventSource('http://localhost:8080/sse');

eventSource.onmessage = function(event) {
    console.log('Got update:', event.data);
};

eventSource.onerror = function(error) {
    console.error('Connection error:', error);
    eventSource.close();
};
Enter fullscreen mode Exit fullscreen mode

Real-World Examples: Let's Build Something Cool! 🛠️

1. Live Stock Ticker 📈

Let's build something more practical - a real-time stock price feed:

type StockPrice struct {
    Symbol string  `json:"symbol"`
    Price  float64 `json:"price"`
    Time   string  `json:"time"`
}

func StockPriceHandler(r *ghttp.Request) {
    r.Response.Header().Set("Content-Type", "text/event-stream")
    r.Response.Header().Set("Cache-Control", "no-cache")
    r.Response.Header().Set("Connection", "keep-alive")

    messageChan := make(chan StockPrice)
    defer close(messageChan)

    // Simulate stock updates
    go func() {
        stocks := []string{"AAPL", "GOOGL", "MSFT"}
        for {
            for _, symbol := range stocks {
                price := StockPrice{
                    Symbol: symbol,
                    Price:  rand.Float64() * 1000,
                    Time:   gtime.Now().String(),
                }
                messageChan <- price
            }
            time.Sleep(time.Second * 3)
        }
    }()

    for {
        select {
        case price := <-messageChan:
            data, err := gjson.Encode(price)
            if err != nil {
                g.Log().Error(r.Context(), err)
                return
            }
            r.Response.Write([]byte(fmt.Sprintf("data: %s\n\n", data)))
            r.Response.Flush()
        case <-r.Context().Done():
            return
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

2. Real-time Chat Room Status 💬

Monitor active users and typing indicators in a chat room:

type ChatStatus struct {
    ActiveUsers    int      `json:"activeUsers"`
    TypingUsers    []string `json:"typingUsers"`
    LastActivity   string   `json:"lastActivity"`
}

func ChatStatusHandler(r *ghttp.Request) {
    r.Response.Header().Set("Content-Type", "text/event-stream")
    r.Response.Header().Set("Cache-Control", "no-cache")
    r.Response.Header().Set("Connection", "keep-alive")

    statusChan := make(chan ChatStatus)
    defer close(statusChan)

    // Track room status
    roomStatus := ChatStatus{
        ActiveUsers: 0,
        TypingUsers: make([]string, 0),
    }

    // Handle user status updates (simplified example)
    go func() {
        for {
            // Simulate status changes
            roomStatus.ActiveUsers = rand.Intn(50) + 10
            roomStatus.TypingUsers = []string{"Alice", "Bob"}[0:rand.Intn(2)]
            roomStatus.LastActivity = gtime.Now().String()

            statusChan <- roomStatus
            time.Sleep(time.Second * 2)
        }
    }()

    for {
        select {
        case status := <-statusChan:
            data, err := gjson.Encode(status)
            if err != nil {
                g.Log().Error(r.Context(), err)
                return
            }
            r.Response.Write([]byte(fmt.Sprintf("data: %s\n\n", data)))
            r.Response.Flush()
        case <-r.Context().Done():
            return
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

3. System Monitoring Dashboard 📊

Monitor system metrics in real-time:

type SystemMetrics struct {
    CPU       float64   `json:"cpu"`
    Memory    float64   `json:"memory"`
    Disk      float64   `json:"disk"`
    Network   NetworkStats `json:"network"`
    Timestamp string    `json:"timestamp"`
}

type NetworkStats struct {
    BytesIn  int64 `json:"bytesIn"`
    BytesOut int64 `json:"bytesOut"`
}

func SystemMetricsHandler(r *ghttp.Request) {
    r.Response.Header().Set("Content-Type", "text/event-stream")
    r.Response.Header().Set("Cache-Control", "no-cache")
    r.Response.Header().Set("Connection", "keep-alive")

    metricsChan := make(chan SystemMetrics)
    defer close(metricsChan)

    // Collect and send metrics
    go func() {
        for {
            metrics := SystemMetrics{
                CPU:     getSystemCPUUsage(),
                Memory:  getSystemMemoryUsage(),
                Disk:    getSystemDiskUsage(),
                Network: getNetworkStats(),
                Timestamp: gtime.Now().String(),
            }

            metricsChan <- metrics
            time.Sleep(time.Second * 5)
        }
    }()

    for {
        select {
        case metrics := <-metricsChan:
            data, err := gjson.Encode(metrics)
            if err != nil {
                g.Log().Error(r.Context(), err)
                return
            }
            r.Response.Write([]byte(fmt.Sprintf("data: %s\n\n", data)))
            r.Response.Flush()
        case <-r.Context().Done():
            return
        }
    }
}
Enter fullscreen mode Exit fullscreen mode
const eventSource = new EventSource('/system/metrics');

eventSource.onmessage = (event) => {
    const metrics = JSON.parse(event.data);

    // Update CPU gauge
    updateGauge('cpu-gauge', metrics.cpu);

    // Update memory usage
    updateProgressBar('memory-bar', metrics.memory);

    // Update network graph
    updateNetworkGraph(metrics.network);

    // Update timestamp
    document.getElementById('last-update').textContent = 
        new Date(metrics.timestamp).toLocaleString();
};
Enter fullscreen mode Exit fullscreen mode

4. Live Order Processing Status ⚡

Track order processing status in real-time:

type OrderStatus struct {
    OrderID    string   `json:"orderId"`
    Status     string   `json:"status"`
    Steps      []Step   `json:"steps"`
    UpdateTime string   `json:"updateTime"`
}

type Step struct {
    Name      string `json:"name"`
    Status    string `json:"status"` // pending, in-progress, completed, failed
    StartTime string `json:"startTime"`
    EndTime   string `json:"endTime"`
}

func OrderStatusHandler(r *ghttp.Request) {
    orderID := r.Get("orderId").String()
    if orderID == "" {
        r.Response.WriteStatus(http.StatusBadRequest)
        return
    }

    r.Response.Header().Set("Content-Type", "text/event-stream")
    r.Response.Header().Set("Cache-Control", "no-cache")
    r.Response.Header().Set("Connection", "keep-alive")

    statusChan := make(chan OrderStatus)
    defer close(statusChan)

    // Process order and send updates
    go func() {
        steps := []string{
            "Order Received",
            "Payment Processing",
            "Inventory Check",
            "Packaging",
            "Shipping Label Created",
            "Ready for Pickup"
        }

        orderStatus := OrderStatus{
            OrderID: orderID,
            Status:  "processing",
            Steps:   make([]Step, len(steps)),
        }

        // Simulate order processing
        for i, stepName := range steps {
            orderStatus.Steps[i] = Step{
                Name:      stepName,
                Status:    "in-progress",
                StartTime: gtime.Now().String(),
            }

            statusChan <- orderStatus

            // Simulate processing time
            time.Sleep(time.Second * time.Duration(rand.Intn(3)+1))

            orderStatus.Steps[i].Status = "completed"
            orderStatus.Steps[i].EndTime = gtime.Now().String()

            if i == len(steps)-1 {
                orderStatus.Status = "completed"
            }

            statusChan <- orderStatus
        }
    }()

    for {
        select {
        case status := <-statusChan:
            data, err := gjson.Encode(status)
            if err != nil {
                g.Log().Error(r.Context(), err)
                return
            }
            r.Response.Write([]byte(fmt.Sprintf("data: %s\n\n", data)))
            r.Response.Flush()
        case <-r.Context().Done():
            return
        }
    }
}
Enter fullscreen mode Exit fullscreen mode
// Frontend code for order tracking
const orderId = 'ORDER123'; // Get from your order
const eventSource = new EventSource(\`/order/status?orderId=\${orderId}\`);

eventSource.onmessage = (event) => {
    const status = JSON.parse(event.data);

    // Update overall status
    document.getElementById('order-status').textContent = status.status;

    // Update progress steps
    status.steps.forEach((step, index) => {
        const stepElement = document.getElementById(\`step-\${index}\`);
        stepElement.className = \`step \${step.status}\`;

        if (step.endTime) {
            stepElement.querySelector('.time').textContent = 
                new Date(step.endTime).toLocaleString();
        }
    });

    // Close connection if order is completed
    if (status.status === 'completed') {
        eventSource.close();
    }
};
Enter fullscreen mode Exit fullscreen mode

5. Live Sports Score Updates 🏆

Track live game scores and statistics:

type GameStats struct {
    GameID      string    `json:"gameId"`
    HomeTeam    TeamStats `json:"homeTeam"`
    AwayTeam    TeamStats `json:"awayTeam"`
    Period      int       `json:"period"`
    TimeLeft    string    `json:"timeLeft"`
    LastUpdate  string    `json:"lastUpdate"`
}

type TeamStats struct {
    Name    string `json:"name"`
    Score   int    `json:"score"`
    Shots   int    `json:"shots"`
    Fouls   int    `json:"fouls"`
    Timeout int    `json:"timeouts"`
}

func LiveGameHandler(r *ghttp.Request) {
    gameID := r.Get("gameId").String()
    if gameID == "" {
        r.Response.WriteStatus(http.StatusBadRequest)
        return
    }

    r.Response.Header().Set("Content-Type", "text/event-stream")
    r.Response.Header().Set("Cache-Control", "no-cache")
    r.Response.Header().Set("Connection", "keep-alive")

    statsChan := make(chan GameStats)
    defer close(statsChan)

    // Simulate live game updates
    go func() {
        gameStats := GameStats{
            GameID:  gameID,
            Period:  1,
            TimeLeft: "12:00",
            HomeTeam: TeamStats{Name: "Home", Score: 0},
            AwayTeam: TeamStats{Name: "Away", Score: 0},
        }

        for {
            // Simulate game events
            if rand.Float32() < 0.3 {  // 30% chance of score change
                if rand.Float32() < 0.5 {
                    gameStats.HomeTeam.Score += 2
                } else {
                    gameStats.AwayTeam.Score += 2
                }
            }

            gameStats.LastUpdate = gtime.Now().String()
            statsChan <- gameStats

            time.Sleep(time.Second * 3)
        }
    }()

    for {
        select {
        case stats := <-statsChan:
            data, err := gjson.Encode(stats)
            if err != nil {
                g.Log().Error(r.Context(), err)
                return
            }
            r.Response.Write([]byte(fmt.Sprintf("data: %s\n\n", data)))
            r.Response.Flush()
        case <-r.Context().Done():
            return
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Making it Production-Ready 🛠️

1. Add a Heartbeat

Keep connections alive with periodic pings:

func sendHeartbeat(r *ghttp.Request) {
    ticker := time.NewTicker(time.Second * 30)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            r.Response.Write([]byte(": heartbeat\n\n"))
            r.Response.Flush()
        case <-r.Context().Done():
            return
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

2. Implement Retry Logic

Make your frontend resilient:

function connectSSE() {
    const eventSource = new EventSource('/sse');

    eventSource.onmessage = (event) => {
        console.log('Got update:', event.data);
    };

    eventSource.onerror = (error) => {
        console.error('Connection lost:', error);
        eventSource.close();

        // Retry after 3 seconds
        setTimeout(connectSSE, 3000);
    };
}
Enter fullscreen mode Exit fullscreen mode

3. Support Different Event Types 🔄

Handle various types of updates:

type Message struct {
    Type string      `json:"type"`
    Data interface{} `json:"data"`
    ID   string      `json:"id"`
}

// Send different types of events
response := fmt.Sprintf("id: %s\nevent: %s\ndata: %s\n\n",
    msg.ID, msg.Type, data)
r.Response.Write([]byte(response))
Enter fullscreen mode Exit fullscreen mode

Frontend handling:

const eventSource = new EventSource('/sse');

// Handle specific event types
eventSource.addEventListener('price_update', (e) => {
    const price = JSON.parse(e.data);
    updatePriceDisplay(price);
});

eventSource.addEventListener('alert', (e) => {
    const alert = JSON.parse(e.data);
    showAlert(alert);
});
Enter fullscreen mode Exit fullscreen mode

Scaling with Redis 📈

For distributed systems, use Redis pub/sub to coordinate SSE messages:

type SseManager struct {
    redis *gredis.Redis
    topic string
}

func (sm *SseManager) HandleSSE(r *ghttp.Request) {
    ctx := r.Context()
    pubSub, _, err := sm.redis.Subscribe(ctx, sm.topic)
    if err != nil {
        g.Log().Error(ctx, err)
        return
    }
    defer pubSub.Close(ctx)

    // Forward Redis messages to SSE
    for {
        msg, err := pubSub.ReceiveMessage(ctx)
        if err != nil {
            g.Log().Error(ctx, err)
            return
        }
        r.Response.Write([]byte(fmt.Sprintf("data: %s\n\n", msg.Payload)))
        r.Response.Flush()
    }
}
Enter fullscreen mode Exit fullscreen mode

Performance Tips 🚀

  1. Batch Updates: When you have frequent updates, batch them together:
const batchSize = 10
const batchTimeout = time.Second

// Collect messages
batchMessages = append(batchMessages, msg)
if len(batchMessages) >= batchSize {
    sendBatch(r, batchMessages)
    batchMessages = batchMessages[:0]
}
Enter fullscreen mode Exit fullscreen mode
  1. Connection Limits: Prevent server overload:
type SseService struct {
    connections int32
    maxConn     int32
}

func (s *SseService) HandleSSE(r *ghttp.Request) {
    if atomic.LoadInt32(&s.connections) >= s.maxConn {
        r.Response.WriteStatus(http.StatusServiceUnavailable)
        return
    }
    atomic.AddInt32(&s.connections, 1)
    defer atomic.AddInt32(&s.connections, -1)
    // Handle SSE...
}
Enter fullscreen mode Exit fullscreen mode

More Advanced Features 🔥

Event Replay Support

Implement event replay for clients that reconnect:

type EventStore struct {
    events []Event
    mu     sync.RWMutex
}

type Event struct {
    ID      string
    Type    string
    Data    interface{}
    Time    time.Time
}

func (es *EventStore) AddEvent(event Event) {
    es.mu.Lock()
    defer es.mu.Unlock()
    es.events = append(es.events, event)
    // Keep last 100 events
    if len(es.events) > 100 {
        es.events = es.events[1:]
    }
}

func (es *EventStore) GetEventsSince(id string) []Event {
    es.mu.RLock()
    defer es.mu.RUnlock()

    for i, e := range es.events {
        if e.ID == id {
            return es.events[i+1:]
        }
    }
    return es.events
}

func HandleSSEWithReplay(r *ghttp.Request) {
    lastEventID := r.Header.Get("Last-Event-ID")

    if lastEventID != "" {
        // Send missed events
        events := eventStore.GetEventsSince(lastEventID)
        for _, event := range events {
            sendEvent(r, event)
        }
    }

    // Continue with normal SSE handling...
}
Enter fullscreen mode Exit fullscreen mode

Client Groups and Filtering

Implement client grouping for targeted updates:

type Client struct {
    ID       string
    Groups   []string
    Channel  chan interface{}
}

type SSEBroker struct {
    clients  map[string]*Client
    groups   map[string]map[string]*Client
    mu       sync.RWMutex
}

func (b *SSEBroker) AddClient(client *Client) {
    b.mu.Lock()
    defer b.mu.Unlock()

    b.clients[client.ID] = client
    for _, group := range client.Groups {
        if b.groups[group] == nil {
            b.groups[group] = make(map[string]*Client)
        }
        b.groups[group][client.ID] = client
    }
}

func (b *SSEBroker) BroadcastToGroup(group string, message interface{}) {
    b.mu.RLock()
    clients := b.groups[group]
    b.mu.RUnlock()

    for _, client := range clients {
        select {
        case client.Channel <- message:
        default:
            // Channel full, client might be slow
            log.Printf("Client %s message buffer full", client.ID)
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Rate Limiting and Throttling

Implement rate limiting for high-frequency updates:

type ThrottledSSE struct {
    updateChan chan interface{}
    rate       time.Duration
}

func NewThrottledSSE(rate time.Duration) *ThrottledSSE {
    return &ThrottledSSE{
        updateChan: make(chan interface{}, 100),
        rate:       rate,
    }
}

func (t *ThrottledSSE) HandleUpdates(r *ghttp.Request) {
    ticker := time.NewTicker(t.rate)
    defer ticker.Stop()

    var lastUpdate interface{}

    for {
        select {
        case update := <-t.updateChan:
            lastUpdate = update

        case <-ticker.C:
            if lastUpdate != nil {
                // Send the most recent update
                data, _ := gjson.Encode(lastUpdate)
                r.Response.Write([]byte(fmt.Sprintf("data: %s\n\n", data)))
                r.Response.Flush()
                lastUpdate = nil
            }

        case <-r.Context().Done():
            return
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Pro Tips 💡

  1. Use CORS headers in production
  2. Add authentication for sensitive data
  3. Monitor connection counts and server resources
  4. Test with different network conditions

Wrapping Up 🎉

SSE is a powerful tool for real-time updates that's often overlooked in favor of WebSockets. For one-way communication, it's simpler, more lightweight, and works great with HTTP/2. With GoFrame, implementing SSE becomes even more straightforward and maintainable.

Here's a quick checklist for your SSE implementation:

  • ✅ Basic SSE setup with proper headers
  • ✅ Error handling and connection management
  • ✅ Authentication and authorization
  • ✅ Monitoring and metrics
  • ✅ Scaling strategy
  • ✅ Resource management
  • ✅ Client handling
  • ✅ Security considerations

What's Next? 🚀

You could extend this implementation by:

  • Adding message persistence
  • Implementing message replay
  • Adding compression
  • Building client libraries
  • Adding WebSocket fallback
  • Implementing server-side filtering
  • Adding message prioritization

Have you used SSE in your projects? What challenges did you face? Share your experiences in the comments below! 👇

P.S. Want to see the complete code? Check out my GitHub repo [link to be added] for a production-ready implementation!

Resources 📚


If you found this helpful, follow me for more Go tutorials and real-world examples! ✨

Top comments (0)