From 04843933f91a354511cbbfc4aff086de94089052 Mon Sep 17 00:00:00 2001 From: u0154820 Date: Thu, 11 Jan 2024 10:26:47 +0200 Subject: [PATCH] update automation addition --- README.md | 4 +- resource_code/config.py | 31 +- resource_code/data_integration.py | 672 ----------------------- resource_code/data_integration_kaggle.py | 180 ++++++ resource_code/data_integration_openml.py | 423 ++++++++++++++ resource_code/data_integration_pwc.py | 171 ++++++ resource_code/dump_storage.py | 37 ++ resource_code/get_data_sample.py | 39 +- resource_code/openml_data_collector.py | 19 +- resource_code/preprocessing_modules.py | 4 +- resource_code/update_sources.py | 38 ++ 11 files changed, 913 insertions(+), 705 deletions(-) delete mode 100644 resource_code/data_integration.py create mode 100644 resource_code/data_integration_kaggle.py create mode 100644 resource_code/data_integration_openml.py create mode 100644 resource_code/data_integration_pwc.py create mode 100644 resource_code/dump_storage.py create mode 100644 resource_code/update_sources.py diff --git a/README.md b/README.md index 22c1c1f..3611947 100644 --- a/README.md +++ b/README.md @@ -85,4 +85,6 @@ View and explore the [RDF mappings](https://github.com/dtai-kg/MLSea-Discover/tr Generate the RDF dumps of MLSea-KG by running: - python data_integration.py \ No newline at end of file + python data_integration_openml.py + python data_integration_kaggle.py + python data_integration_pwc.py \ No newline at end of file diff --git a/resource_code/config.py b/resource_code/config.py index 46b3d6d..f23e464 100644 --- a/resource_code/config.py +++ b/resource_code/config.py @@ -5,14 +5,27 @@ PWC_INPUT = "/Users/ioannisdasoulas/Desktop/ML-Discovery/ML-KG/Data/PwC-Data/" OUTPUT_PATH = "/Users/ioannisdasoulas/Desktop/ML-Discovery/ML-KG/RDF_Dumps/" ORIGINAL_DATA_FOLDER = "Original-Data/" -#UPDATE_MONTH_FOLDER = "December2023/" +UPDATE_MONTH_FOLDER = "10-01-2024/" # OpenML API Checkpoints -OPENML_RUN_CHECKPOINT = 4037070 +OPENML_RUN_CHECKPOINT = 4037082 OPENML_RUN_CURRENT_OFFSET = 6000000 -OPENML_DATASET_CHECKPOINT = 5399 -OPENML_FLOW_CHECKPOINT = 47250 -OPENML_TASK_CHECKPOINT = 16736 +OPENML_DATASET_CHECKPOINT = 5402 +OPENML_FLOW_CHECKPOINT = 16751 +OPENML_TASK_CHECKPOINT = 47250 + +# Dumps current file number +OPENML_TASK_DUMP_PART = 1 +OPENML_FLOW_DUMP_PART = 1 +OPENML_DATASET_DUMP_PART = 1 +OPENML_RUN_DUMP_PART = 29 +KAGGLE_DUMP_PART = 1 +PWC_DUMP_PART = 1 + +# Triples limit per dump +OPENML_DUMP_LIMIT = 50000000 +KAGGLE_DUMP_LIMIT = 30000000 +PWC_DUMP_LIMIT = 20000000 def update_openml_checkpoints(run_cp, dataset_cp, task_cp, flow_cp): @@ -21,10 +34,10 @@ def update_openml_checkpoints(run_cp, dataset_cp, task_cp, flow_cp): content = file.read() # Update the values in memory - content = content.replace('OPENML_RUN_CHECKPOINT = 4037070', 'OPENML_RUN_CHECKPOINT = ' + str(run_cp)) - content = content.replace('OPENML_DATASET_CHECKPOINT = 5399', 'OPENML_DATASET_CHECKPOINT = ' + str(dataset_cp)) - content = content.replace('OPENML_FLOW_CHECKPOINT = 47250', 'OPENML_FLOW_CHECKPOINT = ' + str(task_cp)) - content = content.replace('OPENML_TASK_CHECKPOINT = 16736', 'OPENML_TASK_CHECKPOINT = ' + str(flow_cp)) + content = content.replace('OPENML_RUN_CHECKPOINT = 4037082', 'OPENML_RUN_CHECKPOINT = ' + str(run_cp)) + content = content.replace('OPENML_DATASET_CHECKPOINT = 5402', 'OPENML_DATASET_CHECKPOINT = ' + str(dataset_cp)) + content = content.replace('OPENML_FLOW_CHECKPOINT = 16751', 'OPENML_FLOW_CHECKPOINT = ' + str(flow_cp)) + content = 
content.replace('OPENML_TASK_CHECKPOINT = 47250', 'OPENML_TASK_CHECKPOINT = ' + str(task_cp)) # Write the changes back to the constants.py file with open('config.py', 'w') as file: diff --git a/resource_code/data_integration.py b/resource_code/data_integration.py deleted file mode 100644 index 5785bc3..0000000 --- a/resource_code/data_integration.py +++ /dev/null @@ -1,672 +0,0 @@ -import morph_kgc -from get_data_sample import * -from queries import * -import sys -import pandas as pd -import json -import warnings -import gzip -import shutil -import os -from preprocessing_modules import * -warnings.simplefilter(action='ignore', category=FutureWarning) -import config - -def integrate_kaggle_kernel(kernels_df, users_df, kernel_versions_df, - targetpath, files, file_part, file_subpart): - - kernels_df['CreationDate'] = kernels_df['CreationDate'].apply(preprocess_datetime) - kernel_versions_df['CreationDate'] = kernel_versions_df['CreationDate'].apply(preprocess_datetime) - kernel_versions_df = preprocess_df_strings(kernel_versions_df) - users_df = preprocess_df_strings(users_df) - data_dict = {"kernels_df": kernels_df, - "users_df": users_df, - "kernel_versions_df": kernel_versions_df} - graph = morph_kgc.materialize('./morph_config/kaggle_kernel_conf.ini', data_dict) - print("RDF generated!") - print("Saving to disk...") - # for s,p,o in graph.triples((None, None, None)): - # print(s,p,o) - - filename = "kaggle_kernels_" + str(file_part) + "_part_" + str(file_subpart) + ".nt" - graph.serialize(destination = targetpath + filename, format = "nt", encoding = "utf-8") - files.append(filename) - - return files, len(graph) - -def integrate_kaggle_dataset(datasets_df, users_df, dataset_versions_df, dataset_tags_df, - targetpath, files, file_part, file_subpart): - - datasets_df['CreationDate'] = datasets_df['CreationDate'].apply(preprocess_datetime) - dataset_versions_df['CreationDate'] = dataset_versions_df['CreationDate'].apply(preprocess_datetime) - dataset_versions_df = preprocess_df_strings(dataset_versions_df) - users_df = preprocess_df_strings(users_df) - dataset_tags_df = preprocess_df_strings(dataset_tags_df) - data_dict = {"datasets_df": datasets_df, - "users_df": users_df, - "dataset_versions_df": dataset_versions_df, - "dataset_tags_df": dataset_tags_df} - graph = morph_kgc.materialize('./morph_config/kaggle_dataset_conf.ini', data_dict) - print("RDF generated!") - print("Saving to disk...") - # for s,p,o in graph.triples((None, None, None)): - # print(s,p,o) - - filename = "kaggle_datasets_" + str(file_part) + "_part_" + str(file_subpart) + ".nt" - graph.serialize(destination = targetpath + filename, format = "nt", encoding = "utf-8") - files.append(filename) - - return files, len(graph) - -def integrate_pwc_object(mapping_config_file, targetpath, - files, file_part, file_subpart): - - graph = morph_kgc.materialize(mapping_config_file) - print("RDF generated!") - print("Saving to disk...") - # for s,p,o in graph.triples((None, None, None)): - # print(s,p,o) - - filename = "pwc_" + str(file_part) + "_part_" + str(file_subpart) + ".nt" - graph.serialize(destination = targetpath + filename, format = "nt", encoding = "utf-8") - files.append(filename) - - return files, len(graph) - - -def integrate_openml_tasks(tasks_df, targetpath, files, file_part, file_subpart): - - tasks_df = preprocess_df_strings(tasks_df) - data_dict = {"tasks_df": tasks_df} - graph = morph_kgc.materialize('./morph_config/task_conf.ini', data_dict) - print("RDF generated!") - print("Saving to disk...") - # for s,p,o in 
graph.triples((None, None, None)): - # print(s,p,o) - - filename = "openml_tasks_" + str(file_part) + "_part_" + str(file_subpart) + ".nt" - graph.serialize(destination = targetpath + filename, format = "nt", encoding = "utf-8") - files.append(filename) - - - return files, len(graph) - - -def integrate_openml_datasets( - datasets_df, dataset_creators_df, dataset_tags_df, - dataset_features_df, dataset_references_df, targetpath, - files, file_part, file_subpart): - - datasets_df = preprocess_df_strings(datasets_df) - dataset_creators_df = preprocess_df_strings(dataset_creators_df) - dataset_tags_df = preprocess_df_strings(dataset_tags_df) - dataset_features_df = preprocess_df_strings(dataset_features_df) - data_dict = {"datasets_df": datasets_df, - "dataset_creators_df": dataset_creators_df, - "dataset_tags_df": dataset_tags_df, - "dataset_features_df": dataset_features_df, - "dataset_references_df": dataset_references_df} - graph = morph_kgc.materialize('./morph_config/dataset_conf.ini', data_dict) - print("RDF generated!") - print("Saving to disk...") - # for s,p,o in graph.triples((None, None, None)): - # print(s,p,o) - - filename = "openml_datasets_" + str(file_part) + "_part_" + str(file_subpart) + ".nt" - graph.serialize(destination = targetpath + filename, format = "nt", encoding = "utf-8") - files.append(filename) - - - return files, len(graph) - - -def integrate_openml_runs( - runs_df, run_evaluations_df, run_settings_df, targetpath, - files, file_part, file_subpart): - - runs_df['upload_time'] = runs_df['upload_time'].apply(preprocess_datetime) - runs_df = preprocess_df_strings(runs_df) - run_settings_df = preprocess_df_strings(run_settings_df) - run_evaluations_df = preprocess_df_strings(run_evaluations_df) - data_dict = {"runs_df": runs_df, - "run_evaluations_df": run_evaluations_df, - "run_settings_df": run_settings_df} - graph = morph_kgc.materialize('./morph_config/run_conf.ini', data_dict) - print("RDF generated!") - print("Saving to disk...") - # for s,p,o in graph.triples((None, None, None)): - # print(s,p,o) - - filename = "openml_runs_" + str(file_part) + "_part_" + str(file_subpart) + ".nt" - graph.serialize(destination = targetpath + filename, format = "nt", encoding = "utf-8") - files.append(filename) - - - return files, len(graph) - - -def integrate_openml_flows( - flows_df, flow_params_df, flow_tags_df, flow_dependencies_df, targetpath, - files, file_part, file_subpart): - - flows_df = preprocess_df_strings(flows_df) - flow_params_df = preprocess_df_strings(flow_params_df) - flow_dependencies_df = preprocess_df_strings(flow_dependencies_df) - data_dict = {"flows_df": flows_df, - "flow_params_df": flow_params_df, - "flow_tags_df": flow_tags_df, - "flow_dependencies_df": flow_dependencies_df} - graph = morph_kgc.materialize('./morph_config/flow_conf.ini', data_dict) - print("RDF generated!") - print("Saving to disk...") - # for s,p,o in graph.triples((None, None, None)): - # print(s,p,o) - - filename = "openml_flows_" + str(file_part) + "_part_" + str(file_subpart) + ".nt" - graph.serialize(destination = targetpath + filename, format = "nt", encoding = "utf-8") - files.append(filename) - - - return files, len(graph) - -# def integrate_openml_flows( -# flows_df, flow_params_df, flow_tags_df, flow_dependencies_df, db, named_graph): ## Integrate directly to db - -# data_dict = {"flows_df": flows_df, -# "flow_params_df": flow_params_df, -# "flow_tags_df": flow_tags_df, -# "flow_dependencies_df": flow_dependencies_df} -# graph = 
morph_kgc.materialize('./morph_config/flow_conf.ini', data_dict) -# print("RDF generated! Uploading to database...") -# db.insert_data(named_graph, graph) - -# return - - - -def find_instance_count(db, graph): - - query = get_query(SELECT_ID_COUNT, graph) - try: - count = int(db.get_triples(query)["results"]["bindings"][0]["instanceCount"]["value"]) - except: - print("Connection problem! Returning...") - sys.exit() - - return count - -# def integrate_openml_tasks_from_csv(datapath, db, batch_size): ## Integration directly to db - -# named_graph = OPENML_TASK_GRAPH - -# task_batch_size, task_batch_offset = batch_size, find_instance_count(db, named_graph) -# tasks_df, tasks_clearance = get_task_batch( -# datapath, task_batch_offset, task_batch_size) - -# while tasks_clearance == True: - -# print(f"\nIntegrating triples from Task {task_batch_offset + 1} to Task {task_batch_size+task_batch_offset}...") -# integrate_openml_tasks(tasks_df, db, named_graph) -# print("Integration complete!\n") - -# task_batch_offset += task_batch_size - -# tasks_df, tasks_clearance = get_task_batch( -# datapath, task_batch_offset, task_batch_size) - -# print("No more task data to integrate. Returning...\n") - -# return - -def integrate_openml_tasks_from_csv(datapath, targetpath, batch_offset, batch_size): - - tasks_df, tasks_clearance = get_task_batch( - datapath, batch_offset, batch_size) - file_part = 1 - file_subpart = 1 - total_triples = 0 - goal_triples = 50000000 - files = [] - - while tasks_clearance == True: - - print(f"\nIntegrating triples from Task {batch_offset + 1} to Task {batch_size+batch_offset}...") - files, n_triples = integrate_openml_tasks(tasks_df, targetpath, files, file_part, file_subpart) - total_triples += n_triples - print("Integration complete!") - print("Current dump triple count:", total_triples, "\n") - file_subpart += 1 - - if total_triples > goal_triples: - output_file = "openml_tasks_" + str(file_part) + ".nt.gz" - concatenate_and_compress(targetpath, files, output_file) - delete_files(targetpath, files) - file_subpart = 1 - file_part += 1 - total_triples = 0 - files = [] - - batch_offset += batch_size - - tasks_df, tasks_clearance = get_task_batch( - datapath, batch_offset, batch_size) - - if len(files) > 0: - output_file = "openml_tasks_" + str(file_part) + ".nt.gz" - concatenate_and_compress(targetpath, files, output_file) - delete_files(targetpath, files) - - print("No more task data to integrate. 
Returning...\n") - - return - -def integrate_openml_flows_from_csv(datapath, targetpath, batch_offset, batch_size): - - flows_df, flow_params_df, flow_tags_df, flow_dependencies_df, flow_clearance = get_flow_batch( - datapath, batch_offset, batch_size) - file_part = 1 - file_subpart = 1 - total_triples = 0 - goal_triples = 50000000 - files = [] - - while flow_clearance == True: - - print(f"\nIntegrating triples from Flow {batch_offset + 1} to Flow {batch_size+batch_offset}...") - files, n_triples = integrate_openml_flows(flows_df, flow_params_df, flow_tags_df, flow_dependencies_df, - targetpath, files, file_part, file_subpart) - total_triples += n_triples - print("Integration complete!") - print("Current dump triple count:", total_triples, "\n") - file_subpart += 1 - - if total_triples > goal_triples: - output_file = "openml_flows_" + str(file_part) + ".nt.gz" - concatenate_and_compress(targetpath, files, output_file) - delete_files(targetpath, files) - file_subpart = 1 - file_part += 1 - total_triples = 0 - files = [] - - batch_offset += batch_size - - flows_df, flow_params_df, flow_tags_df, flow_dependencies_df, flow_clearance = get_flow_batch( - datapath, batch_offset, batch_size) - - if len(files) > 0: - output_file = "openml_flows_" + str(file_part) + ".nt.gz" - concatenate_and_compress(targetpath, files, output_file) - delete_files(targetpath, files) - - print("No more flow data to integrate. Returning...\n") - - return - - -def integrate_openml_datasets_from_csv(datapath, targetpath, batch_offset, batch_size): - - (datasets_df, dataset_creators_df, dataset_tags_df, - dataset_features_df, dataset_references_df, dataset_clearance) = get_dataset_batch( - datapath, batch_offset, batch_size) - file_part = 1 - file_subpart = 1 - total_triples = 0 - goal_triples = 25000000 - files = [] - - while dataset_clearance == True: - - print(f"\nIntegrating triples from Dataset {batch_offset + 1} to Dataset {batch_size+batch_offset}...") - files, n_triples = integrate_openml_datasets(datasets_df, dataset_creators_df, dataset_tags_df, - dataset_features_df, dataset_references_df, targetpath, files, file_part, file_subpart) - total_triples += n_triples - print("Integration complete!") - print("Current dump triple count:", total_triples, "\n") - file_subpart += 1 - - if total_triples > goal_triples: - output_file = "openml_datasets_" + str(file_part) + ".nt.gz" - concatenate_and_compress(targetpath, files, output_file) - delete_files(targetpath, files) - file_subpart = 1 - file_part += 1 - total_triples = 0 - files = [] - - batch_offset += batch_size - - (datasets_df, dataset_creators_df, dataset_tags_df, - dataset_features_df, dataset_references_df, dataset_clearance) = get_dataset_batch( - datapath, batch_offset, batch_size) - - if len(files) > 0: - output_file = "openml_datasets_" + str(file_part) + ".nt.gz" - concatenate_and_compress(targetpath, files, output_file) - delete_files(targetpath, files) - - print("No more dataset data to integrate. 
Returning...\n") - - return - - -def integrate_openml_runs_from_csv(datapath, targetpath, batch_offset, batch_size): - - run_checkpoint_1, run_checkpoint_2, run_checkpoint_3 = 0, 3162550, 5999999 - - full_runs_df = pd.read_csv(datapath + "runs3.csv", dtype={'did': 'Int64', - 'error_message': 'object', 'openml_url': 'object', 'predictions_url': 'object', 'uploader_name': 'object'}) - full_run_evaluations_df = pd.read_csv(datapath + "run_evaluations3.csv") - full_run_settings_df = pd.read_csv(datapath + "run_settings3.csv") - - runs_df, run_evaluations_df, run_settings_df, run_clearance = get_run_batch( - datapath, batch_offset, batch_size, full_runs_df, full_run_evaluations_df, full_run_settings_df) - file_part = 16 - file_subpart = 1 - total_triples = 0 - goal_triples = 50000000 - files = [] - - while run_clearance == True: - - print(f"\nIntegrating triples from Run {batch_offset + run_checkpoint_1 + 1} to Run {batch_size + batch_offset + run_checkpoint_1}...") - files, n_triples = integrate_openml_runs(runs_df, run_evaluations_df, run_settings_df, targetpath, - files, file_part, file_subpart) - total_triples += n_triples - print("Integration complete!") - print("Current dump triple count:", total_triples, "\n") - file_subpart += 1 - - if total_triples > goal_triples: - output_file = "openml_runs_" + str(file_part) + ".nt.gz" - concatenate_and_compress(targetpath, files, output_file) - delete_files(targetpath, files) - file_subpart = 1 - file_part += 1 - total_triples = 0 - files = [] - - batch_offset += batch_size - - runs_df, run_evaluations_df, run_settings_df, run_clearance = get_run_batch( - datapath, batch_offset, batch_size, - full_runs_df, full_run_evaluations_df, full_run_settings_df) - - if len(files) > 0: - output_file = "openml_runs_" + str(file_part) + ".nt.gz" - concatenate_and_compress(targetpath, files, output_file) - delete_files(targetpath, files) - - print("No more run data to integrate. Returning...\n") - - return - - -def integrate_pwc_from_json_batch(datapath, targetpath, filename, mapping_config_file, batch_size, - file_part, file_subpart, total_triples, goal_triples, files): - - batch_offset = 0 - with open(datapath+filename, 'r', encoding='utf-8') as j: - contents = json.load(j) - - sample_filename = filename.split('.')[0] + "_sample.json" - batch_clearance = get_pwc_json_batch(sample_filename, contents, batch_offset, batch_size) - - while batch_clearance == True: - print(f"\nIntegrating triples from PwC {filename} {batch_offset + 1} to PwC {filename} {batch_size + batch_offset}...") - files, n_triples = integrate_pwc_object(mapping_config_file, targetpath, - files, file_part, file_subpart) - total_triples += n_triples - print("Integration complete!") - print("Current dump triple count:", total_triples, "\n") - file_subpart += 1 - - if total_triples > goal_triples: - output_file = "pwc_" + str(file_part) + ".nt.gz" - concatenate_and_compress(targetpath, files, output_file) - delete_files(targetpath, files) - file_subpart = 1 - file_part += 1 - total_triples = 0 - files = [] - - batch_offset += batch_size - - batch_clearance = get_pwc_json_batch(sample_filename, contents, batch_offset, batch_size) - - print("No more data to integrate. 
Returning...\n") - - return file_part, file_subpart, total_triples, files - - -def integrate_pwc_from_csv(datapath, targetpath, filename, mapping_config_file, batch_size, - file_part, file_subpart, total_triples, goal_triples, files): - - df = pd.read_csv(datapath + filename) - batch_offset = 0 - batch, batch_clearance = get_df_batch(df, batch_offset, batch_size) - batch.to_csv("Mappings/PwC/Data/evaluations_sample.csv", index=False) - - while batch_clearance == True: - print(f"\nIntegrating triples from PwC {filename} {batch_offset + batch_size + 1} to PwC {filename} {batch_size + batch_offset}...") - files, n_triples = integrate_pwc_object(mapping_config_file, targetpath, - files, file_part, file_subpart) - total_triples += n_triples - print("Integration complete!") - print("Current dump triple count:", total_triples, "\n") - file_subpart += 1 - - if total_triples > goal_triples: - output_file = "pwc_" + str(file_part) + ".nt.gz" - concatenate_and_compress(targetpath, files, output_file) - delete_files(targetpath, files) - file_subpart = 1 - file_part += 1 - total_triples = 0 - files = [] - - batch_offset += batch_size - - batch, batch_clearance = get_df_batch(df, batch_offset, batch_size) - batch.to_csv("Mappings/PwC/Data/evaluations_sample.csv", index=False) - - print("No more data to integrate. Returning...\n") - - return file_part, file_subpart, total_triples, files - - -def integrate_kaggle_datasets_from_csv(datapath, targetpath, offset, batch_size, - file_part, total_triples, goal_triples, files): - - datasets_df, users_df, dataset_versions_df, dataset_tags_df, tags_df = load_kaggle_dataset_data(datapath) - - datasets_df_sample, users_df_sample, dataset_versions_df_sample, dataset_tags_df_sample, dataset_clearance = ( - get_kaggle_dataset_batch(datasets_df, users_df, - dataset_versions_df, dataset_tags_df, tags_df, offset, batch_size)) - file_subpart = 1 - - while dataset_clearance: - print(f"\nIntegrating triples from Dataset {offset + 1} to Dataset {batch_size + offset}...") - files, n_triples = integrate_kaggle_dataset(datasets_df_sample, users_df_sample, - dataset_versions_df_sample, dataset_tags_df_sample, targetpath, files, file_part, file_subpart) - total_triples += n_triples - print("Integration complete!") - print("Current dump triple count:", total_triples, "\n") - file_subpart += 1 - - if total_triples > goal_triples: - output_file = "kaggle" + str(file_part) + ".nt.gz" - concatenate_and_compress(targetpath, files, output_file) - delete_files(targetpath, files) - file_subpart = 1 - file_part += 1 - total_triples = 0 - files = [] - - offset += batch_size - - datasets_df_sample, users_df_sample, dataset_versions_df_sample, dataset_tags_df_sample, dataset_clearance = ( - get_kaggle_dataset_batch(datasets_df, users_df, - dataset_versions_df, dataset_tags_df, tags_df, offset, batch_size)) - - print("No more dataset data to integrate. 
Exiting...\n") - - return file_part, total_triples, files - - - -def integrate_kaggle_kernels_from_csv(datapath, targetpath, offset, batch_size, - file_part, total_triples, goal_triples, files): - - (kernels_df, users_df, kernel_versions_df, kernel_version_ds_df, - dataset_versions_df, kernel_languages_df) = load_kaggle_kernel_data(datapath) - - kernels_df_sample, users_df_sample, kernel_versions_df_sample, kernel_clearance = ( - get_kaggle_kernel_batch(kernels_df, users_df, kernel_versions_df, kernel_version_ds_df, - dataset_versions_df, kernel_languages_df, offset, batch_size) ) - file_subpart = 1 - - while kernel_clearance: - print(f"\nIntegrating triples from Kernel {offset + 1} to Kernel {batch_size + offset}...") - files, n_triples = integrate_kaggle_kernel(kernels_df_sample, users_df_sample, - kernel_versions_df_sample, targetpath, files, file_part, file_subpart) - total_triples += n_triples - print("Integration complete!") - print("Current dump triple count:", total_triples, "\n") - file_subpart += 1 - - if total_triples > goal_triples: - output_file = "kaggle" + str(file_part) + ".nt.gz" - concatenate_and_compress(targetpath, files, output_file) - delete_files(targetpath, files) - file_subpart = 1 - file_part += 1 - total_triples = 0 - files = [] - - offset += batch_size - - kernels_df_sample, users_df_sample, kernel_versions_df_sample, kernel_clearance = ( - get_kaggle_kernel_batch(kernels_df, users_df, kernel_versions_df, kernel_version_ds_df, - dataset_versions_df, kernel_languages_df, offset, batch_size)) - - - print("No more kernel data to integrate. Returning...\n") - - return file_part, total_triples, files - -def integrate_pwc(): - - print("Processing Papers with Code dumps...") - datapath = config.PWC_INPUT + config.ORIGINAL_DATA_FOLDER - filenames = ['datasets.json', - 'paper_code_links.json', - 'papers_with_abstracts.json', - 'evaluations.json'] - for file in filenames: - preprocess_json(datapath, file) - pre_process_pwc_evaluations(datapath) - filenames.append('evaluations.csv') - print("Dumps were succesfully cleaned!\n") - - mappings = ['./morph_config/pwc_dataset_conf.ini', - './morph_config/pwc_paper_code_links_conf.ini', - './morph_config/pwc_paper_conf.ini', - './morph_config/pwc_model_conf.ini', - './morph_config/pwc_evaluations_conf.ini'] - - targetpath = config.OUTPUT_PATH - file_part = 1 - file_subpart = 1 - total_triples = 0 - goal_triples = 50000000 - files = [] - batch_size = 5000 - for i in range(0,len(filenames)): - - if filenames[i].split('.')[1] == "json": - file_part, file_subpart, total_triples, files = integrate_pwc_from_json_batch( - datapath, targetpath, filenames[i], mappings[i], batch_size, - file_part, file_subpart, total_triples, goal_triples, files) - else: - file_part, file_subpart, total_triples, files = integrate_pwc_from_csv( - datapath, targetpath, filenames[i], mappings[i], batch_size, - file_part, file_subpart, total_triples, goal_triples, files) - - - output_file = "pwc_" + str(file_part) + ".nt.gz" - concatenate_and_compress(targetpath, files, output_file) - delete_files(targetpath, files) - - return - -def integrate_openml(): - - datapath = config.OPENML_INPUT + config.ORIGINAL_DATA_FOLDER - targetpath = config.OUTPUT_PATH - batch_offset = 0 - batch_size = 1000 - dataset_batch_size = 1 - integrate_openml_tasks_from_csv(datapath, targetpath, batch_offset, batch_size) - integrate_openml_flows_from_csv(datapath, targetpath, batch_offset, batch_size) - integrate_openml_datasets_from_csv(datapath, targetpath, batch_offset, batch_size) - 
integrate_openml_runs_from_csv(datapath, targetpath, batch_offset, dataset_batch_size) - - return - -def integrate_kaggle(): - - datapath = config.KAGGLE_INPUT + config.ORIGINAL_DATA_FOLDER - targetpath = config.OUTPUT_PATH - file_part = 1 - total_triples = 0 - goal_triples = 50000000 - files = [] - - offset = 0 - size = 10000 - file_part, total_triples, files = integrate_kaggle_datasets_from_csv( - datapath, targetpath, offset, size, - file_part, total_triples, goal_triples, files) - - offset = 0 - size = 10000 - file_part, total_triples, files = integrate_kaggle_kernels_from_csv( - datapath, targetpath, offset, size, - file_part, total_triples, goal_triples, files) - - output_file = "kaggle_" + str(file_part) + ".nt.gz" - concatenate_and_compress(targetpath, files, output_file) - delete_files(targetpath, files) - - return - - -def concatenate_and_compress(targetpath, files, output_file): - print("\nConcatenating and compressing dumps...") - with open(targetpath + output_file, 'wb') as out_file, \ - gzip.open(out_file, 'wt', encoding='utf-8') as zip_file: - for file_name in files: - with open(targetpath + file_name, 'rt', encoding='utf-8') as in_file: - shutil.copyfileobj(in_file, zip_file) - -def delete_files(targetpath, files): - for file_name in files: - os.remove(targetpath + file_name) - -if __name__ == "__main__": - - integrate_openml() - integrate_kaggle() - integrate_pwc() - - - - - - - - - - - \ No newline at end of file diff --git a/resource_code/data_integration_kaggle.py b/resource_code/data_integration_kaggle.py new file mode 100644 index 0000000..fd4a24f --- /dev/null +++ b/resource_code/data_integration_kaggle.py @@ -0,0 +1,180 @@ +import morph_kgc +from get_data_sample import load_kaggle_dataset_data, load_kaggle_kernel_data, get_kaggle_dataset_batch, get_kaggle_kernel_batch +from queries import * +import pandas as pd +import warnings +from dump_storage import * +from preprocessing_modules import * +warnings.simplefilter(action='ignore', category=FutureWarning) + +def integrate_kaggle_kernel(kernels_df, users_df, kernel_versions_df, + targetpath, files, file_part, file_subpart): + + kernels_df['CreationDate'] = kernels_df['CreationDate'].apply(preprocess_datetime) + kernel_versions_df['CreationDate'] = kernel_versions_df['CreationDate'].apply(preprocess_datetime) + kernel_versions_df = preprocess_df_strings(kernel_versions_df) + users_df = preprocess_df_strings(users_df) + data_dict = {"kernels_df": kernels_df, + "users_df": users_df, + "kernel_versions_df": kernel_versions_df} + graph = morph_kgc.materialize('./morph_config/kaggle_kernel_conf.ini', data_dict) + print("RDF generated!") + print("Saving to disk...") + # for s,p,o in graph.triples((None, None, None)): + # print(s,p,o) + + filename = "kaggle_kernels_" + str(file_part) + "_part_" + str(file_subpart) + ".nt" + graph.serialize(destination = targetpath + filename, format = "nt", encoding = "utf-8") + files.append(filename) + + return files, len(graph) + +def integrate_kaggle_dataset(datasets_df, users_df, dataset_versions_df, dataset_tags_df, + targetpath, files, file_part, file_subpart): + + datasets_df['CreationDate'] = datasets_df['CreationDate'].apply(preprocess_datetime) + dataset_versions_df['CreationDate'] = dataset_versions_df['CreationDate'].apply(preprocess_datetime) + dataset_versions_df = preprocess_df_strings(dataset_versions_df) + users_df = preprocess_df_strings(users_df) + dataset_tags_df = preprocess_df_strings(dataset_tags_df) + data_dict = {"datasets_df": datasets_df, + "users_df": users_df, + 
"dataset_versions_df": dataset_versions_df, + "dataset_tags_df": dataset_tags_df} + graph = morph_kgc.materialize('./morph_config/kaggle_dataset_conf.ini', data_dict) + print("RDF generated!") + print("Saving to disk...") + # for s,p,o in graph.triples((None, None, None)): + # print(s,p,o) + + filename = "kaggle_datasets_" + str(file_part) + "_part_" + str(file_subpart) + ".nt" + graph.serialize(destination = targetpath + filename, format = "nt", encoding = "utf-8") + files.append(filename) + + return files, len(graph) + +def integrate_kaggle_datasets_from_csv(datapath, targetpath, offset, batch_size, + file_part, total_triples, goal_triples, files, update=False): + + datasets_df, users_df, dataset_versions_df, dataset_tags_df, tags_df = load_kaggle_dataset_data(datapath, update) + + datasets_df_sample, users_df_sample, dataset_versions_df_sample, dataset_tags_df_sample, dataset_clearance = ( + get_kaggle_dataset_batch(datasets_df, users_df, + dataset_versions_df, dataset_tags_df, tags_df, offset, batch_size)) + file_subpart = 1 + + while dataset_clearance: + print(f"\nIntegrating triples from Dataset {offset + 1} to Dataset {batch_size + offset}...") + files, n_triples = integrate_kaggle_dataset(datasets_df_sample, users_df_sample, + dataset_versions_df_sample, dataset_tags_df_sample, targetpath, files, file_part, file_subpart) + total_triples += n_triples + print("Integration complete!") + print("Current dump triple count:", total_triples, "\n") + file_subpart += 1 + + if total_triples > goal_triples: + output_file = "kaggle" + str(file_part) + ".nt.gz" + concatenate_and_compress(targetpath, files, output_file) + delete_files(targetpath, files) + file_subpart = 1 + file_part += 1 + total_triples = 0 + files = [] + + offset += batch_size + + datasets_df_sample, users_df_sample, dataset_versions_df_sample, dataset_tags_df_sample, dataset_clearance = ( + get_kaggle_dataset_batch(datasets_df, users_df, + dataset_versions_df, dataset_tags_df, tags_df, offset, batch_size)) + + print("No more dataset data to integrate. 
Exiting...\n") + + return file_part, total_triples, files + + + +def integrate_kaggle_kernels_from_csv(datapath, targetpath, offset, batch_size, + file_part, total_triples, goal_triples, files, update=False): + + (kernels_df, users_df, kernel_versions_df, kernel_version_ds_df, + dataset_versions_df, kernel_languages_df) = load_kaggle_kernel_data(datapath, update) + + kernels_df_sample, users_df_sample, kernel_versions_df_sample, kernel_clearance = ( + get_kaggle_kernel_batch(kernels_df, users_df, kernel_versions_df, kernel_version_ds_df, + dataset_versions_df, kernel_languages_df, offset, batch_size) ) + file_subpart = 1 + + while kernel_clearance: + print(f"\nIntegrating triples from Kernel {offset + 1} to Kernel {batch_size + offset}...") + files, n_triples = integrate_kaggle_kernel(kernels_df_sample, users_df_sample, + kernel_versions_df_sample, targetpath, files, file_part, file_subpart) + total_triples += n_triples + print("Integration complete!") + print("Current dump triple count:", total_triples, "\n") + file_subpart += 1 + + if total_triples > goal_triples: + output_file = "kaggle" + str(file_part) + ".nt.gz" + concatenate_and_compress(targetpath, files, output_file) + delete_files(targetpath, files) + file_subpart = 1 + file_part += 1 + total_triples = 0 + files = [] + + offset += batch_size + + kernels_df_sample, users_df_sample, kernel_versions_df_sample, kernel_clearance = ( + get_kaggle_kernel_batch(kernels_df, users_df, kernel_versions_df, kernel_version_ds_df, + dataset_versions_df, kernel_languages_df, offset, batch_size)) + + + print("No more kernel data to integrate. Returning...\n") + + return file_part, total_triples, files + + +def integrate_kaggle(update = False): + + print("Preparing Kaggle integration...") + datapath = config.KAGGLE_INPUT + targetpath = config.OUTPUT_PATH + + if update == False: + file_part = 1 + total_triples = 0 + files = [] + elif update == True: + file_part = config.KAGGLE_DUMP_PART + current_dump = "kaggle_" + str(file_part) + ".nt" + total_triples = count_dump_triples(targetpath + current_dump + ".gz") + unzip_and_save(targetpath + current_dump + ".gz") + files = [current_dump] + + goal_triples = config.KAGGLE_DUMP_LIMIT + + print("Initializing Kaggle Dataset integration...\n") + offset = 0 + size = 10000 + file_part, total_triples, files = integrate_kaggle_datasets_from_csv( + datapath, targetpath, offset, size, + file_part, total_triples, goal_triples, files, update) + + print("Initializing Kaggle Kernel integration...\n") + offset = 0 + size = 10000 + file_part, total_triples, files = integrate_kaggle_kernels_from_csv( + datapath, targetpath, offset, size, + file_part, total_triples, goal_triples, files, update) + + if len(files)>0: + output_file = "kaggle_" + str(file_part) + ".nt.gz" + concatenate_and_compress(targetpath, files, output_file) + delete_files(targetpath, files) + + return + + +if __name__ == "__main__": + + integrate_kaggle(update = True) \ No newline at end of file diff --git a/resource_code/data_integration_openml.py b/resource_code/data_integration_openml.py new file mode 100644 index 0000000..82d8de7 --- /dev/null +++ b/resource_code/data_integration_openml.py @@ -0,0 +1,423 @@ +import morph_kgc +from get_data_sample import get_task_batch, get_dataset_batch, get_run_batch, get_flow_batch +from queries import * +import sys +import pandas as pd +import warnings +from dump_storage import * +from preprocessing_modules import * +import config +from openml_data_collector import get_checkpoints 
+warnings.simplefilter(action='ignore', category=FutureWarning) + + +def integrate_openml_tasks(tasks_df, targetpath, files, file_part, file_subpart): + + tasks_df = preprocess_df_strings(tasks_df) + data_dict = {"tasks_df": tasks_df} + graph = morph_kgc.materialize('./morph_config/task_conf.ini', data_dict) + print("RDF generated!") + print("Saving to disk...") + # for s,p,o in graph.triples((None, None, None)): + # print(s,p,o) + + filename = "openml_tasks_" + str(file_part) + "_part_" + str(file_subpart) + ".nt" + graph.serialize(destination = targetpath + filename, format = "nt", encoding = "utf-8") + files.append(filename) + + + return files, len(graph) + + +def integrate_openml_datasets( + datasets_df, dataset_creators_df, dataset_tags_df, + dataset_features_df, dataset_references_df, targetpath, + files, file_part, file_subpart): + + datasets_df = preprocess_df_strings(datasets_df) + dataset_creators_df = preprocess_df_strings(dataset_creators_df) + dataset_tags_df = preprocess_df_strings(dataset_tags_df) + dataset_features_df = preprocess_df_strings(dataset_features_df) + data_dict = {"datasets_df": datasets_df, + "dataset_creators_df": dataset_creators_df, + "dataset_tags_df": dataset_tags_df, + "dataset_features_df": dataset_features_df, + "dataset_references_df": dataset_references_df} + graph = morph_kgc.materialize('./morph_config/dataset_conf.ini', data_dict) + print("RDF generated!") + print("Saving to disk...") + # for s,p,o in graph.triples((None, None, None)): + # print(s,p,o) + + filename = "openml_datasets_" + str(file_part) + "_part_" + str(file_subpart) + ".nt" + graph.serialize(destination = targetpath + filename, format = "nt", encoding = "utf-8") + files.append(filename) + + + return files, len(graph) + + +def integrate_openml_runs( + runs_df, run_evaluations_df, run_settings_df, targetpath, + files, file_part, file_subpart): + + runs_df['upload_time'] = runs_df['upload_time'].apply(preprocess_datetime) + runs_df = preprocess_df_strings(runs_df) + run_settings_df = preprocess_df_strings(run_settings_df) + run_evaluations_df = preprocess_df_strings(run_evaluations_df) + data_dict = {"runs_df": runs_df, + "run_evaluations_df": run_evaluations_df, + "run_settings_df": run_settings_df} + graph = morph_kgc.materialize('./morph_config/run_conf.ini', data_dict) + print("RDF generated!") + print("Saving to disk...") + # for s,p,o in graph.triples((None, None, None)): + # print(s,p,o) + + filename = "openml_runs_" + str(file_part) + "_part_" + str(file_subpart) + ".nt" + graph.serialize(destination = targetpath + filename, format = "nt", encoding = "utf-8") + files.append(filename) + + + return files, len(graph) + + +def integrate_openml_flows( + flows_df, flow_params_df, flow_tags_df, flow_dependencies_df, targetpath, + files, file_part, file_subpart): + + flows_df = preprocess_df_strings(flows_df) + flow_params_df = preprocess_df_strings(flow_params_df) + flow_dependencies_df = preprocess_df_strings(flow_dependencies_df) + data_dict = {"flows_df": flows_df, + "flow_params_df": flow_params_df, + "flow_tags_df": flow_tags_df, + "flow_dependencies_df": flow_dependencies_df} + graph = morph_kgc.materialize('./morph_config/flow_conf.ini', data_dict) + print("RDF generated!") + print("Saving to disk...") + # for s,p,o in graph.triples((None, None, None)): + # print(s,p,o) + + filename = "openml_flows_" + str(file_part) + "_part_" + str(file_subpart) + ".nt" + graph.serialize(destination = targetpath + filename, format = "nt", encoding = "utf-8") + files.append(filename) + + + 
return files, len(graph) + +def find_instance_count(db, graph): + + query = get_query(SELECT_ID_COUNT, graph) + try: + count = int(db.get_triples(query)["results"]["bindings"][0]["instanceCount"]["value"]) + except: + print("Connection problem! Returning...") + sys.exit() + + return count + +# Direct integration implementation +# def integrate_openml_flows( +# flows_df, flow_params_df, flow_tags_df, flow_dependencies_df, db, named_graph): ## Integrate directly to db + +# data_dict = {"flows_df": flows_df, +# "flow_params_df": flow_params_df, +# "flow_tags_df": flow_tags_df, +# "flow_dependencies_df": flow_dependencies_df} +# graph = morph_kgc.materialize('./morph_config/flow_conf.ini', data_dict) +# print("RDF generated! Uploading to database...") +# db.insert_data(named_graph, graph) + +# return + +# def integrate_openml_tasks_from_csv(datapath, db, batch_size): ## Integration directly to db + +# named_graph = OPENML_TASK_GRAPH + +# task_batch_size, task_batch_offset = batch_size, find_instance_count(db, named_graph) +# tasks_df, tasks_clearance = get_task_batch( +# datapath, task_batch_offset, task_batch_size) + +# while tasks_clearance == True: + +# print(f"\nIntegrating triples from Task {task_batch_offset + 1} to Task {task_batch_size+task_batch_offset}...") +# integrate_openml_tasks(tasks_df, db, named_graph) +# print("Integration complete!\n") + +# task_batch_offset += task_batch_size + +# tasks_df, tasks_clearance = get_task_batch( +# datapath, task_batch_offset, task_batch_size) + +# print("No more task data to integrate. Returning...\n") + +# return + +def integrate_openml_tasks_from_csv(datapath, targetpath, batch_offset, batch_size, update=False): + + print("Preparing for OpenML Task integration...\n") + tasks_df, tasks_clearance = get_task_batch( + datapath, batch_offset, batch_size) + + files = [] + if update == False: + file_subpart = 1 + file_part = 1 + total_triples = 0 + elif update == True and tasks_clearance == True: + file_subpart = 2 + file_part = config.OPENML_TASK_DUMP_PART + current_dump = "openml_tasks_" + str(file_part) + ".nt" + total_triples = count_dump_triples(targetpath + current_dump + ".gz") + unzip_and_save(targetpath + current_dump + ".gz") + files = [current_dump] + + goal_triples = config.OPENML_DUMP_LIMIT + + while tasks_clearance == True: + + print(f"\nIntegrating triples from Task {batch_offset + 1} to Task {batch_size+batch_offset}...") + files, n_triples = integrate_openml_tasks(tasks_df, targetpath, files, file_part, file_subpart) + total_triples += n_triples + print("Integration complete!") + print("Current dump triple count:", total_triples, "\n") + file_subpart += 1 + + if total_triples > goal_triples: + output_file = "openml_tasks_" + str(file_part) + ".nt.gz" + concatenate_and_compress(targetpath, files, output_file) + delete_files(targetpath, files) + file_subpart = 1 + file_part += 1 + total_triples = 0 + files = [] + + batch_offset += batch_size + + tasks_df, tasks_clearance = get_task_batch( + datapath, batch_offset, batch_size) + + if len(files) > 0: + output_file = "openml_tasks_" + str(file_part) + ".nt.gz" + concatenate_and_compress(targetpath, files, output_file) + delete_files(targetpath, files) + + print("No more task data to integrate. 
Returning...\n") + + return + +def integrate_openml_flows_from_csv(datapath, targetpath, batch_offset, batch_size, update=False): + + print("Preparing for OpenML Flow integration...\n") + flows_df, flow_params_df, flow_tags_df, flow_dependencies_df, flow_clearance = get_flow_batch( + datapath, batch_offset, batch_size) + + files = [] + if update == False: + file_subpart = 1 + file_part = 1 + total_triples = 0 + elif update == True and flow_clearance == True: + file_subpart = 2 + file_part = config.OPENML_FLOW_DUMP_PART + current_dump = "openml_flows_" + str(file_part) + ".nt" + total_triples = count_dump_triples(targetpath + current_dump + ".gz") + unzip_and_save(targetpath + current_dump + ".gz") + files = [current_dump] + + goal_triples = config.OPENML_DUMP_LIMIT + + while flow_clearance == True: + + print(f"\nIntegrating triples from Flow {batch_offset + 1} to Flow {batch_size+batch_offset}...") + files, n_triples = integrate_openml_flows(flows_df, flow_params_df, flow_tags_df, flow_dependencies_df, + targetpath, files, file_part, file_subpart) + total_triples += n_triples + print("Integration complete!") + print("Current dump triple count:", total_triples, "\n") + file_subpart += 1 + + if total_triples > goal_triples: + output_file = "openml_flows_" + str(file_part) + ".nt.gz" + concatenate_and_compress(targetpath, files, output_file) + delete_files(targetpath, files) + file_subpart = 1 + file_part += 1 + total_triples = 0 + files = [] + + batch_offset += batch_size + + flows_df, flow_params_df, flow_tags_df, flow_dependencies_df, flow_clearance = get_flow_batch( + datapath, batch_offset, batch_size) + + if len(files) > 0: + output_file = "openml_flows_" + str(file_part) + ".nt.gz" + concatenate_and_compress(targetpath, files, output_file) + delete_files(targetpath, files) + + print("No more flow data to integrate. 
Returning...\n") + + return + + +def integrate_openml_datasets_from_csv(datapath, targetpath, batch_offset, batch_size, update=False): + + print("Preparing for OpenML Dataset integration...\n") + (datasets_df, dataset_creators_df, dataset_tags_df, + dataset_features_df, dataset_references_df, dataset_clearance) = get_dataset_batch( + datapath, batch_offset, batch_size) + + files = [] + if update == False: + file_subpart = 1 + file_part = 1 + total_triples = 0 + elif update == True and dataset_clearance == True: + file_subpart = 2 + file_part = config.OPENML_DATASET_DUMP_PART + current_dump = "openml_datasets_" + str(file_part) + ".nt" + total_triples = count_dump_triples(targetpath + current_dump + ".gz") + unzip_and_save(targetpath + current_dump + ".gz") + files = [current_dump] + + goal_triples = config.OPENML_DUMP_LIMIT + + while dataset_clearance == True: + + print(f"\nIntegrating triples from Dataset {batch_offset + 1} to Dataset {batch_size+batch_offset}...") + files, n_triples = integrate_openml_datasets(datasets_df, dataset_creators_df, dataset_tags_df, + dataset_features_df, dataset_references_df, targetpath, files, file_part, file_subpart) + total_triples += n_triples + print("Integration complete!") + print("Current dump triple count:", total_triples, "\n") + file_subpart += 1 + + if total_triples > goal_triples: + output_file = "openml_datasets_" + str(file_part) + ".nt.gz" + concatenate_and_compress(targetpath, files, output_file) + delete_files(targetpath, files) + file_subpart = 1 + file_part += 1 + total_triples = 0 + files = [] + + batch_offset += batch_size + + (datasets_df, dataset_creators_df, dataset_tags_df, + dataset_features_df, dataset_references_df, dataset_clearance) = get_dataset_batch( + datapath, batch_offset, batch_size) + + if len(files) > 0: + output_file = "openml_datasets_" + str(file_part) + ".nt.gz" + concatenate_and_compress(targetpath, files, output_file) + delete_files(targetpath, files) + + print("No more dataset data to integrate. 
Returning...\n") + + return + + +def integrate_openml_runs_from_csv(datapath, targetpath, batch_offset, batch_size, update=False): + + print("Preparing for OpenML Run integration...\n") + run_checkpoint_1, run_checkpoint_2, run_checkpoint_3 = 0, 3162550, 5999999 + + full_runs_df = pd.read_csv(datapath + "runs3.csv", dtype={'did': 'Int64', + 'error_message': 'object', 'openml_url': 'object', 'predictions_url': 'object', 'uploader_name': 'object'}) + full_run_evaluations_df = pd.read_csv(datapath + "run_evaluations3.csv") + full_run_settings_df = pd.read_csv(datapath + "run_settings3.csv") + + runs_df, run_evaluations_df, run_settings_df, run_clearance = get_run_batch( + datapath, batch_offset, batch_size, full_runs_df, full_run_evaluations_df, full_run_settings_df) + + files = [] + if update == False: + file_subpart = 1 + file_part = 1 + total_triples = 0 + elif update == True and run_clearance == True: + file_subpart = 2 + file_part = config.OPENML_RUN_DUMP_PART + current_dump = "openml_runs_" + str(file_part) + ".nt" + total_triples = count_dump_triples(targetpath + current_dump + ".gz") + unzip_and_save(targetpath + current_dump + ".gz") + files = [current_dump] + + goal_triples = config.OPENML_DUMP_LIMIT + + while run_clearance == True: + + print(f"\nIntegrating triples from Run {batch_offset + run_checkpoint_1 + 1} to Run {batch_size + batch_offset + run_checkpoint_1}...") + files, n_triples = integrate_openml_runs(runs_df, run_evaluations_df, run_settings_df, targetpath, + files, file_part, file_subpart) + total_triples += n_triples + print("Integration complete!") + print("Current dump triple count:", total_triples, "\n") + file_subpart += 1 + + if total_triples > goal_triples: + output_file = "openml_runs_" + str(file_part) + ".nt.gz" + concatenate_and_compress(targetpath, files, output_file) + delete_files(targetpath, files) + file_subpart = 1 + file_part += 1 + total_triples = 0 + files = [] + + batch_offset += batch_size + + runs_df, run_evaluations_df, run_settings_df, run_clearance = get_run_batch( + datapath, batch_offset, batch_size, + full_runs_df, full_run_evaluations_df, full_run_settings_df) + + if len(files) > 0: + output_file = "openml_runs_" + str(file_part) + ".nt.gz" + concatenate_and_compress(targetpath, files, output_file) + delete_files(targetpath, files) + + print("No more run data to integrate. 
Returning...\n") + + return + + +def integrate_openml(update=False): + + datapath = config.OPENML_INPUT + targetpath = config.OUTPUT_PATH + + if update == False: + tasks_offset = 0 + flows_offset = 0 + datasets_offset = 0 + runs_offset = 0 + else: + tasks_offset = config.OPENML_TASK_CHECKPOINT + flows_offset = config.OPENML_FLOW_CHECKPOINT + datasets_offset = config.OPENML_DATASET_CHECKPOINT + runs_offset = config.OPENML_RUN_CHECKPOINT + + batch_size = 1000 + dataset_batch_size = 1 + + integrate_openml_tasks_from_csv(datapath, targetpath, tasks_offset, batch_size, update) + integrate_openml_flows_from_csv(datapath, targetpath, flows_offset, batch_size, update) + integrate_openml_datasets_from_csv(datapath, targetpath, datasets_offset, dataset_batch_size, update) + integrate_openml_runs_from_csv(datapath, targetpath, runs_offset, batch_size, update) + + # Update OpenML integration checkpoints + runs_csv = datapath + "runs3.csv" + datasets_csv = datapath + "datasets.csv" + tasks_csv = datapath + "tasks.csv" + flows_csv = datapath + "flows.csv" + + checkpoints, latest_ids = get_checkpoints([runs_csv, datasets_csv, tasks_csv, flows_csv]) + run_cp, dataset_cp, task_cp, flow_cp = checkpoints[0], checkpoints[1], checkpoints[2], checkpoints[3] + config.update_openml_checkpoints(run_cp, dataset_cp, task_cp, flow_cp) + + return + +if __name__ == "__main__": + + integrate_openml(update=True) diff --git a/resource_code/data_integration_pwc.py b/resource_code/data_integration_pwc.py new file mode 100644 index 0000000..97484e9 --- /dev/null +++ b/resource_code/data_integration_pwc.py @@ -0,0 +1,171 @@ +import morph_kgc +from get_data_sample import get_df_batch, get_pwc_json_batch +from queries import * +import sys +import pandas as pd +import json +import warnings +import re +from dump_storage import * +from preprocessing_modules import * +from update_sources import * +warnings.simplefilter(action='ignore', category=FutureWarning) + +def integrate_pwc_object(mapping_config_file, targetpath, + files, file_part, file_subpart): + + graph = morph_kgc.materialize(mapping_config_file) + print("RDF generated!") + print("Saving to disk...") + # for s,p,o in graph.triples((None, None, None)): + # print(s,p,o) + + filename = "pwc_" + str(file_part) + "_part_" + str(file_subpart) + ".nt" + graph.serialize(destination = targetpath + filename, format = "nt", encoding = "utf-8") + files.append(filename) + + return files, len(graph) + +def integrate_pwc_from_json_batch(datapath, targetpath, filename, mapping_config_file, batch_size, + file_part, file_subpart, total_triples, goal_triples, files): + + batch_offset = 0 + with open(datapath+filename, 'r', encoding='utf-8') as j: + contents = json.load(j) + + sample_filename = filename.split('.')[0] + "_sample.json" + batch_clearance = get_pwc_json_batch(sample_filename, contents, batch_offset, batch_size) + + while batch_clearance == True: + print(f"\nIntegrating triples from PwC {filename} {batch_offset + 1} to PwC {filename} {batch_size + batch_offset}...") + files, n_triples = integrate_pwc_object(mapping_config_file, targetpath, + files, file_part, file_subpart) + total_triples += n_triples + print("Integration complete!") + print("Current dump triple count:", total_triples, "\n") + file_subpart += 1 + + if total_triples > goal_triples: + output_file = "pwc_" + str(file_part) + ".nt.gz" + concatenate_and_compress(targetpath, files, output_file) + delete_files(targetpath, files) + file_subpart = 1 + file_part += 1 + total_triples = 0 + files = [] + + batch_offset += 
batch_size + + batch_clearance = get_pwc_json_batch(sample_filename, contents, batch_offset, batch_size) + + print("No more data to integrate. Returning...\n") + + return file_part, file_subpart, total_triples, files + + +def integrate_pwc_from_csv(datapath, targetpath, filename, mapping_config_file, batch_size, + file_part, file_subpart, total_triples, goal_triples, files): + + df = pd.read_csv(datapath + filename) + batch_offset = 0 + batch, batch_clearance = get_df_batch(df, batch_offset, batch_size) + batch.to_csv("Mappings/PwC/Data/evaluations_sample.csv", index=False) + + while batch_clearance == True: + print(f"\nIntegrating triples from PwC {filename} {batch_offset + batch_size + 1} to PwC {filename} {batch_size + batch_offset}...") + files, n_triples = integrate_pwc_object(mapping_config_file, targetpath, + files, file_part, file_subpart) + total_triples += n_triples + print("Integration complete!") + print("Current dump triple count:", total_triples, "\n") + file_subpart += 1 + + if total_triples > goal_triples: + output_file = "pwc_" + str(file_part) + ".nt.gz" + concatenate_and_compress(targetpath, files, output_file) + delete_files(targetpath, files) + file_subpart = 1 + file_part += 1 + total_triples = 0 + files = [] + + batch_offset += batch_size + + batch, batch_clearance = get_df_batch(df, batch_offset, batch_size) + batch.to_csv("Mappings/PwC/Data/evaluations_sample.csv", index=False) + + print("No more data to integrate. Returning...\n") + + return file_part, file_subpart, total_triples, files + + +def integrate_pwc(update = False): + + print("Processing Papers with Code dumps...") + + targetpath = config.OUTPUT_PATH + goal_triples = config.PWC_DUMP_LIMIT + original_path = config.PWC_INPUT + config.ORIGINAL_DATA_FOLDER + update_path = config.PWC_INPUT + config.UPDATE_MONTH_FOLDER + updates_folder = "Updates/" + + if update == False: + file_part = 1 + file_subpart = 1 + total_triples = 0 + files = [] + datapath = original_path + else: + datapath = update_path + updates_folder + file_subpart = 2 + file_part = config.PWC_DUMP_PART + current_dump = "pwc_" + str(file_part) + ".nt" + total_triples = count_dump_triples(targetpath + current_dump + ".gz") + unzip_and_save(targetpath + current_dump + ".gz") + files = [current_dump] + + + filenames = ['datasets.json', + 'paper_code_links.json', + 'papers_with_abstracts.json', + 'evaluations.json'] + for file in filenames: + preprocess_json(datapath, file) + if update == True: + get_json_updates(original_path + file, update_path + file, + update_path + updates_folder + file) + + pre_process_pwc_evaluations(datapath) + filenames.append('evaluations.csv') + print("Dumps were succesfully cleaned!\n") + + mappings = ['./morph_config/pwc_dataset_conf.ini', + './morph_config/pwc_paper_code_links_conf.ini', + './morph_config/pwc_paper_conf.ini', + './morph_config/pwc_model_conf.ini', + './morph_config/pwc_evaluations_conf.ini'] + + batch_size = 5000 + for i in range(0,len(filenames)): + + if filenames[i].split('.')[1] == "json": + file_part, file_subpart, total_triples, files = integrate_pwc_from_json_batch( + datapath, targetpath, filenames[i], mappings[i], batch_size, + file_part, file_subpart, total_triples, goal_triples, files) + else: + file_part, file_subpart, total_triples, files = integrate_pwc_from_csv( + datapath, targetpath, filenames[i], mappings[i], batch_size, + file_part, file_subpart, total_triples, goal_triples, files) + + + if len(files) > 0: + output_file = "pwc_" + str(file_part) + ".nt.gz" + 
+        concatenate_and_compress(targetpath, files, output_file)
+        delete_files(targetpath, files)
+
+    return
+
+
+if __name__ == "__main__":
+
+    integrate_pwc(update = True)
\ No newline at end of file
diff --git a/resource_code/dump_storage.py b/resource_code/dump_storage.py
new file mode 100644
index 0000000..d018674
--- /dev/null
+++ b/resource_code/dump_storage.py
@@ -0,0 +1,37 @@
+import gzip
+import shutil
+import os
+import config
+import pandas as pd
+
+def concatenate_and_compress(targetpath, files, output_file):
+    print("\nConcatenating and compressing dumps...")
+    with open(targetpath + output_file, 'wb') as out_file, \
+         gzip.open(out_file, 'wt', encoding='utf-8') as zip_file:
+        for file_name in files:
+            with open(targetpath + file_name, 'rt', encoding='utf-8') as in_file:
+                shutil.copyfileobj(in_file, zip_file)
+
+def delete_files(targetpath, files):
+    for file_name in files:
+        os.remove(targetpath + file_name)
+
+def count_dump_triples(file_path):
+    with gzip.open(file_path, 'rt', encoding='utf-8') as gzipped_file:
+        line_count = sum(1 for line in gzipped_file)
+    return line_count
+
+def unzip_and_save(file_path):
+    # Check if the file is a gzip file
+    if not file_path.endswith('.gz'):
+        raise ValueError("Not a Gzip file")
+
+    # Define the output file path
+    output_file_path = os.path.splitext(file_path)[0]
+
+    with gzip.open(file_path, 'rt', encoding='utf-8') as gzipped_file:
+        with open(output_file_path, 'w', encoding='utf-8') as output_file:
+            output_file.write(gzipped_file.read())
+
+    return
+
diff --git a/resource_code/get_data_sample.py b/resource_code/get_data_sample.py
index a90a25e..29837d0 100644
--- a/resource_code/get_data_sample.py
+++ b/resource_code/get_data_sample.py
@@ -5,6 +5,8 @@ warnings.simplefilter(action='ignore', category=FutureWarning)
 from datetime import datetime
 from preprocessing_modules import *
+import config
+from update_sources import *
 
 
 def get_openml_random_sample(initial_sample_size, random_sample_size):
 
@@ -110,9 +112,9 @@ def get_df_batch(df, offset, size):
     integration_clearance = True
     df_size = len(df)
 
-    if (offset+size) > df_size and offset < df_size:
+    if (offset+size) >= df_size and offset < df_size:
         df = df.iloc[offset:].copy()
-    elif offset > df_size:
+    elif offset >= df_size:
         integration_clearance = False
     elif offset+size < df_size:
         df = df.iloc[offset:(offset+size)].copy()
@@ -235,11 +237,18 @@ def get_kaggle_kernel_batch(kernels_df, users_df, kernel_versions_df, kernel_ver
 
     return kernels_df, users_df, kernel_versions_df, kernel_clearance
 
 
-def load_kaggle_dataset_data(datapath):
+def load_kaggle_dataset_data(datapath, update = False):
 
-    datasets_df = pd.read_csv(datapath + "Datasets.csv")[[
-        "Id", "CreatorUserId", "CurrentDatasetVersionId",
-        "CreationDate", "TotalViews", "TotalDownloads", "TotalKernels"]]
+    dataset_columns_to_keep = ["Id", "CreatorUserId", "CurrentDatasetVersionId","CreationDate"]
+    original_path = datapath + config.ORIGINAL_DATA_FOLDER
+    update_path = datapath + config.UPDATE_MONTH_FOLDER
+
+    if update == False:
+        datapath = original_path
+        datasets_df = pd.read_csv(datapath + "Datasets.csv")[dataset_columns_to_keep]
+    elif update == True:
+        datapath = update_path
+        datasets_df = get_csv_updates(original_path + "Datasets.csv", datapath + "Datasets.csv", dataset_columns_to_keep)
 
     users_df = pd.read_csv(datapath + "Users.csv")[["Id", "UserName", "DisplayName"]]
 
@@ -254,14 +263,21 @@ def load_kaggle_dataset_data(datapath):
     return datasets_df, users_df, dataset_versions_df, dataset_tags_df, tags_df
 
 
-def load_kaggle_kernel_data(datapath):
+def load_kaggle_kernel_data(datapath, update = False):
+
+    kernel_columns_to_keep = ["Id", "AuthorUserId", "CurrentKernelVersionId", "CreationDate", "CurrentUrlSlug"]
+    original_path = datapath + config.ORIGINAL_DATA_FOLDER
+    update_path = datapath + config.UPDATE_MONTH_FOLDER
+
+    if update == False:
+        datapath = original_path
+        kernels_df = pd.read_csv(datapath + "Kernels.csv")[kernel_columns_to_keep]
+    elif update == True:
+        datapath = update_path
+        kernels_df = get_csv_updates(original_path + "Kernels.csv", datapath + "Kernels.csv", kernel_columns_to_keep)
 
     users_df = pd.read_csv(datapath + "Users.csv")[["Id", "UserName", "DisplayName"]]
 
-    kernels_df = pd.read_csv(datapath + "Kernels.csv")[[
-        "Id", "AuthorUserId", "CurrentKernelVersionId",
-        "CreationDate", "CurrentUrlSlug"
-        ]]
 
     #Filtering of kernels from restricted accounts
     kernels_df = kernels_df[kernels_df["AuthorUserId"].isin(users_df["Id"])]
@@ -284,4 +300,3 @@ def load_kaggle_kernel_data(datapath):
 
 
     return kernels_df, users_df, kernel_versions_df, kernel_version_ds_df, dataset_versions_df, kernel_languages_df
-
diff --git a/resource_code/openml_data_collector.py b/resource_code/openml_data_collector.py
index c6cb25b..cfb1425 100644
--- a/resource_code/openml_data_collector.py
+++ b/resource_code/openml_data_collector.py
@@ -4,12 +4,13 @@ import numpy as np
 import multiprocessing
 import validators
-from preprocessing_modules import *
 import config
+from preprocessing_modules import *
+
 
 
 def extract_run_sources(n_runs, timeout, batch_size, run_cp):
 
-    filepath = "~/Desktop/ML-KG/Data/OpenML-Data/"
+    filepath = config.OPENML_INPUT
 
     #Download OpenML Runs
     print("Extracting OpenML Run data...")
@@ -148,7 +149,7 @@ def populate_metadata_run_dfs(batch_run_df, run_dict):
 
 def extract_dataset_sources(n_datasets, timeout, batch_size, dataset_cp):
 
-    filepath = "~/Desktop/ML-KG/Data/OpenML-Data/"
+    filepath = config.OPENML_INPUT
     # named_graph = OPENML_DATASET_GRAPH
     # dataset_cp = find_instance_count(db, named_graph)
     #####################################
@@ -374,7 +375,7 @@ def populate_metadata_dataset_dfs(batch_dataset_df, dataset_dict):
 
 def extract_task_sources(n_tasks, timeout, batch_size, task_cp):
 
-    filepath = "~/Desktop/ML-KG/Data/OpenML-Data/"
+    filepath = config.OPENML_INPUT
 
     #Download OpenML Tasks
     print("Extracting OpenML Task data...")
@@ -461,7 +462,7 @@ def populate_metadata_task_dfs(batch_task_df, task_dict):
 
 def extract_flow_sources(n_flows, timeout, batch_size, flow_cp):
 
-    filepath = "~/Desktop/ML-KG/Data/OpenML-Data/"
+    filepath = config.OPENML_INPUT
 
     #Download OpenML Flows
     print("Extracting OpenML Flow data...")
@@ -659,6 +660,8 @@ def openml_data_collector():
     # Tasks: 261.4 K (261,404)
     # Flows: 16.7 K (16,719)
 
+    print("OpenML Data Collector initiated...\n")
+
     openml.config.apikey = 'eee9181dd538cb1a9daac582a55efd72'
     filepath = config.OPENML_INPUT
 
@@ -670,18 +673,16 @@ def openml_data_collector():
     batch_size = 100
     dataset_batch_size = 1
 
-    # CSV storing checkpoints
     runs_csv = filepath + "runs3.csv"
     datasets_csv = filepath + "datasets.csv"
     tasks_csv = filepath + "tasks.csv"
     flows_csv = filepath + "flows.csv"
 
     checkpoints, latest_ids = get_checkpoints([runs_csv, datasets_csv, tasks_csv, flows_csv])
-    run_cp, dataset_cp, task_cp, flow_cp = 6000000 +checkpoints[0], checkpoints[1], checkpoints[2], checkpoints[3]
+    run_cp, dataset_cp, task_cp, flow_cp = checkpoints[0], checkpoints[1], checkpoints[2], checkpoints[3]
     config.update_openml_checkpoints(run_cp, dataset_cp, task_cp, flow_cp)
     run_lid, dataset_lid, task_lid, flow_lid = latest_ids[0], latest_ids[1], latest_ids[2], latest_ids[3]
 
-    print("OpenML Data Collector initiated...\n")
     print(f"Currently already collected: {config.OPENML_RUN_CURRENT_OFFSET + run_cp} Runs, {dataset_cp} Datasets, {task_cp} Tasks and {flow_cp} Flows")
     print(f"Latest Run collected: Run {run_lid}")
     print(f"Latest Dataset collected: Dataset {dataset_lid}")
@@ -708,7 +709,7 @@ def run_multi_thread_collector():
     processes = []
 
     runs_max_search_size = 100000
-    datasets_max_search_size = 500
+    datasets_max_search_size = 1000
     tasks_max_search_size = 10000
     flows_max_search_size = 3500
     run_args = (runs_max_search_size, run_timeout, batch_size, config.OPENML_RUN_CURRENT_OFFSET + run_cp)
diff --git a/resource_code/preprocessing_modules.py b/resource_code/preprocessing_modules.py
index 1199593..a653617 100644
--- a/resource_code/preprocessing_modules.py
+++ b/resource_code/preprocessing_modules.py
@@ -9,9 +9,9 @@ def preprocess_json_strings(data, preprocess_string_func, preprocess_datetime_fu
 
     if isinstance(data, dict):
         for key, value in data.items():
             if key.lower() == "date" or key.lower() == "paper_date" or key.lower() == "introduced_date":
-                data[key] = preprocess_datetime_func(value)
+                data[key] = preprocess_datetime_func(value) if value is not None else None
             elif key.lower() == "homepage":
-                data[key] = value.lower()
+                data[key] = value.lower() if value is not None else None
             else:
                 data[key] = preprocess_json_strings(value, preprocess_string_func, preprocess_datetime_func)
diff --git a/resource_code/update_sources.py b/resource_code/update_sources.py
new file mode 100644
index 0000000..a4420b1
--- /dev/null
+++ b/resource_code/update_sources.py
@@ -0,0 +1,38 @@
+import pandas as pd
+import config
+import json
+from preprocessing_modules import *
+from json import dumps, loads
+
+def get_csv_updates(csv1, csv2, columns):
+
+    # Load CSV files into DataFrames
+    df1 = pd.read_csv(csv1)[columns]
+    df2 = pd.read_csv(csv2)[columns]
+
+    # Find rows in df2 that are not in df1
+    updates = pd.merge(df1, df2, how='outer', indicator=True).query('_merge == "right_only"').drop('_merge', axis=1)
+    return updates
+
+def get_json_updates(json1, json2, output_json):
+
+    # Load JSON files
+    with open(json1, 'r') as file:
+        data1 = json.load(file)
+
+    with open(json2, 'r') as file:
+        data2 = json.load(file)
+
+    # Identify new items in the updated array
+    set1 = set(dumps(x, sort_keys=True) for x in data1)
+    set2 = set(dumps(x, sort_keys=True) for x in data2)
+    new_items = [loads(x) for x in set2.difference(set1)]
+
+    with open(output_json, 'w') as new_file:
+        json.dump(new_items, new_file, indent=2)
+
+# Example usage
+# get_json_updates(config.PWC_INPUT + config.ORIGINAL_DATA_FOLDER + "paper_code_links.json",
+#                  config.PWC_INPUT + config.UPDATE_MONTH_FOLDER + "paper_code_links.json",
+#                  config.PWC_INPUT + config.UPDATE_MONTH_FOLDER + "paper_code_links2.json")
+
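
For reference, a minimal usage sketch of how the get_csv_updates and get_json_updates helpers introduced here can be combined with the config constants used in this patch. It assumes the snippet is run from resource_code/; the Kaggle base path is illustrative (it is not a config constant in this diff), and the PwC filename follows the list hard-coded in integrate_pwc.

    # Sketch only: illustrative paths, run from within resource_code/.
    import config
    from update_sources import get_csv_updates, get_json_updates

    # Kaggle: keep only the Datasets.csv rows added since the previous snapshot.
    kaggle_path = "/path/to/Kaggle-Data/"  # illustrative base path
    new_datasets = get_csv_updates(
        kaggle_path + config.ORIGINAL_DATA_FOLDER + "Datasets.csv",
        kaggle_path + config.UPDATE_MONTH_FOLDER + "Datasets.csv",
        ["Id", "CreatorUserId", "CurrentDatasetVersionId", "CreationDate"])
    print(len(new_datasets), "new Kaggle dataset rows found")

    # PwC: write the newly added JSON records into the Updates/ folder that
    # integrate_pwc(update=True) reads before materializing RDF.
    get_json_updates(
        config.PWC_INPUT + config.ORIGINAL_DATA_FOLDER + "datasets.json",
        config.PWC_INPUT + config.UPDATE_MONTH_FOLDER + "datasets.json",
        config.PWC_INPUT + config.UPDATE_MONTH_FOLDER + "Updates/datasets.json")

load_kaggle_dataset_data(datapath, update=True) and integrate_pwc(update=True) then consume these deltas, so only the records added since the last snapshot are materialized and appended to the existing dumps.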