diff --git a/.github/workflows/pythonpackage.yml b/.github/workflows/pythonpackage.yml index 2046a4e79..b9efe4993 100644 --- a/.github/workflows/pythonpackage.yml +++ b/.github/workflows/pythonpackage.yml @@ -5,7 +5,7 @@ name: Python package on: push: - branches: [ master, V0.9.59 ] + branches: [ master, 'V0.9.60' ] pull_request: branches: [ master ] diff --git a/README.md b/README.md index 8e8a421f8..7fd060dd5 100644 --- a/README.md +++ b/README.md @@ -14,14 +14,14 @@ >源于[缠中说缠博客](http://blog.sina.com.cn/chzhshch),原始博客中的内容不太完整,且没有评论,以下是网友整理的原文备份 * 备份网址1:http://www.fxgan.com +* 备份网址2:https://chzhshch.blog * 已经开始用czsc库进行量化研究的朋友,欢迎[加入飞书群](https://applink.feishu.cn/client/chat/chatter/add_by_link?link_token=0bak668e-7617-452c-b935-94d2c209e6cf),快点击加入吧! * [B站视频教程合集(持续更新...)](https://space.bilibili.com/243682308/channel/series) > 最近在考虑使用 vue + rust + tauri 为 CZSC 开发一个桌面应用,欢迎有兴趣的朋友一起参与。 > 如果你碰巧熟悉 vue、tauri、rust 的使用,欢迎加入我们的开发组,一起为 CZSC 开发一个更好的桌面应用。 -> 有意愿的朋友请联系我,微信号:**zengbin93**,备注:**桌面应用开发**。 -> 我们将为你提供一个更好的量化交易学习和交流平台。 +> 有意愿的朋友请联系我,微信号:**zengbin93**,备注:**桌面应用开发**。我们将为你提供一个更好的量化交易学习和交流平台。 ## 缠论精华 @@ -35,20 +35,6 @@ >边界条件分段后,就要确定一旦发生哪种情况就如何操作,也就是把操作也同样给分段化了。然后,把所有情况交给市场本身,让市场自己去当下选择。 >所有的操作,其实都是根据不同分段边界的一个结果,只是每个人的分段边界不同而已。因此,问题不是去预测什么,而是确定分段边界。 -## 知识星球 - -* [CZSC小圈子(缠论、量化、专享案例)](https://s0cqcxuy3p.feishu.cn/wiki/wikcnwXSk9mWnki1b6URPhLA2Hc) - -* 链接:https://wx.zsxq.com/dweb2/index/group/88851448582512 -* 加入:https://t.zsxq.com/0aMSAqcgO - -> **知识星球【CZSC小圈子】的定位是什么?** -> - 为仔细研读过禅师原文并且愿意使用 CZSC 库进行量化投研的朋友提供一个深入交流的平台。 -> - 寻找一群有能力、有兴趣、有主见的朋友共同进行量化策略研究讨论交流。 -> - 对于刚接触缠论和量化交易的新朋友,给出一些力所能及的帮助(可以在圈子中提问,必回复)。 -> - 2024年,小圈子将提供一些专享内容,主要是使用 czsc 构建量化策略的优质案例。 - - ## 项目贡献 * [择时策略研究框架](https://s0cqcxuy3p.feishu.cn/wiki/wikcnhizrtIOQakwVcZLMKJNaib) diff --git a/czsc/__init__.py b/czsc/__init__.py index 4395bec0a..2c63d06fb 100644 --- a/czsc/__init__.py +++ b/czsc/__init__.py @@ -15,6 +15,7 @@ from czsc.strategies import CzscStrategyBase, CzscJsonStrategy from czsc.sensors import holds_concepts_effect, CTAResearch, EventMatchSensor from czsc.sensors.feature import FixedNumberSelector +from czsc.utils import ta from czsc.traders import ( CzscTrader, CzscSignals, @@ -40,11 +41,13 @@ get_heartbeat_time, clear_strategy, get_strategy_weights, + get_strategy_latest, OpensOptimize, ExitsOptimize, ) from czsc.utils import ( + timeout_decorator, mac_address, overlap, to_arrow, @@ -209,13 +212,19 @@ monotonicity, min_max_limit, rolling_layers, + cal_symbols_factor, + weights_simple_ensemble, + unify_weights, + sma_long_bear, + dif_long_bear, + tsf_type, ) -__version__ = "0.9.59" +__version__ = "0.9.60" __author__ = "zengbin93" __email__ = "zeng_bin8888@163.com" -__date__ = "20240901" +__date__ = "20240918" def welcome(): diff --git a/czsc/eda.py b/czsc/eda.py index ae4a9fd09..016117fc3 100644 --- a/czsc/eda.py +++ b/czsc/eda.py @@ -8,6 +8,8 @@ import loguru import pandas as pd import numpy as np +from typing import Callable +from tqdm import tqdm from sklearn.linear_model import Ridge, LinearRegression, Lasso @@ -222,3 +224,286 @@ def rolling_layers(df, factor, n=5, **kwargs): df.drop([f"{factor}_qcut"], axis=1, inplace=True) return df + + +def cal_yearly_days(dts: list, **kwargs): + """计算年度交易日数量 + + :param dts: list, datetime 列表 + :param kwargs: + :return: int, 年度交易日数量 + """ + logger = kwargs.get("logger", loguru.logger) + + assert len(dts) > 0, "输入的日期数量必须大于0" + + # 将日期列表转换为 DataFrame + dts = pd.DataFrame(dts, columns=["dt"]) + dts["dt"] = pd.to_datetime(dts["dt"]).dt.date + dts = dts.drop_duplicates() + + # 
时间跨度小于一年,直接返回252,并警告 + if (dts["dt"].max() - dts["dt"].min()).days < 365: + logger.warning("时间跨度小于一年,直接返回 252") + return 252 + + # 设置索引为日期,并确保索引为 DatetimeIndex + dts.set_index(pd.to_datetime(dts["dt"]), inplace=True) + dts.drop(columns=["dt"], inplace=True) + + # 按年重采样并计算每年的交易日数量,取最大值 + yearly_days = dts.resample('YE').size().max() + return yearly_days + + +def cal_symbols_factor(dfk: pd.DataFrame, factor_function: Callable, **kwargs): + """计算多个品种的标准量价因子 + + :param dfk: 行情数据,N 个品种的行情数据 + :param factor_function: 因子文件,py文件 + :param kwargs: + + - logger: loguru.logger, 默认为 loguru.logger + - factor_params: dict, 因子计算参数 + - min_klines: int, 最小K线数据量,默认为 300 + + :return: dff, pd.DataFrame, 计算后的因子数据 + """ + logger = kwargs.get("logger", loguru.logger) + min_klines = kwargs.get("min_klines", 300) + factor_params = kwargs.get("factor_params", {}) + symbols = dfk["symbol"].unique().tolist() + factor_name = factor_function.__name__ + + rows = [] + for symbol in tqdm(symbols, desc=f"{factor_name} 因子计算"): + try: + df = dfk[(dfk["symbol"] == symbol)].copy() + df = df.sort_values("dt", ascending=True).reset_index(drop=True) + if len(df) < min_klines: + logger.warning(f"{symbol} 数据量过小,跳过;仅有 {len(df)} 条数据,需要 {min_klines} 条数据") + continue + + df = factor_function(df, **factor_params) + df["price"] = df["close"] + df["n1b"] = (df["price"].shift(-1) / df["price"] - 1).fillna(0) + + factor = [x for x in df.columns if x.startswith("F#")][0] + df[factor] = df[factor].replace([np.inf, -np.inf], np.nan).ffill().fillna(0) + if df[factor].var() == 0 or np.isnan(df[factor].var()): + logger.warning(f"{symbol} {factor} var is 0 or nan") + else: + rows.append(df.copy()) + except Exception as e: + logger.error(f"{factor_name} - {symbol} - 计算因子出错:{e}") + + dff = pd.concat(rows, ignore_index=True) + return dff + + +def weights_simple_ensemble(df, weight_cols, method="mean", only_long=False, **kwargs): + """用朴素的方法集成多个策略的权重 + + :param df: pd.DataFrame, 包含多个策略的权重列 + :param weight_cols: list, 权重列名称列表 + :param method: str, 集成方法,可选 mean, vote, sum_clip + + - mean: 平均值 + - vote: 投票 + - sum_clip: 求和并截断 + + :param only_long: bool, 是否只做多 + :param kwargs: dict, 其他参数 + + - clip_min: float, 截断最小值 + - clip_max: float, 截断最大值 + + :return: pd.DataFrame, 添加了 weight 列的数据 + """ + method = method.lower() + + assert all([x in df.columns for x in weight_cols]), f"数据中不包含全部权重列,不包含的列:{set(weight_cols) - set(df.columns)}" + assert 'weight' not in df.columns, "数据中已经包含 weight 列,请先删除,再调用该函数" + + if method == "mean": + df["weight"] = df[weight_cols].mean(axis=1).fillna(0) + + elif method == "vote": + df["weight"] = np.sign(df[weight_cols].sum(axis=1)).fillna(0) + + elif method == "sum_clip": + clip_min = kwargs.get("clip_min", -1) + clip_max = kwargs.get("clip_max", 1) + df["weight"] = df[weight_cols].sum(axis=1).clip(clip_min, clip_max).fillna(0) + + else: + raise ValueError("method 参数错误,可选 mean, vote, sum_clip") + + if only_long: + df["weight"] = np.where(df["weight"] > 0, df["weight"], 0) + + return df + + +def unify_weights(dfw: pd.DataFrame, **kwargs): + """按策略统一权重进行大盘择时交易 + + 在任意时刻 dt,将所有品种的权重通过某种算法合并,然后所有品种都按照这个权重进行操作 + + :param dfw: pd.DataFrame,columns=['symbol', 'weight', 'dt', 'price'],数据样例如下 + + ======== =================== ======== ======= + symbol dt weight price + ======== =================== ======== ======= + IC9001 2017-01-03 00:00:00 -0.82 11113.8 + IC9001 2017-01-04 00:00:00 -0.83 11275.3 + IC9001 2017-01-05 00:00:00 -0.84 11261.1 + ======== =================== ======== ======= + + :param kwargs: dict,其他参数 + + - method: str,权重合并方法,支持 
'mean' 和 'sum_clip',默认 'sum_clip' + - copy: bool,是否复制输入数据,默认 True + - clip_min: float,权重合并方法为 'sum_clip' 时,clip 的最小值,默认 -1 + - clip_max: float,权重合并方法为 'sum_clip' 时,clip 的最大值,默认 1 + + :return: pd.DataFrame,columns=['symbol', 'weight', 'dt', 'price'] + + example: + ================ + dfw = ... + wb = czsc.WeightBacktest(dfw, fee_rate=0.0002) + print(wb.stats) + + dfw1 = unify_weights(dfw.copy(), method='mean') + wb1 = czsc.WeightBacktest(dfw1, fee_rate=0.0002) + print(wb1.stats) + + dfw2 = unify_weights(dfw.copy(), method='sum_clip') + wb2 = czsc.WeightBacktest(dfw2, fee_rate=0.0002) + print(wb2.stats) + + # 合并 daily_return,看看是否一致 + dfd1 = wb.daily_return.copy() + dfd2 = wb1.daily_return.copy() + dfd3 = wb2.daily_return.copy() + + dfd = pd.merge(dfd1, dfd2, on='date', how='left', suffixes=('', '_mean')) + dfd = pd.merge(dfd, dfd3, on='date', how='left', suffixes=('', '_sum_clip')) + print(dfd[['total', 'total_mean', 'total_sum_clip']].corr()) + ================ + """ + method = kwargs.get('method', 'sum_clip') + if kwargs.get("copy", True): + dfw = dfw.copy() + + if method == 'mean': + uw = dfw.groupby('dt')['weight'].mean().reset_index() + + elif method == 'sum_clip': + clip_min = kwargs.get('clip_min', -1) + clip_max = kwargs.get('clip_max', 1) + assert clip_min < clip_max, "clip_min should be less than clip_max" + + uw = dfw.groupby('dt')['weight'].sum().reset_index() + uw['weight'] = uw['weight'].clip(clip_min, clip_max) + + else: + raise ValueError(f"method {method} not supported") + + dfw = pd.merge(dfw, uw, on='dt', how='left', suffixes=('_raw', '_unified')) + dfw['weight'] = dfw['weight_unified'].copy() + return dfw + + +def sma_long_bear(df: pd.DataFrame, **kwargs): + """均线牛熊分界指标过滤持仓,close 在长期均线上方为牛市,下方为熊市 + + 牛市只做多,熊市只做空。 + + :param df: DataFrame, 必须包含 dt, close, symbol, weight 列 + :return: DataFrame + """ + assert df["symbol"].nunique() == 1, "数据中包含多个品种,必须单品种" + assert df["dt"].is_monotonic_increasing, "数据未按日期排序,必须升序排列" + assert df["dt"].is_unique, "数据中存在重复dt,必须唯一" + + window = kwargs.get("window", 200) + + if kwargs.get("copy", True): + df = df.copy() + + df['SMA_LB'] = df['close'].rolling(window).mean() + df['raw_weight'] = df['weight'] + df['weight'] = np.where(np.sign(df['close'] - df['SMA_LB']) == np.sign(df['weight']), df['weight'], 0) + return df + + +def dif_long_bear(df: pd.DataFrame, **kwargs): + """DIF牛熊分界指标过滤持仓,DIF 在0上方为牛市,下方为熊市 + + 牛市只做多,熊市只做空。 + + :param df: DataFrame, 必须包含 dt, close, symbol, weight 列 + :return: DataFrame + """ + from czsc.utils.ta import MACD + + assert df["symbol"].nunique() == 1, "数据中包含多个品种,必须单品种" + assert df["dt"].is_monotonic_increasing, "数据未按日期排序,必须升序排列" + assert df["dt"].is_unique, "数据中存在重复dt,必须唯一" + + if kwargs.get("copy", True): + df = df.copy() + + df['DIF_LB'], _, _ = MACD(df['close']) + df['raw_weight'] = df['weight'] + df['weight'] = np.where(np.sign(df['DIF_LB']) == np.sign(df['weight']), df['weight'], 0) + return df + + +def tsf_type(df: pd.DataFrame, factor, n=5, **kwargs): + """时序因子的类型定性分析 + + tsf 是 time series factor 的缩写,时序因子的类型定性分析,是指对某个时序因子进行分层,然后计算每个分层的平均收益, + + :param df: pd.DataFrame, 必须包含 dt, symbol, factor 列,其中 dt 为日期,symbol 为标的代码,factor 为因子值 + :param factor: str, 因子列名 + :param n: int, 分层数量 + :param kwargs: + + - window: int, 窗口大小,默认为600 + - min_periods: int, 最小样本数量,默认为300 + - target: str, 目标列名,默认为 n1b + + :return: str, 返回分层收益排序(从大到小)结果,例如:第01层->第02层->第03层->第04层->第05层 + """ + window = kwargs.get("window", 600) + min_periods = kwargs.get("min_periods", 300) + target = kwargs.get("target", "n1b") + + if target == 'n1b' and 
'n1b' not in df.columns: + from czsc.utils.trade import update_nxb + df = update_nxb(df, nseq=(1,)) + + assert target in df.columns, f"数据中不存在 {target} 列" + assert factor in df.columns, f"数据中不存在 {factor} 列" + + rows = [] + for symbol, dfg in df.groupby("symbol"): + dfg = dfg.copy().reset_index(drop=True) + dfg = rolling_layers(dfg, factor, n=n, window=window, min_periods=min_periods) + rows.append(dfg) + + df = pd.concat(rows, ignore_index=True) + layers = [x for x in df[f"{factor}分层"].unique() if x != "第00层" and str(x).endswith("层")] + + # 计算每个分层的平均收益 + layer_returns = {} + for layer in layers: + dfg = df[df[f"{factor}分层"] == layer].copy() + dfg = dfg.groupby("dt")[target].mean().reset_index() + layer_returns[layer] = dfg[target].sum() + + sorted_layers = sorted(layer_returns.items(), key=lambda x: x[1], reverse=True) + return "->".join([f"{x[0]}" for x in sorted_layers]) diff --git a/czsc/objects.py b/czsc/objects.py index 865405596..e0fa525a9 100644 --- a/czsc/objects.py +++ b/czsc/objects.py @@ -289,6 +289,17 @@ def power_volume(self): """成交量力度""" return sum([x.vol for x in self.bars[1:-1]]) + @property + def power_snr(self): + """SNR 度量力度 + + SNR越大,说明内部走势越顺畅,力度也就越大 + """ + close = [x.close for x in self.raw_bars] + abs_diff = [abs(close[i] - close[i - 1]) for i in range(1, len(close))] + snr = self.power_price / sum(abs_diff) + return round(snr, 4) + @property def change(self): """笔的涨跌幅""" diff --git a/czsc/signals/__init__.py b/czsc/signals/__init__.py index 4bbd960fd..6237d9115 100644 --- a/czsc/signals/__init__.py +++ b/czsc/signals/__init__.py @@ -121,6 +121,9 @@ bar_decision_V240608, bar_decision_V240616, bar_td9_V240616, + bar_volatility_V241013, + bar_zfzd_V241013, + bar_zfzd_V241014, ) from czsc.signals.jcc import ( @@ -216,6 +219,7 @@ tas_dif_zero_V240612, tas_dif_zero_V240614, cci_decision_V240620, + tas_dif_layer_V241010, ) from czsc.signals.pos import ( diff --git a/czsc/signals/bar.py b/czsc/signals/bar.py index 692a8d699..47bbd72ac 100644 --- a/czsc/signals/bar.py +++ b/czsc/signals/bar.py @@ -14,11 +14,9 @@ from deprecated import deprecated from collections import OrderedDict from czsc import envs, CZSC, Signal -from czsc.traders.base import CzscSignals from czsc.objects import RawBar from czsc.utils.sig import check_pressure_support from czsc.signals.tas import update_ma_cache, update_macd_cache -from czsc.utils.bar_generator import freq_end_time from czsc.utils import single_linear, freq_end_time, get_sub_elements, create_single_signal @@ -2266,3 +2264,121 @@ def bar_td9_V240616(c: CZSC, **kwargs) -> OrderedDict: v2 = f"{count}转" return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=v2) + + +def bar_volatility_V241013(c: CZSC, **kwargs) -> OrderedDict: + """波动率分三层 + + 参数模板:"{freq}_波动率分层W{w}N{n}_完全分类V241013" + + **信号逻辑:** + + 波动率分层,要求如下。 + 1. 取最近 n 根K线,计算这 n 根K线的最高价和最低价的差值,记为 r + 2. 
取最近 w 根K线,将 r 分为三等分,分别为低波动,中波动,高波动 + + **信号列表:** + + - Signal('60分钟_波动率分层W200N10_完全分类V241013_低波动_任意_任意_0') + - Signal('60分钟_波动率分层W200N10_完全分类V241013_中波动_任意_任意_0') + - Signal('60分钟_波动率分层W200N10_完全分类V241013_高波动_任意_任意_0') + + :param c: CZSC对象 + :param kwargs: 无 + :return: 信号识别结果 + """ + w = int(kwargs.get("w", 200)) # K线数量 + n = int(kwargs.get("n", 10)) # 波动率窗口大小 + + freq = c.freq.value + k1, k2, k3 = f"{freq}_波动率分层W{w}N{n}_完全分类V241013".split("_") + v1 = "其他" + key = f"volatility_{n}" + for bar in c.bars_raw[-n:]: + if key not in bar.cache: + n_max_close = max([x.close for x in c.bars_raw[-n:]]) + n_min_close = min([x.close for x in c.bars_raw[-n:]]) + bar.cache[key] = n_max_close - n_min_close + + if len(c.bars_raw) < w + n + 100: + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + vols = [x.cache.get(key, 0) for x in c.bars_raw[-w:]] + try: + v1 = pd.qcut(vols, 3, labels=["低波动", "中波动", "高波动"], duplicates="drop")[-1] + except Exception as e: + v1 = "其他" + logger.info(f"{e}") + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + +def bar_zfzd_V241013(c: CZSC, **kwargs) -> OrderedDict: + """窄幅震荡形态:窗口内任意两根K线都要有重叠 + + 参数模板:"{freq}_窄幅震荡N{n}_形态V241013" + + **信号逻辑:** + + 1. 取最近 n 根K线,这 n 根K线的 最高价最小值 >= 最低价最大值,即有重叠 + + **信号列表:** + + - Signal('60分钟_窄幅震荡N5_形态V241013_满足_任意_任意_0') + + :param c: CZSC对象 + :param kwargs: 无 + :return: 信号识别结果 + """ + n = int(kwargs.get("n", 5)) # 窗口大小 + + freq = c.freq.value + k1, k2, k3 = f"{freq}_窄幅震荡N{n}_形态V241013".split("_") + v1 = "其他" + + if len(c.bars_raw) < n + 50: + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + bars = c.bars_raw[-n:] + zg = min([x.high for x in bars]) + zd = max([x.low for x in bars]) + if zg >= zd: + v1 = "满足" + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + +def bar_zfzd_V241014(c: CZSC, **kwargs) -> OrderedDict: + """窄幅震荡形态 + + 参数模板:"{freq}_窄幅震荡N{n}_形态V241014" + + **信号逻辑:** + + 1. 找到窗口内最大实体的K线,这根K线跟所有的其他K线都有重叠 + 2. 
相比于 bar_zfzd_V241013,条件更宽松一些 + + **信号列表:** + + - Signal('60分钟_窄幅震荡N10_形态V241014_满足_任意_任意_0') + + :param c: CZSC对象 + :param kwargs: 无 + :return: 信号识别结果 + """ + n = int(kwargs.get("n", 5)) # 窗口大小 + + freq = c.freq.value + k1, k2, k3 = f"{freq}_窄幅震荡N{n}_形态V241014".split("_") + v1 = "其他" + + if len(c.bars_raw) < n + 50: + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + bars = c.bars_raw[-n:] + max_bar = sorted(bars, key=lambda x: x.solid, reverse=True)[0] + + if max_bar.solid > 2 * np.mean([x.solid for x in bars]): + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + if all([min(x.high, max_bar.high) > max(x.low, max_bar.low) for x in bars]): + v1 = "满足" + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) diff --git a/czsc/signals/pos.py b/czsc/signals/pos.py index 6d7922575..0dc5220ce 100644 --- a/czsc/signals/pos.py +++ b/czsc/signals/pos.py @@ -131,7 +131,7 @@ def pos_fx_stop_V230414(cat: CzscTrader, **kwargs) -> OrderedDict: def pos_bar_stop_V230524(cat: CzscTrader, **kwargs) -> OrderedDict: """按照开仓点附近的N根K线极值止损 - 参数模板:"{pos_name}_{freq1}N{n}K_止损V2305224" + 参数模板:"{pos_name}_{freq1}N{n}K_止损V230524" **信号逻辑:** @@ -142,8 +142,8 @@ def pos_bar_stop_V230524(cat: CzscTrader, **kwargs) -> OrderedDict: **信号列表:** - - Signal('日线三买多头_日线N3K_止损V2305224_多头止损_任意_任意_0') - - Signal('日线三买多头_日线N3K_止损V2305224_空头止损_任意_任意_0') + - Signal('日线三买多头_日线N3K_止损V230524_多头止损_任意_任意_0') + - Signal('日线三买多头_日线N3K_止损V230524_空头止损_任意_任意_0') :param cat: CzscTrader对象 :param kwargs: 参数字典 @@ -157,7 +157,7 @@ def pos_bar_stop_V230524(cat: CzscTrader, **kwargs) -> OrderedDict: pos_name = kwargs["pos_name"] freq1 = kwargs["freq1"] n = int(kwargs.get("n", 3)) - k1, k2, k3 = f"{pos_name}_{freq1}N{n}K_止损V2305224".split("_") + k1, k2, k3 = f"{pos_name}_{freq1}N{n}K_止损V230524".split("_") v1 = "其他" assert 20 >= n >= 1, "参数 n 取值范围为 1~20" # 如果没有持仓策略,则不产生信号 diff --git a/czsc/signals/tas.py b/czsc/signals/tas.py index 9d9d15b23..4b3003b4c 100644 --- a/czsc/signals/tas.py +++ b/czsc/signals/tas.py @@ -103,9 +103,9 @@ def update_macd_cache(c: CZSC, **kwargs): dif, dea, macd = MACD(close, fastperiod=fastperiod, slowperiod=slowperiod, signalperiod=signalperiod) for i in range(len(close)): _c = dict(c.bars_raw[i].cache) if c.bars_raw[i].cache else dict() - dif_i = dif[i] if dif[i] else close[i] - dea_i = dea[i] if dea[i] else close[i] - macd_i = dif_i - dea_i + dif_i = dif[i] if dif[i] else 0 + dea_i = dea[i] if dea[i] else 0 + macd_i = macd[i] if macd[i] else 0 _c.update({cache_key: {"dif": dif_i, "dea": dea_i, "macd": macd_i}}) c.bars_raw[i].cache = _c @@ -1900,6 +1900,54 @@ def tas_macd_base_V230320(c: CZSC, **kwargs) -> OrderedDict: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=v2) +def tas_dif_layer_V241010(c: CZSC, **kwargs) -> OrderedDict: + """DIF分三层:零轴附近,上方远离,下方远离 + + 参数模板:"{freq}_DIF分层W{w}T{t}_完全分类V241010" + + **信号逻辑:** + + DIF分层,要求如下。 + + 1,取最近 w 根K线,获取 diffs 序列 + 2. 计算 diffs 的最大绝对值 r,作为波动率的标准 + 3. 如果最近一个 diff 在 t * r 的范围内,则认为是零轴附近 + 4. 
如果最近一个 diff > 0 且在 t * r 的范围外,则认为是多头远离;反之,空头远离 + + **信号列表:** + + - Signal('60分钟_DIF分层W100T50_完全分类V241010_零轴附近_任意_任意_0') + - Signal('60分钟_DIF分层W100T50_完全分类V241010_空头远离_任意_任意_0') + - Signal('60分钟_DIF分层W100T50_完全分类V241010_多头远离_任意_任意_0') + + :param c: CZSC对象 + :param kwargs: 无 + :return: 信号识别结果 + """ + w = int(kwargs.get("w", 100)) # K线数量 + t = int(kwargs.get("t", 30)) # 零轴附近的阈值,相比与 max(diffs) 的比例 + + freq = c.freq.value + k1, k2, k3 = f"{freq}_DIF分层W{w}T{t}_完全分类V241010".split("_") + v1 = "其他" + key = update_macd_cache(c) + if len(c.bars_raw) < w + 50: + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + bars = c.bars_raw[-w:] + diffs = [x.cache[key]["dif"] for x in bars] + + r = max([abs(x) for x in diffs]) / 100 + if diffs[-1] < 0 and abs(diffs[-1]) > r * t: + v1 = "空头远离" + elif diffs[-1] > 0 and abs(diffs[-1]) > r * t: + v1 = "多头远离" + else: + v1 = "零轴附近" + + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + def update_cci_cache(c: CZSC, **kwargs): """更新CCI缓存 diff --git a/czsc/traders/__init__.py b/czsc/traders/__init__.py index 79858d8ca..09a2294f6 100644 --- a/czsc/traders/__init__.py +++ b/czsc/traders/__init__.py @@ -5,15 +5,23 @@ create_dt: 2021/11/1 22:20 describe: 交易员(traders):使用 CZSC 分析工具进行择时策略的开发,交易等 """ -from czsc.traders.base import ( - CzscSignals, CzscTrader, generate_czsc_signals, check_signals_acc, get_unique_signals -) +from czsc.traders.base import CzscSignals, CzscTrader, generate_czsc_signals, check_signals_acc, get_unique_signals from czsc.traders.performance import ( - PairsPerformance, combine_holds_and_pairs, combine_dates_and_pairs, stock_holds_performance + PairsPerformance, + combine_holds_and_pairs, + combine_dates_and_pairs, + stock_holds_performance, ) from czsc.traders.dummy import DummyBacktest from czsc.traders.sig_parse import SignalsParser, get_signals_config, get_signals_freqs from czsc.traders.weight_backtest import WeightBacktest, get_ensemble_weight, long_short_equity, stoploss_by_direction -from czsc.traders.rwc import RedisWeightsClient, get_strategy_mates, get_heartbeat_time, clear_strategy, get_strategy_weights +from czsc.traders.rwc import ( + RedisWeightsClient, + get_strategy_mates, + get_heartbeat_time, + clear_strategy, + get_strategy_weights, + get_strategy_latest, +) from czsc.traders.optimize import OpensOptimize, ExitsOptimize diff --git a/czsc/traders/rwc.py b/czsc/traders/rwc.py index df9b04b00..857404c40 100644 --- a/czsc/traders/rwc.py +++ b/czsc/traders/rwc.py @@ -603,7 +603,6 @@ def get_strategy_names(redis_url=None, connection_pool=None, key_prefix="Weights :param redis_url: str, redis连接字符串, 默认为None, 即从环境变量 RWC_REDIS_URL 中读取 :param connection_pool: redis.ConnectionPool, redis连接池 :param key_prefix: str, redis中key的前缀,默认为 Weights - :return: list, 所有策略名 """ @@ -615,3 +614,35 @@ def get_strategy_names(redis_url=None, connection_pool=None, key_prefix="Weights rs = r.smembers(f"{key_prefix}:StrategyNames") return list(rs) + + +def get_strategy_latest(redis_url=None, connection_pool=None, key_prefix="Weights"): + """获取所有策略的最新持仓 + + :param redis_url: str, redis连接字符串, 默认为None, 即从环境变量 RWC_REDIS_URL 中读取 + :param connection_pool: redis.ConnectionPool, redis连接池 + :param key_prefix: str, redis中key的前缀,默认为 Weights + :return: pd.DataFrame, 最新持仓数据 + """ + if connection_pool: + r = redis.Redis(connection_pool=connection_pool) + else: + redis_url = redis_url if redis_url else os.getenv("RWC_REDIS_URL") + r = redis.Redis.from_url(redis_url, decode_responses=True) + + keys = r.keys(f"{key_prefix}:*:LAST") + pipeline = r.pipeline() + for 
key in keys: + pipeline.hgetall(key) + fetched_data = pipeline.execute() + + results = [] + for key, data in zip(keys, fetched_data): + strategy = key.split(":")[1] + data["strategy"] = strategy + results.append(data) + + df = pd.DataFrame(results) + df["weight"] = df["weight"].astype(float) + df = df[["dt", "strategy", "symbol", "weight", "update_time"]] + return df diff --git a/czsc/traders/weight_backtest.py b/czsc/traders/weight_backtest.py index d0d65ab34..2052507ee 100644 --- a/czsc/traders/weight_backtest.py +++ b/czsc/traders/weight_backtest.py @@ -554,6 +554,7 @@ def backtest(self, n_jobs=1): dret = pd.concat([v["daily"] for k, v in res.items() if k in symbols], ignore_index=True) dret = pd.pivot_table(dret, index="date", columns="symbol", values="return").fillna(0) dret["total"] = dret[list(res.keys())].mean(axis=1) + # dret 中的 date 对应的是上一日;date 后移一位,对应的才是当日收益 dret = dret.round(4).reset_index() res["品种等权日收益"] = dret @@ -570,6 +571,8 @@ def backtest(self, n_jobs=1): stats.update({"多头占比": round(long_rate, 4), "空头占比": round(short_rate, 4)}) alpha = self.alpha.copy() + stats["波动比"] = round(alpha["策略"].std() / alpha["基准"].std(), 4) + stats["与基准波动相关性"] = round(alpha["策略"].corr(alpha["基准"].abs()), 4) stats["与基准相关性"] = round(alpha["策略"].corr(alpha["基准"]), 4) alpha_short = alpha[alpha["基准"] < 0].copy() stats["与基准空头相关性"] = round(alpha_short["策略"].corr(alpha_short["基准"]), 4) diff --git a/czsc/utils/__init__.py b/czsc/utils/__init__.py index 4d93a3000..bed005841 100644 --- a/czsc/utils/__init__.py +++ b/czsc/utils/__init__.py @@ -1,7 +1,10 @@ # coding: utf-8 import os +import functools +import threading import pandas as pd from typing import List, Union +from loguru import logger from . import qywx from . import ta @@ -226,3 +229,39 @@ def to_arrow(df: pd.DataFrame): with pa.ipc.new_file(sink, table.schema) as writer: writer.write_table(table) return sink.getvalue() + + +def timeout_decorator(timeout): + """Timeout decorator using threading + + :param timeout: int, timeout duration in seconds + """ + + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + result = [None] + exception = [None] + + def target(): + try: + result[0] = func(*args, **kwargs) + except Exception as e: + exception[0] = e + + thread = threading.Thread(target=target) + thread.start() + thread.join(timeout) + + if thread.is_alive(): + logger.warning(f"{func.__name__} timed out after {timeout} seconds; args: {args}; kwargs: {kwargs}") + return None + + if exception[0]: + raise exception[0] + + return result[0] + + return wrapper + + return decorator diff --git a/czsc/utils/minutes_split.feather b/czsc/utils/minutes_split.feather index 7999c167b..6062084fa 100644 Binary files a/czsc/utils/minutes_split.feather and b/czsc/utils/minutes_split.feather differ diff --git a/czsc/utils/ta.py b/czsc/utils/ta.py index 4430eb1de..e95637fe8 100644 --- a/czsc/utils/ta.py +++ b/czsc/utils/ta.py @@ -4,9 +4,14 @@ email: zeng_bin8888@163.com create_dt: 2022/1/24 15:01 describe: 常用技术分析指标 + +参考链接: +1. 
https://github.com/twopirllc/pandas-ta + """ import numpy as np import pandas as pd +import pandas_ta def SMA(close: np.array, timeperiod=5): @@ -30,6 +35,26 @@ def SMA(close: np.array, timeperiod=5): return np.array(res, dtype=np.double).round(4) +def WMA(close: np.array, timeperiod=5): + """加权移动平均 + + :param close: np.array + 收盘价序列 + :param timeperiod: int + 均线参数 + :return: np.array + """ + res = [] + for i in range(len(close)): + if i < timeperiod: + res.append(np.nan) + continue + + seq = close[i - timeperiod + 1 : i + 1] + res.append(np.average(seq, weights=range(1, len(seq) + 1))) + return np.array(res, dtype=np.double).round(4) + + def EMA(close: np.array, timeperiod=5): """ https://baike.baidu.com/item/EMA/12646151 @@ -50,12 +75,12 @@ def EMA(close: np.array, timeperiod=5): return np.array(res, dtype=np.double).round(4) -def MACD(close: np.array, fastperiod=12, slowperiod=26, signalperiod=9): +def MACD(real: np.array, fastperiod=12, slowperiod=26, signalperiod=9): """MACD 异同移动平均线 https://baike.baidu.com/item/MACD%E6%8C%87%E6%A0%87/6271283 - :param close: np.array - 收盘价序列 + :param real: np.array + 价格序列 :param fastperiod: int 快周期,默认值 12 :param slowperiod: int @@ -65,8 +90,8 @@ def MACD(close: np.array, fastperiod=12, slowperiod=26, signalperiod=9): :return: (np.array, np.array, np.array) diff, dea, macd """ - ema12 = EMA(close, timeperiod=fastperiod) - ema26 = EMA(close, timeperiod=slowperiod) + ema12 = EMA(real, timeperiod=fastperiod) + ema26 = EMA(real, timeperiod=slowperiod) diff = ema12 - ema26 dea = EMA(diff, timeperiod=signalperiod) macd = (diff - dea) * 2 @@ -127,15 +152,15 @@ def RSQ(close: [np.array, list]) -> float: """ x = list(range(len(close))) y = np.array(close) - x_squred_sum = sum([x1 * x1 for x1 in x]) + x_squared_sum = sum([x1 * x1 for x1 in x]) xy_product_sum = sum([x[i] * y[i] for i in range(len(x))]) num = len(x) x_sum = sum(x) y_sum = sum(y) - delta = float(num * x_squred_sum - x_sum * x_sum) + delta = float(num * x_squared_sum - x_sum * x_sum) if delta == 0: return 0 - y_intercept = (1 / delta) * (x_squred_sum * y_sum - x_sum * xy_product_sum) + y_intercept = (1 / delta) * (x_squared_sum * y_sum - x_sum * xy_product_sum) slope = (1 / delta) * (num * xy_product_sum - x_sum * y_sum) y_mean = np.mean(y) @@ -146,7 +171,7 @@ def RSQ(close: [np.array, list]) -> float: return round(rsq, 4) -def plus_di(high, low, close, timeperiod=14): +def PLUS_DI(high, low, close, timeperiod=14): """ Calculate Plus Directional Indicator (PLUS_DI) manually. @@ -179,7 +204,7 @@ def plus_di(high, low, close, timeperiod=14): return plus_di_ -def minus_di(high, low, close, timeperiod=14): +def MINUS_DI(high, low, close, timeperiod=14): """ Calculate Minus Directional Indicator (MINUS_DI) manually. @@ -213,7 +238,7 @@ def minus_di(high, low, close, timeperiod=14): return minus_di_ -def atr(high, low, close, timeperiod=14): +def ATR(high, low, close, timeperiod=14): """ Calculate Average True Range (ATR). 
@@ -234,7 +259,6 @@ def atr(high, low, close, timeperiod=14): # Calculate ATR atr_ = tr.rolling(window=timeperiod).mean() - return atr_ @@ -338,3 +362,505 @@ def LINEARREG_ANGLE(real, timeperiod=14): angles[today] = np.arctan(m) * (180.0 / np.pi) return angles + + +def DOUBLE_SMA_LS(series: pd.Series, n=5, m=20, **kwargs): + """双均线多空 + + :param series: str, 数据源字段 + :param n: int, 短周期 + :param m: int, 长周期 + """ + assert n < m, "短周期必须小于长周期" + return np.sign(series.rolling(window=n).mean() - series.rolling(window=m).mean()).fillna(0) + + +def BOLL_LS(series: pd.Series, n=5, s=0.1, **kwargs): + """布林线多空 + + series 大于 n 周期均线 + s * n周期标准差,做多;小于 n 周期均线 - s * n周期标准差,做空 + + :param series: str, 数据源字段 + :param n: int, 短周期 + :param s: int, 波动率的倍数,默认为 0.1 + """ + sm = series.rolling(window=n).mean() + sd = series.rolling(window=n).std() + return np.where(series > sm + s * sd, 1, np.where(series < sm - s * sd, -1, 0)) + + +def SMA_MIN_MAX_SCALE(series: pd.Series, timeperiod=5, window=5, **kwargs): + """均线的最大最小值归一化 + + :param series: str, 数据源字段 + :param timeperiod: int, 均线周期 + :param window: int, 窗口大小 + """ + sm = series.rolling(window=timeperiod).mean() + sm_min = sm.rolling(window=window).min() + sm_max = sm.rolling(window=window).max() + res = (sm - sm_min) / (sm_max - sm_min) + res = res.fillna(0) * 2 - 1 + return res + + +def RS_VOLATILITY(df: pd.DataFrame, timeperiod=30, **kwargs): + """RS 波动率,值越大,波动越大 + + :param df: str, 标准K线数据 + :param timeperiod: int, 周期 + """ + log_h_c = np.log(df["high"] / df["close"]) + log_h_o = np.log(df["high"] / df["open"]) + log_l_c = np.log(df["low"] / df["close"]) + log_l_o = np.log(df["low"] / df["open"]) + + x = log_h_c * log_h_o + log_l_c * log_l_o + res = np.sqrt(x.rolling(window=timeperiod).mean()) + return res + + +def PK_VOLATILITY(df: pd.DataFrame, timeperiod=30, **kwargs): + """PK 波动率,值越大,波动越大 + + :param df: str, 标准K线数据 + :param timeperiod: int, 周期 + """ + log_h_l = np.log(df["high"] / df["low"]).pow(2) + log_hl_mean = log_h_l.rolling(window=timeperiod).sum() / (4 * timeperiod * np.log(2)) + res = np.sqrt(log_hl_mean) + return res + + +def SNR(real: pd.Series, timeperiod=14, **kwargs): + """信噪比(Signal Noise Ratio,SNR)""" + return real.diff(timeperiod) / real.diff().abs().rolling(window=timeperiod).sum() + + +try: + import talib as ta + + SMA = ta.SMA + EMA = ta.EMA + MACD = ta.MACD + PPO = ta.PPO + ATR = ta.ATR + PLUS_DI = ta.PLUS_DI + MINUS_DI = ta.MINUS_DI + MFI = ta.MFI + CCI = ta.CCI + BOLL = ta.BBANDS + RSI = ta.RSI + ADX = ta.ADX + ADXR = ta.ADXR + AROON = ta.AROON + AROONOSC = ta.AROONOSC + ROCR = ta.ROCR + ROCR100 = ta.ROCR100 + TRIX = ta.TRIX + ULTOSC = ta.ULTOSC + WILLR = ta.WILLR + LINEARREG = ta.LINEARREG + LINEARREG_ANGLE = ta.LINEARREG_ANGLE + LINEARREG_INTERCEPT = ta.LINEARREG_INTERCEPT + LINEARREG_SLOPE = ta.LINEARREG_SLOPE + + KAMA = ta.KAMA + STOCH = ta.STOCH + STOCHF = ta.STOCHF + STOCHRSI = ta.STOCHRSI + T3 = ta.T3 + TEMA = ta.TEMA + TRIMA = ta.TRIMA + WMA = ta.WMA + BBANDS = ta.BBANDS + DEMA = ta.DEMA + HT_TRENDLINE = ta.HT_TRENDLINE + + BOP = ta.BOP + CMO = ta.CMO + DX = ta.DX + BETA = ta.BETA + + +except ImportError: + print( + f"ta-lib 没有正确安装,将使用自定义分析函数。建议安装 ta-lib,可以大幅提升计算速度。" + f"请参考安装教程 https://blog.csdn.net/qaz2134560/article/details/98484091" + ) + + +def CHOP(high, low, close, **kwargs): + """Choppiness Index + + 为了确定市场当前是否在波动或趋势中,可以使用波动指数。波动指数是由澳大利亚大宗商品交易员 Bill Dreiss 开发的波动率指标。 + 波动指数不是为了预测未来的市场方向,而是用于量化当前市场的“波动”。波动的市场是指价格大幅上下波动的市场。 + 波动指数的值在 100 和 0 之间波动。值越高,市场波动性越高。 + + Sources: + 
https://www.tradingview.com/scripts/choppinessindex/
+        https://www.motivewave.com/studies/choppiness_index.htm
+
+    Calculation:
+        Default Inputs:
+            length=14, scalar=100, drift=1
+
+        HH = high.rolling(length).max()
+        LL = low.rolling(length).min()
+        ATR_SUM = SUM(ATR(drift), length)
+        CHOP = scalar * (LOG10(ATR_SUM) - LOG10(HH - LL)) / LOG10(length)
+
+    :param high: pd.Series, Series of 'high's
+    :param low: pd.Series, Series of 'low's
+    :param close: pd.Series, Series of 'close's
+    :param kwargs: dict, Additional arguments
+
+        - length (int): It's period. Default: 14
+        - atr_length (int): Length for ATR. Default: 1
+        - ln (bool): If True, uses ln otherwise log10. Default: False
+        - scalar (float): How much to magnify. Default: 100
+        - drift (int): The difference period. Default: 1
+        - offset (int): How many periods to offset the result. Default: 0
+        - fillna (value): pd.DataFrame.fillna(value)
+        - fill_method (value): Type of fill method
+
+    :return: pd.Series, New feature generated.
+    """
+    return pandas_ta.chop(high=high, low=low, close=close, **kwargs)
+
+
+def SNR(real: pd.Series, timeperiod=14, **kwargs):
+    """信噪比(Signal Noise Ratio,SNR)"""
+    return real.diff(timeperiod).abs() / real.diff().abs().rolling(window=timeperiod).sum()
+
+
+def rolling_polyfit(real: pd.Series, window=20, degree=1):
+    """滚动多项式拟合,返回最高次项系数
+
+    :param real: pd.Series, 数据源
+    :param window: int, 窗口大小
+    :param degree: int, 多项式次数
+    """
+    res = real.rolling(window=window).apply(lambda x: np.polyfit(range(len(x)), x, degree)[0], raw=True)
+    return res
+
+
+def rolling_auto_corr(real: pd.Series, window=20, lag=1):
+    """滚动自相关系数
+
+    :param real: pd.Series, 数据源
+    :param window: int, 窗口大小
+    :param lag: int, 滞后期
+    """
+    # Series.autocorr 只能作用于 Series,这里必须用 raw=False,否则 apply 传入的是 ndarray
+    res = real.rolling(window=window).apply(lambda x: x.autocorr(lag), raw=False)
+    return res
+
+
+def rolling_ptp(real: pd.Series, window=20):
+    """滚动极差
+
+    :param real: pd.Series, 数据源
+    :param window: int, 窗口大小
+    """
+    res = real.rolling(window=window).apply(lambda x: np.max(x) - np.min(x), raw=True)
+    return res
+
+
+def rolling_skew(real: pd.Series, window=20):
+    """滚动偏度
+
+    :param real: pd.Series, 数据源
+    :param window: int, 窗口大小
+    """
+    res = real.rolling(window=window).skew()
+    return res
+
+
+def rolling_kurt(real: pd.Series, window=20):
+    """滚动峰度
+
+    :param real: pd.Series, 数据源
+    :param window: int, 窗口大小
+    """
+    res = real.rolling(window=window).kurt()
+    return res
+
+
+def rolling_corr(x: pd.Series, y: pd.Series, window=20):
+    """滚动相关系数
+
+    :param x: pd.Series, 数据源
+    :param y: pd.Series, 数据源
+    :param window: int, 窗口大小
+    """
+    res = x.rolling(window=window).corr(y)
+    return res
+
+
+def rolling_cov(x: pd.Series, y: pd.Series, window=20):
+    """滚动协方差
+
+    :param x: pd.Series, 数据源
+    :param y: pd.Series, 数据源
+    :param window: int, 窗口大小
+    """
+    res = x.rolling(window=window).cov(y)
+    return res
+
+
+def rolling_beta(x: pd.Series, y: pd.Series, window=20):
+    """滚动贝塔系数
+
+    :param x: pd.Series, 数据源
+    :param y: pd.Series, 数据源
+    :param window: int, 窗口大小
+    """
+    res = rolling_cov(x, y, window) / rolling_cov(y, y, window)
+    return res
+
+
+def rolling_alpha(x: pd.Series, y: pd.Series, window=20):
+    """滚动阿尔法系数
+
+    :param x: pd.Series, 数据源
+    :param y: pd.Series, 数据源
+    :param window: int, 窗口大小
+    """
+    res = x.rolling(window=window).mean() - rolling_beta(x, y, window) * y.rolling(window=window).mean()
+    return res
+
+
+def rolling_rsq(x: pd.Series, window=20):
+    """滚动拟合优度
+
+    :param x: pd.Series, 数据源
+    :param window: int, 窗口大小
+    """
+    res = x.rolling(window=window).apply(lambda x1: RSQ(x1), raw=True)
+    return res
+
+
+def 
rolling_argmax(x: pd.Series, window=20):
+    """滚动最大值位置
+
+    :param x: pd.Series, 数据源
+    :param window: int, 窗口大小
+    """
+    res = x.rolling(window=window).apply(lambda x1: np.argmax(x1), raw=True)
+    return res
+
+
+def rolling_argmin(x: pd.Series, window=20):
+    """滚动最小值位置
+
+    :param x: pd.Series, 数据源
+    :param window: int, 窗口大小
+    """
+    res = x.rolling(window=window).apply(lambda x1: np.argmin(x1), raw=True)
+    return res
+
+
+def rolling_ir(x: pd.Series, window=20):
+    """滚动信息比率(均值/标准差)
+
+    :param x: pd.Series, 数据源
+    :param window: int, 窗口大小
+    """
+    res = x.rolling(window=window).mean() / x.rolling(window=window).std().replace(0, np.nan)
+    return res
+
+
+def rolling_zscore(x: pd.Series, window=20):
+    """滚动标准化
+
+    :param x: pd.Series, 数据源
+    :param window: int, 窗口大小
+    """
+    res = (x - x.rolling(window=window).mean()) / x.rolling(window=window).std().replace(0, np.nan)
+    return res
+
+
+def rolling_rank(x: pd.Series, window=20):
+    """滚动排名
+
+    :param x: pd.Series, 数据源
+    :param window: int, 窗口大小
+    """
+    # Rolling.rank 的 method 仅支持 average/min/max,传 'first' 会报错,这里用默认的 average
+    res = x.rolling(window=window).rank(pct=True, ascending=True, method="average")
+    return res
+
+
+def rolling_max(x: pd.Series, window=20):
+    """滚动最大值
+
+    :param x: pd.Series, 数据源
+    :param window: int, 窗口大小
+    """
+    res = x.rolling(window=window).max()
+    return res
+
+
+def rolling_min(x: pd.Series, window=20):
+    """滚动最小值
+
+    :param x: pd.Series, 数据源
+    :param window: int, 窗口大小
+    """
+    res = x.rolling(window=window).min()
+    return res
+
+
+def rolling_mdd(x: pd.Series, window=20):
+    """滚动最大回撤
+
+    :param x: pd.Series, 数据源
+    :param window: int, 窗口大小
+    """
+    res = x.rolling(window=window).apply(lambda x1: 1 - (x1 / np.maximum.accumulate(x1)).min(), raw=True)
+    return res
+
+
+def rolling_rank_sub(x: pd.Series, y: pd.Series, window=20):
+    """滚动排名差
+
+    :param x: pd.Series, 数据源
+    :param y: pd.Series, 数据源
+    :param window: int, 窗口大小
+    """
+    res = rolling_rank(x, window) - rolling_rank(y, window)
+    return res
+
+
+def rolling_rank_div(x: pd.Series, y: pd.Series, window=20):
+    """滚动排名比
+
+    :param x: pd.Series, 数据源
+    :param y: pd.Series, 数据源
+    :param window: int, 窗口大小
+    """
+    res = rolling_rank(x, window) / rolling_rank(y, window)
+    return res
+
+
+def rolling_rank_mul(x: pd.Series, y: pd.Series, window=20):
+    """滚动排名乘
+
+    :param x: pd.Series, 数据源
+    :param y: pd.Series, 数据源
+    :param window: int, 窗口大小
+    """
+    res = rolling_rank(x, window) * rolling_rank(y, window)
+    return res
+
+
+def rolling_rank_sum(x: pd.Series, y: pd.Series, window=20):
+    """滚动排名和
+
+    :param x: pd.Series, 数据源
+    :param y: pd.Series, 数据源
+    :param window: int, 窗口大小
+    """
+    res = rolling_rank(x, window) + rolling_rank(y, window)
+    return res
+
+
+def rolling_vwap(close: pd.Series, volume: pd.Series, window=20):
+    """滚动成交量加权平均价格
+
+    :param close: pd.Series, 收盘价
+    :param volume: pd.Series, 成交量
+    :param window: int, 窗口大小
+    """
+    res = (close * volume).rolling(window=window).sum() / volume.rolling(window=window).sum().replace(0, np.nan)
+    return res
+
+
+def rolling_obv(close: pd.Series, volume: pd.Series, window=200):
+    """滚动能量潮
+
+    :param close: pd.Series, 收盘价
+    :param volume: pd.Series, 成交量
+    :param window: int, 窗口大小
+    """
+    # np.where 返回 ndarray,需要转回 Series 才能调用 rolling
+    res = np.where(close.diff() > 0, volume, np.where(close.diff() < 0, -volume, 0))
+    res = pd.Series(res, index=close.index).rolling(window=window).sum()
+    return res
+
+
+def rolling_pvt(close: pd.Series, volume: pd.Series, window=20):
+    """滚动价格成交量趋势
+
+    :param close: pd.Series, 收盘价
+    :param volume: pd.Series, 成交量
+    :param window: int, 窗口大小
+    """
+    res = ((close.diff() / close.shift(1)) * volume).rolling(window=window).sum()
+    return res
+
+
+def rolling_pvi(close: 
pd.Series, volume: pd.Series, window=20):
+    """滚动正量指标
+
+    :param close: pd.Series, 收盘价
+    :param volume: pd.Series, 成交量
+    :param window: int, 窗口大小
+    """
+    # np.where 返回 ndarray,需要转回 Series 才能调用 rolling
+    res = pd.Series(np.where(close.diff() > 0, volume, 0), index=close.index)
+    res = res.rolling(window=window).sum()
+    return res
+
+
+def rolling_std(real: pd.Series, window=20):
+    """滚动标准差
+
+    :param real: pd.Series, 数据源
+    :param window: int, 窗口大小
+    """
+    res = real.rolling(window=window).std()
+    return res
+
+
+def ultimate_smoother(price, period: int = 7):
+    """Ultimate Smoother
+
+    https://www.95sca.cn/archives/111068
+
+    终极平滑器(Ultimate Smoother)是由交易系统和算法交易策略开发者John Ehlers设计的
+    一种技术分析指标,它是一种趋势追踪指标,用于识别股票价格的趋势。
+
+    :param price: np.array, 价格序列
+    :param period: int, 周期
+    :return:
+    """
+    # 初始化变量;原文 EasyLanguage 中 cos 以角度为单位,换算到 Python 需用弧度:1.414*180/period 度 = 1.414*pi/period 弧度
+    a1 = np.exp(-1.414 * np.pi / period)
+    b1 = 2 * a1 * np.cos(1.414 * np.pi / period)
+    c2 = b1
+    c3 = -a1 * a1
+    c1 = (1 + c2 - c3) / 4
+
+    # 准备输出结果的序列
+    us = np.zeros(len(price))
+
+    # 计算 Ultimate Smoother
+    for i in range(len(price)):
+        if i < 4:
+            us[i] = price[i]
+        else:
+            us[i] = (
+                (1 - c1) * price[i]
+                + (2 * c1 - c2) * price[i - 1]
+                - (c1 + c3) * price[i - 2]
+                + c2 * us[i - 1]
+                + c3 * us[i - 2]
+            )
+    return us
+
+
+def sigmoid(x):
+    """Sigmoid 函数"""
+    return 1 / (1 + np.exp(-x))
+
+
+def log_return(x):
+    """对数收益率"""
+    return np.log(x / x.shift(1))
diff --git a/czsc/utils/ta1.py b/czsc/utils/ta1.py
deleted file mode 100644
index 63bd47542..000000000
--- a/czsc/utils/ta1.py
+++ /dev/null
@@ -1,86 +0,0 @@
-# coding: utf-8
-"""
-
-常用技术分析指标:MA, MACD, BOLL
-
-使用 ta-lib 可以大幅提升计算性能,10倍左右
-"""
-import numpy as np
-import talib as ta
-
-SMA = ta.SMA
-EMA = ta.EMA
-MACD = ta.MACD
-
-
-def KDJ(close: np.array, high: np.array, low: np.array):
-    """
-
-    :param close: 收盘价序列
-    :param high: 最高价序列
-    :param low: 最低价序列
-    :return:
-    """
-    n = 9
-    hv = []
-    lv = []
-    for i in range(len(close)):
-        if i < n:
-            h_ = high[0: i+1]
-            l_ = low[0: i+1]
-        else:
-            h_ = high[i - n + 1: i + 1]
-            l_ = low[i - n + 1: i + 1]
-        hv.append(max(h_))
-        lv.append(min(l_))
-
-    hv = np.around(hv, decimals=2)
-    lv = np.around(lv, decimals=2)
-    rsv = np.where(hv == lv, 0, (close - lv) / (hv - lv) * 100)
-
-    k = []
-    d = []
-    j = []
-    for i in range(len(rsv)):
-        if i < n:
-            k_ = rsv[i]
-            d_ = k_
-        else:
-            k_ = (2 / 3) * k[i-1] + (1 / 3) * rsv[i]
-            d_ = (2 / 3) * d[i-1] + (1 / 3) * k_
-
-        k.append(k_)
-        d.append(d_)
-        j.append(3 * k_ - 2 * d_)
-
-    k = np.array(k, dtype=np.double)
-    d = np.array(d, dtype=np.double)
-    j = np.array(j, dtype=np.double)
-    return k, d, j
-
-
-def RSQ(close: [np.array, list]) -> float:
-    """拟合优度 R SQuare
-
-    :param close: 收盘价序列
-    :return:
-    """
-    x = list(range(len(close)))
-    y = np.array(close)
-    x_squred_sum = sum([x1 * x1 for x1 in x])
-    xy_product_sum = sum([x[i] * y[i] for i in range(len(x))])
-    num = len(x)
-    x_sum = sum(x)
-    y_sum = sum(y)
-    delta = float(num * x_squred_sum - x_sum * x_sum)
-    if delta == 0:
-        return 0
-    y_intercept = (1 / delta) * (x_squred_sum * y_sum - x_sum * xy_product_sum)
-    slope = (1 / delta) * (num * xy_product_sum - x_sum * y_sum)
-
-    y_mean = np.mean(y)
-    ss_tot = sum([(y1 - y_mean) * (y1 - y_mean) for y1 in y]) + 0.00001
-    ss_err = sum([(y[i] - slope * x[i] - y_intercept) * (y[i] - slope * x[i] - y_intercept) for i in range(len(x))])
-    rsq = 1 - ss_err / ss_tot
-
-    return round(rsq, 4)
diff --git a/examples/develop/bar_end_time.py b/examples/develop/bar_end_time.py
index 82149d15b..f736113a1 100644
--- a/examples/develop/bar_end_time.py
+++ b/examples/develop/bar_end_time.py
@@ -59,31 +59,9 @@ def split_time(freq="60分钟"):
     return dfx
 
 
-rows = []
-for freq in (
-    
"1分钟", - "2分钟", - "3分钟", - "4分钟", - "5分钟", - "6分钟", - "10分钟", - "12分钟", - "15分钟", - "20分钟", - "30分钟", - "60分钟", - "120分钟", -): - rows.append(split_time(freq)) - -df = pd.concat(rows, ignore_index=True) -df = pd.pivot_table(df, index="time", columns="freq", values="edt", aggfunc="first").reset_index() -df["market"] = "默认" -df = df[ - [ - "market", - "time", +def main(): + rows = [] + for freq in ( "1分钟", "2分钟", "3分钟", @@ -97,9 +75,40 @@ def split_time(freq="60分钟"): "30分钟", "60分钟", "120分钟", + ): + rows.append(split_time(freq)) + + df = pd.concat(rows, ignore_index=True) + df = pd.pivot_table(df, index="time", columns="freq", values="edt", aggfunc="first").reset_index() + df["market"] = "默认" + df = df[ + [ + "market", + "time", + "1分钟", + "2分钟", + "3分钟", + "4分钟", + "5分钟", + "6分钟", + "10分钟", + "12分钟", + "15分钟", + "20分钟", + "30分钟", + "60分钟", + "120分钟", + ] ] -] -df.to_excel(r"C:\Users\zengb\Desktop\time_split_conf_V4.xlsx", index=False) + df.to_excel(r"C:\Users\zengb\Desktop\time_split_conf_V4.xlsx", index=False) -df = pd.read_excel(r"C:\Users\zengb\Desktop\time_split_conf_V4.xlsx") -df.to_feather(r"D:\ZB\git_repo\waditu\czsc\czsc\utils\minites_split.feather") + df = pd.read_excel(r"C:\Users\zengb\Desktop\time_split_conf_V4.xlsx") + df.to_feather(r"D:\ZB\git_repo\waditu\czsc\czsc\utils\minites_split.feather") + + +def fix_error(): + df = pd.read_feather(r"A:\ZB\git_repo\waditu\czsc\czsc\utils\minutes_split.feather") + df.to_excel(r"C:\Users\zengb\Desktop\time_split_conf_V4.xlsx", index=False) + + df = pd.read_excel(r"C:\Users\zengb\Desktop\time_split_conf_V4.xlsx") + df.to_feather(r"A:\ZB\git_repo\waditu\czsc\czsc\utils\minutes_split.feather") diff --git a/examples/develop/weight_backtest.py b/examples/develop/weight_backtest.py index 8762b8c9b..e508dbdf2 100644 --- a/examples/develop/weight_backtest.py +++ b/examples/develop/weight_backtest.py @@ -1,6 +1,7 @@ # https://s0cqcxuy3p.feishu.cn/wiki/Pf1fw1woQi4iJikbKJmcYToznxb import sys -sys.path.insert(0, r"D:\ZB\git_repo\waditu\czsc") + +sys.path.insert(0, r"A:\ZB\git_repo\waditu\czsc") import czsc import pandas as pd @@ -9,7 +10,6 @@ def test_ensemble_weight(): """从持仓权重样例数据中回测""" - dfw = pd.read_feather(r"C:\Users\zengb\Desktop\weight_example.feather") + dfw = pd.read_feather(r"C:\Users\zengb\Downloads\weight_example.feather") wb = czsc.WeightBacktest(dfw, digits=1, fee_rate=0.0002, res_path=r"C:\Users\zengb\Desktop\weight_example") - res = wb.backtest() - + # res = wb.backtest() diff --git "a/examples/develop/\345\257\271\346\257\224numpy\344\270\216talib\347\232\204\351\200\237\345\272\246.py" "b/examples/develop/\345\257\271\346\257\224numpy\344\270\216talib\347\232\204\351\200\237\345\272\246.py" index 75a47430b..1f2700062 100644 --- "a/examples/develop/\345\257\271\346\257\224numpy\344\270\216talib\347\232\204\351\200\237\345\272\246.py" +++ "b/examples/develop/\345\257\271\346\257\224numpy\344\270\216talib\347\232\204\351\200\237\345\272\246.py" @@ -1,4 +1,5 @@ import sys +import time sys.path.insert(0, r"A:\ZB\git_repo\waditu\czsc") import talib @@ -6,14 +7,83 @@ from czsc.connectors import cooperation as coo -df = coo.get_raw_bars(symbol="SFIC9001", freq="30分钟", fq="后复权", sdt="20100101", edt="20210301", raw_bars=False) +df = coo.get_raw_bars(symbol="SFIC9001", freq="日线", fq="后复权", sdt="20100101", edt="20210301", raw_bars=False) -def test_with_numpy(): +def test_vs_volatility(): df1 = df.copy() - df1["x"] = ta.LINEARREG_ANGLE(df["close"].values, 10) + df1["STD"] = df["close"].pct_change().rolling(30).std() + df1["RS"] = 
ta.RS_VOLATILITY(df1, timeperiod=30) + df1["PK"] = ta.PK_VOLATILITY(df1, timeperiod=30) + df1[["STD", "RS", "PK"]].corr() -def test_with_talib(): +def test_compare_LINEARREG_ANGLE(): df1 = df.copy() - df1["x"] = talib.LINEARREG_ANGLE(df["close"].values, 10) + s1 = time.time() + df1["x1"] = ta.LINEARREG_ANGLE(df["close"].values, 10) + e1 = time.time() - s1 + + s2 = time.time() + df1["x2"] = talib.LINEARREG_ANGLE(df["close"].values, 10) + e2 = time.time() - s2 + + print(f"计算时间差异,ta: {e1}, talib: {e2};相差:{e1 - e2}") + df1["diff"] = df1["x1"] - df1["x2"] + assert df1["diff"].abs().max() < 1e-6 + print(df1["diff"].abs().max()) + + +def test_compare_CCI(): + # 数据对不上,需要调整 + df1 = df.copy() + s1 = time.time() + df1["x1"] = ta.CCI(df1["high"].values, df1["low"].values, df1["close"].values, timeperiod=14) + e1 = time.time() - s1 + + s2 = time.time() + df1["x2"] = talib.CCI(df1["high"].values, df1["low"].values, df1["close"].values, timeperiod=14) + e2 = time.time() - s2 + + print(f"计算时间差异,ta: {e1}, talib: {e2};相差:{e1 - e2}") + df1["diff"] = df1["x1"] - df1["x2"] + print(df1["diff"].abs().describe()) + + assert df1["diff"].abs().max() < 1e-6 + + +def test_compare_MFI(): + # 数据对不上,需要调整 + df1 = df.copy() + s1 = time.time() + df1["x1"] = ta.MFI(df1["high"], df1["low"], df1["close"], df1["vol"], timeperiod=14) + e1 = time.time() - s1 + + s2 = time.time() + df1["x2"] = talib.MFI(df1["high"].values, df1["low"].values, df1["close"].values, df1["vol"].values, timeperiod=14) + e2 = time.time() - s2 + + print(f"计算时间差异,ta: {e1}, talib: {e2};相差:{e1 - e2}") + df1["diff"] = df1["x1"] - df1["x2"] + print(df1["diff"].abs().describe()) + + assert df1["diff"].abs().max() < 1e-6 + + +def test_compare_PLUS_DI(): + # 数据对不上,需要调整 + df1 = coo.get_raw_bars(symbol="SFIC9001", freq="日线", fq="后复权", sdt="20100101", edt="20210301", raw_bars=False) + + s1 = time.time() + df1["x1"] = ta.PLUS_DI(df1["high"], df1["low"], df1["close"], timeperiod=14) + e1 = time.time() - s1 + + s2 = time.time() + df1["x2"] = talib.PLUS_DI(df1["high"].values, df1["low"].values, df1["close"].values, timeperiod=14) + e2 = time.time() - s2 + + print(f"计算时间差异,ta: {e1}, talib: {e2};相差:{e1 - e2}") + df1["diff"] = df1["x1"] - df1["x2"] + print(df1["diff"].abs().describe()) + + assert df1["diff"].abs().max() < 1e-6 diff --git "a/examples/develop/\346\265\213\350\257\225TA\346\212\200\346\234\257\346\214\207\346\240\207\350\256\241\347\256\227.py" "b/examples/develop/\346\265\213\350\257\225TA\346\212\200\346\234\257\346\214\207\346\240\207\350\256\241\347\256\227.py" new file mode 100644 index 000000000..7a75f00b7 --- /dev/null +++ "b/examples/develop/\346\265\213\350\257\225TA\346\212\200\346\234\257\346\214\207\346\240\207\350\256\241\347\256\227.py" @@ -0,0 +1,71 @@ +import sys +import time + +sys.path.insert(0, r"A:\ZB\git_repo\waditu\czsc") +import talib +from czsc.utils import ta +from czsc.connectors import cooperation as coo + + +df = coo.get_raw_bars(symbol="SFIC9001", freq="日线", fq="后复权", sdt="20100101", edt="20210301", raw_bars=False) + + +def test_CHOP(): + df1 = df.copy() + df1["CHOP"] = ta.CHOP(df1["high"], df1["low"], df1["close"]) + df1["WMA"] = ta.WMA(df1["close"], timeperiod=3) + + +def test_WMA(): + df1 = df.copy() + df1["WMA"] = ta.WMA(df1["close"], timeperiod=3) + df1["WMA2"] = talib.WMA(df1["close"], timeperiod=5) + print(df1[["close", "WMA", "WMA2"]].tail(10)) + + +def test_rolling_rsq(): + df1 = df.copy() + df1["rsq"] = ta.rolling_rsq(df1["close"], window=5) + print(df1[["close", "rsq"]].tail(10)) + + +def test_rolling_corr(): + df1 = 
df.copy() + df1["corr"] = ta.rolling_corr(df1["close"], df1["open"], window=5) + print(df1[["close", "open", "corr"]].tail(10)) + + +def test_rolling_beta(): + df1 = df.copy() + df1["beta"] = ta.rolling_beta(df1["close"], df1["open"], window=5) + print(df1[["close", "open", "beta"]].tail(10)) + + +def test_rolling_std(): + df1 = df.copy() + df1["std"] = ta.rolling_std(df1["close"], window=5) + print(df1[["close", "std"]].tail(10)) + + +def test_rolling_max(): + df1 = df.copy() + df1["max"] = ta.rolling_max(df1["close"], window=5) + print(df1[["close", "max"]].tail(10)) + + +def test_rolling_min(): + df1 = df.copy() + df1["min"] = ta.rolling_min(df1["close"], window=5) + print(df1[["close", "min"]].tail(10)) + + +def test_rolling_mdd(): + df1 = df.copy() + df1["mdd"] = ta.rolling_mdd(df1["close"], window=5) + print(df1[["close", "mdd"]].tail(10)) + + +def test_ultimate_smoother(): + df1 = df.copy() + df1["uo"] = ta.ultimate_smoother(df1["close"], period=5) + print(df1[["close", "uo"]].tail(10)) diff --git a/examples/signals_dev/bar_volatility_V241013.py b/examples/signals_dev/bar_volatility_V241013.py new file mode 100644 index 000000000..b11a821b5 --- /dev/null +++ b/examples/signals_dev/bar_volatility_V241013.py @@ -0,0 +1,67 @@ +import numpy as np +from collections import OrderedDict + +import pandas as pd + +from czsc.analyze import CZSC +from czsc.utils import create_single_signal + + +def bar_volatility_V241013(c: CZSC, **kwargs) -> OrderedDict: + """波动率分三层 + + 参数模板:"{freq}_波动率分层W{w}N{n}_完全分类V241013" + + **信号逻辑:** + + 波动率分层,要求如下。 + 1. 取最近 n 根K线,计算这 n 根K线的最高价和最低价的差值,记为 r + 2. 取最近 w 根K线,将 r 分为三等分,分别为低波动,中波动,高波动 + + **信号列表:** + + - Signal('60分钟_波动率分层W200N10_完全分类V241013_低波动_任意_任意_0') + - Signal('60分钟_波动率分层W200N10_完全分类V241013_中波动_任意_任意_0') + - Signal('60分钟_波动率分层W200N10_完全分类V241013_高波动_任意_任意_0') + + :param c: CZSC对象 + :param kwargs: 无 + :return: 信号识别结果 + """ + w = int(kwargs.get("w", 200)) # K线数量 + n = int(kwargs.get("n", 10)) # 波动率窗口大小 + + freq = c.freq.value + k1, k2, k3 = f"{freq}_波动率分层W{w}N{n}_完全分类V241013".split("_") + v1 = "其他" + key = f"volatility_{n}" + for bar in c.bars_raw[-n:]: + if key not in bar.cache: + n_max_close = max([x.close for x in c.bars_raw[-n:]]) + n_min_close = min([x.close for x in c.bars_raw[-n:]]) + bar.cache[key] = n_max_close - n_min_close + + if len(c.bars_raw) < w + n + 100: + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + vols = [x.cache.get(key, 0) for x in c.bars_raw[-w:]] + try: + v1 = pd.qcut(vols, 3, labels=["低波动", "中波动", "高波动"], duplicates="drop")[-1] + except Exception as e: + v1 = "其他" + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + +def check(): + from czsc.connectors import research + from czsc.traders.base import check_signals_acc + + symbols = research.get_symbols("A股主要指数") + bars = research.get_raw_bars(symbols[0], "15分钟", "20181101", "20210101", fq="前复权") + + signals_config = [{"name": bar_volatility_V241013, "freq": "60分钟", "n": 10}] + check_signals_acc(bars, signals_config=signals_config, height="780px", delta_days=5) # type: ignore + + +if __name__ == "__main__": + check() diff --git a/examples/signals_dev/bar_zfzd_V241013.py b/examples/signals_dev/bar_zfzd_V241013.py new file mode 100644 index 000000000..aff55aa4a --- /dev/null +++ b/examples/signals_dev/bar_zfzd_V241013.py @@ -0,0 +1,56 @@ +import numpy as np +from collections import OrderedDict + +import pandas as pd + +from czsc.analyze import CZSC +from czsc.utils import create_single_signal + + +def bar_zfzd_V241013(c: CZSC, **kwargs) -> OrderedDict: + 
"""窄幅震荡形态:窗口内任意两根K线都要有重叠 + + 参数模板:"{freq}_窄幅震荡N{n}_形态V241013" + + **信号逻辑:** + + 1. 取最近 n 根K线,这 n 根K线的 最高价最小值 >= 最低价最大值,即有重叠 + + **信号列表:** + + - Signal('60分钟_窄幅震荡N5_形态V241013_满足_任意_任意_0') + + :param c: CZSC对象 + :param kwargs: 无 + :return: 信号识别结果 + """ + n = int(kwargs.get("n", 5)) # 窗口大小 + + freq = c.freq.value + k1, k2, k3 = f"{freq}_窄幅震荡N{n}_形态V241013".split("_") + v1 = "其他" + + if len(c.bars_raw) < n + 50: + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + bars = c.bars_raw[-n:] + zg = min([x.high for x in bars]) + zd = max([x.low for x in bars]) + if zg >= zd: + v1 = "满足" + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + +def check(): + from czsc.connectors import research + from czsc.traders.base import check_signals_acc + + symbols = research.get_symbols("A股主要指数") + bars = research.get_raw_bars(symbols[0], "15分钟", "20181101", "20210101", fq="前复权") + + signals_config = [{"name": bar_zfzd_V241013, "freq": "60分钟", "n": 5}] + check_signals_acc(bars, signals_config=signals_config, height="780px", delta_days=5) # type: ignore + + +if __name__ == "__main__": + check() diff --git a/examples/signals_dev/bar_zfzd_V241014.py b/examples/signals_dev/bar_zfzd_V241014.py new file mode 100644 index 000000000..5ba9cc647 --- /dev/null +++ b/examples/signals_dev/bar_zfzd_V241014.py @@ -0,0 +1,60 @@ +import numpy as np +from collections import OrderedDict + +import pandas as pd + +from czsc.analyze import CZSC +from czsc.utils import create_single_signal + + +def bar_zfzd_V241014(c: CZSC, **kwargs) -> OrderedDict: + """窄幅震荡形态 + + 参数模板:"{freq}_窄幅震荡N{n}_形态V241014" + + **信号逻辑:** + + 1. 找到窗口内最大实体的K线,这根K线跟所有的其他K线都有重叠 + 2. 相比于 bar_zfzd_V241013,条件更宽松一些 + + **信号列表:** + + - Signal('60分钟_窄幅震荡N10_形态V241014_满足_任意_任意_0') + + :param c: CZSC对象 + :param kwargs: 无 + :return: 信号识别结果 + """ + n = int(kwargs.get("n", 5)) # 窗口大小 + + freq = c.freq.value + k1, k2, k3 = f"{freq}_窄幅震荡N{n}_形态V241014".split("_") + v1 = "其他" + + if len(c.bars_raw) < n + 50: + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + bars = c.bars_raw[-n:] + max_bar = sorted(bars, key=lambda x: x.solid, reverse=True)[0] + + if max_bar.solid > 2 * np.mean([x.solid for x in bars]): + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + if all([min(x.high, max_bar.high) > max(x.low, max_bar.low) for x in bars]): + v1 = "满足" + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + +def check(): + from czsc.connectors import research + from czsc.traders.base import check_signals_acc + + symbols = research.get_symbols("A股主要指数") + bars = research.get_raw_bars(symbols[0], "15分钟", "20181101", "20210101", fq="前复权") + + signals_config = [{"name": bar_zfzd_V241014, "freq": "60分钟", "n": 10}] + check_signals_acc(bars, signals_config=signals_config, height="780px", delta_days=5) # type: ignore + + +if __name__ == "__main__": + check() diff --git a/examples/signals_dev/tas_dif_layer_V241010.py b/examples/signals_dev/tas_dif_layer_V241010.py new file mode 100644 index 000000000..b265f3dbf --- /dev/null +++ b/examples/signals_dev/tas_dif_layer_V241010.py @@ -0,0 +1,68 @@ +import numpy as np +from collections import OrderedDict +from czsc.analyze import CZSC, Direction +from czsc.signals.tas import update_macd_cache +from czsc.utils import create_single_signal + + +def tas_dif_layer_V241010(c: CZSC, **kwargs) -> OrderedDict: + """DIF分三层:零轴附近,上方远离,下方远离 + + 参数模板:"{freq}_DIF分层W{w}T{t}_完全分类V241010" + + **信号逻辑:** + + DIF分层,要求如下。 + + 1,取最近 w 根K线,获取 diffs 序列 + 2. 计算 diffs 的最大绝对值 r,作为波动率的标准 + 3. 如果最近一个 diff 在 t * r 的范围内,则认为是零轴附近 + 4. 
如果最近一个 diff > 0 且在 t * r 的范围外,则认为是多头远离;反之,空头远离
+
+    **信号列表:**
+
+    - Signal('60分钟_DIF分层W100T50_完全分类V241010_零轴附近_任意_任意_0')
+    - Signal('60分钟_DIF分层W100T50_完全分类V241010_空头远离_任意_任意_0')
+    - Signal('60分钟_DIF分层W100T50_完全分类V241010_多头远离_任意_任意_0')
+
+    :param c: CZSC对象
+    :param kwargs: 无
+    :return: 信号识别结果
+    """
+    w = int(kwargs.get("w", 100))  # K线数量
+    t = int(kwargs.get("t", 30))  # 零轴附近的阈值,相比于 max(diffs) 的比例
+
+    freq = c.freq.value
+    k1, k2, k3 = f"{freq}_DIF分层W{w}T{t}_完全分类V241010".split("_")
+    v1 = "其他"
+    key = update_macd_cache(c)
+    if len(c.bars_raw) < w + 50:
+        return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
+
+    bars = c.bars_raw[-w:]
+    diffs = [x.cache[key]["dif"] for x in bars]
+
+    r = max([abs(x) for x in diffs]) / 100
+    if diffs[-1] < 0 and abs(diffs[-1]) > r * t:
+        v1 = "空头远离"
+    elif diffs[-1] > 0 and abs(diffs[-1]) > r * t:
+        v1 = "多头远离"
+    else:
+        v1 = "零轴附近"
+
+    return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
+
+
+def check():
+    from czsc.connectors import research
+    from czsc.traders.base import check_signals_acc
+
+    symbols = research.get_symbols("A股主要指数")
+    bars = research.get_raw_bars(symbols[0], "15分钟", "20181101", "20210101", fq="前复权")
+
+    signals_config = [{"name": tas_dif_layer_V241010, "freq": "60分钟", "t": 30}]
+    check_signals_acc(bars, signals_config=signals_config, height="780px", delta_days=5)  # type: ignore
+
+
+if __name__ == "__main__":
+    check()
diff --git a/requirements.txt b/requirements.txt
index 0aa9f37bf..cabef51c9 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,8 +1,8 @@
 requests>=2.24.0
 pyecharts>=1.9.1
 tqdm>=4.66.4
-pandas>=1.1.0
-numpy>=1.16.5
+pandas>=1.0.0
+numpy>1.16,<=1.26.4
 tushare>=1.4.6
 python-docx
 matplotlib
@@ -28,4 +28,5 @@ cryptography
 pytz
 flask
 scipy
-requests_toolbelt
\ No newline at end of file
+requests_toolbelt
+pandas-ta
\ No newline at end of file
diff --git a/test/test_eda.py b/test/test_eda.py
new file mode 100644
index 000000000..fd6c40542
--- /dev/null
+++ b/test/test_eda.py
@@ -0,0 +1,73 @@
+import pytest
+import pandas as pd
+from czsc.eda import weights_simple_ensemble
+
+
+def test_cal_yearly_days():
+    if pd.__version__ < "2.1.0":
+        pytest.skip("skip this test if pandas version is less than 2.1.0")
+
+    from czsc.eda import cal_yearly_days
+
+    # Test with a list of dates within a single year
+    dts = ["2023-01-01", "2023-01-02", "2023-01-03", "2023-12-31"]
+    assert cal_yearly_days(dts) == 252
+
+    # Test with a list of dates spanning more than one year
+    dts = ["2022-01-01", "2022-12-31", "2023-01-01", "2023-12-31"]
+    assert cal_yearly_days(dts) == 2
+
+    # Test with a list of dates with minute precision
+    dts = [
+        "2023-01-01 12:00",
+        "2023-01-02 13:00",
+        "2023-01-01 14:00",
+        "2023-02-01 15:00",
+        "2023-03-01 16:00",
+        "2023-03-01 17:00",
+    ]
+    assert cal_yearly_days(dts) == 252
+
+    # Test with an empty list
+    with pytest.raises(AssertionError):
+        cal_yearly_days([])
+
+    # Test with a list of dates with duplicates
+    dts = ["2023-01-01", "2023-01-01", "2023-01-02", "2023-01-02"]
+    assert cal_yearly_days(dts) == 252
+
+
+def test_weights_simple_ensemble_mean():
+    df = pd.DataFrame({"strategy1": [0.1, 0.2, 0.3], "strategy2": [0.2, 0.3, 0.4], "strategy3": [0.3, 0.4, 0.5]})
+    weight_cols = ["strategy1", "strategy2", "strategy3"]
+    result = weights_simple_ensemble(df, weight_cols, method="mean")
+    expected = pd.Series([0.2, 0.3, 0.4], name="weight")
+    pd.testing.assert_series_equal(result["weight"], expected)
+
+
+def test_weights_simple_ensemble_vote():
+    df = pd.DataFrame({"strategy1": [1, -1, 1], "strategy2": [-1, 1, 
-1], "strategy3": [1, 1, -1]}) + weight_cols = ["strategy1", "strategy2", "strategy3"] + result = weights_simple_ensemble(df, weight_cols, method="vote") + expected = pd.Series([1, 1, -1], name="weight") + pd.testing.assert_series_equal(result["weight"], expected) + + +def test_weights_simple_ensemble_sum_clip(): + df = pd.DataFrame({"strategy1": [0.5, -0.5, 0.5], "strategy2": [0.5, 0.5, -0.5], "strategy3": [0.5, 0.5, 0.5]}) + weight_cols = ["strategy1", "strategy2", "strategy3"] + result = weights_simple_ensemble(df, weight_cols, method="sum_clip", clip_min=-1, clip_max=1) + expected = pd.Series([1, 0.5, 0.5], name="weight") + pd.testing.assert_series_equal(result["weight"], expected) + + +def test_weights_simple_ensemble_only_long(): + df = pd.DataFrame({"strategy1": [0.5, -0.5, 0.5], "strategy2": [0.5, 0.5, -0.5], "strategy3": [0.5, 0.5, 0.5]}) + weight_cols = ["strategy1", "strategy2", "strategy3"] + result = weights_simple_ensemble(df, weight_cols, method="sum_clip", clip_min=-1, clip_max=1, only_long=True) + expected = pd.Series([1, 0.5, 0.5], name="weight") + pd.testing.assert_series_equal(result["weight"], expected) + + +if __name__ == "__main__": + pytest.main() diff --git a/test/test_utils.py b/test/test_utils.py index 9c984dd1e..ac95ae04b 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -7,9 +7,11 @@ """ import sys import pytest +import time import pandas as pd import numpy as np from czsc import utils +from czsc.utils import timeout_decorator def test_x_round(): @@ -344,3 +346,21 @@ def test_overlap(): # 验证结果 assert result["col_overlap"].tolist() == [1, 2, 1, 2, 1] + + +def test_timeout_decorator_success(): + @timeout_decorator(2) + def fast_function(): + time.sleep(1) + return "Completed" + + assert fast_function() == "Completed" + + +def test_timeout_decorator_timeout(): + @timeout_decorator(1) + def slow_function(): + time.sleep(5) + return "Completed" + + assert slow_function() is None
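
以下补充一个 `power_snr` 的手工算例,帮助理解 SNR 对笔内走势顺畅程度的度量;其中 `power_price` 取 1.5 仅为假设值,实际由笔对象自行计算:

```python
# power_snr 的手工算例:假设某笔的原始K线收盘价如下,power_price(笔的价格力度)假设为 1.5
close = [10.0, 11.0, 10.8, 11.5]
abs_diff = [abs(close[i] - close[i - 1]) for i in range(1, len(close))]  # [1.0, 0.2, 0.7]
snr = round(1.5 / sum(abs_diff), 4)  # 1.5 / 1.9 = 0.7895
# SNR 越接近 1,说明笔内回撤越少、走势越顺畅,力度越大
print(snr)
```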
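bar_zfzd_V241013 的重叠判定可以用一个小算例验证:ZG 取窗口内最高价的最小值,ZD 取最低价的最大值,ZG >= ZD 等价于窗口内任意两根K线都有重叠(以下 K 线数据为随意构造,仅作示意):

```python
# 窄幅震荡判定的手工算例:三根K线的 (high, low)
highs = [10.5, 10.4, 10.6]
lows = [10.1, 10.0, 10.2]
zg = min(highs)  # 10.4,窗口内最高价的最小值
zd = max(lows)   # 10.2,窗口内最低价的最大值
assert zg >= zd  # 任意两根K线都在 [10.2, 10.4] 区间内有重叠,满足窄幅震荡
```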
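czsc/eda.py 新增的 weights_simple_ensemble 与 unify_weights 可以按下面的方式串起来使用;这只是一个接口示意,权重与价格均为随机构造,列名 s1/s2 为假设:

```python
import numpy as np
import pandas as pd
from czsc.eda import weights_simple_ensemble, unify_weights

rng = np.random.default_rng(42)
dts = pd.date_range("2023-01-01", "2024-06-30", freq="D")
rows = []
for symbol in ["IC9001", "IF9001"]:
    df = pd.DataFrame({"dt": dts, "symbol": symbol})
    df["price"] = 100 + rng.normal(0, 1, len(df)).cumsum()
    df["s1"] = rng.choice([-1.0, 0.0, 1.0], len(df))  # 假设的策略1权重
    df["s2"] = rng.uniform(-1, 1, len(df))            # 假设的策略2权重
    rows.append(df)
dfw = pd.concat(rows, ignore_index=True)

# 先做朴素集成:两个策略的权重求和后截断到 [-1, 1]
dfw = weights_simple_ensemble(dfw, weight_cols=["s1", "s2"], method="sum_clip", clip_min=-1, clip_max=1)

# 再做大盘择时:同一时刻所有品种统一使用合并后的权重
dfu = unify_weights(dfw[["symbol", "dt", "weight", "price"]], method="sum_clip")
print(dfu[["dt", "symbol", "weight_raw", "weight_unified", "weight"]].tail())
```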
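czsc/utils/ta.py 新增的 rolling_* 系列函数都接受 pd.Series 并返回同长度的滚动结果;下面用随机序列演示几个常用的(数据为随机构造,仅作接口示意):

```python
import numpy as np
import pandas as pd
from czsc.utils import ta

rng = np.random.default_rng(7)
close = pd.Series(rng.normal(0, 1, 300).cumsum() + 100)
volume = pd.Series(rng.integers(1000, 5000, 300).astype(float))

zscore = ta.rolling_zscore(close, window=20)      # 滚动标准化
rank = ta.rolling_rank(close, window=20)          # 滚动百分位排名
vwap = ta.rolling_vwap(close, volume, window=20)  # 滚动成交量加权均价
print(pd.DataFrame({"zscore": zscore, "rank": rank, "vwap": vwap}).tail())
```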
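czsc/utils/__init__.py 新增的 timeout_decorator 基于线程实现,超时后返回 None;注意超时只是放弃等待结果,后台线程并不会被杀掉。下面是一个最小用法示意:

```python
import time
from czsc.utils import timeout_decorator

@timeout_decorator(2)
def maybe_slow(n):
    """模拟一个可能卡住的调用"""
    time.sleep(n)
    return n

print(maybe_slow(1))   # 1,2 秒内完成,正常返回
print(maybe_slow(10))  # None,超过 2 秒放弃等待;sleep(10) 的线程仍在后台运行
```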