Skip to content
袁逸凡 edited this page Aug 9, 2018 · 2 revisions

日志分析处理简介

收集到的日志本质上是一个无限的流,并且写入了某种类型的消息队列,那么我们可以使用流处理的方式进行处理。

这一部分暂时不开源,但是提供一些思路。

思路简述

通过分析日志流我们可以做到什么?

  • Nginx慢速接口统计
  • 心跳包失联报警
  • 错误日志过多报警
  • 接口调用统计

使用FLink开始你的分析

我们可以使用FLink这样的流数据处理框架加速我们的开发,首先从新建一个来自Kafka的DataStream,并且将数据变成Document类型方便下一步分析:

配置文件lofka-kafka-client.properties

bootstrap.servers = data1.cvnavi-test.com:9092,data2.cvnavi-test.com:9092,data3.cvnavi-test.com:9092
group.id=logger-json-server-consumer
enable.auto.commit=false
auto.commit.interval.ms=99999999
key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# My config
kafka.topic=logger-json

创建流logSource

import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.{DeserializationSchema, SimpleStringSchema}
import org.bson.Document

随后在main函数中加入:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val logSource: DataStream[Document] = env.addSource(
    new FlinkKafkaConsumer09[String]("logger-json", new SimpleStringSchema(), {
        // Generate properties here
        val stream = LogQpsStatistic.getClass.getResourceAsStream("/lofka-kafka-client.properties")
        val properties = new Properties()
        properties.load(stream)
        stream.close()
        properties
    })
).flatMap(
    str => try {
        Some(Document.parse(str))
    } catch {
        case _: Throwable => None
    }
)

接下来对logSource进行相应的处理即可: