From 1bcabfa758ced98b15168e810414d2970e39130d Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Thu, 8 Jun 2023 20:50:41 +0200 Subject: [PATCH] feat: offer arrange invocation receipt --- packages/core/arrange-offers.ts | 37 ++++++++++- packages/core/package.json | 1 + packages/core/test/arrange-offers.test.js | 48 +++++++++++++- packages/functions/src/data/offer-arrange.ts | 52 +++++++++++++++ packages/functions/src/utils.ts | 7 ++ pnpm-lock.yaml | 3 + stacks/ApiStack.ts | 11 ++-- stacks/CronStack.ts | 11 +++- stacks/DataStack.ts | 67 ++++++++++++++++++-- stacks/config.ts | 4 +- 10 files changed, 222 insertions(+), 19 deletions(-) create mode 100644 packages/functions/src/data/offer-arrange.ts diff --git a/packages/core/arrange-offers.ts b/packages/core/arrange-offers.ts index 770fe3a..ea522b4 100644 --- a/packages/core/arrange-offers.ts +++ b/packages/core/arrange-offers.ts @@ -1,5 +1,8 @@ import all from 'it-all' import * as AggregateAPI from '@web3-storage/aggregate-api' +import * as Offer from '@web3-storage/capabilities/offer' +import { Signer } from '@ucanto/interface' +import { Message, Receipt } from '@ucanto/core' import { ArrangedOfferStore, ArrangedOffer } from './tables/arranged-offer-store' @@ -30,4 +33,36 @@ export async function mutateOffersToArrange (arrangedOfferStore: ArrangedOfferSt return offersToArrange } -// TODO: arrange offers +export async function offerArrange(offer: ArrangedOffer, serviceSigner: Signer) { + const { stat: status, commitmentProof } = offer + + if (status !== 'accepted' && status !== 'rejected') { + // TODO: should this be a receipt of invocation with error? + throw new Error(`offer arrange for ${commitmentProof} cannot be fulfilled with given status: ${status}`) + } + + const invocation = await Offer.arrange + .invoke({ + issuer: serviceSigner, + audience: serviceSigner, + with: serviceSigner.did(), + nb: { + commitmentProof, + }, + }) + .delegate() + + // TODO: error format in case of reporting failures + + // Create receipt with invocation + const receipt = await Receipt.issue({ + issuer: serviceSigner, + result: { ok: { status } }, + ran: invocation, + }) + + // Create message + return await Message.build({ + receipts: [receipt] + }) +} \ No newline at end of file diff --git a/packages/core/package.json b/packages/core/package.json index 65efe18..296075f 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -11,6 +11,7 @@ "@aws-sdk/client-s3": "^3.338.0", "@aws-sdk/util-dynamodb": "^3.338.0", "@ipld/dag-ucan": "3.3.2", + "@ucanto/core": "8.0.0", "@ucanto/interface": "8.0.0", "@ucanto/principal": "^8.0.0", "@web3-storage/aggregate-api": "^0.0.0", diff --git a/packages/core/test/arrange-offers.test.js b/packages/core/test/arrange-offers.test.js index 9b3ac33..ed46370 100644 --- a/packages/core/test/arrange-offers.test.js +++ b/packages/core/test/arrange-offers.test.js @@ -5,11 +5,13 @@ import { createDynamodDb, createTable, } from './helpers/resources.js' -import { CID } from 'multiformats/cid' import { generateOffers, createAggregateStore } from './helpers/offers.js' +import { CID } from 'multiformats/cid' +import { ed25519 } from '@ucanto/principal' + import { arrangedOfferTableProps } from '../tables/index.js' -import { mutateOffersToArrange } from '../arrange-offers.js' +import { mutateOffersToArrange, offerArrange } from '../arrange-offers.js' import { useOfferStore } from '../buckets/offer-store.js' import { useArrangedOfferStore } from '../tables/arranged-offer-store.js' @@ -20,7 +22,7 @@ test.before(async (t) => { }) }) -test('can arrange multiple offers', async t => { +test('can mutate multiple offers to be arranged according to their state', async t => { const { dynamo, s3 } = t.context const bucketName = await createBucket(s3) @@ -57,3 +59,43 @@ test('can arrange multiple offers', async t => { t.is(resWithSecondSegment.length, 1) t.is(resWithSecondSegment[0].commitmentProof.toString(), dataSegments[1].toString()) }) + +test('can create agent messages with receipt for arranged offer', async t => { + const signer = await ed25519.generate() + const commitmentProof = CID.parse('baga6ea4seaqcq4xx7rqx2lsrm6iky7qqk5jh7pbaj5bgdu22afhp4fodvccb6bq') + + /** @type {import('../tables/arranged-offer-store.js').STAT[]} */ + const stats = ['accepted', 'rejected'] + + for (const stat of stats) { + const offer = { + commitmentProof, + stat + } + + const messageAcceptedReceipt = await offerArrange(offer, signer) + + // console.log('message accepted', messageAcceptedReceipt.receipts) + t.is(messageAcceptedReceipt.receipts.size, 1) + const receiptCids = Array.from(messageAcceptedReceipt.receipts.keys()) + const receipt = messageAcceptedReceipt.receipts.get(receiptCids[0]) + + // @ts-ignore unknown type + t.is(receipt?.out.ok?.status, stat) + } +}) + +test('fails to create agent message with receipt for non arranged offer', async t => { + const signer = await ed25519.generate() + const commitmentProof = CID.parse('baga6ea4seaqcq4xx7rqx2lsrm6iky7qqk5jh7pbaj5bgdu22afhp4fodvccb6bq') + + /** @type {import('../tables/arranged-offer-store.js').STAT} */ + const stat = 'queued' + + const offer = { + commitmentProof, + stat + } + + await t.throwsAsync(() => offerArrange(offer, signer)) +}) diff --git a/packages/functions/src/data/offer-arrange.ts b/packages/functions/src/data/offer-arrange.ts new file mode 100644 index 0000000..7fc7577 --- /dev/null +++ b/packages/functions/src/data/offer-arrange.ts @@ -0,0 +1,52 @@ +import * as Sentry from '@sentry/serverless' +import { Config } from 'sst/node/config' + +import * as CAR from '@ucanto/transport/car' + +import { connect as ucanLogConnect } from '@spade-proxy/core/ucan-log' +import { offerArrange } from '@spade-proxy/core/arrange-offers' +import { getServiceSigner } from '@spade-proxy/core/service' + +import { mustGetEnv, parseDynamoDbEvent } from '../utils' + +Sentry.AWSLambda.init({ + environment: process.env.SST_STAGE, + dsn: process.env.SENTRY_DSN, + tracesSampleRate: 1.0, +}) + +export async function main(event: import('aws-lambda').DynamoDBStreamEvent) { + const { + spadeProxyDid, + ucanLogUrl + } = getLambdaEnv() + + const records = parseDynamoDbEvent(event) + if (records.length > 1) { + throw new Error('Should only receive one offer to arrange') + } + + // @ts-expect-error can't figure out type of new + const newRecord = unmarshall(records[0].new) + + const { PRIVATE_KEY, UCAN_LOG_BASIC_AUTH } = Config + + const ucanLog = ucanLogConnect({ + url: new URL(ucanLogUrl || ''), + auth: UCAN_LOG_BASIC_AUTH + }) + const serviceSigner = getServiceSigner({ SPADE_PROXY_DID: spadeProxyDid, PRIVATE_KEY }) + + // Create message with receipt from offer arrange + const message = await offerArrange(newRecord, serviceSigner) + + // We block until we can log the UCAN invocation if this fails it will restart + await ucanLog.log(CAR.request.encode(message)) +} + +function getLambdaEnv () { + return { + spadeProxyDid: mustGetEnv('SPADE_PROXY_DID'), + ucanLogUrl: mustGetEnv('UCAN_LOG_URL') + } +} diff --git a/packages/functions/src/utils.ts b/packages/functions/src/utils.ts index 4ba1ff5..457822d 100644 --- a/packages/functions/src/utils.ts +++ b/packages/functions/src/utils.ts @@ -3,3 +3,10 @@ export function mustGetEnv (name: string) { if (!value) throw new Error(`missing ${name} environment variable`) return value } + +export function parseDynamoDbEvent (event: import('aws-lambda').DynamoDBStreamEvent) { + return event.Records.map(r => ({ + new: r.dynamodb?.NewImage, + old: r.dynamodb?.OldImage + })) +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 660d324..391a5a6 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -57,6 +57,9 @@ importers: '@ipld/dag-ucan': specifier: 3.3.2 version: 3.3.2 + '@ucanto/core': + specifier: 8.0.0 + version: 8.0.0 '@ucanto/interface': specifier: 8.0.0 version: 8.0.0 diff --git a/stacks/ApiStack.ts b/stacks/ApiStack.ts index 904d80a..0f96e52 100644 --- a/stacks/ApiStack.ts +++ b/stacks/ApiStack.ts @@ -1,6 +1,5 @@ import { Api, - Config, StackContext, use } from 'sst/constructs'; @@ -14,7 +13,12 @@ import { } from './config'; export function ApiStack({ app, stack }: StackContext) { - const { offerBucket, arrangedOfferTable } = use(DataStack) + const { + offerBucket, + arrangedOfferTable, + privateKey, + ucanLogBasicAuth + } = use(DataStack) // Setup app monitoring with Sentry setupSentry(app, stack) @@ -23,14 +27,13 @@ export function ApiStack({ app, stack }: StackContext) { const customDomain = getCustomDomain(stack.stage, process.env.HOSTED_ZONE) const pkg = getApiPackageJson() const git = getGitInfo() - const privateKey = new Config.Secret(stack, 'PRIVATE_KEY') - const ucanLogBasicAuth = new Config.Secret(stack, 'UCAN_LOG_BASIC_AUTH') const api = new Api(stack, 'api', { customDomain, defaults: { function: { permissions: [ + arrangedOfferTable, offerBucket ], environment: { diff --git a/stacks/CronStack.ts b/stacks/CronStack.ts index 021a563..b068a30 100644 --- a/stacks/CronStack.ts +++ b/stacks/CronStack.ts @@ -1,10 +1,16 @@ import { Cron, StackContext, use } from 'sst/constructs'; import { DataStack } from './DataStack'; +import { + getConstructName +} from './config'; + export function CronStack({ stack }: StackContext) { const { offerBucket, arrangedOfferTable } = use(DataStack) - new Cron(stack, 'arrange-offers', { + const arrangeOffersCronName = getConstructName('arrange-offers-cron', stack.stage) + new Cron(stack, arrangeOffersCronName, { + // schedule: 'rate(30 seconds)', schedule: 'rate(30 minutes)', job: { function: { @@ -17,7 +23,8 @@ export function CronStack({ stack }: StackContext) { }) // Write finalized files - const mergeOfferCron = new Cron(stack, 'merge-offer-cron', { + const mergeOfferCronName = getConstructName('merge-offer-cron', stack.stage) + const mergeOfferCron = new Cron(stack, mergeOfferCronName, { schedule: 'cron(0,15,30,45 * * * ? *)', job: { function: { diff --git a/stacks/DataStack.ts b/stacks/DataStack.ts index 1ea2736..c9dc5fd 100644 --- a/stacks/DataStack.ts +++ b/stacks/DataStack.ts @@ -1,13 +1,25 @@ import { Bucket, + Config, Table, StackContext } from 'sst/constructs'; +import { StartingPosition } from 'aws-cdk-lib/aws-lambda'; -import { arrangedOfferTableProps } from '../packages/core/tables/index' -import { getBucketConfig } from './config'; +import { arrangedOfferTableProps } from '../packages/core/tables/index'; +import { + getBucketConfig, + getConstructName, + setupSentry +} from './config'; + +export function DataStack({ app, stack }: StackContext) { + // Setup app monitoring with Sentry + setupSentry(app, stack) + + const ucanLogBasicAuth = new Config.Secret(stack, 'UCAN_LOG_BASIC_AUTH') + const privateKey = new Config.Secret(stack, 'PRIVATE_KEY') -export function DataStack({ stack }: StackContext) { const bucket = getBucketConfig('offer-store', stack.stage) const offerBucket = new Bucket(stack, bucket.bucketName, { cors: true, @@ -19,16 +31,57 @@ export function DataStack({ stack }: StackContext) { /** * This table tracks CARs pending a Filecoin deal together with their metadata. */ - const arrangedOfferTable = new Table(stack, 'arranged-offer', { + const tableName = getConstructName('arranged-offer', stack.stage) + const arrangedOfferTable = new Table(stack, tableName, { ...arrangedOfferTableProps, // information that will be written to the stream - stream: 'new_image', + stream: 'new_and_old_images', }) - // TODO: Stream table with batches of 1 + // invoke offer arrange once stat is set + arrangedOfferTable.addConsumers(stack, { + offerArrange: { + function: { + handler: 'packages/functions/src/data/offer-arrange.main', + environment: { + UCAN_LOG_URL: process.env.UCAN_LOG_URL ?? '', + SPADE_PROXY_DID: process.env.SPADE_PROXY_DID ?? '', + }, + permissions: [], + timeout: 3 * 60, + bind: [ + privateKey, + ucanLogBasicAuth + ] + }, + cdk: { + eventSource: { + batchSize: 1, + // Start reading at the last untrimmed record in the shard in the system. + startingPosition: StartingPosition.TRIM_HORIZON, + // TODO: Add error queue + // https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_lambda_event_sources.DynamoEventSourceProps.html#onfailure + } + }, + filters: [ + // Trigger when state changed + { + dynamodb: { + NewImage: { + stat: { + S: ['accepted', 'rejected'] + } + } + } + } + ] + } + }) return { offerBucket, - arrangedOfferTable + arrangedOfferTable, + ucanLogBasicAuth, + privateKey } } diff --git a/stacks/config.ts b/stacks/config.ts index 143f2f7..0989f20 100644 --- a/stacks/config.ts +++ b/stacks/config.ts @@ -6,14 +6,14 @@ import { LayerVersion } from 'aws-cdk-lib/aws-lambda' /** * Get nicer bucket names */ -export function getBucketName (name: string, stage: string, version = 0) { +export function getConstructName (name: string, stage: string, version = 0) { // e.g `carpark-prod-0` or `satnav-pr101-0` return `${name}-${stage}-${version}` } export function getBucketConfig(name: string, stage: string, version = 0){ return { - bucketName: getBucketName(name, stage, version), + bucketName: getConstructName(name, stage, version), ...(isPrBuild(stage) && { autoDeleteObjects: true, removalPolicy: RemovalPolicy.DESTROY