Skip to content

Commit

Permalink
api: task: add projectId to tasks (#2268)
Browse files Browse the repository at this point in the history
* api: task: add projectId to tasks

* address comments

* fix

* pass user around

* fix

* address comments

* fix

* fix tests
  • Loading branch information
gioelecerati authored Aug 28, 2024
1 parent 39956a5 commit 3794beb
Show file tree
Hide file tree
Showing 10 changed files with 48 additions and 28 deletions.
10 changes: 9 additions & 1 deletion packages/api/src/controllers/asset.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
ObjectStore,
PlaybackPolicy,
Task,
User,
} from "../schema/types";
import { db } from "../store";
import {
Expand Down Expand Up @@ -517,6 +518,7 @@ async function reconcileAssetStorage(
{ taskScheduler, config }: Request,
asset: WithID<Asset>,
newStorage: Asset["storage"],
user: User,
task?: WithID<Task>,
): Promise<Asset["storage"]> {
let { storage } = asset;
Expand All @@ -535,6 +537,7 @@ async function reconcileAssetStorage(
task = await taskScheduler.createAndScheduleTask(
"export",
{ export: { ipfs: newSpec } },
user,
asset,
);
}
Expand Down Expand Up @@ -824,6 +827,7 @@ app.post(
const task = await req.taskScheduler.createAndScheduleTask(
"export",
{ export: params },
req.user,
asset,
);

Expand All @@ -837,6 +841,7 @@ app.post(
req,
asset,
{ ipfs: { spec: params.ipfs } },
req.user,
task,
);
await req.taskScheduler.updateAsset(asset, { storage });
Expand Down Expand Up @@ -919,6 +924,7 @@ app.post(
targetSegmentSizeSecs,
},
},
req.user,
undefined,
asset,
);
Expand Down Expand Up @@ -991,6 +997,8 @@ app.post(
...(profiles ? { profiles } : null), // avoid serializing null profiles on the task
},
},
req.user.id,
req.project?.id,
null,
asset,
);
Expand Down Expand Up @@ -1292,7 +1300,7 @@ app.patch(

let storage = storageInputToState(storageInput);
if (storage) {
storage = await reconcileAssetStorage(req, asset, storage);
storage = await reconcileAssetStorage(req, asset, storage, req.user);
}

if (
Expand Down
2 changes: 1 addition & 1 deletion packages/api/src/controllers/clip.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,9 @@ app.post(
sourceObjectStoreId: objectStoreId,
},
},
owner,
null,
asset,
owner.id,
requesterId,
);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import serverPromise, { TestServer } from "../../test-server";
import { TestClient, clearDatabase, setupUsers } from "../../test-helpers";
import { v4 as uuid } from "uuid";
import { db } from "../../store";

const CREATOR_PUBLIC_KEY = "0xB7D5D7a6FcFE31611E4673AA3E61f21dC56723fC";
const NOW = 1685527855812;
Expand Down Expand Up @@ -84,7 +85,6 @@ beforeEach(async () => {
});
expect(res.status).toBe(204);

client = new TestClient({ server });
res = await client.post("/experiment/-/attestation", {
primaryType: "VideoAttestation",
domain: {
Expand Down
18 changes: 11 additions & 7 deletions packages/api/src/controllers/experiment/attestation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,18 @@ app.post("/", validatePost("attestation"), async (req, res) => {
...req.body,
});

const task = await taskScheduler.createAndScheduleTask("export-data", {
exportData: {
ipfs: {},
type: "attestation",
id: attestationMetadata.id,
content: toContent(attestationMetadata),
const task = await taskScheduler.createAndScheduleTask(
"export-data",
{
exportData: {
ipfs: {},
type: "attestation",
id: attestationMetadata.id,
content: toContent(attestationMetadata),
},
},
});
req.user,
);

attestationMetadata.storage = {
status: {
Expand Down
6 changes: 6 additions & 0 deletions packages/api/src/controllers/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ const fieldsMap: FieldsMap = {
type: `task.data->>'type'`,
inputAssetId: `task.data->>'inputAssetId'`,
outputAssetId: `task.data->>'outputAssetId'`,
projectId: `task.data->>'projectId'`,
};

export function toExternalTask(
Expand Down Expand Up @@ -209,6 +210,11 @@ app.get("/", authorizer({}), async (req, res) => {

const query = parseFilters(fieldsMap, filters);
query.push(sql`task.data->>'userId' = ${req.user.id}`);
query.push(
sql`coalesce(task.data->>'projectId', ${
req.user.defaultProjectId || ""
}) = ${req.project?.id || ""}`,
);

if (!all || all === "false" || !req.user.admin) {
query.push(sql`task.data->>'deleted' IS NULL`);
Expand Down
4 changes: 1 addition & 3 deletions packages/api/src/controllers/transcode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,7 @@ app.post(
catalystPipelineStrategy,
},
},
null,
null,
req.user.id,
req.user,
);
res.json(task);
},
Expand Down
4 changes: 4 additions & 0 deletions packages/api/src/schema/api-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1811,6 +1811,10 @@ components:
type: string
description: ID of the output asset
example: 09F8B46C-61A0-4254-9875-F71F4C605BC7
projectId:
type: string
description: ID of the project
example: 09F8B46C-61A0-4254-9875-F71F4C605BC7
requesterId:
type: string
description: ID of the requester hash(IP + SALT + PlaybackId)
Expand Down
2 changes: 2 additions & 0 deletions packages/api/src/schema/db-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,8 @@ components:
index: true
outputAssetId:
index: true
projectId:
index: true
requesterId:
type: string
description: ID of the requester hash(IP + SALT + PlaybackId)
Expand Down
25 changes: 10 additions & 15 deletions packages/api/src/task/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
toExternalTask,
} from "../controllers/task";
import { CliArgs } from "../parse-cli";
import { Asset, Task } from "../schema/types";
import { Asset, Task, User } from "../schema/types";
import { jobsDb as db } from "../store"; // use only the jobs DB pool on queue logic
import { taskOutputToIpfsStorage } from "../store/asset-table";
import { TooManyRequestsError } from "../store/errors";
Expand Down Expand Up @@ -347,40 +347,34 @@ export class TaskScheduler {
async createAndScheduleTask(
type: Task["type"],
params: Task["params"],
user: User,
inputAsset?: Asset,
outputAsset?: Asset,
userId?: string,
requesterId?: string,
) {
const projectId =
inputAsset?.projectId || outputAsset?.projectId || user?.defaultProjectId;
const task = await this.createTask(
type,
params,
user.id,
projectId,
inputAsset,
outputAsset,
userId,
requesterId,
);

let uId = inputAsset?.userId || outputAsset?.userId || userId;

if (uId) {
const user = await db.user.get(uId);

if (user?.disabled) {
throw new Error("user is disabled");
}
}

await this.scheduleTask(task);
return task;
}

async createTask(
type: Task["type"],
params: Task["params"],
userId: string,
projectId: string,
inputAsset?: Asset,
outputAsset?: Asset,
userId?: string,
requesterId?: string,
) {
const task = await db.task.create({
Expand All @@ -389,12 +383,13 @@ export class TaskScheduler {
type: type,
outputAssetId: outputAsset?.id,
inputAssetId: inputAsset?.id,
userId: inputAsset?.userId || outputAsset?.userId || userId,
userId,
params,
status: {
phase: "pending",
updatedAt: Date.now(),
},
projectId,
requesterId,
});
await this.queue.publishWebhook("events.task.spawned", {
Expand Down
3 changes: 3 additions & 0 deletions packages/api/src/webhooks/cannon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,8 @@ export default class WebhookCannon {
this.secondaryRecordObjectStoreId,
);

const user = await db.user.get(session.userId, { useCache: true });

await taskScheduler.createAndScheduleTask(
"upload",
{
Expand All @@ -615,6 +617,7 @@ export default class WebhookCannon {
)),
},
},
user,
undefined,
asset,
);
Expand Down

0 comments on commit 3794beb

Please sign in to comment.