Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def load_from_config(self, config=None):
"Restore configured attributes"
if config is None:
config = _config.CONFIG
self.config = config
if self.section in self.config:
data = self.config[self.section]
else:
data = self.config['DEFAULT']
keys = sorted(data.keys())
expected = sorted(self._configured_attribute_translations.values())
if keys != expected:
for key in expected:
if (key not in keys and
key not in self._non_default_configured_attributes):
raise _error.InvalidFeedConfig(
setting=key, feed=self,
message='missing configuration key: {}'.format(key))
for key in keys:
if key not in expected:
def smtp_send(sender, recipient, message, config=None, section='DEFAULT'):
if config is None:
config = _config.CONFIG
server = config.get(section, 'smtp-server')
# Adding back in support for 'server:port'
pos = server.find(':')
if 0 <= pos:
# Strip port out of server name
port = int(server[pos+1:])
server = server[:pos]
else:
port = config.getint(section, 'smtp-port')
_LOG.debug('sending message to {} via {}'.format(recipient, server))
ssl = config.getboolean(section, 'smtp-ssl')
smtp_auth = config.getboolean(section, 'smtp-auth')
try:
if ssl or smtp_auth:
context = _ssl.create_default_context()
def __init__(self, configfiles=None, datafile=None, config=None):
super(Feeds, self).__init__()
if configfiles is None:
configfiles = self._get_configfiles()
self.configfiles = configfiles
if datafile is None:
datafile = self._get_datafile()
self.datafile = _os.path.realpath(datafile)
if config is None:
config = _config.CONFIG
self.config = config
self._datafile_lock = None
def maildir_send(message, config=None, section='DEFAULT'):
if config is None:
config = _config.CONFIG
path = config.get(section, 'maildir-path')
mailbox = config.get(section, 'maildir-mailbox')
maildir = _mailbox.Maildir(_os.path.join(path, mailbox))
maildir.add(message)
links = []
# Get the link
link = entry['link']
if link:
links.append(link)
for enclosure in entry['enclosures']:
links.append(enclosure['href'])
if not links:
return message
# Remove the redirect and modify the content
timeout = rss2email.config.CONFIG['DEFAULT'].getint('feed-timeout')
proxy = rss2email.config.CONFIG['DEFAULT']['proxy']
for link in links:
try:
request = urllib.request.Request(link)
request.add_header('User-agent', feed.user_agent)
direct_link = urllib.request.urlopen(request).geturl()
except Exception as e:
LOG.warning('could not follow redirect for {}: {}'.format(
link, e))
continue
content = re.sub(re.escape(link), direct_link, content)
# clear CTE and set message. It can be important to clear the CTE
# before setting the payload, since the payload is only re-encoded
# if CTE is not already set.
del message['Content-Transfer-Encoding']
message.set_payload(content, charset=encoding)
# saved/loaded from feed.dat using __getstate__/__setstate__.
_dynamic_attributes = [
'name',
'etag',
'modified',
'seen',
]
## saved/loaded from ConfigParser instance
# attributes that aren't in DEFAULT
_non_default_configured_attributes = [
'url',
]
# attributes that are in DEFAULT
_default_configured_attributes = [
key.replace('-', '_') for key in _config.CONFIG['DEFAULT'].keys()]
_default_configured_attributes[
_default_configured_attributes.index('from')
] = 'from_email' # `from` is a Python keyword
_default_configured_attributes[
_default_configured_attributes.index('user_agent')
] = '_user_agent' # `user_agent` is a getter that does substitution
# all attributes that are saved/loaded from .config
_configured_attributes = (
_non_default_configured_attributes + _default_configured_attributes)
# attribute name -> .config option
_configured_attribute_translations = dict(
(attr,attr) for attr in _non_default_configured_attributes)
_configured_attribute_translations.update(dict(
zip(_default_configured_attributes,
_config.CONFIG['DEFAULT'].keys())))
# .config option -> attribute name
links = []
# Get the link
link = entry['link']
if link:
links.append(link)
for enclosure in entry['enclosures']:
links.append(enclosure['href'])
if not links:
return message
# Remove the redirect and modify the content
timeout = rss2email.config.CONFIG['DEFAULT'].getint('feed-timeout')
proxy = rss2email.config.CONFIG['DEFAULT']['proxy']
for link in links:
try:
request = urllib.request.Request(link)
request.add_header('User-agent', feed.user_agent)
direct_link = urllib.request.urlopen(request).geturl()
except Exception as e:
LOG.warning('could not follow redirect for {}: {}'.format(
link, e))
continue
content = re.sub(re.escape(link), direct_link, content)
# clear CTE and set message. It can be important to clear the CTE
# before setting the payload, since the payload is only re-encoded
# if CTE is not already set.
del message['Content-Transfer-Encoding']