Merge branch 'shm/feat/mem-efficient2' into shm/feat/mem-efficient
DanielSeemaier committed Apr 24, 2024
2 parents 60fc8ca + e6f601b commit ba29597
Showing 15 changed files with 1,328 additions and 909 deletions.
1 change: 1 addition & 0 deletions apps/io/shm_io.h
@@ -13,6 +13,7 @@

#include "kaminpar-shm/datastructures/compressed_graph.h"
#include "kaminpar-shm/datastructures/csr_graph.h"
#include "kaminpar-shm/datastructures/graph.h"
#include "kaminpar-shm/kaminpar.h"

#include "kaminpar-common/datastructures/static_array.h"
17 changes: 15 additions & 2 deletions apps/tools/shm_graph_properties_tool.cc
@@ -76,17 +76,30 @@ void print_graph_properties(const Graph &graph, const Context ctx, std::ostream
int main(int argc, char *argv[]) {
Context ctx = create_default_context();
std::string graph_filename;
io::GraphFileFormat graph_file_format = io::GraphFileFormat::METIS;

CLI::App app("Shared-memory graph properties tool");
app.add_option("-G,--graph", graph_filename, "Input graph in METIS format")->required();
app.add_option("-t,--threads", ctx.parallel.num_threads, "Number of threads");
app.add_option("-f,--graph-file-format", graph_file_format)
->transform(CLI::CheckedTransformer(io::get_graph_file_formats()).description(""))
->description(R"(Graph file formats:
- metis
- parhip)")
->capture_default_str();
create_graph_compression_options(&app, ctx);
CLI11_PARSE(app, argc, argv);

tbb::global_control gc(tbb::global_control::max_allowed_parallelism, ctx.parallel.num_threads);

Graph graph =
io::read(graph_filename, ctx.compression.enabled, ctx.compression.may_dismiss, false, false);
Graph graph = io::read(
graph_filename,
graph_file_format,
ctx.compression.enabled,
ctx.compression.may_dismiss,
false,
false
);

ctx.debug.graph_name = str::extract_basename(graph_filename);
ctx.compression.setup(graph);
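For context, a sketch of how a caller might use the extended reader shown above. The io::read argument order is taken from this diff; the ParHiP enumerator name, the include path, and the namespaces are assumptions:

// Hypothetical caller of the extended reader; enumerator name, include path and
// namespaces are assumptions, only the argument order follows the diff above.
#include <string>

#include "apps/io/shm_io.h"

using namespace kaminpar;
using namespace kaminpar::shm;

Graph load_graph(const std::string &filename, const Context &ctx, const bool use_parhip) {
  // The on-disk format is now passed explicitly instead of always assuming METIS.
  const io::GraphFileFormat format =
      use_parhip ? io::GraphFileFormat::PARHIP : io::GraphFileFormat::METIS;

  return io::read(
      filename, format, ctx.compression.enabled, ctx.compression.may_dismiss, false, false
  );
}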
39 changes: 38 additions & 1 deletion kaminpar-cli/kaminpar_arguments.cc
@@ -140,7 +140,7 @@ CLI::Option_group *create_coarsening_options(CLI::App *app, Context &ctx) {
->capture_default_str();

// Clustering options:
coarsening->add_option("--c-clustering-algorithm", ctx.coarsening.algorithm)
coarsening->add_option("--c-clustering-algorithm", ctx.coarsening.clustering.algorithm)
->transform(CLI::CheckedTransformer(get_clustering_algorithms()).description(""))
->description(R"(One of the following options:
- noop: disable coarsening
@@ -239,10 +239,17 @@ Options are:
->description(
R"(Determines the mode for aggregating ratings in the second phase of label propagation.
Options are:
- none: Skip the second phase
- direct: Write the ratings directly into the global vector (shared between threads)
- buffered: Write the ratings into a thread-local buffer and then copy them into the global vector when the buffer is full
)"
);
lp->add_option(
"--c-lp-second-phase-relabel",
ctx.coarsening.clustering.lp.relabel_before_second_phase,
"Relabel the clusters before running the second phase"
)
->capture_default_str();

lp->add_option("--c-lp-two-hop-strategy", ctx.coarsening.clustering.lp.two_hop_strategy)
->transform(CLI::CheckedTransformer(get_two_hop_strategies()).description(""))
@@ -365,6 +372,36 @@ CLI::Option_group *create_lp_refinement_options(CLI::App *app, Context &ctx) {
)
->capture_default_str();

lp->add_option(
"--r-lp-two-phases",
ctx.refinement.lp.use_two_phases,
"Uses two phases in each iteration, where in the second phase the high-degree nodes are "
"treated separately"
)
->capture_default_str();
lp->add_option("--r-lp-second-phase-select-mode", ctx.refinement.lp.second_phase_select_mode)
->transform(CLI::CheckedTransformer(get_second_phase_select_modes()).description(""))
->description(
R"(Determines the mode for selecting nodes for the second phase of label propagation.
Options are:
- high-degree: Select nodes with high degree
- full-rating-map: Select nodes which have a full rating map in the first phase
)"
)
->capture_default_str();
lp->add_option(
"--r-lp-second-phase-aggregation-mode", ctx.refinement.lp.second_phase_aggregation_mode
)
->transform(CLI::CheckedTransformer(get_second_phase_aggregation_modes()).description(""))
->description(
R"(Determines the mode for aggregating ratings in the second phase of label propagation.
Options are:
- none: Skip the second phase
- direct: Write the ratings directly into the global vector (shared between threads)
- buffered: Write the ratings into a thread-local buffer and then copy them into the global vector when the buffer is full
)"
);

return lp;
}

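The "direct" and "buffered" second-phase aggregation modes described above differ only in when a thread's ratings reach the shared vector. A simplified, self-contained sketch of the buffered variant (illustration only; these types and names are not part of KaMinPar):

// Illustration of buffered rating aggregation: each thread collects (cluster, rating)
// pairs locally and flushes them into the shared vector once the buffer is full.
#include <atomic>
#include <cstdint>
#include <utility>
#include <vector>

using ClusterID = std::uint32_t;
using Rating = std::int64_t;

struct BufferedAggregator {
  BufferedAggregator(std::vector<std::atomic<Rating>> &global, const std::size_t buffer_size)
      : global_ratings(global),
        max_buffered(buffer_size) {}

  void add(const ClusterID cluster, const Rating rating) {
    buffer.emplace_back(cluster, rating);
    if (buffer.size() >= max_buffered) {
      flush();
    }
  }

  void flush() {
    for (const auto &[cluster, rating] : buffer) {
      // The "direct" mode would perform this fetch_add immediately instead of buffering.
      global_ratings[cluster].fetch_add(rating, std::memory_order_relaxed);
    }
    buffer.clear();
  }

  std::vector<std::atomic<Rating>> &global_ratings;  // shared between threads
  std::vector<std::pair<ClusterID, Rating>> buffer;  // thread-local
  std::size_t max_buffered;
};

Buffering trades slightly delayed global updates for fewer contended atomic operations when many high-degree nodes write to the same clusters.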
110 changes: 104 additions & 6 deletions kaminpar-common/datastructures/concurrent_two_level_vector.h
@@ -55,21 +55,34 @@ class ConcurrentTwoLevelVector {
*
* @param capacity The capacity of the vector.
*/
ConcurrentTwoLevelVector(const Size capacity = 0) : _values(capacity), _table(0) {}
ConcurrentTwoLevelVector(const Size capacity = 0)
: _capacity(capacity),
_values(capacity),
_table(0) {}

ConcurrentTwoLevelVector(const ConcurrentTwoLevelVector &) = delete;
ConcurrentTwoLevelVector &operator=(const ConcurrentTwoLevelVector &) = delete;

ConcurrentTwoLevelVector(ConcurrentTwoLevelVector &&) noexcept = default;
ConcurrentTwoLevelVector &operator=(ConcurrentTwoLevelVector &&) noexcept = default;

/*!
* Returns the number of elements that this vector can hold.
*
* @return The number of elements that this vector can hold.
*/
[[nodiscard]] Size capacity() const {
return _capacity;
}

/*!
* Resizes the vector.
*
* @param capacity The capacity to resize to.
*/
void resize(const Size capacity) {
_values.resize(capacity);
_capacity = capacity;
}

/*!
@@ -78,16 +91,49 @@
void free() {
_values.free();
_table = ConcurrentHashTable(0);
_capacity = 0;
}

/*!
* Resets the vector such that new elements can be inserted.
*/
void reset() {
// As Growt does not provide a clear function, just create a new hash table.
// As growt does not provide a clear function, just create a new hash table.
_table = ConcurrentHashTable(0);
}

/**
* Reassigns stored values according to a provided mapping.
*
* @param mapping The mapping according to which the values are reassigned.
* @param new_size The new size of the vector.
*/
void reassign(const StaticArray<Size> &mapping, const Size new_size) {
StaticArray<FirstValue> new_values(new_size);
ConcurrentHashTable new_table(0);

tbb::parallel_for(tbb::blocked_range<Size>(0, _values.size()), [&](const auto &r) {
for (Size pos = r.begin(); pos != r.end(); ++pos) {
const Value value = _values[pos];

if (value == kMaxFirstValue) {
Size new_pos = mapping[pos] - 1;
new_values[new_pos] = kMaxFirstValue;

const Value actual_value = (*_table.get_handle().find(pos)).second;
new_table.get_handle().insert(new_pos, actual_value);
} else if (value != 0) {
Size new_pos = mapping[pos] - 1;
new_values[new_pos] = value;
}
}
});

_values = std::move(new_values);
_table = std::move(new_table);
_capacity = new_size;
}

/*!
* Accesses a value at a given position.
*
@@ -194,6 +240,7 @@
}

private:
Size _capacity;
StaticArray<FirstValue> _values;
ConcurrentHashTable _table;
};
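A hedged usage sketch of the interface added above (capacity tracking and reassign). The template parameter order <Value, Size, FirstValue>, the namespace, and the 1-based mapping convention are assumptions inferred from this diff rather than confirmed API:

// Usage sketch only; template arguments, header paths and namespace are assumptions.
#include <cstddef>
#include <cstdint>

#include "kaminpar-common/datastructures/concurrent_two_level_vector.h"
#include "kaminpar-common/datastructures/static_array.h"

void reassign_example() {
  using namespace kaminpar;

  // Values that fit into the narrow FirstValue type stay in the flat array; larger
  // values spill into the concurrent hash table behind the kMaxFirstValue sentinel.
  ConcurrentTwoLevelVector<std::uint64_t, std::size_t, std::uint8_t> weights(8);

  weights.atomic_add(3, 100); // small enough, kept in the first level
  weights.atomic_add(3, 200); // 300 exceeds the 8-bit range -> moved to the table
  weights.atomic_add(5, 7);

  // Compact positions {3, 5} to {0, 1}: mapping[pos] holds the 1-based new position
  // (reassign subtracts one); positions whose value is 0 are simply dropped.
  StaticArray<std::size_t> mapping(8);
  for (std::size_t i = 0; i < 8; ++i) {
    mapping[i] = 0;
  }
  mapping[3] = 1;
  mapping[5] = 2;

  weights.reassign(mapping, 2);
  // Afterwards weights.capacity() == 2; element 0 holds 300 (via the table), element 1 holds 7.
}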
@@ -225,21 +272,31 @@
*
* @param capacity The capacity of the vector.
*/
ConcurrentTwoLevelVector(const Size capacity = 0) : _values(capacity) {}
ConcurrentTwoLevelVector(const Size capacity = 0) : _capacity(capacity), _values(capacity) {}

ConcurrentTwoLevelVector(const ConcurrentTwoLevelVector &) = delete;
ConcurrentTwoLevelVector &operator=(const ConcurrentTwoLevelVector &) = delete;

ConcurrentTwoLevelVector(ConcurrentTwoLevelVector &&) noexcept = default;
ConcurrentTwoLevelVector &operator=(ConcurrentTwoLevelVector &&) noexcept = default;

/*!
* Returns the number of elements that this vector can hold.
*
* @return The number of elements that this vector can hold.
*/
[[nodiscard]] Size capacity() const {
return _capacity;
}

/*!
* Resizes the vector.
*
* @param capacity The capacity to resize to.
*/
void resize(const Size capacity) {
_values.resize(capacity);
_capacity = capacity;
}

/*!
Expand All @@ -248,6 +305,7 @@ class ConcurrentTwoLevelVector {
void free() {
_values.free();
_table.clear();
_capacity = 0;
}

/*!
@@ -257,6 +315,45 @@
_table.clear();
}

/**
* Reassigns stored values according to a provided mapping.
*
* @param mapping The mapping according to which the values are reassigned.
* @param new_size The new size of the vector.
*/
void reassign(const StaticArray<Size> &mapping, const Size new_size) {
StaticArray<FirstValue> new_values(new_size);
ConcurrentHashTable new_table;

tbb::parallel_for(tbb::blocked_range<Size>(0, _values.size()), [&](const auto &r) {
for (Size pos = r.begin(); pos != r.end(); ++pos) {
const Value value = _values[pos];

if (value == kMaxFirstValue) {
Size new_pos = mapping[pos] - 1;
new_values[new_pos] = kMaxFirstValue;

const Value actual_value = [&] {
typename ConcurrentHashTable::const_accessor entry;
_table.find(entry, pos);
return entry->second;
}();

typename ConcurrentHashTable::accessor entry;
new_table.insert(entry, new_pos);
entry->second = actual_value;
} else if (value != 0) {
Size new_pos = mapping[pos] - 1;
new_values[new_pos] = value;
}
}
});

_values = std::move(new_values);
_table = std::move(new_table);
_capacity = new_size;
}

/*!
* Accesses a value at a given position.
*
@@ -309,7 +406,7 @@
void atomic_add(const Size pos, const Value delta) {
KASSERT(pos < _values.size());

Value value = _values[pos];
FirstValue value = _values[pos];
bool success;
do {
if (value == kMaxFirstValue) {
@@ -323,7 +420,7 @@
break;
}

const Value new_value = value + delta;
const Value new_value = static_cast<Value>(value) + delta;
if (new_value < kMaxFirstValue) {
success = __atomic_compare_exchange_n(
&_values[pos], &value, new_value, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED
@@ -357,7 +454,7 @@
void atomic_sub(const Size pos, const Value delta) {
KASSERT(pos < _values.size());

Value value = _values[pos];
FirstValue value = _values[pos];
bool success;
do {
if (value == kMaxFirstValue) {
@@ -378,6 +475,7 @@
}

private:
Size _capacity;
StaticArray<FirstValue> _values;
ConcurrentHashTable _table;
};
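The atomic_add/atomic_sub change above reads the slot in the narrow FirstValue type (matching the CAS width) and widens it before adding. A standalone illustration, in plain C++ and independent of KaMinPar, of the narrowing pitfall the widened sum avoids:

// The sum must be formed in the wide Value type; forming it in the narrow FirstValue
// type can wrap around, and the overflow into the second level would go undetected.
#include <cstdint>
#include <iostream>
#include <limits>

int main() {
  using FirstValue = std::uint8_t; // narrow first-level storage
  using Value = std::uint64_t;     // wide logical value type
  constexpr Value kMaxFirstValue = std::numeric_limits<FirstValue>::max(); // 255

  const FirstValue stored = 200;
  const Value delta = 100;

  // Narrow sum: 300 wraps to 44, which is still below the sentinel, so the overflow
  // would be missed.
  const FirstValue narrow_sum = static_cast<FirstValue>(stored + delta);

  // Widened sum, as in the diff: 300 >= 255, so the entry has to move into the table.
  const Value wide_sum = static_cast<Value>(stored) + delta;

  std::cout << "narrow_sum=" << +narrow_sum << " wide_sum=" << wide_sum
            << " needs_table=" << (wide_sum >= kMaxFirstValue) << '\n';
  return 0;
}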
1 change: 1 addition & 0 deletions kaminpar-shm/coarsening/cluster_coarsener.cc
@@ -32,6 +32,7 @@ bool ClusteringCoarsener::coarsen() {
SCOPED_TIMER("Level", std::to_string(_hierarchy.size()));

if (_clustering.size() < current().n()) {
SCOPED_HEAP_PROFILER("Allocation");
SCOPED_TIMER("Allocation");
_clustering.resize(current().n());
}
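The added SCOPED_HEAP_PROFILER mirrors the existing SCOPED_TIMER: both open an RAII scope that attributes the work of the enclosing block to a named section. A small sketch of the pattern (the macros appear in the hunk above; the include paths are assumptions):

#include <cstddef>
#include <vector>

// Include paths are assumptions; the macros are the ones used in the hunk above.
#include "kaminpar-common/heap_profiler.h"
#include "kaminpar-common/timer.h"

void ensure_capacity(std::vector<int> &buffer, const std::size_t n) {
  if (buffer.size() < n) {
    SCOPED_HEAP_PROFILER("Allocation"); // memory allocated in this block is attributed here
    SCOPED_TIMER("Allocation");         // time spent in this block is attributed here
    buffer.resize(n);
  }
}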