From f6a0162819fb280ac229b745502403a3f3ae3c34 Mon Sep 17 00:00:00 2001 From: Marco polo Date: Thu, 1 Feb 2024 08:56:38 -0500 Subject: [PATCH] Refactor AssetLiveDataProvider -> LiveDataProvider (#19453) ## Summary & Motivation The strategy of chunking/batching queries to the server is one I want to reuse on the upcoming asset timeline page. To support that I'm refactoring this class to allow for the core chunking/batching logic to be reused by other queries. ## How I Tested These Changes I relied on the existing jest tests for AssetLiveDataProvider --- .../dagster-ui/packages/app-oss/src/App.tsx | 2 +- .../src/asset-data/AssetLiveDataProvider.tsx | 343 +++++++++--------- .../src/asset-data/AssetLiveDataThread.tsx | 246 ------------- .../__tests__/AssetLiveDataProvider.test.tsx | 12 +- .../ui-core/src/asset-data/__tests__/util.ts | 4 +- ...ypes.ts => AssetLiveDataProvider.types.ts} | 0 .../src/asset-graph/AssetGraphExplorer.tsx | 4 +- .../ui-core/src/asset-graph/Utils.tsx | 6 +- .../__stories__/AssetNode.stories.tsx | 10 +- .../asset-graph/__tests__/AssetNode.test.tsx | 7 +- .../packages/ui-core/src/assets/AssetView.tsx | 4 +- .../FailedRunSinceMaterializationBanner.tsx | 2 +- .../__fixtures__/AssetTables.fixtures.ts | 4 +- .../src/live-data-provider/Factory.tsx | 54 +++ .../live-data-provider/LiveDataProvider.tsx | 173 +++++++++ .../LiveDataRefreshButton.tsx} | 4 +- .../src/live-data-provider/LiveDataThread.tsx | 112 ++++++ .../LiveDataThreadManager.tsx} | 140 +++---- .../ui-core/src/live-data-provider/util.ts | 8 + 19 files changed, 612 insertions(+), 523 deletions(-) delete mode 100644 js_modules/dagster-ui/packages/ui-core/src/asset-data/AssetLiveDataThread.tsx rename js_modules/dagster-ui/packages/ui-core/src/asset-data/types/{AssetLiveDataThread.types.ts => AssetLiveDataProvider.types.ts} (100%) create mode 100644 js_modules/dagster-ui/packages/ui-core/src/live-data-provider/Factory.tsx create mode 100644 js_modules/dagster-ui/packages/ui-core/src/live-data-provider/LiveDataProvider.tsx rename js_modules/dagster-ui/packages/ui-core/src/{asset-data/AssetDataRefreshButton.tsx => live-data-provider/LiveDataRefreshButton.tsx} (92%) create mode 100644 js_modules/dagster-ui/packages/ui-core/src/live-data-provider/LiveDataThread.tsx rename js_modules/dagster-ui/packages/ui-core/src/{asset-data/AssetLiveDataThreadManager.tsx => live-data-provider/LiveDataThreadManager.tsx} (52%) create mode 100644 js_modules/dagster-ui/packages/ui-core/src/live-data-provider/util.ts diff --git a/js_modules/dagster-ui/packages/app-oss/src/App.tsx b/js_modules/dagster-ui/packages/app-oss/src/App.tsx index b9040320edeff..8a0ee36be09e1 100644 --- a/js_modules/dagster-ui/packages/app-oss/src/App.tsx +++ b/js_modules/dagster-ui/packages/app-oss/src/App.tsx @@ -6,8 +6,8 @@ import {AppTopNav} from '@dagster-io/ui-core/app/AppTopNav'; import {ContentRoot} from '@dagster-io/ui-core/app/ContentRoot'; import {UserSettingsButton} from '@dagster-io/ui-core/app/UserSettingsButton'; import {logLink, timeStartLink} from '@dagster-io/ui-core/app/apolloLinks'; -import {LiveDataPollRateContext} from '@dagster-io/ui-core/asset-data/AssetLiveDataProvider'; import {DeploymentStatusType} from '@dagster-io/ui-core/instance/DeploymentStatusProvider'; +import {LiveDataPollRateContext} from '@dagster-io/ui-core/live-data-provider/LiveDataProvider'; import {CommunityNux} from './NUX/CommunityNux'; import {extractInitializationData} from './extractInitializationData'; diff --git a/js_modules/dagster-ui/packages/ui-core/src/asset-data/AssetLiveDataProvider.tsx b/js_modules/dagster-ui/packages/ui-core/src/asset-data/AssetLiveDataProvider.tsx index cf931db07ada2..4056f45884c88 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/asset-data/AssetLiveDataProvider.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/asset-data/AssetLiveDataProvider.tsx @@ -1,172 +1,91 @@ -import {useApolloClient} from '@apollo/client'; +import {ApolloClient, gql, useApolloClient} from '@apollo/client'; import uniq from 'lodash/uniq'; import React from 'react'; -import {AssetDataRefreshButton} from './AssetDataRefreshButton'; -import {AssetLiveDataThreadID} from './AssetLiveDataThread'; -import {AssetLiveDataThreadManager} from './AssetLiveDataThreadManager'; +import { + AssetGraphLiveQuery, + AssetGraphLiveQueryVariables, +} from './types/AssetLiveDataProvider.types'; +import {SUBSCRIPTION_MAX_POLL_RATE} from './util'; import {observeAssetEventsInRuns} from '../asset-graph/AssetRunLogObserver'; -import {LiveDataForNode, tokenForAssetKey} from '../asset-graph/Utils'; +import { + LiveDataForNode, + buildLiveDataForNode, + tokenForAssetKey, + tokenToAssetKey, +} from '../asset-graph/Utils'; import {AssetKeyInput} from '../graphql/types'; -import {useDocumentVisibility} from '../hooks/useDocumentVisibility'; +import {liveDataFactory} from '../live-data-provider/Factory'; +import {LiveDataPollRateContext} from '../live-data-provider/LiveDataProvider'; +import {LiveDataThreadID} from '../live-data-provider/LiveDataThread'; import {useDidLaunchEvent} from '../runs/RunUtils'; -export const SUBSCRIPTION_IDLE_POLL_RATE = 30 * 1000; -const SUBSCRIPTION_MAX_POLL_RATE = 2 * 1000; - -export const LiveDataPollRateContext = React.createContext(SUBSCRIPTION_IDLE_POLL_RATE); - -export const AssetLiveDataRefreshContext = React.createContext<{ - isGloballyRefreshing: boolean; - oldestDataTimestamp: number; - refresh: () => void; -}>({ - isGloballyRefreshing: false, - oldestDataTimestamp: Infinity, - refresh: () => {}, -}); - -export function useAssetLiveData( - assetKey: AssetKeyInput, - thread: AssetLiveDataThreadID = 'default', -) { - const {liveDataByNode, refresh, refreshing} = useAssetsLiveData( - React.useMemo(() => [assetKey], [assetKey]), - thread, +function makeFactory() { + return liveDataFactory( + () => { + return useApolloClient(); + }, + async (keys, client: ApolloClient) => { + const {data} = await client.query({ + query: ASSETS_GRAPH_LIVE_QUERY, + fetchPolicy: 'network-only', + variables: { + assetKeys: keys.map(tokenToAssetKey), + }, + }); + const nodesByKey = Object.fromEntries( + data.assetNodes.map((node) => [tokenForAssetKey(node.assetKey), node]), + ); + + const liveDataByKey = Object.fromEntries( + data.assetsLatestInfo.map((assetLatestInfo) => { + const id = tokenForAssetKey(assetLatestInfo.assetKey); + return [id, buildLiveDataForNode(nodesByKey[id]!, assetLatestInfo)]; + }), + ); + return liveDataByKey; + }, ); - return { - liveData: liveDataByNode[tokenForAssetKey(assetKey)], - refresh, - refreshing, - }; +} +export const factory = makeFactory(); + +export function useAssetLiveData(assetKey: AssetKeyInput, thread: LiveDataThreadID = 'default') { + return factory.useLiveDataSingle(tokenForAssetKey(assetKey), thread); } export function useAssetsLiveData( assetKeys: AssetKeyInput[], - thread: AssetLiveDataThreadID = 'default', - batchUpdatesInterval: number = 1000, + thread: LiveDataThreadID = 'default', ) { - const [data, setData] = React.useState>({}); - - const [isRefreshing, setIsRefreshing] = React.useState(false); - - const client = useApolloClient(); - const manager = AssetLiveDataThreadManager.getInstance(client); - - React.useEffect(() => { - let timeout: ReturnType | null = null; - let didUpdateOnce = false; - let didScheduleUpdateOnce = false; - let updates: {stringKey: string; assetData: LiveDataForNode | undefined}[] = []; - - function processUpdates() { - setData((data) => { - const copy = {...data}; - updates.forEach(({stringKey, assetData}) => { - if (assetData) { - copy[stringKey] = assetData; - } else { - delete copy[stringKey]; - } - }); - updates = []; - return copy; - }); - } - - const setDataSingle = (stringKey: string, assetData?: LiveDataForNode) => { - /** - * Throttle updates to avoid triggering too many GCs and too many updates when fetching 1,000 assets, - */ - updates.push({stringKey, assetData}); - if (!didUpdateOnce) { - if (!didScheduleUpdateOnce) { - didScheduleUpdateOnce = true; - requestAnimationFrame(() => { - processUpdates(); - didUpdateOnce = true; - }); - } - } else if (!timeout) { - timeout = setTimeout(() => { - processUpdates(); - timeout = null; - }, batchUpdatesInterval); - } - }; - const unsubscribeCallbacks = assetKeys.map((key) => - manager.subscribe(key, setDataSingle, thread), - ); - return () => { - unsubscribeCallbacks.forEach((cb) => { - cb(); - }); - }; - }, [assetKeys, batchUpdatesInterval, manager, thread]); - - return { - liveDataByNode: data, - - refresh: React.useCallback(() => { - manager.refreshKeys(assetKeys); - setIsRefreshing(true); - }, [assetKeys, manager]), - - refreshing: React.useMemo(() => { - if (isRefreshing && !manager.areKeysRefreshing(assetKeys)) { - setTimeout(() => { - setIsRefreshing(false); - }); - return false; - } - return true; - // eslint-disable-next-line react-hooks/exhaustive-deps - }, [assetKeys, data, isRefreshing]), - }; + return factory.useLiveData( + assetKeys.map((key) => tokenForAssetKey(key)), + thread, + ); } export const AssetLiveDataProvider = ({children}: {children: React.ReactNode}) => { const [allObservedKeys, setAllObservedKeys] = React.useState([]); - const client = useApolloClient(); - const manager = AssetLiveDataThreadManager.getInstance(client); - - const [isGloballyRefreshing, setIsGloballyRefreshing] = React.useState(false); - const [oldestDataTimestamp, setOldestDataTimestamp] = React.useState(0); - - const onUpdatingOrUpdated = React.useCallback(() => { - const {isRefreshing, oldestDataTimestamp} = manager.getOldestDataTimestamp(); - setIsGloballyRefreshing(isRefreshing); - setOldestDataTimestamp(oldestDataTimestamp); - }, [manager]); - React.useEffect(() => { - manager.setOnSubscriptionsChangedCallback((keys) => + factory.manager.setOnSubscriptionsChangedCallback((keys) => setAllObservedKeys(keys.map((key) => ({path: key.split('/')}))), ); - manager.setOnUpdatingOrUpdated(onUpdatingOrUpdated); - }, [manager, onUpdatingOrUpdated]); - - const isDocumentVisible = useDocumentVisibility(); + }, []); const pollRate = React.useContext(LiveDataPollRateContext); React.useEffect(() => { - manager.onDocumentVisiblityChange(isDocumentVisible); - }, [manager, isDocumentVisible]); - - React.useEffect(() => { - manager.setPollRate(pollRate); - }, [manager, pollRate]); + factory.manager.setPollRate(pollRate); + }, [pollRate]); useDidLaunchEvent(() => { - manager.refreshKeys(); + factory.manager.invalidateCache(); }, SUBSCRIPTION_MAX_POLL_RATE); React.useEffect(() => { const assetKeyTokens = new Set(allObservedKeys.map(tokenForAssetKey)); const dataForObservedKeys = allObservedKeys - .map((key) => manager.getCacheEntry(key)) + .map((key) => factory.manager.getCacheEntry(tokenForAssetKey(key))) .filter((n) => n) as LiveDataForNode[]; const assetStepKeys = new Set(dataForObservedKeys.flatMap((n) => n.opNames)); @@ -190,36 +109,132 @@ export const AssetLiveDataProvider = ({children}: {children: React.ReactNode}) = (e.stepKey && assetStepKeys.has(e.stepKey)), ) ) { - manager.refreshKeys(); + factory.manager.invalidateCache(); } }); return unobserve; - }, [allObservedKeys, manager]); - - return ( - { - manager.refreshKeys(); - }, [manager]), - }} - > - {children} - - ); + }, [allObservedKeys]); + + return {children}; }; -export function AssetLiveDataRefresh() { - const {isGloballyRefreshing, oldestDataTimestamp, refresh} = React.useContext( - AssetLiveDataRefreshContext, - ); - return ( - - ); +export function AssetLiveDataRefreshButton() { + return ; +} + +export const ASSET_LATEST_INFO_FRAGMENT = gql` + fragment AssetLatestInfoFragment on AssetLatestInfo { + id + assetKey { + path + } + unstartedRunIds + inProgressRunIds + latestRun { + id + ...AssetLatestInfoRun + } + } + + fragment AssetLatestInfoRun on Run { + status + endTime + id + } +`; + +export const ASSET_NODE_LIVE_FRAGMENT = gql` + fragment AssetNodeLiveFragment on AssetNode { + id + opNames + repository { + id + } + assetKey { + path + } + assetMaterializations(limit: 1) { + ...AssetNodeLiveMaterialization + } + assetObservations(limit: 1) { + ...AssetNodeLiveObservation + } + assetChecksOrError { + ... on AssetChecks { + checks { + ...AssetCheckLiveFragment + } + } + } + freshnessInfo { + ...AssetNodeLiveFreshnessInfo + } + staleStatus + staleCauses { + key { + path + } + reason + category + dependency { + path + } + } + partitionStats { + numMaterialized + numMaterializing + numPartitions + numFailed + } + } + + fragment AssetNodeLiveFreshnessInfo on AssetFreshnessInfo { + currentMinutesLate + } + + fragment AssetNodeLiveMaterialization on MaterializationEvent { + timestamp + runId + } + + fragment AssetNodeLiveObservation on ObservationEvent { + timestamp + runId + } + + fragment AssetCheckLiveFragment on AssetCheck { + name + canExecuteIndividually + executionForLatestMaterialization { + id + runId + status + timestamp + stepKey + evaluation { + severity + } + } + } +`; + +export const ASSETS_GRAPH_LIVE_QUERY = gql` + query AssetGraphLiveQuery($assetKeys: [AssetKeyInput!]!) { + assetNodes(assetKeys: $assetKeys, loadMaterializations: true) { + id + ...AssetNodeLiveFragment + } + assetsLatestInfo(assetKeys: $assetKeys) { + id + ...AssetLatestInfoFragment + } + } + + ${ASSET_NODE_LIVE_FRAGMENT} + ${ASSET_LATEST_INFO_FRAGMENT} +`; + +// For tests +export function __resetForJest() { + Object.assign(factory, makeFactory()); } diff --git a/js_modules/dagster-ui/packages/ui-core/src/asset-data/AssetLiveDataThread.tsx b/js_modules/dagster-ui/packages/ui-core/src/asset-data/AssetLiveDataThread.tsx deleted file mode 100644 index 45eae21617e65..0000000000000 --- a/js_modules/dagster-ui/packages/ui-core/src/asset-data/AssetLiveDataThread.tsx +++ /dev/null @@ -1,246 +0,0 @@ -import {ApolloClient, gql} from '@apollo/client'; - -import type {AssetLiveDataThreadManager} from './AssetLiveDataThreadManager'; -import {AssetGraphLiveQuery, AssetGraphLiveQueryVariables} from './types/AssetLiveDataThread.types'; -import {BATCHING_INTERVAL} from './util'; -import {buildLiveDataForNode, tokenForAssetKey} from '../asset-graph/Utils'; -import {AssetKeyInput} from '../graphql/types'; - -export type AssetLiveDataThreadID = 'default' | 'sidebar' | 'asset-graph' | 'group-node'; - -export class AssetLiveDataThread { - private isFetching: boolean = false; - private listenersCount: {[key: string]: number}; - private client: ApolloClient; - private isLooping: boolean = false; - private interval?: ReturnType; - private manager: AssetLiveDataThreadManager; - public pollRate: number = 30000; - - protected static _threads: {[key: string]: AssetLiveDataThread} = {}; - - constructor(client: ApolloClient, manager: AssetLiveDataThreadManager) { - this.client = client; - this.listenersCount = {}; - this.manager = manager; - } - - public setPollRate(pollRate: number) { - this.pollRate = pollRate; - } - - public subscribe(key: string) { - this.listenersCount[key] = this.listenersCount[key] || 0; - this.listenersCount[key] += 1; - this.startFetchLoop(); - } - - public unsubscribe(key: string) { - this.listenersCount[key] -= 1; - if (this.listenersCount[key] === 0) { - delete this.listenersCount[key]; - } - if (this.getObservedKeys().length === 0) { - this.stopFetchLoop(); - } - } - - public getObservedKeys() { - return Object.keys(this.listenersCount); - } - - public startFetchLoop() { - if (this.isLooping) { - return; - } - this.isLooping = true; - const fetch = () => { - this._batchedQueryAssets(); - }; - setTimeout(fetch, BATCHING_INTERVAL); - this.interval = setInterval(fetch, 5000); - } - - public stopFetchLoop() { - if (!this.isLooping) { - return; - } - this.isLooping = false; - clearInterval(this.interval); - this.interval = undefined; - } - - private async queryAssetKeys(assetKeys: AssetKeyInput[]) { - const {data} = await this.client.query({ - query: ASSETS_GRAPH_LIVE_QUERY, - fetchPolicy: 'network-only', - variables: { - assetKeys, - }, - }); - const nodesByKey = Object.fromEntries( - data.assetNodes.map((node) => [tokenForAssetKey(node.assetKey), node]), - ); - - const liveDataByKey = Object.fromEntries( - data.assetsLatestInfo.map((assetLatestInfo) => { - const id = tokenForAssetKey(assetLatestInfo.assetKey); - return [id, buildLiveDataForNode(nodesByKey[id]!, assetLatestInfo)]; - }), - ); - - this.manager._updateCache(liveDataByKey); - return liveDataByKey; - } - - private async _batchedQueryAssets() { - const assetKeys = this.manager.determineAssetsToFetch(this.getObservedKeys()); - if (!assetKeys.length || this.isFetching) { - return; - } - this.isFetching = true; - this.manager._markAssetsRequested(assetKeys); - - const doNextFetch = () => { - this.isFetching = false; - this._batchedQueryAssets(); - }; - try { - const data = await this.queryAssetKeys(assetKeys); - this.manager._updateFetchedAssets(assetKeys, data); - doNextFetch(); - } catch (e) { - console.error(e); - - if ((e as any)?.message?.includes('500')) { - // Mark these assets as fetched so that we don't retry them until after the poll interval rather than retrying them immediately. - // This is preferable because if the assets failed to fetch it's likely due to a timeout due to the query being too expensive and retrying it - // will not make it more likely to succeed and it would add more load to the database. - this.manager._updateFetchedAssets(assetKeys, {}); - } else { - // If it's not a timeout from the backend then lets keep retrying instead of moving on. - this.manager._unmarkAssetsRequested(assetKeys); - } - - setTimeout( - () => { - doNextFetch(); - }, - // If the poll rate is faster than 5 seconds lets use that instead - Math.min(this.pollRate, 5000), - ); - } - } -} - -export const ASSET_LATEST_INFO_FRAGMENT = gql` - fragment AssetLatestInfoFragment on AssetLatestInfo { - id - assetKey { - path - } - unstartedRunIds - inProgressRunIds - latestRun { - id - ...AssetLatestInfoRun - } - } - - fragment AssetLatestInfoRun on Run { - status - endTime - id - } -`; - -export const ASSET_NODE_LIVE_FRAGMENT = gql` - fragment AssetNodeLiveFragment on AssetNode { - id - opNames - repository { - id - } - assetKey { - path - } - assetMaterializations(limit: 1) { - ...AssetNodeLiveMaterialization - } - assetObservations(limit: 1) { - ...AssetNodeLiveObservation - } - assetChecksOrError { - ... on AssetChecks { - checks { - ...AssetCheckLiveFragment - } - } - } - freshnessInfo { - ...AssetNodeLiveFreshnessInfo - } - staleStatus - staleCauses { - key { - path - } - reason - category - dependency { - path - } - } - partitionStats { - numMaterialized - numMaterializing - numPartitions - numFailed - } - } - - fragment AssetNodeLiveFreshnessInfo on AssetFreshnessInfo { - currentMinutesLate - } - - fragment AssetNodeLiveMaterialization on MaterializationEvent { - timestamp - runId - } - - fragment AssetNodeLiveObservation on ObservationEvent { - timestamp - runId - } - - fragment AssetCheckLiveFragment on AssetCheck { - name - canExecuteIndividually - executionForLatestMaterialization { - id - runId - status - timestamp - stepKey - evaluation { - severity - } - } - } -`; - -export const ASSETS_GRAPH_LIVE_QUERY = gql` - query AssetGraphLiveQuery($assetKeys: [AssetKeyInput!]!) { - assetNodes(assetKeys: $assetKeys, loadMaterializations: true) { - id - ...AssetNodeLiveFragment - } - assetsLatestInfo(assetKeys: $assetKeys) { - id - ...AssetLatestInfoFragment - } - } - - ${ASSET_NODE_LIVE_FRAGMENT} - ${ASSET_LATEST_INFO_FRAGMENT} -`; diff --git a/js_modules/dagster-ui/packages/ui-core/src/asset-data/__tests__/AssetLiveDataProvider.test.tsx b/js_modules/dagster-ui/packages/ui-core/src/asset-data/__tests__/AssetLiveDataProvider.test.tsx index 2d27058d9980a..660fbf54668d3 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/asset-data/__tests__/AssetLiveDataProvider.test.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/asset-data/__tests__/AssetLiveDataProvider.test.tsx @@ -3,21 +3,19 @@ jest.useFakeTimers(); import {MockedProvider, MockedResponse} from '@apollo/client/testing'; import {act, render, waitFor} from '@testing-library/react'; import {GraphQLError} from 'graphql/error'; -import React from 'react'; import {buildMockedAssetGraphLiveQuery} from './util'; import {AssetKey, AssetKeyInput, buildAssetKey} from '../../graphql/types'; +import {LiveDataThreadID} from '../../live-data-provider/LiveDataThread'; import {getMockResultFn} from '../../testing/mocking'; -import {AssetLiveDataProvider, useAssetsLiveData} from '../AssetLiveDataProvider'; -import {AssetLiveDataThreadID} from '../AssetLiveDataThread'; -import {AssetLiveDataThreadManager} from '../AssetLiveDataThreadManager'; +import {AssetLiveDataProvider, __resetForJest, useAssetsLiveData} from '../AssetLiveDataProvider'; import {BATCH_SIZE, SUBSCRIPTION_IDLE_POLL_RATE} from '../util'; Object.defineProperty(document, 'visibilityState', {value: 'visible', writable: true}); Object.defineProperty(document, 'hidden', {value: false, writable: true}); afterEach(() => { - AssetLiveDataThreadManager.__resetForJest(); + __resetForJest(); }); function Test({ @@ -27,7 +25,7 @@ function Test({ mocks: MockedResponse[]; hooks: { keys: AssetKeyInput[]; - thread?: AssetLiveDataThreadID; + thread?: LiveDataThreadID; hookResult: (data: ReturnType['liveDataByNode']) => void; }[]; }) { @@ -37,7 +35,7 @@ function Test({ hookResult, }: { keys: AssetKeyInput[]; - thread?: AssetLiveDataThreadID; + thread?: LiveDataThreadID; hookResult: (data: ReturnType['liveDataByNode']) => void; }) { hookResult(useAssetsLiveData(keys, thread).liveDataByNode); diff --git a/js_modules/dagster-ui/packages/ui-core/src/asset-data/__tests__/util.ts b/js_modules/dagster-ui/packages/ui-core/src/asset-data/__tests__/util.ts index 0f40eeb83b8ac..16bfb7a904cfe 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/asset-data/__tests__/util.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/asset-data/__tests__/util.ts @@ -7,11 +7,11 @@ import { buildAssetNode, } from '../../graphql/types'; import {buildQueryMock} from '../../testing/mocking'; -import {ASSETS_GRAPH_LIVE_QUERY} from '../AssetLiveDataThread'; +import {ASSETS_GRAPH_LIVE_QUERY} from '../AssetLiveDataProvider'; import { AssetGraphLiveQuery, AssetGraphLiveQueryVariables, -} from '../types/AssetLiveDataThread.types'; +} from '../types/AssetLiveDataProvider.types'; export function buildMockedAssetGraphLiveQuery( assetKeys: AssetKeyInput[], diff --git a/js_modules/dagster-ui/packages/ui-core/src/asset-data/types/AssetLiveDataThread.types.ts b/js_modules/dagster-ui/packages/ui-core/src/asset-data/types/AssetLiveDataProvider.types.ts similarity index 100% rename from js_modules/dagster-ui/packages/ui-core/src/asset-data/types/AssetLiveDataThread.types.ts rename to js_modules/dagster-ui/packages/ui-core/src/asset-data/types/AssetLiveDataProvider.types.ts diff --git a/js_modules/dagster-ui/packages/ui-core/src/asset-graph/AssetGraphExplorer.tsx b/js_modules/dagster-ui/packages/ui-core/src/asset-graph/AssetGraphExplorer.tsx index 229afec7ca306..0c9b76164423d 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/asset-graph/AssetGraphExplorer.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/asset-graph/AssetGraphExplorer.tsx @@ -42,7 +42,7 @@ import {AssetNodeForGraphQueryFragment} from './types/useAssetGraphData.types'; import {AssetGraphFetchScope, AssetGraphQueryItem, useAssetGraphData} from './useAssetGraphData'; import {AssetLocation, useFindAssetLocation} from './useFindAssetLocation'; import {ShortcutHandler} from '../app/ShortcutHandler'; -import {AssetLiveDataRefresh} from '../asset-data/AssetLiveDataProvider'; +import {AssetLiveDataRefreshButton} from '../asset-data/AssetLiveDataProvider'; import {LaunchAssetExecutionButton} from '../assets/LaunchAssetExecutionButton'; import {LaunchAssetObservationButton} from '../assets/LaunchAssetObservationButton'; import {AssetKey} from '../assets/types'; @@ -781,7 +781,7 @@ const AssetGraphExplorerWithData = ({ popoverPosition="bottom-left" /> - + { const dimensions = getAssetNodeDimensions(definitionCopy); function SetCacheEntry() { - const client = useApolloClient(); - AssetLiveDataThreadManager.getInstance(client)._updateCache({ + factory.manager._updateCache({ [tokenForAssetKey(definitionCopy.assetKey)]: scenario.liveData!, }); return null; @@ -103,8 +100,7 @@ export const PartnerTags = () => { const liveData = Mocks.LiveDataForNodeMaterialized; function SetCacheEntry() { - const client = useApolloClient(); - AssetLiveDataThreadManager.getInstance(client)._updateCache({ + factory.manager._updateCache({ [tokenForAssetKey(buildAssetKey({path: [liveData.stepKey]}))]: liveData!, }); return null; diff --git a/js_modules/dagster-ui/packages/ui-core/src/asset-graph/__tests__/AssetNode.test.tsx b/js_modules/dagster-ui/packages/ui-core/src/asset-graph/__tests__/AssetNode.test.tsx index e5c26695c21d1..3dce3ae8f68ff 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/asset-graph/__tests__/AssetNode.test.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/asset-graph/__tests__/AssetNode.test.tsx @@ -1,10 +1,8 @@ -import {useApolloClient} from '@apollo/client'; import {MockedProvider} from '@apollo/client/testing'; import {render, screen, waitFor} from '@testing-library/react'; import {MemoryRouter} from 'react-router-dom'; -import {AssetLiveDataProvider} from '../../asset-data/AssetLiveDataProvider'; -import {AssetLiveDataThreadManager} from '../../asset-data/AssetLiveDataThreadManager'; +import {AssetLiveDataProvider, factory} from '../../asset-data/AssetLiveDataProvider'; import {AssetNode} from '../AssetNode'; import {tokenForAssetKey} from '../Utils'; import { @@ -34,8 +32,7 @@ describe('AssetNode', () => { : JSON.parse(scenario.definition.id); function SetCacheEntry() { - const client = useApolloClient(); - AssetLiveDataThreadManager.getInstance(client)._updateCache({ + factory.manager._updateCache({ [tokenForAssetKey(definitionCopy.assetKey)]: scenario.liveData!, }); return null; diff --git a/js_modules/dagster-ui/packages/ui-core/src/assets/AssetView.tsx b/js_modules/dagster-ui/packages/ui-core/src/assets/AssetView.tsx index d7539aa67b231..a0b8e13adb1ef 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/assets/AssetView.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/assets/AssetView.tsx @@ -31,7 +31,7 @@ import {healthRefreshHintFromLiveData} from './usePartitionHealthData'; import {useReportEventsModal} from './useReportEventsModal'; import {useFeatureFlags} from '../app/Flags'; import {Timestamp} from '../app/time/Timestamp'; -import {AssetLiveDataRefresh, useAssetLiveData} from '../asset-data/AssetLiveDataProvider'; +import {AssetLiveDataRefreshButton, useAssetLiveData} from '../asset-data/AssetLiveDataProvider'; import { GraphData, LiveDataForNode, @@ -263,7 +263,7 @@ export const AssetView = ({assetKey, trace}: Props) => { - + } diff --git a/js_modules/dagster-ui/packages/ui-core/src/assets/FailedRunSinceMaterializationBanner.tsx b/js_modules/dagster-ui/packages/ui-core/src/assets/FailedRunSinceMaterializationBanner.tsx index 6ee3ed4797795..c4eeceed44443 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/assets/FailedRunSinceMaterializationBanner.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/assets/FailedRunSinceMaterializationBanner.tsx @@ -6,7 +6,7 @@ import { } from '@dagster-io/ui-components/src/components/types'; import {Link} from 'react-router-dom'; -import {AssetLatestInfoRunFragment} from '../asset-data/types/AssetLiveDataThread.types'; +import {AssetLatestInfoRunFragment} from '../asset-data/types/AssetLiveDataProvider.types'; import {titleForRun} from '../runs/RunUtils'; import {useStepLogs} from '../runs/StepLogsDialog'; diff --git a/js_modules/dagster-ui/packages/ui-core/src/assets/__fixtures__/AssetTables.fixtures.ts b/js_modules/dagster-ui/packages/ui-core/src/assets/__fixtures__/AssetTables.fixtures.ts index b0dc208e21606..3891b4281bfa6 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/assets/__fixtures__/AssetTables.fixtures.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/assets/__fixtures__/AssetTables.fixtures.ts @@ -1,7 +1,7 @@ import {MockedResponse} from '@apollo/client/testing'; -import {ASSETS_GRAPH_LIVE_QUERY} from '../../asset-data/AssetLiveDataThread'; -import {AssetGraphLiveQuery} from '../../asset-data/types/AssetLiveDataThread.types'; +import {ASSETS_GRAPH_LIVE_QUERY} from '../../asset-data/AssetLiveDataProvider'; +import {AssetGraphLiveQuery} from '../../asset-data/types/AssetLiveDataProvider.types'; import {MockStaleReasonData} from '../../asset-graph/__fixtures__/AssetNode.fixtures'; import { RunStatus, diff --git a/js_modules/dagster-ui/packages/ui-core/src/live-data-provider/Factory.tsx b/js_modules/dagster-ui/packages/ui-core/src/live-data-provider/Factory.tsx new file mode 100644 index 0000000000000..26c9395c6ebeb --- /dev/null +++ b/js_modules/dagster-ui/packages/ui-core/src/live-data-provider/Factory.tsx @@ -0,0 +1,54 @@ +import React from 'react'; + +import { + LiveDataProvider, + LiveDataRefresh, + useLiveData, + useLiveDataSingle, +} from './LiveDataProvider'; +import {LiveDataThreadID} from './LiveDataThread'; +import {LiveDataThreadManager} from './LiveDataThreadManager'; + +export function liveDataFactory( + useHooks: () => R, + queryKeys: (keys: string[], result: R) => Promise>, +) { + const resultsFromUseHook: {current: R | undefined} = {current: undefined}; + const manager = new LiveDataThreadManager((keys: string[]) => { + if (!resultsFromUseHook.current) { + throw new Error( + 'Expected LiveDataProvider to have been in the DOM by the time queryKeys is called', + ); + } + return queryKeys(keys, resultsFromUseHook.current); + }); + + const LiveDataRefreshContext = React.createContext<{ + isGloballyRefreshing: boolean; + oldestDataTimestamp: number; + refresh: () => void; + }>({ + isGloballyRefreshing: false, + oldestDataTimestamp: Infinity, + refresh: () => {}, + }); + + return { + LiveDataProvider: ({children}: {children: React.ReactNode}) => { + resultsFromUseHook.current = useHooks(); + return ( + manager={manager} LiveDataRefreshContext={LiveDataRefreshContext}> + {children} + + ); + }, + useLiveData: (keys: string[], thread: LiveDataThreadID = 'default') => { + return useLiveData(keys, manager, thread); + }, + useLiveDataSingle: (key: string, thread: LiveDataThreadID = 'default') => { + return useLiveDataSingle(key, manager, thread); + }, + manager, + LiveDataRefresh: () => , + }; +} diff --git a/js_modules/dagster-ui/packages/ui-core/src/live-data-provider/LiveDataProvider.tsx b/js_modules/dagster-ui/packages/ui-core/src/live-data-provider/LiveDataProvider.tsx new file mode 100644 index 0000000000000..0bcc2dc199c94 --- /dev/null +++ b/js_modules/dagster-ui/packages/ui-core/src/live-data-provider/LiveDataProvider.tsx @@ -0,0 +1,173 @@ +import React from 'react'; + +import {LiveDataRefreshButton} from './LiveDataRefreshButton'; +import {LiveDataThreadID} from './LiveDataThread'; +import {LiveDataThreadManager} from './LiveDataThreadManager'; +import {useDocumentVisibility} from '../hooks/useDocumentVisibility'; + +export const SUBSCRIPTION_IDLE_POLL_RATE = 30 * 1000; + +export const LiveDataPollRateContext = React.createContext(SUBSCRIPTION_IDLE_POLL_RATE); + +export function useLiveDataSingle( + key: string, + manager: LiveDataThreadManager, + thread: LiveDataThreadID = 'default', +) { + const {liveDataByNode, refresh, refreshing} = useLiveData( + React.useMemo(() => [key], [key]), + manager, + thread, + ); + return { + liveData: liveDataByNode[key], + refresh, + refreshing, + }; +} + +export function useLiveData( + keys: string[], + manager: LiveDataThreadManager, + thread: LiveDataThreadID = 'default', + batchUpdatesInterval: number = 1000, +) { + const [data, setData] = React.useState>({}); + + const [isRefreshing, setIsRefreshing] = React.useState(false); + + React.useEffect(() => { + let timeout: ReturnType | null = null; + let didUpdateOnce = false; + let didScheduleUpdateOnce = false; + let updates: {stringKey: string; data: T | undefined}[] = []; + + function processUpdates() { + setData((data) => { + const copy = {...data}; + updates.forEach(({stringKey, data}) => { + if (data) { + copy[stringKey] = data; + } else { + delete copy[stringKey]; + } + }); + updates = []; + return copy; + }); + } + + const setDataSingle = (stringKey: string, data?: T | undefined) => { + /** + * Throttle updates to avoid triggering too many GCs and too many updates when fetching 1,000 assets, + */ + updates.push({stringKey, data}); + if (!didUpdateOnce) { + if (!didScheduleUpdateOnce) { + didScheduleUpdateOnce = true; + requestAnimationFrame(() => { + processUpdates(); + didUpdateOnce = true; + }); + } + } else if (!timeout) { + timeout = setTimeout(() => { + processUpdates(); + timeout = null; + }, batchUpdatesInterval); + } + }; + const unsubscribeCallbacks = keys.map((key) => manager.subscribe(key, setDataSingle, thread)); + return () => { + unsubscribeCallbacks.forEach((cb) => { + cb(); + }); + }; + }, [keys, batchUpdatesInterval, manager, thread]); + + return { + liveDataByNode: data, + + refresh: React.useCallback(() => { + manager.invalidateCache(keys); + setIsRefreshing(true); + }, [keys, manager]), + + refreshing: React.useMemo(() => { + if (isRefreshing && !manager.areKeysRefreshing(keys)) { + setTimeout(() => { + setIsRefreshing(false); + }); + return false; + } + return true; + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [keys, data, isRefreshing]), + }; +} + +export const LiveDataProvider = ({ + children, + LiveDataRefreshContext, + manager, +}: { + children: React.ReactNode; + LiveDataRefreshContext: React.Context<{ + isGloballyRefreshing: boolean; + oldestDataTimestamp: number; + refresh: () => void; + }>; + manager: LiveDataThreadManager; +}) => { + const [isGloballyRefreshing, setIsGloballyRefreshing] = React.useState(false); + const [oldestDataTimestamp, setOldestDataTimestamp] = React.useState(0); + + const onUpdatingOrUpdated = React.useCallback(() => { + const {isRefreshing, oldestDataTimestamp} = manager.getOldestDataTimestamp(); + setIsGloballyRefreshing(isRefreshing); + setOldestDataTimestamp(oldestDataTimestamp); + }, [manager]); + + React.useEffect(() => { + manager.setOnUpdatingOrUpdated(onUpdatingOrUpdated); + }, [manager, onUpdatingOrUpdated]); + + const isDocumentVisible = useDocumentVisibility(); + React.useEffect(() => { + manager.onDocumentVisiblityChange(isDocumentVisible); + }, [manager, isDocumentVisible]); + + return ( + { + manager.invalidateCache(); + }, [manager]), + }} + > + {children} + + ); +}; + +export function LiveDataRefresh({ + LiveDataRefreshContext, +}: { + LiveDataRefreshContext: React.Context<{ + isGloballyRefreshing: boolean; + oldestDataTimestamp: number; + refresh: () => void; + }>; +}) { + const {isGloballyRefreshing, oldestDataTimestamp, refresh} = + React.useContext(LiveDataRefreshContext); + return ( + + ); +} diff --git a/js_modules/dagster-ui/packages/ui-core/src/asset-data/AssetDataRefreshButton.tsx b/js_modules/dagster-ui/packages/ui-core/src/live-data-provider/LiveDataRefreshButton.tsx similarity index 92% rename from js_modules/dagster-ui/packages/ui-core/src/asset-data/AssetDataRefreshButton.tsx rename to js_modules/dagster-ui/packages/ui-core/src/live-data-provider/LiveDataRefreshButton.tsx index e6d5b80f6a519..e830c5895d012 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/asset-data/AssetDataRefreshButton.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/live-data-provider/LiveDataRefreshButton.tsx @@ -25,7 +25,7 @@ dayjs.updateLocale('en', { }, }); -export const AssetDataRefreshButton = ({ +export const LiveDataRefreshButton = ({ isRefreshing, onRefresh, oldestDataTimestamp, @@ -69,5 +69,5 @@ const TimeFromNowWithSeconds = ({timestamp}: {timestamp: number}) => { clearInterval(interval); }; }, [timestamp]); - return <>{text === '0s' ? 'Refreshing asset data…' : `Data is at most ${text} old`}; + return <>{text === '0s' ? 'Refreshing data…' : `Data is at most ${text} old`}; }; diff --git a/js_modules/dagster-ui/packages/ui-core/src/live-data-provider/LiveDataThread.tsx b/js_modules/dagster-ui/packages/ui-core/src/live-data-provider/LiveDataThread.tsx new file mode 100644 index 0000000000000..e576979e91c3c --- /dev/null +++ b/js_modules/dagster-ui/packages/ui-core/src/live-data-provider/LiveDataThread.tsx @@ -0,0 +1,112 @@ +import {LiveDataThreadManager} from './LiveDataThreadManager'; +import {BATCHING_INTERVAL} from './util'; + +export type LiveDataThreadID = 'default' | 'sidebar' | 'asset-graph' | 'group-node'; + +export class LiveDataThread { + private isFetching: boolean = false; + private listenersCount: {[key: string]: number}; + private isLooping: boolean = false; + private interval?: ReturnType; + private manager: LiveDataThreadManager; + public pollRate: number = 30000; + + protected static _threads: {[key: string]: LiveDataThread} = {}; + + private async queryKeys(_keys: string[]): Promise> { + return {}; + } + + constructor( + manager: LiveDataThreadManager, + queryKeys: (keys: string[]) => Promise>, + ) { + this.queryKeys = queryKeys; + this.listenersCount = {}; + this.manager = manager; + } + + public setPollRate(pollRate: number) { + this.pollRate = pollRate; + } + + public subscribe(key: string) { + this.listenersCount[key] = this.listenersCount[key] || 0; + this.listenersCount[key] += 1; + this.startFetchLoop(); + } + + public unsubscribe(key: string) { + this.listenersCount[key] -= 1; + if (this.listenersCount[key] === 0) { + delete this.listenersCount[key]; + } + if (this.getObservedKeys().length === 0) { + this.stopFetchLoop(); + } + } + + public getObservedKeys() { + return Object.keys(this.listenersCount); + } + + public startFetchLoop() { + if (this.isLooping) { + return; + } + this.isLooping = true; + const fetch = () => { + this._batchedQueryKeys(); + }; + setTimeout(fetch, BATCHING_INTERVAL); + this.interval = setInterval(fetch, 5000); + } + + public stopFetchLoop() { + if (!this.isLooping) { + return; + } + this.isLooping = false; + clearInterval(this.interval); + this.interval = undefined; + } + + private async _batchedQueryKeys() { + const keys = this.manager.determineKeysToFetch(this.getObservedKeys()); + if (!keys.length || this.isFetching) { + return; + } + this.isFetching = true; + this.manager._markKeysRequested(keys); + + const doNextFetch = () => { + this.isFetching = false; + this._batchedQueryKeys(); + }; + try { + const data = await this.queryKeys(keys); + this.manager._updateFetchedKeys(keys, data); + doNextFetch(); + } catch (e) { + console.error(e); + + if ((e as any)?.message?.includes('500')) { + // Mark these keys as fetched so that we don't retry them until after the poll interval rather than retrying them immediately. + // This is preferable because if the keys failed to fetch it's likely due to a timeout due to the query being too expensive and retrying it + // will not make it more likely to succeed and it would add more load to the database. + this.manager._updateFetchedKeys(keys, {}); + } else { + // If it's not a timeout from the backend then lets keep retrying instead of moving on. + this.manager._unmarkKeysRequested(keys); + } + + setTimeout( + () => { + doNextFetch(); + }, + // If the poll rate is faster than 5 seconds lets use that instead + Math.min(this.pollRate, 5000), + ); + } + } +} diff --git a/js_modules/dagster-ui/packages/ui-core/src/asset-data/AssetLiveDataThreadManager.tsx b/js_modules/dagster-ui/packages/ui-core/src/live-data-provider/LiveDataThreadManager.tsx similarity index 52% rename from js_modules/dagster-ui/packages/ui-core/src/asset-data/AssetLiveDataThreadManager.tsx rename to js_modules/dagster-ui/packages/ui-core/src/live-data-provider/LiveDataThreadManager.tsx index 3e9ae85c7d2ed..dfdc8460a5ad7 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/asset-data/AssetLiveDataThreadManager.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/live-data-provider/LiveDataThreadManager.tsx @@ -1,45 +1,37 @@ -import {ApolloClient} from '@apollo/client'; - -import {AssetLiveDataThread, AssetLiveDataThreadID} from './AssetLiveDataThread'; +import {LiveDataThread, LiveDataThreadID} from './LiveDataThread'; import {BATCH_SIZE} from './util'; -import {LiveDataForNode, tokenForAssetKey} from '../asset-graph/Utils'; -import {AssetKeyInput} from '../graphql/types'; import {isDocumentVisible} from '../hooks/useDocumentVisibility'; -type Listener = (stringKey: string, assetData?: LiveDataForNode) => void; +type Listener = (stringKey: string, data?: T | undefined) => void; -export class AssetLiveDataThreadManager { - protected static _instance: AssetLiveDataThreadManager; - private threads: Partial>; +export class LiveDataThreadManager { + protected static _instance: LiveDataThreadManager; + private threads: Partial>>; private lastFetchedOrRequested: Record< string, {fetched: number; requested?: undefined} | {requested: number; fetched?: undefined} | null >; - private cache: Record; - private client: ApolloClient; + private cache: Record; private pollRate: number = 30000; - private listeners: Record; + private listeners: Record[]>; private isPaused: boolean; private onSubscriptionsChanged(_allKeys: string[]) {} private onUpdatedOrUpdating() {} - constructor(client: ApolloClient) { + private async queryKeys(_keys: string[]): Promise> { + return {}; + } + + constructor(queryKeys: (keys: string[]) => Promise>) { + this.queryKeys = queryKeys; this.lastFetchedOrRequested = {}; this.cache = {}; - this.client = client; this.threads = {}; this.listeners = {}; this.isPaused = false; } - static getInstance(client: ApolloClient) { - if (!this._instance) { - this._instance = new AssetLiveDataThreadManager(client); - } - return this._instance; - } - public setPollRate(pollRate: number) { this.pollRate = pollRate; Object.values(this.threads).forEach((thread) => { @@ -47,7 +39,7 @@ export class AssetLiveDataThreadManager { }); } - // This callback is used by the main provider context to identify which assets we should be listening to run events for. + // This callback is used by the main provider context to identify which keys we should be listening to run events for. public setOnSubscriptionsChangedCallback( onSubscriptionsChanged: typeof this.onSubscriptionsChanged, ) { @@ -60,36 +52,31 @@ export class AssetLiveDataThreadManager { this.onUpdatedOrUpdating = onUpdatingOrUpdated; } - public subscribe( - key: AssetKeyInput, - listener: Listener, - threadID: AssetLiveDataThreadID = 'default', - ) { - const assetKey = tokenForAssetKey(key); + public subscribe(key: string, listener: Listener, threadID: LiveDataThreadID = 'default') { let _thread = this.threads[threadID]; if (!_thread) { - _thread = new AssetLiveDataThread(this.client, this); + _thread = new LiveDataThread(this, this.queryKeys); if (!this.isPaused) { _thread.startFetchLoop(); } this.threads[threadID] = _thread; } - this.listeners[assetKey] = this.listeners[assetKey] || []; - this.listeners[assetKey]!.push(listener); - if (this.cache[assetKey]) { - listener(assetKey, this.cache[assetKey]); + this.listeners[key] = this.listeners[key] || []; + this.listeners[key]!.push(listener); + if (this.cache[key]) { + listener(key, this.cache[key]); } const thread = _thread; - thread.subscribe(assetKey); + thread.subscribe(key); this.scheduleOnSubscriptionsChanged(); return () => { - thread.unsubscribe(assetKey); + thread.unsubscribe(key); this.scheduleOnSubscriptionsChanged(); }; } /** - * Schedule calling onSubscriptionsChanged instead of calling it synchronously in case we're unsubscribing from 1,000+ assets synchronously + * Schedule calling onSubscriptionsChanged instead of calling it synchronously in case we're unsubscribing from 1,000+ keys synchronously */ private scheduledOnSubscriptionsChanged: boolean = false; private scheduleOnSubscriptionsChanged() { @@ -104,22 +91,20 @@ export class AssetLiveDataThreadManager { } /** - * Removes the lastFetchedOrRequested entries for the assetKeys specified or all assetKeys if none are specified - * so that the assetKeys are re-eligible for fetching again despite the pollRate. + * Removes the lastFetchedOrRequested entries for the keys specified or all keys if none are specified + * so that the keys are re-eligible for fetching again despite the pollRate. */ - public refreshKeys(keys?: AssetKeyInput[]) { - (keys?.map((key) => tokenForAssetKey(key)) ?? Object.keys(this.lastFetchedOrRequested)).forEach( - (key) => { - delete this.lastFetchedOrRequested[key]; - }, - ); + public invalidateCache(keys?: string[]) { + (keys ?? Object.keys(this.lastFetchedOrRequested)).forEach((key) => { + delete this.lastFetchedOrRequested[key]; + }); } // Function used by threads. - public determineAssetsToFetch(keys: string[]) { - const assetsToFetch: AssetKeyInput[] = []; - const assetsWithoutData: AssetKeyInput[] = []; - while (keys.length && assetsWithoutData.length < BATCH_SIZE) { + public determineKeysToFetch(keys: string[]) { + const keysToFetch: string[] = []; + const keysWithoutData: string[] = []; + while (keys.length && keysWithoutData.length < BATCH_SIZE) { const key = keys.shift()!; const isRequested = !!this.lastFetchedOrRequested[key]?.requested; if (isRequested) { @@ -130,20 +115,19 @@ export class AssetLiveDataThreadManager { continue; } if (lastFetchTime && isDocumentVisible()) { - assetsToFetch.push({path: key.split('/')}); + keysToFetch.push(key); } else { - assetsWithoutData.push({path: key.split('/')}); + keysWithoutData.push(key); } } - // Prioritize fetching assets for which there is no data in the cache - return assetsWithoutData.concat(assetsToFetch).slice(0, BATCH_SIZE); + // Prioritize fetching keys for which there is no data in the cache + return keysWithoutData.concat(keysToFetch).slice(0, BATCH_SIZE); } - public areKeysRefreshing(assetKeys: AssetKeyInput[]) { - for (const key of assetKeys) { - const stringKey = tokenForAssetKey(key); - if (!this.lastFetchedOrRequested[stringKey]?.fetched) { + public areKeysRefreshing(keys: string[]) { + for (const key of keys) { + if (!this.lastFetchedOrRequested[key]?.fetched) { return true; } } @@ -156,10 +140,10 @@ export class AssetLiveDataThreadManager { } public getOldestDataTimestamp() { - const allAssetKeys = Object.keys(this.listeners).filter((key) => this.listeners[key]?.length); - let isRefreshing = allAssetKeys.length ? true : false; + const allKeys = Object.keys(this.listeners).filter((key) => this.listeners[key]?.length); + let isRefreshing = allKeys.length ? true : false; let oldestDataTimestamp = Infinity; - for (const key of allAssetKeys) { + for (const key of allKeys) { if (this.lastFetchedOrRequested[key]?.fetched) { isRefreshing = false; } @@ -174,7 +158,7 @@ export class AssetLiveDataThreadManager { }; } - public _updateCache(data: Record) { + public _updateCache(data: Record) { Object.assign(this.cache, data); } @@ -202,51 +186,45 @@ export class AssetLiveDataThreadManager { }); } - public getCacheEntry(key: AssetKeyInput) { - return this.cache[tokenForAssetKey(key)]; + public getCacheEntry(key: string) { + return this.cache[key]; } - public _markAssetsRequested(assetKeys: AssetKeyInput[]) { + public _markKeysRequested(keys: string[]) { const requestTime = Date.now(); - assetKeys.forEach((key) => { - this.lastFetchedOrRequested[tokenForAssetKey(key)] = { + keys.forEach((key) => { + this.lastFetchedOrRequested[key] = { requested: requestTime, }; }); this.onUpdatedOrUpdating(); } - public _unmarkAssetsRequested(assetKeys: AssetKeyInput[]) { - assetKeys.forEach((key) => { - delete this.lastFetchedOrRequested[tokenForAssetKey(key)]; + public _unmarkKeysRequested(keys: string[]) { + keys.forEach((key) => { + delete this.lastFetchedOrRequested[key]; }); } - public _updateFetchedAssets(assetKeys: AssetKeyInput[], data: Record) { + public _updateFetchedKeys(keys: string[], data: Record) { const fetchedTime = Date.now(); - assetKeys.forEach((key) => { - const stringKey = tokenForAssetKey(key); - this.lastFetchedOrRequested[stringKey] = { + keys.forEach((key) => { + this.lastFetchedOrRequested[key] = { fetched: fetchedTime, }; - const assetData = data[stringKey]; + const assetData = data[key]; if (!assetData) { return; } - this.cache[stringKey] = assetData; - const listeners = this.listeners[stringKey]; + this.cache[key] = assetData; + const listeners = this.listeners[key]; if (!listeners) { return; } listeners.forEach((listener) => { - listener(stringKey, assetData); + listener(key, assetData); }); }); this.onUpdatedOrUpdating(); } - - public static __resetForJest() { - // @ts-expect-error - its ok - AssetLiveDataThreadManager._instance = undefined; - } } diff --git a/js_modules/dagster-ui/packages/ui-core/src/live-data-provider/util.ts b/js_modules/dagster-ui/packages/ui-core/src/live-data-provider/util.ts new file mode 100644 index 0000000000000..fb63b233131b0 --- /dev/null +++ b/js_modules/dagster-ui/packages/ui-core/src/live-data-provider/util.ts @@ -0,0 +1,8 @@ +// How many assets to fetch at once +export const BATCH_SIZE = 10; + +// Milliseconds we wait until sending a batched query +export const BATCHING_INTERVAL = 250; + +export const SUBSCRIPTION_IDLE_POLL_RATE = 30 * 1000; +export const SUBSCRIPTION_MAX_POLL_RATE = 2 * 1000;