Skip to content

Commit

Permalink
Merge pull request #7 from civitaspo/v0.1.1
Browse files Browse the repository at this point in the history
V0.1.1
  • Loading branch information
civitaspo committed Sep 9, 2015
2 parents 2b5247b + 915482f commit 81a8e0c
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 24 deletions.
34 changes: 34 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Read files on Hdfs.
- **input_path** File path on Hdfs. You can use glob and Date format like `%Y%m%d/%s`.
- **rewind_seconds** When you use a Date format in the input_path property, the format is evaluated using the current time minus this number of seconds.
- **partition** when this is true, partition input files and increase task count. (default: `true`)
- **num_partitions** number of partitions. (default: `Runtime.getRuntime().availableProcessors()`)

## Example

Expand All @@ -32,6 +33,7 @@ in:
input_path: /user/embulk/test/%Y-%m-%d/*
rewind_seconds: 86400
partition: true
num_partitions: 30
decoders:
- {type: gzip}
parser:
Expand All @@ -53,6 +55,8 @@ in:
```
## Note
- The parameter **num_partitions** is an approximate value. The actual number of partitions may be larger than this parameter.
- see: [The Partitioning Logic](#partition_logic)
- The partitioning feature supports only 3 line terminators:
- `\n`
- `\r`
Expand All @@ -61,6 +65,36 @@ in:
## The Reference Implementation
- [hito4t/embulk-input-filesplit](https://github.com/hito4t/embulk-input-filesplit)

## <a id="partition_logic">The Partitioning Logic</a>

```
int partitionSizeByOneTask = totalFileLength / approximateNumPartitions;

/*
...
*/

int numPartitions;
if (path.toString().endsWith(".gz") || path.toString().endsWith(".bz2") || path.toString().endsWith(".lzo")) {
// if the file is compressed, skip partitioning.
numPartitions = 1;
}
else if (!task.getPartition()) {
// if no partition mode, skip partitioning.
numPartitions = 1;
}
else {
// equalize the file size per task as much as possible.
numPartitions = ((fileLength - 1) / partitionSizeByOneTask) + 1;
}

/*
...
*/

```
## Build
```
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ configurations {
provided
}

version = "0.1.0"
version = "0.1.1"

sourceCompatibility = 1.7
targetCompatibility = 1.7
Expand Down
33 changes: 16 additions & 17 deletions src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,9 @@ public interface PluginTask extends Task
@ConfigDefault("true")
public boolean getPartition();

// this parameter is experimental.
@Config("partition_level")
@ConfigDefault("3")
public int getPartitonLevel();
@Config("num_partitions") // this parameter is the approximate value.
@ConfigDefault("-1") // Default: Runtime.getRuntime().availableProcessors()
public int getApproximateNumPartitions();

public List<HdfsPartialFile> getFiles();
public void setFiles(List<HdfsPartialFile> hdfsFiles);
Expand Down Expand Up @@ -235,30 +234,30 @@ public Path apply(@Nullable String input)
}

// TODO: optimum allocation of resources
int partitionCountParameter = task.getPartitonLevel();
int partitionSizeByOneTask = totalFileLength / (Runtime.getRuntime().availableProcessors() * partitionCountParameter);
int approximateNumPartitions =
(task.getApproximateNumPartitions() <= 0) ? Runtime.getRuntime().availableProcessors() : task.getApproximateNumPartitions();
int partitionSizeByOneTask = totalFileLength / approximateNumPartitions;

List<HdfsPartialFile> hdfsPartialFiles = new ArrayList<>();
for (Path path : pathList) {
int partitionCount;
int fileLength = (int) fs.getFileStatus(path).getLen(); // declare `fileLength` here because this is used below.
if (fileLength <= 0) {
logger.info("Skip the 0 byte target file: {}", path);
continue;
}

int numPartitions;
if (path.toString().endsWith(".gz") || path.toString().endsWith(".bz2") || path.toString().endsWith(".lzo")) {
partitionCount = 1;
numPartitions = 1;
}
else if (!task.getPartition()) {
partitionCount = 1;
numPartitions = 1;
}
else {
int fileLength = (int) fs.getFileStatus(path).getLen();
partitionCount = fileLength / partitionSizeByOneTask;
int remainder = fileLength % partitionSizeByOneTask;

if (remainder > 0) {
partitionCount++;
}
numPartitions = ((fileLength - 1) / partitionSizeByOneTask) + 1;
}

HdfsFilePartitioner partitioner = new HdfsFilePartitioner(fs, path, partitionCount);
HdfsFilePartitioner partitioner = new HdfsFilePartitioner(fs, path, numPartitions);
hdfsPartialFiles.addAll(partitioner.getHdfsPartialFiles());
}

Expand Down
12 changes: 6 additions & 6 deletions src/main/java/org/embulk/input/hdfs/HdfsFilePartitioner.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,22 @@ public class HdfsFilePartitioner
{
private FileSystem fs;
private Path path;
private int partitionCount;
private int numPartitions;

public HdfsFilePartitioner(FileSystem fs, Path path, int partitionCount)
public HdfsFilePartitioner(FileSystem fs, Path path, int numPartitions)
{
this.fs = fs;
this.path = path;
this.partitionCount = partitionCount;
this.numPartitions = numPartitions;
}

public List<HdfsPartialFile> getHdfsPartialFiles() throws IOException
{
List<HdfsPartialFile> hdfsPartialFiles = new ArrayList<>();
long size = fs.getFileStatus(path).getLen();
for (int i = 0; i < partitionCount; i++) {
long start = size * i / partitionCount;
long end = size * (i + 1) / partitionCount;
for (int i = 0; i < numPartitions; i++) {
long start = size * i / numPartitions;
long end = size * (i + 1) / numPartitions;
if (start < end) {
hdfsPartialFiles.add(new HdfsPartialFile(path.toString(), start, end));
}
Expand Down

0 comments on commit 81a8e0c

Please sign in to comment.