How to Develop Your Own DataX Plugin
Author: 鱼仔
Blog: https://codeease.top
WeChat official account: Java鱼仔
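# (1) Overview
DataX uses a Framework + plugin architecture: a plugin only needs to care about reading or writing the data itself, while the concerns common to every synchronization job, such as type conversion, performance, and statistics, are handled by the framework. Before writing a plugin, the official recommendation is to read the plugin development guide first.
DataX plugin development guide: https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md
In this article we will build a Reader that reads from an HTTP endpoint.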
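To make the framework/plugin split a little more concrete, here is a toy model in plain Java. It is only a sketch of the idea: `ToyReader`, `ToyWriter`, and `run` are names invented for this illustration and are not part of the DataX API. The point is that the plugins only produce or consume records, while the surrounding "framework" wires them together and keeps the statistics.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of the "framework + plugin" split; NOT the real DataX API. */
public class FrameworkPluginSketch {

    /** A reader plugin only knows how to produce records. */
    interface ToyReader {
        void startRead(Consumer<List<String>> sender);
    }

    /** A writer plugin only knows how to consume records. */
    interface ToyWriter {
        void write(List<String> record);
    }

    /** The "framework": connects reader and writer and counts the records. */
    static long run(ToyReader reader, ToyWriter writer) {
        long[] count = {0};
        reader.startRead(record -> {
            count[0]++;           // statistics live in the framework, not in the plugins
            writer.write(record);
        });
        return count[0];
    }

    public static void main(String[] args) {
        // The reader emits two hard-coded records.
        ToyReader reader = sender -> {
            sender.accept(Arrays.asList("1", "alice"));
            sender.accept(Arrays.asList("2", "bob"));
        };
        // The writer simply collects what it receives.
        List<List<String>> sink = new ArrayList<>();
        long total = run(reader, sink::add);
        System.out.println("records transferred: " + total);
    }
}
```

Swapping in a different reader or writer does not touch `run`, which is roughly the property that makes DataX plugins cheap to add.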
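# (2) Developing an HttpReader
1. Clone the project from Git: https://github.com/alibaba/DataX
2. Create a new module, choosing the most basic Maven project type, and name it xxxreader or xxxwriter.
3. From any existing module, copy the required content from its pom.xml into the new module's pom.xml.
4. Copy plugin.json and plugin_job_template.json from the resources directory of any existing module of the same kind (reader or writer) into the new module's resources directory.
5. Edit plugin.json; the main fields to change are name and class.
6. Edit plugin_job_template.json, which describes the structure of the DataX job script.
7. Copy the assembly directory from any existing module of the same kind (reader or writer) and modify it to match the new plugin.
8. Start coding.
Read through the DataX development guide before coding and use the existing plugins as a reference, and there should be no major obstacles. The HttpReader below reads data from an HTTP endpoint and passes it on as input to the downstream Writer. The core code is as follows: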
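
    import com.alibaba.datax.common.element.Record;
    import com.alibaba.datax.common.element.StringColumn;
    import com.alibaba.datax.common.exception.DataXException;
    import com.alibaba.datax.common.plugin.RecordSender;
    import com.alibaba.datax.common.spi.Reader;
    import com.alibaba.datax.common.util.Configuration;
    // Depending on the DataX version, these may be the com.alibaba.fastjson2 classes instead.
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONArray;
    import com.alibaba.fastjson.JSONObject;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.ArrayList;
    import java.util.List;

    // HttpUtil, Key and HttpReaderErrorCode are helper classes that are not shown in this article.
    public class HttpReader extends Reader {

        public static class Job extends Reader.Job {
            private static final Logger LOG = LoggerFactory.getLogger(Job.class);

            private Configuration pluginJobConf = null;

            @Override
            public void init() {
                this.pluginJobConf = super.getPluginJobConf();
                LOG.info("pluginJobConfig: {}", pluginJobConf);
            }

            @Override
            public List<Configuration> split(int adviceNumber) {
                LOG.info("adviceNumber: {}", adviceNumber);
                // Every task gets an identical copy of the job configuration, so each
                // task requests the same URL. With more than one channel the same
                // records would be read several times.
                List<Configuration> configurations = new ArrayList<Configuration>();
                for (int i = 0; i < adviceNumber; i++) {
                    configurations.add(this.pluginJobConf.clone());
                }
                return configurations;
            }

            @Override
            public void destroy() {
            }
        }

        public static class Task extends Reader.Task {
            private static final Logger LOG = LoggerFactory.getLogger(Task.class);

            private Configuration configuration;
            private String httpUrl = null;
            private String httpType = null;
            private String httpParamJson = null;
            private List<String> dataColumn = null;

            @Override
            public void init() {
                // Read the plugin parameters defined in the job script.
                this.configuration = super.getPluginJobConf();
                this.httpUrl = this.configuration.getString("httpUrl");
                this.httpType = this.configuration.getString("httpType");
                this.httpParamJson = this.configuration.getString("httpParamJson");
                this.dataColumn = this.configuration.getList("dataColumn", String.class);
            }

            @Override
            public void startRead(RecordSender recordSender) {
                LOG.info("------------------- http reader log -----------------------");
                LOG.info("httpUrl: {}", this.httpUrl);
                LOG.info("httpType: {}", this.httpType);
                LOG.info("httpParamJson: {}", this.httpParamJson);

                // Call the endpoint.
                String result;
                try {
                    if (Key.GET.equalsIgnoreCase(this.httpType)) {
                        // GET request
                        result = HttpUtil.get(this.httpUrl);
                    } else if (Key.POST.equalsIgnoreCase(this.httpType)) {
                        // POST request
                        result = HttpUtil.post(this.httpUrl, this.httpParamJson, 60);
                    } else {
                        throw DataXException.asDataXException(HttpReaderErrorCode.NOT_SUPPORTED_ERROR,
                                "HTTP method not supported yet: " + this.httpType);
                    }
                } catch (DataXException e) {
                    // Rethrow unchanged so NOT_SUPPORTED_ERROR is not masked by the generic handler.
                    throw e;
                } catch (Exception e) {
                    // Keep the original exception as the cause for easier debugging.
                    throw DataXException.asDataXException(HttpReaderErrorCode.REQUEST_CALL_FAILED,
                            "HTTP request failed", e);
                }

                // Put the result into records: one record per element of the "data" array.
                JSONObject jsonResult = JSON.parseObject(result);
                JSONArray array = jsonResult.getJSONArray("data");
                for (int i = 0; i < array.size(); i++) {
                    JSONObject jsonObject = array.getJSONObject(i);
                    Record record = recordSender.createRecord();
                    // Columns follow the order of dataColumn; a missing key yields a null value.
                    for (String data : this.dataColumn) {
                        record.addColumn(new StringColumn(jsonObject.getString(data)));
                    }
                    recordSender.sendToWriter(record);
                }
                recordSender.flush();
            }

            @Override
            public void destroy() {
            }
        }
    }
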
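The reader above refers to `Key` and `HttpReaderErrorCode`, which the article does not show. The following is a guess at what such helpers could look like, not the author's actual code: the error code strings and descriptions are invented, and the nested `ErrorCode` interface is a local stand-in so the sketch compiles without DataX on the classpath. In a real plugin the enum would implement `com.alibaba.datax.common.spi.ErrorCode`, which declares the same two methods.

```java
/** Hypothetical shapes for the helper types referenced by HttpReader; assumptions only. */
public class HttpReaderHelpersSketch {

    /** Local stand-in for DataX's ErrorCode interface so this sketch compiles on its own. */
    interface ErrorCode {
        String getCode();
        String getDescription();
    }

    /** Constants compared against the "httpType" job parameter. */
    static final class Key {
        static final String GET = "GET";
        static final String POST = "POST";

        private Key() {
        }
    }

    /** Error codes used by HttpReader; the code strings and descriptions are made up. */
    enum HttpReaderErrorCode implements ErrorCode {
        NOT_SUPPORTED_ERROR("HttpReader-00", "Unsupported HTTP method"),
        REQUEST_CALL_FAILED("HttpReader-01", "HTTP request failed");

        private final String code;
        private final String description;

        HttpReaderErrorCode(String code, String description) {
            this.code = code;
            this.description = description;
        }

        @Override
        public String getCode() {
            return code;
        }

        @Override
        public String getDescription() {
            return description;
        }
    }

    public static void main(String[] args) {
        // The reader compares case-insensitively, so "post" in the job script also matches.
        System.out.println(Key.POST.equalsIgnoreCase("post"));
        System.out.println(HttpReaderErrorCode.REQUEST_CALL_FAILED.getCode());
    }
}
```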
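9. Once the code is written, package it with Maven.
In the root pom.xml, remove the other reader and writer entries from the module list, keeping only common, core, transformer, and the httpReader you just wrote.
Run the command:

    mvn -U clean package assembly:assembly -Dmaven.test.skip=true

10. Run it locally.
Configure the launch parameters for the Engine class in the core folder and you can invoke the DataX plugin directly on your machine. The three lines below are the VM option, the program arguments, and the project directory:

    -Ddatax.home=/Users/ly/IdeaProjects/DataX-master/target/datax/datax
    -job /Users/ly/IdeaProjects/DataX-master/http2stream.json -jobid 1
    /Users/ly/IdeaProjects/DataX-master
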
11. Configure the job script.
Configure a job that reads from httpReader and prints to the console, picking up the values under the keys a1, a2, and a3 from the endpoint's response:

    {
      "job": {
        "content": [
          {
            "reader": {
              "name": "httpreader",
              "parameter": {
                "httpUrl": "http://127.0.0.1:8081/test",
                "httpType": "POST",
                "httpParamJson": "{\"aa\":\"aa\"}",
                "dataColumn": [
                  "a1", "a2", "a3"
                ]
              }
            },
            "writer": {
              "name": "streamwriter",
              "parameter": {
                "encoding": "UTF-8",
                "print": true
              }
            }
          }
        ],
        "setting": {
          "speed": {
            "channel": "1"
          }
        }
      }
    }

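One detail worth spelling out is why `channel` is set to 1 here. The `split` method shown earlier hands every task an identical copy of the job configuration, so each task would call the same URL. The plain-Java simulation below illustrates the effect under that assumption; `SplitSketch`, `fetch`, and `totalRows` are hypothetical names, a `Map` stands in for DataX's `Configuration`, and the fixed three rows stand in for the HTTP response.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simulates what happens when identical task configurations all read the same endpoint. */
public class SplitSketch {

    /** Mirrors the article's split(): one identical copy of the job config per suggested task. */
    static List<Map<String, String>> split(Map<String, String> jobConf, int adviceNumber) {
        List<Map<String, String>> tasks = new ArrayList<>();
        for (int i = 0; i < adviceNumber; i++) {
            tasks.add(new HashMap<>(jobConf));
        }
        return tasks;
    }

    /** Stand-in for one task calling the endpoint; it always returns the same three rows. */
    static List<String> fetch(Map<String, String> taskConf) {
        return Arrays.asList("row1", "row2", "row3");
    }

    /** Total number of rows emitted when every task reads independently. */
    static int totalRows(Map<String, String> jobConf, int adviceNumber) {
        int total = 0;
        for (Map<String, String> taskConf : split(jobConf, adviceNumber)) {
            total += fetch(taskConf).size();
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, String> jobConf = new HashMap<>();
        jobConf.put("httpUrl", "http://127.0.0.1:8081/test");
        System.out.println("1 task : " + totalRows(jobConf, 1) + " rows"); // 3 rows
        System.out.println("3 tasks: " + totalRows(jobConf, 3) + " rows"); // 9 rows, each row three times
    }
}
```

If more channels were ever needed, one option would be to have `split` return a single configuration, since a single endpoint call cannot be partitioned.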
Start a simple test endpoint locally:

    @PostMapping("test")
    public String test() {
        // Each element of "data" carries a different key (a1, a2 or a3).
        String jsonResult = "{\n" +
                " \"data\":[\n" +
                " {\n" +
                " \"a1\":\"a11\"\n" +
                " },\n" +
                " {\n" +
                " \"a2\":\"a22\"\n" +
                " },\n" +
                " {\n" +
                " \"a3\":\"a333\"\n" +
                " }\n" +
                " ]\n" +
                "}";
        return jsonResult;
    }

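It helps to work out what records this response should produce for `dataColumn` = a1, a2, a3. The reader builds one record per element of `data` and looks up every configured column in that element, so a key that is absent ends up as a null column. The sketch below replays that mapping in plain Java, with `Map` standing in for fastjson's `JSONObject` and a list of strings standing in for a DataX `Record`; `RecordMappingSketch` and `toRows` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Replays the reader's column mapping on the test endpoint's data, using plain collections. */
public class RecordMappingSketch {

    /** One row per data element; columns follow dataColumn order, missing keys become null. */
    static List<List<String>> toRows(List<Map<String, String>> data, List<String> dataColumn) {
        List<List<String>> rows = new ArrayList<>();
        for (Map<String, String> item : data) {
            List<String> row = new ArrayList<>();
            for (String column : dataColumn) {
                row.add(item.get(column)); // null when the element has no such key
            }
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        // Same content as the "data" array returned by the test endpoint.
        List<Map<String, String>> data = Arrays.asList(
                Collections.singletonMap("a1", "a11"),
                Collections.singletonMap("a2", "a22"),
                Collections.singletonMap("a3", "a333"));

        for (List<String> row : toRows(data, Arrays.asList("a1", "a2", "a3"))) {
            System.out.println(row);
        }
        // Prints:
        // [a11, null, null]
        // [null, a22, null]
        // [null, null, a333]
    }
}
```

So the job is expected to emit three records with one populated column each, rather than a single record holding all three values.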
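The job runs successfully and reads the data from the endpoint.
# (3) Summary
With that, a simple DataX plugin is complete. DataX is clearly quite extensible: most of the plugins listed on the official site are something you could write yourself, and it is well worth studying.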