[CQRS] CQRS 어떻게 사용할까?
어떻게 사용할까?
앞선 글에서 다루었듯, CQRS & Event Sourcing의 구조는 아래와 같았습니다.
이러한 구조로 이루어진 아주 간단한 애플리케이션을 작성해 보도록 하겠습니다. 구현을 위한 환경은 아래와 같습니다.
- Web Application - Spring Boot
- Database - MongoDB
- Event Broker - Kafka
게시물을 삽입하기 위한 Board Command Service와 게시물 조회를 위한 Board Query Service, 두 가지 프로젝트를 만들겠습니다. (매우 간단한 예제이므로 Command Event Subscriber는 별도로 서비스를 만들지 않고 Query Service에서 함께 처리하도록 하겠습니다. )
먼저 Board Command Application입니다. 프로젝트 구성은 아래와 같습니다.
사용자가 게시물을 작성하였을 때, Rest방식으로 데이터를 받아오게 됩니다.
@RestController
public class BoardCommandController {
private final BoardCommandService boardCommandService;
@Autowired
public BoardCommandController(BoardCommandService boardCommandService) {
this.boardCommandService = boardCommandService;
}
@PostMapping("/board")
public ResponseEntity<Board> insert(Board board){
return ResponseEntity.ok(boardCommandService.insert(board));
}
}
실질적으로 비즈니스 로직 처리는 BoardCommandService에서 이루어지게 됩니다. MongoDB에 board 객체를 저장하고, 저장한 객체를 Board라는 Topic으로 Kafka로 전송하는 작업을 수행합니다.
@Service
public class BoardCommandService {
private final BoardCommandRepository boardCommandRepo;
private final KafkaTemplate<String, Board> kafkaTemplate;
@Autowired
public BoardCommandService(BoardCommandRepository boardCommandRepo, KafkaTemplate<String, Board> kafkaTemplate) {
this.boardCommandRepo = boardCommandRepo;
this.kafkaTemplate = kafkaTemplate;
}
public Board insert(Board board) {
Board insertedBoard = boardCommandRepo.save(board);
kafkaTemplate.send("board", insertedBoard);
return insertedBoard;
}
}
참고. Spring Boot에서 Kafka를 사용할 때에는 application.yml의 설정만으로도 기본적인 기능을 수행할 수 있습니다. 설정에 대한 자세한 내용은 GitLab을 참고하시기 바랍니다.
별도로 프론트단을 만들지 않고 Postman을 이용하여 요청을 처리하도록 하겠습니다.
Command Service가 제대로 동작하는 것을 확인할 수 있습니다. 그럼 다음으로 Board Query Service를 작성하도록 하겠습니다. 프로젝트 구성은 아래와 같습니다.
사용자가 게시물을 조회하였을 때, Query DB에 저장된 데이터를 읽어 들입니다. 여기에서 사용한 IteratorUtils는 아파치의 Common Library API를 이용한 것으로, Spring Data MongoDB에서 제공하는 findAll 메서드가 Iterable을 반환하므로 사용한 API입니다.
@RestController
public class BoardQueryController {
private final BoardQueryService boardQueryService;
@Autowired
public BoardQueryController(BoardQueryService boardQueryService) {
this.boardQueryService = boardQueryService;
}
@GetMapping("/board")
public ResponseEntity<List<Board>> findAll(){
List<Board> results = IteratorUtils.toList(boardQueryService.findAll().iterator());
return ResponseEntity.ok(results);
}
}
Query Service의 Service단은 단순히 Query를 위한 DB에서 데이터를 읽어오는 역할만 수행합니다.
@Service
public class BoardQueryService {
private final BoardQueryRepository boardQueryRepo;
@Autowired
public BoardQueryService(BoardQueryRepository boardQueryRepo) {
this.boardQueryRepo = boardQueryRepo;
}
public Iterable<Board> findAll() {
return boardQueryRepo.findAll();
}
}
Board Command Service에서 Event Broker인 Kafka로 전송한 데이터는 EventBroker Component에서 받게 됩니다.
@Component
public class BoardBroker {
private final BoardQueryRepository boardQueryRepo;
@Autowired
public BoardBroker(BoardQueryRepository boardQueryRepo) {
this.boardQueryRepo = boardQueryRepo;
}
@KafkaListener(topics="board", groupId="cqrs")
public void subscribeAndUpdate(Board board) {
boardQueryRepo.save(board);
}
}
게시물을 작성함과 동시에 Event Broker로 작성된 데이터가 전송되며, Query Service가 이를 구독하고 있다가 자신의 DB에 전송된 데이터를 저장합니다.
이와 같은 방식으로 Command의 DB와 Query의 DB의 (약간의 시간차는 존재할 수 있겠으나) 데이터가 동기화되게 됩니다.
[references]
event-sourcing-microservices-example
Event Sourcing with Kafka, RabbitMQ and JPA