Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _build_report(self, details):
metrics = set()
count = 0
metric_details = defaultdict(int)
for sack in self.iter_sacks():
marker = ""
while True:
names = list(self._list_keys_to_process(
sack, marker=marker, limit=self.Q_LIMIT))
if names and names[0] < marker:
raise incoming.ReportGenerationError(
"Unable to cleanly compute backlog.")
for name in names:
count += 1
metric = name.split("_")[1]
metrics.add(metric)
if details:
metric_details[metric] += 1
if len(names) < self.Q_LIMIT:
break
else:
marker = name
return len(metrics), count, metric_details if details else None
def MeasuresListSchema(measures):
try:
times = utils.to_timestamps([m['timestamp'] for m in measures])
except TypeError:
raise voluptuous.Invalid("unexpected measures format")
except ValueError as e:
raise voluptuous.Invalid("unexpected timestamp '%s'" % e)
try:
values = [float(i['value']) for i in measures]
except Exception:
raise voluptuous.Invalid("unexpected measures value")
return (incoming.Measure(t, v) for t, v in six.moves.zip(times, values))
self.times[metric_name] = incoming.Measure(
utils.dt_in_unix_ns(utils.utcnow()), value)
elif metric_type == "g":
if sampling is not None:
raise ValueError(
"Invalid sampling for g: `%d`, should be none"
% sampling)
self.gauges[metric_name] = incoming.Measure(
utils.dt_in_unix_ns(utils.utcnow()), value)
elif metric_type == "c":
sampling = 1 if sampling is None else sampling
if metric_name in self.counters:
current_value = self.counters[metric_name].value
else:
current_value = 0
self.counters[metric_name] = incoming.Measure(
utils.dt_in_unix_ns(utils.utcnow()),
current_value + (value * (1 / sampling)))
# TODO(jd) Support "set" type
# elif metric_type == "s":
# pass
else:
raise ValueError("Unknown metric type `%s'" % metric_type)
def change_sack_size():
conf = cfg.ConfigOpts()
conf.register_cli_opts([_SACK_NUMBER_OPT])
conf = service.prepare_service(conf=conf, log_to_std=True)
s = incoming.get_driver(conf)
try:
report = s.measures_report(details=False)
except incoming.SackDetectionError:
LOG.error('Unable to detect the number of storage sacks.\n'
'Ensure gnocchi-upgrade has been executed.')
return
remainder = report['summary']['measures']
if remainder:
LOG.error('Cannot change sack when non-empty backlog. Process '
'remaining %s measures and try again', remainder)
return
LOG.info("Removing current %d sacks", s.NUM_SACKS)
s.remove_sacks()
LOG.info("Creating new %d sacks", conf.sacks_number)
s.upgrade(conf.sacks_number)
# under the License.
from concurrent import futures
import daiquiri
import numpy
import six
from gnocchi import incoming
from gnocchi import utils
LOG = daiquiri.getLogger(__name__)
_NUM_WORKERS = utils.get_default_workers()
class CarbonaraBasedStorage(incoming.StorageDriver):
MEASURE_PREFIX = "measure"
SACK_PREFIX = "incoming"
CFG_PREFIX = 'gnocchi-config'
CFG_SACKS = 'sacks'
@property
def NUM_SACKS(self):
if not hasattr(self, '_num_sacks'):
try:
self._num_sacks = int(self.get_storage_sacks())
except Exception as e:
LOG.error('Unable to detect the number of storage sacks. '
'Ensure gnocchi-upgrade has been executed: %s', e)
raise incoming.SackDetectionError(e)
return self._num_sacks
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import uuid
import daiquiri
import six
from gnocchi.common import redis
from gnocchi import incoming
LOG = daiquiri.getLogger(__name__)
class RedisStorage(incoming.IncomingDriver):
_SCRIPTS = {
"process_measure_for_metric": """
local llen = redis.call("LLEN", KEYS[1])
-- lrange is inclusive on both ends, decrease to grab exactly n items
if llen > 0 then llen = llen - 1 end
return {llen, table.concat(redis.call("LRANGE", KEYS[1], 0, llen), "")}
""",
"process_measures_for_sack": """
local results = {}
local metric_id_extractor = "[^%s]*%s([^%s]*)"
local metric_with_measures = redis.call("KEYS", KEYS[1] .. "%s*")
for i, sack_metric in ipairs(metric_with_measures) do
local llen = redis.call("LLEN", sack_metric)
local metric_id = sack_metric:gmatch(metric_id_extractor)()
-- lrange is inclusive on both ends, decrease to grab exactly n items
def get(details=True):
enforce("get status", {})
try:
report = pecan.request.incoming.measures_report(
strtobool("details", details))
except incoming.ReportGenerationError:
abort(503, 'Unable to generate status. Please retry.')
report_dict = {"storage": {"summary": report['summary']}}
if 'details' in report:
report_dict["storage"]["measures_to_process"] = report['details']
return report_dict
def NUM_SACKS(self):
if not hasattr(self, '_num_sacks'):
try:
self._num_sacks = int(self.get_storage_sacks())
except Exception as e:
LOG.error('Unable to detect the number of storage sacks. '
'Ensure gnocchi-upgrade has been executed: %s', e)
raise incoming.SackDetectionError(e)
return self._num_sacks
for field_name, field_value in six.iteritems(fields):
if isinstance(field_value, str):
# We do not support field value that are not numerical
continue
# Metric name is the:
# .@=,…
# with tag ordered
# Replace "/" with "_" because Gnocchi does not support /
# in metric names
metric_name = (
measurement + "." + field_name + tags_str
).replace("/", "_")
resources[resource_id][metric_name].append(
incoming.Measure(timestamp, field_value))
measures_to_batch = {}
for resource_name, metrics_and_measures in six.iteritems(
resources):
resource_name = resource_name
resource_id = utils.ResourceUUID(
resource_name, creator=creator)
LOG.debug("Getting metrics from resource `%s'", resource_name)
timeout = pecan.request.conf.api.operation_timeout
metrics = (
api.get_or_create_resource_and_metrics.retry_with(
stop=tenacity.stop_after_delay(timeout))(
creator, resource_id, resource_name,
metrics_and_measures.keys(),
{}, db)
)