https://samori.tistory.com/119
ELK + 내장 Filebeat
samori.tistory.com
에 이어 Kafka라는
데이터 스트리밍을 제공하는 오픈 소스 플랫폼을 연결해서 파일비트와 ELK 사이에 둘 것이다.
왜?
아래와 같은 장점이 있기 때문이다.
Scalability
Fault Tolerance
Real-Time Analytics
Load Management
다 중요하지만 Fault Tolerance와 Load Management가 핵심인데 바로 데이터 유실 방지와 분산 처리이다.
아래는 샘플 Compose 파일로 하나의 Kafka 서비스가 아닌 3개의 서비스를 띄우고 이를 연결해준 것을 볼 수 있다.
이는 Fault Tolerance(데이터 유실 방지)와 Load Management(분산 처리)의 이점을 살리기 위함이다.
기존 방식과 하나 차이가 있다고 하면 바로 Zookeeper라고 하는 관리 서비스를 쓰지 않고 KAFKA에 내장된 KRaft를 쓴 것인데 KRaft는 Zookeeper와는 달리 외부 서비스가 아니어서 의존성이 낮고 또 그런 특징이 있어서 속도도 꽤 최적화된 것으로 나온다. 하지만 구 버전에서는 Zookeeper와 사용해야 하니 KRaft가 지원이 되는 Kafka 3.3 이상의 버전인지 확인이 필요하다.
services:
Kafka00Service:
image: bitnami/kafka:4.0.0
restart: unless-stopped
container_name: Kafka00Container
ports:
- '9092:9092' # 내부 네트워크 통신을 위한 PLAINTEXT 리스너
- '10000:10000' # 외부 접근을 위한 EXTERNAL 리스너
environment:
# KRaft 설정
- KAFKA_ENABLE_KRAFT=yes # KRaft 모드 활성화
- KAFKA_CFG_BROKER_ID=0
- KAFKA_CFG_NODE_ID=0
- KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw # 고유 클러스터 ID, 모든 브로커에 동일하게 설정
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@Kafka00Service:9093,1@Kafka01Service:9093,2@Kafka02Service:9093
- KAFKA_CFG_PROCESS_ROLES=controller,broker
# 리스너 설정
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:10000
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://Kafka00Service:9092,EXTERNAL://Kafka00Service:10000
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
# 클러스터 설정
- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=2
- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=2
- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
networks:
- elk
volumes:
- Kafka00:/bitnami/kafka
Kafka01Service:
image: bitnami/kafka:4.0.0
restart: unless-stopped
container_name: Kafka01Container
ports:
- '9093:9092' # 내부 네트워크 통신을 위한 PLAINTEXT 리스너
- '10001:10000' # 외부 접근을 위한 EXTERNAL 리스너
environment:
# KRaft 설정
- KAFKA_ENABLE_KRAFT=yes # KRaft 모드 활성화
- KAFKA_CFG_BROKER_ID=1
- KAFKA_CFG_NODE_ID=1
- KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw # 고유 클러스터 ID, 모든 브로커에 동일하게 설정
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@Kafka00Service:9093,1@Kafka01Service:9093,2@Kafka02Service:9093
- KAFKA_CFG_PROCESS_ROLES=controller,broker
# 리스너 설정
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:10000
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://Kafka01Service:9092,EXTERNAL://Kafka01Service:10001
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
# 클러스터 설정
- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=2
- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=2
- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
networks:
- elk
volumes:
- Kafka01:/bitnami/kafka
Kafka02Service:
image: bitnami/kafka:4.0.0
restart: unless-stopped
container_name: Kafka02Container
ports:
- '9094:9092' # 내부 네트워크 통신을 위한 PLAINTEXT 리스너
- '10002:10000' # 외부 접근을 위한 EXTERNAL 리스너
environment:
# KRaft 설정
- KAFKA_ENABLE_KRAFT=yes # KRaft 모드 활성화
- KAFKA_CFG_BROKER_ID=2
- KAFKA_CFG_NODE_ID=2
- KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw # 고유 클러스터 ID, 모든 브로커에 동일하게 설정
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@Kafka00Service:9093,1@Kafka01Service:9093,2@Kafka02Service:9093
- KAFKA_CFG_PROCESS_ROLES=controller,broker
# 리스너 설정
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:10000
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://Kafka02Service:9092,EXTERNAL://Kafka02Service:10002
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
# 클러스터 설정
- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=2
- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=2
- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
networks:
- elk
volumes:
- Kafka02:/bitnami/kafka
KafkaWebUiService:
image: provectuslabs/kafka-ui:latest
restart: unless-stopped
container_name: KafkaWebUiContainer
ports:
- '8085:8080'
environment:
- KAFKA_CLUSTERS_0_NAME=Local-Kraft-Cluster
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=Kafka00Service:9092,Kafka01Service:9092,Kafka02Service:9092
- DYNAMIC_CONFIG_ENABLED=true
- KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED=true
- KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED=true
depends_on:
- Kafka00Service
- Kafka01Service
- Kafka02Service
networks:
- elk
volumes:
Kafka00:
driver: local
Kafka01:
driver: local
Kafka02:
driver: local
networks:
elk:
name: docker-elk_elk
driver: bridge
external: true
Kafka-UI가 있는 것을 볼 수 있는데 이는 간단하게 웹에서 카프카 설정을 볼 수 있는 곳이다.
예시 화면
여기서 포인트는 외부가 아닌 내부 네트워크로 연결을 해주는 것으로
기존 ELK의 네트워크를 카프카와 연결 시켜주는 것이다.
...
networks:
- elk
networks:
elk:
name: docker-elk_elk
driver: bridge
external: true
그렇게 연결시켜주고 나면 IP대신 서비스명 예: Kafka00Service 를 쓸 수 있고 외부 포트인 10000~10002 가 아닌 9092를 쓸 수 있다. 아래가 그 예시이다.
FILEBEAT
output.kafka:
hosts: ["Kafka00Service:9092", "Kafka01Service:9092", "Kafka02Service:9092"]
topic: "filebeat-logs"
partition.round_robin:
reachable_only: true
LOGSTASH (아래는 Nginx Access log 기준의 파싱. 로그의 종류에 따라, 입맛에 맞춰 Grok Pattern을 바꾸면 된다)
input {
kafka {
bootstrap_servers => "Kafka00Service:9092,Kafka01Service:9092,Kafka02Service:9092"
topics => ["filebeat-logs"]
group_id => "logstash"
consumer_threads => 3
auto_offset_reset => "earliest"
session_timeout_ms => 30000
}
}
filter {
# First parse the JSON from Filebeat
json {
source => "message"
target => "filebeat_data"
}
# Extract the actual log message from the nested JSON
if [filebeat_data][message] {
# Set the actual log message for processing
mutate {
rename => { "[filebeat_data][message]" => "log_message" }
}
# Apply grok pattern to the actual log message
grok {
match => {
"log_message" => '%{IP:remote_addr} - - \[%{HTTPDATE:time_local}\] "%{WORD:http_method} %{DATA:request} HTTP/%{NUMBER:http_version}" %{NUMBER:status:int} %{NUMBER:bytes:int} %{NUMBER:request_time:float} "(?:%{URI:http_referer}|-)" %{NUMBER:session_id:int} "(?:%{DATA:http_user_agent}|-)" "-"'
}
tag_on_failure => ["_grokparsefailure"]
}
# Process the timestamp if grok was successful
if "_grokparsefailure" not in [tags] {
date {
match => [ "time_local", "dd/MMM/yyyy:HH:mm:ss Z" ]
timezone => "Asia/Seoul"
target => "@timestamp"
}
# Copy useful fields from the Filebeat metadata
mutate {
copy => {
"[filebeat_data][log][file][path]" => "source_file"
}
remove_field => [ "message", "event", "host", "time_local", "filebeat_data" ]
}
}
}
}
output {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
user => "elastic"
password => "changeme"
index => "kafka-logs-%{+YYYY.MM.dd}"
codec => json
}
stdout {
codec => rubydebug
}
}
바로 grok pattern으로 파싱을 하지 않고 한번 json으로 파싱을 해주는 이유는 파일비트가 json으로 감싸서 kafka에 보내기 때문이다. 그러면 받은 Logstash는 raw data를 받는 것이 아니라 json 포맷의 데이터를 받아서 1차 가공이 필요한 것이다. 가공이 된 후 grok pattern으로 온전히 파싱이 되는 원리이다.