Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_simple_cdata(self):
    """Round-trip a one-element dict through ``unparse`` and ``parse``.

    Checks both directions: parsing the generated XML gives back the
    original dict, and re-serialising that parsed result reproduces the
    same XML text.
    """
    original = {'a': 'b'}
    xml_text = unparse(original)
    # dict -> XML -> dict must be lossless.
    self.assertEqual(original, parse(xml_text))
    # XML -> dict -> XML must be stable as well.
    self.assertEqual(xml_text, unparse(parse(xml_text)))
def assertXMLEqual(self, expected, xml):
    """Assert that two XML documents are equal after normalisation.

    Each document is passed through ``xmltodict.parse`` and then
    ``xmltodict.unparse`` before the comparison, so the assertion compares
    the re-serialised forms rather than the raw input strings.
    """
    def _normalise(document):
        return xmltodict.unparse(xmltodict.parse(document))

    self.assertEqual(_normalise(expected), _normalise(xml))
def test_streaming_interrupt(self):
    """``parse`` raises ``ParsingInterrupted`` when the callback returns False."""
    def _always_stop(path, item):
        # A falsy return value from the item callback is what this test
        # expects to abort streaming.
        return False

    with self.assertRaises(ParsingInterrupted):
        parse('<a>x</a>', item_depth=1, item_callback=_always_stop)
# Call the OpenML API and start turning the returned XML flow listing into
# per-flow summary dicts.
#
# api_call: call string handed to ``_perform_api_call`` with the 'get' method.
# output_format: not read anywhere in the visible part of this function.
#
# NOTE(review): the visible body stops right after ``flow`` is built; nothing
# here stores it in ``flows`` or returns a value, so the remainder of the
# function appears to be cut off — confirm against the full source.
def __list_flows(
api_call: str,
output_format: str = 'dict'
) -> Union[Dict, pd.DataFrame]:
xml_string = openml._api_calls._perform_api_call(api_call, 'get')
# force_list asks xmltodict to represent 'oml:flow' as a list even when the
# response holds a single flow; the first assert below depends on that.
flows_dict = xmltodict.parse(xml_string, force_list=('oml:flow',))
# Minimalistic check if the XML is useful
# NOTE(review): these checks use ``assert`` and so disappear under
# ``python -O``. The first assert's message reports the type of
# flows_dict['oml:flows'], not the type of the 'oml:flow' value being tested.
assert type(flows_dict['oml:flows']['oml:flow']) == list, \
type(flows_dict['oml:flows'])
assert flows_dict['oml:flows']['@xmlns:oml'] == \
'http://openml.org/openml', flows_dict['oml:flows']['@xmlns:oml']
flows = dict()
for flow_ in flows_dict['oml:flows']['oml:flow']:
# The id is converted to int; every other field is copied as parsed.
fid = int(flow_['oml:id'])
flow = {'id': fid,
'full_name': flow_['oml:full_name'],
'name': flow_['oml:name'],
'version': flow_['oml:version'],
'external_version': flow_['oml:external_version'],
'uploader': flow_['oml:uploader']}
# Call the OpenML API and validate the returned XML task listing.
#
# api_call: call string handed to ``_perform_api_call`` with the 'get' method.
# output_format: not read anywhere in the visible part of this function.
#
# Raises ValueError when the parsed document has no 'oml:tasks' element, when
# that element has no '@xmlns:oml' attribute, or when the attribute is not
# 'http://openml.org/openml'.
#
# NOTE(review): the visible body ends on a line continuation in the final
# assert, so the rest of the function is cut off — confirm against the full
# source.
def __list_tasks(api_call, output_format='dict'):
xml_string = openml._api_calls._perform_api_call(api_call, 'get')
# force_list asks xmltodict to represent 'oml:task' and 'oml:input' as lists
# even when only one such element is present.
tasks_dict = xmltodict.parse(xml_string, force_list=('oml:task',
'oml:input'))
# Minimalistic check if the XML is useful
# NOTE(review): all three messages below say "oml:runs" although the checks
# are on 'oml:tasks'; this looks like a copy-paste slip in the message text.
if 'oml:tasks' not in tasks_dict:
raise ValueError('Error in return XML, does not contain "oml:runs": %s'
% str(tasks_dict))
elif '@xmlns:oml' not in tasks_dict['oml:tasks']:
raise ValueError('Error in return XML, does not contain '
'"oml:runs"/@xmlns:oml: %s'
% str(tasks_dict))
elif tasks_dict['oml:tasks']['@xmlns:oml'] != 'http://openml.org/openml':
raise ValueError('Error in return XML, value of '
'"oml:runs"/@xmlns:oml is not '
'"http://openml.org/openml": %s'
% str(tasks_dict))
# Unlike the checks above, this one is an ``assert`` and is skipped under
# ``python -O``.
assert type(tasks_dict['oml:tasks']['oml:task']) == list, \
def dbpedia_lookup(value, col_type=None, max_hits=50):
    """Query the DBpedia Lookup keyword search and return the matching URIs.

    Parameters
    ----------
    value : object
        Search keyword. It is converted with ``str`` and its spaces are
        replaced by ``%20`` before being placed in the query string.
    col_type : str, optional
        When not ``None``, sent as the ``QueryClass`` parameter.
    max_hits : int, optional
        Value sent as the ``MaxHits`` parameter.

    Returns
    -------
    list
        The ``URI`` entry of every result. An empty list is returned when
        the response holds no results and also when the request or the
        parsing fails; a warning is issued in the failure case.
    """
    # URL encode the spaces
    value = str(value).replace(" ", "%20")
    url = '{}/api/search/KeywordSearch?MaxHits={}&QueryString="{}"'.format(
        DBP_LOOKUP_URI, max_hits, value)
    if col_type is not None:
        # If a col_type is provided, we append an extra
        # parameter to the API
        url += '&QueryClass="{}"'.format(col_type)

    try:
        response = requests.get(url, headers={'Connection': 'close'})

        # Response is in XML, we parse it
        tree = xmltodict.parse(response.content.decode('utf-8'))

        # Check if it contains results
        if 'ArrayOfResult' in tree and 'Result' in tree['ArrayOfResult']:
            result = tree['ArrayOfResult']['Result']
            # A single hit is parsed as a dict, several hits as a list.
            if isinstance(result, dict):
                return [result['URI']]
            return [x['URI'] for x in result]
        return []
    except Exception:
        # Deliberate best-effort: any failure (network, decoding, unexpected
        # document shape) is reported as a warning rather than raised.
        warnings.warn('[DBPEDIA_LOOKUP] Something went wrong with request to '
                      '{}. Returning nothing...'.format(url))
        # Keep the return type consistent with the success paths so callers
        # can always iterate over the result.
        return []
# Command-line entry of a DDE detector for Word documents (Python 2 syntax:
# ``print`` is used as a statement). ``parser`` is created above this excerpt.
#
# NOTE(review): the excerpt ends on an ``if`` with no body, so the handling of
# a match in the .doc branch is not visible here.
parser.add_argument("-x", "--docx", help="target is a modern docx file")
parser.add_argument("-d", "--doc", help="Target is a a regular-old doc file")
args = parser.parse_args()
# NOTE(review): assuming ``parser`` is an argparse.ArgumentParser, an option
# declared without ``action`` stores the supplied string (or None), so
# ``args.docx is True`` would never hold and the docx branch would be
# unreachable — confirm whether ``if args.docx:`` was intended.
if args.docx is True:
#Parsing as a docx document:
#Defining a regex for DDE/DDEAUTO
# "DDE.*" matches from the first "DDE" up to the end of that line.
regex = re.compile("DDE.*")
print "Very simple DDE detector for word documents\r\n Written by Amit Serper, @0xAmit\n\n"
#First we open the docx file as a zip
# NOTE(review): the ZipFile is not closed anywhere in the visible code.
document = zipfile.ZipFile(args.docx, 'r')
#Parsing the XML
xmldata = document.read('word/document.xml')
d = xmltodict.parse(xmldata)
#Looking up for the DDE object in this position, flattening the xml because we're lazy.
# Only 'w:instrText' values are inspected; a non-empty lookup result is
# reported as a finding, with the regex applied to its string form.
DDE = nested_lookup('w:instrText', d)
if DDE:
print "Malicious DDE objects found: \n{0}".format(regex.findall(str(DDE)))
else:
print "No DDE objects were found"
else:
#Parsing as an old DOC file
with open(args.doc, 'rb') as doc:
docstr = doc.read()
# str.find returns -1 when 'DDE' is absent; pos then becomes -2 and the slice
# below starts two characters before the end of the data. The later
# '"DDE" in str(res)' test is what tells a real hit from that case.
pos = docstr.find('DDE') # We're looking for the string 'DDE', this will return its position.
pos = pos-1 # Deducting 1 so we're now 1 char before the string starts
# '^[^"]+' matches the leading run of characters up to, but not including,
# the first double quote of the string it is applied to.
doc_regex = re.compile('^[^"]+') # Regex is looking for the final char to be '"' since that's our delimiter.
res = doc_regex.findall(docstr[pos:]) # Matching from regex to '"'
if "DDE" in str(res):
def get_flash_size(self):
    """Return the available space in the remote directory.

    Runs ``dir`` on the device in text mode, parses the second element of
    the reply as XML and reads the free-byte count out of the output body.

    Returns:
        int: the number captured by the ``(\\d+) bytes free`` pattern.

    Raises:
        AttributeError: if the output body contains no "<n> bytes free"
            text, because ``re.search`` then returns ``None``.
    """
    raw_reply = self.device.show('dir', text=True)[1]
    body = xmltodict.parse(raw_reply)['ins_api']['outputs']['output']['body']
    free_match = re.search(r'(\d+) bytes free', body)
    return int(free_match.group(1))
import xmltodict
# NETCONF filter to use. Read through a context manager so the file handle
# is closed as soon as the filter text has been loaded.
with open("filter-ietf-interfaces.xml") as filter_file:
    netconf_filter = filter_file.read()
if __name__ == '__main__':
    # Connection settings come from the ios_xe1 device dictionary; host key
    # verification is switched off.
    connect_args = {
        "host": ios_xe1["address"],
        "port": ios_xe1["port"],
        "username": ios_xe1["username"],
        "password": ios_xe1["password"],
        "hostkey_verify": False,
    }
    with manager.connect(**connect_args) as m:
        # Get Configuration and State Info for Interface
        netconf_reply = m.get(netconf_filter)

        # Process the XML and store in useful dictionaries
        reply_dict = xmltodict.parse(netconf_reply.xml)
        intf_details = reply_dict["rpc-reply"]["data"]
        intf_config = intf_details["interfaces"]["interface"]
        intf_info = intf_details["interfaces-state"]["interface"]

        # Report: configuration fields first, then operational state.
        print("")
        print("Interface Details:")
        print(" Name: {}".format(intf_config["name"]))
        print(" Description: {}".format(intf_config["description"]))
        print(" Type: {}".format(intf_config["type"]["#text"]))
        print(" MAC Address: {}".format(intf_info["phys-address"]))
        print(" Packets Input: {}".format(
            intf_info["statistics"]["in-unicast-pkts"]))
        print(" Packets Output: {}".format(
            intf_info["statistics"]["out-unicast-pkts"]))
def Converter(workdir, folder, format):
if format == "xml":
fileList = [f for f in os.listdir(folder) if f.endswith(".xml")]
res = {}
t = time.time()
for f in fileList:
print "cTAKES XML files processing: " + f
with open(folder + f) as ff:
doc = xmltodict.parse(ff.read())
try:
bow = doc['CAS']['uima.cas.Sofa']['@sofaString']
bow = bow.replace('\n', ' ').replace('\r', '').replace('\t', ' ').replace(r'<br>', '').replace(r'<p>', '').replace(r'</p>', '').replace(r'<b>', '').replace(r'</b>', '').replace('START_OF_RECORD=1||||1||||', '').replace('||||END_OF_RECORD', '')
bow = bow.lower().encode('utf-8')
concept = doc['CAS']['org.apache.ctakes.typesystem.type.refsem.UmlsConcept']
l = []
for i in xrange(len(concept)):
if concept[i]['@codingScheme'] == 'SNOMEDCT':
prefix = 'S'
elif concept[i]['@codingScheme'] == 'RXNORM':
prefix = 'R'
l.append([concept[i]['@cui'], concept[i]['@tui'], prefix + concept[i]['@code'], concept[i]['@codingScheme']])
umls = []