Merge pull request #1637 from vnpy/dev_xtp

Dev xtp
This commit is contained in:
vn.py 2019-04-26 16:46:34 +08:00 committed by GitHub
commit be3850fe3c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 974 additions and 263 deletions

View File

@ -7,8 +7,8 @@ from vnpy.trader.ui import MainWindow, create_qapp
from vnpy.gateway.bitmex import BitmexGateway
from vnpy.gateway.futu import FutuGateway
from vnpy.gateway.ib import IbGateway
# from vnpy.gateway.ctp import CtpGateway
from vnpy.gateway.femas import FemasGateway
from vnpy.gateway.ctp import CtpGateway
# from vnpy.gateway.femas import FemasGateway
from vnpy.gateway.tiger import TigerGateway
from vnpy.gateway.oes import OesGateway
from vnpy.gateway.okex import OkexGateway
@ -16,6 +16,7 @@ from vnpy.gateway.huobi import HuobiGateway
from vnpy.gateway.bitfinex import BitfinexGateway
from vnpy.gateway.onetoken import OnetokenGateway
from vnpy.gateway.okexf import OkexfGateway
from vnpy.gateway.xtp import XtpGateway
from vnpy.app.cta_strategy import CtaStrategyApp
from vnpy.app.csv_loader import CsvLoaderApp
@ -30,8 +31,9 @@ def main():
event_engine = EventEngine()
main_engine = MainEngine(event_engine)
# main_engine.add_gateway(CtpGateway)
main_engine.add_gateway(FemasGateway)
main_engine.add_gateway(XtpGateway)
main_engine.add_gateway(CtpGateway)
# main_engine.add_gateway(FemasGateway)
main_engine.add_gateway(IbGateway)
main_engine.add_gateway(FutuGateway)
main_engine.add_gateway(BitmexGateway)

Binary file not shown.

View File

@ -7,19 +7,34 @@ if typing.TYPE_CHECKING:
from .vnxtp import *
def set_async_callback_exception_handler(handler: Callable[[AsyncDispatchException], None]):
"""
set a customize exception handler for async callback in this module(pyd)
\a handler should return True if it handles that exception,
If the return value of \a handler is not True, exception will be re-thrown.
"""
...
class AsyncDispatchException:
what: str
instance: object
function_name: str
from . import vnxtp_XTP as XTP
class XTPRspInfoStruct():
error_id: int
error_msg: Sequence[int]
error_msg: str
class XTPSpecificTickerStruct():
exchange_id: XTP_EXCHANGE_TYPE
ticker: Sequence[int]
ticker: str
class XTPMarketDataStockExData():
@ -71,7 +86,7 @@ class XTPMarketDataStruct():
exchange_id: XTP_EXCHANGE_TYPE
ticker: Sequence[int]
ticker: str
last_price: float
pre_close_price: float
open_price: float
@ -95,7 +110,7 @@ class XTPMarketDataStruct():
bid_qty: Sequence[int]
ask_qty: Sequence[int]
trades_count: int
ticker_status: Sequence[int]
ticker_status: str
stk: XTPMarketDataStockExData
opt: XTPMarketDataOptionExData
data_type: XTP_MARKETDATA_TYPE
@ -106,8 +121,8 @@ class XTPQuoteStaticInfo():
exchange_id: XTP_EXCHANGE_TYPE
ticker: Sequence[int]
ticker_name: Sequence[int]
ticker: str
ticker_name: str
ticker_type: XTP_TICKER_TYPE
pre_close_price: float
upper_limit_price: float
@ -121,7 +136,7 @@ class OrderBookStruct():
exchange_id: XTP_EXCHANGE_TYPE
ticker: Sequence[int]
ticker: str
last_price: float
qty: int
turnover: float
@ -161,7 +176,7 @@ class XTPTickByTickStruct():
exchange_id: XTP_EXCHANGE_TYPE
ticker: Sequence[int]
ticker: str
seq: int
data_time: int
type: XTP_TBT_TYPE
@ -173,7 +188,7 @@ class XTPTickerPriceInfo():
exchange_id: XTP_EXCHANGE_TYPE
ticker: Sequence[int]
ticker: str
last_price: float
@ -182,7 +197,7 @@ class XTPOrderInsertInfo():
order_xtp_id: int
order_client_id: int
ticker: Sequence[int]
ticker: str
market: XTP_MARKET_TYPE
price: float
stop_price: float
@ -210,7 +225,7 @@ class XTPOrderInfo():
order_client_id: int
order_cancel_client_id: int
order_cancel_xtp_id: int
ticker: Sequence[int]
ticker: str
market: XTP_MARKET_TYPE
price: float
quantity: int
@ -227,7 +242,7 @@ class XTPOrderInfo():
update_time: int
cancel_time: int
trade_amount: float
order_local_id: Sequence[int]
order_local_id: str
order_status: XTP_ORDER_STATUS_TYPE
order_submit_status: XTP_ORDER_SUBMIT_STATUS_TYPE
order_type: int
@ -238,16 +253,16 @@ class XTPTradeReport():
order_xtp_id: int
order_client_id: int
ticker: Sequence[int]
ticker: str
market: XTP_MARKET_TYPE
local_order_id: int
exec_id: Sequence[int]
exec_id: str
price: float
quantity: int
trade_time: int
trade_amount: float
report_index: int
order_exch_id: Sequence[int]
order_exch_id: str
trade_type: int
u32: int
side: int
@ -255,13 +270,13 @@ class XTPTradeReport():
reserved1: int
reserved2: int
business_type: XTP_BUSINESS_TYPE
branch_pbu: Sequence[int]
branch_pbu: str
class XTPQueryOrderReq():
ticker: Sequence[int]
ticker: str
begin_time: int
end_time: int
@ -270,13 +285,13 @@ class XTPQueryReportByExecIdReq():
order_xtp_id: int
exec_id: Sequence[int]
exec_id: str
class XTPQueryTraderReq():
ticker: Sequence[int]
ticker: str
begin_time: int
end_time: int
@ -311,8 +326,8 @@ class XTPQueryAssetRsp():
class XTPQueryStkPositionRsp():
ticker: Sequence[int]
ticker_name: Sequence[int]
ticker: str
ticker_name: str
market: XTP_MARKET_TYPE
total_qty: int
sellable_qty: int
@ -350,17 +365,17 @@ class XTPQueryStructuredFundInfoReq():
exchange_id: XTP_EXCHANGE_TYPE
sf_ticker: Sequence[int]
sf_ticker: str
class XTPStructuredFundInfo():
exchange_id: XTP_EXCHANGE_TYPE
sf_ticker: Sequence[int]
sf_ticker_name: Sequence[int]
ticker: Sequence[int]
ticker_name: Sequence[int]
sf_ticker: str
sf_ticker_name: str
ticker: str
ticker_name: str
split_merge_status: XTP_SPLIT_MERGE_STATUS
ratio: int
min_split_qty: int
@ -372,15 +387,15 @@ class XTPQueryETFBaseReq():
market: XTP_MARKET_TYPE
ticker: Sequence[int]
ticker: str
class XTPQueryETFBaseRsp():
market: XTP_MARKET_TYPE
etf: Sequence[int]
subscribe_redemption_ticker: Sequence[int]
etf: str
subscribe_redemption_ticker: str
unit: int
subscribe_status: int
redemption_status: int
@ -395,16 +410,16 @@ class XTPQueryETFComponentReq():
market: XTP_MARKET_TYPE
ticker: Sequence[int]
ticker: str
class XTPQueryETFComponentRsp():
market: XTP_MARKET_TYPE
ticker: Sequence[int]
component_ticker: Sequence[int]
component_name: Sequence[int]
ticker: str
component_ticker: str
component_name: str
quantity: int
component_market: XTP_MARKET_TYPE
replace_type: ETF_REPLACE_TYPE
@ -416,8 +431,8 @@ class XTPQueryIPOTickerRsp():
market: XTP_MARKET_TYPE
ticker: Sequence[int]
ticker_name: Sequence[int]
ticker: str
ticker_name: str
price: float
unit: int
qty_upper_limit: int
@ -434,17 +449,17 @@ class XTPQueryOptionAuctionInfoReq():
market: XTP_MARKET_TYPE
ticker: Sequence[int]
ticker: str
class XTPQueryOptionAuctionInfoRsp():
ticker: Sequence[int]
ticker: str
security_id_source: XTP_MARKET_TYPE
symbol: Sequence[int]
contract_id: Sequence[int]
underlying_security_id: Sequence[int]
symbol: str
contract_id: str
underlying_security_id: str
underlying_security_id_source: XTP_MARKET_TYPE
list_date: int
last_trade_date: int
@ -483,8 +498,8 @@ class XTPFundTransferReq():
serial_id: int
fund_account: Sequence[int]
password: Sequence[int]
fund_account: str
password: str
amount: float
transfer_type: XTP_FUND_TRANSFER_TYPE
@ -610,7 +625,7 @@ class XTP_POSITION_DIRECTION_TYPE(Enum):
class XTP_MARKETDATA_TYPE(Enum):
XTP_MARKETDATA_ACTUAL: XTP_MARKETDATA_TYPE
XTP_MARKETDATA_OPTION: XTP_MARKETDATA_TYPE
XTPVersionType = Sequence[int]
XTPVersionType = str
XTP_LOG_LEVEL = XTP_LOG_LEVEL
XTP_PROTOCOL_TYPE = XTP_PROTOCOL_TYPE
XTP_EXCHANGE_TYPE = XTP_EXCHANGE_TYPE

View File

@ -379,7 +379,8 @@ void generate_class_XTP_API_TraderApi(pybind11::object & parent)
brigand::list<
>
>::value,
pybind11::call_guard<pybind11::gil_scoped_release>()
pybind11::call_guard<pybind11::gil_scoped_release>(),
pybind11::return_value_policy::reference
);
c.def("GetApiVersion",
autocxxpy::apply_function_transform<

View File

@ -277,9 +277,14 @@ void generate_class_XTP_API_QuoteApi(pybind11::object & parent)
PyQuoteApi
> c(parent, "QuoteApi");
c.def_static("CreateQuoteApi",
autocxxpy::apply_function_transform<
autocxxpy::function_constant<
&XTP::API::QuoteApi::CreateQuoteApi
//,
//pybind11::call_guard<pybind11::gil_scoped_release>()
>,
brigand::list<
>
>::value,
pybind11::call_guard<pybind11::gil_scoped_release>()
);
c.def("Release",
autocxxpy::apply_function_transform<
@ -319,7 +324,8 @@ void generate_class_XTP_API_QuoteApi(pybind11::object & parent)
brigand::list<
>
>::value,
pybind11::call_guard<pybind11::gil_scoped_release>()
pybind11::call_guard<pybind11::gil_scoped_release>(),
pybind11::return_value_policy::reference
);
c.def("SetUDPBufferSize",
autocxxpy::apply_function_transform<

View File

@ -1,6 +1,7 @@
#include <iostream>
#include <string>
#include <pybind11/pybind11.h>
#include <pybind11/functional.h>
#include <autocxxpy/autocxxpy.hpp>
#include "module.hpp"
@ -20,6 +21,12 @@ void additional_init(pybind11::module &m)
void init_dispatcher(pybind11::module &m)
{
m.def("set_async_callback_exception_handler", &autocxxpy::async_callback_exception_handler::set_handler);
pybind11::class_<autocxxpy::async_dispatch_exception> c(m, "AsyncDispatchException");
c.def_property("what", &autocxxpy::async_dispatch_exception::what, nullptr);
c.def_readonly("instance", &autocxxpy::async_dispatch_exception::instance);
c.def_readonly("function_name", &autocxxpy::async_dispatch_exception::function_name);
autocxxpy::dispatcher::instance().start();
}

View File

@ -2,6 +2,7 @@
#include <tuple>
#include <type_traits>
#include <optional>
#include "brigand.hpp"
@ -66,7 +67,7 @@ namespace autocxxpy
template <auto method>
constexpr callback_type callback_type_of_v = callback_type_of<method>::value;
#ifdef PYBIND11_OVERLOAD_NAME
#ifdef AUTOCXXPY_INCLUDED_PYBIND11
template <class ret_type>
struct pybind11_static_caster {
static pybind11::detail::overload_caster_t<ret_type> caster;
@ -75,18 +76,59 @@ namespace autocxxpy
template <class ret_type>
AUTOCXXPY_SELECT_ANY pybind11::detail::overload_caster_t<ret_type> pybind11_static_caster<ret_type>::caster;
struct async_dispatch_exception : public std::exception
{
async_dispatch_exception(const char *what, const pybind11::object &instance, std::string function_name)
: std::exception(what), instance(instance), function_name(function_name)
{}
pybind11::object instance;
std::string function_name;
inline const char* what() noexcept
{
return std::exception::what();
}
};
struct async_callback_exception_handler
{
using handler_type = std::function<void(const async_dispatch_exception&)>;
static handler_type custom_handler;
inline static void handle_excepiton(const async_dispatch_exception&e)
{
if (custom_handler)
{
custom_handler(e);
}
}
inline static void set_handler(const handler_type& handler)
{
custom_handler = handler;
}
};
AUTOCXXPY_SELECT_ANY async_callback_exception_handler::handler_type async_callback_exception_handler::custom_handler;
#endif
namespace arg_helper
{
//////////////////////////////////////////////////////////////////////////
// stores
//////////////////////////////////////////////////////////////////////////
// # todo: char8, char16, char32, wchar_t, etc...
// # todo: shall i copy only const type, treating non-const type as output pointer?
inline auto save(const char *val)
inline std::optional<std::string> save(const char* val)
{ // match const char *
if (nullptr == val) AUTOCXXPY_UNLIKELY
return std::nullopt; // maybe empty string is also a choice?
return std::string(val);
}
inline auto save(char *val)
inline std::optional<std::string> save(char* val)
{ // match char *
if (nullptr == val) AUTOCXXPY_UNLIKELY
return std::nullopt; // maybe empty string is also a choice?
return std::string(val);
}
template<size_t size>
@ -101,29 +143,41 @@ namespace autocxxpy
}
template <class T>
inline T &save(T *val)
inline std::optional<T> save(T * val)
{ // match pointer
if (nullptr == val) AUTOCXXPY_UNLIKELY
{
return std::nullopt;
}
return *val;
}
template <class T>
inline T &save(const T *val)
inline std::optional<T>& save(const T * val)
{ // match const pointer
if (nullptr == val) AUTOCXXPY_UNLIKELY
{
return std::nullopt;
}
return const_cast<T&>(*val);
}
template <class T>
inline T &save(const T &val)
inline T& save(const T & val)
{ // match everything else : just use original type
return const_cast<T&>(val);
}
//////////////////////////////////////////////////////////////////////////
// loads
//////////////////////////////////////////////////////////////////////////
template <class to_type>
struct loader
{ // match default(everyting besides pointer)
template <class src_type>
inline to_type operator ()(src_type &val)
inline to_type operator ()(src_type& val)
{
return val;
}
@ -132,57 +186,57 @@ namespace autocxxpy
template <size_t size>
struct loader<const string_array<size>>
{ // match const char []
using to_type = const char *;
inline to_type operator ()(const std::string &val)
using to_type = const char*;
inline to_type operator ()(const std::string& val)
{
return const_cast<char *>(val.data());
return const_cast<char*>(val.data());
}
};
template <size_t size>
struct loader<string_array<size>>
{ // match char []
using to_type = char *;
inline to_type operator ()(const std::string &val)
using to_type = char*;
inline to_type operator ()(const std::string& val)
{
return const_cast<char *>(val.data());
return const_cast<char*>(val.data());
}
};
template <>
struct loader<const char *>
struct loader<const char*>
{ // match const char *
using to_type = const char *;
inline to_type operator ()(const std::string &val)
using to_type = const char*;
inline to_type operator ()(const std::optional<std::string>& val)
{
return const_cast<char *>(val.data());
if (val) AUTOCXXPY_LIKELY
return const_cast<char*>(val->data());
return nullptr;
}
};
template <>
struct loader<char *>
struct loader<char*>
{ // match char *
using to_type = char *;
inline to_type operator ()(const std::string &val)
using to_type = char*;
inline to_type operator ()(const std::optional<std::string>& val)
{
return const_cast<char *>(val.data());
if (val) AUTOCXXPY_LIKELY
return const_cast<char*>(val->data());
return nullptr;
}
};
template <class to_type>
struct loader<to_type *>
struct loader<to_type*>
{ // match pointer
template <class src_type>
inline to_type *operator ()(src_type &val)
inline to_type* operator ()(const std::optional<src_type>& val)
{ // val to poiner
return const_cast<to_type *>(&val);
if (val) AUTOCXXPY_LIKELY
return const_cast<to_type*>(&(*val));
return nullptr;
}
//template <class src_type>
//inline to_type *operator ()(src_type *val)
//{ // pointer to pointer
// return val;
//}
};
};
@ -195,7 +249,7 @@ namespace autocxxpy
using class_type = class_of_member_method_t<method>;
public:
template <class ... arg_types>
inline static ret_type call(class_type *instance, const char *py_func_name, arg_types ... args)
inline static ret_type call(class_type* instance, const char* py_func_name, arg_types ... args)
{
if constexpr (callback_type_of_v<method> == callback_type::Direct)
{
@ -211,47 +265,70 @@ namespace autocxxpy
}
template <class ... arg_types>
inline static void async(class_type *instance, const char *py_func_name, arg_types ... args)
inline static void async(class_type* instance, const char* py_func_name, arg_types ... args)
{
return async_impl(instance, py_func_name, std::index_sequence_for<arg_types ...>{}, args ...);
}
template <class ... arg_types>
inline static ret_type sync(class_type *instance, const char * py_func_name, arg_types ... args)
inline static ret_type sync(class_type * instance, const char* py_func_name, arg_types ... args)
{
// if this code is under test environment, we don't need pybind11
// since header of pybind11 use #pragma once, no macros is defined, we use a public macro to check if pybind11 is included or not
#ifdef PYBIND11_OVERLOAD_NAME
pybind11::gil_scoped_acquire gil;
pybind11::function overload = pybind11::get_overload(static_cast<const class_type *>(instance), py_func_name);
if (overload) {
auto o = overload(args ...);
if (pybind11::detail::cast_is_temporary_value_reference<ret_type>::value) {
auto & caster = pybind11_static_caster<ret_type>::caster;
return pybind11::detail::cast_ref<ret_type>(std::move(o), caster);
pybind11::function overload = pybind11::get_overload(static_cast<const class_type*>(instance), py_func_name);
if (overload) AUTOCXXPY_LIKELY{
try
{
auto result = overload(args ...);
if (pybind11::detail::cast_is_temporary_value_reference<ret_type>::value)
{
auto& caster = pybind11_static_caster<ret_type>::caster;
return pybind11::detail::cast_ref<ret_type>(std::move(result), caster);
}
else
{
return pybind11::detail::cast_safe<ret_type>(std::move(result));
}
}
catch (const pybind11::error_already_set & e)
{
// todo: option to not to throw when sync is called directly
throw async_dispatch_exception(e.what(), pybind11::cast(instance), py_func_name);
}
else return pybind11::detail::cast_safe<ret_type>(std::move(o));
}
#endif
return (instance->*method)(args ...);
}
private:
template <class ... arg_types, size_t ... idx>
inline static void async_impl(class_type *instance, const char *py_func_name, std::index_sequence<idx ...>, arg_types ... args)
inline static void async_impl(class_type * instance, const char* py_func_name, std::index_sequence<idx ...>, arg_types ... args)
{
// wrap for ctp like function calls:
// all the pointer might be unavailable after this call, so copy its value into a tuple
auto arg_tuple = std::make_tuple(arg_helper::save(args) ...);
auto task = [instance, py_func_name, arg_tuple = std::move(arg_tuple)]()
{
// resolve all value:
// if it was originally a pointer, then use pointer type.
// if it was originally a value, just keep a reference to that value.
sync<arg_types ...>(
instance, py_func_name,
arg_helper::loader<brigand::at<brigand::list<arg_types ...>, brigand::integral_constant<int, idx> > >{}
(std::get<idx>(arg_tuple)) ...
);
#ifdef AUTOCXXPY_INCLUDED_PYBIND11
try
{
#endif
// resolve all value:
// if it was originally a pointer, then use pointer type.
// if it was originally a value, just keep a reference to that value.
sync<arg_types ...>(
instance, py_func_name,
arg_helper::loader<brigand::at<brigand::list<arg_types ...>, brigand::integral_constant<int, idx> > >{}
(std::get<idx>(arg_tuple)) ...
);
#ifdef AUTOCXXPY_INCLUDED_PYBIND11
}
catch (const async_dispatch_exception &e)
{
async_callback_exception_handler::handle_excepiton(e);
}
#endif
};
dispatcher::instance().add(std::move(task));
}

View File

@ -8,12 +8,30 @@
#ifndef AUTOCXXPY_UNUSED
#define AUTOCXXPY_UNUSED(x) (void)(x)
# define AUTOCXXPY_UNUSED(x) (void)(x)
#endif
#ifdef _MSC_VER
#define AUTOCXXPY_SELECT_ANY __declspec(selectany)
# define AUTOCXXPY_SELECT_ANY __declspec(selectany)
#else
#define AUTOCXXPY_SELECT_ANY __attribute__ ((selectany))
#endif
# define AUTOCXXPY_SELECT_ANY __attribute__ ((selectany))
#endif
#ifdef __has_cpp_attribute
# if __has_cpp_attribute(likely)
# define AUTOCXXPY_LIKELY [[likely]]
# endif
# if __has_cpp_attribute(unlikely)
# define AUTOCXXPY_UNLIKELY [[unlikely]]
# endif
#endif
#ifndef AUTOCXXPY_LIKELY
#define AUTOCXXPY_LIKELY
#endif
#ifndef AUTOCXXPY_UNLIKELY
#define AUTOCXXPY_UNLIKELY
#endif

View File

@ -119,6 +119,7 @@
<LanguageStandard>stdcpp17</LanguageStandard>
<DisableSpecificWarnings>4819</DisableSpecificWarnings>
<AdditionalOptions>/bigobj %(AdditionalOptions)</AdditionalOptions>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
@ -138,6 +139,7 @@
<LanguageStandard>stdcpp17</LanguageStandard>
<DisableSpecificWarnings>4819</DisableSpecificWarnings>
<AdditionalOptions>/bigobj %(AdditionalOptions)</AdditionalOptions>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
@ -160,6 +162,7 @@
<LanguageStandard>stdcpp17</LanguageStandard>
<DisableSpecificWarnings>4819</DisableSpecificWarnings>
<AdditionalOptions>/bigobj %(AdditionalOptions)</AdditionalOptions>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
@ -184,6 +187,7 @@
<LanguageStandard>stdcpp17</LanguageStandard>
<DisableSpecificWarnings>4819</DisableSpecificWarnings>
<AdditionalOptions>/bigobj %(AdditionalOptions)</AdditionalOptions>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>

View File

@ -7,5 +7,20 @@ if typing.TYPE_CHECKING:
from .vnxtp import *
def set_async_callback_exception_handler(handler: Callable[[Exception, object, str], bool]):
"""
set a customize exception handler for async callback in this module(pyd)
\a handler should return True if it handles that exception,
If the return value of \a handler is not True, exception will be re-thrown.
"""
...
class AsyncDispatchException:
what: str
instance: object
function_name: str
from . import vnxtp_XTP_API as API

View File

@ -7,6 +7,21 @@ if typing.TYPE_CHECKING:
from .vnxtp import *
def set_async_callback_exception_handler(handler: Callable[[Exception, object, str], bool]):
"""
set a customize exception handler for async callback in this module(pyd)
\a handler should return True if it handles that exception,
If the return value of \a handler is not True, exception will be re-thrown.
"""
...
class AsyncDispatchException:
what: str
instance: object
function_name: str
class TraderSpi():

View File

@ -226,6 +226,8 @@ class BacktestingEngine:
# Use the first [days] of history data for initializing strategy
day_count = 0
ix = 0
for ix, data in enumerate(self.history_data):
if self.datetime and data.datetime.day != self.datetime.day:
day_count += 1

View File

@ -97,6 +97,8 @@ class CtaEngine(BaseEngine):
self.rq_client = None
self.rq_symbols = set()
self.vt_tradeids = set() # for filtering duplicate trade
self.offset_converter = OffsetConverter(self.main_engine)
def init_engine(self):
@ -190,6 +192,11 @@ class CtaEngine(BaseEngine):
""""""
trade = event.data
# Filter duplicate trade push
if trade.vt_tradeid in self.vt_tradeids:
return
self.vt_tradeids.add(trade.vt_tradeid)
self.offset_converter.update_trade(trade)
strategy = self.orderid_strategy_map.get(trade.vt_orderid, None)

View File

@ -299,8 +299,6 @@ class TigerGateway(BaseGateway):
def on_order_change(self, tiger_account: str, data: list):
""""""
data = dict(data)
print("委托推送", data["origin_symbol"],
data["order_id"], data["filled"], data["status"])
symbol, exchange = convert_symbol_tiger2vt(data["origin_symbol"])
status = PUSH_STATUS_TIGER2VT[data["status"]]
@ -368,8 +366,6 @@ class TigerGateway(BaseGateway):
self.ID_VT2TIGER[local_id] = str(order.order_id)
self.trade_client.place_order(order)
print("发单:", order.contract.symbol,
order.order_id, order.quantity, order.status)
except: # noqa
traceback.print_exc()
@ -551,8 +547,6 @@ class TigerGateway(BaseGateway):
self.on_order(order)
self.ID_VT2TIGER = {v: k for k, v in self.ID_TIGER2VT.items()}
print("原始委托字典", self.ID_TIGER2VT)
print("原始反向字典", self.ID_VT2TIGER)
def process_deal(self, data):
"""

View File

@ -1,193 +1,741 @@
from typing import Any, Sequence
from datetime import datetime
from threading import Thread
from vnpy.api.xtp.vnxtp import (OrderBookStruct, XTP, XTPMarketDataStruct, XTPQuoteStaticInfo,
XTPRspInfoStruct, XTPSpecificTickerStruct, XTPTickByTickStruct,
XTPTickerPriceInfo, XTP_EXCHANGE_TYPE, XTP_LOG_LEVEL,
XTP_PROTOCOL_TYPE)
from vnpy.api.xtp.vnxtp import (
XTP,
set_async_callback_exception_handler,
AsyncDispatchException,
OrderBookStruct,
XTPMarketDataStruct,
XTPQuoteStaticInfo,
XTPRspInfoStruct,
XTPSpecificTickerStruct,
XTPTickByTickStruct,
XTPTickerPriceInfo,
XTPOrderInsertInfo,
XTPOrderInfo,
XTPTradeReport,
XTPOrderCancelInfo,
XTPQueryStkPositionRsp,
XTPQueryAssetRsp,
XTPStructuredFundInfo,
XTPFundTransferNotice,
XTPQueryETFBaseRsp,
XTPQueryETFComponentRsp,
XTPQueryIPOTickerRsp,
XTPQueryIPOQuotaRsp,
XTPQueryOptionAuctionInfoRsp,
XTP_EXCHANGE_TYPE,
XTP_LOG_LEVEL,
XTP_PROTOCOL_TYPE,
XTP_TE_RESUME_TYPE,
XTP_SIDE_BUY,
XTP_SIDE_SELL,
XTP_BUSINESS_TYPE,
XTP_TICKER_TYPE,
XTP_MARKET_TYPE,
XTP_PRICE_TYPE,
XTP_ORDER_STATUS_TYPE
)
from vnpy.event import EventEngine
from vnpy.trader.constant import Exchange
from vnpy.trader.event import EVENT_TIMER
from vnpy.trader.constant import Exchange, Product, Direction, OrderType, Status
from vnpy.trader.gateway import BaseGateway
from vnpy.trader.object import CancelRequest, OrderRequest, SubscribeRequest
from vnpy.trader.object import (CancelRequest, OrderRequest, SubscribeRequest,
TickData, ContractData, OrderData, TradeData,
PositionData, AccountData)
from vnpy.trader.utility import get_folder_path
API = XTP.API
EXCHANGE_XTP2VT = {
XTP_EXCHANGE_TYPE.XTP_EXCHANGE_SH: Exchange.SSE,
XTP_EXCHANGE_TYPE.XTP_EXCHANGE_SZ: Exchange.SZSE,
}
EXCHANGE_VT2XTP = {v: k for k, v in EXCHANGE_XTP2VT.items()}
MARKET_XTP2VT = {
XTP_MARKET_TYPE.XTP_MKT_SH_A: Exchange.SSE,
XTP_MARKET_TYPE.XTP_MKT_SZ_A: Exchange.SZSE
}
MARKET_VT2XTP = {v: k for k, v in MARKET_XTP2VT.items()}
class QuoteSpi(API.QuoteSpi):
PRODUCT_XTP2VT = {
XTP_TICKER_TYPE.XTP_TICKER_TYPE_STOCK: Product.EQUITY,
XTP_TICKER_TYPE.XTP_TICKER_TYPE_INDEX: Product.INDEX,
XTP_TICKER_TYPE.XTP_TICKER_TYPE_FUND: Product.FUND,
XTP_TICKER_TYPE.XTP_TICKER_TYPE_BOND: Product.BOND,
XTP_TICKER_TYPE.XTP_TICKER_TYPE_OPTION: Product.OPTION
}
def OnDisconnected(self, reason: int) -> Any:
print("OnDisconnected")
return super().OnDisconnected(reason)
DIRECTION_VT2XTP = {
Direction.LONG: XTP_SIDE_BUY,
Direction.SHORT: XTP_SIDE_SELL
}
DIRECTION_XTP2VT = {v: k for k, v in DIRECTION_VT2XTP.items()}
def OnError(self, error_info: XTPRspInfoStruct) -> Any:
return super().OnError(error_info)
ORDERTYPE_VT2XTP = {
OrderType.LIMIT: XTP_PRICE_TYPE.XTP_PRICE_LIMIT,
OrderType.MARKET: XTP_PRICE_TYPE.XTP_PRICE_BEST5_OR_CANCEL
}
ORDERTYPE_XTP2VT = {v: k for k, v in ORDERTYPE_VT2XTP.items()}
def OnSubMarketData(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
print("OnSubMarketData")
return super().OnSubMarketData(ticker, error_info, is_last)
STATUS_XTP2VT = {
XTP_ORDER_STATUS_TYPE.XTP_ORDER_STATUS_INIT: Status.SUBMITTING,
XTP_ORDER_STATUS_TYPE.XTP_ORDER_STATUS_ALLTRADED: Status.ALLTRADED,
XTP_ORDER_STATUS_TYPE.XTP_ORDER_STATUS_PARTTRADEDQUEUEING: Status.PARTTRADED,
XTP_ORDER_STATUS_TYPE.XTP_ORDER_STATUS_PARTTRADEDNOTQUEUEING: Status.CANCELLED,
XTP_ORDER_STATUS_TYPE.XTP_ORDER_STATUS_NOTRADEQUEUEING: Status.NOTTRADED,
XTP_ORDER_STATUS_TYPE.XTP_ORDER_STATUS_CANCELED: Status.CANCELLED,
XTP_ORDER_STATUS_TYPE.XTP_ORDER_STATUS_REJECTED: Status.REJECTED,
}
def OnUnSubMarketData(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
return super().OnUnSubMarketData(ticker, error_info, is_last)
def OnDepthMarketData(self, market_data: XTPMarketDataStruct, bid1_qty: Sequence[int],
bid1_count: int, max_bid1_count: int, ask1_qty: Sequence[int],
ask1_count: int, max_ask1_count: int) -> Any:
print("OnDepthMarketData")
return super().OnDepthMarketData(market_data, bid1_qty, bid1_count, max_bid1_count,
ask1_qty, ask1_count, max_ask1_count)
def OnSubOrderBook(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
return super().OnSubOrderBook(ticker, error_info, is_last)
def OnUnSubOrderBook(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
return super().OnUnSubOrderBook(ticker, error_info, is_last)
def OnOrderBook(self, order_book: OrderBookStruct) -> Any:
return super().OnOrderBook(order_book)
def OnSubTickByTick(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
return super().OnSubTickByTick(ticker, error_info, is_last)
def OnUnSubTickByTick(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
return super().OnUnSubTickByTick(ticker, error_info, is_last)
def OnTickByTick(self, tbt_data: XTPTickByTickStruct) -> Any:
return super().OnTickByTick(tbt_data)
def OnSubscribeAllMarketData(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
return super().OnSubscribeAllMarketData(exchange_id, error_info)
def OnUnSubscribeAllMarketData(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
return super().OnUnSubscribeAllMarketData(exchange_id, error_info)
def OnSubscribeAllOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
return super().OnSubscribeAllOrderBook(exchange_id, error_info)
def OnUnSubscribeAllOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
return super().OnUnSubscribeAllOrderBook(exchange_id, error_info)
def OnSubscribeAllTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
return super().OnSubscribeAllTickByTick(exchange_id, error_info)
def OnUnSubscribeAllTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
return super().OnUnSubscribeAllTickByTick(exchange_id, error_info)
def OnQueryAllTickers(self, ticker_info: XTPQuoteStaticInfo, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
return super().OnQueryAllTickers(ticker_info, error_info, is_last)
def OnQueryTickersPriceInfo(self, ticker_info: XTPTickerPriceInfo, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
return super().OnQueryTickersPriceInfo(ticker_info, error_info, is_last)
def OnSubscribeAllOptionMarketData(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
return super().OnSubscribeAllOptionMarketData(exchange_id, error_info)
def OnUnSubscribeAllOptionMarketData(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
return super().OnUnSubscribeAllOptionMarketData(exchange_id, error_info)
def OnSubscribeAllOptionOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
return super().OnSubscribeAllOptionOrderBook(exchange_id, error_info)
def OnUnSubscribeAllOptionOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
return super().OnUnSubscribeAllOptionOrderBook(exchange_id, error_info)
def OnSubscribeAllOptionTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
return super().OnSubscribeAllOptionTickByTick(exchange_id, error_info)
def OnUnSubscribeAllOptionTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
return super().OnUnSubscribeAllOptionTickByTick(exchange_id, error_info)
symbol_name_map = {}
class XtpGateway(BaseGateway):
def __init__(self, event_engine: "EventEngine"):
self.client_id: int = 1
self.quote_api = API.QuoteApi.CreateQuoteApi(
self.client_id, # todo: change client id
"log", # todo: use vnpy temp path
XTP_LOG_LEVEL.XTP_LOG_LEVEL_TRACE
)
self.quote_spi = QuoteSpi()
super().__init__(event_engine, "XTP")
default_setting = {
"client_id": "0",
"quote_server_ip": "",
"quote_server_port": "",
"quote_server_protocol": ["TCP", "UDP"],
"quote_userid": "",
"quote_password": "",
"账号": "",
"密码": "",
"客户号": 1,
"行情地址": "",
"行情端口": 0,
"交易地址": "",
"交易端口": 0,
"行情协议": ["TCP", "UDP"],
"授权码": ""
}
def __init__(self, event_engine: EventEngine):
""""""
super().__init__(event_engine, "XTP")
self.quote_api = XtpQuoteApi(self)
self.trader_api = XtpTraderApi(self)
set_async_callback_exception_handler(
self._async_callback_exception_handler)
def connect(self, setting: dict):
self.client_id = int(setting['client_id'])
quote_server_ip = setting['quote_server_ip']
quote_server_port = int(setting['quote_server_port'])
quote_server_protocol = setting['quote_server_protocol']
quote_userid = setting['quote_userid']
quote_password = setting['quote_password']
""""""
userid = setting['账号']
password = setting['密码']
client_id = int(setting['客户号'])
quote_ip = setting['行情地址']
quote_port = int(setting['行情端口'])
trader_ip = setting['交易地址']
trader_port = int(setting['交易端口'])
quote_protocol = setting["行情协议"]
software_key = setting["授权码"]
quote_protocol = XTP_PROTOCOL_TYPE.XTP_PROTOCOL_TCP if quote_server_protocol == 'TCP' else 'UDP'
self.quote_api.RegisterSpi(self.quote_spi)
# self.quote_api.SetHeartBeatInterval(60)
ret = self.quote_api.Login(
quote_server_ip,
quote_server_port,
quote_userid,
quote_password,
quote_protocol
)
if ret == 0:
# login succeed
self.write_log("Login succeed.")
pass
self.quote_api.connect(userid, password, client_id,
quote_ip, quote_port, quote_protocol)
self.trader_api.connect(userid, password, client_id,
trader_ip, trader_port, software_key)
self.init_query()
def close(self):
pass
""""""
self.quote_api.close()
self.trader_api.close()
def subscribe(self, req: SubscribeRequest):
ret = self.quote_api.SubscribeMarketData(
[req.symbol],
EXCHANGE_VT2XTP[req.exchange],
)
if ret != 0:
print("订阅行情失败") # improve: return True or False, or raise with reason
pass
""""""
self.quote_api.subscrbie(req)
def send_order(self, req: OrderRequest) -> str:
pass
""""""
return self.trader_api.send_order(req)
def cancel_order(self, req: CancelRequest):
pass
""""""
self.trader_api.cancel_order(req)
def query_account(self):
pass
""""""
self.trader_api.query_account()
def query_position(self):
""""""
self.trader_api.query_position()
def process_timer_event(self, event):
""""""
self.count += 1
if self.count < 2:
return
self.count = 0
func = self.query_functions.pop(0)
func()
self.query_functions.append(func)
def init_query(self):
""""""
self.count = 0
self.query_functions = [self.query_account, self.query_position]
self.event_engine.register(EVENT_TIMER, self.process_timer_event)
def _async_callback_exception_handler(self, e: AsyncDispatchException):
error_str = f"发生内部错误:\n" f"位置:{e.instance}.{e.function_name}" f"详细信息:{e.what}"
print(error_str)
class XtpQuoteApi(API.QuoteSpi):
def __init__(self, gateway: BaseGateway):
""""""
super().__init__()
self.gateway = gateway
self.gateway_name = gateway.gateway_name
self.userid = ""
self.password = ""
self.client_id: int = 0
self.server_ip = ""
self.server_port: int = 0
self.server_protocol = ""
self.api = None
def connect(
self,
userid: str,
password: str,
client_id: int,
server_ip: str,
server_port: int,
quote_protocol: str
):
""""""
if self.api:
return
self.userid = userid
self.password = password
self.client_id = client_id
self.server_ip = server_ip
self.server_port = server_port
if quote_protocol == "CTP":
self.quote_protocol = XTP_PROTOCOL_TYPE.XTP_PROTOCOL_TCP
else:
self.quote_protocol = XTP_PROTOCOL_TYPE.XTP_PROTOCOL_UDP
# Create API object
path = str(get_folder_path(self.gateway_name.lower()))
self.api = API.QuoteApi.CreateQuoteApi(
self.client_id,
path,
XTP_LOG_LEVEL.XTP_LOG_LEVEL_TRACE
)
self.api.RegisterSpi(self)
self.gateway.write_log("行情接口初始化成功")
# Login to server
Thread(target=self.login).start()
def login(self):
""""""
ret = self.api.Login(
self.server_ip,
self.server_port,
self.userid,
self.password,
self.quote_protocol
)
if not ret:
msg = "行情服务器登录成功"
self.query_contract()
else:
msg = f"行情服务器登录失败,原因:{ret}"
self.gateway.write_log(msg)
def close(self):
""""""
if self.api:
self.api.RegisterSpi(None)
self.api.Release()
def subscrbie(self, req: SubscribeRequest):
""""""
xtp_exchange = EXCHANGE_VT2XTP.get(req.exchange, "")
self.api.SubscribeMarketData([req.symbol], xtp_exchange)
def query_contract(self):
""""""
for exchange_id in EXCHANGE_XTP2VT.keys():
self.api.QueryAllTickers(exchange_id)
def check_error(self, func_name: str, error_info: XTPRspInfoStruct):
""""""
if error_info and error_info.error_id:
msg = f"{func_name}发生错误, 代码:{error_info.error_id},信息:{error_info.error_msg}"
self.gateway.write_log(msg)
return True
else:
return False
def OnDisconnected(self, reason: int) -> Any:
""""""
self.gateway.write_log("行情服务器连接断开")
def OnError(self, error_info: XTPRspInfoStruct) -> Any:
""""""
self.check_error("行情接口", error_info)
def OnSubMarketData(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
""""""
self.check_error("订阅行情", error_info)
def OnUnSubMarketData(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
""""""
pass
def OnDepthMarketData(self, market_data: XTPMarketDataStruct, bid1_qty: Sequence[int],
bid1_count: int, max_bid1_count: int, ask1_qty: Sequence[int],
ask1_count: int, max_ask1_count: int) -> Any:
""""""
timestamp = str(market_data.data_time)
dt = datetime.strptime(timestamp, "%Y%m%d%H%M%S%f")
tick = TickData(
symbol=market_data.ticker,
exchange=EXCHANGE_XTP2VT[market_data.exchange_id],
datetime=dt,
volume=market_data.qty,
last_price=market_data.last_price,
limit_up=market_data.upper_limit_price,
limit_down=market_data.lower_limit_price,
open_price=market_data.open_price,
high_price=market_data.high_price,
low_price=market_data.low_price,
pre_close=market_data.pre_close_price,
bid_price_1=market_data.bid[0],
bid_price_2=market_data.bid[1],
bid_price_3=market_data.bid[2],
bid_price_4=market_data.bid[3],
bid_price_5=market_data.bid[4],
ask_price_1=market_data.ask[0],
ask_price_2=market_data.ask[1],
ask_price_3=market_data.ask[2],
ask_price_4=market_data.ask[3],
ask_price_5=market_data.ask[4],
bid_volume_1=market_data.bid_qty[0],
bid_volume_2=market_data.bid_qty[1],
bid_volume_3=market_data.bid_qty[2],
bid_volume_4=market_data.bid_qty[3],
bid_volume_5=market_data.bid_qty[4],
ask_volume_1=market_data.ask_qty[0],
ask_volume_2=market_data.ask_qty[1],
ask_volume_3=market_data.ask_qty[2],
ask_volume_4=market_data.ask_qty[3],
ask_volume_5=market_data.ask_qty[4],
gateway_name=self.gateway_name
)
tick.name = symbol_name_map.get(tick.vt_symbol, tick.symbol)
self.gateway.on_tick(tick)
def OnSubOrderBook(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
""""""
pass
def OnUnSubOrderBook(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
""""""
pass
def OnOrderBook(self, order_book: OrderBookStruct) -> Any:
""""""
pass
def OnSubTickByTick(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
""""""
pass
def OnUnSubTickByTick(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
""""""
pass
def OnTickByTick(self, tbt_data: XTPTickByTickStruct) -> Any:
""""""
pass
def OnSubscribeAllMarketData(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
""""""
pass
def OnUnSubscribeAllMarketData(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
""""""
pass
def OnSubscribeAllOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
""""""
pass
def OnUnSubscribeAllOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
""""""
pass
def OnSubscribeAllTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
""""""
pass
def OnUnSubscribeAllTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
""""""
pass
def OnQueryAllTickers(self, ticker_info: XTPQuoteStaticInfo, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
""""""
if self.check_error("查询合约", error_info):
return
contract = ContractData(
symbol=ticker_info.ticker,
exchange=EXCHANGE_XTP2VT[ticker_info.exchange_id],
name=ticker_info.ticker_name,
product=PRODUCT_XTP2VT[ticker_info.ticker_type],
size=1,
pricetick=ticker_info.price_tick,
min_volume=ticker_info.buy_qty_unit,
gateway_name=self.gateway_name
)
self.gateway.on_contract(contract)
symbol_name_map[contract.vt_symbol] = contract.name
if is_last:
self.gateway.write_log(f"{contract.exchange.value}合约信息查询成功")
def OnQueryTickersPriceInfo(self, ticker_info: XTPTickerPriceInfo, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
""""""
pass
def OnSubscribeAllOptionMarketData(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
""""""
pass
def OnUnSubscribeAllOptionMarketData(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
""""""
pass
def OnSubscribeAllOptionOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
""""""
pass
def OnUnSubscribeAllOptionOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
""""""
pass
def OnSubscribeAllOptionTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
""""""
pass
def OnUnSubscribeAllOptionTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
""""""
pass
class XtpTraderApi(API.TraderSpi):
def __init__(self, gateway: BaseGateway):
""""""
super().__init__()
self.gateway = gateway
self.gateway_name = gateway.gateway_name
self.userid = ""
self.password = ""
self.client_id = ""
self.server_ip = ""
self.server_port = ""
self.software_key = ""
self.api = None
self.session_id = 0
self.reqid = 0
def connect(
self,
userid: str,
password: str,
client_id: int,
server_ip: str,
server_port: int,
software_key: str
):
""""""
if self.api:
return
self.userid = userid
self.password = password
self.client_id = client_id
self.server_ip = server_ip
self.server_port = server_port
self.software_key = software_key
# Create API object
path = str(get_folder_path(self.gateway_name.lower()))
self.api = API.TraderApi.CreateTraderApi(
self.client_id,
path,
XTP_LOG_LEVEL.XTP_LOG_LEVEL_TRACE
)
self.api.RegisterSpi(self)
self.api.SetSoftwareKey(self.software_key)
self.api.SubscribePublicTopic(XTP_TE_RESUME_TYPE.XTP_TERT_RESTART)
self.gateway.write_log("交易接口初始化成功")
# Login to server
Thread(target=self.login).start()
def login(self):
""""""
self.session_id = self.api.Login(
self.server_ip,
self.server_port,
self.userid,
self.password,
XTP_PROTOCOL_TYPE.XTP_PROTOCOL_TCP
)
if self.session_id:
msg = "交易服务器登录成功"
else:
error = self.api.GetApiLastError()
msg = f"交易服务器登录失败,原因:{error.error_msg}"
self.gateway.write_log(msg)
def close(self):
""""""
if self.api:
self.api.RegisterSpi(None)
self.api.Release()
def send_order(self, req: OrderRequest) -> str:
""""""
if req.exchange not in MARKET_VT2XTP:
self.gateway.write_log(f"委托失败,不支持的交易所{req.exchange.value}")
return ""
if req.type not in ORDERTYPE_VT2XTP:
self.gateway.write_log(f"委托失败,不支持的委托类型{req.type.value}")
return ""
xtp_req = XTPOrderInsertInfo()
xtp_req.ticker = req.symbol
xtp_req.market = MARKET_VT2XTP[req.exchange]
xtp_req.price = req.price
xtp_req.quantity = int(req.volume)
xtp_req.side = DIRECTION_VT2XTP[req.direction]
xtp_req.price_type = ORDERTYPE_VT2XTP[req.type]
xtp_req.business_type = XTP_BUSINESS_TYPE.XTP_BUSINESS_TYPE_CASH
orderid = self.api.InsertOrder(xtp_req, self.session_id)
order = req.create_order_data(str(orderid), self.gateway_name)
self.gateway.on_order(order)
return order.vt_orderid
def cancel_order(self, req: CancelRequest):
""""""
self.api.CancelOrder(int(req.orderid), self.session_id)
def query_account(self):
""""""
if not self.api:
return
self.reqid += 1
self.api.QueryAsset(self.session_id, self.reqid)
def query_position(self):
""""""
if not self.api:
return
self.reqid += 1
self.api.QueryPosition("", self.session_id, self.reqid)
def check_error(self, func_name: str, error_info: XTPRspInfoStruct):
""""""
if error_info and error_info.error_id:
msg = f"{func_name}发生错误, 代码:{error_info.error_id},信息:{error_info.error_msg}"
self.gateway.write_log(msg)
return True
else:
return False
def OnDisconnected(self, session_id: int, reason: int) -> Any:
""""""
self.gateway.write_log("交易服务器连接断开")
def OnError(self, error_info: XTPRspInfoStruct) -> Any:
""""""
self.check_error("交易接口", error_info)
def OnOrderEvent(self, order_info: XTPOrderInfo, error_info: XTPRspInfoStruct,
session_id: int) -> Any:
""""""
self.check_error("委托下单", error_info)
order = OrderData(
symbol=order_info.ticker,
exchange=MARKET_XTP2VT[order_info.market],
orderid=str(order_info.order_xtp_id),
type=ORDERTYPE_XTP2VT[order_info.price_type],
direction=DIRECTION_XTP2VT[order_info.side],
price=order_info.price,
volume=order_info.quantity,
traded=order_info.qty_traded,
status=STATUS_XTP2VT[order_info.order_status],
time=order_info.insert_time,
gateway_name=self.gateway_name
)
self.gateway.on_order(order)
def OnTradeEvent(self, trade_info: XTPTradeReport, session_id: int) -> Any:
""""""
trade = TradeData(
symbol=trade_info.ticker,
exchange=MARKET_XTP2VT[trade_info.market],
orderid=str(trade_info.order_xtp_id),
tradeid=str(trade_info.exec_id),
direction=DIRECTION_XTP2VT[trade_info.side],
price=trade_info.price,
volume=trade_info.quantity,
time=trade_info.trade_time,
gateway_name=self.gateway_name
)
self.gateway.on_trade(trade)
def OnCancelOrderError(self, cancel_info: XTPOrderCancelInfo, error_info: XTPRspInfoStruct,
session_id: int) -> Any:
""""""
self.check_error("委托撤单", error_info)
def OnQueryOrder(self, order_info: XTPOrderInfo, error_info: XTPRspInfoStruct,
is_last: bool, session_id: int) -> Any:
""""""
if self.check_error("查询委托", error_info):
return
self.updateOrder(order_info)
if is_last:
self.gateway.write_log("查询委托信息成功")
def OnQueryTrade(self, trade_info: XTPTradeReport, error_info: XTPRspInfoStruct,
is_last: bool, session_id: int) -> Any:
""""""
if self.check_error("查询成交", error_info):
return
self.updateTrade(trade_info)
if is_last:
self.gateway.write_log("查询成交信息成功")
def OnQueryPosition(self, xtp_position: XTPQueryStkPositionRsp, error_info: XTPRspInfoStruct,
request_id: int, is_last: bool, session_id: int) -> Any:
""""""
position = PositionData(
symbol=xtp_position.ticker,
exchange=MARKET_XTP2VT[xtp_position.market],
direction=Direction.NET,
volume=xtp_position.total_qty,
frozen=xtp_position.locked_position,
price=xtp_position.avg_price,
pnl=xtp_position.unrealized_pnl,
yd_volume=xtp_position.yesterday_position,
gateway_name=self.gateway_name
)
self.gateway.on_position(position)
def OnQueryAsset(self, asset: XTPQueryAssetRsp, error_info: XTPRspInfoStruct,
request_id: int, is_last: bool, session_id: int) -> Any:
""""""
account = AccountData(
accountid=self.userid,
balance=asset.buying_power,
frozen=asset.withholding_amount,
gateway_name=self.gateway_name
)
self.gateway.on_account(account)
def OnQueryStructuredFund(self, fund_info: XTPStructuredFundInfo, error_info: XTPRspInfoStruct,
is_last: bool, session_id: int) -> Any:
""""""
pass
def OnQueryFundTransfer(self, fund_transfer_info: XTPFundTransferNotice, error_info: XTPRspInfoStruct,
is_last: bool, session_id: int) -> Any:
""""""
pass
def OnFundTransfer(self, fund_transfer_info: XTPFundTransferNotice, session_id: int) -> Any:
""""""
pass
def OnQueryETF(self, etf_info: XTPQueryETFBaseRsp, error_info: XTPRspInfoStruct,
is_last: bool, session_id: int) -> Any:
""""""
pass
def OnQueryETFBasket(self, etf_component_info: XTPQueryETFComponentRsp, error_info: XTPRspInfoStruct,
is_last: bool, session_id: int) -> Any:
""""""
pass
def OnQueryIPOInfoList(self, ipo_info: XTPQueryIPOTickerRsp, error_info: XTPRspInfoStruct,
is_last: bool, session_id: int) -> Any:
""""""
pass
def OnQueryIPOQuotaInfo(self, quota_info: XTPQueryIPOQuotaRsp, error_info: XTPRspInfoStruct,
is_last: bool, session_id: int) -> Any:
""""""
pass
def OnQueryOptionAuctionInfo(self, option_info: XTPQueryOptionAuctionInfoRsp, error_info: XTPRspInfoStruct,
is_last: bool, session_id: int) -> Any:
""""""
pass