[Add]增加Jaqs服务功能模块

This commit is contained in:
vn.py 2017-10-27 15:41:39 +08:00
parent cc3fdfc569
commit c66f0d423a
10 changed files with 665 additions and 4 deletions

View File

@ -3,4 +3,5 @@ websocket-client
msgpack-python
qdarkstyle
SortedContainers
futuquant
futuquant
snappy

View File

@ -13,7 +13,6 @@
from __future__ import division
#from vnpy.trader.vtObject import VtBarData
from vnpy.trader.vtConstant import EMPTY_STRING, EMPTY_FLOAT
from vnpy.trader.app.ctaStrategy.ctaTemplate import (CtaTemplate,
BarManager,
@ -29,7 +28,7 @@ class DoubleMaStrategy(CtaTemplate):
# 策略参数
fastWindow = 10 # 快速均线参数
slowWindow = 60 # 慢速均线参数
initDays = 10 # 初始化数据所用的天数
initDays = 10 # 初始化数据所用的天数
# 策略变量
fastMa0 = EMPTY_FLOAT # 当前最新的快速EMA

View File

@ -0,0 +1,4 @@
{
"host": "127.0.0.1",
"port": 88888
}

View File

@ -0,0 +1,10 @@
# encoding: UTF-8
from jsEngine import JsEngine
from uiJsWidget import JsEngineManager
appName = 'JaqsService'
appDisplayName = u'Jaqs服务'
appEngine = JsEngine
appWidget = JsEngineManager
appIco = 'js.ico'

View File

@ -0,0 +1,212 @@
import zmq
import Queue
import threading
import msgpack
import snappy
import traceback
import time
def _unpack(str) :
if str[0] == 'S':
tmp = snappy.uncompress(str[1:])
obj = msgpack.loads(tmp)
elif str[0] == '\0':
obj = msgpack.loads(str[1:])
else:
return None
#print "UNPACK", obj
return obj
def _pack(obj) :
# print "PACK", obj
tmp = msgpack.dumps(obj)
if len(tmp) > 1000:
return 'S' + snappy.compress(tmp)
else:
return '\0' + tmp
class JRpcServer :
def __init__(self) :
self._waiter_lock = threading.Lock()
self._waiter_map = {}
self._should_close = False
self._send_lock = threading.Lock()
self._callback_queue = Queue.Queue()
self._ctx = zmq.Context()
self._pull_sock = self._ctx.socket(zmq.PULL)
self._pull_sock.bind("inproc://pull_sock")
self._push_sock = self._ctx.socket(zmq.PUSH)
self._push_sock.connect("inproc://pull_sock")
self.on_call = None
t = threading.Thread(target=self._recv_run)
t.setDaemon(True)
t.start()
t = threading.Thread(target=self._callback_run)
t.setDaemon(True)
t.start()
def __del__(self):
self.close()
# def set_on_call(self, on_call):
# """def on_call(client_id, req_msg)"""
# self._on_call = on_call
def _recv_run(self):
poller = zmq.Poller()
poller.register(self._pull_sock, zmq.POLLIN)
remote_sock = None
#client_addr_map = {}
while not self._should_close:
try:
socks = dict(poller.poll(500))
if self._pull_sock in socks and socks[self._pull_sock] == zmq.POLLIN:
msgs = self._pull_sock.recv_multipart()
if len(msgs) == 2:
if remote_sock:
# [addr, data]
#print "send data", msgs[0], msgs[1]
remote_sock.send_multipart(msgs)
elif len(msgs) == 1:
cmd = msgs[0]
if cmd == "LISTEN":
if remote_sock:
poller.unregister(remote_sock)
remote_sock.close()
remote_sock = None
remote_sock = self._do_listen()
if remote_sock :
poller.register(remote_sock, zmq.POLLIN)
elif cmd == "CLOSE":
self._should_close = True
break
if remote_sock and remote_sock in socks and socks[remote_sock] == zmq.POLLIN:
msgs = remote_sock.recv_multipart()
if len(msgs) == 2:
identity = msgs[0]
data = msgs[1]
#client_id = identity.split('$')
#client_addr_map[client_id] = identity
self._on_data_arrived(identity, data)
except zmq.error.Again, e:
#print "RECV timeout: ", e
pass
except Exception, e:
print("_recv_run:", e)
def _callback_run(self):
while not self._should_close:
try:
r = self._callback_queue.get(timeout = 1)
if r :
r()
except Queue.Empty, e:
pass
except Exception, e:
traceback.print_exc(e)
print "_callback_run", type(e), e
def _async_call(self, func):
self._callback_queue.put( func )
def listen(self, addr) :
self._addr = addr
self._push_sock.send("LISTEN")
def _do_listen(self):
socket = self._ctx.socket(zmq.ROUTER)
socket.setsockopt(zmq.RCVTIMEO, 1000)
socket.setsockopt(zmq.SNDTIMEO, 1000)
socket.setsockopt(zmq.LINGER, 0)
socket.bind(self._addr)
return socket
def close(self):
self._should_close = True
self.push_sock.send("CLOSE")
def _send(self, data, addr):
self._send_lock.acquire()
#self._push_sock.send(addr, flags=zmq.SNDMORE)
#self._push_sock.send(data)
self._push_sock.send_multipart([addr, data])
self._send_lock.release()
def _on_data_arrived(self, identity, data):
try:
msg = _unpack(data)
#print "RECV", msg
if not msg:
print "wrong message format"
return
method = msg['method'] if msg.has_key('method') else None
call_id = msg['id'] if msg.has_key('id') and msg['id'] else None
if call_id and method:
if method == ".sys.heartbeat":
# Unlike scala implementation, here reply to client directly
rsp_msg = { 'jsonrpc' : '2.0',
'method' : method,
'result' : { "time" : time.time() },
'id' : call_id }
self._send( _pack(rsp_msg), identity)
if self.on_call :
self._async_call( lambda : self.on_call(identity, msg))
except Exception, e:
print( "_on_data_arrived:", e)
pass
def send_rsp(self, client_id, req, result=None, error=None):
"""send response message to client
example:
send_rsp(client_id, req, result={'data': 123})
send_rsp(client_id, req, error=(-1, "wrong argument"))
"""
if req['method'] == '.sys.heartbeat':
return
rsp_msg = { 'jsonrpc' : '2.0',
'method' : req["method"],
'id' : req['id'] }
if result is not None:
rsp_msg['result'] = result
if error is not None:
rsp_msg['error'] = {'error': error[0], 'message' : error[1]}
self._send(_pack(rsp_msg), client_id)

Binary file not shown.

After

Width:  |  Height:  |  Size: 21 KiB

View File

@ -0,0 +1,286 @@
# encoding: UTF-8
import json
from collections import defaultdict
import jrpc_server
from vnpy.event import Event
from vnpy.trader.vtFunction import getJsonPath
from vnpy.trader.vtObject import VtLogData, VtOrderReq, VtCancelOrderReq
from vnpy.trader.vtConstant import *
EVENT_JS_LOG = 'eJsLog'
ACTION_MAP = {}
ACTION_MAP['Buy'] = (DIRECTION_LONG, OFFSET_OPEN)
ACTION_MAP['Sell'] = (DIRECTION_SHORT, OFFSET_CLOSE)
ACTION_MAP['Short'] = (DIRECTION_SHORT, OFFSET_OPEN)
ACTION_MAP['Cover'] = (DIRECTION_LONG, OFFSET_CLOSE)
ACTION_MAP['CoverYesterday'] = (DIRECTION_LONG, OFFSET_CLOSEYESTERDAY)
ACTION_MAP['SellYesterday'] = (DIRECTION_SHORT, OFFSET_CLOSEYESTERDAY)
ACTION_MAP_REVERSE = {v:k for k,v in ACTION_MAP.items()}
STATUS_MAP_REVERSE = {}
STATUS_MAP_REVERSE[STATUS_NOTTRADED] = 'Accepted'
STATUS_MAP_REVERSE[STATUS_PARTTRADED] = 'Accepted'
STATUS_MAP_REVERSE[STATUS_ALLTRADED] = 'Filled'
STATUS_MAP_REVERSE[STATUS_CANCELLED] = 'Cancelled'
STATUS_MAP_REVERSE[STATUS_REJECTED] = 'Rejected'
STATUS_MAP_REVERSE[STATUS_UNKNOWN] = 'New'
EXCHANGE_MAP = {}
EXCHANGE_MAP['SH'] = EXCHANGE_SSE
EXCHANGE_MAP['SZ'] = EXCHANGE_SZSE
EXCHANGE_MAP['CFE'] = EXCHANGE_CFFEX
EXCHANGE_MAP['SHF'] = EXCHANGE_SHFE
EXCHANGE_MAP['DCE'] = EXCHANGE_DCE
EXCHANGE_MAP['CZC'] = EXCHANGE_CZCE
EXCHANGE_MAP_REVERSE = {v:k for k, v in EXCHANGE_MAP.items()}
########################################################################
class JsEngine(object):
"""JAQS服务引擎"""
settingFileName = 'JS_setting.json'
settingfilePath = getJsonPath(settingFileName, __file__)
#----------------------------------------------------------------------
def __init__(self, mainEngine, eventEngine):
"""Constructor"""
self.mainEngine = mainEngine
self.eventEngine = eventEngine
self.server = None # RPC服务器
self.cbDict = {} # 回调函数字典
# 注册日志事件类型
self.mainEngine.registerLogEvent(EVENT_JS_LOG)
# 初始化
self.initCallback()
self.initServer()
#----------------------------------------------------------------------
def initCallback(self):
"""初始化回调函数映射"""
self.cbDict['sys.heartbeat'] = self.onHeartbeat
self.cbDict['auth.login'] = self.onLogin
self.cbDict['auth.use_strategy'] = self.onUseStrategy
self.cbDict['oms.query_position'] = self.onQueryPosition
self.cbDict['oms.query_order'] = self.onQueryOrder
self.cbDict['oms.place_order'] = self.onPlaceOrder
self.cbDict['oms.cancel_order'] = self.onCancelOrder
#----------------------------------------------------------------------
def initServer(self):
"""初始化"""
with open(self.settingfilePath) as f:
setting = json.load(f)
host = setting['host']
port = setting['port']
addr = "tcp://%s:%s" %(host, port)
# 初始化RPC服务器
self.server = jrpc_server.JRpcServer()
self.server.on_call = self.onCall
self.server.listen(addr)
self.writeLog(u'Jaqs服务器启动成功')
#----------------------------------------------------------------------
def onCall(self, clientId, req):
"""RPC服务回调函数"""
method = req['method']
cb = self.cbDict.get(method, None)
if not cb:
self.writeLog(u'无法找到方法%s对应的回调函数' %method)
return
self.writeLog(u'收到请求:%s' %req)
cb(clientId, req)
#----------------------------------------------------------------------
def onHeartbeat(self, clientId, req):
"""心跳"""
pass
#----------------------------------------------------------------------
def onLogin(self, clientId, req):
"""登录"""
params = req['params']
result = {
'username': params['username'],
'name': params['username'],
'strategies': [1],
'broker_strategies': [1]
}
error = [0, '']
self.server.send_rsp(clientId, req, result, error)
self.writeLog(u'发出响应:%s' %result)
#----------------------------------------------------------------------
def onUseStrategy(self, clientId, req):
"""使用策略"""
result = req['params']['account_id']
error = [0, '']
self.server.send_rsp(client_id, req, result, error)
self.writeLog(u'发出响应:%s' %result)
#----------------------------------------------------------------------
def onQueryPosition(self, clientId, req):
"""查询持仓"""
l = self.mainEngine.getAllPositionDetails()
result = defaultdict(list)
for detail in l:
security = self.converSymbol(detail.vtSymbol)
# 多头
if detail.longPos:
result['security'].append(security)
result['side'].append('Long')
result['cost_price'].append(0)
result['float_pnl'].append(0)
result['close_pnl'].append(0)
result['trading_pnl'].append(0)
result['holding_pnl'].append(0)
result['commission'].append(0)
result['init_size'].append(0)
result['current_size'].append(detail.longPos)
result['enable_size'].append(detail.longPos-detail.longPosFrozen)
result['frozen_size'].append(detail.longPosFrozen)
result['uncome_size'].append(0)
result['pre_size'].append(detail.longYd)
result['today_size'].append(detail.longTd)
# 空头
if detail.shortPos:
result['security'].append(security)
result['side'].append('Short')
result['cost_price'].append(0)
result['float_pnl'].append(0)
result['close_pnl'].append(0)
result['trading_pnl'].append(0)
result['holding_pnl'].append(0)
result['commission'].append(0)
result['init_size'].append(0)
result['current_size'].append(detail.shortPos)
result['enable_size'].append(detail.shortPos-detail.shortPosFrozen)
result['frozen_size'].append(detail.shortPosFrozen)
result['uncome_size'].append(0)
result['pre_size'].append(detail.shortYd)
result['today_size'].append(detail.shortTd)
error = [0, '']
self.server.send_rsp(client_id, req, result, error)
self.writeLog(u'发出响应:%s' %result)
#----------------------------------------------------------------------
def onQueryOrder(self, clientId, req):
"""查询委托"""
l = self.mainEngine.getAllWorkingOrders()
result = defaultdict(list)
for order in l:
result['task_id'].append(order.vtOrderID)
result['entrust_no'].append(order.vtOrderID)
result['entrust_price'].append(order.price)
result['entrust_size'].append(order.totalVolume)
result['sub_seq'].append(0)
result['sub_total'].append(0)
result['batch_no'].append(0)
result['fill_price'].append(order.price)
result['fill_size'].append(order.tradedVolume)
result['algo'].append('')
result['entrust_action'].append(ACTION_MAP_REVERSE[(order.direction, order.offset)])
result['order_status'].append(STATUS_MAP_REVERSE[order.status])
result['security'].append(self.converSymbol(order.vtSymbol))
hh, mm, ss = order.orderTime.split(':')
result['entrust_time'].append(int(hh)*10000000+
int(mm)*100000+
int(ss)*1000)
error = [0, '']
self.server.send_rsp(clientId, req, result, error)
self.writeLog(u'发出响应:%s' %result)
#----------------------------------------------------------------------
def onPlaceOrder(self, clientId, req):
"""委托"""
params = req['params']
s, e = params['security'].split('.')
vor = VtOrderReq()
vor.symbol = s
vor.exchange = EXCHANGE_MAP[e]
vor.direction, vor.offset = ACTION_MAP[params['action']]
vor.price = params['price']
vor.volume = params['size']
contract = self.mainEngine.getContract(contract)
vtOrderID = self.mainEngine.sendOrder(vor, contract.gatewayName)
error = [0, '']
self.server.send_rsp(clientId, req, vtOrderID, error)
self.writeLog(u'发出响应:%s' %result)
#----------------------------------------------------------------------
def onCancelOrder(self, clientId, req):
"""撤单"""
params = req['params']
vtOrderID = params['task_id']
gatewayName, orderID = vtOrderID.split('.')
vcor = VtCancelOrderReq()
vcor.orderID = vtOrderID
self.mainEngine.cancelOrder(vcor, gatewayName)
error = [0, '']
self.server.send_rsp(clientId, req, 'successful', error)
self.writeLog(u'发出响应:%s' %result)
#----------------------------------------------------------------------
def writeLog(self, content):
"""发出日志事件"""
log = VtLogData()
log.logContent = content
log.gatewayName = 'JAQS_SERVICE'
event = Event(type_=EVENT_JS_LOG)
event.dict_['data'] = log
self.eventEngine.put(event)
#----------------------------------------------------------------------
def converSymbol(self, vtSymbol):
"""转换合约代码"""
contract = self.mainEngine.getContract(vtSymbol)
if not contract:
return ''
e = EXCHANGE_MAP_REVERSE[contract.exchange]
return '.'.join(contract.symbol, e)

View File

@ -0,0 +1,85 @@
# -*- coding: utf-8 -*-
import jrpc_server
import time
import pandas as pd
from qdata.database import DatabaseConn
from qdata.datamodel import DataModel
import qdata.apisetting as st
import json
server = None
view_set = {}
db = None
def on_call(client_id, req):
if req['method'] != '.sys.heartbeat':
print "on_call", req
if req['method'] == 'auth.login':
server.send_rsp(client_id, req, result = { "username" : "fixme", "name": "fixme" })
return
if req['method'] != 'jset.query':
server.send_rsp(client_id, req, error=[-1, "unknown method"])
return
if not req.has_key('params'):
server.send_rsp(client_id, req, error=[-1, "missing params"])
return
view = req['params']['view']
if not view or view == "sys.views":
server.send_rsp(client_id, req, result = { "name" : view_set})
if view not in view_set:
server.send_rsp(client_id, req, error=[-1, "wrong view name"])
return
result, error = datafectch(req['params'])
server.send_rsp(client_id, req, result=result, error=error)
def init():
global view_set, db, model
conn = DatabaseConn()
db = conn.get_conn()
model = DataModel(db)
view_set = model.apilist_set()
print(view_set)
def run():
global server
init()
server = jrpc_server.JRpcServer()
server.on_call = on_call
addr = "tcp://%s:%s"%(st.HOST, st.PORT)
print "listen at " + addr
server.listen(addr)
while True:
time.sleep(1)
def datafectch(params):
view = params['view']
api = model.apilist_one(view)
df = model.get_params(api)
sql = model.get_datasql(df, args = params, apilist=api)
print(sql)
data = model.get_data(bind=api.source_id, sql=sql)
if data['msg'] == st.DATA_MSG[0]:
df = pd.read_json(json.dumps(data['data']), orient='split')
return (_data(df), None)
else:
return (data['msg'], None)
def _data(df):
data = {}
for col in df.columns:
data[col] = df[col].tolist()
return data
if __name__ == "__main__":
run()

View File

@ -0,0 +1,54 @@
# encoding: UTF-8
'''
行情记录模块相关的GUI控制组件
'''
from vnpy.event import Event
from vnpy.trader.uiQt import QtWidgets, QtCore
from .jsEngine import EVENT_JS_LOG
########################################################################
class JsEngineManager(QtWidgets.QWidget):
"""Jaqs服务管理组件"""
signal = QtCore.Signal(type(Event()))
#----------------------------------------------------------------------
def __init__(self, jsEngine, eventEngine, parent=None):
"""Constructor"""
super(JsEngineManager, self).__init__(parent)
self.jsEngine = drEngine
self.eventEngine = eventEngine
self.initUi()
self.registerEvent()
#----------------------------------------------------------------------
def initUi(self):
"""初始化界面"""
self.setWindowTitle(u'Jaqs服务')
# 日志监控
self.logMonitor = QtWidgets.QTextEdit()
self.logMonitor.setReadOnly(True)
self.logMonitor.setMinimumHeight(600)
# 设置布局
vbox = QtWidgets.QVBoxLayout()
vbox.addWidget(self.logMonitor)
self.setLayout(vbox)
#----------------------------------------------------------------------
def updateLog(self, event):
"""更新日志"""
log = event.dict_['data']
content = '\t'.join([log.logTime, log.logContent])
self.logMonitor.append(content)
#----------------------------------------------------------------------
def registerEvent(self):
"""注册事件监听"""
self.signal.connect(self.updateLog)
self.eventEngine.register(EVENT_JS_LOG, self.signal.emit)

View File

@ -282,6 +282,11 @@ class MainEngine(object):
"""查询所有的活跃的委托(返回列表)"""
return self.dataEngine.getAllWorkingOrders()
#----------------------------------------------------------------------
def getAllPositionDetails(self):
"""查询本地持仓缓存细节"""
return self.dataEngine.getAllPositionDetails()
#----------------------------------------------------------------------
def getAllGatewayDetails(self):
"""查询引擎中所有底层接口的信息"""
@ -420,7 +425,7 @@ class DataEngine(object):
# 更新到持仓细节中
detail = self.getPositionDetail(pos.vtSymbol)
detail.updatePosition(pos)
#----------------------------------------------------------------------
def getContract(self, vtSymbol):
"""查询合约对象"""
@ -490,6 +495,11 @@ class DataEngine(object):
return detail
#----------------------------------------------------------------------
def getAllPositionDetails(self):
"""查询所有本地持仓缓存细节"""
return self.detailDict.values()
#----------------------------------------------------------------------
def updateOrderReq(self, req, vtOrderID):
"""委托请求更新"""