[update] json与numpy兼容问题,股票数据问题

This commit is contained in:
msincenselee 2021-05-19 16:23:15 +08:00
parent 4e006e59fd
commit 2ed5301b6a
12 changed files with 182 additions and 20 deletions

View File

@ -212,7 +212,7 @@ class WebsocketClient:
recv_data = gzip.decompress(recv_data) recv_data = gzip.decompress(recv_data)
data = self.unpack_data(recv_data) data = self.unpack_data(recv_data)
except ValueError as e: except ValueError as e:
print("websocket unable to parse data: " + recv_data) print("websocket unable to parse data: " + recv_data, file=sys.stderr)
raise e raise e
self._log('recv data: %s', data) self._log('recv data: %s', data)

View File

@ -204,11 +204,13 @@ class CtaEngine(BaseEngine):
def process_timer_event(self, event: Event): def process_timer_event(self, event: Event):
""" 处理定时器事件""" """ 处理定时器事件"""
all_trading = True all_trading = True
untrading_strategies = []
# 触发每个策略的定时接口 # 触发每个策略的定时接口
for strategy in list(self.strategies.values()): for strategy in list(self.strategies.values()):
strategy.on_timer() strategy.on_timer()
if not strategy.trading: if not strategy.trading:
all_trading = False all_trading = False
untrading_strategies.append(strategy.strategy_name)
dt = datetime.now() dt = datetime.now()
@ -225,6 +227,12 @@ class CtaEngine(BaseEngine):
# 推送到事件 # 推送到事件
self.put_all_strategy_pos_event(all_strategy_pos) self.put_all_strategy_pos_event(all_strategy_pos)
else:
if len(untrading_strategies) > 0:
account_id = self.engine_config.get('accountid', '-')
msg = f'异常,{account_id}/策略{untrading_strategies}处于停止交易状态,不能推送持仓对比'
self.write_error(msg)
self.send_wechat(msg)
def process_tick_event(self, event: Event): def process_tick_event(self, event: Event):
"""处理tick到达事件""" """处理tick到达事件"""
@ -1002,6 +1010,7 @@ class CtaEngine(BaseEngine):
""" """
Init a strategy. Init a strategy.
""" """
self.write_log(f'创建独立线程执行{strategy_name} on_init()')
task = self.thread_executor.submit(self._init_strategy, strategy_name, auto_start) task = self.thread_executor.submit(self._init_strategy, strategy_name, auto_start)
self.thread_tasks.append(task) self.thread_tasks.append(task)
return True return True
@ -1011,14 +1020,13 @@ class CtaEngine(BaseEngine):
Init strategies in queue. Init strategies in queue.
""" """
try: try:
self.write_log(f"{strategy_name} => 开始执行初始化")
strategy = self.strategies[strategy_name] strategy = self.strategies[strategy_name]
if strategy.inited: if strategy.inited:
self.write_error(f"{strategy_name} => 已经完成初始化,禁止重复操作") self.write_error(f"{strategy_name} => 已经完成初始化,禁止重复操作")
return return
self.write_log(f"{strategy_name} => 开始执行初始化")
# Call on_init function of strategy # Call on_init function of strategy
self.call_strategy_func(strategy, strategy.on_init) self.call_strategy_func(strategy, strategy.on_init)
@ -1048,7 +1056,7 @@ class CtaEngine(BaseEngine):
except Exception as ex: except Exception as ex:
msg = f'{strategy_name} => 执行on_init异常:{str(ex)}' msg = f'{strategy_name} => 执行on_init异常:{str(ex)}'
self.write_error(ex) self.write_error(msg)
self.send_wechat(msg) self.send_wechat(msg)
self.write_error(traceback.format_exc()) self.write_error(traceback.format_exc())
@ -1077,7 +1085,7 @@ class CtaEngine(BaseEngine):
except Exception as ex: except Exception as ex:
msg = f'{strategy_name} => 执行on_start异常:{str(ex)}' msg = f'{strategy_name} => 执行on_start异常:{str(ex)}'
self.write_error(ex) self.write_error(msg)
self.send_wechat(msg) self.send_wechat(msg)
self.write_error(traceback.format_exc()) self.write_error(traceback.format_exc())
@ -1112,7 +1120,7 @@ class CtaEngine(BaseEngine):
return True, f'{strategy_name}=> 成功停止' return True, f'{strategy_name}=> 成功停止'
except Exception as ex: except Exception as ex:
msg = f'{strategy_name} => 执行stop_strategy()异常:{str(ex)}' msg = f'{strategy_name} => 执行stop_strategy()异常:{str(ex)}'
self.write_error(ex) self.write_error(msg)
self.send_wechat(msg) self.send_wechat(msg)
self.write_error(traceback.format_exc()) self.write_error(traceback.format_exc())
return False, f'停止策略失败{strategy_name},异常:{str(ex)}' return False, f'停止策略失败{strategy_name},异常:{str(ex)}'
@ -1170,7 +1178,7 @@ class CtaEngine(BaseEngine):
except Exception as ex: except Exception as ex:
msg = f'执行remove_strategy({strategy_name})异常:{str(ex)}' msg = f'执行remove_strategy({strategy_name})异常:{str(ex)}'
self.write_error(ex) self.write_error(msg)
self.send_wechat(msg) self.send_wechat(msg)
self.write_error(traceback.format_exc()) self.write_error(traceback.format_exc())
return False, f'移除策略失败{strategy_name},异常:{str(ex)}' return False, f'移除策略失败{strategy_name},异常:{str(ex)}'
@ -1243,7 +1251,7 @@ class CtaEngine(BaseEngine):
return True, msg return True, msg
except Exception as ex: except Exception as ex:
msg = f'执行reload_strategy({strategy_name})异常:{str(ex)}' msg = f'执行reload_strategy({strategy_name})异常:{str(ex)}'
self.write_error(ex) self.write_error(msg)
self.send_wechat(msg) self.send_wechat(msg)
self.write_error(traceback.format_exc()) self.write_error(traceback.format_exc())
return False, f'重启策略失败{strategy_name},异常:{str(ex)}' return False, f'重启策略失败{strategy_name},异常:{str(ex)}'
@ -1281,7 +1289,9 @@ class CtaEngine(BaseEngine):
# 保存策略数据 # 保存策略数据
strategy.sync_data() strategy.sync_data()
except Exception as ex: except Exception as ex:
self.write_error(u'保存策略{}数据异常:'.format(strategy_name, str(ex))) msg = u'保存策略{}数据异常:'.format(strategy_name, str(ex))
self.write_error(msg)
self.send_wechat(msg)
self.write_error(traceback.format_exc()) self.write_error(traceback.format_exc())
def clean_strategy_cache(self, strategy_name): def clean_strategy_cache(self, strategy_name):

View File

@ -513,6 +513,7 @@ class CtaFutureTemplate(CtaTemplate):
self.write_log(u'保存policy数据') self.write_log(u'保存policy数据')
self.policy.save() self.policy.save()
def save_klines_to_cache(self, kline_names: list = []): def save_klines_to_cache(self, kline_names: list = []):
""" """
保存K线数据到缓存 保存K线数据到缓存
@ -600,9 +601,10 @@ class CtaFutureTemplate(CtaTemplate):
return {} return {}
def init_policy(self): def init_policy(self):
self.write_log(u'init_policy(),初始化执行逻辑') self.write_log(f'{self.strategy_name} => init_policy(),初始化执行逻辑')
if self.policy: if self.policy:
self.policy.load() self.policy.load()
self.write_log(f'{self.strategy_name} => init_policy(),初始化执行逻辑完成')
def init_position(self): def init_position(self):
""" """

View File

@ -1251,7 +1251,7 @@ class CtaProFutureTemplate(CtaProTemplate):
dist_record['symbol'] = trade.vt_symbol dist_record['symbol'] = trade.vt_symbol
# 处理股指锁单 # 处理股指锁单
if trade.exchange == Exchange.CFFEX: if trade.exchange == Exchange.CFFEX and not self.backtesting:
if trade.direction == Direction.LONG: if trade.direction == Direction.LONG:
if abs(self.position.short_pos) >= trade.volume: if abs(self.position.short_pos) >= trade.volume:
self.position.short_pos += trade.volume self.position.short_pos += trade.volume

View File

@ -0,0 +1,18 @@
from pathlib import Path
from vnpy.trader.app import BaseApp
from vnpy.trader.constant import Direction
from vnpy.trader.object import TickData, BarData, TradeData, OrderData
from vnpy.trader.utility import BarGenerator, ArrayManager
from .engine import TradeCopyEngine, APP_NAME
class TradeCopyApp(BaseApp):
""""""
app_name = APP_NAME
app_module = __module__
app_path = Path(__file__).parent
display_name = "跟单软件"
engine_class = TradeCopyEngine
widget_name = "TcManager"
icon_name = "tc.ico"

View File

@ -13,6 +13,25 @@ TNS_STATUS_ORDERING = 'ordering'
TNS_STATUS_OPENED = 'opened' TNS_STATUS_OPENED = 'opened'
TNS_STATUS_CLOSED = 'closed' TNS_STATUS_CLOSED = 'closed'
import numpy as np
class MyEncoder(json.JSONEncoder):
"""
自定义转换器处理np,datetime等不能被json转换得问题
"""
def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, np.floating):
return float(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
elif isinstance(obj, datetime):
return obj.strftime('%Y-%m-%d %H:%M:%S')
else:
return super(MyEncoder, self).default(obj)
class CtaPolicy(CtaComponent): class CtaPolicy(CtaComponent):
""" """
@ -103,7 +122,7 @@ class CtaPolicy(CtaComponent):
json_data = self.to_json() json_data = self.to_json()
json_data['save_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') json_data['save_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
with open(json_file, 'w', encoding='utf8') as f: with open(json_file, 'w', encoding='utf8') as f:
data = json.dumps(json_data, indent=4, ensure_ascii=False) data = json.dumps(json_data, indent=4, ensure_ascii=False, cls=MyEncoder)
f.write(data) f.write(data)
except IOError as ex: except IOError as ex:

View File

@ -376,13 +376,17 @@ class CtpGateway(BaseGateway):
def check_status(self): def check_status(self):
"""检查状态""" """检查状态"""
# 检查交易接口、行情接口的连接状态
if self.td_api.connect_status and self.md_api.connect_status: if self.td_api.connect_status and self.md_api.connect_status:
self.status.update({'con': True}) self.status.update({'con': True})
# 检查通达信行情接口(直接连通达信)
if self.tdx_api: if self.tdx_api:
self.tdx_api.check_status() self.tdx_api.check_status()
if self.tdx_api is None or self.md_api is None:
return False # 检查天勤行情接口
if self.tq_api:
self.tq_api.check_status()
if not self.td_api.connect_status or self.md_api.connect_status: if not self.td_api.connect_status or self.md_api.connect_status:
return False return False
@ -1600,6 +1604,10 @@ class TdxMdApi():
self.check_status() self.check_status()
def check_status(self): def check_status(self):
"""
检查通达信直连状态
:return:
"""
# self.write_log(u'检查tdx接口状态') # self.write_log(u'检查tdx接口状态')
if len(self.registered_symbol_set) == 0: if len(self.registered_symbol_set) == 0:
return return
@ -1826,6 +1834,8 @@ class SubMdApi():
self.symbol_tick_dict = {} # 合约与最后一个Tick得字典 self.symbol_tick_dict = {} # 合约与最后一个Tick得字典
self.registed_symbol_set = set() # 订阅的合约记录集 self.registed_symbol_set = set() # 订阅的合约记录集
self.last_tick_dt = None
self.sub = None self.sub = None
self.setting = {} self.setting = {}
self.connect_status = False self.connect_status = False
@ -1852,6 +1862,42 @@ class SubMdApi():
self.gateway.write_error(traceback.format_exc()) self.gateway.write_error(traceback.format_exc())
self.connect_status = False self.connect_status = False
def check_status(self):
"""接口状态的健康检查"""
# 订阅的合约
d = {'sub_symbols': sorted(self.symbol_tick_dict.keys())}
# 合约的最后时间
if self.last_tick_dt:
d.update({"sub_tick_time": self.last_tick_dt.strftime('%Y-%m-%d %H:%M:%S')})
if len(self.symbol_tick_dict) > 0:
dt_now = datetime.now()
hh_mm = dt_now.hour * 100 + dt_now.minute
# 期货交易时间内
if 900 <= hh_mm <= 1130 or 1300 <= hh_mm <= 1500 or hh_mm < 230 or hh_mm >= 2100:
# 未有数据到达
if self.last_tick_dt is None:
d.update({"sub_status": False, "sub_error": u"rabbitmq未有行情数据到达"})
else: # 有数据
# 超时15分钟以上
if (dt_now - self.last_tick_dt).total_seconds() > 60 * 15:
d.update({"sub_status": False,
"sub_error": u"{}rabbitmq行情数据超时15分钟以上".format(hh_mm)})
else:
d.update({"sub_status": True})
self.gateway.status.pop("sub_error", None)
# 非交易时间
else:
self.gateway.status.pop("sub_status", None)
self.gateway.status.pop("sub_error", None)
# 更新到gateway的状态中去
self.gateway.status.update(d)
def on_message(self, chan, method_frame, _header_frame, body, userdata=None): def on_message(self, chan, method_frame, _header_frame, body, userdata=None):
# print(" [x] %r" % body) # print(" [x] %r" % body)
try: try:
@ -1874,6 +1920,7 @@ class SubMdApi():
exchange=Exchange(d.get('exchange')), exchange=Exchange(d.get('exchange')),
symbol=symbol, symbol=symbol,
datetime=dt) datetime=dt)
d.pop('exchange', None) d.pop('exchange', None)
d.pop('symbol', None) d.pop('symbol', None)
tick.__dict__.update(d) tick.__dict__.update(d)
@ -1887,6 +1934,8 @@ class SubMdApi():
if tick.last_price > pre_tick.last_price * 1.2 or tick.last_price < pre_tick.last_price * 0.8: if tick.last_price > pre_tick.last_price * 1.2 or tick.last_price < pre_tick.last_price * 0.8:
return return
self.last_tick_dt = tick.datetime
self.gateway.on_tick(tick) self.gateway.on_tick(tick)
self.gateway.on_custom_tick(tick) self.gateway.on_custom_tick(tick)
@ -2016,6 +2065,11 @@ class TqMdApi():
self.update_thread = Thread(target=self.update) self.update_thread = Thread(target=self.update)
self.update_thread.start() self.update_thread.start()
def check_status(self):
"""检查接口状态"""
pass
def generate_tick_from_quote(self, vt_symbol, quote) -> TickData: def generate_tick_from_quote(self, vt_symbol, quote) -> TickData:
""" """
生成TickData 生成TickData

View File

@ -1167,7 +1167,7 @@ class TqMdApi():
return return
try: try:
from tqsdk import TqApi from tqsdk import TqApi
self.api = TqApi(_stock=True, url="wss://u.shinnytech.com/t/nfmd/front/mobile") self.api = TqApi(_stock=True, url="wss://api.shinnytech.com/t/nfmd/front/mobile")
except Exception as e: except Exception as e:
self.gateway.write_log(f'天勤股票行情API接入异常:'.format(str(e))) self.gateway.write_log(f'天勤股票行情API接入异常:'.format(str(e)))
self.gateway.write_log(traceback.format_exc()) self.gateway.write_log(traceback.format_exc())

View File

@ -2105,7 +2105,7 @@ class TqMdApi():
return return
try: try:
from tqsdk import TqApi from tqsdk import TqApi
self.api = TqApi(_stock=True, url="wss://u.shinnytech.com/t/nfmd/front/mobile") self.api = TqApi(_stock=True, url="wss://api.shinnytech.com/t/nfmd/front/mobile")
except Exception as e: except Exception as e:
self.gateway.write_log(f'天勤股票行情API接入异常:'.format(str(e))) self.gateway.write_log(f'天勤股票行情API接入异常:'.format(str(e)))
self.gateway.write_log(traceback.format_exc()) self.gateway.write_log(traceback.format_exc())

View File

@ -1088,7 +1088,7 @@ class RohonTdApi(TdApi):
price=data["LimitPrice"], price=data["LimitPrice"],
volume=data["VolumeTotalOriginal"], volume=data["VolumeTotalOriginal"],
traded=data["VolumeTraded"], traded=data["VolumeTraded"],
status=STATUS_ROHON2VT[data["OrderStatus"]], status=STATUS_ROHON2VT.get(data["OrderStatus"],Status.UNKNOWN),
time=data["InsertTime"], time=data["InsertTime"],
gateway_name=self.gateway_name gateway_name=self.gateway_name
) )

View File

@ -1,5 +1,6 @@
import traceback import traceback
import json import json
from copy import deepcopy
from uuid import uuid1 from uuid import uuid1
from datetime import datetime, timedelta from datetime import datetime, timedelta
from threading import Thread from threading import Thread
@ -84,6 +85,16 @@ class StockRpcGateway(BaseGateway):
self.query_all() self.query_all()
def check_status(self):
if self.client:
pass
if self.rabbit_api:
self.rabbit_api.check_status()
return True
def subscribe(self, req: SubscribeRequest): def subscribe(self, req: SubscribeRequest):
"""行情订阅""" """行情订阅"""
self.write_log(f'创建订阅任务=> rabbitMQ') self.write_log(f'创建订阅任务=> rabbitMQ')
@ -114,6 +125,8 @@ class StockRpcGateway(BaseGateway):
task.close() task.close()
# gateway_name = self.symbol_gateway_map.get(req.vt_symbol, "") # gateway_name = self.symbol_gateway_map.get(req.vt_symbol, "")
# self.client.subscribe(req, gateway_name) # self.client.subscribe(req, gateway_name)
if self.rabbit_api:
self.rabbit_api.registed_symbol_set.add(req.vt_symbol)
def send_order(self, req: OrderRequest): def send_order(self, req: OrderRequest):
""" """
@ -161,7 +174,8 @@ class StockRpcGateway(BaseGateway):
for position in positions: for position in positions:
position.gateway_name = self.gateway_name position.gateway_name = self.gateway_name
# 更换 vt_positionid得gateway前缀 # 更换 vt_positionid得gateway前缀
position.vt_positionid = position.vt_positionid.replace(f'{position.gateway_name}.', f'{self.gateway_name}.') position.vt_positionid = position.vt_positionid.replace(f'{position.gateway_name}.',
f'{self.gateway_name}.')
# 更换 vt_accountid得gateway前缀 # 更换 vt_accountid得gateway前缀
position.vt_accountid = position.vt_accountid.replace(f'{position.gateway_name}.', f'{self.gateway_name}.') position.vt_accountid = position.vt_accountid.replace(f'{position.gateway_name}.', f'{self.gateway_name}.')
@ -205,6 +219,8 @@ class StockRpcGateway(BaseGateway):
if event.type == EVENT_TICK: if event.type == EVENT_TICK:
return return
event = deepcopy(event)
data = event.data data = event.data
if hasattr(data, "gateway_name"): if hasattr(data, "gateway_name"):
@ -225,7 +241,7 @@ class StockRpcGateway(BaseGateway):
if hasattr(data, 'vt_positionid'): if hasattr(data, 'vt_positionid'):
data.vt_positionid = data.vt_positionid.replace(f'{self.remote_gw_name}.', f'{self.gateway_name}.') data.vt_positionid = data.vt_positionid.replace(f'{self.remote_gw_name}.', f'{self.gateway_name}.')
if event.type in [EVENT_ORDER,EVENT_TRADE]: if event.type in [EVENT_ORDER, EVENT_TRADE]:
self.write_log(f'{self.remote_gw_name} => {self.gateway_name} event:{data.__dict__}') self.write_log(f'{self.remote_gw_name} => {self.gateway_name} event:{data.__dict__}')
self.event_engine.put(event) self.event_engine.put(event)
@ -242,12 +258,48 @@ class SubMdApi():
self.symbol_tick_dict = {} # 合约与最后一个Tick得字典 self.symbol_tick_dict = {} # 合约与最后一个Tick得字典
self.registed_symbol_set = set() # 订阅的合约记录集 self.registed_symbol_set = set() # 订阅的合约记录集
self.last_tick_dt = None
self.sub = None self.sub = None
self.setting = {} self.setting = {}
self.connect_status = False self.connect_status = False
self.thread = None # 用线程运行所有行情接收 self.thread = None # 用线程运行所有行情接收
def check_status(self):
"""接口状态的健康检查"""
# 订阅的合约
d = {'sub_symbols': sorted(self.symbol_tick_dict.keys())}
# 合约的最后时间
if self.last_tick_dt:
d.update({"sub_tick_time": self.last_tick_dt.strftime('%Y-%m-%d %H:%M:%S')})
if len(self.symbol_tick_dict) > 0:
dt_now = datetime.now()
hh_mm = dt_now.hour * 100 + dt_now.minute
# 期货交易时间内
if 930 <= hh_mm <= 1130 or 1301 <= hh_mm <= 1500:
# 未有数据到达
if self.last_tick_dt is None:
d.update({"sub_status": False, "sub_error": u"rabbitmq未有行情数据到达"})
else: # 有数据
# 超时5分钟以上
if (dt_now - self.last_tick_dt).total_seconds() > 60 * 5:
d.update({"sub_status": False,
"sub_error": u"{}rabbitmq行情数据超时5分钟以上".format(hh_mm)})
else:
d.update({"sub_status": True})
self.gateway.status.pop("sub_error", None)
# 非交易时间
else:
self.gateway.status.pop("sub_status", None)
self.gateway.status.pop("sub_error", None)
# 更新到gateway的状态中去
self.gateway.status.update(d)
def connect(self, setting={}): def connect(self, setting={}):
"""连接""" """连接"""
self.setting = setting self.setting = setting
@ -295,6 +347,7 @@ class SubMdApi():
self.symbol_tick_dict[symbol] = tick self.symbol_tick_dict[symbol] = tick
self.gateway.on_tick(tick) self.gateway.on_tick(tick)
self.last_tick_dt = tick.datetime
except Exception as ex: except Exception as ex:
self.gateway.write_error(u'RabbitMQ on_message 异常:{}'.format(str(ex))) self.gateway.write_error(u'RabbitMQ on_message 异常:{}'.format(str(ex)))

View File

@ -209,6 +209,12 @@ class BaseGateway(ABC):
self.write_log(msg, level=ERROR, on_log=True) self.write_log(msg, level=ERROR, on_log=True)
print(msg, file=sys.stderr) print(msg, file=sys.stderr)
def check_status(self) -> bool:
"""
check gateway connection or market data status.
"""
return False
@abstractmethod @abstractmethod
def connect(self, setting: dict) -> None: def connect(self, setting: dict) -> None:
""" """