Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
('X-RSS-Feed', self.url),
('X-RSS-ID', guid),
('X-RSS-URL', self._get_entry_link(entry)),
('X-RSS-TAGS', self._get_entry_tags(entry)),
))
# remove empty tags, etc.
keys = {k for k, v in extra_headers.items() if v is None}
for key in keys:
extra_headers.pop(key)
if self.bonus_header:
for header in self.bonus_header.splitlines():
if ':' in header:
key,value = header.split(':', 1)
extra_headers[key.strip()] = value.strip()
else:
_LOG.warning(
'malformed bonus-header: {}'.format(
self.bonus_header))
content = self._get_entry_content(entry)
try:
content = self._process_entry_content(
entry=entry, content=content, subject=subject)
except _error.ProcessingError as e:
e.parsed = parsed
raise
message = _email.get_message(
sender=sender,
recipient=self.to,
subject=subject,
body=content['value'],
content_type=content['type'].split('/', 1)[1],
def log(self):
    """Log this IMAP connection error plus a configuration hint.

    Delegates to the parent class's ``log()`` first, then warns the
    user to double-check the IMAP/mail-server settings.  If the
    chained exception (``__cause__``) exposes a ``reason`` attribute,
    that reason is logged at error level as well.
    """
    super(IMAPConnectionError, self).log()
    hint = (
        'check your config file to confirm that imap-server and other '
        'mail server settings are configured properly')
    _LOG.warning(hint)
    cause = self.__cause__
    # Only some underlying exceptions carry a ``reason``; skip otherwise.
    if hasattr(cause, 'reason'):
        _LOG.error('reason: {}'.format(cause.reason))
def log(self):
    """Log this processing error, with a full diagnostic report when generic.

    Delegates to the parent class's ``log()`` first.  When the instance
    is exactly a ``ProcessingError`` (not a more specific subclass), a
    report is emitted at warning level: pointers to the FAQ and contact
    address, the parse problem and feed URL, a pretty-printed dump of
    ``self.parsed``, and the versions of rss2email, feedparser,
    html2text and Python.

    NOTE(review): the source this was taken from had lost its
    indentation; every diagnostic line is assumed to sit under the
    exact-type check -- confirm against the canonical file.
    """
    super(ProcessingError, self).log()
    # Exact type comparison is intentional: subclasses skip this report.
    if type(self) != ProcessingError:
        return
    warn = _LOG.warning
    warn('=== rss2email encountered a problem with this feed ===')
    faq_line = '=== See the rss2email FAQ at {} for assistance ==='
    warn(faq_line.format(__url__))
    contact_line = '=== If this occurs repeatedly, send this to {} ==='
    warn(contact_line.format(__email__))
    # Fall back to a generic description when no bozo exception is stored.
    problem = self.parsed.get('bozo_exception', "can't process")
    warn('error: {} {}'.format(problem, self.feed.url))
    warn(_pprint.pformat(self.parsed))
    warn('rss2email {}'.format(__version__))
    warn('feedparser {}'.format(_feedparser.__version__))
    warn('html2text {}'.format(_html2text.__version__))
    warn('Python {}'.format(_sys.version))
_LOG.error('{}: {}'.format(exc, self))
warned = True
elif isinstance(exc, _feedparser.zlib.error):
_LOG.error('broken compression: {}'.format(self))
warned = True
elif isinstance(exc, (IOError, AttributeError)):
_LOG.error('{}: {}'.format(exc, self))
warned = True
elif isinstance(exc, KeyboardInterrupt):
raise exc
elif isinstance(exc, _sax.SAXParseException):
_LOG.error('sax parsing error: {}: {}'.format(exc, self))
warned = True
elif (parsed.bozo and
isinstance(exc, _feedparser.CharacterEncodingOverride)):
_LOG.warning(
'incorrectly declared encoding: {}: {}'.format(exc, self))
warned = True
elif (parsed.bozo and isinstance(exc, _feedparser.NonXMLContentType)):
_LOG.warning('non XML Content-Type: {}: {}'.format(exc, self))
warned = True
elif parsed.bozo or exc:
if exc is None:
exc = "can't process"
_LOG.error('processing error: {}: {}'.format(exc, self))
warned = True
if (not warned and
status in [200, 302] and
not parsed.entries and
not version):
raise _error.ProcessingError(parsed=parsed, feed=self)
b'From: John '
b'To: =?utf-8?b?zpbOtc+Nz4I=?= '
b'Subject: Homage'
b'Content-Transfer-Encoding: 8bit'
b''
b"\x00Y\x00o\x00u\x00'\x00r\x00e\x00 \x00g\x00r\x00e\x00a\x00t\x00,\x00 \x00\x96\x03\xb5\x03\xcd\x03\xc2\x03!\x00\n\x00"
"""
bytesio = _io.BytesIO()
# TODO: use policies argument instead of policy set in `message`
# see https://docs.python.org/3.5/library/email.generator.html?highlight=bytesgenerator#email.generator.BytesGenerator
generator = _BytesGenerator(bytesio)
try:
generator.flatten(message)
except UnicodeEncodeError as e:
# HACK: work around deficiencies in BytesGenerator
_LOG.warning(e)
b = message.as_string().encode(str(message.get_charset()))
m = _email.message_from_bytes(b)
if not m:
raise
h = {k:_decode_header(v) for k,v in m.items()}
head = {k:_decode_header(v) for k,v in message.items()}
body = str(m.get_payload(decode=True), str(m.get_charsets()[0]))
if (h == head and body == message.get_payload()):
return b
raise
else:
return bytesio.getvalue()
def log(self):
    """Log this missing-recipient error and tell the user how to fix it.

    Delegates to the parent class's ``log()`` first, then warns with
    the ``r2e`` commands that set an email address.
    """
    super(NoToEmailAddress, self).log()
    advice = (
        "please run 'r2e email emailaddress' or "
        "'r2e add name url emailaddress'.")
    _LOG.warning(advice)
def _check_for_errors(self, parsed):
warned = False
status = getattr(parsed, 'status', 200)
_LOG.debug('HTTP status {}'.format(status))
if status == 301:
_LOG.info('redirect {} from {} to {}'.format(
self.name, self.url, parsed['url']))
self.url = parsed['url']
elif status not in [200, 302, 304, 307]:
raise _error.HTTPError(status=status, feed=self)
http_headers = parsed.get('headers', {})
if http_headers:
_LOG.debug('HTTP headers: {}'.format(http_headers))
if not http_headers:
_LOG.warning('could not get HTTP headers: {}'.format(self))
warned = True
else:
if 'html' in http_headers.get('content-type', 'rss'):
_LOG.warning('looks like HTML: {}'.format(self))
warned = True
if http_headers.get('content-length', '1') == '0':
_LOG.warning('empty page: {}'.format(self))
warned = True
version = parsed.get('version', None)
if version:
_LOG.debug('feed version {}'.format(version))
else:
_LOG.debug('unrecognized version: {}'.format(self))
warned = True
def log(self):
    """Log this SMTP connection error plus a configuration hint.

    Delegates to the parent class's ``log()`` first, then warns the
    user to double-check the SMTP/mail-server settings.  If the
    chained exception (``__cause__``) exposes a ``reason`` attribute,
    that reason is logged at error level as well.
    """
    super(SMTPConnectionError, self).log()
    hint = (
        'check your config file to confirm that smtp-server and other '
        'mail server settings are configured properly')
    _LOG.warning(hint)
    cause = self.__cause__
    # Only some underlying exceptions carry a ``reason``; skip otherwise.
    if hasattr(cause, 'reason'):
        _LOG.error('reason: {}'.format(cause.reason))