Skip to content

Commit

Permalink
Increase live data poll rate to 2 seconds in OSS only (#17000)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Due to our recent changes to how we fetch asset data we can no longer
provide a query refresh countdown. This makes the app feel less alive so
for OSS only we want to increase the poll rate for data to every 2
seconds. In practice this means assets could be fetched at best every 2
seconds but it could take longer due to the chunking/queueing behavior
of AssetLiveDataProvider (which is good).

@schrockn Asked for this specifically.

## How I Tested These Changes

Locally I checked the network tab and made sure we were polling every 2
seconds
  • Loading branch information
salazarm authored Oct 10, 2023
1 parent 3ec4767 commit f668316
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 31 deletions.
23 changes: 13 additions & 10 deletions js_modules/dagster-ui/packages/app-oss/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import {AppTopNav} from '@dagster-io/ui-core/app/AppTopNav';
import {ContentRoot} from '@dagster-io/ui-core/app/ContentRoot';
import {UserSettingsButton} from '@dagster-io/ui-core/app/UserSettingsButton';
import {logLink, timeStartLink} from '@dagster-io/ui-core/app/apolloLinks';
import {LiveDataPollRateContext} from '@dagster-io/ui-core/asset-data/AssetLiveDataProvider';
import {DeploymentStatusType} from '@dagster-io/ui-core/instance/DeploymentStatusProvider';
import React from 'react';

import {CommunityNux} from './NUX/CommunityNux';
import {extractInitializationData} from './extractInitializationData';
import {telemetryLink} from './telemetryLink';

const {pathPrefix, telemetryEnabled} = extractInitializationData();
const {pathPrefix, telemetryEnabled, liveDataPollRate} = extractInitializationData();

const apolloLinks = [logLink, errorLink, timeStartLink];

Expand All @@ -37,14 +38,16 @@ const appCache = createAppCache();
// eslint-disable-next-line import/no-default-export
export default function AppPage() {
return (
<AppProvider appCache={appCache} config={config}>
<AppTopNav searchPlaceholder="Search…" allowGlobalReload>
<UserSettingsButton />
</AppTopNav>
<App>
<ContentRoot />
<CommunityNux />
</App>
</AppProvider>
<LiveDataPollRateContext.Provider value={liveDataPollRate ?? 2000}>
<AppProvider appCache={appCache} config={config}>
<AppTopNav searchPlaceholder="Search…" allowGlobalReload>
<UserSettingsButton />
</AppTopNav>
<App>
<ContentRoot />
<CommunityNux />
</App>
</AppProvider>
</LiveDataPollRateContext.Provider>
);
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
const ELEMENT_ID = 'initialization-data';
const PREFIX_PLACEHOLDER = '__PATH_PREFIX__';
const TELEMETRY_PLACEHOLDER = '__TELEMETRY_ENABLED__';
const LIVE_DATA_POLL_RATE = '__LIVE_DATA_POLL_RATE__';

let value: {pathPrefix: string; telemetryEnabled: boolean} | undefined = undefined;
let value: {pathPrefix: string; telemetryEnabled: boolean; liveDataPollRate?: number} | undefined =
undefined;

// Determine the path prefix value, which is set server-side.
// This value will be used for prefixing paths for the GraphQL
// endpoint and dynamically loaded bundles.
export const extractInitializationData = (): {
pathPrefix: string;
telemetryEnabled: boolean;
liveDataPollRate?: number;
} => {
if (!value) {
value = {pathPrefix: '', telemetryEnabled: false};
Expand All @@ -22,6 +25,9 @@ export const extractInitializationData = (): {
if (parsed.telemetryEnabled !== TELEMETRY_PLACEHOLDER) {
value.telemetryEnabled = parsed.telemetryEnabled;
}
if (parsed.liveDataPollRate !== LIVE_DATA_POLL_RATE) {
value.liveDataPollRate = parsed.liveDataPollRate;
}
}
}
return value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ export default function Document() {
__html: `
{
"pathPrefix": "__PATH_PREFIX__",
"telemetryEnabled": "__TELEMETRY_ENABLED__"
"telemetryEnabled": "__TELEMETRY_ENABLED__",
"liveDataPollRate": "__LIVE_DATA_POLL_RATE__"
}
`,
}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ const BATCHING_INTERVAL = 250;
export const SUBSCRIPTION_IDLE_POLL_RATE = 30 * 1000;
const SUBSCRIPTION_MAX_POLL_RATE = 2 * 1000;

export const LiveDataPollRateContext = React.createContext<number>(SUBSCRIPTION_IDLE_POLL_RATE);

type DataForNodeListener = (stringKey: string, data?: LiveDataForNode) => void;

const AssetLiveDataContext = React.createContext<{
Expand Down Expand Up @@ -179,33 +181,38 @@ export const AssetLiveDataProvider = ({children}: {children: React.ReactNode}) =
setOldestDataTimestamp(oldestDataTimestamp === Infinity ? 0 : oldestDataTimestamp);
}, []);

const pollRate = React.useContext(LiveDataPollRateContext);

React.useEffect(() => {
if (!isDocumentVisible) {
return;
}
// Check for assets to fetch every 5 seconds to simplify logic
// This means assets will be fetched at most 5 + SUBSCRIPTION_IDLE_POLL_RATE after their first fetch
// but then will be fetched every SUBSCRIPTION_IDLE_POLL_RATE after that
const interval = setInterval(() => fetchData(client, onUpdatingOrUpdated), 5000);
fetchData(client, onUpdatingOrUpdated);
const interval = setInterval(
() => fetchData(client, pollRate, onUpdatingOrUpdated),
Math.min(pollRate, 5000),
);
fetchData(client, pollRate, onUpdatingOrUpdated);
return () => {
clearInterval(interval);
};
}, [client, isDocumentVisible, onUpdatingOrUpdated]);
}, [client, pollRate, isDocumentVisible, onUpdatingOrUpdated]);

React.useEffect(() => {
if (!needsImmediateFetch) {
return;
}
const timeout = setTimeout(() => {
fetchData(client, onUpdatingOrUpdated);
fetchData(client, pollRate, onUpdatingOrUpdated);
setNeedsImmediateFetch(false);
// Wait BATCHING_INTERVAL before doing fetch in case the component is unmounted quickly (eg. in the case of scrolling/filtering quickly)
}, BATCHING_INTERVAL);
return () => {
clearTimeout(timeout);
};
}, [client, needsImmediateFetch, onUpdatingOrUpdated]);
}, [client, needsImmediateFetch, pollRate, onUpdatingOrUpdated]);

React.useEffect(() => {
providerListener = (stringKey, assetData) => {
Expand Down Expand Up @@ -297,6 +304,7 @@ let isFetching = false;
async function _batchedQueryAssets(
assetKeys: AssetKeyInput[],
client: ApolloClient<any>,
pollRate: number,
setData: (data: Record<string, LiveDataForNode>) => void,
onUpdatingOrUpdated: () => void,
) {
Expand All @@ -314,11 +322,11 @@ async function _batchedQueryAssets(
});
onUpdatingOrUpdated();

function doNextFetch() {
function doNextFetch(pollRate: number) {
isFetching = false;
const nextAssets = _determineAssetsToFetch();
const nextAssets = _determineAssetsToFetch(pollRate);
if (nextAssets.length) {
_batchedQueryAssets(nextAssets, client, setData, onUpdatingOrUpdated);
_batchedQueryAssets(nextAssets, client, pollRate, setData, onUpdatingOrUpdated);
}
}
try {
Expand All @@ -331,7 +339,7 @@ async function _batchedQueryAssets(
});
setData(data);
onUpdatingOrUpdated();
doNextFetch();
doNextFetch(pollRate);
} catch (e) {
console.error(e);
// Retry fetching in 5 seconds if theres a network error
Expand Down Expand Up @@ -369,7 +377,7 @@ function _unsubscribeToAssetKey(assetKey: AssetKeyInput, setData: DataForNodeLis
}

// Determine assets to fetch taking into account the last time they were fetched and whether they are already being fetched.
function _determineAssetsToFetch() {
function _determineAssetsToFetch(pollRate: number) {
const assetsToFetch: AssetKeyInput[] = [];
const assetsWithoutData: AssetKeyInput[] = [];
const allKeys = Object.keys(_assetKeyListeners);
Expand All @@ -380,7 +388,7 @@ function _determineAssetsToFetch() {
continue;
}
const lastFetchTime = lastFetchedOrRequested[key]?.fetched ?? null;
if (lastFetchTime !== null && Date.now() - lastFetchTime < SUBSCRIPTION_IDLE_POLL_RATE) {
if (lastFetchTime !== null && Date.now() - lastFetchTime < pollRate) {
continue;
}
if (lastFetchTime && isDocumentVisible()) {
Expand All @@ -394,10 +402,11 @@ function _determineAssetsToFetch() {
return assetsWithoutData.concat(assetsToFetch).slice(0, BATCH_SIZE);
}

function fetchData(client: ApolloClient<any>, onUpdatingOrUpdated: () => void) {
function fetchData(client: ApolloClient<any>, pollRate: number, onUpdatingOrUpdated: () => void) {
_batchedQueryAssets(
_determineAssetsToFetch(),
_determineAssetsToFetch(pollRate),
client,
pollRate,
(data) => {
Object.entries(data).forEach(([key, assetData]) => {
const listeners = _assetKeyListeners[key];
Expand Down
4 changes: 4 additions & 0 deletions python_modules/dagster-webserver/dagster_webserver/app.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Optional

from dagster import (
_check as check,
)
Expand All @@ -12,6 +14,7 @@
def create_app_from_workspace_process_context(
workspace_process_context: IWorkspaceProcessContext,
path_prefix: str = "",
live_data_poll_rate: Optional[int] = None,
**kwargs,
) -> Starlette:
check.inst_param(
Expand All @@ -34,4 +37,5 @@ def create_app_from_workspace_process_context(
return DagsterWebserver(
workspace_process_context,
path_prefix,
live_data_poll_rate,
).create_asgi_app(**kwargs)
20 changes: 18 additions & 2 deletions python_modules/dagster-webserver/dagster_webserver/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,14 @@ def create_dagster_webserver_cli():
required=False,
hidden=True,
)
@click.option(
"--live-data-poll-rate",
help="Rate at which the dagster UI polls for updated asset data (in milliseconds)",
type=click.INT,
required=False,
default=2000,
show_default=True,
)
@click.version_option(version=__version__, prog_name="dagster-webserver")
def dagster_webserver(
host: str,
Expand All @@ -175,6 +183,7 @@ def dagster_webserver(
dagster_log_level: str,
code_server_log_level: str,
instance_ref: Optional[str],
live_data_poll_rate: int,
**kwargs: ClickArgValue,
):
if suppress_warnings:
Expand Down Expand Up @@ -205,7 +214,12 @@ def dagster_webserver(
code_server_log_level=code_server_log_level,
) as workspace_process_context:
host_dagster_ui_with_workspace_process_context(
workspace_process_context, host, port, path_prefix, uvicorn_log_level
workspace_process_context,
host,
port,
path_prefix,
uvicorn_log_level,
live_data_poll_rate,
)


Expand All @@ -227,18 +241,20 @@ def host_dagster_ui_with_workspace_process_context(
port: Optional[int],
path_prefix: str,
log_level: str,
live_data_poll_rate: Optional[int] = None,
):
check.inst_param(
workspace_process_context, "workspace_process_context", IWorkspaceProcessContext
)
host = check.opt_str_param(host, "host", "127.0.0.1")
check.opt_int_param(port, "port")
check.str_param(path_prefix, "path_prefix")
check.opt_int_param(live_data_poll_rate, "live_data_poll_rate")

logger = logging.getLogger(WEBSERVER_LOGGER_NAME)

app = create_app_from_workspace_process_context(
workspace_process_context, path_prefix, lifespan=_lifespan
workspace_process_context, path_prefix, live_data_poll_rate, lifespan=_lifespan
)

if not port:
Expand Down
15 changes: 11 additions & 4 deletions python_modules/dagster-webserver/dagster_webserver/webserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import io
import uuid
from os import path, walk
from typing import Generic, List, TypeVar
from typing import Generic, List, Optional, TypeVar

import dagster._check as check
from dagster import __version__ as dagster_version
Expand Down Expand Up @@ -51,9 +51,11 @@ def __init__(
self,
process_context: T_IWorkspaceProcessContext,
app_path_prefix: str = "",
live_data_poll_rate: Optional[int] = None,
uses_app_path_prefix: bool = True,
):
self._process_context = process_context
self._live_data_poll_rate = live_data_poll_rate
self._uses_app_path_prefix = uses_app_path_prefix
super().__init__(app_path_prefix)

Expand Down Expand Up @@ -315,17 +317,22 @@ def index_html_endpoint(self, request: Request):
**{"Content-Security-Policy": self.make_csp_header(nonce)},
**self.make_security_headers(),
}
return HTMLResponse(
content = (
rendered_template.replace(
"BUILDTIME_ASSETPREFIX_REPLACE_ME", f"{self._app_path_prefix}"
)
.replace("__PATH_PREFIX__", self._app_path_prefix)
.replace(
'"__TELEMETRY_ENABLED__"', str(context.instance.telemetry_enabled).lower()
)
.replace("NONCE-PLACEHOLDER", nonce),
headers=headers,
.replace("NONCE-PLACEHOLDER", nonce)
)

if self._live_data_poll_rate:
content = content.replace(
"__LIVE_DATA_POLL_RATE__", str(self._live_data_poll_rate)
)
return HTMLResponse(content, headers=headers)
except FileNotFoundError:
raise Exception("""
Can't find webapp files.
Expand Down
9 changes: 9 additions & 0 deletions python_modules/dagster/dagster/_cli/dev.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ def dev_command_options(f):
help="Host to use for the Dagster webserver.",
required=False,
)
@click.option(
"--live-data-poll-rate",
help="Rate at which the dagster UI polls for updated asset data (in milliseconds)",
default="2000",
show_default=True,
required=False,
)
@deprecated(
breaking_version="2.0", subject="--dagit-port and --dagit-host args", emit_runtime_warning=False
)
Expand All @@ -89,6 +96,7 @@ def dev_command(
log_level: str,
port: Optional[str],
host: Optional[str],
live_data_poll_rate: Optional[str],
**kwargs: ClickArgValue,
) -> None:
# check if dagster-webserver installed, crash if not
Expand Down Expand Up @@ -163,6 +171,7 @@ def dev_command(
+ (["--port", port] if port else [])
+ (["--host", host] if host else [])
+ (["--dagster-log-level", log_level])
+ (["--live-data-poll-rate", live_data_poll_rate] if live_data_poll_rate else [])
+ args
)
daemon_process = open_ipc_subprocess(
Expand Down

1 comment on commit f668316

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagit-core-storybook ready!

✅ Preview
https://dagit-core-storybook-ld98a6m10-elementl.vercel.app

Built with commit f668316.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.