[Kafka CLI] 3. Kafka Console Consumer CLI

Kafka Console Consumer란?

브로커에 전송된 데이터를 폴링(polling)하여 데이터를 확인할 수 있는 console consumer 명령어

클러스터 위치토픽명보안설정파일
자신의 카프카 서버 IP사용하고자 하는 데이터와 연관된 이름보안이 필요한 설정 파일명 또는 위치
  • 외부나 보안 설정을 해서 접속하려면 보안설정파일을 설정해서 해야한다.
# 명령어표기
kafka-topics.sh 
--command-config [보안설정파일] 
--bootstrap-server [클러스터위치] 
--topic [토픽명] 
--create #(생성명령)
--partitions [파티션개수]

# Localhost 기준 토픽 생성
kafka-topics.sh --bootstrap-server localhost:9092 --topic second_topic --create --partitions 3

컨슈머를 이용한 소비 – 현재 메세지 상태에서 소비하기

# 컨슈머를 이용해 소비를 해도 아무일이 일어나지 않는다.
kafka-console-consumer.sh --bootstrap-server [클러스터위치] --topic [토픽명]
  • 컨슈머를 이용해서 소비를해도 아무일이 일어나지 않는 이유는 현재 메세지 상태에서 소비하기 때문이다.
    • 이전 메세지가 아닌 현재 보내진 메세지를 소비한다.

컨슈머를 이용한 소비 – 또 다른 터미널에서 토픽메세지 전달해서 컨슈머 소비 메세지 확인하기.

# 컨슈머를 실행하고 메세지를 날리면 소비상태의 터미널에서 읽어들이는걸 확인할 수 있다.
kafka-console-producer.sh --bootstrap-server [클러스터 위치] --producer-property partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner --topic [토픽명]
>hello world
>my name j 
>is's working!

# RoundRobinPartitioner 옵션은?
**어떻게 결과가 나타나는지 알기 위해서 DEV에서 확인용, 학습용으로만 사용하자.**
  • 컨슈머를 통해 소비할 토픽 즉, 내가 생성한 명칭에 해당하는 토픽(second_topic)에 프로듀싱 해야한다.
  • 프로듀서 프로퍼티인 partitioner 클래스를 입력해야 한다. (RoundRobinPartitioner)
    • RoundRobinPartitioner 사용하는 이유는 한 번에 하나의 파티션을 프로듀싱해서 모든 파티션을 변경하려고 하기 때문이다.
  • RoundRobinPartitioner 사용하지 않으면
    • 카프카는 많은 최적화가 이루어져있다. 설정하지 않으면 자동으로 최적화된 방식으로 작동된다. ****
    • 최적화된 방식으로 같은 파티션에 약 16KB 데이터를 전송할 때 까지 계속 프로듀싱해야한다. 그 다음 파티션을 전환하게 된다.
    • 기본적으로 아주 비효율적인 설정이기때문에 PRD 환경에서는 사용하지 않고 최적화에 맞게 하는게 좋다.

컨슈머를 이용한 소비 – 시작부터 소비하기 옵션 사용하기(consuming from beginning)

  • –from-beginning : 시작부터 메세지를 소비하도록 옵션을준다.

확인 결과 메세지가 정상적으로 출력은 되는것을 확인할 수 있다. 시작하자마자 소비하여 전체 메세지를 보여준다. 해당 메세지에는 순서가 입력한 순서대로 보장되어 있지 않다.

  • 오류라고 생각할 수 있지만 오류가 아니다. 데이터가 파티션별로 차례차례 읽히기 때문이다.
  • 컨슈머를 통해 확인한 토픽의 모든 메세지를 순서대로 확인할 수 있는데, 확장은 하지 않는다.
  • 카프카는 확장을 위해 여러 파티션이 필요하다.
    • 프로듀서가 여러 파티션에 프로듀싱한다.
    • 동시에 여러 컨슈머로 여러 파티션 소비를 할 수 있다.

컨슈머를 이용한 소비 –

옵션명설정값설명
—formatterDefaultMessageFormatterCLI 명령어 출력을 포맷해서 내보낸다.
–propertyprint.timestamp=true메세지가 수신된 시간을 알 수 있게 해준다.
print.key=true키가 어떤것인지 출력한다.
print.value=true값을 출력한다.
print.partition=true메세지가 할당된 파티션 숫자를 구할 수 있다.
–from-beginning처음부터 메세지를 읽어옴
# 파티션 숫자 표기 및 포맷터 지정하여 소비 출력하기
kafka-console-consumer.sh --bootstrap-server [클러스터위치] --topic [토픽명] --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true --property print.partition=true --from-beginning
>>>>>>>>
CreateTime:1693368104452	Partition:1	null	hongwoo
CreateTime:1693368126687	Partition:1	null	my name j
CreateTime:1693368506494	Partition:1	null	one
CreateTime:1693368122413	Partition:0	null	hello world
CreateTime:1693368505662	Partition:0	null	
CreateTime:1693368515879	Partition:0	null	three
CreateTime:1693368116383	Partition:2	null	hellow world
CreateTime:1693368132562	Partition:2	null	is's working!
CreateTime:1693368508993	Partition:2	null	two
  • 출력 결과를 확인해 보면 순서대로 생성일자, 속한 파티션, key, value 형태로 보여주고 있다.
  • 파티션별로 순서를 구할 수 있다는 점에 유의해야 한다. – 기억해두면 아주 좋다.
    • 파티션 전체 순서를 구할 수 없지만, 순서는 파티션별로 구할 수 있다.

프로듀서와 컨슈머의 행동을 이해하는게 중요하다.!

LIST