-
Notifications
You must be signed in to change notification settings - Fork 33
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add pullRegion to streams to make the ingest node sticky #2127
Changes from 4 commits
dbd5d9c
7a021ed
a1c8b34
8adb51a
d9b11cd
d6645e2
c5b569a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,7 @@ import logger from "../logger"; | |
import { authorizer } from "../middleware"; | ||
import { validatePost } from "../middleware"; | ||
import { geolocateMiddleware } from "../middleware"; | ||
import { fetchWithTimeout } from "../util"; | ||
import { CliArgs } from "../parse-cli"; | ||
import { | ||
DetectionWebhookPayload, | ||
|
@@ -235,6 +236,39 @@ async function triggerManyIdleStreamsWebhook(ids: string[], queue: Queue) { | |
); | ||
} | ||
|
||
async function resolvePullRegion( | ||
stream: NewStreamPayload, | ||
ingest: string | ||
): Promise<string> { | ||
if (process.env.NODE_ENV === "test") { | ||
return null; | ||
} | ||
const url = new URL( | ||
pathJoin(ingest, `hls`, "not-used-playback", `index.m3u8`) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. don't we need to handle flv as well? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, this call is not for the "any redirect", this is just to discover which should be the ingest node. So, kind-of hack the system and use catalyst-api redirect balancing (effectively using |
||
); | ||
const { lat, lon } = stream.pull?.location ?? {}; | ||
if (lat && lon) { | ||
url.searchParams.set("lat", lat.toString()); | ||
url.searchParams.set("lon", lon.toString()); | ||
} | ||
const playbackUrl = url.toString(); | ||
const response = await fetchWithTimeout(playbackUrl, { redirect: "manual" }); | ||
leszko marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (response.status < 300 || response.status >= 400) { | ||
// not a redirect response, so we can't determine the region | ||
return null; | ||
} | ||
const redirectUrl = response.headers.get("location"); | ||
return extractRegionFrom(redirectUrl); | ||
} | ||
|
||
// Extracts region from redirected node URL, e.g. "sto" from "https://sto-prod-catalyst-0.lp-playback.studio:443/hls/video+foo/index.m3u8" | ||
export function extractRegionFrom(playbackUrl: string): string { | ||
const regionRegex = | ||
/https?:\/\/(\w+)-.+-catalyst.+not-used-playback\/index.m3u8/; | ||
const matches = playbackUrl.match(regionRegex); | ||
return matches ? matches[1] : null; | ||
} | ||
|
||
export function getHLSPlaybackUrl(ingest: string, stream: DBStream) { | ||
return pathJoin(ingest, `hls`, stream.playbackId, `index.m3u8`); | ||
} | ||
|
@@ -1001,6 +1035,8 @@ app.put( | |
const { key = "pull.source", waitActive } = toStringValues(req.query); | ||
const rawPayload = req.body as NewStreamPayload; | ||
|
||
const ingest = await getIngestBase(req); | ||
|
||
if (!rawPayload.pull) { | ||
return res.status(400).json({ | ||
errors: [`stream pull configuration is required`], | ||
|
@@ -1046,9 +1082,13 @@ app.put( | |
} | ||
const streamExisted = streams.length === 1; | ||
|
||
const pullRegion = await resolvePullRegion(rawPayload, ingest); | ||
|
||
let stream: DBStream; | ||
if (!streamExisted) { | ||
stream = await handleCreateStream(req); | ||
stream.pullRegion = pullRegion; | ||
await db.stream.replace(stream); | ||
} else { | ||
const oldStream = streams[0]; | ||
const sleepFor = terminateDelay(oldStream); | ||
|
@@ -1063,6 +1103,7 @@ app.put( | |
...oldStream, | ||
...EMPTY_NEW_STREAM_PAYLOAD, // clear all fields that should be set from the payload | ||
suspended: false, | ||
pullRegion, | ||
...payload, | ||
}; | ||
await db.stream.replace(stream); | ||
|
@@ -1073,7 +1114,6 @@ app.put( | |
} | ||
|
||
if (!stream.isActive || streamExisted) { | ||
const ingest = await getIngestBase(req); | ||
await triggerCatalystPullStart(stream, getHLSPlaybackUrl(ingest, stream)); | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be good to have some flv urls in here to verify nothing breaks:
e.g
/flv/<playback-id>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As pointed out below, we don't use it for
/flv