Skip to content
This repository has been archived by the owner on Aug 5, 2024. It is now read-only.

Commit

Permalink
fix: Make async
Browse files Browse the repository at this point in the history
  • Loading branch information
rsavoye committed Feb 26, 2024
1 parent 167e0b4 commit 8be6ed6
Showing 1 changed file with 67 additions and 58 deletions.
125 changes: 67 additions & 58 deletions tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
from datetime import datetime
from tm_admin.users.users_class import UsersTable
from tm_admin.tasks.api import TasksAPI
import asyncio
from codetiming import Timer

# Instantiate logger
log = logging.getLogger(__name__)
Expand All @@ -44,128 +46,130 @@
# FIXME: For now these tests assume you have a local postgres installed. One has the TM
# database, the other for tm_admin.

def get_task():
async def get_task():
log.debug(f"--- get_task() unimplemented!")
task_id = 1
result = await task.getByID(task_id)
# task_id: int, project_id: int) -> Task:

def _is_task_undoable():
async def _is_task_undoable():
"""Determines if the current task status can be undone by the logged in user"""
log.debug(f"--- _is_task_undoable() unimplemented!")
#logged_in_user_id: int, task: Task) -> bool:

def lock_task_for_mapping():
async def lock_task_for_mapping():
log.debug(f"--- lock_task_for_mapping() unimplemented!")
# lock_task_dto: LockTaskDTO) -> TaskDTO:

def unlock_task_after_mapping():
async def unlock_task_after_mapping():
"""Unlocks the task and sets the task history appropriately"""
log.debug(f"--- unlock_task_after_mapping() unimplemented!")
# mapped_task: MappedTaskDTO) -> TaskDTO:

def stop_mapping_task():
async def stop_mapping_task():
# stop_task: StopMappingTaskDTO) -> TaskDTO:
log.debug(f"--- stop_mapping_task() unimplemented!")

"""Unlocks the task and revert the task status to the last one"""
def get_task_locked_by_user():
async def get_task_locked_by_user():
log.debug(f"--- get_task_locked_by_user() unimplemented!")
# project_id: int, task_id: int, user_id: int) -> Task:

def add_task_comment():
async def add_task_comment():
"""Adds the comment to the task history"""
log.debug(f"--- add_task_comment() unimplemented!")
# task_comment: TaskCommentDTO) -> TaskDTO:

def generate_gpx():
async def generate_gpx():
log.debug(f"--- generate_gpx() unimplemented!")
# project_id: int, task_ids_str: str, timestamp=None):

def generate_osm_xml():
async def generate_osm_xml():
log.debug(f"--- generate_osm_xml() unimplemented!")
# project_id: int, task_ids_str: str) -> str:

"""Generate xml response suitable for loading into JOSM. A sample output file is in"""
def undo_mapping():
async def undo_mapping():
log.debug(f"--- undo_mapping() unimplemented!")
#project_id: int, task_id: int, user_id: int, preferred_locale: str = "en"

def map_all_tasks():
async def map_all_tasks():
log.debug(f"--- map_all_tasks() unimplemented!")
# project_id: int, user_id: int):

"""Marks all tasks on a project as mapped"""
def reset_all_badimagery():
async def reset_all_badimagery():
log.debug(f"--- reset_all_badimagery() unimplemented!")
# project_id: int, user_id: int):

"""Marks all bad imagery tasks ready for mapping"""
def lock_time_can_be_extended():
async def lock_time_can_be_extended():
log.debug(f"--- lock_time_can_be_extended() unimplemented!")
# project_id, task_id, user_id):

# task = Task.get(task_id, project_id)
def extend_task_lock_time():
async def extend_task_lock_time():
log.debug(f"--- extend_task_lock_time() unimplemented!")
#extend_dto: ExtendLockTimeDTO):

def get_task_as_dto():
async def get_task_as_dto():
log.debug(f"--- get_task_as_dto( unimplemented!")
# task_id: int,

# FMTM API tests
def get_task_count_in_project():
async def get_task_count_in_project():
# db: Session, project_id: int):
log.debug(f"--- get_task_count_in_project() unimplemented!")

def get_task_lists():
async def get_task_lists():
# db: Session, project_id: int):
log.debug(f"--- get_task_lists() unimplemented!")

def get_tasks():
async def get_tasks():
#
log.debug(f"--- get_tasks() unimplemented!")

def get_task():
async def get_task():
# db: Session, task_id: int, db_obj: bool = False):
log.debug(f"--- get_task() unimplemented!")

def update_task_status():
async def update_task_status():
# db: Session, user_id: int, task_id: int, new_status: TaskStatus):
log.debug(f"--- update_task_status() unimplemented!")

def update_qrcode():
async def update_qrcode():
log.debug(f"--- update_qrcode() unimplemented!")

def create_task_history_for_status_change():
async def create_task_history_for_status_change():
log.debug(f"--- create_task_history_for_status_change() unimplemented!")

def convert_to_app_history():
async def convert_to_app_history():
# db_histories: List[db_models.DbTaskHistory]):
log.debug(f"--- convert_to_app_history() unimplemented!")

def convert_to_app_task():
async def convert_to_app_task():
# db_task: db_models.DbTask):
log.debug(f"--- convert_to_app_task() unimplemented!")

def convert_to_app_tasks():
async def convert_to_app_tasks():
# db_tasks: List[db_models.DbTask]):
log.debug(f"--- convert_to_app_tasks() unimplemented!")

def get_qr_codes_for_task():
async def get_qr_codes_for_task():
log.debug(f"--- get_qr_codes_for_task() unimplemented!")

def get_task_by_id():
async def get_task_by_id():
# db: Session, task_id: int):
log.debug(f"--- get_task_by_id() unimplemented!")

def update_task_files():
async def update_task_files():
log.debug(f"--- update_task_files() unimplemented!")

def edit_task_boundary():
async def edit_task_boundary():
# db: Session, task_id: int, boundary: str):
log.debug(f"--- edit_task_boundary() unimplemented!")

if __name__ == "__main__":
async def main():
parser = argparse.ArgumentParser()
parser.add_argument("-v", "--verbose", nargs="?", const="0", help="verbose output")
parser.add_argument("-u", "--uri", default='localhost/tm_admin', help="Database URI")
Expand All @@ -188,33 +192,38 @@ def edit_task_boundary():
await task.connect(args.uri)
# project = ProjectsDB(args.uri)

get_task()
get_task_as_dto()
_is_task_undoable()
lock_task_for_mapping()
unlock_task_after_mapping()
stop_mapping_task()
get_task_locked_by_user()
add_task_comment()
generate_gpx()
generate_osm_xml()
undo_mapping()
map_all_tasks()
reset_all_badimagery()
lock_time_can_be_extended()
extend_task_lock_time()
await get_task()
await get_task_as_dto()
await _is_task_undoable()
await lock_task_for_mapping()
await unlock_task_after_mapping()
await stop_mapping_task()
await get_task_locked_by_user()
await add_task_comment()
await generate_gpx()
await generate_osm_xml()
await undo_mapping()
await map_all_tasks()
await reset_all_badimagery()
await lock_time_can_be_extended()
await extend_task_lock_time()

# FMTM API tests

get_tasks()
get_task()
update_task_status()
update_qrcode()
create_task_history_for_status_change()
convert_to_app_history()
convert_to_app_task()
convert_to_app_tasks()
get_qr_codes_for_task()
get_task_by_id()
update_task_files()
edit_task_boundary()
await get_tasks()
await get_task()
await update_task_status()
await update_qrcode()
await create_task_history_for_status_change()
await convert_to_app_history()
await convert_to_app_task()
await convert_to_app_tasks()
await get_qr_codes_for_task()
await get_task_by_id()
await update_task_files()
await edit_task_boundary()

if __name__ == "__main__":
"""This is just a hook so this file can be run standalone during development."""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(main())

0 comments on commit 8be6ed6

Please sign in to comment.