Webhook Flow
Questa pagina spiega il flusso completo dall'arrivo di un webhook fino alla pubblicazione dell'evento su Redis.
π Flusso Generaleβ
π Step by Stepβ
1. Ricezione Webhookβ
# routes/webhooks/stripe.py
@bp.route('/stripe', methods=['POST'])
def stripe_webhook():
payload = request.get_data() # Raw bytes
headers = dict(request.headers)
Γ importante ottenere il payload come bytes (request.get_data()) e non come JSON. Stripe verifica la firma sul payload originale.
2. Validazione Firmaβ
Ogni provider ha un metodo diverso:
# Stripe
stripe.Webhook.construct_event(payload, sig_header, webhook_secret)
# Apple v2
jwt.decode(signed_payload, public_key, algorithms=["ES256"])
# Google
base64.b64decode(message_data) # + optional Pub/Sub auth
3. Check Idempotenzaβ
Prima di processare, controlliamo se l'evento esiste giΓ :
# utils/idempotency.py
def ensure_idempotent(db: Session, provider: str, event_id: str):
existing = db.execute(
select(SubscriptionTransaction)
.where(SubscriptionTransaction.provider == provider)
.where(SubscriptionTransaction.event_id == event_id)
).scalar_one_or_none()
if existing:
raise DuplicateEventError(event_id, provider)
I provider possono reinviare lo stesso webhook piΓΉ volte (retry, network issues). L'idempotenza garantisce che ogni evento sia processato esattamente una volta.
4. Normalizzazione Statoβ
La state machine converte lo stato provider-specifico:
normalized_status = StateMachine.normalize(
provider="stripe",
raw_status="past_due"
)
# β SubscriptionStatus.PAST_DUE
5. Upsert Subscriptionβ
Creiamo o aggiorniamo la subscription:
subscription, old_status = service.upsert_subscription(
provider="stripe",
provider_subscription_id="sub_xxx",
user_id=123,
plan_id="price_xxx",
status=normalized_status.value,
raw_status="past_due",
current_period_end=datetime(2026, 1, 15)
)
6. Record Transactionβ
Salviamo l'evento per audit e idempotenza:
record_event(
db=db,
provider="stripe",
event_id="evt_xxx",
event_type="customer.subscription.updated",
subscription_id=subscription.id,
old_status="ACTIVE",
new_status="PAST_DUE",
raw_event=full_payload # JSONB
)
7. Publish Redis Eventβ
Se lo stato Γ¨ cambiato, pubblichiamo su Redis:
if old_status != normalized_status.value:
event_publisher.publish_subscription_updated(
user_id=123,
subscription_id=subscription.id,
old_status="ACTIVE",
new_status="PAST_DUE",
provider="stripe",
plan_id="price_xxx"
)
π¨ Formato Evento Redisβ
{
"type": "subscription_updated",
"userId": "123",
"subscriptionId": "456",
"oldStatus": "ACTIVE",
"newStatus": "PAST_DUE",
"provider": "stripe",
"planId": "price_xxx",
"timestamp": "2025-12-15T14:00:00Z"
}
Altri servizi possono consumare da billing:events:
# In altro servizio
entries = redis.xread({'billing:events': '$'}, block=5000)
for stream, messages in entries:
for msg_id, data in messages:
event = json.loads(data['event'])
if event['newStatus'] == 'EXPIRED':
# Disabilita feature premium per utente
β οΈ Error Handlingβ
Firma Invalidaβ
except ValueError as e:
logger.error(f"Webhook validation error: {e}")
return jsonify({"error": str(e)}), 400
Evento Duplicatoβ
Non Γ¨ un errore, rispondiamo 200:
if subscription is None: # DuplicateEventError was caught
return jsonify({"status": "duplicate"}), 200
User ID Mancanteβ
Se non riusciamo a identificare l'utente:
if event.user_id is None:
logger.warning(f"Could not identify user for event {event.event_id}")
return jsonify({"status": "skipped", "reason": "unknown_user"}), 200
π Retry dei Providerβ
| Provider | Retry Policy |
|---|---|
| Stripe | Esponenziale: 1h, 2h, 4h... fino a 72h |
| Apple | Non documentato, ma riprova su 5xx |
| Pub/Sub standard, riprova su non-2xx |
Rispondi sempre 200 OK piΓΉ velocemente possibile. Elabora in modo sincrono solo se veloce, altrimenti usa background jobs.