Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# REMINDED_FILE_PATH = os.path.join(os.getcwd(), 'reminded.json')
# None for no status, 0 for message mode, 1 for robot mode
STATUS_FILE_PATH = os.path.join(os.getcwd(), 'status.json')
# path where received image, file ... saved to
DATA_PATH = os.path.join(os.getcwd(), 'data')
if DEBUG:
logzero.loglevel(logging.DEBUG)
else:
logzero.loglevel(logging.WARNING)
fmt = '%(color)s[%(asctime)s <%(module)s:%(funcName)s>:%(lineno)d] [%(levelname)s]%(end_color)s - %(message)s'
formatter = logzero.LogFormatter(color=True, datefmt='%Y%m%d %H:%M:%S', fmt=fmt)
file_formatter = logzero.LogFormatter(color=False, datefmt='%Y%m%d %H:%M:%S', fmt=fmt)
logzero.formatter(formatter)
logzero.logfile(filename='crazyWx.log', formatter=file_formatter,
maxBytes=1000000, backupCount=3)
# logzero.logfile(filename='warning.log', formatter=file_formatter,
# maxBytes=1000000, backupCount=3, loglevel=logging.WARNING)
logzero.logger.debug("crazyWx init")
if __name__ == '__main__':
pass
import logzero
_log_format = "%(color)s%(levelname)s:%(end_color)s %(message)s"
_formatter = logzero.LogFormatter(fmt=_log_format)
_log_level = 0
# Set up a temporary logger with default log level so that
# it can be used before log level argument is determined
logzero.setup_default_logger(formatter=_formatter, level=_log_level)
# Options
# Initialize an empty object which can be assigned attributes
# (useful when using spotdl as a library)
args = type("", (), {})()
# Apple has specific tags - see mutagen docs -
# http://mutagen.readthedocs.io/en/latest/api/mp4.html
M4A_TAG_PRESET = {
"album": "\xa9alb",
"artist": "\xa9ART",
def _logger():
"""Add the multiprocessing module's logger.
:return: Returns a multiprocessing logger.
"""
multiprocess_handler = get_logger()
multiprocess_handler = logging.StreamHandler()
multiprocess_handler.setLevel(logging.ERROR)
multiprocess_handler.setFormatter(logzero.LogFormatter(color=True))
# Attach it to the logzero default logger
logzero.logger.addHandler(multiprocess_handler)
logger = logzero.logger
return logger
import pyarrow as pa
import pyarrow.parquet as pq
import logzero
from logzero import logger
#=============================================================================
# get current timestamp
now = pd.to_datetime('today')
#=============================================================================
## setup logger
logfile = PurePath(pdir/'logs'/'equity_downloader_logs'/f'iex_downloader_log_{now.date()}.log').as_posix()
log_format = '%(color)s[%(levelname)1.1s %(asctime)s.%(msecs)03d %(module)s:%(lineno)d]%(end_color)s %(message)s'
formatter = logzero.LogFormatter(fmt=log_format, datefmt='%Y-%m-%d %I:%M:%S')
logzero.setup_default_logger(logfile=logfile, formatter=formatter)
#=============================================================================
# confirm market hours
local_tz = tzlocal.get_localzone() # get local timezone via tzlocal package
now_local_tz = now.tz_localize(local_tz) # localize current timestamp
nyse = mcal.get_calendar('NYSE') # get NYSE calendar
nyseToday = nyse.schedule(start_date=now.date(), end_date=now.date())
mktOpen = nyseToday.market_open.iloc[0].tz_convert(local_tz)
mktClose = nyseToday.market_close.iloc[0].tz_convert(local_tz)
if mktOpen <= now_local_tz <= mktClose: # only run during market hours
#==========================================================================
# import symbols
from __future__ import print_function
import sys
import re
import datetime
import os, hashlib, base64
import logzero
from logzero import logger
DEFAULT_FORMAT = '%(color)s[%(levelname)1.1s %(asctime)s %(module)s:%(lineno)d]%(end_color)s %(message)s'
logfmt = logzero.LogFormatter(fmt=DEFAULT_FORMAT, datefmt="%Y-%m-%d %H:%M:%S")
logzero.setup_default_logger(formatter=logfmt)
storage_client = None
class InputStream(object):
'''This class handles opening either stdin or a gzipped or non-gzipped file'''
def __init__(self, string = None, tempdir = None):
'''Create a new wrapper around a stream'''
self.tempdir = tempdir
if string in (None, '-', 'stdin') and self.valid(string):
self.handle = sys.stdin
return
if string.startswith('gs:'):
localpath = self.download_google_file(string)
string = localpath
import logzero
import logging
_LOG_LEVELS_STR = ['INFO', 'WARNING', 'ERROR', 'DEBUG']
def log_leveller(log_level_str):
loggin_levels = [logging.INFO, logging.WARNING, logging.ERROR, logging.DEBUG]
log_level_str_index = _LOG_LEVELS_STR.index(log_level_str)
loggin_level = loggin_levels[log_level_str_index]
return loggin_level
log_format = ("%(color)s%(levelname)s:%(end_color)s %(message)s")
formatter = logzero.LogFormatter(fmt=log_format)
# create a default logger
log = logzero.setup_logger(formatter=formatter)
Setup logging to a (rotating) logfile.
Args:
filename (str): Logfile. If filename is None, disable file logging
max_bytes (int): Maximum number of bytes per logfile. If used together with backup_count,
logfile will be rotated when it reaches this amount of bytes.
backup_count (int): Number of rotated logfiles to keep
"""
_logger = logging.getLogger("neo-python")
if not filename and not self.rotating_filehandler:
_logger.removeHandler(self.rotating_filehandler)
else:
self.rotating_filehandler = RotatingFileHandler(filename, mode='a', maxBytes=max_bytes, backupCount=backup_count, encoding=None)
self.rotating_filehandler.setLevel(logging.DEBUG)
self.rotating_filehandler.setFormatter(LogFormatter(color=False))
_logger.addHandler(self.rotating_filehandler)
Params
------
out_log_fp: str
log file fp name doesn't include extension fn will add it
logger: logzero logger object
Returns
-------
logger: logzero logger instance
"""
now = pd.to_datetime('now', utc=True)
file_ = out_log_fp+f'_{now.date()}.log'
logfile = Path(pdir/'logs'/file_).as_posix()
check_path(logfile)
formatter = logzero.LogFormatter(fmt=LOG_FORMAT, datefmt=LOG_DATE_FORMAT)
logzero.setup_default_logger(logfile=logfile, formatter=formatter)
return logger