Update flinkTable25.scala
ASKRAJPUT5 authored Oct 14, 2019
1 parent a876a3e commit ab67767
Showing 1 changed file with 90 additions and 67 deletions.
157 changes: 90 additions & 67 deletions src/main/scala/flinkTable25.scala
@@ -1,82 +1,105 @@

import java.util.Properties

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.scala.StreamTableEnvironment

object RealTimeAlert extends App {

  /**
   * Case class for the columns of the event table: one temperature reading.
   *
   * @param ID         ID of the source the reading came from
   * @param locationID location the reading belongs to
   * @param temp       measured temperature
   */
  case class Event(ID: Int, locationID: String, temp: Double)

  /**
   * Case class for the columns of the patt table: one alert threshold.
   *
   * @param ID            ID the threshold applies to
   * @param locationIDPat location the threshold is defined for
   * @param temperature   threshold; readings at or above it raise an alert
   */
  case class Temp(ID: Int, locationIDPat: String, temperature: Double)


  val properties = new Properties()
  properties.setProperty("bootstrap.servers", "localhost:9092")
  // the Flink Kafka consumer also expects a consumer group id (name is arbitrary)
  properties.setProperty("group.id", "realtime-alert")

  /**
   * Kafka consumer for the "broadcast" topic: temperature readings arrive as
   * plain comma-separated strings (SimpleStringSchema).
   */
  val temperature = new FlinkKafkaConsumer010("broadcast", new SimpleStringSchema, properties)

  /**
   * Environment settings (old planner, streaming mode), the stream execution
   * environment and the stream table environment.
   */
  val fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tenv = StreamTableEnvironment.create(env, fsSettings)

  /**
   * DataStream of Event records, i.e. the data the patterns are matched
   * against: each Kafka record is split on commas and parsed into an Event.
   */
  val stream: DataStream[Event] = env
    .addSource(temperature)
    .map { v =>
      val t = v.split(",")
      Event(t.head.trim.toInt, t(1).trim, t(2).trim.toDouble)
    }
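  // For illustration (hypothetical record): "7, kitchen, 42.5" splits into
  // Array("7", " kitchen", " 42.5") and, with each field trimmed, parses to
  // Event(7, "kitchen", 42.5).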

  /**
   * The stream DataStream is registered with the table environment as the
   * "event" table, with columns ID, locationID and temp.
   */
  tenv.registerDataStream("event", stream, 'ID, 'locationID, 'temp)

  /**
   * Kafka consumer for the "pattern" topic: threshold records are likewise
   * read as plain comma-separated strings.
   */
  val pattern = new FlinkKafkaConsumer010("pattern", new SimpleStringSchema(), properties)

  /**
   * DataStream of Temp records: the patterns that raise an alert when an
   * Event from stream matches them.
   */
  val streamPat: DataStream[Temp] = env
    .addSource(pattern)
    .map { v =>
      val tp = v.split(",")
      Temp(tp.head.trim.toInt, tp(1).trim, tp(2).trim.toDouble)
    }

  /**
   * The streamPat DataStream is registered as the "patt" table, with columns
   * ID, locationIDPat and temperature.
   */
  tenv.registerDataStream("patt", streamPat, 'ID, 'locationIDPat, 'temperature)

  /**
   * The event and patt tables are joined. The continuous query matches rows
   * on ID and, where the IDs agree, keeps every event whose temp is greater
   * than or equal to the predefined threshold for that location.
   */
  val res: Table = tenv.sqlQuery(
    """
      |SELECT event.ID, event.locationID, event.temp
      |FROM event
      |JOIN patt
      |ON event.ID = patt.ID
      |AND event.temp >= patt.temperature
      |""".stripMargin
  )
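  // Worked example (hypothetical rows): with the pattern (1, "kitchen", 40.0)
  // registered in patt, the events (1, "kitchen", 42.5) and (1, "kitchen", 38.0)
  // both join on ID = 1, but only the first satisfies temp >= temperature,
  // so only it is emitted as an alert.
  //
  // The same join sketched in the Table API DSL instead of SQL (one possible
  // equivalent; the patt columns are renamed first to avoid a name clash):
  //   val pattTbl = tenv.scan("patt").select('ID as 'pid, 'locationIDPat, 'temperature)
  //   val resDsl = tenv.scan("event")
  //     .join(pattTbl, 'ID === 'pid && 'temp >= 'temperature)
  //     .select('ID, 'locationID, 'temp)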

  res.toAppendStream[Event].print("Alert for these locations")

  /**
   * Execute the environment; nothing runs until execute() is called.
   */
  env.execute()
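  // With the sample rows sketched above, Flink's print sink (which prefixes
  // its identifier to each element) would emit something like:
  //   Alert for these locations> Event(1,kitchen,42.5)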

}

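To exercise the job end to end, a small standalone producer along these lines could feed both topics (a minimal sketch: the broker address and topic names come from the job above, while the object name TestFeeder and the sample records are made up for illustration):

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object TestFeeder extends App {
  val props = new Properties()
  props.setProperty("bootstrap.servers", "localhost:9092")
  props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)

  // one threshold pattern: ID 1 at "kitchen" alerts at 40.0 or above
  producer.send(new ProducerRecord[String, String]("pattern", "1,kitchen,40.0"))

  // two readings for ID 1: only the first is at or above the threshold,
  // so only it should appear in the job's printed alerts
  producer.send(new ProducerRecord[String, String]("broadcast", "1,kitchen,42.5"))
  producer.send(new ProducerRecord[String, String]("broadcast", "1,kitchen,38.0"))

  producer.close()
}

With those records in flight, RealTimeAlert should print a single alert for Event(1,kitchen,42.5).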
