diff --git a/vnpy/api/fxcm/demo.py b/vnpy/api/fxcm/demo.py new file mode 100644 index 00000000..f51fa03b --- /dev/null +++ b/vnpy/api/fxcm/demo.py @@ -0,0 +1,425 @@ +""" +FXCM Trading RESTful API example. +""" +import json +import time +import requests +from socketIO_client import SocketIO +from prompt_toolkit import prompt +from threading import Thread +import signal +import sys +import io +from datetime import datetime + +socketIO = None +UPDATES = {} +SYMBOLS = {} +COLLECTIONS = None +ACCESS_TOKEN = "48055b5d9afac0a300143ac067ce04cd2430a434" +TRADING_API_URL = 'https://api-demo.fxcm.com:443' +WEBSOCKET_PORT = 443 + + +list = ['Offer','Account','Order','OpenPosition','Summary','Properties', 'ClosedPosition'] +order_list = [] +logFile = "H:/Projects/RESTAPI-master/nodejs/PythonLog.txt" + +def timestamp(): + output = str(datetime.now().strftime('%Y%m%d-%H:%M:%S.%f')[:-3]) + return output + +def logging(mess, t): + ts = timestamp() + + with io.FileIO(logFile, "a") as file: + file.write('\n' +ts + ": " +t + " ==>" +'\n') + for key in mess: + file.write(str(key) +" - " +str(mess[key]) +'\n') + +def request_processor(method, params): + """ Trading server request help function. """ + + headers = { + 'User-Agent': 'request', + 'Authorization': bearer_access_token, + 'Accept': 'application/json', + 'Content-Type': 'application/x-www-form-urlencoded' + } + rresp = requests.get(TRADING_API_URL + method, headers=headers, params=params) + if rresp.status_code == 200: + data = rresp.json() + if data["response"]["executed"] is True: + return True, data + return False, data["response"]["error"] + else: + return False, rresp.status_code + +def post_request_processor(method, params): + """ Trading server request help function. """ + + headers = { + 'User-Agent': 'request', + 'Authorization': bearer_access_token, + 'Accept': 'application/json', + 'Content-Type': 'application/x-www-form-urlencoded' + } + rresp = requests.post(TRADING_API_URL + method, headers=headers, data=params) + if rresp.status_code == 200: + data = rresp.json() + if data["response"]["executed"] is True: + return True, data + return False, data["response"]["error"] + else: + return False, rresp.status_code + +def create_bearer_token(t,s): + bt = "Bearer " +s +t + return bt + +def on_price_update(msg): + md = json.loads(msg) + SYMBOLS[md["Symbol"]] = md + print(SYMBOLS, "SYMBOLS") + #logging(SYMBOLS, "SYMBOLS") + +def on_message(msg): + message = json.loads(msg) + UPDATES[message["t"]] = message + print(UPDATES, "UPDATES") + #logging(UPDATES, "UPDATES") + +def on_error(ws, error): + print (error) + +def on_close(): + print ('Websocket closed.') + +def on_connect(): + ### Get models + status, response = request_processor('/trading/get_model', {'models': list}) + if status is True: + COLLECTIONS = response + print("*" * 50) + print (COLLECTIONS["accounts"]) + print("*" * 50) + else: + print(status) + print ("Error processing request /trading/get_model: " +str(response)) + ### Subscribe message updates + status, response = post_request_processor('/trading/subscribe', {'models': order_list}) + if status is True: + for item in order_list: + socketIO.on(item, on_message) + else: + print ("Error processing request: /trading/subscribe: " + response) + + thr = Thread(target=cli, args=(1, COLLECTIONS)) + try: + thr.setDaemon(True) + thr.start() + socketIO.wait() + except (KeyboardInterrupt, SystemExit): + thr.join(0) + sys.exit(1) + + +def subscribe_prices(symbol): + status, response = post_request_processor('/subscribe', {'pairs': symbol}) + if status is True: + socketIO.on(symbol, on_price_update) + else: + print ("Error processing request: /subscribe: " + response) + +def unsubscribe_prices(symbol): + status, response = post_request_processor('/unsubscribe', {'pairs': symbol}) + if status is True: + socketIO.on(symbol, on_price_update) + else: + print ("Error processing request: /unsubscribe: " + response) + +def cli(args, COLLECTIONS): + while True: + try: + inp = prompt(u'> ') + except KeyboardInterrupt: + print ("Press Ctrl+c again to quit.") + sys.exit(1) + if not inp: + continue + try: + if len(inp) > 3 and inp[0:3] == 'run': + cmd = json.loads(inp[4:]) + command = cmd["c"] + if command == 'book': + for symbol in SYMBOLS: + price = SYMBOLS[symbol] + print (price) + + elif command == 'show': + if cmd["opt"] == 'offers': + for offer in COLLECTIONS['offers']: + ###print ('{}, {}, {}, {}, {}'.format(offer['currency'], offer['sell'], offer['buy'], offer['offerId'], offer['ratePrecision'])) + print (offer) + + elif cmd["opt"] == 'orders': + for order in COLLECTIONS['orders']: + print (order) + + elif cmd["opt"] == 'updates': + for obj in UPDATES: + print (obj) + + elif cmd["opt"] == 'accounts': + for obj in COLLECTIONS['accounts']: + print (obj) + + elif cmd["opt"] == 'open_positions': + for obj in COLLECTIONS['open_positions']: + print (obj) + + elif cmd["opt"] == 'closed_positions': + for obj in COLLECTIONS['closed_positions']: + print (obj) + + elif cmd["opt"] == 'summary': + for obj in COLLECTIONS['summary']: + print (obj) + + elif cmd["opt"] == 'properties': + for obj in COLLECTIONS['properties']: + print (obj) + + elif cmd["opt"] == 'LeverageProfile': + for obj in COLLECTIONS['LeverageProfile']: + print (obj) + + elif command == 'subscribe': + for symbol in cmd["opt"]: + subscribe_prices(symbol) + + elif command == 'unsubscribe': + for symbol in cmd["opt"]: + unsubscribe_prices(symbol) + + elif command == 'market_order': + status, response = post_request_processor('/trading/open_trade', { + 'account_id': cmd['account_id'], + 'symbol': cmd['symbol'], + 'side': cmd['side'], + 'amount': cmd['amount'], + 'at_market': 0, + 'order_type': 'AtMarket', + 'time_in_force': cmd['time_in_force'] + }) + if status is True: + print ('market_order has been executed: {}'.format(response)) + else: + print ('market_order execution error: {}'.format(response)) + + elif command == 'market_range': + status, response = post_request_processor('/trading/open_trade', { + 'account_id': cmd['account_id'], + 'symbol': cmd['symbol'], + 'side': cmd['side'], + 'amount': cmd['amount'], + 'order_type': 'MarketRange', + 'at_market': 1 + }) + if status is True: + print ('market_order has been executed: {}'.format(response)) + else: + print ('market_order execution error: {}'.format(response)) + + elif command == 'close_trade': + status, response = post_request_processor('/trading/close_trade', { + 'trade_id': cmd['trade_id'], + 'rate': 0, + 'amount': cmd['amount'], + 'at_market': 0, + 'order_type': 'AtMarket', + 'time_in_force': cmd['time_in_force'] + }) + if status is True: + print ('close_trade has been executed: {}'.format(response)) + else: + print ('close_trade execution error: {}'.format(response)) + + elif command == 'create_entry_order': + status, response = post_request_processor('/trading/create_entry_order', { + 'account_id': cmd['account_id'], + 'symbol': cmd['symbol'], + 'is_buy': cmd['side'], + 'rate': cmd['rate'], + 'is_in_pips': 0, + 'amount': cmd['amount'], + 'order_type': 'AtMarket', + 'time_in_force': cmd['time_in_force'] + }) + if status is True: + print ('create_entry_order has been executed: {}'.format(response)) + else: + print ('create_entry_order execution error: {}'.format(response)) + + elif command == 'change_order': + status, response = post_request_processor('/trading/change_order', { + 'order_id': cmd['order_id'], + 'rate': cmd['rate'], + 'range': cmd['range'], + 'amount': cmd['amount'], + 'trailing_step': cmd['trailing_step'] + }) + if status is True: + print ('change_order has been executed: {}'.format(response)) + else: + print ('change_order execution error: {}'.format(response)) + + elif command == 'delete_order': + status, response = post_request_processor('/trading/delete_order', { + 'order_id': cmd['order_id'] + }) + if status is True: + print ('delete_order has been executed: {}'.format(response)) + else: + print ('delete_order execution error: {}'.format(response)) + + elif command == 'get_instruments': + status, response = request_processor('/trading/get_instruments', {}) + if status is True: + print ('get_instruments has been executed: {}'.format(response)) + else: + print ('get_instruments execution error: {}'.format(response)) + + elif command == 'simple_oco': + status, response = request_processor('/trading/simple_oco', { + 'account_id': cmd['account_id'], + 'symbol': cmd['symbol'], + 'amount': cmd['amount'], + 'time_in_force': cmd['time_in_force'], + 'is_buy': cmd['is_buy'], + 'rate': cmd['rate'], + 'is_buy2': cmd['is_buy2'], + 'rate2': cmd['rate2'] + }) + if status is True: + print ('simple_oco has been executed: {}'.format(response)) + else: + print ('simple_oco execution error: {}'.format(response)) + + elif command == 'add_to_oco': + status, response = request_processor('/trading/add_to_oco', { + orderIds: cmd.orderIds, + ocoBulkId: cmd.ocoBulkId + }) + if status is True: + print ('add_to_oco has been executed: {}'.format(response)) + else: + print ('add_to_oco execution error: {}'.format(response)) + + elif command == 'remove_from_oco': + status, response = request_processor('/trading/remove_from_oco', { + orderIds: cmd.orderIds + }) + if status is True: + print ('remove_from_oco has been executed: {}'.format(response)) + else: + print ('remove_from_oco execution error: {}'.format(response)) + + elif command == 'edit_oco': + status, response = request_processor('/trading/edit_oco', { + ocoBulkId: cmd.ocoBulkId, + addOrderIds: cmd.addOrderIds, + removeOrderIds: cmd.removeOrderIds + }) + if status is True: + print ('edit_oco has been executed: {}'.format(response)) + else: + print ('edit_oco execution error: {}'.format(response)) + + elif command == 'change_trade_stop_limit': + status, response = request_processor('/trading/change_trade_stop_limit', { + trade_id: cmd.trade_id, + is_stop: cmd.is_stop, + rate: cmd.rate, + is_in_pips: cmd.is_in_pips, + trailing_step: cmd.trailing_step + }) + if status is True: + print ('change_trade_stop_limit has been executed: {}'.format(response)) + else: + print ('change_trade_stop_limit execution error: {}'.format(response)) + + elif command == 'change_order_stop_limit': + status, response = request_processor('/trading/change_order_stop_limit', { + order_id: cmd.order_id, + is_stop: cmd.is_stop, + rate: cmd.rate, + is_in_pips: cmd.is_in_pips, + trailing_step: cmd.trailing_step + }) + if status is True: + print ('change_order_stop_limit has been executed: {}'.format(response)) + else: + print ('change_order_stop_limit execution error: {}'.format(response)) + + elif command == 'change_net_stop_limit': + status, response = request_processor('/trading/change_net_stop_limit', { + account_id: cmd.account_id, + symbol: cmd.symbol, + is_buy: cmd.is_buy, + is_stop: cmd.is_stop, + rate: cmd.rate, + trailing_step: cmd.trailing_step + }) + if status is True: + print ('change_net_stop_limit has been executed: {}'.format(response)) + else: + print ('change_net_stop_limit execution error: {}'.format(response)) + + elif command == 'close_all_for_symbol': + status, response = request_processor('/trading/close_all_for_symbol', { + account_id: cmd.account_id, + forSymbol: cmd.forSymbol, + symbol: cmd.symbol, + order_type: cmd.order_type, + time_in_force: cmd.time_in_force + }) + if status is True: + print ('close_all_for_symbol has been executed: {}'.format(response)) + else: + print ('close_all_for_symbol execution error: {}'.format(response)) + + elif command == 'candles': + req="/"+command+"/"+str(cmd['instrumentid'])+"/"+str(cmd['periodid']) + status, response = request_processor(req, { + 'num': cmd['num'], + 'from': cmd['from'], + 'to': cmd['to'] + }) + if status is True: + print ('has been executed: {}'.format(response)) + else: + print ('execution error: {}'.format(response)) + + + else: + print ("Unknown command: " + command) + else: + print ('Unknown command.') + except ValueError: + print ('Invalid command') +### Main + +if __name__ == '__main__': + #signal.getsignal(signal.SIGINT, exit_gracefully) + + COLLECTIONS = {} + print '000' + socketIO = SocketIO(TRADING_API_URL, WEBSOCKET_PORT, params={'access_token': ACCESS_TOKEN, 'agent': "leiwang-rest-api"}) + print 1 + #print (socketIO._engineIO_session.id) + bearer_access_token = create_bearer_token(ACCESS_TOKEN, socketIO._engineIO_session.id) + print (bearer_access_token) + socketIO.on('disconnec', on_close) + socketIO.on('connect', on_connect) + socketIO.wait() diff --git a/vnpy/api/fxcm/vnfxcm.py b/vnpy/api/fxcm/vnfxcm.py new file mode 100644 index 00000000..a715b25b --- /dev/null +++ b/vnpy/api/fxcm/vnfxcm.py @@ -0,0 +1,352 @@ +# encoding: UTF-8 + +import json +import requests +from socketIO_client import SocketIO +from threading import Thread +from queue import Queue, Empty +from datetime import datetime + + +######################################################################## +class FxcmApi(object): + """FXCM""" + API_URL = 'https://api-demo.fxcm.com:443' + WEBSOCKET_PORT = 443 + METHOD_GET = 'get' + METHOD_POST = 'post' + + #---------------------------------------------------------------------- + def __init__(self): + """Constructor""" + self.url = '' + self.port = '' + self.token = '' + + self.sio = None + self.bearer = '' + self.headers = None + + self.queue = Queue() + self.reqid = 0 + self.active = False + self.thread = Thread(target=self.run) + + #---------------------------------------------------------------------- + def connect(self, url, port, token): + """连接""" + self.url = url + self.port = port + self.token = token + + self.initSocketIO() + self.generateBearer() + self.generateHeaders() + + self.active = True + self.thread.start() + + #---------------------------------------------------------------------- + def stop(self): + """停止""" + self.active = False + self.thread.join() + + #---------------------------------------------------------------------- + def initSocketIO(self): + """初始化SocketIO客户端""" + params = { + 'access_token': self.token, + 'agent': "leiwang-rest-api" + } + self.sio = SocketIO(self.url, self.port, params=params) + + self.sio.on('connect', self.onConnect) + self.sio.on('disconnect', self.onDisconnect) + + #---------------------------------------------------------------------- + def generateBearer(self): + """创建通讯授权码""" + self.bearer = "Bearer " + self.sio._engineIO_session.id + self.token + + #---------------------------------------------------------------------- + def generateHeaders(self): + """生成通讯头部""" + self.headers = { + 'User-Agent': 'request', + 'Authorization': self.bearer, + 'Accept': 'application/json', + 'Content-Type': 'application/x-www-form-urlencoded' + } + + #---------------------------------------------------------------------- + def run(self): + """连续运行""" + while self.active: + try: + d = self.queue.get(timeout=1) + self.processReq(d) + except Empty: + pass + + #---------------------------------------------------------------------- + def sendReq(self, method, uri, params, callback): + """发出请求""" + self.reqid += 1 + + d = { + 'method': method, + 'uri': uri, + 'params': params, + 'callback': callback, + 'reqid': self.reqid + } + + self.queue.put(d) + + return self.reqid + + #---------------------------------------------------------------------- + def processReq(self, d): + """处理请求""" + method = d['method'] + uri = d['uri'] + params = d['params'] + callback = d['callback'] + reqid = d['reqid'] + + url = self.url + uri + + if method == self.METHOD_GET: + resp = requests.get(url, headers=self.headers, params=params) + elif method == self.METHOD_POST: + resp = requests.post(url, headers=self.headers, params=params) + + if resp.status_code == 200: + data = resp.json() + if data["response"]["executed"] is True: + callback(data, reqid) + return + self.onError(data["response"]["error"], reqid) + else: + self.onError(u'HTTP请求失败,错误代码%s' %resp.status_code) + + #---------------------------------------------------------------------- + def onConnect(self): + """连接回调""" + print 'onConnect' + + #---------------------------------------------------------------------- + def onDisconnect(self): + """断开回调""" + print 'onClose' + + #---------------------------------------------------------------------- + def onError(self, error, reqid): + """错误回调""" + print 'onError', error + + #---------------------------------------------------------------------- + def getInstruments(self): + """查询合约代码""" + uri = '/trading/get_instruments' + reqid = self.sendReq(self.METHOD_GET, uri, {}, self.onGetInstruments) + return reqid + + #---------------------------------------------------------------------- + def getModel(self, model): + """查询表""" + uri = '/trading/get_model' + params = {'models': model} + reqid = self.sendReq(self.METHOD_GET, uri, params, self.onGetModel) + return reqid + + #---------------------------------------------------------------------- + def subscribe(self, symbol): + """订阅行情""" + uri = '/subscribe' + params = {'pairs': symbol} + reqid = self.sendReq(self.METHOD_POST, uri, params, self.onSubscribe) + self.sio.on(pair, self.onPriceUpdate) + return reqid + + #---------------------------------------------------------------------- + def unsubscribe(self, symbol): + """退订行情""" + uri = '/unsubscribe' + params = {'pairs': symbol} + reqid = self.sendReq(self.METHOD_POST, uri, params, self.onUnsubscribe) + return reqid + + #---------------------------------------------------------------------- + def subscribeTable(self, model): + """订阅表""" + uri = '/trading/subscribe' + params = {'models': model} + reqid = self.sendReq(self.METHOD_POST, uri, params, self.onSubscribeTable) + self.sio.on(model, self.onTableUpdate) + return reqid + + #---------------------------------------------------------------------- + def unsubscribeTable(self, model): + """退订表""" + uri = '/trading/unsubscribe' + params = {'models': model} + reqid = self.sendReq(self.METHOD_POST, uri, params, self.onUnsubscribeTable) + return reqid + + #---------------------------------------------------------------------- + def openTrade(self, accountID, symbol, isBuy, amount, limit, + isInPips, atMarket, orderType, timeInForce, + rate=0, stop=0, trailingStep=0): + """开仓交易""" + uri = '/trading/open_trade' + params = { + 'account_id': accountID, + 'symbol': symbol, + 'is_buy': isBuy, + 'amount': amount, + 'limit': limit, + 'is_in_pips': isInPips, + 'at_market': atMarket, + 'order_type': orderType, + 'time_in_force': timeInForce + } + + if rate: + params['rate'] = rate + + if stop: + params['stop'] = stop + + if trailingStep: + params['trailing_step'] = trailingStep + + reqid = self.sendReq(self.METHOD_POST, uri, params, self.onOpenTrade) + return reqid + + #---------------------------------------------------------------------- + def closeTrade(self, tradeID, amount, atMarket, orderType, timeInForce, rate=0): + """平仓交易""" + uri = '/trading/close_trade' + params = { + 'trade_id': tradeID, + 'amount': amount, + 'at_market': atMarket, + 'order_type': orderType, + 'time_in_force': timeInForce + } + + if rate: + params['rate'] = rate + + reqid = self.sendReq(self.METHOD_POST, uri, params, self.onCloseTrade) + return reqid + + #---------------------------------------------------------------------- + def changeOrder(self, orderID, rate, range_, amount, trailingStep=0): + """修改委托""" + uri = '/trading/change_order' + params = { + 'order_id': orderID, + 'rate': rate, + 'range': range_, + 'amount': amount + } + + if trailingStep: + params['trailing_step'] = trailingStep + + reqid = self.sendReq(self.METHOD_POST, uri, params, self.onChangeOrder) + return reqid + + #---------------------------------------------------------------------- + def deleteOrder(self, orderID): + """撤销委托""" + uri = '/trading/delete_order' + params = {'order_id': orderID} + reqid = self.sendReq(self.METHOD_POST, uri, params, self.onDeleteOrder) + return reqid + + #---------------------------------------------------------------------- + def onGetInstruments(self, data, reqid): + """查询合约代码回调""" + print data, reqid + + #---------------------------------------------------------------------- + def onGetTable(self, data, reqid): + """查询表回调""" + print data, reqid + + #---------------------------------------------------------------------- + def onSubscribe(self, data, reqid): + """订阅行情回调""" + print data, reqid + + #---------------------------------------------------------------------- + def onUnsubscribe(self, data, reqid): + """退订行情回调""" + print data, reqid + + #---------------------------------------------------------------------- + def onSubscribeTable(self, data, reqid): + """订阅表回调""" + print data, reqid + + #---------------------------------------------------------------------- + def onUnsubscribeTable(self, data, reqid): + """退订表回调""" + print data, reqid + + #---------------------------------------------------------------------- + def onOpenTrade(self, data, reqid): + """开仓回调""" + print data, reqid + + #---------------------------------------------------------------------- + def onCloseTrade(self, data, reqid): + """平仓回调""" + print data, reqid + + #---------------------------------------------------------------------- + def onChangeOrder(self, data, reqid): + """改单回调""" + print data, reqid + + #---------------------------------------------------------------------- + def onDeleteOrder(self, data, reqid): + """撤单回调""" + print data, reqid + + #---------------------------------------------------------------------- + def onPriceUpdate(self, data): + """行情推送""" + print data + + #---------------------------------------------------------------------- + def onTableUpdate(self, data): + """表推送""" + print data + + + + + + + +if __name__ == '__main__': + url = 'https://api-demo.fxcm.com:443' + port = 443 + token = '48055b5d9afac0a300143ac067ce04cd2430a434' + + api = FxcmApi() + print 'api created' + + api.connect(url, port, token) + print api.bearer + + api.getInstruments() + + input() + + \ No newline at end of file