Skip to content

Commit

Permalink
[add][plugin][hdfs] Add hdfsSitePath option to specify hdfs-site.xml …
Browse files Browse the repository at this point in the history
…path
  • Loading branch information
wgzhao committed Feb 6, 2025
1 parent 2892565 commit 2afd64f
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 13 deletions.
2 changes: 2 additions & 0 deletions common/src/main/java/com/wgzhao/addax/common/base/Key.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ public class Key
public static final String DEFAULT_FS = "defaultFS";
// The file type will be read from or write to hadoop hdfs, such as ORC, Parquet, Text etc. string type.
public static final String FILE_TYPE = "fileType";
// The hdfs-site.xml path, use the file will auto get the hadoop configuration
public static final String HDFS_SIZE_PATH = "hdfsSitePath";
// How to present null, by default, NULL is stored as \N. string type
public static final String NULL_FORMAT = "nullFormat";
// The record count, mainly used for streamreader plugin. numeric type
Expand Down
41 changes: 28 additions & 13 deletions docs/reader/hdfsreader.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,21 @@ HDFS Reader 提供了读取分布式文件系统 Hadoop HDFS 数据存储的能

## 配置项说明

| 配置项 | 是否必须 | 数据类型 | 默认值 | 说明 |
| :--------------------- | :------: | ----------- | ------- | ---------------------------------------------------------------------------------------- |
| path || string || 要读取的文件路径 |
| defaultFS || string || HDFS `NAMENODE` 节点地址,如果配置了 HA 模式,则为 `defaultFS` 的值 |
| fileType || string || 文件的类型 |
| column || `list<map>` || 读取字段列表 |
| fieldDelimiter || char | `,` | 指定文本文件的字段分隔符,二进制文件不需要指定该项 |
| encoding || string | `utf-8` | 读取文件的编码配置, 目前仅支持 `utf-8` |
| 配置项 | 是否必须 | 数据类型 | 默认值 | 说明 |
|:-----------------------| :------: |-------------| ------- |-----------------------------------------------------------|
| path || string || 要读取的文件路径 |
| defaultFS || string || HDFS `NAMENODE` 节点地址,如果配置了 HA 模式,则为 `defaultFS` 的值 |
| fileType || string || 文件的类型 |
| column || `list<map>` || 读取字段列表 |
| fieldDelimiter || char | `,` | 指定文本文件的字段分隔符,二进制文件不需要指定该项 |
| encoding || string | `utf-8` | 读取文件的编码配置, 目前仅支持 `utf-8` |
| nullFormat || string || 可以表示为空的字符,如果用户配置: `"\\N"` ,那么如果源头数据是 `"\N"` ,视作 `null` 字段 |
| haveKerberos || boolean || 是否启用 Kerberos 认证,如果启用,则需要同时配置下面两项 |
| kerberosKeytabFilePath || string || Kerberos 认证的凭证文件路径, 比如 `/your/path/addax.service.keytab` |
| kerberosPrincipal || string || Kerberos 认证的凭证主体, 比如 `addax/node1@WGZHAO.COM` |
| compress || string || 指定要读取的文件的压缩格式 |
| hadoopConfig || map || 里可以配置与 Hadoop 相关的一些高级参数,比如 HA 的配置 |
| haveKerberos || boolean || 是否启用 Kerberos 认证,如果启用,则需要同时配置下面两项 |
| kerberosKeytabFilePath || string || Kerberos 认证的凭证文件路径, 比如 `/your/path/addax.service.keytab` |
| kerberosPrincipal || string || Kerberos 认证的凭证主体, 比如 `addax/node1@WGZHAO.COM` |
| compress || string || 指定要读取的文件的压缩格式 |
| hadoopConfig || map || 里可以配置与 Hadoop 相关的一些高级参数,比如 HA 的配置 |
| hdfsSitePath || string || `hdfs-site.xml` 的路径,详细解释见下 |

### path

Expand Down Expand Up @@ -169,6 +170,20 @@ boolean skipEmptyRecords = true;//是否跳过空行
boolean captureRawRecord = true;
```

### hdfsSitePath

这是 `4.2.4` 引入的新配置想,用于指定 `hdfs-site.xml` 文件的路径,比如对 HDP/CDH 而言,可以这样配置:

```json
{
"hdfsSitePath": "/etc/hadoop/conf/hdfs-site.xml"
}
```

如果配置了 `hdfsSitePath` , 则插件会从该文件中获得访问 HDFS 文件系统必要的配置,从而在大部分情况下不在需要配置 `hadoopConfig`,减少配置量。

对于把 Addax 部署在 Hadoop 集群上的场景,推荐使用这种方式。

## 类型转换

| Addax 内部类型 | Hive 表 数据类型 |
Expand Down
15 changes: 15 additions & 0 deletions docs/writer/hdfswriter.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ HDFS Writer 提供向 HDFS 文件系统指定路径中写入 `TextFile` , `ORC
| preShell || `list` || 写入数据前执行的shell命令,比如 `hive -e "truncate table test.hello"` |
| postShell || `list` || 写入数据后执行的shell命令,比如 `hive -e "select count(1) from test.hello"` |
| ignoreError || boolean | false | 是否忽略`preShell`, `postShell` 命令的错误 |
| hdfsSitePath || string || `hdfs-site.xml` 的路径,详细解释见下 |

### path

Expand Down Expand Up @@ -136,6 +137,20 @@ Hadoop hdfs 文件系统 namenode 节点地址。格式:`hdfs://ip:port` ;
该配置项用于控制是否忽略 `preShell``postShell` 命令的错误,如果配置为 `true`,则在执行 `preShell``postShell` 命令时,如果命令执行失败,不会导致任务失败,而是会打印错误日志,继续执行任务。
否则,如果配置为 `false`,则在执行 `preShell``postShell` 命令时,如果命令执行失败,会导致任务失败。

### hdfsSitePath

这是 `4.2.4` 引入的新配置想,用于指定 `hdfs-site.xml` 文件的路径,比如对 HDP/CDH 而言,可以这样配置:

```json
{
"hdfsSitePath": "/etc/hadoop/conf/hdfs-site.xml"
}
```

如果配置了 `hdfsSitePath` , 则插件会从该文件中获得访问 HDFS 文件系统必要的配置,从而在大部分情况下不在需要配置 `hadoopConfig`,减少配置量。

对于把 Addax 部署在 Hadoop 集群上的场景,推荐使用这种方式。

## 类型转换

| Addax 内部类型 | HIVE 数据类型 |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.Set;

import static com.wgzhao.addax.common.base.Key.COLUMN;
import static com.wgzhao.addax.common.base.Key.HDFS_SIZE_PATH;
import static com.wgzhao.addax.common.base.Key.NULL_FORMAT;
import static com.wgzhao.addax.common.spi.ErrorCode.CONFIG_ERROR;
import static com.wgzhao.addax.common.spi.ErrorCode.EXECUTE_FAIL;
Expand Down Expand Up @@ -93,6 +94,11 @@ public DFSUtil(Configuration taskConfig)
hadoopConf.set(each, hadoopSiteParamsAsJsonObject.getString(each));
}
}

if (taskConfig.getString(HDFS_SIZE_PATH, null) !=null) {
hadoopConf.addResource(new Path(taskConfig.getString(HDFS_SIZE_PATH)));
}

hadoopConf.set(HdfsConstant.HDFS_DEFAULT_KEY, taskConfig.getString(Key.DEFAULT_FS));

//是否有Kerberos认证
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.Set;

import static com.wgzhao.addax.common.base.Key.HAVE_KERBEROS;
import static com.wgzhao.addax.common.base.Key.HDFS_SIZE_PATH;
import static com.wgzhao.addax.common.base.Key.KERBEROS_KEYTAB_FILE_PATH;
import static com.wgzhao.addax.common.base.Key.KERBEROS_PRINCIPAL;
import static com.wgzhao.addax.common.spi.ErrorCode.CONFIG_ERROR;
Expand Down Expand Up @@ -74,6 +75,11 @@ protected void getFileSystem(Configuration taskConfig)
hadoopConf.set(each, hadoopSiteParamsAsJsonObject.getString(each));
}
}

if (taskConfig.getString(HDFS_SIZE_PATH, null) !=null) {
hadoopConf.addResource(new Path(taskConfig.getString(HDFS_SIZE_PATH)));
}

hadoopConf.set("fs.defaultFS", defaultFS);

//是否有Kerberos认证
Expand Down

0 comments on commit 2afd64f

Please sign in to comment.