Merge remote-tracking branch 'origin/dev'

# Conflicts:
#	vnpy/trader/app/dataRecorder/DR_setting.json
This commit is contained in:
Happiness 2017-07-03 21:22:24 +08:00
commit e235f84426
63 changed files with 2549 additions and 168 deletions

View File

@ -37,7 +37,8 @@ Using the vn.py project, institutional investors and professional traders, such
- LTSvn.lts
- QDPvn.qdp
- CSHSHLPvn.cshshlp
**Chinese Precious Metal Market**

View File

Before

Width:  |  Height:  |  Size: 618 KiB

After

Width:  |  Height:  |  Size: 618 KiB

View File

Before

Width:  |  Height:  |  Size: 234 KiB

After

Width:  |  Height:  |  Size: 234 KiB

View File

Before

Width:  |  Height:  |  Size: 465 KiB

After

Width:  |  Height:  |  Size: 465 KiB

View File

Before

Width:  |  Height:  |  Size: 465 KiB

After

Width:  |  Height:  |  Size: 465 KiB

View File

Before

Width:  |  Height:  |  Size: 320 KiB

After

Width:  |  Height:  |  Size: 320 KiB

View File

Before

Width:  |  Height:  |  Size: 319 KiB

After

Width:  |  Height:  |  Size: 319 KiB

View File

@ -61,7 +61,7 @@ RUN echo "开始配置系vnpy环境" \
&& echo "更改 pip 源" \
&& mkdir ~/.pip \
&& echo "[global]" >> ~/.pip/pip.conf \
&& echo "index-url = http://pypi.douban.com/simple" >> ~/.pip/pip.conf \
&& echo "index-url = https://pypi.doubanio.com/simple/" >> ~/.pip/pip.conf \
&& echo "使用 pip 安装 python 库" \
&& pip install TA-Lib \
&& conda clean -ay \
@ -70,4 +70,4 @@ RUN echo "开始配置系vnpy环境" \
WORKDIR /srv/vnpy
# 暂时不设置入口点,否则不能使用 -it 交互模式
# ENTRYPOINT python /srv/vnpy/vn.trader/vtServer.py
# ENTRYPOINT python /srv/vnpy/vn.trader/vtServer.py

View File

@ -0,0 +1,119 @@
# encoding: UTF-8
import multiprocessing
from time import sleep
from datetime import datetime, time
from vnpy.event import EventEngine2
from vnpy.trader.vtEvent import EVENT_LOG
from vnpy.trader.vtEngine import MainEngine
from vnpy.trader.gateway import ctpGateway
from vnpy.trader.app import ctaStrategy
from vnpy.trader.app.ctaStrategy.ctaBase import EVENT_CTA_LOG
#----------------------------------------------------------------------
def printLog(content):
"""输出日志"""
t = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print '%s\t%s' %(t, content)
#----------------------------------------------------------------------
def processLogEvent(event):
"""处理日志事件"""
log = event.dict_['data']
if log.gatewayName:
content = '%s:%s' %(log.gatewayName, log.logContent)
else:
content = '%s:%s' %('MainEngine', log.logContent)
printLog(content)
#----------------------------------------------------------------------
def processCtaLogEvent(event):
"""处理CTA模块日志事件"""
log = event.dict_['data']
content = '%s:%s' %('CTA Engine', log.logContent)
printLog(content)
#----------------------------------------------------------------------
def runChildProcess():
"""子进程运行函数"""
print '-'*20
printLog(u'启动CTA策略运行子进程')
ee = EventEngine2()
printLog(u'事件引擎创建成功')
me = MainEngine(ee)
me.addGateway(ctpGateway)
me.addApp(ctaStrategy)
printLog(u'主引擎创建成功')
ee.register(EVENT_LOG, processLogEvent)
ee.register(EVENT_CTA_LOG, processCtaLogEvent)
printLog(u'注册日志事件监听')
me.connect('CTP')
printLog(u'连接CTP接口')
sleep(5) # 等待CTP接口初始化
cta = me.appDict[ctaStrategy.appName]
cta.loadSetting()
printLog(u'CTA策略载入成功')
cta.initAll()
printLog(u'CTA策略初始化成功')
cta.startAll()
printLog(u'CTA策略启动成功')
while True:
sleep(1)
#----------------------------------------------------------------------
def runParentProcess():
"""父进程运行函数"""
printLog(u'启动CTA策略守护父进程')
DAY_START = time(8, 45) # 日盘启动和停止时间
DAY_END = time(15, 30)
NIGHT_START = time(20, 45) # 夜盘启动和停止时间
NIGHT_END = time(2, 45)
p = None # 子进程句柄
while True:
currentTime = datetime.now().time()
recording = False
# 判断当前处于的时间段
if ((currentTime >= DAY_START and currentTime <= DAY_END) or
(currentTime >= NIGHT_START) or
(currentTime <= NIGHT_END)):
recording = True
# 记录时间则需要启动子进程
if recording and p is None:
printLog(u'启动子进程')
p = multiprocessing.Process(target=runChildProcess)
p.start()
printLog(u'子进程启动成功')
# 非记录时间则退出子进程
if not recording and p is not None:
printLog(u'关闭子进程')
p.terminate()
p.join()
p = None
printLog(u'子进程关闭成功')
sleep(5)
if __name__ == '__main__':
runChildProcess()
# 尽管同样实现了无人值守但强烈建议每天启动时人工检查为自己的PNL负责
# runParentProcess()

View File

@ -0,0 +1,95 @@
# encoding: UTF-8
import multiprocessing
from time import sleep
from datetime import datetime, time
from vnpy.event import EventEngine2
from vnpy.trader.vtEvent import EVENT_LOG
from vnpy.trader.vtEngine import MainEngine
from vnpy.trader.gateway import ctpGateway
from vnpy.trader.app import dataRecorder
#----------------------------------------------------------------------
def printLog(content):
"""输出日志"""
t = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print '%s\t%s' %(t, content)
#----------------------------------------------------------------------
def processLogEvent(event):
"""处理日志事件"""
log = event.dict_['data']
if log.gatewayName:
content = '%s:%s' %(log.gatewayName, log.logContent)
else:
content = '%s:%s' %('MainEngine', log.logContent)
printLog(content)
#----------------------------------------------------------------------
def runChildProcess():
"""子进程运行函数"""
print '-'*20
printLog(u'启动行情记录运行子进程')
ee = EventEngine2()
printLog(u'事件引擎创建成功')
me = MainEngine(ee)
me.addGateway(ctpGateway)
me.addApp(dataRecorder)
printLog(u'主引擎创建成功')
ee.register(EVENT_LOG, processLogEvent)
printLog(u'注册日志事件监听')
me.connect('CTP')
printLog(u'连接CTP接口')
while True:
sleep(1)
#----------------------------------------------------------------------
def runParentProcess():
"""父进程运行函数"""
printLog(u'启动行情记录守护父进程')
DAY_START = time(8, 57) # 日盘启动和停止时间
DAY_END = time(15, 18)
NIGHT_START = time(20, 57) # 夜盘启动和停止时间
NIGHT_END = time(2, 33)
p = None # 子进程句柄
while True:
currentTime = datetime.now().time()
recording = False
# 判断当前处于的时间段
if ((currentTime >= DAY_START and currentTime <= DAY_END) or
(currentTime >= NIGHT_START) or
(currentTime <= NIGHT_END)):
recording = True
# 记录时间则需要启动子进程
if recording and p is None:
printLog(u'启动子进程')
p = multiprocessing.Process(target=runChildProcess)
p.start()
printLog(u'子进程启动成功')
# 非记录时间则退出子进程
if not recording and p is not None:
printLog(u'关闭子进程')
p.terminate()
p.join()
p = None
printLog(u'子进程关闭成功')
sleep(5)
if __name__ == '__main__':
runChildProcess()
#runParentProcess()

11
examples/README.md Normal file
View File

@ -0,0 +1,11 @@
# vn.py项目的应用示例
本文件夹中的内容主要是关于如何在交易业务中使用vn.py的示例
* VnTrader最常用的vn.py图形交易界面
* DataRecording全自动行情记录工具无需用户每日定时重启
* CtaTrading无图形界面模式的CTA策略交易
* CtaBacktestingCTA策略的回测和优化

View File

@ -19,8 +19,7 @@ from vnpy.trader.uiMainWindow import MainWindow
from vnpy.trader.gateway import ( huobiGateway, okcoinGateway)
# 加载上层应用
from vnpy.trader.app import (riskManager, dataRecorder,
ctaStrategy)
from vnpy.trader.app import (riskManager, ctaStrategy, spreadTrading)
#----------------------------------------------------------------------
@ -45,8 +44,8 @@ def main():
# 添加上层应用
me.addApp(riskManager)
me.addApp(dataRecorder)
me.addApp(ctaStrategy)
me.addApp(spreadTrading)
# 创建主窗口
mw = MainWindow(me, ee)

5
vnpy/data/README.md Normal file
View File

@ -0,0 +1,5 @@
# vn.data - 数据相关工具
### 历史数据
* datayes通联数据接口
* shcifco上海中期接口

0
vnpy/data/__init__.py Normal file
View File

View File

@ -0,0 +1 @@
from vndatayes import DatayesApi

View File

@ -0,0 +1,52 @@
# encoding: UTF-8
'''一个简单的通联数据客户端主要使用requests开发比通联官网的python例子更为简洁。'''
import os
import requests
import json
HTTP_OK = 200
########################################################################
class DatayesApi(object):
"""通联数据API"""
#----------------------------------------------------------------------
def __init__(self, token,
domain="http://api.wmcloud.com/data",
version="v1"):
"""Constructor"""
self.domain = domain # 主域名
self.version = version # API版本
self.token = token # 授权码
self.header = {} # http请求头部
self.header['Connection'] = 'keep_alive'
self.header['Authorization'] = 'Bearer ' + self.token
#----------------------------------------------------------------------
def downloadData(self, path, params):
"""下载数据"""
url = '/'.join([self.domain, self.version, path])
r = requests.get(url=url, headers=self.header, params=params)
if r.status_code != HTTP_OK:
print u'http请求失败状态代码%s' %r.status_code
return None
else:
result = r.json()
if 'retMsg' in result and result['retMsg'] == 'Success':
return result['data']
else:
if 'retMsg' in result:
print u'查询失败,返回信息%s' %result['retMsg']
elif 'message' in result:
print u'查询失败,返回信息%s' %result['message']
return None

View File

27
vnpy/data/shcifco/test.py Normal file
View File

@ -0,0 +1,27 @@
# encoding: UTF-8
from vnshcifco import ShcifcoApi, PERIOD_1MIN
if __name__ == "__main__":
ip = '101.231.179.199'
port = '10102'
token = 'testd2cda34b2d317779e812eb84ee4224a6_qpweqf1'
symbol = 'cu1709'
# 创建API对象
api = ShcifcoApi(ip, port, token)
# 获取最新tick
print api.getLastTick(symbol)
# 获取最新价格
print api.getLastPrice(symbol)
# 获取最新分钟线
print api.getLastBar(symbol)
# 获取历史分钟线
print api.getHisBar(symbol, 502, period=PERIOD_1MIN)

View File

@ -0,0 +1,147 @@
# encoding: UTF-8
import requests
HTTP_OK = 200
PERIOD_1MIN = '1m'
PERIOD_5MIN = '5m'
PERIOD_15MIN = '15m'
PERIOD_60MIN = '60m'
PERIOD_1DAY = '1d'
########################################################################
class ShcifcoApi(object):
"""数据接口"""
#----------------------------------------------------------------------
def __init__(self, ip, port, token):
"""Constructor"""
self.ip = ip
self.port = port
self.token = token
self.service = 'ShcifcoApi'
self.domain = 'http://' + ':'.join([self.ip, self.port])
#----------------------------------------------------------------------
def getData(self, path, params):
"""下载数据"""
url = '/'.join([self.domain, self.service, path])
params['token'] = self.token
r = requests.get(url=url, params=params)
if r.status_code != HTTP_OK:
print u'http请求失败状态代码%s' %r.status_code
return None
else:
return r.text
#----------------------------------------------------------------------
def getLastTick(self, symbol):
"""获取最新Tick"""
path = 'lasttick'
params = {'ids': symbol}
data = self.getData(path, params)
if not data:
return None
data = data.split(';')[0]
l = data.split(',')
d = {
'symbol': l[0],
'lastPrice': float(l[1]),
'bidPrice': float(l[2]),
'bidVolume': int(l[3]),
'askPrice': float(l[4]),
'askVolume': int(l[5]),
'volume': int(l[6]),
'openInterest': int(l[7])
}
return d
#----------------------------------------------------------------------
def getLastPrice(self, symbol):
"""获取最新成交价"""
path = 'lastprice'
params = {'ids': symbol}
data = self.getData(path, params)
if not data:
return None
data = data.split(';')[0]
price = float(data)
return price
#----------------------------------------------------------------------
def getLastBar(self, symbol):
"""获取最新的一分钟K线数据"""
path = 'lastbar'
params = {'id': symbol}
data = self.getData(path, params)
if not data:
return None
data = data.split(';')[0]
l = data.split(',')
d = {
'symbol': l[0],
'time': l[1],
'open': float(l[2]),
'high': float(l[3]),
'low': float(l[4]),
'close': float(l[5]),
'volume': int(l[6]),
'openInterest': int(l[7])
}
return d
#----------------------------------------------------------------------
def getHisBar(self, symbol, num, date='', period=''):
"""获取历史K线数据"""
path = 'hisbar'
# 默认参数
params = {
'id': symbol,
'num': num
}
# 可选参数
if date:
params[date] = date
if period:
params[period] = period
data = self.getData(path, params)
if not data:
return None
barList = []
l = data.split(';')
for barStr in l:
# 过滤某些空数据
if ',' not in barStr:
continue
barData = barStr.split(',')
d = {
'symbol': barData[0],
'date': barData[1],
'time': barData[2],
'open': float(barData[3]),
'high': float(barData[4]),
'low': float(barData[5]),
'close': float(barData[6]),
'volume': int(barData[7]),
'openInterest': int(barData[8])
}
barList.append(d)
return barList

View File

@ -180,7 +180,7 @@ class RpcServer(RpcObject):
def publish(self, topic, data):
"""
广播推送数据
topic主题内容
topic主题内容注意必须是ascii编码
data具体的数据
"""
# 序列化数据
@ -294,6 +294,8 @@ class RpcClient(RpcObject):
订阅特定主题的广播数据
可以使用topic=''来订阅所有的主题
注意topic必须是ascii编码
"""
self.__socketSUB.setsockopt(zmq.SUBSCRIBE, topic)

View File

@ -139,7 +139,7 @@ class BacktestingEngine(object):
# 载入初始化需要用的数据
flt = {'datetime':{'$gte':self.dataStartDate,
'$lt':self.strategyStartDate}}
initCursor = collection.find(flt)
initCursor = collection.find(flt).sort('datetime')
# 将数据从查询指针中读取出,并生成列表
self.initData = [] # 清空initData列表
@ -154,7 +154,7 @@ class BacktestingEngine(object):
else:
flt = {'datetime':{'$gte':self.strategyStartDate,
'$lte':self.dataEndDate}}
self.dbCursor = collection.find(flt)
self.dbCursor = collection.find(flt).sort('datetime')
self.output(u'载入完成,数据量:%s' %(initCursor.count() + self.dbCursor.count()))
@ -812,7 +812,7 @@ class BacktestingEngine(object):
l.append(pool.apply_async(optimize, (strategyClass, setting,
targetName, self.mode,
self.startDate, self.initDays, self.endDate,
self.slippage, self.rate, self.size,
self.slippage, self.rate, self.size, self.priceTick,
self.dbName, self.symbol)))
pool.close()
pool.join()
@ -929,7 +929,7 @@ def formatNumber(n):
#----------------------------------------------------------------------
def optimize(strategyClass, setting, targetName,
mode, startDate, initDays, endDate,
slippage, rate, size,
slippage, rate, size, priceTick,
dbName, symbol):
"""多进程优化时跑在每个进程中运行的函数"""
engine = BacktestingEngine()
@ -939,6 +939,7 @@ def optimize(strategyClass, setting, targetName,
engine.setSlippage(slippage)
engine.setRate(rate)
engine.setSize(size)
engine.setPriceTick(priceTick)
engine.setDatabase(dbName, symbol)
engine.initStrategy(strategyClass, setting)

View File

@ -33,6 +33,10 @@ MINUTE_DB_NAME = 'VnTrader_1Min_Db'
ENGINETYPE_BACKTESTING = 'backtesting' # 回测
ENGINETYPE_TRADING = 'trading' # 实盘
# CTA模块事件
EVENT_CTA_LOG = 'eCtaLog' # CTA相关的日志事件
EVENT_CTA_STRATEGY = 'eCtaStrategy.' # CTA策略状态变化事件
# CTA引擎中涉及的数据类定义
from vnpy.trader.vtConstant import EMPTY_UNICODE, EMPTY_STRING, EMPTY_FLOAT, EMPTY_INT

View File

@ -337,7 +337,7 @@ class CtaEngine(object):
startDate = self.today - timedelta(days)
d = {'datetime':{'$gte':startDate}}
barData = self.mainEngine.dbQuery(dbName, collectionName, d)
barData = self.mainEngine.dbQuery(dbName, collectionName, d, 'datetime')
l = []
for d in barData:
@ -352,7 +352,7 @@ class CtaEngine(object):
startDate = self.today - timedelta(days)
d = {'datetime':{'$gte':startDate}}
tickData = self.mainEngine.dbQuery(dbName, collectionName, d)
tickData = self.mainEngine.dbQuery(dbName, collectionName, d, 'datetime')
l = []
for d in tickData:
@ -463,7 +463,25 @@ class CtaEngine(object):
if so.strategy is strategy:
self.cancelStopOrder(stopOrderID)
else:
self.writeCtaLog(u'策略实例不存在:%s' %name)
self.writeCtaLog(u'策略实例不存在:%s' %name)
#----------------------------------------------------------------------
def initAll(self):
"""全部初始化"""
for name in self.strategyDict.keys():
self.initStrategy(name)
#----------------------------------------------------------------------
def startAll(self):
"""全部启动"""
for name in self.strategyDict.keys():
self.startStrategy(name)
#----------------------------------------------------------------------
def stopAll(self):
"""全部停止"""
for name in self.strategyDict.keys():
self.stopStrategy(name)
#----------------------------------------------------------------------
def saveSetting(self):

View File

@ -13,10 +13,10 @@ from multiprocessing.pool import ThreadPool
import pymongo
from vnpy.data.datayes import DatayesApi
from vnpy.trader.vtGlobal import globalSetting
from vnpy.trader.vtConstant import *
from vnpy.trader.vtObject import VtBarData
from vnpy.trader.app.ctaStrategy.datayesClient import DatayesClient
# 以下为vn.trader和通联数据规定的交易所代码映射
@ -33,10 +33,10 @@ class HistoryDataEngine(object):
"""CTA模块用的历史数据引擎"""
#----------------------------------------------------------------------
def __init__(self):
def __init__(self, token):
"""Constructor"""
self.dbClient = pymongo.MongoClient(globalSetting['mongoHost'], globalSetting['mongoPort'])
self.datayesClient = DatayesClient()
self.datayesClient = DatayesApi(token)
#----------------------------------------------------------------------
def lastTradeDate(self):

View File

@ -9,7 +9,8 @@ from vnpy.event import Event
from vnpy.trader.vtEvent import *
from vnpy.trader.uiBasicWidget import QtGui, QtCore, QtWidgets, BasicCell
from vnpy.trader.app.ctaStrategy.language import text
from .ctaBase import EVENT_CTA_LOG, EVENT_CTA_STRATEGY
from .language import text
########################################################################
@ -227,20 +228,17 @@ class CtaEngineManager(QtWidgets.QWidget):
#----------------------------------------------------------------------
def initAll(self):
"""全部初始化"""
for name in self.ctaEngine.strategyDict.keys():
self.ctaEngine.initStrategy(name)
self.ctaEngine.initAll()
#----------------------------------------------------------------------
def startAll(self):
"""全部启动"""
for name in self.ctaEngine.strategyDict.keys():
self.ctaEngine.startStrategy(name)
self.ctaEngine.startAll()
#----------------------------------------------------------------------
def stopAll(self):
"""全部停止"""
for name in self.ctaEngine.strategyDict.keys():
self.ctaEngine.stopStrategy(name)
self.ctaEngine.stopAll()
#----------------------------------------------------------------------
def load(self):
@ -267,13 +265,14 @@ class CtaEngineManager(QtWidgets.QWidget):
#----------------------------------------------------------------------
def closeEvent(self, event):
"""关闭窗口时的事件"""
reply = QtWidgets.QMessageBox.question(self, text.SAVE_POSITION_DATA,
text.SAVE_POSITION_QUESTION, QtWidgets.QMessageBox.Yes |
QtWidgets.QMessageBox.No, QtWidgets.QMessageBox.No)
if reply == QtWidgets.QMessageBox.Yes:
self.ctaEngine.savePosition()
if self.isVisible():
reply = QtWidgets.QMessageBox.question(self, text.SAVE_POSITION_DATA,
text.SAVE_POSITION_QUESTION, QtWidgets.QMessageBox.Yes |
QtWidgets.QMessageBox.No, QtWidgets.QMessageBox.No)
if reply == QtWidgets.QMessageBox.Yes:
self.ctaEngine.savePosition()
event.accept()

View File

@ -0,0 +1,7 @@
gateway,symbol,exchange,currency,product,tick,bar,active
CTP,IF1709,,,,y,y,IF0000
CTP,IC1709,,,,y,,
CTP,m1709,,,,y,,
SGIT,IH1709,,,,y,y,IH0000
LTS,600036,SZSE,,,y,,
IB,EUR.USD,IDEALPRO,USD,Íâ»ã,y,y,
1 gateway symbol exchange currency product tick bar active
2 CTP IF1709 y y IF0000
3 CTP IC1709 y
4 CTP m1709 y
5 SGIT IH1709 y y IH0000
6 LTS 600036 SZSE y
7 IB EUR.USD IDEALPRO USD Íâ»ã y y

View File

@ -12,6 +12,8 @@ TICK_DB_NAME = 'VnTrader_Tick_Db'
DAILY_DB_NAME = 'VnTrader_Daily_Db'
MINUTE_DB_NAME = 'VnTrader_1Min_Db'
# 行情记录模块事件
EVENT_DATARECORDER_LOG = 'eDataRecorderLog' # 行情记录日志更新事件
# CTA引擎中涉及的数据类定义
from vnpy.trader.vtConstant import EMPTY_UNICODE, EMPTY_STRING, EMPTY_FLOAT, EMPTY_INT

View File

@ -6,7 +6,8 @@
使用DR_setting.json来配置需要收集的合约以及主力合约代码
'''
import json
#import json
import csv
import os
import copy
from collections import OrderedDict
@ -27,7 +28,7 @@ from vnpy.trader.app.dataRecorder.language import text
class DrEngine(object):
"""数据记录引擎"""
settingFileName = 'DR_setting.json'
settingFileName = 'DR_setting.csv'
path = os.path.abspath(os.path.dirname(__file__))
settingFileName = os.path.join(path, settingFileName)
@ -49,6 +50,9 @@ class DrEngine(object):
# K线对象字典
self.barDict = {}
# 配置字典
self.settingDict = OrderedDict()
# 负责执行数据库插入的单独线程相关
self.active = False # 工作状态
self.queue = Queue() # 队列
@ -57,84 +61,67 @@ class DrEngine(object):
# 载入设置,订阅行情
self.loadSetting()
# 启动数据插入线程
self.start()
# 注册事件监听
self.registerEvent()
#----------------------------------------------------------------------
def loadSetting(self):
"""载入设置"""
"""加载配"""
with open(self.settingFileName) as f:
drSetting = json.load(f)
drSetting = csv.DictReader(f)
# 如果working设为False则不启动行情记录功能
working = drSetting['working']
if not working:
return
if 'tick' in drSetting:
l = drSetting['tick']
for d in drSetting:
# 读取配置
gatewayName = d['gateway']
symbol = d['symbol']
exchange = d['exchange']
currency = d['currency']
productClass = d['product']
recordTick = d['tick']
recordBar = d['bar']
activeSymbol = d['active']
for setting in l:
symbol = setting[0]
if exchange:
vtSymbol = '.'.join([symbol, exchange])
else:
vtSymbol = symbol
req = VtSubscribeReq()
req.symbol = setting[0]
# 针对LTS和IB接口订阅行情需要交易所代码
if len(setting)>=3:
req.exchange = setting[2]
vtSymbol = '.'.join([symbol, req.exchange])
# 针对IB接口订阅行情需要货币和产品类型
if len(setting)>=5:
req.currency = setting[3]
req.productClass = setting[4]
self.mainEngine.subscribe(req, setting[1])
tick = VtTickData() # 该tick实例可以用于缓存部分数据目前未使用
self.tickDict[vtSymbol] = tick
if 'bar' in drSetting:
l = drSetting['bar']
for setting in l:
symbol = setting[0]
vtSymbol = symbol
req = VtSubscribeReq()
req.symbol = symbol
if len(setting)>=3:
req.exchange = setting[2]
vtSymbol = '.'.join([symbol, req.exchange])
if len(setting)>=5:
req.currency = setting[3]
req.productClass = setting[4]
self.mainEngine.subscribe(req, setting[1])
bar = VtBarData()
self.barDict[vtSymbol] = bar
if 'active' in drSetting:
d = drSetting['active']
# 订阅行情
req = VtSubscribeReq()
req.symbol = symbol
req.exchange = exchange
req.currency = currency
req.productClass = productClass
self.mainEngine.subscribe(req, gatewayName)
# 注意这里的vtSymbol对于IB和LTS接口应该后缀.交易所
for activeSymbol, vtSymbol in d.items():
# 设置需要记录的数据
if recordTick:
tick = VtTickData()
self.tickDict[vtSymbol] = VtTickData()
if recordBar:
self.barDict[vtSymbol] = VtBarData()
if activeSymbol:
self.activeSymbolDict[vtSymbol] = activeSymbol
# 启动数据插入线程
self.start()
# 注册事件监听
self.registerEvent()
# 保存配置到缓存中
self.settingDict[vtSymbol] = d
#----------------------------------------------------------------------
def getSetting(self):
"""获取配置"""
return self.settingDict
#----------------------------------------------------------------------
def procecssTickEvent(self, event):
"""处理行情推送"""
tick = event.dict_['data']
vtSymbol = tick.vtSymbol
# 转化Tick格式
if not tick.datetime:
tick.datetime = datetime.strptime(' '.join([tick.date, tick.time]), '%Y%m%d %H:%M:%S.%f')

View File

@ -4,14 +4,10 @@
行情记录模块相关的GUI控制组件
'''
import json
from qtpy import QtWidgets, QtGui, QtCore
from vnpy.event import Event
from vnpy.trader.vtEvent import *
from vnpy.trader.app.dataRecorder.language import text
from vnpy.trader.uiQt import QtWidgets, QtCore
from .drBase import EVENT_DATARECORDER_LOG
from .language import text
########################################################################
@ -119,33 +115,24 @@ class DrEngineManager(QtWidgets.QWidget):
#----------------------------------------------------------------------
def updateSetting(self):
"""显示引擎行情记录配置"""
with open(self.drEngine.settingFileName) as f:
drSetting = json.load(f)
if 'tick' in drSetting:
l = drSetting['tick']
for setting in l:
self.tickTable.insertRow(0)
self.tickTable.setItem(0, 0, TableCell(setting[0]))
self.tickTable.setItem(0, 1, TableCell(setting[1]))
if 'bar' in drSetting:
l = drSetting['bar']
for setting in l:
self.barTable.insertRow(0)
self.barTable.setItem(0, 0, TableCell(setting[0]))
self.barTable.setItem(0, 1, TableCell(setting[1]))
if 'active' in drSetting:
d = drSetting['active']
for activeSymbol, symbol in d.items():
self.activeTable.insertRow(0)
self.activeTable.setItem(0, 0, TableCell(activeSymbol))
self.activeTable.setItem(0, 1, TableCell(symbol))
setting = self.drEngine.getSetting()
for d in setting.values():
if d['tick']:
self.tickTable.insertRow(0)
self.tickTable.setItem(0, 0, TableCell(d['symbol']))
self.tickTable.setItem(0, 1, TableCell(d['gateway']))
if d['bar']:
self.barTable.insertRow(0)
self.barTable.setItem(0, 0, TableCell(d['symbol']))
self.barTable.setItem(0, 1, TableCell(d['gateway']))
if d['active']:
self.activeTable.insertRow(0)
self.activeTable.setItem(0, 0, TableCell(d['active']))
self.activeTable.setItem(0, 1, TableCell(d['symbol']))
self.tickTable.resizeColumnsToContents()
self.barTable.resizeColumnsToContents()
self.activeTable.resizeColumnsToContents()

View File

@ -2,8 +2,8 @@
"orderFlowClear": 1,
"orderCancelLimit": 10,
"workingOrderLimit": 20,
"tradeLimit": 100,
"orderSizeLimit": 10,
"tradeLimit": 1000,
"orderSizeLimit": 100,
"active": true,
"orderFlowLimit": 50
}

View File

@ -0,0 +1,85 @@
[
{
"name": "m.09-01",
"activeLeg":
{
"vtSymbol": "m1709",
"ratio": 1,
"multiplier": 1.0,
"payup": 2
},
"passiveLegs": [
{
"vtSymbol": "m1801",
"ratio": -1,
"multiplier": -1.0,
"payup": 2
}
]
},
{
"name": "IF.07-09",
"activeLeg":
{
"vtSymbol": "IF1707",
"ratio": 1,
"multiplier": 1.0,
"payup": 2
},
"passiveLegs": [
{
"vtSymbol": "IF1709",
"ratio": -1,
"multiplier": -1.0,
"payup": 2
}
]
},
{
"name": "IH.07-09",
"activeLeg":
{
"vtSymbol": "IH1707",
"ratio": 1,
"multiplier": 1.0,
"payup": 2
},
"passiveLegs": [
{
"vtSymbol": "IH1709",
"ratio": -1,
"multiplier": -1.0,
"payup": 2
}
]
},
{
"name": "IC.07-09",
"activeLeg":
{
"vtSymbol": "IC1707",
"ratio": 1,
"multiplier": 1.0,
"payup": 2
},
"passiveLegs": [
{
"vtSymbol": "IC1709",
"ratio": -1,
"multiplier": -1.0,
"payup": 2
}
]
}
]

View File

@ -0,0 +1,10 @@
# encoding: UTF-8
from .stEngine import StEngine
from .uiStWidget import StManager
appName = 'SpreadTrading'
appDisplayName = u'价差交易'
appEngine = StEngine
appWidget = StManager
appIco = 'st.ico'

Binary file not shown.

After

Width:  |  Height:  |  Size: 65 KiB

View File

@ -0,0 +1,517 @@
# encoding: UTF-8
from math import floor
from vnpy.trader.vtConstant import (EMPTY_INT, EMPTY_FLOAT,
EMPTY_STRING, EMPTY_UNICODE,
DIRECTION_LONG, DIRECTION_SHORT,
STATUS_ALLTRADED, STATUS_CANCELLED, STATUS_REJECTED)
########################################################################
class StAlgoTemplate(object):
"""价差算法交易模板"""
MODE_LONGSHORT = u'双向'
MODE_LONGONLY = u'做多'
MODE_SHORTONLY = u'做空'
SPREAD_LONG = 1
SPREAD_SHORT = 2
#----------------------------------------------------------------------
def __init__(self, algoEngine, spread):
"""Constructor"""
self.algoEngine = algoEngine # 算法引擎
self.spreadName = spread.name # 价差名称
self.spread = spread # 价差对象
self.algoName = EMPTY_STRING # 算法名称
self.active = False # 工作状态
self.mode = self.MODE_LONGSHORT # 工作模式
self.buyPrice = EMPTY_FLOAT # 开平仓价格
self.sellPrice = EMPTY_FLOAT
self.shortPrice = EMPTY_FLOAT
self.coverPrice = EMPTY_FLOAT
self.maxPosSize = EMPTY_INT # 最大单边持仓量
self.maxOrderSize = EMPTY_INT # 最大单笔委托量
#----------------------------------------------------------------------
def updateSpreadTick(self, spread):
""""""
raise NotImplementedError
#----------------------------------------------------------------------
def updateSpreadPos(self, spread):
""""""
raise NotImplementedError
#----------------------------------------------------------------------
def updateTrade(self, trade):
""""""
raise NotImplementedError
#----------------------------------------------------------------------
def updateOrder(self, order):
""""""
raise NotImplementedError
#----------------------------------------------------------------------
def updateTimer(self):
""""""
raise NotImplementedError
#----------------------------------------------------------------------
def start(self):
""""""
raise NotImplementedError
#----------------------------------------------------------------------
def stop(self):
""""""
raise NotImplementedError
#----------------------------------------------------------------------
def setBuyPrice(self, buyPrice):
"""设置买开的价格"""
self.buyPrice = buyPrice
#----------------------------------------------------------------------
def setSellPrice(self, sellPrice):
"""设置卖平的价格"""
self.sellPrice = sellPrice
#----------------------------------------------------------------------
def setShortPrice(self, shortPrice):
"""设置卖开的价格"""
self.shortPrice = shortPrice
#----------------------------------------------------------------------
def setCoverPrice(self, coverPrice):
"""设置买平的价格"""
self.coverPrice = coverPrice
#----------------------------------------------------------------------
def setMode(self, mode):
"""设置算法交易方向"""
self.mode = mode
#----------------------------------------------------------------------
def setMaxOrderSize(self, maxOrderSize):
"""设置最大单笔委托数量"""
self.maxOrderSize = maxOrderSize
#----------------------------------------------------------------------
def setMaxPosSize(self, maxPosSize):
"""设置最大持仓数量"""
self.maxPosSize = maxPosSize
#----------------------------------------------------------------------
def putEvent(self):
"""发出算法更新事件"""
self.algoEngine.putAlgoEvent(self)
#----------------------------------------------------------------------
def writeLog(self, content):
"""输出算法日志"""
prefix = ' '.join([self.spreadName, self.algoName])
content = ':'.join([prefix, content])
self.algoEngine.writeLog(content)
#----------------------------------------------------------------------
def getAlgoParams(self):
"""获取算法参数"""
d = {
"spreadName": self.spreadName,
"algoName": self.algoName,
"buyPrice": self.buyPrice,
"sellPrice": self.sellPrice,
"shortPrice": self.shortPrice,
"coverPrice": self.coverPrice,
"maxOrderSize": self.maxOrderSize,
"maxPosSize": self.maxPosSize,
"mode": self.mode
}
return d
#----------------------------------------------------------------------
def setAlgoParams(self, d):
"""设置算法参数"""
self.buyPrice = d.get('buyPrice', EMPTY_FLOAT)
self.sellPrice = d.get('sellPrice', EMPTY_FLOAT)
self.shortPrice = d.get('shortPrice', EMPTY_FLOAT)
self.coverPrice = d.get('coverPrice', EMPTY_FLOAT)
self.maxOrderSize = d.get('maxOrderSize', EMPTY_INT)
self.maxPosSize = d.get('maxPosSize', EMPTY_INT)
self.mode = d.get('mode', self.MODE_LONGSHORT)
########################################################################
class SniperAlgo(StAlgoTemplate):
"""狙击算法(市价委托)"""
FINISHED_STATUS = [STATUS_ALLTRADED, STATUS_CANCELLED, STATUS_REJECTED]
#----------------------------------------------------------------------
def __init__(self, algoEngine, spread):
"""Constructor"""
super(SniperAlgo, self).__init__(algoEngine, spread)
self.algoName = u'Sniper'
self.quoteInterval = 2 # 主动腿报价撤单再发前等待的时间
self.quoteCount = 0 # 报价计数
self.hedgeInterval = 2 # 对冲腿对冲撤单再发前的等待时间
self.hedgeCount = 0 # 对冲计数
self.activeVtSymbol = spread.activeLeg.vtSymbol # 主动腿代码
self.passiveVtSymbols = [leg.vtSymbol for leg in spread.passiveLegs] # 被动腿代码列表
# 缓存每条腿对象的字典
self.legDict = {}
self.legDict[spread.activeLeg.vtSymbol] = spread.activeLeg
for leg in spread.passiveLegs:
self.legDict[leg.vtSymbol] = leg
self.hedgingTaskDict = {} # 被动腿需要对冲的数量字典 vtSymbol:volume
self.legOrderDict = {} # vtSymbol: list of vtOrderID
self.orderTradedDict = {} # vtOrderID: tradedVolume
#----------------------------------------------------------------------
def updateSpreadTick(self, spread):
"""价差行情更新"""
self.spread = spread
# 若算法没有启动则直接返回
if not self.active:
return
# 若当前已有主动腿委托则直接返回
if (self.activeVtSymbol in self.legOrderDict and
self.legOrderDict[self.activeVtSymbol]):
return
# 允许做多
if self.mode == self.MODE_LONGSHORT or self.mode == self.MODE_LONGONLY:
# 买入
if (spread.netPos >= 0 and
spread.netPos < self.maxPosSize and
spread.askPrice <= self.buyPrice):
self.quoteActiveLeg(self.SPREAD_LONG)
self.writeLog(u'买入开仓')
# 卖出
elif (spread.netPos > 0 and
spread.bidPrice >= self.sellPrice):
self.quoteActiveLeg(self.SPREAD_SHORT)
self.writeLog(u'卖出平仓')
# 允许做空
if self.mode == self.MODE_LONGSHORT or self.mode == self.MODE_SHORTONLY:
# 做空
if (spread.netPos <= 0 and
spread.netPos > -self.maxPosSize and
spread.bidPrice >= self.shortPrice):
self.quoteActiveLeg(self.SPREAD_SHORT)
self.writeLog(u'卖出开仓')
# 平空
elif (spread.netPos < 0 and
spread.askPrice <= self.coverPrice):
self.quoteActiveLeg(self.SPREAD_LONG)
self.writeLog(u'买入平仓')
#----------------------------------------------------------------------
def updateSpreadPos(self, spread):
"""价差持仓更新"""
self.spread = spread
#----------------------------------------------------------------------
def updateTrade(self, trade):
"""成交更新"""
pass
#----------------------------------------------------------------------
def updateOrder(self, order):
"""委托更新"""
if not self.active:
return
vtOrderID = order.vtOrderID
vtSymbol = order.vtSymbol
newTradedVolume = order.tradedVolume
lastTradedVolume = self.orderTradedDict.get(vtOrderID, 0)
# 检查是否有新的成交
if newTradedVolume > lastTradedVolume:
self.orderTradedDict[vtOrderID] = newTradedVolume # 缓存委托已经成交数量
volume = newTradedVolume - lastTradedVolume # 计算本次成交数量
if vtSymbol == self.activeVtSymbol:
self.newActiveLegTrade(vtSymbol, order.direction, volume)
else:
self.newPassiveLegTrade(vtSymbol, order.direction, volume)
# 处理完成委托
if order.status in self.FINISHED_STATUS:
vtOrderID = order.vtOrderID
vtSymbol = order.vtSymbol
# 从委托列表中移除该委托
orderList = self.legOrderDict[vtSymbol]
if vtOrderID in orderList:
orderList.remove(vtOrderID)
# 检查若是被动腿,且已经没有未完成委托,则执行对冲
if not orderList and vtSymbol in self.passiveVtSymbols:
self.hedgePassiveLeg(vtSymbol)
#----------------------------------------------------------------------
def updateTimer(self):
"""计时更新"""
if not self.active:
return
self.quoteCount += 1
self.hedgeCount += 1
# 计时到达报价间隔后,则对尚未成交的主动腿委托全部撤单
# 收到撤单回报后清空委托列表,等待下次价差更新再发单
if self.quoteCount > self.quoteInterval:
self.cancelLegOrder(self.activeVtSymbol)
self.quoteCount = 0
# 计时到达对冲间隔后,则对尚未成交的全部被动腿委托全部撤单
# 收到撤单回报后,会自动发送新的对冲委托
if self.hedgeCount > self.hedgeInterval:
self.cancelAllPassiveLegOrders()
self.hedgeCount = 0
#----------------------------------------------------------------------
def start(self):
"""启动"""
# 如果已经运行则直接返回状态
if self.active:
return self.active
# 做多检查
if self.mode != self.MODE_SHORTONLY:
if self.buyPrice >= self.sellPrice:
self.writeLog(u'启动失败允许多头交易时BuyPrice必须小于SellPrice')
return self.active
# 做空检查
if self.mode != self.MODE_LONGONLY:
if self.shortPrice <= self.coverPrice:
self.writeLog(u'启动失败允许空头交易时ShortPrice必须大于CoverPrice')
return self.active
# 多空检查
if self.mode == self.MODE_LONGSHORT:
if self.buyPrice >= self.coverPrice:
self.writeLog(u'启动失败允许双向交易时BuyPrice必须小于CoverPrice')
return self.active
if self.shortPrice <= self.sellPrice:
self.writeLog(u'启动失败允许双向交易时ShortPrice必须大于SellPrice')
return self.active
# 启动算法
self.quoteCount = 0
self.hedgeCount = 0
self.active = True
self.writeLog(u'算法启动')
return self.active
#----------------------------------------------------------------------
def stop(self):
"""停止"""
if self.active:
self.hedgingTaskDict.clear()
self.cancelAllOrders()
self.active = False
self.writeLog(u'算法停止')
return self.active
#----------------------------------------------------------------------
def sendLegOrder(self, leg, legVolume):
"""发送每条腿的委托"""
vtSymbol = leg.vtSymbol
volume = abs(legVolume)
payup = leg.payup
# 发送委托
if legVolume > 0:
price = leg.askPrice
if leg.shortPos > 0:
orderList = self.algoEngine.cover(vtSymbol, price, volume, payup)
else:
orderList = self.algoEngine.buy(vtSymbol, price, volume, payup)
elif legVolume < 0:
price = leg.bidPrice
if leg.longPos > 0:
orderList = self.algoEngine.sell(vtSymbol, price, volume, payup)
else:
orderList = self.algoEngine.short(vtSymbol, price, volume, payup)
# 保存到字典中
if vtSymbol not in self.legOrderDict:
self.legOrderDict[vtSymbol] = orderList
else:
self.legOrderDict[vtSymbol].extend(orderList)
#----------------------------------------------------------------------
def quoteActiveLeg(self, direction):
"""发出主动腿"""
spread = self.spread
# 首先计算不带正负号的价差委托量
if direction == self.SPREAD_LONG:
spreadVolume = min(spread.askVolume,
self.maxPosSize - spread.netPos,
self.maxOrderSize)
# 有价差空头持仓的情况下,则本次委托最多平完空头
if spread.shortPos > 0:
spreadVolume = min(spreadVolume, spread.shortPos)
else:
spreadVolume = min(spread.bidVolume,
self.maxPosSize + spread.netPos,
self.maxOrderSize)
# 有价差多头持仓的情况下,则本次委托最多平完多头
if spread.longPos > 0:
spreadVolume = min(spreadVolume, spread.longPos)
if spreadVolume <= 0:
return
# 加上价差方向
if direction == self.SPREAD_SHORT:
spreadVolume = -spreadVolume
# 计算主动腿委托量
leg = self.legDict[self.activeVtSymbol]
legVolume = spreadVolume * leg.ratio
self.sendLegOrder(leg, legVolume)
self.writeLog(u'发出新的主动腿%s狙击单' %self.activeVtSymbol)
self.quoteCount = 0 # 重置主动腿报价撤单等待计数
#----------------------------------------------------------------------
def hedgePassiveLeg(self, vtSymbol):
"""被动腿对冲"""
if vtSymbol not in self.hedgingTaskDict:
return
orderList = self.legOrderDict.get(vtSymbol, [])
if orderList:
return
legVolume = self.hedgingTaskDict[vtSymbol]
leg = self.legDict[vtSymbol]
self.sendLegOrder(leg, legVolume)
self.writeLog(u'发出新的被动腿%s对冲单' %vtSymbol)
#----------------------------------------------------------------------
def hedgeAllPassiveLegs(self):
"""执行所有被动腿对冲"""
for vtSymbol in self.hedgingTaskDict.keys():
self.hedgePassiveLeg(vtSymbol)
self.hedgeCount = 0 # 重置被动腿对冲撤单等待计数
#----------------------------------------------------------------------
def newActiveLegTrade(self, vtSymbol, direction, volume):
"""新的主动腿成交"""
# 输出日志
self.writeLog(u'主动腿%s成交,方向%s,数量%s' %(vtSymbol, direction, volume))
# 将主动腿成交带上方向
if direction == DIRECTION_SHORT:
volume = -volume
# 计算主动腿成交后,对应的价差仓位
spread = self.spread
activeRatio = spread.activeLeg.ratio
spreadVolume = round(volume / activeRatio) # 四舍五入求主动腿成交量对应的价差份数
# 计算价差新仓位,对应的被动腿需要对冲部分
for leg in self.spread.passiveLegs:
newHedgingTask = leg.ratio * spreadVolume
if leg.vtSymbol not in self.hedgingTaskDict:
self.hedgingTaskDict[leg.vtSymbol] = newHedgingTask
else:
self.hedgingTaskDict[leg.vtSymbol] += newHedgingTask
# 发出被动腿对冲委托
self.hedgeAllPassiveLegs()
#----------------------------------------------------------------------
def newPassiveLegTrade(self, vtSymbol, direction, volume):
"""新的被动腿成交"""
if vtSymbol in self.hedgingTaskDict:
# 计算完成的对冲数量
if direction == DIRECTION_LONG:
hedgedVolume = volume
else:
hedgedVolume = -volume
# 计算剩余尚未完成的数量
self.hedgingTaskDict[vtSymbol] -= hedgedVolume
# 如果已全部完成,则从字典中移除
if not self.hedgingTaskDict[vtSymbol]:
del self.hedgingTaskDict[vtSymbol]
# 输出日志
self.writeLog(u'被动腿%s成交,方向%s,数量%s' %(vtSymbol, direction, volume))
#----------------------------------------------------------------------
def cancelLegOrder(self, vtSymbol):
"""撤销某条腿的委托"""
if vtSymbol not in self.legOrderDict:
return
orderList = self.legOrderDict[vtSymbol]
if not orderList:
return
for vtOrderID in orderList:
self.algoEngine.cancelOrder(vtOrderID)
self.writeLog(u'撤单%s的所有委托' %vtSymbol)
#----------------------------------------------------------------------
def cancelAllOrders(self):
"""撤销全部委托"""
for orderList in self.legOrderDict.values():
for vtOrderID in orderList:
self.algoEngine.cancelOrder(vtOrderID)
self.writeLog(u'全部撤单')
#----------------------------------------------------------------------
def cancelAllPassiveLegOrders(self):
"""撤销全部被动腿委托"""
cancelPassive = False
for vtSymbol in self.passiveVtSymbols:
if vtSymbol in self.legOrderDict and self.legOrderDict[vtSymbol]:
self.cancelLegOrder(vtSymbol)
cancelPassive = True
# 只有确实发出撤单委托时,才输出信息
if cancelPassive:
self.writeLog(u'被动腿全撤')

View File

@ -0,0 +1,168 @@
# encoding: UTF-8
from __future__ import division
from math import floor
from datetime import datetime
from vnpy.trader.vtConstant import (EMPTY_INT, EMPTY_FLOAT,
EMPTY_STRING, EMPTY_UNICODE)
EVENT_SPREADTRADING_TICK = 'eSpreadTradingTick.'
EVENT_SPREADTRADING_POS = 'eSpreadTradingPos.'
EVENT_SPREADTRADING_LOG = 'eSpreadTradingLog'
EVENT_SPREADTRADING_ALGO = 'eSpreadTradingAlgo.'
EVENT_SPREADTRADING_ALGOLOG = 'eSpreadTradingAlgoLog'
########################################################################
class StLeg(object):
""""""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.vtSymbol = EMPTY_STRING # 代码
self.ratio = EMPTY_INT # 实际交易时的比例
self.multiplier = EMPTY_FLOAT # 计算价差时的乘数
self.payup = EMPTY_INT # 对冲时的超价tick
self.bidPrice = EMPTY_FLOAT
self.askPrice = EMPTY_FLOAT
self.bidVolume = EMPTY_INT
self.askVolume = EMPTY_INT
self.longPos = EMPTY_INT
self.shortPos = EMPTY_INT
self.netPos = EMPTY_INT
########################################################################
class StSpread(object):
""""""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.name = EMPTY_UNICODE # 名称
self.symbol = EMPTY_STRING # 代码(基于组成腿计算)
self.activeLeg = None # 主动腿
self.passiveLegs = [] # 被动腿(支持多条)
self.allLegs = [] # 所有腿
self.bidPrice = EMPTY_FLOAT
self.askPrice = EMPTY_FLOAT
self.bidVolume = EMPTY_INT
self.askVolume = EMPTY_INT
self.time = EMPTY_STRING
self.longPos = EMPTY_INT
self.shortPos = EMPTY_INT
self.netPos = EMPTY_INT
#----------------------------------------------------------------------
def initSpread(self):
"""初始化价差"""
# 价差最少要有一条主动腿
if not self.activeLeg:
return
# 生成所有腿列表
self.allLegs.append(self.activeLeg)
self.allLegs.extend(self.passiveLegs)
# 生成价差代码
legSymbolList = []
for leg in self.allLegs:
if leg.multiplier >= 0:
legSymbol = '+%s*%s' %(leg.multiplier, leg.vtSymbol)
else:
legSymbol = '%s*%s' %(leg.multiplier, leg.vtSymbol)
legSymbolList.append(legSymbol)
self.symbol = ''.join(legSymbolList)
#----------------------------------------------------------------------
def calculatePrice(self):
"""计算价格"""
# 清空价格和委托量数据
self.bidPrice = EMPTY_FLOAT
self.askPrice = EMPTY_FLOAT
self.askVolume = EMPTY_INT
self.bidVolume = EMPTY_INT
# 遍历价差腿列表
for n, leg in enumerate(self.allLegs):
# 计算价格
if leg.multiplier > 0:
self.bidPrice += leg.bidPrice * leg.multiplier
self.askPrice += leg.askPrice * leg.multiplier
else:
self.bidPrice += leg.askPrice * leg.multiplier
self.askPrice += leg.bidPrice * leg.multiplier
# 计算报单量
if leg.ratio > 0:
legAdjustedBidVolume = floor(leg.bidVolume / leg.ratio)
legAdjustedAskVolume = floor(leg.askVolume / leg.ratio)
else:
legAdjustedBidVolume = floor(leg.askVolume / abs(leg.ratio))
legAdjustedAskVolume = floor(leg.bidVolume / abs(leg.ratio))
if n == 0:
self.bidVolume = legAdjustedBidVolume # 对于第一条腿,直接初始化
self.askVolume = legAdjustedAskVolume
else:
self.bidVolume = min(self.bidVolume, legAdjustedBidVolume) # 对于后续的腿,价差可交易报单量取较小值
self.askVolume = min(self.askVolume, legAdjustedAskVolume)
# 更新时间
self.time = datetime.now().strftime('%H:%M:%S.%f')[:-3]
#----------------------------------------------------------------------
def calculatePos(self):
"""计算持仓"""
# 清空持仓数据
self.longPos = EMPTY_INT
self.shortPos = EMPTY_INT
self.netPos = EMPTY_INT
# 遍历价差腿列表
for n, leg in enumerate(self.allLegs):
if leg.ratio > 0:
legAdjustedLongPos = floor(leg.longPos / leg.ratio)
legAdjustedShortPos = floor(leg.shortPos / leg.ratio)
else:
legAdjustedLongPos = floor(leg.shortPos / abs(leg.ratio))
legAdjustedShortPos = floor(leg.longPos / abs(leg.ratio))
if n == 0:
self.longPos = legAdjustedLongPos
self.shortPos = legAdjustedShortPos
else:
self.longPos = min(self.longPos, legAdjustedLongPos)
self.shortPos = min(self.shortPos, legAdjustedShortPos)
# 计算净仓位
self.longPos = int(self.longPos)
self.shortPos = int(self.shortPos)
self.netPos = self.longPos - self.shortPos
#----------------------------------------------------------------------
def addActiveLeg(self, leg):
"""添加主动腿"""
self.activeLeg = leg
#----------------------------------------------------------------------
def addPassiveLeg(self, leg):
"""添加被动腿"""
self.passiveLegs.append(leg)

View File

@ -0,0 +1,584 @@
# encoding: UTF-8
import json
import traceback
import shelve
from vnpy.event import Event
from vnpy.trader.vtFunction import getJsonPath, getTempPath
from vnpy.trader.vtEvent import (EVENT_TICK, EVENT_TRADE, EVENT_POSITION,
EVENT_TIMER, EVENT_ORDER)
from vnpy.trader.vtObject import (VtSubscribeReq, VtOrderReq,
VtCancelOrderReq, VtLogData)
from vnpy.trader.vtConstant import (DIRECTION_LONG, DIRECTION_SHORT,
OFFSET_OPEN, OFFSET_CLOSE,
PRICETYPE_LIMITPRICE)
from .stBase import (StLeg, StSpread, EVENT_SPREADTRADING_TICK,
EVENT_SPREADTRADING_POS, EVENT_SPREADTRADING_LOG,
EVENT_SPREADTRADING_ALGO, EVENT_SPREADTRADING_ALGOLOG)
from .stAlgo import SniperAlgo
########################################################################
class StDataEngine(object):
"""价差数据计算引擎"""
settingFileName = 'ST_setting.json'
settingFilePath = getJsonPath(settingFileName, __file__)
#----------------------------------------------------------------------
def __init__(self, mainEngine, eventEngine):
"""Constructor"""
self.mainEngine = mainEngine
self.eventEngine = eventEngine
# 腿、价差相关字典
self.legDict = {} # vtSymbol:StLeg
self.spreadDict = {} # name:StSpread
self.vtSymbolSpreadDict = {} # vtSymbol:StSpread
self.registerEvent()
#----------------------------------------------------------------------
def loadSetting(self):
"""加载配置"""
try:
with open(self.settingFilePath) as f:
l = json.load(f)
for setting in l:
result, msg = self.createSpread(setting)
self.writeLog(msg)
self.writeLog(u'价差配置加载完成')
except:
content = u'价差配置加载出错,原因:' + traceback.format_exc()
self.writeLog(content)
#----------------------------------------------------------------------
def saveSetting(self):
"""保存配置"""
with open(self.settingFilePath) as f:
pass
#----------------------------------------------------------------------
def createSpread(self, setting):
"""创建价差"""
result = False
msg = ''
# 检查价差重名
if setting['name'] in self.spreadDict:
msg = u'%s价差存在重名' %setting['name']
return result, msg
# 检查腿是否已使用
l = []
l.append(setting['activeLeg']['vtSymbol'])
for d in setting['passiveLegs']:
l.append(d['vtSymbol'])
for vtSymbol in l:
if vtSymbol in self.vtSymbolSpreadDict:
existingSpread = self.vtSymbolSpreadDict[vtSymbol]
msg = u'%s合约已经存在于%s价差中' %(vtSymbol, existingSpread.name)
return result, msg
# 创建价差
spread = StSpread()
spread.name = setting['name']
self.spreadDict[spread.name] = spread
# 创建主动腿
activeSetting = setting['activeLeg']
activeLeg = StLeg()
activeLeg.vtSymbol = str(activeSetting['vtSymbol'])
activeLeg.ratio = float(activeSetting['ratio'])
activeLeg.multiplier = float(activeSetting['multiplier'])
activeLeg.payup = int(activeSetting['payup'])
spread.addActiveLeg(activeLeg)
self.legDict[activeLeg.vtSymbol] = activeLeg
self.vtSymbolSpreadDict[activeLeg.vtSymbol] = spread
self.subscribeMarketData(activeLeg.vtSymbol)
# 创建被动腿
passiveSettingList = setting['passiveLegs']
passiveLegList = []
for d in passiveSettingList:
passiveLeg = StLeg()
passiveLeg.vtSymbol = str(d['vtSymbol'])
passiveLeg.ratio = float(d['ratio'])
passiveLeg.multiplier = float(d['multiplier'])
passiveLeg.payup = int(d['payup'])
spread.addPassiveLeg(passiveLeg)
self.legDict[passiveLeg.vtSymbol] = passiveLeg
self.vtSymbolSpreadDict[passiveLeg.vtSymbol] = spread
self.subscribeMarketData(passiveLeg.vtSymbol)
# 初始化价差
spread.initSpread()
self.putSpreadTickEvent(spread)
self.putSpreadPosEvent(spread)
# 返回结果
result = True
msg = u'%s价差创建成功' %spread.name
return result, msg
#----------------------------------------------------------------------
def processTickEvent(self, event):
"""处理行情推送"""
# 检查行情是否需要处理
tick = event.dict_['data']
if tick.vtSymbol not in self.legDict:
return
# 更新腿价格
leg = self.legDict[tick.vtSymbol]
leg.bidPrice = tick.bidPrice1
leg.askPrice = tick.askPrice1
leg.bidVolume = tick.bidVolume1
leg.askVolume = tick.askVolume1
# 更新价差价格
spread = self.vtSymbolSpreadDict[tick.vtSymbol]
spread.calculatePrice()
# 发出事件
self.putSpreadTickEvent(spread)
#----------------------------------------------------------------------
def putSpreadTickEvent(self, spread):
"""发出价差行情更新事件"""
event1 = Event(EVENT_SPREADTRADING_TICK+spread.name)
event1.dict_['data'] = spread
self.eventEngine.put(event1)
event2 = Event(EVENT_SPREADTRADING_TICK)
event2.dict_['data'] = spread
self.eventEngine.put(event2)
#----------------------------------------------------------------------
def processTradeEvent(self, event):
"""处理成交推送"""
# 检查成交是否需要处理
trade = event.dict_['data']
if trade.vtSymbol not in self.legDict:
return
# 更新腿持仓
leg = self.legDict[trade.vtSymbol]
direction = trade.direction
offset = trade.offset
if direction == DIRECTION_LONG:
if offset == OFFSET_OPEN:
leg.longPos += trade.volume
else:
leg.shortPos -= trade.volume
else:
if offset == OFFSET_OPEN:
leg.shortPos += trade.volume
else:
leg.longPos -= trade.volume
leg.netPos = leg.longPos - leg.shortPos
# 更新价差持仓
spread = self.vtSymbolSpreadDict[trade.vtSymbol]
spread.calculatePos()
# 推送价差持仓更新
event1 = Event(EVENT_SPREADTRADING_POS+spread.name)
event1.dict_['data'] = spread
self.eventEngine.put(event1)
event2 = Event(EVENT_SPREADTRADING_POS)
event2.dict_['data'] = spread
self.eventEngine.put(event2)
#----------------------------------------------------------------------
def processPosEvent(self, event):
"""处理持仓推送"""
# 检查持仓是否需要处理
pos = event.dict_['data']
if pos.vtSymbol not in self.legDict:
return
# 更新腿持仓
leg = self.legDict[pos.vtSymbol]
direction = pos.direction
if direction == DIRECTION_LONG:
leg.longPos = pos.position
else:
leg.shortPos = pos.position
leg.netPos = leg.longPos - leg.shortPos
# 更新价差持仓
spread = self.vtSymbolSpreadDict[pos.vtSymbol]
spread.calculatePos()
# 推送价差持仓更新
self.putSpreadPosEvent(spread)
#----------------------------------------------------------------------
def putSpreadPosEvent(self, spread):
"""发出价差持仓事件"""
event1 = Event(EVENT_SPREADTRADING_POS+spread.name)
event1.dict_['data'] = spread
self.eventEngine.put(event1)
event2 = Event(EVENT_SPREADTRADING_POS)
event2.dict_['data'] = spread
self.eventEngine.put(event2)
#----------------------------------------------------------------------
def registerEvent(self):
""""""
self.eventEngine.register(EVENT_TICK, self.processTickEvent)
self.eventEngine.register(EVENT_TRADE, self.processTradeEvent)
self.eventEngine.register(EVENT_POSITION, self.processPosEvent)
#----------------------------------------------------------------------
def subscribeMarketData(self, vtSymbol):
"""订阅行情"""
contract = self.mainEngine.getContract(vtSymbol)
if not contract:
self.writeLog(u'订阅行情失败,找不到该合约%s' %vtSymbol)
return
req = VtSubscribeReq()
req.symbol = contract.symbol
req.exchange = contract.exchange
self.mainEngine.subscribe(req, contract.gatewayName)
#----------------------------------------------------------------------
def writeLog(self, content):
"""发出日志"""
log = VtLogData()
log.logContent = content
event = Event(EVENT_SPREADTRADING_LOG)
event.dict_['data'] = log
self.eventEngine.put(event)
#----------------------------------------------------------------------
def getAllSpreads(self):
"""获取所有的价差"""
return self.spreadDict.values()
########################################################################
class StAlgoEngine(object):
"""价差算法交易引擎"""
algoFileName = 'SpreadTradingAlgo.vt'
algoFilePath = getTempPath(algoFileName)
#----------------------------------------------------------------------
def __init__(self, dataEngine, mainEngine, eventEngine):
"""Constructor"""
self.dataEngine = dataEngine
self.mainEngine = mainEngine
self.eventEngine = eventEngine
self.algoDict = {} # spreadName:algo
self.vtSymbolAlgoDict = {} # vtSymbol:algo
self.registerEvent()
#----------------------------------------------------------------------
def registerEvent(self):
"""注册事件监听"""
self.eventEngine.register(EVENT_SPREADTRADING_TICK, self.processSpreadTickEvent)
self.eventEngine.register(EVENT_SPREADTRADING_POS, self.processSpreadPosEvent)
self.eventEngine.register(EVENT_TRADE, self.processTradeEvent)
self.eventEngine.register(EVENT_ORDER, self.processOrderEvent)
self.eventEngine.register(EVENT_TIMER, self.processTimerEvent)
#----------------------------------------------------------------------
def processSpreadTickEvent(self, event):
"""处理价差行情事件"""
spread = event.dict_['data']
algo = self.algoDict.get(spread.name, None)
if algo:
algo.updateSpreadTick(spread)
#----------------------------------------------------------------------
def processSpreadPosEvent(self, event):
"""处理价差持仓事件"""
spread = event.dict_['data']
algo = self.algoDict.get(spread.name, None)
if algo:
algo.updateSpreadPos(spread)
#----------------------------------------------------------------------
def processTradeEvent(self, event):
"""处理成交事件"""
trade = event.dict_['data']
algo = self.vtSymbolAlgoDict.get(trade.vtSymbol, None)
if algo:
algo.updateTrade(trade)
#----------------------------------------------------------------------
def processOrderEvent(self, event):
"""处理委托事件"""
order = event.dict_['data']
algo = self.vtSymbolAlgoDict.get(order.vtSymbol, None)
if algo:
algo.updateOrder(order)
#----------------------------------------------------------------------
def processTimerEvent(self, event):
""""""
for algo in self.algoDict.values():
algo.updateTimer()
#----------------------------------------------------------------------
def sendOrder(self, vtSymbol, direction, offset, price, volume, payup=0):
"""发单"""
contract = self.mainEngine.getContract(vtSymbol)
if not contract:
return ''
req = VtOrderReq()
req.symbol = contract.symbol
req.exchange = contract.exchange
req.direction = direction
req.offset = offset
req.volume = int(volume)
req.priceType = PRICETYPE_LIMITPRICE
if direction == DIRECTION_LONG:
req.price = price + payup * contract.priceTick
else:
req.price = price - payup * contract.priceTick
vtOrderID = self.mainEngine.sendOrder(req, contract.gatewayName)
return vtOrderID
#----------------------------------------------------------------------
def cancelOrder(self, vtOrderID):
"""撤单"""
order = self.mainEngine.getOrder(vtOrderID)
if not order:
return
req = VtCancelOrderReq()
req.symbol = order.symbol
req.exchange = order.exchange
req.frontID = order.frontID
req.sessionID = order.sessionID
req.orderID = order.orderID
self.mainEngine.cancelOrder(req, order.gatewayName)
#----------------------------------------------------------------------
def buy(self, vtSymbol, price, volume, payup=0):
"""买入"""
vtOrderID = self.sendOrder(vtSymbol, DIRECTION_LONG, OFFSET_OPEN, price, volume, payup)
l = []
if vtOrderID:
l.append(vtOrderID)
return l
#----------------------------------------------------------------------
def sell(self, vtSymbol, price, volume, payup=0):
"""卖出"""
vtOrderID = self.sendOrder(vtSymbol, DIRECTION_SHORT, OFFSET_CLOSE, price, volume, payup)
l = []
if vtOrderID:
l.append(vtOrderID)
return l
#----------------------------------------------------------------------
def short(self, vtSymbol, price, volume, payup=0):
"""卖空"""
vtOrderID = self.sendOrder(vtSymbol, DIRECTION_SHORT, OFFSET_OPEN, price, volume, payup)
l = []
if vtOrderID:
l.append(vtOrderID)
return l
#----------------------------------------------------------------------
def cover(self, vtSymbol, price, volume, payup=0):
"""平空"""
vtOrderID = self.sendOrder(vtSymbol, DIRECTION_LONG, OFFSET_CLOSE, price, volume, payup)
l = []
if vtOrderID:
l.append(vtOrderID)
return l
#----------------------------------------------------------------------
def putAlgoEvent(self, algo):
"""发出算法状态更新事件"""
event = Event(EVENT_SPREADTRADING_ALGO+algo.name)
self.eventEngine.put(event)
#----------------------------------------------------------------------
def writeLog(self, content):
"""输出日志"""
log = VtLogData()
log.logContent = content
event = Event(EVENT_SPREADTRADING_ALGOLOG)
event.dict_['data'] = log
self.eventEngine.put(event)
#----------------------------------------------------------------------
def saveSetting(self):
"""保存算法配置"""
setting = {}
for algo in self.algoDict.values():
setting[algo.spreadName] = algo.getAlgoParams()
f = shelve.open(self.algoFilePath)
f['setting'] = setting
f.close()
#----------------------------------------------------------------------
def loadSetting(self):
"""加载算法配置"""
# 创建算法对象
l = self.dataEngine.getAllSpreads()
for spread in l:
algo = SniperAlgo(self, spread)
self.algoDict[spread.name] = algo
# 保存腿代码和算法对象的映射
for leg in spread.allLegs:
self.vtSymbolAlgoDict[leg.vtSymbol] = algo
# 加载配置
f = shelve.open(self.algoFilePath)
setting = f.get('setting', None)
f.close()
if not setting:
return
for algo in self.algoDict.values():
if algo.spreadName in setting:
d = setting[algo.spreadName]
algo.setAlgoParams(d)
#----------------------------------------------------------------------
def stopAll(self):
"""停止全部算法"""
for algo in self.algoDict.values():
algo.stop()
#----------------------------------------------------------------------
def startAlgo(self, spreadName):
"""启动算法"""
algo = self.algoDict[spreadName]
algoActive = algo.start()
return algoActive
#----------------------------------------------------------------------
def stopAlgo(self, spreadName):
"""停止算法"""
algo = self.algoDict[spreadName]
algoActive = algo.stop()
return algoActive
#----------------------------------------------------------------------
def getAllAlgoParams(self):
"""获取所有算法的参数"""
return [algo.getAlgoParams() for algo in self.algoDict.values()]
#----------------------------------------------------------------------
def setAlgoBuyPrice(self, spreadName, buyPrice):
"""设置算法买开价格"""
algo = self.algoDict[spreadName]
algo.setBuyPrice(buyPrice)
#----------------------------------------------------------------------
def setAlgoSellPrice(self, spreadName, sellPrice):
"""设置算法卖平价格"""
algo = self.algoDict[spreadName]
algo.setSellPrice(sellPrice)
#----------------------------------------------------------------------
def setAlgoShortPrice(self, spreadName, shortPrice):
"""设置算法卖开价格"""
algo = self.algoDict[spreadName]
algo.setShortPrice(shortPrice)
#----------------------------------------------------------------------
def setAlgoCoverPrice(self, spreadName, coverPrice):
"""设置算法买平价格"""
algo = self.algoDict[spreadName]
algo.setCoverPrice(coverPrice)
#----------------------------------------------------------------------
def setAlgoMode(self, spreadName, mode):
"""设置算法工作模式"""
algo = self.algoDict[spreadName]
algo.setMode(mode)
#----------------------------------------------------------------------
def setAlgoMaxOrderSize(self, spreadName, maxOrderSize):
"""设置算法单笔委托限制"""
algo = self.algoDict[spreadName]
algo.setMaxOrderSize(maxOrderSize)
#----------------------------------------------------------------------
def setAlgoMaxPosSize(self, spreadName, maxPosSize):
"""设置算法持仓限制"""
algo = self.algoDict[spreadName]
algo.setMaxPosSize(maxPosSize)
########################################################################
class StEngine(object):
"""价差引擎"""
#----------------------------------------------------------------------
def __init__(self, mainEngine, eventEngine):
"""Constructor"""
self.mainEngine = mainEngine
self.eventEngine = eventEngine
self.dataEngine = StDataEngine(mainEngine, eventEngine)
self.algoEngine = StAlgoEngine(self.dataEngine, mainEngine, eventEngine)
#----------------------------------------------------------------------
def init(self):
"""初始化"""
self.dataEngine.loadSetting()
self.algoEngine.loadSetting()
#----------------------------------------------------------------------
def stop(self):
"""停止"""
self.dataEngine.saveSetting()
self.algoEngine.stopAll()
self.algoEngine.saveSetting()

View File

@ -0,0 +1,521 @@
# encoding: UTF-8
from collections import OrderedDict
from vnpy.event import Event
from vnpy.trader.uiQt import QtWidgets, QtCore
from vnpy.trader.uiBasicWidget import (BasicMonitor, BasicCell, PnlCell,
AskCell, BidCell, BASIC_FONT)
from .stBase import (EVENT_SPREADTRADING_TICK, EVENT_SPREADTRADING_POS,
EVENT_SPREADTRADING_LOG, EVENT_SPREADTRADING_ALGO,
EVENT_SPREADTRADING_ALGOLOG)
from .stAlgo import StAlgoTemplate
STYLESHEET_START = 'background-color: rgb(111,255,244); color: black'
STYLESHEET_STOP = 'background-color: rgb(255,201,111); color: black'
########################################################################
class StTickMonitor(BasicMonitor):
"""价差行情监控"""
#----------------------------------------------------------------------
def __init__(self, mainEngine, eventEngine, parent=None):
"""Constructor"""
super(StTickMonitor, self).__init__(mainEngine, eventEngine, parent)
d = OrderedDict()
d['name'] = {'chinese':u'价差名称', 'cellType':BasicCell}
d['bidPrice'] = {'chinese':u'买价', 'cellType':BidCell}
d['bidVolume'] = {'chinese':u'买量', 'cellType':BidCell}
d['askPrice'] = {'chinese':u'卖价', 'cellType':AskCell}
d['askVolume'] = {'chinese':u'卖量', 'cellType':AskCell}
d['time'] = {'chinese':u'时间', 'cellType':BasicCell}
d['symbol'] = {'chinese':u'价差公式', 'cellType':BasicCell}
self.setHeaderDict(d)
self.setDataKey('name')
self.setEventType(EVENT_SPREADTRADING_TICK)
self.setFont(BASIC_FONT)
self.initTable()
self.registerEvent()
########################################################################
class StPosMonitor(BasicMonitor):
"""价差持仓监控"""
#----------------------------------------------------------------------
def __init__(self, mainEngine, eventEngine, parent=None):
"""Constructor"""
super(StPosMonitor, self).__init__(mainEngine, eventEngine, parent)
d = OrderedDict()
d['name'] = {'chinese':u'价差名称', 'cellType':BasicCell}
d['netPos'] = {'chinese':u'净仓', 'cellType':PnlCell}
d['longPos'] = {'chinese':u'多仓', 'cellType':BasicCell}
d['shortPos'] = {'chinese':u'空仓', 'cellType':BasicCell}
d['symbol'] = {'chinese':u'代码', 'cellType':BasicCell}
self.setHeaderDict(d)
self.setDataKey('name')
self.setEventType(EVENT_SPREADTRADING_POS)
self.setFont(BASIC_FONT)
self.initTable()
self.registerEvent()
########################################################################
class StLogMonitor(QtWidgets.QTextEdit):
"""价差日志监控"""
signal = QtCore.pyqtSignal(type(Event()))
#----------------------------------------------------------------------
def __init__(self, mainEngine, eventEngine, parent=None):
"""Constructor"""
super(StLogMonitor, self).__init__(parent)
self.eventEngine = eventEngine
self.registerEvent()
#----------------------------------------------------------------------
def processLogEvent(self, event):
"""处理日志事件"""
log = event.dict_['data']
content = '%s:%s' %(log.logTime, log.logContent)
self.append(content)
#----------------------------------------------------------------------
def registerEvent(self):
"""注册事件监听"""
self.signal.connect(self.processLogEvent)
self.eventEngine.register(EVENT_SPREADTRADING_LOG, self.signal.emit)
########################################################################
class StAlgoLogMonitor(BasicMonitor):
"""价差日志监控"""
#----------------------------------------------------------------------
def __init__(self, mainEngine, eventEngine, parent=None):
"""Constructor"""
super(StAlgoLogMonitor, self).__init__(mainEngine, eventEngine, parent)
d = OrderedDict()
d['logTime'] = {'chinese':u'时间', 'cellType':BasicCell}
d['logContent'] = {'chinese':u'信息', 'cellType':BasicCell}
self.setHeaderDict(d)
self.setEventType(EVENT_SPREADTRADING_ALGOLOG)
self.setFont(BASIC_FONT)
self.initTable()
self.registerEvent()
########################################################################
class StBuyPriceSpinBox(QtWidgets.QDoubleSpinBox):
""""""
#----------------------------------------------------------------------
def __init__(self, algoEngine, spreadName, price, parent=None):
"""Constructor"""
super(StBuyPriceSpinBox, self).__init__(parent)
self.algoEngine = algoEngine
self.spreadName = spreadName
self.setDecimals(4)
self.setRange(-10000, 10000)
self.setValue(price)
self.valueChanged.connect(self.setPrice)
#----------------------------------------------------------------------
def setPrice(self, value):
"""设置价格"""
self.algoEngine.setAlgoBuyPrice(self.spreadName, value)
########################################################################
class StSellPriceSpinBox(QtWidgets.QDoubleSpinBox):
""""""
#----------------------------------------------------------------------
def __init__(self, algoEngine, spreadName, price, parent=None):
"""Constructor"""
super(StSellPriceSpinBox, self).__init__(parent)
self.algoEngine = algoEngine
self.spreadName = spreadName
self.setDecimals(4)
self.setRange(-10000, 10000)
self.setValue(price)
self.valueChanged.connect(self.setPrice)
#----------------------------------------------------------------------
def setPrice(self, value):
"""设置价格"""
self.algoEngine.setAlgoSellPrice(self.spreadName, value)
########################################################################
class StShortPriceSpinBox(QtWidgets.QDoubleSpinBox):
""""""
#----------------------------------------------------------------------
def __init__(self, algoEngine, spreadName, price, parent=None):
"""Constructor"""
super(StShortPriceSpinBox, self).__init__(parent)
self.algoEngine = algoEngine
self.spreadName = spreadName
self.setDecimals(4)
self.setRange(-10000, 10000)
self.setValue(price)
self.valueChanged.connect(self.setPrice)
#----------------------------------------------------------------------
def setPrice(self, value):
"""设置价格"""
self.algoEngine.setAlgoShortPrice(self.spreadName, value)
########################################################################
class StCoverPriceSpinBox(QtWidgets.QDoubleSpinBox):
""""""
#----------------------------------------------------------------------
def __init__(self, algoEngine, spreadName, price, parent=None):
"""Constructor"""
super(StCoverPriceSpinBox, self).__init__(parent)
self.algoEngine = algoEngine
self.spreadName = spreadName
self.setDecimals(4)
self.setRange(-10000, 10000)
self.setValue(price)
self.valueChanged.connect(self.setPrice)
#----------------------------------------------------------------------
def setPrice(self, value):
"""设置价格"""
self.algoEngine.setAlgoCoverPrice(self.spreadName, value)
########################################################################
class StMaxPosSizeSpinBox(QtWidgets.QSpinBox):
""""""
#----------------------------------------------------------------------
def __init__(self, algoEngine, spreadName, size, parent=None):
"""Constructor"""
super(StMaxPosSizeSpinBox, self).__init__(parent)
self.algoEngine = algoEngine
self.spreadName = spreadName
self.setRange(-10000, 10000)
self.setValue(size)
self.valueChanged.connect(self.setSize)
#----------------------------------------------------------------------
def setSize(self, size):
"""设置价格"""
self.algoEngine.setAlgoMaxPosSize(self.spreadName, size)
########################################################################
class StMaxOrderSizeSpinBox(QtWidgets.QSpinBox):
""""""
#----------------------------------------------------------------------
def __init__(self, algoEngine, spreadName, size, parent=None):
"""Constructor"""
super(StMaxOrderSizeSpinBox, self).__init__(parent)
self.algoEngine = algoEngine
self.spreadName = spreadName
self.setRange(-10000, 10000)
self.setValue(size)
self.valueChanged.connect(self.setSize)
#----------------------------------------------------------------------
def setSize(self, size):
"""设置价格"""
self.algoEngine.setAlgoMaxOrderSize(self.spreadName, size)
########################################################################
class StModeComboBox(QtWidgets.QComboBox):
""""""
#----------------------------------------------------------------------
def __init__(self, algoEngine, spreadName, mode, parent=None):
"""Constructor"""
super(StModeComboBox, self).__init__(parent)
self.algoEngine = algoEngine
self.spreadName = spreadName
l = [StAlgoTemplate.MODE_LONGSHORT,
StAlgoTemplate.MODE_LONGONLY,
StAlgoTemplate.MODE_SHORTONLY]
self.addItems(l)
self.setCurrentIndex(l.index(mode))
self.currentIndexChanged.connect(self.setMode)
#----------------------------------------------------------------------
def setMode(self):
"""设置模式"""
mode = unicode(self.currentText())
self.algoEngine.setAlgoMode(self.spreadName, mode)
#----------------------------------------------------------------------
def algoActiveChanged(self, active):
"""算法运行状态改变"""
# 只允许算法停止时修改运行模式
if active:
self.setEnabled(False)
else:
self.setEnabled(True)
########################################################################
class StActiveButton(QtWidgets.QPushButton):
""""""
signalActive = QtCore.pyqtSignal(bool)
#----------------------------------------------------------------------
def __init__(self, algoEngine, spreadName, parent=None):
"""Constructor"""
super(StActiveButton, self).__init__(parent)
self.algoEngine = algoEngine
self.spreadName = spreadName
self.active = False
self.setStopped()
self.clicked.connect(self.buttonClicked)
#----------------------------------------------------------------------
def buttonClicked(self):
"""改变运行模式"""
if self.active:
self.stop()
else:
self.start()
#----------------------------------------------------------------------
def stop(self):
"""停止"""
algoActive = self.algoEngine.stopAlgo(self.spreadName)
if not algoActive:
self.setStopped()
#----------------------------------------------------------------------
def start(self):
"""启动"""
algoActive = self.algoEngine.startAlgo(self.spreadName)
if algoActive:
self.setStarted()
#----------------------------------------------------------------------
def setStarted(self):
"""算法启动"""
self.setText(u'运行中')
self.setStyleSheet(STYLESHEET_START)
self.active = True
self.signalActive.emit(self.active)
#----------------------------------------------------------------------
def setStopped(self):
"""算法停止"""
self.setText(u'已停止')
self.setStyleSheet(STYLESHEET_STOP)
self.active = False
self.signalActive.emit(self.active)
########################################################################
class StAlgoManager(QtWidgets.QTableWidget):
"""价差算法管理组件"""
#----------------------------------------------------------------------
def __init__(self, stEngine, parent=None):
"""Constructor"""
super(StAlgoManager, self).__init__(parent)
self.algoEngine = stEngine.algoEngine
self.buttonActiveDict = {} # spreadName: buttonActive
self.initUi()
#----------------------------------------------------------------------
def initUi(self):
"""初始化表格"""
headers = [u'价差',
u'算法',
'BuyPrice',
'SellPrice',
'CoverPrice',
'ShortPrice',
u'委托上限',
u'持仓上限',
u'模式',
u'状态']
self.setColumnCount(len(headers))
self.setHorizontalHeaderLabels(headers)
self.horizontalHeader().setResizeMode(QtWidgets.QHeaderView.Stretch)
self.verticalHeader().setVisible(False)
self.setEditTriggers(self.NoEditTriggers)
#----------------------------------------------------------------------
def initCells(self):
"""初始化单元格"""
algoEngine = self.algoEngine
l = self.algoEngine.getAllAlgoParams()
self.setRowCount(len(l))
for row, d in enumerate(l):
cellSpreadName = QtWidgets.QTableWidgetItem(d['spreadName'])
cellAlgoName = QtWidgets.QTableWidgetItem(d['algoName'])
spinBuyPrice = StBuyPriceSpinBox(algoEngine, d['spreadName'], d['buyPrice'])
spinSellPrice = StSellPriceSpinBox(algoEngine, d['spreadName'], d['sellPrice'])
spinShortPrice = StShortPriceSpinBox(algoEngine, d['spreadName'], d['shortPrice'])
spinCoverPrice = StCoverPriceSpinBox(algoEngine, d['spreadName'], d['coverPrice'])
spinMaxOrderSize = StMaxOrderSizeSpinBox(algoEngine, d['spreadName'], d['maxOrderSize'])
spinMaxPosSize = StMaxPosSizeSpinBox(algoEngine, d['spreadName'], d['maxPosSize'])
comboMode = StModeComboBox(algoEngine, d['spreadName'], d['mode'])
buttonActive = StActiveButton(algoEngine, d['spreadName'])
self.setItem(row, 0, cellSpreadName)
self.setItem(row, 1, cellAlgoName)
self.setCellWidget(row, 2, spinBuyPrice)
self.setCellWidget(row, 3, spinSellPrice)
self.setCellWidget(row, 4, spinCoverPrice)
self.setCellWidget(row, 5, spinShortPrice)
self.setCellWidget(row, 6, spinMaxOrderSize)
self.setCellWidget(row, 7, spinMaxPosSize)
self.setCellWidget(row, 8, comboMode)
self.setCellWidget(row, 9, buttonActive)
buttonActive.signalActive.connect(comboMode.algoActiveChanged)
self.buttonActiveDict[d['spreadName']] = buttonActive
#----------------------------------------------------------------------
def stopAll(self):
"""停止所有算法"""
for button in self.buttonActiveDict.values():
button.stop()
########################################################################
class StGroup(QtWidgets.QGroupBox):
"""集合显示"""
#----------------------------------------------------------------------
def __init__(self, widget, title, parent=None):
"""Constructor"""
super(StGroup, self).__init__(parent)
self.setTitle(title)
vbox = QtWidgets.QVBoxLayout()
vbox.addWidget(widget)
self.setLayout(vbox)
########################################################################
class StManager(QtWidgets.QWidget):
""""""
#----------------------------------------------------------------------
def __init__(self, stEngine, eventEngine, parent=None):
"""Constructor"""
super(StManager, self).__init__(parent)
self.stEngine = stEngine
self.mainEngine = stEngine.mainEngine
self.eventEngine = eventEngine
self.initUi()
#----------------------------------------------------------------------
def initUi(self):
"""初始化界面"""
self.setWindowTitle(u'价差交易')
# 创建组件
tickMonitor = StTickMonitor(self.mainEngine, self.eventEngine)
posMonitor = StPosMonitor(self.mainEngine, self.eventEngine)
logMonitor = StLogMonitor(self.mainEngine, self.eventEngine)
self.algoManager = StAlgoManager(self.stEngine)
algoLogMonitor = StAlgoLogMonitor(self.mainEngine, self.eventEngine)
# 创建按钮
buttonInit = QtWidgets.QPushButton(u'初始化')
buttonInit.clicked.connect(self.init)
buttonStopAll = QtWidgets.QPushButton(u'全部停止')
buttonStopAll.clicked.connect(self.algoManager.stopAll)
# 创建集合
groupTick = StGroup(tickMonitor, u'价差行情')
groupPos = StGroup(posMonitor, u'价差持仓')
groupLog = StGroup(logMonitor, u'日志信息')
groupAlgo = StGroup(self.algoManager, u'价差算法')
groupAlgoLog = StGroup(algoLogMonitor, u'算法信息')
# 设置布局
hbox = QtWidgets.QHBoxLayout()
hbox.addWidget(buttonInit)
hbox.addStretch()
hbox.addWidget(buttonStopAll)
grid = QtWidgets.QGridLayout()
grid.addLayout(hbox, 0, 0, 1, 2)
grid.addWidget(groupTick, 1, 0)
grid.addWidget(groupPos, 1, 1)
grid.addWidget(groupAlgo, 2, 0, 1, 2)
grid.addWidget(groupLog, 3, 0)
grid.addWidget(groupAlgoLog, 3, 1)
self.setLayout(grid)
#----------------------------------------------------------------------
def show(self):
"""重载显示"""
self.showMaximized()
#----------------------------------------------------------------------
def init(self):
"""初始化"""
self.stEngine.init()
self.algoManager.initCells()

View File

@ -91,8 +91,6 @@ class CtpGateway(VtGateway):
self.tdConnected = False # 交易API连接状态
self.qryEnabled = False # 循环查询
self.requireAuthentication = False
#----------------------------------------------------------------------
def connect(self):
@ -331,10 +329,6 @@ class CtpMdApi(MdApi):
#----------------------------------------------------------------------
def onRtnDepthMarketData(self, data):
"""行情推送"""
# 忽略成交量为0的无效tick数据
if not data['Volume']:
return
# 创建对象
tick = VtTickData()
tick.gatewayName = self.gatewayName
@ -712,7 +706,7 @@ class CtpTdApi(TdApi):
pos.positionProfit += data['PositionProfit']
# 计算持仓均价
if pos.position:
if pos.position and pos.symbol in self.symbolSizeDict:
size = self.symbolSizeDict[pos.symbol]
pos.price = (cost + data['PositionCost']) / (pos.position * size)

View File

@ -11,6 +11,7 @@ vn.okcoin的gateway接入
import os
import json
from datetime import datetime
from time import sleep
from copy import copy
from threading import Condition
from Queue import Queue

View File

@ -26,6 +26,9 @@ exchangeMap[EXCHANGE_CZCE] = 'CZC'
exchangeMap[EXCHANGE_UNKNOWN] = ''
exchangeMapReverse = {v:k for k,v in exchangeMap.items()}
# Wind接口相关事件
EVENT_WIND_CONNECTREQ = 'eWindConnectReq' # Wind接口请求连接事件
########################################################################
class WindGateway(VtGateway):
@ -82,6 +85,9 @@ class WindGateway(VtGateway):
# 而vt中的tick是完整更新因此需要本地维护一个所有字段的快照
self.tickDict = {}
# 订阅请求缓存
self.subscribeBufferDict = {}
self.registerEvent()
#----------------------------------------------------------------------
@ -97,8 +103,14 @@ class WindGateway(VtGateway):
def subscribe(self, subscribeReq):
"""订阅行情"""
windSymbol = '.'.join([subscribeReq.symbol, exchangeMap[subscribeReq.exchange]])
data = self.w.wsq(windSymbol, self.wsqParam, func=self.wsqCallBack)
# 若已经连接则直接订阅
if self.connected:
data = self.w.wsq(windSymbol, self.wsqParam, func=self.wsqCallBack)
# 否则缓存在字典中
else:
self.subscribeBufferDict[windSymbol] = subscribeReq
#----------------------------------------------------------------------
def sendOrder(self, orderReq):
"""发单"""
@ -157,7 +169,7 @@ class WindGateway(VtGateway):
self.tickDict[windSymbol] = tick
dt = data.Times[0]
tick.time = dt.strftime('%H:%M:%S')
tick.time = dt.strftime('%H:%M:%S.%f')
tick.date = dt.strftime('%Y%m%d')
# 采用遍历的形式读取数值
@ -183,6 +195,13 @@ class WindGateway(VtGateway):
if not result.ErrorCode:
log.logContent = u'Wind接口连接成功'
self.connected = True
# 发出缓存的订阅请求
for req in self.subscribeBufferDict.values():
self.subscribe(req)
self.subscribeBufferDict.clear()
else:
log.logContent = u'Wind接口连接失败错误代码%d' %result.ErrorCode
self.onLog(log)

View File

@ -1,6 +1,7 @@
# encoding: UTF-8
import psutil
import traceback
from vnpy.trader.vtFunction import loadIconPath
from vnpy.trader.vtGlobal import globalSetting
@ -257,18 +258,25 @@ class MainWindow(QtWidgets.QMainWindow):
#----------------------------------------------------------------------
def loadWindowSettings(self, settingName):
"""载入窗口设置"""
settings = QtCore.QSettings('vn.trader', settingName)
# 这里由于PyQt4的版本不同settings.value('state')调用返回的结果可能是:
# 1. None初次调用注册表里无相应记录因此为空
# 2. QByteArray比较新的PyQt4
# 3. QVariant以下代码正确执行所需的返回结果
# 所以为了兼容考虑这里加了一个try...except如果是1、2的情况就pass
# 可能导致主界面的设置无法载入(每次退出时的保存其实是成功了)
try:
self.restoreState(settings.value('state').toByteArray())
self.restoreGeometry(settings.value('geometry').toByteArray())
except AttributeError:
pass
settings = QtCore.QSettings('vn.trader', settingName)
state = settings.value('state')
geometry = settings.value('geometry')
# 尚未初始化
if state is None:
return
# 老版PyQt
elif isinstance(state, QtCore.QVariant):
self.restoreState(state.toByteArray())
self.restoreGeometry(geometry.toByteArray())
# 新版PyQt
elif isinstance(state, QtCore.QByteArray):
self.restoreState(state)
self.restoreGeometry(geometry)
# 异常
else:
content = u'载入窗口配置异常,请检查'
self.mainEngine.writeLog(content)
#----------------------------------------------------------------------
def restoreWindow(self):

View File

@ -5,7 +5,7 @@ import shelve
from collections import OrderedDict
from datetime import datetime
from pymongo import MongoClient
from pymongo import MongoClient, ASCENDING
from pymongo.errors import ConnectionFailure
from vnpy.event import Event
@ -209,12 +209,17 @@ class MainEngine(object):
self.writeLog(text.DATA_INSERT_FAILED)
#----------------------------------------------------------------------
def dbQuery(self, dbName, collectionName, d):
def dbQuery(self, dbName, collectionName, d, sortKey='', sortDirection=ASCENDING):
"""从MongoDB中读取数据d是查询要求返回的是数据库查询的指针"""
if self.dbClient:
db = self.dbClient[dbName]
collection = db[collectionName]
cursor = collection.find(d)
if sortKey:
cursor = collection.find(d).sort(sortKey, sortDirection) # 对查询出来的数据进行排序
else:
cursor = collection.find(d)
if cursor:
return list(cursor)
else:

View File

@ -17,14 +17,4 @@ EVENT_ORDER = 'eOrder.' # 报单回报事件
EVENT_POSITION = 'ePosition.' # 持仓回报事件
EVENT_ACCOUNT = 'eAccount.' # 账户回报事件
EVENT_CONTRACT = 'eContract.' # 合约基础信息回报事件
EVENT_ERROR = 'eError.' # 错误回报事件
# CTA模块相关
EVENT_CTA_LOG = 'eCtaLog' # CTA相关的日志事件
EVENT_CTA_STRATEGY = 'eCtaStrategy.' # CTA策略状态变化事件
# 行情记录模块相关
EVENT_DATARECORDER_LOG = 'eDataRecorderLog' # 行情记录日志更新事件
# Wind接口相关
EVENT_WIND_CONNECTREQ = 'eWindConnectReq' # Wind接口请求连接事件
EVENT_ERROR = 'eError.' # 错误回报事件

View File

@ -64,5 +64,20 @@ def getTempPath(name):
path = os.path.join(tempPath, name)
return path
#----------------------------------------------------------------------
def getJsonPath(name, moduleFile):
"""
获取JSON配置文件的路径
1. 优先从当前工作目录查找JSON文件
2. 若无法找到则前往模块所在目录查找
"""
currentFolder = os.getcwd()
currentJsonPath = os.path.join(currentFolder, name)
if os.path.isfile(currentJsonPath):
return currentJsonPath
moduleFolder = os.path.abspath(os.path.dirname(moduleFile))
moduleJsonPath = os.path.join(moduleFolder, '.', name)
return moduleJsonPath