Skip to content

Commit

Permalink
Merge pull request #54 from lesamouraipourpre/parse-gsa-and-gsv
Browse files Browse the repository at this point in the history
Implement GSV and GSA parsing
  • Loading branch information
tannewt authored Feb 23, 2021
2 parents e6d7ba9 + 13b6cc9 commit 1b80dea
Show file tree
Hide file tree
Showing 2 changed files with 191 additions and 39 deletions.
125 changes: 86 additions & 39 deletions adafruit_gps.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ def __init__(self, uart, debug=False):
self.height_geoid = None
self.speed_knots = None
self.track_angle_deg = None
self.sats = None
self._sats = None # Temporary holder for information from GSV messages
self.sats = None # Completed information from GSV messages
self.isactivedata = None
self.true_track = None
self.mag_track = None
Expand Down Expand Up @@ -121,16 +122,30 @@ def update(self):
print(sentence)
data_type, args = sentence
data_type = bytes(data_type.upper(), "ascii")
# return sentence
if data_type in (
b"GPGLL",
b"GNGLL",
): # GLL, Geographic Position – Latitude/Longitude
(talker, sentence_type) = GPS._parse_talker(data_type)

# Check for all currently known GNSS talkers
# GA - Galileo
# GB - BeiDou Systems
# GI - NavIC
# GL - GLONASS
# GP - GPS
# GQ - QZSS
# GN - GNSS / More than one of the above
if talker not in (b"GA", b"GB", b"GI", b"GL", b"GP", b"GQ", b"GN"):
# It's not a known GNSS source of data
return True

if sentence_type == b"GLL": # Geographic position - Latitude/Longitude
self._parse_gpgll(args)
elif data_type in (b"GPRMC", b"GNRMC"): # RMC, minimum location info
elif sentence_type == b"RMC": # Minimum location info
self._parse_gprmc(args)
elif data_type in (b"GPGGA", b"GNGGA"): # GGA, 3d location fix
elif sentence_type == b"GGA": # 3D location fix
self._parse_gpgga(args)
elif sentence_type == b"GSV": # Satellites in view
self._parse_gpgsv(talker, args)
elif sentence_type == b"GSA": # GPS DOP and active satellites
self._parse_gpgsa(talker, args)
return True

def send_command(self, command, add_checksum=True):
Expand Down Expand Up @@ -241,6 +256,14 @@ def _parse_sentence(self):
data_type = sentence[1:delimiter]
return (data_type, sentence[delimiter + 1 :])

@staticmethod
def _parse_talker(data_type):
# Split the data_type into talker and sentence_type
if data_type[0] == b"P": # Proprietary codes
return (data_type[:1], data_type[1:])

return (data_type[:2], data_type[2:])

def _parse_gpgll(self, args):
data = args.split(",")
if data is None or data[0] is None or (data[0] == ""):
Expand Down Expand Up @@ -402,7 +425,8 @@ def _parse_gpgga(self, args):
self.altitude_m = _parse_float(data[8])
self.height_geoid = _parse_float(data[10])

def _parse_gpgsa(self, args):
def _parse_gpgsa(self, talker, args):
talker = talker.decode("ascii")
data = args.split(",")
if data is None or (data[0] == ""):
return # Unexpected number of params
Expand All @@ -412,9 +436,9 @@ def _parse_gpgsa(self, args):
# Parse 3d fix
self.fix_quality_3d = _parse_int(data[1])
satlist = list(filter(None, data[2:-4]))
self.sat_prns = {}
for i, sat in enumerate(satlist, 1):
self.sat_prns["gps{}".format(i)] = _parse_int(sat)
self.sat_prns = []
for sat in satlist:
self.sat_prns.append("{}{}".format(talker, _parse_int(sat)))

# Parse PDOP, dilution of precision
self.pdop = _parse_float(data[-3])
Expand All @@ -423,9 +447,11 @@ def _parse_gpgsa(self, args):
# Parse VDOP, vertical dilution of precision
self.vdop = _parse_float(data[-1])

def _parse_gpgsv(self, args):
def _parse_gpgsv(self, talker, args):
# Parse the arguments (everything after data type) for NMEA GPGGA
# pylint: disable=too-many-branches
# 3D location fix sentence.
talker = talker.decode("ascii")
data = args.split(",")
if data is None or (data[0] == ""):
return # Unexpected number of params.
Expand All @@ -442,33 +468,54 @@ def _parse_gpgsv(self, args):

sat_tup = data[3:]

satdict = {}
for i in range(len(sat_tup) / 4):
j = i * 4
key = "gps{}".format(i + (4 * (self.mess_num - 1)))
satnum = _parse_int(sat_tup[0 + j]) # Satellite number
satdeg = _parse_int(sat_tup[1 + j]) # Elevation in degrees
satazim = _parse_int(sat_tup[2 + j]) # Azimuth in degrees
satsnr = _parse_int(sat_tup[3 + j]) # signal-to-noise ratio in dB
value = (satnum, satdeg, satazim, satsnr)
satdict[key] = value

if self.sats is None:
self.sats = {}
for satnum in satdict:
self.sats[satnum] = satdict[satnum]
satlist = []
timestamp = time.monotonic()
for i in range(len(sat_tup) // 4):
try:
j = i * 4
value = (
# Satellite number
"{}{}".format(talker, _parse_int(sat_tup[0 + j])),
# Elevation in degrees
_parse_int(sat_tup[1 + j]),
# Azimuth in degrees
_parse_int(sat_tup[2 + j]),
# signal-to-noise ratio in dB
_parse_int(sat_tup[3 + j]),
# Timestamp
timestamp,
)
satlist.append(value)
except ValueError:
# Something wasn't an int
pass

if self._sats is None:
self._sats = []
for value in satlist:
self._sats.append(value)

if self.mess_num == self.total_mess_num:
# Last part of GSV message
if len(self._sats) == self.satellites:
# Transfer received satellites to self.sats
if self.sats is None:
self.sats = {}
else:
# Remove all satellites which haven't
# been seen for 30 seconds
timestamp = time.monotonic()
old = []
for i in self.sats:
sat = self.sats[i]
if (timestamp - sat[4]) > 30:
old.append(i)
for i in old:
self.sats.pop(i)
for sat in self._sats:
self.sats[sat[0]] = sat
self._sats.clear()

try:
if self.satellites < self.satellites_prev:
for i in self.sats:
try:
if int(i[-2]) >= self.satellites:
del self.sats[i]
except ValueError:
if int(i[-1]) >= self.satellites:
del self.sats[i]
except TypeError:
pass
self.satellites_prev = self.satellites


Expand Down
105 changes: 105 additions & 0 deletions examples/gps_satellitefix.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# SPDX-FileCopyrightText: 2021 lesamouraipourpre
# SPDX-License-Identifier: MIT

import time
import board

import adafruit_gps

# Create a serial connection for the GPS connection using default speed and
# a slightly higher timeout (GPS modules typically update once a second).
# These are the defaults you should use for the GPS FeatherWing.
# For other boards set RX = GPS module TX, and TX = GPS module RX pins.
# uart = busio.UART(board.TX, board.RX, baudrate=9600, timeout=10)

# for a computer, use the pyserial library for uart access
# import serial
# uart = serial.Serial("/dev/ttyUSB0", baudrate=9600, timeout=10)

# If using I2C, we'll create an I2C interface to talk to using default pins
i2c = board.I2C()

# Create a GPS module instance.
# gps = adafruit_gps.GPS(uart, debug=False) # Use UART/pyserial
gps = adafruit_gps.GPS_GtopI2C(i2c, debug=False) # Use I2C interface

# Initialize the GPS module by changing what data it sends and at what rate.
# These are NMEA extensions for PMTK_314_SET_NMEA_OUTPUT and
# PMTK_220_SET_NMEA_UPDATERATE but you can send anything from here to adjust
# the GPS module behavior:
# https://cdn-shop.adafruit.com/datasheets/PMTK_A11.pdf

# Turn on everything (not all of it is parsed!)
gps.send_command(b"PMTK314,1,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0")

# Set update rate to once a second (1hz) which is what you typically want.
gps.send_command(b"PMTK220,1000")
# Or decrease to once every two seconds by doubling the millisecond value.
# Be sure to also increase your UART timeout above!
# gps.send_command(b'PMTK220,2000')
# You can also speed up the rate, but don't go too fast or else you can lose
# data during parsing. This would be twice a second (2hz, 500ms delay):
# gps.send_command(b'PMTK220,500')


def format_dop(dop):
# https://en.wikipedia.org/wiki/Dilution_of_precision_(navigation)
if dop > 20:
msg = "Poor"
elif dop > 10:
msg = "Fair"
elif dop > 5:
msg = "Moderate"
elif dop > 2:
msg = "Good"
elif dop > 1:
msg = "Excellent"
else:
msg = "Ideal"
return f"{dop} - {msg}"


talkers = {
"GA": "Galileo",
"GB": "BeiDou",
"GI": "NavIC",
"GL": "GLONASS",
"GP": "GPS",
"GQ": "QZSS",
"GN": "GNSS",
}

# Main loop runs forever printing the location, etc. every second.
last_print = time.monotonic()
while True:
# Make sure to call gps.update() every loop iteration and at least twice
# as fast as data comes from the GPS unit (usually every second).
# This returns a bool that's true if it parsed new data (you can ignore it
# though if you don't care and instead look at the has_fix property).
if not gps.update() or not gps.has_fix:
time.sleep(0.1)
continue

if gps.nmea_sentence[3:6] == "GSA":
print(f"{gps.latitude:.6f}, {gps.longitude:.6f} {gps.altitude_m}m")
print(f"2D Fix: {gps.has_fix} 3D Fix: {gps.has_3d_fix}")
print(f" PDOP (Position Dilution of Precision): {format_dop(gps.pdop)}")
print(f" HDOP (Horizontal Dilution of Precision): {format_dop(gps.hdop)}")
print(f" VDOP (Vertical Dilution of Precision): {format_dop(gps.vdop)}")
print("Satellites used for fix:")
for s in gps.sat_prns:
talker = talkers[s[0:2]]
number = s[2:]
print(f" {talker}-{number} ", end="")
if gps.sats is None:
print("- no info")
else:
try:
sat = gps.sats[s]
if sat is None:
print("- no info")
else:
print(f"Elevation:{sat[1]}* Azimuth:{sat[2]}* SNR:{sat[3]}dB")
except KeyError:
print("- no info")
print()

0 comments on commit 1b80dea

Please sign in to comment.