Merge pull request #95 from n1analytics/release-0.8.0
Release 0.8.0
mhsiah authored Apr 18, 2018
2 parents d5473c1 + a1ee48a commit 23a7be7
Showing 11 changed files with 301 additions and 83 deletions.
22 changes: 22 additions & 0 deletions CHANGELOG.rst
@@ -1,3 +1,25 @@
0.8.0
-----

Fix to greedy solver, so that mappings are set by the first match, not repeatedly overwritten. #89
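
A minimal sketch of the corrected behaviour (hypothetical scores; assumes
`greedy_solver` from `anonlink.entitymatch`). Each tuple is
(index_a, score, index_b), and the first match seen for an index is now the
one that is kept::

    from anonlink.entitymatch import greedy_solver

    pairs = [(0, 0.97, 5),   # entity 0 is mapped to 5
             (0, 0.93, 6),   # ignored: 0 already has a mapping
             (1, 0.91, 5)]   # ignored: 5 has already been claimed
    print(greedy_solver(pairs))  # {0: 5}

Under the previous behaviour the later, lower-scoring pair (0, 0.93, 6) would
have overwritten the first mapping, giving {0: 6} instead.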

Other improvements
~~~~~~~~~~~~~~~~~~

- Order of the k and threshold parameters is now consistent across the library (k first, then threshold)
- Limit size of `k` to prevent OOM DoS
- Fix misaligned pointer handling #77

0.7.1
-----
Removed the default values for the threshold and "top k results" parameters
throughout, as these parameters should always be determined by the requirements
at the call site. This modifies the API of the functions
`entitymatch.{*filter_similarity*,calculate_mapping_greedy}`,
`distributed_processing.calculate_filter_similarity` and
`network_flow.map_entities` by requiring the values of `k` and `threshold` to
be specified in every case.
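
For illustration, a hedged sketch of the resulting call style (the `make_clk`
helper, the filter sizes and the `k`/`threshold` values below are placeholders
for this example, not recommendations)::

    import random
    from bitarray import bitarray
    from anonlink import entitymatch

    def make_clk(index, bits=1024, density=0.3):
        # hypothetical random CLK; real filters come from a hashing tool
        b = bitarray([random.random() < density for _ in range(bits)])
        return (b, index, b.count())

    filters1 = [make_clk(i) for i in range(100)]
    filters2 = [make_clk(i) for i in range(100)]

    # k and threshold must now be given explicitly at every call site
    sparse_matrix = entitymatch.calculate_filter_similarity(
        filters1, filters2, k=10, threshold=0.5)
    mapping = entitymatch.calculate_mapping_greedy(
        filters1, filters2, k=5, threshold=0.95)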

0.7.0
-----

97 changes: 84 additions & 13 deletions _cffi_build/dice_one_against_many.cpp
@@ -1,3 +1,5 @@
#include <memory>
#include <functional>
#include <algorithm>
#include <vector>
#include <queue>
@@ -260,7 +262,7 @@ class Node {

struct score_cmp {
bool operator()(const Node& a, const Node& b) const {
return a.score > b.score;
return a.score >= b.score;
}
};

@@ -293,6 +295,66 @@ abs_diff(uint32_t a, uint32_t b) {
}


typedef const uint64_t word_tp;
typedef std::function<void (word_tp *)> deleter_fn;
typedef std::unique_ptr<word_tp, deleter_fn> word_ptr;

/**
* Given a char pointer ptr pointing to nbytes bytes, return a
* std::unique_ptr to uint64_t containing those same byte values
* (using the host's byte order, i.e. no byte reordering is done).
*
* The main purpose of this function is to fix pointer alignment
* issues when converting from a char pointer to a uint64 pointer; the
* issue being that a uint64 pointer has stricter alignment
* requirements than a char pointer.
*
* We assume that releasing the memory associated with ptr is someone
* else's problem. So, if ptr is already correctly aligned, we just
* cast it to uint64 and return a unique_ptr whose deleter is
* empty. If ptr is misaligned, then we copy the nbytes at ptr to a
* location that is correctly aligned and then return a unique_ptr
* whose deleter will delete that allocated space when the unique_ptr
* goes out of scope.
*
* NB: ASSUMES nbytes is divisible by WORD_BYTES.
*
* TODO: On some architectures it might be advantageous to have
* 16-byte aligned memory, even when the words used are only 8 bytes
* (this is to do with cache line size). This function could be easily
* modified to allow experimentation with different alignments.
*/
static word_ptr
adjust_ptr_alignment(const char *ptr, size_t nbytes) {
static constexpr size_t wordbytes = sizeof(word_tp);
size_t nwords = nbytes / wordbytes;
uintptr_t addr = reinterpret_cast<uintptr_t>(ptr);
// ptr is correctly aligned if addr is divisible by wordbytes
if (addr % wordbytes != 0) {
// ptr is NOT correctly aligned

// copy_words has correct alignment
uint64_t *copy_words = new uint64_t[nwords];
// alias copy_words with a byte pointer; the cast is safe because
// uint8_t alignment is less restrictive than uint64_t's
uint8_t *copy_bytes = reinterpret_cast<uint8_t *>(copy_words);
// copy everything across
std::copy(ptr, ptr + nbytes, copy_bytes);
// return a unique_ptr with array delete to delete copy_words
auto array_delete = [] (word_tp *p) { delete[] p; };
return word_ptr(copy_words, array_delete);
} else {
// ptr IS correctly aligned

// safe cast because the address of ptr is wordbyte-aligned.
word_tp *same = reinterpret_cast<word_tp *>(ptr);
// don't delete backing array since we don't own it
auto empty_delete = [] (word_tp *) { };
return word_ptr(same, empty_delete);
}
}
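
The callers below hand raw byte buffers across the cffi boundary to this code.
As a rough Python-side illustration (using the standalone cffi package; this is
not part of the library), a pointer taken partway into a packed byte buffer is
exactly the kind of misaligned pointer that adjust_ptr_alignment copies:

    from cffi import FFI

    ffi = FFI()
    buf = ffi.new("char[]", bytes(64))    # allocator-aligned backing buffer
    p = buf + 3                           # pointer 3 bytes into the buffer
    addr = int(ffi.cast("uintptr_t", p))
    print(addr % 8 != 0)                  # typically True: p is not 8-byte aligned,
                                          # so the C++ code copies before casting to uint64_t*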


extern "C"
{
/**
@@ -315,9 +377,11 @@ extern "C"
const char *arrays, int narrays, int array_bytes) {
// assumes WORD_BYTES divides array_bytes
int nwords = array_bytes / WORD_BYTES;
const uint64_t *u = reinterpret_cast<const uint64_t *>(arrays);
// The static_cast is to avoid int overflow in the multiplication
size_t total_bytes = static_cast<size_t>(narrays) * array_bytes;
auto ptr = adjust_ptr_alignment(arrays, total_bytes);
auto u = ptr.get();

// assumes WORD_PER_POPCOUNT divides nwords
clock_t t = clock();
switch (nwords) {
case 32: _popcount_arrays<32>(counts, u, narrays); break;
@@ -341,13 +405,14 @@ extern "C"
const char *array1,
const char *array2,
int array_bytes) {
const uint64_t *u, *v;
uint32_t u_popc, v_popc;
// assumes WORD_BYTES divides array_bytes
int nwords = array_bytes / WORD_BYTES;

u = reinterpret_cast<const uint64_t *>(array1);
v = reinterpret_cast<const uint64_t *>(array2);
auto ptr_u = adjust_ptr_alignment(array1, array_bytes);
auto ptr_v = adjust_ptr_alignment(array2, array_bytes);
auto u = ptr_u.get();
auto v = ptr_v.get();

// If the popcount of one of the arrays is zero, then the
// popcount of the "intersection" (logical AND) will be zero,
@@ -381,8 +446,12 @@ extern "C"
if (keybytes % WORD_BYTES != 0)
return -1;
int keywords = keybytes / WORD_BYTES;
const uint64_t *comp1 = reinterpret_cast<const uint64_t *>(one);
const uint64_t *comp2 = reinterpret_cast<const uint64_t *>(many);
// The static_cast is to avoid int overflow in the multiplication
size_t total_bytes = static_cast<size_t>(n) * keybytes;
auto ptr_comp1 = adjust_ptr_alignment(one, total_bytes);
auto ptr_comp2 = adjust_ptr_alignment(many, total_bytes);
auto comp1 = ptr_comp1.get();
auto comp2 = ptr_comp2.get();

// Here we create top_k_scores on the stack by providing it
// with a vector in which to put its elements. We do this so
@@ -437,15 +506,17 @@ extern "C"
}
}

int i = 0;
while ( ! top_k_scores.empty()) {
// Copy the scores and indices in reverse order so that the
// best match is at index 0 and the worst is at index
// top_k_scores.size()-1.
int nscores = top_k_scores.size();
for (int i = top_k_scores.size() - 1; i >= 0; --i) {
scores[i] = top_k_scores.top().score;
indices[i] = top_k_scores.top().index;
// Popping the top element is O(log(k))!
top_k_scores.pop();
i += 1;
}

return i;
assert(top_k_scores.empty());
return nscores;
}
}
41 changes: 29 additions & 12 deletions anonlink/benchmark.py
@@ -28,8 +14 @@ def compute_popcount_speed(n):

# Python
popcounts, elapsed_time = popcount_vector(clks, use_python=True)
python_speed_in_MiB = clks_MiB / elapsed_time
python_Mops = n / (1e6 * elapsed_time)

# Guard against division by zero (have observed elapsed_nocopy ==
# 0 in the wild; see issue #92).
inv_elapsed_time = 1 / elapsed_time if elapsed_time > 0 else float('inf')

python_speed_in_MiB = clks_MiB * inv_elapsed_time
python_Mops = (n / 1e6) * inv_elapsed_time

elapsed_time_ms = elapsed_time * 1e3
print("Python (bitarray.count()): | {:7.2f} | {:9.2f} | {:7.2f}"
.format(elapsed_time_ms, python_speed_in_MiB, python_Mops))
@@ -39,13 +45,19 @@ def compute_popcount_speed(n):
popcounts, elapsed_nocopy = popcount_vector(clks, use_python=False)
end = timer()
elapsed_time = end - start
copy_percent = 100*(elapsed_time - elapsed_nocopy) / elapsed_time

# Guard against division by zero (have observed elapsed_nocopy ==
# 0 in the wild; see issue #92).
inv_elapsed_time = 1 / elapsed_time if elapsed_time > 0 else float('inf')
inv_elapsed_nocopy = 1 / elapsed_nocopy if elapsed_nocopy > 0 else float('inf')

copy_percent = 100*(elapsed_time - elapsed_nocopy) * inv_elapsed_time
elapsed_time_ms = elapsed_time * 1e3
elapsed_nocopy_ms = elapsed_nocopy * 1e3
native_speed_in_MiB = clks_MiB / elapsed_time
native_speed_in_MiB_nocopy = clks_MiB / elapsed_nocopy
native_Mops = n / (1e6 * elapsed_time)
native_Mops_nocopy = n / (1e6 * elapsed_nocopy)
native_speed_in_MiB = clks_MiB * inv_elapsed_time
native_speed_in_MiB_nocopy = clks_MiB * inv_elapsed_nocopy
native_Mops = (n / 1e6) * inv_elapsed_time
native_Mops_nocopy = (n / 1e6) * inv_elapsed_nocopy
print("Native code (no copy): | {:7.2f} | {:9.2f} | {:7.2f}"
.format(elapsed_nocopy_ms, native_speed_in_MiB_nocopy, native_Mops_nocopy))
print("Native code (w/ copy): | {:7.2f} | {:9.2f} | {:7.2f} ({:.1f}% copying)"
@@ -66,25 +78,30 @@ def compute_comparison_speed(n1, n2, threshold):
Using the greedy solver, how fast can hashes be computed using one core.
"""

assert n1 != 0 and n2 != 0
filters1 = [some_filters[random.randrange(0, 8000)] for _ in range(n1)]
filters2 = [some_filters[random.randrange(2000, 10000)] for _ in range(n2)]

start = timer()
sparse_matrix = calculate_filter_similarity(filters1, filters2, k=len(filters2), threshold=threshold)
sparse_matrix = calculate_filter_similarity(filters1, filters2, len(filters2), threshold)
t1 = timer()
res = greedy_solver(sparse_matrix)
end = timer()

similarity_time = t1 - start
solver_time = end - t1
elapsed_time = end - start

inv_similarity_time = 1 / similarity_time if similarity_time > 0 else float('inf')
inv_elapsed_time = 1 / elapsed_time if elapsed_time > 0 else float('inf')

print("{:6d} | {:6d} | {:4d}e6 ({:5.2f}%) | {:6.3f} ({:4.1f}% / {:4.1f}%) | {:8.3f}".format(
n1, n2, n1*n2 // 1000000,
100.0*len(sparse_matrix)/(n1*n2),
elapsed_time,
100.0*similarity_time/elapsed_time,
100.0*solver_time/elapsed_time,
(n1*n2)/(1e6*similarity_time)))
100.0*similarity_time * inv_elapsed_time,
100.0*solver_time * inv_elapsed_time,
(n1*n2 / 1e6) * inv_similarity_time))
return elapsed_time


@@ -109,7 +126,7 @@ def compare_python_c(ntotal=10000, nsubset=6000, frac=0.8):

# Pure Python version
start = timer()
result = python_filter_similarity(filters1, filters2)
result = python_filter_similarity(filters1, filters2, 1, 0.0)
end = timer()
python_time = end - start

5 changes: 2 additions & 3 deletions anonlink/distributed_processing.py
@@ -10,8 +10,7 @@


def calc_chunk_result(chunk_number, chunk, filters2, k, threshold):
chunk_results = anonlink.entitymatch.calculate_filter_similarity(chunk, filters2,
k=k, threshold=threshold)
chunk_results = anonlink.entitymatch.calculate_filter_similarity(chunk, filters2, k, threshold)

partial_sparse_result = []
# offset chunk's A index by chunk_size * chunk_number
@@ -23,7 +22,7 @@ def calc_chunk_result(chunk_number, chunk, filters2, k, threshold):
return partial_sparse_result


def calculate_filter_similarity(filters1, filters2, k=10, threshold=0.5):
def calculate_filter_similarity(filters1, filters2, k, threshold):
"""
Example way of computing similarity scores in parallel.
51 changes: 29 additions & 22 deletions anonlink/entitymatch.py
@@ -3,31 +3,34 @@
from anonlink._entitymatcher import ffi, lib

import sys
from operator import itemgetter

from . import bloommatcher as bm
from . import util

log = logging.getLogger('anonlink.entitymatch')


def python_filter_similarity(filters1, filters2):
def python_filter_similarity(filters1, filters2, k, threshold):
"""Pure python method for determining Bloom Filter similarity
Both arguments are 3-tuples - bitarray with bloom filter for record, index of record, bitcount
:return: A list of tuples *one* for each entity in filters1.
:return: A list of tuples *k* for each entity in filters1.
The tuple comprises:
- the index in filters1
- the similarity score between 0 and 1 of the best match
- the similarity score between 0 and 1 of the k matches above threshold
- The index in filters2 of the best match
"""
result = []
for i, f1 in enumerate(filters1):
coeffs = [bm.dicecoeff_precount(f1[0], x[0], float(f1[2] + x[2])) for x in filters2]
# argmax
best = max(enumerate(coeffs), key=lambda x: x[1])[0]
assert coeffs[best] <= 1.0
result.append((i, coeffs[best], best))
def dicecoeff(x):
return bm.dicecoeff_precount(f1[0], x[0], float(f1[2] + x[2]))

coeffs = filter(lambda c: c[1] >= threshold,
enumerate(map(dicecoeff, filters2)))
top_k = sorted(coeffs, key=itemgetter(1), reverse=True)[:k]
result.extend([(i, coeff, j) for j, coeff in top_k])
return result
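
As a rough usage sketch (hypothetical data; the 3-tuple layout of
(bitarray, record index, bit count) follows the docstring above):

    from bitarray import bitarray
    from anonlink.entitymatch import python_filter_similarity

    f1 = bitarray('1100' * 16)    # 64-bit example filters
    f2 = bitarray('1110' * 16)
    filters1 = [(f1, 0, f1.count())]
    filters2 = [(f2, 0, f2.count())]

    # Up to k tuples (index_in_filters1, score, index_in_filters2) per entity,
    # keeping only scores at or above the threshold; here this should yield
    # [(0, 0.8, 0)] since the Dice coefficient is 2*32/(32+48).
    print(python_filter_similarity(filters1, filters2, 1, 0.5))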


@@ -44,8 +47,13 @@ def cffi_filter_similarity_k(filters1, filters2, k, threshold):
if length_f1 == 0:
return []

# There's no sense in having k > length_f2. Also, k is passed to
# ffi.new(...) below, so we need to protect against an
# out-of-memory DoS if k is of untrustworthy origin.
k = min(k, length_f2)

filter_bits = len(filters1[0][0])
assert(filter_bits % 64 == 0, 'Filter length must be a multiple of 64 bits.')
assert filter_bits % 64 == 0, 'Filter length must be a multiple of 64 bits.'
filter_bytes = filter_bits // 8

match_one_against_many_dice_k_top = lib.match_one_against_many_dice_k_top
@@ -104,21 +112,20 @@ def greedy_solver(sparse_similarity_matrix):
:param sparse_similarity_matrix: Iterable of tuples: (indx_a, similarity_score, indx_b)
"""
mappings = {}
mapping = {}

# original indicies of filters which have been claimed
matched_entries_b = set()
# Indices of filters which have been claimed
matched = set()

for result in sparse_similarity_matrix:
index_a, score, possible_index_b = result
if possible_index_b not in matched_entries_b:
mappings[index_a] = possible_index_b
matched_entries_b.add(possible_index_b)
for i, score, j in sparse_similarity_matrix:
if i not in mapping and j not in matched:
mapping[i] = j
matched.add(j)

return mappings
return mapping


def calculate_mapping_greedy(filters1, filters2, threshold=0.95, k=5):
def calculate_mapping_greedy(filters1, filters2, k, threshold):
"""
Brute-force one-shot solver.
@@ -136,7 +143,7 @@ def calculate_mapping_greedy(filters1, filters2, threshold=0.95, k=5):
return greedy_solver(sparse_matrix)


def calculate_filter_similarity(filters1, filters2, k=10, threshold=0.5, use_python=False):
def calculate_filter_similarity(filters1, filters2, k, threshold, use_python=False):
"""Computes a sparse similarity matrix with:
- size no larger than k * len(filters1)
- order of len(filters1) + len(filters2)
@@ -167,7 +174,7 @@ def calculate_filter_similarity(filters1, filters2, k=10, threshold=0.5, use_pyt
raise ValueError("Didn't meet minimum number of entities")
# use C++ version by default
if use_python:
return python_filter_similarity(filters1, filters2)
return python_filter_similarity(filters1, filters2, k, threshold)
else:
return cffi_filter_similarity_k(filters1, filters2, k=k, threshold=threshold)
return cffi_filter_similarity_k(filters1, filters2, k, threshold)

2 changes: 1 addition & 1 deletion anonlink/network_flow.py
@@ -110,7 +110,7 @@ def get_score(node_name):
return entity_map


def map_entities(weights, threshold=0.8, method=None):
def map_entities(weights, threshold, method=None):
"""Calculate a dictionary mapping using similarity scores.
:param weights: The list of tuples including n-gram similarity scores