ssahani / rpms / dhcpcd

Forked from rpms/dhcpcd a year ago
Clone

Blame tests/ipv4-tests/dhcpcd-tests.py

Susant Sahani 97d0b2
#!/usr/bin/env python3
Susant Sahani 97d0b2
# SPDX-License-Identifier: LGPL-2.1+
Susant Sahani 97d0b2
# ~~~
Susant Sahani 97d0b2
#   Description: Tests for dhcpcd - a DHCP client
Susant Sahani 97d0b2
#
Susant Sahani 97d0b2
#   Author: Susant Sahani <susant@redhat.com>
Susant Sahani 97d0b2
#   Copyright (c) 2018 Red Hat, Inc.
Susant Sahani 97d0b2
# ~~~
Susant Sahani 97d0b2
Susant Sahani 97d0b2
import errno
Susant Sahani 97d0b2
import os
Susant Sahani 97d0b2
import sys
Susant Sahani 97d0b2
import time
Susant Sahani 97d0b2
import unittest
Susant Sahani 97d0b2
import subprocess
Susant Sahani 97d0b2
import signal
Susant Sahani 97d0b2
import shutil
Susant Sahani 97d0b2
import psutil
Susant Sahani 97d0b2
import socket
Susant Sahani 97d0b2
from pyroute2 import IPRoute
Susant Sahani 97d0b2
Susant Sahani 97d0b2
DHCPCD_CI_DIR="/var/run/dhcpcd-ci"
Susant Sahani 97d0b2
DHCPCD_LOG_FILE='/var/run/dhcpcd-ci/dhcpcd-test-log'
Susant Sahani 97d0b2
DHCPCD_PID_FILE='/var/run/dhcpcd.pid'
Susant Sahani 97d0b2
Susant Sahani 97d0b2
DHCPCD_TCP_DUMP_FILE='/tmp/dhcpcd-tcp-dump.pcap'
Susant Sahani 97d0b2
Susant Sahani 97d0b2
DNSMASQ_PID_FILE='/var/run/dhcpcd-ci/test-dnsmasq.pid'
Susant Sahani 97d0b2
DNSMASQ_LOG_FILE='/var/run/dhcpcd-ci/dnsmasq-log-file'
Susant Sahani 97d0b2
Susant Sahani 97d0b2
def setUpModule():
Susant Sahani 97d0b2
    """Initialize the environment, and perform sanity checks on it."""
Susant Sahani 97d0b2
Susant Sahani 97d0b2
    if shutil.which('dhcpcd') is None:
Susant Sahani 97d0b2
        raise OSError(errno.ENOENT, 'dhcpcd not found')
Susant Sahani 97d0b2
Susant Sahani 97d0b2
    if shutil.which('dnsmasq') is None:
Susant Sahani 97d0b2
        raise OSError(errno.ENOENT, 'dnsmasq not found')
Susant Sahani 97d0b2
Susant Sahani 97d0b2
Susant Sahani 97d0b2
def tearDownModule():
Susant Sahani 97d0b2
    pass
Susant Sahani 97d0b2
Susant Sahani 97d0b2
class GenericUtilities():
Susant Sahani 97d0b2
    """Provide a set of utility functions start stop daemons. write config files etc """
Susant Sahani 97d0b2
Susant Sahani 97d0b2
    def StartDnsMasq(self, conf):
Susant Sahani 97d0b2
        """Start DnsMasq"""
Susant Sahani 97d0b2
Susant Sahani 97d0b2
        conf_file=os.path.join(DHCPCD_CI_DIR, conf)
Susant Sahani 97d0b2
Susant Sahani 97d0b2
        subprocess.check_output(['dnsmasq', '-8', DNSMASQ_LOG_FILE, '--log-dhcp', '--pid-file=/var/run/dhcpcd-ci/test-dnsmasq.pid',
Susant Sahani 97d0b2
                                 '-C', conf_file, '-i', 'veth-peer', '-R'])
Susant Sahani 97d0b2
Susant Sahani 97d0b2
    def StartDhcpcd(self, conf):
Susant Sahani 97d0b2
        """ Start dnsmaq """
Susant Sahani 97d0b2
Susant Sahani 97d0b2
        conf_file=os.path.join(DHCPCD_CI_DIR, conf)
Susant Sahani 97d0b2
        subprocess.check_output(['dhcpcd', '-4', '-M', '-d', '--logfile', DHCPCD_LOG_FILE, '-f', conf_file, 'veth-test'])
Susant Sahani 97d0b2
Susant Sahani 97d0b2
    def StopDaemon(self, pid_file):
Susant Sahani 97d0b2
Susant Sahani 97d0b2
        with open(pid_file, 'r') as f:
Susant Sahani 97d0b2
            pid = f.read().rstrip(' \t\r\n\0')
Susant Sahani 97d0b2
        os.kill(int(pid), signal.SIGTERM)
Susant Sahani 97d0b2
Susant Sahani 97d0b2
        os.remove(pid_file)
Susant Sahani 97d0b2
Susant Sahani 97d0b2
    def findTextInDaemonLogs(self, log_file, **kwargs):
Susant Sahani 97d0b2
        """dnsmasq server logs."""
Susant Sahani 97d0b2
Susant Sahani 97d0b2
        if kwargs is not None:
Susant Sahani 97d0b2
            with open (log_file, 'rt') as in_file:
Susant Sahani 97d0b2
                contents = in_file.read()
Susant Sahani 97d0b2
                for key in kwargs:
Susant Sahani 97d0b2
                    self.assertRegex(contents, kwargs[key])
Susant Sahani 97d0b2
Susant Sahani 97d0b2
    def FindProtocolFieldsinTCPDump(self, **kwargs):
Susant Sahani 97d0b2
        """Look attributes in tcpdump."""
Susant Sahani 97d0b2
Susant Sahani 97d0b2
        contents = subprocess.check_output(['tcpdump', '-vv', '-r', DHCPCD_TCP_DUMP_FILE]).rstrip().decode('utf-8')
Susant Sahani 97d0b2
        if kwargs is not None:
Susant Sahani 97d0b2
                for key in kwargs:
Susant Sahani 97d0b2
                    self.assertRegex(contents, kwargs[key])
Susant Sahani 97d0b2
Susant Sahani 97d0b2
    def SetupVethInterface(self):
Susant Sahani 97d0b2
        """Setup veth interface"""
Susant Sahani 97d0b2
Susant Sahani 97d0b2
        ip = IPRoute()
Susant Sahani 97d0b2
Susant Sahani 97d0b2
        ip.link('add', ifname='veth-test', peer='veth-peer', kind='veth')
Susant Sahani 97d0b2
        idx_veth_test = ip.link_lookup(ifname='veth-test')[0]
Susant Sahani 97d0b2
        idx_veth_peer = ip.link_lookup(ifname='veth-peer')[0]
Susant Sahani 97d0b2
Susant Sahani 97d0b2
        ip.link('set', index=idx_veth_test, address='02:01:02:03:04:08')
Susant Sahani 97d0b2
        ip.link('set', index=idx_veth_peer, address='02:01:02:03:04:09')
Susant Sahani 97d0b2
        ip.link('set', index=idx_veth_test, state='up')
Susant Sahani 97d0b2
        ip.link('set', index=idx_veth_peer, state='up')
Susant Sahani 97d0b2
        ip.addr('add', index=idx_veth_peer, address='192.168.111.50')
Susant Sahani 97d0b2
        ip.close()
Susant Sahani 97d0b2
Susant Sahani 97d0b2
    def TearDownVethInterface(self):
Susant Sahani 97d0b2
Susant Sahani 97d0b2
        ip = IPRoute()
Susant Sahani 97d0b2
        ip.link('del', index=ip.link_lookup(ifname='veth-test')[0])
Susant Sahani 97d0b2
        ip.close()
Susant Sahani 97d0b2
Susant Sahani 97d0b2
    def StartCaptureBOOTPPackets(self):
Susant Sahani 97d0b2
        """Start tcpdump to capture dhcp packets"""
Susant Sahani 97d0b2
Susant Sahani 97d0b2
        subprocess.check_output(['systemctl','restart', 'tcpdumpd.service'])
Susant Sahani 97d0b2
Susant Sahani 97d0b2
    def StopCapturingPackets(self):
Susant Sahani 97d0b2
        subprocess.check_output(['systemctl', 'stop', 'tcpdumpd.service'])
Susant Sahani 97d0b2
        time.sleep(3);
Susant Sahani 97d0b2
Susant Sahani 97d0b2
class DhcpcdTests(unittest.TestCase, GenericUtilities):
Susant Sahani 97d0b2
Susant Sahani 97d0b2
    def setUp(self):
Susant Sahani 97d0b2
        """ setup veth and write radvd and dhcpv6configs """
Susant Sahani 97d0b2
        self.SetupVethInterface()
Susant Sahani 97d0b2
Susant Sahani 97d0b2
    def tearDown(self):
Susant Sahani 97d0b2
        self.StopDaemon(DHCPCD_PID_FILE)
Susant Sahani 97d0b2
        self.StopDaemon(DNSMASQ_PID_FILE)
Susant Sahani 97d0b2
Susant Sahani 97d0b2
        self.TearDownVethInterface()
Susant Sahani 97d0b2
Susant Sahani 97d0b2
    def test_dhcpcd_ipv4(self):
Susant Sahani 97d0b2
        """ dhcpcd gets address """
Susant Sahani 97d0b2
Susant Sahani 97d0b2
        self.StartDnsMasq('dnsmasq-ipv4.conf')
Susant Sahani 97d0b2
        time.sleep(1)
Susant Sahani 97d0b2
        self.StartDhcpcd('dhcpcd-domain-dns.conf')
Susant Sahani 97d0b2
Susant Sahani 97d0b2
        time.sleep(5)
Susant Sahani 97d0b2
        output=subprocess.check_output(['ip','address', 'show', 'veth-test']).rstrip().decode('utf-8')
Susant Sahani 97d0b2
Susant Sahani 97d0b2
        # Address prefix
Susant Sahani 97d0b2
        self.assertRegex(output, "192.168.111.*")
Susant Sahani 97d0b2
Susant Sahani 97d0b2
        # Default route
Susant Sahani 97d0b2
        output=subprocess.check_output(['ip','route', 'show', 'dev', 'veth-test']).rstrip().decode('utf-8')
Susant Sahani 97d0b2
        self.assertRegex(output, "default via 192.168.1.1*")
Susant Sahani 97d0b2
Susant Sahani 97d0b2
    def test_dhcpcd_dns_domain(self):
Susant Sahani 97d0b2
        """ dhcpcd request DNS and domain name """
Susant Sahani 97d0b2
Susant Sahani 97d0b2
        self.StartDnsMasq('dnsmasq-ipv4.conf')
Susant Sahani 97d0b2
        time.sleep(1)
Susant Sahani 97d0b2
        self.StartDhcpcd('dhcpcd-domain-dns.conf')
Susant Sahani 97d0b2
Susant Sahani 97d0b2
        output=subprocess.check_output(['ip','address', 'show', 'veth-test']).rstrip().decode('utf-8')
Susant Sahani 97d0b2
Susant Sahani 97d0b2
        # Address prefix
Susant Sahani 97d0b2
        self.assertRegex(output, "192.168.111.*")
Susant Sahani 97d0b2
Susant Sahani 97d0b2
        # Default route
Susant Sahani 97d0b2
        output=subprocess.check_output(['ip','route', 'show', 'dev', 'veth-test']).rstrip().decode('utf-8')
Susant Sahani 97d0b2
        self.assertRegex(output, "default via 192.168.1.1*")
Susant Sahani 97d0b2
Susant Sahani 97d0b2
        # Dump the lease file
Susant Sahani 97d0b2
        output=subprocess.check_output(['dhcpcd','-U', '-4', 'veth-test'], stderr=subprocess.STDOUT).rstrip().decode('utf-8')
Susant Sahani 97d0b2
        self.assertRegex(output, 'domain_name=example-test.com')
Susant Sahani 97d0b2
        self.assertRegex(output, 'domain_name_servers=\'8.8.8.8 8.8.4.4\'')
Susant Sahani 97d0b2
        self.assertRegex(output, 'routers=192.168.1.1')
Susant Sahani 97d0b2
Susant Sahani 97d0b2
    def test_dhcpcd_mtu(self):
Susant Sahani 97d0b2
        """ dhcpcd gets MTU 1492 """
Susant Sahani 97d0b2
Susant Sahani 97d0b2
        self.StartDnsMasq('dnsmasq-mtu.conf')
Susant Sahani 97d0b2
        time.sleep(1)
Susant Sahani 97d0b2
Susant Sahani 97d0b2
        self.StartDhcpcd('dhcpcd-mtu.conf')
Susant Sahani 97d0b2
        time.sleep(5)
Susant Sahani 97d0b2
        output=subprocess.check_output(['ip','address', 'show', 'veth-test']).rstrip().decode('utf-8')
Susant Sahani 97d0b2
Susant Sahani 97d0b2
        # Address prefix
Susant Sahani 97d0b2
        self.assertRegex(output, "192.168.111.*")
Susant Sahani 97d0b2
Susant Sahani 97d0b2
        # Dump the lease file
Susant Sahani 97d0b2
        output=subprocess.check_output(['dhcpcd','-U', '-4', 'veth-test'], stderr=subprocess.STDOUT).rstrip().decode('utf-8')
Susant Sahani 97d0b2
        self.assertRegex(output, 'interface_mtu=1492')
Susant Sahani 97d0b2
Susant Sahani 97d0b2
    def test_dhcpcd_clientid_vendorclassid_userclass(self):
Susant Sahani 97d0b2
        """ verify dhcpcd sends custom clientid vendor class id and userclass """
Susant Sahani 97d0b2
Susant Sahani 97d0b2
        self.StartDnsMasq('dnsmasq-vendorclass.conf')
Susant Sahani 97d0b2
        time.sleep(1)
Susant Sahani 97d0b2
Susant Sahani 97d0b2
        self.StartCaptureBOOTPPackets()
Susant Sahani 97d0b2
Susant Sahani 97d0b2
        self.StartDhcpcd('dhcpcd-vendorclass.conf')
Susant Sahani 97d0b2
        time.sleep(5)
Susant Sahani 97d0b2
        self.StopCapturingPackets()
Susant Sahani 97d0b2
Susant Sahani 97d0b2
        output=subprocess.check_output(['ip','address', 'show', 'veth-test']).rstrip().decode('utf-8')
Susant Sahani 97d0b2
        # Address prefix
Susant Sahani 97d0b2
        self.assertRegex(output, "192.168.111.*")
Susant Sahani 97d0b2
Susant Sahani 97d0b2
        self.findTextInDaemonLogs(DNSMASQ_LOG_FILE, vendor_class='vendor class: Zeus_dhcpcd_vendorclass_id',
Susant Sahani 97d0b2
                                  user_class='user class: AAAA BBBB CCCC DDDD',
Susant Sahani 97d0b2
                                  host_name='client provides name: Zeus')
Susant Sahani 97d0b2
        self.findTextInDaemonLogs(DHCPCD_LOG_FILE, client_id='using ClientID 00:11:11:12:12:13:13:14:14:15:15:16:16')
Susant Sahani 97d0b2
        self.FindProtocolFieldsinTCPDump(vendor='Vendor-Option Option 43, length 11: 104.101.108.108.111.32.119.111.114.108.100',
Susant Sahani 97d0b2
                                         user_class='instance#1:.*AAAA BBBB CCCC DDDD", length 19',
Susant Sahani 97d0b2
                                         vendor_class='Vendor-Class Option 60, length 26:.*Zeus_dhcpcd_vendorclass_id',
Susant Sahani 97d0b2
                                         host_name='Hostname.*Zeus')
Susant Sahani 97d0b2
        os.remove(DHCPCD_TCP_DUMP_FILE)
Susant Sahani 97d0b2
Susant Sahani 97d0b2
if __name__ == '__main__':
Susant Sahani 97d0b2
    unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout,
Susant Sahani 97d0b2
                                                     verbosity=3))