Fc24 #33

Open · wants to merge 11 commits into base: main

4 changes: 0 additions & 4 deletions README.md
@@ -114,11 +114,8 @@ fab georemote <mechanism_name> # ["cometbft", "hotstuff", "bullshark"]

Demo of geodec running CometBFT with both georemote and remote.


https://github.com/GeoDecConsensus/geodec/assets/97289118/dcf9a365-8528-42f4-b7d4-de49a7162804



## Maintainers

[Naman Garg](https://x.com/namn_grg)
@@ -145,5 +142,4 @@ This project is licensed under the Apache License - see the [LICENSE.md](LICENSE

Thanks and credits to [Alberto Sonnino](https://github.com/asonnino) for the initial HotStuff and Bullshark benchmarks.


Thank you [PBS Foundation](https://pbs.foundation/) for supporting this work.
2 changes: 1 addition & 1 deletion benchmark/commands.py
@@ -79,7 +79,7 @@ def run_client(address, size, rate, mechanism, timeout=None, nodes=[]):
                f'--rate {rate} --timeout {timeout} {nodes}')
    elif mechanism == 'cometbft':
        return (f'./client -c 1 --size {size} --rate {rate} --time {timeout}'
                f' --endpoints ws://localhost:26657/websocket -v --broadcast-tx-method sync --expect-peers {len(nodes)} --min-peer-connectivity {len(nodes)}')
                f' --endpoints ws://localhost:26657/websocket -v --broadcast-tx-method sync --expect-peers {int(len(nodes)/2)} --min-peer-connectivity {int(round(len(nodes)/2))}')
        # f' --endpoints ws://localhost:26657/websocket -v --expect-peers {len(nodes)-1} --min-peer-connectivity {len(nodes)-1}')
    elif mechanism == 'bullshark':
        nodes = f'--nodes {" ".join(nodes)}' if nodes else ''
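For reference, a minimal sketch of what the updated CometBFT branch of `run_client` would now emit; the node list, size, rate, and timeout below are made-up values for illustration only:

```python
# Hypothetical 4-node run: the client now waits for half the peers
# (--expect-peers 2) instead of all of them.
nodes = ["10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"]
size, rate, timeout = 512, 1_000, 300

cmd = (f'./client -c 1 --size {size} --rate {rate} --time {timeout}'
       f' --endpoints ws://localhost:26657/websocket -v --broadcast-tx-method sync'
       f' --expect-peers {int(len(nodes)/2)} --min-peer-connectivity {int(round(len(nodes)/2))}')
print(cmd)  # ... --expect-peers 2 --min-peer-connectivity 2
```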
218 changes: 218 additions & 0 deletions benchmark/geo_runs_stake.py
@@ -0,0 +1,218 @@
import os
import json
import pandas as pd
import logging
from datetime import datetime
import copy
import time
import subprocess
import sys

# Setup logging to both console and file
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Create a handler for the log file
file_handler = logging.FileHandler("processing_log.txt")
file_handler.setLevel(logging.INFO)

# Create a handler for the console (stdout)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)

# Create a formatter and attach it to both handlers
formatter = logging.Formatter("%(asctime)s - %(message)s")
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)

# Add the handlers to the logger
logger.addHandler(file_handler)
logger.addHandler(console_handler)

# Define the JSON configuration path
CONFIG_PATH = "/home/ubuntu/geodec/settings.json"
FAB_PARAMS_JSON = "/home/ubuntu/geodec/fab-params.json"

# Define the list of chains and their CSV files
CHAINS = {
    "Ethereum": "ethereum.csv",
    "Ethernodes": "ethernodes.csv",
    "Aptos": "aptos.csv",
    "Sui": "sui.csv",
    "Solana": "solana.csv",
    "Avalanche": "avalanche.csv",
}

CONSENSUS_MECHANISMS = [
    "cometbft",
    "hotstuff",
    # "bullshark"
]

GEO_INPUT_KEY = "geo_input" # Key in the JSON where the geo_input file path is stored


def load_json_config(config_path):
    """
    Load the JSON configuration file.

    :param config_path: Path to the JSON config file.
    :return: Parsed JSON object.
    """
    try:
        with open(config_path, "r") as f:
            config = json.load(f)
        return config
    except Exception as e:
        logger.error(f"Failed to load JSON configuration: {e}")
        raise


def save_json_config(config, config_path):
    """
    Save the updated JSON configuration back to the file.

    :param config: The updated JSON object.
    :param config_path: Path to save the JSON file.
    """
    try:
        with open(config_path, "w") as f:
            json.dump(config, f, indent=4)
        logger.info("JSON configuration saved successfully.")
    except Exception as e:
        logger.error(f"Failed to save JSON configuration: {e}")
        raise


def update_geo_input_in_json(chain_name, consensus_name, config):
    """
    Update the 'geo_input' file path in the JSON configuration for the given chain.

    :param chain_name: Name of the chain.
    :param consensus_name: Name of the consensus mechanism being configured.
    :param config: The current JSON config object.
    :return: Number of rows (nodes) in the chain's geo_input CSV.
    """
    try:
        new_geo_input = f"/home/ubuntu/geodec/rundata/{CHAINS[chain_name]}"
        df = pd.read_csv(new_geo_input)

        logger.info(f"Updating {GEO_INPUT_KEY} to {new_geo_input} for {chain_name}.")

        config["consensusMechanisms"][consensus_name]["geodec"][GEO_INPUT_KEY] = new_geo_input
        save_json_config(config, CONFIG_PATH)

    except Exception as e:
        logger.error(f"Failed to update geo_input in JSON for {chain_name}: {e}")
        raise

    return len(df)

def update_chain_config_in_json(num_nodes, consensus_name, config):
    """Set the node count for the consensus mechanism in the Fab params JSON."""
    config["remote"][consensus_name]["bench_params"]["nodes"] = [int(num_nodes)]

    save_json_config(config, FAB_PARAMS_JSON)


def process_weight_columns(input_file, consensus_name):
    """
    Processes the weight columns by renaming them to 'stake' and executing the subprocess
    for each column, then reverting the changes.

    :param input_file: The file path of the geo_input CSV file.
    :param consensus_name: Name of the consensus mechanism to benchmark.
    :return: The processed DataFrame.
    """
    try:
        # Read the CSV file
        logger.info(f"Reading input file: {input_file}")
        df = pd.read_csv(input_file)
        original_columns = df.columns.tolist()

        # Identify weight columns
        weight_columns = [col for col in original_columns if "weight" in col.lower()]
        logger.info(f"Found {len(weight_columns)} weight columns: {weight_columns}")
        weight_columns = weight_columns[3:]  # Skip the first three weight columns
        print(weight_columns)

        addLatency = True

        # Process each weight column
        for weight_col in weight_columns:
            # Create a copy of the DataFrame for this iteration
            df_temp = copy.deepcopy(df)

            now = datetime.now()

            logger.info("==============================================================")
            logger.info(f"{str(now)} Running test for weight column '{weight_col}': ")

            # Rename the current weight column to "stake"
            logger.info(f"Renaming column '{weight_col}' to 'stake'")
            df_temp = df_temp.rename(columns={weight_col: "stake"})
            df_temp.to_csv(input_file, index=False)

            # Execute the benchmark run via fab (output streams to the console)
            logger.info(f"Running subprocess for {weight_col}")
            subprocess.run(["fab", "georemote", consensus_name, str(addLatency)])

            logger.info(f"Reverting column name back to '{weight_col}'")
            logger.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")

            # Only add the latencies for the first run
            addLatency = False

            # Wait before processing the next column
            time.sleep(10)

        # Restore the CSV with the original column names
        df.to_csv(input_file, index=False)

        logger.info("All weight columns processed successfully")

        # Verify final column names match original
        if df.columns.tolist() == original_columns:
            logger.info("Final column names match original names")
        else:
            logger.warning("Final column names differ from original names")

        return df

    except Exception as e:
        logger.error(f"Error processing weight columns: {e}")
        raise


def process_all_chains(consensus_name):
    """
    Iterates over all chains, updating the geo_input path in JSON,
    and processing the weight columns for each one.
    """
    try:
        # Load the JSON configuration
        config = load_json_config(CONFIG_PATH)
        chain_config = load_json_config(FAB_PARAMS_JSON)

        for chain in CHAINS:
            logger.info(f"Processing chain: {chain}")

            # Update the geo_input path in the JSON for the current chain
            num_nodes = update_geo_input_in_json(chain, consensus_name, config)

            # Update the node count in the Fab params JSON for the current chain
            update_chain_config_in_json(num_nodes, consensus_name, chain_config)

            # Get the updated geo_input file path from the config
            input_file = config["consensusMechanisms"][consensus_name]["geodec"][GEO_INPUT_KEY]

            # Process the weight columns for the current geo_input CSV
            process_weight_columns(input_file, consensus_name)

        logger.info("Processing completed for all chains successfully")

    except Exception as e:
        logger.error(f"Program failed during processing: {e}")
        raise


if __name__ == "__main__":
    for consensus_name in CONSENSUS_MECHANISMS:
        logger.info(f"Processing consensus: {consensus_name}")
        process_all_chains(consensus_name)
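A minimal sketch of how this new script is expected to be driven, assuming the constants above (paths under `/home/ubuntu/geodec/` and the chain CSVs in `rundata/`) are in place; the loop below is hypothetical and only enumerates the planned runs without calling `fab`:

```python
# Hypothetical dry run: list the (consensus, chain) combinations the script would
# benchmark, mirroring the nesting of process_all_chains without any side effects.
CHAINS = {"Ethereum": "ethereum.csv", "Ethernodes": "ethernodes.csv", "Aptos": "aptos.csv",
          "Sui": "sui.csv", "Solana": "solana.csv", "Avalanche": "avalanche.csv"}
CONSENSUS_MECHANISMS = ["cometbft", "hotstuff"]

for consensus in CONSENSUS_MECHANISMS:
    for chain, csv_name in CHAINS.items():
        print(f"{consensus}: {chain} -> /home/ubuntu/geodec/rundata/{csv_name}")
```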
34 changes: 29 additions & 5 deletions benchmark/geodec.py
@@ -18,8 +18,9 @@ def getGeoInput(geo_input_file):
    for row in csv_reader:
        if row['id']: # Ensure id is not empty
            geo_input[int(row['id'])] = 1
        if row['count'] and row['id']:
            geo_input[int(row['id'])] = int(row['count'])
        # try:
        # if row['count'] and row['id']:
        # geo_input[int(row['id'])] = int(row['count'])
    return geo_input

def _getServers(geoInput, servers_file):
@@ -105,12 +106,35 @@ def _aggregatePingDelays(pings_file, pings_grouped_file):

@staticmethod
def getPingDelay(geoInput, pings_grouped_file, pings_file):
    # Check if the grouped file exists, if not, create it by aggregating
    if not os.path.exists(pings_grouped_file):
        GeoDec._aggregatePingDelays(pings_file, pings_grouped_file)

    # Read the CSV file with ping delays
    pingsDelays = pd.read_csv(pings_grouped_file)
    id = list(geoInput.keys())
    pingsDelays = pingsDelays[pingsDelays.source.isin(id) & pingsDelays.destination.isin(id)].query('source != destination')
    return pingsDelays

    # Extract the IDs from geoInput
    id_list = list(geoInput.keys())

    # Filter the data for only the source and destination within the geoInput, excluding same source-destination pairs
    pingsDelays_filtered = pingsDelays[pingsDelays.source.isin(id_list) & pingsDelays.destination.isin(id_list)].query('source != destination')

    # Find missing combinations
    all_combinations = [(source, destination) for source in id_list for destination in id_list if source != destination]
    existing_combinations = list(zip(pingsDelays_filtered['source'], pingsDelays_filtered['destination']))

    missing_combinations = [pair for pair in all_combinations if pair not in existing_combinations]

    # Print the ping delays that were not found
    if missing_combinations:
        print("The following ping delays were not found:")
        for source, destination in missing_combinations:
            print(f"Source: {source}, Destination: {destination}")
    else:
        print("All ping delays were found.")

    # Return the filtered pingsDelays dataframe
    return pingsDelays_filtered

def _check_if_quorum(dist_matrix, server, target, quorum_threshold):
    distance = dist_matrix[target][server]
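A side note on the missing-pair check added to `getPingDelay`: the membership test `pair not in existing_combinations` scans a Python list once per candidate pair. A hedged alternative sketch of the same check using set difference (not part of this PR, shown only for comparison; it assumes the `id_list` and `pingsDelays_filtered` objects built above):

```python
# Sketch only: identical missing-pair detection using set arithmetic.
all_pairs = {(s, d) for s in id_list for d in id_list if s != d}
existing_pairs = set(zip(pingsDelays_filtered['source'], pingsDelays_filtered['destination']))
missing_pairs = all_pairs - existing_pairs

for source, destination in sorted(missing_pairs):
    print(f"Source: {source}, Destination: {destination}")
```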
46 changes: 29 additions & 17 deletions benchmark/latency_setter.py
@@ -1,51 +1,63 @@
from fabric import Connection, ThreadingGroup as Group
from benchmark.utils import Print


class LatencySetter:
    def __init__(self, settings, connect):
        self.settings = settings
        self.connect = connect

    @staticmethod
    def _initalizeDelayQDisc(interface):
        return (f'sudo tc qdisc add dev {interface} parent root handle 1:0 htb default 100')

        # Add commands to delete any existing root qdisc before initializing
        delete_cmd = f"sudo tc qdisc del dev {interface} root || true"
        add_cmd = f"sudo tc qdisc add dev {interface} root handle 1:0 htb default 100"
        # Return the combined command
        return f"{delete_cmd}; {add_cmd}"

    # Not needed
    @staticmethod
    def _deleteDelayQDisc(interface):
        return (f'sudo tc qdisc del dev {interface} parent root')
        return f"sudo tc qdisc del dev {interface} parent root"

    def configDelay(self, hosts):
        # Print.info('Delay qdisc initalization...')
        Print.info("Delay qdisc initalization...")
        cmd = LatencySetter._initalizeDelayQDisc(self.settings.interface)
        g = Group(*hosts, user=self.settings.key_name, connect_kwargs=self.connect)
        g.run(cmd, hide=True)

    # Not needed
    def deleteDelay(self, hosts):
        # Print.info('Delete qdisc configurations...')
        Print.info("Delete qdisc configurations...")
        cmd = LatencySetter._deleteDelayQDisc(self.settings.interface)
        g = Group(*hosts, user=self.settings.key_name, connect_kwargs=self.connect)
        g.run(cmd, hide=True)

    def addDelays(self, servers, pingDelays, interface):
        for index, source in servers.iterrows():
            source_commands = ''
            source_commands = ""
            counter = 1
            for index, destination in servers.iterrows():
                if source['id'] != destination['id']:
                    query = 'source == ' + str(source['id']) + ' and destination == ' + str(destination['id'])
                    delay_data = pingDelays.query(query)
                    delay = delay_data['avg'].values.astype(float)[0]
                    delay_dev = delay_data['mdev'].values.astype(float)[0]
                    cmd = LatencySetter._getDelayCommand(counter, destination['ip'], interface, delay/2, delay_dev/2)
                if source["id"] != destination["id"]:
                    query = "source == " + str(source["id"]) + " and destination == " + str(destination["id"])
                    delay_data = pingDelays.query(query)
                    delay = delay_data["avg"].values.astype(float)[0]
                    delay_dev = delay_data["mdev"].values.astype(float)[0]
                    cmd = LatencySetter._getDelayCommand(
                        counter, destination["ip"], interface, delay / 2, delay_dev / 2
                    )
                    source_commands = source_commands + cmd
                    counter = counter + 1
            host = source['ip']
            host = source["ip"]
            # execute the command for source IP
            c = Connection(host, user=self.settings.key_name, connect_kwargs=self.connect)
            c.run(source_commands, hide=True)
            print(f"Added delay: {host}")

    @staticmethod
    def _getDelayCommand(n, ip, interface, delay, delay_dev):
        return (f'sudo tc class add dev {interface} parent 1:0 classid 1:{n+1} htb rate 1gbit;'
                f'sudo tc filter add dev {interface} parent 1:0 protocol ip u32 match ip dst {ip} flowid 1:{n+1};'
                f'sudo tc qdisc add dev {interface} parent 1:{n+1} handle {n*10}:0 netem delay {delay}ms {delay_dev}ms; ')
        return (
            f"sudo tc class add dev {interface} parent 1:0 classid 1:{n+1} htb rate 1gbit;"
            f"sudo tc filter add dev {interface} parent 1:0 protocol ip u32 match ip dst {ip} flowid 1:{n+1};"
            f"sudo tc qdisc add dev {interface} parent 1:{n+1} handle {n*10}:0 netem delay {delay}ms {delay_dev}ms; "
        )
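For intuition about the generated `tc` rules, a small sketch that simply calls the helper above with made-up values (the IP, interface name, and delays are hypothetical):

```python
from benchmark.latency_setter import LatencySetter

# Hypothetical values, purely illustrative.
print(LatencySetter._getDelayCommand(1, "10.0.0.2", "ens5", 25.0, 1.5))
# One concatenated string of three chained tc commands, roughly:
#   sudo tc class add dev ens5 parent 1:0 classid 1:2 htb rate 1gbit;
#   sudo tc filter add dev ens5 parent 1:0 protocol ip u32 match ip dst 10.0.0.2 flowid 1:2;
#   sudo tc qdisc add dev ens5 parent 1:2 handle 10:0 netem delay 25.0ms 1.5ms;
```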
3 changes: 2 additions & 1 deletion benchmark/logs.py
@@ -87,7 +87,8 @@ def aggregate_runs(run_id_array):
    data_to_aggregate = data.loc[data["run_id"].isin(run_id_array)]

    # Compute the median for the specified fields
    aggregated_data = data_to_aggregate.mean(numeric_only=True).reset_index()
    # aggregated_data = data_to_aggregate.mean(numeric_only=True).reset_index()
    aggregated_data = data_to_aggregate.median(numeric_only=True).reset_index()
    aggregated_data = aggregated_data.loc[
        aggregated_data["index"].isin(
            [
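The switch from `mean` to `median` when aggregating runs makes the reported numbers less sensitive to a single outlier run; a tiny illustration with made-up latencies:

```python
import pandas as pd

# Hypothetical per-run results where run 3 is an outlier.
runs = pd.DataFrame({"run_id": [1, 2, 3], "latency_ms": [210.0, 220.0, 900.0]})

print(runs["latency_ms"].mean())    # 443.33... (dragged up by the outlier)
print(runs["latency_ms"].median())  # 220.0 (the typical run)
```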
3 changes: 3 additions & 0 deletions benchmark/mechanisms/hotstuff.py
@@ -286,6 +286,9 @@ def __init__(self, settings):
            # This is missing from the Rocksdb installer (needed for Rocksdb).
            'sudo apt-get install -y clang',

            # Delete the original repo
            f'rm -rf {self.settings.repo_name}',

            # Clone the repo.
            f'(git clone {self.settings.repo_url} || (cd {self.settings.repo_name} ; git pull))'
        ]