Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[Backport 0.6] Add metrics for query types #901

Merged
merged 1 commit into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,17 @@ public final class MetricConstants {
*/
public static final String QUERY_EXECUTION_TIME_METRIC = "query.execution.processingTime";

/**
* Counter metrics tracking how many queries of each type are executed
* (DROP / VACUUM / ALTER / REFRESH / CREATE INDEX). The CREATE INDEX count is
* additionally broken down by refresh mode: auto refresh vs. manual refresh.
*/
public static final String QUERY_DROP_COUNT_METRIC = "query.drop.count";
public static final String QUERY_VACUUM_COUNT_METRIC = "query.vacuum.count";
public static final String QUERY_ALTER_COUNT_METRIC = "query.alter.count";
public static final String QUERY_REFRESH_COUNT_METRIC = "query.refresh.count";
public static final String QUERY_CREATE_INDEX_COUNT_METRIC = "query.createIndex.count";
public static final String QUERY_CREATE_INDEX_AUTO_REFRESH_COUNT_METRIC = "query.createIndex.autoRefresh.count";
public static final String QUERY_CREATE_INDEX_MANUAL_REFRESH_COUNT_METRIC = "query.createIndex.manualRefresh.count";

/**
* Metric for tracking the total bytes read from input
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.sql

import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil}

/**
 * Mixin that emits one counter metric per Flint index DDL statement
 * (create / refresh / alter / drop / vacuum). CREATE INDEX additionally
 * records whether the index was created with auto or manual refresh.
 */
trait IndexMetricHelper {

  /** Counts a CREATE INDEX statement plus its refresh-mode breakdown. */
  def emitCreateIndexMetric(autoRefresh: Boolean): Unit = {
    increment(MetricConstants.QUERY_CREATE_INDEX_COUNT_METRIC)
    val refreshModeMetric =
      if (autoRefresh) MetricConstants.QUERY_CREATE_INDEX_AUTO_REFRESH_COUNT_METRIC
      else MetricConstants.QUERY_CREATE_INDEX_MANUAL_REFRESH_COUNT_METRIC
    increment(refreshModeMetric)
  }

  /** Counts a REFRESH INDEX statement. */
  def emitRefreshIndexMetric(): Unit = increment(MetricConstants.QUERY_REFRESH_COUNT_METRIC)

  /** Counts an ALTER INDEX statement. */
  def emitAlterIndexMetric(): Unit = increment(MetricConstants.QUERY_ALTER_COUNT_METRIC)

  /** Counts a DROP INDEX statement. */
  def emitDropIndexMetric(): Unit = increment(MetricConstants.QUERY_DROP_COUNT_METRIC)

  /** Counts a VACUUM INDEX statement. */
  def emitVacuumIndexMetric(): Unit = increment(MetricConstants.QUERY_VACUUM_COUNT_METRIC)

  // Single funnel for every counter update, so the metrics backend call
  // site appears exactly once in this trait.
  private def increment(metricName: String): Unit =
    MetricsUtil.incrementCounter(metricName)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
package org.opensearch.flint.spark.sql.covering

import org.antlr.v4.runtime.tree.RuleNode
import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil}
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, IndexMetricHelper, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._

Expand All @@ -20,7 +21,9 @@ import org.apache.spark.sql.types.StringType
/**
* Flint Spark AST builder that builds Spark command for Flint covering index statement.
*/
trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
trait FlintSparkCoveringIndexAstBuilder
extends FlintSparkSqlExtensionsVisitor[AnyRef]
with IndexMetricHelper {
self: SparkSqlAstBuilder =>

override def visitCreateCoveringIndexStatement(
Expand Down Expand Up @@ -49,6 +52,8 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
.options(indexOptions, indexName)
.create(ignoreIfExists)

emitCreateIndexMetric(indexOptions.autoRefresh())

// Trigger auto refresh if enabled and not using external scheduler
if (indexOptions
.autoRefresh() && !indexBuilder.isExternalSchedulerEnabled()) {
Expand All @@ -62,6 +67,7 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
/**
 * Builds the command for REFRESH covering index, counting the statement
 * via the shared metric helper before triggering the refresh.
 */
override def visitRefreshCoveringIndexStatement(
    ctx: RefreshCoveringIndexStatementContext): Command = {
  FlintSparkSqlCommand() { flint =>
    // Use IndexMetricHelper (mixed into this trait) instead of calling
    // MetricsUtil directly, for consistency with the alter/drop/vacuum
    // visitors and the MV/skipping-index builders in this change.
    emitRefreshIndexMetric()
    val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
    flint.refreshIndex(flintIndexName)
    Seq.empty
Expand Down Expand Up @@ -107,6 +113,7 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
override def visitAlterCoveringIndexStatement(
ctx: AlterCoveringIndexStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
emitAlterIndexMetric()
val indexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
val indexOptions = visitPropertyList(ctx.propertyList())
val index = flint
Expand All @@ -121,6 +128,7 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
override def visitDropCoveringIndexStatement(
ctx: DropCoveringIndexStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
emitDropIndexMetric()
val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
flint.deleteIndex(flintIndexName)
Seq.empty
Expand All @@ -130,6 +138,7 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
override def visitVacuumCoveringIndexStatement(
ctx: VacuumCoveringIndexStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
emitVacuumIndexMetric()
val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
flint.vacuumIndex(flintIndexName)
Seq.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
import org.antlr.v4.runtime.tree.RuleNode
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, IndexMetricHelper, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText, IndexBelongsTo}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._

Expand All @@ -22,7 +22,9 @@ import org.apache.spark.sql.types.StringType
/**
* Flint Spark AST builder that builds Spark command for Flint materialized view statement.
*/
trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
trait FlintSparkMaterializedViewAstBuilder
extends FlintSparkSqlExtensionsVisitor[AnyRef]
with IndexMetricHelper {
self: SparkSqlAstBuilder =>

override def visitCreateMaterializedViewStatement(
Expand All @@ -40,6 +42,8 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
val indexOptions = visitPropertyList(ctx.propertyList())
val flintIndexName = getFlintIndexName(flint, ctx.mvName)

emitCreateIndexMetric(indexOptions.autoRefresh())

mvBuilder
.options(indexOptions, flintIndexName)
.create(ignoreIfExists)
Expand All @@ -56,6 +60,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
override def visitRefreshMaterializedViewStatement(
ctx: RefreshMaterializedViewStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
emitRefreshIndexMetric()
val flintIndexName = getFlintIndexName(flint, ctx.mvName)
flint.refreshIndex(flintIndexName)
Seq.empty
Expand Down Expand Up @@ -106,6 +111,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
override def visitAlterMaterializedViewStatement(
ctx: AlterMaterializedViewStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
emitAlterIndexMetric()
val indexName = getFlintIndexName(flint, ctx.mvName)
val indexOptions = visitPropertyList(ctx.propertyList())
val index = flint
Expand All @@ -120,6 +126,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
override def visitDropMaterializedViewStatement(
ctx: DropMaterializedViewStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
emitDropIndexMetric()
flint.deleteIndex(getFlintIndexName(flint, ctx.mvName))
Seq.empty
}
Expand All @@ -128,6 +135,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
override def visitVacuumMaterializedViewStatement(
ctx: VacuumMaterializedViewStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
emitVacuumIndexMetric()
flint.vacuumIndex(getFlintIndexName(flint, ctx.mvName))
Seq.empty
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{BLOOM_FILTER, MIN_MAX, PARTITION, VALUE_SET}
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.VALUE_SET_MAX_SIZE_KEY
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, IndexMetricHelper, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._

Expand All @@ -26,7 +26,9 @@ import org.apache.spark.sql.types.StringType
/**
* Flint Spark AST builder that builds Spark command for Flint skipping index statement.
*/
trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
trait FlintSparkSkippingIndexAstBuilder
extends FlintSparkSqlExtensionsVisitor[AnyRef]
with IndexMetricHelper {
self: SparkSqlAstBuilder =>

override def visitCreateSkippingIndexStatement(
Expand Down Expand Up @@ -73,6 +75,8 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
val indexOptions = visitPropertyList(ctx.propertyList())
val indexName = getSkippingIndexName(flint, ctx.tableName)

emitCreateIndexMetric(indexOptions.autoRefresh())

indexBuilder
.options(indexOptions, indexName)
.create(ignoreIfExists)
Expand All @@ -88,6 +92,7 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
override def visitRefreshSkippingIndexStatement(
ctx: RefreshSkippingIndexStatementContext): Command =
FlintSparkSqlCommand() { flint =>
emitRefreshIndexMetric()
val indexName = getSkippingIndexName(flint, ctx.tableName)
flint.refreshIndex(indexName)
Seq.empty
Expand Down Expand Up @@ -115,6 +120,7 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
override def visitAlterSkippingIndexStatement(
ctx: AlterSkippingIndexStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
emitAlterIndexMetric()
val indexName = getSkippingIndexName(flint, ctx.tableName)
val indexOptions = visitPropertyList(ctx.propertyList())
val index = flint
Expand Down Expand Up @@ -142,6 +148,7 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A

override def visitDropSkippingIndexStatement(ctx: DropSkippingIndexStatementContext): Command =
FlintSparkSqlCommand() { flint =>
emitDropIndexMetric()
val indexName = getSkippingIndexName(flint, ctx.tableName)
flint.deleteIndex(indexName)
Seq.empty
Expand All @@ -150,6 +157,7 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
override def visitVacuumSkippingIndexStatement(
ctx: VacuumSkippingIndexStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
emitVacuumIndexMetric()
val indexName = getSkippingIndexName(flint, ctx.tableName)
flint.vacuumIndex(indexName)
Seq.empty
Expand Down
Loading