Migrating MongoDB with DataX
2023-12-20 06:37:45
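- Project page: GitHub - alibaba/DataX: DataX is the open-source edition of Alibaba Cloud DataWorks Data Integration.
- For a MongoDB migration, the reader plugin is mongodbreader and the writer plugin is mongodbwriter.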
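Source code changes
- In the current version, mongodbreader does not handle binary-typed columns when migrating MongoDB. For non-array column types, such values fall through to the final else branch and are written as a string via tempCol.toString(). The relevant code is in
src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/MongoDBReader.java: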
```java
if (tempCol == null) {
    //continue; must not simply continue here, otherwise the record's columns are misaligned at the destination
    record.addColumn(new StringColumn(null));
} else if (tempCol instanceof Double) {
    //TODO deal with Double.isNaN()
    record.addColumn(new DoubleColumn((Double) tempCol));
} else if (tempCol instanceof Boolean) {
    record.addColumn(new BoolColumn((Boolean) tempCol));
} else if (tempCol instanceof Date) {
    record.addColumn(new DateColumn((Date) tempCol));
} else if (tempCol instanceof Integer) {
    record.addColumn(new LongColumn((Integer) tempCol));
} else if (tempCol instanceof Long) {
    record.addColumn(new LongColumn((Long) tempCol));
} else {
    if (KeyConstant.isArrayType(column.getString(KeyConstant.COLUMN_TYPE))) {
        String splitter = column.getString(KeyConstant.COLUMN_SPLITTER);
        if (Strings.isNullOrEmpty(splitter)) {
            throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,
                    MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription());
        } else {
            ArrayList array = (ArrayList) tempCol;
            String tempArrayStr = Joiner.on(splitter).join(array);
            record.addColumn(new StringColumn(tempArrayStr));
        }
    } else {
        // Every other type, Binary included, ends up here as a string
        record.addColumn(new StringColumn(tempCol.toString()));
    }
}
```
- Change it to:
```java
// Needs org.bson.types.Binary and com.alibaba.datax.common.element.BytesColumn imported in MongoDBReader.java
if (tempCol == null) {
    //continue; must not simply continue here, otherwise the record's columns are misaligned at the destination
    record.addColumn(new StringColumn(null));
} else if (tempCol instanceof Double) {
    //TODO deal with Double.isNaN()
    record.addColumn(new DoubleColumn((Double) tempCol));
} else if (tempCol instanceof Boolean) {
    record.addColumn(new BoolColumn((Boolean) tempCol));
} else if (tempCol instanceof Date) {
    record.addColumn(new DateColumn((Date) tempCol));
} else if (tempCol instanceof Integer) {
    record.addColumn(new LongColumn((Integer) tempCol));
} else if (tempCol instanceof Long) {
    record.addColumn(new LongColumn((Long) tempCol));
} else if (tempCol instanceof Binary) {
    // Handle MongoDB's Binary type
    Binary binaryData = (Binary) tempCol;
    byte[] binaryBytes = binaryData.getData();
    // Add the raw byte array to the record as a DataX bytes column
    record.addColumn(new BytesColumn(binaryBytes));
} else {
    if (KeyConstant.isArrayType(column.getString(KeyConstant.COLUMN_TYPE))) {
        String splitter = column.getString(KeyConstant.COLUMN_SPLITTER);
        if (Strings.isNullOrEmpty(splitter)) {
            throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,
                    MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription());
        } else {
            ArrayList array = (ArrayList) tempCol;
            String tempArrayStr = Joiner.on(splitter).join(array);
            record.addColumn(new StringColumn(tempArrayStr));
        }
    } else {
        record.addColumn(new StringColumn(tempCol.toString()));
    }
}
```
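The logic of the patched branch can be tried without a full DataX build. Below is a minimal, self-contained mirror of the if/else chain above. It depends on neither DataX nor the MongoDB driver, so `Kind`, `Cell` and `FakeBinary` are hypothetical stand-ins for DataX's Column classes and org.bson.types.Binary, not the real APIs. It shows two things. A null value still yields exactly one placeholder column, so later columns keep their positions. A binary value keeps its raw bytes instead of being turned into a string.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;

public class ColumnMappingSketch {

    // Hypothetical stand-in for the DataX column classes used in MongoDBReader
    enum Kind { STRING, DOUBLE, BOOL, DATE, LONG, BYTES }

    // Hypothetical stand-in for a DataX Column: a type tag plus the converted value
    static final class Cell {
        final Kind kind;
        final Object value;
        Cell(Kind kind, Object value) { this.kind = kind; this.value = value; }
    }

    // Hypothetical stand-in for org.bson.types.Binary: only exposes the byte[] payload
    static final class FakeBinary {
        private final byte[] data;
        FakeBinary(byte[] data) { this.data = data; }
        byte[] getData() { return data; }
    }

    // Mirrors the patched if/else chain for one value
    static Cell toCell(Object v) {
        if (v == null) {
            // Placeholder instead of skipping, so the column count stays fixed
            return new Cell(Kind.STRING, null);
        } else if (v instanceof Double) {
            return new Cell(Kind.DOUBLE, v);
        } else if (v instanceof Boolean) {
            return new Cell(Kind.BOOL, v);
        } else if (v instanceof Date) {
            return new Cell(Kind.DATE, v);
        } else if (v instanceof Integer) {
            return new Cell(Kind.LONG, ((Integer) v).longValue());
        } else if (v instanceof Long) {
            return new Cell(Kind.LONG, v);
        } else if (v instanceof FakeBinary) {
            // The new branch: keep the raw bytes rather than calling toString()
            return new Cell(Kind.BYTES, ((FakeBinary) v).getData());
        } else {
            return new Cell(Kind.STRING, v.toString());
        }
    }

    // One Cell per configured column, in column order
    static List<Cell> toRecord(List<Object> values) {
        List<Cell> record = new ArrayList<>();
        for (Object v : values) {
            record.add(toCell(v));
        }
        return record;
    }

    public static void main(String[] args) {
        byte[] payload = {1, 2, 3};
        List<Cell> rec = toRecord(Arrays.<Object>asList(20, null, new FakeBinary(payload)));
        for (Cell c : rec) {
            System.out.println(c.kind); // prints LONG, STRING, BYTES (one per line)
        }
    }
}
```

The ordering matters. The Binary check has to come before the final else, because that branch converts anything it does not recognize to a string.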
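- After modifying the source, the project must be rebuilt. Since only mongodbreader was changed, when packaging you may consider taking the root …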
Migration script
- Write the job file, 1.json:
```json
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mongodbreader",
          "parameter": {
            "address": ["ip1:27017"],
            "collectionName": "data",
            "column": [
              { "name": "_id", "type": "long" },
              { "name": "fileContent", "type": "bytes" }
            ],
            "dbName": "monitor",
            "userName": "root",
            "userPassword": "123456",
            "query": {
              "_id": { "$lt": 21 }
            }
          }
        },
        "writer": {
          "name": "mongodbwriter",
          "parameter": {
            "address": ["ip2:27017"],
            "collectionName": "data",
            "column": [
              { "name": "_id", "type": "long" },
              { "name": "fileContent", "type": "bytes" }
            ],
            "writeMode": {
              "isReplace": "true",
              "replaceKey": "_id"
            },
            "dbName": "test",
            "userName": "root",
            "userPassword": "123456"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "2"
      }
    }
  }
}
```
The query node in the reader holds the query filter. The demo above selects the records whose _id is less than 21.
- Run the command:
```
python datax.py G:\Code\1.json
```
datax.py is located in the packaged target directory, at the relative path target\datax\datax\bin.
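If the migration is started from a Java program rather than a terminal, the same command can be launched with ProcessBuilder. This is a minimal sketch under several assumptions. `python` is assumed to be on the PATH. The DataX home is taken to be the packaged target\datax\datax directory mentioned above, with datax.py under its bin folder. `RunDataxJob` and `buildCommand` are hypothetical names, not part of DataX.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

public class RunDataxJob {

    // Builds the equivalent of "python datax.py <job>", resolving datax.py under <dataxHome>/bin.
    // Assumes "python" is available on the PATH.
    static List<String> buildCommand(Path dataxHome, Path jobFile) {
        Path script = dataxHome.resolve("bin").resolve("datax.py");
        return Arrays.asList("python", script.toString(), jobFile.toString());
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        if (args.length != 2) {
            System.err.println("usage: RunDataxJob <dataxHome> <jobFile>");
            System.exit(2);
        }
        List<String> cmd = buildCommand(Paths.get(args[0]), Paths.get(args[1]));
        // Forward DataX's console output to this process's stdout/stderr
        Process p = new ProcessBuilder(cmd).inheritIO().start();
        int exit = p.waitFor();
        System.out.println("DataX exited with code " + exit);
        System.exit(exit);
    }
}
```

For example, `java RunDataxJob target\datax\datax G:\Code\1.json` runs the same job as the command above. The child's exit code is passed through, so a scheduler can detect a failed migration.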
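Source: https://blog.csdn.net/yudaxiaye/article/details/135076633
This article was submitted by an internet user. Its views are the author's own and do not represent this site. This site only provides information storage space, does not hold ownership, and bears no related legal liability. If the content infringes rights, violates laws or regulations, or is factually incorrect, please send a complaint to the 我的编程经验分享网 mailbox veading@qq.com. Once verified, it will be deleted immediately.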