two threads for rebuilding
mhekkel committed Jan 21, 2025
1 parent fe13a24 commit b1c88f9
Showing 1 changed file with 79 additions and 50 deletions.
src/data-service.cpp (129 changes: 79 additions & 50 deletions)
@@ -283,81 +283,110 @@ int data_service::rebuild()
     if (not schema)
         throw std::runtime_error("database schema not found (looking for db-schema.sql)");

-    uint64_t file_count = 0;
+    // --------------------------------------------------------------------
+
+    std::vector<fs::path> files;
     for (auto di = fs::recursive_directory_iterator(db_dir); di != fs::recursive_directory_iterator(); ++di)
     {
         if (di->path().extension() != ".json")
             continue;
-        ++file_count;
+        files.push_back(di->path());
     }

-    cif::progress_bar progress(file_count, "Processing");
+    cif::progress_bar progress(files.size(), "Processing");
+    blocking_queue<json> q;

+    // --------------------------------------------------------------------
+
     std::forward_list<std::tuple<uint64_t, std::string, bool, std::string, std::string, std::string>> structures;
     std::forward_list<std::tuple<uint64_t, uint64_t, double, int64_t, std::string, std::string, double>> pdb_hits;
     std::forward_list<std::tuple<uint64_t, uint64_t, std::string, std::string, std::string, std::string, double>> transplants;

-    auto structures_i = structures.before_begin();
-    auto pdb_hits_i = pdb_hits.before_begin();
-    auto transplants_i = transplants.before_begin();
+    std::thread tp([&]()
+        {
+            auto structures_i = structures.before_begin();
+            auto pdb_hits_i = pdb_hits.before_begin();
+            auto transplants_i = transplants.before_begin();

-    uint64_t structure_id = 0, pdb_hit_id = 0, transplant_id = 0;
+            uint64_t structure_id = 0, pdb_hit_id = 0, transplant_id = 0;

-    auto as_string = [](json &e)
-    {
-        if (e.is_null())
-            return std::string{ "\\N" };
-        else
-            return e.as<std::string>();
-    };
+            auto as_string = [](json &e)
+            {
+                if (e.is_null())
+                    return std::string{ "\\N" };
+                else
+                    return e.as<std::string>();
+            };

-    for (auto di = fs::recursive_directory_iterator(db_dir); di != fs::recursive_directory_iterator(); ++di)
-    {
-        if (di->path().extension() != ".json")
-            continue;
+            for (;;)
+            {
+                auto data = q.pop();
+                if (data.empty())
+                    break;

-        progress.consumed(1);
+                progress.consumed(1);

+                try
+                {
+                    std::string id = data["id"].as<std::string>();
+
+                    if (id == "nohd" or data["file"].is_null())
+                        continue;
+
+                    progress.message(id);
+
+                    const auto &[type, uniprot_id, chunk, version] = parse_af_id(id);
+                    bool chunked = fs::exists(file_locator::get_metadata_file(type, uniprot_id, 2, version));
+
+                    // id, name, chunked, af_version, created, af_file
+                    structures_i = structures.emplace_after(structures_i, ++structure_id, id, chunked ? 't' : 'f',
+                        as_string(data["alphafill_version"]), as_string(data["date"]), as_string(data["file"]));
+
+                    for (auto &hit : data["hits"])
+                    {
+                        // id, af_id, identity, length, pdb_asym_id, pdb_id, rmsd
+                        pdb_hits_i = pdb_hits.emplace_after(pdb_hits_i,
+                            ++pdb_hit_id, structure_id, hit["alignment"]["identity"].as<double>(),
+                            hit["alignment"]["length"].as<int64_t>(), as_string(hit["pdb_asym_id"]),
+                            as_string(hit["pdb_id"]), hit["global_rmsd"].as<double>());
+
+                        for (auto &transplant : hit["transplants"])
+                        {
+                            // id, hit_id, asym_id, compound_id, analogue_id, entity_id, rmsd
+                            transplants_i = transplants.emplace_after(transplants_i,
+                                ++transplant_id, pdb_hit_id, as_string(transplant["asym_id"]),
+                                as_string(transplant["compound_id"]), as_string(transplant["analogue_id"]),
+                                as_string(transplant["entity_id"]), transplant["local_rmsd"].as<double>());
+                        }
+                    }
+                }
+                catch (const std::exception &ex)
+                {
+                    std::clog << "\nError processing json\n";
+                }
+            } });
+
+    for (auto &f : files)
+    {
         try
         {
-            std::ifstream file(di->path());
+            std::ifstream file(f);

             zeep::json::element data;
             zeep::json::parse_json(file, data);

-            std::string id = data["id"].as<std::string>();
-
-            if (id == "nohd" or data["file"].is_null())
-                continue;
-
-            progress.message(id);
-
-            const auto &[type, uniprot_id, chunk, version] = parse_af_id(id);
-            bool chunked = fs::exists(file_locator::get_metadata_file(type, uniprot_id, 2, version));
-
-            // id, name, chunked, af_version, created, af_file
-            structures_i = structures.emplace_after(structures_i, ++structure_id, id, chunked ? 't' : 'f', as_string(data["alphafill_version"]), as_string(data["date"]), as_string(data["file"]));
-
-            for (auto &hit : data["hits"])
-            {
-                // id, af_id, identity, length, pdb_asym_id, pdb_id, rmsd
-                pdb_hits_i = pdb_hits.emplace_after(pdb_hits_i,
-                    ++pdb_hit_id, structure_id, hit["alignment"]["identity"].as<double>(), hit["alignment"]["length"].as<int64_t>(), as_string(hit["pdb_asym_id"]), as_string(hit["pdb_id"]), hit["global_rmsd"].as<double>());
-
-                for (auto &transplant : hit["transplants"])
-                {
-                    // id, hit_id, asym_id, compound_id, analogue_id, entity_id, rmsd
-                    transplants_i = transplants.emplace_after(transplants_i,
-                        ++transplant_id, pdb_hit_id, as_string(transplant["asym_id"]), as_string(transplant["compound_id"]), as_string(transplant["analogue_id"]), as_string(transplant["entity_id"]), transplant["local_rmsd"].as<double>());
-                }
-            }
+            q.push(std::move(data));
         }
         catch (const std::exception &ex)
         {
-            std::clog << "\nError processing file " << di->path() << "\n";
+            std::clog << "Error processing file " << f << "\n";
         }
     }

+    q.push({});
+
+    tp.join();
+
     // --------------------------------------------------------------------

     pqxx::work tx(db_connection::instance());

@@ -376,21 +405,21 @@ int data_service::rebuild()
     // Copy data, table by table

     pqxx::stream_to s1 = pqxx::stream_to::table(tx, { "public", "af_structure" },
-        {"id", "name", "chunked", "af_version", "created", "af_file"});
+        { "id", "name", "chunked", "af_version", "created", "af_file" });

     for (auto &t : structures)
         s1 << t;
     s1.complete();

     pqxx::stream_to s2 = pqxx::stream_to::table(tx, { "public", "af_pdb_hit" },
-        {"id", "af_id", "identity", "length", "pdb_asym_id", "pdb_id", "rmsd"});
+        { "id", "af_id", "identity", "length", "pdb_asym_id", "pdb_id", "rmsd" });

     for (auto &t : pdb_hits)
         s2 << t;
     s2.complete();

     pqxx::stream_to s3 = pqxx::stream_to::table(tx, { "public", "af_transplant" },
-        {"id", "hit_id", "asym_id", "compound_id", "analogue_id", "entity_id", "rmsd"});
+        { "id", "hit_id", "asym_id", "compound_id", "analogue_id", "entity_id", "rmsd" });

     for (auto &t : transplants)
         s3 << t;
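Note: the blocking_queue<json> used above is not defined in this commit. A minimal sketch that matches how the code uses it (push() from the producer, a blocking pop() in the consumer, a default-constructed element as the end-of-stream sentinel) could look like the following; the project's actual implementation may differ, for instance by bounding the queue to limit memory use:

    #include <condition_variable>
    #include <mutex>
    #include <queue>

    template <typename T>
    class blocking_queue
    {
      public:
        // Producer side: enqueue a value and wake one waiting consumer.
        void push(T value)
        {
            {
                std::lock_guard lock(m_mutex);
                m_queue.push(std::move(value));
            }
            m_cv.notify_one();
        }

        // Consumer side: block until a value is available, then dequeue it.
        T pop()
        {
            std::unique_lock lock(m_mutex);
            m_cv.wait(lock, [this] { return not m_queue.empty(); });
            T value = std::move(m_queue.front());
            m_queue.pop();
            return value;
        }

      private:
        std::mutex m_mutex;
        std::condition_variable m_cv;
        std::queue<T> m_queue;
    };

Two details of the commit depend on this handshake. First, the producer signals completion by pushing an empty json (q.push({})) after the last file, and the consumer detects it via data.empty() and leaves its for (;;) loop before tp.join() returns. Second, because there is exactly one consumer thread, the running ids (structure_id, pdb_hit_id, transplant_id) need no further synchronization, and every pdb_hit row references a structure id that has already been assigned.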
