Skip to content

Commit

Permalink
merge master to latest (#2149)
Browse files Browse the repository at this point in the history
* Stick to the pull host while triggering the catalyst pull start (#2138)

* Revert "Stick to the pull host while triggering the catalyst pull start (#2138)" (#2141)

This reverts commit 29ad25a.

* Stick to the pull host while triggering the catalyst pull start (#2142)

* api: ingest: direct base & playback (#2139)

* api: ingest: direct base & playback

* fix

* fix

* implement for a specific hardcoded id for tests

* fix

* fix

* api: Handle active cleanup from pull lock API (#2144)

* refactor: improve api schema (#2109)

* format yaml

* add operationId to all endpoints

* marked rooms api as deprecated

* add tags to each endpoints

* moved unused components to db-schema

* add examples

* override name for sdk generation

* override operation name for sdk generatino

* run prettier & fix typo

* Update packages/api/src/schema/api-schema.yaml

Co-authored-by: Victor Elias <[email protected]>

* Update packages/api/src/schema/api-schema.yaml

Co-authored-by: Victor Elias <[email protected]>

* Update packages/api/src/schema/api-schema.yaml

Co-authored-by: Victor Elias <[email protected]>

* Update api-schema.yaml

* fix: build error

* revert: add new-asset-from-url-payload to api-schema.yaml

* Update api-schema.yaml

* fix build issue

* hardcoded inputcreator filter

* fix: create stream and loosening schema (#2134)

* Update compile-schemas.js

* fix: tsconfig

* fix: revert additionalproperties

* fix: remove InputCreatorId unknown

* fix: api schema

* fix: api schema

* fix: revert changes to additional properties

* fix: fixes from review

* fix: lint

---------

Co-authored-by: Victor Elias <[email protected]>
Co-authored-by: Chase Adams <[email protected]>

* api: Query isHealthy field as a string to handle JSOnull (#2143)

* api: Query isHealthy field as a string to handle JSOnull

* api: Add check on boolean fields filter value

* api: moved objectStoreId and catalystPipelineStrategy to db schema (#2146)

* api: moved params to db schema

* Update db-schema.yaml

* remove url

* api: fix direct playback api (#2147)

* fix june trigger (#2145)

* fix sending track multiple times

* add missing imports

* replace track with page method

---------

Co-authored-by: Rafał Leszko <[email protected]>
Co-authored-by: gioelecerati <[email protected]>
Co-authored-by: Victor Elias <[email protected]>
Co-authored-by: Chase Adams <[email protected]>
  • Loading branch information
5 people authored Apr 24, 2024
1 parent 676d44d commit 7e7361a
Show file tree
Hide file tree
Showing 24 changed files with 891 additions and 312 deletions.
14 changes: 12 additions & 2 deletions packages/api/src/compile-schemas.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ const data = _.merge({}, apiData, dbData);
const ajv = new Ajv({ sourceCode: true });

const index = [];
const types = [];
let types = [];

for (const [name, schema] of Object.entries(data.components.schemas)) {
schema.title = name;
Expand All @@ -72,7 +72,17 @@ const data = _.merge({}, apiData, dbData);
const indexPath = path.resolve(validatorDir, "index.js");
write(indexPath, indexStr);

const typeStr = types.join("\n");
const typeDefinition = `export type InputCreatorId =
| {
type: "unverified";
value: string;
}
| string;`;

let typeStr = types.join("\n\n");
const cleanedTypeStr = typeStr.split(typeDefinition).join("");
typeStr = `${cleanedTypeStr.trim()}\n\n${typeDefinition}`;

const typePath = path.resolve(schemaDir, "types.d.ts");
write(typePath, typeStr);
})().catch((err) => {
Expand Down
9 changes: 8 additions & 1 deletion packages/api/src/controllers/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,13 @@ function parseFiltersRaw(fieldsMap: FieldsMap, val: string): SQLStatement[] {
q.push(sql``.append(fv).append(sql` = ${filter.value}`));
} else if (fv.val) {
if (fv.type === "boolean") {
if (typeof filter.value !== "boolean") {
throw new Error(
`expected boolean value for field "${
filter.id
}", got: ${JSON.stringify(filter.value)}`
);
}
q.push(
sql``.append(
`coalesce((${fv.val})::boolean, FALSE) IS ${
Expand Down Expand Up @@ -653,7 +660,7 @@ export const triggerCatalystPullStart =
url.searchParams.set("lon", lon.toString());
playbackUrl = url.toString();
console.log(
`triggering catalyst pull start for streamId=${stream.id} playbackId=${stream.playbackId} lat=${lat} lon=${lon} pullRegion=${stream.pullRegion}`
`triggering catalyst pull start for streamId=${stream.id} playbackId=${stream.playbackId} lat=${lat} lon=${lon} pullRegion=${stream.pullRegion}, playbackUrl=${playbackUrl}`
);
}

Expand Down
11 changes: 9 additions & 2 deletions packages/api/src/controllers/playback.ts
Original file line number Diff line number Diff line change
Expand Up @@ -342,9 +342,16 @@ app.get("/:id", async (req, res) => {
res.status(501);
return res.json({ errors: ["Ingest not configured"] });
}
const ingest = ingests[0].base;

let ingest = ingests[0].base;
let { id } = req.params;

if (
(id === "1ba7nrr34rbjl4bb" || req.user?.directPlayback) &&
ingests[0].baseDirect
) {
ingest = ingests[0].baseDirect;
}

const withRecordings = req.query.recordings === "true";

const origin = req.headers["origin"] ?? "";
Expand Down
50 changes: 48 additions & 2 deletions packages/api/src/controllers/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import {
import serverPromise, { TestServer } from "../test-server";
import { semaphore, sleep } from "../util";
import { generateUniquePlaybackId } from "./generate-keys";
import { extractRegionFrom } from "./stream";
import { extractUrlFrom, extractRegionFrom } from "./stream";

const uuidRegex = /[0-9a-f]+(-[0-9a-f]+){4}/;

Expand Down Expand Up @@ -550,13 +550,33 @@ describe("controllers/stream", () => {
const stream = await res.json();

// Mark stream as active
await db.stream.update(stream.id, { isActive: true });
await db.stream.update(stream.id, {
isActive: true,
lastSeen: Date.now(),
});

// Requesting pull lock should fail, because the stream is active (so it should be replicated instead of being pulled)
const reslockPull = await client.post(`/stream/${stream.id}/lockPull`);
expect(reslockPull.status).toBe(423);
});

it("should still lock pull for an active stream that got lost", async () => {
// Create stream pull
const res = await client.put("/stream/pull", postMockPullStream);
expect(res.status).toBe(201);
const stream = await res.json();

// Mark stream as active
await db.stream.update(stream.id, {
isActive: true,
lastSeen: Date.now() - 24 * 60 * 60 * 1000,
});

// Requesting pull lock should work, because the stream is not actually active (outdated lastSeen)
const reslockPull = await client.post(`/stream/${stream.id}/lockPull`);
expect(reslockPull.status).toBe(204);
});

it("should not lock pull for already locked pull", async () => {
// Create stream pull
const res = await client.put("/stream/pull", postMockPullStream);
Expand Down Expand Up @@ -713,6 +733,32 @@ describe("controllers/stream", () => {
const document = await db.stream.get(stream.id);
expect(db.stream.addDefaultFields(document)).toEqual(updatedStream);
});
it("should extract host from redirected playback url", async () => {
expect(
extractUrlFrom(
"https://sto-prod-catalyst-0.lp-playback.studio:443/hls/video+not-used-playback/index.m3u8"
)
).toBe("https://sto-prod-catalyst-0.lp-playback.studio:443/hls/video+");
expect(
extractUrlFrom(
"https://mos2-prod-catalyst-0.lp-playback.studio:443/hls/video+not-used-playback/index.m3u8"
)
).toBe(
"https://mos2-prod-catalyst-0.lp-playback.studio:443/hls/video+"
);
expect(
extractUrlFrom(
"https://fra-staging-staging-catalyst-0.livepeer.monster:443/hls/video+not-used-playback/index.m3u8"
)
).toBe(
"https://fra-staging-staging-catalyst-0.livepeer.monster:443/hls/video+"
);
expect(
extractUrlFrom(
"https://fra-staging-staging-catalyst-0.livepeer.monster:443/hls/video+other-playback/index.m3u8"
)
).toBe(null);
});
it("should extract region from redirected playback url", async () => {
expect(
extractRegionFrom(
Expand Down
62 changes: 46 additions & 16 deletions packages/api/src/controllers/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import logger from "../logger";
import { authorizer } from "../middleware";
import { validatePost } from "../middleware";
import { geolocateMiddleware } from "../middleware";
import { fetchWithTimeout } from "../util";
import { fetchWithTimeoutAndRedirects } from "../util";
import { CliArgs } from "../parse-cli";
import {
DetectionWebhookPayload,
Expand Down Expand Up @@ -236,12 +236,12 @@ async function triggerManyIdleStreamsWebhook(ids: string[], queue: Queue) {
);
}

async function resolvePullRegion(
async function resolvePullUrlAndRegion(
stream: NewStreamPayload,
ingest: string
): Promise<string> {
): Promise<{ pullUrl: string; pullRegion: string }> {
if (process.env.NODE_ENV === "test") {
return null;
return { pullUrl: null, pullRegion: null };
}
const url = new URL(
pathJoin(ingest, `hls`, "not-used-playback", `index.m3u8`)
Expand All @@ -253,13 +253,23 @@ async function resolvePullRegion(
}
const playbackUrl = url.toString();
// Send any playback request to catalyst-api, which effectively resolves the region using MistUtilLoad
const response = await fetchWithTimeout(playbackUrl, { redirect: "manual" });
if (response.status < 300 || response.status >= 400) {
// not a redirect response, so we can't determine the region
const response = await fetchWithTimeoutAndRedirects(playbackUrl, {});
if (response.status !== 200) {
// not a correct status code, so we can't determine the region/host
return null;
}
const redirectUrl = response.headers.get("location");
return extractRegionFrom(redirectUrl);
return {
pullUrl: extractUrlFrom(response.url),
pullRegion: extractRegionFrom(response.url),
};
}

// Extracts Mist URL from redirected node URL, e.g. "https://sto-prod-catalyst-0.lp-playback.studio:443/hls/video+" from "https://sto-prod-catalyst-0.lp-playback.studio:443/hls/video+foo/index.m3u8"
export function extractUrlFrom(playbackUrl: string): string {
const hostRegex =
/(https?:\/\/.+-\w+-catalyst.+\/hls\/.+)not-used-playback\/index.m3u8/;
const matches = playbackUrl.match(hostRegex);
return matches ? matches[1] : null;
}

// Extracts region from redirected node URL, e.g. "sto" from "https://sto-prod-catalyst-0.lp-playback.studio:443/hls/video+foo/index.m3u8"
Expand Down Expand Up @@ -379,7 +389,8 @@ const fieldsMap: FieldsMap = {
val: `stream.data->'transcodedSegmentsDuration'`,
type: "real",
},
isHealthy: { val: `stream.data->'isHealthy'`, type: "boolean" },
// isHealthy field is sometimes JSON-`null` so we query it as a string (->>)
isHealthy: { val: `stream.data->>'isHealthy'`, type: "boolean" },
};

app.get("/", authorizer({}), async (req, res) => {
Expand Down Expand Up @@ -1085,7 +1096,10 @@ app.put(
}
const streamExisted = streams.length === 1;

const pullRegion = await resolvePullRegion(rawPayload, ingest);
const { pullUrl, pullRegion } = await resolvePullUrlAndRegion(
rawPayload,
ingest
);

let stream: DBStream;
if (!streamExisted) {
Expand Down Expand Up @@ -1122,8 +1136,12 @@ app.put(
await triggerCatalystStreamUpdated(req, stream.playbackId);
}

// If pullHost was resolved, then stick to that host for triggering Catalyst pull start
const playbackUrl = pullUrl
? pathJoin(pullUrl + stream.playbackId, `index.m3u8`)
: getHLSPlaybackUrl(ingest, stream);
if (!stream.isActive || streamExisted) {
await triggerCatalystPullStart(stream, getHLSPlaybackUrl(ingest, stream));
await triggerCatalystPullStart(stream, playbackUrl);
}

res.status(streamExisted ? 200 : 201);
Expand Down Expand Up @@ -1153,12 +1171,24 @@ app.post("/:id/lockPull", authorizer({ anyAdmin: true }), async (req, res) => {
return res.json({ errors: ["not found"] });
}

// We have an issue that some of the streams/sessions are not marked as inactive when they should be.
// This is a workaround to clean up the stream in the background
const doingActiveCleanup = activeCleanupOne(
req.config,
stream,
req.queue,
await getIngestBase(req)
);

// the `isActive` field is only cleared later in background, so we ignore it
// in the query below in case we triggered an active cleanup logic above.
const leaseDeadline = Date.now() - leaseTimeout;
const updateRes = await db.stream.update(
[
sql`id = ${stream.id}`,
sql`(data->>'pullLockedBy' = ${host} OR (COALESCE((data->>'pullLockedAt')::bigint,0) < ${
Date.now() - leaseTimeout
} AND COALESCE((data->>'isActive')::boolean,FALSE) = FALSE))`,
doingActiveCleanup
? sql`(data->>'pullLockedBy' = ${host} OR (COALESCE((data->>'pullLockedAt')::bigint,0) < ${leaseDeadline}))`
: sql`(data->>'pullLockedBy' = ${host} OR (COALESCE((data->>'pullLockedAt')::bigint,0) < ${leaseDeadline} AND COALESCE((data->>'isActive')::boolean,FALSE) = FALSE))`,
],
{ pullLockedAt: Date.now(), pullLockedBy: host },
{ throwIfEmpty: false }
Expand All @@ -1169,7 +1199,7 @@ app.post("/:id/lockPull", authorizer({ anyAdmin: true }), async (req, res) => {
return;
}
logger.info(
`/lockPull failed for stream=${id}, isActive=${stream.isActive}, pullLockedBy=${stream.pullLockedBy}, pullLockedAt=${stream.pullLockedAt}`
`/lockPull failed for stream=${id}, isActive=${stream.isActive}, lastSeen=${stream.lastSeen}, pullLockedBy=${stream.pullLockedBy}, pullLockedAt=${stream.pullLockedAt}`
);
res.status(423).end();
});
Expand Down
13 changes: 12 additions & 1 deletion packages/api/src/controllers/user.ts
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ app.post("/", validatePost("user"), async (req, res) => {
res.json(user);
});

app.patch("/:id", authorizer({}), async (req, res) => {
app.patch("/:id/email", authorizer({}), async (req, res) => {
const { email } = req.body;
const userId = req.user.id;

Expand Down Expand Up @@ -731,6 +731,17 @@ app.patch(
}
);

app.patch("/:id", authorizer({ anyAdmin: true }), async (req, res) => {
const { id } = req.params;
const { directPlayback } = req.body;

if (typeof directPlayback !== "undefined") {
await db.user.update(id, { directPlayback });
}

res.status(204).end();
});

app.post("/token", validatePost("user"), async (req, res) => {
const user = await findUserByEmail(req.body.email);
const [hashedPassword] = await hash(req.body.password, user.salt);
Expand Down
Loading

0 comments on commit 7e7361a

Please sign in to comment.