diff --git a/complaint_search/es_builders.py b/complaint_search/es_builders.py new file mode 100644 index 00000000..01bdf3a6 --- /dev/null +++ b/complaint_search/es_builders.py @@ -0,0 +1,270 @@ +import abc +from collections import defaultdict, namedtuple + +class BaseBuilder(object): + __metaclass__ = abc.ABCMeta + + # Filters for those with string type + _OPTIONAL_FILTERS = ("product", "issue", "company", "state", "zip_code", "timely", + "company_response", "company_public_response", + "consumer_consent_provided", "submitted_via", "tag", "consumer_disputed") + + # Filters for those that need conversion from string to boolean + _OPTIONAL_FILTERS_STRING_TO_BOOL = ("has_narratives",) + + _OPTIONAL_FILTERS_PARAM_TO_ES_MAP = { + "product": "product.raw", + "sub_product": "sub_product.raw", + "issue": "issue.raw", + "sub_issue": "sub_issue.raw", + "company_public_response": "company_public_response.raw", + "consumer_consent_provided": "consumer_consent_provided.raw", + "consumer_disputed": "consumer_disputed.raw" + } + + _OPTIONAL_FILTERS_CHILD_MAP = { + "product": "sub_product", + "issue": "sub_issue" + } + + def __init__(self): + self.params = {} + + def add(self, **kwargs): + self.params.update(**kwargs) + + @abc.abstractmethod + def build(self): + """Method that will build the body dictionary.""" + + def _create_bool_should_clauses(self, es_field_name, value_list, + with_subitems=False, es_subitem_field_name=None): + if value_list: + if not with_subitems: + term_list = [ {"terms": {es_field_name: [value]}} + for value in value_list ] + return {"bool": {"should": term_list}} + else: + item_dict = defaultdict(list) + for v in value_list: + # -*- coding: utf-8 -*- + v_pair = v.split(u'\u2022') + # No subitem + if len(v_pair) == 1: + # This will initialize empty list for item if not in item_dict yet + item_dict[v_pair[0]] + elif len(v_pair) == 2: + # put subproduct into list + item_dict[v_pair[0]].append(v_pair[1]) + + # Go through item_dict to create filters + f_list = [] + for item, subitems in item_dict.iteritems(): + item_term = {"terms": {es_field_name: [item]}} + # Item without any subitems + if not subitems: + f_list.append(item_term) + else: + subitem_term = {"terms": {es_subitem_field_name: subitems}} + f_list.append({"and": {"filters": [item_term, subitem_term]}}) + + return {"bool": {"should": f_list}} + + def _create_and_append_bool_should_clauses(self, es_field_name, value_list, + filter_list, with_subitems=False, es_subitem_field_name=None): + + filter_clauses = self._create_bool_should_clauses(es_field_name, value_list, + with_subitems, es_subitem_field_name) + + if filter_clauses: + filter_list.append(filter_clauses) + + +class SearchBuilder(BaseBuilder): + def __init__(self): + self.params = { + "format": "json", + "field": "complaint_what_happened", + "size": 10, + "frm": 0, + "sort": "relevance_desc" + } + + def build(self): + search = { + "from": self.params.get("frm"), + "size": self.params.get("size"), + "query": {"match_all": {}}, + "highlight": { + "fields": { + self.params.get("field"): {} + }, + "number_of_fragments": 1, + "fragment_size": 500 + } + } + + # sort + sort_field, sort_order = self.params.get("sort").rsplit("_", 1) + sort_field = "_score" if sort_field == "relevance" else sort_field + search["sort"] = [{sort_field: {"order": sort_order}}] + + # query + if self.params.get("search_term"): + search["query"] = { + "match": { + self.params.get("field"): { + "query": self.params.get("search_term"), + "operator": "and" + } + } + } + else: + search["query"] = { + 
"query_string": { + "query": "*", + "fields": [ + self.params.get("field") + ], + "default_operator": "AND" + } + } + + return search + +class PostFilterBuilder(BaseBuilder): + + def build(self): + post_filter = {"and": {"filters": []}} + + ## date + if self.params.get("min_date") or self.params.get("max_date"): + date_clause = {"range": {"date_received": {}}} + if self.params.get("min_date"): + date_clause["range"]["date_received"]["from"] = self.params.get("min_date") + if self.params.get("max_date"): + date_clause["range"]["date_received"]["to"] = self.params.get("max_date") + + post_filter["and"]["filters"].append(date_clause) + + ## Create bool should clauses for fields in self._OPTIONAL_FILTERS + for field in self._OPTIONAL_FILTERS: + if field in self._OPTIONAL_FILTERS_CHILD_MAP: + self._create_and_append_bool_should_clauses(self._OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(field, field), + self.params.get(field), post_filter["and"]["filters"], with_subitems=True, + es_subitem_field_name=self._OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(self._OPTIONAL_FILTERS_CHILD_MAP.get(field), + self._OPTIONAL_FILTERS_CHILD_MAP.get(field))) + else: + self._create_and_append_bool_should_clauses(self._OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(field, field), + self.params.get(field), post_filter["and"]["filters"]) + + for field in self._OPTIONAL_FILTERS_STRING_TO_BOOL: + if self.params.get(field): + self._create_and_append_bool_should_clauses(field, + [ 0 if cd.lower() == "no" else 1 for cd in self.params.get(field) ], + post_filter["and"]["filters"]) + + return post_filter + +class AggregationBuilder(BaseBuilder): + + def build(self): + # All fields that need to have an aggregation entry + Field = namedtuple('Field', 'name size has_subfield') + fields = [ + Field('has_narratives', 10, False), + Field('company', 10000, False), + Field('product', 10000, True), + Field('issue', 10000, True), + Field('state', 50, False), + Field('zip_code', 1000, False), + Field('timely', 10, False), + Field('company_response', 100, False), + Field('company_public_response', 100, False), + Field('consumer_disputed', 100, False), + Field('consumer_consent_provided', 100, False), + Field('tag', 100, False), + Field('submitted_via', 100, False) + ] + aggs = {} + + # Creating aggregation object for each field above + for field in fields: + field_aggs = { + "filter": { + "and": { + "filters": [ + + ] + } + } + } + + es_field_name = self._OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(field.name, field.name) + es_subfield_name = None + if field.has_subfield: + es_subfield_name = self._OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(self._OPTIONAL_FILTERS_CHILD_MAP.get(field.name)) + field_aggs["aggs"] = { + field.name: { + "terms": { + "field": es_field_name, + "size": field.size + }, + "aggs": { + es_subfield_name: { + "terms": { + "field": es_subfield_name, + "size": field.size + } + } + } + } + } + else: + field_aggs["aggs"] = { + field.name: { + "terms": { + "field": es_field_name, + "size": field.size + } + } + } + + date_filter = { + "range": { + "date_received": { + + } + } + } + if "min_date" in self.params: + date_filter["range"]["date_received"]["from"] = self.params["min_date"] + if "max_date" in self.params: + date_filter["range"]["date_received"]["to"] = self.params["max_date"] + + field_aggs["filter"]["and"]["filters"].append(date_filter) + + # Add filter clauses to aggregation entries (only those that are not the same as field name) + for item in self.params: + if item in self._OPTIONAL_FILTERS and item != field.name: + clauses = 
self._create_and_append_bool_should_clauses(self._OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(item, item), + self.params[item], field_aggs["filter"]["and"]["filters"], + with_subitems=item in self._OPTIONAL_FILTERS_CHILD_MAP, + es_subitem_field_name=self._OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(self._OPTIONAL_FILTERS_CHILD_MAP.get(item))) + elif item in self._OPTIONAL_FILTERS_STRING_TO_BOOL and item != field.name: + clauses = self._create_and_append_bool_should_clauses(self._OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(item, item), + [ 0 if cd.lower() == "no" else 1 for cd in self.params[item] ], + field_aggs["filter"]["and"]["filters"]) + + aggs[field.name] = field_aggs + + return aggs + +if __name__ == "__main__": + searchbuilder = SearchBuilder() + print searchbuilder.build() + pfbuilder = PostFilterBuilder() + print pfbuilder.build() + aggbuilder = AggregationBuilder() + print aggbuilder.build() + diff --git a/complaint_search/es_interface.py b/complaint_search/es_interface.py index 71ed18d7..5c964103 100644 --- a/complaint_search/es_interface.py +++ b/complaint_search/es_interface.py @@ -4,6 +4,7 @@ from collections import defaultdict, namedtuple import requests from elasticsearch import Elasticsearch +from complaint_search.es_builders import SearchBuilder, PostFilterBuilder, AggregationBuilder _ES_URL = "{}://{}:{}".format("http", os.environ.get('ES_HOST', 'localhost'), os.environ.get('ES_PORT', '9200')) @@ -15,29 +16,6 @@ _COMPLAINT_ES_INDEX = os.environ.get('COMPLAINT_ES_INDEX', 'complaint-index') _COMPLAINT_DOC_TYPE = os.environ.get('COMPLAINT_DOC_TYPE', 'complaint-doctype') -# Filters for those with string type -_OPTIONAL_FILTERS = ("product", "issue", "company", "state", "zip_code", "timely", - "company_response", "company_public_response", - "consumer_consent_provided", "submitted_via", "tag", "consumer_disputed") - -# Filters for those that need conversion from string to boolean -_OPTIONAL_FILTERS_STRING_TO_BOOL = ("has_narratives",) - -_OPTIONAL_FILTERS_PARAM_TO_ES_MAP = { - "product": "product.raw", - "sub_product": "sub_product.raw", - "issue": "issue.raw", - "sub_issue": "sub_issue.raw", - "company_public_response": "company_public_response.raw", - "consumer_consent_provided": "consumer_consent_provided.raw", - "consumer_disputed": "consumer_disputed.raw" -} - -_OPTIONAL_FILTERS_CHILD_MAP = { - "product": "sub_product", - "issue": "sub_issue" -} - def get_es(): global _ES_INSTANCE if _ES_INSTANCE is None: @@ -45,143 +23,6 @@ def get_es(): timeout=100) return _ES_INSTANCE - - -def _create_aggregation(**kwargs): - - # All fields that need to have an aggregation entry - Field = namedtuple('Field', 'name size has_subfield') - fields = [ - Field('has_narratives', 10, False), - Field('company', 10000, False), - Field('product', 10000, True), - Field('issue', 10000, True), - Field('state', 50, False), - Field('zip_code', 1000, False), - Field('timely', 10, False), - Field('company_response', 100, False), - Field('company_public_response', 100, False), - Field('consumer_disputed', 100, False), - Field('consumer_consent_provided', 100, False), - Field('tag', 100, False), - Field('submitted_via', 100, False) - ] - aggs = {} - - # Creating aggregation object for each field above - for field in fields: - field_aggs = { - "filter": { - "and": { - "filters": [ - - ] - } - } - } - - es_field_name = _OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(field.name, field.name) - es_subfield_name = None - if field.has_subfield: - es_subfield_name = 
_OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(_OPTIONAL_FILTERS_CHILD_MAP.get(field.name)) - field_aggs["aggs"] = { - field.name: { - "terms": { - "field": es_field_name, - "size": field.size - }, - "aggs": { - es_subfield_name: { - "terms": { - "field": es_subfield_name, - "size": field.size - } - } - } - } - } - else: - field_aggs["aggs"] = { - field.name: { - "terms": { - "field": es_field_name, - "size": field.size - } - } - } - - date_filter = { - "range": { - "date_received": { - - } - } - } - if "min_date" in kwargs: - date_filter["range"]["date_received"]["from"] = kwargs["min_date"] - if "max_date" in kwargs: - date_filter["range"]["date_received"]["to"] = kwargs["max_date"] - - field_aggs["filter"]["and"]["filters"].append(date_filter) - - # Add filter clauses to aggregation entries (only those that are not the same as field name) - for item in kwargs: - if item in _OPTIONAL_FILTERS and item != field.name: - clauses = _create_and_append_bool_should_clauses(_OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(item, item), - kwargs[item], field_aggs["filter"]["and"]["filters"], - with_subitems=item in _OPTIONAL_FILTERS_CHILD_MAP, - es_subitem_field_name=_OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(_OPTIONAL_FILTERS_CHILD_MAP.get(item))) - elif item in _OPTIONAL_FILTERS_STRING_TO_BOOL and item != field.name: - clauses = _create_and_append_bool_should_clauses(_OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(item, item), - [ 0 if cd.lower() == "no" else 1 for cd in kwargs[item] ], - field_aggs["filter"]["and"]["filters"]) - - aggs[field.name] = field_aggs - - return aggs - -def _create_bool_should_clauses(es_field_name, value_list, - with_subitems=False, es_subitem_field_name=None): - if value_list: - if not with_subitems: - term_list = [ {"terms": {es_field_name: [value]}} - for value in value_list ] - return {"bool": {"should": term_list}} - else: - item_dict = defaultdict(list) - for v in value_list: - # -*- coding: utf-8 -*- - v_pair = v.split(u'\u2022') - # No subitem - if len(v_pair) == 1: - # This will initialize empty list for item if not in item_dict yet - item_dict[v_pair[0]] - elif len(v_pair) == 2: - # put subproduct into list - item_dict[v_pair[0]].append(v_pair[1]) - - # Go through item_dict to create filters - f_list = [] - for item, subitems in item_dict.iteritems(): - item_term = {"terms": {es_field_name: [item]}} - # Item without any subitems - if not subitems: - f_list.append(item_term) - else: - subitem_term = {"terms": {es_subitem_field_name: subitems}} - f_list.append({"and": {"filters": [item_term, subitem_term]}}) - - return {"bool": {"should": f_list}} - -def _create_and_append_bool_should_clauses(es_field_name, value_list, - filter_list, with_subitems=False, es_subitem_field_name=None): - - filter_clauses = _create_bool_should_clauses(es_field_name, value_list, - with_subitems, es_subitem_field_name) - - if filter_clauses: - filter_list.append(filter_clauses) - # List of possible arguments: # - format: format to be returned: "json", "csv", "xls", or "xlsx" # - field: field you want to search in: "complaint_what_happened", "company_public_response", "_all" @@ -206,96 +47,26 @@ def _create_and_append_bool_should_clauses(es_field_name, value_list, # - tag - filters a list of tags def search(**kwargs): - # base default parameters - params = { - "format": "json", - "field": "complaint_what_happened", - "size": 10, - "frm": 0, - "sort": "relevance_desc" - } - - params.update(**kwargs) - res = None - body = { - "from": params.get("frm"), - "size": params.get("size"), - "query": {"match_all": {}}, - 
"highlight": { - "fields": { - params.get("field"): {} - }, - "number_of_fragments": 1, - "fragment_size": 500 - } - } - - # sort - sort_field, sort_order = params.get("sort").rsplit("_", 1) - sort_field = "_score" if sort_field == "relevance" else sort_field - body["sort"] = [{sort_field: {"order": sort_order}}] - - # query - if params.get("search_term"): - body["query"] = { - "match": { - params.get("field"): { - "query": params.get("search_term"), - "operator": "and" - } - } - } - else: - body["query"] = { - "query_string": { - "query": "*", - "fields": [ - params.get("field") - ], - "default_operator": "AND" - } - } - - # post-filter - body["post_filter"] = {"and": {"filters": []}} + search_builder = SearchBuilder() + search_builder.add(**kwargs) + body = search_builder.build() - - ## date - if params.get("min_date") or params.get("max_date"): - date_clause = {"range": {"date_received": {}}} - if params.get("min_date"): - date_clause["range"]["date_received"]["from"] = params.get("min_date") - if params.get("max_date"): - date_clause["range"]["date_received"]["to"] = params.get("max_date") - - body["post_filter"]["and"]["filters"].append(date_clause) - - - ## Create bool should clauses for fields in _OPTIONAL_FILTERS - for field in _OPTIONAL_FILTERS: - if field in _OPTIONAL_FILTERS_CHILD_MAP: - _create_and_append_bool_should_clauses(_OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(field, field), - params.get(field), body["post_filter"]["and"]["filters"], with_subitems=True, - es_subitem_field_name=_OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(_OPTIONAL_FILTERS_CHILD_MAP.get(field), - _OPTIONAL_FILTERS_CHILD_MAP.get(field))) - else: - _create_and_append_bool_should_clauses(_OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(field, field), - params.get(field), body["post_filter"]["and"]["filters"]) - - for field in _OPTIONAL_FILTERS_STRING_TO_BOOL: - if params.get(field): - _create_and_append_bool_should_clauses(field, - [ 0 if cd.lower() == "no" else 1 for cd in params.get(field) ], - body["post_filter"]["and"]["filters"]) + post_filter_builder = PostFilterBuilder() + post_filter_builder.add(**kwargs) + body["post_filter"] = post_filter_builder.build() # format - if params.get("format") == "json": - ## Create base aggregation - body["aggs"] = _create_aggregation(**kwargs) + res = None + format = kwargs.get("format", "json") + if format == "json": + aggregation_builder = AggregationBuilder() + aggregation_builder.add(**kwargs) + body["aggs"] = aggregation_builder.build() + res = get_es().search(index=_COMPLAINT_ES_INDEX, body=body) - elif params.get("format") in ("csv", "xls", "xlsx"): - p = {"format": params.get("format"), - "source": json.dumps(body)} + + elif format in ("csv", "xls", "xlsx"): + p = {"format": format, "source": json.dumps(body)} p = urllib.urlencode(p) url = "{}/{}/{}/_data?{}".format(_ES_URL, _COMPLAINT_ES_INDEX, _COMPLAINT_DOC_TYPE, p)