Skip to content

Commit

Permalink
Rough testing of ClickHouse database for results.
Browse files Browse the repository at this point in the history
  • Loading branch information
mfellows committed May 29, 2024
1 parent bedc920 commit ff08b43
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,15 @@ namespace cbm {

std::shared_ptr<const flint::SpatialLocationInfo> _spatialLocationInfo;

std::string _connectionString;
std::string _pgConnectionString;
std::string _chConnectionString;
std::string _schema;
Int64 _jobId;
bool _isPrimaryAggregator;
bool _dropSchema;

template<typename TAccumulator>
void load(pqxx::work& tx,
Int64 jobId,
const std::string& table,
std::shared_ptr<TAccumulator> dataDimension);

Expand Down
141 changes: 36 additions & 105 deletions Source/moja.modules.cbm/src/cbmaggregatorlibpqxxwriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <moja/hash.h>

#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/replace.hpp>
#include <boost/format.hpp>

using namespace pqxx;
Expand All @@ -30,86 +31,42 @@ namespace moja {
namespace modules {
namespace cbm {

/**
 * Configuration function
 *
 * Reads "connection_string" from the parameter config into
 * CBMAggregatorLibPQXXWriter._pgConnectionString, then derives
 * CBMAggregatorLibPQXXWriter._chConnectionString from it by swapping
 * port "5432" for "5430" and stripping "dbname=postgres".
 * NOTE(review): this string surgery assumes the PostgreSQL connection
 * string literally contains "5432" and "dbname=postgres" — confirm, or
 * consider a separate ClickHouse connection-string config key.
 * Also assigns CBMAggregatorLibPQXXWriter._schema from "schema", and
 * CBMAggregatorLibPQXXWriter._dropSchema from the optional "drop_schema".
 *
 * @param config DynamicObject&
 * @return void
 * ************************/

void CBMAggregatorLibPQXXWriter::configure(const DynamicObject& config) {
    _pgConnectionString = config["connection_string"].convert<std::string>();

    // ClickHouse is reached through the same host; only the port differs
    // and no PostgreSQL database name applies.
    _chConnectionString = _pgConnectionString;
    boost::replace_first(_chConnectionString, "5432", "5430");
    boost::replace_first(_chConnectionString, "dbname=postgres", "");

    _schema = config["schema"].convert<std::string>();

    if (config.contains("drop_schema")) {
        _dropSchema = config["drop_schema"];
    }
}

/**
 * Subscribes this writer to the simulation lifecycle signals it handles:
 * SystemInit, LocalDomainInit and SystemShutdown.
 *
 * @param notificationCenter NotificationCenter&
 * @return void
 * ************************/

void CBMAggregatorLibPQXXWriter::subscribe(NotificationCenter& notificationCenter) {
    // Register each lifecycle hook against its corresponding signal.
    notificationCenter.subscribe(signals::SystemInit,      &CBMAggregatorLibPQXXWriter::onSystemInit,      *this);
    notificationCenter.subscribe(signals::LocalDomainInit, &CBMAggregatorLibPQXXWriter::onLocalDomainInit, *this);
    notificationCenter.subscribe(signals::SystemShutdown,  &CBMAggregatorLibPQXXWriter::onSystemShutdown,  *this);
}

/**
 * Initiate System
 *
 * Runs only on the primary aggregator. If CBMAggregatorLibPQXXWriter._dropSchema
 * is true, drops the PostgreSQL schema (CASCADE) and the ClickHouse database
 * named CBMAggregatorLibPQXXWriter._schema, then (re)creates both.
 * The drop/create statements are issued as optional isolated statements so a
 * missing schema/database does not abort initialization.
 *
 * NOTE(review): _schema is interpolated directly into SQL — assumes a trusted,
 * identifier-safe schema name from configuration.
 *
 * @return void
 * ************************/

void CBMAggregatorLibPQXXWriter::doSystemInit() {
    if (!_isPrimaryAggregator) {
        return;
    }

    connection pgConn(_pgConnectionString);
    connection chConn(_chConnectionString);
    if (_dropSchema) {
        doIsolated(pgConn, (boost::format("DROP SCHEMA %1% CASCADE;") % _schema).str(), true);
        doIsolated(chConn, (boost::format("DROP DATABASE %1%;") % _schema).str(), true);
    }

    doIsolated(pgConn, (boost::format("CREATE SCHEMA %1%;") % _schema).str(), true);
    doIsolated(chConn, (boost::format("CREATE DATABASE %1%;") % _schema).str(), true);
}

/**
 * Initiate Local Domain
 *
 * Stores the current job identifier in CBMAggregatorLibPQXXWriter._jobId:
 * the value of the "job_id" variable from _landUnitData when present,
 * otherwise 0.
 *
 * @return void
 * ************************/
void CBMAggregatorLibPQXXWriter::doLocalDomainInit() {
    if (_landUnitData->hasVariable("job_id")) {
        _jobId = _landUnitData->getVariable("job_id")->value().convert<Int64>();
    } else {
        // No job id supplied for this run; fall back to the default id.
        _jobId = 0;
    }
}

/**
* doSystemShutDown
*
* If CBMAggregatorLibPQXXWriter._isPrimaryAggregator is true, create unlogged tables for the DateDimension, LandClassDimension, \n
* PoolDimension, ClassifierSetDimension, ModuleInfoDimension, LocationDimension, DisturbanceTypeDimension, \n
* DisturbanceDimension, Pools, Fluxes, ErrorDimension, AgeClassDimension, LocationErrorDimension, \n
* and AgeArea if they do not already exist, and load data into tables on PostgreSQL
*
* @return void
* ************************/
void CBMAggregatorLibPQXXWriter::doSystemShutdown() {
if (!_isPrimaryAggregator) {
return;
Expand All @@ -121,16 +78,17 @@ namespace cbm {
}

MOJA_LOG_INFO << (boost::format("Loading results into %1% on server: %2%")
% _schema % _connectionString).str();
% _schema % _pgConnectionString).str();

connection conn(_connectionString);
doIsolated(conn, (boost::format("SET search_path = %1%;") % _schema).str());
connection pgConn(_pgConnectionString);
connection chConn(_chConnectionString);
doIsolated(pgConn, (boost::format("SET search_path = %1%;") % _schema).str());

MOJA_LOG_INFO << "Creating results tables.";
doIsolated(conn, "CREATE UNLOGGED TABLE IF NOT EXISTS CompletedJobs (id BIGINT PRIMARY KEY);", false);
doIsolated(pgConn, "CREATE UNLOGGED TABLE IF NOT EXISTS CompletedJobs (id BIGINT PRIMARY KEY);", false);

bool resultsPreviouslyLoaded = perform([&conn, this] {
return !nontransaction(conn).exec((boost::format(
bool resultsPreviouslyLoaded = perform([&pgConn, this] {
return !nontransaction(pgConn).exec((boost::format(
"SELECT 1 FROM CompletedJobs WHERE id = %1%;"
) % _jobId).str()).empty();
});
Expand All @@ -140,46 +98,40 @@ namespace cbm {
return;
}

perform([&conn, this] {
work tx(conn);
perform([&pgConn, &chConn, this] {
work pgTx(pgConn);
work chTx(chConn);

// First, try to insert into the completed jobs table - if this is a duplicate, the transaction
// will fail immediately.
tx.exec((boost::format("INSERT INTO CompletedJobs VALUES (%1%);") % _jobId).str());
pgTx.exec((boost::format("INSERT INTO CompletedJobs VALUES (%1%);") % _jobId).str());

// Bulk load the job results into a temporary set of tables.
std::vector<std::string> tempTableDdl{
(boost::format("CREATE UNLOGGED TABLE flux_%1% (year INTEGER, %2% VARCHAR, unfccc_land_class VARCHAR, age_range VARCHAR, %3%_previous VARCHAR, unfccc_land_class_previous VARCHAR, age_range_previous VARCHAR, disturbance_type VARCHAR, disturbance_code INTEGER, from_pool VARCHAR, to_pool VARCHAR, flux_tc NUMERIC);") % _jobId % boost::join(*_classifierNames, " VARCHAR, ") % boost::join(*_classifierNames, "_previous VARCHAR, ")).str(),
(boost::format("CREATE UNLOGGED TABLE pool_%1% (year INTEGER, %2% VARCHAR, unfccc_land_class VARCHAR, age_range VARCHAR, pool VARCHAR, pool_tc NUMERIC);") % _jobId % boost::join(*_classifierNames, " VARCHAR, ")).str(),
(boost::format("CREATE UNLOGGED TABLE error_%1% (year INTEGER, %2% VARCHAR, module VARCHAR, error VARCHAR, area NUMERIC);") % _jobId % boost::join(*_classifierNames, " VARCHAR, ")).str(),
(boost::format("CREATE UNLOGGED TABLE age_%1% (year INTEGER, %2% VARCHAR, unfccc_land_class VARCHAR, age_range VARCHAR, area NUMERIC);") % _jobId % boost::join(*_classifierNames, " VARCHAR, ")).str(),
(boost::format("CREATE UNLOGGED TABLE disturbance_%1% (year INTEGER, %2% VARCHAR, unfccc_land_class VARCHAR, age_range VARCHAR, %3%_previous VARCHAR, unfccc_land_class_previous VARCHAR, age_range_previous VARCHAR, disturbance_type VARCHAR, disturbance_code INTEGER, area NUMERIC);") % _jobId % boost::join(*_classifierNames, " VARCHAR, ") % boost::join(*_classifierNames, "_previous VARCHAR, ")).str()
(boost::format("CREATE TABLE IF NOT EXISTS %1%.raw_fluxes (year INTEGER PRIMARY KEY, %2% VARCHAR PRIMARY KEY, unfccc_land_class VARCHAR PRIMARY KEY, age_range VARCHAR PRIMARY KEY, %3%_previous VARCHAR PRIMARY KEY, unfccc_land_class_previous VARCHAR PRIMARY KEY, age_range_previous VARCHAR PRIMARY KEY, disturbance_type VARCHAR PRIMARY KEY, disturbance_code INTEGER PRIMARY KEY, from_pool VARCHAR PRIMARY KEY, to_pool VARCHAR PRIMARY KEY, flux_tc NUMERIC) ENGINE = SummingMergeTree;") % _schema % boost::join(*_classifierNames, " VARCHAR PRIMARY KEY, ") % boost::join(*_classifierNames, "_previous VARCHAR PRIMARY KEY, ")).str(),
(boost::format("CREATE TABLE IF NOT EXISTS %1%.raw_pools (year INTEGER PRIMARY KEY, %2% VARCHAR PRIMARY KEY, unfccc_land_class VARCHAR PRIMARY KEY, age_range VARCHAR PRIMARY KEY, pool VARCHAR PRIMARY KEY, pool_tc NUMERIC) ENGINE = SummingMergeTree;") % _schema % boost::join(*_classifierNames, " VARCHAR PRIMARY KEY, ")).str(),
(boost::format("CREATE TABLE IF NOT EXISTS %1%.raw_errors (year INTEGER PRIMARY KEY, %2% VARCHAR PRIMARY KEY, module VARCHAR PRIMARY KEY, error VARCHAR PRIMARY KEY, area NUMERIC) ENGINE = SummingMergeTree;") % _schema % boost::join(*_classifierNames, " VARCHAR PRIMARY KEY, ")).str(),
(boost::format("CREATE TABLE IF NOT EXISTS %1%.raw_ages (year INTEGER PRIMARY KEY, %2% VARCHAR PRIMARY KEY, unfccc_land_class VARCHAR PRIMARY KEY, age_range VARCHAR PRIMARY KEY, area NUMERIC) ENGINE = SummingMergeTree;") % _schema % boost::join(*_classifierNames, " VARCHAR PRIMARY KEY, ")).str(),
(boost::format("CREATE TABLE IF NOT EXISTS %1%.raw_disturbances (year INTEGER PRIMARY KEY, %2% VARCHAR PRIMARY KEY, unfccc_land_class VARCHAR PRIMARY KEY, age_range VARCHAR PRIMARY KEY, %3%_previous VARCHAR PRIMARY KEY, unfccc_land_class_previous VARCHAR PRIMARY KEY, age_range_previous VARCHAR PRIMARY KEY, disturbance_type VARCHAR PRIMARY KEY, disturbance_code INTEGER PRIMARY KEY, area NUMERIC) ENGINE = SummingMergeTree;") % _schema % boost::join(*_classifierNames, " VARCHAR PRIMARY KEY, ") % boost::join(*_classifierNames, "_previous VARCHAR PRIMARY KEY, ")).str()
};

for (const auto& ddl : tempTableDdl) {
tx.exec(ddl);
chTx.exec(ddl);
}

load(tx, _jobId, "flux", _fluxDimension);
load(tx, _jobId, "pool", _poolDimension);
load(tx, _jobId, "error", _errorDimension);
load(tx, _jobId, "age", _ageDimension);
load(tx, _jobId, "disturbance", _disturbanceDimension);
load(chTx, (boost::format("%1%.raw_fluxes") % _schema).str(), _fluxDimension);
load(chTx, (boost::format("%1%.raw_pools") % _schema).str(), _poolDimension);
load(chTx, (boost::format("%1%.raw_errors") % _schema).str(), _errorDimension);
load(chTx, (boost::format("%1%.raw_ages") % _schema).str(), _ageDimension);
load(chTx, (boost::format("%1%.raw_disturbances") % _schema).str(), _disturbanceDimension);

tx.commit();
pgTx.commit();
chTx.commit();
});

MOJA_LOG_INFO << "PostgreSQL insert complete." << std::endl;
}

/**
* Perform a single transaction using SQL commands (Overloaded function)
*
* @param conn connection_base&
* @param sql string
* @param optional bool
* @return void
* ************************/
void CBMAggregatorLibPQXXWriter::doIsolated(pqxx::connection_base& conn, std::string sql, bool optional) {
perform([&conn, sql, optional] {
try {
Expand All @@ -194,15 +146,6 @@ namespace cbm {
});
}

/**
* Perform transactions using SQL commands (Overloaded function)
*
* @param conn connection_base&
* @param sql vector<string>
* @param optional bool
* @return void
* ************************/

void CBMAggregatorLibPQXXWriter::doIsolated(pqxx::connection_base& conn, std::vector<std::string> sql, bool optional) {
perform([&conn, sql, optional] {
try {
Expand All @@ -220,26 +163,14 @@ namespace cbm {
});
}

/**
* Load each record in paramter dataDimension into table (table name is based on parameter table and jobId)
*
* @param tx work&
* @param jobId Int64
* @param table string
* @param dataDimension shared_ptr<TAccumulator>
* @return void
* ************************/

template<typename TAccumulator>
void CBMAggregatorLibPQXXWriter::load(
pqxx::work& tx,
Int64 jobId,
const std::string& table,
std::shared_ptr<TAccumulator> dataDimension) {

MOJA_LOG_INFO << (boost::format("Loading %1%") % table).str();
auto tempTableName = (boost::format("%1%_%2%") % table % jobId).str();
pqxx::stream_to stream(tx, tempTableName);
pqxx::stream_to stream(tx, table);
auto records = dataDimension->records();
if (!records.empty()) {
for (auto& record : records) {
Expand Down

0 comments on commit ff08b43

Please sign in to comment.