Skip to content

Commit

Permalink
Merge branch 'feature/faster-csv-parser' into 'development'
Browse files Browse the repository at this point in the history
Faster CSV library using SimpleFlatMapper

Closes #16

See merge request rml/proc/dataio!17
  • Loading branch information
ghsnd committed Oct 30, 2023
2 parents 5cc0b6f + ab587d3 commit dc40d10
Show file tree
Hide file tree
Showing 15 changed files with 443 additions and 69 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed
- Require Java 17 (or more recent)
- Use SimpleFlatMapper (SFM) for CSV parsing

### Fixed
- Updated Maven Surefire plugin to 3.1.2
Expand Down
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@
<artifactId>opencsv</artifactId>
<version>5.8</version>
</dependency>
<dependency>
<groupId>org.simpleflatmapper</groupId>
<artifactId>sfm-csv</artifactId>
<version>8.2.3</version>
</dependency>

<!--======================================-->
<!-- Microsoft document format processing -->
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package be.ugent.idlab.knows.dataio.access;

/**
 * Holder for compression identifier constants.
 *
 * <p>This class only exposes static constants; it is not meant to be instantiated or subclassed.
 */
public final class COMPRESSION {
    /** Identifier string for gzip compression. */
    public static final String GZIP = "gzip";

    private COMPRESSION() {
        // Constants only: prevent instantiation.
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,25 @@
import be.ugent.idlab.knows.dataio.iterators.csvw.CSVWConfiguration;
import be.ugent.idlab.knows.dataio.record.CSVRecord;
import be.ugent.idlab.knows.dataio.record.Record;
import com.opencsv.CSVReader;
import com.opencsv.CSVReaderBuilder;
import com.opencsv.exceptions.CsvValidationException;
import be.ugent.idlab.knows.dataio.utils.CSVNullInjector;
import org.simpleflatmapper.lightningcsv.CsvParser;

import java.io.*;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;

public class CSVWSourceIterator extends SourceIterator {
@Serial
private static final long serialVersionUID = -5824558388620967495L;
private static final int BUFFER_SIZE = 1024 * 128; // 128 KiB
private final Access access;
private final CSVWConfiguration config;
private transient String[] header;
private transient String[] next;
private transient CSVReader reader;
private transient InputStreamReader inputReader;
private transient Iterator<String[]> iterator;

public CSVWSourceIterator(Access access, CSVWConfiguration config) throws Exception {
this.access = access;
Expand All @@ -34,55 +36,49 @@ private void readObject(ObjectInputStream inputStream) throws Exception {
this.bootstrap();
}

/**
* Instantiates transient fields. This code needs to be run both at construction time and after deserialization
*/
private void bootstrap() throws Exception {
this.reader = new CSVReaderBuilder(new InputStreamReader(access.getInputStream(), config.getEncoding()))
.withCSVParser(this.config.getParser())
.withSkipLines(this.config.isSkipHeader() ? 1 : 0)
.build();
this.inputReader = new InputStreamReader(access.getInputStream(), this.config.getEncoding());
CSVNullInjector injector = new CSVNullInjector(inputReader, BUFFER_SIZE, this.config.getDelimiter(), this.config.getQuoteCharacter());

CsvParser.DSL parser = config.getSFMParser(BUFFER_SIZE);
this.iterator = parser.iterator(injector.reader());

if (this.config.isSkipHeader()) {
this.header = config.getHeader().toArray(new String[0]);
} else {
this.header = readLine();

if (header == null) {
throw new IllegalStateException("Unable to read the file!");
}
this.header = nextLine();
}

this.next = readLine();
this.next = nextLine();
}

private String[] readLine() throws IOException {
String[] line;
do {
try {
line = this.reader.readNext();
private String[] nextLine() {
if (this.iterator.hasNext()) {
String[] r = this.iterator.next();
// go over the lines till uncommented line found
while (r[0].startsWith(config.getCommentPrefix()) && this.iterator.hasNext()) {
r = this.iterator.next();
}

if (line == null) {
return null;
}
} catch (CsvValidationException e) {
throw new IllegalArgumentException(String.format("File does not conform to configuration! Offending line: %s", Arrays.toString(this.reader.peek())));
if (r[0].startsWith(config.getCommentPrefix())) {
return null;
}
} while (invalidLine(line));

return line;
}
// replace any occurrence of an escaped quote with a single quote
for (int i = 0; i < r.length; i++) {
String s = r[i];
// trim the string that is quoted
if (s.startsWith("\"") && s.endsWith("\"")) {
s = s.substring(1, s.length() - 1);
}

/**
* Checks if the passed line corresponds to the filters set
* A line is considered valid if it doesn't start with the comment prefix
* If the first value is null, the line is accepted
*
* @param line line to be checked
* @return true if the line passes all checks
*/
private boolean invalidLine(String[] line) {
return line[0] != null && line[0].startsWith(this.config.getCommentPrefix());
s = s.replaceAll("\"\"", "\"");
r[i] = s;
}

return r;
}
return null;
}

/**
Expand All @@ -94,7 +90,7 @@ private boolean invalidLine(String[] line) {
public CSVRecord replaceNulls(CSVRecord record) {
Map<String, String> data = record.getData();
data.forEach((key, value) -> {
if (value != null && this.config.getNulls().contains(value)) {
if (this.config.getNulls().contains(value)) {
data.put(key, null);
}
});
Expand Down Expand Up @@ -122,13 +118,9 @@ public Record next() {
if (this.next == null) {
throw new NoSuchElementException();
}

String[] line = this.next;
try {
this.next = readLine();
} catch (IOException e) {
throw new RuntimeException(e);
}

this.next = nextLine();

if (!config.getTrim().equals("false")) {
line = applyTrimArray(line, config.getTrim());
Expand All @@ -144,6 +136,6 @@ public boolean hasNext() {

@Override
public void close() throws IOException {
this.reader.close();
this.inputReader.close();
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package be.ugent.idlab.knows.dataio.iterators.csvw;

import com.opencsv.CSVParser;
import com.opencsv.CSVParserBuilder;
import com.opencsv.enums.CSVReaderNullFieldIndicator;

import be.ugent.idlab.knows.dataio.utils.CSVNullInjector;
import org.simpleflatmapper.lightningcsv.CsvParser;

import java.io.Serial;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/**
Expand Down Expand Up @@ -49,7 +50,11 @@ public final class CSVWConfiguration implements Serializable {
this.skipHeader = skipHeader;
this.commentPrefix = commentPrefix;
this.header = header;
this.nulls = nulls;

List<String> nullValues = new ArrayList<>(nulls);
nullValues.add(CSVNullInjector.NULL_VALUE); // add our special null value

this.nulls = nullValues;
this.encoding = encoding;
}

Expand Down Expand Up @@ -93,12 +98,11 @@ public String getEncoding() {
return encoding;
}

public CSVParser getParser() {
return new CSVParserBuilder()
.withSeparator(this.delimiter)
.withEscapeChar(this.escapeCharacter)
.withQuoteChar(this.quoteCharacter)
.withFieldAsNull(CSVReaderNullFieldIndicator.EMPTY_SEPARATORS)
.build();
/**
 * Builds a SimpleFlatMapper CSV parser DSL from this configuration.
 * The delimiter, escape character and quote character held by this configuration are applied,
 * together with the requested buffer size.
 *
 * @param bufferSize value handed to the parser's {@code bufferSize} setting
 * @return the configured {@link CsvParser.DSL}
 */
public CsvParser.DSL getSFMParser(int bufferSize) {
return CsvParser
.separator(this.delimiter)
.escape(this.escapeCharacter)
.quote(this.quoteCharacter)
.bufferSize(bufferSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class CSVRecord extends Record {
private final Map<String, String> datatypes;

public CSVRecord(String[] header, String[] data, Map<String, String> datatypes) {
this.data = new HashMap<>();
this.data = new HashMap<>(header.length);
if (header.length > data.length) {
logger.warn("Header has more columns than this row");
}
Expand All @@ -26,7 +26,7 @@ public CSVRecord(String[] header, String[] data, Map<String, String> datatypes)
if (i < data.length) {
this.data.put(header[i], data[i]);
} else {
this.data.put(header[i], "");
this.data.put(header[i], null);
}
}
this.datatypes = datatypes;
Expand Down
Loading

0 comments on commit dc40d10

Please sign in to comment.