Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Commit

Permalink
feat: cron for merge offer and offer arrange invocation trigger
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Jun 8, 2023
1 parent 47e6ced commit 935835e
Show file tree
Hide file tree
Showing 18 changed files with 1,683 additions and 2,075 deletions.
33 changes: 33 additions & 0 deletions packages/core/arrange-offers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import all from 'it-all'
import * as AggregateAPI from '@web3-storage/aggregate-api'

import { ArrangedOfferStore, ArrangedOffer } from './tables/arranged-offer-store'

const STAT_TO_ARRANGE = 'queued'

export async function mutateOffersToArrange (arrangedOfferStore: ArrangedOfferStore, aggregateStore: AggregateAPI.AggregateStore) {
const offersQueued = await all(arrangedOfferStore.list(STAT_TO_ARRANGE))

const offersToArrange = (await Promise.all(
offersQueued.map(async (commitmentProof) => {
const aggregate = await aggregateStore.get(commitmentProof)
if (!aggregate) {
return {
commitmentProof,
stat: STAT_TO_ARRANGE
}
} else {
// TODO: state
return {
commitmentProof,
stat: 'accepted'
}
}
})
)).filter(offer => offer.stat !== STAT_TO_ARRANGE) as ArrangedOffer[]

await arrangedOfferStore.batchSet(offersToArrange)
return offersToArrange
}

// TODO: arrange offers
57 changes: 56 additions & 1 deletion packages/core/buckets/offer-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import {
S3Client,
ServiceInputTypes,
PutObjectCommand,
GetObjectCommand,
ListObjectsV2Command
} from '@aws-sdk/client-s3'
import pRetry from 'p-retry'

Expand Down Expand Up @@ -40,8 +42,61 @@ export function useOfferStore(s3client: S3Client, bucketName: string, arrangedOf
// add commitmentProof for polling
// once an aggregate is fulfilled (accepted or rejected) a receipt will be generated.
await arrangedOfferStore.set(commitmentProof, 'queued')
},
putMergedOffers: async (key, mergedOffers) => {
const putCmd = new PutObjectCommand({
Bucket: bucketName,
ContentType: 'application/json',
Key: key,
Body: JSON.stringify(mergedOffers)
})

await pRetry(() => s3client.send(putCmd))
},
list: async function * (prefix: string) {
let continuationToken
do {
const listCmd = new ListObjectsV2Command({
Prefix: prefix,
Bucket: bucketName,
ContinuationToken: continuationToken
}) as ListObjectsV2Command

const response = await s3client.send(listCmd)
continuationToken = response.NextContinuationToken

if (response.Contents) {
const items = await Promise.all(
response.Contents.map(async item => {
const getCmd = new GetObjectCommand({
Bucket: bucketName,
Key: item.Key
})

return await s3client.send(getCmd)
})
)
for (const item of items) {
const offer = await item.Body?.transformToString()
if (offer) {
yield JSON.parse(offer)
}
}
}

} while (continuationToken)
}
} as AggregateAPI.OfferStore
} as OfferStore
}

export interface OfferStore extends AggregateAPI.OfferStore {
putMergedOffers: (key: string, mergedOffers: MergedOffer[]) => Promise<void>
list: (prefix: string) => AsyncGenerator<MergedOffer, void>
}

export type MergedOffer = {
commitmentProof: string,
offers: AggregateAPI.Offer[]
}

/**
Expand Down
22 changes: 22 additions & 0 deletions packages/core/merge-offers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import all from 'it-all'

import { OfferStore } from './buckets/offer-store'

export async function mergeOffers (date: Date, offerStore: OfferStore) {
const d = getPreviousUtcDateName(date)
const offers = (await all(offerStore.list(d)))
await offerStore.putMergedOffers(d, offers)

return d
}

function getPreviousUtcDateName(date: Date) {
// normalize date to multiple of 15 minutes
const currentMinute = date.getUTCMinutes()
const factor = Math.floor(currentMinute / 15) // difference between previous and next
const additionalTime = ((factor * 15) - currentMinute) * 60000

const nDate = new Date(date.getTime() + additionalTime)

return `${nDate.getUTCFullYear()}-${nDate.getUTCMonth()}-${nDate.getUTCDay()} ${nDate.getUTCHours()}:${nDate.getUTCMinutes()}:00`
}
4 changes: 4 additions & 0 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@
"@web3-storage/aggregate-api": "^0.0.0",
"@web3-storage/aggregate-client": "^0.0.0",
"@web3-storage/capabilities": "^6.0.0",
"it-all": "^3.0.2",
"multiformats": "^11.0.2",
"node-fetch": "^3.3.1",
"p-retry": "^5.1.2"
},
"devDependencies": {
"@ipld/car": "5.1.1",
"@types/node": "^18.16.3",
"@ucanto/transport": "^8.0.0",
"@web-std/blob": "^3.0.4",
"ava": "^5.3.0",
"nanoid": "^4.0.0",
Expand Down
80 changes: 68 additions & 12 deletions packages/core/tables/arranged-offer-store.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import {
DynamoDBClient,
GetItemCommand,
PutItemCommand,
// QueryCommand,
UpdateItemCommand,
TransactWriteItemsCommand,
QueryCommand,
} from '@aws-sdk/client-dynamodb'
import { marshall, unmarshall } from '@aws-sdk/util-dynamodb'
import { CID } from 'multiformats/cid'
import type { Link } from '@ucanto/interface'
import * as AggregateAPI from '@web3-storage/aggregate-api'

// import { MAX_TRANSACT_WRITE_ITEMS } from './constants.js'

export function createArrangedOfferStore(region: string, tableName: string, options?: ArrangedOfferStoreDbOptions) {
const dynamoDb = new DynamoDBClient({
region,
Expand All @@ -17,7 +21,7 @@ export function createArrangedOfferStore(region: string, tableName: string, opti
return useArrangedOfferStore(dynamoDb, tableName)
}

export function useArrangedOfferStore(dynamoDb: DynamoDBClient, tableName: string) {
export function useArrangedOfferStore(dynamoDb: DynamoDBClient, tableName: string): ArrangedOfferStore {
return {
get: async (commitmentProof) => {
const cmd = new GetItemCommand({
Expand All @@ -29,29 +33,81 @@ export function useArrangedOfferStore(dynamoDb: DynamoDBClient, tableName: strin

const response = await dynamoDb.send(cmd)
if (response?.Item) {
return unmarshall(response.Item)
return unmarshall(response.Item).stat
}
},
set: async (commitmentProof, stat) => {
const item = {
commitmentProof: commitmentProof.toString(),
stat
const cmd = new UpdateItemCommand({
TableName: tableName,
Key: marshall({ commitmentProof: commitmentProof.toString() }),
ExpressionAttributeValues: {
':st': { S: stat }
},
UpdateExpression: 'SET stat = :st'
})

await dynamoDb.send(cmd)
},
batchSet: async (arrangedOffers: ArrangedOffer[]) => {
if (!arrangedOffers.length) {
return
}

const cmd = new PutItemCommand({
TableName: tableName,
Item: marshall(item),
const cmd = new TransactWriteItemsCommand({
TransactItems: arrangedOffers.map(ao => ({
Update: {
TableName: tableName,
Key: marshall({ commitmentProof: ao.commitmentProof.toString() }),
ExpressionAttributeValues: {
':st': { S: ao.stat }
},
UpdateExpression: 'SET stat = :st'
}
}))
})

await dynamoDb.send(cmd)
},
list: async function * (stat: STAT, options?: { limit: number }) {
let exclusiveStartKey
do {
const queryCommand = new QueryCommand({
TableName: tableName,
Limit: options?.limit || 20,
IndexName: 'indexStat',
ExpressionAttributeValues: {
':st': { S: stat },
},
KeyConditionExpression: 'stat = :st'
})

const res = await dynamoDb.send(queryCommand)
if (!res.Items || !res.Items.length) {
break
}

exclusiveStartKey = res.LastEvaluatedKey
for (const item of res.Items) {
yield CID.parse(unmarshall(item).commitmentProof)
}
} while (exclusiveStartKey)
}
} as ArrangedOfferStore
}
}

export interface ArrangedOfferStore extends AggregateAPI.ArrangedOfferStore {
set: (commitmentProof: Link<unknown, number, number, 0 | 1>, status: string) => Promise<void>
set: (commitmentProof: Link<unknown, number, number, 0 | 1>, stat: STAT) => Promise<void>
batchSet: (arrangedOffer: ArrangedOffer[]) => Promise<void>
list: (stat: STAT) => AsyncGenerator<CID, void>
}

export interface ArrangedOffer {
commitmentProof: Link<unknown, number, number, 0 | 1>
stat: STAT
}

export interface ArrangedOfferStoreDbOptions {
endpoint?: string
}

export type STAT = 'queued' | 'accepted' | 'rejected'
2 changes: 2 additions & 0 deletions packages/core/tables/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/clients/client-dynamodb/classes/transactwriteitemscommand.html
export const MAX_TRANSACT_WRITE_ITEMS = 100
59 changes: 59 additions & 0 deletions packages/core/test/arrange-offers.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { test } from './helpers/context.js'
import {
createS3,
createBucket,
createDynamodDb,
createTable,
} from './helpers/resources.js'
import { CID } from 'multiformats/cid'
import { generateOffers, createAggregateStore } from './helpers/offers.js'

import { arrangedOfferTableProps } from '../tables/index.js'
import { mutateOffersToArrange } from '../arrange-offers.js'
import { useOfferStore } from '../buckets/offer-store.js'
import { useArrangedOfferStore } from '../tables/arranged-offer-store.js'

test.before(async (t) => {
Object.assign(t.context, {
dynamo: await createDynamodDb(),
s3: (await createS3()).client,
})
})

test('can arrange multiple offers', async t => {
const { dynamo, s3 } = t.context
const bucketName = await createBucket(s3)

const aggregateStore = createAggregateStore()
const arrangedOfferStore = useArrangedOfferStore(
dynamo,
await createTable(dynamo, arrangedOfferTableProps)
)
const offerStore = useOfferStore(s3, bucketName, arrangedOfferStore)
const offers = (await generateOffers(10, 10))

const dateToFront = new Date()
dateToFront.setMinutes(dateToFront.getMinutes() + 15)
const dataSegments = [
'baga6ea4seaqcq4xx7rqx2lsrm6iky7qqk5jh7pbaj5bgdu22afhp4fodvccb6bq',
'baga6ea4seaqhisghxrl4yntcsxtoc6rany2fjtbxgcnhhztkh5myy2mbs4nk2ki'
].map(s => CID.parse(s))
await Promise.all(
dataSegments.map(commitmentProof => offerStore.queue({ commitmentProof, offers }))
)

// @ts-expect-error set one segment aggregate
aggregateStore.set(dataSegments[0])
const resWithFirstSegment = await mutateOffersToArrange(arrangedOfferStore, aggregateStore)
t.is(resWithFirstSegment.length, 1)
t.is(resWithFirstSegment[0].commitmentProof.toString(), dataSegments[0].toString())

const resWithoutReadySegments = await mutateOffersToArrange(arrangedOfferStore, aggregateStore)
t.is(resWithoutReadySegments.length, 0)

// @ts-expect-error set second segment aggregate
aggregateStore.set(dataSegments[1])
const resWithSecondSegment = await mutateOffersToArrange(arrangedOfferStore, aggregateStore)
t.is(resWithSecondSegment.length, 1)
t.is(resWithSecondSegment[0].commitmentProof.toString(), dataSegments[1].toString())
})
38 changes: 38 additions & 0 deletions packages/core/test/helpers/offers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import * as AggregateAPI from '@web3-storage/aggregate-api'

import { randomCARs } from './utils.js'

/**
* @param {number} length
* @param {number} size
*/
export async function generateOffers(length, size) {
return (await randomCARs(length, size))
// Inflate size for testing within range
.map((car) => ({
...car,
size: car.size * 10e5,
}))
}

/**
* @returns {AggregateAPI.AggregateStore}
*/
export function createAggregateStore() {
const aggregates = new Set()

return {
get: async (commitmentProof) => {
if (aggregates.has(commitmentProof.toString())) {
return Promise.resolve(['success'])
}
return Promise.resolve(undefined)
},
// @ts-expect-error not in interface
set: async (commitmentProof) => {
aggregates.add(commitmentProof.toString())

return Promise.resolve()
}
}
}
Loading

0 comments on commit 935835e

Please sign in to comment.