Skip to content

Commit

Permalink
Merge pull request #1 from deltachat/hpk/measure-more
Browse files Browse the repository at this point in the history
add parallel measurements, pingpong entry point, report RTT durations instead of timestamps
  • Loading branch information
hpk42 authored Aug 5, 2024
2 parents f0070b5 + 7779ec3 commit 609eb1a
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 101 deletions.
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@ dependencies = [
"click"
]
version = "0.1"

[project.scripts]
pingpong = "pingpong.__main__:pingpong"
231 changes: 135 additions & 96 deletions src/pingpong/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import json
import logging
import os
import sys
import random
import tempfile
import time
import concurrent.futures
from pathlib import Path
from threading import Thread

from deltachat_rpc_client import DeltaChat, EventType, Rpc

Expand All @@ -21,101 +20,141 @@ def get_temp_credentials() -> dict:
return {"email": addr, "password": password}


def run(api, window, limit):
accounts = []
for _ in range(2):
creds = get_temp_credentials()
account = api.add_account()

account.set_config("bot", "1")
account.set_config("bcc_self", "0")
account.set_config("mvbox_move", "0")
account.set_config("mdns_enabled", "0")
account.set_config("e2ee_enabled", "0")

account.set_config("mail_port", "993")
account.set_config("send_port", "465")

account.set_config("socks5_enabled", "0")
account.set_config("socks5_host", "127.0.0.1")
account.set_config("socks5_port", "9050")

assert not account.is_configured()

account.set_config("addr", creds["email"])
account.set_config("mail_pw", creds["password"])
account.configure()
account.start_io()
accounts.append(account)
logging.info("Configured accounts")

def pinger_process(account):
while True:
event = account.wait_for_event()
if event["kind"] == EventType.INFO:
logging.info("%s", event["msg"])
elif event["kind"] == EventType.WARNING:
logging.warning("%s", event["msg"])
elif event["kind"] == EventType.ERROR:
logging.error("%s", event["msg"])
elif event["kind"] == EventType.INCOMING_MSG:
logging.info("Got an incoming message")

message = account.get_message_by_id(event["msg_id"])
snapshot = message.get_snapshot()
received = int(snapshot.text)
now = time.time()
print(f"{received},{now - start_time}")
if received < limit:
snapshot.chat.send_text(str(received + window))
else:
snapshot.chat.send_text("STOP")
snapshot.message.mark_seen()

if received >= limit:
return True

def echo_process(account):
while True:
event = account.wait_for_event()
if event["kind"] == EventType.INFO:
logging.info("%s", event["msg"])
elif event["kind"] == EventType.WARNING:
logging.warning("%s", event["msg"])
elif event["kind"] == EventType.ERROR:
logging.error("%s", event["msg"])
elif event["kind"] == EventType.INCOMING_MSG:
logging.info("Got an incoming message")

message = account.get_message_by_id(event["msg_id"])
snapshot = message.get_snapshot()
received = snapshot.text
if received == "STOP":
return
snapshot.chat.send_text(snapshot.text)

ponger_addr = accounts[1].get_config("addr")
pinger_ponger_contact = accounts[0].create_contact(ponger_addr, "")
pinger_ponger_chat = pinger_ponger_contact.create_chat()

logging.info("Creating tasks")
ponger = Thread(target=echo_process, args=(accounts[1],))
pinger = Thread(target=pinger_process, args=(accounts[0],))
pinger.start()
ponger.start()

logging.info("Sending text")
for i in range(window):
pinger_ponger_chat.send_text(str(1 + i))
start_time = time.time()

pinger.join()
ponger.join()


def run_bot(window, limit):
def make_accounts(num, account_maker):
"""Test that long-running task does not block short-running task from completion."""
with concurrent.futures.ThreadPoolExecutor(max_workers=num) as executor:
futures = [executor.submit(account_maker) for i in range(num)]
done, pending = concurrent.futures.wait(futures)
return [x.result() for x in done]


def create_account(api):
account = api.add_account()
account.set_config("bot", "1")
account.set_config("bcc_self", "0")
account.set_config("mvbox_move", "0")
account.set_config("mdns_enabled", "0")
account.set_config("e2ee_enabled", "0")

account.set_config("mail_port", "993")
account.set_config("send_port", "465")

account.set_config("socks5_enabled", "0")
account.set_config("socks5_host", "127.0.0.1")
account.set_config("socks5_port", "9050")

assert not account.is_configured()

creds = get_temp_credentials()
account.set_config("addr", creds["email"])
account.set_config("mail_pw", creds["password"])
domain = creds["email"].split("@")[1]
account.set_config("mail_server", domain)
account.set_config("send_server", domain)
account.configure()
# print(f"account configured {creds['email']}", file=sys.stderr)
account.start_io()
return account


class Elapsed:
def __init__(self):
self.start = time.time()

def __call__(self):
return time.time() - self.start

def __str__(self):
return f"{self():0.2f}s"


def get_next_incoming_message_snapshot(account):
while True:
event = account.wait_for_event()
if event["kind"] == EventType.INFO:
logging.info(f"{account.desc} {event['msg']}")
elif event["kind"] == EventType.WARNING:
logging.warning(f"{account.desc} {event['msg']}")
elif event["kind"] == EventType.ERROR:
logging.error(f"{account.desc} {event['msg']}")
elif event["kind"] == EventType.INCOMING_MSG:
message = account.get_message_by_id(event["msg_id"])
return message.get_snapshot()


class PingerProcess:
def __init__(self, account, chat, num_pings, window, reportfunc):
assert window <= num_pings
self.account = account
self.chat = chat
self.num_pings = num_pings
self.window = window
self.reportfunc = reportfunc

def __call__(self):
ping2start = {}

def receive_one_pong():
num = int(get_next_incoming_message_snapshot(self.account).text)
elapsed = ping2start.pop(num)()
self.reportfunc(self.account, num, elapsed)

for seq in range(self.num_pings):
ping2start[seq] = Elapsed()
self.chat.send_text(f"{seq}")
if len(ping2start) == self.window:
receive_one_pong()

while ping2start:
receive_one_pong()


class PongerProcess:
def __init__(self, account, num_pings):
self.account = account
self.num_pings = num_pings

def __call__(self):
for i in range(self.num_pings):
snapshot = get_next_incoming_message_snapshot(self.account)
snapshot.chat.send_text(snapshot.text)


def run(api, proc, num_pings, window):
elapsed = Elapsed()

print(f"making {proc} ping-accounts and {proc} pong-accounts", file=sys.stderr)
accounts = make_accounts(proc * 2, lambda: create_account(api))
speed = proc * 2 / elapsed()
print(
f"finished, took {elapsed} ({speed:0.02f} accounts per second)", file=sys.stderr
)

def reportfunc(account, num, elapsed):
print(f"{num},{elapsed}")

with concurrent.futures.ThreadPoolExecutor(max_workers=proc * 2) as executor:
for i in range(proc):
ac_ping = accounts[i]
ac_pong = accounts[i + proc]
pong_addr = ac_pong.get_config("addr")
chat = ac_ping.create_contact(pong_addr, "").create_chat()
ac_ping.desc = f"ping-{i}"
ac_pong.desc = f"pong-{i}"
futures = [
executor.submit(
PingerProcess(ac_ping, chat, num_pings, window, reportfunc)
),
executor.submit(PongerProcess(ac_pong, num_pings)),
]
done, pending = concurrent.futures.wait(futures)
assert not pending
return [x.result() for x in done]


def run_bot(proc, num_pings, window):
logging.basicConfig(level=logging.ERROR, format="%(asctime)s %(message)s")
with tempfile.TemporaryDirectory() as tmpdirname:
with Rpc(accounts_dir=Path(tmpdirname) / "accounts") as rpc:
api = DeltaChat(rpc)
run(api, window, limit)
run(api, proc, num_pings, window)
19 changes: 15 additions & 4 deletions src/pingpong/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,23 @@


@click.command()
@click.option("--limit", default=100, help="Maximum number of messages to send.")
@click.option(
"--window", default=1, help="Number of messages to send at the same time."
"--num-pings", "-n", default=10, help="Number of pings to send in each ping-process"
)
def pingpong(window, limit):
run_bot(window, limit)
@click.option(
"--proc",
"-p",
default=1,
help="Number of ping/pong processes to run concurrently (default 1). ",
)
@click.option(
"--window",
"-w",
default=1,
help="Num of simultanous pings per process (default 1)",
)
def pingpong(proc, num_pings, window):
run_bot(proc, num_pings, window)


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion stat.pl
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
chomp $line;
my @fields = split "," , $line;
if (defined $prev) {
push(@values, $fields[1] - $prev)
push(@values, $fields[1])
}
$prev = $fields[1]
}
Expand Down

0 comments on commit 609eb1a

Please sign in to comment.