From ff08b4358be72c81c2f73999dff2ac77c8295690 Mon Sep 17 00:00:00 2001 From: Max Fellows Date: Wed, 29 May 2024 13:33:04 -0700 Subject: [PATCH] Rough testing of ClickHouse database for results. --- .../modules/cbm/cbmaggregatorlibpqxxwriter.h | 4 +- .../src/cbmaggregatorlibpqxxwriter.cpp | 141 +++++------------- 2 files changed, 38 insertions(+), 107 deletions(-) diff --git a/Source/moja.modules.cbm/include/moja/modules/cbm/cbmaggregatorlibpqxxwriter.h b/Source/moja.modules.cbm/include/moja/modules/cbm/cbmaggregatorlibpqxxwriter.h index d67fe151..0d78f3fe 100644 --- a/Source/moja.modules.cbm/include/moja/modules/cbm/cbmaggregatorlibpqxxwriter.h +++ b/Source/moja.modules.cbm/include/moja/modules/cbm/cbmaggregatorlibpqxxwriter.h @@ -60,7 +60,8 @@ namespace cbm { std::shared_ptr _spatialLocationInfo; - std::string _connectionString; + std::string _pgConnectionString; + std::string _chConnectionString; std::string _schema; Int64 _jobId; bool _isPrimaryAggregator; @@ -68,7 +69,6 @@ namespace cbm { template void load(pqxx::work& tx, - Int64 jobId, const std::string& table, std::shared_ptr dataDimension); diff --git a/Source/moja.modules.cbm/src/cbmaggregatorlibpqxxwriter.cpp b/Source/moja.modules.cbm/src/cbmaggregatorlibpqxxwriter.cpp index a6ce5ac0..b313634d 100644 --- a/Source/moja.modules.cbm/src/cbmaggregatorlibpqxxwriter.cpp +++ b/Source/moja.modules.cbm/src/cbmaggregatorlibpqxxwriter.cpp @@ -20,6 +20,7 @@ #include #include +#include #include using namespace pqxx; @@ -30,86 +31,42 @@ namespace moja { namespace modules { namespace cbm { - /** - * Configuration function - * - * Assign CBMAggregatorLibPQXXWriter._connectionString as variable "connection_string" in parameter config, \n - * CBMAggregatorLibPQXXWriter._schema as variable "schema" in parameter config, \n - * If parameter config has "drop_schema", assign it to CBMAggregatorLibPQXXWriter._dropSchema - * - * @param config DynamicObject& - * @return void - * ************************/ - void CBMAggregatorLibPQXXWriter::configure(const DynamicObject& config) { - _connectionString = config["connection_string"].convert(); + _pgConnectionString = config["connection_string"].convert(); + _chConnectionString = _pgConnectionString; + boost::replace_first(_chConnectionString, "5432", "5430"); + boost::replace_first(_chConnectionString, "dbname=postgres", ""); _schema = config["schema"].convert(); - - if (config.contains("drop_schema")) { - _dropSchema = config["drop_schema"]; - } } - /** - * Subscribes to the signals SystemInit, LocalDomainInit and SystemShutDown - * - * @param notificationCenter NotificationCenter& - * @return void - * ************************/ - void CBMAggregatorLibPQXXWriter::subscribe(NotificationCenter& notificationCenter) { notificationCenter.subscribe(signals::SystemInit, &CBMAggregatorLibPQXXWriter::onSystemInit, *this); notificationCenter.subscribe(signals::LocalDomainInit, &CBMAggregatorLibPQXXWriter::onLocalDomainInit, *this); notificationCenter.subscribe(signals::SystemShutdown, &CBMAggregatorLibPQXXWriter::onSystemShutdown, *this); } - /** - * Initiate System - * - * If CBMAggregatorLibPQXXWriter._isPrimaryAggregator and CBMAggregatorLibPQXXWriter._dropSchema are true \n - * drop CBMAggregatorLibPQXXWriter._schema \n - * Create CBMAggregatorLibPQXXWriter._schema - * - * @return void - * ************************/ - void CBMAggregatorLibPQXXWriter::doSystemInit() { if (!_isPrimaryAggregator) { return; } - connection conn(_connectionString); + connection pgConn(_pgConnectionString); + connection chConn(_chConnectionString); if (_dropSchema) { - doIsolated(conn, (boost::format("DROP SCHEMA %1% CASCADE;") % _schema).str(), true); + doIsolated(pgConn, (boost::format("DROP SCHEMA %1% CASCADE;") % _schema).str(), true); + doIsolated(chConn, (boost::format("DROP DATABASE %1%;") % _schema).str(), true); } - doIsolated(conn, (boost::format("CREATE SCHEMA %1%;") % _schema).str(), true); + doIsolated(pgConn, (boost::format("CREATE SCHEMA %1%;") % _schema).str(), true); + doIsolated(chConn, (boost::format("CREATE DATABASE %1%;") % _schema).str(), true); } - /** - * Initiate Local Domain - * - * Assign CBMAggregatorLibPQXXWriter._jobId the value of variable "job_id" in _landUnitData, \n - * if it exists, else to 0 - * - * @return void - * ************************/ void CBMAggregatorLibPQXXWriter::doLocalDomainInit() { _jobId = _landUnitData->hasVariable("job_id") ? _landUnitData->getVariable("job_id")->value().convert() : 0; } - /** - * doSystemShutDown - * - * If CBMAggregatorLibPQXXWriter._isPrimaryAggregator is true, create unlogged tables for the DateDimension, LandClassDimension, \n - * PoolDimension, ClassifierSetDimension, ModuleInfoDimension, LocationDimension, DisturbanceTypeDimension, \n - * DisturbanceDimension, Pools, Fluxes, ErrorDimension, AgeClassDimension, LocationErrorDimension, \n - * and AgeArea if they do not already exist, and load data into tables on PostgreSQL - * - * @return void - * ************************/ void CBMAggregatorLibPQXXWriter::doSystemShutdown() { if (!_isPrimaryAggregator) { return; @@ -121,16 +78,17 @@ namespace cbm { } MOJA_LOG_INFO << (boost::format("Loading results into %1% on server: %2%") - % _schema % _connectionString).str(); + % _schema % _pgConnectionString).str(); - connection conn(_connectionString); - doIsolated(conn, (boost::format("SET search_path = %1%;") % _schema).str()); + connection pgConn(_pgConnectionString); + connection chConn(_chConnectionString); + doIsolated(pgConn, (boost::format("SET search_path = %1%;") % _schema).str()); MOJA_LOG_INFO << "Creating results tables."; - doIsolated(conn, "CREATE UNLOGGED TABLE IF NOT EXISTS CompletedJobs (id BIGINT PRIMARY KEY);", false); + doIsolated(pgConn, "CREATE UNLOGGED TABLE IF NOT EXISTS CompletedJobs (id BIGINT PRIMARY KEY);", false); - bool resultsPreviouslyLoaded = perform([&conn, this] { - return !nontransaction(conn).exec((boost::format( + bool resultsPreviouslyLoaded = perform([&pgConn, this] { + return !nontransaction(pgConn).exec((boost::format( "SELECT 1 FROM CompletedJobs WHERE id = %1%;" ) % _jobId).str()).empty(); }); @@ -140,46 +98,40 @@ namespace cbm { return; } - perform([&conn, this] { - work tx(conn); + perform([&pgConn, &chConn, this] { + work pgTx(pgConn); + work chTx(chConn); // First, try to insert into the completed jobs table - if this is a duplicate, the transaction // will fail immediately. - tx.exec((boost::format("INSERT INTO CompletedJobs VALUES (%1%);") % _jobId).str()); + pgTx.exec((boost::format("INSERT INTO CompletedJobs VALUES (%1%);") % _jobId).str()); // Bulk load the job results into a temporary set of tables. std::vector tempTableDdl{ - (boost::format("CREATE UNLOGGED TABLE flux_%1% (year INTEGER, %2% VARCHAR, unfccc_land_class VARCHAR, age_range VARCHAR, %3%_previous VARCHAR, unfccc_land_class_previous VARCHAR, age_range_previous VARCHAR, disturbance_type VARCHAR, disturbance_code INTEGER, from_pool VARCHAR, to_pool VARCHAR, flux_tc NUMERIC);") % _jobId % boost::join(*_classifierNames, " VARCHAR, ") % boost::join(*_classifierNames, "_previous VARCHAR, ")).str(), - (boost::format("CREATE UNLOGGED TABLE pool_%1% (year INTEGER, %2% VARCHAR, unfccc_land_class VARCHAR, age_range VARCHAR, pool VARCHAR, pool_tc NUMERIC);") % _jobId % boost::join(*_classifierNames, " VARCHAR, ")).str(), - (boost::format("CREATE UNLOGGED TABLE error_%1% (year INTEGER, %2% VARCHAR, module VARCHAR, error VARCHAR, area NUMERIC);") % _jobId % boost::join(*_classifierNames, " VARCHAR, ")).str(), - (boost::format("CREATE UNLOGGED TABLE age_%1% (year INTEGER, %2% VARCHAR, unfccc_land_class VARCHAR, age_range VARCHAR, area NUMERIC);") % _jobId % boost::join(*_classifierNames, " VARCHAR, ")).str(), - (boost::format("CREATE UNLOGGED TABLE disturbance_%1% (year INTEGER, %2% VARCHAR, unfccc_land_class VARCHAR, age_range VARCHAR, %3%_previous VARCHAR, unfccc_land_class_previous VARCHAR, age_range_previous VARCHAR, disturbance_type VARCHAR, disturbance_code INTEGER, area NUMERIC);") % _jobId % boost::join(*_classifierNames, " VARCHAR, ") % boost::join(*_classifierNames, "_previous VARCHAR, ")).str() + (boost::format("CREATE TABLE IF NOT EXISTS %1%.raw_fluxes (year INTEGER PRIMARY KEY, %2% VARCHAR PRIMARY KEY, unfccc_land_class VARCHAR PRIMARY KEY, age_range VARCHAR PRIMARY KEY, %3%_previous VARCHAR PRIMARY KEY, unfccc_land_class_previous VARCHAR PRIMARY KEY, age_range_previous VARCHAR PRIMARY KEY, disturbance_type VARCHAR PRIMARY KEY, disturbance_code INTEGER PRIMARY KEY, from_pool VARCHAR PRIMARY KEY, to_pool VARCHAR PRIMARY KEY, flux_tc NUMERIC) ENGINE = SummingMergeTree;") % _schema % boost::join(*_classifierNames, " VARCHAR PRIMARY KEY, ") % boost::join(*_classifierNames, "_previous VARCHAR PRIMARY KEY, ")).str(), + (boost::format("CREATE TABLE IF NOT EXISTS %1%.raw_pools (year INTEGER PRIMARY KEY, %2% VARCHAR PRIMARY KEY, unfccc_land_class VARCHAR PRIMARY KEY, age_range VARCHAR PRIMARY KEY, pool VARCHAR PRIMARY KEY, pool_tc NUMERIC) ENGINE = SummingMergeTree;") % _schema % boost::join(*_classifierNames, " VARCHAR PRIMARY KEY, ")).str(), + (boost::format("CREATE TABLE IF NOT EXISTS %1%.raw_errors (year INTEGER PRIMARY KEY, %2% VARCHAR PRIMARY KEY, module VARCHAR PRIMARY KEY, error VARCHAR PRIMARY KEY, area NUMERIC) ENGINE = SummingMergeTree;") % _schema % boost::join(*_classifierNames, " VARCHAR PRIMARY KEY, ")).str(), + (boost::format("CREATE TABLE IF NOT EXISTS %1%.raw_ages (year INTEGER PRIMARY KEY, %2% VARCHAR PRIMARY KEY, unfccc_land_class VARCHAR PRIMARY KEY, age_range VARCHAR PRIMARY KEY, area NUMERIC) ENGINE = SummingMergeTree;") % _schema % boost::join(*_classifierNames, " VARCHAR PRIMARY KEY, ")).str(), + (boost::format("CREATE TABLE IF NOT EXISTS %1%.raw_disturbances (year INTEGER PRIMARY KEY, %2% VARCHAR PRIMARY KEY, unfccc_land_class VARCHAR PRIMARY KEY, age_range VARCHAR PRIMARY KEY, %3%_previous VARCHAR PRIMARY KEY, unfccc_land_class_previous VARCHAR PRIMARY KEY, age_range_previous VARCHAR PRIMARY KEY, disturbance_type VARCHAR PRIMARY KEY, disturbance_code INTEGER PRIMARY KEY, area NUMERIC) ENGINE = SummingMergeTree;") % _schema % boost::join(*_classifierNames, " VARCHAR PRIMARY KEY, ") % boost::join(*_classifierNames, "_previous VARCHAR PRIMARY KEY, ")).str() }; for (const auto& ddl : tempTableDdl) { - tx.exec(ddl); + chTx.exec(ddl); } - load(tx, _jobId, "flux", _fluxDimension); - load(tx, _jobId, "pool", _poolDimension); - load(tx, _jobId, "error", _errorDimension); - load(tx, _jobId, "age", _ageDimension); - load(tx, _jobId, "disturbance", _disturbanceDimension); + load(chTx, (boost::format("%1%.raw_fluxes") % _schema).str(), _fluxDimension); + load(chTx, (boost::format("%1%.raw_pools") % _schema).str(), _poolDimension); + load(chTx, (boost::format("%1%.raw_errors") % _schema).str(), _errorDimension); + load(chTx, (boost::format("%1%.raw_ages") % _schema).str(), _ageDimension); + load(chTx, (boost::format("%1%.raw_disturbances") % _schema).str(), _disturbanceDimension); - tx.commit(); + pgTx.commit(); + chTx.commit(); }); MOJA_LOG_INFO << "PostgreSQL insert complete." << std::endl; } - /** - * Perform a single transaction using SQL commands (Overloaded function) - * - * @param conn connection_base& - * @param sql string - * @param optional bool - * @return void - * ************************/ void CBMAggregatorLibPQXXWriter::doIsolated(pqxx::connection_base& conn, std::string sql, bool optional) { perform([&conn, sql, optional] { try { @@ -194,15 +146,6 @@ namespace cbm { }); } - /** - * Perform transactions using SQL commands (Overloaded function) - * - * @param conn connection_base& - * @param sql vector - * @param optional bool - * @return void - * ************************/ - void CBMAggregatorLibPQXXWriter::doIsolated(pqxx::connection_base& conn, std::vector sql, bool optional) { perform([&conn, sql, optional] { try { @@ -220,26 +163,14 @@ namespace cbm { }); } - /** - * Load each record in paramter dataDimension into table (table name is based on parameter table and jobId) - * - * @param tx work& - * @param jobId Int64 - * @param table string - * @param dataDimension shared_ptr - * @return void - * ************************/ - template void CBMAggregatorLibPQXXWriter::load( pqxx::work& tx, - Int64 jobId, const std::string& table, std::shared_ptr dataDimension) { MOJA_LOG_INFO << (boost::format("Loading %1%") % table).str(); - auto tempTableName = (boost::format("%1%_%2%") % table % jobId).str(); - pqxx::stream_to stream(tx, tempTableName); + pqxx::stream_to stream(tx, table); auto records = dataDimension->records(); if (!records.empty()) { for (auto& record : records) {