-
Notifications
You must be signed in to change notification settings - Fork 164
/
Copy pathdocproc.py
103 lines (72 loc) · 3.25 KB
/
docproc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import json
import os
from helper import FileHelper, AwsHelper
def postMessage(client, qUrl, jsonMessage):
    """Serialize *jsonMessage* to JSON and send it to the queue at *qUrl*.

    Args:
        client: object exposing ``send_message(QueueUrl=..., MessageBody=...)``
            (callers in this file pass an SQS client).
        qUrl: URL of the destination queue.
        jsonMessage: JSON-serializable payload.
    """
    body = json.dumps(jsonMessage)
    client.send_message(QueueUrl=qUrl, MessageBody=body)

    # Log the exact payload that was put on the queue.
    print("Submitted message to queue: {}".format(body))
def processRequest(request):
    """Route one document to the sync or async queue based on its extension.

    Objects whose extension is jpg/jpeg/png are posted to
    ``request['syncQueueUrl']``; pdf objects are posted to
    ``request['asyncQueueUrl']``. Any other (or missing) extension is logged
    and skipped instead of raising ``UnboundLocalError`` on the unassigned
    queue URL.

    Args:
        request: dict with "documentId", "bucketName", "objectName",
            "syncQueueUrl" and "asyncQueueUrl" entries.
    """
    print("request: {}".format(request))

    documentId = request["documentId"]
    bucketName = request["bucketName"]
    objectName = request["objectName"]

    print("Input Object: {}/{}".format(bucketName, objectName))

    # NOTE(review): presumably returns the extension without the leading
    # dot (it is compared against "jpg", "pdf", ...) - confirm in helper.
    ext = FileHelper.getFileExtenstion(objectName.lower())
    print("Extension: {}".format(ext))

    if ext and ext in ["jpg", "jpeg", "png"]:
        qUrl = request['syncQueueUrl']
    elif ext in ["pdf"]:
        qUrl = request['asyncQueueUrl']
    else:
        # Previously this path fell through with qUrl unbound and crashed.
        print("Unsupported extension '{}' for documentId: {}, object: {}/{}; skipping".format(
            ext, documentId, bucketName, objectName))
        return

    if qUrl:
        features = ["Text", "Forms", "Tables"]
        jsonMessage = {
            'documentId': documentId,
            "features": features,
            'bucketName': bucketName,
            'objectName': objectName,
        }
        client = AwsHelper().getClient('sqs')
        postMessage(client, qUrl, jsonMessage)

    output = "Completed routing for documentId: {}, object: {}/{}".format(documentId, bucketName, objectName)
    print(output)
def processRecord(record, syncQueueUrl, asyncQueueUrl):
    """Extract document fields from a record's NewImage and route the document.

    Reads the "S" value of the "documentId", "bucketName", "objectName" and
    "documentStatus" attributes of ``record["dynamodb"]["NewImage"]``. An
    attribute that is absent or has no "S" entry is treated as None. The
    document is handed to ``processRequest`` only when all four values are
    truthy.

    Args:
        record: dict containing ``["dynamodb"]["NewImage"]``.
        syncQueueUrl: queue URL forwarded as ``request['syncQueueUrl']``.
        asyncQueueUrl: queue URL forwarded as ``request['asyncQueueUrl']``.
    """
    image = record["dynamodb"]["NewImage"]

    # Same lookup order as the fields appear in the log line below.
    fieldNames = ("documentId", "bucketName", "objectName", "documentStatus")
    values = {
        name: image[name]["S"] if (name in image and "S" in image[name]) else None
        for name in fieldNames
    }

    print("DocumentId: {}, BucketName: {}, ObjectName: {}, DocumentStatus: {}".format(
        values["documentId"],
        values["bucketName"],
        values["objectName"],
        values["documentStatus"],
    ))

    if not all(values[name] for name in fieldNames):
        return

    request = {
        "documentId": values["documentId"],
        "bucketName": values["bucketName"],
        "objectName": values["objectName"],
        'syncQueueUrl': syncQueueUrl,
        'asyncQueueUrl': asyncQueueUrl,
    }
    processRequest(request)
def lambda_handler(event, context):
    """Lambda entry point: route every INSERT record that carries a NewImage.

    Reads the queue URLs from the SYNC_QUEUE_URL and ASYNC_QUEUE_URL
    environment variables, then walks ``event["Records"]``. A record is passed
    to ``processRecord`` only when its "eventName" is "INSERT" and its
    "dynamodb" entry is non-empty and contains "NewImage". A failure on one
    record is logged and does not stop the remaining records; any other
    failure is logged as well, so the handler never raises.

    Args:
        event: invocation payload, expected to hold a "Records" list.
        context: Lambda context object (unused).
    """
    try:
        print("event: {}".format(event))

        syncQueueUrl = os.environ['SYNC_QUEUE_URL']
        asyncQueueUrl = os.environ['ASYNC_QUEUE_URL']

        # Nothing to do when the event has no (or empty) Records.
        if "Records" not in event or not event["Records"]:
            return

        for entry in event["Records"]:
            try:
                print("Processing record: {}".format(entry))

                if "eventName" not in entry or not (entry["eventName"] == "INSERT"):
                    continue
                if "dynamodb" not in entry or not entry["dynamodb"]:
                    continue
                if "NewImage" in entry["dynamodb"]:
                    processRecord(entry, syncQueueUrl, asyncQueueUrl)
            except Exception as recordError:
                # Best-effort: keep going with the next record.
                print("Faild to process record. Exception: {}".format(recordError))
    except Exception as handlerError:
        print("Failed to process records. Exception: {}".format(handlerError))