02 · CQRS Avanzado
En esta segunda parte profundizaremos en los aspectos avanzados de Event Sourcing, comprendiendo por qué un modelo append-only es clave para auditoría y reconstrucción, cómo optimizar búsquedas en el event store, y qué estrategias seguir para versionar eventos sin perder compatibilidad.
1. Por Qué Append-Only: Caso del Mundo Real
El modelo append-only mantiene un registro inmutable de cada cambio, facilitando auditorías y la reconstrucción histórica de estados. Veremos cómo este enfoque nos protege de inconsistencias y permite detectar errores.
Ejemplo de Sistema Bancario:
En este diagrama se observa cómo cada operación (depósito, retiro, cálculo de interés) se graba secuencialmente. Al reproducir eventos hasta una fecha concreta, podemos verificar el balance preciso en cualquier momento.
Ejercicio en Grupo:
Escenario: Un cliente reclama que su balance al 15/03/2023 era $200, pero la aplicación reporta $0. El cliente detalla que realizó un depósito de $500 el 12/03/2023 y un retiro de $300 el 14/03/2023.
Objetivo: Aprender a usar el event store para rastrear el origen de la discrepancia.
Tarea:
Reconstuir el estado histórico usando los eventos:
[
{ type: 'AccountOpened', balance: 0, date: '2023-03-10' },
{ type: 'Deposit', amount: 500, date: '2023-03-12' },
{ type: 'Withdrawal', amount: 300, date: '2023-03-14' },
{ type: 'FeeCharged', amount: 200, date: '2023-03-15' }
]
¿Por qué append only?
¿Por qué no usar un modelo relacional?
¿Qué parametros faltan para tener un evento completo?
2. Diseño de Event Store: Patrones de Optimización
Almacenar millones de eventos requiere estrategias de indexación y particionado que garanticen búsquedas eficaces. Analizaremos cómo aprovechar índices GIN en PostgreSQL y cómo estructurar el schema para cargas variadas.
correlationId: En CQRS y Event Sourcing, correlationId es un identificador único que permite rastrear una operación a través de múltiples componentes del sistema distribuido, especialmente cuando una acción inicial (por ejemplo, un comando) genera múltiples eventos o desencadena otros procesos.
aggregateId: En CQRS y Event Sourcing, el aggregateId es el identificador único de una instancia de un Aggregate, por ejemplo el id de un usuario o de un pedido. Se diferencia de correlationId en que esta última es un identificador único que permite rastrear una operación a través de múltiples componentes del sistema distribuido, especialmente cuando una acción inicial (por ejemplo, un comando) genera múltiples eventos o desencadena otros procesos.
2.1 Modelado para Búsquedas
Un índice GIN sobre campos JSONB acelera consultas filtradas por propiedades específicas.
CREATE INDEX idx_event_properties ON event_store USING GIN (
(payload->'user') jsonb_path_ops,
(payload->'metadata'->'correlationId')
);
-- Ejemplo de consulta: Todos los eventos del usuario "u123" con correlationId "corr_abc"
SELECT *
FROM event_store
WHERE payload @> '{"user": "u123"}'
AND payload->'metadata'->> 'correlationId' = 'corr_abc';
Este patrón reduce dramáticamente el tiempo de respuesta al buscar dentro de cargas JSON complejas.
2.2 Ejercicio de Diseño de Tabla
Requisitos:
Manejar múltiples tipos de agregados (Orders, Users, Payments).
Búsquedas rápidas por
correlationId
yaggregateId
.Compactación mensual de eventos antiguos para archivado. (Ejemplo PostgreSQL partitioning)
Tarea:
Proponer un esquema ajustado que cumpla los requisitos.
Definir los índices necesarios y explicar su elección.
Escribir la consulta SQL para listar todas las órdenes fallidas con un
correlationId
dado.
-- TABLE
CREATE TABLE event_store (
id UUID PRIMARY KEY,
aggregate_type TEXT NOT NULL, -- orders, users, payments
aggregate_id UUID NOT NULL,
payload JSONB NOT NULL, -- contiene toda la información del evento, incluido el correlationId
created_at TIMESTAMP WITH TIMEZONE NOT NULL DEFAULT NOW()
) PARTITION BY RANGE (created_at);
-- IDX AGGREGATE_ID
CREATE INDEX idx_event_store_aggregate ON event_store (aggregate_id, aggregate_type);
-- IDX CREATED_AT
CREATE INDEX idx_event_store_created_at ON event_store (created_at DESC);
-- IDX CORRELATION_ID
CREATE INDEX idx_event_store_correlation_id ON event_store USING GIN ((payload->>'correlationId'))
-- QUERY, correlationId and type OrderFailed
SELECT * FROM event_store
WHERE payload->>'correlationId' = '123' AND payload->>'type' = 'OrderFailed';
-- COMPACT TABLE, puede gestionarse mediante cron jobs.
CREATE TABLE event_store_2025_01 PARTITION of event_store
FOR VALUES FROM ('2025-01-01') TO ('2025-01-31');
SELECT * FROM event_store
WHERE created_at >= '2025-01-01' AND created_at =< '2025-01-31';
3. Versionado de Eventos: Estrategia de Migración
Cuando evoluciona el negocio, los contratos de eventos cambian. Mantener compatibilidad requiere migradores y versiones. Veremos cómo diseñar migraciones seguras.
3.1 Caso Real: Cambio de Moneda
Al principio todos los totales se expresaban en USD. Tras globalizar la plataforma, necesitamos soportar múltiples monedas.
// Versión Original (v1)
interface OrderCreatedV1 {
type: 'OrderCreated';
total: number; // Siempre USD
}
// Versión Nueva (v2)
interface OrderCreatedV2 {
type: 'OrderCreated';
total: number;
currency: string; // Código ISO
convertedAmount?: number;
}
// Función migradora
const migrateOrderCreated = (event: OrderCreatedV1): OrderCreatedV2 => ({
...event,
currency: 'USD',
convertedAmount: event.total
});
Al aplicar este migrador, todos los eventos v1 se actualizarán dinámicamente a la forma v2 al ser leídos.
3.2 Ejercicio de Migración
Evento Obsoleto:
interface UserAddressChangedV1 {
type: 'UserAddressChanged';
address: string; // "Street 123, City"
}
Nuevos Requisitos:
Separar en campos:
street
,city
,postalCode
.Normalizar nombres de ciudades ("NYC" → "New York City").
Tarea:
Definir la interfaz
UserAddressChangedV2
.Escribir la función migradora de v1 a v2, manejando casos sin coma (e.g. "Street123WithoutComma").
Discutir cómo registrar incidencias de datos inválidos para posterior corrección.
// Evento Original
interface UserAddressChangedV1 {
type: 'UserAddressChanged';
address: string; // "Street 123, City"
}
interface UserAddressChangedV2 {
type: 'UserAddressChanged';
street: string;
city: string;
postalCode?: string;
}
const cityMap: Record<string, string> = {
'NYC': 'New York City',
}
function migrateUserAddressChanged(eventV1: UserAddressChangedV1): UserAddressChangedV2 {
const data = eventV1.address.split(',').map(s => s.trim(''));
let [street, city, postalCode] = data;
return {
type: eventV1.type,
street,
city: cityMap[city] || city,
postalCode,
}
}
// Como manejar datos invalidos
if (data.length < 2) {
logger.error(`Dirección invalida para: '${eventV1.address}'`);
incidentService.report({
type: 'InvalidADdressForamt',
rawValue: eventV1,
correlationId: eventV1.correlationId,
timestamp: new Date().UTC()
})
throw new Error("invalid address");
}
4. Outbox Pattern: Implementación Robusta
El patrón Outbox desacopla la transacción de la base de datos de la publicación de eventos, garantizando entrega exactly-once incluso ante fallos.
4.1 Diagrama de Secuencia con Fallos
Este flujo asegura que ningún evento quede sin publicar ni se duplique.
Requisitos Clave:
Entrega exactly-once.
Gestión de desconexiones de MQ.
Registro de métricas: eventos procesados, latencia.
Priorización basada en
payload.metadata.priority
.
5. Sagas y Process Managers
Los Sagas coordinan transacciones distribuidas garantizando consistencia eventual. Veremos patrones de circuit breaker, timeouts y cómo orquestar flujos complejos.
5.1 Patrón Circuit Breaker en Sagas
El circuit breaker previene escaladas de error. Ante fallos transitorios, ejecutamos reintentos; si persisten, lanzamos compensaciones.
5.2 Implementación con Retroceso
class PaymentSaga {
async run(orderId: string) {
try {
await payOrder(orderId);
await reserveStock(orderId);
} catch (error) {
if (isTransient(error)) {
throw new SagaRetryError(); // Reintentar más tarde
}
await this.compensate(orderId); // Deshacer cambios
}
}
private async compensate(orderId: string) {
await cancelPayment(orderId);
await releaseStock(orderId);
await markOrderAsFailed(orderId);
}
}
Este patrón asegura que, si la reserva de stock falla tras el pago, revertimos la transacción completa.
6. Process Managers: Caso Avanzado
Los Process Managers mantienen estado intermedio y gestionan timeouts, ideal en flujos con múltiples dependencias y plazos.
6.1 Estado Complejo con Timeouts
interface SagaState {
id: string;
steps: {
payment: 'pending' | 'completed' | 'failed';
inventory: 'pending' | 'reserved' | 'released';
shipping: 'not_started' | 'scheduled';
};
expiresAt: Date;
retries: number;
}
class SagaTimeoutHandler {
async checkExpired() {
const expired = await db.query(
`UPDATE sagas SET status='expired' WHERE expires_at < NOW() AND status='running' RETURNING *`
);
expired.rows.forEach(saga => this.compensate(saga));
}
}
Este componente detecta sagas que superaron su plazo y aplica compensaciones.
Última actualización