Skip to content

Commit

Permalink
Feeds (#21)
Browse files Browse the repository at this point in the history
* Add yaml config for feeds
  • Loading branch information
d10xa authored Dec 12, 2024
1 parent 6e365c2 commit 4ae0f1a
Show file tree
Hide file tree
Showing 15 changed files with 415 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import fs2.*

import java.io.*

class ShellImpl extends Shell {
class ShellImpl[F[_]] extends Shell[F] {

def mergeCommands(commands: List[String]): Stream[IO, String] = Stream.empty
def mergeCommands(commands: List[String]): Stream[F, String] = Stream.empty

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import cats.effect.IO
import ru.d10xa.jsonlogviewer.ConfigYamlReader
import ru.d10xa.jsonlogviewer.decline.Config.FormatIn
import cats.syntax.all.*
import ru.d10xa.jsonlogviewer.decline.yaml.ConfigYaml

import java.io.File

Expand Down
Original file line number Diff line number Diff line change
@@ -1,37 +1,35 @@
package ru.d10xa.jsonlogviewer.shell

import cats.effect.*
import fs2.*
import cats.syntax.all.*

import java.io.*
class ShellImpl[F[_]: Async] extends Shell[F] {

class ShellImpl extends Shell {

def createProcess(command: String): Resource[IO, Process] =
Resource.make(IO {
def createProcess(command: String): Resource[F, Process] =
Resource.make(Async[F].delay {
new ProcessBuilder("sh", "-c", command)
.redirectErrorStream(true)
.start()
})(process => IO(process.destroy()).void)
})(process => Async[F].delay(process.destroy()))

def runInfiniteCommand(command: String): Stream[IO, String] =
Stream.resource(createProcess(command)).flatMap { process =>
def runInfiniteCommand(command: String): fs2.Stream[F, String] =
fs2.Stream.resource(createProcess(command)).flatMap { process =>
fs2.io
.readInputStream(
IO(process.getInputStream),
Async[F].delay(process.getInputStream),
4096,
closeAfterUse = false
)
.through(text.utf8.decode)
.through(text.lines)
.onFinalize(IO {
.through(fs2.text.utf8.decode)
.through(fs2.text.lines)
.onFinalize(Async[F].delay {
process.waitFor()
}.void)
}

def mergeCommands(commands: List[String]): Stream[IO, String] = {
def mergeCommands(commands: List[String]): fs2.Stream[F, String] = {
val streams = commands.map(runInfiniteCommand)
Stream.emits(streams).parJoin(math.max(1, commands.length))
fs2.Stream.emits(streams).parJoin(math.max(1, commands.length))
}

}
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
package ru.d10xa.jsonlogviewer

import cats.data.Validated
import cats.effect.*
import com.monovore.decline.Opts
import com.monovore.decline.effect.CommandIOApp
import fs2.*
import fs2.io.*
import ru.d10xa.jsonlogviewer.decline.Config
import ru.d10xa.jsonlogviewer.decline.Config.FormatIn
import ru.d10xa.jsonlogviewer.decline.DeclineOpts
import ru.d10xa.jsonlogviewer.logfmt.LogfmtLogLineParser
import _root_.io.circe.yaml.scalayaml.parser
import cats.syntax.all.*
import ru.d10xa.jsonlogviewer.decline.ConfigInit
import ru.d10xa.jsonlogviewer.decline.ConfigInitImpl
import ru.d10xa.jsonlogviewer.decline.ConfigYaml
import ru.d10xa.jsonlogviewer.decline.DeclineOpts
import ru.d10xa.jsonlogviewer.decline.yaml.ConfigYaml
import ru.d10xa.jsonlogviewer.logfmt.LogfmtLogLineParser
import ru.d10xa.jsonlogviewer.shell.ShellImpl

object Application
Expand All @@ -23,30 +20,13 @@ object Application
"Print json logs in human-readable form"
):

private val stdinLinesStream: Stream[IO, String] =
stdinUtf8[IO](1024 * 1024 * 10)
.repartition(s => Chunk.array(s.split("\n", -1)))
.filter(_.nonEmpty)

private val configInit: ConfigInit = new ConfigInitImpl

def main: Opts[IO[ExitCode]] = DeclineOpts.config.map { c =>
configInit.initConfig(c).flatMap { updatedConfig =>
IO {
val jsonPrefixPostfix = JsonPrefixPostfix(JsonDetector())
val logLineParser = updatedConfig.formatIn match {
case Some(FormatIn.Logfmt) => LogfmtLogLineParser(updatedConfig)
case _ => JsonLogLineParser(updatedConfig, jsonPrefixPostfix)
}
val commandsOpt = updatedConfig.configYaml.flatMap(_.commands).filter(_.nonEmpty)
val stream = commandsOpt match {
case Some(cmds) if cmds.nonEmpty =>
new ShellImpl().mergeCommands(cmds)
case _ =>
stdinLinesStream
}
stream
.through(LogViewerStream.stream[IO](updatedConfig, logLineParser))
LogViewerStream
.stream[IO](updatedConfig)
.through(text.utf8.encode)
.through(io.stdout)
.compile
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package ru.d10xa.jsonlogviewer

import cats.effect.IO
import cats.data.ValidatedNel
import ru.d10xa.jsonlogviewer.decline.ConfigYaml
import ru.d10xa.jsonlogviewer.decline.ConfigYamlLoader
import ru.d10xa.jsonlogviewer.decline.yaml.ConfigYaml
import ru.d10xa.jsonlogviewer.decline.yaml.ConfigYamlLoader

import scala.io.Source

Expand Down
Original file line number Diff line number Diff line change
@@ -1,33 +1,115 @@
package ru.d10xa.jsonlogviewer

import cats.effect.Async
import cats.syntax.all.*
import fs2.*
import fs2.io.*
import ru.d10xa.jsonlogviewer.decline.Config
import ru.d10xa.jsonlogviewer.decline.Config.FormatIn
import ru.d10xa.jsonlogviewer.decline.ConfigInit
import ru.d10xa.jsonlogviewer.decline.ConfigInitImpl
import ru.d10xa.jsonlogviewer.decline.DeclineOpts
import ru.d10xa.jsonlogviewer.decline.yaml.ConfigYaml
import ru.d10xa.jsonlogviewer.decline.yaml.Feed
import ru.d10xa.jsonlogviewer.formatout.ColorLineFormatter
import ru.d10xa.jsonlogviewer.formatout.RawFormatter
import ru.d10xa.jsonlogviewer.logfmt.LogfmtLogLineParser
import ru.d10xa.jsonlogviewer.query.QueryAST
import ru.d10xa.jsonlogviewer.shell.ShellImpl

object LogViewerStream {

def stream[F[_]](
private def makeLogLineParser(
config: Config,
logLineParser: LogLineParser
): Pipe[F, String, String] = stream =>
optFormatIn: Option[FormatIn]
): LogLineParser = {
val jsonPrefixPostfix = JsonPrefixPostfix(JsonDetector())
optFormatIn match {
case Some(FormatIn.Logfmt) => LogfmtLogLineParser(config)
case _ => JsonLogLineParser(config, jsonPrefixPostfix)
}
}

private def commandsToStream[F[_]: Async](
commands: List[String]
): Stream[F, String] = {
new ShellImpl[F]().mergeCommands(commands)
}

private def stdinLinesStream[F[_]: Async]: Stream[F, String] =
stdinUtf8[F](1024 * 1024 * 10)
.repartition(s => Chunk.array(s.split("\n", -1)))
.filter(_.nonEmpty)

private def processStream[F[_]: Async](
baseConfig: Config,
lines: Stream[F, String],
feedFilter: Option[QueryAST],
feedFormatIn: Option[FormatIn],
feedName: Option[String]
): Stream[F, String] = {
val effectiveFormatIn = feedFormatIn.orElse(baseConfig.formatIn)
val effectiveFilter = feedFilter.orElse(baseConfig.filter)
val effectiveConfig = baseConfig.copy(
filter = effectiveFilter,
formatIn = effectiveFormatIn
)

val timestampFilter = TimestampFilter()
val outputLineFormatter = config.formatOut match
val parseResultKeys = ParseResultKeys(effectiveConfig)
val logLineFilter = LogLineFilter(effectiveConfig, parseResultKeys)
val logLineParser = makeLogLineParser(effectiveConfig, effectiveFormatIn)
val outputLineFormatter = effectiveConfig.formatOut match
case Some(Config.FormatOut.Raw) => RawFormatter()
case Some(Config.FormatOut.Pretty) | None => ColorLineFormatter(config)

val parseResultKeys = ParseResultKeys(config)
val logLineFilter = LogLineFilter(config, parseResultKeys)
stream
case Some(Config.FormatOut.Pretty) | None =>
ColorLineFormatter(effectiveConfig, feedName)

lines
.map(logLineParser.parse)
.filter(logLineFilter.grep)
.filter(logLineFilter.logLineQueryPredicate)
.through(timestampFilter.filterTimestampAfter[F](config.timestamp.after))
.through(
timestampFilter.filterTimestampBefore[F](config.timestamp.before)
timestampFilter.filterTimestampAfter[F](effectiveConfig.timestamp.after)
)
.through(
timestampFilter.filterTimestampBefore[F](
effectiveConfig.timestamp.before
)
)
.map(outputLineFormatter.formatLine)
.map(_.toString)
}

def stream[F[_]: Async](config: Config): Stream[F, String] = {
val topCommandsOpt: Option[List[String]] =
config.configYaml.flatMap(_.commands).filter(_.nonEmpty)
val feedsOpt: Option[List[Feed]] =
config.configYaml.flatMap(_.feeds).filter(_.nonEmpty)

val finalStream = feedsOpt match {
case Some(feeds) =>
val feedStreams = feeds.map { feed =>
val feedStream = commandsToStream[F](feed.commands)
processStream(
config,
feedStream,
feed.filter,
feed.formatIn,
feed.name.some
)
}
Stream.emits(feedStreams).parJoin(feedStreams.size)

case None =>
val baseStream = topCommandsOpt match {
case Some(cmds) => commandsToStream[F](cmds)
case None => stdinLinesStream[F]
}
processStream(config, baseStream, None, None, None)
}

finalStream
.intersperse("\n")
.append(Stream.emit("\n"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import ru.d10xa.jsonlogviewer.decline.Config
import ru.d10xa.jsonlogviewer.decline.Config.ConfigGrep
import ru.d10xa.jsonlogviewer.decline.ConfigFile
import ru.d10xa.jsonlogviewer.decline.TimestampConfig
import ru.d10xa.jsonlogviewer.decline.yaml.ConfigYaml
import ru.d10xa.jsonlogviewer.query.QueryAST

import scala.util.matching.Regex
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package ru.d10xa.jsonlogviewer.decline.yaml

import ru.d10xa.jsonlogviewer.decline.Config
import ru.d10xa.jsonlogviewer.decline.yaml.ConfigYaml
import ru.d10xa.jsonlogviewer.query.QueryAST

case class ConfigYaml(
filter: Option[QueryAST],
formatIn: Option[Config.FormatIn],
commands: Option[List[String]],
feeds: Option[List[Feed]]
)

object ConfigYaml:
val empty: ConfigYaml = ConfigYaml(None, None, None, None)

Loading

0 comments on commit 4ae0f1a

Please sign in to comment.