Skip to content
This repository has been archived by the owner on Jun 10, 2022. It is now read-only.

Commit

Permalink
add message rollback
Browse files Browse the repository at this point in the history
  • Loading branch information
nmred committed Apr 13, 2017
1 parent 1aa5d7c commit dd159f2
Show file tree
Hide file tree
Showing 13 changed files with 53 additions and 37 deletions.
10 changes: 5 additions & 5 deletions conf/system.ini
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ isAsync=no

[adserver]
; 是否启动 mc server
mc=yes
mc=no
; 是否启动 http server
http=yes
; 是否启动 head server
Expand All @@ -28,7 +28,7 @@ serverName=mc-server

[http]
host=0.0.0.0
port=10010
port=20010
timeout=3
threadNum=4
serverName=adinf-adserver
Expand All @@ -48,7 +48,7 @@ serverName=head-server
;是否是 v0.9.x 新协议获取
isNewConsumerOut=yes
; 支持同时消费多个 topic, 多个用逗号分隔
topicNameOut=test,admid_mark
topicNameOut=test
groupIdOut=test_group_id
brokerListOut=10.13.4.161:9192,10.13.4.160:9192
kafkaDebugOut=none
Expand All @@ -62,10 +62,10 @@ intervalClearStorage=1000
[lua]
; 如果是 debug 模式脚本执行将每次重新加载
debug=no
scriptPath=./lua
scriptPath=/usr/home/zhongxiu/code/lua

[consumer]
scriptName=dusty.lua
scriptName=/usr/home/zhongxiu/code/lua/main.lua
batchNumber=1000
messageSwp=./message
threadNumber=2
Expand Down
4 changes: 2 additions & 2 deletions rpm/aidp.spec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Summary: Weibo adinf kafka consumer using lua script
Name: aidp
Version:0.1.1
Release:el.7
Release:el6.5
Group:Development/Tools
License:BSD
URL: http://adinf.weiboad.org
Expand All @@ -14,7 +14,7 @@ Prefix:/usr/local/adinf/aidp
Requires:sudo
%define debug_packages %{nil}
%define debug_package %{nil}
%define _topdir /home/vagrant/rpmbuild
%define _topdir /usr/home/zhongxiu/rpmbuild
%description
-------------------------------------
- Everything in order to facilitate -
Expand Down
2 changes: 1 addition & 1 deletion rpm/build_rpm
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ CUR_PWD=`pwd`
PATH_DEV_SWAN=/usr/local/adinf/aidp

# build rpm 包的目录
BUILD_ROOT="/home/vagrant/rpmbuild"
BUILD_ROOT="/usr/home/zhongxiu/rpmbuild"
SWANSOFT_NAME="aidp"
SWANSOFT_VERSION="0.1.1"
BUILD_TMP=/tmp
Expand Down
5 changes: 4 additions & 1 deletion src/Aims/Kafka/ConsumerOut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ bool ConsumerOut::pull(const std::string& topicName, int partId, uint64_t offset
item.offset = offset;
item.message = data;
item.topicName = topicName;
return _context->message->push(item);
if (_context->message != nullptr) {
return _context->message->push(item);
}
return false;
}

// }}}
Expand Down
10 changes: 3 additions & 7 deletions src/App.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ void App::run() {
_storage = std::shared_ptr<app::Storage>(new app::Storage(_configure));
_storage->init();

_message = new app::Message(_configure, _storage.get());
_message = std::shared_ptr<app::Message>(new app::Message(_configure, _storage.get()));
_message->start();
}

Expand All @@ -62,11 +62,7 @@ void App::reload() {
// {{{ void App::stop()

void App::stop() {
if (_message != nullptr) {
_message->stop();
delete _message;
_message = nullptr;
}
_message->stop();
}

// }}}
Expand All @@ -82,7 +78,7 @@ void App::setAdServerContext(AdServerContext* context) {

void App::setAimsContext(AimsContext* context) {
context->app = this;
context->message = _message;
context->message = _message.get();
context->storage = _storage.get();
}

Expand Down
2 changes: 1 addition & 1 deletion src/App.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ class App {

private:
AdbaseConfig* _configure;
app::Message* _message = nullptr;
std::shared_ptr<app::Storage> _storage;
std::shared_ptr<app::Message> _message;
mutable std::mutex _mut;
void bindLuaMessage();
};
Expand Down
36 changes: 26 additions & 10 deletions src/App/Message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include <adbase/Lua.hpp>

namespace app {
thread_local adbase::lua::Engine* messageLuaEngine = nullptr;
thread_local std::unique_ptr<adbase::lua::Engine> messageLuaEngine;
thread_local std::unordered_map<std::string, MessageItem> messageLuaMessages;

// {{{ Message::Message()
Expand Down Expand Up @@ -105,17 +105,21 @@ void Message::call(int i) {
// {{{ void Message::initLua()

void Message::initLua() {
messageLuaEngine = new adbase::lua::Engine();
std::unique_ptr<adbase::lua::Engine> engine(new adbase::lua::Engine());
messageLuaEngine = std::move(engine);
messageLuaEngine->init();
messageLuaEngine->clearLoaded();
messageLuaEngine->addSearchPath(_configure->luaScriptPath, true);

adbase::lua::BindingClass<app::Message> clazz("message", "aidp", messageLuaEngine->getLuaState());
typedef std::function<MessageToLua()> GetMessageFn;
GetMessageFn getMessageFn = std::bind(&app::Message::getMessage, this);
clazz.addMethod("get", getMessageFn);

_storage->bindClass(messageLuaEngine);
typedef std::function<void (std::list<std::string>)> RollBackMessageFn;
RollBackMessageFn rollbackFn = std::bind(&app::Message::rollback, this, std::placeholders::_1);
clazz.addMethod("rollback", rollbackFn);

_storage->bindClass(messageLuaEngine.get());
}

// }}}
Expand All @@ -135,7 +139,7 @@ MessageToLua Message::getMessage() {
MessageToLua ret;
for (auto &t : messageLuaMessages) {
std::list<std::string> item;
std::string id = std::to_string(t.second.partId) + "_" + std::to_string(t.second.offset);
std::string id = convertKey(t.second);
std::string message = t.second.message.retrieveAllAsString();
item.push_back(id);
item.push_back(t.second.topicName);
Expand All @@ -145,6 +149,22 @@ MessageToLua Message::getMessage() {
return ret;
}

// }}}
// {{{ int Message::rollback()

int Message::rollback(std::list<std::string> ids) {
int count = 0;
for (auto &t : ids) {
if (messageLuaMessages.find(t) != messageLuaMessages.end()) {
MessageItem item = messageLuaMessages[t];
int processQueueNum = item.partId % _configure->consumerThreadNumber;
_queues[processQueueNum]->push(item);
count++;
}
}
return count;
}

// }}}
// {{{ bool Message::push()

Expand All @@ -165,10 +185,6 @@ bool Message::push(MessageItem& item) {

void Message::deleteThread(std::thread *t) {
t->join();
if (messageLuaEngine != nullptr) {
delete messageLuaEngine;
messageLuaEngine = nullptr;
}
delete t;
}

Expand All @@ -183,7 +199,7 @@ void Message::addLuaMessage(MessageItem& item) {
// {{{ const std::string Message::convertKey()

const std::string Message::convertKey(MessageItem& item) {
std::string key = std::to_string(item.partId) + "_" + std::to_string(item.offset);
std::string key = std::to_string(item.partId) + "_" + std::to_string(item.offset) + "_" + item.topicName;
return key;
}

Expand Down
1 change: 1 addition & 0 deletions src/App/Message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class Message {
void reload();
void call(int i);
MessageToLua getMessage();
int rollback(std::list<std::string> ids);
bool push(MessageItem& item);
static void deleteThread(std::thread *t);

Expand Down
7 changes: 3 additions & 4 deletions src/BootStrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ BootStrap::~BootStrap() {

void BootStrap::init(int argc, char **argv) {
_configure = new AdbaseConfig;
_app = new App(_configure);
std::unique_ptr<App> tmpApp(new App(_configure));
_app = std::move(tmpApp);

// 解析指定的参数
parseOption(argc, argv);
Expand Down Expand Up @@ -114,10 +115,8 @@ void BootStrap::stop(const int sig) {
delete _timer;
}

if (_app != nullptr) {
if (_app) {
_app->stop();
delete _app;
_app = nullptr;
}

if (_loop != nullptr) {
Expand Down
4 changes: 2 additions & 2 deletions src/BootStrap.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class BootStrap {
adbase::EventLoop* _loop = nullptr;
Aims* _aims = nullptr;
Timer* _timer = nullptr;
App* _app = nullptr;
std::unique_ptr<App> _app;

void daemonInit();
void asyncLogger(const char* msg, int len);
Expand All @@ -44,4 +44,4 @@ class BootStrap {
void printVersion();
};

#endif
#endif
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,7 @@ TARGET_LINK_LIBRARIES(aidp libadbase.a
pthread
rt
dl
ssl
sasl2
)
INSTALL(TARGETS aidp RUNTIME DESTINATION adinf/aidp/bin)
1 change: 0 additions & 1 deletion src/Http/Index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ void Index::index(adbase::http::Request* request, adbase::http::Response* respon
if (httpLuaEngine == nullptr) {
httpLuaEngine = new adbase::lua::Engine();
httpLuaEngine->init();
httpLuaEngine->clearLoaded();
httpLuaEngine->addSearchPath(_context->config->luaScriptPath, true);
bindLuaClass(httpLuaEngine);
LOG_INFO << "Http Lua engine init....";
Expand Down
6 changes: 3 additions & 3 deletions src/Version.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
#define AIDP_VERSION_HPP_
#define VERSION "0.1.1"
#define SOVERSION ""
#define GIT_DIRTY "85"
#define GIT_SHA1 "5d5b361e"
#define BUILD_ID "localhost.localdomain-1491552560"
#define GIT_DIRTY "0"
#define GIT_SHA1 "1aa5d7c5"
#define BUILD_ID "bpdev-1491978244"
#define BUILD_TYPE "Release"
#endif

0 comments on commit dd159f2

Please sign in to comment.