Skip to content

Commit

Permalink
use original tweet id, change kafka url, change build version
Browse files Browse the repository at this point in the history
  • Loading branch information
zmyzheng committed Feb 26, 2020
1 parent 53166a7 commit 1ed0075
Show file tree
Hide file tree
Showing 6 changed files with 7 additions and 5 deletions.
2 changes: 1 addition & 1 deletion DevOps/tweet-collector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ spec:
spec:
containers:
- name: tweet-collector
image: zmyzheng/tweet-collector:1.0-SNAPSHOT
image: zmyzheng/tweet-collector:1.1-SNAPSHOT
# ports:
# - containerPort: 8443
# args: ["--kafka.bootstrapServers=b-3.fs-ec-msk-cluster-vpc.n6h4ok.c2.kafka.us-west-2.amazonaws.com:9092,b-1.fs-ec-msk-cluster-vpc.n6h4ok.c2.kafka.us-west-2.amazonaws.com:9092,b-2.fs-ec-msk-cluster-vpc.n6h4ok.c2.kafka.us-west-2.amazonaws.com:9092"]
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

./gradlew :tweet-collector:clean :tweet-collector:build :tweet-collector:dockerPush

java -jar tweet-collector/build/libs/tweet-collector-1.0-SNAPSHOT.jar
java -jar tweet-collector/build/libs/tweet-collector-1.1-SNAPSHOT.jar

./gradlew :flink-processor:clean :flink-processor:build

Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
subprojects {

group 'io.zmyzheng'
version '1.0-SNAPSHOT'
version '1.1-SNAPSHOT'



Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public IndexRequest createIndexRequest(Tweet element) throws JsonProcessingExcep
return Requests.indexRequest()
.index("streaming")
.type("tweets")
.id(element.getId())
.source(objectMapper.writeValueAsBytes(element), XContentType.JSON);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void run() throws Exception {
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "54.212.221.82:9092");
properties.setProperty("bootstrap.servers", "34.218.59.198:9092");
properties.setProperty("group.id", this.getClass().getName());

DataStream<String> dataStream = env.addSource(new FlinkKafkaConsumer011<String>("tweets", new SimpleStringSchema(), properties));
Expand All @@ -71,6 +71,7 @@ public Tweet map(String value) {
try {
JsonNode node = objectMapper.readTree(value);
Tweet tweet = new Tweet();
tweet.setId(node.get("id_str").asText());
tweet.setTimestamp(Long.parseLong(node.get("timestamp_ms").asText()));

ArrayNode arrayNode = (ArrayNode) node.get("coordinates").get("coordinates");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public static void main(String[] args) throws IOException {
"pCx8t4pEh6RHoMQChn9SpymIUCoJTrD3KDDQNgBDwEgh4jxUI41",
"827004953310949377-qlvGf7jrJmtIvJd77XllpzDwyiOVbst1",
"IaJvnMHYD0JVPIYgyRCIPCyFOTMEYCcrpvsSnlNtT4FQm1",
"54.212.221.82:9092",
"34.218.59.198:9092",
"tweets"
);

Expand Down

0 comments on commit 1ed0075

Please sign in to comment.