From c1a887db218498c28b7d1bfdceda7d8a38b02b57 Mon Sep 17 00:00:00 2001 From: ptg Date: Wed, 13 Nov 2024 16:59:09 -0500 Subject: [PATCH 1/5] fix async graph --- ptgctl/async_graph.py | 16 ++++++++++++---- ptgctl/tools/display.py | 22 ++++++++++++++++++++-- ptgctl/tools/mock.py | 4 +++- 3 files changed, 35 insertions(+), 7 deletions(-) diff --git a/ptgctl/async_graph.py b/ptgctl/async_graph.py index 93f2466..1841ed7 100644 --- a/ptgctl/async_graph.py +++ b/ptgctl/async_graph.py @@ -64,15 +64,23 @@ def __init__(self, maxsize=1, buffersize=8): super().__init__(maxsize) def _init(self, maxsize): - self._queue = collections.deque(maxlen=maxsize) + self._queue = collections.deque(maxlen=1000) + self._latest = collections.deque(maxlen=1) self._buffer = collections.deque(maxlen=self.buffersize) def _put(self, item): + self._latest.append(item) self._queue.append(item) self._buffer.append(item) - def read_buffer(self): - return list(self._buffer) + def read_buffer(self): # TODO: rename + output = [] + for i in range(len(self._queue)): + try: + output.append(self._queue.popleft()) + except IndexError: + pass + return output def push(self, item): full = self.full() @@ -122,4 +130,4 @@ async def main(): # g.add_consumer(sample_consumer, q, sleep=3, name='get3') if __name__ == '__main__': - asyncio.run(main()) \ No newline at end of file + asyncio.run(main()) diff --git a/ptgctl/tools/display.py b/ptgctl/tools/display.py index 734b008..d05bf48 100644 --- a/ptgctl/tools/display.py +++ b/ptgctl/tools/display.py @@ -121,16 +121,34 @@ async def raw(api, stream_id, utf=False, **kw): @util.async2sync @util.interruptable -async def file(api, stream_id, out_dir='', include_timestamps=False, **kw): +async def file(api, stream_id, out_dir='', include_timestamps=False, group_sessions=True, **kw): os.makedirs(out_dir or '.', exist_ok=True) + kw.setdefault('latest', False) + SESSION_ID = 'event:session:id' + if group_sessions: + stream_id += '+' + SESSION_ID + async with api.data_pull_connect(stream_id, **kw) as ws: with contextlib.ExitStack() as stack: files = {} pbars = {} + current_session = api.session.id() + print("current session:", current_session) while True: for sid, ts, data in await ws.recv_data(): + if group_sessions and sid == SESSION_ID: + print("Changing session", sid) + stack.pop_all().close() + files.clear() + current_session = data.decode('utf-8') + if group_sessions and not current_session: + continue if sid not in files: - files[sid] = stack.enter_context(open(os.path.join(out_dir, f'{sid}.txt'), 'w')) + sid_file = os.path.join(out_dir, current_session or 'unknown-session' if group_sessions else '', f'{sid}.txt') + print("Opening file", sid_file) + os.makedirs(os.path.dirname(sid_file), exist_ok=True) + files[sid] = stack.enter_context(open(sid_file, 'w')) + if sid not in pbars: pbars[sid] = tqdm.tqdm(desc=sid) files[sid].write(f"{f'{ts}:' if include_timestamps else ''}{data.decode('utf-8')}\n") pbars[sid].update() diff --git a/ptgctl/tools/mock.py b/ptgctl/tools/mock.py index 336b0a9..6d7cf7c 100644 --- a/ptgctl/tools/mock.py +++ b/ptgctl/tools/mock.py @@ -32,10 +32,12 @@ async def video_loop(api, src=0, pos=0, **kw): DIVS = 4 @util.async2sync -async def video(api, src=0, pos=0, width=0.3, shape=None, fps=15, speed=1, stepbystep=False, prefix=None): +async def video(api, src=0, pos=0, width=0.3, shape=None, fps=15, speed=1, stepbystep=False, prefix=None, skill=None): '''Send video (by default your webcam) to the API.''' sid = CAM_POS_SIDS[pos] sid = f'{prefix or ""}{sid}' + if skill: + api.session.start_recipe(skill) async with api.data_push_connect(sid, batch=True) as ws: async for im in _video_feed(src, fps, shape, speed=speed): if pos: From 598b4a749f6f7cc46be9c4eef13fc780a3c15327 Mon Sep 17 00:00:00 2001 From: ptg Date: Wed, 13 Nov 2024 17:08:51 -0500 Subject: [PATCH 2/5] fix websockets breaking change --- ptgctl/core.py | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ptgctl/core.py b/ptgctl/core.py index 385d470..ba4eef1 100644 --- a/ptgctl/core.py +++ b/ptgctl/core.py @@ -341,7 +341,7 @@ def _ws(self, *url_parts, headers: dict|None=None, connect_kwargs: dict|None=Non log.info('websocket connect: %s', url) # return websockets.connect(url, extra_headers=headers, **(connect_kwargs or {})) - return cls(url, params=params, extra_headers=headers, **(connect_kwargs or {})) + return cls(url, params=params, additional_headers=headers, **(connect_kwargs or {})) def ping(self, error=False): if error: diff --git a/setup.py b/setup.py index 5a2986b..dfcd04a 100755 --- a/setup.py +++ b/setup.py @@ -22,7 +22,7 @@ packages=setuptools.find_packages(), entry_points={'console_scripts': ['{name}={name}:main'.format(name=NAME)]}, install_requires=[ - 'requests', 'websockets', 'fire>=0.5.0', + 'requests', 'websockets>=14.0.0', 'fire>=0.5.0', # 'fire @ git+ssh://git@github.com/google/python-fire@master#egg=fire', 'tabulate', 'tqdm', 'IPython', 'redis_record>=0.0.4', From 8be281b321c78585b8d8f69a76a495be199bfd9c Mon Sep 17 00:00:00 2001 From: ptg Date: Wed, 13 Nov 2024 17:17:09 -0500 Subject: [PATCH 3/5] fix async_graph --- ptgctl/async_graph.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/ptgctl/async_graph.py b/ptgctl/async_graph.py index 1841ed7..fc978f9 100644 --- a/ptgctl/async_graph.py +++ b/ptgctl/async_graph.py @@ -59,25 +59,28 @@ async def error_handler(func, *a, __retry_every=5, **kw): class SlidingQueue(asyncio.Queue): - def __init__(self, maxsize=1, buffersize=8): + def __init__(self, maxsize=1024, buffersize=32): self.buffersize = buffersize super().__init__(maxsize) def _init(self, maxsize): - self._queue = collections.deque(maxlen=1000) - self._latest = collections.deque(maxlen=1) + self._queue = collections.deque(maxlen=1) self._buffer = collections.deque(maxlen=self.buffersize) + self._unread = collections.deque(maxlen=maxsize) def _put(self, item): - self._latest.append(item) self._queue.append(item) + self._unread.append(item) self._buffer.append(item) def read_buffer(self): # TODO: rename + return list(self._buffer) + + def pop(self): output = [] - for i in range(len(self._queue)): + for i in range(len(self._unread)): try: - output.append(self._queue.popleft()) + output.append(self._unread.popleft()) except IndexError: pass return output From 8528d76907c94c79acc65436fe290c1f6d3c31fc Mon Sep 17 00:00:00 2001 From: ptg Date: Wed, 13 Nov 2024 17:25:03 -0500 Subject: [PATCH 4/5] fix async_graph --- ptgctl/async_graph.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/ptgctl/async_graph.py b/ptgctl/async_graph.py index fc978f9..5bb6407 100644 --- a/ptgctl/async_graph.py +++ b/ptgctl/async_graph.py @@ -93,6 +93,17 @@ def push(self, item): self._wakeup_next(self._getters) +class Queue(asyncio.Queue): + def read_buffer(self): + output = [] + for i in range(len(self._unread)): + try: + output.append(self._unread.popleft()) + except IndexError: + pass + return output + + async def sample_producer(q, sleep=1, limit=20, name='put'): print('starting', name) From 685c739621aa9a51540c4459192565f9c90696c0 Mon Sep 17 00:00:00 2001 From: bea Date: Mon, 18 Nov 2024 11:53:53 -0600 Subject: [PATCH 5/5] fix merge conflict --- ptgctl/tools/mock.py | 5 ++++- ptgctl/util/__init__.py | 8 ++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/ptgctl/tools/mock.py b/ptgctl/tools/mock.py index 6d7cf7c..3b7d45f 100644 --- a/ptgctl/tools/mock.py +++ b/ptgctl/tools/mock.py @@ -38,6 +38,7 @@ async def video(api, src=0, pos=0, width=0.3, shape=None, fps=15, speed=1, stepb sid = f'{prefix or ""}{sid}' if skill: api.session.start_recipe(skill) + tlast = 0 async with api.data_push_connect(sid, batch=True) as ws: async for im in _video_feed(src, fps, shape, speed=speed): if pos: @@ -48,7 +49,9 @@ async def video(api, src=0, pos=0, width=0.3, shape=None, fps=15, speed=1, stepb # print(load(dump_v3(im))) if stepbystep: input() - await ws.send_data([dump_v3(im)], [sid], [util.format_epoch_time(time.time())]) + t=time.time() + await ws.send_data([dump_v3(im)], [sid], [util.format_epoch_time(t, tlast)]) + tlast = t def _img_dump(im, format='jpeg'): from PIL import Image diff --git a/ptgctl/util/__init__.py b/ptgctl/util/__init__.py index 191ffbf..cb0e1ce 100644 --- a/ptgctl/util/__init__.py +++ b/ptgctl/util/__init__.py @@ -51,8 +51,12 @@ def parse_epoch_time(tid: str): def format_time(dt: datetime.datetime): return format_epoch_time(dt.timestamp()) -def format_epoch_time(tid: float): - return f'{int(tid * 1000)}-0' +def format_epoch_time(tid: float, tlast=None): + tms = int(tid * 1000) + i = 0 + if tlast and tms == int(tlast * 1000): + i = int((tid*1000 - tms)*100) + return f'{tms}-{i}' # misc