Skip to content

Commit

Permalink
reduce NN RPC
Browse files Browse the repository at this point in the history
  • Loading branch information
cxzl25 committed Jan 22, 2024
1 parent 803e273 commit 14a335a
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 5 deletions.
5 changes: 4 additions & 1 deletion java/core/src/java/org/apache/orc/OrcConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,10 @@ public enum OrcConf {
ROW_BATCH_CHILD_LIMIT("orc.row.child.limit", "orc.row.child.limit",
1024 * 32, "The maximum number of child elements to buffer before "+
"the ORC row writer writes the batch to the file."
)
),
FILE_LENGTH_FAST("orc.file.length.fast", "orc.file.length.fast",
false, "A boolean flag to enable reduce file length RPC. "
)
;

private final String attribute;
Expand Down
17 changes: 13 additions & 4 deletions java/core/src/java/org/apache/orc/impl/ReaderImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.orc.TypeDescription;
import org.apache.orc.impl.reader.ReaderEncryption;
import org.apache.orc.impl.reader.ReaderEncryptionVariant;
import org.apache.orc.util.OrcInputStreamUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -784,12 +785,20 @@ protected OrcTail extractFileTail(FileSystem fs, Path path,
// figure out the size of the file using the option or filesystem
long size;
if (maxFileLength == Long.MAX_VALUE) {
FileStatus fileStatus = fs.getFileStatus(path);
size = fileStatus.getLen();
modificationTime = fileStatus.getModificationTime();
boolean fileLengthFast = conf.getBoolean(OrcConf.FILE_LENGTH_FAST.getAttribute(),
(boolean)OrcConf.FILE_LENGTH_FAST.getDefaultValue());
long fileLength = fileLengthFast ? OrcInputStreamUtil.getFileLength(file) : -1L;
if (fileLength > 0) {
size = fileLength;
modificationTime = -1L;
} else {
FileStatus fileStatus = fs.getFileStatus(path);
size = fileStatus.getLen();
modificationTime = fileStatus.getModificationTime();
}
} else {
size = maxFileLength;
modificationTime = -1;
modificationTime = -1L;
}
if (size == 0) {
// Hive often creates empty files (including ORC) and has an
Expand Down
79 changes: 79 additions & 0 deletions java/core/src/java/org/apache/orc/util/OrcInputStreamUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.orc.util;

import org.apache.hadoop.fs.FSDataInputStream;

import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public class OrcInputStreamUtil {

private static final String DFS_CLASS = "org.apache.hadoop.hdfs.DFSInputStream";
private static final String DFS_STRIPED_CLASS = "org.apache.hadoop.hdfs.DFSStripedInputStream";

private static Method shortCircuitForbiddenMethod;
private static Method getFileLengthMethod;

static {
init();
}

private static void init() {
try {
initInt();
} catch (ClassNotFoundException | NoSuchMethodException ignored) {
}
}

private static void initInt() throws ClassNotFoundException, NoSuchMethodException {
Class<?> dfsClass = Class.forName(DFS_CLASS);
shortCircuitForbiddenMethod = dfsClass.getDeclaredMethod("shortCircuitForbidden");
shortCircuitForbiddenMethod.setAccessible(true);
getFileLengthMethod = dfsClass.getMethod("getFileLength");
}

public static long getFileLength(FSDataInputStream file) {
try {
return getFileLengthInt(file);
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
return -1L;
}
}

private static long getFileLengthInt(FSDataInputStream file)
throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
if (shortCircuitForbiddenMethod == null || getFileLengthMethod == null) {
return -1L;
}
InputStream wrappedStream = file.getWrappedStream();
Class<? extends InputStream> wrappedStreamClass = wrappedStream.getClass();
String className = wrappedStreamClass.getName();
if (!className.equals(DFS_CLASS) && !className.equals(DFS_STRIPED_CLASS)) {
return -1L;
}
boolean isUnderConstruction = (boolean) shortCircuitForbiddenMethod.invoke(wrappedStream);
// If file are under construction, we need to get the file length from NameNode.
if (!isUnderConstruction) {
return (long) getFileLengthMethod.invoke(wrappedStream);
}
return -1L;
}
}

0 comments on commit 14a335a

Please sign in to comment.