Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase live data poll rate to 2 seconds in OSS only #17000

Merged
merged 6 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions js_modules/dagster-ui/packages/app-oss/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import {AppTopNav} from '@dagster-io/ui-core/app/AppTopNav';
import {ContentRoot} from '@dagster-io/ui-core/app/ContentRoot';
import {UserSettingsButton} from '@dagster-io/ui-core/app/UserSettingsButton';
import {logLink, timeStartLink} from '@dagster-io/ui-core/app/apolloLinks';
import {LiveDataPollRateContext} from '@dagster-io/ui-core/asset-data/AssetLiveDataProvider';
import {DeploymentStatusType} from '@dagster-io/ui-core/instance/DeploymentStatusProvider';
import React from 'react';

import {CommunityNux} from './NUX/CommunityNux';
import {extractInitializationData} from './extractInitializationData';
import {telemetryLink} from './telemetryLink';

const {pathPrefix, telemetryEnabled} = extractInitializationData();
const {pathPrefix, telemetryEnabled, liveDataPollRate} = extractInitializationData();

const apolloLinks = [logLink, errorLink, timeStartLink];

Expand All @@ -37,14 +38,16 @@ const appCache = createAppCache();
// eslint-disable-next-line import/no-default-export
export default function AppPage() {
return (
<AppProvider appCache={appCache} config={config}>
<AppTopNav searchPlaceholder="Search…" allowGlobalReload>
<UserSettingsButton />
</AppTopNav>
<App>
<ContentRoot />
<CommunityNux />
</App>
</AppProvider>
<LiveDataPollRateContext.Provider value={liveDataPollRate ?? 2000}>
<AppProvider appCache={appCache} config={config}>
<AppTopNav searchPlaceholder="Search…" allowGlobalReload>
<UserSettingsButton />
</AppTopNav>
<App>
<ContentRoot />
<CommunityNux />
</App>
</AppProvider>
</LiveDataPollRateContext.Provider>
);
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
const ELEMENT_ID = 'initialization-data';
const PREFIX_PLACEHOLDER = '__PATH_PREFIX__';
const TELEMETRY_PLACEHOLDER = '__TELEMETRY_ENABLED__';
const LIVE_DATA_POLL_RATE = '__LIVE_DATA_POLL_RATE__';

let value: {pathPrefix: string; telemetryEnabled: boolean} | undefined = undefined;
let value: {pathPrefix: string; telemetryEnabled: boolean; liveDataPollRate?: number} | undefined =
undefined;

// Determine the path prefix value, which is set server-side.
// This value will be used for prefixing paths for the GraphQL
// endpoint and dynamically loaded bundles.
export const extractInitializationData = (): {
pathPrefix: string;
telemetryEnabled: boolean;
liveDataPollRate?: number;
} => {
if (!value) {
value = {pathPrefix: '', telemetryEnabled: false};
Expand All @@ -22,6 +25,9 @@ export const extractInitializationData = (): {
if (parsed.telemetryEnabled !== TELEMETRY_PLACEHOLDER) {
value.telemetryEnabled = parsed.telemetryEnabled;
}
if (parsed.liveDataPollRate !== LIVE_DATA_POLL_RATE) {
value.liveDataPollRate = parsed.liveDataPollRate;
}
}
}
return value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ export default function Document() {
__html: `
{
"pathPrefix": "__PATH_PREFIX__",
"telemetryEnabled": "__TELEMETRY_ENABLED__"
"telemetryEnabled": "__TELEMETRY_ENABLED__",
"liveDataPollRate": "__LIVE_DATA_POLL_RATE__"
}
`,
}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ const BATCHING_INTERVAL = 250;
export const SUBSCRIPTION_IDLE_POLL_RATE = 30 * 1000;
const SUBSCRIPTION_MAX_POLL_RATE = 2 * 1000;

export const LiveDataPollRateContext = React.createContext<number>(SUBSCRIPTION_IDLE_POLL_RATE);

type DataForNodeListener = (stringKey: string, data?: LiveDataForNode) => void;

const AssetLiveDataContext = React.createContext<{
Expand Down Expand Up @@ -179,33 +181,38 @@ export const AssetLiveDataProvider = ({children}: {children: React.ReactNode}) =
setOldestDataTimestamp(oldestDataTimestamp === Infinity ? 0 : oldestDataTimestamp);
}, []);

const pollRate = React.useContext(LiveDataPollRateContext);

React.useEffect(() => {
if (!isDocumentVisible) {
return;
}
// Check for assets to fetch every 5 seconds to simplify logic
// This means assets will be fetched at most 5 + SUBSCRIPTION_IDLE_POLL_RATE after their first fetch
// but then will be fetched every SUBSCRIPTION_IDLE_POLL_RATE after that
const interval = setInterval(() => fetchData(client, onUpdatingOrUpdated), 5000);
fetchData(client, onUpdatingOrUpdated);
const interval = setInterval(
() => fetchData(client, pollRate, onUpdatingOrUpdated),
Math.min(pollRate, 5000),
);
fetchData(client, pollRate, onUpdatingOrUpdated);
return () => {
clearInterval(interval);
};
}, [client, isDocumentVisible, onUpdatingOrUpdated]);
}, [client, pollRate, isDocumentVisible, onUpdatingOrUpdated]);

React.useEffect(() => {
if (!needsImmediateFetch) {
return;
}
const timeout = setTimeout(() => {
fetchData(client, onUpdatingOrUpdated);
fetchData(client, pollRate, onUpdatingOrUpdated);
setNeedsImmediateFetch(false);
// Wait BATCHING_INTERVAL before doing fetch in case the component is unmounted quickly (eg. in the case of scrolling/filtering quickly)
}, BATCHING_INTERVAL);
return () => {
clearTimeout(timeout);
};
}, [client, needsImmediateFetch, onUpdatingOrUpdated]);
}, [client, needsImmediateFetch, pollRate, onUpdatingOrUpdated]);

React.useEffect(() => {
providerListener = (stringKey, assetData) => {
Expand Down Expand Up @@ -297,6 +304,7 @@ let isFetching = false;
async function _batchedQueryAssets(
assetKeys: AssetKeyInput[],
client: ApolloClient<any>,
pollRate: number,
setData: (data: Record<string, LiveDataForNode>) => void,
onUpdatingOrUpdated: () => void,
) {
Expand All @@ -314,11 +322,11 @@ async function _batchedQueryAssets(
});
onUpdatingOrUpdated();

function doNextFetch() {
function doNextFetch(pollRate: number) {
isFetching = false;
const nextAssets = _determineAssetsToFetch();
const nextAssets = _determineAssetsToFetch(pollRate);
if (nextAssets.length) {
_batchedQueryAssets(nextAssets, client, setData, onUpdatingOrUpdated);
_batchedQueryAssets(nextAssets, client, pollRate, setData, onUpdatingOrUpdated);
}
}
try {
Expand All @@ -331,7 +339,7 @@ async function _batchedQueryAssets(
});
setData(data);
onUpdatingOrUpdated();
doNextFetch();
doNextFetch(pollRate);
} catch (e) {
console.error(e);
// Retry fetching in 5 seconds if theres a network error
Expand Down Expand Up @@ -369,7 +377,7 @@ function _unsubscribeToAssetKey(assetKey: AssetKeyInput, setData: DataForNodeLis
}

// Determine assets to fetch taking into account the last time they were fetched and whether they are already being fetched.
function _determineAssetsToFetch() {
function _determineAssetsToFetch(pollRate: number) {
const assetsToFetch: AssetKeyInput[] = [];
const assetsWithoutData: AssetKeyInput[] = [];
const allKeys = Object.keys(_assetKeyListeners);
Expand All @@ -380,7 +388,7 @@ function _determineAssetsToFetch() {
continue;
}
const lastFetchTime = lastFetchedOrRequested[key]?.fetched ?? null;
if (lastFetchTime !== null && Date.now() - lastFetchTime < SUBSCRIPTION_IDLE_POLL_RATE) {
if (lastFetchTime !== null && Date.now() - lastFetchTime < pollRate) {
continue;
}
if (lastFetchTime && isDocumentVisible()) {
Expand All @@ -394,10 +402,11 @@ function _determineAssetsToFetch() {
return assetsWithoutData.concat(assetsToFetch).slice(0, BATCH_SIZE);
}

function fetchData(client: ApolloClient<any>, onUpdatingOrUpdated: () => void) {
function fetchData(client: ApolloClient<any>, pollRate: number, onUpdatingOrUpdated: () => void) {
_batchedQueryAssets(
_determineAssetsToFetch(),
_determineAssetsToFetch(pollRate),
client,
pollRate,
(data) => {
Object.entries(data).forEach(([key, assetData]) => {
const listeners = _assetKeyListeners[key];
Expand Down
4 changes: 4 additions & 0 deletions python_modules/dagster-webserver/dagster_webserver/app.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Optional

from dagster import (
_check as check,
)
Expand All @@ -12,6 +14,7 @@
def create_app_from_workspace_process_context(
workspace_process_context: IWorkspaceProcessContext,
path_prefix: str = "",
live_data_poll_rate: Optional[int] = None,
**kwargs,
) -> Starlette:
check.inst_param(
Expand All @@ -34,4 +37,5 @@ def create_app_from_workspace_process_context(
return DagsterWebserver(
workspace_process_context,
path_prefix,
live_data_poll_rate,
).create_asgi_app(**kwargs)
20 changes: 18 additions & 2 deletions python_modules/dagster-webserver/dagster_webserver/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,14 @@ def create_dagster_webserver_cli():
required=False,
hidden=True,
)
@click.option(
"--live-data-poll-rate",
help="Rate at which the dagster UI polls for updated asset data (in milliseconds)",
type=click.INT,
required=False,
default=2000,
show_default=True,
)
@click.version_option(version=__version__, prog_name="dagster-webserver")
def dagster_webserver(
host: str,
Expand All @@ -175,6 +183,7 @@ def dagster_webserver(
dagster_log_level: str,
code_server_log_level: str,
instance_ref: Optional[str],
live_data_poll_rate: int,
**kwargs: ClickArgValue,
):
if suppress_warnings:
Expand Down Expand Up @@ -205,7 +214,12 @@ def dagster_webserver(
code_server_log_level=code_server_log_level,
) as workspace_process_context:
host_dagster_ui_with_workspace_process_context(
workspace_process_context, host, port, path_prefix, uvicorn_log_level
workspace_process_context,
host,
port,
path_prefix,
uvicorn_log_level,
live_data_poll_rate,
)


Expand All @@ -227,18 +241,20 @@ def host_dagster_ui_with_workspace_process_context(
port: Optional[int],
path_prefix: str,
log_level: str,
live_data_poll_rate: Optional[int] = None,
):
check.inst_param(
workspace_process_context, "workspace_process_context", IWorkspaceProcessContext
)
host = check.opt_str_param(host, "host", "127.0.0.1")
check.opt_int_param(port, "port")
check.str_param(path_prefix, "path_prefix")
check.opt_int_param(live_data_poll_rate, "live_data_poll_rate")

logger = logging.getLogger(WEBSERVER_LOGGER_NAME)

app = create_app_from_workspace_process_context(
workspace_process_context, path_prefix, lifespan=_lifespan
workspace_process_context, path_prefix, live_data_poll_rate, lifespan=_lifespan
)

if not port:
Expand Down
15 changes: 11 additions & 4 deletions python_modules/dagster-webserver/dagster_webserver/webserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import io
import uuid
from os import path, walk
from typing import Generic, List, TypeVar
from typing import Generic, List, Optional, TypeVar

import dagster._check as check
from dagster import __version__ as dagster_version
Expand Down Expand Up @@ -46,9 +46,11 @@ def __init__(
self,
process_context: T_IWorkspaceProcessContext,
app_path_prefix: str = "",
live_data_poll_rate: Optional[int] = None,
uses_app_path_prefix: bool = True,
):
self._process_context = process_context
self._live_data_poll_rate = live_data_poll_rate
self._uses_app_path_prefix = uses_app_path_prefix
super().__init__(app_path_prefix)

Expand Down Expand Up @@ -209,17 +211,22 @@ def index_html_endpoint(self, request: Request):
**{"Content-Security-Policy": self.make_csp_header(nonce)},
**self.make_security_headers(),
}
return HTMLResponse(
content = (
rendered_template.replace(
"BUILDTIME_ASSETPREFIX_REPLACE_ME", f"{self._app_path_prefix}"
)
.replace("__PATH_PREFIX__", self._app_path_prefix)
.replace(
'"__TELEMETRY_ENABLED__"', str(context.instance.telemetry_enabled).lower()
)
.replace("NONCE-PLACEHOLDER", nonce),
headers=headers,
.replace("NONCE-PLACEHOLDER", nonce)
)

if self._live_data_poll_rate:
content = content.replace(
"__LIVE_DATA_POLL_RATE__", str(self._live_data_poll_rate)
)
return HTMLResponse(content, headers=headers)
except FileNotFoundError:
raise Exception("""
Can't find webapp files.
Expand Down
9 changes: 9 additions & 0 deletions python_modules/dagster/dagster/_cli/dev.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ def dev_command_options(f):
help="Host to use for the Dagster webserver.",
required=False,
)
@click.option(
"--live-data-poll-rate",
help="Rate at which the dagster UI polls for updated asset data (in milliseconds)",
default="2000",
show_default=True,
required=False,
)
@deprecated(
breaking_version="2.0", subject="--dagit-port and --dagit-host args", emit_runtime_warning=False
)
Expand All @@ -89,6 +96,7 @@ def dev_command(
log_level: str,
port: Optional[str],
host: Optional[str],
live_data_poll_rate: Optional[str],
**kwargs: ClickArgValue,
) -> None:
# check if dagster-webserver installed, crash if not
Expand Down Expand Up @@ -163,6 +171,7 @@ def dev_command(
+ (["--port", port] if port else [])
+ (["--host", host] if host else [])
+ (["--dagster-log-level", log_level])
+ (["--live-data-poll-rate", live_data_poll_rate] if live_data_poll_rate else [])
+ args
)
daemon_process = open_ipc_subprocess(
Expand Down