본문 바로가기
Delvelopment/Kafka

[Kafka] Topic 메세지 보관주기 설정 (MSK)

by 제제킴 2020. 10. 12.
반응형

AWS 에서 제공하는 MSK 의 다큐먼트먼저 공유한다.

https://docs.aws.amazon.com/ko_kr/msk/latest/developerguide/msk-configuration-properties.html

 

사용자 지정 MSK 구성 - Amazon Managed Streaming for Apache Kafka

advertised.listeners 속성은 설정할 수 있지만 listeners 속성은 설정할 수 없습니다.

docs.aws.amazon.com

 

카프카를 운영하면서 메시지를 언제까지 보관해야할지 고민에 빠졌다. 

기본값인 1주일 정도를 보관하자니 너무 무거워서 부담이되고. 짧으면 너무 메세지를 짧게 가지고있어서 확인을 못할 것 같고,

라는 고민에 빠졋다. 결국 1~3일 정도가 적당하다고 판단했고. 

Kafka 메시지를 보관하는 주기에 대한 셋팅을 찾아보게 됬다.

 

기본 개념

메시지 보관 기간 설정은 개별 메시지가 아닌 log segment 파일을 대상으로 처리된다.

이를 바탕으로 메시지가 카프카 브로커에 생성될 때, 해당 파티션의 log segment 파일 맨 마지막에 추가되는데, log.segment.bytes 값을 초과하면(기본 1GB) 해당 파일을 close 하여 만기 처리한 뒤 새 파일을 열어서 쓰게 된다.

이 때, log segment 파일이 close 되어 만기처리가 되어야만 retention.ms 또는 retention.bytes 로 삭제 처리할 수 있다.

ex) log.segment.bytes 가 기본값인 1GB 이고, log.retention.ms 가 7일이면, 1GB 를 다 채우고 난 다음 7일이 지나야 해당 파일이 삭제된다. 단, log.segment.bytes 값이 너무 크면 retention 값이 제대로 동작하지 않을 수 있고, 너무 작으면 파일의 close & open 이 빈번히 발생하므로 적절한 조절이 필요하다.

 

설정 값 예시 

auto.create.topics.enable=false
default.replication.factor=3
min.insync.replicas=3
num.io.threads=8
num.network.threads=5
num.partitions=1
num.replica.fetchers=3
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000
log.cleanup.policy=delete
log.cleaner.delete.retention.ms=86400000
log.retention.ms=86400000
반응형

설정할 수 있는 Apache Kafka 구성 속성

auto.create.topics.enable 서버에서 주제 자동 생성을 활성화합니다.
compression.type 주어진 주제에 대한 최종 압축 유형입니다. 이 속성을 표준 압축 코덱(gzip, snappy, lz4  zstd)으로 설정할 수 있습니다. 또한 압축하지 않은 것과 동일한 uncompressed, 생산자가 설정한 원래 압축 코덱을 유지한다는 의미인 producer를 허용합니다.
default.replication.factor 자동으로 생성된 주제에 대한 기본 복제 인수입니다.
delete.topic.enable 주제 삭제 작업을 활성화합니다. 이 구성이 해제되어 있으면 관리 도구를 통해 주제를 삭제할 수 없습니다.
group.initial.rebalance.delay.ms 그룹 코디네이터가 첫 번째 재분배를 수행하기 전, 더 많은 소비자가 새 그룹에 가입하기를 기다리는 시간입니다. 지연이 길어질수록 재분배가 줄어들 가능성이 높지만 처리가 시작될 때까지 시간이 늘어납니다.
group.max.session.timeout.ms 등록된 소비자에 대한 최대 세션 제한 시간입니다. 제한 시간이 길어지면 오류를 찾는 시간이 길어지는 대신, 소비자는 하트비트 사이에 메시지를 처리하는 데 더 많은 시간을 할애할 수 있습니다.
group.min.session.timeout.ms 등록된 소비자에 대한 최소 세션 제한 시간입니다. 제한 시간이 짧아지면 오류 찾기가 빨라지는 대신, 소비자 하트비트가 자주 발생하므로 브로커 리소스가 낭비될 수 있습니다.
leader.imbalance.per.broker.percentage 브로커당 허용되는 리더 불균형의 비율입니다. 브로커당 이 값을 초과하는 경우 컨트롤러는 리더 밸런스를 트리거합니다. 이 값은 백분율로 지정됩니다.
log.cleaner.delete.retention.ms Apache Kafka가 삭제된 레코드를 유지하는 시간 길이. 최소값은 0입니다.
log.cleaner.min.cleanable.ratio 이 구성 속성은 0에서 1 사이의 값을 가질 수 있습니다. 로그 압축기가 로그 정리를 시도하는 빈도를 결정합니다(로그 압축이 활성화된 것으로 가정). 기본적으로 Apache Kafka는 로그의 50% 이상이 압축된 로그를 정리하지 않습니다. 이 비율은 중복에 의해 로그에서 낭비되는 최대 공간을 제한합니다(로그의 최대 50%가 중복일 수 있음을 의미함). 비율이 높으면 청소의 효율성이 떨어지고 로그의 공간이 낭비됩니다.
log.cleanup.policy 보존 기간이 지난 세그먼트에 대한 기본 정리 정책입니다. 유효한 정책의 쉼표로 구분된 목록입니다. 유효한 정책은 delete  compact입니다.
log.flush.interval.messages 메시지가 디스크로 플러시되기 전에 로그 파티션에 누적된 메시지 수입니다.
log.flush.interval.ms 디스크로 플러시되기 전에 모든 주제의 메시지가 메모리에 보관되는 최대 시간(ms) 입니다. 설정되지 않은 경우 log.flush.scheduler.interval.ms 값이 사용됩니다. 최소값은 0입니다.
log.retention.bytes 삭제하기 전 로그의 최대 크기입니다.
log.retention.hours 로그 파일을 삭제하기 전에 보관하는 시간(3차 log.retention.ms 속성)입니다.
log.retention.minutes 로그 파일을 삭제하기 전에 보관하는 시간(분)(2차 log.retention.ms 속성)입니다. 설정되지 않은 경우 log.retention.hours 값이 사용됩니다.
log.retention.ms 로그 파일을 삭제하기 전에 보관하는 시간(밀리초)입니다. 설정하지 않으면 log.retention.minutes 값이 사용됩니다.
log.roll.ms 새 로그 세그먼트가 롤아웃되기 전 최대 시간(밀리초)입니다. 이 속성을 설정하지 않으면 log.roll.hours의 값이 사용됩니다. 이 속성에 대해 가능한 최소값은 1입니다.
log.segment.bytes 단일 로그 파일의 최대 크기입니다.
max.incremental.fetch.session.cache.slots 유지되는 최대 증분 가져오기 세션 수입니다.
message.max.bytes Kafka에서 허용하는 최대 레코드 배치 크기입니다. 이 값이 커지고 0.10.2보다 오래된 소비자가 있는 경우, 소비자의 가져오기 크기도 이 크기의 레코드 배치를 가져올 수 있도록 커져야 합니다.
최신 메시지 형식 버전에서 레코드는 효율성을 위해 항상 배치로 그룹화됩니다. 이전 메시지 형식 버전에서는 압축되지 않은 레코드가 배치로 그룹화되지 않으며, 이 경우 이 제한은 단일 레코드에만 적용됩니다.
주제 수준 max.message.bytes 구성을 사용하여 주제별로 설정할 수 있습니다.
로그.메시지.타임스탬프.차이.최대.ms 브로커가 메시지를 수신한 시점의 타임스탬프와 메시지에 지정된 타임스탬프 사이에 허용되는 최대 차이입니다. log.message.timestamp.type=CreateTime인 경우 타임스탬프의 차이가 이 임계값을 초과하면 메시지가 거부됩니다. 이 구성은 log.message.timestamp.type=LogAppendTime이 무시됩니다.
로그.메시지.타임스탬프.유형 메시지의 타임스탬프가 메시지 생성 시간인지 아니면 로그 추가 시간인지 지정합니다. 허용 값은 CreateTime, LogAppendTime입니다.
min.insync.replicas 생산자가 ack를 "all"(또는 "-1")로 설정하면 min.insync.replicas는 쓰기가 성공한 것으로 간주되기 위해 쓰기를 승인해야 하는 최소 복제본 수를 지정합니다. 이 최소값이 충족되지 않으면 생산자가 예외를 발생시킵니다(NotEnoughReplicas 또는 NotEnoughReplicasAfterAppend).
min.insync.복제본과 ack를 함께 사용하면 내구성 보장을 강화할 수 있습니다. 일반적인 시나리오는 복제 계수가 3인 주제를 생성하고, min.insync.replicas를 2로 설정하고, "all". 이렇게 하면 대부분의 복제본이 쓰기 를 수신하지 못하는 경우 Producer가 예외를 제기합니다.
num.io.threads 서버가 요청을 처리하는 데 사용하는 스레드 수로서, 디스크 I/O를 포함할 수 있습니다.
num.network.threads 서버가 네트워크에서 요청을 수신하고 응답을 보내는 데 사용하는 스레드 수입니다.
num.recovery.threads.per.data.dir 시작 시 로그 복구 및 종료 시 플러시에 사용할 데이터 디렉터리당 스레드 수입니다.
num.replica.fetchers 소스 브로커의 메시지를 복제하는 데 사용하는 가져오기 스레드 수입니다. 이 값을 늘리면 팔로어 브로커의 I/O 병렬화 정도가 높아질 수 있습니다.
num.partitions 주제별 기본 로그 파티션 수입니다.
offsets.retention.minutes 소비자 그룹이 모든 소비자를 잃은 후(즉, 비어 있음) 해당 오프셋은 폐기되기 전까지 이 보존 기간 동안 유지됩니다. 독립형 소비자(즉, 수동 할당 사용)의 경우, 오프셋은 마지막 커밋 시간에 이 보존 기간을 더한 시간 이후 만료됩니다.
offsets.topic.replication.factor 오프셋 주제에 대한 복제 인수입니다(가용성을 보장하려면 높게 설정). 클러스터 크기가 이 복제 인수 요구 사항을 충족할 때까지 내부 주제 생성에 실패합니다.
replica.fetch.max.bytes 각 파티션에 대해서 가져오려고 하는 메시지 바이트 수입니다. 이 값은 절대적인 최대값이 아닙니다. 가져오기의 첫 번째 비어 있지 않은 파티션의 첫 번째 레코드 배치가 이 값보다 큰 경우, 진행이 이루어질 수 있도록 레코드 배치가 반환됩니다. 브로커에서 허용하는 최대 레코드 배치 크기는 message.max.bytes(브로커 구성) 또는 max.message.bytes(주제 구성)를 통해 정의됩니다.
replica.fetch.response.max.bytes 전체 가져오기 응답에 대해 예상되는 최대 바이트 수입니다. 레코드는 배치로 가져오기되며, 가져오기의 첫 번째 비어 있지 않은 파티션의 첫 번째 레코드 배치가 이 값보다 큰 경우, 진행이 이루어질 수 있도록 레코드 배치가 반환됩니다. 이 값은 절대적인 최대값이 아닙니다. message.max.bytes(브로커 구성) 또는 max.message.bytes(주제 구성) 속성은 브로커가 허용하는 최대 레코드 배치 크기를 지정합니다.
replica.lag.time.max.ms 팔로워가 가져오기 요청을 보내지 않았거나 적어도 이 시간(밀리초) 동안 리더의 로그 끝 오프셋까지 소모하지 않은 경우 리더는 ISR에서 팔로어를 제거합니다.MinValue 10000
최대값(포함) = 30000
socket.receive.buffer.bytes 소켓 서버 소켓의 SO_RCVBUF 버퍼. 이 속성을 설정할 수 있는 최소값은 -1입니다. 값이 -1이면, Amazon MSK 는 OS 기본값을 사용합니다.
socket.request.max.bytes 소켓 요청의 최대 바이트 수입니다.
socket.send.buffer.bytes 소켓 서버 소켓의 SO_SNDBUF 버퍼. 이 속성을 설정할 수 있는 최소값은 -1입니다. 값이 -1이면, Amazon MSK 는 OS 기본값을 사용합니다.
replica.selector.class ReplicaSelector를 구현하는 정규화된 클래스 이름입니다. 이는 브로커가 선호하는 읽기 전용 복제본을 찾는 데 사용됩니다. Apache Kafka 버전 2.4.1 이상을 사용 중이고 소비자가 가장 가까운 복제본에서 가져올 수 있도록 허용하려면 이 속성을 다음으로 설정합니다. org.apache.kafka.common.replica.RackAwareReplicaSelector. 자세한 내용은 을 참조하십시오. Apache Kafka 버전 2.4.1.
replica.socket.receive.buffer.bytes 네트워크 요청에 대한 소켓 수신 버퍼입니다.
트랜잭션.id.만료.ms 트랜잭션 조정자가 트랜잭션 ID가 만료되기 전에 현재 트랜잭션에 대한 트랜잭션 상태 업데이트를 받지 않고 대기하는 시간(밀리초)입니다. 이 설정은 생산자 ID 만료에도 영향을 미칩니다. 생산자 ID는 주어진 생산자 ID로 마지막 쓰기 후 이 시간이 경과하면 만료됩니다. 항목의 보존 설정 때문에 생산자 ID의 마지막 쓰기가 삭제되면 생산자 ID가 더 빨리 만료될 수 있습니다. 이 속성의 최소값은 1밀리초입니다.
transaction.max.timeout.ms 트랜잭션의 최대 제한 시간입니다. 클라이언트가 요청한 트랜잭션 시간이 이 값을 초과하면 브로커는 InitProducerIdRequest에 오류를 반환합니다. 이렇게 하면 클라이언트에 너무 큰 시간 제한이 적용되지 않아 소비자가 트랜잭션에 포함된 주제에서 읽을 수 없습니다.
transaction.state.log.min.isr 트랜잭션 주제에 대해 재정의된 min.insync.replicas 구성.
transaction.state.log.replication.factor 트랜잭션 주제에 대한 복제 인수입니다. 가용성을 높이려면 이 값을 높게 설정합니다. 클러스터 크기가 이 복제 인수 요구 사항을 충족할 때까지 내부 주제 생성에 실패합니다.
unclean.leader.election.enable 데이터 손실의 가능성이 있는데도 ISR 집합에 없는 복제본을 최후의 수단으로 리더로 선택할 수 있도록 허용할지 나타냅니다.
zookeeper.connection.timeout.ms 클라이언트가 ZooKeeper에 대한 연결을 설정하기 위해 대기하는 최대 시간입니다. 설정되지 않은 경우, zookeeper.session.timeout.ms의 값이 사용됩니다.
zookeeper.session.timeout.ms 밀리초 단위의 Apache ZooKeeper 세션 제한 시간입니다.
최소값 = 6000
최대값(포함) = 18000

 

 

2020/11/30 - [Delvelopment/Kafka] - [Kafka] 같은메시지를 반복적으로 소비했던 리밸런싱 이슈 해결 (MAX POLL RECORDS CONFIG = "max.poll.record

2020/10/12 - [Delvelopment/Kafka] - [kafka Connect] 주기적으로 수행되는 무거운 쿼리 ALL_OBJECTS (table.poll.interval.ms)

2020/08/21 - [Delvelopment/Kafka] - MQ (Message queue)란?

2020/08/05 - [Delvelopment/Self-MSA구축기] - [Kafka Tool] KaDeck 이용해 Topic, Message생성하기.

2020/06/29 - [Delvelopment/Kafka] - [Spring Boot, kafka] 스프링 프로젝트에 kafka 리스너 적용기.

2020/06/29 - [Delvelopment/Kafka] - [MAC, Kafka] 맥에 Kafka 설치 하고 토픽생성. (Docker, homebrew, Apache)

2020/05/21 - [Delvelopment/Kafka] - [Apache Kafka] Kafka 란?

반응형

댓글