Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Commit

Permalink
Joshfischer/eco config (#2658)
Browse files Browse the repository at this point in the history
* initial addition of directory structure

* clean up

* adding base config for build file \
fixing linting issues

* saving progress.  Still need to fix missing class definition on ParseException

* fixing shaded dependency issue

* starting experimentation on how to approach loading config from yaml file

* experimenting with snake yaml to extract topology defintion

* adding start of topology and component definitions

* refactor

* successfully populating ecoTopologyDefinition from yaml

* clean up

* clean up

* fixed versioning error

* saving progress, still working through building topology from yaml

* saving progress

* cleaning up styles

* checking default config values

* adding test for customized config map

* transitioning to support heron api

* removing last of streamlet api from eco

* making a pivot to match storm flux behavior better

* saving before chaos

* saving progress, running into issue with verifying the toplogy

* adding some logging for debugging

* printing topology definition.  Adding tests

* saving with failing test, still working through investigation

* back to working

* saving progress

* finishing eco parser test

* handling exception

* package change

* package change

* adding missed id

* MVP of ECO at this commit

* fixing styles, pulling topology name from topology definition file

* fixing spelling

* clean up

* clean up

* refatoring eco builder package

* start to removing static calls

* removing static calls

* still working through linting errors

* fixing lint errors

* fixing setter

* adding test

* start to verfiying bean references

* verifying components

* fixing tests

* worked passed constructor argument issue.  Now need to deal with backtypes from Storm

* saving before restructure

* building kafka topology

* fixing imports for test

* fixing folder structure

* refactor for testability

* removing more statics

* cleaning up eco builders

* confirmation config builder maps as expected

* finishing configBuilder test

* cleaning up BoltBuilder

* verfiying component builder

* replacing java logger

* adding tests

* adding verification of building implementations of IRichBolts

* clean up

* adding mocks for testing

* fixing classNotFound errors

* coverage on all of stream bulder

* need to refactor ObjectBuilder

* moving static call

* adding verifcation to test

* clean up

* clean up

* verifying path of building objects with args

* covering other path

* starting on builder utility

* verifying className

* adding test

* adding test

* clean up

* adding test

* clean up

* clean up

* fixing checkstyle errors

* fixing linting issues

* fixing linting issues

* fixing linting issues

* fixing linting issues

* linting issues

* fixing checkstyle

* fixing checkstyle

* saving progress

* adding simple wordcount eco example

* adding examples

* checkstyles
  • Loading branch information
joshfischer1108 authored and kramasamy committed Jan 9, 2018
1 parent d609b7c commit 400caa4
Show file tree
Hide file tree
Showing 48 changed files with 4,051 additions and 3 deletions.
21 changes: 21 additions & 0 deletions examples/src/java/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,24 @@ genrule(
outs = ["heron-streamlet-examples.jar"],
cmd = "cp $< $@",
)


java_binary(
name='eco-examples-unshaded',
srcs = glob(["com/twitter/heron/examples/eco/**/*.java"]),
deps = [
"//heron/api/src/java:api-java-low-level",
"//heron/api/src/java:api-java",
"//heron/common/src/java:basics-java",
"//heron/eco/src/java:eco-core",
"//storm-compatibility/src/java:storm-compatibility-java",
],
create_executable = 0,
)

genrule(
name = 'heron-eco-examples',
srcs = [":eco-examples-unshaded_deploy.jar"],
outs = ["heron-eco-examples.jar"],
cmd = "cp $< $@",
)
41 changes: 41 additions & 0 deletions examples/src/java/com/twitter/heron/examples/eco/LogInfoBolt.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2018 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.examples.eco;


import java.util.logging.Logger;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

/**
* Simple bolt that does nothing other than LOG.info() every tuple received.
*
*/
@SuppressWarnings("serial")
public class LogInfoBolt extends BaseBasicBolt {
private static final Logger LOG = Logger.getLogger(LogInfoBolt.class.getName());

@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
LOG.info("{ }" + tuple);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2018 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.examples.eco;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

import static com.twitter.heron.api.utils.Utils.tuple;
@SuppressWarnings({"serial", "rawtypes"})
public class TestNameCounter extends BaseBasicBolt {

private Map<String, Integer> counts;

@Override
public void prepare(Map map, TopologyContext topologyContext) {
counts = new HashMap<>();
}


protected String getTupleValue(Tuple t, int idx) {
return (String) t.getValues().get(idx);
}

public void execute(Tuple input, BasicOutputCollector collector) {
String word = getTupleValue(input, 0);
int count = 0;
if (counts.containsKey(word)) {
count = counts.get(word);
}
count++;
counts.put(word, count);
collector.emit(tuple(word, count));
}

public void cleanup() {

}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("name", "count"));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2017 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.examples.eco;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.apache.storm.Config;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

@SuppressWarnings({"serial", "HiddenField"})
public class TestNameSpout extends BaseRichSpout {
private boolean isdistributed;
private SpoutOutputCollector collector;

public TestNameSpout() {
this(true);
}

public TestNameSpout(boolean isDistributed) {
isdistributed = isDistributed;
}

public void open(Map<String, Object> conf, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
}

public void close() {

}

public void nextTuple() {
Utils.sleep(100);
final String[] words = new String[] {"marge", "homer", "bart", "simpson", "lisa"};
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
collector.emit(new Values(word));
}

public void ack(Object msgId) {

}

public void fail(Object msgId) {

}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("name"));
}

@Override
public Map<String, Object> getComponentConfiguration() {
if (!isdistributed) {
Map<String, Object> ret = new HashMap<String, Object>();
ret.put(Config.TOPOLOGY_WORKERS, 1);
return ret;
} else {
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2018 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.examples.eco;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

@SuppressWarnings("serial")
public class TestPrintBolt extends BaseBasicBolt {

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
System.out.println(tuple);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer ofd) {
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2018 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.examples.eco;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.TupleWindow;

@SuppressWarnings({"serial", "HiddenField"})
public class TestWindowBolt extends BaseWindowedBolt {
private OutputCollector collector;


@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}

@Override
public void execute(TupleWindow inputWindow) {
collector.emit(new Values(inputWindow.get().size()));
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("count"));
}
}
67 changes: 67 additions & 0 deletions examples/src/resources/simple_windowing.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Copyright 2017 Twitter. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http:#www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

---

name: "sliding-window-topology"

components:
- id: "windowLength"
className: "org.apache.storm.topology.base.BaseWindowedBolt$Count"
constructorArgs:
- 5
- id: "slidingInterval"
className: "org.apache.storm.topology.base.BaseWindowedBolt$Count"
constructorArgs:
- 3

config:
topology.workers: 1

# spout definitions
spouts:
- id: "spout-1"
className: "com.twitter.heron.examples.eco.TestNameSpout"
parallelism: 1

# bolt definitions
bolts:
- id: "bolt-1"
className: "com.twitter.heron.examples.eco.TestWindowBolt"
configMethods:
- name: "withWindow"
args: [ref: "windowLength", ref: "slidingInterval"]
parallelism: 1
- id: "bolt-2"
className: "com.twitter.heron.examples.eco.TestPrintBolt"
parallelism: 1


#stream definitions
# stream definitions define connections between spouts and bolts.
# note that such connections can be cyclical
streams:
- name: "spout-1 --> bolt-1" # name isn't used (placeholder for logging, UI, etc.)
# id: "connection-1"
from: "spout-1"
to: "bolt-1"
grouping:
type: FIELDS
args: ["word"]
- name: "bolt-1 --> bolt-2" # name isn't used (placeholder for logging, UI, etc.)
# id: "connection-1"
from: "bolt-1"
to: "bolt-2"
grouping:
type: SHUFFLE
Loading

0 comments on commit 400caa4

Please sign in to comment.