FLUME安装&环境(二):拉取MySQL数据库数据到Kafka
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了FLUME安装&环境(二):拉取MySQL数据库数据到Kafka,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含2370字,纯文字阅读大概需要4分钟。
内容图文
![FLUME安装&环境(二):拉取MySQL数据库数据到Kafka](/upload/InfoBanner/zyjiaocheng/918/b628e166e776462c946c03e010e7c894.jpg)
Flume安装成功,环境变量配置成功后,开始进行agent配置文件设置。
1.agent配置文件(mysql+flume+Kafka)
#利用Flume将MySQL表数据准实时抽取到Kafka
a1.channels = c1
a1.sinks = k1
a1.sources = s1
#sources(mysql)
a1.sources.s1.type = org.keedio.flume.source.SQLSource
a1.sources.s1.channels = c1
a1.sources.s1.connection.url = jdbc:mysql://192.168.121.4:3306/alarm
a1.sources.s1.user = root
a1.sources.s1.password = root
a1.sources.s1.table = alarm_query
a1.sources.s1.columns.to.select = *
a1.sources.s1.incremental.column.name = id
a1.sources.s1.incremental.value = 0
a1.sources.s1.run.query.delay=5000
#source状态写入路径(必须存在且可写入)
a1.sources.s1.status.file.path = /opt/apps/flume-1.6.0-cdh5.14.4-bin
a1.sources.s1.status.file.name = sqlsource.status
#channels(memory)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#sinks(kafka)
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# kfk29,kfk30,kfk31对应主机hosts配置的Kafka主机
a1.sinks.k1.brokerList= D-QP-Safe-4:9092, D-QP-Safe-5:9092, D-QP-Safe-6:9092
a1.sinks.k1.topic=qpdy
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 2
a1.sinks.k1.channel = c1
2.配置准备
2.1创建flume状态写入的文件夹和文件
mkdir /var/lib/flume
vi s1.status
给文件写入的权力 chmod 777 s1.status
2.2将flume内存空间设置增大(开始时没有进行设置,结果报了内存溢出的错误)
在flume启动脚本flume-ng中,修改JAVA_OPTS="-Xmx20m"为JAVA_OPTS="-Xmx10240m"
此处将堆内存的阈值跳转到了10G,实际生产环境中可以根据具体的硬件情况作出调整
2.3添加主机对应的kafka主机
(flume.conf配置文件需要添加主机对应的Kafka主机,否则无法找到对应的sink)
# vim /etc/hosts
#添加主机对应的kafka主机
192.168.241.229 D-QP-Safe-4
192.168.241.230 D-QP-Safe-5
192.168.241.231 D-QP-Safe-6
2.4向flume安装目标的/lib目录下添加启动mysql,Kafka等的jar包
3.启动flume
要在flume的安装目录的bin目录下启动
#启动命令
flume-ng agent -c /opt/apps/flume-1.6.0-cdh5.14.4-bin/conf -f /opt/apps/flume-1.6.0-cdh5.14.4-bin/conf/flume.conf -n a1 -Dflume.root.logger=INFO,console
a1为配置的agent名,-c和-f后是flume的安装路径(必须一致才能启动成功)
4.查看flume进程
ps -aux | grep flume
如果存在多个进程必须将多余进程kill
为了避免一个个的kill,我们需要提取flume的进程号:
ps -aux | grep flume | awk '{print $2}'
然后全部删除
ps -aux | grep flume | awk '{print $2}' | xargs kill
以上,拉取mysql数据库数据到Kafka就配置好了
内容总结
以上是互联网集市为您收集整理的FLUME安装&环境(二):拉取MySQL数据库数据到Kafka全部内容,希望文章能够帮你解决FLUME安装&环境(二):拉取MySQL数据库数据到Kafka所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。