初步完成价差交易模块的测试,主引擎增加数据库加载数据时的排序功能

This commit is contained in:
vn.py 2017-07-03 16:15:29 +08:00
parent 9eee8ea166
commit 302b6f75d7
11 changed files with 130 additions and 48 deletions

View File

@ -54,11 +54,11 @@ def runParentProcess():
"""父进程运行函数"""
printLog(u'启动行情记录守护父进程')
DAY_START = time(8, 45) # 日盘启动和停止时间
DAY_END = time(15, 30)
DAY_START = time(8, 57) # 日盘启动和停止时间
DAY_END = time(15, 18)
NIGHT_START = time(20, 45) # 夜盘启动和停止时间
NIGHT_END = time(2, 45)
NIGHT_START = time(20, 57) # 夜盘启动和停止时间
NIGHT_END = time(2, 33)
p = None # 子进程句柄

View File

@ -145,4 +145,3 @@ class ShcifcoApi(object):
return barList

View File

@ -139,7 +139,7 @@ class BacktestingEngine(object):
# 载入初始化需要用的数据
flt = {'datetime':{'$gte':self.dataStartDate,
'$lt':self.strategyStartDate}}
initCursor = collection.find(flt)
initCursor = collection.find(flt).sort('datetime')
# 将数据从查询指针中读取出,并生成列表
self.initData = [] # 清空initData列表
@ -154,7 +154,7 @@ class BacktestingEngine(object):
else:
flt = {'datetime':{'$gte':self.strategyStartDate,
'$lte':self.dataEndDate}}
self.dbCursor = collection.find(flt)
self.dbCursor = collection.find(flt).sort('datetime')
self.output(u'载入完成,数据量:%s' %(initCursor.count() + self.dbCursor.count()))

View File

@ -337,7 +337,7 @@ class CtaEngine(object):
startDate = self.today - timedelta(days)
d = {'datetime':{'$gte':startDate}}
barData = self.mainEngine.dbQuery(dbName, collectionName, d)
barData = self.mainEngine.dbQuery(dbName, collectionName, d, 'datetime')
l = []
for d in barData:
@ -352,7 +352,7 @@ class CtaEngine(object):
startDate = self.today - timedelta(days)
d = {'datetime':{'$gte':startDate}}
tickData = self.mainEngine.dbQuery(dbName, collectionName, d)
tickData = self.mainEngine.dbQuery(dbName, collectionName, d, 'datetime')
l = []
for d in tickData:

View File

@ -2,8 +2,8 @@
"orderFlowClear": 1,
"orderCancelLimit": 10,
"workingOrderLimit": 20,
"tradeLimit": 100,
"orderSizeLimit": 10,
"tradeLimit": 1000,
"orderSizeLimit": 100,
"active": true,
"orderFlowLimit": 50
}

View File

@ -176,6 +176,7 @@ class SniperAlgo(StAlgoTemplate):
self.hedgingTaskDict = {} # 被动腿需要对冲的数量字典 vtSymbol:volume
self.legOrderDict = {} # vtSymbol: list of vtOrderID
self.orderTradedDict = {} # vtOrderID: tradedVolume
#----------------------------------------------------------------------
def updateSpreadTick(self, spread):
@ -229,13 +230,7 @@ class SniperAlgo(StAlgoTemplate):
#----------------------------------------------------------------------
def updateTrade(self, trade):
"""成交更新"""
if not self.active:
return
if trade.vtSymbol == self.activeVtSymbol:
self.newActiveLegTrade(trade)
else:
self.newPassiveLegTrade(trade)
pass
#----------------------------------------------------------------------
def updateOrder(self, order):
@ -243,7 +238,22 @@ class SniperAlgo(StAlgoTemplate):
if not self.active:
return
# 只处理完成委托
vtOrderID = order.vtOrderID
vtSymbol = order.vtSymbol
newTradedVolume = order.tradedVolume
lastTradedVolume = self.orderTradedDict.get(vtOrderID, 0)
# 检查是否有新的成交
if newTradedVolume > lastTradedVolume:
self.orderTradedDict[vtOrderID] = newTradedVolume # 缓存委托已经成交数量
volume = newTradedVolume - lastTradedVolume # 计算本次成交数量
if vtSymbol == self.activeVtSymbol:
self.newActiveLegTrade(vtSymbol, order.direction, volume)
else:
self.newPassiveLegTrade(vtSymbol, order.direction, volume)
# 处理完成委托
if order.status in self.FINISHED_STATUS:
vtOrderID = order.vtOrderID
vtSymbol = order.vtSymbol
@ -257,7 +267,6 @@ class SniperAlgo(StAlgoTemplate):
# 检查若是被动腿,且已经没有未完成委托,则执行对冲
if not orderList and vtSymbol in self.passiveVtSymbols:
self.hedgePassiveLeg(vtSymbol)
self.writeLog(u'发出新的被动腿%s对冲单' %vtSymbol)
#----------------------------------------------------------------------
def updateTimer(self):
@ -283,7 +292,33 @@ class SniperAlgo(StAlgoTemplate):
#----------------------------------------------------------------------
def start(self):
"""启动"""
if not self.active:
# 如果已经运行则直接返回状态
if self.active:
return self.active
# 做多检查
if self.mode != self.MODE_SHORTONLY:
if self.buyPrice >= self.sellPrice:
self.writeLog(u'启动失败允许多头交易时BuyPrice必须小于SellPrice')
return self.active
# 做空检查
if self.mode != self.MODE_LONGONLY:
if self.shortPrice <= self.coverPrice:
self.writeLog(u'启动失败允许空头交易时ShortPrice必须大于CoverPrice')
return self.active
# 多空检查
if self.mode == self.MODE_LONGSHORT:
if self.buyPrice >= self.coverPrice:
self.writeLog(u'启动失败允许双向交易时BuyPrice必须小于CoverPrice')
return self.active
if self.shortPrice <= self.sellPrice:
self.writeLog(u'启动失败允许双向交易时ShortPrice必须大于SellPrice')
return self.active
# 启动算法
self.quoteCount = 0
self.hedgeCount = 0
@ -339,6 +374,7 @@ class SniperAlgo(StAlgoTemplate):
"""发出主动腿"""
spread = self.spread
# 首先计算不带正负号的价差委托量
if direction == self.SPREAD_LONG:
spreadVolume = min(spread.askVolume,
self.maxPosSize - spread.netPos,
@ -359,9 +395,15 @@ class SniperAlgo(StAlgoTemplate):
if spreadVolume <= 0:
return
# 加上价差方向
if direction == self.SPREAD_SHORT:
spreadVolume = -spreadVolume
# 计算主动腿委托量
leg = self.legDict[self.activeVtSymbol]
legVolume = spreadVolume * leg.ratio
self.sendLegOrder(leg, legVolume)
self.writeLog(u'发出新的主动腿%s狙击单' %self.activeVtSymbol)
self.quoteCount = 0 # 重置主动腿报价撤单等待计数
@ -370,11 +412,16 @@ class SniperAlgo(StAlgoTemplate):
"""被动腿对冲"""
if vtSymbol not in self.hedgingTaskDict:
return
legVolume = self.hedgingTaskDict[vtSymbol]
orderList = self.legOrderDict.get(vtSymbol, [])
if orderList:
return
legVolume = self.hedgingTaskDict[vtSymbol]
leg = self.legDict[vtSymbol]
self.sendLegOrder(leg, legVolume)
self.writeLog(u'发出新的被动腿%s对冲单' %vtSymbol)
#----------------------------------------------------------------------
def hedgeAllPassiveLegs(self):
@ -385,13 +432,19 @@ class SniperAlgo(StAlgoTemplate):
self.hedgeCount = 0 # 重置被动腿对冲撤单等待计数
#----------------------------------------------------------------------
def newActiveLegTrade(self, trade):
def newActiveLegTrade(self, vtSymbol, direction, volume):
"""新的主动腿成交"""
spread = self.spread
# 输出日志
self.writeLog(u'主动腿%s成交,方向%s,数量%s' %(vtSymbol, direction, volume))
# 将主动腿成交带上方向
if direction == DIRECTION_SHORT:
volume = -volume
# 计算主动腿成交后,对应的价差仓位
spread = self.spread
activeRatio = spread.activeLeg.ratio
spreadVolume = round(trade.volume / activeRatio) # 四舍五入求主动腿成交量对应的价差份数
spreadVolume = round(volume / activeRatio) # 四舍五入求主动腿成交量对应的价差份数
# 计算价差新仓位,对应的被动腿需要对冲部分
for leg in self.spread.passiveLegs:
@ -402,28 +455,28 @@ class SniperAlgo(StAlgoTemplate):
else:
self.hedgingTaskDict[leg.vtSymbol] += newHedgingTask
# 输出日志
self.writeLog(u'主动腿%s成交,方向%s,数量%s' %(trade.vtSymbol, trade.direction, trade.volume))
# 发出被动腿对冲委托
self.hedgeAllPassiveLegs()
#----------------------------------------------------------------------
def newPassiveLegTrade(self, trade):
def newPassiveLegTrade(self, vtSymbol, direction, volume):
"""新的被动腿成交"""
if trade.vtSymbol in self.hedgingTaskDict:
if vtSymbol in self.hedgingTaskDict:
# 计算完成的对冲数量
if trade.direction == DIRECTION_LONG:
hedgedVolume = trade.volume
if direction == DIRECTION_LONG:
hedgedVolume = volume
else:
hedgedVolume = -trade.volume
hedgedVolume = -volume
# 计算剩余尚未完成的数量
self.hedgingTaskDict[trade.vtSymbol] -= hedgedVolume
self.hedgingTaskDict[vtSymbol] -= hedgedVolume
# 如果已全部完成,则从字典中移除
if not self.hedgingTaskDict[trade.vtSymbol]:
del self.hedgingTaskDict[trade.vtSymbol]
if not self.hedgingTaskDict[vtSymbol]:
del self.hedgingTaskDict[vtSymbol]
# 输出日志
self.writeLog(u'被动腿%s成交,方向%s,数量%s' %(trade.vtSymbol, trade.direction, trade.volume))
self.writeLog(u'被动腿%s成交,方向%s,数量%s' %(vtSymbol, direction, volume))
#----------------------------------------------------------------------
def cancelLegOrder(self, vtSymbol):
@ -432,6 +485,8 @@ class SniperAlgo(StAlgoTemplate):
return
orderList = self.legOrderDict[vtSymbol]
if not orderList:
return
for vtOrderID in orderList:
self.algoEngine.cancelOrder(vtOrderID)

View File

@ -149,6 +149,8 @@ class StSpread(object):
self.shortPos = min(self.shortPos, legAdjustedShortPos)
# 计算净仓位
self.longPos = int(self.longPos)
self.shortPos = int(self.shortPos)
self.netPos = self.longPos - self.shortPos
#----------------------------------------------------------------------

View File

@ -389,25 +389,45 @@ class StAlgoEngine(object):
def buy(self, vtSymbol, price, volume, payup=0):
"""买入"""
vtOrderID = self.sendOrder(vtSymbol, DIRECTION_LONG, OFFSET_OPEN, price, volume, payup)
return [vtOrderID]
l = []
if vtOrderID:
l.append(vtOrderID)
return l
#----------------------------------------------------------------------
def sell(self, vtSymbol, price, volume, payup=0):
"""卖出"""
vtOrderID = self.sendOrder(vtSymbol, DIRECTION_SHORT, OFFSET_CLOSE, price, volume, payup)
return [vtOrderID]
l = []
if vtOrderID:
l.append(vtOrderID)
return l
#----------------------------------------------------------------------
def short(self, vtSymbol, price, volume, payup=0):
"""卖空"""
vtOrderID = self.sendOrder(vtSymbol, DIRECTION_SHORT, OFFSET_OPEN, price, volume, payup)
return [vtOrderID]
l = []
if vtOrderID:
l.append(vtOrderID)
return l
#----------------------------------------------------------------------
def cover(self, vtSymbol, price, volume, payup=0):
"""平空"""
vtOrderID = self.sendOrder(vtSymbol, DIRECTION_LONG, OFFSET_CLOSE, price, volume, payup)
return [vtOrderID]
l = []
if vtOrderID:
l.append(vtOrderID)
return l
#----------------------------------------------------------------------
def putAlgoEvent(self, algo):

View File

@ -33,7 +33,7 @@ class StTickMonitor(BasicMonitor):
d['askPrice'] = {'chinese':u'卖价', 'cellType':AskCell}
d['askVolume'] = {'chinese':u'卖量', 'cellType':AskCell}
d['time'] = {'chinese':u'时间', 'cellType':BasicCell}
d['symbol'] = {'chinese':u'代码', 'cellType':BasicCell}
d['symbol'] = {'chinese':u'价差公式', 'cellType':BasicCell}
self.setHeaderDict(d)
self.setDataKey('name')

View File

@ -11,6 +11,7 @@ vn.okcoin的gateway接入
import os
import json
from datetime import datetime
from time import sleep
from copy import copy
from threading import Condition
from Queue import Queue

View File

@ -5,7 +5,7 @@ import shelve
from collections import OrderedDict
from datetime import datetime
from pymongo import MongoClient
from pymongo import MongoClient, ASCENDING
from pymongo.errors import ConnectionFailure
from vnpy.event import Event
@ -209,12 +209,17 @@ class MainEngine(object):
self.writeLog(text.DATA_INSERT_FAILED)
#----------------------------------------------------------------------
def dbQuery(self, dbName, collectionName, d):
def dbQuery(self, dbName, collectionName, d, sortKey='', sortDirection=ASCENDING):
"""从MongoDB中读取数据d是查询要求返回的是数据库查询的指针"""
if self.dbClient:
db = self.dbClient[dbName]
collection = db[collectionName]
if sortKey:
cursor = collection.find(d).sort(sortKey, sortDirection) # 对查询出来的数据进行排序
else:
cursor = collection.find(d)
if cursor:
return list(cursor)
else: