Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test(self, request, context):
    """Answer a test request, forwarding it along any remaining hops.

    The reply always begins with a SUCCESS status for this service. When the
    request still lists service hops, the statuses returned by
    ``service.call_next`` are appended after it. The request id is recorded
    as the ``reqId`` attribute on the current span before returning.
    """
    logger.debug("grpc service received: %s", request)

    # This service's own status always comes first in the reply.
    statuses = [pb2.CommonResponseStatus(status=pb2.SUCCESS)]
    if request.service_hops:
        statuses += list(service.call_next(request).status)
    response = pb2.TestResponse(id=request.id, status=statuses)

    tracer = execution_context.get_opencensus_tracer()
    tracer.add_attribute_to_current_span("reqId", request.id)
    return response
def test_emit(self, monitor_resource_mock):
# Builds a one-element list holding a SpanData named 'span' whose context
# carries the fixed trace id below; every optional field is None and
# span_kind is 0.
# NOTE(review): monitor_resource_mock is presumably injected by a patch
# decorator that is not visible here -- confirm against the full file.
trace_id = '6e0c63257de34c92bf9efcd03927272e'
span_datas = [
span_data_module.SpanData(
name='span',
context=span_context.SpanContext(trace_id=trace_id),
span_id='1111',
parent_span_id=None,
attributes=None,
start_time=None,
end_time=None,
child_span_count=None,
stack_trace=None,
time_events=None,
links=None,
status=None,
same_process_as_parent_span=None,
span_kind=0,
)
]
# NOTE(review): the snippet is truncated here -- the dict literal below is
# opened but its contents and the rest of the test are not visible.
stackdriver_spans = {
def test_emit(self):
    """Emit one minimal SpanData through a LoggingExporter with a mock logger."""
    log_mock = mock.Mock()
    exporter = logging_exporter.LoggingExporter()
    exporter.logger = log_mock

    # Every field in this tuple is passed to SpanData as None.
    unset_fields = (
        'parent_span_id',
        'attributes',
        'start_time',
        'end_time',
        'child_span_count',
        'stack_trace',
        'annotations',
        'message_events',
        'links',
        'status',
        'same_process_as_parent_span',
    )
    span = span_data_module.SpanData(
        name='span',
        context=span_context.SpanContext(trace_id='1'),
        span_id='1111',
        span_kind=0,
        **dict.fromkeys(unset_fields)
    )

    exporter.emit([span])
def test_init(self):
    """Metric rejects a None argument and exposes the valid ones it is given."""
    # Each constructor argument is required: None in either slot must raise.
    for bad_args in ((Mock(), None), (None, Mock())):
        with self.assertRaises(ValueError):
            metric.Metric(*bad_args)

    series = Mock(spec=time_series.TimeSeries)
    series.check_points_type.return_value = True

    descriptor = Mock(spec=metric_descriptor.MetricDescriptor)
    descriptor.type = metric_descriptor.MetricDescriptorType.GAUGE_INT64

    created = metric.Metric(descriptor, [series])
    self.assertEqual(created.time_series, [series])
    self.assertEqual(created.descriptor, descriptor)
def setUp(self):
    """Create the double, long, summary and distribution fixtures plus a timestamp."""
    self.timestamp = '2018-10-06T17:57:57.936475Z'
    self.double_value = value_module.ValueDouble(55.5)
    self.long_value = value_module.ValueLong(9876543210)

    # Summary fixture: a snapshot holding a single value-at-percentile entry.
    percentiles = [summary_module.ValueAtPercentile(99.5, 10.2)]
    self.summary = summary_module.Summary(
        10, 6.6, summary_module.Snapshot(10, 87.07, percentiles))
    self.summary_value = value_module.ValueSummary(self.summary)

    # Distribution fixture: explicit bounds 1..9 and ten identical buckets.
    bucket_options = value_module.BucketOptions(
        value_module.Explicit(list(range(1, 10))))
    buckets = [value_module.Bucket(10, None) for _ in range(10)]
    self.distribution_value = value_module.ValueDistribution(
        100, 1000.0, 10.0, bucket_options, buckets)
def test_init(self):
    """Explicit exposes the bounds it was constructed with."""
    expected_bounds = [1, 2]
    self.assertEqual(
        value_module.Explicit(expected_bounds).bounds, expected_bounds)
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from opencensus.metrics import label_value
from opencensus.metrics.export import point, time_series, value
# Shared fixtures for the TimeSeries tests below.
START_TIMESTAMP = '2018-10-09T22:33:44.012345Z'

LABEL_VALUE1 = label_value.LabelValue('value one')
LABEL_VALUE2 = label_value.LabelValue('价值二')
LABEL_VALUES = (LABEL_VALUE1, LABEL_VALUE2)

# Five points with long values 1..5, paired with these timestamps in order.
POINTS = tuple(
    point.Point(value.ValueLong(number), timestamp)
    for number, timestamp in enumerate(
        (
            "2018-10-09T23:33:44.012345Z",
            "2018-10-10T00:33:44.012345Z",
            "2018-10-10T01:33:44.012345Z",
            "2018-10-10T02:33:44.012345Z",
            "2018-10-10T03:33:44.012345Z",
        ),
        start=1,
    )
)
class TestTimeSeries(unittest.TestCase):
    """Tests for ``time_series.TimeSeries``."""

    def test_init(self):
        """The constructor keeps the start timestamp and label values given."""
        series = time_series.TimeSeries(
            LABEL_VALUES,
            POINTS,
            START_TIMESTAMP,
        )

        self.assertEqual(series.start_timestamp, START_TIMESTAMP)
        self.assertEqual(series.label_values, LABEL_VALUES)
def test_new_aggregation_data_explicit(self):
    """LastValueAggregation(value=6) yields data holding 6 typed as ValueLong."""
    int_measure = mock.Mock(spec=measure_module.MeasureInt)
    aggregation = aggregation_module.LastValueAggregation(value=6)

    data = aggregation.new_aggregation_data(int_measure)

    self.assertEqual(6, data.value)
    self.assertEqual(value.ValueLong, data.value_type)
def test_constructor_explicit(self):
# Prepares an explicit value (string, dict or mock) for each span
# constructor argument, then begins building the span via self._make_one.
span_id = 'test_span_id'
span_name = 'test_span_name'
parent_span = mock.Mock()
start_time = utils.to_iso_str()
end_time = utils.to_iso_str()
attributes = {
'http.status_code': '200',
'component': 'HTTP load balancer',
}
annotations = [mock.Mock()]
message_events = [mock.Mock()]
links = [mock.Mock()]
stack_trace = mock.Mock()
status = mock.Mock()
context_tracer = mock.Mock()
# NOTE(review): the snippet is truncated inside this call -- the remaining
# keyword arguments and any assertions are not visible here.
span = self._make_one(
name=span_name,
parent_span=parent_span,
attributes=attributes,
def test_constructor_explicit(self):
span_id = 'test_span_id'
span_name = 'test_span_name'
parent_span = mock.Mock()
start_time = utils.to_iso_str()
end_time = utils.to_iso_str()
attributes = {
'http.status_code': '200',
'component': 'HTTP load balancer',
}
annotations = [mock.Mock()]
message_events = [mock.Mock()]
links = [mock.Mock()]
stack_trace = mock.Mock()
status = mock.Mock()
context_tracer = mock.Mock()
span = self._make_one(
name=span_name,
parent_span=parent_span,
attributes=attributes,
start_time=start_time,