From 918db42db00c2a0746ca496e86e6fdd6fe9f08c6 Mon Sep 17 00:00:00 2001 From: glorenzo972 Date: Sat, 18 May 2024 11:12:47 +0200 Subject: [PATCH] added /api/scrape/single and /api/scrape/enqueue --- CHANGELOG.md | 8 +- pyproject.toml | 2 +- tilellm/__main__.py | 115 ++++++++++++++++++++++++--- tilellm/store/pinecone_repository.py | 4 +- 4 files changed, 117 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8120207..cd093c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ *Andrea Sponziello* ### **Copyrigth**: *Tiledesk SRL* +## [2024-05-18] + +### 0.1.16 +- added: /api/scrape/single without redis queue +- added: /api/scrape/enqueue to enqueue item into redis queue + ## [2024-05-14] ### 0.1.15 @@ -15,7 +21,7 @@ ### 0.1.14 - added parameter to entrypoint.sh -- + ## [2024-05-06] ### 0.1.13 diff --git a/pyproject.toml b/pyproject.toml index f4edec9..b6a4195 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "tilellm" -version = "0.1.15" +version = "0.1.16" description = "tiledesk for RAG" authors = ["Gianluca Lorenzo "] repository = "https://github.com/Tiledesk/tiledesk-llm" diff --git a/tilellm/__main__.py b/tilellm/__main__.py index 3a02fcd..99dc0e3 100644 --- a/tilellm/__main__.py +++ b/tilellm/__main__.py @@ -98,9 +98,9 @@ async def reader(channel: aioredis.client.Redis): ) for stream, message_data in messages: - for message in message_data: + for message_id, message_values in message_data: logger.debug(f"My role is {tilellm_role} consume message") - message_id, message_values = message + # message_id, message_values= message import ast byte_str = message_values[b"single"] @@ -136,8 +136,7 @@ async def reader(channel: aioredis.client.Redis): json=res.model_dump(exclude_none=True), headers={"Content-Type": "application/json", "X-Auth-Token": token}) - logger.info(res) - logger.info(f"200 ==>{await res.json()}") + logger.info(f"200 {await res.json()}") except Exception as ewh: logger.error(ewh) pass @@ -166,8 +165,7 @@ async def reader(channel: aioredis.client.Redis): json=pc_result.model_dump(exclude_none=True), headers={"Content-Type": "application/json", "X-Auth-Token": token}) - logger.info(res) - logger.info(f"===========> {await res.json()}") + logger.info(f"300 {await res.json()}") except Exception as ewh: logger.error(ewh) pass @@ -176,6 +174,7 @@ async def reader(channel: aioredis.client.Redis): const.STREAM_NAME, const.STREAM_CONSUMER_GROUP, message_id) + logger.info(f"xack to message_id: {message_id}") except Exception as e: scrape_status_response = ScrapeStatusResponse(status_message="Error", @@ -216,10 +215,10 @@ async def redis_consumer(app: FastAPI): app = FastAPI(lifespan=redis_consumer) -@app.post("/api/scrape/single") -async def create_scrape_item_main(item: ItemSingle, redis_client: aioredis.client.Redis = Depends(get_redis_client)): +@app.post("/api/scrape/enqueue") +async def enqueue_scrape_item_main(item: ItemSingle, redis_client: aioredis.client.Redis = Depends(get_redis_client)): """ - Add items to namespace + enqueue item to redis. Consumer read message and add it to namespace :param item: :param redis_client: :return: @@ -238,6 +237,104 @@ async def create_scrape_item_main(item: ItemSingle, redis_client: aioredis.clien return {"message": f"Item {item.id} created successfully, more {res}"} +@app.post("/api/scrape/single") +async def create_scrape_item_single(item: ItemSingle, redis_client: aioredis.client.Redis = Depends(get_redis_client)): + """ + Add item to namespace + :param item: + :param redis_client: + :return: + """ + webhook = "" + token = "" + + try: + logger.debug(item) + scrape_status_response = ScrapeStatusResponse(status_message="Indexing started", + status_code=2 + ) + add_to_queue = await redis_client.set(f"{item.namespace}/{item.id}", + scrape_status_response.model_dump_json(), + ex=expiration_in_seconds) + + logger.debug(f"Start {add_to_queue}") + + raw_webhook = item.webhook + if '?' in raw_webhook: + webhook, raw_token = raw_webhook.split('?') + + if raw_token.startswith('token='): + _, token = raw_token.split('=') + else: + webhook = raw_webhook + + logger.info(f"webhook: {webhook}, token: {token}") + + if webhook: + res = PineconeIndexingResult(id=item.id, status=200) + try: + async with aiohttp.ClientSession() as session: + res = await session.post(webhook, + json=res.model_dump(exclude_none=True), + headers={"Content-Type": "application/json", + "X-Auth-Token": token}) + logger.info(f"200 {await res.json()}") + except Exception as ewh: + logger.error(ewh) + pass + + pc_result = await add_pc_item(item) + # import datetime + # current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S,%f") + + # pc_result["date"]= current_time + # pc_result["status"] = current_time + + # A POST request to the API + + scrape_status_response = ScrapeStatusResponse(status_message="Indexing finish", + status_code=3 + ) + add_to_queue = await redis_client.set(f"{item.namespace}/{item.id}", + scrape_status_response.model_dump_json(), + ex=expiration_in_seconds) + + logger.debug(f"End {add_to_queue}") + if webhook: + try: + async with aiohttp.ClientSession() as session: + res = await session.post(webhook, + json=pc_result.model_dump(exclude_none=True), + headers={"Content-Type": "application/json", + "X-Auth-Token": token}) + logger.info(f"300 {await res.json()}") + except Exception as ewh: + logger.error(ewh) + pass + + return JSONResponse(content={"message": f"Item {item.id} created successfully"}) + + except Exception as e: + scrape_status_response = ScrapeStatusResponse(status_message="Error", + status_code=4 + ) + add_to_queue = await redis_client.set(f"{item.namespace}/{item.id}", + scrape_status_response.model_dump_json(), + ex=expiration_in_seconds) + + logger.error(f"Error {add_to_queue}") + import traceback + if webhook: + res = PineconeIndexingResult(id=item.id, status=400, error=repr(e)) + async with aiohttp.ClientSession() as session: + response = await session.post(webhook, json=res.model_dump(exclude_none=True), + headers={"Content-Type": "application/json", "X-Auth-Token": token}) + logger.error(response) + logger.error(f"{await response.json()}") + logger.error(f"Error {e}, webhook: {webhook}") + traceback.print_exc() + logger.error(e) + raise HTTPException(status_code=400, detail=repr(e)) @app.post("/api/qa") async def post_ask_with_memory_main(question_answer: QuestionAnswer): diff --git a/tilellm/store/pinecone_repository.py b/tilellm/store/pinecone_repository.py index 5c7cc0f..c81e418 100644 --- a/tilellm/store/pinecone_repository.py +++ b/tilellm/store/pinecone_repository.py @@ -297,9 +297,11 @@ async def get_pc_all_obj_namespace(namespace: str): logger.debug(f"pinecone total vector in {namespace}: {total_vectors}") + batch_size = min([total_vectors, 1000]) + pc_res = index.query( vector=[0] * 1536, # [0,0,0,0......0] - top_k=total_vectors, + top_k=batch_size, # filter={"id": {"$eq": id}}, namespace=namespace, include_values=False,