Skip to content

Commit

Permalink
fully working code
Browse files Browse the repository at this point in the history
  • Loading branch information
extreme4all committed Jan 6, 2024
1 parent f3647b1 commit 0206945
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 10 deletions.
Binary file modified kafka_setup/kafka_data/kafka_data.zip
Binary file not shown.
5 changes: 3 additions & 2 deletions src/app/views/player.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime
from typing import Optional

from pydantic import BaseModel
Expand Down Expand Up @@ -31,8 +32,8 @@ class PlayerUpdate(BaseModel):

class PlayerInDB(PlayerCreate):
id: int
created_at: str
updated_at: str
created_at: datetime
updated_at: datetime


class Player(PlayerInDB):
Expand Down
2 changes: 1 addition & 1 deletion src/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ class Settings(BaseSettings):
DATABASE_URL: str
POOL_TIMEOUT: int
POOL_RECYCLE: int
ENV: str = "DEV"
ENV: str = "PRD"


settings = Settings()
5 changes: 5 additions & 0 deletions src/database/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,9 @@ async def get_session() -> AsyncSession:
return SessionFactory()


def model_to_dict(model):
"""Converts an SQLAlchemy model instance to a dictionary."""
return {c.name: getattr(model, c.name) for c in model.__table__.columns}


Base = declarative_base()
12 changes: 5 additions & 7 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from app.views.player import PlayerInDB
from app.views.report import ReportInQueue, StgReportCreate, convert_report_q_to_db
from core.config import settings
from database.database import get_session
from database.database import get_session, model_to_dict
from database.models.player import Player
from database.models.report import StgReport

Expand All @@ -27,15 +27,15 @@ async def select_player(session: AsyncSession, name: str) -> PlayerInDB:
sql = sql.where(Player.name == name)
result: AsyncResult = await session.execute(sql)
data = result.scalars().all()
return PlayerInDB(**data[0]) if data else None
return PlayerInDB(**model_to_dict(data[0])) if data else None


# TODO: pydantic data
async def insert_report(session: AsyncSession, data: StgReportCreate):
sql: Insert = insert(StgReport)
sql = sql.values(data.model_dump(mode="json"))
sql = sql.prefix_with("IGNORE")
# await session.execute(sql)
await session.execute(sql)
return


Expand All @@ -62,8 +62,6 @@ async def process_data(receive_queue: Queue, error_queue: Queue, shutdown_event:
# Get a message from the chosen queue
message: dict = await receive_queue.get()
parsed_msg = ReportInQueue(**message)
# TEMP
print(parsed_msg)

try:
# Acquire an asynchronous database session
Expand All @@ -73,15 +71,15 @@ async def process_data(receive_queue: Queue, error_queue: Queue, shutdown_event:
session=session, name=parsed_msg.reporter
)
if reporter is None:
logger.error(f"reporter does not exist {parsed_msg.reporter}")
logger.error(f"reporter does not exist: '{parsed_msg.reporter}'")
receive_queue.task_done()
continue

reported = await select_player(
session=session, name=parsed_msg.reported
)
if reported is None:
logger.error(f"reported does not exist {parsed_msg.reported}")
logger.error(f"reported does not exist: '{parsed_msg.reported}'")
receive_queue.task_done()
continue

Expand Down

0 comments on commit 0206945

Please sign in to comment.