Skip to content

Commit

Permalink
pulled recovery out of Recoverable::Recoverable; removed 'simulated' …
Browse files Browse the repository at this point in the history
…argument in Recoverable::recover()
  • Loading branch information
Haosen Wen committed Jun 2, 2022
1 parent 14ffbb5 commit ad71c8a
Show file tree
Hide file tree
Showing 15 changed files with 79 additions and 106 deletions.
10 changes: 6 additions & 4 deletions src/persist/api/Recoverable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,19 @@ Recoverable::Recoverable(GlobalTestConfig* gtc){
gtc->setEnv("Liveness", "Blocking");
_esys = new pds::EpochSys(gtc);
}
auto recovered = _esys->get_recovered();
if (recovered) {
last_recovered = recovered->size();
recover(recovered, false);
auto recovered_pblks = _esys->get_recovered();
if (recovered_pblks) {
last_recovered_cnt = recovered_pblks->size();
}
}
Recoverable::~Recoverable(){
delete _esys;
delete pending_allocs;
delete pending_retires;
delete epochs;
if (recovered_pblks) {
delete recovered_pblks;
}
Persistent::finalize();
}
void Recoverable::init_thread(GlobalTestConfig*, LocalTestConfig* ltc){
Expand Down
16 changes: 9 additions & 7 deletions src/persist/api/Recoverable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,13 @@ class Recoverable{
padded<std::vector<pds::PBlk*>>* pending_allocs = nullptr;
// pending retires; each pair is <original payload, anti-payload>
padded<std::vector<pair<pds::PBlk*,pds::PBlk*>>>* pending_retires = nullptr;
// pointer to recovered PBlks from EpochSys
std::unordered_map<uint64_t, pds::PBlk *>* recovered_pblks = nullptr;
// count of last recovered PBlks from EpochSys
uint64_t last_recovered = 0;
uint64_t last_recovered_cnt = 0;
public:
// return num of blocks recovered.
virtual int recover(std::unordered_map<uint64_t, pds::PBlk*>* recovered, bool simulated = false) {
virtual int recover() {
errexit("recover() not implemented. Implement recover() or delete existing persistent heap file.");
return 0;
}
Expand Down Expand Up @@ -289,8 +291,11 @@ class Recoverable{
assert(epochs[pds::EpochSys::tid].ui != NULL_EPOCH);
return _esys->openwrite_pblk(b, epochs[pds::EpochSys::tid].ui);
}
std::unordered_map<uint64_t, pds::PBlk*>* recover_pblks(const int rec_thd=10){
return _esys->recover(rec_thd);
std::unordered_map<uint64_t, pds::PBlk*>* get_recovered_pblks(){
return recovered_pblks;
}
uint64_t get_last_recovered_cnt() {
return last_recovered_cnt;
}
void sync(){
assert(epochs[pds::EpochSys::tid].ui == NULL_EPOCH);
Expand All @@ -308,9 +313,6 @@ class Recoverable{
}
// _esys->flush();
}
uint64_t get_last_recovered_cnt() {
return last_recovered;
}

pds::sc_desc_t* get_dcss_desc(){
return _esys->get_dcss_desc();
Expand Down
21 changes: 3 additions & 18 deletions src/persist/api/montage_global_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,9 @@

namespace pds{
class GlobalRecoverable: public Recoverable{
std::unordered_map<uint64_t, PBlk*>* recovered_pblks = nullptr;
public:
GlobalRecoverable(GlobalTestConfig* gtc): Recoverable(gtc){}
~GlobalRecoverable(){
if (recovered_pblks){
delete recovered_pblks;
}
}
int recover(bool simulated){
// TODO: handle simulated situation here?
recovered_pblks = recover_pblks();
// TODO: return number of blocks here?
return 0;
}
std::unordered_map<uint64_t, PBlk*>* get_recovered(){
return recovered_pblks;
}
~GlobalRecoverable(){}
};

extern GlobalRecoverable* global_recoverable;
Expand Down Expand Up @@ -117,9 +103,8 @@ namespace pds{
// delete(b);
// }})

inline std::unordered_map<uint64_t, PBlk*>* recover(const int rec_thd=10){
global_recoverable->recover(rec_thd);
return global_recoverable->get_recovered();
inline std::unordered_map<uint64_t, PBlk*>* get_recovered_pblks(){
return global_recoverable->get_recovered_pblks();
}

inline void flush(){
Expand Down
2 changes: 1 addition & 1 deletion src/rideables/HOHHashTable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class HOHHashTable : public RMap<K,V>, public Recoverable{
}
}

int recover(bool simulated){
int recover(){
errexit("recover of HOHHashTable not implemented");
}

Expand Down
41 changes: 18 additions & 23 deletions src/rideables/MontageGraph.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ class MontageGraph : public RGraph, public Recoverable{
};

MontageGraph(GlobalTestConfig* gtc) : Recoverable(gtc), gtc(gtc) {
if (get_recovered_pblks()) {
recover();
return;
}

size_t sz = numVertices;
this->vMeta = new VertexMeta[numVertices];
std::mt19937_64 gen(time(NULL));
Expand Down Expand Up @@ -168,7 +173,9 @@ class MontageGraph : public RGraph, public Recoverable{
if(gtc->verbose) std::cout << "Filled mean edges per vertex" << std::endl;
}

~MontageGraph() {}
~MontageGraph() {
delete vMeta;
}

// Obtain statistics of graph (|V|, |E|, average degree, vertex degrees)
// Not concurrent safe...
Expand Down Expand Up @@ -329,35 +336,23 @@ class MontageGraph : public RGraph, public Recoverable{
return ret;
}

int recover(bool simulated) {
int recover() {
struct RelationWrapper{
int v1;
int v2;
Relation* e;
} __attribute__((aligned(CACHE_LINE_SIZE)));

// assert(0&&"recover() not implemented!");
if (simulated) {
recover_mode();
delete vMeta;
vMeta = new VertexMeta[numVertices];
// #pragma omp parallel for
for (size_t i = 0; i < numVertices; i++) {
vertex(i) = nullptr;
}
online_mode();
vMeta = new VertexMeta[numVertices];
for (size_t i = 0; i < numVertices; i++) {
vertex(i) = nullptr;
}

int rec_thd = gtc->task_num;
int block_cnt = 0;
auto begin = chrono::high_resolution_clock::now();
std::unordered_map<uint64_t, pds::PBlk*>* recovered = recover_pblks();
auto end = chrono::high_resolution_clock::now();
auto dur = end - begin;
auto dur_ms = std::chrono::duration_cast<std::chrono::milliseconds>(dur).count();
std::cout << "Spent " << dur_ms << "ms getting PBlk(" << recovered->size() << ")" << std::endl;
std::unordered_map<uint64_t, pds::PBlk*>* recovered = get_recovered_pblks();
assert(recovered);

begin = chrono::high_resolution_clock::now();
auto begin = chrono::high_resolution_clock::now();
std::vector<Relation*> relationVector;
std::vector<Vertex*> vertexVector;
{
Expand Down Expand Up @@ -389,9 +384,9 @@ class MontageGraph : public RGraph, public Recoverable{
}
}
}
end = chrono::high_resolution_clock::now();
dur = end - begin;
dur_ms = std::chrono::duration_cast<std::chrono::milliseconds>(dur).count();
auto end = chrono::high_resolution_clock::now();
auto dur = end - begin;
auto dur_ms = std::chrono::duration_cast<std::chrono::milliseconds>(dur).count();
std::cout << "Spent " << dur_ms << "ms gathering vertices(" << vertexVector.size() << ") and edges(" << relationVector.size() << ")..." << std::endl;
begin = chrono::high_resolution_clock::now();

Expand Down
16 changes: 7 additions & 9 deletions src/rideables/MontageHashTable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,11 @@ class MontageHashTable : public RMap<K,V>, public Recoverable{
std::hash<K> hash_fn;
Bucket buckets[idxSize];
GlobalTestConfig* gtc;
MontageHashTable(GlobalTestConfig* gtc_): Recoverable(gtc_), gtc(gtc_){};
MontageHashTable(GlobalTestConfig* gtc_): Recoverable(gtc_), gtc(gtc_){
if (get_recovered_pblks()) {
recover();
}
};

void init_thread(GlobalTestConfig* gtc, LocalTestConfig* ltc){
Recoverable::init_thread(gtc, ltc);
Expand Down Expand Up @@ -200,14 +204,8 @@ class MontageHashTable : public RMap<K,V>, public Recoverable{
}


int recover(std::unordered_map<uint64_t, pds::PBlk*>* recovered, bool simulated){
if (simulated){
recover_mode(); // PDELETE --> noop
// clear transient structures.
clear();
online_mode(); // re-enable PDELETE.
}

int recover(){
std::unordered_map<uint64_t, pds::PBlk*>* recovered = get_recovered_pblks();
assert(recovered);

int rec_cnt = 0;
Expand Down
35 changes: 16 additions & 19 deletions src/rideables/MontageLfHashTable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,16 @@ class MontageLfHashTable : public RMap<K,V>, public Recoverable{
}
public:
MontageLfHashTable(GlobalTestConfig* gtc) : Recoverable(gtc), tracker(gtc->task_num, 100, 1000, true), gtc(gtc) {
if (get_recovered_pblks()) {
recover();
}
};
~MontageLfHashTable(){
recover_mode(); // PDELETE --> noop
// clear transient structures.
clear();
online_mode(); // re-enable PDELETE.
};
~MontageLfHashTable(){};

void init_thread(GlobalTestConfig* gtc, LocalTestConfig* ltc){
Recoverable::init_thread(gtc, ltc);
Expand All @@ -114,35 +122,25 @@ class MontageLfHashTable : public RMap<K,V>, public Recoverable{
buckets[i].ui.ptr.store(this,nullptr);
}
}
int recover(bool simulated){
if (simulated){
recover_mode(); // PDELETE --> noop
// clear transient structures.
clear();
online_mode(); // re-enable PDELETE.
}

int recover(){
int rec_cnt = 0;
int rec_thd = gtc->task_num;
if (gtc->checkEnv("RecoverThread")){
rec_thd = stoi(gtc->getEnv("RecoverThread"));
}
auto begin = chrono::high_resolution_clock::now();
std::unordered_map<uint64_t, pds::PBlk*>* recovered = recover_pblks(rec_thd);
auto end = chrono::high_resolution_clock::now();
auto dur = end - begin;
auto dur_ms = std::chrono::duration_cast<std::chrono::milliseconds>(dur).count();
std::cout << "Spent " << dur_ms << "ms getting PBlk(" << recovered->size() << ")" << std::endl;
std::unordered_map<uint64_t, pds::PBlk*>* recovered = get_recovered_pblks();
assert(recovered);

std::vector<Payload*> payloadVector;
payloadVector.reserve(recovered->size());
begin = chrono::high_resolution_clock::now();
auto begin = chrono::high_resolution_clock::now();
for (auto itr = recovered->begin(); itr != recovered->end(); itr++){
rec_cnt++;
Payload* payload = reinterpret_cast<Payload*>(itr->second);
payloadVector.push_back(payload);
}
end = chrono::high_resolution_clock::now();
dur = end - begin;
auto end = chrono::high_resolution_clock::now();
auto dur = end - begin;
auto dur_ms_vec = std::chrono::duration_cast<std::chrono::milliseconds>(dur).count();
std::cout << "Spent " << dur_ms_vec << "ms building vector" << std::endl;
begin = chrono::high_resolution_clock::now();
Expand Down Expand Up @@ -187,7 +185,6 @@ class MontageLfHashTable : public RMap<K,V>, public Recoverable{
dur = end - begin;
auto dur_ms_ins = std::chrono::duration_cast<std::chrono::milliseconds>(dur).count();
std::cout << "Spent " << dur_ms_ins << "ms inserting(" << recovered->size() << ")" << std::endl;
std::cout << "Total time to recover: " << dur_ms+dur_ms_vec+dur_ms_ins << "ms" << std::endl;
delete recovered;
return rec_cnt;
}
Expand Down
30 changes: 12 additions & 18 deletions src/rideables/MontageLfSkipList.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,15 @@ class MontageLfSkipList : public RMap<K, V>, public Recoverable {
int bg_tid = gtc->task_num;
bg_state.store(background_state::RUNNING);
background_thread = std::move(std::thread(&MontageLfSkipList::bg_loop, this, bg_tid));
if (get_recovered_pblks()) {
recover();
}
};
~MontageLfSkipList(){
recover_mode(); // PDELETE --> noop
// clear transient structures.
clear();
online_mode(); // re-enable PDELETE.
bg_state.store(background_state::FINISHED);
background_thread.join();
};
Expand Down Expand Up @@ -180,38 +187,26 @@ class MontageLfSkipList : public RMap<K, V>, public Recoverable {
head.ptr.store(initialHeadNode);
}

int recover(bool simulated){
if (simulated){
recover_mode(); // PDELETE --> noop
// clear transient structures.
clear();
online_mode(); // re-enable PDELETE.
}

int recover(){
should_cas_verify.store(false);

int rec_cnt = 0;
int rec_thd_count = gtc->task_num;
if (gtc->checkEnv("RecoverThread")){
rec_thd_count = stoi(gtc->getEnv("RecoverThread"));
}
auto begin = chrono::high_resolution_clock::now();
std::unordered_map<uint64_t, pds::PBlk*>* recovered = recover_pblks(rec_thd_count);
auto end = chrono::high_resolution_clock::now();
auto dur = end - begin;
auto dur_ms = std::chrono::duration_cast<std::chrono::milliseconds>(dur).count();
std::cout << "Spent " << dur_ms << "ms getting PBlk(" << recovered->size() << ")" << std::endl;
std::unordered_map<uint64_t, pds::PBlk*>* recovered = get_recovered_pblks();

std::vector<Payload*> payloadVector;
payloadVector.reserve(recovered->size());
begin = chrono::high_resolution_clock::now();
auto begin = chrono::high_resolution_clock::now();
for (auto itr = recovered->begin(); itr != recovered->end(); itr++){
rec_cnt++;
Payload* payload = reinterpret_cast<Payload*>(itr->second);
payloadVector.push_back(payload);
}
end = chrono::high_resolution_clock::now();
dur = end - begin;
auto end = chrono::high_resolution_clock::now();
auto dur = end - begin;
auto dur_ms_vec = std::chrono::duration_cast<std::chrono::milliseconds>(dur).count();
std::cout << "Spent " << dur_ms_vec << "ms building vector" << std::endl;

Expand Down Expand Up @@ -253,7 +248,6 @@ class MontageLfSkipList : public RMap<K, V>, public Recoverable {
dur = end - begin;
auto dur_ms_ins = std::chrono::duration_cast<std::chrono::milliseconds>(dur).count();
std::cout << "Spent " << dur_ms_ins << "ms inserting(" << recovered->size() << ")" << std::endl;
std::cout << "Total time to recover: " << dur_ms+dur_ms_vec+dur_ms_ins << "ms" << std::endl;

should_cas_verify.store(true);
delete recovered;
Expand Down
2 changes: 1 addition & 1 deletion src/rideables/MontageMSQueue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class MontageMSQueue : public RQueue<T>, public Recoverable{
Recoverable::init_thread(gtc, ltc);
}

int recover(bool simulated){
int recover(){
errexit("recover of MontageMSQueue not implemented.");
return 0;
}
Expand Down
2 changes: 1 addition & 1 deletion src/rideables/MontageNatarajanTree.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ class MontageNatarajanTree : public RMap<K,V>, public Recoverable{
Recoverable::init_thread(gtc, ltc);
}

int recover(bool simulated){
int recover(){
errexit("recover of MontageNatarajanTree not implemented.");
return 0;
}
Expand Down
2 changes: 1 addition & 1 deletion src/rideables/MontageQueue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class MontageQueue : public RQueue<T>, public Recoverable{
Recoverable::init_thread(gtc, ltc);
}

int recover(bool simulated){
int recover(){
errexit("recover of MontageQueue not implemented.");
return 0;
}
Expand Down
2 changes: 1 addition & 1 deletion src/rideables/MontageSSHashTable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class MontageSSHashTable : public RMap<K, V>, public Recoverable {
Recoverable::init_thread(gtc, ltc);
}

int recover(bool simulated){
int recover(){
errexit("recover of MontageSSHashTable not implemented.");
return 0;
}
Expand Down
Loading

0 comments on commit ad71c8a

Please sign in to comment.