首页 / 大数据 / 大数据篇:Flume
大数据篇:Flume
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了大数据篇:Flume,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含7532字,纯文字阅读大概需要11分钟。
内容图文
![大数据篇:Flume](/upload/InfoBanner/zyjiaocheng/1228/3836a3666d464369aafc11ae5d77b4f6.jpg)
大数据篇:Flume
Flume是什么?
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
如果没有Flume
数据的采集发送怎么处理呢?处理到哪里呢?Flume最主要的作用就是实时读取服务器本地磁盘数据,写入Hdfs或Kafka等中间件。
1 基础架构
-
Agent主要由:source、channel、sink三个组件组成.
-
Source:
- 从数据发生器接收数据,并将接收的数据以Flume的event格式传递给一个或者多个通道channel,Flume提供多种数据接收的方式,比如Avro(Flume对接Flume),Exec(命令行如tail -f),Taildir(目录本地文件),Kafka等。
-
Channel:
- channel是一种短暂的存储容器,它将从source处接收到的event格式的数据缓存起来,直到它们被sinks消费掉,它在source和sink间起着桥梁的作用,channel是一个完整的事务,这一点保证了数据在收发的时候的一致性,并且它可以和任意数量的source和sink链接,支持的类型有: JDBC , File System,Memory等。
-
sink:
- sink将数据channels消费数据(events)并将其传递给目标地,目标地可能是另一个sink,Flume提供多种数据发送的方式,比如Avro,HDFS,Hive,Kafka。
-
Event
- Flume以事件的形式将数据从源头传送到最终的目的
- Event是数据传输的基本单元
- Event由Header和Body两部分组成,Header用来存放该Event的一些属性(K-V结构),Body存放数据(Byte Array结构)。
2 案例演示
2.1 netcat->Memory->Logger
- 通过netcat工具向本机44444端口发送数据
- Flume监控本机44444端口读取数据
- Flume将获取数据打印到控制台
- 安装netcat工具
yum -y install nc
#监听44444端口(服务端)
nc -lk 44444
#监听44444端口(客户端)
nc localhost 44444
#互相发送数据接收即可
- 创建Agent配置文件flume-netcat-logger.conf
vim flume-netcat-logger.conf
#--->
# 给agent组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# sources相关配置
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# sinks相关配置
a1.sinks.k1.type = logger
# channels相关配置
a1.channels.c1.type = memory
#事件容量
a1.channels.c1.capacity = 1000
#一次传输多少事件
a1.channels.c1.transactionCapacity = 100
# 绑定三个组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#---<
- 启动flume
#普通写法
flume-ng agent --conf /etc/flume-ng/conf --conf-file flume-netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console
#简写
flume-ng agent -c /etc/flume-ng/conf -f flume-netcat-logger.conf -n a1 -Dflume.root.logger=INFO,console
2.2 .log本地文件->Memory->Hdfs
- 生成本地日志文件
- Flume获取本地数据文件
- Flume将获取的文件发送到Hdfs
- 创建Agent配置文件flume-log-hdfs.conf
vim flume-log-hdfs.conf
#--->
# 给agent组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# sources相关配置
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/flume-test/logs/a.log
# sinks相关配置
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://cdh01.cm:8020/flume/events/%Y-%m-%d/%H-%M
a1.sinks.k1.hdfs.filePrefix = events-
#文件夹滚动一分钟创建一个新文件夹
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = minute
#文件滚动时间10S 128M 2条 生成新文件
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134210000
a1.sinks.k1.hdfs.rollCount = 2
#积累多少Event才刷到hdfs
a1.sinks.k1.hdfs.batchSize = 2
#开启时间滚动需要
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#DataStream不会压缩输出文件
a1.sinks.k1.hdfs.fileType = DataStream
# channels相关配置
a1.channels.c1.type = memory
#事件容量
a1.channels.c1.capacity = 1000
#一次传输多少事件
a1.channels.c1.transactionCapacity = 100
# 绑定三个组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#---<
- 启动flume
flume-ng agent -c /etc/flume-ng/conf -f flume-log-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
- 创建本地文件
mkdir /root/flume-test/logs
echo "1" > /root/flume-test/logs/a.log
echo "2" >> /root/flume-test/logs/a.log
echo "3" >> /root/flume-test/logs/a.log
echo "4" >> /root/flume-test/logs/a.log
#根据上面设置的间隔时间进行效果测试
echo "5" >> /root/flume-test/logs/a.log
echo "6" >> /root/flume-test/logs/a.log
echo "7" >> /root/flume-test/logs/a.log
echo "8" >> /root/flume-test/logs/a.log
2.3 本地文件夹->Memory->Hdfs
- 生成本地文件夹及文件数据
- Flume获取本地数据文件
- Flume将获取的文件发送到Hdfs
- 创建Agent配置文件flume-file-hdfs.conf
vim flume-file-hdfs.conf
#--->
# 给agent组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# sources相关配置
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/flume-test/dirlogs
#忽略文件
a1.sources.r1.ignorePattern = ([^ ]*\.txt)
# sinks相关配置
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://cdh01.cm:8020/flume/dirlogs/%Y-%m-%d/%H-%M
a1.sinks.k1.hdfs.filePrefix = log-
#文件夹滚动一分钟创建一个新文件夹
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = minute
#文件滚动时间10S 128M 2条 生成新文件
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134210000
a1.sinks.k1.hdfs.rollCount = 2
#积累多少Event才刷到hdfs
a1.sinks.k1.hdfs.batchSize = 2
#开启时间滚动需要
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#DataStream不会压缩输出文件
a1.sinks.k1.hdfs.fileType = DataStream
# channels相关配置
a1.channels.c1.type = memory
#事件容量
a1.channels.c1.capacity = 1000
#一次传输多少事件
a1.channels.c1.transactionCapacity = 100
# 绑定三个组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#---<
- 启动flume
flume-ng agent -c /etc/flume-ng/conf -f flume-file-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
- 创建本地文件
mkdir /root/flume-test/dirlogs
echo "1" > /root/flume-test/dirlogs/a.log
echo "2" >> /root/flume-test/dirlogs/a.log
echo "3" > /root/flume-test/dirlogs/a.txt
echo "4" >> /root/flume-test/dirlogs/a.txt
#根据上面设置的间隔时间进行效果测试
echo "5" > /root/flume-test/dirlogs/b.log
echo "6" >> /root/flume-test/dirlogs/b.log
#采用cp直接放入一个写好的文件测试效果
不能在监控目录中创建并持续修改文件
上传完成的文件以.COMPLETED结尾
被监控文件夹500毫秒扫描一次文件变动
2.4 本地文件夹->Memory->Logger
监控目录下的实时追加文件
- 生成本地文件夹及文件数据
- Flume获取本地数据文件
- Flume将获取数据打印到控制台
- 创建Agent配置文件flume-files-logger.conf
vim flume-files-logger.conf
#--->
# 给agent组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# sources相关配置
a1.sources.r1.type = TAILDIR
#位置信息
a1.sources.r1.positionFile = /root/flume-test/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /root/flume-test/test1/a.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /root/flume-test/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
a1.sources.ri.maxBatchCount = 1000
# sinks相关配置
a1.sinks.k1.type = logger
# channels相关配置
a1.channels.c1.type = memory
#事件容量
a1.channels.c1.capacity = 1000
#一次传输多少事件
a1.channels.c1.transactionCapacity = 100
# 绑定三个组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#---<
- 启动flume
flume-ng agent -c /etc/flume-ng/conf -f flume-files-logger.conf -n a1 -Dflume.root.logger=INFO,console
- 创建本地文件
mkdir /root/flume-test/test1/
mkdir /root/flume-test/test2/
echo "1" > /root/flume-test/test1/a.log
echo "2" >> /root/flume-test/test1/a.log
echo "3" >> /root/flume-test/test1/a.log
#根据上面设置的间隔时间进行效果测试
echo "5" > /root/flume-test/test2/b.log
echo "6" >> /root/flume-test/test2/b.log
echo "7" >> /root/flume-test/test2/b.log
#停止flume,追加数据,在启动测试断点续传效果。
2.5 netcat->Memory->kafka
- 生成本地文件夹及文件数据
- Flume获取本地数据文件
- Flume将获取数据打印到控制台
- 创建Agent配置文件flume-files-kafka.conf
vim flume-file-kafka.conf
#--->
# 给agent组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# sources相关配置
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# sinks相关配置
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = top-test
a1.sinks.k1.kafka.bootstrap.servers = cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# channels相关配置
a1.channels.c1.type = memory
#事件容量
a1.channels.c1.capacity = 1000
#一次传输多少事件
a1.channels.c1.transactionCapacity = 100
# 绑定三个组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#---<
- 启动flume
flume-ng agent -c /etc/flume-ng/conf -f flume-file-kafka.conf -n a1 -Dflume.root.logger=INFO,console
- 启动kafka消费者
kafka-console-consumer --topic top-test --bootstrap-server cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092 --from-beginning --group g1
- 使用netcat
nc localhost 44444
原文:https://www.cnblogs.com/ttzzyy/p/12638360.html
内容总结
以上是互联网集市为您收集整理的大数据篇:Flume全部内容,希望文章能够帮你解决大数据篇:Flume所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。