From 72b43fce4c0818d8c1f9034d4245e2d868cfcdb8 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Thu, 8 Jun 2023 20:50:41 +0200 Subject: [PATCH] feat: offer arrange invocation receipt --- packages/core/arrange-offers.ts | 29 +++++++++- packages/core/package.json | 1 + packages/functions/src/data/offer-arrange.ts | 52 ++++++++++++++++++ packages/functions/src/utils.ts | 7 +++ pnpm-lock.yaml | 3 + stacks/DataStack.ts | 58 ++++++++++++++++++-- 6 files changed, 145 insertions(+), 5 deletions(-) create mode 100644 packages/functions/src/data/offer-arrange.ts diff --git a/packages/core/arrange-offers.ts b/packages/core/arrange-offers.ts index 770fe3a..a4eda4c 100644 --- a/packages/core/arrange-offers.ts +++ b/packages/core/arrange-offers.ts @@ -1,5 +1,8 @@ import all from 'it-all' import * as AggregateAPI from '@web3-storage/aggregate-api' +import * as Offer from '@web3-storage/capabilities/offer' +import { Signer } from '@ucanto/interface' +import { Message, Receipt } from '@ucanto/core' import { ArrangedOfferStore, ArrangedOffer } from './tables/arranged-offer-store' @@ -30,4 +33,28 @@ export async function mutateOffersToArrange (arrangedOfferStore: ArrangedOfferSt return offersToArrange } -// TODO: arrange offers +export async function offerArrange(offer: ArrangedOffer, serviceSigner: Signer) { + const status = offer.stat + const invocation = await Offer.arrange + .invoke({ + issuer: serviceSigner, + audience: serviceSigner, + with: serviceSigner.did(), + nb: { + commitmentProof: offer.commitmentProof, + }, + }) + .delegate() + + // Create receipt with invocation + const receipt = await Receipt.issue({ + issuer: serviceSigner, + result: { ok: { status } }, + ran: invocation, + }) + + // Create message + return await Message.build({ + receipts: [receipt] + }) +} \ No newline at end of file diff --git a/packages/core/package.json b/packages/core/package.json index 65efe18..296075f 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -11,6 +11,7 @@ "@aws-sdk/client-s3": "^3.338.0", "@aws-sdk/util-dynamodb": "^3.338.0", "@ipld/dag-ucan": "3.3.2", + "@ucanto/core": "8.0.0", "@ucanto/interface": "8.0.0", "@ucanto/principal": "^8.0.0", "@web3-storage/aggregate-api": "^0.0.0", diff --git a/packages/functions/src/data/offer-arrange.ts b/packages/functions/src/data/offer-arrange.ts new file mode 100644 index 0000000..7fc7577 --- /dev/null +++ b/packages/functions/src/data/offer-arrange.ts @@ -0,0 +1,52 @@ +import * as Sentry from '@sentry/serverless' +import { Config } from 'sst/node/config' + +import * as CAR from '@ucanto/transport/car' + +import { connect as ucanLogConnect } from '@spade-proxy/core/ucan-log' +import { offerArrange } from '@spade-proxy/core/arrange-offers' +import { getServiceSigner } from '@spade-proxy/core/service' + +import { mustGetEnv, parseDynamoDbEvent } from '../utils' + +Sentry.AWSLambda.init({ + environment: process.env.SST_STAGE, + dsn: process.env.SENTRY_DSN, + tracesSampleRate: 1.0, +}) + +export async function main(event: import('aws-lambda').DynamoDBStreamEvent) { + const { + spadeProxyDid, + ucanLogUrl + } = getLambdaEnv() + + const records = parseDynamoDbEvent(event) + if (records.length > 1) { + throw new Error('Should only receive one offer to arrange') + } + + // @ts-expect-error can't figure out type of new + const newRecord = unmarshall(records[0].new) + + const { PRIVATE_KEY, UCAN_LOG_BASIC_AUTH } = Config + + const ucanLog = ucanLogConnect({ + url: new URL(ucanLogUrl || ''), + auth: UCAN_LOG_BASIC_AUTH + }) + const serviceSigner = getServiceSigner({ SPADE_PROXY_DID: spadeProxyDid, PRIVATE_KEY }) + + // Create message with receipt from offer arrange + const message = await offerArrange(newRecord, serviceSigner) + + // We block until we can log the UCAN invocation if this fails it will restart + await ucanLog.log(CAR.request.encode(message)) +} + +function getLambdaEnv () { + return { + spadeProxyDid: mustGetEnv('SPADE_PROXY_DID'), + ucanLogUrl: mustGetEnv('UCAN_LOG_URL') + } +} diff --git a/packages/functions/src/utils.ts b/packages/functions/src/utils.ts index 4ba1ff5..457822d 100644 --- a/packages/functions/src/utils.ts +++ b/packages/functions/src/utils.ts @@ -3,3 +3,10 @@ export function mustGetEnv (name: string) { if (!value) throw new Error(`missing ${name} environment variable`) return value } + +export function parseDynamoDbEvent (event: import('aws-lambda').DynamoDBStreamEvent) { + return event.Records.map(r => ({ + new: r.dynamodb?.NewImage, + old: r.dynamodb?.OldImage + })) +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 660d324..391a5a6 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -57,6 +57,9 @@ importers: '@ipld/dag-ucan': specifier: 3.3.2 version: 3.3.2 + '@ucanto/core': + specifier: 8.0.0 + version: 8.0.0 '@ucanto/interface': specifier: 8.0.0 version: 8.0.0 diff --git a/stacks/DataStack.ts b/stacks/DataStack.ts index 1ea2736..dda3d16 100644 --- a/stacks/DataStack.ts +++ b/stacks/DataStack.ts @@ -1,13 +1,24 @@ import { Bucket, + Config, Table, StackContext } from 'sst/constructs'; +import { StartingPosition } from 'aws-cdk-lib/aws-lambda'; -import { arrangedOfferTableProps } from '../packages/core/tables/index' -import { getBucketConfig } from './config'; +import { arrangedOfferTableProps } from '../packages/core/tables/index'; +import { + getBucketConfig, + setupSentry +} from './config'; + +export function DataStack({ app, stack }: StackContext) { + // Setup app monitoring with Sentry + setupSentry(app, stack) + + const ucanLogBasicAuth = new Config.Secret(stack, 'UCAN_LOG_BASIC_AUTH') + const privateKey = new Config.Secret(stack, 'PRIVATE_KEY') -export function DataStack({ stack }: StackContext) { const bucket = getBucketConfig('offer-store', stack.stage) const offerBucket = new Bucket(stack, bucket.bucketName, { cors: true, @@ -22,10 +33,49 @@ export function DataStack({ stack }: StackContext) { const arrangedOfferTable = new Table(stack, 'arranged-offer', { ...arrangedOfferTableProps, // information that will be written to the stream - stream: 'new_image', + stream: 'new_and_old_images', }) // TODO: Stream table with batches of 1 + // invoke offer arrange once stat is set + arrangedOfferTable.addConsumers(stack, { + offerArrange: { + function: { + handler: 'packages/functions/src/data/offer-arrange.main', + environment: { + UCAN_LOG_URL: process.env.UCAN_LOG_URL ?? '', + SPADE_PROXY_DID: process.env.SPADE_PROXY_DID ?? '', + }, + permissions: [], + timeout: 3 * 60, + bind: [ + privateKey, + ucanLogBasicAuth + ] + }, + cdk: { + eventSource: { + batchSize: 1, + // Start reading at the last untrimmed record in the shard in the system. + startingPosition: StartingPosition.TRIM_HORIZON, + // TODO: Add error queue + // https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_lambda_event_sources.DynamoEventSourceProps.html#onfailure + } + }, + filters: [ + // Trigger when state changed + { + dynamodb: { + NewImage: { + stat: { + S: ['accepted', 'rejected'] + } + } + } + } + ] + } + }) return { offerBucket,