diff --git a/vnpy/task/README.md b/vnpy/task/README.md index 63e0cfe9..b18caca8 100644 --- a/vnpy/task/README.md +++ b/vnpy/task/README.md @@ -34,9 +34,11 @@ windows 10 1) conda activate py37 2) 进入项目目录后,例如c:\vnpy2, 运行命令: - celery -A vnpy.task.celery_app worker -P eventlet -l debug -f celery.log + celery worker -c 2 -A vnpy.task.celery_app -P eventlet -l debug -f celery.log 其中: - -A 代表运行的模块, worker,表明这是celery的worker, + worker,表明这是celery的worker, + -c 代表使用多少个cpu作为线程池 + -A 代表worker运行的模块, -P windows10下使用的协议 -l (小写L),表示logging的级别, debug, info 等。 -f 代表输出日志文件名 @@ -60,3 +62,6 @@ windows 10 'test_setting': test_setting, 'strategy_setting': strategy_setting}, task_id=task_id) + +# 清除任务得例子 + celery -A vnpy.task.celery_app purge diff --git a/vnpy/task/celery_app.py b/vnpy/task/celery_app.py index e3e82f00..e22b30c3 100644 --- a/vnpy/task/celery_app.py +++ b/vnpy/task/celery_app.py @@ -15,7 +15,6 @@ from celery.utils.log import get_task_logger logger = get_task_logger(__name__) - # 添加项目目录 vnpy_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) @@ -25,12 +24,13 @@ if vnpy_root not in sys.path: # 使用本地配置的 from vnpy.trader.utility import load_json + file_path = os.path.abspath(os.path.join(os.path.dirname(__file__), 'celery_config.json')) celery_config = load_json(file_path) # 使用 redis # broker = celery_config.get('celery_broker','redis://192.168.0.202:6379') -#backend = celery_config.get('celery_backend','redis://192.168.0.202:6379/0') +# backend = celery_config.get('celery_backend','redis://192.168.0.202:6379/0') # 使用rabbitMQ broker = celery_config.get('celery_broker', 'amqp://admin:admin@192.168.0.202:5672//') @@ -40,10 +40,11 @@ print(u'Celery 使用redis配置:\nbroker:{}\nbackend:{}'.format(broker, backend app = Celery('vnpy_task', broker=broker) -# 动态导入task目录下子任务 -#app.conf.CELERY_IMPORTS = ['vnpy.task.celery_app.worker_started'] -#app.conf.update( +# 动态导入task目录下子任务 +# app.conf.CELERY_IMPORTS = ['vnpy.task.celery_app.worker_started'] + +# app.conf.update( # CELERY_TASK_SERIALIZER='json', # CELERY_RESULT_SERIALIZER='json', # CELERY_ACCEPT_CONTENT=['json'], @@ -56,12 +57,12 @@ def worker_started(): """发送worker启动的通知""" try: import socket - from vnpy.trader.util_logger import setup_logger - logger = setup_logger(file_name='celery_worker') - logger.inf('celery worker started') + from vnpy.trader.util_wechat import send_wx_msg + send_wx_msg(u'{} Celery Worker 启动'.format(socket.gethostname())) except: pass + @app.task(bind=True) def execute(self, func, *args, **kwargs): """