ssahani / rpms / dhcpcd

Forked from rpms/dhcpcd 5 years ago
Clone
Blob Blame History Raw
#!/usr/bin/env python3
# SPDX-License-Identifier: LGPL-2.1+
# ~~~
#   Description: DHCPv6 Tests for dhcpcd - a DHCP client
#
#   Author: Susant Sahani <susant@redhat.com>
#   Copyright (c) 2018 Red Hat, Inc.
# ~~~

import errno
import os
import sys
import time
import unittest
import subprocess
import signal
import shutil
import psutil
import socket
from pyroute2 import IPRoute

# Directory under which the CI log, config and pid files below are kept.
DHCPCD_CI_DIR = '/var/run/dhcpcd-ci'

# dhcpcd (the client under test): log file and configuration file.
DHCPCD_LOG_FILE = '/var/run/dhcpcd-ci/dhcpcd-test-log'
DHCPCD_CONF_FILE = '/var/run/dhcpcd-ci/dhcpcd-test.conf'

# dhcpcd pid file and the DUID file that each test rewrites.
DHCPCD_PID_FILE = '/var/run/dhcpcd.pid'
DHCPCD_DUID_FILE = '/etc/dhcpcd.duid'

# radvd: log, configuration and pid files.
RADVD_LOG_FILE = '/var/run/dhcpcd-ci/radvd.log'
RADVD_CONFIG_FILE = '/var/run/dhcpcd-ci/radvd-ci.conf'
RADVD_PID_FILE = '/var/run/dhcpcd-ci/radvd.pid'

# dhcp6s: configuration and pid files.
DHCP6S_CONFIG_FILE = '/var/run/dhcpcd-ci/dhcp6s.conf'
DHCP6S_PID_FILE = '/var/run/dhcp6s.pid'

# Resolver configuration inspected by the tests for DNS servers / search list.
RESOLVE_CONF = '/etc/resolv.conf'

def setUpModule():
    """Initialize the environment, and perform sanity checks on it."""

    if shutil.which('dhcpcd') is None:
        raise OSError(errno.ENOENT, 'dhcpcd not found')
    if shutil.which('dhcp6s') is None:
        raise OSError(errno.ENOENT, 'dhcdp6s not found')
    if shutil.which('radvd') is None:
        raise OSError(errno.ENOENT, 'radvd not found')

def tearDownModule():
    """Module-level teardown hook; it currently has nothing to do."""
    return None

class GenericUtilities:
    """Helpers to start/stop daemons, write config files and manage veth links.

    This is a mixin: ``WriteConfigFile`` calls ``self.addCleanup`` and
    ``findTextInDaemonLogs`` calls ``self.assertRegex``, so those two methods
    only work on an object that also provides them (e.g. a
    ``unittest.TestCase``).
    """

    def StartDhcp6s(self):
        """Start dhcp6s on ``veth-peer`` with DHCP6S_CONFIG_FILE.

        Raises:
            subprocess.CalledProcessError: if the command exits non-zero.
        """

        subprocess.check_output(['dhcp6s', '-c', DHCP6S_CONFIG_FILE, 'veth-peer', '-dD'])

    def StartRadvd(self):
        """Start radvd with RADVD_CONFIG_FILE, RADVD_PID_FILE and RADVD_LOG_FILE.

        Raises:
            subprocess.CalledProcessError: if the command exits non-zero.
        """

        subprocess.check_output(['radvd', '-d5', '-C', RADVD_CONFIG_FILE, '-p', RADVD_PID_FILE, '-l', RADVD_LOG_FILE])

    def StartDhcpcd(self):
        """Start dhcpcd on ``veth-test``, logging to DHCPCD_LOG_FILE.

        Raises:
            subprocess.CalledProcessError: if the command exits non-zero.
        """

        subprocess.check_output(['dhcpcd', '-6', '-M', '-d', '--logfile', DHCPCD_LOG_FILE, '-f', DHCPCD_CONF_FILE, 'veth-test'])

    def StopDaemon(self, pid_file):
        """Send SIGTERM to the process recorded in *pid_file*, then remove the file.

        This is a best-effort cleanup step. A missing pid file, an
        unparsable or non-positive pid, a process that has already exited,
        and a pid file that disappears before it can be removed are all
        tolerated so that the caller's remaining teardown still runs.

        Args:
            pid_file: path of the file holding the daemon's pid.
        """

        try:
            with open(pid_file, 'r') as f:
                raw_pid = f.read().rstrip(' \t\r\n\0')
        except FileNotFoundError:
            # Nothing was started, or the daemon already cleaned up.
            return

        try:
            pid = int(raw_pid)
        except ValueError:
            pid = None

        # kill() treats pid 0 and negative pids as "a whole process group",
        # which would include the test runner itself; only signal a real pid.
        if pid is not None and pid > 0:
            try:
                os.kill(pid, signal.SIGTERM)
            except ProcessLookupError:
                # Stale pid file: the process is already gone.
                pass

        try:
            os.remove(pid_file)
        except FileNotFoundError:
            # The daemon may remove its own pid file while shutting down.
            pass

    def WriteConfigFile(self, path, contents):
        """Write *contents* to *path* and queue the file for removal.

        Args:
            path: file to create or overwrite.
            contents: text written to the file.
        """

        with open(path, 'w') as unit:
            unit.write(contents)

        self.addCleanup(os.remove, path)

    def findTextInDaemonLogs(self, log_file, **kwargs):
        """Assert that every given pattern occurs in *log_file*.

        Args:
            log_file: path of the text file to search.
            **kwargs: the keyword names are only labels; each value is
                passed to ``assertRegex`` as a regular expression that must
                match somewhere in the file contents.
        """

        with open(log_file, 'rt') as in_file:
            contents = in_file.read()

        for pattern in kwargs.values():
            self.assertRegex(contents, pattern)

    def SetupVethInterface(self):
        """Create the ``veth-test``/``veth-peer`` pair and bring both ends up.

        Both ends get a fixed MAC address and ``veth-peer`` additionally
        gets the address 192.168.111.50.
        """

        ip = IPRoute()
        try:
            ip.link('add', ifname='veth-test', peer='veth-peer', kind='veth')
            idx_veth_test = ip.link_lookup(ifname='veth-test')[0]
            idx_veth_peer = ip.link_lookup(ifname='veth-peer')[0]

            ip.link('set', index=idx_veth_test, address='02:01:02:03:04:08')
            ip.link('set', index=idx_veth_peer, address='02:01:02:03:04:09')
            ip.link('set', index=idx_veth_test, state='up')
            ip.link('set', index=idx_veth_peer, state='up')
            ip.addr('add', index=idx_veth_peer, address='192.168.111.50')
        finally:
            # Release the netlink socket even when one of the calls fails.
            ip.close()

    def TearDownVethInterface(self):
        """Delete ``veth-test`` if it exists.

        Only ``veth-test`` is deleted explicitly; nothing is done when the
        interface is already absent.
        """

        ip = IPRoute()
        try:
            indices = ip.link_lookup(ifname='veth-test')
            if indices:
                ip.link('del', index=indices[0])
        finally:
            ip.close()

class DhcpcdTests(unittest.TestCase, GenericUtilities):
    """DHCPv6 tests: dhcpcd on ``veth-test`` against dhcp6s and radvd.

    Every test gets a fresh veth pair from ``setUp``; ``tearDown`` stops the
    three daemons and deletes the pair again.
    """

    def setUp(self):
        """Create the veth pair used by the test."""
        self.SetupVethInterface()

    def tearDown(self):
        """Stop dhcpcd, radvd and dhcp6s, then delete the veth pair.

        Every daemon is attempted and the veth pair is always removed, even
        when stopping one of the daemons fails; otherwise a leftover
        interface or daemon would break every following test. The first
        failure, if any, is re-raised once the cleanup is complete.
        """
        errors = []
        try:
            for pid_file in (DHCPCD_PID_FILE, RADVD_PID_FILE, DHCP6S_PID_FILE):
                try:
                    self.StopDaemon(pid_file)
                except (OSError, ValueError) as exc:
                    # Missing/stale/unparsable pid file: remember it and
                    # keep cleaning up.
                    errors.append(exc)
        finally:
            self.TearDownVethInterface()

        if errors:
            raise errors[0]

    def _acquire_lease(self, duid):
        """Run one client/server exchange and return the interface state.

        Starts dhcp6s and radvd, writes *duid* (plus a trailing newline) to
        DHCPCD_DUID_FILE, starts dhcpcd and returns the decoded output of
        ``ip address show veth-test``.

        Args:
            duid: DUID string dhcpcd should identify itself with.
        """
        self.StartDhcp6s()
        self.StartRadvd()

        # Presumably gives the servers time to come up before the client starts.
        time.sleep(1)

        self.WriteConfigFile(DHCPCD_DUID_FILE, duid + '\n')

        self.StartDhcpcd()

        # Fixed wait before inspecting the interface and resolv.conf.
        time.sleep(10)
        return subprocess.check_output(['ip', 'address', 'show', 'veth-test']).rstrip().decode('utf-8')

    def test_dhcp6s_gets_rdnss_dnssl(self):
        """dhcpcd obtains its address, both DNS servers and the search domain."""

        output = self._acquire_lease('00:01:00:01:22:8a:88:26:08:00:27:87:00:7e')

        # Address prefix
        self.assertRegex(output, "2001:888:db8:1::b")
        self.findTextInDaemonLogs(RESOLVE_CONF,
                                  dns1='2001:888:db8:1::a',
                                  dns2='2001:888:db8:1::d',
                                  search='test.com')

    def test_dhcp6s_assigns_static_address_using_duid2(self):
        """dhcpcd with DUID 00:01:00:01:22:8d:cb:58:0a:00:27:00:00:00 gets 2001:888:db8:1::c."""

        output = self._acquire_lease('00:01:00:01:22:8d:cb:58:0a:00:27:00:00:00')

        # Address
        self.assertRegex(output, "2001:888:db8:1::c")
        self.findTextInDaemonLogs(RESOLVE_CONF,
                                  dns='2001:888:db8:1::a',
                                  search='test.com')

    def test_dhcp6s_assigns_static_address_using_duid1(self):
        """dhcpcd with DUID 00:01:00:01:22:8a:88:26:08:00:27:87:00:7e gets 2001:888:db8:1::b."""

        output = self._acquire_lease('00:01:00:01:22:8a:88:26:08:00:27:87:00:7e')

        # Address prefix
        self.assertRegex(output, "2001:888:db8:1::b")
        self.findTextInDaemonLogs(RESOLVE_CONF,
                                  dns='2001:888:db8:1::a',
                                  search='test.com')

if __name__ == '__main__':
    # Build the runner first so the report stream and verbosity are easy to see.
    runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=3)
    unittest.main(testRunner=runner)