- 주문서비스의 OrderCancelled 와 배송서비스의 DeliveryCancelled 라는 이벤트는 이미 존재 합니다.
- 그러면 해야할 작업은 상품 서비스에서 OrderPlaced 이벤트가 왔을때, 재고량 체크를 하여 재고량이 없을시 ProductOutOfStock 이벤트를 발행 하면 됩니다.
- 주문 서비스에서 ProductOutOfStock 이벤트를 수신하여 OrderCancelled 이벤트를 발행하도록 구현하면 됩니다.
- 현재 주문 서비스에서 재고량을 체크 하도록 샘플은 구현이 되어있습니다. 주문서비스의 Order.java 파일을 열어서 throw new OrderException(“No Available stock!”); 라고 설정되어있는 부분을 주석 처리 합니다. (2개)
order 프로젝트 > Order.java
@PrePersist
private void orderCheck(){
...
if("true".equalsIgnoreCase(env.getProperty("checkStock"))){
// 1. 주문에 대한 상품 조회 - API
...
if( jsonObject.get("stock").getAsInt() < getQuantity()){
// throw new OrderException("No Available stock!");
}
}else{
...
if( product.getStock() < getQuantity()){
// throw new OrderException("No Available stock!");
}
}
...
}
- 상품 서비스에서 ProductOutOfStock 이벤트를 생성합니다.
- 간단하게 상품아이디와 주문아이디만 받아서 넘기는 작업을 하겠습니다.
products 프로젝트 > ProductOutOfStock.java
public class ProductOutOfStock extends AbstractEvent {
private String stateMessage = "재고량 바닥";
private Long productId;
private Long orderId;
public ProductOutOfStock(){
super();
}
public String getStateMessage() {
return stateMessage;
}
public void setStateMessage(String stateMessage) {
this.stateMessage = stateMessage;
}
public Long getProductId() {
return productId;
}
public void setProductId(Long productId) {
this.productId = productId;
}
public Long getOrderId() {
return orderId;
}
public void setOrderId(Long orderId) {
this.orderId = orderId;
}
}
- 상품 서비스에서 재고량이 없는 경우 ProductOutOfStock 이벤트를 발행하도록 구현합니다.
- ProductService.java 의 @StreamListener 부분에 아래처럼 변경
products 프로젝트 > ProductService.java 파일의 기존 코드
/**
* 주문이 발생시, 수량을 줄인다.
*/
if( orderPlaced.isMe()){
Optional<Product> productOptional = productRepository.findById(orderPlaced.getProductId());
Product product = productOptional.get();
product.setStock(product.getStock() - orderPlaced.getQuantity());
productRepository.save(product);
}
products 프로젝트 > ProductService.java 파일의 변경 코드
/**
* 주문이 발생시, 수량을 줄인다.
*/
if( orderPlaced.isMe()){
Optional<Product> productOptional = productRepository.findById(orderPlaced.getProductId());
Product product = productOptional.get();
product.setStock(product.getStock() - orderPlaced.getQuantity());
if( product.getStock() < 0 ){
System.out.println("productOutOfStock 이벤트 발생");
ProductOutOfStock productOutOfStock = new ProductOutOfStock();
productOutOfStock.setProductId(orderPlaced.getProductId());
productOutOfStock.setOrderId(orderPlaced.getOrderId());
productOutOfStock.publish();
}else{
productRepository.save(product);
}
}
- 여기까지 작업시 주문발생 하였을때, 상품서비스의 재고량이 없다면 ProductOutOfStock 이벤트가 발행됩니다.
- 주문서비스에서 ProductOutOfStock 이벤트를 수신하여 주문을 취소하는 로직을 작성합니다.
-
ProductOutOfStock 이벤트를 주문서비스에서 받아야 하니, 상품서비스의 ProductOutOfStock.java 파일을 복사하여 주문서비스에 붙여넣기 합니다.
-
이벤트를 수신하는 OrderService.java 파일에서 아래와 같이 StreamListener 를 추가합니다.
order 프로젝트 > OrderService.java
@Autowired
private OrderRepository orderRepository;
@StreamListener(KafkaProcessor.INPUT)
public void onProductOutOfStock(@Payload ProductOutOfStock productOutOfStock) {
try {
if (productOutOfStock.isMe()) {
System.out.println("##### listener : " + productOutOfStock.toJson());
Optional<Order> orderOptional = orderRepository.findById(productOutOfStock.getOrderId());
Order order = orderOptional.get();
order.setState("OrderCancelled");
orderRepository.save(order);
}
} catch (Exception e){
e.printStackTrace();
}
}
-
주문,상품,배송 서비스를 모두 재시작 합니다.
-
재고량이 없을시, ProductOutOfStock 가 발행되고 주문이 취소 되는지 확인 합니다.
-
재고량이 넘치도록 주문을 합니다. (상품ID 1번을 100개 주문)
-
http localhost:8081/orders productId=1 quantity=100 customerId="1@uengine.org" customerName=“홍길동” customerAddr=“서울시”
-
카프카의 consumer 를 조회하여 이벤트가 어떻게 호출 되는지 확인 해 봅니다
-
윈도우
kafka-console-consumer.bat --bootstrap-server http://localhost:9092 --topic eventTopic --from-beginning
-
리눅스
kafka-console-consumer.sh --bootstrap-server http://localhost:9092 --topic eventTopic --from-beginning
{"eventType":"OrderPlaced","timestamp":"20200528152024","stateMessage":"주문이 발생함","productId":1,"orderId":1,"productName":"TV","quantity":100,"price":10000,"customerId":"1@uengine.org","customerName":"“홍길동”","customerAddr":"서울시","me":true}
{"eventType":"ProductOutOfStock","timestamp":"20200528152024","stateMessage":"재고량 바닥","productId":1,"orderId":1,"me":true}
{"eventType":"OrderCancelled","timestamp":"20200528152024","stateMessage":"주문이 취소됨","productId":1,"orderId":1,"productName":"TV","quantity":100,"price":10000,"customerId":"1@uengine.org","customerName":"“홍길동”","me":true}
{"eventType":"ProductChanged","timestamp":"20200528152024","stateMessage":"상품 변경이 발생함","productId":1,"productName":"TV","productPrice":10000,"productStock":110,"imageUrl":"/goods/img/TV.jpg","me":true}
{"eventType":"DeliveryStarted","timestamp":"20200528152024","stateMessage":"배송이 시작됨","deliveryId":1,"orderId":1,"quantity":100,"productId":null,"productName":"TV","customerId":"1@uengine.org","customerName":"“홍길동”","deliveryAddress":"서울시","deliveryState":"DeliveryStarted","me":true}
{"eventType":"DeliveryCancelled","timestamp":"20200528152025","stateMessage":"배송이 취소됨","deliveryId":1,"orderId":1,"quantity":100,"productId":null,"productName":"TV","customerId":"1@uengine.org","customerName":"“홍길동”","deliveryAddress":"서울시","deliveryState":"DeliveryCancelled","me":true}
- 주문을 하였을때 ProductOutOfStock 와 OrderCancelled, DeliveryCancelled 등이 연이어 발생 하는 것을 확인 할 수 있습니다. 추가적으로 ProductOutOfStock 이벤트일때 ProductChanged 가 발행하지 않도록 하는 구현이 추가가 되어야 합니다. 해당 부분은 직접 구현 하여 보시고, 결과는 아래 링크에서 확인 하실 수 있습니다.