Commit

Merge branch 'rebuild-using-stream_to' into trunk
mhekkel committed Jan 21, 2025
2 parents e91295f + fe13a24 · commit e6263f6
Showing 7 changed files with 126 additions and 132 deletions.
3 changes: 2 additions & 1 deletion CMakeLists.txt
@@ -138,7 +138,7 @@ endif()

if(BUILD_WEB_APPLICATION)
find_package(libpqxx 7.8.0 QUIET)

if(NOT libpqxx_FOUND)
FetchContent_Declare(
libpqxx
@@ -277,6 +277,7 @@ if(USE_RSRC)
if(BUILD_WEB_APPLICATION)
list(APPEND RESOURCES ${PROJECT_SOURCE_DIR}/docroot/
${PROJECT_SOURCE_DIR}/db-schema.sql
${PROJECT_SOURCE_DIR}/db-schema-tail.sql
${PROJECT_SOURCE_DIR}/scripts/refine.mcr)
endif()

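The new db-schema-tail.sql entry in the RESOURCES list makes the file available as an embedded resource in USE_RSRC builds, so the rebuild code in data-service.cpp below can load it at runtime. A minimal sketch of that consumption pattern, assuming (as the diff below implies) that cif::load_resource returns a null stream pointer when the resource is missing; the helper name load_sql_resource is invented here:

```cpp
// Read an embedded SQL resource into a string, mirroring the pattern the
// new rebuild() uses for db-schema.sql and db-schema-tail.sql.
#include <cif++.hpp>

#include <sstream>
#include <stdexcept>
#include <string>

std::string load_sql_resource(const std::string &name)
{
	auto res = cif::load_resource(name); // null if the resource is absent
	if (not res)
		throw std::runtime_error("database schema not found (looking for " + name + ")");

	std::ostringstream os;
	os << res->rdbuf(); // slurp the whole stream
	return os.str();
}
```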
4 changes: 4 additions & 0 deletions db-schema-tail.sql
@@ -0,0 +1,4 @@
-- indices

create index hit_identity_ix on af_pdb_hit(identity);
create index hit_af_id_ix on af_pdb_hit(af_id);
5 changes: 0 additions & 5 deletions db-schema.sql
@@ -36,8 +36,3 @@ create table af_transplant (
);

alter table af_transplant owner to "$OWNER";

-- indices

create index hit_identity_ix on af_pdb_hit(identity);
create index hit_af_id_ix on af_pdb_hit(af_id);
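Moving the index definitions out of db-schema.sql is the heart of this commit: the tables are now bulk-loaded over the COPY protocol first, and hit_identity_ix and hit_af_id_ix are created afterwards from db-schema-tail.sql. Building each index once over a fully populated table avoids per-row index maintenance during the load, which is generally much cheaper.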
240 changes: 117 additions & 123 deletions src/data-service.cpp
@@ -24,28 +24,26 @@
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

#include <regex>
#include "data-service.hpp"
#include "alphafill.hpp"
#include "db-connection.hpp"
#include "https-client.hpp"
#include "queue.hpp"
#include "utilities.hpp"

#include <cif++.hpp>
#include <mcfp/mcfp.hpp>
#include <zeep/http/uri.hpp>
#include <zeep/json/parser.hpp>

#include <forward_list>
#include <fstream>
#include <regex>
#include <thread>

#include <sys/wait.h>
#include <unistd.h>

#include <cif++.hpp>
#include <mcfp/mcfp.hpp>

#include <zeep/http/uri.hpp>
#include <zeep/json/parser.hpp>

#include "alphafill.hpp"
#include "data-service.hpp"
#include "db-connection.hpp"
#include "https-client.hpp"
#include "queue.hpp"
#include "utilities.hpp"

namespace fs = std::filesystem;

// --------------------------------------------------------------------
@@ -261,164 +259,160 @@ uint32_t data_service::count_structures(float min_identity, const std::string &c

using json = zeep::json::element;

void process(blocking_queue<json> &q, cif::progress_bar &p)
void process(blocking_queue<json> &q, cif::progress_bar &p,
std::ostream &os_structures, std::ostream &os_pdb_hits, std::ostream &os_transplants)
{
pqxx::transaction tx1(db_connection::instance());

for (;;)
{
auto data = q.pop();
if (data.empty())
break;

std::string id = data["id"].as<std::string>();

p.message(id);

const auto &[type, uniprot_id, chunk, version] = parse_af_id(id);
bool chunked = fs::exists(file_locator::get_metadata_file(type, uniprot_id, 2, version));

auto r = tx1.exec1(R"(INSERT INTO af_structure (name, chunked, af_version, created, af_file) VALUES()" +
tx1.quote(id) + "," +
tx1.quote(chunked) + "," +
tx1.quote(data["alphafill_version"].as<std::string>()) + "," +
tx1.quote(data["date"].as<std::string>()) + "," +
tx1.quote(data["file"].as<std::string>()) +
") RETURNING id");

int64_t structure_id = r[0].as<int64_t>();

for (auto &hit : data["hits"])
{
r = tx1.exec1(R"(INSERT INTO af_pdb_hit (af_id, identity, length, pdb_asym_id, pdb_id, rmsd) VALUES ()" +
std::to_string(structure_id) + ", " +
std::to_string(hit["alignment"]["identity"].as<double>()) + ", " +
std::to_string(hit["alignment"]["length"].as<int64_t>()) + ", " +
tx1.quote(hit["pdb_asym_id"].as<std::string>()) + ", " +
tx1.quote(hit["pdb_id"].as<std::string>()) + ", " +
std::to_string(hit["global_rmsd"].as<double>()) +
") RETURNING id");

int64_t hit_id = r.front().as<int64_t>();

for (auto &transplant : hit["transplants"])
{
tx1.exec0(R"(INSERT INTO af_transplant (hit_id, asym_id, compound_id, analogue_id, entity_id, rmsd) VALUES ()" +
std::to_string(hit_id) + ", " +
tx1.quote(transplant["asym_id"].as<std::string>()) + ", " +
tx1.quote(transplant["compound_id"].as<std::string>()) + ", " +
tx1.quote(transplant["analogue_id"].as<std::string>()) + ", " +
tx1.quote(transplant["entity_id"].as<std::string>()) + ", " +
std::to_string(transplant["local_rmsd"].as<double>()) +
")");
}
}

p.consumed(1);
}

tx1.commit();
}

// --------------------------------------------------------------------

int data_service::rebuild(const std::string &db_user, const fs::path &db_dir)
int data_service::rebuild()
{
auto &config = mcfp::config::instance();

pqxx::work tx(db_connection::instance());
std::string db_dir = config.get("db-dir");

auto schema = cif::load_resource("db-schema.sql");
if (not schema)
throw std::runtime_error("database schema not found (looking for db-schema.sql)");

std::ostringstream os;
os << schema->rdbuf();
uint64_t file_count = 0;
for (auto di = fs::recursive_directory_iterator(db_dir); di != fs::recursive_directory_iterator(); ++di)
{
if (di->path().extension() != ".json")
continue;
++file_count;
}

std::string s(os.str());
cif::progress_bar progress(file_count, "Processing");

cif::replace_all(s, "$OWNER", db_user);
std::forward_list<std::tuple<uint64_t, std::string, bool, std::string, std::string, std::string>> structures;
std::forward_list<std::tuple<uint64_t, uint64_t, double, int64_t, std::string, std::string, double>> pdb_hits;
std::forward_list<std::tuple<uint64_t, uint64_t, std::string, std::string, std::string, std::string, double>> transplants;

tx.exec0(s);
tx.commit();
auto structures_i = structures.before_begin();
auto pdb_hits_i = pdb_hits.before_begin();
auto transplants_i = transplants.before_begin();

uint64_t structure_id = 0, pdb_hit_id = 0, transplant_id = 0;

auto as_string = [](json &e)
{
if (e.is_null())
return std::string{ "\\N" };
else
return e.as<std::string>();
};

std::vector<fs::path> files;
for (auto di = fs::recursive_directory_iterator(db_dir); di != fs::recursive_directory_iterator(); ++di)
{
if (di->path().extension() != ".json")
continue;
files.push_back(di->path());
}

cif::progress_bar progress(files.size(), "Processing");
blocking_queue<fs::path> q1;
blocking_queue<json> q2;
std::exception_ptr ep;
progress.consumed(1);

std::thread t([&q2, &progress, &ep]()
{
try
{
process(q2, progress);
}
catch (const std::exception &ex)
{
ep = std::current_exception();
} });
std::ifstream file(di->path());

std::vector<std::thread> tg;
size_t thread_count = config.get<size_t>("threads");
if (thread_count < 1)
thread_count = 1;
zeep::json::element data;
zeep::json::parse_json(file, data);

for (size_t i = 0; i < thread_count; ++i)
{
tg.emplace_back([&q1, &q2, &ep]()
{
for (;;)
{
auto f = q1.pop();
std::string id = data["id"].as<std::string>();

if (f.empty())
break;
if (id == "nohd" or data["file"].is_null())
continue;

try
{
std::ifstream file(f);
progress.message(id);

zeep::json::element data;
zeep::json::parse_json(file, data);
const auto &[type, uniprot_id, chunk, version] = parse_af_id(id);
bool chunked = fs::exists(file_locator::get_metadata_file(type, uniprot_id, 2, version));

q2.push(std::move(data));
}
catch (const std::exception &ex)
// id, name, chunked, af_version, created, af_file
structures_i = structures.emplace_after(structures_i, ++structure_id, id, chunked ? 't' : 'f', as_string(data["alphafill_version"]), as_string(data["date"]), as_string(data["file"]));

for (auto &hit : data["hits"])
{
// id, af_id, identity, length, pdb_asym_id, pdb_id, rmsd
pdb_hits_i = pdb_hits.emplace_after(pdb_hits_i,
++pdb_hit_id, structure_id, hit["alignment"]["identity"].as<double>(), hit["alignment"]["length"].as<int64_t>(), as_string(hit["pdb_asym_id"]), as_string(hit["pdb_id"]), hit["global_rmsd"].as<double>());

for (auto &transplant : hit["transplants"])
{
ep = std::current_exception();
// id, hit_id, asym_id, compound_id, analogue_id, entity_id, rmsd
transplants_i = transplants.emplace_after(transplants_i,
++transplant_id, pdb_hit_id, as_string(transplant["asym_id"]), as_string(transplant["compound_id"]), as_string(transplant["analogue_id"]), as_string(transplant["entity_id"]), transplant["local_rmsd"].as<double>());
}
}

q1.push({}); });
}
catch (const std::exception &ex)
{
std::clog << "\nError processing file " << di->path() << "\n";
}
}

for (auto &f : files)
{
if (ep)
std::rethrow_exception(ep);
// --------------------------------------------------------------------

q1.push(std::move(f));
}
pqxx::work tx(db_connection::instance());

std::string db_user = config.get("db-user");

std::ostringstream os_script;
os_script << schema->rdbuf();
std::string script_1 = os_script.str();

cif::replace_all(script_1, "$OWNER", config.get("db-user"));

tx.exec(script_1);

// --------------------------------------------------------------------
// Copy data, table by table

pqxx::stream_to s1 = pqxx::stream_to::table(tx, { "public", "af_structure" },
{"id", "name", "chunked", "af_version", "created", "af_file"});

for (auto &t : structures)
s1 << t;
s1.complete();

pqxx::stream_to s2 = pqxx::stream_to::table(tx, { "public", "af_pdb_hit" },
{"id", "af_id", "identity", "length", "pdb_asym_id", "pdb_id", "rmsd"});

for (auto &t : pdb_hits)
s2 << t;
s2.complete();

pqxx::stream_to s3 = pqxx::stream_to::table(tx, { "public", "af_transplant" },
{"id", "hit_id", "asym_id", "compound_id", "analogue_id", "entity_id", "rmsd"});

for (auto &t : transplants)
s3 << t;
s3.complete();

tx.commit();

// Create indices

q1.push({});
pqxx::work tx2(db_connection::instance());

for (auto &ti : tg)
ti.join();
auto schema2 = cif::load_resource("db-schema-tail.sql");
if (not schema2)
throw std::runtime_error("database schema not found (looking for db-schema-tail.sql)");

q2.push({});
std::ostringstream os2;
os2 << schema2->rdbuf();

t.join();
tx2.exec(os2.str());
tx2.commit();

if (ep)
std::rethrow_exception(ep);
// --------------------------------------------------------------------

return 0;
}
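The rewritten rebuild() collects rows in std::forward_list buffers (emplace_after with a trailing iterator keeps appends O(1)) and then streams each table through PostgreSQL's COPY protocol via pqxx::stream_to, instead of issuing one INSERT per row from worker threads. A self-contained sketch of that pattern, using the table and column names from the diff but invented row values:

```cpp
// Sketch of the libpqxx 7.x stream_to (COPY) pattern used by the new
// rebuild(). Connection settings come from the environment (PGHOST etc.).
#include <pqxx/pqxx>

#include <string>
#include <tuple>

int main()
{
	pqxx::connection conn;
	pqxx::work tx(conn);

	auto stream = pqxx::stream_to::table(tx, { "public", "af_pdb_hit" },
		{ "id", "af_id", "identity", "length", "pdb_asym_id", "pdb_id", "rmsd" });

	// Each tuple becomes one COPY row. In the rebuild code, nullable text
	// fields are sent as the string "\N", COPY's NULL marker; that is what
	// the as_string lambda in the diff produces for null JSON values.
	stream << std::make_tuple(1, 1, 0.98, 250, std::string("A"),
		std::string("1abc"), 0.42);

	stream.complete(); // finish the COPY before committing
	tx.commit();
	return 0;
}
```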
2 changes: 1 addition & 1 deletion src/data-service.hpp
@@ -102,7 +102,7 @@ class data_service

void start_queue(size_t nr_of_threads);

static int rebuild(const std::string &db_user, const std::filesystem::path &db_dir);
static int rebuild();

std::vector<compound> get_compounds(float min_identity) const;
std::vector<structure> get_structures(float min_identity, uint32_t page, uint32_t pageSize) const;
2 changes: 1 addition & 1 deletion src/main.cpp
@@ -108,7 +108,7 @@ int rebuild_db_main(int argc, char *const argv[])

// --------------------------------------------------------------------

return data_service::rebuild(db_user, dbDir);
return data_service::rebuild();
}
#endif
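The db_user and db_dir arguments are gone because rebuild() now fetches both from the global mcfp configuration itself, as the data-service.cpp diff above shows:

```cpp
// Inside data_service::rebuild(), per the diff above:
auto &config = mcfp::config::instance();
std::string db_dir = config.get("db-dir");
std::string db_user = config.get("db-user");
```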

2 changes: 1 addition & 1 deletion src/structure.cpp
@@ -109,7 +109,7 @@ void stripCifFile(const std::string &af_id, std::set<std::string> requestedAsyms
for (const auto &[asymID, entityID] : struct_asym.rows<std::string,std::string>("id", "entity_id"))
{
// check if this is a nonpoly entity
if (entity_poly.exists("entity_id"_key == entityID))
if (entity_poly.contains("entity_id"_key == entityID))
continue;

existingAsyms.insert(asymID);
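The structure.cpp hunk tracks a rename in the cif++ query API: testing whether any row matches a condition is now spelled contains() instead of exists(). A hedged usage sketch, assuming entity_poly is a cif::category and the "_key condition literal lives in cif::literals as in current libcifpp; the helper name is invented:

```cpp
#include <cif++.hpp>

#include <string>

using namespace cif::literals;

// True when at least one entity_poly row carries this entity_id, i.e. the
// entity is a polymer. Mirrors the check in stripCifFile above.
bool is_polymer_entity(cif::category &entity_poly, const std::string &entityID)
{
	return entity_poly.contains("entity_id"_key == entityID);
}
```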