Skip to content

Commit

Permalink
Add Cache to avoid frequently loading cache
Browse files Browse the repository at this point in the history
  • Loading branch information
you-n-g committed Dec 22, 2021
1 parent 125922b commit 2b3bfee
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 52 deletions.
14 changes: 13 additions & 1 deletion qlib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@

# init qlib
def init(default_conf="client", **kwargs):
"""
Parameters
----------
**kwargs :
clear_mem_cache: bool
the default value is True;
Whether the memory cache is cleared during initialization.
It is often set to False to improve performance when init is called multiple times
"""
from .config import C
from .data.cache import H

Expand All @@ -28,7 +38,9 @@ def init(default_conf="client", **kwargs):
logger.warning("Skip initialization because `skip_if_reg is True`")
return

H.clear()
clear_mem_cache = kwargs.pop("clear_mem_cache", True)
if clear_mem_cache:
H.clear()
C.set(default_conf, **kwargs)

# mount nfs
Expand Down
11 changes: 9 additions & 2 deletions qlib/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- server
"""
from __future__ import annotations

import os
import re
Expand All @@ -18,7 +19,11 @@
import platform
import multiprocessing
from pathlib import Path
from typing import Union
from typing import Optional, Union
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from qlib.utils.time import Freq


class Config:
Expand Down Expand Up @@ -296,7 +301,9 @@ def get_uri_type(uri: Union[str, Path]):
else:
return QlibConfig.LOCAL_URI

def get_data_uri(self, freq: str = None) -> Path:
def get_data_uri(self, freq: Optional[Union[str, Freq]] = None) -> Path:
if freq is not None:
freq = str(freq) # converting Freq to string
if freq is None or freq not in self.provider_uri:
freq = QlibConfig.DEFAULT_FREQ
_provider_uri = self.provider_uri[freq]
Expand Down
1 change: 0 additions & 1 deletion qlib/data/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

from ..log import get_module_logger
from ..utils.time import Freq
from ..utils.resam import resam_calendar
from .cache import DiskDatasetCache, DiskExpressionCache
from ..utils import (
Wrapper,
Expand Down
58 changes: 34 additions & 24 deletions qlib/data/storage/file_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from qlib.utils.time import Freq
from qlib.utils.resam import resam_calendar
from qlib.config import C
from qlib.data.cache import H
from qlib.log import get_module_logger
from qlib.data.storage import CalendarStorage, InstrumentStorage, FeatureStorage, CalVT, InstKT, InstVT

Expand All @@ -33,15 +34,15 @@ def support_freq(self) -> List[str]:
if hasattr(self, _v):
return getattr(self, _v)
if len(self.provider_uri) == 1 and C.DEFAULT_FREQ in self.provider_uri:
freq = filter(
freq_l = filter(
lambda _freq: not _freq.endswith("_future"),
map(lambda x: x.stem, self.dpm.get_data_uri(C.DEFAULT_FREQ).joinpath("calendars").glob("*.txt")),
)
else:
freq = self.provider_uri.keys()
freq = list(freq)
setattr(self, _v, freq)
return freq
freq_l = self.provider_uri.keys()
freq_l = [Freq(freq) for freq in freq_l]
setattr(self, _v, freq_l)
return freq_l

@property
def uri(self) -> Path:
Expand All @@ -65,15 +66,28 @@ def __init__(self, freq: str, future: bool, provider_uri: dict, **kwargs):
super(FileCalendarStorage, self).__init__(freq, future, **kwargs)
self.future = future
self.provider_uri = C.DataPathManager.format_provider_uri(provider_uri)
self.resample_freq = None
self.enable_read_cache = True # TODO: make it configurable

@property
def file_name(self) -> str:
return f"{self.use_freq}_future.txt" if self.future else f"{self.use_freq}.txt".lower()
return f"{self._freq_file}_future.txt" if self.future else f"{self._freq_file}.txt".lower()

@property
def use_freq(self) -> str:
return self.freq if self.resample_freq is None else self.resample_freq
def _freq_file(self) -> str:
    """Return the frequency whose calendar file is actually read from disk.

    The result is computed once and memoized on the instance as
    ``_freq_file_cache``.  Note that the stored value is built with ``Freq(...)``
    or comes from ``Freq.get_recent_freq``, even though the annotation says ``str``.

    Raises
    ------
    ValueError
        if ``self.freq`` is not supported and no supported frequency can be
        resampled to it.
    """
    if hasattr(self, "_freq_file_cache"):
        return self._freq_file_cache

    wanted = Freq(self.freq)
    available = self.support_freq
    if wanted not in available:
        # NOTE: the requested freq has no file of its own.
        # - pick the closest supported freq stored under the same "directory"
        # - its data is read instead and resampled to the requested freq
        wanted = Freq.get_recent_freq(wanted, available)
        if wanted is None:
            raise ValueError(f"can't find a freq from {self.support_freq} that can resample to {self.freq}!")

    self._freq_file_cache = wanted
    return self._freq_file_cache

def _read_calendar(self, skip_rows: int = 0, n_rows: int = None) -> List[CalVT]:
if not self.uri.exists():
Expand All @@ -90,25 +104,21 @@ def _write_calendar(self, values: Iterable[CalVT], mode: str = "wb"):

@property
def uri(self) -> Path:
freq = self.freq
if freq not in self.support_freq:
# NOTE: uri
# 1. If `uri` does not exist
# - Get the `min_uri` of the closest `freq` under the same "directory" as the `uri`
# - Read data from `min_uri` and resample to `freq`

freq = Freq.get_recent_freq(freq, self.support_freq)
if freq is None:
raise ValueError(f"can't find a freq from {self.support_freq} that can resample to {self.freq}!")
self.resample_freq = freq
return self.dpm.get_data_uri(self.use_freq).joinpath(f"{self.storage_name}s", self.file_name)
return self.dpm.get_data_uri(self._freq_file).joinpath(f"{self.storage_name}s", self.file_name)

@property
def data(self) -> List[CalVT]:
    """Return the calendar for ``self.freq``.

    ``self.check()`` is called first.  When ``self.enable_read_cache`` is set, the
    raw calendar read from ``self.uri`` is stored in ``H["c"]`` under the key
    ``"orig_file" + str(self.uri)`` and reused on later calls, so the file is read
    at most once per key.  Otherwise the file is read on every call.

    If the frequency stored on file (``self._freq_file``) differs from the requested
    ``self.freq``, the raw calendar is resampled with ``resam_calendar``.

    NOTE(review): when no resampling happens, the object held in the cache is
    returned as-is; callers presumably should not mutate it - confirm.
    """
    self.check()
    # If cache is enabled, read the file only when the cache has no entry yet
    if self.enable_read_cache:
        key = "orig_file" + str(self.uri)
        if key not in H["c"]:
            H["c"][key] = self._read_calendar()
        _calendar = H["c"][key]
    else:
        _calendar = self._read_calendar()
    # the file may hold a finer frequency than requested; resample in that case
    if Freq(self._freq_file) != Freq(self.freq):
        _calendar = resam_calendar(np.array(list(map(pd.Timestamp, _calendar))), self._freq_file, self.freq)
    return _calendar

def _get_storage_freq(self) -> List[str]:
Expand Down
26 changes: 13 additions & 13 deletions qlib/utils/resam.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from .time import Freq, cal_sam_minute


def resam_calendar(calendar_raw: np.ndarray, freq_raw: str, freq_sam: str) -> np.ndarray:
def resam_calendar(calendar_raw: np.ndarray, freq_raw: Union[str, Freq], freq_sam: Union[str, Freq]) -> np.ndarray:
"""
Resample the calendar with frequency freq_raw into the calendar with frequency freq_sam
Assumption:
Expand All @@ -28,36 +28,36 @@ def resam_calendar(calendar_raw: np.ndarray, freq_raw: str, freq_sam: str) -> np
np.ndarray
The calendar with frequency freq_sam
"""
raw_count, freq_raw = Freq.parse(freq_raw)
sam_count, freq_sam = Freq.parse(freq_sam)
freq_raw = Freq(freq_raw)
freq_sam = Freq(freq_sam)
if not len(calendar_raw):
return calendar_raw

# if freq_sam is xminute, divide each trading day into several bars evenly
if freq_sam == Freq.NORM_FREQ_MINUTE:
if freq_raw != Freq.NORM_FREQ_MINUTE:
if freq_sam.base == Freq.NORM_FREQ_MINUTE:
if freq_raw.base != Freq.NORM_FREQ_MINUTE:
raise ValueError("when sampling minute calendar, freq of raw calendar must be minute or min")
else:
if raw_count > sam_count:
if freq_raw.count > freq_sam.count:
raise ValueError("raw freq must be higher than sampling freq")
_calendar_minute = np.unique(list(map(lambda x: cal_sam_minute(x, sam_count), calendar_raw)))
_calendar_minute = np.unique(list(map(lambda x: cal_sam_minute(x, freq_sam.count), calendar_raw)))
return _calendar_minute

# else, convert the raw calendar into day calendar, and divide the whole calendar into several bars evenly
else:
_calendar_day = np.unique(list(map(lambda x: pd.Timestamp(x.year, x.month, x.day, 0, 0, 0), calendar_raw)))
if freq_sam == Freq.NORM_FREQ_DAY:
return _calendar_day[::sam_count]
if freq_sam.base == Freq.NORM_FREQ_DAY:
return _calendar_day[:: freq_sam.count]

elif freq_sam == Freq.NORM_FREQ_WEEK:
elif freq_sam.base == Freq.NORM_FREQ_WEEK:
_day_in_week = np.array(list(map(lambda x: x.dayofweek, _calendar_day)))
_calendar_week = _calendar_day[np.ediff1d(_day_in_week, to_begin=-1) < 0]
return _calendar_week[::sam_count]
return _calendar_week[:: freq_sam.count]

elif freq_sam == Freq.NORM_FREQ_MONTH:
elif freq_sam.base == Freq.NORM_FREQ_MONTH:
_day_in_month = np.array(list(map(lambda x: x.day, _calendar_day)))
_calendar_month = _calendar_day[np.ediff1d(_day_in_month, to_begin=-1) < 0]
return _calendar_month[::sam_count]
return _calendar_month[:: freq_sam.count]
else:
raise ValueError("sampling freq must be xmin, xd, xw, xm")

Expand Down
43 changes: 32 additions & 11 deletions qlib/utils/time.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"""
import bisect
from datetime import datetime, time, date
from typing import List, Tuple, Union
from typing import List, Optional, Tuple, Union
import functools
import re

Expand Down Expand Up @@ -69,13 +69,29 @@ class Freq:
NORM_FREQ_MONTH = "month"
NORM_FREQ_WEEK = "week"
NORM_FREQ_DAY = "day"
NORM_FREQ_MINUTE = "minute"
NORM_FREQ_MINUTE = "min" # using min instead of minute for align with Qlib's data filename
SUPPORT_CAL_LIST = [NORM_FREQ_MINUTE, NORM_FREQ_DAY] # FIXME: this list should from data

MIN_CAL = get_min_cal()

def __init__(self, freq: str) -> None:
self.count, self.base = self.parse(freq)
def __init__(self, freq: Union[str, "Freq"]) -> None:
if isinstance(freq, str):
self.count, self.base = self.parse(freq)
elif isinstance(freq, Freq):
self.count, self.base = freq.count, freq.base
else:
raise NotImplementedError(f"This type of input is not supported")

def __eq__(self, freq):
freq = Freq(freq)
return freq.count == self.count and freq.base == self.base

def __str__(self):
# trying to align to the filename of Qlib: day, 30min, 5min, 1min...
return f"{self.count if self.count != 1 or self.base != 'day' else ''}{self.base}"

def __repr__(self) -> str:
return f"{self.__class__.__name__}({str(self)})"

@staticmethod
def parse(freq: str) -> Tuple[int, str]:
Expand Down Expand Up @@ -159,14 +175,14 @@ def get_min_delta(left_frq: str, right_freq: str):
Freq.NORM_FREQ_WEEK: 7 * 60 * 24,
Freq.NORM_FREQ_MONTH: 30 * 7 * 60 * 24,
}
left_freq = Freq.parse(left_frq)
left_minutes = left_freq[0] * minutes_map[left_freq[1]]
right_freq = Freq.parse(right_freq)
right_minutes = right_freq[0] * minutes_map[right_freq[1]]
left_freq = Freq(left_frq)
left_minutes = left_freq.count * minutes_map[left_freq.base]
right_freq = Freq(right_freq)
right_minutes = right_freq.count * minutes_map[right_freq.base]
return left_minutes - right_minutes

@staticmethod
def get_recent_freq(base_freq: Union[str, "Freq"], freq_list: List[Union[str, "Freq"]]) -> Optional["Freq"]:
    """Get the closest freq to base_freq from freq_list

    Parameters
    ----------
    base_freq : Union[str, Freq]
        the frequency to match
    freq_list : List[Union[str, Freq]]
        the candidate frequencies

    Returns
    -------
    Optional[Freq]
        the candidate with the smallest non-negative
        ``Freq.get_min_delta(base_freq, candidate)``; on a tie the candidate that
        appears first in ``freq_list`` is kept.
        None if no candidate has a non-negative delta (including an empty list).
    """
    base_freq = Freq(base_freq)
    best_delta = None
    best_freq = None
    for _freq in freq_list:
        # normalize every candidate so the result is always a Freq,
        # no matter which position in the list it came from
        freq = Freq(_freq)
        _min_delta = Freq.get_min_delta(base_freq, freq)
        if _min_delta < 0:
            continue
        # strict `<` keeps the earliest candidate when deltas are equal
        if best_delta is None or _min_delta < best_delta:
            best_delta, best_freq = _min_delta, freq
    return best_freq
Expand Down

0 comments on commit 2b3bfee

Please sign in to comment.