diff --git a/CHANGELOG.md b/CHANGELOG.md
index 69f9a9b..b29bf04 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,8 +5,21 @@
 *Andrea Sponziello*
 ### **Copyright**: *Tiledesk SRL*
+## [2024-09-21]
+### 0.3.1
+- add: sentence embedding with bge-m3
+- add: hybrid search with bge-m3
+- modify: deleted env variable for vector store
+
+## [2024-09-23]
+### 0.3.0
+- add: hybrid search
+- add: indexing based on splade
+- minor fix
+
+
 ## [2024-09-17]
-### 0.2.19
+### 0.2.20
 - upgrade: worker to 0.0.27
 
 ## [2024-09-14]
diff --git a/Dockerfile b/Dockerfile
index c54574d..e163a5b 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -16,8 +16,11 @@ RUN python -m nltk.downloader punkt
 RUN python -m nltk.downloader punkt_tab
 RUN python -m nltk.downloader averaged_perceptron_tagger
 RUN python -m nltk.downloader averaged_perceptron_tagger_eng
+RUN python -m nltk.downloader stopwords
 RUN playwright install chromium
 RUN playwright install-deps chromium
+RUN python -c "from transformers import AutoModelForSequenceClassification; model = AutoModelForSequenceClassification.from_pretrained('BAAI/bge-m3');"
+RUN python -c "from transformers import AutoModelForSequenceClassification; model = AutoModelForSequenceClassification.from_pretrained('naver/splade-cocondenser-ensembledistil');"
 # Fix redis
 ENV REDIS_HOST=redis
 ENV REDIS_URL=redis://redis:6379/0
diff --git a/README.md b/README.md
index e76cd4b..64f2f15 100644
--- a/README.md
+++ b/README.md
@@ -94,6 +94,7 @@ Models for /api/ask
 - gpt-4
 - gpt-4-turbo
 - gpt-4o
+- gpt-4o-mini
 ### Cohere
 - engine: cohere
 - command-r
@@ -135,4 +136,58 @@ In this method, any difference greater than X standard deviations is split.
 In this method, the interquartile distance is used to split chunks.
 ### gradient
-In this method, the gradient of distance is used to split chunks along with the percentile method. This method is useful when chunks are highly correlated with each other or specific to a domain e.g. legal or medical. The idea is to apply anomaly detection on gradient array so that the distribution become wider and easy to identify boundaries in highly semantic data.
\ No newline at end of file
+In this method, the gradient of distance is used to split chunks along with the percentile method. This method is useful when chunks are highly correlated with each other or specific to a domain, e.g. legal or medical. The idea is to apply anomaly detection on the gradient array so that the distribution becomes wider, making it easier to identify boundaries in highly semantic data.
+
+
+## Hybrid Search
+
+### /api/scrape/single
+
+```json
+{
+  ...
+  "embedding": "huggingface",
+  "hybrid": true,
+  "sparse_encoder": "splade|bge-m3",
+  ...
+  "engine":
+  {
+    "name": "",
+    "type": "",
+    "apikey": "",
+    "vector_size": 1024,
+    "index_name": ""
+  }
+}
+```
+
+### /api/qa
+
+```json
+{
+  "question": "question",
+  "namespace": "",
+  "debug": true,
+  "citations": true,
+  "llm": "anthropic|groq",
+  "gptkey": "api-key of llm",
+  "model": "e.g. claude-3-5-sonnet-20240620 | llama-3.1-70b-versatile",
+  "temperature": 0.9,
+  "max_tokens": 2048,
+  "embedding": "huggingface",
+  "sparse_encoder": "splade|bge-m3",
+  "search_type": "hybrid",
+  "alpha": 0.2,
+  "similarity_threshold": 0.95,
+  "system_context": "",
+  "top_k": 6,
+  "engine":
+  {
+    "name": "",
+    "type": "",
+    "apikey": "",
+    "vector_size": 1024,
+    "index_name": ""
+  }
+}
+```
diff --git a/pyproject.toml b/pyproject.toml
index 44f66df..309eeaa 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "tilellm"
-version = "0.2.20"
+version = "0.3.0"
 description = "tiledesk for RAG"
 authors = ["Gianluca Lorenzo "]
 repository = "https://github.com/Tiledesk/tiledesk-llm"
@@ -18,20 +18,23 @@ jsonschema= "4.23.0"
 redis= "^5.0.7"
 aioredis= "2.0.1"
 #redismutex = "^1.0.0"
-langchain = "0.3.0" #"0.2.16"
+langchain = "0.3.1" #"0.3.0" #"0.2.16"
 jq = "1.8.0" #"1.7.0"
-openai = "1.45.1" #"1.37.1"
-langchain-openai = "0.2.0" #"0.1.19"
+openai = "1.48.0" #"1.45.1" #"1.37.1"
+langchain-openai = "0.2.1" #"0.2.0" #"0.1.19"
 langchain-voyageai = "0.1.2" #"0.1.1"
-langchain-anthropic = "0.2.0" #"0.1.21"
+langchain-anthropic = "0.2.1" #"0.2.0" #"0.1.21"
 langchain-cohere= "0.3.0" #"0.1.9"
 langchain-google-genai= "2.0.0" #"1.0.8"
 langchain-groq = "0.2.0" #"0.1.8"
-langchain-aws= "0.2.0" #"0.1.12"
+langchain-aws= "0.2.1" #"0.1.12"
 pinecone-client = "5.0.1" #"5.0.0"
 python-dotenv = "1.0.1"
-langchain-community = "0.3.0" #"0.2.10"
-langchain-experimental = "0.3.0" #no previous
+langchain-community = "0.3.1" #"0.2.10"
+langchain-experimental = "0.3.1" #no previous
+langchain-pinecone = "0.2.0"
+langchain-huggingface = "0.1.0"
+peft = "0.13.0"
 tiktoken = "0.7.0"
 beautifulsoup4 = "4.12.3"
@@ -45,8 +48,18 @@ html2text= "2024.2.26"
 psutil= "6.0.0"
 httpx= "0.27.2" #"0.27.0"
 gql= "3.5.0"
+PyJWT= "2.9.0"
+#pinecone-text= "0.9.0"
+torch = "2.4.1"
+FlagEmbedding = "1.2.11"
+
+
+[tool.poetry.dependencies.pinecone-text]
+version = "0.9.0"
+extras = ["splade"]
+
 [tool.poetry.dependencies.uvicorn]
 version = "0.30.6" #"0.30.3"
 extras = ["standard"]
diff --git a/tilellm/__main__.py b/tilellm/__main__.py
index e4d491e..23c8849 100644
--- a/tilellm/__main__.py
+++ b/tilellm/__main__.py
@@ -5,6 +5,8 @@
                      HTTPException)
 from fastapi.responses import JSONResponse
 
+from pydantic_core import from_json
+
 # import argparse
 import aioredis
@@ -13,22 +15,26 @@ import aiohttp
 import json
 from dotenv import load_dotenv
+from openai import api_key
 
 from tilellm.shared.const import populate_constant
+from tilellm.shared.utility import decode_jwt
 from tilellm.models.item_model import (ItemSingle, QuestionAnswer,
-                                       PineconeItemToDelete,
-                                       PineconeNamespaceToDelete,
+                                       RepositoryItem,
+                                       RepositoryNamespace,
                                        ScrapeStatusReq, ScrapeStatusResponse,
-                                       PineconeIndexingResult, RetrievalResult, PineconeNamespaceResult,
-                                       PineconeDescNamespaceResult, PineconeItems, QuestionToLLM, SimpleAnswer,
-                                       QuestionToAgent)
+                                       IndexingResult, RetrievalResult, RepositoryNamespaceResult,
+                                       RepositoryDescNamespaceResult, RepositoryItems, QuestionToLLM, SimpleAnswer,
+                                       QuestionToAgent, Engine, RepositoryEngine)
 from tilellm.store.redis_repository import redis_xgroup_create
 from tilellm.controller.controller import (ask_with_memory,
+                                           ask_hybrid_with_memory,
                                            ask_with_sequence,
                                            add_pc_item,
+                                           add_pc_item_hybrid,
                                            delete_namespace,
                                            delete_id_from_namespace,
                                            delete_chunk_id_from_namespace,
@@ -129,7 +135,7 @@ async def reader(channel: aioredis.client.Redis):
             logger.info(f"webhook: {webhook}, token: {token}")
             if webhook:
-                res = 
PineconeIndexingResult(id=item.get('id'), status=200) + res = IndexingResult(id=item.get('id'), status=200) try: async with aiohttp.ClientSession() as session: res = await session.post(webhook, @@ -187,7 +193,7 @@ async def reader(channel: aioredis.client.Redis): logger.error(f"Error {add_to_queue}") import traceback if webhook: - res = PineconeIndexingResult(id=item.get('id'), status=400, error=repr(e)) + res = IndexingResult(id=item.get('id'), status=400, error=repr(e)) async with aiohttp.ClientSession() as session: response = await session.post(webhook, json=res.model_dump(exclude_none=True), headers={"Content-Type": "application/json", "X-Auth-Token": token}) @@ -238,7 +244,7 @@ async def enqueue_scrape_item_main(item: ItemSingle, redis_client: aioredis.clie return {"message": f"Item {item.id} created successfully, more {res}"} -@app.post("/api/scrape/single", response_model=PineconeIndexingResult) +@app.post("/api/scrape/single", response_model=IndexingResult) async def create_scrape_item_single(item: ItemSingle, redis_client: aioredis.client.Redis = Depends(get_redis_client)): """ Add item to namespace @@ -283,8 +289,10 @@ async def create_scrape_item_single(item: ItemSingle, redis_client: aioredis.cli # except Exception as ewh: # logger.error(ewh) # pass - - pc_result = await add_pc_item(item) + if item.hybrid: + pc_result = await add_pc_item_hybrid(item) + else: + pc_result = await add_pc_item(item) # import datetime # current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S,%f") @@ -337,6 +345,65 @@ async def create_scrape_item_single(item: ItemSingle, redis_client: aioredis.cli logger.error(e) raise HTTPException(status_code=400, detail=repr(e)) +@app.post("/api/scrape/hybrid", response_model=IndexingResult) +async def create_scrape_item_hybrid(item: ItemSingle, redis_client: aioredis.client.Redis = Depends(get_redis_client)): + """ + Add item to namespace + :param item: + :param redis_client: + :return: PineconeIndexingResult + """ + webhook = "" + token = "" + + try: + logger.debug(item) + scrape_status_response = ScrapeStatusResponse(status_message="Indexing started", + status_code=2 + ) + add_to_queue = await redis_client.set(f"{item.namespace}/{item.id}", + scrape_status_response.model_dump_json(), + ex=expiration_in_seconds) + + logger.debug(f"Start {add_to_queue}") + + raw_webhook = item.webhook + if '?' 
in raw_webhook: + webhook, raw_token = raw_webhook.split('?') + + if raw_token.startswith('token='): + _, token = raw_token.split('=') + else: + webhook = raw_webhook + + logger.info(f"webhook: {webhook}, token: {token}") + + pc_result = await add_pc_item_hybrid(item) + + scrape_status_response = ScrapeStatusResponse(status_message="Indexing finish", + status_code=3 + ) + add_to_queue = await redis_client.set(f"{item.namespace}/{item.id}", + scrape_status_response.model_dump_json(), + ex=expiration_in_seconds) + + return JSONResponse(content=pc_result.model_dump(exclude_none=True)) # {"message": f"Item {item.id} created successfully"}) + + except Exception as e: + scrape_status_response = ScrapeStatusResponse(status_message="Error", + status_code=4 + ) + add_to_queue = await redis_client.set(f"{item.namespace}/{item.id}", + scrape_status_response.model_dump_json(), + ex=expiration_in_seconds) + + logger.error(f"Error {add_to_queue}") + import traceback + + traceback.print_exc() + logger.error(e) + raise HTTPException(status_code=400, detail=repr(e)) + @app.post("/api/qa", response_model=RetrievalResult) async def post_ask_with_memory_main(question_answer: QuestionAnswer): @@ -346,8 +413,10 @@ async def post_ask_with_memory_main(question_answer: QuestionAnswer): :return: RetrievalResult """ logger.debug(question_answer) - - result = await ask_with_memory(question_answer) + if question_answer.search_type == 'hybrid': + result= await ask_hybrid_with_memory(question_answer) + else: + result = await ask_with_memory(question_answer) logger.debug(result) return JSONResponse(content=result.model_dump()) @@ -409,55 +478,6 @@ async def post_ask_with_memory_chain_main(question_answer: QuestionAnswer): # return result -@app.get("/api/list/namespace", response_model=PineconeNamespaceResult) -async def list_namespace_main(): - """ - Get all namespaces with id and vector count - :return: list of namespace - """ - try: - logger.debug(f"All Namespaces ") - result = await get_list_namespace() - return JSONResponse(content=result.model_dump(exclude_none=True)) - except Exception as ex: - logger.error(ex) - raise HTTPException(status_code=400, detail=repr(ex)) - - -@app.get("/api/desc/namespace/{namespace}", response_model=PineconeDescNamespaceResult) -async def list_namespace_items_main(namespace: str): - """ - Get description for given namespace - :param namespace: namespace_id - :return: description of namespace - """ - try: - logger.info(f"retrieve description for namespace {namespace}") - result = await get_desc_namespace(namespace) - - return JSONResponse(content=result.model_dump(exclude_none=True)) - except Exception as ex: - logger.error(ex) - raise HTTPException(status_code=400, detail=repr(ex)) - - -@app.get("/api/listitems/namespace/{namespace}", response_model=PineconeItems) -async def list_namespace_items_main(namespace: str): - """ - Get all item with given namespace - :param namespace: namespace_id - :return: list of all item into namespace - """ - try: - logger.info(f"retrieve namespace {namespace}") - result = await get_listitems_namespace(namespace) - - return JSONResponse(content=result.model_dump(exclude_none=True)) - except Exception as ex: - logger.error(ex) - raise HTTPException(status_code=400, detail=repr(ex)) - - @app.post("/api/scrape/status", response_model= ScrapeStatusResponse) async def scrape_status_main(scrape_status_req: ScrapeStatusReq, @@ -497,125 +517,141 @@ async def scrape_status_main(scrape_status_req: ScrapeStatusReq, raise HTTPException(status_code=400, 
detail=repr(ex)) -@app.delete("/api/namespace/{namespace}") -async def delete_namespace_main(namespace: str): - """ - Delete namespace from index - :param namespace: - :return: - """ - try: - result = await delete_namespace(namespace) - return JSONResponse(content={"message": f"Namespace {namespace} deleted"}) - except Exception as ex: - raise HTTPException(status_code=400, detail=repr(ex)) - - -@app.delete("/api/chunk/{chunk_id}/namespace/{namespace}") -async def delete_item_chunk_id_namespace_main(chunk_id: str, namespace: str): +@app.post("/api/delete/id", deprecated=True, + description="This endpoint is deprecated and is no longer supported. " + "Use method DELETE /api/id/{id}/namespace/{namespace}") +async def delete_item_id_namespace_post(item_to_delete: RepositoryItem): """ - Delete items from namespace identified by id and namespace - :param chunk_id: - :param namespace: + Delete items from namespace given document id via POST. + :param item_to_delete: :return: """ try: + metadata_id = item_to_delete.id + namespace = item_to_delete.namespace - logger.info(f"delete id {chunk_id} dal namespace {namespace}") - result = await delete_chunk_id_from_namespace(chunk_id, namespace) + logger.info(f"delete of id {metadata_id} dal namespace {namespace}") + result = await delete_id_from_namespace(item_to_delete, metadata_id, namespace) - return JSONResponse(content={"message": f"ids {chunk_id} in Namespace {namespace} deleted"}) + return JSONResponse(content={"success": True, "message": f"ids {metadata_id} in Namespace {namespace} deleted"}) except Exception as ex: - print(repr(ex)) - logger.error(ex) - raise HTTPException(status_code=400, detail=repr(ex)) + return JSONResponse(content={"success": False, "message": f"ids {metadata_id} in Namespace {namespace} not deleted due to {repr(ex)}"}) + # raise HTTPException(status_code=400, detail=repr(ex)) @app.post("/api/delete/namespace") -async def delete_namespace_main(namespace_to_delete: PineconeNamespaceToDelete): +async def delete_namespace_main(namespace_to_delete: RepositoryNamespace): """ Delete Pinecone namespace by namespace_id :param namespace_to_delete: :return: """ try: - result = await delete_namespace(namespace_to_delete.namespace) + result = await delete_namespace(namespace_to_delete) return JSONResponse(content={"success": "true", "message": f"{namespace_to_delete.namespace} is deleted from database"}) except Exception as ex: raise HTTPException(status_code=400, detail={"success": "false", "message": repr(ex)}) -@app.delete("/api/id/{metadata_id}/namespace/{namespace}") -async def delete_item_id_namespace_main(metadata_id: str, namespace: str): +@app.get("/api/list/namespace/{token}", response_model=RepositoryNamespaceResult) +async def list_namespace_main(token: str): """ - Delete items from namespace identified by id and namespace + Get all namespaces with id and vector count + :return: list of namespace + """ + try: + engine_dec = decode_jwt(token) + print(type(engine_dec)) + logger.debug(f"All Namespaces ") + repository_engine = RepositoryEngine(**engine_dec) + result = await get_list_namespace(repository_engine) + return JSONResponse(content=result.model_dump(exclude_none=True)) + except Exception as ex: + logger.error(ex) + raise HTTPException(status_code=400, detail=repr(ex)) + + +@app.get("/api/id/{metadata_id}/namespace/{namespace}/{token}", response_model=RepositoryItems) +async def get_items_id_namespace_main(token: str, metadata_id: str, namespace: str): + """ + Get all items from namespace given id of document + :param 
token :param metadata_id: :param namespace: :return: """ try: - logger.info(f"delete id {metadata_id} dal namespace {namespace}") - result = await delete_id_from_namespace(metadata_id, namespace) + logger.info(f"retrieve id {metadata_id} dal namespace {namespace}") + engine_dec = decode_jwt(token) + repository_engine = RepositoryEngine(**engine_dec) + result = await get_ids_namespace(repository_engine, metadata_id, namespace) - return JSONResponse(content={"message": f"ids {metadata_id} in Namespace {namespace} deleted"}) + return JSONResponse(content=result.model_dump()) except Exception as ex: logger.error(ex) raise HTTPException(status_code=400, detail=repr(ex)) -@app.post("/api/delete/id", deprecated=True, - description="This endpoint is deprecated and is no longer supported. " - "Use method DELETE /api/id/{id}/namespace/{namespace}") -async def delete_item_id_namespace_post(item_to_delete: PineconeItemToDelete): +@app.get("/api/desc/namespace/{namespace}/{token}", response_model=RepositoryDescNamespaceResult) +async def list_namespace_items_main(token: str, namespace: str): """ - Delete items from namespace given document id via POST. - :param item_to_delete: - :return: + Get description for given namespace + :param token + :param namespace: namespace_id + :return: description of namespace """ try: - metadata_id = item_to_delete.id - namespace = item_to_delete.namespace - logger.info(f"delete of id {metadata_id} dal namespace {namespace}") - result = await delete_id_from_namespace(metadata_id, namespace) + logger.info(f"retrieve description for namespace {namespace}") + engine_dec = decode_jwt(token) + repository_engine = RepositoryEngine(**engine_dec) - return JSONResponse(content={"success": True, "message": f"ids {metadata_id} in Namespace {namespace} deleted"}) + result = await get_desc_namespace(repository_engine, namespace) + + return JSONResponse(content=result.model_dump(exclude_none=True)) except Exception as ex: - return JSONResponse(content={"success": True, "message": f"ids {metadata_id} in Namespace {namespace} not deleted due to {repr(ex)}"}) - # raise HTTPException(status_code=400, detail=repr(ex)) + logger.error(ex) + raise HTTPException(status_code=400, detail=repr(ex)) -@app.get("/api/id/{metadata_id}/namespace/{namespace}", response_model=PineconeItems) -async def get_items_id_namespace_main(metadata_id: str, namespace: str): +@app.get("/api/listitems/namespace/{namespace}/{token}", response_model=RepositoryItems) +async def list_namespace_items_main(token: str, namespace: str): """ - Get all items from namespace given id of document - :param metadata_id: - :param namespace: - :return: + Get all item with given namespace + :param token + :param namespace: namespace_id + :return: list of all item into namespace """ try: - logger.info(f"retrieve id {metadata_id} dal namespace {namespace}") - result = await get_ids_namespace(metadata_id, namespace) + logger.info(f"retrieve namespace {namespace}") + engine_dec = decode_jwt(token) + repository_engine = RepositoryEngine(**engine_dec) - return JSONResponse(content=result.model_dump()) + result = await get_listitems_namespace(repository_engine, namespace) + + return JSONResponse(content=result.model_dump(exclude_none=True)) except Exception as ex: logger.error(ex) raise HTTPException(status_code=400, detail=repr(ex)) -@app.get("/api/items", response_model=PineconeItems)#?source={source}&namespace={namespace} -async def get_items_source_namespace_main(source: str, namespace: str): + + +@app.get("/api/items", 
response_model=RepositoryItems)#?source={source}&namespace={namespace}&token={token} +async def get_items_source_namespace_main(source: str, namespace: str, token: str): """ Get all item given the source and namespace :param source: source of document :param namespace: namespace id + :param token: :return: list of all item """ try: logger.info(f"retrieve source: {source}, namespace: {namespace}") - + engine_dec = decode_jwt(token) + repository_engine = RepositoryEngine(**engine_dec) from urllib.parse import unquote source = unquote(source) - result = await get_sources_namespace(source, namespace) + result = await get_sources_namespace(repository_engine, source, namespace) return JSONResponse(content=result.model_dump()) except Exception as ex: @@ -623,6 +659,81 @@ async def get_items_source_namespace_main(source: str, namespace: str): raise HTTPException(status_code=400, detail=repr(ex) ) +@app.delete("/api/namespace/{namespace}/{token}") +async def delete_namespace_main(token: str, namespace: str): + """ + Delete namespace from index + :param token + :param namespace: + :return: + """ + try: + engine_dec = decode_jwt(token) + engine = Engine(**engine_dec["engine"]) + + namespace_to_delete = RepositoryNamespace(namespace=namespace, engine=engine) + + result = await delete_namespace(namespace_to_delete) + return JSONResponse(content={"message": f"Namespace {namespace} deleted"}) + except Exception as ex: + raise HTTPException(status_code=400, detail=repr(ex)) + + +@app.delete("/api/chunk/{chunk_id}/namespace/{namespace}/{token}") +async def delete_item_chunk_id_namespace_main(token: str, chunk_id: str, namespace: str): + """ + Delete items from namespace identified by id and namespace + :param token + :param chunk_id: + :param namespace: + :return: + """ + try: + + logger.info(f"delete id {chunk_id} dal namespace {namespace}") + engine_dec = decode_jwt(token) + repository_engine = RepositoryEngine(**engine_dec) + + result = await delete_chunk_id_from_namespace(repository_engine, chunk_id, namespace) + + return JSONResponse(content={"message": f"ids {chunk_id} in Namespace {namespace} deleted"}) + except Exception as ex: + print(repr(ex)) + logger.error(ex) + raise HTTPException(status_code=400, detail=repr(ex)) + + +@app.delete("/api/id/{metadata_id}/namespace/{namespace}/{token}") +async def delete_item_id_namespace_main(token: str, metadata_id: str, namespace: str): + """ + Delete items from namespace identified by id and namespace + :param token + :param metadata_id: + :param namespace: + :return: + """ + try: + logger.info(f"delete id {metadata_id} dal namespace {namespace}") + engine_dec = decode_jwt(token) + engine = Engine(**engine_dec["engine"]) + item_to_delete = RepositoryItem(id=metadata_id, + namespace=namespace, + engine=engine + ) + #repository_engine = RepositoryEngine(**engine_dec) + result = await delete_id_from_namespace(item_to_delete, metadata_id, namespace) + + return JSONResponse(content={"message": f"ids {metadata_id} in Namespace {namespace} deleted"}) + except Exception as ex: + logger.error(ex) + raise HTTPException(status_code=400, detail=repr(ex)) + + + + + + + @app.get("/") async def get_root_endpoint(): return "Hello from Tiledesk LLM python server!!" 
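The repository endpoints above now receive their engine configuration through a `{token}` path parameter that `decode_jwt` unpacks before building a `RepositoryEngine` or `Engine` model. The diff adds `PyJWT = "2.9.0"` but does not show how such a token is produced, so the following is only a minimal client-side sketch: it assumes an HS256-signed JWT whose payload carries the same `engine` object shown in the README examples, and the secret, algorithm, and sample field values are assumptions rather than values taken from this code.

```python
# Hypothetical helper for building the {token} path parameter consumed by decode_jwt().
# Assumptions: HS256 signing, a shared secret known to the tilellm server, and a payload
# shaped like {"engine": {...}} as suggested by Engine(**engine_dec["engine"]) above.
import jwt  # PyJWT, added to pyproject.toml in this change

SECRET_KEY = "replace-with-the-server-side-secret"  # assumption, not defined in this diff


def build_repository_token(apikey: str, index_name: str) -> str:
    payload = {
        "engine": {
            "name": "pinecone",       # assumption: vector-store name expected by the repository
            "type": "serverless",     # assumption
            "apikey": apikey,
            "vector_size": 1024,
            "index_name": index_name,
        }
    }
    # PyJWT 2.x returns the encoded token as a str
    return jwt.encode(payload, SECRET_KEY, algorithm="HS256")


if __name__ == "__main__":
    token = build_repository_token("pc-api-key", "tilellm-index")
    # e.g. GET /api/listitems/namespace/{namespace}/{token}
    print(f"/api/listitems/namespace/my-namespace/{token}")
```

If `decode_jwt` expects a different algorithm or claim layout, only the `payload` and `jwt.encode` arguments in this sketch change; the token is simply appended as the last path segment of the new routes.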
diff --git a/tilellm/controller/controller.py b/tilellm/controller/controller.py index 581b1fc..e63f9ce 100644 --- a/tilellm/controller/controller.py +++ b/tilellm/controller/controller.py @@ -2,36 +2,45 @@ from typing import List import fastapi + + from langchain.chains import ConversationalRetrievalChain, LLMChain # Deprecata -from langchain.retrievers import ContextualCompressionRetriever -from langchain.retrievers.document_compressors import DocumentCompressorPipeline -from langchain_community.document_transformers import EmbeddingsRedundantFilter + from langchain_core.documents import Document -from langchain_core.prompts import PromptTemplate, SystemMessagePromptTemplate -from langchain_core.runnables import RunnablePassthrough +from langchain_core.prompts import PromptTemplate #, SystemMessagePromptTemplate + from langchain_openai import ChatOpenAI -# from tilellm.store.pinecone_repository import add_pc_item as pinecone_add_item -# from tilellm.store.pinecone_repository import create_pc_index, get_embeddings_dimension from langchain_openai import OpenAIEmbeddings from langchain_community.callbacks.openai_info import OpenAICallbackHandler -from pydantic.v1 import BaseModel, Field -from tilellm.models.item_model import RetrievalResult, ChatEntry, PineconeIndexingResult, PineconeNamespaceResult, \ - PineconeDescNamespaceResult, PineconeItems, SimpleAnswer, QuotedAnswer -from tilellm.shared.utility import inject_repo, inject_llm, inject_llm_o1 -import tilellm.shared.const as const -# from tilellm.store.pinecone_repository_base import PineconeRepositoryBase +from tilellm.controller.controller_utils import preprocess_chat_history, initialize_embeddings_and_index, \ + fetch_question_vectors, perform_hybrid_search, retrieve_documents, create_chains, get_or_create_session_history, \ + generate_answer_with_history, format_result, handle_exception, initialize_retrievers +from tilellm.models.item_model import (RetrievalResult, + ChatEntry, + IndexingResult, + RepositoryNamespaceResult, + RepositoryDescNamespaceResult, + RepositoryItems, + SimpleAnswer, + RepositoryItem, + RepositoryNamespace, + RepositoryEngine, + QuestionToAgent, + QuestionToLLM) +# from tilellm.shared.sparse_util import hybrid_score_norm, HybridRetriever + +from tilellm.shared.utility import inject_repo, inject_llm, inject_llm_o1, inject_llm_chat +# import tilellm.shared.const as const + -from langchain.chains import create_history_aware_retriever from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_core.runnables.history import RunnableWithMessageHistory from langchain.chains import create_retrieval_chain -from langchain.chains.combine_documents import create_stuff_documents_chain + from langchain_community.chat_message_histories import ChatMessageHistory from langchain_core.chat_history import BaseChatMessageHistory -from langchain_community.agent_toolkits.load_tools import load_tools -from langchain.agents import AgentType, initialize_agent from tilellm.agents.shopify_agent import lookup as shopify_lookup_agent from langchain.schema import( @@ -45,150 +54,58 @@ logger = logging.getLogger(__name__) - @inject_repo -async def ask_with_memory1(question_answer, repo=None): +@inject_llm_chat +async def ask_hybrid_with_memory(question_answer, repo=None, llm=None, callback_handler=None, llm_embeddings=None): try: logger.info(question_answer) - # question = str - # namespace: str - # gptkey: str - # model: str =Field(default="gpt-3.5-turbo") - # temperature: float = Field(default=0.0) - # top_k: 
int = Field(default=5) - # max_tokens: int = Field(default=128) - # system_context: Optional[str] - # chat_history_dict : Dict[str, ChatEntry] - - question_answer_list = [] - if question_answer.chat_history_dict is not None: - for key, entry in question_answer.chat_history_dict.items(): - question_answer_list.append((entry.question, entry.answer)) - - logger.info(question_answer_list) - openai_callback_handler = OpenAICallbackHandler() - - llm = ChatOpenAI(model_name=question_answer.model, - temperature=question_answer.temperature, - openai_api_key=question_answer.gptkey, - max_tokens=question_answer.max_tokens, - callbacks=[openai_callback_handler]) - emb_dimension = repo.get_embeddings_dimension(question_answer.embedding) - oai_embeddings = OpenAIEmbeddings(api_key=question_answer.gptkey, model=question_answer.embedding) + # Preprocess chat history + chat_history_list, question_answer_list = preprocess_chat_history(question_answer) - vector_store = await repo.create_pc_index(oai_embeddings, emb_dimension) + # Initialize embeddings and encoders + emb_dimension, sparse_encoder, index = await initialize_embeddings_and_index(question_answer, repo, + llm_embeddings) + # Fetch vectors for the given question + dense_vector, sparse_vector = await fetch_question_vectors(question_answer, sparse_encoder, llm_embeddings) - retriever = vector_store.as_retriever(search_type='similarity', - search_kwargs={'k': question_answer.top_k, - 'namespace': question_answer.namespace} - ) - # Query on store for relevant document, returned by callback - # mydocs = retriever.get_relevant_documents( question_answer.question) - # from pprint import pprint - # pprint(len(mydocs)) + # Perform hybrid search + results = perform_hybrid_search(question_answer, index, dense_vector, sparse_vector) - if question_answer.system_context is not None and question_answer.system_context: - print("blocco if") - from langchain.chains import LLMChain - - # prompt_template = "Tell me a {adjective} joke" - # prompt = PromptTemplate( - # input_variables=["adjective"], template=prompt_template - # ) - # llm = LLMChain(llm=OpenAI(), prompt=prompt) - sys_template = """{system_context}. - - {context} - """ - - sys_prompt = PromptTemplate.from_template(sys_template) - - # llm_chain = LLMChain(llm=llm, prompt=prompt) - crc = ConversationalRetrievalChain.from_llm( - llm=llm, - retriever=retriever, - return_source_documents=True, - verbose=True, - combine_docs_chain_kwargs={"prompt": sys_prompt} - ) - # from pprint import pprint - # pprint(crc.combine_docs_chain.llm_chain.prompt.messages) - # crc.combine_docs_chain.llm_chain.prompt.messages[0]=SystemMessagePromptTemplate.from_template(sys_prompt) - - result = crc.invoke({'question': question_answer.question, - 'system_context': question_answer.system_context, - 'chat_history': question_answer_list} - ) - - else: - print("blocco else") - # PromptTemplate.from_template() - crc = ConversationalRetrievalChain.from_llm(llm=llm, - retriever=retriever, - return_source_documents=True, - verbose=True) - - # 'Use the following pieces of context to answer the user\'s question. 
If you don\'t know the answer, just say that you don\'t know, don\'t try to make up an answer.', - result = crc.invoke({'question': question_answer.question, - 'chat_history': question_answer_list} - ) - - docs = result["source_documents"] - from pprint import pprint - pprint(result) - - ids = [] - sources = [] - for doc in docs: - ids.append(doc.metadata['id']) - sources.append(doc.metadata['source']) + # Retrieve documents based on search results + retriever = retrieve_documents(question_answer, results) - ids = list(set(ids)) - sources = list(set(sources)) - source = " ".join(sources) - metadata_id = ids[0] + # Create chains for contextualization and Q&A + history_aware_retriever, question_answer_chain, qa_prompt = create_chains(llm, question_answer, retriever) - logger.info(result) + rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain) - question_answer_list.append((result['question'], result['answer'])) + # Load session history and prepare conversational chain + store = {} + get_session_history = lambda session_id: get_or_create_session_history(store, session_id, + question_answer.chat_history_dict) - chat_entries = [ChatEntry(question=q, answer=a) for q, a in question_answer_list] - chat_history_dict = {str(i): entry for i, entry in enumerate(chat_entries)} + # Generate the final answer, with or without citations + result, citations, success = await generate_answer_with_history(llm=llm, + question_answer=question_answer, + rag_chain = rag_chain, + retriever = retriever, + get_session_history = get_session_history, + qa_prompt=qa_prompt) - success = bool(openai_callback_handler.successful_requests) - prompt_token_size = openai_callback_handler.total_tokens + question_answer_list.append((result['input'], result['answer'])) - result_to_return = RetrievalResult( - answer=result['answer'], - namespace=question_answer.namespace, - sources=sources, - ids=ids, - source=source, - id=metadata_id, - prompt_token_size=prompt_token_size, - success=success, - error_message=None, - chat_history_dict=chat_history_dict - ) + result_to_return = format_result(result=result, + citations=citations, + question_answer=question_answer, + callback_handler=callback_handler, + question_answer_list=question_answer_list, + success = success) - return result_to_return.dict() + return result_to_return except Exception as e: - import traceback - traceback.print_exc() - question_answer_list = [] - if question_answer.chat_history_dict is not None: - for key, entry in question_answer.chat_history_dict.items(): - question_answer_list.append((entry.question, entry.answer)) - chat_entries = [ChatEntry(question=q, answer=a) for q, a in question_answer_list] - chat_history_dict = {str(i): entry for i, entry in enumerate(chat_entries)} + return handle_exception(e, question_answer) - result_to_return = RetrievalResult( - namespace=question_answer.namespace, - error_message=repr(e), - chat_history_dict=chat_history_dict - ) - raise fastapi.exceptions.HTTPException(status_code=400, detail=result_to_return.model_dump()) @inject_llm_o1 async def ask_to_llm_o1(question, chat_model=None): @@ -213,7 +130,6 @@ async def ask_to_llm_o1(question, chat_model=None): except Exception as e: import traceback traceback.print_exc() - question_answer_list = [] result_to_return = SimpleAnswer(answer=repr(e), chat_history_dict={}) @@ -221,7 +137,7 @@ async def ask_to_llm_o1(question, chat_model=None): @inject_llm -async def ask_to_llm(question, chat_model=None): +async def ask_to_llm(question: QuestionToLLM, 
chat_model=None): try: logger.info(question) chat_history_list = [] @@ -231,8 +147,7 @@ async def ask_to_llm(question, chat_model=None): chat_history_list.append(HumanMessage(content=entry.question)) # ('human', entry.question)) chat_history_list.append(AIMessage(content=entry.answer)) - # from pprint import pprint - # pprint(chat_history_list) + qa_prompt = ChatPromptTemplate.from_messages( [ @@ -243,11 +158,10 @@ async def ask_to_llm(question, chat_model=None): ) store = {} + get_session_history = lambda session_id: get_or_create_session_history(store, session_id, + question.chat_history_dict) + - def get_session_history(session_id: str) -> BaseChatMessageHistory: - if session_id not in store: - store[session_id] = load_session_history(question.chat_history_dict) #ChatMessageHistory() - return store[session_id] runnable = qa_prompt | chat_model @@ -260,7 +174,7 @@ def get_session_history(session_id: str) -> BaseChatMessageHistory: ) result = await runnable_with_history.ainvoke( - {"input": question.question},# 'chat_history_a': chat_history_list}, + {"input": question.question}, # 'chat_history_a': chat_history_list, config={"configurable": {"session_id": uuid.uuid4().hex} }, ) @@ -276,7 +190,6 @@ def get_session_history(session_id: str) -> BaseChatMessageHistory: except Exception as e: import traceback traceback.print_exc() - question_answer_list = [] result_to_return = SimpleAnswer(answer=repr(e), chat_history_dict={}) @@ -284,226 +197,54 @@ def get_session_history(session_id: str) -> BaseChatMessageHistory: @inject_repo -async def ask_with_memory(question_answer, repo=None) -> RetrievalResult: +@inject_llm_chat +async def ask_with_memory(question_answer, repo=None, llm=None, callback_handler=None, llm_embeddings=None) -> RetrievalResult: try: - logger.info(question_answer) - # question = str - # namespace: str - # gptkey: str - # model: str =Field(default="gpt-3.5-turbo") - # temperature: float = Field(default=0.0) - # top_k: int = Field(default=5) - # max_tokens: int = Field(default=128) - # system_context: Optional[str] - # chat_history_dict : Dict[str, ChatEntry] - - question_answer_list = [] - chat_history_list = [] - if question_answer.chat_history_dict is not None: - for key, entry in question_answer.chat_history_dict.items(): - chat_history_list.append(HumanMessage(content=entry.question)) # ('human', entry.question)) - chat_history_list.append(AIMessage(content=entry.answer)) - - question_answer_list.append((entry.question, entry.answer)) - - openai_callback_handler = OpenAICallbackHandler() - llm = ChatOpenAI(model_name=question_answer.model, - temperature=question_answer.temperature, - openai_api_key=question_answer.gptkey, - max_tokens=question_answer.max_tokens, - callbacks=[openai_callback_handler]) - - emb_dimension = repo.get_embeddings_dimension(question_answer.embedding) - oai_embeddings = OpenAIEmbeddings(api_key=question_answer.gptkey, model=question_answer.embedding) - - vector_store = await repo.create_pc_index(oai_embeddings, emb_dimension) - - vs_retriever = vector_store.as_retriever(search_type=question_answer.search_type, - search_kwargs={'k': question_answer.top_k, - 'namespace': question_answer.namespace} - ) - - redundant_filter = EmbeddingsRedundantFilter(embeddings=oai_embeddings, - similarity_threshold=question_answer.similarity_threshold) - pipeline_compressor = DocumentCompressorPipeline( - transformers=[redundant_filter] - ) - retriever = ContextualCompressionRetriever( - base_compressor=pipeline_compressor, base_retriever=vs_retriever - ) - - # 
Contextualize question - contextualize_q_system_prompt = const.contextualize_q_system_prompt - contextualize_q_prompt = ChatPromptTemplate.from_messages( - [ - ("system", contextualize_q_system_prompt), - MessagesPlaceholder("chat_history"), - ("human", "{input}"), - ] - ) - history_aware_retriever = create_history_aware_retriever( - llm, retriever, contextualize_q_prompt - ) + logger.info(question_answer) - if question_answer.system_context is not None and question_answer.system_context: - # Answer question - prompt from user - qa_system_prompt = question_answer.system_context - else: - # Answer question - prompt default - qa_system_prompt = const.qa_system_prompt + # Preprocess chat history + chat_history_list, question_answer_list = preprocess_chat_history(question_answer) - qa_prompt = ChatPromptTemplate.from_messages( - [ - ("system", qa_system_prompt), - MessagesPlaceholder("chat_history"), - ("human", "{input}"), - ] - ) + # Initialize embeddings and retrievers + retriever = await initialize_retrievers(question_answer, repo, llm_embeddings) - question_answer_chain = create_stuff_documents_chain(llm, qa_prompt) + # Create chains for contextualization and Q&A + history_aware_retriever, question_answer_chain, qa_prompt = create_chains(llm, question_answer, retriever) rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain) + # Load session history and prepare conversational chain store = {} - - def get_session_history(session_id: str) -> BaseChatMessageHistory: - if session_id not in store: - store[session_id] = load_session_history(question_answer.chat_history_dict) - return store[session_id] - - - if question_answer.citations: - - rag_chain_from_docs = ( - RunnablePassthrough.assign(context=(lambda x: format_docs_with_id(x["context"]))) - | qa_prompt - | llm.with_structured_output(QuotedAnswer) - ) - - retrieve_docs = (lambda x: x["input"]) | retriever - - chain_w_citations = RunnablePassthrough.assign(context=retrieve_docs).assign( - answer=rag_chain_from_docs - ).assign(only_answer=lambda text: text["answer"].answer) - - conversational_rag_chain = RunnableWithMessageHistory( - chain_w_citations, - get_session_history, - input_messages_key="input", - history_messages_key="chat_history", - output_messages_key="only_answer", - - ) - - result = conversational_rag_chain.invoke( - {"input": question_answer.question }, # 'chat_history': chat_history_list}, - config={"configurable": {"session_id": uuid.uuid4().hex} - } # constructs a key "abc123" in `store`. - ) - - # print(result.keys()) - # from pprint import pprint - # print(f"===== {result['only_ans']} =====") - citations = result['answer'].citations - result['answer'], success = verify_answer(result['answer'].answer) - - - else: - conversational_rag_chain = RunnableWithMessageHistory( - rag_chain, - get_session_history, - input_messages_key="input", - history_messages_key="chat_history", - output_messages_key="answer", - ) - - result = conversational_rag_chain.invoke( - {"input": question_answer.question, }, # 'chat_history': chat_history_list}, - config={"configurable": {"session_id": uuid.uuid4().hex} - } # constructs a key "abc123" in `store`. 
- ) - result['answer'], success = verify_answer(result['answer']) - citations = None - - - docs = result["context"] - # from pprint import pprint - # pprint(docs) - - ids = [] - sources = [] - content_chunks = None - if question_answer.debug: - content_chunks = [] - for doc in docs: - ids.append(doc.metadata['id']) - sources.append(doc.metadata['source']) - content_chunks.append(doc.page_content) - else: - for doc in docs: - ids.append(doc.metadata['id']) - sources.append(doc.metadata['source']) - - ids = list(set(ids)) - sources = list(set(sources)) - - if question_answer.citations: - source = " ".join(set([cit.source_name for cit in citations])) - else: - source = " ".join(sources) - - metadata_id = ids[0] - - logger.info(f"input: {result['input']}") - logger.info(f"chat_history: {result['chat_history']}") - logger.info(f"answer: {result['answer']}") + get_session_history = lambda session_id: get_or_create_session_history(store, session_id, + question_answer.chat_history_dict) + # Generate the final answer, with or without citations + result, citations, success = await generate_answer_with_history(llm=llm, + question_answer=question_answer, + rag_chain=rag_chain, + retriever=retriever, + get_session_history=get_session_history, + qa_prompt=qa_prompt) question_answer_list.append((result['input'], result['answer'])) - chat_entries = [ChatEntry(question=q, answer=a) for q, a in question_answer_list] - chat_history_dict = {str(i): entry for i, entry in enumerate(chat_entries)} + result_to_return = format_result(result=result, + citations=citations, + question_answer=question_answer, + callback_handler=callback_handler, + question_answer_list=question_answer_list, + success=success) - # success = bool(openai_callback_handler.successful_requests) - prompt_token_size = openai_callback_handler.total_tokens - - result_to_return = RetrievalResult( - answer=result['answer'], - namespace=question_answer.namespace, - sources=sources, - ids=ids, - source=source, - id=metadata_id, - citations = citations, - prompt_token_size=prompt_token_size, - content_chunks=content_chunks, - success=success, - error_message=None, - chat_history_dict=chat_history_dict - ) return result_to_return except Exception as e: - import traceback - traceback.print_exc() - question_answer_list = [] - if question_answer.chat_history_dict is not None: - for key, entry in question_answer.chat_history_dict.items(): - question_answer_list.append((entry.question, entry.answer)) - chat_entries = [ChatEntry(question=q, answer=a) for q, a in question_answer_list] - chat_history_dict = {str(i): entry for i, entry in enumerate(chat_entries)} + return handle_exception(e, question_answer) - result_to_return = RetrievalResult( - namespace=question_answer.namespace, - error_message=repr(e), - chat_history_dict=chat_history_dict - ) - raise fastapi.exceptions.HTTPException(status_code=400, detail=result_to_return.model_dump()) @inject_llm -async def ask_to_agent(question_to_agent, chat_model=None): +async def ask_to_agent(question_to_agent: QuestionToAgent, chat_model=None): try: logger.info(question_to_agent) #chat_history_list = [] @@ -519,7 +260,7 @@ async def ask_to_agent(question_to_agent, chat_model=None): result_history = "" if question_to_agent.chat_history_dict is not None: #for key, entry in question_to_agent.chat_history_dict.items(): - # chat_history_list.append(HumanMessage(content=entry.question)) # ('human', entry.question)) + # chat_history_list.append(HumanMessage(content=entry.question)) # ('human', entry.question) # 
chat_history_list.append(AIMessage(content=entry.answer)) # "chat_history": "Human: My name is Bob\\nAI: Hello Bob!", @@ -549,7 +290,7 @@ async def ask_to_agent(question_to_agent, chat_model=None): # return store[session_id] result_shopify = shopify_lookup_agent(question_to_agent=question_to_agent, chat_model=chat_model, chat_history=result_history) - print(f"RESULT: {result_shopify.get('output')} type: {type(result_shopify.get('output'))}") + # print(f"RESULT: {result_shopify.get('output')} type: {type(result_shopify.get('output'))}") if not question_to_agent.chat_history_dict: @@ -559,13 +300,12 @@ async def ask_to_agent(question_to_agent, chat_model=None): question_to_agent.chat_history_dict[str(num)] = dict({"question": question_to_agent.question, "answer": result_shopify.get("output")}) answer_to_agent = SimpleAnswer(answer=result_shopify.get("output"), chat_history_dict=question_to_agent.chat_history_dict) - print(answer_to_agent) + # print(answer_to_agent) return answer_to_agent except Exception as e: import traceback traceback.print_exc() - question_answer_list = [] result_to_return = SimpleAnswer(answer=repr(e), chat_history_dict={}) @@ -576,15 +316,6 @@ async def ask_to_agent(question_to_agent, chat_model=None): async def ask_with_sequence(question_answer, repo=None) -> RetrievalResult: try: logger.info(question_answer) - # question = str - # namespace: str - # gptkey: str - # model: str =Field(default="gpt-3.5-turbo") - # temperature: float = Field(default=0.0) - # top_k: int = Field(default=5) - # max_tokens: int = Field(default=128) - # system_context: Optional[str] - # chat_history_dict : Dict[str, ChatEntry] question_answer_list = [] if question_answer.chat_history_dict is not None: @@ -616,13 +347,7 @@ async def ask_with_sequence(question_answer, repo=None) -> RetrievalResult: # pprint(len(mydocs)) if question_answer.system_context is not None and question_answer.system_context: - from langchain.chains import LLMChain - # prompt_template = "Tell me a {adjective} joke" - # prompt = PromptTemplate( - # input_variables=["adjective"], template=prompt_template - # ) - # llm = LLMChain(llm=OpenAI(), prompt=prompt) sys_template = """{system_context}. 
{context} @@ -630,16 +355,13 @@ async def ask_with_sequence(question_answer, repo=None) -> RetrievalResult: sys_prompt = PromptTemplate.from_template(sys_template) - # llm_chain = LLMChain(llm=llm, prompt=prompt) + crc = ConversationalRetrievalChain.from_llm( llm=llm, retriever=retriever, return_source_documents=True, combine_docs_chain_kwargs={"prompt": sys_prompt} ) - # from pprint import pprint - # pprint(crc.combine_docs_chain.llm_chain.prompt.messages) - # crc.combine_docs_chain.llm_chain.prompt.messages[0] = SystemMessagePromptTemplate.from_template(sys_prompt) result = crc.invoke({'question': question_answer.question, 'system_context': question_answer.system_context, 'chat_history': question_answer_list}) @@ -663,7 +385,7 @@ async def ask_with_sequence(question_answer, repo=None) -> RetrievalResult: ids = list(set(ids)) sources = list(set(sources)) source = " ".join(sources) - id = ids[0] + meta_id = ids[0] logger.info(result) @@ -681,7 +403,7 @@ async def ask_with_sequence(question_answer, repo=None) -> RetrievalResult: sources=sources, ids=ids, source=source, - id=id, + id=meta_id, prompt_token_size=prompt_token_size, success=success, error_message=None, @@ -710,7 +432,7 @@ async def ask_with_sequence(question_answer, repo=None) -> RetrievalResult: @inject_repo -async def add_pc_item(item, repo=None) -> PineconeIndexingResult: +async def add_pc_item(item, repo=None) -> IndexingResult: """ Add items to namespace :type repo: PineconeRepositoryBase @@ -723,117 +445,146 @@ async def add_pc_item(item, repo=None) -> PineconeIndexingResult: @inject_repo -async def delete_namespace(namespace: str, repo=None): +async def add_pc_item_hybrid(item, repo=None) -> IndexingResult: + """ + + :return: + """ + return await repo.add_pc_item_hybrid(item) + + +@inject_repo +async def delete_namespace(namespace_to_delete: RepositoryNamespace, repo=None): """ Delete Namespace from index - :param namespace: + :param namespace_to_delete: :param repo: :return: """ - # from tilellm.store.pinecone_repository import delete_pc_namespace + try: - return await repo.delete_pc_namespace(namespace) + return await repo.delete_pc_namespace(namespace_to_delete) except Exception as ex: raise ex @inject_repo -async def delete_id_from_namespace(metadata_id: str, namespace: str, repo=None): +async def delete_id_from_namespace(item_to_delete: RepositoryItem, metadata_id: str, namespace: str, repo=None): """ Delete items from namespace + :param item_to_delete: RepositoryItemToDelete :param metadata_id: :param namespace: :param repo: :return: """ - # from tilellm.store.pinecone_repository import delete_pc_ids_namespace # , delete_pc_ids_namespace1 + try: - return await repo.delete_pc_ids_namespace(metadata_id=metadata_id, namespace=namespace) + return await repo.delete_pc_ids_namespace(engine=item_to_delete.engine, metadata_id=metadata_id, namespace=namespace) except Exception as ex: logger.error(ex) raise ex @inject_repo -async def delete_chunk_id_from_namespace(chunk_id:str, namespace: str, repo=None): +async def delete_chunk_id_from_namespace(repository_engine: RepositoryEngine, chunk_id:str, namespace: str, repo=None): """ Delete chunk by id from namespace + :param repository_engine: RepositoryEngine, :param chunk_id: :param namespace: :param repo: :return: """ try: - return await repo.delete_pc_chunk_id_namespace(chunk_id=chunk_id, namespace=namespace) + return await repo.delete_pc_chunk_id_namespace(engine=repository_engine.engine, + chunk_id=chunk_id, + namespace=namespace) except Exception as ex: logger.error(ex) 
raise ex @inject_repo -async def get_list_namespace(repo=None) -> PineconeNamespaceResult: +async def get_list_namespace(repository_engine: RepositoryEngine, repo=None) -> RepositoryNamespaceResult: """ Get list namespaces with namespace id and vector count + :param repository_engine: RepositoryEngine :param repo: :return: list of all namespaces in index """ # from tilellm.store.pinecone_repository import pinecone_list_namespaces try: - return await repo.pinecone_list_namespaces() + return await repo.pinecone_list_namespaces(engine=repository_engine.engine) except Exception as ex: raise ex @inject_repo -async def get_ids_namespace(metadata_id: str, namespace: str, repo=None) -> PineconeItems: +async def get_ids_namespace(repository_engine: RepositoryEngine, metadata_id: str, namespace: str, repo=None) -> RepositoryItems: """ Get all items from namespace given id + :param repository_engine: RepositoryEngine :param metadata_id: :param namespace: :param repo: :return: """ - # from tilellm.store.pinecone_repository import get_pc_ids_namespace try: - return await repo.get_pc_ids_namespace(metadata_id=metadata_id, namespace=namespace) + return await repo.get_pc_ids_namespace(engine=repository_engine.engine, + metadata_id=metadata_id, + namespace=namespace) except Exception as ex: raise ex @inject_repo -async def get_listitems_namespace(namespace: str, repo=None) -> PineconeItems: +async def get_listitems_namespace(repository_engine: RepositoryEngine, namespace: str, repo=None) -> RepositoryItems: """ Get all items from given namespace + :param repository_engine: RepositoryEngine :param namespace: namespace_id :param repo: :return: list of al items PineconeItems """ - # from tilellm.store.pinecone_repository import get_pc_all_obj_namespace + try: - return await repo.get_pc_all_obj_namespace(namespace=namespace) + return await repo.get_pc_all_obj_namespace(engine=repository_engine.engine, + namespace=namespace) except Exception as ex: raise ex @inject_repo -async def get_desc_namespace(namespace: str, repo=None) -> PineconeDescNamespaceResult: +async def get_desc_namespace(repository_engine: RepositoryEngine, namespace: str, repo=None) -> RepositoryDescNamespaceResult: + """ + Desc of Namespace + :param repository_engine: + :param namespace: + :param repo: + :return: + """ try: - return await repo.get_pc_desc_namespace(namespace=namespace) + return await repo.get_pc_desc_namespace(engine=repository_engine.engine, + namespace=namespace) except Exception as ex: raise ex @inject_repo -async def get_sources_namespace(source: str, namespace: str, repo=None) -> PineconeItems: +async def get_sources_namespace(repository_engine: RepositoryEngine, source: str, namespace: str, repo=None) -> RepositoryItems: """ Get all item from namespace given source + :param repository_engine: RepositoryEngine, :param source: :param namespace: :param repo: :return: """ - # from tilellm.store.pinecone_repository import get_pc_sources_namespace + try: - return await repo.get_pc_sources_namespace(source=source, namespace=namespace) + return await repo.get_pc_sources_namespace(engine=repository_engine.engine, + source=source, + namespace=namespace) except Exception as ex: raise ex diff --git a/tilellm/controller/controller_utils.py b/tilellm/controller/controller_utils.py new file mode 100644 index 0000000..4f254da --- /dev/null +++ b/tilellm/controller/controller_utils.py @@ -0,0 +1,289 @@ +import traceback +import uuid +from typing import List + +import fastapi + +from langchain_core.documents import Document + +from 
langchain_core.runnables import RunnablePassthrough + +from tilellm.models.item_model import (RetrievalResult, + ChatEntry, + QuotedAnswer + ) + +from tilellm.shared.sparse_util import hybrid_score_norm, HybridRetriever +from langchain_community.document_transformers import EmbeddingsRedundantFilter +from langchain.retrievers.document_compressors import DocumentCompressorPipeline +from langchain.retrievers import ContextualCompressionRetriever + +import tilellm.shared.const as const + +from langchain.chains import create_history_aware_retriever +from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder +from langchain_core.runnables.history import RunnableWithMessageHistory +from langchain.chains.combine_documents import create_stuff_documents_chain +from langchain_community.chat_message_histories import ChatMessageHistory +from langchain_core.chat_history import BaseChatMessageHistory + +from langchain.schema import( + AIMessage, + HumanMessage, + SystemMessage + +) + +from tilellm.tools.sparse_encoders import TiledeskSparseEncoders + +import logging + +logger = logging.getLogger(__name__) + +# Function to preprocess chat history +def preprocess_chat_history(question_answer): + logger.debug(question_answer.chat_history_dict) + question_answer_list = [] + chat_history_list = [] + if question_answer.chat_history_dict is not None: + for key, entry in question_answer.chat_history_dict.items(): + chat_history_list.append(HumanMessage(content=entry.question)) + chat_history_list.append(AIMessage(content=entry.answer)) + question_answer_list.append((entry.question, entry.answer)) + return chat_history_list, question_answer_list + + +# Function to initialize embeddings and encoders +async def initialize_embeddings_and_index(question_answer, repo, llm_embeddings): + emb_dimension = repo.get_embeddings_dimension(question_answer.embedding) + sparse_encoder = TiledeskSparseEncoders(question_answer.sparse_encoder) + vector_store = await repo.create_pc_index(question_answer.engine, llm_embeddings, emb_dimension) + index = vector_store.get_pinecone_index(question_answer.engine.index_name, pinecone_api_key=question_answer.engine.apikey) + return emb_dimension, sparse_encoder, index + + +# Function to initialize embeddings and retrievers +async def initialize_retrievers(question_answer, repo, llm_embeddings): + emb_dimension = repo.get_embeddings_dimension(question_answer.embedding) + vector_store = await repo.create_pc_index(question_answer.engine, llm_embeddings, emb_dimension) + + vs_retriever = vector_store.as_retriever( + search_type=question_answer.search_type, + search_kwargs={'k': question_answer.top_k, 'namespace': question_answer.namespace} + ) + + redundant_filter = EmbeddingsRedundantFilter( + embeddings=llm_embeddings, + similarity_threshold=question_answer.similarity_threshold + ) + + pipeline_compressor = DocumentCompressorPipeline(transformers=[redundant_filter]) + + retriever = ContextualCompressionRetriever( + base_compressor=pipeline_compressor, base_retriever=vs_retriever + ) + + return retriever + + +# Function to fetch vectors for the given question +async def fetch_question_vectors(question_answer, sparse_encoder, llm_embeddings): + sparse_vector = sparse_encoder.encode_queries(question_answer.question) + dense_vector = await llm_embeddings.aembed_query(question_answer.question) + return dense_vector, sparse_vector + + +# Function to perform hybrid search +def perform_hybrid_search(question_answer, index, dense_vector, sparse_vector): + dense, sparse = 
hybrid_score_norm(dense_vector, sparse_vector, alpha=question_answer.alpha) + results = index.query( + top_k=question_answer.top_k, + vector=dense, + sparse_vector=sparse, + namespace=question_answer.namespace, + include_metadata=True + ) + return results + + +# Function to retrieve documents based on search results +def retrieve_documents(question_answer, results): + documents = [Document(page_content=match["metadata"]["text"], metadata=match["metadata"]) for match in results["matches"]] + retriever = HybridRetriever(documents=documents, k=question_answer.top_k) + return retriever + + +# Function to create chains for contextualization and Q&A +def create_chains(llm, question_answer, retriever): + # Contextualize question + contextualize_q_system_prompt = const.contextualize_q_system_prompt + contextualize_q_prompt = ChatPromptTemplate.from_messages( + [ + ("system", contextualize_q_system_prompt), + MessagesPlaceholder("chat_history"), + ("human", "{input}"), + ] + ) + history_aware_retriever = create_history_aware_retriever( + llm, retriever, contextualize_q_prompt + ) + qa_system_prompt = question_answer.system_context if question_answer.system_context else const.qa_system_prompt + qa_prompt = ChatPromptTemplate.from_messages( + [ + ("system", qa_system_prompt), + MessagesPlaceholder("chat_history"), + ("human", "{input}"), + ] + ) + question_answer_chain = create_stuff_documents_chain(llm, qa_prompt) + return history_aware_retriever, question_answer_chain, qa_prompt + + +# Function to get or create session history +def get_or_create_session_history(store, session_id, chat_history_dict): + if session_id not in store: + store[session_id] = load_session_history(chat_history_dict) + return store[session_id] + + +# Function to generate answer with chat history consideration +async def generate_answer_with_history(llm, question_answer, rag_chain, retriever, get_session_history, qa_prompt): + if question_answer.citations: + retrieve_docs = (lambda x: x["input"]) | retriever + rag_chain_from_docs = ( + RunnablePassthrough.assign(context=(lambda x: format_docs_with_id(x["context"]))) + | qa_prompt + | llm.with_structured_output(QuotedAnswer) + ) + chain_w_citations = RunnablePassthrough.assign(context=retrieve_docs).assign( + answer=rag_chain_from_docs + ).assign(only_answer=lambda text: text["answer"].answer) + conversational_rag_chain = RunnableWithMessageHistory( + chain_w_citations, + get_session_history, + input_messages_key="input", + history_messages_key="chat_history", + output_messages_key="only_answer", + ) + else: + conversational_rag_chain = RunnableWithMessageHistory( + rag_chain, + get_session_history, + input_messages_key="input", + history_messages_key="chat_history", + output_messages_key="answer", + ) + + result = conversational_rag_chain.invoke( + {"input": question_answer.question}, + config={"configurable": {"session_id": uuid.uuid4().hex}} + ) + + citations = result['answer'].citations if question_answer.citations else None + result['answer'], success = verify_answer(result['answer'].answer + if question_answer.citations else result['answer']) + return result, citations, success + + +# Function to format the result into the expected output structure +def format_result(result, citations, question_answer, callback_handler, question_answer_list, success): + docs = result["context"] + ids, sources, content_chunks = extract_ids_sources(docs, question_answer.debug) + source = format_sources(citations, sources, question_answer.citations) + metadata_id = ids[0] + + 
prompt_token_size = callback_handler.total_tokens + + logger.info(f"input: {result['input']}") + logger.info(f"chat_history: {result['chat_history']}") + logger.info(f"answer: {result['answer']}") + + chat_entries = [ChatEntry(question=q, answer=a) for q, a in question_answer_list] + chat_history_dict = {str(i): entry for i, entry in enumerate(chat_entries)} + + result_to_return = RetrievalResult( + answer=result['answer'], + namespace=question_answer.namespace, + sources=sources, + ids=ids, + source=source, + id=metadata_id, + citations=citations, + prompt_token_size=prompt_token_size, + content_chunks=content_chunks, + success=success, + error_message=None, + chat_history_dict=chat_history_dict + ) + return result_to_return + + +# Function to extract IDs and sources from documents +def extract_ids_sources(docs, debug): + ids = [] + sources = [] + content_chunks = None + if debug: + content_chunks = [] + for doc in docs: + ids.append(doc.metadata['id']) + sources.append(doc.metadata['source']) + content_chunks.append(doc.page_content) + else: + for doc in docs: + ids.append(doc.metadata['id']) + sources.append(doc.metadata['source']) + ids = list(set(ids)) + sources = list(set(sources)) + return ids, sources, content_chunks + + +# Function to format sources based on citations +def format_sources(citations, sources, with_citations): + if with_citations: + source = " ".join(set([cit.source_name for cit in citations])) + else: + source = " ".join(sources) + return source + + +# Function to handle exceptions +def handle_exception(e, question_answer): + traceback.print_exc() + question_answer_list = [] + if question_answer.chat_history_dict is not None: + for key, entry in question_answer.chat_history_dict.items(): + question_answer_list.append((entry.question, entry.answer)) + chat_entries = [ChatEntry(question=q, answer=a) for q, a in question_answer_list] + chat_history_dict = {str(i): entry for i, entry in enumerate(chat_entries)} + + result_to_return = RetrievalResult( + namespace=question_answer.namespace, + error_message=repr(e), + chat_history_dict=chat_history_dict + ) + raise fastapi.exceptions.HTTPException(status_code=400, detail=result_to_return.model_dump()) + +def load_session_history(history) -> BaseChatMessageHistory: + chat_history = ChatMessageHistory() + if history is not None: + for key, entry in history.items(): + chat_history.add_message(HumanMessage(content=entry.question)) # ('human', entry.question)) + chat_history.add_message(AIMessage(content=entry.answer)) + return chat_history + + +def format_docs_with_id(docs: List[Document]) -> str: + formatted = [ + f"Source ID: {i}\nArticle Source: {doc.metadata['source']}\nArticle Snippet: {doc.page_content}" + for i, doc in enumerate(docs) + ] + return "\n\n" + "\n\n".join(formatted) + +def verify_answer(s): + if s.endswith(""): + s = s[:-7] # Rimuove dalla fine della stringa + success = False + else: + success = True + return s, success \ No newline at end of file diff --git a/tilellm/controller/openai_controller.py b/tilellm/controller/openai_controller.py deleted file mode 100644 index 693fdaa..0000000 --- a/tilellm/controller/openai_controller.py +++ /dev/null @@ -1,658 +0,0 @@ -import uuid - -import fastapi -from langchain.chains import ConversationalRetrievalChain, LLMChain # Deprecata -from langchain_core.prompts import PromptTemplate, SystemMessagePromptTemplate -from langchain_openai import ChatOpenAI -# from tilellm.store.pinecone_repository import add_pc_item as pinecone_add_item -# from 
tilellm.store.pinecone_repository import create_pc_index, get_embeddings_dimension -from langchain_openai import OpenAIEmbeddings -from langchain_community.callbacks.openai_info import OpenAICallbackHandler -from tilellm.models.item_model import RetrievalResult, ChatEntry, PineconeIndexingResult, PineconeNamespaceResult, \ - PineconeDescNamespaceResult, PineconeItems -from tilellm.shared.utility import inject_repo -import tilellm.shared.const as const -# from tilellm.store.pinecone_repository_base import PineconeRepositoryBase - -from langchain.chains import create_history_aware_retriever -from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder -from langchain_core.runnables.history import RunnableWithMessageHistory -from langchain.chains import create_retrieval_chain -from langchain.chains.combine_documents import create_stuff_documents_chain -from langchain_community.chat_message_histories import ChatMessageHistory -from langchain_core.chat_history import BaseChatMessageHistory - - -import logging - - -logger = logging.getLogger(__name__) - - -@inject_repo -async def ask_with_memory1(question_answer, repo=None): - - try: - logger.info(question_answer) - # question = str - # namespace: str - # gptkey: str - # model: str =Field(default="gpt-3.5-turbo") - # temperature: float = Field(default=0.0) - # top_k: int = Field(default=5) - # max_tokens: int = Field(default=128) - # system_context: Optional[str] - # chat_history_dict : Dict[str, ChatEntry] - - question_answer_list = [] - if question_answer.chat_history_dict is not None: - for key, entry in question_answer.chat_history_dict.items(): - question_answer_list.append((entry.question, entry.answer)) - - logger.info(question_answer_list) - openai_callback_handler = OpenAICallbackHandler() - - llm = ChatOpenAI(model_name=question_answer.model, - temperature=question_answer.temperature, - openai_api_key=question_answer.gptkey, - max_tokens=question_answer.max_tokens, - callbacks=[openai_callback_handler]) - - emb_dimension = repo.get_embeddings_dimension(question_answer.embedding) - oai_embeddings = OpenAIEmbeddings(api_key=question_answer.gptkey, model=question_answer.embedding) - - vector_store = await repo.create_pc_index(oai_embeddings, emb_dimension) - - retriever = vector_store.as_retriever(search_type='similarity', - search_kwargs={'k': question_answer.top_k, - 'namespace': question_answer.namespace} - ) - # Query on store for relevant document, returned by callback - # mydocs = retriever.get_relevant_documents( question_answer.question) - # from pprint import pprint - # pprint(len(mydocs)) - - if question_answer.system_context is not None and question_answer.system_context: - print("blocco if") - from langchain.chains import LLMChain - - # prompt_template = "Tell me a {adjective} joke" - # prompt = PromptTemplate( - # input_variables=["adjective"], template=prompt_template - # ) - # llm = LLMChain(llm=OpenAI(), prompt=prompt) - sys_template = """{system_context}. 
- - {context} - """ - - sys_prompt = PromptTemplate.from_template(sys_template) - - # llm_chain = LLMChain(llm=llm, prompt=prompt) - crc = ConversationalRetrievalChain.from_llm( - llm=llm, - retriever=retriever, - return_source_documents=True, - verbose=True, - combine_docs_chain_kwargs={"prompt": sys_prompt} - ) - # from pprint import pprint - # pprint(crc.combine_docs_chain.llm_chain.prompt.messages) - # crc.combine_docs_chain.llm_chain.prompt.messages[0]=SystemMessagePromptTemplate.from_template(sys_prompt) - - result = crc.invoke({'question': question_answer.question, - 'system_context': question_answer.system_context, - 'chat_history': question_answer_list} - ) - - else: - print("blocco else") - #PromptTemplate.from_template() - crc = ConversationalRetrievalChain.from_llm(llm=llm, - retriever=retriever, - return_source_documents=True, - verbose=True) - - - # 'Use the following pieces of context to answer the user\'s question. If you don\'t know the answer, just say that you don\'t know, don\'t try to make up an answer.', - result = crc.invoke({'question': question_answer.question, - 'chat_history': question_answer_list} - ) - - docs = result["source_documents"] - from pprint import pprint - pprint(result) - - ids = [] - sources = [] - for doc in docs: - ids.append(doc.metadata['id']) - sources.append(doc.metadata['source']) - - ids = list(set(ids)) - sources = list(set(sources)) - source = " ".join(sources) - metadata_id = ids[0] - - logger.info(result) - - question_answer_list.append((result['question'], result['answer'])) - - chat_entries = [ChatEntry(question=q, answer=a) for q, a in question_answer_list] - chat_history_dict = {str(i): entry for i, entry in enumerate(chat_entries)} - - success = bool(openai_callback_handler.successful_requests) - prompt_token_size = openai_callback_handler.total_tokens - - result_to_return = RetrievalResult( - answer=result['answer'], - namespace=question_answer.namespace, - sources=sources, - ids=ids, - source=source, - id=metadata_id, - prompt_token_size=prompt_token_size, - success=success, - error_message=None, - chat_history_dict=chat_history_dict - ) - - return result_to_return.dict() - except Exception as e: - import traceback - traceback.print_exc() - question_answer_list = [] - if question_answer.chat_history_dict is not None: - for key, entry in question_answer.chat_history_dict.items(): - question_answer_list.append((entry.question, entry.answer)) - chat_entries = [ChatEntry(question=q, answer=a) for q, a in question_answer_list] - chat_history_dict = {str(i): entry for i, entry in enumerate(chat_entries)} - - result_to_return = RetrievalResult( - namespace=question_answer.namespace, - error_message=repr(e), - chat_history_dict=chat_history_dict - ) - raise fastapi.exceptions.HTTPException(status_code=400, detail=result_to_return.model_dump()) - - -@inject_repo -async def ask_with_memory(question_answer, repo=None) -> RetrievalResult: - try: - logger.info(question_answer) - # question = str - # namespace: str - # gptkey: str - # model: str =Field(default="gpt-3.5-turbo") - # temperature: float = Field(default=0.0) - # top_k: int = Field(default=5) - # max_tokens: int = Field(default=128) - # system_context: Optional[str] - # chat_history_dict : Dict[str, ChatEntry] - - question_answer_list = [] - if question_answer.chat_history_dict is not None: - for key, entry in question_answer.chat_history_dict.items(): - question_answer_list.append((entry.question, entry.answer)) - - logger.info(question_answer_list) - openai_callback_handler = 
OpenAICallbackHandler() - - llm = ChatOpenAI(model_name=question_answer.model, - temperature=question_answer.temperature, - openai_api_key=question_answer.gptkey, - max_tokens=question_answer.max_tokens, - callbacks=[openai_callback_handler]) - - emb_dimension = repo.get_embeddings_dimension(question_answer.embedding) - oai_embeddings = OpenAIEmbeddings(api_key=question_answer.gptkey, model=question_answer.embedding) - - vector_store = await repo.create_pc_index(oai_embeddings, emb_dimension) - - retriever = vector_store.as_retriever(search_type='similarity', - search_kwargs={'k': question_answer.top_k, - 'namespace': question_answer.namespace} - ) - - if question_answer.system_context is not None and question_answer.system_context: - - # Contextualize question - contextualize_q_system_prompt = const.contextualize_q_system_prompt - contextualize_q_prompt = ChatPromptTemplate.from_messages( - [ - ("system", contextualize_q_system_prompt), - MessagesPlaceholder("chat_history"), - ("human", "{input}"), - ] - ) - history_aware_retriever = create_history_aware_retriever( - llm, retriever, contextualize_q_prompt - ) - - # Answer question - qa_system_prompt = question_answer.system_context - qa_prompt = ChatPromptTemplate.from_messages( - [ - ("system", qa_system_prompt), - MessagesPlaceholder("chat_history"), - ("human", "{input}"), - ] - ) - - question_answer_chain = create_stuff_documents_chain(llm, qa_prompt) - - rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain) - - store = {} - - def get_session_history(session_id: str) -> BaseChatMessageHistory: - if session_id not in store: - store[session_id] = ChatMessageHistory() - return store[session_id] - - conversational_rag_chain = RunnableWithMessageHistory( - rag_chain, - get_session_history, - input_messages_key="input", - history_messages_key="chat_history", - output_messages_key="answer", - ) - - result = conversational_rag_chain.invoke( - {"input": question_answer.question, 'chat_history': question_answer_list}, - config={"configurable": {"session_id": uuid.uuid4().hex} - }, # constructs a key "abc123" in `store`. - ) - - else: - # Contextualize question - contextualize_q_system_prompt = const.contextualize_q_system_prompt - contextualize_q_prompt = ChatPromptTemplate.from_messages( - [ - ("system", contextualize_q_system_prompt), - MessagesPlaceholder("chat_history"), - ("human", "{input}"), - ] - ) - history_aware_retriever = create_history_aware_retriever( - llm, retriever, contextualize_q_prompt - ) - - # Answer question - qa_system_prompt = const.qa_system_prompt - qa_prompt = ChatPromptTemplate.from_messages( - [ - ("system", qa_system_prompt), - MessagesPlaceholder("chat_history"), - ("human", "{input}"), - ] - ) - - question_answer_chain = create_stuff_documents_chain(llm, qa_prompt) - - rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain) - - store = {} - - def get_session_history(session_id: str) -> BaseChatMessageHistory: - if session_id not in store: - store[session_id] = ChatMessageHistory() - return store[session_id] - - conversational_rag_chain = RunnableWithMessageHistory( - rag_chain, - get_session_history, - input_messages_key="input", - history_messages_key="chat_history", - output_messages_key="answer", - ) - - result = conversational_rag_chain.invoke( - {"input": question_answer.question, 'chat_history': question_answer_list}, - config={"configurable": {"session_id": uuid.uuid4().hex} - }, # constructs a key "abc123" in `store`. 
- ) - - # print(store) - # print(question_answer_list) - - docs = result["context"] - # from pprint import pprint - # pprint(docs) - - ids = [] - sources = [] - content_chunks = None - if question_answer.debug: - content_chunks = [] - for doc in docs: - ids.append(doc.metadata['id']) - sources.append(doc.metadata['source']) - content_chunks.append(doc.page_content) - else: - for doc in docs: - ids.append(doc.metadata['id']) - sources.append(doc.metadata['source']) - - ids = list(set(ids)) - sources = list(set(sources)) - source = " ".join(sources) - metadata_id = ids[0] - - logger.info(result) - - result['answer'], success = verify_answer(result['answer']) - - question_answer_list.append((result['input'], result['answer'])) - - chat_entries = [ChatEntry(question=q, answer=a) for q, a in question_answer_list] - chat_history_dict = {str(i): entry for i, entry in enumerate(chat_entries)} - - # success = bool(openai_callback_handler.successful_requests) - prompt_token_size = openai_callback_handler.total_tokens - - result_to_return = RetrievalResult( - answer=result['answer'], - namespace=question_answer.namespace, - sources=sources, - ids=ids, - source=source, - id=metadata_id, - prompt_token_size=prompt_token_size, - content_chunks=content_chunks, - success=success, - error_message=None, - chat_history_dict=chat_history_dict - ) - - return result_to_return - except Exception as e: - import traceback - traceback.print_exc() - question_answer_list = [] - if question_answer.chat_history_dict is not None: - for key, entry in question_answer.chat_history_dict.items(): - question_answer_list.append((entry.question, entry.answer)) - chat_entries = [ChatEntry(question=q, answer=a) for q, a in question_answer_list] - chat_history_dict = {str(i): entry for i, entry in enumerate(chat_entries)} - - result_to_return = RetrievalResult( - namespace=question_answer.namespace, - error_message=repr(e), - chat_history_dict=chat_history_dict - ) - raise fastapi.exceptions.HTTPException(status_code=400, detail=result_to_return.model_dump()) - - -@inject_repo -async def ask_with_sequence(question_answer, repo=None) -> RetrievalResult: - try: - logger.info(question_answer) - # question = str - # namespace: str - # gptkey: str - # model: str =Field(default="gpt-3.5-turbo") - # temperature: float = Field(default=0.0) - # top_k: int = Field(default=5) - # max_tokens: int = Field(default=128) - # system_context: Optional[str] - # chat_history_dict : Dict[str, ChatEntry] - - question_answer_list = [] - if question_answer.chat_history_dict is not None: - for key, entry in question_answer.chat_history_dict.items(): - question_answer_list.append((entry.question, entry.answer)) - - logger.info(question_answer_list) - openai_callback_handler = OpenAICallbackHandler() - - llm = ChatOpenAI(model_name=question_answer.model, - temperature=question_answer.temperature, - openai_api_key=question_answer.gptkey, - max_tokens=question_answer.max_tokens, - - callbacks=[openai_callback_handler]) - - emb_dimension = repo.get_embeddings_dimension(question_answer.embedding) - oai_embeddings = OpenAIEmbeddings(api_key=question_answer.gptkey, model=question_answer.embedding) - - vector_store = await repo.create_pc_index(oai_embeddings, emb_dimension) - idllmchain = get_idproduct_chain(llm) - res = idllmchain.invoke(question_answer.question) - - - retriever = vector_store.as_retriever(search_type='similarity', search_kwargs={'k': question_answer.top_k, - 'namespace': question_answer.namespace}) - - # mydocs = 
retriever.get_relevant_documents( question_answer.question) - # from pprint import pprint - # pprint(len(mydocs)) - - if question_answer.system_context is not None and question_answer.system_context: - from langchain.chains import LLMChain - - # prompt_template = "Tell me a {adjective} joke" - # prompt = PromptTemplate( - # input_variables=["adjective"], template=prompt_template - # ) - # llm = LLMChain(llm=OpenAI(), prompt=prompt) - sys_template = """{system_context}. - - {context} - """ - - sys_prompt = PromptTemplate.from_template(sys_template) - - # llm_chain = LLMChain(llm=llm, prompt=prompt) - crc = ConversationalRetrievalChain.from_llm( - llm=llm, - retriever=retriever, - return_source_documents=True, - combine_docs_chain_kwargs={"prompt": sys_prompt} - ) - # from pprint import pprint - # pprint(crc.combine_docs_chain.llm_chain.prompt.messages) - # crc.combine_docs_chain.llm_chain.prompt.messages[0] = SystemMessagePromptTemplate.from_template(sys_prompt) - - result = crc.invoke({'question': question_answer.question, 'system_context': question_answer.system_context, - 'chat_history': question_answer_list}) - - else: - crc = ConversationalRetrievalChain.from_llm(llm=llm, - retriever=retriever, - return_source_documents=True) - - result = crc.invoke({'question': res.get('text'), 'chat_history': question_answer_list}) - - docs = result["source_documents"] - - ids = [] - sources = [] - for doc in docs: - ids.append(doc.metadata['id']) - sources.append(doc.metadata['source']) - print(doc) - - ids = list(set(ids)) - sources = list(set(sources)) - source = " ".join(sources) - id = ids[0] - - logger.info(result) - - question_answer_list.append((result['question'], result['answer'])) - - chat_entries = [ChatEntry(question=q, answer=a) for q, a in question_answer_list] - chat_history_dict = {str(i): entry for i, entry in enumerate(chat_entries)} - - success = bool(openai_callback_handler.successful_requests) - prompt_token_size = openai_callback_handler.total_tokens - - result_to_return = RetrievalResult( - answer=result['answer'], - namespace=question_answer.namespace, - sources=sources, - ids=ids, - source=source, - id=id, - prompt_token_size=prompt_token_size, - success=success, - error_message=None, - chat_history_dict=chat_history_dict - - ) - - return result_to_return - except Exception as e: - import traceback - traceback.print_exc() - question_answer_list = [] - if question_answer.chat_history_dict is not None: - for key, entry in question_answer.chat_history_dict.items(): - question_answer_list.append((entry.question, entry.answer)) - chat_entries = [ChatEntry(question=q, answer=a) for q, a in question_answer_list] - chat_history_dict = {str(i): entry for i, entry in enumerate(chat_entries)} - - result_to_return = RetrievalResult( - namespace=question_answer.namespace, - error_message=repr(e), - chat_history_dict=chat_history_dict - - ) - raise fastapi.exceptions.HTTPException(status_code=400, detail=result_to_return.model_dump()) - - -@inject_repo -async def add_pc_item(item, repo=None) -> PineconeIndexingResult: - """ - Add items to namespace - :type repo: PineconeRepositoryBase - :param item: - :param repo: - :return: PineconeIndexingResult - """ - return await repo.add_pc_item(item) - - -@inject_repo -async def delete_namespace(namespace: str, repo=None): - """ - Delete Namespace from index - :param namespace: - :param repo: - :return: - """ - # from tilellm.store.pinecone_repository import delete_pc_namespace - try: - return await repo.delete_pc_namespace(namespace) - except 
Exception as ex: - raise ex - - -@inject_repo -async def delete_id_from_namespace(metadata_id: str, namespace: str, repo=None): - """ - Delete items from namespace - :param metadata_id: - :param namespace: - :param repo: - :return: - """ - # from tilellm.store.pinecone_repository import delete_pc_ids_namespace # , delete_pc_ids_namespace1 - try: - return await repo.delete_pc_ids_namespace(metadata_id=metadata_id, namespace=namespace) - except Exception as ex: - logger.error(ex) - raise ex - - -@inject_repo -async def get_list_namespace(repo=None) -> PineconeNamespaceResult: - """ - Get list namespaces with namespace id and vector count - :param repo: - :return: list of all namespaces in index - """ - # from tilellm.store.pinecone_repository import pinecone_list_namespaces - try: - return await repo.pinecone_list_namespaces() - except Exception as ex: - raise ex - - -@inject_repo -async def get_ids_namespace(metadata_id: str, namespace: str, repo=None) -> PineconeItems: - """ - Get all items from namespace given id - :param metadata_id: - :param namespace: - :param repo: - :return: - """ - # from tilellm.store.pinecone_repository import get_pc_ids_namespace - try: - return await repo.get_pc_ids_namespace(metadata_id=metadata_id, namespace=namespace) - except Exception as ex: - raise ex - - -@inject_repo -async def get_listitems_namespace(namespace: str, repo=None) -> PineconeItems: - """ - Get all items from given namespace - :param namespace: namespace_id - :param repo: - :return: list of al items PineconeItems - """ - # from tilellm.store.pinecone_repository import get_pc_all_obj_namespace - try: - return await repo.get_pc_all_obj_namespace(namespace=namespace) - except Exception as ex: - raise ex - - -@inject_repo -async def get_desc_namespace(namespace: str, repo=None) -> PineconeDescNamespaceResult: - try: - return await repo.get_pc_desc_namespace(namespace=namespace) - except Exception as ex: - raise ex - - - -@inject_repo -async def get_sources_namespace(source: str, namespace: str, repo=None) -> PineconeItems: - """ - Get all item from namespace given source - :param source: - :param namespace: - :param repo: - :return: - """ - # from tilellm.store.pinecone_repository import get_pc_sources_namespace - try: - return await repo.get_pc_sources_namespace(source=source, namespace=namespace) - except Exception as ex: - raise ex - - -def get_idproduct_chain(llm) -> LLMChain: - summary_template = """ - I want the product Identifier from this phrase (remember, it's composed by 5 digit like 36400. Ignore the other informations). Give me only the number. {question}. 
- """ - - summary_prompt_template = PromptTemplate( - input_variables=["question"], - template=summary_template, - ) - - return LLMChain(llm=llm, prompt=summary_prompt_template) - - -def verify_answer(s): - if s.endswith(""): - s = s[:-7] # Rimuove dalla fine della stringa - success = False - else: - success = True - return s, success diff --git a/tilellm/models/item_model.py b/tilellm/models/item_model.py index bb205a3..f463804 100644 --- a/tilellm/models/item_model.py +++ b/tilellm/models/item_model.py @@ -1,4 +1,5 @@ -from pydantic import BaseModel, Field, field_validator, ValidationError, model_validator, RootModel, root_validator +from langchain_core.documents import Document +from pydantic import BaseModel, Field, field_validator, ValidationError, model_validator, RootModel from typing import Dict, Optional, List, Union, Any import datetime @@ -9,6 +10,16 @@ class Engine(BaseModel): apikey: str vector_size: int = Field(default=1536) index_name: str = Field(default="tilellm") + text_key: Optional[str] = Field(default="text") + metric: str = Field(default="cosine") + + @model_validator(mode='after') + def set_text_key(self): + if self.type == "serverless": + self.text_key = "text" + elif self.type == "pod": + self.text_key = "content" + return self class ParametersScrapeType4(BaseModel): @@ -18,6 +29,7 @@ class ParametersScrapeType4(BaseModel): desired_classnames: Optional[List[str]] = Field(default_factory=list) remove_lines: Optional[bool] = Field(default=True) remove_comments: Optional[bool] = Field(default=True) + time_sleep: Optional[float] = Field(default=2) @model_validator(mode='after') def check_booleans(cls, values): @@ -33,6 +45,8 @@ class ItemSingle(BaseModel): source: str | None = None type: str | None = None content: str | None = None + hybrid: Optional[bool] = Field(default=False) + sparse_encoder: Optional[str] = Field(default="splade") gptkey: str | None = None scrape_type: int = Field(default_factory=lambda: 0) embedding: str = Field(default_factory=lambda: "text-embedding-ada-002") @@ -43,14 +57,14 @@ class ItemSingle(BaseModel): chunk_size: int = Field(default_factory=lambda: 1000) chunk_overlap: int = Field(default_factory=lambda: 400) parameters_scrape_type_4: Optional[ParametersScrapeType4] = None - + engine: Engine @model_validator(mode='after') def check_scrape_type(cls, values): scrape_type = values.scrape_type parameters_scrape_type_4 = values.parameters_scrape_type_4 - if scrape_type == 4: + if scrape_type == 4 or scrape_type == 2: if parameters_scrape_type_4 is None: raise ValueError('parameters_scrape_type_4 must be provided when scrape_type is 4') else: @@ -92,8 +106,10 @@ def from_dict(cls, data: dict) -> "ChatHistory": class QuestionAnswer(BaseModel): question: str namespace: str + llm: Optional[str] = Field(default="openai") gptkey: str model: str = Field(default="gpt-3.5-turbo") + sparse_encoder: Optional[str] = Field(default="splade") #bge-m3 temperature: float = Field(default=0.0) top_k: int = Field(default=5) max_tokens: int = Field(default=128) @@ -101,8 +117,10 @@ class QuestionAnswer(BaseModel): similarity_threshold: float = Field(default_factory=lambda: 1.0) debug: bool = Field(default_factory=lambda: False) citations: bool = Field(default_factory=lambda: True) + alpha: Optional[float] = Field(default=0.5) system_context: Optional[str] = None search_type: str = Field(default_factory=lambda: "similarity") + engine: Engine chat_history_dict: Optional[Dict[str, ChatEntry]] = None @field_validator("temperature") @@ -162,7 +180,7 @@ def 
n_messages_range(cls, v): @field_validator("max_tokens") def max_tokens_range(cls, v): """Ensures max_tokens is a positive integer.""" - if not 50 <= v <= 2000: + if not 50 <= v <= 8192: raise ValueError("top_k must be a positive integer.") return v @@ -237,7 +255,7 @@ class RetrievalResult(BaseModel): chat_history_dict: Optional[Dict[str, ChatEntry]] -class PineconeQueryResult(BaseModel): +class RepositoryQueryResult(BaseModel): id: str metadata_id: str metadata_source: str @@ -246,17 +264,21 @@ class PineconeQueryResult(BaseModel): text: Optional[str] | None = None -class PineconeItems(BaseModel): - matches: List[PineconeQueryResult] +class RepositoryItems(BaseModel): + matches: List[RepositoryQueryResult] +class RepositoryEngine(BaseModel): + engine: Engine -class PineconeNamespaceToDelete(BaseModel): +class RepositoryNamespace(BaseModel): namespace: str + engine: Engine -class PineconeItemToDelete(BaseModel): +class RepositoryItem(BaseModel): id: str namespace: str + engine: Engine class ScrapeStatusReq(BaseModel): @@ -271,7 +293,7 @@ class ScrapeStatusResponse(BaseModel): queue_order: int = Field(default=-1) -class PineconeIndexingResult(BaseModel): +class IndexingResult(BaseModel): # {"id": f"{id}", "chunks": f"{len(chunks)}", "total_tokens": f"{total_tokens}", "cost": f"{cost:.6f}"} id: str | None = None chunks: int | None = None @@ -282,21 +304,24 @@ class PineconeIndexingResult(BaseModel): error: Optional[str] | None = None -class PineconeItemNamespaceResult(BaseModel): +class RepositoryItemNamespaceResult(BaseModel): namespace: str vector_count: int -class PineconeIdSummaryResult(BaseModel): +class RepositoryIdSummaryResult(BaseModel): metadata_id: str source: str chunks_count: int -class PineconeNamespaceResult(BaseModel): - namespaces: Optional[List[PineconeItemNamespaceResult]] +class RepositoryNamespaceResult(BaseModel): + namespaces: Optional[List[RepositoryItemNamespaceResult]] + +class RepositoryDescNamespaceResult(BaseModel): + namespace_desc: RepositoryItemNamespaceResult + ids: Optional[List[RepositoryIdSummaryResult]] -class PineconeDescNamespaceResult(BaseModel): - namespace_desc: PineconeItemNamespaceResult - ids: Optional[List[PineconeIdSummaryResult]] +class MyDocument(Document): + sparse_values: Optional[dict] \ No newline at end of file diff --git a/tilellm/shared/const.py b/tilellm/shared/const.py index 4c7dca6..45b1155 100644 --- a/tilellm/shared/const.py +++ b/tilellm/shared/const.py @@ -8,13 +8,14 @@ PINECONE_INDEX = None PINECONE_TEXT_KEY = None VOYAGEAI_API_KEY = None +JWT_SECRET_KEY = None contextualize_q_system_prompt = """Given a chat history and the latest user question \ which might reference context in the chat history, formulate a standalone question \ which can be understood without the chat history. Do NOT answer the question, \ just reformulate it if needed and otherwise return it as is.""" -qa_system_prompt = """You are an helpful assistant for question-answering tasks. \ +qa_system_prompt2 = """You are an helpful assistant for question-answering tasks. \ Use ONLY the pieces of retrieved context delimited by #### to answer the question. \ The first step is to extrac relevant information to the question from retrieved context. If you don't know the answer, just say that you don't know. \ @@ -27,6 +28,22 @@ #### """ +qa_system_prompt= """You are an helpful assistant for question-answering tasks. + + Follow these steps carefully: + + 1. If the question was in English, answer in English. If it was in Italian, answer in Italian. 
+ If it was in French, answer in French. If it was in Spanish, answer in Spanish, and so on, + regardless of the context language + 2. Use ONLY the pieces of retrieved context delimited by #### to answer the question. + 3. If the context does not contain sufficient information to generate + an accurate and informative answer, return + + ####{context}#### + + Let's think step by step. + """ + qa_system_prompt1 = """You are an AI assistant specialized in question-answering tasks. \ Your goal is to provide accurate and helpful answers based solely on the given context. \ Follow these instructions carefully: @@ -145,11 +162,13 @@ def populate_constant(): - global PINECONE_API_KEY, PINECONE_INDEX, PINECONE_TEXT_KEY, VOYAGEAI_API_KEY - PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY") - PINECONE_INDEX = os.environ.get("PINECONE_INDEX") - PINECONE_TEXT_KEY = os.environ.get("PINECONE_TEXT_KEY") - VOYAGEAI_API_KEY = os.environ.get("VOYAGEAI_API_KEY") + global JWT_SECRET_KEY + JWT_SECRET_KEY = os.environ.get("JWT_SECRET_KEY") + #global PINECONE_API_KEY, PINECONE_INDEX, PINECONE_TEXT_KEY, VOYAGEAI_API_KEY + #PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY") + #PINECONE_INDEX = os.environ.get("PINECONE_INDEX") + #PINECONE_TEXT_KEY = os.environ.get("PINECONE_TEXT_KEY") + #VOYAGEAI_API_KEY = os.environ.get("VOYAGEAI_API_KEY") diff --git a/tilellm/shared/sparse_util.py b/tilellm/shared/sparse_util.py new file mode 100644 index 0000000..e78c12d --- /dev/null +++ b/tilellm/shared/sparse_util.py @@ -0,0 +1,104 @@ +# from collections import Counter +# from transformers import BertTokenizerFast + +from typing import List + +from langchain_core.callbacks import CallbackManagerForRetrieverRun, AsyncCallbackManagerForRetrieverRun +from langchain_core.documents import Document +from langchain_core.retrievers import BaseRetriever + +""" + + +def build_dict(input_batch): + # store a batch of sparse embeddings + sparse_emb = [] + # iterate through input batch + for token_ids in input_batch: + # convert the input_ids list to a dictionary of key to frequency values + d = dict(Counter(token_ids)) + # remove special tokens and append sparse vectors to sparse_emb list + sparse_emb.append({key: d[key] for key in d if key not in [101, 102, 103, 0]}) + # return sparse_emb list + return sparse_emb + +def generate_sparse_vectors(context_batch): + # create batch of input_ids + # load bert tokenizer from huggingface + tokenizer = BertTokenizerFast.from_pretrained( + 'bert-base-uncased' + ) + + inputs = tokenizer( + context_batch, padding=True, + truncation=True, + max_length=512 + )['input_ids'] + # create sparse dictionaries + sparse_embeds = build_dict(inputs) + return sparse_embeds + +""" + +def hybrid_score_norm(dense, sparse, alpha: float): + """Hybrid score using a convex combination + + alpha * dense + (1 - alpha) * sparse + + Args: + dense: Array of floats representing + sparse: a dict of `indices` and `values` + alpha: float between 0 and 1 where 0 == sparse only + and 1 == dense only + """ + if alpha < 0 or alpha > 1: + raise ValueError("Alpha must be between 0 and 1") + hs = { + 'indices': sparse['indices'], + 'values': [v * (1 - alpha) for v in sparse['values']] + } + return [v * alpha for v in dense], hs + + + + +class HybridRetriever(BaseRetriever): + """ + + """ + + documents: List[Document] + """List of documents to retrieve from.""" + k: int + """Number of top results to return""" + + def _get_relevant_documents( + self, query: str, *, run_manager: CallbackManagerForRetrieverRun + ) -> List[Document]: + 
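+        # The documents are expected to arrive pre-ranked (in this project they come from the
+        # Pinecone hybrid query), so returning the first k items preserves that relevance order;
+        # no additional scoring against the query string is performed here.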
"""Sync implementations for retriever.""" + + + if len(self.documents) > self.k: + return self.documents[0:self.k] + else: + return self.documents + + + # Optional: Provide a more efficient native implementation by overriding + # _aget_relevant_documents + async def _aget_relevant_documents( + self, query: str, *, run_manager: AsyncCallbackManagerForRetrieverRun + ) -> List[Document]: + """Asynchronously get documents relevant to a query. + + # Args: + # query: String to find relevant documents for + # run_manager: The callbacks handler to use + + # Returns: + # List of relevant documents + # """ + if len(self.documents) > self.k: + return self.documents[0:self.k] + else: + return self.documents diff --git a/tilellm/shared/tiledesk_chatmodel_info.py b/tilellm/shared/tiledesk_chatmodel_info.py new file mode 100644 index 0000000..f366892 --- /dev/null +++ b/tilellm/shared/tiledesk_chatmodel_info.py @@ -0,0 +1,189 @@ +"""Callback Handler that prints to std out.""" + +import threading +from typing import Any, Dict, List + +from langchain_core.callbacks import BaseCallbackHandler +from langchain_core.messages import AIMessage +from langchain_core.outputs import ChatGeneration, LLMResult + +MODEL_COST_PER_1K_TOKENS = { + "claude-3-5-sonnet-2024062": 0.000003, + "llama-3.1-8b-instant": 0.000003, + "llama-3.1-70b-versatile": 0.000003, + "mixtral-8x7b-32768": 0.000003 + +} + + +def standardize_model_name( + model_name: str, + is_completion: bool = False, +) -> str: + """ + Standardize the model name to a format that can be used in the OpenAI API. + + Args: + model_name: Model name to standardize. + is_completion: Whether the model is used for completion or not. + Defaults to False. + + Returns: + Standardized model name. + + """ + model_name = model_name.lower() + if ".ft-" in model_name: + model_name = model_name.split(".ft-")[0] + "-azure-finetuned" + if ":ft-" in model_name: + model_name = model_name.split(":")[0] + "-finetuned-legacy" + if "ft:" in model_name: + model_name = model_name.split(":")[1] + "-finetuned" + if is_completion and ( + model_name.startswith("gpt-4") + or model_name.startswith("gpt-3.5") + or model_name.startswith("gpt-35") + or model_name.startswith("o1-") + or ("finetuned" in model_name and "legacy" not in model_name) + ): + return model_name + "-completion" + else: + return model_name + + +def get_anthropic_token_cost_for_model( + model_name: str, num_tokens: int, is_completion: bool = False +) -> float: + """ + Get the cost in USD for a given model and number of tokens. + + Args: + model_name: Name of the model + num_tokens: Number of tokens. + is_completion: Whether the model is used for completion or not. + Defaults to False. + + Returns: + Cost in USD. + """ + model_name = standardize_model_name(model_name, is_completion=is_completion) + if model_name not in MODEL_COST_PER_1K_TOKENS: + raise ValueError( + f"Unknown model: {model_name}. Please provide a valid OpenAI model name." 
+ "Known models are: " + ", ".join(MODEL_COST_PER_1K_TOKENS.keys()) + ) + return MODEL_COST_PER_1K_TOKENS[model_name] * (num_tokens / 1000) + + +class TiledeskAICallbackHandler(BaseCallbackHandler): + """Callback Handler that tracks OpenAI info.""" + + total_tokens: int = 0 + prompt_tokens: int = 0 + completion_tokens: int = 0 + successful_requests: int = 0 + total_cost: float = 0.0 + + def __init__(self) -> None: + super().__init__() + self._lock = threading.Lock() + + def __repr__(self) -> str: + return ( + f"Tokens Used: {self.total_tokens}\n" + f"\tPrompt Tokens: {self.prompt_tokens}\n" + f"\tCompletion Tokens: {self.completion_tokens}\n" + f"Successful Requests: {self.successful_requests}\n" + f"Total Cost (USD): ${self.total_cost}" + ) + + @property + def always_verbose(self) -> bool: + """Whether to call verbose callbacks even if verbose is False.""" + return True + + def on_llm_start( + self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any + ) -> None: + """Print out the prompts.""" + pass + + def on_llm_new_token(self, token: str, **kwargs: Any) -> None: + """Print out the token.""" + pass + + def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None: + """Collect token usage.""" + # Check for usage_metadata (langchain-core >= 0.2.2) + try: + generation = response.generations[0][0] + except IndexError: + generation = None + if isinstance(generation, ChatGeneration): + try: + message = generation.message + if isinstance(message, AIMessage): + usage_metadata = message.usage_metadata + response_metadata = message.response_metadata + else: + usage_metadata = None + response_metadata = None + except AttributeError: + usage_metadata = None + response_metadata = None + else: + usage_metadata = None + response_metadata = None + if usage_metadata: + token_usage = {"total_tokens": usage_metadata["total_tokens"]} + completion_tokens = usage_metadata["output_tokens"] + prompt_tokens = usage_metadata["input_tokens"] + if response_model_name := (response_metadata or {}).get("model_name"): + model_name = standardize_model_name(response_model_name) + elif response.llm_output is None: + model_name = "" + else: + model_name = standardize_model_name( + response.llm_output.get("model_name", "") + ) + + else: + if response.llm_output is None: + return None + + if "token_usage" not in response.llm_output: + with self._lock: + self.successful_requests += 1 + return None + + # compute tokens and cost for this request + token_usage = response.llm_output["token_usage"] + completion_tokens = token_usage.get("completion_tokens", 0) + prompt_tokens = token_usage.get("prompt_tokens", 0) + model_name = standardize_model_name( + response.llm_output.get("model_name", "") + ) + if model_name in MODEL_COST_PER_1K_TOKENS: + completion_cost = get_anthropic_token_cost_for_model( + model_name, completion_tokens, is_completion=True + ) + prompt_cost = get_anthropic_token_cost_for_model(model_name, prompt_tokens) + else: + completion_cost = 0 + prompt_cost = 0 + + # update shared state behind lock + with self._lock: + self.total_cost += prompt_cost + completion_cost + self.total_tokens += token_usage.get("total_tokens", 0) + self.prompt_tokens += prompt_tokens + self.completion_tokens += completion_tokens + self.successful_requests += 1 + + def __copy__(self) -> "TiledeskAICallbackHandler": + """Return a copy of the callback handler.""" + return self + + def __deepcopy__(self, memo: Any) -> "TiledeskAICallbackHandler": + """Return a deep copy of the callback handler.""" + return self \ No newline at 
end of file diff --git a/tilellm/shared/utility.py b/tilellm/shared/utility.py index cd9ca69..8aea53c 100644 --- a/tilellm/shared/utility.py +++ b/tilellm/shared/utility.py @@ -2,10 +2,16 @@ from functools import wraps import logging +from gc import callbacks import langchain_aws +from langchain_community.callbacks.openai_info import OpenAICallbackHandler +from langchain_community.embeddings import CohereEmbeddings #, GooglePalmEmbeddings +from langchain_huggingface import HuggingFaceEmbeddings from langchain_voyageai import VoyageAIEmbeddings from langchain_openai import OpenAIEmbeddings + + from tilellm.shared import const from langchain_openai.chat_models import ChatOpenAI @@ -15,6 +21,7 @@ from langchain_groq import ChatGroq from langchain_aws.chat_models import ChatBedrockConverse, ChatBedrock +from tilellm.shared.tiledesk_chatmodel_info import TiledeskAICallbackHandler logger = logging.getLogger(__name__) @@ -29,8 +36,11 @@ def inject_repo(func): """ @wraps(func) - def wrapper(*args, **kwargs): - repo_type = os.environ.get("PINECONE_TYPE") + def wrapper(question, *args, **kwargs): + engine_name = question.engine.name + repo_type= question.engine.type + #print(f"============== ENGINE {question.engine.model_dump()}") + #repo_type = os.environ.get("PINECONE_TYPE") logger.info(f"pinecone type {repo_type}") if repo_type == 'pod': @@ -43,7 +53,7 @@ def wrapper(*args, **kwargs): raise ValueError("Unknown repository type") kwargs['repo'] = repo - return func(*args, **kwargs) + return func(question, *args, **kwargs) return wrapper @@ -63,10 +73,25 @@ async def wrapper(self, item, *args, **kwargs): elif item.embedding == "text-embedding-3-small": embedding_obj = OpenAIEmbeddings(api_key=item.gptkey, model=item.embedding) dimension = 1536 - elif item.embedding == "claude-3": + elif item.embedding == "voyage-multilingual-2": embedding_obj = VoyageAIEmbeddings(voyage_api_key=const.VOYAGEAI_API_KEY, model="voyage-multilingual-2") # query_result = voyage.embed_query(text) dimension = 1024 + elif item.embedding == "huggingface": + import torch + from langchain_huggingface.embeddings import HuggingFaceEmbeddings + from langchain_community.embeddings import HuggingFaceBgeEmbeddings + device = 'cuda' if torch.cuda.is_available() else 'cpu' + model_name = "BAAI/bge-m3" + model_kwargs = {'device': device} + encode_kwargs = {'normalize_embeddings': True} #True per cosine similarity + + embedding_obj = HuggingFaceBgeEmbeddings(model_name=model_name, + model_kwargs=model_kwargs, + encode_kwargs=encode_kwargs + + ) + dimension = 1024 else: embedding_obj = OpenAIEmbeddings(api_key=item.gptkey, model=item.embedding) dimension = 1536 @@ -172,6 +197,132 @@ async def wrapper(question, *args, **kwargs): return wrapper +def inject_llm_chat(func): + @wraps(func) + async def wrapper(question, *args, **kwargs): + logger.debug(question) + if question.model == "openai": + callback_handler = OpenAICallbackHandler() + llm_embeddings = OpenAIEmbeddings(api_key=question.gptkey, model=question.embedding) + llm = ChatOpenAI(api_key=question.gptkey, + model=question.model, + temperature=question.temperature, + max_tokens=question.max_tokens, + callbacks=[callback_handler]) + + elif question.llm == "anthropic": + callback_handler = TiledeskAICallbackHandler() + import torch + from langchain_huggingface.embeddings import HuggingFaceEmbeddings + device = 'cuda' if torch.cuda.is_available() else 'cpu' + model_name = "BAAI/bge-m3" + model_kwargs = {'device': device} + encode_kwargs = {'normalize_embeddings': False} + llm_embeddings 
= HuggingFaceEmbeddings(model_name=model_name, + model_kwargs=model_kwargs, + encode_kwargs=encode_kwargs + ) + llm = ChatAnthropic(anthropic_api_key=question.gptkey, + model=question.model, + temperature=question.temperature, + max_tokens=question.max_tokens, + callbacks=[callback_handler]) + + elif question.llm == "cohere": + callback_handler = TiledeskAICallbackHandler() + llm_embeddings = CohereEmbeddings(model=question.embedding, cohere_api_key=question.gptkey) + llm = ChatCohere(cohere_api_key=question.gptkey, + model=question.model, + temperature=question.temperature, + max_tokens=question.max_tokens) + + elif question.llm == "google": + callback_handler = TiledeskAICallbackHandler() + #"models/embedding-gecko-001" + llm_embeddings = OpenAIEmbeddings(api_key=question.gptkey, model=question.embedding) + # llm_embeddings = GooglePalmEmbeddings(google_api_key=question.gptkey) #, model_name=question.embedding) + llm = ChatGoogleGenerativeAI(google_api_key=question.gptkey, + model=question.model, + temperature=question.temperature, + max_tokens=question.max_tokens, + convert_system_message_to_human=True) + + elif question.llm == "groq": + callback_handler = TiledeskAICallbackHandler() + import torch + from langchain_huggingface.embeddings import HuggingFaceEmbeddings + device = 'cuda' if torch.cuda.is_available() else 'cpu' + model_name = "BAAI/bge-m3" + model_kwargs = {'device': device} + encode_kwargs = {'normalize_embeddings': False} + llm_embeddings = HuggingFaceEmbeddings(model_name=model_name, + model_kwargs=model_kwargs, + encode_kwargs=encode_kwargs + ) + llm = ChatGroq(api_key=question.gptkey, + model=question.model, + temperature=question.temperature, + max_tokens=question.max_tokens + ) + + elif question.llm == "aws": + import os + + os.environ["AWS_SECRET_ACCESS_KEY"] = question.llm_key.aws_secret_access_key + os.environ["AWS_ACCESS_KEY_ID"] = question.llm_key.aws_access_key_id + callback_handler = TiledeskAICallbackHandler() + llm_embeddings = OpenAIEmbeddings(api_key=question.gptkey, model=question.embedding) + + # chat_model = ChatBedrock(model_id=question.model, + # model_kwargs={"temperature": question.temperature,"max_tokens":question.max_tokens }, + # region_name="eu-central-1" + # ) + + # import boto3 + + # client_br = boto3.client('bedrock-runtime', + # aws_access_key_id=question.llm_key.aws_secret_access_key, + # aws_secret_access_key=question.llm_key.aws_secret_access_key, + # region_name=question.llm_key.region_name + # ) + # session = boto3.Session(aws_access_key_id=question.llm_key.aws_secret_access_key, + # aws_secret_access_key=question.llm_key.aws_secret_access_key, + # region_name=question.llm_key.region_name + # ) + # client_ss = session.client("bedrock-runtime") + + llm = ChatBedrockConverse( + # client=client_br, + model=question.model, + temperature=question.temperature, + max_tokens=question.max_tokens, + region_name=question.llm_key.region_name + + # base_url="http://bedroc-proxy-paacejvmzcgv-121947512.eu-central-1.elb.amazonaws.com/api/v1/", + + ) # model_kwargs={"temperature": 0.001}, + + # print(chat_model.client._get_credentials().access_key) + + + else: + callback_handler = OpenAICallbackHandler() + llm_embeddings = OpenAIEmbeddings(api_key=question.gptkey, model=question.embedding) + llm = ChatOpenAI(api_key=question.gptkey, + model=question.model, + temperature=question.temperature, + max_tokens=question.max_tokens, + callbacks=[callback_handler] + ) + + # Add chat_model agli kwargs + kwargs['llm'] = llm + kwargs['callback_handler'] = 
callback_handler + kwargs['llm_embeddings'] = llm_embeddings + + return await func(question, *args, **kwargs) + + return wrapper def inject_llm_o1(func): @wraps(func) @@ -192,3 +343,9 @@ async def wrapper(question, *args, **kwargs): return wrapper +def decode_jwt(token:str): + import jwt + jwt_secret_key = const.JWT_SECRET_KEY + return jwt.decode(jwt=token, key=jwt_secret_key, algorithms=['HS256']) + + diff --git a/tilellm/store/pinecone/pinecone_repository_base.py b/tilellm/store/pinecone/pinecone_repository_base.py index 9e8cabb..88140ca 100644 --- a/tilellm/store/pinecone/pinecone_repository_base.py +++ b/tilellm/store/pinecone/pinecone_repository_base.py @@ -1,12 +1,17 @@ from abc import abstractmethod + +import time +from langchain_pinecone import PineconeVectorStore + from tilellm.models.item_model import (MetadataItem, - PineconeQueryResult, - PineconeItems, - PineconeIndexingResult, - PineconeNamespaceResult, - PineconeItemNamespaceResult, - PineconeIdSummaryResult, - PineconeDescNamespaceResult + RepositoryQueryResult, + RepositoryItems, + IndexingResult, + RepositoryNamespaceResult, + RepositoryItemNamespaceResult, + RepositoryIdSummaryResult, + RepositoryDescNamespaceResult, Engine, RepositoryNamespace + ) from tilellm.shared import const @@ -24,22 +29,27 @@ class PineconeRepositoryBase: async def add_pc_item(self, item): pass + @abstractmethod + async def add_pc_item_hybrid(self, item): + pass + @staticmethod - async def delete_pc_namespace(namespace: str): + async def delete_pc_namespace(namespace_to_delete: RepositoryNamespace): """ Delete namespace from Pinecone index - :param namespace: + :param namespace_to_delete: :return: """ + engine = namespace_to_delete.engine import pinecone try: pc = pinecone.Pinecone( - api_key=const.PINECONE_API_KEY + api_key=engine.apikey ) - host = pc.describe_index(const.PINECONE_INDEX).host - index = pc.Index(name=const.PINECONE_INDEX, host=host) + host = pc.describe_index(engine.index_name).host + index = pc.Index(name=engine.index_name, host=host) # vector_store = Pinecone.from_existing_index(const.PINECONE_INDEX, ) - delete_response = index.delete(delete_all=True, namespace=namespace) + delete_response = index.delete(delete_all=True, namespace=namespace_to_delete.namespace) except Exception as ex: logger.error(ex) @@ -47,29 +57,26 @@ async def delete_pc_namespace(namespace: str): raise ex @abstractmethod - async def delete_pc_ids_namespace(self, metadata_id: str, namespace: str): + async def delete_pc_ids_namespace(self, engine: Engine, metadata_id: str, namespace: str): pass @staticmethod - async def delete_pc_chunk_id_namespace(chunk_id: str, namespace: str): + async def delete_pc_chunk_id_namespace(engine: Engine, chunk_id: str, namespace: str): """ delete chunk from pinecone + :param engine: Engine :param chunk_id: :param namespace: :return: """ - """ - Delete namespace from Pinecone index - :param namespace: - :return: - """ + import pinecone try: pc = pinecone.Pinecone( - api_key=const.PINECONE_API_KEY + api_key=engine.apikey ) - host = pc.describe_index(const.PINECONE_INDEX).host - index = pc.Index(name=const.PINECONE_INDEX, host=host) + host = pc.describe_index(engine.index_name).host + index = pc.Index(name=engine.index_name, host=host) # vector_store = Pinecone.from_existing_index(const.PINECONE_INDEX,) delete_response = index.delete(ids=[chunk_id], namespace=namespace) except Exception as ex: @@ -79,9 +86,10 @@ async def delete_pc_chunk_id_namespace(chunk_id: str, namespace: str): raise ex @staticmethod - async def 
get_pc_ids_namespace( metadata_id: str, namespace: str) -> PineconeItems: + async def get_pc_ids_namespace(engine: Engine, metadata_id: str, namespace: str) -> RepositoryItems: """ Get from Pinecone all items from namespace given document id + :param engine: Engine :param metadata_id: :param namespace: :return: @@ -90,11 +98,11 @@ async def get_pc_ids_namespace( metadata_id: str, namespace: str) -> PineconeIte try: pc = pinecone.Pinecone( - api_key=const.PINECONE_API_KEY + api_key=engine.apikey ) - host = pc.describe_index(const.PINECONE_INDEX).host - index = pc.Index(name=const.PINECONE_INDEX, host=host) + host = pc.describe_index(engine.index_name).host + index = pc.Index(name=engine.index_name, host=host) # vector_store = Pinecone.from_existing_index(const.PINECONE_INDEX, ) describe = index.describe_index_stats() @@ -125,16 +133,16 @@ async def get_pc_ids_namespace( metadata_id: str, namespace: str) -> PineconeIte # print(type(matches[0].get('id'))) result = [] for obj in matches: - result.append(PineconeQueryResult(id=obj.get('id', ""), - metadata_id=obj.get('metadata').get('id'), - metadata_source=obj.get('metadata').get('source'), - metadata_type=obj.get('metadata').get('type'), - date=obj.get('metadata').get('date', 'Date not defined'), - text=obj.get('metadata').get(const.PINECONE_TEXT_KEY) - # su pod content, su Serverless text - ) + result.append(RepositoryQueryResult(id=obj.get('id', ""), + metadata_id=obj.get('metadata').get('id'), + metadata_source=obj.get('metadata').get('source'), + metadata_type=obj.get('metadata').get('type'), + date=obj.get('metadata').get('date', 'Date not defined'), + text=obj.get('metadata').get(const.PINECONE_TEXT_KEY) + # su pod content, su Serverless text + ) ) - res = PineconeItems(matches=result) + res = RepositoryItems(matches=result) logger.debug(res) return res @@ -145,16 +153,16 @@ async def get_pc_ids_namespace( metadata_id: str, namespace: str) -> PineconeIte raise ex @staticmethod - async def pinecone_list_namespaces() -> PineconeNamespaceResult: + async def pinecone_list_namespaces(engine: Engine) -> RepositoryNamespaceResult: import pinecone try: pc = pinecone.Pinecone( - api_key=const.PINECONE_API_KEY + api_key=engine.apikey ) - host = pc.describe_index(const.PINECONE_INDEX).host - index = pc.Index(name=const.PINECONE_INDEX, host=host) + host = pc.describe_index(engine.index_name).host + index = pc.Index(name=engine.index_name, host=host) describe = index.describe_index_stats() @@ -165,13 +173,13 @@ async def pinecone_list_namespaces() -> PineconeNamespaceResult: for namespace in namespaces.keys(): total_vectors = namespaces.get(namespace).get('vector_count') - pc_item_namespace = PineconeItemNamespaceResult(namespace=namespace, vector_count=total_vectors) + pc_item_namespace = RepositoryItemNamespaceResult(namespace=namespace, vector_count=total_vectors) results.append(pc_item_namespace) logger.debug(f"{namespace}, {total_vectors}") logger.debug(f"pinecone total vector in {results}") - return PineconeNamespaceResult(namespaces=results) + return RepositoryNamespaceResult(namespaces=results) except Exception as ex: @@ -180,9 +188,10 @@ async def pinecone_list_namespaces() -> PineconeNamespaceResult: raise ex @staticmethod - async def get_pc_all_obj_namespace(namespace: str) -> PineconeItems: + async def get_pc_all_obj_namespace(engine: Engine, namespace: str) -> RepositoryItems: """ Query Pinecone to get all object + :param engine: Engine :param namespace: :return: """ @@ -190,11 +199,11 @@ async def get_pc_all_obj_namespace(namespace: 
str) -> PineconeItems: try: pc = pinecone.Pinecone( - api_key=const.PINECONE_API_KEY + api_key=engine.apikey ) - host = pc.describe_index(const.PINECONE_INDEX).host - index = pc.Index(name=const.PINECONE_INDEX, host=host) + host = pc.describe_index(engine.index_name).host + index = pc.Index(name=engine.index_name, host=host) # vector_store = Pinecone.from_existing_index(const.PINECONE_INDEX, ) describe = index.describe_index_stats() @@ -226,15 +235,15 @@ async def get_pc_all_obj_namespace(namespace: str) -> PineconeItems: # print(type(matches[0].get('id'))) result = [] for obj in matches: - result.append(PineconeQueryResult(id=obj.get('id', ""), - metadata_id=obj.get('metadata').get('id'), - metadata_source=obj.get('metadata').get('source'), - metadata_type=obj.get('metadata').get('type'), - date=obj.get('metadata').get('date', 'Date not defined'), - text=None # su pod content, su Serverless text - ) + result.append(RepositoryQueryResult(id=obj.get('id', ""), + metadata_id=obj.get('metadata').get('id'), + metadata_source=obj.get('metadata').get('source'), + metadata_type=obj.get('metadata').get('type'), + date=obj.get('metadata').get('date', 'Date not defined'), + text=None # su pod content, su Serverless text + ) ) - res = PineconeItems(matches=result) + res = RepositoryItems(matches=result) logger.debug(res) return res @@ -245,9 +254,10 @@ async def get_pc_all_obj_namespace(namespace: str) -> PineconeItems: raise ex @staticmethod - async def get_pc_desc_namespace(namespace: str) -> PineconeDescNamespaceResult: + async def get_pc_desc_namespace(engine: Engine, namespace: str) -> RepositoryDescNamespaceResult: """ Query Pinecone to get all object + :param engine: Engine :param namespace: :return: PineconeDescNamespaceResult """ @@ -255,11 +265,11 @@ async def get_pc_desc_namespace(namespace: str) -> PineconeDescNamespaceResult: try: pc = pinecone.Pinecone( - api_key=const.PINECONE_API_KEY + api_key=engine.apikey ) - host = pc.describe_index(const.PINECONE_INDEX).host - index = pc.Index(name=const.PINECONE_INDEX, host=host) + host = pc.describe_index(engine.index_name).host + index = pc.Index(name=engine.index_name, host=host) # vector_store = Pinecone.from_existing_index(const.PINECONE_INDEX, ) describe = index.describe_index_stats() @@ -267,11 +277,11 @@ async def get_pc_desc_namespace(namespace: str) -> PineconeDescNamespaceResult: logger.debug(describe) namespaces = describe.get("namespaces", {}) total_vectors = 1 - description = PineconeItemNamespaceResult(namespace=namespace, vector_count=0) + description = RepositoryItemNamespaceResult(namespace=namespace, vector_count=0) if namespaces: if namespace in namespaces.keys(): total_vectors = namespaces.get(namespace).get('vector_count') - description = PineconeItemNamespaceResult(namespace=namespace, vector_count=total_vectors) + description = RepositoryItemNamespaceResult(namespace=namespace, vector_count=total_vectors) logger.debug(f"pinecone total vector in {namespace}: {total_vectors}") @@ -291,18 +301,18 @@ async def get_pc_desc_namespace(namespace: str) -> PineconeDescNamespaceResult: # ids = [obj.get('id') for obj in matches] # print(type(matches[0].get('id'))) result = [] - ids_count: Dict[str, PineconeIdSummaryResult] = {} + ids_count: Dict[str, RepositoryIdSummaryResult] = {} for obj in matches: metadata_id = obj.get('metadata').get('id') if metadata_id in ids_count: ids_count[metadata_id].chunks_count += 1 else: - ids_count[metadata_id] = PineconeIdSummaryResult(metadata_id=metadata_id, - 
source=obj.get('metadata').get('source'), - chunks_count=1) + ids_count[metadata_id] = RepositoryIdSummaryResult(metadata_id=metadata_id, + source=obj.get('metadata').get('source'), + chunks_count=1) - res = PineconeDescNamespaceResult(namespace_desc=description, ids=list(ids_count.values())) + res = RepositoryDescNamespaceResult(namespace_desc=description, ids=list(ids_count.values())) logger.debug(res) return res @@ -314,9 +324,10 @@ async def get_pc_desc_namespace(namespace: str) -> PineconeDescNamespaceResult: raise ex @staticmethod - async def get_pc_sources_namespace(source: str, namespace: str) -> PineconeItems: + async def get_pc_sources_namespace(engine: Engine, source: str, namespace: str) -> RepositoryItems: """ Get from Pinecone all items from namespace given source + :param engine: Engine :param source: :param namespace: :return: @@ -325,11 +336,11 @@ async def get_pc_sources_namespace(source: str, namespace: str) -> PineconeItems try: pc = pinecone.Pinecone( - api_key=const.PINECONE_API_KEY + api_key=engine.apikey ) - host = pc.describe_index(const.PINECONE_INDEX).host - index = pc.Index(name=const.PINECONE_INDEX, host=host) + host = pc.describe_index(engine.index_name).host + index = pc.Index(name=engine.index_name, host=host) # vector_store = Pinecone.from_existing_index(const.PINECONE_INDEX, ) describe = index.describe_index_stats() @@ -357,16 +368,16 @@ async def get_pc_sources_namespace(source: str, namespace: str) -> PineconeItems # print(type(matches[0].get('id'))) result = [] for obj in matches: - result.append(PineconeQueryResult(id=obj.get('id'), - metadata_id=obj.get('metadata').get('id'), - metadata_source=obj.get('metadata').get('source'), - metadata_type=obj.get('metadata').get('type'), - date=obj.get('metadata').get('date', 'Date not defined'), - text=obj.get('metadata').get(const.PINECONE_TEXT_KEY) - # su pod content, su Serverless text - ) + result.append(RepositoryQueryResult(id=obj.get('id'), + metadata_id=obj.get('metadata').get('id'), + metadata_source=obj.get('metadata').get('source'), + metadata_type=obj.get('metadata').get('type'), + date=obj.get('metadata').get('date', 'Date not defined'), + text=obj.get('metadata').get(const.PINECONE_TEXT_KEY) + # su pod content, su Serverless text + ) ) - res = PineconeItems(matches=result) + res = RepositoryItems(matches=result) logger.debug(res) return res @@ -377,53 +388,79 @@ async def get_pc_sources_namespace(source: str, namespace: str) -> PineconeItems raise ex @staticmethod - async def create_pc_index(embeddings, emb_dimension) -> VectorStore: + async def create_pc_index(engine, embeddings, emb_dimension) -> PineconeVectorStore: """ Create or return existing index + :param engine: :param embeddings: :param emb_dimension: :return: """ import pinecone - from langchain_community.vectorstores import Pinecone - pc = pinecone.Pinecone( - api_key=const.PINECONE_API_KEY + + pc = pinecone.Pinecone( + api_key= engine.apikey #const.PINECONE_API_KEY ) - if const.PINECONE_INDEX in pc.list_indexes().names(): - logger.debug(const.PINECONE_TEXT_KEY) - logger.debug(f'Index {const.PINECONE_INDEX} exists. Loading embeddings ... 
') - vector_store = Pinecone.from_existing_index(index_name=const.PINECONE_INDEX, - embedding=embeddings, - text_key=const.PINECONE_TEXT_KEY - ) # text-key nuova versione è text + #existing_indexes = [index_info["name"] for index_info in pc.list_indexes()] + + if engine.index_name in pc.list_indexes().names(): #const.PINECONE_INDEX in pc.list_indexes().names(): + # logger.debug(engine.index_name) #(const.PINECONE_TEXT_KEY) + #logger.debug(f'Index {const.PINECONE_INDEX} exists. Loading embeddings ... ') + logger.debug(f'Index {engine.index_name} exists. Loading embeddings ... ') + #print(f"================== {engine.index_name}, api {engine.apikey}") + host = pc.describe_index(engine.index_name).host + + index = pc.Index(name=engine.index_name, + host=host) + + vector_store = PineconeVectorStore(index=index, + embedding=embeddings, + text_key=engine.text_key, + pinecone_api_key=engine.apikey, + index_name=engine.index_name) + + + #vector_store = PineconeVectorStore.from_existing_index(index_name= engine.index_name, #const.PINECONE_INDEX, + # embedding=embeddings, + # text_key= engine.text_key #const.PINECONE_TEXT_KEY + # ) # text-key nuova versione è text else: - logger.debug(f'Create index {const.PINECONE_INDEX} and embeddings ...') + #logger.debug(f'Create index {const.PINECONE_INDEX} and embeddings ...') + logger.debug(f'Create index {engine.index_name} and embeddings ...') - if os.environ.get("PINECONE_TYPE") == "serverless": - pc.create_index(const.PINECONE_INDEX, + if engine.type == "serverless": #os.environ.get("PINECONE_TYPE") == "serverless": + pc.create_index(engine.index_name, # const.PINECONE_INDEX, dimension=emb_dimension, - metric='cosine', + metric=engine.metric, spec=pinecone.ServerlessSpec(cloud="aws", region="us-west-2" ) ) else: - pc.create_index(const.PINECONE_INDEX, + pc.create_index(engine.index_name, #const.PINECONE_INDEX, dimension=emb_dimension, - metric='cosine', + metric=engine.metric, spec=pinecone.PodSpec(pod_type="p1", pods=1, environment="us-west4-gpc" ) ) + while not pc.describe_index(engine.index_name).status["ready"]: + time.sleep(1) + + host = pc.describe_index(engine.index_name).host + index = pc.Index(name=engine.index_name, host=host) + + vector_store = PineconeVectorStore(index=index, + embedding=embeddings, + text_key=engine.text_key, + pinecone_api_key=engine.apikey, + index_name=engine.index_name) + - vector_store = Pinecone.from_existing_index(index_name=const.PINECONE_INDEX, - embedding=embeddings, - text_key=const.PINECONE_TEXT_KEY - ) return vector_store @@ -445,7 +482,9 @@ def chunk_data(data, chunk_size=256, chunk_overlap=10): @staticmethod def chunk_data_extended(data, chunk_size=256, chunk_overlap=10, **kwargs): """ - Chunk document in small pieces. Semantic chunking is implemented too + Chunk document in small pieces. 
Semantic chunking is implemented too with + percentile, standard_deviation, interquartile, gradient + :param data: :param chunk_size: :param chunk_overlap: diff --git a/tilellm/store/pinecone/pinecone_repository_pod.py b/tilellm/store/pinecone/pinecone_repository_pod.py index 2b85d28..1c8ea0d 100644 --- a/tilellm/store/pinecone/pinecone_repository_pod.py +++ b/tilellm/store/pinecone/pinecone_repository_pod.py @@ -1,12 +1,12 @@ from tilellm.models.item_model import (MetadataItem, - PineconeIndexingResult + IndexingResult, Engine ) from tilellm.shared.utility import inject_embedding -from tilellm.tools.document_tool_simple import (get_content_by_url, - get_content_by_url_with_bs, - load_document, - load_from_wikipedia - ) +from tilellm.tools.document_tools import (get_content_by_url, + get_content_by_url_with_bs, + load_document, + load_from_wikipedia + ) from tilellm.store.pinecone.pinecone_repository_base import PineconeRepositoryBase @@ -23,7 +23,7 @@ class PineconeRepositoryPod(PineconeRepositoryBase): @inject_embedding() - async def add_pc_item(self, item, embedding_obj=None, embedding_dimension=None) -> PineconeIndexingResult: + async def add_pc_item(self, item, embedding_obj=None, embedding_dimension=None) -> IndexingResult: """ Add items to name space into Pinecone index @@ -46,8 +46,9 @@ async def add_pc_item(self, item, embedding_obj=None, embedding_dimension=None) chunk_size = item.chunk_size chunk_overlap = item.chunk_overlap parameters_scrape_type_4 = item.parameters_scrape_type_4 + engine = item.engine try: - await self.delete_pc_ids_namespace(metadata_id=metadata_id, namespace=namespace) + await self.delete_pc_ids_namespace(engine=engine, metadata_id=metadata_id, namespace=namespace) except Exception as ex: logger.warning(ex) pass @@ -56,8 +57,8 @@ async def add_pc_item(self, item, embedding_obj=None, embedding_dimension=None) # default text-embedding-ada-002 1536, text-embedding-3-large 3072, text-embedding-3-small 1536 oai_embeddings = embedding_obj # OpenAIEmbeddings(api_key=gpt_key, model=embedding) - vector_store = await self.create_pc_index(embeddings=oai_embeddings, emb_dimension=emb_dimension) - + vector_store = await self.create_pc_index(engine=engine, embeddings=oai_embeddings, emb_dimension=emb_dimension) + # print(f"=========== POD {type(vector_store)}") chunks = [] total_tokens = 0 cost = 0 @@ -101,11 +102,12 @@ async def add_pc_item(self, item, embedding_obj=None, embedding_dimension=None) # pprint(documents) logger.debug(documents) - a = vector_store.from_documents(chunks, - embedding=oai_embeddings, - index_name=const.PINECONE_INDEX, - namespace=namespace, - text_key=const.PINECONE_TEXT_KEY) + a = vector_store.add_documents(chunks, + #embedding=oai_embeddings, + #index_name=const.PINECONE_INDEX, + namespace=namespace, + #text_key=const.PINECONE_TEXT_KEY + ) total_tokens, cost = self.calc_embedding_cost(chunks, embedding) logger.info(f"chunks: {len(chunks)}, total_tokens: {total_tokens}, cost: {cost: .6f}") @@ -121,11 +123,12 @@ async def add_pc_item(self, item, embedding_obj=None, embedding_dimension=None) chunks.append(document) # chunks.extend(chunk_data(data=documents)) total_tokens, cost = self.calc_embedding_cost(chunks, embedding) - a = vector_store.from_documents(chunks, - embedding=oai_embeddings, - index_name=const.PINECONE_INDEX, - namespace=namespace, - text_key=const.PINECONE_TEXT_KEY) + a = vector_store.add_documents(chunks, + #embedding=oai_embeddings, + # index_name=const.PINECONE_INDEX, + namespace=namespace, + # text_key=const.PINECONE_TEXT_KEY 
+ ) else: metadata = MetadataItem(id=metadata_id, source=source, type=type_source, embedding=embedding) @@ -140,38 +143,45 @@ async def add_pc_item(self, item, embedding_obj=None, embedding_dimension=None) ) ) total_tokens, cost = self.calc_embedding_cost(chunks, embedding) - a = vector_store.from_documents(chunks, - embedding=oai_embeddings, - index_name=const.PINECONE_INDEX, + a = vector_store.add_documents(chunks, + #embedding=oai_embeddings, + # index_name=const.PINECONE_INDEX, namespace=namespace, - text_key=const.PINECONE_TEXT_KEY) + # text_key=const.PINECONE_TEXT_KEY + ) - pinecone_result = PineconeIndexingResult(id=metadata_id, chunks=len(chunks), total_tokens=total_tokens, - cost=f"{cost:.6f}") + pinecone_result = IndexingResult(id=metadata_id, chunks=len(chunks), total_tokens=total_tokens, + cost=f"{cost:.6f}") except Exception as ex: logger.error(repr(ex)) - pinecone_result = PineconeIndexingResult(id=metadata_id, chunks=len(chunks), total_tokens=total_tokens, - status=400, - cost=f"{cost:.6f}") + pinecone_result = IndexingResult(id=metadata_id, chunks=len(chunks), total_tokens=total_tokens, + status=400, + cost=f"{cost:.6f}") # {"id": f"{id}", "chunks": f"{len(chunks)}", "total_tokens": f"{total_tokens}", "cost": f"{cost:.6f}"} return pinecone_result - async def delete_pc_ids_namespace(self, metadata_id: str, namespace: str): + @inject_embedding() + async def add_pc_item_hybrid(self, item, embedding_obj=None, embedding_dimension=None): + pass + + + async def delete_pc_ids_namespace(self, engine: Engine, metadata_id: str, namespace: str): """ Delete from pinecone items + :param engine: :param metadata_id: :param namespace: :return: """ - + # print(f"DELETE ID FROM NAMESPACE api {engine.apikey} index_name {engine.index_name}") import pinecone try: pc = pinecone.Pinecone( - api_key=const.PINECONE_API_KEY + api_key=engine.apikey # const.PINECONE_API_KEY ) - host = pc.describe_index(const.PINECONE_INDEX).host - index = pc.Index(name=const.PINECONE_INDEX, host=host) + host = pc.describe_index(engine.index_name).host # const.PINECONE_INDEX).host + index = pc.Index(name=engine.index_name, host=host) # const.PINECONE_INDEX, host=host) describe = index.describe_index_stats() logger.debug(describe) namespaces = describe.get("namespaces", {}) diff --git a/tilellm/store/pinecone/pinecone_repository_serverless.py b/tilellm/store/pinecone/pinecone_repository_serverless.py index 5bd2f3e..23456d0 100644 --- a/tilellm/store/pinecone/pinecone_repository_serverless.py +++ b/tilellm/store/pinecone/pinecone_repository_serverless.py @@ -1,30 +1,40 @@ +import datetime +import torch + +from langchain_core.utils import batch_iterate + from tilellm.models.item_model import (MetadataItem, - PineconeIndexingResult + IndexingResult, Engine, ItemSingle ) -from tilellm.tools.document_tool_simple import (get_content_by_url, - get_content_by_url_with_bs, - load_document, - load_from_wikipedia - ) + +from tilellm.tools.document_tools import (get_content_by_url, + get_content_by_url_with_bs, + load_document, + load_from_wikipedia + ) from tilellm.store.pinecone.pinecone_repository_base import PineconeRepositoryBase from tilellm.shared.utility import inject_embedding -from tilellm.shared import const +from pinecone_text.sparse import SpladeEncoder + from langchain_core.documents import Document -from langchain_openai import OpenAIEmbeddings + import uuid -import os + import logging +from tilellm.tools.sparse_encoders import TiledeskSparseEncoders + logger = logging.getLogger(__name__) class 
PineconeRepositoryServerless(PineconeRepositoryBase): + @inject_embedding() - async def add_pc_item(self, item, embedding_obj=None, embedding_dimension=None): + async def add_pc_item(self, item:ItemSingle, embedding_obj=None, embedding_dimension=None): """ Add items to name space into Pinecone index @@ -35,156 +45,174 @@ async def add_pc_item(self, item, embedding_obj=None, embedding_dimension=None): """ logger.info(item) - metadata_id = item.id - source = item.source - type_source = item.type - content = item.content - #gpt_key = item.gptkey - embedding = item.embedding - namespace = item.namespace - semantic_chunk = item.semantic_chunk - breakpoint_threshold_type = item.breakpoint_threshold_type - scrape_type = item.scrape_type - chunk_size = item.chunk_size - chunk_overlap = item.chunk_overlap - parameters_scrape_type_4 = item.parameters_scrape_type_4 + await self.delete_pc_ids_namespace(engine=item.engine, + metadata_id=item.id, + namespace=item.namespace) + + vector_store = await self.create_vector_store(engine=item.engine, + embedding_obj=embedding_obj, + embedding_dimension=embedding_dimension, + metric="cosine") + + chunks = [] + total_tokens = 0 + cost = 0 + try: - await self.delete_pc_ids_namespace(metadata_id=metadata_id, namespace=namespace) + + if item.type in ['url', 'pdf', 'docx', 'txt']: + documents = await self.fetch_documents(type_source=item.type, + source=item.source, + scrape_type=item.scrape_type, + parameters_scrape_type_4=item.parameters_scrape_type_4) + chunks = self.chunk_documents(item=item, + documents=documents, + embeddings=embedding_obj + ) + else: + metadata = MetadataItem(id=item.id, + source=item.source, + type=item.type, + embedding=item.embedding).model_dump() + documents = await self.process_contents(type_source=item.type, + source=item.source, + metadata=metadata, + content=item.content) + + chunks.extend(self.chunk_data_extended( + data=[documents[0]], + chunk_size=item.chunk_size, + chunk_overlap=item.chunk_overlap, + semantic=item.semantic_chunk, + embeddings=embedding_obj, + breakpoint_threshold_type=item.breakpoint_threshold_type) + ) + + logger.debug(documents) + + total_tokens, cost = self.calc_embedding_cost(chunks, item.embedding) + + returned_ids = await self.upsert_vector_store(vector_store=vector_store, + chunks=chunks, + metadata_id=item.id, + namespace=item.namespace) + + logger.debug(returned_ids) + + return IndexingResult(id=item.id, + chunks=len(chunks), + total_tokens=total_tokens, + cost=f"{cost:.6f}") + except Exception as ex: - logger.warning(ex) - pass + import traceback + traceback.print_exc() + logger.error(repr(ex)) + return IndexingResult(id=item.id, + chunks=len(chunks), + total_tokens=total_tokens, + status=400, + cost=f"{cost:.6f}") + + + + @inject_embedding() + async def add_pc_item_hybrid(self, item, embedding_obj=None, embedding_dimension=None): + """ + Add item for hybrid search + :param item: + :param embedding_obj: + :param embedding_dimension: + :return: + """ + logger.info(item) - emb_dimension = embedding_dimension # self.get_embeddings_dimension(embedding) + await self.delete_pc_ids_namespace(engine=item.engine, + metadata_id=item.id, + namespace=item.namespace) + + vector_store = await self.create_vector_store(engine=item.engine, + embedding_obj=embedding_obj, + embedding_dimension=embedding_dimension, + metric="dotproduct") # default text-embedding-ada-002 1536, text-embedding-3-large 3072, text-embedding-3-small 1536 - oai_embeddings = embedding_obj # OpenAIEmbeddings(api_key=gpt_key, model=embedding) - 
vector_store = await self.create_pc_index(embeddings=oai_embeddings, emb_dimension=emb_dimension) - # textprova ="test degli embeddings di voyage" - # query_result = oai_embeddings.embed_query(textprova) - # print(f"len: {len(query_result)}") - # print(query_result) - # raise Exception + chunks = [] total_tokens = 0 cost = 0 try: - if (type_source == 'url' or - type_source == 'pdf' or - type_source == 'docx' or - type_source == 'txt'): - - documents = [] - if type_source == 'url' or type_source == 'txt': - documents = await get_content_by_url(source, - scrape_type, - parameters_scrape_type_4=parameters_scrape_type_4) - else: # elif type_source == 'pdf' or 'docx' or 'txt': - documents = load_document(source, type_source) - - for document in documents: - - document.metadata["id"] = metadata_id - document.metadata["source"] = source - document.metadata["type"] = type_source - document.metadata["embedding"] = embedding - - for key, value in document.metadata.items(): - if isinstance(value, list) and all(item is None for item in value): - document.metadata[key] = [""] - elif value is None: - document.metadata[key] = "" - - chunks.extend(self.chunk_data_extended(data=[document], - chunk_size=chunk_size, - chunk_overlap=chunk_overlap, - semantic=semantic_chunk, - embeddings=oai_embeddings, - breakpoint_threshold_type=breakpoint_threshold_type - ) - ) - # from pprint import pprint - # pprint(documents) - logger.debug(documents) - - # from pprint import pprint - # pprint(chunks) - - a = vector_store.from_documents(chunks, - embedding=oai_embeddings, - index_name=const.PINECONE_INDEX, - namespace=namespace, - text_key=const.PINECONE_TEXT_KEY, - ids=[f"{metadata_id}#{uuid.uuid4().hex}" for i in range(len(chunks))]) - - total_tokens, cost = self.calc_embedding_cost(chunks, embedding) - logger.info(f"chunks: {len(chunks)}, total_tokens: {total_tokens}, cost: {cost: .6f}") - - # from pprint import pprint - # pprint(documents) - elif type_source == 'urlbs': - doc_array = get_content_by_url_with_bs(source) - chunks = list() - for doc in doc_array: - metadata = MetadataItem(id=metadata_id, source=source, type=type_source, embedding=embedding) - - document = Document(page_content=doc, metadata=metadata.model_dump()) #dict()) - - chunks.append(document) - # chunks.extend(chunk_data(data=documents)) - total_tokens, cost = self.calc_embedding_cost(chunks, embedding) - a = vector_store.from_documents(chunks, - embedding=oai_embeddings, - index_name=const.PINECONE_INDEX, - namespace=namespace, - text_key=const.PINECONE_TEXT_KEY, - ids=[f"{metadata_id}#{uuid.uuid4().hex}" for i in range(len(chunks))] - ) - + if item.type in ['url', 'pdf', 'docx', 'txt']: + documents = await self.fetch_documents(type_source = item.type, + source=item.source, + scrape_type=item.scrape_type, + parameters_scrape_type_4=item.parameters_scrape_type_4) + + chunks = self.chunk_documents(item=item, + documents=documents, + embeddings=embedding_obj + ) else: - metadata = MetadataItem(id=metadata_id, source=source, type=type_source, embedding=embedding) - document = Document(page_content=content, metadata=metadata.model_dump()) #tolto dict() - - chunks.extend(self.chunk_data_extended(data=[document], - chunk_size=chunk_size, - chunk_overlap=chunk_overlap, - semantic=semantic_chunk, - embeddings=oai_embeddings, - breakpoint_threshold_type=breakpoint_threshold_type - ) - ) - total_tokens, cost = self.calc_embedding_cost(chunks, embedding) - a = vector_store.from_documents(chunks, - embedding=oai_embeddings, - index_name=const.PINECONE_INDEX, 
- namespace=namespace, - text_key=const.PINECONE_TEXT_KEY, - ids=[f"{metadata_id}#{uuid.uuid4().hex}" for i in range(len(chunks))] - ) - - pinecone_result = PineconeIndexingResult(id=metadata_id, chunks=len(chunks), total_tokens=total_tokens, - cost=f"{cost:.6f}") + metadata = MetadataItem(id=item.id, + source=item.source, + type=item.type, + embedding=item.embedding).model_dump() + documents = await self.process_contents(type_source=item.type, + source=item.source, + metadata=metadata, + content=item.content) + chunks.extend(self.chunk_data_extended( + data=[documents[0]], + chunk_size=item.chunk_size, + chunk_overlap=item.chunk_overlap, + semantic=item.semantic_chunk, + embeddings=embedding_obj, + breakpoint_threshold_type=item.breakpoint_threshold_type) + ) + + contents = [chunk.page_content for chunk in chunks] + total_tokens, cost = self.calc_embedding_cost(chunks, item.embedding) + + sparse_encoder = TiledeskSparseEncoders(item.sparse_encoder) + doc_sparse_vectors = sparse_encoder.encode_documents(contents) + + indice = vector_store.get_pinecone_index(item.engine.index_name, pinecone_api_key=item.engine.apikey) + + await self.upsert_vector_store_hybrid(indice, + contents, + chunks, + item.id, + namespace = item.namespace, + engine=item.engine, + embeddings=embedding_obj, + sparse_vectors=doc_sparse_vectors) + + return IndexingResult(id=item.id, chunks=len(chunks), total_tokens=total_tokens, + cost=f"{cost:.6f}") + except Exception as ex: import traceback traceback.print_exc() logger.error(repr(ex)) - pinecone_result = PineconeIndexingResult(id=metadata_id, chunks=len(chunks), total_tokens=total_tokens, - status=400, - cost=f"{cost:.6f}") - # {"id": f"{id}", "chunks": f"{len(chunks)}", "total_tokens": f"{total_tokens}", "cost": f"{cost:.6f}"} - return pinecone_result + return IndexingResult(id=item.id, chunks=len(chunks), total_tokens=total_tokens, + status=400, + cost=f"{cost:.6f}") + + #return pinecone_result - async def delete_pc_ids_namespace(self, metadata_id: str, namespace: str): + async def delete_pc_ids_namespace(self, engine, metadata_id: str, namespace: str): import pinecone try: + pc = pinecone.Pinecone( - api_key=const.PINECONE_API_KEY + api_key= engine.apikey #const.PINECONE_API_KEY ) - host = pc.describe_index(const.PINECONE_INDEX).host - index = pc.Index(name=const.PINECONE_INDEX, host=host) + host = pc.describe_index(engine.index_name).host#const.PINECONE_INDEX).host + index = pc.Index(name=engine.index_name , host=host)#const.PINECONE_INDEX, host=host) describe = index.describe_index_stats() logger.debug(describe) @@ -197,5 +225,105 @@ async def delete_pc_ids_namespace(self, metadata_id: str, namespace: str): except Exception as ex: # logger.error(ex) - raise ex - + logger.warning(ex) + #raise ex + + + #async def delete_existing_items(self, engine: Engine, metadata_id: str, namespace: str): + # try: + # await self.delete_pc_ids_namespace(engine=engine, metadata_id=metadata_id, namespace=namespace) + # except Exception as ex: + # logger.warning(ex) + + async def create_vector_store(self, engine: Engine, embedding_obj, embedding_dimension: int, metric: str): + engine.metric = metric + return await self.create_pc_index(engine=engine, embeddings=embedding_obj, emb_dimension=embedding_dimension) + + @staticmethod + async def fetch_documents(type_source, source, scrape_type, parameters_scrape_type_4): + if type_source in ['url', 'txt']: + return await get_content_by_url(source, + scrape_type, + parameters_scrape_type_4=parameters_scrape_type_4) + return 
load_document(source, type_source) + + @staticmethod + def process_document_metadata(document, metadata): + document.metadata.update(metadata) + document.metadata['date'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S,%f") + for key, value in document.metadata.items(): + if isinstance(value, list) and all(item is None for item in value): + document.metadata[key] = [""] + elif value is None: + document.metadata[key] = "" + return document + + def chunk_documents(self, item, documents, embeddings): + chunks = [] + for document in documents: + document.metadata["id"] = item.id + document.metadata["source"] = item.source + document.metadata["type"] = item.type + document.metadata["embedding"] = item.embedding + processed_document = self.process_document_metadata(document, document.metadata) + chunks.extend(self.chunk_data_extended( + data=[processed_document], + chunk_size=item.chunk_size, + chunk_overlap=item.chunk_overlap, + semantic=item.semantic_chunk, + embeddings=embeddings, + breakpoint_threshold_type=item.breakpoint_threshold_type) + ) + return chunks + + @staticmethod + async def process_contents(type_source, source, metadata, content): + if type_source == 'urlbs': + doc_array = get_content_by_url_with_bs(source) + return [Document(page_content=doc, metadata=MetadataItem(**metadata).model_dump()) for doc in doc_array] + + document = Document(page_content=content, metadata=MetadataItem(**metadata).model_dump()) + return [document] + + @staticmethod + async def upsert_vector_store_hybrid(indice, contents, chunks, metadata_id, engine, namespace, embeddings, sparse_vectors): + embedding_chunk_size = 1000 + batch_size: int = 32 + + ids = [f"{metadata_id}#{uuid.uuid4().hex}" for _ in range(len(chunks))] + metadatas = [{**chunk.metadata, engine.text_key: chunk.page_content} for chunk in chunks] + async_req = True + + for i in range(0, len(contents), embedding_chunk_size): + chunk_texts = contents[i: i + embedding_chunk_size] + chunk_ids = ids[i: i + embedding_chunk_size] + chunk_metadatas = metadatas[i: i + embedding_chunk_size] + embedding_values = embeddings.embed_documents(chunk_texts)#embeddings[i: i + embedding_chunk_size] + sparse_values = sparse_vectors[i: i + embedding_chunk_size] + + vector_tuples = [ + {'id': idr, + 'values': embedding, + 'metadata': chunk, + 'sparse_values': sparse_value} + for idr, embedding, chunk, sparse_value in + zip(chunk_ids, embedding_values, chunk_metadatas, sparse_values) + ] + + if async_req: + async_res = [ + indice.upsert(vectors=batch_vector_tuples, + namespace=namespace, + async_req=async_req) + for batch_vector_tuples in batch_iterate(batch_size, vector_tuples) + ] + [res.get() for res in async_res] + else: + indice.upsert(vectors=vector_tuples, + namespace=namespace, + async_req=async_req) + + @staticmethod + async def upsert_vector_store(vector_store, chunks, metadata_id, namespace): + ids = [f"{metadata_id}#{uuid.uuid4().hex}" for _ in range(len(chunks))] + returned_ids = await vector_store.aadd_documents(chunks, namespace=namespace, ids=ids) \ No newline at end of file diff --git a/tilellm/tools/document_tool_simple.py b/tilellm/tools/document_tools.py similarity index 67% rename from tilellm/tools/document_tool_simple.py rename to tilellm/tools/document_tools.py index 78be903..35eb149 100644 --- a/tilellm/tools/document_tool_simple.py +++ b/tilellm/tools/document_tools.py @@ -1,15 +1,19 @@ +import time + +import requests +import logging +import asyncio -from langchain_community.document_loaders import BSHTMLLoader from 
langchain_community.document_loaders import UnstructuredURLLoader from langchain_community.document_loaders import AsyncChromiumLoader -from langchain_community.document_loaders import PlaywrightURLLoader -import requests -import logging + +from playwright.async_api import async_playwright from langchain_community.document_transformers import BeautifulSoupTransformer from langchain_core.documents import Document +from playwright.sync_api import sync_playwright logger = logging.getLogger(__name__) @@ -43,8 +47,21 @@ async def get_content_by_url(url: str, scrape_type: int, **kwargs) -> list[Docu ) docs = await loader.aload() elif scrape_type == 2: - loader = PlaywrightURLLoader(urls=urls) - docs = await loader.aload() + + + params_type_4 = kwargs.get("parameters_scrape_type_4") + docs= await scrape_page(url, params_type_4) + #loop = asyncio.new_event_loop() + #queue = Queue() + #scraping_thread = threading.Thread(target=run_scraping_in_thread, args=(loop, queue, url, params_type_4)) + ##scraping_thread.start() + #scraping_thread.join() # This will wait for the thread to finish + #if not queue.empty(): + # docs = queue.get() + #else: + # docs =[] + + elif scrape_type == 3: loader = AsyncChromiumLoader(urls=urls, user_agent='Mozilla/5.0') docs = await loader.aload() @@ -56,6 +73,7 @@ async def get_content_by_url(url: str, scrape_type: int, **kwargs) -> list[Docu params_type_4 = kwargs.get("parameters_scrape_type_4") loader = AsyncChromiumLoader(urls=urls, user_agent='Mozilla/5.0') docs = await loader.aload() + bs_transformer = BeautifulSoupTransformer() docs_transformed = bs_transformer.transform_documents(docs, tags_to_extract=params_type_4.tags_to_extract, @@ -65,6 +83,7 @@ async def get_content_by_url(url: str, scrape_type: int, **kwargs) -> list[Docu remove_comments=params_type_4.remove_comments ) docs = docs_transformed + # print(f"=== DOCS BS4 {docs}") for doc in docs: doc.metadata = clean_metadata(doc.metadata) @@ -76,6 +95,73 @@ async def get_content_by_url(url: str, scrape_type: int, **kwargs) -> list[Docu except Exception as ex: raise ex +def run_scraping_in_thread(loop, queue, *args, **kwargs): + asyncio.set_event_loop(loop) + docs = loop.run_until_complete(scrape_page(*args, **kwargs)) + queue.put(docs) + +async def scrape_page(url, params_type_4, time_sleep=2): + logger.info("Starting scraping...") + async with async_playwright() as p: + browser = await p.chromium.launch(headless=True) + page = await browser.new_page(user_agent="Mozilla/5.0 AppleWebKit/537.36 Chrome/128.0.0.0 Safari/537.36", + java_script_enabled=True) + + await page.goto(url=url) + + await page.wait_for_load_state() + time.sleep(params_type_4.time_sleep) + results = await page.content() + await browser.close() + + metadata = {"source": url} + doc = Document(page_content=results, metadata=metadata) + docs = [doc] + #logger.info(docs) + bs_transformer = BeautifulSoupTransformer() + docs_transformed = bs_transformer.transform_documents( + docs, + tags_to_extract=params_type_4.tags_to_extract, + unwanted_tags=params_type_4.unwanted_tags, + unwanted_classnames=params_type_4.unwanted_classnames, + remove_lines=params_type_4.remove_lines, + remove_comments=params_type_4.remove_comments + ) + docs = docs_transformed + #for doc in docs: + # doc.metadata = clean_metadata(doc.metadata) + return docs + +def scrape_page_new(url, params_type_4): + logger.info("Starting scraping...") + with sync_playwright() as p: + browser = p.chromium.launch(headless=True) + page = browser.new_page(user_agent="Mozilla/5.0 AppleWebKit/537.36 
Chrome/128.0.0.0 Safari/537.36", + java_script_enabled=True) + + page.goto(url=url, wait_until="load") + results = page.content() + browser.close() + + metadata = {"source": url} + doc = Document(page_content=results, metadata=metadata) + docs = [doc] + + bs_transformer = BeautifulSoupTransformer() + docs_transformed = bs_transformer.transform_documents( + docs, + tags_to_extract=params_type_4.tags_to_extract, + unwanted_tags=params_type_4.unwanted_tags, + unwanted_classnames=params_type_4.unwanted_classnames, + remove_lines=params_type_4.remove_lines, + remove_comments=params_type_4.remove_comments + ) + docs = docs_transformed + for doc in docs: + doc.metadata = clean_metadata(doc.metadata) + return docs + + def load_document(url: str, type_source: str): # import os diff --git a/tilellm/tools/sparse_encoders.py b/tilellm/tools/sparse_encoders.py new file mode 100644 index 0000000..8a6a2c8 --- /dev/null +++ b/tilellm/tools/sparse_encoders.py @@ -0,0 +1,85 @@ +import torch +from pinecone_text.sparse import SpladeEncoder +from FlagEmbedding import BGEM3FlagModel + + +class TiledeskSpladeEncoder: + def __init__(self): + self.device = 'cuda' if torch.cuda.is_available() else 'cpu' + self.splade = SpladeEncoder(device=self.device) + + def encode_documents(self, contents): + # Logica di encoding specifica per SpladeEncoder + + doc_sparse_vectors = self.splade.encode_documents(contents) + return doc_sparse_vectors + + def encode_queries(self,query): + return self.splade.encode_queries(query) + + +class TiledeskBGEM3: + def __init__(self): + self.use_fp16_bool = True if torch.cuda.is_available() else False + self.model = BGEM3FlagModel('BAAI/bge-m3', + use_fp16=self.use_fp16_bool + ) + + def encode_documents(self, contents): + + + output_1 = self.model.encode(contents, return_dense=True, return_sparse=True, return_colbert_vecs=False) + dd = output_1['lexical_weights'] + doc_sparse_vectors = [ + { + 'indices': [int(k) for k in dd.keys()], + 'values': [float(dd[k]) for k in dd.keys()] + } + for dd in dd + ] + + #for d in doc_sparse_vectors: + # print(d) + + return doc_sparse_vectors + + def encode_queries(self, query): + query_encode = self.model.encode([query], return_dense=False, return_sparse=True, return_colbert_vecs=False) + dd = query_encode['lexical_weights'] + doc_sparse_vectors = [ + { + 'indices': [int(k) for k in dd.keys()], + 'values': [float(dd[k]) for k in dd.keys()] + } + for dd in dd + ] + #print(doc_sparse_vectors[0]) + return doc_sparse_vectors[0] + + + +class TiledeskSparseEncoders: + def __init__(self, model_name): + self.encoder = self._get_encoder(model_name) + self.device = 'cuda' if torch.cuda.is_available() else 'cpu' + + def _get_encoder(self, model_name): + if model_name == "splade": + return TiledeskSpladeEncoder() + elif model_name == "bge-m3": + return TiledeskBGEM3() + else: + raise ValueError("Unsupported model_name: {}. Supported values are 'splade' and 'bge-m3'.".format(model_name)) + + def encode_documents(self, contents): + if self.encoder: + return self.encoder.encode_documents(contents) + else: + raise ValueError("No encoder has been initialized.") + + def encode_queries(self, query): + if self.encoder: + return self.encoder.encode_queries(query) + else: + raise ValueError("No encoder has been initialized.") +
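
A note on the `Engine` parameter now threaded through every repository method in place of the old `const.PINECONE_API_KEY` / `const.PINECONE_INDEX` globals: the model itself lives in `tilellm/models/item_model.py` and is not part of this diff, so the sketch below only mirrors the fields the repository code actually reads (`apikey`, `index_name`, `text_key`, `metric`, `type`) plus the connection step the methods repeat. Treat it as an assumption, not the real model.

```python
# Hypothetical sketch: an Engine payload and the index-connection step the
# repository methods repeat. The real Engine model in tilellm/models/item_model.py
# is authoritative; field names below mirror the attributes used in this diff.
from typing import Optional

import pinecone
from pydantic import BaseModel


class Engine(BaseModel):
    name: str = "pinecone"
    type: str = "serverless"       # "serverless" or "pod", see create_pc_index
    apikey: str = ""
    vector_size: int = 1024
    index_name: str = ""
    text_key: str = "text"         # metadata key that stores the chunk text
    metric: Optional[str] = "cosine"


def connect_index(engine: Engine):
    """Resolve the index host from its name and return a Pinecone Index handle."""
    pc = pinecone.Pinecone(api_key=engine.apikey)
    host = pc.describe_index(engine.index_name).host
    return pc.Index(name=engine.index_name, host=host)
```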
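
`chunk_data_extended` is only touched at the docstring level here, but the four `breakpoint_threshold_type` values it now lists (`percentile`, `standard_deviation`, `interquartile`, `gradient`) are exactly the options LangChain's experimental `SemanticChunker` accepts. A sketch of what the semantic/fixed-size switch might look like; this is an assumed implementation, not the method's actual body:

```python
# Assumed sketch of the semantic vs. fixed-size chunking switch. Only the
# docstring of chunk_data_extended changes in this diff, so the body below is
# illustrative.
from langchain_experimental.text_splitter import SemanticChunker
from langchain_text_splitters import RecursiveCharacterTextSplitter


def chunk_data_extended(data, chunk_size=256, chunk_overlap=10, *,
                        semantic=False, embeddings=None,
                        breakpoint_threshold_type="percentile"):
    if semantic:
        # "percentile", "standard_deviation", "interquartile" or "gradient"
        splitter = SemanticChunker(
            embeddings,
            breakpoint_threshold_type=breakpoint_threshold_type,
        )
    else:
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        )
    return splitter.split_documents(data)
```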
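
The serverless `delete_pc_ids_namespace` body is mostly elided from this hunk, but both upsert paths now assign chunk ids of the form `{metadata_id}#{uuid4().hex}`. On serverless indexes that prefix makes per-document deletion possible with the `list`/`delete` pair; the sketch below shows that pattern under the assumption that this is how the ids are resolved (pod indexes would use a metadata filter instead):

```python
# Hedged sketch: delete every chunk of a document by its id prefix on a
# serverless index. This may or may not match the method's actual body.
import pinecone


def delete_document_chunks(engine, metadata_id: str, namespace: str) -> None:
    pc = pinecone.Pinecone(api_key=engine.apikey)
    host = pc.describe_index(engine.index_name).host
    index = pc.Index(name=engine.index_name, host=host)

    # index.list() yields batches of ids matching the prefix
    for id_batch in index.list(prefix=f"{metadata_id}#", namespace=namespace):
        if id_batch:
            index.delete(ids=list(id_batch), namespace=namespace)
```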
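
The new `tilellm/tools/sparse_encoders.py` hides SPLADE and BGE-M3 behind one `TiledeskSparseEncoders` interface; `splade` and `bge-m3` are the only names the factory accepts. A minimal usage example (document and query strings are illustrative):

```python
# Usage sketch for the sparse encoder factory introduced in this diff.
from tilellm.tools.sparse_encoders import TiledeskSparseEncoders

encoder = TiledeskSparseEncoders("splade")        # or "bge-m3"

# Index time: one sparse vector per chunk, each a {'indices': [...], 'values': [...]} dict
doc_sparse = encoder.encode_documents(["Tiledesk supports hybrid search over Pinecone."])

# Query time: a single sparse vector for the question
query_sparse = encoder.encode_queries("what kind of search is supported?")

# These dicts map directly onto Pinecone's `sparse_values` (upsert) and
# `sparse_vector` (query) parameters used by the hybrid code paths.
```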
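
The retrieval half of hybrid search is not in this hunk, but `add_pc_item_hybrid` creates the index with `metric="dotproduct"`, which is what Pinecone requires for sparse-dense queries, and each chunk is stored with both dense `values` and `sparse_values`. A common way to blend the two at query time is to scale the dense vector by a weight `alpha` and the sparse weights by `1 - alpha` before a single query; the sketch below assumes that approach and should not be read as the project's actual retrieval code.

```python
# Hedged sketch of a hybrid query: scale the dense and sparse parts by a weight
# alpha (1.0 = pure dense, 0.0 = pure sparse) and issue a single Pinecone query.
# `engine` follows the fields used in this diff; `embeddings` is any LangChain
# embedding object; `encoder` is a TiledeskSparseEncoders instance.
import pinecone


def hybrid_scale(dense, sparse, alpha: float):
    if not 0.0 <= alpha <= 1.0:
        raise ValueError("alpha must be between 0 and 1")
    scaled_sparse = {
        "indices": sparse["indices"],
        "values": [v * (1 - alpha) for v in sparse["values"]],
    }
    scaled_dense = [v * alpha for v in dense]
    return scaled_dense, scaled_sparse


def hybrid_query(engine, embeddings, encoder, question: str,
                 namespace: str, top_k: int = 6, alpha: float = 0.5):
    pc = pinecone.Pinecone(api_key=engine.apikey)
    host = pc.describe_index(engine.index_name).host
    index = pc.Index(name=engine.index_name, host=host)

    dense, sparse = hybrid_scale(embeddings.embed_query(question),
                                 encoder.encode_queries(question),
                                 alpha)
    return index.query(vector=dense,
                       sparse_vector=sparse,
                       top_k=top_k,
                       namespace=namespace,
                       include_metadata=True)
```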
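
One detail worth flagging in the new `scrape_page` coroutine: it pauses with `time.sleep(params_type_4.time_sleep)`, which blocks the event loop for the whole worker while the page settles. A variant that yields control with `asyncio.sleep` instead, sketched on the assumption that the rest of the flow (headless Chromium plus the `BeautifulSoupTransformer` pass) stays as in the diff:

```python
# Sketch of scrape_page with a non-blocking pause; everything else follows the
# diff (headless Chromium render, then BeautifulSoupTransformer post-processing).
import asyncio

from langchain_community.document_transformers import BeautifulSoupTransformer
from langchain_core.documents import Document
from playwright.async_api import async_playwright


async def scrape_page(url, params_type_4):
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page(
            user_agent="Mozilla/5.0 AppleWebKit/537.36 Chrome/128.0.0.0 Safari/537.36",
            java_script_enabled=True,
        )
        await page.goto(url=url)
        await page.wait_for_load_state()
        await asyncio.sleep(params_type_4.time_sleep)   # yield instead of blocking
        html = await page.content()
        await browser.close()

    docs = [Document(page_content=html, metadata={"source": url})]
    transformer = BeautifulSoupTransformer()
    return transformer.transform_documents(
        docs,
        tags_to_extract=params_type_4.tags_to_extract,
        unwanted_tags=params_type_4.unwanted_tags,
        unwanted_classnames=params_type_4.unwanted_classnames,
        remove_lines=params_type_4.remove_lines,
        remove_comments=params_type_4.remove_comments,
    )
```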