Skip to content

Commit

Permalink
modified: log_conf.json to INFO Level
Browse files Browse the repository at this point in the history
  • Loading branch information
glorenzo972 committed May 2, 2024
1 parent 23e4713 commit 64adb4f
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 106 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@
*Andrea Sponziello*
### **Copyright**: *Tiledesk SRL*

## [2024-05-02]

### 0.1.10
- fixed: metadata fields can no longer be None.
- added: TILELLM_ROLE=qa|train in order to select between the Q&A and training roles

## [2024-05-01]

### 0.1.9
Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ pip install -e .
export REDIS_URL="redis://localhost:6379/0"
export PINECONE_API_KEY="pinecone api key"
export PINECONE_TEXT_KEY="pinecone field for text - default text in pod content"
export PINECONE_INDEX = "pinecone index name"
export PINECONE_INDEX="pinecone index name"
export TILELLM_ROLE="role of the pod. 'train' enables all the APIs; 'qa' does not consume the redis queue and serves Q&A only"
tilellm
```

Expand All @@ -27,7 +28,7 @@ sudo docker build -t tilellm .


```
sudo docker run -d -p 8000:8000 --env environment="dev|prod" --env PINECONE_API_KEY="yourapikey" --env PINECONE_TEXT_KEY="text|content" --env PINECONE_INDEX="index_name" --env REDIS_URL="redis://redis:6379/0" --name tilellm --link test-redis:redis tilellm
sudo docker run -d -p 8000:8000 --env environment="dev|prod" --env PINECONE_API_KEY="yourapikey" --env PINECONE_TEXT_KEY="text|content" --env PINECONE_INDEX="index_name" --env TILELLM_ROLE="train|qa" --env REDIS_URL="redis://redis:6379/0" --name tilellm --link test-redis:redis tilellm
```
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "tilellm"
version = "0.1.9"
version = "0.1.10"
description = "tiledesk for RAG"
authors = ["Gianluca Lorenzo <[email protected]>"]
repository = "https://github.com/Tiledesk/tiledesk-llm"
Expand Down
204 changes: 104 additions & 100 deletions tilellm/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
# os.environ.__setitem__("ENVIRON", environment)

redis_url = os.environ.get("REDIS_URL")
tilellm_role = os.environ.get("TILELLM_ROLE")


async def get_redis_client():
Expand All @@ -78,108 +79,111 @@ async def reader(channel: aioredis.client.Redis):
:param channel:
:return:
"""

from tilellm.shared import const
logger.debug(f"My role is {tilellm_role}")
webhook = ""
token = ""
item = {}
while True:
try:
messages = await channel.xreadgroup(
groupname=const.STREAM_CONSUMER_GROUP,
consumername=const.STREAM_CONSUMER_NAME,
streams={const.STREAM_NAME: '>'},
count=1,
block=0 # Set block to 0 for non-blocking
)

for stream, message_data in messages:
for message in message_data:

message_id, message_values = message
import ast

byte_str = message_values[b"single"]
dict_str = byte_str.decode("UTF-8")
logger.info(dict_str)
item = ast.literal_eval(dict_str)
item_single = ItemSingle(**item)
scrape_status_response = ScrapeStatusResponse(status_message="Indexing started",
status_code=2
)
add_to_queue = await channel.set(f"{item.get('namespace')}/{item.get('id')}",
scrape_status_response.model_dump_json(),
ex=expiration_in_seconds)

logger.debug(f"Start {add_to_queue}")
raw_webhook = item.get('webhook', "")
if '?' in raw_webhook:
webhook, raw_token = raw_webhook.split('?')

if raw_token.startswith('token='):
_, token = raw_token.split('=')
else:
webhook = raw_webhook

logger.info(f"webhook: {webhook}, token: {token}")

pc_result = await add_pc_item(item_single)
# import datetime
# current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S,%f")

# pc_result["date"]= current_time
# pc_result["status"] = current_time

# A POST request to the API

scrape_status_response = ScrapeStatusResponse(status_message="Indexing finish",
status_code=3
)
add_to_queue = await channel.set(f"{item.get('namespace')}/{item.get('id')}",
scrape_status_response.model_dump_json(),
ex=expiration_in_seconds)

logger.debug(f"End {add_to_queue}")
if webhook:
try:
async with aiohttp.ClientSession() as session:
res = await session.post(webhook,
json=pc_result.model_dump(exclude_none=True),
headers={"Content-Type": "application/json",
"X-Auth-Token": token})
logger.info(res)
logger.info(f"===========> {await res.json()}")
except Exception as ewh:
logger.error(ewh)
pass

await channel.xack(
const.STREAM_NAME,
const.STREAM_CONSUMER_GROUP,
message_id)

except Exception as e:
scrape_status_response = ScrapeStatusResponse(status_message="Error",
status_code=4
)
add_to_queue = await channel.set(f"{item.get('namespace')}/{item.get('id')}",
scrape_status_response.model_dump_json(),
ex=expiration_in_seconds)

logger.error(f"Error {add_to_queue}")
import traceback
if webhook:
res = PineconeIndexingResult(status=400, error=repr(e))
async with aiohttp.ClientSession() as session:
response = await session.post(webhook, json=res.model_dump(exclude_none=True),
headers={"Content-Type": "application/json", "X-Auth-Token": token})
logger.error(response)
logger.error(f"{await response.json()}")
logger.error(f"Error {e}, webhook: {webhook}")
traceback.print_exc()
logger.error(e)
pass

if tilellm_role == "train":
while True:
try:
messages = await channel.xreadgroup(
groupname=const.STREAM_CONSUMER_GROUP,
consumername=const.STREAM_CONSUMER_NAME,
streams={const.STREAM_NAME: '>'},
count=1,
block=0 # Set block to 0 for non-blocking
)

for stream, message_data in messages:
for message in message_data:
logger.debug(f"My role is {tilellm_role} consume message")
message_id, message_values = message
import ast

byte_str = message_values[b"single"]
dict_str = byte_str.decode("UTF-8")
logger.info(dict_str)
item = ast.literal_eval(dict_str)
item_single = ItemSingle(**item)
scrape_status_response = ScrapeStatusResponse(status_message="Indexing started",
status_code=2
)
add_to_queue = await channel.set(f"{item.get('namespace')}/{item.get('id')}",
scrape_status_response.model_dump_json(),
ex=expiration_in_seconds)

logger.debug(f"Start {add_to_queue}")
raw_webhook = item.get('webhook', "")
if '?' in raw_webhook:
webhook, raw_token = raw_webhook.split('?')

if raw_token.startswith('token='):
_, token = raw_token.split('=')
else:
webhook = raw_webhook

logger.info(f"webhook: {webhook}, token: {token}")

pc_result = await add_pc_item(item_single)
# import datetime
# current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S,%f")

# pc_result["date"]= current_time
# pc_result["status"] = current_time

# A POST request to the API

scrape_status_response = ScrapeStatusResponse(status_message="Indexing finish",
status_code=3
)
add_to_queue = await channel.set(f"{item.get('namespace')}/{item.get('id')}",
scrape_status_response.model_dump_json(),
ex=expiration_in_seconds)

logger.debug(f"End {add_to_queue}")
if webhook:
try:
async with aiohttp.ClientSession() as session:
res = await session.post(webhook,
json=pc_result.model_dump(exclude_none=True),
headers={"Content-Type": "application/json",
"X-Auth-Token": token})
logger.info(res)
logger.info(f"===========> {await res.json()}")
except Exception as ewh:
logger.error(ewh)
pass

await channel.xack(
const.STREAM_NAME,
const.STREAM_CONSUMER_GROUP,
message_id)

except Exception as e:
scrape_status_response = ScrapeStatusResponse(status_message="Error",
status_code=4
)
add_to_queue = await channel.set(f"{item.get('namespace')}/{item.get('id')}",
scrape_status_response.model_dump_json(),
ex=expiration_in_seconds)

logger.error(f"Error {add_to_queue}")
import traceback
if webhook:
res = PineconeIndexingResult(status=400, error=repr(e))
async with aiohttp.ClientSession() as session:
response = await session.post(webhook, json=res.model_dump(exclude_none=True),
headers={"Content-Type": "application/json", "X-Auth-Token": token})
logger.error(response)
logger.error(f"{await response.json()}")
logger.error(f"Error {e}, webhook: {webhook}")
traceback.print_exc()
logger.error(e)
pass
else:
logger.debug(f"My role is {tilellm_role}")

@asynccontextmanager
async def redis_consumer(app: FastAPI):
Expand All @@ -191,8 +195,8 @@ async def redis_consumer(app: FastAPI):

await redis_client.close()

populate_constant()

populate_constant()
app = FastAPI(lifespan=redis_consumer)


Expand Down Expand Up @@ -384,7 +388,7 @@ async def get_root_endpoint():


def main():
print(f"Ambiente: {environment}")
logger.debug(f"Environment: {environment}")
import uvicorn
uvicorn.run("tilellm.__main__:app", host="0.0.0.0", port=8000, reload=True, log_level="info")#, log_config=args.log_path

Expand Down
8 changes: 5 additions & 3 deletions tilellm/shared/const.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import os

STREAM_NAME="stream:single"
STREAM_CONSUMER_NAME="llmconsumer"
STREAM_CONSUMER_GROUP="llmconsumergroup"
STREAM_NAME = "stream:single"
STREAM_CONSUMER_NAME = "llmconsumer"
STREAM_CONSUMER_GROUP = "llmconsumergroup"

PINECONE_API_KEY = None
PINECONE_INDEX = None
PINECONE_TEXT_KEY = None


def populate_constant():
global PINECONE_API_KEY, PINECONE_INDEX, PINECONE_TEXT_KEY
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY")
Expand All @@ -16,3 +17,4 @@ def populate_constant():




12 changes: 12 additions & 0 deletions tilellm/store/pinecone_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,20 @@ async def add_pc_item(item):
document.metadata["type"] = type_source
document.metadata["embedding"] = embedding

for key, value in document.metadata.items():
if isinstance(value, list) and all(item is None for item in value):
document.metadata[key] = [""]
elif value is None:
document.metadata[key] = ""

chunks.extend(chunk_data(data=[document]))



# from pprint import pprint
# pprint(documents)
logger.debug(documents)

a = vector_store.from_documents(chunks,
embedding=oai_embeddings,
index_name=const.PINECONE_INDEX,
Expand Down

0 comments on commit 64adb4f

Please sign in to comment.