[update] 股票+RPC分布式引擎例子

This commit is contained in:
msincenselee 2021-11-03 16:40:10 +08:00
parent ebcb1676ea
commit cedf5ffbe6
36 changed files with 2303 additions and 0 deletions

View File

@ -0,0 +1,17 @@
#!/bin/bash
CONDA_HOME=~/anaconda3
#$CONDA_HOME/bin/conda deactivate
#$CONDA_HOME/bin/conda activate py37
############ Added by Huang Jianwei at 2018-04-03
# To solve the problem about Javascript runtime
export PATH=$PATH:/usr/local/bin
############ Ended
BASE_PATH=$(cd `dirname $0`; pwd)
echo $BASE_PATH
cd `dirname $0`
PROGRAM_NAME=./run_service.py
$CONDA_HOME/envs/py37/bin/python $PROGRAM_NAME >$BASE_PATH/log/service.log 2>>$BASE_PATH/log/service-error.log &

View File

@ -0,0 +1,15 @@
{
"mongo_db":
{
"host": "192.168.1.214",
"port": 27017
},
"accounts":
{
"em02_gw":
{
"copy_history_trades": true,
"copy_history_orders": true
}
}
}

View File

@ -0,0 +1,7 @@
{
"资金账号": "zzzz",
"资金密码": "zzzz",
"并发连接数": 3,
"session缓存文件": "session.json",
"账号类型": "普通"
}

View File

@ -0,0 +1 @@
{}

View File

@ -0,0 +1,4 @@
{
"rep_address": "tcp://127.0.0.1:102201",
"pub_address": "tcp://127.0.0.1:102202"
}

View File

@ -0,0 +1,159 @@
# flake8: noqa
import os
import sys
import multiprocessing
from time import sleep
from datetime import datetime, time
from logging import DEBUG
# 将repostory的目录i作为根目录添加到系统环境中。
ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
if ROOT_PATH not in sys.path:
sys.path.append(ROOT_PATH)
print(f'append {ROOT_PATH} into sys.path')
from vnpy.event import EventEngine, EVENT_TIMER
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine
from vnpy.trader.utility import load_json
from vnpy.gateway.eastmoney import EastmoneyGateway
#from vnpy.app.cta_stock import CtaStockApp
#from vnpy.app.cta_crypto.base import EVENT_CTA_LOG
from vnpy.app.rpc_service import RpcServiceApp
#from vnpy.app.algo_broker import AlgoBrokerApp
from vnpy.app.account_recorder import AccountRecorderApp
from vnpy.trader.util_pid import update_pid
from vnpy.trader.util_monitor import OrderMonitor, TradeMonitor, PositionMonitor, AccountMonitor, LogMonitor
SETTINGS["log.active"] = True
SETTINGS["log.level"] = DEBUG
SETTINGS["log.console"] = True
SETTINGS["log.file"] = True
gateway_name = 'em02_gw'
gw_setting = load_json(f'connect_{gateway_name}.json')
import types
import traceback
def excepthook(exctype: type, value: Exception, tb: types.TracebackType) -> None:
"""
Raise exception under debug mode
"""
sys.__excepthook__(exctype, value, tb)
msg = "".join(traceback.format_exception(exctype, value, tb))
print(msg, file=sys.stderr)
class DaemonService(object):
def __init__(self):
self.event_engine = EventEngine()
self.g_count = 0
self.last_dt = datetime.now()
# 创建账号/持仓/委托/交易/日志记录
self.acc_monitor = AccountMonitor(self.event_engine)
self.pos_monitor = PositionMonitor(self.event_engine)
self.ord_monitor = OrderMonitor(self.event_engine)
self.trd_monitor = TradeMonitor(self.event_engine)
#self.log_monitor = LogMonitor(self.event_engine)
# 创建主引擎
self.main_engine = MainEngine(self.event_engine)
self.save_data_time = None
self.save_snapshot_time = None
# 注册定时器,用于判断重连
self.event_engine.register(EVENT_TIMER, self.on_timer)
def on_timer(self, event):
"""定时器执行逻辑,每十秒执行一次"""
# 60秒才执行一次检查
self.g_count += 1
if self.g_count <= 60:
return
# 强制写入一次gpid
update_pid()
self.g_count = 0
dt = datetime.now()
if dt.hour != self.last_dt.hour:
self.last_dt = dt
#print(u'run_server.py checkpoint:{0}'.format(dt))
self.main_engine.write_log(u'run_server.py checkpoint:{0}'.format(dt))
# ctp 短线重连得处理
# 定时保存策略内数据
if dt.strftime('%H:%M') in ['02:31', '15:16']:
if self.save_data_time != dt.strftime('%H:%M'):
self.main_engine.write_log(u'保存策略内数据')
self.save_data_time = dt.strftime('%H:%M')
try:
self.main_engine.save_strategy_data('ALL')
except Exception as ex:
self.main_engine.write_error('保存策略内数据异常')
if dt.strftime('%H:%M') in ['02:32', '10:16', '11:31', '15:17', '23:01']:
if self.save_snapshot_time != dt.strftime('%H:%M'):
self.main_engine.write_log(u'保存策略内K线切片数据')
self.save_snapshot_time = dt.strftime('%H:%M')
try:
self.main_engine.save_strategy_snapshot('ALL')
except Exception as ex:
self.main_engine.write_error('保存策略内数据异常')
def start(self):
"""
Running in the child process.
"""
SETTINGS["log.file"] = True
timer_count = 0
# 远程调用服务
rpc_server = self.main_engine.add_app(RpcServiceApp)
ret, msg = rpc_server.start()
if not ret:
self.main_engine.write_log(f"RPC服务未能启动:{msg}")
return
else:
self.main_engine.write_log(f'RPC服务已启动')
update_pid()
# 添加账号同步app
self.main_engine.add_app(AccountRecorderApp)
# 接入网关
self.main_engine.add_gateway(EastmoneyGateway, gateway_name)
self.main_engine.write_log(f"连接{gateway_name}接口")
self.main_engine.connect(gw_setting, gateway_name)
sleep(5)
# # 添加cta引擎
# cta_engine = self.main_engine.add_app(CtaStockApp)
# cta_engine.init_engine()
# 添加算法引擎代理
#self.main_engine.add_app(AlgoBrokerApp)
self.main_engine.write_log("主引擎创建成功")
while True:
sleep(1)
if __name__ == "__main__":
sys.excepthook = excepthook
s = DaemonService()
s.start()

View File

@ -0,0 +1,394 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# python 3 环境
# 激活 activate.sh (激活py35 env启动运行程序
import sys
import time
from datetime import datetime
# import commands
import os
import subprocess
import psutil
# 将repostory的目录i作为根目录添加到系统环境中。
ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
sys.path.append(ROOT_PATH)
from vnpy.trader.util_wechat import send_wx_msg
# python容器文件
python_path = '/home/trade/anaconda3/envs/py37/bin/python'
# shell 文件不使用sh
bash = "/bin/bash"
# 配置内容
# 运行时间段
# 是否7X24小时运行数字货币
IS_7x24 = False
# 是否激活夜盘
ACTIVE_NIGHT = False
# python 脚本这里要和activate.sh里面得PROGRAM_NAME 一致
PROGRAM_NAME = './run_service.py'
# 日志目录
log_path = os.path.abspath(os.path.join(os.getcwd(), 'log'))
if os.path.isdir(log_path):
# 如果工作目录下存在logs子目录就使用logs子目录
base_path = os.getcwd()
else:
# 使用service.py所在得目录
base_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
# 进程组id保存文件
gpid_file = os.path.abspath(os.path.join(base_path, 'log', 'gpid.txt'))
tmp_cron_file = os.path.abspath(os.path.join(base_path, 'log', 'cron.tmp'))
program_file = os.path.join(base_path, 'activate.sh')
log_file = os.path.abspath(os.path.join(base_path, 'log', 'service.log'))
error_file = os.path.abspath(os.path.join(base_path, 'log', 'service-error.log'))
cron_log_file = os.path.abspath(os.path.join(base_path, 'log', 'cron.log'))
cron_error_file = os.path.abspath(os.path.join(base_path, 'log', 'cron-error.log'))
null_file = "/dev/null"
cron_content = "* * * * * {} {} schedule >>{} 2>>{}".format(python_path, os.path.realpath(__file__), cron_log_file,
cron_error_file)
program_command = "nohup {} {} >>{} 2>>{} &".format(bash, program_file, log_file, error_file)
USE_GPID = False
def _check_gpid(gpid):
"""
检查进程()ID
:param gpid:
:return: True, 正在运行/ False: 没有运行
"""
if not USE_GPID:
int_gpid = int(gpid)
return psutil.pid_exists(int_gpid)
try:
# 通过系统子进程打开ps命令找到gpid下得所有进程
p = subprocess.Popen(["ps", "-A", "-o", "pgrp="], stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False)
returncode = p.wait()
except OSError as e:
print('找不到shell运行命令ps', file=sys.stderr)
exit(1)
# print('returncode1:{}'.format(returncode))
try:
p2 = subprocess.Popen("uniq", stdin=p.stdout, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, shell=False)
returncode = p2.wait()
except OSError as e:
print(u'找不到shell运行命令uniq', file=sys.stderr)
exit(1)
# print('returncode2:{}'.format(returncode))
for i in p2.stdout.readlines():
# print (u'p2.line:{}'.format(i))
if i.decode().strip() == gpid:
print(u'找到gpid:{}'.format(gpid))
return True
print(u'找不到gpid:{}'.format(gpid))
return False
def _status():
"""
查询当前状态
:return:
"""
print(u'检查{}'.format(gpid_file))
if os.path.exists(gpid_file):
with open(gpid_file, 'r') as f:
gpid = f.read().strip()
print(u'gpid={}'.format(gpid))
if gpid != "" and _check_gpid(gpid):
return gpid
return None
def trade_off():
"""检查现在是否为非交易时间"""
now = datetime.now()
# 数字货币
if IS_7x24:
if now.hour == 12 and now.minute == 0:
return True
else:
return False
# 国内期货/股票
a = datetime.now().replace(hour=2, minute=35, second=0, microsecond=0)
b = datetime.now().replace(hour=9, minute=0, second=0, microsecond=0)
c = datetime.now().replace(hour=15, minute=30, second=0, microsecond=0)
d = datetime.now().replace(hour=20, minute=45, second=0, microsecond=0)
# 国内期货有夜盘
if ACTIVE_NIGHT:
weekend = (now.isoweekday() == 6 and now >= a) or (now.isoweekday() == 7) or (
now.isoweekday() == 1 and now <= a)
off = (a <= now <= b) or (c <= now <= d) or weekend
return off
else:
weekend = now.isoweekday() in [6, 7]
off = now <= b or c <= now or weekend
return off
def _start():
"""
启动服务
:return:
"""
# 获取进程组id
gpid = _status()
if trade_off():
# 属于停止运行期间
if gpid:
print(u'现在属于停止运行时间进程组ID存在,将杀死服务进程:[gpid={}]'.format(gpid))
import signal
if USE_GPID:
# 杀死进程组
os.killpg(int(gpid), signal.SIGKILL)
else:
os.kill(int(gpid), signal.SIGKILL)
i = 0
while _status():
time.sleep(1)
i += 1
print(u'杀死进程中,等待{}'.format(i))
if i > 30:
print(u'杀死进程失败,退出')
exit(1)
print('进程组已停止运行[gpid={}]'.format(gpid))
send_wx_msg('进程组{}已停止运行[{}]'.format(gpid, base_path))
else:
print(u'{} 现在属于停止运行时间,不启动服务'.format(datetime.now()))
else:
# 属于运行时间
if not gpid:
print(u'{}属于运行时间,将启动服务:{}'.format(datetime.now(), program_command))
if os.path.isfile(gpid_file):
print(u'{0}文件存在,先执行删除'.format(gpid_file))
try:
os.remove(gpid_file)
except:
pass
os.popen(program_command)
i = 0
while True:
gpid = _status()
if gpid:
print('{}属于运行时间,成功启动服务[gpid={}]'.format(datetime.now(), gpid))
send_wx_msg('{}属于运行时间,成功启动服务[{},gpid={}]'.format(datetime.now(), base_path, gpid))
break
i += 1
print(u'启动进程中,等待{}'.format(i))
if i > 30:
print(u'启动进程失败,退出')
exit(1)
time.sleep(1)
else:
print(u'{}属于运行时间,{}服务已运行'.format(datetime.now(), base_path))
def schedule():
"""
crontab 计划执行
:return:
"""
print('======schedule========')
_start()
def status():
"""查看状态"""
print('======status========')
gpid = _status()
if gpid:
print('{}服务进程[gpid={}]正在运行'.format(base_path, gpid))
else:
print('{}服务进程没有运行.'.format(base_path))
check_pids_in_cwd(gpid)
# operate的可选字符串为add, del
def operate_crontab(operate):
"""
操作crontab
:param operate: add , del
:return:
"""
try:
# 从系统命令中,获取定时任务
p = subprocess.Popen(["crontab", "-l"], stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False)
returncode = p.wait()
except OSError as e:
print(u"找不到shell运行命令crontab", file=sys.stderr)
exit(1)
remain_cron_list = []
exist_flag = False
old_cron_content = ''
for i in p.stdout.readlines():
if i.decode("utf-8").find(os.path.realpath(__file__) + " schedule") >= 0:
old_cron_content = i.decode("utf-8")
exist_flag = True
else:
remain_cron_list.append(i.decode("utf-8"))
if operate == "add" and not exist_flag:
remain_cron_list.append(cron_content)
remain_cron_list.append("\n")
with open(tmp_cron_file, 'wb') as f:
for i in remain_cron_list:
f.write(i.encode("utf-8"))
os.popen("crontab {}".format(tmp_cron_file))
print(u'添加crontab项: {}'.format(cron_content), file=sys.stderr)
if operate == "del" and exist_flag:
with open(tmp_cron_file, 'wb') as f:
for i in remain_cron_list:
f.write(i.encode("utf-8"))
os.popen("crontab {}".format(tmp_cron_file))
print(u'删除crontab item: {}'.format(old_cron_content), file=sys.stderr)
# os.remove(tmp_cron_file)
def check_pids_in_cwd(gpid=None):
print('检查{}路径下运行得python {}进程'.format(base_path, PROGRAM_NAME))
runing_pids = []
for pid in psutil.pids():
try:
p = psutil.Process(pid)
p_name = p.name()
if not p_name.endswith('python'):
continue
p_cwd = p.cwd()
if p_cwd != base_path:
continue
p_cmdline = p.cmdline()
if PROGRAM_NAME not in p_cmdline:
continue
runing_pids.append(pid)
except:
pass
if len(runing_pids) > 1:
if gpid is not None:
if gpid in runing_pids:
print(u'排除其他pid')
runing_pids.remove(gpid)
else:
print(u'gpid不在运行清单中排除首个pid')
runing_pids.pop(0)
else:
print(u'gpid为空排除首个pid')
runing_pids.pop(0)
for pid in runing_pids:
try:
p = psutil.Process(pid)
print(u'pid:{},name:{},bin:{},path:{},cmd:{},被终止运行'
.format(pid, p.name, p.exe(), p.cwd(), p.cmdline()))
import signal
os.kill(int(pid), signal.SIGKILL)
except:
pass
def start():
print(u'======start========')
# 往任务表增加定时计划
operate_crontab("add")
print(u'任务表增加定时计划完毕')
# 执行启动
# _start()
print(u'启动{}服务执行完毕'.format(base_path))
def _stop():
print(u'======stop========')
# 在任务表删除定时计划
operate_crontab("del")
# 查询进程组id
gpid = _status()
if gpid:
# 进程组存在,杀死进程
import signal
# 杀死进程组
if USE_GPID:
# 杀死进程组
os.killpg(int(gpid), signal.SIGKILL)
else:
os.kill(int(gpid), signal.SIGKILL)
i = 0
while _status():
time.sleep(1)
i += 1
print(u'等待{}'.format(i))
print(u'{}成功停止{}服务[gpid={}]'.format(datetime.now(), base_path, gpid))
send_wx_msg(u'{}成功停止{}服务[gpid={}]'.format(datetime.now(), base_path, gpid))
else:
print(u'{}服务进程没有运行'.format(base_path))
def stop():
"""
停止服务
:return:
"""
_stop()
print(u'执行停止{}服务完成'.format(base_path))
def restart():
print(u'======restart========')
_stop()
_start()
print('执行重启{}服务完成'.format(base_path))
if __name__ == '__main__':
if len(sys.argv) >= 2:
fun = sys.argv[1]
else:
fun = ''
if fun == 'status':
status()
elif fun == 'start':
start()
elif fun == 'stop':
stop()
elif fun == 'restart':
restart()
elif fun == 'schedule':
schedule()
else:
print(u'Usage: {} (status|start|stop|restart)'.format(os.path.basename(__file__)))
status()

View File

@ -0,0 +1,4 @@
{
"validatekey": "a51dc23f-443b-4321-a956-300b6498304c",
"cookie_str": "st_si=49015368899232;eastmoney_txzq_zjzh=NTQwMzMwMjE1NTM5fA%3D%3D;st_asi=delete;Yybdm=5403;st_pvi=31579671208450;st_inirUrl=https%3A%2F%2Fjy.xzsec.com%2FTrade%2FBuy;Uid=qPx0RsvGLgDoiKf0CU47yg%3d%3d;Khmc=%e6%9d%8e%e7%a4%ba%e4%bd%b3;st_sp=2021-10-31%2014%3A39%3A20;st_sn=2;st_psi=20211031143926579-11923323313501-2985332488;mobileimei=3529e943-4e71-4958-b784-7dcb6554b418;Uuid=c45d2d47879749cc91911f1a2dd1bda0"
}

Binary file not shown.

View File

@ -0,0 +1,29 @@
{
"font.family": "Arial",
"font.size": 12,
"log.active": true,
"log.level": 10,
"log.console": true,
"log.file": true,
"email.server": "smtp.qq.com",
"email.port": 465,
"email.username": "",
"email.password": "",
"email.sender": "",
"email.receiver": "",
"rqdata.username": "",
"rqdata.password": "",
"database.driver": "sqlite",
"database.database": "database.db",
"database.host": "localhost",
"database.port": 3306,
"database.user": "root",
"database.password": "",
"database.authentication_source": "admin",
"huafu.data_source_prod": "http://192.168.1.212:4000/api/v1/get_price",
"huafu.data_source": "http://127.0.0.1:4212/api/v1/get_price",
"renko.host": "192.168.1.214",
"hams.host": "192.168.1.214",
"rabbitmq.host": "192.168.1.211",
"gateway_name": "em02"
}

View File

@ -0,0 +1,17 @@
#!/bin/bash
CONDA_HOME=~/anaconda3
#$CONDA_HOME/bin/conda deactivate
#$CONDA_HOME/bin/conda activate py37
############ Added by Huang Jianwei at 2018-04-03
# To solve the problem about Javascript runtime
export PATH=$PATH:/usr/local/bin
############ Ended
BASE_PATH=$(cd `dirname $0`; pwd)
echo $BASE_PATH
cd `dirname $0`
PROGRAM_NAME=./run_service.py
$CONDA_HOME/envs/py37/bin/python $PROGRAM_NAME >$BASE_PATH/log/service.log 2>>$BASE_PATH/log/service-error.log &

View File

@ -0,0 +1,16 @@
{
"mongo_db":
{
"host": "192.168.1.214",
"port": 27017
},
"accounts":
{
"em02":
{
"copy_history_trades": true,
"copy_history_orders": true
}
},
"event_list": ["eStrategyPos.","eStrategySnapshot."]
}

View File

@ -0,0 +1,4 @@
{
"主动请求地址": "tcp://127.0.0.1:102201",
"推送订阅地址": "tcp://127.0.0.1:102202"
}

View File

@ -0,0 +1,5 @@
{
"accountid" : "540330215539[普通]",
"strategy_group": "cta_stock_01",
"compare_pos": false
}

View File

@ -0,0 +1,21 @@
{
"stock_grids_etf": {
"class_name": "StrategyStockGridTradeV3",
"vt_symbols": [
"159995.SZSE"
],
"auto_init": true,
"auto_start": true,
"setting": {
"backtesting": false,
"comments": [
"芯片ETF"
],
"max_invest_percent": 10,
"max_invest_margin": 100000,
"max_single_margin": 2000,
"grid_height_percent": 4,
"grid_lots": 15
}
}
}

View File

@ -0,0 +1 @@
{}

View File

@ -0,0 +1,4 @@
{
"rep_address": "tcp://127.0.0.1:102203",
"pub_address": "tcp://127.0.0.1:102204"
}

View File

@ -0,0 +1,117 @@
# flake8: noqa
import os
import sys
# 将repostory的目录i作为根目录添加到系统环境中。
ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
sys.path.append(ROOT_PATH)
print(f'append {ROOT_PATH} into sys.path')
from vnpy.event import EventEngine
from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import MainWindow, create_qapp
#from vnpy.gateway.binancef import BinancefGateway
# from vnpy.gateway.bitmex import BitmexGateway
# from vnpy.gateway.futu import FutuGateway
# from vnpy.gateway.ib import IbGateway
# from vnpy.gateway.ctp import CtpGateway
# from vnpy.gateway.ctptest import CtptestGateway
# from vnpy.gateway.mini import MiniGateway
# from vnpy.gateway.sopt import SoptGateway
# from vnpy.gateway.minitest import MinitestGateway
# from vnpy.gateway.femas import FemasGateway
# from vnpy.gateway.tiger import TigerGateway
# from vnpy.gateway.oes import OesGateway
# from vnpy.gateway.okex import OkexGateway
# from vnpy.gateway.huobi import HuobiGateway
# from vnpy.gateway.bitfinex import BitfinexGateway
# from vnpy.gateway.onetoken import OnetokenGateway
# from vnpy.gateway.okexf import OkexfGateway
# from vnpy.gateway.okexs import OkexsGateway
# from vnpy.gateway.xtp import XtpGateway
# from vnpy.gateway.hbdm import HbdmGateway
# from vnpy.gateway.tap import TapGateway
# from vnpy.gateway.tora import ToraGateway
# from vnpy.gateway.alpaca import AlpacaGateway
# from vnpy.gateway.da import DaGateway
# from vnpy.gateway.coinbase import CoinbaseGateway
# from vnpy.gateway.bitstamp import BitstampGateway
# from vnpy.gateway.gateios import GateiosGateway
# from vnpy.gateway.bybit import BybitGateway
# from vnpy.gateway.eastmoney import EastmoneyGateway
from vnpy.gateway.rpc import RpcGateway
# from vnpy.app.cta_crypto import CtaCryptoApp
from vnpy.app.cta_stock import CtaStockApp
# from vnpy.app.csv_loader import CsvLoaderApp
# from vnpy.app.algo_trading import AlgoTradingApp
# from vnpy.app.cta_backtester import CtaBacktesterApp
# from vnpy.app.data_recorder import DataRecorderApp
# from vnpy.app.tick_recorder import TickRecorderApp
# from vnpy.app.risk_manager import RiskManagerApp
# from vnpy.app.script_trader import ScriptTraderApp
# from vnpy.app.rpc_service import RpcServiceApp
# from vnpy.app.spread_trading import SpreadTradingApp
# from vnpy.app.portfolio_manager import PortfolioManagerApp
def main():
""""""
qapp = create_qapp()
event_engine = EventEngine()
main_engine = MainEngine(event_engine)
main_engine.add_gateway(RpcGateway, 'em02_gw')
#main_engine.add_gateway(CtpGateway)
# main_engine.add_gateway(CtptestGateway)
# main_engine.add_gateway(MiniGateway)
# main_engine.add_gateway(SoptGateway)
# main_engine.add_gateway(MinitestGateway)
# main_engine.add_gateway(FemasGateway)
# main_engine.add_gateway(IbGateway)
# main_engine.add_gateway(FutuGateway)
#main_engine.add_gateway(BitmexGateway)
# main_engine.add_gateway(TigerGateway)
# main_engine.add_gateway(OesGateway)
# main_engine.add_gateway(OkexGateway)
# main_engine.add_gateway(HuobiGateway)
# main_engine.add_gateway(BitfinexGateway)
# main_engine.add_gateway(OnetokenGateway)
# main_engine.add_gateway(OkexfGateway)
# main_engine.add_gateway(HbdmGateway)
# main_engine.add_gateway(XtpGateway)
# main_engine.add_gateway(TapGateway)
# main_engine.add_gateway(ToraGateway)
# main_engine.add_gateway(AlpacaGateway)
# main_engine.add_gateway(OkexsGateway)
# main_engine.add_gateway(DaGateway)
# main_engine.add_gateway(CoinbaseGateway)
# main_engine.add_gateway(BitstampGateway)
# main_engine.add_gateway(GateiosGateway)
#main_engine.add_gateway(BybitGateway)
#main_engine.add_app(CtaStrategyApp)
main_engine.add_app(CtaStockApp)
#main_engine.add_app(CtaBacktesterApp)
# main_engine.add_app(CsvLoaderApp)
# main_engine.add_app(AlgoTradingApp)
#main_engine.add_app(DataRecorderApp)
#main_engine.add_app(TickRecorderApp)
# main_engine.add_app(RiskManagerApp)
# main_engine.add_app(ScriptTraderApp)
# main_engine.add_app(RpcServiceApp)
# main_engine.add_app(SpreadTradingApp)
#main_engine.add_app(PortfolioManagerApp)
main_window = MainWindow(main_engine, event_engine)
main_window.showMaximized()
qapp.exec()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,159 @@
# flake8: noqa
import os
import sys
import multiprocessing
from time import sleep
from datetime import datetime, time
from logging import DEBUG
# 将repostory的目录i作为根目录添加到系统环境中。
ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
if ROOT_PATH not in sys.path:
sys.path.append(ROOT_PATH)
print(f'append {ROOT_PATH} into sys.path')
from vnpy.event import EventEngine, EVENT_TIMER
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine
from vnpy.trader.utility import load_json
from vnpy.gateway.rpc import RpcGateway
from vnpy.app.cta_stock import CtaStockApp
#from vnpy.app.cta_crypto.base import EVENT_CTA_LOG
from vnpy.app.rpc_service import RpcServiceApp
#from vnpy.app.algo_broker import AlgoBrokerApp
from vnpy.app.account_recorder import AccountRecorderApp
from vnpy.trader.util_pid import update_pid
from vnpy.trader.util_monitor import OrderMonitor, TradeMonitor, PositionMonitor, AccountMonitor, LogMonitor
SETTINGS["log.active"] = True
SETTINGS["log.level"] = DEBUG
SETTINGS["log.console"] = True
SETTINGS["log.file"] = True
gateway_name = 'em02_gw'
gw_setting = load_json(f'connect_{gateway_name}.json')
import types
import traceback
def excepthook(exctype: type, value: Exception, tb: types.TracebackType) -> None:
"""
Raise exception under debug mode
"""
sys.__excepthook__(exctype, value, tb)
msg = "".join(traceback.format_exception(exctype, value, tb))
print(msg, file=sys.stderr)
class DaemonService(object):
def __init__(self):
self.event_engine = EventEngine()
self.g_count = 0
self.last_dt = datetime.now()
# 创建账号/持仓/委托/交易/日志记录
self.acc_monitor = AccountMonitor(self.event_engine)
self.pos_monitor = PositionMonitor(self.event_engine)
self.ord_monitor = OrderMonitor(self.event_engine)
self.trd_monitor = TradeMonitor(self.event_engine)
#self.log_monitor = LogMonitor(self.event_engine)
# 创建主引擎
self.main_engine = MainEngine(self.event_engine)
self.save_data_time = None
self.save_snapshot_time = None
# 注册定时器,用于判断重连
self.event_engine.register(EVENT_TIMER, self.on_timer)
def on_timer(self, event):
"""定时器执行逻辑,每十秒执行一次"""
# 60秒才执行一次检查
self.g_count += 1
if self.g_count <= 60:
return
# 强制写入一次gpid
update_pid()
self.g_count = 0
dt = datetime.now()
if dt.hour != self.last_dt.hour:
self.last_dt = dt
#print(u'run_server.py checkpoint:{0}'.format(dt))
self.main_engine.write_log(u'run_server.py checkpoint:{0}'.format(dt))
# ctp 短线重连得处理
# 定时保存策略内数据
if dt.strftime('%H:%M') in ['02:31', '15:16']:
if self.save_data_time != dt.strftime('%H:%M'):
self.main_engine.write_log(u'保存策略内数据')
self.save_data_time = dt.strftime('%H:%M')
try:
self.main_engine.save_strategy_data('ALL')
except Exception as ex:
self.main_engine.write_error('保存策略内数据异常')
if dt.strftime('%H:%M') in ['02:32', '10:16', '11:31', '15:17', '23:01']:
if self.save_snapshot_time != dt.strftime('%H:%M'):
self.main_engine.write_log(u'保存策略内K线切片数据')
self.save_snapshot_time = dt.strftime('%H:%M')
try:
self.main_engine.save_strategy_snapshot('ALL')
except Exception as ex:
self.main_engine.write_error('保存策略内数据异常')
def start(self):
"""
Running in the child process.
"""
SETTINGS["log.file"] = True
timer_count = 0
# 远程调用服务
rpc_server = self.main_engine.add_app(RpcServiceApp)
ret, msg = rpc_server.start()
if not ret:
self.main_engine.write_log(f"RPC服务未能启动:{msg}")
return
else:
self.main_engine.write_log(f'RPC服务已启动')
update_pid()
# 添加账号同步app
self.main_engine.add_app(AccountRecorderApp)
# 接入网关
self.main_engine.add_gateway(RpcGateway, gateway_name)
self.main_engine.write_log(f"连接{gateway_name}接口")
self.main_engine.connect(gw_setting, gateway_name)
sleep(5)
# 添加cta引擎
cta_engine = self.main_engine.add_app(CtaStockApp)
cta_engine.init_engine()
# 添加算法引擎代理
#self.main_engine.add_app(AlgoBrokerApp)
self.main_engine.write_log("主引擎创建成功")
while True:
sleep(1)
if __name__ == "__main__":
sys.excepthook = excepthook
s = DaemonService()
s.start()

View File

@ -0,0 +1,394 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# python 3 环境
# 激活 activate.sh (激活py35 env启动运行程序
import sys
import time
from datetime import datetime
# import commands
import os
import subprocess
import psutil
# 将repostory的目录i作为根目录添加到系统环境中。
ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
sys.path.append(ROOT_PATH)
from vnpy.trader.util_wechat import send_wx_msg
# python容器文件
python_path = '/home/trade/anaconda3/envs/py37/bin/python'
# shell 文件不使用sh
bash = "/bin/bash"
# 配置内容
# 运行时间段
# 是否7X24小时运行数字货币
IS_7x24 = False
# 是否激活夜盘
ACTIVE_NIGHT = False
# python 脚本这里要和activate.sh里面得PROGRAM_NAME 一致
PROGRAM_NAME = './run_service.py'
# 日志目录
log_path = os.path.abspath(os.path.join(os.getcwd(), 'log'))
if os.path.isdir(log_path):
# 如果工作目录下存在logs子目录就使用logs子目录
base_path = os.getcwd()
else:
# 使用service.py所在得目录
base_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
# 进程组id保存文件
gpid_file = os.path.abspath(os.path.join(base_path, 'log', 'gpid.txt'))
tmp_cron_file = os.path.abspath(os.path.join(base_path, 'log', 'cron.tmp'))
program_file = os.path.join(base_path, 'activate.sh')
log_file = os.path.abspath(os.path.join(base_path, 'log', 'service.log'))
error_file = os.path.abspath(os.path.join(base_path, 'log', 'service-error.log'))
cron_log_file = os.path.abspath(os.path.join(base_path, 'log', 'cron.log'))
cron_error_file = os.path.abspath(os.path.join(base_path, 'log', 'cron-error.log'))
null_file = "/dev/null"
cron_content = "* * * * * {} {} schedule >>{} 2>>{}".format(python_path, os.path.realpath(__file__), cron_log_file,
cron_error_file)
program_command = "nohup {} {} >>{} 2>>{} &".format(bash, program_file, log_file, error_file)
USE_GPID = False
def _check_gpid(gpid):
"""
检查进程()ID
:param gpid:
:return: True, 正在运行/ False: 没有运行
"""
if not USE_GPID:
int_gpid = int(gpid)
return psutil.pid_exists(int_gpid)
try:
# 通过系统子进程打开ps命令找到gpid下得所有进程
p = subprocess.Popen(["ps", "-A", "-o", "pgrp="], stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False)
returncode = p.wait()
except OSError as e:
print('找不到shell运行命令ps', file=sys.stderr)
exit(1)
# print('returncode1:{}'.format(returncode))
try:
p2 = subprocess.Popen("uniq", stdin=p.stdout, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, shell=False)
returncode = p2.wait()
except OSError as e:
print(u'找不到shell运行命令uniq', file=sys.stderr)
exit(1)
# print('returncode2:{}'.format(returncode))
for i in p2.stdout.readlines():
# print (u'p2.line:{}'.format(i))
if i.decode().strip() == gpid:
print(u'找到gpid:{}'.format(gpid))
return True
print(u'找不到gpid:{}'.format(gpid))
return False
def _status():
"""
查询当前状态
:return:
"""
print(u'检查{}'.format(gpid_file))
if os.path.exists(gpid_file):
with open(gpid_file, 'r') as f:
gpid = f.read().strip()
print(u'gpid={}'.format(gpid))
if gpid != "" and _check_gpid(gpid):
return gpid
return None
def trade_off():
"""检查现在是否为非交易时间"""
now = datetime.now()
# 数字货币
if IS_7x24:
if now.hour == 12 and now.minute == 0:
return True
else:
return False
# 国内期货/股票
a = datetime.now().replace(hour=2, minute=35, second=0, microsecond=0)
b = datetime.now().replace(hour=9, minute=10, second=0, microsecond=0)
c = datetime.now().replace(hour=15, minute=30, second=0, microsecond=0)
d = datetime.now().replace(hour=20, minute=45, second=0, microsecond=0)
# 国内期货有夜盘
if ACTIVE_NIGHT:
weekend = (now.isoweekday() == 6 and now >= a) or (now.isoweekday() == 7) or (
now.isoweekday() == 1 and now <= a)
off = (a <= now <= b) or (c <= now <= d) or weekend
return off
else:
weekend = now.isoweekday() in [6, 7]
off = now <= b or c <= now or weekend
return off
def _start():
"""
启动服务
:return:
"""
# 获取进程组id
gpid = _status()
if trade_off():
# 属于停止运行期间
if gpid:
print(u'现在属于停止运行时间进程组ID存在,将杀死服务进程:[gpid={}]'.format(gpid))
import signal
if USE_GPID:
# 杀死进程组
os.killpg(int(gpid), signal.SIGKILL)
else:
os.kill(int(gpid), signal.SIGKILL)
i = 0
while _status():
time.sleep(1)
i += 1
print(u'杀死进程中,等待{}'.format(i))
if i > 30:
print(u'杀死进程失败,退出')
exit(1)
print('进程组已停止运行[gpid={}]'.format(gpid))
send_wx_msg('进程组{}已停止运行[{}]'.format(gpid, base_path))
else:
print(u'{} 现在属于停止运行时间,不启动服务'.format(datetime.now()))
else:
# 属于运行时间
if not gpid:
print(u'{}属于运行时间,将启动服务:{}'.format(datetime.now(), program_command))
if os.path.isfile(gpid_file):
print(u'{0}文件存在,先执行删除'.format(gpid_file))
try:
os.remove(gpid_file)
except:
pass
os.popen(program_command)
i = 0
while True:
gpid = _status()
if gpid:
print('{}属于运行时间,成功启动服务[gpid={}]'.format(datetime.now(), gpid))
send_wx_msg('{}属于运行时间,成功启动服务[{},gpid={}]'.format(datetime.now(), base_path, gpid))
break
i += 1
print(u'启动进程中,等待{}'.format(i))
if i > 30:
print(u'启动进程失败,退出')
exit(1)
time.sleep(1)
else:
print(u'{}属于运行时间,{}服务已运行'.format(datetime.now(), base_path))
def schedule():
"""
crontab 计划执行
:return:
"""
print('======schedule========')
_start()
def status():
"""查看状态"""
print('======status========')
gpid = _status()
if gpid:
print('{}服务进程[gpid={}]正在运行'.format(base_path, gpid))
else:
print('{}服务进程没有运行.'.format(base_path))
check_pids_in_cwd(gpid)
# operate的可选字符串为add, del
def operate_crontab(operate):
"""
操作crontab
:param operate: add , del
:return:
"""
try:
# 从系统命令中,获取定时任务
p = subprocess.Popen(["crontab", "-l"], stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False)
returncode = p.wait()
except OSError as e:
print(u"找不到shell运行命令crontab", file=sys.stderr)
exit(1)
remain_cron_list = []
exist_flag = False
old_cron_content = ''
for i in p.stdout.readlines():
if i.decode("utf-8").find(os.path.realpath(__file__) + " schedule") >= 0:
old_cron_content = i.decode("utf-8")
exist_flag = True
else:
remain_cron_list.append(i.decode("utf-8"))
if operate == "add" and not exist_flag:
remain_cron_list.append(cron_content)
remain_cron_list.append("\n")
with open(tmp_cron_file, 'wb') as f:
for i in remain_cron_list:
f.write(i.encode("utf-8"))
os.popen("crontab {}".format(tmp_cron_file))
print(u'添加crontab项: {}'.format(cron_content), file=sys.stderr)
if operate == "del" and exist_flag:
with open(tmp_cron_file, 'wb') as f:
for i in remain_cron_list:
f.write(i.encode("utf-8"))
os.popen("crontab {}".format(tmp_cron_file))
print(u'删除crontab item: {}'.format(old_cron_content), file=sys.stderr)
# os.remove(tmp_cron_file)
def check_pids_in_cwd(gpid=None):
print('检查{}路径下运行得python {}进程'.format(base_path, PROGRAM_NAME))
runing_pids = []
for pid in psutil.pids():
try:
p = psutil.Process(pid)
p_name = p.name()
if not p_name.endswith('python'):
continue
p_cwd = p.cwd()
if p_cwd != base_path:
continue
p_cmdline = p.cmdline()
if PROGRAM_NAME not in p_cmdline:
continue
runing_pids.append(pid)
except:
pass
if len(runing_pids) > 1:
if gpid is not None:
if gpid in runing_pids:
print(u'排除其他pid')
runing_pids.remove(gpid)
else:
print(u'gpid不在运行清单中排除首个pid')
runing_pids.pop(0)
else:
print(u'gpid为空排除首个pid')
runing_pids.pop(0)
for pid in runing_pids:
try:
p = psutil.Process(pid)
print(u'pid:{},name:{},bin:{},path:{},cmd:{},被终止运行'
.format(pid, p.name, p.exe(), p.cwd(), p.cmdline()))
import signal
os.kill(int(pid), signal.SIGKILL)
except:
pass
def start():
print(u'======start========')
# 往任务表增加定时计划
operate_crontab("add")
print(u'任务表增加定时计划完毕')
# 执行启动
# _start()
print(u'启动{}服务执行完毕'.format(base_path))
def _stop():
print(u'======stop========')
# 在任务表删除定时计划
operate_crontab("del")
# 查询进程组id
gpid = _status()
if gpid:
# 进程组存在,杀死进程
import signal
# 杀死进程组
if USE_GPID:
# 杀死进程组
os.killpg(int(gpid), signal.SIGKILL)
else:
os.kill(int(gpid), signal.SIGKILL)
i = 0
while _status():
time.sleep(1)
i += 1
print(u'等待{}'.format(i))
print(u'{}成功停止{}服务[gpid={}]'.format(datetime.now(), base_path, gpid))
send_wx_msg(u'{}成功停止{}服务[gpid={}]'.format(datetime.now(), base_path, gpid))
else:
print(u'{}服务进程没有运行'.format(base_path))
def stop():
"""
停止服务
:return:
"""
_stop()
print(u'执行停止{}服务完成'.format(base_path))
def restart():
print(u'======restart========')
_stop()
_start()
print('执行重启{}服务完成'.format(base_path))
if __name__ == '__main__':
if len(sys.argv) >= 2:
fun = sys.argv[1]
else:
fun = ''
if fun == 'status':
status()
elif fun == 'start':
start()
elif fun == 'stop':
stop()
elif fun == 'restart':
restart()
elif fun == 'schedule':
schedule()
else:
print(u'Usage: {} (status|start|stop|restart)'.format(os.path.basename(__file__)))
status()

Binary file not shown.

View File

@ -0,0 +1,29 @@
{
"font.family": "Arial",
"font.size": 12,
"log.active": true,
"log.level": 10,
"log.console": true,
"log.file": true,
"email.server": "smtp.qq.com",
"email.port": 465,
"email.username": "",
"email.password": "",
"email.sender": "",
"email.receiver": "",
"rqdata.username": "",
"rqdata.password": "",
"database.driver": "sqlite",
"database.database": "database.db",
"database.host": "localhost",
"database.port": 3306,
"database.user": "root",
"database.password": "",
"database.authentication_source": "admin",
"huafu.data_source": "http://127.0.0.1:4212/api/v1/get_price",
"huafu.data_source_prod": "http://192.168.1.212:4000/api/v1/get_price",
"renko.host": "192.168.1.214",
"hams.host": "192.168.1.214",
"rabbitmq.host": "192.168.1.211",
"gateway_name": "em02"
}

View File

@ -0,0 +1,17 @@
#!/bin/bash
CONDA_HOME=~/anaconda3
#$CONDA_HOME/bin/conda deactivate
#$CONDA_HOME/bin/conda activate py37
############ Added by Huang Jianwei at 2018-04-03
# To solve the problem about Javascript runtime
export PATH=$PATH:/usr/local/bin
############ Ended
BASE_PATH=$(cd `dirname $0`; pwd)
echo $BASE_PATH
cd `dirname $0`
PROGRAM_NAME=./run_service.py
$CONDA_HOME/envs/py37/bin/python $PROGRAM_NAME >$BASE_PATH/log/service.log 2>>$BASE_PATH/log/service-error.log &

View File

@ -0,0 +1,16 @@
{
"mongo_db":
{
"host": "192.168.1.214",
"port": 27017
},
"accounts":
{
"em02":
{
"copy_history_trades": true,
"copy_history_orders": true
}
},
"event_list": ["eStrategyPos.","eStrategySnapshot."]
}

View File

@ -0,0 +1,4 @@
{
"主动请求地址": "tcp://127.0.0.1:102201",
"推送订阅地址": "tcp://127.0.0.1:102202"
}

View File

@ -0,0 +1,6 @@
{
"accountid" : "540330215539[普通]",
"strategy_group": "cta_stock_02",
"get_pos_from_db": true,
"compare_pos": true
}

View File

@ -0,0 +1,23 @@
{
"stock_three_buy_d1": {
"class_name": "StrategyStock3rdBuyGroupV1",
"vt_symbols": [],
"auto_init": true,
"auto_start": true,
"setting": {
"comment": "日线三买",
"backtesting": false,
"max_invest_rate": 0.5,
"max_single_margin": 20000,
"stock_name_filters": [
"st",
"退",
"地产"
],
"screener_strategies": [
"stock_screener_ThreeBuy_D1_SZSE",
"stock_screener_ThreeBuy_D1_SSE"
]
}
}
}

View File

@ -0,0 +1 @@
{}

View File

@ -0,0 +1,4 @@
{
"rep_address": "tcp://127.0.0.1:102205",
"pub_address": "tcp://127.0.0.1:102206"
}

View File

@ -0,0 +1,117 @@
# flake8: noqa
import os
import sys
# 将repostory的目录i作为根目录添加到系统环境中。
ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
sys.path.append(ROOT_PATH)
print(f'append {ROOT_PATH} into sys.path')
from vnpy.event import EventEngine
from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import MainWindow, create_qapp
#from vnpy.gateway.binancef import BinancefGateway
# from vnpy.gateway.bitmex import BitmexGateway
# from vnpy.gateway.futu import FutuGateway
# from vnpy.gateway.ib import IbGateway
# from vnpy.gateway.ctp import CtpGateway
# from vnpy.gateway.ctptest import CtptestGateway
# from vnpy.gateway.mini import MiniGateway
# from vnpy.gateway.sopt import SoptGateway
# from vnpy.gateway.minitest import MinitestGateway
# from vnpy.gateway.femas import FemasGateway
# from vnpy.gateway.tiger import TigerGateway
# from vnpy.gateway.oes import OesGateway
# from vnpy.gateway.okex import OkexGateway
# from vnpy.gateway.huobi import HuobiGateway
# from vnpy.gateway.bitfinex import BitfinexGateway
# from vnpy.gateway.onetoken import OnetokenGateway
# from vnpy.gateway.okexf import OkexfGateway
# from vnpy.gateway.okexs import OkexsGateway
# from vnpy.gateway.xtp import XtpGateway
# from vnpy.gateway.hbdm import HbdmGateway
# from vnpy.gateway.tap import TapGateway
# from vnpy.gateway.tora import ToraGateway
# from vnpy.gateway.alpaca import AlpacaGateway
# from vnpy.gateway.da import DaGateway
# from vnpy.gateway.coinbase import CoinbaseGateway
# from vnpy.gateway.bitstamp import BitstampGateway
# from vnpy.gateway.gateios import GateiosGateway
# from vnpy.gateway.bybit import BybitGateway
# from vnpy.gateway.eastmoney import EastmoneyGateway
from vnpy.gateway.rpc import RpcGateway
# from vnpy.app.cta_crypto import CtaCryptoApp
from vnpy.app.cta_stock import CtaStockApp
# from vnpy.app.csv_loader import CsvLoaderApp
# from vnpy.app.algo_trading import AlgoTradingApp
# from vnpy.app.cta_backtester import CtaBacktesterApp
# from vnpy.app.data_recorder import DataRecorderApp
# from vnpy.app.tick_recorder import TickRecorderApp
# from vnpy.app.risk_manager import RiskManagerApp
# from vnpy.app.script_trader import ScriptTraderApp
# from vnpy.app.rpc_service import RpcServiceApp
# from vnpy.app.spread_trading import SpreadTradingApp
# from vnpy.app.portfolio_manager import PortfolioManagerApp
def main():
""""""
qapp = create_qapp()
event_engine = EventEngine()
main_engine = MainEngine(event_engine)
main_engine.add_gateway(RpcGateway, 'em02_gw')
#main_engine.add_gateway(CtpGateway)
# main_engine.add_gateway(CtptestGateway)
# main_engine.add_gateway(MiniGateway)
# main_engine.add_gateway(SoptGateway)
# main_engine.add_gateway(MinitestGateway)
# main_engine.add_gateway(FemasGateway)
# main_engine.add_gateway(IbGateway)
# main_engine.add_gateway(FutuGateway)
#main_engine.add_gateway(BitmexGateway)
# main_engine.add_gateway(TigerGateway)
# main_engine.add_gateway(OesGateway)
# main_engine.add_gateway(OkexGateway)
# main_engine.add_gateway(HuobiGateway)
# main_engine.add_gateway(BitfinexGateway)
# main_engine.add_gateway(OnetokenGateway)
# main_engine.add_gateway(OkexfGateway)
# main_engine.add_gateway(HbdmGateway)
# main_engine.add_gateway(XtpGateway)
# main_engine.add_gateway(TapGateway)
# main_engine.add_gateway(ToraGateway)
# main_engine.add_gateway(AlpacaGateway)
# main_engine.add_gateway(OkexsGateway)
# main_engine.add_gateway(DaGateway)
# main_engine.add_gateway(CoinbaseGateway)
# main_engine.add_gateway(BitstampGateway)
# main_engine.add_gateway(GateiosGateway)
#main_engine.add_gateway(BybitGateway)
#main_engine.add_app(CtaStrategyApp)
main_engine.add_app(CtaStockApp)
#main_engine.add_app(CtaBacktesterApp)
# main_engine.add_app(CsvLoaderApp)
# main_engine.add_app(AlgoTradingApp)
#main_engine.add_app(DataRecorderApp)
#main_engine.add_app(TickRecorderApp)
# main_engine.add_app(RiskManagerApp)
# main_engine.add_app(ScriptTraderApp)
# main_engine.add_app(RpcServiceApp)
# main_engine.add_app(SpreadTradingApp)
#main_engine.add_app(PortfolioManagerApp)
main_window = MainWindow(main_engine, event_engine)
main_window.showMaximized()
qapp.exec()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,112 @@
# flake8: noqa
import os
import sys
import multiprocessing
from time import sleep
from datetime import datetime, time
from logging import DEBUG
# 将repostory的目录i作为根目录添加到系统环境中。
ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
if ROOT_PATH not in sys.path:
sys.path.append(ROOT_PATH)
print(f'append {ROOT_PATH} into sys.path')
from vnpy.event import EventEngine, EVENT_TIMER
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine
from vnpy.trader.utility import load_json
# from vnpy.gateway.gj import GjGateway
from vnpy.app.stock_screener import ScreenerApp
# from vnpy.app.cta_stock import CtaStockApp
# from vnpy.app.cta_crypto.base import EVENT_CTA_LOG
from vnpy.app.rpc_service import RpcServiceApp
# from vnpy.app.algo_broker import AlgoBrokerApp
from vnpy.app.account_recorder import AccountRecorderApp
from vnpy.trader.util_pid import update_pid
# from vnpy.trader.util_monitor import OrderMonitor, TradeMonitor, PositionMonitor, AccountMonitor, LogMonitor
SETTINGS["log.active"] = True
SETTINGS["log.level"] = DEBUG
SETTINGS["log.console"] = True
SETTINGS["log.file"] = True
gateway_name = 'em02'
import types
import traceback
def excepthook(exctype: type, value: Exception, tb: types.TracebackType) -> None:
"""
Raise exception under debug mode
"""
sys.__excepthook__(exctype, value, tb)
msg = "".join(traceback.format_exception(exctype, value, tb))
print(msg, file=sys.stderr)
class DaemonService(object):
def __init__(self):
self.event_engine = EventEngine()
self.g_count = 0
self.last_dt = datetime.now()
# 创建主引擎
self.main_engine = MainEngine(self.event_engine)
self.save_data_time = None
self.save_snapshot_time = None
# 注册定时器,用于判断重连
self.event_engine.register(EVENT_TIMER, self.on_timer)
def on_timer(self, event):
"""定时器执行逻辑,每十秒执行一次"""
# 60秒才执行一次检查
self.g_count += 1
if self.g_count <= 60:
return
self.g_count = 0
dt = datetime.now()
if dt.hour != self.last_dt.hour:
self.last_dt = dt
print(u'run_screener.py checkpoint:{0}'.format(dt))
self.main_engine.write_log(u'run_screener.py checkpoint:{0}'.format(dt))
def start(self):
"""
Running in the child process.
"""
SETTINGS["log.file"] = True
# 添加选股引擎
screen_engine = self.main_engine.add_app(ScreenerApp)
screen_engine.init_engine()
self.main_engine.write_log("主引擎创建成功")
while True:
sleep(1)
if screen_engine.get_all_completed_status():
from vnpy.trader.util_wechat import send_wx_msg
msg = f'{gateway_name}完成所有选股任务'
send_wx_msg(content=msg)
self.main_engine.write_log(msg)
sleep(10)
os._exit(0)
if __name__ == "__main__":
sys.excepthook = excepthook
s = DaemonService()
s.start()

View File

@ -0,0 +1,159 @@
# flake8: noqa
import os
import sys
import multiprocessing
from time import sleep
from datetime import datetime, time
from logging import DEBUG
# 将repostory的目录i作为根目录添加到系统环境中。
ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
if ROOT_PATH not in sys.path:
sys.path.append(ROOT_PATH)
print(f'append {ROOT_PATH} into sys.path')
from vnpy.event import EventEngine, EVENT_TIMER
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine
from vnpy.trader.utility import load_json
from vnpy.gateway.rpc import RpcGateway
from vnpy.app.cta_stock import CtaStockApp
#from vnpy.app.cta_crypto.base import EVENT_CTA_LOG
from vnpy.app.rpc_service import RpcServiceApp
#from vnpy.app.algo_broker import AlgoBrokerApp
from vnpy.app.account_recorder import AccountRecorderApp
from vnpy.trader.util_pid import update_pid
from vnpy.trader.util_monitor import OrderMonitor, TradeMonitor, PositionMonitor, AccountMonitor, LogMonitor
SETTINGS["log.active"] = True
SETTINGS["log.level"] = DEBUG
SETTINGS["log.console"] = True
SETTINGS["log.file"] = True
gateway_name = 'em02_gw'
gw_setting = load_json(f'connect_{gateway_name}.json')
import types
import traceback
def excepthook(exctype: type, value: Exception, tb: types.TracebackType) -> None:
"""
Raise exception under debug mode
"""
sys.__excepthook__(exctype, value, tb)
msg = "".join(traceback.format_exception(exctype, value, tb))
print(msg, file=sys.stderr)
class DaemonService(object):
def __init__(self):
self.event_engine = EventEngine()
self.g_count = 0
self.last_dt = datetime.now()
# 创建账号/持仓/委托/交易/日志记录
self.acc_monitor = AccountMonitor(self.event_engine)
self.pos_monitor = PositionMonitor(self.event_engine)
self.ord_monitor = OrderMonitor(self.event_engine)
self.trd_monitor = TradeMonitor(self.event_engine)
#self.log_monitor = LogMonitor(self.event_engine)
# 创建主引擎
self.main_engine = MainEngine(self.event_engine)
self.save_data_time = None
self.save_snapshot_time = None
# 注册定时器,用于判断重连
self.event_engine.register(EVENT_TIMER, self.on_timer)
def on_timer(self, event):
"""定时器执行逻辑,每十秒执行一次"""
# 60秒才执行一次检查
self.g_count += 1
if self.g_count <= 60:
return
# 强制写入一次gpid
update_pid()
self.g_count = 0
dt = datetime.now()
if dt.hour != self.last_dt.hour:
self.last_dt = dt
#print(u'run_server.py checkpoint:{0}'.format(dt))
self.main_engine.write_log(u'run_server.py checkpoint:{0}'.format(dt))
# ctp 短线重连得处理
# 定时保存策略内数据
if dt.strftime('%H:%M') in ['02:31', '15:16']:
if self.save_data_time != dt.strftime('%H:%M'):
self.main_engine.write_log(u'保存策略内数据')
self.save_data_time = dt.strftime('%H:%M')
try:
self.main_engine.save_strategy_data('ALL')
except Exception as ex:
self.main_engine.write_error('保存策略内数据异常')
if dt.strftime('%H:%M') in ['02:32', '10:16', '11:31', '15:17', '23:01']:
if self.save_snapshot_time != dt.strftime('%H:%M'):
self.main_engine.write_log(u'保存策略内K线切片数据')
self.save_snapshot_time = dt.strftime('%H:%M')
try:
self.main_engine.save_strategy_snapshot('ALL')
except Exception as ex:
self.main_engine.write_error('保存策略内数据异常')
def start(self):
"""
Running in the child process.
"""
SETTINGS["log.file"] = True
timer_count = 0
# 远程调用服务
rpc_server = self.main_engine.add_app(RpcServiceApp)
ret, msg = rpc_server.start()
if not ret:
self.main_engine.write_log(f"RPC服务未能启动:{msg}")
return
else:
self.main_engine.write_log(f'RPC服务已启动')
update_pid()
# 添加账号同步app
self.main_engine.add_app(AccountRecorderApp)
# 接入网关
self.main_engine.add_gateway(RpcGateway, gateway_name)
self.main_engine.write_log(f"连接{gateway_name}接口")
self.main_engine.connect(gw_setting, gateway_name)
sleep(5)
# 添加cta引擎
cta_engine = self.main_engine.add_app(CtaStockApp)
cta_engine.init_engine()
# 添加算法引擎代理
#self.main_engine.add_app(AlgoBrokerApp)
self.main_engine.write_log("主引擎创建成功")
while True:
sleep(1)
if __name__ == "__main__":
sys.excepthook = excepthook
s = DaemonService()
s.start()

View File

@ -0,0 +1,24 @@
{
"stock_screener_ThreeBuy_D1_SSE": {
"class_name": "StrategyChanlunThreeBuy",
"auto_init": true,
"auto_start": true,
"setting": {
"vt_symbols": [],
"all_stocks": true,
"exchange": "SSE",
"bar_name": "D1"
}
},
"stock_screener_ThreeBuy_D1_SZSE": {
"class_name": "StrategyChanlunThreeBuy",
"auto_init": true,
"auto_start": true,
"setting": {
"vt_symbols": [],
"all_stocks": true,
"exchange": "SZSE",
"bar_name": "D1"
}
}
}

View File

@ -0,0 +1,394 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# python 3 环境
# 激活 activate.sh (激活py35 env启动运行程序
import sys
import time
from datetime import datetime
# import commands
import os
import subprocess
import psutil
# 将repostory的目录i作为根目录添加到系统环境中。
ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
sys.path.append(ROOT_PATH)
from vnpy.trader.util_wechat import send_wx_msg
# python容器文件
python_path = '/home/trade/anaconda3/envs/py37/bin/python'
# shell 文件不使用sh
bash = "/bin/bash"
# 配置内容
# 运行时间段
# 是否7X24小时运行数字货币
IS_7x24 = False
# 是否激活夜盘
ACTIVE_NIGHT = False
# python 脚本这里要和activate.sh里面得PROGRAM_NAME 一致
PROGRAM_NAME = './run_service.py'
# 日志目录
log_path = os.path.abspath(os.path.join(os.getcwd(), 'log'))
if os.path.isdir(log_path):
# 如果工作目录下存在logs子目录就使用logs子目录
base_path = os.getcwd()
else:
# 使用service.py所在得目录
base_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
# 进程组id保存文件
gpid_file = os.path.abspath(os.path.join(base_path, 'log', 'gpid.txt'))
tmp_cron_file = os.path.abspath(os.path.join(base_path, 'log', 'cron.tmp'))
program_file = os.path.join(base_path, 'activate.sh')
log_file = os.path.abspath(os.path.join(base_path, 'log', 'service.log'))
error_file = os.path.abspath(os.path.join(base_path, 'log', 'service-error.log'))
cron_log_file = os.path.abspath(os.path.join(base_path, 'log', 'cron.log'))
cron_error_file = os.path.abspath(os.path.join(base_path, 'log', 'cron-error.log'))
null_file = "/dev/null"
cron_content = "* * * * * {} {} schedule >>{} 2>>{}".format(python_path, os.path.realpath(__file__), cron_log_file,
cron_error_file)
program_command = "nohup {} {} >>{} 2>>{} &".format(bash, program_file, log_file, error_file)
USE_GPID = False
def _check_gpid(gpid):
"""
检查进程()ID
:param gpid:
:return: True, 正在运行/ False: 没有运行
"""
if not USE_GPID:
int_gpid = int(gpid)
return psutil.pid_exists(int_gpid)
try:
# 通过系统子进程打开ps命令找到gpid下得所有进程
p = subprocess.Popen(["ps", "-A", "-o", "pgrp="], stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False)
returncode = p.wait()
except OSError as e:
print('找不到shell运行命令ps', file=sys.stderr)
exit(1)
# print('returncode1:{}'.format(returncode))
try:
p2 = subprocess.Popen("uniq", stdin=p.stdout, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, shell=False)
returncode = p2.wait()
except OSError as e:
print(u'找不到shell运行命令uniq', file=sys.stderr)
exit(1)
# print('returncode2:{}'.format(returncode))
for i in p2.stdout.readlines():
# print (u'p2.line:{}'.format(i))
if i.decode().strip() == gpid:
print(u'找到gpid:{}'.format(gpid))
return True
print(u'找不到gpid:{}'.format(gpid))
return False
def _status():
"""
查询当前状态
:return:
"""
print(u'检查{}'.format(gpid_file))
if os.path.exists(gpid_file):
with open(gpid_file, 'r') as f:
gpid = f.read().strip()
print(u'gpid={}'.format(gpid))
if gpid != "" and _check_gpid(gpid):
return gpid
return None
def trade_off():
"""检查现在是否为非交易时间"""
now = datetime.now()
# 数字货币
if IS_7x24:
if now.hour == 12 and now.minute == 0:
return True
else:
return False
# 国内期货/股票
a = datetime.now().replace(hour=2, minute=35, second=0, microsecond=0)
b = datetime.now().replace(hour=9, minute=0, second=0, microsecond=0)
c = datetime.now().replace(hour=15, minute=30, second=0, microsecond=0)
d = datetime.now().replace(hour=20, minute=45, second=0, microsecond=0)
# 国内期货有夜盘
if ACTIVE_NIGHT:
weekend = (now.isoweekday() == 6 and now >= a) or (now.isoweekday() == 7) or (
now.isoweekday() == 1 and now <= a)
off = (a <= now <= b) or (c <= now <= d) or weekend
return off
else:
weekend = now.isoweekday() in [6, 7]
off = now <= b or c <= now or weekend
return off
def _start():
"""
启动服务
:return:
"""
# 获取进程组id
gpid = _status()
if trade_off():
# 属于停止运行期间
if gpid:
print(u'现在属于停止运行时间进程组ID存在,将杀死服务进程:[gpid={}]'.format(gpid))
import signal
if USE_GPID:
# 杀死进程组
os.killpg(int(gpid), signal.SIGKILL)
else:
os.kill(int(gpid), signal.SIGKILL)
i = 0
while _status():
time.sleep(1)
i += 1
print(u'杀死进程中,等待{}'.format(i))
if i > 30:
print(u'杀死进程失败,退出')
exit(1)
print('进程组已停止运行[gpid={}]'.format(gpid))
send_wx_msg('进程组{}已停止运行[{}]'.format(gpid, base_path))
else:
print(u'{} 现在属于停止运行时间,不启动服务'.format(datetime.now()))
else:
# 属于运行时间
if not gpid:
print(u'{}属于运行时间,将启动服务:{}'.format(datetime.now(), program_command))
if os.path.isfile(gpid_file):
print(u'{0}文件存在,先执行删除'.format(gpid_file))
try:
os.remove(gpid_file)
except:
pass
os.popen(program_command)
i = 0
while True:
gpid = _status()
if gpid:
print('{}属于运行时间,成功启动服务[gpid={}]'.format(datetime.now(), gpid))
send_wx_msg('{}属于运行时间,成功启动服务[{},gpid={}]'.format(datetime.now(), base_path, gpid))
break
i += 1
print(u'启动进程中,等待{}'.format(i))
if i > 30:
print(u'启动进程失败,退出')
exit(1)
time.sleep(1)
else:
print(u'{}属于运行时间,{}服务已运行'.format(datetime.now(), base_path))
def schedule():
"""
crontab 计划执行
:return:
"""
print('======schedule========')
_start()
def status():
"""查看状态"""
print('======status========')
gpid = _status()
if gpid:
print('{}服务进程[gpid={}]正在运行'.format(base_path, gpid))
else:
print('{}服务进程没有运行.'.format(base_path))
check_pids_in_cwd(gpid)
# operate的可选字符串为add, del
def operate_crontab(operate):
"""
操作crontab
:param operate: add , del
:return:
"""
try:
# 从系统命令中,获取定时任务
p = subprocess.Popen(["crontab", "-l"], stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False)
returncode = p.wait()
except OSError as e:
print(u"找不到shell运行命令crontab", file=sys.stderr)
exit(1)
remain_cron_list = []
exist_flag = False
old_cron_content = ''
for i in p.stdout.readlines():
if i.decode("utf-8").find(os.path.realpath(__file__) + " schedule") >= 0:
old_cron_content = i.decode("utf-8")
exist_flag = True
else:
remain_cron_list.append(i.decode("utf-8"))
if operate == "add" and not exist_flag:
remain_cron_list.append(cron_content)
remain_cron_list.append("\n")
with open(tmp_cron_file, 'wb') as f:
for i in remain_cron_list:
f.write(i.encode("utf-8"))
os.popen("crontab {}".format(tmp_cron_file))
print(u'添加crontab项: {}'.format(cron_content), file=sys.stderr)
if operate == "del" and exist_flag:
with open(tmp_cron_file, 'wb') as f:
for i in remain_cron_list:
f.write(i.encode("utf-8"))
os.popen("crontab {}".format(tmp_cron_file))
print(u'删除crontab item: {}'.format(old_cron_content), file=sys.stderr)
# os.remove(tmp_cron_file)
def check_pids_in_cwd(gpid=None):
print('检查{}路径下运行得python {}进程'.format(base_path, PROGRAM_NAME))
runing_pids = []
for pid in psutil.pids():
try:
p = psutil.Process(pid)
p_name = p.name()
if not p_name.endswith('python'):
continue
p_cwd = p.cwd()
if p_cwd != base_path:
continue
p_cmdline = p.cmdline()
if PROGRAM_NAME not in p_cmdline:
continue
runing_pids.append(pid)
except:
pass
if len(runing_pids) > 1:
if gpid is not None:
if gpid in runing_pids:
print(u'排除其他pid')
runing_pids.remove(gpid)
else:
print(u'gpid不在运行清单中排除首个pid')
runing_pids.pop(0)
else:
print(u'gpid为空排除首个pid')
runing_pids.pop(0)
for pid in runing_pids:
try:
p = psutil.Process(pid)
print(u'pid:{},name:{},bin:{},path:{},cmd:{},被终止运行'
.format(pid, p.name, p.exe(), p.cwd(), p.cmdline()))
import signal
os.kill(int(pid), signal.SIGKILL)
except:
pass
def start():
print(u'======start========')
# 往任务表增加定时计划
operate_crontab("add")
print(u'任务表增加定时计划完毕')
# 执行启动
# _start()
print(u'启动{}服务执行完毕'.format(base_path))
def _stop():
print(u'======stop========')
# 在任务表删除定时计划
operate_crontab("del")
# 查询进程组id
gpid = _status()
if gpid:
# 进程组存在,杀死进程
import signal
# 杀死进程组
if USE_GPID:
# 杀死进程组
os.killpg(int(gpid), signal.SIGKILL)
else:
os.kill(int(gpid), signal.SIGKILL)
i = 0
while _status():
time.sleep(1)
i += 1
print(u'等待{}'.format(i))
print(u'{}成功停止{}服务[gpid={}]'.format(datetime.now(), base_path, gpid))
send_wx_msg(u'{}成功停止{}服务[gpid={}]'.format(datetime.now(), base_path, gpid))
else:
print(u'{}服务进程没有运行'.format(base_path))
def stop():
"""
停止服务
:return:
"""
_stop()
print(u'执行停止{}服务完成'.format(base_path))
def restart():
print(u'======restart========')
_stop()
_start()
print('执行重启{}服务完成'.format(base_path))
if __name__ == '__main__':
if len(sys.argv) >= 2:
fun = sys.argv[1]
else:
fun = ''
if fun == 'status':
status()
elif fun == 'start':
start()
elif fun == 'stop':
stop()
elif fun == 'restart':
restart()
elif fun == 'schedule':
schedule()
else:
print(u'Usage: {} (status|start|stop|restart)'.format(os.path.basename(__file__)))
status()

Binary file not shown.

View File

@ -0,0 +1,29 @@
{
"font.family": "Arial",
"font.size": 12,
"log.active": true,
"log.level": 10,
"log.console": true,
"log.file": true,
"email.server": "smtp.qq.com",
"email.port": 465,
"email.username": "",
"email.password": "",
"email.sender": "",
"email.receiver": "",
"rqdata.username": "",
"rqdata.password": "",
"database.driver": "sqlite",
"database.database": "database.db",
"database.host": "localhost",
"database.port": 3306,
"database.user": "root",
"database.password": "",
"database.authentication_source": "admin",
"huafu.data_source_prod": "http://192.168.1.212:4000/api/v1/get_price",
"huafu.data_source": "http://127.0.0.1:4212/api/v1/get_price",
"renko.host": "192.168.1.214",
"hams.host": "192.168.1.214",
"rabbitmq.host": "192.168.1.211",
"gateway_name": "em02"
}