DataStore Architecture
The app uses a set of in-memory and persistent DataStores as the single source of truth for all client-side state. Each DataStore exposes Kotlin StateFlow / SharedFlow properties so ViewModels receive updates reactively from both HTTP API responses and real-time WebSocket events.
```mermaid
flowchart TD
    HTTP["HTTP API"] --> Repository --> DataStore
    WebSocket --> RealTimeDataBridge --> DataStore
    DataStore -- "StateFlow / SharedFlow" --> ViewModel --> UI
```
Package layout

| File | Package |
|---|---|
| EncryptedTokenDataStore.kt | core.network |
| AppPreferencesDataStore.kt | core.data |
| DeviceDataStore.kt | core.data |
| GeofenceDataStore.kt | core.data |
| NotificationDataStore.kt | core.data |
| InviteDataStore.kt | core.data |
| SharingDataStore.kt | core.data |
| RealTimeDataBridge.kt | core.data |
| WidgetDataStore.kt | widget |

1. EncryptedTokenDataStore
Location: core/network/EncryptedTokenDataStore.kt
Persists authentication tokens and user identity using Jetpack DataStore Preferences with Google Tink AES-256-GCM encryption on top.
Constructor
```kotlin
@Singleton
class EncryptedTokenDataStore @Inject constructor(private val context: Context)
```
Backing store
```kotlin
private val Context.tokenDataStore: DataStore<Preferences>
    by preferencesDataStore(name = "visla_tokens")
```
Encryption
A Tink Aead primitive is initialised lazily. The keyset is stored in SharedPreferences (visla_token_prefs) and protected by an Android Keystore master key (visla_token_master_key).
```kotlin
private val aead: Aead by lazy {
    AeadConfig.register()
    // On a key failure, wipe the stored key material and build once more.
    try { buildAead() }
    catch (e: java.security.InvalidKeyException) { resetKeystoreAndPrefs(); buildAead() }
    catch (e: java.security.GeneralSecurityException) { resetKeystoreAndPrefs(); buildAead() }
}
```
buildAead() creates an AndroidKeysetManager with the AES256_GCM key template and the master key URI android-keystore://visla_token_master_key.
If the keystore key is corrupted, resetKeystoreAndPrefs() deletes the Android Keystore entry, clears the keyset SharedPreferences, and clears the DataStore.
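The reset-and-retry behaviour described above can be sketched in plain Kotlin, without Tink or Android. This is only an illustration: buildWithRecovery and its build and reset parameters are hypothetical stand-ins for buildAead() and resetKeystoreAndPrefs(), not the app's actual code.

```kotlin
import java.security.GeneralSecurityException
import java.security.InvalidKeyException

// Hypothetical helper: try to build a value and, on a security failure,
// wipe the stored key material once and try again.
fun <T> buildWithRecovery(build: () -> T, reset: () -> Unit): T =
    try {
        build()
    } catch (e: GeneralSecurityException) {
        // InvalidKeyException is a subclass of GeneralSecurityException,
        // so this single catch covers both cases handled in the snippet above.
        reset()
        build() // a second failure propagates to the caller
    }

fun main() {
    var keyCorrupted = true
    var resets = 0
    val primitive = buildWithRecovery(
        build = { if (keyCorrupted) throw InvalidKeyException("corrupted key") else "aead" },
        reset = { keyCorrupted = false; resets++ }
    )
    println("$primitive after $resets reset(s)") // prints "aead after 1 reset(s)"
}
```

Because the reset also clears the DataStore, a recovery of this kind presumably leaves the user logged out; the sketch does not model that side effect.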
Stored fields

| Key | Preferences type | Encrypted |
|---|---|---|
| access_token | stringPreferencesKey | β |
| refresh_token | stringPreferencesKey | β |
| user_id | intPreferencesKey | β |
| user_email | stringPreferencesKey | β |
| user_name | stringPreferencesKey | β |

Async access (Flow)

| Property | Type |
|---|---|
| accessToken | Flow<String?> |
| refreshToken | Flow<String?> |
| userId | Flow<Int?> |
| userEmail | Flow<String?> |
| userName | Flow<String?> |

Each Flow reads from tokenDataStore.data and decrypts inline. userId filters out the sentinel value -1.
Sync access (blocking)

| Method | Return |
|---|---|
| getAccessTokenSync() | String? |
| getRefreshTokenSync() | String? |
| getUserIdSync() | Int? |
| getUserEmailSync() | String? |
| getUserNameSync() | String? |

All implemented via runBlocking { flow.first() }. Used by interceptors / authenticators that run outside a coroutine context.
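The relationship between the Flow properties and their blocking counterparts can be sketched as follows. This is a minimal illustration, assuming the kotlinx.coroutines library is available: FakeTokenStore and NO_USER_ID are hypothetical names, a MutableStateFlow stands in for tokenDataStore.data, and encryption is left out entirely.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

const val NO_USER_ID = -1 // sentinel value mentioned in the text

// Hypothetical stand-in for the real store; no Jetpack DataStore, no Tink.
class FakeTokenStore(initialRawId: Int = NO_USER_ID) {
    private val rawUserId = MutableStateFlow(initialRawId)

    // Async access: map the sentinel to null so collectors never see -1.
    val userId: Flow<Int?> = rawUserId.map { raw -> raw.takeIf { it != NO_USER_ID } }

    // Sync access: block the calling thread until the first value arrives.
    fun getUserIdSync(): Int? = runBlocking { userId.first() }

    // Simplification: the documented setter is a suspend function.
    fun setUserId(id: Int?) { rawUserId.value = id ?: NO_USER_ID }
}

fun main() {
    val store = FakeTokenStore()
    println(store.getUserIdSync()) // null
    store.setUserId(42)
    println(store.getUserIdSync()) // 42
}
```

Since runBlocking parks the calling thread, this style of accessor is best kept to call sites that cannot suspend, as the text notes for interceptors and authenticators.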
Mutators

| Method | Signature |
|---|---|
| setAccessToken | suspend fun setAccessToken(token: String?) |
| setRefreshToken | suspend fun setRefreshToken(token: String?) |
| setUserId | suspend fun setUserId(id: Int?) |
| setUserEmail | suspend fun setUserEmail(email: String?) |
| setUserName | suspend fun setUserName(name: String?) |
| saveAll | suspend fun saveAll(accessToken: String?, refreshToken: String?, userId: Int?, email: String?, name: String?) |
| clear | suspend fun clear() |

saveAll performs a single edit transaction for all five fields. Individual setters store encrypted values or remove the key when null.
Threading
No Mutex: thread-safety is provided by Jetpack DataStore's internal actor.
2. AppPreferencesDataStore
Location: core/data/AppPreferencesDataStore.kt
Persists user-facing app settings (theme, language) via Jetpack DataStore Preferences.
Constructor
```kotlin
@Singleton
class AppPreferencesDataStore @Inject constructor(private val context: Context)
```
Backing store
```kotlin
private val Context.appPreferencesDataStore: DataStore<Preferences>
    by preferencesDataStore(name = "visla_app_preferences")
```
Stored fields

| Key | Type | Domain enum |
|---|---|---|
| theme_mode | stringPreferencesKey | ThemeMode (SYSTEM, DARK, LIGHT) |
| language | stringPreferencesKey | AppLanguage (ENGLISH("en"), ITALIAN("it")) |

Supporting enums

ThemeMode: stored as "system" / "dark" / "light". Parsed via ThemeMode.fromString(value), defaults to SYSTEM.

AppLanguage: stored by ISO code. AppLanguage.fromCode(code) falls back to detectDeviceLanguage(), which reads Locale.getDefault().language and defaults to ITALIAN.
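The fallback chain for the two enums can be sketched in plain Kotlin. This is an assumption-laden illustration rather than the app's source: the value and code property names, the nullable parameters, and the locale parameter on detectDeviceLanguage (added so the behaviour can be exercised without changing the JVM default) are all hypothetical.

```kotlin
import java.util.Locale

enum class ThemeMode(val value: String) {
    SYSTEM("system"), DARK("dark"), LIGHT("light");

    companion object {
        // Unknown or missing values fall back to SYSTEM.
        fun fromString(value: String?): ThemeMode =
            values().firstOrNull { it.value == value } ?: SYSTEM
    }
}

enum class AppLanguage(val code: String) {
    ENGLISH("en"), ITALIAN("it");

    companion object {
        // An unrecognised stored code defers to the device language.
        fun fromCode(code: String?): AppLanguage =
            values().firstOrNull { it.code == code } ?: detectDeviceLanguage()

        // Reads the locale's language; anything unsupported becomes ITALIAN.
        fun detectDeviceLanguage(locale: Locale = Locale.getDefault()): AppLanguage =
            values().firstOrNull { it.code == locale.language } ?: ITALIAN
    }
}

fun main() {
    println(ThemeMode.fromString("dark"))                    // DARK
    println(ThemeMode.fromString("sepia"))                   // SYSTEM
    println(AppLanguage.fromCode("en"))                      // ENGLISH
    println(AppLanguage.detectDeviceLanguage(Locale.GERMAN)) // ITALIAN
}
```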
Properties & methods

| Member | Type / Signature |
|---|---|
| themeMode | Flow<ThemeMode> |
| getThemeModeSync() | fun getThemeModeSync(): ThemeMode |
| setThemeMode | suspend fun setThemeMode(mode: ThemeMode) |
| language | Flow<AppLanguage> |
| getLanguageSync() | fun getLanguageSync(): AppLanguage |
| setLanguage | suspend fun setLanguage(language: AppLanguage) |
| clear | suspend fun clear() |

Threading
No Mutex: thread-safety is provided by Jetpack DataStore.
3. DeviceDataStore
Location: core/data/DeviceDataStore.kt
In-memory single source of truth for devices and their latest positions. Merges data from HTTP API (initial load) and WebSocket (real-time updates).
Constructor
```kotlin
@Singleton
class DeviceDataStore @Inject constructor()
```
Threading
Uses kotlinx.coroutines.sync.Mutex: all mutations are wrapped in mutex.withLock { }.
State flows

| Property | Type | Description |
|---|---|---|
| devicesWithPositions | StateFlow<List<DeviceWithPosition>> | Primary device list |
| isInitialized | StateFlow<Boolean> | true after first API load |

Event streams

| Property | Type | Buffer |
|---|---|---|
| positionUpdates | SharedFlow<Position> | extraBufferCapacity = 64 |
| deviceStatusUpdates | SharedFlow<Pair<Int, DeviceStatus>> | extraBufferCapacity = 64 |

Initialization

| Method | Signature | Notes |
|---|---|---|
| initialize | suspend fun initialize(devices: List<DeviceWithPosition>) | First load; sets isInitialized = true |
| setDevices | suspend fun setDevices(devices: List<DeviceWithPosition>) | Replaces list (pull-to-refresh, re-login) |

Real-time update methods

| Method | Signature | Notes |
|---|---|---|
| updatePosition | suspend fun updatePosition(position: Position) | Skips if latitude/longitude are null; emits to positionUpdates via tryEmit |
| updateDeviceStatus | suspend fun updateDeviceStatus(deviceId: Int, status: String) | Converts via DeviceStatus.fromString; sets lastUpdate = Instant.now(); emits to deviceStatusUpdates |
| updateDevice | suspend fun updateDevice(device: Device) | Replaces device details (name, icon, color, etc.) |
| addDevice | suspend fun addDevice(deviceWithPosition: DeviceWithPosition) | Upserts: updates the entry if the ID already exists |
| removeDevice | suspend fun removeDevice(deviceId: Int) | Filters out by ID |
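
The mutation pattern in the tables above (Mutex-guarded list updates plus a buffered event stream) can be sketched as below, assuming kotlinx.coroutines is on the classpath. DeviceStoreSketch and the fields of the two data classes are simplified, hypothetical stand-ins; the real Position and DeviceWithPosition models are not shown in this document.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Simplified, hypothetical models.
data class Position(val deviceId: Int, val latitude: Double?, val longitude: Double?)
data class DeviceWithPosition(val deviceId: Int, val name: String, val position: Position?)

class DeviceStoreSketch {
    private val mutex = Mutex()
    private val _devices = MutableStateFlow<List<DeviceWithPosition>>(emptyList())
    val devicesWithPositions: StateFlow<List<DeviceWithPosition>> = _devices.asStateFlow()

    private val _positionUpdates = MutableSharedFlow<Position>(extraBufferCapacity = 64)
    val positionUpdates: SharedFlow<Position> = _positionUpdates.asSharedFlow()

    // Upsert: replace the entry with the same id, otherwise append.
    suspend fun addDevice(device: DeviceWithPosition) = mutex.withLock {
        val current = _devices.value
        _devices.value =
            if (current.any { it.deviceId == device.deviceId })
                current.map { if (it.deviceId == device.deviceId) device else it }
            else current + device
    }

    suspend fun updatePosition(position: Position) {
        // Positions without coordinates are skipped.
        if (position.latitude == null || position.longitude == null) return
        mutex.withLock {
            _devices.value = _devices.value.map {
                if (it.deviceId == position.deviceId) it.copy(position = position) else it
            }
        }
        // tryEmit never suspends; the extra buffer absorbs bursts for slow collectors.
        _positionUpdates.tryEmit(position)
    }
}

fun main() = runBlocking {
    val store = DeviceStoreSketch()
    store.addDevice(DeviceWithPosition(1, "Tracker", null))
    store.updatePosition(Position(1, null, 9.19))  // ignored: latitude is null
    store.updatePosition(Position(1, 45.46, 9.19)) // applied
    println(store.devicesWithPositions.value.single().position)
}
```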
Query methods (non-suspending)

| Method | Signature |
|---|---|
| getDevice | fun getDevice(deviceId: Int): Device? |
| getDeviceWithPosition | fun getDeviceWithPosition(deviceId: Int): DeviceWithPosition? |
| getPosition | fun getPosition(deviceId: Int): Position? |

State management

| Method | Signature | Notes |
|---|---|---|
| clear | suspend fun clear() | Empties list, resets isInitialized |
| invalidate | fun invalidate() | Sets isInitialized = false without clearing data |

4. GeofenceDataStore
Location: core/data/GeofenceDataStore.kt
In-memory single source of truth for geofences. Supports per-device lazy loading.
Constructor
```kotlin
@Singleton
class GeofenceDataStore @Inject constructor()
```
Threading
Uses Mutex: all mutations are wrapped in mutex.withLock { }.
State flows

| Property | Type |
|---|---|
| geofences | StateFlow<List<Geofence>> |
| isInitialized | StateFlow<Boolean> |

Internal state

loadedDeviceIds: MutableSet<Int> tracks which devices have had their geofences fetched.

Initialization

| Method | Signature | Notes |
|---|---|---|
| setGeofences | suspend fun setGeofences(geofences: List<Geofence>) | Full replacement |
| setGeofencesForDevice | suspend fun setGeofencesForDevice(deviceId: Int, geofences: List<Geofence>) | Merges with existing; filters out geofences whose deviceIds contain deviceId, then appends |
| isDeviceLoaded | fun isDeviceLoaded(deviceId: Int): Boolean | Checks loadedDeviceIds |
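
The per-device merge performed by setGeofencesForDevice can be sketched as a pure function. The Geofence fields other than id and deviceIds, and the function name mergeForDevice, are hypothetical; the real method also runs under the Mutex, which is omitted here.

```kotlin
// Simplified, hypothetical model: only id and deviceIds are named in the text.
data class Geofence(val id: Int, val name: String, val deviceIds: List<Int>)

// Drop every cached geofence linked to deviceId, then append the fresh list.
fun mergeForDevice(existing: List<Geofence>, deviceId: Int, fresh: List<Geofence>): List<Geofence> =
    existing.filterNot { deviceId in it.deviceIds } + fresh

fun main() {
    val home = Geofence(1, "Home", listOf(10, 20)) // shared by devices 10 and 20
    val work = Geofence(2, "Work", listOf(20))
    val cached = listOf(home, work)                // loaded earlier for device 20

    // Loading device 10 returns "Home" again plus a new fence.
    val school = Geofence(3, "School", listOf(10))
    val merged = mergeForDevice(cached, deviceId = 10, fresh = listOf(home, school))
    println(merged.map { it.name }) // [Work, Home, School]
}
```

Filtering on deviceIds rather than on id means a geofence shared by several devices is not duplicated when a second device's list is loaded.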
CRUD

| Method | Signature | Notes |
|---|---|---|
| addGeofence | suspend fun addGeofence(geofence: Geofence) | Upserts: updates the entry if the id already exists |
| updateGeofence | suspend fun updateGeofence(geofence: Geofence) | Maps by id |
| removeGeofence | suspend fun removeGeofence(id: Int) | Filters by id |

Query methods

| Method | Signature |
|---|---|
| getGeofencesByDevice | fun getGeofencesByDevice(deviceId: Int): List<Geofence> |
| getGeofence | fun getGeofence(id: Int): Geofence? |

State management

| Method | Signature | Notes |
|---|---|---|
| clear | suspend fun clear() | Empties list, resets init flag, clears loadedDeviceIds |
| invalidateDevice | fun invalidateDevice(deviceId: Int) | Removes from loadedDeviceIds |
| invalidate | fun invalidate() | Resets init flag and clears loadedDeviceIds |

5. NotificationDataStore
Location: core/data/NotificationDataStore.kt
In-memory store for notification history. Bridges HTTP API (initial/paginated load), FCM push (foreground), and future WebSocket events.
Constructor
```kotlin
@Singleton
class NotificationDataStore @Inject constructor()
```
Constants
```kotlin
companion object {
    const val DEFAULT_PAGE_SIZE = 10
}
```
Threading
Uses Mutex.
State flows

| Property | Type | Description |
|---|---|---|
| notifications | StateFlow<List<NotificationHistoryItem>> | Newest-first |
| isLoading | StateFlow<Boolean> | Loading indicator |
| hasMore | StateFlow<Boolean> | Pagination flag |
| unreadCount | StateFlow<Int> | Count of items where readAt == null |

Event stream

| Property | Type | Buffer |
|---|---|---|
| newNotificationEvent | SharedFlow<NotificationHistoryItem> | extraBufferCapacity = 16 |

Initialization

| Method | Signature |
|---|---|
| setNotifications | suspend fun setNotifications(notifications: List<NotificationHistoryItem>, hasMore: Boolean) |
| appendNotifications | suspend fun appendNotifications(notifications: List<NotificationHistoryItem>, hasMore: Boolean) |

Real-time updates

| Method | Signature | Notes |
|---|---|---|
| addNotification | suspend fun addNotification(notification: NotificationHistoryItem, emitEvent: Boolean = true) | Deduplicates by id; prepends to list; emits newNotificationEvent |
| addFromPush | suspend fun addFromPush(userId: Int, title: String, body: String, deviceId: Int?, eventType: String?, channel: String = "push") | Creates a NotificationHistoryItem with a negative temp ID and delegates to addNotification |
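
The deduplicate-and-prepend rule and the negative temporary IDs can be sketched in plain Kotlin. Everything beyond id and readAt is an assumption: the item fields are simplified, and the counter-based newTempId scheme is one hypothetical way to produce negative IDs (the document does not say how the real store generates them). The sketch also assumes server-assigned ids are positive.

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Simplified, hypothetical model: only id and readAt are named in the text.
data class NotificationHistoryItem(val id: Int, val title: String, val readAt: String? = null)

// Returns the list unchanged if the id is already present; otherwise prepends (newest-first).
fun addNotification(
    current: List<NotificationHistoryItem>,
    item: NotificationHistoryItem
): List<NotificationHistoryItem> =
    if (current.any { it.id == item.id }) current else listOf(item) + current

// Mirrors the unreadCount definition: items whose readAt is null.
fun unreadCount(items: List<NotificationHistoryItem>): Int = items.count { it.readAt == null }

// Hypothetical temp-id scheme: -1, -2, ... so push items cannot clash with positive ids.
private val nextTempId = AtomicInteger(0)
fun newTempId(): Int = nextTempId.decrementAndGet()

fun main() {
    var list = listOf(NotificationHistoryItem(7, "Geofence exit", readAt = "2024-01-01T10:00:00Z"))
    list = addNotification(list, NotificationHistoryItem(8, "Low battery"))
    list = addNotification(list, NotificationHistoryItem(8, "Low battery"))       // duplicate, ignored
    list = addNotification(list, NotificationHistoryItem(newTempId(), "From push"))
    println(list.map { it.title }) // [From push, Low battery, Geofence exit]
    println(unreadCount(list))     // 2
}
```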
Mark-as-read

| Method | Signature |
|---|---|
| markAsRead | suspend fun markAsRead(notificationId: Int) |
| markAllAsRead | suspend fun markAllAsRead() |
| removeNotification | suspend fun removeNotification(notificationId: Int) |
| clearAll | suspend fun clearAll() |

Loading state

| Method | Signature |
|---|---|
| setLoading | fun setLoading(loading: Boolean) |

Query methods

| Method | Signature |
|---|---|
| getByType | fun getByType(eventType: String): List<NotificationHistoryItem> |
| getByDevice | fun getByDevice(deviceId: Int): List<NotificationHistoryItem> |
| getUnread | fun getUnread(): List<NotificationHistoryItem> |

State management

| Method | Signature | Notes |
|---|---|---|
| clear | suspend fun clear() | Resets all state including isLoading |
| invalidate | fun invalidate() | Sets hasMore = true |

6. InviteDataStore
Location: core/data/InviteDataStore.kt
In-memory store for pending sharing invites received by the current user.
Constructor
```kotlin
@Singleton
class InviteDataStore @Inject constructor()
```
Threading
Uses Mutex.
State flows

| Property | Type |
|---|---|
| invites | StateFlow<List<UserInvite>> |
| isInitialized | StateFlow<Boolean> |

Event stream

| Property | Type | Buffer |
|---|---|---|
| newInviteEvent | SharedFlow<UserInvite> | extraBufferCapacity = 16 |

Methods

| Method | Signature | Notes |
|---|---|---|
| setInvites | suspend fun setInvites(invites: List<UserInvite>) | Sorts by expiresAt |
| addInvite | suspend fun addInvite(event: ShareInviteReceivedEvent) | Deduplicates by token; constructs UserInvite from event; emits newInviteEvent |
| removeInvite | suspend fun removeInvite(token: String) | Filters by token |
| clear | suspend fun clear() | Empties list, resets init flag |
| invalidate | fun invalidate() | Sets isInitialized = false |
| getInviteCount | fun getInviteCount(): Int | Returns current list size |

7. SharingDataStore
Location: core/data/SharingDataStore.kt
In-memory store for per-device sharing state (active shares and pending invites).
Constructor
```kotlin
@Singleton
class SharingDataStore @Inject constructor()
```
Threading
Uses Mutex.
State flows

| Property | Type |
|---|---|
| deviceShares | StateFlow<Map<Int, DeviceSharesInfo>> |

Event stream

| Property | Type | Buffer |
|---|---|---|
| shareAcceptedEvent | SharedFlow<ShareAcceptedEvent> | extraBufferCapacity = 16 |

Methods

| Method | Signature | Notes |
|---|---|---|
| setDeviceShares | suspend fun setDeviceShares(deviceId: Int, info: DeviceSharesInfo) | Adds/replaces entry in map |
| getDeviceShares | fun getDeviceShares(deviceId: Int): DeviceSharesInfo? | Synchronous lookup |
| onShareAccepted | suspend fun onShareAccepted(event: ShareAcceptedEvent) | Converts a pending invite to an active DeviceShare; removes matching pending by email (case-insensitive); upserts share by user.id; emits shareAcceptedEvent |
| addPendingInvite | suspend fun addPendingInvite(deviceId: Int, invite: PendingInvite) | Appends to existing DeviceSharesInfo.pendingInvites |
| removeShare | suspend fun removeShare(deviceId: Int, userId: Int) | Filters shares by user.id |
| clearDevice | suspend fun clearDevice(deviceId: Int) | Removes device key from map |
| clear | suspend fun clear() | Empties map |
| isDeviceLoaded | fun isDeviceLoaded(deviceId: Int): Boolean | Checks containsKey |
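
The state transition described for onShareAccepted can be sketched as a pure function over one device's sharing info. The model fields (email, permission, the shares list name) and the function name acceptShare are hypothetical simplifications; only pendingInvites and user.id are named in this document, and the real method also takes the Mutex and emits shareAcceptedEvent.

```kotlin
// Simplified, hypothetical models.
data class User(val id: Int, val email: String)
data class DeviceShare(val user: User, val permission: String)
data class PendingInvite(val email: String)
data class DeviceSharesInfo(val shares: List<DeviceShare>, val pendingInvites: List<PendingInvite>)

fun acceptShare(info: DeviceSharesInfo, accepted: DeviceShare): DeviceSharesInfo = info.copy(
    // Remove the pending invite for this email, ignoring case.
    pendingInvites = info.pendingInvites.filterNot {
        it.email.equals(accepted.user.email, ignoreCase = true)
    },
    // Upsert by user id: drop any older share for the same user, then add the new one.
    shares = info.shares.filterNot { it.user.id == accepted.user.id } + accepted
)

fun main() {
    val before = DeviceSharesInfo(
        shares = emptyList(),
        pendingInvites = listOf(PendingInvite("Anna@Example.com"), PendingInvite("bob@example.com"))
    )
    val after = acceptShare(before, DeviceShare(User(5, "anna@example.com"), "viewer"))
    println(after.pendingInvites.map { it.email }) // [bob@example.com]
    println(after.shares.map { it.user.id })       // [5]
}
```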
8. WidgetDataStore
Location: widget/WidgetDataStore.kt
Persists device snapshots and auth tokens for home-screen widget access. Uses plain SharedPreferences (not Jetpack DataStore) because widgets run in a separate process and need synchronous reads.
Declaration
```kotlin
object WidgetDataStore
```
Singleton object: no DI, no coroutines.
Backing store
```kotlin
private const val PREFS_NAME = "visla_widget_prefs"
```
Retrieved via context.getSharedPreferences(PREFS_NAME, Context.MODE_PRIVATE).
Stored keys

| Key constant | SharedPreferences key | Type |
|---|---|---|
| KEY_DEVICES | widget_devices | JSON string (List<WidgetDevice>) |
| KEY_ACCESS_TOKEN | widget_access_token | String? |
| KEY_REFRESH_TOKEN | widget_refresh_token | String? |
| KEY_HAS_EVER_SYNCED | widget_has_ever_synced | Boolean |

WidgetDevice model
```kotlin
data class WidgetDevice(
    val id: Int,
    val name: String,
    val icon: String?,
    val isOnline: Boolean,
    val isSuspended: Boolean,
    val isMuted: Boolean
)
```
Serialised/deserialised via Gson.
Methods

| Method | Signature | Notes |
|---|---|---|
| hasEverSynced | fun hasEverSynced(context: Context): Boolean | |
| saveDevices | fun saveDevices(context: Context, devices: List<WidgetDevice>) | Sets KEY_HAS_EVER_SYNCED = true |
| loadDevices | fun loadDevices(context: Context): List<WidgetDevice> | Returns emptyList() on parse failure |
| saveAccessToken | fun saveAccessToken(context: Context, token: String?) | |
| getAccessToken | fun getAccessToken(context: Context): String? | |
| saveRefreshToken | fun saveRefreshToken(context: Context, token: String?) | |
| getRefreshToken | fun getRefreshToken(context: Context): String? | |
| updateDeviceMuteStatus | fun updateDeviceMuteStatus(context: Context, deviceId: Int, isMuted: Boolean) | Uses edit(commit = true) for synchronous write |
| clear | fun clear(context: Context) | |

Threading
All operations are synchronous. updateDeviceMuteStatus uses commit = true (synchronous write) to guarantee data is persisted before the widget reads it.
9. RealTimeDataBridge
Location: core/data/RealTimeDataBridge.kt
Orchestrates the flow of WebSocket events from RealTimeRepository into the appropriate DataStores. Acts as the wiring layer β it subscribes to every real-time event flow and routes updates to the correct store.
Constructor
```kotlin
@Singleton
class RealTimeDataBridge @Inject constructor(
    private val realTimeRepository: RealTimeRepository,
    private val deviceDataStore: DeviceDataStore,
    private val geofenceDataStore: GeofenceDataStore,
    private val notificationDataStore: NotificationDataStore,
    private val inviteDataStore: InviteDataStore,
    private val sharingDataStore: SharingDataStore,
    private val lazyDeviceRepository: Lazy<DeviceRepository> // Dagger Lazy to break circular dep
)
```
Internal state

| Field | Type | Description |
|---|---|---|
| scope | CoroutineScope? | SupervisorJob() + Dispatchers.Default; created in start() |
| isRunning | Boolean | Guard against double-start |

Lifecycle

| Method | Signature | Notes |
|---|---|---|
| start | fun start() | Connects WebSocket, launches all collectors |
| stop | fun stop() | Disconnects WebSocket, cancels scope |
| reconnect | fun reconnect() | Disconnects then reconnects (e.g. after claiming a device) |
| isActive | fun isActive(): Boolean | Returns isRunning |

Event routing

start() calls private setup methods that each launch a coroutine to collect from a RealTimeRepository flow:

| Setup method | Source flow | Target DataStore | Target method |
|---|---|---|---|
| setupPositionUpdates | positionUpdates | DeviceDataStore | updatePosition(position) |
| setupDeviceStatusUpdates | deviceStatusUpdates | DeviceDataStore | updateDeviceStatus(deviceId, status) |
| setupGeofenceEvents | geofenceCreated | GeofenceDataStore | addGeofence(geofence) |
| setupGeofenceEvents | geofenceUpdated | GeofenceDataStore | updateGeofence(geofence) |
| setupGeofenceEvents | geofenceDeleted | GeofenceDataStore | removeGeofence(geofenceId) |
| setupNotificationEvents | notificationReceived | NotificationDataStore | addNotification(notification, emitEvent = true) |
| setupDeviceSettingsEvents | deviceSettingsChanged | DeviceDataStore | updateDevice(device), after fetching the fresh device via lazyDeviceRepository.get().getDevice(deviceId) |
| setupShareEvents | shareRevoked | DeviceDataStore | removeDevice(deviceId) |
| setupShareEvents | shareRevoked | GeofenceDataStore | invalidateDevice(deviceId) |
| setupShareEvents | shareInviteReceived | InviteDataStore | addInvite(event) |
| setupShareEvents | shareAccepted | SharingDataStore | onShareAccepted(event) |
| setupConnectionStateMonitoring | isConnected | (logging only) | - |

Pending-updates queue
Position and device-status events that arrive before DeviceDataStore.isInitialized becomes true are buffered in CopyOnWriteArrayList queues. setupPendingUpdatesProcessor suspends on deviceDataStore.isInitialized.first { it } then drains both queues into the DataStore.
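The buffer-then-drain behaviour can be sketched as follows, assuming kotlinx.coroutines is available. PendingQueueSketch is a hypothetical, single-queue simplification: events are plain strings, the applied list stands in for the DataStore, and the snapshot-then-clear drain is not hardened against events arriving from another thread mid-drain.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

class PendingQueueSketch(private val isInitialized: MutableStateFlow<Boolean>) {
    private val pending = CopyOnWriteArrayList<String>()
    val applied = mutableListOf<String>() // stand-in for the target DataStore

    // Called for each incoming event: apply it directly or park it.
    fun onEvent(event: String) {
        if (isInitialized.value) {
            applied.add(event)
        } else {
            pending.add(event)
        }
    }

    // Suspends until the store reports initialized, then drains in arrival order.
    suspend fun processPending() {
        isInitialized.first { it }
        val snapshot = pending.toList()
        pending.clear()
        applied.addAll(snapshot)
    }
}

fun main() = runBlocking {
    val initialized = MutableStateFlow(false)
    val bridge = PendingQueueSketch(initialized)
    val processor = launch { bridge.processPending() }

    bridge.onEvent("position#1") // buffered: store not ready yet
    bridge.onEvent("status#1")   // buffered
    initialized.value = true     // first API load finished
    processor.join()             // queue has been drained
    bridge.onEvent("position#2") // applied directly
    println(bridge.applied)      // [position#1, status#1, position#2]
}
```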
Common patterns
Initialization guard
Most in-memory DataStores expose isInitialized: StateFlow<Boolean>. Repositories set this to true after the first successful API fetch. RealTimeDataBridge waits for it before forwarding WebSocket events.
Mutex for in-memory stores
DeviceDataStore, GeofenceDataStore, NotificationDataStore, InviteDataStore, and SharingDataStore all protect mutations with kotlinx.coroutines.sync.Mutex. This ensures consistent state when concurrent coroutines update the same store.
Invalidate vs Clear

invalidate(): marks the store as stale without erasing data. The UI continues displaying cached data while a refresh is triggered.

clear(): erases all data. Used on logout.
Persistent vs In-memory

| Store | Persistence | Mechanism |
|---|---|---|
| EncryptedTokenDataStore | Disk | Jetpack DataStore + Tink |
| AppPreferencesDataStore | Disk | Jetpack DataStore |
| WidgetDataStore | Disk | SharedPreferences + Gson |
| All others | In-memory | MutableStateFlow / MutableSharedFlow |