How to use webserver.SequentialHandler in webserver

To help you get started, we’ve selected a few webserver examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github OSGeo / gdal / autotest / gdrivers / daas.py View on Github external
def _daas_getbuffer(pixel_encoding, drv_name, drv_options, mime_type):

    if gdaltest.daas_drv is None:
        pytest.skip()
    if gdaltest.webserver_port == 0:
        pytest.skip()
    drv = gdal.GetDriverByName(drv_name)
    if drv is None:
        pytest.skip()

    handler = webserver.SequentialHandler()
    handler.add('GET', '/daas/sensors/products/foo/images/bar', 200, {},
                json.dumps({"response": {"payload": {"payload": {"imageMetadata": {"properties": {
                    "width": 100,
                    "height": 100,
                    "pixelType": "Byte",
                    "_links": {
                        "getBuffer": {
                            "href": 'http://127.0.0.1:%d/getbuffer' % gdaltest.webserver_port
                        }
                    },
                    "bands": [
                        {
                            "name": "Band 1"
                        },
                        {
                            "name": "Band 2"
github OSGeo / gdal / autotest / gcore / vsiswift.py View on Github external
def test_vsiswift_fake_mkdir_rmdir():
    """Exercise gdal.Mkdir() on /vsiswift paths against canned HTTP responses.

    Skipped when no test web server is available (gdaltest.webserver_port
    is 0).

    Visible steps:
    1. Mkdir on the bare '/vsiswift' prefix must fail (non-zero return).
    2. Mkdir('/vsiswift/foo/dir') must return 0 when the queued responses
       are a 404 for the directory object, an empty JSON listing for the
       container, and a 201 for the PUT.
    3. Mkdir on the same path is called again with a container listing
       that already contains the "dir/" subdir.

    NOTE(review): this excerpt stops after step 3 -- the result of the last
    call is not asserted in the visible code, and no rmdir step is visible
    despite the function name.
    """

    if gdaltest.webserver_port == 0:
        pytest.skip()

    # Clear the VSI curl cache before queuing new fake responses.
    gdal.VSICurlClearCache()

    # Invalid name
    ret = gdal.Mkdir('/vsiswift', 0)
    assert ret != 0

    # Queue the responses for a successful creation: directory lookup (404),
    # container listing (empty JSON array), then the PUT (201).
    # Presumably the handler expects requests in the order they were added --
    # confirm against webserver.SequentialHandler.
    handler = webserver.SequentialHandler()
    handler.add('GET', '/v1/AUTH_something/foo/dir/', 404, {'Connection': 'close'})
    handler.add('GET', '/v1/AUTH_something/foo?delimiter=%2F&limit=10000', 200, {'Connection': 'close'}, "[]")
    handler.add('PUT', '/v1/AUTH_something/foo/dir/', 201)
    with webserver.install_http_handler(handler):
        ret = gdal.Mkdir('/vsiswift/foo/dir', 0)
    assert ret == 0

    # Try creating already existing directory
    # This time the container listing reports "dir/" as an existing subdir,
    # and no PUT response is queued.
    handler = webserver.SequentialHandler()
    handler.add('GET', '/v1/AUTH_something/foo/dir/', 404, {'Connection': 'close'})
    handler.add('GET', '/v1/AUTH_something/foo?delimiter=%2F&limit=10000',
                200,
                {'Connection': 'close', 'Content-type': 'application/json'},
                """[ { "subdir": "dir/" } ]""")
    with webserver.install_http_handler(handler):
        ret = gdal.Mkdir('/vsiswift/foo/dir', 0)
github OSGeo / gdal / autotest / ogr / ogr_oapif.py View on Github external
if gdaltest.opaif_drv is None:
        pytest.skip()

    if gdaltest.webserver_port == 0:
        pytest.skip()

    handler = webserver.SequentialHandler()
    handler.add('GET', '/oapif/collections', 200, {'Content-Type': 'application/json'},
                """{ "collections" : [ {
                    "id": "foo"
                 }] }""")
    with webserver.install_http_handler(handler):
        ds = ogr.Open('OAPIF:http://localhost:%d/oapif' % gdaltest.webserver_port)
    lyr = ds.GetLayer(0)

    handler = webserver.SequentialHandler()
    handler.add('GET', '/oapif/collections/foo/items?limit=10', 200, {'Content-Type': 'application/json'}, '{}')
    handler.add('GET', '/oapif', 200, {'Content-Type': 'application/json'},
                """{ "links" : [ { "rel": "service-desc",
                                   "href" : "http://localhost:%d/oapif/my_api",
                                   "type": "application/vnd.oai.openapi+json;version=3.0" } ] }""" % gdaltest.webserver_port )
    # Fake openapi response
    handler.add('GET', '/oapif/my_api', 200, {'Content-Type': 'application/vnd.oai.openapi+json; charset=utf-8; version=3.0'},
                """{
            "openapi": "3.0.0",
            "paths" : {
                "/collections/foo/items": {
                    "get": {
                        "parameters": [
                            {
                                "$ref" : "#/components/parameters/resultType"
                            }
github OSGeo / gdal / autotest / gcore / vsiaz.py View on Github external
def test_vsiaz_write_appendblob_retry():
    """Write 16 bytes to a /vsiaz/ file while every PUT first fails with 502.

    Skipped when no test web server is available (gdaltest.webserver_port
    is 0).

    The fake server is scripted so that each PUT (one on the blob URL, then
    two on the ``?comp=appendblock`` URL) is answered with a 502 followed by
    a 201. With GDAL_HTTP_MAX_RETRY set to '2' the write is expected to
    report all 16 bytes as written.

    NOTE(review): the two appendblock PUTs presumably correspond to the
    16-byte payload being split by VSIAZ_CHUNK_SIZE_BYTES='10' -- confirm
    against the /vsiaz/ implementation.
    """
    if gdaltest.webserver_port == 0:
        pytest.skip()

    gdal.VSICurlClearCache()

    retry_settings = {
        'GDAL_HTTP_MAX_RETRY': '2',
        'GDAL_HTTP_RETRY_DELAY': '0.01',
        'VSIAZ_CHUNK_SIZE_BYTES': '10',
    }
    # URLs hit by the writer, in the order the PUT requests are scripted.
    scripted_urls = (
        '/azure/blob/myaccount/test_copy/file.bin',
        '/azure/blob/myaccount/test_copy/file.bin?comp=appendblock',
        '/azure/blob/myaccount/test_copy/file.bin?comp=appendblock',
    )

    with gdaltest.config_options(retry_settings):

        fp = gdal.VSIFOpenL('/vsiaz/test_copy/file.bin', 'wb')
        assert fp is not None

        handler = webserver.SequentialHandler()
        for url in scripted_urls:
            # Each request fails once (502) before succeeding (201).
            handler.add('PUT', url, 502)
            handler.add('PUT', url, 201)

        with webserver.install_http_handler(handler):
            written = gdal.VSIFWriteL('0123456789abcdef', 1, 16, fp)
            assert written == 16
            # Close inside the handler context, exactly as the write above.
            gdal.VSIFCloseL(fp)
github OSGeo / gdal / autotest / gcore / vsioss.py View on Github external
request.send_header('Content-Length', 0)
            request.end_headers()
            return

        request.send_response(200)
        request.send_header('Content-Length', 0)
        request.end_headers()

    handler.add('POST', '/oss_fake_bucket4/large_file.bin?uploadId=my_id', custom_method=method)

    gdal.ErrorReset()
    with webserver.install_http_handler(handler):
        gdal.VSIFCloseL(f)
    assert gdal.GetLastErrorMsg() == ''

    handler = webserver.SequentialHandler()
    handler.add('POST', '/oss_fake_bucket4/large_file_initiate_403_error.bin?uploads', 403)
    handler.add('POST', '/oss_fake_bucket4/large_file_initiate_empty_result.bin?uploads', 200)
    handler.add('POST', '/oss_fake_bucket4/large_file_initiate_invalid_xml_result.bin?uploads', 200, {}, 'foo')
    handler.add('POST', '/oss_fake_bucket4/large_file_initiate_no_uploadId.bin?uploads', 200, {}, '')
    with webserver.install_http_handler(handler):
        for filename in ['/vsioss/oss_fake_bucket4/large_file_initiate_403_error.bin',
                         '/vsioss/oss_fake_bucket4/large_file_initiate_empty_result.bin',
                         '/vsioss/oss_fake_bucket4/large_file_initiate_invalid_xml_result.bin',
                         '/vsioss/oss_fake_bucket4/large_file_initiate_no_uploadId.bin']:
            with gdaltest.config_option('VSIOSS_CHUNK_SIZE', '1'):  # 1 MB
                f = gdal.VSIFOpenL(filename, 'wb')
            assert f is not None
            with gdaltest.error_handler():
                ret = gdal.VSIFWriteL(big_buffer, 1, size, f)
            assert ret == 0
            gdal.ErrorReset()
github OSGeo / gdal / autotest / gdrivers / daas.py View on Github external
def test_daas_getbuffer_pixel_encoding_failures():
    """Queue the DAAS image-metadata response used by the failure scenarios.

    Skipped when the DAAS driver is unavailable (gdaltest.daas_drv is None)
    or no test web server is running (gdaltest.webserver_port is 0).

    The scripted metadata describes a 2x1, single-band UInt16 image whose
    ``getBuffer`` link points at the local test web server.

    NOTE(review): the visible code only registers the metadata response;
    the handler is not installed or exercised within this excerpt.
    """
    if gdaltest.daas_drv is None or gdaltest.webserver_port == 0:
        pytest.skip()

    handler = webserver.SequentialHandler()

    # Build the metadata document inside-out; key order matches the
    # serialized JSON expected from the original literal.
    getbuffer_link = {
        "href": 'http://127.0.0.1:%d/getbuffer' % gdaltest.webserver_port
    }
    image_properties = {
        "width": 2,
        "height": 1,
        "pixelType": "UInt16",
        "_links": {
            "getBuffer": getbuffer_link
        },
        "bands": [
            {
                "name": "Band 1"
            }
        ]
    }
    metadata_doc = {"response": {"payload": {"payload": {"imageMetadata": {
        "properties": image_properties}}}}}

    handler.add('GET', '/daas/sensors/products/foo/images/bar', 200, {},
                json.dumps(metadata_doc))
github OSGeo / gdal / autotest / gcore / vsiwebhdfs.py View on Github external
handler.add('GET', '/webhdfs/v1/foo/?op=LISTSTATUS', 200,
                {}, '{"FileStatuses":{"FileStatus":[{"type":"FILE","modificationTime":1000,"pathSuffix":"bar.baz","length":123456},{"type":"DIRECTORY","pathSuffix":"mysubdir","length":0}]}}')
    with webserver.install_http_handler(handler):
        dir_contents = gdal.ReadDir(gdaltest.webhdfs_base_connection + '/foo')
    assert dir_contents == ['bar.baz', 'mysubdir']
    stat_res = gdal.VSIStatL(gdaltest.webhdfs_base_connection + '/foo/bar.baz')
    assert stat_res.size == 123456
    assert stat_res.mtime == 1

    # ReadDir on something known to be a file shouldn't cause network access
    dir_contents = gdal.ReadDir(
        gdaltest.webhdfs_base_connection + '/foo/bar.baz')
    assert dir_contents is None

    # Test error on ReadDir()
    handler = webserver.SequentialHandler()
    handler.add('GET', '/webhdfs/v1foo/error_test/?op=LISTSTATUS', 404)
    with webserver.install_http_handler(handler):
        dir_contents = gdal.ReadDir(
            gdaltest.webhdfs_base_connection + 'foo/error_test/')
    assert dir_contents is None
github OSGeo / gdal / autotest / gcore / vsioss.py View on Github external
test/
                        
                    
                """)

    with webserver.install_http_handler(handler):
        listdir = gdal.ReadDir('/vsioss/visoss_8', 0)
    assert listdir == ['test', 'test/']

    handler = webserver.SequentialHandler()
    with webserver.install_http_handler(handler):
        assert not stat.S_ISDIR(gdal.VSIStatL('/vsioss/visoss_8/test').mode)

    handler = webserver.SequentialHandler()
    with webserver.install_http_handler(handler):
        assert stat.S_ISDIR(gdal.VSIStatL('/vsioss/visoss_8/test/').mode)
github OSGeo / gdal / autotest / gcore / vsioss.py View on Github external
data = gdal.VSIFReadL(1, 4, f).decode('ascii')
        gdal.VSIFCloseL(f)

        assert data == 'foo'

    handler = webserver.SequentialHandler()
    handler.add('GET', '/oss_fake_bucket/resource', custom_method=get_oss_fake_bucket_resource_method)
    with webserver.install_http_handler(handler):
        f = open_for_read('/vsioss_streaming/oss_fake_bucket/resource')
        assert f is not None
        data = gdal.VSIFReadL(1, 4, f).decode('ascii')
        gdal.VSIFCloseL(f)

    assert data == 'foo'

    handler = webserver.SequentialHandler()

    def method(request):
        request.protocol_version = 'HTTP/1.1'
        request.send_response(400)
        response = """

  <code>AccessDenied</code>
  The bucket you are attempting to access must be addressed using the specified endpoint. Please send all future requests to this endpoint.
  unused
  unuset
  localhost:%d
""" % request.server.port
        response = '%x\r\n%s\r\n0\r\n\r\n' % (len(response), response)
        request.send_header('Content-type', 'application/xml')
        request.send_header('Transfer-Encoding', 'chunked')
        request.send_header('Connection', 'close')
github OSGeo / gdal / autotest / gcore / vsioss.py View on Github external
def test_visoss_4():

    if gdaltest.webserver_port == 0:
        pytest.skip()

    with webserver.install_http_handler(webserver.SequentialHandler()):
        with gdaltest.error_handler():
            f = gdal.VSIFOpenL('/vsioss/oss_fake_bucket3', 'wb')
    assert f is None

    handler = webserver.SequentialHandler()
    handler.add('GET', '/oss_fake_bucket3/empty_file.bin', 200, {'Connection': 'close'}, 'foo')
    with webserver.install_http_handler(handler):
        assert gdal.VSIStatL('/vsioss/oss_fake_bucket3/empty_file.bin').size == 3

    # Empty file
    handler = webserver.SequentialHandler()

    def method(request):
        if request.headers['Content-Length'] != '0':
            sys.stderr.write('Did not get expected headers: %s\n' % str(request.headers))
            request.send_response(400)
            return

        request.send_response(200)
        request.send_header('Content-Length', 0)
        request.end_headers()