Skip to content

Commit

Permalink
Add linux_utils.network module (network location awareness)
Browse files Browse the repository at this point in the history
  • Loading branch information
xolox committed Feb 9, 2020
1 parent 1230997 commit c9fd8d2
Show file tree
Hide file tree
Showing 5 changed files with 240 additions and 3 deletions.
1 change: 1 addition & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ following functionality is currently implemented:
- A basic Python API for cryptsetup_ and a Python implementation of
cryptdisks_start_ and cryptdisks_stop_ (with a command line interface).
- Atomic filesystem operations for Linux in Python.
- Simple network location awareness / discovery.

The package is currently tested on cPython 2.7, 3.4, 3.5, 3.6, 3.7 and PyPy
(2.7) on Ubuntu Linux (using `Travis CI`_).
Expand Down
6 changes: 6 additions & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ the `linux-utils` package.
.. automodule:: linux_utils.luks
:members:

:mod:`linux_utils.network`
--------------------------

.. automodule:: linux_utils.network
:members:

:mod:`linux_utils.tabfile`
--------------------------

Expand Down
159 changes: 159 additions & 0 deletions linux_utils/network.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
# linux-utils: Linux system administration tools for Python.
#
# Author: Peter Odding <peter@peterodding.com>
# Last Change: February 9, 2020
# URL: https://linux-utils.readthedocs.io

"""
Python API for Linux networking tools.
The functions in this module make it possible to inspect the current network
configuration of a Linux system, which can provide hints about the physical
location of the system.
"""

# Standard library modules.
import logging

# Modules included in our package.
from linux_utils import coerce_context

# Public identifiers that require documentation.
__all__ = (
'determine_network_location',
'find_gateway_ip',
'find_gateway_mac',
'find_mac_address',
'have_internet_connection',
'logger',
)

# Initialize a logger for this module.
logger = logging.getLogger(__name__)


def determine_network_location(context=None, **gateways):
"""
Determine the physical location of the current system.
This works by matching the MAC address of the current gateway against a set
of known MAC addresses, which provides a simple but robust way to identify
the current network. Because networks tend to have a physical location,
identifying the current network tells us our physical location.
:param gateways: One or more keyword arguments with lists of strings
containing MAC addresses of known networks.
:param context: See :func:`.coerce_context()` for details.
:returns: The name of the matched MAC address (a string) or :data:`None`
when the MAC address of the current gateway is unknown.
Here's an example involving two networks and a physical location with
multiple gateways:
.. code-block:: python
>>> determine_network_location(
... home=['84:9C:A6:76:23:8E'],
... office=['00:15:C5:5F:92:79', 'B6:25:B2:19:28:61'],
... )
'home'
This is used to tweak my desktop environment based on the physical location
of my laptop, for example at home my external monitor is to the right of my
laptop whereas at work it's the other way around, so the :man:`xrandr`
commands to be run differ between the two locations.
"""
context = coerce_context(context)
current_gateway_mac = find_gateway_mac(context)
if current_gateway_mac:
for network_name, known_gateways in gateways.items():
if any(current_gateway_mac.upper() == gateway.upper() for gateway in known_gateways):
logger.info("%s is connected to the %s network.", context, network_name)
return network_name
logger.info(
"%s isn't connected to a known network (unknown gateway MAC address %s).", context, current_gateway_mac
)
else:
logger.info("Failed to determine gateway of %s, assuming network connection is down.", context)


def find_gateway_ip(context=None):
"""
Find the IP address of the current gateway using the ``ip route show`` command.
:param context: See :func:`.coerce_context()` for details.
:returns: The IP address of the gateway (a string) or :data:`None`.
"""
context = coerce_context(context)
logger.debug("Looking for IP address of current gateway ..")
for line in context.capture("ip", "route", "show").splitlines():
tokens = line.split()
logger.debug("Parsing 'ip route show' output: %s", tokens)
if len(tokens) >= 3 and tokens[:2] == ["default", "via"]:
ip_address = tokens[2]
logger.debug("Found gateway IP address: %s", ip_address)
return ip_address
logger.debug("Couldn't find IP address of gateway in 'ip route show' output!")


def find_gateway_mac(context=None):
"""
Find the MAC address of the current gateway using :func:`find_gateway_ip()` and :func:`find_mac_address()`.
:param context: See :func:`.coerce_context()` for details.
:returns: The MAC address of the gateway (a string) or :data:`None`.
"""
context = coerce_context(context)
ip_address = find_gateway_ip(context)
if ip_address:
mac_address = find_mac_address(ip_address, context)
if mac_address:
logger.debug("Found gateway MAC address: %s", mac_address)
return mac_address
logger.debug("Couldn't find MAC address of gateway in 'arp -n' output!")


def find_mac_address(ip_address, context=None):
"""
Determine the MAC address of an IP address using the ``arp -n`` command.
:param ip_address: The IP address we're interested in (a string).
:param context: See :func:`.coerce_context()` for details.
:returns: The MAC address of the IP address (a string) or :data:`None`.
"""
context = coerce_context(context)
logger.debug("Looking for MAC address of %s ..", ip_address)
for line in context.capture("arp", "-n").splitlines():
tokens = line.split()
logger.debug("Parsing 'arp -n' output: %s", tokens)
if len(tokens) >= 3 and tokens[0] == ip_address:
mac_address = tokens[2]
logger.debug("Found MAC address of %s: %s", ip_address, mac_address)
return mac_address
logger.debug("Couldn't find MAC address in 'arp -n' output!")


def have_internet_connection(endpoint="8.8.8.8", context=None):
"""
Check if an internet connection is available using :man:`ping`.
:param endpoint: The public IP address to :man:`ping` (a string).
:param context: See :func:`.coerce_context()` for details.
:returns: :data:`True` if an internet connection is available,
:data:`False` otherwise.
This works by pinging 8.8.8.8 which is one of `Google public DNS servers`_.
This IP address was chosen because it is documented that Google uses
anycast_ to keep this IP address available at all times.
.. _Google public DNS servers: https://developers.google.com/speed/public-dns/
.. _anycast: https://en.wikipedia.org/wiki/Anycast
"""
context = coerce_context(context)
logger.debug("Checking if %s has internet connectivity ..", context)
if context.test("ping", "-c1", "-w1", "8.8.8.8"):
logger.debug("Confirmed that %s has internet connectivity.", context)
return True
else:
logger.debug("No internet connectivity detected on %s.", context)
return False
75 changes: 73 additions & 2 deletions linux_utils/tests.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Test suite for the `linux-utils' Python package.
#
# Author: Peter Odding <peter@peterodding.com>
# Last Change: June 24, 2017
# Last Change: February 9, 2020
# URL: https://linux-utils.readthedocs.io

"""Test suite for the `linux-utils` package."""
Expand All @@ -17,7 +17,8 @@
# External dependencies.
from executor import ExternalCommandFailed, execute
from executor.contexts import LocalContext
from humanfriendly.testing import TemporaryDirectory, TestCase, run_cli
from humanfriendly.text import dedent
from humanfriendly.testing import MockedProgram, TemporaryDirectory, TestCase, run_cli
from mock import MagicMock

# The module we're testing.
Expand All @@ -35,6 +36,13 @@
lock_filesystem,
unlock_filesystem,
)
from linux_utils.network import (
determine_network_location,
find_gateway_ip,
find_gateway_mac,
find_mac_address,
have_internet_connection,
)
from linux_utils.tabfile import parse_tab_file

# The following files have fixed locations to enable the configuration file
Expand Down Expand Up @@ -401,3 +409,66 @@ def test_cryptdisks_start_stop_error_reporting(self):
for fallback in cryptdisks_start_cli, cryptdisks_stop_cli:
returncode, output = run_cli(fallback, TEST_UNKNOWN_TARGET, merged=True)
assert returncode != 0

def test_determine_network_location(self):
"""Test :func:`linux_utils.network.determine_network_location()`."""
with self.mock_arp, self.mock_ip:
# Make sure the happy path works as intended.
assert determine_network_location(home=['80:34:58:ad:6c:f5']) == 'home'
# Make sure nothing bad happens when we're not on a known network.
assert determine_network_location() is None

def test_find_gateway_ip(self):
"""Test :func:`linux_utils.network.find_gateway_ip()`."""
with self.mock_ip:
assert find_gateway_ip() == '192.168.1.1'

def test_find_gateway_mac(self):
"""Test :func:`linux_utils.network.find_gateway_mac()`."""
with self.mock_arp, self.mock_ip:
assert find_gateway_mac() == '80:34:58:ad:6c:f5'

def test_find_mac_address(self):
"""Test :func:`linux_utils.network.find_mac_address()`."""
with self.mock_arp:
assert find_mac_address('192.168.1.4') == '4b:21:f5:49:88:85'

def test_have_internet_connection(self):
"""Test :func:`linux_utils.network.have_internet_connection().`"""
with MockedProgram(name='ping', returncode=0):
assert have_internet_connection() is True
with MockedProgram(name='ping', returncode=1):
assert have_internet_connection() is False

@property
def mock_arp(self):
"""Mocked ``arp`` program."""
return MockedProgram(name='arp', script=dedent("""
cat << EOF
Address HWtype HWaddress Flags Mask Iface
192.168.1.4 ether 4b:21:f5:49:88:85 C wlp3s0
192.168.3.28 ether 3d:a6:19:62:9a:83 C wlp3s0
192.168.3.5 ether c5:4c:8d:56:25:0c C wlp3s0
192.168.1.1 ether 80:34:58:ad:6c:f5 C wlp3s0
192.168.3.2 ether 20:22:a0:22:0c:db C wlp3s0
192.168.1.12 ether ad:12:75:46:e9:70 C wlp3s0
192.168.3.6 ether 08:33:c7:ef:f7:27 C wlp3s0
192.168.1.11 ether c9:0e:95:24:68:31 C wlp3s0
192.168.3.4 ether e7:e6:2c:3b:bc:8a C wlp3s0
192.168.3.3 ether 72:d7:d3:2c:54:93 C wlp3s0
192.168.1.6 ether 95:ef:85:cf:d3:36 C wlp3s0
192.168.3.7 ether 65:c0:be:40:cd:31 C wlp3s0
EOF
"""))

@property
def mock_ip(self):
"""Mocked ``ip route show`` program."""
return MockedProgram(name='ip', script=dedent("""
cat << EOF
default via 192.168.1.1 dev wlp3s0 proto dhcp metric 600
169.254.0.0/16 dev virbr0 scope link metric 1000 linkdown
192.168.0.0/16 dev wlp3s0 proto kernel scope link src 192.168.2.214 metric 600
192.168.122.0/24 dev virbr0 proto kernel scope link src 192.168.122.1 linkdown
EOF
"""))
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
coloredlogs >= 7.0
executor >= 16.0.1
humanfriendly >= 5.0
humanfriendly >= 6.0
property-manager >= 2.3
six >= 1.10.0

0 comments on commit c9fd8d2

Please sign in to comment.