Real-Time WebSocket Architecture
This document covers the real-time communication layer of the Visla GPS Android app: how the client maintains a persistent WebSocket connection to the server, receives live updates (positions, device status, geofences, notifications, sharing events), and bridges those updates into the reactive DataStore layer for UI consumption.
Architecture Overview
Data flow: Server pushes JSON → OkHttp WebSocket receives it → WebSocketManager parses the event type → WebSocketMapper converts DTOs to domain entities → entities are emitted on SharedFlow channels → RealTimeRepositoryImpl exposes them as Flows → RealTimeDataBridge collects them and writes to the DataStores → ViewModels observe the DataStores via Compose state.
WebSocketManager
Location: data/network/WebSocketManager.kt
The WebSocketManager is a @Singleton that owns the OkHttp WebSocket connection, manages the full connection lifecycle, and exposes typed SharedFlow channels for every event type the server can push.
OkHttp Client Configuration
```kotlin
import java.util.concurrent.TimeUnit
import okhttp3.OkHttpClient

private val client = OkHttpClient.Builder()
    .readTimeout(NetworkConstants.DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS)  // 30s
    .pingInterval(NetworkConstants.DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS) // 30s
    .build()
```
The OkHttp client is separate from the HTTP client used for REST API calls. It configures:
| Parameter | Value | Purpose |
|---|---|---|
| readTimeout | 30 seconds | Max time to wait for data before considering the connection dead |
| pingInterval | 30 seconds | OkHttp-level WebSocket ping/pong; if the previous ping's pong has not arrived when the next ping is due, OkHttp fails the socket (onFailure) |
Connection Lifecycle
Authentication
The WebSocket authenticates via a Bearer token appended as a URL query parameter:
```kotlin
val url = "$baseUrl/api/websocket?token=$token"
```
The token is obtained from TokenManager.accessToken. If no token is available, connect() returns without opening a socket and connectionError is set to "Not authenticated".
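If access tokens can contain characters that are reserved in URLs (for example '+', '/' or '=' in some opaque token formats), interpolating them raw into the query string can corrupt the value. Below is a minimal sketch of an encoded variant; buildWebSocketUrl is a hypothetical helper, not part of the app, and it assumes the server decodes standard percent-encoded query parameters.

```kotlin
import java.net.URLEncoder

// Hypothetical helper: percent-encodes the token so reserved characters
// such as '+', '/' and '=' survive as a query parameter value.
fun buildWebSocketUrl(baseUrl: String, token: String): String {
    val encoded = URLEncoder.encode(token, "UTF-8")
    // Trim a trailing slash so "https://host/" does not produce "//api".
    return "${baseUrl.trimEnd('/')}/api/websocket?token=$encoded"
}

fun main() {
    println(buildWebSocketUrl("https://example.com", "a+b/c="))
    // https://example.com/api/websocket?token=a%2Bb%2Fc%3D
}
```

JWT-style tokens use only URL-safe characters ('.', '-', '_' and alphanumerics) and pass through unchanged, so this matters mainly for other token formats.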
Connect
```kotlin
fun connect() {
    val token = tokenManager.accessToken ?: run {
        _connectionError.value = "Not authenticated" // guard: no token, so no socket is opened
        return
    }
    disconnect() // clean up any existing connection
    val request = Request.Builder().url("$baseUrl/api/websocket?token=$token").build()
    webSocket = client.newWebSocket(request, listener)
}
```
On successful connection (onOpen):
- The _isConnected StateFlow is set to true
- _connectionError is cleared
- The application-level ping timer starts
Disconnect
```kotlin
fun disconnect() {
    reconnectJob?.cancel()
    stopPingTimer()
    webSocket?.close(1000, "Client disconnect") // WEBSOCKET_CLOSE_NORMAL
    webSocket = null
    _isConnected.value = false
}
```
Sends a clean close frame with code 1000 (normal closure).
Heartbeat Mechanism
Two layers of keep-alive exist:
| Layer | Mechanism | Interval | Purpose |
|---|---|---|---|
| OkHttp | pingInterval on client builder | 30s | Transport-level ping/pong (RFC 6455 control frames) |
| Application | Coroutine sending "ping" text | 30s | Application-level heartbeat, server responds with "pong" |
```kotlin
private fun startPingTimer() {
    pingJob = scope.launch {
        while (isActive) {
            delay(30_000L) // PING_INTERVAL_MS
            webSocket?.send("ping")
        }
    }
}
```
The handleMessage method filters out "pong" responses before JSON parsing:
```kotlin
if (text == "pong") return
```
Reconnection Strategy
On connection failure (onFailure), the manager schedules a reconnect after a fixed delay:
```kotlin
private fun scheduleReconnect() {
    reconnectJob = scope.launch {
        delay(5_000L) // RECONNECT_DELAY_MS
        connect()
    }
}
```
| Parameter | Value |
|---|---|
| RECONNECT_DELAY_MS | 5,000 ms (5 seconds) |
The reconnect job is cancelled if disconnect() is called or a new connect() begins. On reconnect, connect() calls disconnect() internally first, ensuring no duplicate connections.
Connection State
Two StateFlow properties expose connection state to observers:
```kotlin
val isConnected: StateFlow<Boolean>     // true after onOpen, false on close/failure
val connectionError: StateFlow<String?> // last error (onFailure message or "Not authenticated"), null when healthy
```
Event Types & SharedFlow Channels
Every event type gets its own MutableSharedFlow with a buffer, exposed as a read-only SharedFlow. The buffer ensures events are not lost if collectors are briefly suspended.
| Channel | Type | Buffer | Server Event |
|---|---|---|---|
| positionUpdates | Position | 64 | position |
| deviceStatusUpdates | Pair<Int, String> | 64 | device_status |
| eventUpdates | Pair<Int, String> | 64 | event |
| geofenceCreated | Geofence | 16 | geofence_created |
| geofenceUpdated | Geofence | 16 | geofence_updated |
| geofenceDeleted | Int | 16 | geofence_deleted |
| notificationReceived | NotificationHistoryItem | 16 | notification |
| deviceSettingsChanged | Int | 16 | device_settings_changed |
| shareRevoked | Int | 16 | share_revoked |
| shareInviteReceived | ShareInviteReceivedEvent | 16 | share_invite_received |
| shareAccepted | ShareAcceptedEvent | 16 | share_accepted |
The position, device status, and event channels use the larger buffer (64). Positions in particular are the most frequent events: GPS devices may report every few seconds, across many devices.
Message Format
All WebSocket messages from the server follow this envelope:
```json
{
  "type": "<event_type>",
  "data": { ... }
}
```
Example (position update):
```json
{
  "type": "position",
  "data": {
    "id": 12345,
    "deviceId": 42,
    "latitude": 47.6062,
    "longitude": -122.3321,
    "altitude": 56.0,
    "speed": 12.5,
    "course": 180.0,
    "accuracy": 8.0,
    "motion": true,
    "ignition": true,
    "batteryLevel": 85.0,
    "rssi": -67,
    "odometer": 15234.5,
    "distance": 120.3,
    "totalDistance": 98234.7,
    "protocol": "osmand",
    "serverTime": "2024-01-15T10:30:00Z",
    "deviceTime": "2024-01-15T10:30:00Z",
    "fixTime": "2024-01-15T10:29:58Z",
    "attributes": { "sat": 12 }
  }
}
```
Example (device status):
```json
{
  "type": "device_status",
  "data": {
    "deviceId": 42,
    "status": "online"
  }
}
```
Example (geofence created):
```json
{
  "type": "geofence_created",
  "data": {
    "id": 7,
    "name": "Home",
    "description": "Home area",
    "type": "circle",
    "center": { "lat": 47.6062, "lng": -122.3321 },
    "radius": 200.0,
    "color": "#FF5722",
    "device_ids": [42, 43],
    "area": null,
    "attributes": {}
  }
}
```
Example (notification):
```json
{
  "type": "notification",
  "data": {
    "id": 501,
    "user_id": 1,
    "title": "Geofence Alert",
    "body": "Device 'Car' entered 'Home'",
    "type": "geofenceEnter",
    "channel": "websocket",
    "sent": true,
    "sent_at": "2024-01-15T10:30:00Z",
    "device_id": 42,
    "event_id": 9001,
    "created_at": "2024-01-15T10:30:00Z"
  }
}
```
Example (share invite received):
```json
{
  "type": "share_invite_received",
  "data": {
    "token": "abc123-invite-token",
    "device_id": 42,
    "device_name": "Family Car",
    "owner_name": "John Doe",
    "owner_email": "john@example.com",
    "expires_at": "2024-02-15T10:30:00Z",
    "permissions": {
      "position": true,
      "events": true,
      "geofences": true,
      "notifications": true,
      "commands": false
    }
  }
}
```
Example (share accepted):
```json
{
  "type": "share_accepted",
  "data": {
    "device_id": 42,
    "device_name": "Family Car",
    "accepted_by_user_id": 5,
    "accepted_by_email": "jane@example.com",
    "permissions": {
      "position": true,
      "events": true,
      "geofences": true,
      "notifications": true,
      "commands": false
    }
  }
}
```
Message Dispatch
The handleMessage method parses the JSON envelope and dispatches to typed handlers:
```kotlin
private fun handleMessage(text: String) {
    if (text == "pong") return // application-level heartbeat reply, not JSON
    try {
        val json = JsonParser.parseString(text).asJsonObject
        val type = json.get("type")?.asString ?: ""
        val data = json.getAsJsonObject("data") ?: return // nothing to dispatch without a payload
        when (type) {
            "position" -> handlePositionUpdate(data)
            "device_status" -> handleDeviceStatus(data)
            "event" -> handleEvent(data)
            "geofence_created" -> handleGeofenceCreated(data)
            "geofence_updated" -> handleGeofenceUpdated(data)
            "geofence_deleted" -> handleGeofenceDeleted(data)
            "notification" -> handleNotification(data)
            "device_settings_changed" -> handleDeviceSettingsChanged(data)
            "share_revoked" -> handleShareRevoked(data)
            "share_invite_received" -> handleShareInviteReceived(data)
            "share_accepted" -> handleShareAccepted(data)
            else -> Log.d(TAG, "Unknown message type: $type")
        }
    } catch (e: Exception) {
        // Malformed JSON or a payload that doesn't match its DTO: log and skip this one
        // message instead of letting the exception escape into OkHttp's listener callback.
        Log.w(TAG, "Failed to handle WebSocket message", e)
    }
}
```
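Handlers for entity-carrying events deserialize the data object with Gson into the matching DTO, map it to a domain entity via WebSocketMapper, and emit it with tryEmit(). The position handler is representative:
```kotlin
private fun handlePositionUpdate(data: JsonObject) {
    val dto = gson.fromJson(data, WsPositionDto::class.java)
    val position = mapper.toDomain(dto)
    // tryEmit() never suspends; it returns false (and the update is lost) only if the buffer is full
    if (!_positionUpdates.tryEmit(position)) {
        Log.w(TAG, "positionUpdates buffer full; dropped a position update")
    }
}
```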
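WebSocketMapper
Location: data/mappers/WebSocketMapper.kt
A @Singleton that converts WebSocket DTOs to domain entities. The WebSocket DTOs exist separately from the HTTP API DTOs because the backend services that send WebSocket events use slightly different field formats; even the message examples above mix camelCase (deviceId in position events) and snake_case (device_ids, user_id in geofence and notification events).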
Mapping Responsibilities
| Method | Payload | Output Entity | Notes |
|---|---|---|---|
| toDomain(WsPositionDto) | Position data | Position | Merges batteryLevel into the attributes map |
| toDomain(WsGeofenceDto) | Geofence data | Geofence | Converts WsCoordinateDto to Coordinate; maps the type string to the GeofenceType enum |
| toDomain(WsNotificationDto) | Notification data | NotificationHistoryItem | Generates a negative ID from the timestamp if id is null |
| toDomain(WsShareInviteReceivedDto) | Share invite | ShareInviteReceivedEvent | Parses expiresAt, falling back to now + 7 days |
| toDomain(WsShareAcceptedDto) | Share accepted | ShareAcceptedEvent | Maps nested WsPermissionsDto to SharePermissions |
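The invite row's expiry fallback is easiest to see in isolation. Below is a minimal stdlib-only sketch, assuming hypothetical InviteDtoSketch and InviteEventSketch types modeled on the share_invite_received example; the injected Clock is also an assumption, added so the "now + 7 days" branch can be checked deterministically. It accepts only strict ISO instants (Instant.parse), which is a simplification.

```kotlin
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.time.ZoneOffset
import java.time.format.DateTimeParseException

// Hypothetical DTO shaped after the share_invite_received payload (JSON keys in comments).
data class InviteDtoSketch(
    val token: String,      // "token"
    val deviceId: Int,      // "device_id"
    val expiresAt: String?  // "expires_at", may be missing or malformed
)

// Hypothetical domain entity; the real ShareInviteReceivedEvent has more fields.
data class InviteEventSketch(val token: String, val deviceId: Int, val expiresAt: Instant)

// Fallback lifetime from the mapping table: now + 7 days.
val INVITE_FALLBACK_TTL: Duration = Duration.ofDays(7)

fun toInviteEvent(dto: InviteDtoSketch, clock: Clock = Clock.systemUTC()): InviteEventSketch {
    val parsed = dto.expiresAt?.let { raw ->
        try {
            Instant.parse(raw)
        } catch (e: DateTimeParseException) {
            null // unparseable: treat like a missing value
        }
    }
    return InviteEventSketch(
        token = dto.token,
        deviceId = dto.deviceId,
        expiresAt = parsed ?: clock.instant().plus(INVITE_FALLBACK_TTL)
    )
}

fun main() {
    val clock = Clock.fixed(Instant.parse("2024-01-15T10:30:00Z"), ZoneOffset.UTC)
    println(toInviteEvent(InviteDtoSketch("abc123-invite-token", 42, "2024-02-15T10:30:00Z"), clock).expiresAt)
    // 2024-02-15T10:30:00Z
    println(toInviteEvent(InviteDtoSketch("abc123-invite-token", 42, null), clock).expiresAt)
    // 2024-01-22T10:30:00Z
}
```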
Timestamp Parsing
Timestamps from the WebSocket use ISO 8601 format. The mapper tries two parsing strategies and falls back to the current time if both fail:
```kotlin
import java.time.Instant
import java.time.format.DateTimeFormatter
import java.time.format.DateTimeParseException

private fun parseInstant(str: String?): Instant {
    if (str.isNullOrBlank()) return Instant.now()
    return try {
        Instant.parse(str) // standard ISO instant, e.g. "2024-01-15T10:30:00Z"
    } catch (e: DateTimeParseException) {
        try {
            DateTimeFormatter.ISO_DATE_TIME.parse(str, Instant::from) // ISO date-time with offset or zone
        } catch (e2: Exception) {
            Instant.now() // fallback: use the current time
        }
    }
}
```
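The fallback chain has a consequence worth seeing concretely: a timestamp without a zone or offset (such as "2024-01-15T10:30:00") is not rejected but silently becomes the current time, because ISO_DATE_TIME parses it yet Instant.from cannot convert it to an instant. The sketch below copies parseInstant with one hypothetical change, an injected Clock, so each branch can be checked deterministically.

```kotlin
import java.time.Clock
import java.time.Instant
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter
import java.time.format.DateTimeParseException

// Same strategy as parseInstant; the Clock parameter is a hypothetical addition for testability.
fun parseInstantSketch(str: String?, clock: Clock = Clock.systemUTC()): Instant {
    if (str.isNullOrBlank()) return clock.instant()
    return try {
        Instant.parse(str)
    } catch (e: DateTimeParseException) {
        try {
            // Accepts ISO date-times carrying an offset and/or a bracketed zone ID;
            // throws if neither is present, because no instant can be derived.
            DateTimeFormatter.ISO_DATE_TIME.parse(str, Instant::from)
        } catch (e2: Exception) {
            clock.instant() // silent fallback: malformed or zone-less input becomes "now"
        }
    }
}

fun main() {
    val clock = Clock.fixed(Instant.parse("2024-06-01T00:00:00Z"), ZoneOffset.UTC)
    println(parseInstantSketch("2024-01-15T10:30:00Z", clock))      // 2024-01-15T10:30:00Z
    println(parseInstantSketch("2024-01-15T10:30:00Z[UTC]", clock)) // 2024-01-15T10:30:00Z (second branch)
    println(parseInstantSketch("2024-01-15T10:30:00", clock))       // 2024-06-01T00:00:00Z (no offset: fallback)
}
```

If silently substituting "now" is undesirable (it can misplace stale positions on a timeline), logging in the final catch would at least make such payloads visible.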
Permissions Mapping
Share-related events include a WsPermissionsDto that maps to SharePermissions with fallback defaults: missing read permissions default to true, and the commands permission defaults to false:
```kotlin
private fun toDomain(dto: WsPermissionsDto?): SharePermissions = SharePermissions(
    position = dto?.position ?: true,
    events = dto?.events ?: true,
    geofences = dto?.geofences ?: true,
    notifications = dto?.notifications ?: true,
    commands = dto?.commands ?: false // commands default to false (restrictive)
)
```
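A quick way to check the policy encoded above: a null DTO and an empty DTO both yield read access with commands disabled, while any value the server sends explicitly, including false, is kept. The stdlib-only sketch below uses hypothetical WsPermissionsDtoSketch and SharePermissionsSketch types that mirror the permission keys in the share payload examples.

```kotlin
// Hypothetical DTO: nullable fields model keys that may be absent from the payload.
data class WsPermissionsDtoSketch(
    val position: Boolean? = null,
    val events: Boolean? = null,
    val geofences: Boolean? = null,
    val notifications: Boolean? = null,
    val commands: Boolean? = null
)

// Hypothetical domain type mirroring SharePermissions.
data class SharePermissionsSketch(
    val position: Boolean,
    val events: Boolean,
    val geofences: Boolean,
    val notifications: Boolean,
    val commands: Boolean
)

// Same defaults as the mapper: absent read permissions -> true, absent commands -> false.
fun toPermissions(dto: WsPermissionsDtoSketch?) = SharePermissionsSketch(
    position = dto?.position ?: true,
    events = dto?.events ?: true,
    geofences = dto?.geofences ?: true,
    notifications = dto?.notifications ?: true,
    commands = dto?.commands ?: false
)

fun main() {
    println(toPermissions(null))
    // SharePermissionsSketch(position=true, events=true, geofences=true, notifications=true, commands=false)
    println(toPermissions(WsPermissionsDtoSketch(position = false, commands = true)))
    // SharePermissionsSketch(position=false, events=true, geofences=true, notifications=true, commands=true)
}
```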
RealTimeRepository
Location: domain/repositories/RealTimeRepository.kt
The domain-layer port (interface) that abstracts real-time communication. The domain layer has no knowledge of WebSockets; it only sees Kotlin Flows.
```kotlin
interface RealTimeRepository {
    val isConnected: StateFlow<Boolean>
    val connectionError: StateFlow<String?>
    val positionUpdates: Flow<Position>
    val deviceStatusUpdates: Flow<Pair<Int, String>>
    val eventUpdates: Flow<Pair<Int, String>>
    val geofenceCreated: Flow<Geofence>
    val geofenceUpdated: Flow<Geofence>
    val geofenceDeleted: Flow<Int>
    val notificationReceived: Flow<NotificationHistoryItem>
    val deviceSettingsChanged: Flow<Int>
    val shareRevoked: Flow<Int>
    val shareInviteReceived: Flow<ShareInviteReceivedEvent>
    val shareAccepted: Flow<ShareAcceptedEvent>
    fun connect()
    fun disconnect()
}
```
RealTimeRepositoryImpl
Location: data/repositories/RealTimeRepositoryImpl.kt
A thin @Singleton adapter that delegates every property and method directly to WebSocketManager:
```kotlin
@Singleton
class RealTimeRepositoryImpl @Inject constructor(
    private val webSocketManager: WebSocketManager
) : RealTimeRepository {
    override val isConnected get() = webSocketManager.isConnected
    override val positionUpdates get() = webSocketManager.positionUpdates
    // ... all other flows delegate identically
    override fun connect() = webSocketManager.connect()
    override fun disconnect() = webSocketManager.disconnect()
}
```
RealTimeDataBridge
Location: core/data/RealTimeDataBridge.kt
The RealTimeDataBridge is the central orchestrator that connects the WebSocket event streams to the in-memory DataStores. It is a @Singleton that creates its own CoroutineScope and launches parallel collectors for every event channel.
Startup Flow
Pending Updates Queue
A race condition exists: WebSocket events may arrive before the initial REST API device load completes. The bridge handles this with CopyOnWriteArrayList queues:
```kotlin
private fun setupPositionUpdates(pendingPositions: CopyOnWriteArrayList<Position>) {
    scope?.launch {
        realTimeRepository.positionUpdates.collect { position ->
            if (deviceDataStore.isInitialized.value) {
                deviceDataStore.updatePosition(position)
            } else {
                pendingPositions.add(position)
            }
        }
    }
}
```
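```kotlin
private fun setupPendingUpdatesProcessor(...) {
    scope?.launch {
        deviceDataStore.isInitialized.first { it } // suspends until true
        // Remove entries as they are applied: forEach followed by clear() would silently drop
        // anything a collector appended while the snapshot was being iterated.
        while (pendingPositions.isNotEmpty()) {
            deviceDataStore.updatePosition(pendingPositions.removeAt(0))
        }
        while (pendingStatusUpdates.isNotEmpty()) {
            val (id, status) = pendingStatusUpdates.removeAt(0)
            deviceDataStore.updateDeviceStatus(id, status)
        }
    }
}
```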
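One window can remain in the queue-then-drain pattern above. If the collectors and the processor run concurrently (for example on a multi-threaded dispatcher), a collector can read isInitialized as false just before it flips; if the processor has already drained the queue by the time that position is appended, it stays queued and is never applied. A position applied directly after initialization can also interleave with the drain, letting an older queued position land after a newer one. The stdlib-only sketch below closes both gaps by making the flag check, the buffering, and the drain one critical section. PendingUpdateGate is a hypothetical name, not the bridge's code; inside coroutines, kotlinx.coroutines.sync.Mutex with withLock would be the suspending counterpart of synchronized.

```kotlin
// Hypothetical gate: buffers updates until initialization, then applies them in arrival order.
class PendingUpdateGate<T>(private val apply: (T) -> Unit) {
    private val lock = Any()
    private var initialized = false
    private val pending = ArrayList<T>()

    // Collector side: apply immediately once initialized, otherwise buffer.
    fun offer(update: T) {
        synchronized(lock) {
            if (initialized) apply(update) else pending.add(update)
        }
    }

    // Processor side: flip the flag and drain under the same lock, so no offer
    // can slip in between the check and the drain.
    fun markInitialized() {
        synchronized(lock) {
            if (initialized) return
            initialized = true
            pending.forEach(apply)
            pending.clear()
        }
    }
}

fun main() {
    val applied = mutableListOf<String>()
    val gate = PendingUpdateGate<String> { applied.add(it) }
    gate.offer("pos-1")
    gate.offer("pos-2")
    gate.markInitialized()
    gate.offer("pos-3")
    println(applied) // [pos-1, pos-2, pos-3]
}
```

The trade-off is that apply runs while the lock is held, so a slow DataStore write briefly blocks other offers; for in-memory DataStores that cost is usually small.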
DataStore Routing
Each event type routes to its corresponding DataStore:
| Event | DataStore | Method |
|---|---|---|
| Position update | DeviceDataStore | updatePosition(position) |
| Device status | DeviceDataStore | updateDeviceStatus(deviceId, status) |
| Geofence created | GeofenceDataStore | addGeofence(geofence) |
| Geofence updated | GeofenceDataStore | updateGeofence(geofence) |
| Geofence deleted | GeofenceDataStore | removeGeofence(geofenceId) |
| Notification | NotificationDataStore | addNotification(notification, emitEvent = true) |
| Device settings changed | DeviceDataStore | updateDevice(device), after re-fetching the device from the REST API via DeviceRepository |
| Share revoked | DeviceDataStore + GeofenceDataStore | removeDevice(deviceId) + invalidateDevice(deviceId) |
| Share invite received | InviteDataStore | addInvite(event) |
| Share accepted | SharingDataStore | onShareAccepted(event) |
Notable: device_settings_changed doesn't carry the full device payload, so the bridge re-fetches the device from the REST API via lazyDeviceRepository.get().getDevice(deviceId) and then updates the DataStore. The Lazy<DeviceRepository> wrapper avoids a circular dependency in Hilt.
Lifecycle Integration
- Start: Called from DevicesViewModel after the initial device load succeeds. This ensures the DataStores have baseline data before WebSocket events start flowing.
- Stop: Called from AuthRepositoryImpl.logout() before clearing DataStores and calling the logout API.
- Reconnect: reconnect() can be called after claiming a device or accepting a share invite. The server needs to know about the new device permissions, so the WebSocket is disconnected and reconnected.
- Background: The WebSocket stays connected when the app goes to the background; there is no explicit disconnect in onStop.
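```kotlin
fun stop() {
    if (!isRunning) return
    realTimeRepository.disconnect()
    scope?.cancel() // cancels every collector launched in this scope
    scope = null
    isRunning = false
}

fun reconnect() {
    // connect() already calls disconnect() first; the explicit call here only makes the intent clear
    realTimeRepository.disconnect()
    realTimeRepository.connect()
}
```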
Real-Time Use Cases
Location: domain/usecases/realtime/
Thin use case wrappers that expose individual real-time flows from RealTimeRepository:
| Use Case | Returns |
|---|---|
| ObservePositionUpdatesUseCase | Flow<Position> |
| ObserveDeviceStatusUpdatesUseCase | Flow<Pair<Int, String>> |
| ObserveEventUpdatesUseCase | Flow<Pair<Int, String>> |
| ObserveConnectionStateUseCase | StateFlow<Boolean> |
| ManageConnectionUseCase | connect() / disconnect() methods |
RealTimeDevicesInteractor
A facade that bundles the most commonly used real-time use cases for ViewModels:
```kotlin
class RealTimeDevicesInteractor @Inject constructor(
    private val observePositionUpdatesUseCase: ObservePositionUpdatesUseCase,
    private val observeDeviceStatusUpdatesUseCase: ObserveDeviceStatusUpdatesUseCase,
    private val observeConnectionStateUseCase: ObserveConnectionStateUseCase,
    private val manageConnectionUseCase: ManageConnectionUseCase
) {
    fun observePositionUpdates(): Flow<Position> = observePositionUpdatesUseCase()
    fun observeDeviceStatusUpdates(): Flow<Pair<Int, String>> = observeDeviceStatusUpdatesUseCase()
    fun observeConnectionState(): StateFlow<Boolean> = observeConnectionStateUseCase()
    fun connect() { manageConnectionUseCase.connect() }
    fun disconnect() { manageConnectionUseCase.disconnect() }
}
```
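End-to-End Flow Example
A GPS device sends a position to the server. Here is what happens in the app:
1. The server pushes a message with type "position" over the open WebSocket.
2. OkHttp delivers the text frame to WebSocketManager; handleMessage skips "pong" replies, reads the envelope type, and calls handlePositionUpdate.
3. Gson deserializes data into a WsPositionDto, and WebSocketMapper converts it to a Position, merging batteryLevel into the attributes map.
4. tryEmit() publishes the Position on positionUpdates, which RealTimeRepositoryImpl exposes unchanged as a Flow.
5. RealTimeDataBridge's position collector calls deviceDataStore.updatePosition(position), or queues the position if the DeviceDataStore is not yet initialized.
6. ViewModels observing the DeviceDataStore pick up the new state, and Compose recomposes the affected UI.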
Design Decisions
Why WebSocket over polling
GPS tracking demands real-time updates: a device's position can change every few seconds. HTTP polling would require frequent requests, wasting bandwidth and battery. WebSocket provides:
- True real-time: Updates are pushed as soon as the server has them, with no polling interval adding latency
- Bandwidth efficiency: No repeated HTTP headers; a single persistent TCP connection
- Server-initiated push: The server sends data when it has it, not when the client asks
- Multiplexed events: Position, status, geofence, notification, and sharing events all flow over a single connection
Why OkHttp WebSocket over Socket.IO / Scarlet
- Consistency: The app already uses OkHttp for all HTTP communication (Retrofit), so the WebSocket is built with the same library and the same builder-style configuration. It does use its own OkHttpClient instance, though, so it does not share the REST client's connection pool or timeouts.
- Simplicity: The server protocol is a simple JSON envelope ({"type": "...", "data": {...}}). There is no need for Socket.IO's room/namespace abstraction or Scarlet's annotation-based API.
- No additional dependencies: OkHttp's WebSocket support comes with the existing OkHttp dependency. Socket.IO and Scarlet would each add transitive dependencies and complexity.
- Full control: Manual connect()/disconnect()/scheduleReconnect() gives explicit control over the connection lifecycle, making it easier to reason about and debug.
Why SharedFlow with buffer
```kotlin
private val _positionUpdates = MutableSharedFlow<Position>(extraBufferCapacity = 64)
```
- Fewer lost events: extraBufferCapacity allows tryEmit() to succeed even when collectors are temporarily suspended (e.g., during DataStore writes). Without a buffer, tryEmit() would drop events.
- Backpressure handling: The buffer absorbs bursts of rapid events (e.g., many devices reporting simultaneously). The position, device status, and event channels use 64 slots; less frequent events use 16.
- No replay: replay = 0 (the default) means late subscribers don't receive stale data; they only see events emitted after they start collecting. Conversely, an event emitted while no collector is subscribed is discarded rather than buffered.
- tryEmit() over emit(): tryEmit() is non-suspending and safe to call from OkHttp's WebSocket callback thread. It returns false only if the buffer is full, which is unlikely with the configured sizes; in that case the event is dropped.
Why RealTimeDataBridge as intermediary
- Separation of concerns: WebSocketManager handles the connection; DataStores handle state. The bridge is the only component that wires the real-time streams to the DataStores.
- DataStore consistency: The bridge routes events to DataStores only after initialization, using the pending queue pattern to handle events that arrive before the initial load completes.
- Centralized event routing: All WebSocket-to-DataStore wiring lives in one place, making it easy to add new event types or change routing logic.
- Lifecycle control: start()/stop()/reconnect() provide clear lifecycle hooks for the rest of the app (ViewModel, auth) without leaking WebSocket details.
- Testability: The bridge depends on RealTimeRepository (an interface), not WebSocketManager (a concrete class). Tests can provide a fake repository that emits controlled events.