Skip to content

Commit

Permalink
Add files via upload
Browse files Browse the repository at this point in the history
  • Loading branch information
ASKRAJPUT5 authored Oct 7, 2019
0 parents commit 9d5dd9c
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 0 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# This is a Flink example

This function takes a data stream and matches it against a query

It's similar to CEP

It gives out the locations whose temperature is higher than the threshold temperature.
13 changes: 13 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
name := "CEPdone"

version := "0.1"

// Flink 1.9's Scala APIs are published for Scala 2.11/2.12; pin 2.11 here.
scalaVersion := "2.11.8"

// All library dependencies grouped in one place:
//  - flink-streaming-scala: the DataStream API
//  - kafka: Kafka client/broker classes
//  - flink-connector-kafka-0.10: provides FlinkKafkaConsumer010
//  - flink-cep-scala: the Scala CEP (pattern-matching) API
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala"        % "1.9.0",
  "org.apache.kafka" %% "kafka"                        % "2.3.0",
  "org.apache.flink" %% "flink-connector-kafka-0.10"   % "1.9.0",
  "org.apache.flink" %% "flink-cep-scala"              % "1.9.0"
)
52 changes: 52 additions & 0 deletions src/main/scala/flinkExample.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import java.util
import java.util.Properties

import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.CEP
import org.apache.flink.streaming.api.scala._
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema

/**
 * Flink CEP example: consumes JSON temperature readings from the Kafka
 * topic "broadcast" and, using a one-state CEP pattern, emits every
 * (locationID, temperature) reading whose temperature exceeds the
 * threshold.
 *
 * Expected message payload shape (under the "value" field produced by
 * JSONKeyValueDeserializationSchema):
 *   { "locationID": <string>, "temp": <number> }
 * — assumed from the field accesses below; confirm against the producer.
 */
object flinkExample {
  def main(args: Array[String]): Unit = {

    // Readings strictly above this value match the pattern.
    val TEMPERATURE_THRESHOLD: Double = 50.00

    val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")

    // Each Kafka record is deserialized into an ObjectNode with "key"/"value"
    // fields; `false` = do not include record metadata (offset/partition).
    val src: DataStream[ObjectNode] = see.addSource(
      new FlinkKafkaConsumer010[ObjectNode](
        "broadcast",
        new JSONKeyValueDeserializationSchema(false),
        properties))

    // Extract (locationID, temp) pairs from the JSON payload.
    // FIX: key by the location ID (_._1), not by the temperature value
    // (original code used `keyBy(v => v._2)`). Keying by the raw temperature
    // would partition readings by their measurement rather than by location,
    // which contradicts the intent of reporting per-location threshold
    // violations. Also: `var` changed to `val` — the stream is never reassigned.
    val keyedStream = src
      .map(v => v.get("value"))
      .map { v =>
        val loc = v.get("locationID").asText()
        val temp = v.get("temp").asDouble()
        (loc, temp)
      }
      .keyBy(_._1)

    // Single-state pattern: any individual reading above the threshold.
    val pat = Pattern
      .begin[(String, Double)]("start")
      .where(_._2 > TEMPERATURE_THRESHOLD)

    val patternStream = CEP.pattern(keyedStream, pat)

    // For each match, emit a simple Map describing the offending reading.
    val result: DataStream[Map[String, Any]] = patternStream.select(
      new PatternSelectFunction[(String, Double), Map[String, Any]]() {
        override def select(pattern: util.Map[String, util.List[(String, Double)]]): Map[String, Any] = {
          // "start" is the only pattern state; it holds exactly one event.
          val data = pattern.get("start").get(0)
          Map("locationID" -> data._1, "temperature" -> data._2)
        }
      }
    )

    result.print()
    see.execute("ASK Flink Kafka")
  }
}

0 comments on commit 9d5dd9c

Please sign in to comment.