초기 Kafka 운영 중 이상한 이슈에 직면했다.
- 현상
kafka의 소비되어 처리된 메시지가 한 번이계속 반복적으로 수행하고 적게는 몇번 많게는 수십번까지 다시 소비하는 현상 이였다.
내 이론상 kafka 메시지는 리스너에서 처리된 후 kafka에 커밋을 치고 offset값을 올려주면서 다음 메시지를 처리한다고 생각했는데...
즉, 다시 말해서 정말 생각지도 못했던 이슈였던 것이다.
원인을 찾고 해당 증상을 면밀히 조사하여, kafka 리스너 셋팅 몇개를 수정하면서 문제를 해결하긴 했지만 초기에 알지 못했으면 큰 이슈로 연결 될뻔 했다.
- 원인
Consumer가 메시지를 처리하는 도중 Timeout이 발생했고 그로 인해 파티션 리밸런싱 과정이 일어났다.
Consumer는 (MAX_POLL_RECORDS_CONFIG = "500") Topic에서 메시지를 500개씩 가져오고 있었고,
Consumer와 Broker 사이의 세션 유지시간 (SESSION_TIMEOUT_MS_CONFIG = 10000) 10초,
요청에 대해 응답을 기다리는 최대 시간 (REQUEST_TIMEOUT_MS_CONFIG = 30000) 30초를 셋팅하고 있다.
위 기본값 대로라면, Topic의 메시지를 500개씩 Consumer가 가져오고 있으며, 세션유지시간은 10초, 컨슈머의 응답을 기다리는 시간은 30초 였던 것이다.
500개를 처리하는 중 문제가(timeout 등) 있었다면, Consumer와 Broker 사이의 Rule이 깨진 것이다.
그로 인해 파티션 리밸런싱이 발생했던 것이다.
--> 컨수머에서 500개를 가져오던 중 리밸런싱이 일어났으니 Broker는 Consumer에서 가져간 500개를 처리하지 못했다고 판단하고 다른 Consumer는 해당 메시지를 Broker에서 가져온다.
위 증상이 반복되면서 이미 소비된 메시지가 반복적으로 소비할 수 있는 환경이 갖춰진 것이다.
해결안.
# 요청에 대해 응답을 기다리는 최대 시간
requestTimeoutMs: 30000
# 컨슈머와 브로커사이의 세션 타임 아웃시간.
sessionTimeoutMs: 30000
# session.timeout.ms와 밀접한 관계가 있으며 session.timeout.ms보다 낮아야한다. 일반적으로 1/3
heartbeatIntervalMs: 10000
# 컨슈머가 polling하고 commit 할때까지의 대기시간
maxPollIntervalMs: 600000
# poll ()에 대한 단일 호출에서 반환되는 최대 레코드 수
maxPollRecords: 1
Consumer 프로젝트에서 Kafka 기본 셋팅값을 수정해주면 된다.
필자는 Java (Spring Boot환경)에서 ConsumerConfig.java의 값을 수정했다.
google에 ConsumerConfig.java 만 검색해도 자세한 설명을 나올 것으로 보고, Listener에 대한 설정이 가능하니 참고하길 바란다.
우선적으로 500개씩 Broker에서 가져오는 MAX_POLL_RECORDS_CONFIG를 1개로 수정했다.
(참고한 블로그에선 3개로 셋팅해서 3개로 진행했는데 간혈적으로 타임아웃으로 인해 똑같은 상황이 반복되어 정확해야 하는건이라 1개로 수정.)
그리고 카프카 Broker와 Listener간의 Time들을 수정했는데 조건부적인 작업이라고 생각한다.
MAX_POLL_RECORDS_CONFIG 이 부분이 가장 크리티컬 하다고 판단한다.
session.timeout.ms | 이 옵션은 heartbeat 없이 얼마나 오랫동안 컨슈머가 있을 수 있는지를 제어하며 heartbeat.interval.ms와 밀접한 관련이 있어서 일반적으로 두 속성이 함께 수정된다. |
10000 (10초) |
heartbeat.interval.ms | 컨슈머가 얼마나 자주 heartbeat을 보낼지 조정한다. session.timeout.ms보다 작아야 하며 일반적으로 1/3로 설정 | 3000 (3초) |
max.poll.interval.ms | 이러한 경우에 컨슈머가 무한정 해당 파티션을 점유할 수 없도록 주기적으로 poll을 호출하지 않으면 장애라고 판단하고 컨슈머 그룹에서 제외시키도록 하는 옵션이다. |
300000 (5분) |
max.poll.records | 이 옵션으로 polling loop에서 데이터 양을 조정 할 수 있다. |
500 |
enable.auto.commit | 백그라운드로 주기적으로 offset을 commit | true |
auto.commit.interval.ms | 주기적으로 offset을 커밋하는 시간 | 5000 (5초) |
auto.offset.reset | none: 이전 offset값을 찾지 못하면 error 발생 |
latest |
Kafka에서 공식 다큐먼트로 Consumer 옵션들이 있다 참고하면 된다~!
kafka.apache.org/documentation/#consumerconfigs
참고 -
카프카 컨슈머 애플리케이션 배포 전략 (medium.com/11st-pe-techblog/%EC%B9%B4%ED%94%84%EC%B9%B4-%EC%BB%A8%EC%8A%88%EB%A8%B8-%EC%95%A0%ED%94%8C%EB%A6%AC%EC%BC%80%EC%9D%B4%EC%85%98-%EB%B0%B0%ED%8F%AC-%EC%A0%84%EB%9E%B5-4cb2c7550a72)
kafka 설정을 사용한 문제해결 saramin.github.io/2019-09-17-kafka/
2020/10/12 - [Delvelopment/Kafka] - [Kafka] Topic 메세지 보관주기 설정 (MSK)
2020/08/05 - [Delvelopment/Self-MSA구축기] - [Kafka Tool] KaDeck 이용해 Topic, Message생성하기.
2020/06/29 - [Delvelopment/Kafka] - [MAC, Kafka] 맥에 Kafka 설치 하고 토픽생성. (Docker, homebrew, Apache)
'Delvelopment > Kafka' 카테고리의 다른 글
[Java] Synchronize (동기화) 방법 3가지 (0) | 2022.02.20 |
---|---|
[Kafka] Topic 메세지 보관주기 설정 (MSK) (0) | 2020.10.12 |
[kafka Connect] 주기적으로 수행되는 무거운 쿼리 ALL_OBJECTS (table.poll.interval.ms) (3) | 2020.10.12 |
MQ (Message queue)란? (0) | 2020.08.21 |
[Spring Boot, kafka] 스프링 프로젝트에 kafka 리스너 적용기. (0) | 2020.06.29 |
댓글