Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Command-line driver: run BroLogReader over the log named on the command
# line (optionally tailing it) and pretty-print every row it yields.
parser = argparse.ArgumentParser()
parser.add_argument('bro_log', type=str, help='Specify a bro log to run BroLogReader test on')
parser.add_argument('-t', '--tail', action='store_true', help='Turn on log tailing')

# parse_known_args() returns leftovers instead of erroring, so that they can
# be reported explicitly below.
args, commands = parser.parse_known_args()

# Check for unknown args: anything left over is treated as a usage error
if commands:
    print('Unrecognized args: %s' % commands)
    sys.exit(1)

# File may have a tilde in it: expand it to the user's home directory
if args.bro_log:
    args.bro_log = os.path.expanduser(args.bro_log)

# Run the bro reader on the given log file and print each row as it is read
reader = bro_log_reader.BroLogReader(args.bro_log, tail=args.tail, strict=True)
for row in reader.readrows():
    pprint(row)
# Bro type name -> callable that converts the raw string value into a
# Python object.
self.type_mapper = {
    'bool': lambda x: x == 'T',  # only the literal 'T' is True
    'count': int,
    'int': int,
    'double': float,
    # Timestamps are float seconds; fromtimestamp() yields a naive local datetime
    'time': lambda x: datetime.datetime.fromtimestamp(float(x)),
    'interval': lambda x: datetime.timedelta(seconds=float(x)),
    'string': lambda x: x,
    'enum': lambda x: x,
    'port': int,
    'unknown': lambda x: x,
}

# Bro type name -> fixed placeholder value of the matching Python type.
# NOTE(review): presumably substituted when a field holds '-' (unset);
# confirm against the code that consumes dash_mapper.
# NOTE(review): 'enum' has no entry here although type_mapper has one;
# confirm the consumer falls back to a default for missing keys.
self.dash_mapper = {
    'bool': False,
    'count': 0,
    'int': 0,
    'port': 0,
    'double': 0.0,
    # 86400 s = one day past the epoch, interpreted in local time
    'time': datetime.datetime.fromtimestamp(86400),
    'interval': datetime.timedelta(seconds=0),
    'string': '-',
    # Key was previously misspelled 'unknown:' and so never matched the
    # 'unknown' type name used in type_mapper.
    'unknown': '-',
}

# Initialize the Parent Class
super(BroLogReader, self).__init__(self._filepath, full_read=True, tail=self._tail)
# Test some of the error conditions. `reader` must already exist at this
# point. The second value below is not parseable as a float timestamp.
# NOTE(review): presumably make_dict applies type_converters to the values
# positionally, so the 'error' field hits a conversion failure - confirm.
reader.field_names = ['good', 'error']
reader.type_converters = [int, lambda x: datetime.datetime.fromtimestamp(float(x))]
reader.make_dict([5, '0, .5, .5'])

# Test invalid file path: constructing a reader on a missing file must
# raise IOError
with pytest.raises(IOError):
    BroLogReader('nowhere.log')

# Now include tailing (note: as an automated test this needs to timeout quickly)
# Only the optional import is guarded, so an ImportError raised by the
# tailing test itself is not mistaken for a missing dependency.
try:
    from interruptingcow import timeout
except ImportError:
    print('Tailing Test not run, need interruptingcow module...')
else:
    # Spin up the class
    tailer = BroLogReader(test_path, tail=True)

    # Tail the file for 2 seconds and then quit
    try:
        with timeout(2, exception=RuntimeError):
            for line in tailer.readrows():
                print(line)
    except RuntimeError:  # the timeout above is configured to raise RuntimeError
        print('Tailing Test successful!')