[增强] 增加分布式任务。 celery_app,可用于分布式回测,或计划任务执行
This commit is contained in:
parent
5208c85d77
commit
8bc0ed5bf5
62
vnpy/task/README.md
Normal file
62
vnpy/task/README.md
Normal file
@ -0,0 +1,62 @@
|
||||
### 基于Celery的分布式任务
|
||||
|
||||
# 安装
|
||||
Windows 10:
|
||||
|
||||
1) conda activate py37
|
||||
2) pip install celery
|
||||
3) pip install eventlet
|
||||
|
||||
Centos 7:
|
||||
|
||||
trade用户
|
||||
1) conda activate py37
|
||||
2) pip install celery
|
||||
|
||||
root 用户 (如果使用redis作为中介)
|
||||
1) 安装redis服务器 yum install redis
|
||||
2) 修改/etc/redis.conf ,在127.0.0.1后,增加对外的IP地址
|
||||
3) 启动redis服务器 systemctl start redis
|
||||
4) 允许外部访问(简单粗暴的方式:iptables -F)
|
||||
|
||||
root 用户(如果使用rabbitMQ作为中介)
|
||||
1) 自行百度下如何安装rabbitMQ 服务程序
|
||||
2) 缺省账号/密码: admin/admin
|
||||
3) web访问界面 http://[ip]:15672/
|
||||
|
||||
Celery flower 监控
|
||||
|
||||
1) pip install flower
|
||||
2) celery -A vnpy.task.celery_app --port=5555 --broker
|
||||
|
||||
# 启动
|
||||
windows 10
|
||||
|
||||
1) conda activate py37
|
||||
2) 进入项目目录后,例如c:\vnpy2, 运行命令:
|
||||
celery -A vnpy.task.celery_app worker -P eventlet -l debug -f celery.log
|
||||
其中:
|
||||
-A 代表运行的模块, worker,表明这是celery的worker,
|
||||
-P windows10下使用的协议
|
||||
-l (小写L),表示logging的级别, debug, info 等。
|
||||
-f 代表输出日志文件名
|
||||
3) 停止 Ctrl + C
|
||||
|
||||
# 添加任务的例子
|
||||
可在jupyter_notebook 中,或普通py程序中,往celery_app添加任务
|
||||
其中:
|
||||
1) excutue, 是celery_app中定义的Task类型方法。
|
||||
2) 它通过参数进行传递:
|
||||
- func: 你要运行的某个命名空间函数,例如vnpy.app.cta_strategy_pro.portfolio_testing.single_test
|
||||
- 后续其他是这个函数所需要的若干参数了,用dict字典即可。
|
||||
3) task_id,提前定义,可把它作为任务的唯一标识。
|
||||
调用前,可存储在mongo数据库中,调用完成后,可更新mongo数据库的结果。
|
||||
|
||||
from vnpy.task.celery_app import execute
|
||||
task_id = str(uuid1())
|
||||
print(f'添加celery 任务:{task_id}')
|
||||
execute.apply_async(kwargs={
|
||||
'func': 'vnpy.app.cta_strategy_pro.portfolio_testing.single_test',
|
||||
'test_setting': test_setting,
|
||||
'strategy_setting': strategy_setting},
|
||||
task_id=task_id)
|
99
vnpy/task/celery_app.py
Normal file
99
vnpy/task/celery_app.py
Normal file
@ -0,0 +1,99 @@
|
||||
# encoding: UTF-8
|
||||
|
||||
# Celery app
|
||||
# 该py脚本,为启动celery worker app
|
||||
# 在项目根目录下,运行 celery -A vnpy.task.celery worker
|
||||
import time
|
||||
from celery import Celery
|
||||
|
||||
import sys
|
||||
import os
|
||||
import traceback
|
||||
|
||||
from vnpy.trader.utility import import_module_by_str
|
||||
from celery.utils.log import get_task_logger
|
||||
|
||||
logger = get_task_logger(__name__)
|
||||
|
||||
|
||||
# 添加项目目录
|
||||
vnpy_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
|
||||
|
||||
if vnpy_root not in sys.path:
|
||||
print('append {} into sys.path'.format(vnpy_root))
|
||||
sys.path.append(vnpy_root)
|
||||
|
||||
# 使用本地配置的
|
||||
from vnpy.trader.utility import load_json
|
||||
file_path = os.path.abspath(os.path.join(os.path.dirname(__file__), 'celery_config.json'))
|
||||
celery_config = load_json(file_path)
|
||||
|
||||
# 使用 redis
|
||||
# broker = celery_config.get('celery_broker','redis://192.168.0.202:6379')
|
||||
#backend = celery_config.get('celery_backend','redis://192.168.0.202:6379/0')
|
||||
|
||||
# 使用rabbitMQ
|
||||
broker = celery_config.get('celery_broker', 'amqp://admin:admin@192.168.0.202:5672//')
|
||||
backend = celery_config.get('celery_backend', 'amqp://admin:admin@192.168.0.202:5672//')
|
||||
|
||||
print(u'Celery 使用redis配置:\nbroker:{}\nbackend:{}'.format(broker, backend))
|
||||
|
||||
app = Celery('vnpy_task', broker=broker)
|
||||
|
||||
# 动态导入task目录下子任务
|
||||
#app.conf.CELERY_IMPORTS = ['vnpy.task.celery_app.worker_started']
|
||||
|
||||
#app.conf.update(
|
||||
# CELERY_TASK_SERIALIZER='json',
|
||||
# CELERY_RESULT_SERIALIZER='json',
|
||||
# CELERY_ACCEPT_CONTENT=['json'],
|
||||
# CELERY_TIMEZONE='Asia/Shanghai',
|
||||
# CELERY_ENABLE_UTC=True
|
||||
# )
|
||||
|
||||
|
||||
def worker_started():
|
||||
"""发送worker启动的通知"""
|
||||
try:
|
||||
import socket
|
||||
from vnpy.trader.util_logger import setup_logger
|
||||
logger = setup_logger(file_name='celery_worker')
|
||||
logger.inf('celery worker started')
|
||||
except:
|
||||
pass
|
||||
|
||||
@app.task(bind=True)
|
||||
def execute(self, func, *args, **kwargs):
|
||||
"""
|
||||
通用的执行任务方法
|
||||
:param func: 函数方法在绝对路径字符串,mod.sub_mod.func
|
||||
:param args: 参数
|
||||
:param kwargs: 扩展参数
|
||||
:return:
|
||||
"""
|
||||
try:
|
||||
logger.info('working folder:{}'.format(os.getcwd()))
|
||||
logger.info('task id:{}'.format(self.request.id))
|
||||
logger.info('celery will execute:{}'.format(func))
|
||||
logger.info('args:{}'.format(args))
|
||||
logger.info('kwargs:{}'.format(kwargs))
|
||||
|
||||
# Str =》 加载 模块( 模块如果修改代码,会加载新得代码)
|
||||
f = import_module_by_str(func)
|
||||
|
||||
# 加载不成功得话,告警
|
||||
if f is None:
|
||||
msg = 'celery can not load function :{}'.format(func)
|
||||
print(msg, file=sys.stderr)
|
||||
return False
|
||||
|
||||
# 真正执行任务
|
||||
ret = f(*args, **kwargs)
|
||||
|
||||
# 返回执行结果给celery
|
||||
return ret
|
||||
|
||||
except Exception as ex:
|
||||
msg = 'celery execute func:{} exception:{},{}'.format(func, str(ex), traceback.format_exc())
|
||||
logger.error(msg)
|
||||
return False
|
4
vnpy/task/celery_config.json
Normal file
4
vnpy/task/celery_config.json
Normal file
@ -0,0 +1,4 @@
|
||||
{
|
||||
"celery_broker": "amqp://admin:admin@192.168.0.202:5672//",
|
||||
"celery_backend": "amqp://admin:admin@192.168.0.202:5672//"
|
||||
}
|
Loading…
Reference in New Issue
Block a user