linux驻留程序

This commit is contained in:
msincenselee 2018-05-29 00:04:00 +08:00
parent 96eccfb004
commit ada4a2d265
4 changed files with 658 additions and 0 deletions

View File

View File

@ -0,0 +1,17 @@
#!/bin/bash
CONDA_HOME=~/anaconda3
source $CONDA_HOME/bin/deactivate
source $CONDA_HOME/bin/activate $CONDA_HOME/envs/py35
############
# To solve the problem about Javascript runtime
export PATH=$PATH:/usr/local/bin
############ Ended
BASE_PATH=$(cd `dirname $0`; pwd)
echo $BASE_PATH
cd `dirname $0`
PROGRAM_NAME=./runServer.py
python $PROGRAM_NAME >$BASE_PATH/logs/service.log 2>>$BASE_PATH/logs/service-error.log &

View File

@ -0,0 +1,265 @@
# encoding: utf-8
# 该文件为无界面启动文件以vtServer为容器加载MainEngine
# 配置:
# self.gateway_name gateway 的连接名称在vtEngine.initGateway()里面定义,对应的配置文件是 "连接名称_connect.json"
# self.strategies启动的策略实例须在catStrategy/CtaSetting.json 里面定义 [u'S28_RB1001', u'S28_TFT', u'S28_HCRB',u'atr_rsi']
# vtServer的ZMQ端口 从VT_Setting.json中配置根据AUTO_CONNCET_GW找到对应得端口配置
import os
import sys
import ctypes
from datetime import datetime, timedelta, date
from time import sleep
from threading import Thread
# 将repostory的目录i作为根目录添加到系统环境中。
ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
sys.path.append(ROOT_PATH)
from vnpy.trader.vtEvent import *
from vnpy.rpc import RpcServer
from vnpy.trader.vtEngine import MainEngine
from vnpy.trader.gateway import ctpGateway
from vnpy.trader.setup_logger import setup_logger,get_logger
from vnpy.trader.util_monitor import *
from vnpy.trader.vtGlobal import globalSetting
from vnpy.trader.util_gpid import *
from vnpy.trader.app import ctaStrategy,riskManager
AUTO_CONNCET_GW = 'CTP'
########################################################################
class VtServer(RpcServer):
"""vn.trader 无界面服务器"""
# ----------------------------------------------------------------------
def __init__(self, repAddress, pubAddress):
"""Constructor"""
super(VtServer, self).__init__(repAddress, pubAddress)
#self.usePickle()
# gateway 是否连接
self.connected = False
# gateway 的连接名称在vtEngine.initGateway()里面定义,对应的配置文件是 "连接名称_connect.json"
self.gateway_name = AUTO_CONNCET_GW
# 启动的策略实例须在catStrategy/CtaSetting.json 里面定义 [u'S28_RB1001', u'S28_TFT', u'S28_HCRB',u'atr_rsi']
self.strategies = [u'S30_RB0510', u'S30_HCRB05']
self.g_count = 0
self.disconnect_signal = 0
self.last_dt = datetime.now()
# 创建事件引擎
ee = EventEngine2()
# 创建主引擎对象
print( u'instance mainengine')
self.engine = MainEngine(ee)
# 添加CTP Gateway,配置文件为 CTP_Post
self.engine.addGateway(ctpGateway, self.gateway_name)
# 添加应用
self.engine.addApp(ctaStrategy)
self.engine.addApp(riskManager)
# 注册主引擎的方法到服务器的RPC函数
self.register(self.engine.connect)
self.register(self.engine.disconnect)
self.register(self.engine.subscribe)
self.register(self.engine.sendOrder)
self.register(self.engine.cancelOrder)
self.register(self.engine.qryAccount)
self.register(self.engine.qryPosition)
self.register(self.engine.checkGatewayStatus) # 检测gateway的连接状态
self.register(self.engine.qryStatus) # 检测ctaEngine的状态
self.register(self.engine.exit)
self.register(self.engine.writeLog)
self.register(self.engine.dbConnect)
self.register(self.engine.dbInsert)
self.register(self.engine.dbQuery)
self.register(self.engine.dbUpdate)
self.register(self.engine.getContract)
self.register(self.engine.getAllContracts)
self.register(self.engine.getOrder)
self.register(self.engine.getAllWorkingOrders)
self.register(self.engine.getAllGatewayNames)
self.register(self.engine.saveData)
self.register(self.engine.initStrategy)
self.register(self.engine.startStrategy)
self.register(self.engine.stopStrategy)
# 注册事件引擎发送的事件处理监听
self.engine.eventEngine.registerGeneralHandler(self.eventHandler)
def trade_off(self):
"""检查现在是否为非交易时间"""
now = datetime.now()
a = datetime.now().replace(hour=2, minute=35, second=0, microsecond=0)
b = datetime.now().replace(hour=8, minute=55, second=0, microsecond=0)
c = datetime.now().replace(hour=15, minute=30, second=0, microsecond=0)
d = datetime.now().replace(hour=20, minute=55, second=0, microsecond=0)
weekend = (now.isoweekday() == 6 and now >= a) or (now.isoweekday() == 7) or (now.isoweekday() == 1 and now <=b)
off = (a <= now <= b) or (c <= now <= d) or weekend
return off
def disconnect(self):
""""断开底层gateway的连接"""
if self.engine:
self.engine.disconnect(self.gateway_name)
self.connected = False
def onTimer(self, event):
"""定时器执行逻辑,每十秒执行一次"""
# 十秒才执行一次检查
self.g_count += 1
if self.g_count <= 30:
return
self.g_count = 0
dt = datetime.now()
self.engine.qryStatus()
if dt.hour != self.last_dt.hour:
self.last_dt = dt
print(u'noUiMain.py checkpoint:{0}'.format(dt))
self.engine.writeLog( u'noUiMain.py checkpoint:{0}'.format(dt))
# 定时断开
if self.trade_off():
"""非交易时间"""
if self.connected:
self.engine.writeLog(u'断开连接{0}'.format(self.gateway_name))
self.disconnect()
self.engine.writeLog(u'清空数据引擎')
self.engine.clearData()
self.connected = False
self.engine.writeNotification(u'非交易时间{0},断开连接{1}'.format(dt, self.gateway_name))
return
# 交易时间内,定时重连和检查
if not self.connected:
self.engine.writeLog(u'启动连接{0}'.format(self.gateway_name))
self.engine.writeLog(u'清空数据引擎')
self.engine.clearData()
self.engine.writeLog(u'重新连接{0}'.format(self.gateway_name))
self.engine.connect(self.gateway_name)
self.connected = True
self.disconnect_signal = 0
self.engine.writeNotification(u'{0},重新连接{1}'.format(dt, self.gateway_name))
return
else:
if not self.engine.checkGatewayStatus(self.gateway_name):
self.disconnect_signal += 1
if self.disconnect_signal >= 5:
self.engine.writeWarning(u'检查连接{0}异常,超过{1}'.format(self.gateway_name,self.disconnect_signal))
sys.exit(0)
else:
self.disconnect_signal = 0
def start(self):
"""启动"""
super(VtServer, self).start()
# 若需要连接数据库,则启动
# self.mainEngine.dbConnect()
# 加载cta的配置
print( u'load cta setting')
self.engine.ctaEngine.loadSetting()
print(u'initialize all strategies')
# 初始化策略,如果多个,则需要逐一初始化多个
for s in self.strategies:
print( 'init trategy {0}'.format(s))
self.engine.ctaEngine.initStrategy(s)
# 逐一启动策略
print( 'start strategy {0}'.format(s))
self.engine.ctaEngine.startStrategy(s)
# 指定的连接配置
if not self.trade_off():
print( u'connect gateway:{0}'.format(self.gateway_name))
self.engine.connect(self.gateway_name)
self.connected = True
# 注册定时器,用于判断重连
self.engine.eventEngine.register(EVENT_TIMER, self.onTimer)
# 所有的日志监控
self.logM = LogMonitor(self.engine.eventEngine)
self.errorM = ErrorMonitor(self.engine.eventEngine)
self.tradeM = TradeMonitor(self.engine.eventEngine)
self.orderM = OrderMonitor(self.engine.eventEngine, self.engine)
self.positionM = PositionMonitor(self.engine.eventEngine)
self.accountM = AccountMonitor(self.engine.eventEngine)
self.engine.writeNotification(u'{0},服务启动{1}'.format(datetime.now(),self. gateway_name))
# ----------------------------------------------------------------------
def eventHandler(self, event):
"""事件处理"""
try:
# 调用RpcServer.publish()
if isinstance(event.type_, str):
self.publish(event.type_, event)
else:
self.publish(event.type_.encode('utf-8'), event)
except Exception as ex:
print( u'event Handler exception:{0}'.format(str(ex)))
# ----------------------------------------------------------------------
def stopServer(self):
"""停止服务器"""
print( 'stopServer')
# 关闭引擎
self.engine.exit()
# 停止服务器线程
self.stop()
# ----------------------------------------------------------------------
def printLog(content):
"""打印日志"""
print( datetime.now().strftime("%H:%M:%S"), '\t', content)
# ----------------------------------------------------------------------
def runServer():
"""运行服务器"""
try:
log_file_name = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)),
'logs', u'noUiMain.log'))
except Exception as ex:
print( u'Use local dict:{0}'.format(os.getcwd()))
log_file_name = os.path.abspath(os.path.join(os.getcwd(), 'logs', u'noUiMain.log'))
setup_logger(filename=log_file_name, debug=False)
# Req/Publish端口
try:
zmqAddressDict = globalSetting['ZMQ']
zmqAddress = zmqAddressDict[AUTO_CONNCET_GW]
reqAddress = zmqAddress['ReqAddress']
pubAddress = zmqAddress['PubAddress']
except:
reqAddress = 'tcp://*:2014'
pubAddress = 'tcp://*:2016'
# 创建并启动服务器
server = VtServer(reqAddress, pubAddress)
server.start()
printLog('-' * 50)
printLog(u'Request端口:{0}'.format(reqAddress))
printLog(u'Publish端口:{0}'.format(pubAddress))
printLog(u'Trader服务器已启动')
if __name__ == '__main__':
# 主程序
thread = Thread(target=runServer, args=())
thread.start()

View File

@ -0,0 +1,376 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# python 3 环境
# 激活 activate.sh (激活py35 env启动运行程序
import sys
import time
from datetime import datetime
# import commands
import os
import subprocess
import psutil
# 将repostory的目录i作为根目录添加到系统环境中。
ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
sys.path.append(ROOT_PATH)
try:
import vnpy.trader.util_mail as sendmail
except :
pass
# python容器文件
python_path = '/home/trade/anaconda3/envs/py35/bin/python'
# shell 文件不使用sh
bash = "/bin/bash"
# 运行时间段
valid_time_span = '08:55:00~15:30:00,20:45:00~02:35:00'
# 日志目录
log_path = os.path.abspath(os.path.join(os.getcwd(), 'logs'))
if os.path.isdir(log_path):
# 如果工作目录下存在logs子目录就使用logs子目录
base_path = os.getcwd()
else:
# 使用service.py所在得目录
base_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
# 进程组id保存文件
gpid_file = os.path.abspath(os.path.join(base_path, 'logs', 'gpid.txt'))
tmp_cron_file = os.path.abspath(os.path.join(base_path, 'logs', 'cron.tmp'))
program_file = os.path.join(base_path, 'activate.sh')
log_file = os.path.abspath(os.path.join(base_path,'logs', 'service.log'))
error_file = os.path.abspath(os.path.join(base_path, 'logs', 'service-error.log'))
cron_log_file = os.path.abspath(os.path.join(base_path,'logs', 'cron.log'))
cron_error_file = os.path.abspath(os.path.join(base_path, 'logs', 'cron-error.log'))
null_file = "/dev/null"
cron_content = "* * * * * {} {} schedule >>{} 2>>{}".format(python_path, os.path.realpath(__file__), cron_log_file, cron_error_file)
program_command = "nohup {} {} >>{} 2>>{} &".format(bash, program_file, log_file, error_file)
USE_GPID = False
def _check_gpid(gpid):
"""
检查进程()ID
:param gpid:
:return: True, 正在运行/ False: 没有运行
"""
if not USE_GPID:
int_gpid = int(gpid)
return psutil.pid_exists(int_gpid)
try:
# 通过系统子进程打开ps命令找到gpid下得所有进程
p = subprocess.Popen(["ps", "-A", "-o", "pgrp="], stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False)
returncode = p.wait()
except OSError as e:
print('找不到shell运行命令ps', file=sys.stderr)
exit(1)
#print('returncode1:{}'.format(returncode))
try:
p2 = subprocess.Popen("uniq", stdin=p.stdout, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, shell=False)
returncode = p2.wait()
except OSError as e:
print(u'找不到shell运行命令uniq', file=sys.stderr)
exit(1)
# print('returncode2:{}'.format(returncode))
for i in p2.stdout.readlines():
#print (u'p2.line:{}'.format(i))
if i.decode().strip() == gpid:
print(u'找到gpid:{}'.format(gpid))
return True
print(u'找不到gpid:{}'.format(gpid))
return False
def _status():
"""
查询当前状态
:return:
"""
print(u'检查{}'.format(gpid_file))
if os.path.exists(gpid_file):
with open(gpid_file, 'r') as f:
gpid = f.read().strip()
print(u'gpid={}'.format(gpid))
if gpid != "" and _check_gpid(gpid):
return gpid
return None
def trade_off():
"""检查现在是否为非交易时间"""
now = datetime.now()
a = datetime.now().replace(hour=2, minute=35, second=0, microsecond=0)
b = datetime.now().replace(hour=8, minute=55, second=0, microsecond=0)
c = datetime.now().replace(hour=15, minute=30, second=0, microsecond=0)
d = datetime.now().replace(hour=20, minute=55, second=0, microsecond=0)
weekend = (now.isoweekday() == 6 and now >= a) or (now.isoweekday() == 7) or(now.isoweekday() == 1 and now <= a)
off = (a<= now <= b) or (c <= now <= d) or weekend
return off
def _check_stop_time():
"""
当前时间是否属于停止运行
:return: True属于停止运行期间False不属于
"""
time_span_list = []
for time_span in valid_time_span.split(","):
time_pair = tuple([item.strip() for item in time_span.split("~")])
time_span_list.append(time_pair)
time_span_list.sort(key=lambda pair: pair[0])
if len(time_span_list) == 0:
return False
import datetime
now_time = datetime.datetime.now()
a = now_time.replace(hour=2, minute=35, second=0, microsecond=0)
weekend = (now_time.isoweekday() == 6 and now_time >= a) or (now_time.isoweekday() == 7) or (now_time.isoweekday() == 1 and now_time <=a)
if weekend:
return True
now_time_str = now_time.strftime('%H:%M:%S')
stop_flag = True
if time_span_list[-1][1] > now_time_str:
stop_flag = False
else:
watch_flag = False
for item in time_span_list:
if item[0] > now_time_str:
stop_flag = True
watch_flag = True
break
if item[1] > now_time_str:
stop_flag = False
watch_flag = True
break
if not watch_flag:
if time_span_list[-1][1] < time_span_list[0][0]:
stop_flag = False
else:
stop_flag = True
return stop_flag
def _start():
"""
启动服务
:return:
"""
# 获取进程组id
gpid = _status()
if _check_stop_time():
# 属于停止运行期间
if gpid:
print(u'现在属于停止运行时间进程组ID存在,将杀死服务进程:[gpid={}]'.format(gpid))
import signal
if USE_GPID:
# 杀死进程组
os.killpg(int(gpid), signal.SIGKILL)
else:
os.kill(int(gpid), signal.SIGKILL)
i = 0
while _status():
time.sleep(1)
i += 1
print(u'杀死进程中,等待{}'.format(i))
if i > 30:
print(u'杀死进程失败,退出')
exit(1)
print('进程组已停止运行[gpid={}]'.format(gpid))
try:
sendmail.sendmail(subject='Notification: {0}目录下服务 killed by service.py'.format(base_path),
msgcontent='停止运行时间,已将进程组[gpid={}]杀死.'.format(gpid))
except:
print(u'发送通知邮件失败', file=sys.stderr)
pass
else:
print(u'{} 现在属于停止运行时间,不启动服务'.format(datetime.now()))
else:
# 属于运行时间
if not gpid:
print(u'{}属于运行时间,将启动服务:{}'.format(datetime.now(),program_command))
if os.path.isfile(gpid_file):
print(u'{0}文件存在,先执行删除'.format(gpid_file))
try:
os.remove(gpid_file)
except:
pass
os.popen(program_command)
i = 0
while True:
gpid = _status()
if gpid:
print('{}属于运行时间,成功启动服务[gpid={}]'.format(datetime.now(), gpid))
try:
sendmail.sendmail(
subject='Notification: {0}目录下进程启动'.format(base_path),
msgcontent='{}属于运行时间,已启动服务[gpid={}]'.format(datetime.now(), gpid))
except:
print(u'发送通知邮件失败', file=sys.stderr)
pass
break
i += 1
print(u'启动进程中,等待{}'.format(i))
if i > 30:
print(u'启动进程失败,退出')
exit(1)
time.sleep(1)
else:
print(u'{}属于运行时间,{}服务已运行'.format(datetime.now(),base_path))
def schedule():
"""
crontab 计划执行
:return:
"""
print( '======schedule========')
_start()
def status():
"""查看状态"""
print('======status========')
gpid = _status()
if gpid:
print('{}服务进程[gpid={}]正在运行'.format(base_path,gpid))
else:
print('{}服务进程没有运行.'.format(base_path))
# operate的可选字符串为add, del
def operate_crontab(operate):
"""
操作crontab
:param operate: add , del
:return:
"""
try:
# 从系统命令中,获取定时任务
p = subprocess.Popen(["crontab", "-l"], stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False)
returncode = p.wait()
except OSError as e:
print(u"找不到shell运行命令crontab", file=sys.stderr)
exit(1)
remain_cron_list = []
exist_flag = False
for i in p.stdout.readlines():
if i.decode("utf-8").find(os.path.realpath(__file__) + " schedule") >= 0:
old_cron_content = i.decode("utf-8")
exist_flag = True
else:
remain_cron_list.append(i.decode("utf-8"))
if operate == "add" and not exist_flag:
remain_cron_list.append(cron_content)
remain_cron_list.append("\n")
with open(tmp_cron_file, 'wb') as f:
for i in remain_cron_list:
f.write(i.encode("utf-8"))
os.popen("crontab {}".format(tmp_cron_file))
print(u'添加crontab项: {}'.format(cron_content), file=sys.stderr)
if operate == "del" and exist_flag:
with open(tmp_cron_file, 'wb') as f:
for i in remain_cron_list:
f.write(i.encode("utf-8"))
os.popen("crontab {}".format(tmp_cron_file))
print(u'删除crontab item: {}'.format(old_cron_content), file=sys.stderr)
# os.remove(tmp_cron_file)
def start():
print(u'======start========')
# 往任务表增加定时计划
operate_crontab("add")
# 执行启动
_start()
print(u'启动{}服务执行完毕'.format(base_path))
def _stop():
print(u'======stop========')
# 在任务表删除定时计划
operate_crontab("del")
# 查询进程组id
gpid = _status()
if gpid:
# 进程组存在,杀死进程
import signal
# 杀死进程组
if USE_GPID:
# 杀死进程组
os.killpg(int(gpid), signal.SIGKILL)
else:
os.kill(int(gpid), signal.SIGKILL)
i = 0
while _status():
time.sleep(1)
i += 1
print(u'等待{}'.format(i))
print(u'{}成功停止{}服务[gpid={}]'.format(datetime.now(), base_path,gpid))
try:
sendmail.sendmail(subject='Notification: {}目录下服务进程停止'.format(base_path),
msgcontent= '服务进程[gpid={}]停止'.format(gpid))
except:
print(u'发送通知邮件失败')
pass
else:
print(u'{}服务进程没有运行'.format(base_path))
def stop():
"""
停止服务
:return:
"""
_stop()
print(u'执行停止{}服务完成'.format(base_path))
def restart():
print(u'======restart========')
_stop()
_start()
print('执行重启{}服务完成'.format(base_path))
if __name__ == '__main__':
if len(sys.argv) >=2:
fun = sys.argv[1]
else:
fun = ''
if fun == 'status':
status()
elif fun == 'start':
start()
elif fun == 'stop':
stop()
elif fun == 'restart':
restart()
elif fun == 'schedule':
schedule()
else:
print( u'Usage: {} (status|start|stop|restart)'.format(os.path.basename(__file__)))