diff --git a/tests/trader/run.py b/tests/trader/run.py index 6d38ac31..bb940a5f 100644 --- a/tests/trader/run.py +++ b/tests/trader/run.py @@ -7,8 +7,8 @@ from vnpy.trader.ui import MainWindow, create_qapp from vnpy.gateway.bitmex import BitmexGateway from vnpy.gateway.futu import FutuGateway from vnpy.gateway.ib import IbGateway -# from vnpy.gateway.ctp import CtpGateway -from vnpy.gateway.femas import FemasGateway +from vnpy.gateway.ctp import CtpGateway +# from vnpy.gateway.femas import FemasGateway from vnpy.gateway.tiger import TigerGateway from vnpy.gateway.oes import OesGateway from vnpy.gateway.okex import OkexGateway @@ -16,6 +16,7 @@ from vnpy.gateway.huobi import HuobiGateway from vnpy.gateway.bitfinex import BitfinexGateway from vnpy.gateway.onetoken import OnetokenGateway from vnpy.gateway.okexf import OkexfGateway +from vnpy.gateway.xtp import XtpGateway from vnpy.app.cta_strategy import CtaStrategyApp from vnpy.app.csv_loader import CsvLoaderApp @@ -30,8 +31,9 @@ def main(): event_engine = EventEngine() main_engine = MainEngine(event_engine) - # main_engine.add_gateway(CtpGateway) - main_engine.add_gateway(FemasGateway) + main_engine.add_gateway(XtpGateway) + main_engine.add_gateway(CtpGateway) + # main_engine.add_gateway(FemasGateway) main_engine.add_gateway(IbGateway) main_engine.add_gateway(FutuGateway) main_engine.add_gateway(BitmexGateway) diff --git a/vnpy/api/xtp/vnxtp.pyd b/vnpy/api/xtp/vnxtp.pyd index 099ec46e..d528b22f 100644 Binary files a/vnpy/api/xtp/vnxtp.pyd and b/vnpy/api/xtp/vnxtp.pyd differ diff --git a/vnpy/api/xtp/vnxtp.pyi b/vnpy/api/xtp/vnxtp.pyi index faca3608..40e0641d 100644 --- a/vnpy/api/xtp/vnxtp.pyi +++ b/vnpy/api/xtp/vnxtp.pyi @@ -7,19 +7,34 @@ if typing.TYPE_CHECKING: from .vnxtp import * +def set_async_callback_exception_handler(handler: Callable[[AsyncDispatchException], None]): + """ + set a customize exception handler for async callback in this module(pyd) + \a handler should return True if it handles that exception, + If the return value of \a handler is not True, exception will be re-thrown. + """ + ... + + +class AsyncDispatchException: + what: str + instance: object + function_name: str + + from . import vnxtp_XTP as XTP class XTPRspInfoStruct(): error_id: int - error_msg: Sequence[int] + error_msg: str class XTPSpecificTickerStruct(): exchange_id: XTP_EXCHANGE_TYPE - ticker: Sequence[int] + ticker: str class XTPMarketDataStockExData(): @@ -71,7 +86,7 @@ class XTPMarketDataStruct(): exchange_id: XTP_EXCHANGE_TYPE - ticker: Sequence[int] + ticker: str last_price: float pre_close_price: float open_price: float @@ -95,7 +110,7 @@ class XTPMarketDataStruct(): bid_qty: Sequence[int] ask_qty: Sequence[int] trades_count: int - ticker_status: Sequence[int] + ticker_status: str stk: XTPMarketDataStockExData opt: XTPMarketDataOptionExData data_type: XTP_MARKETDATA_TYPE @@ -106,8 +121,8 @@ class XTPQuoteStaticInfo(): exchange_id: XTP_EXCHANGE_TYPE - ticker: Sequence[int] - ticker_name: Sequence[int] + ticker: str + ticker_name: str ticker_type: XTP_TICKER_TYPE pre_close_price: float upper_limit_price: float @@ -121,7 +136,7 @@ class OrderBookStruct(): exchange_id: XTP_EXCHANGE_TYPE - ticker: Sequence[int] + ticker: str last_price: float qty: int turnover: float @@ -161,7 +176,7 @@ class XTPTickByTickStruct(): exchange_id: XTP_EXCHANGE_TYPE - ticker: Sequence[int] + ticker: str seq: int data_time: int type: XTP_TBT_TYPE @@ -173,7 +188,7 @@ class XTPTickerPriceInfo(): exchange_id: XTP_EXCHANGE_TYPE - ticker: Sequence[int] + ticker: str last_price: float @@ -182,7 +197,7 @@ class XTPOrderInsertInfo(): order_xtp_id: int order_client_id: int - ticker: Sequence[int] + ticker: str market: XTP_MARKET_TYPE price: float stop_price: float @@ -210,7 +225,7 @@ class XTPOrderInfo(): order_client_id: int order_cancel_client_id: int order_cancel_xtp_id: int - ticker: Sequence[int] + ticker: str market: XTP_MARKET_TYPE price: float quantity: int @@ -227,7 +242,7 @@ class XTPOrderInfo(): update_time: int cancel_time: int trade_amount: float - order_local_id: Sequence[int] + order_local_id: str order_status: XTP_ORDER_STATUS_TYPE order_submit_status: XTP_ORDER_SUBMIT_STATUS_TYPE order_type: int @@ -238,16 +253,16 @@ class XTPTradeReport(): order_xtp_id: int order_client_id: int - ticker: Sequence[int] + ticker: str market: XTP_MARKET_TYPE local_order_id: int - exec_id: Sequence[int] + exec_id: str price: float quantity: int trade_time: int trade_amount: float report_index: int - order_exch_id: Sequence[int] + order_exch_id: str trade_type: int u32: int side: int @@ -255,13 +270,13 @@ class XTPTradeReport(): reserved1: int reserved2: int business_type: XTP_BUSINESS_TYPE - branch_pbu: Sequence[int] + branch_pbu: str class XTPQueryOrderReq(): - ticker: Sequence[int] + ticker: str begin_time: int end_time: int @@ -270,13 +285,13 @@ class XTPQueryReportByExecIdReq(): order_xtp_id: int - exec_id: Sequence[int] + exec_id: str class XTPQueryTraderReq(): - ticker: Sequence[int] + ticker: str begin_time: int end_time: int @@ -311,8 +326,8 @@ class XTPQueryAssetRsp(): class XTPQueryStkPositionRsp(): - ticker: Sequence[int] - ticker_name: Sequence[int] + ticker: str + ticker_name: str market: XTP_MARKET_TYPE total_qty: int sellable_qty: int @@ -350,17 +365,17 @@ class XTPQueryStructuredFundInfoReq(): exchange_id: XTP_EXCHANGE_TYPE - sf_ticker: Sequence[int] + sf_ticker: str class XTPStructuredFundInfo(): exchange_id: XTP_EXCHANGE_TYPE - sf_ticker: Sequence[int] - sf_ticker_name: Sequence[int] - ticker: Sequence[int] - ticker_name: Sequence[int] + sf_ticker: str + sf_ticker_name: str + ticker: str + ticker_name: str split_merge_status: XTP_SPLIT_MERGE_STATUS ratio: int min_split_qty: int @@ -372,15 +387,15 @@ class XTPQueryETFBaseReq(): market: XTP_MARKET_TYPE - ticker: Sequence[int] + ticker: str class XTPQueryETFBaseRsp(): market: XTP_MARKET_TYPE - etf: Sequence[int] - subscribe_redemption_ticker: Sequence[int] + etf: str + subscribe_redemption_ticker: str unit: int subscribe_status: int redemption_status: int @@ -395,16 +410,16 @@ class XTPQueryETFComponentReq(): market: XTP_MARKET_TYPE - ticker: Sequence[int] + ticker: str class XTPQueryETFComponentRsp(): market: XTP_MARKET_TYPE - ticker: Sequence[int] - component_ticker: Sequence[int] - component_name: Sequence[int] + ticker: str + component_ticker: str + component_name: str quantity: int component_market: XTP_MARKET_TYPE replace_type: ETF_REPLACE_TYPE @@ -416,8 +431,8 @@ class XTPQueryIPOTickerRsp(): market: XTP_MARKET_TYPE - ticker: Sequence[int] - ticker_name: Sequence[int] + ticker: str + ticker_name: str price: float unit: int qty_upper_limit: int @@ -434,17 +449,17 @@ class XTPQueryOptionAuctionInfoReq(): market: XTP_MARKET_TYPE - ticker: Sequence[int] + ticker: str class XTPQueryOptionAuctionInfoRsp(): - ticker: Sequence[int] + ticker: str security_id_source: XTP_MARKET_TYPE - symbol: Sequence[int] - contract_id: Sequence[int] - underlying_security_id: Sequence[int] + symbol: str + contract_id: str + underlying_security_id: str underlying_security_id_source: XTP_MARKET_TYPE list_date: int last_trade_date: int @@ -483,8 +498,8 @@ class XTPFundTransferReq(): serial_id: int - fund_account: Sequence[int] - password: Sequence[int] + fund_account: str + password: str amount: float transfer_type: XTP_FUND_TRANSFER_TYPE @@ -610,7 +625,7 @@ class XTP_POSITION_DIRECTION_TYPE(Enum): class XTP_MARKETDATA_TYPE(Enum): XTP_MARKETDATA_ACTUAL: XTP_MARKETDATA_TYPE XTP_MARKETDATA_OPTION: XTP_MARKETDATA_TYPE -XTPVersionType = Sequence[int] +XTPVersionType = str XTP_LOG_LEVEL = XTP_LOG_LEVEL XTP_PROTOCOL_TYPE = XTP_PROTOCOL_TYPE XTP_EXCHANGE_TYPE = XTP_EXCHANGE_TYPE diff --git a/vnpy/api/xtp/vnxtp/generated_files/generated_functions_0.cpp b/vnpy/api/xtp/vnxtp/generated_files/generated_functions_0.cpp index 50ee5235..b0dc588f 100644 --- a/vnpy/api/xtp/vnxtp/generated_files/generated_functions_0.cpp +++ b/vnpy/api/xtp/vnxtp/generated_files/generated_functions_0.cpp @@ -379,7 +379,8 @@ void generate_class_XTP_API_TraderApi(pybind11::object & parent) brigand::list< > >::value, - pybind11::call_guard() + pybind11::call_guard(), + pybind11::return_value_policy::reference ); c.def("GetApiVersion", autocxxpy::apply_function_transform< diff --git a/vnpy/api/xtp/vnxtp/generated_files/generated_functions_1.cpp b/vnpy/api/xtp/vnxtp/generated_files/generated_functions_1.cpp index 14130770..ff2f5f04 100644 --- a/vnpy/api/xtp/vnxtp/generated_files/generated_functions_1.cpp +++ b/vnpy/api/xtp/vnxtp/generated_files/generated_functions_1.cpp @@ -277,9 +277,14 @@ void generate_class_XTP_API_QuoteApi(pybind11::object & parent) PyQuoteApi > c(parent, "QuoteApi"); c.def_static("CreateQuoteApi", + autocxxpy::apply_function_transform< + autocxxpy::function_constant< &XTP::API::QuoteApi::CreateQuoteApi - //, - //pybind11::call_guard() + >, + brigand::list< + > + >::value, + pybind11::call_guard() ); c.def("Release", autocxxpy::apply_function_transform< @@ -319,7 +324,8 @@ void generate_class_XTP_API_QuoteApi(pybind11::object & parent) brigand::list< > >::value, - pybind11::call_guard() + pybind11::call_guard(), + pybind11::return_value_policy::reference ); c.def("SetUDPBufferSize", autocxxpy::apply_function_transform< diff --git a/vnpy/api/xtp/vnxtp/generated_files/module.cpp b/vnpy/api/xtp/vnxtp/generated_files/module.cpp index 617f88e8..7c62af9f 100644 --- a/vnpy/api/xtp/vnxtp/generated_files/module.cpp +++ b/vnpy/api/xtp/vnxtp/generated_files/module.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include "module.hpp" @@ -20,6 +21,12 @@ void additional_init(pybind11::module &m) void init_dispatcher(pybind11::module &m) { + m.def("set_async_callback_exception_handler", &autocxxpy::async_callback_exception_handler::set_handler); + pybind11::class_ c(m, "AsyncDispatchException"); + c.def_property("what", &autocxxpy::async_dispatch_exception::what, nullptr); + c.def_readonly("instance", &autocxxpy::async_dispatch_exception::instance); + c.def_readonly("function_name", &autocxxpy::async_dispatch_exception::function_name); + autocxxpy::dispatcher::instance().start(); } diff --git a/vnpy/api/xtp/vnxtp/include/autocxxpy/callback_wrapper.hpp b/vnpy/api/xtp/vnxtp/include/autocxxpy/callback_wrapper.hpp index 6100625a..2b9e523c 100644 --- a/vnpy/api/xtp/vnxtp/include/autocxxpy/callback_wrapper.hpp +++ b/vnpy/api/xtp/vnxtp/include/autocxxpy/callback_wrapper.hpp @@ -2,6 +2,7 @@ #include #include +#include #include "brigand.hpp" @@ -66,7 +67,7 @@ namespace autocxxpy template constexpr callback_type callback_type_of_v = callback_type_of::value; -#ifdef PYBIND11_OVERLOAD_NAME +#ifdef AUTOCXXPY_INCLUDED_PYBIND11 template struct pybind11_static_caster { static pybind11::detail::overload_caster_t caster; @@ -75,18 +76,59 @@ namespace autocxxpy template AUTOCXXPY_SELECT_ANY pybind11::detail::overload_caster_t pybind11_static_caster::caster; + struct async_dispatch_exception : public std::exception + { + async_dispatch_exception(const char *what, const pybind11::object &instance, std::string function_name) + : std::exception(what), instance(instance), function_name(function_name) + {} + pybind11::object instance; + std::string function_name; + inline const char* what() noexcept + { + return std::exception::what(); + } + }; + + struct async_callback_exception_handler + { + using handler_type = std::function; + static handler_type custom_handler; + + inline static void handle_excepiton(const async_dispatch_exception&e) + { + if (custom_handler) + { + custom_handler(e); + } + } + + inline static void set_handler(const handler_type& handler) + { + custom_handler = handler; + } + }; + + AUTOCXXPY_SELECT_ANY async_callback_exception_handler::handler_type async_callback_exception_handler::custom_handler; #endif namespace arg_helper { + ////////////////////////////////////////////////////////////////////////// + // stores + ////////////////////////////////////////////////////////////////////////// + // # todo: char8, char16, char32, wchar_t, etc... // # todo: shall i copy only const type, treating non-const type as output pointer? - inline auto save(const char *val) + inline std::optional save(const char* val) { // match const char * + if (nullptr == val) AUTOCXXPY_UNLIKELY + return std::nullopt; // maybe empty string is also a choice? return std::string(val); } - inline auto save(char *val) + inline std::optional save(char* val) { // match char * + if (nullptr == val) AUTOCXXPY_UNLIKELY + return std::nullopt; // maybe empty string is also a choice? return std::string(val); } template @@ -101,29 +143,41 @@ namespace autocxxpy } template - inline T &save(T *val) + inline std::optional save(T * val) { // match pointer + if (nullptr == val) AUTOCXXPY_UNLIKELY + { + return std::nullopt; + } return *val; } template - inline T &save(const T *val) + inline std::optional& save(const T * val) { // match const pointer + if (nullptr == val) AUTOCXXPY_UNLIKELY + { + return std::nullopt; + } return const_cast(*val); } template - inline T &save(const T &val) + inline T& save(const T & val) { // match everything else : just use original type return const_cast(val); } + ////////////////////////////////////////////////////////////////////////// + // loads + ////////////////////////////////////////////////////////////////////////// + template struct loader { // match default(everyting besides pointer) template - inline to_type operator ()(src_type &val) + inline to_type operator ()(src_type& val) { return val; } @@ -132,57 +186,57 @@ namespace autocxxpy template struct loader> { // match const char [] - using to_type = const char *; - inline to_type operator ()(const std::string &val) + using to_type = const char*; + inline to_type operator ()(const std::string& val) { - return const_cast(val.data()); + return const_cast(val.data()); } }; template struct loader> { // match char [] - using to_type = char *; - inline to_type operator ()(const std::string &val) + using to_type = char*; + inline to_type operator ()(const std::string& val) { - return const_cast(val.data()); + return const_cast(val.data()); } }; template <> - struct loader + struct loader { // match const char * - using to_type = const char *; - inline to_type operator ()(const std::string &val) + using to_type = const char*; + inline to_type operator ()(const std::optional& val) { - return const_cast(val.data()); + if (val) AUTOCXXPY_LIKELY + return const_cast(val->data()); + return nullptr; } }; template <> - struct loader + struct loader { // match char * - using to_type = char *; - inline to_type operator ()(const std::string &val) + using to_type = char*; + inline to_type operator ()(const std::optional& val) { - return const_cast(val.data()); + if (val) AUTOCXXPY_LIKELY + return const_cast(val->data()); + return nullptr; } }; template - struct loader + struct loader { // match pointer template - inline to_type *operator ()(src_type &val) + inline to_type* operator ()(const std::optional& val) { // val to poiner - return const_cast(&val); + if (val) AUTOCXXPY_LIKELY + return const_cast(&(*val)); + return nullptr; } - - //template - //inline to_type *operator ()(src_type *val) - //{ // pointer to pointer - // return val; - //} }; }; @@ -195,7 +249,7 @@ namespace autocxxpy using class_type = class_of_member_method_t; public: template - inline static ret_type call(class_type *instance, const char *py_func_name, arg_types ... args) + inline static ret_type call(class_type* instance, const char* py_func_name, arg_types ... args) { if constexpr (callback_type_of_v == callback_type::Direct) { @@ -211,47 +265,70 @@ namespace autocxxpy } template - inline static void async(class_type *instance, const char *py_func_name, arg_types ... args) + inline static void async(class_type* instance, const char* py_func_name, arg_types ... args) { return async_impl(instance, py_func_name, std::index_sequence_for{}, args ...); } template - inline static ret_type sync(class_type *instance, const char * py_func_name, arg_types ... args) + inline static ret_type sync(class_type * instance, const char* py_func_name, arg_types ... args) { // if this code is under test environment, we don't need pybind11 // since header of pybind11 use #pragma once, no macros is defined, we use a public macro to check if pybind11 is included or not #ifdef PYBIND11_OVERLOAD_NAME pybind11::gil_scoped_acquire gil; - pybind11::function overload = pybind11::get_overload(static_cast(instance), py_func_name); - if (overload) { - auto o = overload(args ...); - if (pybind11::detail::cast_is_temporary_value_reference::value) { - auto & caster = pybind11_static_caster::caster; - return pybind11::detail::cast_ref(std::move(o), caster); + pybind11::function overload = pybind11::get_overload(static_cast(instance), py_func_name); + if (overload) AUTOCXXPY_LIKELY{ + try + { + auto result = overload(args ...); + if (pybind11::detail::cast_is_temporary_value_reference::value) + { + auto& caster = pybind11_static_caster::caster; + return pybind11::detail::cast_ref(std::move(result), caster); + } + else + { + return pybind11::detail::cast_safe(std::move(result)); + } + } + catch (const pybind11::error_already_set & e) + { + // todo: option to not to throw when sync is called directly + throw async_dispatch_exception(e.what(), pybind11::cast(instance), py_func_name); } - else return pybind11::detail::cast_safe(std::move(o)); } #endif return (instance->*method)(args ...); } private: template - inline static void async_impl(class_type *instance, const char *py_func_name, std::index_sequence, arg_types ... args) + inline static void async_impl(class_type * instance, const char* py_func_name, std::index_sequence, arg_types ... args) { // wrap for ctp like function calls: // all the pointer might be unavailable after this call, so copy its value into a tuple auto arg_tuple = std::make_tuple(arg_helper::save(args) ...); auto task = [instance, py_func_name, arg_tuple = std::move(arg_tuple)]() { - // resolve all value: - // if it was originally a pointer, then use pointer type. - // if it was originally a value, just keep a reference to that value. - sync( - instance, py_func_name, - arg_helper::loader, brigand::integral_constant > >{} - (std::get(arg_tuple)) ... - ); +#ifdef AUTOCXXPY_INCLUDED_PYBIND11 + try + { +#endif + // resolve all value: + // if it was originally a pointer, then use pointer type. + // if it was originally a value, just keep a reference to that value. + sync( + instance, py_func_name, + arg_helper::loader, brigand::integral_constant > >{} + (std::get(arg_tuple)) ... + ); +#ifdef AUTOCXXPY_INCLUDED_PYBIND11 + } + catch (const async_dispatch_exception &e) + { + async_callback_exception_handler::handle_excepiton(e); + } +#endif }; dispatcher::instance().add(std::move(task)); } diff --git a/vnpy/api/xtp/vnxtp/include/autocxxpy/config/config.hpp b/vnpy/api/xtp/vnxtp/include/autocxxpy/config/config.hpp index 79301824..f0e28a91 100644 --- a/vnpy/api/xtp/vnxtp/include/autocxxpy/config/config.hpp +++ b/vnpy/api/xtp/vnxtp/include/autocxxpy/config/config.hpp @@ -8,12 +8,30 @@ #ifndef AUTOCXXPY_UNUSED -#define AUTOCXXPY_UNUSED(x) (void)(x) +# define AUTOCXXPY_UNUSED(x) (void)(x) #endif #ifdef _MSC_VER -#define AUTOCXXPY_SELECT_ANY __declspec(selectany) +# define AUTOCXXPY_SELECT_ANY __declspec(selectany) #else -#define AUTOCXXPY_SELECT_ANY __attribute__ ((selectany)) -#endif \ No newline at end of file +# define AUTOCXXPY_SELECT_ANY __attribute__ ((selectany)) +#endif + + +#ifdef __has_cpp_attribute +# if __has_cpp_attribute(likely) +# define AUTOCXXPY_LIKELY [[likely]] +# endif +# if __has_cpp_attribute(unlikely) +# define AUTOCXXPY_UNLIKELY [[unlikely]] +# endif +#endif + +#ifndef AUTOCXXPY_LIKELY +#define AUTOCXXPY_LIKELY +#endif + +#ifndef AUTOCXXPY_UNLIKELY +#define AUTOCXXPY_UNLIKELY +#endif diff --git a/vnpy/api/xtp/vnxtp/vnxtp.vcxproj b/vnpy/api/xtp/vnxtp/vnxtp.vcxproj index 9811582a..e711c1af 100644 --- a/vnpy/api/xtp/vnxtp/vnxtp.vcxproj +++ b/vnpy/api/xtp/vnxtp/vnxtp.vcxproj @@ -119,6 +119,7 @@ stdcpp17 4819 /bigobj %(AdditionalOptions) + true Console @@ -138,6 +139,7 @@ stdcpp17 4819 /bigobj %(AdditionalOptions) + true Console @@ -160,6 +162,7 @@ stdcpp17 4819 /bigobj %(AdditionalOptions) + true Console @@ -184,6 +187,7 @@ stdcpp17 4819 /bigobj %(AdditionalOptions) + true Console diff --git a/vnpy/api/xtp/vnxtp_XTP.pyi b/vnpy/api/xtp/vnxtp_XTP.pyi index 63aa7d94..f5ac8cb9 100644 --- a/vnpy/api/xtp/vnxtp_XTP.pyi +++ b/vnpy/api/xtp/vnxtp_XTP.pyi @@ -7,5 +7,20 @@ if typing.TYPE_CHECKING: from .vnxtp import * +def set_async_callback_exception_handler(handler: Callable[[Exception, object, str], bool]): + """ + set a customize exception handler for async callback in this module(pyd) + \a handler should return True if it handles that exception, + If the return value of \a handler is not True, exception will be re-thrown. + """ + ... + + +class AsyncDispatchException: + what: str + instance: object + function_name: str + + from . import vnxtp_XTP_API as API diff --git a/vnpy/api/xtp/vnxtp_XTP_API.pyi b/vnpy/api/xtp/vnxtp_XTP_API.pyi index 11706ffc..e3f98ddd 100644 --- a/vnpy/api/xtp/vnxtp_XTP_API.pyi +++ b/vnpy/api/xtp/vnxtp_XTP_API.pyi @@ -7,6 +7,21 @@ if typing.TYPE_CHECKING: from .vnxtp import * +def set_async_callback_exception_handler(handler: Callable[[Exception, object, str], bool]): + """ + set a customize exception handler for async callback in this module(pyd) + \a handler should return True if it handles that exception, + If the return value of \a handler is not True, exception will be re-thrown. + """ + ... + + +class AsyncDispatchException: + what: str + instance: object + function_name: str + + class TraderSpi(): diff --git a/vnpy/app/cta_strategy/backtesting.py b/vnpy/app/cta_strategy/backtesting.py index fdeff439..e4aa25c7 100644 --- a/vnpy/app/cta_strategy/backtesting.py +++ b/vnpy/app/cta_strategy/backtesting.py @@ -226,6 +226,8 @@ class BacktestingEngine: # Use the first [days] of history data for initializing strategy day_count = 0 + ix = 0 + for ix, data in enumerate(self.history_data): if self.datetime and data.datetime.day != self.datetime.day: day_count += 1 diff --git a/vnpy/app/cta_strategy/engine.py b/vnpy/app/cta_strategy/engine.py index ebe8ad73..901a7207 100644 --- a/vnpy/app/cta_strategy/engine.py +++ b/vnpy/app/cta_strategy/engine.py @@ -97,6 +97,8 @@ class CtaEngine(BaseEngine): self.rq_client = None self.rq_symbols = set() + self.vt_tradeids = set() # for filtering duplicate trade + self.offset_converter = OffsetConverter(self.main_engine) def init_engine(self): @@ -190,6 +192,11 @@ class CtaEngine(BaseEngine): """""" trade = event.data + # Filter duplicate trade push + if trade.vt_tradeid in self.vt_tradeids: + return + self.vt_tradeids.add(trade.vt_tradeid) + self.offset_converter.update_trade(trade) strategy = self.orderid_strategy_map.get(trade.vt_orderid, None) diff --git a/vnpy/gateway/tiger/tiger_gateway.py b/vnpy/gateway/tiger/tiger_gateway.py index 997d3f02..1b17ca2f 100644 --- a/vnpy/gateway/tiger/tiger_gateway.py +++ b/vnpy/gateway/tiger/tiger_gateway.py @@ -299,8 +299,6 @@ class TigerGateway(BaseGateway): def on_order_change(self, tiger_account: str, data: list): """""" data = dict(data) - print("委托推送", data["origin_symbol"], - data["order_id"], data["filled"], data["status"]) symbol, exchange = convert_symbol_tiger2vt(data["origin_symbol"]) status = PUSH_STATUS_TIGER2VT[data["status"]] @@ -368,8 +366,6 @@ class TigerGateway(BaseGateway): self.ID_VT2TIGER[local_id] = str(order.order_id) self.trade_client.place_order(order) - print("发单:", order.contract.symbol, - order.order_id, order.quantity, order.status) except: # noqa traceback.print_exc() @@ -551,8 +547,6 @@ class TigerGateway(BaseGateway): self.on_order(order) self.ID_VT2TIGER = {v: k for k, v in self.ID_TIGER2VT.items()} - print("原始委托字典", self.ID_TIGER2VT) - print("原始反向字典", self.ID_VT2TIGER) def process_deal(self, data): """ diff --git a/vnpy/gateway/xtp/xtp_gateway.py b/vnpy/gateway/xtp/xtp_gateway.py index 91b72d3f..726bf41e 100644 --- a/vnpy/gateway/xtp/xtp_gateway.py +++ b/vnpy/gateway/xtp/xtp_gateway.py @@ -1,193 +1,741 @@ from typing import Any, Sequence +from datetime import datetime +from threading import Thread -from vnpy.api.xtp.vnxtp import (OrderBookStruct, XTP, XTPMarketDataStruct, XTPQuoteStaticInfo, - XTPRspInfoStruct, XTPSpecificTickerStruct, XTPTickByTickStruct, - XTPTickerPriceInfo, XTP_EXCHANGE_TYPE, XTP_LOG_LEVEL, - XTP_PROTOCOL_TYPE) - +from vnpy.api.xtp.vnxtp import ( + XTP, + set_async_callback_exception_handler, + AsyncDispatchException, + OrderBookStruct, + XTPMarketDataStruct, + XTPQuoteStaticInfo, + XTPRspInfoStruct, + XTPSpecificTickerStruct, + XTPTickByTickStruct, + XTPTickerPriceInfo, + XTPOrderInsertInfo, + XTPOrderInfo, + XTPTradeReport, + XTPOrderCancelInfo, + XTPQueryStkPositionRsp, + XTPQueryAssetRsp, + XTPStructuredFundInfo, + XTPFundTransferNotice, + XTPQueryETFBaseRsp, + XTPQueryETFComponentRsp, + XTPQueryIPOTickerRsp, + XTPQueryIPOQuotaRsp, + XTPQueryOptionAuctionInfoRsp, + XTP_EXCHANGE_TYPE, + XTP_LOG_LEVEL, + XTP_PROTOCOL_TYPE, + XTP_TE_RESUME_TYPE, + XTP_SIDE_BUY, + XTP_SIDE_SELL, + XTP_BUSINESS_TYPE, + XTP_TICKER_TYPE, + XTP_MARKET_TYPE, + XTP_PRICE_TYPE, + XTP_ORDER_STATUS_TYPE +) from vnpy.event import EventEngine -from vnpy.trader.constant import Exchange +from vnpy.trader.event import EVENT_TIMER +from vnpy.trader.constant import Exchange, Product, Direction, OrderType, Status from vnpy.trader.gateway import BaseGateway -from vnpy.trader.object import CancelRequest, OrderRequest, SubscribeRequest +from vnpy.trader.object import (CancelRequest, OrderRequest, SubscribeRequest, + TickData, ContractData, OrderData, TradeData, + PositionData, AccountData) +from vnpy.trader.utility import get_folder_path + API = XTP.API + + EXCHANGE_XTP2VT = { XTP_EXCHANGE_TYPE.XTP_EXCHANGE_SH: Exchange.SSE, XTP_EXCHANGE_TYPE.XTP_EXCHANGE_SZ: Exchange.SZSE, } EXCHANGE_VT2XTP = {v: k for k, v in EXCHANGE_XTP2VT.items()} +MARKET_XTP2VT = { + XTP_MARKET_TYPE.XTP_MKT_SH_A: Exchange.SSE, + XTP_MARKET_TYPE.XTP_MKT_SZ_A: Exchange.SZSE +} +MARKET_VT2XTP = {v: k for k, v in MARKET_XTP2VT.items()} -class QuoteSpi(API.QuoteSpi): +PRODUCT_XTP2VT = { + XTP_TICKER_TYPE.XTP_TICKER_TYPE_STOCK: Product.EQUITY, + XTP_TICKER_TYPE.XTP_TICKER_TYPE_INDEX: Product.INDEX, + XTP_TICKER_TYPE.XTP_TICKER_TYPE_FUND: Product.FUND, + XTP_TICKER_TYPE.XTP_TICKER_TYPE_BOND: Product.BOND, + XTP_TICKER_TYPE.XTP_TICKER_TYPE_OPTION: Product.OPTION +} - def OnDisconnected(self, reason: int) -> Any: - print("OnDisconnected") - return super().OnDisconnected(reason) +DIRECTION_VT2XTP = { + Direction.LONG: XTP_SIDE_BUY, + Direction.SHORT: XTP_SIDE_SELL +} +DIRECTION_XTP2VT = {v: k for k, v in DIRECTION_VT2XTP.items()} - def OnError(self, error_info: XTPRspInfoStruct) -> Any: - return super().OnError(error_info) +ORDERTYPE_VT2XTP = { + OrderType.LIMIT: XTP_PRICE_TYPE.XTP_PRICE_LIMIT, + OrderType.MARKET: XTP_PRICE_TYPE.XTP_PRICE_BEST5_OR_CANCEL +} +ORDERTYPE_XTP2VT = {v: k for k, v in ORDERTYPE_VT2XTP.items()} - def OnSubMarketData(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct, - is_last: bool) -> Any: - print("OnSubMarketData") - return super().OnSubMarketData(ticker, error_info, is_last) +STATUS_XTP2VT = { + XTP_ORDER_STATUS_TYPE.XTP_ORDER_STATUS_INIT: Status.SUBMITTING, + XTP_ORDER_STATUS_TYPE.XTP_ORDER_STATUS_ALLTRADED: Status.ALLTRADED, + XTP_ORDER_STATUS_TYPE.XTP_ORDER_STATUS_PARTTRADEDQUEUEING: Status.PARTTRADED, + XTP_ORDER_STATUS_TYPE.XTP_ORDER_STATUS_PARTTRADEDNOTQUEUEING: Status.CANCELLED, + XTP_ORDER_STATUS_TYPE.XTP_ORDER_STATUS_NOTRADEQUEUEING: Status.NOTTRADED, + XTP_ORDER_STATUS_TYPE.XTP_ORDER_STATUS_CANCELED: Status.CANCELLED, + XTP_ORDER_STATUS_TYPE.XTP_ORDER_STATUS_REJECTED: Status.REJECTED, +} - def OnUnSubMarketData(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct, - is_last: bool) -> Any: - return super().OnUnSubMarketData(ticker, error_info, is_last) - def OnDepthMarketData(self, market_data: XTPMarketDataStruct, bid1_qty: Sequence[int], - bid1_count: int, max_bid1_count: int, ask1_qty: Sequence[int], - ask1_count: int, max_ask1_count: int) -> Any: - print("OnDepthMarketData") - return super().OnDepthMarketData(market_data, bid1_qty, bid1_count, max_bid1_count, - ask1_qty, ask1_count, max_ask1_count) - - def OnSubOrderBook(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct, - is_last: bool) -> Any: - return super().OnSubOrderBook(ticker, error_info, is_last) - - def OnUnSubOrderBook(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct, - is_last: bool) -> Any: - return super().OnUnSubOrderBook(ticker, error_info, is_last) - - def OnOrderBook(self, order_book: OrderBookStruct) -> Any: - return super().OnOrderBook(order_book) - - def OnSubTickByTick(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct, - is_last: bool) -> Any: - return super().OnSubTickByTick(ticker, error_info, is_last) - - def OnUnSubTickByTick(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct, - is_last: bool) -> Any: - return super().OnUnSubTickByTick(ticker, error_info, is_last) - - def OnTickByTick(self, tbt_data: XTPTickByTickStruct) -> Any: - return super().OnTickByTick(tbt_data) - - def OnSubscribeAllMarketData(self, exchange_id: XTP_EXCHANGE_TYPE, - error_info: XTPRspInfoStruct) -> Any: - return super().OnSubscribeAllMarketData(exchange_id, error_info) - - def OnUnSubscribeAllMarketData(self, exchange_id: XTP_EXCHANGE_TYPE, - error_info: XTPRspInfoStruct) -> Any: - return super().OnUnSubscribeAllMarketData(exchange_id, error_info) - - def OnSubscribeAllOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE, - error_info: XTPRspInfoStruct) -> Any: - return super().OnSubscribeAllOrderBook(exchange_id, error_info) - - def OnUnSubscribeAllOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE, - error_info: XTPRspInfoStruct) -> Any: - return super().OnUnSubscribeAllOrderBook(exchange_id, error_info) - - def OnSubscribeAllTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE, - error_info: XTPRspInfoStruct) -> Any: - return super().OnSubscribeAllTickByTick(exchange_id, error_info) - - def OnUnSubscribeAllTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE, - error_info: XTPRspInfoStruct) -> Any: - return super().OnUnSubscribeAllTickByTick(exchange_id, error_info) - - def OnQueryAllTickers(self, ticker_info: XTPQuoteStaticInfo, error_info: XTPRspInfoStruct, - is_last: bool) -> Any: - return super().OnQueryAllTickers(ticker_info, error_info, is_last) - - def OnQueryTickersPriceInfo(self, ticker_info: XTPTickerPriceInfo, error_info: XTPRspInfoStruct, - is_last: bool) -> Any: - return super().OnQueryTickersPriceInfo(ticker_info, error_info, is_last) - - def OnSubscribeAllOptionMarketData(self, exchange_id: XTP_EXCHANGE_TYPE, - error_info: XTPRspInfoStruct) -> Any: - return super().OnSubscribeAllOptionMarketData(exchange_id, error_info) - - def OnUnSubscribeAllOptionMarketData(self, exchange_id: XTP_EXCHANGE_TYPE, - error_info: XTPRspInfoStruct) -> Any: - return super().OnUnSubscribeAllOptionMarketData(exchange_id, error_info) - - def OnSubscribeAllOptionOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE, - error_info: XTPRspInfoStruct) -> Any: - return super().OnSubscribeAllOptionOrderBook(exchange_id, error_info) - - def OnUnSubscribeAllOptionOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE, - error_info: XTPRspInfoStruct) -> Any: - return super().OnUnSubscribeAllOptionOrderBook(exchange_id, error_info) - - def OnSubscribeAllOptionTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE, - error_info: XTPRspInfoStruct) -> Any: - return super().OnSubscribeAllOptionTickByTick(exchange_id, error_info) - - def OnUnSubscribeAllOptionTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE, - error_info: XTPRspInfoStruct) -> Any: - return super().OnUnSubscribeAllOptionTickByTick(exchange_id, error_info) +symbol_name_map = {} class XtpGateway(BaseGateway): - def __init__(self, event_engine: "EventEngine"): - self.client_id: int = 1 - self.quote_api = API.QuoteApi.CreateQuoteApi( - self.client_id, # todo: change client id - "log", # todo: use vnpy temp path - XTP_LOG_LEVEL.XTP_LOG_LEVEL_TRACE - ) - self.quote_spi = QuoteSpi() - super().__init__(event_engine, "XTP") - default_setting = { - "client_id": "0", - "quote_server_ip": "", - "quote_server_port": "", - "quote_server_protocol": ["TCP", "UDP"], - "quote_userid": "", - "quote_password": "", + "账号": "", + "密码": "", + "客户号": 1, + "行情地址": "", + "行情端口": 0, + "交易地址": "", + "交易端口": 0, + "行情协议": ["TCP", "UDP"], + "授权码": "" } + def __init__(self, event_engine: EventEngine): + """""" + super().__init__(event_engine, "XTP") + + self.quote_api = XtpQuoteApi(self) + self.trader_api = XtpTraderApi(self) + + set_async_callback_exception_handler( + self._async_callback_exception_handler) + def connect(self, setting: dict): - self.client_id = int(setting['client_id']) - quote_server_ip = setting['quote_server_ip'] - quote_server_port = int(setting['quote_server_port']) - quote_server_protocol = setting['quote_server_protocol'] - quote_userid = setting['quote_userid'] - quote_password = setting['quote_password'] + """""" + userid = setting['账号'] + password = setting['密码'] + client_id = int(setting['客户号']) + quote_ip = setting['行情地址'] + quote_port = int(setting['行情端口']) + trader_ip = setting['交易地址'] + trader_port = int(setting['交易端口']) + quote_protocol = setting["行情协议"] + software_key = setting["授权码"] - quote_protocol = XTP_PROTOCOL_TYPE.XTP_PROTOCOL_TCP if quote_server_protocol == 'TCP' else 'UDP' - - self.quote_api.RegisterSpi(self.quote_spi) - # self.quote_api.SetHeartBeatInterval(60) - - ret = self.quote_api.Login( - quote_server_ip, - quote_server_port, - quote_userid, - quote_password, - quote_protocol - ) - if ret == 0: - # login succeed - self.write_log("Login succeed.") - pass + self.quote_api.connect(userid, password, client_id, + quote_ip, quote_port, quote_protocol) + self.trader_api.connect(userid, password, client_id, + trader_ip, trader_port, software_key) + self.init_query() def close(self): - pass + """""" + self.quote_api.close() + self.trader_api.close() def subscribe(self, req: SubscribeRequest): - ret = self.quote_api.SubscribeMarketData( - [req.symbol], - EXCHANGE_VT2XTP[req.exchange], - ) - if ret != 0: - print("订阅行情失败") # improve: return True or False, or raise with reason - pass + """""" + self.quote_api.subscrbie(req) def send_order(self, req: OrderRequest) -> str: - pass + """""" + return self.trader_api.send_order(req) def cancel_order(self, req: CancelRequest): - pass + """""" + self.trader_api.cancel_order(req) def query_account(self): - pass + """""" + self.trader_api.query_account() def query_position(self): + """""" + self.trader_api.query_position() + + def process_timer_event(self, event): + """""" + self.count += 1 + if self.count < 2: + return + self.count = 0 + + func = self.query_functions.pop(0) + func() + self.query_functions.append(func) + + def init_query(self): + """""" + self.count = 0 + self.query_functions = [self.query_account, self.query_position] + self.event_engine.register(EVENT_TIMER, self.process_timer_event) + + def _async_callback_exception_handler(self, e: AsyncDispatchException): + error_str = f"发生内部错误:\n" f"位置:{e.instance}.{e.function_name}" f"详细信息:{e.what}" + print(error_str) + + +class XtpQuoteApi(API.QuoteSpi): + + def __init__(self, gateway: BaseGateway): + """""" + super().__init__() + + self.gateway = gateway + self.gateway_name = gateway.gateway_name + + self.userid = "" + self.password = "" + self.client_id: int = 0 + self.server_ip = "" + self.server_port: int = 0 + self.server_protocol = "" + + self.api = None + + def connect( + self, + userid: str, + password: str, + client_id: int, + server_ip: str, + server_port: int, + quote_protocol: str + ): + """""" + if self.api: + return + + self.userid = userid + self.password = password + self.client_id = client_id + self.server_ip = server_ip + self.server_port = server_port + + if quote_protocol == "CTP": + self.quote_protocol = XTP_PROTOCOL_TYPE.XTP_PROTOCOL_TCP + else: + self.quote_protocol = XTP_PROTOCOL_TYPE.XTP_PROTOCOL_UDP + + # Create API object + path = str(get_folder_path(self.gateway_name.lower())) + + self.api = API.QuoteApi.CreateQuoteApi( + self.client_id, + path, + XTP_LOG_LEVEL.XTP_LOG_LEVEL_TRACE + ) + + self.api.RegisterSpi(self) + self.gateway.write_log("行情接口初始化成功") + + # Login to server + Thread(target=self.login).start() + + def login(self): + """""" + ret = self.api.Login( + self.server_ip, + self.server_port, + self.userid, + self.password, + self.quote_protocol + ) + + if not ret: + msg = "行情服务器登录成功" + self.query_contract() + else: + msg = f"行情服务器登录失败,原因:{ret}" + self.gateway.write_log(msg) + + def close(self): + """""" + if self.api: + self.api.RegisterSpi(None) + self.api.Release() + + def subscrbie(self, req: SubscribeRequest): + """""" + xtp_exchange = EXCHANGE_VT2XTP.get(req.exchange, "") + self.api.SubscribeMarketData([req.symbol], xtp_exchange) + + def query_contract(self): + """""" + for exchange_id in EXCHANGE_XTP2VT.keys(): + self.api.QueryAllTickers(exchange_id) + + def check_error(self, func_name: str, error_info: XTPRspInfoStruct): + """""" + if error_info and error_info.error_id: + msg = f"{func_name}发生错误, 代码:{error_info.error_id},信息:{error_info.error_msg}" + self.gateway.write_log(msg) + return True + else: + return False + + def OnDisconnected(self, reason: int) -> Any: + """""" + self.gateway.write_log("行情服务器连接断开") + + def OnError(self, error_info: XTPRspInfoStruct) -> Any: + """""" + self.check_error("行情接口", error_info) + + def OnSubMarketData(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct, + is_last: bool) -> Any: + """""" + self.check_error("订阅行情", error_info) + + def OnUnSubMarketData(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct, + is_last: bool) -> Any: + """""" + pass + + def OnDepthMarketData(self, market_data: XTPMarketDataStruct, bid1_qty: Sequence[int], + bid1_count: int, max_bid1_count: int, ask1_qty: Sequence[int], + ask1_count: int, max_ask1_count: int) -> Any: + """""" + timestamp = str(market_data.data_time) + dt = datetime.strptime(timestamp, "%Y%m%d%H%M%S%f") + + tick = TickData( + symbol=market_data.ticker, + exchange=EXCHANGE_XTP2VT[market_data.exchange_id], + datetime=dt, + volume=market_data.qty, + last_price=market_data.last_price, + limit_up=market_data.upper_limit_price, + limit_down=market_data.lower_limit_price, + open_price=market_data.open_price, + high_price=market_data.high_price, + low_price=market_data.low_price, + pre_close=market_data.pre_close_price, + bid_price_1=market_data.bid[0], + bid_price_2=market_data.bid[1], + bid_price_3=market_data.bid[2], + bid_price_4=market_data.bid[3], + bid_price_5=market_data.bid[4], + ask_price_1=market_data.ask[0], + ask_price_2=market_data.ask[1], + ask_price_3=market_data.ask[2], + ask_price_4=market_data.ask[3], + ask_price_5=market_data.ask[4], + bid_volume_1=market_data.bid_qty[0], + bid_volume_2=market_data.bid_qty[1], + bid_volume_3=market_data.bid_qty[2], + bid_volume_4=market_data.bid_qty[3], + bid_volume_5=market_data.bid_qty[4], + ask_volume_1=market_data.ask_qty[0], + ask_volume_2=market_data.ask_qty[1], + ask_volume_3=market_data.ask_qty[2], + ask_volume_4=market_data.ask_qty[3], + ask_volume_5=market_data.ask_qty[4], + gateway_name=self.gateway_name + ) + tick.name = symbol_name_map.get(tick.vt_symbol, tick.symbol) + + self.gateway.on_tick(tick) + + def OnSubOrderBook(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct, + is_last: bool) -> Any: + """""" + pass + + def OnUnSubOrderBook(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct, + is_last: bool) -> Any: + """""" + pass + + def OnOrderBook(self, order_book: OrderBookStruct) -> Any: + """""" + pass + + def OnSubTickByTick(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct, + is_last: bool) -> Any: + """""" + pass + + def OnUnSubTickByTick(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct, + is_last: bool) -> Any: + """""" + pass + + def OnTickByTick(self, tbt_data: XTPTickByTickStruct) -> Any: + """""" + pass + + def OnSubscribeAllMarketData(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: + """""" + pass + + def OnUnSubscribeAllMarketData(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: + """""" + pass + + def OnSubscribeAllOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: + """""" + pass + + def OnUnSubscribeAllOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: + """""" + pass + + def OnSubscribeAllTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: + """""" + pass + + def OnUnSubscribeAllTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: + """""" + pass + + def OnQueryAllTickers(self, ticker_info: XTPQuoteStaticInfo, error_info: XTPRspInfoStruct, + is_last: bool) -> Any: + """""" + if self.check_error("查询合约", error_info): + return + + contract = ContractData( + symbol=ticker_info.ticker, + exchange=EXCHANGE_XTP2VT[ticker_info.exchange_id], + name=ticker_info.ticker_name, + product=PRODUCT_XTP2VT[ticker_info.ticker_type], + size=1, + pricetick=ticker_info.price_tick, + min_volume=ticker_info.buy_qty_unit, + gateway_name=self.gateway_name + ) + self.gateway.on_contract(contract) + + symbol_name_map[contract.vt_symbol] = contract.name + + if is_last: + self.gateway.write_log(f"{contract.exchange.value}合约信息查询成功") + + def OnQueryTickersPriceInfo(self, ticker_info: XTPTickerPriceInfo, error_info: XTPRspInfoStruct, + is_last: bool) -> Any: + """""" + pass + + def OnSubscribeAllOptionMarketData(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: + """""" + pass + + def OnUnSubscribeAllOptionMarketData(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: + """""" + pass + + def OnSubscribeAllOptionOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: + """""" + pass + + def OnUnSubscribeAllOptionOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: + """""" + pass + + def OnSubscribeAllOptionTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: + """""" + pass + + def OnUnSubscribeAllOptionTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: + """""" + pass + + +class XtpTraderApi(API.TraderSpi): + + def __init__(self, gateway: BaseGateway): + """""" + super().__init__() + + self.gateway = gateway + self.gateway_name = gateway.gateway_name + + self.userid = "" + self.password = "" + self.client_id = "" + self.server_ip = "" + self.server_port = "" + self.software_key = "" + + self.api = None + self.session_id = 0 + self.reqid = 0 + + def connect( + self, + userid: str, + password: str, + client_id: int, + server_ip: str, + server_port: int, + software_key: str + ): + """""" + if self.api: + return + + self.userid = userid + self.password = password + self.client_id = client_id + self.server_ip = server_ip + self.server_port = server_port + self.software_key = software_key + + # Create API object + path = str(get_folder_path(self.gateway_name.lower())) + + self.api = API.TraderApi.CreateTraderApi( + self.client_id, + path, + XTP_LOG_LEVEL.XTP_LOG_LEVEL_TRACE + ) + + self.api.RegisterSpi(self) + self.api.SetSoftwareKey(self.software_key) + self.api.SubscribePublicTopic(XTP_TE_RESUME_TYPE.XTP_TERT_RESTART) + + self.gateway.write_log("交易接口初始化成功") + + # Login to server + Thread(target=self.login).start() + + def login(self): + """""" + self.session_id = self.api.Login( + self.server_ip, + self.server_port, + self.userid, + self.password, + XTP_PROTOCOL_TYPE.XTP_PROTOCOL_TCP + ) + + if self.session_id: + msg = "交易服务器登录成功" + else: + error = self.api.GetApiLastError() + msg = f"交易服务器登录失败,原因:{error.error_msg}" + + self.gateway.write_log(msg) + + def close(self): + """""" + if self.api: + self.api.RegisterSpi(None) + self.api.Release() + + def send_order(self, req: OrderRequest) -> str: + """""" + if req.exchange not in MARKET_VT2XTP: + self.gateway.write_log(f"委托失败,不支持的交易所{req.exchange.value}") + return "" + + if req.type not in ORDERTYPE_VT2XTP: + self.gateway.write_log(f"委托失败,不支持的委托类型{req.type.value}") + return "" + + xtp_req = XTPOrderInsertInfo() + xtp_req.ticker = req.symbol + xtp_req.market = MARKET_VT2XTP[req.exchange] + xtp_req.price = req.price + xtp_req.quantity = int(req.volume) + xtp_req.side = DIRECTION_VT2XTP[req.direction] + xtp_req.price_type = ORDERTYPE_VT2XTP[req.type] + xtp_req.business_type = XTP_BUSINESS_TYPE.XTP_BUSINESS_TYPE_CASH + + orderid = self.api.InsertOrder(xtp_req, self.session_id) + + order = req.create_order_data(str(orderid), self.gateway_name) + self.gateway.on_order(order) + + return order.vt_orderid + + def cancel_order(self, req: CancelRequest): + """""" + self.api.CancelOrder(int(req.orderid), self.session_id) + + def query_account(self): + """""" + if not self.api: + return + + self.reqid += 1 + self.api.QueryAsset(self.session_id, self.reqid) + + def query_position(self): + """""" + if not self.api: + return + + self.reqid += 1 + self.api.QueryPosition("", self.session_id, self.reqid) + + def check_error(self, func_name: str, error_info: XTPRspInfoStruct): + """""" + if error_info and error_info.error_id: + msg = f"{func_name}发生错误, 代码:{error_info.error_id},信息:{error_info.error_msg}" + self.gateway.write_log(msg) + return True + else: + return False + + def OnDisconnected(self, session_id: int, reason: int) -> Any: + """""" + self.gateway.write_log("交易服务器连接断开") + + def OnError(self, error_info: XTPRspInfoStruct) -> Any: + """""" + self.check_error("交易接口", error_info) + + def OnOrderEvent(self, order_info: XTPOrderInfo, error_info: XTPRspInfoStruct, + session_id: int) -> Any: + """""" + self.check_error("委托下单", error_info) + + order = OrderData( + symbol=order_info.ticker, + exchange=MARKET_XTP2VT[order_info.market], + orderid=str(order_info.order_xtp_id), + type=ORDERTYPE_XTP2VT[order_info.price_type], + direction=DIRECTION_XTP2VT[order_info.side], + price=order_info.price, + volume=order_info.quantity, + traded=order_info.qty_traded, + status=STATUS_XTP2VT[order_info.order_status], + time=order_info.insert_time, + gateway_name=self.gateway_name + ) + + self.gateway.on_order(order) + + def OnTradeEvent(self, trade_info: XTPTradeReport, session_id: int) -> Any: + """""" + trade = TradeData( + symbol=trade_info.ticker, + exchange=MARKET_XTP2VT[trade_info.market], + orderid=str(trade_info.order_xtp_id), + tradeid=str(trade_info.exec_id), + direction=DIRECTION_XTP2VT[trade_info.side], + price=trade_info.price, + volume=trade_info.quantity, + time=trade_info.trade_time, + gateway_name=self.gateway_name + ) + + self.gateway.on_trade(trade) + + def OnCancelOrderError(self, cancel_info: XTPOrderCancelInfo, error_info: XTPRspInfoStruct, + session_id: int) -> Any: + """""" + self.check_error("委托撤单", error_info) + + def OnQueryOrder(self, order_info: XTPOrderInfo, error_info: XTPRspInfoStruct, + is_last: bool, session_id: int) -> Any: + """""" + if self.check_error("查询委托", error_info): + return + + self.updateOrder(order_info) + + if is_last: + self.gateway.write_log("查询委托信息成功") + + def OnQueryTrade(self, trade_info: XTPTradeReport, error_info: XTPRspInfoStruct, + is_last: bool, session_id: int) -> Any: + """""" + if self.check_error("查询成交", error_info): + return + + self.updateTrade(trade_info) + + if is_last: + self.gateway.write_log("查询成交信息成功") + + def OnQueryPosition(self, xtp_position: XTPQueryStkPositionRsp, error_info: XTPRspInfoStruct, + request_id: int, is_last: bool, session_id: int) -> Any: + """""" + position = PositionData( + symbol=xtp_position.ticker, + exchange=MARKET_XTP2VT[xtp_position.market], + direction=Direction.NET, + volume=xtp_position.total_qty, + frozen=xtp_position.locked_position, + price=xtp_position.avg_price, + pnl=xtp_position.unrealized_pnl, + yd_volume=xtp_position.yesterday_position, + gateway_name=self.gateway_name + ) + self.gateway.on_position(position) + + def OnQueryAsset(self, asset: XTPQueryAssetRsp, error_info: XTPRspInfoStruct, + request_id: int, is_last: bool, session_id: int) -> Any: + """""" + account = AccountData( + accountid=self.userid, + balance=asset.buying_power, + frozen=asset.withholding_amount, + gateway_name=self.gateway_name + ) + self.gateway.on_account(account) + + def OnQueryStructuredFund(self, fund_info: XTPStructuredFundInfo, error_info: XTPRspInfoStruct, + is_last: bool, session_id: int) -> Any: + """""" + pass + + def OnQueryFundTransfer(self, fund_transfer_info: XTPFundTransferNotice, error_info: XTPRspInfoStruct, + is_last: bool, session_id: int) -> Any: + """""" + pass + + def OnFundTransfer(self, fund_transfer_info: XTPFundTransferNotice, session_id: int) -> Any: + """""" + pass + + def OnQueryETF(self, etf_info: XTPQueryETFBaseRsp, error_info: XTPRspInfoStruct, + is_last: bool, session_id: int) -> Any: + """""" + pass + + def OnQueryETFBasket(self, etf_component_info: XTPQueryETFComponentRsp, error_info: XTPRspInfoStruct, + is_last: bool, session_id: int) -> Any: + """""" + pass + + def OnQueryIPOInfoList(self, ipo_info: XTPQueryIPOTickerRsp, error_info: XTPRspInfoStruct, + is_last: bool, session_id: int) -> Any: + """""" + pass + + def OnQueryIPOQuotaInfo(self, quota_info: XTPQueryIPOQuotaRsp, error_info: XTPRspInfoStruct, + is_last: bool, session_id: int) -> Any: + """""" + pass + + def OnQueryOptionAuctionInfo(self, option_info: XTPQueryOptionAuctionInfoRsp, error_info: XTPRspInfoStruct, + is_last: bool, session_id: int) -> Any: + """""" pass