diff --git a/js_modules/dagster-ui/packages/app-oss/src/App.tsx b/js_modules/dagster-ui/packages/app-oss/src/App.tsx index f44c84b2f9b29..7d2ac4e68c901 100644 --- a/js_modules/dagster-ui/packages/app-oss/src/App.tsx +++ b/js_modules/dagster-ui/packages/app-oss/src/App.tsx @@ -6,6 +6,7 @@ import {AppTopNav} from '@dagster-io/ui-core/app/AppTopNav'; import {ContentRoot} from '@dagster-io/ui-core/app/ContentRoot'; import {UserSettingsButton} from '@dagster-io/ui-core/app/UserSettingsButton'; import {logLink, timeStartLink} from '@dagster-io/ui-core/app/apolloLinks'; +import {LiveDataPollRateContext} from '@dagster-io/ui-core/asset-data/AssetLiveDataProvider'; import {DeploymentStatusType} from '@dagster-io/ui-core/instance/DeploymentStatusProvider'; import React from 'react'; @@ -13,7 +14,7 @@ import {CommunityNux} from './NUX/CommunityNux'; import {extractInitializationData} from './extractInitializationData'; import {telemetryLink} from './telemetryLink'; -const {pathPrefix, telemetryEnabled} = extractInitializationData(); +const {pathPrefix, telemetryEnabled, liveDataPollRate} = extractInitializationData(); const apolloLinks = [logLink, errorLink, timeStartLink]; @@ -37,14 +38,16 @@ const appCache = createAppCache(); // eslint-disable-next-line import/no-default-export export default function AppPage() { return ( - - - - - - - - - + + + + + + + + + + + ); } diff --git a/js_modules/dagster-ui/packages/app-oss/src/extractInitializationData.ts b/js_modules/dagster-ui/packages/app-oss/src/extractInitializationData.ts index 14ce8793b114f..bd8296776e20e 100644 --- a/js_modules/dagster-ui/packages/app-oss/src/extractInitializationData.ts +++ b/js_modules/dagster-ui/packages/app-oss/src/extractInitializationData.ts @@ -1,8 +1,10 @@ const ELEMENT_ID = 'initialization-data'; const PREFIX_PLACEHOLDER = '__PATH_PREFIX__'; const TELEMETRY_PLACEHOLDER = '__TELEMETRY_ENABLED__'; +const LIVE_DATA_POLL_RATE = '__LIVE_DATA_POLL_RATE__'; -let value: {pathPrefix: string; telemetryEnabled: boolean} | undefined = undefined; +let value: {pathPrefix: string; telemetryEnabled: boolean; liveDataPollRate?: number} | undefined = + undefined; // Determine the path prefix value, which is set server-side. // This value will be used for prefixing paths for the GraphQL @@ -10,6 +12,7 @@ let value: {pathPrefix: string; telemetryEnabled: boolean} | undefined = undefin export const extractInitializationData = (): { pathPrefix: string; telemetryEnabled: boolean; + liveDataPollRate?: number; } => { if (!value) { value = {pathPrefix: '', telemetryEnabled: false}; @@ -22,6 +25,9 @@ export const extractInitializationData = (): { if (parsed.telemetryEnabled !== TELEMETRY_PLACEHOLDER) { value.telemetryEnabled = parsed.telemetryEnabled; } + if (parsed.liveDataPollRate !== LIVE_DATA_POLL_RATE) { + value.liveDataPollRate = parsed.liveDataPollRate; + } } } return value; diff --git a/js_modules/dagster-ui/packages/app-oss/src/pages/_document.tsx b/js_modules/dagster-ui/packages/app-oss/src/pages/_document.tsx index b3b8fe3d427dc..4111160df449a 100644 --- a/js_modules/dagster-ui/packages/app-oss/src/pages/_document.tsx +++ b/js_modules/dagster-ui/packages/app-oss/src/pages/_document.tsx @@ -35,7 +35,8 @@ export default function Document() { __html: ` { "pathPrefix": "__PATH_PREFIX__", - "telemetryEnabled": "__TELEMETRY_ENABLED__" + "telemetryEnabled": "__TELEMETRY_ENABLED__", + "liveDataPollRate": "__LIVE_DATA_POLL_RATE__" } `, }} diff --git a/js_modules/dagster-ui/packages/ui-core/src/asset-data/AssetLiveDataProvider.tsx b/js_modules/dagster-ui/packages/ui-core/src/asset-data/AssetLiveDataProvider.tsx index 1b7913fbb3bbd..d93602ebb546c 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/asset-data/AssetLiveDataProvider.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/asset-data/AssetLiveDataProvider.tsx @@ -112,6 +112,8 @@ const BATCHING_INTERVAL = 250; export const SUBSCRIPTION_IDLE_POLL_RATE = 30 * 1000; const SUBSCRIPTION_MAX_POLL_RATE = 2 * 1000; +export const LiveDataPollRateContext = React.createContext(SUBSCRIPTION_IDLE_POLL_RATE); + type DataForNodeListener = (stringKey: string, data?: LiveDataForNode) => void; const AssetLiveDataContext = React.createContext<{ @@ -179,6 +181,8 @@ export const AssetLiveDataProvider = ({children}: {children: React.ReactNode}) = setOldestDataTimestamp(oldestDataTimestamp === Infinity ? 0 : oldestDataTimestamp); }, []); + const pollRate = React.useContext(LiveDataPollRateContext); + React.useEffect(() => { if (!isDocumentVisible) { return; @@ -186,26 +190,29 @@ export const AssetLiveDataProvider = ({children}: {children: React.ReactNode}) = // Check for assets to fetch every 5 seconds to simplify logic // This means assets will be fetched at most 5 + SUBSCRIPTION_IDLE_POLL_RATE after their first fetch // but then will be fetched every SUBSCRIPTION_IDLE_POLL_RATE after that - const interval = setInterval(() => fetchData(client, onUpdatingOrUpdated), 5000); - fetchData(client, onUpdatingOrUpdated); + const interval = setInterval( + () => fetchData(client, pollRate, onUpdatingOrUpdated), + Math.min(pollRate, 5000), + ); + fetchData(client, pollRate, onUpdatingOrUpdated); return () => { clearInterval(interval); }; - }, [client, isDocumentVisible, onUpdatingOrUpdated]); + }, [client, pollRate, isDocumentVisible, onUpdatingOrUpdated]); React.useEffect(() => { if (!needsImmediateFetch) { return; } const timeout = setTimeout(() => { - fetchData(client, onUpdatingOrUpdated); + fetchData(client, pollRate, onUpdatingOrUpdated); setNeedsImmediateFetch(false); // Wait BATCHING_INTERVAL before doing fetch in case the component is unmounted quickly (eg. in the case of scrolling/filtering quickly) }, BATCHING_INTERVAL); return () => { clearTimeout(timeout); }; - }, [client, needsImmediateFetch, onUpdatingOrUpdated]); + }, [client, needsImmediateFetch, pollRate, onUpdatingOrUpdated]); React.useEffect(() => { providerListener = (stringKey, assetData) => { @@ -297,6 +304,7 @@ let isFetching = false; async function _batchedQueryAssets( assetKeys: AssetKeyInput[], client: ApolloClient, + pollRate: number, setData: (data: Record) => void, onUpdatingOrUpdated: () => void, ) { @@ -314,11 +322,11 @@ async function _batchedQueryAssets( }); onUpdatingOrUpdated(); - function doNextFetch() { + function doNextFetch(pollRate: number) { isFetching = false; - const nextAssets = _determineAssetsToFetch(); + const nextAssets = _determineAssetsToFetch(pollRate); if (nextAssets.length) { - _batchedQueryAssets(nextAssets, client, setData, onUpdatingOrUpdated); + _batchedQueryAssets(nextAssets, client, pollRate, setData, onUpdatingOrUpdated); } } try { @@ -331,7 +339,7 @@ async function _batchedQueryAssets( }); setData(data); onUpdatingOrUpdated(); - doNextFetch(); + doNextFetch(pollRate); } catch (e) { console.error(e); // Retry fetching in 5 seconds if theres a network error @@ -369,7 +377,7 @@ function _unsubscribeToAssetKey(assetKey: AssetKeyInput, setData: DataForNodeLis } // Determine assets to fetch taking into account the last time they were fetched and whether they are already being fetched. -function _determineAssetsToFetch() { +function _determineAssetsToFetch(pollRate: number) { const assetsToFetch: AssetKeyInput[] = []; const assetsWithoutData: AssetKeyInput[] = []; const allKeys = Object.keys(_assetKeyListeners); @@ -380,7 +388,7 @@ function _determineAssetsToFetch() { continue; } const lastFetchTime = lastFetchedOrRequested[key]?.fetched ?? null; - if (lastFetchTime !== null && Date.now() - lastFetchTime < SUBSCRIPTION_IDLE_POLL_RATE) { + if (lastFetchTime !== null && Date.now() - lastFetchTime < pollRate) { continue; } if (lastFetchTime && isDocumentVisible()) { @@ -394,10 +402,11 @@ function _determineAssetsToFetch() { return assetsWithoutData.concat(assetsToFetch).slice(0, BATCH_SIZE); } -function fetchData(client: ApolloClient, onUpdatingOrUpdated: () => void) { +function fetchData(client: ApolloClient, pollRate: number, onUpdatingOrUpdated: () => void) { _batchedQueryAssets( - _determineAssetsToFetch(), + _determineAssetsToFetch(pollRate), client, + pollRate, (data) => { Object.entries(data).forEach(([key, assetData]) => { const listeners = _assetKeyListeners[key]; diff --git a/python_modules/dagster-webserver/dagster_webserver/app.py b/python_modules/dagster-webserver/dagster_webserver/app.py index 9223fa27f0d84..5d04ecd3c9c71 100644 --- a/python_modules/dagster-webserver/dagster_webserver/app.py +++ b/python_modules/dagster-webserver/dagster_webserver/app.py @@ -1,3 +1,5 @@ +from typing import Optional + from dagster import ( _check as check, ) @@ -12,6 +14,7 @@ def create_app_from_workspace_process_context( workspace_process_context: IWorkspaceProcessContext, path_prefix: str = "", + live_data_poll_rate: Optional[int] = None, **kwargs, ) -> Starlette: check.inst_param( @@ -34,4 +37,5 @@ def create_app_from_workspace_process_context( return DagsterWebserver( workspace_process_context, path_prefix, + live_data_poll_rate, ).create_asgi_app(**kwargs) diff --git a/python_modules/dagster-webserver/dagster_webserver/cli.py b/python_modules/dagster-webserver/dagster_webserver/cli.py index 80452a0bcdf41..1dc6a3393a404 100644 --- a/python_modules/dagster-webserver/dagster_webserver/cli.py +++ b/python_modules/dagster-webserver/dagster_webserver/cli.py @@ -162,6 +162,14 @@ def create_dagster_webserver_cli(): required=False, hidden=True, ) +@click.option( + "--live-data-poll-rate", + help="Rate at which the dagster UI polls for updated asset data (in milliseconds)", + type=click.INT, + required=False, + default=2000, + show_default=True, +) @click.version_option(version=__version__, prog_name="dagster-webserver") def dagster_webserver( host: str, @@ -175,6 +183,7 @@ def dagster_webserver( dagster_log_level: str, code_server_log_level: str, instance_ref: Optional[str], + live_data_poll_rate: int, **kwargs: ClickArgValue, ): if suppress_warnings: @@ -205,7 +214,12 @@ def dagster_webserver( code_server_log_level=code_server_log_level, ) as workspace_process_context: host_dagster_ui_with_workspace_process_context( - workspace_process_context, host, port, path_prefix, uvicorn_log_level + workspace_process_context, + host, + port, + path_prefix, + uvicorn_log_level, + live_data_poll_rate, ) @@ -227,6 +241,7 @@ def host_dagster_ui_with_workspace_process_context( port: Optional[int], path_prefix: str, log_level: str, + live_data_poll_rate: Optional[int] = None, ): check.inst_param( workspace_process_context, "workspace_process_context", IWorkspaceProcessContext @@ -234,11 +249,12 @@ def host_dagster_ui_with_workspace_process_context( host = check.opt_str_param(host, "host", "127.0.0.1") check.opt_int_param(port, "port") check.str_param(path_prefix, "path_prefix") + check.opt_int_param(live_data_poll_rate, "live_data_poll_rate") logger = logging.getLogger(WEBSERVER_LOGGER_NAME) app = create_app_from_workspace_process_context( - workspace_process_context, path_prefix, lifespan=_lifespan + workspace_process_context, path_prefix, live_data_poll_rate, lifespan=_lifespan ) if not port: diff --git a/python_modules/dagster-webserver/dagster_webserver/webserver.py b/python_modules/dagster-webserver/dagster_webserver/webserver.py index 37aa16e977aa6..96b10e04592ec 100644 --- a/python_modules/dagster-webserver/dagster_webserver/webserver.py +++ b/python_modules/dagster-webserver/dagster_webserver/webserver.py @@ -2,7 +2,7 @@ import io import uuid from os import path, walk -from typing import Generic, List, TypeVar +from typing import Generic, List, Optional, TypeVar import dagster._check as check from dagster import __version__ as dagster_version @@ -51,9 +51,11 @@ def __init__( self, process_context: T_IWorkspaceProcessContext, app_path_prefix: str = "", + live_data_poll_rate: Optional[int] = None, uses_app_path_prefix: bool = True, ): self._process_context = process_context + self._live_data_poll_rate = live_data_poll_rate self._uses_app_path_prefix = uses_app_path_prefix super().__init__(app_path_prefix) @@ -315,7 +317,7 @@ def index_html_endpoint(self, request: Request): **{"Content-Security-Policy": self.make_csp_header(nonce)}, **self.make_security_headers(), } - return HTMLResponse( + content = ( rendered_template.replace( "BUILDTIME_ASSETPREFIX_REPLACE_ME", f"{self._app_path_prefix}" ) @@ -323,9 +325,14 @@ def index_html_endpoint(self, request: Request): .replace( '"__TELEMETRY_ENABLED__"', str(context.instance.telemetry_enabled).lower() ) - .replace("NONCE-PLACEHOLDER", nonce), - headers=headers, + .replace("NONCE-PLACEHOLDER", nonce) ) + + if self._live_data_poll_rate: + content = content.replace( + "__LIVE_DATA_POLL_RATE__", str(self._live_data_poll_rate) + ) + return HTMLResponse(content, headers=headers) except FileNotFoundError: raise Exception(""" Can't find webapp files. diff --git a/python_modules/dagster/dagster/_cli/dev.py b/python_modules/dagster/dagster/_cli/dev.py index 6937e10c0bfef..ac49c7ad0c2aa 100644 --- a/python_modules/dagster/dagster/_cli/dev.py +++ b/python_modules/dagster/dagster/_cli/dev.py @@ -81,6 +81,13 @@ def dev_command_options(f): help="Host to use for the Dagster webserver.", required=False, ) +@click.option( + "--live-data-poll-rate", + help="Rate at which the dagster UI polls for updated asset data (in milliseconds)", + default="2000", + show_default=True, + required=False, +) @deprecated( breaking_version="2.0", subject="--dagit-port and --dagit-host args", emit_runtime_warning=False ) @@ -89,6 +96,7 @@ def dev_command( log_level: str, port: Optional[str], host: Optional[str], + live_data_poll_rate: Optional[str], **kwargs: ClickArgValue, ) -> None: # check if dagster-webserver installed, crash if not @@ -163,6 +171,7 @@ def dev_command( + (["--port", port] if port else []) + (["--host", host] if host else []) + (["--dagster-log-level", log_level]) + + (["--live-data-poll-rate", live_data_poll_rate] if live_data_poll_rate else []) + args ) daemon_process = open_ipc_subprocess(