02 · EDA Práctico

Hoy, nos centramos en la robustez y la observabilidad de estos sistemas, utilizando RabbitMQ con Node.js. ¿Qué pasa cuando las cosas fallan? ¿Cómo evolucionamos nuestros eventos sin romper todo? ¿Y cómo sabemos qué está pasando dentro de nuestro flujo de eventos?

Configuración del Entorno Práctico (10 min):

  1. Asegúrense de tener Docker y Node.js (v20+ recomendado) instalados.

  2. Clonen el repositorio base para los ejercicios de hoy: git clone [URL_DEL_REPO_DIA_2].

  3. Dentro del repositorio, ejecuten docker-compose up -d para iniciar RabbitMQ en un contenedor. Esto nos dará una instancia limpia y lista para usar.

docker-compose.yml:

services:
  rabbitmq:
    image: rabbitmq:4.1-management # Usamos una versión con interfaz de gestión
    container_name: eda_rabbitmq_dia2
    ports:
      - "5672:5672" # Puerto AMQP para la aplicación
      - "15672:15672" # Puerto para la interfaz de gestión web
    environment:
      - RABBITMQ_DEFAULT_USER=user
      - RABBITMQ_DEFAULT_PASS=password
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq/
volumes:
  rabbitmq_data:
  1. Crear una nueva carpeta e iniciar proyecto Node.js: npm init -y.

  2. Instalar las dependencias del proyecto Node.js: npm install amqplib.

  3. Pueden acceder a la interfaz de gestión de RabbitMQ en http://localhost:15672 (user: user, pass: password) para ver exchanges, colas y mensajes.


Módulo 1: Manejo Avanzado de Errores y Retries en RabbitMQ

En un mundo ideal, los mensajes siempre se procesan correctamente. En la realidad, las redes fallan, los servicios externos no responden, los datos pueden ser inesperados. Necesitamos estrategias para manejar estos fallos con gracia.

Distinguiremos dos tipos principales de fallos:

  • Transitorios: Errores temporales (ej. timeout de red, bloqueo momentáneo de BD). Estos suelen resolverse con reintentos.

  • Permanentes: Errores que no se resolverán reintentando con el mismo mensaje (ej. evento con formato corrupto, bug en el código del consumidor que no maneja un caso específico). Estos mensajes deben apartarse para no bloquear el procesamiento de otros.

1.1 Poison Queues / Dead-Letter Exchanges (DLX):

  • Cuando un mensaje no puede ser procesado después de ciertos intentos, o debido a un error permanente, no queremos que se pierda ni que atasque la cola principal. RabbitMQ nos ofrece el concepto de 'Dead-Letter Exchange' (DLX).

  • Una cola puede configurarse para enviar mensajes 'muertos' (rechazados o expirados) a un DLX. Este DLX, a su vez, puede enrutar estos mensajes a una cola específica, comúnmente llamada 'Poison Queue' o 'Dead-Letter Queue' (DLQ).

  • Estos mensajes en la DLQ pueden ser inspeccionados manualmente, re-procesados después de una corrección, o archivados.

Diagrama de Flujo DLX:

spinner

Ejercicio: Configurando una DLX:

Crear un productor que envíe un mensaje, y un consumidor que lo rechace sistemáticamente, enviándolo a una DLQ.

Pasos:

  1. En el proyecto base, modificar o crear un script (ejercicios/ej1-dlx/setup_dlx.ts) para declarar mediante amqplib:

    • Un exchange principal: main_exchange (tipo direct).

    • Una cola de trabajo: work_queue, bindeada a main_exchange con routing key work.

    • Un DLX: dlx_exchange (tipo fanout es simple para esto).

    • Una DLQ: dead_letter_queue, bindeada a dlx_exchange.

    • Configurar work_queue para que use dlx_exchange como su deadLetterExchange.

Snippet de ayuda para la configuración de la cola:

Ejercicio:

  1. Modificar el consumidor (ejercicios/ej1-dlx/consumer.ts) para que siempre rechace el mensaje sin reencolarlo (channel.nack(msg, false, false)).

  2. Usar el productor base para enviar un mensaje a main_exchange con routing key work.

  3. Ejecutar el consumidor y verificar en la interfaz de gestión de RabbitMQ (http://localhost:15672) que el mensaje termina en dead_letter_queue.

1.2 Estrategias de Retry (Exponencial con Límite):

Para fallos transitorios, reintentar es una buena estrategia. Pero reintentar inmediatamente puede no ser útil y puede sobrecargar el sistema o el recurso que falló.

Un 'backoff exponencial' es una técnica común: el tiempo de espera entre reintentos aumenta exponencialmente (ej. 1s, 2s, 4s, 8s...). Esto da tiempo a que el problema transitorio se resuelva.

También es crucial tener un límite de reintentos para evitar bucles infinitos y, eventualmente, mover el mensaje a una DLQ si sigue fallando.

En RabbitMQ, una forma de implementar retries con delay es usando una combinación de TTL en mensajes y DLXs, o mediante plugins como rabbitmq_delayed_message_exchange. Para simplificar hoy, simularemos el delay y el conteo de reintentos en las cabeceras del mensaje, republicando a una cola de reintentos o, para este ejercicio, controlando directamente en el consumidor y usando nack para enviar a DLX después de N intentos.

El código provisto en el prompt inicial es una buena base:

Nota importante para el ejercicio: Para simular reintentos sin un sistema de delay complejo, haremos que el consumidor falle algunas veces y use nack(msg, false, true) para reencolar inmediatamente. Esto no es una buena práctica en producción por el riesgo de bucles rápidos, pero nos permitirá ver el contador de intentos. El último intento usará nack(msg, false, false) para enviarlo a la DLX configurada.

Ejercicio: Implementando Consumidor con Reintentos y DLX:

Modificar el consumidor para que simule fallos transitorios, reintente un número limitado de veces, y luego envíe el mensaje a la DLQ.

Pasos:

  1. Asegurarse de tener la work_queue y la dlx_exchange/dead_letter_queue del ejercicio anterior configuradas.

  2. En un nuevo archivo (ejercicios/ej2-retry/consumer_retry.ts), implementen un consumidor para work_queue.

  3. Dentro del callback del consumidor:

  • Obtengan el número de intento actual de msg.properties.headers['x-attempt'] (inicien en 1 si no existe).

  • Simulen un fallo (ej. throw new Error('Simulated transient error')).

  • Si attempt < MAX_RETRIES (ej. MAX_RETRIES = 3):

    • Incrementen x-attempt en las cabeceras.

    • Hagan channel.basicPublish(msg.fields.exchange, msg.fields.routingKey, msg.content, { headers: msg.properties.headers }) para republicar el mensaje con el intento actualizado.

    • Luego, hagan channel.ack(msg) para confirmar el mensaje original (ya que lo hemos republicado).

    • "Esto simula un sistema de reintento donde el mensaje se vuelve a encolar para ser procesado más tarde (idealmente con delay)."

  • Si attempt >= MAX_RETRIES:

    • Impriman un mensaje de "Máximos reintentos alcanzados".

    • Hagan channel.nack(msg, false, false) para enviar el mensaje a la DLX.

  1. Envíen un mensaje a work_queue.

  2. Observen los logs del consumidor y cómo el mensaje eventualmente llega a dead_letter_queue después de los reintentos.

Snippet de ayuda (consumidor):


Módulo 2: Versionado Evolutivo de Eventos

Los sistemas evolucionan, y con ellos, los esquemas de nuestros eventos. Publicar una nueva versión de un evento no puede significar que todos los consumidores de versiones antiguas dejen de funcionar. Necesitamos estrategias para un versionado evolutivo.

Principales estrategias:

2.1 Tolerant Reader (Lector Tolerante):

Los consumidores ignoran campos que no conocen. Si añades nuevos campos opcionales a un evento, los consumidores antiguos seguirán funcionando.

  • Cuándo usar: Adición de campos no críticos, cambios retrocompatibles.

  • Ventaja: Simple de implementar.

2.2 Up-caster (Transformador Ascendente):

Cuando tienes un cambio estructural en el evento (ej. renombrar un campo, cambiar su estructura) que rompería a los consumidores antiguos, pero puedes transformar la versión antigua del evento a la nueva.

El consumidor, al recibir un evento, primero verifica su versión. Si es una versión antigua, un 'up-caster' transforma el payload del evento al formato de la versión más reciente (o la que el consumidor espera) antes de procesarlo.

  • Cuándo usar: Cambios estructurales donde la transformación es posible y lógica.

  • Ventaja: Los consumidores solo necesitan conocer el esquema más reciente (o el que soportan).

spinner

2.3 Parallel Topic / Parallel Stream (Tópico Paralelo):

Para cambios mayores o incompatibles donde un up-caster es demasiado complejo o no tiene sentido, puedes introducir una nueva versión del evento en un tópico/stream completamente nuevo (ej. OrderCompleted.v1 y OrderCompleted.v2 en topics diferentes).

Los productores empiezan a publicar en el nuevo tópico. Los consumidores migran a su propio ritmo para consumir del nuevo tópico. Los consumidores antiguos siguen consumiendo del tópico antiguo hasta que puedan ser deprecados.

  • Cuándo usar: Rupturas mayores, rediseño completo del evento.

  • Ventaja: Aislamiento claro, migración gradual. Desventaja: Gestión de múltiples tópicos/versiones.


Deep Dive en Up-casters:

  • Explicación y Ejemplo de Códig:

    • "Un up-caster es una función simple que toma el payload de un evento antiguo y lo transforma al nuevo. Puede haber una cadena de up-casters si hay múltiples versiones intermedias (v1->v2, v2->v3)."

    • "Revisemos el ejemplo del prompt:"

En el pipeline del consumidor, antes de pasar el evento al handler principal, se pasa por el up-caster.


Ejercicio Práctico 3: Implementando un Consumidor con Up-caster:

Crear un consumidor que pueda procesar dos versiones de un evento TaskAssigned, donde V2 añade un campo priority.

Pasos:

  1. Definan interfaces para TaskAssignedV1 { taskId: string, assignedTo: string, version: 1 } y TaskAssignedV2 { taskId: string, assignedTo: string, priority: 'high' | 'low', version: 2 }.

  2. Creen una función up-caster upcastTaskAssigned(event: any): TaskAssignedV2 que transforme un V1 a V2 (asumiendo una priority: 'low' por defecto para V1).

  3. En ejercicios/ej3-upcaster/consumer_upcast.ts, creen un consumidor.

  4. Dentro del consumidor, parseen el mensaje, apliquen el up-caster, y luego procesen el evento V2 (ej. impriman sus campos).

  5. Usen el productor para enviar instancias de TaskAssignedV1 y TaskAssignedV2 (como JSON strings).

  6. Verifiquen que el consumidor procesa ambas versiones correctamente, mostrando la prioridad (ya sea la por defecto o la especificada).

Snippet de ayuda (up-caster):


Módulo 3: Trazabilidad y Observabilidad Básica con OpenTelemetry

  • Narrativa y Conceptos:

    • "A medida que nuestros sistemas EDA crecen, entender el flujo de un evento a través de múltiples servicios se vuelve complejo. ¿Dónde se originó un problema? ¿Cuánto tiempo tarda cada paso?"

    • "La observabilidad nos da las herramientas para responder esto. Tres pilares clave:"

      • Logs: Ya los conocemos. Útiles, pero a veces insuficientes para seguir flujos.

      • Métricas: Agregaciones numéricas sobre el tiempo (ej. eventos_procesados_por_segundo, latencia_p95_consumo).

      • Trazas Distribuidas: Siguen una solicitud/evento a medida que viaja por diferentes servicios, mostrando la relación causal y los tiempos.

    • "OpenTelemetry (OTel) es un estándar abierto y un conjunto de herramientas para instrumentar nuestras aplicaciones y generar telemetría (trazas, métricas, logs)."

  • Introducción a OpenTelemetry con RabbitMQ (Node.js):

    • Instrumentación Básica:

      • "OTel proporciona SDKs e instrumentaciones automáticas para bibliotecas comunes. Para Node.js, instrumentation-amqplib puede capturar automáticamente operaciones de RabbitMQ (publicar, consumir) como 'spans' (unidades de trabajo en una traza)."

      • "Necesitaremos un 'exporter' para enviar esta telemetría a un backend de observabilidad (ej. Jaeger, Zipkin, Prometheus, o un colector OTel que luego enruta)."

Para este módulo, podemos añadir un colector OpenTelemetry a nuestro docker-compose.yml."

Docker Compose (añadir OTel Collector):

otel-collector-config.yaml (ejemplo muy básico):

  • Propagación de Contexto:

    • "Para que una traza abarque múltiples servicios (productor -> broker -> consumidor), el 'contexto de la traza' (trace ID, span ID) debe propagarse. Generalmente se hace a través de cabeceras de mensajes (ej. traceparent del estándar W3C Trace Context)."

    • "La instrumentación de amqplib a menudo intenta hacer esto automáticamente. Al publicar, inyecta cabeceras. Al consumir, las extrae."

    • "La idea es que el span del consumidor sea hijo del span del productor, formando una traza completa."

  • Demostración/Ejercicio Ligero Opcional:

    • Objetivo: (Si el tiempo y la configuración lo permiten) Ver una traza simple de un mensaje RabbitMQ.

    • Pasos:

      1. "Asegúrense de que el otel-collector esté configurado en Docker Compose y corriendo."

      2. "En el proyecto, creen un archivo otel-setup.ts con la configuración del SDK (como el snippet anterior). Asegúrense de llamar a sdk.start() al inicio de su productor y consumidor."

      3. "Añadan un service.name en la configuración del SDK para identificar sus servicios."

      4. "Ejecuten el productor para enviar un mensaje, y luego el consumidor para procesarlo."

      5. "Observen los logs del otel-collector. Deberían ver la telemetría (trazas) siendo exportada (al logging exporter)."

      6. (Avanzado si hay tiempo y un backend como Jaeger) "Si tienen Jaeger configurado, busquen la traza allí."

    • Nota: "Este ejercicio puede ser más una demostración guiada por el instructor debido al tiempo y la complejidad de la configuración inicial de OTel."

  • Ideas para Dashboards y Alertas:

    • "Una vez que tenemos métricas y trazas, podemos visualizarlas."

    • Métricas Clave para EDA:

      • Throughput de mensajes (por tipo de evento, por cola).

      • Latencia de procesamiento del consumidor (p50, p90, p95, p99).

      • Número de mensajes en colas (especialmente DLQs).

      • Tasa de errores/reintentos.

    • "El prompt original mencionaba ejemplos de paneles para Grafana y alertas:"

      • Panel 1: Throughput de mensajes por tipo de evento.

      • Panel 2: Latencia P95 por consumidor.

      • Panel 3: Tasa de reintentos por minuto.

      • Panel 4: Número de mensajes en la Poison Queue/DLQ.

      • Alerta Ejemplo: "Si la latencia P95 de un consumidor crítico supera los 500ms durante más de 5 minutos, enviar una alerta a Slack."

    • "La observabilidad es un viaje continuo. Empiecen simple y añadan más detalle a medida que lo necesiten."


Conclusión y Próximos Pasos

  • Narrativa del Instructor:

    • "¡Felicidades por completar este segundo día intensivo! Hoy hemos hecho nuestros sistemas EDA mucho más robustos y hemos sentado las bases para entender lo que sucede en ellos."

    • "Hemos aprendido a manejar errores con DLX, a implementar reintentos inteligentes, a evolucionar nuestros eventos con up-casters, y hemos tenido un primer vistazo a la crucial tarea de la observabilidad y trazabilidad con OpenTelemetry."

    • "Recuerden que la práctica constante es clave. Los animo a seguir experimentando con los ejercicios, a explorar más a fondo OpenTelemetry y a pensar cómo aplicar estos conceptos en sus propios proyectos."

  • Recursos Adicionales:

    • Documentación de amqplib, RabbitMQ, OpenTelemetry.

    • Plugins de RabbitMQ como rabbitmq_delayed_message_exchange.

  • Q&A Abierto.

Este plan para el Día 2 intenta ser práctico, centrado en RabbitMQ con Node.js, y proporcionar una progresión lógica desde el manejo de errores hasta la observabilidad. ¡Espero que sea una excelente continuación para tu taller!

Última actualización