WebSocket Real-time Service

Overview

The WebSocket service provides real-time GPS position updates and event notifications to web and mobile clients. It listens to Redis streams for position and event data, then broadcasts updates to connected clients over persistent WebSocket connections.

Architecture

Service Details

  • Port: 8086
  • Endpoint: wss://gateway.vislagps.com/api/socket or ws://localhost:8086/api/socket
  • Technology: Python (FastAPI + WebSockets)
  • Authentication: JWT token via query parameter
  • Protocol: WebSocket with JSON messages

Connection Flow

1. Client Connection

const token = localStorage.getItem('accessToken');
const ws = new WebSocket(`wss://gateway.vislagps.com/api/socket?token=${token}`);

ws.onopen = () => {
  console.log('WebSocket connected');
};

ws.onmessage = (event) => {
  const message = JSON.parse(event.data);
  handleMessage(message);
};

ws.onerror = (error) => {
  console.error('WebSocket error:', error);
};

ws.onclose = () => {
  console.log('WebSocket disconnected');
  // Implement reconnection logic
};

2. Authentication

  • JWT token passed as query parameter: ?token=<JWT>
  • Nginx validates token via auth_request before WebSocket upgrade
  • Invalid or missing token → connection rejected with code 4001
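
For non-browser clients, the connection URL can be assembled with the token percent-encoded, so that characters such as `+`, `/`, or `=` survive the query string. The following is a minimal sketch; the helper name `build_socket_url` is hypothetical and not part of the service.

```python
from urllib.parse import urlencode, urlsplit, urlunsplit


def build_socket_url(base_url: str, token: str) -> str:
    """Return base_url with the JWT attached as the `token` query parameter."""
    parts = urlsplit(base_url)
    # urlencode percent-encodes characters that are unsafe in a query string
    query = urlencode({"token": token})
    return urlunsplit((parts.scheme, parts.netloc, parts.path, query, ""))


print(build_socket_url("wss://gateway.vislagps.com/api/socket", "abc.def.ghi"))
```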

3. Heartbeat

Clients should send periodic ping messages to keep the connection alive:

setInterval(() => {
  if (ws.readyState === WebSocket.OPEN) {
    ws.send('ping');
  }
}, 30000); // Every 30 seconds

The server responds with pong to confirm that the connection is alive.
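
A client can also use the pong replies to detect a dead connection. The sketch below assumes the pong arrives as the plain text `pong` (the exact wire format is not specified here) and treats 60 seconds without a pong, i.e. two missed intervals, as stale. The class name `HeartbeatMonitor` and the timeout are illustrative assumptions.

```python
import time
from typing import Callable


class HeartbeatMonitor:
    """Tracks the time of the last pong to decide whether a connection is stale."""

    def __init__(self, timeout_s: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._timeout_s = timeout_s
        self._clock = clock
        self._last_pong = clock()

    def on_message(self, text: str) -> bool:
        """Return True if the message was a pong and has been consumed."""
        if text == "pong":  # assumed wire format of the server reply
            self._last_pong = self._clock()
            return True
        return False

    def is_stale(self) -> bool:
        """True when no pong has been seen within the timeout."""
        return self._clock() - self._last_pong > self._timeout_s
```

Calling `on_message` before JSON parsing keeps heartbeat replies out of the regular message handler; when `is_stale()` returns true, the client can close the socket and reconnect.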

Message Format

All WebSocket messages follow this JSON structure:

type WebSocketMessage = PositionUpdate | EventUpdate;

interface PositionUpdate {
  type: 'position';
  data: Position;
}

interface EventUpdate {
  type: 'event';
  data: Event;
}
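
The same envelope can be checked on a Python client before any handler runs. This is a minimal sketch; `parse_message` is a hypothetical helper, and it only validates the `type`/`data` envelope, not the fields inside `data`.

```python
import json
from typing import Any, Dict, Tuple

KNOWN_TYPES = {"position", "event"}


def parse_message(raw: str) -> Tuple[str, Dict[str, Any]]:
    """Parse a raw frame and return (type, data), or raise ValueError."""
    try:
        message = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"not valid JSON: {exc}") from exc
    if not isinstance(message, dict):
        raise ValueError("message must be a JSON object")
    msg_type = message.get("type")
    data = message.get("data")
    if not isinstance(msg_type, str) or msg_type not in KNOWN_TYPES:
        raise ValueError(f"unknown message type: {msg_type!r}")
    if not isinstance(data, dict):
        raise ValueError("message 'data' must be an object")
    return msg_type, data
```

Callers can catch `ValueError`, log the problem, and carry on, so an unexpected frame does not crash the client.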

Position Update

{
  "type": "position",
  "data": {
    "id": 12345,
    "deviceId": 42,
    "protocol": "gt06",
    "deviceTime": "2025-12-15T17:30:00Z",
    "fixTime": "2025-12-15T17:30:00Z",
    "serverTime": "2025-12-15T17:30:01Z",
    "outdated": false,
    "valid": true,
    "latitude": 45.4642,
    "longitude": 9.1900,
    "altitude": 120.5,
    "speed": 45.8,
    "course": 180.5,
    "address": "Via Example, Milano, IT",
    "accuracy": 10.0,
    "attributes": {
      "ignition": true,
      "battery": 12.8,
      "satellites": 12
    }
  }
}
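
The three timestamps are ISO 8601 strings in UTC. As a small worked example, the sketch below parses them and computes the delay between the GPS fix and the server receiving it. The helper names are hypothetical.

```python
from datetime import datetime


def parse_utc(timestamp: str) -> datetime:
    """Parse an ISO 8601 timestamp with a trailing 'Z' into an aware datetime."""
    # datetime.fromisoformat() rejects a trailing "Z" before Python 3.11
    return datetime.fromisoformat(timestamp.replace("Z", "+00:00"))


def server_lag_seconds(position: dict) -> float:
    """Seconds between the GPS fix and the moment the server stored it."""
    delta = parse_utc(position["serverTime"]) - parse_utc(position["fixTime"])
    return delta.total_seconds()


sample = {"fixTime": "2025-12-15T17:30:00Z", "serverTime": "2025-12-15T17:30:01Z"}
print(server_lag_seconds(sample))  # 1.0
```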

Event Update

{
  "type": "event",
  "data": {
    "id": 789,
    "type": "deviceOverspeed",
    "eventTime": "2025-12-15T17:30:00Z",
    "deviceId": 42,
    "positionId": 12345,
    "geofenceId": null,
    "maintenanceId": null,
    "attributes": {
      "speed": 145.8,
      "speedLimit": 130.0
    }
  }
}

Device Permissions

The WebSocket service implements device-level permissions:

  1. When a client connects, the service loads all devices accessible to that user
  2. Permissions are based on:
    • Direct device ownership (user_device table)
    • Group membership (user_group + devices.group_id)
  3. Only position/event updates for authorized devices are sent to each client

Permission Loading Query

SELECT d.id
FROM devices d
LEFT JOIN user_device ud ON d.id = ud.device_id
WHERE ud.user_id = :user_id
   OR d.id IN (
       SELECT d2.id FROM devices d2
       JOIN groups g ON d2.group_id = g.id
       JOIN user_group ug ON g.id = ug.group_id
       WHERE ug.user_id = :user_id
   )
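
Once the device IDs for a user are loaded, filtering a broadcast comes down to a lookup from device to authorized users. The sketch below shows one way to hold that mapping in memory. The class `DevicePermissions` and its method names are illustrative assumptions, not the service's actual code.

```python
from collections import defaultdict
from typing import Dict, Iterable, Set


class DevicePermissions:
    """In-memory index: device_id -> set of user_ids allowed to see it."""

    def __init__(self) -> None:
        self._users_by_device: Dict[int, Set[int]] = defaultdict(set)

    def load_user_devices(self, user_id: int, device_ids: Iterable[int]) -> None:
        """Register the device IDs returned by the permission query for one user."""
        for device_id in device_ids:
            self._users_by_device[device_id].add(user_id)

    def authorized_users(self, device_id: int) -> Set[int]:
        """Return a copy of the user IDs that may receive updates for this device."""
        return set(self._users_by_device.get(device_id, ()))


perms = DevicePermissions()
perms.load_user_devices(user_id=1, device_ids=[42, 43])
perms.load_user_devices(user_id=2, device_ids=[42])
print(perms.authorized_users(42))  # users 1 and 2
print(perms.authorized_users(99))  # empty set: nobody receives this update
```

Because the index is a set per device, duplicate rows from the query (a device reachable both directly and through a group) are harmless.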

Redis Integration

Subscribed Streams

| Stream Name | Purpose | Data Format |
|---|---|---|
| positions | GPS position updates | {position: <JSON>, uniqueId: <string>} |
| events | Device events/alarms | {event: <JSON>, deviceId: <int>} |
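
With `decode_responses=True`, each stream entry reaches the consumer as a flat dictionary of strings. The sketch below decodes both entry shapes, assuming the `position` and `event` fields hold JSON-encoded strings and that `deviceId` arrives as a numeric string. The function names are hypothetical.

```python
import json
from typing import Any, Dict


def decode_position_entry(fields: Dict[str, str]) -> Dict[str, Any]:
    """Decode an entry from the `positions` stream."""
    return {
        "unique_id": fields["uniqueId"],
        "position": json.loads(fields["position"]),
    }


def decode_event_entry(fields: Dict[str, str]) -> Dict[str, Any]:
    """Decode an entry from the `events` stream."""
    return {
        "device_id": int(fields["deviceId"]),  # stream values are strings
        "event": json.loads(fields["event"]),
    }


entry = {"uniqueId": "123456789012345", "position": '{"deviceId": 42, "speed": 45.8}'}
print(decode_position_entry(entry)["position"]["deviceId"])  # 42
```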

Stream Consumer

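import redis.asyncio as redis  # assumed: the asyncio client from redis-py

# `settings`, `handle_position`, and `handle_event` are defined elsewhere in the service.


async def redis_subscriber():
    redis_client = redis.from_url(settings.redis_url, decode_responses=True)

    # '$' means "only entries added after the read starts"
    last_ids = {
        'positions': '$',
        'events': '$'
    }

    while True:
        # Block for up to 5 seconds waiting for new entries on either stream
        entries = await redis_client.xread(
            {
                'positions': last_ids['positions'],
                'events': last_ids['events']
            },
            block=5000
        )

        for stream_name, messages in entries:
            for msg_id, data in messages:
                # Remember the last ID so the next read resumes after it
                last_ids[stream_name] = msg_id

                if stream_name == 'positions':
                    await handle_position(data)
                elif stream_name == 'events':
                    await handle_event(data)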

Connection Management

Connection Manager

The service uses a ConnectionManager class to handle multiple connections per user:

from typing import Dict, List, Set

from fastapi import WebSocket


class ConnectionManager:
    # _load_user_devices, get_authorized_users, and broadcast_to_user are omitted here

    def __init__(self):
        # user_id -> list of WebSocket connections
        self.connections: Dict[int, List[WebSocket]] = {}
        # device_id -> set of authorized user_ids
        self.device_permissions: Dict[int, Set[int]] = {}

    async def connect(self, websocket: WebSocket, user_id: int):
        await websocket.accept()
        if user_id not in self.connections:
            self.connections[user_id] = []
        self.connections[user_id].append(websocket)
        await self._load_user_devices(user_id)

    async def broadcast_position(self, position: dict, device_id: int):
        authorized_users = self.get_authorized_users(device_id)
        message = {"type": "position", "data": position}

        for user_id in authorized_users:
            await self.broadcast_to_user(user_id, message)

Multiple Connections

  • Users can have multiple simultaneous connections (e.g., web + mobile)
  • Each connection receives the same updates
  • Disconnections are handled gracefully without affecting other connections

Metrics

The service exposes Prometheus metrics at /metrics:

| Metric | Type | Description |
|---|---|---|
| websocket_active_connections | Gauge | Current number of active WebSocket connections |
| websocket_messages_sent_total | Counter | Total messages sent, labeled by message_type (position/event) |

Error Handling

Connection Errors

| Error Code | Reason | Action |
|---|---|---|
| 4001 | Missing or invalid token | Client should re-authenticate |
| 1000 | Normal closure | Connection closed cleanly |
| 1006 | Abnormal closure | Network issue - client should reconnect |
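
A client can turn these close codes into a decision in its close handler. The sketch below is illustrative: the `CloseAction` names are hypothetical, and treating every unlisted code as a transient failure is an assumption.

```python
from enum import Enum


class CloseAction(Enum):
    REAUTHENTICATE = "reauthenticate"
    STOP = "stop"
    RECONNECT = "reconnect"


def action_for_close_code(code: int) -> CloseAction:
    """Map a WebSocket close code to what the client should do next."""
    if code == 4001:
        # Token missing or invalid: reconnecting with the same token will fail again
        return CloseAction.REAUTHENTICATE
    if code == 1000:
        # Clean shutdown: nothing to recover from
        return CloseAction.STOP
    # 1006 and any unlisted code: assumed transient, retry with backoff
    return CloseAction.RECONNECT


print(action_for_close_code(1006).value)  # reconnect
```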

Reconnection Strategy

Clients should implement exponential backoff for reconnections:

let reconnectDelay = 3000; // Start with 3 seconds
const maxReconnectDelay = 30000; // Cap at 30 seconds

function connect() {
  const ws = new WebSocket(wsUrl);

  ws.onclose = () => {
    console.log(`Reconnecting in ${reconnectDelay}ms...`);
    setTimeout(() => {
      reconnectDelay = Math.min(reconnectDelay * 2, maxReconnectDelay);
      connect();
    }, reconnectDelay);
  };

  ws.onopen = () => {
    reconnectDelay = 3000; // Reset on successful connection
  };
}
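
To make the schedule concrete, the sketch below generates the same sequence of delays (3 seconds doubling up to the 30-second cap) for a Python client. The generator name `backoff_delays` is hypothetical.

```python
from itertools import islice
from typing import Iterator


def backoff_delays(initial_ms: int = 3000, max_ms: int = 30000) -> Iterator[int]:
    """Yield reconnect delays in milliseconds, doubling until the cap is reached."""
    delay = initial_ms
    while True:
        yield delay
        delay = min(delay * 2, max_ms)


print(list(islice(backoff_delays(), 6)))
# [3000, 6000, 12000, 24000, 30000, 30000]
```

Creating a fresh generator after each successful connection resets the delay to its initial value.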

Client Examples

Web App Integration

See web-app/src/lib/websocket-context.tsx for the full React implementation.

React Hook

import { useWebSocket } from '@/hooks/useWebSocket';

function MyComponent() {
  const { status, isConnected } = useWebSocket({
    onMessage: (message) => {
      if (message.type === 'position') {
        updateDevicePosition(message.data);
      } else if (message.type === 'event') {
        showEventNotification(message.data);
      }
    },
    autoConnect: true
  });

  return (
    <div>
      Status: {status}
      {isConnected && '✅ Connected'}
    </div>
  );
}

Mobile App Integration

iOS (Swift)

import Foundation
import Starscream

// handleMessage(_:) and startHeartbeat() are app-specific and omitted here
class WebSocketManager: WebSocketDelegate {
    var socket: WebSocket?

    func connect(token: String) {
        // The request is never mutated, so declare it with let
        let request = URLRequest(url: URL(string: "wss://gateway.vislagps.com/api/socket?token=\(token)")!)
        socket = WebSocket(request: request)
        socket?.delegate = self
        socket?.connect()
    }

    func didReceive(event: WebSocketEvent, client: WebSocket) {
        switch event {
        case .text(let text):
            if let data = text.data(using: .utf8),
               let message = try? JSONDecoder().decode(WebSocketMessage.self, from: data) {
                handleMessage(message)
            }
        case .connected:
            startHeartbeat()
        default:
            break
        }
    }
}

Android (Kotlin)

import com.google.gson.Gson
import okhttp3.*

// handleMessage() and startHeartbeat() are app-specific and omitted here
class WebSocketManager {
    private var webSocket: WebSocket? = null

    fun connect(token: String) {
        val client = OkHttpClient()
        val request = Request.Builder()
            .url("wss://gateway.vislagps.com/api/socket?token=$token")
            .build()

        webSocket = client.newWebSocket(request, object : WebSocketListener() {
            override fun onMessage(webSocket: WebSocket, text: String) {
                val message = Gson().fromJson(text, WebSocketMessage::class.java)
                handleMessage(message)
            }

            override fun onOpen(webSocket: WebSocket, response: Response) {
                startHeartbeat()
            }
        })
    }
}

Monitoring

Grafana Dashboard

A comprehensive Grafana dashboard is available at:

  • Dashboard: WebSocket Service - Real-time Metrics
  • File: monitoring/grafana/dashboards/microservices/websocket-metrics.json

Key Panels

  1. Active Connections - Real-time gauge
  2. Messages Sent - Rate and total count
  3. Message Distribution - Pie chart by type (positions vs events)
  4. Activity Timeline - Messages over time

Logs

View WebSocket logs in Grafana Loki:

{container="websocket"} |= "connected"
{container="websocket"} |= "broadcast"
{container="websocket"} |~ "ERROR|error"

Best Practices

Client Implementation

  1. Always implement reconnection logic with exponential backoff
  2. Send periodic pings to keep connection alive
  3. Handle all message types gracefully (unknown types should be logged, not crash)
  4. Store connection state in application state management
  5. Show connection status to user (connected/disconnected indicator)

Performance

  1. Batch updates when processing multiple positions/events
  2. Throttle UI updates to avoid excessive re-renders
  3. Use virtual scrolling for event lists
  4. Unsubscribe when not needed (e.g., user navigates away)
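
Batching and throttling can be combined by keeping only the newest position per device and handing the whole batch to the UI on a timer. The sketch below shows the buffering half. `LatestPositionBuffer` is a hypothetical name, and the flush interval is left to the caller.

```python
from typing import Any, Dict, List


class LatestPositionBuffer:
    """Keeps the most recent position per device until the next flush."""

    def __init__(self) -> None:
        self._latest: Dict[int, Dict[str, Any]] = {}

    def add(self, position: Dict[str, Any]) -> None:
        """Store a position, replacing any older one for the same device."""
        self._latest[position["deviceId"]] = position

    def flush(self) -> List[Dict[str, Any]]:
        """Return the buffered positions and empty the buffer."""
        batch = list(self._latest.values())
        self._latest.clear()
        return batch


buffer = LatestPositionBuffer()
buffer.add({"deviceId": 42, "speed": 40.0})
buffer.add({"deviceId": 42, "speed": 45.8})  # supersedes the previous entry
buffer.add({"deviceId": 43, "speed": 10.0})
batch = buffer.flush()
print(len(batch))  # 2
```

Calling `flush()` once or twice per second caps the number of UI updates regardless of how fast positions arrive.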

Security

  1. Never log JWT tokens in client code
  2. Validate all incoming messages before processing
  3. Use WSS (secure WebSocket) in production
  4. Rotate tokens regularly and handle token refresh
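
Because the token travels in the query string, logging the connection URL verbatim would leak it. The sketch below masks the `token` parameter before a URL is written to a log; the helper name `redact_token` is hypothetical.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def redact_token(url: str) -> str:
    """Return url with the value of the `token` query parameter masked."""
    parts = urlsplit(url)
    query = [
        (key, "REDACTED" if key == "token" else value)
        for key, value in parse_qsl(parts.query, keep_blank_values=True)
    ]
    return urlunsplit(
        (parts.scheme, parts.netloc, parts.path, urlencode(query), parts.fragment)
    )


print(redact_token("wss://gateway.vislagps.com/api/socket?token=abc.def.ghi"))
```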

Troubleshooting

Connection Refused

Symptom: WebSocket connection fails immediately

Possible Causes:

  • Nginx not routing /api/socket to WebSocket service
  • WebSocket service not running
  • Firewall blocking port 8086

Solution: Check nginx logs and verify service is running

Connection Closes After ~60 Seconds

Symptom: Connection drops after about 1 minute

Possible Causes:

  • No heartbeat/ping being sent
  • Nginx timeout too short
  • Load balancer timeout

Solution: Implement ping/pong heartbeat every 30 seconds

Not Receiving Updates

Symptom: Connected but no position/event updates

Possible Causes:

  • User doesn't have permission to device
  • Device not sending data
  • Redis stream not receiving data

Solution:

  1. Check user permissions in database
  2. Verify device is active and sending positions
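  3. Check that the Redis streams contain recent entries: XREVRANGE positions + - COUNT 10 and XREVRANGE events + - COUNT 10 (a non-blocking XREAD with $ as the ID only matches entries added after the call, so it always returns nothing)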

Multiple Duplicate Messages

Symptom: Receiving same position/event multiple times

Possible Causes:

  • Multiple WebSocket connections for same user
  • Client reconnecting too aggressively

Solution: Implement connection deduplication or use connection ID tracking
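
On the client side, duplicates can also be filtered by remembering recently seen message IDs. The sketch below assumes that `data.id` is unique per message type, as the examples in this document suggest. The class name and the capacity are illustrative.

```python
from collections import OrderedDict
from typing import Tuple


class RecentMessageFilter:
    """Remembers the last `capacity` (type, id) pairs to drop duplicates."""

    def __init__(self, capacity: int = 1000) -> None:
        self._capacity = capacity
        self._seen: "OrderedDict[Tuple[str, int], None]" = OrderedDict()

    def is_new(self, msg_type: str, msg_id: int) -> bool:
        """Return True the first time a (type, id) pair is seen."""
        key = (msg_type, msg_id)
        if key in self._seen:
            return False
        self._seen[key] = None
        if len(self._seen) > self._capacity:
            self._seen.popitem(last=False)  # evict the oldest entry
        return True


seen = RecentMessageFilter(capacity=2)
print(seen.is_new("position", 12345))  # True
print(seen.is_new("position", 12345))  # False
```

The bounded capacity keeps memory use constant; an ID older than the window is treated as new again, which is acceptable for short bursts of duplicates.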

Configuration

Environment Variables

# WebSocket Service (.env)
DATABASE_URL=postgresql://user:pass@postgres:5432/visla
REDIS_URL=redis://redis:6379/0
SECRET_KEY=<your-jwt-secret>
JWT_ALGORITHM=HS256
HOST=0.0.0.0
PORT=8086
POSITIONS_STREAM=positions
EVENTS_STREAM=events
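
These variables could be read into a typed settings object at startup. The sketch below uses only the standard library; the `Settings` class, the `load_settings` function, and the fallback defaults (taken from the sample values above) are assumptions and may differ from how the service actually loads its configuration.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    database_url: str
    redis_url: str
    secret_key: str
    jwt_algorithm: str
    host: str
    port: int
    positions_stream: str
    events_stream: str


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Build Settings from environment variables; required keys raise KeyError."""
    return Settings(
        database_url=env["DATABASE_URL"],
        redis_url=env["REDIS_URL"],
        secret_key=env["SECRET_KEY"],
        jwt_algorithm=env.get("JWT_ALGORITHM", "HS256"),
        host=env.get("HOST", "0.0.0.0"),
        port=int(env.get("PORT", "8086")),  # environment values are strings
        positions_stream=env.get("POSITIONS_STREAM", "positions"),
        events_stream=env.get("EVENTS_STREAM", "events"),
    )
```

Passing a plain dictionary instead of `os.environ` makes the loader easy to exercise in tests.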

Nginx Configuration

location /api/socket {
    # Authenticate before WebSocket upgrade
    auth_request /auth-validate;

    # WebSocket upgrade
    proxy_pass http://websocket:8086;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;

    # Timeouts
    proxy_read_timeout 86400s;
    proxy_send_timeout 86400s;
}

FAQ

Q: Can I connect from multiple tabs/devices?
A: Yes! Each connection is independent. The service supports multiple simultaneous connections per user.

Q: What happens if I lose connection?
A: The server cleans up your connection. Clients should implement automatic reconnection.

Q: How do I know which devices I have access to?
A: The server automatically filters updates based on your user permissions. You only receive updates for devices you're authorized to see.

Q: Is there a rate limit?
A: No explicit rate limit, but connections are validated on connect. Excessive reconnections may be throttled by nginx.

Q: Can I request historical data over WebSocket?
A: No, the WebSocket only broadcasts new updates. Use the REST API endpoint /api/positions for historical data.