Skip to content

Commit

Permalink
Support multiple execution nodes (#57)
Browse files Browse the repository at this point in the history
* Make timezone configurable per job

Resolves #47

* First implementation of Node service and related refactorings

Relates to #45

* Update readme

* Add master service, implement notifications and phrases

* Fix ubuntu build

* Fix typo in sql struct, add missing todo

* Fix build on macOS

* Add notifications support in node service

* Only consider enabled jobs for timezone retrieval

* Add timezone key to job table

* Add missing config setting

* Add disableJobsForUser to node service

* Attempt to fix travis build

* Update travis to xenial

* Fix thrift build

* Allow request body for DELETE requests

Resolves #54
  • Loading branch information
pschlan authored Mar 29, 2020
1 parent 8d8593b commit 7e251d2
Show file tree
Hide file tree
Showing 33 changed files with 2,075 additions and 131 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.DS_Store
11 changes: 9 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
language: generic
sudo: required
dist: trusty
dist: xenial
matrix:
include:
- os: linux
Expand All @@ -16,8 +16,15 @@ matrix:
- libev-dev
- libsqlite-dev
- libmysqlclient-dev
- libboost-all-dev
- flex
- bison
- pkg-config
before_script:
- wget https://ftp.fau.de/apache/thrift/0.13.0/thrift-0.13.0.tar.gz
- tar -xzf thrift-0.13.0.tar.gz
- pushd thrift-0.13.0 && ./configure --prefix=/usr --with-php=no --with-erlang=no --with-go=no --with-java=no --with-python=no --with-py3=no --with-ruby=no --with-nodejs=no --with-c_glib=no --with-cpp=yes && make && sudo make install && popd
script:
- cd chronos
- mkdir build
- cd build
- cmake .. && make
7 changes: 7 additions & 0 deletions CMake/Modules/FindLibThrift.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
find_package(PkgConfig REQUIRED)

pkg_check_modules(LIBTHRIFT REQUIRED thrift)

include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(libthrift DEFAULT_MSG LIBTHRIFT_LIBRARIES LIBTHRIFT_INCLUDE_DIRS LIBTHRIFT_LIBDIR)
mark_as_advanced(LIBTHRIFT_INCLUDE_DIRS LIBTHRIFT_LIBRARIES LIBTHRIFT_LIBDIR)
File renamed without changes.
File renamed without changes.
14 changes: 14 additions & 0 deletions CMake/Modules/FindThriftCompiler.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
find_program(THRIFT_COMPILER NAMES thrift)

include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(thrift DEFAULT_MSG THRIFT_COMPILER)
mark_as_advanced(THRIFT_COMPILER)

macro(thrift_compile FILENAME GENERATOR OUTPUTDIR)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/${OUTPUTDIR})
execute_process(COMMAND ${THRIFT_COMPILER} --gen ${GENERATOR} -out ${CMAKE_CURRENT_BINARY_DIR}/${OUTPUTDIR} ${CMAKE_CURRENT_SOURCE_DIR}/${FILENAME}
RESULT_VARIABLE RES)
if(RES)
message(FATAL_ERROR "Failed to compile ${FILENAME} with thrift generator ${GENERATOR}")
endif()
endmacro(thrift_compile)
File renamed without changes.
8 changes: 8 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
cmake_minimum_required(VERSION 3.1)
project(cron-job.org)

set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/CMake/Modules")
set(CMAKE_CXX_STANDARD 14)

add_subdirectory(protocol)
add_subdirectory(chronos)
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Structure
---------
* `database` contains the MySQL database structure.
* `chronos` is cron-job.org's cron job execution daemon and is responsible for fetching the jobs.
* `protocol` contains the interface definitions for interaction between system nodes.
* `web` contains the web interface (coming soon)

chronos
Expand All @@ -18,12 +19,18 @@ cron-job.org supports storing the job results for the user's convenience. This c

The whole software is optimized on performance rather than on data integrity, i.e. when your server crashes or you have a power outage / hardware defect, the job history is most likely lost. Since this is volatile data anyway, it's not considered a big issue.

`chronos` can now run on multiple nodes. Each node requires an own MySQL server/database and stores its own jobs. The host
running the web interface also manages the user database and an association between job and node. The web interface can
create, delete, update and fetch jobs and job logs from the particular node via a Thrift-based protocol defined in the
`protocol` folder.

### Prerequisites
In order to build chronos, you need development files of:
* curl (preferably with c-ares as resolver and libidn2 for IDN support)
* libev
* mysqlclient
* sqlite3
* thrift (compiler and libthrift)

To build, you need a C++14 compiler and cmake.

Expand Down
207 changes: 175 additions & 32 deletions chronos/App.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@
#include <openssl/crypto.h>

#include "UpdateThread.h"
#include "NotificationThread.h"
#include "WorkerThread.h"
#include "NodeService.h"
#include "MasterService.h"
#include "Config.h"

namespace
Expand Down Expand Up @@ -133,7 +136,7 @@ void App::processJobs(time_t forTime, time_t plannedTime)
std::shared_ptr<WorkerThread> wt = std::make_shared<WorkerThread>(t->tm_mday, t->tm_mon+1, t->tm_year+1900, t->tm_hour, t->tm_min);

MYSQL_ROW row;
auto res = db->query("SELECT DISTINCT(`timezone`) FROM `user`");
auto res = db->query("SELECT DISTINCT(`timezone`) FROM `job` WHERE `enabled`=1");
while((row = res->fetchRow()) != nullptr)
{
std::string timeZone(row[0]);
Expand All @@ -150,14 +153,14 @@ void App::processJobs(time_t forTime, time_t plannedTime)
int wday = -1;
switch(cWDay)
{
case cctz::weekday::monday: wday = 1; break;
case cctz::weekday::monday: wday = 1; break;
case cctz::weekday::tuesday: wday = 2; break;
case cctz::weekday::wednesday: wday = 3; break;
case cctz::weekday::thursday: wday = 4; break;
case cctz::weekday::friday: wday = 5; break;
case cctz::weekday::friday: wday = 5; break;
case cctz::weekday::saturday: wday = 6; break;
case cctz::weekday::sunday: wday = 0; break;
default: wday = -1; break;
case cctz::weekday::sunday: wday = 0; break;
default: wday = -1; break;
}

processJobsForTimeZone(civilTime.hour(), civilTime.minute(), civilTime.month(), civilTime.day(), wday, civilTime.year(),
Expand Down Expand Up @@ -188,21 +191,20 @@ void App::processJobsForTimeZone(int hour, int minute, int month, int mday, int
<< "timeZone = " << timeZone
<< std::endl;

auto res = db->query("SELECT TRIM(`url`),`job`.`jobid`,`auth_enable`,`auth_user`,`auth_pass`,`notify_failure`,`notify_success`,`notify_disable`,`fail_counter`,`save_responses`,`job`.`userid`,`request_method`,COUNT(`job_header`.`jobheaderid`),`job_body`.`body` FROM `job` "
auto res = db->query("SELECT TRIM(`url`),`job`.`jobid`,`auth_enable`,`auth_user`,`auth_pass`,`notify_failure`,`notify_success`,`notify_disable`,`fail_counter`,`save_responses`,`job`.`userid`,`request_method`,COUNT(`job_header`.`jobheaderid`),`job_body`.`body`,`title` FROM `job` "
"INNER JOIN `job_hours` ON `job_hours`.`jobid`=`job`.`jobid` "
"INNER JOIN `job_mdays` ON `job_mdays`.`jobid`=`job`.`jobid` "
"INNER JOIN `job_wdays` ON `job_wdays`.`jobid`=`job`.`jobid` "
"INNER JOIN `job_minutes` ON `job_minutes`.`jobid`=`job`.`jobid` "
"INNER JOIN `job_months` ON `job_months`.`jobid`=`job`.`jobid` "
"INNER JOIN `user` ON `job`.`userid`=`user`.`userid` "
"LEFT JOIN `job_header` ON `job_header`.`jobid`=`job`.`jobid` "
"LEFT JOIN `job_body` ON `job_body`.`jobid`=`job`.`jobid` "
"WHERE (`hour`=-1 OR `hour`=%d) "
"AND (`minute`=-1 OR `minute`=%d) "
"AND (`mday`=-1 OR `mday`=%d) "
"AND (`wday`=-1 OR `wday`=%d) "
"AND (`month`=-1 OR `month`=%d) "
"AND `user`.`timezone`='%q' "
"AND `job`.`timezone`='%q' "
"AND `enabled`=1 "
"GROUP BY `job`.`jobid` "
"ORDER BY `fail_counter` ASC, `last_duration` ASC",
Expand Down Expand Up @@ -247,6 +249,8 @@ void App::processJobsForTimeZone(int hour, int minute, int month, int mday, int
req->requestBody = row[13];
}

req->result->title = row[14];

wt->addJob(req);
}
}
Expand All @@ -256,6 +260,16 @@ void App::processJobsForTimeZone(int hour, int minute, int month, int mday, int
std::cout << "App::processJobsForTimeZone(): Finished" << std::endl;
}

void App::cleanUpNotifications()
{
std::cout << "App::cleanUpNotifications()" << std::endl;

static constexpr const int TIME_ONE_DAY = 86400;

db->query("DELETE FROM `notification` WHERE `date` < UNIX_TIMESTAMP()-%d",
TIME_ONE_DAY);
}

void App::signalHandler(int sig)
{
if(sig == SIGINT)
Expand All @@ -270,43 +284,77 @@ int App::run()
MySQL_DB::libInit();
initSSLLocks();

db = createMySQLConnection();
startUpdateThread();

signal(SIGINT, App::signalHandler);

bool firstLoop = true;
struct tm lastTime = { 0 };
int jitterCorrectionOffset = calcJitterCorrectionOffset();
while(!stop)
if(config->getInt("master_service_enable"))
{
time_t currentTime = time(nullptr) + jitterCorrectionOffset;
struct tm *t = localtime(&currentTime);

if(t->tm_min > lastTime.tm_min
|| t->tm_hour > lastTime.tm_hour
|| t->tm_mday > lastTime.tm_mday
|| t->tm_mon > lastTime.tm_mon
|| t->tm_year > lastTime.tm_year)
startMasterServiceThread();
}

if(config->getInt("node_service_enable"))
{
startNodeServiceThread();
}

if(config->getInt("job_executor_enable"))
{
db = createMySQLConnection();
startNotificationThread();
startUpdateThread();

bool firstLoop = true;
struct tm lastTime = { 0 };
int jitterCorrectionOffset = calcJitterCorrectionOffset();
while(!stop)
{
// update last time
memcpy(&lastTime, t, sizeof(struct tm));
time_t currentTime = time(nullptr) + jitterCorrectionOffset;
struct tm *t = localtime(&currentTime);

if(t->tm_min > lastTime.tm_min
|| t->tm_hour > lastTime.tm_hour
|| t->tm_mday > lastTime.tm_mday
|| t->tm_mon > lastTime.tm_mon
|| t->tm_year > lastTime.tm_year)
{
// update last time
memcpy(&lastTime, t, sizeof(struct tm));

if(!firstLoop || t->tm_sec == 59 - jitterCorrectionOffset)
{
processJobs(currentTime, currentTime - t->tm_sec);
jitterCorrectionOffset = calcJitterCorrectionOffset();

if(!firstLoop || t->tm_sec == 59 - jitterCorrectionOffset)
cleanUpNotifications();
}

firstLoop = false;
}
else
{
processJobs(currentTime, currentTime - t->tm_sec);
jitterCorrectionOffset = calcJitterCorrectionOffset();
usleep(100*1000);
}

firstLoop = false;
}
else

stopUpdateThread();
stopNotificationThread();
}
else
{
while(!stop)
{
usleep(100*1000);
}
}

this->stopUpdateThread();
if(config->getInt("node_service_enable"))
{
stopNodeServiceThread();
}

if(config->getInt("master_service_enable"))
{
stopMasterServiceThread();
}

uninitSSLLocks();
MySQL_DB::libCleanup();
Expand Down Expand Up @@ -335,6 +383,59 @@ void App::updateThreadMain()
}
}

void App::notificationThreadMain()
{
try
{
notificationThreadObj = std::make_unique<NotificationThread>();
notificationThreadObj->run();
notificationThreadObj.reset();
}
catch(const std::runtime_error &ex)
{
std::cout << "Notification thread runtime error: " << ex.what() << std::endl;
stop = true;
}
}

void App::nodeServiceThreadMain()
{
std::cout << "App::nodeServiceThreadMain(): Entered" << std::endl;

try
{
nodeServiceObj = std::make_unique<NodeService>(config->get("node_service_interface"), config->getInt("node_service_port"));
nodeServiceObj->run();
nodeServiceObj.reset();
}
catch(const std::runtime_error &ex)
{
std::cout << "Node service thread runtime error: " << ex.what() << std::endl;
stop = true;
}

std::cout << "App::nodeServiceThreadMain(): Finished" << std::endl;
}

void App::masterServiceThreadMain()
{
std::cout << "App::masterServiceThreadMain(): Entered" << std::endl;

try
{
masterServiceObj = std::make_unique<MasterService>(config->get("master_service_interface"), config->getInt("master_service_port"));
masterServiceObj->run();
masterServiceObj.reset();
}
catch(const std::runtime_error &ex)
{
std::cout << "Master service thread runtime error: " << ex.what() << std::endl;
stop = true;
}

std::cout << "App::masterServiceThreadMain(): Finished" << std::endl;
}

void App::startUpdateThread()
{
updateThread = std::thread(std::bind(&App::updateThreadMain, this));
Expand All @@ -346,6 +447,39 @@ void App::stopUpdateThread()
updateThread.join();
}

void App::startNotificationThread()
{
notificationThread = std::thread(std::bind(&App::notificationThreadMain, this));
}

void App::stopNotificationThread()
{
notificationThreadObj->stopThread();
notificationThread.join();
}

void App::startNodeServiceThread()
{
nodeServiceThread = std::thread(std::bind(&App::nodeServiceThreadMain, this));
}

void App::stopNodeServiceThread()
{
nodeServiceObj->stop();
nodeServiceThread.join();
}

void App::startMasterServiceThread()
{
masterServiceThread = std::thread(std::bind(&App::masterServiceThreadMain, this));
}

void App::stopMasterServiceThread()
{
masterServiceObj->stop();
masterServiceThread.join();
}

std::unique_ptr<MySQL_DB> App::createMySQLConnection()
{
return(std::make_unique<MySQL_DB>(config->get("mysql_host"),
Expand All @@ -354,3 +488,12 @@ std::unique_ptr<MySQL_DB> App::createMySQLConnection()
config->get("mysql_db"),
config->get("mysql_sock")));
}

std::unique_ptr<MySQL_DB> App::createMasterMySQLConnection()
{
return(std::make_unique<MySQL_DB>(config->get("master_mysql_host"),
config->get("master_mysql_user"),
config->get("master_mysql_pass"),
config->get("master_mysql_db"),
config->get("master_mysql_sock")));
}
Loading

0 comments on commit 7e251d2

Please sign in to comment.