Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def parse_dir(path, channel2id):
    """Index every record found under one channel directory in a single bulk call.

    The last component of ``path`` (``path.name``) is used as the channel
    name and looked up in ``channel2id``; a missing name raises ``KeyError``.
    Each entry returned by ``path.iterdir()`` is passed to ``parse_file``
    together with the looked-up channel value, and every item it yields
    becomes the ``_source`` of one bulk action.

    Args:
        path: Directory to scan; must provide ``.name`` and ``.iterdir()``
            (presumably a ``pathlib.Path`` -- confirm against the caller).
        channel2id: Mapping from channel name to the value handed to
            ``parse_file``.
    """
    # NOTE(review): the original indentation was lost; the print/bulk calls
    # are assumed to run once, after all files have been parsed.
    name = path.name
    channel_id = channel2id[name]
    bulk_actions = [
        {'_index': INDEX_NAME, '_type': TYPE_NAME, '_source': record}
        for entry in path.iterdir()
        for record in parse_file(entry, channel_id)
    ]
    print(name, len(bulk_actions))
    helpers.bulk(es, bulk_actions)
data = convert_message(data)
if data is None:
continue
data['channel'] = channel_id
res_msg = fetch_message(es, data['channel'], data['timestamp'])
if res_msg['total'] != 0:
assert res_msg['total'] == 1
continue
act = {'_index': INDEX_NAME, '_type': TYPE_NAME}
act['_source'] = data
actions.append(act)
print(channel_name, len(actions))
helpers.bulk(es, actions)
time.sleep(1)