vnpy/examples/Services/runServer.py
2018-05-29 00:04:00 +08:00

266 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# encoding: utf-8
# 该文件为无界面启动文件以vtServer为容器加载MainEngine
# 配置:
# self.gateway_name gateway 的连接名称在vtEngine.initGateway()里面定义,对应的配置文件是 "连接名称_connect.json"
# self.strategies启动的策略实例须在catStrategy/CtaSetting.json 里面定义 [u'S28_RB1001', u'S28_TFT', u'S28_HCRB',u'atr_rsi']
# vtServer的ZMQ端口 从VT_Setting.json中配置根据AUTO_CONNCET_GW找到对应得端口配置
import os
import sys
import ctypes
from datetime import datetime, timedelta, date
from time import sleep
from threading import Thread
# 将repostory的目录i作为根目录添加到系统环境中。
ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
sys.path.append(ROOT_PATH)
from vnpy.trader.vtEvent import *
from vnpy.rpc import RpcServer
from vnpy.trader.vtEngine import MainEngine
from vnpy.trader.gateway import ctpGateway
from vnpy.trader.setup_logger import setup_logger,get_logger
from vnpy.trader.util_monitor import *
from vnpy.trader.vtGlobal import globalSetting
from vnpy.trader.util_gpid import *
from vnpy.trader.app import ctaStrategy,riskManager
AUTO_CONNCET_GW = 'CTP'
########################################################################
class VtServer(RpcServer):
"""vn.trader 无界面服务器"""
# ----------------------------------------------------------------------
def __init__(self, repAddress, pubAddress):
"""Constructor"""
super(VtServer, self).__init__(repAddress, pubAddress)
#self.usePickle()
# gateway 是否连接
self.connected = False
# gateway 的连接名称在vtEngine.initGateway()里面定义,对应的配置文件是 "连接名称_connect.json"
self.gateway_name = AUTO_CONNCET_GW
# 启动的策略实例须在catStrategy/CtaSetting.json 里面定义 [u'S28_RB1001', u'S28_TFT', u'S28_HCRB',u'atr_rsi']
self.strategies = [u'S30_RB0510', u'S30_HCRB05']
self.g_count = 0
self.disconnect_signal = 0
self.last_dt = datetime.now()
# 创建事件引擎
ee = EventEngine2()
# 创建主引擎对象
print( u'instance mainengine')
self.engine = MainEngine(ee)
# 添加CTP Gateway,配置文件为 CTP_Post
self.engine.addGateway(ctpGateway, self.gateway_name)
# 添加应用
self.engine.addApp(ctaStrategy)
self.engine.addApp(riskManager)
# 注册主引擎的方法到服务器的RPC函数
self.register(self.engine.connect)
self.register(self.engine.disconnect)
self.register(self.engine.subscribe)
self.register(self.engine.sendOrder)
self.register(self.engine.cancelOrder)
self.register(self.engine.qryAccount)
self.register(self.engine.qryPosition)
self.register(self.engine.checkGatewayStatus) # 检测gateway的连接状态
self.register(self.engine.qryStatus) # 检测ctaEngine的状态
self.register(self.engine.exit)
self.register(self.engine.writeLog)
self.register(self.engine.dbConnect)
self.register(self.engine.dbInsert)
self.register(self.engine.dbQuery)
self.register(self.engine.dbUpdate)
self.register(self.engine.getContract)
self.register(self.engine.getAllContracts)
self.register(self.engine.getOrder)
self.register(self.engine.getAllWorkingOrders)
self.register(self.engine.getAllGatewayNames)
self.register(self.engine.saveData)
self.register(self.engine.initStrategy)
self.register(self.engine.startStrategy)
self.register(self.engine.stopStrategy)
# 注册事件引擎发送的事件处理监听
self.engine.eventEngine.registerGeneralHandler(self.eventHandler)
def trade_off(self):
"""检查现在是否为非交易时间"""
now = datetime.now()
a = datetime.now().replace(hour=2, minute=35, second=0, microsecond=0)
b = datetime.now().replace(hour=8, minute=55, second=0, microsecond=0)
c = datetime.now().replace(hour=15, minute=30, second=0, microsecond=0)
d = datetime.now().replace(hour=20, minute=55, second=0, microsecond=0)
weekend = (now.isoweekday() == 6 and now >= a) or (now.isoweekday() == 7) or (now.isoweekday() == 1 and now <=b)
off = (a <= now <= b) or (c <= now <= d) or weekend
return off
def disconnect(self):
""""断开底层gateway的连接"""
if self.engine:
self.engine.disconnect(self.gateway_name)
self.connected = False
def onTimer(self, event):
"""定时器执行逻辑,每十秒执行一次"""
# 十秒才执行一次检查
self.g_count += 1
if self.g_count <= 30:
return
self.g_count = 0
dt = datetime.now()
self.engine.qryStatus()
if dt.hour != self.last_dt.hour:
self.last_dt = dt
print(u'noUiMain.py checkpoint:{0}'.format(dt))
self.engine.writeLog( u'noUiMain.py checkpoint:{0}'.format(dt))
# 定时断开
if self.trade_off():
"""非交易时间"""
if self.connected:
self.engine.writeLog(u'断开连接{0}'.format(self.gateway_name))
self.disconnect()
self.engine.writeLog(u'清空数据引擎')
self.engine.clearData()
self.connected = False
self.engine.writeNotification(u'非交易时间{0},断开连接{1}'.format(dt, self.gateway_name))
return
# 交易时间内,定时重连和检查
if not self.connected:
self.engine.writeLog(u'启动连接{0}'.format(self.gateway_name))
self.engine.writeLog(u'清空数据引擎')
self.engine.clearData()
self.engine.writeLog(u'重新连接{0}'.format(self.gateway_name))
self.engine.connect(self.gateway_name)
self.connected = True
self.disconnect_signal = 0
self.engine.writeNotification(u'{0},重新连接{1}'.format(dt, self.gateway_name))
return
else:
if not self.engine.checkGatewayStatus(self.gateway_name):
self.disconnect_signal += 1
if self.disconnect_signal >= 5:
self.engine.writeWarning(u'检查连接{0}异常,超过{1}'.format(self.gateway_name,self.disconnect_signal))
sys.exit(0)
else:
self.disconnect_signal = 0
def start(self):
"""启动"""
super(VtServer, self).start()
# 若需要连接数据库,则启动
# self.mainEngine.dbConnect()
# 加载cta的配置
print( u'load cta setting')
self.engine.ctaEngine.loadSetting()
print(u'initialize all strategies')
# 初始化策略,如果多个,则需要逐一初始化多个
for s in self.strategies:
print( 'init trategy {0}'.format(s))
self.engine.ctaEngine.initStrategy(s)
# 逐一启动策略
print( 'start strategy {0}'.format(s))
self.engine.ctaEngine.startStrategy(s)
# 指定的连接配置
if not self.trade_off():
print( u'connect gateway:{0}'.format(self.gateway_name))
self.engine.connect(self.gateway_name)
self.connected = True
# 注册定时器,用于判断重连
self.engine.eventEngine.register(EVENT_TIMER, self.onTimer)
# 所有的日志监控
self.logM = LogMonitor(self.engine.eventEngine)
self.errorM = ErrorMonitor(self.engine.eventEngine)
self.tradeM = TradeMonitor(self.engine.eventEngine)
self.orderM = OrderMonitor(self.engine.eventEngine, self.engine)
self.positionM = PositionMonitor(self.engine.eventEngine)
self.accountM = AccountMonitor(self.engine.eventEngine)
self.engine.writeNotification(u'{0},服务启动{1}'.format(datetime.now(),self. gateway_name))
# ----------------------------------------------------------------------
def eventHandler(self, event):
"""事件处理"""
try:
# 调用RpcServer.publish()
if isinstance(event.type_, str):
self.publish(event.type_, event)
else:
self.publish(event.type_.encode('utf-8'), event)
except Exception as ex:
print( u'event Handler exception:{0}'.format(str(ex)))
# ----------------------------------------------------------------------
def stopServer(self):
"""停止服务器"""
print( 'stopServer')
# 关闭引擎
self.engine.exit()
# 停止服务器线程
self.stop()
# ----------------------------------------------------------------------
def printLog(content):
"""打印日志"""
print( datetime.now().strftime("%H:%M:%S"), '\t', content)
# ----------------------------------------------------------------------
def runServer():
"""运行服务器"""
try:
log_file_name = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)),
'logs', u'noUiMain.log'))
except Exception as ex:
print( u'Use local dict:{0}'.format(os.getcwd()))
log_file_name = os.path.abspath(os.path.join(os.getcwd(), 'logs', u'noUiMain.log'))
setup_logger(filename=log_file_name, debug=False)
# Req/Publish端口
try:
zmqAddressDict = globalSetting['ZMQ']
zmqAddress = zmqAddressDict[AUTO_CONNCET_GW]
reqAddress = zmqAddress['ReqAddress']
pubAddress = zmqAddress['PubAddress']
except:
reqAddress = 'tcp://*:2014'
pubAddress = 'tcp://*:2016'
# 创建并启动服务器
server = VtServer(reqAddress, pubAddress)
server.start()
printLog('-' * 50)
printLog(u'Request端口:{0}'.format(reqAddress))
printLog(u'Publish端口:{0}'.format(pubAddress))
printLog(u'Trader服务器已启动')
if __name__ == '__main__':
# 主程序
thread = Thread(target=runServer, args=())
thread.start()