Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
handler.add('GET', '/linestring/0/0/0.pbf', 200, {},
open('data/mvt/linestring/0/0/0.pbf', 'rb').read())
handler.add('GET', '/linestring/0/0/0.pbf', 200, {},
open('data/mvt/linestring/0/0/0.pbf', 'rb').read())
with webserver.install_http_handler(handler):
ds = ogr.Open('MVT:http://127.0.0.1:%d/linestring/0' % gdaltest.webserver_port)
lyr = ds.GetLayer(0)
f = lyr.GetNextFeature()
assert f is not None
# No metadata file nor tile
handler = webserver.SequentialHandler()
handler.add('GET', '/linestring/metadata.json', 404, {})
handler.add('GET', '/linestring.json', 404, {})
handler.add('GET', '/linestring/0/0/0.pbf', 404, {})
with webserver.install_http_handler(handler):
with gdaltest.error_handler():
ds = ogr.Open('MVT:http://127.0.0.1:%d/linestring/0' % gdaltest.webserver_port)
assert ds is None
# No metadata file, but tiles
handler = webserver.SequentialHandler()
handler.add('GET', '/linestring/metadata.json', 404, {})
handler.add('GET', '/linestring.json', 404, {})
handler.add('GET', '/linestring/0/0/0.pbf', 200, {},
open('data/mvt/linestring/0/0/0.pbf', 'rb').read())
handler.add('GET', '/linestring/0/0/0.pbf', 200, {},
open('data/mvt/linestring/0/0/0.pbf', 'rb').read())
with webserver.install_http_handler(handler):
ds = ogr.Open('MVT:http://127.0.0.1:%d/linestring/0' % gdaltest.webserver_port)
lyr = ds.GetLayer(0)
f = lyr.GetNextFeature()
}
},
"geotransform": [2, 0, 0.1, 49, 0, -0.1],
"bands": [
{
"name": "PAN",
"description": "Panchromatic band",
"colorInterpretation": "GRAY"
},
{
"name": "THE_MASK",
"colorInterpretation": "MAIN_MASK"
}
]
}}}}}}))
with webserver.install_http_handler(handler):
with gdaltest.config_options(
{'GDAL_DAAS_PERFORM_AUTH': 'No',
'GDAL_DAAS_SERVICE_URL': 'http://127.0.0.1:%d/daas' % gdaltest.webserver_port}):
ds = gdal.OpenEx("DAAS:http://127.0.0.1:%d/daas/sensors/products/foo/images/bar" %
gdaltest.webserver_port, open_options=['PIXEL_ENCODING=RAW'])
assert ds.RasterCount == 1
assert ds.GetRasterBand(1).GetMaskFlags() == gdal.GMF_PER_DATASET
assert ds.GetRasterBand(1).GetMaskBand()
assert ds.GetRasterBand(1).GetNoDataValue() is None
handler = webserver.SequentialHandler()
response = """--bd3f250361b619a49ef290d4fdfcf5d5691e385e5a74254803befd5fe2a7
Content-Disposition: form-data; name="Data";
Content-Type: application/octet-stream
%s
gdal.FileFromMemBuffer('/vsimem/aws_credentials', """
[unrelated]
aws_access_key_id = foo
aws_secret_access_key = bar
[default]
aws_access_key_id = AWS_ACCESS_KEY_ID
aws_secret_access_key = AWS_SECRET_ACCESS_KEY
[unrelated]
aws_access_key_id = foo
aws_secret_access_key = bar
""")
handler = webserver.SequentialHandler()
handler.add('GET', '/s3_fake_bucket/resource', custom_method=get_s3_fake_bucket_resource_method)
with webserver.install_http_handler(handler):
f = open_for_read('/vsis3/s3_fake_bucket/resource')
assert f is not None
data = gdal.VSIFReadL(1, 4, f).decode('ascii')
gdal.VSIFCloseL(f)
assert data == 'foo'
gdal.SetConfigOption('CPL_AWS_CREDENTIALS_FILE', '')
gdal.Unlink('/vsimem/aws_credentials')
"srs":{
"type":"image"
}
},
"bands":[
"Band 1",
"Band 2",
"Band 3"
]
}""".encode('ascii')
handler.add('POST', '/getbuffer', 200,
{'Content-Type': 'multipart/form-data; boundary=bd3f250361b619a49ef290d4fdfcf5d5691e385e5a74254803befd5fe2a7'},
response,
expected_headers={'Accept': 'application/octet-stream'},
expected_body=expected_body)
with webserver.install_http_handler(handler):
data = ds.GetRasterBand(1).ReadRaster()
assert data == 'AB'.encode('ascii')
content = request.rfile.read(3).decode('ascii')
if content != 'foo':
sys.stderr.write('Did not get expected content: %s\n' % content)
request.send_response(400)
request.send_header('Content-Length', 0)
request.end_headers()
return
request.send_response(200)
request.send_header('Content-Length', 0)
request.end_headers()
handler.add('PUT', '/s3_fake_bucket3/put_with_retry.bin', 502)
handler.add('PUT', '/s3_fake_bucket3/put_with_retry.bin', custom_method=method)
with webserver.install_http_handler(handler):
gdal.VSIFCloseL(f)
return
expected_authorization = 'Bearer ACCESS_TOKEN'
if request.headers['Authorization'] != expected_authorization:
sys.stderr.write("Bad Authorization: '%s'\n" % str(request.headers['Authorization']))
request.send_response(403)
return
request.send_response(200)
request.send_header('Content-type', 'text/plain')
request.send_header('Content-Length', 3)
request.end_headers()
request.wfile.write("""foo""".encode('ascii'))
handler.add('GET', '/gs_fake_bucket/resource', custom_method=method)
try:
with webserver.install_http_handler(handler):
f = open_for_read('/vsigs/gs_fake_bucket/resource')
assert f is not None
data = gdal.VSIFReadL(1, 4, f).decode('ascii')
gdal.VSIFCloseL(f)
except:
if gdal.GetLastErrorMsg().find('CPLRSASHA256Sign() not implemented') >= 0:
pytest.skip()
finally:
gdal.SetConfigOption('GO2A_AUD', None)
gdal.SetConfigOption('GO2A_NOW', None)
gdal.SetConfigOption('GS_OAUTH2_PRIVATE_KEY', '')
gdal.SetConfigOption('GS_OAUTH2_PRIVATE_KEY_FILE', '')
gdal.SetConfigOption('GS_OAUTH2_CLIENT_EMAIL', '')
assert data == 'foo'
Content-Disposition: form-data; name="Data";
Content-Type: """ + mime_type + """
""").replace('\n', '\r\n').encode('ascii') + content + """
--bd3f250361b619a49ef290d4fdfcf5d5691e385e5a74254803befd5fe2a7
Content-Disposition: form-data; name="Info";
Content-Type: application/json
{"properties":{"format":"application/octet-stream","width":100,"height":100}}
--bd3f250361b619a49ef290d4fdfcf5d5691e385e5a74254803befd5fe2a7--""".replace('\n', '\r\n').encode('ascii')
handler.add('POST', '/getbuffer', 200,
{'Content-Type': 'multipart/form-data; boundary=bd3f250361b619a49ef290d4fdfcf5d5691e385e5a74254803befd5fe2a7'},
response,
expected_headers={'Accept': mime_type})
with webserver.install_http_handler(handler):
with gdaltest.error_handler():
data = ds.GetRasterBand(1).ReadRaster(0, 0, 1, 1)
assert data == 'A'.encode('ascii')
data = ds.GetRasterBand(2).ReadRaster(0, 0, 1, 1)
assert data == 'B'.encode('ascii')
if gdaltest.mbtiles_drv is None:
pytest.skip()
if gdal.GetDriverByName('HTTP') is None:
pytest.skip()
if gdal.GetDriverByName('JPEG') is None:
pytest.skip()
if gdaltest.webserver_port == 0:
pytest.skip()
handler = webserver.FileHandler(
{'/byte_jpeg.mbtiles': open('data/byte_jpeg.mbtiles', 'rb').read()})
with webserver.install_http_handler(handler):
ds = gdal.Open('/vsicurl/http://localhost:%d/byte_jpeg.mbtiles' % gdaltest.webserver_port)
assert ds is not None
assert stat.S_ISDIR(gdal.VSIStatL('/vsis3/s3_bucket_test_mkdir/dir').mode)
dir_content = gdal.ReadDir('/vsis3/s3_bucket_test_mkdir/dir')
assert dir_content == ['.']
# Try creating already existing directory
handler = webserver.SequentialHandler()
handler.add('GET', '/s3_bucket_test_mkdir/dir/', 416, {'Connection': 'close'})
with webserver.install_http_handler(handler):
ret = gdal.Mkdir('/vsis3/s3_bucket_test_mkdir/dir', 0)
assert ret != 0
handler = webserver.SequentialHandler()
handler.add('DELETE', '/s3_bucket_test_mkdir/dir/', 204)
with webserver.install_http_handler(handler):
ret = gdal.Rmdir('/vsis3/s3_bucket_test_mkdir/dir')
assert ret == 0
# Try deleting already deleted directory
handler = webserver.SequentialHandler()
handler.add('GET', '/s3_bucket_test_mkdir/dir/', 404)
handler.add('GET', '/s3_bucket_test_mkdir/?delimiter=%2F&max-keys=100&prefix=dir%2F', 404, {'Connection': 'close'})
with webserver.install_http_handler(handler):
ret = gdal.Rmdir('/vsis3/s3_bucket_test_mkdir/dir')
assert ret != 0
# Try deleting non-empty directory
handler = webserver.SequentialHandler()
handler.add('GET', '/s3_bucket_test_mkdir/dir_nonempty/', 416)
handler.add('GET', '/s3_bucket_test_mkdir/?delimiter=%2F&max-keys=100&prefix=dir_nonempty%2F', 200,
{'Content-type': 'application/xml'},