Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Commit

Permalink
feat: merge offer cron
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Jun 6, 2023
1 parent 3537c54 commit 1604bf5
Show file tree
Hide file tree
Showing 13 changed files with 308 additions and 12 deletions.
59 changes: 57 additions & 2 deletions packages/core/buckets/offer-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import {
S3Client,
ServiceInputTypes,
PutObjectCommand,
GetObjectCommand,
ListObjectsV2Command
} from '@aws-sdk/client-s3'
import pRetry from 'p-retry'

Expand Down Expand Up @@ -31,9 +33,62 @@ export function useOfferStore(s3client: S3Client, bucketName: string) {

await pRetry(() => s3client.send(putCmd))

// TODO: fxArchive
// TODO: Save fxArchive
},
putMergedOffers: async (key, mergedOffers) => {
const putCmd = new PutObjectCommand({
Bucket: bucketName,
ContentType: 'application/json',
Key: key,
Body: JSON.stringify(mergedOffers)
})

await pRetry(() => s3client.send(putCmd))
},
list: async function * (prefix: string) {
let continuationToken
do {
const listCmd = new ListObjectsV2Command({
Prefix: prefix,
Bucket: bucketName,
ContinuationToken: continuationToken
}) as ListObjectsV2Command

const response = await s3client.send(listCmd)
continuationToken = response.NextContinuationToken

if (response.Contents) {
const items = await Promise.all(
response.Contents.map(async item => {
const getCmd = new GetObjectCommand({
Bucket: bucketName,
Key: item.Key
})

return await s3client.send(getCmd)
})
)
for (const item of items) {
const offer = await item.Body?.transformToString()
if (offer) {
yield JSON.parse(offer)
}
}
}

} while (continuationToken)
}
} as AggregateAPI.OfferStore
} as OfferStore
}

export interface OfferStore extends AggregateAPI.OfferStore {
putMergedOffers: (key: string, mergedOffers: MergedOffer[]) => Promise<void>
list: (prefix: string) => AsyncGenerator<MergedOffer, void> // TODO
}

export type MergedOffer = {
commitmentProof: string,
offers: AggregateAPI.Offer[]
}

export function getNextUtcDateName() {
Expand Down
21 changes: 21 additions & 0 deletions packages/core/merge-offers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { OfferStore } from './buckets/offer-store'
import all from 'it-all'

export async function mergeOffers (date: Date, offerStore: OfferStore) {
const d = getPreviousUtcDateName(date)
const offers = (await all(offerStore.list(d)))
await offerStore.putMergedOffers(d, offers)

return d
}

function getPreviousUtcDateName(date: Date) {
// normalize date to multiple of 15 minutes
const currentMinute = date.getUTCMinutes()
const factor = Math.floor(currentMinute / 15) // difference between previous and next
const additionalTime = ((factor * 15) - currentMinute) * 60000

const nDate = new Date(date.getTime() + additionalTime)

return `${nDate.getUTCFullYear()}-${nDate.getUTCMonth()}-${nDate.getUTCDay()} ${nDate.getUTCHours()}:${nDate.getUTCMinutes()}:00`
}
6 changes: 5 additions & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"version": "0.0.0",
"type": "module",
"scripts": {
"test": "ava --node-arguments='--experimental-fetch' --verbose --timeout=60s 'test/**.test.js'",
"test": "ava --node-arguments='--experimental-fetch' --serial --verbose --timeout=60s 'test/**.test.js'",
"typecheck": "tsc -noEmit"
},
"dependencies": {
Expand All @@ -14,13 +14,17 @@
"@web3-storage/capabilities": "file:../../../w3up/packages/capabilities",
"@web3-storage/aggregate-api": "file:../../../w3up/packages/aggregate-api",
"@web3-storage/aggregate-client": "file:../../../w3up/packages/aggregate-client",
"it-all": "^3.0.2",
"node-fetch": "^3.3.1",
"p-retry": "^5.1.2"
},
"devDependencies": {
"@ipld/car": "5.1.1",
"@types/node": "^18.16.3",
"@ucanto/transport": "^8.0.0",
"@web-std/blob": "^3.0.4",
"ava": "^4.3.3",
"multiformats": "^11.0.2",
"nanoid": "^4.0.0",
"sst": "^2.8.3",
"testcontainers": "^8.13.0",
Expand Down
14 changes: 14 additions & 0 deletions packages/core/test/helpers/offers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { randomCARs } from './utils.js'

/**
* @param {number} length
* @param {number} size
*/
export async function generateOffers(length, size) {
return (await randomCARs(length, size))
// Inflate size for testing within range
.map((car) => ({
...car,
size: car.size * 10e5,
}))
}
59 changes: 59 additions & 0 deletions packages/core/test/helpers/utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { CID } from 'multiformats'
import { webcrypto } from 'crypto'
import { sha256 } from 'multiformats/hashes/sha2'
import * as CAR from '@ucanto/transport/car'
import * as raw from 'multiformats/codecs/raw'
import { CarWriter } from '@ipld/car'
import { Blob } from '@web-std/blob'

/** @param {number} size */
export async function randomBytes(size) {
const bytes = new Uint8Array(size)
while (size) {
const chunk = new Uint8Array(Math.min(size, 65_536))
webcrypto.getRandomValues(chunk)

size -= bytes.length
bytes.set(chunk, size)
}
return bytes
}

/** @param {number} size */
export async function randomCAR(size) {
const bytes = await randomBytes(size)
const hash = await sha256.digest(bytes)
const root = CID.create(1, raw.code, hash)

const { writer, out } = CarWriter.create(root)
writer.put({ cid: root, bytes })
writer.close()

const chunks = []
for await (const chunk of out) {
chunks.push(chunk)
}
const blob = new Blob(chunks)
const cid = await CAR.codec.link(new Uint8Array(await blob.arrayBuffer()))

return Object.assign(blob, { cid, roots: [root] })
}

/**
* @param {number} length
* @param {number} size
* @param {object} [options]
* @param {string} [options.origin]
*/
export async function randomCARs(length, size, options = {}) {
const origin = options.origin || 'https://carpark.web3.storage'

return (
await Promise.all(Array.from({ length }).map(() => randomCAR(size)))
).map((car) => ({
link: car.cid,
size: car.size,
commitmentProof: 'todo-commP',
src: [`${origin}/${car.cid.toString()}`],
}))
}
52 changes: 52 additions & 0 deletions packages/core/test/merge-offers.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import {
GetObjectCommand,
} from '@aws-sdk/client-s3'

import { test } from './helpers/context.js'
import { createS3, createBucket } from './helpers/resources.js'
import { generateOffers } from './helpers/offers.js'

import { mergeOffers } from '../merge-offers.js'
import { useOfferStore } from '../buckets/offer-store.js'

test.before(async (t) => {
Object.assign(t.context, {
s3: (await createS3()).client,
})
})

test('can merge offers', async t => {
const { s3 } = t.context
const bucketName = await createBucket(s3)
const offerStore = useOfferStore(s3, bucketName)

const offers = (await generateOffers(10, 10))

const dateToFront = new Date()
dateToFront.setMinutes(dateToFront.getMinutes() + 15)
const dataSegments = [
'commP0',
'commP1'
]
await Promise.all(
dataSegments.map(commitmentProof => offerStore.queue({ commitmentProof, offers }, new Uint8Array([1, 2, 3])))
)

const mergedOfferId = await mergeOffers(dateToFront, offerStore)
t.truthy(mergedOfferId)

const mergedOffersGetCmd = new GetObjectCommand({
Bucket: bucketName,
Key: mergedOfferId
})
const mergedOffersResponse = await s3.send(mergedOffersGetCmd)
const rawMergedOffers = await mergedOffersResponse.Body?.transformToString()
if (!rawMergedOffers) {
throw new Error('merged offer not written')
}
const mergedOffers = JSON.parse(rawMergedOffers)
t.is(mergedOffers.length, dataSegments.length)
for (const segment of dataSegments) {
t.truthy(mergedOffers.find(o => o.commitmentProof === segment))
}
})
22 changes: 14 additions & 8 deletions packages/functions/src/api/ucan-invocation-router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import { createAggregateStore } from '@spade-proxy/core/tables/aggregate-store'
import { createArrangedOfferStore } from '@spade-proxy/core/tables/arranged-offer-store'
import { createUcantoServer, getServiceSigner } from '@spade-proxy/core/service'

import { mustGetEnv } from '../utils'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
Expand All @@ -20,13 +22,9 @@ Sentry.AWSLambda.init({
const AWS_REGION = process.env.AWS_REGION || 'us-west-2'

export async function ucanInvocationRouter(request: APIGatewayProxyEventV2) {
const {
OFFER_BUCKET_NAME: offerBucketName = '',
SPADE_PROXY_DID,
UCAN_LOG_URL,
} = process.env
const { offerBucketName, spadeProxyDid, ucanLogUrl } = getLambdaEnv()

if (!SPADE_PROXY_DID) {
if (!spadeProxyDid) {
return {
statusCode: 500,
}
Expand All @@ -39,10 +37,10 @@ export async function ucanInvocationRouter(request: APIGatewayProxyEventV2) {
const { PRIVATE_KEY, UCAN_LOG_BASIC_AUTH } = Config

const ucanLog = ucanLogConnect({
url: new URL(UCAN_LOG_URL || ''),
url: new URL(ucanLogUrl || ''),
auth: UCAN_LOG_BASIC_AUTH
})
const serviceSigner = getServiceSigner({ SPADE_PROXY_DID, PRIVATE_KEY })
const serviceSigner = getServiceSigner({ SPADE_PROXY_DID: spadeProxyDid, PRIVATE_KEY })
const server = createUcantoServer(serviceSigner, {
aggregateStore: createAggregateStore(),
offerStore: createOfferStore(AWS_REGION, offerBucketName),
Expand Down Expand Up @@ -102,6 +100,14 @@ export const fromLambdaRequest = (request: APIGatewayProxyEventV2) => ({
body: Buffer.from(request.body || '', 'base64'),
} as API.HTTPRequest)

function getLambdaEnv () {
return {
offerBucketName: mustGetEnv('OFFER_BUCKET_NAME'),
spadeProxyDid: mustGetEnv('SPADE_PROXY_DID'),
ucanLogUrl: mustGetEnv('UCAN_LOG_URL')
}
}

// would be generated by sst, but requires `sst build` to be run, which calls out to aws; not great for CI
declare module 'sst/node/config' {
export interface SecretResources {
Expand Down
27 changes: 27 additions & 0 deletions packages/functions/src/cron/merge-offers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import * as Sentry from '@sentry/serverless'

import { mergeOffers } from '@spade-proxy/core/merge-offers'
import { createOfferStore } from '@spade-proxy/core/buckets/offer-store'

import { mustGetEnv } from '../utils'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})
const AWS_REGION = process.env.AWS_REGION || 'us-west-2'

export async function main() {
const { offerBucketName } = getLambdaEnv()
const date = new Date()
const offerStore = createOfferStore(AWS_REGION, offerBucketName)

return mergeOffers(date, offerStore)
}

function getLambdaEnv () {
return {
offerBucketName: mustGetEnv('OFFER_BUCKET_NAME'),
}
}
9 changes: 9 additions & 0 deletions packages/functions/src/cron/offer-arrange.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
export async function main() {
const d = checkDate()
console.log(d)
return {}
}

function checkDate() {
return (new Date()).toISOString()
}
5 changes: 5 additions & 0 deletions packages/functions/src/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export function mustGetEnv (name: string) {
const value = process.env[name]
if (!value) throw new Error(`missing ${name} environment variable`)
return value
}
Loading

0 comments on commit 1604bf5

Please sign in to comment.