diff --git a/vnpy/api/rest/rest_client.py b/vnpy/api/rest/rest_client.py index d8422720..96d2b163 100644 --- a/vnpy/api/rest/rest_client.py +++ b/vnpy/api/rest/rest_client.py @@ -7,7 +7,10 @@ import traceback import uuid from datetime import datetime from enum import Enum +import platform from multiprocessing.dummy import Pool +from concurrent.futures import ThreadPoolExecutor + from threading import Lock, Thread from types import TracebackType from typing import Any, Callable, List, Optional, Type, Union @@ -138,8 +141,16 @@ class RestClient(object): self.proxies = None + self.thread_executor = ThreadPoolExecutor(max_workers=os.cpu_count() * 20) + self._tasks_lock = Lock() - self._tasks: List[multiprocessing.pool.AsyncResult] = [] + + #self._tasks: List[multiprocessing.pool.AsyncResult] = [] + self._tasks = [] + + self._executor_lock = Lock() + self._executors = [] + self._sessions_lock = Lock() self._sessions: List[requests.Session] = [] @@ -275,20 +286,32 @@ class RestClient(object): extra=extra, client=self, ) - task = pool.apply_async( - self._process_request, - args=[request, ], - callback=self._clean_finished_tasks, - # error_callback=lambda e: self.on_error(type(e), e, e.__traceback__, request), - ) - self._push_task(task) + if str(platform.system()) == 'Windows': + task = pool.apply_async( + self._process_request, + args=[request, ], + callback=self._clean_finished_tasks, + # error_callback=lambda e: self.on_error(type(e), e, e.__traceback__, request), + ) + self._push_task(task) + else: + self._clean_finished_executors() + executor = self.thread_executor.submit(self._process_request, request) + with self._executor_lock: + self._executors.append(executor) + return request def _push_task(self, task): with self._tasks_lock: self._tasks.append(task) - def _clean_finished_tasks(self, result: None): + def _clean_finished_executors(self,): + with self._executor_lock: + not_finished_executors = [i for i in self._executors if not i.done()] + self._executors = not_finished_executors + + def _clean_finished_tasks(self, result = None): with self._tasks_lock: not_finished_tasks = [i for i in self._tasks if not i.ready()] self._tasks = not_finished_tasks