
Commit a876a3e: Add files via upload
ASKRAJPUT5 authored Oct 11, 2019 (initial commit, 0 parents)
Showing 2 changed files with 99 additions and 0 deletions.
build.sbt: 17 additions, 0 deletions
@@ -0,0 +1,17 @@
name := "flinkTable25"

version := "0.1"

scalaVersion := "2.11.8"

libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.9.0"

libraryDependencies += "org.apache.flink" % "flink-table" % "1.9.0"

libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.10" % "1.9.0"

libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.9.0"

libraryDependencies += "org.apache.flink" %% "flink-table-api-scala-bridge" % "1.9.0"

libraryDependencies += "org.apache.flink" %% "flink-table-planner" % "1.9.0"
src/main/scala/flinkTable25.scala: 82 additions, 0 deletions
@@ -0,0 +1,82 @@
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.scala._

// Top-level case classes so Flink's TypeInformation derivation sees them
// outside the enclosing object
case class Event(ID: Int, locationID: String, temp: Double)
case class Temp(ID: Int, locationIDPat: String, temperature: Double)

object flinkTable25 {

  def main(args: Array[String]): Unit = {

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Old planner in streaming mode, matching the flink-table-planner dependency
    val fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
    val tenv = StreamTableEnvironment.create(env, fsSettings)

    // Temperature readings arrive on the "broadcast" topic as CSV: ID,locationID,temp
    val temperature = new FlinkKafkaConsumer010[String]("broadcast", new SimpleStringSchema(), properties)
    val stream: DataStream[Event] = env
      .addSource(temperature)
      .map { v =>
        val t = v.split(",")
        Event(t.head.trim.toInt, t(1), t(2).trim.toDouble)
      }

    // registerDataStream returns Unit, so its result is not bound to a val
    tenv.registerDataStream("event", stream, 'ID, 'locationID, 'temp)

    // Alerting thresholds arrive on the "pattern" topic as CSV: ID,locationID,threshold
    val pattern = new FlinkKafkaConsumer010[String]("pattern", new SimpleStringSchema(), properties)
    val streamPat: DataStream[Temp] = env
      .addSource(pattern)
      .map { v =>
        val tp = v.split(",")
        Temp(tp.head.trim.toInt, tp(1), tp(2).trim.toDouble)
      }

    tenv.registerDataStream("patt", streamPat, 'ID, 'locationIDPat, 'temperature)

    // Join each reading to its threshold pattern on ID and keep only
    // readings at or above the threshold
    val res: Table = tenv.sqlQuery(
      """
        |SELECT event.ID, event.locationID, event.temp
        |FROM event
        |JOIN patt
        |ON event.ID = patt.ID
        |AND event.temp >= patt.temperature
        |""".stripMargin
    )

    res.toAppendStream[Event].print("Alert for these locations")

    env.execute("flinkTable25")
  }
}
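
To exercise the job locally, a few CSV records can be pushed onto the two topics with a plain Kafka producer. A minimal sketch, assuming a broker on localhost:9092 and kafka-clients on the classpath (the Flink Kafka connector pulls it in transitively); the topic payloads, the loc-1 name, and the 40.0 threshold are illustrative values:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ProduceTestData {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // One threshold for loc-1 and two readings; only 42.5 >= 40.0, so the
    // join should emit a single alert for Event(1, "loc-1", 42.5)
    producer.send(new ProducerRecord[String, String]("pattern", "1,loc-1,40.0"))
    producer.send(new ProducerRecord[String, String]("broadcast", "1,loc-1,42.5"))
    producer.send(new ProducerRecord[String, String]("broadcast", "1,loc-1,35.0"))
    producer.close()
  }
}

Because the SQL query is a regular (unbounded) stream-stream join, Flink keeps state for both sides, so the order in which the pattern and the readings arrive should not affect the result.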
