El patrón Sagas es una solución eficaz para gestionar transacciones distribuidas en microservicios. Aquí te guiamos paso a paso para implementar este patrón usando MassTransit, Azure Service Bus, y ASP.NET Core, en un escenario de transferencia bancaria.
¿Qué es el Patrón Sagas?
El patrón Sagas es un patrón de diseño para gestionar transacciones que abarcan múltiples servicios en una arquitectura de microservicios. Permite descomponer una transacción grande en múltiples transacciones más pequeñas que se ejecutan en cada servicio. Cada una de estas transacciones es independiente y puede deshacer su acción si ocurre un error, lo que es fundamental en arquitecturas distribuidas.
¿Por Qué Usar el Patrón Sagas en Microservicios?
Cuando trabajas con microservicios, gestionar transacciones distribuidas es un reto. El patrón Sagas resuelve este problema al dividir las transacciones en pasos más pequeños, manteniendo consistencia sin necesidad de bloqueos. Así, cada paso en el flujo de trabajo se confirma o se revierte en caso de fallo.
¿Qué es MassTransit?
MassTransit es una biblioteca open source para .NET que facilita la integración de microservicios mediante el uso de mensajes. Esencialmente, es un framework de mensajería que simplifica la comunicación asincrónica entre servicios. Con MassTransit, puedes implementar patrones como mensajes de eventos y colas de comandos, que son ideales para arquitecturas distribuidas como los microservicios.
¿Por Qué Usamos MassTransit?
Vamos a usar MassTransit porque proporciona una forma sencilla de manejar la comunicación entre microservicios en el contexto de una saga. Permite abstraer los detalles de implementación del middleware de mensajería, haciéndolo compatible con varios sistemas como Azure Service Bus, RabbitMQ y Kafka. En este caso, usaremos Azure Service Bus para gestionar los mensajes entre microservicios. Sin embargo, si en el futuro decidimos cambiar a otra plataforma de mensajería, como RabbitMQ o Kafka, podemos hacerlo fácilmente sin modificar gran parte del código, gracias a MassTransit.
¿Cómo Funciona MassTransit?
MassTransit te permite definir y consumir eventos de forma eficiente. Por ejemplo, cuando un servicio crea un pedido, puede publicar un evento que otros servicios pueden consumir para completar sus propias tareas. En términos de una saga, cada servicio escucha los eventos relevantes y actúa en consecuencia.
public class PedidoCreadoEventConsumer : IConsumer<PedidoCreadoEvent>
{
public async Task Consume(ConsumeContext<PedidoCreadoEvent> context)
{
// Procesar evento de pedido creado
}
}
Con esta arquitectura, puedes usar MassTransit para manejar la orquestación o coreografía de las sagas, asegurando que cada servicio sepa cuándo actuar.
Escenario: Transferencia Bancaria
Imagina que deseas realizar una transferencia bancaria desde tu cuenta en el Banco Lorem Ipsum a la cuenta de tu amigo en el Banco Contoso. Para llevar a cabo esta operación, necesitamos coordinar varios servicios, asegurando que la transacción sea válida y que todos los sistemas involucrados completen sus partes de forma exitosa. Este es un escenario ideal para aplicar el patrón Sagas y coordinar el flujo con un orquestador.
Paso a Paso del Escenario
-
Validación en el Banco Central de Argentina: Antes de procesar la transferencia, el Banco Central de Argentina debe verificar si se puede realizar la operación. Este paso lo maneja el microservicio ValidatorApi, que valida la solicitud de transferencia.
-
Transferencia desde Banco Lorem Ipsum: Una vez que la ValidatorApi aprueba la solicitud, el Banco Lorem Ipsum puede procesar la transferencia. Aquí, el microservicio TransferApi es responsable de debitar el dinero de tu cuenta.
-
Confirmación en Banco Contoso: Después de que se completa la transferencia, el microservicio ReceiptApi del Banco Contoso recibe la información de la transacción y acredita el dinero en la cuenta de tu amigo.
En este escenario, el orquestador central se encuentra en el Banco Lorem Ipsum. Es el responsable de coordinar las interacciones entre los servicios, asegurándose de que cada paso se complete antes de proceder al siguiente. Si en algún punto hay un error, el orquestador debe gestionar la compensación, revirtiendo los pasos anteriores para garantizar la consistencia.
¿Qué es y Para Qué Sirve el Orquestador?
El orquestador es un componente clave en el patrón Sagas que coordina la secuencia de eventos que ocurren entre los microservicios. En vez de depender de cada servicio para ejecutar su lógica de compensación, el orquestador toma el control del flujo.
Ventajas del Orquestador en el Patrón Sagas
- Centraliza la lógica de negocios: Todo el flujo de trabajo está gestionado por el orquestador, lo que facilita el control y el seguimiento del proceso.
- Flexibilidad: El orquestador es capaz de manejar casos complejos donde varios servicios dependen unos de otros y se pueden aplicar reglas de negocio específicas.
- Compensación gestionada: Si algo sale mal en un paso, el orquestador puede llamar a los métodos de compensación en cada servicio para revertir los cambios realizados hasta ese punto.
Por ejemplo, en nuestro escenario de transferencia bancaria:
- Si la ValidatorApi rechaza la transferencia, no se hará ningún cambio.
- Si el TransferApi debita el dinero pero el ReceiptApi del Banco Contoso falla, el orquestador puede deshacer la transacción en el Banco Lorem Ipsum y reintegrar el dinero a tu cuenta.
Paso a Paso para Implementar el Patrón Sagas en ASP.NET Core
Paso 1: Configurar la Solución de Microservicios
Vamos a crear en una misma Solución lo siguientes proyectos:
- Solución de Visual Studio con el nombre MicroservicesSagas
- Proyecto biblioteca de clases con el nombre MicroservicesSagas.Commons
- Proyecto Asp NET Core WebApi con el nombre MicroservicesSagas.Orchestrator
- Una nueva carpeta de Soluciones llamada Participants
- Dentro de esta nueva carpeta crear:
- Proyecto Asp NET Core WebApi con el nombre MicroservicesSagas.ValidatorApi
- Proyecto Asp NET Core WebApi con el nombre MicroservicesSagas.TransferApi
- Proyecto Asp NET Core WebApi con el nombre MicroservicesSagas.ReceiptApi
Paso 2: Instalar paquetes Nuget
En el proyecto Commons (proyecto compartido por todos los microservicios), instala los siguientes paquetes NuGet:
<PackageReference Include="MassTransit" Version="8.2.5" />
<PackageReference Include="MassTransit.Azure.ServiceBus.Core" Version="8.2.5" />
<PackageReference Include="MassTransit.EntityFrameworkCore" Version="8.2.5" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="8.0.8" />
En el proyecto Orchestrator, añade:
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="8.0.8" />
Además, agrega el proyecto Commons como dependencia a los cuatro microservicios.
Paso 3: Agregar Events.cs
en el Proyecto Commons
Define los eventos que se utilizarán en la saga de transferencia. Estos eventos representarán cada paso en el flujo de trabajo de la saga:
using MassTransit;
namespace MicroservicesSagas.Commons
{
public record SubmitTransferEvent(Guid CorrelationId, Guid TransactionId) : CorrelatedBy<Guid>;
public record ValidateTransferCommand(Guid CorrelationId, Guid TransactionId) : CorrelatedBy<Guid>;
public record TransferValidatedEvent(Guid CorrelationId, Guid TransactionId) : CorrelatedBy<Guid>;
public record InvalidAmountEvent(Guid CorrelationId, Guid TransactionId, string Error) : CorrelatedBy<Guid>;
public record InvalidAccountEvent(Guid CorrelationId, Guid TransactionId, string Error) : CorrelatedBy<Guid>;
public record OtherReasonValidationFailedEvent(Guid CorrelationId, Guid TransactionId, string Error) : CorrelatedBy<Guid>;
public record CancelTransferCommand(Guid CorrelationId, Guid TransactionId) : CorrelatedBy<Guid>;
public record TransferCanceledEvent(Guid CorrelationId, Guid TransactionId) : CorrelatedBy<Guid>;
public record TransferNotCanceledEvent(Guid CorrelationId, Guid TransactionId, string Error) : CorrelatedBy<Guid>;
public record TransferCommand(Guid CorrelationId, Guid TransactionId) : CorrelatedBy<Guid>;
public record TransferSucceededEvent(Guid CorrelationId, Guid TransactionId) : CorrelatedBy<Guid>;
public record OtherReasonTransferFailedEvent(Guid CorrelationId, Guid TransactionId, string Error) : CorrelatedBy<Guid>;
public record IssueReceiptCommand(Guid CorrelationId, Guid TransactionId) : CorrelatedBy<Guid>;
public record ReceiptIssuedEvent(Guid CorrelationId, Guid TransactionId) : CorrelatedBy<Guid>;
public record OtherReasonReceiptFailedEvent(Guid CorrelationId, Guid TransactionId, string Error) : CorrelatedBy<Guid>;
}
Paso 4: Definir el Estado de la Saga en TransferSagaState.cs
En el proyecto Orchestrator, agrega una clase para manejar el estado de la saga:
using MassTransit;
namespace MicroservicesSagas.Orchestrator
{
public class TransferSagaState : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; }
public Guid TransactionId { get; set; }
public DateTime CreatedAt { get; set; }
public byte[] RowVersion { get; set; }
}
}
Paso 5: Crear la Máquina de Estado (State Machine) TransferStateMachine.cs
Implementa la lógica del orquestador utilizando MassTransit y definiendo los eventos y transiciones entre los diferentes estados de la saga:
using MassTransit;
using MicroservicesSagas.Commons;
namespace MicroservicesSagas.Orchestrator
{
public class TransferStateMachine : MassTransitStateMachine<TransferSagaState>
{
public State Validating { get; private set; }
public State Transferring { get; private set; }
public State Receiving { get; private set; }
public State Failed { get; private set; }
public Event<SubmitTransferEvent> SubmitTransfer { get; private set; }
public Event<TransferValidatedEvent> TransferValidatedEvent { get; private set; }
public Event<InvalidAmountEvent> InvalidAmountEvent { get; private set; }
public Event<InvalidAccountEvent> InvalidAccountEvent { get; private set; }
public Event<OtherReasonValidationFailedEvent> OtherReasonValidationFailedEvent { get; private set; }
public Event<TransferCanceledEvent> TransferCanceledEvent { get; private set; }
public Event<TransferNotCanceledEvent> TransferNotCanceledEvent { get; private set; }
public Event<TransferSucceededEvent> TransferSucceededEvent { get; private set; }
public Event<OtherReasonTransferFailedEvent> OtherReasonTransferFailedEvent { get; private set; }
public Event<ReceiptIssuedEvent> ReceiptIssuedEvent { get; private set; }
public Event<OtherReasonReceiptFailedEvent> OtherReasonReceiptFailedEvent { get; private set; }
public TransferStateMachine()
{
InstanceState(x => x.CurrentState);
Initially(
When(SubmitTransfer)
.Then(x => x.Saga.CreatedAt = DateTime.Now)
.Then(x => x.Saga.TransactionId = x.Message.TransactionId)
.TransitionTo(Validating)
.Publish(context => new ValidateTransferCommand(context.Message.CorrelationId, context.Message.TransactionId))
);
During(Validating,
When(TransferValidatedEvent)
.TransitionTo(Transferring)
.Publish(context => new TransferCommand(context.Message.CorrelationId, context.Message.TransactionId)),
When(InvalidAmountEvent)
.TransitionTo(Failed)
.Publish(context => new CancelTransferCommand(context.Message.CorrelationId, context.Message.TransactionId)),
When(InvalidAccountEvent)
.TransitionTo(Failed)
.Publish(context => new CancelTransferCommand(context.Message.CorrelationId, context.Message.TransactionId))
);
During(Transferring,
When(TransferSucceededEvent)
.TransitionTo(Receiving)
.Publish(context => new IssueReceiptCommand(context.Message.CorrelationId, context.Message.TransactionId)),
When(OtherReasonTransferFailedEvent)
.TransitionTo(Failed)
.Publish(context => new CancelTransferCommand(context.Message.CorrelationId, context.Message.TransactionId))
);
During(Receiving,
When(ReceiptIssuedEvent)
.Finalize(),
When(OtherReasonReceiptFailedEvent)
.TransitionTo(Failed)
.Publish(context => new CancelTransferCommand(context.Message.CorrelationId, context.Message.TransactionId))
);
During(Failed,
When(TransferCanceledEvent)
.Finalize(),
When(TransferNotCanceledEvent)
.TransitionTo(Failed)
);
}
}
}
Paso 6: Crear carpeta Data
y Configurar TransferSagaStateMap.cs
para Entity Framework
Para persistir el estado de la saga en una base de datos SQL Server:
using MassTransit;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;
namespace MicroservicesSagas.Orchestrator.Data
{
public class TransferSagaStateMap : SagaClassMap<TransferSagaState>
{
protected override void Configure(EntityTypeBuilder<TransferSagaState> entity, ModelBuilder model)
{
entity.Property(x => x.CurrentState).HasMaxLength(64);
entity.Property(x => x.TransactionId);
entity.Property(x => x.RowVersion).IsRowVersion();
}
}
}
Paso 7: Crear el DbContext en la carpeta Data
Configura el contexto de Entity Framework para trabajar con SQL Server, usando un esquema separado llamado saga:
using MassTransit.EntityFrameworkCoreIntegration;
using Microsoft.EntityFrameworkCore;
namespace MicroservicesSagas.Orchestrator.Data
{
public class TransferSagaDbContext :
SagaDbContext
{
public TransferSagaDbContext(DbContextOptions<TransferSagaDbContext> options)
: base(options)
{
}
public DbSet<TransferSagaState> Transfers => Set<TransferSagaState>();
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
base.OnModelCreating(modelBuilder);
modelBuilder.HasDefaultSchema("saga");
}
protected override IEnumerable<ISagaClassMap> Configurations
{
get { yield return new TransferSagaStateMap(); }
}
}
}
Paso 8: Configurar los AppSettings.json
Agrega las cadenas de conexión a los archivos appsettings.json de todos los proyectos:
"ConnectionStrings": {
"SqlServer": "[ SqlServer ConnectionString]",
"AzureServiceBus": "[ AzureServiceBus ConnectionString]"
},
Paso 9: Configurar Program.cs
del Orquestador
Configura MassTransit con Azure Service Bus y Entity Framework para usar SQL Server:
using System.Reflection;
using MassTransit;
using MassTransit.EntityFrameworkCoreIntegration;
using MicroservicesSagas.Commons;
using MicroservicesSagas.Orchestrator;
using MicroservicesSagas.Orchestrator.Data;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
builder.Services.AddDbContext<TransferSagaDbContext>((provider, options) =>
{
options.UseSqlServer(builder.Configuration.GetConnectionString("SqlServer"), m =>
{
m.MigrationsAssembly(Assembly.GetExecutingAssembly().GetName().Name);
m.MigrationsHistoryTable($"__{nameof(TransferSagaDbContext)}");
});
});
builder.Services.AddMassTransit(x =>
{
x.AddSagaStateMachine<TransferStateMachine, TransferSagaState>()
.EntityFrameworkRepository(r =>
{
r.ConcurrencyMode = ConcurrencyMode.Optimistic;
r.ExistingDbContext<TransferSagaDbContext>();
});
x.UsingAzureServiceBus((context, cfg) =>
{
cfg.Host(builder.Configuration.GetConnectionString("AzureServiceBus"));
cfg.ConfigureEndpoints(context);
});
});
var app = builder.Build();
app.UseSwagger();
app.UseSwaggerUI();
app.UseHttpsRedirection();
app.MapPost("CreateTransaction", async (IPublishEndpoint publishEndpoint) =>
{
var payload = new SubmitTransferEvent(Guid.NewGuid(), Guid.NewGuid());
await publishEndpoint.Publish(payload);
return Results.Ok(payload);
});
app.MapGet("Transactions/{id}", async (Guid id, [FromServices] TransferSagaDbContext context) =>
{
var transfer = await context.Transfers.FirstOrDefaultAsync(x => x.TransactionId == id);
if (transfer is null) return Results.NotFound();
return Results.Ok(transfer);
});
app.Run();
Paso 10: Ejecutar Migraciones
Abre la consola Package Manager Console y ejecuta los siguientes comandos para crear la migración e inicializar la base de datos:
add-migration InitialMigration
update-database
Paso 11: Crear la carpeta Consumers
y la clase ValidateTransactionConsumer.cs
en ValidatorApi
Este consumidor validará la transferencia, emitiendo eventos si la validación falla o si es exitosa:
using MassTransit;
using MicroservicesSagas.Commons;
namespace MicroservicesSagas.ValidatorApi.Consumers
{
public class ValidateTransactionConsumer : IConsumer<ValidateTransferCommand>
{
public async Task Consume(ConsumeContext<ValidateTransferCommand> context)
{
if (await IsInvalidAmount(context.Message.TransactionId))
{
await context.Publish(new InvalidAmountEvent(context.Message.CorrelationId, context.Message.TransactionId, "El monto de la transferencia es inválido."));
return;
}
if (await IsInvalidAccount(context.Message.TransactionId))
{
await context.Publish(new InvalidAccountEvent(context.Message.CorrelationId, context.Message.TransactionId, "La cuenta de la transferencia es inválida."));
return;
}
await context.Publish(new TransferValidatedEvent(context.Message.CorrelationId, context.Message.TransactionId));
}
private Task<bool> IsInvalidAmount(Guid transactionId) => Task.FromResult(false); // Lógica de validación
private Task<bool> IsInvalidAccount(Guid transactionId) => Task.FromResult(false); // Lógica de validación
}
}
Paso 12: Configurar ValidatorApi
en Program.cs
Configura MassTransit y Azure Service Bus en ValidatorApi:
using MassTransit;
using MicroservicesSagas.Commons;
using MicroservicesSagas.ValidatorApi.Consumers;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
builder.Services.AddMassTransit(x =>
{
x.AddConsumer<ValidateTransactionConsumer>();
x.UsingAzureServiceBus((context, cfg) =>
{
cfg.Host(builder.Configuration.GetConnectionString("AzureServiceBus"));
cfg.SubscriptionEndpoint<ValidateTransferCommand>("validator-api", e =>
{
e.ConfigureConsumer<ValidateTransactionConsumer>(context);
});
});
});
var app = builder.Build();
app.UseSwagger();
app.UseSwaggerUI();
app.UseHttpsRedirection();
app.Run();
Paso 14: Crear la carpeta Consumers
y crear TransferConsumer.cs
en TransferApi
Este consumidor ejecutará la transferencia:
using MassTransit;
using MicroservicesSagas.Commons;
namespace MicroservicesSagas.TransferApi.Consumers
{
public class TransferConsumer : IConsumer<TransferCommand>
{
public async Task Consume(ConsumeContext<TransferCommand> context)
{
var transferResult = await TransferAsync(context.Message.TransactionId);
if (transferResult.Success)
{
await context.Publish(new TransferSucceededEvent(context.Message.CorrelationId, context.Message.TransactionId));
}
else
{
await context.Publish(new OtherReasonTransferFailedEvent(context.Message.CorrelationId, context.Message.TransactionId, "Transfer failed"));
}
}
private Task<TransferResult> TransferAsync(Guid transactionId) => Task.FromResult(new TransferResult(true));
record TransferResult(bool Success);
}
}
Paso 15: Crear CancelTransferConsumer.cs
en TransferApi
Este consumidor manejará la compensación de la transferencia fallida:
using MassTransit;
using MicroservicesSagas.Commons;
namespace MicroservicesSagas.TransferApi.Consumers
{
public class CancelTransferConsumer : IConsumer<CancelTransferCommand>
{
public async Task Consume(ConsumeContext<CancelTransferCommand> context)
{
await context.Publish(new TransferCanceledEvent(context.Message.CorrelationId, context.Message.TransactionId));
}
}
}
Paso 16: Configurar TransferApi
en Program.cs
Configura MassTransit y Azure Service Bus en TransferApi:
using MassTransit;
using MicroservicesSagas.Commons;
using MicroservicesSagas.TransferApi.Consumers;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
builder.Services.AddMassTransit(x =>
{
x.AddConsumer<TransferConsumer>();
x.AddConsumer<CancelTransferConsumer>();
x.UsingAzureServiceBus((context, cfg) =>
{
cfg.Host(builder.Configuration.GetConnectionString("AzureServiceBus"));
cfg.SubscriptionEndpoint<TransferCommand>("transfer-api", e =>
{
e.ConfigureConsumer<TransferConsumer>(context);
});
cfg.SubscriptionEndpoint<CancelTransferCommand>("transfer-api", e =>
{
e.ConfigureConsumer<CancelTransferConsumer>(context);
});
});
});
var app = builder.Build();
app.UseSwagger();
app.UseSwaggerUI();
app.UseHttpsRedirection();
app.Run();
Paso 17: Crear la carpeta Consumers
y crear ReceiptConsumer.cs
en ReceiptApi
Este consumidor se encargará de procesar el recibo de la transferencia:
using MassTransit;
using MicroservicesSagas.Commons;
namespace MicroservicesSagas.ReceiptApi.Consumers
{
public class ReceiptConsumer : IConsumer<IssueReceiptCommand>
{
public async Task Consume(ConsumeContext<IssueReceiptCommand> context)
{
var receiptResult = await RecordReceiptAsync(context.Message.TransactionId);
if (receiptResult.Success)
{
await context.Publish(new ReceiptIssuedEvent(context.Message.CorrelationId, context.Message.TransactionId));
}
else
{
await context.Publish(new OtherReasonReceiptFailedEvent(context.Message.CorrelationId, context.Message.TransactionId, "Receipt recording failed"));
}
}
private Task<ReceiptResult> RecordReceiptAsync(Guid transactionId) => Task.FromResult(new ReceiptResult(true));
record ReceiptResult(bool Success);
}
}
Paso 18: Configurar ReceiptApi
en Program.cs
Configura MassTransit y Azure Service Bus en ReceiptApi:
using MassTransit;
using MicroservicesSagas.Commons;
using MicroservicesSagas.ReceiptApi.Consumers;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
builder.Services.AddMassTransit(x =>
{
x.AddConsumer<ReceiptConsumer>();
x.UsingAzureServiceBus((context, cfg) =>
{
cfg.Host(builder.Configuration.GetConnectionString("AzureServiceBus"));
cfg.SubscriptionEndpoint<IssueReceiptCommand>("receipt-api", e =>
{
e.ConfigureConsumer<ReceiptConsumer>(context);
});
});
});
var app = builder.Build();
app.UseSwagger();
app.UseSwaggerUI();
app.UseHttpsRedirection();
app.Run();
Paso 19: Configurar Visual Studio para ejecutar todos los proyectos a la vez
En el Explorador de Soluciones, hagan clic derecho en el archivo de la Solución y busquen la opción "Configurar Proyectos de Inicio".
Aparecerá una ventana y deberán configurarla tal como muestra la imagen.
Paso 20: Insertar Puntos de Interrupción (Breakpoints)
Si ejecutamos los servicios sin establecer puntos de interrupción, no podremos saber si están funcionando correctamente o no. Por ello, sugiero insertar breakpoints a lo largo del código, como se ilustra en las imágenes siguientes.
Paso 21: Ejecutar todo y probar Swagger
Conclusión
Hemos configurado paso a paso el patrón Sagas con MassTransit, Azure Service Bus y ASP.NET Core, manejando la validación, transferencia y generación de recibos en un escenario de microservicios. Este enfoque asegura que las transacciones se gestionen de forma segura y eficiente en entornos distribuidos.
Les dejo el repo de GitHub con todo el proyecto terminado
lauchacarro/MicroservicesSagas (github.com)
Si encuentras útil este trabajo y deseas mostrar tu apoyo, siempre puedes invitarme a un buen café ☕.