Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _daas_getbuffer(pixel_encoding, drv_name, drv_options, mime_type):
if gdaltest.daas_drv is None:
pytest.skip()
if gdaltest.webserver_port == 0:
pytest.skip()
drv = gdal.GetDriverByName(drv_name)
if drv is None:
pytest.skip()
handler = webserver.SequentialHandler()
handler.add('GET', '/daas/sensors/products/foo/images/bar', 200, {},
json.dumps({"response": {"payload": {"payload": {"imageMetadata": {"properties": {
"width": 100,
"height": 100,
"pixelType": "Byte",
"_links": {
"getBuffer": {
"href": 'http://127.0.0.1:%d/getbuffer' % gdaltest.webserver_port
}
},
"bands": [
{
"name": "Band 1"
},
{
"name": "Band 2"
def test_vsiswift_fake_mkdir_rmdir():
if gdaltest.webserver_port == 0:
pytest.skip()
gdal.VSICurlClearCache()
# Invalid name
ret = gdal.Mkdir('/vsiswift', 0)
assert ret != 0
handler = webserver.SequentialHandler()
handler.add('GET', '/v1/AUTH_something/foo/dir/', 404, {'Connection': 'close'})
handler.add('GET', '/v1/AUTH_something/foo?delimiter=%2F&limit=10000', 200, {'Connection': 'close'}, "[]")
handler.add('PUT', '/v1/AUTH_something/foo/dir/', 201)
with webserver.install_http_handler(handler):
ret = gdal.Mkdir('/vsiswift/foo/dir', 0)
assert ret == 0
# Try creating already existing directory
handler = webserver.SequentialHandler()
handler.add('GET', '/v1/AUTH_something/foo/dir/', 404, {'Connection': 'close'})
handler.add('GET', '/v1/AUTH_something/foo?delimiter=%2F&limit=10000',
200,
{'Connection': 'close', 'Content-type': 'application/json'},
"""[ { "subdir": "dir/" } ]""")
with webserver.install_http_handler(handler):
ret = gdal.Mkdir('/vsiswift/foo/dir', 0)
if gdaltest.opaif_drv is None:
pytest.skip()
if gdaltest.webserver_port == 0:
pytest.skip()
handler = webserver.SequentialHandler()
handler.add('GET', '/oapif/collections', 200, {'Content-Type': 'application/json'},
"""{ "collections" : [ {
"id": "foo"
}] }""")
with webserver.install_http_handler(handler):
ds = ogr.Open('OAPIF:http://localhost:%d/oapif' % gdaltest.webserver_port)
lyr = ds.GetLayer(0)
handler = webserver.SequentialHandler()
handler.add('GET', '/oapif/collections/foo/items?limit=10', 200, {'Content-Type': 'application/json'}, '{}')
handler.add('GET', '/oapif', 200, {'Content-Type': 'application/json'},
"""{ "links" : [ { "rel": "service-desc",
"href" : "http://localhost:%d/oapif/my_api",
"type": "application/vnd.oai.openapi+json;version=3.0" } ] }""" % gdaltest.webserver_port )
# Fake openapi response
handler.add('GET', '/oapif/my_api', 200, {'Content-Type': 'application/vnd.oai.openapi+json; charset=utf-8; version=3.0'},
"""{
"openapi": "3.0.0",
"paths" : {
"/collections/foo/items": {
"get": {
"parameters": [
{
"$ref" : "#/components/parameters/resultType"
}
def test_vsiaz_write_appendblob_retry():
if gdaltest.webserver_port == 0:
pytest.skip()
gdal.VSICurlClearCache()
with gdaltest.config_options({'GDAL_HTTP_MAX_RETRY': '2',
'GDAL_HTTP_RETRY_DELAY': '0.01',
'VSIAZ_CHUNK_SIZE_BYTES': '10'}):
f = gdal.VSIFOpenL('/vsiaz/test_copy/file.bin', 'wb')
assert f is not None
handler = webserver.SequentialHandler()
handler.add('PUT', '/azure/blob/myaccount/test_copy/file.bin', 502)
handler.add('PUT', '/azure/blob/myaccount/test_copy/file.bin', 201)
handler.add('PUT', '/azure/blob/myaccount/test_copy/file.bin?comp=appendblock', 502)
handler.add('PUT', '/azure/blob/myaccount/test_copy/file.bin?comp=appendblock', 201)
handler.add('PUT', '/azure/blob/myaccount/test_copy/file.bin?comp=appendblock', 502)
handler.add('PUT', '/azure/blob/myaccount/test_copy/file.bin?comp=appendblock', 201)
with webserver.install_http_handler(handler):
assert gdal.VSIFWriteL('0123456789abcdef', 1, 16, f) == 16
gdal.VSIFCloseL(f)
request.send_header('Content-Length', 0)
request.end_headers()
return
request.send_response(200)
request.send_header('Content-Length', 0)
request.end_headers()
handler.add('POST', '/oss_fake_bucket4/large_file.bin?uploadId=my_id', custom_method=method)
gdal.ErrorReset()
with webserver.install_http_handler(handler):
gdal.VSIFCloseL(f)
assert gdal.GetLastErrorMsg() == ''
handler = webserver.SequentialHandler()
handler.add('POST', '/oss_fake_bucket4/large_file_initiate_403_error.bin?uploads', 403)
handler.add('POST', '/oss_fake_bucket4/large_file_initiate_empty_result.bin?uploads', 200)
handler.add('POST', '/oss_fake_bucket4/large_file_initiate_invalid_xml_result.bin?uploads', 200, {}, 'foo')
handler.add('POST', '/oss_fake_bucket4/large_file_initiate_no_uploadId.bin?uploads', 200, {}, '')
with webserver.install_http_handler(handler):
for filename in ['/vsioss/oss_fake_bucket4/large_file_initiate_403_error.bin',
'/vsioss/oss_fake_bucket4/large_file_initiate_empty_result.bin',
'/vsioss/oss_fake_bucket4/large_file_initiate_invalid_xml_result.bin',
'/vsioss/oss_fake_bucket4/large_file_initiate_no_uploadId.bin']:
with gdaltest.config_option('VSIOSS_CHUNK_SIZE', '1'): # 1 MB
f = gdal.VSIFOpenL(filename, 'wb')
assert f is not None
with gdaltest.error_handler():
ret = gdal.VSIFWriteL(big_buffer, 1, size, f)
assert ret == 0
gdal.ErrorReset()
def test_daas_getbuffer_pixel_encoding_failures():
if gdaltest.daas_drv is None:
pytest.skip()
if gdaltest.webserver_port == 0:
pytest.skip()
handler = webserver.SequentialHandler()
handler.add('GET', '/daas/sensors/products/foo/images/bar', 200, {},
json.dumps({"response": {"payload": {"payload": {"imageMetadata": {"properties": {
"width": 2,
"height": 1,
"pixelType": "UInt16",
"_links": {
"getBuffer": {
"href": 'http://127.0.0.1:%d/getbuffer' % gdaltest.webserver_port
}
},
"bands": [
{
"name": "Band 1"
}
]
}}}}}}))
handler.add('GET', '/webhdfs/v1/foo/?op=LISTSTATUS', 200,
{}, '{"FileStatuses":{"FileStatus":[{"type":"FILE","modificationTime":1000,"pathSuffix":"bar.baz","length":123456},{"type":"DIRECTORY","pathSuffix":"mysubdir","length":0}]}}')
with webserver.install_http_handler(handler):
dir_contents = gdal.ReadDir(gdaltest.webhdfs_base_connection + '/foo')
assert dir_contents == ['bar.baz', 'mysubdir']
stat_res = gdal.VSIStatL(gdaltest.webhdfs_base_connection + '/foo/bar.baz')
assert stat_res.size == 123456
assert stat_res.mtime == 1
# ReadDir on something known to be a file shouldn't cause network access
dir_contents = gdal.ReadDir(
gdaltest.webhdfs_base_connection + '/foo/bar.baz')
assert dir_contents is None
# Test error on ReadDir()
handler = webserver.SequentialHandler()
handler.add('GET', '/webhdfs/v1foo/error_test/?op=LISTSTATUS', 404)
with webserver.install_http_handler(handler):
dir_contents = gdal.ReadDir(
gdaltest.webhdfs_base_connection + 'foo/error_test/')
assert dir_contents is None
test/
""")
with webserver.install_http_handler(handler):
listdir = gdal.ReadDir('/vsioss/visoss_8', 0)
assert listdir == ['test', 'test/']
handler = webserver.SequentialHandler()
with webserver.install_http_handler(handler):
assert not stat.S_ISDIR(gdal.VSIStatL('/vsioss/visoss_8/test').mode)
handler = webserver.SequentialHandler()
with webserver.install_http_handler(handler):
assert stat.S_ISDIR(gdal.VSIStatL('/vsioss/visoss_8/test/').mode)
data = gdal.VSIFReadL(1, 4, f).decode('ascii')
gdal.VSIFCloseL(f)
assert data == 'foo'
handler = webserver.SequentialHandler()
handler.add('GET', '/oss_fake_bucket/resource', custom_method=get_oss_fake_bucket_resource_method)
with webserver.install_http_handler(handler):
f = open_for_read('/vsioss_streaming/oss_fake_bucket/resource')
assert f is not None
data = gdal.VSIFReadL(1, 4, f).decode('ascii')
gdal.VSIFCloseL(f)
assert data == 'foo'
handler = webserver.SequentialHandler()
def method(request):
request.protocol_version = 'HTTP/1.1'
request.send_response(400)
response = """
<code>AccessDenied</code>
The bucket you are attempting to access must be addressed using the specified endpoint. Please send all future requests to this endpoint.
unused
unuset
localhost:%d
""" % request.server.port
response = '%x\r\n%s\r\n0\r\n\r\n' % (len(response), response)
request.send_header('Content-type', 'application/xml')
request.send_header('Transfer-Encoding', 'chunked')
request.send_header('Connection', 'close')
def test_visoss_4():
if gdaltest.webserver_port == 0:
pytest.skip()
with webserver.install_http_handler(webserver.SequentialHandler()):
with gdaltest.error_handler():
f = gdal.VSIFOpenL('/vsioss/oss_fake_bucket3', 'wb')
assert f is None
handler = webserver.SequentialHandler()
handler.add('GET', '/oss_fake_bucket3/empty_file.bin', 200, {'Connection': 'close'}, 'foo')
with webserver.install_http_handler(handler):
assert gdal.VSIStatL('/vsioss/oss_fake_bucket3/empty_file.bin').size == 3
# Empty file
handler = webserver.SequentialHandler()
def method(request):
if request.headers['Content-Length'] != '0':
sys.stderr.write('Did not get expected headers: %s\n' % str(request.headers))
request.send_response(400)
return
request.send_response(200)
request.send_header('Content-Length', 0)
request.end_headers()