diff --git a/vnpy/task/README.md b/vnpy/task/README.md new file mode 100644 index 00000000..63e0cfe9 --- /dev/null +++ b/vnpy/task/README.md @@ -0,0 +1,62 @@ +### 基于Celery的分布式任务 + +# 安装 +Windows 10: + + 1) conda activate py37 + 2) pip install celery + 3) pip install eventlet + +Centos 7: + + trade用户 + 1) conda activate py37 + 2) pip install celery + + root 用户 (如果使用redis作为中介) + 1) 安装redis服务器 yum install redis + 2) 修改/etc/redis.conf ,在127.0.0.1后,增加对外的IP地址 + 3) 启动redis服务器 systemctl start redis + 4) 允许外部访问(简单粗暴的方式:iptables -F) + + root 用户(如果使用rabbitMQ作为中介) + 1) 自行百度下如何安装rabbitMQ 服务程序 + 2) 缺省账号/密码: admin/admin + 3) web访问界面 http://[ip]:15672/ + +Celery flower 监控 + + 1) pip install flower + 2) celery -A vnpy.task.celery_app --port=5555 --broker + +# 启动 +windows 10 + + 1) conda activate py37 + 2) 进入项目目录后,例如c:\vnpy2, 运行命令: + celery -A vnpy.task.celery_app worker -P eventlet -l debug -f celery.log + 其中: + -A 代表运行的模块, worker,表明这是celery的worker, + -P windows10下使用的协议 + -l (小写L),表示logging的级别, debug, info 等。 + -f 代表输出日志文件名 + 3) 停止 Ctrl + C + +# 添加任务的例子 + 可在jupyter_notebook 中,或普通py程序中,往celery_app添加任务 + 其中: + 1) excutue, 是celery_app中定义的Task类型方法。 + 2) 它通过参数进行传递: + - func: 你要运行的某个命名空间函数,例如vnpy.app.cta_strategy_pro.portfolio_testing.single_test + - 后续其他是这个函数所需要的若干参数了,用dict字典即可。 + 3) task_id,提前定义,可把它作为任务的唯一标识。 + 调用前,可存储在mongo数据库中,调用完成后,可更新mongo数据库的结果。 + + from vnpy.task.celery_app import execute + task_id = str(uuid1()) + print(f'添加celery 任务:{task_id}') + execute.apply_async(kwargs={ + 'func': 'vnpy.app.cta_strategy_pro.portfolio_testing.single_test', + 'test_setting': test_setting, + 'strategy_setting': strategy_setting}, + task_id=task_id) diff --git a/vnpy/task/celery_app.py b/vnpy/task/celery_app.py new file mode 100644 index 00000000..e3e82f00 --- /dev/null +++ b/vnpy/task/celery_app.py @@ -0,0 +1,99 @@ +# encoding: UTF-8 + +# Celery app +# 该py脚本,为启动celery worker app +# 在项目根目录下,运行 celery -A vnpy.task.celery worker +import time +from celery import Celery + +import sys +import os +import traceback + +from vnpy.trader.utility import import_module_by_str +from celery.utils.log import get_task_logger + +logger = get_task_logger(__name__) + + +# 添加项目目录 +vnpy_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) + +if vnpy_root not in sys.path: + print('append {} into sys.path'.format(vnpy_root)) + sys.path.append(vnpy_root) + +# 使用本地配置的 +from vnpy.trader.utility import load_json +file_path = os.path.abspath(os.path.join(os.path.dirname(__file__), 'celery_config.json')) +celery_config = load_json(file_path) + +# 使用 redis +# broker = celery_config.get('celery_broker','redis://192.168.0.202:6379') +#backend = celery_config.get('celery_backend','redis://192.168.0.202:6379/0') + +# 使用rabbitMQ +broker = celery_config.get('celery_broker', 'amqp://admin:admin@192.168.0.202:5672//') +backend = celery_config.get('celery_backend', 'amqp://admin:admin@192.168.0.202:5672//') + +print(u'Celery 使用redis配置:\nbroker:{}\nbackend:{}'.format(broker, backend)) + +app = Celery('vnpy_task', broker=broker) + +# 动态导入task目录下子任务 +#app.conf.CELERY_IMPORTS = ['vnpy.task.celery_app.worker_started'] + +#app.conf.update( +# CELERY_TASK_SERIALIZER='json', +# CELERY_RESULT_SERIALIZER='json', +# CELERY_ACCEPT_CONTENT=['json'], +# CELERY_TIMEZONE='Asia/Shanghai', +# CELERY_ENABLE_UTC=True +# ) + + +def worker_started(): + """发送worker启动的通知""" + try: + import socket + from vnpy.trader.util_logger import setup_logger + logger = setup_logger(file_name='celery_worker') + logger.inf('celery worker started') + except: + pass + +@app.task(bind=True) +def execute(self, func, *args, **kwargs): + """ + 通用的执行任务方法 + :param func: 函数方法在绝对路径字符串,mod.sub_mod.func + :param args: 参数 + :param kwargs: 扩展参数 + :return: + """ + try: + logger.info('working folder:{}'.format(os.getcwd())) + logger.info('task id:{}'.format(self.request.id)) + logger.info('celery will execute:{}'.format(func)) + logger.info('args:{}'.format(args)) + logger.info('kwargs:{}'.format(kwargs)) + + # Str =》 加载 模块( 模块如果修改代码,会加载新得代码) + f = import_module_by_str(func) + + # 加载不成功得话,告警 + if f is None: + msg = 'celery can not load function :{}'.format(func) + print(msg, file=sys.stderr) + return False + + # 真正执行任务 + ret = f(*args, **kwargs) + + # 返回执行结果给celery + return ret + + except Exception as ex: + msg = 'celery execute func:{} exception:{},{}'.format(func, str(ex), traceback.format_exc()) + logger.error(msg) + return False diff --git a/vnpy/task/celery_config.json b/vnpy/task/celery_config.json new file mode 100644 index 00000000..bbb236ef --- /dev/null +++ b/vnpy/task/celery_config.json @@ -0,0 +1,4 @@ +{ + "celery_broker": "amqp://admin:admin@192.168.0.202:5672//", + "celery_backend": "amqp://admin:admin@192.168.0.202:5672//" +}