트러블 슈팅
문제 상황
안정적인 사용자 경험을 위해 알림 서비스에 서킷 브레이커를 도입하는 과정이었다. 카프카의 에러 핸들러가 예외를 잡아 서킷브레이커까지 예외가 도달하지 못해 제대로 작동을 하지 않았다.
KafkaException: Seek to current after exception
kafka.listener.DefaultErrorHandler
KafkaMessageListenerContainer: Error handler threw an exception
원인
서킷 브레이커를 어노테이션 기반으로 적용하는 경우, Resilience4j가 AOP를 통해 메서드 호출을 가로채 상태관리와 예외 처리를 수행한다. 하지만 Kafka 에러 리스너는 Kafka 컨슈머가 메시지를 처리 중 발생한 예외를 직접 처리한다. 리스너 레벨에서 이미 예외를 소모하거나 처리 후 재시도 로직을 적용하기 때문에 우선순위가 더 높게 작동한다.
Kafka의 에러 처리 흐름
KafkaListener는 기본적으로 DefaultErrorHandler
를 통해 예외를 처리하며, 메시지를 다시 리스너로 재전송하거나 재시도/백오프를 수행한다. 이 단계에서 서킷 브레이커로 전달되지 않은 예외는 AOP 관점에서 처리될 수 없게 된다.
AOP 기반 적용의 한계
AOP 기반 서킷 브레이커는 메서드 호출 완료 시점에 작동하지만, Kafka 리스너의 예외 처리 로직은 메서드 호출이 완료되기 전에 실행된다. 리스너 내부적으로 예외가 이미 처리되거나 삼켜지면 서킷 브레이커는 예외를 감지하지 못한다.
Kafka 리스너의 높은 우선순위
KafkaListener는 메시지 소비와 오류 처리를 독립적으로 수행하기 때문에, 서킷 브레이커보다 더 빠르게 메시지 처리 흐름을 제어한다.
최종 해결
서킷 브레이커는 카프카 리스너에게 잡히기 전에 작동해야하므로 AOP 보다 앞서서 작동되어야 한다. 어노테이션이 아닌 오퍼레이션으로 슬랙 메시지 발송의 성공/실패를 제어하고, 실패시 이메일을 발송하는 fallback 메서드를 호출하도록 직접 구현했다. graceful degradation 방식으로 장애 대응을 할 수 있게 됐다.
1. Kafka 리스너 컨테이너 제어
KafkaListenerEndpointRegistry
를 주입받아 모든 Kafka 리스너 컨테이너를 중지(pause)하거나 재개(resume)할 수 있는 메서드를 제공하도록 Kafka 리스너 컨테이너를 제어하는 유틸 클래스를 만들어 주었다.
public class KafkaManager {
private final KafkaListenerEndpointRegistry registry;
@Autowired
public KafkaManager(KafkaListenerEndpointRegistry registry) {
this.registry = registry;
}
public void pause() {
log.info("Pause all Kafka listener containers");
registry.getListenerContainers().forEach(MessageListenerContainer::pause);
}
public void resume() {
log.info("Resume all Kafka listener containers");
registry.getListenerContainers().forEach(MessageListenerContainer::resume);
}
}
2. 서킷 브레이커와 Kafka 연동
서킷브레이커의 상태가 전환될때 리스너 컨테이너를 먼저 중단하여 메시지 소비를 일시 중지하고 처리가 완료된 후 다시 컨테이너를 재개하여 안정적으로 서킷의 상태를 전환한다.
public CircuitBreakerConsumerConfiguration(CircuitBreakerRegistry circuitBreakerRegistry, KafkaManager kafkaManager) {
CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker("pushNotificationToSlack");
circuitBreaker.getEventPublisher().onStateTransition(event -> {
CircuitBreaker.StateTransition transition = event.getStateTransition();
log.info("CircuitBreaker has changed status: {}", transition);
switch (transition) {
case CLOSED_TO_OPEN:
case CLOSED_TO_FORCED_OPEN:
case HALF_OPEN_TO_OPEN:
kafkaManager.pause();
break;
case OPEN_TO_HALF_OPEN:
case HALF_OPEN_TO_CLOSED:
case FORCED_OPEN_TO_CLOSED:
case FORCED_OPEN_TO_HALF_OPEN:
kafkaManager.resume();
break;
default:
log.warn("CircuitBreaker changed to unknown state: {}", transition);
}
});
}
3. 서킷 브레이커 적용 및 Fallback 처리
서킷 브레이커의 실행 블록 안에 슬랙 메시지 발송 메서드를 호출하여 서킷브레이커가 적용될 수 있게하고, 실패 시 이메일 발송 메서드를 호출하도록 구성한다.
@Transactional
public void pushNotificationToSlack(UUID id, NotificationDetails details) {
CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker("pushNotificationToSlack");
log.info("Circuit status: {}", circuitBreaker.getState());
updateNotificationStateFailed(id);
try {
circuitBreaker.executeRunnable(() -> {
messagingService.sendDirectMessage(
details.getUserSlack(),
details.getMessageTitle().concat(
details.getMessageContent()
)
);
log.info("Successfully sent a Slack message");
});
} catch (Exception e) {
log.error("Slack server has a problem", e);
pushNotificationToMail(details, e); // fallback 메소드
}
updateNotificationStateSent(id);
}
4. 메시지 수동 커밋
서킷브레이커를 거친 후 Kafka 메시지를 수동으로 커밋하여 메시지가 재처리 되지 않도록 한다.
@KafkaListener(
topics = {
"queue.registered",
"queue.delayed",
"queue.canceled",
"queue.remind",
"queue.alerted",
"queue.rush"
})
public void handleQueueEvent(
@Header("kafka_receivedTopic") String topic,
Acknowledgment acknowledgment,
String message
) {
NotificationDetails details = getNotificationDetails(
NotificationChannel.SLACK, message, topic
);
try {
eventService.processNotification(details);
acknowledgment.acknowledge();
} catch (Exception e) {
log.error("Error processing kafka message. Retry or fallback processing.", e);
}
}
5. Resilience4j OPEN 설정
실패 횟수가 임계치를 초과하면 서킷의 상태가 OPEN으로 변경된다.
resilience4j:
circuitbreaker:
configs:
default:
slidingWindowType: COUNT_BASED # 알고리즘 - 타입
slidingWindowSize: 10 # 알고리즘 - 범위, 최근 n회 기준
failureRateThreshold: 20 # 실패 - 임계 값 퍼센트
6. Resilience4j HALF-OPEN 설정
resilience4j:
circuitbreaker:
configs:
default:
permittedNumberOfCallsInHalfOpenState: 3 # Half-open 상태에서 최대 호출 수