首页 / 更多教程 / 数据采集的flume架构
数据采集的flume架构
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了数据采集的flume架构,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含7437字,纯文字阅读大概需要11分钟。
内容图文
![数据采集的flume架构](/upload/InfoBanner/zyjiaocheng/1329/c492754f6b514fe4b348076ebd59e83f.jpg)
测试1:
新建一个flume1.conf文件
name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
source
a1.sources.r1.type = netcat
a1.sources.r1.bind = DAQ102
a1.sources.r1.port = 6666
channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = atguigu
a1.sinks.k1.kafka.bootstrap.servers = DAQ102:9092
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.useFlumeEventFormat = false
bind
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
测试2:
使用选择器,将不同的数据添加到不同的topic中
拦截器代码:
package com.hybg.daq;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
public class MyInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
//原理:根据body的数据包含什么内容在header中添加什么内容
//获取header文件
Map<String, String> headers = event.getHeaders();
//获取body文件
String string = new String(event.getBody(), StandardCharsets.UTF_8);
//判断是否包含某个文件
if(string.contains("atguigu")){
headers.put("topic","atguigu");
}else if(string.contains("shangguigu")){
headers.put("topic","shangguigu");
}else {
headers.put("topic","other");
}
return event;
}
@Override
public List<Event> intercept(List<Event> list) {
for (Event event : list) {
intercept(event);
}
return list;
}
@Override
public void close() {
}
public static class MyBuilder implements Builder{
@Override
public Interceptor build() {
return new MyInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
source
a1.sources.r1.type = netcat
a1.sources.r1.bind = DAQ102
a1.sources.r1.port = 4444
配置拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.hybg.daq.MyInterceptor$MyBuilder
channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = other
a1.sinks.k1.kafka.bootstrap.servers = DAQ102:9092,DAQ103:9092,DAQ104:9092
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.useFlumeEventFormat = false
bind
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
测试三:
name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
source
a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.batchSize = 5000
a1.sources.source1.batchDurationMillis = 2000
a1.sources.source1.kafka.bootstrap.servers = DAQ102:9092,DAQ103:9092,DAQ104:9092
a1.sources.source1.kafka.topics = atguigu,shangguigu,other
a1.sources.source1.kafka.consumer.group.id = customs
channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
sinks
a1.sinks.k1.type = logger
bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
①三个组件都有
name
a1.sources = r1
a1.channels = c1
a1.sinks=k1
sources
a1.sources.r1.type = netcat
a1.sources.r1.bind = DAQ102
a1.sources.r1.port = 5555
channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = DAQ102:9092,DAQ103:9092,DAQ104:9092
a1.channels.c1.kafka.topic = atguigu
a1.channels.c1.parseAsFlumeEvent = false
sink
a1.sinks.k1.type = logger
bind
a1.sources.r1.channels = c1
a1.sinks.r1.channel = c1
②有source有channel
name
a1.sources = r1
a1.channels = c1
sources
a1.sources.r1.type = netcat
a1.sources.r1.bind = DAQ102
a1.sources.r1.port = 5555
channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = DAQ102:9092,DAQ103:9092,DAQ104:9092
a1.channels.c1.kafka.topic = atguigu
a1.channels.c1.parseAsFlumeEvent = false
bind
a1.sources.r1.channels = c1
③有sink有channel
name
a1.channels = c1
a1.sinks=k1
channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = DAQ102:9092,DAQ103:9092,DAQ104:9092
a1.channels.c1.kafka.topic = atguigu
a1.channels.c1.parseAsFlumeEvent = false
sink
a1.sinks.k1.type = logger
bind
a1.sinks.r1.channel = c1
数据采集模块:
第一层flume:
name
a1.sources = r1
a1.channels = c1
sources
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
将文件夹中所有的app的文件都进行读取
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.batchSize = 1000
设置断点续传的文件存储位置
a1.sources.r1.positionFile =/opt/module/flume-1.9.0/position/taildir_position.json
拦截器j
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.hybg.daq.TaildirInterceptor$MyBuilder
channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = DAQ102:9092,DAQ103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false
bind
a1.sources.r1.channels = c1
第二层flume:
name
a2.sources = r1
a2.channels = c1
a2.sinks = k1
source
a2.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a2.sources.r1.batchSize = 1000
a2.sources.r1.batchDurationMillis = 2000
a2.sources.r1.kafka.bootstrap.servers = DAQ102:9092,DAQ103:9092
a2.sources.r1.kafka.topics = topic_log
a2.sources.r1.useFlumeEventFormat = false
拦截器
设置拦截器后可能会导致错误,出现timestrap不能读取的问题
a2.sources.r1.interceptors = i1
a2.sources.r1.interceptors.i1.type = com.hybg.daq.timeInterceptor$MyBuilder
channel
a2.channels.c1.type = file
文件的
a2.channels.c1.dataDirs = /opt/module/flume-1.9.0/jobs/filechannel
a2.channels.c1.capacity = 1000000
a2.channels.c1.transactionCapacity = 10000
a2.channels.c1.checkpointDir = /opt/module/flume-1.9.0/jobs/checkpointdir
a2.channels.c1.keep-alive = 3
a1.channels.c1.useDualCheckpoints = true
a1.channels.c1. backupCheckpointDir = /otherdiskdir
sinks
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a2.sinks.k1.hdfs.filePrefix = log-
a2.sinks.k1.hdfs.round = false
a2.sinks.k1.hdfs.roundValue = 10
a2.sinks.k1.hdfs.rollSize = 134217728
a2.sinks.k1.hdfs.rollCount = 0
控制输出文件是原生文件。
a2.sinks.k1.hdfs.fileType = CompressedStream
a2.sinks.k1.hdfs.codeC = lzop
bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
启动脚本2:
!/bin/bash
if [ $# -lt 1 ]
then
echo “<<<<<<<<<<<<<<<<<<<<输入有效参数>>>>>>>>>>>>>>>>>>>>”
echo “{start,stop}”
exit
fi
case $1 in
“start”)
echo “<<<<<<<<<<<<<<<<<<<<向HDFS传递数据>>>>>>>>>>>>>>>>>>>>”
ssh DAQ104 “flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/flume_2.conf -n a2 -Dflume.root.logger=INFO,console 1>/opt/module/flume-1.9.0 2>&1 &”
;;
“stop”)
echo “<<<<<<<<<<<<<<<<<<<<停止向HDFS传递数据>>>>>>>>>>>>>>>>>>>>”
ssh DAQ104 “ps -ef | grep flume_2.conf | grep -v grep | awk ‘{print $2}’ | xargs kill -9”
;;
*)
echo “<<<<<<<<<<<<<<<<<<<<参数错误>>>>>>>>>>>>>>>>>>>>”
;;
esac
————————————————
版权声明:本文为CSDN博主「海洋饼干1126」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/weixin_44868211/article/details/118104017
原文:https://www.cnblogs.com/HYBG-JXMD/p/14925311.html
内容总结
以上是互联网集市为您收集整理的数据采集的flume架构全部内容,希望文章能够帮你解决数据采集的flume架构所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。