Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_httpverb(self):
"""Test that each HTTP verb works properly when JSON is returned."""
verbs_to_methods = [
('GET', self.client.get),
('POST', self.client.post),
('PUT', self.client.put),
('PATCH', self.client.patch),
('DELETE', self.client.delete),
('HEAD', self.client.head),
('OPTIONS', self.client.options),
]
for verb, method in verbs_to_methods:
url = uritemplate.expand(self.client.url, {'param': 'foo'})
self.adapter.register_uri(verb, url, text='{"success": true}')
response = method(param='foo')
assert response.success
# test named param inference
response = method('foo')
assert response.success
raise UnknownFileType(media_filename)
if not mimeparse.best_match([media_mime_type], ','.join(accept)):
raise UnacceptableMimeTypeError(media_mime_type)
media_upload = MediaFileUpload(media_filename,
mimetype=media_mime_type)
elif isinstance(media_filename, MediaUpload):
media_upload = media_filename
else:
raise TypeError('media_filename must be str or MediaUpload.')
# Check the maxSize
if media_upload.size() is not None and media_upload.size() > maxSize > 0:
raise MediaUploadSizeError("Media larger than: %s" % maxSize)
# Use the media path uri for media uploads
expanded_url = uritemplate.expand(mediaPathUrl, params)
url = _urljoin(self._baseUrl, expanded_url + query)
if media_upload.resumable():
url = _add_query_parameter(url, 'uploadType', 'resumable')
if media_upload.resumable():
# This is all we need to do for resumable, if the body exists it gets
# sent in the first request, otherwise an empty body is sent.
resumable = media_upload
else:
# A non-resumable upload
if body is None:
# This is a simple media upload
headers['content-type'] = media_upload.mimetype()
body = media_upload.getbytes(0, media_upload.size())
url = _add_query_parameter(url, 'uploadType', 'media')
else:
body_value = kwargs.get('body', None)
media_filename = kwargs.get('media_body', None)
if self._developerKey:
actual_query_params['key'] = self._developerKey
model = self._model
# If there is no schema for the response then presume a binary blob.
if 'response' not in methodDesc:
model = RawModel()
headers = {}
headers, params, query, body = model.request(headers,
actual_path_params, actual_query_params, body_value)
expanded_url = uritemplate.expand(pathUrl, params)
url = urlparse.urljoin(self._baseUrl, expanded_url + query)
resumable = None
multipart_boundary = ''
if media_filename:
# Ensure we end up with a valid MediaUpload object.
if isinstance(media_filename, basestring):
(media_mime_type, encoding) = mimetypes.guess_type(media_filename)
if media_mime_type is None:
raise UnknownFileType(media_filename)
if not mimeparse.best_match([media_mime_type], ','.join(accept)):
raise UnacceptableMimeTypeError(media_mime_type)
media_upload = MediaFileUpload(media_filename, media_mime_type)
elif isinstance(media_filename, MediaUpload):
media_upload = media_filename
simplebinding['_count'] = len(resp)
satisfied_count = 0
total_count = len(resp)
result_list = []
failure_message_template = rule['showfail'] or rule['show']
for binding in resp:
satisfied = True
failmsg = failure_message_template
simplebinding = constraintbinding.copy()
for k in binding:
if not isinstance(k,rdflib.BNode):
simplebinding[str(k)] = unicode(binding[k])
simplebinding['_count'] = len(resp)
# Do the required test
if aggregates:
fileref = uritemplate.expand(aggregates, simplebinding)
fileuri = rometa.getComponentUri(fileref)
simplebinding.update({'_fileref': fileref, '_fileuri': fileuri})
log.debug("evalQueryTest RO aggregates %s (%s)"%(fileref, str(fileuri)))
satisfied = rometa.roManifestContains( (rometa.getRoUri(), ORE.aggregates, fileuri) )
failmsg = failmsg or "Aggregates %(_fileref)s"
if islive:
fileref = uritemplate.expand(islive, simplebinding)
fileuri = rometa.getComponentUri(fileref)
simplebinding.update({'_fileref': fileref, '_fileuri': fileuri})
log.debug("evalQueryTest RO isLive %s (%s)"%(fileref, str(fileuri)))
satisfied = isLiveUri(fileuri)
failmsg = failmsg or "Accessible %(_fileref)s"
if exists:
existsparams = (
{ 'querybase': str(rometa.getRoUri())
, 'queryverb': "ASK"
raise UnknownFileType(media_filename)
if not mimeparse.best_match([media_mime_type], ','.join(accept)):
raise UnacceptableMimeTypeError(media_mime_type)
media_upload = MediaFileUpload(media_filename,
mimetype=media_mime_type)
elif isinstance(media_filename, MediaUpload):
media_upload = media_filename
else:
raise TypeError('media_filename must be str or MediaUpload.')
# Check the maxSize
if maxSize > 0 and media_upload.size() > maxSize:
raise MediaUploadSizeError("Media larger than: %s" % maxSize)
# Use the media path uri for media uploads
expanded_url = uritemplate.expand(mediaPathUrl, params)
url = urlparse.urljoin(self._baseUrl, expanded_url + query)
if media_upload.resumable():
url = _add_query_parameter(url, 'uploadType', 'resumable')
if media_upload.resumable():
# This is all we need to do for resumable, if the body exists it gets
# sent in the first request, otherwise an empty body is sent.
resumable = media_upload
else:
# A non-resumable upload
if body is None:
# This is a simple media upload
headers['content-type'] = media_upload.mimetype()
body = media_upload.getbytes(0, media_upload.size())
url = _add_query_parameter(url, 'uploadType', 'media')
else:
def add_privilege(self, user, priv):
data = {'owner': self.owner['username'], 'slug': self.name, 'user': user}
url = uritemplate.expand('https://bitbucket.org/api/1.0/privileges/{owner}/{slug}/{user}', data)
return self.put(url, data=priv)
def _get_url(url, path_params):
"""
Given a templated URL and some parameters that have been provided,
expand the URL.
"""
if path_params:
return uritemplate.expand(url, path_params)
return url
raise UnacceptableMimeTypeError(media_mime_type)
media_upload = MediaFileUpload(media_filename, media_mime_type)
elif isinstance(media_filename, MediaUpload):
media_upload = media_filename
else:
raise TypeError('media_filename must be str or MediaUpload.')
# Check the maxSize
if maxSize > 0 and media_upload.size() > maxSize:
raise MediaUploadSizeError("Media larger than: %s" % maxSize)
# Use the media path uri for media uploads
if media_upload.resumable():
expanded_url = uritemplate.expand(mediaResumablePathUrl, params)
else:
expanded_url = uritemplate.expand(mediaPathUrl, params)
url = urlparse.urljoin(self._baseUrl, expanded_url + query)
if media_upload.resumable():
# This is all we need to do for resumable, if the body exists it gets
# sent in the first request, otherwise an empty body is sent.
resumable = media_upload
else:
# A non-resumable upload
if body is None:
# This is a simple media upload
headers['content-type'] = media_upload.mimetype()
body = media_upload.getbytes(0, media_upload.size())
else:
# This is a multipart/related upload.
msgRoot = MIMEMultipart('related')
# msgRoot should not write out it's own headers
touched = get_touched_pullrequest_files(pull_request, github_auth, app_logger)
commit_sha = pull_request['head']['sha']
for filename in touched:
if relpath(filename, 'sources').startswith('..'):
# Skip things outside of sources directory.
continue
if splitext(filename)[1] != '.json':
# Skip non-JSON files.
continue
contents_url = pull_request['head']['repo']['contents_url'] + '{?ref}'
contents_url = expand_uri(contents_url, dict(path=filename, ref=commit_sha))
app_logger.debug('Contents URL {}'.format(contents_url))
got = get(contents_url, auth=github_auth)
contents = got.json()
if got.status_code not in range(200, 299):
app_logger.warning('Skipping {} - {}'.format(filename, got.status_code))
continue
if contents['encoding'] != 'base64':
raise ValueError('Unrecognized encoding "{encoding}"'.format(**contents))
app_logger.debug('Contents SHA {sha}'.format(**contents))
files[filename] = contents['content'], contents['sha']
return files