This article guides you through creating a WebSocket client in Flutter for real-time data. We’ll start with the basics and improve it step by step until it’s solid and reliable. We’re using Binance Market WebSocket as an example (think of it like a live crypto price feed), but the real focus is WebSocket—connecting, fixing errors, and managing data streams. Let’s dive in!
Part 1: Starting with a Basic Connection
First, we need a way to connect to a WebSocket server. Since real-time data can come from different tools (like WebSocket, MQTT, and others), we define a StreamingService interface to keep things flexible. But here, we’ll use WebSocket with the web_socket_channel package, connecting to Binance’s endpoint (wss://stream.binance.com:9443/ws).
Here’s the simple starting code:
import 'package:web_socket_channel/web_socket_channel.dart';

abstract class StreamingService {
  void connect();
}

class WebsocketService implements StreamingService {
  WebsocketService({required this.wsUrl});

  final String wsUrl;
  WebSocketChannel? _channel;

  // True when a channel exists and no close code has been received.
  // Note: this is also true while the handshake is still in progress.
  bool get isConnected => _channel != null && _channel!.closeCode == null;

  @override
  void connect() {
    if (_channel != null) return;
    _channel = WebSocketChannel.connect(Uri.parse(wsUrl));
    _channel?.stream.listen(
      _onEvent,
      onError: _onError,
      onDone: _onDone,
    );
  }

  void _onEvent(dynamic event) {
    print("Event: $event");
  }

  void _onError(dynamic error) {
    print("Error: $error");
  }

  void _onDone() {
    print("Connection closed");
  }
}
This code connects and listens for data: _onEvent prints incoming messages, _onError logs errors, and _onDone notes when the connection stops. It’s simple, just enough to open the connection and print whatever Binance sends, but it’s fragile. If the connection drops or fails, it won’t recover. Let’s fix that next.
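Before moving on, here is a small sketch of what the StreamingService interface buys us: because callers depend only on the interface, a fake implementation can stand in for the real socket in tests. FakeStreamingService, its events stream, and emit() are hypothetical names introduced only for this illustration; they are not part of the article's code.

```dart
import 'dart:async';

// Mirrors the interface defined in Part 1.
abstract class StreamingService {
  void connect();
}

// Hypothetical in-memory implementation for tests: no network involved.
class FakeStreamingService implements StreamingService {
  // A sync broadcast controller delivers events to listeners during add().
  final _events = StreamController<String>.broadcast(sync: true);
  bool connected = false;

  Stream<String> get events => _events.stream;

  @override
  void connect() {
    connected = true;
  }

  // Simulates a raw message arriving from the server.
  void emit(String rawEvent) {
    if (!connected) {
      throw StateError('connect() must be called before emit()');
    }
    _events.add(rawEvent);
  }
}
```

Code written against StreamingService can then be exercised by calling emit() with canned JSON instead of waiting for live market data.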
Part 2: Keeping the Connection Alive with Ping-Pong
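The basic setup from Part 1 has a weakness: WebSocket connections can die if they’re idle too long. Binance helps us here by sending a “ping” to check if we’re still alive. We reply with a “pong” to keep the connection going, a bit like a heartbeat. One caveat: when a ping arrives as a protocol-level WebSocket control frame, dart:io answers it automatically and it never reaches _onEvent, so the check below only matters for pings delivered as ordinary JSON messages.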
To improve this, we’ll:
- Check incoming events for a ping.
- Send a pong response, like saying, “Hey, I’m still here!”
Here’s the updated code:
// Requires: import 'dart:convert';
class WebsocketService implements StreamingService {
  /* Same as before */

  void _onEvent(dynamic event) {
    if (event is! String) return;
    final decodedEvent = jsonDecode(event);
    if (decodedEvent is Map<String, dynamic>) {
      // Application-level ping: answer it and stop processing this event.
      if (decodedEvent.containsKey('ping')) {
        _sendPong();
        return;
      }
    }
    print("Event: $event");
  }

  void _sendPong() {
    _channel?.sink.add(jsonEncode({'pong': DateTime.now().millisecondsSinceEpoch}));
  }
}
This is smart: it keeps the connection alive without extra work. But if it drops anyway, we’re still stuck. We’ll add reconnect logic in Part 5; first, let’s start sending requests.
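The ping check can also be pulled out into a pure function, which makes it easy to test without a socket. The sketch below is one way to do that, assuming the same JSON ping shape as the code above; pongReplyFor and its nowMs parameter are hypothetical names for illustration. It also catches FormatException, since jsonDecode throws on input that is not valid JSON.

```dart
import 'dart:convert';

/// Returns the JSON pong reply for [rawEvent], or null when the event is
/// not an application-level ping. Hypothetical helper, not the article's API.
String? pongReplyFor(dynamic rawEvent, {int? nowMs}) {
  if (rawEvent is! String) return null;
  Object? decoded;
  try {
    decoded = jsonDecode(rawEvent);
  } on FormatException {
    return null; // Not JSON: ignore it instead of crashing the listener.
  }
  if (decoded is Map<String, dynamic> && decoded.containsKey('ping')) {
    // nowMs lets tests pass a fixed timestamp; production code omits it.
    final timestamp = nowMs ?? DateTime.now().millisecondsSinceEpoch;
    return jsonEncode({'pong': timestamp});
  }
  return null;
}
```

Inside _onEvent, a non-null result would be passed straight to _channel?.sink.add(...).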
Part 3: Sending Requests
So far, we only listen. Let’s send requests to subscribe to one of Binance’s streams, using the Individual Symbol Mini Ticker Stream as the example.
Improvements:
- Add _symbolMiniTickerStreamController, which lets us stream data to the app.
- Add EventTypes to keep event names organized.
- Add SubscribeStreamRequest and UnsubscribeStreamRequest classes for clean code.
- Add _sendRequest() to send messages.
Here’s the code:
// Requires: import 'dart:async'; and import 'dart:convert';
class WebsocketService implements StreamingService {
  /* Same as before */

  final _symbolMiniTickerStreamController = StreamController<SymbolMiniTickerEvent>.broadcast();

  void _onEvent(dynamic event) {
    if (event is! String) return;
    final decodedEvent = jsonDecode(event);
    if (decodedEvent is Map<String, dynamic>) {
      if (decodedEvent.containsKey('ping')) {
        _sendPong();
        return;
      }
      // Route market data events by their event type field.
      final eventType = decodedEvent['e'];
      switch (eventType) {
        case EventTypes.symbolMiniTicker:
          _symbolMiniTickerStreamController.sink.add(SymbolMiniTickerEvent.fromMap(decodedEvent));
          break;
      }
    }
  }

  Future<bool> _sendRequest(Map<String, dynamic> request) async {
    if (!isConnected) {
      print("WebSocket is not connected. Cannot send request: $request");
      return false;
    }
    _channel?.sink.add(jsonEncode(request));
    return true; // Assume success
  }

  /// =============================== Individual Symbol Mini Ticker Streams ===============================
  Stream<SymbolMiniTickerEvent> get symbolMiniTickerStream => _symbolMiniTickerStreamController.stream;

  Future<bool> subscribeSymbolMiniTickerStream({required List<String> symbols}) async {
    final request = SubscribeStreamRequest(
      streamNames: symbols.map((symbol) => "${symbol.toLowerCase()}@miniTicker").toList(),
    );
    return _sendRequest(request.toMap());
  }

  Future<bool> unsubscribeSymbolMiniTickerStream({required List<String> symbols}) async {
    final request = UnsubscribeStreamRequest(
      streamNames: symbols.map((symbol) => "${symbol.toLowerCase()}@miniTicker").toList(),
    );
    return _sendRequest(request.toMap());
  }
}

class EventTypes {
  static const String symbolMiniTicker = '24hrMiniTicker';
}

class SubscribeStreamRequest {
  SubscribeStreamRequest({required this.streamNames});

  final List<String> streamNames;

  Map<String, dynamic> toMap() {
    return {
      "id": DateTime.now().millisecondsSinceEpoch,
      "method": "SUBSCRIBE",
      "params": streamNames,
    };
  }
}

class UnsubscribeStreamRequest {
  /* */
}
It’s cool, but _sendRequest() doesn’t confirm if it worked, and reconnects lose subscriptions. Let’s make it smarter.
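The code above uses SymbolMiniTickerEvent.fromMap without showing the class. Here is a minimal sketch of what it could look like. The field selection and the single-letter keys (s, E, c, o, h, l) are my reading of Binance's mini ticker payload, so treat them as assumptions and check them against the official docs or the article's full source, which may define the class differently.

```dart
/// Hypothetical model for one mini ticker update.
class SymbolMiniTickerEvent {
  const SymbolMiniTickerEvent({
    required this.symbol,
    required this.eventTime,
    required this.closePrice,
    required this.openPrice,
    required this.highPrice,
    required this.lowPrice,
  });

  final String symbol;
  final DateTime eventTime;
  final double closePrice;
  final double openPrice;
  final double highPrice;
  final double lowPrice;

  /// Builds an event from the decoded JSON map.
  /// Assumes prices arrive as strings and the event time as epoch milliseconds.
  factory SymbolMiniTickerEvent.fromMap(Map<String, dynamic> map) {
    return SymbolMiniTickerEvent(
      symbol: map['s'] as String,
      eventTime: DateTime.fromMillisecondsSinceEpoch(map['E'] as int, isUtc: true),
      closePrice: double.parse(map['c'] as String),
      openPrice: double.parse(map['o'] as String),
      highPrice: double.parse(map['h'] as String),
      lowPrice: double.parse(map['l'] as String),
    );
  }
}
```

Parsing prices into double keeps the example short; an app that does arithmetic on prices may prefer to keep the original strings or use a decimal type to avoid rounding surprises.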
Part 4: Making Requests Smarter with ACK and Stream Management
Part 3 sends requests, but it’s blind: we don’t know if Binance accepted them, and we don’t know which streams are currently subscribed. Let’s fix that:
- Add _pendingRequests to track each request with a Completer that waits for Binance’s ACK, like asking, “Hey, did this work?”
- Add _activeStreams to remember active stream subscriptions.
- Add _handleAckEvent() to process the ACK response.
- Update _sendRequest() to wait for the ACK and, if the request succeeded, update the active streams.
Here’s the code:
class WebsocketService implements StreamingService {
  /* Same as before */

  final Map<int, Completer<bool>> _pendingRequests = {};
  final Set<String> _activeStreams = {};

  void _onEvent(dynamic event) {
    if (event is! String) return;
    final decodedEvent = jsonDecode(event);
    if (decodedEvent is Map<String, dynamic>) {
      if (decodedEvent.containsKey('ping')) {
        _sendPong();
        return;
      }
      // Responses to our own requests carry the request id.
      if (decodedEvent.containsKey('id')) {
        _handleAckEvent(decodedEvent);
        return;
      }
      final eventType = decodedEvent['e'];
      switch (eventType) {
        case EventTypes.symbolMiniTicker:
          _symbolMiniTickerStreamController.sink.add(SymbolMiniTickerEvent.fromMap(decodedEvent));
          break;
      }
    }
  }

  void _handleAckEvent(Map<String, dynamic> event) {
    final requestId = event['id'];
    final completer = _pendingRequests.remove(requestId);
    if (completer != null) {
      if (event.containsKey('code')) {
        completer.complete(false);
        print("Error: ${event['msg']} (Code: ${event['code']})");
      } else if (event.containsKey('result')) {
        // A successful ACK has a null result.
        completer.complete(event['result'] == null);
      } else {
        completer.complete(false);
      }
    }
  }

  Future<bool> _sendRequest(Map<String, dynamic> request) async {
    if (!isConnected) {
      print("WebSocket is not connected. Cannot send request: $request");
      return false;
    }
    final requestId = request['id'] as int;
    final completer = Completer<bool>();
    _pendingRequests[requestId] = completer;
    _channel?.sink.add(jsonEncode(request));
    // Give up if no ACK arrives, so callers never wait forever.
    // The 10-second limit is an arbitrary choice; tune it for your app.
    final isSuccess = await completer.future.timeout(
      const Duration(seconds: 10),
      onTimeout: () {
        _pendingRequests.remove(requestId);
        return false;
      },
    );
    if (isSuccess && request.containsKey('params')) {
      final streamNames = (request['params'] as List).cast<String>();
      if (request['method'] == 'SUBSCRIBE') {
        _activeStreams.addAll(streamNames);
      } else if (request['method'] == 'UNSUBSCRIBE') {
        _activeStreams.removeAll(streamNames);
      }
    }
    return isSuccess;
  }
}
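The ACK bookkeeping is easier to reason about when it is isolated from the socket. The sketch below pulls the Completer map into a small class and adds a timeout so that a lost ACK cannot leave a caller waiting forever. AckTracker, register, resolve, failAll, and the 10-second default are hypothetical choices made for this illustration, not part of the article's code or of any library.

```dart
import 'dart:async';

/// Hypothetical helper that tracks requests awaiting an ACK.
class AckTracker {
  AckTracker({this.timeout = const Duration(seconds: 10)});

  final Duration timeout;
  final Map<int, Completer<bool>> _pending = {};

  int get pendingCount => _pending.length;

  /// Registers [id] and returns a future that resolves on ACK or timeout.
  Future<bool> register(int id) {
    final completer = Completer<bool>();
    _pending[id] = completer;
    return completer.future.timeout(timeout, onTimeout: () {
      _pending.remove(id); // Drop the stale entry so the map cannot grow.
      return false;
    });
  }

  /// Feeds a decoded server message; returns true if it matched a request.
  bool resolve(Map<String, dynamic> message) {
    final completer = _pending.remove(message['id']);
    if (completer == null) return false;
    // Success means: no error code, and a "result" key whose value is null.
    final ok = !message.containsKey('code') &&
        message.containsKey('result') &&
        message['result'] == null;
    completer.complete(ok);
    return true;
  }

  /// Fails everything still waiting, for example when the socket closes.
  void failAll() {
    for (final completer in _pending.values) {
      if (!completer.isCompleted) completer.complete(false);
    }
    _pending.clear();
  }
}
```

With a helper like this, _sendRequest() would await register(id) after writing to the sink, _handleAckEvent() would delegate to resolve(), and disconnect() would call failAll().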
Part 5: Adding Reconnect Logic
We now keep the connection alive during quiet times, but what if the network fails or Binance cuts us off? The old code just logs “Error” or “Connection closed” and gives up. We need it to fight back by reconnecting.
Here’s how we’ll improve it:
- Add _reconnectAttempts to cap the number of retries and avoid an infinite reconnect loop.
- Add reconnect() to retry with a delay that grows on each attempt, and to re-subscribe to the previously subscribed streams once the connection is back.
- Add disconnect() to terminate the current connection and cancel all pending requests, to avoid resubscribing to streams that are no longer needed.
- Update _onError() and _onDone() to check error types and close codes to decide when to reconnect.
Here’s the code:
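// Requires: import 'dart:io'; for SocketException and HttpException
// (dart:io is not available on Flutter web).
class WebsocketService implements StreamingService {
  /* Same as before */

  static const int _maxReconnectAttempts = 5;
  int _reconnectAttempts = 0;
  bool _isReconnecting = false; // Guards against overlapping reconnect() calls.

  void _onError(dynamic error) {
    if (error is WebSocketChannelException || error is SocketException) {
      print("Temporary error detected");
      reconnect();
    } else if (error is HttpException && error.message.contains("403")) {
      print("Access denied (403 Forbidden). Check API permissions.");
    } else if (error is HttpException && error.message.contains("400")) {
      print("Bad request (400). Check the request format.");
    } else {
      print("Unknown error: $error");
    }
  }

  void _onDone() {
    final channel = _channel;
    // A null channel means disconnect() closed it on purpose.
    if (channel == null) return;
    print("Connection closed: ${channel.closeReason ?? 'Unknown reason'}");
    // 1000 is a normal closure; anything else (including null) is unexpected.
    if (channel.closeCode != 1000) {
      reconnect();
    }
  }

  // To mark these two methods with @override, declare them in StreamingService.
  Future<void> reconnect() async {
    if (_isReconnecting) return; // An error is often followed by a done event.
    _isReconnecting = true;
    var connected = false;
    try {
      while (!connected && _reconnectAttempts < _maxReconnectAttempts) {
        disconnect();
        _reconnectAttempts++;
        print("Attempting to reconnect ($_reconnectAttempts/$_maxReconnectAttempts)...");
        await Future.delayed(Duration(seconds: 2 * _reconnectAttempts));
        connect();
        try {
          // connect() returns before the handshake finishes, so wait for it.
          // `ready` needs web_socket_channel 2.3.0 or later.
          await _channel?.ready;
          connected = true;
        } catch (error) {
          print("Reconnect attempt failed: $error");
        }
      }
    } finally {
      _isReconnecting = false;
    }
    if (!connected) {
      print("Maximum reconnection attempts reached");
      return;
    }
    _reconnectAttempts = 0;
    if (_activeStreams.isNotEmpty) {
      final request = SubscribeStreamRequest(streamNames: _activeStreams.toList());
      await _sendRequest(request.toMap());
    }
  }

  void disconnect() {
    for (final completer in _pendingRequests.values) {
      if (!completer.isCompleted) completer.complete(false);
    }
    _pendingRequests.clear();
    _channel?.sink.close();
    _channel = null;
  }
}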
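The code above waits 2 seconds times the attempt number, which is a linear backoff. A common alternative is capped exponential backoff, where the delay doubles on each attempt up to a ceiling. The sketch below shows that variant as a pure function; backoffDelay, the 2-second base, and the 30-second cap are hypothetical values chosen for illustration, not something the article or Binance prescribes.

```dart
import 'dart:math';

/// Delay before reconnect attempt number [attempt] (1-based).
/// Doubles on each attempt starting at [base], and never exceeds [cap].
Duration backoffDelay(
  int attempt, {
  Duration base = const Duration(seconds: 2),
  Duration cap = const Duration(seconds: 30),
}) {
  if (attempt < 1) {
    throw ArgumentError.value(attempt, 'attempt', 'must be >= 1');
  }
  // Clamp the exponent so the shift stays small for large attempt counts.
  final factor = 1 << min(attempt - 1, 20);
  final delay = base * factor;
  return delay > cap ? cap : delay;
}
```

In reconnect(), this would replace the Duration(seconds: 2 * _reconnectAttempts) expression with backoffDelay(_reconnectAttempts). Adding a small random jitter on top is a further refinement when many clients may reconnect at the same moment.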
Wrapping Up
We’ve built a WebSocket client from scratch: starting with a basic connection, keeping it alive with ping-pong, sending requests, making them smart with ACKs and stream management, and finally adding reconnect logic. Binance Market WebSocket was just our example; the real star is the WebSocket setup. Use this code, tweak it for your needs, and handle real-time data like a pro! You can find the full source code at: Github