Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@staticmethod
def _get_raw_xml(data, path):
    """Serialize the first sub-element of ``data`` that matches ``path``.

    Args:
        data: Object exposing ``find`` (an element or element tree).
        path: Path expression handed to ``data.find``.

    Returns:
        The UTF-8 encoded output of ``ElementTree.tostring`` for the matched
        element, or an empty string when nothing matches.
    """
    node = data.find(path)
    # Guard clause: nothing matched, so there is nothing to serialize.
    if node is None:
        return ""
    return ElementTree.tostring(node, encoding="utf-8")
def list_pages(self, url, max_items=200, offset=0):
    """Add every item of a paginated listing, following pages until one is short.

    Each page is requested as ``url + LANGUAGE_CODE + "&max=N&offset=M"``.
    Items whose ``clearleap:itemType`` is ``media`` are added as playable
    links; every other typed item is added as a directory.

    Args:
        url: Base listing URL (the language code and paging query are appended).
        max_items: Page size requested from the service.
        offset: Index of the first item to request.

    Returns:
        None. Stops silently when ``get_from_hbogo`` returns ``False``.
    """
    # Iterate instead of recursing: one stack frame regardless of page count.
    while True:
        response = self.get_from_hbogo(
            url + self.LANGUAGE_CODE + "&max=" + str(max_items) + "&offset=" + str(offset), 'xml')
        if response is False:
            return

        count = 0
        for item in response.findall('.//item'):
            count += 1
            # findtext() yields None for a missing <link> and '' for an empty
            # one; both are skipped instead of raising AttributeError.
            if not item.findtext('link'):
                continue
            if self.lograwdata:
                self.log(ET.tostring(item, encoding='utf8'))
            raw_type = item.findtext('clearleap:itemType', namespaces=self.NAMESPACES)
            if raw_type is None:
                # No type element at all: report it rather than crash on ``.text``.
                self.log('Unknown item type: ' + str(raw_type))
                continue
            # The former three-way chain had an unreachable ``else``; the type
            # is either 'media' or it is not.
            if py2_encode(raw_type) == 'media':
                self.addLink(item, HbogoConstants.ACTION_PLAY)
            else:
                self.addDir(item)

        self.log('List pages total items: ' + str(count))
        # A full page means there may be more. A non-positive page size can
        # never make progress, so it must not trigger another request.
        if max_items <= 0 or count != max_items:
            return
        offset += max_items
        self.log('List pages calling next page... max: ' + str(max_items) + ' offset: ' + str(offset))
# Prepare playback of ``content_id``: log in, fetch the media item XML, resolve
# the Widevine DASH manifest URL and start assembling the license headers.
# NOTE(review): only the head of this method is visible in this excerpt (the
# last statement continues on a following line); ``retry`` and ``media_info``
# are not used in the visible part.
def play(self, content_id, retry=0):
self.log("Initializing playback... " + str(content_id))
self.login()
# Fetch the item description; a False result is treated as failure and aborts.
media_item = self.get_from_hbogo(self.API_URL_BROWSE + content_id + self.LANGUAGE_CODE, 'xml')
if media_item is False:
return
media_info = self.construct_media_info(media_item.find('.//item'))
if self.lograwdata:
self.log("Play Media: ")
self.log(ET.tostring(media_item, encoding='utf8'))
# NOTE(review): find() returns None when no HBO-DASH-WIDEVINE profile exists,
# in which case .get('url') raises AttributeError - confirm this cannot happen.
mpd_pre_url = media_item.find('.//media:content[@profile="HBO-DASH-WIDEVINE"]', namespaces=self.NAMESPACES).get('url') + '&responseType=xml'
# Second request: an XML document whose <url> element holds the manifest URL.
mpd = self.get_from_hbogo(mpd_pre_url, 'xml', False)
if mpd is False:
return
if self.lograwdata:
self.log("Manifest: ")
self.log(ET.tostring(mpd, encoding='utf8'))
mpd_url = mpd.find('.//url').text
self.log("Manifest url: " + str(mpd_url))
media_guid = media_item.find('.//guid').text
# License headers are built as a '&'-joined key=value string.
license_headers = 'X-Clearleap-AssetID=' + media_guid + '&X-Clearleap-DeviceId=' + self.API_DEVICE_ID + \
# Parse the XML report at ``filename``: collect the text of every BugPattern's
# Details keyed by pattern type, then derive per-BugInstance fields.
# NOTE(review): the loop body continues beyond this excerpt; ``test``,
# ``dupes`` and the per-bug locals are not consumed in the visible part.
def __init__(self, filename, test):
bug_patterns = dict()
dupes = dict()
# Maps the report's ``priority`` attribute to a severity label.
SEVERITY = {
'1': 'High',
'2': 'Medium',
'3': 'Low'
}
tree = ET.parse(filename)
root = tree.getroot()
for pattern in root.findall('BugPattern'):
# The pattern removes tags made only of the characters b-z and '/', plus the
# literal 'href='; the empty alternative between the two '|' matches nothing
# visible. Tags containing 'a' (e.g. <a>, <table>) are not matched.
# NOTE(review): on Python 3 ET.tostring() returns bytes unless
# encoding='unicode' is passed, which re.sub() rejects for a str pattern -
# confirm the targeted interpreter.
plain_pattern = re.sub(r'<[b-z/]*?>||href=', '', ET.tostring(pattern.find('Details'), method='text'))
bug_patterns[pattern.get('type')] = plain_pattern
for bug in root.findall('BugInstance'):
# Description is the concatenation of all text nodes under the bug element.
desc = ''
for message in bug.itertext():
desc += message
dupe_key = bug.get('instanceHash')
title = bug.find('ShortMessage').text
# Falls back to 0 when the instance has no ``cweid`` attribute.
cwe = bug.get('cweid', default=0)
# Raises KeyError for a priority other than '1', '2' or '3' (or a missing one).
severity = SEVERITY[bug.get('priority')]
description = desc
# Raises KeyError when the bug's type has no matching BugPattern entry.
mitigation = bug_patterns[bug.get('type')]
impact = 'N/A'
references = 'N/A'
import urllib.request as urllib2
except ImportError:
import httplib
import urllib2
from otrs.objects import extract_tagname
from otrs.objects import OTRSObject
from posixpath import join as urljoin
import sys
# defusedxml doesn't define these non-parsing related objects
from xml.etree.ElementTree import Element
from xml.etree.ElementTree import SubElement
from xml.etree.ElementTree import tostring
# Expose the standard-library element constructors and serializer as
# attributes of ``etree``, and keep a module-level alias for the element type.
_ElementType = Element
etree.Element = Element
etree.SubElement = SubElement
etree.tostring = tostring
# Python 2/3 compatibility: where the ``unicode`` builtin does not exist,
# provide a one-argument stand-in that converts its argument with ``str``.
try:
    UNICODE_EXISTS = bool(type(unicode))
except NameError:
    def unicode(s):
        return str(s)
class OTRSError(Exception):
    """Base class for OTRS Errors.

    Attributes:
        code: Value returned by ``fd.getcode()``.
        msg: Value returned by ``fd.read()``.
    """

    def __init__(self, fd):
        """Initialize OTRS Error.

        Args:
            fd: Response-like object exposing ``getcode()`` and ``read()``.
        """
        self.code = fd.getcode()
        self.msg = fd.read()
        # Initialise the base class with the extracted values so ``args``
        # describes the failure instead of holding the raw response object.
        super(OTRSError, self).__init__(self.code, self.msg)
'' \
'' \
''
# Build the notification document from the ``notify_base`` template and embed
# the serialized event as the text of its LastChange element.
el_notify = ET.fromstring(notify_base)
el_last_change = el_notify.find('.//LastChange', NS)
el_last_change.text = ET.tostring(el_event).decode('utf-8')
global SUBSCRIBED_CLIENTS
service_name = self.SERVICE_NAME
# Send one NOTIFY request per subscriber of this service; the subscription id
# travels in the SID header and the serialized document is the request body.
for sid, url in SUBSCRIBED_CLIENTS[service_name].items():
headers = {
'SID': sid
}
# NOTE(review): a new ClientSession is opened for every subscriber and the
# payload is re-serialized on each iteration - confirm this is intended.
with ClientSession(loop=asyncio.get_event_loop()) as session:
# Timeout value of 10 - presumably seconds; confirm against async_timeout.
with async_timeout.timeout(10):
data = ET.tostring(el_notify)
LOGGER.debug('Calling: %s', url)
yield from session.request('NOTIFY', url, headers=headers, data=data)
# Pick a thumbnail URL from ``item``'s media:thumbnail elements, preferring a
# height of '1080', then '720', and otherwise the first thumbnail listed.
# Returns the chosen element's ``url`` attribute converted with str().
# NOTE(review): the except handler body is not visible in this excerpt.
def get_thumbnail_url(self, item):
if self.lograwdata:
self.log("get thumbnail xml:")
self.log(ET.tostring(item, encoding='utf8'))
try:
thumbnails = item.findall('.//media:thumbnail', namespaces=self.NAMESPACES)
# First pass: exact string match on height '1080'.
for thumb in thumbnails:
if thumb.get('height') == '1080':
if self.lograwdata:
self.log("Huge Poster found, using as thumbnail")
return str(thumb.get('url'))
# Second pass: fall back to height '720'.
for thumb in thumbnails:
if thumb.get('height') == '720':
if self.lograwdata:
self.log("Large Poster found, using as thumbnail")
return str(thumb.get('url'))
if self.lograwdata:
self.log("Poster not found using first one")
# Indexing an empty list raises IndexError, which the handler below catches.
return str(thumbnails[0].get('url'))
except Exception:
# Fetch the domain definition, requesting the SECURE and INACTIVE variants of
# the XML so the document can be edited and defined again.
domain_xml = domain.XMLDesc(libvirt.VIR_DOMAIN_XML_SECURE
                            | libvirt.VIR_DOMAIN_XML_INACTIVE)
xmltree = ET.fromstring(domain_xml)

# Delete all the current boot entries
os_tree = xmltree.find("./os")
for boot_element in os_tree.findall("./boot"):
    os_tree.remove(boot_element)

# Now apply our boot order which is 'network' and then 'hd'.
# The element literals were empty strings, which ET.fromstring() rejects with
# ParseError; they are restored to the <boot dev=.../> entries named above.
os_tree.append(ET.fromstring("<boot dev='network'/>"))
os_tree.append(ET.fromstring("<boot dev='hd'/>"))

# And now save the new XML def to the hypervisor
domain_xml = ET.tostring(xmltree, encoding="utf-8")
ses.defineXML(domain_xml.decode('utf-8'))
ses.close()
# Tail of a routine that fills in child elements of ``call_elem`` and returns
# the serialized element. Boolean flags are written as the text '1' or '0'.
answered_elem.text = '1' if answered else '0'
outgoing_elem = ET.SubElement(call_elem, 'outgoing')
outgoing_elem.text = '1' if outgoing else '0'
start_time_elem = ET.SubElement(call_elem, 'start-time')
start_time_elem.text = call_list_timestamp(timestamp)
# An answered-time element is only emitted for answered calls, fixed at
# timestamp + 10 (presumably seconds - confirm the unit of ``timestamp``).
if answered:
answered_time_elem = ET.SubElement(call_elem, 'answered-time')
answered_time_elem.text = call_list_timestamp(timestamp + 10)
# The end time is always timestamp + 60, answered or not.
end_time_elem = ET.SubElement(call_elem, 'end-time')
end_time_elem.text = call_list_timestamp(timestamp + 60)
return ET.tostring(call_elem)