最近flink已经变得比较流行了,所以大家要了解flink并且使用flink。现在最流行的实时计算应该就是flink了,它具有了流计算和批处理功能。它可以处理有界数据和无界数据,也就是可以处理永远生产的数据。具体的细节我们不讨论,我们直接搭建一个flink功能。总体的思路是source -> transform -> sink,即从source获取相应的数据来源,然后进行数据转换,将数据从比较乱的格式,转换成我们需要的格式,转换处理后,然后进行sink功能,...
对于mysql,redis,Kafka,zookeeper磁盘缓存技术使用分析
大部分组件是基于磁盘存储的,但由于CPU速度和磁盘速度之间的鸿沟,都会使用缓存技术来提高性能,缓存简单来说就是一块内存区域,首先将从磁盘读到的数据放在缓存中,之后查询或修改时直接操作缓存,对于缓存中的数据则以一定的频率刷新到磁盘上,怎样缓存,缓存多少,何时刷新,这些影响着整个组件的性能。在看过一些关于mysql等组件的架构原理后,会发现不论是基于磁盘的...
/** *** @autor gaowei* @Date 2020-04-13 17:59 */
object kafkaToMysqlTest {class MysqlSink(url: String, user: String, pwd: String) extends ForeachWriter[Row] {var conn: Connection = _override def open(partitionId: Long, epochId: Long): Boolean = {Class.forName("com.mysql.jdbc.Driver")conn = DriverManager.getConnection(url, user, pwd)true}override def process(value: Row): Unit = {val p = conn.prepar...
启动MySQL
创建maxwell的数据库和用户
在MySQL中创建一个测试数据库和表
前面三个步骤详见 Maxwell读取MySQL binlog日志通过stdout展示
启动Zookeeper
[hadoop@hadoop001 ~]$ cd $ZK_HOME/bin
[hadoop@hadoop001 bin]$ ./zkServer.sh start
启动kafka,并创建主题为maxwell的topic
[hadoop@hadoop001 bin]$ cd $KAFKA_HOME
//查看kafka版本,防止maxwell不支持
[hadoop@hadoop001 kafka]$ find ./libs/ -name \*kafka_\* | head -1...
canal本质就是"冒充"从库,通过订阅mysql bin-log来获取数据库的更改信息。
mysql配置(my.cnf)
mysql需要配置my.cnf开启bin-log日志并且将bin-log日志格式设置为row, 同时为了防止bin-log日志占用过多磁盘,可以设置一下过期时间,
[mysqld]
log-bin=mysql-bin # 打开binlog
binlog-format=ROW # ROW格式
server_id=1 # mysql Replication 需要设置 在mysql集群里唯一expire_logs_days=7 # binlog文件保存7天
max_binlog_size = 500m ...
一.平台环境介绍:1.系统信息:项目信息系统版本:Ubuntu14.04.2 LTS \n \l用户:*****密码:******Java环境:openjdk-7-jre语言:en_US.UTF-8,en_US:en磁盘:每台vda为系统盘(50G),vdb为数据盘(200G)挂载于/storage目录hcloud15最为DB,第二块磁盘为2000G主机范围:192.168.21.7~192.168.21.15,192.168.21.17,192.168.21.18(11台)主机名:以IP地址为准,依次为hcloud07~hcloud182.服务组件分布:服务名称分布主机Zookeeperhcl...
0、题记
实际业务场景中,会遇到基础数据存在Mysql中,实时写入数据量比较大的情景。迁移至kafka是一种比较好的业务选型方案。而mysql写入kafka的选型方案有:方案一:logstash_output_kafka 插件。方案二:kafka_connector。方案三:debezium 插件。方案四:flume。方案五:其他类似方案。其中:debezium和flume是基于mysql binlog实现的。
如果需要同步历史全量数据+实时更新数据,建议使用logstash。
1、logstash同步原理
常用的...
download.oracle.com/otn-pub/java/jdk/8u45-b14/jdk-8u45-linux-x64.tar.gztar zxvf jdk-8u45-linux-x64.tar.gz
cd jdk-8u45-linux-x64
sudo vi /etc/profile添加如下内容:
export JAVA_HOME=/home/dir/jdk1.8.0_45
export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin安装 Python sudo apt-get install python安装 zookeeper
wget http://mirror.b...
"account": "001","accountName": "旺财宝","subaccount": "001","subaccountName": "caller001","timestamp": 1474625187000,"eventType": "phone","eventTags": [{"name": "incoming","value": 1},{"name": "missed","value": 1},{"name": "edited","value": 1}]
}最终通过Storm,在Mysql中汇聚成如下格式 account
account_name
subaccount
subaccount_name
event_type
event_tag
start_time
end_time
count001
旺财宝
phone
...
private val DEFAULT_ZOOKEEPER_QUORUM = "127.0.0.1:2181"private lazy val (table, conn) = createConnectiondef bulk(items:Iterator) = {items.foreach(conn.put(_))conn.flush....} ......
}然后保证这个类在map,foreachRDD等函数下使用,譬如:dstream.foreachRDD{ rdd =>rdd.foreachPartition{iter=>SimpleHBaseClient.bulk(iter) }
}为什么要保证放到foreachRDD/map 等这些函数里呢?Spark的机制是先将用户的程序作为一个...
一、软件环境:
操作系统:CentOS release 6.5 (Final)
java版本: jdk1.8
zookeeper版本: zookeeper-3.4.11
kafka 版本: kafka_2.11-1.1.0.tgz
maxwell版本:maxwell-1.16.0.tar.gz
注意 : 关闭所有机器的防火墙,同时注意启动可以相互telnet ip 端口
二、环境部署
1、安装jdk
export JAVA_HOME=/usr/java/jdk1.8.0_181
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/l...
一. 概述
在大数据的静态数据处理中,目前普遍采用的是用 Spark + Hdfs (Hive / Hbase) 的技术架构来对数据进行处理。
但有时候有其他的需求,需要从其他不同数据源不间断得采集数据,然后存储到 Hdfs 中进行处理。而追加(append)这种操作在 Hdfs 里面明显是比较麻烦的一件事。所幸有了 Storm 这么个流数据处理这样的东西问世,可以帮我们解决这些问题。
不过光有 Storm 还不够,我们还需要其他中间件来协助我们,让所有其他数...
(开始时没有进行设置,结果报了内存溢出的错误)
在flume启动脚本flume-ng中,修改JAVA_OPTS="-Xmx20m"为JAVA_OPTS="-Xmx10240m"
此处将堆内存的阈值跳转到了10G,实际生产环境中可以根据具体的硬件情况作出调整
2.3添加主机对应的kafka主机
(flume.conf配置文件需要添加主机对应的Kafka主机,否则无法找到对应的sink)
# vim /etc/hosts
#添加主机对应的kafka主机
192.168.241.229 D-QP-Safe-4
192.168.241.230 D-QP-Safe-5...
接着上一篇,将mysql的数据导入kafka中
public static void main(String[] arg) throws Exception {TypeInformation[] fieldTypes = new TypeInformation[] { BasicTypeInfo.STRING_TYPE_INFO };RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat().setDrivername("com.mysql.jdbc.Driver").setDBUrl("jdbc:mysql://*:3306/tablename?characterEnco...
1. 实验环境
CPU:4
内存:8G
ip:192.168.0.187
开启iptables防火墙
关闭selinux
java >=1.5
使用yum方式安装的java,提前配置好JAVA_HOME环境变量
vim /etc/profile.d/java.sh#!/bin/bashexport JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk # 路径根据实际情况而定export PATH=$PATH:$JAVA_HOME/bin
source /etc/profile.d/java.sh
2. MySQL信息
mysql账号
root
MySQL密码
liykpntuu9?C
操作
vim /etc/my.cnf
[mysqld]
log-bin=mysq...