完成策略引擎、策略模板,还缺策略DEMO、回测引擎。

This commit is contained in:
WOLF 2015-05-21 15:14:20 +08:00
parent e851cf8a76
commit 6e622a3e23
18 changed files with 10221 additions and 0 deletions

11
vn.strategy/README.md Normal file
View File

@ -0,0 +1,11 @@
#vn.strategy介绍
##2015/5/21
该模块主要包含了一个适用于CTA类策略时间序列型的策略引擎和策略模板。
目前尚未完成,仅供参考。

View File

@ -0,0 +1,604 @@
# encoding: UTF-8
from datetime import datetime
from pymongo import Connection
from pymongo.errors import *
from eventEngine import *
# 常量定义
OFFSET_OPEN = '0' # 开仓
OFFSET_CLOSE = '1' # 平仓
DIRECTION_BUY = '0' # 买入
DIRECTION_SELL = '1' # 卖出
PRICETYPE_LIMIT = '2' # 限价
########################################################################
class Tick:
"""Tick数据对象"""
#----------------------------------------------------------------------
def __init__(self, symbol):
"""Constructor"""
self.symbol = symbol # 合约代码
self.openPrice = 0 # OHLC
self.highPrice = 0
self.lowPrice = 0
self.lastPrice = 0
self.volume = 0 # 成交量
self.openInterest = 0 # 持仓量
self.upperLimit = 0 # 涨停价
self.lowerLimit = 0 # 跌停价
self.time = '' # 更新时间和毫秒
self.ms= 0
self.bidPrice1 = 0 # 深度行情
self.bidPrice2 = 0
self.bidPrice3 = 0
self.bidPrice4 = 0
self.bidPrice5 = 0
self.askPrice1 = 0
self.askPrice2 = 0
self.askPrice3 = 0
self.askPrice4 = 0
self.askPrice5 = 0
self.bidVolume1 = 0
self.bidVolume2 = 0
self.bidVolume3 = 0
self.bidVolume4 = 0
self.bidVolume5 = 0
self.askVolume1 = 0
self.askVolume2 = 0
self.askVolume3 = 0
self.askVolume4 = 0
self.askVolume5 = 0
########################################################################
class Trade:
"""成交数据对象"""
#----------------------------------------------------------------------
def __init__(self, symbol):
"""Constructor"""
self.symbol = symbol # 合约代码
self.orderRef = '' # 报单号
self.tradeID = '' # 成交编号
self.direction = None # 方向
self.offset = None # 开平
self.price = 0 # 成交价
self.volume = 0 # 成交量
########################################################################
class Order:
"""报单数据对象"""
#----------------------------------------------------------------------
def __init__(self, symbol):
"""Constructor"""
self.symbol = symbol # 合约代码
self.orderRef = '' # 报单编号
self.direction = None # 方向
self.offset = None # 开平
self.price = 0 # 委托价
self.volumeOriginal = 0 # 报单量
self.volumeTraded = 0 # 已成交数量
self.insertTime = '' # 报单时间
self.cancelTime = '' # 撤单时间
self.frontID = 0 # 前置机编号
self.sessionID = 0 # 会话编号
self.status = '' # 报单状态代码
########################################################################
class StopOrder:
"""
停止单对象
用于实现价格突破某一水平后自动追入
即通常的条件单和止损单
"""
#----------------------------------------------------------------------
def __init__(self, symbol, direction, offset, price, volume, strategy):
"""Constructor"""
self.symbol = symbol
self.direction = direction
self.offset = offset
self.price = price
self.volume = volume
self.strategy = strategy
########################################################################
class StrategyEngine(object):
"""策略引擎"""
#----------------------------------------------------------------------
def __init__(self, eventEngine, mainEngine):
"""Constructor"""
self.__eventEngine = eventEngine
self.mainEngine = mainEngine
# 获取代表今日的datetime
t = datetime.today()
self.today = t.replace(hour=0, minute=0, second=0, microsecond=0)
# 保存所有报单数据的字典
self.__dictOrder = {}
# 保存策略对象的字典
# key为策略名称
# value为策略对象
self.dictStrategy = {}
# 保存合约代码和策略对象映射关系的字典
# key为合约代码
# value为交易该合约的策略列表
self.__dictSymbolStrategy = {}
# 保存报单编号和策略对象映射关系的字典
# key为报单编号
# value为策略对象
self.__dictOrderRefStrategy = {}
# 保存合约代码和相关停止单的字典
# key为合约代码
# value为该合约相关的停止单列表
self.__dictStopOrder = {}
# MongoDB数据库相关
self.__mongoConnected = False
self.__mongoConnection = None
self.__mongoTickDB = None
# 调用函数
self.__connectMongo()
self.createStrategy()
self.__registerEvent()
#----------------------------------------------------------------------
def createStrategy(self):
"""创建策略"""
pass
#----------------------------------------------------------------------
def __connectMongo(self):
"""连接MongoDB数据库"""
try:
self.__mongoConnection = Connection()
self.__mongoConnected = True
self.__mongoTickDB = self.__mongoConnection['TickDB']
self.writeLog(u'策略引擎连接MongoDB成功')
except ConnectionFailure:
self.writeLog(u'策略引擎连接MongoDB失败')
#----------------------------------------------------------------------
def __recordTick(self, data):
"""将Tick数据插入到MongoDB中"""
if self.__mongoConnected:
symbol = data['InstrumentID']
data['date'] = self.today
self.__mongoTickDB[symbol].insert(data)
#----------------------------------------------------------------------
def loadTick(self, symbol, dt):
"""从MongoDB中读取Tick数据"""
if self.__mongoConnected:
collection = self.__mongoTickDB[symbol]
cx = collection.find({'date':{'$gte':dt}})
return cx
else:
return None
#----------------------------------------------------------------------
def __updateMarketData(self, event):
"""行情更新"""
data = event.dict_['data']
symbol = data['InstrumentID']
# 检查是否存在交易该合约的策略
if symbol in self.__dictSymbolStrategy:
# 创建TICK数据对象并更新数据
tick = Tick(symbol)
tick.openPrice = data['OpenPrice']
tick.highPrice = data['HighestPrice']
tick.lowPrice = data['LowestPrice']
tick.lastPrice = data['LastPrice']
tick.volume = data['Volume']
tick.openInterest = data['OpenInterest']
tick.upperLimit = data['UpperLimitPrice']
tick.lowerLimit = data['LowerLimitPrice']
tick.time = data['UpdateTime']
tick.ms = data['UpdateMillisec']
tick.bidPrice1 = data['BidPrice1']
tick.bidPrice2 = data['BidPrice2']
tick.bidPrice3 = data['BidPrice3']
tick.bidPrice4 = data['BidPrice4']
tick.bidPrice5 = data['BidPrice5']
tick.askPrice1 = data['AskPrice1']
tick.askPrice2 = data['AskPrice2']
tick.askPrice3 = data['AskPrice3']
tick.askPrice4 = data['AskPrice4']
tick.askPrice5 = data['AskPrice5']
tick.bidVolume1 = data['BidVolume1']
tick.bidVolume2 = data['BidVolume2']
tick.bidVolume3 = data['BidVolume3']
tick.bidVolume4 = data['BidVolume4']
tick.bidVolume5 = data['BidVolume5']
tick.askVolume1 = data['AskVolume1']
tick.askVolume2 = data['AskVolume2']
tick.askVolume3 = data['AskVolume3']
tick.askVolume4 = data['AskVolume4']
tick.askVolume5 = data['AskVolume5']
# 首先检查停止单是否需要发出
self.__processStopOrder(tick)
# 将该TICK数据推送给每个策略
for strategy in self.__dictSymbolStrategy[symbol]:
strategy.onTick(tick)
# 将数据插入MongoDB数据库实盘建议另开程序记录TICK数据
self.__recordTick(data)
#----------------------------------------------------------------------
def __processStopOrder(self, tick):
"""处理停止单"""
symbol = tick.symbol
lastPrice = tick.lastPrice
upperLimit = tick.upperLimit
lowerLimit = tick.lowerLimit
# 如果当前有该合约上的止损单
if symbol in self.__dictStopOrder:
# 获取止损单列表
listSO = self.__dictStopOrder[symbol] # SO:stop order
# 准备一个空的已发止损单列表
listSent = []
for so in listSO:
# 如果是买入停止单,且最新成交价大于停止触发价
if so.direction == DIRECTION_BUY and lastPrice >= so.price:
# 以当日涨停价发出限价单买入
ref = self.sendOrder(symbol, DIRECTION_BUY, so.offset,
upperLimit, so.volume, strategy)
# 触发策略的止损单发出更新
so.strategy.onStopOrder(ref)
# 将该止损单对象保存到已发送列表中
listSent.append(so)
# 如果是卖出停止单,且最新成交价小于停止触发价
elif so.direction == DIRECTION_SELL and lastPrice <= so.price:
ref = self.sendOrder(symbol, DIRECTION_SELL, so.offset,
lowerLimit, so.volume, strategy)
so.strategy.onStopOrder(ref)
listSent.append(so)
# 从停止单列表中移除已经发单的停止单对象
if listSent:
for so in listSent:
listSO.remove(so)
# 检查停止单列表是否为空,若为空,则从停止单字典中移除该合约代码
if not listSO:
del self.__dictStopOrder[symbol]
#----------------------------------------------------------------------
def __updateOrder(self, event):
"""报单更新"""
data = event.dict_['data']
orderRef = data['OrderRef']
# 检查是否存在监听该报单的策略
if orderRef in self.__dictOrderRefStrategy:
# 创建Order数据对象
order = Order(data['InstrumentID'])
order.orderRef = data['OrderRef']
order.direction = data['Direction']
order.offset = data['ComboOffsetFlag']
order.price = data['LimitPrice']
order.volumeOriginal = data['VolumeTotalOriginal']
order.volumeTraded = data['VolumeTraded']
order.insertTime = data['InsertTime']
order.cancelTime = data['CancelTime']
order.frontID = data['FrontID']
order.sessionID = data['SessionID']
order.status = data['OrderStatus']
# 推送给策略
strategy = self.__dictOrderRefStrategy[orderRef]
strategy.onOrder(order)
# 记录该Order的数据
self.__dictOrder[orderRef] = data
#----------------------------------------------------------------------
def __updateTrade(self, event):
"""成交更新"""
data = event.dict_['data']
orderRef = data['OrderRef']
if orderRef in self.__dictOrderRefStrategy:
# 创建Trade数据对象
trade = Trade(data['InstrumentID'])
trade.orderRef = orderRef
trade.tradeID = data['TradeID']
trade.direction = data['Direction']
trade.offset = data['OffsetFlag']
trade.price = data['Price']
trade.volume = data['Volume']
# 推送给策略
strategy = self.__dictOrderRefStrategy[orderRef]
strategy.onTrade(trade)
#----------------------------------------------------------------------
def sendOrder(self, symbol, direction, offset, price, volume, strategy):
"""
发单仅允许限价单
symbol合约代码
direction方向DIRECTION_BUY/DIRECTION_SELL
offset开平OFFSET_OPEN/OFFSET_CLOSE
price下单价格
volume下单手数
strategy策略对象
"""
contract = self.mainEngine.selectInstrument(symbol)
if contract:
ref = self.mainEngine.sendOrder(symbol,
contract['ExchangeID'],
price,
PRICETYPE_LIMIT,
volume,
direction,
offset)
self.__dictOrderRefStrategy[ref] = strategy
return ref
#----------------------------------------------------------------------
def cancelOrder(self, orderRef):
"""
撤单
"""
order = self.__dictOrder[orderRef]
symbol = order['InstrumentID']
contract = self.mainEngine.selectInstrument(symbol)
if contract:
self.mainEngine.cancelOrder(symbol,
contract['ExchangeID'],
orderRef,
order['FrontID'],
order['SessionID'])
#----------------------------------------------------------------------
def __registerEvent(self):
"""注册事件监听"""
self.__eventEngine.register(EVENT_MARKETDATA, self.__updateMarketData)
self.__eventEngine.register(EVENT_ORDER, self.__updateOrder)
self.__eventEngine.register(EVENT_TRADE ,self.__updateTrade)
#----------------------------------------------------------------------
def writeLog(self, log):
"""写日志"""
event = Event(type_=EVENT_LOG)
event.dict_['log'] = log
self.__eventEngine.put(event)
#----------------------------------------------------------------------
def registerStrategy(self, symbol, strategy):
"""注册策略对合约TICK数据的监听"""
# 尝试获取监听该合约代码的策略的列表,若无则创建
try:
listStrategy = self.__dictSymbolStrategy[symbol]
except KeyError:
listStrategy = []
self.__dictSymbolStrategy[symbol] = listStrategy
# 防止重复注册
if strategy not in listStrategy:
listStrategy.append(strategy)
#----------------------------------------------------------------------
def placeStopOrder(self, symbol, direction, offset, price, volume, strategy):
"""
下停止单运行于本地引擎中
注意这里的price是停止单的触发价
"""
# 创建止损单对象
so = StopOrder(symbol, direction, offset, price, volume, strategy)
# 获取该合约相关的止损单列表
try:
listSO = self.__dictStopOrder[symbol]
except KeyError:
listSO = []
self.__dictStopOrder[symbol] = listSO
# 将该止损单插入列表中
listSO.append(so)
return so
#----------------------------------------------------------------------
def cancelStopOrder(self, so):
"""撤销停止单"""
symbol = so.symbol
try:
listSO = self.__dictStopOrder[symbol]
if so in listSO:
listSO.remove(so)
if not listSO:
del self.__dictStopOrder[symbol]
except KeyError:
pass
########################################################################
class SimpleStrategyTemplate(object):
"""简易策略模板"""
#----------------------------------------------------------------------
def __init__(self, name, symbol, engine):
"""Constructor"""
self.name = name # 策略名称(注意唯一性)
self.__engine = engine # 策略引擎对象
self.symbol = symbol # 策略交易的合约
self.trading = False # 策略是否启动交易
#----------------------------------------------------------------------
def onTick(self, tick):
"""行情更新"""
raise NotImplementedError
#----------------------------------------------------------------------
def onTrade(self, trade):
"""交易更新"""
raise NotImplementedError
#----------------------------------------------------------------------
def onOrder(self, order):
"""报单更新"""
raise NotImplementedError
#----------------------------------------------------------------------
def onStopOrder(self, orderRef):
"""停止单更新"""
raise NotImplementedError
#----------------------------------------------------------------------
def start(self):
"""
启动交易
这里是最简单的改变self.trading
有需要可以重新实现更复杂的操作
"""
self.trading = True
#----------------------------------------------------------------------
def stop(self):
"""
停止交易
同上
"""
self.trading = False
#----------------------------------------------------------------------
def __buy(self, price, volume, stopOrder=False):
"""买入开仓"""
if self.trading:
if stopOrder:
so = self.__engine.placeStopOrder(self.symbol, DIRECTION_BUY,
OFFSET_OPEN, price, volume, self)
return so
else:
ref = self.__engine.sendOrder(self.symbol, DIRECTION_BUY,
OFFSET_OPEN, price, volume, self)
return ref
else:
return None
#----------------------------------------------------------------------
def __cover(self, price, volume, StopOrder=False):
"""买入平仓"""
if self.trading:
if stopOrder:
so = self.__engine.placeStopOrder(self.symbol, DIRECTION_BUY,
OFFSET_CLOSE, price, volume, self)
return so
else:
ref = self.__engine.sendOrder(self.symbol, DIRECTION_BUY,
OFFSET_CLOSE, price, volume, self)
return ref
else:
return None
#----------------------------------------------------------------------
def __sell(self, price, volume, stopOrder=False):
"""卖出平仓"""
if self.trading:
if stopOrder:
so = self.__engine.placeStopOrder(self.symbol, DIRECTION_SELL,
OFFSET_CLOSE, price, volume, self)
return so
else:
ref = self.__engine.sendOrder(self.symbol, DIRECTION_SELL,
OFFSET_CLOSE, price, volume, self)
return ref
else:
return None
#----------------------------------------------------------------------
def __short(self, price, volume, stopOrder=False):
"""卖出开仓"""
if self.trading:
if stopOrder:
so = self.__engine.placeStopOrder(self.symbol, DIRECTION_SELL,
OFFSET_OPEN, price, volume, self)
return so
else:
ref = self.__engine.sendOrder(self.symbol, DIRECTION_SELL,
OFFSET_OPEN, price, volume, self)
return ref
else:
return None
#----------------------------------------------------------------------
def __cancelOrder(self, orderRef):
"""撤单"""
self.__engine.cancelOrder(orderRef)
#----------------------------------------------------------------------
def __cancelStopOrder(self, so):
"""撤销停止单"""
self.__engine.cancelStopOrder(so)

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,945 @@
# encoding: UTF-8
"""
该文件中包含的是交易平台的底层接口相关的部分
主要对API进行了一定程度的简化封装方便开发
"""
import os
from vnctpmd import MdApi
from vnctptd import TdApi
from eventEngine import *
from ctp_data_type import defineDict
#----------------------------------------------------------------------
def print_dict(d):
"""打印API收到的字典该函数主要用于开发时的debug"""
print '-'*60
l = d.keys()
l.sort()
for key in l:
print key, ':', d[key]
########################################################################
class DemoMdApi(MdApi):
"""
Demo中的行情API封装
封装后所有数据自动推送到事件驱动引擎中由其负责推送到各个监听该事件的回调函数上
对用户暴露的主动函数包括:
登陆 login
订阅合约 subscribe
"""
#----------------------------------------------------------------------
def __init__(self, eventEngine):
"""
API对象的初始化函数
"""
super(DemoMdApi, self).__init__()
# 事件引擎,所有数据都推送到其中,再由事件引擎进行分发
self.__eventEngine = eventEngine
# 请求编号由api负责管理
self.__reqid = 0
# 以下变量用于实现连接和重连后的自动登陆
self.__userid = ''
self.__password = ''
self.__brokerid = ''
# 以下集合用于重连后自动订阅之前已订阅的合约,使用集合为了防止重复
self.__setSubscribed = set()
# 初始化.con文件的保存目录为\mdconnection注意这个目录必须已存在否则会报错
self.createFtdcMdApi(os.getcwd() + '\\mdconnection\\')
#----------------------------------------------------------------------
def onFrontConnected(self):
"""服务器连接"""
event = Event(type_=EVENT_LOG)
event.dict_['log'] = u'行情服务器连接成功'
self.__eventEngine.put(event)
# 如果用户已经填入了用户名等等,则自动尝试连接
if self.__userid:
req = {}
req['UserID'] = self.__userid
req['Password'] = self.__password
req['BrokerID'] = self.__brokerid
self.__reqid = self.__reqid + 1
self.reqUserLogin(req, self.__reqid)
#----------------------------------------------------------------------
def onFrontDisconnected(self, n):
"""服务器断开"""
event = Event(type_=EVENT_LOG)
event.dict_['log'] = u'行情服务器连接断开'
self.__eventEngine.put(event)
#----------------------------------------------------------------------
def onHeartBeatWarning(self, n):
"""心跳报警"""
# 因为API的心跳报警比较常被触发且与API工作关系不大因此选择忽略
pass
#----------------------------------------------------------------------
def onRspError(self, error, n, last):
"""错误回报"""
event = Event(type_=EVENT_LOG)
log = u'行情错误回报,错误代码:' + unicode(error['ErrorID']) + u',' + u'错误信息:' + error['ErrorMsg'].decode('gbk')
event.dict_['log'] = log
self.__eventEngine.put(event)
#----------------------------------------------------------------------
def onRspUserLogin(self, data, error, n, last):
"""登陆回报"""
event = Event(type_=EVENT_LOG)
if error['ErrorID'] == 0:
log = u'行情服务器登陆成功'
else:
log = u'登陆回报,错误代码:' + unicode(error['ErrorID']) + u',' + u'错误信息:' + error['ErrorMsg'].decode('gbk')
event.dict_['log'] = log
self.__eventEngine.put(event)
## 重连后自动订阅之前已经订阅过的合约
#if self.__setSubscribed:
#for instrument in self.__setSubscribed:
#self.subscribe(instrument[0], instrument[1])
#----------------------------------------------------------------------
def onRspUserLogout(self, data, error, n, last):
"""登出回报"""
event = Event(type_=EVENT_LOG)
if error['ErrorID'] == 0:
log = u'行情服务器登出成功'
else:
log = u'登出回报,错误代码:' + unicode(error['ErrorID']) + u',' + u'错误信息:' + error['ErrorMsg'].decode('gbk')
event.dict_['log'] = log
self.__eventEngine.put(event)
#----------------------------------------------------------------------
def onRspSubMarketData(self, data, error, n, last):
"""订阅合约回报"""
# 通常不在乎订阅错误,选择忽略
pass
#----------------------------------------------------------------------
def onRspUnSubMarketData(self, data, error, n, last):
"""退订合约回报"""
# 同上
pass
#----------------------------------------------------------------------
def onRtnDepthMarketData(self, data):
"""行情推送"""
# 行情推送收到后,同时触发常规行情事件,以及特定合约行情事件,用于满足不同类型的监听
# 常规行情事件
event1 = Event(type_=EVENT_MARKETDATA)
event1.dict_['data'] = data
self.__eventEngine.put(event1)
# 特定合约行情事件
event2 = Event(type_=(EVENT_MARKETDATA_CONTRACT+data['InstrumentID']))
event2.dict_['data'] = data
self.__eventEngine.put(event2)
#----------------------------------------------------------------------
def onRspSubForQuoteRsp(self, data, error, n, last):
"""订阅期权询价"""
pass
#----------------------------------------------------------------------
def onRspUnSubForQuoteRsp(self, data, error, n, last):
"""退订期权询价"""
pass
#----------------------------------------------------------------------
def onRtnForQuoteRsp(self, data):
"""期权询价推送"""
pass
#----------------------------------------------------------------------
def login(self, address, userid, password, brokerid):
"""连接服务器"""
self.__userid = userid
self.__password = password
self.__brokerid = brokerid
# 注册服务器地址
self.registerFront(address)
# 初始化连接成功会调用onFrontConnected
self.init()
#----------------------------------------------------------------------
def subscribe(self, instrumentid, exchangeid):
"""订阅合约"""
self.subscribeMarketData(instrumentid)
instrument = (instrumentid, exchangeid)
self.__setSubscribed.add(instrument)
########################################################################
class DemoTdApi(TdApi):
"""
Demo中的交易API封装
主动函数包括
login 登陆
getInstrument 查询合约信息
getAccount 查询账号资金
getInvestor 查询投资者
getPosition 查询持仓
sendOrder 发单
cancelOrder 撤单
"""
#----------------------------------------------------------------------
def __init__(self, eventEngine):
"""API对象的初始化函数"""
super(DemoTdApi, self).__init__()
# 事件引擎,所有数据都推送到其中,再由事件引擎进行分发
self.__eventEngine = eventEngine
# 请求编号由api负责管理
self.__reqid = 0
# 报单编号由api负责管理
self.__orderref = 0
# 以下变量用于实现连接和重连后的自动登陆
self.__userid = ''
self.__password = ''
self.__brokerid = ''
# 合约字典(保存合约查询数据)
self.__dictInstrument = {}
# 初始化.con文件的保存目录为\tdconnection
self.createFtdcTraderApi(os.getcwd() + '\\tdconnection\\')
#----------------------------------------------------------------------
def onFrontConnected(self):
"""服务器连接"""
event = Event(type_=EVENT_LOG)
event.dict_['log'] = u'交易服务器连接成功'
self.__eventEngine.put(event)
# 如果用户已经填入了用户名等等,则自动尝试连接
if self.__userid:
req = {}
req['UserID'] = self.__userid
req['Password'] = self.__password
req['BrokerID'] = self.__brokerid
self.__reqid = self.__reqid + 1
self.reqUserLogin(req, self.__reqid)
#----------------------------------------------------------------------
def onFrontDisconnected(self, n):
"""服务器断开"""
event = Event(type_=EVENT_LOG)
event.dict_['log'] = u'交易服务器连接断开'
self.__eventEngine.put(event)
#----------------------------------------------------------------------
def onHeartBeatWarning(self, n):
""""""
pass
#----------------------------------------------------------------------
def onRspAuthenticate(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspUserLogin(self, data, error, n, last):
"""登陆回报"""
event = Event(type_=EVENT_LOG)
if error['ErrorID'] == 0:
log = u'交易服务器登陆成功'
else:
log = u'登陆回报,错误代码:' + unicode(error['ErrorID']) + u',' + u'错误信息:' + error['ErrorMsg'].decode('gbk')
event.dict_['log'] = log
self.__eventEngine.put(event)
self.getSettlement() # 登录完成后立即查询结算信息
#----------------------------------------------------------------------
def onRspUserLogout(self, data, error, n, last):
"""登出回报"""
event = Event(type_=EVENT_LOG)
if error['ErrorID'] == 0:
log = u'交易服务器登出成功'
else:
log = u'登出回报,错误代码:' + unicode(error['ErrorID']) + u',' + u'错误信息:' + error['ErrorMsg'].decode('gbk')
event.dict_['log'] = log
self.__eventEngine.put(event)
#----------------------------------------------------------------------
def onRspUserPasswordUpdate(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspTradingAccountPasswordUpdate(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspOrderInsert(self, data, error, n, last):
"""发单错误(柜台)"""
event = Event(type_=EVENT_LOG)
log = u' 发单错误回报,错误代码:' + unicode(error['ErrorID']) + u',' + u'错误信息:' + error['ErrorMsg'].decode('gbk')
event.dict_['log'] = log
self.__eventEngine.put(event)
#----------------------------------------------------------------------
def onRspParkedOrderInsert(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspParkedOrderAction(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspOrderAction(self, data, error, n, last):
"""撤单错误(柜台)"""
event = Event(type_=EVENT_LOG)
log = u'撤单错误回报,错误代码:' + unicode(error['ErrorID']) + u',' + u'错误信息:' + error['ErrorMsg'].decode('gbk')
event.dict_['log'] = log
self.__eventEngine.put(event)
#----------------------------------------------------------------------
def onRspQueryMaxOrderVolume(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspSettlementInfoConfirm(self, data, error, n, last):
"""确认结算信息回报"""
event = Event(type_=EVENT_LOG)
log = u'结算信息确认完成'
event.dict_['log'] = log
self.__eventEngine.put(event)
event = Event(type_=EVENT_TDLOGIN)
self.__eventEngine.put(event)
#----------------------------------------------------------------------
def onRspRemoveParkedOrder(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspRemoveParkedOrderAction(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspExecOrderInsert(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspExecOrderAction(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspForQuoteInsert(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQuoteInsert(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQuoteAction(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryOrder(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryTrade(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryInvestorPosition(self, data, error, n, last):
"""持仓查询回报"""
if error['ErrorID'] == 0:
event = Event(type_=EVENT_POSITION)
event.dict_['data'] = data
self.__eventEngine.put(event)
else:
event = Event(type_=EVENT_LOG)
log = u'持仓查询回报,错误代码:' + unicode(error['ErrorID']) + u',' + u'错误信息:' + error['ErrorMsg'].decode('gbk')
event.dict_['log'] = log
self.__eventEngine.put(event)
#----------------------------------------------------------------------
def onRspQryTradingAccount(self, data, error, n, last):
"""资金账户查询回报"""
if error['ErrorID'] == 0:
event = Event(type_=EVENT_ACCOUNT)
event.dict_['data'] = data
self.__eventEngine.put(event)
else:
event = Event(type_=EVENT_LOG)
log = u'账户查询回报,错误代码:' + unicode(error['ErrorID']) + u',' + u'错误信息:' + error['ErrorMsg'].decode('gbk')
event.dict_['log'] = log
self.__eventEngine.put(event)
#----------------------------------------------------------------------
def onRspQryInvestor(self, data, error, n, last):
"""投资者查询回报"""
if error['ErrorID'] == 0:
event = Event(type_=EVENT_INVESTOR)
event.dict_['data'] = data
self.__eventEngine.put(event)
else:
event = Event(type_=EVENT_LOG)
log = u'合约投资者回报,错误代码:' + unicode(error['ErrorID']) + u',' + u'错误信息:' + error['ErrorMsg'].decode('gbk')
event.dict_['log'] = log
self.__eventEngine.put(event)
#----------------------------------------------------------------------
def onRspQryTradingCode(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryInstrumentMarginRate(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryInstrumentCommissionRate(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryExchange(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryProduct(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryInstrument(self, data, error, n, last):
"""
合约查询回报
由于该回报的推送速度极快因此不适合全部存入队列中处理
选择先储存在一个本地字典中全部收集完毕后再推送到队列中
由于耗时过长目前使用其他进程读取
"""
if error['ErrorID'] == 0:
event = Event(type_=EVENT_INSTRUMENT)
event.dict_['data'] = data
event.dict_['last'] = last
self.__eventEngine.put(event)
else:
event = Event(type_=EVENT_LOG)
log = u'合约投资者回报,错误代码:' + unicode(error['ErrorID']) + u',' + u'错误信息:' + error['ErrorMsg'].decode('gbk')
event.dict_['log'] = log
self.__eventEngine.put(event)
#----------------------------------------------------------------------
def onRspQryDepthMarketData(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQrySettlementInfo(self, data, error, n, last):
"""查询结算信息回报"""
event = Event(type_=EVENT_LOG)
log = u'结算信息查询完成'
event.dict_['log'] = log
self.__eventEngine.put(event)
self.confirmSettlement() # 查询完成后立即确认结算信息
#----------------------------------------------------------------------
def onRspQryTransferBank(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryInvestorPositionDetail(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryNotice(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQrySettlementInfoConfirm(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryInvestorPositionCombineDetail(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryCFMMCTradingAccountKey(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryEWarrantOffset(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryInvestorProductGroupMargin(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryExchangeMarginRate(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryExchangeMarginRateAdjust(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryExchangeRate(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQrySecAgentACIDMap(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryOptionInstrTradeCost(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryOptionInstrCommRate(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryExecOrder(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryForQuote(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryQuote(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryTransferSerial(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryAccountregister(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspError(self, error, n, last):
"""错误回报"""
event = Event(type_=EVENT_LOG)
log = u'交易错误回报,错误代码:' + unicode(error['ErrorID']) + u',' + u'错误信息:' + error['ErrorMsg'].decode('gbk')
event.dict_['log'] = log
self.__eventEngine.put(event)
#----------------------------------------------------------------------
def onRtnOrder(self, data):
"""报单回报"""
# 更新最大报单编号
newref = data['OrderRef']
self.__orderref = max(self.__orderref, int(newref))
# 常规报单事件
event1 = Event(type_=EVENT_ORDER)
event1.dict_['data'] = data
self.__eventEngine.put(event1)
# 特定合约行情事件
event2 = Event(type_=(EVENT_ORDER_ORDERREF+data['OrderRef']))
event2.dict_['data'] = data
self.__eventEngine.put(event2)
#----------------------------------------------------------------------
def onRtnTrade(self, data):
"""成交回报"""
# 常规成交事件
event1 = Event(type_=EVENT_TRADE)
event1.dict_['data'] = data
self.__eventEngine.put(event1)
# 特定合约成交事件
event2 = Event(type_=(EVENT_TRADE_CONTRACT+data['InstrumentID']))
event2.dict_['data'] = data
self.__eventEngine.put(event2)
#----------------------------------------------------------------------
def onErrRtnOrderInsert(self, data, error):
"""发单错误回报(交易所)"""
event = Event(type_=EVENT_LOG)
log = u'发单错误回报,错误代码:' + unicode(error['ErrorID']) + u',' + u'错误信息:' + error['ErrorMsg'].decode('gbk')
event.dict_['log'] = log
self.__eventEngine.put(event)
#----------------------------------------------------------------------
def onErrRtnOrderAction(self, data, error):
"""撤单错误回报(交易所)"""
event = Event(type_=EVENT_LOG)
log = u'撤单错误回报,错误代码:' + unicode(error['ErrorID']) + u',' + u'错误信息:' + error['ErrorMsg'].decode('gbk')
event.dict_['log'] = log
self.__eventEngine.put(event)
#----------------------------------------------------------------------
def onRtnInstrumentStatus(self, data):
""""""
pass
#----------------------------------------------------------------------
def onRtnTradingNotice(self, data):
""""""
pass
#----------------------------------------------------------------------
def onRtnErrorConditionalOrder(self, data):
""""""
pass
#----------------------------------------------------------------------
def onRtnExecOrder(self, data):
""""""
pass
#----------------------------------------------------------------------
def onErrRtnExecOrderInsert(self, data, error):
""""""
pass
#----------------------------------------------------------------------
def onErrRtnExecOrderAction(self, data, error):
""""""
pass
#----------------------------------------------------------------------
def onErrRtnForQuoteInsert(self, data, error):
""""""
pass
#----------------------------------------------------------------------
def onRtnQuote(self, data):
""""""
pass
#----------------------------------------------------------------------
def onErrRtnQuoteInsert(self, data, error):
""""""
pass
#----------------------------------------------------------------------
def onErrRtnQuoteAction(self, data, error):
""""""
pass
#----------------------------------------------------------------------
def onRtnForQuoteRsp(self, data):
""""""
pass
#----------------------------------------------------------------------
def onRspQryContractBank(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryParkedOrder(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryParkedOrderAction(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryTradingNotice(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryBrokerTradingParams(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryBrokerTradingAlgos(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRtnFromBankToFutureByBank(self, data):
""""""
pass
#----------------------------------------------------------------------
def onRtnFromFutureToBankByBank(self, data):
""""""
pass
#----------------------------------------------------------------------
def onRtnRepealFromBankToFutureByBank(self, data):
""""""
pass
#----------------------------------------------------------------------
def onRtnRepealFromFutureToBankByBank(self, data):
""""""
pass
#----------------------------------------------------------------------
def onRtnFromBankToFutureByFuture(self, data):
""""""
pass
#----------------------------------------------------------------------
def onRtnFromFutureToBankByFuture(self, data):
""""""
pass
#----------------------------------------------------------------------
def onRtnRepealFromBankToFutureByFutureManual(self, data):
""""""
pass
#----------------------------------------------------------------------
def onRtnRepealFromFutureToBankByFutureManual(self, data):
""""""
pass
#----------------------------------------------------------------------
def onRtnQueryBankBalanceByFuture(self, data):
""""""
pass
#----------------------------------------------------------------------
def onErrRtnBankToFutureByFuture(self, data, error):
""""""
pass
#----------------------------------------------------------------------
def onErrRtnFutureToBankByFuture(self, data, error):
""""""
pass
#----------------------------------------------------------------------
def onErrRtnRepealBankToFutureByFutureManual(self, data, error):
""""""
pass
#----------------------------------------------------------------------
def onErrRtnRepealFutureToBankByFutureManual(self, data, error):
""""""
pass
#----------------------------------------------------------------------
def onErrRtnQueryBankBalanceByFuture(self, data, error):
""""""
pass
#----------------------------------------------------------------------
def onRtnRepealFromBankToFutureByFuture(self, data):
""""""
pass
#----------------------------------------------------------------------
def onRtnRepealFromFutureToBankByFuture(self, data):
""""""
pass
#----------------------------------------------------------------------
def onRspFromBankToFutureByFuture(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspFromFutureToBankByFuture(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQueryBankAccountMoneyByFuture(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRtnOpenAccountByBank(self, data):
""""""
pass
#----------------------------------------------------------------------
def onRtnCancelAccountByBank(self, data):
""""""
pass
#----------------------------------------------------------------------
def onRtnChangeAccountByBank(self, data):
""""""
pass
#----------------------------------------------------------------------
def login(self, address, userid, password, brokerid):
"""连接服务器"""
self.__userid = userid
self.__password = password
self.__brokerid = brokerid
# 数据重传模式设为从本日开始
self.subscribePrivateTopic(0)
self.subscribePublicTopic(0)
# 注册服务器地址
self.registerFront(address)
# 初始化连接成功会调用onFrontConnected
self.init()
#----------------------------------------------------------------------
def getInstrument(self):
"""查询合约"""
self.__reqid = self.__reqid + 1
self.reqQryInstrument({}, self.__reqid)
#----------------------------------------------------------------------
def getAccount(self):
"""查询账户"""
self.__reqid = self.__reqid + 1
self.reqQryTradingAccount({}, self.__reqid)
#----------------------------------------------------------------------
def getInvestor(self):
"""查询投资者"""
self.__reqid = self.__reqid + 1
self.reqQryInvestor({}, self.__reqid)
#----------------------------------------------------------------------
def getPosition(self):
"""查询持仓"""
self.__reqid = self.__reqid + 1
req = {}
req['BrokerID'] = self.__brokerid
req['InvestorID'] = self.__userid
self.reqQryInvestorPosition(req, self.__reqid)
#----------------------------------------------------------------------
def sendOrder(self, instrumentid, exchangeid, price, pricetype, volume, direction, offset):
"""发单"""
self.__reqid = self.__reqid + 1
req = {}
req['InstrumentID'] = instrumentid
req['OrderPriceType'] = pricetype
req['LimitPrice'] = price
req['VolumeTotalOriginal'] = volume
req['Direction'] = direction
req['CombOffsetFlag'] = offset
self.__orderref = self.__orderref + 1
req['OrderRef'] = str(self.__orderref)
req['InvestorID'] = self.__userid
req['UserID'] = self.__userid
req['BrokerID'] = self.__brokerid
req['CombHedgeFlag'] = defineDict['THOST_FTDC_HF_Speculation'] # 投机单
req['ContingentCondition'] = defineDict['THOST_FTDC_CC_Immediately'] # 立即发单
req['ForceCloseReason'] = defineDict['THOST_FTDC_FCC_NotForceClose'] # 非强平
req['IsAutoSuspend'] = 0 # 非自动挂起
req['TimeCondition'] = defineDict['THOST_FTDC_TC_GFD'] # 今日有效
req['VolumeCondition'] = defineDict['THOST_FTDC_VC_AV'] # 任意成交量
req['MinVolume'] = 1 # 最小成交量为1
self.reqOrderInsert(req, self.__reqid)
# 返回订单号,便于某些算法进行动态管理
return self.__orderref
#----------------------------------------------------------------------
def cancelOrder(self, instrumentid, exchangeid, orderref, frontid, sessionid):
"""撤单"""
self.__reqid = self.__reqid + 1
req = {}
req['InstrumentID'] = instrumentid
req['ExchangeID'] = exchangeid
req['OrderRef'] = orderref
req['FrontID'] = frontid
req['SessionID'] = sessionid
req['ActionFlag'] = defineDict['THOST_FTDC_AF_Delete']
req['BrokerID'] = self.__brokerid
req['InvestorID'] = self.__userid
self.reqOrderAction(req, self.__reqid)
#----------------------------------------------------------------------
def getSettlement(self):
"""查询结算信息"""
self.__reqid = self.__reqid + 1
req = {}
req['BrokerID'] = self.__brokerid
req['InvestorID'] = self.__userid
self.reqQrySettlementInfo(req, self.__reqid)
#----------------------------------------------------------------------
def confirmSettlement(self):
"""确认结算信息"""
self.__reqid = self.__reqid + 1
req = {}
req['BrokerID'] = self.__brokerid
req['InvestorID'] = self.__userid
self.reqSettlementInfoConfirm(req, self.__reqid)

View File

@ -0,0 +1,189 @@
# encoding: UTF-8
"""
该文件中包含的是交易平台的中间层
将API和事件引擎包装到一个主引擎类中便于管理
当客户想采用服务器-客户机模式实现交易功能放在托管机房
而图形控制功能在本地电脑时该主引擎负责实现远程通讯
"""
import sys
from datetime import date
from time import sleep
import shelve
from PyQt4 import QtCore
from demoApi import *
from eventEngine import EventEngine
########################################################################
class MainEngine:
"""主引擎负责对API的调度"""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.ee = EventEngine() # 创建事件驱动引擎
self.md = DemoMdApi(self.ee) # 创建API接口
self.td = DemoTdApi(self.ee)
self.ee.start() # 启动事件驱动引擎
# 循环查询持仓和账户相关
self.countGet = 0 # 查询延时计数
self.lastGet = 'Account' # 上次查询的性质
self.ee.register(EVENT_TDLOGIN, self.initGet) # 登录成功后开始初始化查询
# 合约储存相关
self.dictInstrument = {} # 字典(保存合约查询数据)
self.ee.register(EVENT_INSTRUMENT, self.insertInstrument)
#----------------------------------------------------------------------
def login(self, userid, password, brokerid, mdAddress, tdAddress):
"""登陆"""
self.md.login(mdAddress, userid, password, brokerid)
self.td.login(tdAddress, userid, password, brokerid)
#----------------------------------------------------------------------
def subscribe(self, instrumentid, exchangeid):
"""订阅合约"""
self.md.subscribe(instrumentid, exchangeid)
#----------------------------------------------------------------------
def getAccount(self):
"""查询账户"""
self.td.getAccount()
#----------------------------------------------------------------------
def getInvestor(self):
"""查询投资者"""
self.td.getInvestor()
#----------------------------------------------------------------------
def getPosition(self):
"""查询持仓"""
self.td.getPosition()
#----------------------------------------------------------------------
def getInstrument(self):
"""获取合约"""
event = Event(type_=EVENT_LOG)
log = u'查询合约信息'
event.dict_['log'] = log
self.ee.put(event)
self.td.getInstrument()
#----------------------------------------------------------------------
def sendOrder(self, instrumentid, exchangeid, price, pricetype, volume, direction, offset):
"""发单"""
ref = self.td.sendOrder(instrumentid, exchangeid, price, pricetype, volume, direction, offset)
return ref
#----------------------------------------------------------------------
def cancelOrder(self, instrumentid, exchangeid, orderref, frontid, sessionid):
"""撤单"""
self.td.cancelOrder(instrumentid, exchangeid, orderref, frontid, sessionid)
#----------------------------------------------------------------------
def getAccountPosition(self, event):
"""循环查询账户和持仓"""
self.countGet = self.countGet + 1
# 每5秒发一次查询
if self.countGet > 5:
self.countGet = 0 # 清空计数
if self.lastGet == 'Account':
self.getPosition()
self.lastGet = 'Position'
else:
self.getAccount()
self.lastGet = 'Account'
#----------------------------------------------------------------------
def initGet(self, event):
"""在交易服务器登录成功后,开始初始化查询"""
# 打开设定文件setting.vn
f = shelve.open('setting.vn')
# 尝试读取设定字典,若该字典不存在,则发出查询请求
try:
d = f['instrument']
# 如果本地保存的合约数据是今日的,则载入,否则发出查询请求
today = date.today()
if d['date'] == today:
self.dictInstrument = d['dictInstrument']
event = Event(type_=EVENT_LOG)
log = u'合约信息读取完成'
event.dict_['log'] = log
self.ee.put(event)
self.getInvestor()
# 开始循环查询
self.ee.register(EVENT_TIMER, self.getAccountPosition)
else:
self.getInstrument()
except KeyError:
self.getInstrument()
f.close()
#----------------------------------------------------------------------
def insertInstrument(self, event):
"""插入合约对象"""
data = event.dict_['data']
last = event.dict_['last']
self.dictInstrument[data['InstrumentID']] = data
# 合约对象查询完成后,查询投资者信息并开始循环查询
if last:
# 将查询完成的合约信息保存到本地文件,今日登录可直接使用不再查询
self.saveInstrument()
event = Event(type_=EVENT_LOG)
log = u'合约信息查询完成'
event.dict_['log'] = log
self.ee.put(event)
self.getInvestor()
# 开始循环查询
self.ee.register(EVENT_TIMER, self.getAccountPosition)
#----------------------------------------------------------------------
def selectInstrument(self, instrumentid):
"""获取合约信息对象"""
try:
instrument = self.dictInstrument[instrumentid]
except KeyError:
instrument = None
return instrument
#----------------------------------------------------------------------
def exit(self):
"""退出"""
# 销毁API对象
self.td = None
self.md = None
# 停止事件驱动引擎
self.ee.stop()
#----------------------------------------------------------------------
def saveInstrument(self):
"""保存合约属性数据"""
f = shelve.open('setting.vn')
d = {}
d['dictInstrument'] = self.dictInstrument
d['date'] = date.today()
f['instrument'] = d
f.close()

View File

@ -0,0 +1,32 @@
# encoding: UTF-8
"""
该文件中包含的是交易平台的主函数
将底层中层上层的功能导入并运行
"""
import ctypes
import sys
from demoEngine import MainEngine
from demoUi import *
#----------------------------------------------------------------------
def main():
"""主程序入口"""
ctypes.windll.shell32.SetCurrentProcessExplicitAppUserModelID('vn.py demo') # win7以下请注释掉该行
app = QtGui.QApplication(sys.argv)
app.setWindowIcon(QtGui.QIcon('vnpy.ico'))
me = MainEngine()
mw = MainWindow(me.ee, me)
mw.showMaximized()
sys.exit(app.exec_())
if __name__ == '__main__':
main()

View File

@ -0,0 +1,18 @@
# encoding: UTF-8
from strategyEngine import StrategyTemplate
########################################################################
class SimpleEmaStrategy(StrategyTemplate):
"""简单双指数移动均线策略"""
#----------------------------------------------------------------------
def __init__(self, name, symbol, engine):
"""Constructor"""
super(SimpleEmaStrategy, self).__init__(name, symbol, engine)

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,196 @@
# encoding: UTF-8
# 系统模块
from Queue import Queue, Empty
from threading import Thread
# 第三方模块
from PyQt4.QtCore import QTimer
# 自己开发的模块
from eventType import *
########################################################################
class EventEngine:
"""
事件驱动引擎
事件驱动引擎中所有的变量都设置为了私有这是为了防止不小心
从外部修改了这些变量的值或状态导致bug
变量说明
__queue私有变量事件队列
__active私有变量事件引擎开关
__thread私有变量事件处理线程
__timer私有变量计时器
__handlers私有变量事件处理函数字典
方法说明
__run: 私有方法事件处理线程连续运行用
__process: 私有方法处理事件调用注册在引擎中的监听函数
__onTimer私有方法计时器固定事件间隔触发后向事件队列中存入计时器事件
start: 公共方法启动引擎
stop公共方法停止引擎
register公共方法向引擎中注册监听函数
unregister公共方法向引擎中注销监听函数
put公共方法向事件队列中存入新的事件
事件监听函数必须定义为输入参数仅为一个event对象
函数
def func(event)
...
对象方法
def method(self, event)
...
"""
#----------------------------------------------------------------------
def __init__(self):
"""初始化事件引擎"""
# 事件队列
self.__queue = Queue()
# 事件引擎开关
self.__active = False
# 事件处理线程
self.__thread = Thread(target = self.__run)
# 计时器,用于触发计时器事件
self.__timer = QTimer()
self.__timer.timeout.connect(self.__onTimer)
# 这里的__handlers是一个字典用来保存对应的事件调用关系
# 其中每个键对应的值是一个列表,列表中保存了对该事件进行监听的函数功能
self.__handlers = {}
#----------------------------------------------------------------------
def __run(self):
"""引擎运行"""
while self.__active == True:
try:
event = self.__queue.get(block = True, timeout = 1) # 获取事件的阻塞时间设为1秒
self.__process(event)
except Empty:
pass
#----------------------------------------------------------------------
def __process(self, event):
"""处理事件"""
# 检查是否存在对该事件进行监听的处理函数
if event.type_ in self.__handlers:
#若存在,则按顺序将事件传递给处理函数执行
[handler(event) for handler in self.__handlers[event.type_]]
#以上语句为Python列表解析方式的写法对应的常规循环写法为
#for handler in self.__handlers[event.type_]:
#handler(event)
#----------------------------------------------------------------------
def __onTimer(self):
"""向事件队列中存入计时器事件"""
# 创建计时器事件
event = Event(type_=EVENT_TIMER)
# 向队列中存入计时器事件
self.put(event)
#----------------------------------------------------------------------
def start(self):
"""引擎启动"""
# 将引擎设为启动
self.__active = True
# 启动事件处理线程
self.__thread.start()
# 启动计时器计时器事件间隔默认设定为1秒
self.__timer.start(1000)
#----------------------------------------------------------------------
def stop(self):
"""停止引擎"""
# 将引擎设为停止
self.__active = False
# 停止计时器
self.__timer.stop()
# 等待事件处理线程退出
self.__thread.join()
#----------------------------------------------------------------------
def register(self, type_, handler):
"""注册事件处理函数监听"""
# 尝试获取该事件类型对应的处理函数列表,若无则创建
try:
handlerList = self.__handlers[type_]
except KeyError:
handlerList = []
self.__handlers[type_] = handlerList
# 若要注册的处理器不在该事件的处理器列表中,则注册该事件
if handler not in handlerList:
handlerList.append(handler)
#----------------------------------------------------------------------
def unregister(self, type_, handler):
"""注销事件处理函数监听"""
# 尝试获取该事件类型对应的处理函数列表,若无则忽略该次注销请求
try:
handlerList = self.__handlers[type_]
# 如果该函数存在于列表中,则移除
if handler in handlerList:
handlerList.remove(handler)
# 如果函数列表为空,则从引擎中移除该事件类型
if not handlerList:
del self.__handlers[type_]
except KeyError:
pass
#----------------------------------------------------------------------
def put(self, event):
"""向事件队列中存入事件"""
self.__queue.put(event)
########################################################################
class Event:
"""事件对象"""
#----------------------------------------------------------------------
def __init__(self, type_=None):
"""Constructor"""
self.type_ = type_ # 事件类型
self.dict_ = {} # 字典用于保存具体的事件数据
#----------------------------------------------------------------------
def test():
"""测试函数"""
import sys
from datetime import datetime
from PyQt4.QtCore import QCoreApplication
def simpletest(event):
print u'处理每秒触发的计时器事件:%s' % str(datetime.now())
app = QCoreApplication(sys.argv)
ee = EventEngine()
ee.register(EVENT_TIMER, simpletest)
ee.start()
app.exec_()
# 直接运行脚本可以进行测试
if __name__ == '__main__':
test()

View File

@ -0,0 +1,61 @@
# encoding: UTF-8
'''
本文件仅用于存放对于事件类型常量的定义
由于python中不存在真正的常量概念因此选择使用全大写的变量名来代替常量
这里设计的命名规则以EVENT_前缀开头
常量的内容通常选择一个能够代表真实意义的字符串便于理解
建议将所有的常量定义放在该文件中便于检查是否存在重复的现象
'''
EVENT_TIMER = 'eTimer' # 计时器事件每隔1秒发送一次
EVENT_LOG = 'eLog' # 日志事件,通常使用某个监听函数直接显示
EVENT_TDLOGIN = 'eTdLogin' # 交易服务器登录成功事件
EVENT_MARKETDATA = 'eMarketData' # 行情推送事件
EVENT_MARKETDATA_CONTRACT = 'eMarketData.' # 特定合约的行情事件
EVENT_TRADE = 'eTrade' # 成交推送事件
EVENT_TRADE_CONTRACT = 'eTrade.' # 特定合约的成交事件
EVENT_ORDER = 'eOrder' # 报单推送事件
EVENT_ORDER_ORDERREF = 'eOrder.' # 特定报单号的报单事件
EVENT_POSITION = 'ePosition' # 持仓查询回报事件
EVENT_INSTRUMENT = 'eInstrument' # 合约查询回报事件
EVENT_INVESTOR = 'eInvestor' # 投资者查询回报事件
EVENT_ACCOUNT = 'eAccount' # 账户查询回报事件
#----------------------------------------------------------------------
def test():
"""检查是否存在内容重复的常量定义"""
check_dict = {}
global_dict = globals()
for key, value in global_dict.items():
if '__' not in key: # 不检查python内置对象
if value in check_dict:
check_dict[value].append(key)
else:
check_dict[value] = [key]
for key, value in check_dict.items():
if len(value)>1:
print u'存在重复的常量定义:' + str(key)
for name in value:
print name
print ''
print u'测试完毕'
# 直接运行脚本可以进行测试
if __name__ == '__main__':
test()

View File

@ -0,0 +1 @@
空文件仅用于让github识别该文件夹

View File

@ -0,0 +1,631 @@
# encoding: UTF-8
from datetime import datetime
from pymongo import Connection
from pymongo.errors import *
from eventEngine import *
# 常量定义
OFFSET_OPEN = '0' # 开仓
OFFSET_CLOSE = '1' # 平仓
DIRECTION_BUY = '0' # 买入
DIRECTION_SELL = '1' # 卖出
PRICETYPE_LIMIT = '2' # 限价
########################################################################
class Tick:
"""Tick数据对象"""
#----------------------------------------------------------------------
def __init__(self, symbol):
"""Constructor"""
self.symbol = symbol # 合约代码
self.openPrice = 0 # OHLC
self.highPrice = 0
self.lowPrice = 0
self.lastPrice = 0
self.volume = 0 # 成交量
self.openInterest = 0 # 持仓量
self.upperLimit = 0 # 涨停价
self.lowerLimit = 0 # 跌停价
self.time = '' # 更新时间和毫秒
self.ms= 0
self.bidPrice1 = 0 # 深度行情
self.bidPrice2 = 0
self.bidPrice3 = 0
self.bidPrice4 = 0
self.bidPrice5 = 0
self.askPrice1 = 0
self.askPrice2 = 0
self.askPrice3 = 0
self.askPrice4 = 0
self.askPrice5 = 0
self.bidVolume1 = 0
self.bidVolume2 = 0
self.bidVolume3 = 0
self.bidVolume4 = 0
self.bidVolume5 = 0
self.askVolume1 = 0
self.askVolume2 = 0
self.askVolume3 = 0
self.askVolume4 = 0
self.askVolume5 = 0
########################################################################
class Trade:
"""成交数据对象"""
#----------------------------------------------------------------------
def __init__(self, symbol):
"""Constructor"""
self.symbol = symbol # 合约代码
self.orderRef = '' # 报单号
self.tradeID = '' # 成交编号
self.direction = None # 方向
self.offset = None # 开平
self.price = 0 # 成交价
self.volume = 0 # 成交量
########################################################################
class Order:
"""报单数据对象"""
#----------------------------------------------------------------------
def __init__(self, symbol):
"""Constructor"""
self.symbol = symbol # 合约代码
self.orderRef = '' # 报单编号
self.direction = None # 方向
self.offset = None # 开平
self.price = 0 # 委托价
self.volumeOriginal = 0 # 报单量
self.volumeTraded = 0 # 已成交数量
self.insertTime = '' # 报单时间
self.cancelTime = '' # 撤单时间
self.frontID = 0 # 前置机编号
self.sessionID = 0 # 会话编号
self.status = '' # 报单状态代码
########################################################################
class StopOrder:
"""
停止单对象
用于实现价格突破某一水平后自动追入
即通常的条件单和止损单
"""
#----------------------------------------------------------------------
def __init__(self, symbol, direction, offset, price, volume, strategy):
"""Constructor"""
self.symbol = symbol
self.direction = direction
self.offset = offset
self.price = price
self.volume = volume
self.strategy = strategy
########################################################################
class StrategyEngine(object):
"""策略引擎"""
#----------------------------------------------------------------------
def __init__(self, eventEngine, mainEngine):
"""Constructor"""
self.__eventEngine = eventEngine
self.mainEngine = mainEngine
# 获取代表今日的datetime
t = datetime.today()
self.today = t.replace(hour=0, minute=0, second=0, microsecond=0)
# 保存所有报单数据的字典
self.__dictOrder = {}
# 保存策略对象的字典
# key为策略名称
# value为策略对象
self.dictStrategy = {}
# 保存合约代码和策略对象映射关系的字典
# key为合约代码
# value为交易该合约的策略列表
self.__dictSymbolStrategy = {}
# 保存报单编号和策略对象映射关系的字典
# key为报单编号
# value为策略对象
self.__dictOrderRefStrategy = {}
# 保存合约代码和相关停止单的字典
# key为合约代码
# value为该合约相关的停止单列表
self.__dictStopOrder = {}
# MongoDB数据库相关
self.__mongoConnected = False
self.__mongoConnection = None
self.__mongoTickDB = None
# 调用函数
self.__connectMongo()
self.createStrategy()
self.__registerEvent()
#----------------------------------------------------------------------
def createStrategy(self, strategyName, strategySymbol, strategyClass, strategySetting):
"""创建策略"""
strategy = strategyClass(strategyName, strategySymbol, self)
self.dictStrategy[strategyName] = strategy
strategy.initSetting(strategySetting)
#----------------------------------------------------------------------
def __connectMongo(self):
"""连接MongoDB数据库"""
try:
self.__mongoConnection = Connection()
self.__mongoConnected = True
self.__mongoTickDB = self.__mongoConnection['TickDB']
self.writeLog(u'策略引擎连接MongoDB成功')
except ConnectionFailure:
self.writeLog(u'策略引擎连接MongoDB失败')
#----------------------------------------------------------------------
def __recordTick(self, data):
"""将Tick数据插入到MongoDB中"""
if self.__mongoConnected:
symbol = data['InstrumentID']
data['date'] = self.today
self.__mongoTickDB[symbol].insert(data)
#----------------------------------------------------------------------
def loadTick(self, symbol, dt):
"""从MongoDB中读取Tick数据"""
if self.__mongoConnected:
collection = self.__mongoTickDB[symbol]
cx = collection.find({'date':{'$gte':dt}})
return cx
else:
return None
#----------------------------------------------------------------------
def __updateMarketData(self, event):
"""行情更新"""
data = event.dict_['data']
symbol = data['InstrumentID']
# 检查是否存在交易该合约的策略
if symbol in self.__dictSymbolStrategy:
# 创建TICK数据对象并更新数据
tick = Tick(symbol)
tick.openPrice = data['OpenPrice']
tick.highPrice = data['HighestPrice']
tick.lowPrice = data['LowestPrice']
tick.lastPrice = data['LastPrice']
tick.volume = data['Volume']
tick.openInterest = data['OpenInterest']
tick.upperLimit = data['UpperLimitPrice']
tick.lowerLimit = data['LowerLimitPrice']
tick.time = data['UpdateTime']
tick.ms = data['UpdateMillisec']
tick.bidPrice1 = data['BidPrice1']
tick.bidPrice2 = data['BidPrice2']
tick.bidPrice3 = data['BidPrice3']
tick.bidPrice4 = data['BidPrice4']
tick.bidPrice5 = data['BidPrice5']
tick.askPrice1 = data['AskPrice1']
tick.askPrice2 = data['AskPrice2']
tick.askPrice3 = data['AskPrice3']
tick.askPrice4 = data['AskPrice4']
tick.askPrice5 = data['AskPrice5']
tick.bidVolume1 = data['BidVolume1']
tick.bidVolume2 = data['BidVolume2']
tick.bidVolume3 = data['BidVolume3']
tick.bidVolume4 = data['BidVolume4']
tick.bidVolume5 = data['BidVolume5']
tick.askVolume1 = data['AskVolume1']
tick.askVolume2 = data['AskVolume2']
tick.askVolume3 = data['AskVolume3']
tick.askVolume4 = data['AskVolume4']
tick.askVolume5 = data['AskVolume5']
# 首先检查停止单是否需要发出
self.__processStopOrder(tick)
# 将该TICK数据推送给每个策略
for strategy in self.__dictSymbolStrategy[symbol]:
strategy.onTick(tick)
# 将数据插入MongoDB数据库实盘建议另开程序记录TICK数据
self.__recordTick(data)
#----------------------------------------------------------------------
def __processStopOrder(self, tick):
"""处理停止单"""
symbol = tick.symbol
lastPrice = tick.lastPrice
upperLimit = tick.upperLimit
lowerLimit = tick.lowerLimit
# 如果当前有该合约上的止损单
if symbol in self.__dictStopOrder:
# 获取止损单列表
listSO = self.__dictStopOrder[symbol] # SO:stop order
# 准备一个空的已发止损单列表
listSent = []
for so in listSO:
# 如果是买入停止单,且最新成交价大于停止触发价
if so.direction == DIRECTION_BUY and lastPrice >= so.price:
# 以当日涨停价发出限价单买入
ref = self.sendOrder(symbol, DIRECTION_BUY, so.offset,
upperLimit, so.volume, strategy)
# 触发策略的止损单发出更新
so.strategy.onStopOrder(ref)
# 将该止损单对象保存到已发送列表中
listSent.append(so)
# 如果是卖出停止单,且最新成交价小于停止触发价
elif so.direction == DIRECTION_SELL and lastPrice <= so.price:
ref = self.sendOrder(symbol, DIRECTION_SELL, so.offset,
lowerLimit, so.volume, strategy)
so.strategy.onStopOrder(ref)
listSent.append(so)
# 从停止单列表中移除已经发单的停止单对象
if listSent:
for so in listSent:
listSO.remove(so)
# 检查停止单列表是否为空,若为空,则从停止单字典中移除该合约代码
if not listSO:
del self.__dictStopOrder[symbol]
#----------------------------------------------------------------------
def __updateOrder(self, event):
"""报单更新"""
data = event.dict_['data']
orderRef = data['OrderRef']
# 检查是否存在监听该报单的策略
if orderRef in self.__dictOrderRefStrategy:
# 创建Order数据对象
order = Order(data['InstrumentID'])
order.orderRef = data['OrderRef']
order.direction = data['Direction']
order.offset = data['ComboOffsetFlag']
order.price = data['LimitPrice']
order.volumeOriginal = data['VolumeTotalOriginal']
order.volumeTraded = data['VolumeTraded']
order.insertTime = data['InsertTime']
order.cancelTime = data['CancelTime']
order.frontID = data['FrontID']
order.sessionID = data['SessionID']
order.status = data['OrderStatus']
# 推送给策略
strategy = self.__dictOrderRefStrategy[orderRef]
strategy.onOrder(order)
# 记录该Order的数据
self.__dictOrder[orderRef] = data
#----------------------------------------------------------------------
def __updateTrade(self, event):
"""成交更新"""
data = event.dict_['data']
orderRef = data['OrderRef']
if orderRef in self.__dictOrderRefStrategy:
# 创建Trade数据对象
trade = Trade(data['InstrumentID'])
trade.orderRef = orderRef
trade.tradeID = data['TradeID']
trade.direction = data['Direction']
trade.offset = data['OffsetFlag']
trade.price = data['Price']
trade.volume = data['Volume']
# 推送给策略
strategy = self.__dictOrderRefStrategy[orderRef]
strategy.onTrade(trade)
#----------------------------------------------------------------------
def sendOrder(self, symbol, direction, offset, price, volume, strategy):
"""
发单仅允许限价单
symbol合约代码
direction方向DIRECTION_BUY/DIRECTION_SELL
offset开平OFFSET_OPEN/OFFSET_CLOSE
price下单价格
volume下单手数
strategy策略对象
"""
contract = self.mainEngine.selectInstrument(symbol)
if contract:
ref = self.mainEngine.sendOrder(symbol,
contract['ExchangeID'],
price,
PRICETYPE_LIMIT,
volume,
direction,
offset)
self.__dictOrderRefStrategy[ref] = strategy
return ref
#----------------------------------------------------------------------
def cancelOrder(self, orderRef):
"""
撤单
"""
order = self.__dictOrder[orderRef]
symbol = order['InstrumentID']
contract = self.mainEngine.selectInstrument(symbol)
if contract:
self.mainEngine.cancelOrder(symbol,
contract['ExchangeID'],
orderRef,
order['FrontID'],
order['SessionID'])
#----------------------------------------------------------------------
def __registerEvent(self):
"""注册事件监听"""
self.__eventEngine.register(EVENT_MARKETDATA, self.__updateMarketData)
self.__eventEngine.register(EVENT_ORDER, self.__updateOrder)
self.__eventEngine.register(EVENT_TRADE ,self.__updateTrade)
#----------------------------------------------------------------------
def writeLog(self, log):
"""写日志"""
event = Event(type_=EVENT_LOG)
event.dict_['log'] = log
self.__eventEngine.put(event)
#----------------------------------------------------------------------
def registerStrategy(self, symbol, strategy):
"""注册策略对合约TICK数据的监听"""
# 尝试获取监听该合约代码的策略的列表,若无则创建
try:
listStrategy = self.__dictSymbolStrategy[symbol]
except KeyError:
listStrategy = []
self.__dictSymbolStrategy[symbol] = listStrategy
# 防止重复注册
if strategy not in listStrategy:
listStrategy.append(strategy)
#----------------------------------------------------------------------
def placeStopOrder(self, symbol, direction, offset, price, volume, strategy):
"""
下停止单运行于本地引擎中
注意这里的price是停止单的触发价
"""
# 创建止损单对象
so = StopOrder(symbol, direction, offset, price, volume, strategy)
# 获取该合约相关的止损单列表
try:
listSO = self.__dictStopOrder[symbol]
except KeyError:
listSO = []
self.__dictStopOrder[symbol] = listSO
# 将该止损单插入列表中
listSO.append(so)
return so
#----------------------------------------------------------------------
def cancelStopOrder(self, so):
"""撤销停止单"""
symbol = so.symbol
try:
listSO = self.__dictStopOrder[symbol]
if so in listSO:
listSO.remove(so)
if not listSO:
del self.__dictStopOrder[symbol]
except KeyError:
pass
#----------------------------------------------------------------------
def startAll(self):
"""启动所有策略"""
for strategy in self.dictStrategy.values():
strategy.start()
#----------------------------------------------------------------------
def stopAll(self):
"""停止所有策略"""
for strategy in self.dictStrategy.values():
strategy.stop()
########################################################################
class StrategyTemplate(object):
"""策略模板"""
#----------------------------------------------------------------------
def __init__(self, name, symbol, engine):
"""Constructor"""
self.name = name # 策略名称(注意唯一性)
self.symbol = symbol # 策略交易的合约
self.__engine = engine # 策略引擎对象
self.trading = False # 策略是否启动交易
#----------------------------------------------------------------------
def onTick(self, tick):
"""行情更新"""
raise NotImplementedError
#----------------------------------------------------------------------
def onTrade(self, trade):
"""交易更新"""
raise NotImplementedError
#----------------------------------------------------------------------
def onOrder(self, order):
"""报单更新"""
raise NotImplementedError
#----------------------------------------------------------------------
def onStopOrder(self, orderRef):
"""停止单更新"""
raise NotImplementedError
#----------------------------------------------------------------------
def onBar(self, o, h, l, c, volume, time):
"""K线数据更新"""
raise NotImplementedError
#----------------------------------------------------------------------
def start(self):
"""
启动交易
这里是最简单的改变self.trading
有需要可以重新实现更复杂的操作
"""
self.trading = True
#----------------------------------------------------------------------
def stop(self):
"""
停止交易
同上
"""
self.trading = False
#----------------------------------------------------------------------
def initSetting(self, setting):
"""
初始化设置
setting通常是一个包含了参数设置的字典
"""
raise NotImplementedError
#----------------------------------------------------------------------
def __buy(self, price, volume, stopOrder=False):
"""买入开仓"""
if self.trading:
if stopOrder:
so = self.__engine.placeStopOrder(self.symbol, DIRECTION_BUY,
OFFSET_OPEN, price, volume, self)
return so
else:
ref = self.__engine.sendOrder(self.symbol, DIRECTION_BUY,
OFFSET_OPEN, price, volume, self)
return ref
else:
return None
#----------------------------------------------------------------------
def __cover(self, price, volume, StopOrder=False):
"""买入平仓"""
if self.trading:
if stopOrder:
so = self.__engine.placeStopOrder(self.symbol, DIRECTION_BUY,
OFFSET_CLOSE, price, volume, self)
return so
else:
ref = self.__engine.sendOrder(self.symbol, DIRECTION_BUY,
OFFSET_CLOSE, price, volume, self)
return ref
else:
return None
#----------------------------------------------------------------------
def __sell(self, price, volume, stopOrder=False):
"""卖出平仓"""
if self.trading:
if stopOrder:
so = self.__engine.placeStopOrder(self.symbol, DIRECTION_SELL,
OFFSET_CLOSE, price, volume, self)
return so
else:
ref = self.__engine.sendOrder(self.symbol, DIRECTION_SELL,
OFFSET_CLOSE, price, volume, self)
return ref
else:
return None
#----------------------------------------------------------------------
def __short(self, price, volume, stopOrder=False):
"""卖出开仓"""
if self.trading:
if stopOrder:
so = self.__engine.placeStopOrder(self.symbol, DIRECTION_SELL,
OFFSET_OPEN, price, volume, self)
return so
else:
ref = self.__engine.sendOrder(self.symbol, DIRECTION_SELL,
OFFSET_OPEN, price, volume, self)
return ref
else:
return None
#----------------------------------------------------------------------
def __cancelOrder(self, orderRef):
"""撤单"""
self.__engine.cancelOrder(orderRef)
#----------------------------------------------------------------------
def __cancelStopOrder(self, so):
"""撤销停止单"""
self.__engine.cancelStopOrder(so)

View File

@ -0,0 +1 @@
空文件仅用于让github识别该文件夹

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 66 KiB