Skip to content

Commit

Permalink
split reading & inserting
Browse files Browse the repository at this point in the history
  • Loading branch information
extreme4all committed May 26, 2024
1 parent b1a00ad commit 88ce226
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 20 deletions.
2 changes: 1 addition & 1 deletion src/app/controllers/player.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async def get(self, player_name: str) -> PlayerInDB:

# Insert a player row, then read it back through self.get() and return that result.
# NOTE(review): this is a rendered diff hunk ("1 addition & 1 deletion"), so both the
# old and the new form of the `sql = ...` statement appear below; the second
# assignment overwrites the first.
async def insert(self, player: PlayerCreate) -> PlayerInDB:
# The name is passed through self.sanitize_name() and written back onto the model,
# so the same value is used for the insert and for the lookup below.
player.name = self.sanitize_name(player.name)
# Appears to be the pre-change statement (the commit's one deletion): plain INSERT.
sql = sqla.insert(DBPlayer).values(player.model_dump())
# Appears to be the post-change statement: same INSERT with an "IGNORE" prefix.
# Presumably this makes the database skip a row that already exists instead of
# raising -- TODO confirm against the dialect in use.
sql = sqla.insert(DBPlayer).values(player.model_dump()).prefix_with("IGNORE")
await self.session.execute(sql)
# Look the player up by name rather than relying on the INSERT result.
return await self.get(player_name=player.name)

Expand Down
80 changes: 61 additions & 19 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
logger = logging.getLogger(__name__)


async def create_insert_batch(report_queue: Queue):
last_time = time.time()
batch = []
async def check_duplicate_report(report_queue: Queue, valid_report_queue: Queue):
while True:
# Check if both queues are empty
if report_queue.empty():
Expand All @@ -38,20 +36,12 @@ async def create_insert_batch(report_queue: Queue):
reporting_id=msg.reportingID,
region_id=msg.region_id,
)
if report:
continue
# append to batch
batch.append(msg)

now = time.time()
if now - last_time > 10:
logger.debug(f"batch inserting: {len(batch)}")
last_time = time.time()
await report_controller.insert(reports=batch)
logger.debug(
f"inserted: {len(batch)}, duration (sec): {int(time.time()-last_time)}"
)
batch = []

# skip duplicate reports
if report:
continue

await valid_report_queue.put(msg)
except OperationalError as e:
await report_queue.put(msg)
logger.error({"error": e})
Expand All @@ -65,6 +55,50 @@ async def create_insert_batch(report_queue: Queue):
continue


async def queue_to_batch(queue: Queue, max_len: int = None) -> list:
    """Drain messages that are already waiting in ``queue`` into a list.

    Only messages present at call time are taken; the coroutine never waits
    for new ones to arrive, so it is safe to call while holding a resource
    such as an open database transaction.

    Args:
        queue: Queue to drain. Every message taken is acknowledged with
            ``task_done()``.
        max_len: Upper bound on the batch size. ``None`` (or ``0``) means
            "everything currently queued".

    Returns:
        The drained messages in retrieval order; empty if nothing was queued.
    """
    available = queue.qsize()
    # Cap the request at what is queued right now: asking for more would
    # suspend the caller until other producers deliver the remainder.
    limit = min(max_len, available) if max_len else available

    output = []
    for _ in range(limit):
        try:
            msg = queue.get_nowait()
        except asyncio.QueueEmpty:
            # Another consumer emptied the queue first; return what we have.
            break
        queue.task_done()
        output.append(msg)
    return output


async def insert_batch(valid_report_queue: Queue):
    """Periodically flush queued reports to the database as one batch.

    Runs forever. A flush is attempted only when the queue is non-empty and
    at least 10 seconds have passed since the last successful insert;
    otherwise the loop sleeps for a second and checks again. If the insert
    fails, the messages taken for that attempt are put back on the queue and
    the loop backs off for 5 seconds.

    Args:
        valid_report_queue: Queue holding the messages to insert.
    """
    last_time = time.time()
    while True:
        if valid_report_queue.empty():
            await asyncio.sleep(1)
            continue

        if time.time() - last_time < 10:
            await asyncio.sleep(1)
            continue

        # Reset for every attempt. Without this, a failure before the queue
        # is drained (e.g. while acquiring the session) would either hit an
        # unbound name in the handlers below or re-queue the batch from the
        # previous, already inserted, iteration.
        batch = []
        try:
            # Acquire an asynchronous database session
            session: AsyncSession = await get_session()
            async with session.begin():
                report_controller = ReportController(session=session)

                batch = await queue_to_batch(queue=valid_report_queue)
                logger.debug(f"batch inserting: {len(batch)}")
                await report_controller.insert(reports=batch)
                last_time = time.time()

        except OperationalError as e:
            # Put the un-inserted messages back so the next attempt retries them.
            for msg in batch:
                await valid_report_queue.put(msg)
            logger.error({"error": e})
            await asyncio.sleep(5)

        except Exception as e:
            # Same recovery as above, plus a traceback for unexpected errors.
            for msg in batch:
                await valid_report_queue.put(msg)
            logger.error({"error": e})
            logger.debug(f"Traceback: \n{traceback.format_exc()}")
            await asyncio.sleep(5)


async def process_data(report_queue: Queue):
receive_queue = consumer.get_queue()
error_queue = producer.get_queue()
Expand Down Expand Up @@ -122,11 +156,19 @@ async def process_data(report_queue: Queue):

async def main():
report_queue = Queue(maxsize=500)
valid_report_queue = Queue()
await producer.start_engine(topic="report")
await consumer.start_engine(topics=["report"])

asyncio.create_task(process_data(report_queue))
asyncio.create_task(create_insert_batch(report_queue))
for _ in range(5):
asyncio.create_task(process_data(report_queue=report_queue))
asyncio.create_task(
check_duplicate_report(
report_queue=report_queue, valid_report_queue=valid_report_queue
)
)
asyncio.create_task(insert_batch(valid_report_queue=valid_report_queue))

while True:
await asyncio.sleep(60)

Expand Down

0 comments on commit 88ce226

Please sign in to comment.