[增强] 增加csv文件追加数据方法,增加动态加载类

This commit is contained in:
msincenselee 2020-01-19 17:15:13 +08:00
parent 99d2d0cf79
commit 65bfdd2241

View File

@ -5,6 +5,8 @@ General utility functions.
import json import json
import logging import logging
import sys import sys
import os
import csv
import re import re
from pathlib import Path from pathlib import Path
from typing import Callable, Dict from typing import Callable, Dict
@ -170,7 +172,6 @@ def extract_vt_symbol(vt_symbol: str):
symbol, exchange_str = vt_symbol.split(".") symbol, exchange_str = vt_symbol.split(".")
return symbol, Exchange(exchange_str) return symbol, Exchange(exchange_str)
def generate_vt_symbol(symbol: str, exchange: Exchange): def generate_vt_symbol(symbol: str, exchange: Exchange):
""" """
return vt_symbol return vt_symbol
@ -304,6 +305,74 @@ def print_dict(d: dict):
"""返回dict的字符串类型""" """返回dict的字符串类型"""
return '\n'.join([f'{key}:{d[key]}' for key in sorted(d.keys())]) return '\n'.join([f'{key}:{d[key]}' for key in sorted(d.keys())])
def append_data(self, file_name: str, dict_data: dict, field_names: list = []):
"""
添加数据到csv文件中
:param file_name: csv的文件全路径
:param dict_data: OrderedDict
:return:
"""
dict_fieldnames = sorted(list(dict_data.keys())) if len(field_names) == 0 else field_names
try:
if not os.path.exists(file_name):
print(u'create csv file:{}'.format(file_name))
with open(file_name, 'a', encoding='utf8', newline='\n') as csvWriteFile:
writer = csv.DictWriter(f=csvWriteFile, fieldnames=dict_fieldnames, dialect='excel')
print(u'write csv header:{}'.format(dict_fieldnames))
writer.writeheader()
writer.writerow(dict_data)
else:
with open(file_name, 'a', encoding='utf8', newline='\n') as csvWriteFile:
writer = csv.DictWriter(f=csvWriteFile, fieldnames=dict_fieldnames, dialect='excel',
extrasaction='ignore')
writer.writerow(dict_data)
except Exception as ex:
print(u'append_data exception:{}'.format(str(ex)), file=sys.stderr)
def import_module_by_str(import_module_name):
"""
动态导入模块/函数
:param import_module_name:
:return:
"""
import traceback
from importlib import import_module, reload
# 参数检查
if len(import_module_name) == 0:
print('import_module_by_str parameter error,return None')
return None
print('trying to import {}'.format(import_module_name))
try:
import_name = str(import_module_name).replace(':', '.')
modules = import_name.split('.')
if len(modules) == 1:
mod = import_module(modules[0])
return mod
else:
loaded_modules = '.'.join(modules[0:-1])
print('import {}'.format(loaded_modules))
mod = import_module(loaded_modules)
comp = modules[-1]
if not hasattr(mod, comp):
loaded_modules = '.'.join([loaded_modules,comp])
print('realod {}'.format(loaded_modules))
mod = reload(loaded_modules)
else:
print('from {} import {}'.format(loaded_modules,comp))
mod = getattr(mod, comp)
return mod
except Exception as ex:
print('import {} fail,{},{}'.format(import_module_name,str(ex),traceback.format_exc()))
return None
class BarGenerator: class BarGenerator:
""" """
For: For: