-
Notifications
You must be signed in to change notification settings - Fork 19
袁逸凡 edited this page Aug 9, 2018
·
2 revisions
收集到的日志本质上是一个无限的流,并且写入了某种类型的消息队列,那么我们可以使用流处理的方式进行处理。
这一部分暂时不开源,但是提供一些思路。
通过分析日志流我们可以做到什么?
- Nginx慢速接口统计
- 心跳包失联报警
- 错误日志过多报警
- 接口调用统计
我们可以使用FLink这样的流数据处理框架加速我们的开发,首先从新建一个来自Kafka的DataStream,并且将数据变成Document类型方便下一步分析:
配置文件lofka-kafka-client.properties
:
bootstrap.servers = data1.cvnavi-test.com:9092,data2.cvnavi-test.com:9092,data3.cvnavi-test.com:9092
group.id=logger-json-server-consumer
enable.auto.commit=false
auto.commit.interval.ms=99999999
key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# My config
kafka.topic=logger-json
创建流logSource
:
import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.{DeserializationSchema, SimpleStringSchema}
import org.bson.Document
随后在main函数中加入:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val logSource: DataStream[Document] = env.addSource(
new FlinkKafkaConsumer09[String]("logger-json", new SimpleStringSchema(), {
// Generate properties here
val stream = LogQpsStatistic.getClass.getResourceAsStream("/lofka-kafka-client.properties")
val properties = new Properties()
properties.load(stream)
stream.close()
properties
})
).flatMap(
str => try {
Some(Document.parse(str))
} catch {
case _: Throwable => None
}
)
接下来对logSource进行相应的处理即可: