Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
logging.warning(
"%s was not opened in 'rb' mode, attaching file may fail." % attachment.name)
url = self._get_url('issue/' + str(issue) + '/attachments')
fname = filename
if not fname:
fname = os.path.basename(attachment.name)
if 'MultipartEncoder' not in globals():
method = 'old'
r = self._session.post(
url,
files={
'file': (fname, attachment, 'application/octet-stream')},
headers=CaseInsensitiveDict({'content-type': None, 'X-Atlassian-Token': 'nocheck'}))
else:
method = 'MultipartEncoder'
def file_stream():
return MultipartEncoder(
fields={
'file': (fname, attachment, 'application/octet-stream')})
m = file_stream()
r = self._session.post(
url, data=m, headers=CaseInsensitiveDict({'content-type': m.content_type, 'X-Atlassian-Token': 'nocheck'}), retry_data=file_stream)
js = json_loads(r)
if not js or not isinstance(js, collections.Iterable):
raise JIRAError("Unable to parse JSON: %s" % js)
attachment = Attachment(self._options, self._session, js[0])
if attachment.size == 0:
payload = {'name': name,
'key': key,
'keyEdited': 'false',
# 'projectTemplate': 'com.atlassian.jira-core-project-templates:jira-issuetracking',
# 'permissionScheme': '',
'projectTemplateWebItemKey': template_key,
'projectTemplateModuleKey': template_key,
'lead': assignee,
# 'assigneeType': '2',
}
if self._version[0] > 6:
# JIRA versions before 7 will throw an error if we specify type parameter
payload['type'] = type
headers = CaseInsensitiveDict(
{'Content-Type': 'application/x-www-form-urlencoded'})
r = self._session.post(url, data=payload, headers=headers)
if r.status_code == 200:
r_json = json_loads(r)
return r_json
f = tempfile.NamedTemporaryFile(
suffix='.html', prefix='python-jira-error-create-project-', delete=False)
f.write(r.text)
if self.logging:
logging.error(
"Unexpected result while running create project. Server response saved in %s for further investigation [HTTP response=%s]." % (
f.name, r.status_code))
def _load(self, url, headers=CaseInsensitiveDict(), params=None, path=None):
""" Load a resource.
:type url: str
:type headers: CaseInsensitiveDict
:type params: Optional[Dict[str,str]]
:type path: Optional[str]
"""
r = self._session.get(url, headers=headers, params=params)
try:
j = json_loads(r)
except ValueError as e:
logging.error("%s:\n%s" % (e, r.text))
raise e
if path:
j = j[path]
def _gain_sudo_session(self, options, destination):
url = self._options['server'] + '/secure/admin/WebSudoAuthenticate.jspa'
if not self._session.auth:
self._session.auth = get_netrc_auth(url)
payload = {
'webSudoPassword': self._session.auth[1],
'webSudoDestination': destination,
'webSudoIsPost': 'true'}
payload.update(options)
return self._session.post(
url, headers=CaseInsensitiveDict({'content-type': 'application/x-www-form-urlencoded'}), data=payload)
def _default_headers(self, user_headers):
# result = dict(user_headers)
# result['accept'] = 'application/json'
return CaseInsensitiveDict(
self._options["headers"].items() + user_headers.items()
)