Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _exclude_logs(self):
"""
Iterate through log file stores and remove ones that match exclude rules.
"""
# Take comma-separated string of pathname patterns and separate them into individual patterns
exclude_rules = context.app_config.get('nginx', {}).get('exclude_logs', '').split(',')
for rule in [x for x in exclude_rules if x]: # skip potentially empty rules due to improper formatting
# access logs
for excluded_file in glib(self.access_logs.keys(), rule):
del self.access_logs[excluded_file]
# error logs
for excluded_file in glib(self.error_logs.keys(), rule):
del self.error_logs[excluded_file]
def enabled_extension(modname):
# not defined in the config
if modname not in context.app_config.get('extensions', {}):
return False
# not enabled
if not boolean(context.app_config.get('extensions', {}).get(modname, False)):
return False
# not enabled in backend
if not context.capabilities[modname]:
return False
return True
Tries to get alive api urls
There are two types of api urls: internal and external
- internal are for the agent and usually they have the localhost ip in address
- external are for the browsers and usually they have a normal server name
Returns a tuple of str or Nones - (external_url, internal_url)
Even if external api url is not responding (cannot be accesible from the host)
we should return it to show in our UI
:return: (str or None, str or None)
"""
internal_urls = self.config.api_internal_urls
external_urls = self.config.api_external_urls
if 'api' in context.app_config.get('nginx', {}):
predefined_uri = context.app_config['nginx']['api']
internal_urls.append(http.resolve_uri(predefined_uri))
internal_api_url = self.__get_alive_status(internal_urls, json=True, what='api internal')
if internal_api_url:
self.eventd.event(
level=INFO,
message='nginx internal api detected, %s' % internal_api_url
)
external_api_url = self.__get_alive_status(external_urls, json=True, what='api external')
if len(self.config.api_external_urls) > 0:
if not external_api_url:
external_api_url = self.config.api_external_urls[0]
self.eventd.event(
def _setup_connection_args(self):
connection_args = context.app_config.get('mysql', None)
if connection_args is not None:
if 'port' in connection_args:
connection_args['port'] = int(connection_args['port'])
self.connection_args = connection_args
def get_alive_stub_status_url(self):
"""
Tries to get alive stub_status url
Records some events about it
:return: str stub_status url
"""
urls_to_check = self.config.stub_status_urls
if 'stub_status' in context.app_config.get('nginx', {}):
predefined_uri = context.app_config['nginx']['stub_status']
urls_to_check.append(http.resolve_uri(predefined_uri))
stub_status_url = self.__get_alive_status(urls_to_check, what='stub status')
if stub_status_url:
# Send stub detected event
self.eventd.event(
level=INFO,
message='nginx stub_status detected, %s' % stub_status_url
)
else:
self.eventd.event(
level=INFO,
message='nginx stub_status not found in nginx config'
)
return stub_status_url
Tries to get alive plus urls
There are two types of plus status urls: internal and external
- internal are for the agent and usually they have the localhost ip in address
- external are for the browsers and usually they have a normal server name
Returns a tuple of str or Nones - (external_url, internal_url)
Even if external status url is not responding (cannot be accesible from the host)
we should return it to show in our UI
:return: (str or None, str or None)
"""
internal_urls = self.config.plus_status_internal_urls
external_urls = self.config.plus_status_external_urls
if 'plus_status' in context.app_config.get('nginx', {}):
predefined_uri = context.app_config['nginx']['plus_status']
internal_urls.append(http.resolve_uri(predefined_uri))
internal_status_url = self.__get_alive_status(internal_urls, json=True, what='plus status internal')
if internal_status_url:
self.eventd.event(
level=INFO,
message='nginx internal plus_status detected, %s' % internal_status_url
)
external_status_url = self.__get_alive_status(external_urls, json=True, what='plus status external')
if len(self.config.plus_status_external_urls) > 0:
if not external_status_url:
external_status_url = self.config.plus_status_external_urls[0]
self.eventd.event(