@classmethod
def _aggregated_field_for_split(cls, aggregation, key, version=3,
                                granularity=None):
    # Join the split key, aggregation method and granularity (in seconds)
    # with FIELD_SEP, appending a "_v<version>" suffix when versioned.
    path = cls.FIELD_SEP.join([
        str(key), aggregation,
        str(utils.timespan_total_seconds(granularity or key.sampling))])
    return path + '_v%s' % version if version else path
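
For illustration, a minimal standalone sketch of the naming scheme, assuming FIELD_SEP is "_" (the listing code further down splits these names on "_"):

# Hypothetical standalone sketch of the field naming, not the driver code.
def aggregated_field_for_split(key, method, granularity_s, version=3):
    path = "_".join([str(key), method, str(granularity_s)])
    return path + '_v%s' % version if version else path

print(aggregated_field_for_split("1420070400.0", "mean", 300.0))
# -> 1420070400.0_mean_300.0_v3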
import sys

from gnocchi import utils

# Worst-case serialized size of one data point, in bytes. The exact value
# is an assumption here; adjust it to match your storage format.
WORST_CASE_BYTES_PER_POINT = 8.04

if len(sys.argv) < 4:
    # Expect an aggregation-method count followed by one or more
    # granularity/timespan pairs.
    print("Usage: %s <number of aggregation methods> "
          "<granularity> <timespan> [<granularity> <timespan> ...]"
          % sys.argv[0])
    sys.exit(1)


def sizeof_fmt(num, suffix='B'):
    # Render a byte count using binary (1024-based) unit prefixes.
    for unit in ('', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi'):
        if abs(num) < 1024.0:
            return "%3.1f%s%s" % (num, unit, suffix)
        num /= 1024.0
    return "%.1f%s%s" % (num, 'Yi', suffix)


size = 0
agg_methods = int(sys.argv[1])
for g, t in utils.grouper(sys.argv[2:], 2):
    granularity = utils.to_timespan(g)
    timespan = utils.to_timespan(t)
    points = timespan / granularity
    cursize = points * WORST_CASE_BYTES_PER_POINT
    size += cursize
    print("%s over %s = %d points = %s" % (g, t, points, sizeof_fmt(cursize)))

# Every aggregation method stores its own copy of each point.
size *= agg_methods
print("Total: " + sizeof_fmt(size))
# not the first time we treat this timeserie.
if need_rewrite:
    for key in existing_keys:
        if agg_oldest_values['prev_oldest_mutable_key'] <= key:
            if key >= agg_oldest_values['oldest_mutable_key']:
                break
            LOG.debug(
                "Compressing previous split %s (%s) for "
                "metric %s", key, aggregation.method,
                metric)
            # NOTE(jd) Rewrite it entirely for fun (and
            # later for compression). For that, we just
            # pass an empty split.
            keys_and_split_to_store[
                (key, aggregation)] = (
                carbonara.AggregatedTimeSerie(aggregation)
            )

for key, split in ts.split():
    if key >= oldest_key_to_keep:
        LOG.debug(
            "Storing split %s (%s) for metric %s",
            key, aggregation.method, metric)
        keys_and_split_to_store[(key, aggregation)] = split

return (deleted_keys, keys_and_split_to_store)
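
Splits are keyed by aligned time windows, so points that fall in the same window are stored and rewritten together. A rough sketch of the keying rule (the split width of 3600 points is an assumption for illustration):

POINTS_PER_SPLIT = 3600  # assumed number of points per split


def split_key_for(timestamp_s, sampling_s):
    # Align the timestamp down to a multiple of the split span.
    span = POINTS_PER_SPLIT * sampling_s
    return timestamp_s - (timestamp_s % span)


# With 1s sampling, t=7300s and t=7400s share the 7200s split boundary,
# so they end up in the same split.
assert split_key_for(7300, 1) == split_key_for(7400, 1) == 7200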
def _build_report(self, details):
    metrics = set()
    count = 0
    metric_details = defaultdict(int)
    for sack in self.iter_sacks():
        marker = ""
        while True:
            names = list(self._list_keys_to_process(
                sack, marker=marker, limit=self.Q_LIMIT))
            if names and names[0] < marker:
                raise incoming.ReportGenerationError(
                    "Unable to cleanly compute backlog.")
            for name in names:
                count += 1
                metric = name.split("_")[1]
                metrics.add(metric)
                if details:
                    metric_details[metric] += 1
            if len(names) < self.Q_LIMIT:
                break
            else:
                marker = name
    return len(metrics), count, metric_details if details else None
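
The `names[0] < marker` test is a sanity guard on marker-based (keyset) pagination: every page must start strictly after the previous marker, otherwise the backlog would be double-counted. A minimal sketch of that contract (`paginate` and `list_page` are hypothetical names):

def paginate(list_page, limit):
    marker = ""
    while True:
        names = list(list_page(marker=marker, limit=limit))
        if names and names[0] < marker:
            # The backend listed keys before our marker, so the listing
            # is not stable and any count would be unreliable.
            raise RuntimeError("unstable listing")
        yield from names
        if len(names) < limit:
            break
        marker = names[-1]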
def _list_split_keys_unbatched(self, metric, aggregations, version=3):
    bucket = self._bucket_name
    keys = collections.defaultdict(set)
    for aggregation in aggregations:
        response = {}
        # Page through the bucket listing with S3 continuation tokens.
        while response.get('IsTruncated', True):
            if 'NextContinuationToken' in response:
                kwargs = {
                    'ContinuationToken': response['NextContinuationToken']
                }
            else:
                kwargs = {}
            response = self.s3.list_objects_v2(
                Bucket=bucket,
                Prefix=self._prefix(metric) + '%s_%s' % (
                    aggregation.method,
                    utils.timespan_total_seconds(
                        aggregation.granularity),
                ),
                **kwargs)
            # If response is empty then check that the metric exists
            contents = response.get('Contents', ())
            if not contents and not self._metric_exists_p(metric, version):
                raise storage.MetricDoesNotExist(metric)
            for f in contents:
                try:
                    if self._version_check(f['Key'], version):
                        meta = f['Key'].split('_')
                        keys[aggregation].add(carbonara.SplitKey(
                            utils.to_timestamp(meta[2]),
                            sampling=aggregation.granularity))
                except (ValueError, IndexError):
                    # Might be "none", or any other file. Be resilient.
                    continue
    return keys
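
For reference, the `split('_')` indexing above implies object keys shaped like the following (hypothetical example; it assumes the per-metric prefix contains no underscore):

key = "a1b2c3/mean_300.0_1420070400.0_v3"   # hypothetical object key
meta = key.split('_')
# meta[0] -> "a1b2c3/mean", meta[1] -> "300.0" (granularity),
# meta[2] -> "1420070400.0" (split timestamp), meta[3] -> "v3"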
def _list_split_keys_unbatched(self, metric, aggregations, version=3):
    with rados.ReadOpCtx() as op:
        omaps, ret = self.ioctx.get_omap_vals(op, "", "", -1)
        try:
            self.ioctx.operate_read_op(
                op, self._build_unaggregated_timeserie_path(metric, 3))
        except rados.ObjectNotFound:
            raise storage.MetricDoesNotExist(metric)

        # NOTE(sileht): after reading the libradospy, I'm
        # not sure that ret will have the correct value
        # get_omap_vals transforms the C int to python int
        # before operate_read_op is called, I dunno if the int
        # content is copied during this transformation or if
        # this is a pointer to the C int, I think it's copied...
        try:
            ceph.errno_to_exception(ret)
        except rados.ObjectNotFound:
            raise storage.MetricDoesNotExist(metric)

        raw_keys = [name.split("_")
                    for name, value in omaps
                    if self._version_check(name, version)]
        keys = collections.defaultdict(set)
        if not raw_keys:
            return keys

        zipped = list(zip(*raw_keys))
        k_timestamps = utils.to_timestamps(zipped[2])
        k_methods = zipped[3]
        k_granularities = list(map(utils.to_timespan, zipped[4]))
        for timestamp, method, granularity in six.moves.zip(
                k_timestamps, k_methods, k_granularities):
            for aggregation in aggregations:
                if (aggregation.method == method
                        and aggregation.granularity == granularity):
                    keys[aggregation].add(carbonara.SplitKey(
                        timestamp, sampling=granularity))
                    break
        return keys
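
The zipped[2]/[3]/[4] indexing above implies omap names of roughly this shape (hypothetical example):

name = "gnocchi_a1b2c3_1420070400.0_mean_300.0_v3"  # hypothetical omap key
parts = name.split("_")
# parts[2] -> "1420070400.0" (timestamp), parts[3] -> "mean" (method),
# parts[4] -> "300.0" (granularity in seconds), parts[5] -> "v3"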
def _list_split_keys_unbatched(self, metric, aggregations, version=3):
    container = self._container_name(metric)
    try:
        headers, files = self.swift.get_container(
            container, full_listing=True)
    except swclient.ClientException as e:
        if e.http_status == 404:
            raise storage.MetricDoesNotExist(metric)
        raise

    raw_keys = list(map(
        lambda k: k.split("_"),
        (f['name'] for f in files
         if self._version_check(f['name'], version)
         and not f['name'].startswith('none'))))
    keys = collections.defaultdict(set)
    if not raw_keys:
        return keys

    zipped = list(zip(*raw_keys))
    k_timestamps = utils.to_timestamps(zipped[0])
    k_methods = zipped[1]
    k_granularities = list(map(utils.to_timespan, zipped[2]))
    for timestamp, method, granularity in six.moves.zip(
            k_timestamps, k_methods, k_granularities):
        for aggregation in aggregations:
            if (aggregation.method == method
                    and aggregation.granularity == granularity):
                keys[aggregation].add(carbonara.SplitKey(
                    timestamp, sampling=granularity))
                break
    return keys
def _list_split_keys(self, metrics_and_aggregations, version=3):
    pipe = self._client.pipeline(transaction=False)
    # Keep an ordered list of metrics
    metrics = list(metrics_and_aggregations.keys())
    for metric in metrics:
        key = self._metric_key(metric)
        pipe.exists(key)
        aggregations = metrics_and_aggregations[metric]
        for aggregation in aggregations:
            self._scripts["list_split_keys"](
                keys=[key], args=[self._aggregated_field_for_split(
                    aggregation.method, "*",
                    version, aggregation.granularity)],
                client=pipe,
            )
    results = pipe.execute()

    keys = collections.defaultdict(dict)
    start = 0
    for metric in metrics:
        metric_exists_p = results[start]
        if not metric_exists_p:
            raise storage.MetricDoesNotExist(metric)
        aggregations = metrics_and_aggregations[metric]
        number_of_aggregations = len(aggregations)
        keys_for_aggregations = results[
            start + 1:start + 1 + number_of_aggregations]
        start += 1 + number_of_aggregations  # 1 for metric_exists_p
        for aggregation, k in six.moves.zip(
                aggregations, keys_for_aggregations):
            if not k:
                keys[metric][aggregation] = set()
                continue
            timestamps, methods, granularities = list(zip(*k))
            timestamps = utils.to_timestamps(timestamps)
            granularities = map(utils.to_timespan, granularities)
            keys[metric][aggregation] = {
                carbonara.SplitKey(timestamp, sampling=granularity)
                for timestamp, granularity
                in six.moves.zip(timestamps, granularities)}
    return keys
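
Everything is queued on one pipeline and read back by walking `results` with a moving offset: one boolean for the EXISTS, then one entry per aggregation. A minimal sketch of that offset-walking pattern in plain redis-py (hypothetical key layout; assumes a local Redis server):

import redis

r = redis.Redis()
pipe = r.pipeline(transaction=False)
items = {"m1": ["mean", "max"], "m2": ["sum"]}
for name, aggs in items.items():
    pipe.exists(name)                       # one result per metric...
    for agg in aggs:
        pipe.hkeys("%s:%s" % (name, agg))   # ...plus one per aggregation
results = pipe.execute()

start = 0
# dicts preserve insertion order, so both walks line up with the queue order.
for name, aggs in items.items():
    exists = results[start]
    per_agg = results[start + 1:start + 1 + len(aggs)]
    start += 1 + len(aggs)
    print(name, exists, per_agg)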