Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_streaming_interrupt(self):
    # Streaming mode: the item callback's return value is the "keep going"
    # flag, so a callback that always returns False must abort the parse.
    def reject_item(path, item):
        return False

    # The abort is expected to surface to the caller as ParsingInterrupted.
    with self.assertRaises(ParsingInterrupted):
        parse('<a>x</a>', item_depth=1, item_callback=reject_item)
def _handle_result(self, res):
res.encoding = 'utf-8'
xml = res.text
try:
data = xmltodict.parse(xml)['xml']
except (xmltodict.ParsingInterrupted, ExpatError):
# 解析 XML 失败
return xml
return_code = data['return_code']
return_msg = data.get('return_msg')
result_code = data.get('result_code')
errcode = data.get('err_code')
errmsg = data.get('err_code_des')
if return_code != 'SUCCESS' or result_code != 'SUCCESS':
# 返回状态码不为成功
raise WeChatPayException(
return_code,
result_code,
return_msg,
errcode,
errmsg,
def endElement(self, full_name):
name = self._build_name(full_name)
if len(self.path) == self.item_depth:
item = self.item
if item is None:
item = (None if not self.data
else self.cdata_separator.join(self.data))
should_continue = self.item_callback(self.path, item)
if not should_continue:
raise ParsingInterrupted()
if len(self.stack):
data = (None if not self.data
else self.cdata_separator.join(self.data))
item = self.item
self.item, self.data = self.stack.pop()
if self.strip_whitespace and data:
data = data.strip() or None
if data and self.force_cdata and item is None:
item = self.dict_constructor()
if item is not None:
if data:
self.push_data(item, self.cdata_key, data)
self.item = self.push_data(self.item, name, item)
else:
self.item = self.push_data(self.item, name, data)
else: