Flume1.7.0新增taildirSource组件
江左梅郎2017/04/21         
相对于flume 1.6.0版本,flume 1.7.0 推出了 taildirSource 组件,通过tail 监控正则表达式匹配目录下的所有文件,实现断点续传。
发现taildirSource重复采集数据问题
flume 1.7.0官方的 taildirSource 对于log4j 日志的监控会有bug。
因为log4j 日志会自动切分,可以按天或者按小时进行切分,log4j 切分日志其实就是新建一个文件,然后修改原来的日志文件名称。但是 taildirSource 组件是不支持修改文件名称的,如果文件被修改名称了,那么taildirSource会认为是一个新的文件,就会重新读取该文件中的数据,这就导致了日志文件重读,造成数据重复采集问题。
解决taildirSource重复采集数据问题
我们通过阅读源码发现里面存在bug,我们只需要修改几处源码就可以解决这个bug问题。
首先从flume官方下载flume1.7源码,找到这个目录apache-flume-1.7.0-src\flume-ng-sources\flume-taildir-source\src\main\java\org\apache\flume\source\taildir,然后将改目录下的java文件导入IDE。
1、修改ReliableTaildirEventReader.java 类
1)
修改ReliableTaildirEventReader构造方法里面的updateTailFiles(skipToEnd)方法。
找到如下代码:
if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) { //skipToEnd如果没有记录读取位置时,是否跳过文件结尾,默认false long startPos = skipToEnd ? f.length() : 0; //根据f具体文件生成TailFile tf = openFile(f, headers, inode, startPos); }
将其修改为:
if (tf == null) { //skipToEnd如果没有记录读取位置时,是否跳过文件结尾,默认false long startPos = skipToEnd ? f.length() : 0; //根据f具体文件生成TailFile tf = openFile(f, headers, inode, startPos); }
去掉了
!tf.getPath().equals(f.getAbsolutePath())判断条件。
原因解释:
如果加上
!tf.getPath().equals(f.getAbsolutePath()判断条件会出现bug,因为当目录下的文件名修改后,修改前的文件绝对路径tf.getPath()与修改后的文件绝对路径f.getAbsolutePath()肯定会不一致,这时if判断条件为true,则会进入if子句,此时startPos默认为0, 又从0的位置开始采集该文件,会造成重复采集该文件。
2)修改ReliableTaildirEventReader构造方法里面的loadPositionFile(positionFilePath)方法。
找到如下代码:
if (tf != null && tf.updatePos(path, inode, pos)) {
//更新inode与tf映射
tailFiles.put(inode, tf);
}
修改为
if (tf != null && tf.updatePos(tf.getPath(), inode, pos)){
//更新inode与tf映射
tailFiles.put(inode, tf);
}
将updatePos方法中的path修改为了tf.getPath(),强制修改前的文件路径跟修改后的文件路径一致。
原因解释:
如果某一个文件名称修改了,判断tf不为空并且tf与f不一致,这时if为false不会进入子句,就不会更新inode pos数据,后续会造成重复采集数据。
Flume项目打包编译
1)通过mvn package 对上述两个模块进行源码编译生成flume-taildirsource.jar
2)将flume-taildirsource.jar上传到flume lib目录下即可生效
Flume taildirSource测试
Flume配置文件behavior.conf如下所示:
a1.sources = r1 a1.channels = c1 a1.sinks = k1 a1.sources.r1.type = TAILDIR a1.sources.r1.channels = c1 a1.sources.r1.positionFile = /home/hadoop/app/flume/checkpoint/behavior/taildir_position.json a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /home/hadoop/app/tomcat/logs/behavior/behavior-json.log.* a1.sources.r1.fileHeader = true a1.channels.c1.type = file a1.channels.c1.checkpointDir = /home/hadoop/app/flume/checkpoint/behavior a1.channels.c1.dataDirs = /home/hadoop/app/flume/data/behavior a1.channels.c1.capacity = 9000000 a1.sinks.k1.type = logger a1.sinks.k1.channel = c1
直接使用官网的TAILDIR source类型。
监控目录/home/hadoop/app/tomcat/logs/behavior/下存在如下文件:
1、bug修改前的测试
1)启动flume之后正常采集/home/hadoop/app/tomcat/logs/behavior/目录下的数据
2)然后我们在/home/hadoop/app/flume/checkpoint/behavior/taildir_position.json文件中可以看到:
[hadoop@txy002 behavior]$ cat taildir_position.json
[{"inode":1580777,"pos":201,"file":"/home/hadoop/app/tomcat/logs/behavior/behavior-json.log.2017-02-19-21-51"},
{"inode":1580781,"pos":2127,"file":"/home/hadoop/app/tomcat/logs/behavior/behavior-json.log.2017-02-19-21-58"},
{"inode":1580908,"pos":372,"file":"/home/hadoop/app/tomcat/logs/behavior/behavior-json.log"}]
此文件记录每个采集文件信息:
Inode:每个文件的唯一标识(即使文件名称改变,此标识也不会变)
Pos:文件采集便宜量
File:文件绝对路径
3)此时我们查看其中一个文件behavior-json.log采集到/home/hadoop/app/flume/data/behavior目录下的数据
4)然后将behavior-json.log文件名称修改为behavior-json.log.201704201632
5)/home/hadoop/app/flume/checkpoint/behavior/taildir_position.json文件内容会改变
[hadoop@txy002 behavior]$ cat taildir_position.json
[{"inode":1580777,"pos":201,"file":"/home/hadoop/app/tomcat/logs/behavior/behavior-json.log.2017-02-19-21-51"},
{"inode":1580781,"pos":2127,"file":"/home/hadoop/app/tomcat/logs/behavior/behavior-json.log.2017-02-19-21-58"},
{"inode":1580908,"pos":372,"file":"/home/hadoop/app/tomcat/logs/behavior/behavior-json.log.201704201632"}]
注意此时inode:1580908没有变,只不过后面的文件名称变了。
6)此时可以发现文件名称修改后,数据有重新采集了一遍,这是我们不希望发生的。
2、bug修改后的测试
1)修改配置文件:
[hadoop@txy002 behavior-flume]$ cat conf/behavior.conf a1.sources = r1 a1.channels = c1 a1.sinks = k1 a1.sources.r1.type = com.djt.flume.source.TaildirSource a1.sources.r1.channels = c1 a1.sources.r1.positionFile = /home/hadoop/app/flume/checkpoint/behavior/taildir_position.json a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /home/hadoop/app/tomcat/logs/behavior/behavior-json.log.* a1.sources.r1.fileHeader = true a1.channels.c1.type = file a1.channels.c1.checkpointDir = /home/hadoop/app/flume/checkpoint/behavior a1.channels.c1.dataDirs = /home/hadoop/app/flume/data/behavior a1.channels.c1.capacity = 9000000 a1.sinks.k1.type = logger a1.sinks.k1.channel = c1
com.djt.flume.source.TaildirSource 为我们修改后的入口类
2)启动flume采集数据
新增加一个采集文件behavior-json.log.201704201850
3)然后我们在/home/hadoop/app/flume/checkpoint/behavior/taildir_position.json文件中可以看到:
[hadoop@txy002 behavior]$ cat taildir_position.json
[{"inode":1580777,"pos":201,"file":"/home/hadoop/app/tomcat/logs/behavior/behavior-json.log.2017-02-19-21-51"},
{"inode":1580781,"pos":2127,"file":"/home/hadoop/app/tomcat/logs/behavior/behavior-json.log.2017-02-19-21-58"},
{"inode":1580908,"pos":372,"file":"/home/hadoop/app/tomcat/logs/behavior/behavior-json.log"},
{"inode":1578636,"pos":372,"file":"/home/hadoop/app/tomcat/logs/behavior/behavior-json.log.201704201850"}]
4)查看目录/home/hadoop/app/flume/data/behavior下采集到的数据
5)将behavior-json.log.201704201850修改为behavior-json.log.201704201900
6)然后我们在/home/hadoop/app/flume/checkpoint/behavior/taildir_position.json文件中可以看到:
[hadoop@txy002 behavior]$ cat taildir_position.json
[{"inode":1580777,"pos":201,"file":"/home/hadoop/app/tomcat/logs/behavior/behavior-json.log.2017-02-19-21-51"},
{"inode":1580781,"pos":2127,"file":"/home/hadoop/app/tomcat/logs/behavior/behavior-json.log.2017-02-19-21-58"},
{"inode":1580908,"pos":372,"file":"/home/hadoop/app/tomcat/logs/behavior/behavior-json.log"},
{"inode":1578642,"pos":373,"file":"/home/hadoop/app/tomcat/logs/behavior/behavior-json.log.201704201900"}]
7)此时再查看目录/home/hadoop/app/flume/data/behavior下采集到的数据
Flume没有因为修改文件名称而重复采集数据,成功修复了flume taildirSourcebug问题。
相关资料:
6本经典Hadoop书籍,Hadoop学习者必读!
10份精美程序员简历模板,Hadoop,Java,Web前端,Android,PHP,C++,.Net等
【Hadoop学习手册】Hadoop学习常见问题及解决办法
这才是学习Linux的正确姿势【附Linux资料】
10年Java老兵典藏资料吐血奉献,转发还有额外惊喜
100道常见Hadoop面试/笔试题及答案解析
100道经典Java面试题及答案解析
学习Scala的注意了,这些资料你一定不能错过(视频、图文、免费课程等)
关注“大数据研习社”后,即可免费领取!