diff --git a/examples/Services/__init__.py b/examples/Services/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/Services/activate.sh b/examples/Services/activate.sh new file mode 100644 index 00000000..e7347aef --- /dev/null +++ b/examples/Services/activate.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +CONDA_HOME=~/anaconda3 +source $CONDA_HOME/bin/deactivate +source $CONDA_HOME/bin/activate $CONDA_HOME/envs/py35 + +############ +# To solve the problem about Javascript runtime +export PATH=$PATH:/usr/local/bin +############ Ended + +BASE_PATH=$(cd `dirname $0`; pwd) +echo $BASE_PATH +cd `dirname $0` +PROGRAM_NAME=./runServer.py + +python $PROGRAM_NAME >$BASE_PATH/logs/service.log 2>>$BASE_PATH/logs/service-error.log & diff --git a/examples/Services/runServer.py b/examples/Services/runServer.py new file mode 100644 index 00000000..d3b2bec7 --- /dev/null +++ b/examples/Services/runServer.py @@ -0,0 +1,265 @@ +# encoding: utf-8 + +# 该文件,为无界面启动文件,以vtServer为容器,加载MainEngine +# 配置: +# self.gateway_name ,gateway 的连接名称,在vtEngine.initGateway()里面定义,对应的配置文件是 "连接名称_connect.json", +# self.strategies:启动的策略实例,须在catStrategy/CtaSetting.json 里面定义 [u'S28_RB1001', u'S28_TFT', u'S28_HCRB',u'atr_rsi'] +# vtServer的ZMQ端口: 从VT_Setting.json中配置,根据AUTO_CONNCET_GW找到对应得端口配置 + +import os +import sys + +import ctypes +from datetime import datetime, timedelta, date +from time import sleep +from threading import Thread + +# 将repostory的目录i,作为根目录,添加到系统环境中。 +ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) +sys.path.append(ROOT_PATH) + +from vnpy.trader.vtEvent import * +from vnpy.rpc import RpcServer +from vnpy.trader.vtEngine import MainEngine +from vnpy.trader.gateway import ctpGateway +from vnpy.trader.setup_logger import setup_logger,get_logger +from vnpy.trader.util_monitor import * +from vnpy.trader.vtGlobal import globalSetting +from vnpy.trader.util_gpid import * +from vnpy.trader.app import ctaStrategy,riskManager + +AUTO_CONNCET_GW = 'CTP' +######################################################################## +class VtServer(RpcServer): + """vn.trader 无界面服务器""" + + # ---------------------------------------------------------------------- + def __init__(self, repAddress, pubAddress): + """Constructor""" + super(VtServer, self).__init__(repAddress, pubAddress) + #self.usePickle() + + # gateway 是否连接 + self.connected = False + # gateway 的连接名称,在vtEngine.initGateway()里面定义,对应的配置文件是 "连接名称_connect.json", + self.gateway_name = AUTO_CONNCET_GW + # 启动的策略实例,须在catStrategy/CtaSetting.json 里面定义 [u'S28_RB1001', u'S28_TFT', u'S28_HCRB',u'atr_rsi'] + self.strategies = [u'S30_RB0510', u'S30_HCRB05'] + self.g_count = 0 + self.disconnect_signal = 0 + self.last_dt = datetime.now() + + # 创建事件引擎 + ee = EventEngine2() + + # 创建主引擎对象 + print( u'instance mainengine') + self.engine = MainEngine(ee) + + # 添加CTP Gateway,配置文件为 CTP_Post + self.engine.addGateway(ctpGateway, self.gateway_name) + + # 添加应用 + self.engine.addApp(ctaStrategy) + self.engine.addApp(riskManager) + + + # 注册主引擎的方法到服务器的RPC函数 + self.register(self.engine.connect) + self.register(self.engine.disconnect) + self.register(self.engine.subscribe) + self.register(self.engine.sendOrder) + self.register(self.engine.cancelOrder) + self.register(self.engine.qryAccount) + self.register(self.engine.qryPosition) + self.register(self.engine.checkGatewayStatus) # 检测gateway的连接状态 + self.register(self.engine.qryStatus) # 检测ctaEngine的状态 + self.register(self.engine.exit) + self.register(self.engine.writeLog) + self.register(self.engine.dbConnect) + self.register(self.engine.dbInsert) + self.register(self.engine.dbQuery) + self.register(self.engine.dbUpdate) + self.register(self.engine.getContract) + self.register(self.engine.getAllContracts) + self.register(self.engine.getOrder) + self.register(self.engine.getAllWorkingOrders) + self.register(self.engine.getAllGatewayNames) + self.register(self.engine.saveData) + self.register(self.engine.initStrategy) + self.register(self.engine.startStrategy) + self.register(self.engine.stopStrategy) + + # 注册事件引擎发送的事件处理监听 + self.engine.eventEngine.registerGeneralHandler(self.eventHandler) + + + def trade_off(self): + """检查现在是否为非交易时间""" + now = datetime.now() + a = datetime.now().replace(hour=2, minute=35, second=0, microsecond=0) + b = datetime.now().replace(hour=8, minute=55, second=0, microsecond=0) + c = datetime.now().replace(hour=15, minute=30, second=0, microsecond=0) + d = datetime.now().replace(hour=20, minute=55, second=0, microsecond=0) + weekend = (now.isoweekday() == 6 and now >= a) or (now.isoweekday() == 7) or (now.isoweekday() == 1 and now <=b) + off = (a <= now <= b) or (c <= now <= d) or weekend + return off + + def disconnect(self): + """"断开底层gateway的连接""" + if self.engine: + self.engine.disconnect(self.gateway_name) + self.connected = False + + def onTimer(self, event): + """定时器执行逻辑,每十秒执行一次""" + + # 十秒才执行一次检查 + self.g_count += 1 + if self.g_count <= 30: + return + self.g_count = 0 + dt = datetime.now() + self.engine.qryStatus() + if dt.hour != self.last_dt.hour: + self.last_dt = dt + print(u'noUiMain.py checkpoint:{0}'.format(dt)) + self.engine.writeLog( u'noUiMain.py checkpoint:{0}'.format(dt)) + + # 定时断开 + if self.trade_off(): + """非交易时间""" + if self.connected: + self.engine.writeLog(u'断开连接{0}'.format(self.gateway_name)) + self.disconnect() + self.engine.writeLog(u'清空数据引擎') + self.engine.clearData() + self.connected = False + self.engine.writeNotification(u'非交易时间{0},断开连接{1}'.format(dt, self.gateway_name)) + return + + # 交易时间内,定时重连和检查 + if not self.connected: + self.engine.writeLog(u'启动连接{0}'.format(self.gateway_name)) + self.engine.writeLog(u'清空数据引擎') + self.engine.clearData() + self.engine.writeLog(u'重新连接{0}'.format(self.gateway_name)) + self.engine.connect(self.gateway_name) + self.connected = True + self.disconnect_signal = 0 + self.engine.writeNotification(u'{0},重新连接{1}'.format(dt, self.gateway_name)) + return + else: + if not self.engine.checkGatewayStatus(self.gateway_name): + self.disconnect_signal += 1 + + if self.disconnect_signal >= 5: + self.engine.writeWarning(u'检查连接{0}异常,超过{1}次'.format(self.gateway_name,self.disconnect_signal)) + sys.exit(0) + else: + self.disconnect_signal = 0 + + def start(self): + """启动""" + super(VtServer, self).start() + + # 若需要连接数据库,则启动 + # self.mainEngine.dbConnect() + + # 加载cta的配置 + print( u'load cta setting') + self.engine.ctaEngine.loadSetting() + + print(u'initialize all strategies') + # 初始化策略,如果多个,则需要逐一初始化多个 + for s in self.strategies: + print( 'init trategy {0}'.format(s)) + self.engine.ctaEngine.initStrategy(s) + # 逐一启动策略 + print( 'start strategy {0}'.format(s)) + self.engine.ctaEngine.startStrategy(s) + + # 指定的连接配置 + if not self.trade_off(): + print( u'connect gateway:{0}'.format(self.gateway_name)) + self.engine.connect(self.gateway_name) + self.connected = True + + # 注册定时器,用于判断重连 + self.engine.eventEngine.register(EVENT_TIMER, self.onTimer) + + # 所有的日志监控 + self.logM = LogMonitor(self.engine.eventEngine) + self.errorM = ErrorMonitor(self.engine.eventEngine) + self.tradeM = TradeMonitor(self.engine.eventEngine) + self.orderM = OrderMonitor(self.engine.eventEngine, self.engine) + self.positionM = PositionMonitor(self.engine.eventEngine) + self.accountM = AccountMonitor(self.engine.eventEngine) + + self.engine.writeNotification(u'{0},服务启动{1}'.format(datetime.now(),self. gateway_name)) + + # ---------------------------------------------------------------------- + def eventHandler(self, event): + """事件处理""" + try: + # 调用RpcServer.publish() + if isinstance(event.type_, str): + self.publish(event.type_, event) + else: + self.publish(event.type_.encode('utf-8'), event) + + except Exception as ex: + print( u'event Handler exception:{0}'.format(str(ex))) + + # ---------------------------------------------------------------------- + def stopServer(self): + """停止服务器""" + print( 'stopServer') + # 关闭引擎 + self.engine.exit() + # 停止服务器线程 + self.stop() + +# ---------------------------------------------------------------------- +def printLog(content): + """打印日志""" + print( datetime.now().strftime("%H:%M:%S"), '\t', content) + +# ---------------------------------------------------------------------- +def runServer(): + """运行服务器""" + + try: + log_file_name = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), + 'logs', u'noUiMain.log')) + except Exception as ex: + print( u'Use local dict:{0}'.format(os.getcwd())) + log_file_name = os.path.abspath(os.path.join(os.getcwd(), 'logs', u'noUiMain.log')) + + setup_logger(filename=log_file_name, debug=False) + + # Req/Publish端口 + try: + zmqAddressDict = globalSetting['ZMQ'] + zmqAddress = zmqAddressDict[AUTO_CONNCET_GW] + reqAddress = zmqAddress['ReqAddress'] + pubAddress = zmqAddress['PubAddress'] + except: + reqAddress = 'tcp://*:2014' + pubAddress = 'tcp://*:2016' + + # 创建并启动服务器 + server = VtServer(reqAddress, pubAddress) + server.start() + + printLog('-' * 50) + printLog(u'Request端口:{0}'.format(reqAddress)) + printLog(u'Publish端口:{0}'.format(pubAddress)) + printLog(u'Trader服务器已启动') + + +if __name__ == '__main__': + + # 主程序 + thread = Thread(target=runServer, args=()) + thread.start() diff --git a/examples/Services/service.py b/examples/Services/service.py new file mode 100644 index 00000000..62d16457 --- /dev/null +++ b/examples/Services/service.py @@ -0,0 +1,376 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# python 3 环境 +# 激活 activate.sh (激活py35 env,启动运行程序 + +import sys +import time +from datetime import datetime +# import commands +import os +import subprocess +import psutil + +# 将repostory的目录i,作为根目录,添加到系统环境中。 +ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) +sys.path.append(ROOT_PATH) + +try: + import vnpy.trader.util_mail as sendmail +except : + pass + +# python容器文件 +python_path = '/home/trade/anaconda3/envs/py35/bin/python' + +# shell 文件,不使用sh +bash = "/bin/bash" + +# 运行时间段 +valid_time_span = '08:55:00~15:30:00,20:45:00~02:35:00' + +# 日志目录 +log_path = os.path.abspath(os.path.join(os.getcwd(), 'logs')) +if os.path.isdir(log_path): + # 如果工作目录下,存在logs子目录,就使用logs子目录 + base_path = os.getcwd() +else: + # 使用service.py所在得目录 + base_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__))) + +# 进程组id保存文件 +gpid_file = os.path.abspath(os.path.join(base_path, 'logs', 'gpid.txt')) + +tmp_cron_file = os.path.abspath(os.path.join(base_path, 'logs', 'cron.tmp')) + +program_file = os.path.join(base_path, 'activate.sh') +log_file = os.path.abspath(os.path.join(base_path,'logs', 'service.log')) +error_file = os.path.abspath(os.path.join(base_path, 'logs', 'service-error.log')) +cron_log_file = os.path.abspath(os.path.join(base_path,'logs', 'cron.log')) +cron_error_file = os.path.abspath(os.path.join(base_path, 'logs', 'cron-error.log')) +null_file = "/dev/null" + +cron_content = "* * * * * {} {} schedule >>{} 2>>{}".format(python_path, os.path.realpath(__file__), cron_log_file, cron_error_file) +program_command = "nohup {} {} >>{} 2>>{} &".format(bash, program_file, log_file, error_file) + +USE_GPID = False + +def _check_gpid(gpid): + """ + 检查进程(组)ID + :param gpid: + :return: True, 正在运行/ False: 没有运行 + """ + if not USE_GPID: + int_gpid = int(gpid) + return psutil.pid_exists(int_gpid) + + try: + # 通过系统子进程,打开ps命令,找到gpid下得所有进程 + p = subprocess.Popen(["ps", "-A", "-o", "pgrp="], stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False) + returncode = p.wait() + except OSError as e: + print('找不到shell运行命令ps', file=sys.stderr) + exit(1) + + #print('returncode1:{}'.format(returncode)) + try: + p2 = subprocess.Popen("uniq", stdin=p.stdout, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, shell=False) + returncode = p2.wait() + except OSError as e: + print(u'找不到shell运行命令uniq', file=sys.stderr) + exit(1) + + # print('returncode2:{}'.format(returncode)) + for i in p2.stdout.readlines(): + #print (u'p2.line:{}'.format(i)) + if i.decode().strip() == gpid: + print(u'找到gpid:{}'.format(gpid)) + return True + + print(u'找不到gpid:{}'.format(gpid)) + return False + +def _status(): + """ + 查询当前状态 + :return: + """ + print(u'检查{}'.format(gpid_file)) + if os.path.exists(gpid_file): + with open(gpid_file, 'r') as f: + gpid = f.read().strip() + print(u'gpid={}'.format(gpid)) + if gpid != "" and _check_gpid(gpid): + return gpid + + return None + +def trade_off(): + """检查现在是否为非交易时间""" + now = datetime.now() + a = datetime.now().replace(hour=2, minute=35, second=0, microsecond=0) + b = datetime.now().replace(hour=8, minute=55, second=0, microsecond=0) + c = datetime.now().replace(hour=15, minute=30, second=0, microsecond=0) + d = datetime.now().replace(hour=20, minute=55, second=0, microsecond=0) + weekend = (now.isoweekday() == 6 and now >= a) or (now.isoweekday() == 7) or(now.isoweekday() == 1 and now <= a) + off = (a<= now <= b) or (c <= now <= d) or weekend + return off + + +def _check_stop_time(): + """ + 当前时间是否属于停止运行 + :return: True:属于停止运行期间;False:不属于 + """ + time_span_list = [] + for time_span in valid_time_span.split(","): + time_pair = tuple([item.strip() for item in time_span.split("~")]) + time_span_list.append(time_pair) + time_span_list.sort(key=lambda pair: pair[0]) + if len(time_span_list) == 0: + return False + import datetime + now_time = datetime.datetime.now() + a = now_time.replace(hour=2, minute=35, second=0, microsecond=0) + weekend = (now_time.isoweekday() == 6 and now_time >= a) or (now_time.isoweekday() == 7) or (now_time.isoweekday() == 1 and now_time <=a) + if weekend: + return True + + now_time_str = now_time.strftime('%H:%M:%S') + stop_flag = True + if time_span_list[-1][1] > now_time_str: + stop_flag = False + else: + watch_flag = False + for item in time_span_list: + if item[0] > now_time_str: + stop_flag = True + watch_flag = True + break + if item[1] > now_time_str: + stop_flag = False + watch_flag = True + break + + if not watch_flag: + if time_span_list[-1][1] < time_span_list[0][0]: + stop_flag = False + else: + stop_flag = True + + return stop_flag + + +def _start(): + """ + 启动服务 + :return: + """ + # 获取进程组id + gpid = _status() + if _check_stop_time(): + # 属于停止运行期间 + if gpid: + print(u'现在属于停止运行时间,进程组ID存在,将杀死服务进程:[gpid={}]'.format(gpid)) + import signal + if USE_GPID: + # 杀死进程组 + os.killpg(int(gpid), signal.SIGKILL) + else: + os.kill(int(gpid), signal.SIGKILL) + i = 0 + while _status(): + time.sleep(1) + i += 1 + print(u'杀死进程中,等待{}秒'.format(i)) + if i > 30: + print(u'杀死进程失败,退出') + exit(1) + + print('进程组已停止运行[gpid={}]'.format(gpid)) + try: + sendmail.sendmail(subject='Notification: {0}目录下服务 killed by service.py'.format(base_path), + msgcontent='停止运行时间,已将进程组[gpid={}]杀死.'.format(gpid)) + except: + print(u'发送通知邮件失败', file=sys.stderr) + pass + else: + print(u'{} 现在属于停止运行时间,不启动服务'.format(datetime.now())) + else: + # 属于运行时间 + if not gpid: + print(u'{}属于运行时间,将启动服务:{}'.format(datetime.now(),program_command)) + if os.path.isfile(gpid_file): + print(u'{0}文件存在,先执行删除'.format(gpid_file)) + try: + os.remove(gpid_file) + except: + pass + + os.popen(program_command) + i = 0 + while True: + gpid = _status() + if gpid: + print('{}属于运行时间,成功启动服务[gpid={}]'.format(datetime.now(), gpid)) + try: + sendmail.sendmail( + subject='Notification: {0}目录下进程启动'.format(base_path), + msgcontent='{}属于运行时间,已启动服务[gpid={}]'.format(datetime.now(), gpid)) + except: + print(u'发送通知邮件失败', file=sys.stderr) + pass + break + + i += 1 + print(u'启动进程中,等待{}秒'.format(i)) + if i > 30: + print(u'启动进程失败,退出') + exit(1) + time.sleep(1) + + + else: + print(u'{}属于运行时间,{}服务已运行'.format(datetime.now(),base_path)) + +def schedule(): + """ + crontab 计划执行 + :return: + """ + print( '======schedule========') + _start() + + +def status(): + """查看状态""" + print('======status========') + + gpid = _status() + if gpid: + print('{}服务进程[gpid={}]正在运行'.format(base_path,gpid)) + else: + print('{}服务进程没有运行.'.format(base_path)) + +# operate的可选字符串为:add, del +def operate_crontab(operate): + """ + 操作crontab + :param operate: add , del + :return: + """ + + try: + # 从系统命令中,获取定时任务 + p = subprocess.Popen(["crontab", "-l"], stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False) + returncode = p.wait() + except OSError as e: + print(u"找不到shell运行命令crontab", file=sys.stderr) + exit(1) + + remain_cron_list = [] + exist_flag = False + for i in p.stdout.readlines(): + if i.decode("utf-8").find(os.path.realpath(__file__) + " schedule") >= 0: + old_cron_content = i.decode("utf-8") + exist_flag = True + else: + remain_cron_list.append(i.decode("utf-8")) + + if operate == "add" and not exist_flag: + remain_cron_list.append(cron_content) + remain_cron_list.append("\n") + with open(tmp_cron_file, 'wb') as f: + for i in remain_cron_list: + f.write(i.encode("utf-8")) + os.popen("crontab {}".format(tmp_cron_file)) + print(u'添加crontab项: {}'.format(cron_content), file=sys.stderr) + + if operate == "del" and exist_flag: + with open(tmp_cron_file, 'wb') as f: + for i in remain_cron_list: + f.write(i.encode("utf-8")) + os.popen("crontab {}".format(tmp_cron_file)) + print(u'删除crontab item: {}'.format(old_cron_content), file=sys.stderr) + + # os.remove(tmp_cron_file) + +def start(): + print(u'======start========') + # 往任务表增加定时计划 + operate_crontab("add") + # 执行启动 + _start() + print(u'启动{}服务执行完毕'.format(base_path)) + +def _stop(): + print(u'======stop========') + # 在任务表删除定时计划 + operate_crontab("del") + + # 查询进程组id + gpid = _status() + if gpid: + # 进程组存在,杀死进程 + import signal + # 杀死进程组 + if USE_GPID: + # 杀死进程组 + os.killpg(int(gpid), signal.SIGKILL) + else: + os.kill(int(gpid), signal.SIGKILL) + i = 0 + while _status(): + time.sleep(1) + i += 1 + print(u'等待{}秒'.format(i)) + + print(u'{}成功停止{}服务[gpid={}]'.format(datetime.now(), base_path,gpid)) + + try: + sendmail.sendmail(subject='Notification: {}目录下服务进程停止'.format(base_path), + msgcontent= '服务进程[gpid={}]停止'.format(gpid)) + except: + print(u'发送通知邮件失败') + pass + else: + print(u'{}服务进程没有运行'.format(base_path)) + +def stop(): + """ + 停止服务 + :return: + """ + _stop() + print(u'执行停止{}服务完成'.format(base_path)) + + +def restart(): + print(u'======restart========') + _stop() + _start() + print('执行重启{}服务完成'.format(base_path)) + + +if __name__ == '__main__': + if len(sys.argv) >=2: + fun = sys.argv[1] + else: + fun = '' + if fun == 'status': + status() + elif fun == 'start': + start() + elif fun == 'stop': + stop() + elif fun == 'restart': + restart() + elif fun == 'schedule': + schedule() + else: + print( u'Usage: {} (status|start|stop|restart)'.format(os.path.basename(__file__)))