Skip to content

Commit

Permalink
Updates and fixes to error reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Jun 1, 2023
1 parent ff8c75a commit 75fea03
Show file tree
Hide file tree
Showing 11 changed files with 80 additions and 51 deletions.
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ members = [
"arroyo-server-common",
"arroyo-sql",
"arroyo-sql-macro",
# "arroyo-sql-testing",
"arroyo-sql-testing",
"arroyo-state",
"arroyo-types",
"arroyo-worker",
Expand Down
4 changes: 3 additions & 1 deletion arroyo-api/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ pub(crate) async fn get_metrics(
"{}{{job_id=\"{}\",run_id=\"{}\"}}",
TX_QUEUE_REM, job_id, run_id
);
format!("({} - {}) / {}", tx_queue_size, tx_queue_rem, tx_queue_size)
// add 1 to each value to account for uninitialized values (which report 0); this can happen when a task
// never reads any data
format!("1 - (({} + 1) / ({} + 1))", tx_queue_rem, tx_queue_size)
}

fn get_query(&self, job_id: &str, run_id: u64, rate: &str) -> String {
Expand Down
2 changes: 1 addition & 1 deletion arroyo-console/src/components/CheckpointDetails.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const CheckpointDetails: React.FC<CheckpointDetailsProps> = ({ checkpoint, ops,
const tableBody = (
<Tbody>
{ops?.map(op => {
const size = bytes(Object.values(op.checkpoint.tasks));
const size = bytes(Object.values(op.checkpoint?.tasks || []));
const timeline = <OperatorCheckpointTimeline op={op} scale={scale} w={w} />;
return row(op.name, timeline, size);
})}
Expand Down
4 changes: 2 additions & 2 deletions arroyo-console/src/components/Loading.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ export interface LoadingProps {

const Loading: React.FC<LoadingProps> = ({ size = 'xl' }) => {
return (
<Flex border={'1px solid blue'} justify={'center'} height={'75%'}>
<Spinner speed="0.65s" emptyColor="gray.200" color="blue.500" size={size} />
<Flex border={'blue.900'} borderWidth={'1px'} justify={'center'} height={'75%'} p={8}>
<Spinner speed="0.65s" emptyColor="gray.200" color="blue.900" size={size} />
</Flex>
);
};
Expand Down
16 changes: 6 additions & 10 deletions arroyo-console/src/components/OperatorErrors.tsx
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
import { useOperatorErrors } from '../lib/data_fetching';
import { ApiClient } from '../main';
import Loading from './Loading';
import { Table, TableContainer, Tbody, Td, Th, Thead, Tr } from '@chakra-ui/react';
import React from 'react';
import { formatDate } from '../lib/util';
import { OperatorErrorsRes } from '../gen/api_pb';

export interface OperatorErrorsProps {
client: ApiClient;
jobId?: string;
operatorErrors?: OperatorErrorsRes;
}

const OperatorErrors: React.FC<OperatorErrorsProps> = ({ client, jobId }) => {
const { operatorErrors } = useOperatorErrors(client, jobId);

const OperatorErrors: React.FC<OperatorErrorsProps> = ({ operatorErrors }) => {
if (!operatorErrors) {
return <Loading />;
}
Expand All @@ -21,7 +17,7 @@ const OperatorErrors: React.FC<OperatorErrorsProps> = ({ client, jobId }) => {
<Tbody>
{operatorErrors.messages.map(m => {
return (
<Tr>
<Tr key={String(m.createdAt)}>
<Td>{formatDate(m.createdAt)}</Td>
<Td>{m.operatorId}</Td>
<Td>{m.taskIndex?.toString()}</Td>
Expand All @@ -34,8 +30,8 @@ const OperatorErrors: React.FC<OperatorErrorsProps> = ({ client, jobId }) => {
);

const table = (
<TableContainer width={'100%'} padding={5}>
<Table variant="striped">
<TableContainer padding={5}>
<Table variant="striped" w={'100%'}>
<Thead>
<Tr>
<Th>Time</Th>
Expand Down
2 changes: 1 addition & 1 deletion arroyo-console/src/lib/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ export function getOperatorBackpressure(
}

export function formatDate(timestamp: bigint) {
const date = new Date(Number(timestamp));
const date = new Date(Number(timestamp / BigInt(1000)));
return new Intl.DateTimeFormat('en', {
dateStyle: 'short',
timeStyle: 'short',
Expand Down
67 changes: 39 additions & 28 deletions arroyo-console/src/routes/pipelines/JobDetail.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
ButtonGroup,
Flex,
Spacer,
Spinner,
Stack,
Tab,
TabList,
Expand All @@ -31,7 +32,12 @@ import { PipelineOutputs } from './JobOutputs';
import { CodeEditor } from './SqlEditor';
import PipelineConfigModal from './PipelineConfigModal';
import { Code as ConnectWebCode, ConnectError } from '@bufbuild/connect-web';
import { useJob, useJobCheckpoints, useJobMetrics } from '../../lib/data_fetching';
import {
useJob,
useJobCheckpoints,
useJobMetrics,
useOperatorErrors,
} from '../../lib/data_fetching';
import OperatorDetail from '../../components/OperatorDetail';
import JobNotFound from '../../components/JobNotFound';
import Checkpoints from '../../components/Checkpoints';
Expand All @@ -54,6 +60,7 @@ export function JobDetail({ client }: { client: ApiClient }) {
const { job, jobError, updateJob } = useJob(client, id);
const { metrics } = useJobMetrics(client, id);
const { checkpoints } = useJobCheckpoints(client, id);
const { operatorErrors } = useOperatorErrors(client, id);

if (jobError) {
if (jobError instanceof ConnectError && jobError.code === ConnectWebCode.NotFound) {
Expand Down Expand Up @@ -187,33 +194,38 @@ export function JobDetail({ client }: { client: ApiClient }) {

const errorsTab = (
<TabPanel>
<OperatorErrors client={client} jobId={job.jobStatus.jobId} />
<OperatorErrors operatorErrors={operatorErrors} />
</TabPanel>
);

inner = (
<Flex direction={'column'} minH={0}>
<Tabs display={'flex'} flexDirection={'column'} overflow={'scroll'}>
<div>
<TabList>
<Tab>Operators</Tab>
<Tab>Outputs</Tab>
<Tab>Checkpoints</Tab>
<Tab>Query</Tab>
<Tab>UDFs</Tab>
<Tab>Errors</Tab>
</TabList>
</div>
<TabPanels display={'flex'} flexDirection={'column'} flexGrow={1} overflowY={'scroll'}>
{operatorsTab}
{outputsTab}
{checkpointsTab}
{queryTab}
{udfsTab}
{errorsTab}
</TabPanels>
</Tabs>
</Flex>
<Tabs h={'100%'}>
<div>
<TabList>
<Tab>Operators</Tab>
<Tab>Outputs</Tab>
<Tab>Checkpoints</Tab>
<Tab>Query</Tab>
<Tab>UDFs</Tab>
<Tab>
Errors{' '}
{(operatorErrors?.messages?.length || 0) > 0 && (
<Badge ml={2} colorScheme="red" size={'xs'}>
{operatorErrors!.messages.length}
</Badge>
)}
</Tab>
</TabList>
</div>
<TabPanels h={'100%'}>
{operatorsTab}
{outputsTab}
{checkpointsTab}
{queryTab}
{udfsTab}
{errorsTab}
</TabPanels>
</Tabs>
);
}

Expand All @@ -239,19 +251,18 @@ export function JobDetail({ client }: { client: ApiClient }) {
actionButton = (
<Button
isDisabled={job.action == undefined}
isLoading={job.inProgress}
loadingText={job.actionText}
onClick={async () => {
await updateJobState(job.action!);
}}
>
{job.inProgress ? <Spinner size="xs" mr={2} /> : null}
{job.actionText}
</Button>
);
}

return (
<Flex direction={'column'} height={'100vh'}>
<Box top={0} bottom={0} right={0} left={200} position="absolute" overflowY="hidden">
<Flex>
<Box p={5}>
<Text fontSize={20}>
Expand All @@ -268,6 +279,6 @@ export function JobDetail({ client }: { client: ApiClient }) {
</Flex>
{inner}
{configModal}
</Flex>
</Box>
);
}
9 changes: 7 additions & 2 deletions arroyo-console/src/routes/pipelines/JobGraph.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,13 @@ export function PipelineGraph({
});

return (
<Box height={'70vh'} className="pipelineGraph">
<ReactFlow nodes={nodes} edges={edges} nodeTypes={nodeTypes}>
<Box className="pipelineGraph">
<ReactFlow
proOptions={{ hideAttribution: true }}
nodes={nodes}
edges={edges}
nodeTypes={nodeTypes}
>
<Background />
</ReactFlow>
</Box>
Expand Down
2 changes: 0 additions & 2 deletions arroyo-sql/src/plan_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,8 +576,6 @@ impl PlanNode {
sort_key_type,
max_elements: *max_elements,
});
println!("sliding logical plan operator {:#?}", self);
println!("sliding physical plan operator {:#?}", operator);
operator
}
PlanOperator::TumblingTopN {
Expand Down
4 changes: 1 addition & 3 deletions arroyo-worker/src/operators/sources/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,7 @@ where
let has_state = !state.is_empty();

let state: HashMap<i32, KafkaState> = state.iter().map(|s| (s.partition, **s)).collect();
let metadata = consumer
.fetch_metadata(Some(&self.topic), Duration::from_secs(30))
.expect("failed to fetch kafka metadata");
let metadata = consumer.fetch_metadata(Some(&self.topic), Duration::from_secs(30))?;

info!("Fetched metadata for topic {}", self.topic);

Expand Down

0 comments on commit 75fea03

Please sign in to comment.