Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def validate_available_position(self, order, position):
    """Check that a closing order does not exceed the closable quantity.

    The check is skipped (and the order accepted) when
    ``self.config.available_position`` is falsy, or when the order's
    position effect is anything other than ``POSITION_EFFECT.CLOSE``.

    Args:
        order: The order under validation. Its ``position_effect``, ``side``,
            ``quantity`` and ``order_book_id`` are read, and
            ``mark_rejected`` is called with a message when the check fails.
        position: The position being closed. Its ``closable_sell_quantity``
            (for buy-close) or ``closable_buy_quantity`` (for sell-close)
            is compared against the order quantity.

    Returns:
        bool: ``True`` when the order passes, ``False`` when it was rejected.
    """
    if not self.config.available_position:
        return True
    if order.position_effect != POSITION_EFFECT.CLOSE:
        return True

    # The message texts double as gettext lookup keys, so they are kept
    # exactly as they were.
    if order.side == SIDE.BUY and order.quantity > position.closable_sell_quantity:
        order.mark_rejected(_(
            "Order Rejected: not enough securities {order_book_id} to buy close, target sell quantity is {quantity}, sell_closable_quantity {closable}").format(
            order_book_id=order.order_book_id,
            quantity=order.quantity,
            closable=position.closable_sell_quantity,
        ))
        return False
    if order.side == SIDE.SELL and order.quantity > position.closable_buy_quantity:
        order.mark_rejected(_(
            "Order Rejected: not enough securities {order_book_id} to sell close, target sell quantity is {quantity}, buy_closable_quantity {closable}").format(
            order_book_id=order.order_book_id,
            quantity=order.quantity,
            closable=position.closable_buy_quantity,
        ))
        # Return an explicit False so both rejection branches agree
        # (previously this branch returned a bare None).
        return False
    # Nothing was rejected: report success explicitly instead of falling
    # off the end of the function.
    return True
# NOTE(review): the enclosing `def` line is not visible for this fragment; it
# reads like the body of a method taking `self` and `trade` -- confirm against
# the full file.
order_book_id = trade.order_book_id
# Reject anything that is neither BUY nor SELL before touching positions, so
# `position` is always bound by the time it is used below.
if trade.side not in (SIDE.BUY, SIDE.SELL):
raise RuntimeError("unknown side, trade {}".format(trade))
long_positions = self._positions_dict[POSITION_DIRECTION.LONG]
short_positions = self._positions_dict[POSITION_DIRECTION.SHORT]
# Opening: BUY selects the long book, SELL selects the short book.
if trade.position_effect == POSITION_EFFECT.OPEN:
if trade.side == SIDE.BUY:
position = long_positions.get_or_create(order_book_id)
elif trade.side == SIDE.SELL:
position = short_positions.get_or_create(order_book_id)
# Closing (CLOSE or CLOSE_TODAY): the mapping is reversed -- BUY selects the
# short book, SELL selects the long book.
elif trade.position_effect in (POSITION_EFFECT.CLOSE, POSITION_EFFECT.CLOSE_TODAY):
if trade.side == SIDE.BUY:
position = short_positions.get_or_create(order_book_id)
elif trade.side == SIDE.SELL:
position = long_positions.get_or_create(order_book_id)
else:
# NOTE: stocks carry no position_effect, so they are handled as a special
# case and always use the long book.
position = long_positions.get_or_create(order_book_id)
position.apply_trade(trade)
# Record the execution id of the trade that was just applied.
self._backward_trade_set.add(trade.exec_id)
def closable_today_sell_quantity(self):
    """Return ``sell_today_quantity`` minus pending buy-close-today volume.

    The pending volume is the total ``unfilled_quantity`` of every order in
    ``self.open_orders`` whose side is ``SIDE.BUY`` and whose position effect
    is ``POSITION_EFFECT.CLOSE_TODAY``.
    """
    pending = 0
    for open_order in self.open_orders:
        if (open_order.side == SIDE.BUY
                and open_order.position_effect == POSITION_EFFECT.CLOSE_TODAY):
            pending += open_order.unfilled_quantity
    return self.sell_today_quantity - pending
def _apply_trade(self, trade):
if trade.exec_id in self._backward_trade_set:
return
order_book_id = trade.order.order_book_id
position = self._positions[order_book_id]
self._total_cash -= trade.transaction_cost
if trade.order.position_effect != POSITION_EFFECT.OPEN:
self._total_cash -= trade.last_quantity * trade.last_price * position.margin_rate
else:
self._total_cash -= trade.last_quantity * trade.last_price * position.margin_rate
self._frozen_cash -= self._frozen_cash_of_trade(trade)
self._positions[order_book_id].apply_trade(trade)
self._backward_trade_set.add(trade.exec_id)
def position_effect(self):
    """Return the position effect, deriving a default from the side if unset.

    When ``self._position_effect`` is ``None``, a ``SIDE.BUY`` side yields
    ``POSITION_EFFECT.OPEN`` and any other side yields
    ``POSITION_EFFECT.CLOSE``; otherwise the stored value is returned as is.
    """
    if self._position_effect is not None:
        return self._position_effect
    return POSITION_EFFECT.OPEN if self._side == SIDE.BUY else POSITION_EFFECT.CLOSE
def sell_close_order_quantity(self):
    """
    [int] Quantity of pending sell-side orders.

    Totals ``unfilled_quantity`` over every order in ``self.open_orders``
    whose side is ``SIDE.SELL`` and whose position effect is
    ``POSITION_EFFECT.CLOSE`` or ``POSITION_EFFECT.CLOSE_TODAY``.
    """
    pending = 0
    for open_order in self.open_orders:
        if (open_order.side == SIDE.SELL
                and open_order.position_effect in [POSITION_EFFECT.CLOSE,
                                                   POSITION_EFFECT.CLOSE_TODAY]):
            pending += open_order.unfilled_quantity
    return pending
def _get_direction(self, side, position_effect):
    """Map a (side, position effect) pair to a position direction.

    Closing effects (``CLOSE`` / ``CLOSE_TODAY``) map BUY to
    ``POSITION_DIRECTION.SHORT`` and SELL to ``POSITION_DIRECTION.LONG``;
    ``OPEN`` maps BUY to LONG and SELL to SHORT.

    Returns:
        The matching ``POSITION_DIRECTION`` member, or ``None`` when either
        the side or the position effect is not one of the handled values.
    """
    if position_effect in (POSITION_EFFECT.CLOSE, POSITION_EFFECT.CLOSE_TODAY):
        if side == SIDE.BUY:
            return POSITION_DIRECTION.SHORT
        if side == SIDE.SELL:
            return POSITION_DIRECTION.LONG
        return None
    if position_effect == POSITION_EFFECT.OPEN:
        if side == SIDE.BUY:
            return POSITION_DIRECTION.LONG
        if side == SIDE.SELL:
            return POSITION_DIRECTION.SHORT
    return None
# Resolve which position book (long or short) a trade belongs to.
# NOTE(review): the visible code stops right after `position` is resolved and
# nothing visible uses it -- the remainder of this method appears to be
# missing from this view; confirm against the full file.
def apply_trade(self, trade):
# Skip trades whose execution id has already been recorded.
if trade.exec_id in self._backward_trade_set:
return
order_book_id = trade.order_book_id
long_positions = self._positions_dict[POSITION_DIRECTION.LONG]
short_positions = self._positions_dict[POSITION_DIRECTION.SHORT]
# Opening: BUY selects the long book, SELL selects the short book.
if trade.position_effect == POSITION_EFFECT.OPEN:
if trade.side == SIDE.BUY:
position = long_positions.get_or_create(order_book_id)
elif trade.side == SIDE.SELL:
position = short_positions.get_or_create(order_book_id)
else:
raise RuntimeError("unknown side, trade {}".format(trade))
# Closing (CLOSE or CLOSE_TODAY): the mapping is reversed -- BUY selects the
# short book, SELL selects the long book.
elif trade.position_effect in (POSITION_EFFECT.CLOSE, POSITION_EFFECT.CLOSE_TODAY):
if trade.side == SIDE.BUY:
position = short_positions.get_or_create(order_book_id)
elif trade.side == SIDE.SELL:
position = long_positions.get_or_create(order_book_id)
else:
raise RuntimeError("unknown side, trade {}".format(trade))
else:
# NOTE: stocks carry no position_effect, so they are handled as a special
# case and always use the long book.
position = long_positions.get_or_create(order_book_id)
def _fake_trade(order_book_id, quantity, price):
    """Build an opening ``Trade`` from a signed quantity.

    A positive ``quantity`` produces a BUY, anything else a SELL; the trade
    itself carries the absolute quantity and ``POSITION_EFFECT.OPEN``.
    """
    return Trade.__from_create__(
        0,
        price,
        abs(quantity),
        SIDE.SELL if not quantity > 0 else SIDE.BUY,
        POSITION_EFFECT.OPEN,
        order_book_id,
    )
def fast_forward(self, orders, trades=None):
    """Replay trades onto the account, then recompute frozen cash from orders.

    Trades whose ``exec_id`` is already in ``self._backward_trade_set`` are
    skipped. Opening trades are applied as they are encountered; all other
    trades are deferred and applied afterwards, in their original order.

    Args:
        orders: Iterable of orders; those for which ``is_active()`` is true
            contribute ``self._frozen_cash_of_order(order)`` to the new
            frozen cash.
        trades: Optional iterable of trades to apply. Nothing is applied when
            it is ``None`` or empty.
    """
    # Compute positions.
    if trades:
        deferred_closes = []
        # Handle opening trades first.
        for trade in trades:
            if trade.exec_id in self._backward_trade_set:
                continue
            if trade.position_effect == POSITION_EFFECT.OPEN:
                self._apply_trade(trade)
                continue
            deferred_closes.append(trade)
        # Then handle the closing trades.
        for deferred in deferred_closes:
            self._apply_trade(deferred)

    # Compute frozen cash.
    frozen = 0
    for order in orders:
        if order.is_active():
            frozen += self._frozen_cash_of_order(order)
    self._frozen_cash = frozen