Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_standalone_backtesting_bot(config, data_files):
    """Create a backtesting bot configured from the given data files.

    A blank configuration is derived from ``config``; its crypto-currency
    section and its backtesting data-file list are reset, and the web
    service is disabled. Each data file then contributes its symbol to the
    matching currency entry and its path to the backtesting data files.

    Returns a ``(bot, ignored_files)`` tuple, where ``ignored_files`` holds
    the data files whose symbol was already registered for its currency.
    """
    bot_config = create_blank_config_using_loaded_one(config)
    bot_config[CONFIG_CRYPTO_CURRENCIES] = {}
    bot_config[CONFIG_BACKTESTING][CONFIG_BACKTESTING_DATA_FILES] = []

    # do not activate web interface on standalone backtesting bot
    WebService.enable(bot_config, False)

    skipped_files = []
    market = _get_reference_market(data_files)
    if DEFAULT_REFERENCE_MARKET != market:
        _switch_reference_market(bot_config, market)

    # A falsy data_files (None or empty) means there is nothing to register.
    for data_file in (data_files if data_files else ()):
        _, symbol, _, _ = interpret_file_name(data_file)
        currency, _ = split_symbol(symbol)

        path = CONFIG_DATA_COLLECTOR_PATH + data_file
        ending = f".{path.split('.')[-1]}"
        if not is_valid_ending(ending):
            # NOTE(review): the path is appended to itself here, exactly as
            # in the original code. This looks unintended (a file extension
            # was probably meant) - confirm before changing.
            path += path

        currencies = bot_config[CONFIG_CRYPTO_CURRENCIES]
        pairs = currencies.setdefault(currency, {CONFIG_CRYPTO_PAIRS: []})[CONFIG_CRYPTO_PAIRS]
        if file_symbol_known(pairs, symbol):
            skipped_files.append(data_file)
            continue

        pairs.append(symbol)
        bot_config[CONFIG_BACKTESTING][CONFIG_BACKTESTING_DATA_FILES].append(path)

    return create_backtesting_bot(bot_config), skipped_files


def file_symbol_known(pairs, symbol):
    """Tell whether ``symbol`` is already present in ``pairs``."""
    return symbol in pairs
def _babysit(self):
    """Monitor open positions and drop the ones the broker reports as closed.

    Every id in ``self.open_trade_ids`` is checked with
    ``broker.is_closed``; closed ids are removed from the container in
    place, so other references to it see the update.
    """
    # Iterate over a snapshot: removing from the container while looping
    # over it directly would skip the entry following each removal.
    for otid in list(self.open_trade_ids):
        if broker.is_closed(otid):
            self.open_trade_ids.remove(otid)  # mark status
def convert_result_into_dict(index, evaluators, time_frames, risk, score, trades):
    """Pack one result's fields into a dict keyed by TestSuiteResult constants."""
    as_dict = {}
    as_dict[TestSuiteResult.INDEX] = index
    as_dict[TestSuiteResult.EVALUATORS] = evaluators
    as_dict[TestSuiteResult.TIME_FRAMES] = time_frames
    as_dict[TestSuiteResult.RISK] = risk
    as_dict[TestSuiteResult.SCORE] = score
    as_dict[TestSuiteResult.AVERAGE_TRADES] = trades
    return as_dict
def convert_result_into_dict(index, evaluators, time_frames, risk, score, trades):
    """Return the given result fields as a dict keyed by TestSuiteResult constants."""
    field_pairs = (
        (TestSuiteResult.INDEX, index),
        (TestSuiteResult.EVALUATORS, evaluators),
        (TestSuiteResult.TIME_FRAMES, time_frames),
        (TestSuiteResult.RISK, risk),
        (TestSuiteResult.SCORE, score),
        (TestSuiteResult.AVERAGE_TRADES, trades),
    )
    return dict(field_pairs)
def convert_result_into_dict(index, evaluators, time_frames, risk, score, trades):
    """Pack one result's fields into a dict keyed by TestSuiteResult constants."""
    as_dict = {}
    as_dict[TestSuiteResult.INDEX] = index
    as_dict[TestSuiteResult.EVALUATORS] = evaluators
    as_dict[TestSuiteResult.TIME_FRAMES] = time_frames
    as_dict[TestSuiteResult.RISK] = risk
    as_dict[TestSuiteResult.SCORE] = score
    as_dict[TestSuiteResult.AVERAGE_TRADES] = trades
    return as_dict
def convert_result_into_dict(index, evaluators, time_frames, risk, score, trades):
    """Return the given result fields as a dict keyed by TestSuiteResult constants."""
    field_pairs = (
        (TestSuiteResult.INDEX, index),
        (TestSuiteResult.EVALUATORS, evaluators),
        (TestSuiteResult.TIME_FRAMES, time_frames),
        (TestSuiteResult.RISK, risk),
        (TestSuiteResult.SCORE, score),
        (TestSuiteResult.AVERAGE_TRADES, trades),
    )
    return dict(field_pairs)
def convert_result_into_dict(index, evaluators, time_frames, risk, score, trades):
    """Pack one result's fields into a dict keyed by TestSuiteResult constants."""
    as_dict = {}
    as_dict[TestSuiteResult.INDEX] = index
    as_dict[TestSuiteResult.EVALUATORS] = evaluators
    as_dict[TestSuiteResult.TIME_FRAMES] = time_frames
    as_dict[TestSuiteResult.RISK] = risk
    as_dict[TestSuiteResult.SCORE] = score
    as_dict[TestSuiteResult.AVERAGE_TRADES] = trades
    return as_dict
def convert_result_into_dict(index, evaluators, time_frames, risk, score, trades):
    """Return the given result fields as a dict keyed by TestSuiteResult constants."""
    field_pairs = (
        (TestSuiteResult.INDEX, index),
        (TestSuiteResult.EVALUATORS, evaluators),
        (TestSuiteResult.TIME_FRAMES, time_frames),
        (TestSuiteResult.RISK, risk),
        (TestSuiteResult.SCORE, score),
        (TestSuiteResult.AVERAGE_TRADES, trades),
    )
    return dict(field_pairs)
def get_test_suite_result(self):
    """Build a TestSuiteResult from this object's collected results and config.

    The result receives, in order: the profitability results, the trades
    counts, the trader risk, the forced time frame and forced evaluator
    entries read from ``self.config``, and the strategy evaluator class
    name.
    """
    profitability = self._profitability_results
    trades_counts = self._trades_counts
    trader_risk = self.config[CONFIG_TRADING][CONFIG_TRADER_RISK]
    forced_time_frame = self.config[CONFIG_FORCED_TIME_FRAME]
    forced_evaluator = self.config[CONFIG_FORCED_EVALUATOR]
    strategy_name = self.strategy_evaluator_class.get_name()
    return TestSuiteResult(
        profitability,
        trades_counts,
        trader_risk,
        forced_time_frame,
        forced_evaluator,
        strategy_name,
    )
def get_report(self):
    """Return report rows for the sorted results across all time frames.

    Each row is built by ``TestSuiteResult.convert_result_into_dict`` from
    one of the first 100 entries of
    ``self.sorted_results_through_all_time_frame``, using the entry's
    position as its index. An empty list is returned when there are no
    sorted results.
    """
    if not self.sorted_results_through_all_time_frame:
        return []

    rows = []
    top_entries = self.sorted_results_through_all_time_frame[0:100]
    for position, entry in enumerate(top_entries):
        # argument order: index, evaluators, time frames, risk, score, trades
        row = TestSuiteResult.convert_result_into_dict(
            position,
            entry[CONFIG].get_evaluators(),
            "",  # the time frames field is passed as an empty string
            entry[CONFIG].get_risk(),
            entry[RANK],
            round(entry[TRADES_IN_RESULT], 5),
        )
        rows.append(row)
    return rows