From 14a335ac4760495a15b7293632cc8c8dff5cad1b Mon Sep 17 00:00:00 2001 From: sychen Date: Mon, 22 Jan 2024 19:14:09 +0800 Subject: [PATCH] reduce NN RPC --- .../core/src/java/org/apache/orc/OrcConf.java | 5 +- .../java/org/apache/orc/impl/ReaderImpl.java | 17 +++- .../apache/orc/util/OrcInputStreamUtil.java | 79 +++++++++++++++++++ 3 files changed, 96 insertions(+), 5 deletions(-) create mode 100644 java/core/src/java/org/apache/orc/util/OrcInputStreamUtil.java diff --git a/java/core/src/java/org/apache/orc/OrcConf.java b/java/core/src/java/org/apache/orc/OrcConf.java index fa7bc9bf3f..02eb6ad711 100644 --- a/java/core/src/java/org/apache/orc/OrcConf.java +++ b/java/core/src/java/org/apache/orc/OrcConf.java @@ -251,7 +251,10 @@ public enum OrcConf { ROW_BATCH_CHILD_LIMIT("orc.row.child.limit", "orc.row.child.limit", 1024 * 32, "The maximum number of child elements to buffer before "+ "the ORC row writer writes the batch to the file." - ) + ), + FILE_LENGTH_FAST("orc.file.length.fast", "orc.file.length.fast", + false, "A boolean flag to enable reduce file length RPC. 
" + ) ; private final String attribute; diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java index 3afbff5fc3..9b581ba39c 100644 --- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java +++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java @@ -48,6 +48,7 @@ import org.apache.orc.TypeDescription; import org.apache.orc.impl.reader.ReaderEncryption; import org.apache.orc.impl.reader.ReaderEncryptionVariant; +import org.apache.orc.util.OrcInputStreamUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -784,12 +785,20 @@ protected OrcTail extractFileTail(FileSystem fs, Path path, // figure out the size of the file using the option or filesystem long size; if (maxFileLength == Long.MAX_VALUE) { - FileStatus fileStatus = fs.getFileStatus(path); - size = fileStatus.getLen(); - modificationTime = fileStatus.getModificationTime(); + boolean fileLengthFast = conf.getBoolean(OrcConf.FILE_LENGTH_FAST.getAttribute(), + (boolean)OrcConf.FILE_LENGTH_FAST.getDefaultValue()); + long fileLength = fileLengthFast ? OrcInputStreamUtil.getFileLength(file) : -1L; + if (fileLength > 0) { + size = fileLength; + modificationTime = -1L; + } else { + FileStatus fileStatus = fs.getFileStatus(path); + size = fileStatus.getLen(); + modificationTime = fileStatus.getModificationTime(); + } } else { size = maxFileLength; - modificationTime = -1; + modificationTime = -1L; } if (size == 0) { // Hive often creates empty files (including ORC) and has an diff --git a/java/core/src/java/org/apache/orc/util/OrcInputStreamUtil.java b/java/core/src/java/org/apache/orc/util/OrcInputStreamUtil.java new file mode 100644 index 0000000000..ac9ef31185 --- /dev/null +++ b/java/core/src/java/org/apache/orc/util/OrcInputStreamUtil.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.orc.util;

import org.apache.hadoop.fs.FSDataInputStream;

import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

/**
 * Reflection-based helper that reads the file length directly from an
 * already-open HDFS input stream ({@code DFSInputStream} or
 * {@code DFSStripedInputStream}), avoiding an extra
 * {@code getFileStatus} RPC to the NameNode.
 *
 * <p>All HDFS access goes through reflection so this class loads and
 * degrades gracefully (returns -1) when the HDFS client classes are not
 * on the classpath.</p>
 */
public final class OrcInputStreamUtil {

  private static final String DFS_CLASS = "org.apache.hadoop.hdfs.DFSInputStream";
  private static final String DFS_STRIPED_CLASS =
      "org.apache.hadoop.hdfs.DFSStripedInputStream";

  /** {@code DFSInputStream#shortCircuitForbidden()}; null if HDFS is unavailable. */
  private static Method shortCircuitForbiddenMethod;
  /** {@code DFSInputStream#getFileLength()}; null if HDFS is unavailable. */
  private static Method getFileLengthMethod;

  static {
    init();
  }

  /** Utility class; not instantiable. */
  private OrcInputStreamUtil() {
  }

  private static void init() {
    try {
      initInt();
    } catch (ClassNotFoundException | NoSuchMethodException ignored) {
      // HDFS client not on the classpath (or an incompatible version):
      // leave both Method handles null so getFileLength() reports -1 and
      // callers fall back to FileSystem#getFileStatus().
    }
  }

  private static void initInt() throws ClassNotFoundException, NoSuchMethodException {
    Class<?> dfsClass = Class.forName(DFS_CLASS);
    // shortCircuitForbidden() is package-private, hence setAccessible(true).
    shortCircuitForbiddenMethod = dfsClass.getDeclaredMethod("shortCircuitForbidden");
    shortCircuitForbiddenMethod.setAccessible(true);
    getFileLengthMethod = dfsClass.getMethod("getFileLength");
  }

  /**
   * Returns the length of the file backing {@code file} as known by the
   * already-open HDFS stream, without contacting the NameNode.
   *
   * @param file an open stream for the file being read
   * @return the file length in bytes, or {@code -1} if it cannot be
   *         determined locally (non-HDFS stream, file still under
   *         construction, or reflection failure) — callers should then
   *         fall back to {@code FileSystem#getFileStatus}
   */
  public static long getFileLength(FSDataInputStream file) {
    try {
      return getFileLengthInt(file);
    } catch (ReflectiveOperationException e) {
      return -1L;
    }
  }

  private static long getFileLengthInt(FSDataInputStream file)
      throws InvocationTargetException, IllegalAccessException {
    if (shortCircuitForbiddenMethod == null || getFileLengthMethod == null) {
      return -1L;
    }
    InputStream wrappedStream = file.getWrappedStream();
    String className = wrappedStream.getClass().getName();
    // Only the two known HDFS stream implementations expose getFileLength().
    if (!className.equals(DFS_CLASS) && !className.equals(DFS_STRIPED_CLASS)) {
      return -1L;
    }
    boolean isUnderConstruction = (boolean) shortCircuitForbiddenMethod.invoke(wrappedStream);
    // NOTE(review): shortCircuitForbidden() is used as a proxy for "file is
    // under construction". If the last block is still being written the
    // stream's cached length may be stale, so return -1 and let the caller
    // get the authoritative length from the NameNode.
    if (!isUnderConstruction) {
      return (long) getFileLengthMethod.invoke(wrappedStream);
    }
    return -1L;
  }
}