diff --git a/ptgctl/async_graph.py b/ptgctl/async_graph.py index 93f2466..5bb6407 100644 --- a/ptgctl/async_graph.py +++ b/ptgctl/async_graph.py @@ -59,20 +59,31 @@ async def error_handler(func, *a, __retry_every=5, **kw): class SlidingQueue(asyncio.Queue): - def __init__(self, maxsize=1, buffersize=8): + def __init__(self, maxsize=1024, buffersize=32): self.buffersize = buffersize super().__init__(maxsize) def _init(self, maxsize): - self._queue = collections.deque(maxlen=maxsize) + self._queue = collections.deque(maxlen=1) self._buffer = collections.deque(maxlen=self.buffersize) + self._unread = collections.deque(maxlen=maxsize) def _put(self, item): self._queue.append(item) + self._unread.append(item) self._buffer.append(item) - def read_buffer(self): + def read_buffer(self): # TODO: rename return list(self._buffer) + + def pop(self): + output = [] + for i in range(len(self._unread)): + try: + output.append(self._unread.popleft()) + except IndexError: + pass + return output def push(self, item): full = self.full() @@ -82,6 +93,17 @@ def push(self, item): self._wakeup_next(self._getters) +class Queue(asyncio.Queue): + def read_buffer(self): + output = [] + for i in range(len(self._unread)): + try: + output.append(self._unread.popleft()) + except IndexError: + pass + return output + + async def sample_producer(q, sleep=1, limit=20, name='put'): print('starting', name) @@ -122,4 +144,4 @@ async def main(): # g.add_consumer(sample_consumer, q, sleep=3, name='get3') if __name__ == '__main__': - asyncio.run(main()) \ No newline at end of file + asyncio.run(main()) diff --git a/ptgctl/core.py b/ptgctl/core.py index 385d470..ba4eef1 100644 --- a/ptgctl/core.py +++ b/ptgctl/core.py @@ -341,7 +341,7 @@ def _ws(self, *url_parts, headers: dict|None=None, connect_kwargs: dict|None=Non log.info('websocket connect: %s', url) # return websockets.connect(url, extra_headers=headers, **(connect_kwargs or {})) - return cls(url, params=params, extra_headers=headers, **(connect_kwargs or {})) + return cls(url, params=params, additional_headers=headers, **(connect_kwargs or {})) def ping(self, error=False): if error: diff --git a/ptgctl/tools/display.py b/ptgctl/tools/display.py index 734b008..d05bf48 100644 --- a/ptgctl/tools/display.py +++ b/ptgctl/tools/display.py @@ -121,16 +121,34 @@ async def raw(api, stream_id, utf=False, **kw): @util.async2sync @util.interruptable -async def file(api, stream_id, out_dir='', include_timestamps=False, **kw): +async def file(api, stream_id, out_dir='', include_timestamps=False, group_sessions=True, **kw): os.makedirs(out_dir or '.', exist_ok=True) + kw.setdefault('latest', False) + SESSION_ID = 'event:session:id' + if group_sessions: + stream_id += '+' + SESSION_ID + async with api.data_pull_connect(stream_id, **kw) as ws: with contextlib.ExitStack() as stack: files = {} pbars = {} + current_session = api.session.id() + print("current session:", current_session) while True: for sid, ts, data in await ws.recv_data(): + if group_sessions and sid == SESSION_ID: + print("Changing session", sid) + stack.pop_all().close() + files.clear() + current_session = data.decode('utf-8') + if group_sessions and not current_session: + continue if sid not in files: - files[sid] = stack.enter_context(open(os.path.join(out_dir, f'{sid}.txt'), 'w')) + sid_file = os.path.join(out_dir, current_session or 'unknown-session' if group_sessions else '', f'{sid}.txt') + print("Opening file", sid_file) + os.makedirs(os.path.dirname(sid_file), exist_ok=True) + files[sid] = stack.enter_context(open(sid_file, 'w')) + if sid not in pbars: pbars[sid] = tqdm.tqdm(desc=sid) files[sid].write(f"{f'{ts}:' if include_timestamps else ''}{data.decode('utf-8')}\n") pbars[sid].update() diff --git a/ptgctl/tools/mock.py b/ptgctl/tools/mock.py index 41a0595..3b7d45f 100644 --- a/ptgctl/tools/mock.py +++ b/ptgctl/tools/mock.py @@ -32,10 +32,12 @@ async def video_loop(api, src=0, pos=0, **kw): DIVS = 4 @util.async2sync -async def video(api, src=0, pos=0, width=0.3, shape=None, fps=15, speed=1, stepbystep=False, prefix=None): +async def video(api, src=0, pos=0, width=0.3, shape=None, fps=15, speed=1, stepbystep=False, prefix=None, skill=None): '''Send video (by default your webcam) to the API.''' sid = CAM_POS_SIDS[pos] sid = f'{prefix or ""}{sid}' + if skill: + api.session.start_recipe(skill) tlast = 0 async with api.data_push_connect(sid, batch=True) as ws: async for im in _video_feed(src, fps, shape, speed=speed): diff --git a/setup.py b/setup.py index 5a2986b..dfcd04a 100755 --- a/setup.py +++ b/setup.py @@ -22,7 +22,7 @@ packages=setuptools.find_packages(), entry_points={'console_scripts': ['{name}={name}:main'.format(name=NAME)]}, install_requires=[ - 'requests', 'websockets', 'fire>=0.5.0', + 'requests', 'websockets>=14.0.0', 'fire>=0.5.0', # 'fire @ git+ssh://git@github.com/google/python-fire@master#egg=fire', 'tabulate', 'tqdm', 'IPython', 'redis_record>=0.0.4',