diff --git a/.projen/deps.json b/.projen/deps.json index cc81e91d..3f49ab3b 100644 --- a/.projen/deps.json +++ b/.projen/deps.json @@ -104,18 +104,6 @@ "name": "typescript", "type": "build" }, - { - "name": "@types/deep-diff", - "type": "bundled" - }, - { - "name": "@types/npmlog", - "type": "bundled" - }, - { - "name": "deepmerge", - "type": "bundled" - }, { "name": "aws-cdk-lib", "version": "2.99.1", diff --git a/.projen/tasks.json b/.projen/tasks.json index 9b19fa1e..df3372cd 100644 --- a/.projen/tasks.json +++ b/.projen/tasks.json @@ -251,13 +251,13 @@ "exec": "yarn upgrade npm-check-updates" }, { - "exec": "npm-check-updates --upgrade --target=minor --peer --dep=dev,peer,prod,optional --filter=@types/jest,@types/node,@typescript-eslint/eslint-plugin,@typescript-eslint/parser,eslint-import-resolver-node,eslint-import-resolver-typescript,eslint-plugin-header,eslint-plugin-import,eslint,jest,jest-junit,jsii-diff,jsii-pacmak,npm-check-updates,standard-version,ts-jest,ts-node,typescript,@types/deep-diff,@types/npmlog,deepmerge,constructs" + "exec": "npm-check-updates --upgrade --target=minor --peer --dep=dev,peer,prod,optional --filter=@types/jest,@types/node,@typescript-eslint/eslint-plugin,@typescript-eslint/parser,eslint-import-resolver-node,eslint-import-resolver-typescript,eslint-plugin-header,eslint-plugin-import,eslint,jest,jest-junit,jsii-diff,jsii-pacmak,npm-check-updates,standard-version,ts-jest,ts-node,typescript,constructs" }, { "exec": "yarn install --check-files" }, { - "exec": "yarn upgrade @types/jest @types/node @typescript-eslint/eslint-plugin @typescript-eslint/parser eslint-import-resolver-node eslint-import-resolver-typescript eslint-plugin-header eslint-plugin-import eslint jest jest-junit jsii-diff jsii-pacmak npm-check-updates standard-version ts-jest ts-node typescript @types/deep-diff @types/npmlog deepmerge constructs" + "exec": "yarn upgrade @types/jest @types/node @typescript-eslint/eslint-plugin @typescript-eslint/parser eslint-import-resolver-node eslint-import-resolver-typescript eslint-plugin-header eslint-plugin-import eslint jest jest-junit jsii-diff jsii-pacmak npm-check-updates standard-version ts-jest ts-node typescript constructs" }, { "exec": "npx projen" diff --git a/.projenrc.ts b/.projenrc.ts index 8e6a246d..70e6ef4b 100644 --- a/.projenrc.ts +++ b/.projenrc.ts @@ -36,7 +36,7 @@ const project = new awscdk.AwsCdkConstructLibrary({ packageName: '@' + GITHUB_USER + '/' + PROJECT_NAME, /* The "name" in package.json. 
*/ keywords: ['constructs', 'aws-cdk', 'generative-ai', 'emerging-tech'], devDeps: ['eslint-plugin-header'], - bundledDeps: ['deepmerge', '@types/deep-diff', '@types/npmlog'], + //bundledDeps: ['deepmerge', '@types/deep-diff', '@types/npmlog'], // Keep synchronized with https://github.com/nodejs/release#release-schedule minNodeVersion: '18.12.0', // 'MAINTENANCE' (first LTS) diff --git a/lambda/aws-summarization-appsync-stepfn/document_reader/Dockerfile b/lambda/aws-summarization-appsync-stepfn/document_reader/Dockerfile new file mode 100644 index 00000000..315ef03e --- /dev/null +++ b/lambda/aws-summarization-appsync-stepfn/document_reader/Dockerfile @@ -0,0 +1,20 @@ +FROM amazon/aws-lambda-python:latest + +# Installs python, removes cache file to make things smaller +RUN yum update -y && \ + yum install -y python3 python3-dev python3-pip gcc git && \ + rm -Rf /var/cache/yum + +# Copies requirements.txt file into the container +COPY requirements.txt ./ + +# Installs dependencies found in your requirements.txt file +RUN pip install -r requirements.txt + +# Be sure to copy over the function itself! +# Goes last to take advantage of Docker caching. +# COPY lambda.py ./ +COPY . . + +# Points to the handler function of your lambda function +CMD ["lambda.handler"] \ No newline at end of file diff --git a/lambda/aws-summarization-appsync-stepfn/document_reader/helper.py b/lambda/aws-summarization-appsync-stepfn/document_reader/helper.py new file mode 100644 index 00000000..6cdf7714 --- /dev/null +++ b/lambda/aws-summarization-appsync-stepfn/document_reader/helper.py @@ -0,0 +1,59 @@ +from typing import Dict +import boto3 +from PyPDF2 import PdfReader + +from aws_lambda_powertools import Logger, Tracer +from s3inmemoryloader import S3FileLoaderInMemory + +logger = Logger(service="SUMMARY_DOCUMENT_READER") +tracer = Tracer(service="SUMMARY_DOCUMENT_READER") + +s3 = boto3.resource('s3') + +@tracer.capture_method +def read_file_from_s3(bucket, key): + logger.info(f"Fetching file from S3: bucket: {bucket}, key: {key}") + try: + obj = s3.Object(bucket, key) + return obj.get()["Body"].read().decode("utf-8") + except Exception as e: + logger.exception({"An error occured while attempting to read key " : key }) + return None + +@tracer.capture_method +def check_file_exists(bucket,key): + s3_client = boto3.client('s3') + try: + resp = s3_client.head_object(Bucket=bucket, Key=key) + return True + except s3_client.exceptions.ClientError as exp: + if exp.response['Error']['Code'] == '404': + logger.exception('Object doesn\'t exist') + return False + else: + logger.exception('An error occured') + return False + + +@tracer.capture_method +def get_file_transformation(transformed_asset_bucket,transformed_file_name, + input_asset_bucket,original_file_name): + response = { + 'status':'File transformation Pending', + 'name':original_file_name, + } + if (check_file_exists(transformed_asset_bucket, transformed_file_name) == False): + loader = S3FileLoaderInMemory(input_asset_bucket, original_file_name) + document_content = loader.load() + if not document_content: + response['status'] = 'Error' + response['name'] = '' + return response + encoded_string = document_content.encode("utf-8") + s3.Bucket(transformed_asset_bucket).put_object(Key=transformed_file_name, Body=encoded_string) + response['status'] = 'File transformed' + response['name'] = transformed_file_name + else: + response['status'] = 'File already exists' + + return response diff --git a/lambda/aws-summarization-appsync-stepfn/document_reader/lambda.py 
b/lambda/aws-summarization-appsync-stepfn/document_reader/lambda.py new file mode 100644 index 00000000..8bc1447e --- /dev/null +++ b/lambda/aws-summarization-appsync-stepfn/document_reader/lambda.py @@ -0,0 +1,117 @@ +import os +from helper import check_file_exists,get_file_transformation +import redis + +from update_summary_status import updateSummaryJobStatus +from aws_lambda_powertools import Logger, Tracer, Metrics +from aws_lambda_powertools.utilities.typing import LambdaContext +from aws_lambda_powertools.metrics import MetricUnit + +logger = Logger(service="SUMMARY_DOCUMENT_READER") +tracer = Tracer(service="SUMMARY_DOCUMENT_READER") +metrics = Metrics(namespace="summary_pipeline", service="SUMMARY_DOCUMENT_READER") + +transformed_bucket_name = os.environ["TRANSFORMED_ASSET_BUCKET"] +input_bucket_name = os.environ["INPUT_ASSET_BUCKET"] +is_file_tranformation_required = os.environ["IS_FILE_TRANSFORMED"] + + +@logger.inject_lambda_context(log_event=True) +@tracer.capture_lambda_handler +@metrics.log_metrics(capture_cold_start_metric=True) +def handler(event, context: LambdaContext): + + logger.info(f"{event=}") + + original_file_name = event["name"] + job_id = event["jobid"] + + response = { + "is_summary_available": False, + "summary_job_id": job_id, + "file_name": original_file_name, + "status": "Pending", + "summary": "", + "transformed_file_name":'', + } + + logger.set_correlation_id(job_id) + metrics.add_metadata(key='correlationId', value=job_id) + tracer.put_annotation(key="correlationId", value=job_id) + + filesummary = get_summary_from_cache(original_file_name) + + if filesummary is not None: + metrics.add_metric(name="summary_cache_hit",unit=MetricUnit.Count, value=1) + response.update( + { + "file_name": original_file_name, + "status": "Completed", + "summary": filesummary, + "is_summary_available": True, + } + ) + else: + metrics.add_metric(name="summary_llm_hit", unit=MetricUnit.Count, value=1) + transformed_file_name = original_file_name.replace(".pdf", "_transformed.txt") + + if(is_file_tranformation_required): + transformed_file = get_file_transformation(transformed_bucket_name, + transformed_file_name, + input_bucket_name, + original_file_name) + response.update( + { + "file_name": original_file_name, + "status": transformed_file['status'], + "summary": '', + "transformed_file_name":transformed_file_name, + "is_summary_available": False + } + ) + else: + pdf_transformed_file = check_file_exists(transformed_bucket_name, + transformed_file_name) + if not pdf_transformed_file: + response.update( + { + "file_name": original_file_name, + "status": "Error", + "summary": f"No file {transformed_file_name} available to generate the summary.", + } + ) + logger.exception({"No file {transformed_file_name} available to generate the summary."}) + return response + + + logger.info({"document reader response:": response}) + updateSummaryJobStatus({'jobid': job_id, + 'file_name':response["file_name"] + ,'status':response['status'] , + 'summary':response["summary"]}) + return response + +@tracer.capture_method +def get_summary_from_cache(file_name): + + logger.info({"Searching Redis for cached summary file: "+file_name}) + redis_host = os.environ.get("REDIS_HOST", "N/A") + redis_port = os.environ.get("REDIS_PORT", "N/A") + + logger.info({"Redis host: "+redis_host}) + logger.info({"Redis port: "+redis_port}) + + try: + redis_client = redis.Redis(host=redis_host, port=redis_port) + fileSummary = redis_client.get(file_name) + except (ValueError, redis.ConnectionError) as e: + 
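        # Treat a bad port value or an unreachable Redis host as a cache miss: log the error and return None so the summary is generated downstream.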
logger.exception({"An error occured while connecting to Redis" : e}) + return + + if fileSummary: + logger.info({"File summary found in cache: ": fileSummary}) + return fileSummary.decode() + + + logger.info("File summary not found in cache, generating it from llm") + diff --git a/lambda/aws-summarization-appsync-stepfn/document_reader/requirements.txt b/lambda/aws-summarization-appsync-stepfn/document_reader/requirements.txt new file mode 100644 index 00000000..3b3e11ac --- /dev/null +++ b/lambda/aws-summarization-appsync-stepfn/document_reader/requirements.txt @@ -0,0 +1,8 @@ +redis +pypdf2 +langchain==0.0.309 +urllib3<2 +aws-xray-sdk +aws-lambda-powertools +requests==2.31.0 +requests-aws4auth==1.2.3 \ No newline at end of file diff --git a/lambda/aws-summarization-appsync-stepfn/document_reader/s3inmemoryloader.py b/lambda/aws-summarization-appsync-stepfn/document_reader/s3inmemoryloader.py new file mode 100644 index 00000000..d1ccbeb0 --- /dev/null +++ b/lambda/aws-summarization-appsync-stepfn/document_reader/s3inmemoryloader.py @@ -0,0 +1,49 @@ +"""Loading logic for loading documents in memory from an s3 file.""" +from typing import List +from io import BytesIO +from langchain.docstore.document import Document +from langchain.document_loaders.base import BaseLoader +from langchain.docstore.document import Document +from langchain.text_splitter import NLTKTextSplitter +from PyPDF2 import PdfReader +from aws_lambda_powertools import Logger, Tracer + +logger = Logger(service="SUMMARY_DOCUMENT_READER") +tracer = Tracer(service="SUMMARY_DOCUMENT_READER") + +@tracer.capture_method +class S3FileLoaderInMemory(BaseLoader): + """Loading logic for loading documents from s3.""" + + def __init__(self, bucket: str, key: str): + """Initialize with bucket and key name.""" + self.bucket = bucket + self.key = key + + def load(self) -> str: + """Load documents.""" + try: + import boto3 + except ImportError: + raise ImportError( + "Could not import `boto3` python package. " + "Please install it with `pip install boto3`." 
+ ) + logger.exception('ImportError boto3') + # read S3 + try: + s3 = boto3.resource('s3') + obj = s3.Object(self.bucket, self.key) + encodedpdf = obj.get()['Body'].read() + pdfFile = PdfReader(BytesIO(encodedpdf)) + except s3.meta.client.exceptions.NoSuchBucket as exception: + logger.exception('NoSuchBucket') + return "" + except s3.meta.client.exceptions.NoSuchKey as exception: + logger.exception('NoSuchKey') + return "" + # read pdf + raw_text = [] + for page in pdfFile.pages: + raw_text.append(page.extract_text()) + return '\n'.join(raw_text) diff --git a/lambda/aws-summarization-appsync-stepfn/document_reader/update_summary_status.py b/lambda/aws-summarization-appsync-stepfn/document_reader/update_summary_status.py new file mode 100644 index 00000000..2c2baab6 --- /dev/null +++ b/lambda/aws-summarization-appsync-stepfn/document_reader/update_summary_status.py @@ -0,0 +1,71 @@ +import os +import requests +import json +import boto3 +from requests_aws4auth import AWS4Auth + +from aws_lambda_powertools import Logger, Tracer + +logger = Logger(service="SUMMARY_DOCUMENT_READER") +tracer = Tracer(service="SUMMARY_DOCUMENT_READER") + +aws_region = boto3.Session().region_name +credentials = boto3.Session().get_credentials() +service = 'appsync' +aws_auth = AWS4Auth( + credentials.access_key, + credentials.secret_key, + aws_region, + service, + session_token=credentials.token, +) + +@tracer.capture_method +def get_credentials(secret_id: str, region_name: str) -> str: + + client = boto3.client('secretsmanager', region_name=region_name) + response = client.get_secret_value(SecretId=secret_id) + secrets_value = response['SecretString'] + + return secrets_value + +@tracer.capture_method +def updateSummaryJobStatus(variables): + + logger.info(f"send status variables :: {variables}") + query = """mutation updateSummaryJobStatus { + updateSummaryJobStatus(summary_job_id: \"$jobid\",file_name: \"$file_name\", status: \"$status\", summary: \"$summary\") { + summary_job_id + file_name + status + summary + } + } + """ + + query = query.replace("$jobid", variables['jobid']) + query = query.replace("$file_name", variables['file_name']) + query = query.replace("$status", variables['status']) + query = query.replace("$summary", variables['summary']) + + + # query = query.replace("\"file_name\"", "file_name") + # query = query.replace("\"status\"", "status") + query = query.replace("\n", "") + + request = {'query':query} + + logger.info({"request": request}) + + GRAPHQL_URL = os.environ['GRAPHQL_URL'] + HEADERS={ + "Content-Type": "application/json", + } + + responseJobstatus = requests.post( + json=request, + url=GRAPHQL_URL, + headers=HEADERS, + auth=aws_auth + ) + logger.info({'res :: ': responseJobstatus}) \ No newline at end of file diff --git a/lambda/aws-summarization-appsync-stepfn/input_validator/Dockerfile b/lambda/aws-summarization-appsync-stepfn/input_validator/Dockerfile new file mode 100644 index 00000000..839f2dfc --- /dev/null +++ b/lambda/aws-summarization-appsync-stepfn/input_validator/Dockerfile @@ -0,0 +1,19 @@ +FROM amazon/aws-lambda-python:latest + +# Installs python, removes cache file to make things smaller +RUN yum update -y && \ + yum install -y python3 python3-dev python3-pip gcc git && \ + rm -Rf /var/cache/yum + +# Copies requirements.txt file into the container +COPY requirements.txt ./ +# Installs dependencies found in your requirements.txt file +RUN pip install -r requirements.txt + +# Be sure to copy over the function itself! +# Goes last to take advantage of Docker caching. 
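+# COPY . . below also brings in update_file_status.py, which lambda.py imports.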
+# COPY lambda.py ./ +COPY . . + +# Points to the handler function of your lambda function +CMD ["lambda.handler"] \ No newline at end of file diff --git a/lambda/aws-summarization-appsync-stepfn/input_validator/lambda.py b/lambda/aws-summarization-appsync-stepfn/input_validator/lambda.py new file mode 100644 index 00000000..6f7b8385 --- /dev/null +++ b/lambda/aws-summarization-appsync-stepfn/input_validator/lambda.py @@ -0,0 +1,70 @@ + +from aws_lambda_powertools import Logger, Tracer, Metrics +from aws_lambda_powertools.utilities.typing import LambdaContext +from aws_lambda_powertools.metrics import MetricUnit +from update_file_status import updateFileStatus + +logger = Logger(service="SUMMARY_VALIDATE_INPUT_JOB") +tracer = Tracer(service="SUMMARY_VALIDATE_INPUT_JOB") +metrics = Metrics(namespace="summary_pipeline", service="SUMMARY_INPUT_VALIDATION") + + +@logger.inject_lambda_context(log_event=True) +@tracer.capture_lambda_handler +@metrics.log_metrics(capture_cold_start_metric=True) +def handler(event, context: LambdaContext)-> dict: + summary_input = event["detail"]["summaryInput"] + job_id = summary_input['summary_job_id'] + + # Add a correlationId (tracking code). + logger.set_correlation_id(job_id) + metrics.add_metadata(key='correlationId', value=job_id) + tracer.put_annotation(key="correlationId", value=job_id) + + input_files = summary_input['files'] + + response = process_files(input_files) + + updateFileStatus({'jobid': job_id, 'files': response['files']}) + + response_transformed = add_job_id_to_response(response, job_id) + + logger.info({"response": response_transformed}) + return response_transformed + + +@tracer.capture_method +def process_files(input_files): + files_to_process = [] + valid = True + for i in range(len(input_files)): + filename = input_files[i]['name'] + status = "Unsupported" + if filename.lower().endswith(('.pdf')): + status = "Supported" + metrics.add_metric(name="SupportedFile", unit=MetricUnit.Count, value=1) + else: + logger.info("file {filename} extension is currently not supported") + metrics.add_metric(name="UnsupportedFile", unit=MetricUnit.Count, value=1) + file_to_process = { + 'status':status, + 'name':filename, + 'summary':'' + } + files_to_process.append(file_to_process) + + if not files_to_process: + valid = False + + response = { + 'isValid':valid, + 'files':files_to_process + } + + return response + +@tracer.capture_method +def add_job_id_to_response(response, job_id): + for file in response['files']: + file['jobid'] = job_id + return response \ No newline at end of file diff --git a/lambda/aws-summarization-appsync-stepfn/input_validator/requirements.txt b/lambda/aws-summarization-appsync-stepfn/input_validator/requirements.txt new file mode 100644 index 00000000..1ff0fcb0 --- /dev/null +++ b/lambda/aws-summarization-appsync-stepfn/input_validator/requirements.txt @@ -0,0 +1,5 @@ +aws-lambda-powertools +aws-xray-sdk +boto3>=1.28.61 +requests==2.31.0 +requests-aws4auth==1.2.3 diff --git a/lambda/aws-summarization-appsync-stepfn/input_validator/update_file_status.py b/lambda/aws-summarization-appsync-stepfn/input_validator/update_file_status.py new file mode 100644 index 00000000..5b35352b --- /dev/null +++ b/lambda/aws-summarization-appsync-stepfn/input_validator/update_file_status.py @@ -0,0 +1,70 @@ +import os +import requests +import json +import boto3 +from requests_aws4auth import AWS4Auth + +from aws_lambda_powertools import Logger, Tracer + +logger = Logger(service="SUMMARY_INPUT_VALIDATION") +tracer = 
Tracer(service="SUMMARY_INPUT_VALIDATION") + +aws_region = boto3.Session().region_name +credentials = boto3.Session().get_credentials() +service = 'appsync' +aws_auth = AWS4Auth( + credentials.access_key, + credentials.secret_key, + aws_region, + service, + session_token=credentials.token, +) + +@tracer.capture_method +def get_credentials(secret_id: str, region_name: str) -> str: + + client = boto3.client('secretsmanager', region_name=region_name) + response = client.get_secret_value(SecretId=secret_id) + secrets_value = response['SecretString'] + + return secrets_value + +@tracer.capture_method +def updateFileStatus(variables): + + print(f"send status variables :: {variables}") + query = """ + mutation updateFileStatus { + updateFileStatus(files: $files, summary_job_id: \"$jobid\") { + files { + name + status + summary + } + summary_job_id + } + } + """ + + query = query.replace("$jobid", str(variables['jobid'])) + query = query.replace("$files", str(variables['files']).replace("\'", "\"")) + query = query.replace("\"name\"", "name") + query = query.replace("\"status\"", "status") + query = query.replace("\"summary\"", "summary") + + request = {'query':query} + + logger.info({"request": request}) + + GRAPHQL_URL = os.environ['GRAPHQL_URL'] + HEADERS={ + "Content-Type": "application/json", + } + + responseJobstatus = requests.post( + json=request, + url=GRAPHQL_URL, + headers=HEADERS, + auth=aws_auth + ) + logger.info({'res :: ': responseJobstatus}) \ No newline at end of file diff --git a/lambda/aws-summarization-appsync-stepfn/summary_generator/Dockerfile b/lambda/aws-summarization-appsync-stepfn/summary_generator/Dockerfile new file mode 100644 index 00000000..839f2dfc --- /dev/null +++ b/lambda/aws-summarization-appsync-stepfn/summary_generator/Dockerfile @@ -0,0 +1,19 @@ +FROM amazon/aws-lambda-python:latest + +# Installs python, removes cache file to make things smaller +RUN yum update -y && \ + yum install -y python3 python3-dev python3-pip gcc git && \ + rm -Rf /var/cache/yum + +# Copies requirements.txt file into the container +COPY requirements.txt ./ +# Installs dependencies found in your requirements.txt file +RUN pip install -r requirements.txt + +# Be sure to copy over the function itself! +# Goes last to take advantage of Docker caching. +# COPY lambda.py ./ +COPY . . 
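+# COPY . . above also includes helper.py and update_summary_status.py, which lambda.py imports (and, if bundled, an nltk_data directory used by helper.set_nltk_data).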
+ +# Points to the handler function of your lambda function +CMD ["lambda.handler"] \ No newline at end of file diff --git a/lambda/aws-summarization-appsync-stepfn/summary_generator/helper.py b/lambda/aws-summarization-appsync-stepfn/summary_generator/helper.py new file mode 100644 index 00000000..8edadc77 --- /dev/null +++ b/lambda/aws-summarization-appsync-stepfn/summary_generator/helper.py @@ -0,0 +1,31 @@ +import os +import nltk +import boto3 + +from aws_lambda_powertools import Logger, Tracer + +logger = Logger(service="SUMMARY_GENERATION") +tracer = Tracer(service="SUMMARY_GENERATION") + +def set_nltk_data(): + root = os.path.dirname(os.path.abspath(__file__)) + download_dir = os.path.join(root, "nltk_data") + nltk.data.path.append(download_dir) + + +def set_transformer_cache_dir(pathdir): + os.environ["TRANSFORMERS_CACHE"] = pathdir + + +# read text file √from s3 bucket +def read_file_from_s3(bucket, key): + try: + s3 = boto3.resource("s3") + obj = s3.Object(bucket, key) + return obj.get()["Body"].read().decode("utf-8") + except Exception as e: + logger.exception( + f"An error occured while attempting to read {key} from {bucket}.\n" + f"Reason: {e}" + ) + return None diff --git a/lambda/aws-summarization-appsync-stepfn/summary_generator/lambda.py b/lambda/aws-summarization-appsync-stepfn/summary_generator/lambda.py new file mode 100644 index 00000000..9637f08f --- /dev/null +++ b/lambda/aws-summarization-appsync-stepfn/summary_generator/lambda.py @@ -0,0 +1,129 @@ +import os,boto3 +import base64 + +from langchain.llms.bedrock import Bedrock +from update_summary_status import updateSummaryJobStatus + +# external files +from langchain.docstore.document import Document +from langchain.chains.summarize import load_summarize_chain + +import redis + + +from aws_lambda_powertools import Logger, Tracer, Metrics +from aws_lambda_powertools.utilities.typing import LambdaContext + +logger = Logger(service="SUMMARY_GENERATION") +tracer = Tracer(service="SUMMARY_GENERATION") +metrics = Metrics(namespace="summary_pipeline", service="SUMMARY_GENERATION") + + +# internal files +from helper import read_file_from_s3 + +transformed_bucket_name = os.environ["ASSET_BUCKET_NAME"] +chain_type = os.environ["SUMMARY_LLM_CHAIN_TYPE"] + +aws_region = boto3.Session().region_name +bedrock_client = boto3.client( + service_name='bedrock-runtime', + region_name=aws_region, + endpoint_url=f'https://bedrock-runtime.{aws_region}.amazonaws.com' + ) + +@logger.inject_lambda_context(log_event=True) +@tracer.capture_lambda_handler +@metrics.log_metrics(capture_cold_start_metric=True) +def handler(event, context: LambdaContext)-> dict: + logger.info("Starting summary agent with input", event) + + + job_id = event["summary_job_id"] + + logger.set_correlation_id(job_id) + metrics.add_metadata(key='correlationId', value=job_id) + tracer.put_annotation(key="correlationId", value=job_id) + + original_file_name = event["file_name"] + transformed_file_name = event["transformed_file_name"] + + # create response + response = { + "summary_job_id": job_id, + "file_name": original_file_name, + "status": "Pending", + "summary": "" + } + + + + summary_llm = Bedrock( + model_id="anthropic.claude-v2", + client=bedrock_client, + ) + + redis_host = os.environ.get("REDIS_HOST", "N/A") + redis_port = os.environ.get("REDIS_PORT", "N/A") + + if redis_host and redis_port: + logger.info("connecting to redis host...") + redis_client = redis.Redis(host=redis_host, port=redis_port) + else: + logger.info("Redis host or port not set in environment 
variables") + + inputFile = read_file_from_s3(transformed_bucket_name, transformed_file_name) + if inputFile is None: + response["status"] = "Failed to load file from S3" + return response + + finalsummary = generate_summary(summary_llm,chain_type,inputFile) + + llm_answer_bytes = finalsummary.encode("utf-8") + base64_bytes = base64.b64encode(llm_answer_bytes) + llm_answer_base64_string = base64_bytes.decode("utf-8") + logger.info(finalsummary) + logger.info("Summarization done") + + response.update({ + 'file_name':original_file_name, + 'status':"Completed", + 'summary':llm_answer_base64_string + } + ) + + logger.info("Saving respone in Redis :: ",response) + try: + redis_client.set(original_file_name, + llm_answer_base64_string, ex=604800) + logger.info("Saved summary in Redis") + except (ValueError, redis.ConnectionError) as e: + logger.exception( + "An error occured while trying to connect to Redis.\n" + f'Host: "{redis_host}", Port: "{redis_port}".\n' + f"Exception: {e}" + ) + updateSummaryJobStatus({'jobid': job_id, + 'file_name':response["file_name"] + ,'status':response['status'] , + 'summary':response["summary"]}) + return response + + + + +def generate_summary(_summary_llm,chain_type,inputFile)-> str: + + logger.info(f" Using chain_type as {chain_type} for the document") + docs = [Document(page_content=inputFile)] + # run LLM + # prompt = load_prompt("prompt.json") + chain = load_summarize_chain( + _summary_llm, + chain_type=chain_type, + verbose=False + ) + return chain.run(docs) + + + diff --git a/lambda/aws-summarization-appsync-stepfn/summary_generator/requirements.txt b/lambda/aws-summarization-appsync-stepfn/summary_generator/requirements.txt new file mode 100644 index 00000000..a34c9a82 --- /dev/null +++ b/lambda/aws-summarization-appsync-stepfn/summary_generator/requirements.txt @@ -0,0 +1,10 @@ +redis +boto3>=1.28.61 +botocore>=1.31.61 +requests==2.31.0 +requests-aws4auth==1.2.3 +langchain==0.0.309 +nltk==3.8.1 +urllib3<2 +aws-lambda-powertools +aws-xray-sdk diff --git a/lambda/aws-summarization-appsync-stepfn/summary_generator/update_summary_status.py b/lambda/aws-summarization-appsync-stepfn/summary_generator/update_summary_status.py new file mode 100644 index 00000000..35e43646 --- /dev/null +++ b/lambda/aws-summarization-appsync-stepfn/summary_generator/update_summary_status.py @@ -0,0 +1,69 @@ +import os +import requests +import json +import boto3 +from requests_aws4auth import AWS4Auth + +from aws_lambda_powertools import Logger, Tracer + +logger = Logger(service="SUMMARY_GENERATION") +tracer = Tracer(service="SUMMARY_GENERATION") + +aws_region = boto3.Session().region_name +credentials = boto3.Session().get_credentials() +service = 'appsync' +aws_auth = AWS4Auth( + credentials.access_key, + credentials.secret_key, + aws_region, + service, + session_token=credentials.token, +) + +@tracer.capture_method +def get_credentials(secret_id: str, region_name: str) -> str: + + client = boto3.client('secretsmanager', region_name=region_name) + response = client.get_secret_value(SecretId=secret_id) + secrets_value = response['SecretString'] + + return secrets_value + +@tracer.capture_method +def updateSummaryJobStatus(variables): + + logger.info(f"send status variables :: {variables}") + query = """mutation updateSummaryJobStatus { + updateSummaryJobStatus(summary_job_id: \"$jobid\",file_name: \"$file_name\", status: \"$status\", summary: \"$summary\") { + summary_job_id + file_name + status + summary + } + } + """ + + query = query.replace("$jobid", variables['jobid']) + 
query = query.replace("$file_name", variables['file_name']) + query = query.replace("$status", variables['status']) + query = query.replace("$summary", variables['summary']) + + + query = query.replace("\n", "") + + request = {'query':query} + + logger.info({"request": request}) + + GRAPHQL_URL = os.environ['GRAPHQL_URL'] + HEADERS={ + "Content-Type": "application/json", + } + + responseJobstatus = requests.post( + json=request, + url=GRAPHQL_URL, + headers=HEADERS, + auth=aws_auth + ) + logger.info({'res :: ': responseJobstatus}) \ No newline at end of file diff --git a/package.json b/package.json index 6c262474..97afd2a7 100644 --- a/package.json +++ b/package.json @@ -61,16 +61,6 @@ "aws-cdk-lib": "2.99.1", "constructs": "^10.0.5" }, - "dependencies": { - "@types/deep-diff": "^1.0.3", - "@types/npmlog": "^4.1.4", - "deepmerge": "^4.3.1" - }, - "bundledDependencies": [ - "@types/deep-diff", - "@types/npmlog", - "deepmerge" - ], "keywords": [ "aws-cdk", "cdk", diff --git a/resources/gen-ai/aws-summarization-appsync-stepfn/schema.graphql b/resources/gen-ai/aws-summarization-appsync-stepfn/schema.graphql new file mode 100644 index 00000000..58bea492 --- /dev/null +++ b/resources/gen-ai/aws-summarization-appsync-stepfn/schema.graphql @@ -0,0 +1,43 @@ + +input FileStatusInput { + name: String + status: String +} + +type SummaryDocs @aws_iam @aws_cognito_user_pools { + summary_job_id: ID + file_name: String, + status: String, + summary: String +} + +input SummaryDocsInput { + summary_job_id: ID + files: [FileStatusInput] +} + +type Mutation @aws_iam @aws_cognito_user_pools { + generateSummary(summaryInput: SummaryDocsInput!): SummaryDocs + updateSummaryJobStatus(summary_job_id: ID, file_name: String, status: String, summary: String): SummaryDocs + updateFileStatus(summaryjobid: ID, files: [FileStatusInput]): SummaryDocs +} + +type Query @aws_iam @aws_cognito_user_pools { + getSummary: SummaryDocs +} + +type Subscription @aws_iam @aws_cognito_user_pools { + generateSummary(summaryInput: SummaryDocsInput!): SummaryDocs + @aws_subscribe(mutations: ["generateSummary"]) + updateSummaryJobStatus(summary_job_id: ID, file_name: String): SummaryDocs + @aws_subscribe(mutations: ["updateSummaryJobStatus"]) + updateFileStatus(summaryjobid: ID, files: [FileStatusInput]): SummaryDocs + @aws_subscribe(mutations: ["updateFileStatus"]) +} + +schema { + query: Query + mutation: Mutation + subscription: Subscription +} + diff --git a/src/common/helpers/appsyncmergedapi-helper.ts b/src/common/helpers/appsyncmergedapi-helper.ts index 988cff46..f6b210cf 100644 --- a/src/common/helpers/appsyncmergedapi-helper.ts +++ b/src/common/helpers/appsyncmergedapi-helper.ts @@ -10,121 +10,65 @@ * OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions * and limitations under the License. */ +import { Aws } from 'aws-cdk-lib'; import * as appsync from 'aws-cdk-lib/aws-appsync'; import * as iam from 'aws-cdk-lib/aws-iam'; - import { Construct } from 'constructs'; export interface AppsyncMergedApiProps { - /** - * name of merged api on appsync - * @default 'mergedApi' - */ - readonly appsyncmergedApiName: string; - /** - * cognito user pool id for appsync auth - * @default None - */ - readonly userpoolid: string; /** - * cognito authentication user pool is used as authentication Type + * Optional, existing merge api + * schema for multiple source api. 
+ * @default None */ - readonly cognitoAuthenticationUserpool: string; + readonly existingMergeApi?: appsync.CfnGraphQLApi; /** - * AWS region + * Optional user provided appsync props + * @default - authentication type - AMAZON_COGNITO_USER_POOL + * @default - api type -MERGED + * @default - name - appsyncmergeAPI + * */ - readonly region: string; + readonly cfnGraphQLApiProps?: appsync.CfnGraphQLApiProps; /** - * AWS account + * OPTIONAL cognito user pool id for appsync auth + * @default None */ - readonly accountid: string; + readonly userPoolId?: string; /** - * appsync service principle role + * Required appsync service principle role + * @default - appsync.amazonaws.com */ readonly appsyncServicePrincipleRole: string; /** - * owner contact for merged api appsync notification - * @default None - */ - readonly appsyncMergedApiContact: string; - - /** - * Security configuration for your GraphQL API. - * Allowed values - API_KEY , AWS_IAM , AMAZON_COGNITO_USER_POOLS , OPENID_CONNECT , or AWS_LAMBDA - * @default None - */ - readonly authenticationType: string; - - /** - * Configuration for AWS Lambda function authorization. - * Use this if APP sync authentication Type is AWS_LAMBDA - */ - readonly authorizerResultTtlInSeconds: number; - - /** - * Configuration for AWS Lambda function authorization. - * Use this if APP sync authentication Type is AWS_LAMBDA - */ - readonly authorizerUri: string; - - /** - * Configuration for AWS Lambda function authorization. - * Use this if APP sync authentication Type is AWS_LAMBDA - */ - readonly identityValidationExpression: string; - - - /** - * Configuration for openIdConnectConfig authorization. - * Use this if APP sync authentication Type is OPENID_CONNECT - */ - readonly authTtl: number; - - /** - * Configuration for AWS Lambda function authorization. - * Use this if APP sync authentication Type is OPENID_CONNECT - */ - readonly clientId: string; - - /** - * Configuration for AWS Lambda function authorization. - * Use this if APP sync authentication Type is OPENID_CONNECT + * Optional Field log level + * @default None */ - readonly iatTtl: number; + readonly fieldLogLevel?: string; /** - * Configuration for AWS Lambda function authorization. - * Use this if APP sync authentication Type is OPENID_CONNECT + * Optional log verbose content + * @default false */ - readonly issuer: string; + readonly excludeVerboseContent?: boolean; -} - -export interface LogConfigProps { /** - * The service role that AWS AppSync assumes to publish to CloudWatch logs in your account. - * @default None + * Optional x ray enablement for app sync + * @default false */ - readonly cloudWatchLogsRoleArn: string; + readonly xRayEnabled?: boolean; /** - * Log level - * @default None + * Required mergedApiRole for app sync + * @default */ - readonly fieldLogLevel: string; - - /** - * Set to TRUE to exclude sections that contain information such as - * headers, context, and evaluated mapping templates, regardless of logging level. 
- * @default false - */ - readonly excludeVerboseContent: boolean; + readonly mergedApiRole: iam.Role; } /** @@ -133,92 +77,64 @@ export interface LogConfigProps { * build app sync merge api with source api associations * * @param AppsyncMergedApiProps The props to be used by the construct + * @param apiType - MERGED, GRAPHQL * @returns App sync merge api */ -export function buildMergedAPI(scope: Construct, props: AppsyncMergedApiProps, logProps: LogConfigProps) { - - let mergedapi = new appsync.CfnGraphQLApi(scope, props?.appsyncmergedApiName, { - apiType: 'MERGED', - name: props?.appsyncmergedApiName, - authenticationType: props?.authenticationType, - additionalAuthenticationProviders: [getAdditionalAuthenticationMode(props)], - logConfig: { - cloudWatchLogsRoleArn: setAppsyncCloudWatchlogsRole(scope, props).roleArn, - fieldLogLevel: logProps?.fieldLogLevel, - excludeVerboseContent: logProps.excludeVerboseContent, - }, - xrayEnabled: true, - mergedApiExecutionRoleArn: getMergedAPIRole(scope, props).roleArn, - ownerContact: props.appsyncMergedApiContact, - }); - - return mergedapi; +export function buildMergedAPI(scope: Construct, id: string, props: AppsyncMergedApiProps) { + + if (props.existingMergeApi) { + return props.existingMergeApi; + } else { + const mergeAPIname = props.cfnGraphQLApiProps?.name || 'appsyncmergeAPI'; + const apiType = props.cfnGraphQLApiProps?.apiType || 'MERGED'; + const fieldLogLevel = props?.fieldLogLevel || appsync.FieldLogLevel.NONE; + const excludeVerboseContent = props?.excludeVerboseContent || false; + const xRayEnabled = props?.xRayEnabled || false; + + let mergedApi = new appsync.CfnGraphQLApi(scope, id, { + apiType: apiType, + name: mergeAPIname, + authenticationType: props.cfnGraphQLApiProps!.authenticationType, + userPoolConfig: props.cfnGraphQLApiProps?.userPoolConfig, + additionalAuthenticationProviders: [{ + authenticationType: 'AWS_IAM', + }], + logConfig: { + cloudWatchLogsRoleArn: setAppsyncCloudWatchlogsRole(scope, props).roleArn, + fieldLogLevel: fieldLogLevel, + excludeVerboseContent: excludeVerboseContent, + }, + xrayEnabled: xRayEnabled, + mergedApiExecutionRoleArn: props.mergedApiRole.roleArn, + ownerContact: props?.cfnGraphQLApiProps!.ownerContact, + }); + return mergedApi; + } } -function getAdditionalAuthenticationMode(props: AppsyncMergedApiProps) { - - if (props.authenticationType == 'AMAZON_COGNITO_USER_POOLS') { - const additionalAuthenticationMode: appsync.CfnGraphQLApi.AdditionalAuthenticationProviderProperty = { - authenticationType: props?.authenticationType, - userPoolConfig: { - awsRegion: props?.region, - userPoolId: props?.userpoolid, - }, - }; - return additionalAuthenticationMode; - } else if (props.authenticationType == 'AWS_LAMBDA') { - const additionalAuthenticationMode: appsync.CfnGraphQLApi.AdditionalAuthenticationProviderProperty = { - authenticationType: props?.authenticationType, - lambdaAuthorizerConfig: { - authorizerResultTtlInSeconds: props?.authorizerResultTtlInSeconds, - authorizerUri: props?.authorizerUri, - identityValidationExpression: props?.identityValidationExpression, - }, +export function checkAppsyncMergedApiProps(propsObject: AppsyncMergedApiProps | any) { + let errorMessages = ''; + let errorFound = false; - }; - return additionalAuthenticationMode; - } else if (props.authenticationType == 'OPENID_CONNECT') { - const additionalAuthenticationMode: appsync.CfnGraphQLApi.AdditionalAuthenticationProviderProperty = { - authenticationType: props?.authenticationType, - openIdConnectConfig: { - authTtl: 
props?.authTtl, - clientId: props?.clientId, - iatTtl: props?.iatTtl, - issuer: props?.issuer, - }, - }; - return additionalAuthenticationMode; + if (propsObject.existingMergeApi && propsObject.cfnGraphQLApiProps) { + errorMessages += 'Error - Either provide existingMergeApi or cfnGraphQLApiProps, but not both.\n'; + errorFound = true; + } + if (!propsObject.existingMergeApi && !propsObject.cfnGraphQLApiProps) { + errorMessages += 'Error - Atleast one is required either existingMergeApi or cfnGraphQLApiProps.\n'; + errorFound = true; } - const additionalAuthenticationMode: appsync.CfnGraphQLApi.AdditionalAuthenticationProviderProperty = { - authenticationType: props?.authenticationType, - }; - return additionalAuthenticationMode; -} -function setAppsyncCloudWatchlogsRole(scope: Construct, props: AppsyncMergedApiProps) { - let appsynccloudWatchlogsRole = new iam.Role(scope, 'appsynccloudWatchlogsRole', { - assumedBy: new iam.ServicePrincipal(props.appsyncServicePrincipleRole), - }); - appsynccloudWatchlogsRole.addToPolicy( - new iam.PolicyStatement({ - effect: iam.Effect.ALLOW, - actions: ['logs:CreateLogGroup', 'logs:CreateLogStream', 'logs:PutLogEvents'], - resources: ['*'], - }), - ); - return appsynccloudWatchlogsRole; + if (errorFound) { + throw new Error(errorMessages); + } } -function getMergedAPIRole(scope: Construct, props: AppsyncMergedApiProps) { - return new iam.Role(scope, 'mergedapirole', { - assumedBy: new iam.ServicePrincipal(props.appsyncServicePrincipleRole), - }); -} /** - * @internal This is an internal core function and should not be called directly by Solutions Constructs clients. - * + * @internal This is an internal core function and should not be called directly + * by Solutions Constructs clients. * set the merge api role to access source api associations * * @param AppsyncMergedApiProps The props to be used by the construct @@ -226,17 +142,35 @@ function getMergedAPIRole(scope: Construct, props: AppsyncMergedApiProps) { * @param mergedApiRole iam role * @returns App sync merge api role */ -export function setMergedApiRole(props: AppsyncMergedApiProps, mergedAPI: appsync.CfnGraphQLApi, mergedApiRole: iam.Role ) { +export function setMergedApiRole(mergedApiID: String, sourceApiId: String, mergedApiRole: iam.Role ) { mergedApiRole.addToPolicy( new iam.PolicyStatement({ effect: iam.Effect.ALLOW, - actions: ['appsync:StartSchemaMerge'], + actions: ['appsync:SourceGraphQL', + 'appsync:StartSchemaMerge'], resources: [ - 'arn:aws:appsync:' + props.region + ':' + props.accountid + ':apis/' - + mergedAPI.attrApiId + '/sourceApiAssociations/*', + 'arn:aws:appsync:' + Aws.REGION + ':' + Aws.ACCOUNT_ID + + ':apis/' + sourceApiId + '/*', + 'arn:aws:appsync:'+ Aws.REGION+':'+Aws.ACCOUNT_ID+':apis/'+mergedApiID+'/sourceApiAssociations/*', + 'arn:aws:appsync:'+ Aws.REGION+':'+Aws.ACCOUNT_ID+':apis/'+sourceApiId+'/sourceApiAssociations/*', ], }), ); return mergedApiRole; +} + +function setAppsyncCloudWatchlogsRole(scope: Construct, props: AppsyncMergedApiProps) { + const appsyncServicePrincipleRole = props.appsyncServicePrincipleRole || 'appsync.amazonaws.com'; + let appsynccloudWatchlogsRole = new iam.Role(scope, 'appsynccloudWatchlogsRole', { + assumedBy: new iam.ServicePrincipal(appsyncServicePrincipleRole), + }); + appsynccloudWatchlogsRole.addToPolicy( + new iam.PolicyStatement({ + effect: iam.Effect.ALLOW, + actions: ['logs:CreateLogGroup', 'logs:CreateLogStream', 'logs:PutLogEvents'], + resources: ['*'], + }), + ); + return appsynccloudWatchlogsRole; } \ No newline at end of 
file diff --git a/src/common/helpers/eventbridge-helper.ts b/src/common/helpers/eventbridge-helper.ts index be1d1abb..d6ef3c65 100644 --- a/src/common/helpers/eventbridge-helper.ts +++ b/src/common/helpers/eventbridge-helper.ts @@ -15,17 +15,18 @@ import { Construct } from 'constructs'; export interface buildEventBridgeProps { /** - * Existing instance of SNS Topic object, providing both this and `topicProps` will cause an error. + * Optional Existing instance of SNS Topic object, providing both this and `topicProps` will cause an error. * * @default - None. */ readonly existingEventBusInterface?: events.IEventBus; /** - * Event bus to receive the request - * @default 'eventbus' + * Optional user provided event bus props + * + * @default - Default props are used. */ - readonly eventBusName: string; + readonly eventBusProps?: events.EventBusProps; } @@ -33,9 +34,8 @@ export function buildEventBus(scope: Construct, props: buildEventBridgeProps) { if (props.existingEventBusInterface) { return props.existingEventBusInterface; } else { - return new events.EventBus(scope, props.eventBusName, { - eventBusName: props.eventBusName, - }); + const eventBusName = props.eventBusProps?.eventBusName || 'customEventBus'; + return new events.EventBus(scope, eventBusName, props.eventBusProps); } } diff --git a/src/common/helpers/redis-helper.ts b/src/common/helpers/redis-helper.ts index ebe3cea9..6c5f7b90 100644 --- a/src/common/helpers/redis-helper.ts +++ b/src/common/helpers/redis-helper.ts @@ -13,86 +13,130 @@ import * as ec2 from 'aws-cdk-lib/aws-ec2'; import * as elasticache from 'aws-cdk-lib/aws-elasticache'; import { Construct } from 'constructs'; -import * as vpc_helper from './vpc-helper'; export interface RedisProps { /** - * Existing instance of a VPC, if this is set then the all Props are ignored, + * Required. Existing instance of a VPC, if this is set then the all Props are ignored, * if this is not set then deafultVPC Props are used. */ - readonly existingVpc?: ec2.IVpc; + readonly existingVpc: ec2.IVpc; - // /** - // * security group for lambda - // * @default 'lambdaSecurityGroup' - // */ - // readonly lambdaSecurityGroup: ec2.ISecurityGroup; + /** + * Optional cfnCacheClusterProps + * @default cacheNodeType - 'cache.r6g.xlarge' + * @default numCacheNodes- 1 + */ + readonly cfnCacheClusterProps?: elasticache.CfnCacheClusterProps; + + /** + * Optional. Existing Redis cluster to cache the generated summary + * for subsequent request of same document. + * + * @default - none + */ + readonly existingRedisCulster?: elasticache.CfnCacheCluster; /** - * name of redis Security Group + * Optional .name of redis Security Group * @default 'redisSecurityGroup' */ - readonly redisSecurityGroupname: string; + readonly redisSecurityGroupname?: string; /** - * cache node type - * @default 'cache.r6g.xlarge' + * Required. redis Security Group + * */ - readonly cacheNodeType: string; + readonly redisSecurityGroup: ec2.SecurityGroup; /** - * redis cluster name - * @default 'redisCluster' + * Required. list of subnet Ids + * @default None */ - readonly redisclustername: string; + readonly subnetIds: string []; + + /** + * Required. redis Subnet Group Id + * @default redisSubnetGroup + */ + readonly redisSubnetGroupId: string; + + /** + * Required. lambda security group which will acces the redis cluster + * + */ + readonly inboundSecurityGroup: ec2.ISecurityGroup; + + /** + * Optional. 
redis port number + * @default redisPort + */ + readonly redisPort?: number; + + } -/** - * @internal This is an internal core function and should not be called directly by Solutions Constructs clients. - * - * build redis cluster to cache the results - * - * @param RedisProps The default props to be used by the construct - * @returns redis cluster instance. - * - */ + export function buildRedisCluster(scope: Construct, props: RedisProps): elasticache.CfnCacheCluster { - const redisclustername = props.redisclustername || 'redisCluster'; - const cacheNodeType = props.cacheNodeType || 'cache.r6g.xlarge'; + + const redisclustername = props.cfnCacheClusterProps?.clusterName || 'redisCluster'; + const cacheNodeType = props.cfnCacheClusterProps?.cacheNodeType || 'cache.r6g.xlarge'; + const engine = props.cfnCacheClusterProps?.engine || 'redis'; + const numCacheNodes = props.cfnCacheClusterProps?.numCacheNodes || 1; let redisCulster = new elasticache.CfnCacheCluster(scope, 'redisCluster', { clusterName: redisclustername, cacheNodeType: cacheNodeType, - engine: 'redis', - numCacheNodes: 1, + engine: engine, + numCacheNodes: numCacheNodes, cacheSubnetGroupName: getRedisSubnetGroup(scope, props).ref, - vpcSecurityGroupIds: [getRedisSecurityGroup(scope, props).securityGroupId], + vpcSecurityGroupIds: [props.redisSecurityGroup!.securityGroupId], }); - //this.redisHost = redisCluster.attrRedisEndpointAddress; - //this.redisPort = redisCluster.attrRedisEndpointPort; return redisCulster; } // get redis subnet group from existing vpc function getRedisSubnetGroup(scope: Construct, props: RedisProps): elasticache.CfnSubnetGroup { - let redisSubnetGroup = new elasticache.CfnSubnetGroup(scope, 'redisSubnetGroup', { - description: 'redis subnet group', - subnetIds: vpc_helper.getPrivateSubnetIDs(props?.existingVpc!), + let redisSubnetGroup = new elasticache.CfnSubnetGroup(scope, props.redisSubnetGroupId, { + description: 'Redis subnet group', + subnetIds: props.subnetIds, }); return redisSubnetGroup; } // get redis security group from existing vpc -function getRedisSecurityGroup(scope: Construct, props: RedisProps): ec2.SecurityGroup { - let redisSecurityGroup = new ec2.SecurityGroup(scope, 'redisSecurityGroup', { - vpc: props?.existingVpc!, +export function getRedisSecurityGroup(scope: Construct, + props: RedisProps | any, redisSecurityGroupname: string ): ec2.SecurityGroup { + const redisSecurityGroupName = props.redisSecurityGroupname || 'redisSecuritygroup'; + let redisSecurityGroup = new ec2.SecurityGroup(scope, redisSecurityGroupname, { + vpc: props.existingVpc, allowAllOutbound: true, description: 'security group for elasticache', - securityGroupName: props?.redisSecurityGroupname, + securityGroupName: redisSecurityGroupName, }); + return redisSecurityGroup; } +export function setInboundRules(redisSecurityGroup:ec2.SecurityGroup, + sourceSecuritygroup:ec2.ISecurityGroup ) { + redisSecurityGroup.connections.allowFrom(sourceSecuritygroup, + ec2.Port.tcp(6379)); +} + + +export function CheckRedisClusterProps(propsObject: RedisProps | any) { + let errorMessages = ''; + let errorFound = false; + + if (propsObject.existingRedisCulster && propsObject.cfnCacheClusterProps) { + errorMessages += 'Error - Either provide existingRedisCulster or cfnCacheClusterProps, but not both.\n'; + errorFound = true; + } + + if (errorFound) { + throw new Error(errorMessages); + } +} diff --git a/src/common/helpers/s3-bucket-helper.ts b/src/common/helpers/s3-bucket-helper.ts index 6fc9e52c..b7d59730 100644 --- 
a/src/common/helpers/s3-bucket-helper.ts +++ b/src/common/helpers/s3-bucket-helper.ts @@ -13,6 +13,7 @@ import * as s3 from 'aws-cdk-lib/aws-s3'; + export interface S3Props { readonly existingBucketObj?: s3.Bucket; readonly existingBucketInterface?: s3.IBucket; @@ -26,27 +27,34 @@ export function CheckS3Props(propsObject: S3Props | any) { let errorMessages = ''; let errorFound = false; + console.log('ncheck s3 props1'+ propsObject); if ((propsObject.existingBucketObj || propsObject.existingBucketInterface) && propsObject.bucketProps) { errorMessages += 'Error - Either provide bucketProps or existingBucketObj, but not both.\n'; errorFound = true; } + console.log('ncheck s3 props2'+ propsObject); if (propsObject.existingLoggingBucketObj && propsObject.loggingBucketProps) { errorMessages += 'Error - Either provide existingLoggingBucketObj or loggingBucketProps, but not both.\n'; errorFound = true; } + console.log('ncheck s3 props3'+ propsObject); if ((propsObject?.logS3AccessLogs === false) && (propsObject.loggingBucketProps || propsObject.existingLoggingBucketObj)) { errorMessages += 'Error - If logS3AccessLogs is false, supplying loggingBucketProps or existingLoggingBucketObj is invalid.\n'; errorFound = true; } + console.log('ncheck s3 props4'+ propsObject); if (propsObject.existingBucketObj && (propsObject.loggingBucketProps || propsObject.logS3AccessLogs)) { errorMessages += 'Error - If existingBucketObj is provided, supplying loggingBucketProps or logS3AccessLogs is an error.\n'; errorFound = true; } + console.log('is error found '+errorFound); if (errorFound) { throw new Error(errorMessages); } -} \ No newline at end of file +} + + diff --git a/src/common/helpers/utils.ts b/src/common/helpers/utils.ts deleted file mode 100644 index f0209f96..00000000 --- a/src/common/helpers/utils.ts +++ /dev/null @@ -1,192 +0,0 @@ -/** - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance - * with the License. A copy of the License is located at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * or in the 'license' file accompanying this file. This file is distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES - * OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions - * and limitations under the License. - */ -import * as deepdiff from 'deep-diff'; -import * as deepmerge from 'deepmerge'; -//import * as log from 'npmlog'; - -function isObject(val: object) { - return val != null && typeof val === 'object' - && Object.prototype.toString.call(val) === '[object Object]'; -} - -function isPlainObject(o: object) { - if (Array.isArray(o) === true) { - return true; - } - - if (isObject(o) === false) { - return false; - } - - // If has modified constructor - const ctor = o.constructor; - if (typeof ctor !== 'function') { - return false; - } - - // If has modified prototype - const prot = ctor.prototype; - if (isObject(prot) === false) { - return false; - } - - // If constructor does not have an Object-specific method - if (prot.hasOwnProperty('isPrototypeOf') === false) { - return false; - } - - // Most likely a plain Object - return true; -} - -/** - * @internal This is an internal core function and should not be called directly by Solutions Constructs clients. 
- * - * Creates the props to be used to instantiate a CDK L2 construct within a Solutions Construct - * - * @param defaultProps The default props to be used by the construct - * @param clientProps Optional properties passed in from the client in the props object - * @param constructProps Optional properties required by the construct for the construct to work (override any other values) - * @returns The properties to use - all values prioritized: - * 1) constructProps value - * 2) clientProps value - * 3) defaultProps value - */ -export function consolidateProps(defaultProps: object, clientProps?: object, constructProps?: object, concatArray: boolean = false): any { - let result: object = defaultProps; - - if (clientProps) { - result = overrideProps(result, clientProps, concatArray); - } - - if (constructProps) { - result = overrideProps(result, constructProps, concatArray); - } - - return result; -} - -/** - * @internal This is an internal core function and should not be called directly by Solutions Constructs clients. - */ -function overrideProps(DefaultProps: object, userProps: object, concatArray: boolean = false): any { - // Notify the user via console output if defaults are overridden - const overrideWarningsEnabled = (process.env.overrideWarningsEnabled !== 'false'); - if (overrideWarningsEnabled) { - flagOverriddenDefaults(DefaultProps, userProps); - } - // Override the sensible defaults with user provided props - if (concatArray) { - return deepmerge(DefaultProps, userProps, { - arrayMerge: (destinationArray, sourceArray) => destinationArray.concat(sourceArray), - isMergeableObject: isPlainObject, - }); - } else { - return deepmerge(DefaultProps, userProps, { - arrayMerge: (_destinationArray, sourceArray) => sourceArray, // underscore allows arg to be ignored - isMergeableObject: isPlainObject, - }); - } -} - -/** The prefilter function returns true for any filtered path/key that should be excluded from the diff check. - * Any Construct Props using cdk.Duration type is not properly handled by - * 'deep-diff' library, whenever it encounters a Duration object, it throws the exception - * 'Argument to Intrinsic must be a plain value object', so such props are excluded from the diff check. - */ -function _prefilter(_path: any[], _key: string): boolean { - const prefilters = ['maxRecordAge', 'expiration', 'transitionAfter', 'destination', 'timeout', 'period']; - - if (prefilters.indexOf(_key) >= 0) { - return true; - } - return false; -} - -/** - * Performs a diff check of the userProps against the defaultProps to detect overridden properties. - * @param {object} defaultProps the prescriptive defaults for the pattern. - * @param {object} userProps the properties provided by the user, to be compared against the defaultProps. - * @return {Array} an array containing the overridden values. - */ -function findOverrides(defaultProps: object, userProps: object) { - const diff = deepdiff.diff(defaultProps, userProps, _prefilter); - // Filter the results - return (diff !== undefined) ? diff?.filter((e:any) => ( - e.kind === 'E' && // only return overrides - !e.path?.includes('node') && // stop traversing once the object graph hits the node - !e.path?.includes('bind') // stop traversing once the object graph hits internal functions - )) : []; -} - -/** - * @internal This is an internal core function and should not be called directly by Solutions Constructs clients. - * - * Emits a warning to the console when a prescriptive default value is overridden by the user. 
- * @param {object} defaultProps the prescriptive defaults for the pattern. - * @param {object} userProps the properties provided by the user, to be compared against the defaultProps. - */ -export function flagOverriddenDefaults(defaultProps: object, userProps: object) { - // Compare the properties and return any overrides - const overrides = findOverrides(defaultProps, userProps); - // Emit a warning for each override - for (let i = 0; i < ((overrides !== undefined) ? overrides.length : 0); i++) { - const e = Object.assign(overrides[i]); - // Determine appropriate logging granularity - const valuesAreReadable = ( - checkReadability(e.lhs) && - checkReadability(e.rhs) - ); - // Format the path for readability - const path = formatOverridePath(e.path); - // Output - const details = (valuesAreReadable) ? ` Default value: '${e.lhs}'. You provided: '${e.rhs}'.` : ''; - printWarning(`An override has been provided for the property: ${path}.` + details); - } -} -/** - * @internal This is an internal core function and should not be called directly by Solutions Constructs clients. - */ -function printWarning(message: string) { - // Style the log output - //log.prefixStyle.bold = true; - //log.prefixStyle.fg = 'red'; - //log.enableColor(); - console.log('AWS_SOLUTIONS_CONSTRUCTS_WARNING: ', message); -} - -/** - * Converts the path object from the deep-diff module to a user-readable, bracket notation format. - * @param {string | string[]} path either a string value or an array of strings. - * @return {string} the formatted override path. - */ -function formatOverridePath(path: string | string[]) { - return (path !== undefined && path.length > 1) ? path.toString() - .replace(/,/g, '][') - .replace(/\]/, '') - .replace(/$/, ']') : path; -} -/** - * Check the readability of an input value and, in the context of the override warning service, return true if it - * meets the established criteria. This function is used to determine whether more-detailed log output can be given. - * @param {any} value input to be evaluated against the given criteria. - * @return {boolean} true if the value meets the given criteria. - * @return {boolean} false if the value does not meet the given criteria. - */ -function checkReadability(value: any) { - return ( - typeof(value) === 'string' && // strings only - !value.includes('${Token[') && // no circular refs - value !== '' // no empty strings - ); -} diff --git a/src/common/helpers/vpc-helper.ts b/src/common/helpers/vpc-helper.ts index f497b466..680e33bc 100644 --- a/src/common/helpers/vpc-helper.ts +++ b/src/common/helpers/vpc-helper.ts @@ -12,7 +12,6 @@ */ import * as ec2 from 'aws-cdk-lib/aws-ec2'; import { Construct } from 'constructs'; -import { consolidateProps } from './utils'; export interface VpcPropsSet { @@ -67,7 +66,7 @@ export function buildVpc(scope: Construct, props: BuildVpcProps): ec2.IVpc { // Merge props provided by construct builder and by the end user // If user provided props are empty, the vpc will use only the builder provided props - cumulativeProps = consolidateProps(cumulativeProps, props?.userVpcProps, props?.constructVpcProps); + //cumulativeProps = consolidateProps(cumulativeProps, props?.userVpcProps, props?.constructVpcProps); const vpc = new ec2.Vpc(scope, 'Vpc', cumulativeProps); return vpc; diff --git a/src/index.ts b/src/index.ts index ef7d0c3c..b4366a8a 100644 --- a/src/index.ts +++ b/src/index.ts @@ -11,5 +11,6 @@ * and limitations under the License. 
*/ export * from './patterns/gen-ai/aws-rag-appsync-stepfn-opensearch'; -export * from './patterns/gen-ai/aws-qa-appsync-opensearch'; +export * from './patterns/gen-ai/aws-summarization-appsync-stepfn'; export * from './patterns/gen-ai/aws-langchain-common-layer'; +export * from './patterns/gen-ai/aws-qa-appsync-opensearch'; diff --git a/src/patterns/gen-ai/aws-summarization-appsync-stepfn/README.md b/src/patterns/gen-ai/aws-summarization-appsync-stepfn/README.md index 8b84016e..439c674d 100644 --- a/src/patterns/gen-ai/aws-summarization-appsync-stepfn/README.md +++ b/src/patterns/gen-ai/aws-summarization-appsync-stepfn/README.md @@ -22,7 +22,7 @@ ## Overview -This construct provides a gen-ai summarization implementation using AWS Appsync and AWS Step function. +This construct provides a gen-ai summarization implementation using AWS AppSync, Amazon EventBridge, AWS Step Functions and AWS Lambda. Here is a minimal deployable pattern definition: @@ -51,45 +51,59 @@ Parameters | **Name** | **Type** | **Required** |**Description** | |:-------------|:----------------|-----------------|-----------------| -| redisHost | string | ![Required](https://img.shields.io/badge/required-ff0000) | Amazon ElastiCache for Redis host to cache the summary. | -| redisPort | string| ![Required](https://img.shields.io/badge/required-ff0000) | Amazon ElastiCache for Redis Port to cache the summary. | -| openSearchIndexName | string | ![Required](https://img.shields.io/badge/required-ff0000) | Domain endpoint for the OpenSearch Service. | -| cognitoUserPool | [cognito.IUserPool](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_cognito.IUserPool.html) | ![Required](https://img.shields.io/badge/required-ff0000) | Cognito user pool used for authentication. | -| vpcProps | [ec2.VpcProps](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_ec2.VpcProps.html) | ![Optional](https://img.shields.io/badge/optional-4169E1) | Custom properties for a VPC the construct will create. This VPC will be used by the Lambda functions the construct creates. Providing both this and existingVpc is an error. | -| existingVpc | [ec2.IVpc](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_ec2.IVpc.html) | ![Optional](https://img.shields.io/badge/optional-4169E1) | An existing VPC in which to deploy the construct. Providing both this and vpcProps is an error. | -| existingSecurityGroup | [ec2.ISecurityGroup](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_ec2.ISecurityGroup.html) | ![Optional](https://img.shields.io/badge/optional-4169E1) | Existing security group allowing access to opensearch. Used by the lambda functions built by this construct. If not provided, the construct will create one. | -| existingIngestionBusInterface | [events.IEventBus](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_events.IEventBus.html) | ![Optional](https://img.shields.io/badge/optional-4169E1) | Existing instance of an Eventbridge bus. If not provided, the construct will create one. | -| existingInputAssetsBucketObj | [s3.IBucket](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_s3.IBucket.html) | ![Optional](https://img.shields.io/badge/optional-4169E1) | Existing instance of S3 Bucket object, providing both this and `bucketInputsAssetsProps` will cause an error.
| -| bucketInputsAssetsProps | [s3.BucketProps](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_s3.BucketProps.html) | ![Optional](https://img.shields.io/badge/optional-4169E1) | User provided props to override the default props for the S3 Bucket. Providing both this and `existingInputAssetsBucketObj` will cause an error. | -| existingProcessedAssetsBucketObj | [s3.IBucket](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_s3.IBucket.html) | ![Optional](https://img.shields.io/badge/optional-4169E1) | Existing instance of S3 Bucket object, providing both this and `bucketProcessedAssetsProps` will cause an error. | -| bucketProcessedAssetsProps | [s3.BucketProps](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_s3.BucketProps.html) | ![Optional](https://img.shields.io/badge/optional-4169E1) | User provided props to override the default props for the S3 Bucket. Providing both this and `existingProcessedAssetsBucketObj` will cause an error. | -| stage | string | ![Optional](https://img.shields.io/badge/optional-4169E1) | Value will be appended to resources name Service. | -| mergedApiGraphQL | string | ![Optional](https://img.shields.io/badge/optional-4169E1) | URL endpoint of the appsync merged api | +| userPoolId | string | ![Required](https://img.shields.io/badge/required-ff0000) | Amazon Cognito user pool ID for AWS AppSync authentication and authorization. | +| userVpcProps | [ec2.VpcProps](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_ec2.VpcProps.html) | ![Optional](https://img.shields.io/badge/optional-4169E1) | The construct creates a custom VPC based on userVpcProps. Providing both this and existingVpc is an error. | +| existingVpc | [ec2.IVpc](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_ec2.IVpc.html) | ![Optional](https://img.shields.io/badge/optional-4169E1) | An existing VPC can be used to deploy the construct.| +| existingRedisCulster | [elasticache.CfnCacheCluster](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_elasticache.CfnCacheClusterProps.html) | ![Optional](https://img.shields.io/badge/optional-4169E1) | Existing Redis cluster to cache the generated summary for subsequent requests of the same document. | +| cfnCacheClusterProps | [elasticache.CfnCacheClusterProps](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_elasticache.CfnCacheClusterProps.html) | ![Optional](https://img.shields.io/badge/optional-4169E1) | Properties for the Redis cache cluster. If there is no existing Redis cluster, cfnCacheClusterProps can be used to create a new one.| +| existingSecurityGroup | [ec2.SecurityGroup](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_ec2.SecurityGroup.html) | ![Optional](https://img.shields.io/badge/optional-4169E1) | Security group for the Lambda functions which this construct will use. If no existing security group is provided, the construct will create one in the VPC.| +| existingInputAssetsBucket | [s3.IBucket](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_s3.Bucket.html) | ![Optional](https://img.shields.io/badge/optional-4169E1) | Existing S3 bucket to store the input document which needs to be summarized. PDF is the supported input document format. If a transformed (txt format) file is already available, this bucket is optional.
| +| bucketInputsAssetsProps | [s3.BucketProps](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_s3.BucketProps.html) | ![Optional](https://img.shields.io/badge/optional-4169E1) | User provided props to override the default props for the S3 Bucket. Providing both this and `existingInputAssetsBucket` will cause an error.| +| isFileTransformationRequired | [string] | ![Optional](https://img.shields.io/badge/optional-4169E1) | The summary construct transforms the input document into txt format. If the transformation is not required, this flag can be set to false. If set to true, a transformed asset bucket is created and the input document from the input asset bucket is transformed to txt format.| +| existingTransformedAssetsBucket | [s3.IBucket](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_s3.Bucket.html) | ![Optional](https://img.shields.io/badge/optional-4169E1) | This bucket stores the transformed (txt) assets for generating the summary. If none is provided then this construct will create one.| +| bucketTransformedAssetsProps | [s3.BucketProps](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_s3.BucketProps.html) | ![Optional](https://img.shields.io/badge/optional-4169E1) | User provided props to override the default props for the S3 Bucket. Providing both this and `existingTransformedAssetsBucket` will cause an error.| +| existingEventBusInterface | [events.IEventBus](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_events.IEventBus.html) | ![Optional](https://img.shields.io/badge/optional-4169E1) | Existing instance of EventBus. The summary construct integrates AppSync with EventBridge to route the request to Step Functions.| +| eventBusProps | [events.EventBusProps](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_events.EventBusProps.html) | ![Optional](https://img.shields.io/badge/optional-4169E1) | A new custom EventBus is created with the provided props. Providing both existingEventBusInterface and eventBusProps will result in a validation error.| +| existingMergeApi | [appsync.CfnGraphQLApi](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_appsync.CfnGraphQLApi.html) | ![Required](https://img.shields.io/badge/required-ff0000) | Existing merged API instance. This construct associates its source API with the merged API to support multiple modalities with different source APIs. The merged API provides a federated schema over the source API schemas.| +| summaryApiName | [string] | ![Optional](https://img.shields.io/badge/optional-4169E1) | User provided name for the summary API on AppSync. A GraphQL API will be created by this construct with this name.| +| logConfig | [appsync.LogConfig](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_appsync.LogConfig.html) | ![Optional](https://img.shields.io/badge/optional-4169E1) | Logging configuration for AppSync. | +| xrayEnabled | [boolean] | ![Optional](https://img.shields.io/badge/optional-4169E1) | Enable AWS X-Ray for AppSync. | +| summaryChainType | [string] | ![Optional](https://img.shields.io/badge/optional-4169E1) | Chain type defines how to pass the document to the LLM. There are three chain types. Stuff: simply "stuff" all the documents into a single prompt. Map-reduce: summarize each document on its own in a "map" step and then "reduce" the summaries into a final summary. Refine: construct a response by looping over the input documents and iteratively updating its answer. |
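+
+A minimal usage sketch tying these parameters together is shown below. The package import path, user pool id and merged API execution role ARN are placeholders; the construct and property names follow the props interface introduced in this change.
+
+```typescript
+import * as cdk from 'aws-cdk-lib';
+import * as appsync from 'aws-cdk-lib/aws-appsync';
+import { Construct } from 'constructs';
+// Placeholder import path - substitute the published package name.
+import { SummarizationAppsyncStepfn } from '<package>';
+
+export class SummarizationStack extends cdk.Stack {
+  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
+    super(scope, id, props);
+
+    // Existing merged AppSync API the summary source API will be associated with.
+    const mergedApi = new appsync.CfnGraphQLApi(this, 'MergedApi', {
+      apiType: 'MERGED',
+      name: 'mergedApi',
+      authenticationType: 'AMAZON_COGNITO_USER_POOLS',
+      userPoolConfig: {
+        awsRegion: this.region,
+        userPoolId: '<user-pool-id>',      // placeholder
+        defaultAction: 'ALLOW',
+      },
+      mergedApiExecutionRoleArn: '<merged-api-execution-role-arn>', // placeholder
+    });
+
+    new SummarizationAppsyncStepfn(this, 'SummarizationPattern', {
+      userPoolId: '<user-pool-id>',        // placeholder
+      existingMergeApi: mergedApi,
+      // Optional: create a new Redis cluster to cache generated summaries.
+      cfnCacheClusterProps: {
+        cacheNodeType: 'cache.r6g.xlarge',
+        engine: 'redis',
+        numCacheNodes: 1,
+      },
+    });
+  }
+}
+```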
## Pattern Properties | **Name** | **Type** | **Description** | |:-------------|:----------------|-----------------| -| vpc | [ec2.IVpc](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_ec2.IVpc.html) | The VPC used by the construct (whether created by the construct or providedb by the client) | -| securityGroup | [ec2.ISecurityGroup](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_ec2.ISecurityGroup.html) | The VPC used by the construct (whether created by the construct or providedb by the client) | -| ingestionBus | [events.IEventBus](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_events.IEventBus.html) | The VPC used by the construct (whether created by the construct or providedb by the client) | -| s3InputAssetsBucketInterface | [s3.IBucket](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_s3.IBucket.html) | Returns an instance of s3.IBucket created by the construct | -| s3InputAssetsBucket | [s3.Bucket](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_s3.Bucket.html) | Returns an instance of s3.Bucket created by the construct. IMPORTANT: If existingInputAssetsBucketObj was provided in Pattern Construct Props, this property will be undefined | -| s3ProcessedAssetsBucketInterface | [s3.IBucket](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_s3.IBucket.html) | Returns an instance of s3.IBucket created by the construct | -| s3ProcessedAssetsBucket | [s3.Bucket](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_s3.Bucket.html) | Returns an instance of s3.IBucket created by the construct. IMPORTANT: If existingProcessedAssetsBucketObj was provided in Pattern Construct Props, this property will be undefined | -| graphqlApi| [appsync.IGraphqlApi](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_appsync.GraphqlApi.html) | Returns an instance of appsync.IGraphqlApi created by the construct | -| stateMachine| [StateMachine](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_stepfunctions.StateMachine.html) | Returns an instance of appsync.IGraphqlApi created by the construct | +| eventBridgeBus | [events.IEventBus](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_events.IEventBus.html) | An instance of events.IEventBus created by the construct | +| mergeApi | [appsync.CfnGraphQLApi](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_appsync.CfnGraphQLApi.html) | Instance of appsync.CfnGraphQLApi for the merged API used by the construct | +| summaryGraphqlApi | [appsync.IGraphqlApi](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_appsync.IGraphqlApi.html) | Instance of appsync.IGraphqlApi for the summary API created by the construct| +| redisCluster | [elasticache.CfnCacheCluster](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_elasticache.CfnCacheClusterProps.html) | Instance of the Redis cluster used by the construct | +| vpc | [ec2.IVpc](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_ec2.IVpc.html) | Returns the instance of ec2.IVpc used by the construct | +| securityGroup | [ec2.ISecurityGroup](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_ec2.ISecurityGroup.html) | Returns the instance of ec2.ISecurityGroup used by the construct. | +| inputAssetBucket | [s3.Bucket](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_s3.Bucket.html) | Instance of s3.IBucket used by the construct | +| processedAssetBucket | [s3.Bucket](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_s3.Bucket.html) | Instance of s3.IBucket used by the construct| +| logConfig | [appsync.LogConfig](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_appsync.LogConfig.html) | AppSync logging configuration used by the construct|
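+
+The exposed pattern properties can be used for further wiring in the consuming stack. A short sketch is shown below; `summarization` is assumed to be an instance of this construct and `downstreamFn` a hypothetical consumer function.
+
+```typescript
+import * as cdk from 'aws-cdk-lib';
+import * as lambda from 'aws-cdk-lib/aws-lambda';
+
+declare const stack: cdk.Stack;
+declare const downstreamFn: lambda.Function;              // hypothetical consumer
+declare const summarization: SummarizationAppsyncStepfn;  // instance created as shown above
+
+// Buckets and event bus created (or reused) by the pattern:
+summarization.inputAssetBucket?.grantRead(downstreamFn);
+summarization.eventBridgeBus?.grantPutEventsTo(downstreamFn);
+
+// Surface the summary GraphQL API id as a stack output.
+new cdk.CfnOutput(stack, 'SummaryApiId', {
+  value: summarization.summaryGraphqlApi?.apiId ?? '',
+});
+```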
## Default properties Out of the box implementation of the Construct without any override will set the following defaults: +### VPC +- Sets up a VPC to deploy the construct + +### Amazon ElastiCache for Redis +- Sets up an Amazon ElastiCache for Redis cluster. + ### Appsync +- Sets up the AWS AppSync merged API + - Associates the source API with the merged API using 'AUTO_MERGE' ### Amazon S3 Buckets - Sets up two Amazon S3 Buckets - Uses existing buckets if provided, otherwise creates new ones +- If isFileTransformationRequired is set to false then +only one bucket is created for input assets. diff --git a/src/patterns/gen-ai/aws-summarization-appsync-stepfn/index.ts b/src/patterns/gen-ai/aws-summarization-appsync-stepfn/index.ts new file mode 100644 index 00000000..71076b24 --- /dev/null +++ b/src/patterns/gen-ai/aws-summarization-appsync-stepfn/index.ts @@ -0,0 +1,707 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance + * with the License. A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the 'license' file accompanying this file. This file is distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES + * OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ +import * as path from 'path'; +import * as cdk from 'aws-cdk-lib'; +import * as appsync from 'aws-cdk-lib/aws-appsync'; +import * as cognito from 'aws-cdk-lib/aws-cognito'; +import * as ec2 from 'aws-cdk-lib/aws-ec2'; +import * as elasticache from 'aws-cdk-lib/aws-elasticache'; +import * as events from 'aws-cdk-lib/aws-events'; +import * as targets from 'aws-cdk-lib/aws-events-targets'; +import * as iam from 'aws-cdk-lib/aws-iam'; +import * as lambdaFunction from 'aws-cdk-lib/aws-lambda'; +import * as logs from 'aws-cdk-lib/aws-logs'; +import * as s3 from 'aws-cdk-lib/aws-s3'; +import * as sqs from 'aws-cdk-lib/aws-sqs'; +import * as sfn from 'aws-cdk-lib/aws-stepfunctions'; +import * as sfnTask from 'aws-cdk-lib/aws-stepfunctions-tasks'; +import { Construct } from 'constructs'; +import * as appsyncMergedApi from '../../../common/helpers/appsyncmergedapi-helper'; +import * as eventBridge from '../../../common/helpers/eventbridge-helper'; +import * as redisHelper from '../../../common/helpers/redis-helper'; +import * as s3BucketHelper from '../../../common/helpers/s3-bucket-helper'; +import * as vpcHelper from '../../../common/helpers/vpc-helper'; + +export interface SummarizationAppsyncStepfnProps { + /** + * Optional. The construct creates a custom VPC based on userVpcProps. + * Providing both this and existingVpc is an error. + * + * @default - none + */ + readonly userVpcProps?: ec2.VpcProps; + + /** + * Optional. An existing VPC can be used to deploy the construct. + * Providing both this and userVpcProps is an error.
+ * + * @default - none + */ + readonly existingVpc?: ec2.IVpc; + + /** + * Optional. Existing Redis cluster to cache the generated summary + * for subsequent requests of the same document. + * + * @default - none + */ + readonly existingRedisCulster?: elasticache.CfnCacheCluster; + + + /** + * Optional. Custom cfnCacheClusterProps for Redis. + * Providing existingRedisCulster and cfnCacheClusterProps together will result in an error. + * @default cacheNodeType - 'cache.r6g.xlarge' + * @default numCacheNodes- 1 + */ + readonly cfnCacheClusterProps?: elasticache.CfnCacheClusterProps; + + /** + * Optional. Security group for the Lambda functions which this construct will use. + * If no existing security group is provided, the construct will create one in the VPC. + * @default - none + */ + readonly existingSecurityGroup?: ec2.SecurityGroup; + + /** + * Required. This construct uses Cognito auth for AppSync authorization. + * A user pool id is required with the authentication type AMAZON_COGNITO_USER_POOLS. + * @default None + */ + readonly userPoolId: string; + + + /** + * Optional. Existing S3 bucket to store the input document which needs to be summarized. + * PDF is the supported input document format. If a transformed (txt format) file is + * already available then this bucket is optional. + * + * @default - None + */ + readonly existingInputAssetsBucket?: s3.IBucket; + + /** + * Optional. User provided props to override the default props for the S3 Bucket. + * Providing both this and `existingInputAssetsBucket` will cause an error. + * + * @default - Default props are used + */ + readonly bucketInputsAssetsProps?: s3.BucketProps; + + /** + * Optional. The summary construct transforms the input document into txt format. If the + * transformation is not required then this flag can be set to false. If set to true + * then a transformed asset bucket is created and the input document from the + * input asset bucket is transformed to txt format. + * + * @default - False + */ + readonly isFileTransformationRequired?: string; + + /** + * Optional. This bucket stores the transformed (txt) assets for generating the summary. + * If none is provided then this construct will create one. + * @default - None + */ + readonly existingTransformedAssetsBucket?: s3.IBucket; + + + /** + * Optional. User provided props to override the default props for the S3 Bucket. + * Providing both this and `existingTransformedAssetsBucket` will cause an error. + * + * @default - Default props are used + */ + readonly bucketTransformedAssetsProps?: s3.BucketProps; + + /** + * Optional. Existing instance of EventBus. The summary construct integrates AppSync with EventBridge + * to route the request to Step Functions. + * + * @default - None + */ + readonly existingEventBusInterface?: events.IEventBus; + + /** + * Optional. A new custom EventBus is created with the provided props. + * Providing both existingEventBusInterface and eventBusProps will result in a validation error. + * + * @default - None + */ + readonly eventBusProps?: events.EventBusProps; + + /** + * Required. Existing merged API instance. This construct associates its source API with + * the merged API to support multiple modalities with different source APIs. The merged API provides a federated schema over the source API schemas. + * + * @default None + */ + readonly existingMergeApi: appsync.CfnGraphQLApi; + + /** + * Optional. User provided name for the summary API on AppSync. + * A GraphQL API will be created by this construct with this name. + * @default 'summaryApi' + */ + readonly summaryApiName?: string; + + + /** + * Optional.
Logging configuration for AppSync. + * @default - fieldLogLevel - None + */ + readonly logConfig?: appsync.LogConfig; + + /** + * Optional. X-Ray enablement for AppSync. + * @default - false + */ + readonly xrayEnabled?: boolean; + + /** + * Optional. Chain type defines how to pass the document to the LLM. + * There are three chain types. + * Stuff: simply "stuff" all the documents into a single prompt. + * Map-reduce: summarize each document on its own in a "map" step and then "reduce" the summaries into a final summary. + * Refine: construct a response by looping over the input documents and iteratively updating its answer. + * @default - Stuff + */ + readonly summaryChainType?: string; + + /** + * Value will be appended to resource names. + * + * @default - '-dev' + */ + readonly stage?: string; +} + +export class SummarizationAppsyncStepfn extends Construct { + /** + * Returns an instance of events.IEventBus created by the construct + */ + public readonly eventBridgeBus: events.IEventBus | undefined; + /** + * Returns an instance of appsync.CfnGraphQLApi for the merged API used by the construct + */ + public readonly mergeApi: appsync.CfnGraphQLApi | undefined; + + /** + * Returns an instance of appsync.IGraphqlApi for the summary API created by the construct + */ + public readonly summaryGraphqlApi: appsync.IGraphqlApi | undefined; + + /** + * Returns an instance of the Redis cluster used by the construct + */ + public readonly redisCluster: elasticache.CfnCacheCluster | undefined; + + /** + * Returns the instance of ec2.IVpc used by the construct + */ + public readonly vpc: ec2.IVpc; + /** + * Returns the instance of ec2.ISecurityGroup used by the construct + */ + public readonly securityGroup: ec2.ISecurityGroup; + + /** + * Returns the instance of s3.IBucket used by the construct + */ + public readonly inputAssetBucket: s3.IBucket | undefined; + + /** + * Returns the instance of s3.IBucket used by the construct + */ + public readonly processedAssetBucket: s3.IBucket | undefined; + + /** + * Logging configuration for AppSync + * @default - fieldLogLevel - None + */ + public readonly logConfig?: appsync.LogConfig; + + + /** + * @summary Constructs a new instance of the SummarizationAppsyncStepfn class. + * @param {Construct} scope - represents the scope for all the resources. + * @param {string} id - this is a scope-unique id. + * @param {SummarizationAppsyncStepfnProps} props - user provided props for the construct.
+ * @since 0.0.0 + * @access public + */ + constructor(scope: Construct, id: string, props: SummarizationAppsyncStepfnProps) { + super(scope, id); + + let stage = '-dev'; + if (props?.stage) { + stage = props.stage; + } + + // vpc + if (props?.existingVpc) { + this.vpc = props.existingVpc; + } else { + this.vpc = new ec2.Vpc(this, 'Vpc', props.userVpcProps); + } + // Security group + if (props?.existingSecurityGroup) { + this.securityGroup = props.existingSecurityGroup; + } else { + this.securityGroup = new ec2.SecurityGroup( + this, + 'securityGroup', + { + vpc: this.vpc, + allowAllOutbound: true, + securityGroupName: 'securityGroup'+stage, + }, + ); + } + // bucket for input document + s3BucketHelper.CheckS3Props({ + existingBucketObj: props.existingInputAssetsBucket, + bucketProps: props.bucketInputsAssetsProps, + }); + + if (props?.existingInputAssetsBucket) { + this.inputAssetBucket = props.existingInputAssetsBucket; + } else if (props?.bucketInputsAssetsProps) { + this.inputAssetBucket = new s3.Bucket(this, + 'inputAssetsBucket'+stage, props.bucketInputsAssetsProps); + } else { + const bucketName= 'input-assets-bucket'+stage+'-'+cdk.Aws.ACCOUNT_ID; + this.inputAssetBucket = new s3.Bucket(this, 'inputAssetsBucket'+stage, + { + blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL, + encryption: s3.BucketEncryption.S3_MANAGED, + bucketName: bucketName, + }); + } + + // bucket for transformed document + s3BucketHelper.CheckS3Props({ + existingBucketObj: props.existingTransformedAssetsBucket, + bucketProps: props.bucketTransformedAssetsProps, + }); + + if (props?.existingTransformedAssetsBucket) { + this.processedAssetBucket = props.existingTransformedAssetsBucket; + } else if (props?.bucketTransformedAssetsProps) { + this.processedAssetBucket = new s3.Bucket(this, + 'processedAssetsBucket'+stage, props.bucketTransformedAssetsProps); + } else { + const bucketName= 'processed-assets-bucket'+stage+'-'+cdk.Aws.ACCOUNT_ID; + + this.processedAssetBucket = new s3.Bucket(this, 'processedAssetsBucket'+stage, + { + blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL, + encryption: s3.BucketEncryption.S3_MANAGED, + bucketName: bucketName, + }); + } + + // set up redis cluster + redisHelper.CheckRedisClusterProps(props); + + + // build redis cluster only when cfnCacheClusterProps is set + if (props?.cfnCacheClusterProps) { + const redisSecurityGroupname= 'redisSCgroup'+stage; + const redisSecurityGroup =redisHelper.getRedisSecurityGroup(this, props, redisSecurityGroupname); + this.redisCluster = redisHelper.buildRedisCluster(this, { + existingVpc: this.vpc, + cfnCacheClusterProps: props.cfnCacheClusterProps, + redisSecurityGroupname: redisSecurityGroupname, + subnetIds: vpcHelper.getPrivateSubnetIDs(this.vpc), + redisSubnetGroupId: 'redisSecuritygroup'+stage, + inboundSecurityGroup: this.securityGroup, + redisSecurityGroup: redisSecurityGroup, + }); + redisHelper.setInboundRules(redisSecurityGroup, this.securityGroup); + } else { + this.redisCluster= props?.existingRedisCulster; + } + + const redisHost = this.redisCluster?.attrRedisEndpointAddress!; + const redisPort = this.redisCluster?.attrRedisEndpointPort!; + + + eventBridge.CheckEventBridgeProps(props); + // Create event bridge + this.eventBridgeBus = eventBridge.buildEventBus(this, { + existingEventBusInterface: props.existingEventBusInterface, + eventBusProps: props.eventBusProps, + }); + + + appsyncMergedApi.checkAppsyncMergedApiProps(props); + + const appsyncServicePrincipleRoleName = 'appsync.amazonaws.com'; + + const mergeApiRole = new 
iam.Role(this, 'mergedapirole'+stage, { + assumedBy: new iam.ServicePrincipal(appsyncServicePrincipleRoleName), + }); + + + this.mergeApi = props.existingMergeApi; + + const mergeApiId = this.mergeApi.attrApiId; + const mergeapiurl = this.mergeApi.attrGraphQlUrl; + + // cognito auth for app sync + const cognitoUserPool = cognito.UserPool.fromUserPoolId(this, + 'cognitoUserPool'+stage, props.userPoolId); + + // make it generic for other auth config as well + const authorizationConfig: appsync.AuthorizationConfig = { + defaultAuthorization: { + authorizationType: appsync.AuthorizationType.USER_POOL, + userPoolConfig: { userPool: cognitoUserPool }, + }, + additionalAuthorizationModes: [ + { + authorizationType: appsync.AuthorizationType.IAM, + }, + ], + }; + + + const isXrayEnabled= props?.xrayEnabled || false; + const apiName = props.summaryApiName || 'summaryApi'; + + + if (props?.logConfig) { + this.logConfig = props.logConfig; + } else { + this.logConfig= { + fieldLogLevel: appsync.FieldLogLevel.NONE, + }; + } + + // graphql api for summary. client invoke this api with given schema and cognito user pool auth. + const summarizationGraphqlApi = new appsync.GraphqlApi(this, 'summarizationGraphqlApi'+stage, + { + name: apiName+stage, + logConfig: this.logConfig, + schema: appsync.SchemaFile.fromAsset(path.join(__dirname, '../../../../resources/gen-ai/aws-summarization-appsync-stepfn/schema.graphql')), + authorizationConfig: authorizationConfig, + xrayEnabled: isXrayEnabled, + }); + this.summaryGraphqlApi= summarizationGraphqlApi; + + // If the user provides a mergedApi endpoint, the lambda + // functions will use this endpoint to send their status updates + + const updateGraphQlApiId = !mergeApiId ? summarizationGraphqlApi.apiId : mergeApiId; + + + // associate source api with merge api + const sourceApiAssociationConfigProperty: + appsync.CfnSourceApiAssociation.SourceApiAssociationConfigProperty = { + mergeType: 'AUTO_MERGE', + }; + + const sourceApiAssociation = new appsync.CfnSourceApiAssociation(this, + 'sourceApiAssociations'+stage, + { + mergedApiIdentifier: mergeApiId, + sourceApiAssociationConfig: sourceApiAssociationConfigProperty, + sourceApiIdentifier: summarizationGraphqlApi.apiId, + }); + + sourceApiAssociation.node.addDependency(summarizationGraphqlApi); + + // update merge api role with access + appsyncMergedApi.setMergedApiRole(mergeApiId, summarizationGraphqlApi.apiId, mergeApiRole); + + // Lambda function to validate Input + const inputValidatorLambda = + new lambdaFunction.DockerImageFunction(this, 'inputValidatorLambda'+stage, + { + code: lambdaFunction.DockerImageCode.fromImageAsset(path.join(__dirname, '../../../../lambda/aws-summarization-appsync-stepfn/input_validator')), + functionName: 'summary_input_validator'+stage, + description: 'Lambda function to validate input for summary api', + vpc: this.vpc, + tracing: lambdaFunction.Tracing.ACTIVE, + vpcSubnets: { subnetType: ec2.SubnetType.PRIVATE_WITH_EGRESS }, + securityGroups: [this.securityGroup], + memorySize: 1_769 * 1, + timeout: cdk.Duration.minutes(5), + environment: { + GRAPHQL_URL: !mergeapiurl ? 
summarizationGraphqlApi.graphqlUrl : mergeapiurl, + }, + }); + + + const transformedAssetBucketName = this.processedAssetBucket.bucketName; + const inputAssetBucketName = this.inputAssetBucket.bucketName; + const isFileTransformationRequired = props?.isFileTransformationRequired || 'false'; + + const documentReaderLambda = new lambdaFunction.DockerImageFunction(this, 'documentReaderLambda'+stage, { + code: lambdaFunction.DockerImageCode.fromImageAsset(path.join(__dirname, '../../../../lambda/aws-summarization-appsync-stepfn/document_reader')), + functionName: 'summary_document_reader'+stage, + description: 'Lambda function to read the input transformed document', + vpc: this.vpc, + vpcSubnets: { subnetType: ec2.SubnetType.PRIVATE_WITH_EGRESS }, + securityGroups: [this.securityGroup], + memorySize: 1_769 * 1, + tracing: lambdaFunction.Tracing.ACTIVE, + timeout: cdk.Duration.minutes(5), + environment: { + REDIS_HOST: redisHost, + REDIS_PORT: redisPort, + TRANSFORMED_ASSET_BUCKET: transformedAssetBucketName, + INPUT_ASSET_BUCKET: inputAssetBucketName, + IS_FILE_TRANSFORMED: isFileTransformationRequired, + GRAPHQL_URL: !mergeapiurl ? summarizationGraphqlApi.graphqlUrl : mergeapiurl, + + }, + }); + + const summaryChainType = props?.summaryChainType || 'stuff'; + + const generateSummarylambda = new lambdaFunction.DockerImageFunction(this, 'generateSummarylambda'+stage, { + functionName: 'summary_generator'+stage, + description: 'Lambda function to generate the summary', + code: lambdaFunction.DockerImageCode.fromImageAsset(path.join(__dirname, '../../../../lambda/aws-summarization-appsync-stepfn/summary_generator')), + vpc: this.vpc, + vpcSubnets: { subnetType: ec2.SubnetType.PRIVATE_WITH_EGRESS }, + securityGroups: [this.securityGroup], + memorySize: 1_769 * 4, + timeout: cdk.Duration.minutes(10), + environment: { + REDIS_HOST: redisHost, + REDIS_PORT: redisPort, + ASSET_BUCKET_NAME: transformedAssetBucketName, + GRAPHQL_URL: !mergeapiurl ? 
summarizationGraphqlApi.graphqlUrl : mergeapiurl, + SUMMARY_LLM_CHAIN_TYPE: summaryChainType, + TRANSFORMERS_CACHE: '/tmp', + }, + }); + + this.inputAssetBucket?.grantRead(generateSummarylambda); + this.processedAssetBucket?.grantReadWrite(generateSummarylambda); + this.inputAssetBucket?.grantRead(documentReaderLambda); + this.processedAssetBucket?.grantReadWrite(documentReaderLambda); + + + documentReaderLambda.addToRolePolicy( + new iam.PolicyStatement({ + effect: iam.Effect.ALLOW, + actions: ['s3:GetObject', + 's3:GetBucketLocation', + 's3:ListBucket', + 's3:PutObject', + 'appsync:GraphQL'], + resources: ['arn:aws:s3:::' + inputAssetBucketName + '/*', + 'arn:aws:s3:::' + transformedAssetBucketName + '/*', + 'arn:aws:appsync:'+cdk.Aws.REGION+':'+cdk.Aws.ACCOUNT_ID+':apis/'+updateGraphQlApiId+'/*'], + }), + ); + + generateSummarylambda.addToRolePolicy( + new iam.PolicyStatement({ + effect: iam.Effect.ALLOW, + actions: ['s3:GetObject', + 's3:GetBucketLocation', + 's3:ListBucket', + 's3:PutObject', + 'appsync:GraphQL', + 'bedrock:*'], + resources: ['arn:aws:s3:::' + inputAssetBucketName + '/*', + 'arn:aws:s3:::' + transformedAssetBucketName + '/*', + 'arn:aws:appsync:'+cdk.Aws.REGION+':'+cdk.Aws.ACCOUNT_ID+':apis/'+updateGraphQlApiId+'/*' + , '*'], + + }), + ); + + + inputValidatorLambda.addToRolePolicy( + new iam.PolicyStatement({ + effect: iam.Effect.ALLOW, + actions: ['s3:GetObject', + 's3:GetBucketLocation', + 's3:ListBucket', + 'appsync:GraphQL'], + + resources: ['arn:aws:s3:::' + inputAssetBucketName + '/*', + 'arn:aws:s3:::' + transformedAssetBucketName + '/*', + 'arn:aws:appsync:'+cdk.Aws.REGION+':'+cdk.Aws.ACCOUNT_ID+':apis/'+updateGraphQlApiId+'/*'], + }), + ); + + // create datasource at appsync + const SummaryStatusDataSource = new appsync.NoneDataSource + (this, 'noneDataSource'+stage, { + api: summarizationGraphqlApi, + name: 'SummaryStatusDataSource', + }); + + SummaryStatusDataSource.createResolver + ('summaryResponseresolver'+stage, { + typeName: 'Mutation', + fieldName: 'updateSummaryJobStatus', + requestMappingTemplate: appsync.MappingTemplate.fromString( + '{ "version": "2017-02-28", "payload": $util.toJson($context.args) ', + ), + + responseMappingTemplate: appsync.MappingTemplate.fromString( + '$util.toJson($context.result)'), + }); + + // step functions + const inputValidationTask = new sfnTask.LambdaInvoke(this, 'Validate Input ', { + lambdaFunction: inputValidatorLambda, + resultPath: '$.validation_result', + }); + + const documentReaderTask = new sfnTask.LambdaInvoke(this, 'Read document and check summary in cache ', { + lambdaFunction: documentReaderLambda, + resultPath: '$.document_result', + }); + + + const generateSummaryTask = new sfnTask.LambdaInvoke(this, 'Generate Summary with llm', { + lambdaFunction: generateSummarylambda, + resultPath: '$.summary_result', + }); + + const dlq: sqs.Queue = new sqs.Queue(this, 'dlq', { + queueName: 'summarydlq'+stage, + retentionPeriod: cdk.Duration.days(7), + }); + + + const publishValidationFailureMessage = new sfnTask.EventBridgePutEvents( + this, + 'publish validation failure message', + { + entries: [ + { + detail: sfn.TaskInput.fromObject({ + 'summary.$': '$.summary', + 'summaryjobstatus.$': '$.status', + 'summary_job_id.$': '$.summary_job_id', + 'filename.$': '$.filename', + }), + eventBus: this.eventBridgeBus, + detailType: 'MessageFromStepFunctions', + source: 'step.functions', + }, + ], + }, + ); + + const jobSuccess= new sfn.Succeed(this, 'succeeded', { + comment: 'AWS summary Job succeeded', + }); + + // step 
function choice steps + const validateInputChoice = new sfn.Choice(this, 'is Valid Parameters?', { + outputPath: '$.validation_result.Payload.files', + }); + + const summaryfromCacheChoice = new sfn.Choice(this, 'is Summary in Cache?', { + outputPath: '$.document_result.Payload', + }); + + + // step function, run files in parallel + const runFilesInparallel = new sfn.Map(this, 'Run Files in Parallel', { + maxConcurrency: 100, + }).iterator( + documentReaderTask.next( + summaryfromCacheChoice + .when( + sfn.Condition.booleanEquals('$.document_result.Payload.is_summary_available', true), + jobSuccess, + ).otherwise( + generateSummaryTask.next(jobSuccess), + ), + ), + ); + + const summarizationLogGroup = new logs.LogGroup(this, 'summarizationLogGroup', {}); + + // step function definition + const definition = inputValidationTask.next( + validateInputChoice + .when( + sfn.Condition.booleanEquals('$.validation_result.Payload.isValid', false), + publishValidationFailureMessage, + ) + .otherwise(runFilesInparallel), + ); + + // step function + + const summarizationStepFunction = new sfn.StateMachine(this, 'summarizationStepFunction', { + definition, + timeout: cdk.Duration.minutes(15), + logs: { + destination: summarizationLogGroup, + level: sfn.LogLevel.ALL, + }, + tracingEnabled: true, + }); + + // event bridge datasource for summarization api + const eventBridgeDataSource = summarizationGraphqlApi.addEventBridgeDataSource( + 'eventBridgeDataSource', + this.eventBridgeBus, + ); + + this.eventBridgeBus.grantPutEventsTo(eventBridgeDataSource.grantPrincipal); + + // add resolver on summary graphql schema + eventBridgeDataSource.createResolver('generateSummary', { + typeName: 'Mutation', + fieldName: 'generateSummary', + requestMappingTemplate: appsync.MappingTemplate.fromString( + ` + { + "version": "2018-05-29", + "operation": "PutEvents", + "events": [{ + "source": "summary", + "detail": $util.toJson($context.arguments), + "detailType": "genAIdemo" + } + ] + } + `, + ), + + responseMappingTemplate: appsync.MappingTemplate.fromString( + '#if($ctx.error)$utilerror($ctx.error.message, $ctx.error.type, $ctx.result) #end $util.toJson($ctx.result)', + ), + }); + + new events.Rule(this, 'SummaryMutationRule', { + description: 'Summary Mutation Rule', + eventBus: this.eventBridgeBus, + eventPattern: { + source: ['summary'], + }, + targets: [ + new targets.SfnStateMachine(summarizationStepFunction, { + deadLetterQueue: dlq, + retryAttempts: 1, + }), + ], + }); + + } +} diff --git a/yarn.lock b/yarn.lock index 240d53ad..0c60af0f 100644 --- a/yarn.lock +++ b/yarn.lock @@ -944,11 +944,6 @@ dependencies: "@babel/types" "^7.20.7" -"@types/deep-diff@^1.0.3": - version "1.0.3" - resolved "https://registry.yarnpkg.com/@types/deep-diff/-/deep-diff-1.0.3.tgz#70f3974d2567d484f260dc459cdb70cefa2e1bf9" - integrity sha512-/UKphL2AFqEhbbBf/x65UcVratlCRbDkPYbf23tNykDSPo4Aw/V/NS464JFCeK4KusBBPkWL8DJsQoRIJcY0vw== - "@types/graceful-fs@^4.1.3": version "4.1.7" resolved "https://registry.yarnpkg.com/@types/graceful-fs/-/graceful-fs-4.1.7.tgz#30443a2e64fd51113bc3e2ba0914d47109695e2a" @@ -1020,11 +1015,6 @@ resolved "https://registry.yarnpkg.com/@types/normalize-package-data/-/normalize-package-data-2.4.2.tgz#9b0e3e8533fe5024ad32d6637eb9589988b6fdca" integrity sha512-lqa4UEhhv/2sjjIQgjX8B+RBjj47eo0mzGasklVJ78UKGQY1r0VpB9XHDaZZO9qzEFDdy4MrXLuEaSmPrPSe/A== -"@types/npmlog@^4.1.4": - version "4.1.4" - resolved "https://registry.yarnpkg.com/@types/npmlog/-/npmlog-4.1.4.tgz#30eb872153c7ead3e8688c476054ddca004115f6" - integrity 
sha512-WKG4gTr8przEZBiJ5r3s8ZIAoMXNbOgQ+j/d5O4X3x6kZJRLNvyUJuUK/KoG3+8BaOHPhp2m7WC6JKKeovDSzQ== - "@types/semver@^7.5.0": version "7.5.3" resolved "https://registry.yarnpkg.com/@types/semver/-/semver-7.5.3.tgz#9a726e116beb26c24f1ccd6850201e1246122e04" @@ -2116,7 +2106,7 @@ deep-is@^0.1.3: resolved "https://registry.yarnpkg.com/deep-is/-/deep-is-0.1.4.tgz#a6f2dce612fadd2ef1f519b73551f17e85199831" integrity sha512-oIPzksmTg4/MriiaYGO+okXDT7ztn/w3Eptv/+gSIdMdKsJo0u4CfYNFJPy+4SKMuCqGw2wxnA+URMg3t8a/bQ== -deepmerge@^4.2.2, deepmerge@^4.3.1: +deepmerge@^4.2.2: version "4.3.1" resolved "https://registry.yarnpkg.com/deepmerge/-/deepmerge-4.3.1.tgz#44b5f2147cd3b00d4b56137685966f26fd25dd4a" integrity sha512-3sUqbMEc77XqpdNO7FRyRog+eW3ph+GYCbj+rK+uYyRMuwsVy0rMiVtPn+QJlKFvWP/1PYpapqYn0Me2knFn+A==