From a2a075ba6ad68d3f11fb108ed70f4ff24f588620 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Wed, 21 Jun 2017 17:19:21 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E4=BB=B7=E5=B7=AE=E7=AE=97?= =?UTF-8?q?=E6=B3=95=E5=BC=95=E6=93=8E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- vnpy/rpc/vnrpc.py | 4 +- vnpy/trader/app/ctaStrategy/ctaBacktesting.py | 5 +- vnpy/trader/app/spreadTrading/stAlgo.py | 294 ++++++++++++++++++ vnpy/trader/app/spreadTrading/stEngine.py | 185 +++++++++-- 4 files changed, 466 insertions(+), 22 deletions(-) create mode 100644 vnpy/trader/app/spreadTrading/stAlgo.py diff --git a/vnpy/rpc/vnrpc.py b/vnpy/rpc/vnrpc.py index c3a979c9..f8c00047 100644 --- a/vnpy/rpc/vnrpc.py +++ b/vnpy/rpc/vnrpc.py @@ -180,7 +180,7 @@ class RpcServer(RpcObject): def publish(self, topic, data): """ 广播推送数据 - topic:主题内容 + topic:主题内容(注意必须是ascii编码) data:具体的数据 """ # 序列化数据 @@ -294,6 +294,8 @@ class RpcClient(RpcObject): 订阅特定主题的广播数据 可以使用topic=''来订阅所有的主题 + + 注意topic必须是ascii编码 """ self.__socketSUB.setsockopt(zmq.SUBSCRIBE, topic) diff --git a/vnpy/trader/app/ctaStrategy/ctaBacktesting.py b/vnpy/trader/app/ctaStrategy/ctaBacktesting.py index 62b07b9d..01b29280 100644 --- a/vnpy/trader/app/ctaStrategy/ctaBacktesting.py +++ b/vnpy/trader/app/ctaStrategy/ctaBacktesting.py @@ -812,7 +812,7 @@ class BacktestingEngine(object): l.append(pool.apply_async(optimize, (strategyClass, setting, targetName, self.mode, self.startDate, self.initDays, self.endDate, - self.slippage, self.rate, self.size, + self.slippage, self.rate, self.size, self.priceTick, self.dbName, self.symbol))) pool.close() pool.join() @@ -929,7 +929,7 @@ def formatNumber(n): #---------------------------------------------------------------------- def optimize(strategyClass, setting, targetName, mode, startDate, initDays, endDate, - slippage, rate, size, + slippage, rate, size, priceTick, dbName, symbol): """多进程优化时跑在每个进程中运行的函数""" engine = BacktestingEngine() @@ -939,6 +939,7 @@ def optimize(strategyClass, setting, targetName, engine.setSlippage(slippage) engine.setRate(rate) engine.setSize(size) + engine.setPriceTick(priceTick) engine.setDatabase(dbName, symbol) engine.initStrategy(strategyClass, setting) diff --git a/vnpy/trader/app/spreadTrading/stAlgo.py b/vnpy/trader/app/spreadTrading/stAlgo.py new file mode 100644 index 00000000..ba6e1a86 --- /dev/null +++ b/vnpy/trader/app/spreadTrading/stAlgo.py @@ -0,0 +1,294 @@ +# encoding: UTF-8 + +from math import floor + +from vnpy.trader.vtConstant import (EMPTY_INT, EMPTY_FLOAT, + EMPTY_STRING, EMPTY_UNICODE, + DIRECTION_LONG, DIRECTION_SHORT, + STATUS_ALLTRADED, STATUS_CANCELLED, STATUS_REJECTED) + + + +######################################################################## +class StAlgoTemplate(object): + """价差算法交易模板""" + MODE_LONGSHORT = 'longshort' + MODE_LONGONLY = 'long' + MODE_SHORTONLY = 'short' + + #---------------------------------------------------------------------- + def __init__(self, algoEngine, spread): + """Constructor""" + self.algoEngine = algoEngine # 算法引擎 + self.spreadName = spread.name # 价差名称 + self.spread = spread # 价差对象 + + self.algoName = EMPTY_STRING # 算法名称 + + self.active = False # 工作状态 + self.mode = self.MODE_LONGSHORT # 工作模式 + + self.buyPrice = EMPTY_FLOAT # 开平仓价格 + self.sellPrice = EMPTY_FLOAT + self.shortPrice = EMPTY_FLOAT + self.coverPrice = EMPTY_FLOAT + + self.maxPosSize = EMPTY_INT # 最大单边持仓量 + self.maxOrderSize = EMPTY_INT # 最大单笔委托量 + + #---------------------------------------------------------------------- + def updateSpreadTick(self, spread): + """""" + raise NotImplementedError + + #---------------------------------------------------------------------- + def updateSpreadPos(self, spread): + """""" + raise NotImplementedError + + #---------------------------------------------------------------------- + def updateTrade(self, trade): + """""" + raise NotImplementedError + + #---------------------------------------------------------------------- + def updateOrder(self, order): + """""" + raise NotImplementedError + + #---------------------------------------------------------------------- + def updateTimer(self): + """""" + raise NotImplementedError + + #---------------------------------------------------------------------- + def start(self): + """""" + raise NotImplementedError + + #---------------------------------------------------------------------- + def stop(self): + """""" + raise NotImplementedError + + + +######################################################################## +class SniperAlgo(StAlgoTemplate): + """狙击算法(市价委托)""" + FINISHED_STATUS = [STATUS_ALLTRADED, STATUS_CANCELLED, STATUS_REJECTED] + + #---------------------------------------------------------------------- + def __init__(self, algoEngine): + """Constructor""" + super(SniperAlgo, self).__init__(algoEngine) + + self.algoName = u'Sniper' + self.hedgeInterval = 2 # 对冲腿撤单再发前的等待时间 + self.hedgeCount = 0 # 对冲计数 + + self.activeVtSymbol = spread.activeLeg.vtSymbol # 主动腿代码 + self.passiveVtSymbols = [leg.vtSymbol for leg in spread.passiveLegs] # 被动腿代码列表 + + # 缓存每条腿对象的字典 + self.legDict = {} + self.legDict[spread.activeLeg.vtSymbol] = spread.activeLeg + for leg in spread.passiveLegs: + self.legDict[leg.vtSymbol] = leg + + self.hedgingTaskDict = {} # 被动腿需要对冲的数量字典 vtSymbol:volume + self.legOrderDict = {} # vtSymbol: list of vtOrderID + + #---------------------------------------------------------------------- + def updateSpreadTick(self, spread): + """价差行情更新""" + self.spread = spread + + #---------------------------------------------------------------------- + def updateSpreadPos(self, spread): + """价差持仓更新""" + self.spread = spread + + #---------------------------------------------------------------------- + def updateTrade(self, trade): + """成交更新""" + if not self.active: + return + + if trade.vtSymbol == self.activeVtSymbol: + self.newActiveLegTrade(trade) + else: + self.newPassiveLegTrade(trade) + + #---------------------------------------------------------------------- + def updateOrder(self, order): + """委托更新""" + if not self.active: + return + + # 只处理完成委托 + if order.status in self.FINISHED_STATUS: + vtOrderID = order.vtOrderID + vtSymbol = order.vtSymbol + + # 从委托列表中移除该委托 + orderList = self.legOrderDict[vtSymbol] + + if vtOrderID in orderList: + orderList.remove(vtOrderID) + + # 检查若是被动腿,且已经没有未完成委托,则执行对冲 + if not orderList and vtSymbol in self.passiveVtSymbols: + self.hedgePassiveLeg(vtSymbol) + + #---------------------------------------------------------------------- + def updateTimer(self): + """计时更新""" + self.hedgeCount += 1 + + # 计时到达对冲间隔后,则对尚未成交的全部被动腿委托全部撤单 + # 收到撤单回报后,会自动发送新的对冲委托 + if self.hedgeCount > self.hedgeInterval: + self.cancelAllPassiveLegOrders() + self.hedgeCount = 0 + + #---------------------------------------------------------------------- + def start(self): + """启动""" + raise NotImplementedError + + #---------------------------------------------------------------------- + def stop(self): + """停止""" + self.active = False + + self.hedgingTaskDict.clear() + self.cancelAllOrders() + + #---------------------------------------------------------------------- + def sendLegOrder(self, leg, legVolume): + """发送每条腿的委托""" + vtSymbol = leg.vtSymbol + volume = abs(legVolume) + payup = leg.payup + + # 发送委托 + if legVolume > 0: + price = leg.askPrice + + if leg.shortPos > 0: + orderList = self.algoEngine.cover(vtSymbol, price, volume, payup) + else: + orderList = self.algoEngine.buy(vtSymbol, price, volume, payup) + + elif legVolume < 0: + price = leg.bidPrice + + if leg.longPos > 0: + orderList = self.algoEngine.sell(vtSymbol, price, volume, payup) + else: + orderList = self.algoEngine.short(vtSymbol, price, volume, payup) + + # 保存到字典中 + if vtSymbol not in self.legOrderDict: + self.legOrderDict[vtSymbol] = orderList + else: + self.legOrderDict[vtSymbol].extend(orderList) + + #---------------------------------------------------------------------- + def quoteActiveLeg(self, direction): + """发出主动腿""" + spread = self.spread + + if direction == DIRECTION_LONG: + spreadVolume = min(spread.askVolume, + self.maxPosSize - spread.netPos, + self.maxOrderSize) + else: + spreadVolume = min(spread.bidVolume, + self.maxPosSize + spread.netPos, + self.maxOrderSize) + + if spreadVolume <= 0: + return + + leg = self.legDict[self.activeVtSymbol] + legVolume = spreadVolume * leg.ratio + + self.sendLegOrder(leg, legVolume) + + #---------------------------------------------------------------------- + def hedgePassiveLeg(self, vtSymbol): + """被动腿对冲""" + if vtSymbol not in self.hedgingTaskDict: + return + legVolume = self.hedgingTaskDict[vtSymbol] + + leg = self.legDict[vtSymbol] + + self.sendLegOrder(leg, legVolume) + + #---------------------------------------------------------------------- + def hedgeAllPassiveLegs(self): + """执行所有被动腿对冲""" + for vtSymbol in self.hedgingTaskDict.keys(): + self.hedgePassiveLeg(vtSymbol) + + #---------------------------------------------------------------------- + def newActiveLegTrade(self, trade): + """新的主动腿成交""" + spread = self.spread + + # 计算主动腿成交后,对应的价差仓位 + activeRatio = spread.activeLeg.ratio + spreadVolume = round(trade.volume / activeRatio) # 四舍五入求主动腿成交量对应的价差份数 + + # 计算价差新仓位,对应的被动腿需要对冲部分 + for leg in self.spread.passiveLegs: + newHedgingTask = leg.ratio * spreadVolume + + if leg.vtSymbol not in self.hedgingTaskDict: + self.hedgingTaskDict[leg.vtSymbol] = newHedgingTask + else: + self.hedgingTaskDict[leg.vtSymbol] += newHedgingTask + + #---------------------------------------------------------------------- + def newPassiveLegTrade(self, trade): + """新的被动腿成交""" + if trade.vtSymbol in self.hedgingTaskDict: + # 计算完成的对冲数量 + if trade.direction == DIRECTION_LONG: + hedgedVolume = trade.volume + else: + hedgedVolume = -trade.volume + + # 计算剩余尚未完成的数量 + self.hedgingTaskDict[trade.vtSymbol] -= hedgedVolume + + # 如果已全部完成,则从字典中移除 + if not self.hedgingTaskDict[trade.vtSymbol]: + del self.hedgingTaskDict[trade.vtSymbol] + + #---------------------------------------------------------------------- + def cancelLegOrder(self, vtSymbol): + """撤销某条腿的委托""" + if vtSymbol not in self.legOrderDict: + return + + orderList = self.legOrderDict[vtSymbol] + + for vtOrderID in orderList: + self.algoEngine.cancelOrder(vtOrderID) + + #---------------------------------------------------------------------- + def cancelAllOrders(self): + """撤销全部委托""" + for orderList in self.legOrderDict.values(): + for vtOrderID in orderList: + self.algoEngine.cancelOrder(vtOrderID) + + #---------------------------------------------------------------------- + def cancelAllPassiveLegOrders(self): + """撤销全部被动腿委托""" + for vtSymbol in self.passiveVtSymbols: + self.cancelLegOrder(vtSymbol) \ No newline at end of file diff --git a/vnpy/trader/app/spreadTrading/stEngine.py b/vnpy/trader/app/spreadTrading/stEngine.py index 4ce43a52..c93bb3cd 100644 --- a/vnpy/trader/app/spreadTrading/stEngine.py +++ b/vnpy/trader/app/spreadTrading/stEngine.py @@ -6,8 +6,10 @@ from copy import copy from vnpy.event import Event from vnpy.trader.vtFunction import getJsonPath -from vnpy.trader.vtEvent import EVENT_TICK, EVENT_TRADE, EVENT_POSITION -from vnpy.trader.vtObject import VtSubscribeReq, VtLogData +from vnpy.trader.vtEvent import (EVENT_TICK, EVENT_TRADE, EVENT_POSITION, + EVENT_TIMER, EVENT_ORDER) +from vnpy.trader.vtObject import (VtSubscribeReq, VtOrderReq, + VtCancelOrderReq, VtLogData) from vnpy.trader.vtConstant import (DIRECTION_LONG, DIRECTION_SHORT, OFFSET_OPEN, OFFSET_CLOSE) @@ -16,8 +18,8 @@ from .stBase import (StLeg, StSpread, EVENT_SPREADTRADING_TICK, ######################################################################## -class StEngine(object): - """""" +class StDataEngine(object): + """价差数据计算引擎""" settingFileName = 'ST_setting.json' settingFilePath = getJsonPath(settingFileName, __file__) @@ -143,15 +145,13 @@ class StEngine(object): spread = self.vtSymbolSpreadDict[tick.vtSymbol] spread.calculatePrice() - # 推送价差行情更新 - newSpread = copy(spread) - + # 推送价差行情更新 event1 = Event(EVENT_SPREADTRADING_TICK+spread.name) - event1.dict_['data'] = newSpread + event1.dict_['data'] = spread self.eventEngine.put(event1) event2 = Event(EVENT_SPREADTRADING_TICK) - event2.dict_['data'] = newSpread + event2.dict_['data'] = spread self.eventEngine.put(event2) #---------------------------------------------------------------------- @@ -184,18 +184,16 @@ class StEngine(object): spread.calculatePos() # 推送价差持仓更新 - newSpread = copy(spread) - event1 = Event(EVENT_SPREADTRADING_POS+spread.name) - event1.dict_['data'] = newSpread + event1.dict_['data'] = spread self.eventEngine.put(event1) event2 = Event(EVENT_SPREADTRADING_POS) - event2.dict_['data'] = newSpread + event2.dict_['data'] = spread self.eventEngine.put(event2) #---------------------------------------------------------------------- - def processPositionEvent(self, event): + def processPosEvent(self, event): """处理持仓推送""" # 检查持仓是否需要处理 pos = event.dict_['data'] @@ -217,14 +215,12 @@ class StEngine(object): spread.calculatePos() # 推送价差持仓更新 - newSpread = copy(spread) - event1 = Event(EVENT_SPREADTRADING_POS+spread.name) - event1.dict_['data'] = newSpread + event1.dict_['data'] = spread self.eventEngine.put(event1) event2 = Event(EVENT_SPREADTRADING_POS) - event2.dict_['data'] = newSpread + event2.dict_['data'] = spread self.eventEngine.put(event2) #---------------------------------------------------------------------- @@ -232,7 +228,7 @@ class StEngine(object): """""" self.eventEngine.register(EVENT_TICK, self.processTickEvent) self.eventEngine.register(EVENT_TRADE, self.processTradeEvent) - self.eventEngine.register(EVENT_POSITION, self.processPositionEvent) + self.eventEngine.register(EVENT_POSITION, self.processPosEvent) #---------------------------------------------------------------------- def subscribeMarketData(self, vtSymbol): @@ -262,4 +258,155 @@ class StEngine(object): """停止""" pass + +######################################################################## +class StAlgoEngine(object): + """价差算法交易引擎""" + + #---------------------------------------------------------------------- + def __init__(self, mainEngine, eventEngine): + """Constructor""" + self.mainEngine = mainEngine + self.eventEngine = eventEngine + + self.algoDict = {} # spreadName:algo + self.vtSymbolAlgoDict = {} # vtSymbol:algo + + self.registerEvent() + + #---------------------------------------------------------------------- + def registerEvent(self): + """注册事件监听""" + self.eventEngine.register(EVENT_SPREADTRADING_TICK, self.processSpreadTickEvent) + self.eventEngine.register(EVENT_SPREADTRADING_POS, self.processSpreadPosEvent) + self.eventEngine.register(EVENT_TRADE, self.processTradeEvent) + self.eventEngine.register(EVENT_ORDER, self.processOrderEvent) + self.eventEngine.register(EVENT_TIMER, self.processTimerEvent) + + #---------------------------------------------------------------------- + def processSpreadTickEvent(self, event): + """处理价差行情事件""" + spread = event.dict_['data'] + + algo = self.algoDict.get(spread.name, None) + if algo: + algo.updateSpreadTick(spread) + + #---------------------------------------------------------------------- + def processSpreadPosEvent(self, event): + """处理价差持仓事件""" + spread = event.dict_['data'] + + algo = self.algoDict.get(spread.name, None) + if algo: + algo.updateSpreadPos(spread) + + #---------------------------------------------------------------------- + def processTradeEvent(self, event): + """处理成交事件""" + trade = event.dict_['data'] + + algo = self.algoDict.get(trade.vtSymbol, None) + if algo: + algo.updateTrade(trade) + + #---------------------------------------------------------------------- + def processOrderEvent(self, event): + """处理委托事件""" + order = event.dict_['data'] + + algo = self.algoDict.get(order.vtSymbol, None) + if algo: + algo.updateOrder(order) + + #---------------------------------------------------------------------- + def processTimerEvent(self, event): + """""" + for algo in self.algoDict.values(): + algo.updateTimer() + + #---------------------------------------------------------------------- + def sendOrder(self, vtSymbol, direction, offset, price, volume, payup=0): + """发单""" + contract = self.mainEngine.getContract(vtSymbol) + if not contract: + return '' + + req = VtOrderReq() + req.symbol = contract.symbol + req.exchange = contract.exchange + req.direction = direction + req.offset = offset + req.volume = volume + + if direction == DIRECTION_LONG: + req.price = price + payup * contract.priceTick + else: + req.price = price - payup * contract.priceTick + + vtOrderID = self.mainEngine.sendOrder(req, contract.gatewayName) + return vtOrderID + + #---------------------------------------------------------------------- + def cancelOrder(self, vtOrderID): + """撤单""" + order = self.mainEngine.getOrder(vtOrderID) + if not order: + return + + req = VtCancelOrderReq() + req.symbol = order.symbol + req.exchange = order.exchange + req.frontID = order.frontID + req.sessionID = order.sessionID + req.orderID = order.orderID + + self.mainEngine.cancelOrder(req, order.gatewayName) + + #---------------------------------------------------------------------- + def buy(self, vtSymbol, price, volume, payup=0): + """买入""" + vtOrderID = self.sendOrder(vtSymbol, DIRECTION_LONG, OFFSET_OPEN, price, volume, payup) + return [vtOrderID] + + #---------------------------------------------------------------------- + def sell(self, vtSymbol, price, volume, payup=0): + """卖出""" + vtOrderID = self.sendOrder(vtSymbol, DIRECTION_SHORT, OFFSET_CLOSE, price, volume, payup) + return [vtOrderID] + + #---------------------------------------------------------------------- + def short(self, vtSymbol, price, volume, payup=0): + """卖空""" + vtOrderID = self.sendOrder(vtSymbol, DIRECTION_SHORT, OFFSET_OPEN, price, volume, payup) + return [vtOrderID] + + #---------------------------------------------------------------------- + def cover(self, vtSymbol, price, volume, payup=0): + """平空""" + vtOrderID = self.sendOrder(vtSymbol, DIRECTION_LONG, OFFSET_CLOSE, price, volume, payup) + return [vtOrderID] + + +######################################################################## +class StEngine(object): + """价差引擎""" + + #---------------------------------------------------------------------- + def __init__(self, mainEngine, eventEngine): + """Constructor""" + self.mainEngine = mainEngine + self.eventEngine = eventEngine + + self.dataEngine = StDataEngine(mainEngine, eventEngine) + self.algoEngine = StAlgoEngine(mainEngine, eventEngine) + + #---------------------------------------------------------------------- + def loadSetting(self): + """""" + self.dataEngine.loadSetting() + + + + \ No newline at end of file