Skip to content

Commit

Permalink
Stick to the pull host while triggering the catalyst pull start (#2138)
Browse files Browse the repository at this point in the history
  • Loading branch information
leszko authored Apr 22, 2024
1 parent 676d44d commit 29ad25a
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 13 deletions.
2 changes: 1 addition & 1 deletion packages/api/src/controllers/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ export const triggerCatalystPullStart =
url.searchParams.set("lon", lon.toString());
playbackUrl = url.toString();
console.log(
`triggering catalyst pull start for streamId=${stream.id} playbackId=${stream.playbackId} lat=${lat} lon=${lon} pullRegion=${stream.pullRegion}`
`triggering catalyst pull start for streamId=${stream.id} playbackId=${stream.playbackId} lat=${lat} lon=${lon} pullRegion=${stream.pullRegion}, playbackUrl=${playbackUrl}`
);
}

Expand Down
24 changes: 23 additions & 1 deletion packages/api/src/controllers/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import {
import serverPromise, { TestServer } from "../test-server";
import { semaphore, sleep } from "../util";
import { generateUniquePlaybackId } from "./generate-keys";
import { extractRegionFrom } from "./stream";
import { extractHostFrom, extractRegionFrom } from "./stream";

const uuidRegex = /[0-9a-f]+(-[0-9a-f]+){4}/;

Expand Down Expand Up @@ -713,6 +713,28 @@ describe("controllers/stream", () => {
const document = await db.stream.get(stream.id);
expect(db.stream.addDefaultFields(document)).toEqual(updatedStream);
});
it("should extract host from redirected playback url", async () => {
expect(
extractHostFrom(
"https://sto-prod-catalyst-0.lp-playback.studio:443/hls/video+not-used-playback/index.m3u8"
)
).toBe("https://sto-prod-catalyst-0.lp-playback.studio:443");
expect(
extractHostFrom(
"https://mos2-prod-catalyst-0.lp-playback.studio:443/hls/video+not-used-playback/index.m3u8"
)
).toBe("https://mos2-prod-catalyst-0.lp-playback.studio:443");
expect(
extractHostFrom(
"https://fra-staging-staging-catalyst-0.livepeer.monster:443/hls/video+not-used-playback/index.m3u8"
)
).toBe("https://fra-staging-staging-catalyst-0.livepeer.monster:443");
expect(
extractHostFrom(
"https://fra-staging-staging-catalyst-0.livepeer.monster:443/hls/video+other-playback/index.m3u8"
)
).toBe(null);
});
it("should extract region from redirected playback url", async () => {
expect(
extractRegionFrom(
Expand Down
39 changes: 28 additions & 11 deletions packages/api/src/controllers/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import logger from "../logger";
import { authorizer } from "../middleware";
import { validatePost } from "../middleware";
import { geolocateMiddleware } from "../middleware";
import { fetchWithTimeout } from "../util";
import { fetchWithTimeoutAndRedirects } from "../util";
import { CliArgs } from "../parse-cli";
import {
DetectionWebhookPayload,
Expand Down Expand Up @@ -236,12 +236,12 @@ async function triggerManyIdleStreamsWebhook(ids: string[], queue: Queue) {
);
}

async function resolvePullRegion(
async function resolvePullHostAndRegion(
stream: NewStreamPayload,
ingest: string
): Promise<string> {
): Promise<{ pullHost: string; pullRegion: string }> {
if (process.env.NODE_ENV === "test") {
return null;
return { pullHost: null, pullRegion: null };
}
const url = new URL(
pathJoin(ingest, `hls`, "not-used-playback", `index.m3u8`)
Expand All @@ -253,13 +253,23 @@ async function resolvePullRegion(
}
const playbackUrl = url.toString();
// Send any playback request to catalyst-api, which effectively resolves the region using MistUtilLoad
const response = await fetchWithTimeout(playbackUrl, { redirect: "manual" });
if (response.status < 300 || response.status >= 400) {
// not a redirect response, so we can't determine the region
const response = await fetchWithTimeoutAndRedirects(playbackUrl, {});
if (response.status !== 200) {
// not a correct status code, so we can't determine the region/host
return null;
}
const redirectUrl = response.headers.get("location");
return extractRegionFrom(redirectUrl);
return {
pullHost: extractHostFrom(response.url),
pullRegion: extractRegionFrom(response.url),
};
}

// Extracts host from redirected node URL, e.g. "https://sto-prod-catalyst-0.lp-playback.studio:443" from "https://sto-prod-catalyst-0.lp-playback.studio:443/hls/video+foo/index.m3u8"
export function extractHostFrom(playbackUrl: string): string {
const hostRegex =
/(https?:\/\/.+-\w+-catalyst.+)\/hls\/.+not-used-playback\/index.m3u8/;
const matches = playbackUrl.match(hostRegex);
return matches ? matches[1] : null;
}

// Extracts region from redirected node URL, e.g. "sto" from "https://sto-prod-catalyst-0.lp-playback.studio:443/hls/video+foo/index.m3u8"
Expand Down Expand Up @@ -1085,7 +1095,10 @@ app.put(
}
const streamExisted = streams.length === 1;

const pullRegion = await resolvePullRegion(rawPayload, ingest);
const { pullHost, pullRegion } = await resolvePullHostAndRegion(
rawPayload,
ingest
);

let stream: DBStream;
if (!streamExisted) {
Expand Down Expand Up @@ -1122,8 +1135,12 @@ app.put(
await triggerCatalystStreamUpdated(req, stream.playbackId);
}

// If pullHost was resolved, then stick to that host for triggering Catalyst pull start
const playbackUrl = pullHost
? getHLSPlaybackUrl(pullHost, stream)
: getHLSPlaybackUrl(ingest, stream);
if (!stream.isActive || streamExisted) {
await triggerCatalystPullStart(stream, getHLSPlaybackUrl(ingest, stream));
await triggerCatalystPullStart(stream, playbackUrl);
}

res.status(streamExisted ? 200 : 201);
Expand Down

0 comments on commit 29ad25a

Please sign in to comment.