Hey there, fellow developers! 👋 Ever needed to add real-time updates to your Go application but found WebSockets a bit too complex for your needs? Enter Server-Sent Events (SSE) - a simpler alternative that's perfect for one-way server-to-client communication.
In this guide, I'll walk you through implementing SSE using GoFrame, taking you from basic implementation all the way to production-ready code. Let's dive in!
What are Server-Sent Events? 🤔
SSE is a standard that enables servers to push real-time updates to clients over HTTP. Unlike WebSocket, SSE:
- Is one-way (server to client only)
- Uses standard HTTP
- Reconnects automatically if the connection is lost (the browser's EventSource handles this for you)
- Is simpler to implement
Perfect for: real-time notifications, live feeds, status updates, and monitoring dashboards!
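To make the "standard HTTP" point concrete, here's a minimal sketch of what one SSE event looks like on the wire. The helper name formatData is my own invention (it's not a GoFrame API) and it only uses the standard library. It shows the two framing rules: every payload line gets a "data:" prefix, and a blank line ends the event.

```go
package main

import (
	"fmt"
	"strings"
)

// formatData is a hypothetical helper (not part of GoFrame) that frames a
// payload as one SSE event. Each line of the payload gets its own "data:"
// field, and a blank line terminates the event.
func formatData(payload string) string {
	var b strings.Builder
	for _, line := range strings.Split(payload, "\n") {
		b.WriteString("data: ")
		b.WriteString(line)
		b.WriteString("\n")
	}
	b.WriteString("\n")
	return b.String()
}

func main() {
	fmt.Printf("%q\n", formatData("hello"))              // "data: hello\n\n"
	fmt.Printf("%q\n", formatData("line one\nline two")) // "data: line one\ndata: line two\n\n"
}
```

Splitting on newlines matters because a raw newline inside a single data line would end the field early on the client.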
Getting Started: Basic SSE Implementation 🌱
Let's start with a simple example. Here's how to create your first SSE endpoint in GoFrame:
import (
"fmt"
"time"

"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/ghttp"
"github.com/gogf/gf/v2/os/gtime"
)
func SseHandler(r *ghttp.Request) {
// Set SSE headers
r.Response.Header().Set("Content-Type", "text/event-stream")
r.Response.Header().Set("Cache-Control", "no-cache")
r.Response.Header().Set("Connection", "keep-alive")
r.Response.Header().Set("Access-Control-Allow-Origin", "*")
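// Create message channel. The handler does not close it: the producer
// goroutine may still be sending, and a send on a closed channel panics.
ctx := r.Context()
messageChan := make(chan string)
// Send updates every 2 seconds until the client disconnects
go func() {
for {
message := fmt.Sprintf("Current time: %s", gtime.Now().String())
select {
case messageChan <- message:
case <-ctx.Done():
return // client gone: stop instead of leaking the goroutine
}
time.Sleep(time.Second * 2)
}
}()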
// Write to client
for {
select {
case message := <-messageChan:
r.Response.Write([]byte(fmt.Sprintf("data: %s\n\n", message)))
r.Response.Flush()
case <-r.Context().Done():
return
}
}
}
Register the route:
func main() {
s := g.Server()
s.BindHandler("/sse", SseHandler)
s.SetPort(8080) // match the port the frontend example connects to
s.Run()
}
And here's how to connect from the frontend:
const eventSource = new EventSource('http://localhost:8080/sse');
eventSource.onmessage = function(event) {
console.log('Got update:', event.data);
};
eventSource.onerror = function(error) {
// EventSource reconnects on its own; call eventSource.close() only when you want to stop for good
console.error('Connection error:', error);
};
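If you'd rather check the endpoint from Go (handy in tests), here's a rough sketch of what the browser does with the stream. parseEvents is a hypothetical helper of mine, it only understands "data:" fields and comments, and it takes a string to keep the example small.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// parseEvents is a hypothetical helper that splits an SSE stream into the
// data payload of each event. It handles only "data:" fields and comment
// lines; id, event, and retry fields are ignored for brevity.
func parseEvents(stream string) []string {
	var events, data []string
	scanner := bufio.NewScanner(strings.NewReader(stream))
	for scanner.Scan() {
		line := scanner.Text()
		switch {
		case line == "":
			// A blank line dispatches whatever data was collected so far.
			if len(data) > 0 {
				events = append(events, strings.Join(data, "\n"))
				data = nil
			}
		case strings.HasPrefix(line, ":"):
			// Comment line (for example a heartbeat): skip it.
		case strings.HasPrefix(line, "data:"):
			value := strings.TrimPrefix(line, "data:")
			// The format allows one optional space after the colon.
			data = append(data, strings.TrimPrefix(value, " "))
		}
	}
	return events
}

func main() {
	stream := "data: first\n\n: heartbeat\n\ndata: a\ndata: b\n\n"
	for _, e := range parseEvents(stream) {
		fmt.Printf("%q\n", e)
	}
}
```

For a live connection you would run the same scanner loop over the response body of an HTTP GET instead of a string.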
Real-World Examples: Let's Build Something Cool! 🛠️
1. Live Stock Ticker 📈
Let's build something more practical - a real-time stock price feed:
type StockPrice struct {
Symbol string `json:"symbol"`
Price float64 `json:"price"`
Time string `json:"time"`
}
func StockPriceHandler(r *ghttp.Request) {
r.Response.Header().Set("Content-Type", "text/event-stream")
r.Response.Header().Set("Cache-Control", "no-cache")
r.Response.Header().Set("Connection", "keep-alive")
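ctx := r.Context()
messageChan := make(chan StockPrice) // not closed here: the producer may still be sending
// Simulate stock updates until the client disconnects
go func() {
stocks := []string{"AAPL", "GOOGL", "MSFT"}
for {
for _, symbol := range stocks {
price := StockPrice{
Symbol: symbol,
Price: rand.Float64() * 1000,
Time: gtime.Now().String(),
}
select {
case messageChan <- price:
case <-ctx.Done():
return // client gone: stop instead of leaking the goroutine
}
}
time.Sleep(time.Second * 3)
}
}()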
for {
select {
case price := <-messageChan:
data, err := gjson.Encode(price)
if err != nil {
g.Log().Error(r.Context(), err)
return
}
r.Response.Write([]byte(fmt.Sprintf("data: %s\n\n", data)))
r.Response.Flush()
case <-r.Context().Done():
return
}
}
}
2. Real-time Chat Room Status 💬
Monitor active users and typing indicators in a chat room:
type ChatStatus struct {
ActiveUsers int `json:"activeUsers"`
TypingUsers []string `json:"typingUsers"`
LastActivity string `json:"lastActivity"`
}
func ChatStatusHandler(r *ghttp.Request) {
r.Response.Header().Set("Content-Type", "text/event-stream")
r.Response.Header().Set("Cache-Control", "no-cache")
r.Response.Header().Set("Connection", "keep-alive")
ctx := r.Context()
statusChan := make(chan ChatStatus) // not closed here: the producer may still be sending
// Track room status
roomStatus := ChatStatus{
ActiveUsers: 0,
TypingUsers: make([]string, 0),
}
// Handle user status updates (simplified example)
go func() {
for {
// Simulate status changes
roomStatus.ActiveUsers = rand.Intn(50) + 10
// Intn(3) yields 0, 1, or 2 typing users; Intn(2) could never include Bob
roomStatus.TypingUsers = []string{"Alice", "Bob"}[0:rand.Intn(3)]
roomStatus.LastActivity = gtime.Now().String()
select {
case statusChan <- roomStatus:
case <-ctx.Done():
return // client gone: stop instead of leaking the goroutine
}
time.Sleep(time.Second * 2)
}
}()
for {
select {
case status := <-statusChan:
data, err := gjson.Encode(status)
if err != nil {
g.Log().Error(r.Context(), err)
return
}
r.Response.Write([]byte(fmt.Sprintf("data: %s\n\n", data)))
r.Response.Flush()
case <-r.Context().Done():
return
}
}
}
3. System Monitoring Dashboard 📊
Monitor system metrics in real-time:
type SystemMetrics struct {
CPU float64 `json:"cpu"`
Memory float64 `json:"memory"`
Disk float64 `json:"disk"`
Network NetworkStats `json:"network"`
Timestamp string `json:"timestamp"`
}
type NetworkStats struct {
BytesIn int64 `json:"bytesIn"`
BytesOut int64 `json:"bytesOut"`
}
func SystemMetricsHandler(r *ghttp.Request) {
r.Response.Header().Set("Content-Type", "text/event-stream")
r.Response.Header().Set("Cache-Control", "no-cache")
r.Response.Header().Set("Connection", "keep-alive")
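ctx := r.Context()
metricsChan := make(chan SystemMetrics) // not closed here: the producer may still be sending
// Collect and send metrics (the getSystem* and getNetworkStats helpers are not shown here)
go func() {
for {
metrics := SystemMetrics{
CPU: getSystemCPUUsage(),
Memory: getSystemMemoryUsage(),
Disk: getSystemDiskUsage(),
Network: getNetworkStats(),
Timestamp: gtime.Now().String(),
}
select {
case metricsChan <- metrics:
case <-ctx.Done():
return // client gone: stop instead of leaking the goroutine
}
time.Sleep(time.Second * 5)
}
}()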
for {
select {
case metrics := <-metricsChan:
data, err := gjson.Encode(metrics)
if err != nil {
g.Log().Error(r.Context(), err)
return
}
r.Response.Write([]byte(fmt.Sprintf("data: %s\n\n", data)))
r.Response.Flush()
case <-r.Context().Done():
return
}
}
}
const eventSource = new EventSource('/system/metrics');
eventSource.onmessage = (event) => {
const metrics = JSON.parse(event.data);
// Update CPU gauge
updateGauge('cpu-gauge', metrics.cpu);
// Update memory usage
updateProgressBar('memory-bar', metrics.memory);
// Update network graph
updateNetworkGraph(metrics.network);
// Update timestamp
document.getElementById('last-update').textContent =
new Date(metrics.timestamp).toLocaleString();
};
4. Live Order Processing Status ⚡
Track order processing status in real-time:
type OrderStatus struct {
OrderID string `json:"orderId"`
Status string `json:"status"`
Steps []Step `json:"steps"`
UpdateTime string `json:"updateTime"`
}
type Step struct {
Name string `json:"name"`
Status string `json:"status"` // pending, in-progress, completed, failed
StartTime string `json:"startTime"`
EndTime string `json:"endTime"`
}
func OrderStatusHandler(r *ghttp.Request) {
orderID := r.Get("orderId").String()
if orderID == "" {
r.Response.WriteStatus(http.StatusBadRequest)
return
}
r.Response.Header().Set("Content-Type", "text/event-stream")
r.Response.Header().Set("Cache-Control", "no-cache")
r.Response.Header().Set("Connection", "keep-alive")
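ctx := r.Context()
statusChan := make(chan OrderStatus) // not closed here: the producer may still be sending
// Process order and send updates
go func() {
steps := []string{
"Order Received",
"Payment Processing",
"Inventory Check",
"Packaging",
"Shipping Label Created",
"Ready for Pickup", // Go requires the trailing comma in a multi-line literal
}
orderStatus := OrderStatus{
OrderID: orderID,
Status: "processing",
Steps: make([]Step, len(steps)),
}
// send delivers a snapshot with its own copy of Steps (the loop keeps mutating
// the original) and reports false once the client has disconnected
send := func() bool {
snapshot := orderStatus
snapshot.UpdateTime = gtime.Now().String()
snapshot.Steps = append([]Step(nil), orderStatus.Steps...)
select {
case statusChan <- snapshot:
return true
case <-ctx.Done():
return false
}
}
// Simulate order processing
for i, stepName := range steps {
orderStatus.Steps[i] = Step{
Name: stepName,
Status: "in-progress",
StartTime: gtime.Now().String(),
}
if !send() {
return
}
// Simulate processing time
time.Sleep(time.Second * time.Duration(rand.Intn(3)+1))
orderStatus.Steps[i].Status = "completed"
orderStatus.Steps[i].EndTime = gtime.Now().String()
if i == len(steps)-1 {
orderStatus.Status = "completed"
}
if !send() {
return
}
}
}()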
for {
select {
case status := <-statusChan:
data, err := gjson.Encode(status)
if err != nil {
g.Log().Error(r.Context(), err)
return
}
r.Response.Write([]byte(fmt.Sprintf("data: %s\n\n", data)))
r.Response.Flush()
case <-r.Context().Done():
return
}
}
}
// Frontend code for order tracking
const orderId = 'ORDER123'; // Get from your order
const eventSource = new EventSource(`/order/status?orderId=${orderId}`);
eventSource.onmessage = (event) => {
const status = JSON.parse(event.data);
// Update overall status
document.getElementById('order-status').textContent = status.status;
// Update progress steps
status.steps.forEach((step, index) => {
const stepElement = document.getElementById(`step-${index}`);
stepElement.className = `step ${step.status}`;
if (step.endTime) {
stepElement.querySelector('.time').textContent =
new Date(step.endTime).toLocaleString();
}
});
// Close connection if order is completed
if (status.status === 'completed') {
eventSource.close();
}
};
5. Live Sports Score Updates 🏆
Track live game scores and statistics:
type GameStats struct {
GameID string `json:"gameId"`
HomeTeam TeamStats `json:"homeTeam"`
AwayTeam TeamStats `json:"awayTeam"`
Period int `json:"period"`
TimeLeft string `json:"timeLeft"`
LastUpdate string `json:"lastUpdate"`
}
type TeamStats struct {
Name string `json:"name"`
Score int `json:"score"`
Shots int `json:"shots"`
Fouls int `json:"fouls"`
Timeout int `json:"timeouts"`
}
func LiveGameHandler(r *ghttp.Request) {
gameID := r.Get("gameId").String()
if gameID == "" {
r.Response.WriteStatus(http.StatusBadRequest)
return
}
r.Response.Header().Set("Content-Type", "text/event-stream")
r.Response.Header().Set("Cache-Control", "no-cache")
r.Response.Header().Set("Connection", "keep-alive")
ctx := r.Context()
statsChan := make(chan GameStats) // not closed here: the producer may still be sending
// Simulate live game updates
go func() {
gameStats := GameStats{
GameID: gameID,
Period: 1,
TimeLeft: "12:00",
HomeTeam: TeamStats{Name: "Home", Score: 0},
AwayTeam: TeamStats{Name: "Away", Score: 0},
}
for {
// Simulate game events
if rand.Float32() < 0.3 { // 30% chance of score change
if rand.Float32() < 0.5 {
gameStats.HomeTeam.Score += 2
} else {
gameStats.AwayTeam.Score += 2
}
}
gameStats.LastUpdate = gtime.Now().String()
select {
case statsChan <- gameStats:
case <-ctx.Done():
return // client gone: stop instead of leaking the goroutine
}
time.Sleep(time.Second * 3)
}
}()
for {
select {
case stats := <-statsChan:
data, err := gjson.Encode(stats)
if err != nil {
g.Log().Error(r.Context(), err)
return
}
r.Response.Write([]byte(fmt.Sprintf("data: %s\n\n", data)))
r.Response.Flush()
case <-r.Context().Done():
return
}
}
}
Making it Production-Ready 🛠️
1. Add a Heartbeat
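Keep connections alive with periodic pings. Proxies and load balancers often drop idle connections, and a line starting with a colon is an SSE comment that clients ignore. If you run this alongside a data loop, serialize the writes (for example by folding the ticker into that loop's select) so frames never interleave: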
func sendHeartbeat(r *ghttp.Request) {
ticker := time.NewTicker(time.Second * 30)
defer ticker.Stop()
for {
select {
case <-ticker.C:
r.Response.Write([]byte(": heartbeat\n\n"))
r.Response.Flush()
case <-r.Context().Done():
return
}
}
}
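Here's one way the heartbeat and the data events could share a single writer, so two goroutines never write to the response at the same time. This is a sketch under my own assumptions: pump and demo are hypothetical names, and w stands in for whatever writes to your response (the demo uses an in-memory buffer, not GoFrame).

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"time"
)

// pump is a hypothetical helper: it is the only goroutine that writes to w,
// so data events and heartbeat comments can never interleave mid-frame.
// It returns when ctx is cancelled, msgs is closed, or a write fails.
func pump(ctx context.Context, w io.Writer, flush func(), msgs <-chan string, every time.Duration) error {
	ticker := time.NewTicker(every)
	defer ticker.Stop()
	for {
		select {
		case msg, ok := <-msgs:
			if !ok {
				return nil
			}
			if _, err := fmt.Fprintf(w, "data: %s\n\n", msg); err != nil {
				return err
			}
		case <-ticker.C:
			// Lines starting with ":" are comments that clients ignore.
			if _, err := io.WriteString(w, ": heartbeat\n\n"); err != nil {
				return err
			}
		case <-ctx.Done():
			return ctx.Err()
		}
		flush()
	}
}

// demo feeds two messages through pump and returns what was written.
// The heartbeat interval is an hour, so only the data events appear.
func demo() string {
	var buf bytes.Buffer
	msgs := make(chan string, 2)
	msgs <- "one"
	msgs <- "two"
	close(msgs)
	_ = pump(context.Background(), &buf, func() {}, msgs, time.Hour)
	return buf.String()
}

func main() {
	fmt.Print(demo())
}
```

In a real handler you would pass the request context, a 30-second interval, and a flush function that calls r.Response.Flush().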
2. Implement Retry Logic
Make your frontend resilient:
function connectSSE() {
const eventSource = new EventSource('/sse');
eventSource.onmessage = (event) => {
console.log('Got update:', event.data);
};
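eventSource.onerror = (error) => {
console.error('Connection lost:', error);
// The browser retries by itself while readyState is CONNECTING;
// reconnect manually only once it has given up
if (eventSource.readyState === EventSource.CLOSED) {
setTimeout(connectSSE, 3000);
}
};
}
connectSSE();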
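The server can influence reconnection timing too. As a small sketch (retryField is a made-up helper name, not a GoFrame function), the SSE "retry" field tells the browser how many milliseconds to wait before its built-in reconnect:

```go
package main

import (
	"fmt"
	"time"
)

// retryField is a hypothetical helper that renders the SSE "retry" field,
// which asks the client to wait this long before reconnecting.
func retryField(d time.Duration) string {
	return fmt.Sprintf("retry: %d\n\n", d.Milliseconds())
}

func main() {
	fmt.Printf("%q\n", retryField(3*time.Second)) // "retry: 3000\n\n"
}
```

You would typically write this frame once at the start of the stream, before the first data event.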
3. Support Different Event Types 🔄
Handle various types of updates:
type Message struct {
Type string `json:"type"`
Data interface{} `json:"data"`
ID string `json:"id"`
}
// Send different types of events (msg is a Message; data is its JSON-encoded Data field)
response := fmt.Sprintf("id: %s\nevent: %s\ndata: %s\n\n",
msg.ID, msg.Type, data)
r.Response.Write([]byte(response))
r.Response.Flush()
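To see the whole path from a Message to bytes on the wire, here's a self-contained sketch. encodeMessage is a hypothetical helper, and I'm using the standard library's encoding/json instead of gjson so it runs without GoFrame.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type Message struct {
	Type string      `json:"type"`
	Data interface{} `json:"data"`
	ID   string      `json:"id"`
}

// encodeMessage is a hypothetical helper that renders a Message as an SSE
// frame with id, event, and data fields. json.Marshal escapes newlines, so
// the payload always fits on a single data line.
func encodeMessage(msg Message) (string, error) {
	data, err := json.Marshal(msg.Data)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("id: %s\nevent: %s\ndata: %s\n\n", msg.ID, msg.Type, data), nil
}

func main() {
	frame, err := encodeMessage(Message{
		ID:   "42",
		Type: "price_update",
		Data: map[string]float64{"price": 101.5},
	})
	if err != nil {
		panic(err)
	}
	fmt.Print(frame)
}
```

The event name is what addEventListener matches on in the browser, and the id is what comes back in the Last-Event-ID header after a reconnect.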
Frontend handling:
const eventSource = new EventSource('/sse');
// Handle specific event types
eventSource.addEventListener('price_update', (e) => {
const price = JSON.parse(e.data);
updatePriceDisplay(price);
});
eventSource.addEventListener('alert', (e) => {
const alert = JSON.parse(e.data);
showAlert(alert);
});
Scaling with Redis 📈
For distributed systems, use Redis pub/sub to coordinate SSE messages:
type SseManager struct {
redis *gredis.Redis
topic string
}
func (sm *SseManager) HandleSSE(r *ghttp.Request) {
ctx := r.Context()
pubSub, _, err := sm.redis.Subscribe(ctx, sm.topic)
if err != nil {
g.Log().Error(ctx, err)
return
}
defer pubSub.Close(ctx)
// Forward Redis messages to SSE
for {
msg, err := pubSub.ReceiveMessage(ctx)
if err != nil {
g.Log().Error(ctx, err)
return
}
r.Response.Write([]byte(fmt.Sprintf("data: %s\n\n", msg.Payload)))
r.Response.Flush()
}
}
Performance Tips 🚀
- Batch Updates: When you have frequent updates, batch them together:
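const batchSize = 10
const batchTimeout = time.Second // flush on this timer too, so a partial batch is never held indefinitely
// Collect messages (batchMessages is a slice declared outside this snippet; sendBatch writes one combined event)
batchMessages = append(batchMessages, msg)
if len(batchMessages) >= batchSize {
sendBatch(r, batchMessages)
batchMessages = batchMessages[:0]
}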
- Connection Limits: Prevent server overload:
type SseService struct {
connections int32
maxConn int32
}
func (s *SseService) HandleSSE(r *ghttp.Request) {
// Reserve a slot first, then check it: a separate load-then-add
// lets concurrent requests slip past maxConn
if atomic.AddInt32(&s.connections, 1) > s.maxConn {
atomic.AddInt32(&s.connections, -1)
r.Response.WriteStatus(http.StatusServiceUnavailable)
return
}
defer atomic.AddInt32(&s.connections, -1)
// Handle SSE...
}
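Going back to the batching tip, here's a fuller sketch that flushes on both size and time. The names batch and demo are hypothetical, the messages are plain strings for simplicity, and send stands in for whatever writes one combined SSE event.

```go
package main

import (
	"fmt"
	"time"
)

// batch is a hypothetical helper: it groups messages from in and calls send
// when a batch reaches size, or on each tick of interval if anything is
// pending. It flushes the remainder and returns once in is closed.
// interval must be positive, because time.NewTicker panics otherwise.
func batch(in <-chan string, size int, interval time.Duration, send func([]string)) {
	pending := make([]string, 0, size)
	flush := func() {
		if len(pending) > 0 {
			// Copy so the callee may keep the slice after pending is reused.
			send(append([]string(nil), pending...))
			pending = pending[:0]
		}
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case msg, ok := <-in:
			if !ok {
				flush()
				return
			}
			pending = append(pending, msg)
			if len(pending) >= size {
				flush()
			}
		case <-ticker.C:
			flush()
		}
	}
}

// demo pushes five messages through batch with a batch size of two.
func demo() [][]string {
	in := make(chan string, 5)
	for _, m := range []string{"a", "b", "c", "d", "e"} {
		in <- m
	}
	close(in)
	var got [][]string
	batch(in, 2, time.Hour, func(b []string) { got = append(got, b) })
	return got
}

func main() {
	fmt.Println(demo()) // [[a b] [c d] [e]]
}
```

The timer is what keeps a half-full batch from sitting around when traffic goes quiet.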
More Advanced Features 🔥
Event Replay Support
Implement event replay for clients that reconnect:
type EventStore struct {
events []Event
mu sync.RWMutex
}
type Event struct {
ID string
Type string
Data interface{}
Time time.Time
}
func (es *EventStore) AddEvent(event Event) {
es.mu.Lock()
defer es.mu.Unlock()
es.events = append(es.events, event)
// Keep last 100 events
if len(es.events) > 100 {
es.events = es.events[1:]
}
}
func (es *EventStore) GetEventsSince(id string) []Event {
es.mu.RLock()
defer es.mu.RUnlock()
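for i, e := range es.events {
if e.ID == id {
// Return a copy so callers never share the backing array with AddEvent
return append([]Event(nil), es.events[i+1:]...)
}
}
// Unknown or already evicted ID: replay everything still retained
return append([]Event(nil), es.events...)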
}
func HandleSSEWithReplay(r *ghttp.Request) {
lastEventID := r.Header.Get("Last-Event-ID")
if lastEventID != "" {
// Send missed events (eventStore is a shared *EventStore; sendEvent writes one id/event/data frame)
events := eventStore.GetEventsSince(lastEventID)
for _, event := range events {
sendEvent(r, event)
}
}
// Continue with normal SSE handling...
}
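One variation worth considering: if IDs are increasing numbers instead of arbitrary strings, replay still works when the client's last ID has already been evicted. This is my own sketch of that idea (store, add, and since are hypothetical names), not the EventStore from above.

```go
package main

import (
	"fmt"
	"sync"
)

type event struct {
	Seq  uint64
	Data string
}

// store is a hypothetical variation on EventStore that assigns increasing
// sequence numbers, so "everything after N" is well defined even when
// event N itself is no longer in the buffer.
type store struct {
	mu     sync.RWMutex
	next   uint64
	limit  int
	events []event
}

// add appends an event, trims the buffer to limit, and returns the new ID.
func (s *store) add(data string) uint64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.next++
	s.events = append(s.events, event{Seq: s.next, Data: data})
	if len(s.events) > s.limit {
		s.events = s.events[len(s.events)-s.limit:]
	}
	return s.next
}

// since returns a copy of every retained event with Seq greater than last.
func (s *store) since(last uint64) []event {
	s.mu.RLock()
	defer s.mu.RUnlock()
	var out []event
	for _, e := range s.events {
		if e.Seq > last {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	s := &store{limit: 3}
	for _, d := range []string{"a", "b", "c", "d", "e"} {
		s.add(d)
	}
	fmt.Println(s.since(3)) // [{4 d} {5 e}]
	fmt.Println(s.since(0)) // [{3 c} {4 d} {5 e}]
}
```

With this approach the handler would parse the Last-Event-ID header with strconv.ParseUint and treat a parse failure as 0.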
Client Groups and Filtering
Implement client grouping for targeted updates:
type Client struct {
ID string
Groups []string
Channel chan interface{}
}
type SSEBroker struct {
clients map[string]*Client
groups map[string]map[string]*Client
mu sync.RWMutex
}
func (b *SSEBroker) AddClient(client *Client) {
b.mu.Lock()
defer b.mu.Unlock()
b.clients[client.ID] = client
for _, group := range client.Groups {
if b.groups[group] == nil {
b.groups[group] = make(map[string]*Client)
}
b.groups[group][client.ID] = client
}
}
func (b *SSEBroker) BroadcastToGroup(group string, message interface{}) {
// Hold the read lock while iterating: AddClient mutates this same inner map
b.mu.RLock()
defer b.mu.RUnlock()
for _, client := range b.groups[group] {
select {
case client.Channel <- message:
default:
// Channel full, client might be slow; drop the message rather than block the broadcast
log.Printf("Client %s message buffer full", client.ID)
}
}
}
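The broker also needs a way to forget clients when they disconnect, otherwise the maps grow forever. Here's a sketch of that missing piece. NewSSEBroker, RemoveClient, and GroupSize are hypothetical additions of mine; the types and AddClient are repeated so the example runs on its own.

```go
package main

import (
	"fmt"
	"sync"
)

type Client struct {
	ID      string
	Groups  []string
	Channel chan interface{}
}

type SSEBroker struct {
	clients map[string]*Client
	groups  map[string]map[string]*Client
	mu      sync.RWMutex
}

// NewSSEBroker is a hypothetical constructor that initializes both maps,
// since writing to a nil map panics.
func NewSSEBroker() *SSEBroker {
	return &SSEBroker{
		clients: make(map[string]*Client),
		groups:  make(map[string]map[string]*Client),
	}
}

func (b *SSEBroker) AddClient(client *Client) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.clients[client.ID] = client
	for _, group := range client.Groups {
		if b.groups[group] == nil {
			b.groups[group] = make(map[string]*Client)
		}
		b.groups[group][client.ID] = client
	}
}

// RemoveClient is a hypothetical counterpart to AddClient. Call it (for
// example with defer) when the SSE handler returns.
func (b *SSEBroker) RemoveClient(id string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	client, ok := b.clients[id]
	if !ok {
		return
	}
	delete(b.clients, id)
	for _, group := range client.Groups {
		delete(b.groups[group], id)
		if len(b.groups[group]) == 0 {
			delete(b.groups, group) // drop empty groups as well
		}
	}
}

// GroupSize reports how many clients are currently registered in a group.
func (b *SSEBroker) GroupSize(group string) int {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return len(b.groups[group])
}

func main() {
	b := NewSSEBroker()
	b.AddClient(&Client{ID: "a", Groups: []string{"news"}, Channel: make(chan interface{}, 1)})
	b.AddClient(&Client{ID: "b", Groups: []string{"news", "sports"}, Channel: make(chan interface{}, 1)})
	b.RemoveClient("b")
	fmt.Println(b.GroupSize("news"), b.GroupSize("sports")) // 1 0
}
```

Give each client's Channel a buffer, because the non-blocking send in the broadcast drops messages whenever the channel has no room.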
Rate Limiting and Throttling
Implement rate limiting for high-frequency updates:
type ThrottledSSE struct {
updateChan chan interface{}
rate time.Duration
}
func NewThrottledSSE(rate time.Duration) *ThrottledSSE {
return &ThrottledSSE{
updateChan: make(chan interface{}, 100),
rate: rate,
}
}
func (t *ThrottledSSE) HandleUpdates(r *ghttp.Request) {
ticker := time.NewTicker(t.rate)
defer ticker.Stop()
var lastUpdate interface{}
for {
select {
case update := <-t.updateChan:
lastUpdate = update
case <-ticker.C:
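if lastUpdate != nil {
// Send the most recent update
data, err := gjson.Encode(lastUpdate)
lastUpdate = nil
if err != nil {
g.Log().Error(r.Context(), err)
continue // skip this tick instead of writing an empty event
}
r.Response.Write([]byte(fmt.Sprintf("data: %s\n\n", data)))
r.Response.Flush()
}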
case <-r.Context().Done():
return
}
}
}
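The core of the throttle is "keep only the newest value, send it on the tick". Here's that idea in isolation, as a sketch with an injected tick channel so the behavior is easy to check. coalesce and demo are hypothetical names, and the updates are plain ints for brevity.

```go
package main

import (
	"fmt"
	"time"
)

// coalesce is a hypothetical helper: it remembers only the newest value from
// updates and passes it to send on each tick, dropping intermediate values.
// It returns when updates is closed.
func coalesce(updates <-chan int, tick <-chan time.Time, send func(int)) {
	var latest int
	pending := false
	for {
		select {
		case v, ok := <-updates:
			if !ok {
				return
			}
			latest, pending = v, true
		case <-tick:
			if pending {
				send(latest)
				pending = false
			}
		}
	}
}

// demo sends three updates before a single tick and returns what was delivered.
func demo() []int {
	updates := make(chan int)
	tick := make(chan time.Time)
	done := make(chan struct{})
	var sent []int
	go func() {
		defer close(done)
		coalesce(updates, tick, func(v int) { sent = append(sent, v) })
	}()
	updates <- 1
	updates <- 2
	updates <- 3
	tick <- time.Now() // unbuffered: returns once coalesce has taken the tick
	close(updates)
	<-done
	return sent
}

func main() {
	fmt.Println(demo()) // [3]
}
```

In production you would pass the C channel of a time.Ticker as tick, which is what HandleUpdates does with its own ticker.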
Pro Tips 💡
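- Restrict CORS in production: replace the wildcard Access-Control-Allow-Origin from the basic example with the origins you actually serve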
- Add authentication for sensitive data
- Monitor connection counts and server resources
- Test with different network conditions
Wrapping Up 🎉
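SSE is a powerful tool for real-time updates that's often overlooked in favor of WebSockets. For one-way communication, it's simpler and more lightweight. It also works great with HTTP/2, where streams are multiplexed over one connection, so you avoid the roughly six-connections-per-domain limit browsers apply to HTTP/1.1. With GoFrame, implementing SSE becomes even more straightforward and maintainable.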
Here's a quick checklist for your SSE implementation:
- ✅ Basic SSE setup with proper headers
- ✅ Error handling and connection management
- ✅ Authentication and authorization
- ✅ Monitoring and metrics
- ✅ Scaling strategy
- ✅ Resource management
- ✅ Client handling
- ✅ Security considerations
What's Next? 🚀
You could extend this implementation by:
- Adding message persistence
- Implementing message replay
- Adding compression
- Building client libraries
- Adding WebSocket fallback
- Implementing server-side filtering
- Adding message prioritization
Have you used SSE in your projects? What challenges did you face? Share your experiences in the comments below! 👇
P.S. Want to see the complete code? Check out my GitHub repo [link to be added] for a production-ready implementation!
If you found this helpful, follow me for more Go tutorials and real-world examples! ✨