diff --git a/requirements.txt b/requirements.txt index dc8a453a..633d9ad7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,4 +3,5 @@ websocket-client msgpack-python qdarkstyle SortedContainers -futuquant \ No newline at end of file +futuquant +snappy \ No newline at end of file diff --git a/vnpy/trader/app/ctaStrategy/strategy/strategyDoubleMa.py b/vnpy/trader/app/ctaStrategy/strategy/strategyDoubleMa.py index 776b1ef7..7b9b9ff3 100644 --- a/vnpy/trader/app/ctaStrategy/strategy/strategyDoubleMa.py +++ b/vnpy/trader/app/ctaStrategy/strategy/strategyDoubleMa.py @@ -13,7 +13,6 @@ from __future__ import division -#from vnpy.trader.vtObject import VtBarData from vnpy.trader.vtConstant import EMPTY_STRING, EMPTY_FLOAT from vnpy.trader.app.ctaStrategy.ctaTemplate import (CtaTemplate, BarManager, @@ -29,7 +28,7 @@ class DoubleMaStrategy(CtaTemplate): # 策略参数 fastWindow = 10 # 快速均线参数 slowWindow = 60 # 慢速均线参数 - initDays = 10 # 初始化数据所用的天数 + initDays = 10 # 初始化数据所用的天数 # 策略变量 fastMa0 = EMPTY_FLOAT # 当前最新的快速EMA diff --git a/vnpy/trader/app/jaqsService/JS_setting.json b/vnpy/trader/app/jaqsService/JS_setting.json new file mode 100644 index 00000000..f08c2ebb --- /dev/null +++ b/vnpy/trader/app/jaqsService/JS_setting.json @@ -0,0 +1,4 @@ +{ + "host": "127.0.0.1", + "port": 88888 +} \ No newline at end of file diff --git a/vnpy/trader/app/jaqsService/__init__.py b/vnpy/trader/app/jaqsService/__init__.py new file mode 100644 index 00000000..f0bf3d26 --- /dev/null +++ b/vnpy/trader/app/jaqsService/__init__.py @@ -0,0 +1,10 @@ +# encoding: UTF-8 + +from jsEngine import JsEngine +from uiJsWidget import JsEngineManager + +appName = 'JaqsService' +appDisplayName = u'Jaqs服务' +appEngine = JsEngine +appWidget = JsEngineManager +appIco = 'js.ico' \ No newline at end of file diff --git a/vnpy/trader/app/jaqsService/jrpc_server.py b/vnpy/trader/app/jaqsService/jrpc_server.py new file mode 100644 index 00000000..d49037c1 --- /dev/null +++ b/vnpy/trader/app/jaqsService/jrpc_server.py @@ -0,0 +1,212 @@ +import zmq +import Queue +import threading +import msgpack +import snappy +import traceback +import time + +def _unpack(str) : + + if str[0] == 'S': + tmp = snappy.uncompress(str[1:]) + obj = msgpack.loads(tmp) + elif str[0] == '\0': + obj = msgpack.loads(str[1:]) + else: + return None + + #print "UNPACK", obj + return obj + +def _pack(obj) : +# print "PACK", obj + tmp = msgpack.dumps(obj) + if len(tmp) > 1000: + return 'S' + snappy.compress(tmp) + else: + return '\0' + tmp + +class JRpcServer : + + def __init__(self) : + self._waiter_lock = threading.Lock() + self._waiter_map = {} + + self._should_close = False + self._send_lock = threading.Lock() + + self._callback_queue = Queue.Queue() + + self._ctx = zmq.Context() + self._pull_sock = self._ctx.socket(zmq.PULL) + self._pull_sock.bind("inproc://pull_sock") + self._push_sock = self._ctx.socket(zmq.PUSH) + self._push_sock.connect("inproc://pull_sock") + + self.on_call = None + + t = threading.Thread(target=self._recv_run) + t.setDaemon(True) + t.start() + + t = threading.Thread(target=self._callback_run) + t.setDaemon(True) + t.start() + + def __del__(self): + self.close() + +# def set_on_call(self, on_call): +# """def on_call(client_id, req_msg)""" +# self._on_call = on_call + + def _recv_run(self): + + + poller = zmq.Poller() + poller.register(self._pull_sock, zmq.POLLIN) + + remote_sock = None + + #client_addr_map = {} + + while not self._should_close: + + try: + socks = dict(poller.poll(500)) + if self._pull_sock in socks and socks[self._pull_sock] == zmq.POLLIN: + msgs = self._pull_sock.recv_multipart() + if len(msgs) == 2: + if remote_sock: + # [addr, data] + #print "send data", msgs[0], msgs[1] + remote_sock.send_multipart(msgs) + elif len(msgs) == 1: + cmd = msgs[0] + if cmd == "LISTEN": + if remote_sock: + poller.unregister(remote_sock) + remote_sock.close() + remote_sock = None + + remote_sock = self._do_listen() + + if remote_sock : + poller.register(remote_sock, zmq.POLLIN) + + elif cmd == "CLOSE": + self._should_close = True + break + + if remote_sock and remote_sock in socks and socks[remote_sock] == zmq.POLLIN: + msgs = remote_sock.recv_multipart() + if len(msgs) == 2: + identity = msgs[0] + data = msgs[1] + #client_id = identity.split('$') + #client_addr_map[client_id] = identity + self._on_data_arrived(identity, data) + + except zmq.error.Again, e: + #print "RECV timeout: ", e + pass + except Exception, e: + print("_recv_run:", e) + + def _callback_run(self): + while not self._should_close: + try: + r = self._callback_queue.get(timeout = 1) + if r : + r() + except Queue.Empty, e: + pass + + except Exception, e: + traceback.print_exc(e) + print "_callback_run", type(e), e + + def _async_call(self, func): + self._callback_queue.put( func ) + + def listen(self, addr) : + self._addr = addr + self._push_sock.send("LISTEN") + + + def _do_listen(self): + + socket = self._ctx.socket(zmq.ROUTER) + socket.setsockopt(zmq.RCVTIMEO, 1000) + socket.setsockopt(zmq.SNDTIMEO, 1000) + socket.setsockopt(zmq.LINGER, 0) + socket.bind(self._addr) + + return socket + + def close(self): + self._should_close = True + self.push_sock.send("CLOSE") + + def _send(self, data, addr): + self._send_lock.acquire() + + #self._push_sock.send(addr, flags=zmq.SNDMORE) + #self._push_sock.send(data) + self._push_sock.send_multipart([addr, data]) + + self._send_lock.release() + + def _on_data_arrived(self, identity, data): + try: + msg = _unpack(data) + #print "RECV", msg + + if not msg: + print "wrong message format" + return + + method = msg['method'] if msg.has_key('method') else None + + call_id = msg['id'] if msg.has_key('id') and msg['id'] else None + + if call_id and method: + if method == ".sys.heartbeat": + # Unlike scala implementation, here reply to client directly + rsp_msg = { 'jsonrpc' : '2.0', + 'method' : method, + 'result' : { "time" : time.time() }, + 'id' : call_id } + + self._send( _pack(rsp_msg), identity) + if self.on_call : + self._async_call( lambda : self.on_call(identity, msg)) + + except Exception, e: + print( "_on_data_arrived:", e) + pass + + + def send_rsp(self, client_id, req, result=None, error=None): + """send response message to client + + example: + send_rsp(client_id, req, result={'data': 123}) + send_rsp(client_id, req, error=(-1, "wrong argument")) + """ + + if req['method'] == '.sys.heartbeat': + return + + rsp_msg = { 'jsonrpc' : '2.0', + 'method' : req["method"], + 'id' : req['id'] } + + if result is not None: + rsp_msg['result'] = result + + if error is not None: + rsp_msg['error'] = {'error': error[0], 'message' : error[1]} + + self._send(_pack(rsp_msg), client_id) \ No newline at end of file diff --git a/vnpy/trader/app/jaqsService/js.ico b/vnpy/trader/app/jaqsService/js.ico new file mode 100644 index 00000000..67a8e8ea Binary files /dev/null and b/vnpy/trader/app/jaqsService/js.ico differ diff --git a/vnpy/trader/app/jaqsService/jsEngine.py b/vnpy/trader/app/jaqsService/jsEngine.py new file mode 100644 index 00000000..cc96b446 --- /dev/null +++ b/vnpy/trader/app/jaqsService/jsEngine.py @@ -0,0 +1,286 @@ +# encoding: UTF-8 + +import json +from collections import defaultdict + +import jrpc_server + +from vnpy.event import Event +from vnpy.trader.vtFunction import getJsonPath +from vnpy.trader.vtObject import VtLogData, VtOrderReq, VtCancelOrderReq +from vnpy.trader.vtConstant import * + + + +EVENT_JS_LOG = 'eJsLog' + +ACTION_MAP = {} +ACTION_MAP['Buy'] = (DIRECTION_LONG, OFFSET_OPEN) +ACTION_MAP['Sell'] = (DIRECTION_SHORT, OFFSET_CLOSE) +ACTION_MAP['Short'] = (DIRECTION_SHORT, OFFSET_OPEN) +ACTION_MAP['Cover'] = (DIRECTION_LONG, OFFSET_CLOSE) +ACTION_MAP['CoverYesterday'] = (DIRECTION_LONG, OFFSET_CLOSEYESTERDAY) +ACTION_MAP['SellYesterday'] = (DIRECTION_SHORT, OFFSET_CLOSEYESTERDAY) +ACTION_MAP_REVERSE = {v:k for k,v in ACTION_MAP.items()} + +STATUS_MAP_REVERSE = {} +STATUS_MAP_REVERSE[STATUS_NOTTRADED] = 'Accepted' +STATUS_MAP_REVERSE[STATUS_PARTTRADED] = 'Accepted' +STATUS_MAP_REVERSE[STATUS_ALLTRADED] = 'Filled' +STATUS_MAP_REVERSE[STATUS_CANCELLED] = 'Cancelled' +STATUS_MAP_REVERSE[STATUS_REJECTED] = 'Rejected' +STATUS_MAP_REVERSE[STATUS_UNKNOWN] = 'New' + + +EXCHANGE_MAP = {} +EXCHANGE_MAP['SH'] = EXCHANGE_SSE +EXCHANGE_MAP['SZ'] = EXCHANGE_SZSE +EXCHANGE_MAP['CFE'] = EXCHANGE_CFFEX +EXCHANGE_MAP['SHF'] = EXCHANGE_SHFE +EXCHANGE_MAP['DCE'] = EXCHANGE_DCE +EXCHANGE_MAP['CZC'] = EXCHANGE_CZCE +EXCHANGE_MAP_REVERSE = {v:k for k, v in EXCHANGE_MAP.items()} + + +######################################################################## +class JsEngine(object): + """JAQS服务引擎""" + settingFileName = 'JS_setting.json' + settingfilePath = getJsonPath(settingFileName, __file__) + + #---------------------------------------------------------------------- + def __init__(self, mainEngine, eventEngine): + """Constructor""" + self.mainEngine = mainEngine + self.eventEngine = eventEngine + + self.server = None # RPC服务器 + self.cbDict = {} # 回调函数字典 + + # 注册日志事件类型 + self.mainEngine.registerLogEvent(EVENT_JS_LOG) + + # 初始化 + self.initCallback() + self.initServer() + + #---------------------------------------------------------------------- + def initCallback(self): + """初始化回调函数映射""" + self.cbDict['sys.heartbeat'] = self.onHeartbeat + self.cbDict['auth.login'] = self.onLogin + self.cbDict['auth.use_strategy'] = self.onUseStrategy + self.cbDict['oms.query_position'] = self.onQueryPosition + self.cbDict['oms.query_order'] = self.onQueryOrder + self.cbDict['oms.place_order'] = self.onPlaceOrder + self.cbDict['oms.cancel_order'] = self.onCancelOrder + + #---------------------------------------------------------------------- + def initServer(self): + """初始化""" + with open(self.settingfilePath) as f: + setting = json.load(f) + host = setting['host'] + port = setting['port'] + addr = "tcp://%s:%s" %(host, port) + + # 初始化RPC服务器 + self.server = jrpc_server.JRpcServer() + self.server.on_call = self.onCall + self.server.listen(addr) + + self.writeLog(u'Jaqs服务器启动成功') + + #---------------------------------------------------------------------- + def onCall(self, clientId, req): + """RPC服务回调函数""" + method = req['method'] + cb = self.cbDict.get(method, None) + if not cb: + self.writeLog(u'无法找到方法%s对应的回调函数' %method) + return + + self.writeLog(u'收到请求:%s' %req) + + cb(clientId, req) + + #---------------------------------------------------------------------- + def onHeartbeat(self, clientId, req): + """心跳""" + pass + + #---------------------------------------------------------------------- + def onLogin(self, clientId, req): + """登录""" + params = req['params'] + + result = { + 'username': params['username'], + 'name': params['username'], + 'strategies': [1], + 'broker_strategies': [1] + } + + error = [0, ''] + + self.server.send_rsp(clientId, req, result, error) + + self.writeLog(u'发出响应:%s' %result) + + #---------------------------------------------------------------------- + def onUseStrategy(self, clientId, req): + """使用策略""" + result = req['params']['account_id'] + error = [0, ''] + self.server.send_rsp(client_id, req, result, error) + + self.writeLog(u'发出响应:%s' %result) + + #---------------------------------------------------------------------- + def onQueryPosition(self, clientId, req): + """查询持仓""" + l = self.mainEngine.getAllPositionDetails() + + result = defaultdict(list) + for detail in l: + security = self.converSymbol(detail.vtSymbol) + + # 多头 + if detail.longPos: + result['security'].append(security) + result['side'].append('Long') + + result['cost_price'].append(0) + result['float_pnl'].append(0) + result['close_pnl'].append(0) + result['trading_pnl'].append(0) + result['holding_pnl'].append(0) + result['commission'].append(0) + result['init_size'].append(0) + result['current_size'].append(detail.longPos) + result['enable_size'].append(detail.longPos-detail.longPosFrozen) + result['frozen_size'].append(detail.longPosFrozen) + result['uncome_size'].append(0) + result['pre_size'].append(detail.longYd) + result['today_size'].append(detail.longTd) + + # 空头 + if detail.shortPos: + result['security'].append(security) + result['side'].append('Short') + + result['cost_price'].append(0) + result['float_pnl'].append(0) + result['close_pnl'].append(0) + result['trading_pnl'].append(0) + result['holding_pnl'].append(0) + result['commission'].append(0) + result['init_size'].append(0) + result['current_size'].append(detail.shortPos) + result['enable_size'].append(detail.shortPos-detail.shortPosFrozen) + result['frozen_size'].append(detail.shortPosFrozen) + result['uncome_size'].append(0) + result['pre_size'].append(detail.shortYd) + result['today_size'].append(detail.shortTd) + + error = [0, ''] + + self.server.send_rsp(client_id, req, result, error) + + self.writeLog(u'发出响应:%s' %result) + + #---------------------------------------------------------------------- + def onQueryOrder(self, clientId, req): + """查询委托""" + l = self.mainEngine.getAllWorkingOrders() + + result = defaultdict(list) + for order in l: + result['task_id'].append(order.vtOrderID) + result['entrust_no'].append(order.vtOrderID) + + result['entrust_price'].append(order.price) + result['entrust_size'].append(order.totalVolume) + result['sub_seq'].append(0) + result['sub_total'].append(0) + result['batch_no'].append(0) + + result['fill_price'].append(order.price) + result['fill_size'].append(order.tradedVolume) + result['algo'].append('') + result['entrust_action'].append(ACTION_MAP_REVERSE[(order.direction, order.offset)]) + result['order_status'].append(STATUS_MAP_REVERSE[order.status]) + result['security'].append(self.converSymbol(order.vtSymbol)) + + hh, mm, ss = order.orderTime.split(':') + result['entrust_time'].append(int(hh)*10000000+ + int(mm)*100000+ + int(ss)*1000) + + error = [0, ''] + + self.server.send_rsp(clientId, req, result, error) + + self.writeLog(u'发出响应:%s' %result) + + #---------------------------------------------------------------------- + def onPlaceOrder(self, clientId, req): + """委托""" + params = req['params'] + s, e = params['security'].split('.') + + vor = VtOrderReq() + vor.symbol = s + vor.exchange = EXCHANGE_MAP[e] + vor.direction, vor.offset = ACTION_MAP[params['action']] + vor.price = params['price'] + vor.volume = params['size'] + + contract = self.mainEngine.getContract(contract) + + vtOrderID = self.mainEngine.sendOrder(vor, contract.gatewayName) + + error = [0, ''] + + self.server.send_rsp(clientId, req, vtOrderID, error) + + self.writeLog(u'发出响应:%s' %result) + + #---------------------------------------------------------------------- + def onCancelOrder(self, clientId, req): + """撤单""" + params = req['params'] + vtOrderID = params['task_id'] + gatewayName, orderID = vtOrderID.split('.') + + vcor = VtCancelOrderReq() + vcor.orderID = vtOrderID + self.mainEngine.cancelOrder(vcor, gatewayName) + + error = [0, ''] + self.server.send_rsp(clientId, req, 'successful', error) + + self.writeLog(u'发出响应:%s' %result) + + #---------------------------------------------------------------------- + def writeLog(self, content): + """发出日志事件""" + log = VtLogData() + log.logContent = content + log.gatewayName = 'JAQS_SERVICE' + event = Event(type_=EVENT_JS_LOG) + event.dict_['data'] = log + self.eventEngine.put(event) + + #---------------------------------------------------------------------- + def converSymbol(self, vtSymbol): + """转换合约代码""" + contract = self.mainEngine.getContract(vtSymbol) + if not contract: + return '' + + e = EXCHANGE_MAP_REVERSE[contract.exchange] + return '.'.join(contract.symbol, e) + + + \ No newline at end of file diff --git a/vnpy/trader/app/jaqsService/service.py b/vnpy/trader/app/jaqsService/service.py new file mode 100644 index 00000000..39bf6485 --- /dev/null +++ b/vnpy/trader/app/jaqsService/service.py @@ -0,0 +1,85 @@ +# -*- coding: utf-8 -*- + +import jrpc_server +import time +import pandas as pd +from qdata.database import DatabaseConn +from qdata.datamodel import DataModel +import qdata.apisetting as st +import json + +server = None +view_set = {} +db = None + +def on_call(client_id, req): + + if req['method'] != '.sys.heartbeat': + print "on_call", req + + if req['method'] == 'auth.login': + server.send_rsp(client_id, req, result = { "username" : "fixme", "name": "fixme" }) + return + + if req['method'] != 'jset.query': + server.send_rsp(client_id, req, error=[-1, "unknown method"]) + return + + if not req.has_key('params'): + server.send_rsp(client_id, req, error=[-1, "missing params"]) + return + + view = req['params']['view'] + if not view or view == "sys.views": + server.send_rsp(client_id, req, result = { "name" : view_set}) + + if view not in view_set: + server.send_rsp(client_id, req, error=[-1, "wrong view name"]) + return + + result, error = datafectch(req['params']) + server.send_rsp(client_id, req, result=result, error=error) + +def init(): + global view_set, db, model + conn = DatabaseConn() + db = conn.get_conn() + model = DataModel(db) + view_set = model.apilist_set() + print(view_set) + +def run(): + global server + + init() + server = jrpc_server.JRpcServer() + server.on_call = on_call + addr = "tcp://%s:%s"%(st.HOST, st.PORT) + print "listen at " + addr + server.listen(addr) + + while True: + time.sleep(1) + +def datafectch(params): + view = params['view'] + api = model.apilist_one(view) + df = model.get_params(api) + sql = model.get_datasql(df, args = params, apilist=api) + print(sql) + data = model.get_data(bind=api.source_id, sql=sql) + if data['msg'] == st.DATA_MSG[0]: + df = pd.read_json(json.dumps(data['data']), orient='split') + return (_data(df), None) + else: + return (data['msg'], None) + + +def _data(df): + data = {} + for col in df.columns: + data[col] = df[col].tolist() + return data + +if __name__ == "__main__": + run() diff --git a/vnpy/trader/app/jaqsService/uiJsWidget.py b/vnpy/trader/app/jaqsService/uiJsWidget.py new file mode 100644 index 00000000..5fc50116 --- /dev/null +++ b/vnpy/trader/app/jaqsService/uiJsWidget.py @@ -0,0 +1,54 @@ +# encoding: UTF-8 + +''' +行情记录模块相关的GUI控制组件 +''' + +from vnpy.event import Event +from vnpy.trader.uiQt import QtWidgets, QtCore +from .jsEngine import EVENT_JS_LOG + + + +######################################################################## +class JsEngineManager(QtWidgets.QWidget): + """Jaqs服务管理组件""" + signal = QtCore.Signal(type(Event())) + + #---------------------------------------------------------------------- + def __init__(self, jsEngine, eventEngine, parent=None): + """Constructor""" + super(JsEngineManager, self).__init__(parent) + + self.jsEngine = drEngine + self.eventEngine = eventEngine + + self.initUi() + self.registerEvent() + + #---------------------------------------------------------------------- + def initUi(self): + """初始化界面""" + self.setWindowTitle(u'Jaqs服务') + # 日志监控 + self.logMonitor = QtWidgets.QTextEdit() + self.logMonitor.setReadOnly(True) + self.logMonitor.setMinimumHeight(600) + + # 设置布局 + vbox = QtWidgets.QVBoxLayout() + vbox.addWidget(self.logMonitor) + self.setLayout(vbox) + + #---------------------------------------------------------------------- + def updateLog(self, event): + """更新日志""" + log = event.dict_['data'] + content = '\t'.join([log.logTime, log.logContent]) + self.logMonitor.append(content) + + #---------------------------------------------------------------------- + def registerEvent(self): + """注册事件监听""" + self.signal.connect(self.updateLog) + self.eventEngine.register(EVENT_JS_LOG, self.signal.emit) \ No newline at end of file diff --git a/vnpy/trader/vtEngine.py b/vnpy/trader/vtEngine.py index a7fa6260..abe47a73 100644 --- a/vnpy/trader/vtEngine.py +++ b/vnpy/trader/vtEngine.py @@ -282,6 +282,11 @@ class MainEngine(object): """查询所有的活跃的委托(返回列表)""" return self.dataEngine.getAllWorkingOrders() + #---------------------------------------------------------------------- + def getAllPositionDetails(self): + """查询本地持仓缓存细节""" + return self.dataEngine.getAllPositionDetails() + #---------------------------------------------------------------------- def getAllGatewayDetails(self): """查询引擎中所有底层接口的信息""" @@ -420,7 +425,7 @@ class DataEngine(object): # 更新到持仓细节中 detail = self.getPositionDetail(pos.vtSymbol) detail.updatePosition(pos) - + #---------------------------------------------------------------------- def getContract(self, vtSymbol): """查询合约对象""" @@ -490,6 +495,11 @@ class DataEngine(object): return detail + #---------------------------------------------------------------------- + def getAllPositionDetails(self): + """查询所有本地持仓缓存细节""" + return self.detailDict.values() + #---------------------------------------------------------------------- def updateOrderReq(self, req, vtOrderID): """委托请求更新"""