Skip to content

Commit

Permalink
add "--window" or "-w" to send several mesages at once per process
Browse files Browse the repository at this point in the history
  • Loading branch information
hpk42 committed Nov 20, 2023
1 parent db93796 commit 7779ec3
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 12 deletions.
35 changes: 26 additions & 9 deletions src/pingpong/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,30 @@ def get_next_incoming_message_snapshot(account):


class PingerProcess:
def __init__(self, account, chat, num_pings):
def __init__(self, account, chat, num_pings, window, reportfunc):
assert window <= num_pings
self.account = account
self.chat = chat
self.num_pings = num_pings
self.window = window
self.reportfunc = reportfunc

def __call__(self):
ping2start = {}

def receive_one_pong():
num = int(get_next_incoming_message_snapshot(self.account).text)
elapsed = ping2start.pop(num)()
self.reportfunc(self.account, num, elapsed)

for seq in range(self.num_pings):
elapsed = Elapsed()
ping2start[seq] = Elapsed()
self.chat.send_text(f"{seq}")
num = int(get_next_incoming_message_snapshot(self.account).text)
assert num == seq
print(f"{num},{elapsed()}")
if len(ping2start) == self.window:
receive_one_pong()

while ping2start:
receive_one_pong()


class PongerProcess:
Expand All @@ -108,7 +120,7 @@ def __call__(self):
snapshot.chat.send_text(snapshot.text)


def run(api, proc, num_pings):
def run(api, proc, num_pings, window):
elapsed = Elapsed()

print(f"making {proc} ping-accounts and {proc} pong-accounts", file=sys.stderr)
Expand All @@ -118,6 +130,9 @@ def run(api, proc, num_pings):
f"finished, took {elapsed} ({speed:0.02f} accounts per second)", file=sys.stderr
)

def reportfunc(account, num, elapsed):
print(f"{num},{elapsed}")

with concurrent.futures.ThreadPoolExecutor(max_workers=proc * 2) as executor:
for i in range(proc):
ac_ping = accounts[i]
Expand All @@ -127,17 +142,19 @@ def run(api, proc, num_pings):
ac_ping.desc = f"ping-{i}"
ac_pong.desc = f"pong-{i}"
futures = [
executor.submit(PingerProcess(ac_ping, chat, num_pings)),
executor.submit(
PingerProcess(ac_ping, chat, num_pings, window, reportfunc)
),
executor.submit(PongerProcess(ac_pong, num_pings)),
]
done, pending = concurrent.futures.wait(futures)
assert not pending
return [x.result() for x in done]


def run_bot(proc, num_pings):
def run_bot(proc, num_pings, window):
logging.basicConfig(level=logging.ERROR, format="%(asctime)s %(message)s")
with tempfile.TemporaryDirectory() as tmpdirname:
with Rpc(accounts_dir=Path(tmpdirname) / "accounts") as rpc:
api = DeltaChat(rpc)
run(api, proc, num_pings)
run(api, proc, num_pings, window)
12 changes: 9 additions & 3 deletions src/pingpong/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,16 @@
"--proc",
"-p",
default=1,
help="Number of ping/pong processes to run concurrently. ",
help="Number of ping/pong processes to run concurrently (default 1). ",
)
def pingpong(proc, num_pings):
run_bot(proc, num_pings)
@click.option(
"--window",
"-w",
default=1,
help="Num of simultanous pings per process (default 1)",
)
def pingpong(proc, num_pings, window):
run_bot(proc, num_pings, window)


if __name__ == "__main__":
Expand Down

0 comments on commit 7779ec3

Please sign in to comment.