Real-Time WebSocket Architecture

This document covers the real-time communication layer of the Visla GPS Android app: how the client maintains a persistent WebSocket connection to the server, receives live updates (positions, device status, geofences, notifications, sharing events), and bridges those updates into the reactive DataStore layer for UI consumption.

Architecture Overview

Data flow: Server pushes JSON → OkHttp WebSocket receives it → WebSocketManager parses the event type → WebSocketMapper converts DTOs to domain entities → emitted on SharedFlow channels → RealTimeRepositoryImpl exposes them as Flow → RealTimeDataBridge collects and writes to DataStores → ViewModels observe DataStores via Compose state.

WebSocketManager

Location: data/network/WebSocketManager.kt

The WebSocketManager is a @Singleton that owns the OkHttp WebSocket connection, manages the full connection lifecycle, and exposes typed SharedFlow channels for every event type the server can push.

OkHttp Client Configuration

private val client = OkHttpClient.Builder()
    .readTimeout(NetworkConstants.DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS)  // 30s
    .pingInterval(NetworkConstants.DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS) // 30s
    .build()

This OkHttp client is separate from the one used for REST API calls. It configures:

Parameter | Value | Purpose
readTimeout | 30 seconds | Maximum time to wait for data before considering the connection dead
pingInterval | 30 seconds | Interval for OkHttp's WebSocket ping frames; if no pong has arrived by the time the next ping is due, OkHttp fails the connection and onFailure is called

Connection Lifecycle

Authentication

The WebSocket authenticates by appending the access (Bearer) token to the connection URL as a token query parameter, rather than sending an Authorization header:

val url = "$baseUrl/api/websocket?token=$token"

The token is read from TokenManager.accessToken. If no token is available, connect() returns without opening a connection and sets connectionError to "Not authenticated".

Connect

fun connect() {
    val token = tokenManager.accessToken
    if (token == null) {
        _connectionError.value = "Not authenticated" // guard: surface the reason, don't connect
        return
    }
    disconnect() // clean up any existing connection and pending reconnect
    val request = Request.Builder().url("$baseUrl/api/websocket?token=$token").build()
    webSocket = client.newWebSocket(request, listener)
}
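The token guard and URL construction can be isolated in a pure function, which makes them unit-testable without OkHttp. This is a minimal sketch: ConnectAttempt and prepareConnect are hypothetical names, and percent-encoding the token with URLEncoder is an assumption (the code above interpolates the token as-is, which only works for URL-safe tokens such as JWTs).

```kotlin
import java.net.URLEncoder

// Hypothetical result type: either a URL to open or the reason for refusing to connect.
sealed class ConnectAttempt {
    data class Ready(val url: String) : ConnectAttempt()
    data class Refused(val error: String) : ConnectAttempt()
}

// Hypothetical pure helper mirroring the guard at the top of connect().
// Assumption: the token is percent-encoded so characters such as '+' or '='
// survive as a query-parameter value.
fun prepareConnect(baseUrl: String, accessToken: String?): ConnectAttempt {
    if (accessToken == null) return ConnectAttempt.Refused("Not authenticated")
    val encoded = URLEncoder.encode(accessToken, "UTF-8")
    return ConnectAttempt.Ready("${baseUrl.trimEnd('/')}/api/websocket?token=$encoded")
}
```

In connect(), a Refused result would set _connectionError and return, while a Ready result would be passed to Request.Builder().url(...).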

On successful connection (onOpen):

  1. _isConnected StateFlow is set to true
  2. _connectionError is cleared
  3. Application-level ping timer starts

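Disconnect

fun disconnect() {
    reconnectJob?.cancel() // drop any pending reconnect
    stopPingTimer()        // stop the application-level heartbeat
    webSocket?.close(1000, "Client disconnect") // 1000 = WEBSOCKET_CLOSE_NORMAL
    webSocket = null
    _isConnected.value = false
}

disconnect() cancels any pending reconnect, stops the ping timer, and sends a clean close frame with code 1000 (normal closure).

Heartbeat Mechanism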

Two layers of keep-alive exist:

Layer | Mechanism | Interval | Purpose
OkHttp | pingInterval on the client builder | 30s | Transport-level ping/pong (RFC 6455 control frames)
Application | Coroutine sending a "ping" text message | 30s | Application-level heartbeat; the server responds with "pong"

The application-level timer is started in onOpen:

private fun startPingTimer() {
    pingJob = scope.launch {
        while (isActive) {
            delay(30_000L) // PING_INTERVAL_MS
            webSocket?.send("ping")
        }
    }
}

The handleMessage method filters out "pong" responses before JSON parsing:

if (text == "pong") return

Reconnection Strategy

On connection failure (onFailure), the manager schedules a reconnect after a fixed delay:

private fun scheduleReconnect() {
    reconnectJob = scope.launch {
        delay(5_000L) // RECONNECT_DELAY_MS
        connect()
    }
}

Parameter | Value
RECONNECT_DELAY_MS | 5,000 ms (5 seconds)

The reconnect job is cancelled if disconnect() is called or a new connect() begins. On reconnect, connect() calls disconnect() internally first, ensuring no duplicate connections.
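The cancellation rule is what keeps reconnects from piling up. The following dependency-free sketch shows the same rule, with java.util.concurrent's ScheduledExecutorService standing in for the coroutine scope. ReconnectScheduler, scheduleReconnect() and cancelPending() are hypothetical names, not the app's API.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for the reconnectJob handling in WebSocketManager.
class ReconnectScheduler(
    private val delayMs: Long,
    private val connect: () -> Unit,
) {
    private val executor: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()
    private var pending: ScheduledFuture<*>? = null

    // Like scheduleReconnect(): replaces any earlier pending attempt, so at most one is queued.
    @Synchronized
    fun scheduleReconnect() {
        pending?.cancel(false)
        pending = executor.schedule(Runnable { connect() }, delayMs, TimeUnit.MILLISECONDS)
    }

    // Like reconnectJob?.cancel() at the top of disconnect().
    @Synchronized
    fun cancelPending() {
        pending?.cancel(false)
        pending = null
    }

    fun shutdown() {
        executor.shutdownNow()
    }
}
```

Calling cancelPending() before the delay elapses means connect() never runs, and scheduling twice in a row yields a single attempt.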

Connection State

Two StateFlow properties expose connection state to observers:

val isConnected: StateFlow<Boolean>     // true after onOpen, false after close or failure
val connectionError: StateFlow<String?> // error from onFailure (or "Not authenticated"), null when healthy

Event Types & SharedFlow Channels

Every event type gets its own MutableSharedFlow with a buffer, exposed as a read-only SharedFlow. The buffer lets tryEmit() succeed while collectors are briefly suspended. An event is still dropped if the buffer is full, or if no collector is subscribed at the moment it is emitted (replay is 0).

Channel | Type | Buffer | Server event
positionUpdates | Position | 64 | position
deviceStatusUpdates | Pair<Int, String> | 64 | device_status
eventUpdates | Pair<Int, String> | 64 | event
geofenceCreated | Geofence | 16 | geofence_created
geofenceUpdated | Geofence | 16 | geofence_updated
geofenceDeleted | Int | 16 | geofence_deleted
notificationReceived | NotificationHistoryItem | 16 | notification
deviceSettingsChanged | Int | 16 | device_settings_changed
shareRevoked | Int | 16 | share_revoked
shareInviteReceived | ShareInviteReceivedEvent | 16 | share_invite_received
shareAccepted | ShareAcceptedEvent | 16 | share_accepted

The position, device status, and event channels use the larger buffer (64) because they carry the most frequent events: GPS devices may report positions every few seconds, across many devices at once.
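Whether 64 slots are enough depends on fleet size and report rate. Here is a back-of-the-envelope helper: with n devices each reporting every t seconds, events arrive at n / t per second, so a buffer of b slots covers a collector stall of roughly b · t / n seconds. The function name and the fleet numbers used below are hypothetical and serve only to illustrate the arithmetic.

```kotlin
// Hypothetical estimate: how long a collector can stall before a buffer of
// `bufferSlots` fills, given `devices` each reporting every `reportIntervalSeconds`.
// Assumes reports are spread evenly over time (bursts shrink the margin).
fun bufferHeadroomSeconds(bufferSlots: Int, devices: Int, reportIntervalSeconds: Double): Double {
    require(devices > 0 && reportIntervalSeconds > 0) { "need a positive event rate" }
    val eventsPerSecond = devices / reportIntervalSeconds
    return bufferSlots / eventsPerSecond
}
```

For example, 100 devices reporting every 5 s produce 20 events/s, so 64 slots cover about 3.2 s of collector stall. At 1,000 devices, the same buffer covers about 0.32 s.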

Message Format

All WebSocket messages from the server follow this envelope:

{
  "type": "<event_type>",
  "data": { ... }
}

Example (position update):

{
  "type": "position",
  "data": {
    "id": 12345,
    "deviceId": 42,
    "latitude": 47.6062,
    "longitude": -122.3321,
    "altitude": 56.0,
    "speed": 12.5,
    "course": 180.0,
    "accuracy": 8.0,
    "motion": true,
    "ignition": true,
    "batteryLevel": 85.0,
    "rssi": -67,
    "odometer": 15234.5,
    "distance": 120.3,
    "totalDistance": 98234.7,
    "protocol": "osmand",
    "serverTime": "2024-01-15T10:30:00Z",
    "deviceTime": "2024-01-15T10:30:00Z",
    "fixTime": "2024-01-15T10:29:58Z",
    "attributes": { "sat": 12 }
  }
}

Example (device status):

{
  "type": "device_status",
  "data": {
    "deviceId": 42,
    "status": "online"
  }
}

Example (geofence created):

{
  "type": "geofence_created",
  "data": {
    "id": 7,
    "name": "Home",
    "description": "Home area",
    "type": "circle",
    "center": { "lat": 47.6062, "lng": -122.3321 },
    "radius": 200.0,
    "color": "#FF5722",
    "device_ids": [42, 43],
    "area": null,
    "attributes": {}
  }
}

Example (notification):

{
  "type": "notification",
  "data": {
    "id": 501,
    "user_id": 1,
    "title": "Geofence Alert",
    "body": "Device 'Car' entered 'Home'",
    "type": "geofenceEnter",
    "channel": "websocket",
    "sent": true,
    "sent_at": "2024-01-15T10:30:00Z",
    "device_id": 42,
    "event_id": 9001,
    "created_at": "2024-01-15T10:30:00Z"
  }
}

Example (share invite received):

{
  "type": "share_invite_received",
  "data": {
    "token": "abc123-invite-token",
    "device_id": 42,
    "device_name": "Family Car",
    "owner_name": "John Doe",
    "owner_email": "john@example.com",
    "expires_at": "2024-02-15T10:30:00Z",
    "permissions": {
      "position": true,
      "events": true,
      "geofences": true,
      "notifications": true,
      "commands": false
    }
  }
}

Example (share accepted):

{
  "type": "share_accepted",
  "data": {
    "device_id": 42,
    "device_name": "Family Car",
    "accepted_by_user_id": 5,
    "accepted_by_email": "jane@example.com",
    "permissions": {
      "position": true,
      "events": true,
      "geofences": true,
      "notifications": true,
      "commands": false
    }
  }
}

Message Dispatch

The handleMessage method parses the JSON envelope and dispatches to typed handlers:

private fun handleMessage(text: String) {
    if (text == "pong") return // heartbeat reply, not a JSON envelope

    try {
        val json = JsonParser.parseString(text).asJsonObject
        val type = json.get("type")?.asString ?: ""
        val data = json.getAsJsonObject("data") ?: return // envelope without a data object

        when (type) {
            "position" -> handlePositionUpdate(data)
            "device_status" -> handleDeviceStatus(data)
            "event" -> handleEvent(data)
            "geofence_created" -> handleGeofenceCreated(data)
            "geofence_updated" -> handleGeofenceUpdated(data)
            "geofence_deleted" -> handleGeofenceDeleted(data)
            "notification" -> handleNotification(data)
            "device_settings_changed" -> handleDeviceSettingsChanged(data)
            "share_revoked" -> handleShareRevoked(data)
            "share_invite_received" -> handleShareInviteReceived(data)
            "share_accepted" -> handleShareAccepted(data)
            else -> Log.d(TAG, "Unknown message type: $type")
        }
    } catch (e: Exception) {
        // Malformed JSON or an unexpected payload shape: log and drop this one message
        // rather than letting the exception escape the OkHttp listener callback.
        Log.w(TAG, "Failed to handle WebSocket message", e)
    }
}

Each handler deserializes the data object using Gson into the appropriate DTO, maps it to a domain entity via WebSocketMapper, then emits it with tryEmit():

private fun handlePositionUpdate(data: JsonObject) {
    val dto = gson.fromJson(data, WsPositionDto::class.java) // JSON → DTO
    val position = mapper.toDomain(dto)                      // DTO → domain entity
    _positionUpdates.tryEmit(position)                       // non-suspending publish
}
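The wire names appear twice in this section: once in the channel table and once in the when block of handleMessage(). One way to keep them in a single place is an enum keyed by wire name. This is a hypothetical alternative, not how WebSocketManager is written, and WsEventType and fromWire are illustrative names.

```kotlin
// Hypothetical: one constant per server event type, keyed by its wire name.
enum class WsEventType(val wire: String) {
    POSITION("position"),
    DEVICE_STATUS("device_status"),
    EVENT("event"),
    GEOFENCE_CREATED("geofence_created"),
    GEOFENCE_UPDATED("geofence_updated"),
    GEOFENCE_DELETED("geofence_deleted"),
    NOTIFICATION("notification"),
    DEVICE_SETTINGS_CHANGED("device_settings_changed"),
    SHARE_REVOKED("share_revoked"),
    SHARE_INVITE_RECEIVED("share_invite_received"),
    SHARE_ACCEPTED("share_accepted");

    companion object {
        private val byWire: Map<String, WsEventType> = values().associateBy { it.wire }

        // Returns null for unknown or missing types; the caller logs them like the else branch.
        fun fromWire(type: String?): WsEventType? = type?.let { byWire[it] }
    }
}
```

Dispatch would then switch on WsEventType.fromWire(type) with a null branch for unknown types. If that when is written as an expression, the compiler reports any event type that has no branch.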

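WebSocketMapper

Location: data/mappers/WebSocketMapper.kt

A @Singleton that converts WebSocket DTOs to domain entities. The WebSocket DTOs are kept separate from the HTTP API DTOs because the backend services that send WebSocket events use slightly different field formats. The WebSocket payloads themselves are not uniform either: position and device_status use camelCase keys (deviceId), while the geofence, notification, and sharing events use snake_case (device_id, user_id).

Mapping Responsibilities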

Method | Input DTO | Output entity | Notes
toDomain(WsPositionDto) | Position data | Position | Merges batteryLevel into the attributes map
toDomain(WsGeofenceDto) | Geofence data | Geofence | Converts WsCoordinateDto to Coordinate; maps the type string to the GeofenceType enum
toDomain(WsNotificationDto) | Notification data | NotificationHistoryItem | Generates a negative ID from the timestamp if id is null
toDomain(WsShareInviteReceivedDto) | Share invite | ShareInviteReceivedEvent | Parses expiresAt, falling back to now + 7 days
toDomain(WsShareAcceptedDto) | Share accepted | ShareAcceptedEvent | Maps the nested WsPermissionsDto to SharePermissions
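The three fallback rules in the notes column can be sketched with plain java.time and collections. The helper names, the "batteryLevel" attribute key, and the exact negative-ID formula (negated epoch seconds) are all assumptions for illustration. The table only states that the ID is negative and derived from the timestamp.

```kotlin
import java.time.Instant
import java.time.temporal.ChronoUnit

// Hypothetical: fold the top-level battery reading into the attributes map.
// Assumption: the attribute key is "batteryLevel".
fun mergeBattery(attributes: Map<String, Any?>?, batteryLevel: Double?): Map<String, Any?> {
    val base = attributes ?: emptyMap()
    return if (batteryLevel == null) base else base + ("batteryLevel" to batteryLevel)
}

// Hypothetical: keep the server ID, otherwise derive a negative one from the timestamp.
// Assuming server IDs are positive, fallback IDs cannot collide with them,
// but two fallbacks created within the same second would collide with each other.
fun notificationIdOrFallback(id: Int?, createdAt: Instant): Int =
    id ?: -(createdAt.epochSecond % Int.MAX_VALUE).toInt()

// Hypothetical: parse expiresAt, falling back to now + 7 days when absent or malformed.
fun expiresAtOrDefault(raw: String?, now: Instant): Instant =
    raw?.let { runCatching { Instant.parse(it) }.getOrNull() } ?: now.plus(7, ChronoUnit.DAYS)
```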

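Timestamp Parsing

Timestamps from the WebSocket use ISO 8601 format. The mapper tries two parsing strategies and falls back to the current time if both fail. The second strategy accepts ISO date-times that carry an offset or zone ID (for example 2024-01-15T12:30:00+02:00). A date-time with no offset at all cannot be converted to an Instant, so it also ends up at Instant.now():

private fun parseInstant(str: String?): Instant {
    if (str.isNullOrBlank()) return Instant.now()
    return try {
        Instant.parse(str) // strategy 1: ISO instant, e.g. "2024-01-15T10:30:00Z"
    } catch (e: DateTimeParseException) {
        try {
            DateTimeFormatter.ISO_DATE_TIME.parse(str, Instant::from) // strategy 2: offset or zone
        } catch (e2: Exception) {
            Instant.now() // fallback: use the current time
        }
    }
}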
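Because parseInstant() reads the system clock on its fallback paths, its branches are hard to assert on in tests. A variant that takes now as a parameter makes every branch observable. The signature change is hypothetical and is not the app's code.

```kotlin
import java.time.Instant
import java.time.format.DateTimeFormatter
import java.time.format.DateTimeParseException
import java.time.temporal.TemporalQuery

// Hypothetical testable variant of parseInstant(): same two strategies, but the
// fallback time is passed in instead of read from the system clock.
fun parseInstant(str: String?, now: Instant): Instant {
    if (str.isNullOrBlank()) return now
    return try {
        Instant.parse(str) // ISO instant, e.g. "2024-01-15T10:30:00Z"
    } catch (e: DateTimeParseException) {
        try {
            // ISO date-time carrying an offset or zone ID
            DateTimeFormatter.ISO_DATE_TIME.parse(str, TemporalQuery { Instant.from(it) })
        } catch (e2: Exception) {
            now // no usable offset, or not a date-time at all
        }
    }
}
```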

Permissions Mapping

Share-related events include a WsPermissionsDto that maps to SharePermissions with safe defaults:

private fun toDomain(dto: WsPermissionsDto?): SharePermissions = SharePermissions(
    position = dto?.position ?: true,
    events = dto?.events ?: true,
    geofences = dto?.geofences ?: true,
    notifications = dto?.notifications ?: true,
    commands = dto?.commands ?: false // commands default to false (restrictive)
)
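The defaults apply field by field, so a payload that omits a single key still gets a safe value for that key. Below is a self-contained sketch. The DTO and entity shapes are assumptions inferred from the "permissions" objects in the JSON examples. The DTO fields are declared Boolean? so that a missing key can be told apart from an explicit false.

```kotlin
// Assumed shapes, inferred from the "permissions" object in the share payloads.
data class WsPermissionsDto(
    val position: Boolean? = null,
    val events: Boolean? = null,
    val geofences: Boolean? = null,
    val notifications: Boolean? = null,
    val commands: Boolean? = null,
)

data class SharePermissions(
    val position: Boolean,
    val events: Boolean,
    val geofences: Boolean,
    val notifications: Boolean,
    val commands: Boolean,
)

// Same rule as the mapper: missing data-access flags default to allowed, commands to denied.
fun toDomain(dto: WsPermissionsDto?): SharePermissions = SharePermissions(
    position = dto?.position ?: true,
    events = dto?.events ?: true,
    geofences = dto?.geofences ?: true,
    notifications = dto?.notifications ?: true,
    commands = dto?.commands ?: false,
)
```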

RealTimeRepository

Location: domain/repositories/RealTimeRepository.kt

The domain-layer port (interface) that abstracts real-time communication. The domain layer has no knowledge of WebSockets; it only sees Kotlin Flows.

interface RealTimeRepository {
    val isConnected: StateFlow<Boolean>
    val connectionError: StateFlow<String?>
    val positionUpdates: Flow<Position>
    val deviceStatusUpdates: Flow<Pair<Int, String>>
    val eventUpdates: Flow<Pair<Int, String>>
    val geofenceCreated: Flow<Geofence>
    val geofenceUpdated: Flow<Geofence>
    val geofenceDeleted: Flow<Int>
    val notificationReceived: Flow<NotificationHistoryItem>
    val deviceSettingsChanged: Flow<Int>
    val shareRevoked: Flow<Int>
    val shareInviteReceived: Flow<ShareInviteReceivedEvent>
    val shareAccepted: Flow<ShareAcceptedEvent>

    fun connect()
    fun disconnect()
}

RealTimeRepositoryImpl

Location: data/repositories/RealTimeRepositoryImpl.kt

A thin @Singleton adapter that delegates every property and method directly to WebSocketManager:

@Singleton
class RealTimeRepositoryImpl @Inject constructor(
    private val webSocketManager: WebSocketManager
) : RealTimeRepository {
    override val isConnected get() = webSocketManager.isConnected
    override val positionUpdates get() = webSocketManager.positionUpdates
    // ... all other flows delegate identically
    override fun connect() = webSocketManager.connect()
    override fun disconnect() = webSocketManager.disconnect()
}

RealTimeDataBridge

Location: core/data/RealTimeDataBridge.kt

The RealTimeDataBridge is the central orchestrator that connects the WebSocket event streams to the in-memory DataStores. It is a @Singleton that creates its own CoroutineScope and launches parallel collectors for every event channel.

Startup Flow

Pending Updates Queue

WebSocket events can arrive before the initial REST API device load has completed, that is, while deviceDataStore.isInitialized is still false. To avoid losing them, the bridge parks early updates in CopyOnWriteArrayList queues and replays them once the store is initialized:

private fun setupPositionUpdates(pendingPositions: CopyOnWriteArrayList<Position>) {
    scope?.launch {
        realTimeRepository.positionUpdates.collect { position ->
            if (deviceDataStore.isInitialized.value) {
                deviceDataStore.updatePosition(position)
            } else {
                pendingPositions.add(position) // store not ready yet: park the update
            }
        }
    }
}

private fun setupPendingUpdatesProcessor(...) {
    scope?.launch {
        deviceDataStore.isInitialized.first { it } // suspends until true
        pendingPositions.forEach { deviceDataStore.updatePosition(it) }
        pendingPositions.clear()
        pendingStatusUpdates.forEach { (id, status) -> deviceDataStore.updateDeviceStatus(id, status) }
        pendingStatusUpdates.clear()
    }
}
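Depending on how the collectors and the processor are scheduled (for example, if they run concurrently on Dispatchers.Default), forEach followed by clear() leaves two gaps. First, an update added between the two calls is cleared without being applied. Second, a live update applied right after isInitialized flips can be overwritten by an older parked one. Making "check the flag, then apply or park" and "drain, then set the flag" atomic under one lock closes both gaps. The following stdlib-only sketch shows this; PendingUpdateGate and its methods are hypothetical names, not part of the app.

```kotlin
import java.util.ArrayDeque

// Hypothetical gate: buffers updates until markInitialized(), then applies
// them in arrival order. One lock makes "check + apply/park" and
// "drain + flip flag" mutually exclusive, so nothing is lost or reordered.
class PendingUpdateGate<T : Any>(private val apply: (T) -> Unit) {
    private val lock = Any()
    private val pending = ArrayDeque<T>()
    private var initialized = false

    // Called for every incoming update (e.g. from a Flow collector).
    fun offer(update: T) {
        synchronized(lock) {
            if (initialized) apply(update) else pending.addLast(update)
        }
    }

    // Called once the store reports it is initialized; later calls are no-ops.
    fun markInitialized() {
        synchronized(lock) {
            if (initialized) return
            while (pending.isNotEmpty()) apply(pending.removeFirst())
            initialized = true
        }
    }
}
```

In the bridge, each collector would call offer() and the processor would call markInitialized() after isInitialized.first { it }. Applying updates while holding the lock keeps the sketch simple, but it assumes the DataStore updates are fast and non-suspending.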

DataStore Routing

Each event type routes to its corresponding DataStore:

Event | DataStore | Method
Position update | DeviceDataStore | updatePosition(position)
Device status | DeviceDataStore | updateDeviceStatus(deviceId, status)
Geofence created | GeofenceDataStore | addGeofence(geofence)
Geofence updated | GeofenceDataStore | updateGeofence(geofence)
Geofence deleted | GeofenceDataStore | removeGeofence(geofenceId)
Notification | NotificationDataStore | addNotification(notification, emitEvent = true)
Device settings changed | DeviceDataStore | updateDevice(device), after re-fetching the device from the REST API via DeviceRepository
Share revoked | DeviceDataStore + GeofenceDataStore | removeDevice(deviceId) + invalidateDevice(deviceId)
Share invite received | InviteDataStore | addInvite(event)
Share accepted | SharingDataStore | onShareAccepted(event)

Note that device_settings_changed does not carry the full device payload. The bridge re-fetches the device from the REST API via lazyDeviceRepository.get().getDevice(deviceId) and then updates the DataStore. Injecting Lazy<DeviceRepository> defers creating the repository until the first get() call, which breaks a circular dependency in the Hilt graph.

Lifecycle Integration

  • Start: Called from DevicesViewModel after the initial device load succeeds. This ensures the DataStores have baseline data before WebSocket events start flowing.
  • Stop: Called from AuthRepositoryImpl.logout() before clearing DataStores and calling the logout API.
  • Reconnect: reconnect() can be called after claiming a device or accepting a share invite. The server needs to learn about the new device permissions, so the WebSocket is disconnected and reconnected.
  • Background: The WebSocket stays connected when the app goes to the background. There is no explicit disconnect in onStop.

fun stop() {
    if (!isRunning) return
    realTimeRepository.disconnect()
    scope?.cancel() // cancels every event collector launched by the bridge
    scope = null
    isRunning = false
}

fun reconnect() {
    realTimeRepository.disconnect()
    realTimeRepository.connect() // connect() also calls disconnect() internally first
}

Real-Time Use Cases

Location: domain/usecases/realtime/

Thin use case wrappers that expose individual real-time flows from RealTimeRepository:

Use case | Returns
ObservePositionUpdatesUseCase | Flow<Position>
ObserveDeviceStatusUpdatesUseCase | Flow<Pair<Int, String>>
ObserveEventUpdatesUseCase | Flow<Pair<Int, String>>
ObserveConnectionStateUseCase | StateFlow<Boolean>
ManageConnectionUseCase | connect() / disconnect() methods

RealTimeDevicesInteractor

A facade that bundles the most commonly used real-time use cases for ViewModels:

class RealTimeDevicesInteractor @Inject constructor(
    private val observePositionUpdatesUseCase: ObservePositionUpdatesUseCase,
    private val observeDeviceStatusUpdatesUseCase: ObserveDeviceStatusUpdatesUseCase,
    private val observeConnectionStateUseCase: ObserveConnectionStateUseCase,
    private val manageConnectionUseCase: ManageConnectionUseCase
) {
    fun observePositionUpdates(): Flow<Position> = observePositionUpdatesUseCase()
    fun observeDeviceStatusUpdates(): Flow<Pair<Int, String>> = observeDeviceStatusUpdatesUseCase()
    fun observeConnectionState(): StateFlow<Boolean> = observeConnectionStateUseCase()
    fun connect() { manageConnectionUseCase.connect() }
    fun disconnect() { manageConnectionUseCase.disconnect() }
}

End-to-End Flow Example

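A GPS device sends a position. Here is what happens:

  1. The server pushes a {"type": "position", "data": { ... }} message over the open WebSocket.
  2. OkHttp hands the text to the listener. WebSocketManager.handleMessage() skips "pong", parses the envelope, and routes the message to handlePositionUpdate().
  3. Gson deserializes data into a WsPositionDto, and WebSocketMapper.toDomain() converts it to a Position (merging batteryLevel into attributes).
  4. _positionUpdates.tryEmit(position) publishes it on the positionUpdates SharedFlow, which RealTimeRepositoryImpl exposes as a Flow.
  5. The RealTimeDataBridge collector calls deviceDataStore.updatePosition(position), or parks the update in pendingPositions if the store is not initialized yet.
  6. ViewModels observing DeviceDataStore pick up the new state, and the Compose UI recomposes.

Design Decisions

Why WebSocket over polling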

GPS tracking demands real-time updates: a device's position can change every few seconds. HTTP polling would require frequent requests, wasting bandwidth and battery. WebSocket provides:

  • Low latency: Updates are delivered as soon as the server has them, with no polling interval in between
  • Bandwidth efficiency: No repeated HTTP headers; a single persistent TCP connection
  • Server-initiated push: The server sends data when it has it, not when the client asks
  • Multiplexed events: Position, status, geofence, notification, and sharing events all flow over a single connection

Why OkHttp WebSocket over Socket.IO / Scarlet

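  • Consistency: The app already uses OkHttp for all HTTP communication (Retrofit). Using OkHttp's built-in WebSocket support keeps a single HTTP library and a familiar API. The WebSocket does use its own OkHttpClient instance, with its own read timeout and ping interval, separate from the REST client.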
  • Simplicity: The server protocol is a simple JSON envelope ({"type": "...", "data": {...}}). There is no need for Socket.IO's room/namespace abstraction or Scarlet's annotation-based API.
  • No additional dependencies: OkHttp WebSocket comes free with the existing OkHttp dependency. Socket.IO and Scarlet would each add transitive dependencies and complexity.
  • Full control: Manual connect() / disconnect() / scheduleReconnect() gives explicit control over the connection lifecycle, making it easier to reason about and debug.

Why SharedFlow with buffer

private val _positionUpdates = MutableSharedFlow<Position>(extraBufferCapacity = 64)

  • Headroom for slow collectors: extraBufferCapacity allows tryEmit() to succeed even when collectors are temporarily suspended (e.g., during DataStore writes). Without a buffer, tryEmit() has nowhere to hold the value while a collector is busy, so it returns false and the event is dropped.
  • Backpressure handling: The buffer absorbs bursts of rapid events (e.g., many devices reporting simultaneously). The position, device status, and event channels use 64 slots; less frequent events use 16.
  • No replay: replay = 0 (the default) means late subscribers don't receive stale data; they only see events emitted after they start collecting. Events emitted while no collector is subscribed are discarded.
  • tryEmit() over emit(): tryEmit() is non-suspending and safe to call from OkHttp's WebSocket callback thread. It returns false only when the buffer is full, that is, when a collector has fallen a whole buffer (64 or 16 events) behind; that event is then dropped.

Why RealTimeDataBridge as intermediary

  • Separation of concerns: WebSocketManager handles the connection; DataStores handle state. The bridge is the only component that knows both exist.
  • DataStore consistency: The bridge ensures events only reach DataStores after initialization, using the pending queue pattern to avoid race conditions.
  • Centralized event routing: All WebSocket-to-DataStore wiring lives in one place, making it easy to add new event types or change routing logic.
  • Lifecycle control: start() / stop() / reconnect() provide clear lifecycle hooks for the rest of the app (ViewModel, auth) without leaking WebSocket details.
  • Testability: The bridge depends on RealTimeRepository (interface), not WebSocketManager (concrete). Tests can provide a fake repository that emits controlled events.