Skip to content

Commit

Permalink
merge conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
beasteers committed Nov 18, 2024
2 parents 3a190a0 + 685c739 commit 632056c
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 9 deletions.
30 changes: 26 additions & 4 deletions ptgctl/async_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,31 @@ async def error_handler(func, *a, __retry_every=5, **kw):


class SlidingQueue(asyncio.Queue):
def __init__(self, maxsize=1, buffersize=8):
def __init__(self, maxsize=1024, buffersize=32):
self.buffersize = buffersize
super().__init__(maxsize)

def _init(self, maxsize):
self._queue = collections.deque(maxlen=maxsize)
self._queue = collections.deque(maxlen=1)
self._buffer = collections.deque(maxlen=self.buffersize)
self._unread = collections.deque(maxlen=maxsize)

def _put(self, item):
self._queue.append(item)
self._unread.append(item)
self._buffer.append(item)

def read_buffer(self):
def read_buffer(self): # TODO: rename
return list(self._buffer)

def pop(self):
output = []
for i in range(len(self._unread)):
try:
output.append(self._unread.popleft())
except IndexError:
pass
return output

def push(self, item):
full = self.full()
Expand All @@ -82,6 +93,17 @@ def push(self, item):
self._wakeup_next(self._getters)


class Queue(asyncio.Queue):
def read_buffer(self):
output = []
for i in range(len(self._unread)):
try:
output.append(self._unread.popleft())
except IndexError:
pass
return output



async def sample_producer(q, sleep=1, limit=20, name='put'):
print('starting', name)
Expand Down Expand Up @@ -122,4 +144,4 @@ async def main():
# g.add_consumer(sample_consumer, q, sleep=3, name='get3')

if __name__ == '__main__':
asyncio.run(main())
asyncio.run(main())
2 changes: 1 addition & 1 deletion ptgctl/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ def _ws(self, *url_parts, headers: dict|None=None, connect_kwargs: dict|None=Non

log.info('websocket connect: %s', url)
# return websockets.connect(url, extra_headers=headers, **(connect_kwargs or {}))
return cls(url, params=params, extra_headers=headers, **(connect_kwargs or {}))
return cls(url, params=params, additional_headers=headers, **(connect_kwargs or {}))

def ping(self, error=False):
if error:
Expand Down
22 changes: 20 additions & 2 deletions ptgctl/tools/display.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,34 @@ async def raw(api, stream_id, utf=False, **kw):

@util.async2sync
@util.interruptable
async def file(api, stream_id, out_dir='', include_timestamps=False, **kw):
async def file(api, stream_id, out_dir='', include_timestamps=False, group_sessions=True, **kw):
os.makedirs(out_dir or '.', exist_ok=True)
kw.setdefault('latest', False)
SESSION_ID = 'event:session:id'
if group_sessions:
stream_id += '+' + SESSION_ID

async with api.data_pull_connect(stream_id, **kw) as ws:
with contextlib.ExitStack() as stack:
files = {}
pbars = {}
current_session = api.session.id()
print("current session:", current_session)
while True:
for sid, ts, data in await ws.recv_data():
if group_sessions and sid == SESSION_ID:
print("Changing session", sid)
stack.pop_all().close()
files.clear()
current_session = data.decode('utf-8')
if group_sessions and not current_session:
continue
if sid not in files:
files[sid] = stack.enter_context(open(os.path.join(out_dir, f'{sid}.txt'), 'w'))
sid_file = os.path.join(out_dir, current_session or 'unknown-session' if group_sessions else '', f'{sid}.txt')
print("Opening file", sid_file)
os.makedirs(os.path.dirname(sid_file), exist_ok=True)
files[sid] = stack.enter_context(open(sid_file, 'w'))
if sid not in pbars:
pbars[sid] = tqdm.tqdm(desc=sid)
files[sid].write(f"{f'{ts}:' if include_timestamps else ''}{data.decode('utf-8')}\n")
pbars[sid].update()
Expand Down
4 changes: 3 additions & 1 deletion ptgctl/tools/mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ async def video_loop(api, src=0, pos=0, **kw):

DIVS = 4
@util.async2sync
async def video(api, src=0, pos=0, width=0.3, shape=None, fps=15, speed=1, stepbystep=False, prefix=None):
async def video(api, src=0, pos=0, width=0.3, shape=None, fps=15, speed=1, stepbystep=False, prefix=None, skill=None):
'''Send video (by default your webcam) to the API.'''
sid = CAM_POS_SIDS[pos]
sid = f'{prefix or ""}{sid}'
if skill:
api.session.start_recipe(skill)
tlast = 0
async with api.data_push_connect(sid, batch=True) as ws:
async for im in _video_feed(src, fps, shape, speed=speed):
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
packages=setuptools.find_packages(),
entry_points={'console_scripts': ['{name}={name}:main'.format(name=NAME)]},
install_requires=[
'requests', 'websockets', 'fire>=0.5.0',
'requests', 'websockets>=14.0.0', 'fire>=0.5.0',
# 'fire @ git+ssh://git@github.com/google/python-fire@master#egg=fire',
'tabulate', 'tqdm', 'IPython',
'redis_record>=0.0.4',
Expand Down

0 comments on commit 632056c

Please sign in to comment.