Projects/[Spring] Coffee Shop Project

Spring 내부 이벤트와 Kafka 이벤트 처리, 무엇이 다를까?

montmer27 2026. 4. 4. 10:35

상황

결제 확정 이벤트가 발생했을 때 인기 메뉴 랭킹 업데이트와 데이터 수집 플랫폼 전송을 처리해야 했다. 이벤트 처리 방식으로 Spring 내부 이벤트와 Kafka 두 가지를 고려했다. 프로젝트 요구사항에는 다중 서버 환경에서 다수의 인스턴스로 동작하더라도 기능에 문제가 없어야 한다는 조건이 있었다.

Spring 내부 이벤트란

Spring이 제공하는 ApplicationEvent 기반의 이벤트 시스템이다. 같은 JVM 안에서 이벤트를 발행하고 리스너가 처리하는 구조로, 별도 인프라 없이 사용할 수 있다.

장점은 구현이 단순하고 트랜잭션 참여가 가능하다는 것이다. 단점은 같은 JVM 안에서만 동작한다는 것이다.

다중 서버 환경에서 Spring 내부 이벤트의 한계

Spring 내부 이벤트는 이벤트를 발행한 인스턴스에서만 처리가 일어난다. 다중 서버 환경에서는 나머지 인스턴스가 이벤트를 받지 못한다.

서버 A에서 결제가 완료되면 서버 A의 리스너만 이벤트를 처리하고, 서버 B와 C는 이벤트를 받지 못한다. 인기 메뉴 랭킹 업데이트나 데이터 수집 플랫폼 전송이 일부 인스턴스에서만 동작하는 상황이 생긴다.

Kafka를 선택한 이유

Kafka는 브로커가 이벤트를 중앙에서 관리한다. 어느 인스턴스에서 이벤트를 발행해도 Consumer Group이 브로커에서 이벤트를 가져가 처리하므로, 다중 서버 환경에서도 항상 처리가 보장된다.

서버 A, B, C 중 어느 서버에서 결제가 완료되든 Kafka 브로커에 이벤트가 발행되고, Consumer Group이 이를 가져가 처리한다.

또한 이벤트가 브로커에 저장되기 때문에 앱이 죽어도 유실되지 않고, Consumer가 재기동 후 다시 처리할 수 있다.

결제 확정 이벤트 하나에 여러 처리가 필요한 팬아웃 구조도 Consumer Group을 분리하는 방식으로 자연스럽게 지원한다.

트레이드오프

Kafka는 별도 인프라가 필요하고 운영 복잡도가 올라간다는 단점이 있다. 이벤트 처리 실패 시 재처리 전략도 직접 구현해야 한다. 단순한 단일 서버 환경이라면 Spring 내부 이벤트가 더 적합한 선택일 수 있다.

이벤트 발행 시점

트랜잭션 커밋 이후에 이벤트를 발행한다. 트랜잭션 안에서 발행하면 DB 롤백 시 이벤트가 이미 나간 상태가 되어 데이터 불일치가 발생한다. TransactionSynchronizationManager의 afterCommit() 콜백을 사용해 커밋 완료 후에만 이벤트가 발행되도록 보장했다.

package com.example.gigacoffee.domain.point.service;

import com.example.gigacoffee.common.exception.BusinessException;
import com.example.gigacoffee.common.exception.ErrorCode;
import com.example.gigacoffee.domain.order.entity.Order;
import com.example.gigacoffee.domain.order.enums.OrderStatus;
import com.example.gigacoffee.domain.order.repository.OrderRepository;
import com.example.gigacoffee.domain.point.dto.PointPaymentRequest;
import com.example.gigacoffee.domain.point.dto.PointPaymentResponse;
import com.example.gigacoffee.domain.point.entity.PointPayment;
import com.example.gigacoffee.domain.point.entity.UserPoint;
import com.example.gigacoffee.domain.point.repository.PointPaymentRepository;
import com.example.gigacoffee.domain.point.repository.UserPointRepository;
import lombok.RequiredArgsConstructor;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import java.util.List;
import java.util.concurrent.TimeUnit;

@Service
@RequiredArgsConstructor
@Transactional(readOnly = true)
public class PointService {

    private final UserPointRepository userPointRepository;
    private final PointPaymentRepository pointPaymentRepository;
    private final OrderRepository orderRepository;
    private final RedissonClient redissonClient;
    private final PaymentEventProducer paymentEventProducer;

    @Transactional
    public PointPaymentResponse payment(Long userId, PointPaymentRequest request) {
        RLock lock = redissonClient.getLock("point:lock:" + userId);

        try {
            if (!lock.tryLock(3, 3, TimeUnit.SECONDS)) {
                throw new BusinessException(ErrorCode.LOCK_ACQUISITION_FAILED);
            }

            // 1. 주문 조회
            Order order = orderRepository.findById(request.getOrderId())
                    .orElseThrow(() -> new BusinessException(ErrorCode.ORDER_NOT_FOUND));

            // 2. 주문 상태 검증
            if (order.getOrderStatus() != OrderStatus.PENDING) {
                throw new BusinessException(ErrorCode.ORDER_NOT_CANCELLABLE);
            }

            // 3. 포인트 잔액 검증 및 차감
            UserPoint userPoint = userPointRepository.findByUserId(userId)
                    .orElseThrow(() -> new BusinessException(ErrorCode.POINT_NOT_FOUND));

            userPoint.deduct(order.getTotalPrice());

            // 4. 결제 이력 저장
            PointPayment pointPayment = PointPayment.create(
                    userId,
                    order.getId(),
                    order.getTotalPrice()
            );
            pointPaymentRepository.save(pointPayment);

            // 5. 주문 상태 변경
            order.complete();

            // 6. 트랜잭션 커밋 후 Kafka 이벤트 발행
            List<Long> menuIds = order.getOrderMenus().stream()
                    .map(orderMenu -> orderMenu.getMenu().getId())
                    .toList();

            TransactionSynchronizationManager.registerSynchronization(
                    new TransactionSynchronization() {
                        @Override
                        public void afterCommit() {
                            paymentEventProducer.sendPaymentConfirmed(
                                    new PaymentConfirmedEvent(userId, menuIds, order.getTotalPrice())
                            );
                        }
                    }
            );

            return PointPaymentResponse.of(userPoint, order.getTotalPrice());

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new BusinessException(ErrorCode.LOCK_ACQUISITION_FAILED);
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
}