Skip to content

Commit

Permalink
1)Add qmt data runner for downloading 1d kdata and tick 2)Add realtim…
Browse files Browse the repository at this point in the history
…e data selector 3)Add function build_stock_pool_and_tag_stats
  • Loading branch information
foolcage committed Aug 13, 2024
1 parent 202e157 commit 3894b73
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 125 deletions.
26 changes: 25 additions & 1 deletion src/zvt/api/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from zvt.api.kdata import default_adjust_type, get_kdata_schema
from zvt.contract import IntervalLevel
from zvt.contract.api import get_entity_ids
from zvt.domain import DragonAndTiger, Stock1dHfqKdata, Stock, LimitUpInfo
from zvt.domain import DragonAndTiger, Stock1dHfqKdata, Stock, LimitUpInfo, StockQuote
from zvt.utils.pd_utils import pd_is_not_null
from zvt.utils.time_utils import to_pd_timestamp, date_time_by_interval, current_date

Expand Down Expand Up @@ -290,6 +290,30 @@ def get_middle_and_big_stock(timestamp, provider="em"):
)


def get_limit_up_today():
    """Return entity ids of stocks currently at limit-up in today's quotes.

    :return: list of entity_id strings; an empty list when no stock is at limit-up.
    """
    df = StockQuote.query_data(filters=[StockQuote.is_limit_up], columns=[StockQuote.entity_id])
    if pd_is_not_null(df):
        return df["entity_id"].to_list()
    # Explicit empty list instead of implicit None so callers can iterate safely.
    return []


def get_top_up_today(n=100):
    """Return entity ids of the top *n* gainers (by change_pct, descending) in today's quotes.

    :param n: maximum number of entity ids to return, defaults to 100.
    :return: list of entity_id strings; an empty list when there is no quote data.
    """
    df = StockQuote.query_data(columns=[StockQuote.entity_id], order=StockQuote.change_pct.desc(), limit=n)
    if pd_is_not_null(df):
        return df["entity_id"].to_list()
    # Explicit empty list instead of implicit None so callers can iterate safely.
    return []


def get_top_down_today(n=100):
    """Return entity ids of the top *n* losers (by change_pct, ascending) in today's quotes.

    :param n: maximum number of entity ids to return, defaults to 100.
    :return: list of entity_id strings; an empty list when there is no quote data.
    """
    df = StockQuote.query_data(columns=[StockQuote.entity_id], order=StockQuote.change_pct.asc(), limit=n)
    if pd_is_not_null(df):
        return df["entity_id"].to_list()
    # Explicit empty list instead of implicit None so callers can iterate safely.
    return []


def get_limit_down_today():
    """Return entity ids of stocks currently at limit-down in today's quotes.

    :return: list of entity_id strings; an empty list when no stock is at limit-down.
    """
    df = StockQuote.query_data(filters=[StockQuote.is_limit_down], columns=[StockQuote.entity_id])
    if pd_is_not_null(df):
        return df["entity_id"].to_list()
    # Explicit empty list instead of implicit None so callers can iterate safely.
    return []


if __name__ == "__main__":
# target_date = get_latest_kdata_date(provider="em", entity_type="stock", adjust_type=AdjustType.hfq)
# big = get_big_cap_stock(timestamp=target_date)
Expand Down
58 changes: 0 additions & 58 deletions src/zvt/broker/qmt/data_manager.py

This file was deleted.

12 changes: 9 additions & 3 deletions src/zvt/broker/qmt/qmt_quote.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,13 @@ def get_kdata(
end_timestamp,
level=IntervalLevel.LEVEL_1DAY,
adjust_type=AdjustType.qfq,
download_history=True,
):
code = _to_qmt_code(entity_id=entity_id)
period = level.value
# 保证qmt先下载数据到本地
xtdata.download_history_data(stock_code=code, period=period)
# download比较耗时,建议单独定时任务来做
if download_history:
xtdata.download_history_data(stock_code=code, period=period)
records = xtdata.get_market_data(
stock_list=[code],
period=period,
Expand All @@ -159,7 +161,9 @@ def get_kdata(
df = records[col].T
df.columns = [col]
dfs.append(df)
return pd.concat(dfs, axis=1)
df = pd.concat(dfs, axis=1)
df["volume"] = df["volume"] * 100
return df


def tick_to_quote():
Expand All @@ -185,6 +189,8 @@ def on_data(datas, stock_df=entity_df):
start_time = time.time()

tick_df = pd.DataFrame.from_records(data=[datas[code] for code in datas], index=list(datas.keys()))
# 过滤无效tick,一般是退市的
tick_df = tick_df[tick_df["lastPrice"] != 0]
tick_df.index = tick_df.index.map(_to_zvt_entity_id)

df = pd.concat(
Expand Down
2 changes: 1 addition & 1 deletion src/zvt/recorders/qmt/meta/qmt_stock_meta_recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class QMTStockRecorder(Recorder):

def run(self):
df = qmt_quote.get_entity_list()
self.logger.info(df)
self.logger.info(df.tail())
df_to_db(df=df, data_schema=self.data_schema, provider=self.provider, force_update=True)


Expand Down
45 changes: 25 additions & 20 deletions src/zvt/recorders/qmt/quotes/qmt_kdata_recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ def __init__(
)

def record(self, entity, start, end, size, timestamps):
if start and (self.level == IntervalLevel.LEVEL_1DAY):
start = start.date()

# 判断是否需要重新计算之前保存的前复权数据
if start and (self.adjust_type == AdjustType.qfq):
check_df = qmt_quote.get_kdata(
Expand All @@ -78,24 +81,26 @@ def record(self, entity, start, end, size, timestamps):
end_timestamp=start,
adjust_type=self.adjust_type,
level=self.level,
download_history=False,
)
current_df = get_kdata(
entity_id=entity.id,
provider=self.provider,
start_timestamp=start,
end_timestamp=start,
limit=1,
level=self.level,
adjust_type=self.adjust_type,
)
if pd_is_not_null(current_df):
old = current_df.iloc[0, :]["close"]
new = check_df["close"][0]
# 相同时间的close不同,表明前复权需要重新计算
if round(old, 2) != round(new, 2):
# 删掉重新获取
self.session.query(self.data_schema).filter(self.data_schema.entity_id == entity.id).delete()
start = "2005-01-01"
if pd_is_not_null(check_df):
current_df = get_kdata(
entity_id=entity.id,
provider=self.provider,
start_timestamp=start,
end_timestamp=start,
limit=1,
level=self.level,
adjust_type=self.adjust_type,
)
if pd_is_not_null(current_df):
old = current_df.iloc[0, :]["close"]
new = check_df["close"][0]
# 相同时间的close不同,表明前复权需要重新计算
if round(old, 2) != round(new, 2):
# 删掉重新获取
self.session.query(self.data_schema).filter(self.data_schema.entity_id == entity.id).delete()
start = "2005-01-01"

if not start:
start = "2005-01-01"
Expand All @@ -108,6 +113,7 @@ def record(self, entity, start, end, size, timestamps):
end_timestamp=end,
adjust_type=self.adjust_type,
level=self.level,
download_history=False,
)
if pd_is_not_null(df):
df["entity_id"] = entity.id
Expand All @@ -131,9 +137,8 @@ class QMTStockKdataRecorder(BaseQmtKdataRecorder):


if __name__ == "__main__":
Stock.record_data(provider="qmt")
QMTStockKdataRecorder(entity_id="stock_sz_000338", adjust_type=AdjustType.qfq).run()

# Stock.record_data(provider="qmt")
QMTStockKdataRecorder(entity_id="stock_sz_301611", adjust_type=AdjustType.qfq).run()

# the __all__ is generated
__all__ = ["BaseQmtKdataRecorder", "QMTStockKdataRecorder"]
31 changes: 0 additions & 31 deletions src/zvt/tag/dynamic_pool.py

This file was deleted.

4 changes: 2 additions & 2 deletions src/zvt/tag/tag_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -664,8 +664,8 @@ def activate_sub_tags(activate_sub_tags_model: ActivateSubTagsModel):


if __name__ == "__main__":
activate_default_main_tag(industry="半导体")
# activate_sub_tags(ActivateSubTagsModel(sub_tags=["无人驾驶"]))
# activate_default_main_tag(industry="半导体")
activate_sub_tags(ActivateSubTagsModel(sub_tags=["航天概念", "天基互联", "北斗导航", "通用航空"]))


# the __all__ is generated
Expand Down
49 changes: 40 additions & 9 deletions src/zvt/tag/tag_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@
import pandas as pd
import sqlalchemy

from zvt.api.kdata import get_kdata_schema
from zvt.contract import AdjustType, IntervalLevel
from zvt.contract.api import df_to_db, get_db_session
from zvt.domain import Stock1dHfqKdata
from zvt.domain.quotes import Stock1dHfqKdata, KdataCommon
from zvt.factors.top_stocks import TopStocks, get_top_stocks
from zvt.tag.common import InsertMode
from zvt.tag.tag_models import CreateStockPoolsModel
from zvt.tag.tag_schemas import TagStats, StockTags, StockPools
from zvt.tag.tag_service import build_stock_pool
from zvt.utils.pd_utils import pd_is_not_null
from zvt.utils.time_utils import to_pd_timestamp, date_time_by_interval
from zvt.utils.time_utils import to_pd_timestamp, date_time_by_interval, current_date

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -65,14 +68,16 @@ def build_system_stock_pools():
build_stock_pool(create_stock_pools_model, target_date=target_date)


def build_stock_pool_tag_stats(stock_pool_name, force_rebuild_latest=False):
def build_stock_pool_tag_stats(
stock_pool_name, force_rebuild_latest=False, target_date=None, adjust_type=AdjustType.hfq, provider="em"
):
datas = TagStats.query_data(
limit=1,
filters=[TagStats.stock_pool_name == stock_pool_name],
order=TagStats.timestamp.desc(),
return_type="domain",
)
start = None
start = target_date
current_df = None
if datas:
if force_rebuild_latest:
Expand Down Expand Up @@ -105,11 +110,14 @@ def build_stock_pool_tag_stats(stock_pool_name, force_rebuild_latest=False):

entity_ids = stock_pool.entity_ids
tags_df = StockTags.query_data(entity_ids=entity_ids, return_type="df", index="entity_id")
kdata_df = Stock1dHfqKdata.query_data(
provider="em",
kdata_schema: KdataCommon = get_kdata_schema(
entity_type="stock", level=IntervalLevel.LEVEL_1DAY, adjust_type=adjust_type
)
kdata_df = kdata_schema.query_data(
provider=provider,
entity_ids=entity_ids,
filters=[Stock1dHfqKdata.timestamp == to_pd_timestamp(target_date)],
columns=[Stock1dHfqKdata.entity_id, Stock1dHfqKdata.name, Stock1dHfqKdata.turnover],
filters=[kdata_schema.timestamp == to_pd_timestamp(target_date)],
columns=[kdata_schema.entity_id, kdata_schema.name, kdata_schema.turnover],
index="entity_id",
)

Expand Down Expand Up @@ -158,11 +166,34 @@ def build_stock_pool_tag_stats(stock_pool_name, force_rebuild_latest=False):
current_df = sorted_df


def build_stock_pool_and_tag_stats(
    stock_pool_name,
    entity_ids,
    insert_mode=InsertMode.append,
    target_date=None,
    provider="em",
    adjust_type=AdjustType.hfq,
):
    """Build (or extend) a stock pool and then rebuild its latest tag stats.

    :param stock_pool_name: name of the stock pool to create or update.
    :param entity_ids: entity ids to put into the pool.
    :param insert_mode: how the ids are merged into an existing pool, defaults to append.
    :param target_date: date of the pool snapshot; defaults to the current date at call time.
    :param provider: kdata provider used for the tag stats, defaults to "em".
    :param adjust_type: kdata adjust type used for the tag stats, defaults to hfq.
    """
    # NOTE: use a None sentinel rather than `target_date=current_date()` in the
    # signature — a default is evaluated once at import time, so a long-running
    # process would otherwise keep using a stale date.
    if target_date is None:
        target_date = current_date()

    create_stock_pools_model: CreateStockPoolsModel = CreateStockPoolsModel(
        stock_pool_name=stock_pool_name, entity_ids=entity_ids, insert_mode=insert_mode
    )

    build_stock_pool(create_stock_pools_model, target_date=target_date)

    build_stock_pool_tag_stats(
        stock_pool_name=stock_pool_name,
        force_rebuild_latest=True,
        target_date=target_date,
        adjust_type=adjust_type,
        provider=provider,
    )


if __name__ == "__main__":
    # Ad-hoc manual entry point: rebuild the latest tag stats for the
    # "main_line" stock pool; other invocations are kept commented for reuse.
    # build_system_stock_pools()
    build_stock_pool_tag_stats(stock_pool_name="main_line", force_rebuild_latest=True)
    # build_stock_pool_tag_stats(stock_pool_name="vol_up")


# the __all__ is generated
__all__ = ["build_system_stock_pools", "build_stock_pool_tag_stats"]
__all__ = ["build_system_stock_pools", "build_stock_pool_tag_stats", "build_stock_pool_and_tag_stats"]
Loading

0 comments on commit 3894b73

Please sign in to comment.