Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
formatter = _logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')
for handler in _LOG.handlers: # type: _logging.Handler
handler.setFormatter(formatter)
if not getattr(args, 'func', None):
parser.error('too few arguments')
try:
if not args.config:
args.config = None
feeds = _feeds.Feeds(datafile=args.data, configfiles=args.config)
if args.func != _command.new:
lock = args.func not in [_command.list, _command.opmlexport]
feeds.load(lock=lock)
args.func(feeds=feeds, args=args)
except _error.RSS2EmailError as e:
e.log()
if _logging.ERROR - 10 * args.verbose < _logging.DEBUG:
raise # don't mask the traceback
_sys.exit(1)
port = config.getint(section, 'smtp-port')
_LOG.debug('sending message to {} via {}'.format(recipient, server))
ssl = config.getboolean(section, 'smtp-ssl')
smtp_auth = config.getboolean(section, 'smtp-auth')
try:
if ssl or smtp_auth:
context = _ssl.create_default_context()
if ssl:
smtp = _smtplib.SMTP_SSL(host=server, port=port, context=context)
else:
smtp = _smtplib.SMTP(host=server, port=port)
except KeyboardInterrupt:
raise
except Exception as e:
raise _error.SMTPConnectionError(server=server) from e
if smtp_auth:
username = config.get(section, 'smtp-username')
password = config.get(section, 'smtp-password')
try:
if not ssl:
smtp.starttls(context=context)
smtp.login(username, password)
except KeyboardInterrupt:
raise
except Exception as e:
raise _error.SMTPAuthenticationError(
server=server, username=username)
smtp.send_message(message, sender, recipient.split(','))
smtp.quit()
('X-RSS-Feed', self.url),
('X-RSS-ID', guid),
('X-RSS-URL', self._get_entry_link(entry)),
('X-RSS-TAGS', self._get_entry_tags(entry)),
))
# remove empty tags, etc.
keys = {k for k, v in extra_headers.items() if v is None}
for key in keys:
extra_headers.pop(key)
if self.bonus_header:
for header in self.bonus_header.splitlines():
if ':' in header:
key,value = header.split(':', 1)
extra_headers[key.strip()] = value.strip()
else:
_LOG.warning(
'malformed bonus-header: {}'.format(
self.bonus_header))
content = self._get_entry_content(entry)
try:
content = self._process_entry_content(
entry=entry, content=content, subject=subject)
except _error.ProcessingError as e:
e.parsed = parsed
raise
message = _email.get_message(
sender=sender,
recipient=self.to,
subject=subject,
body=content['value'],
content_type=content['type'].split('/', 1)[1],
... content_type='plain',
... extra_headers={'Approved': 'joe@bob.org'})
>>> print(message.as_string()) # doctest: +REPORT_UDIFF
MIME-Version: 1.0
Content-Type: text/plain; charset="us-ascii"
Content-Transfer-Encoding: 7bit
From: John
To: =?utf-8?b?zpbOtc+Nz4I=?=
Subject: Testing
Approved: joe@bob.org
Hello, world!
"""
if config is None:
config = _config.CONFIG
if section not in config.sections():
section = 'DEFAULT'
encodings = [
x.strip() for x in config.get(section, 'encodings').split(',')]
# Split real name (which is optional) and email address parts
sender_name,sender_addr = _parseaddr(sender)
recipient_list = []
for recipient_name, recipient_addr in _getaddresses([recipient]):
recipient_encoding = guess_encoding(recipient_name, encodings)
recipient_name = str(_Header(recipient_name, recipient_encoding).encode())
recipient_addr.encode('ascii')
recipient_list.append(_formataddr((recipient_name, recipient_addr)))
sender_encoding = guess_encoding(sender_name, encodings)
recipient_encoding = guess_encoding(recipient_name, encodings)
def sendmail_send(sender, recipient, message, config=None, section='DEFAULT'):
if config is None:
config = _config.CONFIG
message_bytes = _flatten(message)
sendmail = config.get(section, 'sendmail')
sender_name,sender_addr = _parseaddr(sender)
_LOG.debug(
'sending message to {} via {}'.format(recipient, sendmail))
try:
p = _subprocess.Popen(
[sendmail, '-F', sender_name, '-f', sender_addr, recipient],
stdin=_subprocess.PIPE, stdout=_subprocess.PIPE,
stderr=_subprocess.PIPE)
stdout,stderr = p.communicate(message_bytes)
status = p.wait()
if status:
raise _error.SendmailError(
status=status, stdout=stdout, stderr=stderr)
except Exception as e:
help='path for exported OPML (defaults to stdout)')
args = parser.parse_args(*args, **kwargs)
if args.verbose:
LOG.setLevel(max(_logging.DEBUG, _logging.ERROR - 10 * args.verbose))
try:
if not args.config:
args.config = None
feeds = Feeds(datafile=args.data, configfiles=args.config)
if args.func != cmd_new:
lock = args.func not in [cmd_list, cmd_opmlexport]
feeds.load(lock=lock)
args.func(feeds=feeds, args=args)
except RSS2EmailError as e:
e.log()
_sys.exit(1)
def sendmail_send(sender, recipient, message, config=None, section='DEFAULT'):
if config is None:
config = CONFIG
LOG.debug(
'sending message to {} via /usr/sbin/sendmail'.format(recipient))
try:
p = _subprocess.Popen(
['/usr/sbin/sendmail', recipient],
stdin=_subprocess.PIPE, stdout=_subprocess.PIPE,
stderr=_subprocess.PIPE)
stdout,stderr = p.communicate(message.as_string().encode('ascii'))
status = p.wait()
if status:
raise SendmailError(status=status, stdout=stdout, stderr=stderr)
except Exception as e:
raise SendmailError() from e
def sendmail_send(sender, recipient, message, config=None, section='DEFAULT'):
if config is None:
config = CONFIG
LOG.debug(
'sending message to {} via /usr/sbin/sendmail'.format(recipient))
try:
p = _subprocess.Popen(
['/usr/sbin/sendmail', recipient],
stdin=_subprocess.PIPE, stdout=_subprocess.PIPE,
stderr=_subprocess.PIPE)
stdout,stderr = p.communicate(message.as_string().encode('ascii'))
status = p.wait()
if status:
raise SendmailError(status=status, stdout=stdout, stderr=stderr)
except Exception as e:
raise SendmailError() from e
for line in file:
fields = line.strip().split()
if not len(fields):
continue
url = fields[0]
if url[0] == '#':
logging.debug ("Skipping commented out: %s" % url[1:])
continue
settings = None
to = None
if (len(fields) > 1):
to = fields[1]
# yes, we store the url twice. not the most efficient, but like this
# you can do fast lookups and have a normal Feed object
logging.debug ("loading: %s" % url)
feeds[url] = Feed(url, to)
except IOError, e:
logging.critical ("Feedfile could not be opened: %s", e)
sys.exit(1)
# Then, add info about where we left off (seen entries), if available
if os.path.exists(FEEDS_STATE):
try:
statefile = open(FEEDS_STATE, 'r')
except IOError, e:
logging.critical ( "Feed state file %s could not be opened: %s", FEEDS_STATE, e)
sys.exit(1)
# a dictionary with key url, value: a dict
feeds_state = pickle.load(statefile)
for url, seen in feeds_state.items():
if url in feeds:
if config is None:
config = CONFIG
server = CONFIG.get(section, 'smtp-server')
LOG.debug('sending message to {} via {}'.format(recipient, server))
ssl = CONFIG.getboolean(section, 'smtp-ssl')
if ssl:
smtp = _smtplib.SMTP_SSL()
else:
smtp = _smtplib.SMTP()
smtp.ehlo()
try:
smtp.connect(SMTP_SERVER)
except KeyboardInterrupt:
raise
except Exception as e:
raise SMTPConnectionError(server=server) from e
if CONFIG.getboolean(section, 'smtp-auth'):
username = CONFIG.get(section, 'smtp-username')
password = CONFIG.get(section, 'smtp-password')
try:
if not ssl:
smtp.starttls()
smtp.login(username, password)
except KeyboardInterrupt:
raise
except Exception as e:
raise SMTPAuthenticationError(server=server, username=username)
smtp.send_message(message, sender, [recipient])
smtp.quit()