diff options
Diffstat (limited to 'tools/testing/selftests/tc-testing/tdc.py')
-rwxr-xr-x | tools/testing/selftests/tc-testing/tdc.py | 575 |
1 files changed, 399 insertions, 176 deletions
diff --git a/tools/testing/selftests/tc-testing/tdc.py b/tools/testing/selftests/tc-testing/tdc.py index fc373fdf2bdc..44de4a272a11 100755 --- a/tools/testing/selftests/tc-testing/tdc.py +++ b/tools/testing/selftests/tc-testing/tdc.py @@ -11,16 +11,96 @@ import re import os import sys import argparse +import importlib import json import subprocess +import time +import traceback from collections import OrderedDict from string import Template from tdc_config import * from tdc_helper import * - -USE_NS = True +import TdcPlugin + + +class PluginMgrTestFail(Exception): + def __init__(self, stage, output, message): + self.stage = stage + self.output = output + self.message = message + +class PluginMgr: + def __init__(self, argparser): + super().__init__() + self.plugins = {} + self.plugin_instances = [] + self.args = [] + self.argparser = argparser + + # TODO, put plugins in order + plugindir = os.getenv('TDC_PLUGIN_DIR', './plugins') + for dirpath, dirnames, filenames in os.walk(plugindir): + for fn in filenames: + if (fn.endswith('.py') and + not fn == '__init__.py' and + not fn.startswith('#') and + not fn.startswith('.#')): + mn = fn[0:-3] + foo = importlib.import_module('plugins.' + mn) + self.plugins[mn] = foo + self.plugin_instances.append(foo.SubPlugin()) + + def call_pre_suite(self, testcount, testidlist): + for pgn_inst in self.plugin_instances: + pgn_inst.pre_suite(testcount, testidlist) + + def call_post_suite(self, index): + for pgn_inst in reversed(self.plugin_instances): + pgn_inst.post_suite(index) + + def call_pre_case(self, test_ordinal, testid): + for pgn_inst in self.plugin_instances: + try: + pgn_inst.pre_case(test_ordinal, testid) + except Exception as ee: + print('exception {} in call to pre_case for {} plugin'. + format(ee, pgn_inst.__class__)) + print('test_ordinal is {}'.format(test_ordinal)) + print('testid is {}'.format(testid)) + raise + + def call_post_case(self): + for pgn_inst in reversed(self.plugin_instances): + pgn_inst.post_case() + + def call_pre_execute(self): + for pgn_inst in self.plugin_instances: + pgn_inst.pre_execute() + + def call_post_execute(self): + for pgn_inst in reversed(self.plugin_instances): + pgn_inst.post_execute() + + def call_add_args(self, parser): + for pgn_inst in self.plugin_instances: + parser = pgn_inst.add_args(parser) + return parser + + def call_check_args(self, args, remaining): + for pgn_inst in self.plugin_instances: + pgn_inst.check_args(args, remaining) + + def call_adjust_command(self, stage, command): + for pgn_inst in self.plugin_instances: + command = pgn_inst.adjust_command(stage, command) + return command + + @staticmethod + def _make_argparser(args): + self.argparser = argparse.ArgumentParser( + description='Linux TC unit tests') def replace_keywords(cmd): @@ -33,21 +113,24 @@ def replace_keywords(cmd): return subcmd -def exec_cmd(command, nsonly=True): +def exec_cmd(args, pm, stage, command): """ Perform any required modifications on an executable command, then run it in a subprocess and return the results. """ - if (USE_NS and nsonly): - command = 'ip netns exec $NS ' + command - + if len(command.strip()) == 0: + return None, None if '$' in command: command = replace_keywords(command) + command = pm.call_adjust_command(stage, command) + if args.verbose > 0: + print('command "{}"'.format(command)) proc = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) + stderr=subprocess.PIPE, + env=ENVIR) (rawout, serr) = proc.communicate() if proc.returncode != 0 and len(serr) > 0: @@ -60,36 +143,99 @@ def exec_cmd(command, nsonly=True): return proc, foutput -def prepare_env(cmdlist): +def prepare_env(args, pm, stage, prefix, cmdlist, output = None): """ - Execute the setup/teardown commands for a test case. Optionally - terminate test execution if the command fails. + Execute the setup/teardown commands for a test case. + Optionally terminate test execution if the command fails. """ + if args.verbose > 0: + print('{}'.format(prefix)) for cmdinfo in cmdlist: - if (type(cmdinfo) == list): + if isinstance(cmdinfo, list): exit_codes = cmdinfo[1:] cmd = cmdinfo[0] else: exit_codes = [0] cmd = cmdinfo - if (len(cmd) == 0): + if not cmd: continue - (proc, foutput) = exec_cmd(cmd) + (proc, foutput) = exec_cmd(args, pm, stage, cmd) + + if proc and (proc.returncode not in exit_codes): + print('', file=sys.stderr) + print("{} *** Could not execute: \"{}\"".format(prefix, cmd), + file=sys.stderr) + print("\n{} *** Error message: \"{}\"".format(prefix, foutput), + file=sys.stderr) + print("\n{} *** Aborting test run.".format(prefix), file=sys.stderr) + print("\n\n{} *** stdout ***".format(proc.stdout), file=sys.stderr) + print("\n\n{} *** stderr ***".format(proc.stderr), file=sys.stderr) + raise PluginMgrTestFail( + stage, output, + '"{}" did not complete successfully'.format(prefix)) + +def run_one_test(pm, args, index, tidx): + global NAMES + result = True + tresult = "" + tap = "" + if args.verbose > 0: + print("\t====================\n=====> ", end="") + print("Test " + tidx["id"] + ": " + tidx["name"]) + + # populate NAMES with TESTID for this test + NAMES['TESTID'] = tidx['id'] + + pm.call_pre_case(index, tidx['id']) + prepare_env(args, pm, 'setup', "-----> prepare stage", tidx["setup"]) + + if (args.verbose > 0): + print('-----> execute stage') + pm.call_pre_execute() + (p, procout) = exec_cmd(args, pm, 'execute', tidx["cmdUnderTest"]) + exit_code = p.returncode + pm.call_post_execute() + + if (exit_code != int(tidx["expExitCode"])): + result = False + print("exit:", exit_code, int(tidx["expExitCode"])) + print(procout) + else: + if args.verbose > 0: + print('-----> verify stage') + match_pattern = re.compile( + str(tidx["matchPattern"]), re.DOTALL | re.MULTILINE) + (p, procout) = exec_cmd(args, pm, 'verify', tidx["verifyCmd"]) + if procout: + match_index = re.findall(match_pattern, procout) + if len(match_index) != int(tidx["matchCount"]): + result = False + elif int(tidx["matchCount"]) != 0: + result = False + + if not result: + tresult += 'not ' + tresult += 'ok {} - {} # {}\n'.format(str(index), tidx['id'], tidx['name']) + tap += tresult - if proc.returncode not in exit_codes: - print - print("Could not execute:") - print(cmd) - print("\nError message:") - print(foutput) - print("\nAborting test run.") - ns_destroy() - exit(1) + if result == False: + if procout: + tap += procout + else: + tap += 'No output!\n' + + prepare_env(args, pm, 'teardown', '-----> teardown stage', tidx['teardown'], procout) + pm.call_post_case() + index += 1 + + # remove TESTID from NAMES + del(NAMES['TESTID']) + return tap -def test_runner(filtered_tests, args): +def test_runner(pm, args, filtered_tests): """ Driver function for the unit tests. @@ -101,75 +247,92 @@ def test_runner(filtered_tests, args): testlist = filtered_tests tcount = len(testlist) index = 1 - tap = str(index) + ".." + str(tcount) + "\n" - + tap = '' + badtest = None + stage = None + emergency_exit = False + emergency_exit_message = '' + + if args.notap: + if args.verbose: + tap = 'notap requested: omitting test plan\n' + else: + tap = str(index) + ".." + str(tcount) + "\n" + try: + pm.call_pre_suite(tcount, [tidx['id'] for tidx in testlist]) + except Exception as ee: + ex_type, ex, ex_tb = sys.exc_info() + print('Exception {} {} (caught in pre_suite).'. + format(ex_type, ex)) + # when the extra print statements are uncommented, + # the traceback does not appear between them + # (it appears way earlier in the tdc.py output) + # so don't bother ... + # print('--------------------(') + # print('traceback') + traceback.print_tb(ex_tb) + # print('--------------------)') + emergency_exit_message = 'EMERGENCY EXIT, call_pre_suite failed with exception {} {}\n'.format(ex_type, ex) + emergency_exit = True + stage = 'pre-SUITE' + + if emergency_exit: + pm.call_post_suite(index) + return emergency_exit_message + if args.verbose > 1: + print('give test rig 2 seconds to stabilize') + time.sleep(2) for tidx in testlist: - result = True - tresult = "" if "flower" in tidx["category"] and args.device == None: + if args.verbose > 1: + print('Not executing test {} {} because DEV2 not defined'. + format(tidx['id'], tidx['name'])) continue - print("Test " + tidx["id"] + ": " + tidx["name"]) - prepare_env(tidx["setup"]) - (p, procout) = exec_cmd(tidx["cmdUnderTest"]) - exit_code = p.returncode - - if (exit_code != int(tidx["expExitCode"])): - result = False - print("exit:", exit_code, int(tidx["expExitCode"])) - print(procout) - else: - match_pattern = re.compile(str(tidx["matchPattern"]), re.DOTALL) - (p, procout) = exec_cmd(tidx["verifyCmd"]) - match_index = re.findall(match_pattern, procout) - if len(match_index) != int(tidx["matchCount"]): - result = False - - if result == True: - tresult += "ok " - else: - tresult += "not ok " - tap += tresult + str(index) + " " + tidx["id"] + " " + tidx["name"] + "\n" - - if result == False: - tap += procout - - prepare_env(tidx["teardown"]) + try: + badtest = tidx # in case it goes bad + tap += run_one_test(pm, args, index, tidx) + except PluginMgrTestFail as pmtf: + ex_type, ex, ex_tb = sys.exc_info() + stage = pmtf.stage + message = pmtf.message + output = pmtf.output + print(message) + print('Exception {} {} (caught in test_runner, running test {} {} {} stage {})'. + format(ex_type, ex, index, tidx['id'], tidx['name'], stage)) + print('---------------') + print('traceback') + traceback.print_tb(ex_tb) + print('---------------') + if stage == 'teardown': + print('accumulated output for this test:') + if pmtf.output: + print(pmtf.output) + print('---------------') + break index += 1 - return tap - + # if we failed in setup or teardown, + # fill in the remaining tests with ok-skipped + count = index + if not args.notap: + tap += 'about to flush the tap output if tests need to be skipped\n' + if tcount + 1 != index: + for tidx in testlist[index - 1:]: + msg = 'skipped - previous {} failed'.format(stage) + tap += 'ok {} - {} # {} {} {}\n'.format( + count, tidx['id'], msg, index, badtest.get('id', '--Unknown--')) + count += 1 -def ns_create(): - """ - Create the network namespace in which the tests will be run and set up - the required network devices for it. - """ - if (USE_NS): - cmd = 'ip netns add $NS' - exec_cmd(cmd, False) - cmd = 'ip link add $DEV0 type veth peer name $DEV1' - exec_cmd(cmd, False) - cmd = 'ip link set $DEV1 netns $NS' - exec_cmd(cmd, False) - cmd = 'ip link set $DEV0 up' - exec_cmd(cmd, False) - cmd = 'ip -n $NS link set $DEV1 up' - exec_cmd(cmd, False) - cmd = 'ip link set $DEV2 netns $NS' - exec_cmd(cmd, False) - cmd = 'ip -n $NS link set $DEV2 up' - exec_cmd(cmd, False) + tap += 'done flushing skipped test tap output\n' + if args.pause: + print('Want to pause\nPress enter to continue ...') + if input(sys.stdin): + print('got something on stdin') -def ns_destroy(): - """ - Destroy the network namespace for testing (and any associated network - devices as well) - """ - if (USE_NS): - cmd = 'ip netns delete $NS' - exec_cmd(cmd, False) + pm.call_post_suite(index) + return tap def has_blank_ids(idlist): """ @@ -209,41 +372,70 @@ def set_args(parser): """ Set the command line arguments for tdc. """ - parser.add_argument('-p', '--path', type=str, - help='The full path to the tc executable to use') - parser.add_argument('-c', '--category', type=str, nargs='?', const='+c', - help='Run tests only from the specified category, or if no category is specified, list known categories.') - parser.add_argument('-f', '--file', type=str, - help='Run tests from the specified file') - parser.add_argument('-l', '--list', type=str, nargs='?', const="++", metavar='CATEGORY', - help='List all test cases, or those only within the specified category') - parser.add_argument('-s', '--show', type=str, nargs=1, metavar='ID', dest='showID', - help='Display the test case with specified id') - parser.add_argument('-e', '--execute', type=str, nargs=1, metavar='ID', - help='Execute the single test case with specified ID') - parser.add_argument('-i', '--id', action='store_true', dest='gen_id', - help='Generate ID numbers for new test cases') + parser.add_argument( + '-p', '--path', type=str, + help='The full path to the tc executable to use') + sg = parser.add_argument_group( + 'selection', 'select which test cases: ' + + 'files plus directories; filtered by categories plus testids') + ag = parser.add_argument_group( + 'action', 'select action to perform on selected test cases') + + sg.add_argument( + '-D', '--directory', nargs='+', metavar='DIR', + help='Collect tests from the specified directory(ies) ' + + '(default [tc-tests])') + sg.add_argument( + '-f', '--file', nargs='+', metavar='FILE', + help='Run tests from the specified file(s)') + sg.add_argument( + '-c', '--category', nargs='*', metavar='CATG', default=['+c'], + help='Run tests only from the specified category/ies, ' + + 'or if no category/ies is/are specified, list known categories.') + sg.add_argument( + '-e', '--execute', nargs='+', metavar='ID', + help='Execute the specified test cases with specified IDs') + ag.add_argument( + '-l', '--list', action='store_true', + help='List all test cases, or those only within the specified category') + ag.add_argument( + '-s', '--show', action='store_true', dest='showID', + help='Display the selected test cases') + ag.add_argument( + '-i', '--id', action='store_true', dest='gen_id', + help='Generate ID numbers for new test cases') + parser.add_argument( + '-v', '--verbose', action='count', default=0, + help='Show the commands that are being run') + parser.add_argument( + '-N', '--notap', action='store_true', + help='Suppress tap results for command under test') parser.add_argument('-d', '--device', help='Execute the test case in flower category') + parser.add_argument( + '-P', '--pause', action='store_true', + help='Pause execution just before post-suite stage') return parser -def check_default_settings(args): +def check_default_settings(args, remaining, pm): """ - Process any arguments overriding the default settings, and ensure the - settings are correct. + Process any arguments overriding the default settings, + and ensure the settings are correct. """ # Allow for overriding specific settings global NAMES if args.path != None: - NAMES['TC'] = args.path + NAMES['TC'] = args.path if args.device != None: - NAMES['DEV2'] = args.device + NAMES['DEV2'] = args.device if not os.path.isfile(NAMES['TC']): print("The specified tc path " + NAMES['TC'] + " does not exist.") exit(1) + pm.call_check_args(args, remaining) + def get_id_list(alltests): """ @@ -277,7 +469,7 @@ def generate_case_ids(alltests): for c in alltests: if (c["id"] == ""): while True: - newid = str('%04x' % random.randrange(16**4)) + newid = str('{:04x}'.format(random.randrange(16**4))) if (does_id_exist(alltests, newid)): continue else: @@ -300,40 +492,107 @@ def generate_case_ids(alltests): json.dump(testlist, outfile, indent=4) outfile.close() +def filter_tests_by_id(args, testlist): + ''' + Remove tests from testlist that are not in the named id list. + If id list is empty, return empty list. + ''' + newlist = list() + if testlist and args.execute: + target_ids = args.execute + + if isinstance(target_ids, list) and (len(target_ids) > 0): + newlist = list(filter(lambda x: x['id'] in target_ids, testlist)) + return newlist + +def filter_tests_by_category(args, testlist): + ''' + Remove tests from testlist that are not in a named category. + ''' + answer = list() + if args.category and testlist: + test_ids = list() + for catg in set(args.category): + if catg == '+c': + continue + print('considering category {}'.format(catg)) + for tc in testlist: + if catg in tc['category'] and tc['id'] not in test_ids: + answer.append(tc) + test_ids.append(tc['id']) + + return answer def get_test_cases(args): """ If a test case file is specified, retrieve tests from that file. Otherwise, glob for all json files in subdirectories and load from each one. + Also, if requested, filter by category, and add tests matching + certain ids. """ import fnmatch - if args.file != None: - if not os.path.isfile(args.file): - print("The specified test case file " + args.file + " does not exist.") - exit(1) - flist = [args.file] - else: - flist = [] - for root, dirnames, filenames in os.walk('tc-tests'): + + flist = [] + testdirs = ['tc-tests'] + + if args.file: + # at least one file was specified - remove the default directory + testdirs = [] + + for ff in args.file: + if not os.path.isfile(ff): + print("IGNORING file " + ff + "\n\tBECAUSE does not exist.") + else: + flist.append(os.path.abspath(ff)) + + if args.directory: + testdirs = args.directory + + for testdir in testdirs: + for root, dirnames, filenames in os.walk(testdir): for filename in fnmatch.filter(filenames, '*.json'): - flist.append(os.path.join(root, filename)) - alltests = list() + candidate = os.path.abspath(os.path.join(root, filename)) + if candidate not in testdirs: + flist.append(candidate) + + alltestcases = list() for casefile in flist: - alltests = alltests + (load_from_file(casefile)) - return alltests + alltestcases = alltestcases + (load_from_file(casefile)) + + allcatlist = get_test_categories(alltestcases) + allidlist = get_id_list(alltestcases) + testcases_by_cats = get_categorized_testlist(alltestcases, allcatlist) + idtestcases = filter_tests_by_id(args, alltestcases) + cattestcases = filter_tests_by_category(args, alltestcases) -def set_operation_mode(args): + cat_ids = [x['id'] for x in cattestcases] + if args.execute: + if args.category: + alltestcases = cattestcases + [x for x in idtestcases if x['id'] not in cat_ids] + else: + alltestcases = idtestcases + else: + if cat_ids: + alltestcases = cattestcases + else: + # just accept the existing value of alltestcases, + # which has been filtered by file/directory + pass + + return allcatlist, allidlist, testcases_by_cats, alltestcases + + +def set_operation_mode(pm, args): """ Load the test case data and process remaining arguments to determine what the script should do for this run, and call the appropriate function. """ - alltests = get_test_cases(args) + ucat, idlist, testcases, alltests = get_test_cases(args) if args.gen_id: - idlist = get_id_list(alltests) if (has_blank_ids(idlist)): alltests = generate_case_ids(alltests) else: @@ -347,70 +606,29 @@ def set_operation_mode(args): print("Please correct them before continuing.") exit(1) - ucat = get_test_categories(alltests) - if args.showID: - show_test_case_by_id(alltests, args.showID[0]) + for atest in alltests: + print_test_case(atest) exit(0) - if args.execute: - target_id = args.execute[0] - else: - target_id = "" - - if args.category: - if (args.category == '+c'): - print("Available categories:") - print_sll(ucat) - exit(0) - else: - target_category = args.category - else: - target_category = "" - - - testcases = get_categorized_testlist(alltests, ucat) + if isinstance(args.category, list) and (len(args.category) == 0): + print("Available categories:") + print_sll(ucat) + exit(0) if args.list: - if (args.list == "++"): + if args.list: list_test_cases(alltests) exit(0) - elif(len(args.list) > 0): - if (args.list not in ucat): - print("Unknown category " + args.list) - print("Available categories:") - print_sll(ucat) - exit(1) - list_test_cases(testcases[args.list]) - exit(0) - - if (os.geteuid() != 0): - print("This script must be run with root privileges.\n") - exit(1) - - ns_create() - - if (len(target_category) == 0): - if (len(target_id) > 0): - alltests = list(filter(lambda x: target_id in x['id'], alltests)) - if (len(alltests) == 0): - print("Cannot find a test case with ID matching " + target_id) - exit(1) - catresults = test_runner(alltests, args) - print("All test results: " + "\n\n" + catresults) - elif (len(target_category) > 0): - if (target_category == "flower") and args.device == None: - print("Please specify a NIC device (-d) to run category flower") - exit(1) - if (target_category not in ucat): - print("Specified category is not present in this file.") - exit(1) - else: - catresults = test_runner(testcases[target_category], args) - print("Category " + target_category + "\n\n" + catresults) - - ns_destroy() + if len(alltests): + catresults = test_runner(pm, args, alltests) + else: + catresults = 'No tests found\n' + if args.notap: + print('Tap output suppression requested\n') + else: + print('All test results: \n\n{}'.format(catresults)) def main(): """ @@ -419,10 +637,15 @@ def main(): """ parser = args_parse() parser = set_args(parser) + pm = PluginMgr(parser) + parser = pm.call_add_args(parser) (args, remaining) = parser.parse_known_args() - check_default_settings(args) + args.NAMES = NAMES + check_default_settings(args, remaining, pm) + if args.verbose > 2: + print('args is {}'.format(args)) - set_operation_mode(args) + set_operation_mode(pm, args) exit(0) |