Módulo 7: ms-inventory — Reserva de Stock & Compensación
~2 horas
Objetivo
Crear ms-inventory desde cero — el participante principal de la SAGA. Este servicio escucha OrderCreatedEvent, reserva stock, y ejecuta la compensación si el pago falla.
ms-inventory tiene dos responsabilidades:
- Happy path: Recibir
OrderCreatedEvent→ reservar stock → publicarStockReservedEvent - Compensación: Recibir
PaymentFailedEvent→ devolver stock → publicarStockReleasedEvent
7.1 Crear el proyecto con Scaffold
# Desde la raíz de arka-lab/
mkdir ms-inventory && cd ms-inventory
plugins {
id 'co.com.bancolombia.cleanArchitecture' version '4.1.0'
}
gradle wrapper
./gradlew ca \
--package=co.com.arka.inventory \
--type=reactive \
--name=MsInventory \
--lombok=true \
--java-version=17
# Entry point + Driven adapters
./gradlew gep --type webflux
./gradlew gda --type secrets --secrets-backend aws_secrets_manager
./gradlew gda --type r2dbc
# Eliminar tests autogenerados
find . -path "*/src/test/*" -name "*.java" -delete
7.2 Actualizar .env
Agrega las variables de ms-inventory al .env:
MS_INVENTORY_PORT=8082
MS_INVENTORY_HOST=arka-ms-inventory
7.3 Definir los eventos de la SAGA
Duplicamos los records necesarios en ms-inventory:
package co.com.arka.inventory.model.events;
public record OrderCreatedEvent(
String orderId,
String sku,
Integer quantity,
Double amount) {
}
package co.com.arka.inventory.model.events;
public record StockReservedEvent(
String orderId,
String sku,
Integer quantity
) {
}
package co.com.arka.inventory.model.events;
public record StockReserveFailedEvent(
String orderId, String reason
) {
}
package co.com.arka.inventory.model.events;
public record StockReleasedEvent(
String orderId, String reason
) {
}
package co.com.arka.inventory.model.events;
public record PaymentFailedEvent(
String orderId,
String sku,
Integer quantity,
String reason) {
}
7.4 Modelo de Dominio — Product
package co.com.arka.inventory.model.product;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Product {
private String id;
private String sku;
private String name;
private Double price;
private Integer stock;
private String category;
}
No permitimos stock negativo. La verificacion se hace en el caso de uso y tambien existe un CHECK (stock >= 0) en la BD.
Puerto del Repositorio
package co.com.arka.inventory.model.product.gateways;
import co.com.arka.inventory.model.product.Product;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public interface ProductRepository {
Mono<Product> save(Product product);
Mono<Product> findById(String id);
Mono<Product> findBySku(String sku);
Flux<Product> findAll();
}
Events Gateway (Interface Genérica)
Esta interfaz define el contrato agnóstico para que el Caso de Uso emita eventos al exterior, sin acoplarse con la implementación de Kafka:
package co.com.arka.inventory.model.events.gateways;
import reactor.core.publisher.Mono;
public interface EventsGateway<T> {
Mono<Void> emit(T event);
}
7.5 Caso de Uso — InventoryUseCase
package co.com.arka.inventory.usecase.inventory;
import co.com.arka.inventory.model.events.OrderCreatedEvent;
import co.com.arka.inventory.model.events.StockReleasedEvent;
import co.com.arka.inventory.model.events.StockReserveFailedEvent;
import co.com.arka.inventory.model.events.StockReservedEvent;
import co.com.arka.inventory.model.events.gateways.EventsGateway;
import co.com.arka.inventory.model.product.Product;
import co.com.arka.inventory.model.product.gateways.ProductRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.java.Log;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.logging.Level;
@Log
@RequiredArgsConstructor
public class InventoryUseCase {
private final ProductRepository productRepository;
private final EventsGateway<StockReservedEvent> stockReservedEventsGateway;
private final EventsGateway<StockReleasedEvent> stockReleasedEventsGateway;
private final EventsGateway<StockReserveFailedEvent> stockReserveFailedEventsGateway;
public Mono<Void> reserveStock(OrderCreatedEvent event) {
log.log(Level.INFO, "Reserving stock for order: {0}, sku: {1}, qty: {2}",
new Object[]{event.orderId(), event.sku(), event.quantity()});
return productRepository.findBySku(event.sku())
.flatMap(product -> {
if (product.getStock() >= event.quantity()) {
product.setStock(product.getStock() - event.quantity());
return productRepository.save(product)
.flatMap(saved -> {
var reserved = new StockReservedEvent(event.orderId(), event.sku(), event.quantity());
log.log(Level.INFO, "Stock reserved successfully for order: {0}", event.orderId());
return stockReservedEventsGateway.emit(reserved);
});
} else {
var failed = new StockReserveFailedEvent(event.orderId(),
"Insufficient stock for SKU: " + event.sku()
+ ". Available: " + product.getStock()
+ ", Requested: " + event.quantity());
log.log(Level.WARNING, "Stock reservation failed for order: {0}", event.orderId());
return stockReserveFailedEventsGateway.emit(failed);
}
})
.switchIfEmpty(Mono.defer(() -> {
var failed = new StockReserveFailedEvent(event.orderId(),
"Product not found for SKU: " + event.sku());
log.log(Level.WARNING, "Product not found for SKU: {0}", event.sku());
return stockReserveFailedEventsGateway.emit(failed);
}));
}
public Mono<Void> releaseStock(String orderId, String sku, Integer quantity, String reason) {
if (sku == null || quantity == null || quantity <= 0) {
log.log(Level.WARNING, "Cannot release stock for order {0}: invalid reservation data (sku={1}, quantity={2})",
new Object[]{orderId, sku, quantity});
var released = new StockReleasedEvent(orderId, reason + " (invalid reservation data)");
return stockReleasedEventsGateway.emit(released);
}
log.log(Level.INFO, "Releasing stock for order: {0}, sku: {1}, qty: {2}",
new Object[]{orderId, sku, quantity});
return productRepository.findBySku(sku)
.flatMap(product -> {
product.setStock(product.getStock() + quantity);
return productRepository.save(product)
.flatMap(saved -> {
var released = new StockReleasedEvent(orderId, reason);
log.log(Level.INFO, "Stock released for order: {0}", orderId);
return stockReleasedEventsGateway.emit(released);
});
})
.switchIfEmpty(Mono.defer(() -> {
log.log(Level.WARNING, "Cannot release stock: product not found for SKU: {0}", sku);
var released = new StockReleasedEvent(orderId, reason + " (product not found)");
return stockReleasedEventsGateway.emit(released);
}));
}
public Mono<Product> getProduct(String sku) {
return productRepository.findBySku(sku);
}
public Flux<Product> getAllProducts() {
return productRepository.findAll();
}
}
7.6 Infraestructura — Secrets + R2DBC
Misma estructura que ms-orders:
KafkaBrokerSecretConsumer
package co.com.arka.inventory.events.config;
import lombok.Builder;
@Builder(toBuilder = true)
public record KafkaBrokerSecretConsumer(
String bootstrapServers,
String groupId,
String autoOffsetReset,
Topics topics) {
@Builder(toBuilder = true)
public record Topics(
String orderCreated,
String stockReserved,
String stockReleased,
String stockFailed,
String paymentFailed,
String orderConfirmed,
String orderCancelled
) {}
}
KafkaBrokerSecretProducer
package co.com.arka.inventory.events.config;
import lombok.Builder;
@Builder(toBuilder = true)
public record KafkaBrokerSecretProducer(
String bootstrapServers,
String groupId,
String autoOffsetReset,
Topics topics) {
@Builder(toBuilder = true)
public record Topics(
String orderCreated,
String stockReserved,
String stockReleased,
String stockFailed,
String paymentFailed,
String orderConfirmed,
String orderCancelled
) {}
}
SecretsConfig
package co.com.arka.inventory.config;
import co.com.arka.inventory.events.config.KafkaBrokerSecretConsumer;
import co.com.arka.inventory.events.config.KafkaBrokerSecretProducer;
import co.com.arka.inventory.r2dbc.config.PostgresqlConnectionProperties;
import co.com.bancolombia.secretsmanager.api.GenericManagerAsync;
import co.com.bancolombia.secretsmanager.api.exceptions.SecretException;
import co.com.bancolombia.secretsmanager.config.AWSSecretsManagerConfig;
import co.com.bancolombia.secretsmanager.connector.AWSSecretManagerConnectorAsync;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.regions.Region;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class SecretsConfig {
@Bean
public GenericManagerAsync getSecretManager(@Value("${aws.region}") String region,
@Value("${aws.endpoint}") String endpoint) {
return new AWSSecretManagerConnectorAsync(getConfig(region, endpoint));
}
@Bean
public PostgresqlConnectionProperties postgresqlConnectionProperties(
GenericManagerAsync secretManager,
@Value("${aws.secrets.db-name}") String dbSecretName) throws SecretException {
return secretManager.getSecret(dbSecretName, PostgresqlConnectionProperties.class)
.doOnSuccess(e -> log.info("Secret was obtained successfully: {}", dbSecretName))
.doOnError(e -> log.error("Error getting secret: {}", e.getMessage()))
.onErrorMap(e -> new RuntimeException("Error getting secret", e))
.block();
}
@Bean
public KafkaBrokerSecretConsumer kafkaBrokerSecretConsumer(
GenericManagerAsync secretManager,
@Value("${aws.secrets.kafka-name}") String kafkaSecretName
) throws SecretException {
return secretManager.getSecret(kafkaSecretName, KafkaBrokerSecretConsumer.class)
.doOnSuccess(e -> log.info("Secret was obtained successfully: {}", kafkaSecretName))
.doOnError(e -> log.error("Error getting secret: {}", e.getMessage()))
.onErrorMap(e -> new RuntimeException("Error getting secret", e))
.block();
}
@Bean
public KafkaBrokerSecretProducer kafkaBrokerSecretProducer(
GenericManagerAsync secretManager,
@Value("${aws.secrets.kafka-name}") String kafkaSecretName
) throws SecretException {
return secretManager.getSecret(kafkaSecretName, KafkaBrokerSecretProducer.class)
.doOnSuccess(e -> log.info("Secret was obtained successfully: {}", kafkaSecretName))
.doOnError(e -> log.error("Error getting secret: {}", e.getMessage()))
.onErrorMap(e -> new RuntimeException("Error getting secret", e))
.block();
}
private AWSSecretsManagerConfig getConfig(String region, String endpoint) {
return AWSSecretsManagerConfig.builder()
.region(Region.of(region))
.endpoint(endpoint)
.cacheSize(5) // TODO Set cache size
.cacheSeconds(3600) // TODO Set cache seconds
.build();
}
}
Entidad R2DBC
package co.com.arka.inventory.r2dbc.entity;
import lombok.*;
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Table;
@Data @Builder @NoArgsConstructor @AllArgsConstructor
@Table("products")
public class ProductData {
@Id
private String id;
private String sku;
private String name;
private Double price;
private Integer stock;
private String category;
}
Repositorio + Adapter
package co.com.arka.inventory.r2dbc;
import co.com.arka.inventory.r2dbc.entity.ProductData;
import org.springframework.data.repository.query.ReactiveQueryByExampleExecutor;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import reactor.core.publisher.Mono;
public interface ProductReactiveRepository extends ReactiveCrudRepository<ProductData, String>, ReactiveQueryByExampleExecutor<ProductData> {
Mono<ProductData> findBySku(String sku);
}
package co.com.arka.inventory.r2dbc;
import co.com.arka.inventory.model.product.Product;
import co.com.arka.inventory.model.product.gateways.ProductRepository;
import co.com.arka.inventory.r2dbc.entity.ProductData;
import co.com.arka.inventory.r2dbc.helper.ReactiveAdapterOperations;
import org.reactivecommons.utils.ObjectMapper;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Mono;
@Repository
public class ProductReactiveRepositoryAdapter extends ReactiveAdapterOperations<
Product,
ProductData,
String,
ProductReactiveRepository
> implements ProductRepository {
public ProductReactiveRepositoryAdapter(ProductReactiveRepository repository, ObjectMapper mapper) {
/**
* Could be use mapper.mapBuilder if your domain model implement builder pattern
* super(repository, mapper, d -> mapper.mapBuilder(d,ObjectModel.ObjectModelBuilder.class).build());
* Or using mapper.map with the class of the object model
*/
super(repository, mapper, d -> mapper.map(d, Product.class));
}
@Override
public Mono<Product> findBySku(String sku) {
return repository.findBySku(sku).map(this::toEntity);
}
}
PostgreSQL Connection Pool
package co.com.arka.inventory.r2dbc.config;
import io.r2dbc.pool.ConnectionPool;
import io.r2dbc.pool.ConnectionPoolConfiguration;
import io.r2dbc.postgresql.PostgresqlConnectionConfiguration;
import io.r2dbc.postgresql.PostgresqlConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.time.Duration;
@Configuration
public class PostgreSQLConnectionPool {
public static final int INITIAL_SIZE = 12;
public static final int MAX_SIZE = 15;
public static final int MAX_IDLE_TIME = 30;
@Bean
public ConnectionPool connectionPool(PostgresqlConnectionProperties props) {
PostgresqlConnectionConfiguration dbConfig = PostgresqlConnectionConfiguration.builder()
.host(props.host())
.port(props.port())
.database(props.database())
.username(props.username())
.password(props.password())
.build();
ConnectionPoolConfiguration poolConfiguration = ConnectionPoolConfiguration.builder()
.connectionFactory(new PostgresqlConnectionFactory(dbConfig))
.name("api-postgres-connection-pool")
.initialSize(INITIAL_SIZE)
.maxSize(MAX_SIZE)
.maxIdleTime(Duration.ofMinutes(MAX_IDLE_TIME))
.validationQuery("SELECT 1")
.build();
return new ConnectionPool(poolConfiguration);
}
}
7.7 Emisión de eventos (async-event-bus)
A diferencia de otros servicios, ms-inventory emite 3 tipos de eventos diferentes dependiendo de la disponibilidad de stock, de modo que delegamos cada uno a su propio Gateway de CloudEvents respectivo:
StockReservedEventsGateway
package co.com.arka.inventory.events;
import co.com.arka.inventory.events.config.KafkaBrokerSecretProducer;
import co.com.arka.inventory.model.events.StockReservedEvent;
import co.com.arka.inventory.model.events.gateways.EventsGateway;
import lombok.RequiredArgsConstructor;
import lombok.extern.java.Log;
import org.reactivecommons.api.domain.DomainEventBus;
import org.reactivecommons.async.impl.config.annotations.EnableDomainEventBus;
import reactor.core.publisher.Mono;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.jackson.JsonCloudEventData;
import tools.jackson.databind.ObjectMapper;
import java.util.UUID;
import java.net.URI;
import java.time.OffsetDateTime;
import java.util.logging.Level;
import static reactor.core.publisher.Mono.from;
@Log
@RequiredArgsConstructor
@EnableDomainEventBus
public class StockReservedEventsGateway implements EventsGateway<StockReservedEvent> {
private final KafkaBrokerSecretProducer kafkaBrokerSecretProducer;
private final DomainEventBus domainEventBus;
private final ObjectMapper om;
@Override
public Mono<Void> emit(StockReservedEvent event) {
String eventName = kafkaBrokerSecretProducer.topics().stockReserved();
log.log(Level.INFO, "Sending domain event: {0}: {1}", new String[]{eventName, event.toString()});
CloudEvent eventCloudEvent = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSource(URI.create("https://reactive-commons.org/foos"))
.withType(eventName)
.withTime(OffsetDateTime.now())
.withData("application/json", JsonCloudEventData.wrap(om.valueToTree(event)))
.build();
return from(domainEventBus.emit(eventCloudEvent));
}
}
StockReleasedEventsGateway
package co.com.arka.inventory.events;
import co.com.arka.inventory.events.config.KafkaBrokerSecretProducer;
import co.com.arka.inventory.model.events.StockReleasedEvent;
import co.com.arka.inventory.model.events.gateways.EventsGateway;
import lombok.RequiredArgsConstructor;
import lombok.extern.java.Log;
import org.reactivecommons.api.domain.DomainEventBus;
import org.reactivecommons.async.impl.config.annotations.EnableDomainEventBus;
import reactor.core.publisher.Mono;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.jackson.JsonCloudEventData;
import tools.jackson.databind.ObjectMapper;
import java.util.UUID;
import java.net.URI;
import java.time.OffsetDateTime;
import java.util.logging.Level;
import static reactor.core.publisher.Mono.from;
@Log
@RequiredArgsConstructor
@EnableDomainEventBus
public class StockReleasedEventsGateway implements EventsGateway<StockReleasedEvent> {
private final KafkaBrokerSecretProducer kafkaBrokerSecretProducer;
private final DomainEventBus domainEventBus;
private final ObjectMapper om;
@Override
public Mono<Void> emit(StockReleasedEvent event) {
String eventName = kafkaBrokerSecretProducer.topics().stockReleased();
log.log(Level.INFO, "Sending domain event: {0}: {1}", new String[]{eventName, event.toString()});
CloudEvent eventCloudEvent = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSource(URI.create("https://reactive-commons.org/foos"))
.withType(eventName)
.withTime(OffsetDateTime.now())
.withData("application/json", JsonCloudEventData.wrap(om.valueToTree(event)))
.build();
return from(domainEventBus.emit(eventCloudEvent));
}
}
StockReserveFailedEventsGateway
package co.com.arka.inventory.events;
import co.com.arka.inventory.events.config.KafkaBrokerSecretProducer;
import co.com.arka.inventory.model.events.StockReserveFailedEvent;
import co.com.arka.inventory.model.events.gateways.EventsGateway;
import io.cloudevents.CloudEvent;
import io.cloudevents.jackson.JsonCloudEventData;
import lombok.RequiredArgsConstructor;
import lombok.extern.java.Log;
import org.reactivecommons.api.domain.DomainEventBus;
import org.reactivecommons.async.impl.config.annotations.EnableDomainEventBus;
import reactor.core.publisher.Mono;
import tools.jackson.databind.ObjectMapper;
import io.cloudevents.core.builder.CloudEventBuilder;
import java.net.URI;
import java.time.OffsetDateTime;
import java.util.UUID;
import java.util.logging.Level;
import static reactor.core.publisher.Mono.from;
@Log
@RequiredArgsConstructor
@EnableDomainEventBus
public class StockReserveFailedEventsGateway implements EventsGateway<StockReserveFailedEvent> {
private final KafkaBrokerSecretProducer kafkaBrokerSecretProducer;
private final DomainEventBus domainEventBus;
private final ObjectMapper om;
@Override
public Mono<Void> emit(StockReserveFailedEvent event) {
String eventName = kafkaBrokerSecretProducer.topics().stockFailed();
log.log(Level.INFO, "Sending domain event: {0}: {1}", new String[]{eventName, event.toString()});
CloudEvent eventCloudEvent = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSource(URI.create("https://reactive-commons.org/foos"))
.withType(eventName)
.withTime(OffsetDateTime.now())
.withData("application/json", JsonCloudEventData.wrap(om.valueToTree(event)))
.build();
return from(domainEventBus.emit(eventCloudEvent));
}
}
7.8 Consumo de eventos (async-event-handler)
En este lab no configuramos consumers manuales. Usamos Reactive Commons para consumir eventos desde Kafka.
package co.com.arka.inventory.events.handlers;
import co.com.arka.inventory.model.events.OrderCreatedEvent;
import co.com.arka.inventory.model.events.PaymentFailedEvent;
import co.com.arka.inventory.usecase.inventory.InventoryUseCase;
import lombok.RequiredArgsConstructor;
import org.reactivecommons.async.impl.config.annotations.EnableEventListeners;
import reactor.core.publisher.Mono;
import lombok.extern.java.Log;
import java.util.Optional;
import java.util.logging.Level;
import io.cloudevents.CloudEvent;
import tools.jackson.databind.ObjectMapper;
@Log
@RequiredArgsConstructor
@EnableEventListeners
public class EventsHandler {
private final InventoryUseCase inventoryUseCase;
private final ObjectMapper objectMapper;
public Mono<Void> handleOrderCreatedEvent(CloudEvent event) {
Optional<OrderCreatedEvent> orderCreatedEvent = deserialize(event, OrderCreatedEvent.class);
return Mono.justOrEmpty(orderCreatedEvent)
.doOnNext(e -> log.log(Level.INFO, "Event received: {0} -> {1}", new Object[]{event.getType(), e}))
.flatMap(inventoryUseCase::reserveStock)
.doOnSuccess(v -> log.info("Stock reservation processed for event: " + event.getId()))
.then();
}
public Mono<Void> handlePaymentFailedEvent(CloudEvent event) {
Optional<PaymentFailedEvent> paymentFailedEvent = deserialize(event, PaymentFailedEvent.class);
return Mono.justOrEmpty(paymentFailedEvent)
.doOnNext(e -> log.log(Level.INFO, "Event received: {0} -> {1}", new Object[]{event.getType(), e}))
.flatMap(e -> inventoryUseCase.releaseStock(e.orderId(), e.sku(), e.quantity(), e.reason()))
.then();
}
private <T> Optional<T> deserialize(CloudEvent event, Class<T> clazz) {
if (event == null || event.getData() == null) {
log.warning("Received event with empty data");
return Optional.empty();
}
try {
return Optional.of(objectMapper.readValue(event.getData().toBytes(), clazz));
} catch (Exception e) {
log.log(Level.SEVERE, "Failed to deserialize event data to " + clazz.getSimpleName(), e);
throw new RuntimeException("Failed to map event data", e);
}
}
}
package co.com.arka.inventory.events;
import co.com.arka.inventory.events.config.KafkaBrokerSecretConsumer;
import co.com.arka.inventory.events.handlers.EventsHandler;
import lombok.RequiredArgsConstructor;
import org.reactivecommons.async.api.HandlerRegistry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@RequiredArgsConstructor
public class HandlerRegistryConfiguration {
private final KafkaBrokerSecretConsumer kafkaBrokerSecretConsumer;
// see more at: https://reactivecommons.org/reactive-commons-java/#_handlerregistry_2
@Bean
public HandlerRegistry handlerRegistry(EventsHandler events) {
return HandlerRegistry.register()
.listenCloudEvent(kafkaBrokerSecretConsumer.topics().orderCreated(), events::handleOrderCreatedEvent)
.listenCloudEvent(kafkaBrokerSecretConsumer.topics().paymentFailed(), events::handlePaymentFailedEvent);
}
}
7.8 Entry Point — REST
package co.com.arka.inventory.api;
import co.com.arka.inventory.model.product.Product;
import co.com.arka.inventory.usecase.inventory.InventoryUseCase;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
import java.time.LocalDateTime;
import java.util.Map;
@Component
@RequiredArgsConstructor
public class Handler {
private final InventoryUseCase inventoryUseCase;
public Mono<ServerResponse> healthCheck(ServerRequest request) {
return ServerResponse.ok().bodyValue(Map.of(
"service", "ms-inventory",
"status", "UP",
"timestamp", LocalDateTime.now().toString()
));
}
public Mono<ServerResponse> getProduct(ServerRequest request) {
String sku = request.pathVariable("sku");
return inventoryUseCase.getProduct(sku)
.flatMap(p -> ServerResponse.ok().bodyValue(p))
.switchIfEmpty(ServerResponse.notFound().build());
}
public Mono<ServerResponse> getAllProducts(ServerRequest request) {
return ServerResponse.ok().body(inventoryUseCase.getAllProducts(), Product.class);
}
}
package co.com.arka.inventory.api;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
@Configuration
public class RouterRest {
@Bean
public RouterFunction<ServerResponse> routerFunction(Handler handler) {
return route(GET("/api/health"), handler::healthCheck)
.andRoute(GET("/api/products/{sku}"), handler::getProduct)
.andRoute(GET("/api/products"), handler::getAllProducts);
}
}
7.9 Configurar application.yaml
server:
port: ${MS_INVENTORY_PORT:8082}
spring:
application:
name: "MsInventory"
devtools:
add-properties: false
h2:
console:
enabled: true
path: "/h2"
profiles:
include: null
kafka:
bootstrap-servers: "${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}"
management:
endpoints:
web:
exposure:
include: "health,prometheus"
endpoint:
health:
probes:
enabled: true
cors:
allowed-origins: "http://localhost:4200,http://localhost:8080"
aws:
endpoint: "http://${LOCALSTACK_HOST:localhost}:${LOCALSTACK_PORT:4566}"
region: "${AWS_REGION:us-east-1}"
secrets:
db-name: "dev/arka/db-inventory-creds"
kafka-name: "dev/arka/kafka-config"
reactive:
commons:
kafka:
app:
connectionProperties:
bootstrap-servers: "${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}"
7.10 Dockerfile
# ── Stage 1: Build ──
FROM gradle:9.2-jdk21 AS builder
VOLUME /tmp
WORKDIR /myapp
COPY applications applications
COPY domain domain
COPY infrastructure infrastructure
COPY *.gradle .
COPY lombok.* .
COPY gradlew.* .
COPY gradle.* .
RUN gradle build -x test --no-daemon
# ── Stage 2: Run ──
FROM eclipse-temurin:21-jre-alpine
VOLUME /tmp
WORKDIR /myapprun
COPY --from=builder /myapp/applications/app-service/build/libs/*.jar MsInventory.jar
RUN apk update && apk add curl
ARG PORT=8082
ENV JAVA_OPTS=" -XX:+UseContainerSupport -XX:MaxRAMPercentage=70 -Djava.security.egd=file:/dev/./urandom"
ENV MS_INVENTORY_PORT=${PORT}
EXPOSE ${MS_INVENTORY_PORT}
ENTRYPOINT ["/bin/sh", "-c", "/opt/java/openjdk/bin/java $JAVA_OPTS -jar MsInventory.jar"]
7.11 Agregar al Docker Compose
# ═══════════════════════════════════════════════════
# MsInventory — Microservicio de Inventario
# ═══════════════════════════════════════════════════
ms-inventory:
build:
context: ./ms-inventory
dockerfile: deployment/Dockerfile
args:
- PORT=${MS_INVENTORY_PORT}
container_name: arka-ms-inventory
ports:
- "8082-8085:${MS_INVENTORY_PORT}"
env_file:
- .env
depends_on:
postgres-inventory:
condition: service_healthy
localstack:
condition: service_healthy
kafka:
condition: service_healthy
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:${MS_INVENTORY_PORT}/actuator/health"]
interval: 15s
timeout: 5s
retries: 5
start_period: 60s
labels:
- "traefik.enable=true"
- "traefik.http.routers.inventory.rule=PathPrefix(`/inventory`)"
- "traefik.http.middlewares.inventory-stripprefix.stripprefix.prefixes=/inventory"
- "traefik.http.routers.inventory.middlewares=inventory-stripprefix"
- "traefik.http.services.inventory.loadbalancer.server.port=${MS_INVENTORY_PORT}"
networks:
- arka-network
Las labels de Traefik permiten el balanceo de carga automático cuando escales con --scale ms-inventory=3. Traefik detecta las réplicas y distribuye el tráfico Round-Robin.
7.12 Construir y Probar
# Construir
docker compose up -d --build ms-inventory
# Verificar
docker compose ps
docker logs arka-ms-inventory --tail 50
Probar healthcheck
curl http://localhost:8082/api/health | python3 -m json.tool
Ver productos del seed (del SQL init)
curl http://localhost:8082/api/products | python3 -m json.tool
Probar el flujo SAGA parcial
Ahora que ms-orders y ms-inventory están corriendo:
# Crear una orden
curl -X POST http://localhost:8081/api/orders \
-H "Content-Type: application/json" \
-d '{
"customerId": "cust-001",
"sku": "GPU-RTX-004",
"quantity": 1,
"unitPrice": 1599.99
}' | python3 -m json.tool
Verificar los logs:
# ms-orders: publicó OrderCreatedEvent
docker logs arka-ms-orders --tail 10
# ms-inventory: recibió OrderCreatedEvent, reservó stock, publicó StockReservedEvent
docker logs arka-ms-inventory --tail 10
Verificar en KafkaUI (http://localhost:8080):
- Topic
order-created→ mensaje publicado por ms-orders - Topic
stock-reserved→ mensaje publicado por ms-inventory
Aún falta ms-payment para completar la SAGA. Lo implementaremos en el siguiente módulo.
7.13 ¿Qué acabamos de construir?
- ¿
arka-ms-inventoryestáhealthy? - ¿
GET /api/productsretorna los productos del seed? - ¿Al crear una orden, KafkaUI muestra
stock-reserved?
Siguiente: Módulo 8: ms-payment — Simulador HTTP