Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Commit

Permalink
[Streamlet] Reinforce usage of StreamletNamePrefixes for more type-sa…
Browse files Browse the repository at this point in the history
…fety (#2627)

* [Streamlet] Reinforce using StreamletNamePrefixes for more type-safety

* [Streamlet] Fix documentation issues

* Review comment is addressed.
  • Loading branch information
erenavsarogullari authored and kramasamy committed Dec 22, 2017
1 parent 9b17e30 commit 77904c2
Show file tree
Hide file tree
Showing 18 changed files with 22 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public boolean allBuilt() {
return true;
}

protected enum StreamletNamePrefixes {
protected enum StreamletNamePrefix {
CONSUMER("consumer"),
FILTER("filter"),
FLATMAP("flatmap"),
Expand All @@ -109,7 +109,7 @@ protected enum StreamletNamePrefixes {

private final String prefix;

StreamletNamePrefixes(final String prefix) {
StreamletNamePrefix(final String prefix) {
this.prefix = prefix;
}

Expand Down Expand Up @@ -157,7 +157,7 @@ public String getName() {
* @param prefix The name prefix of this streamlet
* @param stageNames The collections of created streamlet/stage names
*/
protected void setDefaultNameIfNone(String prefix, Set<String> stageNames) {
protected void setDefaultNameIfNone(StreamletNamePrefix prefix, Set<String> stageNames) {
if (getName() == null) {
setName(defaultNameCalculator(prefix, stageNames));
}
Expand Down Expand Up @@ -220,11 +220,11 @@ public <T> void addChild(StreamletImpl<T> child) {
children.add(child);
}

private String defaultNameCalculator(String prefix, Set<String> stageNames) {
private String defaultNameCalculator(StreamletNamePrefix prefix, Set<String> stageNames) {
int index = 1;
String calculatedName;
while (true) {
calculatedName = new StringBuilder(prefix).append(index).toString();
calculatedName = new StringBuilder(prefix.toString()).append(index).toString();
if (!stageNames.contains(calculatedName)) {
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public ConsumerStreamlet(StreamletImpl<R> parent, SerializableConsumer<R> consum

@Override
public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
setDefaultNameIfNone(StreamletNamePrefixes.CONSUMER.toString(), stageNames);
setDefaultNameIfNone(StreamletNamePrefix.CONSUMER, stageNames);
bldr.setBolt(getName(), new ConsumerSink<>(consumer),
getNumPartitions()).shuffleGrouping(parent.getName());
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public FilterStreamlet(StreamletImpl<R> parent, SerializablePredicate<? super R>

@Override
public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
setDefaultNameIfNone(StreamletNamePrefixes.FILTER.toString(), stageNames);
setDefaultNameIfNone(StreamletNamePrefix.FILTER, stageNames);
bldr.setBolt(getName(), new FilterOperator<R>(filterFn),
getNumPartitions()).shuffleGrouping(parent.getName());
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public FlatMapStreamlet(StreamletImpl<R> parent,

@Override
public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
setDefaultNameIfNone(StreamletNamePrefixes.FLATMAP.toString(), stageNames);
setDefaultNameIfNone(StreamletNamePrefix.FLATMAP, stageNames);
bldr.setBolt(getName(), new FlatMapOperator<R, T>(flatMapFn),
getNumPartitions()).shuffleGrouping(parent.getName());
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public GeneralReduceByKeyAndWindowStreamlet(StreamletImpl<V> parent,

@Override
public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
setDefaultNameIfNone(StreamletNamePrefixes.REDUCE.toString(), stageNames);
setDefaultNameIfNone(StreamletNamePrefix.REDUCE, stageNames);
GeneralReduceByKeyAndWindowOperator<K, V, VR> bolt =
new GeneralReduceByKeyAndWindowOperator<K, V, VR>(keyExtractor, identity, reduceFn);
windowCfg.attachWindowConfig(bolt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
if (!left.isBuilt() || !right.isBuilt()) {
return false;
}
setDefaultNameIfNone(StreamletNamePrefixes.JOIN.toString(), stageNames);
setDefaultNameIfNone(StreamletNamePrefix.JOIN, stageNames);
JoinOperator<K, R, S, T> bolt = new JoinOperator<>(joinType, left.getName(),
right.getName(), leftKeyExtractor, rightKeyExtractor, joinFn);
windowCfg.attachWindowConfig(bolt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public LogStreamlet(StreamletImpl<R> parent) {

@Override
public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
setDefaultNameIfNone(StreamletNamePrefixes.LOGGER.toString(), stageNames);
setDefaultNameIfNone(StreamletNamePrefix.LOGGER, stageNames);
bldr.setBolt(getName(), new LogSink<R>(),
getNumPartitions()).shuffleGrouping(parent.getName());
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public MapStreamlet(StreamletImpl<R> parent, SerializableFunction<? super R, ? e

@Override
public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
setDefaultNameIfNone(StreamletNamePrefixes.MAP.toString(), stageNames);
setDefaultNameIfNone(StreamletNamePrefix.MAP, stageNames);
bldr.setBolt(getName(), new MapOperator<R, T>(mapFn),
getNumPartitions()).shuffleGrouping(parent.getName());
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public ReduceByKeyAndWindowStreamlet(StreamletImpl<R> parent,

@Override
public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
setDefaultNameIfNone(StreamletNamePrefixes.REDUCE.toString(), stageNames);
setDefaultNameIfNone(StreamletNamePrefix.REDUCE, stageNames);
ReduceByKeyAndWindowOperator<K, V, R> bolt = new ReduceByKeyAndWindowOperator<>(keyExtractor,
valueExtractor, reduceFn);
windowCfg.attachWindowConfig(bolt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public RemapStreamlet(StreamletImpl<R> parent,

@Override
public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
setDefaultNameIfNone(StreamletNamePrefixes.REMAP.toString(), stageNames);
setDefaultNameIfNone(StreamletNamePrefix.REMAP, stageNames);
bldr.setBolt(getName(), new MapOperator<R, R>((a) -> a),
getNumPartitions())
.customGrouping(parent.getName(), new RemapCustomGrouping<R>(remapFn));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public SinkStreamlet(StreamletImpl<R> parent, Sink<R> sink) {

@Override
public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
setDefaultNameIfNone(StreamletNamePrefixes.SINK.toString(), stageNames);
setDefaultNameIfNone(StreamletNamePrefix.SINK, stageNames);
bldr.setBolt(getName(), new ComplexSink<>(sink),
getNumPartitions()).shuffleGrouping(parent.getName());
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public SourceStreamlet(Source<R> generator) {

@Override
public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
setDefaultNameIfNone(StreamletNamePrefixes.SOURCE.toString(), stageNames);
setDefaultNameIfNone(StreamletNamePrefix.SOURCE, stageNames);
bldr.setSpout(getName(), new ComplexSource<R>(generator), getNumPartitions());
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public SupplierStreamlet(SerializableSupplier<R> supplier) {

@Override
public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
setDefaultNameIfNone(StreamletNamePrefixes.SUPPLIER.toString(), stageNames);
setDefaultNameIfNone(StreamletNamePrefix.SUPPLIER, stageNames);
bldr.setSpout(getName(), new SupplierSource<R>(supplier), getNumPartitions());
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public TransformStreamlet(StreamletImpl<R> parent,

@Override
public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
setDefaultNameIfNone(StreamletNamePrefixes.TRANSFORM.toString(), stageNames);
setDefaultNameIfNone(StreamletNamePrefix.TRANSFORM, stageNames);
bldr.setBolt(getName(), new TransformOperator<R, T>(serializableTransformer),
getNumPartitions()).shuffleGrouping(parent.getName());
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
// The system will call us again later
return false;
}
setDefaultNameIfNone(StreamletNamePrefixes.UNION.toString(), stageNames);
setDefaultNameIfNone(StreamletNamePrefix.UNION, stageNames);
bldr.setBolt(getName(), new UnionOperator<I>(),
getNumPartitions()).shuffleGrouping(left.getName()).shuffleGrouping(right.getName());
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ public void testDefaultStreamletNameIfNotSet() {

// set default name by streamlet name prefix
supplierStreamlet.setDefaultNameIfNone(
StreamletImpl.StreamletNamePrefixes.SUPPLIER.toString(), stageNames);
StreamletImpl.StreamletNamePrefix.SUPPLIER, stageNames);

// verify stageNames
assertEquals(1, stageNames.size());
Expand All @@ -342,7 +342,7 @@ public void testStreamletNameIfAlreadySet() {

// set default name by streamlet name prefix
supplierStreamlet.setDefaultNameIfNone(
StreamletImpl.StreamletNamePrefixes.SUPPLIER.toString(), stageNames);
StreamletImpl.StreamletNamePrefix.SUPPLIER, stageNames);

// verify stageNames
assertEquals(1, stageNames.size());
Expand Down
1 change: 0 additions & 1 deletion website/content/docs/concepts/streamlet-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ Builder processingGraphBuilder = Builder.newBuilder();

Streamlet<Integer> ones = processingGraphBuilder.newSource(() -> 1);
Streamlet<Integer> thirteens = ones.map(i -> i + 12);
thirteens.
```

In this example, a supplier streamlet emits an indefinite series of 1s. The `map` operation then adds 12 to each incoming element, producing a streamlet of 13s. The effect of this operation is to transform the `Streamlet<Integer>` into a `Streamlet<Integer>` with different values (map operations can also convert streamlets into streamlets of a different type).
Expand Down
2 changes: 1 addition & 1 deletion website/content/docs/concepts/topologies.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ physical plan:

![Topology Physical Plan](https://www.lucidchart.com/publicSegments/view/5c2fe0cb-e4cf-4192-9416-b1b64b5ce958/image.png)

In this example, a Heron topology consists of one [spout](#spouts) and five
In this example, a Heron topology consists of two [spout](#spouts) and five
different [bolts](#bolts) (each of which has multiple instances) that have automatically
been distributed between five different containers.

Expand Down

0 comments on commit 77904c2

Please sign in to comment.