Skip to content

Commit

Permalink
Pull in changes from internal development. (#15)
Browse files Browse the repository at this point in the history
# Changes
- Refactoring in log_manager and firmware_tool to support additional
automation
 - Support for interface tcp3
 - Support for additional device types

# Bug Fixes
 - Fix timeouts in the case where no data is being received
 - Fix non-blocking serial read
  • Loading branch information
axlan authored Jan 8, 2025
2 parents c558420 + 21bbc34 commit 63bb6bc
Show file tree
Hide file tree
Showing 8 changed files with 243 additions and 155 deletions.
1 change: 1 addition & 0 deletions bin/config_message_rate.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
'udp2': InterfaceID(TransportType.UDP, 2),
'tcp1': InterfaceID(TransportType.TCP, 1),
'tcp2': InterfaceID(TransportType.TCP, 2),
'tcp3': InterfaceID(TransportType.TCP, 3),
'file': InterfaceID(TransportType.FILE, 1),
'unix1': InterfaceID(TransportType.UNIX, 1),
'unix2': InterfaceID(TransportType.UNIX, 2),
Expand Down
2 changes: 1 addition & 1 deletion bin/p1_convert_user_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from pathlib import Path
from typing import Type

import construct # For ignoring wrapper class in DeepDiff
import construct # For ignoring wrapper class in DeepDiff
from deepdiff import DeepDiff
from fusion_engine_client.messages.configuration import (
ConfigurationSource, DataType, DataVersion, PlatformStorageDataMessage,
Expand Down
315 changes: 175 additions & 140 deletions firmware_tools/lg69t/firmware_tool.py

Large diffs are not rendered by default.

9 changes: 6 additions & 3 deletions p1_runner/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class ClientConnection:
logger = logging.getLogger('point_one.data_source')


RESPONSE_TIMEOUT = 5
RESPONSE_TIMEOUT = 5.0
RX_BYTE_TIMEOUT = 0.1
MAX_DATA_BUFFER_SIZE = 10 * 1024 * 1024
DATA_BUFFER_DROP_SIZE = 1 * 1024 * 1024
Expand Down Expand Up @@ -205,11 +205,13 @@ def read(self, size: int, timeout=RESPONSE_TIMEOUT) -> bytes:
if self.rx_thread is None:
raise RuntimeError('Reading DeviceInterface without calling "start_rx_thread".')
data = b''
start_time = time.time()
while size > 0 and time.time() - start_time < timeout:
start_time = time.monotonic()
now = start_time
while size > 0 and now - start_time <= timeout:
logger.trace(f'Buffered {len(self.data_buffer)} B.')
if not self.data_event.wait(RX_BYTE_TIMEOUT):
logger.debug('Timed out waiting for byte to be added to buffer.')
now = time.monotonic()
continue
self.data_lock.acquire()

Expand All @@ -224,6 +226,7 @@ def read(self, size: int, timeout=RESPONSE_TIMEOUT) -> bytes:
size = 0

self.data_lock.release()
now = time.monotonic()
if self.rx_log:
self.rx_log.write(data)
return data
Expand Down
14 changes: 13 additions & 1 deletion p1_runner/device_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from fusion_engine_client.messages import *
from fusion_engine_client.parsers import (FusionEngineDecoder,
FusionEngineEncoder)
from fusion_engine_client.parsers.decoder import MessageWithBytesTuple

import p1_runner.trace as logging
from p1_runner.data_source import RESPONSE_TIMEOUT, DataSource
Expand Down Expand Up @@ -135,6 +136,17 @@ def wait_for_reboot(self, data_stop_timeout=REBOOT_MAX_START_TIME, data_restart_

return reboot_started and reboot_finished

def wait_for_any_fe_message(self, response_timeout=RESPONSE_TIMEOUT) -> List[MessageWithBytesTuple]:
start_time = time.time()
while time.time() - start_time < response_timeout:
msgs = self.fe_decoder.on_data(self.data_source.read(1, response_timeout))
if len(msgs) > 0:
return msgs # type: ignore
return []

def poll_messages(self, read_buffer_size=MAX_FE_MSG_SIZE, response_timeout=0.0) -> List[MessageWithBytesTuple]:
return self.fe_decoder.on_data(self.data_source.read(read_buffer_size, response_timeout)) # type: ignore

def wait_for_message(self, msg_type, response_timeout=RESPONSE_TIMEOUT):
if isinstance(msg_type, MessageType):
return self._wait_for_fe_message(msg_type, response_timeout)
Expand All @@ -144,7 +156,7 @@ def wait_for_message(self, msg_type, response_timeout=RESPONSE_TIMEOUT):
def _wait_for_fe_message(self, msg_type, response_timeout):
start_time = time.time()
while True:
msgs = self.fe_decoder.on_data(self.data_source.read(1))
msgs = self.fe_decoder.on_data(self.data_source.read(1, response_timeout))
for msg in msgs:
if msg[0].message_type == msg_type:
logger.debug('Response: %s', str(msg[1]))
Expand Down
26 changes: 24 additions & 2 deletions p1_runner/device_type.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import re
from enum import Enum, auto
from typing import Optional
from typing import Dict, Optional


class DeviceType(Enum):
UNKNOWN = auto()

ATLAS = auto()
LG69T_AM = auto()
LG69T_AP = auto()
LG69T_AH = auto()
LG69T_AJ = auto()
ATLAS = auto()

BEAM2K = auto()
DJI_MAVIC = auto()
Expand All @@ -26,6 +27,17 @@ def is_lg69t(self) -> bool:
def device_uses_unframed_logs(self) -> bool:
return self.is_lg69t() or self is DeviceType.LC29H

def is_gnss_only(self) -> bool:
return self in (DeviceType.LG69T_AM,)

@classmethod
def mapping_device_to_regex(cls) -> Dict['DeviceType', str]:
return {
DeviceType.ATLAS: 'v[0-9]*.*',
DeviceType.LG69T_AM: 'lg69t-am-v[0-9]*.*',
DeviceType.LG69T_AP: 'lg69t-ap-v[0-9]*.*',
}

@classmethod
def from_string(cls, name: Optional[str]) -> 'DeviceType':
if name is not None:
Expand All @@ -35,3 +47,13 @@ def from_string(cls, name: Optional[str]) -> 'DeviceType':
pass

return DeviceType.UNKNOWN

@classmethod
def get_build_type_from_version(cls, version_str) -> Optional['DeviceType']:
mapping = cls.mapping_device_to_regex()
for key, val in mapping.items():
r = fr'{val}'
if re.match(r, version_str):
return key

return None
28 changes: 21 additions & 7 deletions p1_runner/log_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import time
import uuid
from datetime import datetime, timezone
from typing import Optional

from . import trace as logging
from .log_manifest import DeviceType, LogManifest
Expand All @@ -20,7 +21,7 @@ class LogManager(threading.Thread):

def __init__(
self, device_id, device_type='UNKNOWN', logs_base_dir='/logs', files=None, log_extension='.raw',
create_symlink=True, log_created_cmd=None, log_timestamps=True):
create_symlink=True, log_created_cmd=None, log_timestamps=True, directory_to_reuse: Optional[str] = None):
super().__init__(name='log_manager')

self.device_id = device_id
Expand All @@ -29,11 +30,13 @@ def __init__(
self.create_symlink = create_symlink
self.log_created_cmd = log_created_cmd
self.data_filename = 'input' + log_extension
self.directory_to_reuse = directory_to_reuse

self.log_guid = None
self.creation_time = None
self.sequence_num = None
self.log_dir = None
self.timestamp_path = None
self.log_timestamps = log_timestamps
self.start_time = time.time()
self.last_timestamp = time.time()
Expand All @@ -54,14 +57,19 @@ def get_abs_file_path(self, relative_path):
else:
return os.path.join(self.log_dir, relative_path)

def start(self):
self.logger.debug('Starting log manager.')
def create_log_dir(self):
if self.directory_to_reuse:
self.log_dir = self.directory_to_reuse
if not os.path.exists(self.log_dir):
raise IOError("Log directory '%s' doesn't exist." % self.log_dir)
else:
return

self.log_guid = str(uuid.uuid4()).replace('-', '')
self.creation_time = datetime.now(tz=timezone.utc)

self.log_dir = os.path.join(self.logs_base_dir, self.creation_time.strftime('%Y-%m-%d'), self.device_id,
self.log_guid)

if os.path.exists(self.log_dir):
raise IOError("Log directory '%s' already exists." % self.log_dir)
else:
Expand Down Expand Up @@ -93,6 +101,10 @@ def start(self):

self._create_manifest()

def start(self):
self.logger.debug('Starting log manager.')
if self.log_dir is None:
self.create_log_dir()
super().start()

def stop(self):
Expand All @@ -114,9 +126,9 @@ def run(self):
self.logger.debug("Opening bin file '%s'." % path)
timestamp_file = None
if self.log_timestamps:
timestamp_path = os.path.join(self.log_dir, self.data_filename + '.timestamps')
self.logger.debug("Opening timestamp file '%s'." % timestamp_path)
timestamp_file = open(timestamp_path, 'wb')
self.timestamp_path = os.path.join(self.log_dir, self.data_filename + '.timestamps')
self.logger.debug("Opening timestamp file '%s'." % self.timestamp_path)
timestamp_file = open(self.timestamp_path, 'wb')
with open(path, 'wb') as bin_file:
if self.log_created_cmd is not None:
try:
Expand Down Expand Up @@ -165,6 +177,8 @@ def _create_manifest(self):
manifest.device_type = self.device_type if self.device_type is not None else 'UNKNOWN'

manifest.channels.append(self.data_filename)
if self.timestamp_path:
manifest.channels.append(self.timestamp_path)
manifest.channels.extend(self.files)
manifest.channels.sort()

Expand Down
3 changes: 2 additions & 1 deletion p1_runner/ntrip_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import traceback
from datetime import datetime, timezone
from functools import reduce
from typing import Iterable

import ntripstreams
from serial import SerialException
Expand Down Expand Up @@ -54,7 +55,7 @@ def set_data_callback(self, callback):
def is_connected(self):
return self.connected

def send_position(self, lla_deg: list, time: datetime = None):
def send_position(self, lla_deg: Iterable[float], time: datetime = None):
if not self.connected:
self.logger.trace('Not connected. Ignoring position update. [%.8f, %.8f, %.2f]' % tuple(lla_deg))
return False
Expand Down

0 comments on commit 63bb6bc

Please sign in to comment.