Real-Time Service — Data Layer

The real-time service provides live updates over WebSocket: position changes, device status transitions, geofence mutations, notifications, and sharing events. Unlike every other data-layer service, it does not use Retrofit; it relies on an OkHttp WebSocket managed by WebSocketManager.

WebSocketManager

Singleton at data/network/WebSocketManager.kt. It owns the OkHttp WebSocket connection, JSON parsing, and DTO→domain mapping (via WebSocketMapper), and exposes the results as reactive StateFlow/SharedFlow properties.

Constructor

@Singleton
class WebSocketManager @Inject constructor(
    private val tokenManager: TokenManager,
    private val mapper: WebSocketMapper
)

Connection lifecycle

| Method | Description |
| --- | --- |
| connect() | Reads the access token from TokenManager, opens $WS_BASE_URL/api/websocket?token=…, and starts a 30 s ping timer on open. Sets a connection error and returns early if no token. |
| disconnect() | Cancels reconnect/ping jobs, sends a normal-close frame, and sets isConnected = false. |
| cleanup() | Cancels the coroutine scope and disconnects. Called on app teardown. |

On failure the listener schedules an automatic reconnect after 5 seconds.
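
The retry behaviour can be pictured with a small coroutine helper. This is only a sketch: ReconnectScheduler is a hypothetical class, not part of the codebase, and it assumes the reconnect is a cancellable Job in the manager's coroutine scope (which the "cancels reconnect/ping jobs" wording suggests but does not confirm).

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

// Hypothetical helper: the real WebSocketManager may keep this logic inline.
class ReconnectScheduler(
    private val scope: CoroutineScope,
    private val delayMillis: Long = 5_000L, // 5 s, as described above
    private val connect: () -> Unit
) {
    private var reconnectJob: Job? = null

    /** Call from the listener's failure callback. */
    @Synchronized
    fun scheduleReconnect() {
        reconnectJob?.cancel() // never keep two reconnect attempts pending
        reconnectJob = scope.launch {
            delay(delayMillis)
            connect()
        }
    }

    /** Call from disconnect() so a manual close is not undone by a late retry. */
    @Synchronized
    fun cancel() {
        reconnectJob?.cancel()
        reconnectJob = null
    }
}
```

Cancelling the previous job before scheduling a new one means that a burst of failures still produces a single reconnect attempt.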

Exposed flows

| Flow | Type | Emitted when |
| --- | --- | --- |
| isConnected | StateFlow<Boolean> | WebSocket opens or closes |
| connectionError | StateFlow<String?> | Connection failure occurs |
| positionUpdates | SharedFlow<Position> | "position" message received |
| deviceStatusUpdates | SharedFlow<Pair<Int, String>> | "device_status" message received |
| eventUpdates | SharedFlow<Pair<Int, String>> | "event" message received |
| geofenceCreated | SharedFlow<Geofence> | "geofence_created" message received |
| geofenceUpdated | SharedFlow<Geofence> | "geofence_updated" message received |
| geofenceDeleted | SharedFlow<Int> | "geofence_deleted" message received |
| notificationReceived | SharedFlow<NotificationHistoryItem> | "notification" message received |
| deviceSettingsChanged | SharedFlow<Int> | "device_settings_changed" message received |
| shareRevoked | SharedFlow<Int> | "share_revoked" message received |
| shareInviteReceived | SharedFlow<ShareInviteReceivedEvent> | "share_invite_received" message received |
| shareAccepted | SharedFlow<ShareAcceptedEvent> | "share_accepted" message received |

All SharedFlows use extraBufferCapacity (64 for high-frequency position/status/event, 16 for the rest) so slow collectors don't block the WebSocket thread.
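
As an illustration of the buffering described above, a flow with extra buffer capacity might be declared as follows. PositionStream and the simplified Position class are stand-ins invented for this sketch, and the use of tryEmit with the default overflow policy is an assumption; the source does not say how the manager emits or what happens when the buffer fills.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow

// Stand-in for the domain Position entity (fields are illustrative).
data class Position(val deviceId: Int, val latitude: Double, val longitude: Double)

class PositionStream {
    // replay = 0: late subscribers only see updates emitted after they subscribe.
    // extraBufferCapacity = 64: up to 64 updates can wait for a slow collector.
    private val _positionUpdates = MutableSharedFlow<Position>(
        replay = 0,
        extraBufferCapacity = 64
    )
    val positionUpdates: SharedFlow<Position> = _positionUpdates.asSharedFlow()

    /**
     * Non-suspending emit, so it can be called from a WebSocket listener callback.
     * Returns false when the buffer is full; this sketch then drops the update
     * (assumption: the source does not state its overflow policy).
     */
    fun publish(position: Position): Boolean = _positionUpdates.tryEmit(position)
}
```

Because tryEmit never suspends, the thread delivering WebSocket frames is not held up by a collector that falls behind.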

Message handling

Incoming text frames are JSON objects of the form { "type": "…", "data": {…} }. The handleMessage method dispatches on type:

when (type) {
    "position" -> handlePositionUpdate(data)
    "device_status" -> handleDeviceStatus(data)
    "event" -> handleEvent(data)
    "geofence_created" -> handleGeofenceCreated(data)
    "geofence_updated" -> handleGeofenceUpdated(data)
    "geofence_deleted" -> handleGeofenceDeleted(data)
    "notification" -> handleNotification(data)
    "device_settings_changed" -> handleDeviceSettingsChanged(data)
    "share_revoked" -> handleShareRevoked(data)
    "share_invite_received" -> handleShareInviteReceived(data)
    "share_accepted" -> handleShareAccepted(data)
    else -> Log.d(TAG, "Unknown message type: $type")
}

For messages that carry rich payloads (position, geofence_*, notification, share_*), the data JSON is deserialized into a WebSocket DTO via Gson and then mapped to a domain entity through WebSocketMapper. Simpler messages (device_status, event, device_settings_changed, share_revoked, geofence_deleted) extract fields directly from the JsonObject.
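
The two parsing paths could look roughly like the sketch below, which uses Gson as the source describes. The DTO fields, the JSON key names ("deviceId", "status"), and the WsFrame result types are all assumptions made for illustration; the real handlers emit into flows rather than returning values.

```kotlin
import com.google.gson.Gson
import com.google.gson.JsonObject
import com.google.gson.JsonParser

// Illustrative DTO; the real WsPositionDto is assumed to carry more fields.
data class WsPositionDto(val deviceId: Int, val latitude: Double, val longitude: Double)

// Hypothetical result types, used here only to make the sketch testable.
sealed interface WsFrame
data class PositionFrame(val dto: WsPositionDto) : WsFrame
data class DeviceStatusFrame(val deviceId: Int, val status: String) : WsFrame
data class UnknownFrame(val type: String?) : WsFrame

private val gson = Gson()

fun parseFrame(text: String): WsFrame {
    val root = JsonParser.parseString(text).asJsonObject
    val type = root.get("type")?.asString
    val data: JsonObject = root.getAsJsonObject("data") ?: JsonObject()
    return when (type) {
        // Rich payload: deserialize the whole "data" object into a DTO.
        "position" -> PositionFrame(gson.fromJson(data, WsPositionDto::class.java))
        // Simple payload: read individual fields straight from the JsonObject.
        "device_status" -> DeviceStatusFrame(
            deviceId = data.get("deviceId").asInt,
            status = data.get("status").asString
        )
        else -> UnknownFrame(type)
    }
}
```

A production handler would also guard against missing keys and malformed JSON, which this sketch leaves out for brevity.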

RealTimeRepository

Domain interface at domain/repositories/RealTimeRepository.kt. A thin port that mirrors the flows and lifecycle methods of WebSocketManager using domain types only.

interface RealTimeRepository {
    val isConnected: StateFlow<Boolean>
    val connectionError: StateFlow<String?>
    val positionUpdates: Flow<Position>
    val deviceStatusUpdates: Flow<Pair<Int, String>>
    val eventUpdates: Flow<Pair<Int, String>>
    val geofenceCreated: Flow<Geofence>
    val geofenceUpdated: Flow<Geofence>
    val geofenceDeleted: Flow<Int>
    val notificationReceived: Flow<NotificationHistoryItem>
    val deviceSettingsChanged: Flow<Int>
    val shareRevoked: Flow<Int>
    val shareInviteReceived: Flow<ShareInviteReceivedEvent>
    val shareAccepted: Flow<ShareAcceptedEvent>

    fun connect()
    fun disconnect()
}

RealTimeRepositoryImpl

Implementation at data/repositories/RealTimeRepositoryImpl.kt. A pure delegate — every property and method forwards directly to WebSocketManager.

@Singleton
class RealTimeRepositoryImpl @Inject constructor(
    private val webSocketManager: WebSocketManager
) : RealTimeRepository {

    override val isConnected: StateFlow<Boolean> get() = webSocketManager.isConnected
    override val connectionError: StateFlow<String?> get() = webSocketManager.connectionError
    override val positionUpdates: Flow<Position> get() = webSocketManager.positionUpdates
    // ... remaining flows delegate identically

    override fun connect() = webSocketManager.connect()
    override fun disconnect() = webSocketManager.disconnect()
}

WebSocket DTOs

Located at data/remote/dto/WebSocketDtos.kt. These DTOs differ from the HTTP API DTOs because WebSocket messages come from different backend services with varying serialization conventions (camelCase vs snake_case).

| DTO | JSON naming | Domain target |
| --- | --- | --- |
| WsPositionDto | camelCase | Position |
| WsGeofenceDto | snake_case (@SerializedName) | Geofence |
| WsCoordinateDto | Accepts both lat/lng and latitude/longitude | Helper for WsGeofenceDto |
| WsNotificationDto | snake_case (@SerializedName) | NotificationHistoryItem |
| WsShareInviteReceivedDto | snake_case (@SerializedName) | ShareInviteReceivedEvent |
| WsShareAcceptedDto | snake_case (@SerializedName) | ShareAcceptedEvent |
| WsPermissionsDto | camelCase | SharePermissions (private helper) |
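
One way a DTO can accept both coordinate spellings is to declare every key as optional and resolve them afterwards. The field layout, the Coordinate stand-in, and the toCoordinateOrNull name below are assumptions for illustration; the actual WsCoordinateDto may handle this differently.

```kotlin
// Stand-in for the domain coordinate type.
data class Coordinate(val latitude: Double, val longitude: Double)

// Hypothetical shape: every key is optional so either naming style deserializes.
data class WsCoordinateDto(
    val lat: Double? = null,
    val lng: Double? = null,
    val latitude: Double? = null,
    val longitude: Double? = null
) {
    /** Returns a Coordinate, or null if neither naming style supplied both values. */
    fun toCoordinateOrNull(): Coordinate? {
        // Prefer the long names and fall back to the short ones.
        val resolvedLat = latitude ?: lat
        val resolvedLng = longitude ?: lng
        return if (resolvedLat != null && resolvedLng != null) {
            Coordinate(resolvedLat, resolvedLng)
        } else {
            null
        }
    }
}
```

Returning null for incomplete input leaves the decision about skipping or rejecting the point to the caller.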

WebSocketMapper

Located at data/mappers/WebSocketMapper.kt. A @Singleton Hilt-injected class that converts WebSocket DTOs to domain entities.

@Singleton
class WebSocketMapper @Inject constructor() {
    fun toDomain(dto: WsPositionDto): Position
    fun toDomain(dto: WsGeofenceDto): Geofence
    fun toDomain(dto: WsNotificationDto): NotificationHistoryItem
    fun toDomain(dto: WsShareInviteReceivedDto): ShareInviteReceivedEvent
    fun toDomain(dto: WsShareAcceptedDto): ShareAcceptedEvent
}

| Method | Domain target | Notes |
| --- | --- | --- |
| toDomain(WsPositionDto) | Position | Merges batteryLevel into attributes map |
| toDomain(WsGeofenceDto) | Geofence | Resolves WsCoordinateDto to Coordinate; maps type string to GeofenceType enum |
| toDomain(WsNotificationDto) | NotificationHistoryItem | Generates a negative synthetic ID when id is null |
| toDomain(WsShareInviteReceivedDto) | ShareInviteReceivedEvent | Parses expiresAt with 7-day fallback |
| toDomain(WsShareAcceptedDto) | ShareAcceptedEvent | Maps nested WsPermissionsDto to SharePermissions |
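
Two of the fallbacks in the notes above (the synthetic notification ID and the 7-day invite expiry) might be implemented along these lines. MapperFallbacks, its method names, and the counter-based ID strategy are hypothetical; the source only states that the ID is negative and that the fallback is seven days.

```kotlin
import java.time.Duration
import java.time.Instant
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helpers; names and strategy are assumptions, not the real mapper.
object MapperFallbacks {
    private val nextSyntheticId = AtomicInteger(-1)

    /** Server ID when present; otherwise a unique negative ID (-1, -2, ...). */
    fun notificationId(serverId: Int?): Int =
        serverId ?: nextSyntheticId.getAndDecrement()

    /** Parsed expiry when present and valid; otherwise seven days after `now`. */
    fun inviteExpiry(expiresAt: String?, now: Instant = Instant.now()): Instant =
        expiresAt?.let { runCatching { Instant.parse(it) }.getOrNull() }
            ?: now.plus(Duration.ofDays(7))
}
```

Negative IDs cannot collide with server-assigned IDs as long as those are always positive, which is presumably why the mapper uses them.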

Timestamp parsing

WebSocketMapper uses a two-stage parse: Instant.parse() first, then DateTimeFormatter.ISO_DATE_TIME as fallback. On total failure it logs a warning and returns Instant.now().
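
The two-stage parse can be sketched with java.time alone. The function names, the warn callback (standing in for the Android logger), and the choice to treat zone-less timestamps as UTC are assumptions of this sketch rather than documented behaviour.

```kotlin
import java.time.Instant
import java.time.LocalDateTime
import java.time.ZoneOffset
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.time.format.DateTimeParseException

fun parseTimestamp(
    text: String,
    warn: (String) -> Unit = { System.err.println(it) } // stand-in for Log.w
): Instant =
    // Stage 1: strict ISO-8601 instant, e.g. "2024-01-01T10:00:00Z".
    runCatching { Instant.parse(text) }
        // Stage 2: the more lenient ISO_DATE_TIME formatter.
        .recoverCatching { parseIsoDateTime(text) }
        // Total failure: warn and fall back to the current time.
        .getOrElse {
            warn("Unparseable timestamp '$text', using current time")
            Instant.now()
        }

private fun parseIsoDateTime(text: String): Instant =
    try {
        // Works when the text carries an offset or zone ID.
        ZonedDateTime.parse(text, DateTimeFormatter.ISO_DATE_TIME).toInstant()
    } catch (e: DateTimeParseException) {
        // No offset or zone in the text: assume UTC (an assumption of this sketch).
        LocalDateTime.parse(text, DateTimeFormatter.ISO_DATE_TIME).toInstant(ZoneOffset.UTC)
    }
```

Falling back to Instant.now() keeps a malformed timestamp from discarding the whole message, at the cost of a slightly inaccurate time on that one item.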

RealTimeDataBridge

Singleton at core/data/RealTimeDataBridge.kt. Bridges WebSocket events from RealTimeRepository into the various DataStore singletons so the rest of the app reacts to real-time changes through a single reactive state layer.

Constructor

@Singleton
class RealTimeDataBridge @Inject constructor(
    private val realTimeRepository: RealTimeRepository,
    private val deviceDataStore: DeviceDataStore,
    private val geofenceDataStore: GeofenceDataStore,
    private val notificationDataStore: NotificationDataStore,
    private val inviteDataStore: InviteDataStore,
    private val sharingDataStore: SharingDataStore,
    private val lazyDeviceRepository: Lazy<DeviceRepository>
)

DeviceRepository is injected via Dagger Lazy to break a circular dependency.

Lifecycle

| Method | Description |
| --- | --- |
| start() | Calls realTimeRepository.connect(), sets up all flow collectors, and initializes pending-update queues. No-ops if already running. |
| stop() | Disconnects the WebSocket, cancels the coroutine scope, resets state. Called on logout. |
| reconnect() | Disconnects then immediately reconnects. Used after claiming a device or accepting a share invite so the server picks up new permissions. |
| isActive() | Returns true if the bridge is currently running. |

Pending-update queues

Position and device-status updates can arrive before DeviceDataStore is initialized, that is, before the initial HTTP device list has loaded. The bridge queues them in CopyOnWriteArrayList collections and drains them once deviceDataStore.isInitialized emits true.
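
The queue-then-drain pattern can be sketched as a small generic class. PendingUpdateQueue and its method names are hypothetical, and the added lock is a choice of this sketch: it closes the gap between checking readiness and enqueueing, which the list type alone would not.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical helper mirroring the bridge's pending-update queues.
class PendingUpdateQueue<T>(private val apply: (T) -> Unit) {
    private val pending = CopyOnWriteArrayList<T>()
    private var ready = false

    /** Applies the update immediately once ready; otherwise holds it for later. */
    @Synchronized
    fun submit(update: T) {
        if (ready) apply(update) else pending.add(update)
    }

    /** Call when the store reports it is initialized; drains in arrival order. */
    @Synchronized
    fun markReady() {
        ready = true
        val snapshot = pending.toList()
        pending.clear()
        snapshot.forEach(apply)
    }
}
```

In the bridge, submit would correspond to a collector receiving a position or status update, and markReady to deviceDataStore.isInitialized emitting true.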

Event routing

| Flow | Target DataStore | Action |
| --- | --- | --- |
| positionUpdates | DeviceDataStore | updatePosition(position) |
| deviceStatusUpdates | DeviceDataStore | updateDeviceStatus(deviceId, status) |
| geofenceCreated | GeofenceDataStore | addGeofence(geofence) |
| geofenceUpdated | GeofenceDataStore | updateGeofence(geofence) |
| geofenceDeleted | GeofenceDataStore | removeGeofence(geofenceId) |
| notificationReceived | NotificationDataStore | addNotification(notification, emitEvent = true) |
| deviceSettingsChanged | DeviceDataStore | Fetches full device via DeviceRepository.getDevice(), then calls updateDevice(device) |
| shareRevoked | DeviceDataStore + GeofenceDataStore | removeDevice(deviceId) and invalidateDevice(deviceId) |
| shareInviteReceived | InviteDataStore | addInvite(event) |
| shareAccepted | SharingDataStore | onShareAccepted(event) |
| isConnected | (logging only) | Logs connection state transitions |
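
A single route from the table above could be wired up as in the following sketch, using the shareRevoked row as the example. The DeviceStore and GeofenceStore interfaces are minimal stand-ins, the routeShareRevoked function is hypothetical, and the assignment of removeDevice to the device store and invalidateDevice to the geofence store is inferred from the column order rather than stated in the source.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach

// Minimal stand-ins for the real stores (method names follow the table above).
interface DeviceStore { fun removeDevice(deviceId: Int) }
interface GeofenceStore { fun invalidateDevice(deviceId: Int) }

/**
 * Starts a collector that forwards each revoked device ID to both stores.
 * The returned Job ends when the flow completes or the scope is cancelled.
 */
fun CoroutineScope.routeShareRevoked(
    shareRevoked: Flow<Int>,
    devices: DeviceStore,
    geofences: GeofenceStore
): Job =
    shareRevoked
        .onEach { deviceId ->
            devices.removeDevice(deviceId)
            geofences.invalidateDevice(deviceId)
        }
        .launchIn(this)
```

Launching every route in the bridge's own scope means that cancelling that scope, as stop() is described as doing, ends all collectors at once.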

Use Cases

Located at domain/usecases/realtime/RealTimeUseCases.kt. Thin single-responsibility wrappers around RealTimeRepository.

| Use case | Return type | Delegates to |
| --- | --- | --- |
| ObservePositionUpdatesUseCase | Flow<Position> | realTimeRepository.positionUpdates |
| ObserveDeviceStatusUpdatesUseCase | Flow<Pair<Int, String>> | realTimeRepository.deviceStatusUpdates |
| ObserveEventUpdatesUseCase | Flow<Pair<Int, String>> | realTimeRepository.eventUpdates |
| ObserveConnectionStateUseCase | StateFlow<Boolean> | realTimeRepository.isConnected |
| ManageConnectionUseCase | connect() / disconnect() | realTimeRepository.connect() / .disconnect() |

RealTimeDevicesInteractor

Located at domain/usecases/realtime/RealTimeDevicesInteractor.kt. A facade that bundles the device-related use cases for convenient injection into ViewModels.

class RealTimeDevicesInteractor @Inject constructor(
    private val observePositionUpdatesUseCase: ObservePositionUpdatesUseCase,
    private val observeDeviceStatusUpdatesUseCase: ObserveDeviceStatusUpdatesUseCase,
    private val observeConnectionStateUseCase: ObserveConnectionStateUseCase,
    private val manageConnectionUseCase: ManageConnectionUseCase
)

| Method | Return type |
| --- | --- |
| observePositionUpdates() | Flow<Position> |
| observeDeviceStatusUpdates() | Flow<Pair<Int, String>> |
| observeConnectionState() | StateFlow<Boolean> |
| connect() | Unit |
| disconnect() | Unit |