Code Ease Code Ease
  • 个人博客网站 (opens new window)
  • 好用的工具网站 (opens new window)
  • Java核心基础
  • 框架的艺术
  • 分布式与微服务
  • 开发经验大全
  • 设计模式
  • 版本新特性
数据库系列
大数据+AI
  • xxl-job
运维与Linux
  • 基于SpringBoot和BootStrap的论坛网址
  • 基于VuePress的个人博客网站
  • 基于SpringBoot开发的小功能
  • 做一个自己的IDEA插件
程序人生
关于我
  • 分类
  • 标签
  • 归档

神秘的鱼仔

你会累是因为你在走上坡路
  • 个人博客网站 (opens new window)
  • 好用的工具网站 (opens new window)
  • Java核心基础
  • 框架的艺术
  • 分布式与微服务
  • 开发经验大全
  • 设计模式
  • 版本新特性
数据库系列
大数据+AI
  • xxl-job
运维与Linux
  • 基于SpringBoot和BootStrap的论坛网址
  • 基于VuePress的个人博客网站
  • 基于SpringBoot开发的小功能
  • 做一个自己的IDEA插件
程序人生
关于我
  • 分类
  • 标签
  • 归档
服务器
  • Java核心基础

  • 框架的艺术

  • 分布式与微服务

    • SpringCloud

    • SpringCloudAlibaba

    • Dubbo

    • 认证授权

    • 任务调度

      • 为什么要引入分布式任务调度系统?
      • 为什么说datax是目前最好的异构数据源数据交换工具
        • (一)什么是Datax
        • (二)Datax架构
        • (三)Datax运行原理
        • (四)DataX快速入门
        • (五)datax控制台数据同步
        • (六)datax mysql数据同步
        • (七)总结
        • 问题汇总
          • [/usr/local/datax/plugin/reader/._drdsreader/plugin.json] 不存在. 请检查您的配置文件
          • 账号密码正确,报Code:[DBUtilErrorCode-10], Description:[连接数据库失败. 请检查您的 账号、密码、数据库名称、IP、Port或者向 DBA 寻求帮助(注意网络环境).
      • 如何开发一个自己的datax插件
  • 开发经验大全

  • 版本新特性

  • Java
  • 分布式与微服务
  • 任务调度
CodeEase
2023-10-11
目录

为什么说datax是目前最好的异构数据源数据交换工具

作者:鱼仔
博客首页: codeease.top (opens new window)
公众号:Java鱼仔

# (一)什么是Datax

以前我做过一个项目,其中有个需求就是每天定时把sql server中的数据同步到Mysql中,当时写了一段Java的代码来实现,一套Java代码中需要写两个数据源的连接以及两套sql的代码,十分不方便。如果还要实现Oracle、Mysql、SqlServer的互相同步,那代码逻辑就更加复杂。而且通过代码的方式,同步600万条数据要花费2个多小时,性能效率十分低下。

最近在工作中接触到了一个新的工具datax,才意识到数据同步原来还有这么简单的方式。

Datax是阿里巴巴开源的一个异构数据源离线同步工具,DataX 实现了包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 等各种异构数据源之间高效的数据同步功能。

简单来讲,datax就是可以把各个数据库之间的数据来回传输同步,并且操作起来只需要配置一下json文件就可以了。

目前Datax开源在github上:https://github.com/alibaba/DataX

# (二)Datax架构

Datax的架构采用FrameWork+plugin构建,其中:

Reader:数据采集模块,负责采集数据源的数据,将数据发送给Framework

Writer:数据写入模块,负责不断向Framework取数据,并将数据写入到目的端

Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲、流控、并发、数据转换等核心技术问题。

# (三)Datax运行原理

Job:单个作业的管理节点,负责数据清理、子任务划分、TaskGroup监控管理

Task:由Job切分出来,是Datax的最小单元,每隔Task负责一部分数据的同步工作

Schedule:将Task组成TaskGroup,单个TaskGroup的并发数量为5. TaskGroup:负责启动Task

# (四)DataX快速入门

datax的推荐系统为:

  • Linux
  • JDK(1.8以上,推荐1.8)
  • Python(推荐Python2.6.X)
  • Apache Maven 3.x (Compile DataX)

我这里就按照推荐系统进行操作。

首先我们将datax下载下来,datax的下载有两种方式,一种是直接下载压缩包,另外一种是下载源码自己手动编译,这里先展示下载压缩包的使用方式:

首先是下载datax的压缩包:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

下载下来后上传到linux服务器上,解压:

tar -zxvf datax.tar.gz
1

进入datax的bin目录,运行自检脚本

cd datax/bin/
python datax.py ../job/job.json 
1
2

运行结果如果是下面这样说明datax安装成功。

# (五)datax控制台数据同步

datax的作用就是实现异构数据库之间的数据传输,并且应用起来还比较简单,只需要配置好对应的json模板,就可以对数据进行传输。

通过下面的命令,就可以拿到datax对应的json模板,比如我现在的reader是控制台数据,writer也是控制台数据:

python datax.py -r streamreader -w streamwriter
1

就拿到了对应的模板:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader", 
                    "parameter": {
                        "column": [], 
                        "sliceRecordCount": ""
                    }
                }, 
                "writer": {
                    "name": "streamwriter", 
                    "parameter": {
                        "encoding": "", 
                        "print": true
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

我们简单配置一下看看效果,效果就是在控制台输出十遍hello,world,在job目录下新建一个文件叫stream2stream.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader", 
                    "parameter": {
                        "column": [
                            {
                                "type":"string",
                                "value":"hello"
                            },
                            {
                                "type":"string",
                                "value":"world"
                            }
                        ], 
                        "sliceRecordCount": "10"
                    }
                }, 
                "writer": {
                    "name": "streamwriter", 
                    "parameter": {
                        "encoding": "UTF-8", 
                        "print": true
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

运行项目:

python datax.py ../job/stream2stream.json
1

查看效果

# (六)datax mysql数据同步

因为本地只装了mysql,就直接用mysql演示数据同步,首先还是通过命令拿到基本配置模板:

python datax.py -r mysqlreader -w mysqlwriter
1

简单介绍一下模板:

column:表示reader或者writer中对应的列名

connection:填写连接信息

where:设置连接条件

具体的其他参数可以在官方文档中全部找到更详细说明

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": [],  #需要同步的列
                        "connection": [ #连接信息
                            {
                                "jdbcUrl": [], 
                                "table": []
                            }
                        ], 
                        "password": "",  #密码
                        "username": "",  #用户名
                        "where": "" #筛选条件
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": [],  #写入段的列名,与上面需要同步的值的位置保持一致
                        "connection": [ #连接信息
                            {
                                "jdbcUrl": "", 
                                "table": []
                            }
                        ], 
                        "password": "",  #密码
                        "preSql": [],  #执行写入之前做的事情
                        "session": [], # DataX在获取Mysql连接时,执行session指定的SQL语句,修改当前connection session属性
                        "username": "",  #用户名
                        "writeMode": "" #控制写入数据到目标表采用 insert into 或者 replace into 或者 ON DUPLICATE KEY UPDATE 语句
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

做个初始工作,在mysql中先建两张表,一张有数据,一张没有数据:

CREATE TABLE `user`(
`id` int(4) not null auto_increment,
`name` varchar(32) not null,
PRIMARY KEY(id)
)
CREATE TABLE `user2`(
`id` int(4) not null auto_increment,
`name` varchar(32) not null,
PRIMARY KEY(id)
)
INSERT INTO `user` VALUES (1,'javayz')
INSERT INTO `user` VALUES (2,'java')
1
2
3
4
5
6
7
8
9
10
11
12

接下来配置mysql2mysql.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": [
                            "id",
                            "name"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://10.10.128.120:3306/test"], 
                                "table": ["user"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root"
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": [
                            "id",
                            "name"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://10.10.128.120:3306/test", 
                                "table": ["user2"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

同样运行脚本:

python datax.py ../job/mysql2mysql.json
1

控制台输出成功之后,查看数据库,可以发现数据已经同步过去了。

# (七)总结

上面展示了两种方式的数据同步,除此之外datax还支持大量的数据库,并且使用文档写的十分详细,大家有兴趣可以自己去尝试一下,十分有意思的工具。

我是鱼仔,我们下期再见!

# 问题汇总

# [/usr/local/datax/plugin/reader/._drdsreader/plugin.json] 不存在. 请检查您的配置文件

需要删除隐藏文件

rm -rf /usr/local/datax/datax/plugin/*/._*
1

# 账号密码正确,报Code:[DBUtilErrorCode-10], Description:[连接数据库失败. 请检查您的 账号、密码、数据库名称、IP、Port或者向 DBA 寻求帮助(注意网络环境).

如果你的Mysql是8.0以上,需要手动将MySQL的mysql-connector-java-xxx的jar包放到datax/lib目录下。

上次更新: 2025/02/18, 11:30:08
为什么要引入分布式任务调度系统?
如何开发一个自己的datax插件

← 为什么要引入分布式任务调度系统? 如何开发一个自己的datax插件→

最近更新
01
AI大模型部署指南
02-18
02
半个月了,DeepSeek为什么还是服务不可用
02-13
03
Python3.9及3.10安装文档
01-23
更多文章>
Theme by Vdoing | Copyright © 2023-2025 备案图标 浙公网安备33021202002405 | 浙ICP备2023040452号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式