Skip to content

Commit

Permalink
Merge pull request #47 from DiceTechnology/feature/issue-12/maxmind-d…
Browse files Browse the repository at this point in the history
…atabase-downloader

Feature/issue 12/maxmind database downloader
  • Loading branch information
stykiaz authored Oct 18, 2018
2 parents ab3a6fd + 6b22e4f commit e4ea76e
Show file tree
Hide file tree
Showing 13 changed files with 191 additions and 20 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
target/
.idea/
bin/
scripts/db_downloaders/maxmind_location/config
scripts/db_downloaders/maxmind_anonymous/config
*.iml
.DS_Store
*.mmdb
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>technology.dice.open</groupId>
<artifactId>dice-where</artifactId>
<version>1.0.1</version>
<version>1.0.2</version>
<name>dice-where</name>

<description>dice-where is a low memory footprint, highly efficient
Expand Down
25 changes: 25 additions & 0 deletions scripts/db_downloaders/functions.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/usr/bin/env bash

# Downloads a MaxMind zip archive, extracts it and flattens its contents.
#
# Arguments:
#   $1 - URL of the zip archive to download
#   $2 - path the downloaded zip is written to (removed again before returning)
#   $3 - directory the archive is extracted into
#
# The first entry listed in the archive is assumed to live under a top-level
# directory named "<name>_<timestamp>". The files of that directory are moved
# directly into $3 and "<timestamp>" is written to "$3/timestamp".
#
# Returns 1 (without touching the contents of $3) when an argument is missing,
# the download fails, or the archive layout cannot be determined.
download_maxmind_db () {
  zip_url=$1
  zip_file_destination=$2
  unzip_destination_directory=$3

  if [ -z "${zip_url}" ] || [ -z "${zip_file_destination}" ] || [ -z "${unzip_destination_directory}" ]; then
    echo "ERROR: download_maxmind_db requires <zip_url> <zip_file_destination> <unzip_destination_directory>" >&2
    return 1
  fi

  echo "Will download ${zip_url} to ${zip_file_destination}"
  # Quoted: the URL contains '?' and '&', which must reach wget verbatim.
  if ! wget -O "${zip_file_destination}" "${zip_url}"; then
    echo "ERROR: failed to download ${zip_url}" >&2
    # wget -O leaves an empty or partial file behind on failure.
    rm -f "${zip_file_destination}"
    return 1
  fi
  echo "Finished downloading ${zip_url}"
  echo ""

  # The 4th line of the `unzip -l` listing is taken to be the first archive
  # entry; the first component of its path is the archive's top-level directory.
  inzip_path="$(unzip -l "${zip_file_destination}" | sed -n 4p | awk '{split($4,path,"/"); print path[1]}')"
  db_ts="$(echo "${inzip_path}" | awk '{ split($1,ts,"_"); print ts[2]}')"

  # An empty inzip_path would turn the cleanup below into
  # `rm -fr <destination>/`, wiping a previously downloaded database.
  if [ -z "${inzip_path}" ]; then
    echo "ERROR: could not determine the top-level directory of ${zip_file_destination}" >&2
    rm -f "${zip_file_destination}"
    return 1
  fi

  echo "Will unzip ${zip_file_destination} into ${unzip_destination_directory}"
  unzip -q -o "${zip_file_destination}" -d "${unzip_destination_directory}"
  if [ ! -d "${unzip_destination_directory}/${inzip_path}" ]; then
    echo "ERROR: ${inzip_path} was not extracted into ${unzip_destination_directory}" >&2
    rm -f "${zip_file_destination}"
    return 1
  fi

  # Flatten: move the extracted files up one level and drop the now-empty
  # versioned directory.
  mv -f "${unzip_destination_directory}/${inzip_path}"/* "${unzip_destination_directory}"
  rm -fr "${unzip_destination_directory}/${inzip_path}"
  echo "${db_ts}" > "${unzip_destination_directory}/timestamp"
  echo "Finished unzipping database"

  rm -f "${zip_file_destination}"
  echo "Deleted the downloaded zip"
}
4 changes: 4 additions & 0 deletions scripts/db_downloaders/maxmind_anonymous/config.sample
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Sample settings for maxmind_anonymous/downloader.sh (the file is sourced by
# the script when passed as its single argument).

# Licence key sent as the license_key parameter of the download URL.
maxmind_anonymous_licence_key="<CHANGE_ME>"
# Product and content are combined into the edition id "<product>-<content>-CSV".
maxmind_anonymous_product="GeoIP2"
maxmind_anonymous_content="Anonymous"
# Existing directory that receives the downloaded zip and the extracted database.
maxmind_anonymous_zip_destination="./"
59 changes: 59 additions & 0 deletions scripts/db_downloaders/maxmind_anonymous/downloader.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#!/usr/bin/env bash

# Load the shared helpers relative to this script's own location; quoted so a
# script path containing whitespace still resolves.
source "$(dirname "$0")/../functions.sh"

# Populates and validates the maxmind_anonymous_* settings.
#
# Accepts either:
#   1 argument  - path to a config file, which is sourced into this shell
#   4 arguments - <product> <content> <licence_key> <zip_destination>
#
# Prints an error to stderr and exits the script with status 1 when the
# argument count is wrong, the config file does not exist, a setting is empty,
# or the zip destination is not an existing directory.
init_maxmind_anonymous_download_arguments () {
  if [ $# -eq 1 ]; then
    if [ -f "$1" ]; then
      . "$1"
    else
      echo "ERROR: passed argument is not a valid file" >&2
      exit 1
    fi
  elif [ $# -eq 4 ]; then
    maxmind_anonymous_product=$1
    maxmind_anonymous_content=$2
    maxmind_anonymous_licence_key=$3
    maxmind_anonymous_zip_destination=$4
  else
    # $0 keeps the usage text in sync with however the script was invoked.
    echo "ERROR: Improper usage" >&2
    echo "Usage 1)" >&2
    echo "$0 <config_file>" >&2
    echo "Usage 2)" >&2
    echo "$0 <product> <content> <licence_key> <zip_destination>" >&2
    exit 1
  fi

  if [ -z "$maxmind_anonymous_product" ]; then
    echo "ERROR: maxmind_anonymous_product is not defined" >&2
    exit 1
  fi

  if [ -z "$maxmind_anonymous_content" ]; then
    echo "ERROR: maxmind_anonymous_content is not defined" >&2
    exit 1
  fi

  if [ -z "$maxmind_anonymous_licence_key" ]; then
    echo "ERROR: maxmind_anonymous_licence_key is not defined" >&2
    exit 1
  fi

  if [ -z "$maxmind_anonymous_zip_destination" ] || [ ! -d "$maxmind_anonymous_zip_destination" ]; then
    echo "ERROR: maxmind_anonymous_zip_destination needs to be a valid directory" >&2
    exit 1
  fi
}

# "$@" keeps each command-line argument intact (e.g. a destination path
# containing spaces).
init_maxmind_anonymous_download_arguments "$@"

# Download URL, zip location and extraction directory, all derived from the
# validated settings.
zip_url="https://download.maxmind.com/app/geoip_download?edition_id=${maxmind_anonymous_product}-${maxmind_anonymous_content}-CSV&license_key=${maxmind_anonymous_licence_key}&suffix=zip"
zip_file_destination="${maxmind_anonymous_zip_destination}/${maxmind_anonymous_product}-${maxmind_anonymous_content}-CSV-latest.zip"
unzip_destination_directory="${maxmind_anonymous_zip_destination}/${maxmind_anonymous_product}-${maxmind_anonymous_content}-CSV-latest"

# Quoted so the URL's '?' is never treated as a glob and paths are not split.
download_maxmind_db "${zip_url}" "${zip_file_destination}" "${unzip_destination_directory}"




4 changes: 4 additions & 0 deletions scripts/db_downloaders/maxmind_location/config.sample
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Sample settings for maxmind_location/downloader.sh (the file is sourced by
# the script when passed as its single argument).

# Licence key sent as the license_key parameter of the download URL.
maxmind_location_licence_key="<CHANGE_ME>"
# Product and content are combined into the edition id "<product>-<content>-CSV".
maxmind_location_product="GeoIP2"
maxmind_location_content="City"
# Existing directory that receives the downloaded zip and the extracted database.
maxmind_location_zip_destination="./"
59 changes: 59 additions & 0 deletions scripts/db_downloaders/maxmind_location/downloader.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#!/usr/bin/env bash

# Load the shared helpers relative to this script's own location; quoted so a
# script path containing whitespace still resolves.
source "$(dirname "$0")/../functions.sh"

# Populates and validates the maxmind_location_* settings.
#
# Accepts either:
#   1 argument  - path to a config file, which is sourced into this shell
#   4 arguments - <product> <content> <licence_key> <zip_destination>
#
# Prints an error to stderr and exits the script with status 1 when the
# argument count is wrong, the config file does not exist, a setting is empty,
# or the zip destination is not an existing directory.
init_maxmind_location_download_arguments () {
  if [ $# -eq 1 ]; then
    if [ -f "$1" ]; then
      . "$1"
    else
      echo "ERROR: passed argument is not a valid file" >&2
      exit 1
    fi
  elif [ $# -eq 4 ]; then
    maxmind_location_product=$1
    maxmind_location_content=$2
    maxmind_location_licence_key=$3
    maxmind_location_zip_destination=$4
  else
    # $0 keeps the usage text in sync with however the script was invoked.
    echo "ERROR: Improper usage" >&2
    echo "Usage 1)" >&2
    echo "$0 <config_file>" >&2
    echo "Usage 2)" >&2
    echo "$0 <product> <content> <licence_key> <zip_destination>" >&2
    exit 1
  fi

  if [ -z "$maxmind_location_product" ]; then
    echo "ERROR: maxmind_location_product is not defined" >&2
    exit 1
  fi

  if [ -z "$maxmind_location_content" ]; then
    echo "ERROR: maxmind_location_content is not defined" >&2
    exit 1
  fi

  if [ -z "$maxmind_location_licence_key" ]; then
    echo "ERROR: maxmind_location_licence_key is not defined" >&2
    exit 1
  fi

  if [ -z "$maxmind_location_zip_destination" ] || [ ! -d "$maxmind_location_zip_destination" ]; then
    echo "ERROR: maxmind_location_zip_destination needs to be a valid directory" >&2
    exit 1
  fi
}

# "$@" keeps each command-line argument intact (e.g. a destination path
# containing spaces).
init_maxmind_location_download_arguments "$@"

# Download URL, zip location and extraction directory, all derived from the
# validated settings.
zip_url="https://download.maxmind.com/app/geoip_download?edition_id=${maxmind_location_product}-${maxmind_location_content}-CSV&license_key=${maxmind_location_licence_key}&suffix=zip"
zip_file_destination="${maxmind_location_zip_destination}/${maxmind_location_product}-${maxmind_location_content}-CSV-latest.zip"
unzip_destination_directory="${maxmind_location_zip_destination}/${maxmind_location_product}-${maxmind_location_content}-CSV-latest"

# Quoted so the URL's '?' is never treated as a glob and paths are not split.
download_maxmind_db "${zip_url}" "${zip_file_destination}" "${unzip_destination_directory}"




15 changes: 14 additions & 1 deletion src/main/java/technology/dice/dicewhere/api/api/IPResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import technology.dice.dicewhere.api.exceptions.ProviderNotAvailableException;
import technology.dice.dicewhere.building.DatabaseBuilderListener;
import technology.dice.dicewhere.building.IPDatabase;
import technology.dice.dicewhere.lineprocessing.LineProcessor;
import technology.dice.dicewhere.lineprocessing.LineProcessorListener;
import technology.dice.dicewhere.provider.ProviderKey;
import technology.dice.dicewhere.reading.LineReader;
import technology.dice.dicewhere.reading.LineReaderListener;

public class IPResolver {
private static final int DEFAULT_LINE_PROCESSOR_WORKERS_COUNT = 4;
private final Map<ProviderKey, IPDatabase> databases;

private IPResolver(Map<ProviderKey, IPDatabase> databases) {
Expand Down Expand Up @@ -158,6 +160,7 @@ public Map<ProviderKey, Optional<IpInformation>> resolve(@Nonnull IP ip) {
public static class Builder {
private final Map<ProviderKey, LineReader> providers;
private boolean retainOriginalLine = false;
private int workersCount = DEFAULT_LINE_PROCESSOR_WORKERS_COUNT;
private LineReaderListener readerListener = new LineReaderListener() {};
private LineProcessorListener processorListener = new LineProcessorListener() {};
private DatabaseBuilderListener builderListener = new DatabaseBuilderListener() {};
Expand All @@ -166,6 +169,11 @@ public Builder() {
providers = new HashMap<>();
}

/**
 * Sets the number of line processor workers passed to each provider's reader when the
 * resolver is built.
 *
 * @param count the number of workers; must be at least 1
 * @return this builder
 * @throws IllegalArgumentException if {@code count} is less than 1
 */
public Builder withLineProcessorWorkersCount(int count) {
  // Fail fast here: the count later sizes the processor's queue and worker array, where a
  // non-positive value would fail far from the caller or schedule no workers at all.
  if (count < 1) {
    throw new IllegalArgumentException(
        "Line processor workers count must be at least 1, got " + count);
  }
  this.workersCount = count;
  return this;
}

public Builder withProvider(@Nonnull LineReader lineReader) {
if (providers.containsKey(Objects.requireNonNull(lineReader).provider())) {
throw new DuplicateProviderException(
Expand Down Expand Up @@ -201,7 +209,12 @@ public IPResolver build() throws IOException {
for (LineReader reader : providers.values()) {
databases.put(
reader.provider(),
reader.read(retainOriginalLine, readerListener, processorListener, builderListener));
reader.read(
retainOriginalLine,
readerListener,
processorListener,
builderListener,
workersCount));
}
return new IPResolver(databases);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Stream;

/**
Expand All @@ -32,18 +31,18 @@
public class LineProcessor implements Runnable {

private static final int WORKER_BATCH_SIZE = 10000;
private static final int WORKER_COUNT = 4;
private final ExecutorService executorService;
private final ArrayBlockingQueue<RawLine> lines;
private final LineParser parser;
private final boolean retainOriginalLine;
private final BlockingQueue<SerializedLine> destination;
private final LineprocessorListenerForProvider progressListener;
private final AtomicBoolean expectingMore = new AtomicBoolean(true);
private final int workersCount;

/**
* @param executorService the executor service used for handling parsing of batches
* @param destination the queue where serializsed lines are written to
* @param destination the queue where serialized lines are written to
* @param parser the parser to use for parsing the line data
* @param retainOriginalLine indicates if the original line data should be retained alongside the
* serialized data
Expand All @@ -54,13 +53,15 @@ public LineProcessor(
BlockingQueue<SerializedLine> destination,
LineParser parser,
boolean retainOriginalLine,
LineprocessorListenerForProvider progressListener) {
this.lines = new ArrayBlockingQueue<>((WORKER_COUNT + 1) * WORKER_BATCH_SIZE);
LineprocessorListenerForProvider progressListener,
int workersCount) {
this.lines = new ArrayBlockingQueue<>((workersCount + 1) * WORKER_BATCH_SIZE);
this.destination = destination;
this.executorService = executorService;
this.parser = parser;
this.retainOriginalLine = retainOriginalLine;
this.progressListener = progressListener;
this.workersCount = workersCount;
}

/**
Expand All @@ -86,15 +87,14 @@ public void addLine(RawLine rawLine) {
/** Runs the processor, parsing the raw line data and serializing it into a suitable form. */
@Override
public void run() {

long started = System.currentTimeMillis();

AtomicLong totalLines = new AtomicLong();
CompletableFuture<List<SerializedLine>>[] workerList = new CompletableFuture[WORKER_COUNT];
CompletableFuture<List<SerializedLine>>[] workerList = new CompletableFuture[workersCount];

while (expectingMore.get() || lines.size() > 0) {
try {
for (int i = 0; i < WORKER_COUNT; i++) {
for (int i = 0; i < workersCount; i++) {
Collection<RawLine> batch = new ArrayList<>(WORKER_BATCH_SIZE);

// Populate the batch from the lines queue
Expand Down Expand Up @@ -151,6 +151,9 @@ private Stream<SerializedLine> attemptParse(RawLine rawLine, long started) {
} catch (LineParsingException e) {
progressListener.parseError(rawLine, e);
return Stream.empty();
} catch (Exception e) {
progressListener.parseError(rawLine, new LineParsingException(e, rawLine));
return Stream.empty();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ public final IPDatabase read(
boolean retainOriginalLine,
LineReaderListener readerListener,
LineProcessorListener processListener,
DatabaseBuilderListener buildingListener)
DatabaseBuilderListener buildingListener,
int workersCount)
throws IOException {

long before = System.currentTimeMillis();
Expand All @@ -131,7 +132,8 @@ public final IPDatabase read(
serializedLinesBuffer,
parser(),
retainOriginalLine,
new LineprocessorListenerForProvider(provider(), processListener));
new LineprocessorListenerForProvider(provider(), processListener),
workersCount);
DatabaseBuilder databaseBuilder =
new DatabaseBuilder(provider(), serializedLinesBuffer, buildingListener);

Expand All @@ -153,7 +155,7 @@ public final IPDatabase read(
readerListener.finished(
provider(), databaseBuilder.processedLines(), System.currentTimeMillis() - before);
return databaseBuilder.build();
} catch (InterruptedException | ExecutionException e) {
} catch (Exception e) {
throw new RuntimeException("Line reader read failed", e);
} finally {
parserExecutorService.shutdown();
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/technology/dice/dicewhere/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void finished(
}
});

IPResolver resolver = resolverBuilder.build();
IPResolver resolver = resolverBuilder.withLineProcessorWorkersCount(4).build();

Scanner keyboard = new Scanner(System.in);
while (true) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void clean() throws IOException {
DatabaseBuilderListener builderListener = Mockito.mock(DatabaseBuilderListener.class);
DbIpLineReader dbIpReader = new DbIpLocarionAndIspLineReader(path);
IPDatabase database =
dbIpReader.read(false, readerListener, processorListener, builderListener);
dbIpReader.read(false, readerListener, processorListener, builderListener, 4);
long dbSize = database.size();
Assert.assertEquals(22, dbSize);
Mockito.verify(readerListener, Mockito.times(1))
Expand Down Expand Up @@ -78,7 +78,7 @@ public void invalidLines() throws IOException {
DatabaseBuilderListener builderListener = Mockito.mock(DatabaseBuilderListener.class);
DbIpLineReader dbIpReader = new DbIpLocarionAndIspLineReader(path);
IPDatabase database =
dbIpReader.read(false, readerListener, processorListener, builderListener);
dbIpReader.read(false, readerListener, processorListener, builderListener, 4);
long dbSize = database.size();
Assert.assertEquals(16, dbSize);
Mockito.verify(readerListener, Mockito.times(1))
Expand Down Expand Up @@ -122,7 +122,7 @@ public void outOfOrder() throws IOException {
DatabaseBuilderListener builderListener = Mockito.mock(DatabaseBuilderListener.class);
DbIpLineReader dbIpReader = new DbIpLocarionAndIspLineReader(path);
IPDatabase database =
dbIpReader.read(false, readerListener, processorListener, builderListener);
dbIpReader.read(false, readerListener, processorListener, builderListener, 4);
long dbSize = database.size();
Assert.assertEquals(3, dbSize);
Mockito.verify(readerListener, Mockito.times(1))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void clean() throws Exception {
Path locationNames = getPath("provider/maxmind/GeoLite2-City-Locations-en.csv.zip");
MaxmindDbReader dbIpReader = new MaxmindDbReader(locationNames, v4, v6);

IPDatabase database = dbIpReader.read(false, readerListener, processorListener, builderListener);
IPDatabase database = dbIpReader.read(false, readerListener, processorListener, builderListener, 4);
long dbSize = database.size();

Assert.assertEquals(18, dbSize);
Expand All @@ -51,7 +51,7 @@ public void invalidLines() throws Exception {
Path locationNames = getPath("provider/maxmind/GeoLite2-City-Locations-en.csv.zip");
MaxmindDbReader dbIpReader = new MaxmindDbReader(locationNames, v4, v6);

IPDatabase database = dbIpReader.read(false, readerListener, processorListener, builderListener);
IPDatabase database = dbIpReader.read(false, readerListener, processorListener, builderListener, 4);
long dbSize = database.size();

Assert.assertEquals(11, dbSize);
Expand All @@ -70,7 +70,7 @@ public void outOfOrder() throws Exception {
Path locationNames = getPath("provider/maxmind/GeoLite2-City-Locations-en.csv.zip");
MaxmindDbReader dbIpReader = new MaxmindDbReader(locationNames, v4, v6);

IPDatabase database = dbIpReader.read(false, readerListener, processorListener, builderListener);
IPDatabase database = dbIpReader.read(false, readerListener, processorListener, builderListener, 4);
long dbSize = database.size();

Assert.assertEquals(9, dbSize);
Expand Down

0 comments on commit e4ea76e

Please sign in to comment.