어플리케이션에서 Kafka로 메시지를 발행하는 Producer 설정을 해보도록 하자.
1. 처리량 설정
Kafka의 Producer는 다음과 같이 Serializer를 사용해서 레코드를 byte 배열로 변환한 후 배치로 버퍼에 저장한다. 이후 Sender는 별도의 쓰레드를 통해 배치가 꽉 차거나 일정 시간이 지나면 브로커로 버퍼를 전달한다.

이때 다음의 설정들이 처리량에 영향을 준다.
- batch.size: 배치의 크기. 다 차게 되면 전송한다.
- linger.ms: 전송 대기 시간. 설정한 시간만큼 배치가 차기를 기다렸다 전송한다.
2. ACK 설정
ACK(Acknowledgment)는 메시지 전송 후 브로커로부터 받는 성공 응답을 의미한다. 설정을 통해 어떤 방식으로 응답을 받을지 정할 수 있다.
2.1. acks = 0
1. 프로듀서 → 브로커: 메시지 전송
2. 프로듀서: ACK를 기다리지 않고 다음 작업 수행
전송 속도가 가장 빠르지만, 메시지가 손실될 위험이 있다.
2.2. acks = 1
1. 프로듀서 → 브로커(리더): 메시지 전송
2. 브로커(리더) → 프로듀서: ACK 응답 (리더에 저장 성공)
리더 브로커만 확인하므로 상대적으로 빠르지만, 리더가 실패할 경우 메시지가 손실될 수 있다.
2.3. acks = all
1. 프로듀서 → 브로커(리더): 메시지 전송
2. 브로커(리더) → 브로커(팔로워들): 메시지 복제
3. 브로커(팔로워들) → 브로커(리더): 복제 완료
4. 브로커(리더) → 프로듀서: ACK 응답 (모든 복제본 저장 성공)
가장 높은 데이터 내구성을 보장하지만, 모든 복제본이 성공적으로 저장되어야 하므로 속도가 느려진다.
3. 멱등성 설정
멱등성은 특정 연산을 여러 번 수행해도 결과가 동일하게 유지되는 성질을 의미한다. 따라서 멱등성 활성화를 하면 카프카에 같은 메시지를 여러 번 보내게 되더라도 1회만 처리하게 할 수 있다.
enable.idempotence를 true로 설정함으로써 멱등성 활성화를 할 수 있다.
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
3.1. 순서 보장
기본적으로 메시지의 순서를 보장하려면 max.in.flight.requests.per.connection를 1로 설정하여 한 번에 하나의 요청만 처리되게 해야한다. 하지만 이렇게 되면 순서는 보장되지만 성능이 저하된다는 문제가 있다.
해당 자료에 의하면 멱등성 활성화를 하면 max.in.flight.requests.per.connection를 5까지 설정하여도 메시지 순서를 보장할 수 있다고 한다.
https://learn.conduktor.io/kafka/kafka-producer-retries/
어떻게 가능한걸까? 멱등성 활성화를 하게되면 각 메시지에 시퀀스 번호를 부여하고, 브로커는 이 번호를 확인하여 순서가 맞지 않는 메시지를 거부하게된다. 메시지가 어떻게 처리되는지 예시로 확인해보자.
시나리오
메시지: M1(seq=1), M2(seq=2), M3(seq=3)
정상 케이스
1. M1, M2, M3 동시 전송
2. 모두 성공
→ 순서: M1 → M2 → M3
실패 케이스
1. M1, M2, M3 동시 전송
2. M1 실패, M2/M3 성공
3. 브로커가 M2, M3을 거부 (시퀀스 순서 맞지 않음)
4. M1 재전송 성공
5. M2, M3 재전송 성공
→ 순서: M1 → M2 → M3
3.2. ack = all
Kafka 3.0 버전부터는 enable.idempotence의 기본값이 true로 변경되어 이에 딸려오는 추가 옵션인 ack 옵션 또한 all로 강제하고 있다.
앞서 다뤘듯이 ack=all 옵션은 3가지 옵션 중 가장 느린데, 해당 자료에 의하면 비동기 프로듀서를 사용하면 max.in.flight.requests.per.connection의 값이 3일 때까지는 속도를 보장받을 수 있다고 한다.
https://cwiki.apache.org/confluence/display/KAFKA/An+analysis+of+the+impact+of+max.in.flight.requests.per.connection+and+acks+on+Producer+performance
4. 재전송 설정
Kafka 프로듀서가 메시지를 브로커에 전송할 때, 브로커는 성공 응답(ACK) 또는 오류 코드를 반환한다. 이때 오류 코드에 따라 재전송 가능한 오류와 재전송 불가능한 오류로 분류된다.
재전송 가능한 오류 예시
- NOT_ENOUGH_REPLICAS_EXCEPTION
- LEADER_NOT_AVAILABLE_EXCEPTION
- NETWORK_EXCEPTION
재전송 불가능한 오류 예시
- INVALID_CONFIG
- RECORD_TOO_LARGE
- INVALID_REQUIRED_ACKS
재전송 가능한 오류 코드를 받았을 때는 설정에 따라 자동으로 재시도를 수행하게 되는데, 재전송 설정은 다음과 같이 2가지가 있다.
4.1. retries
브로커가 일정 시간 내에 복구되지 않으면, 모든 재시도를 소진한 후에는 메시지를 전송하지 못하고 실패로 간주한다.
4.2. delivery.timeout.ms
설정된 시간 내에 메시지가 성공적으로 전송되지 않으면, 프로듀서는 메시지를 포기하고 실패로 간주한다.
실제 사례를 통해 동작 방식을 비교해보자. 현재 일시적으로 문제가 생긴 브로커가 1.5초 이후에 복구된다고 가정해보자.
retries 예시
// 재시도 횟수 10회
props.put(ProducerConfig.RETRIES_CONFIG, 10);
0.1초: 첫 시도 실패
0.2초: 2번째 시도 실패
0.3초: 3번째 시도 실패
...
1초: 10번째 시도 실패
→ 브로커가 1.5초에 복구되어도 이미 재시도를 모두 소진
delivery.timeout.ms 예시
// 재시도 허용 시간 1분 제한, 재시도 대기 시간 0.1초
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 60000);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
0초: 첫 시도 실패
0.1초: 대기
0.2초: 2번째 시도 실패
0.3초: 대기
...
1.5초: 브로커 복구
1.6초: n번째 시도 성공
→ 브로커가 복구되면 성공적으로 전송
정리
Kafka 2.1 버전 이후로 retries의 기본값은 2147483647(사실상 무한)이라고 한다. 이렇게 되면 시스템 상태를 고려하지 않고 무한히 재시도 하므로 효율적이지 않다. 따라서 delivery.timeout.ms로 제어하는 것이 권장되는 방식이라고 한다.
5. 메시지 전송 로깅
마지막으로 메시지 발행 이후 성공 여부에 따라 로깅하는 방법에 대해 알아보자.
Spring Kafka는 프로듀서의 전송 결과를 수신하는 ProducerListener를 제공한다. 이를 활용하여 메시지 전송 여부에 따라 결과를 로깅할 수 있다.
KafkaProducerListener
@Component
public class KafkaProducerListener implements ProducerListener<String, String> {
@Override
public void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {
ProducerListener.super.onSuccess(producerRecord, recordMetadata);
EventLogger.logSentMessageOnSuccess(producerRecord, recordMetadata);
}
@Override
public void onError(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata, Exception exception) {
ProducerListener.super.onError(producerRecord, recordMetadata, exception);
EventLogger.logSentMessageOnError(producerRecord, exception);
}
}

6. KafkaProducerConfig
완성된 설정은 다음과 같다.
처리량에 관한 설정은 실질적인 테스트가 어렵다고 판단해 default로 두었다. 그리고 메시지의 중복 전송 방지와 내구성을 위해 멱등성 활성화와 ack=all 설정을, 어느 정도의 속도를 보장하기 위해 max.in.flight.requests.per.connection은 3을 주었다.
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 동시 요청 수 설정
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 3);
// 멱등성 설정
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Ack 설정
props.put(ProducerConfig.ACKS_CONFIG, "all");
// 재시도 설정
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 60000);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(KafkaProducerListener kafkaProducerListener) {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
kafkaTemplate.setProducerListener(kafkaProducerListener);
return kafkaTemplate;
}
}
'Project > 티켓핑' 카테고리의 다른 글
| Kafka Cluster 구축하기 (1) | 2025.06.08 |
|---|---|
| Kafka Consumer 설정하기 (1) | 2025.06.08 |
| Redis Cluster 구축하기 (0) | 2025.06.03 |
| WebFlux 전환하기 (0) | 2025.05.12 |
| 대기열 진입 동시성 문제 해결하기 (0) | 2025.05.12 |
