Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.init_selectors(entity_ids=entity_ids, entity_type=self.entity_type, exchanges=self.exchanges,
codes=self.codes, start_timestamp=self.start_timestamp, end_timestamp=self.end_timestamp)
self.selectors_comparator = self.init_selectors_comparator()
self.trading_level_asc = list(set([IntervalLevel(selector.level) for selector in self.selectors]))
self.trading_level_asc.sort()
self.trading_level_desc = list(self.trading_level_asc)
self.trading_level_desc.reverse()
self.targets_slot: TargetsSlot = TargetsSlot()
self.session = get_db_session('zvt', 'business')
trader = get_trader(session=self.session, trader_name=self.trader_name, return_type='domain', limit=1)
if trader:
self.logger.warning("trader:{} has run before,old result would be deleted".format(self.trader_name))
self.session.query(business.Trader).filter(business.Trader.trader_name == self.trader_name).delete()
self.session.commit()
self.on_start()
def load_traders():
    """Fetch every trader domain and register readers for each one.

    Rebinds the module-level ``trader_domains`` and ``trader_names`` and
    appends one ``AccountReader`` and one ``OrderReader`` per trader to the
    ``account_readers`` / ``order_readers`` collections defined outside this
    function.
    """
    global trader_domains
    global trader_names

    trader_domains = get_trader(return_type='domain')

    # NOTE(review): the reader collections are only appended to, never
    # cleared here, so a second call would add duplicate readers - confirm
    # this function is invoked once.
    for domain in trader_domains:
        name = domain.trader_name
        account_readers.append(AccountReader(trader_names=[name], level=domain.level))
        order_readers.append(OrderReader(trader_names=[name]))

    trader_names = [domain.trader_name for domain in trader_domains]
import dash
import dash_core_components as dcc
import dash_daq as daq
import dash_html_components as html
from dash.dependencies import Input, Output
from zvt.api.business import get_trader
from zvt.api.technical import get_current_price
from zvt.charts.dcc_components import get_trader_detail_figures
from zvt.domain import TradingLevel
from zvt.reader.business_reader import AccountReader, OrderReader
# Name of the single trader this page looks up and reads results for.
trader = 'singlecointrader'
# Identifier handed to get_current_price() for the price display.
security = 'coin_binance_EOS/USDT'

# Only the first record returned for this trader name is kept.
trader_domain = get_trader(trader_name=trader, return_type='domain')[0]

# Both readers are restricted to the same trader name as the lookup above.
account_reader = AccountReader(trader_names=[trader], level=TradingLevel.LEVEL_1MIN)
order_reader = OrderReader(trader_names=[trader])
def serve_layout():
layout = html.Div(
[
daq.LEDDisplay(
id='current_price',
label="EOS/USDT current price",
value=get_current_price(security_list=[security]),
color="#FF5E5E"
),
html.Div(id='trader-details', children=get_trader_detail_figures(trader_domain=trader_domain,
account_reader=None,
def load_traders():
    """Fetch every trader domain and register readers for each one.

    Rebinds the module-level ``trader_domains`` and ``trader_names`` and
    appends one ``AccountReader`` and one ``OrderReader`` per trader to the
    ``account_readers`` / ``order_readers`` collections defined outside this
    function.
    """
    global trader_domains
    global trader_names

    trader_domains = get_trader(return_type='domain')

    # NOTE(review): the reader collections are only appended to, never
    # cleared here, so a second call would add duplicate readers - confirm
    # this function is invoked once.
    for domain in trader_domains:
        name = domain.trader_name
        account_readers.append(AccountReader(trader_names=[name], level=domain.level))
        order_readers.append(OrderReader(trader_names=[name]))

    trader_names = [domain.trader_name for domain in trader_domains]