Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_000778_spo_detial():
result = fundamental.get_spo_detail(session=session, provider=Provider.EASTMONEY, return_type='domain',
codes=['000778'], end_timestamp='2018-09-30',
order=SPODetail.timestamp.desc())
assert len(result) == 4
latest: SPODetail = result[0]
assert latest.timestamp == to_pd_timestamp('2017-04-01')
assert latest.spo_issues == 347600000
assert latest.spo_price == 5.15
assert latest.spo_raising_fund == 1766000000
def test_000778_dividend_financing():
result = fundamental.get_dividend_financing(session=session, provider=Provider.EASTMONEY, return_type='domain',
codes=['000778'], end_timestamp='2018-09-30',
order=DividendFinancing.timestamp.desc())
assert len(result) == 22
latest: DividendFinancing = result[1]
assert latest.timestamp == to_pd_timestamp('2017')
assert latest.dividend_money == 598632026.4
assert latest.spo_issues == 347572815.0
assert latest.rights_issues == 0
assert latest.ipo_issues == 0
def test_000778_rights_issue_detail():
result = fundamental.get_rights_issue_detail(session=session, provider=Provider.EASTMONEY, return_type='domain',
codes=['000778'], end_timestamp='2018-09-30',
order=RightsIssueDetail.timestamp.desc())
assert len(result) == 2
latest: RightsIssueDetail = result[0]
assert latest.timestamp == to_pd_timestamp('2001-09-10')
assert latest.rights_issues == 43570000
assert latest.rights_raising_fund == 492300000
assert latest.rights_issue_price == 11.3
def __init__(self, security_id, trading_level, timestamp, trader_name, history_size=250) -> None:
self.security_id = security_id
self.trading_level = trading_level
self.current_timestamp = trading_level.floor_timestamp(to_pd_timestamp(timestamp))
self.model_name = "{}_{}_{}".format(trader_name, type(self).__name__, trading_level.value)
self.history_size = history_size
self.add_trading_signal_listener(SimAccountService(trader_name=trader_name, model_name=self.model_name,
timestamp=timestamp))
self.close_hour, self.close_minute = get_close_time(self.security_id)
def on_finish(self, security_item):
kdatas = get_kdata(security_id=security_item.id, level=self.level.value, order=StockDayKdata.timestamp.asc(),
return_type='domain',
session=self.session,
filters=[StockDayKdata.hfq_close.is_(None),
StockDayKdata.timestamp >= to_pd_timestamp('2005-01-01')])
if kdatas:
start = kdatas[0].timestamp
end = kdatas[-1].timestamp
# get hfq from joinquant
df = get_price(to_jq_security_id(security_item), start_date=to_time_str(start), end_date=now_time_str(),
frequency='daily',
fields=['factor', 'open', 'close', 'low', 'high'],
skip_paused=True, fq='post')
if df is not None and not df.empty:
# fill hfq data
for kdata in kdatas:
if kdata.timestamp in df.index:
kdata.hfq_open = df.loc[kdata.timestamp, 'open']
kdata.hfq_close = df.loc[kdata.timestamp, 'close']
kdata.hfq_high = df.loc[kdata.timestamp, 'high']
the_id = self.generate_domain_id(security_item, original_data)
items = get_data(data_schema=self.data_schema, session=self.session, provider=self.provider,
security_id=security_item.id,
filters=[self.data_schema.id == the_id],
return_type='domain')
if items and not self.force_update:
self.logger.info('ignore the data {}:{} saved before'.format(self.data_schema, the_id))
return None
if not items:
timestamp_str = original_data[self.get_timestamp_field()]
timestamp = None
try:
timestamp = to_pd_timestamp(timestamp_str)
except Exception as e:
self.logger.exception(e)
domain_item = self.data_schema(id=the_id,
code=security_item.code,
security_id=security_item.id,
timestamp=timestamp)
else:
domain_item = items[0]
fill_domain_from_dict(domain_item, original_data, self.get_data_map())
return domain_item
enum_value = lambda x: [e.value for e in x]
COIN_EXCHANGES = ["binance", "huobipro", "okex"]
# COIN_BASE = ["BTC", "ETH", "XRP", "BCH", "EOS", "LTC", "XLM", "ADA", "IOTA", "TRX", "NEO", "DASH", "XMR",
# "BNB", "ETC", "QTUM", "ONT"]
COIN_BASE = ["BTC", "ETH", "EOS"]
COIN_PAIRS = [("{}/{}".format(item, "USDT")) for item in COIN_BASE] + \
[("{}/{}".format(item, "USD")) for item in COIN_BASE]
if __name__ == '__main__':
# print(provider_map_category.get('eastmoney'))
print(TradingLevel.LEVEL_1HOUR.count_from_timestamp(to_pd_timestamp('2019-05-31'), 240))
def init_history_data(self, history_data):
self.history_data = pd.DataFrame(history_data)
self.history_data = index_df_with_time(self.history_data)
self.current_timestamp = to_pd_timestamp(history_data[-1]['timestamp'])
self.current_data = history_data[-1]
def evaluate_start_end_size_timestamps(self, security_item):
the_timestamps = self.security_timestamps_map.get(security_item.id)
if not the_timestamps:
self.init_timestamps(security_item)
the_timestamps = self.security_timestamps_map.get(security_item.id)
if not the_timestamps:
self.logger.exception("could not get time series for:{}".format(security_item.id))
assert False
timestamps = [to_pd_timestamp(t) for t in the_timestamps]
timestamps.sort()
self.logger.info(
'security_id:{},init timestamps start:{},end:{}'.format(security_item.id, timestamps[0], timestamps[-1]))
latest_record = get_data(security_id=security_item.id,
provider=self.provider,
data_schema=self.data_schema,
order=self.data_schema.timestamp.desc(), limit=1,
return_type='domain',
session=self.session)
if latest_record:
self.logger.info('latest record timestamp:{}'.format(latest_record[0].timestamp))
timestamps = [t for t in timestamps if t > latest_record[0].timestamp]