Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
except IOError:
logger.critical("Cannot open evaluation JUnit XML output file '%s'", junitFileName)
sys.exit(1)
try:
inputJson = json.load(inputFile)
except ValueError as e:
logger.critical("Cannot decode json from test output file '%s', error '%s'", args.inputFileName, str(e))
sys.exit(1)
if not isinstance(inputJson, list):
logger.critical("Test output json is not array!")
sys.exit(1)
# run evaluation
xml = JUnitXml()
suite = TestSuite(os.path.splitext(os.path.basename(args.inputFileName))[0]) # once we support multiple test files then for each one should be test suite created
xml.add_testsuite(suite)
suite.timestamp = str(datetime.datetime.now()) # time of evaluation, not the testing it self (evaluations could differ)
#suite.hostname = ''
testCounter = 0
for test in inputJson:
case = TestCase()
suite.add_testcase(case)
if not isinstance(test, dict):
errorMessage = "Input test array element {:d} is not dictionary. Each test has to be dictionary, please see doc!".format(testCounter)
logger.error(errorMessage)
case.result = Error(errorMessage, 'ValueError')
continue
logger.info("Test number %d, name '%s'", testCounter, test.get('name', '-'))
# Fragment of a flat evaluation script; the statements that open the input and
# output files are not visible in this excerpt.
# Report that the JUnit XML output file could not be opened and exit with status 1.
logger.critical("Cannot open evaluation JUnit XML output file '%s'", junitFileName)
sys.exit(1)
# Parse the test output file; invalid JSON is fatal (exit status 1).
try:
inputJson = json.load(inputFile)
except ValueError as e:
logger.critical("Cannot decode json from test output file '%s', error '%s'", args.inputFileName, str(e))
sys.exit(1)
# The top-level JSON value must be a list; anything else is fatal.
if not isinstance(inputJson, list):
logger.critical("Test output json is not array!")
sys.exit(1)
# run evaluation
xml = JUnitXml()
# The suite is named after the input file's base name without its extension.
suite = TestSuite(os.path.splitext(os.path.basename(args.inputFileName))[0]) # once multiple test files are supported, one test suite should be created per file
xml.add_testsuite(suite)
suite.timestamp = str(datetime.datetime.now()) # time of evaluation, not of the testing itself (evaluations could differ)
#suite.hostname = ''
testCounter = 0
for test in inputJson:
# A test case is registered in the suite before the element is validated, so
# malformed elements still show up in the JUnit report (as errors).
case = TestCase()
suite.add_testcase(case)
if not isinstance(test, dict):
errorMessage = "Input test array element {:d} is not dictionary. Each test has to be dictionary, please see doc!".format(testCounter)
logger.error(errorMessage)
case.result = Error(errorMessage, 'ValueError')
# NOTE(review): if testCounter is only incremented at the end of the loop body,
# this `continue` skips the increment and later indices are off - confirm.
continue
logger.info("Test number %d, name '%s'", testCounter, test.get('name', '-'))
# The case name is None when the test dictionary has no 'name' key.
case.name = test.get('name', None)
def _update_junit_report(destination, **kwargs):
    """Prefix junit report names with the name of the current Azdo job.

    Every ``*.xml`` file found in ``<destination>/reports`` is loaded, the
    report ``name`` and the ``classname`` of each suite are prefixed with
    ``"(<AgentJobName>): "`` and the report is written out again.

    :param destination: directory that contains the ``reports`` sub-directory;
        it is made absolute before use.
    :param kwargs: accepted but not used by this function.
    """
    # The job name comes from the environment; an unset variable gives "".
    report_name = os.getenv("AgentJobName", "")
    pattern = os.path.join(os.path.abspath(destination), "reports", "*.xml")

    for name in glob.glob(pattern):
        xml = JUnitXml.fromfile(name)
        xml.name = f"({report_name}): {xml.name}"
        for suite in xml:
            suite.classname = f"({report_name}): {suite.classname}"
        # No explicit path is passed; presumably the report is written back to
        # the file it was loaded from - confirm against junitparser.
        xml.write()
def generate_junitxml_merged_report(test_results_dir):
    """
    Merge all junitxml generated reports in a single one.

    The directory tree below ``test_results_dir`` is walked and every file whose
    name ends with ``results.xml`` is merged. The merged report is written to
    ``<test_results_dir>/test_report.xml``. When no matching file exists, a
    warning is logged and nothing is written.

    :param test_results_dir: output dir containing the junitxml reports to merge.
    """
    import logging

    merged_xml = None
    # `root`/`file_name` instead of `dir`/`file` to avoid shadowing the builtin `dir`.
    for root, _, files in os.walk(test_results_dir):
        for file_name in files:
            if not file_name.endswith("results.xml"):
                continue
            report = JUnitXml.fromfile(os.path.join(root, file_name))
            # Truthiness (rather than `is None`) is kept on purpose so that the
            # accumulator is replaced exactly in the cases it was replaced before.
            if not merged_xml:
                merged_xml = report
            else:
                merged_xml += report

    if merged_xml is None:
        # Without this guard the write below fails with AttributeError on None.
        logging.getLogger(__name__).warning(
            "No junitxml reports found in %s, merged report not generated", test_results_dir
        )
        return

    merged_xml.write("{0}/test_report.xml".format(test_results_dir), pretty=True)
# NOTE: only the beginning of this function is visible in this excerpt.
def gen_results_summary(results_dir, output_fn=None, merge_fn=None,
verbose=False):
"""Scan a results directory and generate a summary file"""
reports = []
# `combined` and `nr_files` are initialised here but not used in the visible part.
combined = JUnitXml()
nr_files = 0
# The summary goes to stdout unless an output file name is given (see below).
out_f = sys.stdout
# Load every result file reported by get_results() as a JUnit XML report.
for filename in get_results(results_dir):
reports.append(JUnitXml.fromfile(filename))
# Nothing to summarise: return 0 before any output file is opened.
if len(reports) == 0:
return 0
if output_fn is not None:
# NOTE(review): opened without a context manager; the matching close is not
# visible in this excerpt - confirm it exists further down.
out_f = open(output_fn, "w")
# Work on a deep copy of the first report's Properties child.
props = copy.deepcopy(reports[0].child(Properties))
ltm = check_for_ltm(results_dir, props)
print_header(out_f, props)
# Default sort key: the parsed timestamp of a test suite ...
sort_by = lambda ts: parse_timestamp(ts.timestamp)
if ltm:
# ... replaced by the suite's hostname when check_for_ltm() returned a truthy value.
sort_by = lambda ts: ts.hostname
def generate_junitxml_merged_report(test_results_dir):
    """
    Merge all junitxml generated reports in a single one.

    The directory tree below ``test_results_dir`` is walked and every file whose
    name ends with ``results.xml`` is merged. The merged report is written to
    ``<test_results_dir>/test_report.xml``. When no matching file exists, a
    warning is logged and nothing is written.

    :param test_results_dir: output dir containing the junitxml reports to merge.
    """
    import logging

    merged_xml = None
    # `root`/`file_name` instead of `dir`/`file` to avoid shadowing the builtin `dir`.
    for root, _, files in os.walk(test_results_dir):
        for file_name in files:
            if not file_name.endswith("results.xml"):
                continue
            report = JUnitXml.fromfile(os.path.join(root, file_name))
            # Truthiness (rather than `is None`) is kept on purpose so that the
            # accumulator is replaced exactly in the cases it was replaced before.
            if not merged_xml:
                merged_xml = report
            else:
                merged_xml += report

    if merged_xml is None:
        # Without this guard the write below fails with AttributeError on None.
        logging.getLogger(__name__).warning(
            "No junitxml reports found in %s, merged report not generated", test_results_dir
        )
        return

    merged_xml.write("{0}/test_report.xml".format(test_results_dir), pretty=True)
# Fragment of a summary-printing routine; `testsuite`, `cfg`, `runtime`, `out_f`
# and `verbose` are defined in a part of the file that is not visible here.
# Read the counters of the current test suite.
tests = testsuite.tests
skipped = testsuite.skipped
failures = testsuite.failures
errors = testsuite.errors
# One summary line per suite: "<cfg>: N tests, [F failures, ][E errors, ][S skipped, ]T seconds".
out_f.write('%s: %d tests, ' % (cfg, tests))
# Zero counters are left out of the summary line.
if failures > 0:
out_f.write('%d failures, ' % failures)
if errors > 0:
out_f.write('%d errors, ' % errors)
if skipped > 0:
out_f.write('%d skipped, ' % skipped)
out_f.write('%d seconds\n' % runtime)
if verbose:
# Verbose mode: one line per test case with its name, status and time.
for test_case in testsuite:
# Status defaults to 'Pass'; the checks below are independent `if`s, so a
# later match overrides an earlier one.
status = 'Pass'
if isinstance(test_case.result, Failure):
status = 'Failed'
if isinstance(test_case.result, Skipped):
status = 'Skipped'
if isinstance(test_case.result, Error):
status = 'Error'
# NOTE(review): '%d' needs a numeric test_case.time - confirm it is never None.
out_f.write(" %-12s %-8s %ds\n" %
(test_case.name, status, test_case.time))
else:
# Non-verbose mode: only failing and erroring tests are listed via print_tests().
if failures > 0:
print_tests(out_f, testsuite, Failure, 'Failures')
if errors > 0:
print_tests(out_f, testsuite, Error, 'Errors')
# Fragment of the flat evaluation script; the `try` belonging to this handler
# and the code that sets the variables used below are not visible here.
# An AttributeError raised in the preceding (not visible) block is deliberately ignored.
except AttributeError:
pass
if not testOutputReturnedPath:
logger.debug("Returned output payload provided inside the test")
# evaluate test
# A missing 'type' key is treated the same as 'EXACT_MATCH'.
if 'type' not in test or test['type'] == 'EXACT_MATCH':
# Compare expected and returned JSON with DeepDiff, ignoring order, and read
# the diff back as a plain JSON structure.
testResultString = DeepDiff(testOutputExpectedJson, testOutputReturnedJson, ignore_order=True).json
testResultJson = json.loads(testResultString)
# An empty diff means the outputs match: result 0. Otherwise result 1, and the
# diff is stored on the test and reported as a JUnit failure.
if testResultJson == {}:
test['result'] = 0
else:
test['result'] = 1
test['diff'] = testResultJson
case.result = Failure(json.dumps(testResultJson, sort_keys=True))
else:
# Any other test type is reported as a JUnit error on the case.
errorMessage = "Unknown test type: {}".format(test['type'])
logger.error(errorMessage)
case.result = Error(errorMessage, 'ValueError')
testCounter += 1
# write outputs
if junitFileName:
# The second positional argument is presumably junitparser's `pretty` flag - confirm.
xml.write(junitFileName, True)
# The input list, with the 'result'/'diff' keys added above, is dumped as indented JSON.
outputFile.write(json.dumps(inputJson, indent=4, ensure_ascii=False) + '\n')
logger.info('FINISHING: '+ os.path.basename(__file__))
# Fragment of a summary-printing routine; the enclosing definition, the condition
# guarding the first write and the variables used here are not visible.
out_f.write('%d skipped, ' % skipped)
# The run time ends the per-suite summary line.
out_f.write('%d seconds\n' % runtime)
if verbose:
# Verbose mode: one line per test case with its name, status and time.
for test_case in testsuite:
# Status defaults to 'Pass'; the checks below are independent `if`s, so a
# later match overrides an earlier one.
status = 'Pass'
if isinstance(test_case.result, Failure):
status = 'Failed'
if isinstance(test_case.result, Skipped):
status = 'Skipped'
if isinstance(test_case.result, Error):
status = 'Error'
# NOTE(review): '%d' needs a numeric test_case.time - confirm it is never None.
out_f.write(" %-12s %-8s %ds\n" %
(test_case.name, status, test_case.time))
else:
# Non-verbose mode: only failing and erroring tests are listed via print_tests().
if failures > 0:
print_tests(out_f, testsuite, Failure, 'Failures')
if errors > 0:
print_tests(out_f, testsuite, Error, 'Errors')
def gen_results_summary(results_dir, output_fn=None, merge_fn=None,
verbose=False):
"""Scan a results directory and generate a summary file"""
reports = []
combined = JUnitXml()
nr_files = 0
out_f = sys.stdout
for filename in get_results(results_dir):
reports.append(JUnitXml.fromfile(filename))
if len(reports) == 0:
return 0
if output_fn is not None:
out_f = open(output_fn, "w")
props = copy.deepcopy(reports[0].child(Properties))
ltm = check_for_ltm(results_dir, props)