观热点:大数据NiFi(二十一):监控日志文件生产到Kafka
2023-03-09 10:11:02 来源:腾讯云
监控日志文件生产到Kafka
案例:监控某个目录下的文件内容,将消息生产到Kafka中。
此案例使用到“TailFile”和“PublishKafka_1_0”处理器。
(资料图片)
一、配置“TailFile”处理器
创建“TailFile”处理器并配置:
注意:以上需要在NiFi集群中的每个节点上创建“/root/test/logdata”文件,“logdata”是文件,而非目录。
二、配置“PublishKafka_1_0”处理器
“PublishKafka_1_0”处理器作用是使用Kafka 1.0生产者API将FlowFile的内容作为消息发送给Apache Kafka。发送的内容可以是单独的FlowFile,也可以通过用户指定分隔符分割的FlowFile内容。
关于“PublishKafka_1_0”处理器的“Properties”主要配置的说明如下:
配置项 | 默认值 | 允许值 | 描述 |
---|---|---|---|
Kafka Brokers(Kafka节点) | localhost:9092 | 逗号分割的Kafka集群Broker列表。格式:host:port | |
Topic Name(topic 名称) | 将消息生产到的Topic 名称。 | ||
Delivery Guarantee(数据传递保证) | 0 | 指定保证消息被发送到Kafka的要求。对应Kafka的"acks"属性。可以配置的项如下:Best Effort (尽力交付,相当于ack=0):在向Kafka节点写出消息后,FlowFile将被路由到成功,而不需要等待响应。这提供了最好的性能,但可能会导致数据丢失。例如:消息写出到Kafka节点,但是对应节点挂掉,这时将消息路由到成功。Guarantee Single Node Delivery(保证单节点交付,相当于ack=1,Kafka中的默认配置):KafkaProducer把消息发送出去,至少要等待leader已经成功将数据写入本地log,但是并没有等待所有follower是否成功写入。该情况下,如果follower没有成功备份数据,而此时leader刚好又挂掉了,就会导致消息丢失。该选项就是如果消息被单个Kafka节点接收到,FlowFile将被路由到成功,无论它是否被复制,但如果Kafka节点崩溃,可能会导致数据丢失。Guarantee Replicated Delivery(保证复制交付,相当于ack=-1):FlowFile数据写出后,Kafka topic ISR列表离跟leader保持同步的那些follower都要把消息同步过去,该消息才会被认为成功,否则路由到失败。 | |
Use Transactions(使用事务) | true | ▪true▪false | 指定NiFi是否应该在与Kafka通信时提供事务性保证。如果发送数据到Kafka有问题,并且这个属性设置为false,那么已经发送到Kafka的消息将继续发送,并被传递给消费者。如果这个设置为true,那么Kafka事务将被回滚,这样这些消息对消费者是不可用的。将此设置为true需要将 |
在向Kafka节点写出消息后,FlowFile将被路由到成功,而不需要等待响应。这提供了最好的性能,但可能会导致数据丢失。例如:消息写出到Kafka节点,但是对应节点挂掉,这时将消息路由到成功。
Guarantee Single Node Delivery(保证单节点交付,相当于ack=1,Kafka中的默认配置):KafkaProducer把消息发送出去,至少要等待leader已经成功将数据写入本地log,但是并没有等待所有follower是否成功写入。该情况下,如果follower没有成功备份数据,而此时leader刚好又挂掉了,就会导致消息丢失。该选项就是如果消息被单个Kafka节点接收到,FlowFile将被路由到成功,无论它是否被复制,但如果Kafka节点崩溃,可能会导致数据丢失。 Guarantee Replicated Delivery(保证复制交付,相当于ack=-1): FlowFile数据写出后,Kafka topic ISR列表离跟leader保持同步的那些follower都要把消息同步过去,该消息才会被认为成功,否则路由到失败。 Use Transactions(使用事务)true true false 指定NiFi是否应该在与Kafka通信时提供事务性保证。如果发送数据到Kafka有问题,并且这个属性设置为false,那么已经发送到Kafka的消息将继续发送,并被传递给消费者。如果这个设置为true,那么Kafka事务将被回滚,这样这些消息对消费者是不可用的。将此设置为true需要将
“PublishKafka_1_0”处理器配置如下:
1、创建“PublishKafka_1_0”处理器
2、配置“PROPERTIES”
注意:以上topic 可以在Kafka中创建好,也可以执行时自动创建。
3、连接“TailFile”处理器和“PublishKafka_1_0”处理器
连接“TailFile”处理器和“PublishKafka_1_0”处理器,并设置“PublishKafka_1_0”处理器“failure”和“success”路由关系为自动终止。
三、运行测试
1、启动Kafka集群,启动NiFi处理流程
2、向/root/test/logdata文件中写入数据并保存
向NiFi集群中的其中一台节点的“logdata”中写入以下数据即可
[root@node1 test]# echo "hello world1" > /root/test/logdata[root@node1 test]# echo "hello world2" >> /root/test/logdata[root@node1 test]# echo "hello world3" >> /root/test/logdata
3、查看Kafka中自动创建的“nifi_topic”中的数据
以上数据每写入一行,有个空行,这是由于“TailFile”处理器监控数据导致的,实际就是写入了3条数据,可以通过后期业务处理时,对数据进行trim处理即可。
关键词:
[责任编辑:xwzkw]
相关阅读
- (2023-03-09)观热点:大数据NiFi(二十一):监控日志文件生产到Kafka
- (2023-03-09)钕铁硼磁铁断裂的原因
- (2023-03-09)【速看料】防溺水知识 文字_防溺水知识资料文字
- (2023-03-09)最资讯丨怎么查电脑现在的功耗_查看自己电脑功耗方法是什么
- (2023-03-08)天府农博岛迎来沐春季
- (2023-03-08)世界热议:东北大米的特点是什么_东北大米的特点
- (2023-03-08)当前聚焦:上海车展上市 东风标致408X将于3月21日开启预售
- (2023-03-08)【焦点热闻】幸福触手可及播出时间
- (2023-03-08)三八节,7or9用一支舞表达“舒适基本式”
- (2023-03-08)天生“悦”野,奇瑞TJ-1操稳测试露出,这把“稳”了
- (2023-03-08)每日聚焦:谢尔比GT350H福特赛车野马绝非小马
- (2023-03-08)与广大女性共同成长,短视频情感栏目“曼曼来了”将进行全新品牌升级
- (2023-03-08)爱巢测—AMH居家检测,女性健康不容忽视的检测项目
- (2023-03-08)轮胎上找不到生产日期是什么原因_轮胎上找不到生产日期
- (2023-03-08)全球动态:姜子牙被称为姜太公的原因
- (2023-03-08)全球观点:03月08日09时山西运城疫情数据 阳了以后为什么会腰疼?应该怎么办?
- (2023-03-08)世界观察:孤门一辉
- (2023-03-08)南京市江宁区人社局领导一行莅临云生集团走访交流
- (2023-03-08)职场内外貌受到指责的女性比率达到23%
- (2023-03-08)全国政协委员、四川银行董事长林罡:数字化融入发展血脉
- (2023-03-08)夏雨主演的有什么好看的电影
- (2023-03-08)天天观热点:网红板块3月7日跌2.21%,新华制药领跌,主力资金净流出13.42亿元
- (2023-03-08)【播资讯】樊长使
- (2023-03-08)全球今日报丨文件夹无法删除的解决方法是什么_文件夹无法删除的解决方法
- (2023-03-07)环球时讯:吉林工商学院几本
- (2023-03-07)老爷爷疑惑查血糖不戳手指,医生详解静脉血和指尖血“门道”
- (2023-03-07)焦点快播:最多九套全新外观 《星战幸存者》豪华版内容介绍
- (2023-03-07)大玉儿传奇
- (2023-03-07)港珠澳大桥出入境人数已超71万人次
- (2023-03-07)赢渠梁怎么死的