Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, feeds=None, message=None, **kwargs):
if message is None:
message = 'error with feeds'
super(FeedsError, self).__init__(message=message, **kwargs)
self.feeds = feeds
class DataFileError (FeedsError):
def __init__(self, feeds, message=None):
if message is None:
message = 'problem with the feed data file {}'.format(
feeds.datafile)
super(DataFileError, self).__init__(feeds=feeds, message=message)
class NoDataFile (DataFileError):
def __init__(self, feeds):
message = 'feed data file {} does not exist'.format(feeds.datafile)
super(NoDataFile, self).__init__(feeds=feeds, message=message)
def log(self):
super(NoDataFile, self).log()
_LOG.warning(
"if you're using r2e for the first time, you have to run "
"'r2e new' first.")
class NoToEmailAddress (InvalidFeedConfig, FeedsError):
def __init__(self, feed, **kwargs):
message = 'no target email address has been defined'
super(NoToEmailAddress, self).__init__(
setting='to', feed=feed, message=message, **kwargs)
level = _LOG.level
handlers = list(_LOG.handlers)
feeds = []
try:
data = _json.load(self._datafile_lock)
except ValueError as e:
_LOG.info('could not load data file using JSON')
data = self._load_pickled_data(self._datafile_lock)
version = data.get('version', None)
if version != self.datafile_version:
data = self._upgrade_state_data(data)
for state in data['feeds']:
feed = _feed.Feed(name='dummy-name')
feed.set_state(state)
if 'name' not in state:
raise _error.DataFileError(
feeds=self,
message='missing feed name in datafile {}'.format(
self.datafile))
feeds.append(feed)
_LOG.setLevel(level)
_LOG.handlers = handlers
self.extend(feeds)
if locktype == 0:
self._datafile_lock.close()
self._datafile_lock = None
for feed in self:
feed.load_from_config(self.config)
feed_names = set(feed.name for feed in self)
def __init__(self, feeds, message=None):
if message is None:
message = 'problem with the feed data file {}'.format(
feeds.datafile)
super(DataFileError, self).__init__(feeds=feeds, message=message)
_LOG.debug('load feed data from {}'.format(self.datafile))
if not _os.path.exists(self.datafile):
if require:
raise _error.NoDataFile(feeds=self)
_LOG.info('feed data file not found at {}'.format(self.datafile))
_LOG.debug('creating an empty data file')
dirname = _os.path.dirname(self.datafile)
if dirname and not _os.path.isdir(dirname):
_os.makedirs(dirname, mode=0o700, exist_ok=True)
with _codecs.open(self.datafile, 'w', self.datafile_encoding) as f:
self._save_feed_states(feeds=[], stream=f)
try:
self._datafile_lock = _codecs.open(
self.datafile, 'r', self.datafile_encoding)
except IOError as e:
raise _error.DataFileError(feeds=self) from e
locktype = 0
if lock and UNIX:
locktype = _fcntl.LOCK_SH
_fcntl.lockf(self._datafile_lock.fileno(), locktype)
self.clear()
level = _LOG.level
handlers = list(_LOG.handlers)
feeds = []
try:
data = _json.load(self._datafile_lock)
except ValueError as e:
_LOG.info('could not load data file using JSON')
data = self._load_pickled_data(self._datafile_lock)