Symfony 7 come backend di dominio e Python come gateway LLM: architettura production per applicazioni B2B
Ho costruito il primo prototipo di questa architettura nel mio laboratorio di ricerca applicata il 3 febbraio 2026, per capire come un portale B2B di quoting automatico potesse integrare generazione di documenti commerciali via LLM senza mescolare le due basi di codice. Lo stack di dominio era un Symfony 7.2 su PHP 8.3 con Doctrine 3, autenticazione tramite symfony/security-bundle, database MariaDB 11 con 180 tabelle normalizzate intorno a entità di business (Cliente, Preventivo, Commessa, Listino). L'ecosistema AI invece era interamente Python: LangChain 0.3 per orchestrazione, LlamaIndex per retrieval, Anthropic SDK per Claude Sonnet 4.6, FastAPI per la superficie HTTP interna. L'hardware di laboratorio era un Hetzner EX44 (Intel Core i5-13500, 64 GB RAM DDR4, 2x NVMe 512 GB in RAID 1) con Debian 12, Docker 27 e RabbitMQ 3.13 a fare da spina dorsale della comunicazione asincrona. La tentazione iniziale è sempre la stessa: aprire un client HTTP nel controller Symfony, chiamare il servizio Python in sincrono, restituire la risposta. Dopo una settimana di test di carico, quella tentazione era seppellita da quattro tipologie di incidenti diversi - ed è da lì che è partito il design definitivo.
Perché non chiamare il servizio Python sincronamente dal controller Symfony?
Una chiamata sincrona HTTP da PHP verso un gateway LLM Python espone l'applicazione a quattro problemi che si manifestano solo sotto carico. Il primo è la latenza: un preventivo arricchito da generazione LLM impiega 20-40 secondi, durante i quali un worker PHP-FPM rimane bloccato in attesa e non può servire altre richieste. Il secondo è la propagazione dei fallimenti: se il gateway Python è lento o in timeout, l'errore risale fino al browser dell'utente B2B sotto forma di 500 o 504 anche se l'operazione principale (salvare il preventivo) era andata a buon fine. Il terzo è il coupling operativo: deploy, restart e incidenti di Python si riflettono istantaneamente sul sistema di dominio, e la postura di sicurezza del portale B2B dipende da uno stack che cambia molto più rapidamente del core business. Il quarto, il più insidioso, è l'autorizzazione: ogni chiamata diretta da PHP verso Python deve portarsi dietro un token, un contesto, un principal - il rischio di leak di credenziali inter-servizio o di over-authorization cresce in modo lineare con il numero di integration point.
La separazione asincrona via message broker risolve questi quattro problemi contemporaneamente. Il controller Symfony fa due cose in transazione: salva l'entità di dominio e dispatch-a un messaggio su una coda. Il consumer Python preleva il messaggio, esegue la pipeline LLM con i suoi tempi, persiste il risultato tornando indietro via un'altra coda. L'utente B2B non aspetta: il portale mostra "documento in generazione" con notifica push al completamento. È lo stesso principio con cui ho descritto nell'articolo FastAPI come orchestrator di LLM per backend Laravel - cambia il framework PHP, ma la forma architetturale si ripete perché il problema di fondo è lo stesso.
Se vuoi vedere come progetto architetture AI multi-stack dove ogni runtime sta al posto giusto, nel mio hub sull'integrazione AI per aziende trovi articoli che affrontano la stessa classe di decisioni su stack diversi - Laravel, Symfony, Node.js, Python - con il criterio comune di evitare vendor lock-in e mantenere i confini operativi netti.
La topologia del laboratorio: chi parla con chi, come e perché
Il disegno finale ha cinque componenti. Symfony 7.2 è il backend di dominio, unico owner del database di business. Python 3.12 con FastAPI è il gateway LLM, chiamato ai-worker, che consuma messaggi, esegue pipeline LangChain, produce risultati. RabbitMQ 3.13 è il message broker con due code principali - ai.request.generation (Symfony -> Python) e ai.response.generation (Python -> Symfony) - e una coda di dead letter per ogni direzione. Redis 7 tiene il tracking dei job pending e la presentation state del portale. Un Prometheus + Grafana raccoglie metriche da entrambi i lati per avere un cruscotto unico di latenza e tasso di errore.
+-------------+ +--------------+ +-----------+
| Browser |<------>| Symfony 7.2 |<------>| MariaDB |
| (portal) | | (domain) | | (domain) |
+-------------+ +------+-------+ +-----------+
|
| dispatch/consume
v
+--------------+
| RabbitMQ |
| ai.request |
| ai.response |
| ai.dlq.* |
+------+-------+
|
| consume/publish
v
+---------------+ +----------------+
| Python 3.12 |<------>| Anthropic API |
| (ai-worker) | | Claude Sonnet |
| LangChain | +----------------+
+---------------+Tutto il traffico di dominio passa dentro Symfony. Tutto il traffico AI passa dentro Python. Il solo punto di contatto è il broker. Questa non è eleganza architetturale fine a sé stessa: è la garanzia che un incidente nel gateway AI non compromette la disponibilità del portale di dominio - e viceversa, che un prompt injection intercettato dal consumer Python non può raggiungere la base dati del business senza passare da un message schema esplicitamente validato.
Il message schema lato Symfony Messenger
Ho definito un solo tipo di messaggio di richiesta e un solo tipo di risposta, con schema rigido e campo version esplicito. Ogni messaggio contiene anche un correlationId generato come UUID v7 al momento del dispatch, che attraversa tutto il flusso e permette correlazione nei log e nei trace.
<?php
// src/Message/GenerateQuoteDocument.php
declare(strict_types=1);
namespace App\Message;
final readonly class GenerateQuoteDocument
{
public function __construct(
public int $preventivoId,
public int $richiedenteId,
public string $tipoDocumento, // "offer-letter" | "technical-appendix"
public string $correlationId,
public int $schemaVersion = 1,
public int $costCapUsd = 2,
) {}
}L'handler Symfony non esegue logica AI: si limita a marcare lo stato del preventivo come ai_generation_in_progress, registrare il correlationId nella tabella ai_jobs e dispatchare. Il transport Messenger è configurato via DSN AMQP verso RabbitMQ con routing key distinta per tipologia di task.
<?php
// src/MessageHandler/GenerateQuoteDocumentHandler.php
declare(strict_types=1);
namespace App\MessageHandler;
use App\Message\GenerateQuoteDocument;
use App\Repository\PreventivoRepository;
use App\Service\AiJobTracker;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\MessageBusInterface;
#[AsMessageHandler]
final class GenerateQuoteDocumentHandler
{
public function __construct(
private PreventivoRepository $preventivi,
private AiJobTracker $jobTracker,
private MessageBusInterface $aiRequestBus,
) {}
public function __invoke(GenerateQuoteDocument $message): void
{
$preventivo = $this->preventivi->find($message->preventivoId);
if ($preventivo === null || $preventivo->isBloccato()) {
$this->jobTracker->reject($message->correlationId, 'preventivo_unavailable');
return;
}
$this->jobTracker->markInProgress($message->correlationId, $message->preventivoId);
$this->aiRequestBus->dispatch($message); // -> RabbitMQ exchange ai.request
}
}La separazione tra comando di dominio (che viene eseguito nel bus locale) e dispatch esterno (che pubblica sulla coda AMQP) è deliberata: il primo gira nel processo Symfony, il secondo viene materializzato da Messenger nel broker. Se il publish su RabbitMQ fallisce, il Messenger usa il suo transport di retry; se RabbitMQ è down in modo prolungato, il messaggio finisce nel transport failed e posso riprocessarlo manualmente con messenger:failed:retry.
Il consumer Python: lettura del messaggio, pipeline LLM, persistenza del risultato
Il consumer Python usa aio-pika per il consumo asincrono dalla coda, valida il payload con Pydantic e chiama la pipeline LangChain in un ThreadPoolExecutor per non bloccare il loop durante la chiamata LLM. Alla fine pubblica il risultato su ai.response.generation.
import asyncio
import json
import aio_pika
from pydantic import BaseModel, Field
from anthropic import AsyncAnthropic
class GenerateQuoteRequest(BaseModel):
preventivoId: int = Field(..., gt=0)
richiedenteId: int = Field(..., gt=0)
tipoDocumento: str = Field(..., pattern=r"^(offer-letter|technical-appendix)$")
correlationId: str = Field(..., min_length=36, max_length=36)
schemaVersion: int = 1
costCapUsd: float = 2.0
class GenerateQuoteResponse(BaseModel):
correlationId: str
status: str
markdown: str | None = None
usageUsd: float
errorCode: str | None = None
async def handle_message(message: aio_pika.IncomingMessage, anthropic: AsyncAnthropic) -> None:
async with message.process(requeue=False):
try:
req = GenerateQuoteRequest.model_validate_json(message.body)
except ValueError as err:
await publish_response(GenerateQuoteResponse(
correlationId="unknown",
status="schema_violation",
usageUsd=0,
errorCode=str(err)
))
return
context = await fetch_preventivo_context(req.preventivoId)
prompt = build_offer_letter_prompt(context)
result = await anthropic.messages.create(
model="claude-sonnet-4-6",
max_tokens=4096,
system=SYSTEM_PROMPT_OFFER_LETTER,
messages=[{"role": "user", "content": prompt}],
)
cost = estimate_cost_usd(result.usage, req.costCapUsd)
if cost > req.costCapUsd:
await publish_response(GenerateQuoteResponse(
correlationId=req.correlationId,
status="cost_cap_exceeded",
usageUsd=cost,
errorCode="LLM10_unbounded_consumption"
))
return
await publish_response(GenerateQuoteResponse(
correlationId=req.correlationId,
status="completed",
markdown=result.content[0].text,
usageUsd=cost
))Due scelte esplicite qui. La prima è message.process(requeue=False): se un messaggio fallisce per ragioni diverse dal timeout di rete, non lo rimettiamo in coda automaticamente - finisce nella DLQ e un operatore umano decide se ricreare o scartare. Riaccodare indiscriminatamente un messaggio malformato produce poison message loop che satura il broker. La seconda è la verifica del costCapUsd dopo la chiamata LLM: se la risposta costa più del cap, pubblico uno stato cost_cap_exceeded con codice LLM10_unbounded_consumption - il riferimento esplicito al punto LLM10 della OWASP Top 10 per LLM application 2025 rende il motivo operativamente tracciabile in audit.
Come propagare il contesto di sicurezza e il tracing tra Symfony e Python?
La risposta operativa è una combinazione di tre campi sul messaggio AMQP: un correlationId applicativo, un header traceparent W3C per il tracing distribuito, e un token di autorizzazione scoped limitato al task. Nessuno dei tre è opzionale, e ciascuno serve a un problema diverso.
Il correlationId è l'identificatore end-to-end del job. Lo genero in Symfony al momento del dispatch, lo scrivo in ai_jobs e nel log applicativo, e Python lo riceve, lo copia nei suoi log, e lo rimette nella response. Quando indago un incidente, cerco quel singolo UUID nei due sistemi e ricostruisco la timeline completa senza ambiguità. Il traceparent è il vero tracing distribuito in formato W3C: Symfony con il bundle open-telemetry/symfony-bundle genera lo span di dispatch, lo serializza in un header AMQP, Python legge l'header all'ingresso del consumer e crea uno span figlio. L'OpenTelemetry Collector raccoglie entrambi, li invia a Grafana Tempo, e sulla UI vedo la timeline del job - dispatch Symfony, coda RabbitMQ, pickup Python, chiamata Anthropic, publish risposta, consume Symfony - come un unico flusso con i tempi di ogni hop.
Il token di autorizzazione è il punto più delicato. Non uso il JWT di sessione del portale: ne genero uno nuovo al dispatch, firmato con una chiave privata Symfony dedicata al bus, con scope ai:generate:quote:{preventivoId} e scadenza di 15 minuti. Python valida il token all'ingresso del consumer, verifica la chiave pubblica ruotata settimanalmente, e rifiuta ogni messaggio con token scaduto o scope diverso. Così, anche se RabbitMQ fosse compromesso, nessun messaggio iniettato dall'attaccante potrebbe triggerare generazione autenticata: gli mancherebbe la firma corretta.
La gestione degli errori cross-stack
I fallimenti che un sistema del genere vede in produzione si dividono in quattro categorie, e ciascuna ha una strategia diversa. Un errore di schema violation è deterministico e non beneficia di retry: il messaggio va subito in DLQ con un codice esplicito, un operatore lo guarda al mattino e decide. Un errore di upstream LLM transient - timeout Anthropic, rate limit temporaneo, 503 server error - beneficia di retry esponenziale: tre tentativi a 5s, 20s, 60s, poi DLQ. Un errore di cost cap exceeded non è tecnicamente un fallimento: è un segnale di governance che va persistito senza retry, e il portale mostra all'utente B2B un messaggio "il documento richiesto supera il budget AI mensile, contatta l'amministratore". Un errore di prompt injection suspected - rilevato da un filtro regex sul payload di input o da un canary token che l'LLM non doveva mai produrre - bloccca il job, notifica il team di sicurezza e congela il preventivo in uno stato manualmente sbloccabile.
Sul lato Symfony, il consumer della risposta applica la stessa logica al contrario. Se arriva una response status: completed, persiste il Markdown sulla tabella ai_documenti_generati legata al preventivoId, sblocca il preventivo e notifica l'utente via Mercure o WebSocket. Se arriva un fallimento definitivo, aggiorna lo stato del job e lo rende visibile nel backoffice con il codice errore e il correlationId cliccabile per il deep-link al trace Grafana.
Il deployment containerizzato con Docker Compose
L'intero laboratorio gira in sei container su un singolo host tramite Compose. La configurazione ridotta all'osso è questa.
services:
symfony:
image: myorg/symfony-portal:7.2-php8.3
environment:
MESSENGER_TRANSPORT_DSN: amqp://app:${RMQ_PASS}@rabbitmq:5672/%2f/ai.request
AI_JOB_TOKEN_PRIVATE_KEY: /run/secrets/ai_token_priv
secrets: [ai_token_priv]
depends_on: [mariadb, rabbitmq]
ai-worker:
image: myorg/ai-worker:python3.12
environment:
AMQP_URL: amqp://app:${RMQ_PASS}@rabbitmq:5672/%2f
ANTHROPIC_API_KEY: ${ANTHROPIC_API_KEY}
OTEL_EXPORTER_OTLP_ENDPOINT: http://otel-collector:4317
secrets: [ai_token_pub]
deploy:
replicas: 3
rabbitmq:
image: rabbitmq:3.13-management-alpine
volumes: ["./rabbitmq/definitions.json:/etc/rabbitmq/definitions.json:ro"]
mariadb:
image: mariadb:11
volumes: ["mariadb_data:/var/lib/mysql"]
otel-collector:
image: otel/opentelemetry-collector-contrib:0.110.0
volumes: ["./otel-collector.yaml:/etc/otelcol-contrib/config.yaml:ro"]
secrets:
ai_token_priv: { file: ./secrets/ai_token_priv.pem }
ai_token_pub: { file: ./secrets/ai_token_pub.pem }Tre pattern vanno letti con attenzione. Le chiavi del token di autorizzazione sono passate via secrets Docker, non variabili d'ambiente: su Linux vengono montate come tmpfs leggibile solo dal processo. Il deploy.replicas: 3 sul ai-worker è ciò che permette di scalare orizzontalmente il consumer senza toccare Symfony: se la coda si riempie, aggiungo replica e il broker distribuisce il lavoro round-robin. Il file rabbitmq/definitions.json contiene la topologia di exchange, queue e DLQ in forma dichiarativa, checkata in git: non esiste configurazione RabbitMQ fatta a mano via UI - tutto passa dalla definizione versionata.
Quando questo pattern non è la scelta giusta
Se la tua applicazione Symfony genera meno di 50 documenti AI al giorno, un'architettura con broker dedicato, OpenTelemetry e replica dei consumer è over-engineering: basta un comando app:generate-documents lanciato ogni 15 minuti da cron con chiamata sincrona al gateway Python e persistenza diretta. Se il tuo caso d'uso è conversazionale - chat che streama token al browser - il pattern giusto è un altro, ed è quello che ho descritto nell'articolo sullo streaming real-time di LLM con Node.js e TypeScript: RabbitMQ aggiunge latenza di enqueue-dequeue incompatibile con requisiti di time-to-first-token sotto il secondo. Se il tuo stack è già fatto interamente in Python - Django o FastAPI nel dominio - non c'è motivo di aggiungere Symfony: tieni tutto in Python.
Il pattern Symfony-di-dominio-Python-di-AI con broker asincrono si giustifica in tre scenari concreti. Il primo: una codebase Symfony consolidata con dominio business complesso che non vuoi smontare ma che deve aggiungere funzioni AI asincrone su volumi significativi (centinaia di job giornalieri). Il secondo: un requisito di compliance che impone di isolare il codice che tratta dati personali dal codice che chiama LLM esterni - con la separazione stack, il flusso di dati esce dal perimetro di dominio solo dove passa dal broker, rendendo auditabile a granularità di schema ogni byte che lascia il sistema. Il terzo: un team con competenze asimmetriche, PHP senior sul backend e ingegneri ML su Python, dove la separazione permette di mantenere la velocità di sviluppo di entrambi senza vincolare l'uno all'altro.
Un portale B2B che integra generazione AI non si distingue dalla concorrenza per il modello che usa - Claude, GPT, Gemini cambiano ogni sei mesi - ma per la robustezza con cui quel modello è integrato nella business logic. La differenza tra una demo da conferenza e un sistema che regge in produzione sta tutta in dettagli invisibili all'utente finale: correlationId sui messaggi, DLQ che non silenziano i fallimenti, cost cap che impediscono denial-of-wallet, tracing distribuito che permette di indagare un incidente in minuti invece che in giorni. Sono gli stessi dettagli che distinguono un back office che si sblocca da solo dalla classica frase del titolare a settembre: "il sistema AI è bloccato da due settimane e nessuno capisce perché".
Se stai progettando un portale B2B - quoting, tender, commesse, procurement - che integra generazione di documenti o classificazione semantica via LLM, e vuoi capire se il pattern broker-asincrono è giustificato dal tuo volume e dai tuoi requisiti di compliance, il modulo di preventivo gratuito ti dà una prima lettura in 7 domande, 2 minuti. Ti dico se il tuo progetto rientra nelle cose che so fare bene e, se il caso richiede un profilo diverso, te lo dico e ti indico una direzione utile quando posso.