java – 编写自定义Flume装饰器,但收到错误.我错过了什么?
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了java – 编写自定义Flume装饰器,但收到错误.我错过了什么?,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含3482字,纯文字阅读大概需要5分钟。
内容图文
![java – 编写自定义Flume装饰器,但收到错误.我错过了什么?](/upload/InfoBanner/zyjiaocheng/775/2ec3272dabd647c1b1c8a605108ef2f4.jpg)
我正在为Cloudera的分布式日志聚合系统Flume编写一个自定义装饰器插件.我的Java代码如下:
package multiplex;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.SinkFactory.SinkDecoBuilder;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventImpl;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSinkDecorator;
import com.cloudera.util.Pair;
import com.google.common.base.Preconditions;
public class JsonMultiplexDecorator<S extends EventSink> extends EventSinkDecorator<S> {
private final String serverName;
private final String logType;
public JsonMultiplexDecorator(S s, String serverName, String logType) {
super(s);
this.serverName = serverName;
this.logType = logType;
}
@Override
public void append(Event e) throws IOException {
String body = new String(e.getBody()).replaceAll("\"", "\\\"");
String json = "{ \"server\": \"" + this.serverName + "\"," +
"\"log_type\": \"" + this.logType + "\", " +
"\"body\": \"" + body + "\" }";
EventImpl e2 = new EventImpl(json.getBytes(),
e.getTimestamp(), e.getPriority(), e.getNanos(), e.getHost(),
e.getAttrs());
super.append(e2);
}
public static SinkDecoBuilder builder() {
return new SinkDecoBuilder() {
@Override
public EventSinkDecorator<EventSink> build(Context context,
String... argv) {
Preconditions.checkArgument(argv.length == 2,
"usage: multiplexDecorator(serverName, logType)");
return new JsonMultiplexDecorator<EventSink>(null, argv[0], argv[1]);
}
};
}
public static List<Pair<String, SinkDecoBuilder>> getDecoratorBuilders() {
List<Pair<String, SinkDecoBuilder>> builders =
new ArrayList<Pair<String, SinkDecoBuilder>>();
builders.add(new Pair<String, SinkDecoBuilder>("jsonMultiplexDecorator", builder()));
return builders;
}
}
这可以很好地编译成带有ant的JAR文件,我可以在运行时将它加载到Flume中并成功配置节点以使用它.但是,当一个事件在加载了这个插件的节点上实际出现时,我会在我的日志中收到错误,如下所示:
2010-10-19 21:03:15,176 [logicalNode xxxxx] ERROR connector.DirectDriver: Driving src/sink failed! LazyOpenSource | LazyOpenDecorator because null
java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableMap.put(Collections.java:1285)
at com.cloudera.flume.core.EventBaseImpl.set(EventBaseImpl.java:65)
at com.cloudera.flume.handlers.rolling.RollSink.append(RollSink.java:164)
at com.cloudera.flume.agent.diskfailover.DiskFailoverDeco.append(DiskFailoverDeco.java:93)
at com.cloudera.flume.core.BackOffFailOverSink.append(BackOffFailOverSink.java:144)
at com.cloudera.flume.agent.AgentSink.append(AgentSink.java:109)
at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:58)
at multiplex.JsonMultiplexDecorator.append(JsonMultiplexDecorator.java:56)
at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:58)
at com.cloudera.flume.handlers.debug.LazyOpenDecorator.append(LazyOpenDecorator.java:69)
at com.cloudera.flume.core.connector.DirectDriver$PumperThread.run(DirectDriver.java:92)
([logicalNode xxxxx]是EC2内部DNS名称的占位符).我没有很多Java经验,所以我不确定我在这里做错了什么,或者这是一个Flume错误.我应该提一下,我使用Flume源中的HelloWorld插件示例编写了这个,并且还使用了一些内置的Flume装饰器.
解决方法:
构造EventImpl e2时,您传递的e.getAttrs()是不可修改的.尝试将e.getAttrs()复制到您自己的地图中;使用新的HashMap(e.getAttrs())的浅拷贝应该就足够了.
内容总结
以上是互联网集市为您收集整理的java – 编写自定义Flume装饰器,但收到错误.我错过了什么?全部内容,希望文章能够帮你解决java – 编写自定义Flume装饰器,但收到错误.我错过了什么?所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。