Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def import_xml_box_file(filepath, input_game_level_id):
print_info("Starting import of file " + filepath)
#TODO refactor validation to reflect new XML format
#errors = validate_xml_box_file(filepath)
errors = []
if len(errors) > 0:
for ind, error in enumerate(errors):
print WARN+"Error " + str(ind) + ": " + error
print_warning_and_exit("XML file was not valid.")
try:
tree = ET.parse(filepath)
boxnode = tree.getroot()
filedir = path.dirname(filepath)
# Check to see if box already exists
bname = boxnode.find('name').text
if len(Box.by_name(unicode(bname))) is not 0:
print_info("Box with name '" + bname + "' already exists. Continuing to next file...")
return
# If box does not already exist, then check for corporation existing
corpnode = boxnode.find('corporation')
corpname = corpnode.find('name').text
corp = Corporation.by_name(unicode(corpname))
if corp is not None:
print_info("Corporation with name '" + corpname + "' already exists. Using the pre-existing corporation.")
else:
def get_packages_swid(package_list):
    """
    Get the packages from a swid string

    Every non-blank line of ``package_list`` is parsed as a standalone XML
    document whose root element carries ``name`` and ``version`` attributes.
    Only the part of the version before the first ``-`` is kept.

    :param package_list: newline-separated XML documents, one per line
    :return: tuple ``(errors, packages)``; ``errors`` is a list holding the
        string form of every exception raised while handling a line,
        ``packages`` is a ``defaultdict(set)`` mapping each name to the set
        of versions seen for it
    """
    packages = defaultdict(set)
    errors = []
    # Raw string: '\?' inside a plain literal is an invalid escape sequence.
    # Compiled once here instead of being re-evaluated for every line.
    declaration_re = re.compile(r'<\?[^>]+\?>')
    for xml_doc in package_list.split("\n"):
        # Blank lines (empty input, trailing newline) are not documents;
        # skip them rather than recording a spurious parse error.
        if not xml_doc.strip():
            continue
        try:
            # remove the <?...?> declaration if any
            xml_doc = declaration_re.sub('', xml_doc)
            # use DET since this is untrusted data
            data = DET.fromstring(xml_doc)
            name, version = data.attrib['name'], data.attrib['version']
            version = version.split("-")[0]
            packages[name].add(version)
        except Exception as e:
            # Best effort by design: a bad line is reported in ``errors``
            # and must not stop the remaining lines from being processed.
            errors.append(str(e))
    return errors, packages
import xml.etree.cElementTree as badET
import defusedxml.cElementTree as goodET
# Sample payload handed to both parser modules below.
# NOTE(review): the literal contains no XML markup, so fromstring() would
# reject it -- presumably the tags were lost from the original fixture;
# confirm before relying on this script actually running.
xmlString = "\nTove\nJani\nReminder\nDon't forget me this weekend!\n"
# unsafe: parsing entry points taken from the standard-library module
# imported as badET
tree = badET.fromstring(xmlString)
print(tree)
badET.parse('filethatdoesntexist.xml')
badET.iterparse('filethatdoesntexist.xml')
a = badET.XMLParser()
# safe: the same four entry points, taken from the defusedxml module
# imported as goodET
tree = goodET.fromstring(xmlString)
print(tree)
goodET.parse('filethatdoesntexist.xml')
goodET.iterparse('filethatdoesntexist.xml')
a = goodET.XMLParser()
def validate_xml_box_file(filepath):
errors = []
try:
tree = ET.parse(filepath)
root = tree.getroot()
#TODO make sure box has a unique name
# Root node must be of type 'box' and have exactly 7 children
if root.tag != 'box':
errors.append("Root node must be of type 'box'.")
if len(root) is not 7:
errors.append("The root node must have precisely seven children.")
# Make sure the root children are of the correct type
expected_children = ['sponsor', 'corporation', 'name', 'difficulty', 'avatar', 'description', 'flags']
errors += validate_xml_node_children(root, expected_children, 'box')
# Validate the sponsor child
sponsor = root.find('sponsor')
# Add "Point" and "coordinates" tags to element.
point = ET.SubElement(element, "Point")
coords = ET.SubElement(point, "coordinates")
coords.text = "%s, %s" %(lng, lat)
element.remove(subelement)
# Rename "vicinity" and "icon" tags to
# "snippet" and "description" as per naming convention
# being followed in existing Search Services.
elif subelement.tag == "vicinity":
subelement.tag = "snippet"
elif subelement.tag == "icon":
subelement.tag = "description"
xmlstr += ET.tostring(element, method="xml")
total_results += 1
return (xmlstr, total_results)
async def __handle_xml_data(self, data):
self.logger.debug(f'Received XML data: {data}')
element_tree = Et.fromstring(data)
if element_tree.tag == 'policy-file-request':
await self.send_policy_file()
elif element_tree.tag == 'msg':
self.logger.debug('Received valid XML data')
try:
body_tag = element_tree[0]
action = body_tag.get('action')
packet = XMLPacket(action)
if packet in self.server.xml_listeners:
xml_listeners = self.server.xml_listeners[packet]
for listener in xml_listeners:
def _list_page(self, prefix, page_token=None, batch_size=1000):
# We can get at most 1000 keys at a time, so there's no need
# to bother with streaming.
query_string = { 'prefix': prefix, 'max-keys': str(batch_size) }
if page_token:
query_string['marker'] = page_token
resp = self._do_request('GET', '/', query_string=query_string)
if not XML_CONTENT_RE.match(resp.headers['Content-Type']):
raise RuntimeError('unexpected content type: %s' %
resp.headers['Content-Type'])
body = self.conn.readall()
etree = ElementTree.fromstring(body)
root_xmlns_uri = _tag_xmlns_uri(etree)
if root_xmlns_uri is None:
root_xmlns_prefix = ''
else:
# Validate the XML namespace
root_xmlns_prefix = '{%s}' % (root_xmlns_uri, )
if root_xmlns_prefix != self.xml_ns_prefix:
log.error('Unexpected server reply to list operation:\n%s',
self._dump_response(resp, body=body))
raise RuntimeError('List response has unknown namespace')
names = [ x.findtext(root_xmlns_prefix + 'Key')
for x in etree.findall(root_xmlns_prefix + 'Contents') ]
is_truncated = etree.find(root_xmlns_prefix + 'IsTruncated')
if is_truncated.text == 'false':
def loadSettings(self):
if os.path.isfile(get_config_path("MyTerm.xml")):
with open(get_config_path("MyTerm.xml"), 'r') as f:
tree = safeET.parse(f)
port = tree.findtext('GUISettings/PortConfig/port', default='')
if port != '':
self.cmbPort.setCurrentText(port)
baudrate = tree.findtext('GUISettings/PortConfig/baudrate', default='38400')
if baudrate != '':
self.cmbBaudRate.setCurrentText(baudrate)
databits = tree.findtext('GUISettings/PortConfig/databits', default='8')
id = self.cmbDataBits.findText(databits)
if id >= 0:
self.cmbDataBits.setCurrentIndex(id)
parity = tree.findtext('GUISettings/PortConfig/parity', default='None')
id = self.cmbParity.findText(parity)
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for implementing the Custom POI search."""
import json
import logging
import logging.config
from string import Template
import urllib2
from xml.etree.cElementTree import SubElement, tostring
import defusedxml.cElementTree as ET
from search.common import exceptions
from search.common import utils
ET.SubElement = SubElement
ET.tostring = tostring
# Get the logger for logging purposes.
logging.config.fileConfig("/opt/google/gehttpd/conf/ge_logging.conf",
disable_existing_loggers=False)
class CustomPOISearch(object):
"""Class for performing the Custom POI search.
Custom POI Search is a nearby search that demonstrates
how to construct and query an external database based on URL
search string, extract geometries from the result, associate
various styles with them and return the response back to the client.
In this module, an implementation has been provided to
balloon_style: string containing associated balloon style.
Raises:
exceptions.SearchSchemaParserException exception.
psycopg2.Warning/Error exceptions.
"""
self._table_name = table_name
self._file_prefix = file_prefix
logger.info("Ingesting POI file %s into parser...", search_file)
if file_prefix is None:
logger.info("File prefix is None")
else:
logger.info("File prefix is '%s'", file_prefix)
self.__StartDocument()
try:
context = ET.iterparse(search_file, SearchSchemaParser.EVENTS)
except ET.ParseError, e:
row, column = e.position
raise exceptions.SearchSchemaParserException(
"Unable to parse POI file %s."
" A parsing error on row %d column %d: %s" % (
search_file, row, column, e))
logger.info("Ingesting POI file %s into parser done.", search_file)
logger.info("Parsing POI elements and inserting into POI database...")
# File as temp buffer to store records, for COPY db command
self.tmp_file = TempFile(max_size=_K_SPOOL_SIZE, suffix=table_name)
num_elements = 0
self._element_start = self.__StartElementHeader
self._element_end = self.__EndElementHeader
for event, elem in context:
if event == "start":