diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FilterOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FilterOperator.java index 9b42ee6f34f..eb9ee165437 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FilterOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FilterOperator.java @@ -54,9 +54,16 @@ public void prepare(Map map, TopologyContext topologyContext, OutputCollector ou @Override public void execute(Tuple tuple) { R obj = (R) tuple.getValue(0); - if (filterFn.test(obj)) { - collector.emit(new Values(obj)); + try { + Boolean passed = filterFn.test(obj); + if (passed) { + collector.emit(new Values(obj)); + } + collector.ack(tuple); + // SUPPRESS CHECKSTYLE IllegalCatch + } catch(Exception e) { + e.printStackTrace(); + collector.fail(tuple); } - collector.ack(tuple); } } diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FlatMapOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FlatMapOperator.java index 4ba3a7f7f9b..6bcecd4fa71 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FlatMapOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FlatMapOperator.java @@ -54,11 +54,17 @@ public void prepare(Map map, TopologyContext topologyContext, OutputCollector ou @SuppressWarnings("unchecked") @Override public void execute(Tuple tuple) { - R obj = (R) tuple.getValue(0); - Iterable result = flatMapFn.apply(obj); - for (T o : result) { - collector.emit(new Values(o)); + try { + R obj = (R) tuple.getValue(0); + Iterable result = flatMapFn.apply(obj); + for (T o : result) { + collector.emit(new Values(o)); + } + collector.ack(tuple); + // SUPPRESS CHECKSTYLE IllegalCatch + } catch (Exception e) { + e.printStackTrace(); + collector.fail(tuple); } - collector.ack(tuple); } } diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java index f37ebeb646b..1481465e963 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java @@ -65,10 +65,20 @@ public void prepare(Map map, TopologyContext topologyContext, OutputCollector ou public void execute(TupleWindow inputWindow) { Map reduceMap = new HashMap<>(); Map windowCountMap = new HashMap<>(); - for (Tuple tuple : inputWindow.get()) { - V tup = (V) tuple.getValue(0); - addMap(reduceMap, windowCountMap, tup); + try { + for (Tuple tuple : inputWindow.get()) { + V tup = (V) tuple.getValue(0); + addMap(reduceMap, windowCountMap, tup); + } + // SUPPRESS CHECKSTYLE IllegalCatch + } catch (Exception e) { + e.printStackTrace(); + for (Tuple tuple : inputWindow.get()) { + collector.fail(tuple); + } + return; // Early out } + long startWindow; long endWindow; if (inputWindow.getStartTimestamp() == null) { @@ -86,6 +96,9 @@ public void execute(TupleWindow inputWindow) { KeyedWindow keyedWindow = new KeyedWindow<>(key, window); collector.emit(new Values(new KeyValue<>(keyedWindow, reduceMap.get(key)))); } + for (Tuple tuple : inputWindow.get()) { + collector.ack(tuple); + } } private void addMap(Map reduceMap, Map windowCountMap, V tup) { diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/JoinOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/JoinOperator.java index 1aa31382a2f..755d1ce0dc0 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/JoinOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/JoinOperator.java @@ -90,20 +90,32 @@ public Map getComponentConfiguration() { @Override public void execute(TupleWindow inputWindow) { Map, List>> joinMap = new HashMap<>(); - for (Tuple tuple : inputWindow.get()) { - if (tuple.getSourceComponent().equals(leftComponent)) { - V1 tup = (V1) tuple.getValue(0); - if (tup != null) { - addMapLeft(joinMap, tup); - } - } else { - V2 tup = (V2) tuple.getValue(0); - if (tup != null) { - addMapRight(joinMap, tup); + try { + for (Tuple tuple : inputWindow.get()) { + if (tuple.getSourceComponent().equals(leftComponent)) { + V1 tup = (V1) tuple.getValue(0); + if (tup != null) { + addMapLeft(joinMap, tup); + } + } else { + V2 tup = (V2) tuple.getValue(0); + if (tup != null) { + addMapRight(joinMap, tup); + } } } + evaluateJoinMap(joinMap, inputWindow); + + for (Tuple tuple : inputWindow.get()) { + collector.ack(tuple); + } + // SUPPRESS CHECKSTYLE IllegalCatch + } catch (Exception e) { + e.printStackTrace(); + for (Tuple tuple : inputWindow.get()) { + collector.fail(tuple); + } } - evaluateJoinMap(joinMap, inputWindow); } private void evaluateJoinMap(Map, List>> joinMap, TupleWindow tupleWindow) { diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/MapOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/MapOperator.java index b05e6f19e2b..d7c0fd4f89c 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/MapOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/MapOperator.java @@ -53,8 +53,14 @@ public void prepare(Map map, TopologyContext topologyContext, OutputCollector ou @Override public void execute(Tuple tuple) { R obj = (R) tuple.getValue(0); - T result = mapFn.apply(obj); - collector.emit(new Values(result)); - collector.ack(tuple); + try { + T result = mapFn.apply(obj); + collector.emit(new Values(result)); + collector.ack(tuple); + // SUPPRESS CHECKSTYLE IllegalCatch + } catch (Exception e) { + e.printStackTrace(); + collector.fail(tuple); + } } } diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java index cf6dc38a4a5..0cf54ddb0c0 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java @@ -66,10 +66,20 @@ public void prepare(Map map, TopologyContext topologyContext, OutputCollector ou public void execute(TupleWindow inputWindow) { Map reduceMap = new HashMap<>(); Map windowCountMap = new HashMap<>(); - for (Tuple tuple : inputWindow.get()) { - R tup = (R) tuple.getValue(0); - addMap(reduceMap, windowCountMap, tup); + try { + for (Tuple tuple : inputWindow.get()) { + R tup = (R) tuple.getValue(0); + addMap(reduceMap, windowCountMap, tup); + } + // SUPPRESS CHECKSTYLE IllegalCatch + } catch (Exception e) { + e.printStackTrace(); + for (Tuple tuple : inputWindow.get()) { + collector.fail(tuple); + } + return; // Early out } + long startWindow; long endWindow; if (inputWindow.getStartTimestamp() == null) { @@ -87,6 +97,10 @@ public void execute(TupleWindow inputWindow) { KeyedWindow keyedWindow = new KeyedWindow<>(key, window); collector.emit(new Values(new KeyValue<>(keyedWindow, reduceMap.get(key)))); } + + for (Tuple tuple : inputWindow.get()) { + collector.ack(tuple); + } } private void addMap(Map reduceMap, Map windowCountMap, R tup) { diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java index e588f8bc996..cbd7d69db1d 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java @@ -80,7 +80,13 @@ public void prepare(Map map, @Override public void execute(Tuple tuple) { R obj = (R) tuple.getValue(0); - serializableTransformer.transform(obj, x -> collector.emit(new Values(x))); - collector.ack(tuple); + try { + serializableTransformer.transform(obj, x -> collector.emit(new Values(x))); + collector.ack(tuple); + // SUPPRESS CHECKSTYLE IllegalCatch + } catch (Exception e) { + e.printStackTrace(); + collector.fail(tuple); + } } } diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java index f061eaeb0a4..3a5a7050f1a 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java @@ -48,7 +48,13 @@ public void prepare(Map map, TopologyContext topologyContext, OutputCollector ou @Override public void execute(Tuple tuple) { I obj = (I) tuple.getValue(0); - collector.emit(new Values(obj)); - collector.ack(tuple); + try { + collector.emit(new Values(obj)); + collector.ack(tuple); + // SUPPRESS CHECKSTYLE IllegalCatch + } catch (Exception e) { + e.printStackTrace(); + collector.fail(tuple); + } } }