利用Flume将MySQL表数据准实时抽取到HDFS
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了利用Flume将MySQL表数据准实时抽取到HDFS,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含7680字,纯文字阅读大概需要11分钟。
内容图文
- use test;
- create table wlslog
- (id int not null,
- time_stamp varchar(40),
- category varchar(40),
- type varchar(40),
- servername varchar(40),
- code varchar(40),
- msg varchar(40),
- primary key ( id )
- );
- insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(1,‘apr-8-2014-7:06:16-pm-pdt‘,‘notice‘,‘weblogicserver‘,‘adminserver‘,‘bea-000365‘,‘server state changed to standby‘);
- insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(2,‘apr-8-2014-7:06:17-pm-pdt‘,‘notice‘,‘weblogicserver‘,‘adminserver‘,‘bea-000365‘,‘server state changed to starting‘);
- insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(3,‘apr-8-2014-7:06:18-pm-pdt‘,‘notice‘,‘weblogicserver‘,‘adminserver‘,‘bea-000365‘,‘server state changed to admin‘);
- insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(4,‘apr-8-2014-7:06:19-pm-pdt‘,‘notice‘,‘weblogicserver‘,‘adminserver‘,‘bea-000365‘,‘server state changed to resuming‘);
- insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(5,‘apr-8-2014-7:06:20-pm-pdt‘,‘notice‘,‘weblogicserver‘,‘adminserver‘,‘bea-000361‘,‘started weblogic adminserver‘);
- insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(6,‘apr-8-2014-7:06:21-pm-pdt‘,‘notice‘,‘weblogicserver‘,‘adminserver‘,‘bea-000365‘,‘server state changed to running‘);
- insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(7,‘apr-8-2014-7:06:22-pm-pdt‘,‘notice‘,‘weblogicserver‘,‘adminserver‘,‘bea-000360‘,‘server started in running mode‘);
- commit;
2. 建立相关目录与文件
(1)创建本地状态文件[plain] view plain copy
- mkdir -p /var/lib/flume
- cd /var/lib/flume
- touch sql-source.status
- chmod -R 777 /var/lib/flume
(2)建立HDFS目标目录
[plain] view plain copy
- hdfs dfs -mkdir -p /flume/mysql
- hdfs dfs -chmod -R 777 /flume/mysql
3. 准备JAR包
从http://book2s.com/java/jar/f/flume-ng-sql-source/download-flume-ng-sql-source-1.3.7.html下载flume-ng-sql-source-1.3.7.jar文件,并复制到Flume库目录。[plain] view plain copy
- cp flume-ng-sql-source-1.3.7.jar /usr/hdp/current/flume-server/lib/
[plain] view plain copy
- cp mysql-connector-java-5.1.17.jar /usr/hdp/current/flume-server/lib/mysql-connector-java.jar
4. 建立HAWQ外部表
[sql] view plain copy- create external table ext_wlslog
- (id int,
- time_stamp varchar(40),
- category varchar(40),
- type varchar(40),
- servername varchar(40),
- code varchar(40),
- msg varchar(40)
- ) location (‘pxf://mycluster/flume/mysql?profile=hdfstextmulti‘) format ‘csv‘ (quote=e‘"‘);
5. 配置Flume
在Ambari -> Flume -> Configs -> flume.conf中配置如下属性:[plain] view plain copy
- agent.channels.ch1.type = memory
- agent.sources.sql-source.channels = ch1
- agent.channels = ch1
- agent.sinks = HDFS
- agent.sources = sql-source
- agent.sources.sql-source.type = org.keedio.flume.source.SQLSource
- agent.sources.sql-source.connection.url = jdbc:mysql://172.16.1.127:3306/test
- agent.sources.sql-source.user = root
- agent.sources.sql-source.password = 123456
- agent.sources.sql-source.table = wlslog
- agent.sources.sql-source.columns.to.select = *
- agent.sources.sql-source.incremental.column.name = id
- agent.sources.sql-source.incremental.value = 0
- agent.sources.sql-source.run.query.delay=5000
- agent.sources.sql-source.status.file.path = /var/lib/flume
- agent.sources.sql-source.status.file.name = sql-source.status
- agent.sinks.HDFS.channel = ch1
- agent.sinks.HDFS.type = hdfs
- agent.sinks.HDFS.hdfs.path = hdfs://mycluster/flume/mysql
- agent.sinks.HDFS.hdfs.fileType = DataStream
- agent.sinks.HDFS.hdfs.writeFormat = Text
- agent.sinks.HDFS.hdfs.rollSize = 268435456
- agent.sinks.HDFS.hdfs.rollInterval = 0
- agent.sinks.HDFS.hdfs.rollCount = 0
属性 |
描述 |
agent.channels.ch1.type |
Agent的channel类型 |
agent.sources.sql-source.channels |
Source对应的channel名称 |
agent.channels |
Channel名称 |
agent.sinks |
Sink名称 |
agent.sources |
Source名称 |
agent.sources.sql-source.type |
Source类型 |
agent.sources.sql-source.connection.url |
数据库URL |
agent.sources.sql-source.user |
数据库用户名 |
agent.sources.sql-source.password |
数据库密码 |
agent.sources.sql-source.table |
数据库表名 |
agent.sources.sql-source.columns.to.select |
查询的列 |
agent.sources.sql-source.incremental.column.name |
增量列名 |
agent.sources.sql-source.incremental.value |
增量初始值 |
agent.sources.sql-source.run.query.delay |
发起查询的时间间隔,单位是毫秒 |
agent.sources.sql-source.status.file.path |
状态文件路径 |
agent.sources.sql-source.status.file.name |
状态文件名称 |
agent.sinks.HDFS.channel |
Sink对应的channel名称 |
agent.sinks.HDFS.type |
Sink类型 |
agent.sinks.HDFS.hdfs.path |
Sink路径 |
agent.sinks.HDFS.hdfs.fileType |
流数据的文件类型 |
agent.sinks.HDFS.hdfs.writeFormat |
数据写入格式 |
agent.sinks.HDFS.hdfs.rollSize |
目标文件轮转大小,单位是字节 |
agent.sinks.HDFS.hdfs.rollInterval |
hdfs sink间隔多长将临时文件滚动成最终目标文件,单位是秒;如果设置成0,则表示不根据时间来滚动文件 |
agent.sinks.HDFS.hdfs.rollCount |
当events数据达到该数量时候,将临时文件滚动成目标文件;如果设置成0,则表示不根据events数据来滚动文件 |
表1
6. 运行Flume代理
保存上一步的设置,然后重启Flume服务,如图2所示。图2
重启后,状态文件已经记录了将最新的id值7,如图3所示。
图3
查看目标路径,生成了一个临时文件,其中有7条记录,如图4所示。
图4
查询HAWQ外部表,结果也有全部7条数据,如图5所示。
图5
至此,初始数据抽取已经完成。
7. 测试准实时增量抽取
在源表中新增id为8、9、10的三条记录。[sql] view plain copy
- use test;
- insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(8,‘apr-8-2014-7:06:22-pm-pdt‘,‘notice‘,‘weblogicserver‘,‘adminserver‘,‘bea-000360‘,‘server started in running mode‘);
- insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(9,‘apr-8-2014-7:06:22-pm-pdt‘,‘notice‘,‘weblogicserver‘,‘adminserver‘,‘bea-000360‘,‘server started in running mode‘);
- insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(10,‘apr-8-2014-7:06:22-pm-pdt‘,‘notice‘,‘weblogicserver‘,‘adminserver‘,‘bea-000360‘,‘server started in running mode‘);
- commit;
图6
五、方案优缺点
利用Flume采集关系数据库表数据最大的优点是配置简单,不用编程。相比tungsten-replicator的复杂性,Flume只要在flume.conf文件中配置source、channel及sink的相关属性,已经没什么难度了。而与现在很火的canal比较,虽然不够灵活,但毕竟一行代码也不用写。再有该方案采用普通SQL轮询的方式实现,具有通用性,适用于所有关系库数据源。这种方案的缺点与其优点一样突出,主要体现在以下几方面。
- 在源库上执行了查询,具有入侵性。
- 通过轮询的方式实现增量,只能做到准实时,而且轮询间隔越短,对源库的影响越大。
- 只能识别新增数据,检测不到删除与更新。
- 要求源库必须有用于表示增量的字段。
参考:
Flume架构以及应用介绍Streaming MySQL Database Table Data to HDFS with Flume
how to read data from oracle using FLUME to kafka broker
https://github.com/keedio/flume-ng-sql-source
- v
利用Flume将MySQL表数据准实时抽取到HDFS
标签:avr 服务 java 种类 post into 复制 ora 更新
本文系统来源:http://www.cnblogs.com/hark0623/p/7083278.html
内容总结
以上是互联网集市为您收集整理的利用Flume将MySQL表数据准实时抽取到HDFS全部内容,希望文章能够帮你解决利用Flume将MySQL表数据准实时抽取到HDFS所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。