Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
assert isinstance(process_group, nipyapi.nifi.ProcessGroupEntity)
assert isinstance(force, bool)
assert isinstance(refresh, bool)
pg_id = process_group.id
if refresh or force:
target = nipyapi.nifi.ProcessGroupsApi().get_process_group(pg_id)
else:
target = process_group
try:
return nipyapi.nifi.ProcessGroupsApi().remove_process_group(
id=target.id,
version=target.revision.version,
client_id=target.revision.client_id
)
except nipyapi.nifi.rest.ApiException as e:
if force:
# Stop, drop, and roll.
purge_process_group(target, stop=True)
# Remove connections that start or end in this process group
for con in list_all_connections():
if pg_id in [con.destination_group_id, con.source_group_id]:
delete_connection(con)
# Stop all Controller Services
for x in list_all_controllers(process_group.id):
delete_controller(x, True)
# Remove templates
for template in nipyapi.templates.list_all_templates(native=False):
if target.id == template.template.group_id:
nipyapi.templates.delete_template(template.id)
# have to refresh revision after changes
target = nipyapi.nifi.ProcessGroupsApi().get_process_group(pg_id)
headers=headers)
# Pass a `string` parameter directly in the body to support
# other content types than Json when `body` argument is provided
# in serialized form
elif isinstance(body, str):
request_body = body
r = self.pool_manager.request(method, url,
body=request_body,
preload_content=_preload_content,
timeout=timeout,
headers=headers)
else:
# Cannot generate the request from given parameters
msg = """Cannot prepare a request message for provided arguments.
Please check that your arguments match declared content type."""
raise ApiException(status=0, reason=msg)
# For `GET`, `HEAD`
else:
r = self.pool_manager.request(method, url,
fields=query_params,
preload_content=_preload_content,
timeout=timeout,
headers=headers)
except urllib3.exceptions.SSLError as e:
msg = "{0}\n{1}".format(type(e).__name__, str(e))
raise ApiException(status=0, reason=msg)
if _preload_content:
r = RESTResponse(r)
# In Python 3, response.data is bytes,
# so we need to decode it to a string.
def __deserialize_date(self, string):
    """
    Convert a textual date into a date object.

    The text is handed to ``dateutil.parser.parse`` and the date part of
    the result is returned. When ``dateutil`` cannot be imported the
    input is handed back untouched.

    :param string: str, the text to convert.
    :return: date, or the unmodified input if ``dateutil`` is missing.
    :raises ApiException: with ``status=0`` when a ``ValueError`` is
        raised while parsing.
    """
    try:
        # Imported lazily so a missing dateutil degrades to a pass-through
        # instead of breaking the module at import time.
        from dateutil.parser import parse as parse_timestamp
        moment = parse_timestamp(string)
        return moment.date()
    except ImportError:
        return string
    except ValueError:
        failure = "Failed to parse `{0}` into a date object".format(string)
        raise ApiException(status=0, reason=failure)
resource = resource[1:] if resource.startswith('/') else resource
log.info("Getting %s Policy for %s:%s:%s", service, action,
resource, str(r_id))
if service == 'nifi':
pol_api = nipyapi.nifi.PoliciesApi()
else:
pol_api = nipyapi.registry.PoliciesApi()
try:
nipyapi.utils.bypass_slash_encoding(service, True)
response = pol_api.get_access_policy_for_resource(
action=action,
resource='/'.join([resource, r_id]) if r_id else resource
)
nipyapi.utils.bypass_slash_encoding(service, False)
return response
except nipyapi.nifi.rest.ApiException as e:
if 'Unable to find access policy' in e.body:
log.info("Access policy not found")
if not auto_create:
return None
return nipyapi.security.create_access_policy(
resource, action, r_id, service
)
log.info("Unexpected Error, raising...")
raise ValueError(e.body)
finally:
nipyapi.utils.bypass_slash_encoding(service, False)
def rest_exceptions():
    """
    Translate REST client ApiExceptions into ValueError.

    Yields once; if the code running while this generator is suspended
    raises ``nipyapi.nifi.rest.ApiException`` or
    ``nipyapi.registry.rest.ApiException``, a ``ValueError`` carrying that
    exception's ``body`` is raised in its place.

    NOTE(review): presumably wrapped with ``contextlib.contextmanager``
    where it is defined; the decorator is not visible in this excerpt.

    :raises ValueError: built from the ``body`` of the caught ApiException.
    """
    try:
        yield
    except (
        nipyapi.nifi.rest.ApiException,
        nipyapi.registry.rest.ApiException,
    ) as api_error:
        response_body = api_error.body
        raise ValueError(response_body)
def __deserialize_datatime(self, string):
"""
Deserializes string to datetime.
The string should be in iso8601 datetime format.
:param string: str.
:return: datetime.
"""
try:
from dateutil.parser import parse
return parse(string)
except ImportError:
return string
except ValueError:
raise ApiException(
status=0,
reason=(
"Failed to parse `{0}` into a datetime object"
.format(string)
)
headers=headers)
else:
# Cannot generate the request from given parameters
msg = """Cannot prepare a request message for provided arguments.
Please check that your arguments match declared content type."""
raise ApiException(status=0, reason=msg)
# For `GET`, `HEAD`
else:
r = self.pool_manager.request(method, url,
fields=query_params,
preload_content=_preload_content,
timeout=timeout,
headers=headers)
except urllib3.exceptions.SSLError as e:
msg = "{0}\n{1}".format(type(e).__name__, str(e))
raise ApiException(status=0, reason=msg)
if _preload_content:
r = RESTResponse(r)
# In Python 3, response.data is bytes,
# so we need to decode it to a string.
if PY3:
r.data = r.data.decode('utf8')
# log response body
logger.debug("response body: %s", r.data)
if not 200 <= r.status <= 299:
raise ApiException(http_resp=r)
return r
msg = "{0}\n{1}".format(type(e).__name__, str(e))
raise ApiException(status=0, reason=msg)
if _preload_content:
r = RESTResponse(r)
# In Python 3, response.data is bytes,
# so we need to decode it to a string.
if PY3:
r.data = r.data.decode('utf8')
# log response body
logger.debug("response body: %s", r.data)
if not 200 <= r.status <= 299:
raise ApiException(http_resp=r)
return r