forked from hyphanet/lib-CppFCPLib
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathNodeThread.cpp
140 lines (125 loc) · 4.53 KB
/
NodeThread.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#include <ctime>
#include <boost/lexical_cast.hpp>
#include "NodeThread.h"
#include "Log.h"
#include "Node.h"
using namespace FCPLib;
using namespace ZThread;
/**
 * Builds the manager-thread object for a node connection.
 *
 * Stores the owning node pointer, the queue of client requests, and the
 * host/port pair, then allocates the Server object from the stored
 * host_/port_ members.
 *
 * @param n                owning Node; used by run() to report liveness/failure
 * @param host             host handed to the Server constructor (via host_)
 * @param port             port handed to the Server constructor (via port_)
 * @param clientReqQueue_  queue from which run() takes client jobs
 *
 * NOTE(review): `s` is initialised from the members host_ and port_, not from
 * the constructor arguments, so it relies on host_ and port_ being declared
 * before `s` in the class definition -- confirm against NodeThread.h.
 * NOTE(review): the constructor is declared throw(); if Server's constructor
 * can throw, that exception cannot propagate to the caller -- confirm.
 */
NodeThread::NodeThread(Node* n, std::string &host, int port,
                       JobTicketQueuePtr clientReqQueue_) throw()
  : node(n),
    clientReqQueue(clientReqQueue_),
    host_(host),
    port_(port),
    s(new Server( host_, port_ ))
{
}
void NodeThread::run(){
ServerMessage::Ptr m;
JobTicket::Ptr job;
log().log(DETAIL, "FCPNode: manager thread starting");
try {
while (!Thread::interrupted()) {
//check for incoming message from node
log().log(NOISY, "_mgrThread: Testing for incoming message");
if (s->dataAvailable()){
log().log(DEBUG, "_mgrThread: Retrieving incoming message");
m = ServerMessage::factory(s);
log().log(DEBUG, "_mgrThread: Got incoming message, dispatching");
// dispatch the message
doMessage(m);
}
//check for incoming message from client
log().log(NOISY, "_mgrThread: Testing for incoming req");
if (!clientReqQueue->empty()){
log().log(DEBUG, "_mgrThread: Got incoming client req");
job = clientReqQueue->get();
log().log(DEBUG, "_mgrThread: Got incoming client req from the queue");
log().log(NOISY, job->toString());
sendClientReq(job);
}
Thread::sleep(100); // do I need this?
}
} catch (ZThread::Synchronization_Exception& e) {
// thread was interupted, normal way to shutdown the thread
// this object will be destroyed
log().log(ERROR, "_mgrThread: Caught Synchronization_Exception");
log().log(ERROR, e.what());
node->setIsAlive(false);
} catch (std::runtime_error& e) {
// some error has occured, keep the thread so you can access the isAlive and getFailure
log().log(ERROR, "_mgrThreag: Caught std::runtime_error");
log().log(ERROR, e.what());
node->setIsAlive( false );
node->setFailure( e.what() );
} catch (std::exception& e) {
// some error has occured, keep the thread so you can access the isAlive and getFailure
log().log(ERROR, "_mgrThreag: Caught std::exception");
log().log(ERROR, e.what());
node->setIsAlive( false );
node->setFailure( e.what() );
} catch (...) {
// thread is stopped and we don't know what has happend
log().log(ERROR, "_mgrThreag: Caught something else");
node->setIsAlive(false);
node->setFailure( "unknown error" );
}
}
/**
 * Forwards one client job to the node.
 *
 * Unless the job's command is "WatchGlobal", the job is first stored in the
 * jobs table (index 1 when the job is global, index 0 otherwise) under its
 * id, so that doMessage() can find it when replies arrive. The command is
 * then sent over the server connection and the job's timeQueued field is set
 * to the current time(0) value.
 *
 * @param job  ticket describing the client request to send
 */
void
NodeThread::sendClientReq(JobTicket::Ptr job)
{
  log().log(NOISY, "sendClientReq : top");

  const bool tracked = job->getCommandName() != "WatchGlobal";
  if (tracked) {
    log().log(NOISY, "sendClientReq : about to add the job to the map");
    const int slot = job->isGlobal() ? 1 : 0;
    jobs[slot][job->getId()] = job;
    log().log(NOISY, "sendClientReq : added the job to the map");
  }

  s->send(job->getCommand());
  job->timeQueued = static_cast<unsigned int>(time(0));
}
/**
 * Routes one message received from the node to the job it belongs to.
 *
 * The message's "Global" field selects which jobs table is searched: an
 * empty or "false" value selects index 0, anything else index 1. The job is
 * looked up by message->getIdOfJob().
 *
 *  - Job found: the message is appended with putResponse(); when
 *    message->isLast(job) is true, the response is flagged with 1,
 *    putResult() is called, and the job is erased from the table unless its
 *    `keep` flag is set.
 *  - Job not found, non-global message: logged and dropped.
 *  - Job not found, global message: a new persistent global job is created
 *    with JobTicket::factory() and stored under the message's "Identifier"
 *    field; a message without an identifier is logged and dropped.
 *
 * @param message  message read from the node connection
 */
void
NodeThread::doMessage(ServerMessage::Ptr message)
{
  // A missing "Global" field counts the same as an explicit "false".
  std::string globalField = message->getMessage()->getField("Global");
  if (globalField == "") {
    globalField = "false";
  }
  int isGlobal = (globalField == "false") ? 0 : 1;

  std::map<std::string, JobTicket::Ptr>::iterator found =
      jobs[isGlobal].find(message->getIdOfJob());

  if (found != jobs[isGlobal].end()) {
    // Known job: hand the message over.
    JobTicket::Ptr ticket = found->second;
    if ( !message->isLast( ticket ) ) {
      ticket->putResponse(0, message);
      return;
    }
    log().log(NOISY, "doMessage : last message for the job");
    ticket->putResponse(1, message);
    ticket->putResult();
    if (!ticket->keep) {
      log().log(NOISY, "doMessage : job should not be kept, erasing");
      jobs[isGlobal].erase( found );
    }
    return;
  }

  // Unknown job: either a message from the global queue or an error.
  log().log(DETAIL, "doMessage : received " + message->getMessage()->getHeader() + ", cannot find " + message->getIdOfJob() + " in started jobs");
  Message::Ptr payload = message->getMessage();

  if (!isGlobal) {
    log().log(DEBUG, "doMessage : received error message");
    // TODO: create a means of passing error messages to the client program
    return;
  }

  log().log(DEBUG, "doMessage : received message from a global queue");
  if ( payload->getField("Identifier") == "" ) {
    // should never happen
    log().log(ERROR, "doMessage : global message does not contain identifier !???");
    return;
  }

  // Global queue entry seen for the first time: create a job for it.
  JobTicket::Ptr created = JobTicket::factory( this->node, payload->getField("Identifier"), payload );
  created->setGlobal(isGlobal).setPersistent(true);
  jobs[isGlobal][payload->getField("Identifier")] = created;
}