commit
9fb8b1fc2b
41
examples/candle_chart/run.py
Normal file
41
examples/candle_chart/run.py
Normal file
@ -0,0 +1,41 @@
|
||||
from datetime import datetime
|
||||
|
||||
from vnpy.trader.ui import create_qapp, QtCore
|
||||
from vnpy.trader.database import database_manager
|
||||
from vnpy.trader.constant import Exchange, Interval
|
||||
from vnpy.chart import ChartWidget, VolumeItem, CandleItem
|
||||
|
||||
if __name__ == "__main__":
|
||||
app = create_qapp()
|
||||
|
||||
bars = database_manager.load_bar_data(
|
||||
"IF88",
|
||||
Exchange.CFFEX,
|
||||
interval=Interval.MINUTE,
|
||||
start=datetime(2019, 7, 1),
|
||||
end=datetime(2019, 7, 17)
|
||||
)
|
||||
|
||||
widget = ChartWidget()
|
||||
widget.add_plot("candle", hide_x_axis=True)
|
||||
widget.add_plot("volume", maximum_height=200)
|
||||
widget.add_item(CandleItem, "candle", "candle")
|
||||
widget.add_item(VolumeItem, "volume", "volume")
|
||||
widget.add_cursor()
|
||||
|
||||
n = 1000
|
||||
history = bars[:n]
|
||||
new_data = bars[n:]
|
||||
|
||||
widget.update_history(history)
|
||||
|
||||
def update_bar():
|
||||
bar = new_data.pop(0)
|
||||
widget.update_bar(bar)
|
||||
|
||||
timer = QtCore.QTimer()
|
||||
timer.timeout.connect(update_bar)
|
||||
timer.start(100)
|
||||
|
||||
widget.show()
|
||||
app.exec_()
|
@ -8,7 +8,7 @@ from vnpy.trader.ui import MainWindow, create_qapp
|
||||
# from vnpy.gateway.bitmex import BitmexGateway
|
||||
# from vnpy.gateway.futu import FutuGateway
|
||||
# from vnpy.gateway.ib import IbGateway
|
||||
# from vnpy.gateway.ctp import CtpGateway
|
||||
from vnpy.gateway.ctp import CtpGateway
|
||||
# from vnpy.gateway.ctptest import CtptestGateway
|
||||
# from vnpy.gateway.mini import MiniGateway
|
||||
# from vnpy.gateway.femas import FemasGateway
|
||||
@ -44,7 +44,7 @@ def main():
|
||||
main_engine = MainEngine(event_engine)
|
||||
|
||||
# main_engine.add_gateway(BinanceGateway)
|
||||
# main_engine.add_gateway(CtpGateway)
|
||||
main_engine.add_gateway(CtpGateway)
|
||||
# main_engine.add_gateway(CtptestGateway)
|
||||
# main_engine.add_gateway(MiniGateway)
|
||||
# main_engine.add_gateway(FemasGateway)
|
||||
|
2
vnpy/chart/__init__.py
Normal file
2
vnpy/chart/__init__.py
Normal file
@ -0,0 +1,2 @@
|
||||
from .widget import ChartWidget
|
||||
from .item import CandleItem, VolumeItem
|
44
vnpy/chart/axis.py
Normal file
44
vnpy/chart/axis.py
Normal file
@ -0,0 +1,44 @@
|
||||
from typing import List
|
||||
|
||||
import pyqtgraph as pg
|
||||
|
||||
from vnpy.trader.ui import QtGui
|
||||
|
||||
from .manager import BarManager
|
||||
|
||||
|
||||
AXIS_WIDTH = 0.8
|
||||
AXIS_FONT = QtGui.QFont("Arial", 10, QtGui.QFont.Bold)
|
||||
|
||||
|
||||
class DatetimeAxis(pg.AxisItem):
|
||||
""""""
|
||||
|
||||
def __init__(self, manager: BarManager, *args, **kwargs):
|
||||
""""""
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
self._manager: BarManager = manager
|
||||
|
||||
self.setPen(width=AXIS_WIDTH)
|
||||
self.setStyle(tickFont=AXIS_FONT, autoExpandTextSpace=True)
|
||||
|
||||
def tickStrings(self, values: List[int], scale: float, spacing: int):
|
||||
"""
|
||||
Convert original index to datetime string.
|
||||
"""
|
||||
strings = []
|
||||
|
||||
for ix in values:
|
||||
dt = self._manager.get_datetime(ix)
|
||||
|
||||
if not dt:
|
||||
s = ""
|
||||
elif dt.hour:
|
||||
s = dt.strftime("%Y-%m-%d\n%H:%M:%S")
|
||||
else:
|
||||
s = dt.strftime("%Y-%m-%d")
|
||||
|
||||
strings.append(s)
|
||||
|
||||
return strings
|
15
vnpy/chart/base.py
Normal file
15
vnpy/chart/base.py
Normal file
@ -0,0 +1,15 @@
|
||||
WHITE_COLOR = (255, 255, 255)
|
||||
BLACK_COLOR = (0, 0, 0)
|
||||
GREY_COLOR = (100, 100, 100)
|
||||
|
||||
UP_COLOR = (255, 0, 0)
|
||||
DOWN_COLOR = (0, 255, 50)
|
||||
CURSOR_COLOR = (255, 245, 162)
|
||||
|
||||
PEN_WIDTH = 1
|
||||
BAR_WIDTH = 0.4
|
||||
|
||||
|
||||
def to_int(value: float) -> int:
|
||||
""""""
|
||||
return int(round(value, 0))
|
260
vnpy/chart/item.py
Normal file
260
vnpy/chart/item.py
Normal file
@ -0,0 +1,260 @@
|
||||
from abc import abstractmethod
|
||||
from typing import List, Dict, Tuple
|
||||
|
||||
import pyqtgraph as pg
|
||||
|
||||
from vnpy.trader.ui import QtCore, QtGui, QtWidgets
|
||||
from vnpy.trader.object import BarData
|
||||
|
||||
from .base import UP_COLOR, DOWN_COLOR, PEN_WIDTH, BAR_WIDTH
|
||||
from .manager import BarManager
|
||||
|
||||
|
||||
class ChartItem(pg.GraphicsObject):
|
||||
""""""
|
||||
|
||||
def __init__(self, manager: BarManager):
|
||||
""""""
|
||||
super().__init__()
|
||||
|
||||
self._manager: BarManager = manager
|
||||
|
||||
self._bar_picutures: Dict[int, QtGui.QPicture] = {}
|
||||
self._item_picuture: QtGui.QPicture = None
|
||||
|
||||
self._up_pen: QtGui.QPen = pg.mkPen(
|
||||
color=UP_COLOR, width=PEN_WIDTH
|
||||
)
|
||||
self._up_brush: QtGui.QBrush = pg.mkBrush(color=UP_COLOR)
|
||||
|
||||
self._down_pen: QtGui.QPen = pg.mkPen(
|
||||
color=DOWN_COLOR, width=PEN_WIDTH
|
||||
)
|
||||
self._down_brush: QtGui.QBrush = pg.mkBrush(color=DOWN_COLOR)
|
||||
|
||||
@abstractmethod
|
||||
def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
|
||||
"""
|
||||
Draw picture for specific bar.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def boundingRect(self) -> QtCore.QRectF:
|
||||
"""
|
||||
Get bounding rectangles for item.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_y_range(self, min_ix: int = None, max_ix: int = None) -> Tuple[float, float]:
|
||||
"""
|
||||
Get range of y-axis with given x-axis range.
|
||||
|
||||
If min_ix and max_ix not specified, then return range with whole data set.
|
||||
"""
|
||||
pass
|
||||
|
||||
def update_history(self, history: List[BarData]) -> BarData:
|
||||
"""
|
||||
Update a list of bar data.
|
||||
"""
|
||||
self._bar_picutures.clear()
|
||||
|
||||
bars = self._manager.get_all_bars()
|
||||
for ix, bar in enumerate(bars):
|
||||
bar_picture = self._draw_bar_picture(ix, bar)
|
||||
self._bar_picutures[ix] = bar_picture
|
||||
|
||||
self.update()
|
||||
|
||||
def update_bar(self, bar: BarData) -> BarData:
|
||||
"""
|
||||
Update single bar data.
|
||||
"""
|
||||
ix = self._manager.get_index(bar.datetime)
|
||||
|
||||
bar_picture = self._draw_bar_picture(ix, bar)
|
||||
self._bar_picutures[ix] = bar_picture
|
||||
|
||||
self.update()
|
||||
|
||||
def update(self) -> None:
|
||||
"""
|
||||
Refresh the item.
|
||||
"""
|
||||
if self.scene():
|
||||
self.scene().update()
|
||||
|
||||
def paint(
|
||||
self,
|
||||
painter: QtGui.QPainter,
|
||||
opt: QtWidgets.QStyleOptionGraphicsItem,
|
||||
w: QtWidgets.QWidget
|
||||
):
|
||||
"""
|
||||
Reimplement the paint method of parent class.
|
||||
|
||||
This function is called by external QGraphicsView.
|
||||
"""
|
||||
rect = opt.exposedRect
|
||||
|
||||
min_ix = int(rect.left())
|
||||
max_ix = int(rect.right())
|
||||
max_ix = min(max_ix, len(self._bar_picutures))
|
||||
self._draw_item_picture(min_ix, max_ix)
|
||||
|
||||
self._item_picuture.play(painter)
|
||||
|
||||
def _draw_item_picture(self, min_ix: int, max_ix: int) -> None:
|
||||
"""
|
||||
Draw the picture of item in specific range.
|
||||
"""
|
||||
self._item_picuture = QtGui.QPicture()
|
||||
painter = QtGui.QPainter(self._item_picuture)
|
||||
|
||||
for n in range(min_ix, max_ix):
|
||||
bar_picture = self._bar_picutures[n]
|
||||
bar_picture.play(painter)
|
||||
|
||||
painter.end()
|
||||
|
||||
def clear_all(self) -> None:
|
||||
"""
|
||||
Clear all data in the item.
|
||||
"""
|
||||
self._item_picuture = None
|
||||
self._bar_picutures.clear()
|
||||
self.update()
|
||||
|
||||
|
||||
class CandleItem(ChartItem):
|
||||
""""""
|
||||
|
||||
def __init__(self, manager: BarManager):
|
||||
""""""
|
||||
super().__init__(manager)
|
||||
|
||||
def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
|
||||
""""""
|
||||
# Create objects
|
||||
candle_picture = QtGui.QPicture()
|
||||
painter = QtGui.QPainter(candle_picture)
|
||||
|
||||
# Set painter color
|
||||
if bar.close_price >= bar.open_price:
|
||||
painter.setPen(self._up_pen)
|
||||
painter.setBrush(self._up_brush)
|
||||
else:
|
||||
painter.setPen(self._down_pen)
|
||||
painter.setBrush(self._down_brush)
|
||||
|
||||
# Draw candle body
|
||||
if bar.open_price == bar.close_price:
|
||||
painter.drawLine(
|
||||
QtCore.QPointF(ix - BAR_WIDTH, bar.open_price),
|
||||
QtCore.QPointF(ix + BAR_WIDTH, bar.open_price),
|
||||
)
|
||||
else:
|
||||
rect = QtCore.QRectF(
|
||||
ix - BAR_WIDTH,
|
||||
bar.open_price,
|
||||
BAR_WIDTH * 2,
|
||||
bar.close_price - bar.open_price
|
||||
)
|
||||
painter.drawRect(rect)
|
||||
|
||||
# Draw candle shadow
|
||||
body_bottom = min(bar.open_price, bar.close_price)
|
||||
body_top = max(bar.open_price, bar.close_price)
|
||||
|
||||
if bar.low_price < body_bottom:
|
||||
painter.drawLine(
|
||||
QtCore.QPointF(ix, bar.low_price),
|
||||
QtCore.QPointF(ix, body_bottom),
|
||||
)
|
||||
|
||||
if bar.high_price > body_top:
|
||||
painter.drawLine(
|
||||
QtCore.QPointF(ix, bar.high_price),
|
||||
QtCore.QPointF(ix, body_top),
|
||||
)
|
||||
|
||||
# Finish
|
||||
painter.end()
|
||||
return candle_picture
|
||||
|
||||
def boundingRect(self) -> QtCore.QRectF:
|
||||
""""""
|
||||
min_price, max_price = self._manager.get_price_range()
|
||||
rect = QtCore.QRectF(
|
||||
0,
|
||||
min_price,
|
||||
len(self._bar_picutures),
|
||||
max_price - min_price
|
||||
)
|
||||
return rect
|
||||
|
||||
def get_y_range(self, min_ix: int = None, max_ix: int = None) -> Tuple[float, float]:
|
||||
"""
|
||||
Get range of y-axis with given x-axis range.
|
||||
|
||||
If min_ix and max_ix not specified, then return range with whole data set.
|
||||
"""
|
||||
min_price, max_price = self._manager.get_price_range(min_ix, max_ix)
|
||||
return min_price, max_price
|
||||
|
||||
|
||||
class VolumeItem(ChartItem):
|
||||
""""""
|
||||
|
||||
def __init__(self, manager: BarManager):
|
||||
""""""
|
||||
super().__init__(manager)
|
||||
|
||||
def _draw_bar_picture(self, ix: int, bar: BarData) -> QtGui.QPicture:
|
||||
""""""
|
||||
# Create objects
|
||||
volume_picture = QtGui.QPicture()
|
||||
painter = QtGui.QPainter(volume_picture)
|
||||
|
||||
# Set painter color
|
||||
if bar.close_price >= bar.open_price:
|
||||
painter.setPen(self._up_pen)
|
||||
painter.setBrush(self._up_brush)
|
||||
else:
|
||||
painter.setPen(self._down_pen)
|
||||
painter.setBrush(self._down_brush)
|
||||
|
||||
# Draw volume body
|
||||
rect = QtCore.QRectF(
|
||||
ix - BAR_WIDTH,
|
||||
0,
|
||||
BAR_WIDTH * 2,
|
||||
bar.volume
|
||||
)
|
||||
painter.drawRect(rect)
|
||||
|
||||
# Finish
|
||||
painter.end()
|
||||
return volume_picture
|
||||
|
||||
def boundingRect(self) -> QtCore.QRectF:
|
||||
""""""
|
||||
min_volume, max_volume = self._manager.get_volume_range()
|
||||
rect = QtCore.QRectF(
|
||||
0,
|
||||
min_volume,
|
||||
len(self._bar_picutures),
|
||||
max_volume - min_volume
|
||||
)
|
||||
return rect
|
||||
|
||||
def get_y_range(self, min_ix: int = None, max_ix: int = None) -> Tuple[float, float]:
|
||||
"""
|
||||
Get range of y-axis with given x-axis range.
|
||||
|
||||
If min_ix and max_ix not specified, then return range with whole data set.
|
||||
"""
|
||||
min_volume, max_volume = self._manager.get_volume_range(min_ix, max_ix)
|
||||
return min_volume, max_volume
|
168
vnpy/chart/manager.py
Normal file
168
vnpy/chart/manager.py
Normal file
@ -0,0 +1,168 @@
|
||||
from typing import Dict, List, Tuple
|
||||
from datetime import datetime
|
||||
|
||||
from vnpy.trader.object import BarData
|
||||
|
||||
from .base import to_int
|
||||
|
||||
|
||||
class BarManager:
|
||||
""""""
|
||||
|
||||
def __init__(self):
|
||||
""""""
|
||||
self._bars: Dict[datetime, BarData] = {}
|
||||
self._datetime_index_map: Dict[datetime, int] = {}
|
||||
self._index_datetime_map: Dict[int, datetime] = {}
|
||||
|
||||
self._price_ranges: Dict[Tuple[int, int], Tuple[float, float]] = {}
|
||||
self._volume_ranges: Dict[Tuple[int, int], Tuple[float, float]] = {}
|
||||
|
||||
def update_history(self, history: List[BarData]) -> None:
|
||||
"""
|
||||
Update a list of bar data.
|
||||
"""
|
||||
# Put all new bars into dict
|
||||
for bar in history:
|
||||
self._bars[bar.datetime] = bar
|
||||
|
||||
# Sort bars dict according to bar.datetime
|
||||
self._bars = dict(sorted(self._bars.items(), key=lambda tp: tp[0]))
|
||||
|
||||
# Update map relationiship
|
||||
ix_list = range(len(self._bars))
|
||||
dt_list = self._bars.keys()
|
||||
|
||||
self._datetime_index_map = dict(zip(dt_list, ix_list))
|
||||
self._index_datetime_map = dict(zip(ix_list, dt_list))
|
||||
|
||||
# Clear data range cache
|
||||
self._clear_cache()
|
||||
|
||||
def update_bar(self, bar: BarData) -> None:
|
||||
"""
|
||||
Update one single bar data.
|
||||
"""
|
||||
dt = bar.datetime
|
||||
|
||||
if dt not in self._datetime_index_map:
|
||||
ix = len(self._bars)
|
||||
self._datetime_index_map[dt] = ix
|
||||
self._index_datetime_map[ix] = dt
|
||||
|
||||
self._bars[dt] = bar
|
||||
|
||||
self._clear_cache()
|
||||
|
||||
def get_count(self) -> int:
|
||||
"""
|
||||
Get total number of bars.
|
||||
"""
|
||||
return len(self._bars)
|
||||
|
||||
def get_index(self, dt: datetime) -> int:
|
||||
"""
|
||||
Get index with datetime.
|
||||
"""
|
||||
return self._datetime_index_map.get(dt, None)
|
||||
|
||||
def get_datetime(self, ix: float) -> datetime:
|
||||
"""
|
||||
Get datetime with index.
|
||||
"""
|
||||
ix = to_int(ix)
|
||||
return self._index_datetime_map.get(ix, None)
|
||||
|
||||
def get_bar(self, ix: float) -> BarData:
|
||||
"""
|
||||
Get bar data with index.
|
||||
"""
|
||||
ix = to_int(ix)
|
||||
dt = self._index_datetime_map.get(ix, None)
|
||||
if not dt:
|
||||
return None
|
||||
|
||||
return self._bars[dt]
|
||||
|
||||
def get_all_bars(self) -> List[BarData]:
|
||||
"""
|
||||
Get all bar data.
|
||||
"""
|
||||
return list(self._bars.values())
|
||||
|
||||
def get_price_range(self, min_ix: float = None, max_ix: float = None) -> Tuple[float, float]:
|
||||
"""
|
||||
Get price range to show within given index range.
|
||||
"""
|
||||
if not self._bars:
|
||||
return 0, 1
|
||||
|
||||
if not min_ix:
|
||||
min_ix = 0
|
||||
max_ix = len(self._bars) - 1
|
||||
else:
|
||||
min_ix = to_int(min_ix)
|
||||
max_ix = to_int(max_ix)
|
||||
max_ix = min(max_ix, self.get_count())
|
||||
|
||||
buf = self._price_ranges.get((min_ix, max_ix), None)
|
||||
if buf:
|
||||
return buf
|
||||
|
||||
bar_list = list(self._bars.values())[min_ix:max_ix + 1]
|
||||
first_bar = bar_list[0]
|
||||
max_price = first_bar.high_price
|
||||
min_price = first_bar.low_price
|
||||
|
||||
for bar in bar_list[1:]:
|
||||
max_price = max(max_price, bar.high_price)
|
||||
min_price = min(min_price, bar.low_price)
|
||||
|
||||
return min_price, max_price
|
||||
|
||||
def get_volume_range(self, min_ix: float = None, max_ix: float = None) -> Tuple[float, float]:
|
||||
"""
|
||||
Get volume range to show within given index range.
|
||||
"""
|
||||
if not self._bars:
|
||||
return 0, 1
|
||||
|
||||
if not min_ix:
|
||||
min_ix = 0
|
||||
max_ix = len(self._bars) - 1
|
||||
else:
|
||||
min_ix = to_int(min_ix)
|
||||
max_ix = to_int(max_ix)
|
||||
max_ix = min(max_ix, self.get_count())
|
||||
|
||||
buf = self._volume_ranges.get((min_ix, max_ix), None)
|
||||
if buf:
|
||||
return buf
|
||||
|
||||
bar_list = list(self._bars.values())[min_ix:max_ix + 1]
|
||||
|
||||
first_bar = bar_list[0]
|
||||
max_volume = first_bar.volume
|
||||
min_volume = 0
|
||||
|
||||
for bar in bar_list[1:]:
|
||||
max_volume = max(max_volume, bar.volume)
|
||||
|
||||
return min_volume, max_volume
|
||||
|
||||
def _clear_cache(self) -> None:
|
||||
"""
|
||||
Clear cached range data.
|
||||
"""
|
||||
self._price_ranges.clear()
|
||||
self._volume_ranges.clear()
|
||||
|
||||
def clear_all(self) -> None:
|
||||
"""
|
||||
Clear all data in manager.
|
||||
"""
|
||||
self._bars.clear()
|
||||
self._datetime_index_map.clear()
|
||||
self._index_datetime_map.clear()
|
||||
|
||||
self._clear_cache()
|
508
vnpy/chart/widget.py
Normal file
508
vnpy/chart/widget.py
Normal file
@ -0,0 +1,508 @@
|
||||
from typing import List, Dict, Type
|
||||
|
||||
import pyqtgraph as pg
|
||||
|
||||
from vnpy.trader.ui import QtGui, QtWidgets, QtCore
|
||||
from vnpy.trader.object import BarData
|
||||
|
||||
from .manager import BarManager
|
||||
from .base import GREY_COLOR, WHITE_COLOR, CURSOR_COLOR, BLACK_COLOR, to_int
|
||||
from .axis import DatetimeAxis, AXIS_FONT
|
||||
from .item import ChartItem
|
||||
|
||||
|
||||
class ChartWidget(pg.PlotWidget):
|
||||
""""""
|
||||
MIN_BAR_COUNT = 100
|
||||
|
||||
def __init__(self, parent: QtWidgets.QWidget = None):
|
||||
""""""
|
||||
super().__init__(parent)
|
||||
|
||||
self._manager: BarManager = BarManager()
|
||||
|
||||
self._plots: Dict[str, ChartItem] = {}
|
||||
self._items: Dict[str, pg.GraphicsObject] = {}
|
||||
self._item_plot_map: Dict[ChartItem, pg.GraphicsObject] = {}
|
||||
|
||||
self._first_plot: pg.PlotItem = None
|
||||
self._cursor: ChartCursor = None
|
||||
|
||||
self._right_ix: int = 0 # Index of most right data
|
||||
self._bar_count: int = self.MIN_BAR_COUNT # Total bar visible in chart
|
||||
|
||||
self._init_ui()
|
||||
|
||||
def _init_ui(self) -> None:
|
||||
""""""
|
||||
self.setWindowTitle("ChartWidget of vn.py")
|
||||
|
||||
self._layout = pg.GraphicsLayout()
|
||||
self._layout.setContentsMargins(10, 10, 10, 10)
|
||||
self._layout.setSpacing(0)
|
||||
self._layout.setBorder(color=GREY_COLOR, width=0.8)
|
||||
self._layout.setZValue(0)
|
||||
self.setCentralItem(self._layout)
|
||||
|
||||
self._x_axis = DatetimeAxis(self._manager, orientation='bottom')
|
||||
|
||||
def add_cursor(self) -> None:
|
||||
""""""
|
||||
if not self._cursor:
|
||||
self._cursor = ChartCursor(self, self._manager, self._plots)
|
||||
|
||||
def add_plot(
|
||||
self,
|
||||
plot_name: str,
|
||||
minimum_height: int = 80,
|
||||
maximum_height: int = None,
|
||||
hide_x_axis: bool = False
|
||||
) -> None:
|
||||
"""
|
||||
Add plot area.
|
||||
"""
|
||||
# Create plot object
|
||||
plot = pg.PlotItem(axisItems={'bottom': self._x_axis})
|
||||
plot.setMenuEnabled(False)
|
||||
plot.setClipToView(True)
|
||||
plot.hideAxis('left')
|
||||
plot.showAxis('right')
|
||||
plot.setDownsampling(mode='peak')
|
||||
plot.setRange(xRange=(0, 1), yRange=(0, 1))
|
||||
plot.hideButtons()
|
||||
plot.setMinimumHeight(minimum_height)
|
||||
|
||||
if maximum_height:
|
||||
plot.setMaximumHeight(maximum_height)
|
||||
|
||||
if hide_x_axis:
|
||||
plot.hideAxis("bottom")
|
||||
|
||||
if not self._first_plot:
|
||||
self._first_plot = plot
|
||||
|
||||
# Connect view change signal to update y range function
|
||||
view = plot.getViewBox()
|
||||
view.sigXRangeChanged.connect(self._update_y_range)
|
||||
view.setMouseEnabled(x=True, y=False)
|
||||
|
||||
# Set right axis
|
||||
right_axis = plot.getAxis('right')
|
||||
right_axis.setWidth(60)
|
||||
right_axis.setStyle(tickFont=AXIS_FONT)
|
||||
|
||||
# Connect x-axis link
|
||||
if self._plots:
|
||||
first_plot = list(self._plots.values())[0]
|
||||
plot.setXLink(first_plot)
|
||||
|
||||
# Store plot object in dict
|
||||
self._plots[plot_name] = plot
|
||||
|
||||
def add_item(
|
||||
self,
|
||||
item_class: Type[ChartItem],
|
||||
item_name: str,
|
||||
plot_name: str
|
||||
):
|
||||
"""
|
||||
Add chart item.
|
||||
"""
|
||||
item = item_class(self._manager)
|
||||
self._items[item_name] = item
|
||||
|
||||
plot = self._plots.get(plot_name)
|
||||
plot.addItem(item)
|
||||
self._item_plot_map[item] = plot
|
||||
|
||||
self._layout.nextRow()
|
||||
self._layout.addItem(plot)
|
||||
|
||||
def get_all_plots(self):
|
||||
"""
|
||||
Get all plot objects.
|
||||
"""
|
||||
return self._plots.values()
|
||||
|
||||
def clear_all(self) -> None:
|
||||
"""
|
||||
Clear all data.
|
||||
"""
|
||||
self._manager.clear_all()
|
||||
|
||||
for item in self._items.values():
|
||||
item.clear_all()
|
||||
|
||||
if self._cursor:
|
||||
self._cursor.clear_all()
|
||||
|
||||
def update_history(self, history: List[BarData]) -> None:
|
||||
"""
|
||||
Update a list of bar data.
|
||||
"""
|
||||
self._manager.update_history(history)
|
||||
|
||||
for item in self._items.values():
|
||||
item.update_history(history)
|
||||
|
||||
self._update_plot_limits()
|
||||
|
||||
self.move_to_right()
|
||||
|
||||
def update_bar(self, bar: BarData) -> None:
|
||||
"""
|
||||
Update single bar data.
|
||||
"""
|
||||
self._manager.update_bar(bar)
|
||||
|
||||
for item in self._items.values():
|
||||
item.update_bar(bar)
|
||||
|
||||
self._update_plot_limits()
|
||||
|
||||
if self._right_ix >= (self._manager.get_count() - self._bar_count / 2):
|
||||
self.move_to_right()
|
||||
|
||||
def _update_plot_limits(self) -> None:
|
||||
"""
|
||||
Update the limit of plots.
|
||||
"""
|
||||
for item, plot in self._item_plot_map.items():
|
||||
min_value, max_value = item.get_y_range()
|
||||
|
||||
plot.setLimits(
|
||||
xMin=-1,
|
||||
xMax=self._manager.get_count(),
|
||||
yMin=min_value,
|
||||
yMax=max_value
|
||||
)
|
||||
|
||||
def _update_x_range(self) -> None:
|
||||
"""
|
||||
Update the x-axis range of plots.
|
||||
"""
|
||||
max_ix = self._right_ix
|
||||
min_ix = self._right_ix - self._bar_count
|
||||
|
||||
for plot in self._plots.values():
|
||||
plot.setRange(xRange=(min_ix, max_ix), padding=0)
|
||||
|
||||
def _update_y_range(self) -> None:
|
||||
"""
|
||||
Update the y-axis range of plots.
|
||||
"""
|
||||
view = self._first_plot.getViewBox()
|
||||
view_range = view.viewRange()
|
||||
|
||||
min_ix = max(0, int(view_range[0][0]))
|
||||
max_ix = min(self._manager.get_count(), int(view_range[0][1]))
|
||||
|
||||
# Update limit for y-axis
|
||||
for item, plot in self._item_plot_map.items():
|
||||
y_range = item.get_y_range(min_ix, max_ix)
|
||||
plot.setRange(yRange=y_range)
|
||||
|
||||
def paintEvent(self, event: QtGui.QPaintEvent) -> None:
|
||||
"""
|
||||
Reimplement this method of parent to update current max_ix value.
|
||||
"""
|
||||
view = self._first_plot.getViewBox()
|
||||
view_range = view.viewRange()
|
||||
self._right_ix = max(0, view_range[0][1])
|
||||
|
||||
super().paintEvent(event)
|
||||
|
||||
def keyPressEvent(self, event: QtGui.QKeyEvent) -> None:
|
||||
"""
|
||||
Reimplement this method of parent to move chart horizontally and zoom in/out.
|
||||
"""
|
||||
if event.key() == QtCore.Qt.Key_Left:
|
||||
self._on_key_left()
|
||||
elif event.key() == QtCore.Qt.Key_Right:
|
||||
self._on_key_right()
|
||||
elif event.key() == QtCore.Qt.Key_Up:
|
||||
self._on_key_up()
|
||||
elif event.key() == QtCore.Qt.Key_Down:
|
||||
self._on_key_down()
|
||||
|
||||
def wheelEvent(self, event: QtGui.QWheelEvent) -> None:
|
||||
"""
|
||||
Reimplement this method of parent to zoom in/out.
|
||||
"""
|
||||
delta = event.angleDelta()
|
||||
|
||||
if delta.y() > 0:
|
||||
self._on_key_up()
|
||||
elif delta.y() < 0:
|
||||
self._on_key_down()
|
||||
|
||||
def _on_key_left(self) -> None:
|
||||
"""
|
||||
Move chart to left.
|
||||
"""
|
||||
self._right_ix -= 1
|
||||
self._right_ix = max(self._right_ix, self._bar_count)
|
||||
|
||||
self._update_x_range()
|
||||
self._cursor.move_left()
|
||||
self._cursor.update_info()
|
||||
|
||||
def _on_key_right(self) -> None:
|
||||
"""
|
||||
Move chart to right.
|
||||
"""
|
||||
self._right_ix += 1
|
||||
self._right_ix = min(self._right_ix, self._manager.get_count())
|
||||
|
||||
self._update_x_range()
|
||||
self._cursor.move_right()
|
||||
self._cursor.update_info()
|
||||
|
||||
def _on_key_down(self) -> None:
|
||||
"""
|
||||
Zoom out the chart.
|
||||
"""
|
||||
self._bar_count *= 1.2
|
||||
self._bar_count = min(int(self._bar_count), self._manager.get_count())
|
||||
|
||||
self._update_x_range()
|
||||
self._cursor.update_info()
|
||||
|
||||
def _on_key_up(self) -> None:
|
||||
"""
|
||||
Zoom in the chart.
|
||||
"""
|
||||
self._bar_count /= 1.2
|
||||
self._bar_count = max(int(self._bar_count), self.MIN_BAR_COUNT)
|
||||
|
||||
self._update_x_range()
|
||||
self._cursor.update_info()
|
||||
|
||||
def move_to_right(self) -> None:
|
||||
"""
|
||||
Move chart to the most right.
|
||||
"""
|
||||
self._right_ix = self._manager.get_count()
|
||||
self._update_x_range()
|
||||
self._cursor.update_info()
|
||||
|
||||
|
||||
class ChartCursor(QtCore.QObject):
|
||||
""""""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
widget: ChartWidget,
|
||||
manager: BarManager,
|
||||
plots: Dict[str, pg.GraphicsObject]
|
||||
):
|
||||
""""""
|
||||
super().__init__()
|
||||
|
||||
self._widget: ChartWidget = widget
|
||||
self._manager: BarManager = manager
|
||||
self._plots: Dict[str, pg.GraphicsObject] = plots
|
||||
|
||||
self._x: int = 0
|
||||
self._y: int = 0
|
||||
self._plot_name: str = ""
|
||||
|
||||
self._init_ui()
|
||||
self._connect_signal()
|
||||
|
||||
def _init_ui(self):
|
||||
""""""
|
||||
self._init_line()
|
||||
self._init_label()
|
||||
self._init_info()
|
||||
|
||||
def _init_line(self) -> None:
|
||||
"""
|
||||
Create line objects.
|
||||
"""
|
||||
self._v_lines: Dict[str, pg.InfiniteLine] = {}
|
||||
self._h_lines: Dict[str, pg.InfiniteLine] = {}
|
||||
self._views: Dict[str, pg.ViewBox] = {}
|
||||
|
||||
pen = pg.mkPen(WHITE_COLOR)
|
||||
|
||||
for plot_name, plot in self._plots.items():
|
||||
v_line = pg.InfiniteLine(angle=90, movable=False, pen=pen)
|
||||
h_line = pg.InfiniteLine(angle=0, movable=False, pen=pen)
|
||||
view = plot.getViewBox()
|
||||
|
||||
for line in [v_line, h_line]:
|
||||
line.setZValue(0)
|
||||
line.hide()
|
||||
view.addItem(line)
|
||||
|
||||
self._v_lines[plot_name] = v_line
|
||||
self._h_lines[plot_name] = h_line
|
||||
self._views[plot_name] = view
|
||||
|
||||
def _init_label(self) -> None:
|
||||
"""
|
||||
Create label objects on axis.
|
||||
"""
|
||||
self._y_labels: Dict[str, pg.TextItem] = {}
|
||||
for plot_name, plot in self._plots.items():
|
||||
label = pg.TextItem(plot_name, fill=CURSOR_COLOR, color=BLACK_COLOR)
|
||||
label.hide()
|
||||
label.setZValue(2)
|
||||
plot.addItem(label, ignoreBounds=True)
|
||||
self._y_labels[plot_name] = label
|
||||
|
||||
self._x_label: pg.TextItem = pg.TextItem(
|
||||
"datetime", fill=CURSOR_COLOR, color=BLACK_COLOR)
|
||||
self._x_label.hide()
|
||||
self._x_label.setZValue(2)
|
||||
plot.addItem(self._x_label, ignoreBounds=True)
|
||||
|
||||
def _init_info(self) -> None:
|
||||
"""
|
||||
"""
|
||||
self._info = pg.TextItem("info", color=CURSOR_COLOR)
|
||||
self._info.hide()
|
||||
self._info.setZValue(2)
|
||||
|
||||
plot = list(self._plots.values())[0]
|
||||
plot.addItem(self._info, ignoreBounds=True)
|
||||
|
||||
def _connect_signal(self) -> None:
|
||||
"""
|
||||
Connect mouse move signal to update function.
|
||||
"""
|
||||
self._proxy = pg.SignalProxy(
|
||||
self._widget.scene().sigMouseMoved,
|
||||
rateLimit=360,
|
||||
slot=self._mouse_moved
|
||||
)
|
||||
|
||||
def _mouse_moved(self, evt: tuple) -> None:
|
||||
"""
|
||||
Callback function when mouse is moved.
|
||||
"""
|
||||
if not self._manager.get_count():
|
||||
return
|
||||
|
||||
# First get current mouse point
|
||||
pos = evt[0]
|
||||
for plot_name, view in self._views.items():
|
||||
rect = view.sceneBoundingRect()
|
||||
|
||||
if rect.contains(pos):
|
||||
mouse_point = view.mapSceneToView(pos)
|
||||
self._x = to_int(mouse_point.x())
|
||||
self._y = mouse_point.y()
|
||||
self._plot_name = plot_name
|
||||
break
|
||||
|
||||
# Then update cursor component
|
||||
self._update_line()
|
||||
self._update_label()
|
||||
self.update_info()
|
||||
|
||||
def _update_line(self) -> None:
|
||||
""""""
|
||||
for v_line in self._v_lines.values():
|
||||
v_line.setPos(self._x)
|
||||
v_line.show()
|
||||
|
||||
for plot_name, h_line in self._h_lines.items():
|
||||
if plot_name == self._plot_name:
|
||||
h_line.setPos(self._y)
|
||||
h_line.show()
|
||||
else:
|
||||
h_line.hide()
|
||||
|
||||
def _update_label(self) -> None:
|
||||
""""""
|
||||
bottom_plot = list(self._plots.values())[-1]
|
||||
axis_width = bottom_plot.getAxis("right").width()
|
||||
axis_height = bottom_plot.getAxis("bottom").height()
|
||||
axis_offset = QtCore.QPointF(axis_width, axis_height)
|
||||
|
||||
bottom_view = list(self._views.values())[-1]
|
||||
bottom_right = bottom_view.mapSceneToView(
|
||||
bottom_view.sceneBoundingRect().bottomRight() - axis_offset
|
||||
)
|
||||
|
||||
for plot_name, label in self._y_labels.items():
|
||||
if plot_name == self._plot_name:
|
||||
label.setText(str(self._y))
|
||||
label.show()
|
||||
label.setPos(bottom_right.x(), self._y)
|
||||
else:
|
||||
label.hide()
|
||||
|
||||
dt = self._manager.get_datetime(self._x)
|
||||
if dt:
|
||||
self._x_label.setText(dt.strftime("%Y-%m-%d %H:%M:%S"))
|
||||
self._x_label.show()
|
||||
self._x_label.setPos(self._x, bottom_right.y())
|
||||
self._x_label.setAnchor((0, 0))
|
||||
|
||||
def update_info(self) -> None:
|
||||
""""""
|
||||
bar = self._manager.get_bar(self._x)
|
||||
|
||||
if bar:
|
||||
op = bar.open_price
|
||||
hp = bar.high_price
|
||||
lp = bar.low_price
|
||||
cp = bar.close_price
|
||||
v = bar.volume
|
||||
text = f"(open){op} (high){hp} (low){lp} (close){cp} (volume){v}"
|
||||
else:
|
||||
text = ""
|
||||
|
||||
self._info.setText(text)
|
||||
self._info.show()
|
||||
|
||||
view = list(self._views.values())[0]
|
||||
top_left = view.mapSceneToView(view.sceneBoundingRect().topLeft())
|
||||
self._info.setPos(top_left)
|
||||
|
||||
def move_right(self) -> None:
|
||||
"""
|
||||
Move cursor index to right by 1.
|
||||
"""
|
||||
if self._x == self._manager.get_count() - 1:
|
||||
return
|
||||
self._x += 1
|
||||
|
||||
self._update_after_move()
|
||||
|
||||
def move_left(self) -> None:
|
||||
"""
|
||||
Move cursor index to left by 1.
|
||||
"""
|
||||
if self._x == 0:
|
||||
return
|
||||
self._x -= 1
|
||||
|
||||
self._update_after_move()
|
||||
|
||||
def _update_after_move(self) -> None:
|
||||
"""
|
||||
Update cursor after moved by left/right.
|
||||
"""
|
||||
bar = self._manager.get_bar(self._x)
|
||||
self._y = bar.close_price
|
||||
|
||||
self._update_line()
|
||||
self._update_label()
|
||||
|
||||
def clear_all(self) -> None:
|
||||
"""
|
||||
Clear all data.
|
||||
"""
|
||||
self._x = 0
|
||||
self._y = 0
|
||||
self._plot_name = ""
|
||||
|
||||
for line in self._v_lines + self._h_lines:
|
||||
line.hide()
|
||||
|
||||
for label in self._y_labels + [self._x_label]:
|
||||
label.hide()
|
@ -22,6 +22,9 @@ def extract_vt_symbol(vt_symbol: str):
|
||||
|
||||
|
||||
def generate_vt_symbol(symbol: str, exchange: Exchange):
|
||||
"""
|
||||
return vt_symbol
|
||||
"""
|
||||
return f"{symbol}.{exchange.value}"
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user