From d4e88302e367b7f5a3b3da9d2e0f734320cef413 Mon Sep 17 00:00:00 2001 From: macroguo Date: Fri, 19 Jan 2024 16:51:58 +0800 Subject: [PATCH] [CALCITE-2040] Create adapter for Apache Arrow Co-authored-by: Alessandro Solimando Co-authored-by: Jonathan Swenson Co-authored-by: Julian Hyde Co-authored-by: Karshit Shah Co-authored-by: Michael Mior --- .github/workflows/main.yml | 17 +- Jenkinsfile | 4 +- arrow/build.gradle.kts | 32 + arrow/gradle.properties | 18 + .../arrow/AbstractArrowEnumerator.java | 82 ++ .../adapter/arrow/ArrowEnumerable.java | 61 ++ .../calcite/adapter/arrow/ArrowFieldType.java | 102 +++ .../calcite/adapter/arrow/ArrowFilter.java | 62 ++ .../adapter/arrow/ArrowFilterEnumerator.java | 99 +++ .../calcite/adapter/arrow/ArrowMethod.java | 55 ++ .../calcite/adapter/arrow/ArrowProject.java | 85 +++ .../adapter/arrow/ArrowProjectEnumerator.java | 76 ++ .../calcite/adapter/arrow/ArrowRel.java | 69 ++ .../calcite/adapter/arrow/ArrowRules.java | 173 +++++ .../calcite/adapter/arrow/ArrowSchema.java | 119 +++ .../adapter/arrow/ArrowSchemaFactory.java | 53 ++ .../calcite/adapter/arrow/ArrowTable.java | 194 +++++ .../calcite/adapter/arrow/ArrowTableScan.java | 84 +++ .../arrow/ArrowToEnumerableConverter.java | 89 +++ .../adapter/arrow/ArrowTranslator.java | 202 +++++ .../calcite/adapter/arrow/package-info.java | 21 + .../adapter/arrow/ArrowAdapterTest.java | 712 ++++++++++++++++++ .../calcite/adapter/arrow/ArrowData.java | 222 ++++++ .../calcite/adapter/arrow/ArrowExtension.java | 71 ++ arrow/src/test/resources/arrow-model.json | 30 + bom/build.gradle.kts | 4 + build.gradle.kts | 2 +- .../calcite/config/CalciteSystemProperty.java | 6 + .../apache/calcite/util/BuiltInMethod.java | 2 + .../apache/calcite/util/ImmutableIntList.java | 20 +- gradle.properties | 2 + settings.gradle.kts | 1 + site/_docs/adapter.md | 1 + site/_docs/arrow_adapter.md | 83 ++ 34 files changed, 2848 insertions(+), 5 deletions(-) create mode 100644 arrow/build.gradle.kts create mode 100644 arrow/gradle.properties create mode 100644 arrow/src/main/java/org/apache/calcite/adapter/arrow/AbstractArrowEnumerator.java create mode 100644 arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerable.java create mode 100644 arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFieldType.java create mode 100644 arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilter.java create mode 100644 arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilterEnumerator.java create mode 100644 arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowMethod.java create mode 100644 arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProject.java create mode 100644 arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProjectEnumerator.java create mode 100644 arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowRel.java create mode 100644 arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowRules.java create mode 100644 arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchema.java create mode 100644 arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchemaFactory.java create mode 100644 arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTable.java create mode 100644 arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTableScan.java create mode 100644 arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowToEnumerableConverter.java create mode 100644 arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTranslator.java create mode 100644 arrow/src/main/java/org/apache/calcite/adapter/arrow/package-info.java create mode 100644 arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterTest.java create mode 100644 arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowData.java create mode 100644 arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowExtension.java create mode 100644 arrow/src/test/resources/arrow-model.json create mode 100644 site/_docs/arrow_adapter.md diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 4c8a54841cd1..059a2b33c889 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -69,7 +69,9 @@ jobs: with: job-id: jdk${{ matrix.jdk }} remote-build-cache-proxy-enabled: false - arguments: --scan --no-parallel --no-daemon build javadoc + # Arrow build is excluded because it is not supported on Windows + # See https://arrow.apache.org/docs/java/install.html#system-compatibility + arguments: --scan --no-parallel --no-daemon build javadoc --exclude-task :arrow:build - name: 'sqlline and sqllsh' shell: cmd run: | @@ -103,7 +105,9 @@ jobs: with: job-id: jdk${{ matrix.jdk }} remote-build-cache-proxy-enabled: false - arguments: --scan --no-parallel --no-daemon build + # Arrow build is excluded because it is not supported on Windows + # See https://arrow.apache.org/docs/java/install.html#system-compatibility + arguments: --scan --no-parallel --no-daemon build --exclude-task :arrow:build - name: 'sqlline and sqllsh' shell: cmd run: | @@ -215,6 +219,9 @@ jobs: S3_BUILD_CACHE_ACCESS_KEY_ID: ${{ secrets.S3_BUILD_CACHE_ACCESS_KEY_ID }} S3_BUILD_CACHE_SECRET_KEY: ${{ secrets.S3_BUILD_CACHE_SECRET_KEY }} GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }} + # The following option `--add-opens=java.base/java.nio=ALL-UNNAMED` is required jdk17+ + # to avoid error. See https://arrow.apache.org/docs/java/install.html#java-compatibility + _JAVA_OPTIONS: ${{ env._JAVA_OPTIONS }} --add-opens=java.base/java.nio=ALL-UNNAMED with: job-id: jdk${{ matrix.jdk }} remote-build-cache-proxy-enabled: false @@ -241,6 +248,9 @@ jobs: S3_BUILD_CACHE_ACCESS_KEY_ID: ${{ secrets.S3_BUILD_CACHE_ACCESS_KEY_ID }} S3_BUILD_CACHE_SECRET_KEY: ${{ secrets.S3_BUILD_CACHE_SECRET_KEY }} GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }} + # The following option `--add-opens=java.base/java.nio=ALL-UNNAMED` is required jdk17+ + # to avoid error. See https://arrow.apache.org/docs/java/install.html#java-compatibility + _JAVA_OPTIONS: ${{ env._JAVA_OPTIONS }} --add-opens=java.base/java.nio=ALL-UNNAMED with: job-id: jdk${{ matrix.jdk }} remote-build-cache-proxy-enabled: false @@ -310,6 +320,9 @@ jobs: S3_BUILD_CACHE_ACCESS_KEY_ID: ${{ secrets.S3_BUILD_CACHE_ACCESS_KEY_ID }} S3_BUILD_CACHE_SECRET_KEY: ${{ secrets.S3_BUILD_CACHE_SECRET_KEY }} GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }} + # The following option `--add-opens=java.base/java.nio=ALL-UNNAMED` is required jdk17+ + # to avoid error. See https://arrow.apache.org/docs/java/install.html#java-compatibility + _JAVA_OPTIONS: ${{ env._JAVA_OPTIONS }} --add-opens=java.base/java.nio=ALL-UNNAMED with: job-id: jdk19 remote-build-cache-proxy-enabled: false diff --git a/Jenkinsfile b/Jenkinsfile index 53cc1e6f31fc..bd4216c05e42 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -41,7 +41,9 @@ node('ubuntu') { } stage('Code Quality') { timeout(time: 1, unit: 'HOURS') { - withEnv(["Path+JDK=$JAVA_JDK_17/bin","JAVA_HOME=$JAVA_JDK_17"]) { + // The following option `--add-opens=java.base/java.nio=ALL-UNNAMED` is required jdk17+ + // to avoid error. See https://arrow.apache.org/docs/java/install.html#java-compatibility + withEnv(["Path+JDK=$JAVA_JDK_17/bin","JAVA_HOME=$JAVA_JDK_17","_JAVA_OPTIONS=--add-opens=java.base/java.nio=ALL-UNNAMED"]) { withCredentials([string(credentialsId: 'SONARCLOUD_TOKEN', variable: 'SONAR_TOKEN')]) { if ( env.BRANCH_NAME.startsWith("PR-") ) { sh './gradlew --no-parallel --no-daemon jacocoAggregateTestReport sonar -PenableJacoco -Dsonar.pullrequest.branch=${CHANGE_BRANCH} -Dsonar.pullrequest.base=${CHANGE_TARGET} -Dsonar.pullrequest.key=${CHANGE_ID} -Dsonar.login=${SONAR_TOKEN}' diff --git a/arrow/build.gradle.kts b/arrow/build.gradle.kts new file mode 100644 index 000000000000..0ed73e8ddf31 --- /dev/null +++ b/arrow/build.gradle.kts @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +dependencies { + api(project(":core")) + + implementation("com.google.guava:guava") + implementation("org.apache.arrow:arrow-memory-netty") + implementation("org.apache.arrow:arrow-vector") + implementation("org.apache.arrow.gandiva:arrow-gandiva") + annotationProcessor("org.immutables:value") + compileOnly("org.immutables:value-annotations") + + testImplementation("org.apache.arrow:arrow-jdbc") + testImplementation("net.hydromatic:scott-data-hsqldb") + testImplementation("org.apache.commons:commons-lang3") + testImplementation(project(":core")) + testImplementation(project(":testkit")) +} diff --git a/arrow/gradle.properties b/arrow/gradle.properties new file mode 100644 index 000000000000..4d2cfdd304a6 --- /dev/null +++ b/arrow/gradle.properties @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to you under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +description=Arrow adapter for Calcite +artifact.name=Calcite Arrow diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/AbstractArrowEnumerator.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/AbstractArrowEnumerator.java new file mode 100644 index 000000000000..6351dca36ffc --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/AbstractArrowEnumerator.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.util.ImmutableIntList; +import org.apache.calcite.util.Util; + +import org.apache.arrow.vector.ValueVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.VectorUnloader; +import org.apache.arrow.vector.ipc.ArrowFileReader; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Enumerator that reads from a collection of Arrow value-vectors. + */ +abstract class AbstractArrowEnumerator implements Enumerator { + protected final ArrowFileReader arrowFileReader; + protected final List fields; + protected final List valueVectors; + protected int currRowIndex; + protected int rowCount; + + AbstractArrowEnumerator(ArrowFileReader arrowFileReader, ImmutableIntList fields) { + this.arrowFileReader = arrowFileReader; + this.fields = fields; + this.valueVectors = new ArrayList<>(fields.size()); + this.currRowIndex = -1; + } + + abstract void evaluateOperator(ArrowRecordBatch arrowRecordBatch); + + protected void loadNextArrowBatch() { + try { + final VectorSchemaRoot vsr = arrowFileReader.getVectorSchemaRoot(); + for (int i : fields) { + this.valueVectors.add(vsr.getVector(i)); + } + this.rowCount = vsr.getRowCount(); + VectorUnloader vectorUnloader = new VectorUnloader(vsr); + ArrowRecordBatch arrowRecordBatch = vectorUnloader.getRecordBatch(); + evaluateOperator(arrowRecordBatch); + } catch (IOException e) { + throw Util.toUnchecked(e); + } + } + + @Override public Object current() { + if (fields.size() == 1) { + return this.valueVectors.get(0).getObject(currRowIndex); + } + Object[] current = new Object[valueVectors.size()]; + for (int i = 0; i < valueVectors.size(); i++) { + ValueVector vector = this.valueVectors.get(i); + current[i] = vector.getObject(currRowIndex); + } + return current; + } + + @Override public void reset() { + throw new UnsupportedOperationException(); + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerable.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerable.java new file mode 100644 index 000000000000..142f18c2f6a0 --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerable.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.linq4j.AbstractEnumerable; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.util.ImmutableIntList; +import org.apache.calcite.util.Util; + +import org.apache.arrow.gandiva.evaluator.Filter; +import org.apache.arrow.gandiva.evaluator.Projector; +import org.apache.arrow.vector.ipc.ArrowFileReader; + +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * Enumerable that reads from Arrow value-vectors. + */ +class ArrowEnumerable extends AbstractEnumerable { + private final ArrowFileReader arrowFileReader; + private final ImmutableIntList fields; + private final @Nullable Projector projector; + private final @Nullable Filter filter; + + + ArrowEnumerable(ArrowFileReader arrowFileReader, ImmutableIntList fields, + @Nullable Projector projector, @Nullable Filter filter) { + this.arrowFileReader = arrowFileReader; + this.projector = projector; + this.filter = filter; + this.fields = fields; + } + + @Override public Enumerator enumerator() { + try { + if (projector != null) { + return new ArrowProjectEnumerator(arrowFileReader, fields, projector); + } else if (filter != null) { + return new ArrowFilterEnumerator(arrowFileReader, fields, filter); + } + throw new IllegalArgumentException( + "The arrow enumerator must have either a filter or a projection"); + } catch (Exception e) { + throw Util.toUnchecked(e); + } + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFieldType.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFieldType.java new file mode 100644 index 000000000000..4cb6c388ef10 --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFieldType.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.linq4j.tree.Primitive; +import org.apache.calcite.rel.type.RelDataType; + +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.pojo.ArrowType; + +import java.math.BigDecimal; +import java.util.Date; +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * Arrow field type. + */ +enum ArrowFieldType { + INT(Primitive.INT), + BOOLEAN(Primitive.BOOLEAN), + STRING(String.class), + FLOAT(Primitive.FLOAT), + DOUBLE(Primitive.DOUBLE), + DATE(Date.class), + LIST(List.class), + DECIMAL(BigDecimal.class), + LONG(Primitive.LONG), + BYTE(Primitive.BYTE), + SHORT(Primitive.SHORT); + + private final Class clazz; + + ArrowFieldType(Primitive primitive) { + this(requireNonNull(primitive.boxClass, "boxClass")); + } + + ArrowFieldType(Class clazz) { + this.clazz = clazz; + } + + public RelDataType toType(JavaTypeFactory typeFactory) { + RelDataType javaType = typeFactory.createJavaType(clazz); + RelDataType sqlType = typeFactory.createSqlType(javaType.getSqlTypeName()); + return typeFactory.createTypeWithNullability(sqlType, true); + } + + public static ArrowFieldType of(ArrowType arrowType) { + switch (arrowType.getTypeID()) { + case Int: + int bitWidth = ((ArrowType.Int) arrowType).getBitWidth(); + switch (bitWidth) { + case 64: + return LONG; + case 32: + return INT; + case 16: + return SHORT; + case 8: + return BYTE; + default: + throw new IllegalArgumentException("Unsupported Int bit width: " + bitWidth); + } + case Bool: + return BOOLEAN; + case Utf8: + return STRING; + case FloatingPoint: + FloatingPointPrecision precision = ((ArrowType.FloatingPoint) arrowType).getPrecision(); + switch (precision) { + case SINGLE: + return FLOAT; + case DOUBLE: + return DOUBLE; + default: + throw new IllegalArgumentException("Unsupported Floating point precision: " + precision); + } + case Date: + return DATE; + case Decimal: + return DECIMAL; + default: + throw new IllegalArgumentException("Unsupported type: " + arrowType); + } + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilter.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilter.java new file mode 100644 index 000000000000..9774318ea925 --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilter.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rex.RexNode; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * Implementation of a {@link org.apache.calcite.rel.core.Filter} + * relational expression in Arrow. + */ +class ArrowFilter extends Filter implements ArrowRel { + private final List match; + + ArrowFilter(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, RexNode condition) { + super(cluster, traitSet, input, condition); + final ArrowTranslator translator = + ArrowTranslator.create(cluster.getRexBuilder(), input.getRowType()); + this.match = translator.translateMatch(condition); + + assert getConvention() == ArrowRel.CONVENTION; + assert getConvention() == input.getConvention(); + } + + @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + final RelOptCost cost = super.computeSelfCost(planner, mq); + return requireNonNull(cost, "cost").multiplyBy(0.1); + } + + @Override public ArrowFilter copy(RelTraitSet traitSet, RelNode input, RexNode condition) { + return new ArrowFilter(getCluster(), traitSet, input, condition); + } + + @Override public void implement(Implementor implementor) { + implementor.visitInput(0, getInput()); + implementor.addFilters(match); + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilterEnumerator.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilterEnumerator.java new file mode 100644 index 000000000000..c52a54290535 --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilterEnumerator.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.util.ImmutableIntList; +import org.apache.calcite.util.Util; + +import org.apache.arrow.gandiva.evaluator.Filter; +import org.apache.arrow.gandiva.evaluator.SelectionVector; +import org.apache.arrow.gandiva.evaluator.SelectionVectorInt16; +import org.apache.arrow.gandiva.exceptions.GandivaException; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.ipc.ArrowFileReader; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; + +import java.io.IOException; + +/** + * Enumerator that reads from a filtered collection of Arrow value-vectors. + */ +class ArrowFilterEnumerator extends AbstractArrowEnumerator { + private final BufferAllocator allocator; + private final Filter filter; + private ArrowBuf buf; + private SelectionVector selectionVector; + private int selectionVectorIndex; + + ArrowFilterEnumerator(ArrowFileReader arrowFileReader, ImmutableIntList fields, Filter filter) { + super(arrowFileReader, fields); + this.allocator = new RootAllocator(Long.MAX_VALUE); + this.filter = filter; + } + + @Override void evaluateOperator(ArrowRecordBatch arrowRecordBatch) { + try { + this.buf = this.allocator.buffer((long) rowCount * 2); + this.selectionVector = new SelectionVectorInt16(buf); + filter.evaluate(arrowRecordBatch, selectionVector); + } catch (GandivaException e) { + throw Util.toUnchecked(e); + } + } + + @Override public boolean moveNext() { + if (selectionVector == null + || selectionVectorIndex >= selectionVector.getRecordCount()) { + boolean hasNextBatch; + while (true) { + try { + hasNextBatch = arrowFileReader.loadNextBatch(); + } catch (IOException e) { + throw Util.toUnchecked(e); + } + if (hasNextBatch) { + selectionVectorIndex = 0; + this.valueVectors.clear(); + loadNextArrowBatch(); + assert selectionVector != null; + if (selectionVectorIndex >= selectionVector.getRecordCount()) { + // the "filtered" batch is empty, but there may be more batches to fetch + continue; + } + currRowIndex = selectionVector.getIndex(selectionVectorIndex++); + } + return hasNextBatch; + } + } else { + currRowIndex = selectionVector.getIndex(selectionVectorIndex++); + return true; + } + } + + @Override public void close() { + try { + if (buf != null) { + buf.close(); + } + filter.close(); + } catch (GandivaException e) { + throw Util.toUnchecked(e); + } + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowMethod.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowMethod.java new file mode 100644 index 000000000000..66b078df4a51 --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowMethod.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.DataContext; +import org.apache.calcite.linq4j.tree.Types; +import org.apache.calcite.util.ImmutableIntList; + +import com.google.common.collect.ImmutableMap; + +import java.lang.reflect.Method; +import java.util.List; + +/** + * Built-in methods in the Arrow adapter. + * + * @see org.apache.calcite.util.BuiltInMethod + */ +@SuppressWarnings("ImmutableEnumChecker") +enum ArrowMethod { + ARROW_QUERY(ArrowTable.class, "query", DataContext.class, + ImmutableIntList.class, List.class); + + final Method method; + + static final ImmutableMap MAP; + + static { + final ImmutableMap.Builder builder = + ImmutableMap.builder(); + for (ArrowMethod value : ArrowMethod.values()) { + builder.put(value.method, value); + } + MAP = builder.build(); + } + + /** Defines a method. */ + ArrowMethod(Class clazz, String methodName, Class... argumentTypes) { + this.method = Types.lookupMethod(clazz, methodName, argumentTypes); + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProject.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProject.java new file mode 100644 index 000000000000..8fee390d543f --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProject.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.util.ArrayList; +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * Implementation of {@link org.apache.calcite.rel.core.Project} + * relational expression in Arrow. + */ +class ArrowProject extends Project implements ArrowRel { + + /** Creates an ArrowProject. */ + ArrowProject(RelOptCluster cluster, RelTraitSet traitSet, + RelNode input, List projects, RelDataType rowType) { + super(cluster, traitSet, ImmutableList.of(), input, projects, rowType, ImmutableSet.of()); + assert getConvention() == ArrowRel.CONVENTION; + assert getConvention() == input.getConvention(); + } + + @Override public Project copy(RelTraitSet traitSet, RelNode input, + List projects, RelDataType rowType) { + return new ArrowProject(getCluster(), traitSet, input, projects, + rowType); + } + + @Override public RelOptCost computeSelfCost(RelOptPlanner planner, + RelMetadataQuery mq) { + final RelOptCost cost = super.computeSelfCost(planner, mq); + return requireNonNull(cost, "cost").multiplyBy(0.1); + } + + @Override public void implement(Implementor implementor) { + implementor.visitInput(0, getInput()); + List projectedFields = getProjectFields(getProjects()); + if (projectedFields != null) { + implementor.addProjectFields(projectedFields); + } + } + + static @Nullable List getProjectFields(List exps) { + final List fields = new ArrayList<>(); + for (final RexNode exp : exps) { + if (exp instanceof RexInputRef) { + fields.add(((RexInputRef) exp).getIndex()); + } else { + return null; // not a simple projection + } + } + return fields; + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProjectEnumerator.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProjectEnumerator.java new file mode 100644 index 000000000000..2426810bbc13 --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProjectEnumerator.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.util.ImmutableIntList; +import org.apache.calcite.util.Util; + +import org.apache.arrow.gandiva.evaluator.Projector; +import org.apache.arrow.gandiva.exceptions.GandivaException; +import org.apache.arrow.vector.ipc.ArrowFileReader; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; + +import java.io.IOException; + +/** + * Enumerator that reads from a projected collection of Arrow value-vectors. + */ +class ArrowProjectEnumerator extends AbstractArrowEnumerator { + private final Projector projector; + + ArrowProjectEnumerator(ArrowFileReader arrowFileReader, ImmutableIntList fields, + Projector projector) { + super(arrowFileReader, fields); + this.projector = projector; + } + + @Override protected void evaluateOperator(ArrowRecordBatch arrowRecordBatch) { + try { + projector.evaluate(arrowRecordBatch, valueVectors); + } catch (GandivaException e) { + throw Util.toUnchecked(e); + } + } + + @Override public boolean moveNext() { + if (currRowIndex >= rowCount - 1) { + final boolean hasNextBatch; + try { + hasNextBatch = arrowFileReader.loadNextBatch(); + } catch (IOException e) { + throw Util.toUnchecked(e); + } + if (hasNextBatch) { + currRowIndex = 0; + this.valueVectors.clear(); + loadNextArrowBatch(); + } + return hasNextBatch; + } else { + currRowIndex++; + return true; + } + } + + @Override public void close() { + try { + projector.close(); + } catch (GandivaException e) { + throw Util.toUnchecked(e); + } + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowRel.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowRel.java new file mode 100644 index 000000000000..5b002bdc2dcd --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowRel.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.util.ImmutableIntList; + +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.util.ArrayList; +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; + +/** + * Relational expression that uses the Arrow calling convention. + */ +public interface ArrowRel extends RelNode { + void implement(Implementor implementor); + + /** Calling convention for relational operations that occur in Arrow. */ + Convention CONVENTION = new Convention.Impl("ARROW", ArrowRel.class); + + /** Callback for the implementation process that converts a tree of + * {@link ArrowRel} nodes into a SQL query. */ + class Implementor { + @Nullable List selectFields; + final List whereClause = new ArrayList<>(); + @Nullable RelOptTable table; + @Nullable ArrowTable arrowTable; + + /** Adds new predicates. + * + * @param predicates Predicates + */ + void addFilters(List predicates) { + whereClause.addAll(predicates); + } + + /** Adds newly projected fields. + * + * @param fields New fields to be projected from a query + */ + void addProjectFields(List fields) { + selectFields = ImmutableIntList.copyOf(fields); + } + + public void visitInput(int ordinal, RelNode input) { + checkArgument(ordinal == 0); + ((ArrowRel) input).implement(this); + } + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowRules.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowRules.java new file mode 100644 index 000000000000..42e47e8be2ce --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowRules.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.adapter.enumerable.EnumerableConvention; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.logical.LogicalFilter; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.validate.SqlValidatorUtil; + +import com.google.common.collect.ImmutableList; + +import org.checkerframework.checker.nullness.qual.Nullable; +import org.immutables.value.Value; + +import java.util.List; + +/** Planner rules relating to the Arrow adapter. */ +public class ArrowRules { + private ArrowRules() {} + + /** Rule that matches a {@link org.apache.calcite.rel.core.Project} on + * an {@link ArrowTableScan} and pushes down projects if possible. */ + public static final ArrowProjectRule PROJECT_SCAN = + ArrowProjectRule.DEFAULT_CONFIG.toRule(ArrowProjectRule.class); + + public static final ArrowFilterRule FILTER_SCAN = + ArrowFilterRule.Config.DEFAULT.toRule(); + + public static final ConverterRule TO_ENUMERABLE = + ArrowToEnumerableConverterRule.DEFAULT_CONFIG + .toRule(ArrowToEnumerableConverterRule.class); + + public static final List RULES = ImmutableList.of(PROJECT_SCAN, FILTER_SCAN); + + static List arrowFieldNames(final RelDataType rowType) { + return SqlValidatorUtil.uniquify(rowType.getFieldNames(), + SqlValidatorUtil.EXPR_SUGGESTER, true); + } + + /** Base class for planner rules that convert a relational expression to + * the Arrow calling convention. */ + abstract static class ArrowConverterRule extends ConverterRule { + ArrowConverterRule(Config config) { + super(config); + } + } + + /** + * Rule to convert a {@link org.apache.calcite.rel.core.Filter} to an + * {@link ArrowFilter}. + */ + public static class ArrowFilterRule extends RelRule { + + /** Creates an ArrowFilterRule. */ + protected ArrowFilterRule(Config config) { + super(config); + } + + @Override public void onMatch(RelOptRuleCall call) { + final Filter filter = call.rel(0); + + if (filter.getTraitSet().contains(Convention.NONE)) { + final RelNode converted = convert(filter); + call.transformTo(converted); + } + } + + RelNode convert(Filter filter) { + final RelTraitSet traitSet = + filter.getTraitSet().replace(ArrowRel.CONVENTION); + return new ArrowFilter(filter.getCluster(), traitSet, + convert(filter.getInput(), ArrowRel.CONVENTION), + filter.getCondition()); + } + + /** Rule configuration. */ + @Value.Immutable + public interface Config extends RelRule.Config { + Config DEFAULT = ImmutableConfig.builder() + .withOperandSupplier(b0 -> + b0.operand(LogicalFilter.class).oneInput(b1 -> + b1.operand(ArrowTableScan.class).noInputs())) + .build(); + + @Override default ArrowFilterRule toRule() { + return new ArrowFilterRule(this); + } + } + } + + /** + * Planner rule that projects from an {@link ArrowTableScan} just the columns + * needed to satisfy a projection. If the projection's expressions are + * trivial, the projection is removed. + * + * @see ArrowRules#PROJECT_SCAN + */ + public static class ArrowProjectRule extends ArrowConverterRule { + + /** Default configuration. */ + protected static final Config DEFAULT_CONFIG = Config.INSTANCE + .withConversion(LogicalProject.class, Convention.NONE, + ArrowRel.CONVENTION, "ArrowProjectRule") + .withRuleFactory(ArrowProjectRule::new); + + /** Creates an ArrowProjectRule. */ + protected ArrowProjectRule(Config config) { + super(config); + } + + @Override public @Nullable RelNode convert(RelNode rel) { + final Project project = (Project) rel; + @Nullable List fields = + ArrowProject.getProjectFields(project.getProjects()); + if (fields == null) { + // Project contains expressions more complex than just field references. + return null; + } + final RelTraitSet traitSet = + project.getTraitSet().replace(ArrowRel.CONVENTION); + return new ArrowProject(project.getCluster(), traitSet, + convert(project.getInput(), ArrowRel.CONVENTION), + project.getProjects(), project.getRowType()); + } + } + + /** + * Rule to convert a relational expression from + * {@link ArrowRel#CONVENTION} to {@link EnumerableConvention}. + */ + static class ArrowToEnumerableConverterRule extends ConverterRule { + + /** Default configuration. */ + public static final Config DEFAULT_CONFIG = Config.INSTANCE + .withConversion(RelNode.class, ArrowRel.CONVENTION, + EnumerableConvention.INSTANCE, "ArrowToEnumerableConverterRule") + .withRuleFactory(ArrowToEnumerableConverterRule::new); + + /** Creates an ArrowToEnumerableConverterRule. */ + protected ArrowToEnumerableConverterRule(Config config) { + super(config); + } + + @Override public RelNode convert(RelNode rel) { + RelTraitSet newTraitSet = rel.getTraitSet().replace(getOutConvention()); + return new ArrowToEnumerableConverter(rel.getCluster(), newTraitSet, rel); + } + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchema.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchema.java new file mode 100644 index 000000000000..c40cf4439a18 --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchema.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.schema.Table; +import org.apache.calcite.schema.impl.AbstractSchema; +import org.apache.calcite.util.Sources; +import org.apache.calcite.util.Util; + +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.ipc.ArrowFileReader; +import org.apache.arrow.vector.ipc.SeekableReadChannel; + +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableMap; + +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +/** + * Schema mapped onto a set of Arrow files. + */ +class ArrowSchema extends AbstractSchema { + + private static final Logger LOGGER = LoggerFactory.getLogger(ArrowSchema.class); + private final Supplier> tableMapSupplier; + + /** + * Creates an Arrow schema. + * + * @param baseDirectory Base directory to look for relative files + */ + ArrowSchema(File baseDirectory) { + requireNonNull(baseDirectory, "baseDirectory"); + this.tableMapSupplier = + Suppliers.memoize(() -> deduceTableMap(baseDirectory)); + } + + /** + * Looks for a suffix on a string and returns + * either the string with the suffix removed + * or the original string. + */ + private static String trim(String s, String suffix) { + String trimmed = trimOrNull(s, suffix); + return trimmed != null ? trimmed : s; + } + + /** + * Looks for a suffix on a string and returns + * either the string with the suffix removed + * or null. + */ + private static @Nullable String trimOrNull(String s, String suffix) { + return s.endsWith(suffix) + ? s.substring(0, s.length() - suffix.length()) + : null; + } + + @Override protected Map getTableMap() { + return tableMapSupplier.get(); + } + + private static Map deduceTableMap(File baseDirectory) { + File[] files = baseDirectory.listFiles((dir, name) -> name.endsWith(".arrow")); + if (files == null) { + LOGGER.info("directory " + baseDirectory + " not found"); + return ImmutableMap.of(); + } + + final Map tables = new HashMap<>(); + for (File file : files) { + final File arrowFile = new File(Sources.of(file).path()); + final FileInputStream fileInputStream; + try { + fileInputStream = new FileInputStream(arrowFile); + } catch (FileNotFoundException e) { + throw Util.toUnchecked(e); + } + final SeekableReadChannel seekableReadChannel = + new SeekableReadChannel(fileInputStream.getChannel()); + final RootAllocator allocator = new RootAllocator(); + final ArrowFileReader arrowFileReader = + new ArrowFileReader(seekableReadChannel, allocator); + final String tableName = + trim(file.getName(), ".arrow").toUpperCase(Locale.ROOT); + final ArrowTable table = + new ArrowTable(null, arrowFileReader); + tables.put(tableName, table); + } + + return ImmutableMap.copyOf(tables); + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchemaFactory.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchemaFactory.java new file mode 100644 index 000000000000..07810c908cad --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchemaFactory.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.model.ModelHandler; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaFactory; +import org.apache.calcite.schema.SchemaPlus; + +import java.io.File; +import java.util.Map; + +/** + * Factory that creates an {@link ArrowSchema}. + */ +public class ArrowSchemaFactory implements SchemaFactory { + + @Override public Schema create(SchemaPlus parentSchema, String name, + Map operand) { + final File baseDirectory = + (File) operand.get(ModelHandler.ExtraOperand.BASE_DIRECTORY.camelName); + final String directory = (String) operand.get("directory"); + File directoryFile = null; + if (directory != null) { + directoryFile = new File(directory); + } + if (baseDirectory != null) { + if (directoryFile == null) { + directoryFile = baseDirectory; + } else if (!directoryFile.isAbsolute()) { + directoryFile = new File(baseDirectory, directoryFile.getPath()); + } + } + if (directoryFile == null) { + throw new RuntimeException("no directory"); + } + return new ArrowSchema(directoryFile); + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTable.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTable.java new file mode 100644 index 000000000000..f868756f012a --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTable.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.DataContext; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.QueryProvider; +import org.apache.calcite.linq4j.Queryable; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.QueryableTable; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Schemas; +import org.apache.calcite.schema.TranslatableTable; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.calcite.util.ImmutableIntList; +import org.apache.calcite.util.Util; + +import org.apache.arrow.gandiva.evaluator.Filter; +import org.apache.arrow.gandiva.evaluator.Projector; +import org.apache.arrow.gandiva.exceptions.GandivaException; +import org.apache.arrow.gandiva.expression.Condition; +import org.apache.arrow.gandiva.expression.ExpressionTree; +import org.apache.arrow.gandiva.expression.TreeBuilder; +import org.apache.arrow.gandiva.expression.TreeNode; +import org.apache.arrow.vector.ipc.ArrowFileReader; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; + +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.io.IOException; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * Arrow Table. + */ +public class ArrowTable extends AbstractTable + implements TranslatableTable, QueryableTable { + private final @Nullable RelProtoDataType protoRowType; + /** Arrow schema. (In Calcite terminology, more like a row type than a Schema.) */ + private final Schema schema; + private final ArrowFileReader arrowFileReader; + + ArrowTable(@Nullable RelProtoDataType protoRowType, ArrowFileReader arrowFileReader) { + try { + this.schema = arrowFileReader.getVectorSchemaRoot().getSchema(); + } catch (IOException e) { + throw Util.toUnchecked(e); + } + this.protoRowType = protoRowType; + this.arrowFileReader = arrowFileReader; + } + + @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) { + if (this.protoRowType != null) { + return this.protoRowType.apply(typeFactory); + } + return deduceRowType(this.schema, (JavaTypeFactory) typeFactory); + } + + @Override public Expression getExpression(SchemaPlus schema, String tableName, + Class clazz) { + return Schemas.tableExpression(schema, getElementType(), tableName, clazz); + } + + /** Called via code generation; see uses of + * {@link org.apache.calcite.adapter.arrow.ArrowMethod#ARROW_QUERY}. */ + @SuppressWarnings("unused") + public Enumerable query(DataContext root, ImmutableIntList fields, + List conditions) { + requireNonNull(fields, "fields"); + final Projector projector; + final Filter filter; + + if (conditions.isEmpty()) { + filter = null; + + final List expressionTrees = new ArrayList<>(); + for (int fieldOrdinal : fields) { + Field field = schema.getFields().get(fieldOrdinal); + TreeNode node = TreeBuilder.makeField(field); + expressionTrees.add(TreeBuilder.makeExpression(node, field)); + } + try { + projector = Projector.make(schema, expressionTrees); + } catch (GandivaException e) { + throw Util.toUnchecked(e); + } + } else { + projector = null; + + final List conditionNodes = new ArrayList<>(conditions.size()); + for (String condition : conditions) { + String[] data = condition.split(" "); + List treeNodes = new ArrayList<>(2); + treeNodes.add( + TreeBuilder.makeField(schema.getFields() + .get(schema.getFields().indexOf(schema.findField(data[0]))))); + treeNodes.add(makeLiteralNode(data[2], data[3])); + String equality = data[1]; + conditionNodes.add( + TreeBuilder.makeFunction(equality, treeNodes, new ArrowType.Bool())); + } + final Condition filterCondition; + if (conditionNodes.size() == 1) { + filterCondition = TreeBuilder.makeCondition(conditionNodes.get(0)); + } else { + TreeNode treeNode = TreeBuilder.makeAnd(conditionNodes); + filterCondition = TreeBuilder.makeCondition(treeNode); + } + + try { + filter = Filter.make(schema, filterCondition); + } catch (GandivaException e) { + throw Util.toUnchecked(e); + } + } + + return new ArrowEnumerable(arrowFileReader, fields, projector, filter); + } + + @Override public Queryable asQueryable(QueryProvider queryProvider, + SchemaPlus schema, String tableName) { + throw new UnsupportedOperationException(); + } + + @Override public Type getElementType() { + return Object[].class; + } + + @Override public RelNode toRel(RelOptTable.ToRelContext context, + RelOptTable relOptTable) { + final int fieldCount = relOptTable.getRowType().getFieldCount(); + final ImmutableIntList fields = + ImmutableIntList.copyOf(Util.range(fieldCount)); + final RelOptCluster cluster = context.getCluster(); + return new ArrowTableScan(cluster, cluster.traitSetOf(ArrowRel.CONVENTION), + relOptTable, this, fields); + } + + private static RelDataType deduceRowType(Schema schema, + JavaTypeFactory typeFactory) { + final RelDataTypeFactory.Builder builder = typeFactory.builder(); + for (Field field : schema.getFields()) { + builder.add(field.getName(), + ArrowFieldType.of(field.getType()).toType(typeFactory)); + } + return builder.build(); + } + + private static TreeNode makeLiteralNode(String literal, String type) { + switch (type) { + case "integer": + return TreeBuilder.makeLiteral(Integer.parseInt(literal)); + case "long": + return TreeBuilder.makeLiteral(Long.parseLong(literal)); + case "float": + return TreeBuilder.makeLiteral(Float.parseFloat(literal)); + case "double": + return TreeBuilder.makeLiteral(Double.parseDouble(literal)); + case "string": + return TreeBuilder.makeStringLiteral(literal.substring(1, literal.length() - 1)); + default: + throw new IllegalArgumentException("Invalid literal " + literal + + ", type " + type); + } + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTableScan.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTableScan.java new file mode 100644 index 000000000000..651a26a55c51 --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTableScan.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.util.ImmutableIntList; + +import com.google.common.collect.ImmutableList; + +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; + +/** + * Relational expression representing a scan of an Arrow collection. + */ +class ArrowTableScan extends TableScan implements ArrowRel { + private final ArrowTable arrowTable; + private final ImmutableIntList fields; + + ArrowTableScan(RelOptCluster cluster, RelTraitSet traitSet, + RelOptTable relOptTable, ArrowTable arrowTable, ImmutableIntList fields) { + super(cluster, traitSet, ImmutableList.of(), relOptTable); + this.arrowTable = arrowTable; + this.fields = fields; + + assert getConvention() == ArrowRel.CONVENTION; + } + + @Override public RelNode copy(RelTraitSet traitSet, List inputs) { + checkArgument(inputs.isEmpty()); + return this; + } + + @Override public RelWriter explainTerms(RelWriter pw) { + return super.explainTerms(pw).item("fields", fields); + } + + @Override public RelDataType deriveRowType() { + final List fieldList = table.getRowType().getFieldList(); + final RelDataTypeFactory.Builder builder = + getCluster().getTypeFactory().builder(); + for (int field : fields) { + builder.add(fieldList.get(field)); + } + return builder.build(); + } + + @Override public void register(RelOptPlanner planner) { + planner.addRule(ArrowRules.TO_ENUMERABLE); + for (RelOptRule rule : ArrowRules.RULES) { + planner.addRule(rule); + } + } + + @Override public void implement(ArrowRel.Implementor implementor) { + implementor.arrowTable = arrowTable; + implementor.table = table; + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowToEnumerableConverter.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowToEnumerableConverter.java new file mode 100644 index 000000000000..3b90dfd890e3 --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowToEnumerableConverter.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.adapter.enumerable.EnumerableRel; +import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor; +import org.apache.calcite.adapter.enumerable.PhysType; +import org.apache.calcite.adapter.enumerable.PhysTypeImpl; +import org.apache.calcite.linq4j.tree.Blocks; +import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.plan.ConventionTraitDef; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterImpl; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.util.BuiltInMethod; + +import com.google.common.primitives.Ints; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * Relational expression representing a scan of a table in an Arrow data source. + */ +class ArrowToEnumerableConverter + extends ConverterImpl implements EnumerableRel { + + protected ArrowToEnumerableConverter(RelOptCluster cluster, + RelTraitSet traitSet, RelNode input) { + super(cluster, ConventionTraitDef.INSTANCE, traitSet, input); + } + + @Override public RelNode copy(RelTraitSet traitSet, List inputs) { + return new ArrowToEnumerableConverter(getCluster(), traitSet, sole(inputs)); + } + + @Override public RelOptCost computeSelfCost(RelOptPlanner planner, + RelMetadataQuery mq) { + RelOptCost cost = super.computeSelfCost(planner, mq); + return requireNonNull(cost, "cost").multiplyBy(0.1); + } + + @Override public Result implement(EnumerableRelImplementor implementor, + Prefer pref) { + final ArrowRel.Implementor arrowImplementor = new ArrowRel.Implementor(); + arrowImplementor.visitInput(0, getInput()); + PhysType physType = + PhysTypeImpl.of( + implementor.getTypeFactory(), + getRowType(), + pref.preferArray()); + + final RelOptTable table = requireNonNull(arrowImplementor.table, "table"); + final int fieldCount = table.getRowType().getFieldCount(); + return implementor.result(physType, + Blocks.toBlock( + Expressions.call(table.getExpression(ArrowTable.class), + ArrowMethod.ARROW_QUERY.method, implementor.getRootExpression(), + arrowImplementor.selectFields != null + ? Expressions.call( + BuiltInMethod.IMMUTABLE_INT_LIST_COPY_OF.method, + Expressions.constant( + Ints.toArray(arrowImplementor.selectFields))) + : Expressions.call( + BuiltInMethod.IMMUTABLE_INT_LIST_IDENTITY.method, + Expressions.constant(fieldCount)), + Expressions.constant(arrowImplementor.whereClause)))); + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTranslator.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTranslator.java new file mode 100644 index 000000000000..5b68a130dac2 --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTranslator.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.util.DateString; + +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.math.BigDecimal; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.calcite.util.DateTimeStringUtils.ISO_DATETIME_FRACTIONAL_SECOND_FORMAT; +import static org.apache.calcite.util.DateTimeStringUtils.getDateFormatter; + +import static java.util.Objects.requireNonNull; + +/** + * Translates a {@link RexNode} expression to a Gandiva string. + */ +class ArrowTranslator { + final RexBuilder rexBuilder; + final RelDataType rowType; + final List fieldNames; + + /** Private constructor. */ + ArrowTranslator(RexBuilder rexBuilder, RelDataType rowType) { + this.rexBuilder = rexBuilder; + this.rowType = rowType; + this.fieldNames = ArrowRules.arrowFieldNames(rowType); + } + + /** Creates an ArrowTranslator. */ + public static ArrowTranslator create(RexBuilder rexBuilder, + RelDataType rowType) { + return new ArrowTranslator(rexBuilder, rowType); + } + + List translateMatch(RexNode condition) { + List disjunctions = RelOptUtil.disjunctions(condition); + if (disjunctions.size() == 1) { + return translateAnd(disjunctions.get(0)); + } else { + throw new AssertionError("cannot translate " + condition); + } + } + + /** + * Returns the value of the literal. + * + * @param literal Literal to translate + * + * @return The value of the literal in the form of the actual type + */ + private static Object literalValue(RexLiteral literal) { + switch (literal.getTypeName()) { + case TIMESTAMP: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + final SimpleDateFormat dateFormatter = + getDateFormatter(ISO_DATETIME_FRACTIONAL_SECOND_FORMAT); + Long millis = literal.getValueAs(Long.class); + return dateFormatter.format(requireNonNull(millis, "millis")); + case DATE: + final DateString dateString = literal.getValueAs(DateString.class); + return requireNonNull(dateString, "dateString").toString(); + default: + return requireNonNull(literal.getValue3()); + } + } + + /** + * Translate a conjunctive predicate to a SQL string. + * + * @param condition A conjunctive predicate + * + * @return SQL string for the predicate + */ + private List translateAnd(RexNode condition) { + List predicates = new ArrayList<>(); + for (RexNode node : RelOptUtil.conjunctions(condition)) { + if (node.getKind() == SqlKind.SEARCH) { + final RexNode node2 = RexUtil.expandSearch(rexBuilder, null, node); + predicates.addAll(translateMatch(node2)); + } else { + predicates.add(translateMatch2(node)); + } + } + return predicates; + } + + /** Translate a binary relation. */ + private String translateMatch2(RexNode node) { + switch (node.getKind()) { + case EQUALS: + return translateBinary("equal", "=", (RexCall) node); + case LESS_THAN: + return translateBinary("less_than", ">", (RexCall) node); + case LESS_THAN_OR_EQUAL: + return translateBinary("less_than_or_equal_to", ">=", (RexCall) node); + case GREATER_THAN: + return translateBinary("greater_than", "<", (RexCall) node); + case GREATER_THAN_OR_EQUAL: + return translateBinary("greater_than_or_equal_to", "<=", (RexCall) node); + default: + throw new AssertionError("cannot translate " + node); + } + } + + /** + * Translates a call to a binary operator, reversing arguments if + * necessary. + */ + private String translateBinary(String op, String rop, RexCall call) { + final RexNode left = call.operands.get(0); + final RexNode right = call.operands.get(1); + @Nullable String expression = translateBinary2(op, left, right); + if (expression != null) { + return expression; + } + expression = translateBinary2(rop, right, left); + if (expression != null) { + return expression; + } + throw new AssertionError("cannot translate op " + op + " call " + call); + } + + /** Translates a call to a binary operator. Returns null on failure. */ + private @Nullable String translateBinary2(String op, RexNode left, RexNode right) { + if (right.getKind() != SqlKind.LITERAL) { + return null; + } + final RexLiteral rightLiteral = (RexLiteral) right; + switch (left.getKind()) { + case INPUT_REF: + final RexInputRef left1 = (RexInputRef) left; + String name = fieldNames.get(left1.getIndex()); + return translateOp2(op, name, rightLiteral); + case CAST: + // FIXME This will not work in all cases (for example, we ignore string encoding) + return translateBinary2(op, ((RexCall) left).operands.get(0), right); + default: + return null; + } + } + + /** Combines a field name, operator, and literal to produce a predicate string. */ + private String translateOp2(String op, String name, RexLiteral right) { + Object value = literalValue(right); + String valueString = value.toString(); + String valueType = getLiteralType(value); + + if (value instanceof String) { + final RelDataTypeField field = requireNonNull(rowType.getField(name, true, false), "field"); + SqlTypeName typeName = field.getType().getSqlTypeName(); + if (typeName != SqlTypeName.CHAR) { + valueString = "'" + valueString + "'"; + } + } + return name + " " + op + " " + valueString + " " + valueType; + } + + private static String getLiteralType(Object literal) { + if (literal instanceof BigDecimal) { + BigDecimal bigDecimalLiteral = (BigDecimal) literal; + int scale = bigDecimalLiteral.scale(); + if (scale == 0) { + return "integer"; + } else if (scale > 0) { + return "float"; + } + } else if (String.class.equals(literal.getClass())) { + return "string"; + } + throw new AssertionError("Invalid literal"); + } +} diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/package-info.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/package-info.java new file mode 100644 index 000000000000..51ced6b206b3 --- /dev/null +++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Query provider that reads from Arrow files. + */ +package org.apache.calcite.adapter.arrow; diff --git a/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterTest.java b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterTest.java new file mode 100644 index 000000000000..19e7bdfd5d59 --- /dev/null +++ b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterTest.java @@ -0,0 +1,712 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.schema.Table; +import org.apache.calcite.test.CalciteAssert; +import org.apache.calcite.util.Sources; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.sql.SQLException; +import java.util.Map; +import java.util.Objects; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Tests for the Apache Arrow adapter. + */ +@Execution(ExecutionMode.SAME_THREAD) +@ExtendWith(ArrowExtension.class) +class ArrowAdapterTest { + private static Map arrow; + private static File arrowDataDirectory; + + @BeforeAll + static void initializeArrowState(@TempDir Path sharedTempDir) throws IOException, SQLException { + URL modelUrl = + Objects.requireNonNull(ArrowAdapterTest.class.getResource("/arrow-model.json"), "url"); + Path sourceModelFilePath = Sources.of(modelUrl).file().toPath(); + Path modelFileTarget = sharedTempDir.resolve("arrow-model.json"); + Files.copy(sourceModelFilePath, modelFileTarget); + + Path arrowFilesDirectory = sharedTempDir.resolve("arrow"); + Files.createDirectory(arrowFilesDirectory); + arrowDataDirectory = arrowFilesDirectory.toFile(); + + File dataLocationFile = arrowFilesDirectory.resolve("arrowdata.arrow").toFile(); + ArrowData arrowDataGenerator = new ArrowData(); + arrowDataGenerator.writeArrowData(dataLocationFile); + arrowDataGenerator.writeScottEmpData(arrowFilesDirectory); + + arrow = ImmutableMap.of("model", modelFileTarget.toAbsolutePath().toString()); + } + + /** Test to read an Arrow file and check its field names. */ + @Test void testArrowSchema() { + ArrowSchema arrowSchema = new ArrowSchema(arrowDataDirectory); + Map tableMap = arrowSchema.getTableMap(); + RelDataTypeFactory typeFactory = + new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT); + RelDataType relDataType = tableMap.get("ARROWDATA").getRowType(typeFactory); + + assertEquals(relDataType.getFieldNames(), + ImmutableList.of("intField", "stringField", "floatField", "longField")); + } + + @Test void testArrowProjectAllFields() { + String sql = "select * from arrowdata\n"; + String plan = "PLAN=ArrowToEnumerableConverter\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + String result = "intField=0; stringField=0; floatField=0.0; longField=0\n" + + "intField=1; stringField=1; floatField=1.0; longField=1\n" + + "intField=2; stringField=2; floatField=2.0; longField=2\n" + + "intField=3; stringField=3; floatField=3.0; longField=3\n" + + "intField=4; stringField=4; floatField=4.0; longField=4\n" + + "intField=5; stringField=5; floatField=5.0; longField=5\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .limit(6) + .returns(result) + .explainContains(plan); + } + + @Test void testArrowProjectAllFieldsExplicitly() { + String sql = "select \"intField\", \"stringField\", \"floatField\", \"longField\" " + + "from arrowdata\n"; + String plan = "PLAN=ArrowToEnumerableConverter\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + String result = "intField=0; stringField=0; floatField=0.0; longField=0\n" + + "intField=1; stringField=1; floatField=1.0; longField=1\n" + + "intField=2; stringField=2; floatField=2.0; longField=2\n" + + "intField=3; stringField=3; floatField=3.0; longField=3\n" + + "intField=4; stringField=4; floatField=4.0; longField=4\n" + + "intField=5; stringField=5; floatField=5.0; longField=5\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .limit(6) + .returns(result) + .explainContains(plan); + } + + @Test void testArrowProjectAllFieldsExplicitlyPermutation() { + String sql = "select \"stringField\", \"intField\", \"longField\", \"floatField\" " + + "from arrowdata\n"; + String plan = "PLAN=ArrowToEnumerableConverter\n" + + " ArrowProject(stringField=[$1], intField=[$0], longField=[$3], floatField=[$2])\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + String result = "stringField=0; intField=0; longField=0; floatField=0.0\n" + + "stringField=1; intField=1; longField=1; floatField=1.0\n" + + "stringField=2; intField=2; longField=2; floatField=2.0\n" + + "stringField=3; intField=3; longField=3; floatField=3.0\n" + + "stringField=4; intField=4; longField=4; floatField=4.0\n" + + "stringField=5; intField=5; longField=5; floatField=5.0\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .limit(6) + .returns(result) + .explainContains(plan); + } + + @Test void testArrowProjectSingleField() { + String sql = "select \"intField\" from arrowdata\n"; + String plan = "PLAN=ArrowToEnumerableConverter\n" + + " ArrowProject(intField=[$0])\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + String result = "intField=0\nintField=1\nintField=2\n" + + "intField=3\nintField=4\nintField=5\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .limit(6) + .returns(result) + .explainContains(plan); + } + + @Test void testArrowProjectTwoFields() { + String sql = "select \"intField\", \"stringField\" from arrowdata\n"; + String plan = "PLAN=ArrowToEnumerableConverter\n" + + " ArrowProject(intField=[$0], stringField=[$1])\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + String result = "intField=0; stringField=0\n" + + "intField=1; stringField=1\n" + + "intField=2; stringField=2\n" + + "intField=3; stringField=3\n" + + "intField=4; stringField=4\n" + + "intField=5; stringField=5\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .limit(6) + .returns(result) + .explainContains(plan); + } + + @Test void testArrowProjectFieldsWithIntegerFilter() { + String sql = "select \"intField\", \"stringField\"\n" + + "from arrowdata\n" + + "where \"intField\" < 4"; + String plan = "PLAN=ArrowToEnumerableConverter\n" + + " ArrowProject(intField=[$0], stringField=[$1])\n" + + " ArrowFilter(condition=[<($0, 4)])\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + String result = "intField=0; stringField=0\n" + + "intField=1; stringField=1\n" + + "intField=2; stringField=2\n" + + "intField=3; stringField=3\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .returns(result) + .explainContains(plan); + } + + @Test void testArrowProjectFieldsWithMultipleFilterSameField() { + String sql = "select \"intField\", \"stringField\"\n" + + "from arrowdata\n" + + "where \"intField\" > 1 and \"intField\" < 4"; + String plan = "PLAN=ArrowToEnumerableConverter\n" + + " ArrowProject(intField=[$0], stringField=[$1])\n" + + " ArrowFilter(condition=[SEARCH($0, Sarg[(1..4)])])\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + String result = "intField=2; stringField=2\n" + + "intField=3; stringField=3\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .returns(result) + .explainContains(plan); + } + + @Test void testArrowProjectFieldsWithConjunctiveFilters() { + String sql = "select \"intField\", \"stringField\"\n" + + "from arrowdata\n" + + "where \"intField\"=12 and \"stringField\"='12'"; + String plan = "PLAN=ArrowToEnumerableConverter\n" + + " ArrowProject(intField=[$0], stringField=[$1])\n" + + " ArrowFilter(condition=[AND(=($0, 12), =($1, '12'))])\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + String result = "intField=12; stringField=12\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .returns(result) + .explainContains(plan); + } + + @Disabled("OR is not supported yet") + @Test void testArrowProjectFieldsWithDisjunctiveFilter() { + String sql = "select \"intField\", \"stringField\"\n" + + "from arrowdata\n" + + "where \"intField\"=12 or \"stringField\"='12'"; + String plan = "PLAN=ArrowToEnumerableConverter\n" + + " ArrowProject(intField=[$0], stringField=[$1])\n" + + " ArrowFilter(condition=[OR(=($0, 12), =($1, '12'))])\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + String result = "intField=12; stringField=12\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .returns(result) + .explainContains(plan); + } + + @Disabled("IN is not supported as OR is not supported yet") + @Test void testArrowProjectFieldsWithInFilter() { + String sql = "select \"intField\", \"stringField\"\n" + + "from arrowdata\n" + + "where \"intField\" in (0, 1, 2)"; + String plan = "PLAN=ArrowToEnumerableConverter\n" + + " ArrowProject(intField=[$0], stringField=[$1])\n" + + " ArrowFilter(condition=[OR(=($0, 0), =($0, 1), =($0, 2))])\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + String result = "intField=0; stringField=0\n" + + "intField=1; stringField=1\n" + + "intField=2; stringField=2\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .returns(result) + .explainContains(plan); + } + + @Disabled("IS NOT NULL is not supported yet") + @Test void testArrowProjectFieldsWithIsNotNullFilter() { + String sql = "select \"intField\", \"stringField\"\n" + + "from arrowdata\n" + + "where \"intField\" is not null\n" + + "order by \"intField\"\n" + + "limit 1"; + String plan = "PLAN=EnumerableLimit(fetch=[1])\n" + + " EnumerableSort(sort0=[$0], dir0=[ASC])\n" + + " ArrowToEnumerableConverter\n" + + " ArrowProject(intField=[$0], stringField=[$1])\n" + + " ArrowFilter(condition=[IS NOT NULL($0)])\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + String result = "intField=0; stringField=0\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .returns(result) + .explainContains(plan); + } + + @Disabled("IS NULL is not supported yet") + @Test void testArrowProjectFieldsWithIsNullFilter() { + String sql = "select \"intField\", \"stringField\"\n" + + "from arrowdata\n" + + "where \"intField\" is null"; + String plan = "ArrowToEnumerableConverter\n" + + " ArrowProject(intField=[$0], stringField=[$1])\n" + + " ArrowFilter(condition=[IS NOT NULL($0)])\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .returnsCount(0) + .explainContains(plan); + } + + @Test void testArrowProjectFieldsWithFloatFilter() { + String sql = "select * from arrowdata\n" + + " where \"floatField\"=15.0"; + String plan = "PLAN=ArrowToEnumerableConverter\n" + + " ArrowFilter(condition=[=(CAST($2):DOUBLE, 15.0)])\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + String result = "intField=15; stringField=15; floatField=15.0; longField=15\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .returns(result) + .explainContains(plan); + } + + @Test void testArrowProjectFieldsWithFilterOnLaterBatch() { + String sql = "select \"intField\"\n" + + "from arrowdata\n" + + "where \"intField\"=25"; + String plan = "PLAN=ArrowToEnumerableConverter\n" + + " ArrowProject(intField=[$0])\n" + + " ArrowFilter(condition=[=($0, 25)])\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + String result = "intField=25\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .returns(result) + .explainContains(plan); + } + + @Test void testArrowSubquery() { + String sql = "select \"intField\"\n" + + "from (select \"intField\", \"stringField\" from arrowdata where \"stringField\" = '2')\n" + + "where \"intField\" = 2"; + String plan = "PLAN=ArrowToEnumerableConverter\n" + + " ArrowProject(intField=[$0])\n" + + " ArrowFilter(condition=[AND(=($1, '2'), =($0, 2))])\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n" + + "\n"; + String result = "intField=2\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .returns(result) + .explainContains(plan); + } + + @Disabled("UNION does not work") + @Test void testArrowUnion() { + String sql = "(select \"intField\"\n" + + "from arrowdata\n" + + "where \"intField\" = 2)\n" + + " union \n" + + "(select \"intField\"\n" + + "from arrowdata\n" + + "where \"intField\" = 1)\n"; + String plan = "PLAN=EnumerableUnion(all=[false])\n" + + " ArrowToEnumerableConverter\n" + + " ArrowProject(intField=[$0])\n" + + " ArrowFilter(condition=[=($0, 2)])\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n" + + " ArrowToEnumerableConverter\n" + + " ArrowProject(intField=[$0])\n" + + " ArrowFilter(condition=[=($0, 1)])\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + String result = "intField=1\nintField=2\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .returns(result) + .explainContains(plan); + } + + @Test void testFieldWithSpace() { + String sql = "select \"my Field\" from (select \"intField\", \"stringField\" as \"my Field\"\n" + + "from arrowdata)\n" + + "where \"my Field\" = '2'"; + String plan = "PLAN=ArrowToEnumerableConverter\n" + + " ArrowProject(my Field=[$1])\n" + + " ArrowFilter(condition=[=($1, '2')])\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + String result = "my Field=2\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .returns(result) + .explainContains(plan); + } + + @Disabled("literal with space is not supported") + @Test void testLiteralWithSpace() { + String sql = "select \"intField\", \"stringField\" as \"my Field\"\n" + + "from arrowdata\n" + + "where \"stringField\" = 'literal with space'"; + String plan = "PLAN=ArrowToEnumerableConverter\n" + + " ArrowProject(intField=[$0], my Field=[$1])\n" + + " ArrowFilter(condition=[=($1, '2')])\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + String result = ""; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .returns(result) + .explainContains(plan); + } + + @Test void testLiteralWithQuote() { + String sql = "select \"intField\", \"stringField\" as \"my Field\"\n" + + "from arrowdata\n" + + "where \"stringField\" = ''''"; + String plan = "PLAN=ArrowToEnumerableConverter\n" + + " ArrowProject(intField=[$0], stringField=[$1])\n" + + " ArrowFilter(condition=[=($1, '''')])\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + String result = ""; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .returns(result) + .explainContains(plan); + } + + @Test void testTinyIntProject() { + String sql = "select DEPTNO from DEPT"; + String plan = "PLAN=ArrowToEnumerableConverter\n" + + " ArrowProject(DEPTNO=[$0])\n" + + " ArrowTableScan(table=[[ARROW, DEPT]], fields=[[0, 1, 2]])\n\n"; + String result = "DEPTNO=10\nDEPTNO=20\nDEPTNO=30\nDEPTNO=40\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .returns(result) + .explainContains(plan); + } + + @Test void testSmallIntProject() { + String sql = "select EMPNO from EMP"; + String plan = "PLAN=ArrowToEnumerableConverter\n" + + " ArrowProject(EMPNO=[$0])\n" + + " ArrowTableScan(table=[[ARROW, EMP]], fields=[[0, 1, 2, 3, 4, 5, 6, 7]])\n\n"; + String result = "EMPNO=7369\nEMPNO=7499\nEMPNO=7521\n"; + CalciteAssert.that() + .with(arrow) + .query(sql) + .limit(3) + .returns(result) + .explainContains(plan); + } + + @Test void testCastDecimalToInt() { + String sql = "select CAST(LOSAL AS INT) as \"trunc\" from SALGRADE"; + String plan = + "PLAN=EnumerableCalc(expr#0..2=[{inputs}], expr#3=[CAST($t1):INTEGER], trunc=[$t3])\n" + + " ArrowToEnumerableConverter\n" + + " ArrowTableScan(table=[[ARROW, SALGRADE]], fields=[[0, 1, 2]])\n\n"; + String result = "trunc=700\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .typeIs("[trunc INTEGER]") + .limit(1) + .returns(result) + .explainContains(plan); + } + + @Test void testCastDecimalToFloat() { + String sql = "select CAST(LOSAL AS FLOAT) as \"extra\" from SALGRADE"; + String plan = "PLAN=EnumerableCalc(expr#0..2=[{inputs}]," + + " expr#3=[CAST($t1):FLOAT], extra=[$t3])\n" + + " ArrowToEnumerableConverter\n" + + " ArrowTableScan(table=[[ARROW, SALGRADE]], fields=[[0, 1, 2]])\n\n"; + String result = "extra=700.0\nextra=1201.0\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .typeIs("[extra FLOAT]") + .limit(2) + .returns(result) + .explainContains(plan); + } + + @Test void testCastDecimalToDouble() { + String sql = "select CAST(LOSAL AS DOUBLE) as \"extra\" from SALGRADE"; + String plan = + "PLAN=EnumerableCalc(expr#0..2=[{inputs}], expr#3=[CAST($t1):DOUBLE], extra=[$t3])\n" + + " ArrowToEnumerableConverter\n" + + " ArrowTableScan(table=[[ARROW, SALGRADE]], fields=[[0, 1, 2]])\n\n"; + String result = "extra=700.0\nextra=1201.0\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .typeIs("[extra DOUBLE]") + .limit(2) + .returns(result) + .explainContains(plan); + } + + @Test void testCastIntToDouble() { + String sql = "select CAST(\"intField\" AS DOUBLE) as \"dbl\" from arrowdata"; + String plan = + "PLAN=EnumerableCalc(expr#0..3=[{inputs}], expr#4=[CAST($t0):DOUBLE], dbl=[$t4])\n" + + " ArrowToEnumerableConverter\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + String result = "dbl=0.0\ndbl=1.0\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .typeIs("[dbl DOUBLE]") + .limit(2) + .returns(result) + .explainContains(plan); + } + + @Test void testStringOperation() { + String sql = "select\n" + + " \"stringField\" || '_suffix' as \"field1\"\n" + + "from arrowdata"; + String plan = "PLAN=EnumerableCalc(expr#0..3=[{inputs}], expr#4=['_suffix'], " + + "expr#5=[||($t1, $t4)], field1=[$t5])\n" + + " ArrowToEnumerableConverter\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + String result = "field1=0_suffix\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .limit(1) + .returns(result) + .explainContains(plan); + } + + + @Disabled("join is not supported yet") + @Test void testJoin() { + String sql = "select t1.\"intField\", t2.\"intField\" " + + "from arrowdata t1 join arrowdata t2 on t1.\"intField\" = t2.\"intField\""; + String plan = "PLAN=EnumerableJoin(condition=[=($0, $4)], joinType=[inner])\n" + + " ArrowToEnumerableConverter\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n" + + " ArrowToEnumerableConverter\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + String result = "intField=0\nintField=1\nintField=2\nintField=3\nintField=4\nintField=5\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .limit(1) + .returns(result) + .explainContains(plan); + } + + @Test void testAggWithoutAggFunctions() { + String sql = "select DISTINCT(\"intField\") as \"dep\" from arrowdata"; + String plan = "PLAN=EnumerableAggregate(group=[{0}])\n" + + " ArrowToEnumerableConverter\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + String result = "dep=0\ndep=1\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .limit(2) + .returns(result) + .explainContains(plan); + } + + @Test void testAggWithAggFunctions() { + String sql = "select JOB, SUM(SAL) as TOTAL from EMP GROUP BY JOB"; + String plan = "PLAN=EnumerableAggregate(group=[{2}], TOTAL=[SUM($5)])\n" + + " ArrowToEnumerableConverter\n" + + " ArrowTableScan(table=[[ARROW, EMP]], fields=[[0, 1, 2, 3, 4, 5, 6, 7]])\n\n"; + String result = "JOB=SALESMAN; TOTAL=5600.00\nJOB=ANALYST; TOTAL=6000.00\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .limit(2) + .returns(result) + .explainContains(plan); + } + + @Test void testFilteredAgg() { + String sql = "select SUM(SAL) FILTER (WHERE COMM > 400) as SALESSUM from EMP"; + String plan = "PLAN=EnumerableAggregate(group=[{}], SALESSUM=[SUM($0) FILTER $1])\n" + + " EnumerableCalc(expr#0..7=[{inputs}], expr#8=[400], expr#9=[>($t6, $t8)], " + + "expr#10=[IS TRUE($t9)], SAL=[$t5], $f1=[$t10])\n" + + " ArrowToEnumerableConverter\n" + + " ArrowTableScan(table=[[ARROW, EMP]], fields=[[0, 1, 2, 3, 4, 5, 6, 7]])\n\n"; + String result = "SALESSUM=2500.00\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .limit(2) + .returns(result) + .explainContains(plan); + } + + @Test void testFilteredAggGroupBy() { + String sql = "select SUM(SAL) FILTER (WHERE COMM > 400) as SALESSUM from EMP group by EMPNO"; + String plan = "PLAN=EnumerableCalc(expr#0..1=[{inputs}], SALESSUM=[$t1])\n" + + " EnumerableAggregate(group=[{0}], SALESSUM=[SUM($1) FILTER $2])\n" + + " EnumerableCalc(expr#0..7=[{inputs}], expr#8=[400], expr#9=[>($t6, $t8)], " + + "expr#10=[IS TRUE($t9)], EMPNO=[$t0], SAL=[$t5], $f2=[$t10])\n" + + " ArrowToEnumerableConverter\n" + + " ArrowTableScan(table=[[ARROW, EMP]], fields=[[0, 1, 2, 3, 4, 5, 6, 7]])\n\n"; + String result = "SALESSUM=1250.00\nSALESSUM=null\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .limit(2) + .returns(result) + .explainContains(plan); + } + + @Test void testAggGroupedByNullable() { + String sql = "select COMM, SUM(SAL) as SALESSUM from EMP GROUP BY COMM"; + String plan = "PLAN=EnumerableAggregate(group=[{6}], SALESSUM=[SUM($5)])\n" + + " ArrowToEnumerableConverter\n" + + " ArrowTableScan(table=[[ARROW, EMP]], fields=[[0, 1, 2, 3, 4, 5, 6, 7]])\n\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .returnsUnordered("COMM=0.00; SALESSUM=1500.00", + "COMM=1400.00; SALESSUM=1250.00", + "COMM=300.00; SALESSUM=1600.00", + "COMM=500.00; SALESSUM=1250.00", + "COMM=null; SALESSUM=23425.00") + .explainContains(plan); + } + + @Test void testArrowAdapterLimitNoSort() { + String sql = "select \"intField\"\n" + + "from arrowdata\n" + + "limit 2"; + String plan = "PLAN=EnumerableCalc(expr#0..3=[{inputs}], intField=[$t0])\n" + + " EnumerableLimit(fetch=[2])\n" + + " ArrowToEnumerableConverter\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + String result = "intField=0\nintField=1\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .returns(result) + .explainContains(plan); + } + + @Test void testArrowLimitOffsetNoSort() { + String sql = "select \"intField\"\n" + + "from arrowdata\n" + + "limit 2 offset 2"; + String plan = "PLAN=EnumerableCalc(expr#0..3=[{inputs}], intField=[$t0])\n" + + " EnumerableLimit(offset=[2], fetch=[2])\n" + + " ArrowToEnumerableConverter\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + String result = "intField=2\nintField=3\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .returns(result) + .explainContains(plan); + } + + @Test void testArrowSortOnLong() { + String sql = "select \"intField\" from arrowdata order by \"longField\" desc"; + String plan = "PLAN=EnumerableSort(sort0=[$1], dir0=[DESC])\n" + + " ArrowToEnumerableConverter\n" + + " ArrowProject(intField=[$0], longField=[$3])\n" + + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n"; + String result = "intField=49\nintField=48\n"; + + CalciteAssert.that() + .with(arrow) + .query(sql) + .limit(2) + .returns(result) + .explainContains(plan); + } +} diff --git a/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowData.java b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowData.java new file mode 100644 index 000000000000..3870bb2e115f --- /dev/null +++ b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowData.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.arrow.adapter.jdbc.ArrowVectorIterator; +import org.apache.arrow.adapter.jdbc.JdbcToArrow; +import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig; +import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder; +import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.FloatingPointVector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowFileWriter; +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.Text; + +import com.google.common.collect.ImmutableList; + +import net.hydromatic.scott.data.hsqldb.ScottHsqldb; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Path; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Calendar; +import java.util.List; + +/** + * Class that can be used to generate Arrow sample data into a data directory. + */ +public class ArrowData { + + private final int batchSize; + private final int entries; + private int intValue; + private int stringValue; + private float floatValue; + private long longValue; + + public ArrowData() { + this.batchSize = 20; + this.entries = 50; + this.intValue = 0; + this.stringValue = 0; + this.floatValue = 0; + this.longValue = 0; + } + + private Schema makeArrowSchema() { + ImmutableList.Builder childrenBuilder = ImmutableList.builder(); + FieldType intType = FieldType.nullable(new ArrowType.Int(32, true)); + FieldType stringType = FieldType.nullable(new ArrowType.Utf8()); + FieldType floatType = + FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)); + FieldType longType = FieldType.nullable(new ArrowType.Int(64, true)); + + childrenBuilder.add(new Field("intField", intType, null)); + childrenBuilder.add(new Field("stringField", stringType, null)); + childrenBuilder.add(new Field("floatField", floatType, null)); + childrenBuilder.add(new Field("longField", longType, null)); + + return new Schema(childrenBuilder.build(), null); + } + + public void writeScottEmpData(Path arrowDataDirectory) throws IOException, SQLException { + List tableNames = ImmutableList.of("EMP", "DEPT", "SALGRADE"); + + Connection connection = + DriverManager.getConnection(ScottHsqldb.URI, ScottHsqldb.USER, ScottHsqldb.PASSWORD); + + for (String tableName : tableNames) { + String sql = "SELECT * FROM " + tableName; + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(sql); + + Calendar calendar = JdbcToArrowUtils.getUtcCalendar(); + + RootAllocator rootAllocator = new RootAllocator(); + JdbcToArrowConfig config = new JdbcToArrowConfigBuilder() + .setAllocator(rootAllocator) + .setReuseVectorSchemaRoot(true) + .setCalendar(calendar) + .setTargetBatchSize(1024) + .build(); + + ArrowVectorIterator vectorIterator = JdbcToArrow.sqlToArrowVectorIterator(resultSet, config); + Path tablePath = arrowDataDirectory.resolve(tableName + ".arrow"); + + FileOutputStream fileOutputStream = new FileOutputStream(tablePath.toFile()); + + VectorSchemaRoot vectorSchemaRoot = vectorIterator.next(); + + ArrowFileWriter arrowFileWriter = + new ArrowFileWriter(vectorSchemaRoot, null, fileOutputStream.getChannel()); + + arrowFileWriter.start(); + arrowFileWriter.writeBatch(); + + while (vectorIterator.hasNext()) { + // refreshes the data in the VectorSchemaRoot with the next batch + vectorIterator.next(); + arrowFileWriter.writeBatch(); + } + + arrowFileWriter.close(); + } + } + + public void writeArrowData(File file) throws IOException { + FileOutputStream fileOutputStream = new FileOutputStream(file); + Schema arrowSchema = makeArrowSchema(); + VectorSchemaRoot vectorSchemaRoot = + VectorSchemaRoot.create(arrowSchema, new RootAllocator(Integer.MAX_VALUE)); + ArrowFileWriter arrowFileWriter = + new ArrowFileWriter(vectorSchemaRoot, null, fileOutputStream.getChannel()); + + arrowFileWriter.start(); + + for (int i = 0; i < this.entries;) { + int numRows = Math.min(this.batchSize, this.entries - i); + vectorSchemaRoot.setRowCount(numRows); + for (Field field : vectorSchemaRoot.getSchema().getFields()) { + FieldVector vector = vectorSchemaRoot.getVector(field.getName()); + switch (vector.getMinorType()) { + case INT: + intField(vector, numRows); + break; + case FLOAT4: + floatField(vector, numRows); + break; + case VARCHAR: + varCharField(vector, numRows); + break; + case BIGINT: + longField(vector, numRows); + break; + default: + throw new IllegalStateException("Not supported type yet: " + vector.getMinorType()); + } + } + arrowFileWriter.writeBatch(); + i += numRows; + } + arrowFileWriter.end(); + arrowFileWriter.close(); + fileOutputStream.flush(); + fileOutputStream.close(); + } + + private void intField(FieldVector fieldVector, int rowCount) { + IntVector intVector = (IntVector) fieldVector; + intVector.setInitialCapacity(rowCount); + intVector.allocateNew(); + for (int i = 0; i < rowCount; i++) { + intVector.set(i, 1, intValue); + this.intValue++; + } + fieldVector.setValueCount(rowCount); + } + + private void floatField(FieldVector fieldVector, int rowCount) { + FloatingPointVector floatingPointVector = (FloatingPointVector) fieldVector; + floatingPointVector.setInitialCapacity(rowCount); + floatingPointVector.allocateNew(); + for (int i = 0; i < rowCount; i++) { + float value = this.floatValue; + floatingPointVector.setWithPossibleTruncate(i, value); + this.floatValue++; + } + fieldVector.setValueCount(rowCount); + } + + private void varCharField(FieldVector fieldVector, int rowCount) { + VarCharVector varCharVector = (VarCharVector) fieldVector; + varCharVector.setInitialCapacity(rowCount); + varCharVector.allocateNew(); + for (int i = 0; i < rowCount; i++) { + String value = String.valueOf(this.stringValue); + varCharVector.set(i, new Text(value)); + this.stringValue++; + } + fieldVector.setValueCount(rowCount); + } + + private void longField(FieldVector fieldVector, int rowCount) { + BigIntVector longVector = (BigIntVector) fieldVector; + longVector.setInitialCapacity(rowCount); + longVector.allocateNew(); + for (int i = 0; i < rowCount; i++) { + longVector.set(i, this.longValue); + this.longValue++; + } + fieldVector.setValueCount(rowCount); + } +} diff --git a/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowExtension.java b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowExtension.java new file mode 100644 index 000000000000..e5edb9196555 --- /dev/null +++ b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowExtension.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.arrow; + +import org.apache.calcite.config.CalciteSystemProperty; + +import org.apache.arrow.gandiva.evaluator.Projector; +import org.apache.arrow.gandiva.exceptions.GandivaException; +import org.apache.arrow.gandiva.expression.ExpressionTree; +import org.apache.arrow.vector.types.pojo.Schema; + +import org.junit.jupiter.api.extension.ConditionEvaluationResult; +import org.junit.jupiter.api.extension.ExecutionCondition; +import org.junit.jupiter.api.extension.ExtensionContext; + +import java.util.ArrayList; +import java.util.List; + +/** + * JUnit5 extension to handle Arrow tests. + * + *

Tests will be skipped if the Gandiva library cannot be loaded on the given platform. + */ +class ArrowExtension implements ExecutionCondition { + + /** + * Whether to run this test. + * + *

Enabled by default, unless explicitly disabled from command line + * ({@code -Dcalcite.test.arrow=false}) or if Gandiva library, used to implement arrow + * filtering/projection, cannot be loaded. + * + * @return {@code true} if the test is enabled and can run in the current environment, + * {@code false} otherwise + */ + @Override public ConditionEvaluationResult evaluateExecutionCondition( + final ExtensionContext context) { + + boolean enabled = CalciteSystemProperty.TEST_ARROW.value(); + try { + Schema emptySchema = new Schema(new ArrayList<>(), null); + List expressions = new ArrayList<>(); + Projector.make(emptySchema, expressions); + } catch (GandivaException e) { + // this exception comes from using an empty expression, + // but the JNI library was loaded properly + } catch (UnsatisfiedLinkError e) { + enabled = false; + } + + if (enabled) { + return ConditionEvaluationResult.enabled("Arrow tests enabled"); + } else { + return ConditionEvaluationResult.disabled("Cassandra tests disabled"); + } + } +} diff --git a/arrow/src/test/resources/arrow-model.json b/arrow/src/test/resources/arrow-model.json new file mode 100644 index 000000000000..fed2210b0309 --- /dev/null +++ b/arrow/src/test/resources/arrow-model.json @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{ + "version": "1.0", + "defaultSchema": "ARROW", + "schemas": [ + { + "name": "ARROW", + "type": "custom", + "factory": "org.apache.calcite.adapter.arrow.ArrowSchemaFactory", + "operand": { + "directory": "arrow" + } + } + ] +} diff --git a/bom/build.gradle.kts b/bom/build.gradle.kts index 79fb7cc56bb6..3cc33230daec 100644 --- a/bom/build.gradle.kts +++ b/bom/build.gradle.kts @@ -97,6 +97,10 @@ dependencies { apiv("net.java.dev.jna:jna") apiv("net.java.dev.jna:jna-platform") apiv("net.sf.opencsv:opencsv") + apiv("org.apache.arrow:arrow-memory-netty", "arrow") + apiv("org.apache.arrow:arrow-vector", "arrow") + apiv("org.apache.arrow:arrow-jdbc", "arrow") + apiv("org.apache.arrow.gandiva:arrow-gandiva", "arrow-gandiva") apiv("org.apache.calcite.avatica:avatica-core", "calcite.avatica") apiv("org.apache.calcite.avatica:avatica-server", "calcite.avatica") apiv("org.apache.cassandra:cassandra-all") diff --git a/build.gradle.kts b/build.gradle.kts index 3ad31fedacef..579d4bd907bb 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -212,7 +212,7 @@ val javadocAggregateIncludingTests by tasks.registering(Javadoc::class) { } val adaptersForSqlline = listOf( - ":babel", ":cassandra", ":druid", ":elasticsearch", + ":arrow", ":babel", ":cassandra", ":druid", ":elasticsearch", ":file", ":geode", ":innodb", ":kafka", ":mongodb", ":pig", ":piglet", ":plus", ":redis", ":spark", ":splunk") diff --git a/core/src/main/java/org/apache/calcite/config/CalciteSystemProperty.java b/core/src/main/java/org/apache/calcite/config/CalciteSystemProperty.java index 4db1b306a051..4d454c69fac2 100644 --- a/core/src/main/java/org/apache/calcite/config/CalciteSystemProperty.java +++ b/core/src/main/java/org/apache/calcite/config/CalciteSystemProperty.java @@ -210,6 +210,12 @@ public final class CalciteSystemProperty { return "."; }); + /** + * Whether to run Arrow tests. + */ + public static final CalciteSystemProperty TEST_ARROW = + booleanProperty("calcite.test.arrow", true); + /** * Whether to run MongoDB tests. */ diff --git a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java index b98cc91ea48a..6280e11fdf2a 100644 --- a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java +++ b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java @@ -342,6 +342,8 @@ public enum BuiltInMethod { SORTED_MULTI_MAP_ARRAYS(SortedMultiMap.class, "arrays", Comparator.class), SORTED_MULTI_MAP_SINGLETON(SortedMultiMap.class, "singletonArrayIterator", Comparator.class, List.class), + IMMUTABLE_INT_LIST_IDENTITY(ImmutableIntList.class, "identity", int.class), + IMMUTABLE_INT_LIST_COPY_OF(ImmutableIntList.class, "copyOf", int[].class), BINARY_SEARCH5_LOWER(BinarySearch.class, "lowerBound", Object[].class, Object.class, int.class, int.class, Comparator.class), BINARY_SEARCH5_UPPER(BinarySearch.class, "upperBound", Object[].class, diff --git a/core/src/main/java/org/apache/calcite/util/ImmutableIntList.java b/core/src/main/java/org/apache/calcite/util/ImmutableIntList.java index cb34fe3d5397..c8448aa9bcbc 100644 --- a/core/src/main/java/org/apache/calcite/util/ImmutableIntList.java +++ b/core/src/main/java/org/apache/calcite/util/ImmutableIntList.java @@ -70,13 +70,25 @@ public static ImmutableIntList of() { * Creates an ImmutableIntList from an array of {@code int}. */ public static ImmutableIntList of(int... ints) { + if (ints.length == 0) { + return EMPTY; + } return new ImmutableIntList(ints.clone()); } + /** Same as {@link #of(int...)}, but less ambiguous for code generators + * and compilers. */ + public static ImmutableIntList copyOf(int... ints) { + return of(ints); + } + /** * Creates an ImmutableIntList from an array of {@code Number}. */ public static ImmutableIntList copyOf(Number... numbers) { + if (numbers.length == 0) { + return EMPTY; + } final int[] ints = new int[numbers.length]; for (int i = 0; i < ints.length; i++) { ints[i] = numbers[i].intValue(); @@ -108,6 +120,9 @@ public static ImmutableIntList copyOf(Iterator list) { private static ImmutableIntList copyFromCollection( Collection list) { + if (list.isEmpty()) { + return EMPTY; + } final int[] ints = new int[list.size()]; int i = 0; for (Number number : list) { @@ -192,7 +207,7 @@ public int[] toIntArray() { return ints.clone(); } - /** Returns an List of {@code Integer}. */ + /** Returns a List of {@code Integer}. */ public List toIntegerList() { ArrayList arrayList = new ArrayList<>(size()); for (int i : ints) { @@ -283,6 +298,9 @@ public static List range(final int lower, final int upper) { * @see Mappings#isIdentity(List, int) */ public static ImmutableIntList identity(int count) { + if (count == 0) { + return EMPTY; + } final int[] integers = new int[count]; for (int i = 0; i < integers.length; i++) { integers[i] = i; diff --git a/gradle.properties b/gradle.properties index 51e13a26c701..1bf19c138e26 100644 --- a/gradle.properties +++ b/gradle.properties @@ -83,6 +83,8 @@ jandex.version=2.2.3.Final # elasticsearch does not like asm:6.2.1+ aggdesigner-algorithm.version=6.0 apiguardian-api.version=1.1.2 +arrow-gandiva.version=15.0.0 +arrow.version=15.0.0 asm.version=7.2 byte-buddy.version=1.9.3 cassandra-all.version=4.1.3 diff --git a/settings.gradle.kts b/settings.gradle.kts index a5ea5367709d..adb373f79b9b 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -67,6 +67,7 @@ rootProject.name = "calcite" include( "bom", "release", + "arrow", "babel", "cassandra", "core", diff --git a/site/_docs/adapter.md b/site/_docs/adapter.md index 1ceff6518a0c..844b4a96be9a 100644 --- a/site/_docs/adapter.md +++ b/site/_docs/adapter.md @@ -27,6 +27,7 @@ limitations under the License. A schema adapter allows Calcite to read particular kind of data, presenting the data as tables within a schema. +* [Arrow adapter](arrow_adapter.html) (calcite-arrow) * [Cassandra adapter](cassandra_adapter.html) (calcite-cassandra) * CSV adapter (example/csv) * [Druid adapter](druid_adapter.html) (calcite-druid) diff --git a/site/_docs/arrow_adapter.md b/site/_docs/arrow_adapter.md new file mode 100644 index 000000000000..2dc0a1c99274 --- /dev/null +++ b/site/_docs/arrow_adapter.md @@ -0,0 +1,83 @@ +--- +layout: docs +title: Arrow adapter +permalink: /docs/arrow_adapter.html +--- + + +**Note**: Arrow Adapter is an experimental feature; +changes in public API and usage are expected. + +## Overview + +Calcite's adapter for Apache Arrow is able to read and process data in Arrow +format using SQL. + +It can read files in Arrow's +[Feather format](https://arrow.apache.org/docs/python/feather.html) +(which generally have a `.arrow` suffix) in the same way that the +[File Adapter](file_adapter.html) can read `.csv` files. + +## A simple example + +Let's start with a simple example. First, we need a +[model definition]({{ site.baseurl }}/docs/model.html), +as follows. + +{% highlight json %} +{ + "version": "1.0", + "defaultSchema": "ARROW", + "schemas": [ + { + "name": "ARROW", + "type": "custom", + "factory": "org.apache.calcite.adapter.arrow.ArrowSchemaFactory", + "operand": { + "directory": "arrow" + } + } + ] +} +{% endhighlight %} + +The model file is stored as `arrow/src/test/resources/arrow-model.json`, +so you can connect via [`sqlline`](https://github.com/julianhyde/sqlline) +as follows: + +{% highlight bash %} +$ ./sqlline +sqlline> !connect jdbc:calcite:model=arrow/src/test/resources/arrow-model.json admin admin +sqlline> select * from arrow.test; ++----------+----------+------------+ +| fieldOne | fieldTwo | fieldThree | ++----------+----------+------------+ +| 1 | abc | 1.2 | +| 2 | def | 3.4 | +| 3 | xyz | 5.6 | +| 4 | abcd | 1.22 | +| 5 | defg | 3.45 | +| 6 | xyza | 5.67 | ++----------+----------+------------+ +6 rows selected +{% endhighlight %} + +The `arrow` directory contains a file called `test.arrow`, and so it shows up as +a table called `test`.