Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def init(dsn):
    """Connect to InfluxDB and start the background worker thread.

    Sets three module-level globals: ``client`` (built from *dsn*),
    ``queue`` (a ``Queue.Queue`` with a maximum size of 10000) and
    ``thread`` (a started thread whose target is ``influxWorker``).

    Args:
        dsn: Connection string passed to ``InfluxDBClient.from_dsn``.
    """
    from influxdb import InfluxDBClient
    global client
    global thread
    global queue
    # Single-argument print(...) emits the same text on Python 2 and 3,
    # whereas the bare print statement is a syntax error on Python 3.
    print("Initing InfluxDB")
    client = InfluxDBClient.from_dsn(dsn)
    # A DSN can embed "user:password@"; never echo the password verbatim.
    print(_redact_dsn(dsn))
    print("Inited InfluxDB")
    queue = Queue.Queue(10000)
    thread = threading.Thread(target=influxWorker)
    thread.start()


def _redact_dsn(dsn):
    """Return *dsn* with the password of any ``user:password@`` part masked."""
    import re
    # Greedy match up to the last '@' of the authority section, so a password
    # that itself contains '@' is still fully masked.
    return re.sub(r'//([^:@/?#]*):[^/?#]*@', r'//\1:***@', dsn, count=1)
def process_eni_metrics(
        stream_eni, myips, stream,
        start, end, period, sample_size,
        resolver, sink_uri):
    """ENI flow stream processor that rolls up, enhances,
    and indexes the stream by time period.

    Visible steps: build per-period counters with ``flow_stream_stats``,
    open an InfluxDB client from ``sink_uri``, resolve the resource for
    ``stream_eni``, then for every period and each of the 'inbytes' and
    'outbytes' counters resolve the ``sample_size`` most common IPs and
    roll the counter up by ('app', 'env').

    NOTE(review): the body visible here stops while a point dict is still
    being built -- ``points`` is never appended to, and ``client``,
    ``resource`` and ``stats`` are never used. The rest of the function
    appears to be missing; confirm against the full source.
    ``start`` and ``end`` are not referenced in the visible part.
    """
    stats = Counter()
    period_counters = flow_stream_stats(myips, stream, period)
    client = InfluxDBClient.from_dsn(sink_uri)
    resource = resolver.resolve_resource(stream_eni)
    points = []
    # The loop variable rebinds the ``period`` parameter; from here on
    # ``period`` is a key of ``period_counters``, not the caller's argument.
    for period in sorted(period_counters):
        pc = period_counters[period]
        # presumably ``period`` is a POSIX timestamp -- TODO confirm
        pd = datetime.fromtimestamp(period)
        for t in ('inbytes', 'outbytes'):
            tpc = pc[t]
            # Only the ``sample_size`` most frequent IPs are sent to the resolver.
            ips = [ip for ip, _ in tpc.most_common(sample_size)]
            # NOTE(review): if ``timedelta`` is ``datetime.timedelta``, its first
            # positional argument is days, so this window is +/- 900 days.
            # Confirm whether 900 seconds (15 minutes) was intended.
            resolved = resolver.resolve(ips, pd - timedelta(900), pd + timedelta(900))
            logical_counter = rollup_logical(tpc, resolved, ('app', 'env'))
            for (app, env), v in logical_counter.items():
                p = {}
                # rinfo = resolved.get(ip, {})
                p['fields'] = {'Bytes': v}
def __init__(self, name=None):
    """Create the InfluxDB client, select its database, then init the base.

    The database name is ``INFLUXDB_DATABASE`` when that is truthy,
    otherwise the client's own ``_database`` attribute. Errors raised while
    switching to or creating the database are logged and do not stop
    construction.
    """
    self.client = InfluxDBClient.from_dsn(INFLUXDB_DSN, timeout=2)
    target_db = INFLUXDB_DATABASE or self.client._database
    if target_db:
        # Best effort: a failure here is reported but must not prevent the
        # base-class initialiser below from running.
        try:
            self.client.switch_database(target_db)
            self.client.create_database(target_db)
        except Exception as err:
            LOG.error('InfluxDB: ERROR - %s' % err)
    super(InfluxDBWrite, self).__init__(name)