From dd1399d1b295ea17293bd5bf28c516507caa3bb9 Mon Sep 17 00:00:00 2001 From: Marco Salazar Date: Wed, 4 Oct 2023 09:13:29 -0400 Subject: [PATCH 1/6] increase live data poll-rate to 2seconds in OSS only --- .../dagster-ui/packages/app-oss/src/App.tsx | 21 ++++++----- .../src/asset-data/AssetLiveDataProvider.tsx | 35 ++++++++++++------- 2 files changed, 34 insertions(+), 22 deletions(-) diff --git a/js_modules/dagster-ui/packages/app-oss/src/App.tsx b/js_modules/dagster-ui/packages/app-oss/src/App.tsx index f44c84b2f9b29..d918d36372954 100644 --- a/js_modules/dagster-ui/packages/app-oss/src/App.tsx +++ b/js_modules/dagster-ui/packages/app-oss/src/App.tsx @@ -6,6 +6,7 @@ import {AppTopNav} from '@dagster-io/ui-core/app/AppTopNav'; import {ContentRoot} from '@dagster-io/ui-core/app/ContentRoot'; import {UserSettingsButton} from '@dagster-io/ui-core/app/UserSettingsButton'; import {logLink, timeStartLink} from '@dagster-io/ui-core/app/apolloLinks'; +import {LiveDataPollRateContext} from '@dagster-io/ui-core/asset-data/AssetLiveDataProvider'; import {DeploymentStatusType} from '@dagster-io/ui-core/instance/DeploymentStatusProvider'; import React from 'react'; @@ -37,14 +38,16 @@ const appCache = createAppCache(); // eslint-disable-next-line import/no-default-export export default function AppPage() { return ( - - - - - - - - - + + + + + + + + + + + ); } diff --git a/js_modules/dagster-ui/packages/ui-core/src/asset-data/AssetLiveDataProvider.tsx b/js_modules/dagster-ui/packages/ui-core/src/asset-data/AssetLiveDataProvider.tsx index 1b7913fbb3bbd..d93602ebb546c 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/asset-data/AssetLiveDataProvider.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/asset-data/AssetLiveDataProvider.tsx @@ -112,6 +112,8 @@ const BATCHING_INTERVAL = 250; export const SUBSCRIPTION_IDLE_POLL_RATE = 30 * 1000; const SUBSCRIPTION_MAX_POLL_RATE = 2 * 1000; +export const LiveDataPollRateContext = React.createContext(SUBSCRIPTION_IDLE_POLL_RATE); + type DataForNodeListener = (stringKey: string, data?: LiveDataForNode) => void; const AssetLiveDataContext = React.createContext<{ @@ -179,6 +181,8 @@ export const AssetLiveDataProvider = ({children}: {children: React.ReactNode}) = setOldestDataTimestamp(oldestDataTimestamp === Infinity ? 0 : oldestDataTimestamp); }, []); + const pollRate = React.useContext(LiveDataPollRateContext); + React.useEffect(() => { if (!isDocumentVisible) { return; @@ -186,26 +190,29 @@ export const AssetLiveDataProvider = ({children}: {children: React.ReactNode}) = // Check for assets to fetch every 5 seconds to simplify logic // This means assets will be fetched at most 5 + SUBSCRIPTION_IDLE_POLL_RATE after their first fetch // but then will be fetched every SUBSCRIPTION_IDLE_POLL_RATE after that - const interval = setInterval(() => fetchData(client, onUpdatingOrUpdated), 5000); - fetchData(client, onUpdatingOrUpdated); + const interval = setInterval( + () => fetchData(client, pollRate, onUpdatingOrUpdated), + Math.min(pollRate, 5000), + ); + fetchData(client, pollRate, onUpdatingOrUpdated); return () => { clearInterval(interval); }; - }, [client, isDocumentVisible, onUpdatingOrUpdated]); + }, [client, pollRate, isDocumentVisible, onUpdatingOrUpdated]); React.useEffect(() => { if (!needsImmediateFetch) { return; } const timeout = setTimeout(() => { - fetchData(client, onUpdatingOrUpdated); + fetchData(client, pollRate, onUpdatingOrUpdated); setNeedsImmediateFetch(false); // Wait BATCHING_INTERVAL before doing fetch in case the component is unmounted quickly (eg. in the case of scrolling/filtering quickly) }, BATCHING_INTERVAL); return () => { clearTimeout(timeout); }; - }, [client, needsImmediateFetch, onUpdatingOrUpdated]); + }, [client, needsImmediateFetch, pollRate, onUpdatingOrUpdated]); React.useEffect(() => { providerListener = (stringKey, assetData) => { @@ -297,6 +304,7 @@ let isFetching = false; async function _batchedQueryAssets( assetKeys: AssetKeyInput[], client: ApolloClient, + pollRate: number, setData: (data: Record) => void, onUpdatingOrUpdated: () => void, ) { @@ -314,11 +322,11 @@ async function _batchedQueryAssets( }); onUpdatingOrUpdated(); - function doNextFetch() { + function doNextFetch(pollRate: number) { isFetching = false; - const nextAssets = _determineAssetsToFetch(); + const nextAssets = _determineAssetsToFetch(pollRate); if (nextAssets.length) { - _batchedQueryAssets(nextAssets, client, setData, onUpdatingOrUpdated); + _batchedQueryAssets(nextAssets, client, pollRate, setData, onUpdatingOrUpdated); } } try { @@ -331,7 +339,7 @@ async function _batchedQueryAssets( }); setData(data); onUpdatingOrUpdated(); - doNextFetch(); + doNextFetch(pollRate); } catch (e) { console.error(e); // Retry fetching in 5 seconds if theres a network error @@ -369,7 +377,7 @@ function _unsubscribeToAssetKey(assetKey: AssetKeyInput, setData: DataForNodeLis } // Determine assets to fetch taking into account the last time they were fetched and whether they are already being fetched. -function _determineAssetsToFetch() { +function _determineAssetsToFetch(pollRate: number) { const assetsToFetch: AssetKeyInput[] = []; const assetsWithoutData: AssetKeyInput[] = []; const allKeys = Object.keys(_assetKeyListeners); @@ -380,7 +388,7 @@ function _determineAssetsToFetch() { continue; } const lastFetchTime = lastFetchedOrRequested[key]?.fetched ?? null; - if (lastFetchTime !== null && Date.now() - lastFetchTime < SUBSCRIPTION_IDLE_POLL_RATE) { + if (lastFetchTime !== null && Date.now() - lastFetchTime < pollRate) { continue; } if (lastFetchTime && isDocumentVisible()) { @@ -394,10 +402,11 @@ function _determineAssetsToFetch() { return assetsWithoutData.concat(assetsToFetch).slice(0, BATCH_SIZE); } -function fetchData(client: ApolloClient, onUpdatingOrUpdated: () => void) { +function fetchData(client: ApolloClient, pollRate: number, onUpdatingOrUpdated: () => void) { _batchedQueryAssets( - _determineAssetsToFetch(), + _determineAssetsToFetch(pollRate), client, + pollRate, (data) => { Object.entries(data).forEach(([key, assetData]) => { const listeners = _assetKeyListeners[key]; From ceb223f162d2fbb740e15bb0b353b2e130bc37c5 Mon Sep 17 00:00:00 2001 From: Marco Salazar Date: Mon, 9 Oct 2023 11:30:43 -0400 Subject: [PATCH 2/6] add CLI argument for live data poll rate --- .../dagster-ui/packages/app-oss/src/App.tsx | 4 ++-- .../app-oss/src/extractInitializationData.ts | 8 +++++++- .../packages/app-oss/src/pages/_document.tsx | 3 ++- .../dagster-webserver/dagster_webserver/app.py | 3 +++ .../dagster-webserver/dagster_webserver/cli.py | 15 +++++++++++++-- .../dagster_webserver/webserver.py | 18 +++++++++++------- python_modules/dagster/dagster/_cli/dev.py | 9 +++++++++ 7 files changed, 47 insertions(+), 13 deletions(-) diff --git a/js_modules/dagster-ui/packages/app-oss/src/App.tsx b/js_modules/dagster-ui/packages/app-oss/src/App.tsx index d918d36372954..7d2ac4e68c901 100644 --- a/js_modules/dagster-ui/packages/app-oss/src/App.tsx +++ b/js_modules/dagster-ui/packages/app-oss/src/App.tsx @@ -14,7 +14,7 @@ import {CommunityNux} from './NUX/CommunityNux'; import {extractInitializationData} from './extractInitializationData'; import {telemetryLink} from './telemetryLink'; -const {pathPrefix, telemetryEnabled} = extractInitializationData(); +const {pathPrefix, telemetryEnabled, liveDataPollRate} = extractInitializationData(); const apolloLinks = [logLink, errorLink, timeStartLink]; @@ -38,7 +38,7 @@ const appCache = createAppCache(); // eslint-disable-next-line import/no-default-export export default function AppPage() { return ( - + diff --git a/js_modules/dagster-ui/packages/app-oss/src/extractInitializationData.ts b/js_modules/dagster-ui/packages/app-oss/src/extractInitializationData.ts index 14ce8793b114f..bd8296776e20e 100644 --- a/js_modules/dagster-ui/packages/app-oss/src/extractInitializationData.ts +++ b/js_modules/dagster-ui/packages/app-oss/src/extractInitializationData.ts @@ -1,8 +1,10 @@ const ELEMENT_ID = 'initialization-data'; const PREFIX_PLACEHOLDER = '__PATH_PREFIX__'; const TELEMETRY_PLACEHOLDER = '__TELEMETRY_ENABLED__'; +const LIVE_DATA_POLL_RATE = '__LIVE_DATA_POLL_RATE__'; -let value: {pathPrefix: string; telemetryEnabled: boolean} | undefined = undefined; +let value: {pathPrefix: string; telemetryEnabled: boolean; liveDataPollRate?: number} | undefined = + undefined; // Determine the path prefix value, which is set server-side. // This value will be used for prefixing paths for the GraphQL @@ -10,6 +12,7 @@ let value: {pathPrefix: string; telemetryEnabled: boolean} | undefined = undefin export const extractInitializationData = (): { pathPrefix: string; telemetryEnabled: boolean; + liveDataPollRate?: number; } => { if (!value) { value = {pathPrefix: '', telemetryEnabled: false}; @@ -22,6 +25,9 @@ export const extractInitializationData = (): { if (parsed.telemetryEnabled !== TELEMETRY_PLACEHOLDER) { value.telemetryEnabled = parsed.telemetryEnabled; } + if (parsed.liveDataPollRate !== LIVE_DATA_POLL_RATE) { + value.liveDataPollRate = parsed.liveDataPollRate; + } } } return value; diff --git a/js_modules/dagster-ui/packages/app-oss/src/pages/_document.tsx b/js_modules/dagster-ui/packages/app-oss/src/pages/_document.tsx index b3b8fe3d427dc..4111160df449a 100644 --- a/js_modules/dagster-ui/packages/app-oss/src/pages/_document.tsx +++ b/js_modules/dagster-ui/packages/app-oss/src/pages/_document.tsx @@ -35,7 +35,8 @@ export default function Document() { __html: ` { "pathPrefix": "__PATH_PREFIX__", - "telemetryEnabled": "__TELEMETRY_ENABLED__" + "telemetryEnabled": "__TELEMETRY_ENABLED__", + "liveDataPollRate": "__LIVE_DATA_POLL_RATE__" } `, }} diff --git a/python_modules/dagster-webserver/dagster_webserver/app.py b/python_modules/dagster-webserver/dagster_webserver/app.py index 9223fa27f0d84..d96a6278a6be7 100644 --- a/python_modules/dagster-webserver/dagster_webserver/app.py +++ b/python_modules/dagster-webserver/dagster_webserver/app.py @@ -1,3 +1,4 @@ +from typing import Optional from dagster import ( _check as check, ) @@ -12,6 +13,7 @@ def create_app_from_workspace_process_context( workspace_process_context: IWorkspaceProcessContext, path_prefix: str = "", + live_data_poll_rate: Optional[int] = None, **kwargs, ) -> Starlette: check.inst_param( @@ -34,4 +36,5 @@ def create_app_from_workspace_process_context( return DagsterWebserver( workspace_process_context, path_prefix, + live_data_poll_rate, ).create_asgi_app(**kwargs) diff --git a/python_modules/dagster-webserver/dagster_webserver/cli.py b/python_modules/dagster-webserver/dagster_webserver/cli.py index 80452a0bcdf41..1990556f109db 100644 --- a/python_modules/dagster-webserver/dagster_webserver/cli.py +++ b/python_modules/dagster-webserver/dagster_webserver/cli.py @@ -162,6 +162,14 @@ def create_dagster_webserver_cli(): required=False, hidden=True, ) +@click.option( + "--live-data-poll-rate", + help="Rate at which the dagster UI polls for updated asset data (defaults to 2 seconds)", + type=click.INT, + required=False, + default=2000, + show_default=True +) @click.version_option(version=__version__, prog_name="dagster-webserver") def dagster_webserver( host: str, @@ -175,6 +183,7 @@ def dagster_webserver( dagster_log_level: str, code_server_log_level: str, instance_ref: Optional[str], + live_data_poll_rate: int, **kwargs: ClickArgValue, ): if suppress_warnings: @@ -205,7 +214,7 @@ def dagster_webserver( code_server_log_level=code_server_log_level, ) as workspace_process_context: host_dagster_ui_with_workspace_process_context( - workspace_process_context, host, port, path_prefix, uvicorn_log_level + workspace_process_context, host, port, path_prefix, uvicorn_log_level, live_data_poll_rate ) @@ -227,6 +236,7 @@ def host_dagster_ui_with_workspace_process_context( port: Optional[int], path_prefix: str, log_level: str, + live_data_poll_rate: int, ): check.inst_param( workspace_process_context, "workspace_process_context", IWorkspaceProcessContext @@ -234,11 +244,12 @@ def host_dagster_ui_with_workspace_process_context( host = check.opt_str_param(host, "host", "127.0.0.1") check.opt_int_param(port, "port") check.str_param(path_prefix, "path_prefix") + check.int_param(live_data_poll_rate, "live_data_poll_rate") logger = logging.getLogger(WEBSERVER_LOGGER_NAME) app = create_app_from_workspace_process_context( - workspace_process_context, path_prefix, lifespan=_lifespan + workspace_process_context, path_prefix, live_data_poll_rate, lifespan=_lifespan ) if not port: diff --git a/python_modules/dagster-webserver/dagster_webserver/webserver.py b/python_modules/dagster-webserver/dagster_webserver/webserver.py index 0973f43c7b5ac..5f06133435ef9 100644 --- a/python_modules/dagster-webserver/dagster_webserver/webserver.py +++ b/python_modules/dagster-webserver/dagster_webserver/webserver.py @@ -2,7 +2,7 @@ import io import uuid from os import path, walk -from typing import Generic, List, TypeVar +from typing import Generic, List, Optional, TypeVar import dagster._check as check from dagster import __version__ as dagster_version @@ -40,16 +40,15 @@ class DagsterWebserver(GraphQLServer, Generic[T_IWorkspaceProcessContext]): _process_context: T_IWorkspaceProcessContext - _uses_app_path_prefix: bool def __init__( self, process_context: T_IWorkspaceProcessContext, app_path_prefix: str = "", - uses_app_path_prefix: bool = True, + live_data_poll_rate: Optional[int] = None, ): self._process_context = process_context - self._uses_app_path_prefix = uses_app_path_prefix + self._live_data_poll_rate = live_data_poll_rate super().__init__(app_path_prefix) def build_graphql_schema(self) -> Schema: @@ -209,7 +208,7 @@ def index_html_endpoint(self, request: Request): **{"Content-Security-Policy": self.make_csp_header(nonce)}, **self.make_security_headers(), } - return HTMLResponse( + content = ( rendered_template.replace( "BUILDTIME_ASSETPREFIX_REPLACE_ME", f"{self._app_path_prefix}" ) @@ -217,9 +216,14 @@ def index_html_endpoint(self, request: Request): .replace( '"__TELEMETRY_ENABLED__"', str(context.instance.telemetry_enabled).lower() ) - .replace("NONCE-PLACEHOLDER", nonce), - headers=headers, + .replace("NONCE-PLACEHOLDER", nonce) ) + + if self._live_data_poll_rate: + content = content.replace( + "__LIVE_DATA_POLL_RATE__", str(self._live_data_poll_rate) + ) + return HTMLResponse(content, headers=headers) except FileNotFoundError: raise Exception(""" Can't find webapp files. diff --git a/python_modules/dagster/dagster/_cli/dev.py b/python_modules/dagster/dagster/_cli/dev.py index 6937e10c0bfef..6755db5d1c5a7 100644 --- a/python_modules/dagster/dagster/_cli/dev.py +++ b/python_modules/dagster/dagster/_cli/dev.py @@ -81,6 +81,13 @@ def dev_command_options(f): help="Host to use for the Dagster webserver.", required=False, ) +@click.option( + "--live-data-poll-rate", + help="Rate at which the dagster UI polls for updated asset data ", + default="2000", + show_default=True, + required=False, +) @deprecated( breaking_version="2.0", subject="--dagit-port and --dagit-host args", emit_runtime_warning=False ) @@ -89,6 +96,7 @@ def dev_command( log_level: str, port: Optional[str], host: Optional[str], + live_data_poll_rate: Optional[str], **kwargs: ClickArgValue, ) -> None: # check if dagster-webserver installed, crash if not @@ -163,6 +171,7 @@ def dev_command( + (["--port", port] if port else []) + (["--host", host] if host else []) + (["--dagster-log-level", log_level]) + + (["--live-data-poll-rate", live_data_poll_rate] if live_data_poll_rate else []) + args ) daemon_process = open_ipc_subprocess( From a1406489f9e57cbe75ce5673d95b6068064db508 Mon Sep 17 00:00:00 2001 From: Marco Salazar Date: Mon, 9 Oct 2023 12:01:02 -0400 Subject: [PATCH 3/6] ruff/black --- .../dagster-webserver/dagster_webserver/app.py | 1 + .../dagster-webserver/dagster_webserver/cli.py | 9 +++++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/python_modules/dagster-webserver/dagster_webserver/app.py b/python_modules/dagster-webserver/dagster_webserver/app.py index d96a6278a6be7..5d04ecd3c9c71 100644 --- a/python_modules/dagster-webserver/dagster_webserver/app.py +++ b/python_modules/dagster-webserver/dagster_webserver/app.py @@ -1,4 +1,5 @@ from typing import Optional + from dagster import ( _check as check, ) diff --git a/python_modules/dagster-webserver/dagster_webserver/cli.py b/python_modules/dagster-webserver/dagster_webserver/cli.py index 1990556f109db..0c82e38c2de92 100644 --- a/python_modules/dagster-webserver/dagster_webserver/cli.py +++ b/python_modules/dagster-webserver/dagster_webserver/cli.py @@ -168,7 +168,7 @@ def create_dagster_webserver_cli(): type=click.INT, required=False, default=2000, - show_default=True + show_default=True, ) @click.version_option(version=__version__, prog_name="dagster-webserver") def dagster_webserver( @@ -214,7 +214,12 @@ def dagster_webserver( code_server_log_level=code_server_log_level, ) as workspace_process_context: host_dagster_ui_with_workspace_process_context( - workspace_process_context, host, port, path_prefix, uvicorn_log_level, live_data_poll_rate + workspace_process_context, + host, + port, + path_prefix, + uvicorn_log_level, + live_data_poll_rate, ) From b945db8ea4729ff9677b7eec27f7af6f4e5fd9e7 Mon Sep 17 00:00:00 2001 From: Marco Salazar Date: Mon, 9 Oct 2023 12:40:52 -0400 Subject: [PATCH 4/6] milliseconds --- python_modules/dagster-webserver/dagster_webserver/cli.py | 2 +- python_modules/dagster/dagster/_cli/dev.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python_modules/dagster-webserver/dagster_webserver/cli.py b/python_modules/dagster-webserver/dagster_webserver/cli.py index 0c82e38c2de92..35f3876661411 100644 --- a/python_modules/dagster-webserver/dagster_webserver/cli.py +++ b/python_modules/dagster-webserver/dagster_webserver/cli.py @@ -164,7 +164,7 @@ def create_dagster_webserver_cli(): ) @click.option( "--live-data-poll-rate", - help="Rate at which the dagster UI polls for updated asset data (defaults to 2 seconds)", + help="Rate at which the dagster UI polls for updated asset data (in milliseconds)", type=click.INT, required=False, default=2000, diff --git a/python_modules/dagster/dagster/_cli/dev.py b/python_modules/dagster/dagster/_cli/dev.py index 6755db5d1c5a7..ac49c7ad0c2aa 100644 --- a/python_modules/dagster/dagster/_cli/dev.py +++ b/python_modules/dagster/dagster/_cli/dev.py @@ -83,7 +83,7 @@ def dev_command_options(f): ) @click.option( "--live-data-poll-rate", - help="Rate at which the dagster UI polls for updated asset data ", + help="Rate at which the dagster UI polls for updated asset data (in milliseconds)", default="2000", show_default=True, required=False, From 98d071b7b9efe7b2ef13abd3f873419e8e7d2dbd Mon Sep 17 00:00:00 2001 From: Marco Salazar Date: Mon, 9 Oct 2023 13:27:10 -0400 Subject: [PATCH 5/6] optional int --- python_modules/dagster-webserver/dagster_webserver/cli.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python_modules/dagster-webserver/dagster_webserver/cli.py b/python_modules/dagster-webserver/dagster_webserver/cli.py index 35f3876661411..1dc6a3393a404 100644 --- a/python_modules/dagster-webserver/dagster_webserver/cli.py +++ b/python_modules/dagster-webserver/dagster_webserver/cli.py @@ -241,7 +241,7 @@ def host_dagster_ui_with_workspace_process_context( port: Optional[int], path_prefix: str, log_level: str, - live_data_poll_rate: int, + live_data_poll_rate: Optional[int] = None, ): check.inst_param( workspace_process_context, "workspace_process_context", IWorkspaceProcessContext @@ -249,7 +249,7 @@ def host_dagster_ui_with_workspace_process_context( host = check.opt_str_param(host, "host", "127.0.0.1") check.opt_int_param(port, "port") check.str_param(path_prefix, "path_prefix") - check.int_param(live_data_poll_rate, "live_data_poll_rate") + check.opt_int_param(live_data_poll_rate, "live_data_poll_rate") logger = logging.getLogger(WEBSERVER_LOGGER_NAME) From 5c4dda6d85f4c97ad6e8f4da012ecc91e963c0ba Mon Sep 17 00:00:00 2001 From: Marco Salazar Date: Tue, 10 Oct 2023 12:33:19 -0400 Subject: [PATCH 6/6] add back unrelated change --- .../dagster-webserver/dagster_webserver/webserver.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python_modules/dagster-webserver/dagster_webserver/webserver.py b/python_modules/dagster-webserver/dagster_webserver/webserver.py index 5f06133435ef9..4a272aedd4042 100644 --- a/python_modules/dagster-webserver/dagster_webserver/webserver.py +++ b/python_modules/dagster-webserver/dagster_webserver/webserver.py @@ -40,15 +40,18 @@ class DagsterWebserver(GraphQLServer, Generic[T_IWorkspaceProcessContext]): _process_context: T_IWorkspaceProcessContext + _uses_app_path_prefix: bool def __init__( self, process_context: T_IWorkspaceProcessContext, app_path_prefix: str = "", live_data_poll_rate: Optional[int] = None, + uses_app_path_prefix: bool = True, ): self._process_context = process_context self._live_data_poll_rate = live_data_poll_rate + self._uses_app_path_prefix = uses_app_path_prefix super().__init__(app_path_prefix) def build_graphql_schema(self) -> Schema: