Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Commit

Permalink
feat: offer arrange invocation receipt
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Jun 9, 2023
1 parent 9e6324a commit 1bcabfa
Show file tree
Hide file tree
Showing 10 changed files with 222 additions and 19 deletions.
37 changes: 36 additions & 1 deletion packages/core/arrange-offers.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import all from 'it-all'
import * as AggregateAPI from '@web3-storage/aggregate-api'
import * as Offer from '@web3-storage/capabilities/offer'
import { Signer } from '@ucanto/interface'
import { Message, Receipt } from '@ucanto/core'

import { ArrangedOfferStore, ArrangedOffer } from './tables/arranged-offer-store'

Expand Down Expand Up @@ -30,4 +33,36 @@ export async function mutateOffersToArrange (arrangedOfferStore: ArrangedOfferSt
return offersToArrange
}

// TODO: arrange offers
export async function offerArrange(offer: ArrangedOffer, serviceSigner: Signer) {
const { stat: status, commitmentProof } = offer

if (status !== 'accepted' && status !== 'rejected') {
// TODO: should this be a receipt of invocation with error?
throw new Error(`offer arrange for ${commitmentProof} cannot be fulfilled with given status: ${status}`)
}

const invocation = await Offer.arrange
.invoke({
issuer: serviceSigner,
audience: serviceSigner,
with: serviceSigner.did(),
nb: {
commitmentProof,
},
})
.delegate()

// TODO: error format in case of reporting failures

// Create receipt with invocation
const receipt = await Receipt.issue({
issuer: serviceSigner,
result: { ok: { status } },
ran: invocation,
})

// Create message
return await Message.build({
receipts: [receipt]
})
}
1 change: 1 addition & 0 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"@aws-sdk/client-s3": "^3.338.0",
"@aws-sdk/util-dynamodb": "^3.338.0",
"@ipld/dag-ucan": "3.3.2",
"@ucanto/core": "8.0.0",
"@ucanto/interface": "8.0.0",
"@ucanto/principal": "^8.0.0",
"@web3-storage/aggregate-api": "^0.0.0",
Expand Down
48 changes: 45 additions & 3 deletions packages/core/test/arrange-offers.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import {
createDynamodDb,
createTable,
} from './helpers/resources.js'
import { CID } from 'multiformats/cid'
import { generateOffers, createAggregateStore } from './helpers/offers.js'

import { CID } from 'multiformats/cid'
import { ed25519 } from '@ucanto/principal'

import { arrangedOfferTableProps } from '../tables/index.js'
import { mutateOffersToArrange } from '../arrange-offers.js'
import { mutateOffersToArrange, offerArrange } from '../arrange-offers.js'
import { useOfferStore } from '../buckets/offer-store.js'
import { useArrangedOfferStore } from '../tables/arranged-offer-store.js'

Expand All @@ -20,7 +22,7 @@ test.before(async (t) => {
})
})

test('can arrange multiple offers', async t => {
test('can mutate multiple offers to be arranged according to their state', async t => {
const { dynamo, s3 } = t.context
const bucketName = await createBucket(s3)

Expand Down Expand Up @@ -57,3 +59,43 @@ test('can arrange multiple offers', async t => {
t.is(resWithSecondSegment.length, 1)
t.is(resWithSecondSegment[0].commitmentProof.toString(), dataSegments[1].toString())
})

test('can create agent messages with receipt for arranged offer', async t => {
const signer = await ed25519.generate()
const commitmentProof = CID.parse('baga6ea4seaqcq4xx7rqx2lsrm6iky7qqk5jh7pbaj5bgdu22afhp4fodvccb6bq')

/** @type {import('../tables/arranged-offer-store.js').STAT[]} */
const stats = ['accepted', 'rejected']

for (const stat of stats) {
const offer = {
commitmentProof,
stat
}

const messageAcceptedReceipt = await offerArrange(offer, signer)

// console.log('message accepted', messageAcceptedReceipt.receipts)
t.is(messageAcceptedReceipt.receipts.size, 1)
const receiptCids = Array.from(messageAcceptedReceipt.receipts.keys())
const receipt = messageAcceptedReceipt.receipts.get(receiptCids[0])

// @ts-ignore unknown type
t.is(receipt?.out.ok?.status, stat)
}
})

test('fails to create agent message with receipt for non arranged offer', async t => {
const signer = await ed25519.generate()
const commitmentProof = CID.parse('baga6ea4seaqcq4xx7rqx2lsrm6iky7qqk5jh7pbaj5bgdu22afhp4fodvccb6bq')

/** @type {import('../tables/arranged-offer-store.js').STAT} */
const stat = 'queued'

const offer = {
commitmentProof,
stat
}

await t.throwsAsync(() => offerArrange(offer, signer))
})
52 changes: 52 additions & 0 deletions packages/functions/src/data/offer-arrange.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import * as Sentry from '@sentry/serverless'
import { Config } from 'sst/node/config'

import * as CAR from '@ucanto/transport/car'

import { connect as ucanLogConnect } from '@spade-proxy/core/ucan-log'
import { offerArrange } from '@spade-proxy/core/arrange-offers'
import { getServiceSigner } from '@spade-proxy/core/service'

import { mustGetEnv, parseDynamoDbEvent } from '../utils'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

export async function main(event: import('aws-lambda').DynamoDBStreamEvent) {
const {
spadeProxyDid,
ucanLogUrl
} = getLambdaEnv()

const records = parseDynamoDbEvent(event)
if (records.length > 1) {
throw new Error('Should only receive one offer to arrange')
}

// @ts-expect-error can't figure out type of new
const newRecord = unmarshall(records[0].new)

const { PRIVATE_KEY, UCAN_LOG_BASIC_AUTH } = Config

const ucanLog = ucanLogConnect({
url: new URL(ucanLogUrl || ''),
auth: UCAN_LOG_BASIC_AUTH
})
const serviceSigner = getServiceSigner({ SPADE_PROXY_DID: spadeProxyDid, PRIVATE_KEY })

// Create message with receipt from offer arrange
const message = await offerArrange(newRecord, serviceSigner)

// We block until we can log the UCAN invocation if this fails it will restart
await ucanLog.log(CAR.request.encode(message))
}

function getLambdaEnv () {
return {
spadeProxyDid: mustGetEnv('SPADE_PROXY_DID'),
ucanLogUrl: mustGetEnv('UCAN_LOG_URL')
}
}
7 changes: 7 additions & 0 deletions packages/functions/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,10 @@ export function mustGetEnv (name: string) {
if (!value) throw new Error(`missing ${name} environment variable`)
return value
}

export function parseDynamoDbEvent (event: import('aws-lambda').DynamoDBStreamEvent) {
return event.Records.map(r => ({
new: r.dynamodb?.NewImage,
old: r.dynamodb?.OldImage
}))
}
3 changes: 3 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 7 additions & 4 deletions stacks/ApiStack.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import {
Api,
Config,
StackContext,
use
} from 'sst/constructs';
Expand All @@ -14,7 +13,12 @@ import {
} from './config';

export function ApiStack({ app, stack }: StackContext) {
const { offerBucket, arrangedOfferTable } = use(DataStack)
const {
offerBucket,
arrangedOfferTable,
privateKey,
ucanLogBasicAuth
} = use(DataStack)

// Setup app monitoring with Sentry
setupSentry(app, stack)
Expand All @@ -23,14 +27,13 @@ export function ApiStack({ app, stack }: StackContext) {
const customDomain = getCustomDomain(stack.stage, process.env.HOSTED_ZONE)
const pkg = getApiPackageJson()
const git = getGitInfo()
const privateKey = new Config.Secret(stack, 'PRIVATE_KEY')
const ucanLogBasicAuth = new Config.Secret(stack, 'UCAN_LOG_BASIC_AUTH')

const api = new Api(stack, 'api', {
customDomain,
defaults: {
function: {
permissions: [
arrangedOfferTable,
offerBucket
],
environment: {
Expand Down
11 changes: 9 additions & 2 deletions stacks/CronStack.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
import { Cron, StackContext, use } from 'sst/constructs';
import { DataStack } from './DataStack';

import {
getConstructName
} from './config';

export function CronStack({ stack }: StackContext) {
const { offerBucket, arrangedOfferTable } = use(DataStack)

new Cron(stack, 'arrange-offers', {
const arrangeOffersCronName = getConstructName('arrange-offers-cron', stack.stage)
new Cron(stack, arrangeOffersCronName, {
// schedule: 'rate(30 seconds)',
schedule: 'rate(30 minutes)',
job: {
function: {
Expand All @@ -17,7 +23,8 @@ export function CronStack({ stack }: StackContext) {
})

// Write finalized files
const mergeOfferCron = new Cron(stack, 'merge-offer-cron', {
const mergeOfferCronName = getConstructName('merge-offer-cron', stack.stage)
const mergeOfferCron = new Cron(stack, mergeOfferCronName, {
schedule: 'cron(0,15,30,45 * * * ? *)',
job: {
function: {
Expand Down
67 changes: 60 additions & 7 deletions stacks/DataStack.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,25 @@
import {
Bucket,
Config,
Table,
StackContext
} from 'sst/constructs';
import { StartingPosition } from 'aws-cdk-lib/aws-lambda';

import { arrangedOfferTableProps } from '../packages/core/tables/index'
import { getBucketConfig } from './config';
import { arrangedOfferTableProps } from '../packages/core/tables/index';
import {
getBucketConfig,
getConstructName,
setupSentry
} from './config';

export function DataStack({ app, stack }: StackContext) {
// Setup app monitoring with Sentry
setupSentry(app, stack)

const ucanLogBasicAuth = new Config.Secret(stack, 'UCAN_LOG_BASIC_AUTH')
const privateKey = new Config.Secret(stack, 'PRIVATE_KEY')

export function DataStack({ stack }: StackContext) {
const bucket = getBucketConfig('offer-store', stack.stage)
const offerBucket = new Bucket(stack, bucket.bucketName, {
cors: true,
Expand All @@ -19,16 +31,57 @@ export function DataStack({ stack }: StackContext) {
/**
* This table tracks CARs pending a Filecoin deal together with their metadata.
*/
const arrangedOfferTable = new Table(stack, 'arranged-offer', {
const tableName = getConstructName('arranged-offer', stack.stage)
const arrangedOfferTable = new Table(stack, tableName, {
...arrangedOfferTableProps,
// information that will be written to the stream
stream: 'new_image',
stream: 'new_and_old_images',
})

// TODO: Stream table with batches of 1
// invoke offer arrange once stat is set
arrangedOfferTable.addConsumers(stack, {
offerArrange: {
function: {
handler: 'packages/functions/src/data/offer-arrange.main',
environment: {
UCAN_LOG_URL: process.env.UCAN_LOG_URL ?? '',
SPADE_PROXY_DID: process.env.SPADE_PROXY_DID ?? '',
},
permissions: [],
timeout: 3 * 60,
bind: [
privateKey,
ucanLogBasicAuth
]
},
cdk: {
eventSource: {
batchSize: 1,
// Start reading at the last untrimmed record in the shard in the system.
startingPosition: StartingPosition.TRIM_HORIZON,
// TODO: Add error queue
// https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_lambda_event_sources.DynamoEventSourceProps.html#onfailure
}
},
filters: [
// Trigger when state changed
{
dynamodb: {
NewImage: {
stat: {
S: ['accepted', 'rejected']
}
}
}
}
]
}
})

return {
offerBucket,
arrangedOfferTable
arrangedOfferTable,
ucanLogBasicAuth,
privateKey
}
}
4 changes: 2 additions & 2 deletions stacks/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import { LayerVersion } from 'aws-cdk-lib/aws-lambda'
/**
* Get nicer bucket names
*/
export function getBucketName (name: string, stage: string, version = 0) {
export function getConstructName (name: string, stage: string, version = 0) {
// e.g `carpark-prod-0` or `satnav-pr101-0`
return `${name}-${stage}-${version}`
}

export function getBucketConfig(name: string, stage: string, version = 0){
return {
bucketName: getBucketName(name, stage, version),
bucketName: getConstructName(name, stage, version),
...(isPrBuild(stage) && {
autoDeleteObjects: true,
removalPolicy: RemovalPolicy.DESTROY
Expand Down

0 comments on commit 1bcabfa

Please sign in to comment.