Real-Time Service — Data Layer
The real-time service provides live updates over WebSocket: position changes, device status transitions, geofence mutations, notifications, and sharing events. Unlike the other data-layer services, it does not use Retrofit; instead it relies on an OkHttp WebSocket managed by WebSocketManager.
WebSocketManager
Singleton at data/network/WebSocketManager.kt. It owns the OkHttp WebSocket connection, parses incoming JSON, maps DTOs to domain types (via WebSocketMapper), and exposes reactive StateFlow and SharedFlow properties.
Constructor
@Singleton
class WebSocketManager @Inject constructor(
    private val tokenManager: TokenManager,
    private val mapper: WebSocketMapper
)
Connection lifecycle
| Method | Description |
|---|---|
| connect() | Reads the access token from TokenManager, opens $WS_BASE_URL/api/websocket?token=…, and starts a 30 s ping timer on open. Sets a connection error and returns early if no token. |
| disconnect() | Cancels reconnect/ping jobs, sends a normal-close frame, and sets isConnected = false. |
| cleanup() | Cancels the coroutine scope and disconnects. Called on app teardown. |
On failure the listener schedules an automatic reconnect after 5 seconds.
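The reconnect behaviour described above can be sketched without OkHttp or coroutines. The following is a minimal illustration, not the actual WebSocketManager code: `ReconnectPolicy`, `Scheduler`, and `Cancellable` are hypothetical names, and the scheduler is injected so the 5-second delay can be observed without waiting for it.

```kotlin
/** Handle for a scheduled task (hypothetical helper, not part of the source). */
fun interface Cancellable {
    fun cancel()
}

/** Hypothetical scheduler abstraction: runs `task` after `delayMillis`. */
typealias Scheduler = (delayMillis: Long, task: () -> Unit) -> Cancellable

class ReconnectPolicy(
    private val schedule: Scheduler,
    private val openSocket: () -> Unit,
    private val reconnectDelayMillis: Long = 5_000L // 5 s, as described above
) {
    private var pendingReconnect: Cancellable? = null

    var isConnected: Boolean = false
        private set
    var connectionError: String? = null
        private set

    /** Mirrors the listener's "open" callback. */
    fun onOpen() {
        isConnected = true
    }

    /** Mirrors the listener's "failure" callback: record the error, retry later. */
    fun onFailure(message: String) {
        isConnected = false
        connectionError = message
        pendingReconnect?.cancel() // never keep two reconnects in flight
        pendingReconnect = schedule(reconnectDelayMillis) { openSocket() }
    }

    /** Mirrors disconnect(): drop any scheduled reconnect and mark the socket closed. */
    fun disconnect() {
        pendingReconnect?.cancel()
        pendingReconnect = null
        isConnected = false
    }
}
```

In the real class the scheduler role would presumably be played by a coroutine job with a delay; the injected function here only keeps the sketch free of external dependencies.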
Exposed flows
| Flow | Type | Emitted when |
|---|---|---|
| isConnected | StateFlow<Boolean> | WebSocket opens or closes |
| connectionError | StateFlow<String?> | Connection failure occurs |
| positionUpdates | SharedFlow<Position> | "position" message received |
| deviceStatusUpdates | SharedFlow<Pair<Int, String>> | "device_status" message received |
| eventUpdates | SharedFlow<Pair<Int, String>> | "event" message received |
| geofenceCreated | SharedFlow<Geofence> | "geofence_created" message received |
| geofenceUpdated | SharedFlow<Geofence> | "geofence_updated" message received |
| geofenceDeleted | SharedFlow<Int> | "geofence_deleted" message received |
| notificationReceived | SharedFlow<NotificationHistoryItem> | "notification" message received |
| deviceSettingsChanged | SharedFlow<Int> | "device_settings_changed" message received |
| shareRevoked | SharedFlow<Int> | "share_revoked" message received |
| shareInviteReceived | SharedFlow<ShareInviteReceivedEvent> | "share_invite_received" message received |
| shareAccepted | SharedFlow<ShareAcceptedEvent> | "share_accepted" message received |
All SharedFlows use extraBufferCapacity (64 for high-frequency position/status/event, 16 for the rest) so slow collectors don't block the WebSocket thread.
Message handling
Incoming text frames are JSON objects of the form { "type": "…", "data": {…} }. The handleMessage method dispatches on type:
when (type) {
    "position" -> handlePositionUpdate(data)
    "device_status" -> handleDeviceStatus(data)
    "event" -> handleEvent(data)
    "geofence_created" -> handleGeofenceCreated(data)
    "geofence_updated" -> handleGeofenceUpdated(data)
    "geofence_deleted" -> handleGeofenceDeleted(data)
    "notification" -> handleNotification(data)
    "device_settings_changed" -> handleDeviceSettingsChanged(data)
    "share_revoked" -> handleShareRevoked(data)
    "share_invite_received" -> handleShareInviteReceived(data)
    "share_accepted" -> handleShareAccepted(data)
    else -> Log.d(TAG, "Unknown message type: $type")
}
For messages that carry rich payloads (position, geofence_*, notification, share_*), the data JSON is deserialized into a WebSocket DTO via Gson and then mapped to a domain entity through WebSocketMapper. Simpler messages (device_status, event, device_settings_changed, share_revoked, geofence_deleted) extract fields directly from the JsonObject.
RealTimeRepository
Domain interface at domain/repositories/RealTimeRepository.kt. A thin port that mirrors the flows and lifecycle methods of WebSocketManager using domain types only.
interface RealTimeRepository {
    val isConnected: StateFlow<Boolean>
    val connectionError: StateFlow<String?>
    val positionUpdates: Flow<Position>
    val deviceStatusUpdates: Flow<Pair<Int, String>>
    val eventUpdates: Flow<Pair<Int, String>>
    val geofenceCreated: Flow<Geofence>
    val geofenceUpdated: Flow<Geofence>
    val geofenceDeleted: Flow<Int>
    val notificationReceived: Flow<NotificationHistoryItem>
    val deviceSettingsChanged: Flow<Int>
    val shareRevoked: Flow<Int>
    val shareInviteReceived: Flow<ShareInviteReceivedEvent>
    val shareAccepted: Flow<ShareAcceptedEvent>

    fun connect()
    fun disconnect()
}
RealTimeRepositoryImpl
Implementation at data/repositories/RealTimeRepositoryImpl.kt. A pure delegate — every property and method forwards directly to WebSocketManager.
@Singleton
class RealTimeRepositoryImpl @Inject constructor(
    private val webSocketManager: WebSocketManager
) : RealTimeRepository {
    override val isConnected: StateFlow<Boolean> get() = webSocketManager.isConnected
    override val connectionError: StateFlow<String?> get() = webSocketManager.connectionError
    override val positionUpdates: Flow<Position> get() = webSocketManager.positionUpdates
    // ... remaining flows delegate identically

    override fun connect() = webSocketManager.connect()
    override fun disconnect() = webSocketManager.disconnect()
}
WebSocket DTOs
Located at data/remote/dto/WebSocketDtos.kt. These DTOs differ from the HTTP API DTOs because WebSocket messages come from different backend services with varying serialization conventions (camelCase vs snake_case).
| DTO | JSON naming | Domain target |
|---|---|---|
| WsPositionDto | camelCase | Position |
| WsGeofenceDto | snake_case (@SerializedName) | Geofence |
| WsCoordinateDto | Accepts both lat/lng and latitude/longitude | Helper for WsGeofenceDto |
| WsNotificationDto | snake_case (@SerializedName) | NotificationHistoryItem |
| WsShareInviteReceivedDto | snake_case (@SerializedName) | ShareInviteReceivedEvent |
| WsShareAcceptedDto | snake_case (@SerializedName) | ShareAcceptedEvent |
| WsPermissionsDto | camelCase | SharePermissions (private helper) |
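To illustrate how a DTO can accept both coordinate spellings, here is a minimal sketch. It is not the actual WsCoordinateDto: the class name `WsCoordinateSketch`, the nullable-field layout, the `Coordinate` field names, and the choice to prefer the short keys when both are present are all assumptions made for the example.

```kotlin
/** Hypothetical domain type; the real Coordinate may use different field names. */
data class Coordinate(val latitude: Double, val longitude: Double)

/**
 * Sketch of a DTO that tolerates both key spellings. Each JSON key maps to its
 * own nullable field, and resolution happens once in toCoordinateOrNull().
 */
data class WsCoordinateSketch(
    val lat: Double? = null,
    val lng: Double? = null,
    val latitude: Double? = null,
    val longitude: Double? = null
) {
    /** Returns null when either axis is missing under both spellings. */
    fun toCoordinateOrNull(): Coordinate? {
        // Assumption: the short keys win if a payload carries both forms.
        val resolvedLat = lat ?: latitude ?: return null
        val resolvedLng = lng ?: longitude ?: return null
        return Coordinate(resolvedLat, resolvedLng)
    }
}
```

For example, `WsCoordinateSketch(latitude = 52.5, longitude = 13.4)` and `WsCoordinateSketch(lat = 52.5, lng = 13.4)` resolve to the same `Coordinate`.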
WebSocketMapper
Located at data/mappers/WebSocketMapper.kt. A @Singleton Hilt-injected class that converts WebSocket DTOs to domain entities.
@Singleton
class WebSocketMapper @Inject constructor() {
    // Signatures only; method bodies are omitted here.
    fun toDomain(dto: WsPositionDto): Position
    fun toDomain(dto: WsGeofenceDto): Geofence
    fun toDomain(dto: WsNotificationDto): NotificationHistoryItem
    fun toDomain(dto: WsShareInviteReceivedDto): ShareInviteReceivedEvent
    fun toDomain(dto: WsShareAcceptedDto): ShareAcceptedEvent
}
| Method | Domain target | Notes |
|---|---|---|
| toDomain(WsPositionDto) | Position | Merges batteryLevel into attributes map |
| toDomain(WsGeofenceDto) | Geofence | Resolves WsCoordinateDto to Coordinate; maps type string to GeofenceType enum |
| toDomain(WsNotificationDto) | NotificationHistoryItem | Generates a negative synthetic ID when id is null |
| toDomain(WsShareInviteReceivedDto) | ShareInviteReceivedEvent | Parses expiresAt with 7-day fallback |
| toDomain(WsShareAcceptedDto) | ShareAcceptedEvent | Maps nested WsPermissionsDto to SharePermissions |
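Two of the notes in the table above describe fallback rules that are easy to show in isolation. The sketch below is illustrative only: the function names, the `Int` ID type, the counter-based ID generator, and the reading of "7-day fallback" as "seven days from now" are assumptions, since the source states only that the synthetic ID is negative and that a 7-day fallback exists.

```kotlin
import java.time.Duration
import java.time.Instant
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical generator: counts down from zero so every synthetic ID is negative
// and cannot collide with a positive server-assigned ID.
private val syntheticIds = AtomicInteger(0)

/** Keeps the server ID when present, otherwise hands out -1, -2, -3, ... */
fun resolveNotificationId(id: Int?): Int = id ?: syntheticIds.decrementAndGet()

/**
 * Parses expiresAt, falling back to now + 7 days when the value is missing
 * or unparseable (assumed interpretation of the 7-day fallback).
 */
fun resolveExpiry(expiresAt: String?, now: Instant = Instant.now()): Instant =
    expiresAt
        ?.let { runCatching { Instant.parse(it) }.getOrNull() }
        ?: now.plus(Duration.ofDays(7))
```

Passing `now` as a parameter keeps the fallback deterministic in tests.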
Timestamp parsing
WebSocketMapper uses a two-stage parse: Instant.parse() first, then DateTimeFormatter.ISO_DATE_TIME as fallback. On total failure it logs a warning and returns Instant.now().
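A minimal sketch of this two-stage parse, using only java.time, might look as follows. The function name `parseTimestamp`, the injectable `now` clock, the treatment of offset-less values as UTC, and the use of System.err in place of the Android logger are assumptions for the example, not details taken from the mapper itself.

```kotlin
import java.time.Instant
import java.time.LocalDateTime
import java.time.OffsetDateTime
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoField

fun parseTimestamp(raw: String, now: () -> Instant = { Instant.now() }): Instant {
    // Stage 1: strict ISO-8601 instant, e.g. "2024-01-15T10:30:00Z".
    runCatching { Instant.parse(raw) }.getOrNull()?.let { return it }

    // Stage 2: ISO_DATE_TIME, which also accepts date-times without an offset.
    val lenient = runCatching {
        val parsed = DateTimeFormatter.ISO_DATE_TIME.parse(raw)
        if (parsed.isSupported(ChronoField.OFFSET_SECONDS)) {
            OffsetDateTime.from(parsed).toInstant()
        } else {
            // Assumption: timestamps without an offset are interpreted as UTC.
            LocalDateTime.from(parsed).toInstant(ZoneOffset.UTC)
        }
    }.getOrNull()
    if (lenient != null) return lenient

    // Both stages failed: warn and fall back to the current time.
    System.err.println("WARN: unparseable timestamp '$raw', using current time")
    return now()
}
```

Falling back to the current time keeps a malformed timestamp from dropping the whole message, at the cost of recording an inaccurate time for it.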
RealTimeDataBridge
Singleton at core/data/RealTimeDataBridge.kt. Bridges WebSocket events from RealTimeRepository into the various DataStore singletons so the rest of the app reacts to real-time changes through a single reactive state layer.
Constructor
@Singleton
class RealTimeDataBridge @Inject constructor(
    private val realTimeRepository: RealTimeRepository,
    private val deviceDataStore: DeviceDataStore,
    private val geofenceDataStore: GeofenceDataStore,
    private val notificationDataStore: NotificationDataStore,
    private val inviteDataStore: InviteDataStore,
    private val sharingDataStore: SharingDataStore,
    private val lazyDeviceRepository: Lazy<DeviceRepository>
)
DeviceRepository is injected via Dagger Lazy to break a circular dependency; the instance is resolved only when get() is first called.
Lifecycle
| Method | Description |
|---|---|
| start() | Calls realTimeRepository.connect(), sets up all flow collectors, and initializes pending-update queues. No-ops if already running. |
| stop() | Disconnects the WebSocket, cancels the coroutine scope, resets state. Called on logout. |
| reconnect() | Disconnects then immediately reconnects. Used after claiming a device or accepting a share invite so the server picks up new permissions. |
| isActive() | Returns true if the bridge is currently running. |
Pending-update queues
Position and device-status updates can arrive before DeviceDataStore is initialized (before the initial HTTP device list loads). The bridge queues them in CopyOnWriteArrayList collections and drains them once deviceDataStore.isInitialized emits true.
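The queue-then-drain idea can be sketched as a small generic helper. This is an illustration under assumptions rather than the bridge's actual code: `PendingUpdateQueue`, `submit`, and `markInitialized` are hypothetical names, and the readiness signal is a plain method call instead of a collected isInitialized flow.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList

class PendingUpdateQueue<T : Any>(private val applyUpdate: (T) -> Unit) {
    private val pending = CopyOnWriteArrayList<T>()

    @Volatile
    private var initialized = false

    /** Applies the update immediately once the store is ready, otherwise queues it. */
    fun submit(update: T) {
        if (initialized) {
            applyUpdate(update)
            return
        }
        pending.add(update)
        // The store may have become ready between the check and the add.
        if (initialized) drain()
    }

    /** Call once the store has loaded its initial data. */
    fun markInitialized() {
        initialized = true
        drain()
    }

    private fun drain() {
        // Iteration walks a snapshot, so concurrent add() calls cannot break it.
        for (update in pending) {
            // remove() returns false if another thread already drained this entry.
            if (pending.remove(update)) applyUpdate(update)
        }
    }
}
```

Queued updates are applied in arrival order when drained from a single thread. An update that arrives while a drain is in progress may be applied ahead of entries still in the queue, which a stricter implementation would need to address.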
Event routing
| Flow | Target DataStore | Action |
|---|---|---|
| positionUpdates | DeviceDataStore | updatePosition(position) |
| deviceStatusUpdates | DeviceDataStore | updateDeviceStatus(deviceId, status) |
| geofenceCreated | GeofenceDataStore | addGeofence(geofence) |
| geofenceUpdated | GeofenceDataStore | updateGeofence(geofence) |
| geofenceDeleted | GeofenceDataStore | removeGeofence(geofenceId) |
| notificationReceived | NotificationDataStore | addNotification(notification, emitEvent = true) |
| deviceSettingsChanged | DeviceDataStore | Fetches full device via DeviceRepository.getDevice(), then calls updateDevice(device) |
| shareRevoked | DeviceDataStore + GeofenceDataStore | removeDevice(deviceId) and invalidateDevice(deviceId) |
| shareInviteReceived | InviteDataStore | addInvite(event) |
| shareAccepted | SharingDataStore | onShareAccepted(event) |
| isConnected | (logging only) | Logs connection state transitions |
Use Cases
Located at domain/usecases/realtime/RealTimeUseCases.kt. Thin single-responsibility wrappers around RealTimeRepository.
| Use case | Return type | Delegates to |
|---|---|---|
| ObservePositionUpdatesUseCase | Flow<Position> | realTimeRepository.positionUpdates |
| ObserveDeviceStatusUpdatesUseCase | Flow<Pair<Int, String>> | realTimeRepository.deviceStatusUpdates |
| ObserveEventUpdatesUseCase | Flow<Pair<Int, String>> | realTimeRepository.eventUpdates |
| ObserveConnectionStateUseCase | StateFlow<Boolean> | realTimeRepository.isConnected |
| ManageConnectionUseCase | connect() / disconnect() | realTimeRepository.connect() / .disconnect() |
RealTimeDevicesInteractor
Located at domain/usecases/realtime/RealTimeDevicesInteractor.kt. A facade that bundles the device-related use cases for convenient injection into ViewModels.
class RealTimeDevicesInteractor @Inject constructor(
    private val observePositionUpdatesUseCase: ObservePositionUpdatesUseCase,
    private val observeDeviceStatusUpdatesUseCase: ObserveDeviceStatusUpdatesUseCase,
    private val observeConnectionStateUseCase: ObserveConnectionStateUseCase,
    private val manageConnectionUseCase: ManageConnectionUseCase
)
| Method | Return type |
|---|---|
| observePositionUpdates() | Flow<Position> |
| observeDeviceStatusUpdates() | Flow<Pair<Int, String>> |
| observeConnectionState() | StateFlow<Boolean> |
| connect() | Unit |
| disconnect() | Unit |