WebSocket Real-time Service
Overview
The WebSocket service provides real-time GPS position updates and event notifications to web and mobile clients. It listens to Redis streams for position and event data, then broadcasts updates to connected clients over persistent WebSocket connections.
Architecture
Service Details
- Port: 8086
- Endpoint: wss://api.vislagps.com/api/socket or ws://localhost:8086/api/socket
- Technology: Python (FastAPI + WebSockets)
- Authentication: JWT token via query parameter
- Protocol: WebSocket with JSON messages
Connection Flow
1. Client Connection
const token = localStorage.getItem('accessToken');
const ws = new WebSocket(`wss://api.vislagps.com/api/socket?token=${token}`);
ws.onopen = () => {
  console.log('WebSocket connected');
};
ws.onmessage = (event) => {
  // The heartbeat reply is assumed to arrive as the plain text 'pong',
  // which is not valid JSON, so skip it before parsing
  if (event.data === 'pong') return;
  const message = JSON.parse(event.data);
  handleMessage(message);
};
ws.onerror = (error) => {
  console.error('WebSocket error:', error);
};
ws.onclose = () => {
  console.log('WebSocket disconnected');
  // Implement reconnection logic
};
2. Authentication
- JWT token passed as query parameter: ?token=<JWT>
- Nginx validates token via auth_request before WebSocket upgrade
- Invalid or missing token → connection rejected with code 4001
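The validation step can be sketched in a few lines. This is a minimal illustration and not the service's actual validator: it assumes HS256 signing (the JWT_ALGORITHM value shown in the configuration section), uses only the standard library, and every function name here (token_from_url, verify_hs256, sign_hs256) is hypothetical. A production deployment would normally rely on a maintained JWT library.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional
from urllib.parse import parse_qs, urlsplit

CLOSE_INVALID_TOKEN = 4001  # close code documented for a missing or invalid token


def _b64url_decode(segment: str) -> bytes:
    # JWT segments are base64url without padding; restore it before decoding.
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def _b64url_encode(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def sign_hs256(claims: dict, secret: str) -> str:
    """Build a compact HS256 token (hypothetical helper, used only for the demo)."""
    header = _b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = _b64url_encode(json.dumps(claims).encode())
    signing_input = f"{header}.{payload}".encode("ascii")
    signature = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
    return f"{header}.{payload}.{_b64url_encode(signature)}"


def token_from_url(url: str) -> Optional[str]:
    """Extract the ?token=<JWT> query parameter, or None if it is absent."""
    values = parse_qs(urlsplit(url).query).get("token")
    return values[0] if values else None


def verify_hs256(token: Optional[str], secret: str,
                 now: Optional[float] = None) -> Optional[dict]:
    """Return the claims when signature and expiry check out, otherwise None."""
    if not token:
        return None
    try:
        header_b64, payload_b64, sig_b64 = token.split(".")
        header = json.loads(_b64url_decode(header_b64))
        if not isinstance(header, dict) or header.get("alg") != "HS256":
            return None
        signing_input = f"{header_b64}.{payload_b64}".encode("ascii")
        expected = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
        # Constant-time comparison avoids leaking how many bytes matched.
        if not hmac.compare_digest(expected, _b64url_decode(sig_b64)):
            return None
        claims = json.loads(_b64url_decode(payload_b64))
    except ValueError:
        # Wrong segment count, bad base64, or bad JSON all end up here.
        return None
    if not isinstance(claims, dict):
        return None
    current = time.time() if now is None else now
    exp = claims.get("exp")
    if isinstance(exp, (int, float)) and exp <= current:
        return None
    return claims
```

A caller would reject the connection with CLOSE_INVALID_TOKEN whenever verify_hs256 returns None.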
3. Heartbeat
Clients should send periodic ping messages to keep the connection alive:
setInterval(() => {
  if (ws.readyState === WebSocket.OPEN) {
    ws.send('ping');
  }
}, 30000); // Every 30 seconds
The server responds with pong to confirm the connection is alive.
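A client can also use the pong replies to detect a dead connection. The sketch below is one possible way to do that, assuming the reply is the literal text "pong" and that two missed intervals mean the link is stale; the class name and the timeout policy are illustrative and do not come from the service.

```python
import time
from typing import Callable

PING_INTERVAL_S = 30.0  # matches the 30-second ping interval described above


class HeartbeatMonitor:
    """Tracks when the last pong arrived (the timeout policy is an assumption)."""

    def __init__(self, timeout_s: float = 2 * PING_INTERVAL_S,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._timeout_s = timeout_s
        self._last_pong = clock()

    def on_message(self, text: str) -> bool:
        """Return True when the frame was a heartbeat reply and has been consumed."""
        if text == "pong":
            self._last_pong = self._clock()
            return True
        return False

    def is_stale(self) -> bool:
        """True when no pong has been seen within the timeout window."""
        return self._clock() - self._last_pong > self._timeout_s
```

Frames for which on_message returns False are regular JSON messages and can be passed to the normal message handler; when is_stale turns True the client can close the socket and reconnect.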
Message Format
All WebSocket messages follow this JSON structure:
type WebSocketMessage = PositionUpdate | EventUpdate;
interface PositionUpdate {
  type: 'position';
  data: Position;
}
interface EventUpdate {
  type: 'event';
  data: Event;
}
Position Update
{
  "type": "position",
  "data": {
    "id": 12345,
    "deviceId": 42,
    "protocol": "gt06",
    "deviceTime": "2025-12-15T17:30:00Z",
    "fixTime": "2025-12-15T17:30:00Z",
    "serverTime": "2025-12-15T17:30:01Z",
    "outdated": false,
    "valid": true,
    "latitude": 45.4642,
    "longitude": 9.1900,
    "altitude": 120.5,
    "speed": 45.8,
    "course": 180.5,
    "address": "Via Example, Milano, IT",
    "accuracy": 10.0,
    "attributes": {
      "ignition": true,
      "battery": 12.8,
      "satellites": 12
    }
  }
}
Event Update
{
  "type": "event",
  "data": {
    "id": 789,
    "type": "deviceOverspeed",
    "eventTime": "2025-12-15T17:30:00Z",
    "deviceId": 42,
    "positionId": 12345,
    "geofenceId": null,
    "maintenanceId": null,
    "attributes": {
      "speed": 145.8,
      "speedLimit": 130.0
    }
  }
}
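A consumer of these messages typically validates the envelope before acting on it. The following sketch shows one way to do so; the list of required fields is an assumption chosen from the examples above rather than a documented schema, and parse_message and dispatch are hypothetical names.

```python
import json
from typing import Any, Callable, Dict, Optional

Handler = Callable[[Dict[str, Any]], None]

# Assumed minimum fields per message type, taken from the sample payloads.
REQUIRED_FIELDS = {
    "position": ("id", "deviceId", "latitude", "longitude"),
    "event": ("id", "type", "deviceId", "eventTime"),
}


def parse_message(raw: str) -> Optional[Dict[str, Any]]:
    """Return the decoded message, or None when it does not match the envelope."""
    try:
        message = json.loads(raw)
    except ValueError:
        return None  # not JSON at all (for example a plain-text heartbeat reply)
    if not isinstance(message, dict):
        return None
    msg_type = message.get("type")
    data = message.get("data")
    if not isinstance(msg_type, str) or not isinstance(data, dict):
        return None
    required = REQUIRED_FIELDS.get(msg_type)
    if required is None or any(field not in data for field in required):
        return None  # unknown type or incomplete payload
    return message


def dispatch(raw: str, handlers: Dict[str, Handler]) -> bool:
    """Route a raw frame to its handler; return False if it was ignored."""
    message = parse_message(raw)
    if message is None:
        return False
    handler = handlers.get(message["type"])
    if handler is None:
        return False
    handler(message["data"])
    return True
```

Returning False instead of raising lets the caller log unknown or malformed frames without interrupting the receive loop.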
Device ID Bridge (ID Mapping)
The system maintains a mapping between External IDs (from the Decoder/Traccar) and Internal Platform IDs.
The Problem
- The Decoder (based on Traccar) assigns its own incremental IDs to devices (e.g., deviceId: 2).
- The Visla Platform uses its own unique database IDs (e.g., deviceId: 7).
- Frontend clients and permission checks rely strictly on the Platform IDs.
The Solution
The WebSocket service acts as a bridge, using the protocolId (IMEI) as the source of truth across all three streams (Positions, Events, and Presence):
- Resolution on Receive: When data is received from Redis with an external ID, the service extracts the protocolId.
- Platform Lookup: It looks up the corresponding Platform ID via the devices service internal API.
- ID Overriding: The service overrides the deviceId in the message object with the Platform ID before broadcasting it.
- Caching: To ensure high performance, these mappings are cached locally in the ConnectionManager (protocol_to_id map).
This same logic is also implemented in the Positions and Events background workers before data is persisted to the database.
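The four steps above can be sketched as follows. Only the protocol_to_id cache name comes from this page; the DeviceIdBridge class, the lookup callable that stands in for the devices service internal API, and the IMEI used in the usage note are placeholders for illustration.

```python
from typing import Callable, Dict, Optional

# Stand-in for the devices service internal API: protocolId -> Platform ID or None.
Lookup = Callable[[str], Optional[int]]


class DeviceIdBridge:
    def __init__(self, lookup: Lookup) -> None:
        self._lookup = lookup
        self.protocol_to_id: Dict[str, int] = {}  # local cache of resolved mappings

    def resolve(self, protocol_id: str) -> Optional[int]:
        """Return the Platform ID, consulting the cache before the lookup."""
        cached = self.protocol_to_id.get(protocol_id)
        if cached is not None:
            return cached
        platform_id = self._lookup(protocol_id)
        if platform_id is not None:
            self.protocol_to_id[protocol_id] = platform_id
        return platform_id

    def rewrite(self, payload: dict, protocol_id: str) -> Optional[dict]:
        """Return a copy of the payload with deviceId replaced by the Platform ID."""
        platform_id = self.resolve(protocol_id)
        if platform_id is None:
            return None  # unknown device: the caller decides whether to drop or log
        return {**payload, "deviceId": platform_id}
```

For example, with a lookup that maps the placeholder IMEI "860000000000001" to 7, rewrite({"deviceId": 2}, "860000000000001") yields {"deviceId": 7}, and a second call for the same IMEI is served from the cache. Unknown devices are deliberately not cached here so that a device registered later is picked up on the next message; whether the real service does the same is not stated.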
Benefit
Frontend clients can always match positions with their device inventory using a single, consistent ID system, regardless of the underlying decoder implementation.
Device Permissions
The WebSocket service implements device-level permissions:
- When a client connects, the service loads all devices accessible to that user
- Permissions are based on:
  - Direct device ownership (user_device table)
  - Group membership (user_group + devices.group_id)
- Only position/event updates for authorized devices are sent to each client
Permission Loading Query
-- DISTINCT keeps each device id once, even when the LEFT JOIN matches several user_device rows
SELECT DISTINCT d.id
FROM devices d
LEFT JOIN user_device ud ON d.id = ud.device_id
WHERE ud.user_id = :user_id
   OR d.id IN (
       SELECT d2.id FROM devices d2
       JOIN groups g ON d2.group_id = g.id
       JOIN user_group ug ON g.id = ug.group_id
       WHERE ug.user_id = :user_id
   )
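To see how the two permission sources combine, the logic can be exercised against an in-memory SQLite database. This is a sketch under assumptions: the column layout is reduced to what the query needs, the groups table is skipped by joining user_group directly on devices.group_id, and the query is written as a UNION, which is an equivalent shape rather than the service's exact statement.

```python
import sqlite3
from typing import Set

# Reduced, assumed schema: only the columns the permission query touches.
SCHEMA = """
CREATE TABLE devices (id INTEGER PRIMARY KEY, group_id INTEGER);
CREATE TABLE user_device (user_id INTEGER, device_id INTEGER);
CREATE TABLE user_group (user_id INTEGER, group_id INTEGER);
"""

# UNION removes duplicates when a device is reachable both directly and via a group.
QUERY = """
SELECT d.id FROM devices d
JOIN user_device ud ON ud.device_id = d.id
WHERE ud.user_id = :user_id
UNION
SELECT d.id FROM devices d
JOIN user_group ug ON ug.group_id = d.group_id
WHERE ug.user_id = :user_id
"""


def load_user_devices(conn: sqlite3.Connection, user_id: int) -> Set[int]:
    """Return the set of device ids the user may receive updates for."""
    rows = conn.execute(QUERY, {"user_id": user_id}).fetchall()
    return {row[0] for row in rows}


def demo_connection() -> sqlite3.Connection:
    """Build sample data: devices 7 and 8 are in group 1, device 9 has no group."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    conn.executemany("INSERT INTO devices VALUES (?, ?)", [(7, 1), (8, 1), (9, None)])
    conn.executemany("INSERT INTO user_device VALUES (?, ?)", [(100, 9), (200, 7)])
    conn.executemany("INSERT INTO user_group VALUES (?, ?)", [(100, 1)])
    return conn
```

With this sample data, user 100 sees device 9 through direct ownership plus devices 7 and 8 through group 1, while user 200 sees only device 7.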
Redis Integration
Subscribed Streams
| Stream Name | Purpose | Data Format |
|---|---|---|
| positions | GPS position updates | {position: <JSON>, uniqueId: <string>} |
| events | Device events/alarms | {event: <JSON>, deviceId: <int>} |
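Stream Consumer
import redis.asyncio as redis  # assumed: the asyncio client shipped with redis-py

# settings, handle_position, and handle_event are defined elsewhere in the service

async def redis_subscriber():
    redis_client = redis.from_url(settings.redis_url, decode_responses=True)
    # '$' means "only entries added after we start listening"
    last_ids = {
        'positions': '$',
        'events': '$'
    }
    while True:
        # Block for up to 5 seconds waiting for new entries on either stream
        entries = await redis_client.xread(
            {
                'positions': last_ids['positions'],
                'events': last_ids['events']
            },
            block=5000
        )
        for stream_name, messages in entries:
            for msg_id, data in messages:
                # Remember the last ID so the next read resumes after it
                last_ids[stream_name] = msg_id
                if stream_name == 'positions':
                    await handle_position(data)
                elif stream_name == 'events':
                    await handle_event(data)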
Connection Management
Connection Manager
The service uses a ConnectionManager class to handle multiple connections per user:
from typing import Dict, List, Set

from fastapi import WebSocket

class ConnectionManager:
    def __init__(self):
        # user_id -> list of WebSocket connections
        self.connections: Dict[int, List[WebSocket]] = {}
        # device_id -> set of authorized user_ids
        self.device_permissions: Dict[int, Set[int]] = {}

    async def connect(self, websocket: WebSocket, user_id: int):
        await websocket.accept()
        if user_id not in self.connections:
            self.connections[user_id] = []
        self.connections[user_id].append(websocket)
        # Populates device_permissions for this user (helper not shown here)
        await self._load_user_devices(user_id)

    async def broadcast_position(self, position: dict, device_id: int):
        # get_authorized_users and broadcast_to_user are not shown here
        authorized_users = self.get_authorized_users(device_id)
        message = {"type": "position", "data": position}
        for user_id in authorized_users:
            await self.broadcast_to_user(user_id, message)
Multiple Connections
- Users can have multiple simultaneous connections (e.g., web + mobile)
- Each connection receives the same updates
- Disconnections are handled gracefully without affecting other connections
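The behaviour in this list can be illustrated with a reduced manager. This is a sketch, not the service's ConnectionManager: FakeSocket stands in for a real WebSocket, only a send_json method is modelled, and treating ConnectionError as the sign of a dead peer is an assumption made for the demo.

```python
import asyncio
from typing import Any, Dict, Iterable, List, Set


class FakeSocket:
    """Stand-in for a WebSocket; only send_json is modelled."""

    def __init__(self, broken: bool = False) -> None:
        self.sent: List[dict] = []
        self.broken = broken

    async def send_json(self, message: dict) -> None:
        if self.broken:
            raise ConnectionError("peer went away")
        self.sent.append(message)


class MiniConnectionManager:
    def __init__(self) -> None:
        self.connections: Dict[int, List[Any]] = {}       # user_id -> sockets
        self.device_permissions: Dict[int, Set[int]] = {}  # device_id -> user_ids

    def register(self, user_id: int, socket: Any, device_ids: Iterable[int]) -> None:
        self.connections.setdefault(user_id, []).append(socket)
        for device_id in device_ids:
            self.device_permissions.setdefault(device_id, set()).add(user_id)

    def disconnect(self, user_id: int, socket: Any) -> None:
        sockets = self.connections.get(user_id, [])
        if socket in sockets:
            sockets.remove(socket)
        if not sockets:
            self.connections.pop(user_id, None)

    async def broadcast_to_user(self, user_id: int, message: dict) -> None:
        # Iterate over a copy so a failing socket can be removed mid-loop.
        for socket in list(self.connections.get(user_id, [])):
            try:
                await socket.send_json(message)
            except ConnectionError:
                self.disconnect(user_id, socket)

    async def broadcast_position(self, position: dict, device_id: int) -> None:
        message = {"type": "position", "data": position}
        for user_id in self.device_permissions.get(device_id, set()):
            await self.broadcast_to_user(user_id, message)
```

A failing socket is dropped on the spot, so the user's remaining connections (for example web and mobile) still receive the same message.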
Metrics
The service exposes Prometheus metrics at /metrics:
| Metric | Type | Description |
|---|---|---|
| websocket_active_connections | Gauge | Current number of active WebSocket connections |
| websocket_messages_sent_total | Counter | Total messages sent, labeled by message_type (position/event) |
Error Handling
Connection Errors
| Error Code | Reason | Action |
|---|---|---|
| 4001 | Missing or invalid token | Client should re-authenticate |
| 1000 | Normal closure | Connection closed cleanly |
| 1006 | Abnormal closure | Network issue - client should reconnect |
Reconnection Strategy
Clients should implement exponential backoff for reconnections:
let reconnectDelay = 3000; // Start with 3 seconds
const maxReconnectDelay = 30000; // Cap at 30 seconds
function connect() {
  const ws = new WebSocket(wsUrl); // wsUrl: the endpoint URL including ?token=
  ws.onclose = () => {
    console.log(`Reconnecting in ${reconnectDelay}ms...`);
    setTimeout(() => {
      reconnectDelay = Math.min(reconnectDelay * 2, maxReconnectDelay);
      connect();
    }, reconnectDelay);
  };
  ws.onopen = () => {
    reconnectDelay = 3000; // Reset on successful connection
  };
}
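The delay schedule and the close-code table can be combined into a small policy. The sketch below is one reading of that table, not a documented rule: it assumes a client should not retry blindly after 1000 (clean close) or 4001 (the token must be refreshed first), and the function names are illustrative.

```python
from itertools import islice
from typing import Iterator, List

# Assumed interpretation of the close-code table: these codes need no blind retry.
NO_RETRY_CODES = {1000, 4001}


def should_reconnect(close_code: int) -> bool:
    """True for closures such as 1006 where a plain retry is the right move."""
    return close_code not in NO_RETRY_CODES


def backoff_delays(initial_ms: int = 3000, cap_ms: int = 30000,
                   factor: int = 2) -> Iterator[int]:
    """Yield reconnect delays: start at initial_ms, multiply by factor, cap at cap_ms."""
    delay = initial_ms
    while True:
        yield delay
        delay = min(delay * factor, cap_ms)


def first_delays(count: int) -> List[int]:
    """Convenience helper for inspecting the start of the schedule."""
    return list(islice(backoff_delays(), count))
```

Calling first_delays(6) gives 3000, 6000, 12000, 24000, 30000, 30000 milliseconds, the same sequence the JavaScript snippet produces. Adding random jitter to each delay is a common refinement when many clients may reconnect at once.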
Client Examples
Web App Integration
See web-app/src/lib/websocket-context.tsx for full React implementation.
React Hook
import { useWebSocket } from '@/hooks/useWebSocket';
function MyComponent() {
  const { status, isConnected } = useWebSocket({
    onMessage: (message) => {
      if (message.type === 'position') {
        updateDevicePosition(message.data);
      } else if (message.type === 'event') {
        showEventNotification(message.data);
      }
    },
    autoConnect: true
  });
  return (
    <div>
      Status: {status}
      {isConnected && 'Connected'}
    </div>
  );
}
Mobile App Integration
iOS (Swift)
import Starscream
class WebSocketManager: WebSocketDelegate {
    var socket: WebSocket?
    func connect(token: String) {
        // The request is never mutated, so a constant is sufficient
        let request = URLRequest(url: URL(string: "wss://api.vislagps.com/api/socket?token=\(token)")!)
        socket = WebSocket(request: request)
        socket?.delegate = self
        socket?.connect()
    }
    func didReceive(event: WebSocketEvent, client: WebSocket) {
        switch event {
        case .text(let text):
            if let data = text.data(using: .utf8),
               let message = try? JSONDecoder().decode(WebSocketMessage.self, from: data) {
                handleMessage(message)
            }
        case .connected:
            startHeartbeat()
        default:
            break
        }
    }
}
Android (Kotlin)
import com.google.gson.Gson
import okhttp3.*
class WebSocketManager {
    private var webSocket: WebSocket? = null
    fun connect(token: String) {
        val client = OkHttpClient()
        val request = Request.Builder()
            .url("wss://api.vislagps.com/api/socket?token=$token")
            .build()
        webSocket = client.newWebSocket(request, object : WebSocketListener() {
            override fun onMessage(webSocket: WebSocket, text: String) {
                val message = Gson().fromJson(text, WebSocketMessage::class.java)
                handleMessage(message)
            }
            override fun onOpen(webSocket: WebSocket, response: Response) {
                startHeartbeat()
            }
        })
    }
}
Monitoring
Grafana Dashboard
A Grafana dashboard is available at:
- Dashboard: WebSocket Service - Real-time Metrics
- File: monitoring/grafana/dashboards/microservices/websocket-metrics.json
Key Panels
- Active Connections - Real-time gauge
- Messages Sent - Rate and total count
- Message Distribution - Pie chart by type (positions vs events)
- Activity Timeline - Messages over time
Logs
View WebSocket logs in Grafana Loki:
{container="websocket"} |= "connected"
{container="websocket"} |= "broadcast"
{container="websocket"} |~ "ERROR|error"
Best Practices
Client Implementation
- Always implement reconnection logic with exponential backoff
- Send periodic pings to keep connection alive
- Handle all message types gracefully (unknown types should be logged, not crash)
- Store connection state in application state management
- Show connection status to user (connected/disconnected indicator)
Performance
- Batch updates when processing multiple positions/events
- Throttle UI updates to avoid excessive re-renders
- Use virtual scrolling for event lists
- Unsubscribe when not needed (e.g., user navigates away)
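Batching and throttling are often combined by keeping only the newest position per device and handing the whole batch to the UI on a timer. The sketch below shows that coalescing step; LatestPositionBuffer is a hypothetical name and the flush interval is left to the caller.

```python
from typing import Dict, List


class LatestPositionBuffer:
    """Keeps only the most recent position per deviceId until the next flush."""

    def __init__(self) -> None:
        self._latest: Dict[int, dict] = {}

    def push(self, position: dict) -> None:
        # A newer position for the same device overwrites the older one.
        self._latest[position["deviceId"]] = position

    def flush(self) -> List[dict]:
        """Return the pending positions and start a new batch."""
        batch = list(self._latest.values())
        self._latest.clear()
        return batch
```

A client would call push for every incoming position message and flush once per render tick (for instance once per second), so a device reporting many times between ticks triggers a single UI update.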
Security
- Never log JWT tokens in client code
- Validate all incoming messages before processing
- Use WSS (secure WebSocket) in production
- Rotate tokens regularly and handle token refresh
Troubleshooting
Connection Refused
Symptom: WebSocket connection fails immediately
Possible Causes:
- Nginx not routing /api/socket to WebSocket service
- WebSocket service not running
- Firewall blocking port 8086
Solution: Check nginx logs and verify service is running
Connection Closes After ~60 Seconds
Symptom: Connection drops after about 1 minute
Possible Causes:
- No heartbeat/ping being sent
- Nginx timeout too short
- Load balancer timeout
Solution: Implement ping/pong heartbeat every 30 seconds
Not Receiving Updates
Symptom: Connected but no position/event updates
Possible Causes:
- User doesn't have permission to device
- Device not sending data
- Redis stream not receiving data
Solution:
- Check user permissions in database
- Verify device is active and sending positions
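- Check the most recent entries in the Redis streams (an XREAD with the special ID $ and no BLOCK option returns nothing, because $ only matches entries added after the command starts):
XREVRANGE positions + - COUNT 10
XREVRANGE events + - COUNT 10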
Multiple Duplicate Messages
Symptom: Receiving same position/event multiple times
Possible Causes:
- Multiple WebSocket connections for same user
- Client reconnecting too aggressively
Solution: Implement connection deduplication or use connection ID tracking
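As a complement to connection-level deduplication, a client can also drop repeats at the message level. The sketch below is a different technique from connection ID tracking: it assumes the id inside data is unique per message type, as the sample payloads suggest, and the capacity of 1000 is an arbitrary illustrative value.

```python
from collections import OrderedDict
from typing import Tuple


class RecentMessageFilter:
    """Remembers recently seen (type, id) pairs and rejects repeats."""

    def __init__(self, capacity: int = 1000) -> None:
        self._capacity = capacity
        self._seen: "OrderedDict[Tuple[str, int], None]" = OrderedDict()

    def is_new(self, message: dict) -> bool:
        """Return True the first time a message is seen, False for a duplicate."""
        key = (message["type"], message["data"]["id"])
        if key in self._seen:
            return False
        self._seen[key] = None
        if len(self._seen) > self._capacity:
            self._seen.popitem(last=False)  # evict the oldest entry to bound memory
        return True
```

Sharing one filter across all of a client's sockets means a message delivered over two connections is processed once.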
Configuration
Environment Variables
# WebSocket Service (.env)
DATABASE_URL=postgresql://user:pass@postgres:5432/visla
REDIS_URL=redis://redis:6379/0
SECRET_KEY=<your-jwt-secret>
JWT_ALGORITHM=HS256
HOST=0.0.0.0
PORT=8086
POSITIONS_STREAM=positions
EVENTS_STREAM=events
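One way such variables might be read at startup is sketched below. The Settings class and load_settings function are hypothetical, and the defaults simply mirror the sample values above; treating DATABASE_URL, REDIS_URL, and SECRET_KEY as mandatory is an assumption.

```python
import os
from dataclasses import dataclass
from typing import Mapping

REQUIRED = ("DATABASE_URL", "REDIS_URL", "SECRET_KEY")  # assumed to have no safe default


@dataclass(frozen=True)
class Settings:
    database_url: str
    redis_url: str
    secret_key: str
    jwt_algorithm: str = "HS256"
    host: str = "0.0.0.0"
    port: int = 8086
    positions_stream: str = "positions"
    events_stream: str = "events"


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Build Settings from an environment mapping, failing fast on missing values."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return Settings(
        database_url=env["DATABASE_URL"],
        redis_url=env["REDIS_URL"],
        secret_key=env["SECRET_KEY"],
        jwt_algorithm=env.get("JWT_ALGORITHM", "HS256"),
        host=env.get("HOST", "0.0.0.0"),
        port=int(env.get("PORT", "8086")),
        positions_stream=env.get("POSITIONS_STREAM", "positions"),
        events_stream=env.get("EVENTS_STREAM", "events"),
    )
```

Passing the mapping as a parameter keeps the loader easy to test without touching the real process environment.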
Nginx Configuration
location /api/socket {
    # Authenticate before WebSocket upgrade
    auth_request /auth-validate;
    # WebSocket upgrade
    proxy_pass http://websocket:8086;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    # Timeouts
    proxy_read_timeout 86400s;
    proxy_send_timeout 86400s;
}
Related Documentation
- Backend Structure - Overall architecture
- Web App Development - Web client implementation
- Mobile Apps - iOS/Android integration
- Redis Streams - Data flow from decoder to WebSocket
FAQ
Q: Can I connect from multiple tabs/devices?
A: Yes! Each connection is independent. The service supports multiple simultaneous connections per user.
Q: What happens if I lose connection?
A: The server cleans up your connection. Clients should implement automatic reconnection.
Q: How do I know which devices I have access to?
A: The server automatically filters updates based on your user permissions. You only receive updates for devices you're authorized to see.
Q: Is there a rate limit?
A: No explicit rate limit, but connections are validated on connect. Excessive reconnections may be throttled by nginx.
Q: Can I request historical data over WebSocket?
A: No, WebSocket only broadcasts new updates. Use REST API /api/positions for historical data.