Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def response_csw2sru(self, element, environ):
"""transform a CSW response into an SRU response"""
response_name = etree.QName(element).localname
if response_name == 'Capabilities': # explain
node = etree.Element(util.nspath_eval('sru:explainResponse', self.namespaces), nsmap=self.namespaces)
etree.SubElement(node, util.nspath_eval('sru:version', self.namespaces)).text = self.sru_version
record = etree.SubElement(node, util.nspath_eval('sru:record', self.namespaces))
etree.SubElement(record, util.nspath_eval('sru:recordPacking', self.namespaces)).text = 'XML'
etree.SubElement(record, util.nspath_eval('sru:recordSchema', self.namespaces)).text = 'http://explain.z3950.org/dtd/2.1/'
recorddata = etree.SubElement(record, util.nspath_eval('sru:recordData', self.namespaces))
explain = etree.SubElement(recorddata, util.nspath_eval('zr:explain', self.namespaces))
serverinfo = etree.SubElement(explain, util.nspath_eval('zr:serverInfo', self.namespaces), protocol='SRU', version=self.sru_version, transport='http', method='GET POST SOAP')
parser = etree.XMLParser(schema=schema, resolve_entities=False)
if hasattr(self.parent, 'soap') and self.parent.soap:
# validate the body of the SOAP request
doc = etree.fromstring(etree.tostring(doc), parser)
else: # validate the request normally
doc = etree.fromstring(postdata, parser)
LOGGER.debug('Request is valid XML')
else: # parse Transaction without validation
doc = etree.fromstring(postdata, self.parent.context.parser)
except Exception as err:
errortext = \
'Exception: the document is not valid.\nError: %s' % str(err)
LOGGER.exception(errortext)
return errortext
request['request'] = etree.QName(doc).localname
LOGGER.debug('Request operation %s specified.', request['request'])
tmp = doc.find('.').attrib.get('service')
if tmp is not None:
request['service'] = tmp
tmp = doc.find('.').attrib.get('version')
if tmp is not None:
request['version'] = tmp
tmp = doc.find('.//%s' % util.nspath_eval('ows20:Version',
self.parent.context.namespaces))
if tmp is not None:
request['version'] = tmp.text
tmp = doc.find('.').attrib.get('updateSequence')
def _csw3_2_os(self):
"""CSW 3.0.0 Capabilities to OpenSearch Description"""
response_name = etree.QName(self.exml).localname
if response_name == 'GetRecordsResponse':
startindex = int(self.exml.xpath('//@nextRecord')[0]) - int(
self.exml.xpath('//@numberOfRecordsReturned')[0])
if startindex < 1:
startindex = 1
node = etree.Element(util.nspath_eval('atom:feed',
self.context.namespaces), nsmap=self.namespaces)
etree.SubElement(node, util.nspath_eval('atom:id',
self.context.namespaces)).text = self.cfg.get('server', 'url')
etree.SubElement(node, util.nspath_eval('atom:title',
self.context.namespaces)).text = self.cfg.get('metadata:main',
'identification_title')
author = etree.SubElement(node, util.nspath_eval('atom:author', self.context.namespaces))
etree.SubElement(author, util.nspath_eval('atom:name', self.context.namespaces)).text = self.cfg.get('metadata:main',
def _csw2_2_os(self):
"""CSW 2.0.2 Capabilities to OpenSearch Description"""
operation_name = etree.QName(self.exml).localname
if operation_name == 'GetRecordsResponse':
startindex = int(self.exml.xpath('//@nextRecord')[0]) - int(
self.exml.xpath('//@numberOfRecordsReturned')[0])
if startindex < 1:
startindex = 1
node = etree.Element(util.nspath_eval('atom:feed',
self.context.namespaces), nsmap=self.namespaces)
etree.SubElement(node, util.nspath_eval('atom:id',
self.context.namespaces)).text = self.cfg.get('server', 'url')
etree.SubElement(node, util.nspath_eval('atom:title',
self.context.namespaces)).text = self.cfg.get('metadata:main',
'identification_title')
#etree.SubElement(node, util.nspath_eval('atom:updated',
# self.context.namespaces)).text = self.exml.xpath('//@timestamp')[0]
def _get_comparison_operator(element):
"""return the SQL operator based on Filter query"""
element_name = etree.QName(element).localname
return MODEL['ComparisonOperators']['ogc:%s' % element_name]['opvalue']
LOGGER.debug('Scanning for spatial property name')
if property_name is None:
raise RuntimeError('Missing ogc:PropertyName in spatial filter')
if (property_name.text.find('BoundingBox') == -1 and
property_name.text.find('Envelope') == -1):
raise RuntimeError('Invalid ogc:PropertyName in spatial filter: %s' %
property_name.text)
geometry = gml3.Geometry(element, nsmap)
#make decision to apply spatial ranking to results
set_spatial_ranking(geometry)
spatial_predicate = etree.QName(element).localname.lower()
LOGGER.debug('Spatial predicate: %s', spatial_predicate)
if dbtype == 'mysql': # adjust spatial query for MySQL
LOGGER.debug('Adjusting spatial query for MySQL')
if spatial_predicate == 'bbox':
spatial_predicate = 'intersects'
if spatial_predicate == 'beyond':
spatial_query = "ifnull(distance(geomfromtext(%s), \
geomfromtext('%s')) > convert(%s, signed),false)" % \
(geomattr, geometry.wkt, distance)
elif spatial_predicate == 'dwithin':
spatial_query = "ifnull(distance(geomfromtext(%s), \
geomfromtext('%s')) <= convert(%s, signed),false)" % \
(geomattr, geometry.wkt, distance)
(_get_spatial_operator(
queryables['pycsw:BoundingBox'],
child, dbtype, nsmap), boolean_false))
else:
queries.append("%s = %s" %
(_get_spatial_operator(
queryables['pycsw:BoundingBox'],
child, dbtype, nsmap), boolean_true))
elif child.tag == util.nspath_eval('ogc:FeatureId', nsmap):
LOGGER.debug('ogc:FeatureId filter detected')
queries.append("%s = %s" % (queryables['pycsw:Identifier'], assign_param()))
values.append(child.attrib.get('fid'))
else: # comparison operator
LOGGER.debug('Comparison operator processing')
child_tag_name = etree.QName(child).localname
tagname = ' %s ' % child_tag_name.lower()
if tagname in [' or ', ' and ']: # this is a nested binary logic query
LOGGER.debug('Nested binary logic detected; operator=%s', tagname)
for child2 in child.xpath('child::*'):
queries_nested.append(_get_comparison_expression(child2))
queries.append('(%s)' % tagname.join(queries_nested))
else:
queries.append(_get_comparison_expression(child))
where = boq.join(queries) if (boq is not None and boq != ' not ') \
else queries[0]
return where, values
LOGGER.debug('Scanning for spatial property name')
if property_name is None:
raise RuntimeError('Missing ogc:PropertyName in spatial filter')
if (property_name.text.find('BoundingBox') == -1 and
property_name.text.find('Envelope') == -1):
raise RuntimeError('Invalid ogc:PropertyName in spatial filter: %s' %
property_name.text)
geometry = gml3.Geometry(element, nsmap)
#make decision to apply spatial ranking to results
set_spatial_ranking(geometry)
spatial_predicate = etree.QName(element).localname.lower()
LOGGER.debug('Spatial predicate: %s', spatial_predicate)
if dbtype == 'mysql': # adjust spatial query for MySQL
LOGGER.debug('Adjusting spatial query for MySQL')
if spatial_predicate == 'bbox':
spatial_predicate = 'intersects'
if spatial_predicate == 'beyond':
spatial_query = "ifnull(distance(geomfromtext(%s), \
geomfromtext('%s')) > convert(%s, signed),false)" % \
(geomattr, geometry.wkt, distance)
elif spatial_predicate == 'dwithin':
spatial_query = "ifnull(distance(geomfromtext(%s), \
geomfromtext('%s')) <= convert(%s, signed),false)" % \
(geomattr, geometry.wkt, distance)