Skip to content

Commit

Permalink
Fix df_to_db db locked error issue
Browse files (browse the repository at this point in the history)
  • Loading branch information
foolcage committed Jun 16, 2024
1 parent 039dc96 commit 2920538
Showing 1 changed file with 29 additions and 27 deletions.
56 changes: 29 additions & 27 deletions src/zvt/contract/api.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -485,6 +485,7 @@ def df_to_db(
sub_size: int = 5000,
drop_duplicates: bool = True,
dtype=None,
session=None,
) -> object:
"""
store the df to db
Expand Down Expand Up @@ -528,36 +529,37 @@ def df_to_db(

saved = 0

db_engine = get_db_engine(provider, data_schema=data_schema)
with db_engine.connect() as conn:
for step in range(step_size):
df_current = df.iloc[sub_size * step : sub_size * (step + 1)]
if not session:
session = get_db_session(provider=provider, data_schema=data_schema)

session = get_db_session(provider=provider, data_schema=data_schema)
if force_update:
ids = df_current["id"].tolist()
if len(ids) == 1:
sql = text(f'delete from `{data_schema.__tablename__}` where id = "{ids[0]}"')
else:
sql = text(f"delete from `{data_schema.__tablename__}` where id in {tuple(ids)}")
for step in range(step_size):
df_current = df.iloc[sub_size * step : sub_size * (step + 1)]

conn.execute(sql)
if force_update:
ids = df_current["id"].tolist()
if len(ids) == 1:
sql = text(f'delete from `{data_schema.__tablename__}` where id = "{ids[0]}"')
else:
current = get_data(
session=session,
data_schema=data_schema,
columns=[data_schema.id],
provider=provider,
ids=df_current["id"].tolist(),
)
if pd_is_not_null(current):
df_current = df_current[~df_current["id"].isin(current["id"])]
session.commit()

if pd_is_not_null(df_current):
saved = saved + len(df_current)
df_current.to_sql(data_schema.__tablename__, conn, index=False, if_exists="append", dtype=dtype)
conn.commit()
sql = text(f"delete from `{data_schema.__tablename__}` where id in {tuple(ids)}")

session.execute(sql)
else:
current = get_data(
session=session,
data_schema=data_schema,
columns=[data_schema.id],
provider=provider,
ids=df_current["id"].tolist(),
)
if pd_is_not_null(current):
df_current = df_current[~df_current["id"].isin(current["id"])]

if pd_is_not_null(df_current):
saved = saved + len(df_current)
df_current.to_sql(
data_schema.__tablename__, session.connection(), index=False, if_exists="append", dtype=dtype
)
session.commit()
return saved


Expand Down

0 comments on commit 2920538

Please sign in to comment.