Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
self.context = context
self.filter = repo_filter
self.fts = False
self.label = 'HHypermap'
self.local_ingest = True
self.dbtype = settings.DATABASES['default']['ENGINE'].split('.')[-1]
# HHypermap PostgreSQL installs are PostGIS enabled
if self.dbtype == 'postgis':
self.dbtype = 'postgresql+postgis+wkt'
if self.dbtype in ['sqlite', 'sqlite3']: # load SQLite query bindings
connection.connection.create_function('query_spatial', 4, util.query_spatial)
connection.connection.create_function('get_anytext', 1, util.get_anytext)
connection.connection.create_function('get_geometry_area', 1, util.get_geometry_area)
# generate core queryables db and obj bindings
self.queryables = {}
for tname in self.context.model['typenames']:
for qname in self.context.model['typenames'][tname]['queryables']:
self.queryables[qname] = {}
items = self.context.model['typenames'][tname]['queryables'][qname].items()
for qkey, qvalue in items:
self.queryables[qname][qkey] = qvalue
# flatten all queryables
# TODO smarter way of doing this
keywords = etree.SubElement(idinfo, 'keywords')
theme = etree.SubElement(keywords, 'theme')
for v in val.split(','):
etree.SubElement(theme, 'themekey').text = v
# accessconstraints
val = util.getqattr(recobj, context.md_core_model['mappings']['pycsw:AccessConstraints']) or ''
etree.SubElement(idinfo, 'accconst').text = val
# abstract
descript = etree.SubElement(idinfo, 'descript')
val = util.getqattr(recobj, context.md_core_model['mappings']['pycsw:Abstract']) or ''
etree.SubElement(descript, 'abstract').text = val
# time
datebegin = util.getqattr(recobj, context.md_core_model['mappings']['pycsw:TempExtent_begin'])
dateend = util.getqattr(recobj, context.md_core_model['mappings']['pycsw:TempExtent_end'])
if all([datebegin, dateend]):
timeperd = etree.SubElement(idinfo, 'timeperd')
timeinfo = etree.SubElement(timeperd, 'timeinfo')
rngdates = etree.SubElement(timeinfo, 'timeinfo')
begdate = etree.SubElement(rngdates, 'begdate').text = datebegin
enddate = etree.SubElement(rngdates, 'enddate').text = dateend
# bbox extent
val = util.getqattr(recobj, context.md_core_model['mappings']['pycsw:BoundingBox'])
bboxel = write_extent(val)
if bboxel is not None:
idinfo.append(bboxel)
# contributor
val = util.getqattr(recobj, context.md_core_model['mappings']['pycsw:Contributor']) or ''
def _write_verboseresponse(self, insertresults):
''' show insert result identifiers '''
insertresult = etree.Element(util.nspath_eval('csw30:InsertResult',
self.parent.context.namespaces))
for ir in insertresults:
briefrec = etree.SubElement(insertresult,
util.nspath_eval('csw30:BriefRecord',
self.parent.context.namespaces))
etree.SubElement(briefrec,
util.nspath_eval('dc:identifier',
self.parent.context.namespaces)).text = ir['identifier']
etree.SubElement(briefrec,
util.nspath_eval('dc:title',
self.parent.context.namespaces)).text = ir['title']
return insertresult
if '/' in link: # path is embedded in link
if link[-1] == '/': # directory, skip
continue
if link[0] == '/':
# strip path of WAF URL
link = '%s://%s%s' % (up.scheme, up.netloc, link)
else: # tack on href to WAF URL
link = '%s/%s' % (record, link)
LOGGER.debug('URL is: %s', link)
links.append(link)
LOGGER.debug('%d links found', len(links))
for link in links:
LOGGER.info('Processing link %s', link)
# fetch and parse
linkcontent = util.http_request('GET', link)
recobj = _parse_metadata(context, repos, linkcontent)[0]
recobj.source = link
recobj.mdsource = link
recobjs.append(recobj)
return recobjs
def _parse_constraint(self, element):
''' Parse csw:Constraint '''
query = {}
tmp = element.find(util.nspath_eval('ogc:Filter', self.parent.context.namespaces))
if tmp is not None:
LOGGER.debug('Filter constraint specified.')
try:
query['type'] = 'filter'
query['where'], query['values'] = fes1.parse(tmp,
self.parent.repository.queryables['_all'], self.parent.repository.dbtype,
self.parent.context.namespaces, self.parent.orm, self.parent.language['text'], self.parent.repository.fts)
except Exception as err:
return 'Invalid Filter request: %s' % err
tmp = element.find(util.nspath_eval('csw:CqlText', self.parent.context.namespaces))
if tmp is not None:
LOGGER.debug('CQL specified: %s.', tmp.text)
try:
LOGGER.debug('Transforming CQL into OGC Filter')
query['type'] = 'filter'
self.context.namespaces), nsmap=self.context.namespaces)
node.attrib[util.nspath_eval('xsi:schemaLocation',
self.context.namespaces)] = '%s %s' % \
(self.context.namespaces['soapenv'], self.context.namespaces['soapenv'])
node2 = etree.SubElement(node, util.nspath_eval('soapenv:Body',
self.context.namespaces))
if hasattr(self, 'exception') and self.exception:
node3 = etree.SubElement(node2, util.nspath_eval('soapenv:Fault',
self.context.namespaces))
node4 = etree.SubElement(node3, util.nspath_eval('soapenv:Code',
self.context.namespaces))
etree.SubElement(node4, util.nspath_eval('soapenv:Value',
self.context.namespaces)).text = 'soap:Server'
node4 = etree.SubElement(node3, util.nspath_eval('soapenv:Reason',
self.context.namespaces))
etree.SubElement(node4, util.nspath_eval('soapenv:Text',
self.context.namespaces)).text = 'A server exception was encountered.'
node4 = etree.SubElement(node3, util.nspath_eval('soapenv:Detail',
self.context.namespaces))
node4.append(self.response)
else:
node2.append(self.response)
self.response = node
def response(self, response, kvp, repository, server_url):
"""process OAI-PMH request"""
mode = kvp.pop('mode', None)
if 'config' in kvp:
config_val = kvp.pop('config')
url = '%smode=oaipmh' % util.bind_url(server_url)
node = etree.Element(util.nspath_eval('oai:OAI-PMH', self.namespaces), nsmap=self.namespaces)
node.set(util.nspath_eval('xsi:schemaLocation', self.namespaces), '%s http://www.openarchives.org/OAI/2.0/OAI-PMH.xsd' % self.namespaces['oai'])
LOGGER.debug(etree.tostring(node))
etree.SubElement(node, util.nspath_eval('oai:responseDate', self.namespaces)).text = util.get_today_and_now()
etree.SubElement(node, util.nspath_eval('oai:request', self.namespaces), attrib=kvp).text = url
if 'verb' not in kvp:
etree.SubElement(node, util.nspath_eval('oai:error', self.namespaces), code='badArgument').text = 'Missing \'verb\' parameter'
return node
if kvp['verb'] not in self.request_model.keys():
etree.SubElement(node, util.nspath_eval('oai:error', self.namespaces), code='badArgument').text = 'Unknown verb \'%s\'' % kvp['verb']
return node
node2 = etree.SubElement(
node, util.nspath_eval('soapenv:Body', self.context.namespaces))
if self.exception:
node3 = etree.SubElement(
node2,
util.nspath_eval('soapenv:Fault', self.context.namespaces)
)
node4 = etree.SubElement(
node3,
util.nspath_eval('soapenv:Code', self.context.namespaces)
)
etree.SubElement(
node4,
util.nspath_eval('soapenv:Value', self.context.namespaces)
).text = 'soap:Server'
node4 = etree.SubElement(
node3,
util.nspath_eval('soapenv:Reason', self.context.namespaces)
)
etree.SubElement(
node4,
util.nspath_eval('soapenv:Text', self.context.namespaces)
).text = 'A server exception was encountered.'
node4 = etree.SubElement(
node3,
util.nspath_eval('soapenv:Detail', self.context.namespaces)
)
etree.SubElement(node, util.nspath_eval('atom:link', NAMESPACES), href='%s?service=CSW&version=2.0.2&request=GetRepositoryItem&id=%s' % (url, util.getqattr(result, context.md_core_model['mappings']['pycsw:Identifier'])))
# atom:title
el = etree.SubElement(node, util.nspath_eval(XPATH_MAPPINGS['pycsw:Title'], NAMESPACES))
val = util.getqattr(result, context.md_core_model['mappings']['pycsw:Title'])
if val:
el.text =val
# atom:updated
el = etree.SubElement(node, util.nspath_eval(XPATH_MAPPINGS['pycsw:Modified'], NAMESPACES))
val = util.getqattr(result, context.md_core_model['mappings']['pycsw:Modified'])
if val:
el.text =val
else:
val = util.getqattr(result, context.md_core_model['mappings']['pycsw:InsertDate'])
el.text = val
for qval in ['pycsw:PublicationDate', 'pycsw:AccessConstraints', 'pycsw:Source', 'pycsw:Abstract']:
val = util.getqattr(result, context.md_core_model['mappings'][qval])
if val:
etree.SubElement(node, util.nspath_eval(XPATH_MAPPINGS[qval], NAMESPACES)).text = val
# bbox extent
val = util.getqattr(result, context.md_core_model['mappings']['pycsw:BoundingBox'])
bboxel = write_extent(val, context.namespaces)
if bboxel is not None:
node.append(bboxel)
return node