diff --git a/docs/content/dagster-plus/managing-deployments/controlling-logs.mdx b/docs/content/dagster-plus/managing-deployments/controlling-logs.mdx index d8d61bd95d2fe..9b1265552ed4a 100644 --- a/docs/content/dagster-plus/managing-deployments/controlling-logs.mdx +++ b/docs/content/dagster-plus/managing-deployments/controlling-logs.mdx @@ -16,7 +16,7 @@ Depending on your organization's needs, you may want to retain these logs in you ## Modifying compute log storage -Dagster's compute logs are handled by the configured [`ComputeLogManager`](/\_apidocs/internals#dagster.\_core.storage.compute_log_manager.ComputeLogManager). By default, Dagster+ utilizes the `CloudComputeLogManager` which stores logs in a Dagster+-managed Amazon S3 bucket, but you can customize this behavior to store logs in a destination of your choice. +Dagster's compute logs are handled by the configured [`CapturedLogManager`](/\_apidocs/internals#dagster.\_core.storage.captured_log_manager.CapturedLogManager). By default, Dagster+ utilizes the `CloudComputeLogManager` which stores logs in a Dagster+-managed Amazon S3 bucket, but you can customize this behavior to store logs in a destination of your choice. ### Writing to your own S3 bucket diff --git a/docs/sphinx/sections/api/apidocs/internals.rst b/docs/sphinx/sections/api/apidocs/internals.rst index 545b2504ac582..26075f361d6df 100644 --- a/docs/sphinx/sections/api/apidocs/internals.rst +++ b/docs/sphinx/sections/api/apidocs/internals.rst @@ -152,10 +152,6 @@ Compute log manager .. autoclass:: CapturedLogManager -.. currentmodule:: dagster._core.storage.compute_log_manager - -.. autoclass:: ComputeLogManager - .. currentmodule:: dagster._core.storage.local_compute_log_manager .. autoclass:: LocalComputeLogManager diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql index ea61221e20c10..4f79439e02493 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql @@ -1,18 +1,3 @@ -type ComputeLogFile { - path: String! - data: String - cursor: Int! - size: Int! - downloadUrl: String -} - -type ComputeLogs { - runId: String! - stepKey: String! - stdout: ComputeLogFile - stderr: ComputeLogFile -} - interface DisplayableEvent { label: String description: String @@ -1145,7 +1130,6 @@ interface PipelineRun { solidSelection: [String!] stats: RunStatsSnapshotOrError! stepStats: [RunStepStats!]! - computeLogs(stepKey: String!): ComputeLogs! capturedLogs(fileKey: String!): CapturedLogs! executionPlan: ExecutionPlan stepKeysToExecute: [String!] @@ -1398,7 +1382,6 @@ type Run implements PipelineRun { solidSelection: [String!] stats: RunStatsSnapshotOrError! stepStats: [RunStepStats!]! - computeLogs(stepKey: String!): ComputeLogs! capturedLogs(fileKey: String!): CapturedLogs! executionPlan: ExecutionPlan stepKeysToExecute: [String!] @@ -2397,7 +2380,6 @@ type Instance { executablePath: String! daemonHealth: DaemonHealth! hasInfo: Boolean! - hasCapturedLogManager: Boolean! autoMaterializePaused: Boolean! supportsConcurrencyLimits: Boolean! minConcurrencyLimitValue: Int! @@ -3781,12 +3763,6 @@ type DeleteDynamicPartitionsSuccess { type Subscription { pipelineRunLogs(runId: ID!, cursor: String): PipelineRunLogsSubscriptionPayload! - computeLogs(runId: ID!, stepKey: String!, ioType: ComputeIOType!, cursor: String): ComputeLogFile! capturedLogs(logKey: [String!]!, cursor: String): CapturedLogs! 
locationStateChangeEvents: LocationStateChangeSubscription! } - -enum ComputeIOType { - STDOUT - STDERR -} diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts index f6d4f95a71eaf..4b5260176e65e 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts @@ -834,28 +834,6 @@ export type CompositeSolidDefinitionSolidHandlesArgs = { parentHandleID?: InputMaybe; }; -export enum ComputeIoType { - STDERR = 'STDERR', - STDOUT = 'STDOUT', -} - -export type ComputeLogFile = { - __typename: 'ComputeLogFile'; - cursor: Scalars['Int']['output']; - data: Maybe; - downloadUrl: Maybe; - path: Scalars['String']['output']; - size: Scalars['Int']['output']; -}; - -export type ComputeLogs = { - __typename: 'ComputeLogs'; - runId: Scalars['String']['output']; - stderr: Maybe; - stdout: Maybe; - stepKey: Scalars['String']['output']; -}; - export type ConcurrencyKeyInfo = { __typename: 'ConcurrencyKeyInfo'; activeRunIds: Array; @@ -1912,7 +1890,6 @@ export type Instance = { concurrencyLimits: Array; daemonHealth: DaemonHealth; executablePath: Scalars['String']['output']; - hasCapturedLogManager: Scalars['Boolean']['output']; hasInfo: Scalars['Boolean']['output']; id: Scalars['String']['output']; info: Maybe; @@ -3475,7 +3452,6 @@ export type PipelineRun = { assets: Array; canTerminate: Scalars['Boolean']['output']; capturedLogs: CapturedLogs; - computeLogs: ComputeLogs; eventConnection: EventConnection; executionPlan: Maybe; id: Scalars['ID']['output']; @@ -3502,10 +3478,6 @@ export type PipelineRunCapturedLogsArgs = { fileKey: Scalars['String']['input']; }; -export type PipelineRunComputeLogsArgs = { - stepKey: Scalars['String']['input']; -}; - export type PipelineRunEventConnectionArgs = { afterCursor?: InputMaybe; }; @@ -4390,7 +4362,6 @@ export type Run = PipelineRun & { assets: Array; canTerminate: Scalars['Boolean']['output']; capturedLogs: CapturedLogs; - computeLogs: ComputeLogs; creationTime: Scalars['Float']['output']; endTime: Maybe; eventConnection: EventConnection; @@ -4429,10 +4400,6 @@ export type RunCapturedLogsArgs = { fileKey: Scalars['String']['input']; }; -export type RunComputeLogsArgs = { - stepKey: Scalars['String']['input']; -}; - export type RunEventConnectionArgs = { afterCursor?: InputMaybe; }; @@ -5249,7 +5216,6 @@ export type StopSensorMutationResultOrError = export type Subscription = { __typename: 'Subscription'; capturedLogs: CapturedLogs; - computeLogs: ComputeLogFile; locationStateChangeEvents: LocationStateChangeSubscription; pipelineRunLogs: PipelineRunLogsSubscriptionPayload; }; @@ -5259,13 +5225,6 @@ export type SubscriptionCapturedLogsArgs = { logKey: Array; }; -export type SubscriptionComputeLogsArgs = { - cursor?: InputMaybe; - ioType: ComputeIoType; - runId: Scalars['ID']['input']; - stepKey: Scalars['String']['input']; -}; - export type SubscriptionPipelineRunLogsArgs = { cursor?: InputMaybe; runId: Scalars['ID']['input']; @@ -7137,48 +7096,6 @@ export const buildCompositeSolidDefinition = ( }; }; -export const buildComputeLogFile = ( - overrides?: Partial, - _relationshipsToOmit: Set = new Set(), -): {__typename: 'ComputeLogFile'} & ComputeLogFile => { - const relationshipsToOmit: Set = new Set(_relationshipsToOmit); - relationshipsToOmit.add('ComputeLogFile'); - return { - __typename: 'ComputeLogFile', - cursor: overrides && overrides.hasOwnProperty('cursor') ? overrides.cursor! 
: 1566, - data: overrides && overrides.hasOwnProperty('data') ? overrides.data! : 'quia', - downloadUrl: - overrides && overrides.hasOwnProperty('downloadUrl') ? overrides.downloadUrl! : 'sed', - path: overrides && overrides.hasOwnProperty('path') ? overrides.path! : 'beatae', - size: overrides && overrides.hasOwnProperty('size') ? overrides.size! : 7860, - }; -}; - -export const buildComputeLogs = ( - overrides?: Partial, - _relationshipsToOmit: Set = new Set(), -): {__typename: 'ComputeLogs'} & ComputeLogs => { - const relationshipsToOmit: Set = new Set(_relationshipsToOmit); - relationshipsToOmit.add('ComputeLogs'); - return { - __typename: 'ComputeLogs', - runId: overrides && overrides.hasOwnProperty('runId') ? overrides.runId! : 'est', - stderr: - overrides && overrides.hasOwnProperty('stderr') - ? overrides.stderr! - : relationshipsToOmit.has('ComputeLogFile') - ? ({} as ComputeLogFile) - : buildComputeLogFile({}, relationshipsToOmit), - stdout: - overrides && overrides.hasOwnProperty('stdout') - ? overrides.stdout! - : relationshipsToOmit.has('ComputeLogFile') - ? ({} as ComputeLogFile) - : buildComputeLogFile({}, relationshipsToOmit), - stepKey: overrides && overrides.hasOwnProperty('stepKey') ? overrides.stepKey! : 'cum', - }; -}; - export const buildConcurrencyKeyInfo = ( overrides?: Partial, _relationshipsToOmit: Set = new Set(), @@ -8808,10 +8725,6 @@ export const buildInstance = ( : buildDaemonHealth({}, relationshipsToOmit), executablePath: overrides && overrides.hasOwnProperty('executablePath') ? overrides.executablePath! : 'fuga', - hasCapturedLogManager: - overrides && overrides.hasOwnProperty('hasCapturedLogManager') - ? overrides.hasCapturedLogManager! - : true, hasInfo: overrides && overrides.hasOwnProperty('hasInfo') ? overrides.hasInfo! : true, id: overrides && overrides.hasOwnProperty('id') ? overrides.id! : 'deleniti', info: overrides && overrides.hasOwnProperty('info') ? overrides.info! : 'qui', @@ -11464,12 +11377,6 @@ export const buildPipelineRun = ( : relationshipsToOmit.has('CapturedLogs') ? ({} as CapturedLogs) : buildCapturedLogs({}, relationshipsToOmit), - computeLogs: - overrides && overrides.hasOwnProperty('computeLogs') - ? overrides.computeLogs! - : relationshipsToOmit.has('ComputeLogs') - ? ({} as ComputeLogs) - : buildComputeLogs({}, relationshipsToOmit), eventConnection: overrides && overrides.hasOwnProperty('eventConnection') ? overrides.eventConnection! @@ -12845,12 +12752,6 @@ export const buildRun = ( : relationshipsToOmit.has('CapturedLogs') ? ({} as CapturedLogs) : buildCapturedLogs({}, relationshipsToOmit), - computeLogs: - overrides && overrides.hasOwnProperty('computeLogs') - ? overrides.computeLogs! - : relationshipsToOmit.has('ComputeLogs') - ? ({} as ComputeLogs) - : buildComputeLogs({}, relationshipsToOmit), creationTime: overrides && overrides.hasOwnProperty('creationTime') ? overrides.creationTime! : 5.95, endTime: overrides && overrides.hasOwnProperty('endTime') ? overrides.endTime! : 7.08, @@ -14444,12 +14345,6 @@ export const buildSubscription = ( : relationshipsToOmit.has('CapturedLogs') ? ({} as CapturedLogs) : buildCapturedLogs({}, relationshipsToOmit), - computeLogs: - overrides && overrides.hasOwnProperty('computeLogs') - ? overrides.computeLogs! - : relationshipsToOmit.has('ComputeLogFile') - ? ({} as ComputeLogFile) - : buildComputeLogFile({}, relationshipsToOmit), locationStateChangeEvents: overrides && overrides.hasOwnProperty('locationStateChangeEvents') ? overrides.locationStateChangeEvents! 
diff --git a/js_modules/dagster-ui/packages/ui-core/src/instance/types/useSupportsCapturedLogs.types.ts b/js_modules/dagster-ui/packages/ui-core/src/instance/types/useSupportsCapturedLogs.types.ts deleted file mode 100644 index a5d9053401f85..0000000000000 --- a/js_modules/dagster-ui/packages/ui-core/src/instance/types/useSupportsCapturedLogs.types.ts +++ /dev/null @@ -1,10 +0,0 @@ -// Generated GraphQL types, do not edit manually. - -import * as Types from '../../graphql/types'; - -export type InstanceSupportsCapturedLogsQueryVariables = Types.Exact<{[key: string]: never}>; - -export type InstanceSupportsCapturedLogsQuery = { - __typename: 'Query'; - instance: {__typename: 'Instance'; id: string; hasCapturedLogManager: boolean}; -}; diff --git a/js_modules/dagster-ui/packages/ui-core/src/instance/useSupportsCapturedLogs.tsx b/js_modules/dagster-ui/packages/ui-core/src/instance/useSupportsCapturedLogs.tsx deleted file mode 100644 index 1d7e9804dd169..0000000000000 --- a/js_modules/dagster-ui/packages/ui-core/src/instance/useSupportsCapturedLogs.tsx +++ /dev/null @@ -1,25 +0,0 @@ -import {gql, useQuery} from '@apollo/client'; - -import { - InstanceSupportsCapturedLogsQuery, - InstanceSupportsCapturedLogsQueryVariables, -} from './types/useSupportsCapturedLogs.types'; -import {useBlockTraceOnQueryResult} from '../performance/TraceContext'; - -export const useSupportsCapturedLogs = () => { - const queryResult = useQuery< - InstanceSupportsCapturedLogsQuery, - InstanceSupportsCapturedLogsQueryVariables - >(INSTANCE_SUPPORTS_CAPTURED_LOGS); - useBlockTraceOnQueryResult(queryResult, 'InstanceSupportsCapturedLogsQuery'); - return !!queryResult.data?.instance.hasCapturedLogManager; -}; - -const INSTANCE_SUPPORTS_CAPTURED_LOGS = gql` - query InstanceSupportsCapturedLogs { - instance { - id - hasCapturedLogManager - } - } -`; diff --git a/js_modules/dagster-ui/packages/ui-core/src/runs/ComputeLogPanel.tsx b/js_modules/dagster-ui/packages/ui-core/src/runs/ComputeLogPanel.tsx deleted file mode 100644 index c7c04539646a7..0000000000000 --- a/js_modules/dagster-ui/packages/ui-core/src/runs/ComputeLogPanel.tsx +++ /dev/null @@ -1,111 +0,0 @@ -import {Box, Spinner} from '@dagster-io/ui-components'; -import {memo, useContext, useEffect} from 'react'; - -import {RawLogContent} from './RawLogContent'; -import {useComputeLogs} from './useComputeLogs'; -import {AppContext} from '../app/AppContext'; - -interface ComputeLogPanelProps { - runId: string; - ioType: string; - setComputeLogUrl: (url: string | null) => void; -} - -interface ComputeLogPanelMaybeKeyProps extends ComputeLogPanelProps { - computeLogFileKey?: string; -} - -export const ComputeLogPanel = (props: ComputeLogPanelMaybeKeyProps) => { - const {runId, computeLogFileKey, ioType, setComputeLogUrl} = props; - - if (!computeLogFileKey) { - return ( - - - - ); - } - - return ( - - ); -}; - -interface ComputeLogPanelWithKeyProps extends ComputeLogPanelProps { - computeLogFileKey: string; -} - -const resolveDownloadUrl = (rootServerURI: string, downloadUrl: string | null) => { - if (!downloadUrl) { - return null; - } - const isRelativeUrl = (x?: string) => x && x.startsWith('/'); - return isRelativeUrl(downloadUrl) ? 
rootServerURI + downloadUrl : downloadUrl; -}; - -const ComputeLogsPanelWithKey = memo((props: ComputeLogPanelWithKeyProps) => { - const {runId, computeLogFileKey, ioType, setComputeLogUrl} = props; - const {rootServerURI} = useContext(AppContext); - - const {isLoading, stdout, stderr} = useComputeLogs(runId, computeLogFileKey); - const stdoutDownloadUrl = resolveDownloadUrl(rootServerURI, stdout?.downloadUrl || null); - const stderrDownloadUrl = resolveDownloadUrl(rootServerURI, stderr?.downloadUrl || null); - - return ( -
- - -
- ); -}); - -const ContentWrapper = ({ - isLoading, - isVisible, - content, - path, - downloadUrl, - setComputeLogUrl, -}: { - isVisible: boolean; - isLoading: boolean; - content: string | null; - path?: string; - downloadUrl: string | null; - setComputeLogUrl: (url: string | null) => void; -}) => { - useEffect(() => { - setComputeLogUrl(downloadUrl); - }, [setComputeLogUrl, downloadUrl]); - return ( - - ); -}; diff --git a/js_modules/dagster-ui/packages/ui-core/src/runs/Run.tsx b/js_modules/dagster-ui/packages/ui-core/src/runs/Run.tsx index 95b58bde6f340..d545e36e101e2 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/runs/Run.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/runs/Run.tsx @@ -14,7 +14,6 @@ import {memo} from 'react'; import styled from 'styled-components'; import {CapturedOrExternalLogPanel} from './CapturedLogPanel'; -import {ComputeLogPanel} from './ComputeLogPanel'; import {LogFilter, LogsProvider, LogsProviderLogs} from './LogsProvider'; import {LogsScrollingTable} from './LogsScrollingTable'; import {LogType, LogsToolbar} from './LogsToolbar'; @@ -38,7 +37,6 @@ import {RunStatus} from '../graphql/types'; import {useDocumentTitle} from '../hooks/useDocumentTitle'; import {useFavicon} from '../hooks/useFavicon'; import {useQueryPersistedState} from '../hooks/useQueryPersistedState'; -import {useSupportsCapturedLogs} from '../instance/useSupportsCapturedLogs'; import {CompletionType, useTraceDependency} from '../performance/TraceContext'; interface RunProps { @@ -210,7 +208,6 @@ const RunWithData = ({ : []; }, [runtimeGraph, selectionQuery]); - const supportsCapturedLogs = useSupportsCapturedLogs(); const {logCaptureInfo, computeLogFileKey, setComputeLogFileKey} = useComputeLogFileKeyForSelection({ stepKeys, @@ -369,20 +366,13 @@ const RunWithData = ({ {logType !== LogType.structured ? ( !computeLogFileKey ? ( - ) : supportsCapturedLogs ? ( + ) : ( - ) : ( - ) ) : ( ( @@ -112,7 +110,6 @@ export const StepLogsModalContent = ({ metadata: IRunMetadataDict; logs: LogsProviderLogs; }) => { - const supportsCapturedLogs = useSupportsCapturedLogs(); const [logType, setComputeLogType] = useState(LogType.structured); const [computeLogUrl, setComputeLogUrl] = React.useState(null); @@ -166,21 +163,12 @@ export const StepLogsModalContent = ({ {logType !== LogType.structured ? ( - supportsCapturedLogs ? ( - - ) : ( - - ) + ) : ( ; -}>; - -export type ComputeLogsSubscription = { - __typename: 'Subscription'; - computeLogs: { - __typename: 'ComputeLogFile'; - path: string; - cursor: number; - data: string | null; - downloadUrl: string | null; - }; -}; - -export type ComputeLogForSubscriptionFragment = { - __typename: 'ComputeLogFile'; - path: string; - cursor: number; - data: string | null; - downloadUrl: string | null; -}; diff --git a/js_modules/dagster-ui/packages/ui-core/src/runs/useComputeLogs.tsx b/js_modules/dagster-ui/packages/ui-core/src/runs/useComputeLogs.tsx deleted file mode 100644 index 3f3d205549112..0000000000000 --- a/js_modules/dagster-ui/packages/ui-core/src/runs/useComputeLogs.tsx +++ /dev/null @@ -1,117 +0,0 @@ -import {gql, useSubscription} from '@apollo/client'; -import {useReducer} from 'react'; - -import { - ComputeLogForSubscriptionFragment, - ComputeLogsSubscription, - ComputeLogsSubscriptionVariables, -} from './types/useComputeLogs.types'; -import {ComputeIoType} from '../graphql/types'; - -const MAX_STREAMING_LOG_BYTES = 5242880; // 5 MB - -const slice = (s: string) => - s.length < MAX_STREAMING_LOG_BYTES ? 
s : s.slice(-MAX_STREAMING_LOG_BYTES); - -const merge = ( - a: ComputeLogForSubscriptionFragment | null, - b: ComputeLogForSubscriptionFragment | null, -): ComputeLogForSubscriptionFragment | null => { - if (!b) { - return a; - } - let data = a?.data; - if (a?.data && b?.data) { - data = slice(a.data + b.data); - } else if (b?.data) { - data = slice(b.data); - } - return { - __typename: b.__typename, - path: b.path, - downloadUrl: b.downloadUrl, - data: typeof data === 'string' ? data : null, - cursor: b.cursor, - }; -}; - -interface State { - stepKey: string; - stdout: ComputeLogForSubscriptionFragment | null; - stderr: ComputeLogForSubscriptionFragment | null; - isLoading: boolean; -} - -type Action = - | {type: 'stdout'; stepKey: string; log: ComputeLogForSubscriptionFragment | null} - | {type: 'stderr'; stepKey: string; log: ComputeLogForSubscriptionFragment | null}; - -const reducer = (state: State, action: Action): State => { - switch (action.type) { - case 'stdout': - const stdout = - action.stepKey === state.stepKey ? merge(state.stdout, action.log) : action.log; - return {...state, isLoading: false, stdout}; - case 'stderr': - const stderr = - action.stepKey === state.stepKey ? merge(state.stderr, action.log) : action.log; - return {...state, isLoading: false, stderr}; - default: - return state; - } -}; - -const initialState: State = { - stepKey: '', - stdout: null, - stderr: null, - isLoading: true, -}; - -export const useComputeLogs = (runId: string, stepKey: string) => { - const [state, dispatch] = useReducer(reducer, initialState); - - useSubscription( - COMPUTE_LOGS_SUBSCRIPTION, - { - fetchPolicy: 'no-cache', - variables: {runId, stepKey, ioType: ComputeIoType.STDOUT, cursor: null}, - onSubscriptionData: ({subscriptionData}) => { - dispatch({type: 'stdout', stepKey, log: subscriptionData.data?.computeLogs || null}); - }, - }, - ); - - useSubscription( - COMPUTE_LOGS_SUBSCRIPTION, - { - fetchPolicy: 'no-cache', - variables: {runId, stepKey, ioType: ComputeIoType.STDERR, cursor: null}, - onSubscriptionData: ({subscriptionData}) => { - dispatch({type: 'stderr', stepKey, log: subscriptionData.data?.computeLogs || null}); - }, - }, - ); - - return state; -}; - -const COMPUTE_LOGS_SUBSCRIPTION = gql` - subscription ComputeLogsSubscription( - $runId: ID! - $stepKey: String! - $ioType: ComputeIOType! 
- $cursor: String - ) { - computeLogs(runId: $runId, stepKey: $stepKey, ioType: $ioType, cursor: $cursor) { - ...ComputeLogForSubscription - } - } - - fragment ComputeLogForSubscription on ComputeLogFile { - path - cursor - data - downloadUrl - } -`; diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py index 3e3cf20a3591d..2a2758612796c 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py @@ -24,8 +24,7 @@ EngineEventData, ) from dagster._core.instance import DagsterInstance -from dagster._core.storage.captured_log_manager import CapturedLogManager -from dagster._core.storage.compute_log_manager import ComputeIOType, ComputeLogFileData +from dagster._core.storage.captured_log_manager import CapturedLogData, CapturedLogManager from dagster._core.storage.dagster_run import CANCELABLE_RUN_STATUSES from dagster._core.workspace.permissions import Permissions from dagster._utils.error import serializable_error_info_from_exc_info @@ -47,10 +46,7 @@ ) if TYPE_CHECKING: - from dagster_graphql.schema.logs.compute_logs import ( - GrapheneCapturedLogs, - GrapheneComputeLogFile, - ) + from dagster_graphql.schema.logs.compute_logs import GrapheneCapturedLogs from dagster_graphql.schema.pipelines.subscription import ( GraphenePipelineRunLogsSubscriptionFailure, GraphenePipelineRunLogsSubscriptionSuccess, @@ -311,40 +307,6 @@ def _enqueue(event, cursor): instance.end_watch_event_logs(run_id, _enqueue) -async def gen_compute_logs( - graphene_info: "ResolveInfo", - run_id: str, - step_key: str, - io_type: ComputeIOType, - cursor: Optional[str] = None, -) -> AsyncIterator[Optional["GrapheneComputeLogFile"]]: - from ...schema.logs.compute_logs import from_compute_log_file - - check.str_param(run_id, "run_id") - check.str_param(step_key, "step_key") - check.inst_param(io_type, "io_type", ComputeIOType) - check.opt_str_param(cursor, "cursor") - instance = graphene_info.context.instance - - obs = instance.compute_log_manager.observable(run_id, step_key, io_type, cursor) - - loop = asyncio.get_event_loop() - queue: asyncio.Queue[ComputeLogFileData] = asyncio.Queue() - - def _enqueue(new_event): - loop.call_soon_threadsafe(queue.put_nowait, new_event) - - obs(_enqueue) - is_complete = False - try: - while not is_complete: - update = await queue.get() - yield from_compute_log_file(update) - is_complete = obs.is_complete - finally: - obs.dispose() - - async def gen_captured_log_data( graphene_info: "ResolveInfo", log_key: Sequence[str], cursor: Optional[str] = None ) -> AsyncIterator["GrapheneCapturedLogs"]: @@ -359,7 +321,7 @@ async def gen_captured_log_data( subscription = compute_log_manager.subscribe(log_key, cursor) loop = asyncio.get_event_loop() - queue: asyncio.Queue[ComputeLogFileData] = asyncio.Queue() + queue: asyncio.Queue[CapturedLogData] = asyncio.Queue() def _enqueue(new_event): loop.call_soon_threadsafe(queue.put_nowait, new_event) @@ -369,7 +331,7 @@ def _enqueue(new_event): try: while not is_complete: update = await queue.get() - yield from_captured_log_data(update) # type: ignore + yield from_captured_log_data(update) is_complete = subscription.is_complete finally: subscription.dispose() diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py b/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py index 
6d3a6f77a7024..d180d36617fa6 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py @@ -18,8 +18,7 @@ from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill from dagster._core.instance import DagsterInstance from dagster._core.remote_representation.external import ExternalPartitionSet -from dagster._core.storage.captured_log_manager import CapturedLogManager -from dagster._core.storage.compute_log_manager import ComputeIOType +from dagster._core.storage.captured_log_manager import CapturedLogManager, ComputeIOType from dagster._core.storage.dagster_run import DagsterRun, RunPartitionData, RunRecord, RunsFilter from dagster._core.storage.tags import ( ASSET_PARTITION_RANGE_END_TAG, diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/instance.py b/python_modules/dagster-graphql/dagster_graphql/schema/instance.py index a4bc12acccc18..6266fd45772b5 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/instance.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/instance.py @@ -6,7 +6,6 @@ import yaml from dagster._core.instance import DagsterInstance from dagster._core.launcher.base import RunLauncher -from dagster._core.storage.captured_log_manager import CapturedLogManager from dagster._core.storage.event_log.sql_event_log import SqlEventLogStorage from dagster._daemon.asset_daemon import get_auto_materialize_paused from dagster._daemon.types import DaemonStatus @@ -234,7 +233,6 @@ class GrapheneInstance(graphene.ObjectType): executablePath = graphene.NonNull(graphene.String) daemonHealth = graphene.NonNull(GrapheneDaemonHealth) hasInfo = graphene.NonNull(graphene.Boolean) - hasCapturedLogManager = graphene.NonNull(graphene.Boolean) autoMaterializePaused = graphene.NonNull(graphene.Boolean) supportsConcurrencyLimits = graphene.NonNull(graphene.Boolean) minConcurrencyLimitValue = graphene.NonNull(graphene.Int) @@ -294,9 +292,6 @@ def resolve_executablePath(self, _graphene_info: ResolveInfo): def resolve_daemonHealth(self, _graphene_info: ResolveInfo): return GrapheneDaemonHealth(instance=self._instance) - def resolve_hasCapturedLogManager(self, _graphene_info: ResolveInfo): - return isinstance(self._instance.compute_log_manager, CapturedLogManager) - def resolve_autoMaterializePaused(self, _graphene_info: ResolveInfo): return get_auto_materialize_paused(self._instance) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/logs/__init__.py b/python_modules/dagster-graphql/dagster_graphql/schema/logs/__init__.py index ff3517a23cd89..4f3548bce4c8d 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/logs/__init__.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/logs/__init__.py @@ -1,5 +1,4 @@ def types(): - from .compute_logs import GrapheneComputeLogFile, GrapheneComputeLogs from .events import ( GrapheneAssetMaterializationPlannedEvent, GrapheneDisplayableEvent, @@ -50,8 +49,6 @@ def types(): from .log_level import GrapheneLogLevel return [ - GrapheneComputeLogFile, - GrapheneComputeLogs, GrapheneDisplayableEvent, GrapheneEngineEvent, GrapheneExecutionStepFailureEvent, diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/logs/compute_logs.py b/python_modules/dagster-graphql/dagster_graphql/schema/logs/compute_logs.py index abcef130835da..d5761862aaaee 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/logs/compute_logs.py +++ 
b/python_modules/dagster-graphql/dagster_graphql/schema/logs/compute_logs.py @@ -1,64 +1,7 @@ -import dagster._check as check import graphene from dagster._core.storage.captured_log_manager import CapturedLogData -from dagster._core.storage.compute_log_manager import ComputeIOType, ComputeLogFileData -from dagster_graphql.schema.util import ResolveInfo, non_null_list - - -class GrapheneComputeIOType(graphene.Enum): - STDOUT = "stdout" - STDERR = "stderr" - - class Meta: - name = "ComputeIOType" - - -class GrapheneComputeLogFile(graphene.ObjectType): - class Meta: - name = "ComputeLogFile" - - path = graphene.NonNull(graphene.String) - data = graphene.Field( - graphene.String, description="The data output captured from step computation at query time" - ) - cursor = graphene.NonNull(graphene.Int) - size = graphene.NonNull(graphene.Int) - download_url = graphene.Field(graphene.String) - - -def from_compute_log_file(file: ComputeLogFileData): - check.opt_inst_param(file, "file", ComputeLogFileData) - if not file: - return None - return GrapheneComputeLogFile( - path=file.path, - data=file.data, - cursor=file.cursor, - size=file.size, - download_url=file.download_url, - ) - - -class GrapheneComputeLogs(graphene.ObjectType): - runId = graphene.NonNull(graphene.String) - stepKey = graphene.NonNull(graphene.String) - stdout = graphene.Field(GrapheneComputeLogFile) - stderr = graphene.Field(GrapheneComputeLogFile) - - class Meta: - name = "ComputeLogs" - - def _resolve_compute_log(self, graphene_info: ResolveInfo, io_type): - return graphene_info.context.instance.compute_log_manager.read_logs_file( - self.runId, self.stepKey, io_type, 0 - ) - - def resolve_stdout(self, graphene_info: ResolveInfo): - return self._resolve_compute_log(graphene_info, ComputeIOType.STDOUT) - - def resolve_stderr(self, graphene_info: ResolveInfo): - return self._resolve_compute_log(graphene_info, ComputeIOType.STDERR) +from dagster_graphql.schema.util import non_null_list def from_captured_log_data(log_data: CapturedLogData): diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py b/python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py index ae52079519bdf..4bc888b0559e8 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py @@ -37,7 +37,7 @@ ) from ..errors import GrapheneDagsterTypeNotFoundError, GraphenePythonError, GrapheneRunNotFoundError from ..execution import GrapheneExecutionPlan -from ..logs.compute_logs import GrapheneCapturedLogs, GrapheneComputeLogs, from_captured_log_data +from ..logs.compute_logs import GrapheneCapturedLogs, from_captured_log_data from ..logs.events import ( GrapheneDagsterRunEvent, GrapheneMaterializationEvent, @@ -292,13 +292,6 @@ class GraphenePipelineRun(graphene.Interface): solidSelection = graphene.List(graphene.NonNull(graphene.String)) stats = graphene.NonNull(GrapheneRunStatsSnapshotOrError) stepStats = non_null_list(GrapheneRunStepStats) - computeLogs = graphene.Field( - graphene.NonNull(GrapheneComputeLogs), - stepKey=graphene.Argument(graphene.NonNull(graphene.String)), - description=""" - Compute logs are the stdout/stderr logs for a given solid step computation - """, - ) capturedLogs = graphene.Field( graphene.NonNull(GrapheneCapturedLogs), fileKey=graphene.Argument(graphene.NonNull(graphene.String)), @@ -342,13 +335,6 @@ class GrapheneRun(graphene.ObjectType): resolvedOpSelection = 
graphene.List(graphene.NonNull(graphene.String)) stats = graphene.NonNull(GrapheneRunStatsSnapshotOrError) stepStats = non_null_list(GrapheneRunStepStats) - computeLogs = graphene.Field( - graphene.NonNull(GrapheneComputeLogs), - stepKey=graphene.Argument(graphene.NonNull(graphene.String)), - description=""" - Compute logs are the stdout/stderr logs for a given solid step computation - """, - ) executionPlan = graphene.Field(GrapheneExecutionPlan) stepKeysToExecute = graphene.List(graphene.NonNull(graphene.String)) runConfigYaml = graphene.NonNull(graphene.String) @@ -468,9 +454,6 @@ def resolve_stats(self, graphene_info: ResolveInfo): def resolve_stepStats(self, graphene_info: ResolveInfo): return get_step_stats(graphene_info, self.run_id) - def resolve_computeLogs(self, _graphene_info: ResolveInfo, stepKey): - return GrapheneComputeLogs(runId=self.run_id, stepKey=stepKey) - def resolve_capturedLogs(self, graphene_info: ResolveInfo, fileKey): compute_log_manager = get_compute_log_manager(graphene_info) log_key = compute_log_manager.build_log_key_for_run(self.run_id, fileKey) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py b/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py index 33dc5d28f92a2..62d398938a7a5 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py @@ -1143,9 +1143,6 @@ def resolve_capturedLogs( cursor: Optional[str] = None, limit: Optional[int] = None, ) -> GrapheneCapturedLogs: - # Type-ignore because `get_log_data` returns a `ComputeLogManager` but in practice this is - # always also an instance of `CapturedLogManager`, which defines `get_log_data`. Probably - # `ComputeLogManager` should subclass `CapturedLogManager`. log_data = get_compute_log_manager(graphene_info).get_log_data( logKey, cursor=cursor, max_bytes=limit ) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/roots/subscription.py b/python_modules/dagster-graphql/dagster_graphql/schema/roots/subscription.py index b350b8ce74c36..144b1d70f2857 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/roots/subscription.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/roots/subscription.py @@ -1,9 +1,8 @@ import graphene -from dagster._core.storage.compute_log_manager import ComputeIOType -from ...implementation.execution import gen_captured_log_data, gen_compute_logs, gen_events_for_run +from ...implementation.execution import gen_captured_log_data, gen_events_for_run from ..external import GrapheneLocationStateChangeSubscription, gen_location_state_changes -from ..logs.compute_logs import GrapheneCapturedLogs, GrapheneComputeIOType, GrapheneComputeLogFile +from ..logs.compute_logs import GrapheneCapturedLogs from ..pipelines.subscription import GraphenePipelineRunLogsSubscriptionPayload from ..util import ResolveInfo, non_null_list @@ -27,18 +26,6 @@ class Meta: description="Retrieve real-time event logs after applying a filter on run id and cursor.", ) - computeLogs = graphene.Field( - graphene.NonNull(GrapheneComputeLogFile), - runId=graphene.Argument(graphene.NonNull(graphene.ID)), - stepKey=graphene.Argument(graphene.NonNull(graphene.String)), - ioType=graphene.Argument(graphene.NonNull(GrapheneComputeIOType)), - cursor=graphene.Argument(graphene.String), - description=( - "Retrieve real-time compute logs after applying a filter on run id, step name, log" - " type, and cursor." 
- ), - ) - capturedLogs = graphene.Field( graphene.NonNull(GrapheneCapturedLogs), logKey=graphene.Argument(non_null_list(graphene.String)), @@ -56,11 +43,6 @@ class Meta: def subscribe_pipelineRunLogs(self, graphene_info: ResolveInfo, runId, cursor=None): return gen_events_for_run(graphene_info, runId, cursor) - def subscribe_computeLogs( - self, graphene_info: ResolveInfo, runId, stepKey, ioType, cursor=None - ): - return gen_compute_logs(graphene_info, runId, stepKey, ComputeIOType(ioType.value), cursor) - def subscribe_capturedLogs(self, graphene_info: ResolveInfo, logKey, cursor=None): return gen_captured_log_data(graphene_info, logKey, cursor) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/util.py b/python_modules/dagster-graphql/dagster_graphql/schema/util.py index 73b3926f16a38..1a34c7b27a558 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/util.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/util.py @@ -15,9 +15,5 @@ def non_null_list(of_type): return graphene.NonNull(graphene.List(graphene.NonNull(of_type))) -# Type-ignore because `get_log_data` returns a `ComputeLogManager` but in practice this is -# always also an instance of `CapturedLogManager`, which defines the APIs that we access in -# dagster-graphql. Probably `ComputeLogManager` should subclass `CapturedLogManager`-- this is a -# temporary workaround to satisfy type-checking. def get_compute_log_manager(graphene_info: ResolveInfo) -> CapturedLogManager: - return cast(CapturedLogManager, graphene_info.context.instance.compute_log_manager) + return graphene_info.context.instance.compute_log_manager diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_compute_logs.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_compute_logs.py deleted file mode 100644 index 3cc89432b912c..0000000000000 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_compute_logs.py +++ /dev/null @@ -1,80 +0,0 @@ -from dagster._core.events import DagsterEventType -from dagster_graphql.test.utils import ( - execute_dagster_graphql, - execute_dagster_graphql_subscription, - infer_job_selector, -) - -from .graphql_context_test_suite import ExecutingGraphQLContextTestMatrix -from .utils import sync_execute_get_run_log_data - -COMPUTE_LOGS_QUERY = """ - query ComputeLogsQuery($runId: ID!, $stepKey: String!) { - pipelineRunOrError(runId: $runId) { - ... on PipelineRun { - runId - computeLogs(stepKey: $stepKey) { - stdout { - data - } - } - } - } - } -""" -COMPUTE_LOGS_SUBSCRIPTION = """ - subscription ComputeLogsSubscription($runId: ID!, $stepKey: String!, $ioType: ComputeIOType!, $cursor: String!) 
{ - computeLogs(runId: $runId, stepKey: $stepKey, ioType: $ioType, cursor: $cursor) { - data - } - } -""" - - -class TestComputeLogs(ExecutingGraphQLContextTestMatrix): - def test_get_compute_logs_over_graphql(self, graphql_context, snapshot): - selector = infer_job_selector(graphql_context, "spew_job") - payload = sync_execute_get_run_log_data( - context=graphql_context, - variables={"executionParams": {"selector": selector, "mode": "default"}}, - ) - run_id = payload["run"]["runId"] - logs = graphql_context.instance.all_logs(run_id, of_type=DagsterEventType.LOGS_CAPTURED) - assert len(logs) == 1 - entry = logs[0] - file_key = entry.dagster_event.logs_captured_data.file_key - result = execute_dagster_graphql( - graphql_context, - COMPUTE_LOGS_QUERY, - variables={"runId": run_id, "stepKey": file_key}, - ) - compute_logs = result.data["pipelineRunOrError"]["computeLogs"] - snapshot.assert_match(compute_logs) - - def test_compute_logs_subscription_graphql(self, graphql_context, snapshot): - selector = infer_job_selector(graphql_context, "spew_job") - payload = sync_execute_get_run_log_data( - context=graphql_context, - variables={"executionParams": {"selector": selector, "mode": "default"}}, - ) - run_id = payload["run"]["runId"] - logs = graphql_context.instance.all_logs(run_id, of_type=DagsterEventType.LOGS_CAPTURED) - assert len(logs) == 1 - entry = logs[0] - file_key = entry.dagster_event.logs_captured_data.file_key - - results = execute_dagster_graphql_subscription( - graphql_context, - COMPUTE_LOGS_SUBSCRIPTION, - variables={ - "runId": run_id, - "stepKey": file_key, - "ioType": "STDOUT", - "cursor": "0", - }, - ) - - assert len(results) == 1 - result = results[0] - assert result.data["computeLogs"]["data"] == "HELLO WORLD\n" - snapshot.assert_match([result.data]) diff --git a/python_modules/dagster-webserver/dagster_webserver/webserver.py b/python_modules/dagster-webserver/dagster_webserver/webserver.py index 470e8b1d982c5..876bdc226a364 100644 --- a/python_modules/dagster-webserver/dagster_webserver/webserver.py +++ b/python_modules/dagster-webserver/dagster_webserver/webserver.py @@ -9,8 +9,8 @@ from dagster import __version__ as dagster_version from dagster._annotations import deprecated from dagster._core.debug import DebugRunPayload +from dagster._core.storage.captured_log_manager import ComputeIOType from dagster._core.storage.cloud_storage_compute_log_manager import CloudStorageComputeLogManager -from dagster._core.storage.compute_log_manager import ComputeIOType from dagster._core.storage.local_compute_log_manager import LocalComputeLogManager from dagster._core.storage.runs.sql_run_storage import SqlRunStorage from dagster._core.workspace.context import BaseWorkspaceRequestContext, IWorkspaceProcessContext @@ -154,30 +154,6 @@ async def download_notebook(self, request: Request): (body, resources) = html_exporter.from_notebook_node(notebook) return HTMLResponse("" + body) - async def download_compute_logs_endpoint(self, request: Request): - run_id = request.path_params["run_id"] - step_key = request.path_params["step_key"] - file_type = request.path_params["file_type"] - context = self.make_request_context(request) - - file = context.instance.compute_log_manager.get_local_path( - run_id, - step_key, - ComputeIOType(file_type), - ) - - if not path.exists(file): - raise HTTPException(404, detail="No log files available for download") - - return FileResponse( - context.instance.compute_log_manager.get_local_path( - run_id, - step_key, - ComputeIOType(file_type), - ), - 
filename=f"{run_id}_{step_key}.{file_type}", - ) - async def download_captured_logs_endpoint(self, request: Request): [*log_key, file_extension] = request.path_params["path"].split("/") context = self.make_request_context(request) @@ -325,10 +301,6 @@ def build_routes(self): + self.build_static_routes() + [ # download file endpoints - Route( - "/download/{run_id:str}/{step_key:str}/{file_type:str}", - self.download_compute_logs_endpoint, - ), Route( "/logs/{path:path}", self.download_captured_logs_endpoint, diff --git a/python_modules/dagster-webserver/dagster_webserver_tests/test_subscriptions.py b/python_modules/dagster-webserver/dagster_webserver_tests/test_subscriptions.py index 720803a79683c..f2199b80c3d53 100644 --- a/python_modules/dagster-webserver/dagster_webserver_tests/test_subscriptions.py +++ b/python_modules/dagster-webserver/dagster_webserver_tests/test_subscriptions.py @@ -23,16 +23,14 @@ } """ -COMPUTE_LOG_SUBSCRIPTION = """ - subscription ComputeLogsSubscription( - $runId: ID! - $stepKey: String! - $ioType: ComputeIOType! - ) { - computeLogs(runId: $runId, stepKey: $stepKey, ioType: $ioType) { - __typename - } +CAPTURED_LOGS_SUBSCRIPTION = """ + subscription CapturedLogsSubscription($logKey: [String!]!) { + capturedLogs(logKey: $logKey) { + stdout + stderr + cursor } + } """ @@ -158,28 +156,26 @@ def test_event_log_subscription_chunked(asgi_client, run_id): @mock.patch( - "dagster._core.storage.local_compute_log_manager.LocalComputeLogManager.is_watch_completed" + "dagster._core.storage.local_compute_log_manager.LocalComputeLogManager.is_capture_complete" ) -def test_compute_log_subscription(mock_watch_completed, asgi_client, run_id): - mock_watch_completed.return_value = False +def test_captured_log_subscription(mock_capture_completed, asgi_client, run_id): + mock_capture_completed.return_value = False with asgi_client.websocket_connect("/graphql", GraphQLWS.PROTOCOL) as ws: start_connection(ws) start_subscription( ws, - COMPUTE_LOG_SUBSCRIPTION, + CAPTURED_LOGS_SUBSCRIPTION, { - "runId": run_id, - "stepKey": "example_op", - "ioType": "STDERR", + "logKey": [run_id, "compute_logs", "example_op"], }, ) rx = ws.receive_json() assert rx["type"] != GraphQLWS.ERROR, rx gc.collect() - assert len(objgraph.by_type("ComputeLogSubscription")) == 1 + assert len(objgraph.by_type("CapturedLogSubscription")) == 1 end_subscription(ws) gc.collect() - assert len(objgraph.by_type("ComputeLogSubscription")) == 0 + assert len(objgraph.by_type("CapturedLogSubscription")) == 0 diff --git a/python_modules/dagster-webserver/dagster_webserver_tests/webserver/test_app.py b/python_modules/dagster-webserver/dagster_webserver_tests/webserver/test_app.py index 97b716b1d0db5..53f73b140cc9b 100644 --- a/python_modules/dagster-webserver/dagster_webserver_tests/webserver/test_app.py +++ b/python_modules/dagster-webserver/dagster_webserver_tests/webserver/test_app.py @@ -272,11 +272,11 @@ def test_download_compute(instance, test_client: TestClient): logs = instance.all_logs(run_id, of_type=DagsterEventType.LOGS_CAPTURED) entry = logs[0] file_key = entry.dagster_event.logs_captured_data.file_key - response = test_client.get(f"/download/{run_id}/{file_key}/stdout") + response = test_client.get(f"/logs/{run_id}/compute_logs/{file_key}/out") assert response.status_code == 200 assert "STDOUT RULEZ" in str(response.content) - response = test_client.get(f"/download/{run_id}/jonx/stdout") + response = test_client.get(f"/logs/{run_id}/compute_logs/jonx/stdout") assert response.status_code == 404 diff --git 
a/python_modules/dagster/dagster/_core/definitions/instigation_logger.py b/python_modules/dagster/dagster/_core/definitions/instigation_logger.py index 7ce539e6f7df4..6b4996aeb7c21 100644 --- a/python_modules/dagster/dagster/_core/definitions/instigation_logger.py +++ b/python_modules/dagster/dagster/_core/definitions/instigation_logger.py @@ -8,8 +8,7 @@ from dagster import _seven from dagster._core.instance import DagsterInstance from dagster._core.log_manager import LOG_RECORD_METADATA_ATTR -from dagster._core.storage.captured_log_manager import CapturedLogManager -from dagster._core.storage.compute_log_manager import ComputeIOType +from dagster._core.storage.captured_log_manager import CapturedLogManager, ComputeIOType from dagster._core.utils import coerce_valid_log_level from dagster._utils.log import create_console_logger diff --git a/python_modules/dagster/dagster/_core/execution/plan/execute_plan.py b/python_modules/dagster/dagster/_core/execution/plan/execute_plan.py index b7e08e4de6521..d6ae3d168d91f 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/execute_plan.py +++ b/python_modules/dagster/dagster/_core/execution/plan/execute_plan.py @@ -25,7 +25,6 @@ step_failure_event_from_exc_info, ) from dagster._core.execution.plan.plan import ExecutionPlan -from dagster._core.storage.captured_log_manager import CapturedLogManager from dagster._utils.error import SerializableErrorInfo, serializable_error_info_from_exc_info @@ -43,17 +42,14 @@ def inner_plan_execution_iterator( instance_concurrency_context=instance_concurrency_context, ) as active_execution: with ExitStack() as capture_stack: - # begin capturing logs for the whole process if this is a captured log manager - if isinstance(compute_log_manager, CapturedLogManager): - file_key = create_compute_log_file_key() - log_key = compute_log_manager.build_log_key_for_run(job_context.run_id, file_key) - try: - log_context = capture_stack.enter_context( - compute_log_manager.capture_logs(log_key) - ) - yield DagsterEvent.capture_logs(job_context, step_keys, log_key, log_context) - except Exception: - yield from _handle_compute_log_setup_error(job_context, sys.exc_info()) + # begin capturing logs for the whole process + file_key = create_compute_log_file_key() + log_key = compute_log_manager.build_log_key_for_run(job_context.run_id, file_key) + try: + log_context = capture_stack.enter_context(compute_log_manager.capture_logs(log_key)) + yield DagsterEvent.capture_logs(job_context, step_keys, log_key, log_context) + except Exception: + yield from _handle_compute_log_setup_error(job_context, sys.exc_info()) # It would be good to implement a reference tracking algorithm here to # garbage collect results that are no longer needed by any steps @@ -86,47 +82,14 @@ def inner_plan_execution_iterator( ), ) - with ExitStack() as step_stack: - if not isinstance(compute_log_manager, CapturedLogManager): - # capture all of the logs for individual steps - try: - step_stack.enter_context( - job_context.instance.compute_log_manager.watch( - step_context.dagster_run, step_context.step.key - ) - ) - yield DagsterEvent.legacy_compute_log_step_event(step_context) - except Exception: - yield from _handle_compute_log_setup_error(step_context, sys.exc_info()) - - for step_event in check.generator( - dagster_event_sequence_for_step(step_context) - ): - dagster_event = check.inst(step_event, DagsterEvent) - step_event_list.append(dagster_event) - yield dagster_event - active_execution.handle_event(dagster_event) - - 
active_execution.verify_complete(job_context, step.key) - - try: - step_stack.close() - except Exception: - yield from _handle_compute_log_teardown_error( - step_context, sys.exc_info() - ) - else: - # we have already set up the log capture at the process level, just handle the - # step events - for step_event in check.generator( - dagster_event_sequence_for_step(step_context) - ): - dagster_event = check.inst(step_event, DagsterEvent) - step_event_list.append(dagster_event) - yield dagster_event - active_execution.handle_event(dagster_event) - - active_execution.verify_complete(job_context, step.key) + # we have already set up the log capture at the process level, just handle the step events + for step_event in check.generator(dagster_event_sequence_for_step(step_context)): + dagster_event = check.inst(step_event, DagsterEvent) + step_event_list.append(dagster_event) + yield dagster_event + active_execution.handle_event(dagster_event) + + active_execution.verify_complete(job_context, step.key) # process skips from failures or uncovered inputs for event in active_execution.plan_events_iterator(job_context): diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py index 126e07e526f4b..273fc29c01336 100644 --- a/python_modules/dagster/dagster/_core/instance/__init__.py +++ b/python_modules/dagster/dagster/_core/instance/__init__.py @@ -74,7 +74,7 @@ from dagster._utils import PrintFn, is_uuid, traced from dagster._utils.error import serializable_error_info_from_exc_info from dagster._utils.merger import merge_dicts -from dagster._utils.warnings import deprecation_warning, experimental_warning +from dagster._utils.warnings import experimental_warning from .config import ( DAGSTER_CONFIG_YAML_FILENAME, @@ -156,7 +156,7 @@ AssetCheckExecutionRecord, AssetCheckInstanceSupport, ) - from dagster._core.storage.compute_log_manager import ComputeLogManager + from dagster._core.storage.captured_log_manager import CapturedLogManager from dagster._core.storage.daemon_cursor import DaemonCursorStorage from dagster._core.storage.event_log import EventLogStorage from dagster._core.storage.event_log.base import ( @@ -366,7 +366,7 @@ class DagsterInstance(DynamicPartitionsStore): pipeline runs. By default, this will be a :py:class:`dagster._core.storage.event_log.SqliteEventLogStorage`. Configurable in ``dagster.yaml`` using the :py:class:`~dagster.serdes.ConfigurableClass` machinery. - compute_log_manager (Optional[ComputeLogManager]): The compute log manager handles stdout + compute_log_manager (Optional[CapturedLogManager]): The compute log manager handles stdout and stderr logging for op compute functions. By default, this will be a :py:class:`dagster._core.storage.local_compute_log_manager.LocalComputeLogManager`. 
Configurable in ``dagster.yaml`` using the @@ -396,7 +396,7 @@ def __init__( run_storage: "RunStorage", event_storage: "EventLogStorage", run_coordinator: Optional["RunCoordinator"], - compute_log_manager: Optional["ComputeLogManager"], + compute_log_manager: Optional["CapturedLogManager"], run_launcher: Optional["RunLauncher"], scheduler: Optional["Scheduler"] = None, schedule_storage: Optional["ScheduleStorage"] = None, @@ -410,7 +410,6 @@ def __init__( from dagster._core.scheduler import Scheduler from dagster._core.secrets import SecretsLoader from dagster._core.storage.captured_log_manager import CapturedLogManager - from dagster._core.storage.compute_log_manager import ComputeLogManager from dagster._core.storage.event_log import EventLogStorage from dagster._core.storage.root import LocalArtifactStorage from dagster._core.storage.runs import RunStorage @@ -428,14 +427,8 @@ def __init__( if compute_log_manager: self._compute_log_manager = check.inst_param( - compute_log_manager, "compute_log_manager", ComputeLogManager + compute_log_manager, "compute_log_manager", CapturedLogManager ) - if not isinstance(self._compute_log_manager, CapturedLogManager): - deprecation_warning( - "ComputeLogManager", - "1.2.0", - "Implement the CapturedLogManager interface instead.", - ) self._compute_log_manager.register_instance(self) else: check.invariant( @@ -811,7 +804,7 @@ def run_launcher(self) -> "RunLauncher": # compute logs @property - def compute_log_manager(self) -> "ComputeLogManager": + def compute_log_manager(self) -> "CapturedLogManager": if not self._compute_log_manager: check.invariant( self._ref, "Compute log manager not provided, and no instance ref available" @@ -820,7 +813,7 @@ def compute_log_manager(self) -> "ComputeLogManager": check.invariant( compute_log_manager, "Compute log manager not configured in instance ref" ) - self._compute_log_manager = cast("ComputeLogManager", compute_log_manager) + self._compute_log_manager = cast("CapturedLogManager", compute_log_manager) self._compute_log_manager.register_instance(self) return self._compute_log_manager diff --git a/python_modules/dagster/dagster/_core/instance/ref.py b/python_modules/dagster/dagster/_core/instance/ref.py index 5ab4ef080dd79..a0b61166f7dca 100644 --- a/python_modules/dagster/dagster/_core/instance/ref.py +++ b/python_modules/dagster/dagster/_core/instance/ref.py @@ -15,7 +15,7 @@ from dagster._core.scheduler.scheduler import Scheduler from dagster._core.secrets.loader import SecretsLoader from dagster._core.storage.base_storage import DagsterStorage - from dagster._core.storage.compute_log_manager import ComputeLogManager + from dagster._core.storage.captured_log_manager import CapturedLogManager from dagster._core.storage.event_log.base import EventLogStorage from dagster._core.storage.root import LocalArtifactStorage from dagster._core.storage.runs.base import RunStorage @@ -527,10 +527,10 @@ def schedule_storage(self) -> Optional["ScheduleStorage"]: ) @property - def compute_log_manager(self) -> "ComputeLogManager": - from dagster._core.storage.compute_log_manager import ComputeLogManager + def compute_log_manager(self) -> "CapturedLogManager": + from dagster._core.storage.captured_log_manager import CapturedLogManager - return self.compute_logs_data.rehydrate(as_type=ComputeLogManager) + return self.compute_logs_data.rehydrate(as_type=CapturedLogManager) @property def scheduler(self) -> Optional["Scheduler"]: diff --git a/python_modules/dagster/dagster/_core/storage/captured_log_manager.py 
b/python_modules/dagster/dagster/_core/storage/captured_log_manager.py index 4ca25c245d12a..919de602c844d 100644 --- a/python_modules/dagster/dagster/_core/storage/captured_log_manager.py +++ b/python_modules/dagster/dagster/_core/storage/captured_log_manager.py @@ -1,17 +1,23 @@ import os from abc import ABC, abstractmethod from contextlib import contextmanager +from enum import Enum from typing import IO, Callable, Generator, Iterator, NamedTuple, Optional, Sequence, Tuple from typing_extensions import Final, Self import dagster._check as check from dagster._core.captured_log_api import LogLineCursor -from dagster._core.storage.compute_log_manager import ComputeIOType +from dagster._core.instance import MayHaveInstanceWeakref, T_DagsterInstance MAX_BYTES_CHUNK_READ: Final = 4194304 # 4 MB +class ComputeIOType(Enum): + STDOUT = "stdout" + STDERR = "stderr" + + class CapturedLogContext( NamedTuple( "_CapturedLogContext", @@ -156,7 +162,7 @@ def _has_max_data(chunk: Optional[bytes]) -> bool: return chunk and len(chunk) >= MAX_BYTES_CHUNK_READ # type: ignore -class CapturedLogManager(ABC): +class CapturedLogManager(ABC, MayHaveInstanceWeakref[T_DagsterInstance]): """Abstract base class for capturing the unstructured logs (stdout/stderr) in the current process, stored / retrieved with a provided log_key. """ @@ -248,7 +254,6 @@ def subscribe( ComputeLogSubscription """ - @abstractmethod def unsubscribe(self, subscription: CapturedLogSubscription) -> None: """Deregisters an observable object from receiving log updates. @@ -256,6 +261,10 @@ def unsubscribe(self, subscription: CapturedLogSubscription) -> None: subscription (CapturedLogSubscription): subscription object which manages when to send back data to the subscriber """ + pass + + def dispose(self): + pass def build_log_key_for_run(self, run_id: str, step_key: str) -> Sequence[str]: """Legacy adapter to translate run_id/key to captured log manager-based log_key.""" diff --git a/python_modules/dagster/dagster/_core/storage/cloud_storage_compute_log_manager.py b/python_modules/dagster/dagster/_core/storage/cloud_storage_compute_log_manager.py index 295e28f77c24f..9bfe10a9a98ec 100644 --- a/python_modules/dagster/dagster/_core/storage/cloud_storage_compute_log_manager.py +++ b/python_modules/dagster/dagster/_core/storage/cloud_storage_compute_log_manager.py @@ -5,25 +5,15 @@ from abc import abstractmethod from collections import defaultdict from contextlib import contextmanager -from typing import IO, Iterator, Optional, Sequence, Union +from typing import IO, Iterator, Optional, Sequence -from typing_extensions import TypeAlias - -from dagster import _check as check -from dagster._core.instance import T_DagsterInstance from dagster._core.storage.captured_log_manager import ( CapturedLogContext, CapturedLogData, CapturedLogManager, CapturedLogMetadata, CapturedLogSubscription, -) -from dagster._core.storage.compute_log_manager import ( - MAX_BYTES_FILE_READ, ComputeIOType, - ComputeLogFileData, - ComputeLogManager, - ComputeLogSubscription, ) from dagster._core.storage.local_compute_log_manager import ( IO_TYPE_EXTENSION, @@ -32,10 +22,8 @@ SUBSCRIPTION_POLLING_INTERVAL = 5 -LogSubscription: TypeAlias = Union[CapturedLogSubscription, ComputeLogSubscription] - -class CloudStorageComputeLogManager(CapturedLogManager, ComputeLogManager[T_DagsterInstance]): +class CloudStorageComputeLogManager(CapturedLogManager): """Abstract class that uses the local compute log manager to capture logs and stores them in remote cloud storage. 
""" @@ -168,12 +156,7 @@ def on_progress(self, log_key): def subscribe( self, log_key: Sequence[str], cursor: Optional[str] = None ) -> CapturedLogSubscription: - subscription = CapturedLogSubscription(self, log_key, cursor) - self.on_subscribe(subscription) # type: ignore - return subscription - - def unsubscribe(self, subscription): - self.on_unsubscribe(subscription) + return CapturedLogSubscription(self, log_key, cursor) def has_local_file(self, log_key: Sequence[str], io_type: ComputeIOType): local_path = self.local_manager.get_captured_local_path(log_key, IO_TYPE_EXTENSION[io_type]) @@ -201,91 +184,9 @@ def _poll_for_local_upload(self, log_key: Sequence[str]) -> Iterator[None]: yield thread_exit.set() - ############################################### - # - # Methods for the ComputeLogManager interface - # - ############################################### - @contextmanager - def _watch_logs(self, dagster_run, step_key=None): - # proxy watching to the local compute log manager, interacting with the filesystem - log_key = self.local_manager.build_log_key_for_run( - dagster_run.run_id, step_key or dagster_run.job_name - ) - with self.local_manager.capture_logs(log_key): - yield - self.upload_to_cloud_storage(log_key, ComputeIOType.STDOUT) - self.upload_to_cloud_storage(log_key, ComputeIOType.STDERR) - - def get_local_path(self, run_id, key, io_type): - return self.local_manager.get_local_path(run_id, key, io_type) - - def on_watch_start(self, dagster_run, step_key): - self.local_manager.on_watch_start(dagster_run, step_key) - - def on_watch_finish(self, dagster_run, step_key): - self.local_manager.on_watch_finish(dagster_run, step_key) - - def is_watch_completed(self, run_id, key): - return self.local_manager.is_watch_completed(run_id, key) or self.cloud_storage_has_logs( - self.local_manager.build_log_key_for_run(run_id, key), ComputeIOType.STDERR - ) - - def download_url(self, run_id, key, io_type): - if not self.is_watch_completed(run_id, key): - return None - - log_key = self.local_manager.build_log_key_for_run(run_id, key) - return self.download_url_for_type(log_key, io_type) - - def read_logs_file(self, run_id, key, io_type, cursor=0, max_bytes=MAX_BYTES_FILE_READ): - log_key = self.local_manager.build_log_key_for_run(run_id, key) - - if self.has_local_file(log_key, io_type): - data = self.local_manager.read_logs_file(run_id, key, io_type, cursor, max_bytes) - return self._from_local_file_data(run_id, key, io_type, data) - elif self.cloud_storage_has_logs(log_key, io_type): - self.download_from_cloud_storage(log_key, io_type) - data = self.local_manager.read_logs_file(run_id, key, io_type, cursor, max_bytes) - return self._from_local_file_data(run_id, key, io_type, data) - elif self.cloud_storage_has_logs(log_key, io_type, partial=True): - self.download_from_cloud_storage(log_key, io_type, partial=True) - partial_path = self.local_manager.get_captured_local_path( - log_key, IO_TYPE_EXTENSION[io_type], partial=True - ) - captured_data, new_cursor = self.local_manager.read_path( - partial_path, offset=cursor or 0 - ) - return ComputeLogFileData( - path=partial_path, - data=captured_data.decode("utf-8") if captured_data else None, - cursor=new_cursor or 0, - size=len(captured_data) if captured_data else 0, - download_url=None, - ) - local_path = self.local_manager.get_captured_local_path(log_key, IO_TYPE_EXTENSION[io_type]) - return ComputeLogFileData(path=local_path, data=None, cursor=0, size=0, download_url=None) - - def on_subscribe(self, subscription): - pass - - def 
on_unsubscribe(self, subscription): - pass - def dispose(self): self.local_manager.dispose() - def _from_local_file_data(self, run_id, key, io_type, local_file_data): - log_key = self.local_manager.build_log_key_for_run(run_id, key) - - return ComputeLogFileData( - self.display_path_for_type(log_key, io_type), - local_file_data.data, - local_file_data.cursor, - local_file_data.size, - self.download_url_for_type(log_key, io_type), - ) - class PollingComputeLogSubscriptionManager: def __init__(self, manager): @@ -294,13 +195,7 @@ def __init__(self, manager): self._shutdown_event = None self._polling_thread = None - def _log_key(self, subscription: LogSubscription) -> Sequence[str]: - check.inst_param( - subscription, "subscription", (ComputeLogSubscription, CapturedLogSubscription) - ) - - if isinstance(subscription, ComputeLogSubscription): - return self._manager.build_log_key_for_run(subscription.run_id, subscription.key) + def _log_key(self, subscription: CapturedLogSubscription) -> Sequence[str]: return subscription.log_key def _watch_key(self, log_key: Sequence[str]) -> str: @@ -328,11 +223,7 @@ def _stop_polling_thread(self) -> None: self._polling_thread = None self._shutdown_event = None - def add_subscription(self, subscription: LogSubscription) -> None: - check.inst_param( - subscription, "subscription", (ComputeLogSubscription, CapturedLogSubscription) - ) - + def add_subscription(self, subscription: CapturedLogSubscription) -> None: if not self._polling_thread: self._start_polling_thread() @@ -344,19 +235,10 @@ def add_subscription(self, subscription: LogSubscription) -> None: watch_key = self._watch_key(log_key) self._subscriptions[watch_key].append(subscription) - def is_complete(self, subscription: LogSubscription) -> bool: - check.inst_param( - subscription, "subscription", (ComputeLogSubscription, CapturedLogSubscription) - ) - - if isinstance(subscription, ComputeLogSubscription): - return self._manager.is_watch_completed(subscription.run_id, subscription.key) + def is_complete(self, subscription: CapturedLogSubscription) -> bool: return self._manager.is_capture_complete(subscription.log_key) - def remove_subscription(self, subscription: LogSubscription) -> None: - check.inst_param( - subscription, "subscription", (ComputeLogSubscription, CapturedLogSubscription) - ) + def remove_subscription(self, subscription: CapturedLogSubscription) -> None: log_key = self._log_key(subscription) watch_key = self._watch_key(log_key) diff --git a/python_modules/dagster/dagster/_core/storage/compute_log_manager.py b/python_modules/dagster/dagster/_core/storage/compute_log_manager.py deleted file mode 100644 index 781ec803e3fa7..0000000000000 --- a/python_modules/dagster/dagster/_core/storage/compute_log_manager.py +++ /dev/null @@ -1,278 +0,0 @@ -from abc import ABC, abstractmethod -from contextlib import contextmanager -from enum import Enum -from typing import Callable, Iterator, NamedTuple, Optional - -from typing_extensions import Self - -import dagster._check as check -from dagster._core.instance import MayHaveInstanceWeakref, T_DagsterInstance -from dagster._core.storage.dagster_run import DagsterRun - -MAX_BYTES_FILE_READ = 33554432 # 32 MB -MAX_BYTES_CHUNK_READ = 4194304 # 4 MB - - -class ComputeIOType(Enum): - STDOUT = "stdout" - STDERR = "stderr" - - -class ComputeLogFileData( - NamedTuple( - "ComputeLogFileData", - [ - ("path", str), - ("data", Optional[str]), - ("cursor", int), - ("size", int), - ("download_url", Optional[str]), - ], - ) -): - """Representation of a chunk of 
compute execution log data.""" - - def __new__( - cls, path: str, data: Optional[str], cursor: int, size: int, download_url: Optional[str] - ): - return super(ComputeLogFileData, cls).__new__( - cls, - path=check.str_param(path, "path"), - data=check.opt_str_param(data, "data"), - cursor=check.int_param(cursor, "cursor"), - size=check.int_param(size, "size"), - download_url=check.opt_str_param(download_url, "download_url"), - ) - - -class ComputeLogManager(ABC, MayHaveInstanceWeakref[T_DagsterInstance]): - """Abstract base class for storing unstructured compute logs (stdout/stderr) from the compute - steps of pipeline solids. - """ - - @contextmanager - def watch(self, dagster_run: DagsterRun, step_key: Optional[str] = None) -> Iterator[None]: - """Watch the stdout/stderr for a given execution for a given run_id / step_key and persist it. - - Args: - dagster_run (DagsterRun): The run config - step_key (Optional[String]): The step_key for a compute step - """ - check.inst_param(dagster_run, "dagster_run", DagsterRun) - check.opt_str_param(step_key, "step_key") - - if not self.enabled(dagster_run, step_key): - yield - return - - self.on_watch_start(dagster_run, step_key) - with self._watch_logs(dagster_run, step_key): - yield - self.on_watch_finish(dagster_run, step_key) - - @contextmanager - @abstractmethod - def _watch_logs( - self, dagster_run: DagsterRun, step_key: Optional[str] = None - ) -> Iterator[None]: - """Method to watch the stdout/stderr logs for a given run_id / step_key. Kept separate from - blessed `watch` method, which triggers all the start/finish hooks that are necessary to - implement the different remote implementations. - - Args: - dagster_run (DagsterRun): The run config - step_key (Optional[String]): The step_key for a compute step - """ - - @abstractmethod - def get_local_path(self, run_id: str, key: str, io_type: ComputeIOType) -> str: - """Get the local path of the logfile for a given execution step. This determines the - location on the local filesystem to which stdout/stderr will be rerouted. - - Args: - run_id (str): The id of the pipeline run. - key (str): The unique descriptor of the execution step (e.g. `solid_invocation.compute`) - io_type (ComputeIOType): Flag indicating the I/O type, either ComputeIOType.STDOUT or - ComputeIOType.STDERR - - Returns: - str - """ - ... - - @abstractmethod - def is_watch_completed(self, run_id: str, key: str) -> bool: - """Flag indicating when computation for a given execution step has completed. - - Args: - run_id (str): The id of the pipeline run. - key (str): The unique descriptor of the execution step (e.g. `solid_invocation.compute`) - - Returns: - Boolean - """ - - @abstractmethod - def on_watch_start(self, dagster_run: DagsterRun, step_key: Optional[str]) -> None: - """Hook called when starting to watch compute logs. - - Args: - pipeline_run (PipelineRun): The pipeline run config - step_key (Optional[String]): The step_key for a compute step - """ - - @abstractmethod - def on_watch_finish(self, dagster_run: DagsterRun, step_key: Optional[str]) -> None: - """Hook called when computation for a given execution step is finished. - - Args: - pipeline_run (PipelineRun): The pipeline run config - step_key (Optional[String]): The step_key for a compute step - """ - - @abstractmethod - def download_url(self, run_id: str, key: str, io_type: ComputeIOType) -> str: - """Get a URL where the logs can be downloaded. - - Args: - run_id (str): The id of the pipeline run. - key (str): The unique descriptor of the execution step (e.g. 
`solid_invocation.compute`) - io_type (ComputeIOType): Flag indicating the I/O type, either stdout or stderr - - Returns: - String - """ - - @abstractmethod - def read_logs_file( - self, - run_id: str, - key: str, - io_type: ComputeIOType, - cursor: int = 0, - max_bytes: int = MAX_BYTES_FILE_READ, - ) -> ComputeLogFileData: - """Get compute log data for a given compute step. - - Args: - run_id (str): The id of the pipeline run. - key (str): The unique descriptor of the execution step (e.g. `solid_invocation.compute`) - io_type (ComputeIOType): Flag indicating the I/O type, either stdout or stderr - cursor (Optional[Int]): Starting cursor (byte) of log file - max_bytes (Optional[Int]): Maximum number of bytes to be read and returned - - Returns: - ComputeLogFileData - """ - - def enabled(self, _dagster_run: DagsterRun, _step_key: Optional[str]) -> bool: - """Hook for disabling compute log capture. - - Args: - _step_key (Optional[String]): The step_key for a compute step - - Returns: - Boolean - """ - return True - - @abstractmethod - def on_subscribe(self, subscription: "ComputeLogSubscription") -> None: - """Hook for managing streaming subscriptions for log data from `dagster-webserver`. - - Args: - subscription (ComputeLogSubscription): subscription object which manages when to send - back data to the subscriber - """ - - def on_unsubscribe(self, subscription: "ComputeLogSubscription") -> None: - pass - - def observable( - self, run_id: str, key: str, io_type: ComputeIOType, cursor: Optional[str] = None - ) -> "ComputeLogSubscription": - """Return a ComputeLogSubscription which streams back log data from the execution logs for a given - compute step. - - Args: - run_id (str): The id of the pipeline run. - key (str): The unique descriptor of the execution step (e.g. `solid_invocation.compute`) - io_type (ComputeIOType): Flag indicating the I/O type, either stdout or stderr - cursor (Optional[Int]): Starting cursor (byte) of log file - - Returns: - Observable - """ - check.str_param(run_id, "run_id") - check.str_param(key, "key") - check.inst_param(io_type, "io_type", ComputeIOType) - check.opt_str_param(cursor, "cursor") - - if cursor: - cursor = int(cursor) # type: ignore # (var reassigned diff type) - else: - cursor = 0 # type: ignore # (var reassigned diff type) - - subscription = ComputeLogSubscription(self, run_id, key, io_type, cursor) # type: ignore # (var reassigned diff type) - self.on_subscribe(subscription) - return subscription - - def dispose(self): - pass - - -class ComputeLogSubscription: - """Observable object that generates ComputeLogFileData objects as compute step execution logs - are written. 
- """ - - def __init__( - self, - manager: ComputeLogManager, - run_id: str, - key: str, - io_type: ComputeIOType, - cursor: int, - ): - self.manager = manager - self.run_id = run_id - self.key = key - self.io_type = io_type - self.cursor = cursor - self.observer: Optional[Callable[[ComputeLogFileData], None]] = None - self.is_complete = False - - def __call__(self, observer: Callable[[ComputeLogFileData], None]) -> Self: - self.observer = observer - self.fetch() - if self.manager.is_watch_completed(self.run_id, self.key): - self.complete() - return self - - def dispose(self) -> None: - # called when the connection gets closed, allowing the observer to get GC'ed - self.observer = None - self.manager.on_unsubscribe(self) - - def fetch(self) -> None: - if not self.observer: - return - - should_fetch = True - while should_fetch: - update = self.manager.read_logs_file( - self.run_id, - self.key, - self.io_type, - self.cursor, - max_bytes=MAX_BYTES_CHUNK_READ, - ) - if not self.cursor or update.cursor != self.cursor: - self.observer(update) - self.cursor = update.cursor - should_fetch = update.data and len(update.data.encode("utf-8")) >= MAX_BYTES_CHUNK_READ - - def complete(self) -> None: - self.is_complete = True - if not self.observer: - return diff --git a/python_modules/dagster/dagster/_core/storage/local_compute_log_manager.py b/python_modules/dagster/dagster/_core/storage/local_compute_log_manager.py index 99c91adc77f25..c55d831052261 100644 --- a/python_modules/dagster/dagster/_core/storage/local_compute_log_manager.py +++ b/python_modules/dagster/dagster/_core/storage/local_compute_log_manager.py @@ -4,7 +4,7 @@ from collections import defaultdict from contextlib import contextmanager from pathlib import Path -from typing import IO, TYPE_CHECKING, Generator, Iterator, Mapping, Optional, Sequence, Tuple +from typing import IO, Generator, Iterator, Mapping, Optional, Sequence, Tuple from typing_extensions import Final from watchdog.events import PatternMatchingEventHandler @@ -18,7 +18,6 @@ ) from dagster._config.config_schema import UserConfigSchema from dagster._core.execution.compute_logs import mirror_stream_to_file -from dagster._core.storage.dagster_run import DagsterRun from dagster._serdes import ConfigurableClass, ConfigurableClassData from dagster._seven import json from dagster._utils import ensure_dir, ensure_file, touch_file @@ -30,18 +29,9 @@ CapturedLogManager, CapturedLogMetadata, CapturedLogSubscription, -) -from .compute_log_manager import ( - MAX_BYTES_FILE_READ, ComputeIOType, - ComputeLogFileData, - ComputeLogManager, - ComputeLogSubscription, ) -if TYPE_CHECKING: - from dagster._core.storage.cloud_storage_compute_log_manager import LogSubscription - DEFAULT_WATCHDOG_POLLING_TIMEOUT: Final = 2.5 IO_TYPE_EXTENSION: Final[Mapping[ComputeIOType, str]] = { @@ -52,7 +42,7 @@ MAX_FILENAME_LENGTH: Final = 255 -class LocalComputeLogManager(CapturedLogManager, ComputeLogManager, ConfigurableClass): +class LocalComputeLogManager(CapturedLogManager, ConfigurableClass): """Stores copies of stdout & stderr for each compute step locally on disk.""" def __init__( @@ -237,11 +227,11 @@ def subscribe( self, log_key: Sequence[str], cursor: Optional[str] = None ) -> CapturedLogSubscription: subscription = CapturedLogSubscription(self, log_key, cursor) - self.on_subscribe(subscription) + self._subscription_manager.add_subscription(subscription) return subscription def unsubscribe(self, subscription): - self.on_unsubscribe(subscription) + 
self._subscription_manager.remove_subscription(subscription) def get_log_keys_for_log_key_prefix( self, log_key_prefix: Sequence[str], io_type: ComputeIOType @@ -261,87 +251,6 @@ def get_log_keys_for_log_key_prefix( return results - ############################################### - # - # Methods for the ComputeLogManager interface - # - ############################################### - @contextmanager - def _watch_logs( - self, dagster_run: DagsterRun, step_key: Optional[str] = None - ) -> Iterator[None]: - check.inst_param(dagster_run, "dagster_run", DagsterRun) - check.opt_str_param(step_key, "step_key") - - log_key = self.build_log_key_for_run(dagster_run.run_id, step_key or dagster_run.job_name) - with self.capture_logs(log_key): - yield - - def get_local_path(self, run_id: str, key: str, io_type: ComputeIOType) -> str: - """Legacy adapter from compute log manager to more generic captured log manager API.""" - check.inst_param(io_type, "io_type", ComputeIOType) - log_key = self.build_log_key_for_run(run_id, key) - return self.get_captured_local_path(log_key, IO_TYPE_EXTENSION[io_type]) - - def read_logs_file( - self, - run_id: str, - key: str, - io_type: ComputeIOType, - cursor: int = 0, - max_bytes: int = MAX_BYTES_FILE_READ, - ) -> ComputeLogFileData: - path = self.get_local_path(run_id, key, io_type) - - if not os.path.exists(path) or not os.path.isfile(path): - return ComputeLogFileData(path=path, data=None, cursor=0, size=0, download_url=None) - - # See: https://docs.python.org/2/library/stdtypes.html#file.tell for Windows behavior - with open(path, "rb") as f: - f.seek(cursor, os.SEEK_SET) - data = f.read(max_bytes) - cursor = f.tell() - stats = os.fstat(f.fileno()) - - # local download path - download_url = self.download_url(run_id, key, io_type) - return ComputeLogFileData( - path=path, - data=data.decode("utf-8"), - cursor=cursor, - size=stats.st_size, - download_url=download_url, - ) - - def get_key(self, dagster_run: DagsterRun, step_key: Optional[str]): - check.inst_param(dagster_run, "dagster_run", DagsterRun) - check.opt_str_param(step_key, "step_key") - return step_key or dagster_run.job_name - - def is_watch_completed(self, run_id: str, key: str) -> bool: - log_key = self.build_log_key_for_run(run_id, key) - return self.is_capture_complete(log_key) - - def on_watch_start(self, dagster_run: DagsterRun, step_key: Optional[str]): - pass - - def on_watch_finish(self, dagster_run: DagsterRun, step_key: Optional[str] = None): - check.inst_param(dagster_run, "dagster_run", DagsterRun) - check.opt_str_param(step_key, "step_key") - log_key = self.build_log_key_for_run(dagster_run.run_id, step_key or dagster_run.job_name) - touchpath = self.complete_artifact_path(log_key) - touch_file(touchpath) - - def download_url(self, run_id: str, key: str, io_type: ComputeIOType): - check.inst_param(io_type, "io_type", ComputeIOType) - return f"/download/{run_id}/{key}/{io_type.value}" - - def on_subscribe(self, subscription: "LogSubscription") -> None: - self._subscription_manager.add_subscription(subscription) - - def on_unsubscribe(self, subscription: "LogSubscription") -> None: - self._subscription_manager.remove_subscription(subscription) - def dispose(self) -> None: self._subscription_manager.dispose() @@ -353,10 +262,8 @@ def __init__(self, manager): self._watchers = {} self._observer = None - def add_subscription(self, subscription: "LogSubscription") -> None: - check.inst_param( - subscription, "subscription", (ComputeLogSubscription, CapturedLogSubscription) - ) + def 
add_subscription(self, subscription: CapturedLogSubscription) -> None: + check.inst_param(subscription, "subscription", CapturedLogSubscription) if self.is_complete(subscription): subscription.fetch() @@ -367,32 +274,22 @@ def add_subscription(self, subscription: "LogSubscription") -> None: self._subscriptions[watch_key].append(subscription) self.watch(subscription) - def is_complete(self, subscription: "LogSubscription") -> bool: - check.inst_param( - subscription, "subscription", (ComputeLogSubscription, CapturedLogSubscription) - ) + def is_complete(self, subscription: CapturedLogSubscription) -> bool: + check.inst_param(subscription, "subscription", CapturedLogSubscription) - if isinstance(subscription, ComputeLogSubscription): - return self._manager.is_watch_completed(subscription.run_id, subscription.key) return self._manager.is_capture_complete(subscription.log_key) - def remove_subscription(self, subscription: "LogSubscription") -> None: - check.inst_param( - subscription, "subscription", (ComputeLogSubscription, CapturedLogSubscription) - ) + def remove_subscription(self, subscription: CapturedLogSubscription) -> None: + check.inst_param(subscription, "subscription", CapturedLogSubscription) log_key = self._log_key(subscription) watch_key = self._watch_key(log_key) if subscription in self._subscriptions[watch_key]: self._subscriptions[watch_key].remove(subscription) subscription.complete() - def _log_key(self, subscription: "LogSubscription") -> Sequence[str]: - check.inst_param( - subscription, "subscription", (ComputeLogSubscription, CapturedLogSubscription) - ) + def _log_key(self, subscription: CapturedLogSubscription) -> Sequence[str]: + check.inst_param(subscription, "subscription", CapturedLogSubscription) - if isinstance(subscription, ComputeLogSubscription): - return self._manager.build_log_key_for_run(subscription.run_id, subscription.key) return subscription.log_key def _watch_key(self, log_key: Sequence[str]) -> str: @@ -403,7 +300,7 @@ def remove_all_subscriptions(self, log_key: Sequence[str]) -> None: for subscription in self._subscriptions.pop(watch_key, []): subscription.complete() - def watch(self, subscription: "LogSubscription") -> None: + def watch(self, subscription: CapturedLogSubscription) -> None: log_key = self._log_key(subscription) watch_key = self._watch_key(log_key) if watch_key in self._watchers: diff --git a/python_modules/dagster/dagster/_core/storage/noop_compute_log_manager.py b/python_modules/dagster/dagster/_core/storage/noop_compute_log_manager.py index 360ca1ec5915e..bada719a8ff04 100644 --- a/python_modules/dagster/dagster/_core/storage/noop_compute_log_manager.py +++ b/python_modules/dagster/dagster/_core/storage/noop_compute_log_manager.py @@ -10,18 +10,12 @@ CapturedLogManager, CapturedLogMetadata, CapturedLogSubscription, -) -from dagster._serdes import ConfigurableClass, ConfigurableClassData - -from .compute_log_manager import ( - MAX_BYTES_FILE_READ, ComputeIOType, - ComputeLogFileData, - ComputeLogManager, ) +from dagster._serdes import ConfigurableClass, ConfigurableClassData -class NoOpComputeLogManager(CapturedLogManager, ComputeLogManager, ConfigurableClass): +class NoOpComputeLogManager(CapturedLogManager, ConfigurableClass): """When enabled for a Dagster instance, stdout and stderr will not be available for any step.""" def __init__(self, inst_data: Optional[ConfigurableClassData] = None): @@ -41,38 +35,6 @@ def from_config_value( ) -> Self: return cls(inst_data=inst_data, **config_value) - def enabled(self, _dagster_run, 
_step_key): - return False - - def _watch_logs(self, dagster_run, step_key=None): - pass - - def get_local_path(self, run_id: str, key: str, io_type: ComputeIOType) -> str: - raise NotImplementedError() - - def is_watch_completed(self, run_id, key): - return True - - def on_watch_start(self, dagster_run, step_key): - pass - - def on_watch_finish(self, dagster_run, step_key): - pass - - def download_url(self, run_id, key, io_type): - return None - - def read_logs_file(self, run_id, key, io_type, cursor=0, max_bytes=MAX_BYTES_FILE_READ): - return ComputeLogFileData( - path=f"{key}.{io_type}", data=None, cursor=0, size=0, download_url=None - ) - - def on_subscribe(self, subscription): - pass - - def on_unsubscribe(self, subscription): - pass - @contextmanager def capture_logs(self, log_key: Sequence[str]) -> Generator[CapturedLogContext, None, None]: yield CapturedLogContext(log_key=log_key) diff --git a/python_modules/dagster/dagster_tests/daemon_sensor_tests/test_run_status_sensors.py b/python_modules/dagster/dagster_tests/daemon_sensor_tests/test_run_status_sensors.py index 38f493dc89c92..9fed514cf7a2f 100644 --- a/python_modules/dagster/dagster_tests/daemon_sensor_tests/test_run_status_sensors.py +++ b/python_modules/dagster/dagster_tests/daemon_sensor_tests/test_run_status_sensors.py @@ -1746,4 +1746,4 @@ def test_logging_run_status_sensor( assert records record = records[0] assert record[LOG_RECORD_METADATA_ATTR]["orig_message"] == f"run succeeded: {run.run_id}" - instance.compute_log_manager.delete_logs(log_key=tick.log_key) # type: ignore + instance.compute_log_manager.delete_logs(log_key=tick.log_key) diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py index 0609276a7144d..9070b26ad3700 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py @@ -56,8 +56,7 @@ InProcessCodeLocationOrigin, RemoteRepositoryOrigin, ) -from dagster._core.storage.captured_log_manager import CapturedLogManager -from dagster._core.storage.compute_log_manager import ComputeIOType +from dagster._core.storage.captured_log_manager import CapturedLogManager, ComputeIOType from dagster._core.storage.dagster_run import ( IN_PROGRESS_RUN_STATUSES, DagsterRun, diff --git a/python_modules/dagster/dagster_tests/execution_tests/engine_tests/test_multiprocessing.py b/python_modules/dagster/dagster_tests/execution_tests/engine_tests/test_multiprocessing.py index 457949c81fc1f..60d57a9117344 100644 --- a/python_modules/dagster/dagster_tests/execution_tests/engine_tests/test_multiprocessing.py +++ b/python_modules/dagster/dagster_tests/execution_tests/engine_tests/test_multiprocessing.py @@ -472,20 +472,6 @@ def test_crash_hard_multiprocessing(): # Neither the stderr not the stdout spew will (reliably) make it to the compute logs -- # documenting this behavior here though we may want to change it - # assert ( - # 'Crashy output to stdout' - # not in instance.compute_log_manager.read_logs_file( - # result.run_id, 'segfault_solid', ComputeIOType.STDOUT - # ).data - # ) - - # assert ( - # instance.compute_log_manager.read_logs_file( - # result.run_id, 'sys_exit', ComputeIOType.STDERR - # ).data - # is None - # ) - def get_dynamic_resource_init_failure_job(): return get_dynamic_job_resource_init_failure(multiprocess_executor)[0] diff --git a/python_modules/dagster/dagster_tests/logging_tests/test_stdout.py 
b/python_modules/dagster/dagster_tests/logging_tests/test_stdout.py index 238b0546d4ee8..93f1d8635d8cf 100644 --- a/python_modules/dagster/dagster_tests/logging_tests/test_stdout.py +++ b/python_modules/dagster/dagster_tests/logging_tests/test_stdout.py @@ -15,9 +15,12 @@ from dagster._core.execution.compute_logs import should_disable_io_stream_redirect from dagster._core.instance import DagsterInstance from dagster._core.instance.ref import InstanceRef -from dagster._core.storage.captured_log_manager import CapturedLogManager -from dagster._core.storage.compute_log_manager import ComputeIOType +from dagster._core.storage.captured_log_manager import CapturedLogManager, ComputeIOType from dagster._core.storage.dagster_run import DagsterRun +from dagster._core.storage.local_compute_log_manager import ( + IO_TYPE_EXTENSION, + LocalComputeLogManager, +) from dagster._core.test_utils import create_run_for_test, instance_for_test from dagster._core.utils import make_new_run_id from dagster._utils import ensure_dir, touch_file @@ -63,6 +66,7 @@ def test_compute_log_to_disk(): with instance_for_test() as instance: spew_job = define_job() manager = instance.compute_log_manager + assert isinstance(manager, LocalComputeLogManager) result = spew_job.execute_in_process(instance=instance) assert result.success @@ -74,10 +78,12 @@ def test_compute_log_to_disk(): assert len(capture_events) == 1 event = capture_events[0] assert len(event.logs_captured_data.step_keys) == 3 - file_key = event.logs_captured_data.file_key - compute_io_path = manager.get_local_path(result.run_id, file_key, ComputeIOType.STDOUT) - assert os.path.exists(compute_io_path) - with open(compute_io_path, "r", encoding="utf8") as stdout_file: + log_key = [result.run_id, "compute_logs", event.logs_captured_data.file_key] + local_path = manager.get_captured_local_path( + log_key, IO_TYPE_EXTENSION[ComputeIOType.STDOUT] + ) + assert os.path.exists(local_path) + with open(local_path, "r", encoding="utf8") as stdout_file: assert normalize_file_content(stdout_file.read()) == f"{HELLO_FROM_OP}\n{HELLO_FROM_OP}" @@ -88,6 +94,7 @@ def test_compute_log_to_disk_multiprocess(): spew_job = reconstructable(define_job) with instance_for_test() as instance: manager = instance.compute_log_manager + assert isinstance(manager, LocalComputeLogManager) result = execute_job( spew_job, instance=instance, @@ -102,45 +109,15 @@ def test_compute_log_to_disk_multiprocess(): assert len(capture_events) == 3 # one for each step last_spew_event = capture_events[-1] assert len(last_spew_event.logs_captured_data.step_keys) == 1 - file_key = last_spew_event.logs_captured_data.file_key - compute_io_path = manager.get_local_path(result.run_id, file_key, ComputeIOType.STDOUT) - assert os.path.exists(compute_io_path) - with open(compute_io_path, "r", encoding="utf8") as stdout_file: + log_key = [result.run_id, "compute_logs", last_spew_event.logs_captured_data.file_key] + local_path = manager.get_captured_local_path( + log_key, IO_TYPE_EXTENSION[ComputeIOType.STDOUT] + ) + assert os.path.exists(local_path) + with open(local_path, "r", encoding="utf8") as stdout_file: assert normalize_file_content(stdout_file.read()) == HELLO_FROM_OP -@pytest.mark.skipif( - should_disable_io_stream_redirect(), reason="compute logs disabled for win / py3.6+" -) -def test_compute_log_manager(): - with instance_for_test() as instance: - manager = instance.compute_log_manager - spew_job = define_job() - result = spew_job.execute_in_process(instance=instance) - assert result.success - - 
capture_events = [ - event - for event in result.all_events - if event.event_type == DagsterEventType.LOGS_CAPTURED - ] - assert len(capture_events) == 1 - event = capture_events[0] - file_key = event.logs_captured_data.file_key - assert manager.is_watch_completed(result.run_id, file_key) - - stdout = manager.read_logs_file(result.run_id, file_key, ComputeIOType.STDOUT) - assert normalize_file_content(stdout.data) == f"{HELLO_FROM_OP}\n{HELLO_FROM_OP}" - - stderr = manager.read_logs_file(result.run_id, file_key, ComputeIOType.STDERR) - cleaned_logs = stderr.data.replace("\x1b[34m", "").replace("\x1b[0m", "") - assert "dagster - DEBUG - spew_job - " in cleaned_logs - - bad_logs = manager.read_logs_file("not_a_run_id", file_key, ComputeIOType.STDOUT) - assert bad_logs.data is None - assert not manager.is_watch_completed("not_a_run_id", file_key) - - @pytest.mark.skipif( should_disable_io_stream_redirect(), reason="compute logs disabled for win / py3.6+" ) @@ -184,24 +161,12 @@ def test_compute_log_manager_subscriptions(): ] assert len(capture_events) == 1 event = capture_events[0] - file_key = event.logs_captured_data.file_key - stdout_observable = instance.compute_log_manager.observable( - result.run_id, file_key, ComputeIOType.STDOUT - ) - stderr_observable = instance.compute_log_manager.observable( - result.run_id, file_key, ComputeIOType.STDERR - ) - stdout = [] - stdout_observable(stdout.append) - stderr = [] - stderr_observable(stderr.append) - assert len(stdout) == 1 - assert stdout[0].data.startswith(HELLO_FROM_OP) - # print(stdout[0].data) - assert stdout[0].cursor in range(28, 31) - assert len(stderr) == 1 - assert stderr[0].cursor == len(stderr[0].data) - assert stderr[0].cursor > 400 + log_key = [result.run_id, "compute_logs", event.logs_captured_data.file_key] + subscription = instance.compute_log_manager.subscribe(log_key) + log_data = [] + subscription(log_data.append) + assert len(log_data) == 1 + assert log_data[0].stdout.decode("utf-8").startswith(HELLO_FROM_OP) @pytest.mark.skipif( @@ -212,24 +177,23 @@ def test_compute_log_manager_subscription_updates(): with tempfile.TemporaryDirectory() as temp_dir: compute_log_manager = LocalComputeLogManager(temp_dir, polling_timeout=0.5) - run_id = make_new_run_id() - step_key = "spew" - stdout_path = compute_log_manager.get_local_path(run_id, step_key, ComputeIOType.STDOUT) - + log_key = [make_new_run_id(), "compute_logs", "spew"] + stdout_path = compute_log_manager.get_captured_local_path( + log_key, IO_TYPE_EXTENSION[ComputeIOType.STDOUT] + ) # make sure the parent directory to be watched exists, file exists ensure_dir(os.path.dirname(stdout_path)) touch_file(stdout_path) # set up the subscription messages = [] - observable = compute_log_manager.observable(run_id, step_key, ComputeIOType.STDOUT) - observable(messages.append) + subscription = compute_log_manager.subscribe(log_key) + subscription(messages.append) # returns a single update, with 0 data assert len(messages) == 1 last_chunk = messages[-1] - assert not last_chunk.data - assert last_chunk.cursor == 0 + assert not last_chunk.stdout with open(stdout_path, "a+", encoding="utf8") as f: print(HELLO_FROM_OP, file=f) @@ -238,8 +202,8 @@ def test_compute_log_manager_subscription_updates(): time.sleep(1) assert len(messages) == 2 last_chunk = messages[-1] - assert last_chunk.data - assert last_chunk.cursor > 0 + assert last_chunk.stdout + assert last_chunk.cursor def gen_op_name(length): @@ -272,12 +236,11 @@ def long_job(): ] assert len(capture_events) == 1 event = 
capture_events[0] - file_key = event.logs_captured_data.file_key - - assert manager.is_watch_completed(result.run_id, file_key) + log_key = [result.run_id, "compute_logs", event.logs_captured_data.file_key] + assert manager.is_capture_complete(log_key) - stdout = manager.read_logs_file(result.run_id, file_key, ComputeIOType.STDOUT) - assert normalize_file_content(stdout.data) == HELLO_FROM_OP + log_data = manager.get_log_data(log_key) + assert normalize_file_content(log_data.stdout.decode("utf-8")) == HELLO_FROM_OP def execute_inner(step_key: str, dagster_run: DagsterRun, instance_ref: InstanceRef) -> None: @@ -286,7 +249,8 @@ def execute_inner(step_key: str, dagster_run: DagsterRun, instance_ref: Instance def inner_step(instance: DagsterInstance, dagster_run: DagsterRun, step_key: str) -> None: - with instance.compute_log_manager.watch(dagster_run, step_key=step_key): + log_key = [dagster_run.run_id, "compute_logs", step_key] + with instance.compute_log_manager.capture_logs(log_key): time.sleep(0.1) print(step_key, "inner 1") # noqa: T201 print(step_key, "inner 2") # noqa: T201 @@ -312,7 +276,8 @@ def test_single(): step_keys = ["A", "B", "C"] - with instance.compute_log_manager.watch(dagster_run): + log_key = [dagster_run.run_id, "compute_logs", dagster_run.job_name] + with instance.compute_log_manager.capture_logs(log_key): print("outer 1") # noqa: T201 print("outer 2") # noqa: T201 print("outer 3") # noqa: T201 @@ -321,16 +286,19 @@ def test_single(): inner_step(instance, dagster_run, step_key) for step_key in step_keys: - stdout = instance.compute_log_manager.read_logs_file( - dagster_run.run_id, step_key, ComputeIOType.STDOUT + log_key = [dagster_run.run_id, "compute_logs", step_key] + log_data = instance.compute_log_manager.get_log_data(log_key) + assert normalize_file_content(log_data.stdout.decode("utf-8")) == expected_inner_output( + step_key ) - assert normalize_file_content(stdout.data) == expected_inner_output(step_key) - full_out = instance.compute_log_manager.read_logs_file( - dagster_run.run_id, job_name, ComputeIOType.STDOUT + full_data = instance.compute_log_manager.get_log_data( + [dagster_run.run_id, "compute_logs", job_name] ) - assert normalize_file_content(full_out.data).startswith(expected_outer_prefix()) + assert normalize_file_content(full_data.stdout.decode("utf-8")).startswith( + expected_outer_prefix() + ) @pytest.mark.skipif( @@ -353,7 +321,8 @@ def test_compute_log_base_with_spaces(): step_keys = ["A", "B", "C"] - with instance.compute_log_manager.watch(dagster_run): + log_key = [dagster_run.run_id, "compute_logs", dagster_run.job_name] + with instance.compute_log_manager.capture_logs(log_key): print("outer 1") # noqa: T201 print("outer 2") # noqa: T201 print("outer 3") # noqa: T201 @@ -362,16 +331,19 @@ def test_compute_log_base_with_spaces(): inner_step(instance, dagster_run, step_key) for step_key in step_keys: - stdout = instance.compute_log_manager.read_logs_file( - dagster_run.run_id, step_key, ComputeIOType.STDOUT - ) - assert normalize_file_content(stdout.data) == expected_inner_output(step_key) - - full_out = instance.compute_log_manager.read_logs_file( - dagster_run.run_id, job_name, ComputeIOType.STDOUT + log_key = [dagster_run.run_id, "compute_logs", step_key] + log_data = instance.compute_log_manager.get_log_data(log_key) + assert normalize_file_content( + log_data.stdout.decode("utf-8") + ) == expected_inner_output(step_key) + + full_data = instance.compute_log_manager.get_log_data( + [dagster_run.run_id, "compute_logs", job_name] ) - assert 
normalize_file_content(full_out.data).startswith(expected_outer_prefix()) + assert normalize_file_content(full_data.stdout.decode("utf-8")).startswith( + expected_outer_prefix() + ) @pytest.mark.skipif( @@ -386,7 +358,8 @@ def test_multi(): step_keys = ["A", "B", "C"] - with instance.compute_log_manager.watch(dagster_run): + log_key = [dagster_run.run_id, "compute_logs", dagster_run.job_name] + with instance.compute_log_manager.capture_logs(log_key): print("outer 1") # noqa: T201 print("outer 2") # noqa: T201 print("outer 3") # noqa: T201 @@ -400,16 +373,19 @@ def test_multi(): process.join() for step_key in step_keys: - stdout = instance.compute_log_manager.read_logs_file( - dagster_run.run_id, step_key, ComputeIOType.STDOUT + log_key = [dagster_run.run_id, "compute_logs", step_key] + log_data = instance.compute_log_manager.get_log_data(log_key) + assert normalize_file_content(log_data.stdout.decode("utf-8")) == expected_inner_output( + step_key ) - assert normalize_file_content(stdout.data) == expected_inner_output(step_key) - full_out = instance.compute_log_manager.read_logs_file( - dagster_run.run_id, job_name, ComputeIOType.STDOUT + full_data = instance.compute_log_manager.get_log_data( + [dagster_run.run_id, "compute_logs", job_name] ) # The way that the multiprocess compute-logging interacts with pytest (which stubs out the # sys.stdout fileno) makes this difficult to test. The pytest-captured stdout only captures # the stdout from the outer process, not also the inner process - assert normalize_file_content(full_out.data).startswith(expected_outer_prefix()) + assert normalize_file_content(full_data.stdout.decode("utf-8")).startswith( + expected_outer_prefix() + ) diff --git a/python_modules/dagster/dagster_tests/storage_tests/test_captured_log_manager.py b/python_modules/dagster/dagster_tests/storage_tests/test_captured_log_manager.py index a0ff7e5464320..2b82898f27c60 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/test_captured_log_manager.py +++ b/python_modules/dagster/dagster_tests/storage_tests/test_captured_log_manager.py @@ -7,8 +7,7 @@ import pytest from dagster import job, op from dagster._core.events import DagsterEventType -from dagster._core.storage.captured_log_manager import CapturedLogContext -from dagster._core.storage.compute_log_manager import ComputeIOType +from dagster._core.storage.captured_log_manager import CapturedLogContext, ComputeIOType from dagster._core.storage.local_compute_log_manager import LocalComputeLogManager from dagster._core.storage.noop_compute_log_manager import NoOpComputeLogManager from dagster._core.test_utils import instance_for_test diff --git a/python_modules/dagster/dagster_tests/storage_tests/test_compute_log_manager.py b/python_modules/dagster/dagster_tests/storage_tests/test_compute_log_manager.py index d65f73fb90433..398fed12b11c6 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/test_compute_log_manager.py +++ b/python_modules/dagster/dagster_tests/storage_tests/test_compute_log_manager.py @@ -3,7 +3,6 @@ from typing import IO, Generator, Optional, Sequence import dagster._check as check -import pytest from dagster import job, op from dagster._core.instance import DagsterInstance, InstanceRef, InstanceType from dagster._core.launcher import DefaultRunLauncher @@ -14,12 +13,7 @@ CapturedLogManager, CapturedLogMetadata, CapturedLogSubscription, -) -from dagster._core.storage.compute_log_manager import ( - MAX_BYTES_FILE_READ, ComputeIOType, - ComputeLogFileData, - ComputeLogManager, ) from 
dagster._core.storage.event_log import SqliteEventLogStorage from dagster._core.storage.root import LocalArtifactStorage @@ -33,7 +27,7 @@ def test_compute_log_manager_instance(): assert instance.compute_log_manager._instance # noqa: SLF001 -class BrokenCapturedLogManager(CapturedLogManager, ComputeLogManager): +class BrokenCapturedLogManager(CapturedLogManager): def __init__(self, fail_on_setup=False, fail_on_teardown=False): self._fail_on_setup = check.opt_bool_param(fail_on_setup, "fail_on_setup") self._fail_on_teardown = check.opt_bool_param(fail_on_teardown, "fail_on_teardown") @@ -79,89 +73,6 @@ def subscribe( def unsubscribe(self, subscription: CapturedLogSubscription): return - @contextmanager - def _watch_logs(self, pipeline_run, step_key=None): - pass - - def get_local_path(self, run_id, key, io_type): - pass - - def is_watch_completed(self, run_id, key): - return True - - def on_watch_start(self, pipeline_run, step_key): - pass - - def on_watch_finish(self, pipeline_run, step_key): - pass - - def download_url(self, run_id, key, io_type): - return None - - def read_logs_file( - self, run_id, key, io_type, cursor=0, max_bytes=MAX_BYTES_FILE_READ - ) -> ComputeLogFileData: - return ComputeLogFileData(path="", data=None, cursor=0, size=0, download_url=None) - - def on_subscribe(self, subscription): - pass - - def on_unsubscribe(self, subscription): - pass - - -class BrokenComputeLogManager(ComputeLogManager): - def __init__(self, fail_on_setup=False, fail_on_teardown=False): - self._fail_on_setup = check.opt_bool_param(fail_on_setup, "fail_on_setup") - self._fail_on_teardown = check.opt_bool_param(fail_on_teardown, "fail_on_teardown") - - @contextmanager - def _watch_logs(self, pipeline_run, step_key=None): - yield - - def is_watch_completed(self, run_id, key): - return True - - def on_watch_start(self, pipeline_run, step_key): - if self._fail_on_setup: - raise Exception("wahhh") - - def on_watch_finish(self, pipeline_run, step_key): - if self._fail_on_teardown: - raise Exception("blahhh") - - def get_local_path(self, run_id: str, key: str, io_type: ComputeIOType): - pass - - def download_url(self, run_id, key, io_type): - return None - - def read_logs_file(self, run_id, key, io_type, cursor=0, max_bytes=MAX_BYTES_FILE_READ): - return ComputeLogFileData( - path=f"{key}.{io_type}", data=None, cursor=0, size=0, download_url=None - ) - - def on_subscribe(self, subscription): - pass - - -@contextmanager -def broken_compute_log_manager_instance(fail_on_setup=False, fail_on_teardown=False): - with tempfile.TemporaryDirectory() as temp_dir: - with environ({"DAGSTER_HOME": temp_dir}): - yield DagsterInstance( - instance_type=InstanceType.PERSISTENT, - local_artifact_storage=LocalArtifactStorage(temp_dir), - run_storage=SqliteRunStorage.from_local(temp_dir), - event_storage=SqliteEventLogStorage(temp_dir), - compute_log_manager=BrokenComputeLogManager( - fail_on_setup=fail_on_setup, fail_on_teardown=fail_on_teardown - ), - run_coordinator=DefaultRunCoordinator(), - run_launcher=DefaultRunLauncher(), - ref=InstanceRef.from_dir(temp_dir), - ) - @contextmanager def broken_captured_log_manager_instance(fail_on_setup=False, fail_on_teardown=False): @@ -181,14 +92,6 @@ def broken_captured_log_manager_instance(fail_on_setup=False, fail_on_teardown=F ) -@pytest.fixture( - name="instance_cm", - params=[broken_compute_log_manager_instance, broken_captured_log_manager_instance], -) -def instance_cm_fixture(request): - return request.param - - def _has_setup_exception(execute_result): return any( [ @@ 
-231,8 +134,8 @@ def boo_job(): boo() -def test_broken_compute_log_manager(instance_cm): - with instance_cm(fail_on_setup=True) as instance: +def test_broken_compute_log_manager(): + with broken_captured_log_manager_instance(fail_on_setup=True) as instance: yay_result = yay_job.execute_in_process(instance=instance) assert yay_result.success assert _has_setup_exception(yay_result) @@ -241,7 +144,7 @@ def test_broken_compute_log_manager(instance_cm): assert not boo_result.success assert _has_setup_exception(boo_result) - with instance_cm(fail_on_teardown=True) as instance: + with broken_captured_log_manager_instance(fail_on_teardown=True) as instance: yay_result = yay_job.execute_in_process(instance=instance) assert yay_result.success assert _has_teardown_exception(yay_result) @@ -251,7 +154,7 @@ def test_broken_compute_log_manager(instance_cm): assert not boo_result.success assert _has_teardown_exception(boo_result) - with instance_cm() as instance: + with broken_captured_log_manager_instance() as instance: yay_result = yay_job.execute_in_process(instance=instance) assert yay_result.success assert not _has_setup_exception(yay_result) diff --git a/python_modules/dagster/dagster_tests/storage_tests/utils/captured_log_manager.py b/python_modules/dagster/dagster_tests/storage_tests/utils/captured_log_manager.py index b321ffa648079..0ceded16131a4 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/utils/captured_log_manager.py +++ b/python_modules/dagster/dagster_tests/storage_tests/utils/captured_log_manager.py @@ -5,7 +5,7 @@ import pytest from dagster._core.execution.compute_logs import should_disable_io_stream_redirect -from dagster._core.storage.compute_log_manager import ComputeIOType +from dagster._core.storage.captured_log_manager import ComputeIOType from dagster._time import get_current_datetime diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_pipeline_factory/test_tags.py b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_pipeline_factory/test_tags.py index c38f9277f7a52..f330e3594fd92 100644 --- a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_pipeline_factory/test_tags.py +++ b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_pipeline_factory/test_tags.py @@ -7,7 +7,11 @@ from airflow.utils.dates import days_ago from dagster import DagsterEventType from dagster._core.instance import AIRFLOW_EXECUTION_DATE_STR -from dagster._core.storage.compute_log_manager import ComputeIOType +from dagster._core.storage.captured_log_manager import ComputeIOType +from dagster._core.storage.local_compute_log_manager import ( + IO_TYPE_EXTENSION, + LocalComputeLogManager, +) from dagster._core.test_utils import instance_for_test from dagster._time import get_current_datetime from dagster_airflow import make_dagster_job_from_airflow_dag @@ -32,6 +36,7 @@ def normalize_file_content(s): def check_captured_logs(manager, result, execution_date_fmt): assert result.success + assert isinstance(manager, LocalComputeLogManager) capture_events = [ event for event in result.all_events if event.event_type == DagsterEventType.LOGS_CAPTURED @@ -39,8 +44,10 @@ def check_captured_logs(manager, result, execution_date_fmt): assert len(capture_events) == 1 event = capture_events[0] assert event.logs_captured_data.step_keys == ["test_tags_dag__templated"] - file_key = event.logs_captured_data.file_key - compute_io_path = manager.get_local_path(result.run_id, file_key, ComputeIOType.STDOUT) 
+ log_key = [result.run_id, "compute_logs", event.logs_captured_data.file_key] + compute_io_path = manager.get_captured_local_path( + log_key, IO_TYPE_EXTENSION[ComputeIOType.STDOUT] + ) assert os.path.exists(compute_io_path) stdout_file = open(compute_io_path, "r", encoding="utf8") file_contents = normalize_file_content(stdout_file.read()) @@ -107,6 +114,7 @@ def test_job_auto_tag(): with instance_for_test() as instance: manager = instance.compute_log_manager + assert isinstance(manager, LocalComputeLogManager) pre_execute_time = get_current_datetime() @@ -126,10 +134,12 @@ def test_job_auto_tag(): ] event = capture_events[0] assert event.logs_captured_data.step_keys == ["test_tags_dag__templated"] - file_key = event.logs_captured_data.file_key post_execute_time = get_current_datetime() - compute_io_path = manager.get_local_path(result.run_id, file_key, ComputeIOType.STDOUT) + log_key = [result.run_id, "compute_logs", event.logs_captured_data.file_key] + compute_io_path = manager.get_captured_local_path( + log_key, IO_TYPE_EXTENSION[ComputeIOType.STDOUT] + ) assert os.path.exists(compute_io_path) stdout_file = open(compute_io_path, "r", encoding="utf8") file_contents = normalize_file_content(stdout_file.read()) diff --git a/python_modules/libraries/dagster-aws/dagster_aws/s3/compute_log_manager.py b/python_modules/libraries/dagster-aws/dagster_aws/s3/compute_log_manager.py index 7608e0f938964..0e83c0088085c 100644 --- a/python_modules/libraries/dagster-aws/dagster_aws/s3/compute_log_manager.py +++ b/python_modules/libraries/dagster-aws/dagster_aws/s3/compute_log_manager.py @@ -12,12 +12,11 @@ _check as check, ) from dagster._config.config_type import Noneable -from dagster._core.storage.captured_log_manager import CapturedLogContext +from dagster._core.storage.captured_log_manager import CapturedLogContext, ComputeIOType from dagster._core.storage.cloud_storage_compute_log_manager import ( CloudStorageComputeLogManager, PollingComputeLogSubscriptionManager, ) -from dagster._core.storage.compute_log_manager import ComputeIOType from dagster._core.storage.local_compute_log_manager import ( IO_TYPE_EXTENSION, LocalComputeLogManager, diff --git a/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_compute_log_manager.py b/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_compute_log_manager.py index 1b97fb2228f25..8be9056d6f4a7 100644 --- a/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_compute_log_manager.py +++ b/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_compute_log_manager.py @@ -8,7 +8,7 @@ from dagster._core.instance import DagsterInstance, InstanceRef, InstanceType from dagster._core.launcher import DefaultRunLauncher from dagster._core.run_coordinator import DefaultRunCoordinator -from dagster._core.storage.compute_log_manager import ComputeIOType +from dagster._core.storage.captured_log_manager import ComputeIOType from dagster._core.storage.event_log import SqliteEventLogStorage from dagster._core.storage.local_compute_log_manager import IO_TYPE_EXTENSION from dagster._core.storage.root import LocalArtifactStorage @@ -181,11 +181,10 @@ def test_blank_compute_logs(mock_s3_bucket): ) # simulate subscription to an in-progress run, where there is no key in the bucket - stdout = manager.read_logs_file("my_run_id", "my_step_key", ComputeIOType.STDOUT) - stderr = manager.read_logs_file("my_run_id", "my_step_key", ComputeIOType.STDERR) + log_data = manager.get_log_data(["my_run_id", "compute_logs", 
"my_step_key"]) - assert not stdout.data - assert not stderr.data + assert not log_data.stdout + assert not log_data.stderr def test_prefix_filter(mock_s3_bucket): diff --git a/python_modules/libraries/dagster-azure/dagster_azure/blob/compute_log_manager.py b/python_modules/libraries/dagster-azure/dagster_azure/blob/compute_log_manager.py index c874625df201f..a86709020e660 100644 --- a/python_modules/libraries/dagster-azure/dagster_azure/blob/compute_log_manager.py +++ b/python_modules/libraries/dagster-azure/dagster_azure/blob/compute_log_manager.py @@ -11,11 +11,11 @@ StringSource, _check as check, ) +from dagster._core.storage.captured_log_manager import ComputeIOType from dagster._core.storage.cloud_storage_compute_log_manager import ( CloudStorageComputeLogManager, PollingComputeLogSubscriptionManager, ) -from dagster._core.storage.compute_log_manager import ComputeIOType from dagster._core.storage.local_compute_log_manager import ( IO_TYPE_EXTENSION, LocalComputeLogManager, diff --git a/python_modules/libraries/dagster-azure/dagster_azure_tests/blob_tests/test_compute_log_manager.py b/python_modules/libraries/dagster-azure/dagster_azure_tests/blob_tests/test_compute_log_manager.py index 49711b4933c3d..dfc8bf65e3bc3 100644 --- a/python_modules/libraries/dagster-azure/dagster_azure_tests/blob_tests/test_compute_log_manager.py +++ b/python_modules/libraries/dagster-azure/dagster_azure_tests/blob_tests/test_compute_log_manager.py @@ -8,7 +8,7 @@ from dagster._core.instance import DagsterInstance, InstanceRef, InstanceType from dagster._core.launcher.sync_in_memory_run_launcher import SyncInMemoryRunLauncher from dagster._core.run_coordinator import DefaultRunCoordinator -from dagster._core.storage.compute_log_manager import ComputeIOType +from dagster._core.storage.captured_log_manager import ComputeIOType from dagster._core.storage.event_log import SqliteEventLogStorage from dagster._core.storage.local_compute_log_manager import IO_TYPE_EXTENSION from dagster._core.storage.root import LocalArtifactStorage @@ -89,14 +89,6 @@ def easy(context): for expected in EXPECTED_LOGS: assert expected in stderr - # Legacy API - stdout = manager.read_logs_file(result.run_id, file_key, ComputeIOType.STDOUT) - assert stdout.data == HELLO_WORLD + SEPARATOR - - stderr = manager.read_logs_file(result.run_id, file_key, ComputeIOType.STDERR) - for expected in EXPECTED_LOGS: - assert expected in stderr.data - # Check ADLS2 directly adls2_object = fake_client.get_blob_client( container=container, @@ -119,14 +111,6 @@ def easy(context): for expected in EXPECTED_LOGS: assert expected in stderr - # Legacy API - stdout = manager.read_logs_file(result.run_id, file_key, ComputeIOType.STDOUT) - assert stdout.data == HELLO_WORLD + SEPARATOR - - stderr = manager.read_logs_file(result.run_id, file_key, ComputeIOType.STDERR) - for expected in EXPECTED_LOGS: - assert expected in stderr.data - def test_compute_log_manager_from_config(storage_account, container, credential): prefix = "foobar" @@ -366,14 +350,6 @@ def easy(context): for expected in EXPECTED_LOGS: assert expected in stderr - # Legacy API - stdout = manager.read_logs_file(result.run_id, file_key, ComputeIOType.STDOUT) - assert stdout.data == HELLO_WORLD + SEPARATOR - - stderr = manager.read_logs_file(result.run_id, file_key, ComputeIOType.STDERR) - for expected in EXPECTED_LOGS: - assert expected in stderr.data - # Check ADLS2 directly adls2_object = fake_client.get_blob_client( container=container, @@ -396,14 +372,6 @@ def easy(context): for expected in 
EXPECTED_LOGS: assert expected in stderr - # Legacy API - stdout = manager.read_logs_file(result.run_id, file_key, ComputeIOType.STDOUT) - assert stdout.data == HELLO_WORLD + SEPARATOR - - stderr = manager.read_logs_file(result.run_id, file_key, ComputeIOType.STDERR) - for expected in EXPECTED_LOGS: - assert expected in stderr.data - def test_compute_log_manager_from_config_default_azure_credential(storage_account, container): prefix = "foobar" diff --git a/python_modules/libraries/dagster-gcp/dagster_gcp/gcs/compute_log_manager.py b/python_modules/libraries/dagster-gcp/dagster_gcp/gcs/compute_log_manager.py index ab009391f5f7a..00b5826646afe 100644 --- a/python_modules/libraries/dagster-gcp/dagster_gcp/gcs/compute_log_manager.py +++ b/python_modules/libraries/dagster-gcp/dagster_gcp/gcs/compute_log_manager.py @@ -11,12 +11,11 @@ _check as check, ) from dagster._config.config_type import Noneable -from dagster._core.storage.captured_log_manager import CapturedLogContext +from dagster._core.storage.captured_log_manager import CapturedLogContext, ComputeIOType from dagster._core.storage.cloud_storage_compute_log_manager import ( CloudStorageComputeLogManager, PollingComputeLogSubscriptionManager, ) -from dagster._core.storage.compute_log_manager import ComputeIOType from dagster._core.storage.local_compute_log_manager import ( IO_TYPE_EXTENSION, LocalComputeLogManager, diff --git a/python_modules/libraries/dagster-gcp/dagster_gcp_tests/gcs_tests/test_compute_log_manager.py b/python_modules/libraries/dagster-gcp/dagster_gcp_tests/gcs_tests/test_compute_log_manager.py index f1581dcd772ce..981b0f2d3d392 100644 --- a/python_modules/libraries/dagster-gcp/dagster_gcp_tests/gcs_tests/test_compute_log_manager.py +++ b/python_modules/libraries/dagster-gcp/dagster_gcp_tests/gcs_tests/test_compute_log_manager.py @@ -10,7 +10,7 @@ from dagster._core.instance.ref import InstanceRef from dagster._core.launcher import DefaultRunLauncher from dagster._core.run_coordinator import DefaultRunCoordinator -from dagster._core.storage.compute_log_manager import ComputeIOType +from dagster._core.storage.captured_log_manager import ComputeIOType from dagster._core.storage.event_log import SqliteEventLogStorage from dagster._core.storage.root import LocalArtifactStorage from dagster._core.storage.runs import SqliteRunStorage @@ -80,14 +80,6 @@ def easy(context): for expected in EXPECTED_LOGS: assert expected in stderr - # Legacy API - stdout = manager.read_logs_file(result.run_id, file_key, ComputeIOType.STDOUT) - assert stdout.data == HELLO_WORLD + SEPARATOR - - stderr = manager.read_logs_file(result.run_id, file_key, ComputeIOType.STDERR) - for expected in EXPECTED_LOGS: - assert expected in stderr.data - # Check GCS directly stderr_gcs = ( storage.Client() @@ -113,14 +105,6 @@ def easy(context): for expected in EXPECTED_LOGS: assert expected in stderr - # Legacy API - stdout = manager.read_logs_file(result.run_id, file_key, ComputeIOType.STDOUT) - assert stdout.data == HELLO_WORLD + SEPARATOR - - stderr = manager.read_logs_file(result.run_id, file_key, ComputeIOType.STDERR) - for expected in EXPECTED_LOGS: - assert expected in stderr.data - @pytest.mark.integration def test_compute_log_manager_with_envvar(gcs_bucket): @@ -172,14 +156,6 @@ def easy(context): stdout = log_data.stdout.decode("utf-8") assert stdout == HELLO_WORLD + SEPARATOR - # legacy API - stdout = manager.read_logs_file(result.run_id, file_key, ComputeIOType.STDOUT) - assert stdout.data == HELLO_WORLD + SEPARATOR - - stderr = 
manager.read_logs_file(result.run_id, file_key, ComputeIOType.STDERR) - for expected in EXPECTED_LOGS: - assert expected in stderr.data - # Check GCS directly stderr_gcs = ( storage.Client() @@ -202,14 +178,6 @@ def easy(context): stdout = log_data.stdout.decode("utf-8") assert stdout == HELLO_WORLD + SEPARATOR - # legacy API - stdout = manager.read_logs_file(result.run_id, file_key, ComputeIOType.STDOUT) - assert stdout.data == HELLO_WORLD + SEPARATOR - - stderr = manager.read_logs_file(result.run_id, file_key, ComputeIOType.STDERR) - for expected in EXPECTED_LOGS: - assert expected in stderr.data - @pytest.mark.integration def test_compute_log_manager_from_config(gcs_bucket):
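For context, the hunks above all converge on the ``CapturedLogManager`` API: captured step output is keyed by ``[run_id, "compute_logs", file_key]``, written via ``capture_logs``, read back via ``get_log_data``, and streamed via ``subscribe``. A minimal sketch of that flow (not itself part of the diff), assuming a local test instance as in the updated tests:

from dagster import DagsterEventType, job, op
from dagster._core.test_utils import instance_for_test


@op
def spew():
    print("hello from op")  # stdout captured for this step


@job
def spew_job():
    spew()


with instance_for_test() as instance:
    result = spew_job.execute_in_process(instance=instance)
    assert result.success
    manager = instance.compute_log_manager

    # The LOGS_CAPTURED event carries the file_key for the captured step output.
    capture_event = next(
        event
        for event in result.all_events
        if event.event_type == DagsterEventType.LOGS_CAPTURED
    )
    log_key = [result.run_id, "compute_logs", capture_event.logs_captured_data.file_key]

    assert manager.is_capture_complete(log_key)
    log_data = manager.get_log_data(log_key)
    assert log_data.stdout is not None
    assert "hello from op" in log_data.stdout.decode("utf-8")

    # Streaming reads go through subscribe(log_key), which delivers CapturedLogData chunks
    # to a registered observer.
    chunks = []
    subscription = manager.subscribe(log_key)
    subscription(chunks.append)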