Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase live data poll rate to 2 seconds in OSS only #17000

Merged
merged 6 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions js_modules/dagster-ui/packages/app-oss/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {AppTopNav} from '@dagster-io/ui-core/app/AppTopNav';
import {ContentRoot} from '@dagster-io/ui-core/app/ContentRoot';
import {UserSettingsButton} from '@dagster-io/ui-core/app/UserSettingsButton';
import {logLink, timeStartLink} from '@dagster-io/ui-core/app/apolloLinks';
import {LiveDataPollRateContext} from '@dagster-io/ui-core/asset-data/AssetLiveDataProvider';
import {DeploymentStatusType} from '@dagster-io/ui-core/instance/DeploymentStatusProvider';
import React from 'react';

Expand Down Expand Up @@ -37,14 +38,16 @@ const appCache = createAppCache();
// eslint-disable-next-line import/no-default-export
export default function AppPage() {
return (
<AppProvider appCache={appCache} config={config}>
<AppTopNav searchPlaceholder="Search…" allowGlobalReload>
<UserSettingsButton />
</AppTopNav>
<App>
<ContentRoot />
<CommunityNux />
</App>
</AppProvider>
<LiveDataPollRateContext.Provider value={2000}>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kept this outside of AppProvider intentionally because we only want to do this in OSS and not Cloud

<AppProvider appCache={appCache} config={config}>
<AppTopNav searchPlaceholder="Search…" allowGlobalReload>
<UserSettingsButton />
</AppTopNav>
<App>
<ContentRoot />
<CommunityNux />
</App>
</AppProvider>
</LiveDataPollRateContext.Provider>
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ const BATCHING_INTERVAL = 250;
export const SUBSCRIPTION_IDLE_POLL_RATE = 30 * 1000;
const SUBSCRIPTION_MAX_POLL_RATE = 2 * 1000;

export const LiveDataPollRateContext = React.createContext<number>(SUBSCRIPTION_IDLE_POLL_RATE);

type DataForNodeListener = (stringKey: string, data?: LiveDataForNode) => void;

const AssetLiveDataContext = React.createContext<{
Expand Down Expand Up @@ -179,33 +181,38 @@ export const AssetLiveDataProvider = ({children}: {children: React.ReactNode}) =
setOldestDataTimestamp(oldestDataTimestamp === Infinity ? 0 : oldestDataTimestamp);
}, []);

const pollRate = React.useContext(LiveDataPollRateContext);

React.useEffect(() => {
if (!isDocumentVisible) {
return;
}
// Check for assets to fetch every 5 seconds to simplify logic
// This means assets will be fetched at most 5 + SUBSCRIPTION_IDLE_POLL_RATE after their first fetch
// but then will be fetched every SUBSCRIPTION_IDLE_POLL_RATE after that
const interval = setInterval(() => fetchData(client, onUpdatingOrUpdated), 5000);
fetchData(client, onUpdatingOrUpdated);
const interval = setInterval(
() => fetchData(client, pollRate, onUpdatingOrUpdated),
Math.min(pollRate, 5000),
);
fetchData(client, pollRate, onUpdatingOrUpdated);
return () => {
clearInterval(interval);
};
}, [client, isDocumentVisible, onUpdatingOrUpdated]);
}, [client, pollRate, isDocumentVisible, onUpdatingOrUpdated]);

React.useEffect(() => {
if (!needsImmediateFetch) {
return;
}
const timeout = setTimeout(() => {
fetchData(client, onUpdatingOrUpdated);
fetchData(client, pollRate, onUpdatingOrUpdated);
setNeedsImmediateFetch(false);
// Wait BATCHING_INTERVAL before doing fetch in case the component is unmounted quickly (eg. in the case of scrolling/filtering quickly)
}, BATCHING_INTERVAL);
return () => {
clearTimeout(timeout);
};
}, [client, needsImmediateFetch, onUpdatingOrUpdated]);
}, [client, needsImmediateFetch, pollRate, onUpdatingOrUpdated]);

React.useEffect(() => {
providerListener = (stringKey, assetData) => {
Expand Down Expand Up @@ -297,6 +304,7 @@ let isFetching = false;
async function _batchedQueryAssets(
assetKeys: AssetKeyInput[],
client: ApolloClient<any>,
pollRate: number,
setData: (data: Record<string, LiveDataForNode>) => void,
onUpdatingOrUpdated: () => void,
) {
Expand All @@ -314,11 +322,11 @@ async function _batchedQueryAssets(
});
onUpdatingOrUpdated();

function doNextFetch() {
function doNextFetch(pollRate: number) {
isFetching = false;
const nextAssets = _determineAssetsToFetch();
const nextAssets = _determineAssetsToFetch(pollRate);
if (nextAssets.length) {
_batchedQueryAssets(nextAssets, client, setData, onUpdatingOrUpdated);
_batchedQueryAssets(nextAssets, client, pollRate, setData, onUpdatingOrUpdated);
}
}
try {
Expand All @@ -331,7 +339,7 @@ async function _batchedQueryAssets(
});
setData(data);
onUpdatingOrUpdated();
doNextFetch();
doNextFetch(pollRate);
} catch (e) {
console.error(e);
// Retry fetching in 5 seconds if theres a network error
Expand Down Expand Up @@ -369,7 +377,7 @@ function _unsubscribeToAssetKey(assetKey: AssetKeyInput, setData: DataForNodeLis
}

// Determine assets to fetch taking into account the last time they were fetched and whether they are already being fetched.
function _determineAssetsToFetch() {
function _determineAssetsToFetch(pollRate: number) {
const assetsToFetch: AssetKeyInput[] = [];
const assetsWithoutData: AssetKeyInput[] = [];
const allKeys = Object.keys(_assetKeyListeners);
Expand All @@ -380,7 +388,7 @@ function _determineAssetsToFetch() {
continue;
}
const lastFetchTime = lastFetchedOrRequested[key]?.fetched ?? null;
if (lastFetchTime !== null && Date.now() - lastFetchTime < SUBSCRIPTION_IDLE_POLL_RATE) {
if (lastFetchTime !== null && Date.now() - lastFetchTime < pollRate) {
continue;
}
if (lastFetchTime && isDocumentVisible()) {
Expand All @@ -394,10 +402,11 @@ function _determineAssetsToFetch() {
return assetsWithoutData.concat(assetsToFetch).slice(0, BATCH_SIZE);
}

function fetchData(client: ApolloClient<any>, onUpdatingOrUpdated: () => void) {
function fetchData(client: ApolloClient<any>, pollRate: number, onUpdatingOrUpdated: () => void) {
_batchedQueryAssets(
_determineAssetsToFetch(),
_determineAssetsToFetch(pollRate),
client,
pollRate,
(data) => {
Object.entries(data).forEach(([key, assetData]) => {
const listeners = _assetKeyListeners[key];
Expand Down