Skip to content

Commit

Permalink
#4 mapreduce
Browse files Browse the repository at this point in the history
  • Loading branch information
ojixzzz committed Apr 6, 2017
1 parent 5617268 commit 5778ee1
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 5 deletions.
23 changes: 20 additions & 3 deletions ommongo/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ def __init__(self, type, session, exclude_subclasses=False):
self._raw_output = False
self._search = False
self._createIndex = None
self._aggregate = False
self._rawquery = False
self._query_type = None
self._mapreduce_mapper = None
self._mapreduce_reducer = None
self._mapreduce_key = None
self._mapreduce_query = None

def __iter__(self):
return self.__get_query_result()
Expand All @@ -68,7 +73,7 @@ def __iter__(self):
def query(self):
""" The mongo query object which would be executed if this Query
object were used """
if self._aggregate==True:
if self._rawquery==True:
return self.__query
return flatten(self.__query)

Expand Down Expand Up @@ -365,11 +370,23 @@ def search(self, value, createIndex=None):

def aggregate(self, raw_query):

self._aggregate = True
self._rawquery = True
self._query_type = 'aggregate'
self.__query = raw_query
self._raw_output = True
return self.__get_query_result().cursor

def map_reduce(self, mapper, reducer, key, query):

self._rawquery = True
self._query_type = 'map_reduce'
self._mapreduce_query = query
self._mapreduce_mapper = mapper
self._mapreduce_reducer = reducer
self._mapreduce_key = key
self._raw_output = True
return self.__get_query_result().cursor

def find_and_modify(self, new=False, remove=False):
''' The mongo "find and modify" command. Behaves like an update expression
in that "execute" must be called to do the update and return the
Expand Down
7 changes: 5 additions & 2 deletions ommongo/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,11 @@ def execute_query(self, query, session):
raise InvalidConfigException()
cursor = collection.find(query.query, {'__index_score': {'$meta': "textScore"}}, **kwargs)
cursor.sort([('__index_score', {'$meta': 'textScore'})])
elif query._aggregate:
cursor = collection.aggregate(query.query, **kwargs)
elif query._rawquery:
if query._query_type=='aggregate':
cursor = collection.aggregate(query.query, **kwargs)
elif query._query_type=='map_reduce':
cursor = collection.map_reduce( query._mapreduce_mapper, query._mapreduce_reducer, query._mapreduce_key, query=query._mapreduce_query)
else:
cursor = collection.find(query.query, **kwargs)

Expand Down

0 comments on commit 5778ee1

Please sign in to comment.