From dbd5d9c37f82d4e14a5f60bffc1097db21d5bd2b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Tue, 9 Apr 2024 16:00:20 +0200 Subject: [PATCH 1/7] Add createdRegion to streams --- packages/api/src/controllers/stream.ts | 3 +++ packages/api/src/schema/db-schema.yaml | 2 ++ packages/api/src/store/stream-table.ts | 1 + 3 files changed, 6 insertions(+) diff --git a/packages/api/src/controllers/stream.ts b/packages/api/src/controllers/stream.ts index 05959b0d85..564cde77fd 100644 --- a/packages/api/src/controllers/stream.ts +++ b/packages/api/src/controllers/stream.ts @@ -1063,6 +1063,7 @@ app.put( ...oldStream, ...EMPTY_NEW_STREAM_PAYLOAD, // clear all fields that should be set from the payload suspended: false, + createdRegion: req.config.ownRegion, ...payload, }; await db.stream.replace(stream); @@ -1203,6 +1204,7 @@ async function handleCreateStream(req: Request) { const id = uuid(); const createdAt = Date.now(); + const createdRegion = req.config.ownRegion; // TODO: Don't create a streamKey if there's a pull source (here and on www) const streamKey = await generateUniqueStreamKey(id); let playbackId = await generateUniquePlaybackId(id, [streamKey]); @@ -1230,6 +1232,7 @@ async function handleCreateStream(req: Request) { objectStoreId, id, createdAt, + createdRegion, streamKey, playbackId, createdByTokenName: req.token?.name, diff --git a/packages/api/src/schema/db-schema.yaml b/packages/api/src/schema/db-schema.yaml index 7b18d42888..99dd00638f 100644 --- a/packages/api/src/schema/db-schema.yaml +++ b/packages/api/src/schema/db-schema.yaml @@ -688,6 +688,8 @@ components: type: object isActive: index: true + createdRegion: + type: string createdAt: index: true lastTerminatedAt: diff --git a/packages/api/src/store/stream-table.ts b/packages/api/src/store/stream-table.ts index f3b2350619..8fa8f0a948 100644 --- a/packages/api/src/store/stream-table.ts +++ b/packages/api/src/store/stream-table.ts @@ -414,6 +414,7 @@ const adminOnlyFields = [ "createdByTokenId", "pullLockedAt", "pullLockedBy", + "createdRegion", ]; const privateFields = [ From 7a021ed37a6518d2181fbbb28c9417e8e9700f78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Wed, 10 Apr 2024 10:33:59 +0200 Subject: [PATCH 2/7] Add pull region to stream object --- packages/api/src/controllers/stream.ts | 37 +++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/packages/api/src/controllers/stream.ts b/packages/api/src/controllers/stream.ts index 564cde77fd..8782323d92 100644 --- a/packages/api/src/controllers/stream.ts +++ b/packages/api/src/controllers/stream.ts @@ -9,6 +9,7 @@ import logger from "../logger"; import { authorizer } from "../middleware"; import { validatePost } from "../middleware"; import { geolocateMiddleware } from "../middleware"; +import { fetchWithTimeout } from "../util"; import { CliArgs } from "../parse-cli"; import { DetectionWebhookPayload, @@ -235,6 +236,31 @@ async function triggerManyIdleStreamsWebhook(ids: string[], queue: Queue) { ); } +async function resolvePullRegion( + req: Request, + ingest: string +): Promise { + const payload = req.body as NewStreamPayload; + const url = new URL(pathJoin(ingest, `hls`, "any-playback", `index.m3u8`)); + const { lat, lon } = payload.pull?.location ?? {}; + if (lat && lon) { + url.searchParams.set("lat", lat.toString()); + url.searchParams.set("lon", lon.toString()); + } + const playbackUrl = url.toString(); + const response = await fetchWithTimeout(playbackUrl, { redirect: "manual" }); + if (response.status < 300 || response.status >= 400) { + return null; + } + const redirectUrl = response.headers.get("location"); + + // TODO: Write better regxp :) + const regionRegex = /https:\/\/(\w+)-\w+-(\d+)\./; + const match = redirectUrl.match(regionRegex); + const region = match ? match[2] : null; + return region; +} + export function getHLSPlaybackUrl(ingest: string, stream: DBStream) { return pathJoin(ingest, `hls`, stream.playbackId, `index.m3u8`); } @@ -1001,6 +1027,8 @@ app.put( const { key = "pull.source", waitActive } = toStringValues(req.query); const rawPayload = req.body as NewStreamPayload; + const ingest = await getIngestBase(req); + if (!rawPayload.pull) { return res.status(400).json({ errors: [`stream pull configuration is required`], @@ -1046,9 +1074,13 @@ app.put( } const streamExisted = streams.length === 1; + const pullRegion = await resolvePullRegion(req, ingest); + let stream: DBStream; if (!streamExisted) { stream = await handleCreateStream(req); + stream.createdRegion = pullRegion; + await db.stream.replace(stream); } else { const oldStream = streams[0]; const sleepFor = terminateDelay(oldStream); @@ -1063,7 +1095,7 @@ app.put( ...oldStream, ...EMPTY_NEW_STREAM_PAYLOAD, // clear all fields that should be set from the payload suspended: false, - createdRegion: req.config.ownRegion, + createdRegion: pullRegion, ...payload, }; await db.stream.replace(stream); @@ -1074,7 +1106,6 @@ app.put( } if (!stream.isActive || streamExisted) { - const ingest = await getIngestBase(req); await triggerCatalystPullStart(stream, getHLSPlaybackUrl(ingest, stream)); } @@ -1204,7 +1235,6 @@ async function handleCreateStream(req: Request) { const id = uuid(); const createdAt = Date.now(); - const createdRegion = req.config.ownRegion; // TODO: Don't create a streamKey if there's a pull source (here and on www) const streamKey = await generateUniqueStreamKey(id); let playbackId = await generateUniquePlaybackId(id, [streamKey]); @@ -1232,7 +1262,6 @@ async function handleCreateStream(req: Request) { objectStoreId, id, createdAt, - createdRegion, streamKey, playbackId, createdByTokenName: req.token?.name, From a1c8b3453c94d632b7ec5265f42547053dcc6d3b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Thu, 11 Apr 2024 14:41:04 +0200 Subject: [PATCH 3/7] Raname createdRegion to pullRegion --- packages/api/src/controllers/stream.ts | 4 ++-- packages/api/src/schema/db-schema.yaml | 4 ++-- packages/api/src/store/stream-table.ts | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/api/src/controllers/stream.ts b/packages/api/src/controllers/stream.ts index 8782323d92..b43b72c2eb 100644 --- a/packages/api/src/controllers/stream.ts +++ b/packages/api/src/controllers/stream.ts @@ -1079,7 +1079,7 @@ app.put( let stream: DBStream; if (!streamExisted) { stream = await handleCreateStream(req); - stream.createdRegion = pullRegion; + stream.pullRegion = pullRegion; await db.stream.replace(stream); } else { const oldStream = streams[0]; @@ -1095,7 +1095,7 @@ app.put( ...oldStream, ...EMPTY_NEW_STREAM_PAYLOAD, // clear all fields that should be set from the payload suspended: false, - createdRegion: pullRegion, + pullRegion, ...payload, }; await db.stream.replace(stream); diff --git a/packages/api/src/schema/db-schema.yaml b/packages/api/src/schema/db-schema.yaml index 99dd00638f..fb90633793 100644 --- a/packages/api/src/schema/db-schema.yaml +++ b/packages/api/src/schema/db-schema.yaml @@ -688,8 +688,6 @@ components: type: object isActive: index: true - createdRegion: - type: string createdAt: index: true lastTerminatedAt: @@ -709,6 +707,8 @@ components: example: 1587667174725 pullLockedBy: type: string + pullRegion: + type: string playbackId: unique: true mistHost: diff --git a/packages/api/src/store/stream-table.ts b/packages/api/src/store/stream-table.ts index 8fa8f0a948..0ae1b91223 100644 --- a/packages/api/src/store/stream-table.ts +++ b/packages/api/src/store/stream-table.ts @@ -414,7 +414,7 @@ const adminOnlyFields = [ "createdByTokenId", "pullLockedAt", "pullLockedBy", - "createdRegion", + "pullRegion", ]; const privateFields = [ From 8adb51abdde906509d644c5217df717c6fdc2d12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Thu, 11 Apr 2024 16:11:42 +0200 Subject: [PATCH 4/7] Refactor + add test --- packages/api/src/controllers/stream.test.ts | 23 +++++++++++++++++ packages/api/src/controllers/stream.ts | 28 +++++++++++++-------- 2 files changed, 41 insertions(+), 10 deletions(-) diff --git a/packages/api/src/controllers/stream.test.ts b/packages/api/src/controllers/stream.test.ts index ff2949c268..efcdaf65df 100644 --- a/packages/api/src/controllers/stream.test.ts +++ b/packages/api/src/controllers/stream.test.ts @@ -23,6 +23,7 @@ import { import serverPromise, { TestServer } from "../test-server"; import { semaphore, sleep } from "../util"; import { generateUniquePlaybackId } from "./generate-keys"; +import { extractRegionFrom } from "./stream"; const uuidRegex = /[0-9a-f]+(-[0-9a-f]+){4}/; @@ -712,6 +713,28 @@ describe("controllers/stream", () => { const document = await db.stream.get(stream.id); expect(db.stream.addDefaultFields(document)).toEqual(updatedStream); }); + it("should extract region from redirected playback url", async () => { + expect( + extractRegionFrom( + "https://sto-prod-catalyst-0.lp-playback.studio:443/hls/video+not-used-playback/index.m3u8" + ) + ).toBe("sto"); + expect( + extractRegionFrom( + "https://mos2-prod-catalyst-0.lp-playback.studio:443/hls/video+not-used-playback/index.m3u8" + ) + ).toBe("mos2"); + expect( + extractRegionFrom( + "https://fra-staging-staging-catalyst-0.livepeer.monster:443/hls/video+not-used-playback/index.m3u8" + ) + ).toBe("fra"); + expect( + extractRegionFrom( + "https://fra-staging-staging-catalyst-0.livepeer.monster:443/hls/video+other-playback/index.m3u8" + ) + ).toBe(null); + }); }); it("should create a stream, delete it, and error when attempting additional delete or replace", async () => { diff --git a/packages/api/src/controllers/stream.ts b/packages/api/src/controllers/stream.ts index b43b72c2eb..27805008e1 100644 --- a/packages/api/src/controllers/stream.ts +++ b/packages/api/src/controllers/stream.ts @@ -237,12 +237,16 @@ async function triggerManyIdleStreamsWebhook(ids: string[], queue: Queue) { } async function resolvePullRegion( - req: Request, + stream: NewStreamPayload, ingest: string ): Promise { - const payload = req.body as NewStreamPayload; - const url = new URL(pathJoin(ingest, `hls`, "any-playback", `index.m3u8`)); - const { lat, lon } = payload.pull?.location ?? {}; + if (process.env.NODE_ENV === "test") { + return null; + } + const url = new URL( + pathJoin(ingest, `hls`, "not-used-playback", `index.m3u8`) + ); + const { lat, lon } = stream.pull?.location ?? {}; if (lat && lon) { url.searchParams.set("lat", lat.toString()); url.searchParams.set("lon", lon.toString()); @@ -250,15 +254,19 @@ async function resolvePullRegion( const playbackUrl = url.toString(); const response = await fetchWithTimeout(playbackUrl, { redirect: "manual" }); if (response.status < 300 || response.status >= 400) { + // not a redirect response, so we can't determine the region return null; } const redirectUrl = response.headers.get("location"); + return extractRegionFrom(redirectUrl); +} - // TODO: Write better regxp :) - const regionRegex = /https:\/\/(\w+)-\w+-(\d+)\./; - const match = redirectUrl.match(regionRegex); - const region = match ? match[2] : null; - return region; +// Extracts region from redirected node URL, e.g. "sto" from "https://sto-prod-catalyst-0.lp-playback.studio:443/hls/video+foo/index.m3u8" +export function extractRegionFrom(playbackUrl: string): string { + const regionRegex = + /https?:\/\/(\w+)-.+-catalyst.+not-used-playback\/index.m3u8/; + const matches = playbackUrl.match(regionRegex); + return matches ? matches[1] : null; } export function getHLSPlaybackUrl(ingest: string, stream: DBStream) { @@ -1074,7 +1082,7 @@ app.put( } const streamExisted = streams.length === 1; - const pullRegion = await resolvePullRegion(req, ingest); + const pullRegion = await resolvePullRegion(rawPayload, ingest); let stream: DBStream; if (!streamExisted) { From d9b11cdcee8980a8cffafb92e516324a491684e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Fri, 12 Apr 2024 10:00:21 +0200 Subject: [PATCH 5/7] Add comment --- packages/api/src/controllers/stream.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/api/src/controllers/stream.ts b/packages/api/src/controllers/stream.ts index 27805008e1..149df0754a 100644 --- a/packages/api/src/controllers/stream.ts +++ b/packages/api/src/controllers/stream.ts @@ -252,6 +252,7 @@ async function resolvePullRegion( url.searchParams.set("lon", lon.toString()); } const playbackUrl = url.toString(); + // Send any playback request to catalyst-api, which effectively resolves the region using MistUtilLoad const response = await fetchWithTimeout(playbackUrl, { redirect: "manual" }); if (response.status < 300 || response.status >= 400) { // not a redirect response, so we can't determine the region From d6645e263740064766dc118ed71650676365af4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Fri, 12 Apr 2024 10:46:30 +0200 Subject: [PATCH 6/7] Add additional logging --- packages/api/src/controllers/helpers.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/api/src/controllers/helpers.ts b/packages/api/src/controllers/helpers.ts index 5ed5b947b2..441c37cfa3 100644 --- a/packages/api/src/controllers/helpers.ts +++ b/packages/api/src/controllers/helpers.ts @@ -652,6 +652,9 @@ export const triggerCatalystPullStart = url.searchParams.set("lat", lat.toString()); url.searchParams.set("lon", lon.toString()); playbackUrl = url.toString(); + console.log( + `triggering catalyst pull start for streamId=${stream.id} playbackId=${stream.playbackId} lat=${lat} lon=${lon} pullRegion=${stream.pullRegion}` + ); } const deadline = Date.now() + 2 * PULL_START_TIMEOUT; From c5b569a617696096a7b709fc72dc76e94c6ff92f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Fri, 12 Apr 2024 12:25:34 +0200 Subject: [PATCH 7/7] Fix extractRegionFrom regex for staging --- packages/api/src/controllers/stream.test.ts | 2 +- packages/api/src/controllers/stream.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/api/src/controllers/stream.test.ts b/packages/api/src/controllers/stream.test.ts index efcdaf65df..22be226b66 100644 --- a/packages/api/src/controllers/stream.test.ts +++ b/packages/api/src/controllers/stream.test.ts @@ -728,7 +728,7 @@ describe("controllers/stream", () => { extractRegionFrom( "https://fra-staging-staging-catalyst-0.livepeer.monster:443/hls/video+not-used-playback/index.m3u8" ) - ).toBe("fra"); + ).toBe("fra-staging"); expect( extractRegionFrom( "https://fra-staging-staging-catalyst-0.livepeer.monster:443/hls/video+other-playback/index.m3u8" diff --git a/packages/api/src/controllers/stream.ts b/packages/api/src/controllers/stream.ts index 149df0754a..f7baae2699 100644 --- a/packages/api/src/controllers/stream.ts +++ b/packages/api/src/controllers/stream.ts @@ -265,7 +265,7 @@ async function resolvePullRegion( // Extracts region from redirected node URL, e.g. "sto" from "https://sto-prod-catalyst-0.lp-playback.studio:443/hls/video+foo/index.m3u8" export function extractRegionFrom(playbackUrl: string): string { const regionRegex = - /https?:\/\/(\w+)-.+-catalyst.+not-used-playback\/index.m3u8/; + /https?:\/\/(.+)-\w+-catalyst.+not-used-playback\/index.m3u8/; const matches = playbackUrl.match(regionRegex); return matches ? matches[1] : null; }