From 7779ec3cc0027515c500740b5c3c8f681358261a Mon Sep 17 00:00:00 2001 From: holger krekel Date: Mon, 20 Nov 2023 18:36:00 +0100 Subject: [PATCH] add "--window" or "-w" to send several mesages at once per process --- src/pingpong/__init__.py | 35 ++++++++++++++++++++++++++--------- src/pingpong/__main__.py | 12 +++++++++--- 2 files changed, 35 insertions(+), 12 deletions(-) diff --git a/src/pingpong/__init__.py b/src/pingpong/__init__.py index f9f9df5..4e0cc1f 100644 --- a/src/pingpong/__init__.py +++ b/src/pingpong/__init__.py @@ -83,18 +83,30 @@ def get_next_incoming_message_snapshot(account): class PingerProcess: - def __init__(self, account, chat, num_pings): + def __init__(self, account, chat, num_pings, window, reportfunc): + assert window <= num_pings self.account = account self.chat = chat self.num_pings = num_pings + self.window = window + self.reportfunc = reportfunc def __call__(self): + ping2start = {} + + def receive_one_pong(): + num = int(get_next_incoming_message_snapshot(self.account).text) + elapsed = ping2start.pop(num)() + self.reportfunc(self.account, num, elapsed) + for seq in range(self.num_pings): - elapsed = Elapsed() + ping2start[seq] = Elapsed() self.chat.send_text(f"{seq}") - num = int(get_next_incoming_message_snapshot(self.account).text) - assert num == seq - print(f"{num},{elapsed()}") + if len(ping2start) == self.window: + receive_one_pong() + + while ping2start: + receive_one_pong() class PongerProcess: @@ -108,7 +120,7 @@ def __call__(self): snapshot.chat.send_text(snapshot.text) -def run(api, proc, num_pings): +def run(api, proc, num_pings, window): elapsed = Elapsed() print(f"making {proc} ping-accounts and {proc} pong-accounts", file=sys.stderr) @@ -118,6 +130,9 @@ def run(api, proc, num_pings): f"finished, took {elapsed} ({speed:0.02f} accounts per second)", file=sys.stderr ) + def reportfunc(account, num, elapsed): + print(f"{num},{elapsed}") + with concurrent.futures.ThreadPoolExecutor(max_workers=proc * 2) as executor: for i in range(proc): ac_ping = accounts[i] @@ -127,7 +142,9 @@ def run(api, proc, num_pings): ac_ping.desc = f"ping-{i}" ac_pong.desc = f"pong-{i}" futures = [ - executor.submit(PingerProcess(ac_ping, chat, num_pings)), + executor.submit( + PingerProcess(ac_ping, chat, num_pings, window, reportfunc) + ), executor.submit(PongerProcess(ac_pong, num_pings)), ] done, pending = concurrent.futures.wait(futures) @@ -135,9 +152,9 @@ def run(api, proc, num_pings): return [x.result() for x in done] -def run_bot(proc, num_pings): +def run_bot(proc, num_pings, window): logging.basicConfig(level=logging.ERROR, format="%(asctime)s %(message)s") with tempfile.TemporaryDirectory() as tmpdirname: with Rpc(accounts_dir=Path(tmpdirname) / "accounts") as rpc: api = DeltaChat(rpc) - run(api, proc, num_pings) + run(api, proc, num_pings, window) diff --git a/src/pingpong/__main__.py b/src/pingpong/__main__.py index 181830a..5ddfb19 100644 --- a/src/pingpong/__main__.py +++ b/src/pingpong/__main__.py @@ -10,10 +10,16 @@ "--proc", "-p", default=1, - help="Number of ping/pong processes to run concurrently. ", + help="Number of ping/pong processes to run concurrently (default 1). ", ) -def pingpong(proc, num_pings): - run_bot(proc, num_pings) +@click.option( + "--window", + "-w", + default=1, + help="Num of simultanous pings per process (default 1)", +) +def pingpong(proc, num_pings, window): + run_bot(proc, num_pings, window) if __name__ == "__main__":