Skip to content

Commit

Permalink
added /api/scrape/single and /api/scrape/enqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
glorenzo972 committed May 18, 2024
1 parent 9354343 commit 918db42
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 12 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@
*Andrea Sponziello*
### **Copyrigth**: *Tiledesk SRL*

## [2024-05-18]

### 0.1.16
- added: /api/scrape/single without redis queue
- added: /api/scrape/enqueue to enqueue item into redis queue

## [2024-05-14]

### 0.1.15
Expand All @@ -15,7 +21,7 @@

### 0.1.14
- added parameter to entrypoint.sh
-

## [2024-05-06]

### 0.1.13
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "tilellm"
version = "0.1.15"
version = "0.1.16"
description = "tiledesk for RAG"
authors = ["Gianluca Lorenzo <[email protected]>"]
repository = "https://github.com/Tiledesk/tiledesk-llm"
Expand Down
115 changes: 106 additions & 9 deletions tilellm/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ async def reader(channel: aioredis.client.Redis):
)

for stream, message_data in messages:
for message in message_data:
for message_id, message_values in message_data:
logger.debug(f"My role is {tilellm_role} consume message")
message_id, message_values = message
# message_id, message_values= message
import ast

byte_str = message_values[b"single"]
Expand Down Expand Up @@ -136,8 +136,7 @@ async def reader(channel: aioredis.client.Redis):
json=res.model_dump(exclude_none=True),
headers={"Content-Type": "application/json",
"X-Auth-Token": token})
logger.info(res)
logger.info(f"200 ==>{await res.json()}")
logger.info(f"200 {await res.json()}")
except Exception as ewh:
logger.error(ewh)
pass
Expand Down Expand Up @@ -166,8 +165,7 @@ async def reader(channel: aioredis.client.Redis):
json=pc_result.model_dump(exclude_none=True),
headers={"Content-Type": "application/json",
"X-Auth-Token": token})
logger.info(res)
logger.info(f"===========> {await res.json()}")
logger.info(f"300 {await res.json()}")
except Exception as ewh:
logger.error(ewh)
pass
Expand All @@ -176,6 +174,7 @@ async def reader(channel: aioredis.client.Redis):
const.STREAM_NAME,
const.STREAM_CONSUMER_GROUP,
message_id)
logger.info(f"xack to message_id: {message_id}")

except Exception as e:
scrape_status_response = ScrapeStatusResponse(status_message="Error",
Expand Down Expand Up @@ -216,10 +215,10 @@ async def redis_consumer(app: FastAPI):
app = FastAPI(lifespan=redis_consumer)


@app.post("/api/scrape/single")
async def create_scrape_item_main(item: ItemSingle, redis_client: aioredis.client.Redis = Depends(get_redis_client)):
@app.post("/api/scrape/enqueue")
async def enqueue_scrape_item_main(item: ItemSingle, redis_client: aioredis.client.Redis = Depends(get_redis_client)):
"""
Add items to namespace
enqueue item to redis. Consumer read message and add it to namespace
:param item:
:param redis_client:
:return:
Expand All @@ -238,6 +237,104 @@ async def create_scrape_item_main(item: ItemSingle, redis_client: aioredis.clien

return {"message": f"Item {item.id} created successfully, more {res}"}

@app.post("/api/scrape/single")
async def create_scrape_item_single(item: ItemSingle, redis_client: aioredis.client.Redis = Depends(get_redis_client)):
"""
Add item to namespace
:param item:
:param redis_client:
:return:
"""
webhook = ""
token = ""

try:
logger.debug(item)
scrape_status_response = ScrapeStatusResponse(status_message="Indexing started",
status_code=2
)
add_to_queue = await redis_client.set(f"{item.namespace}/{item.id}",
scrape_status_response.model_dump_json(),
ex=expiration_in_seconds)

logger.debug(f"Start {add_to_queue}")

raw_webhook = item.webhook
if '?' in raw_webhook:
webhook, raw_token = raw_webhook.split('?')

if raw_token.startswith('token='):
_, token = raw_token.split('=')
else:
webhook = raw_webhook

logger.info(f"webhook: {webhook}, token: {token}")

if webhook:
res = PineconeIndexingResult(id=item.id, status=200)
try:
async with aiohttp.ClientSession() as session:
res = await session.post(webhook,
json=res.model_dump(exclude_none=True),
headers={"Content-Type": "application/json",
"X-Auth-Token": token})
logger.info(f"200 {await res.json()}")
except Exception as ewh:
logger.error(ewh)
pass

pc_result = await add_pc_item(item)
# import datetime
# current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S,%f")

# pc_result["date"]= current_time
# pc_result["status"] = current_time

# A POST request to the API

scrape_status_response = ScrapeStatusResponse(status_message="Indexing finish",
status_code=3
)
add_to_queue = await redis_client.set(f"{item.namespace}/{item.id}",
scrape_status_response.model_dump_json(),
ex=expiration_in_seconds)

logger.debug(f"End {add_to_queue}")
if webhook:
try:
async with aiohttp.ClientSession() as session:
res = await session.post(webhook,
json=pc_result.model_dump(exclude_none=True),
headers={"Content-Type": "application/json",
"X-Auth-Token": token})
logger.info(f"300 {await res.json()}")
except Exception as ewh:
logger.error(ewh)
pass

return JSONResponse(content={"message": f"Item {item.id} created successfully"})

except Exception as e:
scrape_status_response = ScrapeStatusResponse(status_message="Error",
status_code=4
)
add_to_queue = await redis_client.set(f"{item.namespace}/{item.id}",
scrape_status_response.model_dump_json(),
ex=expiration_in_seconds)

logger.error(f"Error {add_to_queue}")
import traceback
if webhook:
res = PineconeIndexingResult(id=item.id, status=400, error=repr(e))
async with aiohttp.ClientSession() as session:
response = await session.post(webhook, json=res.model_dump(exclude_none=True),
headers={"Content-Type": "application/json", "X-Auth-Token": token})
logger.error(response)
logger.error(f"{await response.json()}")
logger.error(f"Error {e}, webhook: {webhook}")
traceback.print_exc()
logger.error(e)
raise HTTPException(status_code=400, detail=repr(e))

@app.post("/api/qa")
async def post_ask_with_memory_main(question_answer: QuestionAnswer):
Expand Down
4 changes: 3 additions & 1 deletion tilellm/store/pinecone_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,9 +297,11 @@ async def get_pc_all_obj_namespace(namespace: str):

logger.debug(f"pinecone total vector in {namespace}: {total_vectors}")

batch_size = min([total_vectors, 1000])

pc_res = index.query(
vector=[0] * 1536, # [0,0,0,0......0]
top_k=total_vectors,
top_k=batch_size,
# filter={"id": {"$eq": id}},
namespace=namespace,
include_values=False,
Expand Down

0 comments on commit 918db42

Please sign in to comment.