-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgps.py
72 lines (64 loc) · 2.13 KB
/
gps.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
"""
Thread for tracking GPS position
Monitors NMEA stream for GGA positions
Caches last GGA position for retrieval via GPSMon.position()
"""
import pynmea2
import serial
import threading
import logging
import time
from typing import Optional
log = logging.getLogger('GPS')
__all__ = ['GPSMon']
class GPSMon(threading.Thread):
"""
Continuously monitor gps position
"""
def __init__(self, port, logfilename: str=None):
super().__init__(name=self.__class__.name, daemon=True)
self.port = port
self.ser = serial.Serial(port, baudrate=9600, timeout=1.0)
self.lastpos: pynmea2.GGA = None
self.lastpostime = time.time()
self.logfilename = logfilename
self.logfile = None
return
def position(self, maxage: Optional[float] = None) -> Optional[pynmea2.GGA]:
"""
Get the current position.
Returns None if we don't have a valid position, or position is older than maxage
:return:
"""
if maxage:
if time.time() - self.lastpostime > maxage:
return None
return self.lastpos
def run(self):
log.info("GPSMon started")
self.reader = pynmea2.NMEAStreamReader(self.ser)
if self.logfilename:
self.logfile = open(self.logfilename, 'ba')
while 1:
line = self.ser.readline()
try:
for msg in self.reader.next(line.decode()):
log.debug(msg)
if msg.sentence_type == 'GGA':
if msg.gps_qual in (1,2):
self.lastpos = msg
self.lastpostime = time.time()
log.debug(msg.__repr__())
except (UnicodeDecodeError, pynmea2.ParseError):
log.warning("Bad line: %s", line)
pass
finally:
if self.logfile:
self.logfile.write(line)
self.logfile.flush()
# Test harness
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
gps = GPSMon('/dev/ttyACM0')
gps.start()
gps.join()