diff --git a/DevOps/tweet-collector.yaml b/DevOps/tweet-collector.yaml index 3b9f6a6..4278da3 100644 --- a/DevOps/tweet-collector.yaml +++ b/DevOps/tweet-collector.yaml @@ -15,7 +15,7 @@ spec: spec: containers: - name: tweet-collector - image: zmyzheng/tweet-collector:1.0-SNAPSHOT + image: zmyzheng/tweet-collector:1.1-SNAPSHOT # ports: # - containerPort: 8443 # args: ["--kafka.bootstrapServers=b-3.fs-ec-msk-cluster-vpc.n6h4ok.c2.kafka.us-west-2.amazonaws.com:9092,b-1.fs-ec-msk-cluster-vpc.n6h4ok.c2.kafka.us-west-2.amazonaws.com:9092,b-2.fs-ec-msk-cluster-vpc.n6h4ok.c2.kafka.us-west-2.amazonaws.com:9092"] diff --git a/README.md b/README.md index 239da7f..ab6b15e 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ ./gradlew :tweet-collector:clean :tweet-collector:build :tweet-collector:dockerPush -java -jar tweet-collector/build/libs/tweet-collector-1.0-SNAPSHOT.jar +java -jar tweet-collector/build/libs/tweet-collector-1.1-SNAPSHOT.jar ./gradlew :flink-processor:clean :flink-processor:build diff --git a/build.gradle b/build.gradle index 3ea3c7a..64fe5d6 100644 --- a/build.gradle +++ b/build.gradle @@ -1,7 +1,7 @@ subprojects { group 'io.zmyzheng' - version '1.0-SNAPSHOT' + version '1.1-SNAPSHOT' diff --git a/flink-processor/src/main/java/io/zmyzheng/processor/EsSinkFactory.java b/flink-processor/src/main/java/io/zmyzheng/processor/EsSinkFactory.java index aab0b66..dc7ba34 100644 --- a/flink-processor/src/main/java/io/zmyzheng/processor/EsSinkFactory.java +++ b/flink-processor/src/main/java/io/zmyzheng/processor/EsSinkFactory.java @@ -46,6 +46,7 @@ public IndexRequest createIndexRequest(Tweet element) throws JsonProcessingExcep return Requests.indexRequest() .index("streaming") .type("tweets") + .id(element.getId()) .source(objectMapper.writeValueAsBytes(element), XContentType.JSON); } diff --git a/flink-processor/src/main/java/io/zmyzheng/processor/StreamProcessor.java b/flink-processor/src/main/java/io/zmyzheng/processor/StreamProcessor.java index a22c4c2..6a5634a 100644 --- a/flink-processor/src/main/java/io/zmyzheng/processor/StreamProcessor.java +++ b/flink-processor/src/main/java/io/zmyzheng/processor/StreamProcessor.java @@ -58,7 +58,7 @@ public void run() throws Exception { env.getCheckpointConfig().setPreferCheckpointForRecovery(true); Properties properties = new Properties(); - properties.setProperty("bootstrap.servers", "54.212.221.82:9092"); + properties.setProperty("bootstrap.servers", "34.218.59.198:9092"); properties.setProperty("group.id", this.getClass().getName()); DataStream dataStream = env.addSource(new FlinkKafkaConsumer011("tweets", new SimpleStringSchema(), properties)); @@ -71,6 +71,7 @@ public Tweet map(String value) { try { JsonNode node = objectMapper.readTree(value); Tweet tweet = new Tweet(); + tweet.setId(node.get("id_str").asText()); tweet.setTimestamp(Long.parseLong(node.get("timestamp_ms").asText())); ArrayNode arrayNode = (ArrayNode) node.get("coordinates").get("coordinates"); diff --git a/tweet-collector/src/main/java/io/zmyzheng/collector/App.java b/tweet-collector/src/main/java/io/zmyzheng/collector/App.java index ecbd7c2..42301b7 100644 --- a/tweet-collector/src/main/java/io/zmyzheng/collector/App.java +++ b/tweet-collector/src/main/java/io/zmyzheng/collector/App.java @@ -36,7 +36,7 @@ public static void main(String[] args) throws IOException { "pCx8t4pEh6RHoMQChn9SpymIUCoJTrD3KDDQNgBDwEgh4jxUI41", "827004953310949377-qlvGf7jrJmtIvJd77XllpzDwyiOVbst1", "IaJvnMHYD0JVPIYgyRCIPCyFOTMEYCcrpvsSnlNtT4FQm1", - "54.212.221.82:9092", + "34.218.59.198:9092", "tweets" );