WebSocket Real-time Service
Overview
The WebSocket service provides real-time GPS position updates and event notifications to web and mobile clients. It listens to Redis streams for position and event data, then broadcasts updates to connected clients over persistent WebSocket connections.
Architecture
Service Details
- Port: 8086
- Endpoint:
wss://gateway.vislagps.com/api/socketorws://localhost:8086/api/socket - Technology: Python (FastAPI + WebSockets)
- Authentication: JWT token via query parameter
- Protocol: WebSocket with JSON messages
Connection Flow
1. Client Connection
const token = localStorage.getItem('accessToken');
const ws = new WebSocket(`wss://gateway.vislagps.com/api/socket?token=${token}`);
ws.onopen = () => {
console.log('WebSocket connected');
};
ws.onmessage = (event) => {
const message = JSON.parse(event.data);
handleMessage(message);
};
ws.onerror = (error) => {
console.error('WebSocket error:', error);
};
ws.onclose = () => {
console.log('WebSocket disconnected');
// Implement reconnection logic
};
2. Authentication
- JWT token passed as query parameter:
?token=<JWT> - Nginx validates token via
auth_requestbefore WebSocket upgrade - Invalid or missing token → connection rejected with code 4001
3. Heartbeat
Client should send periodic ping messages to keep connection alive:
setInterval(() => {
if (ws.readyState === WebSocket.OPEN) {
ws.send('ping');
}
}, 30000); // Every 30 seconds
Server responds with pong to confirm connection is alive.
Message Format
All WebSocket messages follow this JSON structure:
type WebSocketMessage = PositionUpdate | EventUpdate;
interface PositionUpdate {
type: 'position';
data: Position;
}
interface EventUpdate {
type: 'event';
data: Event;
}
Position Update
{
"type": "position",
"data": {
"id": 12345,
"deviceId": 42,
"protocol": "gt06",
"deviceTime": "2025-12-15T17:30:00Z",
"fixTime": "2025-12-15T17:30:00Z",
"serverTime": "2025-12-15T17:30:01Z",
"outdated": false,
"valid": true,
"latitude": 45.4642,
"longitude": 9.1900,
"altitude": 120.5,
"speed": 45.8,
"course": 180.5,
"address": "Via Example, Milano, IT",
"accuracy": 10.0,
"attributes": {
"ignition": true,
"battery": 12.8,
"satellites": 12
}
}
}
Event Update
{
"type": "event",
"data": {
"id": 789,
"type": "deviceOverspeed",
"eventTime": "2025-12-15T17:30:00Z",
"deviceId": 42,
"positionId": 12345,
"geofenceId": null,
"maintenanceId": null,
"attributes": {
"speed": 145.8,
"speedLimit": 130.0
}
}
}
Device Permissions
The WebSocket service implements device-level permissions:
- When a client connects, the service loads all devices accessible to that user
- Permissions are based on:
- Direct device ownership (
user_devicetable) - Group membership (
user_group+devices.group_id)
- Direct device ownership (
- Only position/event updates for authorized devices are sent to each client
Permission Loading Query
SELECT d.id
FROM devices d
LEFT JOIN user_device ud ON d.id = ud.device_id
WHERE ud.user_id = :user_id
OR d.id IN (
SELECT d2.id FROM devices d2
JOIN groups g ON d2.group_id = g.id
JOIN user_group ug ON g.id = ug.group_id
WHERE ug.user_id = :user_id
)
Redis Integration
Subscribed Streams
| Stream Name | Purpose | Data Format |
|---|---|---|
positions | GPS position updates | {position: <JSON>, uniqueId: <string>} |
events | Device events/alarms | {event: <JSON>, deviceId: <int>} |
Stream Consumer
async def redis_subscriber():
redis_client = redis.from_url(settings.redis_url, decode_responses=True)
last_ids = {
'positions': '$',
'events': '$'
}
while True:
entries = await redis_client.xread(
{
'positions': last_ids['positions'],
'events': last_ids['events']
},
block=5000
)
for stream_name, messages in entries:
for msg_id, data in messages:
last_ids[stream_name] = msg_id
if stream_name == 'positions':
await handle_position(data)
elif stream_name == 'events':
await handle_event(data)
Connection Management
Connection Manager
The service uses a ConnectionManager class to handle multiple connections per user:
class ConnectionManager:
def __init__(self):
# user_id -> list of WebSocket connections
self.connections: Dict[int, List[WebSocket]] = {}
# device_id -> set of authorized user_ids
self.device_permissions: Dict[int, Set[int]] = {}
async def connect(self, websocket: WebSocket, user_id: int):
await websocket.accept()
if user_id not in self.connections:
self.connections[user_id] = []
self.connections[user_id].append(websocket)
await self._load_user_devices(user_id)
async def broadcast_position(self, position: dict, device_id: int):
authorized_users = self.get_authorized_users(device_id)
message = {"type": "position", "data": position}
for user_id in authorized_users:
await self.broadcast_to_user(user_id, message)
Multiple Connections
- Users can have multiple simultaneous connections (e.g., web + mobile)
- Each connection receives the same updates
- Disconnections are handled gracefully without affecting other connections
Metrics
The service exposes Prometheus metrics at /metrics:
| Metric | Type | Description |
|---|---|---|
websocket_active_connections | Gauge | Current number of active WebSocket connections |
websocket_messages_sent_total | Counter | Total messages sent, labeled by message_type (position/event) |
Error Handling
Connection Errors
| Error Code | Reason | Action |
|---|---|---|
| 4001 | Missing or invalid token | Client should re-authenticate |
| 1000 | Normal closure | Connection closed cleanly |
| 1006 | Abnormal closure | Network issue - client should reconnect |
Reconnection Strategy
Clients should implement exponential backoff for reconnections:
let reconnectDelay = 3000; // Start with 3 seconds
const maxReconnectDelay = 30000; // Cap at 30 seconds
function connect() {
const ws = new WebSocket(wsUrl);
ws.onclose = () => {
console.log(`Reconnecting in ${reconnectDelay}ms...`);
setTimeout(() => {
reconnectDelay = Math.min(reconnectDelay * 2, maxReconnectDelay);
connect();
}, reconnectDelay);
};
ws.onopen = () => {
reconnectDelay = 3000; // Reset on successful connection
};
}
Client Examples
Web App Integration
See web-app/src/lib/websocket-context.tsx for full React implementation.
React Hook
import { useWebSocket } from '@/hooks/useWebSocket';
function MyComponent() {
const { status, isConnected } = useWebSocket({
onMessage: (message) => {
if (message.type === 'position') {
updateDevicePosition(message.data);
} else if (message.type === 'event') {
showEventNotification(message.data);
}
},
autoConnect: true
});
return (
<div>
Status: {status}
{isConnected && '✅ Connected'}
</div>
);
}
Mobile App Integration
iOS (Swift)
import Starscream
class WebSocketManager: WebSocketDelegate {
var socket: WebSocket?
func connect(token: String) {
var request = URLRequest(url: URL(string: "wss://gateway.vislagps.com/api/socket?token=\(token)")!)
socket = WebSocket(request: request)
socket?.delegate = self
socket?.connect()
}
func didReceive(event: WebSocketEvent, client: WebSocket) {
switch event {
case .text(let text):
if let data = text.data(using: .utf8),
let message = try? JSONDecoder().decode(WebSocketMessage.self, from: data) {
handleMessage(message)
}
case .connected:
startHeartbeat()
default:
break
}
}
}
Android (Kotlin)
import okhttp3.*
class WebSocketManager {
private var webSocket: WebSocket? = null
fun connect(token: String) {
val client = OkHttpClient()
val request = Request.Builder()
.url("wss://gateway.vislagps.com/api/socket?token=$token")
.build()
webSocket = client.newWebSocket(request, object : WebSocketListener() {
override fun onMessage(webSocket: WebSocket, text: String) {
val message = Gson().fromJson(text, WebSocketMessage::class.java)
handleMessage(message)
}
override fun onOpen(webSocket: WebSocket, response: Response) {
startHeartbeat()
}
})
}
}
Monitoring
Grafana Dashboard
A comprehensive Grafana dashboard is available at:
- Dashboard: WebSocket Service - Real-time Metrics
- File:
monitoring/grafana/dashboards/microservices/websocket-metrics.json
Key Panels
- Active Connections - Real-time gauge
- Messages Sent - Rate and total count
- Message Distribution - Pie chart by type (positions vs events)
- Activity Timeline - Messages over time
Logs
View WebSocket logs in Grafana Loki:
{container="websocket"} |= "connected"
{container="websocket"} |= "broadcast"
{container="websocket"} |~ "ERROR|error"
Best Practices
Client Implementation
- ✅ Always implement reconnection logic with exponential backoff
- ✅ Send periodic pings to keep connection alive
- ✅ Handle all message types gracefully (unknown types should be logged, not crash)
- ✅ Store connection state in application state management
- ✅ Show connection status to user (connected/disconnected indicator)
Performance
- ✅ Batch updates when processing multiple positions/events
- ✅ Throttle UI updates to avoid excessive re-renders
- ✅ Use virtual scrolling for event lists
- ✅ Unsubscribe when not needed (e.g., user navigates away)
Security
- ✅ Never log JWT tokens in client code
- ✅ Validate all incoming messages before processing
- ✅ Use WSS (secure WebSocket) in production
- ✅ Rotate tokens regularly and handle token refresh
Troubleshooting
Connection Refused
Symptom: WebSocket connection fails immediately
Possible Causes:
- Nginx not routing
/api/socketto WebSocket service - WebSocket service not running
- Firewall blocking port 8086
Solution: Check nginx logs and verify service is running
Connection Closes After ~60 Seconds
Symptom: Connection drops after about 1 minute
Possible Causes:
- No heartbeat/ping being sent
- Nginx timeout too short
- Load balancer timeout
Solution: Implement ping/pong heartbeat every 30 seconds
Not Receiving Updates
Symptom: Connected but no position/event updates
Possible Causes:
- User doesn't have permission to device
- Device not sending data
- Redis stream not receiving data
Solution:
- Check user permissions in database
- Verify device is active and sending positions
- Check Redis streams:
XREAD COUNT 10 STREAMS positions events $ $
Multiple Duplicate Messages
Symptom: Receiving same position/event multiple times
Possible Causes:
- Multiple WebSocket connections for same user
- Client reconnecting too aggressively
Solution: Implement connection deduplication or use connection ID tracking
Configuration
Environment Variables
# WebSocket Service (.env)
DATABASE_URL=postgresql://user:pass@postgres:5432/visla
REDIS_URL=redis://redis:6379/0
SECRET_KEY=<your-jwt-secret>
JWT_ALGORITHM=HS256
HOST=0.0.0.0
PORT=8086
POSITIONS_STREAM=positions
EVENTS_STREAM=events
Nginx Configuration
location /api/socket {
# Authenticate before WebSocket upgrade
auth_request /auth-validate;
# WebSocket upgrade
proxy_pass http://websocket:8086;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
# Timeouts
proxy_read_timeout 86400s;
proxy_send_timeout 86400s;
}
Related Documentation
- Backend Structure - Overall architecture
- Web App Development - Web client implementation
- Mobile Apps - iOS/Android integration
- Redis Streams - Data flow from decoder to WebSocket
FAQ
Q: Can I connect from multiple tabs/devices?
A: Yes! Each connection is independent. The service supports multiple simultaneous connections per user.
Q: What happens if I lose connection?
A: The server will clean up your connection. Client should implement automatic reconnection.
Q: How do I know which devices I have access to?
A: The server automatically filters updates based on your user permissions. You only receive updates for devices you're authorized to see.
Q: Is there a rate limit?
A: No explicit rate limit, but connections are validated on connect. Excessive reconnections may be throttled by nginx.
Q: Can I request historical data over WebSocket?
A: No, WebSocket only broadcasts new updates. Use REST API /api/positions for historical data.