ssahani / rpms / dhcpcd

Forked from rpms/dhcpcd 5 years ago
Clone
Blob Blame History Raw
#!/usr/bin/env python3
# SPDX-License-Identifier: LGPL-2.1+
# ~~~
#   Description: Tests for dhcpcd - a DHCP client
#
#   Author: Susant Sahani <susant@redhat.com>
#   Copyright (c) 2018 Red Hat, Inc.
# ~~~

import errno
import os
import sys
import time
import unittest
import subprocess
import signal
import shutil
import psutil
import socket
from pyroute2 import IPRoute

DHCPCD_CI_DIR="/var/run/dhcpcd-ci"
DHCPCD_LOG_FILE='/var/run/dhcpcd-ci/dhcpcd-test-log'
DHCPCD_PID_FILE='/var/run/dhcpcd.pid'

DHCPCD_TCP_DUMP_FILE='/tmp/dhcpcd-tcp-dump.pcap'

DNSMASQ_PID_FILE='/var/run/dhcpcd-ci/test-dnsmasq.pid'
DNSMASQ_LOG_FILE='/var/run/dhcpcd-ci/dnsmasq-log-file'

def setUpModule():
    """Initialize the environment, and perform sanity checks on it."""

    if shutil.which('dhcpcd') is None:
        raise OSError(errno.ENOENT, 'dhcpcd not found')

    if shutil.which('dnsmasq') is None:
        raise OSError(errno.ENOENT, 'dnsmasq not found')


def tearDownModule():
    pass

class GenericUtilities():
    """Provide a set of utility functions start stop daemons. write config files etc """

    def StartDnsMasq(self, conf):
        """Start DnsMasq"""

        conf_file=os.path.join(DHCPCD_CI_DIR, conf)

        subprocess.check_output(['dnsmasq', '-8', DNSMASQ_LOG_FILE, '--log-dhcp', '--pid-file=/var/run/dhcpcd-ci/test-dnsmasq.pid',
                                 '-C', conf_file, '-i', 'veth-peer', '-R'])

    def StartDhcpcd(self, conf):
        """ Start dnsmaq """

        conf_file=os.path.join(DHCPCD_CI_DIR, conf)
        subprocess.check_output(['dhcpcd', '-4', '-M', '-d', '--logfile', DHCPCD_LOG_FILE, '-f', conf_file, 'veth-test'])

    def StopDaemon(self, pid_file):

        with open(pid_file, 'r') as f:
            pid = f.read().rstrip(' \t\r\n\0')
        os.kill(int(pid), signal.SIGTERM)

        os.remove(pid_file)

    def findTextInDaemonLogs(self, log_file, **kwargs):
        """dnsmasq server logs."""

        if kwargs is not None:
            with open (log_file, 'rt') as in_file:
                contents = in_file.read()
                for key in kwargs:
                    self.assertRegex(contents, kwargs[key])

    def FindProtocolFieldsinTCPDump(self, **kwargs):
        """Look attributes in tcpdump."""

        contents = subprocess.check_output(['tcpdump', '-vv', '-r', DHCPCD_TCP_DUMP_FILE]).rstrip().decode('utf-8')
        if kwargs is not None:
                for key in kwargs:
                    self.assertRegex(contents, kwargs[key])

    def SetupVethInterface(self):
        """Setup veth interface"""

        ip = IPRoute()

        ip.link('add', ifname='veth-test', peer='veth-peer', kind='veth')
        idx_veth_test = ip.link_lookup(ifname='veth-test')[0]
        idx_veth_peer = ip.link_lookup(ifname='veth-peer')[0]

        ip.link('set', index=idx_veth_test, address='02:01:02:03:04:08')
        ip.link('set', index=idx_veth_peer, address='02:01:02:03:04:09')
        ip.link('set', index=idx_veth_test, state='up')
        ip.link('set', index=idx_veth_peer, state='up')
        ip.addr('add', index=idx_veth_peer, address='192.168.111.50')
        ip.close()

    def TearDownVethInterface(self):

        ip = IPRoute()
        ip.link('del', index=ip.link_lookup(ifname='veth-test')[0])
        ip.close()

    def StartCaptureBOOTPPackets(self):
        """Start tcpdump to capture dhcp packets"""

        subprocess.check_output(['systemctl','restart', 'tcpdumpd.service'])

    def StopCapturingPackets(self):
        subprocess.check_output(['systemctl', 'stop', 'tcpdumpd.service'])
        time.sleep(3);

class DhcpcdTests(unittest.TestCase, GenericUtilities):

    def setUp(self):
        """ setup veth and write radvd and dhcpv6configs """
        self.SetupVethInterface()

    def tearDown(self):
        self.StopDaemon(DHCPCD_PID_FILE)
        self.StopDaemon(DNSMASQ_PID_FILE)

        self.TearDownVethInterface()

    def test_dhcpcd_ipv4(self):
        """ dhcpcd gets address """

        self.StartDnsMasq('dnsmasq-ipv4.conf')
        time.sleep(1)
        self.StartDhcpcd('dhcpcd-domain-dns.conf')

        time.sleep(5)
        output=subprocess.check_output(['ip','address', 'show', 'veth-test']).rstrip().decode('utf-8')

        # Address prefix
        self.assertRegex(output, "192.168.111.*")

        # Default route
        output=subprocess.check_output(['ip','route', 'show', 'dev', 'veth-test']).rstrip().decode('utf-8')
        self.assertRegex(output, "default via 192.168.1.1*")

    def test_dhcpcd_dns_domain(self):
        """ dhcpcd request DNS and domain name """

        self.StartDnsMasq('dnsmasq-ipv4.conf')
        time.sleep(1)
        self.StartDhcpcd('dhcpcd-domain-dns.conf')

        output=subprocess.check_output(['ip','address', 'show', 'veth-test']).rstrip().decode('utf-8')

        # Address prefix
        self.assertRegex(output, "192.168.111.*")

        # Default route
        output=subprocess.check_output(['ip','route', 'show', 'dev', 'veth-test']).rstrip().decode('utf-8')
        self.assertRegex(output, "default via 192.168.1.1*")

        # Dump the lease file
        output=subprocess.check_output(['dhcpcd','-U', '-4', 'veth-test'], stderr=subprocess.STDOUT).rstrip().decode('utf-8')
        self.assertRegex(output, 'domain_name=example-test.com')
        self.assertRegex(output, 'domain_name_servers=\'8.8.8.8 8.8.4.4\'')
        self.assertRegex(output, 'routers=192.168.1.1')

    def test_dhcpcd_mtu(self):
        """ dhcpcd gets MTU 1492 """

        self.StartDnsMasq('dnsmasq-mtu.conf')
        time.sleep(1)

        self.StartDhcpcd('dhcpcd-mtu.conf')
        time.sleep(5)
        output=subprocess.check_output(['ip','address', 'show', 'veth-test']).rstrip().decode('utf-8')

        # Address prefix
        self.assertRegex(output, "192.168.111.*")

        # Dump the lease file
        output=subprocess.check_output(['dhcpcd','-U', '-4', 'veth-test'], stderr=subprocess.STDOUT).rstrip().decode('utf-8')
        self.assertRegex(output, 'interface_mtu=1492')

    def test_dhcpcd_clientid_vendorclassid_userclass(self):
        """ verify dhcpcd sends custom clientid vendor class id and userclass """

        self.StartDnsMasq('dnsmasq-vendorclass.conf')
        time.sleep(1)

        self.StartCaptureBOOTPPackets()

        self.StartDhcpcd('dhcpcd-vendorclass.conf')
        time.sleep(5)
        self.StopCapturingPackets()

        output=subprocess.check_output(['ip','address', 'show', 'veth-test']).rstrip().decode('utf-8')
        # Address prefix
        self.assertRegex(output, "192.168.111.*")

        self.findTextInDaemonLogs(DNSMASQ_LOG_FILE, vendor_class='vendor class: Zeus_dhcpcd_vendorclass_id',
                                  user_class='user class: AAAA BBBB CCCC DDDD',
                                  host_name='client provides name: Zeus')
        self.findTextInDaemonLogs(DHCPCD_LOG_FILE, client_id='using ClientID 00:11:11:12:12:13:13:14:14:15:15:16:16')
        self.FindProtocolFieldsinTCPDump(vendor='Vendor-Option Option 43, length 11: 104.101.108.108.111.32.119.111.114.108.100',
                                         user_class='instance#1:.*AAAA BBBB CCCC DDDD", length 19',
                                         vendor_class='Vendor-Class Option 60, length 26:.*Zeus_dhcpcd_vendorclass_id',
                                         host_name='Hostname.*Zeus')
        os.remove(DHCPCD_TCP_DUMP_FILE)

if __name__ == '__main__':
    unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout,
                                                     verbosity=3))