Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_constructor_explicit(self):
span_id = 'test_span_id'
span_name = 'test_span_name'
parent_span = mock.Mock()
start_time = utils.to_iso_str()
end_time = utils.to_iso_str()
attributes = {
'http.status_code': '200',
'component': 'HTTP load balancer',
}
message_events = [mock.Mock()]
links = mock.Mock()
stack_trace = mock.Mock()
status = mock.Mock()
context_tracer = mock.Mock()
span = self._make_one(
name=span_name,
parent_span=parent_span,
attributes=attributes,
start_time=start_time,
def test_create_span_data(self):
    """Smoke test: a ``SpanData`` can be built from explicit field values.

    No assertions are made on the result; the test passes as long as the
    constructor call does not raise.
    """
    started_at = utils.to_iso_str()
    finished_at = utils.to_iso_str()

    span_data_module.SpanData(
        name='root',
        context=None,
        span_id='6e0c63257de34c92',
        parent_span_id='6e0c63257de34c93',
        attributes={'key1': 'value1'},
        start_time=started_at,
        end_time=finished_at,
        stack_trace=None,
        links=None,
        status=None,
        annotations=None,
        message_events=None,
        same_process_as_parent_span=None,
        child_span_count=None,
        span_kind=0,
    )
def test_format_legacy_trace_json(self):
trace_id = '2dd43a1d6b2549c6bc2a1a54c2fc0b05'
span_data = span_data_module.SpanData(
name='root',
context=span_context.SpanContext(
trace_id=trace_id,
span_id='6e0c63257de34c92'
),
span_id='6e0c63257de34c92',
parent_span_id='6e0c63257de34c93',
attributes={'key1': 'value1'},
start_time=utils.to_iso_str(),
end_time=utils.to_iso_str(),
stack_trace=stack_trace.StackTrace(stack_trace_hash_id='111'),
links=[link.Link('1111', span_id='6e0c63257de34c92')],
status=status.Status(code=0, message='pok'),
annotations=[
time_event.Annotation(
timestamp=datetime.datetime(1970, 1, 1),
description='description'
)
],
message_events=[
time_event.MessageEvent(
timestamp=datetime.datetime(1970, 1, 1),
id=0,
)
],
same_process_as_parent_span=False,
def test_get_weakref_unbound(self):
    """Check ``utils.get_weakref`` on a plain (unbound) function.

    The returned object must be a ``weakref.ref`` that resolves to the
    function while it is alive, and to ``None`` once the function has
    been deleted and garbage collected.
    """
    sentinel = Mock()

    def unbound():
        return sentinel

    weak = utils.get_weakref(unbound)
    self.assertIsInstance(weak, weakref.ref)
    self.assertIs(weak(), unbound)
    self.assertIs(weak()(), sentinel)

    # Drop the local strong reference, then force a collection so the
    # weak reference is expected to go dead.
    del unbound
    gc.collect()
    self.assertIsNotNone(weak)
    self.assertIsNone(weak())
:type metric_producers:
list(:class:`opencensus.metrics.export.metric_producer.MetricProducer`)
:param metric_producers: The list of metric producers to use to get metrics
:type exporter: :class:`opencensus.stats.base_exporter.MetricsExporter`
:param exporter: The exporter to use to export metrics.
:type interval: int or float
:param interval: Seconds between export calls.
:rtype: :class:`PeriodicTask`
:return: A running thread responsible for calling the exporter.
"""
weak_gets = [utils.get_weakref(producer.get_metrics)
for producer in metric_producers]
weak_export = utils.get_weakref(exporter.export_metrics)
def export_all():
    """Collect metrics from every producer and hand them to the exporter.

    Producers and the exporter are reached through the weak references
    in ``weak_gets`` and ``weak_export`` from the enclosing scope.

    :raises TransportError: if a producer's or the exporter's weak
        reference no longer resolves.
    """
    collected = []
    for producer_ref in weak_gets:
        get_metrics = producer_ref()
        if get_metrics is None:
            raise TransportError("Metric producer is not available")
        collected.append(get_metrics())

    export_metrics = weak_export()
    if export_metrics is None:
        raise TransportError("Metric exporter is not available")

    # Export one flat stream made of every producer's metrics, in order.
    export_metrics(itertools.chain.from_iterable(collected))
def __init__(self, func, gauge_point):
    """Store the gauge point and a weak reference to ``func``.

    :param func: The callable to reference; it is wrapped with
        ``utils.get_weakref`` rather than stored directly.
    :param gauge_point: Stored unchanged as ``self.gauge_point``.
    """
    self.gauge_point = gauge_point
    weak_func = utils.get_weakref(func)
    self.func = weak_func
def format_annotation_json(self):
    """Build the JSON-style dict for this annotation.

    :rtype: dict
    :return: A dict with a ``'description'`` entry (``self.description``
        passed through ``utils.get_truncatable_str``) and, only when
        ``self.attributes`` is not ``None``, an ``'attributes'`` entry
        produced by its ``format_attributes_json()``.
    """
    formatted = {
        'description': utils.get_truncatable_str(self.description),
    }
    if self.attributes is None:
        return formatted

    formatted['attributes'] = self.attributes.format_attributes_json()
    return formatted
def create_batched_time_series(self, metrics,
                               batch_size=MAX_TIME_SERIES_PER_UPLOAD):
    """Convert metrics to time series and group them into batches.

    :param metrics: Iterable of metrics; each is passed to
        ``self.create_time_series_list``.
    :param batch_size: Window size given to ``utils.window``; defaults
        to ``MAX_TIME_SERIES_PER_UPLOAD``.

    :rtype: list
    :return: The windows yielded by ``utils.window`` over the flattened
        time series, materialized as a list.
    """
    # Lazily flatten the per-metric results into a single stream.
    flattened = (
        series
        for metric in metrics
        for series in self.create_time_series_list(metric)
    )
    batches = utils.window(flattened, batch_size)
    return list(batches)
def _format_attribute_value(value):
if isinstance(value, bool):
value_type = 'bool_value'
elif isinstance(value, int):
value_type = 'int_value'
elif isinstance(value, six.string_types):
value_type = 'string_value'
value = utils.get_truncatable_str(value)
elif isinstance(value, float):
value_type = 'double_value'
else:
return None
return {value_type: value}
list(:class:`opencensus.metrics.export.metric_producer.MetricProducer`)
:param metric_producers: The list of metric producers to use to get metrics
:type exporter: :class:`opencensus.stats.base_exporter.MetricsExporter`
:param exporter: The exporter to use to export metrics.
:type interval: int or float
:param interval: Seconds between export calls.
:rtype: :class:`PeriodicTask`
:return: A running thread responsible for calling the exporter.
"""
weak_gets = [utils.get_weakref(producer.get_metrics)
for producer in metric_producers]
weak_export = utils.get_weakref(exporter.export_metrics)
def export_all():
    """Collect metrics from every producer and hand them to the exporter.

    Producers and the exporter are reached through the weak references
    in ``weak_gets`` and ``weak_export`` from the enclosing scope.

    :raises TransportError: if a producer's or the exporter's weak
        reference no longer resolves.
    """
    collected = []
    for producer_ref in weak_gets:
        get_metrics = producer_ref()
        if get_metrics is None:
            raise TransportError("Metric producer is not available")
        collected.append(get_metrics())

    export_metrics = weak_export()
    if export_metrics is None:
        raise TransportError("Metric exporter is not available")

    # Export one flat stream made of every producer's metrics, in order.
    export_metrics(itertools.chain.from_iterable(collected))
tt = PeriodicMetricTask(interval, export_all)
tt.start()