Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
df.index.name = 'date'
chunkstore_lib.write('data', df, audit={'user': 'test_user'})
df = DataFrame(data={'data': np.random.randint(0, 100, size=10)},
index=pd.date_range('2016-01-01', '2016-01-10'))
df.index.name = 'date'
chunkstore_lib.write('data', df, audit={'user': 'other_user'})
assert(len(chunkstore_lib.read_audit_log()) == 2)
assert(len(chunkstore_lib.read_audit_log(symbol='data')) == 2)
assert(len(chunkstore_lib.read_audit_log(symbol='none')) == 0)
chunkstore_lib.append('data', df, audit={'user': 'test_user'})
assert(chunkstore_lib.read_audit_log()[-1]['appended_rows'] == 10)
df = DataFrame(data={'data': np.random.randint(0, 100, size=5)},
index=pd.date_range('2017-01-01', '2017-01-5'))
df.index.name = 'date'
chunkstore_lib.update('data', df, audit={'user': 'other_user'})
assert(chunkstore_lib.read_audit_log()[-1]['new_chunks'] == 5)
chunkstore_lib.rename('data', 'data_new', audit={'user': 'temp_user'})
assert(chunkstore_lib.read_audit_log()[-1]['action'] == 'symbol rename')
chunkstore_lib.delete('data_new', chunk_range=DateRange('2016-01-01', '2016-01-02'), audit={'user': 'test_user'})
chunkstore_lib.delete('data_new', audit={'user': 'test_user'})
assert(chunkstore_lib.read_audit_log()[-1]['action'] == 'symbol delete')
assert(chunkstore_lib.read_audit_log()[-2]['action'] == 'range delete')
def test_strategybase_tree_allocate_long_short():
    """Allocating capital to one child updates the child and the parent.

    With both securities priced at 100 on the first date and the strategy
    funded with 1000, allocating 500 to ``c1`` must give a position of 5,
    a value of 500, a weight of 0.5 and leave 500 of capital on the parent.
    """
    c1 = SecurityBase('c1')
    c2 = SecurityBase('c2')
    s = StrategyBase('p', [c1, c2])
    # Rebind to the children as exposed by the strategy itself
    # (presumably the strategy keeps its own copies -- TODO confirm).
    c1 = s['c1']
    c2 = s['c2']

    dts = pd.date_range('2010-01-01', periods=3)
    data = pd.DataFrame(index=dts, columns=['c1', 'c2'], data=100)
    # Single-step .loc writes: chained ``data['c1'][...] = ...`` assignment
    # is not guaranteed to modify ``data`` itself.
    data.loc[dts[1], 'c1'] = 105
    data.loc[dts[1], 'c2'] = 95

    s.setup(data)

    i = 0
    # ``.ix`` was removed from pandas; the index is label-based, so use .loc.
    s.update(dts[i], data.loc[dts[i]])

    s.adjust(1000)
    c1.allocate(500)

    assert c1.position == 5
    assert c1.value == 500
    assert c1.weight == 500.0 / 1000
    assert s.capital == 1000 - 500
def test_select_has_data_preselected():
    """SelectHasData drops a pre-selected column with too few observations.

    ``c1`` has NaN on two of the three dates, so with ``min_count=3`` the
    pre-populated selection ``['c1']`` must come back empty while the algo
    itself still returns a truthy value.
    """
    algo = algos.SelectHasData(min_count=3, lookback=pd.DateOffset(days=3))

    s = bt.Strategy('s')

    dts = pd.date_range('2010-01-01', periods=3)
    data = pd.DataFrame(index=dts, columns=['c1', 'c2'], data=100.)
    # One label-based write instead of chained ``data['c1'].ix[...] = ...``:
    # ``.ix`` no longer exists and chained assignment may not reach ``data``.
    data.loc[dts[:2], 'c1'] = np.nan

    s.setup(data)
    s.update(dts[2])
    # Seed the selection so the algo filters an existing list.
    s.temp['selected'] = ['c1']

    assert algo(s)
    selected = s.temp['selected']
    assert len(selected) == 0
method : str
Method to use for daily locations, one of 'last' or 'most-common'
subscriber_subset : dict or None
Subset of subscribers to retrieve modal locations for. Must be None
(= all subscribers) or a dictionary with the specification of a
subset query.
Returns
-------
dict
Dict which functions as the query specification
"""
dates = [
d.strftime("%Y-%m-%d")
for d in pd.date_range(start_date, end_date, freq="D", closed="left")
]
daily_locations = [
daily_location_spec(
date=date,
aggregation_unit=aggregation_unit,
method=method,
subscriber_subset=subscriber_subset,
mapping_table=mapping_table,
geom_table=geom_table,
geom_table_join_column=geom_table_join_column,
)
for date in dates
]
return modal_location_spec(locations=daily_locations)
def get_missing_dates_for_id(store_filename: str, system_id: int) -> List:
    """Return the dates already recorded as missing for ``system_id``.

    Parameters
    ----------
    store_filename : str
        Path of the HDF5 store holding a ``missing_dates`` table.
    system_id : int
        Index value selecting the rows to read from that table.

    Returns
    -------
    List
        An empty list when ``store_filename`` does not exist; otherwise the
        result of ``datetime_list_to_dates`` applied to the sorted, unique
        daily timestamps covered by the stored start/end ranges.
    """
    if not os.path.exists(store_filename):
        return []

    start_col = 'missing_start_date_PV_localtime'
    end_col = 'missing_end_date_PV_localtime'

    with pd.HDFStore(store_filename, mode='r') as store:
        # NOTE: the where-expression refers to the local ``system_id`` by name.
        missing_dates_for_id = store.select(
            key='missing_dates',
            where='index=system_id',
            columns=[start_col, end_col])

    # Expand every stored (start, end) pair into one timestamp per day.
    missing_dates = []
    for start, end in zip(missing_dates_for_id[start_col],
                          missing_dates_for_id[end_col]):
        missing_dates.extend(pd.date_range(start, end, freq='D').tolist())

    # np.unique already returns its result sorted, so no extra sort is needed.
    missing_dates = np.unique(missing_dates)
    missing_dates = datetime_list_to_dates(missing_dates)

    _LOG.info(
        'system_id %d: %d missing dates already found',
        system_id,
        len(missing_dates))
    return missing_dates
def __call__(self):
    """Return a 1000x4 frame of cumulatively summed standard-normal noise.

    Rows are indexed by 1000 consecutive dates starting at 1/1/2000 and the
    columns are labelled A, B, C and D.
    """
    noise = numpy.random.randn(1000, 4)
    dates = pandas.date_range('1/1/2000', periods=1000)
    frame = pandas.DataFrame(noise, index=dates, columns=list('ABCD'))
    return frame.cumsum()
def normalize_data(self, trend_data):
df = pd.DataFrame(index=pd.date_range(self.START, self.END))
if len(trend_data) == 0:
self.log.critical("No trend data found for {}".format(self.columns))
raise ValueError("No trend data to normalize")
# https://github.com/anyuzx/bitcoin-google-trend-strategy/blob/master/bitcoin_google_trend_strategy.py
renorm_factor = 1.0
for c in self.columns:
last_entry = 0
trend_array = []
for i, frame in enumerate(trend_data[::-1]):
if frame.empty:
self.log.critical(
"Trend Dataframe empty for {}: {}-{}".format(
c, self.START.date(), self.END.date()
)
)
----------
train_index
Pandas DatetimeIndex
prediction_length
prediction length
custom_features
shape: (num_custom_features, train_length + pred_length)
Returns
-------
a tuple of (training, prediction) feature tensors
shape: (num_features, train_length/pred_length)
"""
train_length = len(train_index)
full_time_index = pd.date_range(
train_index.min(),
periods=train_length + prediction_length,
freq=train_index.freq,
)
# Default feature map for both seasonal and non-seasonal models.
if self._is_exp_kernel():
# Default time index features: index of the time point
# [0, train_length + pred_length - 1]
features = np.expand_dims(
np.array(range(len(full_time_index))), axis=0
)
# Rescale time index features into the range: [-0.5, 0.5]
# similar to the seasonal features
# (see gluonts.time_feature)
def _get_actions(old_ts, new_ts, action_times):
    """Return the actions occurring in the window ``(old_ts, new_ts]``.

    ``action_times`` is a list of ``(Timedelta, action_type)`` tuples. Each
    offset is added to every non-weekend day between the two timestamps and
    the resulting times are kept when they are after ``old_ts`` and not
    after ``new_ts``. The result is a pandas.Series of action types ordered
    by timestamp.

    Weekends are dropped on the assumption that nothing happens on them;
    this could be extended to user-defined filtering such as holiday
    calendars.
    """
    if not action_times:
        return pd.Series([])

    days = pd.date_range(old_ts, new_ts, normalize=True)
    is_weekend = (days.dayofweek == 5) | (days.dayofweek == 6)
    weekdays = days[~is_weekend]

    per_type = []
    for offset, ac_type in action_times:
        stamps = weekdays + offset
        in_window = stamps[(stamps > old_ts) & (stamps <= new_ts)]
        # An empty index yields an empty Series, i.e. no actions of this
        # type, which is the desired outcome.
        per_type.append(pd.Series(ac_type, index=in_window))

    return pd.concat(per_type, axis=0).sort_index()
# Cumulative returns: a 365-day backtest window and a 50-day live window,
# each laid out as [time, strategy, time, benchmark].
time = list(pd.date_range('2012-10-01T00:00:00', periods=365).to_pydatetime())
strategy = np.linspace(start=1, stop=25, num=365)
benchmark = np.linspace(start=2, stop=26, num=365)
backtest = [time, strategy, time, benchmark]

time = list(pd.date_range('2013-10-01T00:00:00', periods=50).to_pydatetime())
strategy = np.linspace(start=25, stop=29, num=50)
benchmark = np.linspace(start=26, stop=30, num=50)
live = [time, strategy, time, benchmark]

# Call with no data, with the backtest only, and with backtest plus live.
result = charts.GetCumulativeReturns()
result = charts.GetCumulativeReturns(backtest)
result = charts.GetCumulativeReturns(backtest, live)

## Test GetDailyReturnsPlot
time = list(pd.date_range('2012-10-01T00:00:00', periods=365).to_pydatetime())
data = list(np.random.normal(loc=0, scale=1, size=365))
backtest = [time, data]

time = list(pd.date_range('2013-10-01T00:00:00', periods=120).to_pydatetime())
data = list(np.random.normal(loc=0.5, scale=1.5, size=120))
live = [time, data]

empty = [[], []]
# Call with both inputs empty, with live empty, and with both populated.
result = charts.GetDailyReturns(empty, empty)
result = charts.GetDailyReturns(backtest, empty)
result = charts.GetDailyReturns(backtest, live)

## Test GetMonthlyReturnsPlot
# Twelve monthly values per year; 2017 is the 2016 series reversed.
backtest = {
    '2016': [0.5, 0.7, 0.2, 0.23, 1.3, 1.45, 1.67, -2.3, -0.5, 1.23, 1.23, -3.5],
    '2017': [0.5, 0.7, 0.2, 0.23, 1.3, 1.45, 1.67, -2.3, -0.5, 1.23, 1.23, -3.5][::-1],
}