Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def alert_stream(self, reset_event, kill_event):
"""Open event stream."""
_LOGGING.debug('Stream Thread Started: %s, %s', self.name, self.cam_id)
start_event = False
parse_string = ""
fail_count = 0
url = '%s/ISAPI/Event/notification/alertStream' % self.root_url
# pylint: disable=too-many-nested-blocks
while True:
try:
stream = self.hik_request.get(url, stream=True,
timeout=(CONNECT_TIMEOUT,
READ_TIMEOUT))
if stream.status_code == requests.codes.not_found:
# Try alternate URL for stream
url = '%s/Event/notification/alertStream' % self.root_url
stream = self.hik_request.get(url, stream=True)
if stream.status_code != requests.codes.ok:
raise ValueError('Connection unsucessful.')
else:
_LOGGING.debug('%s Connection Successful.', self.name)
fail_count = 0
self.watchdog.start()
for line in stream.iter_lines():
# _LOGGING.debug('Processing line from %s', self.name)
# filter out keep-alive new lines
if line: