// JobTicket.h
// Part of lib-CppFCPLib (forked from hyphanet/lib-CppFCPLib).
#ifndef JOBTICKET_H__
#define JOBTICKET_H__
#include "Message.h"
#include "Exceptions.h"
#include "ServerMessage.h"
#include <vector>
#include <iostream>
#include <boost/shared_ptr.hpp>
#include "zthread/Guard.h"
#include "zthread/FastMutex.h"
#include "zthread/Mutex.h"
#include <boost/function.hpp>
namespace FCPLib {
// Forward declarations: both classes are named as friends by JobTicket and
// GetJob below, and JobTicket stores a Node pointer.
class NodeThread;
class Node;
class JobTicket {
friend class Node;
friend class NodeThread;
public:
typedef boost::shared_ptr<JobTicket> Ptr;
private:
static Ptr factory(Node* n, std::string id, Message::Ptr cmd);
protected:
Node* node;
std::string id;
Message::Ptr cmd;
Response nodeResponse;
bool keep;
bool global;
bool persistent;
std::string repr;
bool isReprValid;
bool _isFinished;
ZThread::Mutex access;
ZThread::FastMutex lock; // used to be able to wait until isFinished
ZThread::FastMutex reqSentLock; // wait until a confirm message is received
int timeQueued;
boost::function<void (int, const ServerMessage::Ptr)> f;
JobTicket()
: keep(false),
global(false),
persistent(false),
isReprValid(false),
_isFinished(false)
{}
void init(Node *n, std::string &id, Message::Ptr cmd);
JobTicket& setKeep( bool x ) { keep = x; return *this; };
JobTicket& setGlobal( bool x ) { global = x; return *this; };
JobTicket& setPersistent( bool x ) { persistent = x; return *this; };
void setCallback( boost::function<void (int, const ServerMessage::Ptr)> f )
{
this->f = f;
}
void setCallback( void (*f)(int, const ServerMessage::Ptr) )
{
this->f = f;
}
void putResult();
// status... last message -- 0, not last message -- 1
void putResponse(int status, ServerMessage::Ptr m)
{
ZThread::Guard<ZThread::Mutex> g(access);
if (f) f(status, m);
if (nodeResponse.empty())
reqSentLock.release(); // first message has arrived, so it has been successfully submitted
nodeResponse.push_back(m);
}
public:
virtual ~JobTicket() {}
const std::string& getCommandName() const { return cmd->getHeader(); }
const std::string& getId() const { return id; }
const Message::Ptr getCommand() const { return cmd; }
void wait(unsigned int timeout_=0);
void waitTillReqSent(unsigned int timeout);
Response getResponse()
{
ZThread::Guard<ZThread::Mutex> g(access);
return Response( nodeResponse );
}
virtual const std::string& toString();
bool isGlobal() const { return global; }
bool isPersistent() const { return persistent; }
bool isFinished()
{
ZThread::Guard<ZThread::Mutex> g(access);
return _isFinished;
}
};
// JobTicket specialisation that additionally carries a ReturnType and a
// pointer to an output stream.  As with the base class, construction and
// configuration are private, so only the friends Node and NodeThread can
// create or set up an instance.
class GetJob : public JobTicket {
    friend class Node;
    friend class NodeThread;

public:
    typedef boost::shared_ptr<GetJob> Ptr;

    // NOTE(review): meaning of each value is not visible here; presumably
    // where the fetched data is delivered -- confirm.
    enum ReturnType {
        Direct,
        Disk,
        None
    };

private:
    ReturnType retType;    // starts as Direct
    std::ostream *stream;  // starts as NULL; not owned as far as this header shows

    GetJob() : JobTicket(), retType(Direct), stream(NULL) {}

    // Defined out of line.
    static Ptr factory(Node*n, std::string id, Message::Ptr cmd);

    // Chainable setters: store the argument and hand back *this.
    GetJob& setStream( std::ostream *out )
    {
        stream = out;
        return *this;
    }
    GetJob& setReturnType( ReturnType kind )
    {
        retType = kind;
        return *this;
    }

public:
    virtual ~GetJob() {}

    // Dereferences `stream` unconditionally.
    // NOTE(review): callers must only use this after setStream() has stored a
    // non-NULL pointer -- confirm against Node/NodeThread.
    std::ostream& getStream() { return *stream; }

    ReturnType getReturnType() const { return retType; }

    // Overrides JobTicket::toString(); defined out of line.
    virtual const std::string& toString();
};
// A list of shared ticket handles (JobTicket::Ptr is boost::shared_ptr<JobTicket>).
typedef std::vector<JobTicket::Ptr> JobCollection;
}
#endif