DB Persister
Il servizio DB Persister è responsabile della persistenza delle posizioni GPS validate nel database PostgreSQL.
Panoramica
Flusso di Elaborazione
1. Ricezione Messaggi
Il servizio consuma messaggi dal Redis Stream positions:validated:
{
"position": "{\"latitude\": 41.90, \"longitude\": 12.50, ...}",
"uniqueId": "019175767229"
}
2. Lookup Device
Per ogni messaggio, il servizio cerca il device nel database:
device_id = self.device_cache.get_device_id(unique_id)
La ricerca avviene nella tabella devices:
SELECT * FROM devices WHERE unique_id = '019175767229'
[!IMPORTANT] Il
unique_iddeve corrispondere esattamente a quello inviato dal dispositivo, inclusi eventuali zeri iniziali.
3. Cache dei Device
Il DeviceCache mantiene una cache in memoria per evitare query ripetute:
class DeviceCache:
def get_device_id(self, unique_id: str) -> Optional[int]:
# Prima controlla la cache in memoria
if unique_id in self._cache:
return self._cache[unique_id]
# Se non in cache, cerca nel DB
device = session.query(Device).filter(
Device.unique_id == unique_id
).first()
if device:
self._cache[unique_id] = device.id
return device.id
return None
4. Persistenza Position
Se il device viene trovato:
- Crea oggetto Position con tutti i campi dal JSON
- Inserisce nella tabella
positions - Aggiorna
devicesconposition_idelast_update
-- Inserisce la nuova position
INSERT INTO positions (device_id, latitude, longitude, ...) VALUES (...);
-- Aggiorna il device con l'ultima position
UPDATE devices
SET position_id = :new_position_id,
last_update = NOW()
WHERE id = :device_id;
Tabelle Database
Tabella devices
| Campo | Tipo | Descrizione |
|---|---|---|
id | INTEGER | ID numerico interno |
unique_id | VARCHAR | ID univoco del dispositivo (IMEI, serial, etc.) |
name | VARCHAR | Nome assegnato dall'utente |
position_id | INTEGER | FK all'ultima position |
last_update | TIMESTAMP | Ultimo aggiornamento |
Tabella positions
| Campo | Tipo | Descrizione |
|---|---|---|
id | SERIAL | Primary key |
device_id | INTEGER | FK al device |
latitude | DOUBLE | Latitudine |
longitude | DOUBLE | Longitudine |
altitude | DOUBLE | Altitudine |
speed | DOUBLE | Velocità in knots |
course | DOUBLE | Direzione in gradi |
valid | BOOLEAN | Position valida |
device_time | TIMESTAMP | Ora del dispositivo |
server_time | TIMESTAMP | Ora del server |
attributes | JSONB | Attributi aggiuntivi |
Troubleshooting
Device Not Found
Se nei log appare:
WARNING - Device not found for uniqueId: 019175767229
Verificare che il unique_id nel database corrisponda esattamente:
-- Controllare il valore attuale
SELECT unique_id FROM devices WHERE name = 'PET';
-- Correggere se necessario (es. aggiungere zero iniziale)
UPDATE devices SET unique_id = '019175767229' WHERE unique_id = '19175767229';
UPDATE device_list SET device_unique_id = '019175767229' WHERE device_unique_id = '19175767229';
Position Rejected
Se le position non arrivano al DB Persister, controllare il servizio position-filter:
reason=invalid→ Il GPS non ha un fix validoreason=zero→ Coordinate 0,0reason=future→ Timestamp nel futuroreason=past→ Timestamp troppo vecchio
Metriche Prometheus
| Metrica | Tipo | Descrizione |
|---|---|---|
db_persister_positions_total | Counter | Position persistite |
db_persister_positions_failed_total | Counter | Position fallite |
db_persister_processing_seconds | Histogram | Tempo di elaborazione |
db_persister_batch_size | Histogram | Dimensione batch |
Configurazione
Variabili ambiente in config.py:
| Variabile | Default | Descrizione |
|---|---|---|
REDIS_URL | redis://redis:6379 | URL Redis |
DATABASE_URL | - | URL PostgreSQL |
INPUT_STREAM | positions:validated | Stream di input |
CONSUMER_GROUP | db-persister-group | Nome consumer group |
BATCH_SIZE | 100 | Dimensione batch |
HEALTH_PORT | 8080 | Porta metriche |