comment vnpy

This commit is contained in:
msincenselee 2015-11-15 15:53:54 +08:00
parent a69e8836d0
commit bdd35fb9d5
20 changed files with 747 additions and 577 deletions

2
.gitignore vendored
View File

@ -71,3 +71,5 @@ vn.strategy/strategydemo/strategy01.py
vn.strategy/strategydemo/strategy02.py
vn.trader/CTP_connect.json
*.vt
vn.trader/CTP_connect.json
vn.trader/CTP_connect.json

View File

@ -1,7 +1,8 @@
{
"brokerID": "8070",
"tdAddress": "tcp://180.168.214.246:41213",
"password": "154815",
"mdAddress": "tcp://180.168.214.246:41205",
"userID": "887733"
"brokerID":"9999",
"tdAddress":"tcp://180.168.146.187:10000",
"password":"jiajia",
"mdAddress":"tcp://180.168.146.187:10010",
"userID":"033513"
}

Binary file not shown.

View File

@ -107,6 +107,8 @@ class CtaTickData(object):
########################################################################
class CtaEngine(object):
"""CTA策略引擎"""
# 策略配置文件
settingFileName = 'CTA_setting.json'
# ----------------------------------------------------------------------
@ -138,28 +140,30 @@ class CtaEngine(object):
self.stopOrderDict = {} # 停止单撤销后不会从本字典中删除
self.workingStopOrderDict = {} # 停止单撤销后会从本字典中删除
# 注册事件监听
self.registerEvent()
# ----------------------------------------------------------------------
def sendOrder(self, vtSymbol, orderType, price, volume, strategy):
"""发单"""
# 1.查询合约对象
contract = self.dataEngine.getContract(vtSymbol)
# 2. 创建发单传入对象
req = VtOrderReq()
req.symbol = contract.symbol
req.exchange = contract.exchange
req.price = price
req.volume = volume
req.symbol = contract.symbol # 合约代码
req.exchange = contract.exchange # 交易所
req.price = price # 价格
req.volume = volume # 数量
# 设计为CTA引擎发出的委托只允许使用限价单
req.priceType = PRICETYPE_LIMITPRICE
req.priceType = PRICETYPE_LIMITPRICE # 价格类型
# CTA委托类型映射
if orderType == CTAORDER_BUY:
req.direction = DIRECTION_LONG
req.offset = OFFSET_OPEN
req.direction = DIRECTION_LONG # 合约方向
req.offset = OFFSET_OPEN # 开/平
elif orderType == CTAORDER_SELL:
req.direction = DIRECTION_SHORT
req.offset = OFFSET_CLOSE
@ -170,19 +174,22 @@ class CtaEngine(object):
req.direction = DIRECTION_LONG
req.offset = OFFSET_CLOSE
# 3.调用主引擎的发单接口进行发单保存OrderID与策略映射关系
vtOrderID = self.mainEngine.sendOrder(req) # 发单
self.orderDict[vtOrderID] = strategy # 保存vtOrderID和策略的映射关系
return vtOrderID
# ----------------------------------------------------------------------
def cancelOrder(self, vtOrderID):
"""撤单"""
# 查询报单对象
# 1.调用主引擎接口,查询委托单对象
order = self.dataEngine.getOrder(vtOrderID)
# 如果查询成功
if order:
# 检查是否报单还有效,只有有效时才发出撤单指令
# 2.检查是否报单(委托单)还有效,只有有效时才发出撤单指令
orderFinished = (order.status == STATUS_ALLTRADED or order.status == STATUS_CANCELLED)
if not orderFinished:
req = VtCancelOrderReq()
@ -192,73 +199,103 @@ class CtaEngine(object):
req.sessionID = order.sessionID
req.orderID = order.orderID
self.mainEngine.cancelOrder(req, order.gatewayName)
else:
if order.status == STATUS_ALLTRADED:
self.writeCtaLog(u'委托单({0}已执行,无法撤销'.format(vtOrderID))
if order.status == STATUS_CANCELLED:
self.writeCtaLog(u'委托单({0}已撤销,无法再次撤销'.format(vtOrderID))
# 查询不成功
else:
self.writeCtaLog(u'委托单({0}不存在'.format(vtOrderID))
# ----------------------------------------------------------------------
def sendStopOrder(self, vtSymbol, orderType, price, volume, strategy):
"""发停止单(本地实现)"""
# 1.生成本地停止单ID
self.stopOrderCount += 1
stopOrderID = STOPORDERPREFIX + str(self.stopOrderCount)
# 2.创建停止单对象
so = StopOrder()
so.vtSymbol = vtSymbol
so.orderType = orderType
so.price = price
so.volume = volume
so.strategy = strategy
so.stopOrderID = stopOrderID
so.status = STOPORDER_WAITING
so.vtSymbol = vtSymbol # 代码
so.orderType = orderType # 停止单类型
so.price = price # 价格
so.volume = volume # 数量
so.strategy = strategy # 来源策略
so.stopOrderID = stopOrderID # Id
so.status = STOPORDER_WAITING # 状态
# 保存stopOrder对象到字典中
self.stopOrderDict[stopOrderID] = so
self.workingStopOrderDict[stopOrderID] = so
# 3.保存stopOrder对象到字典中
self.stopOrderDict[stopOrderID] = so # 字典中不会删除
self.workingStopOrderDict[stopOrderID] = so # 字典中会删除
self.writeCtaLog(u'发停止单成功,'
u'Id:{0},Symbol:{1},Type:{2},Price:{3},Volume:{4}'
u'.'.format(stopOrderID, vtSymbol, orderType, price, volume))
return stopOrderID
# ----------------------------------------------------------------------
def cancelStopOrder(self, stopOrderID):
"""撤销停止单"""
# 检查停止单是否存在
# 1.检查停止单是否存在
if stopOrderID in self.workingStopOrderDict:
so = self.workingStopOrderDict[stopOrderID]
so.status = STOPORDER_CANCELLED
del self.workingStopOrderDict[stopOrderID]
so.status = STOPORDER_CANCELLED # STOPORDER_WAITING =》STOPORDER_CANCELLED
del self.workingStopOrderDict[stopOrderID] # 删除
self.writeCtaLog(u'撤销停止单:{0}成功.'.format(stopOrderID))
else:
self.writeCtaLog(u'撤销停止单:{0}失败不存在Id.'.format(stopOrderID))
# ----------------------------------------------------------------------
def processStopOrder(self, tick):
"""收到行情后处理本地停止单(检查是否要立即发出)"""
vtSymbol = tick.vtSymbol
# 首先检查是否有策略交易该合约
# 1.首先检查是否有策略交易该合约
if vtSymbol in self.tickStrategyDict:
# 遍历等待中的停止单,检查是否会被触发
# 2.遍历等待中的停止单,检查是否会被触发
for so in self.workingStopOrderDict.values():
if so.vtSymbol == vtSymbol:
# 3. 触发标识判断
longTriggered = so.direction == DIRECTION_LONG and tick.lastPrice >= so.price # 多头停止单被触发
shortTriggered = so.direction == DIRECTION_SHORT and tick.lasatPrice <= so.price # 空头停止单被触发
# 4.触发处理
if longTriggered or shortTriggered:
# 买入和卖出分别以涨停跌停价发单(模拟市价单)
# 5.设定价格,买入和卖出分别以涨停跌停价发单(模拟市价单)
if so.direction == DIRECTION_LONG:
price = tick.upperLimit
else:
price = tick.lowerLimit
# 6.更新停止单状态,触发
so.status = STOPORDER_TRIGGERED
# 7.发单
self.sendOrder(so.vtSymbol, so.orderType, price, so.volume, so.strategy)
# 8.删除停止单
del self.workingStopOrderDict[so.stopOrderID]
# ----------------------------------------------------------------------
def procecssTickEvent(self, event):
"""处理行情推送"""
"""处理行情推送事件"""
# 1. 获取事件的Tick数据
tick = event.dict_['data']
# 收到tick行情后先处理本地停止单检查是否要立即发出
# 2.收到tick行情后先处理本地停止单(检查是否要立即发出)
self.processStopOrder(tick)
# 推送tick到对应的策略对象进行处理
# 3.推送tick到对应的策略对象进行处理
if tick.vtSymbol in self.tickStrategyDict:
# 将vtTickData数据转化为ctaTickData
# 4.将vtTickData数据转化为ctaTickData
ctaTick = CtaTickData()
d = ctaTick.__dict__
for key in d.keys():
@ -266,10 +303,9 @@ class CtaEngine(object):
if key != 'datetime':
d[key] = tick.__getattribute__(key)
# 添加datetime字段
# 5.添加datetime字段
ctaTick.datetime = datetime.strptime(' '.join([tick.date, tick.time]), '%Y%m%d %H:%M:%S.%f')
# 逐个推送到策略对象中
l = self.tickStrategyDict[tick.vtSymbol]
for strategy in l:
strategy.onTick(tick)
@ -277,31 +313,47 @@ class CtaEngine(object):
# ----------------------------------------------------------------------
def processOrderEvent(self, event):
"""处理委托推送"""
"""处理委托推送事件"""
# 1.获取事件的Order数据
order = event.dict_['data']
# 2.判断order是否在映射字典中
if order.vtOrderID in self.orderStrategyDict:
# 3.提取对应的策略
strategy = self.orderStrategyDict[order.vtOrderID]
# 4.触发策略的委托推送事件方法
strategy.onOrder(order)
# ----------------------------------------------------------------------
def processTradeEvent(self, event):
"""处理成交推送"""
"""处理成交推送事件"""
# 1.获取事件的Trade数据
trade = event.dict_['data']
# 2.判断Trade是否在映射字典中
if trade.vtOrderID in self.orderStrategyDict:
strategy = self.orderStrategyDict[order.vtOrderID]
# 3.提取对应的策略
strategy = self.orderStrategyDict[trade.vtOrderID]
# 4.触发策略的成交推送事件。
strategy.onTrade(trade)
# ----------------------------------------------------------------------
def registerEvent(self):
"""注册事件监听"""
# 注册行情数据推送Tick数据到达的响应事件
self.eventEngine.register(EVENT_TICK, self.procecssTickEvent)
# 注册订单推送的响应事件
self.eventEngine.register(EVENT_ORDER, self.processOrderEvent)
# 注册成交推送(交易)的响应时间
self.eventEngine.register(EVENT_TRADE, self.processTradeEvent)
# ----------------------------------------------------------------------
@ -311,7 +363,8 @@ class CtaEngine(object):
# ----------------------------------------------------------------------
def loadBar(self, dbName, collectionName, startDate):
"""从数据库中读取Bar数据startDate是datetime对象"""
"""从数据库中读取Bar数据
startDate是datetime对象"""
d = {'datetime':{'$gte':startDate}}
cursor = self.mainEngine.dbQuery(dbName, collectionName, d)
@ -359,11 +412,11 @@ class CtaEngine(object):
# 防止策略重名
if name not in self.strategyDict:
# 创建策略对象
# 1.创建策略对象
strategy = strategyClass(self, name, paramDict)
self.strategyDict[name] = strategy
# 保存Tick映射关系
# 2.保存Tick映射关系symbol <==> Strategy[] )
if strategy.vtSymbol in self.tickStrategyDict:
l = self.tickStrategyDict[strategy.vtSymbol]
else:
@ -371,12 +424,15 @@ class CtaEngine(object):
self.tickStrategyDict[strategy.vtSymbol] = l
l.append(strategy)
# 订阅合约
# 3.订阅合约
contract = self.dataEngine.getContract(strategy.vtSymbol)
if contract:
# 4.构造订阅请求包
req = VtSubscribeReq()
req.symbol = contract.symbol
req.exchange = contract.exchange
# 5.调用主引擎的订阅接口
self.mainEngine.subscribe(req, contract.gatewayName)
else:
@ -385,31 +441,46 @@ class CtaEngine(object):
# ---------------------------------------------------------------------
def startStrategy(self, name):
"""启动策略"""
# 1.判断策略名称是否存在字典中
if name in self.strategyDict:
# 2.提取策略
strategy = self.strategyDict[name]
# 3.判断策略是否运行
if not strategy.trading:
# 4.设置运行状态
strategy.trading = True
# 5.启动策略
strategy.start()
else:
self.writeCtaLog(u'策略对象不存在:' + name)
# ----------------------------------------------------------------------
def stopStrategy(self, name):
"""停止策略"""
"""停止策略运行"""
# 1.判断策略名称是否存在字典中
if name in self.strategyDict:
# 2.提取策略
strategy = self.strategyDict[name]
# 3.停止交易
if strategy.trading:
# 4.设置交易状态为False
strategy.trading = False
# 5.调用策略的停止方法
strategy.stop()
# 对该策略发出的所有限价单进行撤单
# 6.对该策略发出的所有限价单进行撤单
for vtOrderID, s in self.orderStrategyDict.items():
if s is strategy:
self.cancelOrder(vtOrderID)
# 对该策略发出的所有本地停止单撤单
# 7.对该策略发出的所有本地停止单撤单
for stopOrderID, so in self.workingStopOrderDict.items():
if so.strategy is strategy:
self.cancelStopOrder(stopOrderID)
@ -422,27 +493,45 @@ class CtaEngine(object):
with open(self.settingFileName, 'w') as f:
d = {}
# 逐一循环name策略实例名称strategy策略
for name, strategy in self.strategyDict.items():
# 配置串
setting = {}
# 策略的类名称
setting['strategyClassName'] = strategy.strategyClassName
# 策略的参数
for param in strategy.paramList:
setting[param] = strategy.__getattribute__(param)
# 策略实例名,保存配置
d[name] = setting
# Json对象=》Json字符串
jsonD = json.dumps(d, indent=4)
# 保存文件
f.write(jsonD)
# ----------------------------------------------------------------------
def loadStrategySetting(self):
"""读取引擎中的策略配置"""
with open(self.settingFileName) as f:
# 读取文件内容串=》Json对象
d = json.load(f)
# 策略实例名称,配置内容
for name, setting in d.items():
# 策略的类名称
strategyClassName = setting['strategyClassName']
if strategyClassName in strategyClassDict:
# 透过策略类字典,反射获取策略
strategyClass = strategyClassDict[strategyClassName]
# 初始化策略的设置
self.initStrategy(name, strategyClass, setting)
else:
self.writeCtaLog(u'无法找到策略类:' + strategyClassName)
@ -450,14 +539,21 @@ class CtaEngine(object):
# ----------------------------------------------------------------------
def getStrategyVar(self, name):
"""获取策略当前的变量字典"""
if name in self.strategyDict:
# 获取策略实例
strategy = self.strategyDict[name]
# 策略实例的所有属性
d = strategy.__dict__
# 变量字典
varDict = OrderedDict()
# 策略中的变量名称列表
for key in strategy.varList:
# 如果匹配就增加
if key in d:
varDict[key] = d[key]
@ -470,12 +566,19 @@ class CtaEngine(object):
def getStrategyParam(self, name):
"""获取策略的参数字典"""
if name in self.strategyDict:
# 获取策略实例
strategy = self.strategyDict[name]
# 策略实例的所有属性
d = strategy.__dict__
# 参数字典
paramDict = OrderedDict()
# 策略内的参数名称列表
for key in strategy.paramList:
# 匹配的就添加
if key in d:
paramDict[key] = d[key]

View File

@ -2,6 +2,7 @@
'''
在本文件中引入所有希望在系统中使用的策略类
当作配置文件每次增加删除策略时需要维护此字典
'''
from ctaStrategyTemplate import TestStrategy

View File

@ -499,6 +499,7 @@ class CtpTdApi(TdApi):
err.errorMsg = error['ErrorMsg'].decode('gbk')
self.gateway.onError(err)
# ----------------------------------------------------------------------
def onRspUserLogout(self, data, error, n, last):
"""登出回报"""
@ -766,7 +767,9 @@ class CtpTdApi(TdApi):
# ----------------------------------------------------------------------
def onRspQryInvestorPositionDetail(self, data, error, n, last):
""""""
"""投资者持仓详细查询应答。
当客户端发出投资者持仓查询指令后后交易托管系统返回响应时该方法会被调用
"""
pass
# ----------------------------------------------------------------------

View File

@ -63,7 +63,9 @@ class ValueMonitor(QtGui.QTableWidget):
########################################################################
class CtaStrategyManager(QtGui.QGroupBox):
"""策略管理组件"""
"""策略管理组件
即策略的UI提供了启动停止查看参数参看变量等显示信息
"""
# ----------------------------------------------------------------------
def __init__(self, ctaEngine, eventEngine, name, parent=None):
@ -110,10 +112,13 @@ class CtaStrategyManager(QtGui.QGroupBox):
# ----------------------------------------------------------------------
def updateMonitor(self, event=None):
"""显示策略最新状态"""
# 调用CTA策略引擎,获取策略的所有参数的值依赖策略的ParamList配置
paramDict = self.ctaEngine.getStrategyParam(self.name)
if paramDict:
self.paramMonitor.updateData(paramDict)
# 调用CTA策略引擎,获取策略的所有变量的值依赖策略的varList配置
varDict = self.ctaEngine.getStrategyVar(self.name)
if varDict:
self.varMonitor.updateData(varDict)
@ -121,6 +126,7 @@ class CtaStrategyManager(QtGui.QGroupBox):
# ----------------------------------------------------------------------
def registerEvent(self):
"""注册事件监听"""
# 注册定时器,定时更新策略最新状态
self.eventEngine.register(EVENT_TIMER, self.updateMonitor)
# ----------------------------------------------------------------------
@ -138,6 +144,8 @@ class CtaStrategyManager(QtGui.QGroupBox):
########################################################################
class CtaEngineManager(QtGui.QWidget):
"""CTA引擎管理组件"""
# 定义信号对象为Event的类型
signal = QtCore.pyqtSignal(type(Event()))
# ----------------------------------------------------------------------
@ -193,14 +201,21 @@ class CtaEngineManager(QtGui.QWidget):
# ----------------------------------------------------------------------
def initStrategyManager(self):
"""初始化策略管理组件界面"""
# 构建组件w
w = QtGui.QWidget()
# 构建Layout层
hbox = QtGui.QHBoxLayout()
# 将策略UI组件逐一添加Layout层
for name in self.ctaEngine.strategyDict.keys():
strategyManager = CtaStrategyManager(self.ctaEngine, self.eventEngine, name)
hbox.addWidget(strategyManager)
# w绑定Layout层
w.setLayout(hbox)
# 滚动区域设置为组件w
self.scrollArea.setWidget(w)
# ----------------------------------------------------------------------
@ -219,9 +234,15 @@ class CtaEngineManager(QtGui.QWidget):
def load(self):
"""加载策略"""
if not self.strategyLoaded:
# 调用策略引擎,加载所有策略设置
self.ctaEngine.loadStrategySetting()
# 初始化所有策略的UI组件
self.initStrategyManager()
self.strategyLoaded = True
self.ctaEngine.writeCtaLog(u'策略加载成功')
# ----------------------------------------------------------------------
@ -234,6 +255,9 @@ class CtaEngineManager(QtGui.QWidget):
# ----------------------------------------------------------------------
def registerEvent(self):
"""注册事件监听"""
# 定义信号绑定方法
self.signal.connect(self.updateCtaLog)
# 注册EVENT_CAT_LOG类型的事件触发方法为信号的焕发emit
self.eventEngine.register(EVENT_CTA_LOG, self.signal.emit)

View File

@ -68,9 +68,19 @@ class MainWindow(QtGui.QMainWindow):
# ----------------------------------------------------------------------
def initMenu(self):
"""初始化菜单"""
# 创建操作
connectCtpAction = QtGui.QAction(u'连接CTP', self)
connectCtpAction.triggered.connect(self.connectCtp)
connectCtpProdAction = QtGui.QAction(u'连接生产CTP', self)
connectCtpProdAction.triggered.connect(self.connectCtpProd)
connectCtpPostAction = QtGui.QAction(u'连接盘后CTP', self)
connectCtpPostAction.triggered.connect(self.connectCtpPost)
connectCtpTestAction = QtGui.QAction(u'连接测试CTP', self)
connectCtpTestAction.triggered.connect(self.connectCtpTest)
#connectCtpAction = QtGui.QAction(u'连接CTP', self)
#connectCtpAction.triggered.connect(self.connectCtp)
connectLtsAction = QtGui.QAction(u'连接LTS', self)
connectLtsAction.triggered.connect(self.connectLts)
@ -104,7 +114,11 @@ class MainWindow(QtGui.QMainWindow):
menubar = self.menuBar()
sysMenu = menubar.addMenu(u'系统')
sysMenu.addAction(connectCtpAction)
sysMenu.addAction(connectCtpProdAction)
sysMenu.addAction(connectCtpPostAction)
sysMenu.addAction(connectCtpTestAction)
#sysMenu.addAction(connectCtpAction)
sysMenu.addAction(connectLtsAction)
sysMenu.addAction(connectWindAction)
@ -151,6 +165,19 @@ class MainWindow(QtGui.QMainWindow):
return u'CPU使用率%d%% 内存使用率:%d%%' % (cpuPercent, memoryPercent)
# ----------------------------------------------------------------------
def connectCtpProd(self):
"""连接生产环境CTP接口"""
self.mainEngine.connect('CTP_Prod')
# ----------------------------------------------------------------------
def connectCtpPost(self):
"""连接盘后CTP接口"""
self.mainEngine.connect('CTP_Post')
def connectCtpTest(self):
"""连接测试环境CTP接口"""
self.mainEngine.connect('CTP_Test')
def connectCtp(self):
"""连接CTP接口"""
self.mainEngine.connect('CTP')

View File

@ -38,6 +38,15 @@ class MainEngine(object):
self.addGateway(CtpGateway, 'CTP')
self.gatewayDict['CTP'].setQryEnabled(True)
self.addGateway(CtpGateway, 'CTP_Prod')
self.gatewayDict['CTP_Prod'].setQryEnabled(True)
self.addGateway(CtpGateway, 'CTP_Post')
self.gatewayDict['CTP_Post'].setQryEnabled(True)
self.addGateway(CtpGateway, 'CTP_Test')
self.gatewayDict['CTP_Test'].setQryEnabled(True)
self.addGateway(LtsGateway, 'LTS')
self.gatewayDict['LTS'].setQryEnabled(True)
@ -240,7 +249,7 @@ class DataEngine(object):
# ----------------------------------------------------------------------
def getOrder(self, vtOrderID):
"""查询委托"""
"""查询委托单(报单)"""
try:
return self.orderDict[vtOrderID]
except KeyError: