diff --git a/CHANGELOG.md b/CHANGELOG.md index 9152db5..47d6057 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ *Andrea Sponziello* ### **Copyrigth**: *Tiledesk SRL* +## [2024-05-02] + +### 0.1.10 +- fixed: metadata fields can no longer be None. +- added: TILELLM_ROLE=qa|train in order to manage the qa and train roles + ## [2024-05-01] ### 0.1.9 diff --git a/README.md b/README.md index b89a76a..7610f98 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,8 @@ pip install -e . export REDIS_URL="redis://localhost:6379/0" export PINECONE_API_KEY="pinecone api key" export PINECONE_TEXT_KEY="pinecone field for text - default text in pod content" -export PINECONE_INDEX = "pinecone index name" +export PINECONE_INDEX="pinecone index name" +export TILELLM_ROLE="role in pod. Train enables all the APIs; qa does not consume the redis queue, only Q&A" tilellm ``` @@ -27,7 +28,7 @@ sudo docker build -t tilellm . ``` - sudo docker run -d -p 8000:8000 --env environment="dev|prod" --env PINECONE_API_KEY="yourapikey" --env PINECONE_TEXT_KEY="text|content" --env PINECONE_INDEX="index_name" --env REDIS_URL="redis://redis:6379/0" --name tilellm --link test-redis:redis tilellm + sudo docker run -d -p 8000:8000 --env environment="dev|prod" --env PINECONE_API_KEY="yourapikey" --env PINECONE_TEXT_KEY="text|content" --env PINECONE_INDEX="index_name" --env TILELLM_ROLE="train|qa" --env REDIS_URL="redis://redis:6379/0" --name tilellm --link test-redis:redis tilellm ``` diff --git a/pyproject.toml b/pyproject.toml index 844bbca..ed9d7a2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "tilellm" -version = "0.1.9" +version = "0.1.10" description = "tiledesk for RAG" authors = ["Gianluca Lorenzo "] repository = "https://github.com/Tiledesk/tiledesk-llm" diff --git a/tilellm/__main__.py b/tilellm/__main__.py index 0a76f39..7650f84 100644 --- a/tilellm/__main__.py +++ b/tilellm/__main__.py @@ -62,6 +62,7 @@ # os.environ.__setitem__("ENVIRON", environment) redis_url 
= os.environ.get("REDIS_URL") +tilellm_role = os.environ.get("TILELLM_ROLE") async def get_redis_client(): @@ -78,108 +79,111 @@ async def reader(channel: aioredis.client.Redis): :param channel: :return: """ - + from tilellm.shared import const + logger.debug(f"My role is {tilellm_role}") webhook = "" token = "" item = {} - while True: - try: - messages = await channel.xreadgroup( - groupname=const.STREAM_CONSUMER_GROUP, - consumername=const.STREAM_CONSUMER_NAME, - streams={const.STREAM_NAME: '>'}, - count=1, - block=0 # Set block to 0 for non-blocking - ) - - for stream, message_data in messages: - for message in message_data: - - message_id, message_values = message - import ast - - byte_str = message_values[b"single"] - dict_str = byte_str.decode("UTF-8") - logger.info(dict_str) - item = ast.literal_eval(dict_str) - item_single = ItemSingle(**item) - scrape_status_response = ScrapeStatusResponse(status_message="Indexing started", - status_code=2 - ) - add_to_queue = await channel.set(f"{item.get('namespace')}/{item.get('id')}", - scrape_status_response.model_dump_json(), - ex=expiration_in_seconds) - - logger.debug(f"Start {add_to_queue}") - raw_webhook = item.get('webhook', "") - if '?' 
in raw_webhook: - webhook, raw_token = raw_webhook.split('?') - - if raw_token.startswith('token='): - _, token = raw_token.split('=') - else: - webhook = raw_webhook - - logger.info(f"webhook: {webhook}, token: {token}") - - pc_result = await add_pc_item(item_single) - # import datetime - # current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S,%f") - - # pc_result["date"]= current_time - # pc_result["status"] = current_time - - # A POST request to the API - - scrape_status_response = ScrapeStatusResponse(status_message="Indexing finish", - status_code=3 - ) - add_to_queue = await channel.set(f"{item.get('namespace')}/{item.get('id')}", - scrape_status_response.model_dump_json(), - ex=expiration_in_seconds) - - logger.debug(f"End {add_to_queue}") - if webhook: - try: - async with aiohttp.ClientSession() as session: - res = await session.post(webhook, - json=pc_result.model_dump(exclude_none=True), - headers={"Content-Type": "application/json", - "X-Auth-Token": token}) - logger.info(res) - logger.info(f"===========> {await res.json()}") - except Exception as ewh: - logger.error(ewh) - pass - - await channel.xack( - const.STREAM_NAME, - const.STREAM_CONSUMER_GROUP, - message_id) - - except Exception as e: - scrape_status_response = ScrapeStatusResponse(status_message="Error", - status_code=4 - ) - add_to_queue = await channel.set(f"{item.get('namespace')}/{item.get('id')}", - scrape_status_response.model_dump_json(), - ex=expiration_in_seconds) - - logger.error(f"Error {add_to_queue}") - import traceback - if webhook: - res = PineconeIndexingResult(status=400, error=repr(e)) - async with aiohttp.ClientSession() as session: - response = await session.post(webhook, json=res.model_dump(exclude_none=True), - headers={"Content-Type": "application/json", "X-Auth-Token": token}) - logger.error(response) - logger.error(f"{await response.json()}") - logger.error(f"Error {e}, webhook: {webhook}") - traceback.print_exc() - logger.error(e) - pass - + if 
tilellm_role == "train": + while True: + try: + messages = await channel.xreadgroup( + groupname=const.STREAM_CONSUMER_GROUP, + consumername=const.STREAM_CONSUMER_NAME, + streams={const.STREAM_NAME: '>'}, + count=1, + block=0 # Set block to 0 for non-blocking + ) + + for stream, message_data in messages: + for message in message_data: + logger.debug(f"My role is {tilellm_role} consume message") + message_id, message_values = message + import ast + + byte_str = message_values[b"single"] + dict_str = byte_str.decode("UTF-8") + logger.info(dict_str) + item = ast.literal_eval(dict_str) + item_single = ItemSingle(**item) + scrape_status_response = ScrapeStatusResponse(status_message="Indexing started", + status_code=2 + ) + add_to_queue = await channel.set(f"{item.get('namespace')}/{item.get('id')}", + scrape_status_response.model_dump_json(), + ex=expiration_in_seconds) + + logger.debug(f"Start {add_to_queue}") + raw_webhook = item.get('webhook', "") + if '?' in raw_webhook: + webhook, raw_token = raw_webhook.split('?') + + if raw_token.startswith('token='): + _, token = raw_token.split('=') + else: + webhook = raw_webhook + + logger.info(f"webhook: {webhook}, token: {token}") + + pc_result = await add_pc_item(item_single) + # import datetime + # current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S,%f") + + # pc_result["date"]= current_time + # pc_result["status"] = current_time + + # A POST request to the API + + scrape_status_response = ScrapeStatusResponse(status_message="Indexing finish", + status_code=3 + ) + add_to_queue = await channel.set(f"{item.get('namespace')}/{item.get('id')}", + scrape_status_response.model_dump_json(), + ex=expiration_in_seconds) + + logger.debug(f"End {add_to_queue}") + if webhook: + try: + async with aiohttp.ClientSession() as session: + res = await session.post(webhook, + json=pc_result.model_dump(exclude_none=True), + headers={"Content-Type": "application/json", + "X-Auth-Token": token}) + logger.info(res) + 
logger.info(f"===========> {await res.json()}") + except Exception as ewh: + logger.error(ewh) + pass + + await channel.xack( + const.STREAM_NAME, + const.STREAM_CONSUMER_GROUP, + message_id) + + except Exception as e: + scrape_status_response = ScrapeStatusResponse(status_message="Error", + status_code=4 + ) + add_to_queue = await channel.set(f"{item.get('namespace')}/{item.get('id')}", + scrape_status_response.model_dump_json(), + ex=expiration_in_seconds) + + logger.error(f"Error {add_to_queue}") + import traceback + if webhook: + res = PineconeIndexingResult(status=400, error=repr(e)) + async with aiohttp.ClientSession() as session: + response = await session.post(webhook, json=res.model_dump(exclude_none=True), + headers={"Content-Type": "application/json", "X-Auth-Token": token}) + logger.error(response) + logger.error(f"{await response.json()}") + logger.error(f"Error {e}, webhook: {webhook}") + traceback.print_exc() + logger.error(e) + pass + else: + logger.debug(f"My role is {tilellm_role}") @asynccontextmanager async def redis_consumer(app: FastAPI): @@ -191,8 +195,8 @@ async def redis_consumer(app: FastAPI): await redis_client.close() -populate_constant() +populate_constant() app = FastAPI(lifespan=redis_consumer) @@ -384,7 +388,7 @@ async def get_root_endpoint(): def main(): - print(f"Ambiente: {environment}") + logger.debug(f"Environment: {environment}") import uvicorn uvicorn.run("tilellm.__main__:app", host="0.0.0.0", port=8000, reload=True, log_level="info")#, log_config=args.log_path diff --git a/tilellm/shared/const.py b/tilellm/shared/const.py index babfa79..62cd2c7 100644 --- a/tilellm/shared/const.py +++ b/tilellm/shared/const.py @@ -1,13 +1,14 @@ import os -STREAM_NAME="stream:single" -STREAM_CONSUMER_NAME="llmconsumer" -STREAM_CONSUMER_GROUP="llmconsumergroup" +STREAM_NAME = "stream:single" +STREAM_CONSUMER_NAME = "llmconsumer" +STREAM_CONSUMER_GROUP = "llmconsumergroup" PINECONE_API_KEY = None PINECONE_INDEX = None PINECONE_TEXT_KEY = None + 
def populate_constant(): global PINECONE_API_KEY, PINECONE_INDEX, PINECONE_TEXT_KEY PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY") @@ -16,3 +17,4 @@ def populate_constant(): + diff --git a/tilellm/store/pinecone_repository.py b/tilellm/store/pinecone_repository.py index ec39e32..22332cd 100644 --- a/tilellm/store/pinecone_repository.py +++ b/tilellm/store/pinecone_repository.py @@ -54,8 +54,20 @@ async def add_pc_item(item): document.metadata["type"] = type_source document.metadata["embedding"] = embedding + for key, value in document.metadata.items(): + if isinstance(value, list) and all(item is None for item in value): + document.metadata[key] = [""] + elif value is None: + document.metadata[key] = "" + chunks.extend(chunk_data(data=[document])) + + + # from pprint import pprint + # pprint(documents) + logger.debug(documents) + a = vector_store.from_documents(chunks, embedding=oai_embeddings, index_name=const.PINECONE_INDEX,