Kafka Console Consumer란?
브로커에 전송된 데이터를 폴링(polling)하여 데이터를 확인할 수 있는 console consumer 명령어
클러스터 위치 | 토픽명 | 보안설정파일 |
---|---|---|
자신의 카프카 서버 IP | 사용하고자 하는 데이터와 연관된 이름 | 보안이 필요한 설정 파일명 또는 위치 |
- 외부나 보안 설정을 해서 접속하려면 보안설정파일을 설정해서 해야한다.
# 명령어표기
kafka-topics.sh
--command-config [보안설정파일]
--bootstrap-server [클러스터위치]
--topic [토픽명]
--create #(생성명령)
--partitions [파티션개수]
# Localhost 기준 토픽 생성
kafka-topics.sh --bootstrap-server localhost:9092 --topic second_topic --create --partitions 3
컨슈머를 이용한 소비 – 현재 메세지 상태에서 소비하기
# 컨슈머를 이용해 소비를 해도 아무일이 일어나지 않는다.
kafka-console-consumer.sh --bootstrap-server [클러스터위치] --topic [토픽명]
- 컨슈머를 이용해서 소비를해도 아무일이 일어나지 않는 이유는 현재 메세지 상태에서 소비하기 때문이다.
- 이전 메세지가 아닌 현재 보내진 메세지를 소비한다.
컨슈머를 이용한 소비 – 또 다른 터미널에서 토픽메세지 전달해서 컨슈머 소비 메세지 확인하기.
# 컨슈머를 실행하고 메세지를 날리면 소비상태의 터미널에서 읽어들이는걸 확인할 수 있다.
kafka-console-producer.sh --bootstrap-server [클러스터 위치] --producer-property partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner --topic [토픽명]
>hello world
>my name j
>is's working!
# RoundRobinPartitioner 옵션은?
**어떻게 결과가 나타나는지 알기 위해서 DEV에서 확인용, 학습용으로만 사용하자.**
- 컨슈머를 통해 소비할 토픽 즉, 내가 생성한 명칭에 해당하는 토픽(second_topic)에 프로듀싱 해야한다.
- 프로듀서 프로퍼티인 partitioner 클래스를 입력해야 한다. (RoundRobinPartitioner)
- RoundRobinPartitioner 사용하는 이유는 한 번에 하나의 파티션을 프로듀싱해서 모든 파티션을 변경하려고 하기 때문이다.
- RoundRobinPartitioner 사용하지 않으면
- 카프카는 많은 최적화가 이루어져있다. 설정하지 않으면 자동으로 최적화된 방식으로 작동된다. ****
- 최적화된 방식으로 같은 파티션에 약 16KB 데이터를 전송할 때 까지 계속 프로듀싱해야한다. 그 다음 파티션을 전환하게 된다.
- 기본적으로 아주 비효율적인 설정이기때문에 PRD 환경에서는 사용하지 않고 최적화에 맞게 하는게 좋다.
컨슈머를 이용한 소비 – 시작부터 소비하기 옵션 사용하기(consuming from beginning)
- –from-beginning : 시작부터 메세지를 소비하도록 옵션을준다.
확인 결과 메세지가 정상적으로 출력은 되는것을 확인할 수 있다. 시작하자마자 소비하여 전체 메세지를 보여준다. 해당 메세지에는 순서가 입력한 순서대로 보장되어 있지 않다.
- 오류라고 생각할 수 있지만 오류가 아니다. 데이터가 파티션별로 차례차례 읽히기 때문이다.
- 컨슈머를 통해 확인한 토픽의 모든 메세지를 순서대로 확인할 수 있는데, 확장은 하지 않는다.
- 카프카는 확장을 위해 여러 파티션이 필요하다.
- 프로듀서가 여러 파티션에 프로듀싱한다.
- 동시에 여러 컨슈머로 여러 파티션 소비를 할 수 있다.
컨슈머를 이용한 소비 –
옵션명 | 설정값 | 설명 |
---|---|---|
—formatter | DefaultMessageFormatter | CLI 명령어 출력을 포맷해서 내보낸다. |
–property | print.timestamp=true | 메세지가 수신된 시간을 알 수 있게 해준다. |
print.key=true | 키가 어떤것인지 출력한다. | |
print.value=true | 값을 출력한다. | |
print.partition=true | 메세지가 할당된 파티션 숫자를 구할 수 있다. | |
–from-beginning | 처음부터 메세지를 읽어옴 |
# 파티션 숫자 표기 및 포맷터 지정하여 소비 출력하기
kafka-console-consumer.sh --bootstrap-server [클러스터위치] --topic [토픽명] --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true --property print.partition=true --from-beginning
>>>>>>>>
CreateTime:1693368104452 Partition:1 null hongwoo
CreateTime:1693368126687 Partition:1 null my name j
CreateTime:1693368506494 Partition:1 null one
CreateTime:1693368122413 Partition:0 null hello world
CreateTime:1693368505662 Partition:0 null
CreateTime:1693368515879 Partition:0 null three
CreateTime:1693368116383 Partition:2 null hellow world
CreateTime:1693368132562 Partition:2 null is's working!
CreateTime:1693368508993 Partition:2 null two
- 출력 결과를 확인해 보면 순서대로 생성일자, 속한 파티션, key, value 형태로 보여주고 있다.
- 파티션별로 순서를 구할 수 있다는 점에 유의해야 한다. – 기억해두면 아주 좋다.
- 파티션 전체 순서를 구할 수 없지만, 순서는 파티션별로 구할 수 있다.