From 22180cf239bf61bb6afd5912c5bf98c522862b09 Mon Sep 17 00:00:00 2001 From: msincenselee Date: Fri, 21 Sep 2018 21:24:53 +0800 Subject: [PATCH] bug fix --- vnpy/data/binance/binance_data.py | 110 +- vnpy/data/okex/okex_data.py | 131 +- vnpy/trader/app/ctaStrategy/ctaEngine.py | 122 +- vnpy/trader/app/ctaStrategy/ctaGridTrade.py | 1680 ++++++++++++++++- vnpy/trader/app/ctaStrategy/ctaLineBar.py | 156 +- .../gateway/binanceGateway/binanceGateway.py | 36 +- .../trader/gateway/okexGateway/okexGateway.py | 2 +- 7 files changed, 2154 insertions(+), 83 deletions(-) diff --git a/vnpy/data/binance/binance_data.py b/vnpy/data/binance/binance_data.py index 7f890559..85e8dc18 100644 --- a/vnpy/data/binance/binance_data.py +++ b/vnpy/data/binance/binance_data.py @@ -31,21 +31,33 @@ PERIOD_MAPPING['1month'] = '1M' SYMBOL_LIST = ['ltc_btc', 'eth_btc', 'etc_btc', 'bch_btc', 'btc_usdt', 'eth_usdt', 'ltc_usdt', 'etc_usdt', 'bch_usdt', 'etc_eth','bt1_btc','bt2_btc','btg_btc','qtum_btc','hsr_btc','neo_btc','gas_btc', - 'qtum_usdt','hsr_usdt','neo_usdt','gas_usdt'] + 'qtum_usdt','hsr_usdt','neo_usdt','gas_usdt','bnb_usdt','eos_usdt'] class BinanceData(object): + # ---------------------------------------------------------------------- - def __init__(self, strategy): + def __init__(self, strategy=None): """ 构造函数 - :param strategy: 上层策略,主要用与使用strategy.writeCtaLog() + :param strategy: 上层策略,主要用与使用writeLog() """ self.strategy = strategy + self.client = Client('', '') - self.client = Client('-', '-') + def writeLog(self,content): + if self.strategy and hasattr(self.strategy,'writeCtaLog'): + self.strategy.writeCtaLog(content) + else: + print(content) - def get_bars(self, symbol, period, callback, bar_is_completed=False, bar_freq=1, start_dt=None): + def writeError(self,content): + if self.strategy and hasattr(self.strategy,'writeCtaError'): + self.strategy.writeCtaError(content) + else: + print(content,file=sys.stderr) + + def get_bars(self, symbol, period, callback, bar_is_completed=False,bar_freq=1, start_dt=None): """ 返回k线数据 symbol:合约 @@ -53,10 +65,10 @@ class BinanceData(object): """ ret_bars = [] if symbol not in SYMBOL_LIST: - self.strategy.writeCtaError(u'{} 合约{}不在下载清单中'.format(datetime.now(), symbol)) + self.writeError(u'{} 合约{}不在下载清单中'.format(datetime.now(), symbol)) return False,ret_bars if period not in PERIOD_MAPPING: - self.strategy.writeCtaError(u'{} 周期{}不在下载清单中'.format(datetime.now(), period)) + self.writeError(u'{} 周期{}不在下载清单中'.format(datetime.now(), period)) return False,ret_bars if self.client is None: return False,ret_bars @@ -64,37 +76,107 @@ class BinanceData(object): binance_symbol = symbol.upper().replace('_' , '') binance_period = PERIOD_MAPPING.get(period) - self.strategy.writeCtaLog('{}开始下载binance:{} {}数据.'.format(datetime.now(), binance_symbol, binance_period)) + self.writeLog('{}开始下载binance:{} {}数据.'.format(datetime.now(), binance_symbol, binance_period)) + bars = [] try: bars = self.client.get_klines(symbol=binance_symbol, interval=binance_period) + bar_len = len(bars) for i, bar in enumerate(bars): add_bar = CtaBarData() try: add_bar.vtSymbol = symbol add_bar.symbol = symbol add_bar.datetime = datetime.fromtimestamp(bar[0] / 1000) + utcdatetime = datetime.utcfromtimestamp(bar[0] / 1e3) add_bar.date = add_bar.datetime.strftime('%Y-%m-%d') add_bar.time = add_bar.datetime.strftime('%H:%M:%S') - add_bar.tradingDay = add_bar.date + # 币安的交易日,是按照utc来计算的 + add_bar.tradingDay = utcdatetime.strftime('%Y-%m-%d') add_bar.open = float(bar[1]) add_bar.high = float(bar[2]) add_bar.low = float(bar[3]) add_bar.close = float(bar[4]) add_bar.volume = float(bar[5]) + ret_bars.append(add_bar) except Exception as ex: - self.strategy.writeCtaError( + self.writeError( 'error when convert bar:{},ex:{},t:{}'.format(bar, str(ex), traceback.format_exc())) - return False + return False,ret_bars if start_dt is not None and bar.datetime < start_dt: continue - ret_bars.append(add_bar) + if callback is not None: - callback(add_bar, bar_is_completed, bar_freq) + # 最后一个bar,可能是不完整的,强制修改 + if i == bar_len -1 and bar_is_completed: + # 根据秒数算的话,要+1,例如13:31,freq=31,第31根bar + freq = int((datetime.now() - add_bar.datetime).total_seconds() / 60) + 1 + callback(add_bar, False, freq) + else: + callback(add_bar, bar_is_completed, bar_freq) + return True,ret_bars except Exception as ex: - self.strategy.writeCtaError('exception in get:{},{},{}'.format(binance_symbol,str(ex), traceback.format_exc())) + self.writeError('exception in get:{},{},{}'.format(binance_symbol,str(ex), traceback.format_exc())) return False,ret_bars + def download_bars(self, symbol, period, start_dt=None): + """ + 返回k线数据 + symbol:合约 + period: 周期: 1min,3min,5min,15min,30min,1day,3day,1hour,2hour,4hour,6hour,12hour + """ + ret_bars = [] + + if symbol not in SYMBOL_LIST: + self.writeError(u'{} 合约{}不在下载清单中'.format(datetime.now(), symbol)) + return ret_bars + if period not in PERIOD_MAPPING: + self.writeError(u'{} 周期{}不在下载清单中'.format(datetime.now(), period)) + return ret_bars + if self.client is None: + return ret_bars + + binance_symbol = symbol.upper().replace('_', '') + binance_period = PERIOD_MAPPING.get(period) + + self.writeLog('{}开始下载binance:{} {}数据.'.format(datetime.now(), binance_symbol, binance_period)) + + try: + bars = self.client.get_klines(symbol=binance_symbol, interval=binance_period) + for i, bar in enumerate(bars): + add_bar = {} + try: + bar_datetime = datetime.fromtimestamp(bar[0] / 1e3) + utc_datetime = datetime.utcfromtimestamp(bar[0] / 1e3) + add_bar['datetime'] = bar_datetime.strftime('%Y-%m-%d %H:%M:%S') + add_bar['date'] = bar_datetime.strftime('%Y-%m-%d') + add_bar['time'] = bar_datetime.strftime('%H:%M:%S') + add_bar['open'] = float(bar[1]) + add_bar['high'] = float(bar[2]) + add_bar['low'] = float(bar[3]) + add_bar['close'] = float(bar[4]) + add_bar['volume'] = float(bar[5]) + add_bar['tradingDay'] = utc_datetime.strftime('%Y-%m-%d') + ret_bars.append(add_bar) + except Exception as ex: + self.writeError( + 'error when convert bar:{},ex:{},t:{}'.format(bar, str(ex), traceback.format_exc())) + return ret_bars + + if start_dt is not None and bar_datetime < start_dt: + continue + + return ret_bars + except Exception as ex: + self.writeError('exception in get:{},{},{}'.format(binance_symbol,str(ex), traceback.format_exc())) + return ret_bars + +if __name__ == '__main__': + binance_data = BinanceData() + bars = binance_data.download_bars(symbol='bnb_usdt', period='1day') + + for bar in bars: + print(bar['datetime']) diff --git a/vnpy/data/okex/okex_data.py b/vnpy/data/okex/okex_data.py index 1b52dcde..9d7b35b1 100644 --- a/vnpy/data/okex/okex_data.py +++ b/vnpy/data/okex/okex_data.py @@ -2,7 +2,7 @@ # 从okex下载数据 from datetime import datetime, timezone - +import sys import requests import execjs import traceback @@ -11,13 +11,13 @@ from vnpy.trader.app.ctaStrategy.ctaBase import CtaBarData, CtaTickData period_list = ['1min','3min','5min','15min','30min','1day','1week','1hour','2hour','4hour','6hour','12hour'] symbol_list = ['ltc_btc','eth_btc','etc_btc','bch_btc','btc_usdt','eth_usdt','ltc_usdt','etc_usdt','bch_usdt', 'etc_eth','bt1_btc','bt2_btc','btg_btc','qtum_btc','hsr_btc','neo_btc','gas_btc', - 'qtum_usdt','hsr_usdt','neo_usdt','gas_usdt','eos_usdt'] + 'qtum_usdt','hsr_usdt','neo_usdt','gas_usdt','eos_usdt','ada_usdt','xmr_usdt','zrx_usdt','zil_usdt'] class OkexData(object): # ---------------------------------------------------------------------- - def __init__(self, strategy): + def __init__(self, strategy=None): """ 构造函数 :param strategy: 上层策略,主要用与使用strategy.writeCtaLog() @@ -29,6 +29,18 @@ class OkexData(object): self.session = requests.session() self.session.keep_alive = False + def writeLog(self,content): + if self.strategy and hasattr(self.strategy,'writeCtaLog'): + self.strategy.writeCtaLog(content) + else: + print(content) + + def writeError(self,content): + if self.strategy: + self.strategy.writeCtaError(content) + else: + print(content,file=sys.stderr) + def get_bars(self, symbol, period, callback, bar_is_completed=False,bar_freq=1, start_dt=None): """ 返回k线数据 @@ -38,24 +50,24 @@ class OkexData(object): ret_bars = [] if symbol not in symbol_list: self.strategy.writeCtaError(u'{} {}不在下载清单中'.format(datetime.now(), symbol)) - return False,ret_bars + return False,ret_bars url = u'https://www.okex.com/api/v1/kline.do?symbol={}&type={}'.format(symbol, period) - self.strategy.writeCtaLog('{}开始下载:{} {}数据.URL:{}'.format(datetime.now(), symbol, period,url)) + self.writeLog('{}开始下载:{} {}数据.URL:{}'.format(datetime.now(), symbol, period,url)) content = None - bars = [] try: content = self.session.get(url).content.decode('gbk') - bars = execjs.eval(content) except Exception as ex: self.strategy.writeCtaError('exception in get:{},{},{}'.format(url,str(ex), traceback.format_exc())) return False,ret_bars + bars = execjs.eval(content) + bar_len = len(bars) for i, bar in enumerate(bars): if len(bar) < 5: self.strategy.writeCtaError('error when import bar:{}'.format(bar)) - return False + return False,ret_bars if i == 0: continue add_bar = CtaBarData() @@ -71,17 +83,116 @@ class OkexData(object): add_bar.low = float(bar[3]) add_bar.close = float(bar[4]) add_bar.volume = float(bar[5]) + ret_bars.append(add_bar) except Exception as ex: self.strategy.writeCtaError('error when convert bar:{},ex:{},t:{}'.format(bar, str(ex), traceback.format_exc())) return False,ret_bars if start_dt is not None and bar.datetime < start_dt: continue - ret_bars.append(add_bar) + if callback is not None: - callback(add_bar, bar_is_completed, bar_freq) + # 最后一个bar,可能是不完整的,强制修改 + if i == bar_len -1 and bar_is_completed: + # 根据秒数算的话,要+1,例如13:31,freq=31,第31根bar + freq = int((datetime.now() - add_bar.datetime).total_seconds()/60)+1 + callback(add_bar,False,freq) + else: + callback(add_bar, bar_is_completed, bar_freq) return True,ret_bars + def download_bars(self, symbol, period, size_=None, start_dt=None): + """ + 返回k线数据 + symbol:合约 + period: 周期: 1min,3min,5min,15min,30min,1day,3day,1hour,2hour,4hour,6hour,12hour + """ + ret_bars = [] + if symbol not in symbol_list: + msg = u'{} {}不在下载清单中'.format(datetime.now(), symbol) + if self.strategy: + self.strategy.writeCtaError(msg) + else: + print(msg) + return ret_bars + + url = u'https://www.okex.com/api/v1/kline.do?symbol={}&type={}'.format(symbol, period) + if isinstance(size_,int): + url = url + u'&size={}'.format(size_) + if start_dt is not None and isinstance(start_dt,datetime): + url = url + u'&since={}'.format(int(start_dt.timestamp()*1000)) + self.writeLog('{}开始下载:{} {}数据.URL:{}'.format(datetime.now(), symbol, period,url)) + + content = None + try: + content = self.session.get(url).content.decode('gbk') + except Exception as ex: + self.writeError('exception in get:{},{},{}'.format(url,str(ex), traceback.format_exc())) + return ret_bars + + bars = execjs.eval(content) + + if not isinstance(bars,list): + self.writeError('返回数据不是list:{}'.format(content)) + return ret_bars + + for i, bar in enumerate(bars): + if len(bar) < 5: + self.writeError('error when get bar:{}'.format(bar)) + return ret_bars + if i == 0: + continue + add_bar = {} + try: + + bar_datetime= datetime.fromtimestamp(bar[0] / 1000) + add_bar['datetime'] = bar_datetime.strftime('%Y-%m-%d %H:%M:%S') + add_bar['date'] = bar_datetime.strftime('%Y-%m-%d') + add_bar['time'] = bar_datetime.strftime('%H:%M:%S') + add_bar['open'] = float(bar[1]) + add_bar['high'] = float(bar[2]) + add_bar['low'] = float(bar[3]) + add_bar['close'] = float(bar[4]) + add_bar['volume'] = float(bar[5]) + except Exception as ex: + self.writeError('error when convert bar:{},ex:{},t:{}'.format(bar, str(ex), traceback.format_exc())) + + ret_bars.append(add_bar) + + return ret_bars +class TestStrategy(object): + + def __init__(self): + + self.minDiff = 1 + self.shortSymbol = 'btc' + self.vtSymbol = 'btc' + + self.TMinuteInterval = 1 + def addBar(self,bar,bar_is_completed, bar_freq): + print(u'tradingDay:{},dt:{},{} o:{},h:{},l:{},c:{},v:{}'.format(bar.tradingDay, bar.datetime,bar.vtSymbol, bar.open, bar.high, + bar.low, bar.close, bar.volume)) + def onBar(self, bar): + print(u'tradingDay:{},dt:{},{} o:{},h:{},l:{},c:{},v:{}'.format(bar.tradingDay,bar.datetime,bar.vtSymbol, bar.open, bar.high, bar.low, bar.close, bar.volume)) + + def writeCtaLog(self, content): + print(content) + + def writeCtaError(self, content): + print(content) + +if __name__ == '__main__': + t = TestStrategy() + + ok_data = OkexData(t) + + # 下载日线 + #bars = ok_data.download_bars(symbol='btc_usdt', period='1day') + # 下载分钟 + bars = ok_data.download_bars(symbol='btc_usdt', period='1min',start_dt=datetime.strptime('2017-06-01','%Y-%m-%d')) + for bar in bars: + print(bar['datetime']) + diff --git a/vnpy/trader/app/ctaStrategy/ctaEngine.py b/vnpy/trader/app/ctaStrategy/ctaEngine.py index 4c777e80..5f7ac80f 100644 --- a/vnpy/trader/app/ctaStrategy/ctaEngine.py +++ b/vnpy/trader/app/ctaStrategy/ctaEngine.py @@ -40,7 +40,10 @@ from vnpy.trader.vtFunction import todayDate, getJsonPath from vnpy.trader.util_mail import sendmail # 加载 strategy目录下所有的策略 from vnpy.trader.app.ctaStrategy.strategy import STRATEGY_CLASS - +try: + from vnpy.trader.util_wechat import * +except: + print('import util_wechat fail') MATRIX_DB_NAME = 'matrix' # 虚拟策略矩阵的数据库名称 POSITION_DISPATCH_COLL_NAME = 'position_dispatch' # 虚拟策略矩阵的策略调度配置collection名称 @@ -148,7 +151,7 @@ class CtaEngine(object): contract = self.mainEngine.getContract(vtSymbol) if contract is None: self.writeCtaError( - u'vtEngine.sendOrder取不到{}合约得信息,{}发送{}委托:{},v{}'.format(vtSymbol, strategy.name, orderType, price, + u'vtEngine.sendOrder取不到{}合约得信息,{}发送{}委托:{},v{}失败'.format(vtSymbol, strategy.name, orderType, price, volume)) return '' @@ -256,12 +259,18 @@ class CtaEngine(object): if strategy: self.orderStrategyDict[vtOrderID] = strategy # 保存vtOrderID和策略的映射关系 - - self.writeCtaLog(u'策略%s发送委托,%s, %s,%s,%s@%s' - % (strategy.name, vtSymbol, req.offset, req.direction, volume, price)) + msg = u'策略%s发送委托,%s, %s,%s,%s@%s' % (strategy.name, vtSymbol, req.offset, req.direction, volume, price) + self.writeCtaLog(msg) else: - self.writeCtaLog(u'%s发送委托,%s, %s,%s,%s@%s' - % ('CtaEngine', vtSymbol, req.offset, req.direction, volume, price)) + msg = u'%s发送委托,%s, %s,%s,%s@%s' % ('CtaEngine', vtSymbol, req.offset, req.direction, volume, price) + self.writeCtaLog(msg) + + # 发送微信 + try: + sendWeChatMsg(msg, target=self.ctaEngine.mainEngine.gatewayDetailList[0]['gatewayName']) + except: + pass + return vtOrderID # ---------------------------------------------------------------------- @@ -283,6 +292,13 @@ class CtaEngine(object): req.sessionID = order.sessionID req.orderID = order.orderID self.mainEngine.cancelOrder(req, order.gatewayName) + + # 发送微信 + try: + msg = u'发送撤单指令,%s, %s,%s' % (order.symbol, order.orderID, order.gatewayName) + sendWeChatMsg(msg, target=self.ctaEngine.mainEngine.gatewayDetailList[0]['gatewayName']) + except: + pass else: if order.status == STATUS_ALLTRADED: self.writeCtaLog(u'委托单({0}已执行,无法撤销'.format(vtOrderID)) @@ -326,6 +342,13 @@ class CtaEngine(object): order.totalVolume - order.tradedVolume)) self.mainEngine.cancelOrder(req, order.gatewayName) + # 发送微信 + try: + msg = u'撤销所有单,{}'.format(symbol) + sendWeChatMsg(msg, target=self.ctaEngine.mainEngine.gatewayDetailList[0]['gatewayName']) + except: + pass + # ---------------------------------------------------------------------- def sendStopOrder(self, vtSymbol, orderType, price, volume, strategy): """发停止单(本地实现)""" @@ -361,10 +384,14 @@ class CtaEngine(object): self.stopOrderDict[stopOrderID] = so # 字典中不会删除 self.workingStopOrderDict[stopOrderID] = so # 字典中会删除 - self.writeCtaLog(u'发停止单成功,' - u'Id:{0},Symbol:{1},Type:{2},Price:{3},Volume:{4}' - u'.'.format(stopOrderID, vtSymbol, orderType, price, volume)) + msg = u'发停止单成功,Id:{},Symbol:{},Type:{},Price:{},Volume:{}'.format(stopOrderID, vtSymbol, orderType, price, volume) + self.writeCtaLog(msg) + # 发送微信 + try: + sendWeChatMsg(msg, target=self.ctaEngine.mainEngine.gatewayDetailList[0]['gatewayName']) + except: + pass return stopOrderID # ---------------------------------------------------------------------- @@ -379,9 +406,21 @@ class CtaEngine(object): so.status = STOPORDER_CANCELLED # STOPORDER_WAITING =》STOPORDER_CANCELLED del self.workingStopOrderDict[stopOrderID] # 删除 self.writeCtaLog(u'撤销停止单:{0}成功.'.format(stopOrderID)) + + # 发送微信 + try: + sendWeChatMsg(u'撤销停止单:{0}成功.'.format(stopOrderID), target=self.ctaEngine.mainEngine.gatewayDetailList[0]['gatewayName']) + except: + pass return True else: self.writeCtaLog(u'撤销停止单:{0}失败,不存在Id.'.format(stopOrderID)) + + # 发送微信 + try: + sendWeChatMsg(u'撤销停止单:{0}失败,不存在Id.'.format(stopOrderID), target=self.ctaEngine.mainEngine.gatewayDetailList[0]['gatewayName']) + except: + pass return False # ---------------------------------------------------------------------- @@ -441,11 +480,13 @@ class CtaEngine(object): ctaTick = CtaTickData() d = ctaTick.__dict__ for key in d.keys(): - d[key] = tick.__getattribute__(key) + if key in tick.__dict__: + d[key] = tick.__getattribute__(key) if not ctaTick.datetime: # 添加datetime字段 - ctaTick.datetime = datetime.strptime(' '.join([tick.date, tick.time]), '%Y-%m-%d %H:%M:%S.%f') + tickDate = tick.date.replace('-', '') + ctaTick.datetime = datetime.strptime(' '.join([tickDate, tick.time]), '%Y%m%d %H:%M:%S.%f') # 逐个推送到策略实例中 l = self.tickStrategyDict[tick.vtSymbol] @@ -735,6 +776,26 @@ class CtaEngine(object): pass self.mainEngine.writeCritical(content) + def sendAlertToWechat(self,content,target): + """发送微信告警""" + gw = EMPTY_STRING + if len(self.mainEngine.connected_gw_names)>0: + gw = self.mainEngine.connected_gw_names[0] + else: + if len(self.mainEngine.gatewayDetailList) > 0: + d = self.mainEngine.gatewayDetailList[0] + if isinstance(d, dict): + gw = d.get('gatewayDisplayName','Gateway') + + if len(gw)>0: + content = u'[gw:{}]{}'.format(gw,content) + try: + from vnpy.trader import util_wechat + self.writeCtaLog(u'发送微信通知:{}'.format(content)) + util_wechat.sendWeChatMsg(content=content, target=target,level=util_wechat.WECHAT_LEVEL_DEBUG) + except Exception as ex: + self.writeCtaError(u'调用微信接口失败:{} {}'.format(str(ex), traceback.format_exc())) + def sendCtaSignal(self, source, symbol, direction, price, level): """发出交易信号""" s = VtSignalData() @@ -850,8 +911,8 @@ class CtaEngine(object): self.writeCtaLog(u'撤销运行中策略{}的强制清仓,恢复运行'.format(name)) return False except Exception as ex: - self.writeCtaCritical(u'撤销运行中策略{}的强制清仓时异常:{}'.format(name, str(ex))) - traceback.print_exc() + self.writeCtaCritical(u'撤销运行中策略{}的强制清仓时异常:{},{}'.format(name, str(ex),traceback.format_exc())) + return False else: # 防止策略重名 @@ -921,13 +982,21 @@ class CtaEngine(object): # 自动初始化 if 'auto_init' in setting: if setting['auto_init'] == True: - self.writeCtaLog(u'自动初始化策略') - self.initStrategy(name=name) + self.writeCtaLog(u'自动初始化策略:{}'.format(name)) + try: + self.initStrategy(name=name) + except Exception as ex: + self.writeCtaCritical(u'初始化策略:{} 异常,{},{}'.format(name,str(ex),traceback.format_exc())) + return False if 'auto_start' in setting: if setting['auto_start'] == True: - self.writeCtaLog(u'自动启动策略') - self.startStrategy(name=name) + self.writeCtaLog(u'自动启动策略:{}'.format(name)) + try: + self.startStrategy(name=name) + except Exception as ex: + self.writeCtaCritical(u'自动启动策略:{} 异常,{},{}'.format(name, str(ex), traceback.format_exc())) + return False return True def initStrategy(self, name, force=False): @@ -1593,7 +1662,7 @@ class CtaEngine(object): :return: """ try: - with open(self.settingfilePath) as f: + with open(self.settingfilePath,'r',encoding='UTF-8') as f: l = json.load(f) for setting in l: try: @@ -1761,9 +1830,13 @@ class CtaEngine(object): # 发出日志 content =u'策略{}触发异常已停止.{}'.format(strategy.name,traceback.format_exc()) - self.writeCtaLog(content) + self.writeCtaError(content) self.mainEngine.writeCritical(content) + if hasattr(strategy,'backtesting') and hasattr(strategy,'wechat_source'): + if not strategy.backtesting and len(strategy.wechat_source)>0: + self.sendAlertToWechat(content=content,target=strategy.wechat_source) + # ---------------------------------------------------------------------- # 仓位持久化相关 def savePosition(self): @@ -2047,7 +2120,14 @@ class PositionBuffer(object): self.shortYd = EMPTY_INT self.frozen = EMPTY_FLOAT - + + # ---------------------------------------------------------------------- + def toStr(self): + """更新显示信息""" + str = u'long:{},yd:{},td:{}, short:{},yd:{},td:{}, fz:{};' \ + .format(self.longPosition, self.longYd, self.longToday, + self.shortPosition, self.shortYd, self.shortToday,self.frozen) + return str #---------------------------------------------------------------------- def updatePositionData(self, pos): """更新持仓数据""" diff --git a/vnpy/trader/app/ctaStrategy/ctaGridTrade.py b/vnpy/trader/app/ctaStrategy/ctaGridTrade.py index caa4fabf..0ec1a4ab 100644 --- a/vnpy/trader/app/ctaStrategy/ctaGridTrade.py +++ b/vnpy/trader/app/ctaStrategy/ctaGridTrade.py @@ -1,6 +1,5 @@ # encoding: UTF-8 - import os,sys from datetime import datetime import json @@ -9,6 +8,7 @@ import shutil from collections import OrderedDict from vnpy.trader.app.ctaStrategy.ctaBase import * from vnpy.trader.vtConstant import * +import traceback DEBUGCTALOG = True @@ -22,6 +22,7 @@ ChangeLog: 170707,增加重用选项 170719, 增加网格类型 171208,增加openPrices/snapshot + 180420, 增加CtaLegacyGridTrade(传统网格:上网格做多,下网格做空) """ # 网格类型 @@ -37,7 +38,7 @@ class CtaGrid(object): """ - def __init__(self, direction, openprice, closeprice, stopprice=EMPTY_FLOAT, volume=1, type=EMPTY_STRING): + def __init__(self, direction, openprice, closeprice, stopprice=EMPTY_FLOAT, volume=1, type=EMPTY_STRING, vtSymbol=EMPTY_STRING): self.id = str(uuid.uuid1()) self.direction = direction # 交易方向(LONG:多,正套;SHORT:空,反套) @@ -45,6 +46,7 @@ class CtaGrid(object): self.closePrice = closeprice # 平仓价格 self.stopPrice = stopprice # 止损价格 + self.vtSymbol = vtSymbol # 品种合约 self.volume = volume # 开仓数量 self.tradedVolume = EMPTY_INT # 成交数量 开仓时,为开仓数量,平仓时,为平仓数量 @@ -72,6 +74,7 @@ class CtaGrid(object): j['closePrice'] = self.closePrice # 平仓价格 j['stopPrice'] = self.stopPrice # 止损价格 + j['vtSymbol'] = self.vtSymbol # 品种数量 j['volume'] = self.volume # 开仓数量 j['tradedVolume'] = self.tradedVolume # 成交数量 @@ -97,14 +100,53 @@ class CtaGrid(object): return j + def fromJson(self,j): + """从JSON恢复""" + try: + self.id = j.get('id',None) + if self.id is None: + self.id = str(uuid.uuid1()) + self.direction = j.get('direction',EMPTY_STRING) + self.closePrice = j.get('closePrice', EMPTY_FLOAT) + self.openPrice = j.get('openPrice', EMPTY_FLOAT) + self.stopPrice = j.get('stopPrice', EMPTY_FLOAT) + self.orderStatus = j.get('orderStatus',False) # 挂单状态: True,已挂单,False,未挂单 + self.orderRef = j.get('orderRef',EMPTY_STRING) # OrderId + self.openStatus = j.get('openStatus',False) # 开仓状态 + self.closeStatus = j.get('closeStatus',False) # 平仓状态 + + strTime = j.get('openDatetime',None) + if strTime == EMPTY_STRING or strTime is None: + self.openDatetime = None + else: + self.openDatetime = datetime.strptime(strTime, '%Y-%m-%d %H:%M:%S') + + self.vtSymbol = j.get('vtSymbol',EMPTY_STRING) + self.volume = j.get('volume',EMPTY_FLOAT) + self.tradedVolume = j.get('tradedVolume',EMPTY_FLOAT) # 已交易的合约数量 + self.lockGrids = j.get('lockGrids',[]) + self.type = j.get('type',EMPTY_STRING) + if self.type == False: + self.type = EMPTY_STRING + self.reuse = j.get('reuse',False) + self.openPrices = j.get('openPrices',{}) + self.snapshot = j.get('snapshot',{}) + except Exception as ex: + print('CtaGrid fromJson Exception:{} {}'.format(str(ex),traceback.format_exc()),file=sys.stderr) + def toStr(self): """输入字符串""" - str = u'o:{0}/{1};c:{2}/{3},r:{4}/opentime:{5}/ordertime:{6}'\ + str = u'o:{}/{};c:{}/{},r:{}/opentime:{}/ordertime:{}'\ .format(self.openPrice, self.openStatus, self.closePrice, self.closeStatus, self.orderRef, self.openDatetime, self.orderDatetime) - return str + if len(self.vtSymbol) > 0: + return u'{} {}'.format(self.vtSymbol,str) + else: + return str + def __eq__(self,other): + return self.id == other.id class CtaGridTrade(object): @@ -146,6 +188,10 @@ class CtaGridTrade(object): self.json_file_path = os.path.join(self.get_data_folder(), u'{}_Grids.json'.format(self.jsonName)) # 网格的路径 + def changeGridHeight(self, grid_height=EMPTY_FLOAT, grid_win=EMPTY_FLOAT): + self.gridHeight = grid_height + self.gridWin = grid_win + def getVolumeRate(self, gridIndex=EMPTY_INT): """获取网格索引对应的开仓数量比例""" if gridIndex >= len(self.volumeList) or gridIndex < 0: @@ -331,21 +377,21 @@ class CtaGridTrade(object): if x.openStatus == True and x.type in types] return grids - def getOpenedGrids(self, direction): + def getOpenedGrids(self, direction,allow_empty_volume = False): """获取已开仓的网格 direction:做多、做空方向: 做多方向时,从dnGrids中获取; 做空方向时,从upGrids中获取 """ # 状态一致,价格大于最低价格 if direction == DIRECTION_LONG: grids = [x for x in self.dnGrids - if x.openStatus == True and x.volume - x.tradedVolume > 0] + if x.openStatus == True and (x.volume - x.tradedVolume > 0 or allow_empty_volume)] return grids # 状态一致,开仓价格小于最高价格 if direction == DIRECTION_SHORT: grids = [x for x in self.upGrids - if x.openStatus == True and x.volume - x.tradedVolume > 0] + if x.openStatus == True and (x.volume - x.tradedVolume > 0 or allow_empty_volume)] return grids def getGrids(self, direction, ordered=False, opened=False, closed=False, begin=EMPTY_FLOAT, end=EMPTY_FLOAT, type=EMPTY_STRING): @@ -612,20 +658,47 @@ class CtaGridTrade(object): self.writeCtaLog(u'清除上网格[open={0}]'.format(x.openPrice)) self.upGrids.remove(x) - def rebuildGrids(self, direction, upline=EMPTY_FLOAT, dnline=EMPTY_FLOAT, midline=EMPTY_FLOAT, upRate=1, dnRate = 1): + def moveGrids(self, direction, pricedelta, type=EMPTY_STRING): + """按pricedelta平移所有网格""" + if direction == DIRECTION_LONG: + for x in self.dnGrids[:]: + x.openPrice += pricedelta # 开仓价格 + x.closePrice += pricedelta # 平仓价格 + x.stopPrice += pricedelta # 止损价格 + x.type = type # 网格类型标签 + # self.openPrices = {} # 套利使用,开仓价格,symbol:price + + if direction == DIRECTION_SHORT: + for x in self.upGrids[:]: + x.openPrice += pricedelta # 开仓价格 + x.closePrice += pricedelta # 平仓价格 + x.stopPrice += pricedelta # 止损价格 + x.type = type # 网格类型标签 + # self.openPrices = {} # 套利使用,开仓价格,symbol:price + + def rebuildGrids(self, direction, upline=EMPTY_FLOAT, dnline=EMPTY_FLOAT, midline=EMPTY_FLOAT, upRate=1, dnRate=1, reuse=False, useVariableSteps=False): """重新拉网 清除未挂单的网格, 在上轨/下轨位置重新挂单 upRate , 上轨网格高度比率 dnRate, 下轨网格高度比率 """ - self.writeCtaLog(u'重新拉网:upline:{0},dnline:{1}'.format(upline, dnline)) + self.writeCtaLog(u'重新拉网:direction:{},upline:{},dnline:{}'.format(direction, upline, dnline)) # 检查上下网格的高度比率,不能低于0.5 if upRate < 0.5 or dnRate < 0.5: upRate = max(0.5, upRate) dnRate = max(0.5, dnRate) + # 计算每个网格的高度。如果使用变高的网格,则每过5格把网格搞的增加(self.gridHeight/2) + gridSteps = [0]*self.maxLots + for i in range(1, self.maxLots, 1): + if useVariableSteps == False: + gridSteps[i] = self.gridHeight * i + else: + j = int(i / 5) + gridSteps[i] = gridSteps[i-1] + self.gridHeight + self.gridHeight / 2 * j + # 重建下网格(移除未挂单、保留开仓得网格、在最低价之下才增加网格 if direction == DIRECTION_LONG: min_long_price = midline @@ -656,16 +729,16 @@ class CtaGridTrade(object): if lots > 0: for i in range(0, lots, 1): # 做多,开仓价为下阻力线-网格高度*i,平仓价为开仓价+止盈高度,开仓数量为缺省 - open_price = int((dnline - self.gridHeight * i * dnRate) / self.minDiff ) * self.minDiff + open_price = int((dnline - gridSteps[i+remainLots] * dnRate) / self.minDiff ) * self.minDiff close_price = int((open_price + self.gridWin* dnRate)/self.minDiff) * self.minDiff grid = CtaGrid(direction=DIRECTION_LONG, openprice=open_price, closeprice=close_price, volume=self.volume*self.getVolumeRate(remainLots + i)) - + grid.reuse = reuse self.dnGrids.append(grid) - self.writeCtaLog(u'重新拉下网格:[{0}~{1}]'.format(dnline, dnline-self.gridHeight * lots)) + self.writeCtaLog(u'重新拉下网格:[{0}~{1}]'.format(dnline, dnline - gridSteps[-1] * dnRate)) # 重建上网格(移除未挂单、保留开仓得网格、在最高价之上才增加网格 if direction == DIRECTION_SHORT: @@ -697,16 +770,17 @@ class CtaGridTrade(object): if lots > 0: # 做空,开仓价为上阻力线+网格高度*i,平仓价为开仓价-止盈高度,开仓数量为缺省 for i in range(0, lots, 1): - open_price = int((upline + self.gridHeight * i * upRate) / self.minDiff) * self.minDiff + open_price = int((upline + gridSteps[i+remainLots] * upRate) / self.minDiff) * self.minDiff close_price = int((open_price - self.gridWin * upRate) / self.minDiff) * self.minDiff grid = CtaGrid(direction=DIRECTION_SHORT, openprice=open_price, closeprice=close_price, volume=self.volume*self.getVolumeRate(remainLots + i)) + grid.reuse = reuse self.upGrids.append(grid) - self.writeCtaLog(u'重新拉上网格:[{0}~{1}]'.format(upline, upline+self.gridHeight * lots)) + self.writeCtaLog(u'重新拉上网格:[{0}~{1}]'.format(upline, upline + gridSteps[-1] * upRate)) def recount_avg_open_price(self): """计算网格的平均开仓价""" @@ -913,6 +987,1324 @@ class CtaGridTrade(object): self.writeCtaLog(u'GrideTrade保存文件{}完成'.format(grid_json_file)) + def load(self, direction, openStatusFilter=[]): + """ + 加载本地Json至网格 + :param direction: DIRECTION_SHORT,做空网格;DIRECTION_LONG,做多网格 + :param openStatusFilter: 缺省,不做过滤;True,只提取已开仓的数据,False,只提取未开仓的数据 + :return: + """ + data = {} + grids_save_path = self.get_data_folder() + + if self.jsonName != self.strategy.name: + self.writeCtaLog(u'JsonName {} 与 上层策略名{} 不一致.'.format(self.jsonName, self.strategy.name)) + self.jsonName = self.strategy.name + + # 移除旧版上/下网格列表 + old_up_json_file = os.path.join(grids_save_path, u'{0}_upGrids.json'.format(self.jsonName)) + old_dn_json_file = os.path.join(grids_save_path, u'{0}_dnGrids.json'.format(self.jsonName)) + + if os.path.exists(old_up_json_file): + try: + with open(old_up_json_file, 'r', encoding='utf8') as f: + # 解析json文件 + data['up_grids'] = json.load(f) + except IOError: + self.writeCtaLog(u'读取网格{}出错'.format(old_up_json_file)) + data['up_grids'] = [] + try: # 移除旧版上网格文件 + os.remove(old_up_json_file) + except: + pass + + if os.path.exists(old_dn_json_file): + try: + with open(old_dn_json_file, 'r', encoding='utf8') as f: + # 解析json文件 + data['dn_grids'] = json.load(f) + except IOError as ex: + self.writeCtaLog(u'读取网格{}出错,ex:{}'.format(old_dn_json_file,str(ex))) + data['dn_grids'] = [] + try: # 移除旧版下网格文件 + os.remove(old_dn_json_file) + except: + pass + + # 若新版文件不存在,就保存;若存在,就优先使用新版数据文件 + grid_json_file = os.path.join(grids_save_path, u'{}_Grids.json'.format(self.jsonName)) + if not os.path.exists(grid_json_file): + if len(data) == 0: + data['up_grids'] = [] + data['dn_grids'] = [] + self.writeCtaLog(u'{}不存在,保存'.format(grid_json_file)) + else: + self.writeCtaLog(u'{}不存在,保存'.format(grid_json_file)) + try: + with open(grid_json_file, 'w') as f: + json_data = json.dumps(data, indent=4) + f.write(json_data) + except Exception as ex: + self.writeCtaLog(u'写入网格文件{}异常:{}'.format(grid_json_file,str(ex))) + else: + # 读取json文件 + try: + with open(grid_json_file, 'r', encoding='utf8') as f: + data = json.load(f) + except Exception as ex: + self.writeCtaLog(u'读取网格文件{}异常:{}'.format(grid_json_file,str(ex))) + + # 从文件获取数据 + json_grids = [] + if direction == DIRECTION_SHORT : + json_grids = data['up_grids'] if 'up_grids' in data else [] + + elif direction == DIRECTION_LONG: + json_grids = data['dn_grids'] if 'dn_grids' in data else [] + + grids = [] + for i in json_grids: + + closePrice = float(i['closePrice']) + openPrice = float(i['openPrice']) + stopPrice = float(i['stopPrice']) + + self.writeCtaLog(u'load Grid:open:{0},close:{1},stop:{2}'.format(openPrice, closePrice, stopPrice)) + + grid = CtaGrid(direction=i['direction'], openprice=openPrice, closeprice=closePrice, + stopprice=stopPrice, volume=i['volume']) + grid.orderStatus = i['orderStatus'] # 挂单状态: True,已挂单,False,未挂单 + grid.orderRef = i['orderRef'] # OrderId + grid.openStatus = i['openStatus'] # 开仓状态 + grid.closeStatus = i['closeStatus'] # 平仓状态 + + strTime = i['openDatetime'] + if strTime == EMPTY_STRING or type(strTime)==type(None): + grid.openDatetime = None + else: + grid.openDatetime = datetime.strptime(strTime, '%Y-%m-%d %H:%M:%S') + + try: + grid.tradedVolume = i['tradedVolume'] # 已交易的合约数量 + except KeyError: + grid.tradedVolume = EMPTY_INT + try: + grid.lockGrids = i['lockGrids'] + except KeyError: + grid.lockGrids = [] + + try: + grid.type = i['type'] + if grid.type == False: + grid.type = EMPTY_STRING + except KeyError: + grid.type = EMPTY_STRING + + try: + grid.reuse = i['reuse'] + except KeyError: + grid.reuse = False + + try: + grid.openPrices = i['openPrices'] + except KeyError: + grid.openPrices = {} + + try: + grid.snapshot = i['snapshot'] + except KeyError: + grid.snapshot = {} + + self.writeCtaLog(grid.toStr()) + + # 增加对开仓状态的过滤,满足某些策略只提取已开仓的网格数据 + if len(openStatusFilter) > 0: + if grid.openStatus not in openStatusFilter: + continue + + grids.append(grid) + + # 更新开仓均价 + self.recount_avg_open_price() + return grids + + def get_data_folder(self): + """获取数据目录""" + # 工作目录 + currentFolder = os.path.abspath(os.path.join(os.getcwd(), u'data')) + if os.path.isdir(currentFolder): + # 如果工作目录下,存在data子目录,就使用data子目录 + return currentFolder + else: + # 否则,使用缺省保存目录 vnpy/trader/app/ctaStrategy/data + return os.path.abspath(os.path.join(os.path.dirname(__file__), u'data')) + + def changeStrategyName(self, old_name, new_name): + """ + 在线更换策略实例名称,需要把Json文件也转移 + :param old_name: + :param new_name: + :return: + """ + if old_name == new_name: + self.writeCtaLog(u'更换策略实例名称失败,old:{} =>new:{}'.format(old_name, new_name)) + return + + data_folder = self.get_data_folder() + + self.jsonName = new_name + # 旧文件 + old_up_json_file = os.path.join(data_folder, u'{0}_upGrids.json'.format(old_name)) + old_dn_json_file = os.path.join(data_folder, u'{0}_dnGrids.json'.format(old_name)) + old_json_file = os.path.join(data_folder, u'{0}_Grids.json'.format(old_name)) + + # 新文件 + self.json_file_path = os.path.join(data_folder, u'{0}_Grids.json'.format(new_name)) + if os.path.isfile(self.json_file_path): # 新文件若存在,移除 + try: + os.remove(self.json_file_path) + except Exception as ex: + self.writeCtaLog(u'GridTrade.changeStrategyName 删除文件:{}异常:{}'.format(old_up_json_file,str(ex))) + + # 移动文件 + if os.path.isfile(old_json_file): + try: + shutil.move(old_json_file, self.json_file_path) + return + except Exception as ex: + self.writeCtaLog(u'GridTrade.changeStrategyName 移动文件:{}=》{}异常:{}'.format(old_up_json_file, self.json_file_path, str(ex))) + else: + data = {} + if os.path.isfile(old_up_json_file): + try: + with open(old_up_json_file, 'r', encoding='utf8') as f: + # 解析json文件 + data['up_grids'] = json.load(f) + except IOError: + self.writeCtaLog(u'读取网格{}出错'.format(old_up_json_file)) + data['up_grids'] = [] + try: # 移除旧版上网格文件 + os.remove(old_up_json_file) + except IOError: + self.writeCtaLog(u'移除网格{}出错'.format(old_up_json_file)) + + else: + data['up_grids'] = [] + if os.path.isfile(old_dn_json_file): + try: + with open(old_dn_json_file, 'r', encoding='utf8') as f: + # 解析json文件 + data['dn_grids'] = json.load(f) + except IOError: + self.writeCtaLog(u'读取网格{}出错'.format(old_dn_json_file)) + data['dn_grids'] = [] + try: # 移除旧版上网格文件 + os.remove(old_dn_json_file) + except IOError: + self.writeCtaLog(u'移除网格{}出错'.format(old_dn_json_file)) + else: + data['dn_grids'] = [] + + try: + with open(self.json_file_path, 'w') as f: + json_data = json.dumps(data, indent=4) + f.write(json_data) + except IOError as ex: + self.writeCtaLog(u'写入网格文件{}异常:{}'.format(self.json_file_path, str(ex))) + + def getJsonFilePath(self): + """ + 返回上下网格的文件路径 + :return: + """ + return self.json_file_path + + def getTypesOfOpenedGrids(self, direction, include_empty=False): + """ + 获取开仓的网格类型列表 + :param direction: + :param include_empty: 是否包含空值的类型 + :return: + """ + grids = self.getOpenedGrids(direction) + type_list = [] + for g in grids: + if g.type not in type_list and (g.type !=EMPTY_STRING if not include_empty else True): + type_list.append(g.type) + + return type_list + +class CtaLegacyGridTrade(object): + """网格交易类 + 包括两个方向的网格队列, + v1, 传统网格:上网格做多,下网格做空 + """ + + def __init__(self, strategy, maxlots=5, height=2, win=2, vol=1, minDiff = 1): + """初始化 + maxlots,最大网格数 + height,网格高度(绝对值,包含minDiff) + win,盈利数(包含minDiff) + vol,网格开仓数 + minDiff, 最小价格跳动 + """ + self.minDiff = minDiff + self.strategy = strategy + self.jsonName = self.strategy.origName #策略名称 + self.useMongoDb = True + + self.maxLots = maxlots # 缺省网格数量 + self.gridHeight = height # 最小网格高度 + self.gridWin = win # 最小止盈高度 + + self.volume = vol # 每次网格开仓数量 + self.volumeList = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1] # 梯级开仓数量比例 + + self.upGrids = [] # 上网格列表,专门做多 + self.dnGrids = [] # 下网格列表,专门做空 + + self.avg_up_open_price = EMPTY_FLOAT # 上网格开仓均价 + self.avg_dn_open_price = EMPTY_FLOAT # 下网格开仓均价 + + self.max_up_open_price = EMPTY_FLOAT # 上网格开仓均价 + self.min_dn_open_price = EMPTY_FLOAT # 下网格开仓均价 + + self.up_json_file_path = None # 下网格(做多网格)的路径 + self.dn_json_file_path = None # 下网格(做多网格)的路径 + + self.fixedGrids = False # Set grids with fixed price or not + self.fixedGridInitPrice = EMPTY_FLOAT + self.gridBufferLength = 5 # Close grids only when (# of grids >= this value) + + def enableFixedGrids(self, price, gridbufferlen=5): + self.fixedGrids = True + self.fixedGridInitPrice = price + self.gridBufferLength = gridbufferlen + + def disableFixedGrids(self, price): + self.fixedGrids = False + self.fixedGridInitPrice = 0 + + def changeGridHeight(self, grid_height=EMPTY_FLOAT, grid_win=EMPTY_FLOAT): + self.gridHeight = grid_height + self.gridWin = grid_win + + def getVolumeRate(self, gridIndex=EMPTY_INT): + """获取网格索引对应的开仓数量比例""" + if gridIndex >= len(self.volumeList) or gridIndex < 0: + return 1 + rate = self.volumeList[gridIndex] + + if rate == 0: + return 1 + else: + return rate + + def initGrid(self, upline=EMPTY_FLOAT, dnline=EMPTY_FLOAT, max_lots=EMPTY_INT, reuse=False): + """初始化网格队列,突破开仓 + upline,上阻力线 + dnline,下支撑线 + """ + if max_lots > EMPTY_INT: + lots = max_lots + else: + lots = self.maxLots + + newupline = upline + newdnline = dnline + if self.fixedGrids is True: + if abs(self.fixedGridInitPrice - upline) % self.gridHeight > 0: + newupline = upline - abs(self.fixedGridInitPrice - upline) % self.gridHeight + self.gridHeight # >= current value + newdnline = dnline - abs(self.fixedGridInitPrice - dnline) % self.gridHeight # <= current value + + self.writeCtaLog(u'初始化网格队列,upline:{}({}),dnline:{}({}), '.format(upline, newupline, dnline, newdnline)) + upline = newupline + dnline = newdnline + + # 初始化上网格列表 + if len(self.upGrids) == 0: + self.upGrids = self.load(direction= DIRECTION_LONG) + if len(self.upGrids) > 0: + self.writeCtaLog(u'上网格从文件{}加载完成'.format(self.up_json_file_path)) + else: + # 做多,开仓价为上阻力线+网格高度*i,平仓价为开仓价+止盈高度,开仓数量为缺省 + for i in range(0, lots, 1): + grid = CtaGrid(direction=DIRECTION_LONG, + openprice=upline + self.gridHeight*i, + closeprice=upline + self.gridHeight*i - self.gridWin, + volume=self.volume*self.getVolumeRate(i)) + if reuse: + grid.reuse = reuse + self.upGrids.append(grid) + + self.writeCtaLog(u'上网格{0}~{1}初始化完成'.format(upline,upline+self.gridHeight*self.maxLots)) + self.save(direction=DIRECTION_LONG) + + # 初始化下网格列表 + if len(self.dnGrids) == 0: + self.dnGrids = self.load(direction= DIRECTION_SHORT) + if len(self.dnGrids) > 0: + self.writeCtaLog(u'下网格从文件{}加载完成'.format(self.dn_json_file_path)) + else: + for i in range(0, lots, 1): + # 做空,开仓价为下阻力线-网格高度*i,平仓价为开仓价-止盈高度,开仓数量为缺省 + grid = CtaGrid(direction=DIRECTION_SHORT, + openprice=dnline - self.gridHeight * i, + closeprice=dnline - self.gridHeight * i + self.gridWin, + volume=self.volume*self.getVolumeRate(i)) + if reuse: + grid.reuse = reuse + self.dnGrids.append(grid) + + self.writeCtaLog(u'下网格{0}~{1}初始化完成'.format(dnline,dnline-self.gridHeight*self.maxLots)) + self.save(direction=DIRECTION_SHORT) + + def writeCtaLog(self, log): + self.strategy.writeCtaLog(log) + + def toStr(self,direction): + """显示网格""" + + pendingCloseList = u'' # 平仓清单 + pendingOpenList = u'' # 开仓清单 + deactiveList = u'' # 待激活清单 + openedVolumeDict = {} # 开仓数量汇总 + + if direction == DIRECTION_SHORT: + numDeactivated = 0 + for grid in self.dnGrids: + t = EMPTY_STRING + if grid.type == LOCK_GRID: + t = u'L:' + elif grid.type == TREND_GRID: + t = u'T:' + elif grid.type == PERIOD_GRID: + t = u'P:' + else: + t = grid.type + # 待平仓 + if grid.openStatus : + opened_volume = 0 + if grid.tradedVolume == EMPTY_INT: + pendingCloseList = pendingCloseList + u'{}[{}->{},sp:{},v:{}];'\ + .format(t,grid.openPrice, grid.closePrice, grid.stopPrice, grid.volume) + opened_volume = grid.volume + else: + pendingCloseList = pendingCloseList + u'[{}{}->{},sp:{},v:{}/{}];'\ + .format(t, grid.openPrice, grid.closePrice, grid.volume, grid.stopPrice, grid.tradedVolume) + opened_volume = grid.volume - grid.tradedVolume + + if grid.type != EMPTY_STRING: + openedVolumeDict[grid.type] = opened_volume if grid.type not in openedVolumeDict else opened_volume + openedVolumeDict[grid.type] + openedVolumeDict['All'] = opened_volume if 'All' not in openedVolumeDict else opened_volume + openedVolumeDict['All'] + + # 待开仓成交 + elif not grid.openStatus and grid.orderStatus: + if grid.tradedVolume == EMPTY_INT: + pendingOpenList = pendingOpenList + u'[{}{},v:{}];'.format(t, grid.openPrice, grid.volume) + else: + pendingOpenList = pendingOpenList + u'[{}{},v:{}/{}];'\ + .format(t, grid.openPrice, grid.volume, grid.tradedVolume) + + # 等待挂单 + else: + if numDeactivated < 5: + deactiveList = deactiveList + u'[{}{}];'.format(t,grid.openPrice) + numDeactivated += 1 + else: + break + + return u'Short:空:待平:[{}],{};开:{};待:{}'.format(openedVolumeDict, pendingCloseList, pendingOpenList, deactiveList) + + if direction == DIRECTION_LONG: + numDeactivated = 0 + for grid in self.upGrids: + t = EMPTY_STRING + if grid.type == LOCK_GRID: + t = u'L:' + elif grid.type == TREND_GRID: + t = u'T:' + elif grid.type == PERIOD_GRID: + t = u'P:' + else: + t = grid.type + # 待平仓 + if grid.openStatus: + opened_volume = 0 + if grid.tradedVolume == EMPTY_INT: + pendingCloseList = pendingCloseList + u'[{} {}->{},sp:{},v:{}];'\ + .format(t,grid.openPrice, grid.closePrice, grid.stopPrice, grid.volume) + opened_volume = grid.volume + else: + pendingCloseList = pendingCloseList + u'[{} {}->{},sp:{}, v:{}/{}];'\ + .format(t,grid.openPrice, grid.closePrice, grid.stopPrice, grid.volume, grid.tradedVolume) + opened_volume = grid.volume - grid.tradedVolume + if grid.type != EMPTY_STRING: + openedVolumeDict[grid.type] = opened_volume if grid.type not in openedVolumeDict else opened_volume + openedVolumeDict[grid.type] + openedVolumeDict['All'] = opened_volume if 'All' not in openedVolumeDict else opened_volume + openedVolumeDict['All'] + # 待开仓成交 + elif not grid.openStatus and grid.orderStatus: + if grid.tradedVolume == EMPTY_INT: + pendingOpenList = pendingOpenList + u'[{}{},v:{}];'.format(t, grid.openPrice, grid.volume) + else: + pendingOpenList = pendingOpenList + u'[{}{},v:{}/{}];'\ + .format(t, grid.openPrice, grid.volume, grid.tradedVolume) + + # 等待挂单 + else: + if numDeactivated < 5: + deactiveList = deactiveList + u'[{}{}];'.format(t, grid.openPrice) + numDeactivated += 1 + else: + break + + return u'Long:多:待平:[{}],{};开:{};待:{}'.format(openedVolumeDict, pendingCloseList,pendingOpenList,deactiveList) + + def getGridsWithTypes(self, direction, types=[]): + """获取符合类型的网格 + direction:做多、做空方向: 做多方向时,从dnGrids中获取; 做空方向时,从upGrids中获取 + type:网格类型列表, + """ + # 状态一致,价格大于最低价格 + if direction == DIRECTION_SHORT: + grids = [x for x in self.dnGrids + if x.type in types] + return grids + + # 状态一致,开仓价格小于最高价格 + if direction == DIRECTION_LONG: + grids = [x for x in self.upGrids + if x.type in types] + return grids + + def getOpenedGridsWithTypes(self, direction, types=[]): + """获取符合类型的持仓网格 + direction:做多、做空方向: 做多方向时,从dnGrids中获取; 做空方向时,从upGrids中获取 + type:网格类型列表, + """ + # 状态一致,价格大于最低价格 + if direction == DIRECTION_SHORT: + grids = [x for x in self.dnGrids + if x.openStatus == True and x.type in types] + return grids + + # 状态一致,开仓价格小于最高价格 + if direction == DIRECTION_LONG: + grids = [x for x in self.upGrids + if x.openStatus == True and x.type in types] + return grids + + def getOpenedGrids(self, direction): + """获取已开仓的网格 + direction:做多、做空方向: 做多方向时,从dnGrids中获取; 做空方向时,从upGrids中获取 + """ + # 状态一致,价格大于最低价格 + if direction == DIRECTION_SHORT: + grids = [x for x in self.dnGrids + if x.openStatus == True] + + return grids + + # 状态一致,开仓价格小于最高价格 + if direction == DIRECTION_LONG: + grids = [x for x in self.upGrids + if x.openStatus == True] + return grids + + def getGrids(self, direction, ordered=False, opened=False, closed=False, begin=EMPTY_FLOAT, end=EMPTY_FLOAT, type=EMPTY_STRING, delta=0): + """获取未挂单的网格 + direction:做多、做空方向: 做空方向时,从dnGrids中获取; 做多方向时,从upGrids中获取 + ordered:是否已提交至服务器 + opened:是否已开仓 + closed:是否已平仓 + begin:开始价格, + end:结束价格, + delta:基于begin价格的偏移,处理滑点,得到更好的开仓点位 + """ + + # 状态一致,价格大于最低价格 + if direction == DIRECTION_SHORT: + if begin == EMPTY_FLOAT: begin = sys.maxsize + if end == EMPTY_FLOAT: end = 0-sys.maxsize + begin += delta + grids = [x for x in self.dnGrids + if x.orderStatus == ordered + and x.openStatus == opened + and x.closeStatus == closed + and x.openPrice >= begin + and x.openPrice <= end + and x.type == type] + + return grids + + # 状态一致,开仓价格小于最高价格 + if direction == DIRECTION_LONG: + if begin == EMPTY_FLOAT: begin = 0-sys.maxsize + if end == EMPTY_FLOAT: end = sys.maxsize + begin -= delta + grids = [x for x in self.upGrids + if x.orderStatus == ordered + and x.openStatus == opened + and x.closeStatus == closed + and x.openPrice <= begin + and x.openPrice >= end + and x.type == type] + return grids + + def getGridById(self,direction, id): + """寻找指定id的网格""" + if id == EMPTY_STRING or len(id) <1: + return + if direction == DIRECTION_SHORT: + for x in self.dnGrids[:]: + if x.id == id: + self.writeCtaLog(u'找到下网格[open={},close={},stop={},volume={}]'.format(x.openPrice,x.closePrice,x.stopPrice,x.volume)) + return x + + if direction == DIRECTION_LONG: + for x in self.upGrids[:]: + if x.id == id: + self.writeCtaLog(u'找到上网格[open={},close={},stop={},volume={}]'.format(x.openPrice,x.closePrice,x.stopPrice,x.volume)) + return x + + return None + + def getPosition(self,direction, type=EMPTY_STRING): + """获取特定类型的网格持仓""" + if direction == DIRECTION_SHORT: + long_vol = [x.volume-x.tradedVolume for x in self.dnGrids if x.openStatus and x.type == type] + return sum(long_vol) + + if direction == DIRECTION_LONG: + short_vol = [x.volume - x.tradedVolume for x in self.upGrids if x.openStatus and x.type == type] + return sum(short_vol) + + def updateOrderRef(self, direction, openPrice, orderRef): + """更新网格的orderId""" + if direction == DIRECTION_SHORT: + for x in self.dnGrids: + if x.openPrice == openPrice: + x.orderRef = orderRef + x.orderStatus = True + + if direction == DIRECTION_LONG: + for x in self.upGrids: + if x.openPrice == openPrice: + x.orderRef = orderRef + x.orderStatus = True + + def cancelOrderRef(self,direction, openPrice): + """网格撤单""" + if direction == DIRECTION_SHORT: + for x in self.dnGrids: + if x.openPrice == openPrice and x.orderRef != EMPTY_STRING and x.orderStatus==True and x.openStatus==False: + x.orderRef = EMPTY_STRING + x.orderStatus = False + self.writeCtaLog(u'下网格撤单[{0}]'.format(x.openPrice)) + + if direction == DIRECTION_LONG: + for x in self.upGrids: + if x.openPrice == openPrice and x.orderRef != EMPTY_STRING and x.orderStatus==True and x.openStatus==False: + x.orderRef = EMPTY_STRING + x.orderStatus = False + self.writeCtaLog(u'上网格撤单[{0}]'.format(x.openPrice)) + + def getGridbyOpenPrice(self, direction, openPrice, orderRef = EMPTY_STRING): + """通过开仓价和委托状态获取网格""" + if direction == DIRECTION_SHORT: + for x in self.dnGrids: + # 优先匹配价格 + if x.orderRef == orderRef and x.openPrice == openPrice: + return x + + if direction == DIRECTION_LONG: + for x in self.upGrids: + # 优先匹配价格 + if x.orderRef == orderRef and x.openPrice == openPrice: + return x + + self.writeCtaLog(u'异常,getGridbyOpenPrice找不到网格[{0},openprice={1},orderRef={2}]'.format(direction, openPrice, orderRef)) + return None + + def getGrid(self, direction, openPrice=EMPTY_FLOAT, closePrice=EMPTY_FLOAT, orderRef=EMPTY_STRING, t=EMPTY_STRING): + """获取网格""" + if direction == DIRECTION_SHORT: + for x in self.dnGrids: + # 优先匹配价格 + if t == u'OpenPrice' and x.openPrice == openPrice: + return x + elif t == u'ClosePrice' and x.closePrice == closePrice: + return x + elif t == u'OrderRef' and x.orderRef == orderRef: + return x + + if direction == DIRECTION_LONG: + for x in self.upGrids: + # 优先匹配价格 + if t == u'OpenPrice' and x.openPrice == openPrice: + return x + elif t == u'ClosePrice' and x.closePrice == closePrice: + return x + elif t == u'OrderRef' and x.orderRef == orderRef: + return x + + self.writeCtaLog(u'异常,getGrid找不到网格[direction={0},oepnPrice={1},closePrice={2},orderRef={3},t={4}]'.format(direction, openPrice, closePrice, orderRef, t)) + return None + + def updateClosePrice(self, direction, closePrice=EMPTY_FLOAT, type=EMPTY_STRING): + """获取网格""" + # if num(opened Grids) <= 5: set closePrice to 0 + # else: set closePrice to the specified one (should be the closePrice of the newest Grid) + numChanged = 0 + newPrice = EMPTY_FLOAT + if direction == DIRECTION_SHORT: + for x in self.dnGrids: + if x.type == type and x.openStatus is True: + x.closePrice = newPrice + numChanged += 1 + if numChanged >= self.gridBufferLength: + newPrice = closePrice + for x in self.dnGrids: + if x.type == type and x.openStatus is True: + x.closePrice = newPrice + + if direction == DIRECTION_LONG: + for x in self.upGrids: + if x.type == type and x.openStatus is True: + x.closePrice = newPrice + numChanged += 1 + if numChanged >= self.gridBufferLength: + newPrice = closePrice + for x in self.upGrids: + if x.type == type and x.openStatus is True: + x.closePrice = newPrice + + self.writeCtaLog(u'updateClosePrice() {}: update closePrice to {} for all opened grids({})'.format(direction, newPrice, numChanged)) + + def getFirstLastGrid(self, direction,type = EMPTY_STRING): + """获取最前/后一个的网格""" + # 做多网格:,first =开仓价最高一个,last= 最低一个 + if direction == DIRECTION_LONG: + short_grids = self.getGridsWithTypes(direction=direction, types=[type]) + if short_grids is None or len(short_grids) ==0 : + return None, None + + if len(short_grids) == 1: + return short_grids[0],short_grids[0] + + # 价格由低至高排列 + sortedGrids = sorted(short_grids, key=lambda g:g.openPrice) + return sortedGrids[-1], sortedGrids[0] + + # 做空网格: first =最低一个,last= 开仓价最高一个 + if direction == DIRECTION_SHORT: + long_grids = self.getGridsWithTypes(direction=direction, types=[type]) + if long_grids is None or len(long_grids) ==0: + return None, None + + if len(long_grids) == 1: + return long_grids[0], long_grids[0] + + sortedGrids = sorted(long_grids, key=lambda g: g.openPrice) + return sortedGrids[0], sortedGrids[-1] + + return None,None + + def getLastOpenedGrid(self, direction,type = EMPTY_STRING,orderby_asc=True): + """获取最后一个开仓的网格""" + if direction == DIRECTION_LONG: + opened_long_grids = self.getGrids(direction=direction, opened=True,type=type) + if opened_long_grids is None or len(opened_long_grids) ==0 : + return None + if len(opened_long_grids) > 1: + sortedGrids = sorted(opened_long_grids, key=lambda g:g.openPrice) + if orderby_asc: + # 取价格最高的一格 + opened_long_grids = sortedGrids[-1:] + else: + # 取价格最低的一格 + opened_long_grids = sortedGrids[0:1] + + return opened_long_grids[0] + + if direction == DIRECTION_SHORT: + opened_short_grids = self.getGrids(direction=direction, opened=True,type=type) + if opened_short_grids is None or len(opened_short_grids) ==0: + return None + if len(opened_short_grids) > 1: + sortedGrids = sorted(opened_short_grids, key=lambda g: g.openPrice) + if orderby_asc: + # 取价格最低的一格 + opened_short_grids = sortedGrids[0:1] + else: + # 取价格最高的一格 + opened_short_grids = sortedGrids[-1:] + + return opened_short_grids[0] + + def closeGrid(self, direction, closePrice, closeVolume): + """网格交易结束""" + if direction == DIRECTION_SHORT: + for x in self.dnGrids: + if x.closePrice == closePrice and x.openStatus and x.volume == closeVolume: + self.writeCtaLog(u'下网格交易结束[{0}->{1}],仓位:{2},移除网格'.format(x.openPrice, x.closePrice,closeVolume)) + self.dnGrids.remove(x) + return + + if x.closePrice == closePrice and x.openStatus and x.volume > closeVolume: + self.writeCtaLog(u'下网格交易部分结束[{0}->{1}],减少仓位:{2}'.format(x.openPrice, x.closePrice,closeVolume)) + x.volume = x.volume - closeVolume + + if x.closePrice == closePrice and x.openStatus and x.volume < closeVolume: + self.writeCtaLog(u'下网格交易结束[{0}->{1}],移除网格,剩余仓位:{2}'.format(x.openPrice, x.closePrice, closeVolume-x.volume)) + closeVolume = closeVolume - x.volume + self.dnGrids.remove(x) + + if direction == DIRECTION_LONG: + for x in self.upGrids: + if x.closePrice == closePrice and x.openStatus and x.volume == closeVolume: + self.writeCtaLog(u'上网格交易结束[{0}->{1}],仓位:{2},移除网格'.format(x.openPrice, x.closePrice,closeVolume)) + self.upGrids.remove(x) + return + + if x.closePrice == closePrice and x.openStatus and x.volume > closeVolume: + self.writeCtaLog(u'上网格交易结束[{0}->{1}],仓位减少:{2}'.format(x.openPrice, x.closePrice,closeVolume)) + x.volume = x.volume - closeVolume + + if x.closePrice == closePrice and x.openStatus and x.volume < closeVolume: + self.writeCtaLog(u'上网格交易结束[{0}->{1}],移除网格,剩余仓位:{2}'.format(x.openPrice, x.closePrice,closeVolume-x.volume)) + closeVolume = closeVolume - x.volume + self.upGrids.remove(x) + + def removeGridById(self,direction, id): + """移除指定id的网格""" + if id == EMPTY_STRING or len(id) <1: + return + if direction == DIRECTION_SHORT: + for x in self.dnGrids[:]: + if x.id == id: + self.writeCtaLog(u'清除下网格[open={},close={},stop={},volume={}]'.format(x.openPrice,x.closePrice,x.stopPrice,x.volume)) + self.dnGrids.remove(x) + + if direction == DIRECTION_LONG: + for x in self.upGrids[:]: + if x.id == id: + self.writeCtaLog(u'清除上网格[open={},close={},stop={},volume={}]'.format(x.openPrice,x.closePrice,x.stopPrice,x.volume)) + self.upGrids.remove(x) + + def removeGrids(self, direction, priceline, type=EMPTY_STRING): + """清除价格线以下的网格""" + if direction == DIRECTION_SHORT: + for x in self.dnGrids[:]: + if x.openPrice < priceline and not x.orderStatus and not x.openStatus and not x.closeStatus and x.type==type: + self.writeCtaLog(u'清除下网格[open={0}]'.format(x.openPrice)) + self.dnGrids.remove(x) + + if direction == DIRECTION_LONG: + for x in self.upGrids[:]: + if x.openPrice > priceline and not x.orderStatus and not x.openStatus and not x.closeStatus and x.type==type: + self.writeCtaLog(u'清除上网格[open={0}]'.format(x.openPrice)) + self.upGrids.remove(x) + + def moveGrids(self, direction, pricedelta, type=EMPTY_STRING): + """按pricedelta平移所有网格""" + if direction == DIRECTION_SHORT: + for x in self.dnGrids[:]: + x.openPrice += pricedelta # 开仓价格 + if x.closePrice != 0: + x.closePrice += pricedelta # 平仓价格 + x.stopPrice += pricedelta # 止损价格 + x.type = type # 网格类型标签 + # self.openPrices = {} # 套利使用,开仓价格,symbol:price + + if direction == DIRECTION_LONG: + for x in self.upGrids[:]: + x.openPrice += pricedelta # 开仓价格 + if x.closePrice != 0: + x.closePrice += pricedelta # 平仓价格 + x.stopPrice += pricedelta # 止损价格 + x.type = type # 网格类型标签 + # self.openPrices = {} # 套利使用,开仓价格,symbol:price + + def rebuildGrids(self, direction, upline=EMPTY_FLOAT, dnline=EMPTY_FLOAT, midline=EMPTY_FLOAT, upRate=1, dnRate=1, reuse=False, useVariableSteps=False): + """重新拉网 + 清除未挂单的网格, + 在上轨/下轨位置重新挂单 + upRate , 上轨网格高度比率 + dnRate, 下轨网格高度比率 + """ + result = True + newupline = upline + newdnline = dnline + if self.fixedGrids is True: + if abs(self.fixedGridInitPrice - upline) % self.gridHeight > 0: + newupline = upline - abs(self.fixedGridInitPrice - upline) % self.gridHeight + 2*self.gridHeight # ceil(current value, gridHeight) + gridHeight + newdnline = dnline - abs(self.fixedGridInitPrice - dnline) % self.gridHeight - self.gridHeight # floor(current value, gridHeight) - gridHeight + else: + newupline = upline + self.gridHeight + newdnline = dnline - self.gridHeight + + if direction == DIRECTION_SHORT: + self.writeCtaLog(u'DEBUG- rebuildGrids Short, 重新拉网:direction:{},upline:{}({}),dnline:{}({})'.format(direction, upline, newupline, dnline, newdnline)) + else: + self.writeCtaLog(u'DEBUG- rebuildGrids Long, 重新拉网:direction:{},upline:{}({}),dnline:{}({})'.format(direction, upline, newupline, dnline, newdnline)) + uplineDelta = newupline - upline + dnlineDelta = newdnline - dnline + upline = newupline + dnline = newdnline + + # 检查上下网格的高度比率,不能低于0.5 + if upRate < 0.5 or dnRate < 0.5: + upRate = max(0.5, upRate) + dnRate = max(0.5, dnRate) + + # 计算每个网格的高度。如果使用变高的网格,则每过5格把网格搞的增加(self.gridHeight/2) + gridSteps = [0]*self.maxLots + for i in range(1, self.maxLots, 1): + if useVariableSteps == False: + gridSteps[i] = self.gridHeight * i + else: + j = int(i / 5) + gridSteps[i] = gridSteps[i-1] + self.gridHeight + self.gridHeight / 2 * j + + # 重建下网格(向下移动开仓的网格) + if direction == DIRECTION_SHORT: + min_long_price = midline + remove_grids = [] + opened_grids = [] + temp_dnGrids = [] + if self.fixedGrids is True: + # 如果价格没变,不需要重新布网格 + if dnline == self.dnGrids[0].openPrice: + self.writeCtaLog(u'DEBUG- rebuildGrids Short, dnline not changed, no need to rebuild.') + result = False + return result + + # 重建的网格数量(所有网格) + remainLots = 0 + lots = self.maxLots - remainLots + self.writeCtaLog(u'需要重建的网格数量:{0},起点:{1}'.format(lots, dnline)) + if lots > 0: + for i in range(0, lots, 1): + # 做空,开仓价为下阻力线-网格高度*i,平仓价为开仓价+止盈高度,开仓数量为缺省 + open_price = int((dnline - gridSteps[i+remainLots] * dnRate) / self.minDiff ) * self.minDiff + close_price = int((open_price + self.gridWin * dnRate)/self.minDiff) * self.minDiff + + grid = CtaGrid(direction=DIRECTION_SHORT, + openprice=open_price, + closeprice=close_price, + volume=self.volume*self.getVolumeRate(remainLots + i)) + grid.reuse = reuse + temp_dnGrids.append(grid) + + self.writeCtaLog(u'重新拉下网格:[{0}~{1}]'.format(dnline, dnline - gridSteps[-1] * dnRate)) + + # 移除旧的下网格,保留开仓的网格状态 + for m in range(0, len(self.dnGrids)): + x = self.dnGrids[m] + if not x.orderStatus and not x.openStatus and not x.closeStatus: + if len(remove_grids) < 6: + remove_grids.append(u'{}=>{}'.format(x.openPrice, x.closePrice)) + else: + if len(opened_grids) < 6: + opened_grids.append(u'{}=>{}'.format(x.openPrice, x.closePrice)) + temp_dnGrids[m].orderStatus = x.orderStatus + temp_dnGrids[m].orderStatus = x.orderStatus + temp_dnGrids[m].volume = x.volume + temp_dnGrids[m].tradedVolume = x.tradedVolume + temp_dnGrids[m].orderStatus = x.orderStatus + temp_dnGrids[m].orderRef = x.orderRef + temp_dnGrids[m].openStatus = x.openStatus + temp_dnGrids[m].closeStatus = x.closeStatus + temp_dnGrids[m].openDatetime = x.openDatetime + temp_dnGrids[m].orderDatetime = x.orderDatetime + temp_dnGrids[m].lockGrids = x.lockGrids + temp_dnGrids[m].reuse = x.reuse + temp_dnGrids[m].type = x.type + temp_dnGrids[m].openPrices = x.openPrices + temp_dnGrids[m].snapshot = x.snapshot + if x.closePrice > 0: + temp_dnGrids[m].closePrice = x.closePrice + dnlineDelta + else: + temp_dnGrids[m].closePrice = 0 + + if len(remove_grids) > 0: + self.writeCtaLog(u'清除下网格[{}]'.format(remove_grids)) + if len(opened_grids) > 0: + self.writeCtaLog(u'保留下网格[{}]'.format(opened_grids)) + + for x in self.dnGrids[:]: + self.dnGrids.remove(x) + # self.dnGrids.clear() + self.dnGrids = temp_dnGrids + + self.writeCtaLog(u'DEBUG- rebuildGrids Short: lots:{},upline:{},dnline:{} [{}~{}]'.format(lots, upline, dnline, dnline, dnline - gridSteps[-1] * dnRate)) + + else: + # 移除未挂单的下网格 + for x in self.dnGrids[:]: + if not x.orderStatus and not x.openStatus and not x.closeStatus: + if len(remove_grids) < 6: + remove_grids.append(u'{}=>{}'.format(x.openPrice, x.closePrice)) + self.dnGrids.remove(x) + else: + if len(opened_grids) < 6: + opened_grids.append(u'{}=>{}'.format(x.openPrice, x.closePrice)) + if x.openPrice < min_long_price: + min_long_price = x.openPrice + + if len(remove_grids) > 0: + self.writeCtaLog(u'清除下网格[{}]'.format(remove_grids)) + if len(opened_grids) > 0: + self.writeCtaLog(u'保留下网格[{}]'.format(opened_grids)) + + # 需要重建的剩余网格数量 + remainLots = len(self.dnGrids) + lots = self.maxLots - remainLots + remainLots = 0 # WJ: correction for the rebuild price + + dnline = min(dnline, min_long_price-self.gridHeight*dnRate) + self.writeCtaLog(u'需要重建的网格数量:{0},起点:{1}'.format(lots, dnline)) + + if lots > 0: + for i in range(0, lots, 1): + # 做空,开仓价为下阻力线-网格高度*i,平仓价为开仓价+止盈高度,开仓数量为缺省 + open_price = int((dnline - gridSteps[i+remainLots] * dnRate) / self.minDiff ) * self.minDiff + close_price = int((open_price + self.gridWin * dnRate)/self.minDiff) * self.minDiff + + grid = CtaGrid(direction=DIRECTION_SHORT, + openprice=open_price, + closeprice=close_price, + volume=self.volume*self.getVolumeRate(remainLots + i)) + grid.reuse = reuse + self.dnGrids.append(grid) + + self.writeCtaLog(u'重新拉下网格:[{0}~{1}]'.format(dnline, dnline - gridSteps[-1] * dnRate)) + self.writeCtaLog(u'DEBUG- rebuildGrids Short, lots:{},upline:{},dnline:{} [{}~{}]'.format(lots, upline, dnline, dnline, dnline - gridSteps[-1] * dnRate)) + + # 重建上网格(向上移动开仓的网格) + if direction == DIRECTION_LONG: + max_short_price = midline # 最高开空价 + remove_grids = [] # 移除的网格列表 + opened_grids = [] # 已开仓的网格列表 temp_dnGrids = {} + temp_upGrids = [] + if self.fixedGrids is True: + # 如果价格没变,不需要重新布网格 + if upline == self.upGrids[0].openPrice: + self.writeCtaLog(u'DEBUG- rebuildGrids Long, upline not changed, no need to rebuild.') + result = False + return result + + # 重建的网格数量(所有网格) + remainLots = 0 + lots = self.maxLots - remainLots + self.writeCtaLog(u'需要重建的网格数量:{0},起点:{1}'.format(lots, upline)) + if lots > 0: + # 做多,开仓价为上阻力线+网格高度*i,平仓价为开仓价-止盈高度,开仓数量为缺省 + for i in range(0, lots, 1): + open_price = int((upline + gridSteps[i+remainLots] * upRate) / self.minDiff) * self.minDiff + close_price = int((open_price - self.gridWin * upRate) / self.minDiff) * self.minDiff + + grid = CtaGrid(direction=DIRECTION_LONG, + openprice=open_price, + closeprice=close_price, + volume=self.volume*self.getVolumeRate(remainLots + i)) + grid.reuse = reuse + temp_upGrids.append(grid) + + self.writeCtaLog(u'重新拉上网格:[{0}~{1}]'.format(upline, upline + gridSteps[-1] * upRate)) + + # 移除旧的上网格,保留开仓的网格状态 + for m in range(0, len(self.upGrids)): + x = self.upGrids[m] + if not x.orderStatus and not x.openStatus and not x.closeStatus: + if len(remove_grids) < 6: + remove_grids.append(u'{}=>{}'.format(x.openPrice, x.closePrice)) + else: + if len(opened_grids) < 6: + opened_grids.append(u'{}=>{}'.format(x.openPrice, x.closePrice)) + temp_upGrids[m].orderStatus = x.orderStatus + temp_upGrids[m].orderStatus = x.orderStatus + temp_upGrids[m].volume = x.volume + temp_upGrids[m].tradedVolume = x.tradedVolume + temp_upGrids[m].orderStatus = x.orderStatus + temp_upGrids[m].orderRef = x.orderRef + temp_upGrids[m].openStatus = x.openStatus + temp_upGrids[m].closeStatus = x.closeStatus + temp_upGrids[m].openDatetime = x.openDatetime + temp_upGrids[m].orderDatetime = x.orderDatetime + temp_upGrids[m].lockGrids = x.lockGrids + temp_upGrids[m].reuse = x.reuse + temp_upGrids[m].type = x.type + temp_upGrids[m].openPrices = x.openPrices + temp_upGrids[m].snapshot = x.snapshot + if x.closePrice > 0: + temp_upGrids[m].closePrice = x.closePrice + uplineDelta + else: + temp_upGrids[m].closePrice = 0 + + if len(remove_grids) > 0: + self.writeCtaLog(u'清除上网格[{}]'.format(remove_grids)) + if len(opened_grids) > 0: + self.writeCtaLog(u'保留上网格[{}]'.format(opened_grids)) + + for x in self.upGrids[:]: + self.upGrids.remove(x) + # self.upGrids.clear() + self.upGrids = temp_upGrids + + self.writeCtaLog(u'DEBUG- rebuildGrids Long, lots:{},upline:{},dnline:{} [{}~{}]'.format(lots, upline, dnline, upline, upline + gridSteps[-1] * upRate)) + + + else: + # 移除未挂单的上网格 + for x in self.upGrids[:]: + if not x.orderStatus and not x.openStatus and not x.closeStatus: + if len(remove_grids) < 6: + remove_grids.append(u'{}=>{}'.format(x.openPrice, x.closePrice)) + self.upGrids.remove(x) + else: + if len(opened_grids) < 6: + opened_grids.append(u'{}=>{}'.format(x.openPrice, x.closePrice)) + + if x.openPrice > max_short_price: + max_short_price = x.openPrice + + if len(remove_grids) > 0: + self.writeCtaLog(u'清除上网格[{}]'.format(remove_grids)) + if len(opened_grids) > 0: + self.writeCtaLog(u'保留上网格[{}]'.format(opened_grids)) + + # 需要重建的剩余网格数量 + remainLots = len(self.upGrids) + lots = self.maxLots - remainLots + remainLots = 0 # WJ: correction for the rebuild price + + upline = max(upline, max_short_price+self.gridHeight*upRate) + self.writeCtaLog(u'需要重建的网格数量:{0},起点:{1}'.format(lots, upline)) + + if lots > 0: + # 做多,开仓价为上阻力线+网格高度*i,平仓价为开仓价-止盈高度,开仓数量为缺省 + for i in range(0, lots, 1): + open_price = int((upline + gridSteps[i+remainLots] * upRate) / self.minDiff) * self.minDiff + close_price = int((open_price - self.gridWin * upRate) / self.minDiff) * self.minDiff + + grid = CtaGrid(direction=DIRECTION_LONG, + openprice=open_price, + closeprice=close_price, + volume=self.volume*self.getVolumeRate(remainLots + i)) + grid.reuse = reuse + self.upGrids.append(grid) + + self.writeCtaLog(u'重新拉上网格:[{0}~{1}]'.format(upline, upline + gridSteps[-1] * upRate)) + self.writeCtaLog(u'DEBUG- rebuildGrids Long, lots:{},upline:{},dnline:{} [{}~{}]'.format(lots, upline, dnline, upline, upline + gridSteps[-1] * upRate)) + return result + + def recount_avg_open_price(self): + """计算网格的平均开仓价""" + up_open_list = [x for x in self.upGrids if x.openStatus] + + self.max_up_open_price = 0 - sys.maxsize + self.avg_up_open_price = 0 - sys.maxsize + self.min_dn_open_price = sys.maxsize + self.avg_dn_open_price = sys.maxsize + + total_price = EMPTY_FLOAT + total_volume = EMPTY_INT + for x in up_open_list: + self.max_up_open_price = max(self.max_up_open_price, x.openPrice) + total_price += x.openPrice*x.volume + total_volume += x.volume + + if total_volume > 0: + self.avg_up_open_price = total_price/total_volume + + total_price = EMPTY_FLOAT + total_volume = EMPTY_INT + + dn_open_list = [x for x in self.dnGrids if x.openStatus] + for x in dn_open_list: + self.min_dn_open_price = min(self.min_dn_open_price,x.openPrice) + total_price += x.openPrice*x.volume + total_volume += x.volume + + if total_volume > 0: + self.avg_dn_open_price = total_price/total_volume + + def count_avg_open_price(self, grid_list): + """计算平均开仓价""" + total_price = EMPTY_FLOAT + total_volume = EMPTY_INT + avg_price = EMPTY_FLOAT + + for g in grid_list: + total_price += g.openPrice * g.volume + total_volume += g.volume + + if total_volume > EMPTY_INT: + avg_price = total_price / total_volume + return avg_price + + def combineOpenedGrids(self,direction,type=EMPTY_STRING): + """合并已开仓的网格""" + total_open_price = EMPTY_FLOAT + total_close_price = EMPTY_FLOAT + total_volume = EMPTY_INT + saved_grid = None + + if direction == DIRECTION_SHORT: + opened_short_grids = self.getGrids(direction=direction, opened=True, ordered=False, type = type) + + if len(opened_short_grids)<=1: + return + self.writeCtaLog(u'{}个空网格合并为1个'.format(len(opened_short_grids))) + saved_grid = opened_short_grids[-1] + + for g in opened_short_grids: + total_open_price += g.openPrice * g.volume + total_close_price += g.closePrice * g.volume + total_volume += g.volume + if g != saved_grid: + self.writeCtaLog(u'删除空网格 {}=>{},v:{}'.format(g.openPrice, g.closePrice, g.volume)) + self.upGrids.remove(g) + else: + self.writeCtaLog(u'保留空网格 {}=>{},v:{}'.format(g.openPrice, g.closePrice, g.volume)) + + # 更新网格的开仓价和仓位数量 + saved_grid.openPrice = int((total_open_price / total_volume)/self.minDiff)*self.minDiff + saved_grid.volume = total_volume + saved_grid.closePrice = int((total_close_price / total_volume)/self.minDiff)*self.minDiff + + self.writeCtaLog(u'合并后空网格为{}=>{},v:{}'.format(saved_grid.openPrice, saved_grid.closePrice, saved_grid.volume)) + + elif direction == DIRECTION_LONG: + opened_long_grids = self.getGrids(direction=direction, opened=True, ordered=False, type=type) + + if len(opened_long_grids) <= 1: + return + self.writeCtaLog(u'{}个多网格合并为1个'.format(len(opened_long_grids))) + saved_grid = opened_long_grids[-1] + + for g in opened_long_grids: + total_open_price += g.openPrice * g.volume + total_close_price += g.closePrice * g.volume + total_volume += g.volume + if g != saved_grid: + self.writeCtaLog(u'删除多网格 {}=>{},v:{}'.format(g.openPrice, g.closePrice, g.volume)) + self.dnGrids.remove(g) + else: + self.writeCtaLog(u'保留多网格 {}=>{},v:{}'.format(g.openPrice, g.closePrice, g.volume)) + + # 更新网格的开仓价和仓位数量 + saved_grid.openPrice = int((total_open_price / total_volume) / self.minDiff) * self.minDiff + saved_grid.volume = total_volume + saved_grid.closePrice = int((total_close_price / total_volume) / self.minDiff) * self.minDiff + + self.writeCtaLog( + u'合并后多网格为{}=>{},v:{}'.format(saved_grid.openPrice, saved_grid.closePrice, saved_grid.volume)) + + def clearDuplicateGrids(self,direction=EMPTY_STRING,type=EMPTY_STRING): + """去除重复开仓价的未开仓网格""" + + if direction == DIRECTION_SHORT or direction==EMPTY_STRING: + if len(self.upGrids) < 2: + return + checking_grids = self.getGrids(direction=DIRECTION_SHORT, opened=False,ordered=False,type=type) + + if len(checking_grids) < 2: + return + + open_price_list = [] + remove_grids = [] + + for g in checking_grids: + if g.openPrice in open_price_list: + remove_grids.append(g) + continue + + open_price_list.append(g.openPrice) + + for rg in remove_grids: + try: + self.upGrids.remove(rg) + except: + pass + + if direction == DIRECTION_LONG or direction==EMPTY_STRING: + if len(self.dnGrids) < 2: + return + checking_grids = self.getGrids(direction=DIRECTION_LONG, opened=False, ordered=False, type=type) + + if len(checking_grids) < 2: + return + + open_price_list = [] + remove_grids = [] + for g in checking_grids: + if g.openPrice in open_price_list: + remove_grids.append(g) + continue + + open_price_list.append(g.openPrice) + + for rg in remove_grids: + try: + self.dnGrids.remove(rg) + except: + pass + + def save(self, direction=None): + """ + 保存网格至本地Json文件" + 2017/11/23 update: 保存时,空的列表也保存 + :param direction: + :return: + """"" + + # 更新开仓均价 + self.recount_avg_open_price() + grids_save_path = self.get_data_folder() + + # 确保json名字与策略一致 + if self.jsonName != self.strategy.name: + self.writeCtaLog(u'JsonName {} 与 上层策略名{} 不一致.'.format(self.jsonName, self.strategy.name)) + self.jsonName = self.strategy.name + + # 移除旧版上/下网格列表 + old_up_json_file = os.path.join(grids_save_path, u'{0}_upGrids.json'.format(self.jsonName)) + old_dn_json_file = os.path.join(grids_save_path, u'{0}_dnGrids.json'.format(self.jsonName)) + if os.path.exists(old_up_json_file): + try: + os.remove(old_up_json_file) + except: + pass + + if os.path.exists(old_dn_json_file): + try: + os.remove(old_dn_json_file) + except: + pass + + # 新版网格持久化文件 + grid_json_file = os.path.join(grids_save_path, u'{}_Grids.json'.format(self.jsonName)) + self.json_file_path = grid_json_file + + data = {} + up_grids = [] + for grid in self.upGrids: + up_grids.append(grid.toJson()) + dn_grids = [] + for grid in self.dnGrids: + dn_grids.append(grid.toJson()) + data[u'up_grids'] = up_grids + data[u'dn_grids'] = dn_grids + + with open(grid_json_file, 'w') as f: + json_data = json.dumps(data, indent=4) + f.write(json_data) + + self.writeCtaLog(u'GrideTrade保存文件{}完成'.format(grid_json_file)) + def load(self, direction, openStatusFilter=[]): """ 加载本地Json至网格 @@ -1164,4 +2556,264 @@ class CtaGridTrade(object): return type_list +ARBITRAGE_LONG = u'正套' +ARBITRAGE_SHORT = u'反套' +class ArbitrageGrid(object): + """套利网格""" + def __init__(self,direction, openprice, closeprice, stopprice=EMPTY_FLOAT, type=EMPTY_STRING): + self.leg1 = None + self.leg2 = None + + self.id = str(uuid.uuid1()) + self.direction = direction # 正套(ARBITRAGE_LONG) 反套(ARBITRAGE_SHORT) + self.openPrice = openprice # 开仓价格/价比 + self.closePrice = closeprice # 平仓价格/价比 + self.stopPrice = stopprice # 止损价格/价比 + self.type = type # 套利类型(自定义) + self.snapshot = {} + + def update_leg1(self,grid): + """ + 添加腿1 + :param grid: + :return: + """ + if isinstance(grid, CtaGrid): + self.leg1 = grid + else: + print(u'leg1 不是CtaGrid类型') + + def update_leg2(self, grid): + """ + 添加腿2 + :param grid: + :return: + """ + if isinstance(grid, CtaGrid): + self.leg2 = grid + else: + print(u'leg2 不是CtaGrid类型') + + def toJson(self): + j = OrderedDict() + + j['id'] = self.id + j['direction'] = self.direction + j['openPrices'] = self.openPrice + j['closePrice'] = self.closePrice + j['stopPrice'] = self.stopPrice + j['type'] = self.type + j['snapshot'] = self.snapshot # 切片数据 + + try: + if self.leg1 is not None: + j['leg1'] = self.leg1.toJson() + + if self.leg2 is not None: + j['leg2'] = self.leg2.toJson() + except Exception as ex: + print(u'Arbitrage Grid toJson exception:{} {}'.format(str(ex), traceback.format_exc()),file=sys.stderr) + + return j + + def fromJson(self,j): + if 'id' in j: + self.id = j.get('id') + self.direction = j.get('direction',EMPTY_STRING) + self.openPrice = j.get('openPrice',EMPTY_FLOAT) + self.closePrice = j.get('closePrice',EMPTY_FLOAT) + self.stopPrice = j.get('stopPrice',EMPTY_FLOAT) + self.type = j.get('type',EMPTY_STRING) + self.snapshot = j.get('snapshot',{}) + + if 'leg1' in j: + if self.leg1 is None: + self.leg1 = CtaGrid(direction=EMPTY_STRING,openprice=EMPTY_FLOAT,closeprice=EMPTY_FLOAT) + self.leg1.fromJson(j.get('leg1')) + + if 'leg2' in j: + if self.leg2 is None: + self.leg2 = CtaGrid(direction=EMPTY_STRING,openprice=EMPTY_FLOAT,closeprice=EMPTY_FLOAT) + self.leg2.fromJson(j.get('leg2')) + + +class ArbitrageTrade(object): + """ + 套利交易网格,仅用于持久化记录价差/价比/跨市场/期现套利等 + 它包含正套网格/反套网格两个队列 + """ + + def __init__(self, strategy, leg1_settings, leg2_settings): + """ + 构造函数 + :param strategy: 上层调用策略 + """ + self.strategy = strategy + # 交易合约 + self.leg1_symbol = leg1_settings.get('vtSymbol', EMPTY_STRING) + self.leg2_symbol = leg2_settings.get('vtSymbol', EMPTY_STRING) + + # 交易合约的杠杆比率 + self.leg1_size = leg1_settings.get('size', 1) + self.leg2_size = leg2_settings.get('size', 1) + + # 正套队列 + self.long_list = [] + + # 反套队列 + self.short_list = [] + + def writeCtaLog(self, log): + """ + 写入日志 + :param log: + :return: + """ + if self.strategy and hasattr(self.strategy,'writeCtaLog'): + self.strategy.writeCtaLog(log) + else: + print(log) + + def writeCtaError(self, log): + """ + 写入错误日志 + :param log: + :return: + """ + if self.strategy and hasattr(self.strategy, 'writeCtaError'): + self.strategy.writeCtaError(log) + else: + print(log,file=sys.stderr) + + def toJson(self): + """ + => json object + :return: + """ + j = OrderedDict() + + j['leg1_symbol'] = self.leg1_symbol + j['leg1_size'] = self.leg1_size + j['long_list'] = [g.toJson() for g in self.long_list] + + j['leg2_symbol'] = self.leg2_symbol + j['leg2_size'] = self.leg2_size + j['short_list'] = [g.toJson() for g in self.short_list] + return j + + def fromJson(self,j): + """ + 从Json格式恢复数据 + :param j: + :return: + """ + self.writeCtaLog(u'数据将从Json恢复') + self.leg1_symbol = j.get('leg1_symbol',EMPTY_STRING) + self.leg2_symbol = j.get('leg2_symbol',EMPTY_STRING) + + self.leg1_size = j.get('leg1_size',1) + self.leg2_size = j.get('leg2_size',1) + + self.long_list = [] + for long_json in j.get('long_list',[]): + g = ArbitrageGrid(direction=ARBITRAGE_LONG,openprice=long_json.get('openPrice',EMPTY_FLOAT),closeprice=long_json.get('closePrice',EMPTY_FLOAT)) + g.fromJson(long_json) + self.long_list.append(g) + + self.short_list = [] + for short_json in j.get('short_list', []): + g = ArbitrageGrid(direction=ARBITRAGE_SHORT, openprice=short_json.get('openPrice', EMPTY_FLOAT), + closeprice=short_json.get('closePrice', EMPTY_FLOAT)) + g.fromJson(short_json) + self.short_list.append(g) + + self.writeCtaLog(u'数据恢复完毕') + + def get_data_folder(self): + """获取数据目录""" + # 工作目录 + currentFolder = os.path.abspath(os.path.join(os.getcwd(), u'data')) + if os.path.isdir(currentFolder): + # 如果工作目录下,存在data子目录,就使用data子目录 + return currentFolder + else: + # 否则,使用缺省保存目录 vnpy/trader/app/ctaStrategy/data + currentFolder = os.path.abspath(os.path.join(os.path.dirname(__file__), u'data')) + if os.path.exists(currentFolder): + if os.path.isdir(currentFolder): + return currentFolder + else: + return os.path.dirname(__file__) + else: + os.mkdir(currentFolder) + return currentFolder + + def save(self,db=EMPTY_STRING): + """ + 持久化到json文件 + :return: + """ + if not self.strategy: + self.writeCtaError(u'策略对象为空,不能保存') + return + + json_file = os.path.abspath( + os.path.join(self.get_data_folder(), u'{}_AGrids.json'.format(self.strategy.name))) + + try: + json_data = self.toJson() + + with open(json_file, 'w') as f: + data = json.dumps(json_data, indent=4) + f.write(data) + + except IOError as ex: + self.writeCtaError(u'写入AGrids文件{}出错,ex:{}'.format(json_file, str(ex))) + + def load(self,db=EMPTY_STRING): + """ + 数据从Json文件加载 + :return: + """ + if not self.strategy: + self.writeCtaError(u'策略对象为空,不能加载') + return + + json_file = os.path.abspath(os.path.join(self.get_data_folder(), u'{}_AGrids.json'.format(self.strategy.name))) + + json_data = {} + if os.path.exists(json_file): + try: + with open(json_file, 'r', encoding='utf8') as f: + # 解析json文件 + json_data = json.load(f) + except IOError as ex: + self.writeCtaError(u'读取AGrids文件{}出错,ex:{}'.format(json_file, str(ex))) + json_data = {} + + # 从持久化文件恢复数据 + self.fromJson(json_data) + + def addGrid(self,grid): + """ + 添加正套/反套网格 + :param grid: + :return: + """ + if not isinstance(grid,ArbitrageGrid): + self.writeCtaError(u'添加网格不是套利网格ArbitrageGrid类型') + return + + if grid.direction == ARBITRAGE_LONG: + if grid.id in [g.id for g in self.long_list]: + self.writeCtaError('添加{}网格 id{}已存在,不能添加'.format(ARBITRAGE_LONG, grid.id)) + return + self.long_list.append(grid) + return + + if grid.direction == ARBITRAGE_SHORT: + if grid.id in [g.id for g in self.short_list]: + self.writeCtaError(u'添加{}网格 id{}已存在,不能添加'.format(ARBITRAGE_SHORT, grid.id)) + return + self.short_list.append(grid) diff --git a/vnpy/trader/app/ctaStrategy/ctaLineBar.py b/vnpy/trader/app/ctaStrategy/ctaLineBar.py index 4918c6a5..e17e222c 100644 --- a/vnpy/trader/app/ctaStrategy/ctaLineBar.py +++ b/vnpy/trader/app/ctaStrategy/ctaLineBar.py @@ -26,7 +26,7 @@ PERIOD_MINUTE = 'minute' # 分钟级别周期 PERIOD_HOUR = 'hour' # 小时级别周期 PERIOD_DAY = 'day' # 日级别周期 -def getCtaBarByType(bar_type): +def getCtaBarClass(bar_type): assert isinstance(bar_type,str) if bar_type == PERIOD_SECOND: return CtaLineBar @@ -39,6 +39,7 @@ def getCtaBarByType(bar_type): raise Exception('no matched CTA bar type:{}'.format(bar_type)) + class CtaLineBar(object): """CTA K线""" """ 使用方法: @@ -125,6 +126,8 @@ class CtaLineBar(object): self.lastTick = None self.curTradingDay = EMPTY_STRING + self.cur_price = EMPTY_FLOAT + # K线保存数据 self.bar = None # K线数据对象 self.lineBar = [] # K线缓存数据队列 @@ -472,16 +475,18 @@ class CtaLineBar(object): self.writeCtaLog(u'竞价排名tick时间:{0}'.format(tick.datetime)) return - self.curTick = tick + self.curTick = copy.copy(tick) + if self.curTick.lastPrice is None or self.curTick.lastPrice == 0: + self.curTick.lastPrice = (self.curTick.askPrice1 + self.curTick.bidPrice1) / 2 + self.cur_price = (self.curTick.askPrice1 + self.curTick.bidPrice1) / 2 + else: + self.cur_price = self.curTick.lastPrice # 3.生成x K线,若形成新Bar,则触发OnBar事件 self.drawLineBar(tick) # 更新curPeriod的High,low if self.curPeriod is not None: - if self.curTick.lastPrice is None: - self.curTick.lastPrice = (self.curTick.askPrice1 + self.curTick.bidPrice1) / 2 - self.curPeriod.onPrice(self.curTick.lastPrice) # 4.执行 bar内计算 @@ -498,6 +503,9 @@ class CtaLineBar(object): """ l1 = len(self.lineBar) + # 更新最后价格 + self.cur_price = bar.close + if l1 == 0: new_bar = copy.copy(bar) self.lineBar.append(new_bar) @@ -2696,13 +2704,13 @@ class CtaLineBar(object): self.skd_last_cross = (self.lineSK[-1] + self.lineSK[-2] + self.lineSD[-1] + self.lineSD[-2])/4 self.skd_rt_count = self.skd_count self.skd_rt_last_cross = self.skd_last_cross - if self.skd_rt_cross_price == 0 or self.lineBar[-1].close < self.skd_rt_cross_price: - self.skd_rt_cross_price = self.lineBar[-1].close - self.skd_cross_price = min(self.lineBar[-1].close ,self.skd_rt_cross_price) + if self.skd_rt_cross_price == 0 or self.cur_price < self.skd_rt_cross_price: + self.skd_rt_cross_price = self.cur_price + self.skd_cross_price = self.cur_price if self.skd_divergence < 0: # 若原来是顶背离,消失 self.skd_divergence = 0 - + self.writeCtaLog(u'{} Gold Cross:{} at {}'.format(self.name, self.skd_last_cross, self.skd_cross_price)) else: #if self.lineSK[-1] < self.lineSK[-2]: # 延续死叉 self.skd_count -= 1 @@ -2722,14 +2730,16 @@ class CtaLineBar(object): self.skd_last_cross = (self.lineSK[-1] + self.lineSK[-2] + self.lineSD[-1] + self.lineSD[-2]) / 4 self.skd_rt_count = self.skd_count self.skd_rt_last_cross = self.skd_last_cross - if self.skd_rt_cross_price == 0 or self.lineBar[-1].close > self.skd_rt_cross_price: - self.skd_rt_cross_price = self.lineBar[-1].close - self.skd_cross_price = max(self.lineBar[-1].close, self.skd_rt_cross_price) + if self.skd_rt_cross_price == 0 or self.cur_price > self.skd_rt_cross_price: + self.skd_rt_cross_price = self.cur_price + self.skd_cross_price = self.cur_price # 若原来是底背离,消失 if self.skd_divergence > 0: self.skd_divergence = 0 + self.writeCtaLog(u'{} Dead Cross:{} at {}'.format(self.name, self.skd_last_cross, self.skd_cross_price)) + else: #if self.lineSK[-1] > self.lineSK[-2]: # 延续金叉 self.skd_count += 1 @@ -2909,7 +2919,9 @@ class CtaLineBar(object): if self.skd_rt_count >= 0: self.skd_rt_count = -1 self.skd_rt_last_cross = skd_last_cross - self.skd_rt_cross_price = self.lineBar[-1].close + self.skd_rt_cross_price = self.cur_price + self.writeCtaLog(u'{} rt Dead Cross at:{} ,price:{}' + .format(self.name, self.skd_rt_last_cross, self.skd_rt_cross_price)) if skd_last_cross > high_skd: return True @@ -2942,8 +2954,9 @@ class CtaLineBar(object): if self.skd_rt_count <=0: self.skd_rt_count = 1 self.skd_rt_last_cross = skd_last_cross - self.skd_rt_cross_price = self.lineBar[-1].close - + self.skd_rt_cross_price = self.cur_price + self.writeCtaLog(u'{} rt Gold Cross at:{} ,price:{}' + .format(self.name, self.skd_rt_last_cross,self.skd_rt_cross_price)) if skd_last_cross < low_skd: return True @@ -3116,6 +3129,82 @@ class CtaLineBar(object): return None return None + def is_shadow_line(self,open, high, low, close, direction, shadow_rate, wave_rate): + """ + 是否上影线/下影线 + :param open: 开仓价 + :param high: 最高价 + :param low: 最低价 + :param close: 收盘价 + :param direction: 方向(多/空) + :param shadown_rate: 上影线比例(百分比) + :param wave_rate:振幅(百分比) + :return: + """ + if close<=0 or high <= low or shadow_rate <=0 or wave_rate <=0: + self.writeCtaLog(u'是否上下影线,参数出错.close={}, high={},low={},shadow_rate={},wave_rate={}' + .format(close, high, low, shadow_rate, wave_rate)) + return False + + # 振幅 = 高-低 / 收盘价 百分比 + cur_wave_rate = round(100 * float((high - low) / close), 2) + + # 上涨时,判断上影线 + if direction == DIRECTION_LONG: + # 上影线比例 = 上影线(高- max(开盘,收盘))/ 当日振幅=(高-低) + cur_shadow_rate = round(100 * float((high - max(open,close)) / (high-low)),2) + if cur_wave_rate >= wave_rate and cur_shadow_rate >= shadow_rate: + return True + + # 下跌时,判断下影线 + elif direction == DIRECTION_SHORT: + cur_shadow_rate = round(100 * float((min(open,close)-low) / (high-low)),2) + if cur_wave_rate >= wave_rate and cur_shadow_rate >= shadow_rate: + return True + + return False + + def is_end_tick(self, tick_dt): + """ + 根据短合约和时间,判断是否为最后一个tick + :param tick_dt: + :return: + """ + if self.is_7x24: + return False + + # 中金所,只有11:30 和15:15,才有最后一个tick + if self.shortSymbol in MARKET_ZJ: + if (tick_dt.hour == 11 and tick_dt.minute == 30) or (tick_dt.hour == 15 and tick_dt.minute == 15): + return True + else: + return False + + # 其他合约(上期所/郑商所/大连) + if 2 <= tick_dt.hour < 23: + if (tick_dt.hour == 10 and tick_dt.minute == 15) \ + or (tick_dt.hour == 11 and tick_dt.minute == 30) \ + or (tick_dt.hour == 15 and tick_dt.minute == 00) \ + or (tick_dt.hour == 2 and tick_dt.minute == 30): + return True + else: + return False + + # 夜盘1:30收盘 + if self.shortSymbol in NIGHT_MARKET_SQ2 and tick_dt.hour == 1 and tick_dt.minute == 00: + return True + + # 夜盘23:00收盘 + if self.shortSymbol in NIGHT_MARKET_SQ3 and tick_dt.hour == 23 and tick_dt.minute == 00: + return True + + # 夜盘23:30收盘 + if self.shortSymbol in NIGHT_MARKET_ZZ or self.shortSymbol in NIGHT_MARKET_DL: + if tick_dt.hour == 23 and tick_dt.minute == 30: + return True + + return False + class CtaMinuteBar(CtaLineBar): """ 分钟级别K线 @@ -3209,6 +3298,9 @@ class CtaMinuteBar(CtaLineBar): if bar.tradingDay is None: bar.tradingDay = self.getTradingDate(bar.datetime) + # 更新最后价格 + self.cur_price = bar.close + l1 = len(self.lineBar) if l1 == 0: @@ -3489,6 +3581,9 @@ class CtaHourBar(CtaLineBar): if bar.tradingDay is None: bar.tradingDay = self.getTradingDate(bar.datetime) + # 更新最后价格 + self.cur_price = bar.close + l1 = len(self.lineBar) if l1 == 0: @@ -3567,7 +3662,6 @@ class CtaHourBar(CtaLineBar): endtick = False if not self.is_7x24: # 处理日内的间隔时段最后一个tick,如10:15分,11:30分,15:00 和 2:30分 - if (tick.datetime.hour == 10 and tick.datetime.minute == 15) \ or (tick.datetime.hour == 11 and tick.datetime.minute == 30) \ or (tick.datetime.hour == 15 and tick.datetime.minute == 00) \ @@ -3590,10 +3684,22 @@ class CtaHourBar(CtaLineBar): lastBar = self.lineBar[-1] is_new_bar = False + if self.last_minute is None: + if tick.datetime.second == 0: + self.m1_bars_count += 1 + self.last_minute = tick.datetime.minute + # 不在同一交易日,推入新bar if self.curTradingDay != tick.tradingDay: is_new_bar = True + # 去除分钟和秒数 + tick.datetime = datetime.strptime(tick.datetime.strftime('%Y-%m-%d %H:00:00'), '%Y-%m-%d %H:%M:%S') + tick.time = tick.datetime.strftime('%H:%M:%S') self.curTradingDay = tick.tradingDay + self.writeCtaLog('{} drawLineBar() new_bar,{} curTradingDay:{},tick.tradingDay:{}' + .format(self.name, tick.datetime.strftime("%Y-%m-%d %H:%M:%S"), self.curTradingDay, + tick.tradingDay)) + else: # 同一交易日,看分钟是否一致 if tick.datetime.minute != self.last_minute and not endtick: @@ -3602,14 +3708,27 @@ class CtaHourBar(CtaLineBar): if self.is_7x24: if (tick.datetime - lastBar.datetime).total_seconds() >= 3600 * self.barTimeInterval: + self.writeCtaLog('{} drawLineBar() new_bar,{} - {} > 3600 * {} ' + .format(self.name, tick.datetime.strftime("%Y-%m-%d %H:%M:%S"), lastBar.datetime.strftime("%Y-%m-%d %H:%M:%S"), + self.barTimeInterval)) is_new_bar = True - if len(tick.tradingDay) >0: + # 去除分钟和秒数 + tick.datetime = datetime.strptime(tick.datetime.strftime('%Y-%m-%d %H:00:00'), '%Y-%m-%d %H:%M:%S') + tick.time = tick.datetime.strftime('%H:%M:%S') + if len(tick.tradingDay) > 0: self.curTradingDay = tick.tradingDay else: self.curTradingDay = tick.date if self.m1_bars_count > 60 * self.barTimeInterval: + self.writeCtaLog('{} drawLineBar() new_bar,{} {} > 60 * {} ' + .format(self.name, tick.datetime.strftime("%Y-%m-%d %H:%M:%S"), + self.m1_bars_count, + self.barTimeInterval)) is_new_bar = True + # 去除秒数 + tick.datetime = datetime.strptime(tick.datetime.strftime('%Y-%m-%d %H:%M:00'), '%Y-%m-%d %H:%M:%S') + tick.time = tick.datetime.strftime('%H:%M:%S') if is_new_bar: # 创建并推入新的Bar @@ -3741,6 +3860,9 @@ class CtaDayBar(CtaLineBar): :param bar_freq, bar对象得frequency :return: """ + # 更新最后价格 + self.cur_price = bar.close + l1 = len(self.lineBar) if l1 == 0: diff --git a/vnpy/trader/gateway/binanceGateway/binanceGateway.py b/vnpy/trader/gateway/binanceGateway/binanceGateway.py index 346e219c..75f94f5d 100644 --- a/vnpy/trader/gateway/binanceGateway/binanceGateway.py +++ b/vnpy/trader/gateway/binanceGateway/binanceGateway.py @@ -80,8 +80,6 @@ class BinanceGateway(VtGateway): self.writeLog(u'BINANCE连接配置缺少字段,请检查') return - - self.api_spot.active = True if self.log_message: @@ -96,13 +94,15 @@ class BinanceGateway(VtGateway): sub.symbol = symbol_pair self.subscribe(sub) - self.writeLog(u'{}接口初始化成功'.format(self.gatewayName)) # 启动查询 self.initQuery() self.startQuery() + def checkStatus(self): + return self.connected and self.api_spot.checkStatus() + # ---------------------------------------------------------------------- def subscribe(self, subscribeReq): """订阅行情,自动订阅全部行情,无需实现""" @@ -116,7 +116,7 @@ class BinanceGateway(VtGateway): except Exception as ex: self.writeError(u'send order Exception:{},{}'.format(str(ex),traceback.format_exc())) - #---------------------------------------------------------------------- + # ---------------------------------------------------------------------- def cancelOrder(self, cancelOrderReq): """撤单""" return self.api_spot.cancel(cancelOrderReq) @@ -418,6 +418,23 @@ print ex.status_code, ex.message , ex.code , ex.request , ex.uri , ex.kwargs self.gateway.onContract(contract) + def checkStatus(self): + """ + 检查接口状态: + 若active为False,return False + 若active为True,检查ticker得更新时间 + :return: + """ + if not self.active: + return False + + if len(self.tickDict.items())>0: + symbol,last_tick = list(self.tickDict.items())[0] + if last_tick.datetime and (datetime.now()-last_tick.datetime).total_seconds() > 60: + return False + + return True + #---------------------------------------------------------------------- def onAllTicker(self,msg): """币安支持所有 ticker 同时socket过来""" @@ -459,6 +476,8 @@ print ex.status_code, ex.message , ex.code , ex.request , ex.uri , ex.kwargs tick.askPrice5, tick.askVolume5 = [0 , 0] tick.datetime , tick.date , tick.time = self.generateDateTime( float(msg["E"])) + utc_dt = datetime.utcfromtimestamp( float(msg["E"])/1e3) + tick.tradingDay = utc_dt.strftime('%Y-%m-%d') self.gateway.onTick(tick) #---------------------------------------------------------------------- @@ -630,7 +649,8 @@ print ex.status_code, ex.message , ex.code , ex.request , ex.uri , ex.kwargs self.gateway.writeLog(u'OnDepth {}'.format(traceback.format_exc())) tick.datetime , tick.date, tick.time = self.generateDateTime(uu_time_stamp) - + utc_dt = datetime.utcfromtimestamp(float(uu_time_stamp)/ 1e3) + tick.tradingDay = utc_dt.strftime('%Y-%m-%d') # print tick.__dict__ self.gateway.onTick(tick) #self.gateway.onTick(copy(tick)) @@ -773,6 +793,10 @@ print ex.status_code, ex.message , ex.code , ex.request , ex.uri , ex.kwargs ''' #---------------------------------------------------------------------- def onGetAccount(self, data, req, reqID): + + if self.gateway and not self.gateway.connected: + self.gateway.connected = True + balances = data["balances"] account = VtAccountData() account.gatewayName = self.gatewayName @@ -1114,5 +1138,5 @@ BREAK """生成时间""" dt = datetime.fromtimestamp(float(s)/1e3) time = dt.strftime("%H:%M:%S.%f") - date = dt.strftime("%Y%m%d") + date = dt.strftime("%Y-%m-%d") return dt , date, time diff --git a/vnpy/trader/gateway/okexGateway/okexGateway.py b/vnpy/trader/gateway/okexGateway/okexGateway.py index 120e1faf..c158e383 100644 --- a/vnpy/trader/gateway/okexGateway/okexGateway.py +++ b/vnpy/trader/gateway/okexGateway/okexGateway.py @@ -920,7 +920,7 @@ class OkexSpotApi(WsSpotApi): tick.date, tick.time,tick.datetime = self.generateDateTime(data['timestamp']) # print "Depth", tick.date, tick.time - + tick.tradingDay = tick.date # 推送tick事件 newtick = copy(tick) self.gateway.onTick(newtick)