Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
'Using indicator startup period: 20 ...', caplog
)
# time difference in 5-minute intervals
td = ((end - start).total_seconds() // 60 // 5) + 1
assert td != len(tickerdata['UNITTEST/BTC'])
start_real = tickerdata['UNITTEST/BTC'].iloc[0, 0]
assert log_has(f'Missing data at start for pair '
f'UNITTEST/BTC, data starts at {start_real.strftime("%Y-%m-%d %H:%M:%S")}',
caplog)
# Make sure we start fresh - test missing data at end
caplog.clear()
start = arrow.get('2018-01-10T00:00:00')
end = arrow.get('2018-02-20T00:00:00')
tickerdata = history.load_data(datadir=testdatadir, timeframe='5m',
pairs=['UNITTEST/BTC'],
timerange=TimeRange('date', 'date',
start.timestamp, end.timestamp))
# time difference in 5-minute intervals
td = ((end - start).total_seconds() // 60 // 5) + 1
assert td != len(tickerdata['UNITTEST/BTC'])
# Shift endtime with +5 - as last candle is dropped (partial candle)
end_real = arrow.get(tickerdata['UNITTEST/BTC'].iloc[-1, 0]).shift(minutes=5)
assert log_has(f'Missing data at end for pair '
f'UNITTEST/BTC, data ends at {end_real.strftime("%Y-%m-%d %H:%M:%S")}',
caplog)
def test_start_plot_profit_error(mocker):
    """Expect ``start_plot_profit`` to raise ``OperationalException``.

    Arguments are parsed from ``plot-profit --pairs ETH/BTC`` and the
    ``config`` entry is emptied before the call.
    """
    parsed_args = get_args(["plot-profit", "--pairs", "ETH/BTC"])
    # Make sure we use no config. Details: #2241
    # Not resetting config causes random failures if config.json exists.
    parsed_args["config"] = []

    with pytest.raises(OperationalException):
        start_plot_profit(parsed_args)
mocker.patch('freqtrade.persistence.create_engine', lambda *args, **kwargs: engine)
# Create table using the old format
engine.execute(create_table_old)
engine.execute("create index ix_trades_is_open on trades(is_open)")
engine.execute("create index ix_trades_pair on trades(pair)")
engine.execute(insert_table_old)
# fake previous backup
engine.execute("create table trades_bak as select * from trades")
engine.execute("create table trades_bak1 as select * from trades")
# Run init to test migration
init(default_conf['db_url'], default_conf['dry_run'])
assert len(Trade.query.filter(Trade.id == 1).all()) == 1
trade = Trade.query.filter(Trade.id == 1).first()
assert trade.fee_open == fee.return_value
assert trade.fee_close == fee.return_value
assert trade.open_rate_requested is None
assert trade.close_rate_requested is None
assert trade.is_open == 1
assert trade.amount == amount
assert trade.stake_amount == default_conf.get("stake_amount")
assert trade.pair == "ETC/BTC"
assert trade.exchange == "binance"
assert trade.max_rate == 0.0
assert trade.min_rate is None
assert trade.stop_loss == 0.0
assert trade.initial_stop_loss == 0.0
assert trade.sell_reason is None
assert trade.strategy is None
# Create table using the old format
engine.execute(create_table_old)
engine.execute("create index ix_trades_is_open on trades(is_open)")
engine.execute("create index ix_trades_pair on trades(pair)")
engine.execute(insert_table_old)
# fake previous backup
engine.execute("create table trades_bak as select * from trades")
engine.execute("create table trades_bak1 as select * from trades")
# Run init to test migration
init(default_conf['db_url'], default_conf['dry_run'])
assert len(Trade.query.filter(Trade.id == 1).all()) == 1
trade = Trade.query.filter(Trade.id == 1).first()
assert trade.fee_open == fee.return_value
assert trade.fee_close == fee.return_value
assert trade.open_rate_requested is None
assert trade.close_rate_requested is None
assert trade.is_open == 1
assert trade.amount == amount
assert trade.stake_amount == default_conf.get("stake_amount")
assert trade.pair == "ETC/BTC"
assert trade.exchange == "binance"
assert trade.max_rate == 0.0
assert trade.min_rate is None
assert trade.stop_loss == 0.0
assert trade.initial_stop_loss == 0.0
assert trade.sell_reason is None
assert trade.strategy is None
assert trade.ticker_interval is None
""".format(fee=fee.return_value,
stake=default_conf.get("stake_amount"),
amount=amount
)
engine = create_engine('sqlite://')
mocker.patch('freqtrade.persistence.create_engine', lambda *args, **kwargs: engine)
# Create table using the old format
engine.execute(create_table_old)
engine.execute(insert_table_old)
# Run init to test migration
init(default_conf['db_url'], default_conf['dry_run'])
assert len(Trade.query.filter(Trade.id == 1).all()) == 1
trade = Trade.query.filter(Trade.id == 1).first()
assert trade.fee_open == fee.return_value
assert trade.fee_close == fee.return_value
assert trade.open_rate_requested is None
assert trade.close_rate_requested is None
assert trade.is_open == 1
assert trade.amount == amount
assert trade.stake_amount == default_conf.get("stake_amount")
assert trade.pair == "ETC/BTC"
assert trade.exchange == "binance"
assert trade.max_rate == 0.0
assert trade.stop_loss == 0.0
assert trade.initial_stop_loss == 0.0
assert trade.open_trade_price == trade._calc_open_trade_price()
assert log_has("trying trades_bak0", caplog)
assert log_has("Running database migration - backup available as trades_bak0", caplog)
def test_init(default_conf, mocker, caplog) -> None:
    """Check that constructing ``Telegram`` registers handlers and starts polling.

    ``freqtrade.rpc.telegram.Updater`` is replaced by a mock factory, so the
    assertions below inspect the mock instance that factory returns.
    """
    # Instance handed out by the patched Updater factory.
    updater = MagicMock()
    mocker.patch('freqtrade.rpc.telegram.Updater', MagicMock(return_value=updater))

    Telegram(get_patched_freqtradebot(mocker, default_conf))

    # The returned instance itself must never be invoked as a callable.
    assert updater.call_count == 0
    # number of handlers registered
    assert updater.dispatcher.add_handler.call_count > 0
    assert updater.start_polling.call_count == 1

    expected_log = (
        "rpc.telegram is listening for following commands: [['status'], ['profit'], "
        "['balance'], ['start'], ['stop'], ['forcesell'], ['forcebuy'], "
        "['performance'], ['daily'], ['count'], ['reload_conf'], ['show_config'], "
        "['stopbuy'], ['whitelist'], ['blacklist'], ['edge'], ['help'], ['version']]"
    )
    assert log_has(expected_log, caplog)
def test_backtest(default_conf, fee, mocker, testdatadir) -> None:
    """Backtest 5m UNITTEST/BTC data with sell signals disabled.

    The result must be non-empty and contain exactly two entries.
    """
    default_conf['ask_strategy']['use_sell_signal'] = False
    mocker.patch('freqtrade.exchange.Exchange.get_fee', fee)
    patch_exchange(mocker)
    backtesting = Backtesting(default_conf)

    pair = 'UNITTEST/BTC'
    candles = history.load_data(
        datadir=testdatadir,
        timeframe='5m',
        pairs=[pair],
        timerange=TimeRange('date', None, 1517227800, 0),
    )
    data_processed = backtesting.strategy.tickerdata_to_dataframe(candles)
    min_date, max_date = get_timerange(data_processed)

    backtest_args = {
        'stake_amount': default_conf['stake_amount'],
        'processed': data_processed,
        'max_open_trades': 10,
        'position_stacking': False,
        'start_date': min_date,
        'end_date': max_date,
    }
    results = backtesting.backtest(backtest_args)

    assert not results.empty
    assert len(results) == 2
def load_data_test(what, testdatadir):
    """Load 1m UNITTEST/BTC ticker data and build synthetic candles from it.

    For ``what == 'raise'`` every candle keeps its original date and volume
    while O/H/L/C are replaced by values that grow with the candle index.

    NOTE(review): in the visible code ``data`` is never returned or used, so
    this function looks truncated - confirm against the full source.
    """
    timerange = TimeRange.parse_timerange('1510694220-1510700340')
    candles = history.load_tickerdata_file(testdatadir, timeframe='1m',
                                           pair='UNITTEST/BTC', timerange=timerange)
    candle_count = len(candles)
    step = 0.001

    if what == 'raise':
        data = []
        for idx in range(candle_count):
            price = idx * step
            data.append([
                candles[idx][0],  # Keep old dates
                price,  # But replace O,H,L,C
                price + 0.0001,
                price - 0.0001,
                price,
                candles[idx][5],  # Keep old volume
            ])
def test_backtest_1min_ticker_interval(default_conf, fee, mocker, testdatadir) -> None:
    """Backtest 1m UNITTEST/BTC data with sell signals disabled.

    The result must be non-empty and contain exactly one entry.
    """
    default_conf['ask_strategy']['use_sell_signal'] = False
    mocker.patch('freqtrade.exchange.Exchange.get_fee', fee)
    patch_exchange(mocker)
    backtesting = Backtesting(default_conf)

    # Run a backtest for an existing 1min timeframe
    candles = history.load_data(
        datadir=testdatadir,
        timeframe='1m',
        pairs=['UNITTEST/BTC'],
        timerange=TimeRange.parse_timerange('1510688220-1510700340'),
    )
    processed = backtesting.strategy.tickerdata_to_dataframe(candles)
    min_date, max_date = get_timerange(processed)

    backtest_args = {
        'stake_amount': default_conf['stake_amount'],
        'processed': processed,
        'max_open_trades': 1,
        'position_stacking': False,
        'start_date': min_date,
        'end_date': max_date,
    }
    results = backtesting.backtest(backtest_args)

    assert not results.empty
    assert len(results) == 1
def test_create_cum_profit1(testdatadir):
    """Check ``create_cum_profit`` when trade close times are off the candle boundary.

    Close times from the stored backtest result are shifted by 20 seconds
    before the cumulative profit column is computed for TRX/BTC.
    """
    bt_data = load_backtest_data(testdatadir / "backtest-result_test.json")
    # Move close-time to "off" the candle, to make sure the logic still works
    shifted_close = bt_data.loc[:, 'close_time'] + DateOffset(seconds=20)
    bt_data.loc[:, 'close_time'] = shifted_close

    candles = load_pair_history(
        pair="TRX/BTC",
        ticker_interval='5m',
        datadir=testdatadir,
        timerange=TimeRange.parse_timerange("20180110-20180112"),
    )

    indexed_candles = candles.set_index('date')
    trx_trades = bt_data[bt_data["pair"] == 'TRX/BTC']
    cum_profits = create_cum_profit(indexed_candles, trx_trades,
                                    "cum_profits", timeframe="5m")

    assert "cum_profits" in cum_profits.columns
    assert cum_profits.iloc[0]['cum_profits'] == 0
    assert cum_profits.iloc[-1]['cum_profits'] == 0.0798005