diff --git a/src/indexing/download_abstracts.py b/src/indexing/download_abstracts.py
index 5629fbe..25896cb 100755
--- a/src/indexing/download_abstracts.py
+++ b/src/indexing/download_abstracts.py
@@ -85,6 +85,9 @@ def bulk_download(ftp_address: str, ftp_dir: str, local_dir: str, n_to_download
     empty (zero byte) file written. In this case, the script will re-connect,
     remove the empty file, and start downloading files again."""
 
+    if n_to_download == 0:
+        return
+
     # create local directory if it doesn't exist yet
     if not os.path.exists(local_dir):
         os.mkdir(local_dir)
diff --git a/src/indexing/index.py b/src/indexing/index.py
index 360c10f..30345bf 100644
--- a/src/indexing/index.py
+++ b/src/indexing/index.py
@@ -54,8 +54,7 @@ def query_index(self, query: str) -> 'set[int]':
         result = self._query_disk(tokens)
 
         if len(result) < 10000 or len(tokens) > 1:
-            if os.path.exists(self._bin_path):
-                _place_in_mongo(query, result)
+            _place_in_mongo(query, result)
 
         self._query_cache[query] = result
 
@@ -193,6 +192,31 @@ def _read_bytes_from_disk(self, token: str) -> bytes:
         stored_bytes = self.connection.read(byte_len)
         return stored_bytes
 
+    def _check_if_mongo_should_be_refreshed(self, terms_to_check: 'list[str]' = ['fever']):
+        # the purpose of this function is to check a non-cached version of a token
+        # and compare it to the cached version of that token. if the two do not
+        # produce the same result, the cache is outdated and needs to be cleared.
+
+        # the list of terms to check should contain words that are frequent enough
+        # that we can catch when the cache needs to be updated, but not so frequent
+        # that they add a lot of overhead to every job.
+
+        for item in terms_to_check:
+            query = item.lower().strip()
+            mongo_result = _check_mongo_for_query(query)
+
+            tokens = util.get_tokens(query)
+            result = self._query_disk(tokens)
+
+            if isinstance(mongo_result, type(None)):
+                _place_in_mongo(query, result)
+                continue
+
+            if result != mongo_result:
+                return True
+
+        return False
+
 def _intersect_dict_keys(dicts: 'list[dict]'):
     lowest_n_keys = sorted(dicts, key=lambda x: len(x))[0]
     key_intersect = set(lowest_n_keys.keys())
@@ -236,7 +260,6 @@ def _check_mongo_for_query(query: str):
             return None
     else:
         return None
-
 def _place_in_mongo(query, result):
     if not isinstance(mongo_cache, type(None)):
diff --git a/src/indexing/index_builder.py b/src/indexing/index_builder.py
index e6b3da8..15dd1bd 100755
--- a/src/indexing/index_builder.py
+++ b/src/indexing/index_builder.py
@@ -20,8 +20,7 @@ def build_index(self, dump_rate = 300000, overwrite_old = True):
         abstract_catalog.catalog.clear() # saves RAM
 
         print('building index...')
-        indexing.index._empty_mongo()
-
+
         # build the index
         catalog_path = util.get_abstract_catalog(self.path_to_pubmed_abstracts)
         cold_storage = dict()
diff --git a/src/server/Dockerfile b/src/server/Dockerfile
index 0a3cba8..3f33111 100755
--- a/src/server/Dockerfile
+++ b/src/server/Dockerfile
@@ -8,7 +8,7 @@ WORKDIR /fast-km/
 
 ## install python package requirements
 RUN pip install --upgrade pip
-RUN pip install -r requirements.txt
+RUN pip install -q -r requirements.txt
 
 ## expose port 5000 for web access to container
 EXPOSE 5000
diff --git a/src/tests/test_container_integration.py b/src/tests/test_container_integration.py
new file mode 100644
index 0000000..b5372cf
--- /dev/null
+++ b/src/tests/test_container_integration.py
@@ -0,0 +1,93 @@
+import pytest
+import requests
+import time
+import os
+import shutil
+from subprocess import check_output
+from indexing import km_util as util
+# the tests in this file test the Docker container communication, job
+# queuing, mongoDB caching, etc. They require that the API server and workers
+# be running with supporting containers in a docker-compose environment.
+
+flask_port = '5099'
+api_url = 'http://localhost:' + flask_port
+km_append = '/km/api/jobs'
+skim_append = '/skim/api/jobs'
+clear_cache_append = '/clear_cache/api/jobs'
+update_index_append = '/update_index/api/jobs'
+the_auth = ('username', 'password')
+
+@pytest.fixture
+def data_dir():
+    return os.path.join(os.getcwd(), "src", "tests", "test_data", "indexer")
+
+def test_container_integration(data_dir, monkeypatch):
+    # set the pubmed dir for this test
+    monkeypatch.setenv(name='PUBMED_DIR', value=data_dir.replace(os.getcwd(), '.'))
+
+    # use "docker compose" by default, but might need to use "docker-compose" (old syntax)
+    # depending on the machine this is being run on
+    docker_compose = 'docker compose'
+
+    try:
+        cmd_output = check_output(docker_compose, shell=True)
+    except:
+        #try:
+        docker_compose = 'docker-compose'
+        # cmd_output = check_output(docker_compose, shell=True)
+        #except:
+        # pytest.skip('skipped; docker compose may not be available on this system')
+
+    try:
+        # remove any old containers
+        cmd_output = check_output(docker_compose + ' down', shell=True)
+
+        # delete the index if it exists already
+        index_dir = util.get_index_dir(data_dir)
+        if os.path.exists(index_dir):
+            shutil.rmtree(index_dir)
+        assert not os.path.exists(index_dir)
+
+        # run the docker containers
+        time.sleep(1)
+
+        if docker_compose == 'docker compose':
+            cmd_output = check_output(docker_compose + ' up --build --wait', shell=True)
+            time.sleep(15)
+        else:
+            # docker-compose does not have the '--wait' flag
+            cmd_output = check_output(docker_compose + ' up --build -d', shell=True)
+            time.sleep(25)
+
+        # run a query against the not-yet-built index; it should return no results
+        skim_url = api_url + skim_append
+        query = {'a_terms': ['cancer'], 'b_terms': ['coffee'], 'c_terms': ['water'], 'ab_fet_threshold': 1, 'top_n': 50}
+        result = _post_job(skim_url, query)['result']
+        assert result[0]['total_count'] == 0
+
+        # build the index
+        _post_job(api_url + update_index_append, {'n_files': 0, 'clear_cache': False})
+
+        # run the query again. the newly built index should be detected, causing the
+        # cache to auto-clear.
+        result = _post_job(skim_url, query)['result']
+        assert result[0]['total_count'] > 4000
+
+    except Exception as e:
+        assert False, str(e)
+
+    finally:
+        cmd_output = check_output(docker_compose + ' down', shell=True)
+
+def _post_job(url, json):
+    job_id = requests.post(url=url, json=json, auth=the_auth).json()['id']
+
+    get_response = requests.get(url + '?id=' + job_id, auth=the_auth).json()
+    job_status = get_response['status']
+
+    while job_status == 'queued' or job_status == 'started':
+        time.sleep(1)
+        get_response = requests.get(url + '?id=' + job_id, auth=the_auth).json()
+        job_status = get_response['status']
+
+    return get_response
\ No newline at end of file
diff --git a/src/workers/Dockerfile b/src/workers/Dockerfile
index 3a05052..12b06ce 100755
--- a/src/workers/Dockerfile
+++ b/src/workers/Dockerfile
@@ -8,7 +8,7 @@ WORKDIR /fast-km/
 
 ## install python package requirements
 RUN pip install --upgrade pip
-RUN pip install -r requirements.txt
+RUN pip install -q -r requirements.txt
 
 ## Default command on running the container
 ENTRYPOINT ["python", "-u", "src/run_worker.py"]
diff --git a/src/workers/km_worker.py b/src/workers/km_worker.py
index 6c7c69d..415910f 100644
--- a/src/workers/km_worker.py
+++ b/src/workers/km_worker.py
@@ -2,12 +2,16 @@
 from rq import Worker, Queue, Connection
 from indexing.index import Index
 import workers.loaded_index as li
+import time
 
 class KmWorker(Worker):
     def __init__(self, queues=None, *args, **kwargs):
         super().__init__(queues, *args, **kwargs)
 
 def start_worker():
+    print('worker sleeping for 5 sec before starting...')
+    time.sleep(5)
+
     print('starting worker...')
     _load_index()
diff --git a/src/workers/work.py b/src/workers/work.py
index 4c83714..9acfc73 100644
--- a/src/workers/work.py
+++ b/src/workers/work.py
@@ -14,7 +14,8 @@ _q = Queue(connection=_r)
 
 def km_work(json: list):
-    indexing.index._connect_to_mongo()
+    _initialize_mongo_caching()
+
     return_val = []
 
     if len(json) > 1000000000:
@@ -51,7 +52,8 @@ def km_work(json: list):
     return return_val
 
 def km_work_all_vs_all(json: dict):
-    indexing.index._connect_to_mongo()
+    _initialize_mongo_caching()
+
     return_val = []
     km_only = False
 
@@ -150,7 +152,8 @@ def km_work_all_vs_all(json: dict):
     return return_val
 
 def triple_miner_work(json: list):
-    indexing.index._connect_to_mongo()
+    _initialize_mongo_caching()
+
     km_set = []
 
     for query in json:
@@ -175,6 +178,10 @@ def update_index_work(json: dict):
         n_files = json['n_files']
     else:
         n_files = math.inf
+    if 'clear_cache' in json:
+        clear_cache = json['clear_cache']
+    else:
+        clear_cache = True
 
     # download baseline
     print('Checking for files to download...')
@@ -206,7 +213,9 @@ def update_index_work(json: dict):
 
     # remove the old index
     index_builder.overwrite_old_index()
-    clear_mongo_cache()
+
+    if clear_cache:
+        clear_mongo_cache([])
 
     # re-queue interrupted jobs
     _queue_jobs(interrupted_jobs)
@@ -217,6 +226,15 @@ def clear_mongo_cache(json):
     indexing.index._connect_to_mongo()
     indexing.index._empty_mongo()
 
+def _initialize_mongo_caching():
+    indexing.index._connect_to_mongo()
+    if li.the_index._check_if_mongo_should_be_refreshed():
+        clear_mongo_cache([])
+
+    # this second call looks redundant, but it re-caches the terms_to_check
+    # (such as 'fever') so the cache reflects the current state of the index
+    li.the_index._check_if_mongo_should_be_refreshed()
+
 def _restart_workers(requeue_interrupted_jobs = True):
     print('restarting workers...')
     workers = Worker.all(_r)
@@ -238,7 +256,7 @@ def _restart_workers(requeue_interrupted_jobs = True):
         # shut down the worker
         rqc.send_shutdown_command(_r, worker.name)
 
-    if requeue_interrupted_jobs():
+    if requeue_interrupted_jobs:
         _queue_jobs(interrupted_jobs)
 
     return interrupted_jobs
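Usage sketch (illustrative): the snippet below shows how a client might exercise the new 'clear_cache' flag that update_index_work() now reads. The port, endpoint path, credentials, and response fields ('id', 'status') are assumptions taken from the integration test above, not a documented API.

# hypothetical client-side sketch; mirrors _post_job() in the test above
import time
import requests

api_url = 'http://localhost:5099'
the_auth = ('username', 'password')

# clear_cache=False skips the explicit cache flush after rebuilding the index;
# workers would instead detect the stale cache via _check_if_mongo_should_be_refreshed()
# the next time a job calls _initialize_mongo_caching().
job_id = requests.post(api_url + '/update_index/api/jobs',
                       json={'n_files': 0, 'clear_cache': False},
                       auth=the_auth).json()['id']

job_status = 'queued'
while job_status in ('queued', 'started'):
    time.sleep(1)
    job_status = requests.get(api_url + '/update_index/api/jobs?id=' + job_id,
                              auth=the_auth).json()['status']
print('update_index finished with status:', job_status)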