1. 주문서비스의 OrderCancelled 와 배송서비스의 DeliveryCancelled 라는 이벤트는 이미 존재 합니다.
  • 그러면 해야할 작업은 상품 서비스에서 OrderPlaced 이벤트가 왔을때, 재고량 체크를 하여 재고량이 없을시 ProductOutOfStock 이벤트를 발행 하면 됩니다.
  • 주문 서비스에서 ProductOutOfStock 이벤트를 수신하여 OrderCancelled 이벤트를 발행하도록 구현하면 됩니다.
  1. 현재 주문 서비스에서 재고량을 체크 하도록 샘플은 구현이 되어있습니다. 주문서비스의 Order.java 파일을 열어서 throw new OrderException(“No Available stock!”); 라고 설정되어있는 부분을 주석 처리 합니다. (2개)

order 프로젝트 > Order.java


@PrePersist
private void orderCheck(){
...
if("true".equalsIgnoreCase(env.getProperty("checkStock"))){
// 1. 주문에 대한 상품 조회 - API
...
if( jsonObject.get("stock").getAsInt() < getQuantity()){
//          throw new OrderException("No Available stock!");
}
}else{
...
if( product.getStock() < getQuantity()){
//          throw new OrderException("No Available stock!");
}
}
...
}
  1. 상품 서비스에서 ProductOutOfStock 이벤트를 생성합니다.
  • 간단하게 상품아이디와 주문아이디만 받아서 넘기는 작업을 하겠습니다.

products 프로젝트 > ProductOutOfStock.java

public class ProductOutOfStock extends AbstractEvent {

private String stateMessage = "재고량 바닥";
private Long productId;
private Long orderId;

public ProductOutOfStock(){
super();
}

public String getStateMessage() {
return stateMessage;
}

public void setStateMessage(String stateMessage) {
this.stateMessage = stateMessage;
}

public Long getProductId() {
return productId;
}

public void setProductId(Long productId) {
this.productId = productId;
}

public Long getOrderId() {
return orderId;
}

public void setOrderId(Long orderId) {
this.orderId = orderId;
}
}
  1. 상품 서비스에서 재고량이 없는 경우 ProductOutOfStock 이벤트를 발행하도록 구현합니다.
  • ProductService.java 의 @StreamListener 부분에 아래처럼 변경

products 프로젝트 > ProductService.java 파일의 기존 코드


/**
* 주문이 발생시, 수량을 줄인다.
*/
if( orderPlaced.isMe()){

Optional<Product> productOptional = productRepository.findById(orderPlaced.getProductId());
Product product = productOptional.get();
product.setStock(product.getStock() - orderPlaced.getQuantity());

productRepository.save(product);

}

products 프로젝트 > ProductService.java 파일의 변경 코드


/**
* 주문이 발생시, 수량을 줄인다.
*/
if( orderPlaced.isMe()){

Optional<Product> productOptional = productRepository.findById(orderPlaced.getProductId());
    Product product = productOptional.get();
    product.setStock(product.getStock() - orderPlaced.getQuantity());

    if( product.getStock() < 0 ){
            System.out.println("productOutOfStock 이벤트 발생");
            ProductOutOfStock productOutOfStock = new ProductOutOfStock();
            productOutOfStock.setProductId(orderPlaced.getProductId());
            productOutOfStock.setOrderId(orderPlaced.getOrderId());
            productOutOfStock.publish();

    }else{
        productRepository.save(product);
    }

    }
  • 여기까지 작업시 주문발생 하였을때, 상품서비스의 재고량이 없다면 ProductOutOfStock 이벤트가 발행됩니다.
  1. 주문서비스에서 ProductOutOfStock 이벤트를 수신하여 주문을 취소하는 로직을 작성합니다.
  • ProductOutOfStock 이벤트를 주문서비스에서 받아야 하니, 상품서비스의 ProductOutOfStock.java 파일을 복사하여 주문서비스에 붙여넣기 합니다.

  • 이벤트를 수신하는 OrderService.java 파일에서 아래와 같이 StreamListener 를 추가합니다.

order 프로젝트 > OrderService.java


@Autowired
private OrderRepository orderRepository;


@StreamListener(KafkaProcessor.INPUT)
public void onProductOutOfStock(@Payload ProductOutOfStock productOutOfStock) {
    try {
        if (productOutOfStock.isMe()) {
        System.out.println("##### listener : " + productOutOfStock.toJson());
        Optional<Order> orderOptional = orderRepository.findById(productOutOfStock.getOrderId());
                Order order = orderOptional.get();
                order.setState("OrderCancelled");
                orderRepository.save(order);
        }
    } catch (Exception e){
        e.printStackTrace();
    }
}
  1. 주문,상품,배송 서비스를 모두 재시작 합니다.

  2. 재고량이 없을시, ProductOutOfStock 가 발행되고 주문이 취소 되는지 확인 합니다.

  • 재고량이 넘치도록 주문을 합니다. (상품ID 1번을 100개 주문)

  • http localhost:8081/orders productId=1 quantity=100 customerId="1@uengine.org" customerName=“홍길동” customerAddr=“서울시”

  • 카프카의 consumer 를 조회하여 이벤트가 어떻게 호출 되는지 확인 해 봅니다

  • 윈도우
    kafka-console-consumer.bat --bootstrap-server http://localhost:9092 --topic eventTopic --from-beginning

  • 리눅스
    kafka-console-consumer.sh --bootstrap-server http://localhost:9092 --topic eventTopic --from-beginning


{"eventType":"OrderPlaced","timestamp":"20200528152024","stateMessage":"주문이 발생함","productId":1,"orderId":1,"productName":"TV","quantity":100,"price":10000,"customerId":"1@uengine.org","customerName":"“홍길동”","customerAddr":"서울시","me":true}

{"eventType":"ProductOutOfStock","timestamp":"20200528152024","stateMessage":"재고량 바닥","productId":1,"orderId":1,"me":true}

{"eventType":"OrderCancelled","timestamp":"20200528152024","stateMessage":"주문이 취소됨","productId":1,"orderId":1,"productName":"TV","quantity":100,"price":10000,"customerId":"1@uengine.org","customerName":"“홍길동”","me":true}

{"eventType":"ProductChanged","timestamp":"20200528152024","stateMessage":"상품 변경이 발생함","productId":1,"productName":"TV","productPrice":10000,"productStock":110,"imageUrl":"/goods/img/TV.jpg","me":true}

{"eventType":"DeliveryStarted","timestamp":"20200528152024","stateMessage":"배송이 시작됨","deliveryId":1,"orderId":1,"quantity":100,"productId":null,"productName":"TV","customerId":"1@uengine.org","customerName":"“홍길동”","deliveryAddress":"서울시","deliveryState":"DeliveryStarted","me":true}

{"eventType":"DeliveryCancelled","timestamp":"20200528152025","stateMessage":"배송이 취소됨","deliveryId":1,"orderId":1,"quantity":100,"productId":null,"productName":"TV","customerId":"1@uengine.org","customerName":"“홍길동”","deliveryAddress":"서울시","deliveryState":"DeliveryCancelled","me":true}
  1. 주문을 하였을때 ProductOutOfStock 와 OrderCancelled, DeliveryCancelled 등이 연이어 발생 하는 것을 확인 할 수 있습니다. 추가적으로 ProductOutOfStock 이벤트일때 ProductChanged 가 발행하지 않도록 하는 구현이 추가가 되어야 합니다. 해당 부분은 직접 구현 하여 보시고, 결과는 아래 링크에서 확인 하실 수 있습니다.

구현 완료시 프로세스 모형

Copyright © uEngine All Rights Reserved | 주소 : 서울특별시 서초구 신반포로45길 18 502호(잠원동, 주일빌딩)
사업자등록번호 : 211-87-95355 | 전화번호 : 02-567-8301 | 대표이사 : 장진영
Copyright © uEngine All Rights Reserved
주소 : 서울특별시 서초구 신반포로45길 18 502호(잠원동, 주일빌딩)
사업자등록번호 : 211-87-95355
전화번호 : 02-567-8301
대표이사 : 장진영