From 3894b7386d5014bbe207c62e760052f84b2ab012 Mon Sep 17 00:00:00 2001 From: foolcage <5533061@qq.com> Date: Tue, 13 Aug 2024 23:09:24 +0800 Subject: [PATCH] 1)Add qmt data runner for downloading 1d kdata and tick 2)Add realtime data selector 3)Add function build_stock_pool_and_tag_stats --- src/zvt/api/selector.py | 26 +++++- src/zvt/broker/qmt/data_manager.py | 58 ------------- src/zvt/broker/qmt/qmt_quote.py | 12 ++- .../qmt/meta/qmt_stock_meta_recorder.py | 2 +- .../qmt/quotes/qmt_kdata_recorder.py | 45 +++++----- src/zvt/tag/dynamic_pool.py | 31 ------- src/zvt/tag/tag_service.py | 4 +- src/zvt/tag/tag_stats.py | 49 +++++++++-- src/zvt/tasks/qmt_data_runner.py | 86 +++++++++++++++++++ 9 files changed, 188 insertions(+), 125 deletions(-) delete mode 100644 src/zvt/broker/qmt/data_manager.py delete mode 100644 src/zvt/tag/dynamic_pool.py create mode 100644 src/zvt/tasks/qmt_data_runner.py diff --git a/src/zvt/api/selector.py b/src/zvt/api/selector.py index 967b5ed3..29e70b43 100644 --- a/src/zvt/api/selector.py +++ b/src/zvt/api/selector.py @@ -7,7 +7,7 @@ from zvt.api.kdata import default_adjust_type, get_kdata_schema from zvt.contract import IntervalLevel from zvt.contract.api import get_entity_ids -from zvt.domain import DragonAndTiger, Stock1dHfqKdata, Stock, LimitUpInfo +from zvt.domain import DragonAndTiger, Stock1dHfqKdata, Stock, LimitUpInfo, StockQuote from zvt.utils.pd_utils import pd_is_not_null from zvt.utils.time_utils import to_pd_timestamp, date_time_by_interval, current_date @@ -290,6 +290,30 @@ def get_middle_and_big_stock(timestamp, provider="em"): ) +def get_limit_up_today(): + df = StockQuote.query_data(filters=[StockQuote.is_limit_up], columns=[StockQuote.entity_id]) + if pd_is_not_null(df): + return df["entity_id"].to_list() + + +def get_top_up_today(n=100): + df = StockQuote.query_data(columns=[StockQuote.entity_id], order=StockQuote.change_pct.desc(), limit=n) + if pd_is_not_null(df): + return df["entity_id"].to_list() + + +def get_top_down_today(n=100): + df = StockQuote.query_data(columns=[StockQuote.entity_id], order=StockQuote.change_pct.asc(), limit=n) + if pd_is_not_null(df): + return df["entity_id"].to_list() + + +def get_limit_down_today(): + df = StockQuote.query_data(filters=[StockQuote.is_limit_down], columns=[StockQuote.entity_id]) + if pd_is_not_null(df): + return df["entity_id"].to_list() + + if __name__ == "__main__": # target_date = get_latest_kdata_date(provider="em", entity_type="stock", adjust_type=AdjustType.hfq) # big = get_big_cap_stock(timestamp=target_date) diff --git a/src/zvt/broker/qmt/data_manager.py b/src/zvt/broker/qmt/data_manager.py deleted file mode 100644 index e611bd21..00000000 --- a/src/zvt/broker/qmt/data_manager.py +++ /dev/null @@ -1,58 +0,0 @@ -# -*- coding: utf-8 -*- -import logging - -import pandas as pd -from xtquant import xtdata - -from zvt import init_log - -logger = logging.getLogger(__name__) - - -def download_data(): - period = "1d" - xtdata.download_sector_data() - stock_codes = xtdata.get_stock_list_in_sector("沪深A股") - stock_codes = sorted(stock_codes) - count = len(stock_codes) - - for index, stock_code in enumerate(stock_codes): - logger.info(f"run to {index + 1}/{count}") - - xtdata.download_history_data(stock_code, period=period) - logger.info(f"download {stock_code} {period} kdata ok") - records = xtdata.get_market_data( - stock_list=[stock_code], - period=period, - count=5, - dividend_type="front", - fill_data=False, - ) - dfs = [] - for col in records: - df = records[col].T - df.columns = [col] - dfs.append(df) - kdatas = pd.concat(dfs, axis=1) - logger.info(kdatas) - - start_time = kdatas.index.to_list()[0] - xtdata.download_history_data(stock_code, period="tick", start_time=start_time) - logger.info(f"download {stock_code} tick from {start_time} ok") - # records = xtdata.get_market_data( - # stock_list=[stock_code], - # period="tick", - # count=5, - # fill_data=False, - # ) - # logger.info(records[stock_code]) - - xtdata.download_financial_data2( - stock_list=stock_codes, table_list=["Capital"], start_time="", end_time="", callback=lambda x: print(x) - ) - logger.info("download capital data ok") - - -if __name__ == "__main__": - init_log("qmt_data_manager.log") - download_data() diff --git a/src/zvt/broker/qmt/qmt_quote.py b/src/zvt/broker/qmt/qmt_quote.py index 831290f6..2aef91b7 100644 --- a/src/zvt/broker/qmt/qmt_quote.py +++ b/src/zvt/broker/qmt/qmt_quote.py @@ -140,11 +140,13 @@ def get_kdata( end_timestamp, level=IntervalLevel.LEVEL_1DAY, adjust_type=AdjustType.qfq, + download_history=True, ): code = _to_qmt_code(entity_id=entity_id) period = level.value - # 保证qmt先下载数据到本地 - xtdata.download_history_data(stock_code=code, period=period) + # download比较耗时,建议单独定时任务来做 + if download_history: + xtdata.download_history_data(stock_code=code, period=period) records = xtdata.get_market_data( stock_list=[code], period=period, @@ -159,7 +161,9 @@ def get_kdata( df = records[col].T df.columns = [col] dfs.append(df) - return pd.concat(dfs, axis=1) + df = pd.concat(dfs, axis=1) + df["volume"] = df["volume"] * 100 + return df def tick_to_quote(): @@ -185,6 +189,8 @@ def on_data(datas, stock_df=entity_df): start_time = time.time() tick_df = pd.DataFrame.from_records(data=[datas[code] for code in datas], index=list(datas.keys())) + # 过滤无效tick,一般是退市的 + tick_df = tick_df[tick_df["lastPrice"] != 0] tick_df.index = tick_df.index.map(_to_zvt_entity_id) df = pd.concat( diff --git a/src/zvt/recorders/qmt/meta/qmt_stock_meta_recorder.py b/src/zvt/recorders/qmt/meta/qmt_stock_meta_recorder.py index b30a0760..5685b2e6 100644 --- a/src/zvt/recorders/qmt/meta/qmt_stock_meta_recorder.py +++ b/src/zvt/recorders/qmt/meta/qmt_stock_meta_recorder.py @@ -12,7 +12,7 @@ class QMTStockRecorder(Recorder): def run(self): df = qmt_quote.get_entity_list() - self.logger.info(df) + self.logger.info(df.tail()) df_to_db(df=df, data_schema=self.data_schema, provider=self.provider, force_update=True) diff --git a/src/zvt/recorders/qmt/quotes/qmt_kdata_recorder.py b/src/zvt/recorders/qmt/quotes/qmt_kdata_recorder.py index fb4cb051..cfc665fa 100644 --- a/src/zvt/recorders/qmt/quotes/qmt_kdata_recorder.py +++ b/src/zvt/recorders/qmt/quotes/qmt_kdata_recorder.py @@ -70,6 +70,9 @@ def __init__( ) def record(self, entity, start, end, size, timestamps): + if start and (self.level == IntervalLevel.LEVEL_1DAY): + start = start.date() + # 判断是否需要重新计算之前保存的前复权数据 if start and (self.adjust_type == AdjustType.qfq): check_df = qmt_quote.get_kdata( @@ -78,24 +81,26 @@ def record(self, entity, start, end, size, timestamps): end_timestamp=start, adjust_type=self.adjust_type, level=self.level, + download_history=False, ) - current_df = get_kdata( - entity_id=entity.id, - provider=self.provider, - start_timestamp=start, - end_timestamp=start, - limit=1, - level=self.level, - adjust_type=self.adjust_type, - ) - if pd_is_not_null(current_df): - old = current_df.iloc[0, :]["close"] - new = check_df["close"][0] - # 相同时间的close不同,表明前复权需要重新计算 - if round(old, 2) != round(new, 2): - # 删掉重新获取 - self.session.query(self.data_schema).filter(self.data_schema.entity_id == entity.id).delete() - start = "2005-01-01" + if pd_is_not_null(check_df): + current_df = get_kdata( + entity_id=entity.id, + provider=self.provider, + start_timestamp=start, + end_timestamp=start, + limit=1, + level=self.level, + adjust_type=self.adjust_type, + ) + if pd_is_not_null(current_df): + old = current_df.iloc[0, :]["close"] + new = check_df["close"][0] + # 相同时间的close不同,表明前复权需要重新计算 + if round(old, 2) != round(new, 2): + # 删掉重新获取 + self.session.query(self.data_schema).filter(self.data_schema.entity_id == entity.id).delete() + start = "2005-01-01" if not start: start = "2005-01-01" @@ -108,6 +113,7 @@ def record(self, entity, start, end, size, timestamps): end_timestamp=end, adjust_type=self.adjust_type, level=self.level, + download_history=False, ) if pd_is_not_null(df): df["entity_id"] = entity.id @@ -131,9 +137,8 @@ class QMTStockKdataRecorder(BaseQmtKdataRecorder): if __name__ == "__main__": - Stock.record_data(provider="qmt") - QMTStockKdataRecorder(entity_id="stock_sz_000338", adjust_type=AdjustType.qfq).run() - + # Stock.record_data(provider="qmt") + QMTStockKdataRecorder(entity_id="stock_sz_301611", adjust_type=AdjustType.qfq).run() # the __all__ is generated __all__ = ["BaseQmtKdataRecorder", "QMTStockKdataRecorder"] diff --git a/src/zvt/tag/dynamic_pool.py b/src/zvt/tag/dynamic_pool.py deleted file mode 100644 index 65ecbe81..00000000 --- a/src/zvt/tag/dynamic_pool.py +++ /dev/null @@ -1,31 +0,0 @@ -# -*- coding: utf-8 -*- -from zvt.domain import StockQuote -from zvt.utils.pd_utils import pd_is_not_null - - -def get_limit_up(): - df = StockQuote.query_data(filters=[StockQuote.is_limit_up], columns=[StockQuote.entity_id]) - if pd_is_not_null(df): - return df["entity_id"].to_list() - - -def get_top_50(): - df = StockQuote.query_data(columns=[StockQuote.entity_id], order=StockQuote.change_pct.desc(), limit=50) - if pd_is_not_null(df): - return df["entity_id"].to_list() - - -def get_limit_down(): - df = StockQuote.query_data(filters=[StockQuote.is_limit_down], columns=[StockQuote.entity_id]) - if pd_is_not_null(df): - return df["entity_id"].to_list() - - -if __name__ == "__main__": - print(get_limit_up()) - print(get_limit_down()) - print(get_top_50()) - - -# the __all__ is generated -__all__ = ["get_limit_up", "get_top_50", "get_limit_down"] diff --git a/src/zvt/tag/tag_service.py b/src/zvt/tag/tag_service.py index f3f4886e..e608e594 100644 --- a/src/zvt/tag/tag_service.py +++ b/src/zvt/tag/tag_service.py @@ -664,8 +664,8 @@ def activate_sub_tags(activate_sub_tags_model: ActivateSubTagsModel): if __name__ == "__main__": - activate_default_main_tag(industry="半导体") - # activate_sub_tags(ActivateSubTagsModel(sub_tags=["无人驾驶"])) + # activate_default_main_tag(industry="半导体") + activate_sub_tags(ActivateSubTagsModel(sub_tags=["航天概念", "天基互联", "北斗导航", "通用航空"])) # the __all__ is generated diff --git a/src/zvt/tag/tag_stats.py b/src/zvt/tag/tag_stats.py index 2a6bc0a3..2ebd108d 100644 --- a/src/zvt/tag/tag_stats.py +++ b/src/zvt/tag/tag_stats.py @@ -5,14 +5,17 @@ import pandas as pd import sqlalchemy +from zvt.api.kdata import get_kdata_schema +from zvt.contract import AdjustType, IntervalLevel from zvt.contract.api import df_to_db, get_db_session -from zvt.domain import Stock1dHfqKdata +from zvt.domain.quotes import Stock1dHfqKdata, KdataCommon from zvt.factors.top_stocks import TopStocks, get_top_stocks +from zvt.tag.common import InsertMode from zvt.tag.tag_models import CreateStockPoolsModel from zvt.tag.tag_schemas import TagStats, StockTags, StockPools from zvt.tag.tag_service import build_stock_pool from zvt.utils.pd_utils import pd_is_not_null -from zvt.utils.time_utils import to_pd_timestamp, date_time_by_interval +from zvt.utils.time_utils import to_pd_timestamp, date_time_by_interval, current_date logger = logging.getLogger(__name__) @@ -65,14 +68,16 @@ def build_system_stock_pools(): build_stock_pool(create_stock_pools_model, target_date=target_date) -def build_stock_pool_tag_stats(stock_pool_name, force_rebuild_latest=False): +def build_stock_pool_tag_stats( + stock_pool_name, force_rebuild_latest=False, target_date=None, adjust_type=AdjustType.hfq, provider="em" +): datas = TagStats.query_data( limit=1, filters=[TagStats.stock_pool_name == stock_pool_name], order=TagStats.timestamp.desc(), return_type="domain", ) - start = None + start = target_date current_df = None if datas: if force_rebuild_latest: @@ -105,11 +110,14 @@ def build_stock_pool_tag_stats(stock_pool_name, force_rebuild_latest=False): entity_ids = stock_pool.entity_ids tags_df = StockTags.query_data(entity_ids=entity_ids, return_type="df", index="entity_id") - kdata_df = Stock1dHfqKdata.query_data( - provider="em", + kdata_schema: KdataCommon = get_kdata_schema( + entity_type="stock", level=IntervalLevel.LEVEL_1DAY, adjust_type=adjust_type + ) + kdata_df = kdata_schema.query_data( + provider=provider, entity_ids=entity_ids, - filters=[Stock1dHfqKdata.timestamp == to_pd_timestamp(target_date)], - columns=[Stock1dHfqKdata.entity_id, Stock1dHfqKdata.name, Stock1dHfqKdata.turnover], + filters=[kdata_schema.timestamp == to_pd_timestamp(target_date)], + columns=[kdata_schema.entity_id, kdata_schema.name, kdata_schema.turnover], index="entity_id", ) @@ -158,6 +166,29 @@ def build_stock_pool_tag_stats(stock_pool_name, force_rebuild_latest=False): current_df = sorted_df +def build_stock_pool_and_tag_stats( + stock_pool_name, + entity_ids, + insert_mode=InsertMode.append, + target_date=current_date(), + provider="em", + adjust_type=AdjustType.hfq, +): + create_stock_pools_model: CreateStockPoolsModel = CreateStockPoolsModel( + stock_pool_name=stock_pool_name, entity_ids=entity_ids, insert_mode=insert_mode + ) + + build_stock_pool(create_stock_pools_model, target_date=target_date) + + build_stock_pool_tag_stats( + stock_pool_name=stock_pool_name, + force_rebuild_latest=True, + target_date=target_date, + adjust_type=adjust_type, + provider=provider, + ) + + if __name__ == "__main__": # build_system_stock_pools() build_stock_pool_tag_stats(stock_pool_name="main_line", force_rebuild_latest=True) @@ -165,4 +196,4 @@ def build_stock_pool_tag_stats(stock_pool_name, force_rebuild_latest=False): # the __all__ is generated -__all__ = ["build_system_stock_pools", "build_stock_pool_tag_stats"] +__all__ = ["build_system_stock_pools", "build_stock_pool_tag_stats", "build_stock_pool_and_tag_stats"] diff --git a/src/zvt/tasks/qmt_data_runner.py b/src/zvt/tasks/qmt_data_runner.py new file mode 100644 index 00000000..8626477a --- /dev/null +++ b/src/zvt/tasks/qmt_data_runner.py @@ -0,0 +1,86 @@ +# -*- coding: utf-8 -*- +import logging +import time + +import pandas as pd +from xtquant import xtdata + +from zvt import init_log +from zvt.contract import AdjustType +from zvt.recorders.qmt.meta import QMTStockRecorder +from zvt.recorders.qmt.quotes import QMTStockKdataRecorder + +logger = logging.getLogger(__name__) + + +def download_data(download_tick=False): + period = "1d" + xtdata.download_sector_data() + stock_codes = xtdata.get_stock_list_in_sector("沪深A股") + stock_codes = sorted(stock_codes) + count = len(stock_codes) + download_status = {"ok": False} + + def update_progress(data, download_status: dict = download_status): + logger.info(data) + finished = data["finished"] + total = data["total"] + download_status["finished"] = finished + download_status["total"] = total + if finished == total: + download_status["ok"] = True + + start_time = time.time() + + xtdata.download_history_data2(stock_list=stock_codes, period=period, callback=update_progress) + + while True: + logger.info(f"current download_status:{download_status}") + if download_status["ok"]: + logger.info(f"finish download 1d kdata") + break + cost_time = time.time() - start_time + if cost_time >= 60 * 30: + logger.info(f"timeout download 1d kdata") + break + time.sleep(10) + + QMTStockRecorder().run() + QMTStockKdataRecorder(adjust_type=AdjustType.qfq, sleeping_time=0).run() + + xtdata.download_financial_data2( + stock_list=stock_codes, table_list=["Capital"], start_time="", end_time="", callback=lambda x: print(x) + ) + logger.info("download capital data ok") + + if download_tick: + for index, stock_code in enumerate(stock_codes): + logger.info(f"run to {index + 1}/{count}") + + records = xtdata.get_market_data( + stock_list=[stock_code], + period=period, + count=5, + dividend_type="front", + fill_data=False, + ) + dfs = [] + for col in records: + df = records[col].T + df.columns = [col] + dfs.append(df) + kdatas = pd.concat(dfs, axis=1) + start_time = kdatas.index.to_list()[0] + xtdata.download_history_data(stock_code, period="tick", start_time=start_time) + logger.info(f"download {stock_code} tick from {start_time} ok") + + +if __name__ == "__main__": + init_log("qmt_data_runner.log") + from apscheduler.schedulers.background import BackgroundScheduler + + sched = BackgroundScheduler() + download_data() + sched.add_job(func=download_data, trigger="cron", hour=15, minute=30, day_of_week="mon-fri") + sched.start() + sched._thread.join()