开源 ETL 工具 DataX 实践,从mysql到mysql的全量同步和批量更新
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了开源 ETL 工具 DataX 实践,从mysql到mysql的全量同步和批量更新,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含4473字,纯文字阅读大概需要7分钟。
内容图文
![开源 ETL 工具 DataX 实践,从mysql到mysql的全量同步和批量更新](/upload/InfoBanner/zyjiaocheng/866/22c59e7ab9424fc790d74b60534924b8.jpg)
开源 ETL 工具 DataX 实践,从mysql 到不同结构的另一个mysql的全量同步和批量更新
链接: datax官方项目地址 查看全量同步 查看批量更新
实践步骤:
参照官方文档,采用方法一部署
如果点击下载没反应,手动复制地址,把http换成https
下载解压完成,运行自检脚本
File “datax.py”, line 114 print readerRef 。因为我电脑安装的是python3 ,脚本里是python2语法
修改下 datax.py 中 114行后面的print print xx 改为 print(xx)
try except Exception, e: 改为 try except Exception as e
修改完datax.py后再次运行
成功!
参考官方文档,编写一个从一个mysql到另一个mysql的全量同步
先准备数据库
1、建两张表 datax_src和datax_target 表 ,就两个字段,其中第二个字段名称不一样 ,案列演示 从datax_src读取数据,写入到datax_target中
-- datax_src 表 字段 id、src_name
CREATE TABLE `datax_src` (
`id` bigint NOT NULL,
`src_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- datax_target 表 字段 id、target_name
CREATE TABLE `datax_target` (
`id` bigint NOT NULL,
`target_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
2、创建一个函数,向datax_src插入5000条数据
CREATE DEFINER=`root`@`%` FUNCTION `AUTO_INSERT`() RETURNS int
DETERMINISTIC
BEGIN
DECLARE index_num int DEFAULT 0;
WHILE index_num < 5000
DO
SET index_num = index_num + 1;
INSERT INTO datax_src VALUES (index_num,CONCAT('name',index_num));
END WHILE;
RETURN 0;
END
编写job json文件
按照 文档中的 mysqlreader 、mysqlwriter 的示例修改
1、修改mysqlreader的示例 从datax_src同步抽取数据到本地:
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "1131310577",
"column": [
"id",
"src_name"
],
"splitPk": "id",
"connection": [
{
"table": [
"datax_src"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3307/datax"
]
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print":true
}
}
}
]
}
}
将json文件放在job文件夹中,运行
抛异常了:
DataX无法连接对应的数据库,可能原因是:1) 配置的ip/port/database/jdbc错误,无法连接。2) 配置的username/password错误,鉴权失败。请和DBA确认该数据库的连接信息是否正确
可是密码明明是正确的。然后想着修改json文件换了一个数据库,再执行,成功了!
发现区别就是mysql的版本不一样 mysql5.0.27的执行成功了, 失败这个库是mysql8.0.22。
所以看了下datax目录结构,发现连接器位置如下图。 找了下连接器jar包,把reader和writer文件夹中的mysql-connector5换成了8(这里只截图了reader)
再次执行json文件job:成功执行
2、修改mysqlwriter的示例 ,结合mysqlreader ,把读写合并,
完整的 json文件如下 ,具体参数的含义。官方文档中有详细介绍
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "1131310577",
"column": [
"id",
"src_name"
],
"splitPk": "id",
"connection": [
{
"table": [
"datax_src"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3307/datax"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "1131310577",
"column": [
"id",
"target_name"
],
"session": [
"set session sql_mode='ANSI'"
],
"preSql": [
"delete from target_name"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://127.0.0.1:3307/datax?useUnicode=true&characterEncoding=gbk",
"table": [
"datax_target"
]
}
]
}
}
}
]
}
}
执行!成功!!
查看 datax_target 表: 数据已成功写入
以上是全表数据同步
参考官方文档,再编写一个从一个mysql到另一个mysql的批量更新
下面通过自定义querySql实现一下部分更新的操作
1、更新datax_src表的部分数据,更新id小于10的9条数据
datax 的 job 的 json文件如下: 加了一个 querySql 参数,并把 writer 中的 writeMode 换成 update
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "1131310577",
"column": [
"id",
"src_name"
],
"splitPk": "id",
"connection": [
{
"querySql": [
"select id,src_name from datax_src where id < 10;"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3307/datax"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "update",
"username": "root",
"password": "1131310577",
"column": [
"id",
"target_name"
],
"session": [
"set session sql_mode='ANSI'"
],
"preSql": [
"delete from datax_target"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://127.0.0.1:3307/datax?useUnicode=true&characterEncoding=gbk",
"table": [
"datax_target"
]
}
]
}
}
}
]
}
}
执行,成功
查看下datax_target中的数据:也已被成功更新
内容总结
以上是互联网集市为您收集整理的开源 ETL 工具 DataX 实践,从mysql到mysql的全量同步和批量更新全部内容,希望文章能够帮你解决开源 ETL 工具 DataX 实践,从mysql到mysql的全量同步和批量更新所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。