diff --git a/conf/system.ini b/conf/system.ini index 929ea6e..cc8d83c 100644 --- a/conf/system.ini +++ b/conf/system.ini @@ -14,7 +14,7 @@ isAsync=no [adserver] ; 是否启动 mc server -mc=yes +mc=no ; 是否启动 http server http=yes ; 是否启动 head server @@ -28,7 +28,7 @@ serverName=mc-server [http] host=0.0.0.0 -port=10010 +port=20010 timeout=3 threadNum=4 serverName=adinf-adserver @@ -48,7 +48,7 @@ serverName=head-server ;是否是 v0.9.x 新协议获取 isNewConsumerOut=yes ; 支持同时消费多个 topic, 多个用逗号分隔 -topicNameOut=test,admid_mark +topicNameOut=test groupIdOut=test_group_id brokerListOut=10.13.4.161:9192,10.13.4.160:9192 kafkaDebugOut=none @@ -62,10 +62,10 @@ intervalClearStorage=1000 [lua] ; 如果是 debug 模式脚本执行将每次重新加载 debug=no -scriptPath=./lua +scriptPath=/usr/home/zhongxiu/code/lua [consumer] -scriptName=dusty.lua +scriptName=/usr/home/zhongxiu/code/lua/main.lua batchNumber=1000 messageSwp=./message threadNumber=2 diff --git a/rpm/aidp.spec b/rpm/aidp.spec index c14ce38..a5c706d 100644 --- a/rpm/aidp.spec +++ b/rpm/aidp.spec @@ -1,7 +1,7 @@ Summary: Weibo adinf kafka consumer using lua script Name: aidp Version:0.1.1 -Release:el.7 +Release:el6.5 Group:Development/Tools License:BSD URL: http://adinf.weiboad.org @@ -14,7 +14,7 @@ Prefix:/usr/local/adinf/aidp Requires:sudo %define debug_packages %{nil} %define debug_package %{nil} -%define _topdir /home/vagrant/rpmbuild +%define _topdir /usr/home/zhongxiu/rpmbuild %description ------------------------------------- - Everything in order to facilitate - diff --git a/rpm/build_rpm b/rpm/build_rpm index 3af78b2..abc3ae0 100755 --- a/rpm/build_rpm +++ b/rpm/build_rpm @@ -11,7 +11,7 @@ CUR_PWD=`pwd` PATH_DEV_SWAN=/usr/local/adinf/aidp # build rpm 包的目录 -BUILD_ROOT="/home/vagrant/rpmbuild" +BUILD_ROOT="/usr/home/zhongxiu/rpmbuild" SWANSOFT_NAME="aidp" SWANSOFT_VERSION="0.1.1" BUILD_TMP=/tmp diff --git a/src/Aims/Kafka/ConsumerOut.cpp b/src/Aims/Kafka/ConsumerOut.cpp index 619fead..71f2db0 100644 --- a/src/Aims/Kafka/ConsumerOut.cpp +++ b/src/Aims/Kafka/ConsumerOut.cpp @@ -24,7 +24,10 @@ bool ConsumerOut::pull(const std::string& topicName, int partId, uint64_t offset item.offset = offset; item.message = data; item.topicName = topicName; - return _context->message->push(item); + if (_context->message != nullptr) { + return _context->message->push(item); + } + return false; } // }}} diff --git a/src/App.cpp b/src/App.cpp index 2ed9aaf..11385f0 100644 --- a/src/App.cpp +++ b/src/App.cpp @@ -46,7 +46,7 @@ void App::run() { _storage = std::shared_ptr(new app::Storage(_configure)); _storage->init(); - _message = new app::Message(_configure, _storage.get()); + _message = std::shared_ptr(new app::Message(_configure, _storage.get())); _message->start(); } @@ -62,11 +62,7 @@ void App::reload() { // {{{ void App::stop() void App::stop() { - if (_message != nullptr) { - _message->stop(); - delete _message; - _message = nullptr; - } + _message->stop(); } // }}} @@ -82,7 +78,7 @@ void App::setAdServerContext(AdServerContext* context) { void App::setAimsContext(AimsContext* context) { context->app = this; - context->message = _message; + context->message = _message.get(); context->storage = _storage.get(); } diff --git a/src/App.hpp b/src/App.hpp index b0d1dbd..a35ab77 100644 --- a/src/App.hpp +++ b/src/App.hpp @@ -22,8 +22,8 @@ class App { private: AdbaseConfig* _configure; - app::Message* _message = nullptr; std::shared_ptr _storage; + std::shared_ptr _message; mutable std::mutex _mut; void bindLuaMessage(); }; diff --git a/src/App/Message.cpp b/src/App/Message.cpp index b2f8633..ec913e2 100644 --- a/src/App/Message.cpp +++ b/src/App/Message.cpp @@ -4,7 +4,7 @@ #include namespace app { -thread_local adbase::lua::Engine* messageLuaEngine = nullptr; +thread_local std::unique_ptr messageLuaEngine; thread_local std::unordered_map messageLuaMessages; // {{{ Message::Message() @@ -105,9 +105,9 @@ void Message::call(int i) { // {{{ void Message::initLua() void Message::initLua() { - messageLuaEngine = new adbase::lua::Engine(); + std::unique_ptr engine(new adbase::lua::Engine()); + messageLuaEngine = std::move(engine); messageLuaEngine->init(); - messageLuaEngine->clearLoaded(); messageLuaEngine->addSearchPath(_configure->luaScriptPath, true); adbase::lua::BindingClass clazz("message", "aidp", messageLuaEngine->getLuaState()); @@ -115,7 +115,11 @@ void Message::initLua() { GetMessageFn getMessageFn = std::bind(&app::Message::getMessage, this); clazz.addMethod("get", getMessageFn); - _storage->bindClass(messageLuaEngine); + typedef std::function)> RollBackMessageFn; + RollBackMessageFn rollbackFn = std::bind(&app::Message::rollback, this, std::placeholders::_1); + clazz.addMethod("rollback", rollbackFn); + + _storage->bindClass(messageLuaEngine.get()); } // }}} @@ -135,7 +139,7 @@ MessageToLua Message::getMessage() { MessageToLua ret; for (auto &t : messageLuaMessages) { std::list item; - std::string id = std::to_string(t.second.partId) + "_" + std::to_string(t.second.offset); + std::string id = convertKey(t.second); std::string message = t.second.message.retrieveAllAsString(); item.push_back(id); item.push_back(t.second.topicName); @@ -145,6 +149,22 @@ MessageToLua Message::getMessage() { return ret; } +// }}} +// {{{ int Message::rollback() + +int Message::rollback(std::list ids) { + int count = 0; + for (auto &t : ids) { + if (messageLuaMessages.find(t) != messageLuaMessages.end()) { + MessageItem item = messageLuaMessages[t]; + int processQueueNum = item.partId % _configure->consumerThreadNumber; + _queues[processQueueNum]->push(item); + count++; + } + } + return count; +} + // }}} // {{{ bool Message::push() @@ -165,10 +185,6 @@ bool Message::push(MessageItem& item) { void Message::deleteThread(std::thread *t) { t->join(); - if (messageLuaEngine != nullptr) { - delete messageLuaEngine; - messageLuaEngine = nullptr; - } delete t; } @@ -183,7 +199,7 @@ void Message::addLuaMessage(MessageItem& item) { // {{{ const std::string Message::convertKey() const std::string Message::convertKey(MessageItem& item) { - std::string key = std::to_string(item.partId) + "_" + std::to_string(item.offset); + std::string key = std::to_string(item.partId) + "_" + std::to_string(item.offset) + "_" + item.topicName; return key; } diff --git a/src/App/Message.hpp b/src/App/Message.hpp index 7c67755..1a56200 100644 --- a/src/App/Message.hpp +++ b/src/App/Message.hpp @@ -36,6 +36,7 @@ class Message { void reload(); void call(int i); MessageToLua getMessage(); + int rollback(std::list ids); bool push(MessageItem& item); static void deleteThread(std::thread *t); diff --git a/src/BootStrap.cpp b/src/BootStrap.cpp index 103524a..4a1735b 100644 --- a/src/BootStrap.cpp +++ b/src/BootStrap.cpp @@ -28,7 +28,8 @@ BootStrap::~BootStrap() { void BootStrap::init(int argc, char **argv) { _configure = new AdbaseConfig; - _app = new App(_configure); + std::unique_ptr tmpApp(new App(_configure)); + _app = std::move(tmpApp); // 解析指定的参数 parseOption(argc, argv); @@ -114,10 +115,8 @@ void BootStrap::stop(const int sig) { delete _timer; } - if (_app != nullptr) { + if (_app) { _app->stop(); - delete _app; - _app = nullptr; } if (_loop != nullptr) { diff --git a/src/BootStrap.hpp b/src/BootStrap.hpp index e262e07..94799d7 100644 --- a/src/BootStrap.hpp +++ b/src/BootStrap.hpp @@ -32,7 +32,7 @@ class BootStrap { adbase::EventLoop* _loop = nullptr; Aims* _aims = nullptr; Timer* _timer = nullptr; - App* _app = nullptr; + std::unique_ptr _app; void daemonInit(); void asyncLogger(const char* msg, int len); @@ -44,4 +44,4 @@ class BootStrap { void printVersion(); }; -#endif \ No newline at end of file +#endif diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index afd44b1..001bd70 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -45,5 +45,7 @@ TARGET_LINK_LIBRARIES(aidp libadbase.a pthread rt dl + ssl + sasl2 ) INSTALL(TARGETS aidp RUNTIME DESTINATION adinf/aidp/bin) diff --git a/src/Http/Index.cpp b/src/Http/Index.cpp index 39937e6..64fe04f 100644 --- a/src/Http/Index.cpp +++ b/src/Http/Index.cpp @@ -29,7 +29,6 @@ void Index::index(adbase::http::Request* request, adbase::http::Response* respon if (httpLuaEngine == nullptr) { httpLuaEngine = new adbase::lua::Engine(); httpLuaEngine->init(); - httpLuaEngine->clearLoaded(); httpLuaEngine->addSearchPath(_context->config->luaScriptPath, true); bindLuaClass(httpLuaEngine); LOG_INFO << "Http Lua engine init...."; diff --git a/src/Version.hpp b/src/Version.hpp index 5fbbbe7..9e298bc 100644 --- a/src/Version.hpp +++ b/src/Version.hpp @@ -3,8 +3,8 @@ #define AIDP_VERSION_HPP_ #define VERSION "0.1.1" #define SOVERSION "" -#define GIT_DIRTY "85" -#define GIT_SHA1 "5d5b361e" -#define BUILD_ID "localhost.localdomain-1491552560" +#define GIT_DIRTY "0" +#define GIT_SHA1 "1aa5d7c5" +#define BUILD_ID "bpdev-1491978244" #define BUILD_TYPE "Release" #endif