diff --git a/conf/system.ini b/conf/system.ini index cc8d83c..2494a48 100644 --- a/conf/system.ini +++ b/conf/system.ini @@ -9,7 +9,7 @@ logsDir=logs/ ; 日志分卷大小 logRollSize=52428800 ; 1: LOG_TRACE 2: LOG_DEBUG 3: LOG_INFO -logLevel=1 +logLevel=3 isAsync=no [adserver] @@ -28,7 +28,7 @@ serverName=mc-server [http] host=0.0.0.0 -port=20010 +port=10053 timeout=3 threadNum=4 serverName=adinf-adserver @@ -45,14 +45,11 @@ threadNum=4 serverName=head-server [kafkac_out] -;是否是 v0.9.x 新协议获取 -isNewConsumerOut=yes ; 支持同时消费多个 topic, 多个用逗号分隔 -topicNameOut=test +topicNameOut=fa_status_add groupIdOut=test_group_id -brokerListOut=10.13.4.161:9192,10.13.4.160:9192 +brokerListOut=10.77.96.136:9192,10.77.96.137:9192 kafkaDebugOut=none -offsetPathOut=./ statIntervalOut=60000 [timer] diff --git a/src/AdbaseConfig.hpp b/src/AdbaseConfig.hpp index a566f14..dbcbe2a 100644 --- a/src/AdbaseConfig.hpp +++ b/src/AdbaseConfig.hpp @@ -14,9 +14,7 @@ std::string groupId##name;\ std::string brokerListConsumer##name;\ std::string kafkaDebug##name;\ - std::string offsetPath##name;\ - std::string statInterval##name;\ - bool isNewConsumer##name; + std::string statInterval##name; #endif #ifndef DECLARE_KAFKA_PRODUCER_CONFIG #define DECLARE_KAFKA_PRODUCER_CONFIG(name) \ @@ -85,6 +83,7 @@ class App; namespace app { class Message; class Storage; + class Metrics; } typedef struct adserverContext { AdbaseConfig* config; @@ -93,6 +92,7 @@ typedef struct adserverContext { adbase::metrics::Metrics* metrics; // 前后端交互数据指针添加到下面 app::Storage* storage; + app::Metrics* appMetrics; } AdServerContext; typedef struct aimsContext { @@ -101,6 +101,7 @@ typedef struct aimsContext { // 消息队列交互上下文 app::Message* message; app::Storage* storage; + app::Metrics* appMetrics; } AimsContext; typedef struct timerContext { AdbaseConfig* config; @@ -108,6 +109,7 @@ typedef struct timerContext { App* app; // 定时器交互上下文 app::Storage* storage; + app::Metrics* appMetrics; } TimerContext; #endif diff --git a/src/Aims.cpp b/src/Aims.cpp index c6be3fc..2d7eb67 100644 --- a/src/Aims.cpp +++ b/src/Aims.cpp @@ -1,37 +1,5 @@ #include "Aims.hpp" -// {{{ macros - -#define STOP_KAFKA_CONSUMER(name) do {\ -} while(0) - -#define INIT_KAFKA_PRODUCER(name) do {\ - _kafkaProducerCallback##name = new aims::kafka::Producer##name(_context);\ - _kafkaProducer##name = new adbase::kafka::Producer(_configure->brokerListProducer##name,\ - _configure->queueLength##name, _configure->debug##name);\ - _kafkaProducer##name->setSendHandler(std::bind(&aims::kafka::Producer##name::send,\ - _kafkaProducerCallback##name,\ - std::placeholders::_1, std::placeholders::_2,\ - std::placeholders::_3, std::placeholders::_4));\ - _kafkaProducer##name->setAckHandler(std::bind(&aims::kafka::Producer##name::ackCallback, \ - _kafkaProducerCallback##name, std::placeholders::_1));\ - _kafkaProducer##name->setErrorHandler(std::bind(&aims::kafka::Producer##name::errorCallback, \ - _kafkaProducerCallback##name, std::placeholders::_1));\ -} while(0) -#define START_KAFKA_PRODUCER(name) do {\ - _kafkaProducer##name->start();\ -} while(0) -#define STOP_KAFKA_PRODUCER(name) do {\ - if (_kafkaProducer##name != nullptr) {\ - _kafkaProducer##name->stop();\ - }\ - if (_kafkaProducerCallback##name != nullptr) {\ - delete _kafkaProducerCallback##name;\ - _kafkaProducerCallback##name = nullptr;\ - }\ -} while(0) - -// }}} // {{{ Aims::Aims() Aims::Aims(AimsContext* context) : @@ -52,10 +20,15 @@ Aims::~Aims() { void Aims::run() { // 初始化 server init(); + _kafka->start(); +} - for(auto &t : _kafkas) { - t.second->start(); - } +// }}} +// {{{ void Aims::reload() + +void Aims::reload() { + std::vector topicNames = adbase::explode(_configure->topicNameConsumerOut, ',', true); + _kafka->setTopics(topicNames); } // }}} @@ -69,11 +42,10 @@ void Aims::init() { // {{{ void Aims::stop() void Aims::stop() { - for (auto &t : _kafkas) { - if (t.second != nullptr) { - t.second->stop(); - } - } + if (_kafka != nullptr) { + _kafka->stop(); + delete _kafka; + } if (_kafkaConsumerCallbackOut != nullptr) { delete _kafkaConsumerCallbackOut; _kafkaConsumerCallbackOut = nullptr; @@ -86,25 +58,19 @@ void Aims::stop() { void Aims::initKafkaConsumer() { _kafkaConsumerCallbackOut = new aims::kafka::ConsumerOut(_context); std::vector topicNames = adbase::explode(_configure->topicNameConsumerOut, ',', true); - for(auto &t : topicNames) { - adbase::kafka::Consumer* consumer = new adbase::kafka::Consumer(t, _configure->groupIdOut, - _configure->brokerListConsumerOut); - consumer->setMessageHandler(std::bind(&aims::kafka::ConsumerOut::pull, - _kafkaConsumerCallbackOut, - std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3, std::placeholders::_4)); - consumer->setStatCallback(std::bind(&aims::kafka::ConsumerOut::stat, - _kafkaConsumerCallbackOut, - std::placeholders::_1, std::placeholders::_2)); - consumer->setKafkaDebug(_configure->kafkaDebugOut); - consumer->setOffsetStorePath(_configure->offsetPathOut); - consumer->setKafkaStatInterval(_configure->statIntervalOut); - if (_configure->isNewConsumerOut) { - consumer->setIsNewConsumer(true); - consumer->setOffsetStoreMethod("broker"); - } - _kafkas[t] = consumer; - } + LOG_INFO << "Topic list:" << _configure->topicNameConsumerOut; + + _kafka = new adbase::kafka::ConsumerBatch(topicNames, _configure->groupIdOut, + _configure->brokerListConsumerOut); + _kafka->setMessageHandler(std::bind(&aims::kafka::ConsumerOut::pull, + _kafkaConsumerCallbackOut, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3, std::placeholders::_4)); + _kafka->setStatCallback(std::bind(&aims::kafka::ConsumerOut::stat, + _kafkaConsumerCallbackOut, + std::placeholders::_1, std::placeholders::_2)); + _kafka->setKafkaDebug(_configure->kafkaDebugOut); + _kafka->setKafkaStatInterval(_configure->statIntervalOut); } // }}} diff --git a/src/Aims.hpp b/src/Aims.hpp index 30f5870..9846614 100644 --- a/src/Aims.hpp +++ b/src/Aims.hpp @@ -12,6 +12,7 @@ class Aims { Aims(AimsContext* context); ~Aims(); void run(); + void reload(); private: /// 传输上下文指针 @@ -22,7 +23,7 @@ class Aims { void stop(); void initKafkaConsumer(); aims::kafka::ConsumerOut* _kafkaConsumerCallbackOut = nullptr; - std::unordered_map _kafkas; + adbase::kafka::ConsumerBatch* _kafka; }; diff --git a/src/Aims/Kafka/ConsumerOut.cpp b/src/Aims/Kafka/ConsumerOut.cpp index 71f2db0..a2c57eb 100644 --- a/src/Aims/Kafka/ConsumerOut.cpp +++ b/src/Aims/Kafka/ConsumerOut.cpp @@ -33,9 +33,8 @@ bool ConsumerOut::pull(const std::string& topicName, int partId, uint64_t offset // }}} // {{{ void ConsumerOut::stat() -void ConsumerOut::stat(adbase::kafka::Consumer* consumer, const std::string& stats) { +void ConsumerOut::stat(adbase::kafka::ConsumerBatch*, const std::string& stats) { LOG_INFO << "Stats:" << stats.substr(0, 1024); - LOG_INFO << consumer->getTopicName(); } // }}} diff --git a/src/Aims/Kafka/ConsumerOut.hpp b/src/Aims/Kafka/ConsumerOut.hpp index 3fc011d..c9c673a 100644 --- a/src/Aims/Kafka/ConsumerOut.hpp +++ b/src/Aims/Kafka/ConsumerOut.hpp @@ -12,11 +12,11 @@ class ConsumerOut { ConsumerOut(AimsContext* context); ~ConsumerOut(); bool pull(const std::string& topicName, int partId, uint64_t offset, const adbase::Buffer& data); - void stat(adbase::kafka::Consumer* consumer, const std::string& stats); + void stat(adbase::kafka::ConsumerBatch* consumer, const std::string& stats); private: AimsContext* _context; }; } } -#endif \ No newline at end of file +#endif diff --git a/src/App.cpp b/src/App.cpp index 11385f0..821f63f 100644 --- a/src/App.cpp +++ b/src/App.cpp @@ -5,27 +5,18 @@ //{{{ macros -#define LOAD_KAFKA_CONSUMER_CONFIG(name, sectionName) do {\ - _configure->isNewConsumer##name = config.getOptionBool("kafkac_"#sectionName, "isNewConsumer"#name);\ - _configure->topicNameConsumer##name = config.getOption("kafkac_"#sectionName, "topicName"#name);\ - _configure->groupId##name = config.getOption("kafkac_"#sectionName, "groupId"#name);\ - _configure->brokerListConsumer##name = config.getOption("kafkac_"#sectionName, "brokerList"#name);\ - _configure->kafkaDebug##name = config.getOption("kafkac_"#sectionName, "kafkaDebug"#name);\ - _configure->offsetPath##name = config.getOption("kafkac_"#sectionName, "offsetPath"#name);\ - _configure->statInterval##name = config.getOption("kafkac_"#sectionName, "statInterval"#name);\ -} while(0) - -#define LOAD_KAFKA_PRODUCER_CONFIG(name, sectionName) do {\ - _configure->topicNameProducer##name = config.getOption("kafkap_"#sectionName, "topicName"#name);\ - _configure->brokerListProducer##name = config.getOption("kafkap_"#sectionName, "brokerList"#name);\ - _configure->debug##name = config.getOption("kafkap_"#sectionName, "debug"#name);\ - _configure->queueLength##name = config.getOptionUint32("kafkap_"#sectionName, "queueLength"#name);\ -} while(0) - #define LOAD_TIMER_CONFIG(name) do {\ _configure->interval##name = config.getOptionUint32("timer", "interval"#name);\ } while(0) +#define LOAD_KAFKA_CONSUMER_CONFIG(name, sectionName) do {\ + _configure->topicNameConsumer##name = config.getOption("kafkac_"#sectionName, "topicName"#name);\ + _configure->groupId##name = config.getOption("kafkac_"#sectionName, "groupId"#name);\ + _configure->brokerListConsumer##name = config.getOption("kafkac_"#sectionName, "brokerList"#name);\ + _configure->kafkaDebug##name = config.getOption("kafkac_"#sectionName, "kafkaDebug"#name);\ + _configure->statInterval##name = config.getOption("kafkac_"#sectionName, "statInterval"#name);\ +} while(0) + //}}} // {{{ App::App() @@ -46,7 +37,9 @@ void App::run() { _storage = std::shared_ptr(new app::Storage(_configure)); _storage->init(); - _message = std::shared_ptr(new app::Message(_configure, _storage.get())); + _metrics = std::shared_ptr(new app::Metrics()); + + _message = std::shared_ptr(new app::Message(_configure, _storage.get(), _metrics.get())); _message->start(); } @@ -71,6 +64,7 @@ void App::stop() { void App::setAdServerContext(AdServerContext* context) { context->app = this; context->storage = _storage.get(); + context->appMetrics = _metrics.get(); } // }}} @@ -80,6 +74,7 @@ void App::setAimsContext(AimsContext* context) { context->app = this; context->message = _message.get(); context->storage = _storage.get(); + context->appMetrics = _metrics.get(); } // }}} @@ -88,14 +83,13 @@ void App::setAimsContext(AimsContext* context) { void App::setTimerContext(TimerContext* context) { context->app = this; context->storage = _storage.get(); + context->appMetrics = _metrics.get(); } // }}} //{{{ void App::loadConfig() void App::loadConfig(adbase::IniConfig& config) { - LOAD_KAFKA_CONSUMER_CONFIG(Out, out); - _configure->luaDebug = config.getOptionBool("lua", "debug"); _configure->luaScriptPath = config.getOption("lua", "scriptPath"); _configure->consumerScriptName = config.getOption("consumer", "scriptName"); @@ -105,6 +99,7 @@ void App::loadConfig(adbase::IniConfig& config) { _configure->messageSwp = config.getOption("consumer", "messageSwp"); _configure->httpScriptName = config.getOption("http", "scriptName"); + LOAD_KAFKA_CONSUMER_CONFIG(Out, out); LOAD_TIMER_CONFIG(ClearStorage); } diff --git a/src/App.hpp b/src/App.hpp index a35ab77..a4881de 100644 --- a/src/App.hpp +++ b/src/App.hpp @@ -6,6 +6,7 @@ #include "AdbaseConfig.hpp" #include "App/Message.hpp" #include "App/Storage.hpp" +#include "App/Metrics.hpp" class App { public: @@ -24,6 +25,7 @@ class App { AdbaseConfig* _configure; std::shared_ptr _storage; std::shared_ptr _message; + std::shared_ptr _metrics; mutable std::mutex _mut; void bindLuaMessage(); }; diff --git a/src/App/Message.cpp b/src/App/Message.cpp index a902b32..7eadd5b 100644 --- a/src/App/Message.cpp +++ b/src/App/Message.cpp @@ -1,5 +1,6 @@ #include "Message.hpp" #include "Storage.hpp" +#include "Metrics.hpp" #include #include @@ -9,7 +10,9 @@ thread_local std::unordered_map messageLuaMessages; // {{{ Message::Message() -Message::Message(AdbaseConfig* configure, Storage* storage): _configure(configure), _storage(storage) { +Message::Message(AdbaseConfig* configure, Storage* storage, Metrics* metrics): _configure(configure), + _storage(storage), + _metrics(metrics) { } // }}} @@ -120,6 +123,7 @@ void Message::initLua() { clazz.addMethod("rollback", rollbackFn); _storage->bindClass(messageLuaEngine.get()); + _metrics->bindClass(messageLuaEngine.get()); } // }}} diff --git a/src/App/Message.hpp b/src/App/Message.hpp index 1a56200..fee4632 100644 --- a/src/App/Message.hpp +++ b/src/App/Message.hpp @@ -14,6 +14,7 @@ namespace app { class Storage; +class Metrics; // message queue typedef struct MessageItem { int partId; @@ -29,7 +30,7 @@ typedef std::list> MessageToLua; class Message { public: - Message(AdbaseConfig* configure, Storage* storage); + Message(AdbaseConfig* configure, Storage* storage, Metrics* metrics); ~Message(); void start(); void stop(); @@ -47,6 +48,7 @@ class Message { AdbaseConfig *_configure; Storage* _storage; + Metrics* _metrics; std::unordered_map _queues; void callMessage(); diff --git a/src/App/Metrics.cpp b/src/App/Metrics.cpp new file mode 100644 index 0000000..3c2d7ea --- /dev/null +++ b/src/App/Metrics.cpp @@ -0,0 +1,142 @@ +#include "Metrics.hpp" + +namespace app { +thread_local std::unordered_map> metricsCounters; +thread_local std::unordered_map> metricsMeters; +thread_local std::unordered_map> metricsHistograms; +thread_local std::unordered_map> metricsTimers; +thread_local std::shared_ptr metricsTimer; +// {{{ Metrics::Metrics() + +Metrics::Metrics() { +} + +// }}} +// {{{ Metrics::~Metrics() + +Metrics::~Metrics() { +} + +// }}} +// {{{ void Metrics::bindClass() + +void Metrics::bindClass(adbase::lua::Engine* engine) { + // metrics + adbase::lua::BindingClass metrics("metrics", "aidp", engine->getLuaState()); + adbase::lua::BindingNamespace metricsCs = metrics.getOwnerNamespace(); + typedef std::function()> GetMetrics; + GetMetrics metricsFn = std::bind(&Metrics::getMetrics, this); + metricsCs.addMethod("metrics", metricsFn); + metrics.addMethod("counter", &Metrics::buildCounter); + metrics.addMethod("meters", &Metrics::buildMeters); + metrics.addMethod("histograms", &Metrics::buildHistograms); + metrics.addMethod("timers", &Metrics::buildTimers); + metrics.addMethod("timer", &Metrics::getTimer); + + // counter + adbase::lua::BindingClass counter("counter", "aidp", engine->getLuaState()); + counter.addMethod("module_name", &adbase::metrics::Counter::getModuleName); + counter.addMethod("metric_name", &adbase::metrics::Counter::getMetricName); + counter.addMethod("add", &adbase::metrics::Counter::add); + counter.addMethod("dec", &adbase::metrics::Counter::dec); + + // meters + adbase::lua::BindingClass meters("meters", "aidp", engine->getLuaState()); + meters.addMethod("module_name", &adbase::metrics::Meters::getModuleName); + meters.addMethod("metric_name", &adbase::metrics::Meters::getMetricName); + meters.addMethod("mark", &adbase::metrics::Meters::mark); + meters.addMethod("get_count", &adbase::metrics::Meters::getCounter); + + LOG_INFO << "TOP:" << lua_gettop(engine->getLuaState()); + + // histograms + adbase::lua::BindingClass histograms("histograms", "aidp", engine->getLuaState()); + histograms.addMethod("module_name", &adbase::metrics::Histograms::getModuleName); + histograms.addMethod("metric_name", &adbase::metrics::Histograms::getMetricName); + histograms.addMethod("update", &adbase::metrics::Histograms::update); + + // timers + adbase::lua::BindingClass timers("timers", "aidp", engine->getLuaState()); + timers.addMethod("module_name", &adbase::metrics::Timers::getModuleName); + timers.addMethod("metric_name", &adbase::metrics::Timers::getMetricName); + timers.addMethod("set_timer", &adbase::metrics::Timers::setTimer); + + // timer + adbase::lua::BindingClass timer("timer", "aidp", engine->getLuaState()); + timer.addMethod("start", &adbase::metrics::Timer::start); + timer.addMethod("stop", &adbase::metrics::Timer::stop); +} + +// }}} +// {{{ std::weak_ptr Metrics::getMetrics() + +std::weak_ptr Metrics::getMetrics() { + return shared_from_this(); +} + +// }}} +// {{{ std::weak_ptr Metrics::getTimer() + +std::weak_ptr Metrics::getTimer() { + if (metricsTimer == false) { + metricsTimer = std::shared_ptr(new adbase::metrics::Timer()); + } + return metricsTimer; +} + +// }}} +// {{{ std::weak_ptr Metrics::buildCounter() + +std::weak_ptr Metrics::buildCounter(const std::string& moduleName, const std::string& metricName) { + adbase::metrics::Counter* counter = adbase::metrics::Metrics::buildCounter(moduleName, metricName); + auto sharedCounter = std::shared_ptr(counter, [](adbase::metrics::Counter*) { LOG_INFO << "delete Counter";}); + std::string key = getKey(moduleName, metricName); + metricsCounters[key] = sharedCounter; + return metricsCounters[key]; +} + +// }}} +// {{{ std::weak_ptr Metrics::buildMeters() + +std::weak_ptr Metrics::buildMeters(const std::string& moduleName, const std::string& metricName) { + adbase::metrics::Meters* meters = adbase::metrics::Metrics::buildMeters(moduleName, metricName); + auto sharedMeters = std::shared_ptr(meters, [](adbase::metrics::Meters*) { LOG_INFO << "delete meters";}); + std::string key = getKey(moduleName, metricName); + metricsMeters[key] = sharedMeters; + return metricsMeters[key]; +} + +// }}} +// {{{ std::weak_ptr Metrics::buildHistograms() + +std::weak_ptr Metrics::buildHistograms(const std::string& moduleName, const std::string& metricName, uint32_t interval) { + adbase::metrics::Histograms* histograms = adbase::metrics::Metrics::buildHistograms(moduleName, metricName, interval); + auto sharedHistograms = std::shared_ptr(histograms, [](adbase::metrics::Histograms*) { LOG_INFO << "delete histograms";}); + std::string key = getKey(moduleName, metricName); + metricsHistograms[key] = sharedHistograms; + return metricsHistograms[key]; +} + +// }}} +// {{{ std::weak_ptr Metrics::buildHistograms() + +std::weak_ptr Metrics::buildTimers(const std::string& moduleName, const std::string& metricName, uint32_t interval) { + adbase::metrics::Timers* timers = adbase::metrics::Metrics::buildTimers(moduleName, metricName, interval); + auto sharedTimers = std::shared_ptr(timers, [](adbase::metrics::Timers*) { LOG_INFO << "delete timers";}); + std::string key = getKey(moduleName, metricName); + metricsTimers[key] = sharedTimers; + return metricsTimers[key]; +} + +// }}} +// {{{ const std::string Metrics::getKey() + +const std::string Metrics::getKey(const std::string& moduleName, const std::string& metricName) { + std::string result = moduleName; + result.append(1, 26); + result.append(metricName); + return result; +} + +// }}} +} diff --git a/src/App/Metrics.hpp b/src/App/Metrics.hpp new file mode 100644 index 0000000..c84f037 --- /dev/null +++ b/src/App/Metrics.hpp @@ -0,0 +1,27 @@ +#ifndef AIDP_APP_METRICS_HPP_ +#define AIDP_APP_METRICS_HPP_ + +#include +#include +#include +#include +#include +#include "AdbaseConfig.hpp" + +namespace app { +class Metrics : public std::enable_shared_from_this { +public: + Metrics(); + ~Metrics(); + void bindClass(adbase::lua::Engine* engine); + std::weak_ptr getMetrics(); + std::weak_ptr buildCounter(const std::string& moduleName, const std::string& metricName); + std::weak_ptr buildMeters(const std::string& moduleName, const std::string& metricName); + std::weak_ptr buildHistograms(const std::string& moduleName, const std::string& metricName, uint32_t interval); + std::weak_ptr buildTimers(const std::string& moduleName, const std::string& metricName, uint32_t interval); + std::weak_ptr getTimer(); +private: + const std::string getKey(const std::string& moduleName, const std::string& metricName); +}; +} +#endif diff --git a/src/BootStrap.cpp b/src/BootStrap.cpp index 4a1735b..d7dd204 100644 --- a/src/BootStrap.cpp +++ b/src/BootStrap.cpp @@ -94,6 +94,9 @@ void BootStrap::reload() { } setLoggerLevel(); _app->reload(); + if (_aims != nullptr) { + _aims->reload(); + } } // }}} diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 001bd70..e10d3ef 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -34,18 +34,20 @@ SET(AIDP_SRC aidp.cpp Timer.cpp App/Message.cpp App/Storage.cpp + App/Metrics.cpp ) ADD_EXECUTABLE(aidp ${AIDP_SRC}) TARGET_LINK_LIBRARIES(aidp libadbase.a libadbase_kafka.a libadbase_lua.a + librdkafka++.a librdkafka.a libevent.a liblua.a pthread rt - dl - ssl - sasl2 + lz4 + z + dl ) INSTALL(TARGETS aidp RUNTIME DESTINATION adinf/aidp/bin) diff --git a/src/Http/Index.cpp b/src/Http/Index.cpp index 64fe04f..e9045bd 100644 --- a/src/Http/Index.cpp +++ b/src/Http/Index.cpp @@ -1,5 +1,6 @@ #include "Index.hpp" #include "App/Storage.hpp" +#include "App/Metrics.hpp" #include namespace adserver { @@ -93,12 +94,15 @@ void Index::bindLuaClass(adbase::lua::Engine* engine) { response.addMethod("set_header", &adbase::http::Response::setHeader); response.addMethod("add_header", &adbase::http::Response::addHeader); - response.addMethod("set_content", &adbase::http::Response::setContent); - response.addMethod("append_content", &adbase::http::Response::appendContent); + response.addMethod("set_content", &adbase::http::Response::setContent); + response.addMethod("set_content", &adbase::http::Response::setContent); + response.addMethod("append_content", &adbase::http::Response::appendContent); + response.addMethod("append_content", &adbase::http::Response::appendContent); response.addMethod("send_reply", &adbase::http::Response::sendReply); response.addMethod("get_code", &adbase::http::Response::getCode); response.addMethod("set_body_size", &adbase::http::Response::getBodySize); _context->storage->bindClass(engine); + _context->appMetrics->bindClass(engine); } // }}} diff --git a/src/Http/Server.cpp b/src/Http/Server.cpp index 3f75120..c77910c 100644 --- a/src/Http/Server.cpp +++ b/src/Http/Server.cpp @@ -1,6 +1,5 @@ #include "Server.hpp" #include "Version.hpp" -#include namespace adserver { namespace http { @@ -15,6 +14,7 @@ Server::Server(AdServerContext* context) : void Server::registerLocation(adbase::http::Server* http) { ADSERVER_HTTP_ADD_API(http, Server, status) + ADSERVER_HTTP_ADD_API(http, Server, metrics) } // }}} @@ -149,5 +149,204 @@ void Server::status(adbase::http::Request* request, adbase::http::Response* resp } // }}} +// {{{ void Server::metrics() + +void Server::metrics(adbase::http::Request* request, adbase::http::Response* response, void*) { + (void)request; + std::string result; + std::string tagValue = request->getQuery("tags"); + std::unordered_map tags; + if (!tagValue.empty()) { + std::vector tagStrs = adbase::explode(tagValue, '|', true); + for (auto &t: tagStrs) { + std::vector tagKV = adbase::explode(t, ':', true); + if (tagKV.size() == 2) { + tags[tagKV[0]] = tagKV[1]; + } + } + } + + // 如下 json 手动拼接为了减少对 json 库的依赖,在实际项目中推荐用 rapidjson + std::unordered_map procs = adbase::procStats(); + tags["name"] = adbase::trim(procs["name"], "()"); + + + int count = 0; + std::string serverAddress = adbase::replace(":", "_", request->getServerAddress(), count); + serverAddress = adbase::replace(".", "_", serverAddress, count); + tags["service"] = serverAddress; + + for (auto &t : procs) { + std::string key = adbase::replace(".", "_", t.first, count); + tags["metric_type"] = "gauges"; + tags["metric_meta"] = key; + result += formatMetric(tags["metric_type"] + "_" + serverAddress, 1, tags); + tags["metric_meta"] = "adbase_value"; + result += formatMetric(key, toUint64(t.second), tags); + } + + // Metrics + std::unordered_map gauges; + std::unordered_map counters; + std::unordered_map meters; + std::unordered_map histograms; + std::unordered_map timers; + if (_context->metrics != nullptr) { + gauges = _context->metrics->getGauges(); + counters = _context->metrics->getCounter(); + histograms = _context->metrics->getHistograms(); + meters = _context->metrics->getMeters(); + timers = _context->metrics->getTimers(); + } + + std::unordered_map modulesItems; + for (auto &t : gauges) { + adbase::metrics::MetricName name = adbase::metrics::Metrics::getMetricName(t.first); + if (name.moduleName == "self") { + continue; + } + + std::string key = name.moduleName + "_" + name.metricName; + key = adbase::replace(".", "_", key, count); + tags["metric_type"] = "gauges"; + tags["metric_meta"] = key; + result += formatMetric(tags["metric_type"] + "_" + serverAddress, 1, tags); + tags["metric_meta"] = "adbase_value"; + result += formatMetric(key, t.second, tags); + } + for (auto &t : counters) { + adbase::metrics::MetricName name = adbase::metrics::Metrics::getMetricName(t.first); + if (name.moduleName == "self") { + continue; + } + std::string key = name.moduleName + "_" + name.metricName; + key = adbase::replace(".", "_", key, count); + tags["metric_type"] = "counters"; + tags["metric_meta"] = key; + result += formatMetric(tags["metric_type"] + "_" + serverAddress, 1, tags); + tags["metric_meta"] = "adbase_value"; + result += formatMetric(key, t.second, tags); + } + + tags["metric_type"] = "meters"; + for (auto &t : meters) { + adbase::metrics::MetricName name = adbase::metrics::Metrics::getMetricName(t.first); + if (name.moduleName == "self") { + continue; + } + std::string key = name.moduleName + "_" + name.metricName; + key = adbase::replace(".", "_", key, count); + tags["metric_meta"] = key; + result += formatMetric(tags["metric_type"] + "_" + serverAddress, 1, tags); + tags["metric_meta"] = "adbase_value"; + result += formatMetric(key, t.second.count, tags); + result += formatMetric(key + "_mean_rate", toUint64(t.second.meanRate), tags); + result += formatMetric(key + "_min1_rate", toUint64(t.second.min1Rate), tags); + result += formatMetric(key + "_min5_rate", toUint64(t.second.min5Rate), tags); + result += formatMetric(key + "_min15_rate", toUint64(t.second.min15Rate), tags); + } + + tags["metric_type"] = "histograms"; + for (auto &t : histograms) { + adbase::metrics::MetricName name = adbase::metrics::Metrics::getMetricName(t.first); + if (name.moduleName == "self") { + continue; + } + + std::string key = name.moduleName + "_" + name.metricName; + key = adbase::replace(".", "_", key, count); + tags["metric_meta"] = key; + result += formatMetric(tags["metric_type"] + "_" + serverAddress, 1, tags); + tags["metric_meta"] = "adbase_value"; + result += formatMetric(key, 1, tags); + result += formatMetric(key + "_min", toUint64(t.second.min), tags); + result += formatMetric(key + "_max", toUint64(t.second.max), tags); + result += formatMetric(key + "_mean", toUint64(t.second.mean), tags); + result += formatMetric(key + "_stddev", toUint64(t.second.stddev), tags); + result += formatMetric(key + "_median", toUint64(t.second.median), tags); + result += formatMetric(key + "_percent75", toUint64(t.second.percent75), tags); + result += formatMetric(key + "_percent95", toUint64(t.second.percent95), tags); + result += formatMetric(key + "_percent98", toUint64(t.second.percent98), tags); + result += formatMetric(key + "_percent99", toUint64(t.second.percent99), tags); + result += formatMetric(key + "_percent999", toUint64(t.second.percent999), tags); + } + + tags["metric_type"] = "timers"; + for (auto &t : timers) { + adbase::metrics::MetricName name = adbase::metrics::Metrics::getMetricName(t.first); + + std::string key = name.moduleName + "_" + name.metricName; + key = adbase::replace(".", "_", key, count); + tags["metric_meta"] = key; + result += formatMetric(tags["metric_type"] + "_" + serverAddress, 1, tags); + tags["metric_meta"] = "adbase_value"; + result += formatMetric(key, t.second.meter.count, tags); + result += formatMetric(key + "_mean_rate", toUint64(t.second.meter.meanRate), tags); + result += formatMetric(key + "_min1_rate", toUint64(t.second.meter.min1Rate), tags); + result += formatMetric(key + "_min5_rate", toUint64(t.second.meter.min5Rate), tags); + result += formatMetric(key + "_min15_rate", toUint64(t.second.meter.min15Rate), tags); + result += formatMetric(key + "_min", toUint64(t.second.histogram.min), tags); + result += formatMetric(key + "_max", toUint64(t.second.histogram.max), tags); + result += formatMetric(key + "_mean", toUint64(t.second.histogram.mean), tags); + result += formatMetric(key + "_stddev", toUint64(t.second.histogram.stddev), tags); + result += formatMetric(key + "_median", toUint64(t.second.histogram.median), tags); + result += formatMetric(key + "_percent75", toUint64(t.second.histogram.percent75), tags); + result += formatMetric(key + "_percent95", toUint64(t.second.histogram.percent95), tags); + result += formatMetric(key + "_percent98", toUint64(t.second.histogram.percent98), tags); + result += formatMetric(key + "_percent99", toUint64(t.second.histogram.percent99), tags); + result += formatMetric(key + "_percent999", toUint64(t.second.histogram.percent999), tags); + } + + responseHeader(response); + response->setContent(result); + response->sendReply(); +} + +// }}} +// {{{ const std::string Server::formatMetric() + +const std::string Server::formatMetric(std::string key, uint64_t value, std::unordered_map tags) { + std::string result = "adbase_"; + + int count = 0; + std::string keyFormat = adbase::replace(".", "_", key, count); + result += keyFormat; + if (!tags.empty()) { + result += "{"; + for (auto &t : tags) { + result += t.first; + result += "=\""; + result += t.second; + result += "\","; + } + result = adbase::rightTrim(result, ","); + result += "} "; + } + + result += " "; + result += std::to_string(value) + "\n"; + return result; +} + +// }}} +// {{{ uint64_t Server::toUint64() + +uint64_t Server::toUint64(std::string value) { + errno = 0; + uint64_t result = static_cast(strtoull(value.c_str(), nullptr, 10)); + if (errno != 0) { + return 0; + } + return result; +} + +// }}} +// {{{ uint64_t Server::toUint64() + +uint64_t Server::toUint64(double value) { + return static_cast(value * 10000); +} + +// }}} +} } -} \ No newline at end of file diff --git a/src/Http/Server.hpp b/src/Http/Server.hpp index d1d3be5..0581042 100644 --- a/src/Http/Server.hpp +++ b/src/Http/Server.hpp @@ -1,5 +1,5 @@ -#ifndef SEEDTEST_HTTP_SERVER_HPP_ -#define SEEDTEST_HTTP_SERVER_HPP_ +#ifndef AIDP_HTTP_SERVER_HPP_ +#define AIDP_HTTP_SERVER_HPP_ #include "HttpInterface.hpp" @@ -10,8 +10,14 @@ class Server : HttpInterface { Server(AdServerContext* context); void registerLocation(adbase::http::Server* http); void status(adbase::http::Request* request, adbase::http::Response* response, void*); + void metrics(adbase::http::Request* request, adbase::http::Response* response, void*); + +private: + const std::string formatMetric(std::string key, uint64_t value, std::unordered_map tags); + uint64_t toUint64(std::string value); + uint64_t toUint64(double value); }; } } -#endif \ No newline at end of file +#endif