Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(python): use deque for insert-order message queue #1316

Merged
merged 6 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 57 additions & 21 deletions python/mcap/mcap/_message_queue.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import heapq
from typing import List, Optional, Tuple, Union
from abc import ABC, abstractmethod
from collections import deque
from typing import Deque, List, Optional, Tuple, Union

from .records import Channel, ChunkIndex, Message, Schema

Expand Down Expand Up @@ -65,34 +67,68 @@ def position(self) -> Tuple[int, Optional[int]]:
return (self.item[1], self.item[2])


class MessageQueue:
"""A queue of MCAP messages and chunk indices.
def _make_orderable(item: QueueItem, reverse: bool) -> _Orderable:
if isinstance(item, ChunkIndex):
return _ChunkIndexWrapper(item, reverse)
return _MessageTupleWrapper(item, reverse)


class _MessageQueue(ABC):
@abstractmethod
def push(self, item: QueueItem):
raise NotImplementedError()

@abstractmethod
def pop(self) -> QueueItem:
raise NotImplementedError()

@abstractmethod
def __len__(self) -> int:
raise NotImplementedError()

:param log_time_order: if True, this queue acts as a priority queue, ordered by log time.
if False, ``pop()`` returns elements in insert order.
:param reverse: if True, order elements in descending log time order rather than ascending.
"""

def __init__(self, log_time_order: bool, reverse: bool = False):
class LogTimeOrderQueue(_MessageQueue):
def __init__(self, reverse: bool = False):
self._q: List[_Orderable] = []
self._log_time_order = log_time_order
self._reverse = reverse

def push(self, item: QueueItem):
if isinstance(item, ChunkIndex):
orderable = _ChunkIndexWrapper(item, self._reverse)
else:
orderable = _MessageTupleWrapper(item, self._reverse)
if self._log_time_order:
heapq.heappush(self._q, orderable)
else:
self._q.append(orderable)
orderable = _make_orderable(item, self._reverse)
heapq.heappush(self._q, orderable)

def pop(self) -> QueueItem:
if self._log_time_order:
return heapq.heappop(self._q).item
else:
return self._q.pop(0).item
return heapq.heappop(self._q).item

def __len__(self) -> int:
return len(self._q)


class InsertOrderQueue(_MessageQueue):
def __init__(self):
self._q: Deque[QueueItem] = deque()

def push(self, item: QueueItem):
self._q.append(item)

def pop(self) -> QueueItem:
return self._q.popleft() # cspell:disable-line

def __len__(self) -> int:
return len(self._q)


def make_message_queue(
log_time_order: bool = True, reverse: bool = False
) -> _MessageQueue:
"""Create a queue of MCAP messages and chunk indices.

:param log_time_order: if True, this queue acts as a priority queue, ordered by log time.
if False, ``pop()`` returns elements in insert order.
:param reverse: if True, order elements in descending log time order rather than ascending.
only valid if ``log_time_order`` is True, otherwise throws a ValueError.
"""
if log_time_order:
return LogTimeOrderQueue(reverse)
if reverse:
raise ValueError("reverse is only valid with log_time_order=True")
return InsertOrderQueue()
6 changes: 4 additions & 2 deletions python/mcap/mcap/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
Tuple,
)

from ._message_queue import MessageQueue
from ._message_queue import make_message_queue
from .data_stream import ReadDataStream, RecordBuilder
from .decoder import DecoderFactory
from .exceptions import DecoderNotFoundError, McapError
Expand Down Expand Up @@ -292,7 +292,9 @@ def iter_messages(
)
return

message_queue = MessageQueue(log_time_order=log_time_order, reverse=reverse)
message_queue = make_message_queue(
log_time_order=log_time_order, reverse=reverse
)
for chunk_index in _chunks_matching_topics(
summary, topics, start_time, end_time
):
Expand Down
34 changes: 29 additions & 5 deletions python/mcap/tests/test_message_queue.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import time
from typing import List

from mcap._message_queue import MessageQueue, QueueItem
from mcap._message_queue import QueueItem, _MessageQueue, make_message_queue
from mcap.records import Channel, ChunkIndex, Message, Schema


Expand Down Expand Up @@ -49,7 +50,7 @@ def dummy_message_tuple(
)


def push_elements(mq: MessageQueue):
def push_elements(mq: _MessageQueue):
mq.push(dummy_chunk_index(3, 6, 100))
mq.push(dummy_chunk_index(1, 2, 400))
mq.push(dummy_chunk_index(4, 5, 500))
Expand All @@ -58,8 +59,13 @@ def push_elements(mq: MessageQueue):
mq.push(dummy_message_tuple(5, 200, 30))


def push_messages_reverse_order(mq: _MessageQueue, n: int = 10_000):
for i in range(n):
mq.push(dummy_message_tuple(n - i, 0, i))


def test_chunk_message_ordering():
mq = MessageQueue(log_time_order=True)
mq = make_message_queue(log_time_order=True)
push_elements(mq)

results: List[QueueItem] = []
Expand All @@ -81,7 +87,7 @@ def test_chunk_message_ordering():


def test_reverse_ordering():
mq = MessageQueue(log_time_order=True, reverse=True)
mq = make_message_queue(log_time_order=True, reverse=True)
push_elements(mq)

results: List[QueueItem] = []
Expand All @@ -103,7 +109,7 @@ def test_reverse_ordering():


def test_insert_ordering():
mq = MessageQueue(log_time_order=False)
mq = make_message_queue(log_time_order=False)
push_elements(mq)

results: List[QueueItem] = []
Expand All @@ -122,3 +128,21 @@ def test_insert_ordering():
assert results[4][2] == 20
assert isinstance(results[5], tuple)
assert results[5][2] == 30


def test_insert_order_is_faster():
log_time_order_mq = make_message_queue(log_time_order=True)
push_messages_reverse_order(log_time_order_mq)
log_time_start = time.time()
while log_time_order_mq:
log_time_order_mq.pop()
log_time_end = time.time()

insert_order_mq = make_message_queue(log_time_order=False)
push_messages_reverse_order(insert_order_mq)
insert_start = time.time()
while insert_order_mq:
insert_order_mq.pop()
insert_end = time.time()

assert insert_end - insert_start < log_time_end - log_time_start
Loading