diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..e43b0f988 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.DS_Store diff --git a/.travis.yml b/.travis.yml index c7070a71f..6d6af98bc 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,6 @@ language: generic sudo: required -dist: trusty +dist: xenial matrix: include: - os: linux @@ -16,8 +16,15 @@ matrix: - libev-dev - libsqlite-dev - libmysqlclient-dev + - libboost-all-dev + - flex + - bison + - pkg-config +before_script: +- wget https://ftp.fau.de/apache/thrift/0.13.0/thrift-0.13.0.tar.gz +- tar -xzf thrift-0.13.0.tar.gz +- pushd thrift-0.13.0 && ./configure --prefix=/usr --with-php=no --with-erlang=no --with-go=no --with-java=no --with-python=no --with-py3=no --with-ruby=no --with-nodejs=no --with-c_glib=no --with-cpp=yes && make && sudo make install && popd script: -- cd chronos - mkdir build - cd build - cmake .. && make diff --git a/CMake/Modules/FindLibThrift.cmake b/CMake/Modules/FindLibThrift.cmake new file mode 100644 index 000000000..6169633d0 --- /dev/null +++ b/CMake/Modules/FindLibThrift.cmake @@ -0,0 +1,7 @@ +find_package(PkgConfig REQUIRED) + +pkg_check_modules(LIBTHRIFT REQUIRED thrift) + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args(libthrift DEFAULT_MSG LIBTHRIFT_LIBRARIES LIBTHRIFT_INCLUDE_DIRS LIBTHRIFT_LIBDIR) +mark_as_advanced(LIBTHRIFT_INCLUDE_DIRS LIBTHRIFT_LIBRARIES LIBTHRIFT_LIBDIR) diff --git a/chronos/CMake/Modules/FindMySQLClient.cmake b/CMake/Modules/FindMySQLClient.cmake similarity index 100% rename from chronos/CMake/Modules/FindMySQLClient.cmake rename to CMake/Modules/FindMySQLClient.cmake diff --git a/chronos/CMake/Modules/FindSQLite.cmake b/CMake/Modules/FindSQLite.cmake similarity index 100% rename from chronos/CMake/Modules/FindSQLite.cmake rename to CMake/Modules/FindSQLite.cmake diff --git a/CMake/Modules/FindThriftCompiler.cmake b/CMake/Modules/FindThriftCompiler.cmake new file mode 100644 index 000000000..2491cbfa2 --- /dev/null +++ b/CMake/Modules/FindThriftCompiler.cmake @@ -0,0 +1,14 @@ +find_program(THRIFT_COMPILER NAMES thrift) + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args(thrift DEFAULT_MSG THRIFT_COMPILER) +mark_as_advanced(THRIFT_COMPILER) + +macro(thrift_compile FILENAME GENERATOR OUTPUTDIR) + file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/${OUTPUTDIR}) + execute_process(COMMAND ${THRIFT_COMPILER} --gen ${GENERATOR} -out ${CMAKE_CURRENT_BINARY_DIR}/${OUTPUTDIR} ${CMAKE_CURRENT_SOURCE_DIR}/${FILENAME} + RESULT_VARIABLE RES) + if(RES) + message(FATAL_ERROR "Failed to compile ${FILENAME} with thrift generator ${GENERATOR}") + endif() +endmacro(thrift_compile) diff --git a/chronos/CMake/Modules/Findlibev.cmake b/CMake/Modules/Findlibev.cmake similarity index 100% rename from chronos/CMake/Modules/Findlibev.cmake rename to CMake/Modules/Findlibev.cmake diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 000000000..aa42cb776 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,8 @@ +cmake_minimum_required(VERSION 3.1) +project(cron-job.org) + +set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/CMake/Modules") +set(CMAKE_CXX_STANDARD 14) + +add_subdirectory(protocol) +add_subdirectory(chronos) diff --git a/README.md b/README.md index bcdc0e8e7..171eeaf61 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,7 @@ Structure --------- * `database` contains the MySQL database structure. * `chronos` is cron-job.org's cron job execution daemon and is responsible for fetching the jobs. +* `protocol` contains the interface definitions for interaction between system nodes. * `web` contains the web interface (coming soon) chronos @@ -18,12 +19,18 @@ cron-job.org supports storing the job results for the user's convenience. This c The whole software is optimized on performance rather than on data integrity, i.e. when your server crashes or you have a power outage / hardware defect, the job history is most likely lost. Since this is volatile data anyway, it's not considered a big issue. +`chronos` can now run on multiple nodes. Each node requires an own MySQL server/database and stores its own jobs. The host +running the web interface also manages the user database and an association between job and node. The web interface can +create, delete, update and fetch jobs and job logs from the particular node via a Thrift-based protocol defined in the +`protocol` folder. + ### Prerequisites In order to build chronos, you need development files of: * curl (preferably with c-ares as resolver and libidn2 for IDN support) * libev * mysqlclient * sqlite3 +* thrift (compiler and libthrift) To build, you need a C++14 compiler and cmake. diff --git a/chronos/App.cpp b/chronos/App.cpp index 30552e8bb..cc88ba873 100644 --- a/chronos/App.cpp +++ b/chronos/App.cpp @@ -28,7 +28,10 @@ #include #include "UpdateThread.h" +#include "NotificationThread.h" #include "WorkerThread.h" +#include "NodeService.h" +#include "MasterService.h" #include "Config.h" namespace @@ -133,7 +136,7 @@ void App::processJobs(time_t forTime, time_t plannedTime) std::shared_ptr wt = std::make_shared(t->tm_mday, t->tm_mon+1, t->tm_year+1900, t->tm_hour, t->tm_min); MYSQL_ROW row; - auto res = db->query("SELECT DISTINCT(`timezone`) FROM `user`"); + auto res = db->query("SELECT DISTINCT(`timezone`) FROM `job` WHERE `enabled`=1"); while((row = res->fetchRow()) != nullptr) { std::string timeZone(row[0]); @@ -150,14 +153,14 @@ void App::processJobs(time_t forTime, time_t plannedTime) int wday = -1; switch(cWDay) { - case cctz::weekday::monday: wday = 1; break; + case cctz::weekday::monday: wday = 1; break; case cctz::weekday::tuesday: wday = 2; break; case cctz::weekday::wednesday: wday = 3; break; case cctz::weekday::thursday: wday = 4; break; - case cctz::weekday::friday: wday = 5; break; + case cctz::weekday::friday: wday = 5; break; case cctz::weekday::saturday: wday = 6; break; - case cctz::weekday::sunday: wday = 0; break; - default: wday = -1; break; + case cctz::weekday::sunday: wday = 0; break; + default: wday = -1; break; } processJobsForTimeZone(civilTime.hour(), civilTime.minute(), civilTime.month(), civilTime.day(), wday, civilTime.year(), @@ -188,13 +191,12 @@ void App::processJobsForTimeZone(int hour, int minute, int month, int mday, int << "timeZone = " << timeZone << std::endl; - auto res = db->query("SELECT TRIM(`url`),`job`.`jobid`,`auth_enable`,`auth_user`,`auth_pass`,`notify_failure`,`notify_success`,`notify_disable`,`fail_counter`,`save_responses`,`job`.`userid`,`request_method`,COUNT(`job_header`.`jobheaderid`),`job_body`.`body` FROM `job` " + auto res = db->query("SELECT TRIM(`url`),`job`.`jobid`,`auth_enable`,`auth_user`,`auth_pass`,`notify_failure`,`notify_success`,`notify_disable`,`fail_counter`,`save_responses`,`job`.`userid`,`request_method`,COUNT(`job_header`.`jobheaderid`),`job_body`.`body`,`title` FROM `job` " "INNER JOIN `job_hours` ON `job_hours`.`jobid`=`job`.`jobid` " "INNER JOIN `job_mdays` ON `job_mdays`.`jobid`=`job`.`jobid` " "INNER JOIN `job_wdays` ON `job_wdays`.`jobid`=`job`.`jobid` " "INNER JOIN `job_minutes` ON `job_minutes`.`jobid`=`job`.`jobid` " "INNER JOIN `job_months` ON `job_months`.`jobid`=`job`.`jobid` " - "INNER JOIN `user` ON `job`.`userid`=`user`.`userid` " "LEFT JOIN `job_header` ON `job_header`.`jobid`=`job`.`jobid` " "LEFT JOIN `job_body` ON `job_body`.`jobid`=`job`.`jobid` " "WHERE (`hour`=-1 OR `hour`=%d) " @@ -202,7 +204,7 @@ void App::processJobsForTimeZone(int hour, int minute, int month, int mday, int "AND (`mday`=-1 OR `mday`=%d) " "AND (`wday`=-1 OR `wday`=%d) " "AND (`month`=-1 OR `month`=%d) " - "AND `user`.`timezone`='%q' " + "AND `job`.`timezone`='%q' " "AND `enabled`=1 " "GROUP BY `job`.`jobid` " "ORDER BY `fail_counter` ASC, `last_duration` ASC", @@ -247,6 +249,8 @@ void App::processJobsForTimeZone(int hour, int minute, int month, int mday, int req->requestBody = row[13]; } + req->result->title = row[14]; + wt->addJob(req); } } @@ -256,6 +260,16 @@ void App::processJobsForTimeZone(int hour, int minute, int month, int mday, int std::cout << "App::processJobsForTimeZone(): Finished" << std::endl; } +void App::cleanUpNotifications() +{ + std::cout << "App::cleanUpNotifications()" << std::endl; + + static constexpr const int TIME_ONE_DAY = 86400; + + db->query("DELETE FROM `notification` WHERE `date` < UNIX_TIMESTAMP()-%d", + TIME_ONE_DAY); +} + void App::signalHandler(int sig) { if(sig == SIGINT) @@ -270,43 +284,77 @@ int App::run() MySQL_DB::libInit(); initSSLLocks(); - db = createMySQLConnection(); - startUpdateThread(); - signal(SIGINT, App::signalHandler); - bool firstLoop = true; - struct tm lastTime = { 0 }; - int jitterCorrectionOffset = calcJitterCorrectionOffset(); - while(!stop) + if(config->getInt("master_service_enable")) { - time_t currentTime = time(nullptr) + jitterCorrectionOffset; - struct tm *t = localtime(¤tTime); - - if(t->tm_min > lastTime.tm_min - || t->tm_hour > lastTime.tm_hour - || t->tm_mday > lastTime.tm_mday - || t->tm_mon > lastTime.tm_mon - || t->tm_year > lastTime.tm_year) + startMasterServiceThread(); + } + + if(config->getInt("node_service_enable")) + { + startNodeServiceThread(); + } + + if(config->getInt("job_executor_enable")) + { + db = createMySQLConnection(); + startNotificationThread(); + startUpdateThread(); + + bool firstLoop = true; + struct tm lastTime = { 0 }; + int jitterCorrectionOffset = calcJitterCorrectionOffset(); + while(!stop) { - // update last time - memcpy(&lastTime, t, sizeof(struct tm)); + time_t currentTime = time(nullptr) + jitterCorrectionOffset; + struct tm *t = localtime(¤tTime); + + if(t->tm_min > lastTime.tm_min + || t->tm_hour > lastTime.tm_hour + || t->tm_mday > lastTime.tm_mday + || t->tm_mon > lastTime.tm_mon + || t->tm_year > lastTime.tm_year) + { + // update last time + memcpy(&lastTime, t, sizeof(struct tm)); + + if(!firstLoop || t->tm_sec == 59 - jitterCorrectionOffset) + { + processJobs(currentTime, currentTime - t->tm_sec); + jitterCorrectionOffset = calcJitterCorrectionOffset(); - if(!firstLoop || t->tm_sec == 59 - jitterCorrectionOffset) + cleanUpNotifications(); + } + + firstLoop = false; + } + else { - processJobs(currentTime, currentTime - t->tm_sec); - jitterCorrectionOffset = calcJitterCorrectionOffset(); + usleep(100*1000); } - - firstLoop = false; } - else + + stopUpdateThread(); + stopNotificationThread(); + } + else + { + while(!stop) { usleep(100*1000); } } - this->stopUpdateThread(); + if(config->getInt("node_service_enable")) + { + stopNodeServiceThread(); + } + + if(config->getInt("master_service_enable")) + { + stopMasterServiceThread(); + } uninitSSLLocks(); MySQL_DB::libCleanup(); @@ -335,6 +383,59 @@ void App::updateThreadMain() } } +void App::notificationThreadMain() +{ + try + { + notificationThreadObj = std::make_unique(); + notificationThreadObj->run(); + notificationThreadObj.reset(); + } + catch(const std::runtime_error &ex) + { + std::cout << "Notification thread runtime error: " << ex.what() << std::endl; + stop = true; + } +} + +void App::nodeServiceThreadMain() +{ + std::cout << "App::nodeServiceThreadMain(): Entered" << std::endl; + + try + { + nodeServiceObj = std::make_unique(config->get("node_service_interface"), config->getInt("node_service_port")); + nodeServiceObj->run(); + nodeServiceObj.reset(); + } + catch(const std::runtime_error &ex) + { + std::cout << "Node service thread runtime error: " << ex.what() << std::endl; + stop = true; + } + + std::cout << "App::nodeServiceThreadMain(): Finished" << std::endl; +} + +void App::masterServiceThreadMain() +{ + std::cout << "App::masterServiceThreadMain(): Entered" << std::endl; + + try + { + masterServiceObj = std::make_unique(config->get("master_service_interface"), config->getInt("master_service_port")); + masterServiceObj->run(); + masterServiceObj.reset(); + } + catch(const std::runtime_error &ex) + { + std::cout << "Master service thread runtime error: " << ex.what() << std::endl; + stop = true; + } + + std::cout << "App::masterServiceThreadMain(): Finished" << std::endl; +} + void App::startUpdateThread() { updateThread = std::thread(std::bind(&App::updateThreadMain, this)); @@ -346,6 +447,39 @@ void App::stopUpdateThread() updateThread.join(); } +void App::startNotificationThread() +{ + notificationThread = std::thread(std::bind(&App::notificationThreadMain, this)); +} + +void App::stopNotificationThread() +{ + notificationThreadObj->stopThread(); + notificationThread.join(); +} + +void App::startNodeServiceThread() +{ + nodeServiceThread = std::thread(std::bind(&App::nodeServiceThreadMain, this)); +} + +void App::stopNodeServiceThread() +{ + nodeServiceObj->stop(); + nodeServiceThread.join(); +} + +void App::startMasterServiceThread() +{ + masterServiceThread = std::thread(std::bind(&App::masterServiceThreadMain, this)); +} + +void App::stopMasterServiceThread() +{ + masterServiceObj->stop(); + masterServiceThread.join(); +} + std::unique_ptr App::createMySQLConnection() { return(std::make_unique(config->get("mysql_host"), @@ -354,3 +488,12 @@ std::unique_ptr App::createMySQLConnection() config->get("mysql_db"), config->get("mysql_sock"))); } + +std::unique_ptr App::createMasterMySQLConnection() +{ + return(std::make_unique(config->get("master_mysql_host"), + config->get("master_mysql_user"), + config->get("master_mysql_pass"), + config->get("master_mysql_db"), + config->get("master_mysql_sock"))); +} diff --git a/chronos/App.h b/chronos/App.h index e4f17bb4e..be3464430 100644 --- a/chronos/App.h +++ b/chronos/App.h @@ -24,7 +24,10 @@ namespace Chronos { class MySQL_DB; class UpdateThread; + class NotificationThread; class WorkerThread; + class NodeService; + class MasterService; class App { @@ -42,15 +45,26 @@ namespace Chronos static App *getInstance(); static void signalHandler(int sig); void updateThreadMain(); + void notificationThreadMain(); + void nodeServiceThreadMain(); + void masterServiceThreadMain(); int run(); std::unique_ptr createMySQLConnection(); + std::unique_ptr createMasterMySQLConnection(); private: void startUpdateThread(); void stopUpdateThread(); + void startNotificationThread(); + void stopNotificationThread(); + void startNodeServiceThread(); + void stopNodeServiceThread(); + void startMasterServiceThread(); + void stopMasterServiceThread(); void processJobs(time_t forTime, time_t plannedTime); void processJobsForTimeZone(int hour, int minute, int month, int mday, int wday, int year, time_t timestamp, const std::string &timeZone, const std::shared_ptr &wt); + void cleanUpNotifications(); int calcJitterCorrectionOffset(); public: @@ -60,8 +74,14 @@ namespace Chronos bool stop = false; static App *instance; std::thread updateThread; + std::thread notificationThread; + std::thread nodeServiceThread; + std::thread masterServiceThread; std::unique_ptr db; std::unique_ptr updateThreadObj; + std::unique_ptr notificationThreadObj; + std::unique_ptr nodeServiceObj; + std::unique_ptr masterServiceObj; }; }; diff --git a/chronos/CMakeLists.txt b/chronos/CMakeLists.txt index 6a65bdda3..799470a9e 100644 --- a/chronos/CMakeLists.txt +++ b/chronos/CMakeLists.txt @@ -1,9 +1,4 @@ -cmake_minimum_required(VERSION 3.1) -project(chronos) - -set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/CMake/Modules") - -set(CMAKE_CXX_STANDARD 14) +set(NAME chronos) set(SOURCES main.cpp @@ -16,6 +11,9 @@ set(SOURCES UpdateThread.cpp Utils.cpp WorkerThread.cpp + NodeService.cpp + MasterService.cpp + NotificationThread.cpp ) set(CCTZ_SOURCES @@ -30,7 +28,7 @@ set(CCTZ_SOURCES cctz/src/time_zone_posix.cc ) -add_executable(${PROJECT_NAME} ${SOURCES} ${CCTZ_SOURCES}) +add_executable(${NAME} ${SOURCES} ${CCTZ_SOURCES}) find_package(MySQLClient REQUIRED) find_package(CURL REQUIRED) @@ -47,16 +45,17 @@ include_directories( ${SQLITE_INCLUDE_DIRS} ${OPENSSL_INCLUDE_DIR} ) -target_link_libraries(${PROJECT_NAME} +target_link_libraries(${NAME} ${MySQLClient_LIBRARIES} ${CURL_LIBRARIES} ${LIBEV_LIBRARIES} ${SQLITE_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} ${OPENSSL_CRYPTO_LIBRARY} + protocol ) -include_directories(${PROJECT_SOURCE_DIR}) +include_directories(${CMAKE_CURRENT_SOURCE_DIR}) -install(TARGETS ${PROJECT_NAME} DESTINATION bin) +install(TARGETS ${NAME} DESTINATION bin) diff --git a/chronos/HTTPRequest.cpp b/chronos/HTTPRequest.cpp index c0118d7a6..94ba39456 100644 --- a/chronos/HTTPRequest.cpp +++ b/chronos/HTTPRequest.cpp @@ -226,7 +226,7 @@ void HTTPRequest::submit(CURLM *curlMultiHandle) curl_easy_setopt(easy, CURLOPT_CAINFO, NULL); curl_easy_setopt(easy, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4); - if(requestMethod == RequestMethod::POST || requestMethod == RequestMethod::PUT || requestMethod == RequestMethod::PATCH) + if(requestMethod == RequestMethod::POST || requestMethod == RequestMethod::PUT || requestMethod == RequestMethod::PATCH || requestMethod == RequestMethod::DELETE) { curl_easy_setopt(easy, CURLOPT_POSTFIELDSIZE, requestBody.size()); curl_easy_setopt(easy, CURLOPT_POSTFIELDS, requestBody.c_str()); @@ -256,6 +256,7 @@ void HTTPRequest::submit(CURLM *curlMultiHandle) break; case RequestMethod::DELETE: + curl_easy_setopt(easy, CURLOPT_POST, 1); curl_easy_setopt(easy, CURLOPT_CUSTOMREQUEST, "DELETE"); break; diff --git a/chronos/JobResult.h b/chronos/JobResult.h index 290d51835..770b37c74 100644 --- a/chronos/JobResult.h +++ b/chronos/JobResult.h @@ -41,6 +41,7 @@ namespace Chronos uint64_t dateDone = 0 ; // in ms int jitter = 0; // in ms std::string url; + std::string title; int duration = 0; // in ms JobStatus_t status = JOBSTATUS_UNKNOWN; int httpStatus = 0; diff --git a/chronos/MasterService.cpp b/chronos/MasterService.cpp new file mode 100644 index 000000000..dc23d14f2 --- /dev/null +++ b/chronos/MasterService.cpp @@ -0,0 +1,148 @@ +/* + * chronos, the cron-job.org execution daemon + * Copyright (C) 2017-2019 Patrick Schlangen + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + */ + +#include "MasterService.h" + +#include + +#include +#include +#include +#include + +#include "ChronosMaster.h" +#include "App.h" +#include "Utils.h" + +using namespace ::apache::thrift; +using namespace ::apache::thrift::protocol; +using namespace ::apache::thrift::transport; +using namespace ::apache::thrift::server; + +namespace { + +class ChronosMasterHandler : virtual public ChronosMasterIf +{ +public: + ChronosMasterHandler() + { + } + + bool ping() override + { + std::cout << "ChronosMasterHandler::ping()" << std::endl; + return true; + } + + void reportNodeStats(const int32_t nodeId, const NodeStatsEntry &stats) override + { + using namespace Chronos; + + std::cout << "ChronosMasterHandler::reportNodeStats(" << nodeId << ")" << std::endl; + + try + { + std::unique_ptr db(App::getInstance()->createMasterMySQLConnection()); + + db->query("REPLACE INTO `nodestats`(`nodeid`,`d`,`m`,`y`,`h`,`i`,`jobs`,`jitter`) VALUES(%v,%d,%d,%d,%d,%d,%v,%f)", + nodeId, + stats.d, stats.m, stats.y, stats.h, stats.i, + stats.jobs, stats.jitter); + } + catch(const std::exception &ex) + { + std::cout << "ChronosMasterHandler::reportNodeStats(): Exception: " << ex.what() << std::endl; + throw InternalError(); + } + } + + void getUserDetails(UserDetails &_return, const int64_t userId) override + { + using namespace Chronos; + + std::cout << "ChronosMasterHandler::getUserDetails(" << userId << ")" << std::endl; + + try + { + std::unique_ptr db(App::getInstance()->createMasterMySQLConnection()); + + MYSQL_ROW row; + auto res = db->query("SELECT `userid`,`email`,`firstname`,`lastname`,`lastlogin_lang` " + "FROM `user` WHERE `userid`=%v", + userId); + if(res->numRows() == 0) + throw ResourceNotFound(); + while((row = res->fetchRow())) + { + _return.userId = std::stoll(row[0]); + _return.email = row[1]; + _return.firstName = row[2]; + _return.lastName = row[3]; + _return.language = row[4]; + } + } + catch(const std::exception &ex) + { + std::cout << "ChronosMasterHandler::getUserDetails(): Exception: " << ex.what() << std::endl; + throw InternalError(); + } + } + + void getPhrases(Phrases &_return) override + { + using namespace Chronos; + + std::cout << "ChronosMasterHandler::getPhrases()" << std::endl; + + try + { + std::unique_ptr db(App::getInstance()->createMasterMySQLConnection()); + + MYSQL_ROW row; + auto res = db->query("SELECT `lang`,`key`,`value` FROM `phrases`"); + while((row = res->fetchRow())) + { + _return.phrases[row[0]][row[1]] = row[2]; + } + } + catch(const std::exception &ex) + { + std::cout << "ChronosMasterHandler::getPhrases(): Exception: " << ex.what() << std::endl; + throw InternalError(); + } + } +}; + +} + +namespace Chronos { + +MasterService::MasterService(const std::string &interface, int port) + : server(std::make_shared( + std::make_shared(std::make_shared()), + std::make_shared(interface, port), + std::make_shared(), + std::make_shared() + )) +{ +} + +void MasterService::run() +{ + server->serve(); +} + +void MasterService::stop() +{ + server->stop(); +} + +} // Chronos \ No newline at end of file diff --git a/chronos/MasterService.h b/chronos/MasterService.h new file mode 100644 index 000000000..6c2f3d25e --- /dev/null +++ b/chronos/MasterService.h @@ -0,0 +1,36 @@ +/* + * chronos, the cron-job.org execution daemon + * Copyright (C) 2017-2019 Patrick Schlangen + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + */ + +#ifndef _MASTERSERVICE_H_ +#define _MASTERSERVICE_H_ + +#include +#include + +namespace apache { namespace thrift { namespace server { class TThreadedServer; } } } + +namespace Chronos +{ + class MasterService + { + public: + MasterService(const std::string &interface, int port); + + public: + void run(); + void stop(); + + private: + std::shared_ptr<::apache::thrift::server::TThreadedServer> server; + }; +}; + +#endif diff --git a/chronos/MySQL_DB.cpp b/chronos/MySQL_DB.cpp index 6cc418590..b5acc3541 100644 --- a/chronos/MySQL_DB.cpp +++ b/chronos/MySQL_DB.cpp @@ -144,6 +144,9 @@ std::unique_ptr MySQL_DB::query(const char *szQuery, ...) case 'u': strQuery.append(std::to_string(va_arg(arglist, unsigned long))); break; + case 'v': + strQuery.append(std::to_string(va_arg(arglist, long long))); + break; case 'q': szArg = va_arg(arglist, char *); szBuff2 = new char[strlen(szArg)*2+1]; diff --git a/chronos/NodeService.cpp b/chronos/NodeService.cpp new file mode 100644 index 000000000..d2630dc68 --- /dev/null +++ b/chronos/NodeService.cpp @@ -0,0 +1,612 @@ +/* + * chronos, the cron-job.org execution daemon + * Copyright (C) 2017-2019 Patrick Schlangen + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + */ + +#include "NodeService.h" + +#include + +#include +#include +#include +#include + +#include "ChronosNode.h" +#include "App.h" +#include "Utils.h" +#include "SQLite.h" + +using namespace ::apache::thrift; +using namespace ::apache::thrift::protocol; +using namespace ::apache::thrift::transport; +using namespace ::apache::thrift::server; + +namespace { + +class ChronosNodeHandler : virtual public ChronosNodeIf +{ +public: + ChronosNodeHandler() + : userDbFilePathScheme(Chronos::App::getInstance()->config->get("user_db_file_path_scheme")), + userDbFileNameScheme(Chronos::App::getInstance()->config->get("user_db_file_name_scheme")) + { + } + + bool ping() override + { + std::cout << "ChronosNodeHandler::ping()" << std::endl; + return true; + } + + void getJobsForUser(std::vector &_return, const int64_t userId) override + { + using namespace Chronos; + + std::cout << "ChronosNodeHandler::getJobsForUser(" << userId << ")" << std::endl; + + try + { + std::unique_ptr db(App::getInstance()->createMySQLConnection()); + + MYSQL_ROW row; + auto res = db->query("SELECT `jobid`,`userid`,`enabled`,`title`,`save_responses`,`last_status`,`last_fetch`,`last_duration`,`fail_counter`,`url`,`request_method`,`timezone` FROM `job` WHERE `userid`=%v", + userId); + _return.reserve(res->numRows()); + while((row = res->fetchRow())) + { + Job job; + + job.identifier.jobId = std::stoll(row[0]); + job.identifier.userId = std::stoll(row[1]); + + job.metaData.enabled = std::strcmp(row[2], "1") == 0; + job.metaData.title = row[3]; + job.metaData.saveResponses = std::strcmp(row[4], "1") == 0; + job.__isset.metaData = true; + + job.executionInfo.lastStatus = static_cast(std::stoi(row[5])); //!< @todo Nicer conversion + job.executionInfo.lastFetch = std::stoll(row[6]); + job.executionInfo.lastDuration = std::stoi(row[7]); + job.executionInfo.failCounter = std::stoi(row[8]); + job.__isset.executionInfo = true; + + job.data.url = row[9]; + job.data.requestMethod = static_cast(std::stoi(row[10])); //!< @todo Nicer conversion + job.__isset.data = true; + + job.schedule.timezone = row[11]; + job.__isset.schedule = true; + + getJobSchedule(db, job.identifier, "hour", job.schedule.hours); + getJobSchedule(db, job.identifier, "mday", job.schedule.mdays); + getJobSchedule(db, job.identifier, "minute", job.schedule.minutes); + getJobSchedule(db, job.identifier, "month", job.schedule.months); + getJobSchedule(db, job.identifier, "wday", job.schedule.wdays); + + _return.push_back(job); + } + } + catch(const std::exception &ex) + { + std::cout << "ChronosNodeHandler::getJobsForUser(): Exception: " << ex.what() << std::endl; + throw InternalError(); + } + } + + void getJobDetails(Job &_return, const JobIdentifier &identifier) override + { + using namespace Chronos; + + std::cout << "ChronosNodeHandler::getJobDetails(" << identifier.jobId << ", " << identifier.userId << ")" << std::endl; + + try + { + std::unique_ptr db(App::getInstance()->createMySQLConnection()); + + MYSQL_ROW row; + auto res = db->query("SELECT `jobid`,`userid`,`enabled`,`title`,`save_responses`,`last_status`,`last_fetch`," + "`last_duration`,`fail_counter`,`url`,`request_method`,`auth_enable`,`auth_user`,`auth_pass`," + "`notify_failure`,`notify_success`,`notify_disable`,`timezone` " + "FROM `job` WHERE `jobid`=%v AND `userid`=%v", + identifier.jobId, + identifier.userId); + if(res->numRows() == 0) + throw ResourceNotFound(); + while((row = res->fetchRow())) + { + _return.identifier.jobId = std::stoll(row[0]); + _return.identifier.userId = std::stoll(row[1]); + + _return.metaData.enabled = std::strcmp(row[2], "1") == 0; + _return.metaData.title = row[3]; + _return.metaData.saveResponses = std::strcmp(row[4], "1") == 0; + _return.__isset.metaData = true; + + _return.executionInfo.lastStatus = static_cast(std::stoi(row[5])); //!< @todo Nicer conversion + _return.executionInfo.lastFetch = std::stoll(row[6]); + _return.executionInfo.lastDuration = std::stoi(row[7]); + _return.executionInfo.failCounter = std::stoi(row[8]); + _return.__isset.executionInfo = true; + + _return.data.url = row[9]; + _return.data.requestMethod = static_cast(std::stoi(row[10])); //!< @todo Nicer conversion + _return.__isset.data = true; + + _return.authentication.enable = std::strcmp(row[11], "1") == 0; + _return.authentication.user = row[12]; + _return.authentication.password = row[13]; + _return.__isset.authentication = true; + + _return.notification.onFailure = std::strcmp(row[14], "1") == 0; + _return.notification.onSuccess = std::strcmp(row[15], "1") == 0; + _return.notification.onDisable = std::strcmp(row[16], "1") == 0; + _return.__isset.notification = true; + + _return.schedule.timezone = row[17]; + _return.__isset.schedule = true; + } + + getJobSchedule(db, identifier, "hour", _return.schedule.hours); + getJobSchedule(db, identifier, "mday", _return.schedule.mdays); + getJobSchedule(db, identifier, "minute", _return.schedule.minutes); + getJobSchedule(db, identifier, "month", _return.schedule.months); + getJobSchedule(db, identifier, "wday", _return.schedule.wdays); + + res = db->query("SELECT `body` FROM `job_body` WHERE `jobid`=%v", + identifier.jobId); + while((row = res->fetchRow())) + { + _return.extendedData.body = row[0]; + } + + res = db->query("SELECT `key`,`value` FROM `job_header` WHERE `jobid`=%v", + identifier.jobId); + while((row = res->fetchRow())) + { + _return.extendedData.headers.emplace(row[0], row[1]); + } + + _return.__isset.extendedData = true; + } + catch(const std::exception &ex) + { + std::cout << "ChronosNodeHandler::getJobDetails(): Exception: " << ex.what() << std::endl; + throw InternalError(); + } + } + + void createOrUpdateJob(const Job &job) override + { + using namespace Chronos; + + std::cout << "ChronosNodeHandler::createOrUpdateJob(" << job.identifier.jobId << ", " << job.identifier.userId << ")" << std::endl; + + if(job.identifier.userId <= 0 || job.identifier.jobId <= 0) + throw InvalidArguments(); + + try + { + std::unique_ptr db(App::getInstance()->createMySQLConnection()); + + db->query("BEGIN"); + + const auto jobUser = jobUserId(db, job.identifier.jobId); + if(jobUser == -1) + { + db->query("INSERT INTO `job`(`jobid`,`userid`) VALUES(%v,%v)", + job.identifier.jobId, + job.identifier.userId); + } + else if (jobUser != job.identifier.userId) + { + throw Forbidden(); + } + + if(job.__isset.metaData) + { + db->query("UPDATE `job` SET `enabled`=%d, `title`='%q', `save_responses`=%d WHERE `jobid`=%v", + job.metaData.enabled ? 1 : 0, + job.metaData.title.c_str(), + job.metaData.saveResponses ? 1 : 0, + job.identifier.jobId); + } + + if(job.__isset.authentication) + { + db->query("UPDATE `job` SET `auth_enable`=%d, `auth_user`='%q', `auth_pass`='%q' WHERE `jobid`=%v", + job.authentication.enable ? 1 : 0, + job.authentication.user.c_str(), + job.authentication.password.c_str(), + job.identifier.jobId); + } + + if(job.__isset.notification) + { + db->query("UPDATE `job` SET `notify_failure`=%d, `notify_success`=%d, `notify_disable`=%d WHERE `jobid`=%v", + job.notification.onFailure ? 1 : 0, + job.notification.onSuccess ? 1 : 0, + job.notification.onDisable ? 1 : 0, + job.identifier.jobId); + } + + if(job.__isset.schedule) + { + db->query("UPDATE `job` SET `timezone`='%q' WHERE `jobid`=%v", + job.schedule.timezone.c_str(), + job.identifier.jobId); + + saveJobSchedule(db, job.identifier, "hour", job.schedule.hours); + saveJobSchedule(db, job.identifier, "mday", job.schedule.mdays); + saveJobSchedule(db, job.identifier, "minute", job.schedule.minutes); + saveJobSchedule(db, job.identifier, "month", job.schedule.months); + saveJobSchedule(db, job.identifier, "wday", job.schedule.wdays); + } + + if(job.__isset.data) + { + db->query("UPDATE `job` SET `url`='%q',`request_method`=%d WHERE `jobid`=%v", + job.data.url.c_str(), + static_cast(job.data.requestMethod), //!< @todo Nicer conversion + job.identifier.jobId); + } + + if(job.__isset.extendedData) + { + if(Utils::trim(job.extendedData.body).length() == 0) + { + db->query("DELETE FROM `job_body` WHERE `jobid`=%v", + job.identifier.jobId); + } + else + { + db->query("REPLACE INTO `job_body`(`jobid`,`body`) VALUES(%v,'%q')", + job.identifier.jobId, + job.extendedData.body.c_str()); + } + + db->query("DELETE FROM `job_header` WHERE `jobid`=%v", + job.identifier.jobId); + for(const auto &header : job.extendedData.headers) + { + db->query("INSERT INTO `job_header`(`jobid`,`key`,`value`) VALUES(%v,'%q','%q')", + job.identifier.jobId, + header.first.c_str(), + header.second.c_str()); + } + } + + db->query("COMMIT"); + } + catch(const std::exception &ex) + { + std::cout << "ChronosNodeHandler::createOrUpdateJob(): Exception: " << ex.what() << std::endl; + throw InternalError(); + } + } + + void deleteJob(const JobIdentifier &identifier) override + { + using namespace Chronos; + + std::cout << "ChronosNodeHandler::deleteJob(" << identifier.jobId << ", " << identifier.userId << ")" << std::endl; + + try + { + std::unique_ptr db(App::getInstance()->createMySQLConnection()); + + if(!jobExists(db, identifier)) + throw ResourceNotFound(); + + db->query("BEGIN"); + db->query("DELETE FROM `notification` WHERE `jobid`=%v", identifier.jobId); + db->query("DELETE FROM `job_hours` WHERE `jobid`=%v", identifier.jobId); + db->query("DELETE FROM `job_mdays` WHERE `jobid`=%v", identifier.jobId); + db->query("DELETE FROM `job_minutes` WHERE `jobid`=%v", identifier.jobId); + db->query("DELETE FROM `job_months` WHERE `jobid`=%v", identifier.jobId); + db->query("DELETE FROM `job_wdays` WHERE `jobid`=%v", identifier.jobId); + db->query("DELETE FROM `job_body` WHERE `jobid`=%v", identifier.jobId); + db->query("DELETE FROM `job_header` WHERE `jobid`=%v", identifier.jobId); + db->query("DELETE FROM `job` WHERE `jobid`=%v", identifier.jobId); + db->query("COMMIT"); + } + catch(const std::exception &ex) + { + std::cout << "ChronosNodeHandler::deleteJob(): Exception: " << ex.what() << std::endl; + throw InternalError(); + } + } + + void getJobLog(std::vector &_return, const JobIdentifier &identifier, const int16_t maxEntries) override + { + static constexpr const int TIME_ONE_DAY = 86400; + + std::cout << "ChronosNodeHandler::getJobLog(" << identifier.jobId << ", " << identifier.userId << ", " << maxEntries << ")" << std::endl; + + if(maxEntries <= 0) + throw InvalidArguments(); + + try + { + //! @note No verification of identifier here since we look in the user DB and thus cannot accidentally fetch data + //! for a different user. Also, we need to accept jobId == 0 to fetch logs for all the user's jobs. + //! @note To account for different time zones, we fetch logs from tomorrow, today and yesterday (from GMT PoV). + + struct tm tmYesterday = timeStruct(- TIME_ONE_DAY); + struct tm tmToday = timeStruct(0); + struct tm tmTomorrow = timeStruct(TIME_ONE_DAY); + + getJobLogForDay(_return, identifier, tmTomorrow.tm_mday, tmTomorrow.tm_mon, maxEntries); + getJobLogForDay(_return, identifier, tmToday.tm_mday, tmToday.tm_mon, maxEntries - std::min(static_cast(maxEntries), _return.size())); + getJobLogForDay(_return, identifier, tmYesterday.tm_mday, tmYesterday.tm_mon, maxEntries - std::min(static_cast(maxEntries), _return.size())); + } + catch(const std::exception &ex) + { + std::cout << "ChronosNodeHandler::getJobLog(): Exception: " << ex.what() << std::endl; + throw InternalError(); + } + } + + void getJobLogDetails(JobLogEntry &_return, const int64_t userId, const int16_t mday, const int16_t month, const int64_t jobLogId) override + { + using namespace Chronos; + + std::cout << "ChronosNodeHandler::getJobLogDetails(" << userId << ", " << mday << ", " << month << ", " << jobLogId << ")" << std::endl; + + std::string dbFilePath = Utils::userDbFilePath(userDbFilePathScheme, userDbFileNameScheme, userId, mday, month); + std::unique_ptr userDB; + + try + { + userDB = std::make_unique(dbFilePath.c_str(), true /* read only */); + + auto stmt = userDB->prepare("SELECT * FROM \"joblog\" LEFT JOIN \"joblog_response\" ON \"joblog_response\".\"joblogid\"=\"joblog\".\"joblogid\" WHERE \"joblog\".\"joblogid\"=:joblogid"); + stmt->bind(":joblogid", jobLogId); + + while(stmt->execute()) + { + _return = convertToJobLogEntry(stmt, userId, mday, month); + + if(stmt->hasField("headers")) + _return.headers = stmt->stringValue("headers"); + if(stmt->hasField("body")) + _return.body = stmt->stringValue("body"); + + _return.__isset.headers = true; + _return.__isset.body = true; + + return; + } + + throw ResourceNotFound(); + } + catch(const std::exception &ex) + { + throw ResourceNotFound(); + } + } + + void getNotifications(std::vector &_return, const int64_t userId, const int16_t maxEntries) override + { + using namespace Chronos; + + std::cout << "ChronosNodeHandler::getNotifications(" << userId << ")" << std::endl; + + if(userId <= 0) + throw InvalidArguments(); + + try + { + std::unique_ptr db(App::getInstance()->createMySQLConnection()); + + MYSQL_ROW row; + auto res = db->query("SELECT `notification`.`joblogid`,`notification`.`jobid`,`job`.`userid`,`notification`.`date`,`notification`.`type`,`notification`.`date_started`,`notification`.`date_planned`,`notification`.`url`,`notification`.`execution_status`,`notification`.`execution_status_text`,`notification`.`execution_http_status` " + "FROM `notification` " + "INNER JOIN `job` ON `job`.`jobid`=`notification`.`jobid` " + "WHERE `job`.`userid`=%v " + "ORDER BY `notification`.`notificationid` DESC LIMIT %u", + userId, + static_cast(maxEntries)); + _return.reserve(res->numRows()); + while((row = res->fetchRow())) + { + NotificationEntry n; + + n.notificationId = std::stoll(row[0]); + + n.jobIdentifier.jobId = std::stoll(row[1]); + n.jobIdentifier.userId = std::stoll(row[2]); + + n.date = std::stoll(row[3]); + + n.type = static_cast(std::stoi(row[4])); //!< @todo Nicer conversion + + n.dateStarted = std::stoll(row[5]); + n.datePlanned = std::stoll(row[6]); + + n.url = row[7]; + + n.executionStatus = static_cast(std::stoi(row[8])); //!< @todo Nicer conversion + n.executionStatusText = row[9]; + n.httpStatus = std::stoi(row[10]); + + _return.push_back(n); + } + } + catch(const std::exception &ex) + { + std::cout << "ChronosNodeHandler::getNotifications(): Exception: " << ex.what() << std::endl; + throw InternalError(); + } + } + + void disableJobsForUser(const int64_t userId) override + { + using namespace Chronos; + + std::cout << "ChronosNodeHandler::disableJobsForUser(" << userId << ")" << std::endl; + + try + { + std::unique_ptr db(App::getInstance()->createMySQLConnection()); + + db->query("UPDATE `job` SET `enabled`=0 WHERE `userid`=%v", + userId); + } + catch(const std::exception &ex) + { + std::cout << "ChronosNodeHandler::disableJobsForUser(): Exception: " << ex.what() << std::endl; + throw InternalError(); + } + } + +private: + template + void getJobSchedule(std::unique_ptr &db, const JobIdentifier &identifier, const char *name, std::set &target) const + { + MYSQL_ROW row; + auto res = db->query("SELECT `%s` FROM `job_%ss` WHERE `jobid`=%v", + name, name, + identifier.jobId); + while((row = res->fetchRow())) + { + target.insert(std::stoi(row[0])); + } + } + + template + void saveJobSchedule(std::unique_ptr &db, const JobIdentifier &identifier, const char *name, const std::set &items) const + { + db->query("DELETE FROM `job_%ss` WHERE `jobid`=%v", + name, + identifier.jobId); + + for(const auto &val : items) + { + db->query("INSERT INTO `job_%ss`(`jobid`,`%s`) VALUES(%v,%d)", + name, name, + identifier.jobId, + val); + } + } + + long long jobUserId(std::unique_ptr &db, const long long jobId) const + { + MYSQL_ROW row; + auto res = db->query("SELECT `userid` FROM `job` WHERE `jobid`=%v", + jobId); + while((row = res->fetchRow())) + { + return std::stoll(row[0]); + } + return(-1); + } + + bool jobExists(std::unique_ptr &db, const JobIdentifier &identifier) const + { + const auto userId = jobUserId(db, identifier.jobId); + return(userId != -1 && userId == identifier.userId); + } + + struct tm timeStruct(const int offsetFromNow) const + { + struct tm tmStruct = { 0 }; + time_t tmTime = time(nullptr) + offsetFromNow; + if(gmtime_r(&tmTime, &tmStruct) == nullptr) + throw std::runtime_error("gmtime_r returned nullptr"); + return tmStruct; + } + + void getJobLogForDay(std::vector &_return, const JobIdentifier &identifier, const int mday, const int month, const int16_t maxEntries) const + { + using namespace Chronos; + + if(maxEntries == 0) + return; + + std::string dbFilePath = Utils::userDbFilePath(userDbFilePathScheme, userDbFileNameScheme, identifier.userId, mday, month); + std::unique_ptr userDB; + + try + { + userDB = std::make_unique(dbFilePath.c_str(), true /* read only */); + } + catch(const std::exception &ex) + { + //! @note Ignore failures during open (the db probably doesn't exist because there's no log entry on that day) + return; + } + + std::string query = "SELECT \"joblogid\",\"jobid\",\"date\",\"date_planned\",\"jitter\",\"url\",\"duration\",\"status\",\"status_text\",\"http_status\" FROM \"joblog\" "; + if(identifier.jobId > 0) + { + query += "WHERE \"jobid\"=:jobid "; + } + query += "ORDER BY \"joblogid\" DESC LIMIT " + std::to_string(maxEntries); + + auto stmt = userDB->prepare(query); + if(identifier.jobId > 0) + { + stmt->bind(":jobid", identifier.jobId); + } + + while(stmt->execute()) + { + _return.push_back(convertToJobLogEntry(stmt, identifier.userId, mday, month)); + } + } + + JobLogEntry convertToJobLogEntry(const std::unique_ptr &stmt, int64_t userId, int16_t mday, int16_t month) const + { + JobLogEntry entry; + entry.jobLogId = stmt->intValue("joblogid"); + entry.jobIdentifier.userId = userId; + entry.jobIdentifier.jobId = stmt->intValue("jobid"); + entry.date = stmt->intValue("date"); + entry.datePlanned = stmt->intValue("date_planned"); + entry.jitter = stmt->intValue("jitter"); + entry.url = stmt->stringValue("url"); + entry.duration = stmt->intValue("duration"); + entry.status = static_cast(stmt->intValue("status")); //!< @todo Nicer conversion + entry.statusText = stmt->stringValue("status_text"); + entry.httpStatus = stmt->intValue("http_status"); + entry.mday = mday; + entry.month = month; + return entry; + } + + std::string userDbFilePathScheme; + std::string userDbFileNameScheme; +}; + +} + +namespace Chronos { + +NodeService::NodeService(const std::string &interface, int port) + : server(std::make_shared( + std::make_shared(std::make_shared()), + std::make_shared(interface, port), + std::make_shared(), + std::make_shared() + )) +{ +} + +void NodeService::run() +{ + server->serve(); +} + +void NodeService::stop() +{ + server->stop(); +} + +} // Chronos \ No newline at end of file diff --git a/chronos/NodeService.h b/chronos/NodeService.h new file mode 100644 index 000000000..f9da40936 --- /dev/null +++ b/chronos/NodeService.h @@ -0,0 +1,36 @@ +/* + * chronos, the cron-job.org execution daemon + * Copyright (C) 2017-2019 Patrick Schlangen + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + */ + +#ifndef _NODESERVICE_H_ +#define _NODESERVICE_H_ + +#include +#include + +namespace apache { namespace thrift { namespace server { class TThreadedServer; } } } + +namespace Chronos +{ + class NodeService + { + public: + NodeService(const std::string &interface, int port); + + public: + void run(); + void stop(); + + private: + std::shared_ptr<::apache::thrift::server::TThreadedServer> server; + }; +}; + +#endif diff --git a/chronos/Notification.h b/chronos/Notification.h index faff6b5f7..c77a34e43 100644 --- a/chronos/Notification.h +++ b/chronos/Notification.h @@ -12,6 +12,8 @@ #ifndef _NOTIFICATION_H_ #define _NOTIFICATION_H_ +#include "JobResult.h" + namespace Chronos { enum NotificationType_t @@ -20,6 +22,22 @@ namespace Chronos NOTIFICATION_TYPE_SUCCESS = 1, NOTIFICATION_TYPE_DISABLE = 2 }; + + struct Notification + { + int userID = 0; + int jobID = 0; + uint64_t date = 0; // in ms + uint64_t dateStarted = 0; // in ms + uint64_t datePlanned = 0; // in ms + NotificationType_t type; + std::string url; + std::string title; + JobStatus_t status = JOBSTATUS_UNKNOWN; + std::string statusText; + int httpStatus = 0; + int failCounter = 0; + }; }; #endif diff --git a/chronos/NotificationThread.cpp b/chronos/NotificationThread.cpp new file mode 100644 index 000000000..a27bc4f79 --- /dev/null +++ b/chronos/NotificationThread.cpp @@ -0,0 +1,426 @@ +/* + * chronos, the cron-job.org execution daemon + * Copyright (C) 2020 Patrick Schlangen + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + */ + +#include "NotificationThread.h" + +#include +#include + +#include +#include + +#include + +#include "App.h" +#include "Notification.h" +#include "NotificationThread.h" +#include "SQLite.h" +#include "Utils.h" + +#include +#include +#include +#include + +#include "ChronosMaster.h" + +class Mail +{ + struct HeaderItem + { + std::string value; + bool placeholders; + }; + +public: + void addHeader(const std::string &key, const std::string &value, bool placeholders = false) + { + m_headers.emplace(key, HeaderItem{value, placeholders}); + } + + void assign(const std::string &key, const std::string &value) + { + m_placeholders.emplace(key, value); + } + + void setText(const std::string &mailText) + { + m_text = mailText; + } + + void setMailFrom(const std::string &address) + { + m_mailFrom = address; + } + + void setRcptTo(const std::string &address) + { + m_rcptTo = address; + } + + const std::string &mailFrom() const + { + return m_mailFrom; + } + + const std::string &rcptTo() const + { + return m_rcptTo; + } + + std::string dump() const + { + std::stringstream ss; + buildMail(ss); + return ss.str(); + } + +private: + void buildMail(std::stringstream &out) const + { + constexpr const char CRLF[] = "\r\n"; + + for(const auto &headerItem : m_headers) + { + out << headerItem.first << ": " + << sanitizeHeader(headerItem.second.placeholders + ? replacePlaceholders(headerItem.second.value) + : headerItem.second.value) + << CRLF; + } + out << CRLF; + + out << replacePlaceholders(m_text); + } + + std::string replacePlaceholders(const std::string &in) const + { + std::string result = in; + for(const auto &item : m_placeholders) + Chronos::Utils::replace(result, item.first, item.second); + return result; + } + + std::string sanitizeHeader(const std::string &in) const + { + std::string result = in; + const std::string forbidden = "\r\n"; + + std::size_t pos; + while ((pos = result.find_first_of(forbidden)) != std::string::npos) + result.erase(result.begin() + pos); + + return result; + } + +private: + std::string m_mailFrom; + std::string m_rcptTo; + std::unordered_map m_headers; + std::unordered_map m_placeholders; + std::string m_text; +}; + +namespace { + +size_t curlStringReadFunction(char *buffer, size_t size, size_t nitems, void *userData) +{ + if(userData == nullptr) + { + std::cerr << "curlStringReadFunction(): userData is nullptr!" << std::endl; + return 0; + } + + std::string *stringData = reinterpret_cast(userData); + + std::size_t bytesToRead = std::min(stringData->size(), size * nitems); + if(bytesToRead > 0) + { + std::memcpy(buffer, stringData->c_str(), bytesToRead); + stringData->erase(stringData->begin(), stringData->begin() + bytesToRead); + } + + return bytesToRead; +} + +} + +using namespace Chronos; + +NotificationThread *NotificationThread::instance = nullptr; + +NotificationThread::NotificationThread() +{ + if(NotificationThread::instance != nullptr) + throw std::runtime_error("Notification thread instance already exists"); + + NotificationThread::instance = this; + + masterSocket = std::make_shared( + App::getInstance()->config->get("master_service_address"), + App::getInstance()->config->getInt("master_service_port")); + masterTransport = std::make_shared(masterSocket); + masterProtocol = std::make_shared(masterTransport); + masterClient = std::make_shared(masterProtocol); + + defaultLang = App::getInstance()->config->get("default_lang"); + mailFrom = App::getInstance()->config->get("notification_mail_from"); + mailSender = App::getInstance()->config->get("notification_mail_sender"); + smtpServer = App::getInstance()->config->get("smtp_server"); +} + +NotificationThread::~NotificationThread() +{ + NotificationThread::instance = nullptr; +} + +NotificationThread *NotificationThread::getInstance() +{ + if(NotificationThread::instance == nullptr) + throw std::runtime_error("No notification thread instance available"); + return(NotificationThread::instance); +} + +void NotificationThread::addNotification(Notification &¬ification) +{ + std::lock_guard lg(queueMutex); + queue.push(std::move(notification)); + queueSignal.notify_one(); +} + +void NotificationThread::stopThread() +{ + stop = true; + + std::lock_guard lg(queueMutex); + queueSignal.notify_one(); +} + +void NotificationThread::run() +{ + std::cout << "NotificationThread::run(): Entered" << std::endl; + + decltype(queue) tempQueue; + + stop = false; + while(!stop) + { + { + std::unique_lock lock(queueMutex); + if(queue.empty()) + queueSignal.wait(lock); + queue.swap(tempQueue); + } + + auto numNotifications = tempQueue.size(); + if(numNotifications > 100) + std::cout << "NotificationThread::run(): " << numNotifications << " notification jobs fetched" << std::endl; + + time_t tStart = time(nullptr); + if(!tempQueue.empty()) + { + masterTransport->open(); + + syncPhrases(); + + while(!tempQueue.empty()) + { + Notification notification = std::move(tempQueue.front()); + tempQueue.pop(); + processNotification(notification); + } + + masterTransport->close(); + } + time_t tEnd = time(nullptr); + + if(numNotifications > 100) + std::cout << "NotificationThread::run(): Processing " << numNotifications << " took " << (tEnd-tStart) << " seconds" << std::endl; + } + + std::cout << "NotificationThread::run(): Finished" << std::endl; +} + +void NotificationThread::syncPhrases() +{ + try + { + Phrases newPhrases; + masterClient->getPhrases(newPhrases); + + phrases.clear(); + for (const auto &langItem : newPhrases.phrases) { + for (const auto &phraseItem : langItem.second) { + phrases[langItem.first][phraseItem.first] = phraseItem.second; + } + } + } + catch (const apache::thrift::TException &ex) + { + std::cerr << "NotificationThread::syncPhrases(): Failed to sync phrases: " << ex.what() << std::endl; + } +} + +std::string NotificationThread::getPhrase(const std::string &lang, const std::string &key) const +{ + auto it = phrases.find(lang); + if(it != phrases.end()) + { + const auto it2 = it->second.find(key); + if(it2 != it->second.end()) + return it2->second; + } + + if(lang != defaultLang) + return getPhrase(defaultLang, key); + + std::cerr << "NotificationThread::getPhrase(): Unknown phrase: " << lang << " " << key << std::endl; + return "${unknownPhrase:" + key + "}"; +} + +std::string NotificationThread::formatDate(const std::string &lang, const uint64_t date) const +{ + if(date == 0) + { + return "-"; + } + + time_t timeVal = static_cast(date); + struct tm parsedTime; + + if(gmtime_r(&timeVal, &parsedTime) == nullptr) + { + std::cerr << "NotificationThread::formatDate(): gmtime_r() failed!" << std::endl; + return "?"; + } + + const std::string format = getPhrase(lang, "format.date.full"); + + char dateBuffer[255]; + if (std::strftime(dateBuffer, sizeof(dateBuffer), format.c_str(), &parsedTime) == 0) + { + std::cerr << "NotificationThread::formatDate(): strftime() failed!" << std::endl; + return "?"; + } + + return dateBuffer; +} + +std::string NotificationThread::formatStatus(const std::string &lang, const Notification ¬ification) const +{ + std::string result; + + switch(notification.status) + { + case JOBSTATUS_OK: result = getPhrase(lang, "job.status.ok"); break; + case JOBSTATUS_FAILED_TIMEOUT: result = getPhrase(lang, "job.status.timeout"); break; + case JOBSTATUS_FAILED_SIZE: result = getPhrase(lang, "job.status.size"); break; + case JOBSTATUS_FAILED_URL: result = getPhrase(lang, "job.status.url"); break; + default: result = getPhrase(lang, "job.status.failed"); break; + } + + switch(notification.status) + { + case JOBSTATUS_OK: + case JOBSTATUS_FAILED_HTTPERROR: + result += " (" + std::to_string(notification.httpStatus) + " " + notification.statusText + ")"; + break; + default: + break; + } + + return result; +} + +void NotificationThread::processNotification(const Notification ¬ification) +{ + UserDetails userDetails; + + try + { + masterClient->getUserDetails(userDetails, notification.userID); + } + catch (const apache::thrift::TException &ex) + { + std::cerr << "NotificationThread::processNotification(): Failed to retrieve user details: " << ex.what() << std::endl; + } + + Mail mail; + mail.setMailFrom(mailFrom); + mail.setRcptTo(userDetails.email); + mail.addHeader("From", mailSender); + mail.addHeader("To", std::string("<") + userDetails.email + std::string(">")); + mail.addHeader("Content-Type", "text/plain; charset=UTF-8"); + mail.addHeader("Content-Transfer-Encoding", "8bit"); + mail.addHeader("Precedence", "bulk"); + + mail.assign("$firstname", userDetails.firstName); + mail.assign("$lastname", userDetails.lastName); + mail.assign("$title", !notification.title.empty() ? notification.title : notification.url); + mail.assign("$url", notification.url); + mail.assign("$executed", formatDate(userDetails.language, notification.dateStarted)); + mail.assign("$scheduled", formatDate(userDetails.language, notification.datePlanned)); + mail.assign("$attempts", std::to_string(notification.failCounter)); + mail.assign("$status", formatStatus(userDetails.language, notification)); + + switch(notification.type) + { + case NOTIFICATION_TYPE_FAILURE: + mail.addHeader("Subject", getPhrase(userDetails.language, "notify.failure.mail.subject"), true); + mail.setText(getPhrase(userDetails.language, "notify.failure.mail.text")); + break; + + case NOTIFICATION_TYPE_SUCCESS: + mail.addHeader("Subject", getPhrase(userDetails.language, "notify.success.mail.subject"), true); + mail.setText(getPhrase(userDetails.language, "notify.success.mail.text")); + break; + + case NOTIFICATION_TYPE_DISABLE: + mail.addHeader("Subject", getPhrase(userDetails.language, "notify.disable.mail.subject"), true); + mail.setText(getPhrase(userDetails.language, "notify.disable.mail.text")); + break; + + default: + std::cerr << "NotificationThread::processNotification(): Unknown notification type!" << std::endl; + return; + } + + sendMail(mail); +} + +void NotificationThread::sendMail(const Mail &mail) const +{ + CURL *curl = curl_easy_init(); + if(curl == nullptr) + { + std::cerr << "NotificationThread::sendMail(): curl_easy_init() failed!" << std::endl; + return; + } + + struct curl_slist *recipients = nullptr; + recipients = curl_slist_append(recipients, mail.rcptTo().c_str()); + + std::string mailData = mail.dump(); + + curl_easy_setopt(curl, CURLOPT_URL, smtpServer.c_str()); + curl_easy_setopt(curl, CURLOPT_MAIL_FROM, mail.mailFrom().c_str()); + curl_easy_setopt(curl, CURLOPT_MAIL_RCPT, recipients); + curl_easy_setopt(curl, CURLOPT_READFUNCTION, curlStringReadFunction); + curl_easy_setopt(curl, CURLOPT_READDATA, &mailData); + curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L); + + int res = curl_easy_perform(curl); + if(res != CURLE_OK) + std::cerr << "NotificationThread::sendMail(): Failed to send email: " << res << std::endl; + + curl_slist_free_all(recipients); + curl_easy_cleanup(curl); +} diff --git a/chronos/NotificationThread.h b/chronos/NotificationThread.h new file mode 100644 index 000000000..1fadc82e6 --- /dev/null +++ b/chronos/NotificationThread.h @@ -0,0 +1,86 @@ +/* + * chronos, the cron-job.org execution daemon + * Copyright (C) 2020 Patrick Schlangen + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + */ + +#ifndef _NOTIFICATIONTHREAD_H_ +#define _NOTIFICATIONTHREAD_H_ + +#include +#include +#include +#include +#include +#include +#include + +#include "Notification.h" + +class ChronosMasterClient; +class Mail; + +namespace apache { namespace thrift { + +namespace protocol { +class TProtocol; +} + +namespace transport { +class TTransport; +} + +} } + +namespace Chronos +{ + class NotificationThread + { + public: + NotificationThread(); + ~NotificationThread(); + + private: + NotificationThread(const NotificationThread &other) = delete; + NotificationThread(NotificationThread &&other) = delete; + NotificationThread &operator=(const NotificationThread &other) = delete; + NotificationThread &operator=(NotificationThread &&other) = delete; + + public: + static NotificationThread *getInstance(); + void run(); + void stopThread(); + void addNotification(Notification &¬ification); + + private: + void syncPhrases(); + std::string getPhrase(const std::string &lang, const std::string &key) const; + std::string formatDate(const std::string &lang, const uint64_t date) const; + std::string formatStatus(const std::string &lang, const Notification ¬ification) const; + void processNotification(const Notification ¬ification); + void sendMail(const Mail &mail) const; + + private: + bool stop = false; + static NotificationThread *instance; + std::mutex queueMutex; + std::condition_variable queueSignal; + std::queue queue; + std::shared_ptr masterSocket; + std::shared_ptr masterTransport; + std::shared_ptr masterProtocol; + std::shared_ptr masterClient; + std::string defaultLang; + std::string mailFrom; + std::string mailSender; + std::string smtpServer; + std::unordered_map> phrases; + }; +}; + +#endif diff --git a/chronos/SQLite.cpp b/chronos/SQLite.cpp index 2317695ba..46f1b727e 100644 --- a/chronos/SQLite.cpp +++ b/chronos/SQLite.cpp @@ -18,9 +18,10 @@ using namespace Chronos; -SQLite_DB::SQLite_DB(const std::string &fileName, const int BusyTimeoutMs) : strFileName(fileName) +SQLite_DB::SQLite_DB(const std::string &fileName, const bool readOnly, const int BusyTimeoutMs) : strFileName(fileName) { - int res = sqlite3_open(strFileName.c_str(), &handle); + int res = sqlite3_open_v2(strFileName.c_str(), &handle, + readOnly ? SQLITE_OPEN_READONLY : SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, nullptr); if(res != SQLITE_OK) { std::stringstream err; @@ -125,14 +126,14 @@ int SQLite_Statement::fieldIndex(const std::string &field) bool SQLite_Statement::execute() { int res = sqlite3_step(stmt); - if(res != SQLITE_OK && res != SQLITE_DONE) + if(res != SQLITE_OK && res != SQLITE_DONE && res != SQLITE_ROW) { std::stringstream err; err << "Failed to execute query: " << sqlite3_errstr(res); throw std::runtime_error(err.str()); } - if(res == SQLITE_OK && !columnsFetched) + if((res == SQLITE_OK || res == SQLITE_ROW) && !columnsFetched) { columns.clear(); for(int i = 0; i < sqlite3_column_count(stmt); ++i) @@ -141,7 +142,7 @@ bool SQLite_Statement::execute() } columnsFetched = true; } - return(res == SQLITE_OK); + return(res == SQLITE_OK || res == SQLITE_ROW); } void SQLite_Statement::reset() @@ -164,5 +165,13 @@ std::string SQLite_Statement::stringValue(const std::string &field) auto it = columns.find(field); if(it == columns.end()) throw std::runtime_error("Field not found: " + field); - return std::string(reinterpret_cast(sqlite3_column_text(stmt, it->second))); + const unsigned char *columnText = sqlite3_column_text(stmt, it->second); + if(columnText == nullptr) + return {}; + return std::string(reinterpret_cast(columnText)); +} + +bool SQLite_Statement::hasField(const std::string &field) const +{ + return(columns.find(field) != columns.end()); } diff --git a/chronos/SQLite.h b/chronos/SQLite.h index 7f9b156d3..c938884df 100644 --- a/chronos/SQLite.h +++ b/chronos/SQLite.h @@ -45,6 +45,7 @@ namespace Chronos void reset(); int intValue(const std::string &field); std::string stringValue(const std::string &field); + bool hasField(const std::string &field) const; private: int fieldIndex(const std::string &field); @@ -60,7 +61,7 @@ namespace Chronos class SQLite_DB { public: - SQLite_DB(const std::string &fileName, const int BusyTimeoutMs = 2500); + SQLite_DB(const std::string &fileName, const bool readOnly = false, const int BusyTimeoutMs = 2500); ~SQLite_DB(); SQLite_DB(const SQLite_DB &other) = delete; diff --git a/chronos/UpdateThread.cpp b/chronos/UpdateThread.cpp index 99c8ebd00..bcad673d1 100644 --- a/chronos/UpdateThread.cpp +++ b/chronos/UpdateThread.cpp @@ -18,6 +18,7 @@ #include "App.h" #include "Notification.h" +#include "NotificationThread.h" #include "SQLite.h" #include "Utils.h" @@ -58,25 +59,12 @@ void UpdateThread::addResult(std::unique_ptr result) void UpdateThread::storeResult(const std::unique_ptr &result) { - std::string userPathPart = Utils::userPathPart(result->userID); - struct tm tmStruct = { 0 }; time_t tmTime = result->datePlanned / 1000; if(gmtime_r(&tmTime, &tmStruct) == nullptr) throw std::runtime_error("gmtime_r returned nullptr"); - // e.g. /var/lib/cron-job.org/%u - std::string dbDirPath = userDbFilePathScheme; - Utils::replace(dbDirPath, "%u", userPathPart); - if(!Utils::directoryExists(dbDirPath)) - Utils::mkPath(dbDirPath); - - // e.g. joblog-%m-%d.db - std::string dbFileName = userDbFileNameScheme; - Utils::replace(dbFileName, "%d", Utils::toString(tmStruct.tm_mday, 2)); - Utils::replace(dbFileName, "%m", Utils::toString(tmStruct.tm_mon, 2)); - - std::string dbFilePath = dbDirPath + "/" + dbFileName; + std::string dbFilePath = Utils::userDbFilePath(userDbFilePathScheme, userDbFileNameScheme, result->userID, tmStruct.tm_mday, tmStruct.tm_mon); int jobLogID = 0; int jobLogIDDay = tmStruct.tm_mday; int jobLodIDMonth = tmStruct.tm_mon; @@ -172,7 +160,7 @@ void UpdateThread::storeResult(const std::unique_ptr &result) } res.reset(); - bool createNotificationRow = false; + bool createNotification = false; NotificationType_t notificationType; // disable job? @@ -185,7 +173,7 @@ void UpdateThread::storeResult(const std::unique_ptr &result) // notify? if(result->notifyDisable) { - createNotificationRow = true; + createNotification = true; notificationType = NOTIFICATION_TYPE_DISABLE; } } @@ -195,7 +183,7 @@ void UpdateThread::storeResult(const std::unique_ptr &result) && result->status != JOBSTATUS_OK && failCounter == 1) { - createNotificationRow = true; + createNotification = true; notificationType = NOTIFICATION_TYPE_FAILURE; } @@ -205,18 +193,40 @@ void UpdateThread::storeResult(const std::unique_ptr &result) && result->oldFailCounter > 0 && failCounter == 0) { - createNotificationRow = true; + createNotification = true; notificationType = NOTIFICATION_TYPE_SUCCESS; } - if(createNotificationRow) + if(createNotification) { - db->query("INSERT INTO `notification`(`jobid`,`joblogid`,`date`,`type`) " - "VALUES(%d,%d,%d,%d)", + Notification n; + n.userID = result->userID; + n.jobID = result->jobID; + n.date = time(NULL); + n.dateStarted = result->dateStarted / 1000; + n.datePlanned = result->datePlanned / 1000; + n.type = notificationType; + n.url = result->url; + n.title = result->title; + n.status = result->status; + n.statusText = result->statusText; + n.httpStatus = result->httpStatus; + n.failCounter = failCounter; + + db->query("INSERT INTO `notification`(`jobid`,`joblogid`,`date`,`type`,`date_started`,`date_planned`,`url`,`execution_status`,`execution_status_text`,`execution_http_status`) " + "VALUES(%d,%d,%u,%u,%u,%u,'%q',%u,'%q',%u)", result->jobID, jobLogID, - static_cast(time(NULL)), - static_cast(notificationType)); + static_cast(time(NULL)), + static_cast(n.type), + static_cast(n.dateStarted), + static_cast(n.datePlanned), + n.url.c_str(), + static_cast(n.status), + n.statusText.c_str(), + static_cast(n.httpStatus)); + + NotificationThread::getInstance()->addNotification(std::move(n)); } } diff --git a/chronos/Utils.cpp b/chronos/Utils.cpp index 028bb651e..ba3dedad4 100644 --- a/chronos/Utils.cpp +++ b/chronos/Utils.cpp @@ -84,6 +84,24 @@ std::string Utils::userPathPart(const int userID) return result; } +std::string Utils::userDbFilePath(const std::string &userDbFilePathScheme, const std::string &userDbFileNameScheme, const int userID, const int mday, const int month) +{ + const std::string userPart = userPathPart(userID); + + // e.g. /var/lib/cron-job.org/%u + std::string dbDirPath = userDbFilePathScheme; + Utils::replace(dbDirPath, "%u", userPart); + if(!Utils::directoryExists(dbDirPath)) + Utils::mkPath(dbDirPath); + + // e.g. joblog-%m-%d.db + std::string dbFileName = userDbFileNameScheme; + Utils::replace(dbFileName, "%d", Utils::toString(mday, 2)); + Utils::replace(dbFileName, "%m", Utils::toString(month, 2)); + + return dbDirPath + "/" + dbFileName; +} + std::string Utils::toString(int num, int places) { std::string result = std::to_string(num); diff --git a/chronos/Utils.h b/chronos/Utils.h index 101ac15e1..e9ba97e8f 100644 --- a/chronos/Utils.h +++ b/chronos/Utils.h @@ -25,6 +25,7 @@ namespace Chronos std::string trim(const std::string &in); void replace(std::string &str, const std::string &search, const std::string &repl); std::string userPathPart(const int userID); + std::string userDbFilePath(const std::string &userDbFilePathScheme, const std::string &userDbFileNameScheme, const int userID, const int mday, const int month); std::string toString(int num, int places); bool directoryExists(const std::string &path); bool mkPath(const std::string &path, const mode_t mode = 0755); diff --git a/chronos/WorkerThread.cpp b/chronos/WorkerThread.cpp index 51ee205c4..b2ea22721 100644 --- a/chronos/WorkerThread.cpp +++ b/chronos/WorkerThread.cpp @@ -18,6 +18,13 @@ #include "UpdateThread.h" #include "App.h" +#include +#include +#include +#include + +#include "ChronosMaster.h" + using namespace Chronos; namespace { @@ -214,17 +221,38 @@ void WorkerThread::jobDone(HTTPRequest *req) void WorkerThread::addStat() { + std::shared_ptr masterSocket + = std::make_shared( + App::getInstance()->config->get("master_service_address"), + App::getInstance()->config->getInt("master_service_port")); + std::shared_ptr masterTransport + = std::make_shared(masterSocket); + std::shared_ptr masterProtocol + = std::make_shared(masterTransport); + std::shared_ptr masterClient + = std::make_shared(masterProtocol); + try { - std::unique_ptr db(App::getInstance()->createMySQLConnection()); + masterTransport->open(); + + NodeStatsEntry stats; + stats.d = mday; + stats.m = month; + stats.y = year; + stats.h = hour; + stats.i = minute; + stats.jobs = jobCount; + stats.jitter = jitterSum / static_cast(jobCount); + + masterClient->reportNodeStats(App::getInstance()->config->getInt("node_id"), + stats); - db->query("REPLACE INTO `stats`(`d`,`m`,`y`,`h`,`i`,`jobs`,`jitter`) VALUES(%d,%d,%d,%d,%d,%d,%f)", - mday, month, year, hour, minute, - jobCount, jitterSum / static_cast(jobCount)); + masterTransport->close(); } - catch(const std::exception &ex) + catch(const apache::thrift::TException &ex) { - std::cout << "WorkerThread::addStat(): Exception: " << ex.what() << std::endl; + std::cerr << "WorkerThread::addStat(): Failed to report node stats: " << ex.what() << std::endl; } } diff --git a/chronos/chronos.cfg b/chronos/chronos.cfg index be44ee341..6d4b5f872 100644 --- a/chronos/chronos.cfg +++ b/chronos/chronos.cfg @@ -4,6 +4,9 @@ mysql_user = enter_mysql_user_here mysql_pass = enter_mysql_password_here mysql_db = enter_mysql_db_name_here +# Disable job executor? +job_executor_enable = 1 + # Use job log entries up to this age for jitter average calculation jitter_avg_time = 300 @@ -27,3 +30,45 @@ user_db_file_name_scheme = joblog-%m-%d.db # User-Agent header for requests user_agent = Mozilla/4.0 (compatible) + +# Default language +default_lang = de + +# Email envelope from +notification_mail_from = + +# Email from header +notification_mail_sender = "cron-job.org" + +# SMTP server +smtp_server = smtp://localhost + +# This node's is +node_id = 1 + +# Start node service? +node_service_enable = 1 + +# Master service database +master_mysql_host = localhost +master_mysql_user = enter_mysql_user_here +master_mysql_pass = enter_mysql_password_here +master_mysql_db = enter_mysql_db_name_here + +# Node service port +node_service_port = 9090 + +# Node service interface (0.0.0.0 = all) +node_service_interface = 127.0.0.1 + +# Start master service? +master_service_enable = 1 + +# Master service port +master_service_port = 9091 + +# Master service interface (0.0.0.0 = all) +master_service_interface = 127.0.0.1 + +# Master service address (for client) +master_service_address = 127.0.0.1 diff --git a/database/struct_master.sql b/database/struct_master.sql new file mode 100644 index 000000000..da4ccd51d --- /dev/null +++ b/database/struct_master.sql @@ -0,0 +1,82 @@ +CREATE TABLE `nodestats`( + `nodeid` int(11) NOT NULL, + `d` tinyint(4) NOT NULL DEFAULT '0', + `m` tinyint(4) NOT NULL DEFAULT '0', + `y` int(11) NOT NULL DEFAULT '0', + `h` tinyint(4) NOT NULL DEFAULT '0', + `i` tinyint(4) NOT NULL DEFAULT '0', + `jobs` int(11) NOT NULL DEFAULT '0', + `jitter` double NOT NULL DEFAULT '0', + PRIMARY KEY (`nodeid`,`d`,`m`,`y`,`h`,`i`) +) DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci; + +CREATE TABLE `user` ( + `userid` int(11) NOT NULL AUTO_INCREMENT, + `usergroupid` int(11) NOT NULL DEFAULT 1, + `status` tinyint(4) NOT NULL DEFAULT '0', + `email` varchar(255) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', + `password` varchar(40) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', + `password_salt` varchar(16) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', + `firstname` varchar(64) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', + `lastname` varchar(64) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', + `signup_ip` varchar(48) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', + `signup_date` int(11) NOT NULL DEFAULT '0', + `verification_token` varchar(16) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', + `verification_date` int(11) NOT NULL DEFAULT '0', + `lastlogin_ip` varchar(48) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', + `lastlogin_date` int(11) NOT NULL DEFAULT '0', + `lastlogin_lang` varchar(4) NOT NULL DEFAULT 'de', + `timezone` varchar(32) NOT NULL DEFAULT 'Europe/Berlin', + PRIMARY KEY (`userid`), + KEY `email` (`email`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci; + +CREATE TABLE `user_pwreset` ( + `userid` int(11) NOT NULL DEFAULT '0', + `expires` int(11) NOT NULL DEFAULT '0', + `token` varchar(16) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', + `password` varchar(40) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', + `password_salt` varchar(16) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', + PRIMARY KEY (`userid`), + KEY `expires` (`expires`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci; + +CREATE TABLE `usergroup`( + `usergroupid` int(11) NOT NULL AUTO_INCREMENT, + `title` varchar(128) NOT NULL DEFAULT '', + PRIMARY KEY(`usergroupid`) +) DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci; +INSERT INTO `usergroup`(`usergroupid`, `title`) VALUES(1, 'Default'); + +CREATE TABLE `node`( + `nodeid` int(11) NOT NULL AUTO_INCREMENT, + `name` varchar(128) NOT NULL DEFAULT '', + `ip` varchar(32) NOT NULL DEFAULT '', + `port` int(11) NOT NULL DEFAULT 9090, + `enabled` tinyint(4) NOT NULL DEFAULT '1', + PRIMARY KEY(`nodeid`) +) DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci; +INSERT INTO `node`(`nodeid`,`name`,`ip`,`port`) VALUES(1, 'Local Node', '127.0.0.1',9090); + +CREATE TABLE `usergroupnode`( + `usergroupid` int(11) NOT NULL, + `nodeid` int(11) NOT NULL, + `enabled` tinyint(4) NOT NULL DEFAULT '1', + PRIMARY KEY(`usergroupid`, `nodeid`) +) DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci; +INSERT INTO `usergroupnode`(`usergroupid`,`nodeid`,`enabled`) VALUES(1, 1, 1); + +CREATE TABLE `job`( + `jobid` int(11) NOT NULL AUTO_INCREMENT, + `userid` int(11) NOT NULL DEFAULT 0, + `nodeid` int(11) NOT NULL DEFAULT 0, + PRIMARY KEY(`jobid`), + KEY(`userid`) +) DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci; + +CREATE TABLE `phrases`( + `lang` varchar(4) NOT NULL DEFAULT 'de', + `key` varchar(64) NOT NULL DEFAULT '', + `value` TEXT NOT NULL, + PRIMARY KEY(`lang`,`key`) +) DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci; diff --git a/database/struct.sql b/database/struct_node.sql similarity index 57% rename from database/struct.sql rename to database/struct_node.sql index f1f20065d..775ee78ef 100644 --- a/database/struct.sql +++ b/database/struct_node.sql @@ -16,8 +16,10 @@ CREATE TABLE `job` ( `fail_counter` int(11) NOT NULL DEFAULT '0', `save_responses` tinyint(4) NOT NULL DEFAULT '0', `request_method` tinyint(4) NOT NULL DEFAULT '0', + `timezone` varchar(32) NOT NULL DEFAULT 'Europe/Berlin', PRIMARY KEY (`jobid`), - KEY `userid` (`userid`) + KEY `userid` (`userid`), + KEY `timezone` (`timezone`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci; CREATE TABLE `job_body` ( @@ -65,62 +67,19 @@ CREATE TABLE `job_wdays` ( PRIMARY KEY (`jobid`,`wday`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci; -CREATE TABLE `jobdeletequeue` ( - `jobid` int(11) NOT NULL DEFAULT '0', - `date` int(11) NOT NULL DEFAULT '0', - PRIMARY KEY (`jobid`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci; - CREATE TABLE `notification` ( `notificationid` int(11) NOT NULL AUTO_INCREMENT, `jobid` int(11) NOT NULL DEFAULT '0', `joblogid` int(11) unsigned NOT NULL DEFAULT '0', `date` int(14) NOT NULL DEFAULT '0', - `date_processed` int(14) NOT NULL DEFAULT '0', `type` tinyint(4) NOT NULL DEFAULT '0', - `status` tinyint(4) NOT NULL DEFAULT '0', + `date_started` int(14) NOT NULL DEFAULT '0', + `date_planned` int(14) NOT NULL DEFAULT '0', + `url` varchar(255) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', + `execution_status` tinyint(4) NOT NULL DEFAULT '0', + `execution_status_text` varchar(255) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', + `execution_http_status` int(11) NOT NULL DEFAULT '0', PRIMARY KEY (`notificationid`), KEY `jobid` (`jobid`), KEY `joblogid` (`joblogid`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci; - -CREATE TABLE `stats` ( - `d` tinyint(4) NOT NULL DEFAULT '0', - `m` tinyint(4) NOT NULL DEFAULT '0', - `y` int(11) NOT NULL DEFAULT '0', - `h` tinyint(4) NOT NULL DEFAULT '0', - `i` tinyint(4) NOT NULL DEFAULT '0', - `jobs` int(11) NOT NULL DEFAULT '0', - `jitter` double NOT NULL DEFAULT '0', - PRIMARY KEY (`d`,`m`,`y`,`h`,`i`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci; - -CREATE TABLE `user` ( - `userid` int(11) NOT NULL AUTO_INCREMENT, - `status` tinyint(4) NOT NULL DEFAULT '0', - `email` varchar(255) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', - `password` varchar(40) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', - `password_salt` varchar(16) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', - `firstname` varchar(64) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', - `lastname` varchar(64) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', - `signup_ip` varchar(48) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', - `signup_date` int(11) NOT NULL DEFAULT '0', - `verification_token` varchar(16) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', - `verification_date` int(11) NOT NULL DEFAULT '0', - `lastlogin_ip` varchar(48) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', - `lastlogin_date` int(11) NOT NULL DEFAULT '0', - `lastlogin_lang` varchar(4) NOT NULL DEFAULT 'de', - `timezone` varchar(255) NOT NULL DEFAULT 'Europe/Berlin', - PRIMARY KEY (`userid`), - KEY `email` (`email`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci; - -CREATE TABLE `user_pwreset` ( - `userid` int(11) NOT NULL DEFAULT '0', - `expires` int(11) NOT NULL DEFAULT '0', - `token` varchar(16) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', - `password` varchar(40) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', - `password_salt` varchar(16) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', - PRIMARY KEY (`userid`), - KEY `expires` (`expires`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci; diff --git a/protocol/CMakeLists.txt b/protocol/CMakeLists.txt new file mode 100644 index 000000000..c43344c16 --- /dev/null +++ b/protocol/CMakeLists.txt @@ -0,0 +1,26 @@ +set(NAME protocol) + +find_package(ThriftCompiler REQUIRED) +find_package(LibThrift REQUIRED) +find_package(Boost COMPONENTS headers) + +thrift_compile(protocol.thrift cpp protocol-cpp) +thrift_compile(protocol.thrift php:nsglobal=chronos protocol-php) + +add_library(${NAME} SHARED + ${CMAKE_CURRENT_BINARY_DIR}/protocol-cpp/ChronosNode.cpp + ${CMAKE_CURRENT_BINARY_DIR}/protocol-cpp/ChronosMaster.cpp + ${CMAKE_CURRENT_BINARY_DIR}/protocol-cpp/protocol_constants.cpp + ${CMAKE_CURRENT_BINARY_DIR}/protocol-cpp/protocol_types.cpp +) +target_include_directories(${NAME} + INTERFACE + ${CMAKE_CURRENT_BINARY_DIR}/protocol-cpp/ + PUBLIC + ${LIBTHRIFT_INCLUDE_DIRS} + ${Boost_INCLUDE_DIRS} +) +target_link_libraries(${NAME} + PUBLIC + ${LIBTHRIFT_LDFLAGS} +) diff --git a/protocol/protocol.thrift b/protocol/protocol.thrift new file mode 100644 index 000000000..999029004 --- /dev/null +++ b/protocol/protocol.thrift @@ -0,0 +1,192 @@ +enum JobStatus +{ + UNKNOWN = 0, + OK = 1, + FAILED_DNS = 2, + FAILED_CONNECT = 3, + FAILED_HTTPERROR = 4, + FAILED_TIMEOUT = 5, + FAILED_SIZE = 6, + FAILED_URL = 7, + FAILED_INTERNAL = 8, + FAILED_OTHERS = 9 +} + +enum RequestMethod +{ + GET = 0, + POST = 1, + OPTIONS = 2, + HEAD = 3, + PUT = 4, + DELETE = 5, + TRACE = 6, + CONNECT = 7 +} + +struct JobIdentifier +{ + 1: i64 jobId; + 2: i64 userId; +} + +struct JobMetadata +{ + 1: bool enabled; + 2: string title; + 3: bool saveResponses; +} + +struct JobExecutionInfo +{ + 1: JobStatus lastStatus; + 2: i64 lastFetch; + 3: i32 lastDuration; + 4: i32 failCounter; +} + +struct JobData +{ + 1: string url; + 2: RequestMethod requestMethod; +} + +struct JobExtendedData +{ + 1: string body; + 2: map headers; +} + +struct JobSchedule +{ + 1: set hours; + 2: set mdays; + 3: set minutes; + 4: set months; + 5: set wdays; + 6: string timezone; +} + +struct JobAuthentication +{ + 1: bool enable; + 2: string user; + 3: string password; +} + +struct JobNotification +{ + 1: bool onFailure; + 2: bool onSuccess; + 3: bool onDisable; +} + +struct Job +{ + 1: required JobIdentifier identifier; + 2: optional JobMetadata metaData; + 3: optional JobExecutionInfo executionInfo; + 4: optional JobAuthentication authentication; + 5: optional JobNotification notification; + 6: optional JobSchedule schedule; + 7: optional JobData data; + 8: optional JobExtendedData extendedData; +} + +struct NodeStatsEntry +{ + 1: i8 d; + 2: i8 m; + 3: i16 y; + 4: i8 h; + 5: i8 i; + 6: i64 jobs; + 7: double jitter; +} + +struct JobLogEntry +{ + 1: i64 jobLogId; + 2: JobIdentifier jobIdentifier; + 3: i64 date; + 4: i64 datePlanned; + 5: i32 jitter; + 6: string url; + 7: i32 duration; + 8: JobStatus status; + 9: string statusText; + 10: i16 httpStatus; + 11: i16 mday; + 12: i16 month; + 13: optional string headers; + 14: optional string body; +} + +struct UserDetails +{ + 1: i64 userId; + 2: string email; + 3: string firstName; + 4: string lastName; + 5: string language; +} + +struct Phrases +{ + 1: map> phrases; +} + +enum NotificationType +{ + FAILURE = 0, + SUCCESS = 1, + DISABLE = 2 +} + +struct NotificationEntry +{ + 1: i64 notificationId; + 2: i64 jobLogId; + 3: JobIdentifier jobIdentifier; + 4: i64 date; + 5: NotificationType type; + 6: i64 dateStarted; + 7: i64 datePlanned; + 8: string url; + 9: JobStatus executionStatus; + 10: string executionStatusText; + 11: i16 httpStatus; +} + +exception ResourceNotFound {} +exception Forbidden {} +exception InvalidArguments {} +exception InternalError {} + +service ChronosNode +{ + bool ping(); + + list getJobsForUser(1: i64 userId) throws(1: InternalError ie); + Job getJobDetails(1: JobIdentifier identifier) throws(1: ResourceNotFound rnf, 2: InternalError ie); + + list getJobLog(1: JobIdentifier identifier, 2: i16 maxEntries) throws(1: InternalError ie, 2: InvalidArguments ia); + JobLogEntry getJobLogDetails(1: i64 userId, 2: i16 mday, 3: i16 month, 4: i64 jobLogId) throws(1: ResourceNotFound rnf, 2: Forbidden ad, 3: InternalError ie, 4: InvalidArguments ia); + + void createOrUpdateJob(1: Job job) throws(1: ResourceNotFound rnf, 2: Forbidden ad, 3: InternalError ie, 4: InvalidArguments ia); + + list getNotifications(1: i64 userId, 2: i16 maxEntries) throws(1: InternalError ie, 2: InvalidArguments ia); + + void deleteJob(1: JobIdentifier identifier) throws(1: ResourceNotFound rnf, 2: InternalError ie); + + void disableJobsForUser(1: i64 userId) throws(1: InternalError ie); +} + +service ChronosMaster +{ + bool ping(); + + void reportNodeStats(1: i32 nodeId, 2: NodeStatsEntry stats); + UserDetails getUserDetails(1: i64 userId) throws(1: ResourceNotFound rnf, 2: InternalError ie); + Phrases getPhrases() throws(1: InternalError ie); +}