Skip to content

Commit

Permalink
Merge pull request #60 from DiceTechnology/feature/different_ways_of_…
Browse files Browse the repository at this point in the history
…ip_storage

Adding possibility to store IPs in memory
  • Loading branch information
fifol-img authored Jun 13, 2019
2 parents 29af09e + 71089f1 commit ff7c63b
Show file tree
Hide file tree
Showing 9 changed files with 158 additions and 49 deletions.
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,14 @@ wip
## Internals
wip: stages of processing, threading etc
### Line reader
wip

`Line reader` aka. `Provider` processes the raw data into easy to look up format, to achieve optimal performance. Depends on characteristic of your application you can choose one of many storage types:
* StorageMode.HEAP
* StorageMode.HEAP_BYTE_ARRAY
* StorageMode.OFF_HEAP
* StorageMode.FILE

Those are directly linked to mapdb modes described [here](http://www.mapdb.org/book/performance/). Default one is `StorageMode.FILE`
### Line processor
wip
### Database builder
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>technology.dice.open</groupId>
<artifactId>dice-where</artifactId>
<version>1.0.5</version>
<version>1.0.6</version>
<name>dice-where</name>

<description>dice-where is a low memory footprint, highly efficient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,9 @@

package technology.dice.dicewhere.building;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Queues;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.google.protobuf.ByteString;
import org.jetbrains.annotations.NotNull;
import org.mapdb.DB;
import org.mapdb.DBException;
import org.mapdb.DBMaker;
Expand All @@ -32,11 +20,18 @@
import technology.dice.dicewhere.lineprocessing.SerializedLine;
import technology.dice.dicewhere.lineprocessing.serializers.IPSerializer;
import technology.dice.dicewhere.lineprocessing.serializers.protobuf.IPInformationProto;
import technology.dice.dicewhere.parsing.ParsedLine;
import technology.dice.dicewhere.provider.ProviderKey;
import technology.dice.dicewhere.utils.IPUtils;
import technology.dice.dicewhere.utils.ProtoValueConverter;

import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

public class DatabaseBuilder implements Runnable {
private final BlockingQueue<SerializedLine> source;
private final DatabaseBuilderListener listener;
Expand All @@ -51,19 +46,29 @@ public DatabaseBuilder(
BlockingQueue<SerializedLine> source,
DatabaseBuilderListener listener,
Decorator<? extends DecoratorInformation> decorator) {
this(StorageMode.FILE, provider, source, listener, decorator);
}

public DatabaseBuilder(
StorageMode storageMode,
ProviderKey provider,
BlockingQueue<SerializedLine> source,
DatabaseBuilderListener listener) {

this(storageMode, provider, source, listener, null);
}

public DatabaseBuilder(
StorageMode storageMode,
ProviderKey provider,
BlockingQueue<SerializedLine> source,
DatabaseBuilderListener listener,
Decorator<? extends DecoratorInformation> decorator) {
this.source = source;
this.expectingMore = true;
this.listener = listener;
this.provider = provider;
DB db =
DBMaker.tempFileDB()
.checksumHeaderBypass()
.fileLockDisable()
.fileMmapEnable()
.fileChannelEnable()
.transactionEnable()
.fileDeleteAfterClose()
.make();
DB db = createDB(storageMode);

DB.TreeMapSink<IP, byte[]> sink =
db.treeMap(
Expand All @@ -73,12 +78,33 @@ public DatabaseBuilder(
this.decorator = decorator;
}

public DatabaseBuilder(
ProviderKey provider,
BlockingQueue<SerializedLine> source,
DatabaseBuilderListener listener) {

this(provider, source, listener, null);
@NotNull
private DB createDB(StorageMode storageMode) {
DB db;
switch (storageMode) {
case HEAP:
db = DBMaker.heapDB().checksumHeaderBypass().transactionEnable().make();
break;
case HEAP_BYTE_ARRAY:
db = DBMaker.memoryDB().checksumHeaderBypass().transactionEnable().make();
break;
case OFF_HEAP:
db = DBMaker.memoryDirectDB().checksumHeaderBypass().transactionEnable().make();
break;
case FILE:
default:
db =
DBMaker.tempFileDB()
.checksumHeaderBypass()
.fileLockDisable()
.fileMmapEnable()
.fileChannelEnable()
.transactionEnable()
.fileDeleteAfterClose()
.make();
break;
}
return db;
}

public void dontExpectMore() {
Expand Down Expand Up @@ -154,4 +180,11 @@ private IPInformationProto.IpInformationProto buildIpProtobuf(IpInformation inpu
public IPDatabase build() {
return new IPDatabase(sink.create());
}

public enum StorageMode {
HEAP,
HEAP_BYTE_ARRAY,
OFF_HEAP,
FILE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package technology.dice.dicewhere.provider.dbip.reading;

import technology.dice.dicewhere.building.DatabaseBuilder;
import technology.dice.dicewhere.decorator.Decorator;
import technology.dice.dicewhere.decorator.DecoratorInformation;
import technology.dice.dicewhere.parsing.LineParser;
Expand All @@ -15,12 +16,18 @@
public class DbIpCityLiteLineReader extends DbIpLineReader {

public DbIpCityLiteLineReader(Path csv) {
super(csv);
this(csv, null);
}

public DbIpCityLiteLineReader(Path csv, Decorator<? extends DecoratorInformation> decorator) {
this(csv, decorator, DatabaseBuilder.StorageMode.FILE);
}

public DbIpCityLiteLineReader(
Path csv, Decorator<? extends DecoratorInformation> decorator) {
super(csv, decorator);
Path csv,
Decorator<? extends DecoratorInformation> decorator,
DatabaseBuilder.StorageMode storageMode) {
super(csv, decorator, storageMode);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package technology.dice.dicewhere.provider.dbip.reading;

import technology.dice.dicewhere.building.DatabaseBuilder;
import technology.dice.dicewhere.decorator.Decorator;
import technology.dice.dicewhere.decorator.DecoratorInformation;
import technology.dice.dicewhere.parsing.LineParser;
Expand All @@ -15,12 +16,18 @@
public class DbIpCountryLiteLineReader extends DbIpLineReader {

public DbIpCountryLiteLineReader(Path csv) {
super(csv);
this(csv, null);
}

public DbIpCountryLiteLineReader(Path csv, Decorator<? extends DecoratorInformation> decorator) {
this(csv, decorator, DatabaseBuilder.StorageMode.FILE);
}

public DbIpCountryLiteLineReader(
Path csv, Decorator<? extends DecoratorInformation> decorator) {
super(csv, decorator);
Path csv,
Decorator<? extends DecoratorInformation> decorator,
DatabaseBuilder.StorageMode storageMode) {
super(csv, decorator, storageMode);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@

package technology.dice.dicewhere.provider.dbip.reading;

import java.io.IOException;
import java.nio.file.Path;
import java.util.stream.Stream;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import technology.dice.dicewhere.building.DatabaseBuilder;
import technology.dice.dicewhere.decorator.Decorator;
import technology.dice.dicewhere.decorator.DecoratorInformation;
import technology.dice.dicewhere.parsing.LineParser;
import technology.dice.dicewhere.provider.ProviderKey;
import technology.dice.dicewhere.provider.dbip.DbIpProviderKey;
import technology.dice.dicewhere.reading.LineReader;

import java.io.IOException;
import java.nio.file.Path;
import java.util.stream.Stream;

public abstract class DbIpLineReader extends LineReader {
private static final int BUFFER_SIZE = 1024 * 1024;
private final LineParser lineParser;
Expand All @@ -28,7 +29,16 @@ public DbIpLineReader(@NotNull Path csv) {
this(csv, null);
}

public DbIpLineReader(@NotNull Path csv, @Nullable Decorator<? extends DecoratorInformation> decorator) {
public DbIpLineReader(
@NotNull Path csv, @Nullable Decorator<? extends DecoratorInformation> decorator) {
this(csv, decorator, DatabaseBuilder.StorageMode.FILE);
}

public DbIpLineReader(
@NotNull Path csv,
@Nullable Decorator<? extends DecoratorInformation> decorator,
@NotNull DatabaseBuilder.StorageMode storageMode) {
super(storageMode);
lineParser = buildLineParser(decorator);
this.csv = csv;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package technology.dice.dicewhere.provider.dbip.reading;

import technology.dice.dicewhere.building.DatabaseBuilder;
import technology.dice.dicewhere.decorator.Decorator;
import technology.dice.dicewhere.decorator.DecoratorInformation;
import technology.dice.dicewhere.parsing.LineParser;
Expand All @@ -15,12 +16,19 @@
public class DbIpLocationAndIspLineReader extends DbIpLineReader {

public DbIpLocationAndIspLineReader(Path csv) {
super(csv);
this(csv, null);
}

public DbIpLocationAndIspLineReader(
Path csv, Decorator<? extends DecoratorInformation> decorator) {
super(csv, decorator);
this(csv, decorator, DatabaseBuilder.StorageMode.FILE);
}

public DbIpLocationAndIspLineReader(
Path csv,
Decorator<? extends DecoratorInformation> decorator,
DatabaseBuilder.StorageMode storageMode) {
super(csv, decorator, storageMode);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import technology.dice.dicewhere.building.DatabaseBuilder;
import technology.dice.dicewhere.decorator.Decorator;
import technology.dice.dicewhere.decorator.DecoratorInformation;
import technology.dice.dicewhere.parsing.LineParser;
Expand Down Expand Up @@ -40,7 +41,17 @@ public MaxmindDbReader(
@NotNull Path ipV6CSV,
@Nullable Decorator<? extends DecoratorInformation> decorator)
throws IOException {
this(locationNames, ipV4CSV, ipV6CSV, decorator, DatabaseBuilder.StorageMode.FILE);
}

public MaxmindDbReader(
@NotNull Path locationNames,
@NotNull Path ipV4CSV,
@NotNull Path ipV6CSV,
@Nullable Decorator<? extends DecoratorInformation> decorator,
@NotNull DatabaseBuilder.StorageMode storageMode)
throws IOException {
super(storageMode);
ipV4CSVPath = ipV4CSV;
ipV6CSVPath = ipV6CSV;
MaxmindLocationsParser locationsParser = new MaxmindLocationsParser();
Expand Down
34 changes: 30 additions & 4 deletions src/main/java/technology/dice/dicewhere/reading/LineReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.google.common.collect.Iterators;
import com.google.common.collect.Streams;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.jetbrains.annotations.NotNull;
import technology.dice.dicewhere.building.DatabaseBuilder;
import technology.dice.dicewhere.building.DatabaseBuilderListener;
import technology.dice.dicewhere.building.IPDatabase;
Expand All @@ -19,7 +20,13 @@
import technology.dice.dicewhere.parsing.LineParser;
import technology.dice.dicewhere.provider.ProviderKey;

import java.io.*;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.RandomAccessFile;
import java.io.SequenceInputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
Expand All @@ -28,7 +35,12 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.concurrent.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.GZIPInputStream;
Expand All @@ -40,9 +52,18 @@
*/
public abstract class LineReader {
private static final int LINES_BUFFER = 100000;
private final DatabaseBuilder.StorageMode storageMode;
public static byte[] MAGIC_ZIP = {'P', 'K', 0x3, 0x4};
public static int MAGIG_GZIP = 0xff00;

public LineReader() {
this(DatabaseBuilder.StorageMode.FILE);
}

public LineReader(@NotNull DatabaseBuilder.StorageMode storageMode) {
this.storageMode = storageMode;
}

public abstract ProviderKey provider();

public abstract LineParser parser();
Expand Down Expand Up @@ -137,9 +158,14 @@ public final IPDatabase read(
DatabaseBuilder databaseBuilder =
parser()
.getDecorator()
.map(d -> new DatabaseBuilder(provider(), serializedLinesBuffer, buildingListener, d))
.map(
d ->
new DatabaseBuilder(
storageMode, provider(), serializedLinesBuffer, buildingListener, d))
.orElseGet(
() -> new DatabaseBuilder(provider(), serializedLinesBuffer, buildingListener));
() ->
new DatabaseBuilder(
storageMode, provider(), serializedLinesBuffer, buildingListener));

Future processorFuture = setupExecutorService.submit(processor);
Future databaseBuilderFuture = setupExecutorService.submit(databaseBuilder);
Expand Down

0 comments on commit ff7c63b

Please sign in to comment.