workers can update index as a job (#26)
* handle error if no job

* workers can update index as a job

* Delete .github/workflows directory

* auto delete outdated abstract files

* re-download partial files automatically

* fix uncommitted unit test

* print message when redownloading partial file

* trying to fix weird bug in year numbering

* introduce y3k bug

* remove some debug text, fix bug in function call

* fix infinite while loop

* clean up a bit

* fix unit test

* fix unit test..
rmillikin authored Mar 29, 2022
1 parent 6630df7 commit 3e4d173
Showing 14 changed files with 210 additions and 137 deletions.
22 changes: 0 additions & 22 deletions .github/workflows/shell_build.yml

This file was deleted.

31 changes: 0 additions & 31 deletions .github/workflows/shell_push.yml

This file was deleted.

12 changes: 0 additions & 12 deletions actions/build_workflow.sh

This file was deleted.

4 changes: 0 additions & 4 deletions actions/deploy_workflow.sh

This file was deleted.

1 change: 1 addition & 0 deletions build_run_local.sh
@@ -0,0 +1 @@
docker compose up --build
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -6,7 +6,7 @@ services:
dockerfile: ./src/server/Dockerfile
image: fast_km-server:build
ports:
- "5000:5000"
- "5001:5000"
depends_on:
- redis
networks:
87 changes: 65 additions & 22 deletions src/indexing/download_abstracts.py
@@ -1,8 +1,13 @@
import ftplib
import os
import math
import glob
from datetime import date
import os.path as path
import indexing.km_util as util

_ftp_lines = []

def connect_to_ftp_server(ftp_address: str, ftp_dir: str):
"""Connects to an FTP server given an FTP address and directory"""

@@ -23,11 +28,19 @@ def download_file(local_dir: str, remote_filename: str, ftp) -> None:
with open(local_filename, 'wb') as f:
ftp.retrbinary("RETR " + remote_filename, f.write)

def remove_empty_file(filename: str):
"""Removes the file if the file is <1 byte in size"""
def remove_partial_file(filename: str, expected_size: int):
"""Removes partially downloaded file, given an expected file size"""

if path.exists(filename):
local_size = path.getsize(filename)
else:
local_size = 0

if path.exists(filename) and path.getsize(filename) < 1:
if local_size != expected_size and path.exists(filename):
os.remove(filename)
return filename

return None

def list_files_to_download(ftp_address: str, ftp_dir: str, local_dir: str):
"""Lists files in the FTP directory that are not in the local directory"""
@@ -37,19 +50,36 @@ def list_files_to_download(ftp_address: str, ftp_dir: str, local_dir: str):
ftp = connect_to_ftp_server(ftp_address, ftp_dir)
remote_filenames = ftp.nlst()

# determine the size of files on the server and re-download any local files
# that have only been partially downloaded
ftp.retrlines('LIST', _retrline_callback)

# TODO: determine these instead of hardcoding them
byte_column = 4
filename_column = 8

byte_dict = {}
for line in _ftp_lines:
split_line = [x for x in str(line).split(' ') if x]
file_bytes = int(split_line[byte_column])
file_name = split_line[filename_column]
byte_dict[file_name] = file_bytes

for remote_filename in remote_filenames:
local_filename = path.join(local_dir, remote_filename)
remove_empty_file(local_filename)
remote_size = byte_dict[remote_filename]

if remove_partial_file(local_filename, remote_size):
print('partial file found, we will re-download it: ' + local_filename)

if not path.exists(local_filename):
files_to_download.append(remote_filename)

ftp.quit()
_ftp_lines.clear()
return files_to_download

# TODO: delete/update old files if date or bytes is different
# TODO: unit tests
def bulk_download(ftp_address: str, ftp_dir: str, local_dir: str):
def bulk_download(ftp_address: str, ftp_dir: str, local_dir: str, n_to_download = math.inf):
"""Download all files from an FTP server directory. The server can
disconnect without warning, which results in an EOF exception and an
empty (zero byte) file written. In this case, the script will re-connect,
@@ -59,37 +89,50 @@ def bulk_download(ftp_address: str, ftp_dir: str, local_dir: str):
if not os.path.exists(local_dir):
os.mkdir(local_dir)

# get list of files to download
remote_files_to_get = list_files_to_download(ftp_address, ftp_dir,
local_dir)
remote_files_to_get = ['temp']
n_downloaded = 0

print('Need to download ' + str(len(remote_files_to_get)) + ' files'
+ ' from ' + ftp_address + '/' + ftp_dir)
while remote_files_to_get and n_downloaded < n_to_download:
# get list of files to download
remote_files_to_get = list_files_to_download(ftp_address, ftp_dir,
local_dir)

print('Need to download ' + str(len(remote_files_to_get)) + ' files'
+ ' from ' + ftp_address + '/' + ftp_dir)

# download the files
while True:
# delete any *.xml.gz* file from previous years
current_year = int(date.today().strftime("%y"))
for year in range(10, current_year):
files_to_remove = glob.glob(os.path.join(local_dir, "pubmed" + str(year) + "*.xml.gz*"))

for file in files_to_remove:
if os.path.basename(file) not in remote_files_to_get:
print('deleting outdated file from \'' + str(year) + ': ' + file)
os.remove(file)

# download the files
try:
# connect to server and navigate to directory to download from
ftp = connect_to_ftp_server(ftp_address, ftp_dir)

for remote_filename in remote_files_to_get:
if n_downloaded >= n_to_download:
break

local_filepath = path.join(local_dir, remote_filename)
remove_empty_file(local_filepath)


if not path.exists(local_filepath):
download_file(local_dir, remote_filename, ftp)
n_downloaded += 1
util.report_progress(n_downloaded, len(remote_files_to_get))

if n_downloaded == len(remote_files_to_get):
if n_downloaded > 0:
print('\n')
break

# handle server disconnections
except EOFError:
pass

# log out of FTP server
ftp.quit()
ftp.quit()

def _retrline_callback(ftp_line: str):
global _ftp_lines
_ftp_lines.append(ftp_line)
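
Taken together, the reworked downloader compares each local file's size against the size reported in the server's LIST output, deletes partial files, and keeps looping until nothing is left to download (or the optional n_to_download cap is reached). A minimal usage sketch follows; the FTP address, remote directory, and cap value are illustrative assumptions, not values taken from this diff:

# Minimal usage sketch of the reworked downloader (values are illustrative assumptions).
import indexing.download_abstracts as downloader

downloader.bulk_download(
    ftp_address='ftp.ncbi.nlm.nih.gov',   # assumed PubMed FTP host
    ftp_dir='pubmed/baseline',            # assumed remote directory
    local_dir='/mnt/pubmed',              # matches the worker's pubmed path in run_worker.py
    n_to_download=5)                      # optional cap, e.g. for a quick smoke test; defaults to math.inf

Any file whose local size differs from the size in the LIST columns is removed and re-requested on the next pass of the while loop.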
8 changes: 8 additions & 0 deletions src/indexing/index.py
@@ -1,6 +1,7 @@
import mmap
import pickle
import math
import os
import gc
import indexing.km_util as util
from indexing.abstract_catalog import AbstractCatalog
@@ -82,10 +83,17 @@ def decache_token(self, token: str):
del self._query_cache[ltoken]

def _open_connection(self) -> None:
if not os.path.exists(self._bin_path):
print('warning: index does not exist and needs to be built')
return

self.file_obj = open(self._bin_path, mode='rb')
self.connection = mmap.mmap(self.file_obj.fileno(), length=0, access=mmap.ACCESS_READ)

def _init_byte_info(self) -> None:
if not os.path.exists(self._txt_path):
return

with open(self._txt_path, 'r', encoding=util.encoding) as t:
for index, line in enumerate(t):
split = line.split(sep=delim)
29 changes: 14 additions & 15 deletions src/indexing/index_builder.py
@@ -11,22 +11,14 @@ class IndexBuilder():
def __init__(self, path_to_pubmed_abstracts: str):
self.path_to_pubmed_abstracts = path_to_pubmed_abstracts

def build_index(self, dump_rate = 300000):
def build_index(self, dump_rate = 300000, overwrite_old = True):
print('cataloging abstracts...')
# catalog abstracts
abstract_catalog = AbstractCatalog(self.path_to_pubmed_abstracts)
abstract_catalog.catalog_abstracts()
abstract_catalog.catalog.clear() # saves RAM

# delete the old index. MUST do this because if indexing is
# interrupted, then we will have a new catalog of abstracts but an
# old index, with no way of knowing if the index is done building
old_index = util.get_index_file(self.path_to_pubmed_abstracts)
old_offsets = util.get_offset_file(self.path_to_pubmed_abstracts)
if os.path.exists(old_index):
os.remove(old_index)
if os.path.exists(old_offsets):
os.remove(old_offsets)

print('building index...')
# build the index
catalog_path = util.get_abstract_catalog(self.path_to_pubmed_abstracts)
cold_storage = dict()
@@ -44,7 +36,14 @@ def build_index(self, dump_rate = 300000):

# write the index
self._serialize_hot_to_cold_storage(hot_storage, cold_storage, consolidate_cold_storage=True)
self._write_index_to_disk(cold_storage)
self._write_index_to_disk(cold_storage, overwrite_old)

def overwrite_old_index(self):
temp_index_path = util.get_index_file(self.path_to_pubmed_abstracts) + '.tmp'
temp_offset_path = util.get_offset_file(self.path_to_pubmed_abstracts) + '.tmp'

os.replace(temp_index_path, util.get_index_file(self.path_to_pubmed_abstracts))
os.replace(temp_offset_path, util.get_offset_file(self.path_to_pubmed_abstracts))

def _index_abstract(self, abstract: Abstract, hot_storage: dict):
tokens = util.get_tokens(abstract.title)
@@ -115,7 +114,7 @@ def _serialize_hot_to_cold_storage(self, hot_storage: dict, cold_storage: dict,
serialized_combined_dict = quickle.dumps(combined_dict)
cold_storage[token] = serialized_combined_dict

def _write_index_to_disk(self, cold_storage: dict):
def _write_index_to_disk(self, cold_storage: dict, overwrite_old = True):
dir = os.path.dirname(util.get_index_file(self.path_to_pubmed_abstracts))

if not os.path.exists(dir):
@@ -141,5 +140,5 @@ def _write_index_to_disk(self, cold_storage: dict):
b.write(serialized_pmids)

# done writing; rename the temp files
os.replace(temp_index_path, util.get_index_file(self.path_to_pubmed_abstracts))
os.replace(temp_offset_path, util.get_offset_file(self.path_to_pubmed_abstracts))
if overwrite_old:
self.overwrite_old_index()
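
The new overwrite_old flag separates building the index from publishing it: build_index() can leave the freshly built files as *.tmp, and overwrite_old_index() later swaps them over the live files with os.replace(). A hypothetical call site illustrating the two-step flow (not part of this diff; the path is taken from run_worker.py):

# Hypothetical call site for the two-step build.
from indexing.index_builder import IndexBuilder

builder = IndexBuilder('/mnt/pubmed')
builder.build_index(overwrite_old=False)  # writes the index and offset data to *.tmp files only
builder.overwrite_old_index()             # os.replace() swaps the *.tmp files over the live index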
29 changes: 12 additions & 17 deletions src/run_worker.py
@@ -1,16 +1,11 @@
import multiprocessing
import os
import time
import argparse
from workers.km_worker import start_worker
import indexing.download_abstracts as downloader
from indexing.index_builder import IndexBuilder
import workers.loaded_index as li
import indexing.km_util as util

parser = argparse.ArgumentParser()
parser.add_argument('-w', '--workers', default=1)
parser.add_argument('-b', '--build_index', default=False)
args = parser.parse_args()

def start_workers(do_multiprocessing = True):
@@ -19,14 +14,22 @@ def start_workers(do_multiprocessing = True):
if type(n_workers) is str:
n_workers = int(n_workers)

print('starting ' + str(n_workers) + ' workers...')

if do_multiprocessing:
jobs = []
worker_processes = []
for i in range(0, n_workers):
p = multiprocessing.Process(target=start_worker)
jobs.append(p)
worker_processes.append(p)
p.start()

while True:
# if a worker process is dead, restart it
time.sleep(5)
for i, worker in enumerate(worker_processes):
if not worker or not worker.is_alive():
p = multiprocessing.Process(target=start_worker)
worker_processes[i] = p
p.start()

else:
start_worker()

@@ -35,14 +38,6 @@ def main():
time.sleep(10)
li.pubmed_path = '/mnt/pubmed'

rebuild_index = args.build_index
build_index = rebuild_index or (not os.path.exists(util.get_index_file(li.pubmed_path)))

if build_index:
#downloader.bulk_download()
index_builder = IndexBuilder(li.pubmed_path)
index_builder.build_index()

start_workers()

if __name__ == '__main__':
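
The start_workers() change above replaces the one-shot process launch with a small keep-alive loop that restarts any worker process that has died. Distilled into a standalone sketch (names here are illustrative, not taken from the repo):

# Standalone sketch of the keep-alive pattern added in start_workers() (illustrative names).
import multiprocessing
import time

def run_one_worker():
    pass  # stand-in for workers.km_worker.start_worker

def supervise(n_workers: int) -> None:
    workers = []
    for _ in range(n_workers):
        p = multiprocessing.Process(target=run_one_worker)
        workers.append(p)
        p.start()

    while True:
        time.sleep(5)  # poll interval used in the diff
        for i, p in enumerate(workers):
            if not p.is_alive():
                # replace the dead process with a fresh one
                workers[i] = multiprocessing.Process(target=run_one_worker)
                workers[i].start()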
(remaining changed files not loaded)
