From c66f0d423aca7ccbb727d0632baf32bfd9da6bfc Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Fri, 27 Oct 2017 15:41:39 +0800 Subject: [PATCH] =?UTF-8?q?[Add]=E5=A2=9E=E5=8A=A0Jaqs=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E5=8A=9F=E8=83=BD=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- requirements.txt | 3 +- .../ctaStrategy/strategy/strategyDoubleMa.py | 3 +- vnpy/trader/app/jaqsService/JS_setting.json | 4 + vnpy/trader/app/jaqsService/__init__.py | 10 + vnpy/trader/app/jaqsService/jrpc_server.py | 212 +++++++++++++ vnpy/trader/app/jaqsService/js.ico | Bin 0 -> 21662 bytes vnpy/trader/app/jaqsService/jsEngine.py | 286 ++++++++++++++++++ vnpy/trader/app/jaqsService/service.py | 85 ++++++ vnpy/trader/app/jaqsService/uiJsWidget.py | 54 ++++ vnpy/trader/vtEngine.py | 12 +- 10 files changed, 665 insertions(+), 4 deletions(-) create mode 100644 vnpy/trader/app/jaqsService/JS_setting.json create mode 100644 vnpy/trader/app/jaqsService/__init__.py create mode 100644 vnpy/trader/app/jaqsService/jrpc_server.py create mode 100644 vnpy/trader/app/jaqsService/js.ico create mode 100644 vnpy/trader/app/jaqsService/jsEngine.py create mode 100644 vnpy/trader/app/jaqsService/service.py create mode 100644 vnpy/trader/app/jaqsService/uiJsWidget.py diff --git a/requirements.txt b/requirements.txt index dc8a453a..633d9ad7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,4 +3,5 @@ websocket-client msgpack-python qdarkstyle SortedContainers -futuquant \ No newline at end of file +futuquant +snappy \ No newline at end of file diff --git a/vnpy/trader/app/ctaStrategy/strategy/strategyDoubleMa.py b/vnpy/trader/app/ctaStrategy/strategy/strategyDoubleMa.py index 776b1ef7..7b9b9ff3 100644 --- a/vnpy/trader/app/ctaStrategy/strategy/strategyDoubleMa.py +++ b/vnpy/trader/app/ctaStrategy/strategy/strategyDoubleMa.py @@ -13,7 +13,6 @@ from __future__ import division -#from vnpy.trader.vtObject import VtBarData from vnpy.trader.vtConstant import EMPTY_STRING, EMPTY_FLOAT from vnpy.trader.app.ctaStrategy.ctaTemplate import (CtaTemplate, BarManager, @@ -29,7 +28,7 @@ class DoubleMaStrategy(CtaTemplate): # 策略参数 fastWindow = 10 # 快速均线参数 slowWindow = 60 # 慢速均线参数 - initDays = 10 # 初始化数据所用的天数 + initDays = 10 # 初始化数据所用的天数 # 策略变量 fastMa0 = EMPTY_FLOAT # 当前最新的快速EMA diff --git a/vnpy/trader/app/jaqsService/JS_setting.json b/vnpy/trader/app/jaqsService/JS_setting.json new file mode 100644 index 00000000..f08c2ebb --- /dev/null +++ b/vnpy/trader/app/jaqsService/JS_setting.json @@ -0,0 +1,4 @@ +{ + "host": "127.0.0.1", + "port": 88888 +} \ No newline at end of file diff --git a/vnpy/trader/app/jaqsService/__init__.py b/vnpy/trader/app/jaqsService/__init__.py new file mode 100644 index 00000000..f0bf3d26 --- /dev/null +++ b/vnpy/trader/app/jaqsService/__init__.py @@ -0,0 +1,10 @@ +# encoding: UTF-8 + +from jsEngine import JsEngine +from uiJsWidget import JsEngineManager + +appName = 'JaqsService' +appDisplayName = u'Jaqs服务' +appEngine = JsEngine +appWidget = JsEngineManager +appIco = 'js.ico' \ No newline at end of file diff --git a/vnpy/trader/app/jaqsService/jrpc_server.py b/vnpy/trader/app/jaqsService/jrpc_server.py new file mode 100644 index 00000000..d49037c1 --- /dev/null +++ b/vnpy/trader/app/jaqsService/jrpc_server.py @@ -0,0 +1,212 @@ +import zmq +import Queue +import threading +import msgpack +import snappy +import traceback +import time + +def _unpack(str) : + + if str[0] == 'S': + tmp = snappy.uncompress(str[1:]) + obj = msgpack.loads(tmp) + elif str[0] == '\0': + obj = msgpack.loads(str[1:]) + else: + return None + + #print "UNPACK", obj + return obj + +def _pack(obj) : +# print "PACK", obj + tmp = msgpack.dumps(obj) + if len(tmp) > 1000: + return 'S' + snappy.compress(tmp) + else: + return '\0' + tmp + +class JRpcServer : + + def __init__(self) : + self._waiter_lock = threading.Lock() + self._waiter_map = {} + + self._should_close = False + self._send_lock = threading.Lock() + + self._callback_queue = Queue.Queue() + + self._ctx = zmq.Context() + self._pull_sock = self._ctx.socket(zmq.PULL) + self._pull_sock.bind("inproc://pull_sock") + self._push_sock = self._ctx.socket(zmq.PUSH) + self._push_sock.connect("inproc://pull_sock") + + self.on_call = None + + t = threading.Thread(target=self._recv_run) + t.setDaemon(True) + t.start() + + t = threading.Thread(target=self._callback_run) + t.setDaemon(True) + t.start() + + def __del__(self): + self.close() + +# def set_on_call(self, on_call): +# """def on_call(client_id, req_msg)""" +# self._on_call = on_call + + def _recv_run(self): + + + poller = zmq.Poller() + poller.register(self._pull_sock, zmq.POLLIN) + + remote_sock = None + + #client_addr_map = {} + + while not self._should_close: + + try: + socks = dict(poller.poll(500)) + if self._pull_sock in socks and socks[self._pull_sock] == zmq.POLLIN: + msgs = self._pull_sock.recv_multipart() + if len(msgs) == 2: + if remote_sock: + # [addr, data] + #print "send data", msgs[0], msgs[1] + remote_sock.send_multipart(msgs) + elif len(msgs) == 1: + cmd = msgs[0] + if cmd == "LISTEN": + if remote_sock: + poller.unregister(remote_sock) + remote_sock.close() + remote_sock = None + + remote_sock = self._do_listen() + + if remote_sock : + poller.register(remote_sock, zmq.POLLIN) + + elif cmd == "CLOSE": + self._should_close = True + break + + if remote_sock and remote_sock in socks and socks[remote_sock] == zmq.POLLIN: + msgs = remote_sock.recv_multipart() + if len(msgs) == 2: + identity = msgs[0] + data = msgs[1] + #client_id = identity.split('$') + #client_addr_map[client_id] = identity + self._on_data_arrived(identity, data) + + except zmq.error.Again, e: + #print "RECV timeout: ", e + pass + except Exception, e: + print("_recv_run:", e) + + def _callback_run(self): + while not self._should_close: + try: + r = self._callback_queue.get(timeout = 1) + if r : + r() + except Queue.Empty, e: + pass + + except Exception, e: + traceback.print_exc(e) + print "_callback_run", type(e), e + + def _async_call(self, func): + self._callback_queue.put( func ) + + def listen(self, addr) : + self._addr = addr + self._push_sock.send("LISTEN") + + + def _do_listen(self): + + socket = self._ctx.socket(zmq.ROUTER) + socket.setsockopt(zmq.RCVTIMEO, 1000) + socket.setsockopt(zmq.SNDTIMEO, 1000) + socket.setsockopt(zmq.LINGER, 0) + socket.bind(self._addr) + + return socket + + def close(self): + self._should_close = True + self.push_sock.send("CLOSE") + + def _send(self, data, addr): + self._send_lock.acquire() + + #self._push_sock.send(addr, flags=zmq.SNDMORE) + #self._push_sock.send(data) + self._push_sock.send_multipart([addr, data]) + + self._send_lock.release() + + def _on_data_arrived(self, identity, data): + try: + msg = _unpack(data) + #print "RECV", msg + + if not msg: + print "wrong message format" + return + + method = msg['method'] if msg.has_key('method') else None + + call_id = msg['id'] if msg.has_key('id') and msg['id'] else None + + if call_id and method: + if method == ".sys.heartbeat": + # Unlike scala implementation, here reply to client directly + rsp_msg = { 'jsonrpc' : '2.0', + 'method' : method, + 'result' : { "time" : time.time() }, + 'id' : call_id } + + self._send( _pack(rsp_msg), identity) + if self.on_call : + self._async_call( lambda : self.on_call(identity, msg)) + + except Exception, e: + print( "_on_data_arrived:", e) + pass + + + def send_rsp(self, client_id, req, result=None, error=None): + """send response message to client + + example: + send_rsp(client_id, req, result={'data': 123}) + send_rsp(client_id, req, error=(-1, "wrong argument")) + """ + + if req['method'] == '.sys.heartbeat': + return + + rsp_msg = { 'jsonrpc' : '2.0', + 'method' : req["method"], + 'id' : req['id'] } + + if result is not None: + rsp_msg['result'] = result + + if error is not None: + rsp_msg['error'] = {'error': error[0], 'message' : error[1]} + + self._send(_pack(rsp_msg), client_id) \ No newline at end of file diff --git a/vnpy/trader/app/jaqsService/js.ico b/vnpy/trader/app/jaqsService/js.ico new file mode 100644 index 0000000000000000000000000000000000000000..67a8e8ea1f011d6fe39790897d87d316915ede7d GIT binary patch literal 21662 zcmeHO2Y6OR);?cNAiYomBmt#KSAwXhfC^GXX$ra)l-?ClDGEqcA{JJ$v4V&qD4?sX z9gx1LAczIQNDCkW20|c|%=zDU?svl%+WNcvE9>v%dA})l?%X-=J@?GanTd!8f6bZ+ ze#^@v?L|t8NL2tu@+81=8`_90j&pxM$Ii2lP!ban-)hvvQkYMtF? z|1hnMfg}nH`@=|d_5|UOvH~DK%YjnBpW!$It<nke_^@CGcl(81N!`1t2#rm}mK) zL8KZ;$O6cX3B{Wc*8u(uBE3O^t2Zt`%-{Y0m>dseF9vX$zXWh~hy1u4eSvfaP#W+9 zyyuS^DC4ETEx-%Fd%yx<9dHB)=Z9(H7_bHS9QX)$2Y3YFvZ)S0e*c?8B07IH@HDUt zI0#T?SN`7`A4MCxfVY8BKyBc^i7WEb8CU@92TsR>`yIL0!@sTYGB?fmRCRtrImf#u zKma&b1MR~V0KWxyE)y9H?l?yKdUr8!~0D0mVnQ#se z^t|oQ&9|sLj{ZIe`KP>WEDujlQh98GLI;d==xC($XmDOn{x-i1M$ zaPy1}zCD#nm7z12tF&%oEcbu8Rpo)f4)@PIYh?0Ee5S+U_{WS~2K^E_1i+j6yM}Uj z`YK5-;poGp=h$z9@4(2F4UPQ?gfIB1CwKCGPm&%Bd zx3@A$!C&Uo1VMq|f>68Of8Lub6+aNRsN_@RwydaaQG_i|i;46p%n-ZJv`J1Vp1 z8kzZqkylWe(yKRZhLO8EtK58xk#_A3#vmhY+tCKA{N+{yy+R$->(yK3 z?g2&~9!0xvQ@M<*kfUgpd%M9vL9%nQTNj7pJTDuj_Cw zPR^@US7ATyh7OM&>&(l4e}1?oJSLHmYH^&86XiOt3$)i8mmZ@ApO6>sO}3R?kqGF;nIBStbOek$(*~d^&6v?fEm9XFddb+!gkS_PBK$ zm8-5+seti}^9er7eRk1r_uS!!b3obz&OK;TpPocJpfVRWbpApki;!x_&Wf6~Zpx2rU4 zWTZ+JBYx*87ehua!$m#wvn_n1KS0?I8Uvdwd)wksHdolInyN=Bbz$o zUgiX~V?6bX{4HI<=K{-&%*FV?d2}4+T+aNE`_}2e1wa~bw)vuO6emBdUvwVg^}uF; z`*;h00=Ws_&7!5y6^vn%r#SOG&VMLB{TF_}o??w1{!RS)G;JJh80T7?Z*o1{>U#JA zhT?miq4M5`D)aE3I3IQVvu`2(DwJM>csB4P&=80Neruq9&~LHY@0Z-FYCCJ# zsA%>3q@YF($!^z9R-(^K8P};JTVi75kk>0Y=o8oY*8~6CH3WbmN+&c5HP}dZXJ9#*EBI$V;H#s>kJ#W zM!FE8GF)%fifdh!s#P)niFMkceR0Ohlb;p#E~-_NO&d1Ili_{dwoNiW{#Yh;=^`JO zD`)-K^Z`!<(ttAt$Vy*VYv4QR%f0~vWOjD8v76WH8QgJahaR z^lxzPNn4hd?tZ^T_=2viKYCO~g!=3D`$6`dSh-R%p;L<@Bki8{V#rM!b*4aWwpFPj zSu6MZoRNN_Fm>s zF8dzO+L-0sw`QG@_dmp%wUO4SCSQXea^Uva#*`^cuD-b5d&m%ZF)GUXh!=u~GAD3U z0{I0A3G&OUugX}|?+SE78{G+{UGzY{2l7O>apUAE93nU-!$zoV+Dcwrn6R|N7dbnA zUFC;OMtiMb#6a+Q0}_?ayw_) z4PqLE$f4a{Te-5V-nC2aK)nt?N3?UZ3*>JGZb9CHY1SDwOvYM%9>#oZ$4@F-vI!GH z(fn4FlfOA{(JzMi;l0%mAJw~U`$_O^-L8Z; zdxr|HOV=J+se*g~ekixwXEzR)iRo~e+dzl@4qi3^oaek)%rpS_d!L6-sSnVp|5E5Q$C zcl&JRfDG3WD7SJlrmU0+xc0vLWGHlod%y#Me9VIvpu92aTumU&_?mE_tO@cinzBy6 zzVd+OXDsG7`wke{{fh#OT{wAg!7>j4SpmkzyK4`jq1eqEA2Os)rYjj-W>HomwbDEF@*rfeh0>na{n*;)Q@%%Gi?HUb4&mnhk*1RLW2&B z57(i~SjYC&^fIyx{9F_2ukCk82Fl^e<;rR0rd&jsGV&?7S%dkV)~hFfLs?y*UI-T> zc2OVca9n~sk#Xmp(%15n`IyS#W9r?Xr(t`|?Z>d=Y`AE7WsWXX~?Oecl(zy>|FQeDG~YU@jGbdXYK%4WNQWk+9%I@~r$^jYb5Gc2DGDhB&-O69Gq|665+{e#NN|Fx>3Zx3^5mjB_ zo~i`$7lcp^d8VrYRa&=}yDUH4S61+I=!p6c9tQGF;Lu?WaBRZdIvVFNZfe)gyzu<< zx@_rEo$=ah+PhaTb3V@)q^7DL`{@yoD+0VmfGae*9Yoj%>(H+k}8 zb4$mLCK`T8PqXG4iTQXj+*b_uxCr3cl7AGl{ChMd+1gcd!2TA3GKA!yELL9RUHL0z zW=gZzSlJ40cwXk^V#Q>8@7~f3byHEV0^$mcLpTrdc__0n(&r;hq&06YT`WIOJZZp> z@sksBgZCHA%`@IXgEc-O!94%mbLQZ|g9a253^u)X?OM~kX;b=PO7hZV5#KD*0>(kE^gZ>o9_goy`6<~to=kPI{4H=+`F|vB@ zTx%Q3v)>`Oa=3E2a@rVjrPi-6_kkClck2SshrGK)37I{AzEs5Z(rBkl7|J4D7UeF= zAYJa}o23Kv?lauW^A+R=+#J>*bRc-}unEF$@N7%J-o4GSW5=wV#<=vT$z3?XiH$e=+uBUHjf!WR&go2OB*`%%B_H$i?| z1(T*%;SLjvOf&(4DoAYbj6);Mt}T81E03FE5j@ zrXojiKq8P72Idoo4wX8vS7Wdq;T$suIA+h8pyTM9M#Any=HojXGftzim%4oUaw}gL zH~QXt?`hk%ZFT6-p?cuJ0bR0qv6iI2uWUJugRS}KqmMKX#Wx7HamX4p2u ze^#tmp`}Wc(BR`wXf&P~4cilq@iGE^;8H|W+|C8K9gNh2U*D(q;@vSnom zuCGH~%62b6y$KKWz26Od2VKeqHlfWQfQ`V9(3hWajruYNXa__B^p$KDj$hy)NPb`o zib2P{1MfHW>eSJ#TesS8!V={CX!q{jEf?kClV7)Pot1m^=+U-p(4axAuWQzeZ_Y<)YjSJ9g|aGiS~;)E(*yf|)gImI(%fX2ggQ%q!$KpMCb3 znU1k*vKU1enp*R%I#$i82 z3NkaXX2rKOc<|t`Duv@+c$rO4n>I~-IO7wAJ+lPNu@f+VrVe@Wo~&;!C57j1I1ZgI zP$pNt)73c}7S<-o0C|YS>V{^)A+U_`(wKZW6I(qmAZwZqF(PgZy6Voecs1 z%{J;`e`}vRhH?Cg#kwgL@3i=g*VTvp_5S_(8H`hfe1yqrd6+kEo+e@c-BY20iNYNF z73fAN6uAdw2IHE++){lQ8>1(`q={IABw?*eemH&#Z8QD7B3}Sy|4rG>#_hC0AL9HE z{T2q_n27;Dv;}ckQ%4RNY63AarqgY=nUyP7axPS8NDdu3WaiGDtF>_c#dlc)6Z_%E z#&^XPrYy$#DKEWbphx7#a)fcu(3ahP>#fEYj4+XX`)eY6Wr;g?nI!l#Xn(wD8va@5 zOsB}$vmx9W+6dxW4S@gOQ2Y$_c&RJSZyd2_mLN|t*q?aVg9P+B9)82(<8f{YXDQ3# z{7=i~&9!&Wo~C=3E~d#s3TLWx%4;4E z@BJ&l=kWg=|41ZWMd}PV+i6z4)76ZHN zftJbQWzL?n$m8e_eWND_ua|bY1+Wm{eC2Ck5WxA(uL;zfXK}tCJ2?I68|HZ@zLO7t zq%$U24Ap_^fOic02*{1153958&lz>VaB4fG$|saZfVK zV0)9WH%26XyIxCV?=U6sEtLQ#7Vr%u(m%%eGHo-@d}96voN1r07Oro!?{fGE$G05} z(pT6C_%$I7m9k_A_WO@QUpXG|47}f;kMhQ6Xn^y@=j~aix8E^hl_FN4=$XvDG5FRQ)1f=Q*zsOi&F44gG;;Cv+MPt53!FNvu-~_ zA9-nr-EaMJts@)OZ1&8X$j7)A^Dpk@1>PA*e9W$ zMPMClxuO4_d${~xGmnSw>2!hL))P{_y5skS@s|RCV~fa_0LPkt72;6=wgl%!mM+J> z=x6r)h|lB6Nk~X&QWB1%+O=z2oTed?w-V>HRX$m0{bpP<;yVb*9VY*AE?f@6;%ua| zO=LQ<4zTY<0{dDQi&MtMv3QJ$it6L#*>TuCyFbmd z!#p=CG66WX%_`EO%{0Y(*dWm{I5Prxy}QenExQ-bSP5JU`08Nuza|{leC{biU6d`9 z)HKC8GNnJy2b&ZFKPc@6aqTG1s1pr8*Aipr4llvw;J^>y;{1s)u{p zh3>rvaVqeeg9nRSJ`X&k0-1QmZp2>!oq;&yPsbDWonz+~d#3(D*wEE$=%2Cr6aiUd zA$J1yY}|Ps#yFFRF^y$0@C^o8muo8c-Q<%6*d4x;=URDY|4X0^^gMvrh6)ub$b}bP zXn8@u7^fi~1?&XY1AR~yb>4aBxwo9PxE?k+$OE3wZ>tgTKAyK9_!#LMpkL9vPM{3k zrYXdsqtsN1i;Zm=9~-;J&;J>OzZqwO)Sg?#Tu@-2p;rd;7vm|guy)=00I?Tn&+Sco z+WG%F_~#@1{(`uen1zv%k(DBFn1G=e9K`N(Lw!g=+!t7fzU%>>2CA}Oq3ia(v;E_? z9g4}vZ2)Z&d76iG8z7$jAowmh)*5=A;lP&|BX`^X)!_fCaJ@vm=yX>o zMw(6GoSX=WKqKG*fW8;{ zu~q?Dz(#;)Kc@ksfc^l-#F$cWV1kxvXlolVsba;7>`Nq26ZzqYsV_N5&qI6z5QVhm zgku=f|Eb_U7k#jNAzvDxo{*nkfDOn$j`%5H57Mmw3^$yfpA@1|k^-ayoS)Kmq3|ag!&YRxCzQ81K%KB0Fa{qVEg=A zJL*0MaT&D5_iz+g4vYaZfFMxF3)AOP2=80VmMzI!I?xi}H+EL^+ieH&c>w1e@zAMr zmXlMxpD-B;b&F+ZBk&!Nj}xD9HrhT_8}!pH<_|718g-hqC#*ayZNk&G0jC`!!*PR? zJ{FeFmvpz>;3_KFIyv~|mK*!95Wf+j^a!N!1pL~cP=pmBgH!ILhub=w6|qrXbkYs~ zj$r zJ96Gv*n7-Lvz6{nN3r`;xxzA+x&FRX=uy;2djzUYXATM literal 0 HcmV?d00001 diff --git a/vnpy/trader/app/jaqsService/jsEngine.py b/vnpy/trader/app/jaqsService/jsEngine.py new file mode 100644 index 00000000..cc96b446 --- /dev/null +++ b/vnpy/trader/app/jaqsService/jsEngine.py @@ -0,0 +1,286 @@ +# encoding: UTF-8 + +import json +from collections import defaultdict + +import jrpc_server + +from vnpy.event import Event +from vnpy.trader.vtFunction import getJsonPath +from vnpy.trader.vtObject import VtLogData, VtOrderReq, VtCancelOrderReq +from vnpy.trader.vtConstant import * + + + +EVENT_JS_LOG = 'eJsLog' + +ACTION_MAP = {} +ACTION_MAP['Buy'] = (DIRECTION_LONG, OFFSET_OPEN) +ACTION_MAP['Sell'] = (DIRECTION_SHORT, OFFSET_CLOSE) +ACTION_MAP['Short'] = (DIRECTION_SHORT, OFFSET_OPEN) +ACTION_MAP['Cover'] = (DIRECTION_LONG, OFFSET_CLOSE) +ACTION_MAP['CoverYesterday'] = (DIRECTION_LONG, OFFSET_CLOSEYESTERDAY) +ACTION_MAP['SellYesterday'] = (DIRECTION_SHORT, OFFSET_CLOSEYESTERDAY) +ACTION_MAP_REVERSE = {v:k for k,v in ACTION_MAP.items()} + +STATUS_MAP_REVERSE = {} +STATUS_MAP_REVERSE[STATUS_NOTTRADED] = 'Accepted' +STATUS_MAP_REVERSE[STATUS_PARTTRADED] = 'Accepted' +STATUS_MAP_REVERSE[STATUS_ALLTRADED] = 'Filled' +STATUS_MAP_REVERSE[STATUS_CANCELLED] = 'Cancelled' +STATUS_MAP_REVERSE[STATUS_REJECTED] = 'Rejected' +STATUS_MAP_REVERSE[STATUS_UNKNOWN] = 'New' + + +EXCHANGE_MAP = {} +EXCHANGE_MAP['SH'] = EXCHANGE_SSE +EXCHANGE_MAP['SZ'] = EXCHANGE_SZSE +EXCHANGE_MAP['CFE'] = EXCHANGE_CFFEX +EXCHANGE_MAP['SHF'] = EXCHANGE_SHFE +EXCHANGE_MAP['DCE'] = EXCHANGE_DCE +EXCHANGE_MAP['CZC'] = EXCHANGE_CZCE +EXCHANGE_MAP_REVERSE = {v:k for k, v in EXCHANGE_MAP.items()} + + +######################################################################## +class JsEngine(object): + """JAQS服务引擎""" + settingFileName = 'JS_setting.json' + settingfilePath = getJsonPath(settingFileName, __file__) + + #---------------------------------------------------------------------- + def __init__(self, mainEngine, eventEngine): + """Constructor""" + self.mainEngine = mainEngine + self.eventEngine = eventEngine + + self.server = None # RPC服务器 + self.cbDict = {} # 回调函数字典 + + # 注册日志事件类型 + self.mainEngine.registerLogEvent(EVENT_JS_LOG) + + # 初始化 + self.initCallback() + self.initServer() + + #---------------------------------------------------------------------- + def initCallback(self): + """初始化回调函数映射""" + self.cbDict['sys.heartbeat'] = self.onHeartbeat + self.cbDict['auth.login'] = self.onLogin + self.cbDict['auth.use_strategy'] = self.onUseStrategy + self.cbDict['oms.query_position'] = self.onQueryPosition + self.cbDict['oms.query_order'] = self.onQueryOrder + self.cbDict['oms.place_order'] = self.onPlaceOrder + self.cbDict['oms.cancel_order'] = self.onCancelOrder + + #---------------------------------------------------------------------- + def initServer(self): + """初始化""" + with open(self.settingfilePath) as f: + setting = json.load(f) + host = setting['host'] + port = setting['port'] + addr = "tcp://%s:%s" %(host, port) + + # 初始化RPC服务器 + self.server = jrpc_server.JRpcServer() + self.server.on_call = self.onCall + self.server.listen(addr) + + self.writeLog(u'Jaqs服务器启动成功') + + #---------------------------------------------------------------------- + def onCall(self, clientId, req): + """RPC服务回调函数""" + method = req['method'] + cb = self.cbDict.get(method, None) + if not cb: + self.writeLog(u'无法找到方法%s对应的回调函数' %method) + return + + self.writeLog(u'收到请求:%s' %req) + + cb(clientId, req) + + #---------------------------------------------------------------------- + def onHeartbeat(self, clientId, req): + """心跳""" + pass + + #---------------------------------------------------------------------- + def onLogin(self, clientId, req): + """登录""" + params = req['params'] + + result = { + 'username': params['username'], + 'name': params['username'], + 'strategies': [1], + 'broker_strategies': [1] + } + + error = [0, ''] + + self.server.send_rsp(clientId, req, result, error) + + self.writeLog(u'发出响应:%s' %result) + + #---------------------------------------------------------------------- + def onUseStrategy(self, clientId, req): + """使用策略""" + result = req['params']['account_id'] + error = [0, ''] + self.server.send_rsp(client_id, req, result, error) + + self.writeLog(u'发出响应:%s' %result) + + #---------------------------------------------------------------------- + def onQueryPosition(self, clientId, req): + """查询持仓""" + l = self.mainEngine.getAllPositionDetails() + + result = defaultdict(list) + for detail in l: + security = self.converSymbol(detail.vtSymbol) + + # 多头 + if detail.longPos: + result['security'].append(security) + result['side'].append('Long') + + result['cost_price'].append(0) + result['float_pnl'].append(0) + result['close_pnl'].append(0) + result['trading_pnl'].append(0) + result['holding_pnl'].append(0) + result['commission'].append(0) + result['init_size'].append(0) + result['current_size'].append(detail.longPos) + result['enable_size'].append(detail.longPos-detail.longPosFrozen) + result['frozen_size'].append(detail.longPosFrozen) + result['uncome_size'].append(0) + result['pre_size'].append(detail.longYd) + result['today_size'].append(detail.longTd) + + # 空头 + if detail.shortPos: + result['security'].append(security) + result['side'].append('Short') + + result['cost_price'].append(0) + result['float_pnl'].append(0) + result['close_pnl'].append(0) + result['trading_pnl'].append(0) + result['holding_pnl'].append(0) + result['commission'].append(0) + result['init_size'].append(0) + result['current_size'].append(detail.shortPos) + result['enable_size'].append(detail.shortPos-detail.shortPosFrozen) + result['frozen_size'].append(detail.shortPosFrozen) + result['uncome_size'].append(0) + result['pre_size'].append(detail.shortYd) + result['today_size'].append(detail.shortTd) + + error = [0, ''] + + self.server.send_rsp(client_id, req, result, error) + + self.writeLog(u'发出响应:%s' %result) + + #---------------------------------------------------------------------- + def onQueryOrder(self, clientId, req): + """查询委托""" + l = self.mainEngine.getAllWorkingOrders() + + result = defaultdict(list) + for order in l: + result['task_id'].append(order.vtOrderID) + result['entrust_no'].append(order.vtOrderID) + + result['entrust_price'].append(order.price) + result['entrust_size'].append(order.totalVolume) + result['sub_seq'].append(0) + result['sub_total'].append(0) + result['batch_no'].append(0) + + result['fill_price'].append(order.price) + result['fill_size'].append(order.tradedVolume) + result['algo'].append('') + result['entrust_action'].append(ACTION_MAP_REVERSE[(order.direction, order.offset)]) + result['order_status'].append(STATUS_MAP_REVERSE[order.status]) + result['security'].append(self.converSymbol(order.vtSymbol)) + + hh, mm, ss = order.orderTime.split(':') + result['entrust_time'].append(int(hh)*10000000+ + int(mm)*100000+ + int(ss)*1000) + + error = [0, ''] + + self.server.send_rsp(clientId, req, result, error) + + self.writeLog(u'发出响应:%s' %result) + + #---------------------------------------------------------------------- + def onPlaceOrder(self, clientId, req): + """委托""" + params = req['params'] + s, e = params['security'].split('.') + + vor = VtOrderReq() + vor.symbol = s + vor.exchange = EXCHANGE_MAP[e] + vor.direction, vor.offset = ACTION_MAP[params['action']] + vor.price = params['price'] + vor.volume = params['size'] + + contract = self.mainEngine.getContract(contract) + + vtOrderID = self.mainEngine.sendOrder(vor, contract.gatewayName) + + error = [0, ''] + + self.server.send_rsp(clientId, req, vtOrderID, error) + + self.writeLog(u'发出响应:%s' %result) + + #---------------------------------------------------------------------- + def onCancelOrder(self, clientId, req): + """撤单""" + params = req['params'] + vtOrderID = params['task_id'] + gatewayName, orderID = vtOrderID.split('.') + + vcor = VtCancelOrderReq() + vcor.orderID = vtOrderID + self.mainEngine.cancelOrder(vcor, gatewayName) + + error = [0, ''] + self.server.send_rsp(clientId, req, 'successful', error) + + self.writeLog(u'发出响应:%s' %result) + + #---------------------------------------------------------------------- + def writeLog(self, content): + """发出日志事件""" + log = VtLogData() + log.logContent = content + log.gatewayName = 'JAQS_SERVICE' + event = Event(type_=EVENT_JS_LOG) + event.dict_['data'] = log + self.eventEngine.put(event) + + #---------------------------------------------------------------------- + def converSymbol(self, vtSymbol): + """转换合约代码""" + contract = self.mainEngine.getContract(vtSymbol) + if not contract: + return '' + + e = EXCHANGE_MAP_REVERSE[contract.exchange] + return '.'.join(contract.symbol, e) + + + \ No newline at end of file diff --git a/vnpy/trader/app/jaqsService/service.py b/vnpy/trader/app/jaqsService/service.py new file mode 100644 index 00000000..39bf6485 --- /dev/null +++ b/vnpy/trader/app/jaqsService/service.py @@ -0,0 +1,85 @@ +# -*- coding: utf-8 -*- + +import jrpc_server +import time +import pandas as pd +from qdata.database import DatabaseConn +from qdata.datamodel import DataModel +import qdata.apisetting as st +import json + +server = None +view_set = {} +db = None + +def on_call(client_id, req): + + if req['method'] != '.sys.heartbeat': + print "on_call", req + + if req['method'] == 'auth.login': + server.send_rsp(client_id, req, result = { "username" : "fixme", "name": "fixme" }) + return + + if req['method'] != 'jset.query': + server.send_rsp(client_id, req, error=[-1, "unknown method"]) + return + + if not req.has_key('params'): + server.send_rsp(client_id, req, error=[-1, "missing params"]) + return + + view = req['params']['view'] + if not view or view == "sys.views": + server.send_rsp(client_id, req, result = { "name" : view_set}) + + if view not in view_set: + server.send_rsp(client_id, req, error=[-1, "wrong view name"]) + return + + result, error = datafectch(req['params']) + server.send_rsp(client_id, req, result=result, error=error) + +def init(): + global view_set, db, model + conn = DatabaseConn() + db = conn.get_conn() + model = DataModel(db) + view_set = model.apilist_set() + print(view_set) + +def run(): + global server + + init() + server = jrpc_server.JRpcServer() + server.on_call = on_call + addr = "tcp://%s:%s"%(st.HOST, st.PORT) + print "listen at " + addr + server.listen(addr) + + while True: + time.sleep(1) + +def datafectch(params): + view = params['view'] + api = model.apilist_one(view) + df = model.get_params(api) + sql = model.get_datasql(df, args = params, apilist=api) + print(sql) + data = model.get_data(bind=api.source_id, sql=sql) + if data['msg'] == st.DATA_MSG[0]: + df = pd.read_json(json.dumps(data['data']), orient='split') + return (_data(df), None) + else: + return (data['msg'], None) + + +def _data(df): + data = {} + for col in df.columns: + data[col] = df[col].tolist() + return data + +if __name__ == "__main__": + run() diff --git a/vnpy/trader/app/jaqsService/uiJsWidget.py b/vnpy/trader/app/jaqsService/uiJsWidget.py new file mode 100644 index 00000000..5fc50116 --- /dev/null +++ b/vnpy/trader/app/jaqsService/uiJsWidget.py @@ -0,0 +1,54 @@ +# encoding: UTF-8 + +''' +行情记录模块相关的GUI控制组件 +''' + +from vnpy.event import Event +from vnpy.trader.uiQt import QtWidgets, QtCore +from .jsEngine import EVENT_JS_LOG + + + +######################################################################## +class JsEngineManager(QtWidgets.QWidget): + """Jaqs服务管理组件""" + signal = QtCore.Signal(type(Event())) + + #---------------------------------------------------------------------- + def __init__(self, jsEngine, eventEngine, parent=None): + """Constructor""" + super(JsEngineManager, self).__init__(parent) + + self.jsEngine = drEngine + self.eventEngine = eventEngine + + self.initUi() + self.registerEvent() + + #---------------------------------------------------------------------- + def initUi(self): + """初始化界面""" + self.setWindowTitle(u'Jaqs服务') + # 日志监控 + self.logMonitor = QtWidgets.QTextEdit() + self.logMonitor.setReadOnly(True) + self.logMonitor.setMinimumHeight(600) + + # 设置布局 + vbox = QtWidgets.QVBoxLayout() + vbox.addWidget(self.logMonitor) + self.setLayout(vbox) + + #---------------------------------------------------------------------- + def updateLog(self, event): + """更新日志""" + log = event.dict_['data'] + content = '\t'.join([log.logTime, log.logContent]) + self.logMonitor.append(content) + + #---------------------------------------------------------------------- + def registerEvent(self): + """注册事件监听""" + self.signal.connect(self.updateLog) + self.eventEngine.register(EVENT_JS_LOG, self.signal.emit) \ No newline at end of file diff --git a/vnpy/trader/vtEngine.py b/vnpy/trader/vtEngine.py index a7fa6260..abe47a73 100644 --- a/vnpy/trader/vtEngine.py +++ b/vnpy/trader/vtEngine.py @@ -282,6 +282,11 @@ class MainEngine(object): """查询所有的活跃的委托(返回列表)""" return self.dataEngine.getAllWorkingOrders() + #---------------------------------------------------------------------- + def getAllPositionDetails(self): + """查询本地持仓缓存细节""" + return self.dataEngine.getAllPositionDetails() + #---------------------------------------------------------------------- def getAllGatewayDetails(self): """查询引擎中所有底层接口的信息""" @@ -420,7 +425,7 @@ class DataEngine(object): # 更新到持仓细节中 detail = self.getPositionDetail(pos.vtSymbol) detail.updatePosition(pos) - + #---------------------------------------------------------------------- def getContract(self, vtSymbol): """查询合约对象""" @@ -490,6 +495,11 @@ class DataEngine(object): return detail + #---------------------------------------------------------------------- + def getAllPositionDetails(self): + """查询所有本地持仓缓存细节""" + return self.detailDict.values() + #---------------------------------------------------------------------- def updateOrderReq(self, req, vtOrderID): """委托请求更新"""