Skip to content

Commit

Permalink
Merge pull request #5 from cgmeyer/develop
Browse files: browse the repository at this point in the history
Develop
  • Loading branch information
cgmeyer authored Dec 20, 2019
2 parents 76c56d8 + 5d161cc commit cef41ca
Showing 1 changed file with 38 additions and 35 deletions.
73 changes: 38 additions & 35 deletions migration/migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def write_tsv(self,df,project_id,node,name='temp'):
df.to_csv(outname, sep='\t', index=False, encoding='utf-8')
print("\tTotal of {} records written to node '{}' in file:\n\t\t{}.".format(len(df),node,outname))
except Exception as e:
print("Error writing TSV file: {}".format(e))
print("\tError writing TSV file: {}".format(e))
return df

def make_temp_files(self,prefix,suffix,name='temp',overwrite=True,nodes=['all']):
Expand Down Expand Up @@ -121,7 +121,7 @@ def merge_nodes(self,project_id,in_nodes,out_node,name='temp'):
in_nodes(list): List of node TSVs to merge into a single TSV.
out_node(str): The name of the new merged TSV.
"""
print("Merging nodes {} to '{}'.".format(in_nodes,out_node))
print("\tMerging nodes {} to '{}'.".format(in_nodes,out_node))
dfs = []
for node in in_nodes:
filename = "{}_{}_{}.tsv".format(name,project_id, node)
Expand Down Expand Up @@ -166,11 +166,11 @@ def merge_properties(self,project_id,node,properties,name='temp'):
df_merged = pd.concat([df_rest,df_old],ignore_index=True,sort=False)
df = df_merged.drop(columns=[old_prop])
dropped.append(old_prop)
print("Property '{}' merged into '{}' and dropped from '{}' TSV.".format(old_prop,prop,node))
print("\tProperty '{}' merged into '{}' and dropped from '{}' TSV.".format(old_prop,prop,node))
else:
print("Property '{}' not found in '{}' TSV. Skipping...".format(old_prop,node))
print("\tProperty '{}' not found in '{}' TSV. Skipping...".format(old_prop,node))
if len(dropped) > 0:
print("Properties {} merged into {}.".format(dropped,list(properties.keys())))
print("\tProperties {} merged into {}.".format(dropped,list(properties.keys())))
df = self.write_tsv(df,project_id,node)
return df
else:
Expand Down Expand Up @@ -218,7 +218,7 @@ def create_missing_links(self,project_id,node,link,old_parent,properties,new_dd,
create_missing_links(node='imaging_exam',link='visit',old_parent='cases',properties={'visit_label':'Imaging','visit_method':'In-person Visit'},new_dd=dd,old_dd=prod_dd,links=None)
create_missing_links(node='diagnosis',link='visit',old_parent='cases',properties={'visit_label':'Unknown','visit_method':'Unknown'},new_dd=dd,old_dd=prod_dd)
"""
print("Creating missing '{}' records with links to '{}' for '{}'.".format(link,old_parent,node))
print("\tCreating missing '{}' records with links to '{}' for '{}'.".format(link,old_parent,node))

df = self.read_tsv(project_id=project_id,node=node,name=name)
# filename = "{}_{}_{}.tsv".format(name,project_id,node)
Expand Down Expand Up @@ -341,7 +341,7 @@ def batch_add_visits(self,project_id,new_dd,old_dd,links):
print("\tNo links to 'case' found in the '{}' TSV.".format(node))
if len(dfs) > 0:
df = pd.concat(dfs,ignore_index=True,sort=False)
print("Total of {} missing visit links created for this batch.".format(total))
print("\tTotal of {} missing visit links created for this batch.".format(total))
return df

def move_properties(self,project_id,from_node,to_node,properties,dd,parent_node=None,required_props=None,name='temp'):
Expand All @@ -358,7 +358,7 @@ def move_properties(self,project_id,from_node,to_node,properties,dd,parent_node=
This moves the property 'military_status' from 'demographic' node to 'military_history' node, which should link to the same parent node 'case'.
move_properties(from_node='demographic',to_node='military_history',properties=['military_status'],parent_node='case')
"""
print("Moving {} from '{}' to '{}'.".format(properties,from_node,to_node))
print("\tMoving {} from '{}' to '{}'.".format(properties,from_node,to_node))

from_name = "{}_{}_{}.tsv".format(name,project_id,from_node) #from imaging_exam
try:
Expand Down Expand Up @@ -423,10 +423,10 @@ def move_properties(self,project_id,from_node,to_node,properties,dd,parent_node=
if len(vals) == 1:
case_data.loc[case_data['submitter_id']==case_id,header] = vals
elif len(vals) > 1:
print("{}: {}".format(header,vals))
print("\t{}: {}".format(header,vals))
if header == 'age_at_enrollment': # special case hard-coded for BRAIN Commons migration
lowest_val = min(vals, key=float)
print("Selecting lowest value '{}' from {}.".format(lowest_val,vals))
print("\tSelecting lowest value '{}' from {}.".format(lowest_val,vals))
case_data.loc[case_data['submitter_id']==case_id,header] = lowest_val
count += 1
all_to = pd.merge(df_to,case_data,on='submitter_id', how='left')
Expand All @@ -450,10 +450,10 @@ def move_properties(self,project_id,from_node,to_node,properties,dd,parent_node=
for prop in to_required:
if prop in list(required_props.keys()):
all_to[prop] = required_props[prop]
print("Missing required property '{}' added to new '{}' TSV with all {} values.".format(prop,to_node,required_props[prop]))
print("\tMissing required property '{}' added to new '{}' TSV with all {} values.".format(prop,to_node,required_props[prop]))
else:
all_to[prop] = np.nan
print("Missing required property '{}' added to new '{}' TSV with all null values.".format(prop,to_node))
print("\tMissing required property '{}' added to new '{}' TSV with all null values.".format(prop,to_node))

all_to.to_csv(to_name,sep='\t',index=False,encoding='utf-8')
print("\tProperties moved to '{}' node from '{}'. Data saved in file:\n\t{}".format(to_node,from_node,to_name))
Expand All @@ -468,7 +468,7 @@ def add_property(self,project_id,node,properties):
if prop not in list(df):
df[prop] = properties[prop]
else:
print("Property '{}' already in the TSV for node '{}'.".format(prop,node))
print("\tProperty '{}' already in the TSV for node '{}'.".format(prop,node))

df.to_csv(filename,sep='\t',index=False,encoding='utf-8')
return df
Expand Down Expand Up @@ -667,7 +667,7 @@ def merge_links(self,project_id,node,link,links_to_merge,name='temp'):
sid = "{}.submitter_id".format(sublink)
df.loc[df[link_name].isnull(), link_name] = df[sid]
df.to_csv(filename,sep='\t',index=False,encoding='utf-8')
print("Links merged to '{}' and data written to TSV file: \n\t{}".format(link,filename))
print("\tLinks merged to '{}' and data written to TSV file: \n\t\t{}".format(link,filename))
return df

def drop_ids(self,project_id,node,name='temp'):
Expand Down Expand Up @@ -702,7 +702,7 @@ def batch_drop_ids(self,project_id,suborder,name='temp'):
for node_order in suborder:

node = node_order[0]
print(node)
print("\t{}:".format(node))

df = self.read_tsv(project_id=project_id,node=node,name=name)
# filename = "{}_{}_{}.tsv".format(name,project_id,node)
Expand Down Expand Up @@ -733,7 +733,7 @@ def create_project(self,program,project):
}}""".format(program,program)
prog_json = json.loads(prog_txt)
data = self.sub.create_program(json=prog_json)
print(data)
print("\t{}".format(data))
proj_txt = """{{
"type": "project",
"code": "{}",
Expand All @@ -742,7 +742,7 @@ def create_project(self,program,project):
}}""".format(project,project,project)
proj_json = json.loads(proj_txt)
data = self.sub.create_project(program=program,json=proj_json)
print(data)
print("\t{}".format(data))

def remove_special_chars(self,project_id,node,name='temp'):
""" Replace a special character in 'Parkinson's Disease'
Expand All @@ -758,10 +758,10 @@ def remove_special_chars(self,project_id,node,name='temp'):
df_txt2 = re.sub(substring,"Parkinson's Disease",df_txt)
df = pd.read_csv(StringIO(df_txt2),sep='\t',dtype=str) # this converts int to float (adds .0 to int)
df.to_csv(filename,sep='\t',index=False, encoding='utf-8')
print("Special chars removed from: {}".format(filename))
print("\tSpecial chars removed from: {}".format(filename))

else:
print("No special chars found in {}".format(filename))
print("\tNo special chars found in {}".format(filename))

return df

Expand All @@ -773,7 +773,7 @@ def floats_to_integers(self,project_id,node,prop,name='temp'):

df[prop] = df[prop].str.extract(r'^(\d+).0$', expand=True)
df.to_csv(filename,sep='\t',index=False, encoding='utf-8')
print("Trailing '.0' decimals removed from: {}".format(filename))
print("\tTrailing '.0' decimals removed from: {}".format(filename))
return df

def get_submission_order(self,dd,project_id,name='temp',suffix='tsv',missing_nodes=['project','study','case','visit']):
Expand All @@ -798,7 +798,7 @@ def get_submission_order(self,dd,project_id,name='temp',suffix='tsv',missing_nod
else:
print("\tThe node '{}' is not in the data dictionary! Skipping...".format(node))

print("\tFound the following nodes:\n\t{}".format(all_nodes))
print("\tFound the following nodes:\n\t\t{}".format(all_nodes))

# Check for the common missing root nodes
for missing_node in missing_nodes:
Expand All @@ -809,7 +809,7 @@ def get_submission_order(self,dd,project_id,name='temp',suffix='tsv',missing_nod
while len(all_nodes) > 0:

node = all_nodes.pop(0)
print("\tDetermining order for node '{}'.".format(node))
#print("\tDetermining order for node '{}'.".format(node)) # for trouble-shooting

node_links = dd[node]['links']
for link in node_links:
Expand Down Expand Up @@ -840,7 +840,7 @@ def get_submission_order(self,dd,project_id,name='temp',suffix='tsv',missing_nod
else: #skip it for now
all_nodes.append(node)
else:
print("No link target_type found for node '{}'".format(node))
print("\tNo link target_type found for node '{}'".format(node))
#suborder = sorted(suborder.items(), key=operator.itemgetter(1))
suborder = {key:val for key, val in suborder.items() if val > 0}
print("\tSubmission Order: \n\t\t{}".format(suborder))
Expand Down Expand Up @@ -873,32 +873,34 @@ def submit_tsvs(self,project_id,suborder,check_done=False,name='temp'):
data = self.sub.submit_file(project_id=project_id,filename=filename,chunk_size=1000)
#print("data: {}".format(data)) #for trouble-shooting
logfile.write(filename + '\n' + json.dumps(data)+'\n\n') #put in log file

if len(data['invalid']) == 0 and len(data['succeeded']) > 0:
cmd = ['mv',filename,'done']
mv_done_cmd = ['mv',filename,'done']
try:
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode('UTF-8')
print("Submission successful. Moving file to done:\n\t{}\n\n".format(filename))
output = subprocess.check_output(mv_done_cmd, stderr=subprocess.STDOUT).decode('UTF-8')
print("Submission successful. Moving file to done:\n\t\t{}\n\n".format(filename))
except Exception as e:
output = e.output.decode('UTF-8')
print("ERROR:" + output)
else:
if len(data['invalid'])>0:
invalid_records = list(data['invalid'].keys())[0:10]
for i in invalid_records:
print(data['invalid'][i])
print("Need to fix errors in {}".format(filename))
cmd = ['mv',filename,'failed']
print("{}".format(data['invalid'][i]))
print("Need to fix {} errors in '{}'".format(len(invalid_records),filename))

mv_failed_cmd = ['mv',filename,'failed']
try:
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode('UTF-8')
print("Submission successful. Moving file to done:\n\t{}\n\n".format(filename))
output = subprocess.check_output(mv_failed_cmd, stderr=subprocess.STDOUT).decode('UTF-8')
print("Submission failed. Moving file to failed:\n\t\t{}".format(filename))
except Exception as e:
output = e.output.decode('UTF-8')
print("ERROR:" + output)

except Exception as e:
print(e)
print("\t{}".format(e))
else:
print("\nPreviously submitted file already exists in done directory:\n\t{}\n".format(done_file))
print("\tPreviously submitted file already exists in done directory:\n\t\t{}\n".format(done_file))

def check_migration_counts(self, projects=None, overwrite=False):
""" Gets counts and downloads TSVs for all nodes for every project.
Expand All @@ -925,11 +927,12 @@ def check_migration_counts(self, projects=None, overwrite=False):
query_txt = """{_%s_count (project_id:"%s")}""" % (node,project_id)
res = self.sub.query(query_txt)
count = res['data'][str('_'+node+'_count')]
print(str(count) + ' records found in node ' + node + ' in project ' + project_id)
print("\t{} records found in node '{}' in project '{}'.".format(str(count),node,project_id))

if count > 0:
filename = str(mydir+'/'+project_id+'_'+node+'.tsv')
if (os.path.isfile(filename)) and (overwrite is False):
print('Previously downloaded '+ filename )
print('\tPreviously downloaded '+ filename )
else:
prog,proj = project_id.split('-',1)
self.sub.export_node(prog,proj,node,'tsv',filename)
Expand Down

0 comments on commit cef41ca

Please sign in to comment.