부하 테스트 후, 랜덤한 시점에서 post 요청에 완전히 타임아웃이 나는 문제의 원인을 분석하고 나름대로 해결을 시도해본 결과입니다.
선요약 : 특정 이벤트가 발생하면 알림이 발송되는 로직을 ApplicationEventPublisher를 사용해 횡단적으로 구성했습니다. 이벤트 리스너는 새로운 트랜잭션을 여는 옵션이었기 때문에, 하나의 스레드에서 두 개 이상의 DB 커넥션을 요구하면서 커넥션 데드락이 발생했습니다. 두 번째 DB 커넥션이 필요한 로직을 메세지 큐를 이용한 비동기 처리로 넘겨 문제를 해결했습니다.
# 원했던 기능 : 글을 작성하면 팔로워에게 알림을 보내자.
유저가 특정 글을 작성한 뒤 post 요청을 보내면 해당 유저를 팔로우하고 있는 유저 전부에게 Notification을 생성하고 싶었다. 이를 위해 스프링에서 제공하는 ApplicationEventPublisher을 이용했다.
다음은 초초 간단하게 구성한 post요청에 실행되는 서비스 코드이다.
@Service
@RequiredArgsConstructor
public class RecipeService {
private final RecipeRepository recipeRepository;
private final ApplicationEventPublisher eventPublisher;
@Transactional
public Recipe createRecipe(Recipe recipe) {
Recipe saved = recipeRepository.save(recipe);
// send notification (@TransactionalEventListener이므로 커밋 후에 실행됨)
eventPublisher.publishEvent(new CreateRecipeEvent(saved));
return saved;
}
}
eventPublisher에 CreateRecipeEvent라는 이벤트를 태워 방금 repository에 저장된 recipe 엔티티를 보내는 것이다. eventPublisher은 아래 event listener를 호출한다.
@TransactionalEventListener
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void handleCreateRecipeEvent(CreateRecipeEvent createRecipeEvent) {
// recipe를 받고, 레시피 작성자의 팔로워들에게 알림을 보내는 로직
}
ApplicationEventPublisher은 인자로 전달된 event object를 이벤트 리스너 메소드에게 넘기고, 이벤트 리스너에서 필요한 로직을 수행하는 이같은 방식으로 어플리케이션 기능간 결합도를 낮춰줄 수 있다.
# @TransactionalEventListener + @Transactional(REQUIRES_NEW)
.. 을 사용한 이유는, 이벤트 리스너가 repository에 엔티티를 저장하는 작업을 하기 때문에 트랜잭션이 필요하기 때문이다.
event listener 메소드에서 예외가 발생하더라도 기존의 createRecipe() 메소드까지 롤백되게 않게 하고 싶다. 알림을 저장하는 행위는 필요한 행위이지만, 레시피를 저장하는 것에 비해선 덜 중요하며 엄밀히 말하면 횡단 관심사로 생각할 수 있기 때문이다(그렇기 때문에 이벤트로 태운거지만). 아무런 옵션 없이 @TransactionalEventListener를 사용하면, 이벤트 리스너를 호출한 기존 Transactional 메소드(예시의 경우, `createRecipe()` 메소드)가 커밋된 후에 이벤트 리스너가 호출된다. 요컨데 이벤트 리스너 메소드인 handleCreateRecipeEvent()에서 예외가 발생하여 롤백되더라도, 이미 커밋된 기존 메소드는 롤백되지 않는 것이다.
그러나 @TransactionalEventListener만 붙이면 기존의 트랜잭션 안에서 읽기 연산만 가능하고 새로 엔티티를 save하지 못한다. 기존 트랜잭션은 이미 커밋된 후이기 때문이다. Notification 엔티티를 저장하기 위해 새로운 트랜잭션이 필요하기 때문에, @Transactional(propagation = Propagation.REQUIRES_NEW) 어노테이션을 붙여 이벤트 리스너 메소드에 새로운 트랜잭션을 열어 엔티티를 save할 수 있도록 구성했다.
.. 여기까지가 프로젝트로 구성한 아키텍처이다. 잘 동작하는 것처럼 보였으나, 부하 상황에서 문제가 발생했다!
# Deadlock : DB 커넥션을 얻지 못함!!
부하 테스트 글에서 볼 수 있듯, 많은 post 요청이 일어나면 랜덤한 시간에 갑자기 connection timeout이 일어나는 상황이 발생했다. DB 서버에서 충분히 감당할 수 있는 트래픽이었는데 DB 커넥션을 무려 30초나 얻지 못한 것이다(Hikari CP에서 timeout 기본 시간은 30초다). post 트래픽을 1000건 이상 준 것도 아니고 대체 뭐가 문제였을까?
범인은 @Transactional(propagation = Propagation.REQUIRES_NEW)에 있었다.
DB서버와의 연결을 위해 WAS 스레드는 hikariCP에 커넥션을 요청한다. hikariCP는 가용한 커넥션을 확인하고, 이용 가능한 커넥션이 있으면 스레드에게 커넥션을 할당, 없으면 가용한 커넥션이 생길 때까지 스레드를 대기시킨다. DB 커넥션을 얻기 위해 대기 상태에 들어간 스레드는 timeout(default 30초)이 지나면 ConnectionTimeOut 예외를 던지게 된다. 부하 테스트에서 발생한 상황이 바로 이것이다.
사용자가 레시피를 post할 때 트랜잭션은 다음의 두 개이다.
- 레시피 엔티티 자체를 save하기 위한 트랜잭션
- 레시피를 작성한 사용자의 팔로워들에게 알림을 보내기 위한 트랜잭션
그리고 앞선 이미지에서 설명했듯, 각 트랜잭션은 별도의 커넥션을 필요로 한다. 결과적으으로 말하면 post를 요청하는 스레드 1개에 2개의 DB 커넥션이 필요한 것이다.
HikariCP의 기본 커넥션 스레드 수는 10개인데, 만약 동시에 10개의 post 요청이 들어왔다고 생각해보자.
- 1~10번 요청이 각각 1~10번 DB 커넥션을 가져간다.
- Recipe 엔티티가 save되고, commit이 일어난다.
- 1~10번 요청의 event listener가 호출되고, 새로운 트랜잭션이 요구된다.
- 1~10번 요청이 기존의 커넥션을 유지한 채 새로운 DB 커넥션을 요구한다.
- HikariCP : 어.. 저희 커넥션 다 떨어졌는데요. 기다리시죠.
- 1~10번 요청은 자신들이 가진 커넥션을 그대로 들고있는 채로, 알림을 저장하는 새로운 트랜잭션을 시작하기 위해 기다린다.
- 그리고 timeout.
데드락은 보통 상호 배제의 특성을 가지고 있는 자원에 대해, 각 스레드 혹은 프로세스들이 자원을 점유하고 무한 대기를 하는 상황에서 발생한다. 위의 상황과 같다.
위의 그림에서 2번은 어쩌다 운이 좋게 커넥션을 두 개 모두 얻어 요청 처리에 성공하지만, 커넥션 하나를 가진 채 두 번째 커넥션을 기다리고 있는 나머지 대부분의 스레드들은 무한 대기 상태에 빠지다 결국 timeout이 날 것이다.
# 해결방법 : 알림 생성을 비동기적으로 처리
결국 하나의 스레드가 하나의 커넥션만을 필요로 하면 되는 일이다. 때문에 알림을 저장하는 로직을 메세지로 처리하기로 했다. Kakfa 메세지 큐를 이용했다. 지금 포스트는 Kafka에 대한 기본 사용방법을 익히는 포스트가 아니므로, 어떻게 템플릿을 사용했는지 간단하게 코드로 나타내보겠다.
일단 kafka를 통해 직렬화되어 전송될 메세지 DTO를 선언했다!
/**
* kafka로 전송될 Serializable messsage dto
*/
@Getter
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class NotificationMessage {
private Long senderId;
private Long receiverId;
private NotificationType notificationType;
private String path;
// 메세지가 전송될 kafka topic
@JsonIgnore
private String topic;
}
알림은 1. 팔로워 전체에게 보낼 메세지, 2. 특정 수신자가 존재하는 메세지 로 나뉘기 때문에 topic 필드를 추가했다.
그 뒤 카프카 템플릿을 이용해 메세지를 보내는 MessageSender 컴포넌트를 만들었다.
@Component
@RequiredArgsConstructor
@Slf4j
public class KafkaNotificationMessageSender implements NotificationMessageSender {
private final KafkaTemplate<String, NotificationMessage> kafkaTemplate;
@Override
public void sendNotificationMessage(NotificationMessage message) {
kafkaTemplate.send(message.getTopic(), message)
.whenComplete((result, ex) -> {
if (ex != null) {
log.warn("message sent failed!", ex);
}});
}
}
그리고 실제로 이벤트 리스너에서 sendNotificationMessage()를 호출할 수 있도록 구성으로 messageSender를 두고 NotifiacationMessage를 빌드한 뒤 넘길 수 있도록 코드를 재작성했다.
@Service
@Slf4j
@RequiredArgsConstructor
public class RecipetoryEventListener {
private final NotificationMessageSender notificationMessageSender;
/**
* 팔로워들에게 레시피 작성을 알린다.
* @param createRecipeEvent
*/
@TransactionalEventListener
@Transactional(readOnly = true)
public void handleCreateRecipeEvent(CreateRecipeEvent createRecipeEvent) {
// 생성된 레시피, 알림 종류 정보 구성
Recipe createdRecipe = createRecipeEvent.getRecipe();
User author = createdRecipe.getAuthor();
// logging
log.info("{} created recipe {}", author.getId(), createdRecipe.getId());
// build kafka message
NotificationMessage message = NotificationMessage.builder()
.notificationType(NotificationType.NEW_RECIPE)
.topic(KafkaTopic.FOLLOWER_NOTIFICATION)
.path(NotificationType.NEW_RECIPE.getDefaultPath(author.getId()))
.senderId(author.getId()).build();
notificationMessageSender.sendNotificationMessage(message);
}
}
handleCreateRecipe()메소드에 @Async 어노테이션을 붙였다. 메세지 전송 로직을 조금 더 메인 로직과 조금 더 분리시켜, 이벤트 리스너 자체를 별도의 스레드에서 실행시키기 위함이다.
마지막으로 kakfa message listener 메소드를 작성했다.
@KafkaListener(topics = KafkaTopic.NOTIFICATION,
containerFactory = "notificationKafkaListenerContainerFactory",
groupId = "${spring.kafka.group-id}")
@Transactional
public void saveNotification(NotificationMessage notificationMessage) {
NotificationType type = notificationMessage.getNotificationType();
User sender = getUserById(notificationMessage.getSenderId());
User receiver = getUserById(notificationMessage.getReceiverId());
Notification notification = Notification.builder()
.sender(sender).receiver(receiver).notificationType(type)
.path(notificationMessage.getPath())
.message(type.getDefaultMessage(sender)).build();
notificationRepository.save(notification);
}
역직렬화된 NotificationMessage 인스턴스에 따라 repository에 알림 엔티티를 저장하는 메소드이다.
# 부하 해결!
여전히 커다란 부하엔 서버가 죽지만, 이전에 있었던 랜덤한 시간의 timeout 현상은 관찰되지 않았다.
기존과 동일하게 부하를 점점 늘렸던 테스트는 전부 성공했고,
response time도 한두개가 갑자기 틱 하고 자기혼자 10,000ms로 튀는 것이 아닌.. timeout 날 때에 적절히 느려지는 양상을 보였다.
참고로 기존에 random time에 timeout이 났던 테스트 결과는 아래와 같다.