From df17ecccb4e37993614a54baae8ce403d276ac06 Mon Sep 17 00:00:00 2001 From: msincenselee Date: Thu, 26 Mar 2020 00:07:14 +0800 Subject: [PATCH] [] --- kill_celery.sh | 13 +++++ run_celery.sh | 2 +- vnpy/app/cta_crypto/engine.py | 2 +- vnpy/app/cta_crypto/template.py | 19 ++++++- vnpy/component/cta_grid_trade.py | 69 +++++++++-------------- vnpy/data/renko/readme.md | 25 ++++++++ vnpy/gateway/binancef/binancef_gateway.py | 8 +-- 7 files changed, 88 insertions(+), 50 deletions(-) create mode 100644 kill_celery.sh create mode 100644 vnpy/data/renko/readme.md diff --git a/kill_celery.sh b/kill_celery.sh new file mode 100644 index 00000000..ffd0dc0e --- /dev/null +++ b/kill_celery.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +PROGRAM_NAME=celery + +# Kill old processes +echo "-------PROGRAM_NAME1--------" +ID=`ps -ef | grep $PROGRAM_NAME | grep -v "$0" | grep -v "grep" | grep "python" | awk '{print $2}'` +echo $ID +for id in $ID +do +kill -9 $id +echo "killed $id" +done diff --git a/run_celery.sh b/run_celery.sh index 77ed6253..83577c55 100644 --- a/run_celery.sh +++ b/run_celery.sh @@ -1 +1 @@ -celery -A vnpy.task.celery_app worker --max-tasks-per-child 1 -l debug > tests/celery/worker.log 2>tests/celery/worker-error.log & +celery -A vnpy.task.celery_app worker --max-tasks-per-child 1 -l info > tests/celery/worker.log 2>tests/celery/worker-error.log & diff --git a/vnpy/app/cta_crypto/engine.py b/vnpy/app/cta_crypto/engine.py index ade1323c..c1b9cd74 100644 --- a/vnpy/app/cta_crypto/engine.py +++ b/vnpy/app/cta_crypto/engine.py @@ -600,7 +600,7 @@ class CtaEngine(BaseEngine): order = self.main_engine.get_order(vt_orderid) if not order: self.write_log(msg=f"撤单失败,找不到委托{vt_orderid}", - strategy_Name=strategy.strategy_name, + strategy_name=strategy.strategy_name, level=logging.ERROR) return False diff --git a/vnpy/app/cta_crypto/template.py b/vnpy/app/cta_crypto/template.py index 511182b8..35d1f68d 100644 --- a/vnpy/app/cta_crypto/template.py +++ b/vnpy/app/cta_crypto/template.py @@ -638,6 +638,16 @@ class CtaFutureTemplate(CtaTemplate): self.gt.save() self.display_grids() + def sync_data(self): + """同步更新数据""" + if not self.backtesting: + self.write_log(u'保存k线缓存数据') + self.save_klines_to_cache() + + if self.inited and self.trading: + self.write_log(u'保存policy数据') + self.policy.save() + def get_positions(self): """ 获取策略当前持仓(重构,使用主力合约) @@ -781,7 +791,7 @@ class CtaFutureTemplate(CtaTemplate): grid.open_status = True self.write_log(f'{grid.direction.value}单已开仓完毕,order_price:{order.price}' + f',volume:{order.volume}') - + self.gt.save() # 网格的所有委托单部分执行完毕 else: old_traded_volume = grid.traded_volume @@ -794,6 +804,7 @@ class CtaFutureTemplate(CtaTemplate): self.write_log(f'剩余委托单号:{grid.order_ids}') # 在策略得活动订单中,移除 + self.write_log(f'移除活动订单:{order.vt_orderid}') self.active_orders.pop(order.vt_orderid, None) def on_order_open_canceled(self, order: OrderData): @@ -1059,9 +1070,10 @@ class CtaFutureTemplate(CtaTemplate): # 发出平多委托 if grid.traded_volume > 0: grid.volume -= grid.traded_volume - grid.volume = round(grid.volume, 7) grid.traded_volume = 0 + grid.volume = round(grid.volume, 7) + if 0 < self.account_pos.long_pos < grid.volume: self.write_error(u'当前{}的多单持仓:{},不满足平仓目标:{},强制降低' .format(self.vt_symbol, @@ -1110,9 +1122,10 @@ class CtaFutureTemplate(CtaTemplate): # 发出cover委托 if grid.traded_volume > 0: grid.volume -= grid.traded_volume - grid.volume = round(grid.volume, 7) grid.traded_volume = 0 + grid.volume = round(grid.volume, 7) + if 0 < abs(self.account_pos.short_pos) < grid.volume: self.write_error(u'当前{}的空单持仓:{},不满足平仓目标:{}, 强制降低' .format(self.vt_symbol, diff --git a/vnpy/component/cta_grid_trade.py b/vnpy/component/cta_grid_trade.py index 58afe8b6..2e45f25c 100644 --- a/vnpy/component/cta_grid_trade.py +++ b/vnpy/component/cta_grid_trade.py @@ -38,48 +38,35 @@ class CtaGrid(object): 包括交易方向,开仓价格,平仓价格,止损价格,开仓状态,平仓状态 """ - def __init__(self, - direction: Direction = None, - open_price: float = 0, - close_price: float = 0, - stop_price: float = 0, - vt_symbol: str = '', - volume: float = 0, - traded_volume: float = 0, - order_status: bool = False, - open_status: bool = False, - close_status: bool = False, - open_time: datetime = None, - order_time: datetime = None, - reuse_count: int = 0, - type: str = '' - ): + def __init__(self, **kwargs ): - self.id: str = str(uuid.uuid1()) # gid - self.direction = direction # 交易方向(LONG:多,正套;SHORT:空,反套) - self.open_price = open_price # 开仓价格 - self.close_price = close_price # 止盈价格 - self.stop_price = stop_price # 止损价格 - self.vt_symbol = vt_symbol # 品种合约 - self.volume = volume # 开仓数量( 兼容数字货币 ) - self.traded_volume = traded_volume # 已成交数量 开仓时,为开仓数量,平仓时,为平仓数量 - self.order_status = order_status # 挂单状态: True,已挂单,False,未挂单 - self.order_ids = [] # order_id list - self.open_status = open_status # 开仓状态 - self.close_status = close_status # 平仓状态 - self.open_time = open_time # 开仓时间 - self.order_time = order_time # 委托时间 - self.lock_grid_ids = [] # 锁单的网格,[gid,gid] - self.reuse_count = reuse_count # 重用次数(0, 平仓后是否删除) - self.type = type # 网格类型标签 - self.snapshot = {} # 切片数据,如记录开仓点时的某些状态数据 + self.id: str = kwargs.get('id', str(uuid.uuid1())) # gid + self.direction = kwargs.get('direction', None) # 交易方向(LONG:多,正套;SHORT:空,反套) + if isinstance(self.direction, str): + self.direction = Direction(self.direction) + self.open_price = kwargs.get('open_price', 0) # 开仓价格 + self.close_price = kwargs.get('close_price', 0) # 止盈价格 + self.stop_price = kwargs.get('stop_price', 0) # 止损价格 + self.vt_symbol = kwargs.get('vt_symbol', '') # 品种合约 + self.volume = kwargs.get('volume', 0.0) # 开仓数量( 兼容数字货币 ) + self.traded_volume = kwargs.get('traded_volume', 0.0) # 已成交数量 开仓时,为开仓数量,平仓时,为平仓数量 + self.order_status = kwargs.get('order_status', False) # 挂单状态: True,已挂单,False,未挂单 + self.order_ids = kwargs.get('order_ids',[]) # order_id list + self.open_status = kwargs.get('open_status', False) # 开仓状态 + self.close_status = kwargs.get('close_status', False) # 平仓状态 + self.open_time = kwargs.get('open_time', None) # 开仓时间 + self.order_time = kwargs.get('order_time', None) # 委托时间 + self.lock_grid_ids = kwargs.get('lock_grid_ids', []) # 锁单的网格,[gid,gid] + self.reuse_count = kwargs.get('reuse_count', 0) # 重用次数(0, 平仓后是否删除) + self.type = kwargs.get('type', '') # 网格类型标签 + self.snapshot = kwargs.get('snapshot',{}) # 切片数据,如记录开仓点时的某些状态数据 def to_json(self): """输出JSON""" j = OrderedDict() j['id'] = self.id - j['direction'] = self.direction.value if self.direction else '' + j['direction'] = self.direction.value if isinstance(self.direction, Direction) else '' j['open_price'] = self.open_price # 开仓价格 j['close_price'] = self.close_price # 平仓价格 j['stop_price'] = self.stop_price # 止损价格 @@ -90,7 +77,7 @@ class CtaGrid(object): j['order_ids'] = self.order_ids # OrderId j['open_status'] = self.open_status # 开仓状态 j['close_status'] = self.close_status # 平仓状态 - j['lockGrids'] = self.lock_grid_ids # 对锁的网格 + j['lock_grid_ids'] = self.lock_grid_ids # 对锁的网格 j['reuse_count'] = self.reuse_count # 是否重用 j['type'] = self.type # 类型 j['snapshot'] = self.snapshot # 切片数据 @@ -124,7 +111,7 @@ class CtaGrid(object): self.vt_symbol = j.get('vt_symbol', '') self.volume = j.get('volume', 0.0) self.traded_volume = j.get('traded_volume', 0.0) # 已交易的合约数量 - self.lock_grid_ids = j.get('lockGrids', []) + self.lock_grid_ids = j.get('lock_grid_ids', []) self.type = j.get('type', '') self.reuse_count = j.get('reuse_count', 0) @@ -900,8 +887,8 @@ class CtaGridTrade(CtaComponent): data[u'up_grids'] = up_grids data[u'dn_grids'] = dn_grids - with open(grid_json_file, 'w') as f: - json_data = json.dumps(data, indent=4) + with open(grid_json_file, 'w', encoding='utf8') as f: + json_data = json.dumps(data, indent=4, ensure_ascii=True) f.write(json_data) self.write_log(u'GrideTrade保存文件{}完成'.format(grid_json_file)) @@ -928,7 +915,7 @@ class CtaGridTrade(CtaComponent): self.write_log(u'{}不存在,新建保存保存'.format(grid_json_file)) try: with open(grid_json_file, 'w') as f: - json_data = json.dumps(data, indent=4) + json_data = json.dumps(data, indent=4, ensure_ascii=True) f.write(json_data) except Exception as ex: self.write_log(u'写入网格文件{}异常:{}'.format(grid_json_file, str(ex))) @@ -951,7 +938,7 @@ class CtaGridTrade(CtaComponent): grids = [] for grid_obj in json_grids: - grid = CtaGrid(grid_obj) + grid = CtaGrid(**grid_obj) self.write_log(grid.to_str()) diff --git a/vnpy/data/renko/readme.md b/vnpy/data/renko/readme.md new file mode 100644 index 00000000..39b136aa --- /dev/null +++ b/vnpy/data/renko/readme.md @@ -0,0 +1,25 @@ +期货砖块图数据 + +1.重建 + 砖图高度,为3,5,10, K3, K5, K10个最小跳动 + 从mongo数据库,获取最后一笔记录得日期/时间 + 从tdx数据源,获取历史分笔数据(开始时间,结束时间/当前时间, 合约代码)=》 Queue + 从Queue=》 renko on Tick + +2. 查询 + 从mongo数据库,获取数据。 + 缓存至本地(数量切片缓存) + 返回K线记录 + + +股票砖图数据 +1.重建 + 砖图高度,为max(价格千分之一,0.01)得3,5,10个最小跳动 + 从mongo数据库,获取最后一笔记录得日期/时间 + 从tdx数据源,获取历史分笔数据(开始时间,结束时间/当前时间, 合约代码)=》 Queue + 从Queue=》 renko on Tick + +2. 查询 + 从mongo数据库,获取数据。 + 缓存至本地(数量切片缓存) + 返回K线记录 diff --git a/vnpy/gateway/binancef/binancef_gateway.py b/vnpy/gateway/binancef/binancef_gateway.py index a0e1e497..d8f1f71c 100644 --- a/vnpy/gateway/binancef/binancef_gateway.py +++ b/vnpy/gateway/binancef/binancef_gateway.py @@ -412,7 +412,7 @@ class BinancefRestApi(RestClient): "side": DIRECTION_VT2BINANCEF[req.direction], "type": ORDERTYPE_VT2BINANCEF[req.type], "price": float(req.price), - "quantity": int(req.volume), + "quantity": round(req.volume, 7), "newClientOrderId": orderid, "newOrderRespType": "ACK" } @@ -513,7 +513,7 @@ class BinancefRestApi(RestClient): self.gateway.write_log(json.dumps(position, indent=2)) self.contracts.update({symbol: position}) - self.gateway.write_log("账户资金查询成功") + # self.gateway.write_log("账户资金查询成功") def on_query_position(self, data: dict, request: Request) -> None: """""" @@ -581,7 +581,7 @@ class BinancefRestApi(RestClient): self.gateway.on_position(long_position) self.gateway.on_position(short_position) - self.gateway.write_log("持仓信息查询成功") + # self.gateway.write_log("持仓信息查询成功") def on_query_order(self, data: dict, request: Request) -> None: """""" @@ -850,7 +850,7 @@ class BinancefTradeWebsocketApi(WebsocketClient): symbol=pos_data["s"], exchange=Exchange.BINANCE, direction=Direction.NET, - volume=int(float(pos_data["pa"])), + volume=round(float(pos_data["pa"]), 7), price=float(pos_data["ep"]), pnl=float(pos_data["cr"]), gateway_name=self.gateway_name,