Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_get_indices():
df = get_indices(provider='sina', block_category=StockCategory.industry)
print(df)
df = get_indices(provider='eastmoney', block_category=StockCategory.industry)
print(df)
def test_get_indices():
df = get_indices(provider='sina', block_category=StockCategory.industry)
print(df)
df = get_indices(provider='eastmoney', block_category=StockCategory.industry)
print(df)
bid = orderbook['bids'][0][0] if len(orderbook['bids']) > 0 else None
ask = orderbook['asks'][0][0] if len(orderbook['asks']) > 0 else None
entity_id = f'coin_{exchange}_{code}'
result[entity_id] = (bid, ask)
return result
if __name__ == '__main__':
money_flow_session = get_db_session(provider='sina', data_schema=IndexMoneyFlow)
entities = get_entities(entity_type='index',
return_type='domain', provider='sina',
# 只抓概念和行业
filters=[Index.category.in_(
[StockCategory.industry.value, StockCategory.concept.value])])
for entity in entities:
sql = 'UPDATE index_money_flow SET name="{}" where code="{}"'.format(
entity.name, entity.code)
money_flow_session.execute(sql)
money_flow_session.commit()
tmp_str = resp.text
json_str = tmp_str[tmp_str.index('{'):tmp_str.index('}') + 1]
tmp_json = json.loads(json_str)
for code in tmp_json:
name = tmp_json[code].split(',')[1]
id = 'index_cn_{}'.format(code)
if id in self.index_ids:
continue
self.session.add(Index(id=id, entity_type='index', exchange='cn', code=code, name=name,
category=category.value))
self.session.commit()
indices = get_entities(session=self.session, entity_type='index',
return_type='domain', filters=[
Index.category.in_([StockCategory.industry.value, StockCategory.concept.value])],
provider=self.provider)
for index_item in indices:
for page in range(1, 5):
resp = requests.get(self.category_stocks_url.format(page, index_item.code))
try:
if resp.text == 'null' or resp.text is None:
break
category_jsons = demjson.decode(resp.text)
the_list = []
for category in category_jsons:
stock_code = category['code']
stock_id = china_stock_code_to_id(stock_code)
index_id = index_item.id
the_list.append({
'id': '{}_{}'.format(index_id, stock_id),
results = json_callback_param(resp.text)
for result in results:
items = result.split(',')
code = items[1]
name = items[2]
id = 'index_cn_{}'.format(code)
if id in self.index_ids:
continue
self.session.add(Index(id=id, entity_id=id, entity_type='index', exchange='cn', code=code, name=name,
category=category.value))
self.session.commit()
indices = get_entities(session=self.session, entity_type='index',
return_type='domain', filters=[
Index.category.in_(
[StockCategory.concept.value, StockCategory.industry.value])],
provider=self.provider)
for index_item in indices:
resp = requests.get(self.category_stocks_url.format(index_item.code, '1'))
try:
results = json_callback_param(resp.text)
the_list = []
for result in results:
items = result.split(',')
stock_code = items[1]
stock_id = china_stock_code_to_id(stock_code)
index_id = index_item.id
the_list.append({
'id': '{}_{}'.format(index_id, stock_id),
'index_id': index_id,
'stock_id': stock_id
def generate_url(self, category, code, number):
if category == StockCategory.industry.value:
block = 0
elif category == StockCategory.concept.value:
block = 1
return self.url.format(number, block, code)
def init_entities(self):
self.entity_session = get_db_session(provider=self.entity_provider, data_schema=self.entity_schema)
self.entities = get_entities(session=self.entity_session, entity_type='index',
exchanges=self.exchanges,
codes=self.codes,
entity_ids=self.entity_ids,
return_type='domain', provider=self.provider,
# 只抓概念和行业
filters=[Index.category.in_(
[StockCategory.industry.value, StockCategory.concept.value])])
from zvdata.api import df_to_db
from zvdata.recorder import Recorder
from zvt.api.common import china_stock_code_to_id
from zvt.api.quote import get_entities
from zvt.domain import StockIndex, StockCategory
from zvt.domain.meta.stock_meta import Index
class SinaChinaStockCategoryRecorder(Recorder):
provider = 'sina'
data_schema = StockIndex
# 用于抓取行业/概念/地域列表
category_map_url = {
StockCategory.industry: 'http://vip.stock.finance.sina.com.cn/q/view/newSinaHy.php',
StockCategory.concept: 'http://money.finance.sina.com.cn/q/view/newFLJK.php?param=class'
# StockCategory.area: 'http://money.finance.sina.com.cn/q/view/newFLJK.php?param=area',
}
# 用于抓取行业包含的股票
category_stocks_url = 'http://vip.stock.finance.sina.com.cn/quotes_service/api/json_v2.php/Market_Center.getHQNodeData?page={}&num=5000&sort=symbol&asc=1&node={}&symbol=&_s_r_a=page'
def __init__(self, batch_size=10, force_update=False, sleeping_time=10) -> None:
super().__init__(batch_size, force_update, sleeping_time)
self.indices = get_entities(session=self.session, entity_type='index', exchanges=['cn'],
return_type='domain', provider=self.provider)
self.index_ids = [index_item.id for index_item in self.indices]
def run(self):
# get stock category from sina
from zvdata.api import df_to_db
from zvdata.recorder import Recorder
from zvt.api.common import china_stock_code_to_id
from zvt.api.quote import get_entities
from zvt.domain import StockIndex, StockCategory
from zvt.domain.meta.stock_meta import Index
from zvdata.utils.utils import json_callback_param
class ChinaStockCategoryRecorder(Recorder):
provider = 'eastmoney'
data_schema = StockIndex
# 用于抓取行业/概念/地域列表
category_map_url = {
StockCategory.industry: 'https://nufm.dfcfw.com/EM_Finance2014NumericApplication/JS.aspx?type=CT&cmd=C._BKHY&sty=DCRRBKCPAL&st=(ChangePercent)&sr=-1&p=1&ps=200&lvl=&cb=jsonp_F1A61014DE5E45B7A50068EA290BC918&token=4f1862fc3b5e77c150a2b985b12db0fd&_=08766',
StockCategory.concept: 'https://nufm.dfcfw.com/EM_Finance2014NumericApplication/JS.aspx?type=CT&cmd=C._BKGN&sty=DCRRBKCPAL&st=(ChangePercent)&sr=-1&p=1&ps=300&lvl=&cb=jsonp_3071689CC1E6486A80027D69E8B33F26&token=4f1862fc3b5e77c150a2b985b12db0fd&_=08251',
StockCategory.area: 'https://nufm.dfcfw.com/EM_Finance2014NumericApplication/JS.aspx?type=CT&cmd=C._BKDY&sty=DCRRBKCPAL&st=(ChangePercent)&sr=-1&p=1&ps=200&lvl=&cb=jsonp_A597D4867B3D4659A203AADE5B3B3AD5&token=4f1862fc3b5e77c150a2b985b12db0fd&_=02443'
}
# 用于抓取行业包含的股票
category_stocks_url = 'https://nufm.dfcfw.com/EM_Finance2014NumericApplication/JS.aspx?type=CT&cmd=C.{}{}&sty=SFCOO&st=(Close)&sr=-1&p=1&ps=300&cb=jsonp_B66B5BAA1C1B47B5BB9778045845B947&token=7bc05d0d4c3c22ef9fca8c2a912d779c'
def __init__(self, batch_size=10, force_update=False, sleeping_time=10) -> None:
super().__init__(batch_size, force_update, sleeping_time)
self.indices = get_entities(session=self.session, entity_type='index', exchanges=['cn'],
return_type='domain', provider=self.provider)
self.index_ids = [index_item.id for index_item in self.indices]
def run(self):
for category, url in self.category_map_url.items():