Skip to content

Commit

Permalink
fix: calculation problems of recording usages
Browse files Browse the repository at this point in the history
  • Loading branch information
SaintShit committed Sep 13, 2023
1 parent 0968a81 commit 4c187d1
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 54 deletions.
2 changes: 1 addition & 1 deletion app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
redoc_url='/redoc' if DOCS else None
)
app.openapi = custom_openapi(app)
scheduler = BackgroundScheduler({'apscheduler.job_defaults.max_instances': 5}, timezone='UTC')
scheduler = BackgroundScheduler({'apscheduler.job_defaults.max_instances': 10}, timezone='UTC')
logger = logging.getLogger('uvicorn.error')
app.add_middleware(
CORSMiddleware,
Expand Down
84 changes: 31 additions & 53 deletions app/jobs/record_usages.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Union

from pymysql.err import OperationalError
from sqlalchemy import and_, bindparam, insert, select, update
from sqlalchemy import and_, bindparam, insert, select, sql, update

from app import scheduler, xray
from app.db import GetDB
Expand All @@ -15,6 +15,30 @@
from xray_api import exc as xray_exc


def safe_execute(db, stmt, params=None):
if db.bind.name == 'mysql':
if isinstance(stmt, sql.dml.Insert):
stmt = stmt.prefix_with('IGNORE')

tries = 0
done = False
while not done:
try:
db.execute(stmt, params)
db.commit()
done = True
except OperationalError as err:
if err.args[0] == 1213 and tries < 3: # Deadlock
db.rollback()
tries += 1
continue
raise err

else:
db.execute(stmt, params)
db.commit()


def record_user_stats(params: list, node_id: Union[int, None]):
if not params:
return
Expand All @@ -41,30 +65,15 @@ def record_user_stats(params: list, node_id: Union[int, None]):
node_id=node_id,
used_traffic=0
)
if db.bind.name == 'mysql':
stmt = stmt.prefix_with('IGNORE')
db.execute(stmt, [{'uid': uid} for uid in uids_to_insert])
safe_execute(db, stmt, [{'uid': uid} for uid in uids_to_insert])

# record
stmt = update(NodeUserUsage) \
.values(used_traffic=NodeUserUsage.used_traffic + bindparam('value')) \
.where(and_(NodeUserUsage.user_id == bindparam('uid'),
NodeUserUsage.node_id == node_id,
NodeUserUsage.created_at == created_at))

tries = 0
done = False
while not done:
try:
db.execute(stmt, params)
db.commit()
done = True
except OperationalError as err:
if err.args[0] == 1213 and tries < 3: # Deadlock
db.rollback()
tries += 1
continue
raise err
safe_execute(db, stmt, params)


def record_node_stats(params: dict, node_id: Union[int, None]):
Expand All @@ -81,31 +90,14 @@ def record_node_stats(params: dict, node_id: Union[int, None]):
notfound = db.execute(select_stmt).first() is None
if notfound:
stmt = insert(NodeUsage).values(created_at=created_at, node_id=node_id, uplink=0, downlink=0)
if db.bind.name == 'mysql':
stmt = stmt.prefix_with('IGNORE')
db.execute(stmt)
safe_execute(db, stmt)

# record
stmt = update(NodeUsage). \
values(uplink=NodeUsage.uplink + bindparam('up'), downlink=NodeUsage.downlink + bindparam('down')). \
where(and_(NodeUsage.node_id == node_id, NodeUsage.created_at == created_at))

db.execute(stmt, params)

# commit changes
tries = 0
done = False
while not done:
try:
db.execute(stmt, params)
db.commit()
done = True
except OperationalError as err:
if err.args[0] == 1213 and tries < 3: # Deadlock
db.rollback()
tries += 1
continue
raise err
safe_execute(db, stmt, params)


def get_users_stats(api: XRayAPI):
Expand Down Expand Up @@ -151,20 +143,7 @@ def record_user_usages():
stmt = update(User). \
where(User.id == bindparam('uid')). \
values(used_traffic=User.used_traffic + bindparam('value'))

tries = 0
done = False
while not done:
try:
db.execute(stmt, users_usage)
db.commit()
done = True
except OperationalError as err:
if err.args[0] == 1213 and tries < 3: # Deadlock
db.rollback()
tries += 1
continue
raise err
safe_execute(db, stmt, users_usage)

if DISABLE_RECORDING_NODE_USAGE:
return
Expand Down Expand Up @@ -198,8 +177,7 @@ def record_node_usages():
uplink=System.uplink + total_up,
downlink=System.downlink + total_down
)
db.execute(stmt)
db.commit()
safe_execute(db, stmt)

if DISABLE_RECORDING_NODE_USAGE:
return
Expand Down

0 comments on commit 4c187d1

Please sign in to comment.