Skip to main content

Database Models

Il billing-service utilizza un database PostgreSQL dedicato (billing_db) con due tabelle principali.

📊 Schema ER​


Tabella: subscriptions​

Memorizza lo stato corrente di ogni abbonamento.

Campi​

CampoTipoNullDescrizione
idSERIALNoChiave primaria
user_idINTEGERNoID utente (riferimento esterno)
providerVARCHAR(20)Nostripe, apple, google
provider_subscription_idVARCHAR(255)NoID subscription del provider
provider_customer_idVARCHAR(255)SìCustomer ID (Stripe) o originalTransactionId (Apple)
plan_idVARCHAR(100)NoID del piano/prodotto
plan_nameVARCHAR(255)SìNome descrittivo del piano
statusVARCHAR(20)NoStato normalizzato
raw_statusVARCHAR(100)SìStato originale del provider
started_atTIMESTAMPSìData inizio abbonamento
current_period_startTIMESTAMPSìInizio periodo corrente
current_period_endTIMESTAMPSìFine periodo corrente
canceled_atTIMESTAMPSìData cancellazione
created_atTIMESTAMPNoData creazione record
updated_atTIMESTAMPNoUltimo aggiornamento

Indici​

NomeCampiTipo
idx_subscriptions_user_iduser_idB-tree
idx_subscriptions_provider_idprovider, provider_subscription_idUnique

Esempio Query​

-- Ottieni abbonamento attivo per utente
SELECT * FROM subscriptions
WHERE user_id = 123
AND status IN ('ACTIVE', 'GRACE_PERIOD')
ORDER BY current_period_end DESC
LIMIT 1;

Tabella: subscription_transactions​

Audit log di tutti gli eventi webhook processati. Serve anche per idempotenza.

Campi​

CampoTipoNullDescrizione
idSERIALNoChiave primaria
subscription_idINTEGERSìFK a subscriptions
providerVARCHAR(20)NoProvider dell'evento
event_typeVARCHAR(100)NoTipo di evento
event_idVARCHAR(255)NoID univoco evento (dal provider)
old_statusVARCHAR(20)SìStato prima dell'evento
new_statusVARCHAR(20)SìStato dopo l'evento
raw_eventJSONBSìPayload completo per debug
event_timestampTIMESTAMPSìQuando è avvenuto l'evento
processed_atTIMESTAMPNoQuando è stato processato

Indici​

NomeCampiTipo
idx_transactions_idempotencyprovider, event_idUnique
idx_transactions_subscriptionsubscription_idB-tree

Come funziona l'idempotenza​

  1. Arriva un webhook con event_id = "evt_123"
  2. Prima di processare, controlliamo se esiste già in subscription_transactions
  3. Se esiste → restituiamo 200 OK senza ri-processare
  4. Se non esiste → processiamo e inseriamo il record
# utils/idempotency.py
def ensure_idempotent(db, provider, event_id):
existing = db.query(SubscriptionTransaction).filter(
SubscriptionTransaction.provider == provider,
SubscriptionTransaction.event_id == event_id
).first()

if existing:
raise DuplicateEventError(event_id, provider)

SQLAlchemy Models​

from sqlalchemy import String, Integer, DateTime, ForeignKey, Index
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column, DeclarativeBase, relationship


class Base(DeclarativeBase):
pass


class Subscription(Base):
__tablename__ = 'subscriptions'

id: Mapped[int] = mapped_column(Integer, primary_key=True)
user_id: Mapped[int] = mapped_column(Integer, nullable=False, index=True)

provider: Mapped[str] = mapped_column(String(20), nullable=False)
provider_subscription_id: Mapped[str] = mapped_column(String(255), nullable=False)
provider_customer_id: Mapped[str | None] = mapped_column(String(255))

plan_id: Mapped[str] = mapped_column(String(100), nullable=False)
plan_name: Mapped[str | None] = mapped_column(String(255))

status: Mapped[str] = mapped_column(String(20), nullable=False)
raw_status: Mapped[str | None] = mapped_column(String(100))

# ... timestamps

__table_args__ = (
Index('idx_subscriptions_provider_id', 'provider', 'provider_subscription_id', unique=True),
)

@property
def is_active(self) -> bool:
return self.status in ['ACTIVE', 'GRACE_PERIOD']


class SubscriptionTransaction(Base):
__tablename__ = 'subscription_transactions'

id: Mapped[int] = mapped_column(Integer, primary_key=True)
subscription_id: Mapped[int | None] = mapped_column(ForeignKey('subscriptions.id'))

provider: Mapped[str] = mapped_column(String(20), nullable=False)
event_type: Mapped[str] = mapped_column(String(100), nullable=False)
event_id: Mapped[str] = mapped_column(String(255), nullable=False)

old_status: Mapped[str | None] = mapped_column(String(20))
new_status: Mapped[str | None] = mapped_column(String(20))

raw_event: Mapped[dict | None] = mapped_column(JSONB)

__table_args__ = (
Index('idx_transactions_idempotency', 'provider', 'event_id', unique=True),
)

Migrazione Iniziale​

Le tabelle vengono create automaticamente all'avvio del servizio:

# app.py
from db import Base, engine

Base.metadata.create_all(bind=engine)

Per ambienti di produzione, considera l'uso di Alembic per migration controllate.