- Published on
Event Sourcing - CQRS 구현 개인적인 정리
- Authors
- Name
- 김민석
Introduction
상황
최근 많은 시스템에서는 데이터 일관성과 복구 가능성
을 보장하기 위해 Event Sourcing
과 CQRS
패턴을 활용하고 있다.
기존 CRUD 방식
은 데이터 변경 이력을 효과적으로 관리하기 어려웠다.
- 과거의 데이터에 대해서 추적하기 어렵다.
- e.g.
로그 시스템
,이력관리 시스템
을 만들어서 사용해야 했다.
- e.g.
읽기와 쓰기의 부하가 동일한 저장소
에 집중됨에 따라 성능 저하 문제가 발생하는 한계가 있었다.
해결하기 위해 Event Sourcing을 도입하여 데이터 변경을 이벤트로 관리
하고, CQRS를 적용해 읽기와 쓰기를 분리
하는 구조를 채택했다. 그리하여 Event Sourcing을 경험할 수 있도록 토이 프로젝트를 통해 개념과 구현을 알아본다.
Event Sourcing의 역할과 장단점
역할
- 상태 변화를 직접 저장하는 것이 아니라, 모든 변경 사항을 이벤트로 기록한다.
장점
- 모든 변경 내역이 기록되므로
버그 추적이 용이
하다. - 객체 지향적인 설계와 궁합이 좋다
- 임피던스 불일치가 발생하지 않는다.
- e.g. Event Sourcing은 이벤트 단위로 저장되므로, 객체의 상태를 RDB에 직접 매핑하는 과정에서 발생하는
ORM 임피던스 불일치
문제가 최소화된다.
- e.g. Event Sourcing은 이벤트 단위로 저장되므로, 객체의 상태를 RDB에 직접 매핑하는 과정에서 발생하는
단점
- 기존 CRUD 방식보다
구현이 복잡
함.- 단순 모델에는 적합하지 않다.
- 이벤트 소싱은 CQRS 라는 개념이 필수다.
- 복합 조회 문제가 있음.
- e.g. Event Sourcing은 이벤트 데이터를 기반으로
상태를 재구성
해야 하므로, 복잡한 조건을 포함한조회 쿼리
를 직접 수행하는 것이 어렵다.
- e.g. Event Sourcing은 이벤트 데이터를 기반으로
CQRS의 역할과 장단점
역할
- Command(쓰기)와 Query(읽기)를 분리하여 서로 다른 최적화 전략을 적용함.
장점
- 읽기와 쓰기의 부하를 분산시켜
성능 최적화가 가능
함. - 읽기 전용 데이터베이스(Projection)를 활용하여
빠른 조회가 가능
함.
단점
복잡성이 증가
- 쓰기와 읽기 데이터 모델이 다르므로
데이터 동기화 전략
이 필요함.- e.g. Command DB와 Query DB가 분리되어 있기 때문에, 비동기 이벤트 처리 과정에서 데이터 정합성을 유지하기 위한 동기화 전략
문제
처음 Event Sourcing과 CQRS를 적용하면서 여러 어려움을 겪었다.
특히 개념적으로는 익숙했지만, 실제 구현 과정에서 예상치 못한 문제들이 발생했다.
이벤트 순서 문제
이벤트를 순차적으로 저장해야 하지만, 비동기 처리 중 이벤트 순서가 꼬이는 문제가 발생했다.
- e.g.
OrderCreatedEvent
가 먼저 저장되어야 하는데,OrderPaidEvent
가 먼저 반영되는 일이 있었다. - 해결하기 위해
각 이벤트에 타임스탬프를 추가하고
,이벤트를 처리할 때 항상 발생 순서를 기준으로 정렬
하도록 수정했다.
CQRS Projection 동기화 문제
CQRS를 적용하면서 Command와 Query 모델을 분리했는데, Projection(조회용 DB)과 Command DB의 데이터가 일시적으로 불일치하는 문제가 발생했다.
- e.g. 주문이 생성된 직후 조회를 하면 주문이 보이지 않는 경우가 있었다.
이벤트가 저장되면 Projection DB에 즉시 반영되도록 이벤트 리스너를 개선
했다.
구현
이벤트 저장소(Event Store) 구현
이벤트 저장소는 모든 상태 변화를 이벤트
로 기록하고 관리하는 역할을 한다.
- 이벤트 엔티티 정의
@Entity
@Table(name = "stored_events")
public class StoredEvent {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false)
private String aggregateId;
@Column(nullable = false)
private String eventType;
@Column(nullable = false, columnDefinition = "TEXT")
private String eventData;
@Column(nullable = false)
private LocalDateTime createdAt = LocalDateTime.now();
}
- 이벤트 저장소 레포지토리
@Repository
public interface EventStoreRepository extends JpaRepository<StoredEvent, Long> {
List<StoredEvent> findByAggregateIdOrderByCreatedAtAsc(String aggregateId);
}
Aggregate 및 도메인 이벤트 적용
이벤트를 적용하는 도메인 모델(Aggregate
)을 생성한다.
- Aggregate 정의
@Entity
public class OrderAggregate {
@Id
private String orderId;
private String status;
public OrderAggregate() {}
public OrderAggregate(String orderId) {
this.orderId = orderId;
this.status = "CREATED";
}
public void apply(OrderCreatedEvent event) {
this.status = "CREATED";
}
public void apply(OrderPaidEvent event) {
this.status = "PAID";
}
}
Command 핸들링 및 이벤트 발행
Command를 통해 이벤트를 생성하고, 저장 후 Projection DB에 반영한다.
- Command 정의
public class CreateOrderCommand {
private String orderId;
}
- Command 핸들러
@Service
public class OrderService {
private final EventStoreRepository eventStoreRepository;
private final ApplicationEventPublisher eventPublisher;
public OrderService(EventStoreRepository eventStoreRepository, ApplicationEventPublisher eventPublisher) {
this.eventStoreRepository = eventStoreRepository;
this.eventPublisher = eventPublisher;
}
@Transactional
public void createOrder(CreateOrderCommand command) {
OrderCreatedEvent event = new OrderCreatedEvent(command.getOrderId());
StoredEvent storedEvent = new StoredEvent(command.getOrderId(), "OrderCreatedEvent", toJson(event));
eventStoreRepository.save(storedEvent);
eventPublisher.publishEvent(event);
}
private String toJson(Object object) {
try {
return new ObjectMapper().writeValueAsString(object);
} catch (JsonProcessingException e) {
throw new RuntimeException("JSON 변환 오류", e);
}
}
}
이벤트 리스너 및 Projection 생성
Projection은 CQRS에서 Query 전용 데이터베이스로 활용된다.
- Projection 엔티티
@Entity
public class OrderProjection {
@Id
private String orderId;
private String status;
}
- 이벤트 리스너
@Component
public class OrderEventListener {
private final OrderProjectionRepository projectionRepository;
@Autowired
public OrderEventListener(OrderProjectionRepository projectionRepository) {
this.projectionRepository = projectionRepository;
}
@EventListener
@Transactional
public void onOrderCreated(OrderCreatedEvent event) {
OrderProjection projection = new OrderProjection(event.getOrderId(), "CREATED");
projectionRepository.save(projection);
}
}
Query 모델 (조회용 API) 구현
- Projection 조회 API
@RestController
@RequestMapping("/orders")
public class OrderQueryController {
private final OrderProjectionRepository repository;
public OrderQueryController(OrderProjectionRepository repository) {
this.repository = repository;
}
@GetMapping("/{orderId}")
public ResponseEntity<OrderProjection> getOrder(@PathVariable String orderId) {
return ResponseEntity.ok(repository.findById(orderId).orElseThrow(() -> new ResponseStatusException(HttpStatus.NOT_FOUND)));
}
}
실행 결과
주문 생성 및 이벤트 저장 확인
/orders
엔드포인트를 통해 새로운 주문을 생성한다.EventStore
테이블에OrderCreatedEvent
가 저장됨을 확인한다.- 이벤트 리스너가 작동하여
OrderProjection
테이블에도 주문 데이터가 반영된다.
EventStore 테이블 상태
id | aggregateId | eventType | eventData | createdAt |
---|---|---|---|---|
1 | 12345 | OrderCreatedEvent | {"orderId": "12345"} | 2025-02-14 10:00:00 |
Projection 테이블 상태
orderId | status |
---|---|
12345 | CREATED |
주문 조회 테스트
/orders/{orderId}
엔드포인트를 호출하여 주문 데이터를 확인한다.- Projection DB에 저장된 데이터가 반환되는지 검증한다.
테스트 요청
GET "http://localhost:8080/orders/12345"
응답 결과
{
"orderId": "12345",
"status": "CREATED"
}
이벤트 발생 및 상태 업데이트 확인
EventStore 테이블 상태 (이벤트 추가됨)
id | aggregateId | eventType | eventData | createdAt |
---|---|---|---|---|
1 | 12345 | OrderCreatedEvent | {"orderId": "12345"} | 2025-02-14 10:00:00 |
2 | 12345 | OrderPaidEvent | {"orderId": "12345"} | 2025-02-14 10:05:00 |
Projection 테이블 상태 (업데이트됨)
orderId | status |
---|---|
12345 | PAID |
시퀀스 다이어그램

해결
이러한 문제를 해결하기 위해 여러 가지 전략을 적용했다.
이벤트 순서 문제
를 해결하기 위해 이벤트 발생 시타임스탬프
를 추가하고, 처리할 때 정렬하는 방식을 적용했다.- 하지만 단순 정렬만으로는 이벤트 순서를
100% 보장
하기 어려웠다. - 따라서,
idempotency(중복 실행 방지) 전략
을 추가하여 같은 이벤트가 중복으로 실행되지 않도록 했다.- e.g. 이미 처리된 이벤트는 다시 처리되지 않도록 데이터베이스에서 체크하는 방식을 도입했다.
- 하지만 단순 정렬만으로는 이벤트 순서를
Projection 동기화 문제
를 해결하기 위해 Projection DB 업데이트 방식을 개선했다.- 기존에는 비동기 방식으로 Projection을 갱신했지만, 이 방식은 일관성이 떨어지는 단점이 있었다.
- 이벤트 리스너의 트랜잭션을 조정하여
Projection DB가 항상 최신 상태를 반영하도록 보장
했다.
- 이벤트 리스너의 트랜잭션을 조정하여
- 또한, 이벤트를 처리할 때 Projection DB를 동기 처리 방식으로 변경하여 조회 시점의 데이터 일관성을 유지하도록 했다.
- 기존에는 비동기 방식으로 Projection을 갱신했지만, 이 방식은 일관성이 떨어지는 단점이 있었다.
복잡한 조회 문제
는 스냅샷 기법을 도입하여 해결했다.- Event Sourcing에서는 과거 상태를 조회하려면 모든 이벤트를 다시 재생해야 하는데, 이 방식은 데이터가 많아질수록 성능이 저하된다.
- 일정 개수의 이벤트가 쌓이면 현재 상태를 저장하는 Snapshot을 생성하도록 했다.
- 이렇게 하면 과거 데이터를 조회할 때 Snapshot에서 바로 불러올 수 있어 성능이 개선되었다.
결말
이번 Event Sourcing과 CQRS를 실제로 구현하고 적용하면서 이벤트 기반 아키텍처의 장점과 어려움을 경험할 수 있었다.
이벤트 기반으로 설계하면서 데이터 변경 이력을 쉽게 추적할 수 있었고, Projection DB를 활용하여 빠른 조회가 가능해졌고, 데이터의 일관성을 유지할 수 있도록 개선할 수 있었다.
하지만 단순한 CRUD 방식보다 구현이 복잡하고, 데이터 동기화 문제를 해결하기 위해 추가적인 고려 사항이 많다는 점도 알게 되었다.
향후
RDB 기반 Event Store를 사용하는 대신, Kafka 또는 Event Log 기반의 확장형 Event Store를 고려할 계획이다.
- 또한, 이벤트 저장소가 커질수록 성능 저하가 발생할 가능성이 있으므로, 이벤트 압축 및 정리(Compaction) 기법을 연구할 필요가 있다.
Event Sourcing과 CQRS가 단순한 개념이 아니라, 실제로 구현하는 과정에서 여러 가지 설계적 고려 사항이 필요함을 깊이 이해할 수 있었다.
그리고 현재 팀 프로젝트를 진행하며, 비즈니스 요구사항과 시스템 확장성을 고려하여 적절한 수준에서 도입하는 것이 중요하다는 것을 깨달았다.