Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def cherche_osm_buildings_proches(code_departement, code_commune, osm, transform_to_osm, transform_from_osm):
""" Cherche a intégrer les nœuds "addr:housenumber" du fichier
d'entrée osm avec les building extraits de la base OSM.
"""
sys.stdout.write((u"Intégration avec les buidings proches présent dans la base OSM.\n").encode("utf-8"))
sys.stdout.write((u"Chargement des buidings\n").encode("utf-8"))
sys.stdout.flush();
buildings_osm = get_osm_buildings_and_barrier_ways(code_departement, code_commune)
for node in itertools.chain.from_iterable(
[itervalues(o.nodes) for o in [osm, buildings_osm]]):
if not hasattr(node,'xy'):
node.position = transform_from_osm((float(node.attrs["lon"]), float(node.attrs["lat"])))
# créé un index spatial de tous les ways:
ways_index = rtree.index.Index()
for way in itervalues(buildings_osm.ways):
if way.nodes[0] == way.nodes[-1]:
way.shape = Polygon([buildings_osm.nodes[id].position for id in way.nodes])
else:
way.shape = LineString([buildings_osm.nodes[id].position for id in way.nodes])
ways_index.insert(way.id(), way.shape.bounds, way.id())
sys.stdout.write((u"Recherche des buiding proches\n").encode("utf-8"))
sys.stdout.flush();
for node in osm.nodes.values():
if "addr:housenumber" in node.tags:
x,y = node.position
search_bounds = [x - MAX_BUILDING_DISTANCE_METERS, y - MAX_BUILDING_DISTANCE_METERS,
x + MAX_BUILDING_DISTANCE_METERS, y + MAX_BUILDING_DISTANCE_METERS]
near_ways = [buildings_osm.ways[e.object] for e in ways_index.intersection(search_bounds, objects=True)]
if hasattr(node, 'limite_parcelle') and node.limite_parcelle != None:
#and node.liimite_parcelle.distance(node.position) < MAX_BUILDING_DISTANCE_METERS:
#'landparcel_id': line[0],
#'mistral_function': line[1],
'toid': line['toid'],
#'household_id': line[4],
#'res_count': line[6],
#'lad': line[13],
'oa': line['oa'],
}
}
yield (i, geom_point.bounds, feature)
i += 1
output = []
try:
# create index from generator (see http://toblerity.org/rtree/performance.html#use-stream-loading)
idx = index.Index(premises())
for n in idx.intersection((shape(exchange_area['geometry']).bounds), objects=True):
point = n.object['representative_point']
if prepared_area.contains(point):
del n.object['representative_point']
output.append(n.object)
except:
print('{} failed'.format(exchange_area['properties']['id']))
return output
def compute_buildings_polygons_and_rtree(osm_data, tolerance):
buildings_rtree = rtree.index.Index()
osm_data.buildings_rtree = buildings_rtree
for way in itervalues(osm_data.ways):
if way.isBuilding:
if len(way.nodes) >= 3:
way.polygon = Polygon([osm_data.nodes[i].position for i in way.nodes])
else:
way.polygon = LineString([osm_data.nodes[i].position for i in way.nodes])
way.bbox = way.polygon.bounds
way.tolerance_polygon = way.polygon.buffer(tolerance)
buildings_rtree.insert(way.id(), way.bbox, way.textid())
for rel in itervalues(osm_data.relations):
if rel.isBuilding:
exterior = None
interiors = []
for rtype,rref,rrole in rel.itermembers():
if rtype == "way":
def createRtreeIndex(pf):
    """Build an rtree spatial index holding one entry per feature in ``pf``.

    Each item of ``pf`` is indexed by ``int(item['id'])`` with the bounding
    box of ``shapely.geometry.shape(item['geometry'])``. A confirmation
    line is printed once every item has been inserted.

    Args:
        pf: Iterable of mappings exposing ``'id'`` and ``'geometry'`` keys.
            NOTE(review): presumably GeoJSON-like point features -- confirm
            against the caller.

    Returns:
        The populated ``rtree.index.Index``.
    """
    spatial_index = rtree.index.Index()
    for feature in pf:
        # Arguments evaluate left to right: the id is converted before the
        # geometry is parsed, so a bad id fails first.
        spatial_index.insert(
            int(feature['id']),
            shapely.geometry.shape(feature['geometry']).bounds,
        )
    print('Success -- created rtree')
    return spatial_index
def compute_buildings_polygons_and_rtree(osm_data, tolerance):
buildings_rtree = rtree.index.Index()
osm_data.buildings_rtree = buildings_rtree
for way in osm_data.ways.itervalues():
if way.isBuilding:
if len(way.nodes) >= 3:
way.polygon = Polygon([osm_data.nodes[i].position for i in way.nodes])
else:
way.polygon = LineString([osm_data.nodes[i].position for i in way.nodes])
way.bbox = way.polygon.bounds
way.tolerance_polygon = way.polygon.buffer(tolerance)
buildings_rtree.insert(way.id(), way.bbox, way.textid())
for rel in osm_data.relations.itervalues():
if rel.isBuilding:
exterior = None
interiors = []
for rtype,rref,rrole in rel.itermembers():
if rtype == "way":
# Keep index of segments so we don't need to marshal them
# in/out of rtree.
self._segments = {}
self._idsBySegment = {}
# Lookup table for subnets by segment coordinates.
# (need to update subnet lookups as they merge)
self._subnetLookupBySegment = {}
# rtree spatial index for intersection test speedup
# setup properties first
p = index.Property()
p.index_capacity = 10
p.leaf_capacity = 10
p.near_minimum_overlap_factor = 3
self._spatialIndex = index.Index(properties=p)
feature = {
'type': 'Feature',
'geometry': mapping(geom),
'representative_point': geom_point,
'properties':{
'res_count': line['res_count'],
'floor_area': line['floor_area'],
'height_toroofbase': line['height_toroofbase'],
'height_torooftop': line['height_torooftop'],
'number_of_floors': line['number_of_floors'],
'footprint_area': line['footprint_area'],
}
}
yield (i, geom_point.bounds, feature)
idx = index.Index(premises())
output = []
for postcode_sector in postcode_sectors:
postcode_shape = shape(postcode_sector['geometry'])
prepared_area = prep(postcode_shape)
for n in idx.intersection((postcode_shape.bounds), objects=True):
point = n.object['representative_point']
if prepared_area.contains(point):
del n.object['representative_point']
output.append(n.object)
return output
def setup_getfeatureinfo(self, layer, request, location=None):
    """Look up the index entry nearest to the requested coordinates.

    Reads ``latitude`` and ``longitude`` from ``request.GET``, opens the
    rtree index that matches ``location`` and asks it for the single
    nearest entry. The index, once opened, is always closed again, whether
    the lookup succeeds or an exception propagates.

    Args:
        layer: Not referenced in the visible body of this method.
        request: Object whose ``GET`` mapping provides ``'latitude'`` and
            ``'longitude'``.
        location: ``'face'`` or ``'node'``; any falsy value means ``'face'``.

    Raises:
        NotImplementedError: ``location`` is neither ``'face'`` nor ``'node'``.
        ValueError: the index returned no entry for the requested point.
        Exception: whatever ``request.GET[...]`` raises for a missing key
            (``KeyError`` for a plain dict).
    """
    location = location or 'face'
    # Attribute names are looked up lazily so only the selected root is read.
    # NOTE(review): the *_tree_root attributes are presumably on-disk index
    # locations understood by rtree -- confirm.
    root_attribute_by_location = {
        'face': 'face_tree_root',
        'node': 'node_tree_root',
    }
    tree = None
    try:
        latitude = request.GET['latitude']
        longitude = request.GET['longitude']

        # Find closest cell or node (only node for now)
        root_attribute = root_attribute_by_location.get(location)
        if root_attribute is None:
            raise NotImplementedError("No RTree for location '{}'".format(location))
        tree = rtree.index.Index(getattr(self, root_attribute))

        # Degenerate (zero-area) query box located at the requested point.
        query_box = (longitude, latitude, longitude, latitude)
        try:
            nindex = list(tree.nearest(query_box, 1, objects=True))[0]
        except IndexError:
            raise ValueError("No cells in the {} tree for point {}, {}".format(location, longitude, latitude))

        # NOTE(review): these three locals are not consumed within the
        # visible body; the excerpt may be truncated, so they are preserved.
        closest_x, closest_y = nindex.bbox[2:]
        geo_index = nindex.object
    finally:
        if tree is not None:
            tree.close()
def convert(buildingsFile, osmOut):
with open(buildingsFile) as f:
features = json.load(f)
allAddresses = {}
buildings = []
buildingShapes = []
buildingIdx = index.Index()
# Returns the coordinates for this address
def keyFromAddress(address):
return str(address['geometry']['coordinates'][0]) + "," + str(address['geometry']['coordinates'][1])
for feature in features:
if feature['geometry']['type'] == 'Polygon' or feature['geometry']['type'] == 'MultiPolygon':
extra_tags = osm_tags.get_osm_tags(feature)
feature['properties']['osm'] = extra_tags
buildings.append(feature)
shape = asShape(feature['geometry'])
buildingShapes.append(shape)
buildingIdx.add(len(buildingShapes) - 1, shape.bounds)
# These are the addresses that don't overlap any buildings
elif feature['geometry']['type'] == 'Point':