diff --git a/prod/stock_em02_gw/activate.sh b/prod/stock_em02_gw/activate.sh new file mode 100644 index 00000000..f73bc3a0 --- /dev/null +++ b/prod/stock_em02_gw/activate.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +CONDA_HOME=~/anaconda3 +#$CONDA_HOME/bin/conda deactivate +#$CONDA_HOME/bin/conda activate py37 + +############ Added by Huang Jianwei at 2018-04-03 +# To solve the problem about Javascript runtime +export PATH=$PATH:/usr/local/bin +############ Ended + +BASE_PATH=$(cd `dirname $0`; pwd) +echo $BASE_PATH +cd `dirname $0` +PROGRAM_NAME=./run_service.py + +$CONDA_HOME/envs/py37/bin/python $PROGRAM_NAME >$BASE_PATH/log/service.log 2>>$BASE_PATH/log/service-error.log & diff --git a/prod/stock_em02_gw/ar_setting.json b/prod/stock_em02_gw/ar_setting.json new file mode 100644 index 00000000..2952f558 --- /dev/null +++ b/prod/stock_em02_gw/ar_setting.json @@ -0,0 +1,15 @@ +{ + "mongo_db": + { + "host": "192.168.1.214", + "port": 27017 + }, + "accounts": + { + "em02_gw": + { + "copy_history_trades": true, + "copy_history_orders": true + } + } +} diff --git a/prod/stock_em02_gw/connect_em02_gw.json b/prod/stock_em02_gw/connect_em02_gw.json new file mode 100644 index 00000000..2970c78d --- /dev/null +++ b/prod/stock_em02_gw/connect_em02_gw.json @@ -0,0 +1,7 @@ +{ + "资金账号": "zzzz", + "资金密码": "zzzz", + "并发连接数": 3, + "session缓存文件": "session.json", + "账号类型": "普通" +} diff --git a/prod/stock_em02_gw/custom_contracts.json b/prod/stock_em02_gw/custom_contracts.json new file mode 100644 index 00000000..9e26dfee --- /dev/null +++ b/prod/stock_em02_gw/custom_contracts.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/prod/stock_em02_gw/rpc_service_setting.json b/prod/stock_em02_gw/rpc_service_setting.json new file mode 100644 index 00000000..3089eb27 --- /dev/null +++ b/prod/stock_em02_gw/rpc_service_setting.json @@ -0,0 +1,4 @@ +{ + "rep_address": "tcp://127.0.0.1:102201", + "pub_address": "tcp://127.0.0.1:102202" +} \ No newline at end of file diff --git a/prod/stock_em02_gw/run_service.py b/prod/stock_em02_gw/run_service.py new file mode 100644 index 00000000..39e44ca2 --- /dev/null +++ b/prod/stock_em02_gw/run_service.py @@ -0,0 +1,159 @@ +# flake8: noqa + +import os +import sys +import multiprocessing +from time import sleep +from datetime import datetime, time +from logging import DEBUG + +# 将repostory的目录i,作为根目录,添加到系统环境中。 +ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) +if ROOT_PATH not in sys.path: + sys.path.append(ROOT_PATH) + print(f'append {ROOT_PATH} into sys.path') + +from vnpy.event import EventEngine, EVENT_TIMER +from vnpy.trader.setting import SETTINGS +from vnpy.trader.engine import MainEngine +from vnpy.trader.utility import load_json +from vnpy.gateway.eastmoney import EastmoneyGateway +#from vnpy.app.cta_stock import CtaStockApp +#from vnpy.app.cta_crypto.base import EVENT_CTA_LOG +from vnpy.app.rpc_service import RpcServiceApp +#from vnpy.app.algo_broker import AlgoBrokerApp +from vnpy.app.account_recorder import AccountRecorderApp +from vnpy.trader.util_pid import update_pid +from vnpy.trader.util_monitor import OrderMonitor, TradeMonitor, PositionMonitor, AccountMonitor, LogMonitor + +SETTINGS["log.active"] = True +SETTINGS["log.level"] = DEBUG +SETTINGS["log.console"] = True +SETTINGS["log.file"] = True + +gateway_name = 'em02_gw' +gw_setting = load_json(f'connect_{gateway_name}.json') + +import types +import traceback + +def excepthook(exctype: type, value: Exception, tb: types.TracebackType) -> None: + """ + Raise exception under debug mode + """ + sys.__excepthook__(exctype, value, tb) + + msg = "".join(traceback.format_exception(exctype, value, tb)) + + print(msg, file=sys.stderr) + +class DaemonService(object): + + def __init__(self): + self.event_engine = EventEngine() + self.g_count = 0 + self.last_dt = datetime.now() + # 创建账号/持仓/委托/交易/日志记录 + self.acc_monitor = AccountMonitor(self.event_engine) + self.pos_monitor = PositionMonitor(self.event_engine) + self.ord_monitor = OrderMonitor(self.event_engine) + self.trd_monitor = TradeMonitor(self.event_engine) + #self.log_monitor = LogMonitor(self.event_engine) + + # 创建主引擎 + self.main_engine = MainEngine(self.event_engine) + + self.save_data_time = None + self.save_snapshot_time = None + + # 注册定时器,用于判断重连 + self.event_engine.register(EVENT_TIMER, self.on_timer) + + def on_timer(self, event): + """定时器执行逻辑,每十秒执行一次""" + + # 60秒才执行一次检查 + self.g_count += 1 + if self.g_count <= 60: + return + + # 强制写入一次gpid + update_pid() + + self.g_count = 0 + dt = datetime.now() + + if dt.hour != self.last_dt.hour: + self.last_dt = dt + #print(u'run_server.py checkpoint:{0}'.format(dt)) + self.main_engine.write_log(u'run_server.py checkpoint:{0}'.format(dt)) + + # ctp 短线重连得处理 + + # 定时保存策略内数据 + if dt.strftime('%H:%M') in ['02:31', '15:16']: + if self.save_data_time != dt.strftime('%H:%M'): + self.main_engine.write_log(u'保存策略内数据') + self.save_data_time = dt.strftime('%H:%M') + try: + self.main_engine.save_strategy_data('ALL') + except Exception as ex: + self.main_engine.write_error('保存策略内数据异常') + + if dt.strftime('%H:%M') in ['02:32', '10:16', '11:31', '15:17', '23:01']: + if self.save_snapshot_time != dt.strftime('%H:%M'): + self.main_engine.write_log(u'保存策略内K线切片数据') + self.save_snapshot_time = dt.strftime('%H:%M') + try: + self.main_engine.save_strategy_snapshot('ALL') + except Exception as ex: + self.main_engine.write_error('保存策略内数据异常') + + def start(self): + """ + Running in the child process. + """ + SETTINGS["log.file"] = True + + timer_count = 0 + + # 远程调用服务 + rpc_server = self.main_engine.add_app(RpcServiceApp) + ret, msg = rpc_server.start() + if not ret: + self.main_engine.write_log(f"RPC服务未能启动:{msg}") + return + else: + self.main_engine.write_log(f'RPC服务已启动') + + update_pid() + + # 添加账号同步app + self.main_engine.add_app(AccountRecorderApp) + + # 接入网关 + self.main_engine.add_gateway(EastmoneyGateway, gateway_name) + self.main_engine.write_log(f"连接{gateway_name}接口") + self.main_engine.connect(gw_setting, gateway_name) + + sleep(5) + + # # 添加cta引擎 + # cta_engine = self.main_engine.add_app(CtaStockApp) + # cta_engine.init_engine() + + # 添加算法引擎代理 + #self.main_engine.add_app(AlgoBrokerApp) + + self.main_engine.write_log("主引擎创建成功") + + while True: + sleep(1) + + +if __name__ == "__main__": + + sys.excepthook = excepthook + + s = DaemonService() + s.start() diff --git a/prod/stock_em02_gw/service.py b/prod/stock_em02_gw/service.py new file mode 100644 index 00000000..a30e2b59 --- /dev/null +++ b/prod/stock_em02_gw/service.py @@ -0,0 +1,394 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# python 3 环境 +# 激活 activate.sh (激活py35 env,启动运行程序 + +import sys +import time +from datetime import datetime +# import commands +import os +import subprocess +import psutil + +# 将repostory的目录i,作为根目录,添加到系统环境中。 +ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) +sys.path.append(ROOT_PATH) + +from vnpy.trader.util_wechat import send_wx_msg + +# python容器文件 +python_path = '/home/trade/anaconda3/envs/py37/bin/python' + +# shell 文件,不使用sh +bash = "/bin/bash" + +# 配置内容 +# 运行时间段 +# 是否7X24小时运行(数字货币) +IS_7x24 = False +# 是否激活夜盘 +ACTIVE_NIGHT = False +# python 脚本,这里要和activate.sh里面得PROGRAM_NAME 一致 +PROGRAM_NAME = './run_service.py' + +# 日志目录 +log_path = os.path.abspath(os.path.join(os.getcwd(), 'log')) +if os.path.isdir(log_path): + # 如果工作目录下,存在logs子目录,就使用logs子目录 + base_path = os.getcwd() +else: + # 使用service.py所在得目录 + base_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__))) + +# 进程组id保存文件 +gpid_file = os.path.abspath(os.path.join(base_path, 'log', 'gpid.txt')) + +tmp_cron_file = os.path.abspath(os.path.join(base_path, 'log', 'cron.tmp')) + +program_file = os.path.join(base_path, 'activate.sh') +log_file = os.path.abspath(os.path.join(base_path, 'log', 'service.log')) +error_file = os.path.abspath(os.path.join(base_path, 'log', 'service-error.log')) +cron_log_file = os.path.abspath(os.path.join(base_path, 'log', 'cron.log')) +cron_error_file = os.path.abspath(os.path.join(base_path, 'log', 'cron-error.log')) +null_file = "/dev/null" + +cron_content = "* * * * * {} {} schedule >>{} 2>>{}".format(python_path, os.path.realpath(__file__), cron_log_file, + cron_error_file) +program_command = "nohup {} {} >>{} 2>>{} &".format(bash, program_file, log_file, error_file) + +USE_GPID = False + + +def _check_gpid(gpid): + """ + 检查进程(组)ID + :param gpid: + :return: True, 正在运行/ False: 没有运行 + """ + if not USE_GPID: + int_gpid = int(gpid) + return psutil.pid_exists(int_gpid) + + try: + # 通过系统子进程,打开ps命令,找到gpid下得所有进程 + p = subprocess.Popen(["ps", "-A", "-o", "pgrp="], stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False) + returncode = p.wait() + except OSError as e: + print('找不到shell运行命令ps', file=sys.stderr) + exit(1) + + # print('returncode1:{}'.format(returncode)) + try: + p2 = subprocess.Popen("uniq", stdin=p.stdout, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, shell=False) + returncode = p2.wait() + except OSError as e: + print(u'找不到shell运行命令uniq', file=sys.stderr) + exit(1) + + # print('returncode2:{}'.format(returncode)) + for i in p2.stdout.readlines(): + # print (u'p2.line:{}'.format(i)) + if i.decode().strip() == gpid: + print(u'找到gpid:{}'.format(gpid)) + return True + + print(u'找不到gpid:{}'.format(gpid)) + return False + + +def _status(): + """ + 查询当前状态 + :return: + """ + print(u'检查{}'.format(gpid_file)) + if os.path.exists(gpid_file): + with open(gpid_file, 'r') as f: + gpid = f.read().strip() + print(u'gpid={}'.format(gpid)) + if gpid != "" and _check_gpid(gpid): + return gpid + + return None + + +def trade_off(): + """检查现在是否为非交易时间""" + now = datetime.now() + + # 数字货币 + if IS_7x24: + if now.hour == 12 and now.minute == 0: + return True + else: + return False + + # 国内期货/股票 + a = datetime.now().replace(hour=2, minute=35, second=0, microsecond=0) + b = datetime.now().replace(hour=9, minute=0, second=0, microsecond=0) + c = datetime.now().replace(hour=15, minute=30, second=0, microsecond=0) + d = datetime.now().replace(hour=20, minute=45, second=0, microsecond=0) + + # 国内期货有夜盘 + if ACTIVE_NIGHT: + weekend = (now.isoweekday() == 6 and now >= a) or (now.isoweekday() == 7) or ( + now.isoweekday() == 1 and now <= a) + off = (a <= now <= b) or (c <= now <= d) or weekend + return off + else: + weekend = now.isoweekday() in [6, 7] + off = now <= b or c <= now or weekend + return off + + +def _start(): + """ + 启动服务 + :return: + """ + # 获取进程组id + gpid = _status() + if trade_off(): + # 属于停止运行期间 + if gpid: + print(u'现在属于停止运行时间,进程组ID存在,将杀死服务进程:[gpid={}]'.format(gpid)) + import signal + if USE_GPID: + # 杀死进程组 + os.killpg(int(gpid), signal.SIGKILL) + else: + os.kill(int(gpid), signal.SIGKILL) + i = 0 + while _status(): + time.sleep(1) + i += 1 + print(u'杀死进程中,等待{}秒'.format(i)) + if i > 30: + print(u'杀死进程失败,退出') + exit(1) + + print('进程组已停止运行[gpid={}]'.format(gpid)) + send_wx_msg('进程组{}已停止运行[{}]'.format(gpid, base_path)) + else: + print(u'{} 现在属于停止运行时间,不启动服务'.format(datetime.now())) + else: + # 属于运行时间 + if not gpid: + print(u'{}属于运行时间,将启动服务:{}'.format(datetime.now(), program_command)) + if os.path.isfile(gpid_file): + print(u'{0}文件存在,先执行删除'.format(gpid_file)) + try: + os.remove(gpid_file) + except: + pass + + os.popen(program_command) + i = 0 + while True: + gpid = _status() + if gpid: + print('{}属于运行时间,成功启动服务[gpid={}]'.format(datetime.now(), gpid)) + send_wx_msg('{}属于运行时间,成功启动服务[{},gpid={}]'.format(datetime.now(), base_path, gpid)) + break + + i += 1 + print(u'启动进程中,等待{}秒'.format(i)) + if i > 30: + print(u'启动进程失败,退出') + exit(1) + time.sleep(1) + + + else: + print(u'{}属于运行时间,{}服务已运行'.format(datetime.now(), base_path)) + + +def schedule(): + """ + crontab 计划执行 + :return: + """ + print('======schedule========') + _start() + + +def status(): + """查看状态""" + print('======status========') + + gpid = _status() + if gpid: + print('{}服务进程[gpid={}]正在运行'.format(base_path, gpid)) + else: + print('{}服务进程没有运行.'.format(base_path)) + + check_pids_in_cwd(gpid) + + +# operate的可选字符串为:add, del +def operate_crontab(operate): + """ + 操作crontab + :param operate: add , del + :return: + """ + + try: + # 从系统命令中,获取定时任务 + p = subprocess.Popen(["crontab", "-l"], stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False) + returncode = p.wait() + except OSError as e: + print(u"找不到shell运行命令crontab", file=sys.stderr) + exit(1) + + remain_cron_list = [] + exist_flag = False + old_cron_content = '' + for i in p.stdout.readlines(): + if i.decode("utf-8").find(os.path.realpath(__file__) + " schedule") >= 0: + old_cron_content = i.decode("utf-8") + exist_flag = True + else: + remain_cron_list.append(i.decode("utf-8")) + + if operate == "add" and not exist_flag: + remain_cron_list.append(cron_content) + remain_cron_list.append("\n") + with open(tmp_cron_file, 'wb') as f: + for i in remain_cron_list: + f.write(i.encode("utf-8")) + os.popen("crontab {}".format(tmp_cron_file)) + print(u'添加crontab项: {}'.format(cron_content), file=sys.stderr) + + if operate == "del" and exist_flag: + with open(tmp_cron_file, 'wb') as f: + for i in remain_cron_list: + f.write(i.encode("utf-8")) + os.popen("crontab {}".format(tmp_cron_file)) + print(u'删除crontab item: {}'.format(old_cron_content), file=sys.stderr) + + # os.remove(tmp_cron_file) + + +def check_pids_in_cwd(gpid=None): + print('检查{}路径下运行得python {}进程'.format(base_path, PROGRAM_NAME)) + + runing_pids = [] + for pid in psutil.pids(): + try: + p = psutil.Process(pid) + p_name = p.name() + if not p_name.endswith('python'): + continue + p_cwd = p.cwd() + if p_cwd != base_path: + continue + p_cmdline = p.cmdline() + if PROGRAM_NAME not in p_cmdline: + continue + + runing_pids.append(pid) + + except: + pass + + if len(runing_pids) > 1: + if gpid is not None: + if gpid in runing_pids: + print(u'排除其他pid') + runing_pids.remove(gpid) + else: + print(u'gpid,不在运行清单中,排除首个pid') + runing_pids.pop(0) + else: + print(u'gpid为空,排除首个pid') + runing_pids.pop(0) + + for pid in runing_pids: + try: + p = psutil.Process(pid) + print(u'pid:{},name:{},bin:{},path:{},cmd:{},被终止运行' + .format(pid, p.name, p.exe(), p.cwd(), p.cmdline())) + import signal + os.kill(int(pid), signal.SIGKILL) + except: + pass + + +def start(): + print(u'======start========') + # 往任务表增加定时计划 + operate_crontab("add") + print(u'任务表增加定时计划完毕') + # 执行启动 + # _start() + print(u'启动{}服务执行完毕'.format(base_path)) + + +def _stop(): + print(u'======stop========') + # 在任务表删除定时计划 + operate_crontab("del") + + # 查询进程组id + gpid = _status() + if gpid: + # 进程组存在,杀死进程 + import signal + # 杀死进程组 + if USE_GPID: + # 杀死进程组 + os.killpg(int(gpid), signal.SIGKILL) + else: + os.kill(int(gpid), signal.SIGKILL) + i = 0 + while _status(): + time.sleep(1) + i += 1 + print(u'等待{}秒'.format(i)) + + print(u'{}成功停止{}服务[gpid={}]'.format(datetime.now(), base_path, gpid)) + send_wx_msg(u'{}成功停止{}服务[gpid={}]'.format(datetime.now(), base_path, gpid)) + else: + print(u'{}服务进程没有运行'.format(base_path)) + + +def stop(): + """ + 停止服务 + :return: + """ + _stop() + print(u'执行停止{}服务完成'.format(base_path)) + + +def restart(): + print(u'======restart========') + _stop() + _start() + print('执行重启{}服务完成'.format(base_path)) + + +if __name__ == '__main__': + if len(sys.argv) >= 2: + fun = sys.argv[1] + else: + fun = '' + if fun == 'status': + status() + elif fun == 'start': + start() + elif fun == 'stop': + stop() + elif fun == 'restart': + restart() + elif fun == 'schedule': + schedule() + else: + print(u'Usage: {} (status|start|stop|restart)'.format(os.path.basename(__file__))) + status() diff --git a/prod/stock_em02_gw/session.json b/prod/stock_em02_gw/session.json new file mode 100644 index 00000000..d4ed02e7 --- /dev/null +++ b/prod/stock_em02_gw/session.json @@ -0,0 +1,4 @@ +{ + "validatekey": "a51dc23f-443b-4321-a956-300b6498304c", + "cookie_str": "st_si=49015368899232;eastmoney_txzq_zjzh=NTQwMzMwMjE1NTM5fA%3D%3D;st_asi=delete;Yybdm=5403;st_pvi=31579671208450;st_inirUrl=https%3A%2F%2Fjy.xzsec.com%2FTrade%2FBuy;Uid=qPx0RsvGLgDoiKf0CU47yg%3d%3d;Khmc=%e6%9d%8e%e7%a4%ba%e4%bd%b3;st_sp=2021-10-31%2014%3A39%3A20;st_sn=2;st_psi=20211031143926579-11923323313501-2985332488;mobileimei=3529e943-4e71-4958-b784-7dcb6554b418;Uuid=c45d2d47879749cc91911f1a2dd1bda0" +} \ No newline at end of file diff --git a/prod/stock_em02_gw/vn_contract.pkb2 b/prod/stock_em02_gw/vn_contract.pkb2 new file mode 100644 index 00000000..fbe9739a Binary files /dev/null and b/prod/stock_em02_gw/vn_contract.pkb2 differ diff --git a/prod/stock_em02_gw/vt_setting.json b/prod/stock_em02_gw/vt_setting.json new file mode 100644 index 00000000..3ebc1e1b --- /dev/null +++ b/prod/stock_em02_gw/vt_setting.json @@ -0,0 +1,29 @@ +{ + "font.family": "Arial", + "font.size": 12, + "log.active": true, + "log.level": 10, + "log.console": true, + "log.file": true, + "email.server": "smtp.qq.com", + "email.port": 465, + "email.username": "", + "email.password": "", + "email.sender": "", + "email.receiver": "", + "rqdata.username": "", + "rqdata.password": "", + "database.driver": "sqlite", + "database.database": "database.db", + "database.host": "localhost", + "database.port": 3306, + "database.user": "root", + "database.password": "", + "database.authentication_source": "admin", + "huafu.data_source_prod": "http://192.168.1.212:4000/api/v1/get_price", + "huafu.data_source": "http://127.0.0.1:4212/api/v1/get_price", + "renko.host": "192.168.1.214", + "hams.host": "192.168.1.214", + "rabbitmq.host": "192.168.1.211", + "gateway_name": "em02" +} diff --git a/prod/stock_em02_rpc01/activate.sh b/prod/stock_em02_rpc01/activate.sh new file mode 100644 index 00000000..f73bc3a0 --- /dev/null +++ b/prod/stock_em02_rpc01/activate.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +CONDA_HOME=~/anaconda3 +#$CONDA_HOME/bin/conda deactivate +#$CONDA_HOME/bin/conda activate py37 + +############ Added by Huang Jianwei at 2018-04-03 +# To solve the problem about Javascript runtime +export PATH=$PATH:/usr/local/bin +############ Ended + +BASE_PATH=$(cd `dirname $0`; pwd) +echo $BASE_PATH +cd `dirname $0` +PROGRAM_NAME=./run_service.py + +$CONDA_HOME/envs/py37/bin/python $PROGRAM_NAME >$BASE_PATH/log/service.log 2>>$BASE_PATH/log/service-error.log & diff --git a/prod/stock_em02_rpc01/ar_setting.json b/prod/stock_em02_rpc01/ar_setting.json new file mode 100644 index 00000000..3d7fc16c --- /dev/null +++ b/prod/stock_em02_rpc01/ar_setting.json @@ -0,0 +1,16 @@ +{ + "mongo_db": + { + "host": "192.168.1.214", + "port": 27017 + }, + "accounts": + { + "em02": + { + "copy_history_trades": true, + "copy_history_orders": true + } + }, + "event_list": ["eStrategyPos.","eStrategySnapshot."] +} diff --git a/prod/stock_em02_rpc01/connect_em02_gw.json b/prod/stock_em02_rpc01/connect_em02_gw.json new file mode 100644 index 00000000..5661735c --- /dev/null +++ b/prod/stock_em02_rpc01/connect_em02_gw.json @@ -0,0 +1,4 @@ +{ + "主动请求地址": "tcp://127.0.0.1:102201", + "推送订阅地址": "tcp://127.0.0.1:102202" +} \ No newline at end of file diff --git a/prod/stock_em02_rpc01/cta_stock_config.json b/prod/stock_em02_rpc01/cta_stock_config.json new file mode 100644 index 00000000..a1895ac3 --- /dev/null +++ b/prod/stock_em02_rpc01/cta_stock_config.json @@ -0,0 +1,5 @@ +{ + "accountid" : "540330215539[普通]", + "strategy_group": "cta_stock_01", + "compare_pos": false +} diff --git a/prod/stock_em02_rpc01/cta_stock_setting.json b/prod/stock_em02_rpc01/cta_stock_setting.json new file mode 100644 index 00000000..365b0d20 --- /dev/null +++ b/prod/stock_em02_rpc01/cta_stock_setting.json @@ -0,0 +1,21 @@ +{ + "stock_grids_etf": { + "class_name": "StrategyStockGridTradeV3", + "vt_symbols": [ + "159995.SZSE" + ], + "auto_init": true, + "auto_start": true, + "setting": { + "backtesting": false, + "comments": [ + "芯片ETF" + ], + "max_invest_percent": 10, + "max_invest_margin": 100000, + "max_single_margin": 2000, + "grid_height_percent": 4, + "grid_lots": 15 + } + } +} \ No newline at end of file diff --git a/prod/stock_em02_rpc01/custom_contracts.json b/prod/stock_em02_rpc01/custom_contracts.json new file mode 100644 index 00000000..9e26dfee --- /dev/null +++ b/prod/stock_em02_rpc01/custom_contracts.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/prod/stock_em02_rpc01/rpc_service_setting.json b/prod/stock_em02_rpc01/rpc_service_setting.json new file mode 100644 index 00000000..74a479ea --- /dev/null +++ b/prod/stock_em02_rpc01/rpc_service_setting.json @@ -0,0 +1,4 @@ +{ + "rep_address": "tcp://127.0.0.1:102203", + "pub_address": "tcp://127.0.0.1:102204" +} diff --git a/prod/stock_em02_rpc01/run_main_em02_rpc01.py b/prod/stock_em02_rpc01/run_main_em02_rpc01.py new file mode 100644 index 00000000..1733dfde --- /dev/null +++ b/prod/stock_em02_rpc01/run_main_em02_rpc01.py @@ -0,0 +1,117 @@ +# flake8: noqa + +import os +import sys +# 将repostory的目录i,作为根目录,添加到系统环境中。 +ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) +sys.path.append(ROOT_PATH) +print(f'append {ROOT_PATH} into sys.path') + +from vnpy.event import EventEngine + +from vnpy.trader.engine import MainEngine +from vnpy.trader.ui import MainWindow, create_qapp + +#from vnpy.gateway.binancef import BinancefGateway +# from vnpy.gateway.bitmex import BitmexGateway +# from vnpy.gateway.futu import FutuGateway +# from vnpy.gateway.ib import IbGateway +# from vnpy.gateway.ctp import CtpGateway +# from vnpy.gateway.ctptest import CtptestGateway +# from vnpy.gateway.mini import MiniGateway +# from vnpy.gateway.sopt import SoptGateway +# from vnpy.gateway.minitest import MinitestGateway +# from vnpy.gateway.femas import FemasGateway +# from vnpy.gateway.tiger import TigerGateway +# from vnpy.gateway.oes import OesGateway +# from vnpy.gateway.okex import OkexGateway +# from vnpy.gateway.huobi import HuobiGateway +# from vnpy.gateway.bitfinex import BitfinexGateway +# from vnpy.gateway.onetoken import OnetokenGateway +# from vnpy.gateway.okexf import OkexfGateway +# from vnpy.gateway.okexs import OkexsGateway +# from vnpy.gateway.xtp import XtpGateway +# from vnpy.gateway.hbdm import HbdmGateway +# from vnpy.gateway.tap import TapGateway +# from vnpy.gateway.tora import ToraGateway +# from vnpy.gateway.alpaca import AlpacaGateway +# from vnpy.gateway.da import DaGateway +# from vnpy.gateway.coinbase import CoinbaseGateway +# from vnpy.gateway.bitstamp import BitstampGateway +# from vnpy.gateway.gateios import GateiosGateway +# from vnpy.gateway.bybit import BybitGateway +# from vnpy.gateway.eastmoney import EastmoneyGateway +from vnpy.gateway.rpc import RpcGateway + +# from vnpy.app.cta_crypto import CtaCryptoApp +from vnpy.app.cta_stock import CtaStockApp +# from vnpy.app.csv_loader import CsvLoaderApp +# from vnpy.app.algo_trading import AlgoTradingApp +# from vnpy.app.cta_backtester import CtaBacktesterApp +# from vnpy.app.data_recorder import DataRecorderApp +# from vnpy.app.tick_recorder import TickRecorderApp +# from vnpy.app.risk_manager import RiskManagerApp +# from vnpy.app.script_trader import ScriptTraderApp +# from vnpy.app.rpc_service import RpcServiceApp +# from vnpy.app.spread_trading import SpreadTradingApp +# from vnpy.app.portfolio_manager import PortfolioManagerApp + + +def main(): + """""" + qapp = create_qapp() + + event_engine = EventEngine() + + main_engine = MainEngine(event_engine) + + main_engine.add_gateway(RpcGateway, 'em02_gw') + #main_engine.add_gateway(CtpGateway) + # main_engine.add_gateway(CtptestGateway) + # main_engine.add_gateway(MiniGateway) + # main_engine.add_gateway(SoptGateway) + # main_engine.add_gateway(MinitestGateway) + # main_engine.add_gateway(FemasGateway) + # main_engine.add_gateway(IbGateway) + # main_engine.add_gateway(FutuGateway) + #main_engine.add_gateway(BitmexGateway) + # main_engine.add_gateway(TigerGateway) + # main_engine.add_gateway(OesGateway) + # main_engine.add_gateway(OkexGateway) + # main_engine.add_gateway(HuobiGateway) + # main_engine.add_gateway(BitfinexGateway) + # main_engine.add_gateway(OnetokenGateway) + # main_engine.add_gateway(OkexfGateway) + # main_engine.add_gateway(HbdmGateway) + # main_engine.add_gateway(XtpGateway) + # main_engine.add_gateway(TapGateway) + # main_engine.add_gateway(ToraGateway) + # main_engine.add_gateway(AlpacaGateway) + # main_engine.add_gateway(OkexsGateway) + # main_engine.add_gateway(DaGateway) + # main_engine.add_gateway(CoinbaseGateway) + # main_engine.add_gateway(BitstampGateway) + # main_engine.add_gateway(GateiosGateway) + #main_engine.add_gateway(BybitGateway) + + #main_engine.add_app(CtaStrategyApp) + main_engine.add_app(CtaStockApp) + #main_engine.add_app(CtaBacktesterApp) + # main_engine.add_app(CsvLoaderApp) + # main_engine.add_app(AlgoTradingApp) + #main_engine.add_app(DataRecorderApp) + #main_engine.add_app(TickRecorderApp) + # main_engine.add_app(RiskManagerApp) + # main_engine.add_app(ScriptTraderApp) + # main_engine.add_app(RpcServiceApp) + # main_engine.add_app(SpreadTradingApp) + #main_engine.add_app(PortfolioManagerApp) + + main_window = MainWindow(main_engine, event_engine) + main_window.showMaximized() + + qapp.exec() + + +if __name__ == "__main__": + main() diff --git a/prod/stock_em02_rpc01/run_service.py b/prod/stock_em02_rpc01/run_service.py new file mode 100644 index 00000000..e2ef5d35 --- /dev/null +++ b/prod/stock_em02_rpc01/run_service.py @@ -0,0 +1,159 @@ +# flake8: noqa + +import os +import sys +import multiprocessing +from time import sleep +from datetime import datetime, time +from logging import DEBUG + +# 将repostory的目录i,作为根目录,添加到系统环境中。 +ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) +if ROOT_PATH not in sys.path: + sys.path.append(ROOT_PATH) + print(f'append {ROOT_PATH} into sys.path') + +from vnpy.event import EventEngine, EVENT_TIMER +from vnpy.trader.setting import SETTINGS +from vnpy.trader.engine import MainEngine +from vnpy.trader.utility import load_json +from vnpy.gateway.rpc import RpcGateway +from vnpy.app.cta_stock import CtaStockApp +#from vnpy.app.cta_crypto.base import EVENT_CTA_LOG +from vnpy.app.rpc_service import RpcServiceApp +#from vnpy.app.algo_broker import AlgoBrokerApp +from vnpy.app.account_recorder import AccountRecorderApp +from vnpy.trader.util_pid import update_pid +from vnpy.trader.util_monitor import OrderMonitor, TradeMonitor, PositionMonitor, AccountMonitor, LogMonitor + +SETTINGS["log.active"] = True +SETTINGS["log.level"] = DEBUG +SETTINGS["log.console"] = True +SETTINGS["log.file"] = True + +gateway_name = 'em02_gw' +gw_setting = load_json(f'connect_{gateway_name}.json') + +import types +import traceback + +def excepthook(exctype: type, value: Exception, tb: types.TracebackType) -> None: + """ + Raise exception under debug mode + """ + sys.__excepthook__(exctype, value, tb) + + msg = "".join(traceback.format_exception(exctype, value, tb)) + + print(msg, file=sys.stderr) + +class DaemonService(object): + + def __init__(self): + self.event_engine = EventEngine() + self.g_count = 0 + self.last_dt = datetime.now() + # 创建账号/持仓/委托/交易/日志记录 + self.acc_monitor = AccountMonitor(self.event_engine) + self.pos_monitor = PositionMonitor(self.event_engine) + self.ord_monitor = OrderMonitor(self.event_engine) + self.trd_monitor = TradeMonitor(self.event_engine) + #self.log_monitor = LogMonitor(self.event_engine) + + # 创建主引擎 + self.main_engine = MainEngine(self.event_engine) + + self.save_data_time = None + self.save_snapshot_time = None + + # 注册定时器,用于判断重连 + self.event_engine.register(EVENT_TIMER, self.on_timer) + + def on_timer(self, event): + """定时器执行逻辑,每十秒执行一次""" + + # 60秒才执行一次检查 + self.g_count += 1 + if self.g_count <= 60: + return + + # 强制写入一次gpid + update_pid() + + self.g_count = 0 + dt = datetime.now() + + if dt.hour != self.last_dt.hour: + self.last_dt = dt + #print(u'run_server.py checkpoint:{0}'.format(dt)) + self.main_engine.write_log(u'run_server.py checkpoint:{0}'.format(dt)) + + # ctp 短线重连得处理 + + # 定时保存策略内数据 + if dt.strftime('%H:%M') in ['02:31', '15:16']: + if self.save_data_time != dt.strftime('%H:%M'): + self.main_engine.write_log(u'保存策略内数据') + self.save_data_time = dt.strftime('%H:%M') + try: + self.main_engine.save_strategy_data('ALL') + except Exception as ex: + self.main_engine.write_error('保存策略内数据异常') + + if dt.strftime('%H:%M') in ['02:32', '10:16', '11:31', '15:17', '23:01']: + if self.save_snapshot_time != dt.strftime('%H:%M'): + self.main_engine.write_log(u'保存策略内K线切片数据') + self.save_snapshot_time = dt.strftime('%H:%M') + try: + self.main_engine.save_strategy_snapshot('ALL') + except Exception as ex: + self.main_engine.write_error('保存策略内数据异常') + + def start(self): + """ + Running in the child process. + """ + SETTINGS["log.file"] = True + + timer_count = 0 + + # 远程调用服务 + rpc_server = self.main_engine.add_app(RpcServiceApp) + ret, msg = rpc_server.start() + if not ret: + self.main_engine.write_log(f"RPC服务未能启动:{msg}") + return + else: + self.main_engine.write_log(f'RPC服务已启动') + + update_pid() + + # 添加账号同步app + self.main_engine.add_app(AccountRecorderApp) + + # 接入网关 + self.main_engine.add_gateway(RpcGateway, gateway_name) + self.main_engine.write_log(f"连接{gateway_name}接口") + self.main_engine.connect(gw_setting, gateway_name) + + sleep(5) + + # 添加cta引擎 + cta_engine = self.main_engine.add_app(CtaStockApp) + cta_engine.init_engine() + + # 添加算法引擎代理 + #self.main_engine.add_app(AlgoBrokerApp) + + self.main_engine.write_log("主引擎创建成功") + + while True: + sleep(1) + + +if __name__ == "__main__": + + sys.excepthook = excepthook + + s = DaemonService() + s.start() diff --git a/prod/stock_em02_rpc01/service.py b/prod/stock_em02_rpc01/service.py new file mode 100644 index 00000000..f34c4d78 --- /dev/null +++ b/prod/stock_em02_rpc01/service.py @@ -0,0 +1,394 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# python 3 环境 +# 激活 activate.sh (激活py35 env,启动运行程序 + +import sys +import time +from datetime import datetime +# import commands +import os +import subprocess +import psutil + +# 将repostory的目录i,作为根目录,添加到系统环境中。 +ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) +sys.path.append(ROOT_PATH) + +from vnpy.trader.util_wechat import send_wx_msg + +# python容器文件 +python_path = '/home/trade/anaconda3/envs/py37/bin/python' + +# shell 文件,不使用sh +bash = "/bin/bash" + +# 配置内容 +# 运行时间段 +# 是否7X24小时运行(数字货币) +IS_7x24 = False +# 是否激活夜盘 +ACTIVE_NIGHT = False +# python 脚本,这里要和activate.sh里面得PROGRAM_NAME 一致 +PROGRAM_NAME = './run_service.py' + +# 日志目录 +log_path = os.path.abspath(os.path.join(os.getcwd(), 'log')) +if os.path.isdir(log_path): + # 如果工作目录下,存在logs子目录,就使用logs子目录 + base_path = os.getcwd() +else: + # 使用service.py所在得目录 + base_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__))) + +# 进程组id保存文件 +gpid_file = os.path.abspath(os.path.join(base_path, 'log', 'gpid.txt')) + +tmp_cron_file = os.path.abspath(os.path.join(base_path, 'log', 'cron.tmp')) + +program_file = os.path.join(base_path, 'activate.sh') +log_file = os.path.abspath(os.path.join(base_path, 'log', 'service.log')) +error_file = os.path.abspath(os.path.join(base_path, 'log', 'service-error.log')) +cron_log_file = os.path.abspath(os.path.join(base_path, 'log', 'cron.log')) +cron_error_file = os.path.abspath(os.path.join(base_path, 'log', 'cron-error.log')) +null_file = "/dev/null" + +cron_content = "* * * * * {} {} schedule >>{} 2>>{}".format(python_path, os.path.realpath(__file__), cron_log_file, + cron_error_file) +program_command = "nohup {} {} >>{} 2>>{} &".format(bash, program_file, log_file, error_file) + +USE_GPID = False + + +def _check_gpid(gpid): + """ + 检查进程(组)ID + :param gpid: + :return: True, 正在运行/ False: 没有运行 + """ + if not USE_GPID: + int_gpid = int(gpid) + return psutil.pid_exists(int_gpid) + + try: + # 通过系统子进程,打开ps命令,找到gpid下得所有进程 + p = subprocess.Popen(["ps", "-A", "-o", "pgrp="], stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False) + returncode = p.wait() + except OSError as e: + print('找不到shell运行命令ps', file=sys.stderr) + exit(1) + + # print('returncode1:{}'.format(returncode)) + try: + p2 = subprocess.Popen("uniq", stdin=p.stdout, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, shell=False) + returncode = p2.wait() + except OSError as e: + print(u'找不到shell运行命令uniq', file=sys.stderr) + exit(1) + + # print('returncode2:{}'.format(returncode)) + for i in p2.stdout.readlines(): + # print (u'p2.line:{}'.format(i)) + if i.decode().strip() == gpid: + print(u'找到gpid:{}'.format(gpid)) + return True + + print(u'找不到gpid:{}'.format(gpid)) + return False + + +def _status(): + """ + 查询当前状态 + :return: + """ + print(u'检查{}'.format(gpid_file)) + if os.path.exists(gpid_file): + with open(gpid_file, 'r') as f: + gpid = f.read().strip() + print(u'gpid={}'.format(gpid)) + if gpid != "" and _check_gpid(gpid): + return gpid + + return None + + +def trade_off(): + """检查现在是否为非交易时间""" + now = datetime.now() + + # 数字货币 + if IS_7x24: + if now.hour == 12 and now.minute == 0: + return True + else: + return False + + # 国内期货/股票 + a = datetime.now().replace(hour=2, minute=35, second=0, microsecond=0) + b = datetime.now().replace(hour=9, minute=10, second=0, microsecond=0) + c = datetime.now().replace(hour=15, minute=30, second=0, microsecond=0) + d = datetime.now().replace(hour=20, minute=45, second=0, microsecond=0) + + # 国内期货有夜盘 + if ACTIVE_NIGHT: + weekend = (now.isoweekday() == 6 and now >= a) or (now.isoweekday() == 7) or ( + now.isoweekday() == 1 and now <= a) + off = (a <= now <= b) or (c <= now <= d) or weekend + return off + else: + weekend = now.isoweekday() in [6, 7] + off = now <= b or c <= now or weekend + return off + + +def _start(): + """ + 启动服务 + :return: + """ + # 获取进程组id + gpid = _status() + if trade_off(): + # 属于停止运行期间 + if gpid: + print(u'现在属于停止运行时间,进程组ID存在,将杀死服务进程:[gpid={}]'.format(gpid)) + import signal + if USE_GPID: + # 杀死进程组 + os.killpg(int(gpid), signal.SIGKILL) + else: + os.kill(int(gpid), signal.SIGKILL) + i = 0 + while _status(): + time.sleep(1) + i += 1 + print(u'杀死进程中,等待{}秒'.format(i)) + if i > 30: + print(u'杀死进程失败,退出') + exit(1) + + print('进程组已停止运行[gpid={}]'.format(gpid)) + send_wx_msg('进程组{}已停止运行[{}]'.format(gpid, base_path)) + else: + print(u'{} 现在属于停止运行时间,不启动服务'.format(datetime.now())) + else: + # 属于运行时间 + if not gpid: + print(u'{}属于运行时间,将启动服务:{}'.format(datetime.now(), program_command)) + if os.path.isfile(gpid_file): + print(u'{0}文件存在,先执行删除'.format(gpid_file)) + try: + os.remove(gpid_file) + except: + pass + + os.popen(program_command) + i = 0 + while True: + gpid = _status() + if gpid: + print('{}属于运行时间,成功启动服务[gpid={}]'.format(datetime.now(), gpid)) + send_wx_msg('{}属于运行时间,成功启动服务[{},gpid={}]'.format(datetime.now(), base_path, gpid)) + break + + i += 1 + print(u'启动进程中,等待{}秒'.format(i)) + if i > 30: + print(u'启动进程失败,退出') + exit(1) + time.sleep(1) + + + else: + print(u'{}属于运行时间,{}服务已运行'.format(datetime.now(), base_path)) + + +def schedule(): + """ + crontab 计划执行 + :return: + """ + print('======schedule========') + _start() + + +def status(): + """查看状态""" + print('======status========') + + gpid = _status() + if gpid: + print('{}服务进程[gpid={}]正在运行'.format(base_path, gpid)) + else: + print('{}服务进程没有运行.'.format(base_path)) + + check_pids_in_cwd(gpid) + + +# operate的可选字符串为:add, del +def operate_crontab(operate): + """ + 操作crontab + :param operate: add , del + :return: + """ + + try: + # 从系统命令中,获取定时任务 + p = subprocess.Popen(["crontab", "-l"], stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False) + returncode = p.wait() + except OSError as e: + print(u"找不到shell运行命令crontab", file=sys.stderr) + exit(1) + + remain_cron_list = [] + exist_flag = False + old_cron_content = '' + for i in p.stdout.readlines(): + if i.decode("utf-8").find(os.path.realpath(__file__) + " schedule") >= 0: + old_cron_content = i.decode("utf-8") + exist_flag = True + else: + remain_cron_list.append(i.decode("utf-8")) + + if operate == "add" and not exist_flag: + remain_cron_list.append(cron_content) + remain_cron_list.append("\n") + with open(tmp_cron_file, 'wb') as f: + for i in remain_cron_list: + f.write(i.encode("utf-8")) + os.popen("crontab {}".format(tmp_cron_file)) + print(u'添加crontab项: {}'.format(cron_content), file=sys.stderr) + + if operate == "del" and exist_flag: + with open(tmp_cron_file, 'wb') as f: + for i in remain_cron_list: + f.write(i.encode("utf-8")) + os.popen("crontab {}".format(tmp_cron_file)) + print(u'删除crontab item: {}'.format(old_cron_content), file=sys.stderr) + + # os.remove(tmp_cron_file) + + +def check_pids_in_cwd(gpid=None): + print('检查{}路径下运行得python {}进程'.format(base_path, PROGRAM_NAME)) + + runing_pids = [] + for pid in psutil.pids(): + try: + p = psutil.Process(pid) + p_name = p.name() + if not p_name.endswith('python'): + continue + p_cwd = p.cwd() + if p_cwd != base_path: + continue + p_cmdline = p.cmdline() + if PROGRAM_NAME not in p_cmdline: + continue + + runing_pids.append(pid) + + except: + pass + + if len(runing_pids) > 1: + if gpid is not None: + if gpid in runing_pids: + print(u'排除其他pid') + runing_pids.remove(gpid) + else: + print(u'gpid,不在运行清单中,排除首个pid') + runing_pids.pop(0) + else: + print(u'gpid为空,排除首个pid') + runing_pids.pop(0) + + for pid in runing_pids: + try: + p = psutil.Process(pid) + print(u'pid:{},name:{},bin:{},path:{},cmd:{},被终止运行' + .format(pid, p.name, p.exe(), p.cwd(), p.cmdline())) + import signal + os.kill(int(pid), signal.SIGKILL) + except: + pass + + +def start(): + print(u'======start========') + # 往任务表增加定时计划 + operate_crontab("add") + print(u'任务表增加定时计划完毕') + # 执行启动 + # _start() + print(u'启动{}服务执行完毕'.format(base_path)) + + +def _stop(): + print(u'======stop========') + # 在任务表删除定时计划 + operate_crontab("del") + + # 查询进程组id + gpid = _status() + if gpid: + # 进程组存在,杀死进程 + import signal + # 杀死进程组 + if USE_GPID: + # 杀死进程组 + os.killpg(int(gpid), signal.SIGKILL) + else: + os.kill(int(gpid), signal.SIGKILL) + i = 0 + while _status(): + time.sleep(1) + i += 1 + print(u'等待{}秒'.format(i)) + + print(u'{}成功停止{}服务[gpid={}]'.format(datetime.now(), base_path, gpid)) + send_wx_msg(u'{}成功停止{}服务[gpid={}]'.format(datetime.now(), base_path, gpid)) + else: + print(u'{}服务进程没有运行'.format(base_path)) + + +def stop(): + """ + 停止服务 + :return: + """ + _stop() + print(u'执行停止{}服务完成'.format(base_path)) + + +def restart(): + print(u'======restart========') + _stop() + _start() + print('执行重启{}服务完成'.format(base_path)) + + +if __name__ == '__main__': + if len(sys.argv) >= 2: + fun = sys.argv[1] + else: + fun = '' + if fun == 'status': + status() + elif fun == 'start': + start() + elif fun == 'stop': + stop() + elif fun == 'restart': + restart() + elif fun == 'schedule': + schedule() + else: + print(u'Usage: {} (status|start|stop|restart)'.format(os.path.basename(__file__))) + status() diff --git a/prod/stock_em02_rpc01/vn_contract.pkb2 b/prod/stock_em02_rpc01/vn_contract.pkb2 new file mode 100644 index 00000000..05e22953 Binary files /dev/null and b/prod/stock_em02_rpc01/vn_contract.pkb2 differ diff --git a/prod/stock_em02_rpc01/vt_setting.json b/prod/stock_em02_rpc01/vt_setting.json new file mode 100644 index 00000000..75bda20a --- /dev/null +++ b/prod/stock_em02_rpc01/vt_setting.json @@ -0,0 +1,29 @@ +{ + "font.family": "Arial", + "font.size": 12, + "log.active": true, + "log.level": 10, + "log.console": true, + "log.file": true, + "email.server": "smtp.qq.com", + "email.port": 465, + "email.username": "", + "email.password": "", + "email.sender": "", + "email.receiver": "", + "rqdata.username": "", + "rqdata.password": "", + "database.driver": "sqlite", + "database.database": "database.db", + "database.host": "localhost", + "database.port": 3306, + "database.user": "root", + "database.password": "", + "database.authentication_source": "admin", + "huafu.data_source": "http://127.0.0.1:4212/api/v1/get_price", + "huafu.data_source_prod": "http://192.168.1.212:4000/api/v1/get_price", + "renko.host": "192.168.1.214", + "hams.host": "192.168.1.214", + "rabbitmq.host": "192.168.1.211", + "gateway_name": "em02" +} \ No newline at end of file diff --git a/prod/stock_em02_rpc02/activate.sh b/prod/stock_em02_rpc02/activate.sh new file mode 100644 index 00000000..f73bc3a0 --- /dev/null +++ b/prod/stock_em02_rpc02/activate.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +CONDA_HOME=~/anaconda3 +#$CONDA_HOME/bin/conda deactivate +#$CONDA_HOME/bin/conda activate py37 + +############ Added by Huang Jianwei at 2018-04-03 +# To solve the problem about Javascript runtime +export PATH=$PATH:/usr/local/bin +############ Ended + +BASE_PATH=$(cd `dirname $0`; pwd) +echo $BASE_PATH +cd `dirname $0` +PROGRAM_NAME=./run_service.py + +$CONDA_HOME/envs/py37/bin/python $PROGRAM_NAME >$BASE_PATH/log/service.log 2>>$BASE_PATH/log/service-error.log & diff --git a/prod/stock_em02_rpc02/ar_setting.json b/prod/stock_em02_rpc02/ar_setting.json new file mode 100644 index 00000000..3d7fc16c --- /dev/null +++ b/prod/stock_em02_rpc02/ar_setting.json @@ -0,0 +1,16 @@ +{ + "mongo_db": + { + "host": "192.168.1.214", + "port": 27017 + }, + "accounts": + { + "em02": + { + "copy_history_trades": true, + "copy_history_orders": true + } + }, + "event_list": ["eStrategyPos.","eStrategySnapshot."] +} diff --git a/prod/stock_em02_rpc02/connect_em02_gw.json b/prod/stock_em02_rpc02/connect_em02_gw.json new file mode 100644 index 00000000..5661735c --- /dev/null +++ b/prod/stock_em02_rpc02/connect_em02_gw.json @@ -0,0 +1,4 @@ +{ + "主动请求地址": "tcp://127.0.0.1:102201", + "推送订阅地址": "tcp://127.0.0.1:102202" +} \ No newline at end of file diff --git a/prod/stock_em02_rpc02/cta_stock_config.json b/prod/stock_em02_rpc02/cta_stock_config.json new file mode 100644 index 00000000..569562d4 --- /dev/null +++ b/prod/stock_em02_rpc02/cta_stock_config.json @@ -0,0 +1,6 @@ +{ + "accountid" : "540330215539[普通]", + "strategy_group": "cta_stock_02", + "get_pos_from_db": true, + "compare_pos": true +} diff --git a/prod/stock_em02_rpc02/cta_stock_setting.json b/prod/stock_em02_rpc02/cta_stock_setting.json new file mode 100644 index 00000000..1c4f9a17 --- /dev/null +++ b/prod/stock_em02_rpc02/cta_stock_setting.json @@ -0,0 +1,23 @@ +{ + "stock_three_buy_d1": { + "class_name": "StrategyStock3rdBuyGroupV1", + "vt_symbols": [], + "auto_init": true, + "auto_start": true, + "setting": { + "comment": "日线三买", + "backtesting": false, + "max_invest_rate": 0.5, + "max_single_margin": 20000, + "stock_name_filters": [ + "st", + "退", + "地产" + ], + "screener_strategies": [ + "stock_screener_ThreeBuy_D1_SZSE", + "stock_screener_ThreeBuy_D1_SSE" + ] + } + } +} \ No newline at end of file diff --git a/prod/stock_em02_rpc02/custom_contracts.json b/prod/stock_em02_rpc02/custom_contracts.json new file mode 100644 index 00000000..9e26dfee --- /dev/null +++ b/prod/stock_em02_rpc02/custom_contracts.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/prod/stock_em02_rpc02/rpc_service_setting.json b/prod/stock_em02_rpc02/rpc_service_setting.json new file mode 100644 index 00000000..fd97c00c --- /dev/null +++ b/prod/stock_em02_rpc02/rpc_service_setting.json @@ -0,0 +1,4 @@ +{ + "rep_address": "tcp://127.0.0.1:102205", + "pub_address": "tcp://127.0.0.1:102206" +} diff --git a/prod/stock_em02_rpc02/run_main_em02_rpc02.py b/prod/stock_em02_rpc02/run_main_em02_rpc02.py new file mode 100644 index 00000000..1733dfde --- /dev/null +++ b/prod/stock_em02_rpc02/run_main_em02_rpc02.py @@ -0,0 +1,117 @@ +# flake8: noqa + +import os +import sys +# 将repostory的目录i,作为根目录,添加到系统环境中。 +ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) +sys.path.append(ROOT_PATH) +print(f'append {ROOT_PATH} into sys.path') + +from vnpy.event import EventEngine + +from vnpy.trader.engine import MainEngine +from vnpy.trader.ui import MainWindow, create_qapp + +#from vnpy.gateway.binancef import BinancefGateway +# from vnpy.gateway.bitmex import BitmexGateway +# from vnpy.gateway.futu import FutuGateway +# from vnpy.gateway.ib import IbGateway +# from vnpy.gateway.ctp import CtpGateway +# from vnpy.gateway.ctptest import CtptestGateway +# from vnpy.gateway.mini import MiniGateway +# from vnpy.gateway.sopt import SoptGateway +# from vnpy.gateway.minitest import MinitestGateway +# from vnpy.gateway.femas import FemasGateway +# from vnpy.gateway.tiger import TigerGateway +# from vnpy.gateway.oes import OesGateway +# from vnpy.gateway.okex import OkexGateway +# from vnpy.gateway.huobi import HuobiGateway +# from vnpy.gateway.bitfinex import BitfinexGateway +# from vnpy.gateway.onetoken import OnetokenGateway +# from vnpy.gateway.okexf import OkexfGateway +# from vnpy.gateway.okexs import OkexsGateway +# from vnpy.gateway.xtp import XtpGateway +# from vnpy.gateway.hbdm import HbdmGateway +# from vnpy.gateway.tap import TapGateway +# from vnpy.gateway.tora import ToraGateway +# from vnpy.gateway.alpaca import AlpacaGateway +# from vnpy.gateway.da import DaGateway +# from vnpy.gateway.coinbase import CoinbaseGateway +# from vnpy.gateway.bitstamp import BitstampGateway +# from vnpy.gateway.gateios import GateiosGateway +# from vnpy.gateway.bybit import BybitGateway +# from vnpy.gateway.eastmoney import EastmoneyGateway +from vnpy.gateway.rpc import RpcGateway + +# from vnpy.app.cta_crypto import CtaCryptoApp +from vnpy.app.cta_stock import CtaStockApp +# from vnpy.app.csv_loader import CsvLoaderApp +# from vnpy.app.algo_trading import AlgoTradingApp +# from vnpy.app.cta_backtester import CtaBacktesterApp +# from vnpy.app.data_recorder import DataRecorderApp +# from vnpy.app.tick_recorder import TickRecorderApp +# from vnpy.app.risk_manager import RiskManagerApp +# from vnpy.app.script_trader import ScriptTraderApp +# from vnpy.app.rpc_service import RpcServiceApp +# from vnpy.app.spread_trading import SpreadTradingApp +# from vnpy.app.portfolio_manager import PortfolioManagerApp + + +def main(): + """""" + qapp = create_qapp() + + event_engine = EventEngine() + + main_engine = MainEngine(event_engine) + + main_engine.add_gateway(RpcGateway, 'em02_gw') + #main_engine.add_gateway(CtpGateway) + # main_engine.add_gateway(CtptestGateway) + # main_engine.add_gateway(MiniGateway) + # main_engine.add_gateway(SoptGateway) + # main_engine.add_gateway(MinitestGateway) + # main_engine.add_gateway(FemasGateway) + # main_engine.add_gateway(IbGateway) + # main_engine.add_gateway(FutuGateway) + #main_engine.add_gateway(BitmexGateway) + # main_engine.add_gateway(TigerGateway) + # main_engine.add_gateway(OesGateway) + # main_engine.add_gateway(OkexGateway) + # main_engine.add_gateway(HuobiGateway) + # main_engine.add_gateway(BitfinexGateway) + # main_engine.add_gateway(OnetokenGateway) + # main_engine.add_gateway(OkexfGateway) + # main_engine.add_gateway(HbdmGateway) + # main_engine.add_gateway(XtpGateway) + # main_engine.add_gateway(TapGateway) + # main_engine.add_gateway(ToraGateway) + # main_engine.add_gateway(AlpacaGateway) + # main_engine.add_gateway(OkexsGateway) + # main_engine.add_gateway(DaGateway) + # main_engine.add_gateway(CoinbaseGateway) + # main_engine.add_gateway(BitstampGateway) + # main_engine.add_gateway(GateiosGateway) + #main_engine.add_gateway(BybitGateway) + + #main_engine.add_app(CtaStrategyApp) + main_engine.add_app(CtaStockApp) + #main_engine.add_app(CtaBacktesterApp) + # main_engine.add_app(CsvLoaderApp) + # main_engine.add_app(AlgoTradingApp) + #main_engine.add_app(DataRecorderApp) + #main_engine.add_app(TickRecorderApp) + # main_engine.add_app(RiskManagerApp) + # main_engine.add_app(ScriptTraderApp) + # main_engine.add_app(RpcServiceApp) + # main_engine.add_app(SpreadTradingApp) + #main_engine.add_app(PortfolioManagerApp) + + main_window = MainWindow(main_engine, event_engine) + main_window.showMaximized() + + qapp.exec() + + +if __name__ == "__main__": + main() diff --git a/prod/stock_em02_rpc02/run_screener.py b/prod/stock_em02_rpc02/run_screener.py new file mode 100644 index 00000000..cd1d2f02 --- /dev/null +++ b/prod/stock_em02_rpc02/run_screener.py @@ -0,0 +1,112 @@ +# flake8: noqa + +import os +import sys +import multiprocessing +from time import sleep +from datetime import datetime, time +from logging import DEBUG + +# 将repostory的目录i,作为根目录,添加到系统环境中。 +ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) +if ROOT_PATH not in sys.path: + sys.path.append(ROOT_PATH) + print(f'append {ROOT_PATH} into sys.path') + +from vnpy.event import EventEngine, EVENT_TIMER +from vnpy.trader.setting import SETTINGS +from vnpy.trader.engine import MainEngine +from vnpy.trader.utility import load_json +# from vnpy.gateway.gj import GjGateway +from vnpy.app.stock_screener import ScreenerApp +# from vnpy.app.cta_stock import CtaStockApp +# from vnpy.app.cta_crypto.base import EVENT_CTA_LOG +from vnpy.app.rpc_service import RpcServiceApp +# from vnpy.app.algo_broker import AlgoBrokerApp +from vnpy.app.account_recorder import AccountRecorderApp +from vnpy.trader.util_pid import update_pid + +# from vnpy.trader.util_monitor import OrderMonitor, TradeMonitor, PositionMonitor, AccountMonitor, LogMonitor + +SETTINGS["log.active"] = True +SETTINGS["log.level"] = DEBUG +SETTINGS["log.console"] = True +SETTINGS["log.file"] = True + +gateway_name = 'em02' + +import types +import traceback + + +def excepthook(exctype: type, value: Exception, tb: types.TracebackType) -> None: + """ + Raise exception under debug mode + """ + sys.__excepthook__(exctype, value, tb) + + msg = "".join(traceback.format_exception(exctype, value, tb)) + + print(msg, file=sys.stderr) + + +class DaemonService(object): + + def __init__(self): + self.event_engine = EventEngine() + self.g_count = 0 + self.last_dt = datetime.now() + + # 创建主引擎 + self.main_engine = MainEngine(self.event_engine) + + self.save_data_time = None + self.save_snapshot_time = None + + # 注册定时器,用于判断重连 + self.event_engine.register(EVENT_TIMER, self.on_timer) + + def on_timer(self, event): + """定时器执行逻辑,每十秒执行一次""" + + # 60秒才执行一次检查 + self.g_count += 1 + if self.g_count <= 60: + return + + self.g_count = 0 + dt = datetime.now() + + if dt.hour != self.last_dt.hour: + self.last_dt = dt + print(u'run_screener.py checkpoint:{0}'.format(dt)) + self.main_engine.write_log(u'run_screener.py checkpoint:{0}'.format(dt)) + + def start(self): + """ + Running in the child process. + """ + SETTINGS["log.file"] = True + + # 添加选股引擎 + screen_engine = self.main_engine.add_app(ScreenerApp) + screen_engine.init_engine() + + self.main_engine.write_log("主引擎创建成功") + + while True: + sleep(1) + if screen_engine.get_all_completed_status(): + from vnpy.trader.util_wechat import send_wx_msg + msg = f'{gateway_name}完成所有选股任务' + send_wx_msg(content=msg) + self.main_engine.write_log(msg) + sleep(10) + os._exit(0) + + +if __name__ == "__main__": + sys.excepthook = excepthook + + s = DaemonService() + s.start() diff --git a/prod/stock_em02_rpc02/run_service.py b/prod/stock_em02_rpc02/run_service.py new file mode 100644 index 00000000..e2ef5d35 --- /dev/null +++ b/prod/stock_em02_rpc02/run_service.py @@ -0,0 +1,159 @@ +# flake8: noqa + +import os +import sys +import multiprocessing +from time import sleep +from datetime import datetime, time +from logging import DEBUG + +# 将repostory的目录i,作为根目录,添加到系统环境中。 +ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) +if ROOT_PATH not in sys.path: + sys.path.append(ROOT_PATH) + print(f'append {ROOT_PATH} into sys.path') + +from vnpy.event import EventEngine, EVENT_TIMER +from vnpy.trader.setting import SETTINGS +from vnpy.trader.engine import MainEngine +from vnpy.trader.utility import load_json +from vnpy.gateway.rpc import RpcGateway +from vnpy.app.cta_stock import CtaStockApp +#from vnpy.app.cta_crypto.base import EVENT_CTA_LOG +from vnpy.app.rpc_service import RpcServiceApp +#from vnpy.app.algo_broker import AlgoBrokerApp +from vnpy.app.account_recorder import AccountRecorderApp +from vnpy.trader.util_pid import update_pid +from vnpy.trader.util_monitor import OrderMonitor, TradeMonitor, PositionMonitor, AccountMonitor, LogMonitor + +SETTINGS["log.active"] = True +SETTINGS["log.level"] = DEBUG +SETTINGS["log.console"] = True +SETTINGS["log.file"] = True + +gateway_name = 'em02_gw' +gw_setting = load_json(f'connect_{gateway_name}.json') + +import types +import traceback + +def excepthook(exctype: type, value: Exception, tb: types.TracebackType) -> None: + """ + Raise exception under debug mode + """ + sys.__excepthook__(exctype, value, tb) + + msg = "".join(traceback.format_exception(exctype, value, tb)) + + print(msg, file=sys.stderr) + +class DaemonService(object): + + def __init__(self): + self.event_engine = EventEngine() + self.g_count = 0 + self.last_dt = datetime.now() + # 创建账号/持仓/委托/交易/日志记录 + self.acc_monitor = AccountMonitor(self.event_engine) + self.pos_monitor = PositionMonitor(self.event_engine) + self.ord_monitor = OrderMonitor(self.event_engine) + self.trd_monitor = TradeMonitor(self.event_engine) + #self.log_monitor = LogMonitor(self.event_engine) + + # 创建主引擎 + self.main_engine = MainEngine(self.event_engine) + + self.save_data_time = None + self.save_snapshot_time = None + + # 注册定时器,用于判断重连 + self.event_engine.register(EVENT_TIMER, self.on_timer) + + def on_timer(self, event): + """定时器执行逻辑,每十秒执行一次""" + + # 60秒才执行一次检查 + self.g_count += 1 + if self.g_count <= 60: + return + + # 强制写入一次gpid + update_pid() + + self.g_count = 0 + dt = datetime.now() + + if dt.hour != self.last_dt.hour: + self.last_dt = dt + #print(u'run_server.py checkpoint:{0}'.format(dt)) + self.main_engine.write_log(u'run_server.py checkpoint:{0}'.format(dt)) + + # ctp 短线重连得处理 + + # 定时保存策略内数据 + if dt.strftime('%H:%M') in ['02:31', '15:16']: + if self.save_data_time != dt.strftime('%H:%M'): + self.main_engine.write_log(u'保存策略内数据') + self.save_data_time = dt.strftime('%H:%M') + try: + self.main_engine.save_strategy_data('ALL') + except Exception as ex: + self.main_engine.write_error('保存策略内数据异常') + + if dt.strftime('%H:%M') in ['02:32', '10:16', '11:31', '15:17', '23:01']: + if self.save_snapshot_time != dt.strftime('%H:%M'): + self.main_engine.write_log(u'保存策略内K线切片数据') + self.save_snapshot_time = dt.strftime('%H:%M') + try: + self.main_engine.save_strategy_snapshot('ALL') + except Exception as ex: + self.main_engine.write_error('保存策略内数据异常') + + def start(self): + """ + Running in the child process. + """ + SETTINGS["log.file"] = True + + timer_count = 0 + + # 远程调用服务 + rpc_server = self.main_engine.add_app(RpcServiceApp) + ret, msg = rpc_server.start() + if not ret: + self.main_engine.write_log(f"RPC服务未能启动:{msg}") + return + else: + self.main_engine.write_log(f'RPC服务已启动') + + update_pid() + + # 添加账号同步app + self.main_engine.add_app(AccountRecorderApp) + + # 接入网关 + self.main_engine.add_gateway(RpcGateway, gateway_name) + self.main_engine.write_log(f"连接{gateway_name}接口") + self.main_engine.connect(gw_setting, gateway_name) + + sleep(5) + + # 添加cta引擎 + cta_engine = self.main_engine.add_app(CtaStockApp) + cta_engine.init_engine() + + # 添加算法引擎代理 + #self.main_engine.add_app(AlgoBrokerApp) + + self.main_engine.write_log("主引擎创建成功") + + while True: + sleep(1) + + +if __name__ == "__main__": + + sys.excepthook = excepthook + + s = DaemonService() + s.start() diff --git a/prod/stock_em02_rpc02/screener_setting.json b/prod/stock_em02_rpc02/screener_setting.json new file mode 100644 index 00000000..155640b9 --- /dev/null +++ b/prod/stock_em02_rpc02/screener_setting.json @@ -0,0 +1,24 @@ +{ + "stock_screener_ThreeBuy_D1_SSE": { + "class_name": "StrategyChanlunThreeBuy", + "auto_init": true, + "auto_start": true, + "setting": { + "vt_symbols": [], + "all_stocks": true, + "exchange": "SSE", + "bar_name": "D1" + } + }, + "stock_screener_ThreeBuy_D1_SZSE": { + "class_name": "StrategyChanlunThreeBuy", + "auto_init": true, + "auto_start": true, + "setting": { + "vt_symbols": [], + "all_stocks": true, + "exchange": "SZSE", + "bar_name": "D1" + } + } +} diff --git a/prod/stock_em02_rpc02/service.py b/prod/stock_em02_rpc02/service.py new file mode 100644 index 00000000..a30e2b59 --- /dev/null +++ b/prod/stock_em02_rpc02/service.py @@ -0,0 +1,394 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# python 3 环境 +# 激活 activate.sh (激活py35 env,启动运行程序 + +import sys +import time +from datetime import datetime +# import commands +import os +import subprocess +import psutil + +# 将repostory的目录i,作为根目录,添加到系统环境中。 +ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) +sys.path.append(ROOT_PATH) + +from vnpy.trader.util_wechat import send_wx_msg + +# python容器文件 +python_path = '/home/trade/anaconda3/envs/py37/bin/python' + +# shell 文件,不使用sh +bash = "/bin/bash" + +# 配置内容 +# 运行时间段 +# 是否7X24小时运行(数字货币) +IS_7x24 = False +# 是否激活夜盘 +ACTIVE_NIGHT = False +# python 脚本,这里要和activate.sh里面得PROGRAM_NAME 一致 +PROGRAM_NAME = './run_service.py' + +# 日志目录 +log_path = os.path.abspath(os.path.join(os.getcwd(), 'log')) +if os.path.isdir(log_path): + # 如果工作目录下,存在logs子目录,就使用logs子目录 + base_path = os.getcwd() +else: + # 使用service.py所在得目录 + base_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__))) + +# 进程组id保存文件 +gpid_file = os.path.abspath(os.path.join(base_path, 'log', 'gpid.txt')) + +tmp_cron_file = os.path.abspath(os.path.join(base_path, 'log', 'cron.tmp')) + +program_file = os.path.join(base_path, 'activate.sh') +log_file = os.path.abspath(os.path.join(base_path, 'log', 'service.log')) +error_file = os.path.abspath(os.path.join(base_path, 'log', 'service-error.log')) +cron_log_file = os.path.abspath(os.path.join(base_path, 'log', 'cron.log')) +cron_error_file = os.path.abspath(os.path.join(base_path, 'log', 'cron-error.log')) +null_file = "/dev/null" + +cron_content = "* * * * * {} {} schedule >>{} 2>>{}".format(python_path, os.path.realpath(__file__), cron_log_file, + cron_error_file) +program_command = "nohup {} {} >>{} 2>>{} &".format(bash, program_file, log_file, error_file) + +USE_GPID = False + + +def _check_gpid(gpid): + """ + 检查进程(组)ID + :param gpid: + :return: True, 正在运行/ False: 没有运行 + """ + if not USE_GPID: + int_gpid = int(gpid) + return psutil.pid_exists(int_gpid) + + try: + # 通过系统子进程,打开ps命令,找到gpid下得所有进程 + p = subprocess.Popen(["ps", "-A", "-o", "pgrp="], stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False) + returncode = p.wait() + except OSError as e: + print('找不到shell运行命令ps', file=sys.stderr) + exit(1) + + # print('returncode1:{}'.format(returncode)) + try: + p2 = subprocess.Popen("uniq", stdin=p.stdout, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, shell=False) + returncode = p2.wait() + except OSError as e: + print(u'找不到shell运行命令uniq', file=sys.stderr) + exit(1) + + # print('returncode2:{}'.format(returncode)) + for i in p2.stdout.readlines(): + # print (u'p2.line:{}'.format(i)) + if i.decode().strip() == gpid: + print(u'找到gpid:{}'.format(gpid)) + return True + + print(u'找不到gpid:{}'.format(gpid)) + return False + + +def _status(): + """ + 查询当前状态 + :return: + """ + print(u'检查{}'.format(gpid_file)) + if os.path.exists(gpid_file): + with open(gpid_file, 'r') as f: + gpid = f.read().strip() + print(u'gpid={}'.format(gpid)) + if gpid != "" and _check_gpid(gpid): + return gpid + + return None + + +def trade_off(): + """检查现在是否为非交易时间""" + now = datetime.now() + + # 数字货币 + if IS_7x24: + if now.hour == 12 and now.minute == 0: + return True + else: + return False + + # 国内期货/股票 + a = datetime.now().replace(hour=2, minute=35, second=0, microsecond=0) + b = datetime.now().replace(hour=9, minute=0, second=0, microsecond=0) + c = datetime.now().replace(hour=15, minute=30, second=0, microsecond=0) + d = datetime.now().replace(hour=20, minute=45, second=0, microsecond=0) + + # 国内期货有夜盘 + if ACTIVE_NIGHT: + weekend = (now.isoweekday() == 6 and now >= a) or (now.isoweekday() == 7) or ( + now.isoweekday() == 1 and now <= a) + off = (a <= now <= b) or (c <= now <= d) or weekend + return off + else: + weekend = now.isoweekday() in [6, 7] + off = now <= b or c <= now or weekend + return off + + +def _start(): + """ + 启动服务 + :return: + """ + # 获取进程组id + gpid = _status() + if trade_off(): + # 属于停止运行期间 + if gpid: + print(u'现在属于停止运行时间,进程组ID存在,将杀死服务进程:[gpid={}]'.format(gpid)) + import signal + if USE_GPID: + # 杀死进程组 + os.killpg(int(gpid), signal.SIGKILL) + else: + os.kill(int(gpid), signal.SIGKILL) + i = 0 + while _status(): + time.sleep(1) + i += 1 + print(u'杀死进程中,等待{}秒'.format(i)) + if i > 30: + print(u'杀死进程失败,退出') + exit(1) + + print('进程组已停止运行[gpid={}]'.format(gpid)) + send_wx_msg('进程组{}已停止运行[{}]'.format(gpid, base_path)) + else: + print(u'{} 现在属于停止运行时间,不启动服务'.format(datetime.now())) + else: + # 属于运行时间 + if not gpid: + print(u'{}属于运行时间,将启动服务:{}'.format(datetime.now(), program_command)) + if os.path.isfile(gpid_file): + print(u'{0}文件存在,先执行删除'.format(gpid_file)) + try: + os.remove(gpid_file) + except: + pass + + os.popen(program_command) + i = 0 + while True: + gpid = _status() + if gpid: + print('{}属于运行时间,成功启动服务[gpid={}]'.format(datetime.now(), gpid)) + send_wx_msg('{}属于运行时间,成功启动服务[{},gpid={}]'.format(datetime.now(), base_path, gpid)) + break + + i += 1 + print(u'启动进程中,等待{}秒'.format(i)) + if i > 30: + print(u'启动进程失败,退出') + exit(1) + time.sleep(1) + + + else: + print(u'{}属于运行时间,{}服务已运行'.format(datetime.now(), base_path)) + + +def schedule(): + """ + crontab 计划执行 + :return: + """ + print('======schedule========') + _start() + + +def status(): + """查看状态""" + print('======status========') + + gpid = _status() + if gpid: + print('{}服务进程[gpid={}]正在运行'.format(base_path, gpid)) + else: + print('{}服务进程没有运行.'.format(base_path)) + + check_pids_in_cwd(gpid) + + +# operate的可选字符串为:add, del +def operate_crontab(operate): + """ + 操作crontab + :param operate: add , del + :return: + """ + + try: + # 从系统命令中,获取定时任务 + p = subprocess.Popen(["crontab", "-l"], stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False) + returncode = p.wait() + except OSError as e: + print(u"找不到shell运行命令crontab", file=sys.stderr) + exit(1) + + remain_cron_list = [] + exist_flag = False + old_cron_content = '' + for i in p.stdout.readlines(): + if i.decode("utf-8").find(os.path.realpath(__file__) + " schedule") >= 0: + old_cron_content = i.decode("utf-8") + exist_flag = True + else: + remain_cron_list.append(i.decode("utf-8")) + + if operate == "add" and not exist_flag: + remain_cron_list.append(cron_content) + remain_cron_list.append("\n") + with open(tmp_cron_file, 'wb') as f: + for i in remain_cron_list: + f.write(i.encode("utf-8")) + os.popen("crontab {}".format(tmp_cron_file)) + print(u'添加crontab项: {}'.format(cron_content), file=sys.stderr) + + if operate == "del" and exist_flag: + with open(tmp_cron_file, 'wb') as f: + for i in remain_cron_list: + f.write(i.encode("utf-8")) + os.popen("crontab {}".format(tmp_cron_file)) + print(u'删除crontab item: {}'.format(old_cron_content), file=sys.stderr) + + # os.remove(tmp_cron_file) + + +def check_pids_in_cwd(gpid=None): + print('检查{}路径下运行得python {}进程'.format(base_path, PROGRAM_NAME)) + + runing_pids = [] + for pid in psutil.pids(): + try: + p = psutil.Process(pid) + p_name = p.name() + if not p_name.endswith('python'): + continue + p_cwd = p.cwd() + if p_cwd != base_path: + continue + p_cmdline = p.cmdline() + if PROGRAM_NAME not in p_cmdline: + continue + + runing_pids.append(pid) + + except: + pass + + if len(runing_pids) > 1: + if gpid is not None: + if gpid in runing_pids: + print(u'排除其他pid') + runing_pids.remove(gpid) + else: + print(u'gpid,不在运行清单中,排除首个pid') + runing_pids.pop(0) + else: + print(u'gpid为空,排除首个pid') + runing_pids.pop(0) + + for pid in runing_pids: + try: + p = psutil.Process(pid) + print(u'pid:{},name:{},bin:{},path:{},cmd:{},被终止运行' + .format(pid, p.name, p.exe(), p.cwd(), p.cmdline())) + import signal + os.kill(int(pid), signal.SIGKILL) + except: + pass + + +def start(): + print(u'======start========') + # 往任务表增加定时计划 + operate_crontab("add") + print(u'任务表增加定时计划完毕') + # 执行启动 + # _start() + print(u'启动{}服务执行完毕'.format(base_path)) + + +def _stop(): + print(u'======stop========') + # 在任务表删除定时计划 + operate_crontab("del") + + # 查询进程组id + gpid = _status() + if gpid: + # 进程组存在,杀死进程 + import signal + # 杀死进程组 + if USE_GPID: + # 杀死进程组 + os.killpg(int(gpid), signal.SIGKILL) + else: + os.kill(int(gpid), signal.SIGKILL) + i = 0 + while _status(): + time.sleep(1) + i += 1 + print(u'等待{}秒'.format(i)) + + print(u'{}成功停止{}服务[gpid={}]'.format(datetime.now(), base_path, gpid)) + send_wx_msg(u'{}成功停止{}服务[gpid={}]'.format(datetime.now(), base_path, gpid)) + else: + print(u'{}服务进程没有运行'.format(base_path)) + + +def stop(): + """ + 停止服务 + :return: + """ + _stop() + print(u'执行停止{}服务完成'.format(base_path)) + + +def restart(): + print(u'======restart========') + _stop() + _start() + print('执行重启{}服务完成'.format(base_path)) + + +if __name__ == '__main__': + if len(sys.argv) >= 2: + fun = sys.argv[1] + else: + fun = '' + if fun == 'status': + status() + elif fun == 'start': + start() + elif fun == 'stop': + stop() + elif fun == 'restart': + restart() + elif fun == 'schedule': + schedule() + else: + print(u'Usage: {} (status|start|stop|restart)'.format(os.path.basename(__file__))) + status() diff --git a/prod/stock_em02_rpc02/vn_contract.pkb2 b/prod/stock_em02_rpc02/vn_contract.pkb2 new file mode 100644 index 00000000..05e22953 Binary files /dev/null and b/prod/stock_em02_rpc02/vn_contract.pkb2 differ diff --git a/prod/stock_em02_rpc02/vt_setting.json b/prod/stock_em02_rpc02/vt_setting.json new file mode 100644 index 00000000..3ebc1e1b --- /dev/null +++ b/prod/stock_em02_rpc02/vt_setting.json @@ -0,0 +1,29 @@ +{ + "font.family": "Arial", + "font.size": 12, + "log.active": true, + "log.level": 10, + "log.console": true, + "log.file": true, + "email.server": "smtp.qq.com", + "email.port": 465, + "email.username": "", + "email.password": "", + "email.sender": "", + "email.receiver": "", + "rqdata.username": "", + "rqdata.password": "", + "database.driver": "sqlite", + "database.database": "database.db", + "database.host": "localhost", + "database.port": 3306, + "database.user": "root", + "database.password": "", + "database.authentication_source": "admin", + "huafu.data_source_prod": "http://192.168.1.212:4000/api/v1/get_price", + "huafu.data_source": "http://127.0.0.1:4212/api/v1/get_price", + "renko.host": "192.168.1.214", + "hams.host": "192.168.1.214", + "rabbitmq.host": "192.168.1.211", + "gateway_name": "em02" +}