flume自定义sink之mysql
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了flume自定义sink之mysql,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含3484字,纯文字阅读大概需要5分钟。
内容图文
![flume自定义sink之mysql](/upload/InfoBanner/zyjiaocheng/478/8861f9b15c25408ca37c6dda66ce3d95.jpg)
package me; import static org.mockito.Matchers.booleanThat; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.Transaction; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; import com.google.common.base.Preconditions; public class MySink extends AbstractSink implements Configurable { private Connection connect; private Statement stmt; private String columnName; private String url; private String user; private String password; private String tableName; // 在整个sink结束时执行一遍 @Override public synchronized void stop() { // TODO Auto-generated method stub super.stop(); } // 在整个sink开始时执行一遍 @Override public synchronized void start() { // TODO Auto-generated method stub super.start(); try { connect = DriverManager.getConnection(url, user, password); // 连接URL为 jdbc:mysql//服务器地址/数据库名 ,后面的2个参数分别是登陆用户名和密码 stmt = connect.createStatement(); } catch (SQLException e) { // TODO Auto-generated catch block e.printStackTrace(); } } // 不断循环调用 @Override public Status process() throws EventDeliveryException { // TODO Auto-generated method stub Channel ch = getChannel(); Transaction txn = ch.getTransaction(); Event event = null; txn.begin(); while (true) { event = ch.take(); if (event != null) { break; } } try { String body = new String(event.getBody()); if (body.split(",").length == columnName.split(",").length) { String sql = "insert into " + tableName + "(" + columnName + ") values(" + body + ")"; stmt.executeUpdate(sql); txn.commit(); return Status.READY; } else { txn.rollback(); return null; } } catch (Throwable th) { txn.rollback(); if (th instanceof Error) { throw (Error) th; } else { throw new EventDeliveryException(th); } } finally { txn.close(); } } @Override public void configure(Context arg0) { columnName = arg0.getString("column_name"); Preconditions.checkNotNull(columnName, "column_name must be set!!"); url = arg0.getString("url"); Preconditions.checkNotNull(url, "url must be set!!"); user = arg0.getString("user"); Preconditions.checkNotNull(user, "user must be set!!"); password = arg0.getString("password"); Preconditions.checkNotNull(password, "password must be set!!"); tableName = arg0.getString("tableName"); Preconditions.checkNotNull(tableName, "tableName must be set!!"); } }
agent.sources = s1 agent.channels = c1 agent.sinks = sk1 agent.sources.s1.type = netcat agent.sources.s1.bind = localhost agent.sources.s1.port = 5678 agent.sources.s1.channels = c1 agent.sinks.sk1.type = me.MySink agent.sinks.sk1.url=jdbc:mysql://192.168.16.33:3306/test agent.sinks.sk1.tableName= test.user agent.sinks.sk1.user=root agent.sinks.sk1.password=WoChu@123 agent.sinks.sk1.column_name=id, username, password agent.sinks.sk1.channel = c1 agent.channels.c1.type = memory agent.channels.c1.capacity = 1000 agent.channels.c1.transactionCapacity = 100
lihudeMacBook-Pro:~ SunAndLi$ cd hadoop-2.7.2/flume/ lihudeMacBook-Pro:flume SunAndLi$ bin/flume-ng agent -c conf -f conf/sink-mysql --name agent -Dflume.root.logger=INFO,console
flume自定义sink之mysql
标签:http sources extends images alt 登陆 port name 结束
本文系统来源:http://www.cnblogs.com/sunyaxue/p/6645415.html
内容总结
以上是互联网集市为您收集整理的flume自定义sink之mysql全部内容,希望文章能够帮你解决flume自定义sink之mysql所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。