Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_disallows_non_utc_unix_timestamps(self):
    """
    Requesting a UNIX timestamp with a timezone other than UTC raises
    ValueError.
    """
    with pytest.raises(ValueError) as exc_info:
        TimeStamper(utc=False)
    assert exc_info.value.args[0] == "UNIX timestamps are always UTC."
def test_inserts_utc_unix_timestamp_by_default(self):
    """
    Without arguments, a float UNIX timestamp is inserted.
    """
    stamper = TimeStamper()
    event_dict = stamper(None, None, {})
    # freezegun doesn't work with time.time. :(
    assert isinstance(event_dict["timestamp"], float)
def test_local(self):
    """
    Timestamps in the local timezone work.  We can't add a timezone to
    the string without additional libraries.
    """
    stamper = TimeStamper(fmt="iso", utc=False)
    event_dict = stamper(None, None, {})
    assert event_dict["timestamp"] == "1980-03-25T16:00:00"
def test_bind_request_id_on_message_receive(mocker):
    """
    Receiving a message binds a freshly generated request id into the
    thread-local structlog context.
    """
    # A Mock is already callable; the original line
    # ``wrapped.__call__ = mocked_function.__call__`` referenced the
    # undefined name ``mocked_function`` and raised NameError.
    wrapped = Mock()
    instance = Mock()
    args = [{}]  # first positional arg is the (empty) message payload dict
    kwargs = {}
    mocker.patch("uuid.uuid4", return_value="12345")
    structlog.configure(
        processors=[
            # Only lowercase "iso" selects ISO 8601 in structlog; any
            # other string (e.g. "ISO") is passed to strftime and would
            # render literally.
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.JSONRenderer(),
        ],
        context_class=structlog.threadlocal.wrap_dict(dict),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )
    _bind_request_id_on_message_receive(wrapped, instance, args, kwargs)
    # The wrapped dict class keeps its data in a thread-local; reach in
    # to confirm the patched uuid was bound under "request_id".
    request_id = structlog.get_config()["context_class"]._tl.dict_["request_id"]
    assert request_id == '12345'
def init():
    """
    Set up stdlib logging to stdout plus a structlog JSON pipeline and
    return a configured logger.
    """
    logging.basicConfig(stream=sys.stdout, format='%(message)s')
    root_level = LOG_LEVEL_DEBUG if config.DEBUG else LOG_LEVEL_PROD
    logging.getLogger().setLevel(root_level)

    processor_chain = [
        filter_by_level,
        add_log_level,
        add_app_context,
        split_pos_args,
        TimeStamper(fmt='iso', utc=True),
        StackInfoRenderer(),
        format_exc_info,
        JSONRenderer(sort_keys=True),
    ]
    configure(
        processors=processor_chain,
        context_class=wrap_dict(dict),
        logger_factory=LoggerFactory(),
        wrapper_class=BoundLogger,
        cache_logger_on_first_use=True,
    )

    # Quiet chatty third-party loggers.
    for noisy_name in ('requests', 'statsd', 'amqpstorm', 'datadog.dogstatsd'):
        logging.getLogger(noisy_name).setLevel(logging.WARNING)
    return get()
help='Max seconds user is active for'
)
argparser.add_argument(
    '--user-session-max-start-delay',
    default=60,
    type=int,
    help='Max seconds by which all users should have logged in'
)
argparser.add_argument(
    '--json',
    action='store_true',
    help='True if output should be JSON formatted'
)
args = argparser.parse_args()
# Only lowercase "iso" selects ISO 8601 in structlog's TimeStamper; any
# other string is handed to strftime, so fmt="ISO" would emit the
# literal text "ISO" as every timestamp.
processors = [structlog.processors.TimeStamper(fmt="iso")]
if args.json:
    processors.append(structlog.processors.JSONRenderer())
else:
    processors.append(structlog.dev.ConsoleRenderer())
structlog.configure(processors=processors)
# asyncio.run() (3.7+) creates and closes the event loop itself,
# replacing the deprecated get_event_loop()/run_until_complete() pair.
asyncio.run(run(args))
def configure_logger(log_name, log_dir, log_level):
# Renames the final event key to "message" -- presumably for a log
# shipper such as Logstash (note the "@timestamp" key below); TODO
# confirm the downstream consumer.
eventrenamer = EventRenamer("message")
# ISO-format timestamp stored under the Elasticsearch-style "@timestamp"
# key.  NOTE(review): utc=False means server-local time -- confirm that
# is intended.
timestamper = structlog.processors.TimeStamper(fmt="iso", utc=False, key="@timestamp")
# Processor steps shared with foreign (stdlib) log records.
shared_processors = [
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
add_thread_info,
timestamper,
eventrenamer
]
# Configure structlog's own pipeline.  NOTE: this call is truncated in
# the visible excerpt -- the argument list continues past this block.
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
add_thread_info,
structlog.stdlib.PositionalArgumentsFormatter(),
timestamper,
# NOTE(review): fragment of a larger setup function -- `ch` and
# `debug_logger` are created earlier, outside the visible excerpt.
ch.setLevel(logging.DEBUG)
debug_logger.addHandler(ch)
# Logger for all queries run or accessed (used by flowmachine server)
query_run_log = logging.getLogger("flowmachine").getChild("query_run_log")
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.INFO)
query_run_log.addHandler(ch)
# structlog pipeline: level filtering, stdlib metadata, positional-arg
# formatting, ISO timestamps, stack/exception info, then JSON rendered
# with rapidjson for speed.
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer(serializer=rapidjson.dumps),
],
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
# Module-level flag marking setup as done -- presumably read by a guard
# elsewhere to avoid double initialisation; TODO confirm (not visible).
FLOWKIT_LOGGERS_HAVE_BEEN_INITIALISED = True
def receiver_setup_logging(
    loglevel, logfile, format, colorize, **kwargs
):  # pragma: no cover
    """
    Signal receiver that (re)configures stdlib logging and structlog.

    The parameters are accepted for signal compatibility but are not
    used here; stdlib configuration comes from ``settings.LOGGING``.
    """
    logging.config.dictConfig(settings.LOGGING)
    processor_chain = [
        structlog.stdlib.filter_by_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.ExceptionPrettyPrinter(),
        # structlog.processors.KeyValueRenderer(),
        structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
    ]
    structlog.configure(
        processors=processor_chain,
        context_class=structlog.threadlocal.wrap_dict(dict),
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )
def config_logging():
    """
    Configure structlog for stdlib integration with sorted-key JSON
    output and ISO timestamps.
    """
    chain = [
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt='iso'),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(sort_keys=True),
    ]
    structlog.configure(
        processors=chain,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )