feat(construct): summarization construct (#24)
* Added summary construct and lambda functions for summary stepfunction

* updated summary construct lambda functions

* fixed summary lambda issues

* summary construct fixes

* summary construct fixes

* removed unused nltk with bedrock client

* chore: self mutation

Signed-off-by: github-actions <[email protected]>

* added review comments

---------

Signed-off-by: github-actions <[email protected]>
Co-authored-by: Dinesh Sajwan <[email protected]>
Co-authored-by: github-actions <[email protected]>
3 people authored Oct 12, 2023
1 parent 5460b4e commit 95a7de8
Showing 30 changed files with 1,736 additions and 464 deletions.
12 changes: 0 additions & 12 deletions .projen/deps.json


4 changes: 2 additions & 2 deletions .projen/tasks.json


2 changes: 1 addition & 1 deletion .projenrc.ts
@@ -36,7 +36,7 @@ const project = new awscdk.AwsCdkConstructLibrary({
packageName: '@' + GITHUB_USER + '/' + PROJECT_NAME, /* The "name" in package.json. */
keywords: ['constructs', 'aws-cdk', 'generative-ai', 'emerging-tech'],
devDeps: ['eslint-plugin-header'],
bundledDeps: ['deepmerge', '@types/deep-diff', '@types/npmlog'],
//bundledDeps: ['deepmerge', '@types/deep-diff', '@types/npmlog'],

// Keep synchronized with https://github.com/nodejs/release#release-schedule
minNodeVersion: '18.12.0', // 'MAINTENANCE' (first LTS)
20 changes: 20 additions & 0 deletions lambda/aws-summarization-appsync-stepfn/document_reader/Dockerfile
@@ -0,0 +1,20 @@
FROM amazon/aws-lambda-python:latest

# Installs build tools and Python headers, then removes the yum cache to keep the image small
# (note: on Amazon Linux the dev-headers package is python3-devel, not python3-dev)
RUN yum update -y && \
    yum install -y python3 python3-devel python3-pip gcc git && \
    rm -Rf /var/cache/yum

# Copies requirements.txt file into the container
COPY requirements.txt ./

# Installs dependencies found in your requirements.txt file
RUN pip install -r requirements.txt

# Be sure to copy over the function itself!
# Goes last to take advantage of Docker caching.
# COPY lambda.py ./
COPY . .

# Points to the handler function of your lambda function
CMD ["lambda.handler"]
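The AWS Lambda Python base image bundles the runtime interface emulator, so the container can be smoke-tested locally. A minimal sketch, assuming the image is running with port 9000 mapped to the emulator's port 8080; the event values are illustrative, and the field names match what the document_reader handler later in this diff reads:

import requests

# Hypothetical local test: POST an event to the runtime interface emulator.
# The port mapping (9000 -> 8080) and the event values are assumptions.
event = {"name": "report.pdf", "jobid": "job-123"}
resp = requests.post(
    "http://localhost:9000/2015-03-31/functions/function/invocations",
    json=event,
)
print(resp.json())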
59 changes: 59 additions & 0 deletions lambda/aws-summarization-appsync-stepfn/document_reader/helper.py
@@ -0,0 +1,59 @@
import boto3

from aws_lambda_powertools import Logger, Tracer
from s3inmemoryloader import S3FileLoaderInMemory

logger = Logger(service="SUMMARY_DOCUMENT_READER")
tracer = Tracer(service="SUMMARY_DOCUMENT_READER")

s3 = boto3.resource('s3')

@tracer.capture_method
def read_file_from_s3(bucket, key):
    logger.info(f"Fetching file from S3: bucket: {bucket}, key: {key}")
    try:
        obj = s3.Object(bucket, key)
        return obj.get()["Body"].read().decode("utf-8")
    except Exception:
        logger.exception(f"An error occurred while attempting to read key {key}")
        return None

@tracer.capture_method
def check_file_exists(bucket, key):
    s3_client = boto3.client('s3')
    try:
        s3_client.head_object(Bucket=bucket, Key=key)
        return True
    except s3_client.exceptions.ClientError as exp:
        if exp.response['Error']['Code'] == '404':
            # An absent object is an expected outcome here, not an error.
            logger.info("Object doesn't exist")
        else:
            logger.exception('An error occurred')
        return False


@tracer.capture_method
def get_file_transformation(transformed_asset_bucket, transformed_file_name,
                            input_asset_bucket, original_file_name):
    response = {
        'status': 'File transformation Pending',
        'name': original_file_name,
    }
    if not check_file_exists(transformed_asset_bucket, transformed_file_name):
        loader = S3FileLoaderInMemory(input_asset_bucket, original_file_name)
        document_content = loader.load()
        if not document_content:
            response['status'] = 'Error'
            response['name'] = ''
            return response
        encoded_string = document_content.encode("utf-8")
        s3.Bucket(transformed_asset_bucket).put_object(Key=transformed_file_name, Body=encoded_string)
        response['status'] = 'File transformed'
        response['name'] = transformed_file_name
    else:
        response['status'] = 'File already exists'

    return response
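A minimal usage sketch of get_file_transformation; the bucket and file names are illustrative placeholders, not resources from this repository:

# Illustrative call; bucket and key names are placeholders.
result = get_file_transformation(
    transformed_asset_bucket="transformed-assets-bucket",
    transformed_file_name="report_transformed.txt",
    input_asset_bucket="input-assets-bucket",
    original_file_name="report.pdf",
)
# result["status"] ends up as "File transformed", "File already exists", or "Error".
print(result)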
117 changes: 117 additions & 0 deletions lambda/aws-summarization-appsync-stepfn/document_reader/lambda.py
@@ -0,0 +1,117 @@
import os
from helper import check_file_exists, get_file_transformation
import redis

from update_summary_status import updateSummaryJobStatus
from aws_lambda_powertools import Logger, Tracer, Metrics
from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools.metrics import MetricUnit

logger = Logger(service="SUMMARY_DOCUMENT_READER")
tracer = Tracer(service="SUMMARY_DOCUMENT_READER")
metrics = Metrics(namespace="summary_pipeline", service="SUMMARY_DOCUMENT_READER")

transformed_bucket_name = os.environ["TRANSFORMED_ASSET_BUCKET"]
input_bucket_name = os.environ["INPUT_ASSET_BUCKET"]
# Environment variables arrive as strings, so compare explicitly instead of
# relying on truthiness (the string "false" would otherwise be truthy).
is_file_transformation_required = os.environ["IS_FILE_TRANSFORMED"].lower() == "true"


@logger.inject_lambda_context(log_event=True)
@tracer.capture_lambda_handler
@metrics.log_metrics(capture_cold_start_metric=True)
def handler(event, context: LambdaContext):

    logger.info(f"{event=}")

    original_file_name = event["name"]
    job_id = event["jobid"]

    response = {
        "is_summary_available": False,
        "summary_job_id": job_id,
        "file_name": original_file_name,
        "status": "Pending",
        "summary": "",
        "transformed_file_name": '',
    }

    logger.set_correlation_id(job_id)
    metrics.add_metadata(key='correlationId', value=job_id)
    tracer.put_annotation(key="correlationId", value=job_id)

    filesummary = get_summary_from_cache(original_file_name)

    if filesummary is not None:
        metrics.add_metric(name="summary_cache_hit", unit=MetricUnit.Count, value=1)
        response.update(
            {
                "file_name": original_file_name,
                "status": "Completed",
                "summary": filesummary,
                "is_summary_available": True,
            }
        )
    else:
        metrics.add_metric(name="summary_llm_hit", unit=MetricUnit.Count, value=1)
        transformed_file_name = original_file_name.replace(".pdf", "_transformed.txt")

        if is_file_transformation_required:
            transformed_file = get_file_transformation(transformed_bucket_name,
                                                       transformed_file_name,
                                                       input_bucket_name,
                                                       original_file_name)
            response.update(
                {
                    "file_name": original_file_name,
                    "status": transformed_file['status'],
                    "summary": '',
                    "transformed_file_name": transformed_file_name,
                    "is_summary_available": False,
                }
            )
        else:
            pdf_transformed_file = check_file_exists(transformed_bucket_name,
                                                     transformed_file_name)
            if not pdf_transformed_file:
                response.update(
                    {
                        "file_name": original_file_name,
                        "status": "Error",
                        "summary": f"No file {transformed_file_name} available to generate the summary.",
                    }
                )
                logger.error(f"No file {transformed_file_name} available to generate the summary.")
                return response

    logger.info({"document reader response": response})
    updateSummaryJobStatus({'jobid': job_id,
                            'file_name': response["file_name"],
                            'status': response['status'],
                            'summary': response["summary"]})
    return response

@tracer.capture_method
def get_summary_from_cache(file_name):

    logger.info(f"Searching Redis for cached summary of file: {file_name}")
    redis_host = os.environ.get("REDIS_HOST", "N/A")
    redis_port = os.environ.get("REDIS_PORT", "N/A")

    logger.info(f"Redis host: {redis_host}")
    logger.info(f"Redis port: {redis_port}")

    try:
        redis_client = redis.Redis(host=redis_host, port=redis_port)
        fileSummary = redis_client.get(file_name)
    except (ValueError, redis.ConnectionError):
        logger.exception("An error occurred while connecting to Redis")
        return None

    if fileSummary:
        logger.info({"File summary found in cache": fileSummary})
        return fileSummary.decode()

    logger.info("File summary not found in cache, generating it from llm")
    return None

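For reference, the cache contract that get_summary_from_cache relies on is simply file name in, summary text out. A sketch of seeding it for a local test; the host and port are illustrative, not the deployed endpoint:

import redis

# Seed the cache the same way get_summary_from_cache reads it:
# key = original file name, value = utf-8 encoded summary text.
r = redis.Redis(host="localhost", port=6379)  # illustrative endpoint
r.set("report.pdf", "A short summary of report.pdf.")
assert r.get("report.pdf").decode() == "A short summary of report.pdf."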
@@ -0,0 +1,8 @@
redis
pypdf2
langchain==0.0.309
urllib3<2
aws-xray-sdk
aws-lambda-powertools
requests==2.31.0
requests-aws4auth==1.2.3
@@ -0,0 +1,49 @@
"""Loading logic for loading documents in memory from an s3 file."""
from typing import List
from io import BytesIO
from langchain.docstore.document import Document
from langchain.document_loaders.base import BaseLoader
from langchain.docstore.document import Document
from langchain.text_splitter import NLTKTextSplitter
from PyPDF2 import PdfReader
from aws_lambda_powertools import Logger, Tracer

logger = Logger(service="SUMMARY_DOCUMENT_READER")
tracer = Tracer(service="SUMMARY_DOCUMENT_READER")

@tracer.capture_method
class S3FileLoaderInMemory(BaseLoader):
"""Loading logic for loading documents from s3."""

def __init__(self, bucket: str, key: str):
"""Initialize with bucket and key name."""
self.bucket = bucket
self.key = key

def load(self) -> str:
"""Load documents."""
try:
import boto3
except ImportError:
raise ImportError(
"Could not import `boto3` python package. "
"Please install it with `pip install boto3`."
)
logger.exception('ImportError boto3')
# read S3
try:
s3 = boto3.resource('s3')
obj = s3.Object(self.bucket, self.key)
encodedpdf = obj.get()['Body'].read()
pdfFile = PdfReader(BytesIO(encodedpdf))
except s3.meta.client.exceptions.NoSuchBucket as exception:
logger.exception('NoSuchBucket')
return ""
except s3.meta.client.exceptions.NoSuchKey as exception:
logger.exception('NoSuchKey')
return ""
# read pdf
raw_text = []
for page in pdfFile.pages:
raw_text.append(page.extract_text())
return '\n'.join(raw_text)
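A short usage sketch of the loader; bucket and key are placeholders, and the call assumes valid AWS credentials in the environment:

# Illustrative usage; load() returns "" when the bucket or key does not exist.
loader = S3FileLoaderInMemory(bucket="input-assets-bucket", key="report.pdf")
text = loader.load()
if text:
    print(text[:200])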
@@ -0,0 +1,71 @@
import os
import requests
import boto3
from requests_aws4auth import AWS4Auth

from aws_lambda_powertools import Logger, Tracer

logger = Logger(service="SUMMARY_DOCUMENT_READER")
tracer = Tracer(service="SUMMARY_DOCUMENT_READER")

aws_region = boto3.Session().region_name
credentials = boto3.Session().get_credentials()
service = 'appsync'
aws_auth = AWS4Auth(
    credentials.access_key,
    credentials.secret_key,
    aws_region,
    service,
    session_token=credentials.token,
)

@tracer.capture_method
def get_credentials(secret_id: str, region_name: str) -> str:

    client = boto3.client('secretsmanager', region_name=region_name)
    response = client.get_secret_value(SecretId=secret_id)
    secrets_value = response['SecretString']

    return secrets_value

@tracer.capture_method
def updateSummaryJobStatus(variables):

    logger.info(f"send status variables :: {variables}")
    query = """mutation updateSummaryJobStatus {
            updateSummaryJobStatus(summary_job_id: "$jobid", file_name: "$file_name", status: "$status", summary: "$summary") {
                summary_job_id
                file_name
                status
                summary
            }
        }
    """

    query = query.replace("$jobid", variables['jobid'])
    query = query.replace("$file_name", variables['file_name'])
    query = query.replace("$status", variables['status'])
    query = query.replace("$summary", variables['summary'])
    query = query.replace("\n", "")

    request = {'query': query}

    logger.info({"request": request})

    GRAPHQL_URL = os.environ['GRAPHQL_URL']
    HEADERS = {
        "Content-Type": "application/json",
    }

    responseJobstatus = requests.post(
        json=request,
        url=GRAPHQL_URL,
        headers=HEADERS,
        auth=aws_auth,
    )
    logger.info({'res :: ': responseJobstatus})
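String-replacing values into the query body can break whenever a summary contains quotes or braces. A safer pattern is to pass them as GraphQL variables; this is a sketch only, and the declared variable types are assumptions about the AppSync schema rather than something taken from this repository:

# Sketch: send values as GraphQL variables instead of splicing them into
# the query text. update_summary_job_status_safe is a hypothetical helper;
# the $-variable types below are assumed, not confirmed by the schema here.
QUERY = """
mutation UpdateSummaryJobStatus($jobid: String!, $file_name: String!, $status: String!, $summary: String!) {
  updateSummaryJobStatus(summary_job_id: $jobid, file_name: $file_name, status: $status, summary: $summary) {
    summary_job_id
    file_name
    status
    summary
  }
}
"""

def update_summary_job_status_safe(variables):
    request = {"query": QUERY, "variables": variables}
    return requests.post(
        url=os.environ["GRAPHQL_URL"],
        json=request,
        headers={"Content-Type": "application/json"},
        auth=aws_auth,
    )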
19 changes: 19 additions & 0 deletions lambda/aws-summarization-appsync-stepfn/input_validator/Dockerfile
@@ -0,0 +1,19 @@
FROM amazon/aws-lambda-python:latest

# Installs build tools and Python headers, then removes the yum cache to keep the image small
# (note: on Amazon Linux the dev-headers package is python3-devel, not python3-dev)
RUN yum update -y && \
    yum install -y python3 python3-devel python3-pip gcc git && \
    rm -Rf /var/cache/yum

# Copies requirements.txt file into the container
COPY requirements.txt ./
# Installs dependencies found in your requirements.txt file
RUN pip install -r requirements.txt

# Be sure to copy over the function itself!
# Goes last to take advantage of Docker caching.
# COPY lambda.py ./
COPY . .

# Points to the handler function of your lambda function
CMD ["lambda.handler"]