How to use the storage.unit_tests.test_blob._Connection function in storage

To help you get started, we’ve selected a few storage examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github googleapis / google-cloud-python / storage / unit_tests / test_blob.py View on Github external
def test_upload_from_string_w_bytes(self):
        from six.moves.http_client import OK
        from six.moves.urllib.parse import parse_qsl
        from six.moves.urllib.parse import urlsplit
        from google.cloud.streaming import http_wrapper

        BLOB_NAME = 'blob-name'
        UPLOAD_URL = 'http://example.com/upload/name/key'
        DATA = b'ABCDEF'
        loc_response = {'status': OK, 'location': UPLOAD_URL}
        chunk1_response = {'status': http_wrapper.RESUME_INCOMPLETE,
                           'range': 'bytes 0-4'}
        chunk2_response = {'status': OK}
        connection = _Connection(
            (loc_response, '{}'),
            (chunk1_response, ''),
            (chunk2_response, ''),
        )
        client = _Client(connection)
        bucket = _Bucket(client)
        blob = self._make_one(BLOB_NAME, bucket=bucket)
        blob._CHUNK_SIZE_MULTIPLE = 1
        blob.chunk_size = 5
        blob.upload_from_string(DATA)
        rq = connection.http._requested
        self.assertEqual(len(rq), 1)
        self.assertEqual(rq[0]['method'], 'POST')
        uri = rq[0]['uri']
        scheme, netloc, path, qs, _ = urlsplit(uri)
        self.assertEqual(scheme, 'http')
github googleapis / google-cloud-python / storage / unit_tests / test_blob.py View on Github external
def test_create_resumable_upload_session(self):
        from six.moves.http_client import OK
        from six.moves.urllib.parse import parse_qsl
        from six.moves.urllib.parse import urlsplit

        BLOB_NAME = 'blob-name'
        UPLOAD_URL = 'http://example.com/upload/name/key'
        loc_response = {'status': OK, 'location': UPLOAD_URL}
        connection = _Connection(
            (loc_response, '{}'),
        )
        client = _Client(connection)
        bucket = _Bucket(client=client)
        blob = self._make_one(BLOB_NAME, bucket=bucket)

        resumable_url = blob.create_resumable_upload_session()

        self.assertEqual(resumable_url, UPLOAD_URL)

        rq = connection.http._requested
        self.assertEqual(len(rq), 1)
        self.assertEqual(rq[0]['method'], 'POST')

        uri = rq[0]['uri']
        scheme, netloc, path, qs, _ = urlsplit(uri)
github googleapis / google-cloud-python / storage / unit_tests / test_blob.py View on Github external
def test_upload_from_file_w_slash_in_name(self):
        from six.moves.http_client import OK
        from six.moves.urllib.parse import parse_qsl
        from six.moves.urllib.parse import urlsplit
        from google.cloud._testing import _NamedTemporaryFile
        from google.cloud.streaming import http_wrapper

        BLOB_NAME = 'parent/child'
        UPLOAD_URL = 'http://example.com/upload/name/parent%2Fchild'
        DATA = b'ABCDEF'
        loc_response = {'status': OK, 'location': UPLOAD_URL}
        chunk1_response = {'status': http_wrapper.RESUME_INCOMPLETE,
                           'range': 'bytes 0-4'}
        chunk2_response = {'status': OK}
        connection = _Connection(
            (loc_response, '{}'),
            (chunk1_response, ''),
            (chunk2_response, ''),
        )
        client = _Client(connection)
        bucket = _Bucket(client)
        blob = self._make_one(BLOB_NAME, bucket=bucket)
        blob._CHUNK_SIZE_MULTIPLE = 1
        blob.chunk_size = 5

        with _NamedTemporaryFile() as temp:
            with open(temp.name, 'wb') as file_obj:
                file_obj.write(DATA)
            with open(temp.name, 'rb') as file_obj:
                blob.upload_from_file(file_obj, rewind=True)
                self.assertEqual(file_obj.tell(), len(DATA))
github googleapis / google-cloud-python / storage / unit_tests / test_blob.py View on Github external
def test_download_to_file_wo_media_link(self):
        from io import BytesIO
        from six.moves.http_client import OK
        from six.moves.http_client import PARTIAL_CONTENT

        BLOB_NAME = 'blob-name'
        MEDIA_LINK = 'http://example.com/media/'
        chunk1_response = {'status': PARTIAL_CONTENT,
                           'content-range': 'bytes 0-2/6'}
        chunk2_response = {'status': OK,
                           'content-range': 'bytes 3-5/6'}
        connection = _Connection(
            (chunk1_response, b'abc'),
            (chunk2_response, b'def'),
        )
        # Only the 'reload' request hits on this side:  the others are done
        # through the 'http' object.
        reload_response = {'status': OK, 'content-type': 'application/json'}
        connection._responses = [(reload_response, {"mediaLink": MEDIA_LINK})]
        client = _Client(connection)
        bucket = _Bucket(client)
        blob = self._make_one(BLOB_NAME, bucket=bucket)
        fh = BytesIO()
        blob.download_to_file(fh)
        self.assertEqual(fh.getvalue(), b'abcdef')
        self.assertEqual(blob.media_link, MEDIA_LINK)
github googleapis / google-cloud-python / storage / unit_tests / test_blob.py View on Github external
SOURCE_KEY_HASH).rstrip().decode('ascii')
        DEST_KEY = b'90123456789012345678901234567890'  # 32 bytes
        DEST_KEY_B64 = base64.b64encode(DEST_KEY).rstrip().decode('ascii')
        DEST_KEY_HASH = hashlib.sha256(DEST_KEY).digest()
        DEST_KEY_HASH_B64 = base64.b64encode(
            DEST_KEY_HASH).rstrip().decode('ascii')
        BLOB_NAME = 'blob'
        TOKEN = 'TOKEN'
        RESPONSE = {
            'totalBytesRewritten': 42,
            'objectSize': 42,
            'done': True,
            'resource': {'etag': 'DEADBEEF'},
        }
        response = ({'status': OK}, RESPONSE)
        connection = _Connection(response)
        client = _Client(connection)
        bucket = _Bucket(client=client)
        source = self._make_one(
            BLOB_NAME, bucket=bucket, encryption_key=SOURCE_KEY)
        dest = self._make_one(BLOB_NAME, bucket=bucket,
                              encryption_key=DEST_KEY)

        token, rewritten, size = dest.rewrite(source, token=TOKEN)

        self.assertIsNone(token)
        self.assertEqual(rewritten, 42)
        self.assertEqual(size, 42)

        kw = connection._requested
        self.assertEqual(len(kw), 1)
        self.assertEqual(kw[0]['method'], 'POST')
github googleapis / google-cloud-python / storage / unit_tests / test_blob.py View on Github external
def test_generate_signed_url_w_slash_in_name(self):
        BLOB_NAME = 'parent/child'
        EXPIRATION = '2014-10-16T20:34:37.000Z'
        connection = _Connection()
        client = _Client(connection)
        bucket = _Bucket(client)
        blob = self._make_one(BLOB_NAME, bucket=bucket)
        URI = ('http://example.com/abucket/a-blob-name?Signature=DEADBEEF'
               '&Expiration=2014-10-16T20:34:37.000Z')

        SIGNER = _Signer()
        with mock.patch('google.cloud.storage.blob.generate_signed_url',
                        new=SIGNER):
            signed_url = blob.generate_signed_url(EXPIRATION)
            self.assertEqual(signed_url, URI)

        EXPECTED_ARGS = (_Connection.credentials,)
        EXPECTED_KWARGS = {
            'api_access_endpoint': 'https://storage.googleapis.com',
            'expiration': EXPIRATION,
github googleapis / google-cloud-python / storage / unit_tests / test_blob.py View on Github external
def test_compose_wo_content_type_set(self):
        SOURCE_1 = 'source-1'
        SOURCE_2 = 'source-2'
        DESTINATION = 'destinaton'
        connection = _Connection()
        client = _Client(connection)
        bucket = _Bucket(client=client)
        source_1 = self._make_one(SOURCE_1, bucket=bucket)
        source_2 = self._make_one(SOURCE_2, bucket=bucket)
        destination = self._make_one(DESTINATION, bucket=bucket)

        with self.assertRaises(ValueError):
            destination.compose(sources=[source_1, source_2])
github googleapis / google-cloud-python / storage / unit_tests / test_blob.py View on Github external
def test_rewrite_other_bucket_other_name_no_encryption_partial(self):
        from six.moves.http_client import OK

        SOURCE_BLOB = 'source'
        DEST_BLOB = 'dest'
        DEST_BUCKET = 'other-bucket'
        TOKEN = 'TOKEN'
        RESPONSE = {
            'totalBytesRewritten': 33,
            'objectSize': 42,
            'done': False,
            'rewriteToken': TOKEN,
            'resource': {'etag': 'DEADBEEF'},
        }
        response = ({'status': OK}, RESPONSE)
        connection = _Connection(response)
        client = _Client(connection)
        source_bucket = _Bucket(client=client)
        source_blob = self._make_one(SOURCE_BLOB, bucket=source_bucket)
        dest_bucket = _Bucket(client=client, name=DEST_BUCKET)
        dest_blob = self._make_one(DEST_BLOB, bucket=dest_bucket)

        token, rewritten, size = dest_blob.rewrite(source_blob)

        self.assertEqual(token, TOKEN)
        self.assertEqual(rewritten, 33)
        self.assertEqual(size, 42)

        kw = connection._requested
        self.assertEqual(len(kw), 1)
        self.assertEqual(kw[0]['method'], 'POST')
        PATH = '/b/name/o/%s/rewriteTo/b/%s/o/%s' % (
github googleapis / google-cloud-python / storage / unit_tests / test_blob.py View on Github external
content_type_arg=None,
                                          expected_content_type=None):
        from six.moves.http_client import OK
        from six.moves.urllib.parse import parse_qsl
        from six.moves.urllib.parse import urlsplit
        from google.cloud._testing import _NamedTemporaryFile
        from google.cloud.streaming import http_wrapper

        BLOB_NAME = 'blob-name'
        UPLOAD_URL = 'http://example.com/upload/name/key'
        DATA = b'ABCDEF'
        loc_response = {'status': OK, 'location': UPLOAD_URL}
        chunk1_response = {'status': http_wrapper.RESUME_INCOMPLETE,
                           'range': 'bytes 0-4'}
        chunk2_response = {'status': OK}
        connection = _Connection(
            (loc_response, '{}'),
            (chunk1_response, ''),
            (chunk2_response, ''),
        )
        client = _Client(connection)
        bucket = _Bucket(client)
        blob = self._make_one(BLOB_NAME, bucket=bucket,
                              properties=properties)
        blob._CHUNK_SIZE_MULTIPLE = 1
        blob.chunk_size = 5

        with _NamedTemporaryFile(suffix='.jpeg') as temp:
            with open(temp.name, 'wb') as file_obj:
                file_obj.write(DATA)
            blob.upload_from_filename(temp.name,
                                      content_type=content_type_arg)
github googleapis / google-cloud-python / storage / unit_tests / test_blob.py View on Github external
def test_exists_miss(self):
        from six.moves.http_client import NOT_FOUND

        NONESUCH = 'nonesuch'
        not_found_response = ({'status': NOT_FOUND}, b'')
        connection = _Connection(not_found_response)
        client = _Client(connection)
        bucket = _Bucket(client)
        blob = self._make_one(NONESUCH, bucket=bucket)
        self.assertFalse(blob.exists())