diff --git a/vn.trader/dataRecorder/drEngine.py b/vn.trader/dataRecorder/drEngine.py index 644dc0da..0512e6f2 100644 --- a/vn.trader/dataRecorder/drEngine.py +++ b/vn.trader/dataRecorder/drEngine.py @@ -11,6 +11,8 @@ import os import copy from collections import OrderedDict from datetime import datetime, timedelta +from Queue import Queue +from threading import Thread from eventEngine import * from vtGateway import VtSubscribeReq, VtLogData @@ -43,6 +45,11 @@ class DrEngine(object): # K线对象字典 self.barDict = {} + # 负责执行数据库插入的单独线程相关 + self.active = False # 工作状态 + self.queue = Queue() # 队列 + self.thread = Thread(target=self.run) # 线程 + # 载入设置,订阅行情 self.loadSetting() @@ -111,9 +118,12 @@ class DrEngine(object): # 注意这里的vtSymbol对于IB和LTS接口,应该后缀.交易所 for activeSymbol, vtSymbol in d.items(): self.activeSymbolDict[vtSymbol] = activeSymbol - + + # 启动数据插入线程 + self.start() + # 注册事件监听 - self.registerEvent() + self.registerEvent() #---------------------------------------------------------------------- def procecssTickEvent(self, event): @@ -187,7 +197,29 @@ class DrEngine(object): #---------------------------------------------------------------------- def insertData(self, dbName, collectionName, data): """插入数据到数据库(这里的data可以是CtaTickData或者CtaBarData)""" - self.mainEngine.dbInsert(dbName, collectionName, data.__dict__) + self.queue.put((dbName, collectionName, data.__dict__)) + + #---------------------------------------------------------------------- + def run(self): + """运行插入线程""" + while self.active: + try: + dbName, collectionName, d = self.queue.get(block=True, timeout=1) + self.mainEngine.dbInsert(dbName, collectionName, d) + except Empty: + pass + #---------------------------------------------------------------------- + def start(self): + """启动""" + self.active = True + self.thread.start() + + #---------------------------------------------------------------------- + def stop(self): + """退出""" + if self.active: + self.active = False + self.thread.join() #---------------------------------------------------------------------- def writeDrLog(self, content): diff --git a/vn.trader/eventType.py b/vn.trader/eventType.py index 54e75188..a635dedf 100644 --- a/vn.trader/eventType.py +++ b/vn.trader/eventType.py @@ -19,9 +19,9 @@ EVENT_LOG = 'eLog' # 日志事件,全局通用 EVENT_TICK = 'eTick.' # TICK行情事件,可后接具体的vtSymbol EVENT_TRADE = 'eTrade.' # 成交回报事件 EVENT_ORDER = 'eOrder.' # 报单回报事件 -EVENT_ERRRTNORDERINSERT = 'eErrRtnOrderInsert' # 报单录入错误回报事件 EVENT_POSITION = 'ePosition.' # 持仓回报事件 EVENT_ACCOUNT = 'eAccount.' # 账户回报事件 +EVENT_ACCOUNT_LOSS = 'eAccountLoss' # 账户亏损事件 EVENT_CONTRACT = 'eContract.' # 合约基础信息回报事件 EVENT_ERROR = 'eError.' # 错误回报事件 diff --git a/vn.trader/riskManager/RM_setting.json b/vn.trader/riskManager/RM_setting.json index 18f6eb03..c9d7360d 100644 --- a/vn.trader/riskManager/RM_setting.json +++ b/vn.trader/riskManager/RM_setting.json @@ -1,9 +1,10 @@ { "orderFlowClear": 10, - "percentLimit": 90, + "percentLimit": 80, "workingOrderLimit": 200, - "tradeLimit": 1000, + "tradeLimit": 20000, "orderSizeLimit": 100, "active": true, - "orderFlowLimit": 1000 + "lossPercentLimit": 11, + "orderFlowLimit": 20000 } \ No newline at end of file diff --git a/vn.trader/riskManager/uiRmWidget.py b/vn.trader/riskManager/uiRmWidget.py index fd3139c4..1c4c6074 100644 --- a/vn.trader/riskManager/uiRmWidget.py +++ b/vn.trader/riskManager/uiRmWidget.py @@ -22,9 +22,6 @@ class RmSpinBox(QtGui.QSpinBox): self.setMaximum(1000000) self.setValue(value) - - - ######################################################################## class RmLine(QtGui.QFrame): @@ -36,8 +33,6 @@ class RmLine(QtGui.QFrame): super(RmLine, self).__init__() self.setFrameShape(self.HLine) self.setFrameShadow(self.Sunken) - - ######################################################################## @@ -72,6 +67,9 @@ class RmEngineManager(QtGui.QWidget): # 最大开仓比例 self.spinPercentLimit = RmSpinBox(self.rmEngine.percentLimit) + # 最大净值止损比例,满足后强制止损 + self.spinLossPercentLimit = RmSpinBox(self.rmEngine.lossPercentLimit) + buttonClearOrderFlowCount = QtGui.QPushButton(u'清空流控计数') buttonClearTradeCount = QtGui.QPushButton(u'清空总成交计数') buttonSaveSetting = QtGui.QPushButton(u'保存设置') @@ -98,6 +96,8 @@ class RmEngineManager(QtGui.QWidget): grid.addWidget(RmLine(), 10, 0, 1, 2) grid.addWidget(Label(u'仓位上限(1~100)'), 11, 0) grid.addWidget(self.spinPercentLimit, 11, 1) + grid.addWidget(Label(u'强制止损比例'), 12, 0) + grid.addWidget(self.spinLossPercentLimit, 12, 1) hbox = QtGui.QHBoxLayout() hbox.addWidget(buttonClearOrderFlowCount) @@ -117,6 +117,7 @@ class RmEngineManager(QtGui.QWidget): self.spinTradeLimit.valueChanged.connect(self.rmEngine.setTradeLimit) self.spinWorkingOrderLimit.valueChanged.connect(self.rmEngine.setWorkingOrderLimit) self.spinPercentLimit.valueChanged.connect(self.rmEngine.setAccountPercentLimit) + self.spinLossPercentLimit.valueChanged.connect(self.rmEngine.setLossPercentLimit) self.buttonSwitchEngineStatus.clicked.connect(self.switchEngineSatus) buttonClearOrderFlowCount.clicked.connect(self.rmEngine.clearOrderFlowCount) diff --git a/vn.trader/uiBasicWidget.py b/vn.trader/uiBasicWidget.py index 650713d9..49977dc1 100644 --- a/vn.trader/uiBasicWidget.py +++ b/vn.trader/uiBasicWidget.py @@ -609,8 +609,6 @@ class PositionMonitor(BasicMonitor): self.initTable() self.registerEvent() - - ######################################################################## class AccountMonitor(BasicMonitor): @@ -639,7 +637,6 @@ class AccountMonitor(BasicMonitor): self.initTable() self.registerEvent() - ######################################################################## class TradingWidget(QtGui.QFrame): """简单交易组件""" @@ -745,7 +742,6 @@ class TradingWidget(QtGui.QFrame): self.comboPriceType.addItems(self.priceTypeList) self.comboExchange = QtGui.QComboBox() - self.comboExchange.addItems(self.exchangeList) self.comboCurrency = QtGui.QComboBox() diff --git a/vn.trader/uiMainWindow.py b/vn.trader/uiMainWindow.py index 6048df47..b87182ba 100644 --- a/vn.trader/uiMainWindow.py +++ b/vn.trader/uiMainWindow.py @@ -22,9 +22,11 @@ class MainWindow(QtGui.QMainWindow): self.eventEngine = eventEngine self.widgetDict = {} # 用来保存子窗口的字典 - + + self.connectGatewayDict = {} + self.initUi() - self.loadWindowSettings() + #self.loadWindowSettings() self.connected = False self.autoDisConnect = False @@ -32,11 +34,13 @@ class MainWindow(QtGui.QMainWindow): self.orderSaveDate = EMPTY_STRING self.barSaveDate = EMPTY_STRING - self.connectGatewayDict = {} + # ---------------------------------------------------------------------- def initUi(self): """初始化界面""" - self.setWindowTitle('VnTrader') + path = os.getcwd().rsplit('\\')[-1] + + self.setWindowTitle(path) self.initCentral() self.initMenu() self.initStatusBar() @@ -195,10 +199,16 @@ class MainWindow(QtGui.QMainWindow): self.sbCount = 0 self.sbTrigger = 10 # 10秒刷新一次 self.eventEngine.register(EVENT_TIMER, self.updateStatusBar) + # ---------------------------------------------------------------------- def updateStatusBar(self, event): - """在状态栏更新CPU和内存信息""" + """1、在状态栏更新CPU和内存信息""" + # 2、定时断开服务器连接 + # 3、定时重连服务器 + # 4、定时保存每日的委托单 + # 5、定时执行策略的保存事件 + self.sbCount += 1 # 更新任务栏 @@ -211,68 +221,69 @@ class MainWindow(QtGui.QMainWindow): self.statusLabel.setText(info) - if len(self.connectGatewayDict) > 0: - s = u''.join(str(e) for e in self.connectGatewayDict.values()) - - if not self.connected: - s = s + u' [已断开]' - - - self.setWindowTitle(s) - - # 定时断开 - if self.connected and self.trade_off() and self.autoDisConnect: - self.disconnect() - self.mainEngine.writeLog(u'断开连接{0}'.format(self.connectGatewayDict.values())) - self.mainEngine.writeLog(u'清空数据引擎') - self.mainEngine.clearData() - self.mainEngine.writeLog(u'清空委托列表') - self.widgetOrderM.clearData() - self.mainEngine.writeLog(u'清空交易列表') - self.widgetTradeM.clearData() - - # 定时重连 - if not self.connected \ - and self.autoDisConnect \ - and not self.trade_off()\ - and len(self.connectGatewayDict) > 0: - - self.mainEngine.writeLog(u'清空数据引擎') - self.mainEngine.clearData() - self.mainEngine.writeLog(u'清空委托列表') - self.widgetOrderM.clearData() - self.mainEngine.writeLog(u'清空交易列表') - self.widgetTradeM.clearData() + if self.connectGatewayDict: s = u''.join(str(e) for e in self.connectGatewayDict.values()) - self.mainEngine.writeLog(u'重新连接{0}'.format(s)) - for key in self.connectGatewayDict.keys(): - self.mainEngine.connect(key) + if not self.connected: + s = s + u' [已断开]' - self.connected = True - # 交易日收盘后保存所有委托记录, - dt = datetime.now() - today = datetime.now().strftime('%y%m%d') - if dt.hour == 15 and dt.minute == 1 and len(self.connectGatewayDict) > 0 and today!=self.orderSaveDate: - self.orderSaveDate = today - self.mainEngine.writeLog(u'保存所有委托记录') - orderfile = os.getcwd() +'/orders/{0}.csv'.format(self.orderSaveDate) - if os.path.exists(orderfile): - return - else: - self.widgetOrderM.saveToCsv(path=orderfile) + self.setWindowTitle(s) - # 调用各策略保存数据 - if ((dt.hour == 15 and dt.minute == 1) or (dt.hour == 2 and dt.minute == 31)) \ - and len(self.connectGatewayDict) > 0 \ - and today != self.barSaveDate: - self.barSaveDate = today - self.mainEngine.writeLog(u'调用各策略保存数据') - self.mainEngine.saveData() + # 定时断开 + if self.connected and self.trade_off() and self.autoDisConnect: + self.disconnect() + self.mainEngine.writeLog(u'断开连接{0}'.format(self.connectGatewayDict.values())) + self.mainEngine.writeLog(u'清空数据引擎') + self.mainEngine.clearData() + self.mainEngine.writeLog(u'清空委托列表') + self.widgetOrderM.clearData() + self.mainEngine.writeLog(u'清空交易列表') + self.widgetTradeM.clearData() - if not (dt.hour == 15 or dt.hour == 2): - self.barSaveDate = EMPTY_STRING + # 定时重连 + if not self.connected \ + and self.autoDisConnect \ + and not self.trade_off()\ + and len(self.connectGatewayDict) > 0: + + self.mainEngine.writeLog(u'清空数据引擎') + self.mainEngine.clearData() + self.mainEngine.writeLog(u'清空委托列表') + self.widgetOrderM.clearData() + self.mainEngine.writeLog(u'清空交易列表') + self.widgetTradeM.clearData() + s = u''.join(str(e) for e in self.connectGatewayDict.values()) + self.mainEngine.writeLog(u'重新连接{0}'.format(s)) + + for key in self.connectGatewayDict.keys(): + self.mainEngine.connect(key) + + self.connected = True + + # 交易日收盘后保存所有委托记录, + dt = datetime.now() + today = datetime.now().strftime('%y%m%d') + if dt.hour == 15 and dt.minute == 1 and len(self.connectGatewayDict) > 0 and today!=self.orderSaveDate: + self.orderSaveDate = today + self.mainEngine.writeLog(u'保存所有委托记录') + orderfile = os.getcwd() +'/orders/{0}.csv'.format(self.orderSaveDate) + if os.path.exists(orderfile): + return + else: + self.widgetOrderM.saveToCsv(path=orderfile) + + # 调用各策略保存数据 + if ((dt.hour == 15 and dt.minute == 1) or (dt.hour == 2 and dt.minute == 31)) \ + and len(self.connectGatewayDict) > 0 \ + and today != self.barSaveDate \ + and self.connected: + self.barSaveDate = today + self.mainEngine.writeLog(u'调用各策略保存数据') + self.mainEngine.saveData() + + if not (dt.hour == 15 or dt.hour == 2): + self.barSaveDate = EMPTY_STRING # ---------------------------------------------------------------------- def getCpuMemory(self): @@ -491,7 +502,6 @@ class MainWindow(QtGui.QMainWindow): pass - def disconnect(self): """"断开底层gateway的连接""" self.mainEngine.disconnect() diff --git a/vn.trader/vtConstant.py b/vn.trader/vtConstant.py index 38e4c4c9..46f02069 100644 --- a/vn.trader/vtConstant.py +++ b/vn.trader/vtConstant.py @@ -78,6 +78,7 @@ EXCHANGE_GLOBEX = 'GLOBEX' # CME电子交易平台 EXCHANGE_IDEALPRO = 'IDEALPRO' # IB外汇ECN EXCHANGE_OANDA = 'OANDA' # OANDA外汇做市商 +EXCHANGE_OKCOIN = 'OKCOIN' # OKCOIN比特币交易所 # 货币类型 CURRENCY_USD = 'USD' # 美元 diff --git a/vn.trader/vtEngine.py b/vn.trader/vtEngine.py index bf416fe6..38e8937d 100644 --- a/vn.trader/vtEngine.py +++ b/vn.trader/vtEngine.py @@ -28,7 +28,7 @@ class MainEngine(object): self.eventEngine.start() # 创建数据引擎 - self.dataEngine = DataEngine(self.eventEngine) + self.dataEngine = DataEngine(self, self.eventEngine) # MongoDB数据库相关 self.dbClient = None # MongoDB客户端对象 @@ -61,7 +61,7 @@ class MainEngine(object): self.addGateway(CtpGateway, 'CTP_EBF') self.gatewayDict['CTP_EBF'].setQryEnabled(True) - except Exception, e: + except Exception as e: print e """ @@ -125,8 +125,16 @@ class MainEngine(object): self.gatewayDict['OANDA'].setQryEnabled(True) except Exception, e: print e + + try: + from okcoinGateway.okcoinGateway import OkcoinGateway + self.addGateway(OkcoinGateway, 'OKCOIN') + self.gatewayDict['OKCOIN'].setQryEnabled(True) + except Exception, e: + print e """ - # ---------------------------------------------------------------------- + + #---------------------------------------------------------------------- def addGateway(self, gateway, gatewayName=None): """创建接口""" self.gatewayDict[gatewayName] = gateway(self.eventEngine, gatewayName) @@ -208,6 +216,9 @@ class MainEngine(object): # 停止事件引擎 self.eventEngine.stop() + # 停止数据记录引擎 + self.drEngine.stop() + # 保存数据引擎里的合约数据到硬盘 self.dataEngine.saveContracts() @@ -288,6 +299,7 @@ class MainEngine(object): def clearData(self): """清空数据引擎的数据""" self.dataEngine.clearData() + self.ctaEngine.clearData() def saveData(self): self.ctaEngine.saveStrategyData() @@ -298,8 +310,9 @@ class DataEngine(object): contractFileName = 'ContractData.vt' # ---------------------------------------------------------------------- - def __init__(self, eventEngine): + def __init__(self, mainEngine, eventEngine): """Constructor""" + self.mainEngine = mainEngine self.eventEngine = eventEngine # 保存合约详细信息的字典 @@ -316,6 +329,9 @@ class DataEngine(object): # 注册事件监听 self.registerEvent() + + # 已订阅合约代码 + self.subscribedSymbols = set() # ---------------------------------------------------------------------- def updateContract(self, event): @@ -386,10 +402,45 @@ class DataEngine(object): """注册事件监听""" self.eventEngine.register(EVENT_CONTRACT, self.updateContract) self.eventEngine.register(EVENT_ORDER, self.updateOrder) + self.eventEngine.register(EVENT_POSITION, self.updatePosition) def clearData(self): """清空数据""" self.orderDict = {} self.workingOrderDict = {} - + self.subscribedSymbols.clear() + + def updatePosition(self,event): + """更新持仓信息""" + # 在获取更新持仓信息时,自动订阅这个symbol + # 目的:1、 + + position = event.dict_['data'] + + symbol = position.symbol + + # 已存在,不做更新 + if symbol in self.subscribedSymbols: + return + + self.subscribedSymbols.add(symbol) + + gatewayName = position.gatewayName + contract = self.mainEngine.getContract(symbol) + + if not contract: + self.mainEngine.writeLog(u'找不到合约{0}信息'.format(symbol)) + return + + # 订阅合约 + req = VtSubscribeReq() + req.symbol = symbol + req.exchange = contract.exchange + req.currency = '' + req.productClass = '' + + self.mainEngine.subscribe(req, gatewayName) + + self.mainEngine.writeLog(u'自动订阅合约{0}'.format(symbol)) +