Flume 推送日志到 Kafka
727
2021-04-23
1.基础环境说明
- 虚拟运行环境:VirtualBox 6.1
- 系统:CentOS 7.5
- Java:Zulu JDK 11.48+21-CA (build 11.0.11+9-LTS)
- Kafka:2.12-2.8.0
- Flume:1.9.0
2.搭建 Kafka测试环境
2.1 安装 Kafka
在 Kafka官网 下载tgz文件
tar -vxzf kafka_2.12-2.8.0.tgz
mv kafka_2.12-2.8.0/ kafka
cd kafka
2.2 启动 ZooKeeper 服务
使用默认配置启动
$ bin/zookeeper-server-start.sh config/zookeeper.properties
2.3 配置启动 Kafka
进入kafka的config
配置文件目录,所有配置文件都存放在该目录下。
cd config
2.3.1 Kafka 主服务
2.3.1.1 配置使用 SASL_PLAINTEXT 认证
编辑server.properties
文件,找到listeners = PLAINTEXT://your.host.name:9092
并在下面做出修改。
listeners=SASL_PLAINTEXT://192.168.1.123:9092
advertised.listeners=SASL_PLAINTEXT://192.168.1.123:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
说明:
- 开启SASL认证 SASL_PLAINTEXT ;
- 监听IP192.168.1.123的9092端口(本机端口),注意:不可以用 0.0.0.0:9092 的方式。
2.3.1.2 配置服务授权信息
创建文件kafka_server_jaas.conf
(文件名和路径随意,但需要与后面的配置和命令相对应),创建文件后填写如下内容:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-test"
user_admin="admin-test"
user_producer="prod-test"
user_consumer="cons-test";
};
说明:
- 第一行
KafkaServer
说明该文件内容为Kafka服务信息; - username 和 password 配置管理员信息;
user_producer="prod-test"
配置用户 producer 的密码为 prod-test;user_consumer="cons-test"
配置用户 consumer 的密码为 cons-test。
2.3.1.3 启动 Kafka Server
$ bin/kafka-server-start.sh config/server.properties
2.3.2 配置消费者
2.3.2.1 配置使用 SASL_PLAINTEXT 认证
消费者用来观察 Flume 推送过来的信息,编辑consumer.properties
文件,在文件末尾添加信息。
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
2.3.2.2 配置服务授权信息
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="consumer"
password="cons-test";
};
2.3.2.3 修改 bin/kafka-console-consumer.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/root/kafka/config/kafka_client_jaas.conf"
fi
2.3.2.4 启动消费者
$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.123:9092 --topic test --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN
2.3.3 配置启动生产者以测试Kafka服务
2.3.3.1 修改 bin/kafka-console-producer.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/root/kafka/config/kafka_client_jaas.conf"
fi
2.3.3.3 启动生产者
$ bin/kafka-console-producer.sh --broker-list 192.168.1.123:9092 --topic test --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
启动成功后,会出现>
提示符;此时,在提示符后面输入字符并发送,就可以在消费者界面看到已经接收到信息,表示 Kafka 启动成功!
3.Flume
3.1 Flume 配置文件
$ vi flume/job/telnet-to-kafka.conf
agent.sources = s1
agent.channels = c1
agent.sinks = k1
agent.sources.s1.type=exec
# 监听 /root/kafka.log 文件的变化并推送到 Kafka
agent.sources.s1.command=tail -F /root/kafka.log
agent.sources.s1.channels=c1
agent.channels.c1.type=memory
agent.channels.c1.capacity=10000
agent.channels.c1.transactionCapacity=100
#设置Kafka接收器
# agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
#设置Kafka的broker地址和端口号
# agent.sinks.k1.brokerList=192.168.1.123:9092
#设置Kafka的Topic
# agent.sinks.k1.topic=test
#设置序列化方式
agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder
#设置Kafka接收器
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
#设置Kafka的broker地址和端口号
agent.sinks.k1.kafka.bootstrap.servers = 192.168.1.123:9092
agent.sinks.k1.kafka.topic = test
agent.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
agent.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
agent.sinks.k1.kafka.producer.sasl.kerberos.service.name = producer
agent.sinks.k1.channel = c1
agent.sources.s1.channels = c1
3.2 创建并编辑授权文件
$ vi flume/job/kafka_client_jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="producer"
password="prod-test";
};
3.3 启动 Flume
$ bin/flume-ng agent -n agent -c conf -f job/telnet-to-kafka.conf -Djava.security.auth.login.config=/root/flume/job/kafka_client_jaas.conf -Dflume.root.logger=INFO,console
3.4 测试
向被监听的文件中追加内容
$ echo test-logger-20210422-001>>/root/kafka.log
追加内容后查看 Kafka 的消费者界面,如果看到追加的文件内容,表示 Kafka 大功告成!
附录
监听文件夹上传指定日志文件
agent.sources.s1.type=spooldir
agent.sources.s1.spoolDir=/root/logs
# agent.sources.s1.includePattern=^.*\.log$
# 正则表达式限制指定文件
agent.sources.s1.includePattern=^jeecgboot-[0-9]{4}-[0-9]{2}-[0-9]{2}\.[0-9]*\.log$
agent.sources.s1.fileSuffix=.push
4. 参考
kafka 快速安装使用
kafka 官方指南
Centos7.3防火墙配置
Kafka的安全认证机制SASL/PLAINTEXT
- 0
- 0
-
分享