Kafka 기본 개념

2020. 11. 9. 13:44cloud&platform

한마디로 정의하자면, 고성능에 초점을 맞춘 pub/sub 모델의 메시지 관리자라고 말하겠다.

 

메시지를 생성하는 곳(producer)과 소비하는 곳(consumer)를 분리하여, 메시지 자체를 관리하는 것에만 초점을 맞춤. producer와 consumer는 다대다의 복잡한 관계를 맺지 않고, 서로 Kafka와 1~다 대 1 관계를 맺으면 됨. - 중앙 집중형 메시지 관리

 

 

Kafka Basic Flow

디스크 저장 방식 - 일반적은 메시 지큐는 consumer가 메시지를 읽어가면 큐에서 삭제됨. 하지만 kafka는 보관 주기 동안 메시지를 보관. 멀티 컨슈머가 가능한 이유.

그 외 메시지 손실 없는 다양한 작업이 가능 (consumer 수정 등)


 

기본 용어 정리

 

1. Broker : Kafka 가 설치되어 있는 서버/Node

2. Topic : producer와 consumer들이 kafka로 보낸 자신들의 메시지를 구분하기 위한 Name. 다수의 producer/consumer가 하나의 Kafka를 이용하는 경우 메시지들이 뒤섞이지 않게 관리됨.

3. Partition : 병렬처리가 가능하도록 Topic을 나눌 수 있음. 또한 많은 양의 메시지 처리를 위해 partition 수를 늘리기도 함.

4. Producer : 메시지를 생산하여 Broker의 Topic으로 보내는 앱

5. Consumer : Broker의 Topic에 저장된 메시지를 가져가는 앱

 


 

주키퍼 (Zookeeper)

 

여기서는 Kafka 관리를 위한 필수 구성 요소로서의 주키퍼를 설명하겠다.

 

분산 application을 위한 코디네이션 시스템 - 본래 Hadoop의 서브 프로젝트였다가 아파치 메인 프로젝트로 승격되었음.

분산 application이 안정적인 서비스를 할 수 있도록 분산되어 있는 각 application의 정보를 중앙에 집중하고 구성 관리, 그룹 관리 네이밍, 동기화 등의 서비스를 제공.

 

여러 대의 서버를 앙상블로 구성 (Cluster) : 분산 app들이 주키퍼 서버들과 커넥션을 맺은 후 상태 정보를 주고받음,

- 과반수 방식으로 3개의 앙상블 구성은 1개가 죽으면 서비스 가능, 2개는 불가. 5개의 앙상블은 2개까지 죽어도 가능 - 홀수 운영 필요.

- 상태 정보는 주키퍼의 지노드(znode)라는 곳에 저장 > 계층형 디렉터리 구조.

- zookeeper.connect=server1:2181,server2:2181,server3:2181 와 같이 클라이언트는 앙상블 되는 모든 주키퍼를 지정.

- zookeeper.connect=server1:2181,server2:2181,server3:2181/test/kafka/01 와 같이 특정 계층 구조를 지정하는 것이 더 좋다. (root 서 부터가 아닌, 하위 구조를 독립적으로 사용하므로, 하나의 앙상블에 복수의 app이 사용 가능함.)

- 지노드는 메모리 기반 저장

- 지노드에 변경사항이 발생하면 트랜잭션 로그에 추가. 로그가 어느 정도 커지면 현재 모든 지노드의 상태 스냅샷이 파일 시스템에 저장. 이 스냅샷은 모든 이전 로그들을 대체. > 저장 위치를 별도 지정/관리해 줘야 함.

 


 

Partition

 

파티션이 없는 하나의 Topic에 메시지를 쓰면

- 다수의 producer는 각각 Topic에 쓰는 순서를 기다려야 함. (3개의 producer가 각각 1초에 메시지를 쓴다면 한 번씩 다 쓰는데 총 3초)

- 다수의 consumer는 각각 Topic에서 읽는 순서를 기다려야 함.

- 서버당 메시지 전송 수 X/sec * N

따라서, 파티션을 producer / consumer의 수만큼 두면 : * N의 사라짐 (병렬처리)

 

하지만, 파티션은 많을수록 파일 핸들러 자원(파티션당 2개의 파일 - 인덱스, 데이터)을 낭비하고, 장애 복구 (Cluster 복제 비용) 비용을 증가 시킴.

 

공식 문서는 하나의 Broker 당 최대 2000개 이내의 파티션을 권장.

결국 목표 처리량에 맞는 파티션 계획이 필요.

 

오프셋(offset)

- 파티션마다 메시지가 저장되는 위치 (64비트 정수)

- 파티션에서는 고유의 값 (Topic에서는 중복 값)

- 메시지의 순서를 보장하는 방법 (오프셋 0,1,2 등 순서대로만 consumer가 가져갈 수 있음)

 

설정에 partitions = 1로 쓰는 것은 파티션 1개, 즉 파티션 없음 Topic = Partition (혼동하지 말자)

 

partition이 여러 개로 나뉠 때, Producer가 키를 지정하지 않고 Topic에 메시지를 보내면, round-robin 방식으로 골고루 분배됨 - Consumer는 어떤 파티션의 순서로 읽어야 하는지 모름. 단지 파티션의 오프셋 순서만 지킴. - 즉, 이 경우에는 producer가 보낸 메시지 순서를 보장할 수 없다. - Producer의 메시지 순서가 보장되어야 한다면, 파티션을 나누면 안 됨.

 


 

리플리케이션(Replication)

 

물리적 장애에 대비한 데이터 복제.

Topic의 파티션 단위 - 리더(원본)와 팔로워(복사본)로 용어 구분함. > 모든 읽기와 쓰기는 리더를 통해서만 이루어진다.

특정 Broker에만 생성된 Topic을 다른 Broker로 복제해 둠.

특정 Broker가 죽으면, 복제본이 있는 Broker의 Topic이 리더가 됨.

 

replication.factor

- 기본값 1

- cluser 내 모든 Broker에 동일하게 설정하여야 함.

- Topic(정확히는 Partition) replica의 수 (리더를 포함 - 즉, default일 경우 복제하지 않음)

- Broker의 수를 넘어 지정하지 않는다.

 

단점은 토픽의 사이즈가 * N 개가 됨. 그리고, 리더 / 팔로워를 관리하기 위한 Broker의 추가 리소스 소모한다.

> Topic의 중요도에 따라, factor를 지정하는 전략 필요.

 

모든 Broker가 죽는다면 선택할 수 있는 옵션

- 마지막 리더가 살아나길 기다린다 - 메시지 무손실 보장

- 빨리 살아난 Broker를 리더로 사용한다 - 메시지 손실 가능

- unclean.leader.election.enable = true (두 번째 빠른 방법), false (첫 번째 무손실 방법)

 

Kafka replication

 

ISR (In Sync Replica)

- 리더와 팔로워의 정합성 (팔로워가 리더의 모든 데이터를 완전히 복제하고 있는가)에 대한 Kafka의 해결책 - 신뢰성 향상

- 문제가 있는 팔로워의 Broker를 ISR 그룹에서 제외 시킴(리더로 승격되지 않음)

 


 

Controller

 

리더를 선정하는 프로세스를 담당 - 클러스터 내 Broker 중 하나를 선정.

 

Broker 레벨에서 실패를 감지 - 실패한 Broker에 의해 영향을 받는 모든 파티션의 리더 변경을 책임짐.

 

Controller Broker가 죽으면, 남아 있는 Broker 중 하나가 새로운 Controller가 됨.


 

Producer

 

각각의 메시지를 Topic Partition에 맵핑하고, 파티션의 리더에게 요청을 보내는 것.

키값을 정해 해당 키를 가진 모든 메시지를 동일한 파티션으로 전송 가능.

키를 정하지 않으면, round-robin 방식으로 파티션에 균등하게 분배.

 

kafka-console-producer.sh : 테스트 등을 위해 바로 메시지 보낼 수 있는 명령 제공

- --broker-list를 통해 클러스터 내 모든 Broker:9092 입력, --topic으로 topicName입력

 

동기 / 비동기 전송 가능.

비동기 전송의 경우 org.apache.kafka.clients.producer.Callback 을 구현하는 클래스를 만들어 콜백을 사용할 수 있음.

 

※ 주요 옵션

· bootstrap.serverskafka

- cluster에 처음 연결을 하기 위한 host:port 로 구성된 리스트 정보

- 보통 Broker list 전체를 입력

- kafka cluster는 마스터 개념이 없기 때문에, cluster 내 모든 서버가 클라이언트의 요청을 받을 수 있음.

 

· acks

- kafka topic의 리더에게 메시지를 보낸 후 요청을 완료하기 전 ack(승인)의 수

- 이 옵션의 수는 성능에는 반비례, 메시지 안정성에는 비례

- acks = 0 : Producer는 서버로부터 어떠한 ack도 기다리지 않는다. 서버가 데이터를 받았는지 보장하지 않고, 클라이언트는 결과를 모르므로 재요청 설정도 적용되지 않는다.

- acks = 1: 리더는 데이터를 기록하지만, 모든 팔로워는 확인하지 않는다. 속도와 안정성 면에서 거의 많은 application이 채택,

- ack = all 또는 -1 : 리더는 ISR의 모든 팔로워들의 ack를 기다린다. 데이터 무손실 보장.

단, min.insync.replica=2 보다 크면, ack=all을 수행하기 위한 ISR의 사이즈에 미달될 경우 서비스가 되지 않음. 따라서, 보통 all을 적용할 경우, 2로 세팅하는 것을 추천.

 

· buffer.memory

- kafka 서버로 메시지를 보내기 위한 버퍼 메모리

 

· compression.type : 데이터를 압축하여 보낼 수도 있는데, 어떤 타입으로 압축할지 결정 - none, gzip, snappy, lz4 등

 

· retries : 일시적 오류로 인해 전송에 실패한 데이터를 다시 보냄

 

· batch.size

- Producer는 같은 파티션으로 보내는 여러 데이터를 함께 배치로 보내려고 시도.

- 배치를 보내기 전의 클라이언트 장애가 발생하면 배치 데이터는 유실됨

- 고가용성이 목적이라면 이 옵션을 주지 않는 것도 좋은 방법.

 

· linger.ms

- batch.size 가 차면 바로 메시지를 보내지만, 차지 않을 경우 이 옵션에 정한 시간의 경과하면 보냄. default 0

 

· max.request.size - 최대 메시지 바이트 사이즈. default 1MB

 


 

Consumer

 

Topic의 메시지를 가져와 소비하는 역할. 파티션 리더에게 메시지 가져오기를 요청.

각 요청은 오프셋을 명시하고, 그 위치로부터 수신 가능 - 메시지 위치 조정, 이미 가져온 메시지도 다시 가져올 수 있음.

 

올드 컨슈머 : 컨슈머의 오프셋을 주키퍼의 지노드에 저장하는 방식 (deprecated)

뉴 컨슈머 : 컨슈머의 오프셋을 Kafka의 Topic에 저장하는 방식.

 

kafka-console-comsumer.sh : 테스트 등을 위한 콘솔 Consumer 제공

 

※ Consumer Group

- 하나의 Topic에 여러 Consumer Group이 동시에 접속해서 메시지를 가져가는 개념

 

출처: https://www.popit.kr/kafka-consumer-group/

 

- 각각의 Consumer 그룹이 각각 오프셋을 관리하기 때문에 가능.

- 따라서, 다른 그룹의 오프셋을 침범하지 않고 저렇게 그림처럼 독립적으로 '나만 있음' 하고 가져가는 것이다.

- 따라서, 하나의 메시지를 별도로 다른 용도로 처리할 수 있음.

- Consumer를 개별 단위로 관리하게 되면, Consumer를 확장하는 경우, 기존 오프셋 정보와 추가되는 Consumer의 오프셋 정보 등의 불일치로 혼란 유발, 뒤죽박죽 메시지로 인한 복잡도 증가 → 그룹 개념으로 쉽게 메타정보 공유 가능.

- 그룹에 추가되는 Consumer는 리밸런스를 통해, 파티션의 소유권이 재분배됨.

- 하나의 파티션에는 하나의 Consumer Instance만 가능 (복수의 파티션이 하나의 Consumer도 가능. consumer-01그룹의 server3의 경우. 즉, 하나의 파티션에 복수 Consumer만 아니면 됨).

1:1인대도 Consumer의 처리가 밀리면, 파티션과 같이 늘려줘야 하는 것이다. 그러나 한 번 늘린 파티션은 다시 회수가 불가능하므로 파티션 계획은 늘 테스트와 신중을 기해야 한다.

 

 

특정 파티션을 할당 가능.

- 키-값의 형태로 파티션에 저장되어 있고, 특정 파티션에서만 메시지를 가져와야 하는 경우.

- Consumer마다 그룹 아이디를 서로 다르게 해야 한다. (지정했으므로, 그룹 차원의 관리를 벗어난 것이다 - 리밸런스 등에 의한 오류 야기됨)

 

seek메소드를 통해 수동으로 특정 오프셋부터 읽을 수 있음.

 

※ 주요 옵션

· bootstrap.servers : (Producer와 동일)

 

· fetch.min.bytes : 한 번에 가져올 수 있는 최소 데이터 사이즈. 지정한 사이즈보다 작으면, 데이터가 누적될 때까지 기다린다.

· fetch.max.wait.ms : fetch.min.bytes에 의해 대기할 수 있는 최대 시간.

· fetch.max.bytes : 한 번에 가져올 수 있는 최대 데이터 사이즈.

 

· group.id : Consumer가 속한 Consumer Group을 식별하는 식별자

 

· enable.auto.commit : 백그라운드로 주기적으로 오프셋을 커밋

· auto.commit.interval.ms : 주기적으로 오프셋을 커밋 하는 시간

· auto.offset.reset : 초기 오프셋이 없거나, 더 이상 존재하지 않은 경우(데이터 삭제 등)

- earliest : 가장 초기의 오프셋 값으로 설정

- latest : 가장 마지막 오프셋 값으로 설정 (default)

- none : 이전 오프셋 값을 찾지 못하면 에러

 

· request.timeout.ms : 요청에 대한 타임아웃 설정

· session.timeout.ms : Consumer와 Broker 사이의 세션 타임아웃 시간. Broker가 Consumer가 살아있는 것으로 판단하는 시간 (default 10초). Consumer가 그룹 코디네이터에 heartbeat를 보내지 않고, 이 시간이 경과하면, Consumer Group은 리밸런스를 시도.

· heartbeat.interval.ms : 그룹 코디네이터에게 얼마나 자주 하트비트를 보낼 것인지 설정. 당근 위 session.timeout.ms보다는 작아야겠지. (보통은 1/3로 설정)

 

· max.poll.records : 단일 호출에 대한 최대 레코드 수

· max.poll.interval.ms : Consumer가 하트비트는 보내는데, 실제 메시지는 가져가지 않는 경우 무한정 파티션을 점유하지 않도록 이 값을 설정.

 

※ 참조

https://data-flair.training/blogs/kafka-architecture/

 

 

 

 

 

'cloud&platform' 카테고리의 다른 글

Kubernetes Service Object  (0) 2020.12.02
Kubernetes Dashboard 설치  (0) 2020.12.01
Docker 기본 개념  (0) 2020.10.26
Kubernetes 설치 가이드 - v1.11.3  (0) 2020.10.23
Spring Core - IoC, DI, Context And Bean  (0) 2020.10.22