Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def parse(self, stream, media_type=None, parser_context=None):
    """
    Parse the incoming bytestream as XML and return the converted data.

    The character encoding is taken from ``parser_context['encoding']`` and
    falls back to ``settings.DEFAULT_CHARSET``. The stream is parsed with
    ``forbid_dtd=True``; the root element of the resulting tree is handed to
    ``self._xml_convert`` and that result is returned.

    Raises ``ParseError`` (with ``detail`` set to the text of the underlying
    exception) when parsing raises ``etree.ParseError`` or ``ValueError``.
    """
    context = parser_context if parser_context else {}
    charset = context.get('encoding', settings.DEFAULT_CHARSET)
    xml_parser = etree.DefusedXMLParser(encoding=charset)

    try:
        document = etree.parse(stream, parser=xml_parser, forbid_dtd=True)
    except (etree.ParseError, ValueError) as exc:
        raise ParseError(detail=str(exc))

    return self._xml_convert(document.getroot())
if not os.path.isfile(expected_file):
logging.debug(work.get_thread_msg() + "Did not find expected file " + expected_file)
if ALWAYS_GENERATE_EXPECTED:
# There is an actual but no expected, copy the actual to expected and return since there is nothing to compare against.
# This is off by default since it can make tests pass when they should really fail. Might be a good command line option though.
logging.debug(
work.get_thread_msg() + "Copying actual [{}] to expected [{}]".format(actual_file, expected_file))
try_move(actual_file, expected_file)
result.error_status = TestErrorOther()
return result
# Try other possible expected files. These are numbered like 'expected.setup.math.1.txt', 'expected.setup.math.2.txt' etc.
logging.debug(work.get_thread_msg() + " Comparing " + actual_file + " to " + expected_file)
expected_output = TestResult(test_config=test_config)
try:
expected_output.add_test_results(parse(expected_file).getroot(), '')
except ParseError as e:
logging.debug(
work.get_thread_msg() + "Exception parsing expected file: " + expected_file + " exception: " + str(e))
diff_counts, diff_string = result.diff_test_results(expected_output)
result.set_best_matching_expected_output(expected_output, expected_file, expected_file_version, diff_counts)
if result.all_passed():
logging.debug(work.get_thread_msg() + " Results match expected number: " + str(expected_file_version))
result.matched_expected_version = expected_file_version
try:
if not work.verbose:
if not hasattr(work, 'keep_actual_file'):
os.remove(actual_file)
os.remove(actual_diff_file)
except:
pass # Mysterious problem deleting the file. Don't worry about it. It won't impact final results.
def xmlescape(url):
    """Make sure a URL is valid XML.

    The URL is wrapped in a throwaway element and parsed. If parsing
    succeeds the URL is returned unchanged; if it raises
    ``ElementTree.ParseError`` the characters ``&``, ``<`` and ``>`` are
    replaced by their entity references and the escaped string is returned.
    """
    # cgi.escape was removed in Python 3.8; html.escape is its replacement.
    # The cgi fallback keeps the function importable on older interpreters.
    try:
        from html import escape
    except ImportError:
        from cgi import escape

    try:
        # A wrapper element is required for the probe: formatting the URL
        # into an empty template raises TypeError for any string argument.
        ElementTree.fromstring('<url>%s</url>' % url)
    except ElementTree.ParseError:
        # quote=False matches the default behaviour of cgi.escape.
        return escape(url, quote=False)
    else:
        return url
try:
resp = await session.get(xml_location, timeout=5)
xml = await resp.text()
# Samsung Smart TV sometimes returns an empty document the
# first time. Retry once.
if not xml:
resp = await session.get(xml_location, timeout=5)
xml = await resp.text()
except (aiohttp.ClientError, asyncio.TimeoutError) as err:
_LOGGER.debug("Error fetching %s: %s", xml_location, err)
return {}
try:
tree = ElementTree.fromstring(xml)
except ElementTree.ParseError as err:
_LOGGER.debug("Error parsing %s: %s", xml_location, err)
return {}
return util.etree_to_dict(tree).get("root", {}).get("device", {})
evtx_file (pyevtx.file): Windows XML EventLog (EVTX) file.
"""
# To handle errors when parsing a Windows XML EventLog (EVTX) file in the
# most granular way the following code iterates over every event record.
# The call to evtx_file.get_record() and access to members of evtx_record
# should be called within a try-except.
for record_index in range(evtx_file.number_of_records):
if parser_mediator.abort:
break
try:
evtx_record = evtx_file.get_record(record_index)
self._ParseRecord(parser_mediator, record_index, evtx_record)
except (IOError, ElementTree.ParseError) as exception:
parser_mediator.ProduceExtractionWarning(
'unable to parse event record: {0:d} with error: {1!s}'.format(
record_index, exception))
for record_index in range(evtx_file.number_of_recovered_records):
if parser_mediator.abort:
break
try:
evtx_record = evtx_file.get_recovered_record(record_index)
self._ParseRecord(
parser_mediator, record_index, evtx_record, recovered=True)
except IOError as exception:
parser_mediator.ProduceExtractionWarning((
'unable to parse recovered event record: {0:d} with error: '
def __init__(self, task_data=None):
    """
    Populate the instance from the XML text of a task definition.

    ``task_data`` is parsed with ``forbid_dtd=True``; an
    ``ElementTree.ParseError`` is reported as ``FormatError``. The parsed
    tree is passed through ``self._strip_namespace``, after which the
    RegistrationInfo fields and the principal id are stored on the instance.
    """
    try:
        parsed = ElementTree.fromstring(task_data, forbid_dtd=True)
    except ElementTree.ParseError:
        raise FormatError("Job file is not XML format")
    task_xml = self._strip_namespace(parsed)

    # Load RegistrationInfo data
    registration_fields = (
        ("uri", "RegistrationInfo/URI"),
        ("security_descriptor", "RegistrationInfo/SecurityDescriptor"),
        ("source", "RegistrationInfo/Source"),
        ("date", "RegistrationInfo/Date"),
        ("author", "RegistrationInfo/Author"),
        ("version", "RegistrationInfo/Version"),
        ("description", "RegistrationInfo/Description"),
        ("documentation", "RegistrationInfo/Documentation"),
    )
    for attribute, path in registration_fields:
        setattr(self, attribute, self._get_element_data(task_xml, path))

    # Load Principal data
    principal = task_xml.find("Principals/Principal")
    self.principal_id = principal.get("id")
def checkPostData(postrequest):
    """Check if postdata is XML.

    Args:
        postrequest: Raw request body; it is decoded as UTF-8 before parsing.

    Returns:
        The result of ``ETdefused.fromstring`` when the body decodes and
        parses, otherwise ``False`` (after logging an error).
    """
    try:
        # Decoding lives inside the try so that a body which is not valid
        # UTF-8 is rejected the same way as malformed XML instead of
        # escaping as an unhandled UnicodeDecodeError.
        postdata = postrequest.decode('utf-8')
        return ETdefused.fromstring(postdata)
    except (UnicodeDecodeError, ETdefused.ParseError):
        app.logger.error('Invalid XML in post request')
        return False
UpstreamChannel(
int(upstream.find("freq").text),
int(upstream.find("power").text),
upstream.find("srate").text,
upstream.find("usid").text,
upstream.find("mod").text,
upstream.find("ustype").text,
int(upstream.find("t1Timeouts").text),
int(upstream.find("t2Timeouts").text),
int(upstream.find("t3Timeouts").text),
int(upstream.find("t4Timeouts").text),
upstream.find("channeltype").text,
int(upstream.find("messageType").text),
)
)
except (element_tree.ParseError, TypeError):
_LOGGER.warning("Can't read upstream channels from %s", self.host)
self.token = None
raise exceptions.ConnectBoxNoDataAvailable() from None
def parse(self, stream, media_type=None, parser_context=None):
    """
    Parse the incoming bytestream as XML and return the converted data.

    The character encoding is taken from ``parser_context['encoding']`` and
    falls back to ``settings.DEFAULT_CHARSET``. The stream is parsed with
    ``forbid_dtd=True``; the root element of the resulting tree is handed to
    ``self._xml_convert`` and that result is returned.

    Raises ``ParseError`` (with ``detail`` set to the text of the underlying
    exception) when parsing raises ``etree.ParseError`` or ``ValueError``.
    """
    context = parser_context if parser_context else {}
    charset = context.get('encoding', settings.DEFAULT_CHARSET)
    xml_parser = etree.DefusedXMLParser(encoding=charset)

    try:
        document = etree.parse(stream, parser=xml_parser, forbid_dtd=True)
    except (etree.ParseError, ValueError) as exc:
        raise ParseError(detail=str(exc))

    return self._xml_convert(document.getroot())
def _request_xml(sport, timeout=10):
    """
    Request XML data from scorespro.com

    :param sport: sport being played
    :type sport: string
    :param timeout: seconds to wait for the server before giving up
    :type timeout: float
    :return: XML data
    :rtype: string
    :raises errors.SportError: if ``ET.ParseError`` is raised while loading
        the response body
    """
    url = 'http://www.scorespro.com/rss2/live-{}.xml'.format(sport)
    # Without an explicit timeout, requests waits indefinitely on a server
    # that accepts the connection but never answers.
    r = requests.get(url, timeout=timeout)
    try:
        # Only the parsing step is expected to raise ET.ParseError, so the
        # network call is kept outside the try body.
        return _load_xml(r.content)
    except ET.ParseError:
        raise errors.SportError(sport)