Feature/ant 2636 parquet writesupport #27

Draft
wants to merge 15 commits into develop
42 changes: 38 additions & 4 deletions pom.xml
@@ -29,6 +29,8 @@
<properties>
<java.version>21</java.version>
<open-api-doc.version>2.0.3</open-api-doc.version>
<parquet.version>1.15.0</parquet.version>
<hadoop.version>3.3.6</hadoop.version>
<sonar.organization>antaressimulatorteam</sonar.organization>
<sonar.projectKey>AntaresSimulatorTeam_antares-datamanager-back</sonar.projectKey>
<sonar.host.url>https://sonarcloud.io</sonar.host.url>
@@ -54,10 +56,6 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
@@ -94,6 +92,12 @@
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>5.2.3</version>
<exclusions>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
@@ -162,6 +166,36 @@
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>

<!-- Parquet Avro dependency -->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>${parquet.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>

</dependencies>

<build>
ColumnType.java (new file)
@@ -0,0 +1,15 @@
/**
* Copyright (c) 2023, RTE (http://www.rte-france.com)
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package com.rte_france.antares.datamanager_back.util;

/**
* @author Sylvain Leclerc <sylvain.leclerc@rte-france.com>
*/
public enum ColumnType {
INT,
FLOAT
}
Matrix.java (new file)
@@ -0,0 +1,25 @@
/**
* Copyright (c) 2023, RTE (http://www.rte-france.com)
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package com.rte_france.antares.datamanager_back.util;

import lombok.Value;

import java.util.List;

@Value
public class Matrix {

List<MatrixColumn> columns;

public int getRowCount() {
if (columns.isEmpty()) {
return 0;
}
return columns.getFirst().getSize();
}

}
MatrixColumn.java (new file)
@@ -0,0 +1,33 @@
/**
* Copyright (c) 2025, RTE (http://www.rte-france.com)
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package com.rte_france.antares.datamanager_back.util;

import java.util.Arrays;
import java.util.Objects;

public record MatrixColumn(String name, double[] values) {
public int getSize() {
return values.length;
}

public MatrixColumn {
Objects.requireNonNull(name);
Objects.requireNonNull(values);
}

public MatrixColumn renamed(String newName) {
return new MatrixColumn(newName, values);
}

@Override
public String toString() {
return "MatrixColumn{" +
"name='" + name + '\'' +
", values=" + Arrays.toString(values) +
'}';
}
}
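A minimal usage sketch, not part of this PR, showing how MatrixColumn and Matrix compose; the sketch class, column names and values below are made up for illustration.

package com.rte_france.antares.datamanager_back.util;

import java.util.List;

class MatrixUsageSketch {
// Illustrative only: two hypothetical columns, three rows each.
static Matrix sample() {
var load = new MatrixColumn("load", new double[]{1.0, 2.0, 3.0});
var wind = new MatrixColumn("wind", new double[]{0.5, 0.7, 0.9});
var matrix = new Matrix(List.of(load, wind));
assert matrix.getRowCount() == 3; // row count is taken from the first column
return matrix;
}
}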
MatrixGroupConverter.java (new file)
@@ -0,0 +1,73 @@
package com.rte_france.antares.datamanager_back.util;

import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.GroupType;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class MatrixGroupConverter extends GroupConverter {
private final List<MatrixColumn> columns = new ArrayList<>();
private final List<String> columnNames;
// One shared value buffer per column, filled row by row by the primitive converters.
private final List<double[]> columnValues = new ArrayList<>();
private final int rowCount;

public MatrixGroupConverter(GroupType schema, int rowCount) {
Objects.requireNonNull(schema);
this.rowCount = Objects.checkIndex(rowCount, 8761);
> Review comment (Contributor Author): not sure we need the checkIndex method here to validate rowCount; if rowCount is 8761, row indices only run from 0 to 8760.
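For context on the comment above: Objects.checkIndex(index, length) accepts values in [0, length), so checkIndex(rowCount, 8761) caps rowCount at 8760. Illustrative calls, not part of the diff:

Objects.checkIndex(8760, 8761); // returns 8760: valid, since 8760 < 8761
Objects.checkIndex(8761, 8761); // throws IndexOutOfBoundsException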
this.columnNames = new ArrayList<>();
for (var field : schema.getFields()) {
columnNames.add(field.getName());
// Allocate one buffer per column up front so decoded values have somewhere to land.
columnValues.add(new double[rowCount]);
}
}

@Override
public Converter getConverter(int fieldIndex) {
// Each primitive converter writes into its column's shared buffer; all values are widened to double.
double[] values = columnValues.get(fieldIndex);
return new PrimitiveConverter() {
private int currentIndex = 0;

@Override
public void addDouble(double value) {
values[currentIndex++] = value;
}

@Override
public void addFloat(float value) {
values[currentIndex++] = value;
}

@Override
public void addInt(int value) {
values[currentIndex++] = value;
}

@Override
public void addLong(long value) {
values[currentIndex++] = value;
}

@Override
public void addBoolean(boolean value) {
values[currentIndex++] = value ? 1.0 : 0.0;
}
};
}

@Override
public void start() {}

@Override
public void end() {
// Called after every record: materialize the columns once, backed by the shared value buffers,
// instead of re-adding zero-filled arrays for each row.
if (columns.isEmpty()) {
for (int i = 0; i < columnNames.size(); i++) {
columns.add(new MatrixColumn(columnNames.get(i), columnValues.get(i)));
}
}
}

public Matrix getMatrix() {
return new Matrix(columns);
}
}
MatrixReadSupport.java (new file)
@@ -0,0 +1,46 @@
package com.rte_france.antares.datamanager_back.util;

import org.apache.parquet.hadoop.api.InitContext;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.schema.MessageType;

import java.util.Map;
import java.util.Objects;

public class MatrixReadSupport extends ReadSupport<Matrix> {
private final int rowCount;

public MatrixReadSupport(int rowCount) {
Objects.checkIndex(rowCount, 8761);
this.rowCount = rowCount;
}

@Override
public ReadContext init(InitContext context) {
var schema = context.getFileSchema();
return new ReadContext(schema);
}

@Override
public RecordMaterializer<Matrix> prepareForRead(
org.apache.hadoop.conf.Configuration configuration,
Map<String, String> keyValueMetaData,
MessageType fileSchema,
ReadContext readContext) {
return new RecordMaterializer<Matrix>() {
private final MatrixGroupConverter converter = new MatrixGroupConverter(fileSchema, rowCount);

@Override
public Matrix getCurrentRecord() {
return converter.getMatrix();
}

@Override
public GroupConverter getRootConverter() {
return converter;
}
};
}
}
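A hedged sketch, not part of this PR, of how MatrixReadSupport could be driven. It assumes the deprecated but still-available ParquetReader.builder(ReadSupport, Path) factory from parquet-hadoop and a caller that knows the expected row count up front; the sketch class and method names are made up.

package com.rte_france.antares.datamanager_back.util;

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetReader;

class MatrixParquetReaderSketch {
static Matrix read(Path source, int rowCount) throws IOException {
Matrix matrix = null;
try (ParquetReader<Matrix> reader = ParquetReader.builder(new MatrixReadSupport(rowCount), source).build()) {
// The group converter accumulates every record into shared column buffers,
// so the Matrix returned for the last record holds the whole file.
Matrix current;
while ((current = reader.read()) != null) {
matrix = current;
}
}
return matrix;
}
}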
MatrixRow.java (new file)
@@ -0,0 +1,10 @@
package com.rte_france.antares.datamanager_back.util;

import lombok.Value;

@Value
class MatrixRow {
Matrix matrix;
int row;
}

MatrixWriteSupport.java (new file)
@@ -0,0 +1,78 @@
/**
* Copyright (c) 2023, RTE (http://www.rte-france.com)
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package com.rte_france.antares.datamanager_back.util;

import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

import java.util.Collections;
import java.util.stream.Collectors;

class MatrixWriteSupport extends WriteSupport<MatrixRow> {

private final Matrix matrix;
private RecordConsumer consumer;

MatrixWriteSupport(Matrix matrix) {
this.matrix = matrix;
}

@Override
public WriteContext init(org.apache.hadoop.conf.Configuration configuration) {
var colTypes = matrix.getColumns().stream()
.map(c -> doubleType(c.name()))
.collect(Collectors.toList());

var schema = new MessageType("matrix", colTypes);
return new WriteContext(schema, Collections.emptyMap());
}

private static Type type(ColumnType type, String name) {
return switch (type) {
case INT -> intType(name);
case FLOAT -> floatType(name);
default -> throw new IllegalArgumentException("Unknown column type " + type);
};
}

private static Type floatType(String name) {
return Types.primitive(PrimitiveType.PrimitiveTypeName.FLOAT, Type.Repetition.REQUIRED)
.named(name);
}

private static Type intType(String name) {
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED)
.named(name);
}

private static Type doubleType(String name) {
return Types.primitive(PrimitiveType.PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED)
.named(name);
}

@Override
public void prepareForWrite(RecordConsumer recordConsumer) {
this.consumer = recordConsumer;
}

@Override
public void write(MatrixRow matrixRow) {
consumer.startMessage();
var columnIndex = 0;
for (var c : matrixRow.getMatrix().getColumns()) {
consumer.startField(c.name(), columnIndex);
consumer.addDouble(c.values()[matrixRow.getRow()]);
consumer.endField(c.name(), columnIndex);
columnIndex++;
}
consumer.endMessage();
}
}
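A hedged sketch, not part of this PR, of the write side: one MatrixRow per row, one Parquet record per write() call. It assumes the deprecated ParquetWriter(Path, WriteSupport) constructor, still present in parquet-hadoop, and sits in the same package because MatrixWriteSupport and MatrixRow are package-private; the sketch class and method names are made up.

package com.rte_france.antares.datamanager_back.util;

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetWriter;

class MatrixParquetWriterSketch {
static void write(Matrix matrix, Path target) throws IOException {
try (ParquetWriter<MatrixRow> writer = new ParquetWriter<>(target, new MatrixWriteSupport(matrix))) {
// One Parquet record per matrix row; write() re-reads every column at that row index.
for (int row = 0; row < matrix.getRowCount(); row++) {
writer.write(new MatrixRow(matrix, row));
}
}
}
}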