Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_collector_to_metric_invalid_dist(self):
    """Check that ``Collector.to_metric`` rejects unknown aggregation data.

    A bare ``mock.Mock()`` is used both as the view's aggregation and as the
    ``agg_data`` argument. The call is expected to raise ``ValueError`` with
    a message matching ``'unsupported aggregation type '``.
    """
    agg = mock.Mock()
    view = view_module.View(VIDEO_SIZE_VIEW_NAME,
                            "processed video size over time",
                            [FRONTEND_KEY], VIDEO_SIZE_MEASURE, agg)
    registry = mock.Mock()
    options = prometheus.Options("test1", 8001, "localhost", registry)
    collector = prometheus.Collector(options=options)
    collector.register_view(view)
    desc = collector.registered_views[list(REGISTERED_VIEW)[0]]

    # ``assertRaisesRegexp`` is a deprecated alias that was removed in
    # Python 3.12. Prefer ``assertRaisesRegex`` and fall back to the old
    # spelling only where the new one does not exist (Python 2.7).
    assert_raises_regex = (
        getattr(self, 'assertRaisesRegex', None) or self.assertRaisesRegexp)
    with assert_raises_regex(ValueError, 'unsupported aggregation type '):
        collector.to_metric(desc=desc, tag_values=[None], agg_data=agg)
def test_collector_collect(self):
    """Convert last-value aggregation data into a gauge metric and check it.

    Registers a view named ``new_view`` on a collector created with the
    ``test2`` options, converts the aggregation data with a single tag value
    and verifies the metric's name, documentation, type and samples.
    """
    agg = aggregation_module.LastValueAggregation(256)
    view = view_module.View("new_view", "processed video size over time",
                            [FRONTEND_KEY], VIDEO_SIZE_MEASURE, agg)
    registry = mock.Mock()
    options = prometheus.Options("test2", 8001, "localhost", registry)
    collector = prometheus.Collector(options=options)
    collector.register_view(view)
    desc = collector.registered_views['test2_new_view']
    metric = collector.to_metric(
        desc=desc,
        tag_values=[tag_value_module.TagValue("value")],
        agg_data=agg.aggregation_data)

    self.assertEqual(desc['name'], metric.name)
    self.assertEqual(desc['documentation'], metric.documentation)
    self.assertEqual('gauge', metric.type)
    expected_samples = [
        Sample(metric.name, {"myorg_keys_frontend": "value"}, 256)]
    # The expected samples were previously built but never compared, so the
    # sample name, labels and value were not actually being verified.
    self.assertEqual(expected_samples, metric.samples)
# NOTE(review): this is a fragment. The enclosing definition and the
# definitions of `option` and `client` are not visible here; confirm them
# against the full file before relying on these comments.
exporter = stackdriver.StackdriverStatsExporter(
options=option, client=client)
stats = stats_module.Stats()
view_manager = stats.view_manager
stats_recorder = stats.stats_recorder
# If any exporter is already registered, unregister the first one before
# registering the Stackdriver exporter created above.
if len(view_manager.measure_to_view_map.exporters) > 0:
view_manager.unregister_exporter(
view_manager.measure_to_view_map.exporters[0])
view_manager.register_exporter(exporter)
# Register a view over VIDEO_SIZE_MEASURE_FLOAT with a SumAggregation
# (constructed with sum=2.2) and FRONTEND_KEY_FLOAT as its only column.
agg_3 = aggregation_module.SumAggregation(sum=2.2)
view_name3 = "view-name3"
new_view3 = view_module.View(
view_name3, "processed video size over time", [FRONTEND_KEY_FLOAT],
VIDEO_SIZE_MEASURE_FLOAT, agg_3)
view_manager.register_view(new_view3)
# Record one float measurement of 25 * MiB, tagged with
# FRONTEND_KEY_FLOAT = "1200".
tag_value_float = tag_value_module.TagValue("1200")
tag_map = tag_map_module.TagMap()
tag_map.insert(FRONTEND_KEY_FLOAT, tag_value_float)
measure_map = stats_recorder.new_measurement_map()
measure_map.measure_float_put(VIDEO_SIZE_MEASURE_FLOAT, 25 * MiB)
measure_map.record(tag_map)
def test_record_with_exporter(self):
    """Call ``MeasureToViewMap.record`` on a map that has one mock exporter.

    A measure and a view are built from fixed test values; the view is
    constructed and discarded, and the map's ``_registered_measures`` is
    reset to an empty dict before a single measurement is recorded.
    """
    mock_exporter = mock.Mock()
    test_measure = BaseMeasure(
        name="test_measure", description="test_description")

    # The view object is created but never registered with the map.
    View(
        name="test_view",
        description="test_description",
        columns=["testTag1", "testColumn2"],
        measure=test_measure,
        aggregation=mock.Mock())

    view_map = measure_to_view_map_module.MeasureToViewMap()
    view_map.exporters.append(mock_exporter)
    view_map._registered_measures = {}

    result = view_map.record(
        tags={"testTag1": "testTag1Value"},
        measurement_map={test_measure: 5},
        timestamp=mock.Mock())
def test_collector_to_metric_sum(self):
    """Convert sum aggregation data into a metric and check its shape.

    The resulting metric must carry the registered description's name and
    documentation, be of type ``'unknown'`` and hold exactly one sample.
    """
    sum_agg = aggregation_module.SumAggregation(256.0)

    collector = prometheus.Collector(
        options=prometheus.Options("test1", 8001, "localhost", mock.Mock()))
    collector.register_view(
        view_module.View(VIDEO_SIZE_VIEW_NAME,
                         "processed video size over time",
                         [FRONTEND_KEY], VIDEO_SIZE_MEASURE, sum_agg))

    # Look the description up under the first key of REGISTERED_VIEW.
    view_key = list(REGISTERED_VIEW)[0]
    desc = collector.registered_views[view_key]

    metric = collector.to_metric(
        desc=desc, tag_values=[None], agg_data=sum_agg.aggregation_data)

    self.assertEqual(desc['name'], metric.name)
    self.assertEqual(desc['documentation'], metric.documentation)
    self.assertEqual('unknown', metric.type)
    self.assertEqual(1, len(metric.samples))
def test_collector_collect_with_none_label_value(self):
    """Check that a ``None`` tag value is exported as an empty label value.

    Converts last-value aggregation data with ``tag_values=[None]`` and
    expects one sample whose label map is ``{"myorg_keys_frontend": ""}``.
    """
    last_value_agg = aggregation_module.LastValueAggregation(256)

    collector = prometheus.Collector(
        options=prometheus.Options("test3", 8001, "localhost", mock.Mock()))
    collector.register_view(
        view_module.View("new_view", "processed video size over time",
                         [FRONTEND_KEY], VIDEO_SIZE_MEASURE, last_value_agg))

    metric = collector.to_metric(
        desc=collector.registered_views['test3_new_view'],
        tag_values=[None],
        agg_data=last_value_agg.aggregation_data)

    self.assertEqual(1, len(metric.samples))

    # Sample is a namedtuple
    # ('Sample', ['name', 'labels', 'value', 'timestamp', 'exemplar']),
    # so index 1 of the first sample is its label map.
    labels = metric.samples[0][1]
    self.assertEqual({"myorg_keys_frontend": ""}, labels)
value_type, metric_descriptor_type):
"""Test that ViewDatas are converted correctly into Metrics.

This test doesn't check that the various aggregation data `to_point`
methods handle the point conversion correctly, just that the converted
Point is included in the Metric, and that the metric has the expected
structure, descriptor, and labels.
"""
# NOTE(review): the start of this definition (its name and leading
# parameters, presumably including `aggregation_class` used below) is
# outside this view; confirm against the full file.
# Fixed timestamps; `start_time` is assigned to the mock view data below.
start_time = datetime.datetime(2019, 1, 25, 11, 12, 13)
current_time = datetime.datetime(2019, 1, 25, 12, 13, 14)
# Spec'd mocks for the measure and the aggregation; the aggregation mock
# returns `metric_descriptor_type` from get_metric_type().
mock_measure = mock.Mock(spec=measure.MeasureFloat)
mock_aggregation = mock.Mock(spec=aggregation_class)
mock_aggregation.get_metric_type.return_value = metric_descriptor_type
# A real View with two tag-key columns, built around the mocks above.
vv = view.View(
name=mock.Mock(),
description=mock.Mock(),
columns=[tag_key.TagKey('k1'), tag_key.TagKey('k2')],
measure=mock_measure,
aggregation=mock_aggregation)
# Mock ViewData wired to that view and start time.
vd = mock.Mock(spec=view_data.ViewData)
vd.view = vv
vd.start_time = start_time
# Mock aggregation data whose to_point() returns a mock Point carrying a
# value mock spec'd to `value_type`.
mock_point = mock.Mock(spec=point.Point)
mock_point.value = mock.Mock(spec=value_type)
mock_agg = mock.Mock(spec=aggregation_data.SumAggregationData)
mock_agg.to_point.return_value = mock_point
from opencensus.stats import aggregation as aggregation_module
from opencensus.stats import measure as measure_module
from opencensus.stats import stats as stats_module
from opencensus.stats import view as view_module
from opencensus.tags import tag_key as tag_key_module
from opencensus.tags import tag_map as tag_map_module
from opencensus.tags import tag_value as tag_value_module
# Number of bytes in one mebibyte (2 to the power of 20).
MiB = 2 ** 20

# Tag key used as the only column of the video-size view.
FRONTEND_KEY = tag_key_module.TagKey("my.org/keys/frontend")

# Integer measure for the size of processed videos.
VIDEO_SIZE_MEASURE = measure_module.MeasureInt(
    "my.org/measures/video_size",
    "size of processed videos",
    "By",
)

VIDEO_SIZE_VIEW_NAME = "my.org/views/video_size"

# Distribution aggregation with boundaries at 0, 16 MiB and 256 MiB.
VIDEO_SIZE_DISTRIBUTION = aggregation_module.DistributionAggregation(
    [0.0, 16.0 * MiB, 256.0 * MiB],
)

# View tying the measure, the tag key and the distribution together.
VIDEO_SIZE_VIEW = view_module.View(
    name=VIDEO_SIZE_VIEW_NAME,
    description="processed video size over time",
    columns=[FRONTEND_KEY],
    measure=VIDEO_SIZE_MEASURE,
    aggregation=VIDEO_SIZE_DISTRIBUTION,
)
def main():
# Example entry point: fetches the stats components, registers the
# video-size view, then sleeps briefly to simulate work.
# NOTE(review): `time` and `random` are used below but their imports are not
# visible in this excerpt; confirm they are imported at file level.
stats = stats_module.stats
view_manager = stats.view_manager
stats_recorder = stats.stats_recorder
# Register view.
view_manager.register_view(VIDEO_SIZE_VIEW)
# Sleep for [1, 10] milliseconds to fake work (randint is inclusive on both
# ends, so the shortest sleep is 1 ms, not 0).
time.sleep(random.randint(1, 10) / 1000.0)
# Process video.
from opencensus.ext.azure import metrics_exporter
from opencensus.stats import aggregation as aggregation_module
from opencensus.stats import measure as measure_module
from opencensus.stats import stats as stats_module
from opencensus.stats import view as view_module
from opencensus.tags import tag_map as tag_map_module
# Module-level handles taken from the `stats` object exposed by stats_module.
stats = stats_module.stats
view_manager = stats.view_manager
stats_recorder = stats.stats_recorder

# Integer measure for the number of carrots.
CARROTS_MEASURE = measure_module.MeasureInt(
    "carrots", "number of carrots", "carrots")

# View over the carrots measure with no tag columns and a count aggregation.
CARROTS_VIEW = view_module.View(
    name="carrots_view",
    description="number of carrots",
    columns=[],
    measure=CARROTS_MEASURE,
    aggregation=aggregation_module.CountAggregation(),
)
def main():
# Enable metrics
# Set the interval in seconds in which you want to send metrics
exporter = metrics_exporter.new_metrics_exporter()
view_manager.register_exporter(exporter)
view_manager.register_view(CARROTS_VIEW)
mmap = stats_recorder.new_measurement_map()
tmap = tag_map_module.TagMap()