MultimodalQnA image query, pdf, and dynamic ports #1134
base: main
@@ -34,7 +34,7 @@
     service_type=ServiceType.ASR,
     endpoint="/v1/audio/transcriptions",
     host="0.0.0.0",
-    port=9099,
+    port=int(os.getenv("ASR_PORT", 9099)),
     input_datatype=Base64ByteStrDoc,

Review comment: Same question as above.

     output_datatype=LLMParamsDoc,
 )
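For reference, a minimal sketch of how the os.getenv port override behaves (the ASR_PORT value of 7066 is purely an illustrative assumption, not taken from this PR):

import os

# With ASR_PORT unset, the default 9099 is used.
os.environ.pop("ASR_PORT", None)
assert int(os.getenv("ASR_PORT", 9099)) == 9099

# With ASR_PORT exported (e.g. by a compose file or the shell), the override wins.
os.environ["ASR_PORT"] = "7066"
assert int(os.getenv("ASR_PORT", 9099)) == 7066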
@@ -1,13 +1,16 @@
 # Copyright (C) 2024 Intel Corporation
 # SPDX-License-Identifier: Apache-2.0

+import base64
+import json
 import os
 import shutil
 import time
 import uuid
 from pathlib import Path
 from typing import Any, Dict, Iterable, List, Optional, Type, Union

+import pymupdf
 from config import EMBED_MODEL, INDEX_NAME, INDEX_SCHEMA, LVM_ENDPOINT, REDIS_URL, WHISPER_MODEL
 from fastapi import File, HTTPException, UploadFile
 from langchain_community.utilities.redis import _array_to_buffer
@@ -301,7 +304,53 @@ def prepare_data_and_metadata_from_annotation(
     return text_list, image_list, metadatas


-def ingest_multimodal(videoname, data_folder, embeddings):
+def prepare_pdf_data_from_annotation(annotation, path_to_frames, title):
+    """PDF data processing has some key differences from videos and images.
+
+    1. Neighboring frames' transcripts are not currently considered relevant.
+       We are only taking the text located on the same page as the image.
+    2. The images/frames are indexed differently, by page and image-within-page
+       indices, as opposed to a single frame index.
+    3. Instead of time of frame in ms, we return the PDF page index through
+       the pre-existing time_of_frame_ms metadata key to maintain compatibility.
+    """
+    text_list = []
+    image_list = []
+    metadatas = []
+    for frame in annotation:
+        page_index = frame["frame_no"]
+        image_index = frame["sub_video_id"]
+        path_to_frame = os.path.join(path_to_frames, f"page{page_index}_image{image_index}.png")

Review comment: @mhbuehler Anyway, this is a new function. Why do we need to preserve the name of local variables relating to frames/videos (e.g., path_to_frame, video_id)? This might cause confusion.

+        caption_for_ingesting = frame["caption"]
+        caption_for_inference = frame["caption"]
+
+        video_id = frame["video_id"]
+        b64_img_str = frame["b64_img_str"]
+        embedding_type = "pair" if b64_img_str else "text"
+        source_video = frame["video_name"]
+
+        text_list.append(caption_for_ingesting)
+
+        if b64_img_str:
+            image_list.append(path_to_frame)
+
+        metadatas.append(
+            {
+                "content": caption_for_ingesting,
+                "b64_img_str": b64_img_str,
+                "video_id": video_id,
+                "source_video": source_video,
+                "time_of_frame_ms": page_index,  # For PDFs save the page number
+                "embedding_type": embedding_type,
+                "title": title,
+                "transcript_for_inference": caption_for_inference,
+            }
+        )
+
+    return text_list, image_list, metadatas
+
+
+def ingest_multimodal(filename, data_folder, embeddings, is_pdf=False):
     """Ingest text image pairs to Redis from the data/ directory that consists of frames and annotations."""
     data_folder = os.path.abspath(data_folder)
     annotation_file_path = os.path.join(data_folder, "annotations.json")
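To make the reuse of the video metadata keys concrete, here is a small hedged sketch of what one PDF annotation entry looks like and what prepare_pdf_data_from_annotation turns it into; the literal values below are illustrative assumptions, not taken from the PR:

annotation = [
    {
        "video_id": "9f2c1a",               # hypothetical file id
        "video_name": "report_9f2c1a.pdf",  # hypothetical saved file name
        "b64_img_str": "iVBORw0KGgo...",    # truncated base64 PNG
        "caption": "Text extracted from page 2 of the PDF.",
        "time": 0.0,
        "frame_no": 2,                      # PDF page index
        "sub_video_id": 1,                  # image index within the page
    }
]

text_list, image_list, metadatas = prepare_pdf_data_from_annotation(annotation, "frames", "report")
# metadatas[0]["time_of_frame_ms"] == 2 -> the page index is stored under the video-era key
# image_list == ["frames/page2_image1.png"]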
@@ -310,10 +359,15 @@ def ingest_multimodal(videoname, data_folder, embeddings):
     annotation = load_json_file(annotation_file_path)

     # prepare data to ingest
-    text_list, image_list, metadatas = prepare_data_and_metadata_from_annotation(annotation, path_to_frames, videoname)
+    if is_pdf:
+        text_list, image_list, metadatas = prepare_pdf_data_from_annotation(annotation, path_to_frames, filename)
+    else:
+        text_list, image_list, metadatas = prepare_data_and_metadata_from_annotation(
+            annotation, path_to_frames, filename
+        )

     MultimodalRedis.from_text_image_pairs_return_keys(
-        texts=[f"From {videoname}. " + text for text in text_list],
+        texts=[f"From {filename}. " + text for text in text_list],
         images=image_list,
         embedding=embeddings,
         metadatas=metadatas,
@@ -335,7 +389,10 @@ def drop_index(index_name, redis_url=REDIS_URL):


 @register_microservice(
-    name="opea_service@prepare_videodoc_redis", endpoint="/v1/generate_transcripts", host="0.0.0.0", port=6007
+    name="opea_service@prepare_videodoc_redis",
+    endpoint="/v1/generate_transcripts",
+    host="0.0.0.0",
+    port=int(os.getenv("DATAPREP_MMR_PORT", 6007)),
 )
 async def ingest_generate_transcripts(files: List[UploadFile] = File(None)):
     """Upload videos or audio files with speech, generate transcripts using whisper and ingest into redis."""
@@ -444,7 +501,10 @@ async def ingest_generate_transcripts(files: List[UploadFile] = File(None)):


 @register_microservice(
-    name="opea_service@prepare_videodoc_redis", endpoint="/v1/generate_captions", host="0.0.0.0", port=6007
+    name="opea_service@prepare_videodoc_redis",
+    endpoint="/v1/generate_captions",
+    host="0.0.0.0",
+    port=int(os.getenv("DATAPREP_MMR_PORT", 6007)),
 )
 async def ingest_generate_caption(files: List[UploadFile] = File(None)):
     """Upload images and videos without speech (only background music or no audio), generate captions using lvm microservice and ingest into redis."""
@@ -506,11 +566,11 @@ async def ingest_generate_caption(files: List[UploadFile] = File(None)):
     name="opea_service@prepare_videodoc_redis",
     endpoint="/v1/ingest_with_text",
     host="0.0.0.0",
-    port=6007,
+    port=int(os.getenv("DATAPREP_MMR_PORT", 6007)),
 )
 async def ingest_with_text(files: List[UploadFile] = File(None)):
     if files:
-        accepted_media_formats = [".mp4", ".png", ".jpg", ".jpeg", ".gif"]
+        accepted_media_formats = [".mp4", ".png", ".jpg", ".jpeg", ".gif", ".pdf"]
         # Create a lookup dictionary containing all media files
         matched_files = {f.filename: [f] for f in files if os.path.splitext(f.filename)[1] in accepted_media_formats}
         uploaded_files_map = {}
@@ -537,25 +597,25 @@ async def ingest_with_text(files: List[UploadFile] = File(None)):
             elif file_extension not in accepted_media_formats:
                 print(f"Skipping file {file.filename} because of unsupported format.")

-        # Check if every media file has a caption file
-        for media_file_name, file_pair in matched_files.items():
-            if len(file_pair) != 2:
+        # Check that every media file that is not a pdf has a caption file
+        for media_file_name, file_list in matched_files.items():
+            if len(file_list) != 2 and os.path.splitext(media_file_name)[1] != ".pdf":
                 raise HTTPException(status_code=400, detail=f"No caption file found for {media_file_name}")

         if len(matched_files.keys()) == 0:
             return HTTPException(
                 status_code=400,
-                detail="The uploaded files have unsupported formats. Please upload at least one video file (.mp4) with captions (.vtt) or one image (.png, .jpg, .jpeg, or .gif) with caption (.txt)",
+                detail="The uploaded files have unsupported formats. Please upload at least one video file (.mp4) with captions (.vtt) or one image (.png, .jpg, .jpeg, or .gif) with caption (.txt) or one .pdf file",
             )

         for media_file in matched_files:
             print(f"Processing file {media_file}")
+            file_name, file_extension = os.path.splitext(media_file)

             # Assign unique identifier to file
             file_id = generate_id()

             # Create file name by appending identifier
-            file_name, file_extension = os.path.splitext(media_file)
             media_file_name = f"{file_name}_{file_id}{file_extension}"
             media_dir_name = os.path.splitext(media_file_name)[0]

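As a quick hedged illustration of the new pairing rule above (the helper and file names are hypothetical; only the length/extension check mirrors the diff): every non-PDF media file must arrive with exactly one caption file, while a PDF is accepted on its own.

import os

def missing_required_caption(media_file_name, file_list):
    # Mirrors the check above: complain only for non-PDFs that did not come in a (media, caption) pair.
    return len(file_list) != 2 and os.path.splitext(media_file_name)[1] != ".pdf"

assert missing_required_caption("clip.mp4", ["clip.mp4"])                  # video without its .vtt/.txt -> error
assert not missing_required_caption("clip.mp4", ["clip.mp4", "clip.vtt"])  # video + caption -> ok
assert not missing_required_caption("slides.pdf", ["slides.pdf"])          # pdf alone -> ok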
@@ -564,25 +624,72 @@ async def ingest_with_text(files: List[UploadFile] = File(None)):
                 shutil.copyfileobj(matched_files[media_file][0].file, f)
             uploaded_files_map[file_name] = media_file_name

-            # Save caption file in upload directory
-            caption_file_extension = os.path.splitext(matched_files[media_file][1].filename)[1]
-            caption_file = f"{media_dir_name}{caption_file_extension}"
-            with open(os.path.join(upload_folder, caption_file), "wb") as f:
-                shutil.copyfileobj(matched_files[media_file][1].file, f)
-
-            # Store frames and caption annotations in a new directory
-            extract_frames_and_annotations_from_transcripts(
-                file_id,
-                os.path.join(upload_folder, media_file_name),
-                os.path.join(upload_folder, caption_file),
-                os.path.join(upload_folder, media_dir_name),
-            )
-
-            # Delete temporary caption file
-            os.remove(os.path.join(upload_folder, caption_file))
-
-            # Ingest multimodal data into redis
-            ingest_multimodal(file_name, os.path.join(upload_folder, media_dir_name), embeddings)
+            if file_extension == ".pdf":
+                # Set up location to store pdf images and text, reusing "frames" and "annotations" from video
+                output_dir = os.path.join(upload_folder, media_dir_name)
+                os.makedirs(output_dir, exist_ok=True)
+                os.makedirs(os.path.join(output_dir, "frames"), exist_ok=True)
+                doc = pymupdf.open(os.path.join(upload_folder, media_file_name))
+                annotations = []
+                for page_idx, page in enumerate(doc, start=1):
+                    text = page.get_text()
+                    images = page.get_images()
+                    for image_idx, image in enumerate(images, start=1):
+                        # Write image and caption file for each image found in pdf
+                        img_fname = f"page{page_idx}_image{image_idx}"
+                        img_fpath = os.path.join(output_dir, "frames", img_fname + ".png")
+                        pix = pymupdf.Pixmap(doc, image[0])  # create pixmap
+
+                        if pix.n - pix.alpha > 3:  # if CMYK, convert to RGB first
+                            pix = pymupdf.Pixmap(pymupdf.csRGB, pix)
+
+                        pix.save(img_fpath)  # pixmap to png
+                        pix = None
+
+                        # Convert image to base64 encoded string
+                        with open(img_fpath, "rb") as image2str:
+                            encoded_string = base64.b64encode(image2str.read())  # png to bytes
+
+                        decoded_string = encoded_string.decode()  # bytes to string
+
+                        # Create annotations file, reusing metadata keys from video
+                        annotations.append(
+                            {
+                                "video_id": file_id,
+                                "video_name": os.path.basename(os.path.join(upload_folder, media_file_name)),
+                                "b64_img_str": decoded_string,
+                                "caption": text,
+                                "time": 0.0,
+                                "frame_no": page_idx,
+                                "sub_video_id": image_idx,
+                            }
+                        )
+
+                with open(os.path.join(output_dir, "annotations.json"), "w") as f:
+                    json.dump(annotations, f)
+
+                # Ingest multimodal data into redis
+                ingest_multimodal(file_name, os.path.join(upload_folder, media_dir_name), embeddings, is_pdf=True)
+            else:
+                # Save caption file in upload directory
+                caption_file_extension = os.path.splitext(matched_files[media_file][1].filename)[1]
+                caption_file = f"{media_dir_name}{caption_file_extension}"
+                with open(os.path.join(upload_folder, caption_file), "wb") as f:
+                    shutil.copyfileobj(matched_files[media_file][1].file, f)
+
+                # Store frames and caption annotations in a new directory
+                extract_frames_and_annotations_from_transcripts(
+                    file_id,
+                    os.path.join(upload_folder, media_file_name),
+                    os.path.join(upload_folder, caption_file),
+                    os.path.join(upload_folder, media_dir_name),
+                )
+
+                # Delete temporary caption file
+                os.remove(os.path.join(upload_folder, caption_file))
+
+                # Ingest multimodal data into redis
+                ingest_multimodal(file_name, os.path.join(upload_folder, media_dir_name), embeddings)

             # Delete temporary media directory containing frames and annotations
             shutil.rmtree(os.path.join(upload_folder, media_dir_name))
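For anyone who wants to poke at the PDF extraction in isolation, here is a hedged standalone sketch of the same pymupdf flow; the sample.pdf path is an assumption, and unlike the service code it keeps the PNG bytes in memory instead of writing frame files:

import base64
import pymupdf  # PyMuPDF

doc = pymupdf.open("sample.pdf")  # assumed local test file
for page_idx, page in enumerate(doc, start=1):
    page_text = page.get_text()
    for image_idx, image in enumerate(page.get_images(), start=1):
        pix = pymupdf.Pixmap(doc, image[0])  # image[0] is the xref of the embedded image
        if pix.n - pix.alpha > 3:  # CMYK and similar -> convert to RGB before encoding as PNG
            pix = pymupdf.Pixmap(pymupdf.csRGB, pix)
        b64_img_str = base64.b64encode(pix.tobytes("png")).decode()
        print(f"page {page_idx}, image {image_idx}: {len(page_text)} chars of text, {len(b64_img_str)} b64 chars")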
@@ -602,7 +709,10 @@ async def ingest_with_text(files: List[UploadFile] = File(None)):


 @register_microservice(
-    name="opea_service@prepare_videodoc_redis", endpoint="/v1/dataprep/get_files", host="0.0.0.0", port=6007
+    name="opea_service@prepare_videodoc_redis",
+    endpoint="/v1/dataprep/get_files",
+    host="0.0.0.0",
+    port=int(os.getenv("DATAPREP_MMR_PORT", 6007)),
 )
 async def rag_get_file_structure():
     """Returns list of names of uploaded videos saved on the server."""
@@ -616,7 +726,10 @@ async def rag_get_file_structure():


 @register_microservice(
-    name="opea_service@prepare_videodoc_redis", endpoint="/v1/dataprep/delete_files", host="0.0.0.0", port=6007
+    name="opea_service@prepare_videodoc_redis",
+    endpoint="/v1/dataprep/delete_files",
+    host="0.0.0.0",
+    port=int(os.getenv("DATAPREP_MMR_PORT", 6007)),
 )
 async def delete_files():
     """Delete all uploaded files along with redis index."""
Review comment: The internal container server port can be forwarded to any arbitrary host port. Why does this also need to be configurable?