diff --git a/vn.oanda/test.py b/vn.oanda/test.py index 4031a0aa..e86559bb 100644 --- a/vn.oanda/test.py +++ b/vn.oanda/test.py @@ -1,6 +1,6 @@ # encoding: utf-8 -from api import OandaApi +from vnoanda import OandaApi if __name__ == '__main__': diff --git a/vn.oanda/api.py b/vn.oanda/vnoanda.py similarity index 99% rename from vn.oanda/api.py rename to vn.oanda/vnoanda.py index 9746c9ef..a53eafe9 100644 --- a/vn.oanda/api.py +++ b/vn.oanda/vnoanda.py @@ -230,9 +230,9 @@ class OandaApi(object): print callback.__name__ callback(data, reqID) except Exception, e: - self.onError(error) + self.onError(str(e), reqID) else: - self.onError(error) + self.onError(error, reqID) except Empty: pass diff --git a/vn.trader/ContractData.vt b/vn.trader/ContractData.vt index 0d5cad63..9d9298ea 100644 Binary files a/vn.trader/ContractData.vt and b/vn.trader/ContractData.vt differ diff --git a/vn.trader/oandaGateway/OANDA_connect.json b/vn.trader/oandaGateway/OANDA_connect.json new file mode 100644 index 00000000..21dd2cd9 --- /dev/null +++ b/vn.trader/oandaGateway/OANDA_connect.json @@ -0,0 +1,5 @@ +{ + "token": "请在OANDA网站申请", + "accountId": "请在OANDA网站申请", + "settingName": "practice" +} \ No newline at end of file diff --git a/vn.trader/oandaGateway/__init__.py b/vn.trader/oandaGateway/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/vn.trader/oandaGateway/oandaGateway.py b/vn.trader/oandaGateway/oandaGateway.py new file mode 100644 index 00000000..6ac3ce11 --- /dev/null +++ b/vn.trader/oandaGateway/oandaGateway.py @@ -0,0 +1,456 @@ +# encoding: UTF-8 + +''' +vn.oanda的gateway接入 + +由于OANDA采用的是外汇做市商的交易模式,因此和国内接口方面有若干区别,具体如下: + +* 行情数据反映的是OANDA的报价变化,因此只有买卖价,而没有成交价 + +* OANDA的持仓管理分为单笔成交持仓(Trade数据,国内没有) + 和单一资产汇总持仓(Position数据) + +* OANDA系统中的所有时间都采用UTC时间(世界协调时,中国是UTC+8) + +* 由于采用的是外汇做市商的模式,用户的限价委托当价格被触及时就会立即全部成交, + 不会出现部分成交的情况,因此委托状态只有已报、成交、撤销三种 + +* 外汇市场采用24小时交易,因此OANDA的委托不像国内收盘后自动失效,需要用户指定 + 失效时间,本接口中默认设置为24个小时候失效 +''' + + +import os +import json +import datetime + +from vnoanda import OandaApi +from vtGateway import * + +# 价格类型映射 +priceTypeMap = {} +priceTypeMap[PRICETYPE_LIMITPRICE] = 'limit' +priceTypeMap[PRICETYPE_MARKETPRICE] = 'market' +priceTypeMapReverse = {v: k for k, v in priceTypeMap.items()} + +# 方向类型映射 +directionMap = {} +directionMap[DIRECTION_LONG] = 'buy' +directionMap[DIRECTION_SHORT] = 'sell' +directionMapReverse = {v: k for k, v in directionMap.items()} + + +######################################################################## +class OandaGateway(VtGateway): + """OANDA接口""" + + #---------------------------------------------------------------------- + def __init__(self, eventEngine, gatewayName='OANDA'): + """Constructor""" + super(OandaGateway, self).__init__(eventEngine, gatewayName) + + self.api = Api(self) + + self.qryEnabled = False # 是否要启动循环查询 + + #---------------------------------------------------------------------- + def connect(self): + """连接""" + # 载入json文件 + fileName = self.gatewayName + '_connect.json' + fileName = os.getcwd() + '\\oandaGateway\\' + fileName + + try: + f = file(fileName) + except IOError: + log = VtLogData() + log.gatewayName = self.gatewayName + log.logContent = u'读取连接配置出错,请检查' + self.onLog(log) + return + + # 解析json文件 + setting = json.load(f) + try: + token = str(setting['token']) + accountId = str(setting['accountId']) + settingName = str(setting['settingName']) + except KeyError: + log = VtLogData() + log.gatewayName = self.gatewayName + log.logContent = u'连接配置缺少字段,请检查' + self.onLog(log) + return + + # 初始化接口 + self.api.init(settingName, token, accountId) + log = VtLogData() + log.gatewayName = self.gatewayName + log.logContent = u'接口初始化成功' + self.onLog(log) + + # 查询信息 + self.api.qryInstruments() + self.api.qryOrders() + self.api.qryTrades() + + # 初始化并启动查询 + self.initQuery() + + #---------------------------------------------------------------------- + def subscribe(self, subscribeReq): + """订阅行情""" + pass + + #---------------------------------------------------------------------- + def sendOrder(self, orderReq): + """发单""" + return self.api.sendOrder_(orderReq) + + #---------------------------------------------------------------------- + def cancelOrder(self, cancelOrderReq): + """撤单""" + self.api.cancelOrder_(cancelOrderReq) + + #---------------------------------------------------------------------- + def qryAccount(self): + """查询账户资金""" + self.api.getAccountInfo() + + #---------------------------------------------------------------------- + def qryPosition(self): + """查询持仓""" + self.api.getPositions() + + #---------------------------------------------------------------------- + def close(self): + """关闭""" + self.api.exit() + + #---------------------------------------------------------------------- + def initQuery(self): + """初始化连续查询""" + if self.qryEnabled: + # 需要循环的查询函数列表 + self.qryFunctionList = [self.qryAccount, self.qryPosition] + + self.qryCount = 0 # 查询触发倒计时 + self.qryTrigger = 2 # 查询触发点 + self.qryNextFunction = 0 # 上次运行的查询函数索引 + + self.startQuery() + + #---------------------------------------------------------------------- + def query(self, event): + """注册到事件处理引擎上的查询函数""" + self.qryCount += 1 + + if self.qryCount > self.qryTrigger: + # 清空倒计时 + self.qryCount = 0 + + # 执行查询函数 + function = self.qryFunctionList[self.qryNextFunction] + function() + + # 计算下次查询函数的索引,如果超过了列表长度,则重新设为0 + self.qryNextFunction += 1 + if self.qryNextFunction == len(self.qryFunctionList): + self.qryNextFunction = 0 + + #---------------------------------------------------------------------- + def startQuery(self): + """启动连续查询""" + self.eventEngine.register(EVENT_TIMER, self.query) + + #---------------------------------------------------------------------- + def setQryEnabled(self, qryEnabled): + """设置是否要启动循环查询""" + self.qryEnabled = qryEnabled + + + +######################################################################## +class Api(OandaApi): + """OANDA的API实现""" + + #---------------------------------------------------------------------- + def __init__(self, gateway): + """Constructor""" + super(Api, self).__init__() + + self.gateway = gateway # gateway对象 + self.gatewayName = gateway.gatewayName # gateway对象名称 + + self.orderDict = {} # 缓存委托数据 + + #---------------------------------------------------------------------- + def onError(self, error, reqID): + """错误信息回调""" + err = VtErrorData() + err.gatewayName = self.gatewayName + err.errorMsg = error + self.gateway.onError(err) + + #---------------------------------------------------------------------- + def onGetInstruments(self, data, reqID): + """回调函数""" + if not 'instruments' in data: + return + l = data['instruments'] + for d in l: + contract = VtContractData() + contract.gatewayName = self.gatewayName + + contract.symbol = d['instrument'] + contract.name = d['displayName'] + contract.exchange = EXCHANGE_OANDA + contract.vtSymbol = '.'.join([contract.symbol, contract.exchange]) + contract.priceTick = float(d['pip']) + contract.size = 1 + contract.productClass = PRODUCT_FOREX + self.gateway.onContract(contract) + + self.writeLog(u'交易合约信息查询完成') + + #---------------------------------------------------------------------- + def onGetAccountInfo(self, data, reqID): + """回调函数""" + account = VtAccountData() + account.gatewayName = self.gatewayName + + account.accountID = str(data['accountId']) + account.vtAccountID = '.'.join([self.gatewayName, account.accountID]) + + account.available = data['marginAvail'] + account.margin = data['marginUsed'] + account.closeProfit = data['realizedPl'] + account.positionProfit = data['unrealizedPl'] + account.balance = data['balance'] + + self.gateway.onAccount(account) + + #---------------------------------------------------------------------- + def onGetOrders(self, data, reqID): + """回调函数""" + if not 'orders' in data: + return + l = data['orders'] + + for d in l: + order = VtOrderData() + order.gatewayName = self.gatewayName + + order.symbol = d['instrument'] + order.exchange = EXCHANGE_OANDA + order.vtSymbol = '.'.join([order.symbol, order.exchange]) + order.orderID = str(d['id']) + + order.direction = directionMapReverse.get(d['side'], DIRECTION_UNKNOWN) + order.offset = OFFSET_NONE + order.status = STATUS_NOTTRADED # OANDA查询到的订单都是活动委托 + + order.price = d['price'] + order.totalVolume = d['units'] + order.orderTime = getTime(d['time']) + + order.vtOrderID = '.'.join([self.gatewayName , order.orderID]) + + self.gateway.onOrder(order) + + self.orderDict[order.orderID] = order + + self.writeLog(u'委托信息查询完成') + + #---------------------------------------------------------------------- + def onGetPositions(self, data, reqID): + """回调函数""" + if not 'positions' in data: + return + l = data['positions'] + + for d in l: + pos = VtPositionData() + pos.gatewayName = self.gatewayName + + pos.symbol = d['instrument'] + pos.exchange = EXCHANGE_OANDA + pos.vtSymbol = '.'.join([pos.symbol, pos.exchange]) + pos.direction = directionMapReverse.get(d['side'], DIRECTION_UNKNOWN) + pos.position = d['units'] + pos.price = d['avgPrice'] + pos.vtPositionName = '.'.join([pos.vtSymbol, pos.direction]) + + self.gateway.onPosition(pos) + + #---------------------------------------------------------------------- + def onGetTransactions(self, data, reqID): + """回调函数""" + if not 'transactions' in data: + return + l = data['transactions'] + + for d in l: + # 这里我们只关心委托成交 + if d['type'] == 'ORDER_FILLED': + trade = VtTradeData() + trade.gatewayName = self.gatewayName + + trade.symbol = d['instrument'] + trade.exchange = EXCHANGE_OANDA + trade.vtSymbol = '.'.join([trade.symbol, trade.exchange]) + trade.tradeID = str(d['id']) + trade.vtTradeID = '.'.join([self.gatewayName, trade.tradeID]) + trade.orderID = str(d['orderId']) + trade.vtOrderID = '.'.join([self.gatewayName, trade.orderID]) + trade.direction = directionMapReverse.get(d['side'], DIRECTION_UNKNOWN) + trade.offset = OFFSET_NONE + trade.price = d['price'] + trade.volume = d['units'] + trade.tradeTime = getTime(d['time']) + + self.gateway.onTrade(trade) + + self.writeLog(u'成交信息查询完成') + + #---------------------------------------------------------------------- + def onPrice(self, data): + """行情推送""" + if 'tick' not in data: + return + d = data['tick'] + + tick = VtTickData() + tick.gatewayName = self.gatewayName + + tick.symbol = d['instrument'] + tick.exchange = EXCHANGE_OANDA + tick.vtSymbol = '.'.join([tick.symbol, tick.exchange]) + tick.bidPrice1 = d['bid'] + tick.askPrice1 = d['ask'] + tick.time = getTime(d['time']) + + # 做市商的TICK数据只有买卖的报价,因此最新价格选用中间价代替 + tick.lastPrice = (tick.bidPrice1 + tick.askPrice1)/2 + + self.gateway.onTick(tick) + + #---------------------------------------------------------------------- + def onEvent(self, data): + """事件推送(成交等)""" + if 'transaction' not in data: + return + + d = data['transaction'] + + # 委托成交 + if d['type'] == 'ORDER_FILLED': + # 推送成交事件 + trade = VtTradeData() + trade.gatewayName = self.gatewayName + + trade.symbol = d['instrument'] + trade.exchange = EXCHANGE_OANDA + trade.vtSymbol = '.'.join([trade.symbol, trade.exchange]) + + trade.tradeID = str(d['id']) + trade.vtTradeID = '.'.join([self.gatewayName, trade.tradeID]) + + trade.orderID = str(d['orderId']) + trade.vtOrderID = '.'.join([self.gatewayName, trade.orderID]) + + trade.direction = directionMapReverse.get(d['side'], DIRECTION_UNKNOWN) + trade.offset = OFFSET_NONE + + trade.price = d['price'] + trade.volume = d['units'] + trade.tradeTime = getTime(d['time']) + + self.gateway.onTrade(trade) + + # 推送委托事件 + order = self.orderDict.get(str(d['orderId']), None) + if not order: + return + order.status = STATUS_ALLTRADED + self.gateway.onOrder(order) + + # 委托下达 + elif d['type'] in ['MARKET_ORDER_CREATE', 'LIMIT_ORDER_CREATE']: + order = VtOrderData() + order.gatewayName = self.gatewayName + + order.symbol = d['instrument'] + order.exchange = EXCHANGE_OANDA + order.vtSymbol = '.'.join([order.symbol, order.exchange]) + order.orderID = str(d['id']) + order.direction = directionMapReverse.get(d['side'], DIRECTION_UNKNOWN) + order.offset = OFFSET_NONE + order.status = STATUS_NOTTRADED + order.price = d['price'] + order.totalVolume = d['units'] + order.orderTime = getTime(d['time']) + order.vtOrderID = '.'.join([self.gatewayName , order.orderID]) + + self.gateway.onOrder(order) + self.orderDict[order.orderID] = order + + # 委托撤销 + elif d['type'] == 'ORDER_CANCEL': + order = self.orderDict.get(str(d['orderId']), None) + if not order: + return + order.status = STATUS_CANCELLED + self.gateway.onOrder(order) + + #---------------------------------------------------------------------- + def writeLog(self, logContent): + """发出日志""" + log = VtLogData() + log.gatewayName = self.gatewayName + log.logContent = logContent + self.gateway.onLog(log) + + #---------------------------------------------------------------------- + def qryInstruments(self): + """查询合约""" + params = {'accountId': self.accountId} + self.getInstruments(params) + + #---------------------------------------------------------------------- + def qryOrders(self): + """查询委托""" + self.getOrders({}) + + #---------------------------------------------------------------------- + def qryTrades(self): + """查询成交""" + # 最多查询100条记录 + self.getTransactions({'count': 100}) + + #---------------------------------------------------------------------- + def sendOrder_(self, orderReq): + """发送委托""" + params = {} + params['instrument'] = orderReq.symbol + params['units'] = orderReq.volume + params['side'] = directionMap.get(orderReq.direction, '') + params['price'] = orderReq.price + params['type'] = priceTypeMap.get(orderReq.priceType, '') + + # 委托有效期24小时 + expire = datetime.datetime.now() + datetime.timedelta(days=1) + params['expiry'] = expire.isoformat('T') + 'Z' + + self.sendOrder(params) + + #---------------------------------------------------------------------- + def cancelOrder_(self, cancelOrderReq): + """撤销委托""" + self.cancelOrder(cancelOrderReq.orderID) + + +#---------------------------------------------------------------------- +def getTime(t): + """把OANDA返回的时间格式转化为简单的时间字符串""" + return t[11:19] \ No newline at end of file diff --git a/vn.trader/oandaGateway/vnoanda.py b/vn.trader/oandaGateway/vnoanda.py new file mode 100644 index 00000000..570d3f73 --- /dev/null +++ b/vn.trader/oandaGateway/vnoanda.py @@ -0,0 +1,608 @@ +# encoding: utf-8 + +import json +import requests +from Queue import Queue, Empty +from threading import Thread + + +API_SETTING = {} +API_SETTING['practice'] = {'rest': 'https://api-fxpractice.oanda.com', + 'stream': 'https://stream-fxpractice.oanda.com'} +API_SETTING['trade'] = {'rest': 'https://api-fxpractice.oanda.com', + 'stream': 'https://stream-fxtrade.oanda.com/'} + + +FUNCTIONCODE_GETINSTRUMENTS = 0 +FUNCTIONCODE_GETPRICES = 1 +FUNCTIONCODE_GETPRICEHISTORY = 2 +FUNCTIONCODE_GETACCOUNTS = 3 +FUNCTIONCODE_GETACCOUNTINFO = 4 +FUNCTIONCODE_GETORDERS = 5 +FUNCTIONCODE_SENDORDER = 6 +FUNCTIONCODE_GETORDERINFO = 7 +FUNCTIONCODE_MODIFYORDER = 8 +FUNCTIONCODE_CANCELORDER = 9 +FUNCTIONCODE_GETTRADES = 10 +FUNCTIONCODE_GETTRADEINFO = 11 +FUNCTIONCODE_MODIFYTRADE= 12 +FUNCTIONCODE_CLOSETRADE = 13 +FUNCTIONCODE_GETPOSITIONS = 14 +FUNCTIONCODE_GETPOSITIONINFO= 15 +FUNCTIONCODE_CLOSEPOSITION = 16 +FUNCTIONCODE_GETTRANSACTIONS = 17 +FUNCTIONCODE_GETTRANSACTIONINFO = 18 +FUNCTIONCODE_GETACCOUNTHISTORY = 19 +FUNCTIONCODE_GETCALENDAR = 20 +FUNCTIONCODE_GETPOSITIONRATIOS = 21 +FUNCTIONCODE_GETSPREADS = 22 +FUNCTIONCODE_GETCOMMIMENTS = 23 +FUNCTIONCODE_GETORDERBOOK = 24 +FUNCTIONCODE_GETAUTOCHARTIST = 25 +FUNCTIONCODE_STREAMPRICES = 26 +FUNCTIONCODE_STREAMEVENTS = 27 + + +######################################################################## +class OandaApi(object): + """""" + DEBUG = False + + #---------------------------------------------------------------------- + def __init__(self): + """Constructor""" + self.token = '' + self.accountId = '' + self.headers = {} + self.restDomain = '' + self.streamDomain = '' + self.session = None + + self.functionSetting = {} + + self.active = False # API的工作状态 + + self.reqID = 0 # 请求编号 + self.reqQueue = Queue() # 请求队列 + self.reqThread = Thread(target=self.processQueue) # 请求处理线程 + + self.streamPricesThread = Thread(target=self.processStreamPrices) # 实时行情线程 + self.streamEventsThread = Thread(target=self.processStreamEvents) # 实时事件线程(成交等) + + #---------------------------------------------------------------------- + def init(self, settingName, token, accountId): + """初始化接口""" + self.restDomain = API_SETTING[settingName]['rest'] + self.streamDomain = API_SETTING[settingName]['stream'] + self.session = requests.Session() + + self.token = token + self.accountId = accountId + + self.headers['Authorization'] = 'Bearer ' + self.token + + self.initFunctionSetting(FUNCTIONCODE_GETINSTRUMENTS, {'path': '/v1/instruments', + 'method': 'GET'}) + + self.initFunctionSetting(FUNCTIONCODE_GETPRICES, {'path': '/v1/prices', + 'method': 'GET'}) + + self.initFunctionSetting(FUNCTIONCODE_GETPRICEHISTORY, {'path': 'v1/candles', + 'method': 'GET'}) + + self.initFunctionSetting(FUNCTIONCODE_GETACCOUNTS, {'path': '/v1/accounts', + 'method': 'GET'}) + + self.initFunctionSetting(FUNCTIONCODE_GETACCOUNTINFO, {'path': '/v1/accounts/%s' %self.accountId, + 'method': 'GET'}) + + self.initFunctionSetting(FUNCTIONCODE_GETORDERS, {'path': '/v1/accounts/%s/orders' %self.accountId, + 'method': 'GET'}) + + self.initFunctionSetting(FUNCTIONCODE_SENDORDER, {'path': '/v1/accounts/%s/orders' %self.accountId, + 'method': 'POST'}) + + self.initFunctionSetting(FUNCTIONCODE_GETORDERINFO, {'path': '/v1/accounts/%s/orders' %self.accountId, + 'method': 'GET'}) + + self.initFunctionSetting(FUNCTIONCODE_MODIFYORDER, {'path': '/v1/accounts/%s/orders' %self.accountId, + 'method': 'PATCH'}) + + self.initFunctionSetting(FUNCTIONCODE_CANCELORDER, {'path': '/v1/accounts/%s/orders' %self.accountId, + 'method': 'DELETE'}) + + self.initFunctionSetting(FUNCTIONCODE_GETTRADES, {'path': '/v1/accounts/%s/trades' %self.accountId, + 'method': 'GET'}) + + self.initFunctionSetting(FUNCTIONCODE_GETTRADEINFO, {'path': '/v1/accounts/%s/trades' %self.accountId, + 'method': 'GET'}) + + self.initFunctionSetting(FUNCTIONCODE_MODIFYTRADE, {'path': '/v1/accounts/%s/trades' %self.accountId, + 'method': 'PATCH'}) + + self.initFunctionSetting(FUNCTIONCODE_CLOSETRADE, {'path': '/v1/accounts/%s/trades' %self.accountId, + 'method': 'DELETE'}) + + self.initFunctionSetting(FUNCTIONCODE_GETPOSITIONS, {'path': '/v1/accounts/%s/positions' %self.accountId, + 'method': 'GET'}) + + self.initFunctionSetting(FUNCTIONCODE_GETPOSITIONINFO, {'path': '/v1/accounts/%s/positions' %self.accountId, + 'method': 'GET'}) + + self.initFunctionSetting(FUNCTIONCODE_CLOSEPOSITION, {'path': '/v1/accounts/%s/positions' %self.accountId, + 'method': 'DELETE'}) + + self.initFunctionSetting(FUNCTIONCODE_GETTRANSACTIONS, {'path': '/v1/accounts/%s/transactions' %self.accountId, + 'method': 'GET'}) + + self.initFunctionSetting(FUNCTIONCODE_GETTRANSACTIONINFO, {'path': '/v1/accounts/%s/transactions' %self.accountId, + 'method': 'GET'}) + + self.initFunctionSetting(FUNCTIONCODE_GETACCOUNTHISTORY, {'path': '/v1/accounts/%s/alltransactions' %self.accountId, + 'method': 'GET'}) + + self.initFunctionSetting(FUNCTIONCODE_GETCALENDAR, {'path': '/labs/v1/calendar', + 'method': 'GET'}) + + self.initFunctionSetting(FUNCTIONCODE_GETPOSITIONRATIOS, {'path': '/labs/v1/historical_position_ratios', + 'method': 'GET'}) + + self.initFunctionSetting(FUNCTIONCODE_GETSPREADS, {'path': '/labs/v1/spreads', + 'method': 'GET'}) + + self.initFunctionSetting(FUNCTIONCODE_GETCOMMIMENTS, {'path': '/labs/v1/commitments', + 'method': 'GET'}) + + self.initFunctionSetting(FUNCTIONCODE_GETORDERBOOK, {'path': '/labs/v1/orderbook_data', + 'method': 'GET'}) + + self.initFunctionSetting(FUNCTIONCODE_GETAUTOCHARTIST, {'path': '/labs/v1/autochartist', + 'method': 'GET'}) + + self.initFunctionSetting(FUNCTIONCODE_GETAUTOCHARTIST, {'path': '/labs/v1/autochartist', + 'method': 'GET'}) + + self.initFunctionSetting(FUNCTIONCODE_STREAMPRICES, {'path': '/v1/prices', + 'method': 'GET'}) + + self.initFunctionSetting(FUNCTIONCODE_STREAMEVENTS, {'path': '/v1/events', + 'method': 'GET'}) + + + self.active = True + self.reqThread.start() + self.streamEventsThread.start() + self.streamPricesThread.start() + + #---------------------------------------------------------------------- + def exit(self): + """退出接口""" + self.active = False + self.reqThread.join() + + #---------------------------------------------------------------------- + def initFunctionSetting(self, code, setting): + """初始化API功能字典""" + self.functionSetting[code] = setting + + #---------------------------------------------------------------------- + def processRequest(self, req): + """发送请求并通过回调函数推送数据结果""" + url = req['url'] + method = req['method'] + params = req['params'] + + stream = False + if 'stream' in req: + stream = req['stream'] + + if method in ['GET', 'DELETE']: + myreq = requests.Request(method, url, headers=self.headers, params=params) + elif method in ['POST', 'PATCH']: + myreq = requests.Request(method, url, headers=self.headers, data=params) + pre = myreq.prepare() + + r = None + error = None + + try: + r = self.session.send(pre, stream=stream) + except Exception, e: + error = e + + return r, error + + #---------------------------------------------------------------------- + def processQueue(self): + """处理请求队列中的请求""" + while self.active: + try: + req = self.reqQueue.get(block=True, timeout=1) # 获取请求的阻塞为一秒 + callback = req['callback'] + reqID = req['reqID'] + + r, error = self.processRequest(req) + + if r: + try: + data = r.json() + if self.DEBUG: + print callback.__name__ + callback(data, reqID) + except Exception, e: + self.onError(str(e), reqID) + else: + self.onError(error, reqID) + except Empty: + pass + + #---------------------------------------------------------------------- + def sendRequest(self, code, params, callback, optional=''): + """发送请求""" + setting = self.functionSetting[code] + + url = self.restDomain + setting['path'] + if optional: + url = url + '/' + optional + + self.reqID += 1 + + req = {'url': url, + 'method': setting['method'], + 'params': params, + 'callback': callback, + 'reqID': self.reqID} + self.reqQueue.put(req) + + return self.reqID + + #---------------------------------------------------------------------- + def onError(self, error, reqID): + """错误信息回调""" + print error, reqID + + #---------------------------------------------------------------------- + def getInstruments(self, params): + """查询可交易的合约列表""" + return self.sendRequest(FUNCTIONCODE_GETINSTRUMENTS, params, self.onGetInstruments) + + #---------------------------------------------------------------------- + def onGetInstruments(self, data, reqID): + """回调函数""" + pass + + #---------------------------------------------------------------------- + def getPrices(self, params): + """查询价格""" + return self.sendRequest(FUNCTIONCODE_GETPRICES, params, self.onGetPrices) + + #---------------------------------------------------------------------- + def onGetPrices(self, data, reqID): + """回调函数""" + pass + + #---------------------------------------------------------------------- + def getPriceHisory(self, params): + """查询历史价格数据""" + return self.sendRequest(FUNCTIONCODE_GETPRICEHISTORY, params, self.onGetPriceHistory) + + #---------------------------------------------------------------------- + def onGetPriceHistory(self, data, reqID): + """回调函数""" + pass + + #---------------------------------------------------------------------- + def getAccounts(self): + """查询用户的所有账户""" + return self.sendRequest(FUNCTIONCODE_GETACCOUNTS, {}, self.onGetAccounts) + + #---------------------------------------------------------------------- + def onGetAccounts(self, data, reqID): + """回调函数""" + pass + + #---------------------------------------------------------------------- + def getAccountInfo(self): + """查询账户数据""" + return self.sendRequest(FUNCTIONCODE_GETACCOUNTINFO, {}, self.onGetAccountInfo) + + #---------------------------------------------------------------------- + def onGetAccountInfo(self, data, reqID): + """回调函数""" + pass + + #---------------------------------------------------------------------- + def getOrders(self, params): + """查询所有委托""" + return self.sendRequest(FUNCTIONCODE_GETORDERS, params, self.onGetOrders) + + #---------------------------------------------------------------------- + def onGetOrders(self, data, reqID): + """回调函数""" + pass + + #---------------------------------------------------------------------- + def sendOrder(self, params): + """发送委托""" + return self.sendRequest(FUNCTIONCODE_SENDORDER, params, self.onSendOrder) + + #---------------------------------------------------------------------- + def onSendOrder(self, data, reqID): + """回调函数""" + pass + + #---------------------------------------------------------------------- + def getOrderInfo(self, optional): + """查询委托信息""" + return self.sendRequest(FUNCTIONCODE_GETORDERINFO, {}, self.onGetOrderInfo, optional) + + #---------------------------------------------------------------------- + def onGetOrderInfo(self, data, reqID): + """回调函数""" + pass + + #---------------------------------------------------------------------- + def modifyOrder(self, params, optional): + """修改委托""" + return self.sendRequest(FUNCTIONCODE_MODIFYORDER, params, self.onModifyOrder, optional) + + #---------------------------------------------------------------------- + def onModifyOrder(self, data, reqID): + """回调函数""" + pass + + #---------------------------------------------------------------------- + def cancelOrder(self, optional): + """查询委托信息""" + return self.sendRequest(FUNCTIONCODE_CANCELORDER, {}, self.onCancelOrder, optional) + + #---------------------------------------------------------------------- + def onCancelOrder(self, data, reqID): + """回调函数""" + pass + + #---------------------------------------------------------------------- + def getTrades(self, params): + """查询所有仓位""" + return self.sendRequest(FUNCTIONCODE_GETTRADES, params, self.onGetTrades) + + #---------------------------------------------------------------------- + def onGetTrades(self, data, reqID): + """回调函数""" + pass + + #---------------------------------------------------------------------- + def getTradeInfo(self, optional): + """查询仓位信息""" + return self.sendRequest(FUNCTIONCODE_GETTRADEINFO, {}, self.onGetTradeInfo, optional) + + #---------------------------------------------------------------------- + def onGetTradeInfo(self, data, reqID): + """回调函数""" + pass + + #---------------------------------------------------------------------- + def modifyTrade(self, params, optional): + """修改仓位""" + return self.sendRequest(FUNCTIONCODE_MODIFYTRADE, params, self.onModifyTrade, optional) + + #---------------------------------------------------------------------- + def onModifyTrade(self, data, reqID): + """回调函数""" + pass + + #---------------------------------------------------------------------- + def closeTrade(self, optional): + """平仓""" + return self.sendRequest(FUNCTIONCODE_CLOSETRADE, {}, self.onCloseTrade, optional) + + #---------------------------------------------------------------------- + def onCloseTrade(self, data, reqID): + """回调函数""" + pass + + #---------------------------------------------------------------------- + def getPositions(self): + """查询所有汇总仓位""" + return self.sendRequest(FUNCTIONCODE_GETPOSITIONS, {}, self.onGetPositions) + + #---------------------------------------------------------------------- + def onGetPositions(self, data, reqID): + """回调函数""" + pass + + #---------------------------------------------------------------------- + def getPositionInfo(self, optional): + """查询汇总仓位信息""" + return self.sendRequest(FUNCTIONCODE_GETPOSITIONINFO, {}, self.onGetPositionInfo, optional) + + #---------------------------------------------------------------------- + def onGetPositionInfo(self, data, reqID): + """回调函数""" + pass + + #---------------------------------------------------------------------- + def closePosition(self, optional): + """平仓汇总仓位信息""" + return self.sendRequest(FUNCTIONCODE_CLOSEPOSITION, {}, self.onClosePosition, optional) + + #---------------------------------------------------------------------- + def onClosePosition(self, data, reqID): + """回调函数""" + pass + + + #---------------------------------------------------------------------- + def getTransactions(self, params): + """查询所有资金变动""" + return self.sendRequest(FUNCTIONCODE_GETTRANSACTIONS, params, self.onGetTransactions) + + #---------------------------------------------------------------------- + def onGetTransactions(self, data, reqID): + """回调函数""" + pass + + #---------------------------------------------------------------------- + def getTransactionInfo(self, optional): + """查询资金变动信息""" + return self.sendRequest(FUNCTIONCODE_GETTRANSACTIONINFO, {}, self.onGetTransactionInfo, optional) + + #---------------------------------------------------------------------- + def onGetTransactionInfo(self, data, reqID): + """回调函数""" + pass + + #---------------------------------------------------------------------- + def getAccountHistory(self): + """查询账户资金变动历史""" + return self.sendRequest(FUNCTIONCODE_GETACCOUNTHISTORY, {}, self.onGetAccountHistory) + + #---------------------------------------------------------------------- + def onGetAccountHistory(self, data, reqID): + """回调函数""" + pass + + #---------------------------------------------------------------------- + def getCalendar(self, params): + """查询日历""" + return self.sendRequest(FUNCTIONCODE_GETCALENDAR, params, self.onGetCalendar) + + #---------------------------------------------------------------------- + def onGetCalendar(self, data, reqID): + """回调函数""" + pass + + #---------------------------------------------------------------------- + def getPositionRatios(self, params): + """查询持仓比例""" + return self.sendRequest(FUNCTIONCODE_GETPOSITIONRATIOS, params, self.onGetPositionRatios) + + #---------------------------------------------------------------------- + def onGetPositionRatios(self, data, reqID): + """回调函数""" + pass + + #---------------------------------------------------------------------- + def getSpreads(self, params): + """查询所有仓位""" + return self.sendRequest(FUNCTIONCODE_GETSPREADS, params, self.onGetSpreads) + + #---------------------------------------------------------------------- + def onGetSpreads(self, data, reqID): + """回调函数""" + pass + + #---------------------------------------------------------------------- + def getCommitments(self, params): + """查询交易商持仓情况""" + return self.sendRequest(FUNCTIONCODE_GETCOMMIMENTS, params, self.onGetCommitments) + + #---------------------------------------------------------------------- + def onGetCommitments(self, data, reqID): + """回调函数""" + pass + + #---------------------------------------------------------------------- + def getOrderbook(self, params): + """查询订单簿""" + return self.sendRequest(FUNCTIONCODE_GETORDERBOOK, params, self.onGetOrderbook) + + #---------------------------------------------------------------------- + def onGetOrderbook(self, data, reqID): + """回调函数""" + pass + + #---------------------------------------------------------------------- + def getAutochartist(self, params): + """查询Autochartist识别的模式""" + return self.sendRequest(FUNCTIONCODE_GETAUTOCHARTIST, params, self.onGetAutochartist) + + #---------------------------------------------------------------------- + def onGetAutochartist(self, data, reqID): + """回调函数""" + pass + + #---------------------------------------------------------------------- + def onPrice(self, data): + """行情推送""" + print data + + #---------------------------------------------------------------------- + def onEvent(self, data): + """事件推送(成交等)""" + print data + + #---------------------------------------------------------------------- + def processStreamPrices(self): + """获取价格推送""" + # 首先获取所有合约的代码 + setting = self.functionSetting[FUNCTIONCODE_GETINSTRUMENTS] + req = {'url': self.restDomain + setting['path'], + 'method': setting['method'], + 'params': {'accountId': self.accountId}} + r, error = self.processRequest(req) + if r: + try: + data = r.json() + symbols = [d['instrument'] for d in data['instruments']] + except Exception, e: + self.onError(e, -1) + return + else: + self.onError(error, -1) + return + + # 然后订阅所有的合约行情 + setting = self.functionSetting[FUNCTIONCODE_STREAMPRICES] + params = {'accountId': self.accountId, + 'instruments': ','.join(symbols)} + req = {'url': self.streamDomain + setting['path'], + 'method': setting['method'], + 'params': params, + 'stream': True} + r, error = self.processRequest(req) + + if r: + for line in r.iter_lines(): + if line: + try: + msg = json.loads(line) + + if self.DEBUG: + print self.onPrice.__name__ + + self.onPrice(msg) + except Exception, e: + self.onError(e, -1) + + if not self.active: + break + else: + self.onError(error, -1) + + #---------------------------------------------------------------------- + def processStreamEvents(self): + """获取事件推送""" + setting = self.functionSetting[FUNCTIONCODE_STREAMEVENTS] + req = {'url': self.streamDomain + setting['path'], + 'method': setting['method'], + 'params': {}, + 'stream': True} + r, error = self.processRequest(req) + if r: + for line in r.iter_lines(): + if line: + try: + msg = json.loads(line) + + if self.DEBUG: + print self.onEvent.__name__ + + self.onEvent(msg) + except Exception, e: + self.onError(e, -1) + + if not self.active: + break + else: + self.onError(error, -1) \ No newline at end of file diff --git a/vn.trader/uiMainWindow.py b/vn.trader/uiMainWindow.py index 1d993410..0b5da758 100644 --- a/vn.trader/uiMainWindow.py +++ b/vn.trader/uiMainWindow.py @@ -91,6 +91,9 @@ class MainWindow(QtGui.QMainWindow): connectIbAction = QtGui.QAction(u'连接IB', self) connectIbAction.triggered.connect(self.connectIb) + connectOandaAction = QtGui.QAction(u'连接OANDA', self) + connectOandaAction.triggered.connect(self.connectOanda) + connectDbAction = QtGui.QAction(u'连接数据库', self) connectDbAction.triggered.connect(self.mainEngine.dbConnect) @@ -118,7 +121,9 @@ class MainWindow(QtGui.QMainWindow): sysMenu.addAction(connectFemasAction) sysMenu.addAction(connectKsotpAction) sysMenu.addAction(connectKsgoldAction) + sysMenu.addSeparator() sysMenu.addAction(connectIbAction) + sysMenu.addAction(connectOandaAction) sysMenu.addSeparator() sysMenu.addAction(connectWindAction) sysMenu.addSeparator() @@ -202,6 +207,11 @@ class MainWindow(QtGui.QMainWindow): """连接Ib""" self.mainEngine.connect('IB') + #---------------------------------------------------------------------- + def connectOanda(self): + """连接OANDA""" + self.mainEngine.connect('OANDA') + #---------------------------------------------------------------------- def test(self): """测试按钮用的函数""" diff --git a/vn.trader/vtConstant.py b/vn.trader/vtConstant.py index 6efc3cdd..e1467b04 100644 --- a/vn.trader/vtConstant.py +++ b/vn.trader/vtConstant.py @@ -66,6 +66,8 @@ EXCHANGE_SMART = 'SMART' # IB智能路由(股票、期权) EXCHANGE_GLOBEX = 'GLOBEX' # CME电子交易平台 EXCHANGE_IDEALPRO = 'IDEALPRO' # IB外汇ECN +EXCHANGE_OANDA = 'OANDA' # OANDA外汇做市商 + # 货币类型 CURRENCY_USD = 'USD' # 美元 CURRENCY_CNY = 'CNY' # 人民币 diff --git a/vn.trader/vtEngine.py b/vn.trader/vtEngine.py index fe5f5ff3..22e857bf 100644 --- a/vn.trader/vtEngine.py +++ b/vn.trader/vtEngine.py @@ -87,6 +87,13 @@ class MainEngine(object): self.addGateway(IbGateway, 'IB') except Exception, e: print e + + try: + from oandaGateway.oandaGateway import OandaGateway + self.addGateway(OandaGateway, 'OANDA') + self.gatewayDict['OANDA'].setQryEnabled(True) + except Exception, e: + print e #---------------------------------------------------------------------- def addGateway(self, gateway, gatewayName=None):