Skip to content

Commit

Permalink
Refactor AssetLiveDataProvider -> LiveDataProvider (#19453)
Browse files Browse the repository at this point in the history
## Summary & Motivation

The strategy of chunking/batching queries to the server is one I want to
reuse on the upcoming asset timeline page. To support that I'm
refactoring this class to allow for the core chunking/batching logic to
be reused by other queries.

## How I Tested These Changes


I relied on the existing Jest tests for AssetLiveDataProvider.
  • Loading branch information
salazarm authored Feb 1, 2024
1 parent 626fede commit f6a0162
Show file tree
Hide file tree
Showing 19 changed files with 612 additions and 523 deletions.
2 changes: 1 addition & 1 deletion js_modules/dagster-ui/packages/app-oss/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import {AppTopNav} from '@dagster-io/ui-core/app/AppTopNav';
import {ContentRoot} from '@dagster-io/ui-core/app/ContentRoot';
import {UserSettingsButton} from '@dagster-io/ui-core/app/UserSettingsButton';
import {logLink, timeStartLink} from '@dagster-io/ui-core/app/apolloLinks';
import {LiveDataPollRateContext} from '@dagster-io/ui-core/asset-data/AssetLiveDataProvider';
import {DeploymentStatusType} from '@dagster-io/ui-core/instance/DeploymentStatusProvider';
import {LiveDataPollRateContext} from '@dagster-io/ui-core/live-data-provider/LiveDataProvider';

import {CommunityNux} from './NUX/CommunityNux';
import {extractInitializationData} from './extractInitializationData';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,172 +1,91 @@
import {useApolloClient} from '@apollo/client';
import {ApolloClient, gql, useApolloClient} from '@apollo/client';
import uniq from 'lodash/uniq';
import React from 'react';

import {AssetDataRefreshButton} from './AssetDataRefreshButton';
import {AssetLiveDataThreadID} from './AssetLiveDataThread';
import {AssetLiveDataThreadManager} from './AssetLiveDataThreadManager';
import {
AssetGraphLiveQuery,
AssetGraphLiveQueryVariables,
} from './types/AssetLiveDataProvider.types';
import {SUBSCRIPTION_MAX_POLL_RATE} from './util';
import {observeAssetEventsInRuns} from '../asset-graph/AssetRunLogObserver';
import {LiveDataForNode, tokenForAssetKey} from '../asset-graph/Utils';
import {
LiveDataForNode,
buildLiveDataForNode,
tokenForAssetKey,
tokenToAssetKey,
} from '../asset-graph/Utils';
import {AssetKeyInput} from '../graphql/types';
import {useDocumentVisibility} from '../hooks/useDocumentVisibility';
import {liveDataFactory} from '../live-data-provider/Factory';
import {LiveDataPollRateContext} from '../live-data-provider/LiveDataProvider';
import {LiveDataThreadID} from '../live-data-provider/LiveDataThread';
import {useDidLaunchEvent} from '../runs/RunUtils';

/** Idle poll rate: 30 × 1000 (presumably milliseconds — confirm against the consumers of this value). */
export const SUBSCRIPTION_IDLE_POLL_RATE = 1000 * 30;
/** Fastest poll rate: 2 × 1000 (presumably milliseconds — confirm against the consumers of this value). */
const SUBSCRIPTION_MAX_POLL_RATE = 1000 * 2;

/** React context carrying the live-data poll rate as a number; its default is SUBSCRIPTION_IDLE_POLL_RATE. */
export const LiveDataPollRateContext = React.createContext<number>(SUBSCRIPTION_IDLE_POLL_RATE);

/**
 * React context exposing the refresh state that AssetLiveDataRefresh reads.
 *
 * The default value describes "nothing is refreshing": `isGloballyRefreshing` is false,
 * `oldestDataTimestamp` is positive infinity, and `refresh` is a no-op.
 */
export const AssetLiveDataRefreshContext = React.createContext<{
  isGloballyRefreshing: boolean;
  oldestDataTimestamp: number;
  refresh: () => void;
}>({
  isGloballyRefreshing: false,
  oldestDataTimestamp: Number.POSITIVE_INFINITY,
  refresh: () => {
    // Intentionally empty: default used when no provider supplies a value.
  },
});

// NOTE(review): this span appears to interleave two versions of the code without diff
// markers — an older useAssetLiveData wrapper (its opening lines here, its `return {...}`
// further down) and the newer makeFactory(). Confirm which lines are live before editing.
//
// Older wrapper: calls useAssetsLiveData with a memoized one-element key array and the
// thread id, then returns the entry for this asset key plus `refresh` and `refreshing`.
export function useAssetLiveData(
assetKey: AssetKeyInput,
thread: AssetLiveDataThreadID = 'default',
) {
const {liveDataByNode, refresh, refreshing} = useAssetsLiveData(
React.useMemo(() => [assetKey], [assetKey]),
thread,
/**
 * Builds the asset live-data factory by passing liveDataFactory two callbacks:
 * one that returns the Apollo client via useApolloClient, and one that fetches
 * live data for an array of asset-key tokens using that client.
 */
function makeFactory() {
return liveDataFactory(
// First callback: obtains the Apollo client through the useApolloClient hook.
() => {
return useApolloClient();
},
// Second callback: runs ASSETS_GRAPH_LIVE_QUERY with fetchPolicy 'network-only' for the
// given tokens (converted back to asset keys with tokenToAssetKey) and resolves to an
// object keyed by asset-key token whose values come from buildLiveDataForNode.
async (keys, client: ApolloClient<any>) => {
const {data} = await client.query<AssetGraphLiveQuery, AssetGraphLiveQueryVariables>({
query: ASSETS_GRAPH_LIVE_QUERY,
fetchPolicy: 'network-only',
variables: {
assetKeys: keys.map(tokenToAssetKey),
},
});
// Index the returned asset nodes by their asset-key token.
const nodesByKey = Object.fromEntries(
data.assetNodes.map((node) => [tokenForAssetKey(node.assetKey), node]),
);

// Pair each assetsLatestInfo entry with the node that has the same token.
// NOTE(review): the non-null assertion below assumes every assetsLatestInfo entry has a
// matching assetNodes entry — confirm the server guarantees this.
const liveDataByKey = Object.fromEntries(
data.assetsLatestInfo.map((assetLatestInfo) => {
const id = tokenForAssetKey(assetLatestInfo.assetKey);
return [id, buildLiveDataForNode(nodesByKey[id]!, assetLatestInfo)];
}),
);
return liveDataByKey;
},
);
// Tail of the older useAssetLiveData wrapper: picks this asset's entry out of liveDataByNode.
return {
liveData: liveDataByNode[tokenForAssetKey(assetKey)],
refresh,
refreshing,
};
}
// Module-level factory instance used by the hooks and components defined in this file.
export const factory = makeFactory();

/**
 * Hook wrapper around `factory.useLiveDataSingle` for a single asset key.
 *
 * @param assetKey - Asset key; converted to its string token with tokenForAssetKey.
 * @param thread - Live-data thread id forwarded to the factory hook; defaults to 'default'.
 * @returns Whatever `factory.useLiveDataSingle` returns for that token and thread.
 */
export function useAssetLiveData(assetKey: AssetKeyInput, thread: LiveDataThreadID = 'default') {
  const assetToken = tokenForAssetKey(assetKey);
  return factory.useLiveDataSingle(assetToken, thread);
}

// NOTE(review): this span appears to interleave two versions of useAssetsLiveData without
// diff markers (two `thread` parameters, two `return` statements). The long body built on
// AssetLiveDataThreadManager looks like the older implementation; the final
// `return factory.useLiveData(...)` looks like the newer one. Confirm which lines are live.
export function useAssetsLiveData(
assetKeys: AssetKeyInput[],
thread: AssetLiveDataThreadID = 'default',
batchUpdatesInterval: number = 1000,
thread: LiveDataThreadID = 'default',
) {
// Live data keyed by the string key passed to the subscription callback.
const [data, setData] = React.useState<Record<string, LiveDataForNode>>({});

// Set to true by refresh(); reset by the `refreshing` memo below.
const [isRefreshing, setIsRefreshing] = React.useState(false);

const client = useApolloClient();
const manager = AssetLiveDataThreadManager.getInstance(client);

// Subscribes every asset key to the manager and batches incoming updates into state.
React.useEffect(() => {
let timeout: ReturnType<typeof setTimeout> | null = null;
let didUpdateOnce = false;
let didScheduleUpdateOnce = false;
// Updates received since the last flush.
let updates: {stringKey: string; assetData: LiveDataForNode | undefined}[] = [];

// Flushes the pending updates into `data`: sets keys that have data and deletes keys
// whose update carries no data.
// NOTE(review): the pending list is cleared inside the state updater; if React invokes
// the updater more than once, later invocations would see an empty list — confirm.
function processUpdates() {
setData((data) => {
const copy = {...data};
updates.forEach(({stringKey, assetData}) => {
if (assetData) {
copy[stringKey] = assetData;
} else {
delete copy[stringKey];
}
});
updates = [];
return copy;
});
}

const setDataSingle = (stringKey: string, assetData?: LiveDataForNode) => {
/**
* Throttle updates to avoid triggering too many GCs and too many state updates when
* fetching on the order of 1,000 assets. The first flush is scheduled with
* requestAnimationFrame; after it has run, flushes are scheduled with a timer of
* `batchUpdatesInterval`, at most one pending at a time.
*/
updates.push({stringKey, assetData});
if (!didUpdateOnce) {
if (!didScheduleUpdateOnce) {
didScheduleUpdateOnce = true;
requestAnimationFrame(() => {
processUpdates();
didUpdateOnce = true;
});
}
} else if (!timeout) {
timeout = setTimeout(() => {
processUpdates();
timeout = null;
}, batchUpdatesInterval);
}
};
const unsubscribeCallbacks = assetKeys.map((key) =>
manager.subscribe(key, setDataSingle, thread),
);
// Cleanup: call every unsubscribe callback returned by manager.subscribe.
// NOTE(review): a pending `timeout` / animation frame is not cancelled here — confirm
// that a late flush after cleanup is acceptable.
return () => {
unsubscribeCallbacks.forEach((cb) => {
cb();
});
};
}, [assetKeys, batchUpdatesInterval, manager, thread]);

return {
liveDataByNode: data,

// Asks the manager to refresh these keys and marks this hook as refreshing.
refresh: React.useCallback(() => {
manager.refreshKeys(assetKeys);
setIsRefreshing(true);
}, [assetKeys, manager]),

// NOTE(review): this memo returns true on every path except the one where a refresh
// has just finished — including when isRefreshing is false — and it schedules a state
// update from inside the memo callback. Confirm both are intended.
refreshing: React.useMemo(() => {
if (isRefreshing && !manager.areKeysRefreshing(assetKeys)) {
setTimeout(() => {
setIsRefreshing(false);
});
return false;
}
return true;
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [assetKeys, data, isRefreshing]),
};
// Newer implementation: converts each asset key to its token and delegates to the factory hook.
return factory.useLiveData(
assetKeys.map((key) => tokenForAssetKey(key)),
thread,
);
}

export const AssetLiveDataProvider = ({children}: {children: React.ReactNode}) => {
const [allObservedKeys, setAllObservedKeys] = React.useState<AssetKeyInput[]>([]);

const client = useApolloClient();
const manager = AssetLiveDataThreadManager.getInstance(client);

const [isGloballyRefreshing, setIsGloballyRefreshing] = React.useState(false);
const [oldestDataTimestamp, setOldestDataTimestamp] = React.useState(0);

const onUpdatingOrUpdated = React.useCallback(() => {
const {isRefreshing, oldestDataTimestamp} = manager.getOldestDataTimestamp();
setIsGloballyRefreshing(isRefreshing);
setOldestDataTimestamp(oldestDataTimestamp);
}, [manager]);

React.useEffect(() => {
manager.setOnSubscriptionsChangedCallback((keys) =>
factory.manager.setOnSubscriptionsChangedCallback((keys) =>
setAllObservedKeys(keys.map((key) => ({path: key.split('/')}))),
);
manager.setOnUpdatingOrUpdated(onUpdatingOrUpdated);
}, [manager, onUpdatingOrUpdated]);

const isDocumentVisible = useDocumentVisibility();
}, []);

const pollRate = React.useContext(LiveDataPollRateContext);

React.useEffect(() => {
manager.onDocumentVisiblityChange(isDocumentVisible);
}, [manager, isDocumentVisible]);

React.useEffect(() => {
manager.setPollRate(pollRate);
}, [manager, pollRate]);
factory.manager.setPollRate(pollRate);
}, [pollRate]);

useDidLaunchEvent(() => {
manager.refreshKeys();
factory.manager.invalidateCache();
}, SUBSCRIPTION_MAX_POLL_RATE);

React.useEffect(() => {
const assetKeyTokens = new Set(allObservedKeys.map(tokenForAssetKey));
const dataForObservedKeys = allObservedKeys
.map((key) => manager.getCacheEntry(key))
.map((key) => factory.manager.getCacheEntry(tokenForAssetKey(key)))
.filter((n) => n) as LiveDataForNode[];

const assetStepKeys = new Set(dataForObservedKeys.flatMap((n) => n.opNames));
Expand All @@ -190,36 +109,132 @@ export const AssetLiveDataProvider = ({children}: {children: React.ReactNode}) =
(e.stepKey && assetStepKeys.has(e.stepKey)),
)
) {
manager.refreshKeys();
factory.manager.invalidateCache();
}
});
return unobserve;
}, [allObservedKeys, manager]);

return (
<AssetLiveDataRefreshContext.Provider
value={{
isGloballyRefreshing,
oldestDataTimestamp,
refresh: React.useCallback(() => {
manager.refreshKeys();
}, [manager]),
}}
>
{children}
</AssetLiveDataRefreshContext.Provider>
);
}, [allObservedKeys]);

return <factory.LiveDataProvider>{children}</factory.LiveDataProvider>;
};

// NOTE(review): this span appears to interleave two versions without diff markers — the
// older AssetLiveDataRefresh body and the newer AssetLiveDataRefreshButton — sharing one
// closing brace. Confirm which lines are live before editing.
//
// Older component: reads the refresh state from AssetLiveDataRefreshContext and passes it
// to AssetDataRefreshButton as isRefreshing / oldestDataTimestamp / onRefresh.
export function AssetLiveDataRefresh() {
const {isGloballyRefreshing, oldestDataTimestamp, refresh} = React.useContext(
AssetLiveDataRefreshContext,
);
return (
<AssetDataRefreshButton
isRefreshing={isGloballyRefreshing}
oldestDataTimestamp={oldestDataTimestamp}
onRefresh={refresh}
/>
);
// Newer component: renders the LiveDataRefresh component exposed by the module-level factory.
export function AssetLiveDataRefreshButton() {
return <factory.LiveDataRefresh />;
}

/**
 * GraphQL fragments for an asset's latest run information.
 *
 * `AssetLatestInfoFragment` (on AssetLatestInfo) selects the id, the asset key path, the
 * unstarted and in-progress run ids, and the latest run. `AssetLatestInfoRun` (on Run)
 * selects that run's status, endTime and id.
 */
export const ASSET_LATEST_INFO_FRAGMENT = gql`
fragment AssetLatestInfoFragment on AssetLatestInfo {
id
assetKey {
path
}
unstartedRunIds
inProgressRunIds
latestRun {
id
...AssetLatestInfoRun
}
}
fragment AssetLatestInfoRun on Run {
status
endTime
id
}
`;

/**
 * GraphQL fragments for the per-node live data of an asset.
 *
 * `AssetNodeLiveFragment` (on AssetNode) selects the id, op names, repository id, asset key
 * path, the single most recent materialization and observation (`limit: 1`), asset checks,
 * freshness info, stale status and causes, and partition statistics.
 *
 * The supporting fragments defined alongside it are:
 * - `AssetNodeLiveFreshnessInfo` — currentMinutesLate
 * - `AssetNodeLiveMaterialization` / `AssetNodeLiveObservation` — timestamp and runId
 * - `AssetCheckLiveFragment` — check name, canExecuteIndividually, and the execution for
 *   the latest materialization (id, runId, status, timestamp, stepKey, evaluation severity)
 */
export const ASSET_NODE_LIVE_FRAGMENT = gql`
fragment AssetNodeLiveFragment on AssetNode {
id
opNames
repository {
id
}
assetKey {
path
}
assetMaterializations(limit: 1) {
...AssetNodeLiveMaterialization
}
assetObservations(limit: 1) {
...AssetNodeLiveObservation
}
assetChecksOrError {
... on AssetChecks {
checks {
...AssetCheckLiveFragment
}
}
}
freshnessInfo {
...AssetNodeLiveFreshnessInfo
}
staleStatus
staleCauses {
key {
path
}
reason
category
dependency {
path
}
}
partitionStats {
numMaterialized
numMaterializing
numPartitions
numFailed
}
}
fragment AssetNodeLiveFreshnessInfo on AssetFreshnessInfo {
currentMinutesLate
}
fragment AssetNodeLiveMaterialization on MaterializationEvent {
timestamp
runId
}
fragment AssetNodeLiveObservation on ObservationEvent {
timestamp
runId
}
fragment AssetCheckLiveFragment on AssetCheck {
name
canExecuteIndividually
executionForLatestMaterialization {
id
runId
status
timestamp
stepKey
evaluation {
severity
}
}
}
`;

/**
 * GraphQL query `AssetGraphLiveQuery`.
 *
 * Takes a required list of asset keys (`$assetKeys`) and fetches, for those keys, both
 * `assetNodes` (with `loadMaterializations: true`, using AssetNodeLiveFragment) and
 * `assetsLatestInfo` (using AssetLatestInfoFragment). The two fragment documents are
 * interpolated at the end of the template so their definitions are part of this query.
 */
export const ASSETS_GRAPH_LIVE_QUERY = gql`
query AssetGraphLiveQuery($assetKeys: [AssetKeyInput!]!) {
assetNodes(assetKeys: $assetKeys, loadMaterializations: true) {
id
...AssetNodeLiveFragment
}
assetsLatestInfo(assetKeys: $assetKeys) {
id
...AssetLatestInfoFragment
}
}
${ASSET_NODE_LIVE_FRAGMENT}
${ASSET_LATEST_INFO_FRAGMENT}
`;

/**
 * Test-only helper.
 *
 * Builds a new factory with makeFactory() and copies its properties onto the exported
 * `factory` object in place, so code holding a reference to `factory` sees the new values.
 */
export function __resetForJest() {
  const freshFactory = makeFactory();
  Object.assign(factory, freshFactory);
}
Loading

1 comment on commit f6a0162

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagit-core-storybook ready!

✅ Preview
https://dagit-core-storybook-df63p3rrj-elementl.vercel.app

Built with commit f6a0162.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.