flume自定义拦截器

最近由于项目需要,将以前的数据流整合,以前提取日志文件采取java程序+kafka+flume+hive的方式对日志实施解析和实时存储,这种方式实现起来太不优雅,故现在将日志解析模块由flume 拦截器来做,使数据管道只需要从flume+hive即可。

开发环境

  • Java 1.8
  • Intelij Idea
  • Maven
  • Flume 1.6
  • Hive 1.2.1

开发步骤

新建maven项目完成后,在pom.xml中加入flume依赖

1
2
3
4
5
6
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.6.0</version>
<scope>provided</scope>
</dependency>

接着再project name->src->main下新建包名和自定义拦截器类,同时让其implements interceptor,接着实现拦截器中的方法,主要代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package csri.log.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

/**
* Created with IntelliJ IDEA.
* User: wutianxiong
* Date: 2017/12/23
* Time: 15:30
*/
public class WafLogInterceptor implements Interceptor {

@Override
public void close()
{
}
@Override
public void initialize()
{
}
@Override
public Event intercept(Event event)
{
byte[] eventBody = event.getBody();
//这里实现对Event body的修改
event.setBody(modifiedEvent);
return event;
}

@Override
public List<Event> intercept(List<Event> events)
{
//flume按批处理Event,先执行下面的步骤
for (Event event : events){
intercept(event);
}
return events;
}
public static class Builder implements Interceptor.Builder
{
@Override
public void configure(Context context) {
// TODO Auto-generated method stub
}
@Override
public Interceptor build() {
return new WafLogInterceptor();
}
}
}

上述代码flume会按下面顺序处理

Builder.build -> intercept(List events) -> intercept(Event event)

在pom.xml里加入下列build语句

1
2
3
4
5
6
7
8
9
10
11
12
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>

在根目录下输入

mvn clean package

即可完成打包。

修改flume conf文件

将前面步骤生成的jar放入flume lib目录下,
在flume source处增加拦截器设置

1
2
waf-agent.sources.udp-source.interceptors = wafinterceptor
waf-agent.sources.udp-source.interceptors.wafinterceptor.type = csri.log.interceptor.WafLogInterceptor$Builder //包名+类名

启动flume ,自定义的拦截器就生效了。
经过上面步骤,即可实现一个自定义的拦截器。

扩展:在flume source中使用多拦截器

由于产生的日志的设备更换,日志的格式由原先的一种升级到10多种,这给适配解析带来了难度,我们通过在source进行相应的过滤,过滤掉新的数据格式日志。
Flume可以通过正则拦截器实现过滤,同时flume也支持多拦截器。

正则拦截器

在使用 Regex Filtering Interceptor的时候一个属性是excludeEvents

  • 当它的值为true 的时候,过滤掉匹配到当前正则表达式的一行
  • 当它的值为false的时候,就接受匹配到正则表达式的一行
Property Name Default Description
type - The component type name has to be regex_filter
regex ”.*” Regular expression for matching against events
excludeEvents false If true, regex determines events to exclude, otherwise regex determines events to include.
### flume interceptor其他属性
flume 通过拦截器可以实现修改或者丢弃相应的event,同时flume也支持链式拦截器,event 通过依次通过各个拦截器,如果被某个拦截器过滤掉,那么将不会被传到下一个拦截器,链式拦截器示例如下
1
2
3
4
5
6
7
8
9
10
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.hostHeader = hostname
a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d
a1.sinks.k1.channel = c1

上面的例子中,event先传递到HostInterceptor,处理之后再传递到TimestampInterceptor。