From 6250e07b6975a9ebd13eb68814464529464d66bf Mon Sep 17 00:00:00 2001 From: Alexander Reynolds Date: Mon, 20 Jan 2025 11:54:30 -0500 Subject: [PATCH 1/6] split mq to two impls, use deque for FIFO --- python/mcap/mcap/_message_queue.py | 76 ++++++++++++++++++------- python/mcap/mcap/reader.py | 4 +- python/mcap/tests/test_message_queue.py | 33 +++++++++-- 3 files changed, 85 insertions(+), 28 deletions(-) diff --git a/python/mcap/mcap/_message_queue.py b/python/mcap/mcap/_message_queue.py index 7f5abeac0c..0b8a3155ba 100644 --- a/python/mcap/mcap/_message_queue.py +++ b/python/mcap/mcap/_message_queue.py @@ -1,5 +1,7 @@ import heapq -from typing import List, Optional, Tuple, Union +from abc import ABC, abstractmethod +from collections import deque +from typing import Deque, List, Optional, Tuple, Union from .records import Channel, ChunkIndex, Message, Schema @@ -65,34 +67,66 @@ def position(self) -> Tuple[int, Optional[int]]: return (self.item[1], self.item[2]) -class MessageQueue: - """A queue of MCAP messages and chunk indices. +def _make_orderable(item: QueueItem, reverse: bool) -> _Orderable: + if isinstance(item, ChunkIndex): + return _ChunkIndexWrapper(item, reverse) + return _MessageTupleWrapper(item, reverse) - :param log_time_order: if True, this queue acts as a priority queue, ordered by log time. - if False, ``pop()`` returns elements in insert order. - :param reverse: if True, order elements in descending log time order rather than ascending. 
- """ - def __init__(self, log_time_order: bool, reverse: bool = False): +class _MessageQueue(ABC): + + @abstractmethod + def push(self, item: QueueItem): + raise NotImplementedError() + + @abstractmethod + def pop(self) -> QueueItem: + raise NotImplementedError() + + @abstractmethod + def __len__(self) -> int: + raise NotImplementedError() + + +class LogTimeOrderQueue(_MessageQueue): + def __init__(self, reverse: bool = False): self._q: List[_Orderable] = [] - self._log_time_order = log_time_order self._reverse = reverse def push(self, item: QueueItem): - if isinstance(item, ChunkIndex): - orderable = _ChunkIndexWrapper(item, self._reverse) - else: - orderable = _MessageTupleWrapper(item, self._reverse) - if self._log_time_order: - heapq.heappush(self._q, orderable) - else: - self._q.append(orderable) + orderable = _make_orderable(item, self._reverse) + heapq.heappush(self._q, orderable) def pop(self) -> QueueItem: - if self._log_time_order: - return heapq.heappop(self._q).item - else: - return self._q.pop(0).item + return heapq.heappop(self._q).item def __len__(self) -> int: return len(self._q) + + +class InsertOrderQueue(_MessageQueue): + def __init__(self, reverse: bool = False): + self._q: Deque[_Orderable] = deque() + self._reverse = reverse + + def push(self, item: QueueItem): + orderable = _make_orderable(item, self._reverse) + self._q.append(orderable) + + def pop(self) -> QueueItem: + return self._q.popleft().item + + def __len__(self) -> int: + return len(self._q) + + +def make_message_queue(log_time_order: bool = True, reverse: bool = False) -> _MessageQueue: + """Create a queue of MCAP messages and chunk indices. + + :param log_time_order: if True, this queue acts as a priority queue, ordered by log time. + if False, ``pop()`` returns elements in insert order. + :param reverse: if True, order elements in descending log time order rather than ascending. 
+ """ + if log_time_order: + return LogTimeOrderQueue(reverse) + return InsertOrderQueue(reverse) diff --git a/python/mcap/mcap/reader.py b/python/mcap/mcap/reader.py index 453be08182..48109c4fd5 100644 --- a/python/mcap/mcap/reader.py +++ b/python/mcap/mcap/reader.py @@ -15,7 +15,7 @@ Tuple, ) -from ._message_queue import MessageQueue +from ._message_queue import make_message_queue from .data_stream import ReadDataStream, RecordBuilder from .decoder import DecoderFactory from .exceptions import DecoderNotFoundError, McapError @@ -292,7 +292,7 @@ def iter_messages( ) return - message_queue = MessageQueue(log_time_order=log_time_order, reverse=reverse) + message_queue = make_message_queue(log_time_order=log_time_order, reverse=reverse) for chunk_index in _chunks_matching_topics( summary, topics, start_time, end_time ): diff --git a/python/mcap/tests/test_message_queue.py b/python/mcap/tests/test_message_queue.py index 16d133b5b2..cfd650f6fa 100644 --- a/python/mcap/tests/test_message_queue.py +++ b/python/mcap/tests/test_message_queue.py @@ -1,6 +1,7 @@ +import time from typing import List -from mcap._message_queue import MessageQueue, QueueItem +from mcap._message_queue import make_message_queue, _MessageQueue, QueueItem from mcap.records import Channel, ChunkIndex, Message, Schema @@ -49,7 +50,7 @@ def dummy_message_tuple( ) -def push_elements(mq: MessageQueue): +def push_elements(mq: _MessageQueue): mq.push(dummy_chunk_index(3, 6, 100)) mq.push(dummy_chunk_index(1, 2, 400)) mq.push(dummy_chunk_index(4, 5, 500)) @@ -58,8 +59,13 @@ def push_elements(mq: MessageQueue): mq.push(dummy_message_tuple(5, 200, 30)) +def push_messages_reverse_order(mq: _MessageQueue, n: int = 10_000): + for i in range(n): + mq.push(dummy_message_tuple(n-i, 0, i)) + + def test_chunk_message_ordering(): - mq = MessageQueue(log_time_order=True) + mq = make_message_queue(log_time_order=True) push_elements(mq) results: List[QueueItem] = [] @@ -81,7 +87,7 @@ def test_chunk_message_ordering(): 
def test_reverse_ordering(): - mq = MessageQueue(log_time_order=True, reverse=True) + mq = make_message_queue(log_time_order=True, reverse=True) push_elements(mq) results: List[QueueItem] = [] @@ -103,7 +109,7 @@ def test_reverse_ordering(): def test_insert_ordering(): - mq = MessageQueue(log_time_order=False) + mq = make_message_queue(log_time_order=False) push_elements(mq) results: List[QueueItem] = [] @@ -122,3 +128,20 @@ def test_insert_ordering(): assert results[4][2] == 20 assert isinstance(results[5], tuple) assert results[5][2] == 30 + +def test_insert_order_is_faster(): + log_time_order_mq = make_message_queue(log_time_order=True) + push_messages_reverse_order(log_time_order_mq) + log_time_start = time.time() + while log_time_order_mq: + log_time_order_mq.pop() + log_time_end = time.time() + + insert_order_mq = make_message_queue(log_time_order=False) + push_messages_reverse_order(insert_order_mq) + insert_start = time.time() + while insert_order_mq: + insert_order_mq.pop() + insert_end = time.time() + + assert insert_end - insert_start < log_time_end - log_time_start From fe5e94d241fdc73ddd7a46ac331ac5b4b8c1edd9 Mon Sep 17 00:00:00 2001 From: Alexander Reynolds Date: Mon, 20 Jan 2025 12:57:49 -0500 Subject: [PATCH 2/6] flake --- python/mcap/tests/test_message_queue.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/mcap/tests/test_message_queue.py b/python/mcap/tests/test_message_queue.py index cfd650f6fa..a25a986f30 100644 --- a/python/mcap/tests/test_message_queue.py +++ b/python/mcap/tests/test_message_queue.py @@ -129,6 +129,7 @@ def test_insert_ordering(): assert isinstance(results[5], tuple) assert results[5][2] == 30 + def test_insert_order_is_faster(): log_time_order_mq = make_message_queue(log_time_order=True) push_messages_reverse_order(log_time_order_mq) From 00e82e5189955aa21491108452b4807222c5b15f Mon Sep 17 00:00:00 2001 From: Alexander Reynolds Date: Mon, 20 Jan 2025 13:00:52 -0500 Subject: [PATCH 3/6] ignore spellchecker --- 
python/mcap/mcap/_message_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/mcap/mcap/_message_queue.py b/python/mcap/mcap/_message_queue.py index 0b8a3155ba..2248daeda6 100644 --- a/python/mcap/mcap/_message_queue.py +++ b/python/mcap/mcap/_message_queue.py @@ -114,7 +114,7 @@ def push(self, item: QueueItem): self._q.append(orderable) def pop(self) -> QueueItem: - return self._q.popleft().item + return self._q.popleft().item # cspell:disable-line def __len__(self) -> int: return len(self._q) From 350ed8812704a0bdd62b0aeb8a2028feecb39544 Mon Sep 17 00:00:00 2001 From: Alexander Reynolds Date: Mon, 20 Jan 2025 13:20:29 -0500 Subject: [PATCH 4/6] black fmt --- python/mcap/mcap/_message_queue.py | 5 +++-- python/mcap/mcap/reader.py | 4 +++- python/mcap/tests/test_message_queue.py | 2 +- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/python/mcap/mcap/_message_queue.py b/python/mcap/mcap/_message_queue.py index 2248daeda6..c63f2c4cef 100644 --- a/python/mcap/mcap/_message_queue.py +++ b/python/mcap/mcap/_message_queue.py @@ -74,7 +74,6 @@ def _make_orderable(item: QueueItem, reverse: bool) -> _Orderable: class _MessageQueue(ABC): - @abstractmethod def push(self, item: QueueItem): raise NotImplementedError() @@ -120,7 +119,9 @@ def __len__(self) -> int: return len(self._q) -def make_message_queue(log_time_order: bool = True, reverse: bool = False) -> _MessageQueue: +def make_message_queue( + log_time_order: bool = True, reverse: bool = False +) -> _MessageQueue: """Create a queue of MCAP messages and chunk indices. :param log_time_order: if True, this queue acts as a priority queue, ordered by log time. 
diff --git a/python/mcap/mcap/reader.py b/python/mcap/mcap/reader.py index 48109c4fd5..c07d3f9308 100644 --- a/python/mcap/mcap/reader.py +++ b/python/mcap/mcap/reader.py @@ -292,7 +292,9 @@ def iter_messages( ) return - message_queue = make_message_queue(log_time_order=log_time_order, reverse=reverse) + message_queue = make_message_queue( + log_time_order=log_time_order, reverse=reverse + ) for chunk_index in _chunks_matching_topics( summary, topics, start_time, end_time ): diff --git a/python/mcap/tests/test_message_queue.py b/python/mcap/tests/test_message_queue.py index a25a986f30..425f5ba2b7 100644 --- a/python/mcap/tests/test_message_queue.py +++ b/python/mcap/tests/test_message_queue.py @@ -61,7 +61,7 @@ def push_elements(mq: _MessageQueue): def push_messages_reverse_order(mq: _MessageQueue, n: int = 10_000): for i in range(n): - mq.push(dummy_message_tuple(n-i, 0, i)) + mq.push(dummy_message_tuple(n - i, 0, i)) def test_chunk_message_ordering(): From e87b2d32a9c20fd91ef6e8368a1cd82f117fc4f6 Mon Sep 17 00:00:00 2001 From: Alexander Reynolds Date: Mon, 20 Jan 2025 13:22:00 -0500 Subject: [PATCH 5/6] isort --- python/mcap/tests/test_message_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/mcap/tests/test_message_queue.py b/python/mcap/tests/test_message_queue.py index 425f5ba2b7..90737b5934 100644 --- a/python/mcap/tests/test_message_queue.py +++ b/python/mcap/tests/test_message_queue.py @@ -1,7 +1,7 @@ import time from typing import List -from mcap._message_queue import make_message_queue, _MessageQueue, QueueItem +from mcap._message_queue import QueueItem, _MessageQueue, make_message_queue from mcap.records import Channel, ChunkIndex, Message, Schema From 735eb7f3bc8caf80d572644024a623a83499d8c6 Mon Sep 17 00:00:00 2001 From: Alexander Reynolds Date: Tue, 21 Jan 2025 18:09:36 -0500 Subject: [PATCH 6/6] remove orderable for insert order q --- python/mcap/mcap/_message_queue.py | 15 ++++++++------- 1 file changed, 8 
insertions(+), 7 deletions(-) diff --git a/python/mcap/mcap/_message_queue.py b/python/mcap/mcap/_message_queue.py index c63f2c4cef..f0452874f7 100644 --- a/python/mcap/mcap/_message_queue.py +++ b/python/mcap/mcap/_message_queue.py @@ -104,16 +104,14 @@ def __len__(self) -> int: class InsertOrderQueue(_MessageQueue): - def __init__(self, reverse: bool = False): - self._q: Deque[_Orderable] = deque() - self._reverse = reverse + def __init__(self): + self._q: Deque[QueueItem] = deque() def push(self, item: QueueItem): - orderable = _make_orderable(item, self._reverse) - self._q.append(orderable) + self._q.append(item) def pop(self) -> QueueItem: - return self._q.popleft().item # cspell:disable-line + return self._q.popleft() # cspell:disable-line def __len__(self) -> int: return len(self._q) @@ -127,7 +125,10 @@ def make_message_queue( :param log_time_order: if True, this queue acts as a priority queue, ordered by log time. if False, ``pop()`` returns elements in insert order. :param reverse: if True, order elements in descending log time order rather than ascending. + only valid if ``log_time_order`` is True, otherwise raises a ValueError. """ if log_time_order: return LogTimeOrderQueue(reverse) - return InsertOrderQueue(reverse) + if reverse: + raise ValueError("reverse is only valid with log_time_order=True") + return InsertOrderQueue()