[Mod]更新FxcmGateway

This commit is contained in:
vn.py 2018-01-29 17:22:41 +08:00
parent 1a01937e6c
commit a13d40da21
4 changed files with 371 additions and 53 deletions

View File

@ -7,4 +7,5 @@ futuquant
wmi
future
flask-socketio
flask-restful
flask-restful
gevent-websocket

View File

@ -16,12 +16,20 @@ api.connect(url, port, token, proxy)
print api.bearer
sleep(20)
api.getInstruments()
#api.getInstruments()
#api.subscribe('EUR/USD')
#api.subscribe('USD/JPY')
#api.subscribe('GBP/USD')
#api.getModel('Summary')
#api.subscribeModel('Summary')
#api.getModel(api.MODEL_SUMMARY)
#api.getModel(api.MODEL_OFFER)
api.subscribeModel(api.MODEL_OFFER)
api.updateSubscriptions('EUR/USD')
api.subscribe('EUR/USD')
#api.subscribeModel(api.MODEL_ACCOUNT)
input()

View File

@ -175,7 +175,7 @@ class FxcmApi(object):
"""查询表"""
uri = '/trading/get_model'
params = {'models': model}
reqid = self.sendReq(self.METHOD_GET, uri, params, self.onGetTable)
reqid = self.sendReq(self.METHOD_GET, uri, params, self.onGetModel)
return reqid
#----------------------------------------------------------------------
@ -184,7 +184,7 @@ class FxcmApi(object):
uri = '/subscribe'
params = {'pairs': symbol}
reqid = self.sendReq(self.METHOD_POST, uri, params, self.onSubscribe)
self.sio.on(symbol, self.onPriceUpdate)
self.sio.on(symbol, self.processPriceUpdate)
return reqid
#----------------------------------------------------------------------
@ -201,7 +201,7 @@ class FxcmApi(object):
uri = '/trading/subscribe'
params = {'models': model}
reqid = self.sendReq(self.METHOD_POST, uri, params, self.onSubscribeModel)
self.sio.on(model, self.onModelUpdate)
self.sio.on(model, self.processModelUpdate)
return reqid
#----------------------------------------------------------------------
@ -213,26 +213,41 @@ class FxcmApi(object):
return reqid
#----------------------------------------------------------------------
def openTrade(self, accountID, symbol, isBuy, amount, limit,
isInPips, atMarket, orderType, timeInForce,
rate=0, stop=0, trailingStep=0):
"""开仓交易"""
def updateSubscriptions(self, symbol):
"""订阅报价表"""
uri = '/trading/update_subscriptions'
params = {
'symbol': symbol,
'visible': 'true'
}
#params = {'symbol': symbol}
reqid = self.sendReq(self.METHOD_POST, uri, params, self.onUpdateSubscriptions)
return reqid
#----------------------------------------------------------------------
def openTrade(self, accountID, symbol, isBuy, amount,
atMarket, orderType, timeInForce,
rate=0, limit=0, stop=0,
trailingStep=0, isInPips=False):
"""市价开仓交易"""
uri = '/trading/open_trade'
params = {
'account_id': accountID,
'symbol': symbol,
'is_buy': isBuy,
'amount': amount,
'limit': limit,
'is_in_pips': isInPips,
'at_market': atMarket,
'order_type': orderType,
'time_in_force': timeInForce
'time_in_force': timeInForce,
'is_in_pips': isInPips
}
if rate:
params['rate'] = rate
if rate:
params['limit'] = limit
if stop:
params['stop'] = stop
@ -242,6 +257,36 @@ class FxcmApi(object):
reqid = self.sendReq(self.METHOD_POST, uri, params, self.onOpenTrade)
return reqid
#----------------------------------------------------------------------
def createEntryOrder(self, accountID, symbol, isBuy, rate,
amount, orderType, timeInForce,
limit=0, stop=0, trailingStep=0, isInPips=False):
"""限价开仓交易"""
uri = '/trading/create_entry_order'
params = {
'account_id': accountID,
'symbol': symbol,
'is_buy': isBuy,
'rate': rate,
'amount': amount,
'order_type': orderType,
'time_in_force': timeInForce,
'is_in_pips': isInPips
}
if rate:
params['limit'] = limit
if stop:
params['stop'] = stop
if trailingStep:
params['trailing_step'] = trailingStep
reqid = self.sendReq(self.METHOD_POST, uri, params, self.onOpenTrade)
return reqid
#----------------------------------------------------------------------
def closeTrade(self, tradeID, amount, atMarket, orderType, timeInForce, rate=0):
"""平仓交易"""
@ -306,9 +351,17 @@ class FxcmApi(object):
print data, reqid
#----------------------------------------------------------------------
def onGetTable(self, data, reqid):
def onGetModel(self, data, reqid):
"""查询表回调"""
print data, reqid
print '*' * 30
print data
for d in data['offers']:
#if str(d['currency']) == 'EUR/USD':
# print d
print d['currency']#, d['visible']
#print len(data['summary'])
#print data
#----------------------------------------------------------------------
def onSubscribe(self, data, reqid):
@ -330,6 +383,11 @@ class FxcmApi(object):
"""退订表回调"""
print data, reqid
#----------------------------------------------------------------------
def onUpdateSubscriptions(self, data, reqid):
"""订阅报价表回调"""
print data, reqid
#----------------------------------------------------------------------
def onOpenTrade(self, data, reqid):
"""开仓回调"""
@ -348,7 +406,20 @@ class FxcmApi(object):
#----------------------------------------------------------------------
def onDeleteOrder(self, data, reqid):
"""撤单回调"""
print data, reqid
print data, reqid
#----------------------------------------------------------------------
def processPriceUpdate(self, msg):
"""行情推送"""
data = json.loads(msg)
self.onPriceUpdate(data)
#----------------------------------------------------------------------
def processModelUpdate(self, msg):
"""表推送"""
print msg
data = json.loads(msg)
self.onModelUpdate(data)
#----------------------------------------------------------------------
def onPriceUpdate(self, data):
@ -358,6 +429,12 @@ class FxcmApi(object):
#----------------------------------------------------------------------
def onModelUpdate(self, data):
"""表推送"""
print data
print data
#print '*' * 30
#fsubscribeModel
#print len(data), data.get('isTotal', None), data
#print '*' * 30
#for d in data:
# print d

View File

@ -9,6 +9,23 @@ from vnpy.trader.vtGateway import *
from vnpy.trader.vtConstant import *
from vnpy.trader.vtFunction import getJsonPath
# 产品类型映射
productMapReverse = {}
productMapReverse[1] = PRODUCT_FOREX
productMapReverse[2] = PRODUCT_DEFER
productMapReverse[3] = PRODUCT_DEFER
productMapReverse[4] = PRODUCT_DEFER
productMapReverse[5] = PRODUCT_DEFER
productMapReverse[6] = PRODUCT_DEFER
productMapReverse[7] = PRODUCT_INDEX
# 委托状态映射
statusMapReverse = {}
statusMapReverse[1] = STATUS_NOTTRADED
statusMapReverse[3] = STATUS_CANCELLED
statusMapReverse[9] = STATUS_ALLTRADED
# 价格类型映射
priceTypeMap = {}
priceTypeMap[PRICETYPE_LIMITPRICE] = 'limit'
@ -72,6 +89,7 @@ class FxcmGateway(VtGateway):
#----------------------------------------------------------------------
def subscribe(self, subscribeReq):
"""订阅行情"""
#self.api.updateSubscriptions(subscribeReq.symbol)
self.api.subscribe(subscribeReq.symbol)
#----------------------------------------------------------------------
@ -136,12 +154,17 @@ class Api(FxcmApi):
self.accout = ''
self.orderDict = {} # 缓存委托数据
self.tradeDict = {}
self.accountDict = {}
self.positionDict = {}
#----------------------------------------------------------------------
def onConnect(self):
"""连接回调"""
self.writeLog(u'服务器连接成功')
self.getInstruments()
#----------------------------------------------------------------------
def onDisconnect(self):
"""断开回调"""
@ -151,7 +174,7 @@ class Api(FxcmApi):
def onError(self, error, reqid):
"""错误回调"""
err = VtErrorData()
err.gatewayName = self.gatewayNames
err.gatewayName = self.gatewayName
err.errorID = 0
err.errorTime = datetime.now().strftime('%H:%M:%S')
err.errorMsg = error
@ -161,29 +184,196 @@ class Api(FxcmApi):
def onGetInstruments(self, data, reqid):
"""查询合约代码回调"""
for d in data['data']['instrument']:
contract = VtContractData()
contract.gatewayName = self.gatewayName
contract.symbol = d['symbol']
contract.exchange = EXCHANGE_FXCM
contract.vtSymbol = '.'.join([contract.symbol, contract.exchange])
contract.name = contract.symbol
contract.productClass = PRODUCT_FOREX
contract.priceTick = 0.0001
contract.size = data['order']
self.gateway.onContract(contract)
if not d['visible']:
self.updateSubscriptions(d['symbol'])
self.writeLog(u'添加合约%s到交易表' %d['symbol'])
self.qryContracts()
self.qryOrders()
self.qryTrades()
self.qryPositions()
self.qryAccounts()
#----------------------------------------------------------------------
def onGetTable(self, data, reqid):
def onGetModel(self, data, reqid):
"""查询表回调"""
print data, reqid
if 'offers' in data:
for d in data['offers']:
self.processContracts(d)
self.writeLog(u'合约信息查询成功')
self.subscribeModel(self.MODEL_OFFER)
#self.updateSubscriptions('EUR/USD')
elif 'orders' in data:
for d in data['orders']:
self.processOrders(d)
self.writeLog(u'委托查询成功')
#self.subscribeModel(self.MODEL_ORDER)
elif 'closed_positions' in data:
for d in data['closed_positions']:
self.processTrades(d)
self.writeLog(u'已平成交查询成功')
#self.subscribeModel(self.MODEL_CLOSEDPOSITION)
elif 'open_positions' in data:
for d in data['open_positions']:
self.processTrades(d)
self.writeLog(u'未平成交查询成功')
#self.subscribeModel(self.MODEL_OPENPOSITION)
elif 'summary' in data:
for d in data['summary']:
self.processPositions(d)
self.writeLog(u'持仓查询成功')
#self.subscribeModel(self.MODEL_SUMMARY)
elif 'accounts' in data:
for d in data['accounts']:
self.processAccounts(d)
self.writeLog(u'账户查询成功')
#self.subscribeModel(self.MODEL_ACCOUNT)
#----------------------------------------------------------------------
def processContracts(self, d):
"""处理合约数据"""
contract = VtContractData()
contract.gatewayName = self.gatewayName
contract.symbol = d['currency']
contract.exchange = EXCHANGE_FXCM
contract.vtSymbol = '.'.join([contract.symbol, contract.exchange])
contract.name = contract.symbol
contract.productClass = productMapReverse.get(d['instrumentType'], PRODUCT_UNKNOWN)
contract.priceTick = d['pip'] * d['pipFraction']
contract.size = 1000
self.gateway.onContract(contract)
#----------------------------------------------------------------------
def processOrders(self, d):
"""处理委托"""
order = VtOrderData()
order.gatewayName = self.gatewayName
order.symbol = d['currency']
order.exchange = EXCHANGE_FXCM
order.vtSymbol = '.'.join([order.symbol, order.exchange])
order.orderID = d['orderId']
order.vtOrderID = '.'.join([order.gatewayName, order.orderID])
if d['isBuy']:
order.price = d['buy']
order.direction = DIRECTION_LONG
else:
order.price = d['sell']
order.direction = DIRECTION_SHORT
order.totalVolume = d['amountK']
order.status = statusMapReverse.get(d['status'], STATUS_UNKNOWN)
if order.status == STATUS_ALLTRADED:
order.tradedVolume = order.totalVolume
order.orderTime = d['time']
self.gateway.onOrder(order)
#----------------------------------------------------------------------
def processTrades(self, d):
"""处理成交"""
if 'isTotal' in d:
return
trade = VtTradeData()
trade.gatewayName = self.gatewayName
trade.symbol = d['currency']
trade.exchange = EXCHANGE_FXCM
trade.vtSymbol = '.'.join([trade.symbol, trade.exchange])
trade.tradeID = d['tradeId']
trade.vtTradeID = '.'.join([trade.gatewayName, trade.tradeID])
if d['isBuy']:
trade.direction = DIRECTION_LONG
else:
trade.direction = DIRECTION_SHORT
trade.price = d['open']
trade.volume = d['amountK']
if 'time' in d:
trade.tradeTime = d['time']
else:
trade.tradeTime = d['openTime']
self.gateway.onTrade(trade)
#----------------------------------------------------------------------
def processAccounts(self, d):
"""处理资金"""
if 'isTotal' in d:
return
account = VtAccountData()
account.gatewayName = self.gatewayName
account.accountID = d['accountId']
account.vtAccountID = '.'.join([account.gatewayName, account.accountID])
account.balance = d['equity']
account.available = d['usableMargin']
account.margin = d['usdMr']
account.positionProfit = d['grossPL']
self.gateway.onAccount(account)
#----------------------------------------------------------------------
def processPositions(self, d):
"""处理持仓"""
if 'isTotal' in d:
return
# 多头
position = VtPositionData()
position.gatewayName = self.gatewayName
position.symbol = d['currency']
position.exchange = EXCHANGE_FXCM
position.vtSymbol = '.'.join([position.symbol, position.exchange])
position.direction = DIRECTION_LONG
position.vtPositionName = '.'.join([position.vtSymbol, position.direction])
position.position = d['amountKBuy']
position.price = d['avgBuy']
position.positionProfit = d['plBuy']
self.gateway.onPosition(position)
# 空头
position = VtPositionData()
position.gatewayName = self.gatewayName
position.symbol = d['currency']
position.exchange = EXCHANGE_FXCM
position.vtSymbol = '.'.join([position.symbol, position.exchange])
position.direction = DIRECTION_SHORT
position.vtPositionName = '.'.join([position.vtSymbol, position.direction])
position.position = d['amountKSell']
position.price = d['avgSell']
position.positionProfit = d['plSell']
self.gateway.onPosition(position)
#----------------------------------------------------------------------
def onSubscribe(self, data, reqid):
"""订阅行情回调"""
symbol = data['pairs']['symbol']
symbol = data['pairs'][0]['Symbol']
self.writeLog(u'%s行情订阅成功' %symbol)
#----------------------------------------------------------------------
@ -200,7 +390,12 @@ class Api(FxcmApi):
def onUnsubscribeModel(self, data, reqid):
"""退订表回调"""
pass
#----------------------------------------------------------------------
def onUpdateSubscriptions(self, data, reqid):
"""订阅表回调"""
self.writeLog(u'订阅表更新%s' %data)
#----------------------------------------------------------------------
def onOpenTrade(self, data, reqid):
"""开仓回调"""
@ -224,12 +419,38 @@ class Api(FxcmApi):
#----------------------------------------------------------------------
def onPriceUpdate(self, data):
"""行情推送"""
print data
tick = VtTickData()
tick.gatewayName = self.gatewayName
tick.symbol = data['Symbol']
tick.exchange = EXCHANGE_FXCM
tick.vtSymbol = '.'.join([tick.symbol, tick.exchange])
rates = data['Rates']
tick.bidPrice1 = rates[0]
tick.askPrice1 = rates[1]
tick.lastPrice = (tick.bidPrice1 + tick.askPrice1)/2
tick.highPrice = rates[2]
tick.lowPrice = rates[3]
tick.time = data['Updated']
self.gateway.onTick(tick)
#----------------------------------------------------------------------
def onModelUpdate(self, data):
"""表推送"""
print data
print data
if 'orderId' in data:
self.processOrders(data)
elif 'tradeId' in data:
self.processTrades(data)
elif 'avgBuy' in data:
self.processPositions(data)
elif 'accountId' in data:
self.processAccounts(data)
else:
print data
#----------------------------------------------------------------------
def init(self, account, url, port, token, proxy):
@ -247,44 +468,55 @@ class Api(FxcmApi):
self.gateway.onLog(log)
#----------------------------------------------------------------------
def qryInstruments(self):
def qryContracts(self):
"""查询合约"""
pass
self.getModel(self.MODEL_OFFER)
#----------------------------------------------------------------------
def qryOrders(self):
"""查询委托"""
pass
self.getModel(self.MODEL_ORDER)
#----------------------------------------------------------------------
def qryTrades(self):
"""查询成交"""
pass
self.getModel(self.MODEL_CLOSEDPOSITION)
self.getModel(self.MODEL_OPENPOSITION)
#----------------------------------------------------------------------
def sendOrder_(self, orderReq):
def qryAccounts(self):
"""查询资金"""
self.getModel(self.MODEL_ACCOUNT)
#----------------------------------------------------------------------
def qryPositions(self):
"""查询持仓"""
self.getModel(self.MODEL_SUMMARY)
#----------------------------------------------------------------------
def sendOrder(self, orderReq):
"""发送委托"""
accountID = self.account
symbol = orderReq.symbol
if orderReq.direction == DIRECTION_LONG:
isBuy = True
isBuy = 'true'
else:
isBuy = False
isBuy = 'false'
amount = orderReq.volume
rate = orderReq.price
self.openTrade(accountID, symbol, isBuy, amount, limit, isInPips,
atMarket, orderType, timeInForce)
if orderReq.priceType == PRICETYPE_MARKETPRICE:
reqid = self.openTrade(accountID, symbol, isBuy, amount,
0, 'AtMarket', 'DAY')
else:
reqid = self.createEntryOrder(accountID, symbol, isBuy, rate,
amount, 'AtMarket', 'DAY')
return reqid
#----------------------------------------------------------------------
def cancelOrder(self, cancelOrderReq):
"""撤销委托"""
pass
#----------------------------------------------------------------------
def getTime(t):
"""把OANDA返回的时间格式转化为简单的时间字符串"""
return t[11:19]
self.deleteOrder(cancelOrderReq.orderID)