最近由于项目需要,将以前的数据流整合,以前提取日志文件采取java程序+kafka+flume+hive的方式对日志实施解析和实时存储,这种方式实现起来太不优雅,故现在将日志解析模块由flume 拦截器来做,使数据管道只需要从flume+hive即可。
开发环境
- Java 1.8
- Intelij Idea
- Maven
- Flume 1.6
- Hive 1.2.1
开发步骤
新建maven项目完成后,在pom.xml中加入flume依赖
1 | <dependency> |
接着再project name->src->main
下新建包名和自定义拦截器类,同时让其implements interceptor
,接着实现拦截器中的方法,主要代码如下
1 | package csri.log.interceptor; |
上述代码flume会按下面顺序处理
Builder.build -> intercept(List
events) -> intercept(Event event)
在pom.xml里加入下列build语句
1 | <build> |
在根目录下输入
mvn clean package
即可完成打包。
修改flume conf文件
将前面步骤生成的jar放入flume lib目录下,
在flume source处增加拦截器设置
1 | waf-agent.sources.udp-source.interceptors = wafinterceptor |
启动flume ,自定义的拦截器就生效了。
经过上面步骤,即可实现一个自定义的拦截器。
扩展:在flume source中使用多拦截器
由于产生的日志的设备更换,日志的格式由原先的一种升级到10多种,这给适配解析带来了难度,我们通过在source进行相应的过滤,过滤掉新的数据格式日志。
Flume可以通过正则拦截器实现过滤,同时flume也支持多拦截器。
正则拦截器
在使用 Regex Filtering Interceptor的时候一个属性是excludeEvents
- 当它的值为true 的时候,过滤掉匹配到当前正则表达式的一行
- 当它的值为false的时候,就接受匹配到正则表达式的一行
Property Name | Default | Description | ||
---|---|---|---|---|
type | - | The component type name has to be regex_filter | ||
regex | ”.*” | Regular expression for matching against events | ||
excludeEvents | false | If true, regex determines events to exclude, otherwise regex determines events to include. | ||
### flume interceptor其他属性 | ||||
flume 通过拦截器可以实现修改或者丢弃相应的event,同时flume也支持链式拦截器,event 通过依次通过各个拦截器,如果被某个拦截器过滤掉,那么将不会被传到下一个拦截器,链式拦截器示例如下 | ||||
|
上面的例子中,event先传递到HostInterceptor,处理之后再传递到TimestampInterceptor。