Importing ClickHouse Data into Hive with Addax
Addax is an open-source project developed as a fork of DataX, and it offers more freely available plugins than DataX does; for example, the ClickhouseReader plugin is not provided by the official DataX distribution.
Compared with Waterdrop, Addax does not depend on a Spark environment, and it provides runtime monitoring of throughput and record counts across the whole job pipeline, along with precise speed control and error-tolerance control.
Official reference documentation
https://wgzhao.github.io/Addax/develop/
Addax installation path
Addax is installed on every kettle server, under /data/addax/.
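To confirm that the ClickhouseReader plugin is actually present on a server, one quick check is to list the reader plugin directory. This assumes the usual Addax layout, where reader plugins live under plugin/reader; adjust the path if your build differs:
# sanity check: the reader plugin directory should contain clickhousereader
ls /data/addax/plugin/reader/ | grep -i clickhouse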
Usage steps
Incremental load
The funds settlement document table (bsn.bsn_am_zjjsdj_distinct_all) is used as the example.
- 1. Edit and upload the job configuration file
{
  "job": {
    "setting": {
      "speed": {
        "channel": 2,
        "bytes": 2000000,
        "record": 100000
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": {
      "reader": {
        "name": "clickhousereader",
        "parameter": {
          "username": "${P_CLICKHOUSE_USER}",
          "password": "${P_CLICKHOUSE_PASSWORD}",
          "column": ["zjjsdj_djnm", "zjjsdj_glqtnm", "zjjsdj_jsfs", "zjjsdj_ywlx", "zjjsdj_djzt", "createdtime", "zjjsdj_je", "zjjsdj_yhzhkhhlhh", "zjjsdj_dwbh", "zjjsdj_yhrq", "zjjsdj_djlx", "zjjsdj_yhsh", "creation_date"],
          "where": "part_period='${P_PARTITION_NAME}'",
          "connection": [
            {
              "table": ["bsn_am_zjjsdj_distinct_all"],
              "jdbcUrl": ["${P_CLICKHOUSE_URL}"]
            }
          ]
        }
      },
      "writer": {
        "name": "hdfswriter",
        "parameter": {
          "defaultFS": "${P_HDFS_DEFAULT_FS}",
          "fileType": "orc",
          "path": "${P_HDFS_HIVE_PATH}/warehouse/ods.db/ods_fms_zjjsdj",
          "fileName": "${P_PARTITION_NAME}",
          "column": [
            {"name": "zjjsdj_djnm", "type": "string"},
            {"name": "zjjsdj_glqtnm", "type": "string"},
            {"name": "zjjsdj_jsfs", "type": "string"},
            {"name": "zjjsdj_ywlx", "type": "string"},
            {"name": "zjjsdj_djzt", "type": "string"},
            {"name": "createdtime", "type": "string"},
            {"name": "zjjsdj_je", "type": "decimal"},
            {"name": "zjjsdj_yhzhkhhlhh", "type": "string"},
            {"name": "zjjsdj_dwbh", "type": "string"},
            {"name": "zjjsdj_yhrq", "type": "string"},
            {"name": "zjjsdj_djlx", "type": "string"},
            {"name": "zjjsdj_yhsh", "type": "string"},
            {"name": "creation_date", "type": "date"}
          ],
          "writeMode": "overwrite",
          "fieldDelimiter": "\u0001",
          "compress": "NONE",
          "haveKerberos": "${P_KERBEROS_ENABLED}",
          "kerberosPrincipal": "${P_KERBEROS_PRINCIPAL}",
          "kerberosKeytabFilePath": "${P_KERBEROS_KEYTAB}",
          "hadoopConfig": {
            "dfs.nameservices": "${P_HDFS_HA_NAME}",
            "dfs.ha.namenodes.${P_HDFS_HA_NAME}": "nn1,nn2",
            "dfs.namenode.rpc-address.${P_HDFS_HA_NAME}.nn1": "${P_HDFS_HA_NN1}",
            "dfs.namenode.rpc-address.${P_HDFS_HA_NAME}.nn2": "${P_HDFS_HA_NN2}",
            "dfs.client.failover.proxy.provider.${P_HDFS_HA_NAME}": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
          }
        }
      }
    }
  }
}
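Note that the writer does not create the Hive table, so ods.ods_fms_zjjsdj must already exist. The actual DDL is not part of this note; the following is a minimal sketch, assuming the Hive columns mirror the writer's column list above (the decimal precision/scale is an assumption and should be verified against the ClickHouse source):
# hypothetical DDL sketch, verify the column types (especially the decimal) against the source table
hive -e "
create table if not exists ods.ods_fms_zjjsdj (
  zjjsdj_djnm string,
  zjjsdj_glqtnm string,
  zjjsdj_jsfs string,
  zjjsdj_ywlx string,
  zjjsdj_djzt string,
  createdtime string,
  zjjsdj_je decimal(20,6),
  zjjsdj_yhzhkhhlhh string,
  zjjsdj_dwbh string,
  zjjsdj_yhrq string,
  zjjsdj_djlx string,
  zjjsdj_yhsh string,
  creation_date date
)
partitioned by (part_period string)
stored as orc;"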
- 2. Launch the job from the command line
## kettle
hive -e "alter table ods.ods_fms_zjjsdj add if not exists partition(part_period='${P_PERIOD_NAME}');"
/data/addax/bin/addax.sh \
-p"-DP_PARTITION_NAME=${P_PERIOD_NAME} -DP_CLICKHOUSE_PASSWORD=${P_CLICKHOUSE_PASSWORD} -DP_CLICKHOUSE_USER=${P_CLICKHOUSE_USER} -DP_CLICKHOUSE_URL=${P_CLICKHOUSE_URL} -DP_CLICKHOUSE_USER=${P_CLICKHOUSE_USER} -DP_HDFS_HIVE_PATH=${P_HDFS_HIVE_PATH} -DP_KERBEROS_ENABLED=${P_KERBEROS_ENABLED} -DP_KERBEROS_PRINCIPAL=${P_KERBEROS_PRINCIPAL} -DP_KERBEROS_KEYTAB=${P_KERBEROS_KEYTAB} -DP_HDFS_DEFAULT_FS=${P_HDFS_DEFAULT_FS} -DP_HDFS_HA_NAME=${P_HDFS_HA_NAME} -DP_HDFS_HA_NN1=${P_HDFS_HA_NN1} -DP_HDFS_HA_NN2=${P_HDFS_HA_NN2}" \
/data/addax/job/ck2hive/bsn_am_zjjsdj_test.json
## Example after variable substitution
hive -e "alter table ods.ods_fms_zjjsdj add if not exists partition(part_period='202109');"
/data/addax/bin/addax.sh \
-p "-DP_PARTITION_NAME='202109' -DP_CLICKHOUSE_PASSWORD=testpwd -DP_CLICKHOUSE_USER=ck_dev -DP_CLICKHOUSE_URL='jdbc:clickhouse://test.ttcheng.wang:8123/test?socket_timeout=300000' -DP_HDFS_HIVE_PATH='/apps/hive' -DP_KERBEROS_ENABLED=true -DP_KERBEROS_PRINCIPAL='test@HADOOP.COM' -DP_KERBEROS_KEYTAB='/home/test/user.keytab' -DP_HDFS_DEFAULT_FS='hdfs://tstdevhd01:8020' -DP_HDFS_HA_NAME=UATCLUSTER -DP_HDFS_HA_NN1='tstdevhd01:8020' -DP_HDFS_HA_NN2='tstdevhd02:8020'" \
/data/addax/job/ck2hive/bsn_am_zjjsdj_test.json
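After the job completes, a quick sanity check on the loaded partition can be useful (a hypothetical check, reusing the 202109 example above):
# row count of the freshly loaded partition
hive -e "select count(*) from ods.ods_fms_zjjsdj where part_period='202109';"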
Full load
The funds account table (bsn.bsn_am_zjzh_view, a view that deduplicates the underlying table) is used as the example.
- 1. Edit and upload the job configuration file
{
  "job": {
    "setting": {
      "speed": {
        "channel": 2,
        "bytes": 2000000,
        "record": 100000
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": {
      "reader": {
        "name": "clickhousereader",
        "parameter": {
          "username": "${P_CLICKHOUSE_USER}",
          "password": "${P_CLICKHOUSE_PASSWORD}",
          "column": ["zjzh_zhnm", "zjzh_lbbh", "zjzh_hzyhbh", "zjzh_dwbh", "zjzh_zhxt", "zjzh_zhbh", "sync_date"],
          "where": "1=1",
          "connection": [
            {
              "table": ["bsn_am_zjzh_view"],
              "jdbcUrl": ["${P_CLICKHOUSE_URL}"]
            }
          ]
        }
      },
      "writer": {
        "name": "hdfswriter",
        "parameter": {
          "defaultFS": "${P_HDFS_DEFAULT_FS}",
          "fileType": "orc",
          "path": "${P_HDFS_HIVE_PATH}/warehouse/ods.db/ods_fms_zjzh",
          "fileName": "ods_fms_zjzh",
          "column": [
            {"name": "zjzh_zhnm", "type": "string"},
            {"name": "zjzh_lbbh", "type": "string"},
            {"name": "zjzh_hzyhbh", "type": "string"},
            {"name": "zjzh_dwbh", "type": "string"},
            {"name": "zjzh_zhxt", "type": "string"},
            {"name": "zjzh_zhbh", "type": "string"},
            {"name": "creation_date", "type": "string"}
          ],
          "writeMode": "overwrite",
          "fieldDelimiter": "\u0001",
          "compress": "NONE",
          "haveKerberos": "${P_KERBEROS_ENABLED}",
          "kerberosPrincipal": "${P_KERBEROS_PRINCIPAL}",
          "kerberosKeytabFilePath": "${P_KERBEROS_KEYTAB}",
          "hadoopConfig": {
            "dfs.nameservices": "${P_HDFS_HA_NAME}",
            "dfs.ha.namenodes.${P_HDFS_HA_NAME}": "nn1,nn2",
            "dfs.namenode.rpc-address.${P_HDFS_HA_NAME}.nn1": "${P_HDFS_HA_NN1}",
            "dfs.namenode.rpc-address.${P_HDFS_HA_NAME}.nn2": "${P_HDFS_HA_NN2}",
            "dfs.client.failover.proxy.provider.${P_HDFS_HA_NAME}": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
          }
        }
      }
    }
  }
}
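The full-load job likewise assumes ods.ods_fms_zjzh already exists. A minimal DDL sketch under the same assumptions as before, following the writer's column list (reader and writer columns map by position, so the writer's creation_date column receives the reader's sync_date values):
# hypothetical DDL sketch for the full-load target table
hive -e "
create table if not exists ods.ods_fms_zjzh (
  zjzh_zhnm string,
  zjzh_lbbh string,
  zjzh_hzyhbh string,
  zjzh_dwbh string,
  zjzh_zhxt string,
  zjzh_zhbh string,
  creation_date string
)
stored as orc;"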
- 2. Launch the job from the command line
## kettle
/data/addax/bin/addax.sh \
-p"-DP_CLICKHOUSE_PASSWORD=${P_CLICKHOUSE_PASSWORD} -DP_CLICKHOUSE_USER=${P_CLICKHOUSE_USER} -DP_CLICKHOUSE_URL=${P_CLICKHOUSE_URL} -DP_CLICKHOUSE_USER=${P_CLICKHOUSE_USER} -DP_HDFS_HIVE_PATH=${P_HDFS_HIVE_PATH} -DP_KERBEROS_ENABLED=${P_KERBEROS_ENABLED} -DP_KERBEROS_PRINCIPAL=${P_KERBEROS_PRINCIPAL} -DP_KERBEROS_KEYTAB=${P_KERBEROS_KEYTAB} -DP_HDFS_DEFAULT_FS=${P_HDFS_DEFAULT_FS} -DP_HDFS_HA_NAME=${P_HDFS_HA_NAME} -DP_HDFS_HA_NN1=${P_HDFS_HA_NN1} -DP_HDFS_HA_NN2=${P_HDFS_HA_NN2}" \
/data/addax/job/ck2hive/bsn_am_zjzh_test.json
# Load the data. This only needs to be run the first time; subsequent overwrite runs do not need it.
hive -e "load data inpath '${P_HDFS_HIVE_PATH}/warehouse/ods.db/ods_fms_zjzh' into table ods.ods_fms_zjzh;"
## Example after variable substitution
/data/addax/bin/addax.sh \
-p "-DP_CLICKHOUSE_PASSWORD=testpwd -DP_CLICKHOUSE_USER=ck_dev -DP_CLICKHOUSE_URL='jdbc:clickhouse://test.ttcheng.wang:8123/test?socket_timeout=300000' -DP_HDFS_HIVE_PATH='/apps/hive' -DP_KERBEROS_ENABLED=true -DP_KERBEROS_PRINCIPAL='test@HADOOP.COM' -DP_KERBEROS_KEYTAB='/home/test/user.keytab' -DP_HDFS_DEFAULT_FS='hdfs://tstdevhd01:8020' -DP_HDFS_HA_NAME=TSTCLUSTER -DP_HDFS_HA_NN1='tstdevhd01:8020' -DP_HDFS_HA_NN2='tstdevhd02:8020'" \
/data/addax/job/ck2hive/bsn_am_zjzh_test.json
# Load the data. This only needs to be run the first time; subsequent overwrite runs do not need it.
hive -e "load data inpath '/apps/hive/warehouse/ods.db/ods_fms_zjzh' into table ods.ods_fms_zjzh;"
License:
CC BY 4.0