Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _send(self, sender, message):
_LOG.info('send message for {}'.format(self))
section = self.section
if section not in self.config:
section = 'DEFAULT'
_email.send(sender=sender, recipient=self.to, message=message,
config=self.config, section=section)
def _load_feeds(self, lock, require):
_LOG.debug('load feed data from {}'.format(self.datafile))
if not _os.path.exists(self.datafile):
if require:
raise _error.NoDataFile(feeds=self)
_LOG.info('feed data file not found at {}'.format(self.datafile))
_LOG.debug('creating an empty data file')
dirname = _os.path.dirname(self.datafile)
if dirname and not _os.path.isdir(dirname):
_os.makedirs(dirname, mode=0o700, exist_ok=True)
with _codecs.open(self.datafile, 'w', self.datafile_encoding) as f:
self._save_feed_states(feeds=[], stream=f)
try:
self._datafile_lock = _codecs.open(
self.datafile, 'r', self.datafile_encoding)
except IOError as e:
raise _error.DataFileError(feeds=self) from e
locktype = 0
if lock and UNIX:
locktype = _fcntl.LOCK_SH
_fcntl.lockf(self._datafile_lock.fileno(), locktype)
raise _error.DataFileError(feeds=self) from e
locktype = 0
if lock and UNIX:
locktype = _fcntl.LOCK_SH
_fcntl.lockf(self._datafile_lock.fileno(), locktype)
self.clear()
level = _LOG.level
handlers = list(_LOG.handlers)
feeds = []
try:
data = _json.load(self._datafile_lock)
except ValueError as e:
_LOG.info('could not load data file using JSON')
data = self._load_pickled_data(self._datafile_lock)
version = data.get('version', None)
if version != self.datafile_version:
data = self._upgrade_state_data(data)
for state in data['feeds']:
feed = _feed.Feed(name='dummy-name')
feed.set_state(state)
if 'name' not in state:
raise _error.DataFileError(
feeds=self,
message='missing feed name in datafile {}'.format(
self.datafile))
feeds.append(feed)
_LOG.setLevel(level)
_LOG.handlers = handlers
self.extend(feeds)
def email(feeds, args):
"Update the default target email address"
if not args.email:
_LOG.info('unset the default target email')
else:
_LOG.info('set the default target email to {}'.format(args.email))
feeds.config['DEFAULT']['to'] = args.email
feeds.save()
def opmlexport(feeds, args):
"Export configuration to OPML."
if args.file:
_LOG.info('exporting feeds to {}'.format(args.file))
f = open(args.file, 'wb')
else:
_LOG.info('exporting feeds to stdout')
f = _sys.stdout
f.write(
b'\n'
b'\n'
b'\n'
b'<title>rss2email OPML export</title>\n'
b'\n'
b'\n')
for feed in feeds:
if not feed.url:
_LOG.debug('dropping {}'.format(feed))
continue
name = _saxutils.escape(feed.name)
def opmlexport(feeds, args):
"Export configuration to OPML."
if args.file:
_LOG.info('exporting feeds to {}'.format(args.file))
f = open(args.file, 'wb')
else:
_LOG.info('exporting feeds to stdout')
f = _sys.stdout
f.write(
b'\n'
b'\n'
b'\n'
b'<title>rss2email OPML export</title>\n'
b'\n'
b'\n')
for feed in feeds:
if not feed.url:
_LOG.debug('dropping {}'.format(feed))
continue
name = _saxutils.escape(feed.name)
url = _saxutils.escape(feed.url)
f.write('\n'.format(
name, url).encode())
def delete(feeds, args):
"Remove a feed from the database"
to_remove = []
for index in args.index:
feed = feeds.index(index)
to_remove.append(feed)
for feed in to_remove:
_LOG.info('deleting feed {}'.format(feed))
feeds.remove(feed)
feeds.save()