Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def force_refresh_data(self):
evaluator_thread_manager = next(iter(self.evaluator_threads_manager_by_time_frame.values()))
backtesting_enabled = Backtesting.enabled(evaluator_thread_manager.get_evaluator().get_config())
if not backtesting_enabled:
time_frames = self.evaluator_threads_manager_by_time_frame.keys()
for time_frame in time_frames:
self._refresh_time_frame_data(time_frame)
def run(self):
while self.keep_running:
now = time.time()
self._refresh_data()
if not Backtesting.enabled(self.parent.get_evaluator().get_config()):
time.sleep(TimeFramesMinutes[self.parent.time_frame] * MINUTE_TO_SECONDS - (time.time() - now))
symbol = None
error = None
try:
time_frames = self.evaluator_threads_manager_by_time_frame.keys()
# sort time frames to update them in order of accuracy
time_frames = TimeFrameManager.sort_time_frames(time_frames)
if time_frames:
# figure out from an evaluator if back testing is running for this symbol
evaluator_thread_manager = next(iter(self.evaluator_threads_manager_by_time_frame.values()))
symbol = evaluator_thread_manager.get_symbol()
# test if we need to initialize backtesting features
backtesting_enabled = Backtesting.enabled(evaluator_thread_manager.get_evaluator().get_config())
if backtesting_enabled:
exchange = evaluator_thread_manager.exchange.get_exchange()
exchange.init_candles_offset(time_frames, symbol)
# init refreshed_times at 0 for each time frame
self.refreshed_times = {key: 0 for key in time_frames}
# init last refresh times at 0 for each time frame
self.time_frame_last_update = {key: 0 for key in time_frames}
while self.keep_running:
self._execute_update(symbol, exchange, time_frames, backtesting_enabled)
else:
self.logger.warning("no time frames to monitor, going to sleep.")
except Exception as e: