이어서 Kafka에서 메시지를 받아 소비하는 Consumer 설정을 해보도록 하자.
1. 처리량 설정
Consumer의 처리량과 관련된 주요 옵션은 다음과 같다.
- fetch.min.bytes: 한 번에 가져올 최소 데이터 크기.
값이 커지면 처리량이 증가하지만 대기시간이 늘어난다. - fetch.max.wait.ms: 데이터가 최소 크기(fetch.min.bytes)가 될 때까지 기다려야 되는 시간.
마찬가지로 값이 커지면 처리량이 증가하지만 대기시간이 늘어난다. - max.poll.records: 단일 poll() 호출당 최대 레코드 수.
값이 클수록 높은 처리량을 가지지만 메모리 사용량이 증가한다. - fetch.max.bytes: 한 번에 가져올 수 있는 최대 데이터 크기.
값이 커지면 처리량이 증가하지만 메모리 사용량 또한 증가한다. - concurrency: 여러 개의 파티션을 동시에 처리하기 위해 사용하는 리스너 스레드의 수.
이 값은 파티션 수와 동일하거나 작게 설정하는 것을 권장한다.
2. 초기 오프셋 읽기 위치 설정
Offset은 Consumer가 마지막으로 처리한 메시지의 위치를 나타낸다. Consumer가 poll()을 통해 메시지를 가져올땐 커밋한 오프셋 이후의 메시지들을 가져와서 읽게 된다.
이때 Consumer가 처음 접근하거나 커밋한 오프셋이 없는 경우 처음 읽게 되는 설정을 할 수 있다.
- earliest - 맨 처음 오프셋 사용
- latest - 가장 마지막 오프셋 사용 (default)
- none - 컨슈머 그룹에 대한 이전 커밋이 없으면 익셉션 발생 (일반적으로 사용하지 않음)
3. 오프셋 커밋 설정
오프셋의 커밋 방식은 자동 커밋과 수동 커밋으로 나뉘게 된다.
3.1. 자동 커밋
자동 커밋은 Consumer가 메시지를 읽은 후 일정한 주기마다 커밋을 자동으로 오프셋을 커밋하는 방식이다.
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
개발자가 별도로 커밋 코드를 작성할 필요가 없어 간편하지만 메시지 유실의 위험이 있다.
예를 들어 Consumer가 메시지를 읽고 처리하기 이전에 장애가 생기게 된다면, 이후에 poll() 시에는 이후의 메시지부터 받게 되어 해당 메시지를 유실하게 된다. 뿐만 아니라 Cosumer가 메시지를 읽고 처리하는 도중에 장애가 발생하여 자동 커밋이 이루어지지 않았다면, 이후에 poll() 시에는 같은 메시지를 받아와 중복으로 처리하게 될 수도 있다.
3.2. 수동 커밋
수동 커밋은 Consumer가 메시지를 처리한 후 명시적으로 오프셋을 커밋하는 방식이다. 메시지의 처리 이후에만 커밋이 이루어지므로 자동 커밋 방식보다 메시지의 안전성이 보장된다.
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
수동 커밋 코드 예시
@KafkaListener를 통해 메시지를 소비하는 경우 다음과 같이 메시지의 처리가 완료되면 acknowledge() 메서드를 호출하여 오프셋 커밋을 완료할 수 있다.
@KafkaListener(topics = "payment-completed", groupId = "order-group")
public void handlePaymentCompletedEvent(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
EventLogger.logReceivedMessage(record);
PaymentCompletedEvent event = EventSerializer.deserialize(record.value(), PaymentCompletedEvent.class);
orderService.updateOrderStatus(event.orderId());
acknowledgment.acknowledge();
}
하지만 수동 커밋 또한 여전히 메시지 중복이 발생할 수 있는 여지가 존재한다.
예를 들어 Consumer가 메시지를 읽고 처리한 이후 커밋하기 직전에 어플리케이션의 장애가 발생하게 된다면, 다시 같은 메시지를 읽어오게 된다. 따라서 Consumer의 중복 처리를 따로 해줘야 한다. 대신 이때는 메시지를 중복으로 읽어오게 되더라도 한 번만 처리되게 멱등성을 보장할 수 있게 설계해야 한다.
4. 재시도 & DLQ 설정
Consumer는 메시지의 처리 과정에서 예외가 발생하면 재시도를 하게 된다.기본값으로 지정된 재시도 횟수는 10회이다. 추가로 재시도가 모두 실패로 끝나게 될 경우에 대한 처리 또한 필요하다.
4.1. DLQ
DQL(Dead Letter Queue)는 이러한 재시도가 모두 실패가 되었을 때, 해당 메시지를 별도로 관리하는 큐이다.
DLQ로 전송된 메시지를 통해 실패 원인을 분석하거나 적절하게 후처리할 수 있다.
4.2. DefaultErrorHandler
@Bean
public DefaultErrorHandler dlqErrorHandler(KafkaTemplate<String, String> kafkaTemplate) {
DeadLetterPublishingRecoverer dlqRecover = new DeadLetterPublishingRecoverer(kafkaTemplate,
(record, e) -> {
EventLogger.logMessageConsumeError((ConsumerRecord<String, String>) record, getOriginalException(e));
return new TopicPartition(record.topic() + ".dlq", record.partition());
});
return new DefaultErrorHandler(dlqRecover, new FixedBackOff(1000L, 3L));
}
해당 설정을 통해 다음과 같이 메시지의 재시도 3회가 모두 실패한 경우엔 메시지의 토픽에 ".dlq"라는 값이 추가된 새로운 토픽으로 메시지를 전송하게 된다.

5. KafkaConsumerConfig
완성된 설정은 다음과 같다.
Producer와 마찬가지로 처리량에 관한 설정은 default로 두었고, 정확한 메시지 처리를 보장하기 위해 수동 커밋 설정을 하였다. 또한, 메시지의 실패 처리를 위해 재시도 횟수와 DLQ 설정을 해주었다.
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 초기 오프셋 읽기 위치 설정
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 수동 커밋
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(DefaultErrorHandler dlqErrorHandler) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setCommonErrorHandler(dlqErrorHandler);
return factory;
}
@Bean
public DefaultErrorHandler dlqErrorHandler(KafkaTemplate<String, String> kafkaTemplate) {
DeadLetterPublishingRecoverer dlqRecover = new DeadLetterPublishingRecoverer(kafkaTemplate,
(record, e) -> {
EventLogger.logMessageConsumeError((ConsumerRecord<String, String>) record, getOriginalException(e));
return new TopicPartition(record.topic() + ".dlq", record.partition());
});
return new DefaultErrorHandler(dlqRecover, new FixedBackOff(1000L, 3L));
}
private static Exception getOriginalException(Exception e) {
Throwable cause = e.getCause();
return (cause instanceof Exception) ? (Exception) cause : new Exception(cause);
}
}
'Project > 티켓핑' 카테고리의 다른 글
| Kafka Cluster 구축하기 (1) | 2025.06.08 |
|---|---|
| Kafka Producer 설정하기 (0) | 2025.06.08 |
| Redis Cluster 구축하기 (0) | 2025.06.03 |
| WebFlux 전환하기 (0) | 2025.05.12 |
| 대기열 진입 동시성 문제 해결하기 (0) | 2025.05.12 |
