[增强功能] 国金证券easytrader接口;算法交易引擎增加FAK和限价单支持套利;股票支持策略定时器接口;Cta增强引擎增加锁单功能;融航ctpGateway添加天勤五档行情

This commit is contained in:
msincenselee 2020-07-31 18:25:23 +08:00
parent 42371cc967
commit 17da760de4
45 changed files with 5727 additions and 36 deletions

View File

@ -10,6 +10,19 @@ github 链接: https://github.com/msincenselee/vnpy
gitee 链接: https://gitee.com/vnpy2/vnpy
###Fork版本主要改进如下
16、EasyTrade股票接入国金证券
- vnpy.api.easytrader,
+ 直接使用无需pip install easytrader
+ 任然需要安装组件 pip install -r vnpy/api/easytrader/requirement.txt
- vnpy.gateway.gj 国金证券的gateway
+ 使用了tdx作为股票基础数据
+ 使用了天勤作为行情服务
+ 使用了easytrader的remote_client作为接入.
- prod.stock_qj 运行例子
+ run_es_restful_server.py 放在A机器安装国金全能客户端。
+ run_main_gj01.py 放在B机器运行vn_trader客户端
15、天勤行情接入
- vnpy.data.tq 定制downloder扩展下载字段

View File

@ -0,0 +1,6 @@
{
"资金账号": "----",
"登录密码": "----",
"RPC IP": "192.168.0.201",
"RPC Port": 1430
}

View File

@ -0,0 +1,12 @@
# flake8: noqa
import os
import sys
# 将repostory的目录i作为根目录添加到系统环境中。
ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
sys.path.append(ROOT_PATH)
print(f'append {ROOT_PATH} into sys.path')
from vnpy.api.easytrader import server
server.run(port=1430)

View File

@ -12,7 +12,7 @@ from vnpy.event import EventEngine
from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import MainWindow, create_qapp
from vnpy.gateway.binancef import BinancefGateway
#from vnpy.gateway.binancef import BinancefGateway
# from vnpy.gateway.bitmex import BitmexGateway
# from vnpy.gateway.futu import FutuGateway
# from vnpy.gateway.ib import IbGateway
@ -40,8 +40,10 @@ from vnpy.gateway.binancef import BinancefGateway
# from vnpy.gateway.bitstamp import BitstampGateway
# from vnpy.gateway.gateios import GateiosGateway
# from vnpy.gateway.bybit import BybitGateway
from vnpy.gateway.gj import GjGateway
from vnpy.app.cta_strategy_pro import CtaStrategyProApp
# from vnpy.app.cta_crypto import CtaCryptoApp
from vnpy.app.cta_stock import CtaStockApp
# from vnpy.app.csv_loader import CsvLoaderApp
# from vnpy.app.algo_trading import AlgoTradingApp
# from vnpy.app.cta_backtester import CtaBacktesterApp
@ -62,7 +64,7 @@ def main():
main_engine = MainEngine(event_engine)
main_engine.add_gateway(BinancefGateway, 'binance_future')
main_engine.add_gateway(GjGateway, 'gj01')
#main_engine.add_gateway(CtpGateway)
# main_engine.add_gateway(CtptestGateway)
# main_engine.add_gateway(MiniGateway)
@ -92,7 +94,7 @@ def main():
#main_engine.add_gateway(BybitGateway)
#main_engine.add_app(CtaStrategyApp)
main_engine.add_app(CtaStrategyProApp)
main_engine.add_app(CtaStockApp)
#main_engine.add_app(CtaBacktesterApp)
# main_engine.add_app(CsvLoaderApp)
# main_engine.add_app(AlgoTradingApp)

View File

@ -0,0 +1,11 @@
# -*- coding: utf-8 -*-
import urllib3
from .exceptions import *
from .api import use, follower
from .log import logger
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
__version__ = "0.22.0"
__author__ = "shidenggui"

View File

@ -0,0 +1,92 @@
# -*- coding: utf-8 -*-
import logging
import sys
import six
from .joinquant_follower import JoinQuantFollower
from .log import logger
from .ricequant_follower import RiceQuantFollower
from .xq_follower import XueQiuFollower
from .xqtrader import XueQiuTrader
if sys.version_info <= (3, 5):
raise TypeError("不支持 Python3.5 及以下版本,请升级")
def use(broker, debug=False, **kwargs):
"""用于生成特定的券商对象
:param broker:券商名支持 ['yh_client', '银河客户端'] ['ht_client', '华泰客户端']
:param debug: 控制 debug 日志的显示, 默认为 True
:param initial_assets: [雪球参数] 控制雪球初始资金默认为一百万
:return the class of trader
Usage::
>>> import easytrader
>>> user = easytrader.use('xq')
>>> user.prepare('xq.json')
"""
if debug:
logger.setLevel(logging.DEBUG)
if broker.lower() in ["xq", "雪球"]:
return XueQiuTrader(**kwargs)
if broker.lower() in ["yh_client", "银河客户端"]:
from .yh_clienttrader import YHClientTrader
return YHClientTrader()
if broker.lower() in ["ht_client", "华泰客户端"]:
from .ht_clienttrader import HTClientTrader
return HTClientTrader()
if broker.lower() in ["wk_client", "五矿客户端"]:
from .wk_clienttrader import WKClientTrader
return WKClientTrader()
if broker.lower() in ["htzq_client", "海通证券客户端"]:
from .htzq_clienttrader import HTZQClientTrader
return HTZQClientTrader()
if broker.lower() in ["gj_client", "国金客户端"]:
from .gj_clienttrader import GJClientTrader
return GJClientTrader()
if broker.lower() in ["ths", "同花顺客户端"]:
from .clienttrader import ClientTrader
return ClientTrader()
raise NotImplementedError
def follower(platform, **kwargs):
"""用于生成特定的券商对象
:param platform:平台支持 ['jq', 'joinquant', '聚宽’]
:param initial_assets: [雪球参数] 控制雪球初始资金默认为一万,
总资金由 initial_assets * 组合当前净值 得出
:param total_assets: [雪球参数] 控制雪球总资金无默认值,
若设置则覆盖 initial_assets
:return the class of follower
Usage::
>>> import easytrader
>>> user = easytrader.use('xq')
>>> user.prepare('xq.json')
>>> jq = easytrader.follower('jq')
>>> jq.login(user='username', password='password')
>>> jq.follow(users=user, strategies=['strategies_link'])
"""
if platform.lower() in ["rq", "ricequant", "米筐"]:
return RiceQuantFollower()
if platform.lower() in ["jq", "joinquant", "聚宽"]:
return JoinQuantFollower()
if platform.lower() in ["xq", "xueqiu", "雪球"]:
return XueQiuFollower(**kwargs)
raise NotImplementedError

View File

@ -0,0 +1,545 @@
# -*- coding: utf-8 -*-
import abc
import functools
import logging
import os
import re
import sys
import time
from typing import Type, Union
import easyutils
from pywinauto import findwindows, timings
from . import grid_strategies, pop_dialog_handler, refresh_strategies
from .config import client
from .grid_strategies import IGridStrategy
from .log import logger
from .refresh_strategies import IRefreshStrategy
from .utils.misc import file2dict
from .utils.perf import perf_clock
if not sys.platform.startswith("darwin"):
import pywinauto
import pywinauto.clipboard
class IClientTrader(abc.ABC):
@property
@abc.abstractmethod
def app(self):
"""Return current app instance"""
pass
@property
@abc.abstractmethod
def main(self):
"""Return current main window instance"""
pass
@property
@abc.abstractmethod
def config(self):
"""Return current config instance"""
pass
@abc.abstractmethod
def wait(self, seconds: float):
"""Wait for operation return"""
pass
@abc.abstractmethod
def refresh(self):
"""Refresh data"""
pass
@abc.abstractmethod
def is_exist_pop_dialog(self):
pass
class ClientTrader(IClientTrader):
_editor_need_type_keys = False
# The strategy to use for getting grid data
#grid_strategy: Union[IGridStrategy, Type[IGridStrategy]] = grid_strategies.Copy
grid_strategy: Union[IGridStrategy, Type[IGridStrategy]] = grid_strategies.Xls
_grid_strategy_instance: IGridStrategy = None
refresh_strategy: IRefreshStrategy = refresh_strategies.Switch()
def enable_type_keys_for_editor(self):
"""
有些客户端无法通过 set_edit_text 方法输入内容可以通过使用 type_keys 方法绕过
"""
self._editor_need_type_keys = True
@property
def grid_strategy_instance(self):
if self._grid_strategy_instance is None:
self._grid_strategy_instance = (
self.grid_strategy
if isinstance(self.grid_strategy, IGridStrategy)
else self.grid_strategy()
)
self._grid_strategy_instance.set_trader(self)
return self._grid_strategy_instance
def __init__(self):
self._config = client.create(self.broker_type)
self._app = None
self._main = None
self._toolbar = None
@property
def app(self):
return self._app
@property
def main(self):
return self._main
@property
def config(self):
return self._config
def connect(self, exe_path=None, **kwargs):
"""
直接连接登陆后的客户端
:param exe_path: 客户端路径类似 r'C:\\htzqzyb2\\xiadan.exe', 默认 r'C:\\htzqzyb2\\xiadan.exe'
:return:
"""
connect_path = exe_path or self._config.DEFAULT_EXE_PATH
if connect_path is None:
raise ValueError(
"参数 exe_path 未设置,请设置客户端对应的 exe 地址,类似 C:\\客户端安装目录\\xiadan.exe"
)
self._app = pywinauto.Application().connect(path=connect_path, timeout=10)
self._close_prompt_windows()
self._main = self._app.top_window()
self._init_toolbar()
@property
def broker_type(self):
return "ths"
@property
def balance(self):
self._switch_left_menus(["查询[F4]", "资金股票"])
return self._get_balance_from_statics()
def _init_toolbar(self):
self._toolbar = self._main.child_window(class_name="ToolbarWindow32")
def _get_balance_from_statics(self):
result = {}
for key, control_id in self._config.BALANCE_CONTROL_ID_GROUP.items():
result[key] = float(
self._main.child_window(
control_id=control_id, class_name="Static"
).window_text()
)
return result
@property
def position(self):
self._switch_left_menus(["查询[F4]", "资金股票"])
return self._get_grid_data(self._config.COMMON_GRID_CONTROL_ID)
@property
def today_entrusts(self):
self._switch_left_menus(["查询[F4]", "当日委托"])
return self._get_grid_data(self._config.COMMON_GRID_CONTROL_ID)
@property
def today_trades(self):
self._switch_left_menus(["查询[F4]", "当日成交"])
return self._get_grid_data(self._config.COMMON_GRID_CONTROL_ID)
@property
def cancel_entrusts(self):
self.refresh()
self._switch_left_menus(["撤单[F3]"])
return self._get_grid_data(self._config.COMMON_GRID_CONTROL_ID)
@perf_clock
def cancel_entrust(self, entrust_no):
self.refresh()
for i, entrust in enumerate(self.cancel_entrusts):
if entrust[self._config.CANCEL_ENTRUST_ENTRUST_FIELD] == entrust_no:
self._cancel_entrust_by_double_click(i)
return self._handle_pop_dialogs()
return {"message": "委托单状态错误不能撤单, 该委托单可能已经成交或者已撤"}
@perf_clock
def repo(self, security, price, amount, **kwargs):
self._switch_left_menus(["债券回购", "融资回购(正回购)"])
return self.trade(security, price, amount)
@perf_clock
def reverse_repo(self, security, price, amount, **kwargs):
self._switch_left_menus(["债券回购", "融劵回购(逆回购)"])
return self.trade(security, price, amount)
@perf_clock
def buy(self, security, price, amount, **kwargs):
self._switch_left_menus(["买入[F1]"])
return self.trade(security, price, amount)
@perf_clock
def sell(self, security, price, amount, **kwargs):
self._switch_left_menus(["卖出[F2]"])
return self.trade(security, price, amount)
@perf_clock
def market_buy(self, security, amount, ttype=None, limit_price=None, **kwargs):
"""
市价买入
:param security: 六位证券代码
:param amount: 交易数量
:param ttype: 市价委托类型默认客户端默认选择
深市可选 ['对手方最优价格', '本方最优价格', '即时成交剩余撤销', '最优五档即时成交剩余 '全额成交或撤销']
沪市可选 ['最优五档成交剩余撤销', '最优五档成交剩余转限价']
:param limit_price: 科创板 限价
:return: {'entrust_no': '委托单号'}
"""
self._switch_left_menus(["市价委托", "买入"])
return self.market_trade(security, amount, ttype, limit_price=limit_price)
@perf_clock
def market_sell(self, security, amount, ttype=None, limit_price=None, **kwargs):
"""
市价卖出
:param security: 六位证券代码
:param amount: 交易数量
:param ttype: 市价委托类型默认客户端默认选择
深市可选 ['对手方最优价格', '本方最优价格', '即时成交剩余撤销', '最优五档即时成交剩余 '全额成交或撤销']
沪市可选 ['最优五档成交剩余撤销', '最优五档成交剩余转限价']
:param limit_price: 科创板 限价
:return: {'entrust_no': '委托单号'}
"""
self._switch_left_menus(["市价委托", "卖出"])
return self.market_trade(security, amount, ttype, limit_price=limit_price)
def market_trade(self, security, amount, ttype=None, limit_price=None, **kwargs):
"""
市价交易
:param security: 六位证券代码
:param amount: 交易数量
:param ttype: 市价委托类型默认客户端默认选择
深市可选 ['对手方最优价格', '本方最优价格', '即时成交剩余撤销', '最优五档即时成交剩余 '全额成交或撤销']
沪市可选 ['最优五档成交剩余撤销', '最优五档成交剩余转限价']
:return: {'entrust_no': '委托单号'}
"""
code = security[-6:]
self._type_edit_control_keys(self._config.TRADE_SECURITY_CONTROL_ID, code)
if ttype is not None:
retry = 0
retry_max = 10
while retry < retry_max:
try:
self._set_market_trade_type(ttype)
break
except:
retry += 1
self.wait(0.1)
self._set_market_trade_params(security, amount, limit_price=limit_price)
self._submit_trade()
return self._handle_pop_dialogs(
handler_class=pop_dialog_handler.TradePopDialogHandler
)
def _set_market_trade_type(self, ttype):
"""根据选择的市价交易类型选择对应的下拉选项"""
selects = self._main.child_window(
control_id=self._config.TRADE_MARKET_TYPE_CONTROL_ID, class_name="ComboBox"
)
for i, text in enumerate(selects.texts()):
# skip 0 index, because 0 index is current select index
if i == 0:
if re.search(ttype, text): # 当前已经选中
return
else:
continue
if re.search(ttype, text):
selects.select(i - 1)
return
raise TypeError("不支持对应的市价类型: {}".format(ttype))
def auto_ipo(self):
self._switch_left_menus(self._config.AUTO_IPO_MENU_PATH)
stock_list = self._get_grid_data(self._config.COMMON_GRID_CONTROL_ID)
if len(stock_list) == 0:
return {"message": "今日无新股"}
invalid_list_idx = [
i for i, v in enumerate(stock_list) if v[self.config.AUTO_IPO_NUMBER] <= 0
]
if len(stock_list) == len(invalid_list_idx):
return {"message": "没有发现可以申购的新股"}
self._click(self._config.AUTO_IPO_SELECT_ALL_BUTTON_CONTROL_ID)
self.wait(0.1)
for row in invalid_list_idx:
self._click_grid_by_row(row)
self.wait(0.1)
self._click(self._config.AUTO_IPO_BUTTON_CONTROL_ID)
self.wait(0.1)
return self._handle_pop_dialogs()
def _click_grid_by_row(self, row):
x = self._config.COMMON_GRID_LEFT_MARGIN
y = (
self._config.COMMON_GRID_FIRST_ROW_HEIGHT
+ self._config.COMMON_GRID_ROW_HEIGHT * row
)
self._app.top_window().child_window(
control_id=self._config.COMMON_GRID_CONTROL_ID,
class_name="CVirtualGridCtrl",
).click(coords=(x, y))
@perf_clock
def is_exist_pop_dialog(self):
self.wait(0.5) # wait dialog display
try:
return (
self._main.wrapper_object() != self._app.top_window().wrapper_object()
)
except (
findwindows.ElementNotFoundError,
timings.TimeoutError,
RuntimeError,
) as ex:
logger.exception("check pop dialog timeout")
return False
def _run_exe_path(self, exe_path):
return os.path.join(os.path.dirname(exe_path), "xiadan.exe")
def wait(self, seconds):
time.sleep(seconds)
def exit(self):
self._app.kill()
def _close_prompt_windows(self):
self.wait(1)
for window in self._app.windows(class_name="#32770", visible_only=True):
title = window.window_text()
if title != self._config.TITLE:
logging.info("close " + title)
window.close()
self.wait(0.2)
self.wait(1)
def close_pormpt_window_no_wait(self):
for window in self._app.windows(class_name="#32770"):
if window.window_text() != self._config.TITLE:
window.close()
def trade(self, security, price, amount):
self._set_trade_params(security, price, amount)
self._submit_trade()
return self._handle_pop_dialogs(
handler_class=pop_dialog_handler.TradePopDialogHandler
)
def _click(self, control_id):
self._app.top_window().child_window(
control_id=control_id, class_name="Button"
).click()
@perf_clock
def _submit_trade(self):
time.sleep(0.2)
self._main.child_window(
control_id=self._config.TRADE_SUBMIT_CONTROL_ID, class_name="Button"
).click()
@perf_clock
def __get_top_window_pop_dialog(self):
return self._app.top_window().window(
control_id=self._config.POP_DIALOD_TITLE_CONTROL_ID
)
@perf_clock
def _get_pop_dialog_title(self):
return (
self._app.top_window()
.child_window(control_id=self._config.POP_DIALOD_TITLE_CONTROL_ID)
.window_text()
)
def _set_trade_params(self, security, price, amount):
code = security[-6:]
self._type_edit_control_keys(self._config.TRADE_SECURITY_CONTROL_ID, code)
# wait security input finish
self.wait(0.1)
self._type_edit_control_keys(
self._config.TRADE_PRICE_CONTROL_ID,
easyutils.round_price_by_code(price, code),
)
self._type_edit_control_keys(
self._config.TRADE_AMOUNT_CONTROL_ID, str(int(amount))
)
def _set_market_trade_params(self, security, amount, limit_price=None):
self._type_edit_control_keys(
self._config.TRADE_AMOUNT_CONTROL_ID, str(int(amount))
)
self.wait(0.1)
price_control = None
if str(security).startswith("68"): # 科创板存在限价
try:
price_control = self._main.child_window(
control_id=self._config.TRADE_PRICE_CONTROL_ID, class_name="Edit"
)
except:
pass
if price_control is not None:
price_control.set_edit_text(limit_price)
def _get_grid_data(self, control_id):
return self.grid_strategy_instance.get(control_id)
def _type_keys(self, control_id, text):
self._main.child_window(control_id=control_id, class_name="Edit").set_edit_text(
text
)
def _type_edit_control_keys(self, control_id, text):
if not self._editor_need_type_keys:
self._main.child_window(
control_id=control_id, class_name="Edit"
).set_edit_text(text)
else:
editor = self._main.child_window(control_id=control_id, class_name="Edit")
editor.select()
editor.type_keys(text)
def _collapse_left_menus(self):
items = self._get_left_menus_handle().roots()
for item in items:
item.collapse()
@perf_clock
def _switch_left_menus(self, path, sleep=0.2):
self._get_left_menus_handle().get_item(path).click()
self._app.top_window().type_keys('{ESC}')
self._app.top_window().type_keys('{F5}')
self.wait(sleep)
def _switch_left_menus_by_shortcut(self, shortcut, sleep=0.5):
self._app.top_window().type_keys(shortcut)
self.wait(sleep)
@functools.lru_cache()
def _get_left_menus_handle(self):
count = 2
while True:
try:
handle = self._main.child_window(
control_id=129, class_name="SysTreeView32"
)
if count <= 0:
return handle
# sometime can't find handle ready, must retry
handle.wait("ready", 2)
return handle
# pylint: disable=broad-except
except Exception as ex:
logger.exception("error occurred when trying to get left menus")
count = count - 1
def _cancel_entrust_by_double_click(self, row):
x = self._config.CANCEL_ENTRUST_GRID_LEFT_MARGIN
y = (
self._config.CANCEL_ENTRUST_GRID_FIRST_ROW_HEIGHT
+ self._config.CANCEL_ENTRUST_GRID_ROW_HEIGHT * row
)
self._app.top_window().child_window(
control_id=self._config.COMMON_GRID_CONTROL_ID,
class_name="CVirtualGridCtrl",
).double_click(coords=(x, y))
def refresh(self):
self.refresh_strategy.set_trader(self)
self.refresh_strategy.refresh()
@perf_clock
def _handle_pop_dialogs(self, handler_class=pop_dialog_handler.PopDialogHandler):
handler = handler_class(self._app)
while self.is_exist_pop_dialog():
try:
title = self._get_pop_dialog_title()
except pywinauto.findwindows.ElementNotFoundError:
return {"message": "success"}
result = handler.handle(title)
if result:
return result
return {"message": "success"}
class BaseLoginClientTrader(ClientTrader):
@abc.abstractmethod
def login(self, user, password, exe_path, comm_password=None, **kwargs):
"""Login Client Trader"""
pass
def prepare(
self,
config_path=None,
user=None,
password=None,
exe_path=None,
comm_password=None,
**kwargs
):
"""
登陆客户端
:param config_path: 登陆配置文件跟参数登陆方式二选一
:param user: 账号
:param password: 明文密码
:param exe_path: 客户端路径类似 r'C:\\htzqzyb2\\xiadan.exe', 默认 r'C:\\htzqzyb2\\xiadan.exe'
:param comm_password: 通讯密码
:return:
"""
if config_path is not None:
account = file2dict(config_path)
user = account["user"]
password = account["password"]
comm_password = account.get("comm_password")
exe_path = account.get("exe_path")
self.login(
user,
password,
exe_path or self._config.DEFAULT_EXE_PATH,
comm_password,
**kwargs
)
self._init_toolbar()

View File

View File

@ -0,0 +1,153 @@
# -*- coding: utf-8 -*-
def create(broker):
if broker == "yh":
return YH
if broker == "ht":
return HT
if broker == "gj":
return GJ
if broker == "ths":
return CommonConfig
if broker == "wk":
return WK
if broker == "htzq":
return HTZQ
raise NotImplementedError
class CommonConfig:
DEFAULT_EXE_PATH: str = ""
TITLE = "网上股票交易系统5.0"
TRADE_SECURITY_CONTROL_ID = 1032
TRADE_PRICE_CONTROL_ID = 1033
TRADE_AMOUNT_CONTROL_ID = 1034
TRADE_SUBMIT_CONTROL_ID = 1006
TRADE_MARKET_TYPE_CONTROL_ID = 1541
COMMON_GRID_CONTROL_ID = 1047
COMMON_GRID_LEFT_MARGIN = 10
COMMON_GRID_FIRST_ROW_HEIGHT = 30
COMMON_GRID_ROW_HEIGHT = 16
BALANCE_MENU_PATH = ["查询[F4]", "资金股票"]
POSITION_MENU_PATH = ["查询[F4]", "资金股票"]
TODAY_ENTRUSTS_MENU_PATH = ["查询[F4]", "当日委托"]
TODAY_TRADES_MENU_PATH = ["查询[F4]", "当日成交"]
BALANCE_CONTROL_ID_GROUP = {
"资金余额": 1012,
"可用金额": 1016,
"可取金额": 1017,
"股票市值": 1014,
"总资产": 1015,
}
POP_DIALOD_TITLE_CONTROL_ID = 1365
GRID_DTYPE = {
"操作日期": str,
"委托编号": str,
"申请编号": str,
"合同编号": str,
"证券代码": str,
"股东代码": str,
"资金帐号": str,
"资金帐户": str,
"发生日期": str,
}
CANCEL_ENTRUST_ENTRUST_FIELD = "合同编号"
CANCEL_ENTRUST_GRID_LEFT_MARGIN = 50
CANCEL_ENTRUST_GRID_FIRST_ROW_HEIGHT = 30
CANCEL_ENTRUST_GRID_ROW_HEIGHT = 16
AUTO_IPO_SELECT_ALL_BUTTON_CONTROL_ID = 1098
AUTO_IPO_BUTTON_CONTROL_ID = 1006
AUTO_IPO_MENU_PATH = ["新股申购", "批量新股申购"]
AUTO_IPO_NUMBER = '申购数量'
class YH(CommonConfig):
DEFAULT_EXE_PATH = r"C:\双子星-中国银河证券\Binarystar.exe"
BALANCE_GRID_CONTROL_ID = 1308
GRID_DTYPE = {
"操作日期": str,
"委托编号": str,
"申请编号": str,
"合同编号": str,
"证券代码": str,
"股东代码": str,
"资金帐号": str,
"资金帐户": str,
"发生日期": str,
}
AUTO_IPO_MENU_PATH = ["新股申购", "一键打新"]
class HT(CommonConfig):
DEFAULT_EXE_PATH = r"C:\htzqzyb2\xiadan.exe"
BALANCE_CONTROL_ID_GROUP = {
"资金余额": 1012,
"冻结资金": 1013,
"可用金额": 1016,
"可取金额": 1017,
"股票市值": 1014,
"总资产": 1015,
}
GRID_DTYPE = {
"操作日期": str,
"委托编号": str,
"申请编号": str,
"合同编号": str,
"证券代码": str,
"股东代码": str,
"资金帐号": str,
"资金帐户": str,
"发生日期": str,
}
AUTO_IPO_MENU_PATH = ["新股申购", "批量新股申购"]
class GJ(CommonConfig):
DEFAULT_EXE_PATH = "C:\\全能行证券交易终端\\xiadan.exe"
GRID_DTYPE = {
"操作日期": str,
"委托编号": str,
"申请编号": str,
"合同编号": str,
"证券代码": str,
"股东代码": str,
"资金帐号": str,
"资金帐户": str,
"发生日期": str,
}
AUTO_IPO_MENU_PATH = ["新股申购", "新股批量申购"]
class WK(HT):
pass
class HTZQ(CommonConfig):
DEFAULT_EXE_PATH = r"c:\\海通证券委托\\xiadan.exe"
BALANCE_CONTROL_ID_GROUP = {
"资金余额": 1012,
"可用金额": 1016,
"可取金额": 1017,
"总资产": 1015,
}
AUTO_IPO_NUMBER = '可申购数量'

View File

@ -0,0 +1,52 @@
{
"response_format": {
"int": [
"current_amount",
"enable_amount",
"entrust_amount",
"business_amount",
"成交数量",
"撤单数量",
"委托数量",
"股份可用",
"买入冻结",
"卖出冻结",
"当前持仓",
"股份余额"
],
"float": [
"current_balance",
"enable_balance",
"fetch_balance",
"market_value",
"asset_balance",
"av_buy_price",
"cost_price",
"income_balance",
"market_value",
"entrust_price",
"business_price",
"business_balance",
"fare1",
"occur_balance",
"farex",
"fare0",
"occur_amount",
"post_balance",
"fare2",
"fare3",
"资金余额",
"可用资金",
"参考市值",
"总资产",
"股份参考盈亏",
"委托价格",
"成交价格",
"成交金额",
"参考盈亏",
"参考成本价",
"参考市价",
"参考市值"
]
}
}

View File

@ -0,0 +1,9 @@
{
"login_api": "https://xueqiu.com/user/login",
"prefix": "https://xueqiu.com/user/login",
"portfolio_url": "https://xueqiu.com/p/",
"search_stock_url": "https://xueqiu.com/stock/p/search.json",
"rebalance_url": "https://xueqiu.com/cubes/rebalancing/create.json",
"history_url": "https://xueqiu.com/cubes/rebalancing/history.json",
"referer": "https://xueqiu.com/p/update?action=holdings&symbol=%s"
}

View File

@ -0,0 +1,11 @@
# -*- coding: utf-8 -*-
class TradeError(IOError):
pass
class NotLoginError(Exception):
def __init__(self, result=None):
super(NotLoginError, self).__init__()
self.result = result

View File

@ -0,0 +1,408 @@
# -*- coding: utf-8 -*-
import abc
import datetime
import os
import pickle
import queue
import re
import threading
import time
from typing import List
import requests
from . import exceptions
from .log import logger
class BaseFollower(metaclass=abc.ABCMeta):
"""
slippage: 滑点取值范围为 [0, 1]
"""
LOGIN_PAGE = ""
LOGIN_API = ""
TRANSACTION_API = ""
CMD_CACHE_FILE = "cmd_cache.pk"
WEB_REFERER = ""
WEB_ORIGIN = ""
def __init__(self):
self.trade_queue = queue.Queue()
self.expired_cmds = set()
self.s = requests.Session()
self.s.verify = False
self.slippage: float = 0.0
def login(self, user=None, password=None, **kwargs):
"""
登陆接口
:param user: 用户名
:param password: 密码
:param kwargs: 其他参数
:return:
"""
headers = self._generate_headers()
self.s.headers.update(headers)
# init cookie
self.s.get(self.LOGIN_PAGE)
# post for login
params = self.create_login_params(user, password, **kwargs)
rep = self.s.post(self.LOGIN_API, data=params)
self.check_login_success(rep)
logger.info("登录成功")
def _generate_headers(self):
headers = {
"Accept": "application/json, text/javascript, */*; q=0.01",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "en-US,en;q=0.8",
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/54.0.2840.100 Safari/537.36",
"Referer": self.WEB_REFERER,
"X-Requested-With": "XMLHttpRequest",
"Origin": self.WEB_ORIGIN,
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
}
return headers
def check_login_success(self, rep):
"""检查登录状态是否成功
:param rep: post login 接口返回的 response 对象
:raise 如果登录失败应该抛出 NotLoginError """
pass
def create_login_params(self, user, password, **kwargs) -> dict:
"""生成 post 登录接口的参数
:param user: 用户名
:param password: 密码
:return dict 登录参数的字典
"""
return {}
def follow(
self,
users,
strategies,
track_interval=1,
trade_cmd_expire_seconds=120,
cmd_cache=True,
slippage: float = 0.0,
**kwargs
):
"""跟踪平台对应的模拟交易,支持多用户多策略
:param users: 支持easytrader的用户对象支持使用 [] 指定多个用户
:param strategies: 雪球组合名, 类似 ZH123450
:param total_assets: 雪球组合对应的总资产 格式 [ 组合1对应资金, 组合2对应资金 ]
strategies=['ZH000001', 'ZH000002'] 设置 total_assets=[10000, 10000], 则表明每个组合对应的资产为 1w
假设组合 ZH000001 加仓 价格为 p 股票 A 10%, 则对应的交易指令为 买入 股票 A 价格 P 股数 1w * 10% / p 并按 100 取整
:param initial_assets:雪球组合对应的初始资产, 格式 [ 组合1对应资金, 组合2对应资金 ]
总资产由 初始资产 × 组合净值 算得 total_assets 会覆盖此参数
:param track_interval: 轮询模拟交易时间单位为秒
:param trade_cmd_expire_seconds: 交易指令过期时间, 单位为秒
:param cmd_cache: 是否读取存储历史执行过的指令防止重启时重复执行已经交易过的指令
:param slippage: 滑点0.0 表示无滑点, 0.05 表示滑点为 5%
"""
self.slippage = slippage
def _calculate_price_by_slippage(self, action: str, price: float) -> float:
"""
计算考虑滑点之后的价格
:param action: 交易动作 支持 ['buy', 'sell']
:param price: 原始交易价格
:return: 考虑滑点后的交易价格
"""
if action == "buy":
return price * (1 + self.slippage)
if action == "sell":
return price * (1 - self.slippage)
return price
def load_expired_cmd_cache(self):
if os.path.exists(self.CMD_CACHE_FILE):
with open(self.CMD_CACHE_FILE, "rb") as f:
self.expired_cmds = pickle.load(f)
def start_trader_thread(
self,
users,
trade_cmd_expire_seconds,
entrust_prop="limit",
send_interval=0,
):
trader = threading.Thread(
target=self.trade_worker,
args=[users],
kwargs={
"expire_seconds": trade_cmd_expire_seconds,
"entrust_prop": entrust_prop,
"send_interval": send_interval,
},
)
trader.setDaemon(True)
trader.start()
@staticmethod
def warp_list(value):
if not isinstance(value, list):
value = [value]
return value
@staticmethod
def extract_strategy_id(strategy_url):
"""
抽取 策略 id一般用于获取策略相关信息
:param strategy_url: 策略 url
:return: str 策略 id
"""
pass
def extract_strategy_name(self, strategy_url):
"""
抽取 策略名主要用于日志打印便于识别
:param strategy_url:
:return: str 策略名
"""
pass
def track_strategy_worker(self, strategy, name, interval=10, **kwargs):
"""跟踪下单worker
:param strategy: 策略id
:param name: 策略名字
:param interval: 轮询策略的时间间隔单位为秒"""
while True:
try:
transactions = self.query_strategy_transaction(
strategy, **kwargs
)
# pylint: disable=broad-except
except Exception as e:
logger.exception("无法获取策略 %s 调仓信息, 错误: %s, 跳过此次调仓查询", name, e)
time.sleep(3)
continue
for transaction in transactions:
trade_cmd = {
"strategy": strategy,
"strategy_name": name,
"action": transaction["action"],
"stock_code": transaction["stock_code"],
"amount": transaction["amount"],
"price": transaction["price"],
"datetime": transaction["datetime"],
}
if self.is_cmd_expired(trade_cmd):
continue
logger.info(
"策略 [%s] 发送指令到交易队列, 股票: %s 动作: %s 数量: %s 价格: %s 信号产生时间: %s",
name,
trade_cmd["stock_code"],
trade_cmd["action"],
trade_cmd["amount"],
trade_cmd["price"],
trade_cmd["datetime"],
)
self.trade_queue.put(trade_cmd)
self.add_cmd_to_expired_cmds(trade_cmd)
try:
for _ in range(interval):
time.sleep(1)
except KeyboardInterrupt:
logger.info("程序退出")
break
@staticmethod
def generate_expired_cmd_key(cmd):
return "{}_{}_{}_{}_{}_{}".format(
cmd["strategy_name"],
cmd["stock_code"],
cmd["action"],
cmd["amount"],
cmd["price"],
cmd["datetime"],
)
def is_cmd_expired(self, cmd):
key = self.generate_expired_cmd_key(cmd)
return key in self.expired_cmds
def add_cmd_to_expired_cmds(self, cmd):
key = self.generate_expired_cmd_key(cmd)
self.expired_cmds.add(key)
with open(self.CMD_CACHE_FILE, "wb") as f:
pickle.dump(self.expired_cmds, f)
@staticmethod
def _is_number(s):
try:
float(s)
return True
except ValueError:
return False
def _execute_trade_cmd(
self, trade_cmd, users, expire_seconds, entrust_prop, send_interval
):
"""分发交易指令到对应的 user 并执行
:param trade_cmd:
:param users:
:param expire_seconds:
:param entrust_prop:
:param send_interval:
:return:
"""
for user in users:
# check expire
now = datetime.datetime.now()
expire = (now - trade_cmd["datetime"]).total_seconds()
if expire > expire_seconds:
logger.warning(
"策略 [%s] 指令(股票: %s 动作: %s 数量: %s 价格: %s)超时,指令产生时间: %s 当前时间: %s, 超过设置的最大过期时间 %s 秒, 被丢弃",
trade_cmd["strategy_name"],
trade_cmd["stock_code"],
trade_cmd["action"],
trade_cmd["amount"],
trade_cmd["price"],
trade_cmd["datetime"],
now,
expire_seconds,
)
break
# check price
price = trade_cmd["price"]
if not self._is_number(price) or price <= 0:
logger.warning(
"策略 [%s] 指令(股票: %s 动作: %s 数量: %s 价格: %s)超时,指令产生时间: %s 当前时间: %s, 价格无效 , 被丢弃",
trade_cmd["strategy_name"],
trade_cmd["stock_code"],
trade_cmd["action"],
trade_cmd["amount"],
trade_cmd["price"],
trade_cmd["datetime"],
now,
)
break
# check amount
if trade_cmd["amount"] <= 0:
logger.warning(
"策略 [%s] 指令(股票: %s 动作: %s 数量: %s 价格: %s)超时,指令产生时间: %s 当前时间: %s, 买入股数无效 , 被丢弃",
trade_cmd["strategy_name"],
trade_cmd["stock_code"],
trade_cmd["action"],
trade_cmd["amount"],
trade_cmd["price"],
trade_cmd["datetime"],
now,
)
break
actual_price = self._calculate_price_by_slippage(
trade_cmd["action"], trade_cmd["price"]
)
args = {
"security": trade_cmd["stock_code"],
"price": actual_price,
"amount": trade_cmd["amount"],
"entrust_prop": entrust_prop,
}
try:
response = getattr(user, trade_cmd["action"])(**args)
except exceptions.TradeError as e:
trader_name = type(user).__name__
err_msg = "{}: {}".format(type(e).__name__, e.args)
logger.error(
"%s 执行 策略 [%s] 指令(股票: %s 动作: %s 数量: %s 价格(考虑滑点): %s 指令产生时间: %s) 失败, 错误信息: %s",
trader_name,
trade_cmd["strategy_name"],
trade_cmd["stock_code"],
trade_cmd["action"],
trade_cmd["amount"],
actual_price,
trade_cmd["datetime"],
err_msg,
)
else:
logger.info(
"策略 [%s] 指令(股票: %s 动作: %s 数量: %s 价格(考虑滑点): %s 指令产生时间: %s) 执行成功, 返回: %s",
trade_cmd["strategy_name"],
trade_cmd["stock_code"],
trade_cmd["action"],
trade_cmd["amount"],
actual_price,
trade_cmd["datetime"],
response,
)
def trade_worker(
self, users, expire_seconds=120, entrust_prop="limit", send_interval=0
):
"""
:param send_interval: 交易发送间隔 默认为0s调大可防止卖出买入时买出单没有及时成交导致的买入金额不足
"""
while True:
trade_cmd = self.trade_queue.get()
self._execute_trade_cmd(
trade_cmd, users, expire_seconds, entrust_prop, send_interval
)
time.sleep(send_interval)
def query_strategy_transaction(self, strategy, **kwargs):
params = self.create_query_transaction_params(strategy)
rep = self.s.get(self.TRANSACTION_API, params=params)
history = rep.json()
transactions = self.extract_transactions(history)
self.project_transactions(transactions, **kwargs)
return self.order_transactions_sell_first(transactions)
def extract_transactions(self, history) -> List[str]:
"""
抽取接口返回中的调仓记录列表
:param history: 调仓接口返回信息的字典对象
:return: [] 调参历史记录的列表
"""
return []
def create_query_transaction_params(self, strategy) -> dict:
"""
生成用于查询调参记录的参数
:param strategy: 策略 id
:return: dict 调参记录参数
"""
return {}
@staticmethod
def re_find(pattern, string, dtype=str):
return dtype(re.search(pattern, string).group())
@staticmethod
def re_search(pattern, string, dtype=str):
return dtype(re.search(pattern,string).group(1))
def project_transactions(self, transactions, **kwargs):
"""
修证调仓记录为内部使用的统一格式
:param transactions: [] 调仓记录的列表
:return: [] 修整后的调仓记录
"""
pass
def order_transactions_sell_first(self, transactions):
# 调整调仓记录的顺序为先卖再买
sell_first_transactions = []
for transaction in transactions:
if transaction["action"] == "sell":
sell_first_transactions.insert(0, transaction)
else:
sell_first_transactions.append(transaction)
return sell_first_transactions

View File

@ -0,0 +1,80 @@
# -*- coding: utf-8 -*-
import re
import tempfile
import time
import pywinauto
import pywinauto.clipboard
from . import clienttrader
from .utils.captcha import recognize_verify_code
class GJClientTrader(clienttrader.BaseLoginClientTrader):
@property
def broker_type(self):
return "gj"
def login(self, user, password, exe_path, comm_password=None, **kwargs):
"""
登陆客户端
:param user: 账号
:param password: 明文密码
:param exe_path: 客户端路径类似 'C:\\中国银河证券双子星3.2\\Binarystar.exe',
默认 'C:\\中国银河证券双子星3.2\\Binarystar.exe'
:param comm_password: 通讯密码, 华泰需要可不设
:return:
"""
try:
self._app = pywinauto.Application().connect(
path=self._run_exe_path(exe_path), timeout=1
)
# pylint: disable=broad-except
except Exception:
self._app = pywinauto.Application().start(exe_path)
# wait login window ready
while True:
try:
self._app.top_window().Edit1.wait("ready")
break
except RuntimeError:
pass
self._app.top_window().Edit1.type_keys(user)
self._app.top_window().Edit2.type_keys(password)
edit3 = self._app.top_window().window(control_id=0x3eb)
while True:
try:
code = self._handle_verify_code()
edit3.type_keys(code)
time.sleep(1)
self._app.top_window()["确定(Y)"].click()
# detect login is success or not
try:
self._app.top_window().wait_not("exists", 5)
break
# pylint: disable=broad-except
except Exception:
self._app.top_window()["确定"].click()
# pylint: disable=broad-except
except Exception:
pass
self._app = pywinauto.Application().connect(
path=self._run_exe_path(exe_path), timeout=10
)
self._main = self._app.window(title="网上股票交易系统5.0")
def _handle_verify_code(self):
control = self._app.top_window().window(control_id=0x5db)
control.click()
time.sleep(0.2)
file_path = tempfile.mktemp() + ".jpg"
control.capture_as_image().save(file_path)
time.sleep(0.2)
vcode = recognize_verify_code(file_path, "gj_client")
return "".join(re.findall("[a-zA-Z0-9]+", vcode))

View File

@ -0,0 +1,220 @@
# -*- coding: utf-8 -*-
import abc
import io
import tempfile
from io import StringIO
from typing import TYPE_CHECKING, Dict, List, Optional
import pandas as pd
import pywinauto.keyboard
import pywinauto
import pywinauto.clipboard
from .log import logger
from .utils.captcha import captcha_recognize
from .utils.win_gui import SetForegroundWindow, ShowWindow, win32defines
if TYPE_CHECKING:
# pylint: disable=unused-import
from easytrader import clienttrader
class IGridStrategy(abc.ABC):
@abc.abstractmethod
def get(self, control_id: int) -> List[Dict]:
"""
获取 gird 数据并格式化返回
:param control_id: grid control id
:return: grid 数据
"""
pass
@abc.abstractmethod
def set_trader(self, trader: "clienttrader.IClientTrader"):
pass
class BaseStrategy(IGridStrategy):
def __init__(self):
self._trader = None
def set_trader(self, trader: "clienttrader.IClientTrader"):
self._trader = trader
@abc.abstractmethod
def get(self, control_id: int) -> List[Dict]:
"""
:param control_id: grid control id
:return: grid 数据
"""
pass
def _get_grid(self, control_id: int):
grid = self._trader.main.child_window(
control_id=control_id, class_name="CVirtualGridCtrl"
)
return grid
def _set_foreground(self, grid=None):
try:
if grid is None:
grid = self._trader.main
if grid.has_style(win32defines.WS_MINIMIZE): # if minimized
ShowWindow(grid.wrapper_object(), 9) # restore window state
else:
SetForegroundWindow(grid.wrapper_object()) # bring to front
except:
pass
class Copy(BaseStrategy):
"""
通过复制 grid 内容到剪切板再读取来获取 grid 内容
"""
_need_captcha_reg = True
def get(self, control_id: int) -> List[Dict]:
grid = self._get_grid(control_id)
self._set_foreground(grid)
grid.type_keys("^A^C", set_foreground=False)
content = self._get_clipboard_data()
return self._format_grid_data(content)
def _format_grid_data(self, data: str) -> List[Dict]:
try:
df = pd.read_csv(
io.StringIO(data),
delimiter="\t",
dtype=self._trader.config.GRID_DTYPE,
na_filter=False,
)
return df.to_dict("records")
except:
Copy._need_captcha_reg = True
def _get_clipboard_data(self) -> str:
if Copy._need_captcha_reg:
if (
self._trader.app.top_window().window(class_name="Static", title_re="验证码").exists(timeout=1)
):
file_path = "tmp.png"
count = 5
found = False
while count > 0:
self._trader.app.top_window().window(
control_id=0x965, class_name="Static"
).capture_as_image().save(
file_path
) # 保存验证码
captcha_num = captcha_recognize(file_path) # 识别验证码
logger.info("captcha result-->" + captcha_num)
if len(captcha_num) == 4:
self._trader.app.top_window().window(
control_id=0x964, class_name="Edit"
).set_text(
captcha_num
) # 模拟输入验证码
self._trader.app.top_window().set_focus()
pywinauto.keyboard.SendKeys("{ENTER}") # 模拟发送enter点击确定
try:
logger.info(
self._trader.app.top_window()
.window(control_id=0x966, class_name="Static")
.window_text()
)
except Exception as ex: # 窗体消失
logger.exception(ex)
found = True
break
count -= 1
self._trader.wait(0.1)
self._trader.app.top_window().window(
control_id=0x965, class_name="Static"
).click()
if not found:
self._trader.app.top_window().Button2.click() # 点击取消
else:
Copy._need_captcha_reg = False
count = 5
while count > 0:
try:
return pywinauto.clipboard.GetData()
# pylint: disable=broad-except
except Exception as e:
count -= 1
logger.exception("%s, retry ......", e)
class WMCopy(Copy):
"""
通过复制 grid 内容到剪切板再读取来获取 grid 内容
"""
def get(self, control_id: int) -> List[Dict]:
grid = self._get_grid(control_id)
grid.post_message(win32defines.WM_COMMAND, 0xE122, 0)
self._trader.wait(0.1)
content = self._get_clipboard_data()
return self._format_grid_data(content)
class Xls(BaseStrategy):
"""
通过将 Grid 另存为 xls 文件再读取的方式获取 grid 内容
"""
def __init__(self, tmp_folder: Optional[str] = None):
"""
:param tmp_folder: 用于保持临时文件的文件夹
"""
super().__init__()
self.tmp_folder = tmp_folder
def get(self, control_id: int) -> List[Dict]:
grid = self._get_grid(control_id)
# ctrl+s 保存 grid 内容为 xls 文件
self._set_foreground(grid) # setFocus buggy, instead of SetForegroundWindow
grid.type_keys("^s", set_foreground=False)
count = 10
while count > 0:
if self._trader.is_exist_pop_dialog():
break
self._trader.wait(0.2)
count -= 1
temp_path = tempfile.mktemp(suffix=".xls", dir=self.tmp_folder)
self._set_foreground(self._trader.app.top_window())
# alt+s保存alt+y替换已存在的文件
self._trader.app.top_window().Edit1.set_edit_text(temp_path)
self._trader.wait(0.1)
self._trader.app.top_window().type_keys("%{s}%{y}", set_foreground=False)
# Wait until file save complete otherwise pandas can not find file
self._trader.wait(0.2)
if self._trader.is_exist_pop_dialog():
self._trader.app.top_window().Button2.click()
self._trader.wait(0.2)
return self._format_grid_data(temp_path)
def _format_grid_data(self, data: str) -> List[Dict]:
try:
with open(data, encoding="gbk", errors="replace") as f:
content = f.read()
df = pd.read_csv(
StringIO(content),
delimiter="\t",
dtype=self._trader.config.GRID_DTYPE,
na_filter=False,
)
return df.to_dict("records")
except Exception as ex:
print(f'打开文件{data}异常:{str(ex)}')
return []

View File

@ -0,0 +1,78 @@
# -*- coding: utf-8 -*-
import pywinauto
import pywinauto.clipboard
from . import grid_strategies
from . import clienttrader
class HTClientTrader(clienttrader.BaseLoginClientTrader):
grid_strategy = grid_strategies.Xls
@property
def broker_type(self):
return "ht"
def login(self, user, password, exe_path, comm_password=None, **kwargs):
"""
:param user: 用户名
:param password: 密码
:param exe_path: 客户端路径, 类似
:param comm_password:
:param kwargs:
:return:
"""
self._editor_need_type_keys = False
if comm_password is None:
raise ValueError("华泰必须设置通讯密码")
try:
self._app = pywinauto.Application().connect(
path=self._run_exe_path(exe_path), timeout=1
)
# pylint: disable=broad-except
except Exception:
self._app = pywinauto.Application().start(exe_path)
# wait login window ready
while True:
try:
self._app.top_window().Edit1.wait("ready")
break
except RuntimeError:
pass
self._app.top_window().Edit1.set_focus()
self._app.top_window().Edit1.type_keys(user)
self._app.top_window().Edit2.type_keys(password)
self._app.top_window().Edit3.type_keys(comm_password)
self._app.top_window().button0.click()
# detect login is success or not
self._app.top_window().wait_not("exists", 100)
self._app = pywinauto.Application().connect(
path=self._run_exe_path(exe_path), timeout=10
)
self._close_prompt_windows()
self._main = self._app.window(title="网上股票交易系统5.0")
@property
def balance(self):
self._switch_left_menus(self._config.BALANCE_MENU_PATH)
return self._get_balance_from_statics()
def _get_balance_from_statics(self):
result = {}
for key, control_id in self._config.BALANCE_CONTROL_ID_GROUP.items():
result[key] = float(
self._main.child_window(
control_id=control_id, class_name="Static"
).window_text()
)
return result

View File

@ -0,0 +1,61 @@
# -*- coding: utf-8 -*-
import pywinauto
import pywinauto.clipboard
from . import grid_strategies
from . import clienttrader
class HTZQClientTrader(clienttrader.BaseLoginClientTrader):
grid_strategy = grid_strategies.Xls
@property
def broker_type(self):
return "htzq"
def login(self, user, password, exe_path, comm_password=None, **kwargs):
"""
:param user: 用户名
:param password: 密码
:param exe_path: 客户端路径, 类似
:param comm_password:
:param kwargs:
:return:
"""
self._editor_need_type_keys = False
if comm_password is None:
raise ValueError("必须设置通讯密码")
try:
self._app = pywinauto.Application().connect(
path=self._run_exe_path(exe_path), timeout=1
)
# pylint: disable=broad-except
except Exception:
self._app = pywinauto.Application().start(exe_path)
# wait login window ready
while True:
try:
self._app.top_window().Edit1.wait("ready")
break
except RuntimeError:
pass
self._app.top_window().Edit1.set_focus()
self._app.top_window().Edit1.type_keys(user)
self._app.top_window().Edit2.type_keys(password)
self._app.top_window().Edit3.type_keys(comm_password)
self._app.top_window().button0.click()
# detect login is success or not
self._app.top_window().wait_not("exists", 100)
self._app = pywinauto.Application().connect(
path=self._run_exe_path(exe_path), timeout=10
)
self._close_prompt_windows()
self._main = self._app.window(title="网上股票交易系统5.0")

View File

@ -0,0 +1,137 @@
# -*- coding: utf-8 -*-
from datetime import datetime
from threading import Thread
from . import exceptions
from .follower import BaseFollower
from .log import logger
class JoinQuantFollower(BaseFollower):
LOGIN_PAGE = "https://www.joinquant.com"
LOGIN_API = "https://www.joinquant.com/user/login/doLogin?ajax=1"
TRANSACTION_API = (
"https://www.joinquant.com/algorithm/live/transactionDetail"
)
WEB_REFERER = "https://www.joinquant.com/user/login/index"
WEB_ORIGIN = "https://www.joinquant.com"
def create_login_params(self, user, password, **kwargs):
params = {
"CyLoginForm[username]": user,
"CyLoginForm[pwd]": password,
"ajax": 1,
}
return params
def check_login_success(self, rep):
set_cookie = rep.headers["set-cookie"]
if len(set_cookie) < 50:
raise exceptions.NotLoginError("登录失败,请检查用户名和密码")
self.s.headers.update({"cookie": set_cookie})
def follow(
self,
users,
strategies,
track_interval=1,
trade_cmd_expire_seconds=120,
cmd_cache=True,
entrust_prop="limit",
send_interval=0,
):
"""跟踪joinquant对应的模拟交易支持多用户多策略
:param users: 支持easytrader的用户对象支持使用 [] 指定多个用户
:param strategies: joinquant 的模拟交易地址支持使用 [] 指定多个模拟交易,
地址类似 https://www.joinquant.com/algorithm/live/index?backtestId=xxx
:param track_interval: 轮训模拟交易时间单位为秒
:param trade_cmd_expire_seconds: 交易指令过期时间, 单位为秒
:param cmd_cache: 是否读取存储历史执行过的指令防止重启时重复执行已经交易过的指令
:param entrust_prop: 委托方式, 'limit' 为限价'market' 为市价, 仅在银河实现
:param send_interval: 交易发送间隔 默认为0s调大可防止卖出买入时卖出单没有及时成交导致的买入金额不足
"""
users = self.warp_list(users)
strategies = self.warp_list(strategies)
if cmd_cache:
self.load_expired_cmd_cache()
self.start_trader_thread(
users, trade_cmd_expire_seconds, entrust_prop, send_interval
)
workers = []
for strategy_url in strategies:
try:
strategy_id = self.extract_strategy_id(strategy_url)
strategy_name = self.extract_strategy_name(strategy_url)
except:
logger.error("抽取交易id和策略名失败, 无效的模拟交易url: %s", strategy_url)
raise
strategy_worker = Thread(
target=self.track_strategy_worker,
args=[strategy_id, strategy_name],
kwargs={"interval": track_interval},
)
strategy_worker.start()
workers.append(strategy_worker)
logger.info("开始跟踪策略: %s", strategy_name)
for worker in workers:
worker.join()
# @staticmethod
# def extract_strategy_id(strategy_url):
# return re.search(r"(?<=backtestId=)\w+", strategy_url).group()
#
# def extract_strategy_name(self, strategy_url):
# rep = self.s.get(strategy_url)
# return self.re_find(
# r'(?<=title="点击修改策略名称"\>).*(?=\</span)', rep.content.decode("utf8")
# )
def extract_strategy_id(self, strategy_url):
rep = self.s.get(strategy_url)
return self.re_search(r'name="backtest\[backtestId\]"\s+?value="(.*?)">', rep.content.decode("utf8"))
def extract_strategy_name(self, strategy_url):
rep = self.s.get(strategy_url)
return self.re_search(r'class="backtest_name".+?>(.*?)</span>', rep.content.decode("utf8"))
def create_query_transaction_params(self, strategy):
today_str = datetime.today().strftime("%Y-%m-%d")
params = {"backtestId": strategy, "date": today_str, "ajax": 1}
return params
def extract_transactions(self, history):
transactions = history["data"]["transaction"]
return transactions
@staticmethod
def stock_shuffle_to_prefix(stock):
assert (
len(stock) == 11
), "stock {} must like 123456.XSHG or 123456.XSHE".format(stock)
code = stock[:6]
if stock.find("XSHG") != -1:
return "sh" + code
if stock.find("XSHE") != -1:
return "sz" + code
raise TypeError("not valid stock code: {}".format(code))
def project_transactions(self, transactions, **kwargs):
for transaction in transactions:
transaction["amount"] = self.re_find(
r"\d+", transaction["amount"], dtype=int
)
time_str = "{} {}".format(transaction["date"], transaction["time"])
transaction["datetime"] = datetime.strptime(
time_str, "%Y-%m-%d %H:%M:%S"
)
stock = self.re_find(r"\d{6}\.\w{4}", transaction["stock"])
transaction["stock_code"] = self.stock_shuffle_to_prefix(stock)
transaction["action"] = (
"buy" if transaction["transaction"] == "" else "sell"
)

View File

@ -0,0 +1,14 @@
# -*- coding: utf-8 -*-
import logging
logger = logging.getLogger("easytrader")
logger.setLevel(logging.INFO)
logger.propagate = False
fmt = logging.Formatter(
"%(asctime)s [%(levelname)s] %(filename)s %(lineno)s: %(message)s"
)
ch = logging.StreamHandler()
ch.setFormatter(fmt)
logger.handlers.append(ch)

View File

@ -0,0 +1,98 @@
# coding:utf-8
import re
import time
from typing import Optional
from . import exceptions
from .utils.perf import perf_clock
from .utils.win_gui import SetForegroundWindow, ShowWindow, win32defines
class PopDialogHandler:
def __init__(self, app):
self._app = app
@staticmethod
def _set_foreground(window):
if window.has_style(win32defines.WS_MINIMIZE): # if minimized
ShowWindow(window.wrapper_object(), 9) # restore window state
else:
SetForegroundWindow(window.wrapper_object()) # bring to front
@perf_clock
def handle(self, title):
if any(s in title for s in {"提示信息", "委托确认", "网上交易用户协议", "撤单确认"}):
self._submit_by_shortcut()
return None
if "提示" in title:
content = self._extract_content()
self._submit_by_click()
return {"message": content}
content = self._extract_content()
self._close()
return {"message": "unknown message: {}".format(content)}
def _extract_content(self):
return self._app.top_window().Static.window_text()
@staticmethod
def _extract_entrust_id(content):
return re.search(r"[\da-zA-Z]+", content).group()
def _submit_by_click(self):
try:
self._app.top_window()["确定"].click()
except Exception as ex:
self._app.Window_(best_match="Dialog", top_level_only=True).ChildWindow(
best_match="确定"
).click()
def _submit_by_shortcut(self):
self._set_foreground(self._app.top_window())
self._app.top_window().type_keys("%Y", set_foreground=False)
def _close(self):
self._app.top_window().close()
class TradePopDialogHandler(PopDialogHandler):
@perf_clock
def handle(self, title) -> Optional[dict]:
if title == "委托确认":
self._submit_by_shortcut()
return None
if title == "提示信息":
content = self._extract_content()
if "超出涨跌停" in content:
self._submit_by_shortcut()
return None
if "委托价格的小数价格应为" in content:
self._submit_by_shortcut()
return None
if "逆回购" in content:
self._submit_by_shortcut()
return None
if "正回购" in content:
self._submit_by_shortcut()
return None
return None
if title == "提示":
content = self._extract_content()
if "成功" in content:
entrust_no = self._extract_entrust_id(content)
self._submit_by_click()
return {"entrust_no": entrust_no}
self._submit_by_click()
time.sleep(0.05)
raise exceptions.TradeError(content)
self._close()
return None

View File

@ -0,0 +1,59 @@
# -*- coding: utf-8 -*-
import abc
import io
import tempfile
from io import StringIO
from typing import TYPE_CHECKING, Dict, List, Optional
import pandas as pd
import pywinauto.keyboard
import pywinauto
import pywinauto.clipboard
from .log import logger
from .utils.captcha import captcha_recognize
from .utils.win_gui import SetForegroundWindow, ShowWindow, win32defines
if TYPE_CHECKING:
# pylint: disable=unused-import
from easytrader import clienttrader
class IRefreshStrategy(abc.ABC):
_trader: "clienttrader.ClientTrader"
@abc.abstractmethod
def refresh(self):
"""
刷新数据
"""
pass
def set_trader(self, trader: "clienttrader.ClientTrader"):
self._trader = trader
# noinspection PyProtectedMember
class Switch(IRefreshStrategy):
"""通过切换菜单栏刷新"""
def __init__(self, sleep: float = 0.1):
self.sleep = sleep
def refresh(self):
self._trader._switch_left_menus_by_shortcut("{F5}", sleep=self.sleep)
# noinspection PyProtectedMember
class Toolbar(IRefreshStrategy):
"""通过点击工具栏刷新按钮刷新"""
def __init__(self, refresh_btn_index: int = 4):
"""
:param refresh_btn_index:
交易客户端工具栏中刷新排序默认为第4个请根据自己实际调整
"""
self.refresh_btn_index = refresh_btn_index
def refresh(self):
self._trader._toolbar.button(self.refresh_btn_index - 1).click()

View File

@ -0,0 +1,108 @@
# -*- coding: utf-8 -*-
import requests
from .utils.misc import file2dict
def use(broker, host, port=1430, **kwargs):
return RemoteClient(broker, host, port)
class RemoteClient:
def __init__(self, broker, host, port=1430, **kwargs):
self._s = requests.session()
self._api = "http://{}:{}".format(host, port)
self._broker = broker
def prepare(
self,
config_path=None,
user=None,
password=None,
exe_path=None,
comm_password=None,
**kwargs
):
"""
登陆客户端
:param config_path: 登陆配置文件跟参数登陆方式二选一
:param user: 账号
:param password: 明文密码
:param exe_path: 客户端路径类似 r'C:\\htzqzyb2\\xiadan.exe',
默认 r'C:\\htzqzyb2\\xiadan.exe'
:param comm_password: 通讯密码
:return:
"""
params = locals().copy()
params.pop("self")
if config_path is not None:
account = file2dict(config_path)
params["user"] = account["user"]
params["password"] = account["password"]
params["broker"] = self._broker
response = self._s.post(self._api + "/prepare", json=params)
if response.status_code >= 300:
raise Exception(response.json()["error"])
return response.json()
@property
def balance(self):
return self.common_get("balance")
@property
def position(self):
return self.common_get("position")
@property
def today_entrusts(self):
return self.common_get("today_entrusts")
@property
def today_trades(self):
return self.common_get("today_trades")
@property
def cancel_entrusts(self):
return self.common_get("cancel_entrusts")
def auto_ipo(self):
return self.common_get("auto_ipo")
def exit(self):
return self.common_get("exit")
def common_get(self, endpoint):
response = self._s.get(self._api + "/" + endpoint)
if response.status_code >= 300:
print(Exception(response.json()["error"]))
return response.json()
def buy(self, security, price, amount, **kwargs):
params = locals().copy()
params.pop("self")
response = self._s.post(self._api + "/buy", json=params)
if response.status_code >= 300:
raise Exception(response.json()["error"])
return response.json()
def sell(self, security, price, amount, **kwargs):
params = locals().copy()
params.pop("self")
response = self._s.post(self._api + "/sell", json=params)
if response.status_code >= 300:
raise Exception(response.json()["error"])
return response.json()
def cancel_entrust(self, entrust_no):
params = locals().copy()
params.pop("self")
response = self._s.post(self._api + "/cancel_entrust", json=params)
if response.status_code >= 300:
raise Exception(response.json()["error"])
return response.json()

View File

@ -0,0 +1,31 @@
-i http://mirrors.aliyun.com/pypi/simple/
--trusted-host mirrors.aliyun.com
beautifulsoup4
bs4
certifi
chardet
click
cssselect
dill
easyutils
flask
idna
itsdangerous
jinja2
lxml
markupsafe
numpy
pandas
pillow
pyperclip
pyquery
pytesseract
python-dateutil
python-xlib
pytz
pywinauto==0.6.6
requests
six
urllib3
werkzeug

View File

@ -0,0 +1,119 @@
# -*- coding: utf-8 -*-
from datetime import datetime
from threading import Thread
from .follower import BaseFollower
from .log import logger
class RiceQuantFollower(BaseFollower):
def __init__(self):
super().__init__()
self.client = None
def login(self, user=None, password=None, **kwargs):
from rqopen_client import RQOpenClient
self.client = RQOpenClient(user, password, logger=logger)
def follow(
self,
users,
run_id,
track_interval=1,
trade_cmd_expire_seconds=120,
cmd_cache=True,
entrust_prop="limit",
send_interval=0,
):
"""跟踪ricequant对应的模拟交易支持多用户多策略
:param users: 支持easytrader的用户对象支持使用 [] 指定多个用户
:param run_id: ricequant 的模拟交易ID支持使用 [] 指定多个模拟交易
:param track_interval: 轮训模拟交易时间单位为秒
:param trade_cmd_expire_seconds: 交易指令过期时间, 单位为秒
:param cmd_cache: 是否读取存储历史执行过的指令防止重启时重复执行已经交易过的指令
:param entrust_prop: 委托方式, 'limit' 为限价'market' 为市价, 仅在银河实现
:param send_interval: 交易发送间隔 默认为0s调大可防止卖出买入时卖出单没有及时成交导致的买入金额不足
"""
users = self.warp_list(users)
run_ids = self.warp_list(run_id)
if cmd_cache:
self.load_expired_cmd_cache()
self.start_trader_thread(
users, trade_cmd_expire_seconds, entrust_prop, send_interval
)
workers = []
for id_ in run_ids:
strategy_name = self.extract_strategy_name(id_)
strategy_worker = Thread(
target=self.track_strategy_worker,
args=[id_, strategy_name],
kwargs={"interval": track_interval},
)
strategy_worker.start()
workers.append(strategy_worker)
logger.info("开始跟踪策略: %s", strategy_name)
for worker in workers:
worker.join()
def extract_strategy_name(self, run_id):
ret_json = self.client.get_positions(run_id)
if ret_json["code"] != 200:
logger.error(
"fetch data from run_id %s fail, msg %s",
run_id,
ret_json["msg"],
)
raise RuntimeError(ret_json["msg"])
return ret_json["resp"]["name"]
def extract_day_trades(self, run_id):
ret_json = self.client.get_day_trades(run_id)
if ret_json["code"] != 200:
logger.error(
"fetch day trades from run_id %s fail, msg %s",
run_id,
ret_json["msg"],
)
raise RuntimeError(ret_json["msg"])
return ret_json["resp"]["trades"]
def query_strategy_transaction(self, strategy, **kwargs):
transactions = self.extract_day_trades(strategy)
transactions = self.project_transactions(transactions, **kwargs)
return self.order_transactions_sell_first(transactions)
@staticmethod
def stock_shuffle_to_prefix(stock):
assert (
len(stock) == 11
), "stock {} must like 123456.XSHG or 123456.XSHE".format(stock)
code = stock[:6]
if stock.find("XSHG") != -1:
return "sh" + code
if stock.find("XSHE") != -1:
return "sz" + code
raise TypeError("not valid stock code: {}".format(code))
def project_transactions(self, transactions, **kwargs):
new_transactions = []
for transaction in transactions:
new_transaction = {}
new_transaction["price"] = transaction["price"]
new_transaction["amount"] = int(abs(transaction["quantity"]))
new_transaction["datetime"] = datetime.strptime(
transaction["time"], "%Y-%m-%d %H:%M:%S"
)
new_transaction["stock_code"] = self.stock_shuffle_to_prefix(
transaction["order_book_id"]
)
new_transaction["action"] = (
"buy" if transaction["quantity"] > 0 else "sell"
)
new_transactions.append(new_transaction)
return new_transactions

View File

@ -0,0 +1,137 @@
import functools
from flask import Flask, jsonify, request
from . import api
from .log import logger
app = Flask(__name__)
global_store = {}
def error_handle(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
# pylint: disable=broad-except
except Exception as e:
logger.exception("server error")
message = "{}: {}".format(e.__class__, e)
return jsonify({"error": message}), 400
return wrapper
@app.route("/prepare", methods=["POST"])
@error_handle
def post_prepare():
json_data = request.get_json(force=True)
user = api.use(json_data.pop("broker"))
user.prepare(**json_data)
global_store["user"] = user
return jsonify({"msg": "login success"}), 201
@app.route("/balance", methods=["GET"])
@error_handle
def get_balance():
print('get balance')
user = global_store["user"]
balance = user.balance
return jsonify(balance), 200
@app.route("/position", methods=["GET"])
@error_handle
def get_position():
print('get position')
user = global_store["user"]
position = user.position
return jsonify(position), 200
@app.route("/auto_ipo", methods=["GET"])
@error_handle
def get_auto_ipo():
user = global_store["user"]
res = user.auto_ipo()
return jsonify(res), 200
@app.route("/today_entrusts", methods=["GET"])
@error_handle
def get_today_entrusts():
user = global_store["user"]
today_entrusts = user.today_entrusts
return jsonify(today_entrusts), 200
@app.route("/today_trades", methods=["GET"])
@error_handle
def get_today_trades():
user = global_store["user"]
today_trades = user.today_trades
return jsonify(today_trades), 200
@app.route("/cancel_entrusts", methods=["GET"])
@error_handle
def get_cancel_entrusts():
user = global_store["user"]
cancel_entrusts = user.cancel_entrusts
return jsonify(cancel_entrusts), 200
@app.route("/buy", methods=["POST"])
@error_handle
def post_buy():
json_data = request.get_json(force=True)
user = global_store["user"]
res = user.buy(**json_data)
return jsonify(res), 201
@app.route("/sell", methods=["POST"])
@error_handle
def post_sell():
json_data = request.get_json(force=True)
user = global_store["user"]
res = user.sell(**json_data)
return jsonify(res), 201
@app.route("/cancel_entrust", methods=["POST"])
@error_handle
def post_cancel_entrust():
json_data = request.get_json(force=True)
user = global_store["user"]
res = user.cancel_entrust(**json_data)
return jsonify(res), 201
@app.route("/exit", methods=["GET"])
@error_handle
def get_exit():
user = global_store["user"]
user.exit()
return jsonify({"msg": "exit success"}), 200
def run(port=1430):
app.run(host="0.0.0.0", port=port)

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,103 @@
import re
import requests
from PIL import Image
from vnpy.api.easytrader import exceptions
def captcha_recognize(img_path):
import pytesseract
im = Image.open(img_path).convert("L")
# 1. threshold the image
threshold = 200
table = []
for i in range(256):
if i < threshold:
table.append(0)
else:
table.append(1)
out = im.point(table, "1")
# 2. recognize with tesseract
num = pytesseract.image_to_string(out)
return num
def recognize_verify_code(image_path, broker="ht"):
"""识别验证码,返回识别后的字符串,使用 tesseract 实现
:param image_path: 图片路径
:param broker: 券商 ['ht', 'yjb', 'gf', 'yh']
:return recognized: verify code string"""
if broker == "gf":
return detect_gf_result(image_path)
if broker in ["yh_client", "gj_client"]:
return detect_yh_client_result(image_path)
# 调用 tesseract 识别
return default_verify_code_detect(image_path)
def detect_yh_client_result(image_path):
"""封装了tesseract的识别部署在阿里云上
服务端源码地址为 https://github.com/shidenggui/yh_verify_code_docker"""
api = "http://yh.ez.shidenggui.com:5000/yh_client"
with open(image_path, "rb") as f:
rep = requests.post(api, files={"image": f})
if rep.status_code != 201:
error = rep.json()["message"]
raise exceptions.TradeError("request {} error: {}".format(api, error))
return rep.json()["result"]
def input_verify_code_manual(image_path):
from PIL import Image
image = Image.open(image_path)
image.show()
code = input(
"image path: {}, input verify code answer:".format(image_path)
)
return code
def default_verify_code_detect(image_path):
from PIL import Image
img = Image.open(image_path)
return invoke_tesseract_to_recognize(img)
def detect_gf_result(image_path):
from PIL import ImageFilter, Image
img = Image.open(image_path)
if hasattr(img, "width"):
width, height = img.width, img.height
else:
width, height = img.size
for x in range(width):
for y in range(height):
if img.getpixel((x, y)) < (100, 100, 100):
img.putpixel((x, y), (256, 256, 256))
gray = img.convert("L")
two = gray.point(lambda p: 0 if 68 < p < 90 else 256)
min_res = two.filter(ImageFilter.MinFilter)
med_res = min_res.filter(ImageFilter.MedianFilter)
for _ in range(2):
med_res = med_res.filter(ImageFilter.MedianFilter)
return invoke_tesseract_to_recognize(med_res)
def invoke_tesseract_to_recognize(img):
import pytesseract
try:
res = pytesseract.image_to_string(img)
except FileNotFoundError:
raise Exception(
"tesseract 未安装,请至 https://github.com/tesseract-ocr/tesseract/wiki 查看安装教程"
)
valid_chars = re.findall("[0-9a-z]", res, re.IGNORECASE)
return "".join(valid_chars)

View File

@ -0,0 +1,31 @@
# coding:utf-8
import json
def parse_cookies_str(cookies):
"""
parse cookies str to dict
:param cookies: cookies str
:type cookies: str
:return: cookie dict
:rtype: dict
"""
cookie_dict = {}
for record in cookies.split(";"):
key, value = record.strip().split("=", 1)
cookie_dict[key] = value
return cookie_dict
def file2dict(path):
with open(path, encoding="utf-8") as f:
return json.load(f)
def grep_comma(num_str):
return num_str.replace(",", "")
def str2num(num_str, convert_type="float"):
num = float(grep_comma(num_str))
return num if convert_type == "float" else int(num)

View File

@ -0,0 +1,46 @@
# coding:utf-8
import functools
import logging
import timeit
from vnpy.api.easytrader import logger
try:
from time import process_time
except:
from time import clock as process_time
def perf_clock(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
if not logger.isEnabledFor(logging.DEBUG):
return f(*args, **kwargs)
ts = timeit.default_timer()
cs = process_time()
ex = None
result = None
try:
result = f(*args, **kwargs)
except Exception as ex1:
ex = ex1
te = timeit.default_timer()
ce = process_time()
logger.debug(
"%r consume %2.4f sec, cpu %2.4f sec. args %s, extra args %s"
% (
f.__name__,
te - ts,
ce - cs,
args[1:],
kwargs,
)
)
if ex is not None:
raise ex
return result
return wrapper

View File

@ -0,0 +1,91 @@
# coding:utf-8
import datetime
import json
import random
import requests
def get_stock_type(stock_code):
"""判断股票ID对应的证券市场
匹配规则
['50', '51', '60', '90', '110'] sh
['00', '13', '18', '15', '16', '18', '20', '30', '39', '115'] sz
['5', '6', '9'] 开头的为 sh 其余为 sz
:param stock_code:股票ID, 若以 'sz', 'sh' 开头直接返回对应类型否则使用内置规则判断
:return 'sh' or 'sz'"""
stock_code = str(stock_code)
if stock_code.startswith(("sh", "sz")):
return stock_code[:2]
if stock_code.startswith(
("50", "51", "60", "73", "90", "110", "113", "132", "204", "78")
):
return "sh"
if stock_code.startswith(
("00", "13", "18", "15", "16", "18", "20", "30", "39", "115", "1318")
):
return "sz"
if stock_code.startswith(("5", "6", "9")):
return "sh"
return "sz"
def get_30_date():
"""
获得用于查询的默认日期, 今天的日期, 以及30天前的日期
用于查询的日期格式通常为 20160211
:return:
"""
now = datetime.datetime.now()
end_date = now.date()
start_date = end_date - datetime.timedelta(days=30)
return start_date.strftime("%Y%m%d"), end_date.strftime("%Y%m%d")
def get_today_ipo_data():
"""
查询今天可以申购的新股信息
:return: 今日可申购新股列表 apply_code申购代码 price发行价格
"""
agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.11; rv:43.0) Gecko/20100101 Firefox/43.0"
send_headers = {
"Host": "xueqiu.com",
"User-Agent": agent,
"Accept": "application/json, text/javascript, */*; q=0.01",
"Accept-Language": "zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3",
"Accept-Encoding": "deflate",
"Cache-Control": "no-cache",
"X-Requested-With": "XMLHttpRequest",
"Referer": "https://xueqiu.com/hq",
"Connection": "keep-alive",
}
timestamp = random.randint(1000000000000, 9999999999999)
home_page_url = "https://xueqiu.com"
ipo_data_url = (
"https://xueqiu.com/proipo/query.json?column=symbol,name,onl_subcode,onl_subbegdate,actissqty,onl"
"_actissqty,onl_submaxqty,iss_price,onl_lotwiner_stpub_date,onl_lotwinrt,onl_lotwin_amount,stock_"
"income&orderBy=onl_subbegdate&order=desc&stockType=&page=1&size=30&_=%s"
% (str(timestamp))
)
session = requests.session()
session.get(home_page_url, headers=send_headers) # 产生cookies
ipo_response = session.post(ipo_data_url, headers=send_headers)
json_data = json.loads(ipo_response.text)
today_ipo = []
for line in json_data["data"]:
if datetime.datetime.now().strftime("%a %b %d") == line[3][:10]:
today_ipo.append(
{
"stock_code": line[0],
"stock_name": line[1],
"apply_code": line[2],
"price": line[7],
}
)
return today_ipo

View File

@ -0,0 +1,3 @@
# coding:utf-8
from pywinauto import win32defines
from pywinauto.win32functions import SetForegroundWindow, ShowWindow

View File

@ -0,0 +1,245 @@
# -*- coding: utf-8 -*-
import abc
import logging
import os
import re
import time
from threading import Thread
import requests
import requests.exceptions
from . import exceptions
from .log import logger
from .utils.misc import file2dict, str2num
from .utils.stock import get_30_date
# noinspection PyIncorrectDocstring
class WebTrader(metaclass=abc.ABCMeta):
global_config_path = os.path.dirname(__file__) + "/config/global.json"
config_path = ""
def __init__(self, debug=True):
self.__read_config()
self.trade_prefix = self.config["prefix"]
self.account_config = ""
self.heart_active = True
self.heart_thread = Thread(target=self.send_heartbeat)
self.heart_thread.setDaemon(True)
self.log_level = logging.DEBUG if debug else logging.INFO
def read_config(self, path):
try:
self.account_config = file2dict(path)
except ValueError:
logger.error("配置文件格式有误,请勿使用记事本编辑,推荐 sublime text")
for value in self.account_config:
if isinstance(value, int):
logger.warning("配置文件的值最好使用双引号包裹,使用字符串,否则可能导致不可知问题")
def prepare(self, config_file=None, user=None, password=None, **kwargs):
"""登录的统一接口
:param config_file 登录数据文件若无则选择参数登录模式
:param user: 各家券商的账号
:param password: 密码, 券商为加密后的密码
:param cookies: [雪球登录需要]雪球登录需要设置对应的 cookies
:param portfolio_code: [雪球登录需要]组合代码
:param portfolio_market: [雪球登录需要]交易市场
可选['cn', 'us', 'hk'] 默认 'cn'
"""
if config_file is not None:
self.read_config(config_file)
else:
self._prepare_account(user, password, **kwargs)
self.autologin()
def _prepare_account(self, user, password, **kwargs):
"""映射用户名密码到对应的字段"""
raise Exception("支持参数登录需要实现此方法")
def autologin(self, limit=10):
"""实现自动登录
:param limit: 登录次数限制
"""
for _ in range(limit):
if self.login():
break
else:
raise exceptions.NotLoginError(
"登录失败次数过多, 请检查密码是否正确 / 券商服务器是否处于维护中 / 网络连接是否正常"
)
self.keepalive()
def login(self):
pass
def keepalive(self):
"""启动保持在线的进程 """
if self.heart_thread.is_alive():
self.heart_active = True
else:
self.heart_thread.start()
def send_heartbeat(self):
"""每隔10秒查询指定接口保持 token 的有效性"""
while True:
if self.heart_active:
self.check_login()
else:
time.sleep(1)
def check_login(self, sleepy=30):
logger.setLevel(logging.ERROR)
try:
response = self.heartbeat()
self.check_account_live(response)
except requests.exceptions.ConnectionError:
pass
except requests.exceptions.RequestException as e:
logger.setLevel(self.log_level)
logger.error("心跳线程发现账户出现错误: %s %s, 尝试重新登陆", e.__class__, e)
self.autologin()
finally:
logger.setLevel(self.log_level)
time.sleep(sleepy)
def heartbeat(self):
return self.balance
def check_account_live(self, response):
pass
def exit(self):
"""结束保持 token 在线的进程"""
self.heart_active = False
def __read_config(self):
"""读取 config"""
self.config = file2dict(self.config_path)
self.global_config = file2dict(self.global_config_path)
self.config.update(self.global_config)
@property
def balance(self):
return self.get_balance()
def get_balance(self):
"""获取账户资金状况"""
return self.do(self.config["balance"])
@property
def position(self):
return self.get_position()
def get_position(self):
"""获取持仓"""
return self.do(self.config["position"])
@property
def entrust(self):
return self.get_entrust()
def get_entrust(self):
"""获取当日委托列表"""
return self.do(self.config["entrust"])
@property
def current_deal(self):
return self.get_current_deal()
def get_current_deal(self):
"""获取当日委托列表"""
# return self.do(self.config['current_deal'])
logger.warning("目前仅在 佣金宝/银河子类 中实现, 其余券商需要补充")
@property
def exchangebill(self):
"""
默认提供最近30天的交割单, 通常只能返回查询日期内最新的 90 天数据
:return:
"""
# TODO 目前仅在 华泰子类 中实现
start_date, end_date = get_30_date()
return self.get_exchangebill(start_date, end_date)
def get_exchangebill(self, start_date, end_date):
"""
查询指定日期内的交割单
:param start_date: 20160211
:param end_date: 20160211
:return:
"""
logger.warning("目前仅在 华泰子类 中实现, 其余券商需要补充")
def get_ipo_limit(self, stock_code):
"""
查询新股申购额度申购上限
:param stock_code: 申购代码 ID
:return:
"""
logger.warning("目前仅在 佣金宝子类 中实现, 其余券商需要补充")
def do(self, params):
"""发起对 api 的请求并过滤返回结果
:param params: 交易所需的动态参数"""
request_params = self.create_basic_params()
request_params.update(params)
response_data = self.request(request_params)
try:
format_json_data = self.format_response_data(response_data)
# pylint: disable=broad-except
except Exception:
# Caused by server force logged out
return None
return_data = self.fix_error_data(format_json_data)
try:
self.check_login_status(return_data)
except exceptions.NotLoginError:
self.autologin()
return return_data
def create_basic_params(self) -> dict:
"""生成基本的参数"""
return {}
def request(self, params) -> dict:
"""请求并获取 JSON 数据
:param params: Get 参数"""
return {}
def format_response_data(self, data):
"""格式化返回的 json 数据
:param data: 请求返回的数据 """
return data
def fix_error_data(self, data):
"""若是返回错误移除外层的列表
:param data: 需要判断是否包含错误信息的数据"""
return data
def format_response_data_type(self, response_data):
"""格式化返回的值为正确的类型
:param response_data: 返回的数据
"""
if isinstance(response_data, list) and not isinstance(
response_data, str
):
return response_data
int_match_str = "|".join(self.config["response_format"]["int"])
float_match_str = "|".join(self.config["response_format"]["float"])
for item in response_data:
for key in item:
try:
if re.search(int_match_str, key) is not None:
item[key] = str2num(item[key], "int")
elif re.search(float_match_str, key) is not None:
item[key] = str2num(item[key], "float")
except ValueError:
continue
return response_data
def check_login_status(self, return_data):
pass

View File

@ -0,0 +1,56 @@
# -*- coding: utf-8 -*-
import pywinauto
from .ht_clienttrader import HTClientTrader
class WKClientTrader(HTClientTrader):
@property
def broker_type(self):
return "wk"
def login(self, user, password, exe_path, comm_password=None, **kwargs):
"""
:param user: 用户名
:param password: 密码
:param exe_path: 客户端路径, 类似
:param comm_password:
:param kwargs:
:return:
"""
self._editor_need_type_keys = False
if comm_password is None:
raise ValueError("五矿必须设置通讯密码")
try:
self._app = pywinauto.Application().connect(
path=self._run_exe_path(exe_path), timeout=1
)
# pylint: disable=broad-except
except Exception:
self._app = pywinauto.Application().start(exe_path)
# wait login window ready
while True:
try:
self._app.top_window().Edit1.wait("ready")
break
except RuntimeError:
pass
self._app.top_window().Edit1.set_focus()
self._app.top_window().Edit1.set_edit_text(user)
self._app.top_window().Edit2.set_edit_text(password)
self._app.top_window().Edit3.set_edit_text(comm_password)
self._app.top_window().Button1.click()
# detect login is success or not
self._app.top_window().wait_not("exists", 100)
self._app = pywinauto.Application().connect(
path=self._run_exe_path(exe_path), timeout=10
)
self._close_prompt_windows()
self._main = self._app.window(title="网上股票交易系统5.0")

View File

@ -0,0 +1,249 @@
# -*- coding: utf-8 -*-
from __future__ import division, print_function, unicode_literals
import json
import re
from datetime import datetime
from numbers import Number
from threading import Thread
from .follower import BaseFollower
from .log import logger
from .utils.misc import parse_cookies_str
class XueQiuFollower(BaseFollower):
LOGIN_PAGE = "https://www.xueqiu.com"
LOGIN_API = "https://xueqiu.com/snowman/login"
TRANSACTION_API = "https://xueqiu.com/cubes/rebalancing/history.json"
PORTFOLIO_URL = "https://xueqiu.com/p/"
WEB_REFERER = "https://www.xueqiu.com"
def __init__(self):
super().__init__()
self._adjust_sell = None
self._users = None
def login(self, user=None, password=None, **kwargs):
"""
雪球登陆 需要设置 cookies
:param cookies: 雪球登陆需要设置 cookies 具体见
https://smalltool.github.io/2016/08/02/cookie/
:return:
"""
cookies = kwargs.get("cookies")
if cookies is None:
raise TypeError(
"雪球登陆需要设置 cookies 具体见" "https://smalltool.github.io/2016/08/02/cookie/"
)
headers = self._generate_headers()
self.s.headers.update(headers)
self.s.get(self.LOGIN_PAGE)
cookie_dict = parse_cookies_str(cookies)
self.s.cookies.update(cookie_dict)
logger.info("登录成功")
def follow( # type: ignore
self,
users,
strategies,
total_assets=10000,
initial_assets=None,
adjust_sell=False,
track_interval=10,
trade_cmd_expire_seconds=120,
cmd_cache=True,
slippage: float = 0.0,
):
"""跟踪 joinquant 对应的模拟交易,支持多用户多策略
:param users: 支持 easytrader 的用户对象支持使用 [] 指定多个用户
:param strategies: 雪球组合名, 类似 ZH123450
:param total_assets: 雪球组合对应的总资产 格式 [组合1对应资金, 组合2对应资金]
strategies=['ZH000001', 'ZH000002'],
设置 total_assets=[10000, 10000], 则表明每个组合对应的资产为 1w
假设组合 ZH000001 加仓 价格为 p 股票 A 10%,
则对应的交易指令为 买入 股票 A 价格 P 股数 1w * 10% / p 并按 100 取整
:param adjust_sell: 是否根据用户的实际持仓数调整卖出股票数量
当卖出股票数大于实际持仓数时调整为实际持仓数目前仅在银河客户端测试通过
users 为多个时根据第一个 user 的持仓数决定
:type adjust_sell: bool
:param initial_assets: 雪球组合对应的初始资产,
格式 [ 组合1对应资金, 组合2对应资金 ]
总资产由 初始资产 × 组合净值 算得 total_assets 会覆盖此参数
:param track_interval: 轮训模拟交易时间单位为秒
:param trade_cmd_expire_seconds: 交易指令过期时间, 单位为秒
:param cmd_cache: 是否读取存储历史执行过的指令防止重启时重复执行已经交易过的指令
:param slippage: 滑点0.0 表示无滑点, 0.05 表示滑点为 5%
"""
super().follow(
users=users,
strategies=strategies,
track_interval=track_interval,
trade_cmd_expire_seconds=trade_cmd_expire_seconds,
cmd_cache=cmd_cache,
slippage=slippage,
)
self._adjust_sell = adjust_sell
self._users = self.warp_list(users)
strategies = self.warp_list(strategies)
total_assets = self.warp_list(total_assets)
initial_assets = self.warp_list(initial_assets)
if cmd_cache:
self.load_expired_cmd_cache()
self.start_trader_thread(self._users, trade_cmd_expire_seconds)
for strategy_url, strategy_total_assets, strategy_initial_assets in zip(
strategies, total_assets, initial_assets
):
assets = self.calculate_assets(
strategy_url, strategy_total_assets, strategy_initial_assets
)
try:
strategy_id = self.extract_strategy_id(strategy_url)
strategy_name = self.extract_strategy_name(strategy_url)
except:
logger.error("抽取交易id和策略名失败, 无效模拟交易url: %s", strategy_url)
raise
strategy_worker = Thread(
target=self.track_strategy_worker,
args=[strategy_id, strategy_name],
kwargs={"interval": track_interval, "assets": assets},
)
strategy_worker.start()
logger.info("开始跟踪策略: %s", strategy_name)
def calculate_assets(self, strategy_url, total_assets=None, initial_assets=None):
# 都设置时优先选择 total_assets
if total_assets is None and initial_assets is not None:
net_value = self._get_portfolio_net_value(strategy_url)
total_assets = initial_assets * net_value
if not isinstance(total_assets, Number):
raise TypeError("input assets type must be number(int, float)")
if total_assets < 1e3:
raise ValueError("雪球总资产不能小于1000元当前预设值 {}".format(total_assets))
return total_assets
@staticmethod
def extract_strategy_id(strategy_url):
return strategy_url
def extract_strategy_name(self, strategy_url):
base_url = "https://xueqiu.com/cubes/nav_daily/all.json?cube_symbol={}"
url = base_url.format(strategy_url)
rep = self.s.get(url)
info_index = 0
return rep.json()[info_index]["name"]
def extract_transactions(self, history):
if history["count"] <= 0:
return []
rebalancing_index = 0
raw_transactions = history["list"][rebalancing_index]["rebalancing_histories"]
transactions = []
for transaction in raw_transactions:
if transaction["price"] is None:
logger.info("该笔交易无法获取价格,疑似未成交,跳过。交易详情: %s", transaction)
continue
transactions.append(transaction)
return transactions
def create_query_transaction_params(self, strategy):
params = {"cube_symbol": strategy, "page": 1, "count": 1}
return params
# noinspection PyMethodOverriding
def none_to_zero(self, data):
if data is None:
return 0
return data
# noinspection PyMethodOverriding
def project_transactions(self, transactions, assets):
for transaction in transactions:
weight_diff = self.none_to_zero(transaction["weight"]) - self.none_to_zero(
transaction["prev_weight"]
)
initial_amount = abs(weight_diff) / 100 * assets / transaction["price"]
transaction["datetime"] = datetime.fromtimestamp(
transaction["created_at"] // 1000
)
transaction["stock_code"] = transaction["stock_symbol"].lower()
transaction["action"] = "buy" if weight_diff > 0 else "sell"
transaction["amount"] = int(round(initial_amount, -2))
if transaction["action"] == "sell" and self._adjust_sell:
transaction["amount"] = self._adjust_sell_amount(
transaction["stock_code"], transaction["amount"]
)
def _adjust_sell_amount(self, stock_code, amount):
"""
根据实际持仓值计算雪球卖出股数
因为雪球的交易指令是基于持仓百分比在取近似值的情况下可能出现不精确的问题
导致如下情况的产生计算出的指令为买入 1049 取近似值买入 1000
而卖出的指令计算出为卖出 1051 取近似值卖出 1100 超过 1000 股的买入量
导致卖出失败
:param stock_code: 证券代码
:type stock_code: str
:param amount: 卖出股份数
:type amount: int
:return: 考虑实际持仓之后的卖出股份数
:rtype: int
"""
stock_code = stock_code[-6:]
user = self._users[0]
position = user.position
try:
stock = next(s for s in position if s["证券代码"] == stock_code)
except StopIteration:
logger.info("根据持仓调整 %s 卖出额,发现未持有股票 %s, 不做任何调整", stock_code, stock_code)
return amount
available_amount = stock["可用余额"]
if available_amount >= amount:
return amount
adjust_amount = available_amount // 100 * 100
logger.info(
"股票 %s 实际可用余额 %s, 指令卖出股数为 %s, 调整为 %s",
stock_code,
available_amount,
amount,
adjust_amount,
)
return adjust_amount
def _get_portfolio_info(self, portfolio_code):
"""
获取组合信息
"""
url = self.PORTFOLIO_URL + portfolio_code
portfolio_page = self.s.get(url)
match_info = re.search(r"(?<=SNB.cubeInfo = ).*(?=;\n)", portfolio_page.text)
if match_info is None:
raise Exception("cant get portfolio info, portfolio url : {}".format(url))
try:
portfolio_info = json.loads(match_info.group())
except Exception as e:
raise Exception("get portfolio info error: {}".format(e))
return portfolio_info
def _get_portfolio_net_value(self, portfolio_code):
"""
获取组合信息
"""
portfolio_info = self._get_portfolio_info(portfolio_code)
return portfolio_info["net_value"]

View File

@ -0,0 +1,549 @@
# -*- coding: utf-8 -*-
import json
import numbers
import os
import re
import time
import requests
from . import exceptions, webtrader
from .log import logger
from .utils.misc import parse_cookies_str
class XueQiuTrader(webtrader.WebTrader):
config_path = os.path.dirname(__file__) + "/config/xq.json"
_HEADERS = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/64.0.3282.167 Safari/537.36",
"Host": "xueqiu.com",
"Pragma": "no-cache",
"Connection": "keep-alive",
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Cache-Control": "no-cache",
"Referer": "https://xueqiu.com/P/ZH004612",
"X-Requested-With": "XMLHttpRequest",
}
def __init__(self, **kwargs):
super(XueQiuTrader, self).__init__()
# 资金换算倍数
self.multiple = (
kwargs["initial_assets"] if "initial_assets" in kwargs else 1000000
)
if not isinstance(self.multiple, numbers.Number):
raise TypeError("initial assets must be number(int, float)")
if self.multiple < 1e3:
raise ValueError("雪球初始资产不能小于1000元当前预设值 {}".format(self.multiple))
self.s = requests.Session()
self.s.verify = False
self.s.headers.update(self._HEADERS)
self.account_config = None
def autologin(self, **kwargs):
"""
使用 cookies 之后不需要自动登陆
:return:
"""
self._set_cookies(self.account_config["cookies"])
def _set_cookies(self, cookies):
"""设置雪球 cookies代码来自于
https://github.com/shidenggui/easytrader/issues/269
:param cookies: 雪球 cookies
:type cookies: str
"""
cookie_dict = parse_cookies_str(cookies)
self.s.cookies.update(cookie_dict)
def _prepare_account(self, user="", password="", **kwargs):
"""
转换参数到登录所需的字典格式
:param cookies: 雪球登陆需要设置 cookies 具体见
https://smalltool.github.io/2016/08/02/cookie/
:param portfolio_code: 组合代码
:param portfolio_market: 交易市场 可选['cn', 'us', 'hk'] 默认 'cn'
:return:
"""
if "portfolio_code" not in kwargs:
raise TypeError("雪球登录需要设置 portfolio_code(组合代码) 参数")
if "portfolio_market" not in kwargs:
kwargs["portfolio_market"] = "cn"
if "cookies" not in kwargs:
raise TypeError(
"雪球登陆需要设置 cookies 具体见"
"https://smalltool.github.io/2016/08/02/cookie/"
)
self.account_config = {
"cookies": kwargs["cookies"],
"portfolio_code": kwargs["portfolio_code"],
"portfolio_market": kwargs["portfolio_market"],
}
def _virtual_to_balance(self, virtual):
"""
虚拟净值转化为资金
:param virtual: 雪球组合净值
:return: 换算的资金
"""
return virtual * self.multiple
def _get_html(self, url):
return self.s.get(url).text
def _search_stock_info(self, code):
"""
通过雪球的接口获取股票详细信息
:param code: 股票代码 000001
:return: 查询到的股票 {u'stock_id': 1000279, u'code': u'SH600325',
u'name': u'华发股份', u'ind_color': u'#d9633b', u'chg': -1.09,
u'ind_id': 100014, u'percent': -9.31, u'current': 10.62,
u'hasexist': None, u'flag': 1, u'ind_name': u'房地产', u'type': None,
u'enName': None}
** flag : 未上市(0)正常(1)停牌(2)涨跌停(3)退市(4)
"""
data = {
"code": str(code),
"size": "300",
"key": "47bce5c74f",
"market": self.account_config["portfolio_market"],
}
r = self.s.get(self.config["search_stock_url"], params=data)
stocks = json.loads(r.text)
stocks = stocks["stocks"]
stock = None
if len(stocks) > 0:
stock = stocks[0]
return stock
def _get_portfolio_info(self, portfolio_code):
"""
获取组合信息
:return: 字典
"""
url = self.config["portfolio_url"] + portfolio_code
html = self._get_html(url)
match_info = re.search(r"(?<=SNB.cubeInfo = ).*(?=;\n)", html)
if match_info is None:
raise Exception(
"cant get portfolio info, portfolio html : {}".format(html)
)
try:
portfolio_info = json.loads(match_info.group())
except Exception as e:
raise Exception("get portfolio info error: {}".format(e))
return portfolio_info
def get_balance(self):
"""
获取账户资金状况
:return:
"""
portfolio_code = self.account_config.get("portfolio_code", "ch")
portfolio_info = self._get_portfolio_info(portfolio_code)
asset_balance = self._virtual_to_balance(
float(portfolio_info["net_value"])
) # 总资产
position = portfolio_info["view_rebalancing"] # 仓位结构
cash = asset_balance * float(position["cash"]) / 100
market = asset_balance - cash
return [
{
"asset_balance": asset_balance,
"current_balance": cash,
"enable_balance": cash,
"market_value": market,
"money_type": u"人民币",
"pre_interest": 0.25,
}
]
def _get_position(self):
"""
获取雪球持仓
:return:
"""
portfolio_code = self.account_config["portfolio_code"]
portfolio_info = self._get_portfolio_info(portfolio_code)
position = portfolio_info["view_rebalancing"] # 仓位结构
stocks = position["holdings"] # 持仓股票
return stocks
@staticmethod
def _time_strftime(time_stamp):
try:
local_time = time.localtime(time_stamp / 1000)
return time.strftime("%Y-%m-%d %H:%M:%S", local_time)
# pylint: disable=broad-except
except Exception:
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
def get_position(self):
"""
获取持仓
:return:
"""
xq_positions = self._get_position()
balance = self.get_balance()[0]
position_list = []
for pos in xq_positions:
volume = pos["weight"] * balance["asset_balance"] / 100
position_list.append(
{
"cost_price": volume / 100,
"current_amount": 100,
"enable_amount": 100,
"income_balance": 0,
"keep_cost_price": volume / 100,
"last_price": volume / 100,
"market_value": volume,
"position_str": "random",
"stock_code": pos["stock_symbol"],
"stock_name": pos["stock_name"],
}
)
return position_list
def _get_xq_history(self):
"""
获取雪球调仓历史
:param instance:
:param owner:
:return:
"""
data = {
"cube_symbol": str(self.account_config["portfolio_code"]),
"count": 20,
"page": 1,
}
resp = self.s.get(self.config["history_url"], params=data)
res = json.loads(resp.text)
return res["list"]
@property
def history(self):
return self._get_xq_history()
def get_entrust(self):
"""
获取委托单(目前返回20次调仓的结果)
操作数量都按1手模拟换算的
:return:
"""
xq_entrust_list = self._get_xq_history()
entrust_list = []
replace_none = lambda s: s or 0
for xq_entrusts in xq_entrust_list:
status = xq_entrusts["status"] # 调仓状态
if status == "pending":
status = "已报"
elif status in ["canceled", "failed"]:
status = "废单"
else:
status = "已成"
for entrust in xq_entrusts["rebalancing_histories"]:
price = entrust["price"]
entrust_list.append(
{
"entrust_no": entrust["id"],
"entrust_bs": u"买入"
if entrust["target_weight"]
> replace_none(entrust["prev_weight"])
else u"卖出",
"report_time": self._time_strftime(
entrust["updated_at"]
),
"entrust_status": status,
"stock_code": entrust["stock_symbol"],
"stock_name": entrust["stock_name"],
"business_amount": 100,
"business_price": price,
"entrust_amount": 100,
"entrust_price": price,
}
)
return entrust_list
def cancel_entrust(self, entrust_no):
"""
对未成交的调仓进行伪撤单
:param entrust_no:
:return:
"""
xq_entrust_list = self._get_xq_history()
is_have = False
for xq_entrusts in xq_entrust_list:
status = xq_entrusts["status"] # 调仓状态
for entrust in xq_entrusts["rebalancing_histories"]:
if entrust["id"] == entrust_no and status == "pending":
is_have = True
buy_or_sell = (
"buy"
if entrust["target_weight"] < entrust["weight"]
else "sell"
)
if (
entrust["target_weight"] == 0
and entrust["weight"] == 0
):
raise exceptions.TradeError(u"移除的股票操作无法撤销,建议重新买入")
balance = self.get_balance()[0]
volume = (
abs(entrust["target_weight"] - entrust["weight"])
* balance["asset_balance"]
/ 100
)
r = self._trade(
security=entrust["stock_symbol"],
volume=volume,
entrust_bs=buy_or_sell,
)
if len(r) > 0 and "error_info" in r[0]:
raise exceptions.TradeError(
u"撤销失败!%s" % ("error_info" in r[0])
)
if not is_have:
raise exceptions.TradeError(u"撤销对象已失效")
return True
def adjust_weight(self, stock_code, weight):
"""
雪球组合调仓, weight 为调整后的仓位比例
:param stock_code: str 股票代码
:param weight: float 调整之后的持仓百分比 0 - 100 之间的浮点数
"""
stock = self._search_stock_info(stock_code)
if stock is None:
raise exceptions.TradeError(u"没有查询要操作的股票信息")
if stock["flag"] != 1:
raise exceptions.TradeError(u"未上市、停牌、涨跌停、退市的股票无法操作。")
# 仓位比例向下取两位数
weight = round(weight, 2)
# 获取原有仓位信息
position_list = self._get_position()
# 调整后的持仓
for position in position_list:
if position["stock_id"] == stock["stock_id"]:
position["proactive"] = True
position["weight"] = weight
if weight != 0 and stock["stock_id"] not in [
k["stock_id"] for k in position_list
]:
position_list.append(
{
"code": stock["code"],
"name": stock["name"],
"enName": stock["enName"],
"hasexist": stock["hasexist"],
"flag": stock["flag"],
"type": stock["type"],
"current": stock["current"],
"chg": stock["chg"],
"percent": str(stock["percent"]),
"stock_id": stock["stock_id"],
"ind_id": stock["ind_id"],
"ind_name": stock["ind_name"],
"ind_color": stock["ind_color"],
"textname": stock["name"],
"segment_name": stock["ind_name"],
"weight": weight,
"url": "/S/" + stock["code"],
"proactive": True,
"price": str(stock["current"]),
}
)
remain_weight = 100 - sum(i.get("weight") for i in position_list)
cash = round(remain_weight, 2)
logger.info("调仓比例:%f, 剩余持仓 :%f", weight, remain_weight)
data = {
"cash": cash,
"holdings": str(json.dumps(position_list)),
"cube_symbol": str(self.account_config["portfolio_code"]),
"segment": "true",
"comment": "",
}
try:
resp = self.s.post(self.config["rebalance_url"], data=data)
# pylint: disable=broad-except
except Exception as e:
logger.warning("调仓失败: %s ", e)
return None
logger.info("调仓 %s: 持仓比例%d", stock["name"], weight)
resp_json = json.loads(resp.text)
if "error_description" in resp_json and resp.status_code != 200:
logger.error("调仓错误: %s", resp_json["error_description"])
return [
{
"error_no": resp_json["error_code"],
"error_info": resp_json["error_description"],
}
]
logger.info("调仓成功 %s: 持仓比例%d", stock["name"], weight)
return None
def _trade(self, security, price=0, amount=0, volume=0, entrust_bs="buy"):
"""
调仓
:param security:
:param price:
:param amount:
:param volume:
:param entrust_bs:
:return:
"""
stock = self._search_stock_info(security)
balance = self.get_balance()[0]
if stock is None:
raise exceptions.TradeError(u"没有查询要操作的股票信息")
if not volume:
volume = int(float(price) * amount) # 可能要取整数
if balance["current_balance"] < volume and entrust_bs == "buy":
raise exceptions.TradeError(u"没有足够的现金进行操作")
if stock["flag"] != 1:
raise exceptions.TradeError(u"未上市、停牌、涨跌停、退市的股票无法操作。")
if volume == 0:
raise exceptions.TradeError(u"操作金额不能为零")
# 计算调仓调仓份额
weight = volume / balance["asset_balance"] * 100
weight = round(weight, 2)
# 获取原有仓位信息
position_list = self._get_position()
# 调整后的持仓
is_have = False
for position in position_list:
if position["stock_id"] == stock["stock_id"]:
is_have = True
position["proactive"] = True
old_weight = position["weight"]
if entrust_bs == "buy":
position["weight"] = weight + old_weight
else:
if weight > old_weight:
raise exceptions.TradeError(u"操作数量大于实际可卖出数量")
else:
position["weight"] = old_weight - weight
position["weight"] = round(position["weight"], 2)
if not is_have:
if entrust_bs == "buy":
position_list.append(
{
"code": stock["code"],
"name": stock["name"],
"enName": stock["enName"],
"hasexist": stock["hasexist"],
"flag": stock["flag"],
"type": stock["type"],
"current": stock["current"],
"chg": stock["chg"],
"percent": str(stock["percent"]),
"stock_id": stock["stock_id"],
"ind_id": stock["ind_id"],
"ind_name": stock["ind_name"],
"ind_color": stock["ind_color"],
"textname": stock["name"],
"segment_name": stock["ind_name"],
"weight": round(weight, 2),
"url": "/S/" + stock["code"],
"proactive": True,
"price": str(stock["current"]),
}
)
else:
raise exceptions.TradeError(u"没有持有要卖出的股票")
if entrust_bs == "buy":
cash = (
(balance["current_balance"] - volume)
/ balance["asset_balance"]
* 100
)
else:
cash = (
(balance["current_balance"] + volume)
/ balance["asset_balance"]
* 100
)
cash = round(cash, 2)
logger.info("weight:%f, cash:%f", weight, cash)
data = {
"cash": cash,
"holdings": str(json.dumps(position_list)),
"cube_symbol": str(self.account_config["portfolio_code"]),
"segment": 1,
"comment": "",
}
try:
resp = self.s.post(self.config["rebalance_url"], data=data)
# pylint: disable=broad-except
except Exception as e:
logger.warning("调仓失败: %s ", e)
return None
else:
logger.info(
"调仓 %s%s: %d", entrust_bs, stock["name"], resp.status_code
)
resp_json = json.loads(resp.text)
if "error_description" in resp_json and resp.status_code != 200:
logger.error("调仓错误: %s", resp_json["error_description"])
return [
{
"error_no": resp_json["error_code"],
"error_info": resp_json["error_description"],
}
]
return [
{
"entrust_no": resp_json["id"],
"init_date": self._time_strftime(resp_json["created_at"]),
"batch_no": "委托批号",
"report_no": "申报号",
"seat_no": "席位编号",
"entrust_time": self._time_strftime(
resp_json["updated_at"]
),
"entrust_price": price,
"entrust_amount": amount,
"stock_code": security,
"entrust_bs": "买入",
"entrust_type": "雪球虚拟委托",
"entrust_status": "-",
}
]
def buy(self, security, price=0, amount=0, volume=0, entrust_prop=0):
"""买入卖出股票
:param security: 股票代码
:param price: 买入价格
:param amount: 买入股数
:param volume: 买入总金额 volume / price 取整 若指定 price 则此参数无效
:param entrust_prop:
"""
return self._trade(security, price, amount, volume, "buy")
def sell(self, security, price=0, amount=0, volume=0, entrust_prop=0):
"""卖出股票
:param security: 股票代码
:param price: 卖出价格
:param amount: 卖出股数
:param volume: 卖出总金额 volume / price 取整 若指定 price 则此参数无效
:param entrust_prop:
"""
return self._trade(security, price, amount, volume, "sell")

View File

@ -0,0 +1,126 @@
# -*- coding: utf-8 -*-
import re
import tempfile
import pywinauto
from . import clienttrader, grid_strategies
from .utils.captcha import recognize_verify_code
class YHClientTrader(clienttrader.BaseLoginClientTrader):
"""
Changelog:
2018.07.01:
银河客户端 2018.5.11 更新后不再支持通过剪切板复制获取 Grid 内容
改为使用保存为 Xls 再读取的方式获取
"""
grid_strategy = grid_strategies.Xls
@property
def broker_type(self):
return "yh"
def login(self, user, password, exe_path, comm_password=None, **kwargs):
"""
登陆客户端
:param user: 账号
:param password: 明文密码
:param exe_path: 客户端路径类似 'C:\\中国银河证券双子星3.2\\Binarystar.exe',
默认 'C:\\中国银河证券双子星3.2\\Binarystar.exe'
:param comm_password: 通讯密码, 华泰需要可不设
:return:
"""
try:
self._app = pywinauto.Application().connect(
path=self._run_exe_path(exe_path), timeout=1
)
# pylint: disable=broad-except
except Exception:
self._app = pywinauto.Application().start(exe_path)
is_xiadan = True if "xiadan.exe" in exe_path else False
# wait login window ready
while True:
try:
self._app.top_window().Edit1.wait("ready")
break
except RuntimeError:
pass
self._app.top_window().Edit1.type_keys(user)
self._app.top_window().Edit2.type_keys(password)
while True:
self._app.top_window().Edit3.type_keys(
self._handle_verify_code(is_xiadan)
)
self._app.top_window()["确定" if is_xiadan else "登录"].click()
# detect login is success or not
try:
self._app.top_window().wait_not("exists visible", 10)
break
# pylint: disable=broad-except
except Exception:
if is_xiadan:
self._app.top_window()["确定"].click()
self._app = pywinauto.Application().connect(
path=self._run_exe_path(exe_path), timeout=10
)
self._close_prompt_windows()
self._main = self._app.window(title="网上股票交易系统5.0")
try:
self._main.child_window(
control_id=129, class_name="SysTreeView32"
).wait("ready", 2)
# pylint: disable=broad-except
except Exception:
self.wait(2)
self._switch_window_to_normal_mode()
def _switch_window_to_normal_mode(self):
self._app.top_window().child_window(
control_id=32812, class_name="Button"
).click()
def _handle_verify_code(self, is_xiadan):
control = self._app.top_window().child_window(
control_id=1499 if is_xiadan else 22202
)
control.click()
file_path = tempfile.mktemp()
if is_xiadan:
rect = control.element_info.rectangle
rect.right = round(
rect.right + (rect.right - rect.left) * 0.3
) # 扩展验证码控件截图范围为4个字符
control.capture_as_image(rect).save(file_path, "jpeg")
else:
control.capture_as_image().save(file_path, "jpeg")
verify_code = recognize_verify_code(file_path, "yh_client")
return "".join(re.findall(r"\d+", verify_code))
@property
def balance(self):
self._switch_left_menus(self._config.BALANCE_MENU_PATH)
return self._get_grid_data(self._config.BALANCE_GRID_CONTROL_ID)
def auto_ipo(self):
self._switch_left_menus(self._config.AUTO_IPO_MENU_PATH)
stock_list = self._get_grid_data(self._config.COMMON_GRID_CONTROL_ID)
if len(stock_list) == 0:
return {"message": "今日无新股"}
invalid_list_idx = [
i for i, v in enumerate(stock_list) if v["申购数量"] <= 0
]
if len(stock_list) == len(invalid_list_idx):
return {"message": "没有发现可以申购的新股"}
self.wait(0.1)
# for row in invalid_list_idx:
# self._click_grid_by_row(row)
self._click(self._config.AUTO_IPO_BUTTON_CONTROL_ID)
self.wait(0.1)
return self._handle_pop_dialogs()

View File

@ -187,7 +187,8 @@ class AlgoEngine(BaseEngine):
price: float,
volume: float,
order_type: OrderType,
offset: Offset
offset: Offset,
lock:bool=False
):
""""""
contract = self.main_engine.get_contract(vt_symbol)
@ -208,7 +209,7 @@ class AlgoEngine(BaseEngine):
price=price,
offset=offset
)
req_list = self.offset_converter.convert_order_request(req=original_req, lock=False, gateway_name=contract.gateway_name)
req_list = self.offset_converter.convert_order_request(req=original_req, lock=lock, gateway_name=contract.gateway_name)
vt_orderids = []
for req in req_list:
vt_orderid = self.main_engine.send_order(req, contract.gateway_name)
@ -251,7 +252,8 @@ class AlgoEngine(BaseEngine):
'order_volume': req.volume,
'timer_interval': 60 * 60 * 24,
'strategy_name': req.strategy_name,
'gateway_name': gateway_name
'gateway_name': gateway_name,
'order_type': req.type
}
# 更新算法配置
setting.update(contract)

View File

@ -1,6 +1,6 @@
from vnpy.trader.engine import BaseEngine
from vnpy.trader.object import TickData, OrderData, TradeData
from vnpy.trader.constant import OrderType, Offset, Direction
from vnpy.trader.constant import OrderType, Offset, Direction,Exchange
from vnpy.trader.utility import virtual
@ -114,7 +114,8 @@ class AlgoTemplate:
price,
volume,
order_type: OrderType = OrderType.LIMIT,
offset: Offset = Offset.NONE
offset: Offset = Offset.NONE,
lock: bool = False
):
""""""
if offset in [Offset.CLOSE]:
@ -131,7 +132,8 @@ class AlgoTemplate:
price,
volume,
order_type,
offset
offset,
lock
)
def sell(
@ -140,7 +142,8 @@ class AlgoTemplate:
price,
volume,
order_type: OrderType = OrderType.LIMIT,
offset: Offset = Offset.NONE
offset: Offset = Offset.NONE,
lock: bool = False
):
""""""
if offset in [Offset.NONE, Offset.CLOSE]:
@ -157,7 +160,8 @@ class AlgoTemplate:
price,
volume,
order_type,
offset
offset,
lock
)
def cancel_order(self, vt_orderid: str):

View File

@ -237,6 +237,11 @@ class CtaEngine(BaseEngine):
# 推送到事件
self.put_all_strategy_pos_event(all_strategy_pos)
for strategy in self.strategies.values():
if strategy.inited:
self.call_strategy_func(strategy, strategy.on_timer)
def process_tick_event(self, event: Event):
"""处理tick到达事件"""
tick = event.data

View File

@ -11,7 +11,7 @@ from copy import copy,deepcopy
from typing import Any, Callable
from logging import INFO, ERROR
from datetime import datetime
from vnpy.trader.constant import Interval, Direction, Offset, Status, OrderType, Color
from vnpy.trader.constant import Interval, Direction, Offset, Status, OrderType, Color, Exchange
from vnpy.trader.object import BarData, TickData, OrderData, TradeData
from vnpy.trader.utility import virtual, append_data, extract_vt_symbol, get_underlying_symbol
@ -836,7 +836,7 @@ class CtaProTemplate(CtaTemplate):
if self.position.long_pos > 0:
for g in self.gt.get_opened_grids(direction=Direction.LONG):
vt_symbol = g.snapshot.get('mi_symbol', self.vt_symbol)
vt_symbol = g.snapshot.get('mi_symbol', g.vt_symbol if g.vt_symbol and '99' not in g.vt_symbol else self.vt_symbol)
open_price = g.snapshot.get('open_price', g.open_price)
pos_list.append({'vt_symbol': vt_symbol,
'direction': 'long',
@ -845,7 +845,7 @@ class CtaProTemplate(CtaTemplate):
if abs(self.position.short_pos) > 0:
for g in self.gt.get_opened_grids(direction=Direction.SHORT):
vt_symbol = g.snapshot.get('mi_symbol', self.vt_symbol)
vt_symbol = g.snapshot.get('mi_symbol', g.vt_symbol if g.vt_symbol and '99' not in g.vt_symbol else self.vt_symbol)
open_price = g.snapshot.get('open_price', g.open_price)
pos_list.append({'vt_symbol': vt_symbol,
'direction': 'short',
@ -1202,20 +1202,25 @@ class CtaProFutureTemplate(CtaProTemplate):
self.fix_order(order)
if order.vt_orderid in self.active_orders:
active_order = self.active_orders[order.vt_orderid]
if order.volume == order.traded and order.status in [Status.ALLTRADED]:
self.on_order_all_traded(order)
elif order.offset == Offset.OPEN and order.status in [Status.CANCELLED]:
#elif order.offset == Offset.OPEN and order.status in [Status.CANCELLED]:
# 这里 换成active_order的因为原始order有可能被换成锁仓方式
elif active_order['offset'] == Offset.OPEN and order.status in [Status.CANCELLED]:
# 开仓委托单被撤销
self.on_order_open_canceled(order)
elif order.offset != Offset.OPEN and order.status in [Status.CANCELLED]:
#elif order.offset != Offset.OPEN and order.status in [Status.CANCELLED]:
# # 这里 换成active_order的因为原始order有可能被换成锁仓方式
elif active_order['offset'] != Offset.OPEN and order.status in [Status.CANCELLED]:
# 平仓委托单被撤销
self.on_order_close_canceled(order)
elif order.status == Status.REJECTED:
if order.offset == Offset.OPEN:
if active_order['offset'] == Offset.OPEN:
self.write_error(u'{}委托单开{}被拒price:{},total:{},traded:{}status:{}'
.format(order.vt_symbol, order.direction, order.price, order.volume,
order.traded, order.status))
@ -1238,10 +1243,10 @@ class CtaProFutureTemplate(CtaProTemplate):
:return:
"""
self.write_log(u'委托单全部完成:{}'.format(order.__dict__))
order_info = self.active_orders[order.vt_orderid]
active_order = self.active_orders[order.vt_orderid]
# 通过vt_orderid找到对应的网格
grid = order_info.get('grid', None)
grid = active_order.get('grid', None)
if grid is not None:
# 移除当前委托单
if order.vt_orderid in grid.order_ids:
@ -1253,7 +1258,7 @@ class CtaProFutureTemplate(CtaProTemplate):
grid.traded_volume = 0
# 平仓完毕cover sell
if order.offset != Offset.OPEN:
if active_order['offset'] != Offset.OPEN:
grid.open_status = False
grid.close_status = True

View File

@ -7,7 +7,7 @@ from copy import copy
import bz2
import pickle
import zlib
from vnpy.trader.utility import append_data
from vnpy.trader.utility import append_data, extract_vt_symbol
from .template import (
CtaPosition,
CtaGridTrade,
@ -16,6 +16,7 @@ from .template import (
Direction,
datetime,
Offset,
Exchange,
OrderType,
OrderData,
TradeData,
@ -27,10 +28,14 @@ from .template import (
class CtaSpreadTemplate(CtaTemplate):
"""CTA套利模板"""
activate_fak = False
order_type = OrderType.LIMIT
act_vt_symbol = "" # 主动腿合约
pas_vt_symbol = "" # 被动腿合约
act_symbol = ""
pas_symbol = ""
act_exchange = None
pas_exchange = None
act_vol_ratio = 1
pas_vol_ratio = 1
@ -48,6 +53,8 @@ class CtaSpreadTemplate(CtaTemplate):
""""""
super().__init__(cta_engine, strategy_name, vt_symbol, setting)
self.parameters.append('activate_fak')
# 基础组件
self.position = CtaPosition(strategy=self)
self.gt = CtaGridTrade(strategy=self)
@ -74,11 +81,13 @@ class CtaSpreadTemplate(CtaTemplate):
"""更新配置参数"""
super().update_setting(setting)
self.act_symbol, self.act_exchange = extract_vt_symbol(self.act_vt_symbol)
self.pas_symbol, self.pas_exchange = extract_vt_symbol(self.pas_vt_symbol)
self.act_price_tick = self.cta_engine.get_price_tick(self.act_vt_symbol)
self.pas_price_tick = self.cta_engine.get_price_tick(self.pas_vt_symbol)
# 实盘采用FAK
if not self.backtesting:
if not self.backtesting and self.activate_fak:
self.order_type = OrderType.FAK
def display_grids(self):
@ -298,6 +307,59 @@ class CtaSpreadTemplate(CtaTemplate):
self.write_log(f'新增订阅合约:{vt_symbol}')
self.cta_engine.subscribe_symbol(strategy_name=self.strategy_name, vt_symbol=vt_symbol)
def get_positions(self):
"""
获取策略当前持仓(重构使用主力合约
:return: [{'vt_symbol':symbol,'direction':direction,'volume':volume]
"""
if not self.position:
return []
pos_list = []
for grid in self.gt.get_opened_grids(direction=Direction.LONG):
act_vt_symbol = grid.snapshot.get('act_vt_symbol', self.act_vt_symbol)
act_open_volume = grid.snapshot.get('act_open_volume', grid.volume)
act_open_price = grid.snapshot.get('act_open_price')
pas_vt_symbol = grid.snapshot.get('pas_vt_symbol', self.pas_vt_symbol)
pas_open_volume = grid.snapshot.get('pas_open_volume', grid.volume)
pas_open_price = grid.snapshot.get('pas_open_price')
pos_list.append({'vt_symbol': act_vt_symbol,
'direction': 'long',
'volume': act_open_volume,
'price': act_open_price})
pos_list.append({'vt_symbol': pas_vt_symbol,
'direction': 'short',
'volume': pas_open_volume,
'price': pas_open_price})
for grid in self.gt.get_opened_grids(direction=Direction.SHORT):
act_vt_symbol = grid.snapshot.get('act_vt_symbol', self.act_vt_symbol)
act_open_volume = grid.snapshot.get('act_open_volume', grid.volume)
act_open_price = grid.snapshot.get('act_open_price')
pas_vt_symbol = grid.snapshot.get('pas_vt_symbol', self.pas_vt_symbol)
pas_open_volume = grid.snapshot.get('pas_open_volume', grid.volume)
pas_open_price = grid.snapshot.get('pas_open_price')
pos_list.append({'vt_symbol': act_vt_symbol,
'direction': 'short',
'volume': act_open_volume,
'price': act_open_price})
pos_list.append({'vt_symbol': pas_vt_symbol,
'direction': 'long',
'volume': pas_open_volume,
'price': pas_open_price})
if self.cur_datetime and (datetime.now() - self.cur_datetime).total_seconds() < 10:
self.write_log(u'当前持仓:{}'.format(pos_list))
return pos_list
def on_start(self):
"""启动策略(必须由用户继承实现)"""
# 订阅主动腿/被动腿合约
@ -434,20 +496,20 @@ class CtaSpreadTemplate(CtaTemplate):
self.fix_order(order)
if order.vt_orderid in self.active_orders:
active_order = self.active_orders[order.vt_orderid]
if order.volume == order.traded and order.status in [Status.ALLTRADED]:
self.on_order_all_traded(order)
elif order.offset == Offset.OPEN and order.status in [Status.CANCELLED]:
elif active_order['offset'] == Offset.OPEN and order.status in [Status.CANCELLED]:
# 开仓委托单被撤销
self.on_order_open_canceled(order)
elif order.offset != Offset.OPEN and order.status in [Status.CANCELLED]:
elif active_order['offset'] != Offset.OPEN and order.status in [Status.CANCELLED]:
# 平仓委托单被撤销
self.on_order_close_canceled(order)
elif order.status == Status.REJECTED:
if order.offset == Offset.OPEN:
if active_order['offset'] == Offset.OPEN:
self.write_error(u'{}委托单开{}被拒price:{},total:{},traded:{}status:{}'
.format(order.vt_symbol, order.direction, order.price, order.volume,
order.traded, order.status))
@ -517,6 +579,7 @@ class CtaSpreadTemplate(CtaTemplate):
'pas_close_volume': pas_open_volume,
'pas_vt_symbol': self.pas_vt_symbol})
self.gt.save()
def on_order_all_traded(self, order: OrderData):
"""
订单全部成交
@ -593,6 +656,8 @@ class CtaSpreadTemplate(CtaTemplate):
self.write_log(u'{} 委托信息:{}'.format(order.vt_orderid, old_order))
old_order['traded'] = order.traded
order_vt_symbol = copy(old_order['vt_symbol'])
order_symbol, order_exchange = extract_vt_symbol(order_vt_symbol)
order_volume = old_order['volume'] - old_order['traded']
if order_volume <= 0:
msg = u'{} {}{}需重新开仓数量为{},不再开仓' \
@ -657,6 +722,7 @@ class CtaSpreadTemplate(CtaTemplate):
vt_orderids = self.buy(price=buy_price,
volume=order_volume,
vt_symbol=order_vt_symbol,
lock=order_exchange==Exchange.CFFEX,
order_type=OrderType.FAK,
order_time=self.cur_datetime,
grid=grid)
@ -693,6 +759,7 @@ class CtaSpreadTemplate(CtaTemplate):
vt_orderids = self.short(price=short_price,
volume=order_volume,
vt_symbol=order_vt_symbol,
lock=order_exchange==Exchange.CFFEX,
order_type=OrderType.FAK,
order_time=self.cur_datetime,
grid=grid)
@ -745,6 +812,8 @@ class CtaSpreadTemplate(CtaTemplate):
old_order['traded'] = order.traded
# order_time = old_order['order_time']
order_vt_symbol = copy(old_order['vt_symbol'])
order_symbol,order_exchange = extract_vt_symbol(order_vt_symbol)
order_volume = old_order['volume'] - old_order['traded']
if order_volume <= 0:
msg = u'{} {}{}重新平仓数量为{},不再平仓' \
@ -796,6 +865,7 @@ class CtaSpreadTemplate(CtaTemplate):
vt_orderids = self.cover(price=cover_price,
volume=order_volume,
vt_symbol=order_vt_symbol,
lock=order_exchange==Exchange.CFFEX,
order_type=OrderType.FAK,
order_time=self.cur_datetime,
grid=grid)
@ -829,6 +899,7 @@ class CtaSpreadTemplate(CtaTemplate):
vt_orderids = self.sell(price=sell_price,
volume=order_volume,
vt_symbol=order_vt_symbol,
lock=order_exchange==Exchange.CFFEX,
order_type=OrderType.FAK,
order_time=self.cur_datetime,
grid=grid)
@ -882,6 +953,7 @@ class CtaSpreadTemplate(CtaTemplate):
for vt_orderid in list(self.active_orders.keys()):
order_info = self.active_orders[vt_orderid]
order_vt_symbol = order_info.get('vt_symbol', self.vt_symbol)
order_symbol, order_exchange = extract_vt_symbol(order_vt_symbol)
order_time = order_info['order_time']
order_volume = order_info['volume'] - order_info['traded']
order_grid = order_info['grid']
@ -928,6 +1000,7 @@ class CtaSpreadTemplate(CtaTemplate):
vt_orderids = self.short(price=short_price,
volume=order_volume,
vt_symbol=order_vt_symbol,
lock=order_exchange == Exchange.CFFEX,
order_type=order_type,
order_time=self.cur_datetime,
grid=order_grid)
@ -946,6 +1019,7 @@ class CtaSpreadTemplate(CtaTemplate):
vt_orderids = self.buy(price=buy_price,
volume=order_volume,
vt_symbol=order_vt_symbol,
lock=order_exchange == Exchange.CFFEX,
order_type=order_type,
order_time=self.cur_datetime,
grid=order_grid)
@ -965,6 +1039,7 @@ class CtaSpreadTemplate(CtaTemplate):
vt_orderids = self.sell(price=sell_price,
volume=order_volume,
vt_symbol=order_vt_symbol,
lock=order_exchange == Exchange.CFFEX,
order_type=order_type,
order_time=self.cur_datetime,
grid=order_grid)
@ -981,6 +1056,7 @@ class CtaSpreadTemplate(CtaTemplate):
vt_orderids = self.cover(price=cover_price,
volume=order_volume,
vt_symbol=order_vt_symbol,
lock=order_exchange == Exchange.CFFEX,
order_type=order_type,
order_time=self.cur_datetime,
grid=order_grid)
@ -1082,6 +1158,7 @@ class CtaSpreadTemplate(CtaTemplate):
# 开空主动腿
act_vt_orderids = self.short(vt_symbol=self.act_vt_symbol,
lock=self.act_exchange == Exchange.CFFEX,
price=self.cur_act_tick.bid_price_1,
volume=grid.volume * self.act_vol_ratio,
order_type=self.order_type,
@ -1094,6 +1171,7 @@ class CtaSpreadTemplate(CtaTemplate):
# 开多被动腿
pas_vt_orderids = self.buy(vt_symbol=self.pas_vt_symbol,
lock=self.pas_exchange==Exchange.CFFEX,
price=self.cur_pas_tick.ask_price_1,
volume=grid.volume * self.pas_vol_ratio,
order_type=self.order_type,
@ -1141,6 +1219,7 @@ class CtaSpreadTemplate(CtaTemplate):
# 开多主动腿
act_vt_orderids = self.buy(vt_symbol=self.act_vt_symbol,
lock=self.act_exchange==Exchange.CFFEX,
price=self.cur_act_tick.ask_price_1,
volume=grid.volume * self.act_vol_ratio,
order_type=self.order_type,
@ -1153,6 +1232,7 @@ class CtaSpreadTemplate(CtaTemplate):
# 开空被动腿
pas_vt_orderids = self.short(vt_symbol=self.pas_vt_symbol,
lock=self.pas_exchange==Exchange.CFFEX,
price=self.cur_pas_tick.bid_price_1,
volume=grid.volume * self.pas_vol_ratio,
order_type=self.order_type,
@ -1212,6 +1292,7 @@ class CtaSpreadTemplate(CtaTemplate):
# 主动腿多单平仓
act_vt_orderids = self.sell(vt_symbol=self.act_vt_symbol,
lock=self.act_exchange==Exchange.CFFEX,
price=self.cur_act_tick.bid_price_1,
volume=grid.volume * self.act_vol_ratio,
order_type=self.order_type,
@ -1224,6 +1305,7 @@ class CtaSpreadTemplate(CtaTemplate):
# 被动腿空单平仓
pas_vt_orderids = self.cover(vt_symbol=self.pas_vt_symbol,
lock=self.pas_exchange==Exchange.CFFEX,
price=self.cur_pas_tick.ask_price_1,
volume=grid.volume * self.pas_vol_ratio,
order_type=self.order_type,
@ -1268,8 +1350,8 @@ class CtaSpreadTemplate(CtaTemplate):
self.write_error('主动腿/被动退得持仓数据不存在')
return []
act_close_volume = grid.snapshot.get('act_open_volume')
pas_close_volume = grid.snapshot.get('pas_open_volume')
act_close_volume = grid.snapshot.get('act_open_volume', 0)
pas_close_volume = grid.snapshot.get('pas_open_volume', 0)
if self.act_pos.short_pos < act_close_volume:
self.write_error(f'账号 {self.act_vt_symbol} 空单持仓{self.act_pos.short_pos}'
@ -1283,6 +1365,7 @@ class CtaSpreadTemplate(CtaTemplate):
# 主动腿空单平仓
act_vt_orderids = self.cover(vt_symbol=self.act_vt_symbol,
lock=self.act_exchange==Exchange.CFFEX,
price=self.cur_act_tick.ask_price_1,
volume=grid.volume * self.act_vol_ratio,
order_type=self.order_type,
@ -1295,6 +1378,7 @@ class CtaSpreadTemplate(CtaTemplate):
# 被动腿多单平仓
pas_vt_orderids = self.sell(vt_symbol=self.pas_vt_symbol,
lock=self.pas_exchange==Exchange.CFFEX,
price=self.cur_pas_tick.bid_price_1,
volume=grid.volume * self.pas_vol_ratio,
order_type=self.order_type,

View File

@ -0,0 +1 @@
from .gj_gateway import GjGateway

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,9 @@
采用restful方式访问另一台windows机器上的1430端口。
服务运行:
from vnpy.api.easytrader import server
server.run(port=1430)
资金账号和密码通过http request的方式请求无需在服务器中指定
国金证券的'全能行证券交易终端'安装在服务器缺省的目录。

View File

@ -5,6 +5,9 @@ import json
import traceback
from datetime import datetime, timedelta
from copy import copy,deepcopy
from functools import lru_cache
from typing import List
import pandas as pd
from .vnctpmd import MdApi
from .vnctptd import TdApi
@ -47,11 +50,13 @@ from vnpy.trader.constant import (
OrderType,
Product,
Status,
OptionType
OptionType,
Interval
)
from vnpy.trader.gateway import BaseGateway
from vnpy.trader.object import (
TickData,
BarData,
OrderData,
TradeData,
PositionData,
@ -60,6 +65,7 @@ from vnpy.trader.object import (
OrderRequest,
CancelRequest,
SubscribeRequest,
HistoryRequest
)
from vnpy.trader.utility import (
extract_vt_symbol,
@ -150,6 +156,64 @@ index_contracts = {}
# tdx 期货配置本地缓存
future_contracts = get_future_contracts()
# 时间戳对齐
TIME_GAP = 8 * 60 * 60 * 1000000000
INTERVAL_VT2TQ = {
Interval.MINUTE: 60,
Interval.HOUR: 60 * 60,
Interval.DAILY: 60 * 60 * 24,
}
TQ2VT_TYPE = {
"FUTURE_OPTION": Product.OPTION,
"INDEX": Product.INDEX,
"FUTURE_COMBINE": Product.SPREAD,
"SPOT": Product.SPOT,
"FUTURE_CONT": Product.INDEX,
"FUTURE": Product.FUTURES,
"FUTURE_INDEX": Product.INDEX,
"OPTION": Product.OPTION,
}
@lru_cache(maxsize=9999)
def vt_to_tq_symbol(symbol: str, exchange: Exchange) -> str:
"""
TQSdk exchange first
"""
for count, word in enumerate(symbol):
if word.isdigit():
break
fix_symbol = symbol
if exchange in [Exchange.INE, Exchange.SHFE, Exchange.DCE]:
fix_symbol = symbol.lower()
# Check for index symbol
time_str = symbol[count:]
if time_str in ["88"]:
return f"KQ.m@{exchange.value}.{fix_symbol[:count]}"
if time_str in ["99"]:
return f"KQ.i@{exchange.value}.{fix_symbol[:count]}"
return f"{exchange.value}.{fix_symbol}"
@lru_cache(maxsize=9999)
def tq_to_vt_symbol(tq_symbol: str) -> str:
""""""
if "KQ.m" in tq_symbol:
ins_type, instrument = tq_symbol.split("@")
exchange, symbol = instrument.split(".")
return f"{symbol}88.{exchange}"
elif "KQ.i" in tq_symbol:
ins_type, instrument = tq_symbol.split("@")
exchange, symbol = instrument.split(".")
return f"{symbol}99.{exchange}"
else:
exchange, symbol = tq_symbol.split(".")
return f"{symbol}.{exchange}"
class RohonGateway(BaseGateway):
"""
@ -183,6 +247,7 @@ class RohonGateway(BaseGateway):
self.md_api = None
self.tdx_api = None
self.rabbit_api = None
self.tq_api = None
self.subscribed_symbols = set() # 已订阅合约代码
@ -202,7 +267,7 @@ class RohonGateway(BaseGateway):
auth_code = setting["授权编码"]
product_info = setting["产品信息"]
rabbit_dict = setting.get('rabbit', None)
tq_dict = setting.get('tq', None)
if not td_address.startswith("tcp://"):
td_address = "tcp://" + td_address
if not md_address.startswith("tcp://"):
@ -232,22 +297,28 @@ class RohonGateway(BaseGateway):
self.md_api.connect(md_address, userid, password, brokerid)
if rabbit_dict:
self.write_log(f'激活RabbitMQ行情接口')
self.rabbit_api = SubMdApi(gateway=self)
self.rabbit_api.connect(rabbit_dict)
elif tq_dict is not None:
self.write_log(f'激活天勤行情接口')
self.tq_api = TqMdApi(gateway=self)
self.tq_api.connect(tq_dict)
else:
self.write_log(f'激活通达信行情接口')
self.tdx_api = TdxMdApi(gateway=self)
self.tdx_api.connect()
self.init_query()
for (vt_symbol, is_bar) in self.subscribed_symbols:
for (vt_symbol, is_bar) in list(self.subscribed_symbols):
symbol, exchange = extract_vt_symbol(vt_symbol)
req = SubscribeRequest(
symbol=symbol,
exchange=exchange,
is_bar=is_bar
)
# 指数合约从tdx行情订阅
# 指数合约从tdx行情、天勤订阅
if req.symbol[-2:] in ['99']:
req.symbol = req.symbol.upper()
if self.tdx_api is not None:
@ -255,9 +326,18 @@ class RohonGateway(BaseGateway):
self.tdx_api.connect()
self.tdx_api.subscribe(req)
elif self.rabbit_api is not None:
# 使用rabbitmq获取
self.rabbit_api.subscribe(req)
elif self.tq_api:
# 使用天勤行情获取
self.tq_api.subscribe(req)
else:
self.md_api.subscribe(req)
# 上期所、上能源支持五档行情,使用天勤接口
if self.tq_api and req.exchange in [Exchange.SHFE, Exchange.INE]:
self.write_log(f'使用天勤接口订阅')
self.tq_api.subscribe(req)
else:
self.md_api.subscribe(req)
def check_status(self):
"""检查状态"""
@ -332,11 +412,22 @@ class RohonGateway(BaseGateway):
if req.symbol[-2:] in ['99']:
req.symbol = req.symbol.upper()
if self.tdx_api:
self.write_log(f'使用通达信接口订阅{req.symbol}')
self.tdx_api.subscribe(req)
elif self.rabbit_api:
self.write_log(f'使用RabbitMQ接口订阅{req.symbol}')
self.rabbit_api.subscribe(req)
elif self.tq_api:
self.write_log(f'使用天勤接口订阅{ req.symbol}')
self.tq_api.subscribe(req)
else:
self.md_api.subscribe(req)
# 上期所、上能源支持五档行情,使用天勤接口
if self.tq_api and req.exchange in [Exchange.SHFE, Exchange.INE]:
self.write_log(f'使用天勤接口订阅{ req.symbol}')
self.tq_api.subscribe(req)
else:
self.write_log(f'使用CTP接口订阅{req.symbol}')
self.md_api.subscribe(req)
# Allow the strategies to start before the connection
self.subscribed_symbols.add((req.vt_symbol, req.is_bar))
@ -375,6 +466,13 @@ class RohonGateway(BaseGateway):
""""""
self.td_api.query_position()
def query_history(self, req: HistoryRequest) -> List[BarData]:
"""查询K线历史"""
if self.tq_api:
return self.tq_api.query_history(req)
else:
return []
def close(self):
""""""
if self.md_api:
@ -401,6 +499,12 @@ class RohonGateway(BaseGateway):
self.rabbit_api = None
tmp4.close()
if self.tq_api:
self.write_log(u'天勤行情API')
tmp5 = self.tq_api
self.tq_api = None
tmp5.close()
def process_timer_event(self, event):
""""""
self.count += 1
@ -1703,6 +1807,224 @@ class SubMdApi():
self.gateway.write_log(u'RabbitMQ行情订阅 {}'.format(str(vn_symbol)))
class TqMdApi():
"""天勤行情API"""
def __init__(self, gateway):
""""""
super().__init__()
self.gateway = gateway
self.gateway_name = gateway.gateway_name
self.api = None
self.is_connected = False
self.subscribe_array = []
# 行情对象列表
self.quote_objs = []
# 数据更新线程
self.update_thread = None
# 所有的合约
self.all_instruments = []
self.ticks = {}
def connect(self, setting):
""""""
try:
from tqsdk import TqApi
self.api = TqApi()
except Exception as e:
self.gateway.write_log(f'天勤行情API接入异常'.format(str(e)))
if self.api:
self.is_connected = True
self.gateway.write_log(f'天勤行情API已连接')
self.update_thread = Thread(target=self.update)
self.update_thread.start()
def generate_tick_from_quote(self, vt_symbol, quote) -> TickData:
"""
生成TickData
"""
# 清洗 nan
quote = {k: 0 if v != v else v for k, v in quote.items()}
symbol, exchange = extract_vt_symbol(vt_symbol)
tick = TickData(
symbol=symbol,
exchange=exchange,
datetime=datetime.strptime(quote["datetime"], "%Y-%m-%d %H:%M:%S.%f"),
name=symbol,
volume=quote["volume"],
open_interest=quote["open_interest"],
last_price=quote["last_price"],
limit_up=quote["upper_limit"],
limit_down=quote["lower_limit"],
open_price=quote["open"],
high_price=quote["highest"],
low_price=quote["lowest"],
pre_close=quote["pre_close"],
bid_price_1=quote["bid_price1"],
bid_price_2=quote["bid_price2"],
bid_price_3=quote["bid_price3"],
bid_price_4=quote["bid_price4"],
bid_price_5=quote["bid_price5"],
ask_price_1=quote["ask_price1"],
ask_price_2=quote["ask_price2"],
ask_price_3=quote["ask_price3"],
ask_price_4=quote["ask_price4"],
ask_price_5=quote["ask_price5"],
bid_volume_1=quote["bid_volume1"],
bid_volume_2=quote["bid_volume2"],
bid_volume_3=quote["bid_volume3"],
bid_volume_4=quote["bid_volume4"],
bid_volume_5=quote["bid_volume5"],
ask_volume_1=quote["ask_volume1"],
ask_volume_2=quote["ask_volume2"],
ask_volume_3=quote["ask_volume3"],
ask_volume_4=quote["ask_volume4"],
ask_volume_5=quote["ask_volume5"],
gateway_name=self.gateway_name
)
if symbol.endswith('99') and tick.ask_price_1 == 0.0 and tick.bid_price_1 == 0.0:
price_tick = quote['price_tick']
if isinstance(price_tick, float) or isinstance(price_tick,int):
tick.ask_price_1 = tick.last_price + price_tick
tick.ask_volume_1 = 1
tick.bid_price_1 = tick.last_price - price_tick
tick.bid_volume_1 = 1
return tick
def update(self) -> None:
"""
更新行情/委托/账户/持仓
"""
while self.api.wait_update():
# 更新行情信息
for vt_symbol, quote in self.quote_objs:
if self.api.is_changing(quote):
tick = self.generate_tick_from_quote(vt_symbol, quote)
if tick:
self.gateway.on_tick(tick)
self.gateway.on_custom_tick(tick)
def subscribe(self, req: SubscribeRequest) -> None:
"""
订阅行情
"""
if req.vt_symbol not in self.subscribe_array:
symbol, exchange = extract_vt_symbol(req.vt_symbol)
try:
quote = self.api.get_quote(vt_to_tq_symbol(symbol, exchange))
self.quote_objs.append((req.vt_symbol, quote))
self.subscribe_array.append(req.vt_symbol)
except Exception as ex:
self.gateway.write_log('订阅天勤行情异常:{}'.format(str(ex)))
def query_contracts(self) -> None:
""""""
self.all_instruments = [
v for k, v in self.api._data["quotes"].items() if v["expired"] == False
]
for contract in self.all_instruments:
if (
"SSWE" in contract["instrument_id"]
or "CSI" in contract["instrument_id"]
):
# vnpy没有这两个交易所需要可以自行修改vnpy代码
continue
vt_symbol = tq_to_vt_symbol(contract["instrument_id"])
symbol, exchange = extract_vt_symbol(vt_symbol)
if TQ2VT_TYPE[contract["ins_class"]] == Product.OPTION:
contract_data = ContractData(
symbol=symbol,
exchange=exchange,
name=symbol,
product=TQ2VT_TYPE[contract["ins_class"]],
size=contract["volume_multiple"],
pricetick=contract["price_tick"],
history_data=True,
option_strike=contract["strike_price"],
option_underlying=tq_to_vt_symbol(contract["underlying_symbol"]),
option_type=OptionType[contract["option_class"]],
option_expiry=datetime.fromtimestamp(contract["expire_datetime"]),
option_index=tq_to_vt_symbol(contract["underlying_symbol"]),
gateway_name=self.gateway_name,
)
else:
contract_data = ContractData(
symbol=symbol,
exchange=exchange,
name=symbol,
product=TQ2VT_TYPE[contract["ins_class"]],
size=contract["volume_multiple"],
pricetick=contract["price_tick"],
history_data=True,
gateway_name=self.gateway_name,
)
self.gateway.on_contract(contract_data)
def query_history(self, req: HistoryRequest) -> List[BarData]:
"""
获取历史数据
"""
symbol = req.symbol
exchange = req.exchange
interval = req.interval
start = req.start
end = req.end
# 天勤需要的数据
tq_symbol = vt_to_tq_symbol(symbol, exchange)
tq_interval = INTERVAL_VT2TQ.get(interval)
end += timedelta(1)
total_days = end - start
# 一次最多只能下载 8964 根Bar
min_length = min(8964, total_days.days * 500)
df = self.api.get_kline_serial(tq_symbol, tq_interval, min_length).sort_values(
by=["datetime"]
)
# 时间戳对齐
df["datetime"] = pd.to_datetime(df["datetime"] + TIME_GAP)
# 过滤开始结束时间
df = df[(df["datetime"] >= start - timedelta(days=1)) & (df["datetime"] < end)]
data: List[BarData] = []
if df is not None:
for ix, row in df.iterrows():
bar = BarData(
symbol=symbol,
exchange=exchange,
interval=interval,
datetime=row["datetime"].to_pydatetime(),
open_price=row["open"],
high_price=row["high"],
low_price=row["low"],
close_price=row["close"],
volume=row["volume"],
open_interest=row.get("close_oi", 0),
gateway_name=self.gateway_name,
)
data.append(bar)
return data
def close(self) -> None:
""""""
try:
if self.api:
self.api.close()
self.is_connected = False
if self.update_thread:
self.update_thread.join()
except Exception as e:
self.gateway.write_log('退出天勤行情api异常:{}'.format(str(e)))
class TickCombiner(object):
"""
Tick合成类