def _set_active(feeds, args, active=True):
    "Shared by `pause` and `unpause`."
    if active:
        action = 'unpause'
    else:
        action = 'pause'
    # Without explicit indices, act on every feed in the database.
    if not args.index:
        args.index = range(len(feeds))
    for index in args.index:
        feed = feeds.index(index)  # look up the feed via the Feeds container's index() helper
        _LOG.info('{} feed {}'.format(action, feed))
        feed.active = active
    feeds.save()
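
The `pause` and `unpause` commands are presumably thin wrappers over `_set_active`; a minimal sketch of what such wrappers could look like (the names and docstrings below are assumptions, not copied from the source):

def pause(feeds, args):
    "Pause feeds (mark them inactive)."  # assumed wrapper
    _set_active(feeds=feeds, args=args, active=False)

def unpause(feeds, args):
    "Unpause feeds (mark them active)."  # assumed wrapper
    _set_active(feeds=feeds, args=args, active=True)
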
    # Continuation of Feed._check_for_errors (the method's opening appears
    # later in this listing); `warned`, `status`, and `version` are set
    # earlier in the full method.  Classify feedparser's bozo_exception
    # and decide whether to warn, ignore, or abort.
    exc = parsed.get('bozo_exception', None)
    if isinstance(exc, _socket.timeout):  # opening branch restored so the chain parses
        _LOG.error('timed out: {}'.format(self))
        warned = True
    elif isinstance(exc, (IOError, AttributeError)):
        _LOG.error('{}: {}'.format(exc, self))
        warned = True
    elif isinstance(exc, KeyboardInterrupt):
        raise exc  # never swallow a user interrupt
    elif isinstance(exc, _sax.SAXParseException):
        _LOG.error('sax parsing error: {}: {}'.format(exc, self))
        warned = True
    elif (parsed.bozo and
          isinstance(exc, _feedparser.CharacterEncodingOverride)):
        _LOG.warning(
            'incorrectly declared encoding: {}: {}'.format(exc, self))
        warned = True
    elif parsed.bozo and isinstance(exc, _feedparser.NonXMLContentType):
        _LOG.warning('non XML Content-Type: {}: {}'.format(exc, self))
        warned = True
    elif parsed.bozo or exc:
        if exc is None:
            exc = "can't process"
        _LOG.error('processing error: {}: {}'.format(exc, self))
        warned = True
    # A clean-looking response with no entries and no detected feed version
    # still means the fetch produced nothing usable.
    if (not warned and
            status in [200, 302] and
            not parsed.entries and
            not version):
        raise _error.ProcessingError(parsed=parsed, feed=self)
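
The chain above keys off feedparser's "bozo" protocol: `parse()` always returns a result object and signals trouble through `bozo` and `bozo_exception` rather than raising. A quick self-contained illustration:

import feedparser

# Deliberately malformed XML: parse() still returns a result object,
# but flags the problem instead of raising.
d = feedparser.parse('<rss><channel><title>unterminated')
print(d.bozo)                  # truthy when the parser hit a problem
print(repr(d.bozo_exception))  # the underlying exception, e.g. a SAXParseException
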
def _fetch(self):
    """Fetch and parse a feed using feedparser.

    >>> feed = Feed(
    ...     name='test-feed',
    ...     url='http://feeds.feedburner.com/allthingsrss/hJBr')
    >>> parsed = feed._fetch()
    >>> parsed.status
    200
    """
    _LOG.info('fetch {}'.format(self))
    if not self.url:
        raise _error.InvalidFeedConfig(setting='url', feed=self)
    # Use the per-feed configuration section, falling back to the defaults.
    if self.section in self.config:
        config = self.config[self.section]
    else:
        config = self.config['DEFAULT']
    proxy = config['proxy']
    timeout = config.getint('feed-timeout')
    kwargs = {}
    if proxy:
        kwargs['handlers'] = [_urllib_request.ProxyHandler({'http': proxy})]
    # Bound the fetch so a hung server cannot stall the whole run; etag and
    # modified let the server reply 304 Not Modified when nothing changed.
    f = _util.TimeLimitedFunction(timeout, _feedparser.parse)
    return f(self.url, self.etag, modified=self.modified, **kwargs)
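
`_util.TimeLimitedFunction` runs a call under a deadline. A minimal thread-based sketch of the idea, assuming nothing about the project's real implementation:

import threading

class TimeLimitedSketch:
    """Illustrative stand-in for rss2email's TimeLimitedFunction."""
    def __init__(self, timeout, function):
        self.timeout = timeout
        self.function = function

    def __call__(self, *args, **kwargs):
        result = {}
        def target():
            try:
                result['value'] = self.function(*args, **kwargs)
            except Exception as error:
                result['error'] = error
        thread = threading.Thread(target=target, daemon=True)
        thread.start()
        thread.join(self.timeout)  # wait at most `timeout` seconds
        if thread.is_alive():
            raise TimeoutError('call exceeded {} seconds'.format(self.timeout))
        if 'error' in result:
            raise result['error']
        return result['value']
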
def new(feeds, args):
    "Create a new feed database."
    if args.email:
        _LOG.info('set the default target email to {}'.format(args.email))
        feeds.config['DEFAULT']['to'] = args.email
    # Refuse to clobber an existing configuration file.
    if _os.path.exists(feeds.configfiles[-1]):
        raise _error.ConfigAlreadyExistsError(feeds=feeds)
    feeds.save()
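
A hypothetical programmatic invocation, assuming rss2email is installed and that `Feeds()` can be constructed with default paths (both assumptions, for illustration only):

import argparse
import rss2email.feeds as _feeds

# args normally comes from the r2e command-line parser; a Namespace stands in.
args = argparse.Namespace(email='user@example.com')
feeds = _feeds.Feeds()  # assumed default constructor
new(feeds=feeds, args=args)
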
    # From Feeds._load_feeds: make sure a data file exists, take a shared
    # lock, then parse it (JSON, with a pickle fallback for older files).
    if not _os.path.exists(self.datafile):  # guard restored from context
        with _codecs.open(self.datafile, 'w', self.datafile_encoding) as f:
            self._save_feed_states(feeds=[], stream=f)
    try:
        self._datafile_lock = _codecs.open(
            self.datafile, 'r', self.datafile_encoding)
    except IOError as e:
        raise _error.DataFileError(feeds=self) from e
    locktype = 0
    if lock and UNIX:
        locktype = _fcntl.LOCK_SH
        _fcntl.lockf(self._datafile_lock.fileno(), locktype)
    self.clear()
    # Snapshot the logging state (the full method restores it afterwards).
    level = _LOG.level
    handlers = list(_LOG.handlers)
    feeds = []
    try:
        data = _json.load(self._datafile_lock)
    except ValueError:
        _LOG.info('could not load data file using JSON')
        data = self._load_pickled_data(self._datafile_lock)
    version = data.get('version', None)
    if version != self.datafile_version:
        data = self._upgrade_state_data(data)
    for state in data['feeds']:
        feed = _feed.Feed(name='dummy-name')
        feed.set_state(state)
        if 'name' not in state:
            raise _error.DataFileError(
                feeds=self,
                # message text assumed; the snippet was truncated here
                message='missing feed name in datafile')
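
The data file this loader expects is just a version number plus a list of per-feed state dictionaries. A minimal hand-written example (the version value and any keys beyond 'name' are illustrative):

EXAMPLE_DATA = {
    'version': 2,  # compared against self.datafile_version; value assumed
    'feeds': [
        {'name': 'example-feed'},  # 'name' is the one key the loader insists on
    ],
}
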
def _check_for_errors(self, parsed):
    warned = False
    status = getattr(parsed, 'status', 200)
    _LOG.debug('HTTP status {}'.format(status))
    if status == 301:
        # Permanent redirect: remember the new location for future fetches.
        _LOG.info('redirect {} from {} to {}'.format(
            self.name, self.url, parsed['url']))
        self.url = parsed['url']
    elif status not in [200, 302, 304, 307]:
        raise _error.HTTPError(status=status, feed=self)
    http_headers = parsed.get('headers', {})
    if http_headers:
        _LOG.debug('HTTP headers: {}'.format(http_headers))
    if not http_headers:
        _LOG.warning('could not get HTTP headers: {}'.format(self))
        warned = True
    else:
        if 'html' in http_headers.get('content-type', 'rss'):
            _LOG.warning('looks like HTML: {}'.format(self))
            warned = True
        if http_headers.get('content-length', '1') == '0':
            _LOG.warning('empty page: {}'.format(self))
            warned = True  # restored to match the other warning branches
    # The method continues with the bozo_exception handling shown earlier.
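
Putting the pieces together, a caller would fetch first and validate second; a sketch of that flow (the method name here is hypothetical, not the project's actual caller):

def fetch_and_check(self):
    "Sketch: fetch the feed, then let _check_for_errors warn or raise."
    parsed = self._fetch()
    self._check_for_errors(parsed)
    return parsed
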
def _load_pickled_data(self, stream):
    "Fallback loader for data files written by older, pickle-based versions."
    _LOG.info('try to load data file using Pickle')
    # Pickle needs a binary stream, so reopen the data file in 'rb' mode
    # rather than reading from the text-mode `stream` argument.
    with open(self.datafile, 'rb') as f:
        feeds = [feed.get_state() for feed in _pickle.load(f)]
    return {
        'version': self.datafile_version,
        'feeds': feeds,
    }
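
For symmetry, `_save_feed_states` presumably writes the same structure back out as JSON; a sketch under that assumption (name, signature, and formatting are illustrative):

import json

def save_feed_states(feeds, stream, version=2):
    # On-disk shape mirrored from _load_pickled_data's return value.
    json.dump({'version': version,
               'feeds': [feed.get_state() for feed in feeds]},
              stream)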