This commit is contained in:
msincenselee 2018-05-12 14:37:40 +08:00
parent 83dc65544c
commit f1ceec1b17

View File

@ -4,11 +4,12 @@ print( 'load vtEngine.py')
import shelve import shelve
from collections import OrderedDict from collections import OrderedDict
import os import os,sys
import copy import copy
from pymongo import MongoClient from pymongo import MongoClient
from pymongo.errors import ConnectionFailure from pymongo.errors import ConnectionFailure,AutoReconnect
#import vnpy.trader.mongo_proxy
from vnpy.trader.vtEvent import Event as vn_event from vnpy.trader.vtEvent import Event as vn_event
from vnpy.trader.language import text from vnpy.trader.language import text
@ -26,6 +27,8 @@ try:
except: except:
print('import util_mail fail') print('import util_mail fail')
LOG_DB_NAME = 'vt_logger'
######################################################################## ########################################################################
class MainEngine(object): class MainEngine(object):
"""主引擎""" """主引擎"""
@ -45,6 +48,7 @@ class MainEngine(object):
# MongoDB数据库相关 # MongoDB数据库相关
self.dbClient = None # MongoDB客户端对象 self.dbClient = None # MongoDB客户端对象
self.db_has_connected = False
# 接口实例 # 接口实例
self.gatewayDict = OrderedDict() self.gatewayDict = OrderedDict()
@ -282,9 +286,11 @@ class MainEngine(object):
# 只断开指定的gateway # 只断开指定的gateway
if gateway_name != EMPTY_STRING: if gateway_name != EMPTY_STRING:
if gateway_name in self.gatewayDict: if gateway_name in self.gatewayDict:
self.writeLog(u'获取{} gateway'.format(gateway_name))
gateway = self.gatewayDict[gateway_name] gateway = self.gatewayDict[gateway_name]
gateway.close() gateway.close()
if gateway_name in self.connected_gw_names: if gateway_name in self.connected_gw_names:
self.writeLog(u'移除connected_gw_names[{}]'.format(gateway_name))
self.connected_gw_names.remove(gateway_name) self.connected_gw_names.remove(gateway_name)
return return
else: else:
@ -346,7 +352,7 @@ class MainEngine(object):
if self.logger is not None: if self.logger is not None:
self.logger.error(content) self.logger.error(content)
else: else:
print(content) print(content, file=sys.stderr)
self.createLogger() self.createLogger()
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
@ -362,7 +368,7 @@ class MainEngine(object):
if self.logger is not None: if self.logger is not None:
self.logger.warning(content) self.logger.warning(content)
else: else:
print( content) print(content,file=sys.stderr)
self.createLogger() self.createLogger()
# 发出邮件 # 发出邮件
@ -400,7 +406,7 @@ class MainEngine(object):
if self.logger: if self.logger:
self.logger.critical(content) self.logger.critical(content)
else: else:
print( content) print( content,file=sys.stderr)
self.createLogger() self.createLogger()
# 发出邮件 # 发出邮件
@ -424,27 +430,81 @@ class MainEngine(object):
self.dbClient.server_info() self.dbClient.server_info()
self.writeLog(text.DATABASE_CONNECTING_COMPLETED) self.writeLog(text.DATABASE_CONNECTING_COMPLETED)
self.db_has_connected = True
# 如果启动日志记录,则注册日志事件监听函数 # 如果启动日志记录,则注册日志事件监听函数
if logging: #if logging:
self.eventEngine.register(EVENT_LOG, self.dbLogging) # self.eventEngine.register(EVENT_LOG, self.dbLogging)
except ConnectionFailure: except ConnectionFailure:
self.dbClient = None
self.writeError(text.DATABASE_CONNECTING_FAILED) self.writeError(text.DATABASE_CONNECTING_FAILED)
self.db_has_connected = False
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
def dbInsert(self, dbName, collectionName, d): def dbInsert(self, dbName, collectionName, d):
"""向MongoDB中插入数据d是具体数据""" """向MongoDB中插入数据d是具体数据"""
try:
if self.dbClient: if self.dbClient:
db = self.dbClient[dbName] db = self.dbClient[dbName]
collection = db[collectionName] collection = db[collectionName]
collection.insert_one(d) collection.insert_one(d)
else: else:
self.writeLog(text.DATA_INSERT_FAILED) self.writeLog(text.DATA_INSERT_FAILED)
if self.db_has_connected:
self.writeLog(u'重新尝试连接数据库')
self.dbConnect()
except ConnectionFailure:
self.dbClient = None
self.writeError(u'数据库连接断开')
if self.db_has_connected:
self.writeLog(u'重新尝试连接数据库')
self.dbConnect()
except AutoReconnect as ex:
self.writeError(u'数据库连接断开重连:{}'.format(str(ex)))
time.sleep(1)
except Exception as ex:
self.writeError(u'dbInsert exception:{}'.format(str(ex)))
def dbInsertMany(self,dbName, collectionName, data_list,ordered=True):
"""
向MongoDB中插入数据data_list是具体数据 列表
:param dbName:
:param collectionName:
:param data_list:
:param ordered: 是否忽略insert error
:return:
"""
if not isinstance(data_list,list):
self.writeLog(text.DATA_INSERT_FAILED)
return
try:
if self.dbClient:
db = self.dbClient[dbName]
collection = db[collectionName]
collection.insert_many(data_list, ordered = ordered)
else:
self.writeLog(text.DATA_INSERT_FAILED)
if self.db_has_connected:
self.writeLog(u'重新尝试连接数据库')
self.dbConnect()
except ConnectionFailure:
self.dbClient = None
self.writeError(u'数据库连接断开')
if self.db_has_connected:
self.writeLog(u'重新尝试连接数据库')
self.dbConnect()
except AutoReconnect as ex:
self.writeError(u'数据库连接断开重连:{}'.format(str(ex)))
time.sleep(1)
except Exception as ex:
self.writeError(u'dbInsertMany exception:{}'.format(str(ex)))
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
def dbQuery(self, dbName, collectionName, d): def dbQuery(self, dbName, collectionName, d):
"""从MongoDB中读取数据d是查询要求返回的是数据库查询的指针""" """从MongoDB中读取数据d是查询要求返回的是数据库查询的指针"""
try:
if self.dbClient: if self.dbClient:
db = self.dbClient[dbName] db = self.dbClient[dbName]
collection = db[collectionName] collection = db[collectionName]
@ -455,17 +515,113 @@ class MainEngine(object):
return [] return []
else: else:
self.writeLog(text.DATA_QUERY_FAILED) self.writeLog(text.DATA_QUERY_FAILED)
if self.db_has_connected:
self.writeLog(u'重新尝试连接数据库')
self.dbConnect()
except ConnectionFailure:
self.dbClient = None
self.writeError(u'数据库连接断开')
if self.db_has_connected:
self.writeLog(u'重新尝试连接数据库')
self.dbConnect()
except AutoReconnect as ex:
self.writeError(u'数据库连接断开重连:{}'.format(str(ex)))
time.sleep(1)
except Exception as ex:
self.writeError(u'dbQuery exception:{}'.format(str(ex)))
return [] return []
def dbQueryBySort(self, dbName, collectionName, d, sortName, sortType, limitNum=0):
"""从MongoDB中读取数据d是查询要求sortName是排序的字段,sortType是排序类型
返回的是数据库查询的指针"""
try:
if self.dbClient:
db = self.dbClient[dbName]
collection = db[collectionName]
if limitNum > 0:
cursor = collection.find(d).sort(sortName, sortType).limit(limitNum)
else:
cursor = collection.find(d).sort(sortName, sortType)
if cursor:
return list(cursor)
else:
return []
else:
self.writeLog(text.DATA_QUERY_FAILED)
if self.db_has_connected:
self.writeLog(u'重新尝试连接数据库')
self.dbConnect()
except ConnectionFailure:
self.dbClient = None
self.writeError(u'数据库连接断开')
if self.db_has_connected:
self.writeLog(u'重新尝试连接数据库')
self.dbConnect()
except AutoReconnect as ex:
self.writeError(u'数据库连接断开重连:{}'.format(str(ex)))
time.sleep(1)
except Exception as ex:
self.writeError(u'dbQueryBySort exception:{}'.format(str(ex)))
return []
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def dbUpdate(self, dbName, collectionName, d, flt, upsert=False): def dbUpdate(self, dbName, collectionName, d, flt, upsert=False):
"""向MongoDB中更新数据d是具体数据flt是过滤条件upsert代表若无是否要插入""" """向MongoDB中更新数据d是具体数据flt是过滤条件upsert代表若无是否要插入"""
try:
if self.dbClient: if self.dbClient:
db = self.dbClient[dbName] db = self.dbClient[dbName]
collection = db[collectionName] collection = db[collectionName]
collection.replace_one(flt, d, upsert) collection.replace_one(flt, d, upsert)
else: else:
self.writeLog(text.DATA_UPDATE_FAILED) self.writeLog(text.DATA_UPDATE_FAILED)
if self.db_has_connected:
self.writeLog(u'重新尝试连接数据库')
self.dbConnect()
except ConnectionFailure:
self.dbClient = None
self.writeError(u'数据库连接断开')
if self.db_has_connected:
self.writeLog(u'重新尝试连接数据库')
self.dbConnect()
except AutoReconnect as ex:
self.writeError(u'数据库连接断开重连:{}'.format(str(ex)))
time.sleep(1)
except Exception as ex:
self.writeError(u'dbUpdate exception:{}'.format(str(ex)))
def dbDelete(self,dbName, collectionName, flt):
"""
向mongodb中删除数据flt是过滤条件
:param dbName:
:param collectionName:
:param flt:
:return:
"""
try:
if self.dbClient:
db = self.dbClient[dbName]
collection = db[collectionName]
collection.delete_many(flt)
else:
self.writeLog(text.DATA_DELETE_FAILED)
if self.db_has_connected:
self.writeLog(u'重新尝试连接数据库')
self.dbConnect()
except ConnectionFailure:
self.dbClient = None
self.writeError(u'数据库连接断开')
if self.db_has_connected:
self.writeLog(u'重新尝试连接数据库')
self.dbConnect()
except AutoReconnect as ex:
self.writeError(u'数据库连接断开重连:{}'.format(str(ex)))
time.sleep(1)
except Exception as ex:
self.writeError(u'dbDelete exception:{}'.format(str(ex)))
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def dbLogging(self, event): def dbLogging(self, event):