Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _save_feeds(self):
    """Write the feed state to ``self.datafile`` and release the data lock.

    The state is serialised by ``self._save_feed_states`` into a temporary
    ``<datafile>.tmp`` file, which is flushed and fsync'ed before it is
    moved over the real data file with ``os.replace``.  A missing parent
    directory is created first, with mode ``0o700``.

    On UNIX, an open ``self._datafile_lock`` is closed and reset to
    ``None`` once the data file has been replaced.
    """
    _LOG.debug('save feed data to {}'.format(self.datafile))

    # Make sure the directory holding the data file exists.
    parent = _os.path.dirname(self.datafile)
    if parent and not _os.path.isdir(parent):
        _os.makedirs(parent, mode=0o700, exist_ok=True)

    # Write everything to a scratch file and push it to disk before the
    # real data file is touched.
    scratch = self.datafile + '.tmp'
    with _codecs.open(scratch, 'w', self.datafile_encoding) as stream:
        self._save_feed_states(feeds=self, stream=stream)
        stream.flush()
        _os.fsync(stream.fileno())
    _os.replace(scratch, self.datafile)

    if not UNIX or self._datafile_lock is None:
        return
    self._datafile_lock.close()  # closing the handle releases the lock
    self._datafile_lock = None
def _send_digest(self, digest, seen, sender, send=True):
    """Finish a digest message, optionally send it, and record its entries.

    The ``Date`` header is copied from the last message in the digest
    payload, which is assumed to exist.  Do not call this function for a
    digest that contains no messages.

    ``seen`` is an iterable of ``(guid, id)`` pairs; each pair is stored
    in ``self.seen`` whether or not the digest is actually sent.
    """
    digest['From'] = sender  # TODO: _Header(), _formataddr()...
    # Last digest part -> first message inside that part -> its Date header.
    digest['Date'] = digest.get_payload()[-1].get_payload()[0]['Date']

    _LOG.debug('new digest for {}'.format(self))
    if send:
        self._send(sender=sender, message=digest)

    for guid, entry_id in seen:
        self.seen.setdefault(guid, {})['id'] = entry_id
# Gather the sender, subject and extra mail headers for a feed entry that
# has not been seen before.
#
# Returns None immediately when the entry's id is already a key of
# `self.seen`.
#
# NOTE(review): this snippet appears to be truncated -- `sender`, `subject`
# and `extra_headers` are built but never used within the visible lines.
def _process_entry(self, parsed, entry):
# _get_entry_id is the single place where the strategy for deriving an
# entry's id is defined.
guid = self._get_entry_id(entry)
if guid in self.seen:
_LOG.debug('already seen {}'.format(guid))
return # already seen
_LOG.debug('not seen {}'.format(guid))
sender = self._get_entry_email(parsed=parsed, entry=entry)
subject = self._get_entry_title(entry)
# An OrderedDict keeps the headers in the order they are listed here.
extra_headers = _collections.OrderedDict((
('Date', self._get_entry_date(entry)),
# A fresh random UUID is generated on every call for the Message-ID.
('Message-ID', '<{}@dev.null.invalid>'.format(_uuid.uuid4())),
('User-Agent', self.user_agent),
('List-ID', '<{}.localhost>'.format(self.name)),
('List-Post', 'NO (posting not allowed on this list)'),
# The X-RSS-* headers carry the feed URL plus the entry's id, link and tags.
('X-RSS-Feed', self.url),
('X-RSS-ID', guid),
('X-RSS-URL', self._get_entry_link(entry)),
('X-RSS-TAGS', self._get_entry_tags(entry)),
))
# Open an SMTP connection using the settings found in `config[section]`.
#
# `config` falls back to `_config.CONFIG` when it is None.  The options read
# in the visible lines are 'smtp-server', 'smtp-port', 'smtp-ssl',
# 'smtp-auth', 'smtp-username' and 'smtp-password'.
#
# Raises `_error.SMTPConnectionError` (chained to the original exception)
# when the connection cannot be established.
#
# NOTE(review): this snippet appears to be truncated -- it ends right after
# the credentials are read, and `sender` and `message` are not used within
# the visible lines.
def smtp_send(sender, recipient, message, config=None, section='DEFAULT'):
if config is None:
config = _config.CONFIG
server = config.get(section, 'smtp-server')
# Adding back in support for 'server:port'
# NOTE(review): the split happens at the FIRST colon, so a bare IPv6
# literal in 'smtp-server' would be cut apart here -- confirm whether such
# values are expected.
pos = server.find(':')
if 0 <= pos:
# Strip port out of server name
port = int(server[pos+1:])
server = server[:pos]
else:
# No port embedded in the server name: use the separate 'smtp-port' option.
port = config.getint(section, 'smtp-port')
_LOG.debug('sending message to {} via {}'.format(recipient, server))
ssl = config.getboolean(section, 'smtp-ssl')
smtp_auth = config.getboolean(section, 'smtp-auth')
try:
# A default SSL context is created when either SSL or authentication is
# enabled; in the visible lines only the SMTP_SSL branch uses it.
if ssl or smtp_auth:
context = _ssl.create_default_context()
if ssl:
smtp = _smtplib.SMTP_SSL(host=server, port=port, context=context)
else:
smtp = _smtplib.SMTP(host=server, port=port)
# Let Ctrl-C propagate untouched instead of reporting it as a connection
# failure.
except KeyboardInterrupt:
raise
except Exception as e:
raise _error.SMTPConnectionError(server=server) from e
if smtp_auth:
username = config.get(section, 'smtp-username')
password = config.get(section, 'smtp-password')
If the best guess isn't well-formed (something@something.com),
use `self.from_email` instead.
"""
# NOTE(review): the `def` line of this method is not part of the snippet;
# judging by the names used (`parsed`, `entry`) it is presumably the body of
# `_get_entry_email` -- confirm against the full file.  Indentation has been
# lost, so the nesting of the branches below cannot be confirmed from here.
#
# A forced sender overrides anything found in the feed.
if self.force_from:
return self.from_email
feed = parsed.feed
# Prefer the entry's own author address, then the feed-level author address.
if 'email' in entry.get('author_detail', []):
return self._validate_email(entry.author_detail.email)
elif 'email' in feed.get('author_detail', []):
return self._validate_email(feed.author_detail.email)
# The publisher address is consulted only when `use_publisher_email` is set.
if self.use_publisher_email:
if 'email' in feed.get('publisher_detail', []):
return self._validate_email(feed.publisher_detail.email)
# NOTE(review): presumably still inside the `use_publisher_email` branch --
# verify against the original indentation.
if feed.get('errorreportsto', None):
return self._validate_email(feed.errorreportsto)
# Nothing usable was found: fall back to the configured default address.
_LOG.debug('no sender address found, fallback to default')
return self.from_email
def load(self, lock=True, require=False):
    """Read the configuration files, then load the feed data.

    The value returned by ``self.config.read(self.configfiles)`` is kept in
    ``self.read_configfiles``; when ``self.configfiles`` is empty an empty
    list is stored instead.  ``lock`` and ``require`` are forwarded
    unchanged to ``self._load_feeds``.
    """
    _LOG.debug('load feed configuration from {}'.format(self.configfiles))
    self.read_configfiles = (
        self.config.read(self.configfiles) if self.configfiles else [])
    _LOG.debug(
        'loaded configuration from {}'.format(self.read_configfiles))
    self._load_feeds(lock=lock, require=require)