Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_agg_def_repr_with_children(self):
    """Check the string form of a three-level aggregation definition.

    The single child is expected after a ``;`` and the list of
    grandchildren comma-joined inside parentheses.
    """
    cardinality_leaf = AggregationDef(agg_type='cardinality', value='ingest_source')
    terms_leaf = AggregationDef(agg_type='terms', value='ingest_source')
    date_level = AggregationDef(
        agg_type='date_hist',
        value='d',
        children=[cardinality_leaf, terms_leaf],
    )
    root = AggregationDef(agg_type='geohash', value='4', children=date_level)

    expected = 'geohash:4;date_hist:d;(cardinality:ingest_source,terms:ingest_source)'
    self.assertEqual(root.__repr__(), expected)
def test_agg_def_repr_with_children(self):
    """Check the string form of a three-level aggregation definition.

    The single child is expected after a ``;`` and the list of
    grandchildren comma-joined inside parentheses.
    """
    cardinality_leaf = AggregationDef(agg_type='cardinality', value='ingest_source')
    terms_leaf = AggregationDef(agg_type='terms', value='ingest_source')
    date_level = AggregationDef(
        agg_type='date_hist',
        value='d',
        children=[cardinality_leaf, terms_leaf],
    )
    root = AggregationDef(agg_type='geohash', value='4', children=date_level)

    expected = 'geohash:4;date_hist:d;(cardinality:ingest_source,terms:ingest_source)'
    self.assertEqual(root.__repr__(), expected)
def test_vectors_aggregate_query_agg_def(self):
    """Run aggregate_query with an AggregationDef and check the result shape.

    Exactly one aggregation is expected, named ``terms:ingest_source`` and
    carrying a single entry under ``'terms'``.
    """
    area_wkt = 'POLYGON((-76.65 40.10, -76.65 40.14, -76.55 40.14, -76.55 40.10, -76.65 40.10))'
    terms_agg = AggregationDef(agg_type='terms', value='ingest_source')

    response = self.gbdx.vectors.aggregate_query(area_wkt, terms_agg)

    # The length check runs first so indexing below only happens on a
    # non-empty response, exactly as in the flat assertion sequence.
    self.assertEqual(len(response), 1)
    only = response[0]
    self.assertIn('name', only)
    self.assertEqual(only['name'], 'terms:ingest_source')
    self.assertIn('terms', only)
    self.assertEqual(len(only['terms']), 1)
def test_agg_def_repr_with_children(self):
    """Check the string form of a three-level aggregation definition.

    The single child is expected after a ``;`` and the list of
    grandchildren comma-joined inside parentheses.
    """
    cardinality_leaf = AggregationDef(agg_type='cardinality', value='ingest_source')
    terms_leaf = AggregationDef(agg_type='terms', value='ingest_source')
    date_level = AggregationDef(
        agg_type='date_hist',
        value='d',
        children=[cardinality_leaf, terms_leaf],
    )
    root = AggregationDef(agg_type='geohash', value='4', children=date_level)

    expected = 'geohash:4;date_hist:d;(cardinality:ingest_source,terms:ingest_source)'
    self.assertEqual(root.__repr__(), expected)
def test_agg_def_repr_no_children(self):
    """Check that a childless definition renders as ``agg_type:value``."""
    leaf = AggregationDef(agg_type='terms', value='ingest_source')
    rendered = leaf.__repr__()
    self.assertEqual(rendered, 'terms:ingest_source')
def test_vectors_aggregate_query_complex(self):
    """Run aggregate_query with a nested definition plus index/query/date filters.

    A geohash aggregation with a date-histogram child is sent, and the
    response is expected to hold one ``geohash:4`` aggregation with a single
    ``dr1s`` term whose one nested aggregation has two terms.
    """
    area_wkt = 'POLYGON((-76.65 40.10, -76.65 40.14, -76.55 40.14, -76.55 40.10, -76.65 40.10))'
    monthly = AggregationDef(agg_type='date_hist', value='month')
    by_geohash = AggregationDef(agg_type='geohash', value='4', children=monthly)

    # Keyword filters passed alongside the geometry and aggregation.
    filters = {
        'index': 'vector-sma-twitter*',
        'query': 'item_type:tweet',
        'start_date': 'now-12M',
        'end_date': 'now',
    }
    response = self.gbdx.vectors.aggregate_query(area_wkt, by_geohash, **filters)

    self.assertEqual(len(response), 1)
    top = response[0]
    self.assertIn('name', top)
    self.assertEqual(top['name'], 'geohash:4')
    self.assertIn('terms', top)

    buckets = top['terms']
    self.assertEqual(len(buckets), 1)
    bucket = buckets[0]
    self.assertEqual(bucket['term'], 'dr1s')

    nested = bucket['aggregations']
    self.assertEqual(len(nested), 1)
    self.assertEqual(len(nested[0]['terms']), 2)
return base
class GeohashAggDef(AggregationDef):
    """Aggregation definition whose type is fixed to ``'geohash'``."""

    def __init__(self, hash_length='3', **kwargs):
        """Initialise the base definition with type ``'geohash'``.

        Args:
            hash_length: Forwarded as the second positional argument of the
                base initializer (presumably its ``value``); defaults to ``'3'``.
            **kwargs: Passed through unchanged to the base initializer.
        """
        super(GeohashAggDef, self).__init__(
            'geohash',
            hash_length,
            **kwargs
        )
class DateHistogramAggDef(AggregationDef):
    """Aggregation definition whose type is fixed to ``'date_hist'``."""

    def __init__(self, bucket_period='M', **kwargs):
        """Initialise the base definition with type ``'date_hist'``.

        Args:
            bucket_period: Forwarded as the second positional argument of the
                base initializer (presumably its ``value``); defaults to ``'M'``.
            **kwargs: Passed through unchanged to the base initializer.
        """
        super(DateHistogramAggDef, self).__init__(
            'date_hist',
            bucket_period,
            **kwargs
        )
class FieldBasedAggDef(AggregationDef):
    """Base for aggregation definitions that require a non-empty field."""

    def __init__(self, agg_type, field=None, **kwargs):
        """Validate ``field`` and delegate to the base initializer.

        Args:
            agg_type: Aggregation type, forwarded as the first positional
                argument of the base initializer.
            field: Forwarded as the second positional argument of the base
                initializer; must be truthy.
            **kwargs: Passed through unchanged to the base initializer.

        Raises:
            Exception: If ``field`` is falsy (for example ``None`` or ``''``).
        """
        field_given = bool(field)
        if not field_given:
            # Reject before touching the base class so no partially
            # initialised definition is created.
            raise Exception('The "field" property cannot be empty.')

        super(FieldBasedAggDef, self).__init__(
            agg_type,
            field,
            **kwargs
        )
class TermsAggDef(FieldBasedAggDef):
    """Field-based aggregation definition whose type is fixed to ``'terms'``."""

    def __init__(self, field=None, **kwargs):
        """Initialise the field-based definition with type ``'terms'``.

        Args:
            field: Forwarded to ``FieldBasedAggDef.__init__`` as its ``field``.
            **kwargs: Passed through unchanged to ``FieldBasedAggDef.__init__``.
        """
        super(TermsAggDef, self).__init__(
            'terms',
            field,
            **kwargs
        )
class CardinalityAggDef(FieldBasedAggDef):
def __repr__(self):
    """Render this aggregation definition as a string for VectorServices calls.

    The head is ``agg_type:value``, or just ``agg_type`` when ``value`` is
    falsy. When ``children`` is truthy a ``;`` and the child part follow:
    a string is appended verbatim, a single ``AggregationDef`` is appended
    via its own ``__repr__``, and anything else is iterated with each
    element's ``__repr__`` comma-joined inside parentheses.

    Returns:
        A string representation of an aggregation definition suitable for
        use in VectorServices calls.
    """
    if self.value:
        head = '%s:%s' % (self.agg_type, self.value)
    else:
        head = '%s' % self.agg_type

    # No children: the head alone is the full representation.
    if not self.children:
        return head

    if isinstance(self.children, six.string_types):
        tail = self.children
    elif isinstance(self.children, AggregationDef):
        tail = self.children.__repr__()
    else:
        # Anything else is assumed to be an iterable of child definitions.
        rendered = [child.__repr__() for child in self.children]
        tail = '(%s)' % ','.join(rendered)

    return '%s;%s' % (head, tail)
super(CardinalityAggDef, self).__init__('cardinality', field)
class AvgAggDef(FieldBasedAggDef):
    """Field-based aggregation definition whose type is fixed to ``'avg'``."""

    def __init__(self, field=None):
        """Initialise the field-based definition with type ``'avg'``.

        Args:
            field: Forwarded to ``FieldBasedAggDef.__init__`` as its ``field``.
        """
        super(AvgAggDef, self).__init__(
            'avg',
            field
        )
class SumAggDef(FieldBasedAggDef):
    """Field-based aggregation definition whose type is fixed to ``'sum'``."""

    def __init__(self, field=None):
        """Initialise the field-based definition with type ``'sum'``.

        Args:
            field: Forwarded to ``FieldBasedAggDef.__init__`` as its ``field``.
        """
        super(SumAggDef, self).__init__(
            'sum',
            field
        )
class AvgGeoLatAggDef(AggregationDef):
    """Aggregation definition whose type is fixed to ``'avg_geo_lat'``."""

    def __init__(self):
        """Initialise the base definition with only the type ``'avg_geo_lat'``."""
        agg_type = 'avg_geo_lat'
        super(AvgGeoLatAggDef, self).__init__(agg_type)
class AvgGeoLonAggDef(AggregationDef):
    """Aggregation definition whose type is fixed to ``'avg_geo_lon'``."""

    def __init__(self):
        """Initialise the base definition with only the type ``'avg_geo_lon'``."""
        agg_type = 'avg_geo_lon'
        super(AvgGeoLonAggDef, self).__init__(agg_type)
if self.children:
if isinstance(self.children, six.string_types):
return '%s;%s' % (base, self.children)
elif isinstance(self.children, AggregationDef):
return '%s;%s' % (base, self.children.__repr__())
else: # assume it's iterable
kids = []
for child in self.children:
kids.append(child.__repr__())
kids_str = '(%s)' % ','.join(kids)
return '%s;%s' % (base, kids_str)
else:
return base
class GeohashAggDef(AggregationDef):
    """Aggregation definition whose type is fixed to ``'geohash'``."""

    def __init__(self, hash_length='3', **kwargs):
        """Initialise the base definition with type ``'geohash'``.

        Args:
            hash_length: Forwarded as the second positional argument of the
                base initializer (presumably its ``value``); defaults to ``'3'``.
            **kwargs: Passed through unchanged to the base initializer.
        """
        super(GeohashAggDef, self).__init__(
            'geohash',
            hash_length,
            **kwargs
        )
class DateHistogramAggDef(AggregationDef):
    """Aggregation definition whose type is fixed to ``'date_hist'``."""

    def __init__(self, bucket_period='M', **kwargs):
        """Initialise the base definition with type ``'date_hist'``.

        Args:
            bucket_period: Forwarded as the second positional argument of the
                base initializer (presumably its ``value``); defaults to ``'M'``.
            **kwargs: Passed through unchanged to the base initializer.
        """
        super(DateHistogramAggDef, self).__init__(
            'date_hist',
            bucket_period,
            **kwargs
        )
class FieldBasedAggDef(AggregationDef):
def __init__(self, agg_type, field=None, **kwargs):
if not field: