auto-check to see if mongo cache needs to be cleared (#36)
* implement all vs all KM and unit test
* don't cache results if no index is loaded
* add function to check if cache needs to be cleared
* update integration test
* maybe fix environment variable
* try to get test to run on appveyor
* remove wait flag from docker-compose
* formatting
* relative path
* remove debug log statements
rmillikin authored May 19, 2022
1 parent 63682cf commit db85fc5
Showing 8 changed files with 152 additions and 12 deletions.
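
At the heart of this commit is a canary-term check: before serving jobs, a worker queries the on-disk index directly for a known term (the code uses 'fever') and compares the fresh result to what the mongo cache holds. A mismatch means the index was rebuilt after the cache was filled, so the cache is cleared. Below is a minimal standalone sketch of that idea, with a plain dict standing in for mongo and hypothetical names throughout; the real implementation is in the diffs that follow.

# sketch: canary-term cache invalidation (dict stands in for the mongo cache)
def cache_is_stale(cache: dict, query_disk, canary_terms=('fever',)) -> bool:
    # compare a fresh disk lookup of each canary term against its cached
    # value; a mismatch means the index changed after the cache was filled
    for term in canary_terms:
        fresh = query_disk(term)
        cached = cache.get(term)
        if cached is None:
            cache[term] = fresh  # first sighting: prime the canary
            continue
        if cached != fresh:
            return True
    return False

def initialize_caching(cache: dict, query_disk) -> None:
    if cache_is_stale(cache, query_disk):
        cache.clear()
        # a second pass re-primes the canary terms against the new index,
        # so later jobs don't trigger the flush again
        cache_is_stale(cache, query_disk)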
3 changes: 3 additions & 0 deletions src/indexing/download_abstracts.py
@@ -85,6 +85,9 @@ def bulk_download(ftp_address: str, ftp_dir: str, local_dir: str, n_to_download
     empty (zero byte) file written. In this case, the script will re-connect,
     remove the empty file, and start downloading files again."""
 
+    if n_to_download == 0:
+        return
+
     # create local directory if it doesn't exist yet
     if not os.path.exists(local_dir):
         os.mkdir(local_dir)
29 changes: 26 additions & 3 deletions src/indexing/index.py
@@ -54,8 +54,7 @@ def query_index(self, query: str) -> 'set[int]':
         result = self._query_disk(tokens)
 
         if len(result) < 10000 or len(tokens) > 1:
-            if os.path.exists(self._bin_path):
-                _place_in_mongo(query, result)
+            _place_in_mongo(query, result)
 
         self._query_cache[query] = result
 
@@ -193,6 +192,31 @@ def _read_bytes_from_disk(self, token: str) -> bytes:
         stored_bytes = self.connection.read(byte_len)
         return stored_bytes
 
+    def _check_if_mongo_should_be_refreshed(self, terms_to_check: 'list[str]' = ['fever']):
+        # the purpose of this function is to check a non-cached version of a token
+        # and compare it to the cached version. if the two do not produce the
+        # same result, then the cache is outdated and needs to be cleared.
+
+        # the list of terms to check should contain words that are frequent enough
+        # that we can catch when the cache needs to be updated, but not so frequent
+        # that they add a lot of overhead to every job.
+
+        for item in terms_to_check:
+            query = item.lower().strip()
+            mongo_result = _check_mongo_for_query(query)
+
+            tokens = util.get_tokens(query)
+            result = self._query_disk(tokens)
+
+            if isinstance(mongo_result, type(None)):
+                _place_in_mongo(query, result)
+                continue
+
+            if result != mongo_result:
+                return True
+
+        return False
+
 def _intersect_dict_keys(dicts: 'list[dict]'):
     lowest_n_keys = sorted(dicts, key=lambda x: len(x))[0]
     key_intersect = set(lowest_n_keys.keys())
@@ -236,7 +260,6 @@ def _check_mongo_for_query(query: str):
             return None
     else:
         return None
-
 
 def _place_in_mongo(query, result):
     if not isinstance(mongo_cache, type(None)):
3 changes: 1 addition & 2 deletions src/indexing/index_builder.py
@@ -20,8 +20,7 @@ def build_index(self, dump_rate = 300000, overwrite_old = True):
             abstract_catalog.catalog.clear() # saves RAM
 
         print('building index...')
-        indexing.index._empty_mongo()
-
+
         # build the index
         catalog_path = util.get_abstract_catalog(self.path_to_pubmed_abstracts)
         cold_storage = dict()
2 changes: 1 addition & 1 deletion src/server/Dockerfile
@@ -8,7 +8,7 @@ WORKDIR /fast-km/
 
 ## install python package requirements
 RUN pip install --upgrade pip
-RUN pip install -r requirements.txt
+RUN pip install -q -r requirements.txt
 
 ## expose port 5000 for web access to container
 EXPOSE 5000
93 changes: 93 additions & 0 deletions src/tests/test_container_integration.py
@@ -0,0 +1,93 @@
+import pytest
+import requests
+import time
+import os
+import shutil
+from subprocess import check_output
+from indexing import km_util as util
+# the tests in this file test the Docker container communication, job
+# queuing, mongoDB caching, etc. They require that the API server and workers
+# be running with supporting containers in a docker-compose environment.
+
+flask_port = '5099'
+api_url = 'http://localhost:' + flask_port
+km_append = '/km/api/jobs'
+skim_append = '/skim/api/jobs'
+clear_cache_append = '/clear_cache/api/jobs'
+update_index_append = '/update_index/api/jobs'
+the_auth = ('username', 'password')
+
+@pytest.fixture
+def data_dir():
+    return os.path.join(os.getcwd(), "src", "tests", "test_data", "indexer")
+
+def test_container_integration(data_dir, monkeypatch):
+    # set the pubmed dir for this test
+    monkeypatch.setenv(name='PUBMED_DIR', value=data_dir.replace(os.getcwd(), '.'))
+
+    # use "docker compose" by default, but might need to use "docker-compose" (old syntax)
+    # depending on the machine this is being run on
+    docker_compose = 'docker compose'
+
+    try:
+        cmd_output = check_output(docker_compose, shell=True)
+    except:
+        #try:
+        docker_compose = 'docker-compose'
+        # cmd_output = check_output(docker_compose, shell=True)
+        #except:
+        #    pytest.skip('skipped; docker compose may not be available on this system')
+
+    try:
+        # remove any old containers
+        cmd_output = check_output(docker_compose + ' down', shell=True)
+
+        # delete the index if it exists already
+        index_dir = util.get_index_dir(data_dir)
+        if os.path.exists(index_dir):
+            shutil.rmtree(index_dir)
+        assert not os.path.exists(index_dir)
+
+        # run the docker containers
+        time.sleep(1)
+
+        if docker_compose == 'docker compose':
+            cmd_output = check_output(docker_compose + ' up --build --wait', shell=True)
+            time.sleep(15)
+        else:
+            # docker-compose does not have the '--wait' flag
+            cmd_output = check_output(docker_compose + ' up --build -d', shell=True)
+            time.sleep(25)
+
+        # run query
+        skim_url = api_url + skim_append
+        query = {'a_terms': ['cancer'], 'b_terms': ['coffee'], 'c_terms': ['water'], 'ab_fet_threshold': 1, 'top_n': 50}
+        result = _post_job(skim_url, query)['result']
+        assert result[0]['total_count'] == 0
+
+        # build the index
+        _post_job(api_url + update_index_append, {'n_files': 0, 'clear_cache': False})
+
+        # run query. the new (built) index should be detected, causing the
+        # cache to auto-clear.
+        result = _post_job(skim_url, query)['result']
+        assert result[0]['total_count'] > 4000
+
+    except Exception as e:
+        assert False, str(e)
+
+    finally:
+        cmd_output = check_output(docker_compose + ' down', shell=True)
+
+def _post_job(url, json):
+    job_id = requests.post(url=url, json=json, auth=the_auth).json()['id']
+
+    get_response = requests.get(url + '?id=' + job_id, auth=the_auth).json()
+    job_status = get_response['status']
+
+    while job_status == 'queued' or job_status == 'started':
+        time.sleep(1)
+        get_response = requests.get(url + '?id=' + job_id, auth=the_auth).json()
+        job_status = get_response['status']
+
+    return get_response
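
For manual testing against a running compose stack, the same submit-and-poll pattern as _post_job above works directly; here is a sketch using the port, endpoint path, and credentials hard-coded in this test (not production values).

import requests
import time

api = 'http://localhost:5099'
auth = ('username', 'password')

# submit a SKiM job...
payload = {'a_terms': ['cancer'], 'b_terms': ['coffee'],
           'c_terms': ['water'], 'ab_fet_threshold': 1, 'top_n': 50}
job_id = requests.post(api + '/skim/api/jobs', json=payload, auth=auth).json()['id']

# ...and poll until it leaves the queue
response = requests.get(api + '/skim/api/jobs?id=' + job_id, auth=auth).json()
while response['status'] in ('queued', 'started'):
    time.sleep(1)
    response = requests.get(api + '/skim/api/jobs?id=' + job_id, auth=auth).json()

print(response['result'])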
2 changes: 1 addition & 1 deletion src/workers/Dockerfile
@@ -8,7 +8,7 @@ WORKDIR /fast-km/
 
 ## install python package requirements
 RUN pip install --upgrade pip
-RUN pip install -r requirements.txt
+RUN pip install -q -r requirements.txt
 
 ## Default command on running the container
ENTRYPOINT ["python", "-u", "src/run_worker.py"]
4 changes: 4 additions & 0 deletions src/workers/km_worker.py
@@ -2,12 +2,16 @@
 from rq import Worker, Queue, Connection
 from indexing.index import Index
 import workers.loaded_index as li
+import time
 
 class KmWorker(Worker):
     def __init__(self, queues=None, *args, **kwargs):
         super().__init__(queues, *args, **kwargs)
 
 def start_worker():
+    print('worker sleeping for 5 sec before starting...')
+    time.sleep(5)
+
     print('starting worker...')
 
     _load_index()
28 changes: 23 additions & 5 deletions src/workers/work.py
@@ -14,7 +14,8 @@
 _q = Queue(connection=_r)
 
 def km_work(json: list):
-    indexing.index._connect_to_mongo()
+    _initialize_mongo_caching()
+
     return_val = []
 
     if len(json) > 1000000000:
@@ -51,7 +52,8 @@ def km_work(json: list):
     return return_val
 
 def km_work_all_vs_all(json: dict):
-    indexing.index._connect_to_mongo()
+    _initialize_mongo_caching()
+
     return_val = []
     km_only = False
 
@@ -150,7 +152,8 @@ def km_work_all_vs_all(json: dict):
     return return_val
 
 def triple_miner_work(json: list):
-    indexing.index._connect_to_mongo()
+    _initialize_mongo_caching()
+
     km_set = []
 
     for query in json:
@@ -175,6 +178,10 @@ def update_index_work(json: dict):
         n_files = json['n_files']
     else:
         n_files = math.inf
+    if 'clear_cache' in json:
+        clear_cache = json['clear_cache']
+    else:
+        clear_cache = True
 
     # download baseline
     print('Checking for files to download...')
@@ -206,7 +213,9 @@
 
     # remove the old index
     index_builder.overwrite_old_index()
-    clear_mongo_cache()
+
+    if clear_cache:
+        clear_mongo_cache([])
 
     # re-queue interrupted jobs
     _queue_jobs(interrupted_jobs)
@@ -217,6 +226,15 @@ def clear_mongo_cache(json):
     indexing.index._connect_to_mongo()
     indexing.index._empty_mongo()
 
+def _initialize_mongo_caching():
+    indexing.index._connect_to_mongo()
+    if li.the_index._check_if_mongo_should_be_refreshed():
+        clear_mongo_cache([])
+
+        # this second call looks weird, but it's to cache the terms_to_check
+        # such as 'fever' to save the current state of the index
+        li.the_index._check_if_mongo_should_be_refreshed()
+
 def _restart_workers(requeue_interrupted_jobs = True):
     print('restarting workers...')
     workers = Worker.all(_r)
@@ -238,7 +256,7 @@
         # shut down the worker
         rqc.send_shutdown_command(_r, worker.name)
 
-    if requeue_interrupted_jobs():
+    if requeue_interrupted_jobs:
         _queue_jobs(interrupted_jobs)
 
     return interrupted_jobs
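
Note the new clear_cache flag on update_index_work: it defaults to True, but the integration test passes False so that the canary check in _initialize_mongo_caching is what detects the rebuilt index and flushes the cache on the next job. A request exercising that path might look like this (sketch; host, port, and credentials match the test setup above, not a real deployment).

import requests

# rebuild the index without flushing the cache explicitly; the canary check
# in _initialize_mongo_caching flushes it lazily when the next job runs
requests.post('http://localhost:5099/update_index/api/jobs',
              json={'n_files': 0, 'clear_cache': False},
              auth=('username', 'password'))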
