Skip to content

Commit

Permalink
Merge pull request #50 from DiceTechnology/bugfix/read-db-decorator-w…
Browse files Browse the repository at this point in the history
…ith-buffer

IPInformation decoration prior to building IP DB
  • Loading branch information
stykiaz authored Oct 23, 2018
2 parents e4ea76e + 646a437 commit 12803e4
Show file tree
Hide file tree
Showing 14 changed files with 144 additions and 174 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>technology.dice.open</groupId>
<artifactId>dice-where</artifactId>
<version>1.0.2</version>
<version>1.0.3</version>
<name>dice-where</name>

<description>dice-where is a low memory footprint, highly efficient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,36 @@

package technology.dice.dicewhere.building;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Queues;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.google.protobuf.ByteString;
import org.mapdb.DB;
import org.mapdb.DBException;
import org.mapdb.DBMaker;
import org.mapdb.Serializer;
import technology.dice.dicewhere.api.api.IP;
import technology.dice.dicewhere.api.api.IpInformation;
import technology.dice.dicewhere.decorator.Decorator;
import technology.dice.dicewhere.decorator.DecoratorInformation;
import technology.dice.dicewhere.lineprocessing.SerializedLine;
import technology.dice.dicewhere.lineprocessing.serializers.IPSerializer;
import technology.dice.dicewhere.lineprocessing.serializers.protobuf.IPInformationProto;
import technology.dice.dicewhere.parsing.ParsedLine;
import technology.dice.dicewhere.provider.ProviderKey;
import technology.dice.dicewhere.utils.IPUtils;
import technology.dice.dicewhere.utils.ProtoValueConverter;

public class DatabaseBuilder implements Runnable {
private final BlockingQueue<SerializedLine> source;
Expand All @@ -28,12 +44,13 @@ public class DatabaseBuilder implements Runnable {
private final DB.TreeMapSink<IP, byte[]> sink;
private boolean expectingMore;
private int processedLines = 0;
private final Decorator<? extends DecoratorInformation> decorator;

public DatabaseBuilder(
ProviderKey provider,
BlockingQueue<SerializedLine> source,
DatabaseBuilderListener listener) {

DatabaseBuilderListener listener,
Decorator<? extends DecoratorInformation> decorator) {
this.source = source;
this.expectingMore = true;
this.listener = listener;
Expand All @@ -53,20 +70,33 @@ public DatabaseBuilder(
Objects.requireNonNull(provider).name(), new IPSerializer(), Serializer.BYTE_ARRAY)
.createFromSink();
this.sink = sink;
this.decorator = decorator;
}

public DatabaseBuilder(
ProviderKey provider,
BlockingQueue<SerializedLine> source,
DatabaseBuilderListener listener) {

this(provider, source, listener, null);
}

public void dontExpectMore() {
expectingMore = false;
}

public int reimainingLines() {
public int remainingLines() {
return source.size();
}

public int processedLines() {
return processedLines;
}

protected Optional<Decorator<? extends DecoratorInformation>> getDecorator() {
return Optional.ofNullable(decorator);
}

@Override
public void run() {
while (expectingMore || source.size() > 0) {
Expand All @@ -77,12 +107,15 @@ public void run() {
for (SerializedLine currentLine : availableForAdding) {
try {
beingProcessed = currentLine;
sink.put(currentLine.getStartIp(), currentLine.getInfo());
decorateEntry(currentLine.getParsedLine().getInfo())
.forEach(i -> sink.put(i.getStartOfRange(), buildIpProtobuf(i).toByteArray()));
processedLines++;
listener.lineAdded(provider, currentLine);

} catch (DBException.NotSorted e) {
listener.lineOutOfOrder(provider, beingProcessed, e);
} catch (Exception e) {
throw new RuntimeException("Database builder interrupted", e);
}
}
} catch (InterruptedException e) {
Expand All @@ -92,6 +125,32 @@ public void run() {
}
}

private Stream<IpInformation> decorateEntry(IpInformation entry) throws UnknownHostException {
if (getDecorator().isPresent()) {
return getDecorator().get().decorate(entry);
} else {
return Stream.of(entry);
}
}

private IPInformationProto.IpInformationProto buildIpProtobuf(IpInformation input) {
IPInformationProto.IpInformationProto.Builder messageBuilder =
IPInformationProto.IpInformationProto.newBuilder()
.setCity(input.getCity().orElse(""))
.setGeonameId(input.getGeonameId().orElse(""))
.setCountryCodeAlpha2(input.getCountryCodeAlpha2())
.setLeastSpecificDivision(input.getLeastSpecificDivision().orElse(""))
.setMostSpecificDivision(input.getMostSpecificDivision().orElse(""))
.setPostcode(input.getPostcode().orElse(""))
.setStartOfRange(ByteString.copyFrom(input.getStartOfRange().getBytes()))
.setEndOfRange(ByteString.copyFrom(input.getEndOfRange().getBytes()))
.setIsVpn(ProtoValueConverter.toThreeStateValue(input.isVpn().orElse(null)));

input.getOriginalLine().ifPresent(messageBuilder::setOriginalLine);

return messageBuilder.build();
}

public IPDatabase build() {
return new IPDatabase(sink.create());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,15 @@ public abstract class Decorator<T extends DecoratorInformation> {
*/
public Stream<IpInformation> decorate(IpInformation original) throws UnknownHostException {
Objects.requireNonNull(original);
List<List<T>> extraInformation = databaseReaders.entrySet().stream().map(e -> e.getValue().fetchForRange(original.getStartOfRange(), original.getEndOfRange())).collect(ImmutableList.toImmutableList());
List<List<T>> extraInformation =
databaseReaders
.entrySet()
.stream()
.map(
e ->
e.getValue()
.fetchForRange(original.getStartOfRange(), original.getEndOfRange()))
.collect(ImmutableList.toImmutableList());

return this.mergeIpInfoWithDecoratorInformation(
original, mergeDecorationRanges(extraInformation));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Queues;
import com.google.protobuf.ByteString;
import technology.dice.dicewhere.api.exceptions.LineParsingException;
import technology.dice.dicewhere.lineprocessing.serializers.protobuf.IPInformationProto.IpInformationProto;
import technology.dice.dicewhere.parsing.LineParser;
import technology.dice.dicewhere.parsing.ParsedLine;
import technology.dice.dicewhere.reading.RawLine;
import technology.dice.dicewhere.utils.ProtoValueConverter;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -135,7 +133,7 @@ private ImmutableList<SerializedLine> buildSerializedLineBatch(
long started, Collection<RawLine> batch) {
return batch
.stream()
.flatMap(rawline -> attemptParse(rawline, started))
.flatMap(rawLine -> attemptParse(rawLine, started))
.collect(ImmutableList.toImmutableList());
}

Expand All @@ -159,33 +157,13 @@ private Stream<SerializedLine> attemptParse(RawLine rawLine, long started) {

private Stream<SerializedLine> attemptSerialize(ParsedLine parsedLine) {
try {
IpInformationProto message = createIpProtobuf(parsedLine);

return Stream.of(
new SerializedLine(parsedLine.getStartIp(), message.toByteArray(), parsedLine));
new SerializedLine(parsedLine.getStartIp(), parsedLine));

} catch (Exception e) {
progressListener.serializeError(parsedLine, e);
return Stream.empty();
}
}

private IpInformationProto createIpProtobuf(ParsedLine parsedLine) {
IpInformationProto.Builder messageBuilder =
IpInformationProto.newBuilder()
.setCity(parsedLine.getInfo().getCity().orElse(""))
.setGeonameId(parsedLine.getInfo().getGeonameId().orElse(""))
.setCountryCodeAlpha2(parsedLine.getInfo().getCountryCodeAlpha2())
.setLeastSpecificDivision(parsedLine.getInfo().getLeastSpecificDivision().orElse(""))
.setMostSpecificDivision(parsedLine.getInfo().getMostSpecificDivision().orElse(""))
.setPostcode(parsedLine.getInfo().getPostcode().orElse(""))
.setStartOfRange(ByteString.copyFrom(parsedLine.getStartIp().getBytes()))
.setEndOfRange(ByteString.copyFrom(parsedLine.getEndIp().getBytes()))
.setIsVpn(
ProtoValueConverter.toThreeStateValue(parsedLine.getInfo().isVpn().orElse(null)));

parsedLine.getInfo().getOriginalLine().ifPresent(messageBuilder::setOriginalLine);

return messageBuilder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,16 @@
public class SerializedLine {
private ParsedLine parsedLine;
private final IP startIp;
private final byte[] info;

public SerializedLine(IP startIp, byte[] info, ParsedLine parsedLine) {
public SerializedLine(IP startIp, ParsedLine parsedLine) {
this.startIp = startIp;
this.info = info;
this.parsedLine = parsedLine;
}

public IP getStartIp() {
return startIp;
}

public byte[] getInfo() {
return info;
}

public ParsedLine getParsedLine() {
return parsedLine;
}
Expand Down
23 changes: 4 additions & 19 deletions src/main/java/technology/dice/dicewhere/parsing/LineParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,16 @@

public abstract class LineParser {

protected abstract Optional<Decorator<? extends DecoratorInformation>> getDecorator();
public abstract Optional<Decorator<? extends DecoratorInformation>> getDecorator();

public Stream<ParsedLine> parse(RawLine rawLine, boolean retainOriginalLine)
throws LineParsingException {
IpInformation parsedInfo = this.parseLine(rawLine, retainOriginalLine);
try {
return decorateParsedLine(parsedInfo, rawLine);
} catch (UnknownHostException e) {
// may be we need another exception here, as this will be triggered by the decorators
throw new LineParsingException(e, rawLine);
}
return Stream.of(
new ParsedLine(
parsedInfo.getStartOfRange(), parsedInfo.getEndOfRange(), parsedInfo, rawLine));
}

protected abstract IpInformation parseLine(RawLine rawLine, boolean retainOriginalLine)
throws LineParsingException;

private Stream<ParsedLine> decorateParsedLine(IpInformation ipInfo, RawLine rawLine)
throws UnknownHostException {
if (getDecorator().isPresent()) {
Stream<IpInformation> decoratedIpInfo = getDecorator().get().decorate(ipInfo);
return decoratedIpInfo.map(
info -> new ParsedLine(info.getStartOfRange(), info.getEndOfRange(), info, rawLine));
} else {
return Stream.of(
new ParsedLine(ipInfo.getStartOfRange(), ipInfo.getEndOfRange(), ipInfo, rawLine));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public DbIpIpToCityLiteCSVLineParser(Decorator<? extends DecoratorInformation> d
}

@Override
protected Optional<Decorator<? extends DecoratorInformation>> getDecorator() {
public Optional<Decorator<? extends DecoratorInformation>> getDecorator() {
return Optional.ofNullable(decorator);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public DbIpIpToCountryLiteCSVLineParser(Decorator<? extends DecoratorInformation
}

@Override
protected Optional<Decorator<? extends DecoratorInformation>> getDecorator() {
public Optional<Decorator<? extends DecoratorInformation>> getDecorator() {
return Optional.ofNullable(decorator);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public DbIpIpToLocationAndIspCSVLineParser(Decorator<? extends DecoratorInformat
}

@Override
protected Optional<Decorator<? extends DecoratorInformation>> getDecorator() {
public Optional<Decorator<? extends DecoratorInformation>> getDecorator() {
return Optional.ofNullable(decorator);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public MaxmindLineParser(Map<String, MaxmindLocation> locationDictionary) {
}

@Override
protected Optional<Decorator<? extends DecoratorInformation>> getDecorator() {
public Optional<Decorator<? extends DecoratorInformation>> getDecorator() {
return Optional.ofNullable(decorator);
}

Expand Down
10 changes: 7 additions & 3 deletions src/main/java/technology/dice/dicewhere/reading/LineReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,7 @@ public final IPDatabase read(
LineReaderListener readerListener,
LineProcessorListener processListener,
DatabaseBuilderListener buildingListener,
int workersCount)
throws IOException {
int workersCount) {

long before = System.currentTimeMillis();
ExecutorService parserExecutorService =
Expand All @@ -134,8 +133,13 @@ public final IPDatabase read(
retainOriginalLine,
new LineprocessorListenerForProvider(provider(), processListener),
workersCount);

DatabaseBuilder databaseBuilder =
new DatabaseBuilder(provider(), serializedLinesBuffer, buildingListener);
parser()
.getDecorator()
.map(d -> new DatabaseBuilder(provider(), serializedLinesBuffer, buildingListener, d))
.orElseGet(
() -> new DatabaseBuilder(provider(), serializedLinesBuffer, buildingListener));

Future processorFuture = setupExecutorService.submit(processor);
Future databaseBuilderFuture = setupExecutorService.submit(databaseBuilder);
Expand Down
33 changes: 15 additions & 18 deletions src/main/java/technology/dice/dicewhere/utils/IPUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,23 @@

public class IPUtils {

public static IP increment(IP ip) throws UnknownHostException {
return increment(ip, 1);
}
public static IP increment(IP ip) throws UnknownHostException {
return increment(ip, 1);
}

public static IP decrement(IP ip) throws UnknownHostException {
return increment(ip, -1);
}
public static IP decrement(IP ip) throws UnknownHostException {
return increment(ip, -1);
}

public static IP increment(IP ip, int increment) throws UnknownHostException {
return new IP(from(ip.getBytes()).increment(increment).getBytes());
}
public static IP increment(IP ip, int increment) throws UnknownHostException {
return new IP(from(ip.getBytes()).increment(increment).getBytes());
}

public static IPAddress from(IP ip) throws UnknownHostException {
return from(ip.getBytes());
}

public static IPAddress from(byte[] bytes) throws UnknownHostException {
return new IPAddressString(
InetAddress.getByAddress(bytes).getCanonicalHostName())
.getAddress();
}
public static IPAddress from(IP ip) throws UnknownHostException {
return from(ip.getBytes());
}

public static IPAddress from(byte[] bytes) throws UnknownHostException {
return new IPAddressString(InetAddress.getByAddress(bytes).getHostAddress()).getAddress();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ public class DecoratorTestUtils {
+ "1.0.5.32/28,1,1,0,0,0\n"
+ "1.0.7.16/29,1,1,0,0,0\n"
+ "1.0.8.16/29,1,1,0,0,0\n"
+ "1.0.8.32/29,1,1,0,0,0";
+ "1.0.8.32/29,1,1,0,0,0\n"
+ "1.0.32.1/32,1,1,0,0,0";

public static final String IPv6_LINES =
"network,is_anonymous,is_anonymous_vpn,is_hosting_provider,is_public_proxy,is_tor_exit_node\n"
Expand Down
Loading

0 comments on commit 12803e4

Please sign in to comment.