hs_migration_v3 #8

Open · wants to merge 1 commit into base: develop
12 changes: 9 additions & 3 deletions hiscore_migration/migrate_highscore_data_v2.py
@@ -175,7 +175,13 @@ async def task_migrate(queue: asyncio.Queue, semaphore: asyncio.Semaphore):
             await asyncio.sleep(1)
             continue

-        player_id = await queue.get()
+        try:
+            player_id = await queue.get()
+        except OperationalError as _:
+            await asyncio.sleep(sleep)
+            sleep = min(sleep * 2, 60)
+            continue

         queue.task_done()

         async with semaphore:
@@ -224,9 +230,9 @@ async def task_get_players(
 # bsize: 100, rows: 1000, time:
 async def main():
     player_id = 0
-    batch_size = 1000
+    batch_size = 1_000
     async_tasks = 1
-    limit = 10000
+    limit = 1_000

     player_queue = asyncio.Queue(maxsize=async_tasks+1)
     # semaphore limits the number of async tasks
255 changes: 255 additions & 0 deletions hiscore_migration/migrate_highscore_data_v3.py
@@ -0,0 +1,255 @@
import asyncio
import json
import logging
import os
import time

import dotenv
import sqlalchemy as sqla
from sqlalchemy.exc import OperationalError
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

dotenv.load_dotenv(dotenv.find_dotenv(), verbose=True)


# Configure JSON logging
class JsonFormatter(logging.Formatter):
def format(self, record):
log_record = {
"ts": self.formatTime(record, self.datefmt),
"lvl": record.levelname,
"module": record.module,
"funcName": record.funcName,
"lineNo": record.lineno,
"msg": record.getMessage(),
}
if record.exc_info:
log_record["exception"] = self.formatException(record.exc_info)
return json.dumps(log_record)


class IgnoreSpecificWarnings(logging.Filter):
def filter(self, record):
# Return False to filter out messages containing "Unknown table"
return "Unknown table" not in record.getMessage()


# Set up the logger
handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())

logging.basicConfig(level=logging.INFO, handlers=[handler])
logging.getLogger("asyncmy").addFilter(IgnoreSpecificWarnings())

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Establishing database connection
connection_string = os.environ.get("sql_uri")
assert connection_string is not None


engine = create_async_engine(
connection_string,
pool_size=100,
max_overflow=10,
# echo=True,
)
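
# pool_size well above the worker count (async_tasks in main) plus overflow
# headroom means migration tasks do not wait on connection checkout.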

Session = sessionmaker(
bind=engine,
expire_on_commit=False,
class_=AsyncSession, # Use AsyncSession for asynchronous operations
autocommit=False,
autoflush=False,
)


async def get_players_to_migrate(player_id: int, limit: int):
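    # Keyset pagination: return the next `limit` distinct player_ids strictly
    # greater than `player_id`, so repeated calls walk the table in order
    # without OFFSET scans.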
sql = """
SELECT DISTINCT
player_id
FROM scraper_data
WHERE player_id > :player_id
ORDER BY player_id
LIMIT :limit
;
"""
params = {"player_id": player_id, "limit": limit}
async with Session() as session:
session: AsyncSession
async with session.begin():
data = await session.execute(sqla.text(sql), params=params)
result = data.mappings().all()
return result


async def migrate(player_id: tuple[int, ...]):
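    # Stage one batch of players through a session-local temporary table:
    #   1. (re)create temp_hs_data
    #   2. fill it from scraper_data_v3, folding each scrape's skills and
    #      activities into JSON objects with JSON_OBJECTAGG
    #   3. copy staged rows into highscore_data, skipping existing
    #      (player_id, scrape_date) pairs
    #   4. delete the migrated rows from scraper_data_v3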
sql_create_temp_table = """
CREATE TEMPORARY TABLE temp_hs_data (
scraper_id BIGINT NOT NULL,
player_id INT NOT NULL,
scrape_ts DATETIME NOT NULL,
scrape_date DATE NOT NULL,
skills JSON,
activities JSON
);
"""

sql_insert_temp_table = """
INSERT INTO temp_hs_data (scraper_id, player_id, scrape_ts, scrape_date, skills, activities)
select
sdv.scrape_id,
sdv.player_id,
sdv.scrape_ts as scrape_ts,
sdv.scrape_date as scrape_date,
(
select
JSON_OBJECTAGG(
s.skill_name, ps.skill_value
)
from scraper_player_skill sps
join player_skill ps on sps.player_skill_id = ps.player_skill_id
join skill s on ps.skill_id = s.skill_id
where sdv.scrape_id = sps.scrape_id
GROUP BY
sdv.scrape_id
) as skills,
(
select
JSON_OBJECTAGG(
a.activity_name, pa.activity_value
)
from scraper_player_activity spa
join player_activity pa on spa.player_activity_id = pa.player_activity_id
join activity a on pa.activity_id = a.activity_id
where sdv.scrape_id = spa.scrape_id
GROUP BY
sdv.scrape_id
) as activities
from scraper_data_v3 sdv
WHERE 1=1
and sdv.player_id IN :player_id
;
"""

sql_insert_table = """
INSERT IGNORE INTO highscore_data (player_id, scrape_ts, scrape_date, skills, activities)
SELECT player_id, scrape_ts, scrape_date, skills, activities FROM temp_hs_data thd
WHERE NOT EXISTS (
SELECT 1 FROM highscore_data hd
WHERE 1
AND thd.player_id = hd.player_id
AND thd.scrape_date = hd.scrape_date
);
"""

sql_delete_data = """
DELETE FROM scraper_data_v3 where scrape_id in (select scraper_id from temp_hs_data);
"""

async with Session() as session:
session: AsyncSession
async with session.begin():
await session.execute(sqla.text("DROP TABLE IF EXISTS temp_hs_data;"))
await session.execute(sqla.text(sql_create_temp_table))
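            # `:player_id` is bound to a tuple of ids; expanding it for the
            # IN (...) clause relies on the MySQL driver rendering sequences
            # as parenthesized lists (asyncmy/PyMySQL do).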
await session.execute(
sqla.text(sql_insert_temp_table), {"player_id": player_id}
)
await session.execute(sqla.text(sql_insert_table))
await session.execute(sqla.text(sql_delete_data))
result = await session.execute(
sqla.text("select count(*) as cnt from temp_hs_data;")
)
cnt = result.mappings().all()
await session.execute(sqla.text("DROP TABLE IF EXISTS temp_hs_data;"))

        # session.begin() commits automatically when the block above exits.
return cnt


async def task_migrate(queue: asyncio.Queue, semaphore: asyncio.Semaphore):
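    # Consumer: take a tuple of player_ids off the queue and migrate it,
    # backing off exponentially (capped at 60s) on OperationalError.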
sleep = 1

while True:
if queue.empty():
await asyncio.sleep(1)
continue

try:
player_id = await queue.get()
except OperationalError as _:
await asyncio.sleep(sleep)
sleep = min(sleep * 2, 60)
continue

queue.task_done()

async with semaphore:
try:
start_time = time.time()
cnt = await migrate(player_id=player_id)
delta = int(time.time() - start_time)
logger.info(
f"[{player_id[0]}..{player_id[-1]}] l:{len(player_id)}, {delta} sec {cnt}"
)
sleep = 1
except OperationalError as e:
logger.error(
f"err: sleep: {sleep} [{player_id[0]}..{player_id[-1]}] l:{len(player_id)}, {e._message()}"
)
await asyncio.sleep(sleep)
sleep = min(sleep * 2, 60)
continue


async def task_get_players(
queue: asyncio.Queue, player_id: int = 0, limit: int = 1000, batch_size: int = 100
):
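    # Producer: page through scraper_data keyset-style and enqueue player_ids
    # in batch_size-sized tuples for the migration workers.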
sleep = 1
while True:
logger.info(player_id)
players = await get_players_to_migrate(player_id=player_id, limit=limit)

if not players:
logger.info(f"No players to migrate, sleeping {sleep} seconds.")
await asyncio.sleep(sleep)
sleep = min(sleep * 2, 60)
continue

players = [p["player_id"] for p in players]
for i in range(0, len(players), batch_size):
batch = players[i : i + batch_size]
await queue.put(tuple(batch))

player_id = players[-1]

if len(players) < limit:
logger.info("No players to migrate, sleeping 300 seconds.")
await asyncio.sleep(300)

# bsize: 100, rows: 1000, time:
async def main():
player_id = 0
batch_size = 1_000
async_tasks = 1
limit = 1_000

player_queue = asyncio.Queue(maxsize=async_tasks+1)
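    # maxsize of async_tasks + 1 keeps the producer at most one batch ahead of
    # the workers instead of buffering the whole backlog in memory.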
# semaphore limits the number of async tasks
semaphore = asyncio.Semaphore(value=async_tasks)

get_players = asyncio.create_task(
task_get_players(player_queue, player_id, limit, batch_size)
)
migration_tasks = [
asyncio.create_task(task_migrate(player_queue, semaphore))
        for _ in range(async_tasks)
]
tasks = [get_players, *migration_tasks]
await asyncio.gather(*tasks)


if __name__ == "__main__":
asyncio.run(main())
Empty file added test;txt
Empty file.