Skip to content

Commit

Permalink
Merge pull request #14 from amymok/refactor-esbody
Browse files Browse the repository at this point in the history
Refactor building of elasticsearch body
  • Loading branch information
amymok authored Jul 10, 2017
2 parents 6e91bd3 + 58977de commit beac47a
Show file tree
Hide file tree
Showing 2 changed files with 287 additions and 246 deletions.
270 changes: 270 additions & 0 deletions complaint_search/es_builders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
import abc
from collections import defaultdict, namedtuple

class BaseBuilder(object):
__metaclass__ = abc.ABCMeta

# Filters for those with string type
_OPTIONAL_FILTERS = ("product", "issue", "company", "state", "zip_code", "timely",
"company_response", "company_public_response",
"consumer_consent_provided", "submitted_via", "tag", "consumer_disputed")

# Filters for those that need conversion from string to boolean
_OPTIONAL_FILTERS_STRING_TO_BOOL = ("has_narratives",)

_OPTIONAL_FILTERS_PARAM_TO_ES_MAP = {
"product": "product.raw",
"sub_product": "sub_product.raw",
"issue": "issue.raw",
"sub_issue": "sub_issue.raw",
"company_public_response": "company_public_response.raw",
"consumer_consent_provided": "consumer_consent_provided.raw",
"consumer_disputed": "consumer_disputed.raw"
}

_OPTIONAL_FILTERS_CHILD_MAP = {
"product": "sub_product",
"issue": "sub_issue"
}

def __init__(self):
self.params = {}

def add(self, **kwargs):
self.params.update(**kwargs)

@abc.abstractmethod
def build(self):
"""Method that will build the body dictionary."""

def _create_bool_should_clauses(self, es_field_name, value_list,
with_subitems=False, es_subitem_field_name=None):
if value_list:
if not with_subitems:
term_list = [ {"terms": {es_field_name: [value]}}
for value in value_list ]
return {"bool": {"should": term_list}}
else:
item_dict = defaultdict(list)
for v in value_list:
# -*- coding: utf-8 -*-
v_pair = v.split(u'\u2022')
# No subitem
if len(v_pair) == 1:
# This will initialize empty list for item if not in item_dict yet
item_dict[v_pair[0]]
elif len(v_pair) == 2:
# put subproduct into list
item_dict[v_pair[0]].append(v_pair[1])

# Go through item_dict to create filters
f_list = []
for item, subitems in item_dict.iteritems():
item_term = {"terms": {es_field_name: [item]}}
# Item without any subitems
if not subitems:
f_list.append(item_term)
else:
subitem_term = {"terms": {es_subitem_field_name: subitems}}
f_list.append({"and": {"filters": [item_term, subitem_term]}})

return {"bool": {"should": f_list}}

def _create_and_append_bool_should_clauses(self, es_field_name, value_list,
filter_list, with_subitems=False, es_subitem_field_name=None):

filter_clauses = self._create_bool_should_clauses(es_field_name, value_list,
with_subitems, es_subitem_field_name)

if filter_clauses:
filter_list.append(filter_clauses)


class SearchBuilder(BaseBuilder):
def __init__(self):
self.params = {
"format": "json",
"field": "complaint_what_happened",
"size": 10,
"frm": 0,
"sort": "relevance_desc"
}

def build(self):
search = {
"from": self.params.get("frm"),
"size": self.params.get("size"),
"query": {"match_all": {}},
"highlight": {
"fields": {
self.params.get("field"): {}
},
"number_of_fragments": 1,
"fragment_size": 500
}
}

# sort
sort_field, sort_order = self.params.get("sort").rsplit("_", 1)
sort_field = "_score" if sort_field == "relevance" else sort_field
search["sort"] = [{sort_field: {"order": sort_order}}]

# query
if self.params.get("search_term"):
search["query"] = {
"match": {
self.params.get("field"): {
"query": self.params.get("search_term"),
"operator": "and"
}
}
}
else:
search["query"] = {
"query_string": {
"query": "*",
"fields": [
self.params.get("field")
],
"default_operator": "AND"
}
}

return search

class PostFilterBuilder(BaseBuilder):

def build(self):
post_filter = {"and": {"filters": []}}

## date
if self.params.get("min_date") or self.params.get("max_date"):
date_clause = {"range": {"date_received": {}}}
if self.params.get("min_date"):
date_clause["range"]["date_received"]["from"] = self.params.get("min_date")
if self.params.get("max_date"):
date_clause["range"]["date_received"]["to"] = self.params.get("max_date")

post_filter["and"]["filters"].append(date_clause)

## Create bool should clauses for fields in self._OPTIONAL_FILTERS
for field in self._OPTIONAL_FILTERS:
if field in self._OPTIONAL_FILTERS_CHILD_MAP:
self._create_and_append_bool_should_clauses(self._OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(field, field),
self.params.get(field), post_filter["and"]["filters"], with_subitems=True,
es_subitem_field_name=self._OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(self._OPTIONAL_FILTERS_CHILD_MAP.get(field),
self._OPTIONAL_FILTERS_CHILD_MAP.get(field)))
else:
self._create_and_append_bool_should_clauses(self._OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(field, field),
self.params.get(field), post_filter["and"]["filters"])

for field in self._OPTIONAL_FILTERS_STRING_TO_BOOL:
if self.params.get(field):
self._create_and_append_bool_should_clauses(field,
[ 0 if cd.lower() == "no" else 1 for cd in self.params.get(field) ],
post_filter["and"]["filters"])

return post_filter

class AggregationBuilder(BaseBuilder):

def build(self):
# All fields that need to have an aggregation entry
Field = namedtuple('Field', 'name size has_subfield')
fields = [
Field('has_narratives', 10, False),
Field('company', 10000, False),
Field('product', 10000, True),
Field('issue', 10000, True),
Field('state', 50, False),
Field('zip_code', 1000, False),
Field('timely', 10, False),
Field('company_response', 100, False),
Field('company_public_response', 100, False),
Field('consumer_disputed', 100, False),
Field('consumer_consent_provided', 100, False),
Field('tag', 100, False),
Field('submitted_via', 100, False)
]
aggs = {}

# Creating aggregation object for each field above
for field in fields:
field_aggs = {
"filter": {
"and": {
"filters": [

]
}
}
}

es_field_name = self._OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(field.name, field.name)
es_subfield_name = None
if field.has_subfield:
es_subfield_name = self._OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(self._OPTIONAL_FILTERS_CHILD_MAP.get(field.name))
field_aggs["aggs"] = {
field.name: {
"terms": {
"field": es_field_name,
"size": field.size
},
"aggs": {
es_subfield_name: {
"terms": {
"field": es_subfield_name,
"size": field.size
}
}
}
}
}
else:
field_aggs["aggs"] = {
field.name: {
"terms": {
"field": es_field_name,
"size": field.size
}
}
}

date_filter = {
"range": {
"date_received": {

}
}
}
if "min_date" in self.params:
date_filter["range"]["date_received"]["from"] = self.params["min_date"]
if "max_date" in self.params:
date_filter["range"]["date_received"]["to"] = self.params["max_date"]

field_aggs["filter"]["and"]["filters"].append(date_filter)

# Add filter clauses to aggregation entries (only those that are not the same as field name)
for item in self.params:
if item in self._OPTIONAL_FILTERS and item != field.name:
clauses = self._create_and_append_bool_should_clauses(self._OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(item, item),
self.params[item], field_aggs["filter"]["and"]["filters"],
with_subitems=item in self._OPTIONAL_FILTERS_CHILD_MAP,
es_subitem_field_name=self._OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(self._OPTIONAL_FILTERS_CHILD_MAP.get(item)))
elif item in self._OPTIONAL_FILTERS_STRING_TO_BOOL and item != field.name:
clauses = self._create_and_append_bool_should_clauses(self._OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(item, item),
[ 0 if cd.lower() == "no" else 1 for cd in self.params[item] ],
field_aggs["filter"]["and"]["filters"])

aggs[field.name] = field_aggs

return aggs

if __name__ == "__main__":
searchbuilder = SearchBuilder()
print searchbuilder.build()
pfbuilder = PostFilterBuilder()
print pfbuilder.build()
aggbuilder = AggregationBuilder()
print aggbuilder.build()

Loading

0 comments on commit beac47a

Please sign in to comment.