IonLoader is returning 'invalid_state' for the Kinesis stream event when a QLDB table changes (inserting a record) #123

Open
sontalra opened this issue Jun 23, 2020 · 7 comments


@sontalra

sontalra commented Jun 23, 2020

Hi Team,

I'm writing a Lambda function that processes Kinesis events for changes to a QLDB table, similar to https://github.com/aws-samples/amazon-qldb-streams-dmv-sample-lambda-python

I'm using "Amazon.QLDB.Driver" Version="1.0.0-rc.1", which depends on Amazon.IonDotnet 1.0.0.

My Lambda receives the following Kinesis event:
{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "xxxxx", "sequenceNumber": "xxxxx", "data": "CkA3N2I1MTNlYzc2N2U0ZjRiMWM4Y2QzZWE1ZTg1NjI2MmViNGUzMjQwZGExMDE1YWM3NWRhYTE1MTQ3Yzk2NTczCkAyY2Q0NzE3MjdkNDMyMTJhZDdiZjVhMmQ4MjZiZTI4ODViYzVhZWI2NGU2ODQ0ZTE1OTkxY2EwMTkwOGJkODQ1CkAwNmQ4MGNiOTc5M2E3NjE0NTcwNTdmZjM0NzM3YTUwNjczMGRiNmY4ZDBhNTZlMzNkZGMzMWE1ZjQ2ODEyZWE2GgUIGgUBAQEBcWxkYlN0cmVhbUFybnJlY29yZFR5cGVwYXlsb2FkYmxvY2tBZGRyZXNzc3RyYW5kSWRzZXF1ZW5jZU5vdHJhbnNhY3Rpb25JZGJsb2NrVGltZXN0YW1wYmxvY2tIYXNoZW50cmllc0hhc2hwcmV2aW91c0Jsb2NrSGFzaGVudHJpZXNIYXNoTGlzdHRyYW5zYWN0aW9uSW5mb3N0YXRlbWVudHNzdGF0ZW1lbnRzdGFydFRpbWVzdGF0ZW1lbnREaWdlc3QDYXJuOmF3czpxbGRiOmFwLXNvdXRoZWFzdC0yOnh4eHh4eDpzdHJlYW0vcG9jLWxlZGdlci1wb2MtMjQwNS1sZWRnZXIvNTlOYWlYVjF2WE0ya1pPc3VhdUtyeEJMT0NLX1NVTU1BUlkCOXhTeHc5aE1SWGFJZHQwT1l0VDRZeCIBFzZycTU5YkJMN0YyMmlQaUM3dHhDWDNrDwJedxN2fk9LHF5iYk4yQBAVdVFHZXNnVTE+NlhJBhY+TV56ZEMKMmctNhh+CWFvVS83ZVQyCQNBJnFKHlpuN1IMCAoPPh8kMFZFawM9VWJwS3xxFT9pIHhcVn9vMGVTRUxFQ1QgKiBGUk9NIGluZm9ybWF0aW9uX3NjaGVtYS51c2VyX3RhYmxlc2sPAjw4IgkRbBIHdBIXOEoaCAgBGggBAgICcWxkYlN0cmVhbUFybnJlY29yZFR5cGVwYXlsb2FkYmxvY2tBZGRyZXNzc3RyYW5kSWRzZXF1ZW5jZU5vdHJhbnNhY3Rpb25JZGJsb2NrVGltZXN0YW1wYmxvY2tIYXNoZW50cmllc0hhc2hwcmV2aW91c0Jsb2NrSGFzaGVudHJpZXNIYXNoTGlzdHRyYW5zYWN0aW9uSW5mb3N0YXRlbWVudHNzdGF0ZW1lbnRzdGFydFRpbWVzdGF0ZW1lbnREaWdlc3Rkb2N1bWVudHMwYkdJM3ZoRkZwYkVaSlhGaXNscHJDdGFibGVOYW1ldGFibGVJZHJldmlzaW9uU3VtbWFyaWVzaGFzaGRvY3VtZW50SWQFYXJuOmF3czpxbGRiOmFwLXNvdXRoZWFzdC0yOnh4eHh4eDpzdHJlYW0vcG9jLWxlZGdlci1wb2MtMjQwNS1sZWRnZXIvNTlOYWlYVjF2WE0ya1pPc3VhdUtyeEJMT0NLX1NVTU1BUlkEOXhTeHc5aE1SWGFJZHQwT1l0VDRZeCIBGDZycTU5YkJMM3ZxRlVIV2YyMjRycjhrDwIscXJ9QyEqWi1rW05oRFkBRXxPX1lQKzB9elkdNRcyMkt3E3Z+T0scXmJiTjJAEBV1UUdlcwEeHHxBHxtyfiAbZUBYKGZLD2xOZgYMeTp2FFcFf0c3BnMKbjMaX0YuRkoHKXE8SjElcAN5elIIEgdOFBY/fi5cdSZbAQEBAQpJTlNFUlQgSU5UTyBWaXJ0dWFsQWNjb3VudFRyYW5zYWN0aW9ucyB7ICdWaXJ0dWFsQWNjb3VudElkJzogJ2lkNScsICdBbW91bnQnOiAxMCwgJ1RyYW5zYWN0aW9uRGF0ZSc6ICcyMDIwLTA2LTAyVDE2OjMwOjU1LjA1NFonIH0Kaw8CPUZLFHBOLB9AdEk1UyRyeFZpcnR1YWxBY2NvdW50VHJhbnNhY3Rpb25zMlpncjN2NDdTOUdEc0FkOHN0RExROSAGDHk6dhRXBX9HNwZzCm4zGl9GLjBiR0kzdmhGRnBiRVpKWEZpc2xwckMaBAgCGgQBAQEBcWxkYlN0cmVhbUFybnJlY29yZFR5cGVwYXlsb2FkdGFibGVJbmZvdGFibGVOYW1ldGFibGVJZHJldmlzaW9uYmxvY2tBZGRyZXNzc3RyYW5kSWRzZXF1ZW5jZU5vaGFzaGRhdGFWaXJ0dWFsQWNjb3VudElkQW1vdW50VHJhbnNhY3Rpb25EYXRlbWV0YWRhdGFpZHR4VGltZXR4SWQCYXJuOmF3czpxbGRiOmFwLXNvdXRoZWFzdC0yOnh4eHh4eDpzdHJlYW0vcG9jLWxlZGdlci1wb2MtMjQwNS1sZWRnZXIvNTlOYWlYVjF2WE0ya1pPc3VhdUtyeFJFVklTSU9OX0RFVEFJTFMBVmlydHVhbEFjY291bnRUcmFuc2FjdGlvbnMyWmdyM3Y0N1M5R0RzQWQ4c3RETFE5ATl4U3h3OWhNUlhhSWR0ME9ZdFQ0WXgiARgGDHk6dhRXBX9HNwZzCm4zGl9GLmlkNSEKMjAyMC0wNi0wMlQxNjozMDo1NS4wNTRaMGJHSTN2aEZGcGJFWkpYRmlzbHByQyBrDwI2cnE1OWJCTDN2cUZVSFdmMjI0cnI4MhpIFXF+OQ==", "approximateArrivalTimestamp": 1592877325.432 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-xxxx", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::xxx:role/TransactionNotifierLambdaRole", "awsRegion": "ap-southeast-2", "eventSourceARN": "arn:aws:kinesis:ap-southeast-2:xxx:stream/TransactionNotificationStream" } ] }

When I base64-decode Kinesis.Data and try to load the datagram with IonLoader, I get the following error:

Invalid state: IonException at Amazon.IonDotnet.Internals.Text.RawTextReader.ParseNext() at Amazon.IonDotnet.Internals.Text.RawTextReader.HasNext() at Amazon.IonDotnet.Internals.Text.UserTextReader.HasNext() at Amazon.IonDotnet.Internals.Text.RawTextReader.MoveNext() at Amazon.IonDotnet.Internals.PrivateIonWriterBase.WriteValues(IIonReader reader) at Amazon.IonDotnet.Builders.IonLoader.WriteDatagram(IIonReader reader) at Amazon.IonDotnet.Builders.IonLoader.Load(Byte[] data) at Serverless.Handlers.TransactionEventHandler.Add(Stream stream, ILambdaContext contexts) in /home/runner/work/poc-ledger/poc-ledger/Serverless/Handlers/TransactionEventHandler.cs:line 52 at lambda_method(Closure , Stream , Stream , LambdaContextInternal )

Here is the code:
var decodedData = Convert.FromBase64String(record.Kinesis.Data);
var datagram = IonLoader.WithReaderOptions(new ReaderOptions { Format = ReaderFormat.Detect, Encoding = Encoding.ASCII }).Load(decodedData);
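For comparison, here is a minimal Python sketch of the same decode step. It uses only the standard library and checks what the bytes actually are before any Ion parsing happens. It relies on one fact from the Ion 1.0 specification: a binary Ion stream starts with the 4-byte binary version marker E0 01 00 EA. The names `decode_kinesis_data`, `starts_with_ion_bvm`, and `ION_BVM` are hypothetical helpers and are not part of any SDK.

```python
import base64
import binascii

# Ion 1.0 binary streams begin with this 4-byte binary version marker (BVM).
ION_BVM = b"\xe0\x01\x00\xea"


def decode_kinesis_data(data_b64: str) -> bytes:
    """Base64-decode a Kinesis record's data field, rejecting non-base64 input."""
    try:
        return base64.b64decode(data_b64, validate=True)
    except binascii.Error as exc:
        raise ValueError(f"Kinesis data is not valid base64: {exc}") from exc


def starts_with_ion_bvm(payload: bytes) -> bool:
    """Return True if the decoded bytes begin with the Ion 1.0 binary version marker."""
    return payload[:4] == ION_BVM


if __name__ == "__main__":
    # First eight base64 characters of the "data" field posted above.
    head = decode_kinesis_data("CkA3N2I1")
    print(head)                       # b'\n@77b5'
    print(starts_with_ion_bvm(head))  # False
```

If the marker is missing, the payload is not a self-contained binary Ion stream, and the bytes are worth inspecting further before suspecting the parser.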

If I don't decode Kinesis.Data and instead pass the string directly to the loader, I get this error:

Numeric value [84] followed by invalid character: 109: FormatException at Amazon.IonDotnet.Internals.Text.TextScanner.FinishLoadNumber(StringBuilder numericText, Int32 c, Int32 token) at Amazon.IonDotnet.Internals.Text.TextScanner.LoadNumber(StringBuilder valueBuffer) at Amazon.IonDotnet.Internals.Text.RawTextReader.LoadTokenContents(Int32 scannerToken) at Amazon.IonDotnet.Internals.Text.RawTextReader.MoveNext() at Amazon.IonDotnet.Internals.PrivateIonWriterBase.WriteValues(IIonReader reader) at Amazon.IonDotnet.Builders.IonLoader.WriteDatagram(IIonReader reader) at Amazon.IonDotnet.Builders.IonLoader.Load(String ionText) at Serverless.Handlers.TransactionEventHandler.Add(Stream stream, ILambdaContext contexts) in /home/runner/work/poc-ledger/poc-ledger/Serverless/Handlers/TransactionEventHandler.cs:line 49 at lambda_method(Closure , Stream , Stream , LambdaContextInternal )
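One detail in this second error is worth decoding. Character 109 is 'm', so the text the parser saw began with "84m". That is exactly how base64 encodes the bytes F3 89 9A, which are the start of the magic number F3 89 9A C2 that prefixes records in the Kinesis Producer Library (KPL) aggregation format. The Python sketch below only demonstrates that encoding fact. This thread does not establish whether the record really was KPL-aggregated, and the posted data field starts with "CkA3" instead, so treat this as a lead to check rather than a diagnosis. `KPL_MAGIC` and `is_kpl_aggregated` are hypothetical names.

```python
import base64

# Magic number that prefixes records in the KPL aggregation format.
KPL_MAGIC = b"\xf3\x89\x9a\xc2"


def is_kpl_aggregated(payload: bytes) -> bool:
    """Return True if decoded Kinesis data starts with the KPL aggregation magic number."""
    return payload.startswith(KPL_MAGIC)


if __name__ == "__main__":
    encoded = base64.b64encode(KPL_MAGIC).decode("ascii")
    print(encoded)                # 84mawg==
    print(encoded[:3], ord("m"))  # 84m 109
    print(is_kpl_aggregated(base64.b64decode(encoded)))  # True
```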

I'm not sure how to decode the data from the Kinesis stream using this SDK. Any help is really appreciated.

Note: I have tried UTF-8 decoding as well.

@tgregg
Contributor

tgregg commented Jun 25, 2020

Here's what I get when I decode the base64 data and interpret it as a UTF-8 string:

qldbStreamArnrecordTypepayloadblockAddressstrandIdsequenceNotransactionIdblockTimestampblockHashentriesHashpreviousBlockHashentriesHashListtransactionInfostatementsstatementstartTimestatementDigest�arn:aws:qldb:ap-southeast-2:xxxxxx:stream/poc-ledger-poc-2405-ledger/59NaiXV1vXM2kZOsuauKrxBLOCK_SUMMARY�9xSxw9hMRXaIdt0OYtT4Yx"��6rq59bBL7F22iPiC7txCX3k��^w�v~OK�^bbN2@��uQGesgU1>6XI��>M^zdC
2g-6�~	aoU/7eT2	�A&qJ�Zn7R
�>�$0VEk�=UbpK|q�?i x\V�o0eSELECT * FROM information_schema.user_tablesk��<8"	�l��t��8�����qldbStreamArnrecordTypepayloadblockAddressstrandIdsequenceNotransactionIdblockTimestampblockHashentriesHashpreviousBlockHashentriesHashListtransactionInfostatementsstatementstartTimestatementDigestdocuments0bGI3vhFFpbEZJXFislprCtableNametableIdrevisionSummarieshashdocumentId�arn:aws:qldb:ap-southeast-2:xxxxxx:stream/poc-ledger-poc-2405-ledger/59NaiXV1vXM2kZOsuauKrxBLOCK_SUMMARY�9xSxw9hMRXaIdt0OYtT4Yx"��6rq59bBL3vqFUHWf224rr8k��,qr}C!*Z-k[NhDY�E|O_YP+0}zY�5�22Kw�v~OK�^bbN2@��uQGes���|A��r~ �e@X(fK�lNf�y:v�W��G7�s
n3�_F.FJ�)q<J1%p�yz��N��?~.\u&[����
INSERT INTO VirtualAccountTransactions { 'VirtualAccountId': 'id5', 'Amount': 10, 'TransactionDate': '2020-06-02T16:30:55.054Z' }
k��=FK�pN,�@tI5S$rxVirtualAccountTransactions2Zgr3v47S9GDsAd8stDLQ9 �y:v�W��G7�s
n3�_F.0bGI3vhFFpbEZJXFislprC��������qldbStreamArnrecordTypepayloadtableInfotableNametableIdrevisionblockAddressstrandIdsequenceNohashdataVirtualAccountIdAmountTransactionDatemetadataidtxTimetxId�arn:aws:qldb:ap-southeast-2:xxxxxx:stream/poc-ledger-poc-2405-ledger/59NaiXV1vXM2kZOsuauKrxREVISION_DETAILS�VirtualAccountTransactions2Zgr3v47S9GDsAd8stDLQ9�9xSxw9hMRXaIdt0OYtT4Yx"���y:v�W��G7�s
n3�_F.id5!

This is not Ion text or Ion binary, hence the failures raised by the Ion parser.
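The UTF-8 view above can be reproduced with a few lines of Python. This sketch covers only the inspection step, and the helper name `as_utf8_text` is hypothetical. Decoding with errors="replace" substitutes U+FFFD for byte sequences that are not valid UTF-8, which is where the � characters in the output come from. Readable fragments such as field names and the INSERT statement can appear in a view like this even when the stream as a whole is neither valid Ion text nor valid Ion binary.

```python
import base64


def as_utf8_text(data_b64: str) -> str:
    """Base64-decode Kinesis data and view the bytes as UTF-8, replacing invalid sequences with U+FFFD."""
    raw = base64.b64decode(data_b64)
    return raw.decode("utf-8", errors="replace")


if __name__ == "__main__":
    print(repr(as_utf8_text("CkA3N2I1")))        # '\n@77b5'
    print("\ufffd" in as_utf8_text("84mawg=="))  # True: F3 89 9A C2 is not valid UTF-8
```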

@sontalra
Author

Hi @tgregg,
Thanks for your reply.

The data is taken as-is from the logs, captured when the QLDB stream (changes to the QLDB table) raises a Kinesis event.
According to the AWS documentation, the data format is Amazon Ion:
https://docs.aws.amazon.com/qldb/latest/developerguide/streams.records.html

I just created a stream in the AWS Console and expect the data to be in Ion format. The data is not manipulated anywhere.

@tgregg
Contributor

tgregg commented Jun 26, 2020

Unfortunately, this blob of data is not in the Ion format.

I recommend reaching out to the QLDB support team. They'll be better able to answer questions about the data emitted by QLDB.

@varunhasteer

@sontalra Were you able to fix this issue? I am working on a similar pipeline and facing the exact same issue.

@juniortads

Hey @sontalra, QLDB delivers stream records to Kinesis as binary-encoded Ion objects, so you need to pass the Kinesis record's data stream directly to the Load function, similar to this:

var ionDatagram = IonLoader.Default.Load(record.Kinesis.Data);
var ionData = ionDatagram.GetElementAt(0);

// Take the recordType field from the record body
Console.WriteLine($"record Type: {ionData.GetField("recordType").StringValue}");

Let me know if this works for you.
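Once the record is loaded, the next step is usually to route on recordType. Below is a Python sketch of that step. It assumes the record has already been converted into plain dicts, for example by an Ion library. The nesting it uses for REVISION_DETAILS records (payload.tableInfo.tableName and payload.revision.data) matches the field names visible in the decoded output earlier in this thread. `summarize_record` is a hypothetical helper.

```python
from typing import Any, Dict, Optional


def summarize_record(record: Dict[str, Any]) -> Optional[str]:
    """Summarize a REVISION_DETAILS stream record; return None for other record types."""
    if record.get("recordType") != "REVISION_DETAILS":
        # BLOCK_SUMMARY (and other) records describe blocks, not document revisions.
        return None
    payload = record["payload"]
    table_name = payload["tableInfo"]["tableName"]
    data = payload["revision"].get("data")  # .get() in case a revision carries no data field
    return f"{table_name}: {data}"


if __name__ == "__main__":
    record = {
        "recordType": "REVISION_DETAILS",
        "payload": {
            "tableInfo": {"tableName": "VirtualAccountTransactions"},
            "revision": {"data": {"VirtualAccountId": "id5", "Amount": 10}},
        },
    }
    print(summarize_record(record))
    # VirtualAccountTransactions: {'VirtualAccountId': 'id5', 'Amount': 10}
```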

@ashritbista

ashritbista commented Apr 3, 2023

Was a resolution ever found for this? We are facing a similar issue. In our case, the stream data is parsed directly into Ion values as @juniortads suggested, but our Lambda behaves intermittently: the IonLoader loads some records successfully and fails on others. Our Kinesis stream only receives data from the QLDB stream, so every record in it is guaranteed to come from QLDB.

It seems like the QLDB stream is sending non-Ion data through the Kinesis stream. The QLDB documentation states that stream records are Ion binary data, but for some reason that doesn't seem to be true.
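One hypothesis would fit "loads sometimes, fails other times", although nothing in this thread confirms it. Some Kinesis records may carry a single Ion payload, while others are KPL-aggregated records that bundle several payloads into one record. An aggregated record consists of a 4-byte magic number, then a protobuf `AggregatedRecord` message, then a 16-byte MD5 digest of that message. QLDB's KinesisConfiguration exposes an AggregationEnabled option that controls this behavior. The Python sketch below is a minimal, stdlib-only deaggregator for that format, written for illustration only. `deaggregate` and its helpers are hypothetical names, and the sketch decodes only the fields it needs (`AggregatedRecord.records` = 3, `Record.data` = 3).

```python
import hashlib
from typing import Any, Iterator, List, Tuple

# KPL aggregated record: magic number + protobuf AggregatedRecord + MD5 digest of the protobuf bytes.
KPL_MAGIC = b"\xf3\x89\x9a\xc2"
DIGEST_LEN = 16


def _read_varint(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode a protobuf base-128 varint at pos; return (value, next position)."""
    result, shift = 0, 0
    while True:
        if pos >= len(buf):
            raise ValueError("truncated varint")
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def _fields(buf: bytes) -> Iterator[Tuple[int, int, Any]]:
    """Yield (field number, wire type, value) for each field of a protobuf message."""
    pos = 0
    while pos < len(buf):
        key, pos = _read_varint(buf, pos)
        field, wire = key >> 3, key & 0x07
        if wire == 0:  # varint
            value, pos = _read_varint(buf, pos)
        elif wire in (1, 2, 5):  # fixed64, length-delimited, fixed32
            if wire == 2:
                size, pos = _read_varint(buf, pos)
            else:
                size = 8 if wire == 1 else 4
            if pos + size > len(buf):
                raise ValueError("truncated field")
            value, pos = buf[pos:pos + size], pos + size
        else:
            raise ValueError(f"unsupported wire type {wire}")
        yield field, wire, value


def deaggregate(payload: bytes) -> List[bytes]:
    """Split a KPL-aggregated Kinesis payload into user records; return [payload] otherwise."""
    if not payload.startswith(KPL_MAGIC) or len(payload) < len(KPL_MAGIC) + DIGEST_LEN:
        return [payload]
    message = payload[len(KPL_MAGIC):-DIGEST_LEN]
    if hashlib.md5(message).digest() != payload[-DIGEST_LEN:]:
        return [payload]  # digest mismatch: treat as an ordinary, non-aggregated record
    records = []
    for field, wire, value in _fields(message):
        if field == 3 and wire == 2:  # AggregatedRecord.records
            for inner_field, inner_wire, inner_value in _fields(value):
                if inner_field == 3 and inner_wire == 2:  # Record.data
                    records.append(inner_value)
    return records
```

Each returned payload would then be handed to the Ion loader on its own. In production code, the Kinesis Client Library or the awslabs kinesis-aggregation libraries already implement deaggregation and are the safer choice.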

@varunhasteer

varunhasteer commented Apr 3, 2023

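@ashritbista The following worked for me in Spark:

import json
from io import BytesIO

import amazon.ion.simpleion as ion
from amazon.ion.json_encoder import IonToJSONEncoder  # assumed module path for IonToJSONEncoder
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# `spark` is the active SparkSession (predefined in notebooks such as Databricks)
kinesis = (
    spark.readStream.format("kinesis")
    .option("streamName", "your kinesis stream name")
    .option("region", "your AWS region")
    .option("roleArn", "your role arn")
    .option("initialPosition", "earliest")
    .load()
)

The UDF below converts a binary Ion payload into a JSON string:

def binary_json_encode(payload):
    # Parse one binary Ion value from the raw Kinesis bytes, then serialize it as JSON
    ion_record = ion.load(BytesIO(payload))
    return json.dumps(ion_record, cls=IonToJSONEncoder)

convert_UDF_bin2json = udf(binary_json_encode, StringType())

json_df = kinesis.withColumn("json_data", convert_UDF_bin2json(kinesis["data"]))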

Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants